diff --git a/app/models.py b/app/models.py index 0eff73d..1cd1733 100644 --- a/app/models.py +++ b/app/models.py @@ -25,15 +25,6 @@ logging = logger.getLogger(__name__) if 'LDAP_TYPE' in app.config.keys(): LDAP_URI = app.config['LDAP_URI'] - - if 'LDAP_USERNAME' in app.config.keys() and 'LDAP_PASSWORD' in app.config.keys(): #backward compatability - LDAP_BIND_TYPE = 'search' - if 'LDAP_BIND_TYPE' in app.config.keys(): - LDAP_BIND_TYPE = app.config['LDAP_BIND_TYPE'] - if LDAP_BIND_TYPE == 'search': - LDAP_USERNAME = app.config['LDAP_USERNAME'] - LDAP_PASSWORD = app.config['LDAP_PASSWORD'] - LDAP_SEARCH_BASE = app.config['LDAP_SEARCH_BASE'] LDAP_TYPE = app.config['LDAP_TYPE'] LDAP_FILTER = app.config['LDAP_FILTER'] @@ -154,29 +145,29 @@ class User(db.Model): user_info = User.query.filter(User.username == self.username).first() return user_info + def ldap_init_conn(self): + ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) + conn = ldap.initialize(LDAP_URI) + conn.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF) + conn.set_option(ldap.OPT_PROTOCOL_VERSION, 3) + conn.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND) + conn.set_option( ldap.OPT_X_TLS_DEMAND, True ) + conn.set_option( ldap.OPT_DEBUG_LEVEL, 255 ) + conn.protocol_version = ldap.VERSION3 + return conn + def ldap_search(self, searchFilter, baseDN): searchScope = ldap.SCOPE_SUBTREE retrieveAttributes = None try: - ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) - l = ldap.initialize(LDAP_URI) - l.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF) - l.set_option(ldap.OPT_PROTOCOL_VERSION, 3) - l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND) - l.set_option( ldap.OPT_X_TLS_DEMAND, True ) - l.set_option( ldap.OPT_DEBUG_LEVEL, 255 ) - l.protocol_version = ldap.VERSION3 - if LDAP_BIND_TYPE == "direct": - global LDAP_USERNAME; LDAP_USERNAME = self.username - global LDAP_PASSWORD; LDAP_PASSWORD = self.password - - - l.simple_bind_s(LDAP_USERNAME, LDAP_PASSWORD) - ldap_result_id = l.search(baseDN, searchScope, searchFilter, retrieveAttributes) + conn = self.ldap_init_conn() + conn.simple_bind_s(app.config['LDAP_ADMIN_USERNAME'], app.config['LDAP_ADMIN_PASSWORD']) + ldap_result_id = conn.search(baseDN, searchScope, searchFilter, retrieveAttributes) result_set = [] + while 1: - result_type, result_data = l.result(ldap_result_id, 0) + result_type, result_data = conn.result(ldap_result_id, 0) if (result_data == []): break else: @@ -188,6 +179,15 @@ class User(db.Model): logging.error(e) raise + def ldap_auth(self, ldap_username, password): + try: + conn = self.ldap_init_conn() + conn.simple_bind_s(ldap_username, password) + return True + except ldap.LDAPError as e: + logging.error(e) + return False + def is_validate(self, method, src_ip=''): """ Validate user credential @@ -206,7 +206,6 @@ class User(db.Model): return False if method == 'LDAP': - allowedlogin = False isadmin = False if not LDAP_TYPE: logging.error('LDAP authentication is disabled') @@ -214,42 +213,43 @@ class User(db.Model): if LDAP_TYPE == 'ldap': searchFilter = "(&({0}={1}){2})".format(LDAP_USERNAMEFIELD, self.username, LDAP_FILTER) - logging.info('Ldap searchFilter "{0}"'.format(searchFilter)) + logging.debug('Ldap searchFilter "{0}"'.format(searchFilter)) elif LDAP_TYPE == 'ad': searchFilter = "(&(objectcategory=person)({0}={1}){2})".format(LDAP_USERNAMEFIELD, self.username, LDAP_FILTER) - result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE) - if not result: + ldap_result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE) + if not ldap_result: logging.warning('LDAP User "{0}" does not exist. Authentication request from {1}'.format(self.username, src_ip)) return False - - try: - ldap_username = ldap.filter.escape_filter_chars(result[0][0][0]) - if LDAP_GROUP_SECURITY: - try: - if LDAP_TYPE == 'ldap': - ldap_user_dn = ldap.filter.escape_filter_chars(result[0][0][0]) - logging.info(result[0][0][0]) - if (self.ldap_search('(member={0})'.format(ldap_user_dn) ,LDAP_ADMIN_GROUP)): - allowedlogin = True - isadmin = True - logging.info('User {0} is part of the "{1}" group that allows admin access to PowerDNS-Admin'.format(self.username,LDAP_ADMIN_GROUP)) - if (self.ldap_search('(member={0})'.format(ldap_user_dn) ,LDAP_USER_GROUP)): - #if (group == LDAP_USER_GROUP): - allowedlogin = True - logging.info('User {0} is part of the "{1}" group that allows user access to PowerDNS-Admin'.format(self.username,LDAP_USER_GROUP)) - if allowedlogin == False: - logging.error('User {0} is not part of the "{1}" or "{2}" groups that allow access to PowerDNS-Admin'.format(self.username,LDAP_ADMIN_GROUP,LDAP_USER_GROUP)) + else: + try: + ldap_username = ldap.filter.escape_filter_chars(ldap_result[0][0][0]) + # check if LDAP_SECURITY_GROUP is enabled + # user can be assigned to ADMIN or USER role. + if LDAP_GROUP_SECURITY: + try: + if (self.ldap_search(searchFilter, LDAP_ADMIN_GROUP)): + isadmin = True + logging.info('User {0} is part of the "{1}" group that allows admin access to PowerDNS-Admin'.format(self.username,LDAP_ADMIN_GROUP)) + elif (self.ldap_search(searchFilter, LDAP_USER_GROUP)): + logging.info('User {0} is part of the "{1}" group that allows user access to PowerDNS-Admin'.format(self.username,LDAP_USER_GROUP)) + else: + logging.error('User {0} is not part of the "{1}" or "{2}" groups that allow access to PowerDNS-Admin'.format(self.username,LDAP_ADMIN_GROUP,LDAP_USER_GROUP)) + return False + except Exception as e: + logging.error('LDAP group lookup for user "{0}" has failed. Authentication request from {1}'.format(self.username, src_ip)) + logging.debug(traceback.format_exc()) return False - except Exception as e: - logging.error('LDAP group lookup for user "{0}" has failed. Authentication request from {1}'.format(self.username, src_ip)) - logging.debug(e) + + # validate ldap user password + if not self.ldap_auth(ldap_username, self.password): + logging.error('User "{0}" input a wrong LDAP password. Authentication request from {1}'.format(self.username, src_ip)) return False - logging.info('User "{0}" logged in successfully'.format(self.username)) - except Exception as e: - logging.error('User "{0}" input a wrong LDAP password. Authentication request from {1}'.format(self.username, src_ip)) - logging.debug(e) - return False + + except Exception as e: + logging.error('Wrong LDAP configuration. {0}'.format(e)) + logging.debug(traceback.format_exc()) + return False # create user if not exist in the db if not User.query.filter(User.username == self.username).first(): @@ -258,11 +258,12 @@ class User(db.Model): try: # try to get user's firstname & lastname from LDAP # this might be changed in the future - self.firstname = result[0][0][1]['givenName'] - self.lastname = result[0][0][1]['sn'] - self.email = result[0][0][1]['mail'] + self.firstname = ldap_result[0][0][1]['givenName'][0].decode("utf-8") + self.lastname = ldap_result[0][0][1]['sn'][0].decode("utf-8") + self.email = ldap_result[0][0][1]['mail'][0].decode("utf-8") except Exception as e: - logging.info("reading ldap data threw an exception {0}".format(e)) + logging.info("Reading ldap data threw an exception {0}".format(e)) + logging.debug(traceback.format_exc()) # first register user will be in Administrator role self.role_id = Role.query.filter_by(name='User').first().id diff --git a/config_template.py b/config_template.py index 5bb4006..ebbbc29 100644 --- a/config_template.py +++ b/config_template.py @@ -39,22 +39,18 @@ SQLALCHEMY_TRACK_MODIFICATIONS = True LDAP_ENABLED = False LDAP_TYPE = 'ldap' LDAP_URI = 'ldaps://your-ldap-server:636' -# with LDAP_BIND_TYPE you can specify 'direct' or 'search' to use user credentials -# for binding or a predefined LDAP_USERNAME and LDAP_PASSWORD, binding with non-DN only works with AD -LDAP_BIND_TYPE= 'direct' # direct or search -LDAP_USERNAME = 'cn=dnsuser,ou=users,ou=services,dc=duykhanh,dc=me' -LDAP_PASSWORD = 'dnsuser' -LDAP_SEARCH_BASE = 'ou=System Admins,ou=People,dc=duykhanh,dc=me' -LDAP_GROUP_SECURITY = False -LDAP_ADMIN_GROUP = 'CN=PowerDNS-Admin Admin,OU=Custom,DC=ivan,DC=local' -LDAP_USER_GROUP = 'CN=PowerDNS-Admin User,OU=Custom,DC=ivan,DC=local' +LDAP_ADMIN_USERNAME = 'cn=admin,dc=mydomain,dc=com' +LDAP_ADMIN_PASSWORD = 'password' +LDAP_SEARCH_BASE = 'dc=mydomain,dc=com' + # Additional options only if LDAP_TYPE=ldap LDAP_USERNAMEFIELD = 'uid' LDAP_FILTER = '(objectClass=inetorgperson)' + # enable LDAP_GROUP_SECURITY to allow Admin and User roles based on LDAP groups -#LDAP_GROUP_SECURITY = True # True or False -#LDAP_ADMIN_GROUP = 'CN=DnsAdmins,CN=Users,DC=example,DC=me' -#LDAP_USER_GROUP = 'CN=Domain Admins,CN=Users,DC=example,DC=me' +LDAP_GROUP_SECURITY = False # True or False +LDAP_ADMIN_GROUP = 'cn=sysops,dc=mydomain,dc=com' +LDAP_USER_GROUP = 'cn=user,dc=mydomain,dc=com' ## AD CONFIG #LDAP_TYPE = 'ad' diff --git a/configs/development.py b/configs/development.py index 82024c4..10c863d 100644 --- a/configs/development.py +++ b/configs/development.py @@ -32,20 +32,19 @@ SIGNUP_ENABLED = True # LDAP CONFIG LDAP_ENABLED = False LDAP_TYPE = 'ldap' -LDAP_URI = 'ldaps://your-ldap-server:636' -# with LDAP_BIND_TYPE you can specify 'direct' or 'search' to use user credentials -# for binding or a predefined LDAP_USERNAME and LDAP_PASSWORD, binding with non-DN only works with AD -LDAP_BIND_TYPE= 'direct' # direct or search -LDAP_USERNAME = 'cn=dnsuser,ou=users,ou=services,dc=duykhanh,dc=me' -LDAP_PASSWORD = 'dnsuser' -LDAP_SEARCH_BASE = 'ou=System Admins,ou=People,dc=duykhanh,dc=me' +LDAP_URI = 'ldap://docker.for.mac.localhost:389' +LDAP_ADMIN_USERNAME = 'cn=admin,dc=mydomain,dc=com' +LDAP_ADMIN_PASSWORD = 'password' +LDAP_SEARCH_BASE = 'dc=mydomain,dc=com' + # Additional options only if LDAP_TYPE=ldap LDAP_USERNAMEFIELD = 'uid' LDAP_FILTER = '(objectClass=inetorgperson)' + # enable LDAP_GROUP_SECURITY to allow Admin and User roles based on LDAP groups -#LDAP_GROUP_SECURITY = True # True or False -#LDAP_ADMIN_GROUP = 'CN=DnsAdmins,CN=Users,DC=example,DC=me' -#LDAP_USER_GROUP = 'CN=Domain Admins,CN=Users,DC=example,DC=me' +LDAP_GROUP_SECURITY = False # True or False +LDAP_ADMIN_GROUP = 'cn=sysops,dc=mydomain,dc=com' +LDAP_USER_GROUP = 'cn=user,dc=mydomain,dc=com' ## AD CONFIG #LDAP_TYPE = 'ad'