Support login in through REMOTE_USER environment variable

Support redirecting remote users to logout URL and clearing remote login cookies
This commit is contained in:
Nicolás Fantone 2020-01-02 19:01:13 -03:00
parent a598c52729
commit 52298f8289
3 changed files with 73 additions and 23 deletions

View File

@ -1,7 +1,7 @@
import os import os
import base64 import base64
import bcrypt
import traceback import traceback
import bcrypt
import pyotp import pyotp
import ldap import ldap
import ldap.filter import ldap.filter
@ -103,7 +103,7 @@ class User(db.Model):
return bcrypt.hashpw(pw.encode('utf-8'), bcrypt.gensalt()) return bcrypt.hashpw(pw.encode('utf-8'), bcrypt.gensalt())
def check_password(self, hashed_password): def check_password(self, hashed_password):
# Check hased password. Using bcrypt, the salt is saved into the hash itself # Check hashed password. Using bcrypt, the salt is saved into the hash itself
if (self.plain_text_password): if (self.plain_text_password):
return bcrypt.checkpw(self.plain_text_password.encode('utf-8'), return bcrypt.checkpw(self.plain_text_password.encode('utf-8'),
hashed_password.encode('utf-8')) hashed_password.encode('utf-8'))
@ -191,7 +191,7 @@ class User(db.Model):
current_app.logger.exception("Recursive AD Group search error") current_app.logger.exception("Recursive AD Group search error")
return result return result
def is_validate(self, method, src_ip=''): def is_validate(self, method, src_ip='', trust_user=False):
""" """
Validate user credential Validate user credential
""" """
@ -202,8 +202,8 @@ class User(db.Model):
User.username == self.username).first() User.username == self.username).first()
if user_info: if user_info:
if user_info.password and self.check_password( if trust_user or (user_info.password and self.check_password(
user_info.password): user_info.password)):
current_app.logger.info( current_app.logger.info(
'User "{0}" logged in successfully. Authentication request from {1}' 'User "{0}" logged in successfully. Authentication request from {1}'
.format(self.username, src_ip)) .format(self.username, src_ip))
@ -231,7 +231,7 @@ class User(db.Model):
LDAP_GROUP_SECURITY_ENABLED = Setting().get('ldap_sg_enabled') LDAP_GROUP_SECURITY_ENABLED = Setting().get('ldap_sg_enabled')
# validate AD user password # validate AD user password
if Setting().get('ldap_type') == 'ad': if Setting().get('ldap_type') == 'ad' and not trust_user:
ldap_username = "{0}@{1}".format(self.username, ldap_username = "{0}@{1}".format(self.username,
Setting().get('ldap_domain')) Setting().get('ldap_domain'))
if not self.ldap_auth(ldap_username, self.password): if not self.ldap_auth(ldap_username, self.password):
@ -258,7 +258,7 @@ class User(db.Model):
ldap_username = ldap.filter.escape_filter_chars( ldap_username = ldap.filter.escape_filter_chars(
ldap_result[0][0][0]) ldap_result[0][0][0])
if Setting().get('ldap_type') != 'ad': if Setting().get('ldap_type') != 'ad' and not trust_user:
# validate ldap user password # validate ldap user password
if not self.ldap_auth(ldap_username, self.password): if not self.ldap_auth(ldap_username, self.password):
current_app.logger.error( current_app.logger.error(
@ -588,4 +588,4 @@ class User(db.Model):
db.session.commit() db.session.commit()
return {'status': True, 'msg': 'Set user role successfully'} return {'status': True, 'msg': 'Set user role successfully'}
else: else:
return {'status': False, 'msg': 'Role does not exist'} return {'status': False, 'msg': 'Role does not exist'}

View File

@ -28,6 +28,19 @@ def handle_internal_server_error(e):
return render_template('errors/500.html', code=500, message=e), 500 return render_template('errors/500.html', code=500, message=e), 500
def load_if_valid(user, method, src_ip, trust_user = False):
try:
auth = user.is_validate(method, src_ip, trust_user)
if auth == False:
return None
else:
# login_user(user, remember=False)
return User.query.filter(User.id==user.id).first()
except Exception as e:
current_app.logger.error('Error: {0}'.format(e))
return None
@login_manager.user_loader @login_manager.user_loader
def load_user(id): def load_user(id):
""" """
@ -37,29 +50,42 @@ def load_user(id):
@login_manager.request_loader @login_manager.request_loader
def login_via_authorization_header(request): def login_via_authorization_header_or_remote_user(request):
# Try to login using Basic Authentication
auth_header = request.headers.get('Authorization') auth_header = request.headers.get('Authorization')
if auth_header: if auth_header:
auth_method = request.args.get('auth_method', 'LOCAL')
auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
auth_header = auth_header.replace('Basic ', '', 1) auth_header = auth_header.replace('Basic ', '', 1)
try: try:
auth_header = str(base64.b64decode(auth_header), 'utf-8') auth_header = str(base64.b64decode(auth_header), 'utf-8')
username, password = auth_header.split(":") username, password = auth_header.split(":")
except TypeError as e: except TypeError as e:
return None return None
user = User(username=username, user = User(username=username,
password=password, password=password,
plain_text_password=password) plain_text_password=password)
try: return load_if_valid(user, method=auth_method, src_ip=request.remote_addr)
auth_method = request.args.get('auth_method', 'LOCAL')
auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL' # Try login by checking a REMOTE_USER environment variable
auth = user.is_validate(method=auth_method, remote_user = request.remote_user
src_ip=request.remote_addr) if remote_user and current_app.config.get('REMOTE_USER_ENABLED'):
if auth == False: session_remote_user = session.get('remote_user')
return None
else: # If we already validated a remote user against an authorization method
# login_user(user, remember=False) # a local user should have been created in the database, so we force a 'LOCAL' auth_method
return User.query.filter(User.id==user.id).first() auth_method = 'LOCAL' if session_remote_user else current_app.config.get('REMOTE_AUTH_METHOD', 'LDAP')
except Exception as e: current_app.logger.debug(
current_app.logger.error('Error: {0}'.format(e)) 'REMOTE_USER environment variable found: attempting {0} authentication for username "{1}"'
return None .format(auth_method, remote_user))
user = User(username=remote_user.strip())
valid_remote_user = load_if_valid(user, method=auth_method, src_ip=request.remote_addr, trust_user=True)
if valid_remote_user:
# If we were successful in authenticating a trusted remote user, store it in session
session['remote_user'] = valid_remote_user.username
return valid_remote_user
return None return None

View File

@ -377,8 +377,9 @@ def clear_session():
session.pop('github_token', None) session.pop('github_token', None)
session.pop('google_token', None) session.pop('google_token', None)
session.pop('authentication_type', None) session.pop('authentication_type', None)
session.clear() session.pop('remote_user', None)
logout_user() logout_user()
session.clear()
def signin_history(username, authenticator, success): def signin_history(username, authenticator, success):
@ -434,7 +435,30 @@ def logout():
"urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress", "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress",
session_index=session['samlSessionIndex'], session_index=session['samlSessionIndex'],
name_id=session['samlNameId'])) name_id=session['samlNameId']))
# Clean cookies and flask session
clear_session() clear_session()
# If remote user authentication is enabled and a logout URL is configured for it,
# redirect users to that instead
remote_user_logout_url = current_app.config.get('REMOTE_USER_LOGOUT_URL')
if current_app.config.get('REMOTE_USER_ENABLED') and remote_user_logout_url:
current_app.logger.debug(
'Redirecting remote user "{0}" to logout URL {1}'
.format(current_user.username, remote_user_logout_url))
# Warning: if REMOTE_USER environment variable is still set and not cleared by
# some external module, not defining a custom logout URL will trigger a loop
# that will just log the user back in right after logging out
res = make_response(redirect(remote_user_logout_url.strip()))
# Remove any custom cookies the remote authentication mechanism may use
# (e.g.: MOD_AUTH_CAS and MOD_AUTH_CAS_S)
remote_cookies = current_app.config.get('REMOTE_USER_COOKIES')
for r_cookie_name in utils.ensure_list(remote_cookies):
res.delete_cookie(r_cookie_name)
return res
return redirect(url_for('index.login')) return redirect(url_for('index.login'))