2019-12-02 03:32:03 +00:00
import json
2019-12-18 08:25:20 +00:00
import datetime
2019-12-02 03:32:03 +00:00
import traceback
2021-08-05 17:37:48 +00:00
import re
2020-08-08 13:41:18 +00:00
from base64 import b64encode
2019-12-02 03:32:03 +00:00
from ast import literal_eval
2023-02-19 20:04:30 +00:00
from flask import Blueprint , render_template , render_template_string , make_response , url_for , current_app , request , \
redirect , jsonify , abort , flash , session
2019-12-07 13:20:40 +00:00
from flask_login import login_required , current_user
2019-12-02 03:32:03 +00:00
2021-03-27 18:33:11 +00:00
from . . decorators import operator_role_required , admin_role_required , history_access_required
2019-12-07 13:20:40 +00:00
from . . models . user import User
2019-12-02 03:32:03 +00:00
from . . models . account import Account
from . . models . account_user import AccountUser
from . . models . role import Role
from . . models . server import Server
from . . models . setting import Setting
from . . models . history import History
from . . models . domain import Domain
2021-03-27 18:33:11 +00:00
from . . models . domain_user import DomainUser
2019-12-02 03:32:03 +00:00
from . . models . record import Record
from . . models . domain_template import DomainTemplate
from . . models . domain_template_record import DomainTemplateRecord
2020-08-08 13:41:18 +00:00
from . . models . api_key import ApiKey
2021-03-27 18:33:11 +00:00
from . . models . base import db
2020-08-08 13:41:18 +00:00
2022-05-18 22:53:35 +00:00
from . . lib . errors import ApiKeyCreateFail
2020-08-08 13:41:18 +00:00
from . . lib . schema import ApiPlainKeySchema
apikey_plain_schema = ApiPlainKeySchema ( many = True )
2019-12-02 03:32:03 +00:00
admin_bp = Blueprint ( ' admin ' ,
__name__ ,
template_folder = ' templates ' ,
url_prefix = ' /admin ' )
2023-02-19 20:04:30 +00:00
2022-05-18 22:53:35 +00:00
def get_record_changes ( del_rrset , add_rrset ) :
2023-03-03 12:22:29 +00:00
""" Use the given deleted and added RRset to build a list of record changes.
Args :
del_rrset : The RRset with changetype DELETE , or None
add_rrset : The RRset with changetype REPLACE , or None
Returns :
A list of tuples in the format ` ( old_state , new_state , change_type ) ` . ` old_state ` and
` new_state ` are dictionaries with the keys " disabled " , " content " and " comment " .
` change_type ` can be " addition " , " deletion " , " edit " or " unchanged " . When it ' s " addition "
then ` old_state ` is None , when it ' s " deletion " then `new_state` is None.
"""
def get_records ( rrset ) :
""" For the given RRset return a combined list of records and comments. """
if not rrset or ' records ' not in rrset :
return [ ]
records = [ dict ( record ) for record in rrset [ ' records ' ] ]
for i , record in enumerate ( records ) :
if ' comments ' in rrset and len ( rrset [ ' comments ' ] ) > i :
record [ ' comment ' ] = rrset [ ' comments ' ] [ i ] . get ( ' content ' , None )
else :
record [ ' comment ' ] = None
return records
def record_is_unchanged ( old , new ) :
""" Returns True if the old record is not different from the new one. """
if old [ ' content ' ] != new [ ' content ' ] :
raise ValueError ( " Can ' t compare records with different content " )
# check everything except the content
return old [ ' disabled ' ] == new [ ' disabled ' ] and old [ ' comment ' ] == new [ ' comment ' ]
def to_state ( record ) :
""" For the given record, return the state dict. """
return {
" disabled " : record [ ' disabled ' ] ,
" content " : record [ ' content ' ] ,
" comment " : record . get ( ' comment ' , ' ' ) ,
}
add_records = get_records ( add_rrset )
del_records = get_records ( del_rrset )
changeset = [ ]
for add_record in add_records :
for del_record in list ( del_records ) :
if add_record [ ' content ' ] == del_record [ ' content ' ] :
# either edited or unchanged
if record_is_unchanged ( del_record , add_record ) :
# unchanged
changeset . append ( ( to_state ( del_record ) , to_state ( add_record ) , " unchanged " ) )
else :
# edited
changeset . append ( ( to_state ( del_record ) , to_state ( add_record ) , " edit " ) )
del_records . remove ( del_record )
2021-11-30 09:02:37 +00:00
break
2023-03-03 12:22:29 +00:00
else : # not mis-indented, else block for the del_records for loop
# addition
changeset . append ( ( None , to_state ( add_record ) , " addition " ) )
2021-12-03 14:12:11 +00:00
2023-03-03 12:22:29 +00:00
# Because the first loop removed edit/unchanged records from the del_records list,
# it now only contains real deletions.
for del_record in del_records :
changeset . append ( ( to_state ( del_record ) , None , " deletion " ) )
2021-12-03 14:12:11 +00:00
2023-03-03 12:22:29 +00:00
# Sort them by the old content. For Additions the new state will be used.
changeset . sort ( key = lambda change : change [ 0 ] [ ' content ' ] if change [ 0 ] else change [ 1 ] [ ' content ' ] )
2021-11-30 09:02:37 +00:00
2023-03-03 12:22:29 +00:00
return changeset
2021-11-30 09:02:37 +00:00
2023-02-19 20:04:30 +00:00
2023-04-04 12:32:52 +00:00
def filter_rr_list_by_name_and_type ( rrset , record_name , record_type ) :
return list ( filter ( lambda rr : rr [ ' name ' ] == record_name and rr [ ' type ' ] == record_type , rrset ) )
2021-11-30 09:02:37 +00:00
# out_changes is a list of HistoryRecordEntry objects in which we will append the new changes
2022-05-18 22:53:35 +00:00
# a HistoryRecordEntry represents a pair of add_rrset and del_rrset
2023-04-04 12:32:52 +00:00
def extract_changelogs_from_history ( histories , record_name = None , record_type = None ) :
out_changes = [ ]
for entry in histories :
changes = [ ]
if " add_rrsets " in entry . detail :
details = json . loads ( entry . detail )
if not details [ ' add_rrsets ' ] and not details [ ' del_rrsets ' ] :
2021-11-30 09:02:37 +00:00
continue
2023-04-04 12:32:52 +00:00
else : # not a record entry
continue
# filter only the records with the specific record_name, record_type
if record_name != None and record_type != None :
details [ ' add_rrsets ' ] = list ( filter_rr_list_by_name_and_type ( details [ ' add_rrsets ' ] , record_name , record_type ) )
details [ ' del_rrsets ' ] = list ( filter_rr_list_by_name_and_type ( details [ ' del_rrsets ' ] , record_name , record_type ) )
if not details [ ' add_rrsets ' ] and not details [ ' del_rrsets ' ] :
2021-11-30 09:02:37 +00:00
continue
2023-04-04 12:32:52 +00:00
# same record name and type RR are being deleted and created in same entry.
del_add_changes = set ( [ ( r [ ' name ' ] , r [ ' type ' ] ) for r in details [ ' add_rrsets ' ] ] ) . intersection ( [ ( r [ ' name ' ] , r [ ' type ' ] ) for r in details [ ' del_rrsets ' ] ] )
for del_add_change in del_add_changes :
changes . append ( HistoryRecordEntry (
entry ,
filter_rr_list_by_name_and_type ( details [ ' del_rrsets ' ] , del_add_change [ 0 ] , del_add_change [ 1 ] ) . pop ( 0 ) ,
filter_rr_list_by_name_and_type ( details [ ' add_rrsets ' ] , del_add_change [ 0 ] , del_add_change [ 1 ] ) . pop ( 0 ) ,
" * " )
)
for rrset in details [ ' add_rrsets ' ] :
if ( rrset [ ' name ' ] , rrset [ ' type ' ] ) not in del_add_changes :
changes . append ( HistoryRecordEntry ( entry , { } , rrset , " + " ) )
for rrset in details [ ' del_rrsets ' ] :
if ( rrset [ ' name ' ] , rrset [ ' type ' ] ) not in del_add_changes :
changes . append ( HistoryRecordEntry ( entry , rrset , { } , " - " ) )
# sort changes by the record name
if changes :
changes . sort ( key = lambda change :
change . del_rrset [ ' name ' ] if change . del_rrset else change . add_rrset [ ' name ' ]
)
out_changes . extend ( changes )
return out_changes
2021-11-30 09:02:37 +00:00
# records with same (name,type) are considered as a single HistoryRecordEntry
# history_entry is of type History - used to extract created_by and created_on
2022-05-18 22:53:35 +00:00
# add_rrset is a dictionary of replace
# del_rrset is a dictionary of remove
2021-11-30 09:02:37 +00:00
class HistoryRecordEntry :
2022-05-18 22:53:35 +00:00
def __init__ ( self , history_entry , del_rrset , add_rrset , change_type ) :
# search the add_rrset index into the add_rrset set for the key (name, type)
2021-11-30 09:02:37 +00:00
self . history_entry = history_entry
2022-05-18 22:53:35 +00:00
self . add_rrset = add_rrset
self . del_rrset = del_rrset
2021-11-30 09:02:37 +00:00
self . change_type = change_type # "*": edit or unchanged, "+" new tuple(name,type), "-" deleted (name,type) tuple
2023-02-19 20:04:30 +00:00
self . changed_fields = [ ] # contains a subset of : [ttl, name, type]
self . changeSet = [ ] # all changes for the records of this add_rrset-del_rrset pair
2021-11-30 09:02:37 +00:00
2023-04-04 12:32:52 +00:00
if change_type == " + " or change_type == " - " :
2021-11-30 09:02:37 +00:00
self . changed_fields . append ( " name " )
self . changed_fields . append ( " type " )
self . changed_fields . append ( " ttl " )
elif change_type == " * " : # edit of unchanged
2022-05-18 22:53:35 +00:00
if add_rrset [ ' ttl ' ] != del_rrset [ ' ttl ' ] :
2021-11-30 09:02:37 +00:00
self . changed_fields . append ( " ttl " )
2023-04-04 12:32:52 +00:00
self . changeSet = get_record_changes ( del_rrset , add_rrset )
2021-12-03 14:12:11 +00:00
2021-11-30 09:02:37 +00:00
def toDict ( self ) :
return {
2023-02-19 20:04:30 +00:00
" add_rrset " : self . add_rrset ,
" del_rrset " : self . del_rrset ,
" changed_fields " : self . changed_fields ,
" created_on " : self . history_entry . created_on ,
" created_by " : self . history_entry . created_by ,
" change_type " : self . change_type ,
" changeSet " : self . changeSet
2021-11-30 09:02:37 +00:00
}
2023-02-19 20:04:30 +00:00
def __eq__ ( self , obj2 ) : # used for removal of objects from a list
2021-11-30 09:02:37 +00:00
return True if obj2 . toDict ( ) == self . toDict ( ) else False
2019-12-02 03:32:03 +00:00
2023-02-19 20:04:30 +00:00
2019-12-18 08:25:20 +00:00
@admin_bp.before_request
def before_request ( ) :
# Manage session timeout
session . permanent = True
2021-11-30 09:02:37 +00:00
# current_app.permanent_session_lifetime = datetime.timedelta(
# minutes=int(Setting().get('session_timeout')))
2019-12-18 08:25:20 +00:00
current_app . permanent_session_lifetime = datetime . timedelta (
2023-02-19 20:04:30 +00:00
minutes = int ( Setting ( ) . get ( ' session_timeout ' ) ) )
2019-12-18 08:25:20 +00:00
session . modified = True
2023-02-19 20:04:30 +00:00
@admin_bp.route ( ' /server/statistics ' , methods = [ ' GET ' ] )
2019-12-02 03:32:03 +00:00
@login_required
@operator_role_required
2023-02-19 20:04:30 +00:00
def server_statistics ( ) :
2019-12-02 03:32:03 +00:00
if not Setting ( ) . get ( ' pdns_api_url ' ) or not Setting ( ) . get (
' pdns_api_key ' ) or not Setting ( ) . get ( ' pdns_version ' ) :
return redirect ( url_for ( ' admin.setting_pdns ' ) )
domains = Domain . query . all ( )
users = User . query . all ( )
server = Server ( server_id = ' localhost ' )
statistics = server . get_statistic ( )
history_number = History . query . count ( )
if statistics :
uptime = list ( [
uptime for uptime in statistics if uptime [ ' name ' ] == ' uptime '
] ) [ 0 ] [ ' value ' ]
else :
uptime = 0
2023-02-19 20:04:30 +00:00
return render_template ( ' admin_server_statistics.html ' ,
2019-12-02 03:32:03 +00:00
domains = domains ,
users = users ,
statistics = statistics ,
uptime = uptime ,
history_number = history_number )
2023-02-19 20:04:30 +00:00
@admin_bp.route ( ' /server/configuration ' , methods = [ ' GET ' ] )
@login_required
@operator_role_required
def server_configuration ( ) :
if not Setting ( ) . get ( ' pdns_api_url ' ) or not Setting ( ) . get (
' pdns_api_key ' ) or not Setting ( ) . get ( ' pdns_version ' ) :
return redirect ( url_for ( ' admin.setting_pdns ' ) )
domains = Domain . query . all ( )
users = User . query . all ( )
server = Server ( server_id = ' localhost ' )
configs = server . get_config ( )
history_number = History . query . count ( )
return render_template ( ' admin_server_configuration.html ' ,
domains = domains ,
users = users ,
configs = configs ,
history_number = history_number )
2019-12-02 03:32:03 +00:00
@admin_bp.route ( ' /user/edit/<user_username> ' , methods = [ ' GET ' , ' POST ' ] )
@admin_bp.route ( ' /user/edit ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def edit_user ( user_username = None ) :
if user_username :
user = User . query . filter ( User . username == user_username ) . first ( )
create = False
if not user :
return render_template ( ' errors/404.html ' ) , 404
if user . role . name == ' Administrator ' and current_user . role . name != ' Administrator ' :
return render_template ( ' errors/401.html ' ) , 401
else :
user = None
create = True
if request . method == ' GET ' :
return render_template ( ' admin_edit_user.html ' ,
user = user ,
create = create )
elif request . method == ' POST ' :
fdata = request . form
if create :
2021-11-05 15:04:35 +00:00
user_username = fdata . get ( ' username ' , ' ' ) . strip ( )
2019-12-02 03:32:03 +00:00
user = User ( username = user_username ,
2021-11-05 15:04:35 +00:00
plain_text_password = fdata . get ( ' password ' , ' ' ) ,
firstname = fdata . get ( ' firstname ' , ' ' ) . strip ( ) ,
lastname = fdata . get ( ' lastname ' , ' ' ) . strip ( ) ,
email = fdata . get ( ' email ' , ' ' ) . strip ( ) ,
2019-12-02 03:32:03 +00:00
reload_info = False )
if create :
2021-11-05 15:04:35 +00:00
if not fdata . get ( ' password ' , ' ' ) :
2019-12-02 03:32:03 +00:00
return render_template ( ' admin_edit_user.html ' ,
user = user ,
create = create ,
blank_password = True )
result = user . create_local_user ( )
history = History ( msg = ' Created user {0} ' . format ( user . username ) ,
created_by = current_user . username )
else :
result = user . update_local_user ( )
history = History ( msg = ' Updated user {0} ' . format ( user . username ) ,
created_by = current_user . username )
if result [ ' status ' ] :
history . add ( )
return redirect ( url_for ( ' admin.manage_user ' ) )
return render_template ( ' admin_edit_user.html ' ,
user = user ,
create = create ,
error = result [ ' msg ' ] )
2023-02-19 20:04:30 +00:00
2020-08-08 13:41:18 +00:00
@admin_bp.route ( ' /key/edit/<key_id> ' , methods = [ ' GET ' , ' POST ' ] )
@admin_bp.route ( ' /key/edit ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def edit_key ( key_id = None ) :
domains = Domain . query . all ( )
2021-12-03 14:12:11 +00:00
accounts = Account . query . all ( )
2020-08-08 13:41:18 +00:00
roles = Role . query . all ( )
apikey = None
create = True
plain_key = None
if key_id :
apikey = ApiKey . query . filter ( ApiKey . id == key_id ) . first ( )
create = False
if not apikey :
return render_template ( ' errors/404.html ' ) , 404
if request . method == ' GET ' :
return render_template ( ' admin_edit_key.html ' ,
key = apikey ,
domains = domains ,
2021-12-03 14:12:11 +00:00
accounts = accounts ,
2020-08-08 13:41:18 +00:00
roles = roles ,
create = create )
if request . method == ' POST ' :
fdata = request . form
description = fdata [ ' description ' ]
role = fdata . getlist ( ' key_role ' ) [ 0 ]
2021-12-03 14:12:11 +00:00
domain_list = fdata . getlist ( ' key_multi_domain ' )
account_list = fdata . getlist ( ' key_multi_account ' )
2020-08-08 13:41:18 +00:00
# Create new apikey
if create :
2021-12-03 14:12:11 +00:00
if role == " User " :
domain_obj_list = Domain . query . filter ( Domain . name . in_ ( domain_list ) ) . all ( )
account_obj_list = Account . query . filter ( Account . name . in_ ( account_list ) ) . all ( )
else :
account_obj_list , domain_obj_list = [ ] , [ ]
2020-08-08 13:41:18 +00:00
apikey = ApiKey ( desc = description ,
role_name = role ,
2021-12-03 14:12:11 +00:00
domains = domain_obj_list ,
accounts = account_obj_list )
2020-08-08 13:41:18 +00:00
try :
apikey . create ( )
except Exception as e :
current_app . logger . error ( ' Error: {0} ' . format ( e ) )
raise ApiKeyCreateFail ( message = ' Api key create failed ' )
plain_key = apikey_plain_schema . dump ( [ apikey ] ) [ 0 ] [ " plain_key " ]
plain_key = b64encode ( plain_key . encode ( ' utf-8 ' ) ) . decode ( ' utf-8 ' )
2023-02-19 20:04:30 +00:00
history_message = " Created API key {0} " . format ( apikey . id )
2020-08-08 13:41:18 +00:00
# Update existing apikey
else :
try :
2021-12-03 14:12:11 +00:00
if role != " User " :
domain_list , account_list = [ ] , [ ]
2023-02-19 20:04:30 +00:00
apikey . update ( role , description , domain_list , account_list )
history_message = " Updated API key {0} " . format ( apikey . id )
2020-08-08 13:41:18 +00:00
except Exception as e :
current_app . logger . error ( ' Error: {0} ' . format ( e ) )
history = History ( msg = history_message ,
2023-02-19 20:04:30 +00:00
detail = json . dumps ( {
' key ' : apikey . id ,
' role ' : apikey . role . name ,
' description ' : apikey . description ,
' domains ' : [ domain . name for domain in apikey . domains ] ,
' accounts ' : [ a . name for a in apikey . accounts ]
} ) ,
2020-08-08 13:41:18 +00:00
created_by = current_user . username )
history . add ( )
2021-12-03 14:12:11 +00:00
2020-08-08 13:41:18 +00:00
return render_template ( ' admin_edit_key.html ' ,
key = apikey ,
domains = domains ,
2021-12-03 14:12:11 +00:00
accounts = accounts ,
2020-08-08 13:41:18 +00:00
roles = roles ,
create = create ,
plain_key = plain_key )
2023-02-19 20:04:30 +00:00
2020-08-08 13:41:18 +00:00
@admin_bp.route ( ' /manage-keys ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def manage_keys ( ) :
if request . method == ' GET ' :
try :
apikeys = ApiKey . query . all ( )
except Exception as e :
current_app . logger . error ( ' Error: {0} ' . format ( e ) )
abort ( 500 )
return render_template ( ' admin_manage_keys.html ' ,
2023-02-19 20:04:30 +00:00
keys = apikeys )
2020-08-08 13:41:18 +00:00
elif request . method == ' POST ' :
jdata = request . json
if jdata [ ' action ' ] == ' delete_key ' :
apikey = ApiKey . query . get ( jdata [ ' data ' ] )
try :
history_apikey_id = apikey . id
history_apikey_role = apikey . role . name
history_apikey_description = apikey . description
2023-02-19 20:04:30 +00:00
history_apikey_domains = [ domain . name for domain in apikey . domains ]
2021-12-03 14:12:11 +00:00
2020-08-08 13:41:18 +00:00
apikey . delete ( )
except Exception as e :
current_app . logger . error ( ' Error: {0} ' . format ( e ) )
current_app . logger . info ( ' Delete API key {0} ' . format ( apikey . id ) )
history = History ( msg = ' Delete API key {0} ' . format ( apikey . id ) ,
2023-02-19 20:04:30 +00:00
detail = json . dumps ( {
' key ' : history_apikey_id ,
' role ' : history_apikey_role ,
' description ' : history_apikey_description ,
' domains ' : history_apikey_domains
} ) ,
2020-08-08 13:41:18 +00:00
created_by = current_user . username )
history . add ( )
return make_response (
2023-02-19 20:04:30 +00:00
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' Key has been removed. '
} ) , 200 )
2019-12-02 03:32:03 +00:00
@admin_bp.route ( ' /manage-user ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def manage_user ( ) :
if request . method == ' GET ' :
roles = Role . query . all ( )
users = User . query . order_by ( User . username ) . all ( )
return render_template ( ' admin_manage_user.html ' ,
users = users ,
roles = roles )
if request . method == ' POST ' :
#
# post data should in format
# {'action': 'delete_user', 'data': 'username'}
#
try :
jdata = request . json
data = jdata [ ' data ' ]
if jdata [ ' action ' ] == ' user_otp_disable ' :
user = User ( username = data )
result = user . update_profile ( enable_otp = False )
if result :
history = History (
msg = ' Two factor authentication disabled for user {0} ' .
format ( data ) ,
created_by = current_user . username )
history . add ( )
return make_response (
jsonify ( {
' status ' :
2023-02-19 20:04:30 +00:00
' ok ' ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' Two factor authentication has been disabled for user. '
2019-12-02 03:32:03 +00:00
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' :
2023-02-19 20:04:30 +00:00
' error ' ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' Cannot disable two factor authentication for user. '
2019-12-02 03:32:03 +00:00
} ) , 500 )
elif jdata [ ' action ' ] == ' delete_user ' :
user = User ( username = data )
if user . username == current_user . username :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' You cannot delete yourself. '
} ) , 400 )
# Remove account associations first
user_accounts = Account . query . join ( AccountUser ) . join (
User ) . filter ( AccountUser . user_id == user . id ,
AccountUser . account_id == Account . id ) . all ( )
for uc in user_accounts :
uc . revoke_privileges_by_id ( user . id )
# Then delete the user
result = user . delete ( )
if result :
2019-12-16 04:01:16 +00:00
history = History ( msg = ' Delete user {0} ' . format ( data ) ,
2019-12-02 03:32:03 +00:00
created_by = current_user . username )
history . add ( )
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' User has been removed. '
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Cannot remove user. '
} ) , 500 )
elif jdata [ ' action ' ] == ' revoke_user_privileges ' :
user = User ( username = data )
result = user . revoke_privilege ( )
if result :
history = History (
msg = ' Revoke {0} user privileges ' . format ( data ) ,
created_by = current_user . username )
history . add ( )
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' Revoked user privileges. '
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Cannot revoke user privilege. '
} ) , 500 )
elif jdata [ ' action ' ] == ' update_user_role ' :
username = data [ ' username ' ]
role_name = data [ ' role_name ' ]
if username == current_user . username :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' You cannot change you own roles. '
} ) , 400 )
user = User . query . filter ( User . username == username ) . first ( )
if not user :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' User does not exist. '
} ) , 404 )
if user . role . name == ' Administrator ' and current_user . role . name != ' Administrator ' :
return make_response (
jsonify ( {
' status ' :
2023-02-19 20:04:30 +00:00
' error ' ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' You do not have permission to change Administrator users role. '
2019-12-02 03:32:03 +00:00
} ) , 400 )
if role_name == ' Administrator ' and current_user . role . name != ' Administrator ' :
return make_response (
jsonify ( {
' status ' :
2023-02-19 20:04:30 +00:00
' error ' ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' You do not have permission to promote a user to Administrator role. '
2019-12-02 03:32:03 +00:00
} ) , 400 )
user = User ( username = username )
result = user . set_role ( role_name )
if result [ ' status ' ] :
history = History (
msg = ' Change user role of {0} to {1} ' . format (
username , role_name ) ,
created_by = current_user . username )
history . add ( )
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' Changed user role successfully. '
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' :
2023-02-19 20:04:30 +00:00
' error ' ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' Cannot change user role. {0} ' . format (
result [ ' msg ' ] )
2019-12-02 03:32:03 +00:00
} ) , 500 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Action not supported. '
} ) , 400 )
except Exception as e :
current_app . logger . error (
' Cannot update user. Error: {0} ' . format ( e ) )
current_app . logger . debug ( traceback . format_exc ( ) )
return make_response (
jsonify ( {
' status ' :
2023-02-19 20:04:30 +00:00
' error ' ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' There is something wrong, please contact Administrator. '
2019-12-02 03:32:03 +00:00
} ) , 400 )
@admin_bp.route ( ' /account/edit/<account_name> ' , methods = [ ' GET ' , ' POST ' ] )
@admin_bp.route ( ' /account/edit ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def edit_account ( account_name = None ) :
users = User . query . all ( )
2022-06-28 09:15:43 +00:00
account = Account . query . filter (
Account . name == account_name ) . first ( )
all_accounts = Account . query . all ( )
2022-06-29 06:56:01 +00:00
accounts = { acc . id : acc for acc in all_accounts }
2022-06-28 09:15:43 +00:00
domains = Domain . query . all ( )
2019-12-02 03:32:03 +00:00
if request . method == ' GET ' :
2022-06-28 09:15:43 +00:00
if account_name is None or not account :
2019-12-02 03:32:03 +00:00
return render_template ( ' admin_edit_account.html ' ,
2022-06-28 09:15:43 +00:00
account = None ,
2021-08-05 17:41:28 +00:00
account_user_ids = [ ] ,
2019-12-02 03:32:03 +00:00
users = users ,
2022-06-28 09:15:43 +00:00
domains = domains ,
accounts = accounts ,
2019-12-02 03:32:03 +00:00
create = 1 )
else :
account = Account . query . filter (
Account . name == account_name ) . first ( )
account_user_ids = account . get_user ( )
return render_template ( ' admin_edit_account.html ' ,
account = account ,
account_user_ids = account_user_ids ,
users = users ,
2022-06-28 09:15:43 +00:00
domains = domains ,
accounts = accounts ,
2019-12-02 03:32:03 +00:00
create = 0 )
if request . method == ' POST ' :
fdata = request . form
new_user_list = request . form . getlist ( ' account_multi_user ' )
2022-06-28 09:15:43 +00:00
new_domain_list = request . form . getlist ( ' account_domains ' )
2019-12-02 03:32:03 +00:00
# on POST, synthesize account and account_user_ids from form data
if not account_name :
account_name = fdata [ ' accountname ' ]
account = Account ( name = account_name ,
description = fdata [ ' accountdescription ' ] ,
contact = fdata [ ' accountcontact ' ] ,
mail = fdata [ ' accountmail ' ] )
account_user_ids = [ ]
for username in new_user_list :
userid = User ( username = username ) . get_user_info_by_username ( ) . id
account_user_ids . append ( userid )
create = int ( fdata [ ' create ' ] )
if create :
# account __init__ sanitizes and lowercases the name, so to manage expectations
# we let the user reenter the name until it's not empty and it's valid (ignoring the case)
if account . name == " " or account . name != account_name . lower ( ) :
return render_template ( ' admin_edit_account.html ' ,
account = account ,
account_user_ids = account_user_ids ,
users = users ,
2022-06-28 09:15:43 +00:00
domains = domains ,
accounts = accounts ,
2019-12-02 03:32:03 +00:00
create = create ,
invalid_accountname = True )
if Account . query . filter ( Account . name == account . name ) . first ( ) :
return render_template ( ' admin_edit_account.html ' ,
account = account ,
account_user_ids = account_user_ids ,
users = users ,
2022-06-28 09:15:43 +00:00
domains = domains ,
accounts = accounts ,
2019-12-02 03:32:03 +00:00
create = create ,
duplicate_accountname = True )
result = account . create_account ( )
else :
result = account . update_account ( )
if result [ ' status ' ] :
2022-06-28 09:15:43 +00:00
account = Account . query . filter (
Account . name == account_name ) . first ( )
old_domains = Domain . query . filter ( Domain . account_id == account . id ) . all ( )
for domain_name in new_domain_list :
domain = Domain . query . filter (
Domain . name == domain_name ) . first ( )
if account . id != domain . account_id :
Domain ( name = domain_name ) . assoc_account ( account . id )
2023-02-19 20:04:30 +00:00
2022-06-28 09:15:43 +00:00
for domain in old_domains :
if domain . name not in new_domain_list :
Domain ( name = domain . name ) . assoc_account ( None )
history = History ( msg = ' {0} account {1} ' . format ( ' Create ' if create else ' Update ' , account . name ) ,
created_by = current_user . username )
2019-12-02 03:32:03 +00:00
account . grant_privileges ( new_user_list )
history . add ( )
return redirect ( url_for ( ' admin.manage_account ' ) )
return render_template ( ' admin_edit_account.html ' ,
account = account ,
account_user_ids = account_user_ids ,
users = users ,
create = create ,
error = result [ ' msg ' ] )
@admin_bp.route ( ' /manage-account ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def manage_account ( ) :
if request . method == ' GET ' :
accounts = Account . query . order_by ( Account . name ) . all ( )
for account in accounts :
account . user_num = AccountUser . query . filter (
AccountUser . account_id == account . id ) . count ( )
return render_template ( ' admin_manage_account.html ' , accounts = accounts )
if request . method == ' POST ' :
#
# post data should in format
# {'action': 'delete_account', 'data': 'accountname'}
#
try :
jdata = request . json
data = jdata [ ' data ' ]
if jdata [ ' action ' ] == ' delete_account ' :
account = Account . query . filter ( Account . name == data ) . first ( )
if not account :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Account not found. '
} ) , 404 )
# Remove account association from domains first
for domain in account . domains :
Domain ( name = domain . name ) . assoc_account ( None )
# Then delete the account
result = account . delete_account ( )
if result :
history = History ( msg = ' Delete account {0} ' . format ( data ) ,
created_by = current_user . username )
history . add ( )
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' Account has been removed. '
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Cannot remove account. '
} ) , 500 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Action not supported. '
} ) , 400 )
except Exception as e :
current_app . logger . error (
' Cannot update account. Error: {0} ' . format ( e ) )
current_app . logger . debug ( traceback . format_exc ( ) )
return make_response (
jsonify ( {
' status ' :
2023-02-19 20:04:30 +00:00
' error ' ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' There is something wrong, please contact Administrator. '
2019-12-02 03:32:03 +00:00
} ) , 400 )
2021-12-04 14:36:05 +00:00
2021-11-30 09:02:37 +00:00
class DetailedHistory ( ) :
2021-12-04 14:36:05 +00:00
def __init__ ( self , history , change_set ) :
self . history = history
self . detailed_msg = " "
self . change_set = change_set
if not history . detail :
self . detailed_msg = " "
return
2022-04-05 08:50:15 +00:00
detail_dict = json . loads ( history . detail )
2021-12-04 14:36:05 +00:00
2023-03-12 19:41:10 +00:00
if ' domain_type ' in detail_dict and ' account_id ' in detail_dict : # this is a zone creation
2021-12-04 14:36:05 +00:00
self . detailed_msg = render_template_string ( """
< table class = " table table-bordered table-striped " >
2023-03-12 19:41:10 +00:00
< tr > < td > Zone Type : < / td > < td > { { domaintype } } < / td > < / tr >
2021-12-04 14:36:05 +00:00
< tr > < td > Account : < / td > < td > { { account } } < / td > < / tr >
< / table >
""" ,
2023-02-19 20:04:30 +00:00
domaintype = detail_dict [ ' domain_type ' ] ,
account = Account . get_name_by_id ( self = None , account_id = detail_dict [
' account_id ' ] ) if detail_dict [
' account_id ' ] != " 0 " else " None " )
2021-12-04 14:36:05 +00:00
2023-02-19 20:04:30 +00:00
elif ' authenticator ' in detail_dict : # this is a user authentication
2021-12-04 14:36:05 +00:00
self . detailed_msg = render_template_string ( """
2023-02-18 16:04:14 +00:00
< table class = " table table-bordered table-striped " " >
< tbody >
2023-02-17 22:47:23 +00:00
< tr >
2023-02-18 16:04:14 +00:00
< td > Username : < / td >
< td > { { username } } < / td >
< / tr >
< tr >
< td > Authentication Result : < / td >
< td > { { auth_result } } < / td >
2021-12-04 14:36:05 +00:00
< / tr >
< tr >
< td > Authenticator Type : < / td >
2023-02-18 16:04:14 +00:00
< td > { { authenticator } } < / td >
2021-12-04 14:36:05 +00:00
< / tr >
< tr >
2023-02-18 16:04:14 +00:00
< td > IP Address : < / td >
< td > { { ip_address } } < / td >
2021-12-04 14:36:05 +00:00
< / tr >
< / tbody >
< / table >
""" ,
2023-02-19 20:04:30 +00:00
background_rgba = " 68,157,68 " if detail_dict [
' success ' ] == 1 else " 201,48,44 " ,
username = detail_dict [ ' username ' ] ,
auth_result = " success " if detail_dict [
' success ' ] == 1 else " failure " ,
authenticator = detail_dict [ ' authenticator ' ] ,
ip_address = detail_dict [ ' ip_address ' ] )
2023-03-12 19:41:10 +00:00
elif ' add_rrsets ' in detail_dict : # this is a zone record change
2021-12-04 14:36:05 +00:00
self . detailed_msg = " "
2023-02-19 20:04:30 +00:00
elif ' name ' in detail_dict and ' template ' in history . msg : # template creation / deletion
2021-12-04 14:36:05 +00:00
self . detailed_msg = render_template_string ( """
< table class = " table table-bordered table-striped " >
< tr > < td > Template name : < / td > < td > { { template_name } } < / td > < / tr >
< tr > < td > Description : < / td > < td > { { description } } < / td > < / tr >
< / table >
""" ,
2023-02-19 20:04:30 +00:00
template_name = DetailedHistory . get_key_val ( detail_dict , " name " ) ,
description = DetailedHistory . get_key_val ( detail_dict ,
" description " ) )
2021-12-04 14:36:05 +00:00
2023-03-16 17:36:47 +00:00
elif any ( msg in history . msg for msg in [ ' Change zone ' , ' Change domain ' ] ) and ' access control ' in history . msg : # added or removed a user from a zone
2021-12-04 14:36:05 +00:00
users_with_access = DetailedHistory . get_key_val ( detail_dict , " user_has_access " )
self . detailed_msg = render_template_string ( """
< table class = " table table-bordered table-striped " >
2023-03-12 19:41:10 +00:00
< tr > < td > Users with access to this zone < / td > < td > { { users_with_access } } < / td > < / tr >
2021-12-04 14:36:05 +00:00
< tr > < td > Number of users : < / td > < td > { { users_with_access | length } } < / td > < tr >
< / table >
""" ,
2023-02-19 20:04:30 +00:00
users_with_access = users_with_access )
2021-12-04 14:36:05 +00:00
elif ' Created API key ' in history . msg or ' Updated API key ' in history . msg :
self . detailed_msg = render_template_string ( """
< table class = " table table-bordered table-striped " >
< tr > < td > Key : < / td > < td > { { keyname } } < / td > < / tr >
< tr > < td > Role : < / td > < td > { { rolename } } < / td > < / tr >
< tr > < td > Description : < / td > < td > { { description } } < / td > < / tr >
2023-03-12 19:41:10 +00:00
< tr > < td > Accessible zones with this API key : < / td > < td > { { linked_domains } } < / td > < / tr >
2021-12-04 14:36:05 +00:00
< tr > < td > Accessible accounts with this API key : < / td > < td > { { linked_accounts } } < / td > < / tr >
< / table >
""" ,
2023-02-19 20:04:30 +00:00
keyname = DetailedHistory . get_key_val ( detail_dict , " key " ) ,
rolename = DetailedHistory . get_key_val ( detail_dict , " role " ) ,
description = DetailedHistory . get_key_val ( detail_dict ,
" description " ) ,
linked_domains = DetailedHistory . get_key_val ( detail_dict ,
" domains " if " domains " in detail_dict else " domain_acl " ) ,
linked_accounts = DetailedHistory . get_key_val ( detail_dict ,
" accounts " ) )
2021-12-04 14:36:05 +00:00
elif ' Delete API key ' in history . msg :
self . detailed_msg = render_template_string ( """
< table class = " table table-bordered table-striped " >
< tr > < td > Key : < / td > < td > { { keyname } } < / td > < / tr >
< tr > < td > Role : < / td > < td > { { rolename } } < / td > < / tr >
< tr > < td > Description : < / td > < td > { { description } } < / td > < / tr >
2023-03-12 19:41:10 +00:00
< tr > < td > Accessible zones with this API key : < / td > < td > { { linked_domains } } < / td > < / tr >
2021-12-04 14:36:05 +00:00
< / table >
""" ,
2023-02-19 20:04:30 +00:00
keyname = DetailedHistory . get_key_val ( detail_dict , " key " ) ,
rolename = DetailedHistory . get_key_val ( detail_dict , " role " ) ,
description = DetailedHistory . get_key_val ( detail_dict ,
" description " ) ,
linked_domains = DetailedHistory . get_key_val ( detail_dict ,
" domains " ) )
2021-12-04 14:36:05 +00:00
2023-03-16 17:36:47 +00:00
elif any ( msg in history . msg for msg in [ ' Update type for zone ' , ' Update type for domain ' ] ) :
2021-12-04 14:36:05 +00:00
self . detailed_msg = render_template_string ( """
< table class = " table table-bordered table-striped " >
2023-03-12 19:41:10 +00:00
< tr > < td > Zone : < / td > < td > { { domain } } < / td > < / tr >
< tr > < td > Zone type : < / td > < td > { { domain_type } } < / td > < / tr >
2021-12-04 14:36:05 +00:00
< tr > < td > Masters : < / td > < td > { { masters } } < / td > < / tr >
< / table >
""" ,
2023-02-19 20:04:30 +00:00
domain = DetailedHistory . get_key_val ( detail_dict , " domain " ) ,
domain_type = DetailedHistory . get_key_val ( detail_dict , " type " ) ,
masters = DetailedHistory . get_key_val ( detail_dict , " masters " ) )
2021-12-04 14:36:05 +00:00
elif ' reverse ' in history . msg :
self . detailed_msg = render_template_string ( """
< table class = " table table-bordered table-striped " >
2023-03-12 19:41:10 +00:00
< tr > < td > Zone Type : < / td > < td > { { domain_type } } < / td > < / tr >
< tr > < td > Zone Master IPs : < / td > < td > { { domain_master_ips } } < / td > < / tr >
2021-12-04 14:36:05 +00:00
< / table >
""" ,
2023-02-19 20:04:30 +00:00
domain_type = DetailedHistory . get_key_val ( detail_dict ,
" domain_type " ) ,
domain_master_ips = DetailedHistory . get_key_val ( detail_dict ,
" domain_master_ips " ) )
2021-12-04 14:36:05 +00:00
2022-04-04 15:22:41 +00:00
elif DetailedHistory . get_key_val ( detail_dict , ' msg ' ) and DetailedHistory . get_key_val ( detail_dict , ' status ' ) :
self . detailed_msg = render_template_string ( '''
< table class = " table table-bordered table-striped " >
< tr > < td > Status : < / td > < td > { { history_status } } < / td > < / tr >
< tr > < td > Message : < / td > < td > { { history_msg } } < / td > < / tr >
< / table >
''' ,
2023-02-19 20:04:30 +00:00
history_status = DetailedHistory . get_key_val ( detail_dict ,
' status ' ) ,
history_msg = DetailedHistory . get_key_val ( detail_dict , ' msg ' ) )
2023-03-16 17:36:47 +00:00
elif any ( msg in history . msg for msg in [ ' Update zone ' , ' Update domain ' ] ) and ' associate account ' in history . msg : # When an account gets associated or dissociate with zones
2022-06-28 09:16:45 +00:00
self . detailed_msg = render_template_string ( '''
< table class = " table table-bordered table-striped " >
< tr > < td > Associate : < / td > < td > { { history_assoc_account } } < / td > < / tr >
< tr > < td > Dissociate : < / td > < td > { { history_dissoc_account } } < / td > < / tr >
< / table >
''' ,
2023-02-19 20:04:30 +00:00
history_assoc_account = DetailedHistory . get_key_val ( detail_dict ,
' assoc_account ' ) ,
history_dissoc_account = DetailedHistory . get_key_val ( detail_dict ,
' dissoc_account ' ) )
2022-04-04 15:22:41 +00:00
2021-12-04 14:36:05 +00:00
# check for lower key as well for old databases
@staticmethod
def get_key_val ( _dict , key ) :
2021-12-04 16:38:48 +00:00
return str ( _dict . get ( key , _dict . get ( key . title ( ) , ' ' ) ) )
2021-12-04 14:36:05 +00:00
2021-11-30 09:02:37 +00:00
# convert a list of History objects into DetailedHistory objects
def convert_histories ( histories ) :
2023-02-19 20:04:30 +00:00
detailedHistories = [ ]
2023-04-04 12:32:52 +00:00
for history in histories :
if history . detail and ( ' add_rrsets ' in history . detail or ' del_rrsets ' in history . detail ) :
history_as_list = list ( )
history_as_list . append ( history )
detailedHistories . append ( DetailedHistory ( history , extract_changelogs_from_history ( history_as_list ) ) )
2023-02-19 20:04:30 +00:00
else :
2023-04-04 12:32:52 +00:00
detailedHistories . append ( DetailedHistory ( history , None ) )
2023-02-19 20:04:30 +00:00
return detailedHistories
2021-11-30 09:02:37 +00:00
2019-12-02 03:32:03 +00:00
@admin_bp.route ( ' /history ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
2021-03-27 18:33:11 +00:00
@history_access_required
2019-12-02 03:32:03 +00:00
def history ( ) :
2023-02-19 20:04:30 +00:00
if request . method == ' POST ' :
if current_user . role . name != ' Administrator ' :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' You do not have permission to remove history. '
} ) , 401 )
if Setting ( ) . get ( ' preserve_history ' ) :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' History removal is not allowed (toggle preserve_history in settings). '
} ) , 401 )
h = History ( )
result = h . remove_all ( )
if result :
history = History ( msg = ' Remove all histories ' ,
created_by = current_user . username )
history . add ( )
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' Changed user role successfully. '
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Can not remove histories. '
} ) , 500 )
if request . method == ' GET ' :
doms = accounts = users = " "
if current_user . role . name in [ ' Administrator ' , ' Operator ' ] :
all_domain_names = Domain . query . all ( )
all_account_names = Account . query . all ( )
all_user_names = User . query . all ( )
for d in all_domain_names :
doms + = d . name + " "
for acc in all_account_names :
accounts + = acc . name + " "
for usr in all_user_names :
users + = usr . username + " "
else : # special autocomplete for users
all_domain_names = db . session . query ( Domain ) \
. outerjoin ( DomainUser , Domain . id == DomainUser . domain_id ) \
. outerjoin ( Account , Domain . account_id == Account . id ) \
. outerjoin ( AccountUser , Account . id == AccountUser . account_id ) \
. filter (
db . or_ (
DomainUser . user_id == current_user . id ,
AccountUser . user_id == current_user . id
) ) . all ( )
all_account_names = db . session . query ( Account ) \
. outerjoin ( Domain , Domain . account_id == Account . id ) \
. outerjoin ( DomainUser , Domain . id == DomainUser . domain_id ) \
. outerjoin ( AccountUser , Account . id == AccountUser . account_id ) \
. filter (
db . or_ (
DomainUser . user_id == current_user . id ,
AccountUser . user_id == current_user . id
) ) . all ( )
all_user_names = [ ]
for a in all_account_names :
temp = db . session . query ( User ) \
. join ( AccountUser , AccountUser . user_id == User . id ) \
. outerjoin ( Account , Account . id == AccountUser . account_id ) \
. filter (
db . or_ (
Account . id == a . id ,
AccountUser . account_id == a . id
)
) \
. all ( )
for u in temp :
if u in all_user_names :
continue
all_user_names . append ( u )
for d in all_domain_names :
doms + = d . name + " "
for a in all_account_names :
accounts + = a . name + " "
for u in all_user_names :
users + = u . username + " "
return render_template ( ' admin_history.html ' , all_domain_names = doms , all_account_names = accounts ,
all_usernames = users )
2021-11-30 09:02:37 +00:00
# local_offset is the offset of the utc to the local time
# offset must be int
# return the date converted and simplified
def from_utc_to_local ( local_offset , timeframe ) :
2023-02-19 20:04:30 +00:00
offset = str ( local_offset * ( - 1 ) )
date_split = str ( timeframe ) . split ( " . " ) [ 0 ]
date_converted = datetime . datetime . strptime ( date_split , ' % Y- % m- %d % H: % M: % S ' ) + datetime . timedelta (
minutes = int ( offset ) )
return date_converted
2021-11-30 09:02:37 +00:00
@admin_bp.route ( ' /history_table ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@history_access_required
2023-02-19 20:04:30 +00:00
def history_table ( ) : # ajax call data
2019-12-02 03:32:03 +00:00
2023-02-19 20:04:30 +00:00
if request . method == ' POST ' :
if current_user . role . name != ' Administrator ' :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' You do not have permission to remove history. '
} ) , 401 )
h = History ( )
result = h . remove_all ( )
if result :
history = History ( msg = ' Remove all histories ' ,
created_by = current_user . username )
history . add ( )
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' Changed user role successfully. '
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Can not remove histories. '
} ) , 500 )
detailedHistories = [ ]
lim = int ( Setting ( ) . get ( ' max_history_records ' ) ) # max num of records
if request . method == ' GET ' :
if current_user . role . name in [ ' Administrator ' , ' Operator ' ] :
base_query = History . query
else :
# if the user isn't an administrator or operator,
# allow_user_view_history must be enabled to get here,
2023-03-12 19:41:10 +00:00
# so include history for the zones for the user
2023-02-19 20:04:30 +00:00
base_query = db . session . query ( History ) \
. join ( Domain , History . domain_id == Domain . id ) \
. outerjoin ( DomainUser , Domain . id == DomainUser . domain_id ) \
. outerjoin ( Account , Domain . account_id == Account . id ) \
. outerjoin ( AccountUser , Account . id == AccountUser . account_id ) \
. filter (
db . or_ (
DomainUser . user_id == current_user . id ,
AccountUser . user_id == current_user . id
) )
domain_name = request . args . get ( ' domain_name_filter ' ) if request . args . get ( ' domain_name_filter ' ) != None \
and len (
request . args . get ( ' domain_name_filter ' ) ) != 0 else None
account_name = request . args . get ( ' account_name_filter ' ) if request . args . get ( ' account_name_filter ' ) != None \
and len (
request . args . get ( ' account_name_filter ' ) ) != 0 else None
user_name = request . args . get ( ' auth_name_filter ' ) if request . args . get ( ' auth_name_filter ' ) != None \
and len ( request . args . get ( ' auth_name_filter ' ) ) != 0 else None
min_date = request . args . get ( ' min ' ) if request . args . get ( ' min ' ) != None and len (
request . args . get ( ' min ' ) ) != 0 else None
if min_date != None : # get 1 day earlier, to check for timezone errors
min_date = str ( datetime . datetime . strptime ( min_date , ' % Y- % m- %d ' ) - datetime . timedelta ( days = 1 ) )
max_date = request . args . get ( ' max ' ) if request . args . get ( ' max ' ) != None and len (
request . args . get ( ' max ' ) ) != 0 else None
if max_date != None : # get 1 day later, to check for timezone errors
max_date = str ( datetime . datetime . strptime ( max_date , ' % Y- % m- %d ' ) + datetime . timedelta ( days = 1 ) )
tzoffset = request . args . get ( ' tzoffset ' ) if request . args . get ( ' tzoffset ' ) != None and len (
request . args . get ( ' tzoffset ' ) ) != 0 else None
changed_by = request . args . get ( ' user_name_filter ' ) if request . args . get ( ' user_name_filter ' ) != None \
and len (
request . args . get ( ' user_name_filter ' ) ) != 0 else None
"""
Auth methods : LOCAL , Github OAuth , Azure OAuth , SAML , OIDC OAuth , Google OAuth
"""
auth_methods = [ ]
if ( request . args . get ( ' auth_local_only_checkbox ' ) is None \
and request . args . get ( ' auth_oauth_only_checkbox ' ) is None \
and request . args . get ( ' auth_saml_only_checkbox ' ) is None and request . args . get (
' auth_all_checkbox ' ) is None ) :
auth_methods = [ ]
if request . args . get ( ' auth_all_checkbox ' ) == " on " :
auth_methods . append ( " " )
if request . args . get ( ' auth_local_only_checkbox ' ) == " on " :
auth_methods . append ( " LOCAL " )
if request . args . get ( ' auth_oauth_only_checkbox ' ) == " on " :
auth_methods . append ( " OAuth " )
if request . args . get ( ' auth_saml_only_checkbox ' ) == " on " :
auth_methods . append ( " SAML " )
if request . args . get ( ' domain_changelog_only_checkbox ' ) != None :
changelog_only = True if request . args . get ( ' domain_changelog_only_checkbox ' ) == " on " else False
else :
changelog_only = False
# users cannot search for authentication
if user_name != None and current_user . role . name not in [ ' Administrator ' , ' Operator ' ] :
histories = [ ]
elif domain_name != None :
if not changelog_only :
histories = base_query \
. filter (
db . and_ (
db . or_ (
2023-03-16 17:36:47 +00:00
History . msg . like ( " %d omain " + domain_name ) if domain_name != " * " else History . msg . like ( " %d omain % " ) ,
History . msg . like ( " % zone " + domain_name ) if domain_name != " * " else History . msg . like ( " % zone % " ) ,
2023-02-19 20:04:30 +00:00
History . msg . like (
" %d omain " + domain_name + " access control " ) if domain_name != " * " else History . msg . like (
2023-03-16 17:36:47 +00:00
" %d omain %a ccess control " ) ,
2023-02-19 20:04:30 +00:00
History . msg . like (
2023-03-12 19:41:10 +00:00
" % zone " + domain_name + " access control " ) if domain_name != " * " else History . msg . like (
" % zone %a ccess control " )
2023-02-19 20:04:30 +00:00
) ,
History . created_on < = max_date if max_date != None else True ,
History . created_on > = min_date if min_date != None else True ,
History . created_by == changed_by if changed_by != None else True
)
) . order_by ( History . created_on . desc ( ) ) . limit ( lim ) . all ( )
else :
# search for records changes only
histories = base_query \
. filter (
db . and_ (
2023-03-16 17:36:47 +00:00
db . or_ (
History . msg . like ( " Apply record changes to domain " + domain_name ) if domain_name != " * " \
else History . msg . like ( " Apply record changes to domain % " ) ,
History . msg . like ( " Apply record changes to zone " + domain_name ) if domain_name != " * " \
else History . msg . like ( " Apply record changes to zone % " ) ,
) ,
2023-02-19 20:04:30 +00:00
History . created_on < = max_date if max_date != None else True ,
History . created_on > = min_date if min_date != None else True ,
History . created_by == changed_by if changed_by != None else True
)
) . order_by ( History . created_on . desc ( ) ) \
. limit ( lim ) . all ( )
elif account_name != None :
if current_user . role . name in [ ' Administrator ' , ' Operator ' ] :
histories = base_query \
. join ( Domain , History . domain_id == Domain . id ) \
. outerjoin ( Account , Domain . account_id == Account . id ) \
. filter (
db . and_ (
Account . id == Domain . account_id ,
account_name == Account . name if account_name != " * " else True ,
History . created_on < = max_date if max_date != None else True ,
History . created_on > = min_date if min_date != None else True ,
History . created_by == changed_by if changed_by != None else True
)
) . order_by ( History . created_on . desc ( ) ) \
. limit ( lim ) . all ( )
else :
histories = base_query \
. filter (
db . and_ (
Account . id == Domain . account_id ,
account_name == Account . name if account_name != " * " else True ,
History . created_on < = max_date if max_date != None else True ,
History . created_on > = min_date if min_date != None else True ,
History . created_by == changed_by if changed_by != None else True
)
) . order_by ( History . created_on . desc ( ) ) \
. limit ( lim ) . all ( )
elif user_name != None and current_user . role . name in [ ' Administrator ' ,
' Operator ' ] : # only admins can see the user login-logouts
histories = History . query \
. filter (
db . and_ (
db . or_ (
History . msg . like (
" User " + user_name + " authentication % " ) if user_name != " * " and user_name != None else History . msg . like (
" %a uthentication % " ) ,
History . msg . like (
" User " + user_name + " was not authorized % " ) if user_name != " * " and user_name != None else History . msg . like (
" User % was not authorized % " )
) ,
History . created_on < = max_date if max_date != None else True ,
History . created_on > = min_date if min_date != None else True ,
History . created_by == changed_by if changed_by != None else True
)
) \
. order_by ( History . created_on . desc ( ) ) . limit ( lim ) . all ( )
temp = [ ]
for h in histories :
for method in auth_methods :
if method in h . detail :
temp . append ( h )
break
histories = temp
elif ( changed_by != None or max_date != None ) and current_user . role . name in [ ' Administrator ' ,
' Operator ' ] : # select changed by and date filters only
histories = History . query \
. filter (
db . and_ (
History . created_on < = max_date if max_date != None else True ,
History . created_on > = min_date if min_date != None else True ,
History . created_by == changed_by if changed_by != None else True
)
) \
. order_by ( History . created_on . desc ( ) ) . limit ( lim ) . all ( )
elif (
changed_by != None or max_date != None ) : # special filtering for user because one user does not have access to log-ins logs
histories = base_query \
. filter (
db . and_ (
History . created_on < = max_date if max_date != None else True ,
History . created_on > = min_date if min_date != None else True ,
History . created_by == changed_by if changed_by != None else True
)
) \
. order_by ( History . created_on . desc ( ) ) . limit ( lim ) . all ( )
elif max_date != None : # if changed by == null and only date is applied
histories = base_query . filter (
db . and_ (
History . created_on < = max_date if max_date != None else True ,
History . created_on > = min_date if min_date != None else True ,
)
) . order_by ( History . created_on . desc ( ) ) . limit ( lim ) . all ( )
else : # default view
if current_user . role . name in [ ' Administrator ' , ' Operator ' ] :
histories = History . query . order_by ( History . created_on . desc ( ) ) . limit ( lim ) . all ( )
else :
histories = db . session . query ( History ) \
. join ( Domain , History . domain_id == Domain . id ) \
. outerjoin ( DomainUser , Domain . id == DomainUser . domain_id ) \
. outerjoin ( Account , Domain . account_id == Account . id ) \
. outerjoin ( AccountUser , Account . id == AccountUser . account_id ) \
. order_by ( History . created_on . desc ( ) ) \
. filter (
db . or_ (
DomainUser . user_id == current_user . id ,
AccountUser . user_id == current_user . id
) ) . limit ( lim ) . all ( )
detailedHistories = convert_histories ( histories )
# Remove dates from previous or next day that were brought over
if tzoffset != None :
if min_date != None :
min_date_split = min_date . split ( ) [ 0 ]
if max_date != None :
max_date_split = max_date . split ( ) [ 0 ]
for i , history_rec in enumerate ( detailedHistories ) :
local_date = str ( from_utc_to_local ( int ( tzoffset ) , history_rec . history . created_on ) . date ( ) )
if ( min_date != None and local_date == min_date_split ) or (
max_date != None and local_date == max_date_split ) :
detailedHistories [ i ] = None
# Remove elements previously flagged as None
detailedHistories = [ h for h in detailedHistories if h is not None ]
return render_template ( ' admin_history_table.html ' , histories = detailedHistories ,
len_histories = len ( detailedHistories ) , lim = lim )
2019-12-02 03:32:03 +00:00
@admin_bp.route ( ' /setting/basic ' , methods = [ ' GET ' ] )
@login_required
@operator_role_required
def setting_basic ( ) :
2022-05-27 11:01:46 +00:00
settings = [
' account_name_extra_chars ' ,
' allow_user_create_domain ' ,
' allow_user_remove_domain ' ,
' allow_user_view_history ' ,
' auto_ptr ' ,
' bg_domain_updates ' ,
' custom_css ' ,
' default_domain_table_size ' ,
' default_record_table_size ' ,
' delete_sso_accounts ' ,
2023-03-12 13:36:30 +00:00
' custom_history_header ' ,
2022-05-27 11:01:46 +00:00
' deny_domain_override ' ,
' dnssec_admins_only ' ,
' enable_api_rr_history ' ,
' enforce_api_ttl ' ,
' fullscreen_layout ' ,
' gravatar_enabled ' ,
' login_ldap_first ' ,
' maintenance ' ,
' max_history_records ' ,
' otp_field_enabled ' ,
' otp_force ' ,
' pdns_api_timeout ' ,
2023-02-13 10:08:03 +00:00
' preserve_history ' ,
2022-05-27 11:01:46 +00:00
' pretty_ipv6_ptr ' ,
' record_helper ' ,
' record_quick_edit ' ,
' session_timeout ' ,
' site_name ' ,
' ttl_options ' ,
' verify_ssl_connections ' ,
' verify_user_email ' ,
' warn_session_timeout ' ,
]
return render_template ( ' admin_setting_basic.html ' , settings = settings )
2019-12-02 03:32:03 +00:00
@admin_bp.route ( ' /setting/basic/<path:setting>/edit ' , methods = [ ' POST ' ] )
@login_required
@operator_role_required
def setting_basic_edit ( setting ) :
jdata = request . json
new_value = jdata [ ' value ' ]
result = Setting ( ) . set ( setting , new_value )
if ( result ) :
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' Toggled setting successfully. '
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Unable to toggle setting. '
} ) , 500 )
@admin_bp.route ( ' /setting/basic/<path:setting>/toggle ' , methods = [ ' POST ' ] )
@login_required
@operator_role_required
def setting_basic_toggle ( setting ) :
result = Setting ( ) . toggle ( setting )
if ( result ) :
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : ' Toggled setting successfully. '
} ) , 200 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Unable to toggle setting. '
} ) , 500 )
@admin_bp.route ( ' /setting/pdns ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@admin_role_required
def setting_pdns ( ) :
if request . method == ' GET ' :
pdns_api_url = Setting ( ) . get ( ' pdns_api_url ' )
pdns_api_key = Setting ( ) . get ( ' pdns_api_key ' )
pdns_version = Setting ( ) . get ( ' pdns_version ' )
return render_template ( ' admin_setting_pdns.html ' ,
pdns_api_url = pdns_api_url ,
pdns_api_key = pdns_api_key ,
pdns_version = pdns_version )
elif request . method == ' POST ' :
pdns_api_url = request . form . get ( ' pdns_api_url ' )
pdns_api_key = request . form . get ( ' pdns_api_key ' )
pdns_version = request . form . get ( ' pdns_version ' )
Setting ( ) . set ( ' pdns_api_url ' , pdns_api_url )
Setting ( ) . set ( ' pdns_api_key ' , pdns_api_key )
Setting ( ) . set ( ' pdns_version ' , pdns_version )
return render_template ( ' admin_setting_pdns.html ' ,
pdns_api_url = pdns_api_url ,
pdns_api_key = pdns_api_key ,
pdns_version = pdns_version )
@admin_bp.route ( ' /setting/dns-records ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def setting_records ( ) :
if request . method == ' GET ' :
_fr = Setting ( ) . get ( ' forward_records_allow_edit ' )
_rr = Setting ( ) . get ( ' reverse_records_allow_edit ' )
f_records = literal_eval ( _fr ) if isinstance ( _fr , str ) else _fr
r_records = literal_eval ( _rr ) if isinstance ( _rr , str ) else _rr
return render_template ( ' admin_setting_records.html ' ,
f_records = f_records ,
r_records = r_records )
elif request . method == ' POST ' :
fr = { }
rr = { }
records = Setting ( ) . defaults [ ' forward_records_allow_edit ' ]
for r in records :
fr [ r ] = True if request . form . get ( ' fr_ {0} ' . format (
r . lower ( ) ) ) else False
rr [ r ] = True if request . form . get ( ' rr_ {0} ' . format (
r . lower ( ) ) ) else False
Setting ( ) . set ( ' forward_records_allow_edit ' , str ( fr ) )
Setting ( ) . set ( ' reverse_records_allow_edit ' , str ( rr ) )
return redirect ( url_for ( ' admin.setting_records ' ) )
2020-04-30 03:17:08 +00:00
def has_an_auth_method ( local_db_enabled = None ,
ldap_enabled = None ,
google_oauth_enabled = None ,
github_oauth_enabled = None ,
oidc_oauth_enabled = None ,
azure_oauth_enabled = None ) :
2020-04-16 07:46:27 +00:00
if local_db_enabled is None :
local_db_enabled = Setting ( ) . get ( ' local_db_enabled ' )
if ldap_enabled is None :
ldap_enabled = Setting ( ) . get ( ' ldap_enabled ' )
if google_oauth_enabled is None :
google_oauth_enabled = Setting ( ) . get ( ' google_oauth_enabled ' )
if github_oauth_enabled is None :
github_oauth_enabled = Setting ( ) . get ( ' github_oauth_enabled ' )
if oidc_oauth_enabled is None :
oidc_oauth_enabled = Setting ( ) . get ( ' oidc_oauth_enabled ' )
if azure_oauth_enabled is None :
azure_oauth_enabled = Setting ( ) . get ( ' azure_oauth_enabled ' )
return local_db_enabled or ldap_enabled or google_oauth_enabled or github_oauth_enabled or oidc_oauth_enabled or azure_oauth_enabled
2019-12-02 03:32:03 +00:00
2020-04-30 03:17:08 +00:00
2019-12-02 03:32:03 +00:00
@admin_bp.route ( ' /setting/authentication ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@admin_role_required
def setting_authentication ( ) :
if request . method == ' GET ' :
return render_template ( ' admin_setting_authentication.html ' )
elif request . method == ' POST ' :
conf_type = request . form . get ( ' config_tab ' )
result = None
if conf_type == ' general ' :
local_db_enabled = True if request . form . get (
' local_db_enabled ' ) else False
signup_enabled = True if request . form . get (
2023-03-17 03:42:45 +00:00
' signup_enabled ' ) else False
pwd_enforce_characters = True if request . form . get ( ' pwd_enforce_characters ' ) else False
pwd_min_len = safe_cast ( request . form . get ( ' pwd_min_len ' , Setting ( ) . defaults [ " pwd_min_len " ] ) , int ,
Setting ( ) . defaults [ " pwd_min_len " ] )
pwd_min_lowercase = safe_cast ( request . form . get ( ' pwd_min_lowercase ' , Setting ( ) . defaults [ " pwd_min_lowercase " ] ) , int ,
Setting ( ) . defaults [ " pwd_min_lowercase " ] )
pwd_min_uppercase = safe_cast ( request . form . get ( ' pwd_min_uppercase ' , Setting ( ) . defaults [ " pwd_min_uppercase " ] ) , int ,
Setting ( ) . defaults [ " pwd_min_uppercase " ] )
pwd_min_digits = safe_cast ( request . form . get ( ' pwd_min_digits ' , Setting ( ) . defaults [ " pwd_min_digits " ] ) , int ,
Setting ( ) . defaults [ " pwd_min_digits " ] )
pwd_min_special = safe_cast ( request . form . get ( ' pwd_min_special ' , Setting ( ) . defaults [ " pwd_min_special " ] ) , int ,
Setting ( ) . defaults [ " pwd_min_special " ] )
pwd_enforce_complexity = True if request . form . get ( ' pwd_enforce_complexity ' ) else False
pwd_min_complexity = safe_cast ( request . form . get ( ' pwd_min_complexity ' , Setting ( ) . defaults [ " pwd_min_complexity " ] ) , int ,
Setting ( ) . defaults [ " pwd_min_complexity " ] )
2019-12-02 03:32:03 +00:00
2020-04-16 07:46:27 +00:00
if not has_an_auth_method ( local_db_enabled = local_db_enabled ) :
2019-12-02 03:32:03 +00:00
result = {
' status ' :
2023-02-19 20:04:30 +00:00
False ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' Must have at least one authentication method enabled. '
2019-12-02 03:32:03 +00:00
}
else :
Setting ( ) . set ( ' local_db_enabled ' , local_db_enabled )
Setting ( ) . set ( ' signup_enabled ' , signup_enabled )
2023-03-17 03:42:45 +00:00
Setting ( ) . set ( ' pwd_enforce_characters ' , pwd_enforce_characters )
Setting ( ) . set ( ' pwd_min_len ' , pwd_min_len )
Setting ( ) . set ( ' pwd_min_lowercase ' , pwd_min_lowercase )
Setting ( ) . set ( ' pwd_min_uppercase ' , pwd_min_uppercase )
Setting ( ) . set ( ' pwd_min_digits ' , pwd_min_digits )
Setting ( ) . set ( ' pwd_min_special ' , pwd_min_special )
Setting ( ) . set ( ' pwd_enforce_complexity ' , pwd_enforce_complexity )
Setting ( ) . set ( ' pwd_min_complexity ' , pwd_min_complexity )
2019-12-02 03:32:03 +00:00
result = { ' status ' : True , ' msg ' : ' Saved successfully ' }
2023-03-17 03:42:45 +00:00
2019-12-02 03:32:03 +00:00
elif conf_type == ' ldap ' :
ldap_enabled = True if request . form . get ( ' ldap_enabled ' ) else False
2020-04-16 07:46:27 +00:00
if not has_an_auth_method ( ldap_enabled = ldap_enabled ) :
2019-12-02 03:32:03 +00:00
result = {
' status ' :
2020-04-16 07:46:27 +00:00
False ,
2019-12-02 03:32:03 +00:00
' msg ' :
2020-04-16 07:46:27 +00:00
' Must have at least one authentication method enabled. '
2019-12-02 03:32:03 +00:00
}
else :
Setting ( ) . set ( ' ldap_enabled ' , ldap_enabled )
Setting ( ) . set ( ' ldap_type ' , request . form . get ( ' ldap_type ' ) )
Setting ( ) . set ( ' ldap_uri ' , request . form . get ( ' ldap_uri ' ) )
Setting ( ) . set ( ' ldap_base_dn ' , request . form . get ( ' ldap_base_dn ' ) )
Setting ( ) . set ( ' ldap_admin_username ' ,
request . form . get ( ' ldap_admin_username ' ) )
Setting ( ) . set ( ' ldap_admin_password ' ,
request . form . get ( ' ldap_admin_password ' ) )
Setting ( ) . set ( ' ldap_filter_basic ' ,
request . form . get ( ' ldap_filter_basic ' ) )
2020-01-08 22:19:51 +00:00
Setting ( ) . set ( ' ldap_filter_group ' ,
request . form . get ( ' ldap_filter_group ' ) )
2019-12-02 03:32:03 +00:00
Setting ( ) . set ( ' ldap_filter_username ' ,
request . form . get ( ' ldap_filter_username ' ) )
2020-01-08 22:19:51 +00:00
Setting ( ) . set ( ' ldap_filter_groupname ' ,
request . form . get ( ' ldap_filter_groupname ' ) )
2019-12-02 03:32:03 +00:00
Setting ( ) . set (
' ldap_sg_enabled ' , True
if request . form . get ( ' ldap_sg_enabled ' ) == ' ON ' else False )
Setting ( ) . set ( ' ldap_admin_group ' ,
request . form . get ( ' ldap_admin_group ' ) )
Setting ( ) . set ( ' ldap_operator_group ' ,
request . form . get ( ' ldap_operator_group ' ) )
Setting ( ) . set ( ' ldap_user_group ' ,
request . form . get ( ' ldap_user_group ' ) )
Setting ( ) . set ( ' ldap_domain ' , request . form . get ( ' ldap_domain ' ) )
2021-08-05 17:37:48 +00:00
Setting ( ) . set (
' autoprovisioning ' , True
if request . form . get ( ' autoprovisioning ' ) == ' ON ' else False )
Setting ( ) . set ( ' autoprovisioning_attribute ' ,
request . form . get ( ' autoprovisioning_attribute ' ) )
2023-02-19 20:04:30 +00:00
if request . form . get ( ' autoprovisioning ' ) == ' ON ' :
if validateURN ( request . form . get ( ' urn_value ' ) ) :
2021-08-05 17:37:48 +00:00
Setting ( ) . set ( ' urn_value ' ,
2023-02-19 20:04:30 +00:00
request . form . get ( ' urn_value ' ) )
2021-08-05 17:37:48 +00:00
else :
return render_template ( ' admin_setting_authentication.html ' ,
2023-02-19 20:04:30 +00:00
error = " Invalid urn " )
2021-08-05 17:37:48 +00:00
else :
Setting ( ) . set ( ' urn_value ' ,
2023-02-19 20:04:30 +00:00
request . form . get ( ' urn_value ' ) )
2021-08-05 17:37:48 +00:00
Setting ( ) . set ( ' purge ' , True
2023-02-19 20:04:30 +00:00
if request . form . get ( ' purge ' ) == ' ON ' else False )
2021-08-05 17:37:48 +00:00
2019-12-02 03:32:03 +00:00
result = { ' status ' : True , ' msg ' : ' Saved successfully ' }
elif conf_type == ' google ' :
2020-04-16 07:46:27 +00:00
google_oauth_enabled = True if request . form . get (
' google_oauth_enabled ' ) else False
if not has_an_auth_method ( google_oauth_enabled = google_oauth_enabled ) :
result = {
' status ' :
False ,
' msg ' :
' Must have at least one authentication method enabled. '
}
else :
2020-04-30 03:17:08 +00:00
Setting ( ) . set ( ' google_oauth_enabled ' , google_oauth_enabled )
2020-04-16 07:46:27 +00:00
Setting ( ) . set ( ' google_oauth_client_id ' ,
request . form . get ( ' google_oauth_client_id ' ) )
Setting ( ) . set ( ' google_oauth_client_secret ' ,
request . form . get ( ' google_oauth_client_secret ' ) )
2023-03-12 13:13:54 +00:00
Setting ( ) . set ( ' google_oauth_metadata_url ' ,
request . form . get ( ' google_oauth_metadata_url ' ) )
2020-04-16 07:46:27 +00:00
Setting ( ) . set ( ' google_token_url ' ,
request . form . get ( ' google_token_url ' ) )
Setting ( ) . set ( ' google_oauth_scope ' ,
request . form . get ( ' google_oauth_scope ' ) )
Setting ( ) . set ( ' google_authorize_url ' ,
request . form . get ( ' google_authorize_url ' ) )
Setting ( ) . set ( ' google_base_url ' ,
request . form . get ( ' google_base_url ' ) )
result = {
' status ' : True ,
2020-04-30 03:17:08 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' Saved successfully. Please reload PDA to take effect. '
2020-04-16 07:46:27 +00:00
}
2019-12-02 03:32:03 +00:00
elif conf_type == ' github ' :
2020-04-16 07:46:27 +00:00
github_oauth_enabled = True if request . form . get (
' github_oauth_enabled ' ) else False
if not has_an_auth_method ( github_oauth_enabled = github_oauth_enabled ) :
result = {
' status ' :
False ,
' msg ' :
' Must have at least one authentication method enabled. '
}
else :
2020-04-30 03:17:08 +00:00
Setting ( ) . set ( ' github_oauth_enabled ' , github_oauth_enabled )
2020-04-16 07:46:27 +00:00
Setting ( ) . set ( ' github_oauth_key ' ,
request . form . get ( ' github_oauth_key ' ) )
Setting ( ) . set ( ' github_oauth_secret ' ,
request . form . get ( ' github_oauth_secret ' ) )
Setting ( ) . set ( ' github_oauth_scope ' ,
request . form . get ( ' github_oauth_scope ' ) )
Setting ( ) . set ( ' github_oauth_api_url ' ,
request . form . get ( ' github_oauth_api_url ' ) )
2023-03-12 13:13:54 +00:00
Setting ( ) . set ( ' github_oauth_metadata_url ' ,
request . form . get ( ' github_oauth_metadata_url ' ) )
2020-04-16 07:46:27 +00:00
Setting ( ) . set ( ' github_oauth_token_url ' ,
request . form . get ( ' github_oauth_token_url ' ) )
Setting ( ) . set ( ' github_oauth_authorize_url ' ,
request . form . get ( ' github_oauth_authorize_url ' ) )
result = {
' status ' : True ,
2020-04-30 03:17:08 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' Saved successfully. Please reload PDA to take effect. '
2020-04-16 07:46:27 +00:00
}
2019-12-06 07:27:35 +00:00
elif conf_type == ' azure ' :
2020-04-16 07:46:27 +00:00
azure_oauth_enabled = True if request . form . get (
' azure_oauth_enabled ' ) else False
if not has_an_auth_method ( azure_oauth_enabled = azure_oauth_enabled ) :
result = {
' status ' :
False ,
' msg ' :
' Must have at least one authentication method enabled. '
}
else :
2020-04-30 03:17:08 +00:00
Setting ( ) . set ( ' azure_oauth_enabled ' , azure_oauth_enabled )
2020-04-16 07:46:27 +00:00
Setting ( ) . set ( ' azure_oauth_key ' ,
request . form . get ( ' azure_oauth_key ' ) )
Setting ( ) . set ( ' azure_oauth_secret ' ,
request . form . get ( ' azure_oauth_secret ' ) )
Setting ( ) . set ( ' azure_oauth_scope ' ,
request . form . get ( ' azure_oauth_scope ' ) )
Setting ( ) . set ( ' azure_oauth_api_url ' ,
request . form . get ( ' azure_oauth_api_url ' ) )
2023-03-12 13:13:54 +00:00
Setting ( ) . set ( ' azure_oauth_metadata_url ' ,
request . form . get ( ' azure_oauth_metadata_url ' ) )
2020-04-16 07:46:27 +00:00
Setting ( ) . set ( ' azure_oauth_token_url ' ,
request . form . get ( ' azure_oauth_token_url ' ) )
Setting ( ) . set ( ' azure_oauth_authorize_url ' ,
request . form . get ( ' azure_oauth_authorize_url ' ) )
2020-04-30 03:17:08 +00:00
Setting ( ) . set (
' azure_sg_enabled ' , True
if request . form . get ( ' azure_sg_enabled ' ) == ' ON ' else False )
Setting ( ) . set ( ' azure_admin_group ' ,
request . form . get ( ' azure_admin_group ' ) )
Setting ( ) . set ( ' azure_operator_group ' ,
request . form . get ( ' azure_operator_group ' ) )
Setting ( ) . set ( ' azure_user_group ' ,
request . form . get ( ' azure_user_group ' ) )
2020-07-03 06:55:31 +00:00
Setting ( ) . set (
' azure_group_accounts_enabled ' , True
if request . form . get ( ' azure_group_accounts_enabled ' ) == ' ON ' else False )
Setting ( ) . set ( ' azure_group_accounts_name ' ,
request . form . get ( ' azure_group_accounts_name ' ) )
Setting ( ) . set ( ' azure_group_accounts_name_re ' ,
request . form . get ( ' azure_group_accounts_name_re ' ) )
Setting ( ) . set ( ' azure_group_accounts_description ' ,
request . form . get ( ' azure_group_accounts_description ' ) )
Setting ( ) . set ( ' azure_group_accounts_description_re ' ,
request . form . get ( ' azure_group_accounts_description_re ' ) )
2020-04-16 07:46:27 +00:00
result = {
' status ' : True ,
2020-04-30 03:17:08 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' Saved successfully. Please reload PDA to take effect. '
2020-04-16 07:46:27 +00:00
}
2019-12-02 03:32:03 +00:00
elif conf_type == ' oidc ' :
2020-04-16 07:46:27 +00:00
oidc_oauth_enabled = True if request . form . get (
' oidc_oauth_enabled ' ) else False
if not has_an_auth_method ( oidc_oauth_enabled = oidc_oauth_enabled ) :
result = {
' status ' :
False ,
' msg ' :
' Must have at least one authentication method enabled. '
}
else :
Setting ( ) . set (
' oidc_oauth_enabled ' ,
True if request . form . get ( ' oidc_oauth_enabled ' ) else False )
2020-04-30 03:17:08 +00:00
Setting ( ) . set ( ' oidc_oauth_key ' ,
request . form . get ( ' oidc_oauth_key ' ) )
2020-04-16 07:46:27 +00:00
Setting ( ) . set ( ' oidc_oauth_secret ' ,
request . form . get ( ' oidc_oauth_secret ' ) )
Setting ( ) . set ( ' oidc_oauth_scope ' ,
request . form . get ( ' oidc_oauth_scope ' ) )
Setting ( ) . set ( ' oidc_oauth_api_url ' ,
request . form . get ( ' oidc_oauth_api_url ' ) )
2023-03-12 13:13:54 +00:00
Setting ( ) . set ( ' oidc_oauth_metadata_url ' ,
request . form . get ( ' oidc_oauth_metadata_url ' ) )
2020-04-16 07:46:27 +00:00
Setting ( ) . set ( ' oidc_oauth_token_url ' ,
request . form . get ( ' oidc_oauth_token_url ' ) )
Setting ( ) . set ( ' oidc_oauth_authorize_url ' ,
request . form . get ( ' oidc_oauth_authorize_url ' ) )
2021-11-05 15:28:21 +00:00
Setting ( ) . set ( ' oidc_oauth_logout_url ' ,
request . form . get ( ' oidc_oauth_logout_url ' ) )
2020-05-04 07:12:48 +00:00
Setting ( ) . set ( ' oidc_oauth_username ' ,
request . form . get ( ' oidc_oauth_username ' ) )
Setting ( ) . set ( ' oidc_oauth_firstname ' ,
request . form . get ( ' oidc_oauth_firstname ' ) )
Setting ( ) . set ( ' oidc_oauth_last_name ' ,
request . form . get ( ' oidc_oauth_last_name ' ) )
Setting ( ) . set ( ' oidc_oauth_email ' ,
request . form . get ( ' oidc_oauth_email ' ) )
2020-03-06 15:01:18 +00:00
Setting ( ) . set ( ' oidc_oauth_account_name_property ' ,
request . form . get ( ' oidc_oauth_account_name_property ' ) )
Setting ( ) . set ( ' oidc_oauth_account_description_property ' ,
request . form . get ( ' oidc_oauth_account_description_property ' ) )
2020-04-16 07:46:27 +00:00
result = {
' status ' : True ,
2020-04-30 03:17:08 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' Saved successfully. Please reload PDA to take effect. '
2020-04-16 07:46:27 +00:00
}
2019-12-02 03:32:03 +00:00
else :
return abort ( 400 )
return render_template ( ' admin_setting_authentication.html ' ,
result = result )
@admin_bp.route ( ' /templates ' , methods = [ ' GET ' , ' POST ' ] )
@admin_bp.route ( ' /templates/list ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def templates ( ) :
templates = DomainTemplate . query . all ( )
return render_template ( ' template.html ' , templates = templates )
@admin_bp.route ( ' /template/create ' , methods = [ ' GET ' , ' POST ' ] )
@login_required
@operator_role_required
def create_template ( ) :
if request . method == ' GET ' :
return render_template ( ' template_add.html ' )
if request . method == ' POST ' :
try :
name = request . form . getlist ( ' name ' ) [ 0 ]
description = request . form . getlist ( ' description ' ) [ 0 ]
if ' ' in name or not name or not type :
flash ( " Please correct your input " , ' error ' )
return redirect ( url_for ( ' admin.create_template ' ) )
if DomainTemplate . query . filter (
DomainTemplate . name == name ) . first ( ) :
flash (
" A template with the name {0} already exists! " . format (
name ) , ' error ' )
return redirect ( url_for ( ' admin.create_template ' ) )
t = DomainTemplate ( name = name , description = description )
result = t . create ( )
if result [ ' status ' ] == ' ok ' :
2023-03-12 19:41:10 +00:00
history = History ( msg = ' Add zone template {0} ' . format ( name ) ,
2023-02-19 20:04:30 +00:00
detail = json . dumps ( {
' name ' : name ,
' description ' : description
} ) ,
2019-12-02 03:32:03 +00:00
created_by = current_user . username )
history . add ( )
return redirect ( url_for ( ' admin.templates ' ) )
else :
flash ( result [ ' msg ' ] , ' error ' )
return redirect ( url_for ( ' admin.create_template ' ) )
except Exception as e :
current_app . logger . error (
2023-03-12 19:41:10 +00:00
' Cannot create zone template. Error: {0} ' . format ( e ) )
2019-12-02 03:32:03 +00:00
current_app . logger . debug ( traceback . format_exc ( ) )
abort ( 500 )
@admin_bp.route ( ' /template/create-from-zone ' , methods = [ ' POST ' ] )
@login_required
@operator_role_required
def create_template_from_zone ( ) :
try :
jdata = request . json
name = jdata [ ' name ' ]
description = jdata [ ' description ' ]
domain_name = jdata [ ' domain ' ]
if ' ' in name or not name or not type :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Please correct template name '
} ) , 400 )
if DomainTemplate . query . filter ( DomainTemplate . name == name ) . first ( ) :
return make_response (
jsonify ( {
' status ' :
2023-02-19 20:04:30 +00:00
' error ' ,
2019-12-02 03:32:03 +00:00
' msg ' :
2023-02-19 20:04:30 +00:00
' A template with the name {0} already exists! ' . format ( name )
2019-12-02 03:32:03 +00:00
} ) , 409 )
t = DomainTemplate ( name = name , description = description )
result = t . create ( )
if result [ ' status ' ] == ' ok ' :
2023-03-12 19:41:10 +00:00
history = History ( msg = ' Add zone template {0} ' . format ( name ) ,
2023-02-19 20:04:30 +00:00
detail = json . dumps ( {
' name ' : name ,
' description ' : description
} ) ,
2019-12-02 03:32:03 +00:00
created_by = current_user . username )
history . add ( )
2023-03-12 19:41:10 +00:00
# After creating the zone in Zone Template in the,
2019-12-16 07:23:17 +00:00
# local DB. We add records into it Record Template.
2019-12-02 03:32:03 +00:00
records = [ ]
domain = Domain . query . filter ( Domain . name == domain_name ) . first ( )
if domain :
2019-12-16 07:23:17 +00:00
# Query zone's rrsets from PowerDNS API
rrsets = Record ( ) . get_rrsets ( domain . name )
if rrsets :
for r in rrsets :
name = ' @ ' if r [ ' name ' ] == domain_name + ' . ' else r [
' name ' ] . replace ( ' . {} . ' . format ( domain_name ) , ' ' )
for record in r [ ' records ' ] :
t_record = DomainTemplateRecord (
2019-12-02 03:32:03 +00:00
name = name ,
2019-12-16 07:23:17 +00:00
type = r [ ' type ' ] ,
status = False if record [ ' disabled ' ] else True ,
ttl = r [ ' ttl ' ] ,
data = record [ ' content ' ] )
records . append ( t_record )
2019-12-02 03:32:03 +00:00
2019-12-16 07:23:17 +00:00
result = t . replace_records ( records )
2019-12-02 03:32:03 +00:00
2019-12-16 07:23:17 +00:00
if result [ ' status ' ] == ' ok ' :
2019-12-02 03:32:03 +00:00
return make_response (
jsonify ( {
' status ' : ' ok ' ,
' msg ' : result [ ' msg ' ]
} ) , 200 )
else :
2023-03-12 19:41:10 +00:00
# Revert the zone template (remove it)
2019-12-16 07:23:17 +00:00
# ff we cannot add records.
2019-12-02 03:32:03 +00:00
t . delete_template ( )
return make_response (
jsonify ( {
' status ' : ' error ' ,
2019-12-16 07:23:17 +00:00
' msg ' : result [ ' msg ' ]
2019-12-02 03:32:03 +00:00
} ) , 500 )
else :
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : result [ ' msg ' ]
} ) , 500 )
except Exception as e :
current_app . logger . error (
' Cannot create template from zone. Error: {0} ' . format ( e ) )
current_app . logger . debug ( traceback . format_exc ( ) )
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Error when applying new changes '
} ) , 500 )
@admin_bp.route ( ' /template/<path:template>/edit ' , methods = [ ' GET ' ] )
@login_required
@operator_role_required
def edit_template ( template ) :
try :
t = DomainTemplate . query . filter (
DomainTemplate . name == template ) . first ( )
records_allow_to_edit = Setting ( ) . get_records_allow_to_edit ( )
quick_edit = Setting ( ) . get ( ' record_quick_edit ' )
ttl_options = Setting ( ) . get_ttl_options ( )
if t is not None :
records = [ ]
for jr in t . records :
if jr . type in records_allow_to_edit :
record = DomainTemplateRecord (
name = jr . name ,
type = jr . type ,
2019-12-16 07:23:17 +00:00
status = ' Active ' if jr . status else ' Disabled ' ,
2019-12-02 03:32:03 +00:00
ttl = jr . ttl ,
2019-12-09 10:50:48 +00:00
data = jr . data ,
comment = jr . comment if jr . comment else ' ' )
2019-12-02 03:32:03 +00:00
records . append ( record )
return render_template ( ' template_edit.html ' ,
template = t . name ,
records = records ,
editable_records = records_allow_to_edit ,
quick_edit = quick_edit ,
ttl_options = ttl_options )
except Exception as e :
current_app . logger . error (
2023-03-12 19:41:10 +00:00
' Cannot open zone template page. DETAIL: {0} ' . format ( e ) )
2019-12-02 03:32:03 +00:00
current_app . logger . debug ( traceback . format_exc ( ) )
abort ( 500 )
return redirect ( url_for ( ' admin.templates ' ) )
@admin_bp.route ( ' /template/<path:template>/apply ' ,
methods = [ ' POST ' ] ,
strict_slashes = False )
@login_required
def apply_records ( template ) :
try :
jdata = request . json
records = [ ]
for j in jdata [ ' records ' ] :
name = ' @ ' if j [ ' record_name ' ] in [ ' @ ' , ' ' ] else j [ ' record_name ' ]
type = j [ ' record_type ' ]
data = j [ ' record_data ' ]
2019-12-09 10:50:48 +00:00
comment = j [ ' record_comment ' ]
2019-12-16 07:23:17 +00:00
status = 0 if j [ ' record_status ' ] == ' Disabled ' else 1
2019-12-02 03:32:03 +00:00
ttl = int ( j [ ' record_ttl ' ] ) if j [ ' record_ttl ' ] else 3600
dtr = DomainTemplateRecord ( name = name ,
type = type ,
data = data ,
2019-12-09 10:50:48 +00:00
comment = comment ,
2019-12-16 07:23:17 +00:00
status = status ,
2019-12-02 03:32:03 +00:00
ttl = ttl )
records . append ( dtr )
t = DomainTemplate . query . filter (
DomainTemplate . name == template ) . first ( )
result = t . replace_records ( records )
if result [ ' status ' ] == ' ok ' :
jdata . pop ( ' _csrf_token ' ,
None ) # don't store csrf token in the history.
history = History (
2023-03-12 19:41:10 +00:00
msg = ' Apply zone template record changes to zone template {0} '
2019-12-02 03:32:03 +00:00
. format ( template ) ,
2023-02-19 20:04:30 +00:00
detail = json . dumps ( jdata ) ,
2019-12-02 03:32:03 +00:00
created_by = current_user . username )
history . add ( )
return make_response ( jsonify ( result ) , 200 )
else :
return make_response ( jsonify ( result ) , 400 )
except Exception as e :
current_app . logger . error (
' Cannot apply record changes to the template. Error: {0} ' . format (
e ) )
current_app . logger . debug ( traceback . format_exc ( ) )
return make_response (
jsonify ( {
' status ' : ' error ' ,
' msg ' : ' Error when applying new changes '
} ) , 500 )
@admin_bp.route ( ' /template/<path:template>/delete ' , methods = [ ' POST ' ] )
@login_required
@operator_role_required
def delete_template ( template ) :
try :
t = DomainTemplate . query . filter (
DomainTemplate . name == template ) . first ( )
if t is not None :
result = t . delete_template ( )
if result [ ' status ' ] == ' ok ' :
history = History (
2023-03-12 19:41:10 +00:00
msg = ' Deleted zone template {0} ' . format ( template ) ,
2023-02-19 20:04:30 +00:00
detail = json . dumps ( { ' name ' : template } ) ,
2019-12-02 03:32:03 +00:00
created_by = current_user . username )
history . add ( )
return redirect ( url_for ( ' admin.templates ' ) )
else :
flash ( result [ ' msg ' ] , ' error ' )
return redirect ( url_for ( ' admin.templates ' ) )
except Exception as e :
current_app . logger . error (
' Cannot delete template. Error: {0} ' . format ( e ) )
current_app . logger . debug ( traceback . format_exc ( ) )
abort ( 500 )
return redirect ( url_for ( ' admin.templates ' ) )
2019-12-11 03:26:17 +00:00
@admin_bp.route ( ' /global-search ' , methods = [ ' GET ' ] )
@login_required
def global_search ( ) :
if request . method == ' GET ' :
domains = [ ]
records = [ ]
comments = [ ]
query = request . args . get ( ' q ' )
if query :
server = Server ( server_id = ' localhost ' )
results = server . global_search ( object_type = ' all ' , query = query )
2023-02-13 10:10:44 +00:00
# Filter results to domains to which the user has access permission
if current_user . role . name not in [ ' Administrator ' , ' Operator ' ] :
allowed_domains = db . session . query ( Domain ) \
. outerjoin ( DomainUser , Domain . id == DomainUser . domain_id ) \
. outerjoin ( Account , Domain . account_id == Account . id ) \
. outerjoin ( AccountUser , Account . id == AccountUser . account_id ) \
. filter (
db . or_ (
DomainUser . user_id == current_user . id ,
AccountUser . user_id == current_user . id
) ) \
. with_entities ( Domain . name ) \
. all ( )
allowed_domains = [ value for value , in allowed_domains ]
results = list ( filter ( lambda r : r [ ' zone_id ' ] [ : - 1 ] in allowed_domains , results ) )
2019-12-11 03:26:17 +00:00
# Format the search result
for result in results :
if result [ ' object_type ' ] == ' zone ' :
# Remove the dot at the end of string
result [ ' name ' ] = result [ ' name ' ] [ : - 1 ]
domains . append ( result )
elif result [ ' object_type ' ] == ' record ' :
# Remove the dot at the end of string
result [ ' name ' ] = result [ ' name ' ] [ : - 1 ]
result [ ' zone_id ' ] = result [ ' zone_id ' ] [ : - 1 ]
records . append ( result )
elif result [ ' object_type ' ] == ' comment ' :
# Get the actual record name, exclude the domain part
result [ ' name ' ] = result [ ' name ' ] . replace ( result [ ' zone_id ' ] , ' ' )
if result [ ' name ' ] :
result [ ' name ' ] = result [ ' name ' ] [ : - 1 ]
else :
result [ ' name ' ] = ' @ '
# Remove the dot at the end of string
result [ ' zone_id ' ] = result [ ' zone_id ' ] [ : - 1 ]
comments . append ( result )
else :
pass
2023-02-19 20:54:11 +00:00
params : dict = {
' query ' : query if query is not None else ' ' ,
' domains ' : domains ,
' records ' : records ,
' comments ' : comments ,
}
return render_template ( ' admin_global_search.html ' , * * params )
2021-08-05 17:37:48 +00:00
2023-02-19 20:04:30 +00:00
2021-08-05 17:37:48 +00:00
def validateURN ( value ) :
NID_PATTERN = re . compile ( r ' ^[0-9a-z][0-9a-z-] { 1,31}$ ' , flags = re . IGNORECASE )
NSS_PCHAR = ' [a-z0-9-._~]| % [a-f0-9] {2} |[!$& \' ()*+,;=]|:|@ '
NSS_PATTERN = re . compile ( fr ' ^( { NSS_PCHAR } )( { NSS_PCHAR } |/| \ ?)*$ ' , re . IGNORECASE )
2023-02-19 20:04:30 +00:00
prefix = value . split ( ' : ' )
if ( len ( prefix ) < 3 ) :
current_app . logger . warning ( " Too small urn prefix " )
2021-08-05 17:37:48 +00:00
return False
2023-02-19 20:04:30 +00:00
urn = prefix [ 0 ]
nid = prefix [ 1 ]
nss = value . replace ( urn + " : " + nid + " : " , " " )
2021-08-05 17:37:48 +00:00
2023-02-19 20:04:30 +00:00
if not urn . lower ( ) == " urn " :
current_app . logger . warning ( urn + ' contains invalid characters ' )
2021-08-05 17:37:48 +00:00
return False
if not re . match ( NID_PATTERN , nid . lower ( ) ) :
2023-02-19 20:04:30 +00:00
current_app . logger . warning ( nid + ' contains invalid characters ' )
2021-08-05 17:37:48 +00:00
return False
if not re . match ( NSS_PATTERN , nss ) :
2023-02-19 20:04:30 +00:00
current_app . logger . warning ( nss + ' contains invalid characters ' )
2021-08-05 17:37:48 +00:00
return False
return True
2023-03-17 03:42:45 +00:00
def safe_cast ( val , to_type , default = None ) :
try :
return to_type ( val )
except ( ValueError , TypeError ) :
return default