diff --git a/powerdnsadmin/decorators.py b/powerdnsadmin/decorators.py index 50e4a3f..7be69f3 100644 --- a/powerdnsadmin/decorators.py +++ b/powerdnsadmin/decorators.py @@ -133,50 +133,63 @@ def api_basic_auth(f): @wraps(f) def decorated_function(*args, **kwargs): auth_header = request.headers.get('Authorization') - if auth_header: - auth_header = auth_header.replace('Basic ', '', 1) - try: - auth_header = str(base64.b64decode(auth_header), 'utf-8') - username, password = auth_header.split(":") - except binascii.Error as e: - current_app.logger.error( - 'Invalid base64-encoded of credential. Error {0}'.format( - e)) - abort(401) - except TypeError as e: - current_app.logger.error('Error: {0}'.format(e)) - abort(401) - - user = User(username=username, - password=password, - plain_text_password=password) - - try: - if Setting().get('verify_user_email') and user.email and not user.confirmed: - current_app.logger.warning( - 'Basic authentication failed for user {} because of unverified email address' - .format(username)) - abort(401) - - auth_method = request.args.get('auth_method', 'LOCAL') - auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL' - auth = user.is_validate(method=auth_method, - src_ip=request.remote_addr) - - if not auth: - current_app.logger.error('Checking user password failed') - abort(401) - else: - user = User.query.filter(User.username == username).first() - current_user = user # lgtm [py/unused-local-variable] - except Exception as e: - current_app.logger.error('Error: {0}'.format(e)) - abort(401) - else: + if not auth_header: current_app.logger.error('Error: Authorization header missing!') abort(401) + if auth_header[:6] != "Basic ": + current_app.logger.error('Error: Unsupported authorization mechanism!') + abort(401) + + # Remove "Basic " from the header value + auth_header = auth_header[6:] + + try: + auth_header = str(base64.b64decode(auth_header), 'utf-8') + # NK: We use auth_components here as we don't know if we'll have a :, we split it maximum 1 times to grab the + # username, the rest of the string would be the password. + auth_components = auth_header.split(':', maxsplit=1) + except (binascii.Error, UnicodeDecodeError) as e: + current_app.logger.error( + 'Invalid base64-encoded of credential. Error {0}'.format( + e)) + abort(401) + except TypeError as e: + current_app.logger.error('Error: {0}'.format(e)) + abort(401) + + # If we don't have two auth components (username, password), we can abort + if len(auth_components) != 2: + abort(401) + + (username, password) = auth_components + + user = User(username=username, + password=password, + plain_text_password=password) + + try: + if Setting().get('verify_user_email') and user.email and not user.confirmed: + current_app.logger.warning( + 'Basic authentication failed for user {} because of unverified email address' + .format(username)) + abort(401) + + auth_method = request.args.get('auth_method', 'LOCAL') + auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL' + auth = user.is_validate(method=auth_method, src_ip=request.remote_addr) + + if not auth: + current_app.logger.error('Checking user password failed') + abort(401) + else: + user = User.query.filter(User.username == username).first() + current_user = user # lgtm [py/unused-local-variable] + except Exception as e: + current_app.logger.error('Error: {0}'.format(e)) + abort(401) + return f(*args, **kwargs) return decorated_function