mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-06-14 20:16:05 +00:00
Add 'otp_force' basic setting (#1051)
If the 'otp_force' and 'otp_field_enabled' basic settings are both enabled, automatically enable 2FA for the user after login or signup, if needed, by setting a new OTP secret. Redirect the user to a welcome page for scanning the QR code. Also show the secret key in ASCII form on the user profile page for easier copying into other applications.
This commit is contained in:

committed by
GitHub

parent
0da9b2185e
commit
94a923a965
@ -1260,8 +1260,7 @@ def setting_basic():
|
||||
'allow_user_create_domain', 'allow_user_remove_domain', 'allow_user_view_history', 'bg_domain_updates', 'site_name',
|
||||
'session_timeout', 'warn_session_timeout', 'ttl_options',
|
||||
'pdns_api_timeout', 'verify_ssl_connections', 'verify_user_email',
|
||||
'delete_sso_accounts', 'otp_field_enabled', 'custom_css', 'enable_api_rr_history', 'max_history_records'
|
||||
|
||||
'delete_sso_accounts', 'otp_field_enabled', 'custom_css', 'enable_api_rr_history', 'max_history_records', 'otp_force'
|
||||
]
|
||||
|
||||
return render_template('admin_setting_basic.html', settings=settings)
|
||||
|
@ -4,6 +4,7 @@ import json
|
||||
import traceback
|
||||
import datetime
|
||||
import ipaddress
|
||||
import base64
|
||||
from distutils.util import strtobool
|
||||
from yaml import Loader, load
|
||||
from onelogin.saml2.utils import OneLogin_Saml2_Utils
|
||||
@ -167,10 +168,8 @@ def login():
|
||||
return redirect(url_for('index.login'))
|
||||
|
||||
session['user_id'] = user.id
|
||||
login_user(user, remember=False)
|
||||
session['authentication_type'] = 'OAuth'
|
||||
signin_history(user.username, 'Google OAuth', True)
|
||||
return redirect(url_for('index.index'))
|
||||
return authenticate_user(user, 'Google OAuth')
|
||||
|
||||
if 'github_token' in session:
|
||||
me = json.loads(github.get('user').text)
|
||||
@ -195,9 +194,7 @@ def login():
|
||||
|
||||
session['user_id'] = user.id
|
||||
session['authentication_type'] = 'OAuth'
|
||||
login_user(user, remember=False)
|
||||
signin_history(user.username, 'Github OAuth', True)
|
||||
return redirect(url_for('index.index'))
|
||||
return authenticate_user(user, 'Github OAuth')
|
||||
|
||||
if 'azure_token' in session:
|
||||
azure_info = azure.get('me?$select=displayName,givenName,id,mail,surname,userPrincipalName').text
|
||||
@ -366,10 +363,7 @@ def login():
|
||||
history.add()
|
||||
current_app.logger.warning('group info: {} '.format(account_id))
|
||||
|
||||
|
||||
login_user(user, remember=False)
|
||||
signin_history(user.username, 'Azure OAuth', True)
|
||||
return redirect(url_for('index.index'))
|
||||
return authenticate_user(user, 'Azure OAuth')
|
||||
|
||||
if 'oidc_token' in session:
|
||||
me = json.loads(oidc.get('userinfo').text)
|
||||
@ -433,9 +427,7 @@ def login():
|
||||
|
||||
session['user_id'] = user.id
|
||||
session['authentication_type'] = 'OAuth'
|
||||
login_user(user, remember=False)
|
||||
signin_history(user.username, 'OIDC OAuth', True)
|
||||
return redirect(url_for('index.index'))
|
||||
return authenticate_user(user, 'OIDC OAuth')
|
||||
|
||||
if request.method == 'GET':
|
||||
return render_template('login.html', saml_enabled=SAML_ENABLED)
|
||||
@ -512,9 +504,7 @@ def login():
|
||||
user.revoke_privilege(True)
|
||||
current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' )
|
||||
|
||||
login_user(user, remember=remember_me)
|
||||
signin_history(user.username, 'LOCAL', True)
|
||||
return redirect(session.get('next', url_for('index.index')))
|
||||
return authenticate_user(user, 'LOCAL', remember_me)
|
||||
|
||||
def checkForPDAEntries(Entitlements, urn_value):
|
||||
"""
|
||||
@ -584,6 +574,23 @@ def get_azure_groups(uri):
|
||||
mygroups = []
|
||||
return mygroups
|
||||
|
||||
# Handle user login, write history and, if set, handle showing the register_otp QR code.
|
||||
# if Setting for OTP on first login is enabled, and OTP field is also enabled,
|
||||
# but user isn't using it yet, enable OTP, get QR code and display it, logging the user out.
|
||||
def authenticate_user(user, authenticator, remember=False):
|
||||
login_user(user, remember=remember)
|
||||
signin_history(user.username, authenticator, True)
|
||||
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret:
|
||||
user.update_profile(enable_otp=True)
|
||||
user_id = current_user.id
|
||||
prepare_welcome_user(user_id)
|
||||
return redirect(url_for('index.welcome'))
|
||||
return redirect(url_for('index.login'))
|
||||
|
||||
# Prepare user to enter /welcome screen, otherwise they won't have permission to do so
|
||||
def prepare_welcome_user(user_id):
|
||||
logout_user()
|
||||
session['welcome_user_id'] = user_id
|
||||
|
||||
@index_bp.route('/logout')
|
||||
def logout():
|
||||
@ -674,7 +681,12 @@ def register():
|
||||
if result and result['status']:
|
||||
if Setting().get('verify_user_email'):
|
||||
send_account_verification(email)
|
||||
return redirect(url_for('index.login'))
|
||||
if Setting().get('otp_force') and Setting().get('otp_field_enabled'):
|
||||
user.update_profile(enable_otp=True)
|
||||
prepare_welcome_user(user.id)
|
||||
return redirect(url_for('index.welcome'))
|
||||
else:
|
||||
return redirect(url_for('index.login'))
|
||||
else:
|
||||
return render_template('register.html',
|
||||
error=result['msg'])
|
||||
@ -684,6 +696,28 @@ def register():
|
||||
return render_template('errors/404.html'), 404
|
||||
|
||||
|
||||
# Show welcome page on first login if otp_force is enabled
|
||||
@index_bp.route('/welcome', methods=['GET', 'POST'])
|
||||
def welcome():
|
||||
if 'welcome_user_id' not in session:
|
||||
return redirect(url_for('index.index'))
|
||||
|
||||
user = User(id=session['welcome_user_id'])
|
||||
encoded_img_data = base64.b64encode(user.get_qrcode_value())
|
||||
|
||||
if request.method == 'GET':
|
||||
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user)
|
||||
elif request.method == 'POST':
|
||||
otp_token = request.form.get('otptoken', '')
|
||||
if otp_token and otp_token.isdigit():
|
||||
good_token = user.verify_totp(otp_token)
|
||||
if not good_token:
|
||||
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Invalid token")
|
||||
else:
|
||||
return render_template('register_otp.html', qrcode_image=encoded_img_data.decode(), user=user, error="Token required")
|
||||
session.pop('welcome_user_id')
|
||||
return redirect(url_for('index.index'))
|
||||
|
||||
@index_bp.route('/confirm/<token>', methods=['GET'])
|
||||
def confirm_email(token):
|
||||
email = confirm_token(token)
|
||||
@ -1037,9 +1071,7 @@ def saml_authorized():
|
||||
user.plain_text_password = None
|
||||
user.update_profile()
|
||||
session['authentication_type'] = 'SAML'
|
||||
login_user(user, remember=False)
|
||||
signin_history(user.username, 'SAML', True)
|
||||
return redirect(url_for('index.login'))
|
||||
return authenticate_user(user, 'SAML')
|
||||
else:
|
||||
return render_template('errors/SAML.html', errors=errors)
|
||||
|
||||
|
@ -1,7 +1,4 @@
|
||||
import datetime
|
||||
import qrcode as qrc
|
||||
import qrcode.image.svg as qrc_svg
|
||||
from io import BytesIO
|
||||
from flask import Blueprint, request, render_template, make_response, jsonify, redirect, url_for, g, session, current_app
|
||||
from flask_login import current_user, login_required, login_manager
|
||||
|
||||
@ -94,13 +91,9 @@ def qrcode():
|
||||
if not current_user:
|
||||
return redirect(url_for('index'))
|
||||
|
||||
img = qrc.make(current_user.get_totp_uri(),
|
||||
image_factory=qrc_svg.SvgPathImage)
|
||||
stream = BytesIO()
|
||||
img.save(stream)
|
||||
return stream.getvalue(), 200, {
|
||||
return current_user.get_qrcode_value(), 200, {
|
||||
'Content-Type': 'image/svg+xml',
|
||||
'Cache-Control': 'no-cache, no-store, must-revalidate',
|
||||
'Pragma': 'no-cache',
|
||||
'Expires': '0'
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user