Merge pull request #125 from timfeirg/master

models refactoring
Khanh Ngo 2016-08-26 10:48:10 +07:00 committed by GitHub
commit 59e91ec7b9


@ -172,73 +172,68 @@ class User(db.Model):
if user_info.password and self.check_password(user_info.password): if user_info.password and self.check_password(user_info.password):
logging.info('User "%s" logged in successfully' % self.username) logging.info('User "%s" logged in successfully' % self.username)
return True return True
else: logging.error('User "%s" input a wrong password' % self.username)
logging.error('User "%s" input a wrong password' % self.username)
return False
else:
logging.warning('User "%s" does not exist' % self.username)
return False return False
elif method == 'LDAP': logging.warning('User "%s" does not exist' % self.username)
return False
if method == 'LDAP':
if not LDAP_TYPE: if not LDAP_TYPE:
logging.error('LDAP authentication is disabled') logging.error('LDAP authentication is disabled')
return False return False
searchFilter = "(&(objectcategory=person)(samaccountname=%s))" % self.username
if LDAP_TYPE == 'ldap': if LDAP_TYPE == 'ldap':
searchFilter = "(&(%s=%s)%s)" % (LDAP_USERNAMEFIELD, self.username, LDAP_FILTER) searchFilter = "(&(%s=%s)%s)" % (LDAP_USERNAMEFIELD, self.username, LDAP_FILTER)
logging.info('Ldap searchFilter "%s"' % searchFilter) logging.info('Ldap searchFilter "%s"' % searchFilter)
else:
searchFilter = "(&(objectcategory=person)(samaccountname=%s))" % self.username
try:
result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE)
except Exception, e:
raise
result = self.ldap_search(searchFilter, LDAP_SEARCH_BASE)
if not result: if not result:
logging.warning('User "%s" does not exist' % self.username) logging.warning('User "%s" does not exist' % self.username)
return False return False
else:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
l = ldap.initialize(LDAP_URI)
l.set_option(ldap.OPT_REFERRALS, 0)
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
l.set_option( ldap.OPT_X_TLS_DEMAND, True )
l.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
l.protocol_version = ldap.VERSION3
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
l = ldap.initialize(LDAP_URI)
l.set_option(ldap.OPT_REFERRALS, 0)
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
l.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND)
l.set_option( ldap.OPT_X_TLS_DEMAND, True )
l.set_option( ldap.OPT_DEBUG_LEVEL, 255 )
l.protocol_version = ldap.VERSION3
try:
ldap_username = result[0][0][0]
l.simple_bind_s(ldap_username, self.password)
logging.info('User "%s" logged in successfully' % self.username)
except Exception:
logging.error('User "%s" input a wrong password' % self.username)
return False
# create user if not exist in the db
if not User.query.filter(User.username == self.username).first():
try: try:
ldap_username = result[0][0][0] # try to get user's firstname & lastname from LDAP
l.simple_bind_s(ldap_username, self.password) # this might be changed in the future
logging.info('User "%s" logged in successfully' % self.username) self.firstname = result[0][0][1]['givenName'][0]
self.lastname = result[0][0][1]['sn'][0]
self.email = result[0][0][1]['mail'][0]
except Exception:
self.firstname = self.username
self.lastname = ''
# create user if not exist in the db # first register user will be in Administrator role
if User.query.filter(User.username == self.username).first() == None: self.role_id = Role.query.filter_by(name='User').first().id
try: if User.query.count() == 0:
# try to get user's firstname & lastname from LDAP self.role_id = Role.query.filter_by(name='Administrator').first().id
# this might be changed in the future
self.firstname = result[0][0][1]['givenName'][0]
self.lastname = result[0][0][1]['sn'][0]
self.email = result[0][0][1]['mail'][0]
except:
self.firstname = self.username
self.lastname = ''
# first register user will be in Administrator role self.create_user()
if User.query.count() == 0: logging.info('Created user "%s" in the DB' % self.username)
self.role_id = Role.query.filter_by(name='Administrator').first().id
else:
self.role_id = Role.query.filter_by(name='User').first().id
self.create_user() return True
logging.info('Created user "%s" in the DB' % self.username)
return True logging.error('Unsupported authentication method')
except: return False
logging.error('User "%s" input a wrong password' % self.username)
return False
else:
logging.error('Unsupported authentication method')
return False
def create_user(self): def create_user(self):
""" """
@ -246,11 +241,8 @@ class User(db.Model):
We will create a local user (in DB) in order to manage user We will create a local user (in DB) in order to manage user
profile such as name, roles,... profile such as name, roles,...
""" """
user = User(username=self.username, firstname=self.firstname, lastname=self.lastname, role_id=self.role_id, email=self.email) db.session.add(self)
db.session.add(user)
db.session.commit() db.session.commit()
# assgine user_id to current_user after create in the DB
self.id = user.id
def create_local_user(self): def create_local_user(self):
""" """
@ -266,54 +258,43 @@ class User(db.Model):
if user: if user:
return 'Email already existed' return 'Email already existed'
try: # first register user will be in Administrator role
# first register user will be in Administrator role self.role_id = Role.query.filter_by(name='User').first().id
if User.query.count() == 0: if User.query.count() == 0:
self.role_id = Role.query.filter_by(name='Administrator').first().id self.role_id = Role.query.filter_by(name='Administrator').first().id
else: self.password = self.get_hashed_password(self.plain_text_password)
self.role_id = Role.query.filter_by(name='User').first().id
user = User(username=self.username, firstname=self.firstname, lastname=self.lastname, role_id=self.role_id, email=self.email, password=self.get_hashed_password(self.plain_text_password)) db.session.add(self)
db.session.add(user) db.session.commit()
db.session.commit() return True
self.id = user.id
return True
except Exception, e:
raise
def update_profile(self, enable_otp=None): def update_profile(self, enable_otp=None):
""" """
Update user profile Update user profile
""" """
user = User.query.filter(User.username == self.username).first() user = User.query.filter(User.username == self.username).first()
if user: if not user:
if self.firstname: return False
user.firstname = self.firstname
if self.lastname:
user.lastname = self.lastname
if self.email:
user.email = self.email
if self.plain_text_password:
user.password = self.get_hashed_password(self.plain_text_password)
if self.avatar:
user.avatar = self.avatar
if enable_otp == True: user.firstname = self.firstname if self.firstname else user.firstname
# generate the opt secret key user.lastname = self.lastname if self.lastname else user.lastname
user.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8') user.email = self.email if self.email else user.email
elif enable_otp == False: user.password = self.get_hashed_password(self.plain_text_password) if self.plain_text_password else user.password
# set otp_secret="" means we want disable the otp authenticaion. user.avatar = self.avatar if self.avatar else user.avatar
user.otp_secret = ""
else:
# do nothing.
pass
try: user.otp_secret = ""
db.session.commit() if enable_otp == True:
return True # generate the opt secret key
except: user.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8')
db.session.rollback()
return False try:
db.session.add(user)
db.session.commit()
return True
except Exception:
db.session.rollback()
return False
def get_domain(self): def get_domain(self):
""" """
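Review bot: In `update_profile`, the new side assigns `user.otp_secret = ""` unconditionally and only afterwards tests `enable_otp == True`, so a call that leaves `enable_otp` at its default `None` now clears the stored OTP secret, whereas the removed `else: pass` branch left it untouched. If any caller saves a name, e-mail or password change without passing `enable_otp` (the callers are not visible here), that edit would silently switch off two-factor authentication for the account. Keep the reset conditional, e.g. `elif enable_otp is False:` followed by `user.otp_secret = ""`, and compare with `is True` / `is False` so that only an explicit request changes the secret. A log line whenever the secret is cleared or regenerated would also make such changes visible in an audit.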