import json import datetime import traceback import re from base64 import b64encode from ast import literal_eval from flask import Blueprint, render_template, make_response, url_for, current_app, request, redirect, jsonify, abort, flash, session from flask_login import login_required, current_user from ..decorators import operator_role_required, admin_role_required, history_access_required from ..models.user import User from ..models.account import Account from ..models.account_user import AccountUser from ..models.role import Role from ..models.server import Server from ..models.setting import Setting from ..models.history import History from ..models.domain import Domain from ..models.domain_user import DomainUser from ..models.record import Record from ..models.domain_template import DomainTemplate from ..models.domain_template_record import DomainTemplateRecord from ..models.api_key import ApiKey from ..models.base import db from ..lib.schema import ApiPlainKeySchema apikey_plain_schema = ApiPlainKeySchema(many=True) admin_bp = Blueprint('admin', __name__, template_folder='templates', url_prefix='/admin') @admin_bp.before_request def before_request(): # Manage session timeout session.permanent = True current_app.permanent_session_lifetime = datetime.timedelta( minutes=int(Setting().get('session_timeout'))) session.modified = True @admin_bp.route('/pdns', methods=['GET']) @login_required @operator_role_required def pdns_stats(): if not Setting().get('pdns_api_url') or not Setting().get( 'pdns_api_key') or not Setting().get('pdns_version'): return redirect(url_for('admin.setting_pdns')) domains = Domain.query.all() users = User.query.all() server = Server(server_id='localhost') configs = server.get_config() statistics = server.get_statistic() history_number = History.query.count() if statistics: uptime = list([ uptime for uptime in statistics if uptime['name'] == 'uptime' ])[0]['value'] else: uptime = 0 return render_template('admin_pdns_stats.html', domains=domains, users=users, configs=configs, statistics=statistics, uptime=uptime, history_number=history_number) @admin_bp.route('/user/edit/', methods=['GET', 'POST']) @admin_bp.route('/user/edit', methods=['GET', 'POST']) @login_required @operator_role_required def edit_user(user_username=None): if user_username: user = User.query.filter(User.username == user_username).first() create = False if not user: return render_template('errors/404.html'), 404 if user.role.name == 'Administrator' and current_user.role.name != 'Administrator': return render_template('errors/401.html'), 401 else: user = None create = True if request.method == 'GET': return render_template('admin_edit_user.html', user=user, create=create) elif request.method == 'POST': fdata = request.form if create: user_username = fdata.get('username', '').strip() user = User(username=user_username, plain_text_password=fdata.get('password', ''), firstname=fdata.get('firstname', '').strip(), lastname=fdata.get('lastname', '').strip(), email=fdata.get('email', '').strip(), reload_info=False) if create: if not fdata.get('password', ''): return render_template('admin_edit_user.html', user=user, create=create, blank_password=True) result = user.create_local_user() history = History(msg='Created user {0}'.format(user.username), created_by=current_user.username) else: result = user.update_local_user() history = History(msg='Updated user {0}'.format(user.username), created_by=current_user.username) if result['status']: history.add() return redirect(url_for('admin.manage_user')) return render_template('admin_edit_user.html', user=user, create=create, error=result['msg']) @admin_bp.route('/key/edit/', methods=['GET', 'POST']) @admin_bp.route('/key/edit', methods=['GET', 'POST']) @login_required @operator_role_required def edit_key(key_id=None): domains = Domain.query.all() roles = Role.query.all() apikey = None create = True plain_key = None if key_id: apikey = ApiKey.query.filter(ApiKey.id == key_id).first() create = False if not apikey: return render_template('errors/404.html'), 404 if request.method == 'GET': return render_template('admin_edit_key.html', key=apikey, domains=domains, roles=roles, create=create) if request.method == 'POST': fdata = request.form description = fdata['description'] role = fdata.getlist('key_role')[0] doamin_list = fdata.getlist('key_multi_domain') # Create new apikey if create: domain_obj_list = Domain.query.filter(Domain.name.in_(doamin_list)).all() apikey = ApiKey(desc=description, role_name=role, domains=domain_obj_list) try: apikey.create() except Exception as e: current_app.logger.error('Error: {0}'.format(e)) raise ApiKeyCreateFail(message='Api key create failed') plain_key = apikey_plain_schema.dump([apikey])[0]["plain_key"] plain_key = b64encode(plain_key.encode('utf-8')).decode('utf-8') history_message = "Created API key {0}".format(apikey.id) # Update existing apikey else: try: apikey.update(role,description,doamin_list) history_message = "Updated API key {0}".format(apikey.id) except Exception as e: current_app.logger.error('Error: {0}'.format(e)) history = History(msg=history_message, detail=str({ 'key': apikey.id, 'role': apikey.role.name, 'description': apikey.description, 'domain_acl': [domain.name for domain in apikey.domains] }), created_by=current_user.username) history.add() return render_template('admin_edit_key.html', key=apikey, domains=domains, roles=roles, create=create, plain_key=plain_key) @admin_bp.route('/manage-keys', methods=['GET', 'POST']) @login_required @operator_role_required def manage_keys(): if request.method == 'GET': try: apikeys = ApiKey.query.all() except Exception as e: current_app.logger.error('Error: {0}'.format(e)) abort(500) return render_template('admin_manage_keys.html', keys=apikeys) elif request.method == 'POST': jdata = request.json if jdata['action'] == 'delete_key': apikey = ApiKey.query.get(jdata['data']) try: history_apikey_id = apikey.id history_apikey_role = apikey.role.name history_apikey_description = apikey.description history_apikey_domains = [ domain.name for domain in apikey.domains] apikey.delete() except Exception as e: current_app.logger.error('Error: {0}'.format(e)) current_app.logger.info('Delete API key {0}'.format(apikey.id)) history = History(msg='Delete API key {0}'.format(apikey.id), detail=str({ 'key': history_apikey_id, 'role': history_apikey_role, 'description': history_apikey_description, 'domains': history_apikey_domains }), created_by=current_user.username) history.add() return make_response( jsonify({ 'status': 'ok', 'msg': 'Key has been removed.' }), 200) @admin_bp.route('/manage-user', methods=['GET', 'POST']) @login_required @operator_role_required def manage_user(): if request.method == 'GET': roles = Role.query.all() users = User.query.order_by(User.username).all() return render_template('admin_manage_user.html', users=users, roles=roles) if request.method == 'POST': # # post data should in format # {'action': 'delete_user', 'data': 'username'} # try: jdata = request.json data = jdata['data'] if jdata['action'] == 'user_otp_disable': user = User(username=data) result = user.update_profile(enable_otp=False) if result: history = History( msg='Two factor authentication disabled for user {0}'. format(data), created_by=current_user.username) history.add() return make_response( jsonify({ 'status': 'ok', 'msg': 'Two factor authentication has been disabled for user.' }), 200) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Cannot disable two factor authentication for user.' }), 500) elif jdata['action'] == 'delete_user': user = User(username=data) if user.username == current_user.username: return make_response( jsonify({ 'status': 'error', 'msg': 'You cannot delete yourself.' }), 400) # Remove account associations first user_accounts = Account.query.join(AccountUser).join( User).filter(AccountUser.user_id == user.id, AccountUser.account_id == Account.id).all() for uc in user_accounts: uc.revoke_privileges_by_id(user.id) # Then delete the user result = user.delete() if result: history = History(msg='Delete user {0}'.format(data), created_by=current_user.username) history.add() return make_response( jsonify({ 'status': 'ok', 'msg': 'User has been removed.' }), 200) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Cannot remove user.' }), 500) elif jdata['action'] == 'revoke_user_privileges': user = User(username=data) result = user.revoke_privilege() if result: history = History( msg='Revoke {0} user privileges'.format(data), created_by=current_user.username) history.add() return make_response( jsonify({ 'status': 'ok', 'msg': 'Revoked user privileges.' }), 200) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Cannot revoke user privilege.' }), 500) elif jdata['action'] == 'update_user_role': username = data['username'] role_name = data['role_name'] if username == current_user.username: return make_response( jsonify({ 'status': 'error', 'msg': 'You cannot change you own roles.' }), 400) user = User.query.filter(User.username == username).first() if not user: return make_response( jsonify({ 'status': 'error', 'msg': 'User does not exist.' }), 404) if user.role.name == 'Administrator' and current_user.role.name != 'Administrator': return make_response( jsonify({ 'status': 'error', 'msg': 'You do not have permission to change Administrator users role.' }), 400) if role_name == 'Administrator' and current_user.role.name != 'Administrator': return make_response( jsonify({ 'status': 'error', 'msg': 'You do not have permission to promote a user to Administrator role.' }), 400) user = User(username=username) result = user.set_role(role_name) if result['status']: history = History( msg='Change user role of {0} to {1}'.format( username, role_name), created_by=current_user.username) history.add() return make_response( jsonify({ 'status': 'ok', 'msg': 'Changed user role successfully.' }), 200) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Cannot change user role. {0}'.format( result['msg']) }), 500) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Action not supported.' }), 400) except Exception as e: current_app.logger.error( 'Cannot update user. Error: {0}'.format(e)) current_app.logger.debug(traceback.format_exc()) return make_response( jsonify({ 'status': 'error', 'msg': 'There is something wrong, please contact Administrator.' }), 400) @admin_bp.route('/account/edit/', methods=['GET', 'POST']) @admin_bp.route('/account/edit', methods=['GET', 'POST']) @login_required @operator_role_required def edit_account(account_name=None): users = User.query.all() if request.method == 'GET': if account_name is None: return render_template('admin_edit_account.html', account_user_ids=[], users=users, create=1) else: account = Account.query.filter( Account.name == account_name).first() account_user_ids = account.get_user() return render_template('admin_edit_account.html', account=account, account_user_ids=account_user_ids, users=users, create=0) if request.method == 'POST': fdata = request.form new_user_list = request.form.getlist('account_multi_user') # on POST, synthesize account and account_user_ids from form data if not account_name: account_name = fdata['accountname'] account = Account(name=account_name, description=fdata['accountdescription'], contact=fdata['accountcontact'], mail=fdata['accountmail']) account_user_ids = [] for username in new_user_list: userid = User(username=username).get_user_info_by_username().id account_user_ids.append(userid) create = int(fdata['create']) if create: # account __init__ sanitizes and lowercases the name, so to manage expectations # we let the user reenter the name until it's not empty and it's valid (ignoring the case) if account.name == "" or account.name != account_name.lower(): return render_template('admin_edit_account.html', account=account, account_user_ids=account_user_ids, users=users, create=create, invalid_accountname=True) if Account.query.filter(Account.name == account.name).first(): return render_template('admin_edit_account.html', account=account, account_user_ids=account_user_ids, users=users, create=create, duplicate_accountname=True) result = account.create_account() history = History(msg='Create account {0}'.format(account.name), created_by=current_user.username) else: result = account.update_account() history = History(msg='Update account {0}'.format(account.name), created_by=current_user.username) if result['status']: account.grant_privileges(new_user_list) history.add() return redirect(url_for('admin.manage_account')) return render_template('admin_edit_account.html', account=account, account_user_ids=account_user_ids, users=users, create=create, error=result['msg']) @admin_bp.route('/manage-account', methods=['GET', 'POST']) @login_required @operator_role_required def manage_account(): if request.method == 'GET': accounts = Account.query.order_by(Account.name).all() for account in accounts: account.user_num = AccountUser.query.filter( AccountUser.account_id == account.id).count() return render_template('admin_manage_account.html', accounts=accounts) if request.method == 'POST': # # post data should in format # {'action': 'delete_account', 'data': 'accountname'} # try: jdata = request.json data = jdata['data'] if jdata['action'] == 'delete_account': account = Account.query.filter(Account.name == data).first() if not account: return make_response( jsonify({ 'status': 'error', 'msg': 'Account not found.' }), 404) # Remove account association from domains first for domain in account.domains: Domain(name=domain.name).assoc_account(None) # Then delete the account result = account.delete_account() if result: history = History(msg='Delete account {0}'.format(data), created_by=current_user.username) history.add() return make_response( jsonify({ 'status': 'ok', 'msg': 'Account has been removed.' }), 200) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Cannot remove account.' }), 500) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Action not supported.' }), 400) except Exception as e: current_app.logger.error( 'Cannot update account. Error: {0}'.format(e)) current_app.logger.debug(traceback.format_exc()) return make_response( jsonify({ 'status': 'error', 'msg': 'There is something wrong, please contact Administrator.' }), 400) @admin_bp.route('/history', methods=['GET', 'POST']) @login_required @history_access_required def history(): if request.method == 'POST': if current_user.role.name != 'Administrator': return make_response( jsonify({ 'status': 'error', 'msg': 'You do not have permission to remove history.' }), 401) h = History() result = h.remove_all() if result: history = History(msg='Remove all histories', created_by=current_user.username) history.add() return make_response( jsonify({ 'status': 'ok', 'msg': 'Changed user role successfully.' }), 200) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Can not remove histories.' }), 500) if request.method == 'GET': if current_user.role.name in [ 'Administrator', 'Operator' ]: histories = History.query.all() else: # if the user isn't an administrator or operator, # allow_user_view_history must be enabled to get here, # so include history for the domains for the user histories = db.session.query(History) \ .join(Domain, History.domain_id == Domain.id) \ .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ .outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .filter( db.or_( DomainUser.user_id == current_user.id, AccountUser.user_id == current_user.id )) return render_template('admin_history.html', histories=histories) @admin_bp.route('/setting/basic', methods=['GET']) @login_required @operator_role_required def setting_basic(): if request.method == 'GET': settings = [ 'maintenance', 'fullscreen_layout', 'record_helper', 'login_ldap_first', 'default_record_table_size', 'default_domain_table_size', 'auto_ptr', 'record_quick_edit', 'pretty_ipv6_ptr', 'dnssec_admins_only', 'allow_user_create_domain', 'allow_user_remove_domain', 'allow_user_view_history', 'bg_domain_updates', 'site_name', 'session_timeout', 'warn_session_timeout', 'ttl_options', 'pdns_api_timeout', 'verify_ssl_connections', 'verify_user_email', 'otp_field_enabled', 'custom_css', 'enable_api_rr_history' ] return render_template('admin_setting_basic.html', settings=settings) @admin_bp.route('/setting/basic//edit', methods=['POST']) @login_required @operator_role_required def setting_basic_edit(setting): jdata = request.json new_value = jdata['value'] result = Setting().set(setting, new_value) if (result): return make_response( jsonify({ 'status': 'ok', 'msg': 'Toggled setting successfully.' }), 200) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Unable to toggle setting.' }), 500) @admin_bp.route('/setting/basic//toggle', methods=['POST']) @login_required @operator_role_required def setting_basic_toggle(setting): result = Setting().toggle(setting) if (result): return make_response( jsonify({ 'status': 'ok', 'msg': 'Toggled setting successfully.' }), 200) else: return make_response( jsonify({ 'status': 'error', 'msg': 'Unable to toggle setting.' }), 500) @admin_bp.route('/setting/pdns', methods=['GET', 'POST']) @login_required @admin_role_required def setting_pdns(): if request.method == 'GET': pdns_api_url = Setting().get('pdns_api_url') pdns_api_key = Setting().get('pdns_api_key') pdns_version = Setting().get('pdns_version') return render_template('admin_setting_pdns.html', pdns_api_url=pdns_api_url, pdns_api_key=pdns_api_key, pdns_version=pdns_version) elif request.method == 'POST': pdns_api_url = request.form.get('pdns_api_url') pdns_api_key = request.form.get('pdns_api_key') pdns_version = request.form.get('pdns_version') Setting().set('pdns_api_url', pdns_api_url) Setting().set('pdns_api_key', pdns_api_key) Setting().set('pdns_version', pdns_version) return render_template('admin_setting_pdns.html', pdns_api_url=pdns_api_url, pdns_api_key=pdns_api_key, pdns_version=pdns_version) @admin_bp.route('/setting/dns-records', methods=['GET', 'POST']) @login_required @operator_role_required def setting_records(): if request.method == 'GET': _fr = Setting().get('forward_records_allow_edit') _rr = Setting().get('reverse_records_allow_edit') f_records = literal_eval(_fr) if isinstance(_fr, str) else _fr r_records = literal_eval(_rr) if isinstance(_rr, str) else _rr return render_template('admin_setting_records.html', f_records=f_records, r_records=r_records) elif request.method == 'POST': fr = {} rr = {} records = Setting().defaults['forward_records_allow_edit'] for r in records: fr[r] = True if request.form.get('fr_{0}'.format( r.lower())) else False rr[r] = True if request.form.get('rr_{0}'.format( r.lower())) else False Setting().set('forward_records_allow_edit', str(fr)) Setting().set('reverse_records_allow_edit', str(rr)) return redirect(url_for('admin.setting_records')) def has_an_auth_method(local_db_enabled=None, ldap_enabled=None, google_oauth_enabled=None, github_oauth_enabled=None, oidc_oauth_enabled=None, azure_oauth_enabled=None): if local_db_enabled is None: local_db_enabled = Setting().get('local_db_enabled') if ldap_enabled is None: ldap_enabled = Setting().get('ldap_enabled') if google_oauth_enabled is None: google_oauth_enabled = Setting().get('google_oauth_enabled') if github_oauth_enabled is None: github_oauth_enabled = Setting().get('github_oauth_enabled') if oidc_oauth_enabled is None: oidc_oauth_enabled = Setting().get('oidc_oauth_enabled') if azure_oauth_enabled is None: azure_oauth_enabled = Setting().get('azure_oauth_enabled') return local_db_enabled or ldap_enabled or google_oauth_enabled or github_oauth_enabled or oidc_oauth_enabled or azure_oauth_enabled @admin_bp.route('/setting/authentication', methods=['GET', 'POST']) @login_required @admin_role_required def setting_authentication(): if request.method == 'GET': return render_template('admin_setting_authentication.html') elif request.method == 'POST': conf_type = request.form.get('config_tab') result = None if conf_type == 'general': local_db_enabled = True if request.form.get( 'local_db_enabled') else False signup_enabled = True if request.form.get( 'signup_enabled', ) else False if not has_an_auth_method(local_db_enabled=local_db_enabled): result = { 'status': False, 'msg': 'Must have at least one authentication method enabled.' } else: Setting().set('local_db_enabled', local_db_enabled) Setting().set('signup_enabled', signup_enabled) result = {'status': True, 'msg': 'Saved successfully'} elif conf_type == 'ldap': ldap_enabled = True if request.form.get('ldap_enabled') else False if not has_an_auth_method(ldap_enabled=ldap_enabled): result = { 'status': False, 'msg': 'Must have at least one authentication method enabled.' } else: Setting().set('ldap_enabled', ldap_enabled) Setting().set('ldap_type', request.form.get('ldap_type')) Setting().set('ldap_uri', request.form.get('ldap_uri')) Setting().set('ldap_base_dn', request.form.get('ldap_base_dn')) Setting().set('ldap_admin_username', request.form.get('ldap_admin_username')) Setting().set('ldap_admin_password', request.form.get('ldap_admin_password')) Setting().set('ldap_filter_basic', request.form.get('ldap_filter_basic')) Setting().set('ldap_filter_group', request.form.get('ldap_filter_group')) Setting().set('ldap_filter_username', request.form.get('ldap_filter_username')) Setting().set('ldap_filter_groupname', request.form.get('ldap_filter_groupname')) Setting().set( 'ldap_sg_enabled', True if request.form.get('ldap_sg_enabled') == 'ON' else False) Setting().set('ldap_admin_group', request.form.get('ldap_admin_group')) Setting().set('ldap_operator_group', request.form.get('ldap_operator_group')) Setting().set('ldap_user_group', request.form.get('ldap_user_group')) Setting().set('ldap_domain', request.form.get('ldap_domain')) Setting().set( 'autoprovisioning', True if request.form.get('autoprovisioning') == 'ON' else False) Setting().set('autoprovisioning_attribute', request.form.get('autoprovisioning_attribute')) if request.form.get('autoprovisioning')=='ON': if validateURN(request.form.get('urn_value')): Setting().set('urn_value', request.form.get('urn_value')) else: return render_template('admin_setting_authentication.html', error="Invalid urn") else: Setting().set('urn_value', request.form.get('urn_value')) Setting().set('purge', True if request.form.get('purge') == 'ON' else False) result = {'status': True, 'msg': 'Saved successfully'} elif conf_type == 'google': google_oauth_enabled = True if request.form.get( 'google_oauth_enabled') else False if not has_an_auth_method(google_oauth_enabled=google_oauth_enabled): result = { 'status': False, 'msg': 'Must have at least one authentication method enabled.' } else: Setting().set('google_oauth_enabled', google_oauth_enabled) Setting().set('google_oauth_client_id', request.form.get('google_oauth_client_id')) Setting().set('google_oauth_client_secret', request.form.get('google_oauth_client_secret')) Setting().set('google_token_url', request.form.get('google_token_url')) Setting().set('google_oauth_scope', request.form.get('google_oauth_scope')) Setting().set('google_authorize_url', request.form.get('google_authorize_url')) Setting().set('google_base_url', request.form.get('google_base_url')) result = { 'status': True, 'msg': 'Saved successfully. Please reload PDA to take effect.' } elif conf_type == 'github': github_oauth_enabled = True if request.form.get( 'github_oauth_enabled') else False if not has_an_auth_method(github_oauth_enabled=github_oauth_enabled): result = { 'status': False, 'msg': 'Must have at least one authentication method enabled.' } else: Setting().set('github_oauth_enabled', github_oauth_enabled) Setting().set('github_oauth_key', request.form.get('github_oauth_key')) Setting().set('github_oauth_secret', request.form.get('github_oauth_secret')) Setting().set('github_oauth_scope', request.form.get('github_oauth_scope')) Setting().set('github_oauth_api_url', request.form.get('github_oauth_api_url')) Setting().set('github_oauth_token_url', request.form.get('github_oauth_token_url')) Setting().set('github_oauth_authorize_url', request.form.get('github_oauth_authorize_url')) result = { 'status': True, 'msg': 'Saved successfully. Please reload PDA to take effect.' } elif conf_type == 'azure': azure_oauth_enabled = True if request.form.get( 'azure_oauth_enabled') else False if not has_an_auth_method(azure_oauth_enabled=azure_oauth_enabled): result = { 'status': False, 'msg': 'Must have at least one authentication method enabled.' } else: Setting().set('azure_oauth_enabled', azure_oauth_enabled) Setting().set('azure_oauth_key', request.form.get('azure_oauth_key')) Setting().set('azure_oauth_secret', request.form.get('azure_oauth_secret')) Setting().set('azure_oauth_scope', request.form.get('azure_oauth_scope')) Setting().set('azure_oauth_api_url', request.form.get('azure_oauth_api_url')) Setting().set('azure_oauth_token_url', request.form.get('azure_oauth_token_url')) Setting().set('azure_oauth_authorize_url', request.form.get('azure_oauth_authorize_url')) Setting().set( 'azure_sg_enabled', True if request.form.get('azure_sg_enabled') == 'ON' else False) Setting().set('azure_admin_group', request.form.get('azure_admin_group')) Setting().set('azure_operator_group', request.form.get('azure_operator_group')) Setting().set('azure_user_group', request.form.get('azure_user_group')) Setting().set( 'azure_group_accounts_enabled', True if request.form.get('azure_group_accounts_enabled') == 'ON' else False) Setting().set('azure_group_accounts_name', request.form.get('azure_group_accounts_name')) Setting().set('azure_group_accounts_name_re', request.form.get('azure_group_accounts_name_re')) Setting().set('azure_group_accounts_description', request.form.get('azure_group_accounts_description')) Setting().set('azure_group_accounts_description_re', request.form.get('azure_group_accounts_description_re')) result = { 'status': True, 'msg': 'Saved successfully. Please reload PDA to take effect.' } elif conf_type == 'oidc': oidc_oauth_enabled = True if request.form.get( 'oidc_oauth_enabled') else False if not has_an_auth_method(oidc_oauth_enabled=oidc_oauth_enabled): result = { 'status': False, 'msg': 'Must have at least one authentication method enabled.' } else: Setting().set( 'oidc_oauth_enabled', True if request.form.get('oidc_oauth_enabled') else False) Setting().set('oidc_oauth_key', request.form.get('oidc_oauth_key')) Setting().set('oidc_oauth_secret', request.form.get('oidc_oauth_secret')) Setting().set('oidc_oauth_scope', request.form.get('oidc_oauth_scope')) Setting().set('oidc_oauth_api_url', request.form.get('oidc_oauth_api_url')) Setting().set('oidc_oauth_token_url', request.form.get('oidc_oauth_token_url')) Setting().set('oidc_oauth_authorize_url', request.form.get('oidc_oauth_authorize_url')) Setting().set('oidc_oauth_logout_url', request.form.get('oidc_oauth_logout_url')) Setting().set('oidc_oauth_username', request.form.get('oidc_oauth_username')) Setting().set('oidc_oauth_firstname', request.form.get('oidc_oauth_firstname')) Setting().set('oidc_oauth_last_name', request.form.get('oidc_oauth_last_name')) Setting().set('oidc_oauth_email', request.form.get('oidc_oauth_email')) Setting().set('oidc_oauth_account_name_property', request.form.get('oidc_oauth_account_name_property')) Setting().set('oidc_oauth_account_description_property', request.form.get('oidc_oauth_account_description_property')) result = { 'status': True, 'msg': 'Saved successfully. Please reload PDA to take effect.' } else: return abort(400) return render_template('admin_setting_authentication.html', result=result) @admin_bp.route('/templates', methods=['GET', 'POST']) @admin_bp.route('/templates/list', methods=['GET', 'POST']) @login_required @operator_role_required def templates(): templates = DomainTemplate.query.all() return render_template('template.html', templates=templates) @admin_bp.route('/template/create', methods=['GET', 'POST']) @login_required @operator_role_required def create_template(): if request.method == 'GET': return render_template('template_add.html') if request.method == 'POST': try: name = request.form.getlist('name')[0] description = request.form.getlist('description')[0] if ' ' in name or not name or not type: flash("Please correct your input", 'error') return redirect(url_for('admin.create_template')) if DomainTemplate.query.filter( DomainTemplate.name == name).first(): flash( "A template with the name {0} already exists!".format( name), 'error') return redirect(url_for('admin.create_template')) t = DomainTemplate(name=name, description=description) result = t.create() if result['status'] == 'ok': history = History(msg='Add domain template {0}'.format(name), detail=str({ 'name': name, 'description': description }), created_by=current_user.username) history.add() return redirect(url_for('admin.templates')) else: flash(result['msg'], 'error') return redirect(url_for('admin.create_template')) except Exception as e: current_app.logger.error( 'Cannot create domain template. Error: {0}'.format(e)) current_app.logger.debug(traceback.format_exc()) abort(500) @admin_bp.route('/template/create-from-zone', methods=['POST']) @login_required @operator_role_required def create_template_from_zone(): try: jdata = request.json name = jdata['name'] description = jdata['description'] domain_name = jdata['domain'] if ' ' in name or not name or not type: return make_response( jsonify({ 'status': 'error', 'msg': 'Please correct template name' }), 400) if DomainTemplate.query.filter(DomainTemplate.name == name).first(): return make_response( jsonify({ 'status': 'error', 'msg': 'A template with the name {0} already exists!'.format(name) }), 409) t = DomainTemplate(name=name, description=description) result = t.create() if result['status'] == 'ok': history = History(msg='Add domain template {0}'.format(name), detail=str({ 'name': name, 'description': description }), created_by=current_user.username) history.add() # After creating the domain in Domain Template in the, # local DB. We add records into it Record Template. records = [] domain = Domain.query.filter(Domain.name == domain_name).first() if domain: # Query zone's rrsets from PowerDNS API rrsets = Record().get_rrsets(domain.name) if rrsets: for r in rrsets: name = '@' if r['name'] == domain_name + '.' else r[ 'name'].replace('.{}.'.format(domain_name), '') for record in r['records']: t_record = DomainTemplateRecord( name=name, type=r['type'], status=False if record['disabled'] else True, ttl=r['ttl'], data=record['content']) records.append(t_record) result = t.replace_records(records) if result['status'] == 'ok': return make_response( jsonify({ 'status': 'ok', 'msg': result['msg'] }), 200) else: # Revert the domain template (remove it) # ff we cannot add records. t.delete_template() return make_response( jsonify({ 'status': 'error', 'msg': result['msg'] }), 500) else: return make_response( jsonify({ 'status': 'error', 'msg': result['msg'] }), 500) except Exception as e: current_app.logger.error( 'Cannot create template from zone. Error: {0}'.format(e)) current_app.logger.debug(traceback.format_exc()) return make_response( jsonify({ 'status': 'error', 'msg': 'Error when applying new changes' }), 500) @admin_bp.route('/template//edit', methods=['GET']) @login_required @operator_role_required def edit_template(template): try: t = DomainTemplate.query.filter( DomainTemplate.name == template).first() records_allow_to_edit = Setting().get_records_allow_to_edit() quick_edit = Setting().get('record_quick_edit') ttl_options = Setting().get_ttl_options() if t is not None: records = [] for jr in t.records: if jr.type in records_allow_to_edit: record = DomainTemplateRecord( name=jr.name, type=jr.type, status='Active' if jr.status else 'Disabled', ttl=jr.ttl, data=jr.data, comment=jr.comment if jr.comment else '') records.append(record) return render_template('template_edit.html', template=t.name, records=records, editable_records=records_allow_to_edit, quick_edit=quick_edit, ttl_options=ttl_options) except Exception as e: current_app.logger.error( 'Cannot open domain template page. DETAIL: {0}'.format(e)) current_app.logger.debug(traceback.format_exc()) abort(500) return redirect(url_for('admin.templates')) @admin_bp.route('/template//apply', methods=['POST'], strict_slashes=False) @login_required def apply_records(template): try: jdata = request.json records = [] for j in jdata['records']: name = '@' if j['record_name'] in ['@', ''] else j['record_name'] type = j['record_type'] data = j['record_data'] comment = j['record_comment'] status = 0 if j['record_status'] == 'Disabled' else 1 ttl = int(j['record_ttl']) if j['record_ttl'] else 3600 dtr = DomainTemplateRecord(name=name, type=type, data=data, comment=comment, status=status, ttl=ttl) records.append(dtr) t = DomainTemplate.query.filter( DomainTemplate.name == template).first() result = t.replace_records(records) if result['status'] == 'ok': jdata.pop('_csrf_token', None) # don't store csrf token in the history. history = History( msg='Apply domain template record changes to domain template {0}' .format(template), detail=str(json.dumps(jdata)), created_by=current_user.username) history.add() return make_response(jsonify(result), 200) else: return make_response(jsonify(result), 400) except Exception as e: current_app.logger.error( 'Cannot apply record changes to the template. Error: {0}'.format( e)) current_app.logger.debug(traceback.format_exc()) return make_response( jsonify({ 'status': 'error', 'msg': 'Error when applying new changes' }), 500) @admin_bp.route('/template//delete', methods=['POST']) @login_required @operator_role_required def delete_template(template): try: t = DomainTemplate.query.filter( DomainTemplate.name == template).first() if t is not None: result = t.delete_template() if result['status'] == 'ok': history = History( msg='Deleted domain template {0}'.format(template), detail=str({'name': template}), created_by=current_user.username) history.add() return redirect(url_for('admin.templates')) else: flash(result['msg'], 'error') return redirect(url_for('admin.templates')) except Exception as e: current_app.logger.error( 'Cannot delete template. Error: {0}'.format(e)) current_app.logger.debug(traceback.format_exc()) abort(500) return redirect(url_for('admin.templates')) @admin_bp.route('/global-search', methods=['GET']) @login_required @operator_role_required def global_search(): if request.method == 'GET': domains = [] records = [] comments = [] query = request.args.get('q') if query: server = Server(server_id='localhost') results = server.global_search(object_type='all', query=query) # Format the search result for result in results: if result['object_type'] == 'zone': # Remove the dot at the end of string result['name'] = result['name'][:-1] domains.append(result) elif result['object_type'] == 'record': # Remove the dot at the end of string result['name'] = result['name'][:-1] result['zone_id'] = result['zone_id'][:-1] records.append(result) elif result['object_type'] == 'comment': # Get the actual record name, exclude the domain part result['name'] = result['name'].replace(result['zone_id'], '') if result['name']: result['name'] = result['name'][:-1] else: result['name'] = '@' # Remove the dot at the end of string result['zone_id'] = result['zone_id'][:-1] comments.append(result) else: pass return render_template('admin_global_search.html', domains=domains, records=records, comments=comments) def validateURN(value): NID_PATTERN = re.compile(r'^[0-9a-z][0-9a-z-]{1,31}$', flags=re.IGNORECASE) NSS_PCHAR = '[a-z0-9-._~]|%[a-f0-9]{2}|[!$&\'()*+,;=]|:|@' NSS_PATTERN = re.compile(fr'^({NSS_PCHAR})({NSS_PCHAR}|/|\?)*$', re.IGNORECASE) prefix=value.split(':') if (len(prefix)<3): current_app.logger.warning( "Too small urn prefix" ) return False urn=prefix[0] nid=prefix[1] nss=value.replace(urn+":"+nid+":", "") if not urn.lower()=="urn": current_app.logger.warning( urn + ' contains invalid characters ' ) return False if not re.match(NID_PATTERN, nid.lower()): current_app.logger.warning( nid + ' contains invalid characters ' ) return False if not re.match(NSS_PATTERN, nss): current_app.logger.warning( nss + ' contains invalid characters ' ) return False return True