mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-01-14 14:16:29 +00:00
3e462dab17
CSRF has been initialized *before* the app config was fully read. That made it impossible to configure CSRF properly. Moved the CSRF init into the routes module, and switched from programmatic to decorated exemptions. GET routes don't need to be exempted because they are by default.
1171 lines
48 KiB
Python
1171 lines
48 KiB
Python
import os
|
|
import re
|
|
import json
|
|
import traceback
|
|
import datetime
|
|
import ipaddress
|
|
import base64
|
|
from distutils.util import strtobool
|
|
from yaml import Loader, load
|
|
from flask import Blueprint, render_template, make_response, url_for, current_app, g, session, request, redirect, abort
|
|
from flask_login import login_user, logout_user, login_required, current_user
|
|
|
|
from .base import csrf, login_manager
|
|
from ..lib import utils
|
|
from ..decorators import dyndns_login_required
|
|
from ..models.base import db
|
|
from ..models.user import User, Anonymous
|
|
from ..models.role import Role
|
|
from ..models.account import Account
|
|
from ..models.account_user import AccountUser
|
|
from ..models.domain import Domain
|
|
from ..models.domain_user import DomainUser
|
|
from ..models.domain_setting import DomainSetting
|
|
from ..models.record import Record
|
|
from ..models.setting import Setting
|
|
from ..models.history import History
|
|
from ..services.google import google_oauth
|
|
from ..services.github import github_oauth
|
|
from ..services.azure import azure_oauth
|
|
from ..services.oidc import oidc_oauth
|
|
from ..services.saml import SAML
|
|
from ..services.token import confirm_token
|
|
from ..services.email import send_account_verification
|
|
|
|
google = None
|
|
github = None
|
|
azure = None
|
|
oidc = None
|
|
saml = None
|
|
|
|
index_bp = Blueprint('index',
|
|
__name__,
|
|
template_folder='templates',
|
|
url_prefix='/')
|
|
|
|
@index_bp.before_app_first_request
|
|
def register_modules():
|
|
global google
|
|
global github
|
|
global azure
|
|
global oidc
|
|
global saml
|
|
google = google_oauth()
|
|
github = github_oauth()
|
|
azure = azure_oauth()
|
|
oidc = oidc_oauth()
|
|
saml = SAML()
|
|
|
|
|
|
@index_bp.before_request
|
|
def before_request():
|
|
# Check if user is anonymous
|
|
g.user = current_user
|
|
login_manager.anonymous_user = Anonymous
|
|
|
|
# Check site is in maintenance mode
|
|
maintenance = Setting().get('maintenance')
|
|
if maintenance and current_user.is_authenticated and current_user.role.name not in [
|
|
'Administrator', 'Operator'
|
|
]:
|
|
return render_template('maintenance.html')
|
|
|
|
# Manage session timeout
|
|
session.permanent = True
|
|
current_app.permanent_session_lifetime = datetime.timedelta(
|
|
minutes=int(Setting().get('session_timeout')))
|
|
session.modified = True
|
|
|
|
|
|
@index_bp.route('/', methods=['GET'])
|
|
@login_required
|
|
def index():
|
|
return redirect(url_for('dashboard.dashboard'))
|
|
|
|
|
|
@index_bp.route('/ping', methods=['GET'])
|
|
def ping():
|
|
return make_response('ok')
|
|
|
|
|
|
@index_bp.route('/google/login')
|
|
def google_login():
|
|
if not Setting().get('google_oauth_enabled') or google is None:
|
|
current_app.logger.error(
|
|
'Google OAuth is disabled or you have not yet reloaded the pda application after enabling.'
|
|
)
|
|
abort(400)
|
|
else:
|
|
redirect_uri = url_for('google_authorized', _external=True)
|
|
return google.authorize_redirect(redirect_uri)
|
|
|
|
|
|
@index_bp.route('/github/login')
|
|
def github_login():
|
|
if not Setting().get('github_oauth_enabled') or github is None:
|
|
current_app.logger.error(
|
|
'Github OAuth is disabled or you have not yet reloaded the pda application after enabling.'
|
|
)
|
|
abort(400)
|
|
else:
|
|
redirect_uri = url_for('github_authorized', _external=True)
|
|
return github.authorize_redirect(redirect_uri)
|
|
|
|
|
|
@index_bp.route('/azure/login')
|
|
def azure_login():
|
|
if not Setting().get('azure_oauth_enabled') or azure is None:
|
|
current_app.logger.error(
|
|
'Microsoft OAuth is disabled or you have not yet reloaded the pda application after enabling.'
|
|
)
|
|
abort(400)
|
|
else:
|
|
redirect_uri = url_for('azure_authorized',
|
|
_external=True,
|
|
_scheme='https')
|
|
return azure.authorize_redirect(redirect_uri)
|
|
|
|
|
|
@index_bp.route('/oidc/login')
|
|
def oidc_login():
|
|
if not Setting().get('oidc_oauth_enabled') or oidc is None:
|
|
current_app.logger.error(
|
|
'OIDC OAuth is disabled or you have not yet reloaded the pda application after enabling.'
|
|
)
|
|
abort(400)
|
|
else:
|
|
redirect_uri = url_for('oidc_authorized', _external=True)
|
|
return oidc.authorize_redirect(redirect_uri)
|
|
|
|
|
|
@index_bp.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
SAML_ENABLED = current_app.config.get('SAML_ENABLED')
|
|
|
|
if g.user is not None and current_user.is_authenticated:
|
|
return redirect(url_for('dashboard.dashboard'))
|
|
|
|
if 'google_token' in session:
|
|
user_data = json.loads(google.get('userinfo').text)
|
|
first_name = user_data['given_name']
|
|
surname = user_data['family_name']
|
|
email = user_data['email']
|
|
user = User.query.filter_by(username=email).first()
|
|
if user is None:
|
|
user = User.query.filter_by(email=email).first()
|
|
if not user:
|
|
user = User(username=email,
|
|
firstname=first_name,
|
|
lastname=surname,
|
|
plain_text_password=None,
|
|
email=email)
|
|
|
|
result = user.create_local_user()
|
|
if not result['status']:
|
|
session.pop('google_token', None)
|
|
return redirect(url_for('index.login'))
|
|
|
|
session['user_id'] = user.id
|
|
session['authentication_type'] = 'OAuth'
|
|
return authenticate_user(user, 'Google OAuth')
|
|
|
|
if 'github_token' in session:
|
|
me = json.loads(github.get('user').text)
|
|
github_username = me['login']
|
|
github_name = me['name']
|
|
github_email = me['email']
|
|
|
|
user = User.query.filter_by(username=github_username).first()
|
|
if user is None:
|
|
user = User.query.filter_by(email=github_email).first()
|
|
if not user:
|
|
user = User(username=github_username,
|
|
plain_text_password=None,
|
|
firstname=github_name,
|
|
lastname='',
|
|
email=github_email)
|
|
|
|
result = user.create_local_user()
|
|
if not result['status']:
|
|
session.pop('github_token', None)
|
|
return redirect(url_for('index.login'))
|
|
|
|
session['user_id'] = user.id
|
|
session['authentication_type'] = 'OAuth'
|
|
return authenticate_user(user, 'Github OAuth')
|
|
|
|
if 'azure_token' in session:
|
|
azure_info = azure.get('me?$select=displayName,givenName,id,mail,surname,userPrincipalName').text
|
|
current_app.logger.info('Azure login returned: '+azure_info)
|
|
me = json.loads(azure_info)
|
|
|
|
azure_info = azure.post('me/getMemberGroups',
|
|
json={'securityEnabledOnly': False}).text
|
|
current_app.logger.info('Azure groups returned: ' + azure_info)
|
|
grouplookup = json.loads(azure_info)
|
|
# Groups are in mygroups['value'] which is an array
|
|
if "value" in grouplookup:
|
|
mygroups = grouplookup["value"]
|
|
else:
|
|
mygroups = []
|
|
|
|
azure_username = me["userPrincipalName"]
|
|
azure_givenname = me["givenName"]
|
|
azure_familyname = me["surname"]
|
|
if "mail" in me:
|
|
azure_email = me["mail"]
|
|
else:
|
|
azure_email = ""
|
|
if not azure_email:
|
|
azure_email = me["userPrincipalName"]
|
|
|
|
# Handle foreign principals such as guest users
|
|
azure_email = re.sub(r"#.*$", "", azure_email)
|
|
azure_username = re.sub(r"#.*$", "", azure_username)
|
|
|
|
user = User.query.filter_by(username=azure_username).first()
|
|
if not user:
|
|
user = User(username=azure_username,
|
|
plain_text_password=None,
|
|
firstname=azure_givenname,
|
|
lastname=azure_familyname,
|
|
email=azure_email)
|
|
|
|
result = user.create_local_user()
|
|
if not result['status']:
|
|
current_app.logger.warning('Unable to create ' + azure_username)
|
|
session.pop('azure_token', None)
|
|
# note: a redirect to login results in an endless loop, so render the login page instead
|
|
return render_template('login.html',
|
|
saml_enabled=SAML_ENABLED,
|
|
error=('User ' + azure_username +
|
|
' cannot be created.'))
|
|
|
|
session['user_id'] = user.id
|
|
session['authentication_type'] = 'OAuth'
|
|
|
|
# Handle group memberships, if defined
|
|
if Setting().get('azure_sg_enabled'):
|
|
if Setting().get('azure_admin_group') in mygroups:
|
|
current_app.logger.info('Setting role for user ' +
|
|
azure_username +
|
|
' to Administrator due to group membership')
|
|
user.set_role("Administrator")
|
|
else:
|
|
if Setting().get('azure_operator_group') in mygroups:
|
|
current_app.logger.info('Setting role for user ' +
|
|
azure_username +
|
|
' to Operator due to group membership')
|
|
user.set_role("Operator")
|
|
else:
|
|
if Setting().get('azure_user_group') in mygroups:
|
|
current_app.logger.info('Setting role for user ' +
|
|
azure_username +
|
|
' to User due to group membership')
|
|
user.set_role("User")
|
|
else:
|
|
current_app.logger.warning('User ' +
|
|
azure_username +
|
|
' has no relevant group memberships')
|
|
session.pop('azure_token', None)
|
|
return render_template('login.html',
|
|
saml_enabled=SAML_ENABLED,
|
|
error=('User ' + azure_username +
|
|
' is not in any authorised groups.'))
|
|
|
|
# Handle account/group creation, if enabled
|
|
if Setting().get('azure_group_accounts_enabled') and mygroups:
|
|
current_app.logger.info('Azure group account sync enabled')
|
|
name_value = Setting().get('azure_group_accounts_name')
|
|
description_value = Setting().get('azure_group_accounts_description')
|
|
select_values = name_value
|
|
if description_value != '':
|
|
select_values += ',' + description_value
|
|
|
|
mygroups = get_azure_groups(
|
|
'me/memberOf/microsoft.graph.group?$count=false&$securityEnabled=true&$select={}'.format(select_values))
|
|
|
|
description_pattern = Setting().get('azure_group_accounts_description_re')
|
|
pattern = Setting().get('azure_group_accounts_name_re')
|
|
|
|
# Loop through users security groups
|
|
for azure_group in mygroups:
|
|
if name_value in azure_group:
|
|
group_name = azure_group[name_value]
|
|
group_description = ''
|
|
if description_value in azure_group:
|
|
group_description = azure_group[description_value]
|
|
|
|
# Do regex search if enabled for group description
|
|
if description_pattern != '':
|
|
current_app.logger.info('Matching group description {} against regex {}'.format(
|
|
group_description, description_pattern))
|
|
matches = re.match(
|
|
description_pattern, group_description)
|
|
if matches:
|
|
current_app.logger.info(
|
|
'Group {} matched regexp'.format(group_description))
|
|
group_description = matches.group(1)
|
|
else:
|
|
# Regexp didn't match, continue to next iteration
|
|
continue
|
|
|
|
# Do regex search if enabled for group name
|
|
if pattern != '':
|
|
current_app.logger.info(
|
|
'Matching group name {} against regex {}'.format(group_name, pattern))
|
|
matches = re.match(pattern, group_name)
|
|
if matches:
|
|
current_app.logger.info(
|
|
'Group {} matched regexp'.format(group_name))
|
|
group_name = matches.group(1)
|
|
else:
|
|
# Regexp didn't match, continue to next iteration
|
|
continue
|
|
|
|
sanitized_group_name = Account.sanitize_name(group_name)
|
|
account_id = account.get_id_by_name(account_name=sanitized_group_name)
|
|
|
|
if account_id:
|
|
account = Account.query.get(account_id)
|
|
# check if user has permissions
|
|
account_users = account.get_user()
|
|
current_app.logger.info('Group: {} Users: {}'.format(
|
|
group_name,
|
|
account_users))
|
|
if user.id in account_users:
|
|
current_app.logger.info('User id {} is already in account {}'.format(
|
|
user.id, group_name))
|
|
else:
|
|
account.add_user(user)
|
|
history = History(msg='Update account {0}'.format(
|
|
account.name),
|
|
created_by='System')
|
|
history.add()
|
|
current_app.logger.info('User {} added to Account {}'.format(
|
|
user.username, account.name))
|
|
else:
|
|
account = Account(
|
|
name=sanitized_group_name,
|
|
description=group_description,
|
|
contact='',
|
|
mail=''
|
|
)
|
|
account.create_account()
|
|
history = History(msg='Create account {0}'.format(
|
|
account.name),
|
|
created_by='System')
|
|
history.add()
|
|
|
|
account.add_user(user)
|
|
history = History(msg='Update account {0}'.format(account.name),
|
|
created_by='System')
|
|
history.add()
|
|
current_app.logger.warning('group info: {} '.format(account_id))
|
|
|
|
return authenticate_user(user, 'Azure OAuth')
|
|
|
|
if 'oidc_token' in session:
|
|
me = json.loads(oidc.get('userinfo').text)
|
|
oidc_username = me[Setting().get('oidc_oauth_username')]
|
|
oidc_givenname = me[Setting().get('oidc_oauth_firstname')]
|
|
oidc_familyname = me[Setting().get('oidc_oauth_last_name')]
|
|
oidc_email = me[Setting().get('oidc_oauth_email')]
|
|
|
|
user = User.query.filter_by(username=oidc_username).first()
|
|
if not user:
|
|
user = User(username=oidc_username,
|
|
plain_text_password=None,
|
|
firstname=oidc_givenname,
|
|
lastname=oidc_familyname,
|
|
email=oidc_email)
|
|
result = user.create_local_user()
|
|
else:
|
|
user.firstname = oidc_givenname
|
|
user.lastname = oidc_familyname
|
|
user.email = oidc_email
|
|
user.plain_text_password = None
|
|
result = user.update_local_user()
|
|
|
|
if not result['status']:
|
|
session.pop('oidc_token', None)
|
|
return redirect(url_for('index.login'))
|
|
|
|
#This checks if the account_name_property and account_description property were included in settings.
|
|
if Setting().get('oidc_oauth_account_name_property') and Setting().get('oidc_oauth_account_description_property'):
|
|
|
|
#Gets the name_property and description_property.
|
|
name_prop = Setting().get('oidc_oauth_account_name_property')
|
|
desc_prop = Setting().get('oidc_oauth_account_description_property')
|
|
|
|
account_to_add = []
|
|
#If the name_property and desc_property exist in me (A variable that contains all the userinfo from the IdP).
|
|
if name_prop in me and desc_prop in me:
|
|
accounts_name_prop = [me[name_prop]] if type(me[name_prop]) is not list else me[name_prop]
|
|
accounts_desc_prop = [me[desc_prop]] if type(me[desc_prop]) is not list else me[desc_prop]
|
|
|
|
#Run on all groups the user is in by the index num.
|
|
for i in range(len(accounts_name_prop)):
|
|
description = ''
|
|
if i < len(accounts_desc_prop):
|
|
description = accounts_desc_prop[i]
|
|
account = handle_account(accounts_name_prop[i], description)
|
|
|
|
account_to_add.append(account)
|
|
user_accounts = user.get_accounts()
|
|
|
|
# Add accounts
|
|
for account in account_to_add:
|
|
if account not in user_accounts:
|
|
account.add_user(user)
|
|
|
|
# Remove accounts if the setting is enabled
|
|
if Setting().get('delete_sso_accounts'):
|
|
for account in user_accounts:
|
|
if account not in account_to_add:
|
|
account.remove_user(user)
|
|
|
|
session['user_id'] = user.id
|
|
session['authentication_type'] = 'OAuth'
|
|
return authenticate_user(user, 'OIDC OAuth')
|
|
|
|
if request.method == 'GET':
|
|
return render_template('login.html', saml_enabled=SAML_ENABLED)
|
|
elif request.method == 'POST':
|
|
# process Local-DB authentication
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
otp_token = request.form.get('otptoken')
|
|
auth_method = request.form.get('auth_method', 'LOCAL')
|
|
session[
|
|
'authentication_type'] = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
|
|
remember_me = True if 'remember' in request.form else False
|
|
|
|
if auth_method == 'LOCAL' and not Setting().get('local_db_enabled'):
|
|
return render_template(
|
|
'login.html',
|
|
saml_enabled=SAML_ENABLED,
|
|
error='Local authentication is disabled')
|
|
|
|
user = User(username=username,
|
|
password=password,
|
|
plain_text_password=password)
|
|
|
|
try:
|
|
if Setting().get('verify_user_email') and user.email and not user.confirmed:
|
|
return render_template(
|
|
'login.html',
|
|
saml_enabled=SAML_ENABLED,
|
|
error='Please confirm your email address first')
|
|
|
|
auth = user.is_validate(method=auth_method,
|
|
src_ip=request.remote_addr)
|
|
if auth == False:
|
|
signin_history(user.username, auth_method, False)
|
|
return render_template('login.html',
|
|
saml_enabled=SAML_ENABLED,
|
|
error='Invalid credentials')
|
|
except Exception as e:
|
|
current_app.logger.error(
|
|
"Cannot authenticate user. Error: {}".format(e))
|
|
current_app.logger.debug(traceback.format_exc())
|
|
return render_template('login.html',
|
|
saml_enabled=SAML_ENABLED,
|
|
error=e)
|
|
|
|
# check if user enabled OPT authentication
|
|
if user.otp_secret:
|
|
if otp_token and otp_token.isdigit():
|
|
good_token = user.verify_totp(otp_token)
|
|
if not good_token:
|
|
signin_history(user.username, auth_method, False)
|
|
return render_template('login.html',
|
|
saml_enabled=SAML_ENABLED,
|
|
error='Invalid credentials')
|
|
else:
|
|
return render_template('login.html',
|
|
saml_enabled=SAML_ENABLED,
|
|
error='Token required')
|
|
|
|
if Setting().get('autoprovisioning') and auth_method!='LOCAL':
|
|
urn_value=Setting().get('urn_value')
|
|
Entitlements=user.read_entitlements(Setting().get('autoprovisioning_attribute'))
|
|
if len(Entitlements)==0 and Setting().get('purge'):
|
|
user.set_role("User")
|
|
user.revoke_privilege(True)
|
|
|
|
elif len(Entitlements)!=0:
|
|
if checkForPDAEntries(Entitlements, urn_value):
|
|
user.updateUser(Entitlements)
|
|
else:
|
|
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
|
|
if Setting().get('purge'):
|
|
user.set_role("User")
|
|
user.revoke_privilege(True)
|
|
current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' )
|
|
|
|
return authenticate_user(user, auth_method, remember_me)
|
|
|
|
def checkForPDAEntries(Entitlements, urn_value):
|
|
"""
|
|
Run through every record located in the ldap attribute given and determine if there are any valid powerdns-admin records
|
|
"""
|
|
urnArguments=[x.lower() for x in urn_value.split(':')]
|
|
for Entitlement in Entitlements:
|
|
entArguments=Entitlement.split(':powerdns-admin')
|
|
entArguments=[x.lower() for x in entArguments[0].split(':')]
|
|
if (entArguments==urnArguments):
|
|
return True
|
|
return False
|
|
|
|
|
|
def clear_session():
|
|
session.pop('user_id', None)
|
|
session.pop('github_token', None)
|
|
session.pop('google_token', None)
|
|
session.pop('authentication_type', None)
|
|
session.pop('remote_user', None)
|
|
session.clear()
|
|
logout_user()
|
|
|
|
|
|
def signin_history(username, authenticator, success):
|
|
# Get user ip address
|
|
if request.headers.getlist("X-Forwarded-For"):
|
|
request_ip = request.headers.getlist("X-Forwarded-For")[0]
|
|
request_ip = request_ip.split(',')[0]
|
|
else:
|
|
request_ip = request.remote_addr
|
|
|
|
# Write log
|
|
if success:
|
|
str_success = 'succeeded'
|
|
current_app.logger.info(
|
|
"User {} authenticated successfully via {} from {}".format(
|
|
username, authenticator, request_ip))
|
|
else:
|
|
str_success = 'failed'
|
|
current_app.logger.warning(
|
|
"User {} failed to authenticate via {} from {}".format(
|
|
username, authenticator, request_ip))
|
|
|
|
# Write history
|
|
History(msg='User {} authentication {}'.format(username, str_success),
|
|
detail = json.dumps({
|
|
'username': username,
|
|
'authenticator': authenticator,
|
|
'ip_address': request_ip,
|
|
'success': 1 if success else 0
|
|
}),
|
|
created_by='System').add()
|
|
|
|
# Get a list of Azure security groups the user is a member of
|
|
def get_azure_groups(uri):
|
|
azure_info = azure.get(uri).text
|
|
current_app.logger.info('Azure groups returned: ' + azure_info)
|
|
grouplookup = json.loads(azure_info)
|
|
if "value" in grouplookup:
|
|
mygroups = grouplookup["value"]
|
|
# If "@odata.nextLink" exists in the results, we need to get more groups
|
|
if "@odata.nextLink" in grouplookup:
|
|
# The additional groups are added to the existing array
|
|
mygroups.extend(get_azure_groups(grouplookup["@odata.nextLink"]))
|
|
else:
|
|
mygroups = []
|
|
return mygroups
|
|
|
|
# Handle user login, write history and, if set, handle showing the register_otp QR code.
|
|
# if Setting for OTP on first login is enabled, and OTP field is also enabled,
|
|
# but user isn't using it yet, enable OTP, get QR code and display it, logging the user out.
|
|
def authenticate_user(user, authenticator, remember=False):
|
|
login_user(user, remember=remember)
|
|
signin_history(user.username, authenticator, True)
|
|
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret:
|
|
user.update_profile(enable_otp=True)
|
|
user_id = current_user.id
|
|
prepare_welcome_user(user_id)
|
|
return redirect(url_for('index.welcome'))
|
|
return redirect(url_for('index.login'))
|
|
|
|
# Prepare user to enter /welcome screen, otherwise they won't have permission to do so
|
|
def prepare_welcome_user(user_id):
|
|
logout_user()
|
|
session['welcome_user_id'] = user_id
|
|
|
|
@index_bp.route('/logout')
|
|
def logout():
|
|
if current_app.config.get(
|
|
'SAML_ENABLED'
|
|
) and 'samlSessionIndex' in session and current_app.config.get(
|
|
'SAML_LOGOUT'):
|
|
req = saml.prepare_flask_request(request)
|
|
auth = saml.init_saml_auth(req)
|
|
if current_app.config.get('SAML_LOGOUT_URL'):
|
|
return redirect(
|
|
auth.logout(
|
|
name_id_format=
|
|
"urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress",
|
|
return_to=current_app.config.get('SAML_LOGOUT_URL'),
|
|
session_index=session['samlSessionIndex'],
|
|
name_id=session['samlNameId']))
|
|
return redirect(
|
|
auth.logout(
|
|
name_id_format=
|
|
"urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress",
|
|
session_index=session['samlSessionIndex'],
|
|
name_id=session['samlNameId']))
|
|
|
|
redirect_uri = url_for('index.login')
|
|
oidc_logout = Setting().get('oidc_oauth_logout_url')
|
|
|
|
if 'oidc_token' in session and oidc_logout:
|
|
redirect_uri = "{}?redirect_uri={}".format(
|
|
oidc_logout, url_for('index.login', _external=True))
|
|
|
|
# Clean cookies and flask session
|
|
clear_session()
|
|
|
|
# If remote user authentication is enabled and a logout URL is configured for it,
|
|
# redirect users to that instead
|
|
remote_user_logout_url = current_app.config.get('REMOTE_USER_LOGOUT_URL')
|
|
if current_app.config.get('REMOTE_USER_ENABLED') and remote_user_logout_url:
|
|
current_app.logger.debug(
|
|
'Redirecting remote user "{0}" to logout URL {1}'
|
|
.format(current_user.username, remote_user_logout_url))
|
|
# Warning: if REMOTE_USER environment variable is still set and not cleared by
|
|
# some external module, not defining a custom logout URL will trigger a loop
|
|
# that will just log the user back in right after logging out
|
|
res = make_response(redirect(remote_user_logout_url.strip()))
|
|
|
|
# Remove any custom cookies the remote authentication mechanism may use
|
|
# (e.g.: MOD_AUTH_CAS and MOD_AUTH_CAS_S)
|
|
remote_cookies = current_app.config.get('REMOTE_USER_COOKIES')
|
|
for r_cookie_name in utils.ensure_list(remote_cookies):
|
|
res.delete_cookie(r_cookie_name)
|
|
|
|
return res
|
|
|
|
return redirect(redirect_uri)
|
|
|
|
|
|
@index_bp.route('/register', methods=['GET', 'POST'])
|
|
def register():
|
|
if Setting().get('signup_enabled'):
|
|
if request.method == 'GET':
|
|
return render_template('register.html')
|
|
elif request.method == 'POST':
|
|
username = request.form.get('username', '').strip()
|
|
password = request.form.get('password', '')
|
|
firstname = request.form.get('firstname', '').strip()
|
|
lastname = request.form.get('lastname', '').strip()
|
|
email = request.form.get('email', '').strip()
|
|
rpassword = request.form.get('rpassword', '')
|
|
|
|
if not username or not password or not email:
|
|
return render_template(
|
|
'register.html', error='Please input required information')
|
|
|
|
if password != rpassword:
|
|
return render_template(
|
|
'register.html',
|
|
error="Password confirmation does not match")
|
|
|
|
user = User(username=username,
|
|
plain_text_password=password,
|
|
firstname=firstname,
|
|
lastname=lastname,
|
|
email=email)
|
|
|
|
try:
|
|
result = user.create_local_user()
|
|
if result and result['status']:
|
|
if Setting().get('verify_user_email'):
|
|
send_account_verification(email)
|
|
if Setting().get('otp_force') and Setting().get('otp_field_enabled'):
|
|
user.update_profile(enable_otp=True)
|
|
prepare_welcome_user(user.id)
|
|
return redirect(url_for('index.welcome'))
|
|
else:
|
|
return redirect(url_for('index.login'))
|
|
else:
|
|
return render_template('register.html',
|
|
error=result['msg'])
|
|
except Exception as e:
|
|
return render_template('register.html', error=e)
|
|
else:
|
|
return render_template('errors/404.html'), 404
|
|
|
|
|
|
# Show welcome page on first login if otp_force is enabled
|
|
@index_bp.route('/welcome', methods=['GET', 'POST'])
|
|
def welcome():
|
|
if 'welcome_user_id' not in session:
|
|
return redirect(url_for('index.index'))
|
|
|
|
user = User(id=session['welcome_user_id'])
|
|
encoded_img_data = base64.b64encode(user.get_qrcode_value())
|
|
|
|
if request.method == 'GET':
|
|
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user)
|
|
elif request.method == 'POST':
|
|
otp_token = request.form.get('otptoken', '')
|
|
if otp_token and otp_token.isdigit():
|
|
good_token = user.verify_totp(otp_token)
|
|
if not good_token:
|
|
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Invalid token")
|
|
else:
|
|
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Token required")
|
|
session.pop('welcome_user_id')
|
|
return redirect(url_for('index.index'))
|
|
|
|
@index_bp.route('/confirm/<token>', methods=['GET'])
|
|
def confirm_email(token):
|
|
email = confirm_token(token)
|
|
if not email:
|
|
# Cannot confirm email
|
|
return render_template('email_confirmation.html', status=0)
|
|
|
|
user = User.query.filter_by(email=email).first_or_404()
|
|
if user.confirmed:
|
|
# Already confirmed
|
|
current_app.logger.info(
|
|
"User email {} already confirmed".format(email))
|
|
return render_template('email_confirmation.html', status=2)
|
|
else:
|
|
# Confirm email is valid
|
|
user.update_confirmed(confirmed=1)
|
|
current_app.logger.info(
|
|
"User email {} confirmed successfully".format(email))
|
|
return render_template('email_confirmation.html', status=1)
|
|
|
|
|
|
@index_bp.route('/resend-confirmation-email', methods=['GET', 'POST'])
|
|
def resend_confirmation_email():
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index.index'))
|
|
if request.method == 'GET':
|
|
return render_template('resend_confirmation_email.html')
|
|
elif request.method == 'POST':
|
|
email = request.form.get('email')
|
|
user = User.query.filter(User.email == email).first()
|
|
if not user:
|
|
# Email not found
|
|
status = 0
|
|
elif user.confirmed:
|
|
# Email already confirmed
|
|
status = 1
|
|
else:
|
|
# Send new confirmed email
|
|
send_account_verification(user.email)
|
|
status = 2
|
|
|
|
return render_template('resend_confirmation_email.html', status=status)
|
|
|
|
|
|
@index_bp.route('/nic/checkip.html', methods=['GET', 'POST'])
|
|
@csrf.exempt
|
|
def dyndns_checkip():
|
|
# This route covers the default ddclient 'web' setting for the checkip service
|
|
return render_template('dyndns.html',
|
|
response=request.environ.get(
|
|
'HTTP_X_REAL_IP', request.remote_addr))
|
|
|
|
|
|
@index_bp.route('/nic/update', methods=['GET', 'POST'])
|
|
@csrf.exempt
|
|
@dyndns_login_required
|
|
def dyndns_update():
|
|
# dyndns protocol response codes in use are:
|
|
# good: update successful
|
|
# nochg: IP address already set to update address
|
|
# nohost: hostname does not exist for this user account
|
|
# 911: server error
|
|
# have to use 200 HTTP return codes because ddclient does not read the return string if the code is other than 200
|
|
# reference: https://help.dyn.com/remote-access-api/perform-update/
|
|
# reference: https://help.dyn.com/remote-access-api/return-codes/
|
|
hostname = request.args.get('hostname')
|
|
myip = request.args.get('myip')
|
|
|
|
if not hostname:
|
|
history = History(msg="DynDNS update: missing hostname parameter",
|
|
created_by=current_user.username)
|
|
history.add()
|
|
return render_template('dyndns.html', response='nohost'), 200
|
|
|
|
try:
|
|
if current_user.role.name in ['Administrator', 'Operator']:
|
|
domains = Domain.query.all()
|
|
else:
|
|
# Get query for domain to which the user has access permission.
|
|
# This includes direct domain permission AND permission through
|
|
# account membership
|
|
domains = db.session.query(Domain) \
|
|
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
|
|
.outerjoin(Account, Domain.account_id == Account.id) \
|
|
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
|
|
.filter(
|
|
db.or_(
|
|
DomainUser.user_id == current_user.id,
|
|
AccountUser.user_id == current_user.id
|
|
)).all()
|
|
except Exception as e:
|
|
current_app.logger.error('DynDNS Error: {0}'.format(e))
|
|
current_app.logger.debug(traceback.format_exc())
|
|
return render_template('dyndns.html', response='911'), 200
|
|
|
|
domain = None
|
|
domain_segments = hostname.split('.')
|
|
for _index in range(len(domain_segments)):
|
|
full_domain = '.'.join(domain_segments)
|
|
potential_domain = Domain.query.filter(
|
|
Domain.name == full_domain).first()
|
|
if potential_domain in domains:
|
|
domain = potential_domain
|
|
break
|
|
domain_segments.pop(0)
|
|
|
|
if not domain:
|
|
history = History(
|
|
msg=
|
|
"DynDNS update: attempted update of {0} but it does not exist for this user"
|
|
.format(hostname),
|
|
created_by=current_user.username)
|
|
history.add()
|
|
return render_template('dyndns.html', response='nohost'), 200
|
|
|
|
myip_addr = []
|
|
if myip:
|
|
for address in myip.split(','):
|
|
myip_addr += utils.validate_ipaddress(address)
|
|
|
|
remote_addr = utils.validate_ipaddress(
|
|
request.headers.get('X-Forwarded-For',
|
|
request.remote_addr).split(', ')[0])
|
|
|
|
response = 'nochg'
|
|
for ip in myip_addr or remote_addr:
|
|
if isinstance(ip, ipaddress.IPv4Address):
|
|
rtype = 'A'
|
|
else:
|
|
rtype = 'AAAA'
|
|
|
|
r = Record(name=hostname, type=rtype)
|
|
# Check if the user requested record exists within this domain
|
|
if r.exists(domain.name) and r.is_allowed_edit():
|
|
if r.data == str(ip):
|
|
# Record content did not change, return 'nochg'
|
|
history = History(
|
|
msg=
|
|
"DynDNS update: attempted update of {0} but record already up-to-date"
|
|
.format(hostname),
|
|
created_by=current_user.username,
|
|
domain_id=domain.id)
|
|
history.add()
|
|
else:
|
|
oldip = r.data
|
|
result = r.update(domain.name, str(ip))
|
|
if result['status'] == 'ok':
|
|
history = History(
|
|
msg='DynDNS update: updated {} successfully'.format(hostname),
|
|
detail = json.dumps({
|
|
'domain': domain.name,
|
|
'record': hostname,
|
|
'type': rtype,
|
|
'old_value': oldip,
|
|
'new_value': str(ip)
|
|
}),
|
|
created_by=current_user.username,
|
|
domain_id=domain.id)
|
|
history.add()
|
|
response = 'good'
|
|
else:
|
|
response = '911'
|
|
break
|
|
elif r.is_allowed_edit():
|
|
ondemand_creation = DomainSetting.query.filter(
|
|
DomainSetting.domain == domain).filter(
|
|
DomainSetting.setting == 'create_via_dyndns').first()
|
|
if (ondemand_creation is not None) and (strtobool(
|
|
ondemand_creation.value) == True):
|
|
|
|
# Build the rrset
|
|
rrset_data = [{
|
|
"changetype": "REPLACE",
|
|
"name": hostname + '.',
|
|
"ttl": 3600,
|
|
"type": rtype,
|
|
"records": [{
|
|
"content": str(ip),
|
|
"disabled": False
|
|
}],
|
|
"comments": []
|
|
}]
|
|
|
|
# Format the rrset
|
|
rrset = {"rrsets": rrset_data}
|
|
result = Record().add(domain.name, rrset)
|
|
if result['status'] == 'ok':
|
|
history = History(
|
|
msg=
|
|
'DynDNS update: created record {0} in zone {1} successfully'
|
|
.format(hostname, domain.name, str(ip)),
|
|
detail = json.dumps({
|
|
'domain': domain.name,
|
|
'record': hostname,
|
|
'value': str(ip)
|
|
}),
|
|
created_by=current_user.username,
|
|
domain_id=domain.id)
|
|
history.add()
|
|
response = 'good'
|
|
else:
|
|
history = History(
|
|
msg=
|
|
'DynDNS update: attempted update of {0} but it does not exist for this user'
|
|
.format(hostname),
|
|
created_by=current_user.username)
|
|
history.add()
|
|
|
|
return render_template('dyndns.html', response=response), 200
|
|
|
|
|
|
### START SAML AUTHENTICATION ###
|
|
@index_bp.route('/saml/login')
|
|
def saml_login():
|
|
if not current_app.config.get('SAML_ENABLED'):
|
|
abort(400)
|
|
from onelogin.saml2.utils import OneLogin_Saml2_Utils
|
|
req = saml.prepare_flask_request(request)
|
|
auth = saml.init_saml_auth(req)
|
|
redirect_url = OneLogin_Saml2_Utils.get_self_url(req) + url_for(
|
|
'index.saml_authorized')
|
|
return redirect(auth.login(return_to=redirect_url))
|
|
|
|
|
|
@index_bp.route('/saml/metadata')
|
|
def saml_metadata():
|
|
if not current_app.config.get('SAML_ENABLED'):
|
|
current_app.logger.error("SAML authentication is disabled.")
|
|
abort(400)
|
|
from onelogin.saml2.utils import OneLogin_Saml2_Utils
|
|
req = saml.prepare_flask_request(request)
|
|
auth = saml.init_saml_auth(req)
|
|
settings = auth.get_settings()
|
|
metadata = settings.get_sp_metadata()
|
|
errors = settings.validate_metadata(metadata)
|
|
|
|
if len(errors) == 0:
|
|
resp = make_response(metadata, 200)
|
|
resp.headers['Content-Type'] = 'text/xml'
|
|
else:
|
|
resp = make_response(errors.join(', '), 500)
|
|
return resp
|
|
|
|
|
|
@index_bp.route('/saml/authorized', methods=['GET', 'POST'])
|
|
@csrf.exempt
|
|
def saml_authorized():
|
|
errors = []
|
|
if not current_app.config.get('SAML_ENABLED'):
|
|
current_app.logger.error("SAML authentication is disabled.")
|
|
abort(400)
|
|
from onelogin.saml2.utils import OneLogin_Saml2_Utils
|
|
req = saml.prepare_flask_request(request)
|
|
auth = saml.init_saml_auth(req)
|
|
auth.process_response()
|
|
current_app.logger.debug( auth.get_attributes() )
|
|
errors = auth.get_errors()
|
|
if len(errors) == 0:
|
|
session['samlUserdata'] = auth.get_attributes()
|
|
session['samlNameId'] = auth.get_nameid()
|
|
session['samlSessionIndex'] = auth.get_session_index()
|
|
self_url = OneLogin_Saml2_Utils.get_self_url(req)
|
|
self_url = self_url + req['script_name']
|
|
if 'RelayState' in request.form and self_url != request.form[
|
|
'RelayState']:
|
|
return redirect(auth.redirect_to(request.form['RelayState']))
|
|
if current_app.config.get('SAML_ATTRIBUTE_USERNAME', False):
|
|
username = session['samlUserdata'][
|
|
current_app.config['SAML_ATTRIBUTE_USERNAME']][0].lower()
|
|
else:
|
|
username = session['samlNameId'].lower()
|
|
user = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
# create user
|
|
user = User(username=username,
|
|
plain_text_password=None,
|
|
email=session['samlNameId'])
|
|
user.create_local_user()
|
|
session['user_id'] = user.id
|
|
email_attribute_name = current_app.config.get('SAML_ATTRIBUTE_EMAIL',
|
|
'email')
|
|
givenname_attribute_name = current_app.config.get(
|
|
'SAML_ATTRIBUTE_GIVENNAME', 'givenname')
|
|
surname_attribute_name = current_app.config.get(
|
|
'SAML_ATTRIBUTE_SURNAME', 'surname')
|
|
name_attribute_name = current_app.config.get('SAML_ATTRIBUTE_NAME',
|
|
None)
|
|
account_attribute_name = current_app.config.get(
|
|
'SAML_ATTRIBUTE_ACCOUNT', None)
|
|
admin_attribute_name = current_app.config.get('SAML_ATTRIBUTE_ADMIN',
|
|
None)
|
|
group_attribute_name = current_app.config.get('SAML_ATTRIBUTE_GROUP',
|
|
None)
|
|
admin_group_name = current_app.config.get('SAML_GROUP_ADMIN_NAME',
|
|
None)
|
|
operator_group_name = current_app.config.get('SAML_GROUP_OPERATOR_NAME',
|
|
None)
|
|
group_to_account_mapping = create_group_to_account_mapping()
|
|
|
|
if email_attribute_name in session['samlUserdata']:
|
|
user.email = session['samlUserdata'][email_attribute_name][
|
|
0].lower()
|
|
if givenname_attribute_name in session['samlUserdata']:
|
|
user.firstname = session['samlUserdata'][givenname_attribute_name][
|
|
0]
|
|
if surname_attribute_name in session['samlUserdata']:
|
|
user.lastname = session['samlUserdata'][surname_attribute_name][0]
|
|
if name_attribute_name in session['samlUserdata']:
|
|
name = session['samlUserdata'][name_attribute_name][0].split(' ')
|
|
user.firstname = name[0]
|
|
user.lastname = ' '.join(name[1:])
|
|
|
|
if group_attribute_name:
|
|
user_groups = session['samlUserdata'].get(group_attribute_name, [])
|
|
else:
|
|
user_groups = []
|
|
if admin_attribute_name or group_attribute_name:
|
|
user_accounts = set(user.get_accounts())
|
|
saml_accounts = []
|
|
for group_mapping in group_to_account_mapping:
|
|
mapping = group_mapping.split('=')
|
|
group = mapping[0]
|
|
account_name = mapping[1]
|
|
|
|
if group in user_groups:
|
|
account = handle_account(account_name)
|
|
saml_accounts.append(account)
|
|
|
|
for account_name in session['samlUserdata'].get(
|
|
account_attribute_name, []):
|
|
account = handle_account(account_name)
|
|
saml_accounts.append(account)
|
|
saml_accounts = set(saml_accounts)
|
|
for account in saml_accounts - user_accounts:
|
|
account.add_user(user)
|
|
history = History(msg='Adding {0} to account {1}'.format(
|
|
user.username, account.name),
|
|
created_by='SAML Assertion')
|
|
history.add()
|
|
for account in user_accounts - saml_accounts:
|
|
account.remove_user(user)
|
|
history = History(msg='Removing {0} from account {1}'.format(
|
|
user.username, account.name),
|
|
created_by='SAML Assertion')
|
|
history.add()
|
|
if admin_attribute_name and 'true' in session['samlUserdata'].get(
|
|
admin_attribute_name, []):
|
|
uplift_to_admin(user)
|
|
elif admin_group_name in user_groups:
|
|
uplift_to_admin(user)
|
|
elif operator_group_name in user_groups:
|
|
uplift_to_operator(user)
|
|
elif admin_attribute_name or group_attribute_name:
|
|
if user.role.name != 'User':
|
|
user.role_id = Role.query.filter_by(name='User').first().id
|
|
history = History(msg='Demoting {0} to user'.format(
|
|
user.username),
|
|
created_by='SAML Assertion')
|
|
history.add()
|
|
user.plain_text_password = None
|
|
user.update_profile()
|
|
session['authentication_type'] = 'SAML'
|
|
return authenticate_user(user, 'SAML')
|
|
else:
|
|
return render_template('errors/SAML.html', errors=errors)
|
|
|
|
|
|
def create_group_to_account_mapping():
|
|
group_to_account_mapping_string = current_app.config.get(
|
|
'SAML_GROUP_TO_ACCOUNT_MAPPING', None)
|
|
if group_to_account_mapping_string and len(
|
|
group_to_account_mapping_string.strip()) > 0:
|
|
group_to_account_mapping = group_to_account_mapping_string.split(',')
|
|
else:
|
|
group_to_account_mapping = []
|
|
return group_to_account_mapping
|
|
|
|
|
|
def handle_account(account_name, account_description=""):
|
|
clean_name = Account.sanitize_name(account_name)
|
|
account = Account.query.filter_by(name=clean_name).first()
|
|
if not account:
|
|
account = Account(name=clean_name,
|
|
description=account_description,
|
|
contact='',
|
|
mail='')
|
|
account.create_account()
|
|
history = History(msg='Account {0} created'.format(account.name),
|
|
created_by='OIDC/SAML Assertion')
|
|
history.add()
|
|
else:
|
|
account.description = account_description
|
|
account.update_account()
|
|
return account
|
|
|
|
|
|
def uplift_to_admin(user):
|
|
if user.role.name != 'Administrator':
|
|
user.role_id = Role.query.filter_by(name='Administrator').first().id
|
|
history = History(msg='Promoting {0} to administrator'.format(
|
|
user.username),
|
|
created_by='SAML Assertion')
|
|
history.add()
|
|
|
|
def uplift_to_operator(user):
|
|
if user.role.name != 'Operator':
|
|
user.role_id = Role.query.filter_by(name='Operator').first().id
|
|
history = History(msg='Promoting {0} to operator'.format(
|
|
user.username),
|
|
created_by='SAML Assertion')
|
|
history.add()
|
|
|
|
|
|
@index_bp.route('/saml/sls')
|
|
def saml_logout():
|
|
req = saml.prepare_flask_request(request)
|
|
auth = saml.init_saml_auth(req)
|
|
url = auth.process_slo()
|
|
errors = auth.get_errors()
|
|
if len(errors) == 0:
|
|
clear_session()
|
|
if url is not None:
|
|
return redirect(url)
|
|
elif current_app.config.get('SAML_LOGOUT_URL') is not None:
|
|
return redirect(current_app.config.get('SAML_LOGOUT_URL'))
|
|
else:
|
|
return redirect(url_for('login'))
|
|
else:
|
|
return render_template('errors/SAML.html', errors=errors)
|
|
|
|
|
|
### END SAML AUTHENTICATION ###
|
|
|
|
|
|
@index_bp.route('/swagger', methods=['GET'])
|
|
def swagger_spec():
|
|
try:
|
|
spec_path = os.path.join(current_app.root_path, "swagger-spec.yaml")
|
|
spec = open(spec_path, 'r')
|
|
loaded_spec = load(spec.read(), Loader)
|
|
except Exception as e:
|
|
current_app.logger.error(
|
|
'Cannot view swagger spec. Error: {0}'.format(e))
|
|
current_app.logger.debug(traceback.format_exc())
|
|
abort(500)
|
|
|
|
resp = make_response(json.dumps(loaded_spec), 200)
|
|
resp.headers['Content-Type'] = 'application/json'
|
|
|
|
return resp
|