From 65bfc53acb25c74053695bc5d98b141b04cf99a1 Mon Sep 17 00:00:00 2001 From: Matt Scott Date: Sun, 19 Feb 2023 15:04:30 -0500 Subject: [PATCH] Split the server statistics and configuration feature into separate pages. --- powerdnsadmin/routes/admin.py | 1001 +++++++++-------- powerdnsadmin/templates/admin_pdns_stats.html | 127 --- .../templates/admin_server_configuration.html | 84 ++ .../templates/admin_server_statistics.html | 82 ++ powerdnsadmin/templates/base.html | 14 +- 5 files changed, 704 insertions(+), 604 deletions(-) delete mode 100644 powerdnsadmin/templates/admin_pdns_stats.html create mode 100644 powerdnsadmin/templates/admin_server_configuration.html create mode 100644 powerdnsadmin/templates/admin_server_statistics.html diff --git a/powerdnsadmin/routes/admin.py b/powerdnsadmin/routes/admin.py index 98f4ef9..5a90e10 100644 --- a/powerdnsadmin/routes/admin.py +++ b/powerdnsadmin/routes/admin.py @@ -4,7 +4,8 @@ import traceback import re from base64 import b64encode from ast import literal_eval -from flask import Blueprint, render_template, render_template_string, make_response, url_for, current_app, request, redirect, jsonify, abort, flash, session +from flask import Blueprint, render_template, render_template_string, make_response, url_for, current_app, request, \ + redirect, jsonify, abort, flash, session from flask_login import login_required, current_user from ..decorators import operator_role_required, admin_role_required, history_access_required @@ -44,6 +45,8 @@ change_type: "addition" or "deletion" or "status" for status change or "unchange Note: A change in "content", is considered a deletion and recreation of the same record, holding the new content value. """ + + def get_record_changes(del_rrset, add_rrset): changeSet = [] delSet = del_rrset['records'] if 'records' in del_rrset else [] @@ -54,15 +57,15 @@ def get_record_changes(del_rrset, add_rrset): if d['content'] == a['content']: exists = True if d['disabled'] != a['disabled']: - changeSet.append( ({"disabled":d['disabled'],"content":d['content']}, - {"disabled":a['disabled'],"content":a['content']}, - "status") ) + changeSet.append(({"disabled": d['disabled'], "content": d['content']}, + {"disabled": a['disabled'], "content": a['content']}, + "status")) break - if not exists: # deletion - changeSet.append( ({"disabled":d['disabled'],"content":d['content']}, - None, - "deletion") ) + if not exists: # deletion + changeSet.append(({"disabled": d['disabled'], "content": d['content']}, + None, + "deletion")) for a in addSet: # get the additions exists = False @@ -72,36 +75,36 @@ def get_record_changes(del_rrset, add_rrset): # already checked for status change break if not exists: - changeSet.append( (None, {"disabled":a['disabled'], "content":a['content']}, "addition") ) + changeSet.append((None, {"disabled": a['disabled'], "content": a['content']}, "addition")) continue for a in addSet: # get the unchanged exists = False for c in changeSet: - if c[1] != None and c[1]["content"] == a['content']: + if c[1] != None and c[1]["content"] == a['content']: exists = True break if not exists: - changeSet.append( ( {"disabled":a['disabled'], "content":a['content']}, {"disabled":a['disabled'], "content":a['content']}, "unchanged") ) + changeSet.append(({"disabled": a['disabled'], "content": a['content']}, + {"disabled": a['disabled'], "content": a['content']}, "unchanged")) return changeSet + # out_changes is a list of HistoryRecordEntry objects in which we will append the new changes # a HistoryRecordEntry represents a pair of add_rrset and del_rrset def extract_changelogs_from_a_history_entry(out_changes, history_entry, change_num, record_name=None, record_type=None): - if history_entry.detail is None: return if "add_rrsets" in history_entry.detail: detail_dict = json.loads(history_entry.detail) - else: # not a record entry + else: # not a record entry return add_rrsets = detail_dict['add_rrsets'] del_rrsets = detail_dict['del_rrsets'] - for add_rrset in add_rrsets: exists = False for del_rrset in del_rrsets: @@ -114,7 +117,8 @@ def extract_changelogs_from_a_history_entry(out_changes, history_entry, change_n if not exists: # this is a new record if change_num not in out_changes: out_changes[change_num] = [] - out_changes[change_num].append(HistoryRecordEntry(history_entry, [], add_rrset, "+")) # (add_rrset, del_rrset, change_type) + out_changes[change_num].append( + HistoryRecordEntry(history_entry, [], add_rrset, "+")) # (add_rrset, del_rrset, change_type) for del_rrset in del_rrsets: exists = False for add_rrset in add_rrsets: @@ -126,23 +130,23 @@ def extract_changelogs_from_a_history_entry(out_changes, history_entry, change_n out_changes[change_num] = [] out_changes[change_num].append(HistoryRecordEntry(history_entry, del_rrset, [], "-")) - # only used for changelog per record - if record_name != None and record_type != None: # then get only the records with the specific (record_name, record_type) tuple + if record_name != None and record_type != None: # then get only the records with the specific (record_name, record_type) tuple if change_num in out_changes: changes_i = out_changes[change_num] else: return - for hre in changes_i: # for each history record entry in changes_i - if 'type' in hre.add_rrset and hre.add_rrset['name'] == record_name and hre.add_rrset['type'] == record_type: + for hre in changes_i: # for each history record entry in changes_i + if 'type' in hre.add_rrset and hre.add_rrset['name'] == record_name and hre.add_rrset[ + 'type'] == record_type: continue - elif 'type' in hre.del_rrset and hre.del_rrset['name'] == record_name and hre.del_rrset['type'] == record_type: + elif 'type' in hre.del_rrset and hre.del_rrset['name'] == record_name and hre.del_rrset[ + 'type'] == record_type: continue else: out_changes[change_num].remove(hre) - # records with same (name,type) are considered as a single HistoryRecordEntry # history_entry is of type History - used to extract created_by and created_on # add_rrset is a dictionary of replace @@ -155,16 +159,15 @@ class HistoryRecordEntry: self.add_rrset = add_rrset self.del_rrset = del_rrset self.change_type = change_type # "*": edit or unchanged, "+" new tuple(name,type), "-" deleted (name,type) tuple - self.changed_fields = [] # contains a subset of : [ttl, name, type] - self.changeSet = [] # all changes for the records of this add_rrset-del_rrset pair + self.changed_fields = [] # contains a subset of : [ttl, name, type] + self.changeSet = [] # all changes for the records of this add_rrset-del_rrset pair - - if change_type == "+": # addition + if change_type == "+": # addition self.changed_fields.append("name") self.changed_fields.append("type") self.changed_fields.append("ttl") self.changeSet = get_record_changes(del_rrset, add_rrset) - elif change_type == "-": # removal + elif change_type == "-": # removal self.changed_fields.append("name") self.changed_fields.append("type") self.changed_fields.append("ttl") @@ -175,22 +178,21 @@ class HistoryRecordEntry: self.changed_fields.append("ttl") self.changeSet = get_record_changes(del_rrset, add_rrset) - - def toDict(self): return { - "add_rrset" : self.add_rrset, - "del_rrset" : self.del_rrset, - "changed_fields" : self.changed_fields, - "created_on" : self.history_entry.created_on, - "created_by" : self.history_entry.created_by, - "change_type" : self.change_type, - "changeSet" : self.changeSet + "add_rrset": self.add_rrset, + "del_rrset": self.del_rrset, + "changed_fields": self.changed_fields, + "created_on": self.history_entry.created_on, + "created_by": self.history_entry.created_by, + "change_type": self.change_type, + "changeSet": self.changeSet } - def __eq__(self, obj2): # used for removal of objects from a list + def __eq__(self, obj2): # used for removal of objects from a list return True if obj2.toDict() == self.toDict() else False + @admin_bp.before_request def before_request(): # Manage session timeout @@ -198,15 +200,14 @@ def before_request(): # current_app.permanent_session_lifetime = datetime.timedelta( # minutes=int(Setting().get('session_timeout'))) current_app.permanent_session_lifetime = datetime.timedelta( - minutes=int(Setting().get('session_timeout'))) + minutes=int(Setting().get('session_timeout'))) session.modified = True - -@admin_bp.route('/pdns', methods=['GET']) +@admin_bp.route('/server/statistics', methods=['GET']) @login_required @operator_role_required -def pdns_stats(): +def server_statistics(): if not Setting().get('pdns_api_url') or not Setting().get( 'pdns_api_key') or not Setting().get('pdns_version'): return redirect(url_for('admin.setting_pdns')) @@ -215,7 +216,6 @@ def pdns_stats(): users = User.query.all() server = Server(server_id='localhost') - configs = server.get_config() statistics = server.get_statistic() history_number = History.query.count() @@ -226,12 +226,33 @@ def pdns_stats(): else: uptime = 0 - return render_template('admin_pdns_stats.html', + return render_template('admin_server_statistics.html', + domains=domains, + users=users, + statistics=statistics, + uptime=uptime, + history_number=history_number) + + +@admin_bp.route('/server/configuration', methods=['GET']) +@login_required +@operator_role_required +def server_configuration(): + if not Setting().get('pdns_api_url') or not Setting().get( + 'pdns_api_key') or not Setting().get('pdns_version'): + return redirect(url_for('admin.setting_pdns')) + + domains = Domain.query.all() + users = User.query.all() + + server = Server(server_id='localhost') + configs = server.get_config() + history_number = History.query.count() + + return render_template('admin_server_configuration.html', domains=domains, users=users, configs=configs, - statistics=statistics, - uptime=uptime, history_number=history_number) @@ -296,6 +317,7 @@ def edit_user(user_username=None): create=create, error=result['msg']) + @admin_bp.route('/key/edit/', methods=['GET', 'POST']) @admin_bp.route('/key/edit', methods=['GET', 'POST']) @login_required @@ -350,26 +372,26 @@ def edit_key(key_id=None): plain_key = apikey_plain_schema.dump([apikey])[0]["plain_key"] plain_key = b64encode(plain_key.encode('utf-8')).decode('utf-8') - history_message = "Created API key {0}".format(apikey.id) + history_message = "Created API key {0}".format(apikey.id) # Update existing apikey else: try: if role != "User": domain_list, account_list = [], [] - apikey.update(role,description,domain_list, account_list) - history_message = "Updated API key {0}".format(apikey.id) + apikey.update(role, description, domain_list, account_list) + history_message = "Updated API key {0}".format(apikey.id) except Exception as e: current_app.logger.error('Error: {0}'.format(e)) history = History(msg=history_message, - detail = json.dumps({ - 'key': apikey.id, - 'role': apikey.role.name, - 'description': apikey.description, - 'domains': [domain.name for domain in apikey.domains], - 'accounts': [a.name for a in apikey.accounts] - }), + detail=json.dumps({ + 'key': apikey.id, + 'role': apikey.role.name, + 'description': apikey.description, + 'domains': [domain.name for domain in apikey.domains], + 'accounts': [a.name for a in apikey.accounts] + }), created_by=current_user.username) history.add() @@ -381,6 +403,7 @@ def edit_key(key_id=None): create=create, plain_key=plain_key) + @admin_bp.route('/manage-keys', methods=['GET', 'POST']) @login_required @operator_role_required @@ -393,7 +416,7 @@ def manage_keys(): abort(500) return render_template('admin_manage_keys.html', - keys=apikeys) + keys=apikeys) elif request.method == 'POST': jdata = request.json @@ -404,7 +427,7 @@ def manage_keys(): history_apikey_id = apikey.id history_apikey_role = apikey.role.name history_apikey_description = apikey.description - history_apikey_domains = [ domain.name for domain in apikey.domains] + history_apikey_domains = [domain.name for domain in apikey.domains] apikey.delete() except Exception as e: @@ -412,20 +435,21 @@ def manage_keys(): current_app.logger.info('Delete API key {0}'.format(apikey.id)) history = History(msg='Delete API key {0}'.format(apikey.id), - detail = json.dumps({ - 'key': history_apikey_id, - 'role': history_apikey_role, - 'description': history_apikey_description, - 'domains': history_apikey_domains - }), + detail=json.dumps({ + 'key': history_apikey_id, + 'role': history_apikey_role, + 'description': history_apikey_description, + 'domains': history_apikey_domains + }), created_by=current_user.username) history.add() return make_response( - jsonify({ - 'status': 'ok', - 'msg': 'Key has been removed.' - }), 200) + jsonify({ + 'status': 'ok', + 'msg': 'Key has been removed.' + }), 200) + @admin_bp.route('/manage-user', methods=['GET', 'POST']) @login_required @@ -459,17 +483,17 @@ def manage_user(): return make_response( jsonify({ 'status': - 'ok', + 'ok', 'msg': - 'Two factor authentication has been disabled for user.' + 'Two factor authentication has been disabled for user.' }), 200) else: return make_response( jsonify({ 'status': - 'error', + 'error', 'msg': - 'Cannot disable two factor authentication for user.' + 'Cannot disable two factor authentication for user.' }), 500) elif jdata['action'] == 'delete_user': @@ -549,18 +573,18 @@ def manage_user(): return make_response( jsonify({ 'status': - 'error', + 'error', 'msg': - 'You do not have permission to change Administrator users role.' + 'You do not have permission to change Administrator users role.' }), 400) if role_name == 'Administrator' and current_user.role.name != 'Administrator': return make_response( jsonify({ 'status': - 'error', + 'error', 'msg': - 'You do not have permission to promote a user to Administrator role.' + 'You do not have permission to promote a user to Administrator role.' }), 400) user = User(username=username) @@ -580,10 +604,10 @@ def manage_user(): return make_response( jsonify({ 'status': - 'error', + 'error', 'msg': - 'Cannot change user role. {0}'.format( - result['msg']) + 'Cannot change user role. {0}'.format( + result['msg']) }), 500) else: return make_response( @@ -598,9 +622,9 @@ def manage_user(): return make_response( jsonify({ 'status': - 'error', + 'error', 'msg': - 'There is something wrong, please contact Administrator.' + 'There is something wrong, please contact Administrator.' }), 400) @@ -693,7 +717,7 @@ def edit_account(account_name=None): Domain.name == domain_name).first() if account.id != domain.account_id: Domain(name=domain_name).assoc_account(account.id) - + for domain in old_domains: if domain.name not in new_domain_list: Domain(name=domain.name).assoc_account(None) @@ -774,9 +798,9 @@ def manage_account(): return make_response( jsonify({ 'status': - 'error', + 'error', 'msg': - 'There is something wrong, please contact Administrator.' + 'There is something wrong, please contact Administrator.' }), 400) @@ -799,10 +823,12 @@ class DetailedHistory(): Account:{{ account }} """, - domaintype=detail_dict['domain_type'], - account=Account.get_name_by_id(self=None, account_id=detail_dict['account_id']) if detail_dict['account_id'] != "0" else "None") + domaintype=detail_dict['domain_type'], + account=Account.get_name_by_id(self=None, account_id=detail_dict[ + 'account_id']) if detail_dict[ + 'account_id'] != "0" else "None") - elif 'authenticator' in detail_dict: # this is a user authentication + elif 'authenticator' in detail_dict: # this is a user authentication self.detailed_msg = render_template_string(""" @@ -825,28 +851,31 @@ class DetailedHistory():
""", - background_rgba="68,157,68" if detail_dict['success'] == 1 else "201,48,44", - username=detail_dict['username'], - auth_result="success" if detail_dict['success'] == 1 else "failure", - authenticator=detail_dict['authenticator'], - ip_address=detail_dict['ip_address']) + background_rgba="68,157,68" if detail_dict[ + 'success'] == 1 else "201,48,44", + username=detail_dict['username'], + auth_result="success" if detail_dict[ + 'success'] == 1 else "failure", + authenticator=detail_dict['authenticator'], + ip_address=detail_dict['ip_address']) - elif 'add_rrsets' in detail_dict: # this is a domain record change + elif 'add_rrsets' in detail_dict: # this is a domain record change # changes_set = [] self.detailed_msg = "" # extract_changelogs_from_a_history_entry(changes_set, history, 0) - elif 'name' in detail_dict and 'template' in history.msg: # template creation / deletion + elif 'name' in detail_dict and 'template' in history.msg: # template creation / deletion self.detailed_msg = render_template_string("""
Template name:{{ template_name }}
Description:{{ description }}
""", - template_name=DetailedHistory.get_key_val(detail_dict, "name"), - description=DetailedHistory.get_key_val(detail_dict, "description")) + template_name=DetailedHistory.get_key_val(detail_dict, "name"), + description=DetailedHistory.get_key_val(detail_dict, + "description")) - elif 'Change domain' in history.msg and 'access control' in history.msg: # added or removed a user from a domain + elif 'Change domain' in history.msg and 'access control' in history.msg: # added or removed a user from a domain users_with_access = DetailedHistory.get_key_val(detail_dict, "user_has_access") self.detailed_msg = render_template_string(""" @@ -854,7 +883,7 @@ class DetailedHistory():
Number of users:{{ users_with_access | length }}
""", - users_with_access=users_with_access) + users_with_access=users_with_access) elif 'Created API key' in history.msg or 'Updated API key' in history.msg: self.detailed_msg = render_template_string(""" @@ -866,11 +895,14 @@ class DetailedHistory(): Accessible accounts with this API key:{{ linked_accounts }} """, - keyname=DetailedHistory.get_key_val(detail_dict, "key"), - rolename=DetailedHistory.get_key_val(detail_dict, "role"), - description=DetailedHistory.get_key_val(detail_dict, "description"), - linked_domains=DetailedHistory.get_key_val(detail_dict, "domains" if "domains" in detail_dict else "domain_acl"), - linked_accounts=DetailedHistory.get_key_val(detail_dict, "accounts")) + keyname=DetailedHistory.get_key_val(detail_dict, "key"), + rolename=DetailedHistory.get_key_val(detail_dict, "role"), + description=DetailedHistory.get_key_val(detail_dict, + "description"), + linked_domains=DetailedHistory.get_key_val(detail_dict, + "domains" if "domains" in detail_dict else "domain_acl"), + linked_accounts=DetailedHistory.get_key_val(detail_dict, + "accounts")) elif 'Delete API key' in history.msg: self.detailed_msg = render_template_string(""" @@ -881,10 +913,12 @@ class DetailedHistory(): Accessible domains with this API key:{{ linked_domains }} """, - keyname=DetailedHistory.get_key_val(detail_dict, "key"), - rolename=DetailedHistory.get_key_val(detail_dict, "role"), - description=DetailedHistory.get_key_val(detail_dict, "description"), - linked_domains=DetailedHistory.get_key_val(detail_dict, "domains")) + keyname=DetailedHistory.get_key_val(detail_dict, "key"), + rolename=DetailedHistory.get_key_val(detail_dict, "role"), + description=DetailedHistory.get_key_val(detail_dict, + "description"), + linked_domains=DetailedHistory.get_key_val(detail_dict, + "domains")) elif 'Update type for domain' in history.msg: self.detailed_msg = render_template_string(""" @@ -894,9 +928,9 @@ class DetailedHistory(): Masters:{{ masters }} """, - domain=DetailedHistory.get_key_val(detail_dict, "domain"), - domain_type=DetailedHistory.get_key_val(detail_dict, "type"), - masters=DetailedHistory.get_key_val(detail_dict, "masters")) + domain=DetailedHistory.get_key_val(detail_dict, "domain"), + domain_type=DetailedHistory.get_key_val(detail_dict, "type"), + masters=DetailedHistory.get_key_val(detail_dict, "masters")) elif 'reverse' in history.msg: self.detailed_msg = render_template_string(""" @@ -905,8 +939,10 @@ class DetailedHistory(): Domain Master IPs:{{ domain_master_ips }} """, - domain_type=DetailedHistory.get_key_val(detail_dict, "domain_type"), - domain_master_ips=DetailedHistory.get_key_val(detail_dict, "domain_master_ips")) + domain_type=DetailedHistory.get_key_val(detail_dict, + "domain_type"), + domain_master_ips=DetailedHistory.get_key_val(detail_dict, + "domain_master_ips")) elif DetailedHistory.get_key_val(detail_dict, 'msg') and DetailedHistory.get_key_val(detail_dict, 'status'): self.detailed_msg = render_template_string(''' @@ -915,18 +951,21 @@ class DetailedHistory(): Message:{{ history_msg }} ''', - history_status=DetailedHistory.get_key_val(detail_dict, 'status'), - history_msg=DetailedHistory.get_key_val(detail_dict, 'msg')) - - elif 'Update domain' in history.msg and 'associate account' in history.msg: # When an account gets associated or dissociate with domains + history_status=DetailedHistory.get_key_val(detail_dict, + 'status'), + history_msg=DetailedHistory.get_key_val(detail_dict, 'msg')) + + elif 'Update domain' in history.msg and 'associate account' in history.msg: # When an account gets associated or dissociate with domains self.detailed_msg = render_template_string('''
Associate: {{ history_assoc_account }}
Dissociate:{{ history_dissoc_account }}
''', - history_assoc_account=DetailedHistory.get_key_val(detail_dict, 'assoc_account'), - history_dissoc_account=DetailedHistory.get_key_val(detail_dict, 'dissoc_account')) + history_assoc_account=DetailedHistory.get_key_val(detail_dict, + 'assoc_account'), + history_dissoc_account=DetailedHistory.get_key_val(detail_dict, + 'dissoc_account')) # check for lower key as well for old databases @staticmethod @@ -936,367 +975,383 @@ class DetailedHistory(): # convert a list of History objects into DetailedHistory objects def convert_histories(histories): - changes_set = dict() - detailedHistories = [] - j = 0 - for i in range(len(histories)): - if histories[i].detail and ('add_rrsets' in histories[i].detail or 'del_rrsets' in histories[i].detail): - extract_changelogs_from_a_history_entry(changes_set, histories[i], j) - if j in changes_set: - detailedHistories.append(DetailedHistory(histories[i], changes_set[j])) - else: # no changes were found - detailedHistories.append(DetailedHistory(histories[i], None)) - j += 1 + changes_set = dict() + detailedHistories = [] + j = 0 + for i in range(len(histories)): + if histories[i].detail and ('add_rrsets' in histories[i].detail or 'del_rrsets' in histories[i].detail): + extract_changelogs_from_a_history_entry(changes_set, histories[i], j) + if j in changes_set: + detailedHistories.append(DetailedHistory(histories[i], changes_set[j])) + else: # no changes were found + detailedHistories.append(DetailedHistory(histories[i], None)) + j += 1 + + else: + detailedHistories.append(DetailedHistory(histories[i], None)) + return detailedHistories - else: - detailedHistories.append(DetailedHistory(histories[i], None)) - return detailedHistories @admin_bp.route('/history', methods=['GET', 'POST']) @login_required @history_access_required def history(): - if request.method == 'POST': - if current_user.role.name != 'Administrator': - return make_response( - jsonify({ - 'status': 'error', - 'msg': 'You do not have permission to remove history.' - }), 401) + if request.method == 'POST': + if current_user.role.name != 'Administrator': + return make_response( + jsonify({ + 'status': 'error', + 'msg': 'You do not have permission to remove history.' + }), 401) - if Setting().get('preserve_history'): - return make_response( - jsonify({ - 'status': 'error', - 'msg': 'History removal is not allowed (toggle preserve_history in settings).' - }), 401) + if Setting().get('preserve_history'): + return make_response( + jsonify({ + 'status': 'error', + 'msg': 'History removal is not allowed (toggle preserve_history in settings).' + }), 401) - h = History() - result = h.remove_all() - if result: - history = History(msg='Remove all histories', - created_by=current_user.username) - history.add() - return make_response( - jsonify({ - 'status': 'ok', - 'msg': 'Changed user role successfully.' - }), 200) - else: - return make_response( - jsonify({ - 'status': 'error', - 'msg': 'Can not remove histories.' - }), 500) + h = History() + result = h.remove_all() + if result: + history = History(msg='Remove all histories', + created_by=current_user.username) + history.add() + return make_response( + jsonify({ + 'status': 'ok', + 'msg': 'Changed user role successfully.' + }), 200) + else: + return make_response( + jsonify({ + 'status': 'error', + 'msg': 'Can not remove histories.' + }), 500) + if request.method == 'GET': + doms = accounts = users = "" + if current_user.role.name in ['Administrator', 'Operator']: + all_domain_names = Domain.query.all() + all_account_names = Account.query.all() + all_user_names = User.query.all() - if request.method == 'GET': - doms = accounts = users = "" - if current_user.role.name in [ 'Administrator', 'Operator']: - all_domain_names = Domain.query.all() - all_account_names = Account.query.all() - all_user_names = User.query.all() + for d in all_domain_names: + doms += d.name + " " + for acc in all_account_names: + accounts += acc.name + " " + for usr in all_user_names: + users += usr.username + " " + else: # special autocomplete for users + all_domain_names = db.session.query(Domain) \ + .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ + .outerjoin(Account, Domain.account_id == Account.id) \ + .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ + .filter( + db.or_( + DomainUser.user_id == current_user.id, + AccountUser.user_id == current_user.id + )).all() + all_account_names = db.session.query(Account) \ + .outerjoin(Domain, Domain.account_id == Account.id) \ + .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ + .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ + .filter( + db.or_( + DomainUser.user_id == current_user.id, + AccountUser.user_id == current_user.id + )).all() + all_user_names = [] + for a in all_account_names: + temp = db.session.query(User) \ + .join(AccountUser, AccountUser.user_id == User.id) \ + .outerjoin(Account, Account.id == AccountUser.account_id) \ + .filter( + db.or_( + Account.id == a.id, + AccountUser.account_id == a.id + ) + ) \ + .all() + for u in temp: + if u in all_user_names: + continue + all_user_names.append(u) - for d in all_domain_names: - doms += d.name + " " - for acc in all_account_names: - accounts += acc.name + " " - for usr in all_user_names: - users += usr.username + " " - else: # special autocomplete for users - all_domain_names = db.session.query(Domain) \ - .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ - .outerjoin(Account, Domain.account_id == Account.id) \ - .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ - .filter( - db.or_( - DomainUser.user_id == current_user.id, - AccountUser.user_id == current_user.id - )).all() + for d in all_domain_names: + doms += d.name + " " - all_account_names = db.session.query(Account) \ - .outerjoin(Domain, Domain.account_id == Account.id) \ - .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ - .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ - .filter( - db.or_( - DomainUser.user_id == current_user.id, - AccountUser.user_id == current_user.id - )).all() + for a in all_account_names: + accounts += a.name + " " + for u in all_user_names: + users += u.username + " " + return render_template('admin_history.html', all_domain_names=doms, all_account_names=accounts, + all_usernames=users) - all_user_names = [] - for a in all_account_names: - temp = db.session.query(User) \ - .join(AccountUser, AccountUser.user_id == User.id) \ - .outerjoin(Account, Account.id == AccountUser.account_id) \ - .filter( - db.or_( - Account.id == a.id, - AccountUser.account_id == a.id - ) - ) \ - .all() - for u in temp: - if u in all_user_names: - continue - all_user_names.append(u) - - for d in all_domain_names: - doms += d.name + " " - - for a in all_account_names: - accounts += a.name + " " - for u in all_user_names: - users += u.username + " " - return render_template('admin_history.html', all_domain_names=doms, all_account_names=accounts, all_usernames=users) - # local_offset is the offset of the utc to the local time # offset must be int # return the date converted and simplified def from_utc_to_local(local_offset, timeframe): - offset = str(local_offset *(-1)) - date_split = str(timeframe).split(".")[0] - date_converted = datetime.datetime.strptime(date_split, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(minutes=int(offset)) - return date_converted + offset = str(local_offset * (-1)) + date_split = str(timeframe).split(".")[0] + date_converted = datetime.datetime.strptime(date_split, '%Y-%m-%d %H:%M:%S') + datetime.timedelta( + minutes=int(offset)) + return date_converted + @admin_bp.route('/history_table', methods=['GET', 'POST']) @login_required @history_access_required -def history_table(): # ajax call data +def history_table(): # ajax call data - if request.method == 'POST': - if current_user.role.name != 'Administrator': - return make_response( - jsonify({ - 'status': 'error', - 'msg': 'You do not have permission to remove history.' - }), 401) + if request.method == 'POST': + if current_user.role.name != 'Administrator': + return make_response( + jsonify({ + 'status': 'error', + 'msg': 'You do not have permission to remove history.' + }), 401) - h = History() - result = h.remove_all() - if result: - history = History(msg='Remove all histories', - created_by=current_user.username) - history.add() - return make_response( - jsonify({ - 'status': 'ok', - 'msg': 'Changed user role successfully.' - }), 200) - else: - return make_response( - jsonify({ - 'status': 'error', - 'msg': 'Can not remove histories.' - }), 500) + h = History() + result = h.remove_all() + if result: + history = History(msg='Remove all histories', + created_by=current_user.username) + history.add() + return make_response( + jsonify({ + 'status': 'ok', + 'msg': 'Changed user role successfully.' + }), 200) + else: + return make_response( + jsonify({ + 'status': 'error', + 'msg': 'Can not remove histories.' + }), 500) - detailedHistories = [] - lim = int(Setting().get('max_history_records')) # max num of records + detailedHistories = [] + lim = int(Setting().get('max_history_records')) # max num of records - if request.method == 'GET': - if current_user.role.name in [ 'Administrator', 'Operator' ]: - base_query = History.query - else: - # if the user isn't an administrator or operator, - # allow_user_view_history must be enabled to get here, - # so include history for the domains for the user - base_query = db.session.query(History) \ - .join(Domain, History.domain_id == Domain.id) \ - .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ - .outerjoin(Account, Domain.account_id == Account.id) \ - .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ - .filter( - db.or_( - DomainUser.user_id == current_user.id, - AccountUser.user_id == current_user.id - )) + if request.method == 'GET': + if current_user.role.name in ['Administrator', 'Operator']: + base_query = History.query + else: + # if the user isn't an administrator or operator, + # allow_user_view_history must be enabled to get here, + # so include history for the domains for the user + base_query = db.session.query(History) \ + .join(Domain, History.domain_id == Domain.id) \ + .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ + .outerjoin(Account, Domain.account_id == Account.id) \ + .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ + .filter( + db.or_( + DomainUser.user_id == current_user.id, + AccountUser.user_id == current_user.id + )) - domain_name = request.args.get('domain_name_filter') if request.args.get('domain_name_filter') != None \ - and len(request.args.get('domain_name_filter')) != 0 else None - account_name = request.args.get('account_name_filter') if request.args.get('account_name_filter') != None \ - and len(request.args.get('account_name_filter')) != 0 else None - user_name = request.args.get('auth_name_filter') if request.args.get('auth_name_filter') != None \ - and len(request.args.get('auth_name_filter')) != 0 else None + domain_name = request.args.get('domain_name_filter') if request.args.get('domain_name_filter') != None \ + and len( + request.args.get('domain_name_filter')) != 0 else None + account_name = request.args.get('account_name_filter') if request.args.get('account_name_filter') != None \ + and len( + request.args.get('account_name_filter')) != 0 else None + user_name = request.args.get('auth_name_filter') if request.args.get('auth_name_filter') != None \ + and len(request.args.get('auth_name_filter')) != 0 else None - min_date = request.args.get('min') if request.args.get('min') != None and len( request.args.get('min')) != 0 else None - if min_date != None: # get 1 day earlier, to check for timezone errors - min_date = str(datetime.datetime.strptime(min_date, '%Y-%m-%d') - datetime.timedelta(days=1)) - max_date = request.args.get('max') if request.args.get('max') != None and len( request.args.get('max')) != 0 else None - if max_date != None: # get 1 day later, to check for timezone errors - max_date = str(datetime.datetime.strptime(max_date, '%Y-%m-%d') + datetime.timedelta(days=1)) - tzoffset = request.args.get('tzoffset') if request.args.get('tzoffset') != None and len(request.args.get('tzoffset')) != 0 else None - changed_by = request.args.get('user_name_filter') if request.args.get('user_name_filter') != None \ - and len(request.args.get('user_name_filter')) != 0 else None - """ - Auth methods: LOCAL, Github OAuth, Azure OAuth, SAML, OIDC OAuth, Google OAuth - """ - auth_methods = [] - if (request.args.get('auth_local_only_checkbox') is None \ - and request.args.get('auth_oauth_only_checkbox') is None \ - and request.args.get('auth_saml_only_checkbox') is None and request.args.get('auth_all_checkbox') is None): - auth_methods = [] - if request.args.get('auth_all_checkbox') == "on": - auth_methods.append("") - if request.args.get('auth_local_only_checkbox') == "on": - auth_methods.append("LOCAL") - if request.args.get('auth_oauth_only_checkbox') == "on": - auth_methods.append("OAuth") - if request.args.get('auth_saml_only_checkbox') == "on": - auth_methods.append("SAML") + min_date = request.args.get('min') if request.args.get('min') != None and len( + request.args.get('min')) != 0 else None + if min_date != None: # get 1 day earlier, to check for timezone errors + min_date = str(datetime.datetime.strptime(min_date, '%Y-%m-%d') - datetime.timedelta(days=1)) + max_date = request.args.get('max') if request.args.get('max') != None and len( + request.args.get('max')) != 0 else None + if max_date != None: # get 1 day later, to check for timezone errors + max_date = str(datetime.datetime.strptime(max_date, '%Y-%m-%d') + datetime.timedelta(days=1)) + tzoffset = request.args.get('tzoffset') if request.args.get('tzoffset') != None and len( + request.args.get('tzoffset')) != 0 else None + changed_by = request.args.get('user_name_filter') if request.args.get('user_name_filter') != None \ + and len( + request.args.get('user_name_filter')) != 0 else None + """ + Auth methods: LOCAL, Github OAuth, Azure OAuth, SAML, OIDC OAuth, Google OAuth + """ + auth_methods = [] + if (request.args.get('auth_local_only_checkbox') is None \ + and request.args.get('auth_oauth_only_checkbox') is None \ + and request.args.get('auth_saml_only_checkbox') is None and request.args.get( + 'auth_all_checkbox') is None): + auth_methods = [] + if request.args.get('auth_all_checkbox') == "on": + auth_methods.append("") + if request.args.get('auth_local_only_checkbox') == "on": + auth_methods.append("LOCAL") + if request.args.get('auth_oauth_only_checkbox') == "on": + auth_methods.append("OAuth") + if request.args.get('auth_saml_only_checkbox') == "on": + auth_methods.append("SAML") - if request.args.get('domain_changelog_only_checkbox') != None: - changelog_only = True if request.args.get('domain_changelog_only_checkbox') == "on" else False - else: - changelog_only = False + if request.args.get('domain_changelog_only_checkbox') != None: + changelog_only = True if request.args.get('domain_changelog_only_checkbox') == "on" else False + else: + changelog_only = False + # users cannot search for authentication + if user_name != None and current_user.role.name not in ['Administrator', 'Operator']: + histories = [] + elif domain_name != None: + if not changelog_only: + histories = base_query \ + .filter( + db.and_( + db.or_( + History.msg.like("%domain " + domain_name) if domain_name != "*" else History.msg.like( + "%domain%"), + History.msg.like( + "%domain " + domain_name + " access control") if domain_name != "*" else History.msg.like( + "%domain%access control") + ), + History.created_on <= max_date if max_date != None else True, + History.created_on >= min_date if min_date != None else True, + History.created_by == changed_by if changed_by != None else True + ) + ).order_by(History.created_on.desc()).limit(lim).all() + else: + # search for records changes only + histories = base_query \ + .filter( + db.and_( + History.msg.like("Apply record changes to domain " + domain_name) if domain_name != "*" \ + else History.msg.like("Apply record changes to domain%"), + History.created_on <= max_date if max_date != None else True, + History.created_on >= min_date if min_date != None else True, + History.created_by == changed_by if changed_by != None else True + ) + ).order_by(History.created_on.desc()) \ + .limit(lim).all() + elif account_name != None: + if current_user.role.name in ['Administrator', 'Operator']: + histories = base_query \ + .join(Domain, History.domain_id == Domain.id) \ + .outerjoin(Account, Domain.account_id == Account.id) \ + .filter( + db.and_( + Account.id == Domain.account_id, + account_name == Account.name if account_name != "*" else True, + History.created_on <= max_date if max_date != None else True, + History.created_on >= min_date if min_date != None else True, + History.created_by == changed_by if changed_by != None else True + ) + ).order_by(History.created_on.desc()) \ + .limit(lim).all() + else: + histories = base_query \ + .filter( + db.and_( + Account.id == Domain.account_id, + account_name == Account.name if account_name != "*" else True, + History.created_on <= max_date if max_date != None else True, + History.created_on >= min_date if min_date != None else True, + History.created_by == changed_by if changed_by != None else True + ) + ).order_by(History.created_on.desc()) \ + .limit(lim).all() + elif user_name != None and current_user.role.name in ['Administrator', + 'Operator']: # only admins can see the user login-logouts - # users cannot search for authentication - if user_name != None and current_user.role.name not in [ 'Administrator', 'Operator']: - histories = [] - elif domain_name != None: + histories = History.query \ + .filter( + db.and_( + db.or_( + History.msg.like( + "User " + user_name + " authentication%") if user_name != "*" and user_name != None else History.msg.like( + "%authentication%"), + History.msg.like( + "User " + user_name + " was not authorized%") if user_name != "*" and user_name != None else History.msg.like( + "User%was not authorized%") + ), + History.created_on <= max_date if max_date != None else True, + History.created_on >= min_date if min_date != None else True, + History.created_by == changed_by if changed_by != None else True + ) + ) \ + .order_by(History.created_on.desc()).limit(lim).all() + temp = [] + for h in histories: + for method in auth_methods: + if method in h.detail: + temp.append(h) + break + histories = temp + elif (changed_by != None or max_date != None) and current_user.role.name in ['Administrator', + 'Operator']: # select changed by and date filters only + histories = History.query \ + .filter( + db.and_( + History.created_on <= max_date if max_date != None else True, + History.created_on >= min_date if min_date != None else True, + History.created_by == changed_by if changed_by != None else True + ) + ) \ + .order_by(History.created_on.desc()).limit(lim).all() + elif ( + changed_by != None or max_date != None): # special filtering for user because one user does not have access to log-ins logs + histories = base_query \ + .filter( + db.and_( + History.created_on <= max_date if max_date != None else True, + History.created_on >= min_date if min_date != None else True, + History.created_by == changed_by if changed_by != None else True + ) + ) \ + .order_by(History.created_on.desc()).limit(lim).all() + elif max_date != None: # if changed by == null and only date is applied + histories = base_query.filter( + db.and_( + History.created_on <= max_date if max_date != None else True, + History.created_on >= min_date if min_date != None else True, + ) + ).order_by(History.created_on.desc()).limit(lim).all() + else: # default view + if current_user.role.name in ['Administrator', 'Operator']: + histories = History.query.order_by(History.created_on.desc()).limit(lim).all() + else: + histories = db.session.query(History) \ + .join(Domain, History.domain_id == Domain.id) \ + .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ + .outerjoin(Account, Domain.account_id == Account.id) \ + .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ + .order_by(History.created_on.desc()) \ + .filter( + db.or_( + DomainUser.user_id == current_user.id, + AccountUser.user_id == current_user.id + )).limit(lim).all() - if not changelog_only: - histories = base_query \ - .filter( - db.and_( - db.or_( - History.msg.like("%domain "+ domain_name) if domain_name != "*" else History.msg.like("%domain%"), - History.msg.like("%domain "+ domain_name + " access control") if domain_name != "*" else History.msg.like("%domain%access control") - ), - History.created_on <= max_date if max_date != None else True, - History.created_on >= min_date if min_date != None else True, - History.created_by == changed_by if changed_by != None else True - ) - ).order_by(History.created_on.desc()).limit(lim).all() - else: - # search for records changes only - histories = base_query \ - .filter( - db.and_( - History.msg.like("Apply record changes to domain " + domain_name) if domain_name != "*" \ - else History.msg.like("Apply record changes to domain%"), - History.created_on <= max_date if max_date != None else True, - History.created_on >= min_date if min_date != None else True, - History.created_by == changed_by if changed_by != None else True + detailedHistories = convert_histories(histories) - ) - ).order_by(History.created_on.desc()) \ - .limit(lim).all() - elif account_name != None: - if current_user.role.name in ['Administrator', 'Operator']: - histories = base_query \ - .join(Domain, History.domain_id == Domain.id) \ - .outerjoin(Account, Domain.account_id == Account.id) \ - .filter( - db.and_( - Account.id == Domain.account_id, - account_name == Account.name if account_name != "*" else True, - History.created_on <= max_date if max_date != None else True, - History.created_on >= min_date if min_date != None else True, - History.created_by == changed_by if changed_by != None else True - ) - ).order_by(History.created_on.desc()) \ - .limit(lim).all() - else: - histories = base_query \ - .filter( - db.and_( - Account.id == Domain.account_id, - account_name == Account.name if account_name != "*" else True, - History.created_on <= max_date if max_date != None else True, - History.created_on >= min_date if min_date != None else True, - History.created_by == changed_by if changed_by != None else True - ) - ).order_by(History.created_on.desc()) \ - .limit(lim).all() - elif user_name != None and current_user.role.name in [ 'Administrator', 'Operator']: # only admins can see the user login-logouts + # Remove dates from previous or next day that were brought over + if tzoffset != None: + if min_date != None: + min_date_split = min_date.split()[0] + if max_date != None: + max_date_split = max_date.split()[0] + for i, history_rec in enumerate(detailedHistories): + local_date = str(from_utc_to_local(int(tzoffset), history_rec.history.created_on).date()) + if (min_date != None and local_date == min_date_split) or ( + max_date != None and local_date == max_date_split): + detailedHistories[i] = None - histories = History.query \ - .filter( - db.and_( - db.or_( - History.msg.like("User "+ user_name + " authentication%") if user_name != "*" and user_name != None else History.msg.like("%authentication%"), - History.msg.like("User "+ user_name + " was not authorized%") if user_name != "*" and user_name != None else History.msg.like("User%was not authorized%") - ), - History.created_on <= max_date if max_date != None else True, - History.created_on >= min_date if min_date != None else True, - History.created_by == changed_by if changed_by != None else True - ) - ) \ - .order_by(History.created_on.desc()).limit(lim).all() - temp = [] - for h in histories: - for method in auth_methods: - if method in h.detail: - temp.append(h) - break - histories = temp - elif (changed_by != None or max_date != None) and current_user.role.name in [ 'Administrator', 'Operator'] : # select changed by and date filters only - histories = History.query \ - .filter( - db.and_( - History.created_on <= max_date if max_date != None else True, - History.created_on >= min_date if min_date != None else True, - History.created_by == changed_by if changed_by != None else True - ) - ) \ - .order_by(History.created_on.desc()).limit(lim).all() - elif (changed_by != None or max_date != None): # special filtering for user because one user does not have access to log-ins logs - histories = base_query \ - .filter( - db.and_( - History.created_on <= max_date if max_date != None else True, - History.created_on >= min_date if min_date != None else True, - History.created_by == changed_by if changed_by != None else True - ) - ) \ - .order_by(History.created_on.desc()).limit(lim).all() - elif max_date != None: # if changed by == null and only date is applied - histories = base_query.filter( - db.and_( - History.created_on <= max_date if max_date != None else True, - History.created_on >= min_date if min_date != None else True, - ) - ).order_by(History.created_on.desc()).limit(lim).all() - else: # default view - if current_user.role.name in [ 'Administrator', 'Operator']: - histories = History.query.order_by(History.created_on.desc()).limit(lim).all() - else: - histories = db.session.query(History) \ - .join(Domain, History.domain_id == Domain.id) \ - .outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \ - .outerjoin(Account, Domain.account_id == Account.id) \ - .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ - .order_by(History.created_on.desc()) \ - .filter( - db.or_( - DomainUser.user_id == current_user.id, - AccountUser.user_id == current_user.id - )).limit(lim).all() - - detailedHistories = convert_histories(histories) - - # Remove dates from previous or next day that were brought over - if tzoffset != None: - if min_date != None: - min_date_split = min_date.split()[0] - if max_date != None: - max_date_split = max_date.split()[0] - for i, history_rec in enumerate(detailedHistories): - local_date = str(from_utc_to_local(int(tzoffset), history_rec.history.created_on).date()) - if (min_date != None and local_date == min_date_split) or (max_date != None and local_date == max_date_split): - detailedHistories[i] = None - - # Remove elements previously flagged as None - detailedHistories = [h for h in detailedHistories if h is not None] - - return render_template('admin_history_table.html', histories=detailedHistories, len_histories=len(detailedHistories), lim=lim) + # Remove elements previously flagged as None + detailedHistories = [h for h in detailedHistories if h is not None] + return render_template('admin_history_table.html', histories=detailedHistories, + len_histories=len(detailedHistories), lim=lim) @admin_bp.route('/setting/basic', methods=['GET']) @@ -1477,9 +1532,9 @@ def setting_authentication(): if not has_an_auth_method(local_db_enabled=local_db_enabled): result = { 'status': - False, + False, 'msg': - 'Must have at least one authentication method enabled.' + 'Must have at least one authentication method enabled.' } else: Setting().set('local_db_enabled', local_db_enabled) @@ -1528,20 +1583,19 @@ def setting_authentication(): Setting().set('autoprovisioning_attribute', request.form.get('autoprovisioning_attribute')) - if request.form.get('autoprovisioning')=='ON': - if validateURN(request.form.get('urn_value')): + if request.form.get('autoprovisioning') == 'ON': + if validateURN(request.form.get('urn_value')): Setting().set('urn_value', - request.form.get('urn_value')) + request.form.get('urn_value')) else: return render_template('admin_setting_authentication.html', - error="Invalid urn") + error="Invalid urn") else: Setting().set('urn_value', - request.form.get('urn_value')) + request.form.get('urn_value')) Setting().set('purge', True - if request.form.get('purge') == 'ON' else False) - + if request.form.get('purge') == 'ON' else False) result = {'status': True, 'msg': 'Saved successfully'} elif conf_type == 'google': @@ -1571,7 +1625,7 @@ def setting_authentication(): result = { 'status': True, 'msg': - 'Saved successfully. Please reload PDA to take effect.' + 'Saved successfully. Please reload PDA to take effect.' } elif conf_type == 'github': github_oauth_enabled = True if request.form.get( @@ -1600,7 +1654,7 @@ def setting_authentication(): result = { 'status': True, 'msg': - 'Saved successfully. Please reload PDA to take effect.' + 'Saved successfully. Please reload PDA to take effect.' } elif conf_type == 'azure': azure_oauth_enabled = True if request.form.get( @@ -1649,7 +1703,7 @@ def setting_authentication(): result = { 'status': True, 'msg': - 'Saved successfully. Please reload PDA to take effect.' + 'Saved successfully. Please reload PDA to take effect.' } elif conf_type == 'oidc': oidc_oauth_enabled = True if request.form.get( @@ -1694,7 +1748,7 @@ def setting_authentication(): result = { 'status': True, 'msg': - 'Saved successfully. Please reload PDA to take effect.' + 'Saved successfully. Please reload PDA to take effect.' } else: return abort(400) @@ -1738,10 +1792,10 @@ def create_template(): result = t.create() if result['status'] == 'ok': history = History(msg='Add domain template {0}'.format(name), - detail = json.dumps({ - 'name': name, - 'description': description - }), + detail=json.dumps({ + 'name': name, + 'description': description + }), created_by=current_user.username) history.add() return redirect(url_for('admin.templates')) @@ -1776,19 +1830,19 @@ def create_template_from_zone(): return make_response( jsonify({ 'status': - 'error', + 'error', 'msg': - 'A template with the name {0} already exists!'.format(name) + 'A template with the name {0} already exists!'.format(name) }), 409) t = DomainTemplate(name=name, description=description) result = t.create() if result['status'] == 'ok': history = History(msg='Add domain template {0}'.format(name), - detail = json.dumps({ - 'name': name, - 'description': description - }), + detail=json.dumps({ + 'name': name, + 'description': description + }), created_by=current_user.username) history.add() @@ -1918,7 +1972,7 @@ def apply_records(template): history = History( msg='Apply domain template record changes to domain template {0}' .format(template), - detail = json.dumps(jdata), + detail=json.dumps(jdata), created_by=current_user.username) history.add() return make_response(jsonify(result), 200) @@ -1948,7 +2002,7 @@ def delete_template(template): if result['status'] == 'ok': history = History( msg='Deleted domain template {0}'.format(template), - detail = json.dumps({'name': template}), + detail=json.dumps({'name': template}), created_by=current_user.username) history.add() return redirect(url_for('admin.templates')) @@ -2003,28 +2057,29 @@ def global_search(): return render_template('admin_global_search.html', domains=domains, records=records, comments=comments) + def validateURN(value): NID_PATTERN = re.compile(r'^[0-9a-z][0-9a-z-]{1,31}$', flags=re.IGNORECASE) NSS_PCHAR = '[a-z0-9-._~]|%[a-f0-9]{2}|[!$&\'()*+,;=]|:|@' NSS_PATTERN = re.compile(fr'^({NSS_PCHAR})({NSS_PCHAR}|/|\?)*$', re.IGNORECASE) - prefix=value.split(':') - if (len(prefix)<3): - current_app.logger.warning( "Too small urn prefix" ) + prefix = value.split(':') + if (len(prefix) < 3): + current_app.logger.warning("Too small urn prefix") return False - urn=prefix[0] - nid=prefix[1] - nss=value.replace(urn+":"+nid+":", "") + urn = prefix[0] + nid = prefix[1] + nss = value.replace(urn + ":" + nid + ":", "") - if not urn.lower()=="urn": - current_app.logger.warning( urn + ' contains invalid characters ' ) + if not urn.lower() == "urn": + current_app.logger.warning(urn + ' contains invalid characters ') return False if not re.match(NID_PATTERN, nid.lower()): - current_app.logger.warning( nid + ' contains invalid characters ' ) + current_app.logger.warning(nid + ' contains invalid characters ') return False if not re.match(NSS_PATTERN, nss): - current_app.logger.warning( nss + ' contains invalid characters ' ) + current_app.logger.warning(nss + ' contains invalid characters ') return False return True diff --git a/powerdnsadmin/templates/admin_pdns_stats.html b/powerdnsadmin/templates/admin_pdns_stats.html deleted file mode 100644 index b0e52f4..0000000 --- a/powerdnsadmin/templates/admin_pdns_stats.html +++ /dev/null @@ -1,127 +0,0 @@ -{% extends "base.html" %} - -{% set active_page = "admin_console" %} - -{% block title %} - - Admin Console - {{ SITE_NAME }} - -{% endblock %} - -{% block dashboard_stat %} -
-
-
-
-

- PowerDNS - Server Statistics & Configuration -

-
-
- -
-
-
-
-{% endblock %} - -{% block content %} -
-
-
-
-
-
-

PowerDNS Server Statistics

-
-
- - - - - - - - - {% for statistic in statistics %} - - - - - {% endfor %} - -
StatisticValue
- -  {{ statistic['name'] }} - - {{ statistic['value'] }}
-
-
-
-
-
-
-
-
-

PowerDNS Server Configuration

-
-
- - - - - - - - - {% for config in configs %} - - - - - {% endfor %} - -
ConfigurationValue
- -  {{ config['name'] }} - - - {{ config['value'] }} -
-
-
-
-
-
-
-{% endblock %} - -{% block extrascripts %} - -{% endblock %} diff --git a/powerdnsadmin/templates/admin_server_configuration.html b/powerdnsadmin/templates/admin_server_configuration.html new file mode 100644 index 0000000..a56bd12 --- /dev/null +++ b/powerdnsadmin/templates/admin_server_configuration.html @@ -0,0 +1,84 @@ +{% extends "base.html" %} +{% set active_page = "server_configuration" %} +{% block title %}Server Configuration - {{ SITE_NAME }}{% endblock %} + +{% block dashboard_stat %} +
+
+
+
+

+ Server Configuration +

+
+
+ +
+
+
+
+{% endblock %} + +{% block content %} +
+
+
+
+
+
+

Server Configuration

+
+ +
+ + + + + + + + + {% for config in configs %} + + + + + {% endfor %} + +
ConfigurationValue
+ +  {{ config['name'] }} + + + {{ config['value'] }} +
+
+ +
+ +
+ +
+ +
+ +
+{% endblock %} + +{% block extrascripts %} + +{% endblock %} diff --git a/powerdnsadmin/templates/admin_server_statistics.html b/powerdnsadmin/templates/admin_server_statistics.html new file mode 100644 index 0000000..16ff7d0 --- /dev/null +++ b/powerdnsadmin/templates/admin_server_statistics.html @@ -0,0 +1,82 @@ +{% extends "base.html" %} +{% set active_page = "server_statistics" %} +{% block title %}Server Statistics - {{ SITE_NAME }}{% endblock %} + +{% block dashboard_stat %} +
+
+
+
+

+ Server Statistics +

+
+
+ +
+
+
+
+{% endblock %} + +{% block content %} +
+
+
+
+
+
+

Server Statistics

+
+ +
+ + + + + + + + + {% for statistic in statistics %} + + + + + {% endfor %} + +
StatisticValue
+ +  {{ statistic['name'] }} + + {{ statistic['value'] }}
+
+ +
+ +
+ +
+ +
+ +
+{% endblock %} + +{% block extrascripts %} + +{% endblock %} diff --git a/powerdnsadmin/templates/base.html b/powerdnsadmin/templates/base.html index b3ff92c..d23d6b3 100644 --- a/powerdnsadmin/templates/base.html +++ b/powerdnsadmin/templates/base.html @@ -115,10 +115,16 @@ {% endif %} {% if current_user.role.name in ['Administrator', 'Operator'] %} -
  • - - -

    PowerDNS Info

    +
  • + + +

    Server Statistics

    +
    +
  • +
  • + + +

    Server Configuration