mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-07-31 17:53:45 +00:00
resolved conflict from 737e104912
This commit is contained in:
@@ -72,8 +72,8 @@ def get_record_changes(del_rrset, add_rrset):
|
||||
"""For the given record, return the state dict."""
|
||||
return {
|
||||
"disabled": record['disabled'],
|
||||
"content": record['content'],
|
||||
"comment": record.get('comment', ''),
|
||||
"content": record['content'],
|
||||
"comment": record.get('comment', ''),
|
||||
}
|
||||
|
||||
add_records = get_records(add_rrset)
|
||||
@@ -882,7 +882,8 @@ class DetailedHistory():
|
||||
description=DetailedHistory.get_key_val(detail_dict,
|
||||
"description"))
|
||||
|
||||
elif any(msg in history.msg for msg in ['Change zone','Change domain']) and 'access control' in history.msg: # added or removed a user from a zone
|
||||
elif any(msg in history.msg for msg in ['Change zone',
|
||||
'Change domain']) and 'access control' in history.msg: # added or removed a user from a zone
|
||||
users_with_access = DetailedHistory.get_key_val(detail_dict, "user_has_access")
|
||||
self.detailed_msg = render_template_string("""
|
||||
<table class="table table-bordered table-striped">
|
||||
@@ -927,7 +928,7 @@ class DetailedHistory():
|
||||
linked_domains=DetailedHistory.get_key_val(detail_dict,
|
||||
"domains"))
|
||||
|
||||
elif any(msg in history.msg for msg in ['Update type for zone','Update type for domain']):
|
||||
elif any(msg in history.msg for msg in ['Update type for zone', 'Update type for domain']):
|
||||
self.detailed_msg = render_template_string("""
|
||||
<table class="table table-bordered table-striped">
|
||||
<tr><td>Zone: </td><td>{{ domain }}</td></tr>
|
||||
@@ -962,7 +963,8 @@ class DetailedHistory():
|
||||
'status'),
|
||||
history_msg=DetailedHistory.get_key_val(detail_dict, 'msg'))
|
||||
|
||||
elif any(msg in history.msg for msg in ['Update zone','Update domain']) and 'associate account' in history.msg: # When an account gets associated or dissociate with zones
|
||||
elif any(msg in history.msg for msg in ['Update zone',
|
||||
'Update domain']) and 'associate account' in history.msg: # When an account gets associated or dissociate with zones
|
||||
self.detailed_msg = render_template_string('''
|
||||
<table class="table table-bordered table-striped">
|
||||
<tr><td>Associate: </td><td>{{ history_assoc_account }}</td></tr>
|
||||
@@ -1208,8 +1210,10 @@ def history_table(): # ajax call data
|
||||
.filter(
|
||||
db.and_(
|
||||
db.or_(
|
||||
History.msg.like("%domain " + domain_name) if domain_name != "*" else History.msg.like("%domain%"),
|
||||
History.msg.like("%zone " + domain_name) if domain_name != "*" else History.msg.like("%zone%"),
|
||||
History.msg.like("%domain " + domain_name) if domain_name != "*" else History.msg.like(
|
||||
"%domain%"),
|
||||
History.msg.like("%zone " + domain_name) if domain_name != "*" else History.msg.like(
|
||||
"%zone%"),
|
||||
History.msg.like(
|
||||
"%domain " + domain_name + " access control") if domain_name != "*" else History.msg.like(
|
||||
"%domain%access control"),
|
||||
@@ -1498,287 +1502,34 @@ def has_an_auth_method(local_db_enabled=None,
|
||||
oidc_oauth_enabled = Setting().get('oidc_oauth_enabled')
|
||||
if azure_oauth_enabled is None:
|
||||
azure_oauth_enabled = Setting().get('azure_oauth_enabled')
|
||||
return local_db_enabled or ldap_enabled or google_oauth_enabled or github_oauth_enabled or oidc_oauth_enabled or azure_oauth_enabled
|
||||
return local_db_enabled or ldap_enabled or google_oauth_enabled or github_oauth_enabled or oidc_oauth_enabled \
|
||||
or azure_oauth_enabled
|
||||
|
||||
|
||||
@admin_bp.route('/setting/authentication', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
@admin_role_required
|
||||
def setting_authentication():
|
||||
if request.method == 'GET':
|
||||
return render_template('admin_setting_authentication.html')
|
||||
elif request.method == 'POST':
|
||||
conf_type = request.form.get('config_tab')
|
||||
result = None
|
||||
return render_template('admin_setting_authentication.html')
|
||||
|
||||
if conf_type == 'general':
|
||||
local_db_enabled = True if request.form.get(
|
||||
'local_db_enabled') else False
|
||||
signup_enabled = True if request.form.get(
|
||||
'signup_enabled') else False
|
||||
|
||||
pwd_enforce_characters = True if request.form.get('pwd_enforce_characters') else False
|
||||
pwd_min_len = safe_cast(request.form.get('pwd_min_len', Setting().defaults["pwd_min_len"]), int,
|
||||
Setting().defaults["pwd_min_len"])
|
||||
pwd_min_lowercase = safe_cast(request.form.get('pwd_min_lowercase', Setting().defaults["pwd_min_lowercase"]), int,
|
||||
Setting().defaults["pwd_min_lowercase"])
|
||||
pwd_min_uppercase = safe_cast(request.form.get('pwd_min_uppercase', Setting().defaults["pwd_min_uppercase"]), int,
|
||||
Setting().defaults["pwd_min_uppercase"])
|
||||
pwd_min_digits = safe_cast(request.form.get('pwd_min_digits', Setting().defaults["pwd_min_digits"]), int,
|
||||
Setting().defaults["pwd_min_digits"])
|
||||
pwd_min_special = safe_cast(request.form.get('pwd_min_special', Setting().defaults["pwd_min_special"]), int,
|
||||
Setting().defaults["pwd_min_special"])
|
||||
@admin_bp.route('/setting/authentication/api', methods=['POST'])
|
||||
@login_required
|
||||
@admin_role_required
|
||||
def setting_authentication_api():
|
||||
result = {'status': 1, 'messages': [], 'data': {}}
|
||||
|
||||
pwd_enforce_complexity = True if request.form.get('pwd_enforce_complexity') else False
|
||||
pwd_min_complexity = safe_cast(request.form.get('pwd_min_complexity', Setting().defaults["pwd_min_complexity"]), int,
|
||||
Setting().defaults["pwd_min_complexity"])
|
||||
if request.form.get('commit') == '1':
|
||||
model = Setting()
|
||||
data = json.loads(request.form.get('data'))
|
||||
|
||||
if not has_an_auth_method(local_db_enabled=local_db_enabled):
|
||||
result = {
|
||||
'status':
|
||||
False,
|
||||
'msg':
|
||||
'Must have at least one authentication method enabled.'
|
||||
}
|
||||
else:
|
||||
Setting().set('local_db_enabled', local_db_enabled)
|
||||
Setting().set('signup_enabled', signup_enabled)
|
||||
for key, value in data.items():
|
||||
if key in model.groups['authentication']:
|
||||
model.set(key, value)
|
||||
|
||||
Setting().set('pwd_enforce_characters', pwd_enforce_characters)
|
||||
Setting().set('pwd_min_len', pwd_min_len)
|
||||
Setting().set('pwd_min_lowercase', pwd_min_lowercase)
|
||||
Setting().set('pwd_min_uppercase', pwd_min_uppercase)
|
||||
Setting().set('pwd_min_digits', pwd_min_digits)
|
||||
Setting().set('pwd_min_special', pwd_min_special)
|
||||
result['data'] = Setting().get_group('authentication')
|
||||
|
||||
Setting().set('pwd_enforce_complexity', pwd_enforce_complexity)
|
||||
Setting().set('pwd_min_complexity', pwd_min_complexity)
|
||||
|
||||
result = {'status': True, 'msg': 'Saved successfully'}
|
||||
|
||||
elif conf_type == 'ldap':
|
||||
ldap_enabled = True if request.form.get('ldap_enabled') else False
|
||||
|
||||
if not has_an_auth_method(ldap_enabled=ldap_enabled):
|
||||
result = {
|
||||
'status':
|
||||
False,
|
||||
'msg':
|
||||
'Must have at least one authentication method enabled.'
|
||||
}
|
||||
else:
|
||||
Setting().set('ldap_enabled', ldap_enabled)
|
||||
Setting().set('ldap_type', request.form.get('ldap_type'))
|
||||
Setting().set('ldap_uri', request.form.get('ldap_uri'))
|
||||
Setting().set('ldap_base_dn', request.form.get('ldap_base_dn'))
|
||||
Setting().set('ldap_admin_username',
|
||||
request.form.get('ldap_admin_username'))
|
||||
Setting().set('ldap_admin_password',
|
||||
request.form.get('ldap_admin_password'))
|
||||
Setting().set('ldap_filter_basic',
|
||||
request.form.get('ldap_filter_basic'))
|
||||
Setting().set('ldap_filter_group',
|
||||
request.form.get('ldap_filter_group'))
|
||||
Setting().set('ldap_filter_username',
|
||||
request.form.get('ldap_filter_username'))
|
||||
Setting().set('ldap_filter_groupname',
|
||||
request.form.get('ldap_filter_groupname'))
|
||||
Setting().set(
|
||||
'ldap_sg_enabled', True
|
||||
if request.form.get('ldap_sg_enabled') == 'ON' else False)
|
||||
Setting().set('ldap_admin_group',
|
||||
request.form.get('ldap_admin_group'))
|
||||
Setting().set('ldap_operator_group',
|
||||
request.form.get('ldap_operator_group'))
|
||||
Setting().set('ldap_user_group',
|
||||
request.form.get('ldap_user_group'))
|
||||
Setting().set('ldap_domain', request.form.get('ldap_domain'))
|
||||
Setting().set(
|
||||
'autoprovisioning', True
|
||||
if request.form.get('autoprovisioning') == 'ON' else False)
|
||||
Setting().set('autoprovisioning_attribute',
|
||||
request.form.get('autoprovisioning_attribute'))
|
||||
|
||||
if request.form.get('autoprovisioning') == 'ON':
|
||||
if validateURN(request.form.get('urn_value')):
|
||||
Setting().set('urn_value',
|
||||
request.form.get('urn_value'))
|
||||
else:
|
||||
return render_template('admin_setting_authentication.html',
|
||||
error="Invalid urn")
|
||||
else:
|
||||
Setting().set('urn_value',
|
||||
request.form.get('urn_value'))
|
||||
|
||||
Setting().set('purge', True
|
||||
if request.form.get('purge') == 'ON' else False)
|
||||
|
||||
result = {'status': True, 'msg': 'Saved successfully'}
|
||||
elif conf_type == 'google':
|
||||
google_oauth_enabled = True if request.form.get(
|
||||
'google_oauth_enabled') else False
|
||||
if not has_an_auth_method(google_oauth_enabled=google_oauth_enabled):
|
||||
result = {
|
||||
'status':
|
||||
False,
|
||||
'msg':
|
||||
'Must have at least one authentication method enabled.'
|
||||
}
|
||||
else:
|
||||
Setting().set('google_oauth_enabled', google_oauth_enabled)
|
||||
Setting().set('google_oauth_client_id',
|
||||
request.form.get('google_oauth_client_id'))
|
||||
Setting().set('google_oauth_client_secret',
|
||||
request.form.get('google_oauth_client_secret'))
|
||||
Setting().set('google_oauth_metadata_url',
|
||||
request.form.get('google_oauth_metadata_url'))
|
||||
Setting().set('google_token_url',
|
||||
request.form.get('google_token_url'))
|
||||
Setting().set('google_oauth_scope',
|
||||
request.form.get('google_oauth_scope'))
|
||||
Setting().set('google_authorize_url',
|
||||
request.form.get('google_authorize_url'))
|
||||
Setting().set('google_base_url',
|
||||
request.form.get('google_base_url'))
|
||||
result = {
|
||||
'status': True,
|
||||
'msg':
|
||||
'Saved successfully. Please reload PDA to take effect.'
|
||||
}
|
||||
elif conf_type == 'github':
|
||||
github_oauth_enabled = True if request.form.get(
|
||||
'github_oauth_enabled') else False
|
||||
if not has_an_auth_method(github_oauth_enabled=github_oauth_enabled):
|
||||
result = {
|
||||
'status':
|
||||
False,
|
||||
'msg':
|
||||
'Must have at least one authentication method enabled.'
|
||||
}
|
||||
else:
|
||||
Setting().set('github_oauth_enabled', github_oauth_enabled)
|
||||
Setting().set('github_oauth_key',
|
||||
request.form.get('github_oauth_key'))
|
||||
Setting().set('github_oauth_secret',
|
||||
request.form.get('github_oauth_secret'))
|
||||
Setting().set('github_oauth_scope',
|
||||
request.form.get('github_oauth_scope'))
|
||||
Setting().set('github_oauth_api_url',
|
||||
request.form.get('github_oauth_api_url'))
|
||||
Setting().set('github_oauth_metadata_url',
|
||||
request.form.get('github_oauth_metadata_url'))
|
||||
Setting().set('github_oauth_token_url',
|
||||
request.form.get('github_oauth_token_url'))
|
||||
Setting().set('github_oauth_authorize_url',
|
||||
request.form.get('github_oauth_authorize_url'))
|
||||
result = {
|
||||
'status': True,
|
||||
'msg':
|
||||
'Saved successfully. Please reload PDA to take effect.'
|
||||
}
|
||||
elif conf_type == 'azure':
|
||||
azure_oauth_enabled = True if request.form.get(
|
||||
'azure_oauth_enabled') else False
|
||||
if not has_an_auth_method(azure_oauth_enabled=azure_oauth_enabled):
|
||||
result = {
|
||||
'status':
|
||||
False,
|
||||
'msg':
|
||||
'Must have at least one authentication method enabled.'
|
||||
}
|
||||
else:
|
||||
Setting().set('azure_oauth_enabled', azure_oauth_enabled)
|
||||
Setting().set('azure_oauth_key',
|
||||
request.form.get('azure_oauth_key'))
|
||||
Setting().set('azure_oauth_secret',
|
||||
request.form.get('azure_oauth_secret'))
|
||||
Setting().set('azure_oauth_scope',
|
||||
request.form.get('azure_oauth_scope'))
|
||||
Setting().set('azure_oauth_api_url',
|
||||
request.form.get('azure_oauth_api_url'))
|
||||
Setting().set('azure_oauth_metadata_url',
|
||||
request.form.get('azure_oauth_metadata_url'))
|
||||
Setting().set('azure_oauth_token_url',
|
||||
request.form.get('azure_oauth_token_url'))
|
||||
Setting().set('azure_oauth_authorize_url',
|
||||
request.form.get('azure_oauth_authorize_url'))
|
||||
Setting().set(
|
||||
'azure_sg_enabled', True
|
||||
if request.form.get('azure_sg_enabled') == 'ON' else False)
|
||||
Setting().set('azure_admin_group',
|
||||
request.form.get('azure_admin_group'))
|
||||
Setting().set('azure_operator_group',
|
||||
request.form.get('azure_operator_group'))
|
||||
Setting().set('azure_user_group',
|
||||
request.form.get('azure_user_group'))
|
||||
Setting().set(
|
||||
'azure_group_accounts_enabled', True
|
||||
if request.form.get('azure_group_accounts_enabled') == 'ON' else False)
|
||||
Setting().set('azure_group_accounts_name',
|
||||
request.form.get('azure_group_accounts_name'))
|
||||
Setting().set('azure_group_accounts_name_re',
|
||||
request.form.get('azure_group_accounts_name_re'))
|
||||
Setting().set('azure_group_accounts_description',
|
||||
request.form.get('azure_group_accounts_description'))
|
||||
Setting().set('azure_group_accounts_description_re',
|
||||
request.form.get('azure_group_accounts_description_re'))
|
||||
result = {
|
||||
'status': True,
|
||||
'msg':
|
||||
'Saved successfully. Please reload PDA to take effect.'
|
||||
}
|
||||
elif conf_type == 'oidc':
|
||||
oidc_oauth_enabled = True if request.form.get(
|
||||
'oidc_oauth_enabled') else False
|
||||
if not has_an_auth_method(oidc_oauth_enabled=oidc_oauth_enabled):
|
||||
result = {
|
||||
'status':
|
||||
False,
|
||||
'msg':
|
||||
'Must have at least one authentication method enabled.'
|
||||
}
|
||||
else:
|
||||
Setting().set(
|
||||
'oidc_oauth_enabled',
|
||||
True if request.form.get('oidc_oauth_enabled') else False)
|
||||
Setting().set('oidc_oauth_key',
|
||||
request.form.get('oidc_oauth_key'))
|
||||
Setting().set('oidc_oauth_secret',
|
||||
request.form.get('oidc_oauth_secret'))
|
||||
Setting().set('oidc_oauth_scope',
|
||||
request.form.get('oidc_oauth_scope'))
|
||||
Setting().set('oidc_oauth_api_url',
|
||||
request.form.get('oidc_oauth_api_url'))
|
||||
Setting().set('oidc_oauth_metadata_url',
|
||||
request.form.get('oidc_oauth_metadata_url'))
|
||||
Setting().set('oidc_oauth_token_url',
|
||||
request.form.get('oidc_oauth_token_url'))
|
||||
Setting().set('oidc_oauth_authorize_url',
|
||||
request.form.get('oidc_oauth_authorize_url'))
|
||||
Setting().set('oidc_oauth_logout_url',
|
||||
request.form.get('oidc_oauth_logout_url'))
|
||||
Setting().set('oidc_oauth_username',
|
||||
request.form.get('oidc_oauth_username'))
|
||||
Setting().set('oidc_oauth_firstname',
|
||||
request.form.get('oidc_oauth_firstname'))
|
||||
Setting().set('oidc_oauth_last_name',
|
||||
request.form.get('oidc_oauth_last_name'))
|
||||
Setting().set('oidc_oauth_email',
|
||||
request.form.get('oidc_oauth_email'))
|
||||
Setting().set('oidc_oauth_account_name_property',
|
||||
request.form.get('oidc_oauth_account_name_property'))
|
||||
Setting().set('oidc_oauth_account_description_property',
|
||||
request.form.get('oidc_oauth_account_description_property'))
|
||||
result = {
|
||||
'status': True,
|
||||
'msg':
|
||||
'Saved successfully. Please reload PDA to take effect.'
|
||||
}
|
||||
else:
|
||||
return abort(400)
|
||||
|
||||
return render_template('admin_setting_authentication.html',
|
||||
result=result)
|
||||
return result
|
||||
|
||||
|
||||
@admin_bp.route('/templates', methods=['GET', 'POST'])
|
||||
@@ -2055,16 +1806,16 @@ def global_search():
|
||||
results = server.global_search(object_type='all', query=query)
|
||||
|
||||
# Filter results to domains to which the user has access permission
|
||||
if current_user.role.name not in [ 'Administrator', 'Operator' ]:
|
||||
if current_user.role.name not in ['Administrator', 'Operator']:
|
||||
allowed_domains = db.session.query(Domain) \
|
||||
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
|
||||
.outerjoin(Account, Domain.account_id == Account.id) \
|
||||
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
|
||||
.filter(
|
||||
db.or_(
|
||||
DomainUser.user_id == current_user.id,
|
||||
AccountUser.user_id == current_user.id
|
||||
)) \
|
||||
db.or_(
|
||||
DomainUser.user_id == current_user.id,
|
||||
AccountUser.user_id == current_user.id
|
||||
)) \
|
||||
.with_entities(Domain.name) \
|
||||
.all()
|
||||
allowed_domains = [value for value, in allowed_domains]
|
||||
|
@@ -141,7 +141,7 @@ def domains_custom(tab_id):
|
||||
filtered_count = domains.count()
|
||||
|
||||
start = int(request.args.get("start", 0))
|
||||
length = min(int(request.args.get("length", 0)), 100)
|
||||
length = min(int(request.args.get("length", 0)), max(100, int(Setting().get('default_domain_table_size'))))
|
||||
|
||||
if length != -1:
|
||||
domains = domains[start:start + length]
|
||||
|
@@ -494,13 +494,17 @@ def setting(domain_name):
|
||||
d = Domain(name=domain_name)
|
||||
domain_user_ids = d.get_user()
|
||||
account = d.get_account()
|
||||
domain_info = d.get_domain_info(domain_name)
|
||||
|
||||
return render_template('domain_setting.html',
|
||||
domain=domain,
|
||||
users=users,
|
||||
domain_user_ids=domain_user_ids,
|
||||
accounts=accounts,
|
||||
domain_account=account)
|
||||
domain_account=account,
|
||||
zone_type=domain_info["kind"].lower(),
|
||||
masters=','.join(domain_info["masters"]),
|
||||
soa_edit_api=domain_info["soa_edit_api"].upper())
|
||||
|
||||
if request.method == 'POST':
|
||||
# username in right column
|
||||
|
@@ -45,6 +45,7 @@ index_bp = Blueprint('index',
|
||||
template_folder='templates',
|
||||
url_prefix='/')
|
||||
|
||||
|
||||
@index_bp.before_app_first_request
|
||||
def register_modules():
|
||||
global google
|
||||
@@ -68,7 +69,7 @@ def before_request():
|
||||
# Check site is in maintenance mode
|
||||
maintenance = Setting().get('maintenance')
|
||||
if maintenance and current_user.is_authenticated and current_user.role.name not in [
|
||||
'Administrator', 'Operator'
|
||||
'Administrator', 'Operator'
|
||||
]:
|
||||
return render_template('maintenance.html')
|
||||
|
||||
@@ -98,7 +99,11 @@ def google_login():
|
||||
)
|
||||
abort(400)
|
||||
else:
|
||||
redirect_uri = url_for('google_authorized', _external=True)
|
||||
use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
|
||||
params = {'_external': True}
|
||||
if isinstance(use_ssl, bool):
|
||||
params['_scheme'] = 'https' if use_ssl else 'http'
|
||||
redirect_uri = url_for('google_authorized', **params)
|
||||
return google.authorize_redirect(redirect_uri)
|
||||
|
||||
|
||||
@@ -110,7 +115,11 @@ def github_login():
|
||||
)
|
||||
abort(400)
|
||||
else:
|
||||
redirect_uri = url_for('github_authorized', _external=True)
|
||||
use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
|
||||
params = {'_external': True}
|
||||
if isinstance(use_ssl, bool):
|
||||
params['_scheme'] = 'https' if use_ssl else 'http'
|
||||
redirect_uri = url_for('github_authorized', **params)
|
||||
return github.authorize_redirect(redirect_uri)
|
||||
|
||||
|
||||
@@ -122,9 +131,11 @@ def azure_login():
|
||||
)
|
||||
abort(400)
|
||||
else:
|
||||
redirect_uri = url_for('azure_authorized',
|
||||
_external=True,
|
||||
_scheme='https')
|
||||
use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
|
||||
params = {'_external': True}
|
||||
if isinstance(use_ssl, bool):
|
||||
params['_scheme'] = 'https' if use_ssl else 'http'
|
||||
redirect_uri = url_for('azure_authorized', **params)
|
||||
return azure.authorize_redirect(redirect_uri)
|
||||
|
||||
|
||||
@@ -136,7 +147,11 @@ def oidc_login():
|
||||
)
|
||||
abort(400)
|
||||
else:
|
||||
redirect_uri = url_for('oidc_authorized', _external=True)
|
||||
use_ssl = current_app.config.get('SERVER_EXTERNAL_SSL')
|
||||
params = {'_external': True}
|
||||
if isinstance(use_ssl, bool):
|
||||
params['_scheme'] = 'https' if use_ssl else 'http'
|
||||
redirect_uri = url_for('oidc_authorized', **params)
|
||||
return oidc.authorize_redirect(redirect_uri)
|
||||
|
||||
|
||||
@@ -149,18 +164,18 @@ def login():
|
||||
|
||||
if 'google_token' in session:
|
||||
user_data = json.loads(google.get('userinfo').text)
|
||||
first_name = user_data['given_name']
|
||||
surname = user_data['family_name']
|
||||
email = user_data['email']
|
||||
user = User.query.filter_by(username=email).first()
|
||||
google_first_name = user_data['given_name']
|
||||
google_last_name = user_data['family_name']
|
||||
google_email = user_data['email']
|
||||
user = User.query.filter_by(username=google_email).first()
|
||||
if user is None:
|
||||
user = User.query.filter_by(email=email).first()
|
||||
user = User.query.filter_by(email=google_email).first()
|
||||
if not user:
|
||||
user = User(username=email,
|
||||
firstname=first_name,
|
||||
lastname=surname,
|
||||
user = User(username=google_email,
|
||||
firstname=google_first_name,
|
||||
lastname=google_last_name,
|
||||
plain_text_password=None,
|
||||
email=email)
|
||||
email=google_email)
|
||||
|
||||
result = user.create_local_user()
|
||||
if not result['status']:
|
||||
@@ -172,10 +187,18 @@ def login():
|
||||
return authenticate_user(user, 'Google OAuth')
|
||||
|
||||
if 'github_token' in session:
|
||||
me = json.loads(github.get('user').text)
|
||||
github_username = me['login']
|
||||
github_name = me['name']
|
||||
github_email = me['email']
|
||||
user_data = json.loads(github.get('user').text)
|
||||
github_username = user_data['login']
|
||||
github_first_name = user_data['name']
|
||||
github_last_name = ''
|
||||
github_email = user_data['email']
|
||||
|
||||
# If the user's full name from GitHub contains at least two words, use the first word as the first name and
|
||||
# the rest as the last name.
|
||||
github_name_parts = github_first_name.split(' ')
|
||||
if len(github_name_parts) > 1:
|
||||
github_first_name = github_name_parts[0]
|
||||
github_last_name = ' '.join(github_name_parts[1:])
|
||||
|
||||
user = User.query.filter_by(username=github_username).first()
|
||||
if user is None:
|
||||
@@ -183,8 +206,8 @@ def login():
|
||||
if not user:
|
||||
user = User(username=github_username,
|
||||
plain_text_password=None,
|
||||
firstname=github_name,
|
||||
lastname='',
|
||||
firstname=github_first_name,
|
||||
lastname=github_last_name,
|
||||
email=github_email)
|
||||
|
||||
result = user.create_local_user()
|
||||
@@ -198,8 +221,8 @@ def login():
|
||||
|
||||
if 'azure_token' in session:
|
||||
azure_info = azure.get('me?$select=displayName,givenName,id,mail,surname,userPrincipalName').text
|
||||
current_app.logger.info('Azure login returned: '+azure_info)
|
||||
me = json.loads(azure_info)
|
||||
current_app.logger.info('Azure login returned: ' + azure_info)
|
||||
user_data = json.loads(azure_info)
|
||||
|
||||
azure_info = azure.post('me/getMemberGroups',
|
||||
json={'securityEnabledOnly': False}).text
|
||||
@@ -211,15 +234,15 @@ def login():
|
||||
else:
|
||||
mygroups = []
|
||||
|
||||
azure_username = me["userPrincipalName"]
|
||||
azure_givenname = me["givenName"]
|
||||
azure_familyname = me["surname"]
|
||||
if "mail" in me:
|
||||
azure_email = me["mail"]
|
||||
azure_username = user_data["userPrincipalName"]
|
||||
azure_first_name = user_data["givenName"]
|
||||
azure_last_name = user_data["surname"]
|
||||
if "mail" in user_data:
|
||||
azure_email = user_data["mail"]
|
||||
else:
|
||||
azure_email = ""
|
||||
if not azure_email:
|
||||
azure_email = me["userPrincipalName"]
|
||||
azure_email = user_data["userPrincipalName"]
|
||||
|
||||
# Handle foreign principals such as guest users
|
||||
azure_email = re.sub(r"#.*$", "", azure_email)
|
||||
@@ -229,8 +252,8 @@ def login():
|
||||
if not user:
|
||||
user = User(username=azure_username,
|
||||
plain_text_password=None,
|
||||
firstname=azure_givenname,
|
||||
lastname=azure_familyname,
|
||||
firstname=azure_first_name,
|
||||
lastname=azure_last_name,
|
||||
email=azure_email)
|
||||
|
||||
result = user.create_local_user()
|
||||
@@ -250,30 +273,30 @@ def login():
|
||||
if Setting().get('azure_sg_enabled'):
|
||||
if Setting().get('azure_admin_group') in mygroups:
|
||||
current_app.logger.info('Setting role for user ' +
|
||||
azure_username +
|
||||
' to Administrator due to group membership')
|
||||
azure_username +
|
||||
' to Administrator due to group membership')
|
||||
user.set_role("Administrator")
|
||||
else:
|
||||
if Setting().get('azure_operator_group') in mygroups:
|
||||
current_app.logger.info('Setting role for user ' +
|
||||
azure_username +
|
||||
' to Operator due to group membership')
|
||||
azure_username +
|
||||
' to Operator due to group membership')
|
||||
user.set_role("Operator")
|
||||
else:
|
||||
if Setting().get('azure_user_group') in mygroups:
|
||||
current_app.logger.info('Setting role for user ' +
|
||||
azure_username +
|
||||
' to User due to group membership')
|
||||
azure_username +
|
||||
' to User due to group membership')
|
||||
user.set_role("User")
|
||||
else:
|
||||
current_app.logger.warning('User ' +
|
||||
azure_username +
|
||||
' has no relevant group memberships')
|
||||
azure_username +
|
||||
' has no relevant group memberships')
|
||||
session.pop('azure_token', None)
|
||||
return render_template('login.html',
|
||||
saml_enabled=SAML_ENABLED,
|
||||
error=('User ' + azure_username +
|
||||
' is not in any authorised groups.'))
|
||||
saml_enabled=SAML_ENABLED,
|
||||
error=('User ' + azure_username +
|
||||
' is not in any authorised groups.'))
|
||||
|
||||
# Handle account/group creation, if enabled
|
||||
if Setting().get('azure_group_accounts_enabled') and mygroups:
|
||||
@@ -369,23 +392,23 @@ def login():
|
||||
return authenticate_user(user, 'Azure OAuth')
|
||||
|
||||
if 'oidc_token' in session:
|
||||
me = json.loads(oidc.get('userinfo').text)
|
||||
oidc_username = me[Setting().get('oidc_oauth_username')]
|
||||
oidc_givenname = me[Setting().get('oidc_oauth_firstname')]
|
||||
oidc_familyname = me[Setting().get('oidc_oauth_last_name')]
|
||||
oidc_email = me[Setting().get('oidc_oauth_email')]
|
||||
user_data = json.loads(oidc.get('userinfo').text)
|
||||
oidc_username = user_data[Setting().get('oidc_oauth_username')]
|
||||
oidc_first_name = user_data[Setting().get('oidc_oauth_firstname')]
|
||||
oidc_last_name = user_data[Setting().get('oidc_oauth_last_name')]
|
||||
oidc_email = user_data[Setting().get('oidc_oauth_email')]
|
||||
|
||||
user = User.query.filter_by(username=oidc_username).first()
|
||||
if not user:
|
||||
user = User(username=oidc_username,
|
||||
plain_text_password=None,
|
||||
firstname=oidc_givenname,
|
||||
lastname=oidc_familyname,
|
||||
firstname=oidc_first_name,
|
||||
lastname=oidc_last_name,
|
||||
email=oidc_email)
|
||||
result = user.create_local_user()
|
||||
else:
|
||||
user.firstname = oidc_givenname
|
||||
user.lastname = oidc_familyname
|
||||
user.firstname = oidc_first_name
|
||||
user.lastname = oidc_last_name
|
||||
user.email = oidc_email
|
||||
user.plain_text_password = None
|
||||
result = user.update_local_user()
|
||||
@@ -394,20 +417,22 @@ def login():
|
||||
session.pop('oidc_token', None)
|
||||
return redirect(url_for('index.login'))
|
||||
|
||||
#This checks if the account_name_property and account_description property were included in settings.
|
||||
if Setting().get('oidc_oauth_account_name_property') and Setting().get('oidc_oauth_account_description_property'):
|
||||
# This checks if the account_name_property and account_description property were included in settings.
|
||||
if Setting().get('oidc_oauth_account_name_property') and Setting().get(
|
||||
'oidc_oauth_account_description_property'):
|
||||
|
||||
#Gets the name_property and description_property.
|
||||
# Gets the name_property and description_property.
|
||||
name_prop = Setting().get('oidc_oauth_account_name_property')
|
||||
desc_prop = Setting().get('oidc_oauth_account_description_property')
|
||||
|
||||
account_to_add = []
|
||||
#If the name_property and desc_property exist in me (A variable that contains all the userinfo from the IdP).
|
||||
if name_prop in me and desc_prop in me:
|
||||
accounts_name_prop = [me[name_prop]] if type(me[name_prop]) is not list else me[name_prop]
|
||||
accounts_desc_prop = [me[desc_prop]] if type(me[desc_prop]) is not list else me[desc_prop]
|
||||
# If the name_property and desc_property exist in me (A variable that contains all the userinfo from the
|
||||
# IdP).
|
||||
if name_prop in user_data and desc_prop in user_data:
|
||||
accounts_name_prop = [user_data[name_prop]] if type(user_data[name_prop]) is not list else user_data[name_prop]
|
||||
accounts_desc_prop = [user_data[desc_prop]] if type(user_data[desc_prop]) is not list else user_data[desc_prop]
|
||||
|
||||
#Run on all groups the user is in by the index num.
|
||||
# Run on all groups the user is in by the index num.
|
||||
for i in range(len(accounts_name_prop)):
|
||||
description = ''
|
||||
if i < len(accounts_desc_prop):
|
||||
@@ -417,7 +442,7 @@ def login():
|
||||
account_to_add.append(account)
|
||||
user_accounts = user.get_accounts()
|
||||
|
||||
# Add accounts
|
||||
# Add accounts
|
||||
for account in account_to_add:
|
||||
if account not in user_accounts:
|
||||
account.add_user(user)
|
||||
@@ -426,7 +451,7 @@ def login():
|
||||
if Setting().get('delete_sso_accounts'):
|
||||
for account in user_accounts:
|
||||
if account not in account_to_add:
|
||||
account.remove_user(user)
|
||||
account.remove_user(user)
|
||||
|
||||
session['user_id'] = user.id
|
||||
session['authentication_type'] = 'OAuth'
|
||||
@@ -490,34 +515,36 @@ def login():
|
||||
saml_enabled=SAML_ENABLED,
|
||||
error='Token required')
|
||||
|
||||
if Setting().get('autoprovisioning') and auth_method!='LOCAL':
|
||||
urn_value=Setting().get('urn_value')
|
||||
Entitlements=user.read_entitlements(Setting().get('autoprovisioning_attribute'))
|
||||
if len(Entitlements)==0 and Setting().get('purge'):
|
||||
if Setting().get('autoprovisioning') and auth_method != 'LOCAL':
|
||||
urn_value = Setting().get('urn_value')
|
||||
Entitlements = user.read_entitlements(Setting().get('autoprovisioning_attribute'))
|
||||
if len(Entitlements) == 0 and Setting().get('purge'):
|
||||
user.set_role("User")
|
||||
user.revoke_privilege(True)
|
||||
|
||||
elif len(Entitlements)!=0:
|
||||
elif len(Entitlements) != 0:
|
||||
if checkForPDAEntries(Entitlements, urn_value):
|
||||
user.updateUser(Entitlements)
|
||||
else:
|
||||
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
|
||||
current_app.logger.warning(
|
||||
'Not a single powerdns-admin record was found, possibly a typo in the prefix')
|
||||
if Setting().get('purge'):
|
||||
user.set_role("User")
|
||||
user.revoke_privilege(True)
|
||||
current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' )
|
||||
current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.')
|
||||
|
||||
return authenticate_user(user, auth_method, remember_me)
|
||||
|
||||
|
||||
def checkForPDAEntries(Entitlements, urn_value):
|
||||
"""
|
||||
Run through every record located in the ldap attribute given and determine if there are any valid powerdns-admin records
|
||||
"""
|
||||
urnArguments=[x.lower() for x in urn_value.split(':')]
|
||||
urnArguments = [x.lower() for x in urn_value.split(':')]
|
||||
for Entitlement in Entitlements:
|
||||
entArguments=Entitlement.split(':powerdns-admin')
|
||||
entArguments=[x.lower() for x in entArguments[0].split(':')]
|
||||
if (entArguments==urnArguments):
|
||||
entArguments = Entitlement.split(':powerdns-admin')
|
||||
entArguments = [x.lower() for x in entArguments[0].split(':')]
|
||||
if (entArguments == urnArguments):
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -526,6 +553,8 @@ def clear_session():
|
||||
session.pop('user_id', None)
|
||||
session.pop('github_token', None)
|
||||
session.pop('google_token', None)
|
||||
session.pop('azure_token', None)
|
||||
session.pop('oidc_token', None)
|
||||
session.pop('authentication_type', None)
|
||||
session.pop('remote_user', None)
|
||||
logout_user()
|
||||
@@ -553,14 +582,15 @@ def signin_history(username, authenticator, success):
|
||||
|
||||
# Write history
|
||||
History(msg='User {} authentication {}'.format(username, str_success),
|
||||
detail = json.dumps({
|
||||
'username': username,
|
||||
'authenticator': authenticator,
|
||||
'ip_address': request_ip,
|
||||
'success': 1 if success else 0
|
||||
}),
|
||||
detail=json.dumps({
|
||||
'username': username,
|
||||
'authenticator': authenticator,
|
||||
'ip_address': request_ip,
|
||||
'success': 1 if success else 0
|
||||
}),
|
||||
created_by='System').add()
|
||||
|
||||
|
||||
# Get a list of Azure security groups the user is a member of
|
||||
def get_azure_groups(uri):
|
||||
azure_info = azure.get(uri).text
|
||||
@@ -576,30 +606,33 @@ def get_azure_groups(uri):
|
||||
mygroups = []
|
||||
return mygroups
|
||||
|
||||
|
||||
# Handle user login, write history and, if set, handle showing the register_otp QR code.
|
||||
# if Setting for OTP on first login is enabled, and OTP field is also enabled,
|
||||
# but user isn't using it yet, enable OTP, get QR code and display it, logging the user out.
|
||||
def authenticate_user(user, authenticator, remember=False):
|
||||
login_user(user, remember=remember)
|
||||
signin_history(user.username, authenticator, True)
|
||||
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret and session['authentication_type'] not in ['OAuth']:
|
||||
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret \
|
||||
and session['authentication_type'] not in ['OAuth']:
|
||||
user.update_profile(enable_otp=True)
|
||||
user_id = current_user.id
|
||||
prepare_welcome_user(user_id)
|
||||
return redirect(url_for('index.welcome'))
|
||||
return redirect(url_for('index.login'))
|
||||
|
||||
|
||||
# Prepare user to enter /welcome screen, otherwise they won't have permission to do so
|
||||
def prepare_welcome_user(user_id):
|
||||
logout_user()
|
||||
session['welcome_user_id'] = user_id
|
||||
|
||||
|
||||
@index_bp.route('/logout')
|
||||
def logout():
|
||||
if current_app.config.get(
|
||||
'SAML_ENABLED'
|
||||
) and 'samlSessionIndex' in session and current_app.config.get(
|
||||
'SAML_LOGOUT'):
|
||||
) and 'samlSessionIndex' in session and current_app.config.get('SAML_LOGOUT'):
|
||||
req = saml.prepare_flask_request(request)
|
||||
auth = saml.init_saml_auth(req)
|
||||
if current_app.config.get('SAML_LOGOUT_URL'):
|
||||
@@ -651,13 +684,12 @@ def logout():
|
||||
|
||||
|
||||
def password_policy_check(user, password):
|
||||
|
||||
def check_policy(chars, user_password, setting):
|
||||
lenreq = int(Setting().get(setting))
|
||||
setting_as_int = int(Setting().get(setting))
|
||||
test_string = user_password
|
||||
for c in chars:
|
||||
test_string = test_string.replace(c, '')
|
||||
return (lenreq, len(user_password) - len(test_string))
|
||||
return (setting_as_int, len(user_password) - len(test_string))
|
||||
|
||||
def matches_policy(item, policy_fails):
|
||||
return "*" if item in policy_fails else ""
|
||||
@@ -704,12 +736,14 @@ def password_policy_check(user, password):
|
||||
(pwd_min_lowercase_setting, pwd_lowercase) = check_policy(string.digits, password, 'pwd_min_lowercase')
|
||||
if pwd_lowercase < pwd_min_lowercase_setting:
|
||||
policy_fails["lowercase"] = True
|
||||
policy.append(f"{matches_policy('lowercase', policy_fails)}lowercase={pwd_lowercase}/{pwd_min_lowercase_setting}")
|
||||
policy.append(
|
||||
f"{matches_policy('lowercase', policy_fails)}lowercase={pwd_lowercase}/{pwd_min_lowercase_setting}")
|
||||
# Uppercase
|
||||
(pwd_min_uppercase_setting, pwd_uppercase) = check_policy(string.digits, password, 'pwd_min_uppercase')
|
||||
if pwd_uppercase < pwd_min_uppercase_setting:
|
||||
policy_fails["uppercase"] = True
|
||||
policy.append(f"{matches_policy('uppercase', policy_fails)}uppercase={pwd_uppercase}/{pwd_min_uppercase_setting}")
|
||||
policy.append(
|
||||
f"{matches_policy('uppercase', policy_fails)}uppercase={pwd_uppercase}/{pwd_min_uppercase_setting}")
|
||||
# Special
|
||||
(pwd_min_special_setting, pwd_special) = check_policy(string.digits, password, 'pwd_min_special')
|
||||
if pwd_special < pwd_min_special_setting:
|
||||
@@ -728,7 +762,8 @@ def password_policy_check(user, password):
|
||||
pwd_complexity = result['guesses_log10']
|
||||
if pwd_complexity < pwd_min_complexity_setting:
|
||||
policy_fails["complexity"] = True
|
||||
policy.append(f"{matches_policy('complexity', policy_fails)}complexity={pwd_complexity:.0f}/{pwd_min_complexity_setting}")
|
||||
policy.append(
|
||||
f"{matches_policy('complexity', policy_fails)}complexity={pwd_complexity:.0f}/{pwd_min_complexity_setting}")
|
||||
|
||||
policy_str = {"password": f"Fails policy: {', '.join(policy)}. Items prefixed with '*' failed."}
|
||||
|
||||
@@ -738,77 +773,78 @@ def password_policy_check(user, password):
|
||||
|
||||
@index_bp.route('/register', methods=['GET', 'POST'])
|
||||
def register():
|
||||
CAPTCHA_ENABLE = current_app.config.get('CAPTCHA_ENABLE')
|
||||
if Setting().get('signup_enabled'):
|
||||
if current_user.is_authenticated:
|
||||
return redirect(url_for('index.index'))
|
||||
if request.method == 'GET':
|
||||
return render_template('register.html', captcha_enable=CAPTCHA_ENABLE)
|
||||
elif request.method == 'POST':
|
||||
username = request.form.get('username', '').strip()
|
||||
password = request.form.get('password', '')
|
||||
firstname = request.form.get('firstname', '').strip()
|
||||
lastname = request.form.get('lastname', '').strip()
|
||||
email = request.form.get('email', '').strip()
|
||||
rpassword = request.form.get('rpassword', '')
|
||||
CAPTCHA_ENABLE = current_app.config.get('CAPTCHA_ENABLE')
|
||||
if Setting().get('signup_enabled'):
|
||||
if current_user.is_authenticated:
|
||||
return redirect(url_for('index.index'))
|
||||
if request.method == 'GET':
|
||||
return render_template('register.html', captcha_enable=CAPTCHA_ENABLE)
|
||||
elif request.method == 'POST':
|
||||
username = request.form.get('username', '').strip()
|
||||
password = request.form.get('password', '')
|
||||
firstname = request.form.get('firstname', '').strip()
|
||||
lastname = request.form.get('lastname', '').strip()
|
||||
email = request.form.get('email', '').strip()
|
||||
rpassword = request.form.get('rpassword', '')
|
||||
|
||||
is_valid_email = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')
|
||||
is_valid_email = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')
|
||||
|
||||
error_messages = {}
|
||||
if not firstname:
|
||||
error_messages['firstname'] = 'First Name is required'
|
||||
if not lastname:
|
||||
error_messages['lastname'] = 'Last Name is required'
|
||||
if not username:
|
||||
error_messages['username'] = 'Username is required'
|
||||
if not password:
|
||||
error_messages['password'] = 'Password is required'
|
||||
if not rpassword:
|
||||
error_messages['rpassword'] = 'Password confirmation is required'
|
||||
if not email:
|
||||
error_messages['email'] = 'Email is required'
|
||||
if not is_valid_email.match(email):
|
||||
error_messages['email'] = 'Invalid email address'
|
||||
if password != rpassword:
|
||||
error_messages['password'] = 'Password confirmation does not match'
|
||||
error_messages['rpassword'] = 'Password confirmation does not match'
|
||||
error_messages = {}
|
||||
if not firstname:
|
||||
error_messages['firstname'] = 'First Name is required'
|
||||
if not lastname:
|
||||
error_messages['lastname'] = 'Last Name is required'
|
||||
if not username:
|
||||
error_messages['username'] = 'Username is required'
|
||||
if not password:
|
||||
error_messages['password'] = 'Password is required'
|
||||
if not rpassword:
|
||||
error_messages['rpassword'] = 'Password confirmation is required'
|
||||
if not email:
|
||||
error_messages['email'] = 'Email is required'
|
||||
if not is_valid_email.match(email):
|
||||
error_messages['email'] = 'Invalid email address'
|
||||
if password != rpassword:
|
||||
error_messages['password'] = 'Password confirmation does not match'
|
||||
error_messages['rpassword'] = 'Password confirmation does not match'
|
||||
|
||||
if not captcha.validate():
|
||||
return render_template(
|
||||
'register.html', error='Invalid CAPTCHA answer', error_messages=error_messages, captcha_enable=CAPTCHA_ENABLE)
|
||||
if not captcha.validate():
|
||||
return render_template(
|
||||
'register.html', error='Invalid CAPTCHA answer', error_messages=error_messages,
|
||||
captcha_enable=CAPTCHA_ENABLE)
|
||||
|
||||
if error_messages:
|
||||
return render_template('register.html', error_messages=error_messages, captcha_enable=CAPTCHA_ENABLE)
|
||||
if error_messages:
|
||||
return render_template('register.html', error_messages=error_messages, captcha_enable=CAPTCHA_ENABLE)
|
||||
|
||||
user = User(username=username,
|
||||
plain_text_password=password,
|
||||
firstname=firstname,
|
||||
lastname=lastname,
|
||||
email=email
|
||||
)
|
||||
user = User(username=username,
|
||||
plain_text_password=password,
|
||||
firstname=firstname,
|
||||
lastname=lastname,
|
||||
email=email
|
||||
)
|
||||
|
||||
(password_policy_pass, password_policy) = password_policy_check(user, password)
|
||||
if not password_policy_pass:
|
||||
return render_template('register.html', error_messages=password_policy, captcha_enable=CAPTCHA_ENABLE)
|
||||
(password_policy_pass, password_policy) = password_policy_check(user, password)
|
||||
if not password_policy_pass:
|
||||
return render_template('register.html', error_messages=password_policy, captcha_enable=CAPTCHA_ENABLE)
|
||||
|
||||
try:
|
||||
result = user.create_local_user()
|
||||
if result and result['status']:
|
||||
if Setting().get('verify_user_email'):
|
||||
send_account_verification(email)
|
||||
if Setting().get('otp_force') and Setting().get('otp_field_enabled'):
|
||||
user.update_profile(enable_otp=True)
|
||||
prepare_welcome_user(user.id)
|
||||
return redirect(url_for('index.welcome'))
|
||||
else:
|
||||
return redirect(url_for('index.login'))
|
||||
try:
|
||||
result = user.create_local_user()
|
||||
if result and result['status']:
|
||||
if Setting().get('verify_user_email'):
|
||||
send_account_verification(email)
|
||||
if Setting().get('otp_force') and Setting().get('otp_field_enabled'):
|
||||
user.update_profile(enable_otp=True)
|
||||
prepare_welcome_user(user.id)
|
||||
return redirect(url_for('index.welcome'))
|
||||
else:
|
||||
return redirect(url_for('index.login'))
|
||||
else:
|
||||
return render_template('register.html',
|
||||
error=result['msg'], captcha_enable=CAPTCHA_ENABLE)
|
||||
except Exception as e:
|
||||
return render_template('register.html', error=e, captcha_enable=CAPTCHA_ENABLE)
|
||||
else:
|
||||
return render_template('register.html',
|
||||
error=result['msg'], captcha_enable=CAPTCHA_ENABLE)
|
||||
except Exception as e:
|
||||
return render_template('register.html', error=e, captcha_enable=CAPTCHA_ENABLE)
|
||||
else:
|
||||
return render_template('errors/404.html'), 404
|
||||
return render_template('errors/404.html'), 404
|
||||
|
||||
|
||||
# Show welcome page on first login if otp_force is enabled
|
||||
@@ -827,12 +863,15 @@ def welcome():
|
||||
if otp_token and otp_token.isdigit():
|
||||
good_token = user.verify_totp(otp_token)
|
||||
if not good_token:
|
||||
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Invalid token")
|
||||
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user,
|
||||
error="Invalid token")
|
||||
else:
|
||||
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Token required")
|
||||
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user,
|
||||
error="Token required")
|
||||
session.pop('welcome_user_id')
|
||||
return redirect(url_for('index.index'))
|
||||
|
||||
|
||||
@index_bp.route('/confirm/<token>', methods=['GET'])
|
||||
def confirm_email(token):
|
||||
email = confirm_token(token)
|
||||
@@ -919,10 +958,10 @@ def dyndns_update():
|
||||
.outerjoin(Account, Domain.account_id == Account.id) \
|
||||
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
|
||||
.filter(
|
||||
db.or_(
|
||||
DomainUser.user_id == current_user.id,
|
||||
AccountUser.user_id == current_user.id
|
||||
)).all()
|
||||
db.or_(
|
||||
DomainUser.user_id == current_user.id,
|
||||
AccountUser.user_id == current_user.id
|
||||
)).all()
|
||||
except Exception as e:
|
||||
current_app.logger.error('DynDNS Error: {0}'.format(e))
|
||||
current_app.logger.debug(traceback.format_exc())
|
||||
@@ -982,13 +1021,13 @@ def dyndns_update():
|
||||
if result['status'] == 'ok':
|
||||
history = History(
|
||||
msg='DynDNS update: updated {} successfully'.format(hostname),
|
||||
detail = json.dumps({
|
||||
'domain': domain.name,
|
||||
'record': hostname,
|
||||
'type': rtype,
|
||||
'old_value': oldip,
|
||||
'new_value': str(ip)
|
||||
}),
|
||||
detail=json.dumps({
|
||||
'domain': domain.name,
|
||||
'record': hostname,
|
||||
'type': rtype,
|
||||
'old_value': oldip,
|
||||
'new_value': str(ip)
|
||||
}),
|
||||
created_by=current_user.username,
|
||||
domain_id=domain.id)
|
||||
history.add()
|
||||
@@ -999,7 +1038,7 @@ def dyndns_update():
|
||||
elif r.is_allowed_edit():
|
||||
ondemand_creation = DomainSetting.query.filter(
|
||||
DomainSetting.domain == domain).filter(
|
||||
DomainSetting.setting == 'create_via_dyndns').first()
|
||||
DomainSetting.setting == 'create_via_dyndns').first()
|
||||
if (ondemand_creation is not None) and (strtobool(
|
||||
ondemand_creation.value) == True):
|
||||
|
||||
@@ -1024,11 +1063,11 @@ def dyndns_update():
|
||||
msg=
|
||||
'DynDNS update: created record {0} in zone {1} successfully'
|
||||
.format(hostname, domain.name, str(ip)),
|
||||
detail = json.dumps({
|
||||
'domain': domain.name,
|
||||
'record': hostname,
|
||||
'value': str(ip)
|
||||
}),
|
||||
detail=json.dumps({
|
||||
'domain': domain.name,
|
||||
'record': hostname,
|
||||
'value': str(ip)
|
||||
}),
|
||||
created_by=current_user.username,
|
||||
domain_id=domain.id)
|
||||
history.add()
|
||||
@@ -1088,7 +1127,7 @@ def saml_authorized():
|
||||
req = saml.prepare_flask_request(request)
|
||||
auth = saml.init_saml_auth(req)
|
||||
auth.process_response()
|
||||
current_app.logger.debug( auth.get_attributes() )
|
||||
current_app.logger.debug(auth.get_attributes())
|
||||
errors = auth.get_errors()
|
||||
if len(errors) == 0:
|
||||
session['samlUserdata'] = auth.get_attributes()
|
||||
@@ -1097,7 +1136,7 @@ def saml_authorized():
|
||||
self_url = OneLogin_Saml2_Utils.get_self_url(req)
|
||||
self_url = self_url + req['script_name']
|
||||
if 'RelayState' in request.form and self_url != request.form[
|
||||
'RelayState']:
|
||||
'RelayState']:
|
||||
return redirect(auth.redirect_to(request.form['RelayState']))
|
||||
if current_app.config.get('SAML_ATTRIBUTE_USERNAME', False):
|
||||
username = session['samlUserdata'][
|
||||
@@ -1129,7 +1168,7 @@ def saml_authorized():
|
||||
admin_group_name = current_app.config.get('SAML_GROUP_ADMIN_NAME',
|
||||
None)
|
||||
operator_group_name = current_app.config.get('SAML_GROUP_OPERATOR_NAME',
|
||||
None)
|
||||
None)
|
||||
group_to_account_mapping = create_group_to_account_mapping()
|
||||
|
||||
if email_attribute_name in session['samlUserdata']:
|
||||
@@ -1170,13 +1209,13 @@ def saml_authorized():
|
||||
account.add_user(user)
|
||||
history = History(msg='Adding {0} to account {1}'.format(
|
||||
user.username, account.name),
|
||||
created_by='SAML Assertion')
|
||||
created_by='SAML Assertion')
|
||||
history.add()
|
||||
for account in user_accounts - saml_accounts:
|
||||
account.remove_user(user)
|
||||
history = History(msg='Removing {0} from account {1}'.format(
|
||||
user.username, account.name),
|
||||
created_by='SAML Assertion')
|
||||
created_by='SAML Assertion')
|
||||
history.add()
|
||||
if admin_attribute_name and 'true' in session['samlUserdata'].get(
|
||||
admin_attribute_name, []):
|
||||
@@ -1190,7 +1229,7 @@ def saml_authorized():
|
||||
user.role_id = Role.query.filter_by(name='User').first().id
|
||||
history = History(msg='Demoting {0} to user'.format(
|
||||
user.username),
|
||||
created_by='SAML Assertion')
|
||||
created_by='SAML Assertion')
|
||||
history.add()
|
||||
user.plain_text_password = None
|
||||
user.update_profile()
|
||||
@@ -1234,15 +1273,16 @@ def uplift_to_admin(user):
|
||||
user.role_id = Role.query.filter_by(name='Administrator').first().id
|
||||
history = History(msg='Promoting {0} to administrator'.format(
|
||||
user.username),
|
||||
created_by='SAML Assertion')
|
||||
created_by='SAML Assertion')
|
||||
history.add()
|
||||
|
||||
|
||||
def uplift_to_operator(user):
|
||||
if user.role.name != 'Operator':
|
||||
user.role_id = Role.query.filter_by(name='Operator').first().id
|
||||
history = History(msg='Promoting {0} to operator'.format(
|
||||
user.username),
|
||||
created_by='SAML Assertion')
|
||||
created_by='SAML Assertion')
|
||||
history.add()
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user