feat: Allow underscores and hyphens in account name (#1047)

This commit is contained in:
jbe-dw 2022-06-18 15:14:37 +02:00 committed by GitHub
commit 2c0225e961
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 76 additions and 45 deletions

View File

@ -136,6 +136,13 @@ class AccountNotExists(StructuredException):
self.message = message self.message = message
self.name = name self.name = name
class InvalidAccountNameException(StructuredException):
status_code = 400
def __init__(self, name=None, message="The account name is invalid"):
StructuredException.__init__(self)
self.message = message
self.name = name
class UserCreateFail(StructuredException): class UserCreateFail(StructuredException):
status_code = 500 status_code = 500
@ -145,7 +152,6 @@ class UserCreateFail(StructuredException):
self.message = message self.message = message
self.name = name self.name = name
class UserCreateDuplicate(StructuredException): class UserCreateDuplicate(StructuredException):
status_code = 409 status_code = 409

View File

@ -3,6 +3,7 @@ from flask import current_app
from urllib.parse import urljoin from urllib.parse import urljoin
from ..lib import utils from ..lib import utils
from ..lib.errors import InvalidAccountNameException
from .base import db from .base import db
from .setting import Setting from .setting import Setting
from .user import User from .user import User
@ -22,7 +23,7 @@ class Account(db.Model):
back_populates="accounts") back_populates="accounts")
def __init__(self, name=None, description=None, contact=None, mail=None): def __init__(self, name=None, description=None, contact=None, mail=None):
self.name = name self.name = Account.sanitize_name(name) if name is not None else name
self.description = description self.description = description
self.contact = contact self.contact = contact
self.mail = mail self.mail = mail
@ -33,9 +34,30 @@ class Account(db.Model):
self.PDNS_VERSION = Setting().get('pdns_version') self.PDNS_VERSION = Setting().get('pdns_version')
self.API_EXTENDED_URL = utils.pdns_api_extended_uri(self.PDNS_VERSION) self.API_EXTENDED_URL = utils.pdns_api_extended_uri(self.PDNS_VERSION)
if self.name is not None:
self.name = ''.join(c for c in self.name.lower() @staticmethod
if c in "abcdefghijklmnopqrstuvwxyz0123456789") def sanitize_name(name):
"""
Formats the provided name to fit into the constraint
"""
if not isinstance(name, str):
raise InvalidAccountNameException("Account name must be a string")
allowed_characters = "abcdefghijklmnopqrstuvwxyz0123456789"
if Setting().get('account_name_extra_chars'):
allowed_characters += "_-."
sanitized_name = ''.join(c for c in name.lower() if c in allowed_characters)
if len(sanitized_name) > Account.name.type.length:
current_app.logger.error("Account name {0} too long. Truncated to: {1}".format(
sanitized_name, sanitized_name[:Account.name.type.length]))
if not sanitized_name:
raise InvalidAccountNameException("Empty string is not a valid account name")
return sanitized_name[:Account.name.type.length]
def __repr__(self): def __repr__(self):
return '<Account {0}r>'.format(self.name) return '<Account {0}r>'.format(self.name)
@ -68,11 +90,9 @@ class Account(db.Model):
""" """
Create a new account Create a new account
""" """
# Sanity check - account name self.name = Account.sanitize_name(self.name)
if self.name == "":
return {'status': False, 'msg': 'No account name specified'}
# check that account name is not already used # Check that account name is not already used
account = Account.query.filter(Account.name == self.name).first() account = Account.query.filter(Account.name == self.name).first()
if account: if account:
return {'status': False, 'msg': 'Account already exists'} return {'status': False, 'msg': 'Account already exists'}

View File

@ -192,7 +192,8 @@ class Setting(db.Model):
'custom_css': '', 'custom_css': '',
'otp_force': False, 'otp_force': False,
'max_history_records': 1000, 'max_history_records': 1000,
'deny_domain_override': False 'deny_domain_override': False,
'account_name_extra_chars': False
} }
def __init__(self, id=None, name=None, value=None): def __init__(self, id=None, name=None, value=None):
@ -273,15 +274,15 @@ class Setting(db.Model):
def get(self, setting): def get(self, setting):
if setting in self.defaults: if setting in self.defaults:
if setting.upper() in current_app.config: if setting.upper() in current_app.config:
result = current_app.config[setting.upper()] result = current_app.config[setting.upper()]
else: else:
result = self.query.filter(Setting.name == setting).first() result = self.query.filter(Setting.name == setting).first()
if result is not None: if result is not None:
if hasattr(result,'value'): if hasattr(result,'value'):
result = result.value result = result.value
return strtobool(result) if result in [ return strtobool(result) if result in [
'True', 'False' 'True', 'False'
] else result ] else result
@ -289,7 +290,7 @@ class Setting(db.Model):
return self.defaults[setting] return self.defaults[setting]
else: else:
current_app.logger.error('Unknown setting queried: {0}'.format(setting)) current_app.logger.error('Unknown setting queried: {0}'.format(setting))
def get_records_allow_to_edit(self): def get_records_allow_to_edit(self):
return list( return list(
set(self.get_forward_records_allow_to_edit() + set(self.get_forward_records_allow_to_edit() +

View File

@ -1268,7 +1268,8 @@ def setting_basic():
'allow_user_create_domain', 'allow_user_remove_domain', 'allow_user_view_history', 'bg_domain_updates', 'site_name', 'allow_user_create_domain', 'allow_user_remove_domain', 'allow_user_view_history', 'bg_domain_updates', 'site_name',
'session_timeout', 'warn_session_timeout', 'ttl_options', 'session_timeout', 'warn_session_timeout', 'ttl_options',
'pdns_api_timeout', 'verify_ssl_connections', 'verify_user_email', 'pdns_api_timeout', 'verify_ssl_connections', 'verify_user_email',
'delete_sso_accounts', 'otp_field_enabled', 'custom_css', 'enable_api_rr_history', 'max_history_records', 'otp_force', 'deny_domain_override', 'enforce_api_ttl' 'delete_sso_accounts', 'otp_field_enabled', 'custom_css', 'enable_api_rr_history', 'max_history_records', 'otp_force',
'deny_domain_override', 'enforce_api_ttl', 'account_name_extra_chars'
] ]
return render_template('admin_setting_basic.html', settings=settings) return render_template('admin_setting_basic.html', settings=settings)

View File

@ -23,7 +23,7 @@ from ..lib.errors import (
AccountCreateFail, AccountUpdateFail, AccountDeleteFail, AccountCreateFail, AccountUpdateFail, AccountDeleteFail,
AccountCreateDuplicate, AccountNotExists, AccountCreateDuplicate, AccountNotExists,
UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail, UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail,
UserUpdateFailEmail UserUpdateFailEmail, InvalidAccountNameException
) )
from ..decorators import ( from ..decorators import (
api_basic_auth, api_can_create_domain, is_json, apikey_auth, api_basic_auth, api_can_create_domain, is_json, apikey_auth,
@ -870,12 +870,15 @@ def api_create_account():
contact = data['contact'] if 'contact' in data else None contact = data['contact'] if 'contact' in data else None
mail = data['mail'] if 'mail' in data else None mail = data['mail'] if 'mail' in data else None
if not name: if not name:
current_app.logger.debug("Account name missing") current_app.logger.debug("Account creation failed: name missing")
abort(400) raise InvalidAccountNameException(message="Account name missing")
sanitized_name = Account.sanitize_name(name)
account_exists = Account.query.filter(Account.name == sanitized_name).all()
account_exists = [] or Account.query.filter(Account.name == name).all()
if len(account_exists) > 0: if len(account_exists) > 0:
msg = "Account {} already exists".format(name) msg = ("Requested Account {} would be translated to {}"
" which already exists").format(name, sanitized_name)
current_app.logger.debug(msg) current_app.logger.debug(msg)
raise AccountCreateDuplicate(message=msg) raise AccountCreateDuplicate(message=msg)
@ -913,8 +916,9 @@ def api_update_account(account_id):
if not account: if not account:
abort(404) abort(404)
if name and name != account.name: if name and Account.sanitize_name(name) != account.name:
abort(400) msg = "Account name is immutable"
raise AccountUpdateFail(message=msg)
if current_user.role.name not in ['Administrator', 'Operator']: if current_user.role.name not in ['Administrator', 'Operator']:
msg = "User role update accounts" msg = "User role update accounts"
@ -1212,13 +1216,13 @@ def sync_domains():
def health(): def health():
domain = Domain() domain = Domain()
domain_to_query = domain.query.first() domain_to_query = domain.query.first()
if not domain_to_query: if not domain_to_query:
current_app.logger.error("No domain found to query a health check") current_app.logger.error("No domain found to query a health check")
return make_response("Unknown", 503) return make_response("Unknown", 503)
try: try:
domain.get_domain_info(domain_to_query.name) domain.get_domain_info(domain_to_query.name)
except Exception as e: except Exception as e:
current_app.logger.error("Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name)) current_app.logger.error("Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name))
return make_response("Down", 503) return make_response("Down", 503)

View File

@ -323,15 +323,15 @@ def login():
# Regexp didn't match, continue to next iteration # Regexp didn't match, continue to next iteration
continue continue
account = Account() sanitized_group_name = Account.sanitize_name(group_name)
account_id = account.get_id_by_name(account_name=group_name) account_id = account.get_id_by_name(account_name=sanitized_group_name)
if account_id: if account_id:
account = Account.query.get(account_id) account = Account.query.get(account_id)
# check if user has permissions # check if user has permissions
account_users = account.get_user() account_users = account.get_user()
current_app.logger.info('Group: {} Users: {}'.format( current_app.logger.info('Group: {} Users: {}'.format(
group_name, group_name,
account_users)) account_users))
if user.id in account_users: if user.id in account_users:
current_app.logger.info('User id {} is already in account {}'.format( current_app.logger.info('User id {} is already in account {}'.format(
@ -345,13 +345,15 @@ def login():
current_app.logger.info('User {} added to Account {}'.format( current_app.logger.info('User {} added to Account {}'.format(
user.username, account.name)) user.username, account.name))
else: else:
account.name = group_name account = Account(
account.description = group_description name=sanitized_group_name,
account.contact = '' description=group_description,
account.mail = '' contact='',
mail=''
)
account.create_account() account.create_account()
history = History(msg='Create account {0}'.format( history = History(msg='Create account {0}'.format(
account.name), account.name),
created_by='System') created_by='System')
history.add() history.add()
@ -401,7 +403,7 @@ def login():
if name_prop in me and desc_prop in me: if name_prop in me and desc_prop in me:
accounts_name_prop = [me[name_prop]] if type(me[name_prop]) is not list else me[name_prop] accounts_name_prop = [me[name_prop]] if type(me[name_prop]) is not list else me[name_prop]
accounts_desc_prop = [me[desc_prop]] if type(me[desc_prop]) is not list else me[desc_prop] accounts_desc_prop = [me[desc_prop]] if type(me[desc_prop]) is not list else me[desc_prop]
#Run on all groups the user is in by the index num. #Run on all groups the user is in by the index num.
for i in range(len(accounts_name_prop)): for i in range(len(accounts_name_prop)):
description = '' description = ''
@ -411,7 +413,7 @@ def login():
account_to_add.append(account) account_to_add.append(account)
user_accounts = user.get_accounts() user_accounts = user.get_accounts()
# Add accounts # Add accounts
for account in account_to_add: for account in account_to_add:
if account not in user_accounts: if account not in user_accounts:
@ -485,13 +487,13 @@ def login():
saml_enabled=SAML_ENABLED, saml_enabled=SAML_ENABLED,
error='Token required') error='Token required')
if Setting().get('autoprovisioning') and auth_method!='LOCAL': if Setting().get('autoprovisioning') and auth_method!='LOCAL':
urn_value=Setting().get('urn_value') urn_value=Setting().get('urn_value')
Entitlements=user.read_entitlements(Setting().get('autoprovisioning_attribute')) Entitlements=user.read_entitlements(Setting().get('autoprovisioning_attribute'))
if len(Entitlements)==0 and Setting().get('purge'): if len(Entitlements)==0 and Setting().get('purge'):
user.set_role("User") user.set_role("User")
user.revoke_privilege(True) user.revoke_privilege(True)
elif len(Entitlements)!=0: elif len(Entitlements)!=0:
if checkForPDAEntries(Entitlements, urn_value): if checkForPDAEntries(Entitlements, urn_value):
user.updateUser(Entitlements) user.updateUser(Entitlements)
@ -1092,14 +1094,10 @@ def create_group_to_account_mapping():
def handle_account(account_name, account_description=""): def handle_account(account_name, account_description=""):
clean_name = ''.join(c for c in account_name.lower() clean_name = Account.sanitize_name(account_name)
if c in "abcdefghijklmnopqrstuvwxyz0123456789")
if len(clean_name) > Account.name.type.length:
current_app.logger.error(
"Account name {0} too long. Truncated.".format(clean_name))
account = Account.query.filter_by(name=clean_name).first() account = Account.query.filter_by(name=clean_name).first()
if not account: if not account:
account = Account(name=clean_name.lower(), account = Account(name=clean_name,
description=account_description, description=account_description,
contact='', contact='',
mail='') mail='')

View File

@ -49,7 +49,7 @@
<span class="fa fa-cog form-control-feedback"></span> <span class="fa fa-cog form-control-feedback"></span>
{% if invalid_accountname %} {% if invalid_accountname %}
<span class="help-block">Cannot be blank and must only contain alphanumeric <span class="help-block">Cannot be blank and must only contain alphanumeric
characters.</span> characters{% if SETTING.get('account_name_extra_chars') %}, dots, hyphens or underscores{% endif %}.</span>
{% elif duplicate_accountname %} {% elif duplicate_accountname %}
<span class="help-block">Account name already in use.</span> <span class="help-block">Account name already in use.</span>
{% endif %} {% endif %}
@ -112,8 +112,9 @@
</p> </p>
<p>Fill in all the fields to the in the form to the left.</p> <p>Fill in all the fields to the in the form to the left.</p>
<p> <p>
<strong>Name</strong> is an account identifier. It will be stored as all lowercase letters (no <strong>Name</strong> is an account identifier. It will be lowercased and can contain alphanumeric
spaces, special characters etc).<br /> characters{% if SETTING.get('account_name_extra_chars') %}, dots, hyphens and underscores (no space or other special character is allowed)
{% else %} (no extra character is allowed){% endif %}.<br />
<strong>Description</strong> is a user friendly name for this account.<br /> <strong>Description</strong> is a user friendly name for this account.<br />
<strong>Contact person</strong> is the name of a contact person at the account.<br /> <strong>Contact person</strong> is the name of a contact person at the account.<br />
<strong>Mail Address</strong> is an e-mail address for the contact person. <strong>Mail Address</strong> is an e-mail address for the contact person.