mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-01-06 10:25:40 +00:00
Merge pull request #742 from nfantone/feat/remote-user
Support authenticating using REMOTE_USER environment variable
This commit is contained in:
commit
cfc8567180
@ -130,7 +130,7 @@ SAML_ENABLED = False
|
||||
# SAML_CERT_FILE = '/etc/pki/powerdns-admin/cert.crt'
|
||||
# SAML_CERT_KEY = '/etc/pki/powerdns-admin/key.pem'
|
||||
|
||||
# Cofigures if SAML tokens should be encrypted.
|
||||
# Configures if SAML tokens should be encrypted.
|
||||
# SAML_SIGN_REQUEST = False
|
||||
# #Use SAML standard logout mechanism retreived from idp metadata
|
||||
# #If configured false don't care about SAML session on logout.
|
||||
@ -141,3 +141,19 @@ SAML_ENABLED = False
|
||||
# #SAML_LOGOUT_URL = 'https://google.com'
|
||||
|
||||
# #SAML_ASSERTION_ENCRYPTED = True
|
||||
|
||||
# Remote authentication settings
|
||||
|
||||
# Whether to enable remote user authentication or not
|
||||
# Defaults to False
|
||||
# REMOTE_USER_ENABLED=True
|
||||
|
||||
# If set, users will be redirected to this location on logout
|
||||
# Ignore or set to None to avoid redirecting altogether
|
||||
# Warning: if REMOTE_USER environment variable is still set after logging out and not cleared by
|
||||
# some external module, not defining a custom logout URL might trigger a loop
|
||||
# that will just log the user back in right after logging out
|
||||
# REMOTE_USER_LOGOUT_URL=https://my.sso.com/cas/logout
|
||||
|
||||
# An optional list of remote authentication tied cookies to be removed upon logout
|
||||
# REMOTE_USER_COOKIES=['MOD_AUTH_CAS', 'MOD_AUTH_CAS_S']
|
||||
|
@ -45,7 +45,9 @@ legal_envvars = (
|
||||
'SAML_LOGOUT',
|
||||
'SAML_LOGOUT_URL',
|
||||
'SAML_ASSERTION_ENCRYPTED',
|
||||
'OFFLINE_MODE'
|
||||
'OFFLINE_MODE',
|
||||
'REMOTE_USER_LOGOUT_URL',
|
||||
'REMOTE_USER_COOKIES'
|
||||
)
|
||||
|
||||
legal_envvars_int = ('PORT', 'MAIL_PORT', 'SAML_METADATA_CACHE_LIFETIME')
|
||||
@ -62,7 +64,8 @@ legal_envvars_bool = (
|
||||
'SAML_WANT_MESSAGE_SIGNED',
|
||||
'SAML_LOGOUT',
|
||||
'SAML_ASSERTION_ENCRYPTED',
|
||||
'OFFLINE_MODE'
|
||||
'OFFLINE_MODE',
|
||||
'REMOTE_USER_ENABLED'
|
||||
)
|
||||
|
||||
# import everything from environment variables
|
||||
|
@ -5,6 +5,7 @@ import requests
|
||||
import hashlib
|
||||
import ipaddress
|
||||
|
||||
from collections.abc import Iterable
|
||||
from distutils.version import StrictVersion
|
||||
from urllib.parse import urlparse
|
||||
from datetime import datetime, timedelta
|
||||
@ -212,6 +213,14 @@ def pretty_json(data):
|
||||
return json.dumps(data, sort_keys=True, indent=4)
|
||||
|
||||
|
||||
def ensure_list(l):
|
||||
if not l:
|
||||
l = []
|
||||
elif not isinstance(l, Iterable) or isinstance(l, str):
|
||||
l = [l]
|
||||
|
||||
yield from l
|
||||
|
||||
class customBoxes:
|
||||
boxes = {
|
||||
"reverse": (" ", " "),
|
||||
|
@ -1,7 +1,7 @@
|
||||
import os
|
||||
import base64
|
||||
import bcrypt
|
||||
import traceback
|
||||
import bcrypt
|
||||
import pyotp
|
||||
import ldap
|
||||
import ldap.filter
|
||||
@ -103,7 +103,7 @@ class User(db.Model):
|
||||
return bcrypt.hashpw(pw.encode('utf-8'), bcrypt.gensalt())
|
||||
|
||||
def check_password(self, hashed_password):
|
||||
# Check hased password. Using bcrypt, the salt is saved into the hash itself
|
||||
# Check hashed password. Using bcrypt, the salt is saved into the hash itself
|
||||
if (self.plain_text_password):
|
||||
return bcrypt.checkpw(self.plain_text_password.encode('utf-8'),
|
||||
hashed_password.encode('utf-8'))
|
||||
@ -191,7 +191,7 @@ class User(db.Model):
|
||||
current_app.logger.exception("Recursive AD Group search error")
|
||||
return result
|
||||
|
||||
def is_validate(self, method, src_ip=''):
|
||||
def is_validate(self, method, src_ip='', trust_user=False):
|
||||
"""
|
||||
Validate user credential
|
||||
"""
|
||||
@ -202,8 +202,8 @@ class User(db.Model):
|
||||
User.username == self.username).first()
|
||||
|
||||
if user_info:
|
||||
if user_info.password and self.check_password(
|
||||
user_info.password):
|
||||
if trust_user or (user_info.password and self.check_password(
|
||||
user_info.password)):
|
||||
current_app.logger.info(
|
||||
'User "{0}" logged in successfully. Authentication request from {1}'
|
||||
.format(self.username, src_ip))
|
||||
@ -231,7 +231,7 @@ class User(db.Model):
|
||||
LDAP_GROUP_SECURITY_ENABLED = Setting().get('ldap_sg_enabled')
|
||||
|
||||
# validate AD user password
|
||||
if Setting().get('ldap_type') == 'ad':
|
||||
if Setting().get('ldap_type') == 'ad' and not trust_user:
|
||||
ldap_username = "{0}@{1}".format(self.username,
|
||||
Setting().get('ldap_domain'))
|
||||
if not self.ldap_auth(ldap_username, self.password):
|
||||
@ -258,7 +258,7 @@ class User(db.Model):
|
||||
ldap_username = ldap.filter.escape_filter_chars(
|
||||
ldap_result[0][0][0])
|
||||
|
||||
if Setting().get('ldap_type') != 'ad':
|
||||
if Setting().get('ldap_type') != 'ad' and not trust_user:
|
||||
# validate ldap user password
|
||||
if not self.ldap_auth(ldap_username, self.password):
|
||||
current_app.logger.error(
|
||||
@ -588,4 +588,4 @@ class User(db.Model):
|
||||
db.session.commit()
|
||||
return {'status': True, 'msg': 'Set user role successfully'}
|
||||
else:
|
||||
return {'status': False, 'msg': 'Role does not exist'}
|
||||
return {'status': False, 'msg': 'Role does not exist'}
|
||||
|
@ -28,6 +28,19 @@ def handle_internal_server_error(e):
|
||||
return render_template('errors/500.html', code=500, message=e), 500
|
||||
|
||||
|
||||
def load_if_valid(user, method, src_ip, trust_user = False):
|
||||
try:
|
||||
auth = user.is_validate(method, src_ip, trust_user)
|
||||
if auth == False:
|
||||
return None
|
||||
else:
|
||||
# login_user(user, remember=False)
|
||||
return User.query.filter(User.id==user.id).first()
|
||||
except Exception as e:
|
||||
current_app.logger.error('Error: {0}'.format(e))
|
||||
return None
|
||||
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(id):
|
||||
"""
|
||||
@ -37,29 +50,42 @@ def load_user(id):
|
||||
|
||||
|
||||
@login_manager.request_loader
|
||||
def login_via_authorization_header(request):
|
||||
def login_via_authorization_header_or_remote_user(request):
|
||||
# Try to login using Basic Authentication
|
||||
auth_header = request.headers.get('Authorization')
|
||||
if auth_header:
|
||||
auth_method = request.args.get('auth_method', 'LOCAL')
|
||||
auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
|
||||
auth_header = auth_header.replace('Basic ', '', 1)
|
||||
try:
|
||||
auth_header = str(base64.b64decode(auth_header), 'utf-8')
|
||||
username, password = auth_header.split(":")
|
||||
except TypeError as e:
|
||||
return None
|
||||
|
||||
user = User(username=username,
|
||||
password=password,
|
||||
plain_text_password=password)
|
||||
try:
|
||||
auth_method = request.args.get('auth_method', 'LOCAL')
|
||||
auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
|
||||
auth = user.is_validate(method=auth_method,
|
||||
src_ip=request.remote_addr)
|
||||
if auth == False:
|
||||
return None
|
||||
else:
|
||||
# login_user(user, remember=False)
|
||||
return User.query.filter(User.id==user.id).first()
|
||||
except Exception as e:
|
||||
current_app.logger.error('Error: {0}'.format(e))
|
||||
return None
|
||||
return load_if_valid(user, method=auth_method, src_ip=request.remote_addr)
|
||||
|
||||
# Try login by checking a REMOTE_USER environment variable
|
||||
remote_user = request.remote_user
|
||||
if remote_user and current_app.config.get('REMOTE_USER_ENABLED'):
|
||||
session_remote_user = session.get('remote_user')
|
||||
|
||||
# If we already validated a remote user against an authorization method
|
||||
# a local user should have been created in the database, so we force a 'LOCAL' auth_method
|
||||
auth_method = 'LOCAL' if session_remote_user else current_app.config.get('REMOTE_AUTH_METHOD', 'LDAP')
|
||||
current_app.logger.debug(
|
||||
'REMOTE_USER environment variable found: attempting {0} authentication for username "{1}"'
|
||||
.format(auth_method, remote_user))
|
||||
user = User(username=remote_user.strip())
|
||||
valid_remote_user = load_if_valid(user, method=auth_method, src_ip=request.remote_addr, trust_user=True)
|
||||
|
||||
if valid_remote_user:
|
||||
# If we were successful in authenticating a trusted remote user, store it in session
|
||||
session['remote_user'] = valid_remote_user.username
|
||||
|
||||
return valid_remote_user
|
||||
|
||||
return None
|
||||
|
@ -377,8 +377,9 @@ def clear_session():
|
||||
session.pop('github_token', None)
|
||||
session.pop('google_token', None)
|
||||
session.pop('authentication_type', None)
|
||||
session.clear()
|
||||
session.pop('remote_user', None)
|
||||
logout_user()
|
||||
session.clear()
|
||||
|
||||
|
||||
def signin_history(username, authenticator, success):
|
||||
@ -434,7 +435,30 @@ def logout():
|
||||
"urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress",
|
||||
session_index=session['samlSessionIndex'],
|
||||
name_id=session['samlNameId']))
|
||||
|
||||
# Clean cookies and flask session
|
||||
clear_session()
|
||||
|
||||
# If remote user authentication is enabled and a logout URL is configured for it,
|
||||
# redirect users to that instead
|
||||
remote_user_logout_url = current_app.config.get('REMOTE_USER_LOGOUT_URL')
|
||||
if current_app.config.get('REMOTE_USER_ENABLED') and remote_user_logout_url:
|
||||
current_app.logger.debug(
|
||||
'Redirecting remote user "{0}" to logout URL {1}'
|
||||
.format(current_user.username, remote_user_logout_url))
|
||||
# Warning: if REMOTE_USER environment variable is still set and not cleared by
|
||||
# some external module, not defining a custom logout URL will trigger a loop
|
||||
# that will just log the user back in right after logging out
|
||||
res = make_response(redirect(remote_user_logout_url.strip()))
|
||||
|
||||
# Remove any custom cookies the remote authentication mechanism may use
|
||||
# (e.g.: MOD_AUTH_CAS and MOD_AUTH_CAS_S)
|
||||
remote_cookies = current_app.config.get('REMOTE_USER_COOKIES')
|
||||
for r_cookie_name in utils.ensure_list(remote_cookies):
|
||||
res.delete_cookie(r_cookie_name)
|
||||
|
||||
return res
|
||||
|
||||
return redirect(url_for('index.login'))
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user