mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2024-11-08 22:50:26 +00:00
156 lines
5.8 KiB
Python
156 lines
5.8 KiB
Python
import datetime
|
|
import hashlib
|
|
import imghdr
|
|
import mimetypes
|
|
|
|
from flask import Blueprint, request, render_template, make_response, jsonify, redirect, url_for, g, session, \
|
|
current_app, after_this_request, abort
|
|
from flask_login import current_user, login_required, login_manager
|
|
|
|
from ..models.user import User, Anonymous
|
|
from ..models.setting import Setting
|
|
|
|
# Blueprint that groups the user-related views of this module; every route
# registered on it is served below the '/user' URL prefix.
user_bp = Blueprint('user', __name__, template_folder='templates', url_prefix='/user')
|
|
|
|
|
|
@user_bp.before_request
def before_request():
    """Prepare request state for every view registered on ``user_bp``.

    Steps, in order:

    1. Store ``current_user`` on ``g.user`` and assign ``Anonymous`` to
       ``login_manager.anonymous_user``.
    2. If the ``maintenance`` setting is truthy and the user is
       authenticated with a role other than Administrator/Operator,
       return the rendered maintenance page (which becomes the response).
    3. Mark the session permanent and set the application's permanent
       session lifetime from the ``session_timeout`` setting (minutes).

    Returns:
        The rendered ``maintenance.html`` template when step 2 applies,
        otherwise ``None`` so the request proceeds to the view.
    """
    # Expose the current (possibly anonymous) user on the request globals
    g.user = current_user
    login_manager.anonymous_user = Anonymous

    # Maintenance mode: only the privileged roles may keep using the site
    privileged_roles = ('Administrator', 'Operator')
    if Setting().get('maintenance'):
        if current_user.is_authenticated and current_user.role.name not in privileged_roles:
            return render_template('maintenance.html')

    # Session timeout handling; the session is flagged as modified on each request
    session.permanent = True
    timeout_minutes = int(Setting().get('session_timeout'))
    current_app.permanent_session_lifetime = datetime.timedelta(minutes=timeout_minutes)
    session.modified = True
|
|
|
|
|
|
def _profile_json_response(status, msg, http_code):
    """Build the JSON reply (``status``/``msg``) used by the profile view."""
    return make_response(jsonify({'status': status, 'msg': msg}), http_code)


@user_bp.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
    """Show the profile page and process profile updates.

    * Non-POST requests render ``user_profile.html``.
    * POST with a JSON body whose ``action`` is ``enable_otp`` toggles OTP
      for users whose session authentication type is LOCAL or LDAP and
      answers with a JSON status document (400 for other authentication
      types or for a malformed payload).
    * Any other POST updates the profile from the submitted form. Only
      users with authentication type LOCAL have their form values read;
      for everyone else the values are blanked and a warning is logged
      before ``update_profile()`` is called.
      NOTE(review): how ``update_profile()`` treats blank values is not
      visible from this module - confirm it leaves existing data alone.

    Returns:
        The rendered profile template, or a JSON response for OTP requests.
    """
    # Flask also routes HEAD here because GET is allowed. Treat everything
    # that is not a POST like GET so the view never returns None.
    if request.method != 'POST':
        return render_template('user_profile.html')

    auth_type = session['authentication_type']
    if auth_type == 'LOCAL':
        firstname = request.form.get('firstname', '').strip()
        lastname = request.form.get('lastname', '').strip()
        email = request.form.get('email', '').strip()
        new_password = request.form.get('password', '')
    else:
        firstname = lastname = email = new_password = ''
        current_app.logger.warning(
            'Authenticated externally. User {0} information will not allowed to update the profile'
            .format(current_user.username))

    if request.data:
        jdata = request.json
        # Reject payloads that lack the expected envelope instead of
        # failing with a KeyError/TypeError (HTTP 500).
        if not isinstance(jdata, dict) or 'data' not in jdata or 'action' not in jdata:
            return _profile_json_response('error', 'Invalid request payload.', 400)

        data = jdata['data']
        if jdata['action'] == 'enable_otp':
            if auth_type not in ['LOCAL', 'LDAP']:
                return _profile_json_response(
                    'error',
                    'User {0} is externally. You are not allowed to update the OTP'
                    .format(current_user.username), 400)

            if not isinstance(data, dict) or 'enable_otp' not in data:
                return _profile_json_response('error', 'Invalid request payload.', 400)

            enable_otp = data['enable_otp']
            user = User(username=current_user.username)
            user.update_profile(enable_otp=enable_otp)
            return _profile_json_response(
                'ok',
                'Change OTP Authentication successfully. Status: {0}'
                .format(enable_otp), 200)

    # Form submission (or a JSON body with an action other than enable_otp)
    user = User(username=current_user.username,
                plain_text_password=new_password,
                firstname=firstname,
                lastname=lastname,
                email=email,
                reload_info=False)
    user.update_profile()

    return render_template('user_profile.html')
|
|
|
|
|
|
@user_bp.route('/qrcode')
@login_required
def qrcode():
    """Serve the current user's QR code as an uncacheable SVG image.

    The body is whatever ``current_user.get_qrcode_value()`` returns; it is
    sent with an ``image/svg+xml`` content type and headers that disable
    caching. A falsy ``current_user`` is redirected to the ``index``
    endpoint instead.
    """
    if not current_user:
        return redirect(url_for('index'))

    svg = current_user.get_qrcode_value()
    # Headers that stop browsers and proxies from storing the image
    no_cache_headers = {
        'Content-Type': 'image/svg+xml',
        'Cache-Control': 'no-cache, no-store, must-revalidate',
        'Pragma': 'no-cache',
        'Expires': '0',
    }
    return svg, 200, no_cache_headers
|
|
|
|
|
|
@user_bp.route('/image', methods=['GET'])
@login_required
def image():
    """Return the user profile image or avatar.

    Lookup order:

    1. For sessions with authentication type LDAP, the first non-empty
       ``jpegPhoto`` / ``thumbnailPhoto`` attribute found for the user.
    2. A 307 redirect to Gravatar when the user has an e-mail address and
       the ``gravatar_enabled`` setting is truthy.
    3. The static default image ``img/user_image.png``.

    The request must carry a ``username`` query parameter equal to the
    current user's name, otherwise it is aborted with HTTP 400.
    Responses with a 2xx/3xx status get private cache headers valid for
    one day.
    """

    @after_this_request
    def add_cache_headers(response_):
        """When the response is ok, add cache headers."""
        if 200 <= response_.status_code <= 399:
            response_.cache_control.private = True
            response_.cache_control.max_age = int(datetime.timedelta(days=1).total_seconds())
        return response_

    def return_image(content, content_type=None):
        """Return the given binary image content. Guess the type if not given."""
        if not content_type:
            # imghdr.what() yields None for unrecognised data; guard against
            # it so the string concatenation below cannot raise TypeError.
            # NOTE(review): imghdr is deprecated since Python 3.11 and
            # removed in 3.13 - plan a replacement.
            kind = imghdr.what(None, h=content)
            if kind:
                content_type = mimetypes.guess_type('example.' + kind)[0]

        # Never emit a None Content-Type header; fall back to a generic type
        return content, 200, {'Content-Type': content_type or 'application/octet-stream'}

    # To prevent "cache poisoning", the username query parameter is required
    if request.args.get('username', None) != current_user.username:
        abort(400)

    setting = Setting()

    if session['authentication_type'] == 'LDAP':
        # NOTE(review): the username is interpolated into the LDAP filter
        # without escaping - confirm it cannot contain filter metacharacters.
        search_filter = '(&({0}={1}){2})'.format(setting.get('ldap_filter_username'),
                                                 current_user.username,
                                                 setting.get('ldap_filter_basic'))
        result = User().ldap_search(search_filter, setting.get('ldap_base_dn'))
        if result and result[0] and result[0][0] and result[0][0][1]:
            user_obj = result[0][0][1]
            for key in ['jpegPhoto', 'thumbnailPhoto']:
                if key in user_obj and user_obj[key] and user_obj[key][0]:
                    current_app.logger.debug(f'Return {key} from ldap as user image')
                    return return_image(user_obj[key][0])

    email = current_user.email
    if email and setting.get('gravatar_enabled'):
        # Gravatar hashes the trimmed, lower-cased address; hashing the raw
        # value would point mixed-case e-mails at the wrong avatar.
        normalized_email = email.strip().lower()
        hash_ = hashlib.md5(normalized_email.encode('utf-8')).hexdigest()
        url = f'https://s.gravatar.com/avatar/{hash_}?s=100'
        current_app.logger.debug('Redirect user image request to gravatar')
        return redirect(url, 307)

    # Fallback to the local default image
    return current_app.send_static_file('img/user_image.png')
|