mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2024-12-04 19:15:30 +00:00
feat: Move the account parse calls to a method
This commit is contained in:
parent
eb13b37e09
commit
a87b931520
@ -136,6 +136,13 @@ class AccountNotExists(StructuredException):
|
||||
self.message = message
|
||||
self.name = name
|
||||
|
||||
class InvalidAccountNameException(StructuredException):
|
||||
status_code = 400
|
||||
|
||||
def __init__(self, name=None, message="The account name is invalid"):
|
||||
StructuredException.__init__(self)
|
||||
self.message = message
|
||||
self.name = name
|
||||
|
||||
class UserCreateFail(StructuredException):
|
||||
status_code = 500
|
||||
@ -145,7 +152,6 @@ class UserCreateFail(StructuredException):
|
||||
self.message = message
|
||||
self.name = name
|
||||
|
||||
|
||||
class UserCreateDuplicate(StructuredException):
|
||||
status_code = 409
|
||||
|
||||
|
@ -3,6 +3,7 @@ from flask import current_app
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from ..lib import utils
|
||||
from ..lib.errors import InvalidAccountNameException
|
||||
from .base import db
|
||||
from .setting import Setting
|
||||
from .user import User
|
||||
@ -22,7 +23,7 @@ class Account(db.Model):
|
||||
back_populates="accounts")
|
||||
|
||||
def __init__(self, name=None, description=None, contact=None, mail=None):
|
||||
self.name = name
|
||||
self.name = Account.sanitize_name(name) if name is not None else name
|
||||
self.description = description
|
||||
self.contact = contact
|
||||
self.mail = mail
|
||||
@ -33,13 +34,30 @@ class Account(db.Model):
|
||||
self.PDNS_VERSION = Setting().get('pdns_version')
|
||||
self.API_EXTENDED_URL = utils.pdns_api_extended_uri(self.PDNS_VERSION)
|
||||
|
||||
if self.name is not None:
|
||||
if Setting().get('account_name_extra_chars'):
|
||||
char_list = "abcdefghijklmnopqrstuvwxyz0123456789_-."
|
||||
else:
|
||||
char_list = "abcdefghijklmnopqrstuvwxyz0123456789"
|
||||
self.name = ''.join(c for c in self.name.lower()
|
||||
if c in char_list)
|
||||
|
||||
@staticmethod
|
||||
def sanitize_name(name):
|
||||
"""
|
||||
Formats the provided name to fit into the constraint
|
||||
"""
|
||||
if not isinstance(name, str):
|
||||
raise InvalidAccountNameException("Account name must be a string")
|
||||
|
||||
allowed_characters = "abcdefghijklmnopqrstuvwxyz0123456789"
|
||||
|
||||
if Setting().get('account_name_extra_chars'):
|
||||
allowed_characters += "_-."
|
||||
|
||||
sanitized_name = ''.join(c for c in name.lower() if c in allowed_characters)
|
||||
|
||||
if len(sanitized_name) > Account.name.type.length:
|
||||
current_app.logger.error("Account name {0} too long. Truncated to: {1}".format(
|
||||
sanitized_name, sanitized_name[:Account.name.type.length]))
|
||||
|
||||
if not sanitized_name:
|
||||
raise InvalidAccountNameException("Empty string is not a valid account name")
|
||||
|
||||
return sanitized_name[:Account.name.type.length]
|
||||
|
||||
def __repr__(self):
|
||||
return '<Account {0}r>'.format(self.name)
|
||||
@ -72,11 +90,9 @@ class Account(db.Model):
|
||||
"""
|
||||
Create a new account
|
||||
"""
|
||||
# Sanity check - account name
|
||||
if self.name == "":
|
||||
return {'status': False, 'msg': 'No account name specified'}
|
||||
self.name = Account.sanitize_name(self.name)
|
||||
|
||||
# check that account name is not already used
|
||||
# Check that account name is not already used
|
||||
account = Account.query.filter(Account.name == self.name).first()
|
||||
if account:
|
||||
return {'status': False, 'msg': 'Account already exists'}
|
||||
|
@ -23,7 +23,7 @@ from ..lib.errors import (
|
||||
AccountCreateFail, AccountUpdateFail, AccountDeleteFail,
|
||||
AccountCreateDuplicate, AccountNotExists,
|
||||
UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail,
|
||||
UserUpdateFailEmail
|
||||
UserUpdateFailEmail, InvalidAccountNameException
|
||||
)
|
||||
from ..decorators import (
|
||||
api_basic_auth, api_can_create_domain, is_json, apikey_auth,
|
||||
@ -870,12 +870,15 @@ def api_create_account():
|
||||
contact = data['contact'] if 'contact' in data else None
|
||||
mail = data['mail'] if 'mail' in data else None
|
||||
if not name:
|
||||
current_app.logger.debug("Account name missing")
|
||||
abort(400)
|
||||
current_app.logger.debug("Account creation failed: name missing")
|
||||
raise InvalidAccountNameException(message="Account name missing")
|
||||
|
||||
sanitized_name = Account.sanitize_name(name)
|
||||
account_exists = Account.query.filter(Account.name == sanitized_name).all()
|
||||
|
||||
account_exists = [] or Account.query.filter(Account.name == name).all()
|
||||
if len(account_exists) > 0:
|
||||
msg = "Account {} already exists".format(name)
|
||||
msg = ("Requested Account {} would be translated to {}"
|
||||
" which already exists").format(name, sanitized_name)
|
||||
current_app.logger.debug(msg)
|
||||
raise AccountCreateDuplicate(message=msg)
|
||||
|
||||
@ -913,8 +916,9 @@ def api_update_account(account_id):
|
||||
if not account:
|
||||
abort(404)
|
||||
|
||||
if name and name != account.name:
|
||||
abort(400)
|
||||
if name and Account.sanitize_name(name) != account.name:
|
||||
msg = "Account name is immutable"
|
||||
raise AccountUpdateFail(message=msg)
|
||||
|
||||
if current_user.role.name not in ['Administrator', 'Operator']:
|
||||
msg = "User role update accounts"
|
||||
@ -1212,13 +1216,13 @@ def sync_domains():
|
||||
def health():
|
||||
domain = Domain()
|
||||
domain_to_query = domain.query.first()
|
||||
|
||||
|
||||
if not domain_to_query:
|
||||
current_app.logger.error("No domain found to query a health check")
|
||||
return make_response("Unknown", 503)
|
||||
|
||||
try:
|
||||
domain.get_domain_info(domain_to_query.name)
|
||||
domain.get_domain_info(domain_to_query.name)
|
||||
except Exception as e:
|
||||
current_app.logger.error("Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name))
|
||||
return make_response("Down", 503)
|
||||
|
@ -323,8 +323,8 @@ def login():
|
||||
# Regexp didn't match, continue to next iteration
|
||||
continue
|
||||
|
||||
account = Account()
|
||||
account_id = account.get_id_by_name(account_name=group_name)
|
||||
sanitized_group_name = Account.sanitize_name(group_name)
|
||||
account_id = account.get_id_by_name(account_name=sanitized_group_name)
|
||||
|
||||
if account_id:
|
||||
account = Account.query.get(account_id)
|
||||
@ -345,10 +345,12 @@ def login():
|
||||
current_app.logger.info('User {} added to Account {}'.format(
|
||||
user.username, account.name))
|
||||
else:
|
||||
account.name = group_name
|
||||
account.description = group_description
|
||||
account.contact = ''
|
||||
account.mail = ''
|
||||
account = Account(
|
||||
name=sanitized_group_name,
|
||||
description=group_description,
|
||||
contact='',
|
||||
mail=''
|
||||
)
|
||||
account.create_account()
|
||||
history = History(msg='Create account {0}'.format(
|
||||
account.name),
|
||||
@ -1092,18 +1094,10 @@ def create_group_to_account_mapping():
|
||||
|
||||
|
||||
def handle_account(account_name, account_description=""):
|
||||
if Setting().get('account_name_extra_chars'):
|
||||
char_list = "abcdefghijklmnopqrstuvwxyz0123456789_-."
|
||||
else:
|
||||
char_list = "abcdefghijklmnopqrstuvwxyz0123456789"
|
||||
clean_name = ''.join(c for c in account_name.lower()
|
||||
if c in char_list)
|
||||
if len(clean_name) > Account.name.type.length:
|
||||
current_app.logger.error(
|
||||
"Account name {0} too long. Truncated.".format(clean_name))
|
||||
clean_name = Account.sanitize_name(account_name)
|
||||
account = Account.query.filter_by(name=clean_name).first()
|
||||
if not account:
|
||||
account = Account(name=clean_name.lower(),
|
||||
account = Account(name=clean_name,
|
||||
description=account_description,
|
||||
contact='',
|
||||
mail='')
|
||||
|
Loading…
Reference in New Issue
Block a user