Completed the implementation of the SERVER_EXTERNAL_SSL environment setting into the app config files.

Completed the implementation of the aforementioned environment setting into the OAuth workflows.

Documented the aforementioned setting in the Environment-variables.md wiki document.
This commit is contained in:
Matt Scott 2023-04-08 17:05:27 -04:00
parent cacfc042e2
commit ab4495dc46
No known key found for this signature in database
GPG Key ID: A9A0AFFC0E079001
8 changed files with 270 additions and 225 deletions

View File

@ -1,3 +1,8 @@
# import everything from environment variables
import os
import sys
import json
# Defaults for Docker image # Defaults for Docker image
BIND_ADDRESS = '0.0.0.0' BIND_ADDRESS = '0.0.0.0'
PORT = 80 PORT = 80
@ -23,6 +28,7 @@ legal_envvars = (
'OIDC_OAUTH_EMAIL', 'OIDC_OAUTH_EMAIL',
'BIND_ADDRESS', 'BIND_ADDRESS',
'PORT', 'PORT',
'SERVER_EXTERNAL_SSL',
'LOG_LEVEL', 'LOG_LEVEL',
'SALT', 'SALT',
'SQLALCHEMY_TRACK_MODIFICATIONS', 'SQLALCHEMY_TRACK_MODIFICATIONS',
@ -97,21 +103,18 @@ legal_envvars_bool = (
'SESSION_COOKIE_SECURE', 'SESSION_COOKIE_SECURE',
'CSRF_COOKIE_SECURE', 'CSRF_COOKIE_SECURE',
'CAPTCHA_ENABLE', 'CAPTCHA_ENABLE',
'SERVER_EXTERNAL_SSL',
) )
legal_envvars_dict = ( legal_envvars_dict = (
'SQLALCHEMY_ENGINE_OPTIONS', 'SQLALCHEMY_ENGINE_OPTIONS',
) )
# import everything from environment variables
import os
import sys
import json
def str2bool(v): def str2bool(v):
return v.lower() in ("true", "yes", "1") return v.lower() in ("true", "yes", "1")
def dictfromstr(v,ret):
def dictfromstr(v, ret):
try: try:
return json.loads(ret) return json.loads(ret)
except Exception as e: except Exception as e:
@ -119,10 +122,11 @@ def dictfromstr(v,ret):
print(e) print(e)
raise ValueError raise ValueError
for v in legal_envvars: for v in legal_envvars:
ret = None ret = None
# _FILE suffix will allow to read value from file, usefull for Docker's # _FILE suffix will allow to read value from file, useful for Docker containers.
# secrets feature # secrets feature
if v + '_FILE' in os.environ: if v + '_FILE' in os.environ:
if v in os.environ: if v in os.environ:

View File

@ -1,64 +1,65 @@
# Supported environment variables # Supported environment variables
| Variable | Description | Required | Default value | | Variable | Description | Required | Default value |
| ---------| ----------- | -------- | ------------- | |--------------------------------|--------------------------------------------------------------------------|------------|---------------|
| BIND_ADDRESS | | BIND_ADDRESS |
| CSRF_COOKIE_SECURE | | CSRF_COOKIE_SECURE |
| SESSION_TYPE | null|filesystem|sqlalchemy | | filesystem | | SESSION_TYPE | null | filesystem | sqlalchemy | | filesystem |
| LDAP_ENABLED | | LDAP_ENABLED |
| LOCAL_DB_ENABLED | | LOCAL_DB_ENABLED |
| LOG_LEVEL | | LOG_LEVEL |
| MAIL_DEBUG | | MAIL_DEBUG |
| MAIL_DEFAULT_SENDER | | MAIL_DEFAULT_SENDER |
| MAIL_PASSWORD | | MAIL_PASSWORD |
| MAIL_PORT | | MAIL_PORT |
| MAIL_SERVER | | MAIL_SERVER |
| MAIL_USERNAME | | MAIL_USERNAME |
| MAIL_USE_SSL | | MAIL_USE_SSL |
| MAIL_USE_TLS | | MAIL_USE_TLS |
| OFFLINE_MODE | | OFFLINE_MODE |
| OIDC_OAUTH_API_URL | | | | | OIDC_OAUTH_API_URL | | | |
| OIDC_OAUTH_AUTHORIZE_URL | | OIDC_OAUTH_AUTHORIZE_URL |
| OIDC_OAUTH_TOKEN_URL | | | | | OIDC_OAUTH_TOKEN_URL | | | |
| OIDC_OAUTH_METADATA_URL | | | | | OIDC_OAUTH_METADATA_URL | | | |
| PORT | | PORT |
| REMOTE_USER_COOKIES | | SERVER_EXTERNAL_SSL | Forceful override of URL schema detection when using the url_for method. | False | None |
| REMOTE_USER_LOGOUT_URL | | REMOTE_USER_COOKIES |
| SALT | | REMOTE_USER_LOGOUT_URL |
| SAML_ASSERTION_ENCRYPTED | | SALT |
| SAML_ATTRIBUTE_ACCOUNT | | SAML_ASSERTION_ENCRYPTED |
| SAML_ATTRIBUTE_ADMIN | | SAML_ATTRIBUTE_ACCOUNT |
| SAML_ATTRIBUTE_EMAIL | | SAML_ATTRIBUTE_ADMIN |
| SAML_ATTRIBUTE_GIVENNAME | | SAML_ATTRIBUTE_EMAIL |
| SAML_ATTRIBUTE_GROUP | | SAML_ATTRIBUTE_GIVENNAME |
| SAML_ATTRIBUTE_NAME | | SAML_ATTRIBUTE_GROUP |
| SAML_ATTRIBUTE_SURNAME | | SAML_ATTRIBUTE_NAME |
| SAML_ATTRIBUTE_USERNAME | | SAML_ATTRIBUTE_SURNAME |
| SAML_CERT | | SAML_ATTRIBUTE_USERNAME |
| SAML_DEBUG | | SAML_CERT |
| SAML_ENABLED | | SAML_DEBUG |
| SAML_GROUP_ADMIN_NAME | | SAML_ENABLED |
| SAML_GROUP_TO_ACCOUNT_MAPPING | | SAML_GROUP_ADMIN_NAME |
| SAML_IDP_SSO_BINDING | | SAML_GROUP_TO_ACCOUNT_MAPPING |
| SAML_IDP_ENTITY_ID | | SAML_IDP_SSO_BINDING |
| SAML_KEY | | SAML_IDP_ENTITY_ID |
| SAML_LOGOUT | | SAML_KEY |
| SAML_LOGOUT_URL | | SAML_LOGOUT |
| SAML_METADATA_CACHE_LIFETIME | | SAML_LOGOUT_URL |
| SAML_METADATA_URL | | SAML_METADATA_CACHE_LIFETIME |
| SAML_NAMEID_FORMAT | | SAML_METADATA_URL |
| SAML_PATH | | SAML_NAMEID_FORMAT |
| SAML_SIGN_REQUEST | | SAML_PATH |
| SAML_SP_CONTACT_MAIL | | SAML_SIGN_REQUEST |
| SAML_SP_CONTACT_NAME | | SAML_SP_CONTACT_MAIL |
| SAML_SP_ENTITY_ID | | SAML_SP_CONTACT_NAME |
| SAML_WANT_MESSAGE_SIGNED | | SAML_SP_ENTITY_ID |
| SECRET_KEY | Flask secret key [^1] | Y | no default | | SAML_WANT_MESSAGE_SIGNED |
| SESSION_COOKIE_SECURE | | SECRET_KEY | Flask secret key [^1] | Y | no default |
| SIGNUP_ENABLED | | SESSION_COOKIE_SECURE |
| SQLALCHEMY_DATABASE_URI | SQL Alchemy URI to connect to database | N | no default | | SIGNUP_ENABLED |
| SQLALCHEMY_DATABASE_URI | SQL Alchemy URI to connect to database | N | no default |
| SQLALCHEMY_TRACK_MODIFICATIONS | | SQLALCHEMY_TRACK_MODIFICATIONS |
| SQLALCHEMY_ENGINE_OPTIONS | json string. e.g. '{"pool_recycle":600,"echo":1}' [^2] | | SQLALCHEMY_ENGINE_OPTIONS | json string. e.g. '{"pool_recycle":600,"echo":1}' [^2] |
[^1]: Flask secret key (see https://flask.palletsprojects.com/en/1.1.x/config/#SECRET_KEY for how to generate) [^1]: Flask secret key (see https://flask.palletsprojects.com/en/1.1.x/config/#SECRET_KEY for how to generate)
[^2]: See Flask-SQLAlchemy Documentation for all engine options. [^2]: See Flask-SQLAlchemy Documentation for all engine options.

View File

@ -8,6 +8,7 @@ SECRET_KEY = 'e951e5a1f4b94151b360f47edf596dd2'
BIND_ADDRESS = '0.0.0.0' BIND_ADDRESS = '0.0.0.0'
PORT = 9191 PORT = 9191
HSTS_ENABLED = False HSTS_ENABLED = False
SERVER_EXTERNAL_SSL = None
SESSION_TYPE = 'sqlalchemy' SESSION_TYPE = 'sqlalchemy'
SESSION_COOKIE_SAMESITE = 'Lax' SESSION_COOKIE_SAMESITE = 'Lax'

View File

@ -45,6 +45,7 @@ index_bp = Blueprint('index',
template_folder='templates', template_folder='templates',
url_prefix='/') url_prefix='/')
@index_bp.before_app_first_request @index_bp.before_app_first_request
def register_modules(): def register_modules():
global google global google
@ -68,7 +69,7 @@ def before_request():
# Check site is in maintenance mode # Check site is in maintenance mode
maintenance = Setting().get('maintenance') maintenance = Setting().get('maintenance')
if maintenance and current_user.is_authenticated and current_user.role.name not in [ if maintenance and current_user.is_authenticated and current_user.role.name not in [
'Administrator', 'Operator' 'Administrator', 'Operator'
]: ]:
return render_template('maintenance.html') return render_template('maintenance.html')
@ -98,7 +99,11 @@ def google_login():
) )
abort(400) abort(400)
else: else:
redirect_uri = url_for('google_authorized', _external=True) use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
params = {'_external': True}
if isinstance(use_ssl, bool):
params['_scheme'] = 'https' if use_ssl else 'http'
redirect_uri = url_for('google_authorized', **params)
return google.authorize_redirect(redirect_uri) return google.authorize_redirect(redirect_uri)
@ -110,7 +115,11 @@ def github_login():
) )
abort(400) abort(400)
else: else:
redirect_uri = url_for('github_authorized', _external=True) use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
params = {'_external': True}
if isinstance(use_ssl, bool):
params['_scheme'] = 'https' if use_ssl else 'http'
redirect_uri = url_for('github_authorized', **params)
return github.authorize_redirect(redirect_uri) return github.authorize_redirect(redirect_uri)
@ -122,9 +131,11 @@ def azure_login():
) )
abort(400) abort(400)
else: else:
redirect_uri = url_for('azure_authorized', use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
_external=True, params = {'_external': True}
_scheme='https') if isinstance(use_ssl, bool):
params['_scheme'] = 'https' if use_ssl else 'http'
redirect_uri = url_for('azure_authorized', **params)
return azure.authorize_redirect(redirect_uri) return azure.authorize_redirect(redirect_uri)
@ -136,7 +147,11 @@ def oidc_login():
) )
abort(400) abort(400)
else: else:
redirect_uri = url_for('oidc_authorized', _external=True) use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
params = {'_external': True}
if isinstance(use_ssl, bool):
params['_scheme'] = 'https' if use_ssl else 'http'
redirect_uri = url_for('oidc_authorized', **params)
return oidc.authorize_redirect(redirect_uri) return oidc.authorize_redirect(redirect_uri)
@ -198,7 +213,7 @@ def login():
if 'azure_token' in session: if 'azure_token' in session:
azure_info = azure.get('me?$select=displayName,givenName,id,mail,surname,userPrincipalName').text azure_info = azure.get('me?$select=displayName,givenName,id,mail,surname,userPrincipalName').text
current_app.logger.info('Azure login returned: '+azure_info) current_app.logger.info('Azure login returned: ' + azure_info)
me = json.loads(azure_info) me = json.loads(azure_info)
azure_info = azure.post('me/getMemberGroups', azure_info = azure.post('me/getMemberGroups',
@ -250,30 +265,30 @@ def login():
if Setting().get('azure_sg_enabled'): if Setting().get('azure_sg_enabled'):
if Setting().get('azure_admin_group') in mygroups: if Setting().get('azure_admin_group') in mygroups:
current_app.logger.info('Setting role for user ' + current_app.logger.info('Setting role for user ' +
azure_username + azure_username +
' to Administrator due to group membership') ' to Administrator due to group membership')
user.set_role("Administrator") user.set_role("Administrator")
else: else:
if Setting().get('azure_operator_group') in mygroups: if Setting().get('azure_operator_group') in mygroups:
current_app.logger.info('Setting role for user ' + current_app.logger.info('Setting role for user ' +
azure_username + azure_username +
' to Operator due to group membership') ' to Operator due to group membership')
user.set_role("Operator") user.set_role("Operator")
else: else:
if Setting().get('azure_user_group') in mygroups: if Setting().get('azure_user_group') in mygroups:
current_app.logger.info('Setting role for user ' + current_app.logger.info('Setting role for user ' +
azure_username + azure_username +
' to User due to group membership') ' to User due to group membership')
user.set_role("User") user.set_role("User")
else: else:
current_app.logger.warning('User ' + current_app.logger.warning('User ' +
azure_username + azure_username +
' has no relevant group memberships') ' has no relevant group memberships')
session.pop('azure_token', None) session.pop('azure_token', None)
return render_template('login.html', return render_template('login.html',
saml_enabled=SAML_ENABLED, saml_enabled=SAML_ENABLED,
error=('User ' + azure_username + error=('User ' + azure_username +
' is not in any authorised groups.')) ' is not in any authorised groups.'))
# Handle account/group creation, if enabled # Handle account/group creation, if enabled
if Setting().get('azure_group_accounts_enabled') and mygroups: if Setting().get('azure_group_accounts_enabled') and mygroups:
@ -394,20 +409,21 @@ def login():
session.pop('oidc_token', None) session.pop('oidc_token', None)
return redirect(url_for('index.login')) return redirect(url_for('index.login'))
#This checks if the account_name_property and account_description property were included in settings. # This checks if the account_name_property and account_description property were included in settings.
if Setting().get('oidc_oauth_account_name_property') and Setting().get('oidc_oauth_account_description_property'): if Setting().get('oidc_oauth_account_name_property') and Setting().get(
'oidc_oauth_account_description_property'):
#Gets the name_property and description_property. # Gets the name_property and description_property.
name_prop = Setting().get('oidc_oauth_account_name_property') name_prop = Setting().get('oidc_oauth_account_name_property')
desc_prop = Setting().get('oidc_oauth_account_description_property') desc_prop = Setting().get('oidc_oauth_account_description_property')
account_to_add = [] account_to_add = []
#If the name_property and desc_property exist in me (A variable that contains all the userinfo from the IdP). # If the name_property and desc_property exist in me (A variable that contains all the userinfo from the IdP).
if name_prop in me and desc_prop in me: if name_prop in me and desc_prop in me:
accounts_name_prop = [me[name_prop]] if type(me[name_prop]) is not list else me[name_prop] accounts_name_prop = [me[name_prop]] if type(me[name_prop]) is not list else me[name_prop]
accounts_desc_prop = [me[desc_prop]] if type(me[desc_prop]) is not list else me[desc_prop] accounts_desc_prop = [me[desc_prop]] if type(me[desc_prop]) is not list else me[desc_prop]
#Run on all groups the user is in by the index num. # Run on all groups the user is in by the index num.
for i in range(len(accounts_name_prop)): for i in range(len(accounts_name_prop)):
description = '' description = ''
if i < len(accounts_desc_prop): if i < len(accounts_desc_prop):
@ -417,7 +433,7 @@ def login():
account_to_add.append(account) account_to_add.append(account)
user_accounts = user.get_accounts() user_accounts = user.get_accounts()
# Add accounts # Add accounts
for account in account_to_add: for account in account_to_add:
if account not in user_accounts: if account not in user_accounts:
account.add_user(user) account.add_user(user)
@ -426,7 +442,7 @@ def login():
if Setting().get('delete_sso_accounts'): if Setting().get('delete_sso_accounts'):
for account in user_accounts: for account in user_accounts:
if account not in account_to_add: if account not in account_to_add:
account.remove_user(user) account.remove_user(user)
session['user_id'] = user.id session['user_id'] = user.id
session['authentication_type'] = 'OAuth' session['authentication_type'] = 'OAuth'
@ -490,34 +506,36 @@ def login():
saml_enabled=SAML_ENABLED, saml_enabled=SAML_ENABLED,
error='Token required') error='Token required')
if Setting().get('autoprovisioning') and auth_method!='LOCAL': if Setting().get('autoprovisioning') and auth_method != 'LOCAL':
urn_value=Setting().get('urn_value') urn_value = Setting().get('urn_value')
Entitlements=user.read_entitlements(Setting().get('autoprovisioning_attribute')) Entitlements = user.read_entitlements(Setting().get('autoprovisioning_attribute'))
if len(Entitlements)==0 and Setting().get('purge'): if len(Entitlements) == 0 and Setting().get('purge'):
user.set_role("User") user.set_role("User")
user.revoke_privilege(True) user.revoke_privilege(True)
elif len(Entitlements)!=0: elif len(Entitlements) != 0:
if checkForPDAEntries(Entitlements, urn_value): if checkForPDAEntries(Entitlements, urn_value):
user.updateUser(Entitlements) user.updateUser(Entitlements)
else: else:
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix') current_app.logger.warning(
'Not a single powerdns-admin record was found, possibly a typo in the prefix')
if Setting().get('purge'): if Setting().get('purge'):
user.set_role("User") user.set_role("User")
user.revoke_privilege(True) user.revoke_privilege(True)
current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' ) current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.')
return authenticate_user(user, auth_method, remember_me) return authenticate_user(user, auth_method, remember_me)
def checkForPDAEntries(Entitlements, urn_value): def checkForPDAEntries(Entitlements, urn_value):
""" """
Run through every record located in the ldap attribute given and determine if there are any valid powerdns-admin records Run through every record located in the ldap attribute given and determine if there are any valid powerdns-admin records
""" """
urnArguments=[x.lower() for x in urn_value.split(':')] urnArguments = [x.lower() for x in urn_value.split(':')]
for Entitlement in Entitlements: for Entitlement in Entitlements:
entArguments=Entitlement.split(':powerdns-admin') entArguments = Entitlement.split(':powerdns-admin')
entArguments=[x.lower() for x in entArguments[0].split(':')] entArguments = [x.lower() for x in entArguments[0].split(':')]
if (entArguments==urnArguments): if (entArguments == urnArguments):
return True return True
return False return False
@ -553,14 +571,15 @@ def signin_history(username, authenticator, success):
# Write history # Write history
History(msg='User {} authentication {}'.format(username, str_success), History(msg='User {} authentication {}'.format(username, str_success),
detail = json.dumps({ detail=json.dumps({
'username': username, 'username': username,
'authenticator': authenticator, 'authenticator': authenticator,
'ip_address': request_ip, 'ip_address': request_ip,
'success': 1 if success else 0 'success': 1 if success else 0
}), }),
created_by='System').add() created_by='System').add()
# Get a list of Azure security groups the user is a member of # Get a list of Azure security groups the user is a member of
def get_azure_groups(uri): def get_azure_groups(uri):
azure_info = azure.get(uri).text azure_info = azure.get(uri).text
@ -576,30 +595,33 @@ def get_azure_groups(uri):
mygroups = [] mygroups = []
return mygroups return mygroups
# Handle user login, write history and, if set, handle showing the register_otp QR code. # Handle user login, write history and, if set, handle showing the register_otp QR code.
# if Setting for OTP on first login is enabled, and OTP field is also enabled, # if Setting for OTP on first login is enabled, and OTP field is also enabled,
# but user isn't using it yet, enable OTP, get QR code and display it, logging the user out. # but user isn't using it yet, enable OTP, get QR code and display it, logging the user out.
def authenticate_user(user, authenticator, remember=False): def authenticate_user(user, authenticator, remember=False):
login_user(user, remember=remember) login_user(user, remember=remember)
signin_history(user.username, authenticator, True) signin_history(user.username, authenticator, True)
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret and session['authentication_type'] not in ['OAuth']: if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret \
and session['authentication_type'] not in ['OAuth']:
user.update_profile(enable_otp=True) user.update_profile(enable_otp=True)
user_id = current_user.id user_id = current_user.id
prepare_welcome_user(user_id) prepare_welcome_user(user_id)
return redirect(url_for('index.welcome')) return redirect(url_for('index.welcome'))
return redirect(url_for('index.login')) return redirect(url_for('index.login'))
# Prepare user to enter /welcome screen, otherwise they won't have permission to do so # Prepare user to enter /welcome screen, otherwise they won't have permission to do so
def prepare_welcome_user(user_id): def prepare_welcome_user(user_id):
logout_user() logout_user()
session['welcome_user_id'] = user_id session['welcome_user_id'] = user_id
@index_bp.route('/logout') @index_bp.route('/logout')
def logout(): def logout():
if current_app.config.get( if current_app.config.get(
'SAML_ENABLED' 'SAML_ENABLED'
) and 'samlSessionIndex' in session and current_app.config.get( ) and 'samlSessionIndex' in session and current_app.config.get('SAML_LOGOUT'):
'SAML_LOGOUT'):
req = saml.prepare_flask_request(request) req = saml.prepare_flask_request(request)
auth = saml.init_saml_auth(req) auth = saml.init_saml_auth(req)
if current_app.config.get('SAML_LOGOUT_URL'): if current_app.config.get('SAML_LOGOUT_URL'):
@ -651,13 +673,12 @@ def logout():
def password_policy_check(user, password): def password_policy_check(user, password):
def check_policy(chars, user_password, setting): def check_policy(chars, user_password, setting):
lenreq = int(Setting().get(setting)) setting_as_int = int(Setting().get(setting))
test_string = user_password test_string = user_password
for c in chars: for c in chars:
test_string = test_string.replace(c, '') test_string = test_string.replace(c, '')
return (lenreq, len(user_password) - len(test_string)) return (setting_as_int, len(user_password) - len(test_string))
def matches_policy(item, policy_fails): def matches_policy(item, policy_fails):
return "*" if item in policy_fails else "" return "*" if item in policy_fails else ""
@ -704,12 +725,14 @@ def password_policy_check(user, password):
(pwd_min_lowercase_setting, pwd_lowercase) = check_policy(string.digits, password, 'pwd_min_lowercase') (pwd_min_lowercase_setting, pwd_lowercase) = check_policy(string.digits, password, 'pwd_min_lowercase')
if pwd_lowercase < pwd_min_lowercase_setting: if pwd_lowercase < pwd_min_lowercase_setting:
policy_fails["lowercase"] = True policy_fails["lowercase"] = True
policy.append(f"{matches_policy('lowercase', policy_fails)}lowercase={pwd_lowercase}/{pwd_min_lowercase_setting}") policy.append(
f"{matches_policy('lowercase', policy_fails)}lowercase={pwd_lowercase}/{pwd_min_lowercase_setting}")
# Uppercase # Uppercase
(pwd_min_uppercase_setting, pwd_uppercase) = check_policy(string.digits, password, 'pwd_min_uppercase') (pwd_min_uppercase_setting, pwd_uppercase) = check_policy(string.digits, password, 'pwd_min_uppercase')
if pwd_uppercase < pwd_min_uppercase_setting: if pwd_uppercase < pwd_min_uppercase_setting:
policy_fails["uppercase"] = True policy_fails["uppercase"] = True
policy.append(f"{matches_policy('uppercase', policy_fails)}uppercase={pwd_uppercase}/{pwd_min_uppercase_setting}") policy.append(
f"{matches_policy('uppercase', policy_fails)}uppercase={pwd_uppercase}/{pwd_min_uppercase_setting}")
# Special # Special
(pwd_min_special_setting, pwd_special) = check_policy(string.digits, password, 'pwd_min_special') (pwd_min_special_setting, pwd_special) = check_policy(string.digits, password, 'pwd_min_special')
if pwd_special < pwd_min_special_setting: if pwd_special < pwd_min_special_setting:
@ -728,7 +751,8 @@ def password_policy_check(user, password):
pwd_complexity = result['guesses_log10'] pwd_complexity = result['guesses_log10']
if pwd_complexity < pwd_min_complexity_setting: if pwd_complexity < pwd_min_complexity_setting:
policy_fails["complexity"] = True policy_fails["complexity"] = True
policy.append(f"{matches_policy('complexity', policy_fails)}complexity={pwd_complexity:.0f}/{pwd_min_complexity_setting}") policy.append(
f"{matches_policy('complexity', policy_fails)}complexity={pwd_complexity:.0f}/{pwd_min_complexity_setting}")
policy_str = {"password": f"Fails policy: {', '.join(policy)}. Items prefixed with '*' failed."} policy_str = {"password": f"Fails policy: {', '.join(policy)}. Items prefixed with '*' failed."}
@ -738,77 +762,78 @@ def password_policy_check(user, password):
@index_bp.route('/register', methods=['GET', 'POST']) @index_bp.route('/register', methods=['GET', 'POST'])
def register(): def register():
CAPTCHA_ENABLE = current_app.config.get('CAPTCHA_ENABLE') CAPTCHA_ENABLE = current_app.config.get('CAPTCHA_ENABLE')
if Setting().get('signup_enabled'): if Setting().get('signup_enabled'):
if current_user.is_authenticated: if current_user.is_authenticated:
return redirect(url_for('index.index')) return redirect(url_for('index.index'))
if request.method == 'GET': if request.method == 'GET':
return render_template('register.html', captcha_enable=CAPTCHA_ENABLE) return render_template('register.html', captcha_enable=CAPTCHA_ENABLE)
elif request.method == 'POST': elif request.method == 'POST':
username = request.form.get('username', '').strip() username = request.form.get('username', '').strip()
password = request.form.get('password', '') password = request.form.get('password', '')
firstname = request.form.get('firstname', '').strip() firstname = request.form.get('firstname', '').strip()
lastname = request.form.get('lastname', '').strip() lastname = request.form.get('lastname', '').strip()
email = request.form.get('email', '').strip() email = request.form.get('email', '').strip()
rpassword = request.form.get('rpassword', '') rpassword = request.form.get('rpassword', '')
is_valid_email = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$') is_valid_email = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')
error_messages = {} error_messages = {}
if not firstname: if not firstname:
error_messages['firstname'] = 'First Name is required' error_messages['firstname'] = 'First Name is required'
if not lastname: if not lastname:
error_messages['lastname'] = 'Last Name is required' error_messages['lastname'] = 'Last Name is required'
if not username: if not username:
error_messages['username'] = 'Username is required' error_messages['username'] = 'Username is required'
if not password: if not password:
error_messages['password'] = 'Password is required' error_messages['password'] = 'Password is required'
if not rpassword: if not rpassword:
error_messages['rpassword'] = 'Password confirmation is required' error_messages['rpassword'] = 'Password confirmation is required'
if not email: if not email:
error_messages['email'] = 'Email is required' error_messages['email'] = 'Email is required'
if not is_valid_email.match(email): if not is_valid_email.match(email):
error_messages['email'] = 'Invalid email address' error_messages['email'] = 'Invalid email address'
if password != rpassword: if password != rpassword:
error_messages['password'] = 'Password confirmation does not match' error_messages['password'] = 'Password confirmation does not match'
error_messages['rpassword'] = 'Password confirmation does not match' error_messages['rpassword'] = 'Password confirmation does not match'
if not captcha.validate(): if not captcha.validate():
return render_template( return render_template(
'register.html', error='Invalid CAPTCHA answer', error_messages=error_messages, captcha_enable=CAPTCHA_ENABLE) 'register.html', error='Invalid CAPTCHA answer', error_messages=error_messages,
captcha_enable=CAPTCHA_ENABLE)
if error_messages: if error_messages:
return render_template('register.html', error_messages=error_messages, captcha_enable=CAPTCHA_ENABLE) return render_template('register.html', error_messages=error_messages, captcha_enable=CAPTCHA_ENABLE)
user = User(username=username, user = User(username=username,
plain_text_password=password, plain_text_password=password,
firstname=firstname, firstname=firstname,
lastname=lastname, lastname=lastname,
email=email email=email
) )
(password_policy_pass, password_policy) = password_policy_check(user, password) (password_policy_pass, password_policy) = password_policy_check(user, password)
if not password_policy_pass: if not password_policy_pass:
return render_template('register.html', error_messages=password_policy, captcha_enable=CAPTCHA_ENABLE) return render_template('register.html', error_messages=password_policy, captcha_enable=CAPTCHA_ENABLE)
try: try:
result = user.create_local_user() result = user.create_local_user()
if result and result['status']: if result and result['status']:
if Setting().get('verify_user_email'): if Setting().get('verify_user_email'):
send_account_verification(email) send_account_verification(email)
if Setting().get('otp_force') and Setting().get('otp_field_enabled'): if Setting().get('otp_force') and Setting().get('otp_field_enabled'):
user.update_profile(enable_otp=True) user.update_profile(enable_otp=True)
prepare_welcome_user(user.id) prepare_welcome_user(user.id)
return redirect(url_for('index.welcome')) return redirect(url_for('index.welcome'))
else: else:
return redirect(url_for('index.login')) return redirect(url_for('index.login'))
else:
return render_template('register.html',
error=result['msg'], captcha_enable=CAPTCHA_ENABLE)
except Exception as e:
return render_template('register.html', error=e, captcha_enable=CAPTCHA_ENABLE)
else: else:
return render_template('register.html', return render_template('errors/404.html'), 404
error=result['msg'], captcha_enable=CAPTCHA_ENABLE)
except Exception as e:
return render_template('register.html', error=e, captcha_enable=CAPTCHA_ENABLE)
else:
return render_template('errors/404.html'), 404
# Show welcome page on first login if otp_force is enabled # Show welcome page on first login if otp_force is enabled
@ -827,12 +852,15 @@ def welcome():
if otp_token and otp_token.isdigit(): if otp_token and otp_token.isdigit():
good_token = user.verify_totp(otp_token) good_token = user.verify_totp(otp_token)
if not good_token: if not good_token:
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Invalid token") return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user,
error="Invalid token")
else: else:
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Token required") return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user,
error="Token required")
session.pop('welcome_user_id') session.pop('welcome_user_id')
return redirect(url_for('index.index')) return redirect(url_for('index.index'))
@index_bp.route('/confirm/<token>', methods=['GET']) @index_bp.route('/confirm/<token>', methods=['GET'])
def confirm_email(token): def confirm_email(token):
email = confirm_token(token) email = confirm_token(token)
@ -919,10 +947,10 @@ def dyndns_update():
.outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter( .filter(
db.or_( db.or_(
DomainUser.user_id == current_user.id, DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id AccountUser.user_id == current_user.id
)).all() )).all()
except Exception as e: except Exception as e:
current_app.logger.error('DynDNS Error: {0}'.format(e)) current_app.logger.error('DynDNS Error: {0}'.format(e))
current_app.logger.debug(traceback.format_exc()) current_app.logger.debug(traceback.format_exc())
@ -982,13 +1010,13 @@ def dyndns_update():
if result['status'] == 'ok': if result['status'] == 'ok':
history = History( history = History(
msg='DynDNS update: updated {} successfully'.format(hostname), msg='DynDNS update: updated {} successfully'.format(hostname),
detail = json.dumps({ detail=json.dumps({
'domain': domain.name, 'domain': domain.name,
'record': hostname, 'record': hostname,
'type': rtype, 'type': rtype,
'old_value': oldip, 'old_value': oldip,
'new_value': str(ip) 'new_value': str(ip)
}), }),
created_by=current_user.username, created_by=current_user.username,
domain_id=domain.id) domain_id=domain.id)
history.add() history.add()
@ -999,7 +1027,7 @@ def dyndns_update():
elif r.is_allowed_edit(): elif r.is_allowed_edit():
ondemand_creation = DomainSetting.query.filter( ondemand_creation = DomainSetting.query.filter(
DomainSetting.domain == domain).filter( DomainSetting.domain == domain).filter(
DomainSetting.setting == 'create_via_dyndns').first() DomainSetting.setting == 'create_via_dyndns').first()
if (ondemand_creation is not None) and (strtobool( if (ondemand_creation is not None) and (strtobool(
ondemand_creation.value) == True): ondemand_creation.value) == True):
@ -1024,11 +1052,11 @@ def dyndns_update():
msg= msg=
'DynDNS update: created record {0} in zone {1} successfully' 'DynDNS update: created record {0} in zone {1} successfully'
.format(hostname, domain.name, str(ip)), .format(hostname, domain.name, str(ip)),
detail = json.dumps({ detail=json.dumps({
'domain': domain.name, 'domain': domain.name,
'record': hostname, 'record': hostname,
'value': str(ip) 'value': str(ip)
}), }),
created_by=current_user.username, created_by=current_user.username,
domain_id=domain.id) domain_id=domain.id)
history.add() history.add()
@ -1088,7 +1116,7 @@ def saml_authorized():
req = saml.prepare_flask_request(request) req = saml.prepare_flask_request(request)
auth = saml.init_saml_auth(req) auth = saml.init_saml_auth(req)
auth.process_response() auth.process_response()
current_app.logger.debug( auth.get_attributes() ) current_app.logger.debug(auth.get_attributes())
errors = auth.get_errors() errors = auth.get_errors()
if len(errors) == 0: if len(errors) == 0:
session['samlUserdata'] = auth.get_attributes() session['samlUserdata'] = auth.get_attributes()
@ -1097,7 +1125,7 @@ def saml_authorized():
self_url = OneLogin_Saml2_Utils.get_self_url(req) self_url = OneLogin_Saml2_Utils.get_self_url(req)
self_url = self_url + req['script_name'] self_url = self_url + req['script_name']
if 'RelayState' in request.form and self_url != request.form[ if 'RelayState' in request.form and self_url != request.form[
'RelayState']: 'RelayState']:
return redirect(auth.redirect_to(request.form['RelayState'])) return redirect(auth.redirect_to(request.form['RelayState']))
if current_app.config.get('SAML_ATTRIBUTE_USERNAME', False): if current_app.config.get('SAML_ATTRIBUTE_USERNAME', False):
username = session['samlUserdata'][ username = session['samlUserdata'][
@ -1129,7 +1157,7 @@ def saml_authorized():
admin_group_name = current_app.config.get('SAML_GROUP_ADMIN_NAME', admin_group_name = current_app.config.get('SAML_GROUP_ADMIN_NAME',
None) None)
operator_group_name = current_app.config.get('SAML_GROUP_OPERATOR_NAME', operator_group_name = current_app.config.get('SAML_GROUP_OPERATOR_NAME',
None) None)
group_to_account_mapping = create_group_to_account_mapping() group_to_account_mapping = create_group_to_account_mapping()
if email_attribute_name in session['samlUserdata']: if email_attribute_name in session['samlUserdata']:
@ -1170,13 +1198,13 @@ def saml_authorized():
account.add_user(user) account.add_user(user)
history = History(msg='Adding {0} to account {1}'.format( history = History(msg='Adding {0} to account {1}'.format(
user.username, account.name), user.username, account.name),
created_by='SAML Assertion') created_by='SAML Assertion')
history.add() history.add()
for account in user_accounts - saml_accounts: for account in user_accounts - saml_accounts:
account.remove_user(user) account.remove_user(user)
history = History(msg='Removing {0} from account {1}'.format( history = History(msg='Removing {0} from account {1}'.format(
user.username, account.name), user.username, account.name),
created_by='SAML Assertion') created_by='SAML Assertion')
history.add() history.add()
if admin_attribute_name and 'true' in session['samlUserdata'].get( if admin_attribute_name and 'true' in session['samlUserdata'].get(
admin_attribute_name, []): admin_attribute_name, []):
@ -1190,7 +1218,7 @@ def saml_authorized():
user.role_id = Role.query.filter_by(name='User').first().id user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format( history = History(msg='Demoting {0} to user'.format(
user.username), user.username),
created_by='SAML Assertion') created_by='SAML Assertion')
history.add() history.add()
user.plain_text_password = None user.plain_text_password = None
user.update_profile() user.update_profile()
@ -1234,15 +1262,16 @@ def uplift_to_admin(user):
user.role_id = Role.query.filter_by(name='Administrator').first().id user.role_id = Role.query.filter_by(name='Administrator').first().id
history = History(msg='Promoting {0} to administrator'.format( history = History(msg='Promoting {0} to administrator'.format(
user.username), user.username),
created_by='SAML Assertion') created_by='SAML Assertion')
history.add() history.add()
def uplift_to_operator(user): def uplift_to_operator(user):
if user.role.name != 'Operator': if user.role.name != 'Operator':
user.role_id = Role.query.filter_by(name='Operator').first().id user.role_id = Role.query.filter_by(name='Operator').first().id
history = History(msg='Promoting {0} to operator'.format( history = History(msg='Promoting {0} to operator'.format(
user.username), user.username),
created_by='SAML Assertion') created_by='SAML Assertion')
history.add() history.add()

View File

@ -38,14 +38,16 @@ def azure_oauth():
@current_app.route('/azure/authorized') @current_app.route('/azure/authorized')
def azure_authorized(): def azure_authorized():
session['azure_oauthredir'] = url_for('.azure_authorized', use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
_external=True, params = {'_external': True}
_scheme='https') if isinstance(use_ssl, bool):
params['_scheme'] = 'https' if use_ssl else 'http'
session['azure_oauthredir'] = url_for('.azure_authorized', **params)
token = azure.authorize_access_token() token = azure.authorize_access_token()
if token is None: if token is None:
return 'Access denied: reason=%s error=%s' % ( return 'Access denied: reason=%s error=%s' % (
request.args['error'], request.args['error_description']) request.args['error'], request.args['error_description'])
session['azure_token'] = (token) session['azure_token'] = (token)
return redirect(url_for('index.login', _external=True, _scheme='https')) return redirect(url_for('index.login', **params))
return azure return azure

View File

@ -40,13 +40,16 @@ def github_oauth():
@current_app.route('/github/authorized') @current_app.route('/github/authorized')
def github_authorized(): def github_authorized():
session['github_oauthredir'] = url_for('.github_authorized', use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
_external=True) params = {'_external': True}
if isinstance(use_ssl, bool):
params['_scheme'] = 'https' if use_ssl else 'http'
session['github_oauthredir'] = url_for('.github_authorized', **params)
token = github.authorize_access_token() token = github.authorize_access_token()
if token is None: if token is None:
return 'Access denied: reason=%s error=%s' % ( return 'Access denied: reason=%s error=%s' % (
request.args['error'], request.args['error_description']) request.args['error'], request.args['error_description'])
session['github_token'] = (token) session['github_token'] = token
return redirect(url_for('index.login')) return redirect(url_for('index.login', **params))
return github return github

View File

@ -39,16 +39,18 @@ def google_oauth():
@current_app.route('/google/authorized') @current_app.route('/google/authorized')
def google_authorized(): def google_authorized():
session['google_oauthredir'] = url_for( use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
'.google_authorized', _external=True) params = {'_external': True}
if isinstance(use_ssl, bool):
params['_scheme'] = 'https' if use_ssl else 'http'
session['google_oauthredir'] = url_for('.google_authorized', **params)
token = google.authorize_access_token() token = google.authorize_access_token()
if token is None: if token is None:
return 'Access denied: reason=%s error=%s' % ( return 'Access denied: reason=%s error=%s' % (
request.args['error_reason'], request.args['error_reason'],
request.args['error_description'] request.args['error_description']
) )
session['google_token'] = (token) session['google_token'] = token
return redirect(url_for('index.login')) return redirect(url_for('index.login', **params))
return google return google

View File

@ -39,13 +39,16 @@ def oidc_oauth():
@current_app.route('/oidc/authorized') @current_app.route('/oidc/authorized')
def oidc_authorized(): def oidc_authorized():
session['oidc_oauthredir'] = url_for('.oidc_authorized', use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
_external=True) params = {'_external': True}
if isinstance(use_ssl, bool):
params['_scheme'] = 'https' if use_ssl else 'http'
session['oidc_oauthredir'] = url_for('.oidc_authorized', **params)
token = oidc.authorize_access_token() token = oidc.authorize_access_token()
if token is None: if token is None:
return 'Access denied: reason=%s error=%s' % ( return 'Access denied: reason=%s error=%s' % (
request.args['error'], request.args['error_description']) request.args['error'], request.args['error_description'])
session['oidc_token'] = (token) session['oidc_token'] = token
return redirect(url_for('index.login')) return redirect(url_for('index.login', **params))
return oidc return oidc