mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-01-19 00:26:30 +00:00
add LDAP direct binding and GROUP_SECURITY
This commit is contained in:
parent
501c5292ab
commit
28c7a195e8
@ -1,5 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
import ldap
|
import ldap
|
||||||
|
import ldap.filter
|
||||||
import time
|
import time
|
||||||
import base64
|
import base64
|
||||||
import bcrypt
|
import bcrypt
|
||||||
@ -22,12 +23,24 @@ logging = logger('MODEL', app.config['LOG_LEVEL'], app.config['LOG_FILE']).confi
|
|||||||
|
|
||||||
if 'LDAP_TYPE' in app.config.keys():
|
if 'LDAP_TYPE' in app.config.keys():
|
||||||
LDAP_URI = app.config['LDAP_URI']
|
LDAP_URI = app.config['LDAP_URI']
|
||||||
|
|
||||||
|
if 'LDAP_USERNAME' in app.config.keys() and 'LDAP_PASSWORD' in app.config.keys(): #backward compatability
|
||||||
|
LDAP_BIND_TYPE = 'search'
|
||||||
|
if 'LDAP_BIND_TYPE' in app.config.keys():
|
||||||
|
LDAP_BIND_TYPE = app.config['LDAP_BIND_TYPE']
|
||||||
|
if LDAP_BIND_TYPE == 'search':
|
||||||
LDAP_USERNAME = app.config['LDAP_USERNAME']
|
LDAP_USERNAME = app.config['LDAP_USERNAME']
|
||||||
LDAP_PASSWORD = app.config['LDAP_PASSWORD']
|
LDAP_PASSWORD = app.config['LDAP_PASSWORD']
|
||||||
|
|
||||||
LDAP_SEARCH_BASE = app.config['LDAP_SEARCH_BASE']
|
LDAP_SEARCH_BASE = app.config['LDAP_SEARCH_BASE']
|
||||||
LDAP_TYPE = app.config['LDAP_TYPE']
|
LDAP_TYPE = app.config['LDAP_TYPE']
|
||||||
LDAP_FILTER = app.config['LDAP_FILTER']
|
LDAP_FILTER = app.config['LDAP_FILTER']
|
||||||
LDAP_USERNAMEFIELD = app.config['LDAP_USERNAMEFIELD']
|
LDAP_USERNAMEFIELD = app.config['LDAP_USERNAMEFIELD']
|
||||||
|
|
||||||
|
LDAP_GROUP_SECURITY = app.config['LDAP_GROUP_SECURITY']
|
||||||
|
if app.config['LDAP_GROUP_SECURITY'] == True:
|
||||||
|
LDAP_ADMIN_GROUP = app.config['LDAP_ADMIN_GROUP']
|
||||||
|
LDAP_USER_GROUP = app.config['LDAP_USER_GROUP']
|
||||||
else:
|
else:
|
||||||
LDAP_TYPE = False
|
LDAP_TYPE = False
|
||||||
|
|
||||||
@ -141,15 +154,18 @@ class User(db.Model):
|
|||||||
try:
|
try:
|
||||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
|
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
|
||||||
l = ldap.initialize(LDAP_URI)
|
l = ldap.initialize(LDAP_URI)
|
||||||
l.set_option(ldap.OPT_REFERRALS, 0)
|
l.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF)
|
||||||
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||||
l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
|
l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
|
||||||
l.set_option( ldap.OPT_X_TLS_DEMAND, True )
|
l.set_option( ldap.OPT_X_TLS_DEMAND, True )
|
||||||
l.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
|
l.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
|
||||||
l.protocol_version = ldap.VERSION3
|
l.protocol_version = ldap.VERSION3
|
||||||
|
if LDAP_BIND_TYPE == "direct":
|
||||||
|
global LDAP_USERNAME; LDAP_USERNAME = self.username
|
||||||
|
global LDAP_PASSWORD; LDAP_PASSWORD = self.password
|
||||||
|
|
||||||
#l.simple_bind_s(LDAP_USERNAME, LDAP_PASSWORD)
|
|
||||||
l.simple_bind_s(self.username,self.password)
|
l.simple_bind_s(LDAP_USERNAME, LDAP_PASSWORD)
|
||||||
ldap_result_id = l.search(baseDN, searchScope, searchFilter, retrieveAttributes)
|
ldap_result_id = l.search(baseDN, searchScope, searchFilter, retrieveAttributes)
|
||||||
result_set = []
|
result_set = []
|
||||||
while 1:
|
while 1:
|
||||||
@ -183,57 +199,78 @@ class User(db.Model):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if method == 'LDAP':
|
if method == 'LDAP':
|
||||||
|
allowedlogin = False
|
||||||
|
isadmin = False
|
||||||
if not LDAP_TYPE:
|
if not LDAP_TYPE:
|
||||||
logging.error('LDAP authentication is disabled')
|
logging.error('LDAP authentication is disabled')
|
||||||
return False
|
return False
|
||||||
|
|
||||||
searchFilter = "(&(objectcategory=person)(samaccountname=%s))" % self.username
|
#searchFilter = "(&(objectcategory=person)(samaccountname=%s))" % self.username
|
||||||
if LDAP_TYPE == 'ldap':
|
if LDAP_TYPE == 'ldap':
|
||||||
searchFilter = "(&(%s=%s)%s)" % (LDAP_USERNAMEFIELD, self.username, LDAP_FILTER)
|
searchFilter = "(&(%s=%s)%s)" % (LDAP_USERNAMEFIELD, self.username, LDAP_FILTER)
|
||||||
logging.info('Ldap searchFilter "%s"' % searchFilter)
|
logging.info('Ldap searchFilter "%s"' % searchFilter)
|
||||||
|
|
||||||
result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE)
|
result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE)
|
||||||
if not result:
|
if not result:
|
||||||
logging.warning('User "%s" does not exist' % self.username)
|
logging.warning('LDAP User "%s" does not exist' % self.username)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
|
|
||||||
l = ldap.initialize(LDAP_URI)
|
|
||||||
l.set_option(ldap.OPT_REFERRALS, 0)
|
|
||||||
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
|
||||||
l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
|
|
||||||
l.set_option( ldap.OPT_X_TLS_DEMAND, True )
|
|
||||||
l.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
|
|
||||||
l.protocol_version = ldap.VERSION3
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ldap_username = result[0][0][0]
|
ldap_username = ldap.filter.escape_filter_chars(result[0][0][0])
|
||||||
l.simple_bind_s(ldap_username, self.password)
|
if LDAP_GROUP_SECURITY:
|
||||||
|
try:
|
||||||
|
if LDAP_TYPE == 'ldap':
|
||||||
|
ldap_user_dn = ldap.filter.escape_filter_chars(result[0][0][0])
|
||||||
|
logging.info(result[0][0][0])
|
||||||
|
if (self.ldap_search('(member=%s)' % ldap_user_dn ,LDAP_ADMIN_GROUP)):
|
||||||
|
allowedlogin = True
|
||||||
|
isadmin = True
|
||||||
|
logging.info('User %s is part of the "%s" group that allows admin access to PowerDNS-Admin' % (self.username,LDAP_ADMIN_GROUP))
|
||||||
|
if (self.ldap_search('(member=%s)' % ldap_user_dn ,LDAP_USER_GROUP)):
|
||||||
|
#if (group == LDAP_USER_GROUP):
|
||||||
|
allowedlogin = True
|
||||||
|
logging.info('User %s is part of the "%s" group that allows user access to PowerDNS-Admin' % (self.username,LDAP_USER_GROUP))
|
||||||
|
if allowedlogin == False:
|
||||||
|
logging.error('User %s is not part of the "%s" or "%s" groups that allow access to PowerDNS-Admin' % (self.username,LDAP_ADMIN_GROUP,LDAP_USER_GROUP))
|
||||||
|
return False
|
||||||
|
except Exception, e:
|
||||||
|
logging.error('LDAP group lookup for user "%s" has failed' % e)
|
||||||
|
return False
|
||||||
logging.info('User "%s" logged in successfully' % self.username)
|
logging.info('User "%s" logged in successfully' % self.username)
|
||||||
except Exception:
|
except Exception, e:
|
||||||
logging.error('User "%s" input a wrong password' % self.username)
|
logging.error('User "%s" input a wrong LDAP password' % e)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# create user if not exist in the db
|
# create user if not exist in the db
|
||||||
if not User.query.filter(User.username == self.username).first():
|
if not User.query.filter(User.username == self.username).first():
|
||||||
|
self.firstname = self.username
|
||||||
|
self.lastname = ''
|
||||||
try:
|
try:
|
||||||
# try to get user's firstname & lastname from LDAP
|
# try to get user's firstname & lastname from LDAP
|
||||||
# this might be changed in the future
|
# this might be changed in the future
|
||||||
self.firstname = result[0][0][1]['givenName'][0]
|
self.firstname = result[0][0][1]['givenName']
|
||||||
self.lastname = result[0][0][1]['sn'][0]
|
self.lastname = result[0][0][1]['sn']
|
||||||
self.email = result[0][0][1]['mail'][0]
|
self.email = result[0][0][1]['mail']
|
||||||
except Exception:
|
except Exception, e:
|
||||||
self.firstname = self.username
|
logging.info("reading ldap data threw an exception %s" % e)
|
||||||
self.lastname = ''
|
|
||||||
|
|
||||||
# first register user will be in Administrator role
|
# first register user will be in Administrator role
|
||||||
self.role_id = Role.query.filter_by(name='User').first().id
|
self.role_id = Role.query.filter_by(name='User').first().id
|
||||||
if User.query.count() == 0:
|
if User.query.count() == 0:
|
||||||
self.role_id = Role.query.filter_by(name='Administrator').first().id
|
self.role_id = Role.query.filter_by(name='Administrator').first().id
|
||||||
|
|
||||||
|
# user will be in Administrator role if part of LDAP Admin group
|
||||||
|
if LDAP_GROUP_SECURITY:
|
||||||
|
if isadmin == True:
|
||||||
|
self.role_id = Role.query.filter_by(name='Administrator').first().id
|
||||||
|
|
||||||
self.create_user()
|
self.create_user()
|
||||||
logging.info('Created user "%s" in the DB' % self.username)
|
logging.info('Created user "%s" in the DB' % self.username)
|
||||||
|
|
||||||
|
# user already exists in database, set their admin status based on group membership (if enabled)
|
||||||
|
if LDAP_GROUP_SECURITY:
|
||||||
|
self.set_admin(isadmin)
|
||||||
|
self.update_profile()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
logging.error('Unsupported authentication method')
|
logging.error('Unsupported authentication method')
|
||||||
|
@ -44,6 +44,10 @@ LDAP_SEARCH_BASE = 'ou=System Admins,ou=People,dc=duykhanh,dc=me'
|
|||||||
# Additional options only if LDAP_TYPE=ldap
|
# Additional options only if LDAP_TYPE=ldap
|
||||||
LDAP_USERNAMEFIELD = 'uid'
|
LDAP_USERNAMEFIELD = 'uid'
|
||||||
LDAP_FILTER = '(objectClass=inetorgperson)'
|
LDAP_FILTER = '(objectClass=inetorgperson)'
|
||||||
|
# enable LDAP_GROUP_SECURITY to allow Admin and User roles based on LDAP groups
|
||||||
|
#LDAP_GROUP_SECURITY = True # True or False
|
||||||
|
#LDAP_ADMIN_GROUP = 'CN=DnsAdmins,CN=Users,DC=example,DC=me'
|
||||||
|
#LDAP_USER_GROUP = 'CN=Domain Admins,CN=Users,DC=example,DC=me'
|
||||||
|
|
||||||
## AD CONFIG
|
## AD CONFIG
|
||||||
#LDAP_TYPE = 'ad'
|
#LDAP_TYPE = 'ad'
|
||||||
|
Loading…
Reference in New Issue
Block a user