powerdns-admin/powerdnsadmin/routes/user.py
Matt Scott 4442577b0b
Created a new model to represent the Flask-Session storage schema sessions with a method for removing expired sessions.
Added a trigger for the Flask-Session model's session clean-up method to the `before_request` handler of the user router.
2023-11-24 06:26:38 -05:00

174 lines
6.5 KiB
Python

import datetime
import hashlib
import imghdr
import mimetypes
from flask import Blueprint, request, render_template, make_response, jsonify, redirect, url_for, g, session, \
current_app, after_this_request, abort
from flask_login import current_user, login_required, login_manager
from ..models.user import User, Anonymous
from ..models.setting import Setting
from .index import password_policy_check
user_bp = Blueprint('user',
__name__,
template_folder='templates',
url_prefix='/user')
@user_bp.before_request
def before_request():
# Check if user is anonymous
g.user = current_user
login_manager.anonymous_user = Anonymous
# Check site is in maintenance mode
maintenance = Setting().get('maintenance')
if maintenance and current_user.is_authenticated and current_user.role.name not in [
'Administrator', 'Operator'
]:
return render_template('maintenance.html')
# Manage session timeout
session.permanent = True
current_app.permanent_session_lifetime = datetime.timedelta(
minutes=int(Setting().get('session_timeout')))
session.modified = True
# Clean up expired sessions in the database
if Setting().get('session_type') == 'sqlalchemy':
from ..models.sessions import Sessions
Sessions().clean_up_expired_sessions()
@user_bp.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
if request.method == 'GET':
return render_template('user_profile.html')
if request.method == 'POST':
if session['authentication_type'] == 'LOCAL':
firstname = request.form.get('firstname', '').strip()
lastname = request.form.get('lastname', '').strip()
email = request.form.get('email', '').strip()
new_password = request.form.get('password', '')
else:
firstname = lastname = email = new_password = ''
current_app.logger.warning(
'Authenticated externally. User {0} information will not allowed to update the profile'
.format(current_user.username))
if request.data:
jdata = request.json
data = jdata['data']
if jdata['action'] == 'enable_otp':
if session['authentication_type'] in ['LOCAL', 'LDAP']:
enable_otp = data['enable_otp']
user = User(username=current_user.username)
user.update_profile(enable_otp=enable_otp)
return make_response(
jsonify({
'status':
'ok',
'msg':
'Change OTP Authentication successfully. Status: {0}'
.format(enable_otp)
}), 200)
else:
return make_response(
jsonify({
'status':
'error',
'msg':
'User {0} is externally. You are not allowed to update the OTP'
.format(current_user.username)
}), 400)
(password_policy_pass, password_policy) = password_policy_check(current_user.get_user_info_by_username(), new_password)
if not password_policy_pass:
if request.data:
return make_response(
jsonify({
'status': 'error',
'msg': password_policy['password'],
}), 400)
return render_template('user_profile.html', error_messages=password_policy)
user = User(username=current_user.username,
plain_text_password=new_password,
firstname=firstname,
lastname=lastname,
email=email,
reload_info=False)
user.update_profile()
return render_template('user_profile.html')
@user_bp.route('/qrcode')
@login_required
def qrcode():
if not current_user:
return redirect(url_for('index'))
return current_user.get_qrcode_value(), 200, {
'Content-Type': 'image/svg+xml',
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0'
}
@user_bp.route('/image', methods=['GET'])
@login_required
def image():
"""Returns the user profile image or avatar."""
@after_this_request
def add_cache_headers(response_):
"""When the response is ok, add cache headers."""
if 200 <= response_.status_code <= 399:
response_.cache_control.private = True
response_.cache_control.max_age = int(datetime.timedelta(days=1).total_seconds())
return response_
def return_image(content, content_type=None):
"""Return the given binary image content. Guess the type if not given."""
if not content_type:
guess = mimetypes.guess_type('example.' + imghdr.what(None, h=content))
if guess and guess[0]:
content_type = guess[0]
return content, 200, {'Content-Type': content_type}
# To prevent "cache poisoning", the username query parameter is required
if request.args.get('username', None) != current_user.username:
abort(400)
setting = Setting()
if session['authentication_type'] == 'LDAP':
search_filter = '(&({0}={1}){2})'.format(setting.get('ldap_filter_username'),
current_user.username,
setting.get('ldap_filter_basic'))
result = User().ldap_search(search_filter, setting.get('ldap_base_dn'))
if result and result[0] and result[0][0] and result[0][0][1]:
user_obj = result[0][0][1]
for key in ['jpegPhoto', 'thumbnailPhoto']:
if key in user_obj and user_obj[key] and user_obj[key][0]:
current_app.logger.debug(f'Return {key} from ldap as user image')
return return_image(user_obj[key][0])
email = current_user.email
if email and setting.get('gravatar_enabled'):
hash_ = hashlib.md5(email.encode('utf-8')).hexdigest()
url = f'https://s.gravatar.com/avatar/{hash_}?s=100'
current_app.logger.debug('Redirect user image request to gravatar')
return redirect(url, 307)
# Fallback to the local default image
return current_app.send_static_file('img/user_image.png')