Corrected issue with encoding / decoding of dictionary and list type settings values.

Updated zone record settings management to use valid JSON format with backwards compatibility support for the non-JSON literal format.
This commit is contained in:
Matt Scott 2023-04-14 18:52:27 -04:00
parent c842d09195
commit ccd7373efe
No known key found for this signature in database
GPG Key ID: A9A0AFFC0E079001
3 changed files with 36 additions and 23 deletions

View File

@ -594,6 +594,11 @@ class AppSettings(object):
return int(value) return int(value)
if (var_type == dict or var_type == list) and isinstance(value, str) and len(value) > 0: if (var_type == dict or var_type == list) and isinstance(value, str) and len(value) > 0:
try:
return json.loads(value)
except JSONDecodeError as e:
# Provide backwards compatibility for legacy non-JSON format
value = value.replace("'", '"').replace('True', 'true').replace('False', 'false')
try: try:
return json.loads(value) return json.loads(value)
except JSONDecodeError as e: except JSONDecodeError as e:

View File

@ -72,13 +72,17 @@ class Setting(db.Model):
return False return False
def set(self, setting, value): def set(self, setting, value):
import json
current_setting = Setting.query.filter(Setting.name == setting).first() current_setting = Setting.query.filter(Setting.name == setting).first()
if current_setting is None: if current_setting is None:
current_setting = Setting(name=setting, value=None) current_setting = Setting(name=setting, value=None)
db.session.add(current_setting) db.session.add(current_setting)
value = str(AppSettings.convert_type(setting, value)) value = AppSettings.convert_type(setting, value)
if isinstance(value, dict) or isinstance(value, list):
value = json.dumps(value)
try: try:
current_setting.value = value current_setting.value = value

View File

@ -128,14 +128,17 @@ def extract_changelogs_from_history(histories, record_name=None, record_type=Non
# filter only the records with the specific record_name, record_type # filter only the records with the specific record_name, record_type
if record_name != None and record_type != None: if record_name != None and record_type != None:
details['add_rrsets'] = list(filter_rr_list_by_name_and_type(details['add_rrsets'], record_name, record_type)) details['add_rrsets'] = list(
details['del_rrsets'] = list(filter_rr_list_by_name_and_type(details['del_rrsets'], record_name, record_type)) filter_rr_list_by_name_and_type(details['add_rrsets'], record_name, record_type))
details['del_rrsets'] = list(
filter_rr_list_by_name_and_type(details['del_rrsets'], record_name, record_type))
if not details['add_rrsets'] and not details['del_rrsets']: if not details['add_rrsets'] and not details['del_rrsets']:
continue continue
# same record name and type RR are being deleted and created in same entry. # same record name and type RR are being deleted and created in same entry.
del_add_changes = set([(r['name'], r['type']) for r in details['add_rrsets']]).intersection([(r['name'], r['type']) for r in details['del_rrsets']]) del_add_changes = set([(r['name'], r['type']) for r in details['add_rrsets']]).intersection(
[(r['name'], r['type']) for r in details['del_rrsets']])
for del_add_change in del_add_changes: for del_add_change in del_add_changes:
changes.append(HistoryRecordEntry( changes.append(HistoryRecordEntry(
entry, entry,
@ -1271,7 +1274,8 @@ def history_table(): # ajax call data
) )
).order_by(History.created_on.desc()) \ ).order_by(History.created_on.desc()) \
.limit(lim).all() .limit(lim).all()
elif user_name != None and current_user.role.name in ['Administrator', 'Operator']: # only admins can see the user login-logouts elif user_name != None and current_user.role.name in ['Administrator',
'Operator']: # only admins can see the user login-logouts
histories = base_query.filter( histories = base_query.filter(
db.and_( db.and_(
@ -1296,7 +1300,8 @@ def history_table(): # ajax call data
temp.append(h) temp.append(h)
break break
histories = temp histories = temp
elif (changed_by != None or max_date != None) and current_user.role.name in ['Administrator', 'Operator']: # select changed by and date filters only elif (changed_by != None or max_date != None) and current_user.role.name in ['Administrator',
'Operator']: # select changed by and date filters only
histories = base_query.filter( histories = base_query.filter(
db.and_( db.and_(
History.created_on <= max_date if max_date != None else True, History.created_on <= max_date if max_date != None else True,
@ -1305,7 +1310,8 @@ def history_table(): # ajax call data
) )
) \ ) \
.order_by(History.created_on.desc()).limit(lim).all() .order_by(History.created_on.desc()).limit(lim).all()
elif (changed_by != None or max_date != None): # special filtering for user because one user does not have access to log-ins logs elif (
changed_by != None or max_date != None): # special filtering for user because one user does not have access to log-ins logs
histories = base_query.filter( histories = base_query.filter(
db.and_( db.and_(
History.created_on <= max_date if max_date != None else True, History.created_on <= max_date if max_date != None else True,
@ -1462,14 +1468,11 @@ def setting_pdns():
def setting_records(): def setting_records():
from powerdnsadmin.lib.settings import AppSettings from powerdnsadmin.lib.settings import AppSettings
if request.method == 'GET': if request.method == 'GET':
_fr = Setting().get('forward_records_allow_edit') forward_records = Setting().get('forward_records_allow_edit')
_rr = Setting().get('reverse_records_allow_edit') reverse_records = Setting().get('reverse_records_allow_edit')
f_records = literal_eval(_fr) if isinstance(_fr, str) else _fr
r_records = literal_eval(_rr) if isinstance(_rr, str) else _rr
return render_template('admin_setting_records.html', return render_template('admin_setting_records.html',
f_records=f_records, f_records=forward_records,
r_records=r_records) r_records=reverse_records)
elif request.method == 'POST': elif request.method == 'POST':
fr = {} fr = {}
rr = {} rr = {}
@ -1480,8 +1483,9 @@ def setting_records():
rr[r] = True if request.form.get('rr_{0}'.format( rr[r] = True if request.form.get('rr_{0}'.format(
r.lower())) else False r.lower())) else False
Setting().set('forward_records_allow_edit', str(fr)) Setting().set('forward_records_allow_edit', json.dumps(fr))
Setting().set('reverse_records_allow_edit', str(rr)) Setting().set('reverse_records_allow_edit', json.dumps(rr))
return redirect(url_for('admin.setting_records')) return redirect(url_for('admin.setting_records'))