mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-09-16 07:12:30 +00:00
Merge remote-tracking branch 'upstream/master' into fix-saml
Cleaning up conflicts with upstream changes.
This commit is contained in:
233
app/views.py
233
app/views.py
@@ -17,10 +17,10 @@ from flask_login import login_user, logout_user, current_user, login_required
|
||||
from werkzeug import secure_filename
|
||||
from werkzeug.security import gen_salt
|
||||
|
||||
from .models import User, Domain, Record, Server, History, Anonymous, Setting, DomainSetting, DomainTemplate, DomainTemplateRecord, Role
|
||||
from .models import User, Account, Domain, Record, Role, Server, History, Anonymous, Setting, DomainSetting, DomainTemplate, DomainTemplateRecord
|
||||
from app import app, login_manager, github, google
|
||||
from app.lib import utils
|
||||
from app.decorators import admin_role_required, can_access_domain
|
||||
from app.decorators import admin_role_required, can_access_domain, can_configure_dnssec
|
||||
|
||||
if app.config['SAML_ENABLED']:
|
||||
from onelogin.saml2.auth import OneLogin_Saml2_Auth
|
||||
@@ -29,10 +29,10 @@ if app.config['SAML_ENABLED']:
|
||||
logging = logger.getLogger(__name__)
|
||||
|
||||
# FILTERS
|
||||
jinja2.filters.FILTERS['display_record_name'] = utils.display_record_name
|
||||
jinja2.filters.FILTERS['display_master_name'] = utils.display_master_name
|
||||
jinja2.filters.FILTERS['display_second_to_time'] = utils.display_time
|
||||
jinja2.filters.FILTERS['email_to_gravatar_url'] = utils.email_to_gravatar_url
|
||||
app.jinja_env.filters['display_record_name'] = utils.display_record_name
|
||||
app.jinja_env.filters['display_master_name'] = utils.display_master_name
|
||||
app.jinja_env.filters['display_second_to_time'] = utils.display_time
|
||||
app.jinja_env.filters['email_to_gravatar_url'] = utils.email_to_gravatar_url
|
||||
|
||||
# Flag for pdns v4.x.x
|
||||
# TODO: Find another way to do this
|
||||
@@ -45,49 +45,46 @@ else:
|
||||
|
||||
@app.context_processor
|
||||
def inject_fullscreen_layout_setting():
|
||||
fullscreen_layout_setting = Setting.query.filter(Setting.name == 'fullscreen_layout').first()
|
||||
return dict(fullscreen_layout_setting=strtobool(fullscreen_layout_setting.value))
|
||||
setting_value = Setting().get('fullscreen_layout')
|
||||
return dict(fullscreen_layout_setting=strtobool(setting_value))
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_record_helper_setting():
|
||||
record_helper_setting = Setting.query.filter(Setting.name == 'record_helper').first()
|
||||
return dict(record_helper_setting=strtobool(record_helper_setting.value))
|
||||
setting_value = Setting().get('record_helper')
|
||||
return dict(record_helper_setting=strtobool(setting_value))
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_login_ldap_first_setting():
|
||||
login_ldap_first_setting = Setting.query.filter(Setting.name == 'login_ldap_first').first()
|
||||
return dict(login_ldap_first_setting=strtobool(login_ldap_first_setting.value))
|
||||
setting_value = Setting().get('login_ldap_first')
|
||||
return dict(login_ldap_first_setting=strtobool(setting_value))
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_default_record_table_size_setting():
|
||||
default_record_table_size_setting = Setting.query.filter(Setting.name == 'default_record_table_size').first()
|
||||
return dict(default_record_table_size_setting=default_record_table_size_setting.value)
|
||||
setting_value = Setting().get('default_record_table_size')
|
||||
return dict(default_record_table_size_setting=setting_value)
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_default_domain_table_size_setting():
|
||||
default_domain_table_size_setting = Setting.query.filter(Setting.name == 'default_domain_table_size').first()
|
||||
return dict(default_domain_table_size_setting=default_domain_table_size_setting.value)
|
||||
setting_value = Setting().get('default_domain_table_size')
|
||||
return dict(default_domain_table_size_setting=setting_value)
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_auto_ptr_setting():
|
||||
auto_ptr_setting = Setting.query.filter(Setting.name == 'auto_ptr').first()
|
||||
if auto_ptr_setting is None:
|
||||
return dict(auto_ptr_setting=False)
|
||||
else:
|
||||
return dict(auto_ptr_setting=strtobool(auto_ptr_setting.value))
|
||||
setting_value = Setting().get('auto_ptr')
|
||||
return dict(auto_ptr_setting=strtobool(setting_value))
|
||||
|
||||
|
||||
# START USER AUTHENTICATION HANDLER
|
||||
@app.before_request
|
||||
def before_request():
|
||||
# check site maintenance mode first
|
||||
maintenance = Setting.query.filter(Setting.name == 'maintenance').first()
|
||||
if maintenance and maintenance.value == 'True':
|
||||
maintenance = Setting().get('maintenance')
|
||||
if strtobool(maintenance):
|
||||
return render_template('maintenance.html')
|
||||
|
||||
# check if user is anonymous
|
||||
@@ -381,7 +378,7 @@ def login():
|
||||
|
||||
# check if user enabled OPT authentication
|
||||
if user.otp_secret:
|
||||
if otp_token:
|
||||
if otp_token and otp_token.isdigit():
|
||||
good_token = user.verify_totp(otp_token)
|
||||
if not good_token:
|
||||
return render_template('login.html', error='Invalid credentials',
|
||||
@@ -475,7 +472,11 @@ def saml_logout():
|
||||
@app.route('/dashboard', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def dashboard():
|
||||
d = Domain().update()
|
||||
if not app.config.get('BG_DOMAIN_UPDATES'):
|
||||
logging.debug('Update domains in foreground')
|
||||
d = Domain().update()
|
||||
else:
|
||||
logging.debug('Update domains in background')
|
||||
|
||||
# stats for dashboard
|
||||
domain_count = Domain.query.count()
|
||||
@@ -488,7 +489,8 @@ def dashboard():
|
||||
uptime = list([uptime for uptime in statistics if uptime['name'] == 'uptime'])[0]['value']
|
||||
else:
|
||||
uptime = 0
|
||||
return render_template('dashboard.html', domain_count=domain_count, users=users, history_number=history_number, uptime=uptime, histories=history,pdns_version=app.config['PDNS_VERSION'])
|
||||
|
||||
return render_template('dashboard.html', domain_count=domain_count, users=users, history_number=history_number, uptime=uptime, histories=history, dnssec_adm_only=app.config['DNSSEC_ADMINS_ONLY'], pdns_version=app.config['PDNS_VERSION'], show_bg_domain_button=app.config['BG_DOMAIN_UPDATES'])
|
||||
|
||||
|
||||
@app.route('/dashboard-domains', methods=['GET'])
|
||||
@@ -502,7 +504,7 @@ def dashboard_domains():
|
||||
template = app.jinja_env.get_template("dashboard_domain.html")
|
||||
render = template.make_module(vars={"current_user": current_user})
|
||||
|
||||
columns = [Domain.name, Domain.dnssec, Domain.type, Domain.serial, Domain.master]
|
||||
columns = [Domain.name, Domain.dnssec, Domain.type, Domain.serial, Domain.master, Domain.account]
|
||||
# History.created_on.desc()
|
||||
order_by = []
|
||||
for i in range(len(columns)):
|
||||
@@ -525,7 +527,13 @@ def dashboard_domains():
|
||||
if search:
|
||||
start = "" if search.startswith("^") else "%"
|
||||
end = "" if search.endswith("$") else "%"
|
||||
domains = domains.filter(Domain.name.ilike(start + search.strip("^$") + end))
|
||||
|
||||
if current_user.role.name == 'Administrator':
|
||||
domains = domains.outerjoin(Account).filter(Domain.name.ilike(start + search.strip("^$") + end) |
|
||||
Account.name.ilike(start + search.strip("^$") + end) |
|
||||
Account.description.ilike(start + search.strip("^$") + end))
|
||||
else:
|
||||
domains = domains.filter(Domain.name.ilike(start + search.strip("^$") + end))
|
||||
|
||||
filtered_count = domains.count()
|
||||
|
||||
@@ -535,9 +543,6 @@ def dashboard_domains():
|
||||
if length != -1:
|
||||
domains = domains[start:start + length]
|
||||
|
||||
if current_user.role.name != 'Administrator':
|
||||
domains = [d[2] for d in domains]
|
||||
|
||||
data = []
|
||||
for domain in domains:
|
||||
data.append([
|
||||
@@ -546,6 +551,7 @@ def dashboard_domains():
|
||||
render.type(domain),
|
||||
render.serial(domain),
|
||||
render.master(domain),
|
||||
render.account(domain),
|
||||
render.actions(domain),
|
||||
])
|
||||
|
||||
@@ -557,6 +563,17 @@ def dashboard_domains():
|
||||
}
|
||||
return jsonify(response_data)
|
||||
|
||||
@app.route('/dashboard-domains-updater', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def dashboard_domains_updater():
|
||||
logging.debug('Update domains in background')
|
||||
d = Domain().update()
|
||||
|
||||
response_data = {
|
||||
"result": d,
|
||||
}
|
||||
return jsonify(response_data)
|
||||
|
||||
|
||||
@app.route('/domain/<path:domain_name>', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
@@ -575,6 +592,8 @@ def domain(domain_name):
|
||||
# can not get any record, API server might be down
|
||||
return redirect(url_for('error', code=500))
|
||||
|
||||
quick_edit = strtobool(Setting().get('allow_quick_edit'))
|
||||
|
||||
records = []
|
||||
#TODO: This should be done in the "model" instead of "view"
|
||||
if NEW_SCHEMA:
|
||||
@@ -587,7 +606,7 @@ def domain(domain_name):
|
||||
editable_records = app.config['RECORDS_ALLOW_EDIT']
|
||||
else:
|
||||
editable_records = app.config['REVERSE_RECORDS_ALLOW_EDIT']
|
||||
return render_template('domain.html', domain=domain, records=records, editable_records=editable_records)
|
||||
return render_template('domain.html', domain=domain, records=records, editable_records=editable_records, quick_edit=quick_edit)
|
||||
else:
|
||||
for jr in jrecords:
|
||||
if jr['type'] in app.config['RECORDS_ALLOW_EDIT']:
|
||||
@@ -597,7 +616,7 @@ def domain(domain_name):
|
||||
editable_records = app.config['FORWARD_RECORDS_ALLOW_EDIT']
|
||||
else:
|
||||
editable_records = app.config['REVERSE_RECORDS_ALLOW_EDIT']
|
||||
return render_template('domain.html', domain=domain, records=records, editable_records=editable_records, pdns_version=app.config['PDNS_VERSION'])
|
||||
return render_template('domain.html', domain=domain, records=records, editable_records=editable_records, quick_edit=quick_edit, pdns_version=app.config['PDNS_VERSION'])
|
||||
|
||||
|
||||
@app.route('/admin/domain/add', methods=['GET', 'POST'])
|
||||
@@ -611,6 +630,7 @@ def domain_add():
|
||||
domain_type = request.form.getlist('radio_type')[0]
|
||||
domain_template = request.form.getlist('domain_template')[0]
|
||||
soa_edit_api = request.form.getlist('radio_type_soa_edit_api')[0]
|
||||
account_id = request.form.getlist('accountid')[0]
|
||||
|
||||
if ' ' in domain_name or not domain_name or not domain_type:
|
||||
return render_template('errors/400.html', msg="Please correct your input"), 400
|
||||
@@ -622,10 +642,13 @@ def domain_add():
|
||||
domain_master_ips = domain_master_string.split(',')
|
||||
else:
|
||||
domain_master_ips = []
|
||||
|
||||
account_name = Account().get_name_by_id(account_id)
|
||||
|
||||
d = Domain()
|
||||
result = d.add(domain_name=domain_name, domain_type=domain_type, soa_edit_api=soa_edit_api, domain_master_ips=domain_master_ips)
|
||||
result = d.add(domain_name=domain_name, domain_type=domain_type, soa_edit_api=soa_edit_api, domain_master_ips=domain_master_ips, account_name=account_name)
|
||||
if result['status'] == 'ok':
|
||||
history = History(msg='Add domain {0}'.format(domain_name), detail=str({'domain_type': domain_type, 'domain_master_ips': domain_master_ips}), created_by=current_user.username)
|
||||
history = History(msg='Add domain {0}'.format(domain_name), detail=str({'domain_type': domain_type, 'domain_master_ips': domain_master_ips, 'account_id': account_id}), created_by=current_user.username)
|
||||
history.add()
|
||||
if domain_template != '0':
|
||||
template = DomainTemplate.query.filter(DomainTemplate.id == domain_template).first()
|
||||
@@ -648,7 +671,10 @@ def domain_add():
|
||||
except:
|
||||
logging.error(traceback.print_exc())
|
||||
return redirect(url_for('error', code=500))
|
||||
return render_template('domain_add.html', templates=templates)
|
||||
|
||||
else:
|
||||
accounts = Account.query.all()
|
||||
return render_template('domain_add.html', templates=templates, accounts=accounts)
|
||||
|
||||
|
||||
@app.route('/admin/domain/<path:domain_name>/delete', methods=['GET'])
|
||||
@@ -676,12 +702,14 @@ def domain_management(domain_name):
|
||||
if not domain:
|
||||
return redirect(url_for('error', code=404))
|
||||
users = User.query.all()
|
||||
accounts = Account.query.all()
|
||||
|
||||
# get list of user ids to initilize selection data
|
||||
d = Domain(name=domain_name)
|
||||
domain_user_ids = d.get_user()
|
||||
account = d.get_account()
|
||||
|
||||
return render_template('domain_management.html', domain=domain, users=users, domain_user_ids=domain_user_ids)
|
||||
return render_template('domain_management.html', domain=domain, users=users, domain_user_ids=domain_user_ids, accounts=accounts, domain_account=account)
|
||||
|
||||
if request.method == 'POST':
|
||||
# username in right column
|
||||
@@ -717,9 +745,32 @@ def domain_change_soa_edit_api(domain_name):
|
||||
status = d.update_soa_setting(domain_name=domain_name, soa_edit_api=new_setting)
|
||||
if status['status'] != None:
|
||||
users = User.query.all()
|
||||
accounts = Account.query.all()
|
||||
d = Domain(name=domain_name)
|
||||
domain_user_ids = d.get_user()
|
||||
return render_template('domain_management.html', domain=domain, users=users, domain_user_ids=domain_user_ids, status=status)
|
||||
account = d.get_account()
|
||||
return render_template('domain_management.html', domain=domain, users=users, domain_user_ids=domain_user_ids, accounts=accounts, domain_account=account)
|
||||
else:
|
||||
return redirect(url_for('error', code=500))
|
||||
|
||||
|
||||
@app.route('/admin/domain/<path:domain_name>/change_account', methods=['POST'])
|
||||
@login_required
|
||||
@admin_role_required
|
||||
def domain_change_account(domain_name):
|
||||
domain = Domain.query.filter(Domain.name == domain_name).first()
|
||||
if not domain:
|
||||
return redirect(url_for('error', code=404))
|
||||
|
||||
account_id = request.form.get('accountid')
|
||||
status = domain.assoc_account(account_id)
|
||||
if status['status']:
|
||||
users = User.query.all()
|
||||
accounts = Account.query.all()
|
||||
d = Domain(name=domain_name)
|
||||
domain_user_ids = d.get_user()
|
||||
account = d.get_account()
|
||||
return render_template('domain_management.html', domain=domain, users=users, domain_user_ids=domain_user_ids, accounts=accounts, domain_account=account)
|
||||
else:
|
||||
return redirect(url_for('error', code=500))
|
||||
|
||||
@@ -822,6 +873,7 @@ def domain_dnssec(domain_name):
|
||||
@app.route('/domain/<path:domain_name>/dnssec/enable', methods=['GET'])
|
||||
@login_required
|
||||
@can_access_domain
|
||||
@can_configure_dnssec
|
||||
def domain_dnssec_enable(domain_name):
|
||||
domain = Domain()
|
||||
dnssec = domain.enable_domain_dnssec(domain_name)
|
||||
@@ -831,6 +883,7 @@ def domain_dnssec_enable(domain_name):
|
||||
@app.route('/domain/<path:domain_name>/dnssec/disable', methods=['GET'])
|
||||
@login_required
|
||||
@can_access_domain
|
||||
@can_configure_dnssec
|
||||
def domain_dnssec_disable(domain_name):
|
||||
domain = Domain()
|
||||
dnssec = domain.get_domain_dnssec(domain_name)
|
||||
@@ -1115,6 +1168,8 @@ def admin_manageuser():
|
||||
|
||||
if jdata['action'] == 'delete_user':
|
||||
user = User(username=data)
|
||||
if user.username == current_user.username:
|
||||
return make_response(jsonify( { 'status': 'error', 'msg': 'You cannot delete yourself.' } ), 400)
|
||||
result = user.delete()
|
||||
if result:
|
||||
history = History(msg='Delete username {0}'.format(data), created_by=current_user.username)
|
||||
@@ -1135,6 +1190,8 @@ def admin_manageuser():
|
||||
|
||||
elif jdata['action'] == 'set_admin':
|
||||
username = data['username']
|
||||
if username == current_user.username:
|
||||
return make_response(jsonify( { 'status': 'error', 'msg': 'You cannot change you own admin rights.' } ), 400)
|
||||
is_admin = data['is_admin']
|
||||
user = User(username=username)
|
||||
result = user.set_admin(is_admin)
|
||||
@@ -1151,6 +1208,95 @@ def admin_manageuser():
|
||||
return make_response(jsonify( { 'status': 'error', 'msg': 'There is something wrong, please contact Administrator.' } ), 400)
|
||||
|
||||
|
||||
@app.route('/admin/account/edit/<account_name>', methods=['GET', 'POST'])
|
||||
@app.route('/admin/account/edit', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
@admin_role_required
|
||||
def admin_editaccount(account_name=None):
|
||||
users = User.query.all()
|
||||
|
||||
if request.method == 'GET':
|
||||
if account_name is None:
|
||||
return render_template('admin_editaccount.html', users=users, create=1)
|
||||
|
||||
else:
|
||||
account = Account.query.filter(Account.name == account_name).first()
|
||||
account_user_ids = account.get_user()
|
||||
return render_template('admin_editaccount.html', account=account, account_user_ids=account_user_ids, users=users, create=0)
|
||||
|
||||
if request.method == 'POST':
|
||||
fdata = request.form
|
||||
new_user_list = request.form.getlist('account_multi_user')
|
||||
|
||||
# on POST, synthesize account and account_user_ids from form data
|
||||
if not account_name:
|
||||
account_name = fdata['accountname']
|
||||
|
||||
account = Account(name=account_name, description=fdata['accountdescription'], contact=fdata['accountcontact'], mail=fdata['accountmail'])
|
||||
account_user_ids = []
|
||||
for username in new_user_list:
|
||||
userid = User(username=username).get_user_info_by_username().id
|
||||
account_user_ids.append(userid)
|
||||
|
||||
create = int(fdata['create'])
|
||||
if create:
|
||||
# account __init__ sanitizes and lowercases the name, so to manage expectations
|
||||
# we let the user reenter the name until it's not empty and it's valid (ignoring the case)
|
||||
if account.name == "" or account.name != account_name.lower():
|
||||
return render_template('admin_editaccount.html', account=account, account_user_ids=account_user_ids, users=users, create=create, invalid_accountname=True)
|
||||
|
||||
if Account.query.filter(Account.name == account.name).first():
|
||||
return render_template('admin_editaccount.html', account=account, account_user_ids=account_user_ids, users=users, create=create, duplicate_accountname=True)
|
||||
|
||||
result = account.create_account()
|
||||
history = History(msg='Create account {0}'.format(account.name), created_by=current_user.username)
|
||||
|
||||
else:
|
||||
result = account.update_account()
|
||||
history = History(msg='Update account {0}'.format(account.name), created_by=current_user.username)
|
||||
|
||||
if result['status']:
|
||||
account.grant_privileges(new_user_list)
|
||||
history.add()
|
||||
return redirect(url_for('admin_manageaccount'))
|
||||
|
||||
return render_template('admin_editaccount.html', account=account, account_user_ids=account_user_ids, users=users, create=create, error=result['msg'])
|
||||
|
||||
|
||||
@app.route('/admin/manageaccount', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
@admin_role_required
|
||||
def admin_manageaccount():
|
||||
if request.method == 'GET':
|
||||
accounts = Account.query.order_by(Account.name).all()
|
||||
return render_template('admin_manageaccount.html', accounts=accounts)
|
||||
|
||||
if request.method == 'POST':
|
||||
#
|
||||
# post data should in format
|
||||
# {'action': 'delete_account', 'data': 'accountname'}
|
||||
#
|
||||
try:
|
||||
jdata = request.json
|
||||
data = jdata['data']
|
||||
|
||||
if jdata['action'] == 'delete_account':
|
||||
account = Account(name=data)
|
||||
result = account.delete_account()
|
||||
if result:
|
||||
history = History(msg='Delete account {0}'.format(data), created_by=current_user.username)
|
||||
history.add()
|
||||
return make_response(jsonify( { 'status': 'ok', 'msg': 'Account has been removed.' } ), 200)
|
||||
else:
|
||||
return make_response(jsonify( { 'status': 'error', 'msg': 'Cannot remove account.' } ), 500)
|
||||
|
||||
else:
|
||||
return make_response(jsonify( { 'status': 'error', 'msg': 'Action not supported.' } ), 400)
|
||||
except:
|
||||
logging.error(traceback.print_exc())
|
||||
return make_response(jsonify( { 'status': 'error', 'msg': 'There is something wrong, please contact Administrator.' } ), 400)
|
||||
|
||||
|
||||
@app.route('/admin/history', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
@admin_role_required
|
||||
@@ -1176,7 +1322,16 @@ def admin_history():
|
||||
@admin_role_required
|
||||
def admin_settings():
|
||||
if request.method == 'GET':
|
||||
settings = Setting.query.filter(Setting.name != 'maintenance')
|
||||
# start with a copy of the setting defaults (ignore maintenance setting)
|
||||
settings = Setting.defaults.copy()
|
||||
settings.pop('maintenance', None)
|
||||
|
||||
# update settings info with any customizations
|
||||
for s in settings:
|
||||
value = Setting().get(s)
|
||||
if value is not None:
|
||||
settings[s] = value
|
||||
|
||||
return render_template('admin_settings.html', settings=settings)
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user