powerdns-admin/powerdnsadmin/routes/index.py
2022-06-18 14:30:56 +02:00

1174 lines
48 KiB
Python

import os
import re
import json
import traceback
import datetime
import ipaddress
import base64
from distutils.util import strtobool
from yaml import Loader, load
from flask import Blueprint, render_template, make_response, url_for, current_app, g, session, request, redirect, abort
from flask_login import login_user, logout_user, login_required, current_user
from .base import login_manager
from ..lib import utils
from ..decorators import dyndns_login_required
from ..models.base import db
from ..models.user import User, Anonymous
from ..models.role import Role
from ..models.account import Account
from ..models.account_user import AccountUser
from ..models.domain import Domain
from ..models.domain_user import DomainUser
from ..models.domain_setting import DomainSetting
from ..models.record import Record
from ..models.setting import Setting
from ..models.history import History
from ..services.google import google_oauth
from ..services.github import github_oauth
from ..services.azure import azure_oauth
from ..services.oidc import oidc_oauth
from ..services.saml import SAML
from ..services.token import confirm_token
from ..services.email import send_account_verification
google = None
github = None
azure = None
oidc = None
saml = None
index_bp = Blueprint('index',
__name__,
template_folder='templates',
url_prefix='/')
@index_bp.before_app_first_request
def register_modules():
global google
global github
global azure
global oidc
global saml
google = google_oauth()
github = github_oauth()
azure = azure_oauth()
oidc = oidc_oauth()
saml = SAML()
@index_bp.before_request
def before_request():
# Check if user is anonymous
g.user = current_user
login_manager.anonymous_user = Anonymous
# Check site is in maintenance mode
maintenance = Setting().get('maintenance')
if maintenance and current_user.is_authenticated and current_user.role.name not in [
'Administrator', 'Operator'
]:
return render_template('maintenance.html')
# Manage session timeout
session.permanent = True
current_app.permanent_session_lifetime = datetime.timedelta(
minutes=int(Setting().get('session_timeout')))
session.modified = True
@index_bp.route('/', methods=['GET'])
@login_required
def index():
return redirect(url_for('dashboard.dashboard'))
@index_bp.route('/ping', methods=['GET'])
def ping():
return make_response('ok')
@index_bp.route('/google/login')
def google_login():
if not Setting().get('google_oauth_enabled') or google is None:
current_app.logger.error(
'Google OAuth is disabled or you have not yet reloaded the pda application after enabling.'
)
abort(400)
else:
redirect_uri = url_for('google_authorized', _external=True)
return google.authorize_redirect(redirect_uri)
@index_bp.route('/github/login')
def github_login():
if not Setting().get('github_oauth_enabled') or github is None:
current_app.logger.error(
'Github OAuth is disabled or you have not yet reloaded the pda application after enabling.'
)
abort(400)
else:
redirect_uri = url_for('github_authorized', _external=True)
return github.authorize_redirect(redirect_uri)
@index_bp.route('/azure/login')
def azure_login():
if not Setting().get('azure_oauth_enabled') or azure is None:
current_app.logger.error(
'Microsoft OAuth is disabled or you have not yet reloaded the pda application after enabling.'
)
abort(400)
else:
redirect_uri = url_for('azure_authorized',
_external=True,
_scheme='https')
return azure.authorize_redirect(redirect_uri)
@index_bp.route('/oidc/login')
def oidc_login():
if not Setting().get('oidc_oauth_enabled') or oidc is None:
current_app.logger.error(
'OIDC OAuth is disabled or you have not yet reloaded the pda application after enabling.'
)
abort(400)
else:
redirect_uri = url_for('oidc_authorized', _external=True)
return oidc.authorize_redirect(redirect_uri)
@index_bp.route('/login', methods=['GET', 'POST'])
def login():
SAML_ENABLED = current_app.config.get('SAML_ENABLED')
if g.user is not None and current_user.is_authenticated:
return redirect(url_for('dashboard.dashboard'))
if 'google_token' in session:
user_data = json.loads(google.get('userinfo').text)
first_name = user_data['given_name']
surname = user_data['family_name']
email = user_data['email']
user = User.query.filter_by(username=email).first()
if user is None:
user = User.query.filter_by(email=email).first()
if not user:
user = User(username=email,
firstname=first_name,
lastname=surname,
plain_text_password=None,
email=email)
result = user.create_local_user()
if not result['status']:
session.pop('google_token', None)
return redirect(url_for('index.login'))
session['user_id'] = user.id
session['authentication_type'] = 'OAuth'
return authenticate_user(user, 'Google OAuth')
if 'github_token' in session:
me = json.loads(github.get('user').text)
github_username = me['login']
github_name = me['name']
github_email = me['email']
user = User.query.filter_by(username=github_username).first()
if user is None:
user = User.query.filter_by(email=github_email).first()
if not user:
user = User(username=github_username,
plain_text_password=None,
firstname=github_name,
lastname='',
email=github_email)
result = user.create_local_user()
if not result['status']:
session.pop('github_token', None)
return redirect(url_for('index.login'))
session['user_id'] = user.id
session['authentication_type'] = 'OAuth'
return authenticate_user(user, 'Github OAuth')
if 'azure_token' in session:
azure_info = azure.get('me?$select=displayName,givenName,id,mail,surname,userPrincipalName').text
current_app.logger.info('Azure login returned: '+azure_info)
me = json.loads(azure_info)
azure_info = azure.post('me/getMemberGroups',
json={'securityEnabledOnly': False}).text
current_app.logger.info('Azure groups returned: ' + azure_info)
grouplookup = json.loads(azure_info)
# Groups are in mygroups['value'] which is an array
if "value" in grouplookup:
mygroups = grouplookup["value"]
else:
mygroups = []
azure_username = me["userPrincipalName"]
azure_givenname = me["givenName"]
azure_familyname = me["surname"]
if "mail" in me:
azure_email = me["mail"]
else:
azure_email = ""
if not azure_email:
azure_email = me["userPrincipalName"]
# Handle foreign principals such as guest users
azure_email = re.sub(r"#.*$", "", azure_email)
azure_username = re.sub(r"#.*$", "", azure_username)
user = User.query.filter_by(username=azure_username).first()
if not user:
user = User(username=azure_username,
plain_text_password=None,
firstname=azure_givenname,
lastname=azure_familyname,
email=azure_email)
result = user.create_local_user()
if not result['status']:
current_app.logger.warning('Unable to create ' + azure_username)
session.pop('azure_token', None)
# note: a redirect to login results in an endless loop, so render the login page instead
return render_template('login.html',
saml_enabled=SAML_ENABLED,
error=('User ' + azure_username +
' cannot be created.'))
session['user_id'] = user.id
session['authentication_type'] = 'OAuth'
# Handle group memberships, if defined
if Setting().get('azure_sg_enabled'):
if Setting().get('azure_admin_group') in mygroups:
current_app.logger.info('Setting role for user ' +
azure_username +
' to Administrator due to group membership')
user.set_role("Administrator")
else:
if Setting().get('azure_operator_group') in mygroups:
current_app.logger.info('Setting role for user ' +
azure_username +
' to Operator due to group membership')
user.set_role("Operator")
else:
if Setting().get('azure_user_group') in mygroups:
current_app.logger.info('Setting role for user ' +
azure_username +
' to User due to group membership')
user.set_role("User")
else:
current_app.logger.warning('User ' +
azure_username +
' has no relevant group memberships')
session.pop('azure_token', None)
return render_template('login.html',
saml_enabled=SAML_ENABLED,
error=('User ' + azure_username +
' is not in any authorised groups.'))
# Handle account/group creation, if enabled
if Setting().get('azure_group_accounts_enabled') and mygroups:
current_app.logger.info('Azure group account sync enabled')
name_value = Setting().get('azure_group_accounts_name')
description_value = Setting().get('azure_group_accounts_description')
select_values = name_value
if description_value != '':
select_values += ',' + description_value
mygroups = get_azure_groups(
'me/memberOf/microsoft.graph.group?$count=false&$securityEnabled=true&$select={}'.format(select_values))
description_pattern = Setting().get('azure_group_accounts_description_re')
pattern = Setting().get('azure_group_accounts_name_re')
# Loop through users security groups
for azure_group in mygroups:
if name_value in azure_group:
group_name = azure_group[name_value]
group_description = ''
if description_value in azure_group:
group_description = azure_group[description_value]
# Do regex search if enabled for group description
if description_pattern != '':
current_app.logger.info('Matching group description {} against regex {}'.format(
group_description, description_pattern))
matches = re.match(
description_pattern, group_description)
if matches:
current_app.logger.info(
'Group {} matched regexp'.format(group_description))
group_description = matches.group(1)
else:
# Regexp didn't match, continue to next iteration
continue
# Do regex search if enabled for group name
if pattern != '':
current_app.logger.info(
'Matching group name {} against regex {}'.format(group_name, pattern))
matches = re.match(pattern, group_name)
if matches:
current_app.logger.info(
'Group {} matched regexp'.format(group_name))
group_name = matches.group(1)
else:
# Regexp didn't match, continue to next iteration
continue
account = Account()
account_id = account.get_id_by_name(account_name=group_name)
if account_id:
account = Account.query.get(account_id)
# check if user has permissions
account_users = account.get_user()
current_app.logger.info('Group: {} Users: {}'.format(
group_name,
account_users))
if user.id in account_users:
current_app.logger.info('User id {} is already in account {}'.format(
user.id, group_name))
else:
account.add_user(user)
history = History(msg='Update account {0}'.format(
account.name),
created_by='System')
history.add()
current_app.logger.info('User {} added to Account {}'.format(
user.username, account.name))
else:
account.name = group_name
account.description = group_description
account.contact = ''
account.mail = ''
account.create_account()
history = History(msg='Create account {0}'.format(
account.name),
created_by='System')
history.add()
account.add_user(user)
history = History(msg='Update account {0}'.format(account.name),
created_by='System')
history.add()
current_app.logger.warning('group info: {} '.format(account_id))
return authenticate_user(user, 'Azure OAuth')
if 'oidc_token' in session:
me = json.loads(oidc.get('userinfo').text)
oidc_username = me[Setting().get('oidc_oauth_username')]
oidc_givenname = me[Setting().get('oidc_oauth_firstname')]
oidc_familyname = me[Setting().get('oidc_oauth_last_name')]
oidc_email = me[Setting().get('oidc_oauth_email')]
user = User.query.filter_by(username=oidc_username).first()
if not user:
user = User(username=oidc_username,
plain_text_password=None,
firstname=oidc_givenname,
lastname=oidc_familyname,
email=oidc_email)
result = user.create_local_user()
else:
user.firstname = oidc_givenname
user.lastname = oidc_familyname
user.email = oidc_email
user.plain_text_password = None
result = user.update_local_user()
if not result['status']:
session.pop('oidc_token', None)
return redirect(url_for('index.login'))
#This checks if the account_name_property and account_description property were included in settings.
if Setting().get('oidc_oauth_account_name_property') and Setting().get('oidc_oauth_account_description_property'):
#Gets the name_property and description_property.
name_prop = Setting().get('oidc_oauth_account_name_property')
desc_prop = Setting().get('oidc_oauth_account_description_property')
account_to_add = []
#If the name_property and desc_property exist in me (A variable that contains all the userinfo from the IdP).
if name_prop in me and desc_prop in me:
accounts_name_prop = [me[name_prop]] if type(me[name_prop]) is not list else me[name_prop]
accounts_desc_prop = [me[desc_prop]] if type(me[desc_prop]) is not list else me[desc_prop]
#Run on all groups the user is in by the index num.
for i in range(len(accounts_name_prop)):
description = ''
if i < len(accounts_desc_prop):
description = accounts_desc_prop[i]
account = handle_account(accounts_name_prop[i], description)
account_to_add.append(account)
user_accounts = user.get_accounts()
# Add accounts
for account in account_to_add:
if account not in user_accounts:
account.add_user(user)
# Remove accounts if the setting is enabled
if Setting().get('delete_sso_accounts'):
for account in user_accounts:
if account not in account_to_add:
account.remove_user(user)
session['user_id'] = user.id
session['authentication_type'] = 'OAuth'
return authenticate_user(user, 'OIDC OAuth')
if request.method == 'GET':
return render_template('login.html', saml_enabled=SAML_ENABLED)
elif request.method == 'POST':
# process Local-DB authentication
username = request.form['username']
password = request.form['password']
otp_token = request.form.get('otptoken')
auth_method = request.form.get('auth_method', 'LOCAL')
session[
'authentication_type'] = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
remember_me = True if 'remember' in request.form else False
if auth_method == 'LOCAL' and not Setting().get('local_db_enabled'):
return render_template(
'login.html',
saml_enabled=SAML_ENABLED,
error='Local authentication is disabled')
user = User(username=username,
password=password,
plain_text_password=password)
try:
if Setting().get('verify_user_email') and user.email and not user.confirmed:
return render_template(
'login.html',
saml_enabled=SAML_ENABLED,
error='Please confirm your email address first')
auth = user.is_validate(method=auth_method,
src_ip=request.remote_addr)
if auth == False:
signin_history(user.username, auth_method, False)
return render_template('login.html',
saml_enabled=SAML_ENABLED,
error='Invalid credentials')
except Exception as e:
current_app.logger.error(
"Cannot authenticate user. Error: {}".format(e))
current_app.logger.debug(traceback.format_exc())
return render_template('login.html',
saml_enabled=SAML_ENABLED,
error=e)
# check if user enabled OPT authentication
if user.otp_secret:
if otp_token and otp_token.isdigit():
good_token = user.verify_totp(otp_token)
if not good_token:
signin_history(user.username, auth_method, False)
return render_template('login.html',
saml_enabled=SAML_ENABLED,
error='Invalid credentials')
else:
return render_template('login.html',
saml_enabled=SAML_ENABLED,
error='Token required')
if Setting().get('autoprovisioning') and auth_method!='LOCAL':
urn_value=Setting().get('urn_value')
Entitlements=user.read_entitlements(Setting().get('autoprovisioning_attribute'))
if len(Entitlements)==0 and Setting().get('purge'):
user.set_role("User")
user.revoke_privilege(True)
elif len(Entitlements)!=0:
if checkForPDAEntries(Entitlements, urn_value):
user.updateUser(Entitlements)
else:
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
if Setting().get('purge'):
user.set_role("User")
user.revoke_privilege(True)
current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' )
return authenticate_user(user, auth_method, remember_me)
def checkForPDAEntries(Entitlements, urn_value):
"""
Run through every record located in the ldap attribute given and determine if there are any valid powerdns-admin records
"""
urnArguments=[x.lower() for x in urn_value.split(':')]
for Entitlement in Entitlements:
entArguments=Entitlement.split(':powerdns-admin')
entArguments=[x.lower() for x in entArguments[0].split(':')]
if (entArguments==urnArguments):
return True
return False
def clear_session():
session.pop('user_id', None)
session.pop('github_token', None)
session.pop('google_token', None)
session.pop('authentication_type', None)
session.pop('remote_user', None)
session.clear()
logout_user()
def signin_history(username, authenticator, success):
# Get user ip address
if request.headers.getlist("X-Forwarded-For"):
request_ip = request.headers.getlist("X-Forwarded-For")[0]
request_ip = request_ip.split(',')[0]
else:
request_ip = request.remote_addr
# Write log
if success:
str_success = 'succeeded'
current_app.logger.info(
"User {} authenticated successfully via {} from {}".format(
username, authenticator, request_ip))
else:
str_success = 'failed'
current_app.logger.warning(
"User {} failed to authenticate via {} from {}".format(
username, authenticator, request_ip))
# Write history
History(msg='User {} authentication {}'.format(username, str_success),
detail = json.dumps({
'username': username,
'authenticator': authenticator,
'ip_address': request_ip,
'success': 1 if success else 0
}),
created_by='System').add()
# Get a list of Azure security groups the user is a member of
def get_azure_groups(uri):
azure_info = azure.get(uri).text
current_app.logger.info('Azure groups returned: ' + azure_info)
grouplookup = json.loads(azure_info)
if "value" in grouplookup:
mygroups = grouplookup["value"]
# If "@odata.nextLink" exists in the results, we need to get more groups
if "@odata.nextLink" in grouplookup:
# The additional groups are added to the existing array
mygroups.extend(get_azure_groups(grouplookup["@odata.nextLink"]))
else:
mygroups = []
return mygroups
# Handle user login, write history and, if set, handle showing the register_otp QR code.
# if Setting for OTP on first login is enabled, and OTP field is also enabled,
# but user isn't using it yet, enable OTP, get QR code and display it, logging the user out.
def authenticate_user(user, authenticator, remember=False):
login_user(user, remember=remember)
signin_history(user.username, authenticator, True)
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret:
user.update_profile(enable_otp=True)
user_id = current_user.id
prepare_welcome_user(user_id)
return redirect(url_for('index.welcome'))
return redirect(url_for('index.login'))
# Prepare user to enter /welcome screen, otherwise they won't have permission to do so
def prepare_welcome_user(user_id):
logout_user()
session['welcome_user_id'] = user_id
@index_bp.route('/logout')
def logout():
if current_app.config.get(
'SAML_ENABLED'
) and 'samlSessionIndex' in session and current_app.config.get(
'SAML_LOGOUT'):
req = saml.prepare_flask_request(request)
auth = saml.init_saml_auth(req)
if current_app.config.get('SAML_LOGOUT_URL'):
return redirect(
auth.logout(
name_id_format=
"urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress",
return_to=current_app.config.get('SAML_LOGOUT_URL'),
session_index=session['samlSessionIndex'],
name_id=session['samlNameId']))
return redirect(
auth.logout(
name_id_format=
"urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress",
session_index=session['samlSessionIndex'],
name_id=session['samlNameId']))
redirect_uri = url_for('index.login')
oidc_logout = Setting().get('oidc_oauth_logout_url')
if 'oidc_token' in session and oidc_logout:
redirect_uri = "{}?redirect_uri={}".format(
oidc_logout, url_for('index.login', _external=True))
# Clean cookies and flask session
clear_session()
# If remote user authentication is enabled and a logout URL is configured for it,
# redirect users to that instead
remote_user_logout_url = current_app.config.get('REMOTE_USER_LOGOUT_URL')
if current_app.config.get('REMOTE_USER_ENABLED') and remote_user_logout_url:
current_app.logger.debug(
'Redirecting remote user "{0}" to logout URL {1}'
.format(current_user.username, remote_user_logout_url))
# Warning: if REMOTE_USER environment variable is still set and not cleared by
# some external module, not defining a custom logout URL will trigger a loop
# that will just log the user back in right after logging out
res = make_response(redirect(remote_user_logout_url.strip()))
# Remove any custom cookies the remote authentication mechanism may use
# (e.g.: MOD_AUTH_CAS and MOD_AUTH_CAS_S)
remote_cookies = current_app.config.get('REMOTE_USER_COOKIES')
for r_cookie_name in utils.ensure_list(remote_cookies):
res.delete_cookie(r_cookie_name)
return res
return redirect(redirect_uri)
@index_bp.route('/register', methods=['GET', 'POST'])
def register():
if Setting().get('signup_enabled'):
if request.method == 'GET':
return render_template('register.html')
elif request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
firstname = request.form.get('firstname', '').strip()
lastname = request.form.get('lastname', '').strip()
email = request.form.get('email', '').strip()
rpassword = request.form.get('rpassword', '')
if not username or not password or not email:
return render_template(
'register.html', error='Please input required information')
if password != rpassword:
return render_template(
'register.html',
error="Password confirmation does not match")
user = User(username=username,
plain_text_password=password,
firstname=firstname,
lastname=lastname,
email=email)
try:
result = user.create_local_user()
if result and result['status']:
if Setting().get('verify_user_email'):
send_account_verification(email)
if Setting().get('otp_force') and Setting().get('otp_field_enabled'):
user.update_profile(enable_otp=True)
prepare_welcome_user(user.id)
return redirect(url_for('index.welcome'))
else:
return redirect(url_for('index.login'))
else:
return render_template('register.html',
error=result['msg'])
except Exception as e:
return render_template('register.html', error=e)
else:
return render_template('errors/404.html'), 404
# Show welcome page on first login if otp_force is enabled
@index_bp.route('/welcome', methods=['GET', 'POST'])
def welcome():
if 'welcome_user_id' not in session:
return redirect(url_for('index.index'))
user = User(id=session['welcome_user_id'])
encoded_img_data = base64.b64encode(user.get_qrcode_value())
if request.method == 'GET':
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user)
elif request.method == 'POST':
otp_token = request.form.get('otptoken', '')
if otp_token and otp_token.isdigit():
good_token = user.verify_totp(otp_token)
if not good_token:
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Invalid token")
else:
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Token required")
session.pop('welcome_user_id')
return redirect(url_for('index.index'))
@index_bp.route('/confirm/<token>', methods=['GET'])
def confirm_email(token):
email = confirm_token(token)
if not email:
# Cannot confirm email
return render_template('email_confirmation.html', status=0)
user = User.query.filter_by(email=email).first_or_404()
if user.confirmed:
# Already confirmed
current_app.logger.info(
"User email {} already confirmed".format(email))
return render_template('email_confirmation.html', status=2)
else:
# Confirm email is valid
user.update_confirmed(confirmed=1)
current_app.logger.info(
"User email {} confirmed successfully".format(email))
return render_template('email_confirmation.html', status=1)
@index_bp.route('/resend-confirmation-email', methods=['GET', 'POST'])
def resend_confirmation_email():
if current_user.is_authenticated:
return redirect(url_for('index.index'))
if request.method == 'GET':
return render_template('resend_confirmation_email.html')
elif request.method == 'POST':
email = request.form.get('email')
user = User.query.filter(User.email == email).first()
if not user:
# Email not found
status = 0
elif user.confirmed:
# Email already confirmed
status = 1
else:
# Send new confirmed email
send_account_verification(user.email)
status = 2
return render_template('resend_confirmation_email.html', status=status)
@index_bp.route('/nic/checkip.html', methods=['GET', 'POST'])
def dyndns_checkip():
# This route covers the default ddclient 'web' setting for the checkip service
return render_template('dyndns.html',
response=request.environ.get(
'HTTP_X_REAL_IP', request.remote_addr))
@index_bp.route('/nic/update', methods=['GET', 'POST'])
@dyndns_login_required
def dyndns_update():
# dyndns protocol response codes in use are:
# good: update successful
# nochg: IP address already set to update address
# nohost: hostname does not exist for this user account
# 911: server error
# have to use 200 HTTP return codes because ddclient does not read the return string if the code is other than 200
# reference: https://help.dyn.com/remote-access-api/perform-update/
# reference: https://help.dyn.com/remote-access-api/return-codes/
hostname = request.args.get('hostname')
myip = request.args.get('myip')
if not hostname:
history = History(msg="DynDNS update: missing hostname parameter",
created_by=current_user.username)
history.add()
return render_template('dyndns.html', response='nohost'), 200
try:
if current_user.role.name in ['Administrator', 'Operator']:
domains = Domain.query.all()
else:
# Get query for domain to which the user has access permission.
# This includes direct domain permission AND permission through
# account membership
domains = db.session.query(Domain) \
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).all()
except Exception as e:
current_app.logger.error('DynDNS Error: {0}'.format(e))
current_app.logger.debug(traceback.format_exc())
return render_template('dyndns.html', response='911'), 200
domain = None
domain_segments = hostname.split('.')
for _index in range(len(domain_segments)):
full_domain = '.'.join(domain_segments)
potential_domain = Domain.query.filter(
Domain.name == full_domain).first()
if potential_domain in domains:
domain = potential_domain
break
domain_segments.pop(0)
if not domain:
history = History(
msg=
"DynDNS update: attempted update of {0} but it does not exist for this user"
.format(hostname),
created_by=current_user.username)
history.add()
return render_template('dyndns.html', response='nohost'), 200
myip_addr = []
if myip:
for address in myip.split(','):
myip_addr += utils.validate_ipaddress(address)
remote_addr = utils.validate_ipaddress(
request.headers.get('X-Forwarded-For',
request.remote_addr).split(', ')[0])
response = 'nochg'
for ip in myip_addr or remote_addr:
if isinstance(ip, ipaddress.IPv4Address):
rtype = 'A'
else:
rtype = 'AAAA'
r = Record(name=hostname, type=rtype)
# Check if the user requested record exists within this domain
if r.exists(domain.name) and r.is_allowed_edit():
if r.data == str(ip):
# Record content did not change, return 'nochg'
history = History(
msg=
"DynDNS update: attempted update of {0} but record already up-to-date"
.format(hostname),
created_by=current_user.username,
domain_id=domain.id)
history.add()
else:
oldip = r.data
result = r.update(domain.name, str(ip))
if result['status'] == 'ok':
history = History(
msg='DynDNS update: updated {} successfully'.format(hostname),
detail = json.dumps({
'domain': domain.name,
'record': hostname,
'type': rtype,
'old_value': oldip,
'new_value': str(ip)
}),
created_by=current_user.username,
domain_id=domain.id)
history.add()
response = 'good'
else:
response = '911'
break
elif r.is_allowed_edit():
ondemand_creation = DomainSetting.query.filter(
DomainSetting.domain == domain).filter(
DomainSetting.setting == 'create_via_dyndns').first()
if (ondemand_creation is not None) and (strtobool(
ondemand_creation.value) == True):
# Build the rrset
rrset_data = [{
"changetype": "REPLACE",
"name": hostname + '.',
"ttl": 3600,
"type": rtype,
"records": [{
"content": str(ip),
"disabled": False
}],
"comments": []
}]
# Format the rrset
rrset = {"rrsets": rrset_data}
result = Record().add(domain.name, rrset)
if result['status'] == 'ok':
history = History(
msg=
'DynDNS update: created record {0} in zone {1} successfully'
.format(hostname, domain.name, str(ip)),
detail = json.dumps({
'domain': domain.name,
'record': hostname,
'value': str(ip)
}),
created_by=current_user.username,
domain_id=domain.id)
history.add()
response = 'good'
else:
history = History(
msg=
'DynDNS update: attempted update of {0} but it does not exist for this user'
.format(hostname),
created_by=current_user.username)
history.add()
return render_template('dyndns.html', response=response), 200
### START SAML AUTHENTICATION ###
@index_bp.route('/saml/login')
def saml_login():
if not current_app.config.get('SAML_ENABLED'):
abort(400)
from onelogin.saml2.utils import OneLogin_Saml2_Utils
req = saml.prepare_flask_request(request)
auth = saml.init_saml_auth(req)
redirect_url = OneLogin_Saml2_Utils.get_self_url(req) + url_for(
'index.saml_authorized')
return redirect(auth.login(return_to=redirect_url))
@index_bp.route('/saml/metadata')
def saml_metadata():
if not current_app.config.get('SAML_ENABLED'):
current_app.logger.error("SAML authentication is disabled.")
abort(400)
from onelogin.saml2.utils import OneLogin_Saml2_Utils
req = saml.prepare_flask_request(request)
auth = saml.init_saml_auth(req)
settings = auth.get_settings()
metadata = settings.get_sp_metadata()
errors = settings.validate_metadata(metadata)
if len(errors) == 0:
resp = make_response(metadata, 200)
resp.headers['Content-Type'] = 'text/xml'
else:
resp = make_response(errors.join(', '), 500)
return resp
@index_bp.route('/saml/authorized', methods=['GET', 'POST'])
def saml_authorized():
errors = []
if not current_app.config.get('SAML_ENABLED'):
current_app.logger.error("SAML authentication is disabled.")
abort(400)
from onelogin.saml2.utils import OneLogin_Saml2_Utils
req = saml.prepare_flask_request(request)
auth = saml.init_saml_auth(req)
auth.process_response()
current_app.logger.debug( auth.get_attributes() )
errors = auth.get_errors()
if len(errors) == 0:
session['samlUserdata'] = auth.get_attributes()
session['samlNameId'] = auth.get_nameid()
session['samlSessionIndex'] = auth.get_session_index()
self_url = OneLogin_Saml2_Utils.get_self_url(req)
self_url = self_url + req['script_name']
if 'RelayState' in request.form and self_url != request.form[
'RelayState']:
return redirect(auth.redirect_to(request.form['RelayState']))
if current_app.config.get('SAML_ATTRIBUTE_USERNAME', False):
username = session['samlUserdata'][
current_app.config['SAML_ATTRIBUTE_USERNAME']][0].lower()
else:
username = session['samlNameId'].lower()
user = User.query.filter_by(username=username).first()
if not user:
# create user
user = User(username=username,
plain_text_password=None,
email=session['samlNameId'])
user.create_local_user()
session['user_id'] = user.id
email_attribute_name = current_app.config.get('SAML_ATTRIBUTE_EMAIL',
'email')
givenname_attribute_name = current_app.config.get(
'SAML_ATTRIBUTE_GIVENNAME', 'givenname')
surname_attribute_name = current_app.config.get(
'SAML_ATTRIBUTE_SURNAME', 'surname')
name_attribute_name = current_app.config.get('SAML_ATTRIBUTE_NAME',
None)
account_attribute_name = current_app.config.get(
'SAML_ATTRIBUTE_ACCOUNT', None)
admin_attribute_name = current_app.config.get('SAML_ATTRIBUTE_ADMIN',
None)
group_attribute_name = current_app.config.get('SAML_ATTRIBUTE_GROUP',
None)
admin_group_name = current_app.config.get('SAML_GROUP_ADMIN_NAME',
None)
operator_group_name = current_app.config.get('SAML_GROUP_OPERATOR_NAME',
None)
group_to_account_mapping = create_group_to_account_mapping()
if email_attribute_name in session['samlUserdata']:
user.email = session['samlUserdata'][email_attribute_name][
0].lower()
if givenname_attribute_name in session['samlUserdata']:
user.firstname = session['samlUserdata'][givenname_attribute_name][
0]
if surname_attribute_name in session['samlUserdata']:
user.lastname = session['samlUserdata'][surname_attribute_name][0]
if name_attribute_name in session['samlUserdata']:
name = session['samlUserdata'][name_attribute_name][0].split(' ')
user.firstname = name[0]
user.lastname = ' '.join(name[1:])
if group_attribute_name:
user_groups = session['samlUserdata'].get(group_attribute_name, [])
else:
user_groups = []
if admin_attribute_name or group_attribute_name:
user_accounts = set(user.get_accounts())
saml_accounts = []
for group_mapping in group_to_account_mapping:
mapping = group_mapping.split('=')
group = mapping[0]
account_name = mapping[1]
if group in user_groups:
account = handle_account(account_name)
saml_accounts.append(account)
for account_name in session['samlUserdata'].get(
account_attribute_name, []):
account = handle_account(account_name)
saml_accounts.append(account)
saml_accounts = set(saml_accounts)
for account in saml_accounts - user_accounts:
account.add_user(user)
history = History(msg='Adding {0} to account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
for account in user_accounts - saml_accounts:
account.remove_user(user)
history = History(msg='Removing {0} from account {1}'.format(
user.username, account.name),
created_by='SAML Assertion')
history.add()
if admin_attribute_name and 'true' in session['samlUserdata'].get(
admin_attribute_name, []):
uplift_to_admin(user)
elif admin_group_name in user_groups:
uplift_to_admin(user)
elif operator_group_name in user_groups:
uplift_to_operator(user)
elif admin_attribute_name or group_attribute_name:
if user.role.name != 'User':
user.role_id = Role.query.filter_by(name='User').first().id
history = History(msg='Demoting {0} to user'.format(
user.username),
created_by='SAML Assertion')
history.add()
user.plain_text_password = None
user.update_profile()
session['authentication_type'] = 'SAML'
return authenticate_user(user, 'SAML')
else:
return render_template('errors/SAML.html', errors=errors)
def create_group_to_account_mapping():
group_to_account_mapping_string = current_app.config.get(
'SAML_GROUP_TO_ACCOUNT_MAPPING', None)
if group_to_account_mapping_string and len(
group_to_account_mapping_string.strip()) > 0:
group_to_account_mapping = group_to_account_mapping_string.split(',')
else:
group_to_account_mapping = []
return group_to_account_mapping
def handle_account(account_name, account_description=""):
if Setting().get('account_name_extra_chars'):
char_list = "abcdefghijklmnopqrstuvwxyz0123456789_-."
else:
char_list = "abcdefghijklmnopqrstuvwxyz0123456789"
clean_name = ''.join(c for c in account_name.lower()
if c in char_list)
if len(clean_name) > Account.name.type.length:
current_app.logger.error(
"Account name {0} too long. Truncated.".format(clean_name))
account = Account.query.filter_by(name=clean_name).first()
if not account:
account = Account(name=clean_name.lower(),
description=account_description,
contact='',
mail='')
account.create_account()
history = History(msg='Account {0} created'.format(account.name),
created_by='OIDC/SAML Assertion')
history.add()
else:
account.description = account_description
account.update_account()
return account
def uplift_to_admin(user):
if user.role.name != 'Administrator':
user.role_id = Role.query.filter_by(name='Administrator').first().id
history = History(msg='Promoting {0} to administrator'.format(
user.username),
created_by='SAML Assertion')
history.add()
def uplift_to_operator(user):
if user.role.name != 'Operator':
user.role_id = Role.query.filter_by(name='Operator').first().id
history = History(msg='Promoting {0} to operator'.format(
user.username),
created_by='SAML Assertion')
history.add()
@index_bp.route('/saml/sls')
def saml_logout():
req = saml.prepare_flask_request(request)
auth = saml.init_saml_auth(req)
url = auth.process_slo()
errors = auth.get_errors()
if len(errors) == 0:
clear_session()
if url is not None:
return redirect(url)
elif current_app.config.get('SAML_LOGOUT_URL') is not None:
return redirect(current_app.config.get('SAML_LOGOUT_URL'))
else:
return redirect(url_for('login'))
else:
return render_template('errors/SAML.html', errors=errors)
### END SAML AUTHENTICATION ###
@index_bp.route('/swagger', methods=['GET'])
def swagger_spec():
try:
spec_path = os.path.join(current_app.root_path, "swagger-spec.yaml")
spec = open(spec_path, 'r')
loaded_spec = load(spec.read(), Loader)
except Exception as e:
current_app.logger.error(
'Cannot view swagger spec. Error: {0}'.format(e))
current_app.logger.debug(traceback.format_exc())
abort(500)
resp = make_response(json.dumps(loaded_spec), 200)
resp.headers['Content-Type'] = 'application/json'
return resp