Refactored zone history retrieval, parsing and displaying code.

This commit is contained in:
Rauno Tuul
2023-04-04 15:32:52 +03:00
parent 53cfa4fdaa
commit fe10665e19
5 changed files with 224 additions and 303 deletions

View File

@@ -25,7 +25,7 @@ from ..models.domain_setting import DomainSetting
from ..models.base import db
from ..models.domain_user import DomainUser
from ..models.account_user import AccountUser
from .admin import extract_changelogs_from_a_history_entry
from .admin import extract_changelogs_from_history
from ..decorators import history_access_required
domain_bp = Blueprint('domain',
__name__,
@@ -201,17 +201,6 @@ def changelog(domain_name):
if not domain:
abort(404)
# Query domain's rrsets from PowerDNS API
rrsets = Record().get_rrsets(domain.name)
current_app.logger.debug("Fetched rrsets: \n{}".format(pretty_json(rrsets)))
# API server might be down, misconfigured
if not rrsets and domain.type != 'Slave':
abort(500)
records_allow_to_edit = Setting().get_records_allow_to_edit()
records = []
# get all changelogs for this domain, in descening order
if current_user.role.name in [ 'Administrator', 'Operator' ]:
histories = History.query.filter(History.domain_id == domain.id).order_by(History.created_on.desc()).all()
@@ -230,49 +219,13 @@ def changelog(domain_name):
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
),
History.domain_id == domain.id
History.domain_id == domain.id,
History.detail.isnot(None)
)
).all()
if StrictVersion(Setting().get('pdns_version')) >= StrictVersion('4.0.0'):
for r in rrsets:
if r['type'] in records_allow_to_edit:
r_name = r['name'].rstrip('.')
changes_set = extract_changelogs_from_history(histories)
# If it is reverse zone and pretty_ipv6_ptr setting
# is enabled, we reformat the name for ipv6 records.
if Setting().get('pretty_ipv6_ptr') and r[
'type'] == 'PTR' and 'ip6.arpa' in r_name and '*' not in r_name:
r_name = dns.reversename.to_address(
dns.name.from_text(r_name))
# Create the list of records in format that
# PDA jinja2 template can understand.
index = 0
for record in r['records']:
if (len(r['comments'])>index):
c=r['comments'][index]['content']
else:
c=''
record_entry = RecordEntry(
name=r_name,
type=r['type'],
status='Disabled' if record['disabled'] else 'Active',
ttl=r['ttl'],
data=record['content'],
comment=c,
is_allowed_edit=True)
index += 1
records.append(record_entry)
else:
# Unsupported version
abort(500)
changes_set = dict()
for i in range(len(histories)):
extract_changelogs_from_a_history_entry(changes_set, histories[i], i)
if i in changes_set and len(changes_set[i]) == 0: # if empty, then remove the key
changes_set.pop(i)
return render_template('domain_changelog.html', domain=domain, allHistoryChanges=changes_set)
"""
@@ -289,17 +242,18 @@ def record_changelog(domain_name, record_name, record_type):
domain = Domain.query.filter(Domain.name == domain_name).first()
if not domain:
abort(404)
# Query domain's rrsets from PowerDNS API
rrsets = Record().get_rrsets(domain.name)
current_app.logger.debug("Fetched rrsets: \n{}".format(pretty_json(rrsets)))
# API server might be down, misconfigured
if not rrsets and domain.type != 'Slave':
abort(500)
# get all changelogs for this domain, in descening order
if current_user.role.name in [ 'Administrator', 'Operator' ]:
histories = History.query.filter(History.domain_id == domain.id).order_by(History.created_on.desc()).all()
histories = History.query \
.filter(
db.and_(
History.domain_id == domain.id,
History.detail.like("%{}%".format(record_name))
)
) \
.order_by(History.created_on.desc()) \
.all()
else:
# if the user isn't an administrator or operator,
# allow_user_view_history must be enabled to get here,
@@ -309,43 +263,23 @@ def record_changelog(domain_name, record_name, record_type):
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.order_by(History.created_on.desc()) \
.filter(
db.and_(db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
),
History.domain_id == domain.id
History.domain_id == domain.id,
History.detail.like("%{}%".format(record_name))
)
).all()
) \
.order_by(History.created_on.desc()) \
.all()
changes_set_of_record = dict()
for i in range(len(histories)):
extract_changelogs_from_a_history_entry(changes_set_of_record, histories[i], i, record_name, record_type)
if i in changes_set_of_record and len(changes_set_of_record[i]) == 0: # if empty, then remove the key
changes_set_of_record.pop(i)
changes_set = extract_changelogs_from_history(histories, record_name, record_type)
indexes_to_pop = []
for change_num in changes_set_of_record:
changes_i = changes_set_of_record[change_num]
for hre in changes_i: # for each history record entry in changes_i
if 'type' in hre.add_rrset and hre.add_rrset['name'] == record_name and hre.add_rrset['type'] == record_type:
continue
elif 'type' in hre.del_rrset and hre.del_rrset['name'] == record_name and hre.del_rrset['type'] == record_type:
continue
else:
changes_set_of_record[change_num].remove(hre)
if change_num in changes_set_of_record and len(changes_set_of_record[change_num]) == 0: # if empty, then remove the key
indexes_to_pop.append(change_num)
for i in indexes_to_pop:
changes_set_of_record.pop(i)
return render_template('domain_changelog.html', domain=domain, allHistoryChanges=changes_set_of_record,
return render_template('domain_changelog.html', domain=domain, allHistoryChanges=changes_set,
record_name = record_name, record_type = record_type)
@domain_bp.route('/add', methods=['GET', 'POST'])
@login_required
@can_create_domain