mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2024-11-09 15:10:27 +00:00
refactor, make model more pythonic
This commit is contained in:
parent
4a8e607ed2
commit
04e068787a
165
app/models.py
165
app/models.py
@ -172,73 +172,68 @@ class User(db.Model):
|
||||
if user_info.password and self.check_password(user_info.password):
|
||||
logging.info('User "%s" logged in successfully' % self.username)
|
||||
return True
|
||||
else:
|
||||
logging.error('User "%s" input a wrong password' % self.username)
|
||||
return False
|
||||
else:
|
||||
logging.warning('User "%s" does not exist' % self.username)
|
||||
logging.error('User "%s" input a wrong password' % self.username)
|
||||
return False
|
||||
|
||||
elif method == 'LDAP':
|
||||
logging.warning('User "%s" does not exist' % self.username)
|
||||
return False
|
||||
|
||||
if method == 'LDAP':
|
||||
if not LDAP_TYPE:
|
||||
logging.error('LDAP authentication is disabled')
|
||||
return False
|
||||
|
||||
searchFilter = "(&(objectcategory=person)(samaccountname=%s))" % self.username
|
||||
if LDAP_TYPE == 'ldap':
|
||||
searchFilter = "(&(%s=%s)%s)" % (LDAP_USERNAMEFIELD, self.username, LDAP_FILTER)
|
||||
logging.info('Ldap searchFilter "%s"' % searchFilter)
|
||||
else:
|
||||
searchFilter = "(&(objectcategory=person)(samaccountname=%s))" % self.username
|
||||
try:
|
||||
result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE)
|
||||
except Exception, e:
|
||||
raise
|
||||
|
||||
result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE)
|
||||
if not result:
|
||||
logging.warning('User "%s" does not exist' % self.username)
|
||||
return False
|
||||
else:
|
||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
|
||||
l = ldap.initialize(LDAP_URI)
|
||||
l.set_option(ldap.OPT_REFERRALS, 0)
|
||||
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||
l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
|
||||
l.set_option( ldap.OPT_X_TLS_DEMAND, True )
|
||||
l.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
|
||||
l.protocol_version = ldap.VERSION3
|
||||
|
||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
|
||||
l = ldap.initialize(LDAP_URI)
|
||||
l.set_option(ldap.OPT_REFERRALS, 0)
|
||||
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||
l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
|
||||
l.set_option( ldap.OPT_X_TLS_DEMAND, True )
|
||||
l.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
|
||||
l.protocol_version = ldap.VERSION3
|
||||
|
||||
try:
|
||||
ldap_username = result[0][0][0]
|
||||
l.simple_bind_s(ldap_username, self.password)
|
||||
logging.info('User "%s" logged in successfully' % self.username)
|
||||
except Exception:
|
||||
logging.error('User "%s" input a wrong password' % self.username)
|
||||
return False
|
||||
|
||||
# create user if not exist in the db
|
||||
if not User.query.filter(User.username == self.username).first():
|
||||
try:
|
||||
ldap_username = result[0][0][0]
|
||||
l.simple_bind_s(ldap_username, self.password)
|
||||
logging.info('User "%s" logged in successfully' % self.username)
|
||||
# try to get user's firstname & lastname from LDAP
|
||||
# this might be changed in the future
|
||||
self.firstname = result[0][0][1]['givenName'][0]
|
||||
self.lastname = result[0][0][1]['sn'][0]
|
||||
self.email = result[0][0][1]['mail'][0]
|
||||
except Exception:
|
||||
self.firstname = self.username
|
||||
self.lastname = ''
|
||||
|
||||
# create user if not exist in the db
|
||||
if User.query.filter(User.username == self.username).first() == None:
|
||||
try:
|
||||
# try to get user's firstname & lastname from LDAP
|
||||
# this might be changed in the future
|
||||
self.firstname = result[0][0][1]['givenName'][0]
|
||||
self.lastname = result[0][0][1]['sn'][0]
|
||||
self.email = result[0][0][1]['mail'][0]
|
||||
except:
|
||||
self.firstname = self.username
|
||||
self.lastname = ''
|
||||
# first register user will be in Administrator role
|
||||
self.role_id = Role.query.filter_by(name='User').first().id
|
||||
if User.query.count() == 0:
|
||||
self.role_id = Role.query.filter_by(name='Administrator').first().id
|
||||
|
||||
# first register user will be in Administrator role
|
||||
if User.query.count() == 0:
|
||||
self.role_id = Role.query.filter_by(name='Administrator').first().id
|
||||
else:
|
||||
self.role_id = Role.query.filter_by(name='User').first().id
|
||||
self.create_user()
|
||||
logging.info('Created user "%s" in the DB' % self.username)
|
||||
|
||||
self.create_user()
|
||||
logging.info('Created user "%s" in the DB' % self.username)
|
||||
return True
|
||||
except:
|
||||
logging.error('User "%s" input a wrong password' % self.username)
|
||||
return False
|
||||
else:
|
||||
logging.error('Unsupported authentication method')
|
||||
return False
|
||||
return True
|
||||
|
||||
logging.error('Unsupported authentication method')
|
||||
return False
|
||||
|
||||
def create_user(self):
|
||||
"""
|
||||
@ -246,11 +241,8 @@ class User(db.Model):
|
||||
We will create a local user (in DB) in order to manage user
|
||||
profile such as name, roles,...
|
||||
"""
|
||||
user = User(username=self.username, firstname=self.firstname, lastname=self.lastname, role_id=self.role_id, email=self.email)
|
||||
db.session.add(user)
|
||||
db.session.add(self)
|
||||
db.session.commit()
|
||||
# assgine user_id to current_user after create in the DB
|
||||
self.id = user.id
|
||||
|
||||
def create_local_user(self):
|
||||
"""
|
||||
@ -266,54 +258,43 @@ class User(db.Model):
|
||||
if user:
|
||||
return 'Email already existed'
|
||||
|
||||
try:
|
||||
# first register user will be in Administrator role
|
||||
if User.query.count() == 0:
|
||||
self.role_id = Role.query.filter_by(name='Administrator').first().id
|
||||
else:
|
||||
self.role_id = Role.query.filter_by(name='User').first().id
|
||||
# first register user will be in Administrator role
|
||||
self.role_id = Role.query.filter_by(name='User').first().id
|
||||
if User.query.count() == 0:
|
||||
self.role_id = Role.query.filter_by(name='Administrator').first().id
|
||||
self.password = self.get_hashed_password(self.plain_text_password)
|
||||
|
||||
user = User(username=self.username, firstname=self.firstname, lastname=self.lastname, role_id=self.role_id, email=self.email, password=self.get_hashed_password(self.plain_text_password))
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
self.id = user.id
|
||||
return True
|
||||
except Exception, e:
|
||||
raise
|
||||
db.session.add(self)
|
||||
db.session.commit()
|
||||
return True
|
||||
|
||||
def update_profile(self, enable_otp=None):
|
||||
"""
|
||||
Update user profile
|
||||
"""
|
||||
|
||||
user = User.query.filter(User.username == self.username).first()
|
||||
if user:
|
||||
if self.firstname:
|
||||
user.firstname = self.firstname
|
||||
if self.lastname:
|
||||
user.lastname = self.lastname
|
||||
if self.email:
|
||||
user.email = self.email
|
||||
if self.plain_text_password:
|
||||
user.password = self.get_hashed_password(self.plain_text_password)
|
||||
if self.avatar:
|
||||
user.avatar = self.avatar
|
||||
if not user:
|
||||
return False
|
||||
|
||||
if enable_otp == True:
|
||||
# generate the opt secret key
|
||||
user.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8')
|
||||
elif enable_otp == False:
|
||||
# set otp_secret="" means we want disable the otp authenticaion.
|
||||
user.otp_secret = ""
|
||||
else:
|
||||
# do nothing.
|
||||
pass
|
||||
user.firstname = self.firstname if self.firstname else user.firstname
|
||||
user.lastname = self.lastname if self.lastname else user.lastname
|
||||
user.email = self.email if self.email else user.email
|
||||
user.password = self.get_hashed_password(self.plain_text_password) if self.plain_text_password else user.password
|
||||
user.avatar = self.avatar if self.avatar else user.avatar
|
||||
|
||||
try:
|
||||
db.session.commit()
|
||||
return True
|
||||
except:
|
||||
db.session.rollback()
|
||||
return False
|
||||
user.otp_secret = ""
|
||||
if enable_otp == True:
|
||||
# generate the opt secret key
|
||||
user.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8')
|
||||
|
||||
try:
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
return True
|
||||
except Exception:
|
||||
db.session.rollback()
|
||||
return False
|
||||
|
||||
def get_domain(self):
|
||||
"""
|
||||
|
Loading…
Reference in New Issue
Block a user