History Tab Overhaul & Domain Record Modifications Changelog (#1042)

Co-authored-by: Konstantinos Kouris <85997752+konkourgr@users.noreply.github.com>
Co-authored-by: vmarkop <billy.mark.b.m.10@gmail.com>
Co-authored-by: KostasMparmparousis <mparmparousis.kostas@gmail.com>
Co-authored-by: dimpapac <demispapa@gmail.com>
This commit is contained in:
ManosKoukoularis
2021-11-30 11:02:37 +02:00
committed by GitHub
parent b3f9b4a2b0
commit 1332c8d29d
37 changed files with 18978 additions and 128 deletions

View File

@ -32,13 +32,171 @@ admin_bp = Blueprint('admin',
template_folder='templates',
url_prefix='/admin')
"""
changeSet is a list of tuples, in the following format
(old_state, new_state, change_type)
old_state: dictionary with "disabled" and "content" keys. {"disabled" : False, "content" : "1.1.1.1" }
new_state: similarly
change_type: "addition" or "deletion" or "status" for status change or "unchanged" for no change
Note: A change in "content", is considered a deletion and recreation of the same record,
holding the new content value.
"""
def get_record_changes(del_rrest, add_rrest):
changeSet = []
delSet = del_rrest['records'] if 'records' in del_rrest else []
addSet = add_rrest['records'] if 'records' in add_rrest else []
for d in delSet: # get the deletions and status changes
exists = False
for a in addSet:
if d['content'] == a['content']:
exists = True
if d['disabled'] != a['disabled']:
changeSet.append( ({"disabled":d['disabled'],"content":d['content']},
{"disabled":a['disabled'],"content":a['content']},
"status") )
break
if not exists: # deletion
changeSet.append( ({"disabled":d['disabled'],"content":d['content']},
None,
"deletion") )
for a in addSet: # get the additions
exists = False
for d in delSet:
if d['content'] == a['content']:
exists = True
# already checked for status change
break
if not exists:
changeSet.append( (None, {"disabled":a['disabled'], "content":a['content']}, "addition") )
continue
for a in addSet: # get the unchanged
exists = False
for c in changeSet:
if c[1] != None and c[1]["content"] == a['content']:
exists = True
break
if not exists:
changeSet.append( ( {"disabled":a['disabled'], "content":a['content']}, {"disabled":a['disabled'], "content":a['content']}, "unchanged") )
return changeSet
# out_changes is a list of HistoryRecordEntry objects in which we will append the new changes
# a HistoryRecordEntry represents a pair of add_rrest and del_rrest
def extract_changelogs_from_a_history_entry(out_changes, history_entry, change_num, record_name=None, record_type=None):
if history_entry.detail is None:
return
detail_dict = json.loads(history_entry.detail.replace("'", '"'))
if "add_rrests" not in detail_dict:
return
add_rrests = detail_dict['add_rrests']
del_rrests = detail_dict['del_rrests']
for add_rrest in add_rrests:
exists = False
for del_rrest in del_rrests:
if del_rrest['name'] == add_rrest['name'] and del_rrest['type'] == add_rrest['type']:
exists = True
if change_num not in out_changes:
out_changes[change_num] = []
out_changes[change_num].append(HistoryRecordEntry(history_entry, del_rrest, add_rrest, "*"))
break
if not exists: # this is a new record
if change_num not in out_changes:
out_changes[change_num] = []
out_changes[change_num].append(HistoryRecordEntry(history_entry, [], add_rrest, "+")) # (add_rrest, del_rrest, change_type)
for del_rrest in del_rrests:
exists = False
for add_rrest in add_rrests:
if del_rrest['name'] == add_rrest['name'] and del_rrest['type'] == add_rrest['type']:
exists = True # no need to add in the out_changes set
break
if not exists: # this is a deletion
if change_num not in out_changes:
out_changes[change_num] = []
out_changes[change_num].append(HistoryRecordEntry(history_entry, del_rrest, [], "-"))
# only used for changelog per record
if record_name != None and record_type != None: # then get only the records with the specific (record_name, record_type) tuple
if change_num in out_changes:
changes_i = out_changes[change_num]
else:
return
for hre in changes_i: # for each history record entry in changes_i
if 'type' in hre.add_rrest and hre.add_rrest['name'] == record_name and hre.add_rrest['type'] == record_type:
continue
elif 'type' in hre.del_rrest and hre.del_rrest['name'] == record_name and hre.del_rrest['type'] == record_type:
continue
else:
out_changes[change_num].remove(hre)
# records with same (name,type) are considered as a single HistoryRecordEntry
# history_entry is of type History - used to extract created_by and created_on
# add_rrest is a dictionary of replace
# del_rrest is a dictionary of remove
class HistoryRecordEntry:
def __init__(self, history_entry, del_rrest, add_rrest, change_type):
# search the add_rrest index into the add_rrest set for the key (name, type)
self.history_entry = history_entry
self.add_rrest = add_rrest
self.del_rrest = del_rrest
self.change_type = change_type # "*": edit or unchanged, "+" new tuple(name,type), "-" deleted (name,type) tuple
self.changed_fields = [] # contains a subset of : [ttl, name, type]
self.changeSet = [] # all changes for the records of this add_rrest-del_rrest pair
if change_type == "+": # addition
self.changed_fields.append("name")
self.changed_fields.append("type")
self.changed_fields.append("ttl")
self.changeSet = get_record_changes(del_rrest, add_rrest)
elif change_type == "-": # removal
self.changed_fields.append("name")
self.changed_fields.append("type")
self.changed_fields.append("ttl")
self.changeSet = get_record_changes(del_rrest, add_rrest)
elif change_type == "*": # edit of unchanged
if add_rrest['ttl'] != del_rrest['ttl']:
self.changed_fields.append("ttl")
self.changeSet = get_record_changes(del_rrest, add_rrest)
def toDict(self):
return {
"add_rrest" : self.add_rrest,
"del_rrest" : self.del_rrest,
"changed_fields" : self.changed_fields,
"created_on" : self.history_entry.created_on,
"created_by" : self.history_entry.created_by,
"change_type" : self.change_type,
"changeSet" : self.changeSet
}
def __eq__(self, obj2): # used for removal of objects from a list
return True if obj2.toDict() == self.toDict() else False
@admin_bp.before_request
def before_request():
# Manage session timeout
session.permanent = True
# current_app.permanent_session_lifetime = datetime.timedelta(
# minutes=int(Setting().get('session_timeout')))
current_app.permanent_session_lifetime = datetime.timedelta(
minutes=int(Setting().get('session_timeout')))
minutes=int(Setting().get('session_timeout')))
session.modified = True
@ -581,55 +739,469 @@ def manage_account():
}), 400)
class DetailedHistory():
def __init__(self, history, change_set):
self.history = history
self.detailed_msg = ""
self.change_set = change_set
if history.detail is None:
self.detailed_msg = ""
# if 'Create account' in history.msg:
# account = Account.query.filter(
# Account.name == history.msg.split(' ')[2]).first()
# self.detailed_msg = str(account.get_user())
# # WRONG, cannot do query afterwards, db may have changed
return
detail_dict = json.loads(history.detail.replace("'", '"'))
if 'domain_type' in detail_dict.keys() and 'account_id' in detail_dict.keys(): # this is a domain creation
self.detailed_msg = """
<table class="table table-bordered table-striped"><tr><td>Domain type:</td><td>{0}</td></tr> <tr><td>Account:</td><td>{1}</td></tr></table>
""".format(detail_dict['domain_type'],
Account.get_name_by_id(self=None, account_id=detail_dict['account_id']) if detail_dict['account_id'] != "0" else "None")
elif 'authenticator' in detail_dict.keys(): # this is a user authentication
self.detailed_msg = """
<table class="table table-bordered table-striped" style="width:565px;">
<thead>
<tr>
<th colspan="3" style="background:
"""
# Change table header background colour depending on auth success or failure
if detail_dict['success'] == 1:
self.detailed_msg+= """
rgba(68,157,68);"> <p style="color:white;">
User {0} authentication success
</p></th>
""".format(detail_dict['username'])
else:
self.detailed_msg+= """
rgba(201,48,44);"> <p style="color:white;">
User {0} authentication failure
</th>
""".format(detail_dict['username'])
self.detailed_msg+= """
</tr>
</thead>
<tbody>
<tr>
<td>Authenticator Type:</td>
<td colspan="2">{0}</td>
</tr>
<tr>
<td>IP Address</td>
<td colspan="2">{1}</td>
</tr>
</tbody>
</table>
""".format(detail_dict['authenticator'], detail_dict['ip_address'])
elif 'add_rrests' in detail_dict.keys(): # this is a domain record change
# changes_set = []
self.detailed_msg = ""
# extract_changelogs_from_a_history_entry(changes_set, history, 0)
elif 'name' in detail_dict.keys() and 'template' in history.msg: # template creation
self.detailed_msg = """
<table class="table table-bordered table-striped"><tr><td>Template name:</td><td>{0}</td></tr> <tr><td>Description:</td><td>{1}</td></tr></table>
""".format(detail_dict['name'], detail_dict['description'])
elif 'Change domain' in history.msg and 'access control' in history.msg: # added or removed a user from a domain
self.detailed_msg = """
<table class="table table-bordered table-striped"><tr><td>Users with access to this domain</td><td>{0}</td></tr><tr><td>Number of users:</td><td>{1}</td><tr></table>
""".format(str(detail_dict['user_has_access']).replace("]","").replace("[", ""), len((detail_dict['user_has_access'])))
elif 'Created API key' in history.msg or 'Updated API key' in history.msg:
self.detailed_msg = """
<table class="table table-bordered table-striped">
<tr><td>Key: </td><td>{0}</td></tr>
<tr><td>Role:</td><td>{1}</td></tr>
<tr><td>Description:</td><td>{2}</td></tr>
<tr><td>Accessible domains with this API key:</td><td>{3}</td></tr>
</table>
""".format(detail_dict['key'], detail_dict['role'], detail_dict['description'], str(detail_dict['domain_acl']).replace("]","").replace("[", ""))
elif 'Update type for domain' in history.msg:
self.detailed_msg = """
<table class="table table-bordered table-striped">
<tr><td>Domain: </td><td>{0}</td></tr>
<tr><td>Domain type:</td><td>{1}</td></tr>
<tr><td>Masters:</td><td>{2}</td></tr>
</table>
""".format(detail_dict['domain'], detail_dict['type'], str(detail_dict['masters']).replace("]","").replace("[", ""))
elif 'Delete API key' in history.msg:
self.detailed_msg = """
<table class="table table-bordered table-striped">
<tr><td>Key: </td><td>{0}</td></tr>
<tr><td>Role:</td><td>{1}</td></tr>
<tr><td>Description:</td><td>{2}</td></tr>
<tr><td>Accessible domains with this API key:</td><td>{3}</td></tr>
</table>
""".format(detail_dict['key'], detail_dict['role'], detail_dict['description'], str(detail_dict['domains']).replace("]","").replace("[", ""))
elif 'reverse' in history.msg:
self.detailed_msg = """
<table class="table table-bordered table-striped">
<tr><td>Domain Type: </td><td>{0}</td></tr>
<tr><td>Domain Master IPs:</td><td>{1}</td></tr>
</table>
""".format(detail_dict['domain_type'], detail_dict['domain_master_ips'])
# convert a list of History objects into DetailedHistory objects
def convert_histories(histories):
changes_set = dict()
detailedHistories = []
j = 0
for i in range(len(histories)):
# if histories[i].detail != None and 'add_rrests' in json.loads(histories[i].detail.replace("'", '"')):
if histories[i].detail != None and ('add_rrests' in json.loads(histories[i].detail.replace("'", '"')) or 'del_rrests' in json.loads(histories[i].detail.replace("'", '"'))):
extract_changelogs_from_a_history_entry(changes_set, histories[i], j)
if j in changes_set:
detailedHistories.append(DetailedHistory(histories[i], changes_set[j]))
else: # no changes were found
detailedHistories.append(DetailedHistory(histories[i], None))
j += 1
else:
detailedHistories.append(DetailedHistory(histories[i], None))
return detailedHistories
@admin_bp.route('/history', methods=['GET', 'POST'])
@login_required
@history_access_required
def history():
if request.method == 'POST':
if current_user.role.name != 'Administrator':
return make_response(
jsonify({
'status': 'error',
'msg': 'You do not have permission to remove history.'
}), 401)
if request.method == 'POST':
if current_user.role.name != 'Administrator':
return make_response(
jsonify({
'status': 'error',
'msg': 'You do not have permission to remove history.'
}), 401)
h = History()
result = h.remove_all()
if result:
history = History(msg='Remove all histories',
created_by=current_user.username)
history.add()
return make_response(
jsonify({
'status': 'ok',
'msg': 'Changed user role successfully.'
}), 200)
else:
return make_response(
jsonify({
'status': 'error',
'msg': 'Can not remove histories.'
}), 500)
h = History()
result = h.remove_all()
if result:
history = History(msg='Remove all histories',
created_by=current_user.username)
history.add()
return make_response(
jsonify({
'status': 'ok',
'msg': 'Changed user role successfully.'
}), 200)
else:
return make_response(
jsonify({
'status': 'error',
'msg': 'Can not remove histories.'
}), 500)
if request.method == 'GET':
if current_user.role.name in [ 'Administrator', 'Operator' ]:
histories = History.query.all()
else:
# if the user isn't an administrator or operator,
# allow_user_view_history must be enabled to get here,
# so include history for the domains for the user
histories = db.session.query(History) \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
))
return render_template('admin_history.html', histories=histories)
if request.method == 'GET':
doms = accounts = users = ""
if current_user.role.name in [ 'Administrator', 'Operator']:
all_domain_names = Domain.query.all()
all_account_names = Account.query.all()
all_user_names = User.query.all()
for d in all_domain_names:
doms += d.name + " "
for acc in all_account_names:
accounts += acc.name + " "
for usr in all_user_names:
users += usr.username + " "
else: # special autocomplete for users
all_domain_names = db.session.query(Domain) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).all()
all_account_names = db.session.query(Account) \
.outerjoin(Domain, Domain.account_id == Account.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).all()
all_user_names = []
for a in all_account_names:
temp = db.session.query(User) \
.join(AccountUser, AccountUser.user_id == User.id) \
.outerjoin(Account, Account.id == AccountUser.account_id) \
.filter(
db.or_(
Account.id == a.id,
AccountUser.account_id == a.id
)
) \
.all()
for u in temp:
if u in all_user_names:
continue
all_user_names.append(u)
for d in all_domain_names:
doms += d.name + " "
for a in all_account_names:
accounts += a.name + " "
for u in all_user_names:
users += u.username + " "
return render_template('admin_history.html', all_domain_names=doms, all_account_names=accounts, all_usernames=users)
# local_offset is the offset of the utc to the local time
# offset must be int
# return the date converted and simplified
def from_utc_to_local(local_offset, timeframe):
offset = str(local_offset *(-1))
date_split = str(timeframe).split(".")[0]
date_converted = datetime.datetime.strptime(date_split, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(minutes=int(offset))
return date_converted
@admin_bp.route('/history_table', methods=['GET', 'POST'])
@login_required
@history_access_required
def history_table(): # ajax call data
if request.method == 'POST':
if current_user.role.name != 'Administrator':
return make_response(
jsonify({
'status': 'error',
'msg': 'You do not have permission to remove history.'
}), 401)
h = History()
result = h.remove_all()
if result:
history = History(msg='Remove all histories',
created_by=current_user.username)
history.add()
return make_response(
jsonify({
'status': 'ok',
'msg': 'Changed user role successfully.'
}), 200)
else:
return make_response(
jsonify({
'status': 'error',
'msg': 'Can not remove histories.'
}), 500)
detailedHistories = []
lim = int(Setting().get('max_history_records')) # max num of records
if request.method == 'GET':
if current_user.role.name in [ 'Administrator', 'Operator' ]:
base_query = History.query
else:
# if the user isn't an administrator or operator,
# allow_user_view_history must be enabled to get here,
# so include history for the domains for the user
base_query = db.session.query(History) \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
))
domain_name = request.args.get('domain_name_filter') if request.args.get('domain_name_filter') != None \
and len(request.args.get('domain_name_filter')) != 0 else None
account_name = request.args.get('account_name_filter') if request.args.get('account_name_filter') != None \
and len(request.args.get('account_name_filter')) != 0 else None
user_name = request.args.get('auth_name_filter') if request.args.get('auth_name_filter') != None \
and len(request.args.get('auth_name_filter')) != 0 else None
min_date = request.args.get('min') if request.args.get('min') != None and len( request.args.get('min')) != 0 else None
if min_date != None: # get 1 day earlier, to check for timezone errors
min_date = str(datetime.datetime.strptime(min_date, '%Y-%m-%d') - datetime.timedelta(days=1))
max_date = request.args.get('max') if request.args.get('max') != None and len( request.args.get('max')) != 0 else None
if max_date != None: # get 1 day later, to check for timezone errors
max_date = str(datetime.datetime.strptime(max_date, '%Y-%m-%d') + datetime.timedelta(days=1))
tzoffset = request.args.get('tzoffset') if request.args.get('tzoffset') != None and len(request.args.get('tzoffset')) != 0 else None
changed_by = request.args.get('user_name_filter') if request.args.get('user_name_filter') != None \
and len(request.args.get('user_name_filter')) != 0 else None
"""
Auth methods: LOCAL, Github OAuth, Azure OAuth, SAML, OIDC OAuth, Google OAuth
"""
auth_methods = []
if (request.args.get('auth_local_only_checkbox') is None \
and request.args.get('auth_oauth_only_checkbox') is None \
and request.args.get('auth_saml_only_checkbox') is None and request.args.get('auth_all_checkbox') is None):
auth_methods = []
if request.args.get('auth_all_checkbox') == "on":
auth_methods.append("")
if request.args.get('auth_local_only_checkbox') == "on":
auth_methods.append("LOCAL")
if request.args.get('auth_oauth_only_checkbox') == "on":
auth_methods.append("OAuth")
if request.args.get('auth_saml_only_checkbox') == "on":
auth_methods.append("SAML")
if request.args.get('domain_changelog_only_checkbox') != None:
changelog_only = True if request.args.get('domain_changelog_only_checkbox') == "on" else False
else:
changelog_only = False
# users cannot search for authentication
if user_name != None and current_user.role.name not in [ 'Administrator', 'Operator']:
histories = []
elif domain_name != None:
if not changelog_only:
histories = base_query \
.filter(
db.and_(
db.or_(
History.msg.like("%domain "+ domain_name) if domain_name != "*" else History.msg.like("%domain%"),
History.msg.like("%domain "+ domain_name + " access control") if domain_name != "*" else History.msg.like("%domain%access control")
),
History.created_on <= max_date if max_date != None else True,
History.created_on >= min_date if min_date != None else True,
History.created_by == changed_by if changed_by != None else True
)
).order_by(History.created_on.desc()).limit(lim).all()
else:
# search for records changes only
histories = base_query \
.filter(
db.and_(
History.msg.like("Apply record changes to domain " + domain_name) if domain_name != "*" \
else History.msg.like("Apply record changes to domain%"),
History.created_on <= max_date if max_date != None else True,
History.created_on >= min_date if min_date != None else True,
History.created_by == changed_by if changed_by != None else True
)
).order_by(History.created_on.desc()) \
.limit(lim).all()
elif account_name != None:
if current_user.role.name in ['Administrator', 'Operator']:
histories = base_query \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.filter(
db.and_(
Account.id == Domain.account_id,
account_name == Account.name if account_name != "*" else True,
History.created_on <= max_date if max_date != None else True,
History.created_on >= min_date if min_date != None else True,
History.created_by == changed_by if changed_by != None else True
)
).order_by(History.created_on.desc()) \
.limit(lim).all()
else:
histories = base_query \
.filter(
db.and_(
Account.id == Domain.account_id,
account_name == Account.name if account_name != "*" else True,
History.created_on <= max_date if max_date != None else True,
History.created_on >= min_date if min_date != None else True,
History.created_by == changed_by if changed_by != None else True
)
).order_by(History.created_on.desc()) \
.limit(lim).all()
elif user_name != None and current_user.role.name in [ 'Administrator', 'Operator']: # only admins can see the user login-logouts
histories = History.query \
.filter(
db.and_(
db.or_(
History.msg.like("User "+ user_name + " authentication%") if user_name != "*" and user_name != None else History.msg.like("%authentication%"),
History.msg.like("User "+ user_name + " was not authorized%") if user_name != "*" and user_name != None else History.msg.like("User%was not authorized%")
),
History.created_on <= max_date if max_date != None else True,
History.created_on >= min_date if min_date != None else True,
History.created_by == changed_by if changed_by != None else True
)
) \
.order_by(History.created_on.desc()).limit(lim).all()
temp = []
for h in histories:
for method in auth_methods:
if method in h.detail:
temp.append(h)
break
histories = temp
elif (changed_by != None or max_date != None) and current_user.role.name in [ 'Administrator', 'Operator'] : # select changed by and date filters only
histories = History.query \
.filter(
db.and_(
History.created_on <= max_date if max_date != None else True,
History.created_on >= min_date if min_date != None else True,
History.created_by == changed_by if changed_by != None else True
)
) \
.order_by(History.created_on.desc()).limit(lim).all()
elif (changed_by != None or max_date != None): # special filtering for user because one user does not have access to log-ins logs
histories = base_query \
.filter(
db.and_(
History.created_on <= max_date if max_date != None else True,
History.created_on >= min_date if min_date != None else True,
History.created_by == changed_by if changed_by != None else True
)
) \
.order_by(History.created_on.desc()).limit(lim).all()
elif max_date != None: # if changed by == null and only date is applied
histories = base_query.filter(
db.and_(
History.created_on <= max_date if max_date != None else True,
History.created_on >= min_date if min_date != None else True,
)
).order_by(History.created_on.desc()).limit(lim).all()
else: # default view
if current_user.role.name in [ 'Administrator', 'Operator']:
histories = History.query.order_by(History.created_on.desc()).limit(lim).all()
else:
histories = db.session.query(History) \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.order_by(History.created_on.desc()) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).limit(lim).all()
detailedHistories = convert_histories(histories)
# Remove dates from previous or next day that were brought over
if tzoffset != None:
if min_date != None:
min_date_split = min_date.split()[0]
if max_date != None:
max_date_split = max_date.split()[0]
for i, history_rec in enumerate(detailedHistories):
local_date = str(from_utc_to_local(int(tzoffset), history_rec.history.created_on).date())
if (min_date != None and local_date == min_date_split) or (max_date != None and local_date == max_date_split):
detailedHistories[i] = None
# Remove elements previously flagged as None
detailedHistories = [h for h in detailedHistories if h is not None]
return render_template('admin_history_table.html', histories=detailedHistories, len_histories=len(detailedHistories), lim=lim)
@admin_bp.route('/setting/basic', methods=['GET'])
@ -644,9 +1216,9 @@ def setting_basic():
'pretty_ipv6_ptr', 'dnssec_admins_only',
'allow_user_create_domain', 'allow_user_remove_domain', 'allow_user_view_history', 'bg_domain_updates', 'site_name',
'session_timeout', 'warn_session_timeout', 'ttl_options',
'pdns_api_timeout', 'verify_ssl_connections', 'verify_user_email',
'delete_sso_accounts', 'otp_field_enabled', 'custom_css', 'enable_api_rr_history'
'delete_sso_accounts', 'otp_field_enabled', 'custom_css', 'enable_api_rr_history', 'max_history_records'
]
return render_template('admin_setting_basic.html', settings=settings)

View File

@ -61,7 +61,7 @@ def domains_custom(boxId):
))
template = current_app.jinja_env.get_template("dashboard_domain.html")
render = template.make_module(vars={"current_user": current_user})
render = template.make_module(vars={"current_user": current_user, "allow_user_view_history": Setting().get('allow_user_view_history')})
columns = [
Domain.name, Domain.dnssec, Domain.type, Domain.serial, Domain.master,
@ -163,19 +163,20 @@ def dashboard():
if current_user.role.name in ['Administrator', 'Operator']:
domain_count = Domain.query.count()
history_number = History.query.count()
history = History.query.order_by(History.created_on.desc()).limit(4)
history = History.query.order_by(History.created_on.desc()).limit(4).all()
elif Setting().get('allow_user_view_history'):
history = db.session.query(History) \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.order_by(History.created_on.desc()) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).order_by(History.created_on.desc())
history_number = history.count()
)).all()
history_number = len(history) # history.count()
history = history[:4]
domain_count = db.session.query(Domain) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
@ -186,6 +187,10 @@ def dashboard():
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).count()
from .admin import convert_histories, DetailedHistory
detailedHistories = convert_histories(history)
server = Server(server_id='localhost')
statistics = server.get_statistic()
if statistics:
@ -202,7 +207,7 @@ def dashboard():
user_num=user_num,
history_number=history_number,
uptime=uptime,
histories=history,
histories=detailedHistories,
show_bg_domain_button=show_bg_domain_button,
pdns_version=Setting().get('pdns_version'))

View File

@ -24,7 +24,8 @@ from ..models.domain_setting import DomainSetting
from ..models.base import db
from ..models.domain_user import DomainUser
from ..models.account_user import AccountUser
from .admin import extract_changelogs_from_a_history_entry
from ..decorators import history_access_required
domain_bp = Blueprint('domain',
__name__,
template_folder='templates',
@ -131,7 +132,8 @@ def domain(domain_name):
records=records,
editable_records=editable_records,
quick_edit=quick_edit,
ttl_options=ttl_options)
ttl_options=ttl_options,
current_user=current_user)
@domain_bp.route('/remove', methods=['GET', 'POST'])
@ -187,6 +189,160 @@ def remove():
return render_template('domain_remove.html',
domainss=domains)
@domain_bp.route('/<path:domain_name>/changelog', methods=['GET'])
@login_required
@can_access_domain
@history_access_required
def changelog(domain_name):
g.user = current_user
login_manager.anonymous_user = Anonymous
domain = Domain.query.filter(Domain.name == domain_name).first()
if not domain:
abort(404)
# Query domain's rrsets from PowerDNS API
rrsets = Record().get_rrsets(domain.name)
current_app.logger.debug("Fetched rrests: \n{}".format(pretty_json(rrsets)))
# API server might be down, misconfigured
if not rrsets and domain.type != 'Slave':
abort(500)
records_allow_to_edit = Setting().get_records_allow_to_edit()
records = []
# get all changelogs for this domain, in descening order
if current_user.role.name in [ 'Administrator', 'Operator' ]:
histories = History.query.filter(History.domain_id == domain.id).order_by(History.created_on.desc()).all()
else:
# if the user isn't an administrator or operator,
# allow_user_view_history must be enabled to get here,
# so include history for the domains for the user
histories = db.session.query(History) \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.order_by(History.created_on.desc()) \
.filter(
db.and_(db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
),
History.domain_id == domain.id
)
).all()
if StrictVersion(Setting().get('pdns_version')) >= StrictVersion('4.0.0'):
for r in rrsets:
if r['type'] in records_allow_to_edit:
r_name = r['name'].rstrip('.')
# If it is reverse zone and pretty_ipv6_ptr setting
# is enabled, we reformat the name for ipv6 records.
if Setting().get('pretty_ipv6_ptr') and r[
'type'] == 'PTR' and 'ip6.arpa' in r_name and '*' not in r_name:
r_name = dns.reversename.to_address(
dns.name.from_text(r_name))
# Create the list of records in format that
# PDA jinja2 template can understand.
index = 0
for record in r['records']:
if (len(r['comments'])>index):
c=r['comments'][index]['content']
else:
c=''
record_entry = RecordEntry(
name=r_name,
type=r['type'],
status='Disabled' if record['disabled'] else 'Active',
ttl=r['ttl'],
data=record['content'],
comment=c,
is_allowed_edit=True)
index += 1
records.append(record_entry)
else:
# Unsupported version
abort(500)
changes_set = dict()
for i in range(len(histories)):
extract_changelogs_from_a_history_entry(changes_set, histories[i], i)
if i in changes_set and len(changes_set[i]) == 0: # if empty, then remove the key
changes_set.pop(i)
return render_template('domain_changelog.html', domain=domain, allHistoryChanges=changes_set)
"""
Returns a changelog for a specific pair of (record_name, record_type)
"""
@domain_bp.route('/<path:domain_name>/changelog/<path:record_name>-<path:record_type>', methods=['GET'])
@login_required
@can_access_domain
@history_access_required
def record_changelog(domain_name, record_name, record_type):
g.user = current_user
login_manager.anonymous_user = Anonymous
domain = Domain.query.filter(Domain.name == domain_name).first()
if not domain:
abort(404)
# Query domain's rrsets from PowerDNS API
rrsets = Record().get_rrsets(domain.name)
current_app.logger.debug("Fetched rrests: \n{}".format(pretty_json(rrsets)))
# API server might be down, misconfigured
if not rrsets and domain.type != 'Slave':
abort(500)
# get all changelogs for this domain, in descening order
if current_user.role.name in [ 'Administrator', 'Operator' ]:
histories = History.query.filter(History.domain_id == domain.id).order_by(History.created_on.desc()).all()
else:
# if the user isn't an administrator or operator,
# allow_user_view_history must be enabled to get here,
# so include history for the domains for the user
histories = db.session.query(History) \
.join(Domain, History.domain_id == Domain.id) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.order_by(History.created_on.desc()) \
.filter(
db.and_(db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
),
History.domain_id == domain.id
)
).all()
changes_set_of_record = dict()
for i in range(len(histories)):
extract_changelogs_from_a_history_entry(changes_set_of_record, histories[i], i, record_name, record_type)
if i in changes_set_of_record and len(changes_set_of_record[i]) == 0: # if empty, then remove the key
changes_set_of_record.pop(i)
indexes_to_pop = []
for change_num in changes_set_of_record:
changes_i = changes_set_of_record[change_num]
for hre in changes_i: # for each history record entry in changes_i
if 'type' in hre.add_rrest and hre.add_rrest['name'] == record_name and hre.add_rrest['type'] == record_type:
continue
elif 'type' in hre.del_rrest and hre.del_rrest['name'] == record_name and hre.del_rrest['type'] == record_type:
continue
else:
changes_set_of_record[change_num].remove(hre)
if change_num in changes_set_of_record and len(changes_set_of_record[change_num]) == 0: # if empty, then remove the key
indexes_to_pop.append(change_num)
for i in indexes_to_pop:
changes_set_of_record.pop(i)
return render_template('domain_changelog.html', domain=domain, allHistoryChanges=changes_set_of_record)
@domain_bp.route('/add', methods=['GET', 'POST'])
@login_required