mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2024-11-14 01:20:27 +00:00
206 lines
7.1 KiB
Python
206 lines
7.1 KiB
Python
import datetime
|
|
from collections import namedtuple
|
|
from flask import Blueprint, render_template, url_for, current_app, request, jsonify, redirect, g, session, abort
|
|
from flask_login import login_required, current_user, login_manager
|
|
from sqlalchemy import not_
|
|
|
|
from ..decorators import operator_role_required
|
|
from ..models.user import User, Anonymous
|
|
from ..models.account import Account
|
|
from ..models.account_user import AccountUser
|
|
from ..models.domain import Domain
|
|
from ..models.domain_user import DomainUser
|
|
from ..models.setting import Setting
|
|
from ..models.history import History
|
|
from ..models.server import Server
|
|
from ..models.base import db
|
|
|
|
dashboard_bp = Blueprint('dashboard',
|
|
__name__,
|
|
template_folder='templates',
|
|
url_prefix='/dashboard')
|
|
|
|
|
|
class ZoneTabs:
|
|
"""Config data for the zone tabs on the dashboard."""
|
|
|
|
TabInfo = namedtuple('TabInfo', ['display_name', 'filter_pattern'])
|
|
"""Info about a single tab.
|
|
|
|
`display_name` is the name on the tab.
|
|
`filter_pattern` is a SQL LIKE pattern , which is case-insensitively matched against the zone
|
|
name (without the final root-dot).
|
|
|
|
If a filter is present, the tab will show zones that match the filter.
|
|
If no filter is present, the tab will show zones that are not matched by any other tab filter.
|
|
"""
|
|
|
|
tabs = {
|
|
'forward': TabInfo("", None),
|
|
'reverse_ipv4': TabInfo("in-addr.arpa", '%.in-addr.arpa'),
|
|
'reverse_ipv6': TabInfo("ip6.arpa", '%.ip6.arpa'),
|
|
}
|
|
"""Dict of unique tab id to a TabInfo."""
|
|
|
|
order = ['forward', 'reverse_ipv4', 'reverse_ipv6']
|
|
"""List of tab ids in the order they will appear."""
|
|
|
|
|
|
@dashboard_bp.before_request
|
|
def before_request():
|
|
# Check if user is anonymous
|
|
g.user = current_user
|
|
login_manager.anonymous_user = Anonymous
|
|
|
|
# Check site is in maintenance mode
|
|
maintenance = Setting().get('maintenance')
|
|
if maintenance and current_user.is_authenticated and current_user.role.name not in [
|
|
'Administrator', 'Operator'
|
|
]:
|
|
return render_template('maintenance.html')
|
|
|
|
# Manage session timeout
|
|
session.permanent = True
|
|
current_app.permanent_session_lifetime = datetime.timedelta(
|
|
minutes=int(Setting().get('session_timeout')))
|
|
session.modified = True
|
|
|
|
|
|
@dashboard_bp.route('/domains-custom/<path:tab_id>', methods=['GET'])
|
|
@login_required
|
|
def domains_custom(tab_id):
|
|
if tab_id not in ZoneTabs.tabs:
|
|
abort(404)
|
|
|
|
if current_user.role.name in ['Administrator', 'Operator']:
|
|
domains = Domain.query
|
|
else:
|
|
# Get query for domain to which the user has access permission.
|
|
# This includes direct domain permission AND permission through
|
|
# account membership
|
|
domains = db.session.query(Domain) \
|
|
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
|
|
.outerjoin(Account, Domain.account_id == Account.id) \
|
|
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
|
|
.filter(
|
|
db.or_(
|
|
DomainUser.user_id == current_user.id,
|
|
AccountUser.user_id == current_user.id
|
|
))
|
|
|
|
template = current_app.jinja_env.get_template("dashboard_domain.html")
|
|
render = template.make_module(
|
|
vars={"current_user": current_user, "allow_user_view_history": Setting().get('allow_user_view_history')})
|
|
|
|
columns = [
|
|
Domain.name, Domain.dnssec, Domain.type, Domain.serial, Domain.master,
|
|
Domain.account_id
|
|
]
|
|
|
|
order_by = []
|
|
for i in range(len(columns)):
|
|
column_index = request.args.get("order[{0}][column]".format(i))
|
|
sort_direction = request.args.get("order[{0}][dir]".format(i))
|
|
if column_index is None:
|
|
break
|
|
if sort_direction != "asc" and sort_direction != "desc":
|
|
sort_direction = "asc"
|
|
|
|
column = columns[int(column_index)]
|
|
order_by.append(getattr(column, sort_direction)())
|
|
|
|
if order_by:
|
|
domains = domains.order_by(*order_by)
|
|
|
|
if ZoneTabs.tabs[tab_id].filter_pattern:
|
|
# If the tab has a filter, use only that
|
|
domains = domains.filter(Domain.name.ilike(ZoneTabs.tabs[tab_id].filter_pattern))
|
|
else:
|
|
# If the tab has no filter, use all the other filters in negated form
|
|
for tab_info in ZoneTabs.tabs.values():
|
|
if not tab_info.filter_pattern:
|
|
continue
|
|
domains = domains.filter(not_(Domain.name.ilike(tab_info.filter_pattern)))
|
|
|
|
total_count = domains.count()
|
|
|
|
search = request.args.get("search[value]")
|
|
if search:
|
|
start = "" if search.startswith("^") else "%"
|
|
end = "" if search.endswith("$") else "%"
|
|
|
|
if current_user.role.name in ['Administrator', 'Operator']:
|
|
domains = domains.outerjoin(Account).filter(
|
|
Domain.name.ilike(start + search.strip("^$") + end)
|
|
| Account.name.ilike(start + search.strip("^$") + end)
|
|
| Account.description.ilike(start + search.strip("^$") + end))
|
|
else:
|
|
domains = domains.filter(
|
|
Domain.name.ilike(start + search.strip("^$") + end))
|
|
|
|
filtered_count = domains.count()
|
|
|
|
start = int(request.args.get("start", 0))
|
|
length = min(int(request.args.get("length", 0)), 100)
|
|
|
|
if length != -1:
|
|
domains = domains[start:start + length]
|
|
|
|
data = []
|
|
for domain in domains:
|
|
data.append([
|
|
render.name(domain),
|
|
render.dnssec(domain),
|
|
render.type(domain),
|
|
render.serial(domain),
|
|
render.master(domain),
|
|
render.account(domain),
|
|
render.actions(domain),
|
|
])
|
|
|
|
response_data = {
|
|
"draw": int(request.args.get("draw", 0)),
|
|
"recordsTotal": total_count,
|
|
"recordsFiltered": filtered_count,
|
|
"data": data,
|
|
}
|
|
return jsonify(response_data)
|
|
|
|
|
|
@dashboard_bp.route('/', methods=['GET', 'POST'])
|
|
@login_required
|
|
def dashboard():
|
|
if not Setting().get('pdns_api_url') or not Setting().get(
|
|
'pdns_api_key') or not Setting().get('pdns_version'):
|
|
return redirect(url_for('admin.setting_pdns'))
|
|
|
|
BG_DOMAIN_UPDATE = Setting().get('bg_domain_updates')
|
|
if not BG_DOMAIN_UPDATE:
|
|
current_app.logger.info('Updating zones in foreground...')
|
|
Domain().update()
|
|
else:
|
|
current_app.logger.info('Updating zones in background...')
|
|
|
|
show_bg_domain_button = BG_DOMAIN_UPDATE
|
|
if BG_DOMAIN_UPDATE and current_user.role.name not in ['Administrator', 'Operator']:
|
|
show_bg_domain_button = False
|
|
|
|
# Add custom boxes to render_template
|
|
return render_template('dashboard.html',
|
|
zone_tabs=ZoneTabs,
|
|
show_bg_domain_button=show_bg_domain_button,
|
|
pdns_version=Setting().get('pdns_version'))
|
|
|
|
|
|
@dashboard_bp.route('/domains-updater', methods=['GET', 'POST'])
|
|
@login_required
|
|
@operator_role_required
|
|
def domains_updater():
|
|
current_app.logger.debug('Update zones in background')
|
|
d = Domain().update()
|
|
|
|
response_data = {
|
|
"result": d,
|
|
}
|
|
return jsonify(response_data)
|