Merge pull request #328 from ngoduykhanh/fix_ldap

Adjustment in LDAP authentication
This commit is contained in:
Khanh Ngo 2018-08-09 17:35:35 +07:00 committed by GitHub
commit 85e745731b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 81 deletions

View File

@ -25,15 +25,6 @@ logging = logger.getLogger(__name__)
if 'LDAP_TYPE' in app.config.keys(): if 'LDAP_TYPE' in app.config.keys():
LDAP_URI = app.config['LDAP_URI'] LDAP_URI = app.config['LDAP_URI']
if 'LDAP_USERNAME' in app.config.keys() and 'LDAP_PASSWORD' in app.config.keys(): #backward compatability
LDAP_BIND_TYPE = 'search'
if 'LDAP_BIND_TYPE' in app.config.keys():
LDAP_BIND_TYPE = app.config['LDAP_BIND_TYPE']
if LDAP_BIND_TYPE == 'search':
LDAP_USERNAME = app.config['LDAP_USERNAME']
LDAP_PASSWORD = app.config['LDAP_PASSWORD']
LDAP_SEARCH_BASE = app.config['LDAP_SEARCH_BASE'] LDAP_SEARCH_BASE = app.config['LDAP_SEARCH_BASE']
LDAP_TYPE = app.config['LDAP_TYPE'] LDAP_TYPE = app.config['LDAP_TYPE']
LDAP_FILTER = app.config['LDAP_FILTER'] LDAP_FILTER = app.config['LDAP_FILTER']
@ -154,29 +145,29 @@ class User(db.Model):
user_info = User.query.filter(User.username == self.username).first() user_info = User.query.filter(User.username == self.username).first()
return user_info return user_info
def ldap_init_conn(self):
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
conn = ldap.initialize(LDAP_URI)
conn.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF)
conn.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
conn.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
conn.set_option( ldap.OPT_X_TLS_DEMAND, True )
conn.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
conn.protocol_version = ldap.VERSION3
return conn
def ldap_search(self, searchFilter, baseDN): def ldap_search(self, searchFilter, baseDN):
searchScope = ldap.SCOPE_SUBTREE searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = None retrieveAttributes = None
try: try:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) conn = self.ldap_init_conn()
l = ldap.initialize(LDAP_URI) conn.simple_bind_s(app.config['LDAP_ADMIN_USERNAME'], app.config['LDAP_ADMIN_PASSWORD'])
l.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF) ldap_result_id = conn.search(baseDN, searchScope, searchFilter, retrieveAttributes)
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
l.set_option( ldap.OPT_X_TLS_DEMAND, True )
l.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
l.protocol_version = ldap.VERSION3
if LDAP_BIND_TYPE == "direct":
global LDAP_USERNAME; LDAP_USERNAME = self.username
global LDAP_PASSWORD; LDAP_PASSWORD = self.password
l.simple_bind_s(LDAP_USERNAME, LDAP_PASSWORD)
ldap_result_id = l.search(baseDN, searchScope, searchFilter, retrieveAttributes)
result_set = [] result_set = []
while 1: while 1:
result_type, result_data = l.result(ldap_result_id, 0) result_type, result_data = conn.result(ldap_result_id, 0)
if (result_data == []): if (result_data == []):
break break
else: else:
@ -188,6 +179,15 @@ class User(db.Model):
logging.error(e) logging.error(e)
raise raise
def ldap_auth(self, ldap_username, password):
try:
conn = self.ldap_init_conn()
conn.simple_bind_s(ldap_username, password)
return True
except ldap.LDAPError as e:
logging.error(e)
return False
def is_validate(self, method, src_ip=''): def is_validate(self, method, src_ip=''):
""" """
Validate user credential Validate user credential
@ -206,7 +206,6 @@ class User(db.Model):
return False return False
if method == 'LDAP': if method == 'LDAP':
allowedlogin = False
isadmin = False isadmin = False
if not LDAP_TYPE: if not LDAP_TYPE:
logging.error('LDAP authentication is disabled') logging.error('LDAP authentication is disabled')
@ -214,42 +213,43 @@ class User(db.Model):
if LDAP_TYPE == 'ldap': if LDAP_TYPE == 'ldap':
searchFilter = "(&({0}={1}){2})".format(LDAP_USERNAMEFIELD, self.username, LDAP_FILTER) searchFilter = "(&({0}={1}){2})".format(LDAP_USERNAMEFIELD, self.username, LDAP_FILTER)
logging.info('Ldap searchFilter "{0}"'.format(searchFilter)) logging.debug('Ldap searchFilter "{0}"'.format(searchFilter))
elif LDAP_TYPE == 'ad': elif LDAP_TYPE == 'ad':
searchFilter = "(&(objectcategory=person)({0}={1}){2})".format(LDAP_USERNAMEFIELD, self.username, LDAP_FILTER) searchFilter = "(&(objectcategory=person)({0}={1}){2})".format(LDAP_USERNAMEFIELD, self.username, LDAP_FILTER)
result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE) ldap_result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE)
if not result: if not ldap_result:
logging.warning('LDAP User "{0}" does not exist. Authentication request from {1}'.format(self.username, src_ip)) logging.warning('LDAP User "{0}" does not exist. Authentication request from {1}'.format(self.username, src_ip))
return False return False
else:
try: try:
ldap_username = ldap.filter.escape_filter_chars(result[0][0][0]) ldap_username = ldap.filter.escape_filter_chars(ldap_result[0][0][0])
if LDAP_GROUP_SECURITY: # check if LDAP_SECURITY_GROUP is enabled
try: # user can be assigned to ADMIN or USER role.
if LDAP_TYPE == 'ldap': if LDAP_GROUP_SECURITY:
ldap_user_dn = ldap.filter.escape_filter_chars(result[0][0][0]) try:
logging.info(result[0][0][0]) if (self.ldap_search(searchFilter, LDAP_ADMIN_GROUP)):
if (self.ldap_search('(member={0})'.format(ldap_user_dn) ,LDAP_ADMIN_GROUP)): isadmin = True
allowedlogin = True logging.info('User {0} is part of the "{1}" group that allows admin access to PowerDNS-Admin'.format(self.username,LDAP_ADMIN_GROUP))
isadmin = True elif (self.ldap_search(searchFilter, LDAP_USER_GROUP)):
logging.info('User {0} is part of the "{1}" group that allows admin access to PowerDNS-Admin'.format(self.username,LDAP_ADMIN_GROUP)) logging.info('User {0} is part of the "{1}" group that allows user access to PowerDNS-Admin'.format(self.username,LDAP_USER_GROUP))
if (self.ldap_search('(member={0})'.format(ldap_user_dn) ,LDAP_USER_GROUP)): else:
#if (group == LDAP_USER_GROUP): logging.error('User {0} is not part of the "{1}" or "{2}" groups that allow access to PowerDNS-Admin'.format(self.username,LDAP_ADMIN_GROUP,LDAP_USER_GROUP))
allowedlogin = True return False
logging.info('User {0} is part of the "{1}" group that allows user access to PowerDNS-Admin'.format(self.username,LDAP_USER_GROUP)) except Exception as e:
if allowedlogin == False: logging.error('LDAP group lookup for user "{0}" has failed. Authentication request from {1}'.format(self.username, src_ip))
logging.error('User {0} is not part of the "{1}" or "{2}" groups that allow access to PowerDNS-Admin'.format(self.username,LDAP_ADMIN_GROUP,LDAP_USER_GROUP)) logging.debug(traceback.format_exc())
return False return False
except Exception as e:
logging.error('LDAP group lookup for user "{0}" has failed. Authentication request from {1}'.format(self.username, src_ip)) # validate ldap user password
logging.debug(e) if not self.ldap_auth(ldap_username, self.password):
logging.error('User "{0}" input a wrong LDAP password. Authentication request from {1}'.format(self.username, src_ip))
return False return False
logging.info('User "{0}" logged in successfully'.format(self.username))
except Exception as e: except Exception as e:
logging.error('User "{0}" input a wrong LDAP password. Authentication request from {1}'.format(self.username, src_ip)) logging.error('Wrong LDAP configuration. {0}'.format(e))
logging.debug(e) logging.debug(traceback.format_exc())
return False return False
# create user if not exist in the db # create user if not exist in the db
if not User.query.filter(User.username == self.username).first(): if not User.query.filter(User.username == self.username).first():
@ -258,11 +258,12 @@ class User(db.Model):
try: try:
# try to get user's firstname & lastname from LDAP # try to get user's firstname & lastname from LDAP
# this might be changed in the future # this might be changed in the future
self.firstname = result[0][0][1]['givenName'] self.firstname = ldap_result[0][0][1]['givenName'][0].decode("utf-8")
self.lastname = result[0][0][1]['sn'] self.lastname = ldap_result[0][0][1]['sn'][0].decode("utf-8")
self.email = result[0][0][1]['mail'] self.email = ldap_result[0][0][1]['mail'][0].decode("utf-8")
except Exception as e: except Exception as e:
logging.info("reading ldap data threw an exception {0}".format(e)) logging.info("Reading ldap data threw an exception {0}".format(e))
logging.debug(traceback.format_exc())
# first register user will be in Administrator role # first register user will be in Administrator role
self.role_id = Role.query.filter_by(name='User').first().id self.role_id = Role.query.filter_by(name='User').first().id

View File

@ -39,22 +39,18 @@ SQLALCHEMY_TRACK_MODIFICATIONS = True
LDAP_ENABLED = False LDAP_ENABLED = False
LDAP_TYPE = 'ldap' LDAP_TYPE = 'ldap'
LDAP_URI = 'ldaps://your-ldap-server:636' LDAP_URI = 'ldaps://your-ldap-server:636'
# with LDAP_BIND_TYPE you can specify 'direct' or 'search' to use user credentials LDAP_ADMIN_USERNAME = 'cn=admin,dc=mydomain,dc=com'
# for binding or a predefined LDAP_USERNAME and LDAP_PASSWORD, binding with non-DN only works with AD LDAP_ADMIN_PASSWORD = 'password'
LDAP_BIND_TYPE= 'direct' # direct or search LDAP_SEARCH_BASE = 'dc=mydomain,dc=com'
LDAP_USERNAME = 'cn=dnsuser,ou=users,ou=services,dc=duykhanh,dc=me'
LDAP_PASSWORD = 'dnsuser'
LDAP_SEARCH_BASE = 'ou=System Admins,ou=People,dc=duykhanh,dc=me'
LDAP_GROUP_SECURITY = False
LDAP_ADMIN_GROUP = 'CN=PowerDNS-Admin Admin,OU=Custom,DC=ivan,DC=local'
LDAP_USER_GROUP = 'CN=PowerDNS-Admin User,OU=Custom,DC=ivan,DC=local'
# Additional options only if LDAP_TYPE=ldap # Additional options only if LDAP_TYPE=ldap
LDAP_USERNAMEFIELD = 'uid' LDAP_USERNAMEFIELD = 'uid'
LDAP_FILTER = '(objectClass=inetorgperson)' LDAP_FILTER = '(objectClass=inetorgperson)'
# enable LDAP_GROUP_SECURITY to allow Admin and User roles based on LDAP groups # enable LDAP_GROUP_SECURITY to allow Admin and User roles based on LDAP groups
#LDAP_GROUP_SECURITY = True # True or False LDAP_GROUP_SECURITY = False # True or False
#LDAP_ADMIN_GROUP = 'CN=DnsAdmins,CN=Users,DC=example,DC=me' LDAP_ADMIN_GROUP = 'cn=sysops,dc=mydomain,dc=com'
#LDAP_USER_GROUP = 'CN=Domain Admins,CN=Users,DC=example,DC=me' LDAP_USER_GROUP = 'cn=user,dc=mydomain,dc=com'
## AD CONFIG ## AD CONFIG
#LDAP_TYPE = 'ad' #LDAP_TYPE = 'ad'

View File

@ -32,20 +32,19 @@ SIGNUP_ENABLED = True
# LDAP CONFIG # LDAP CONFIG
LDAP_ENABLED = False LDAP_ENABLED = False
LDAP_TYPE = 'ldap' LDAP_TYPE = 'ldap'
LDAP_URI = 'ldaps://your-ldap-server:636' LDAP_URI = 'ldap://docker.for.mac.localhost:389'
# with LDAP_BIND_TYPE you can specify 'direct' or 'search' to use user credentials LDAP_ADMIN_USERNAME = 'cn=admin,dc=mydomain,dc=com'
# for binding or a predefined LDAP_USERNAME and LDAP_PASSWORD, binding with non-DN only works with AD LDAP_ADMIN_PASSWORD = 'password'
LDAP_BIND_TYPE= 'direct' # direct or search LDAP_SEARCH_BASE = 'dc=mydomain,dc=com'
LDAP_USERNAME = 'cn=dnsuser,ou=users,ou=services,dc=duykhanh,dc=me'
LDAP_PASSWORD = 'dnsuser'
LDAP_SEARCH_BASE = 'ou=System Admins,ou=People,dc=duykhanh,dc=me'
# Additional options only if LDAP_TYPE=ldap # Additional options only if LDAP_TYPE=ldap
LDAP_USERNAMEFIELD = 'uid' LDAP_USERNAMEFIELD = 'uid'
LDAP_FILTER = '(objectClass=inetorgperson)' LDAP_FILTER = '(objectClass=inetorgperson)'
# enable LDAP_GROUP_SECURITY to allow Admin and User roles based on LDAP groups # enable LDAP_GROUP_SECURITY to allow Admin and User roles based on LDAP groups
#LDAP_GROUP_SECURITY = True # True or False LDAP_GROUP_SECURITY = False # True or False
#LDAP_ADMIN_GROUP = 'CN=DnsAdmins,CN=Users,DC=example,DC=me' LDAP_ADMIN_GROUP = 'cn=sysops,dc=mydomain,dc=com'
#LDAP_USER_GROUP = 'CN=Domain Admins,CN=Users,DC=example,DC=me' LDAP_USER_GROUP = 'cn=user,dc=mydomain,dc=com'
## AD CONFIG ## AD CONFIG
#LDAP_TYPE = 'ad' #LDAP_TYPE = 'ad'