mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2024-12-04 19:15:30 +00:00
fix(auth:basic): improved API basic auth handling to avoid exceptions
Currently passing an invalid Basic auth header (random string base64 encoded) would result in an exception being raised due to a `username, password = auth_header.split()`. I refactored the code in this decorator by checking explicitly that we are doing basic authentication then by checking the number of entries returned by the split. I also added exception handling for invalid UTF-8 code sequences. Tested with a fuzzer. Tested with valid and invalid credentials. This fixes #1447.
This commit is contained in:
parent
57b4457add
commit
24f94abc32
@ -133,50 +133,63 @@ def api_basic_auth(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
auth_header = request.headers.get('Authorization')
|
||||
if auth_header:
|
||||
auth_header = auth_header.replace('Basic ', '', 1)
|
||||
|
||||
try:
|
||||
auth_header = str(base64.b64decode(auth_header), 'utf-8')
|
||||
username, password = auth_header.split(":")
|
||||
except binascii.Error as e:
|
||||
current_app.logger.error(
|
||||
'Invalid base64-encoded of credential. Error {0}'.format(
|
||||
e))
|
||||
abort(401)
|
||||
except TypeError as e:
|
||||
current_app.logger.error('Error: {0}'.format(e))
|
||||
abort(401)
|
||||
|
||||
user = User(username=username,
|
||||
password=password,
|
||||
plain_text_password=password)
|
||||
|
||||
try:
|
||||
if Setting().get('verify_user_email') and user.email and not user.confirmed:
|
||||
current_app.logger.warning(
|
||||
'Basic authentication failed for user {} because of unverified email address'
|
||||
.format(username))
|
||||
abort(401)
|
||||
|
||||
auth_method = request.args.get('auth_method', 'LOCAL')
|
||||
auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
|
||||
auth = user.is_validate(method=auth_method,
|
||||
src_ip=request.remote_addr)
|
||||
|
||||
if not auth:
|
||||
current_app.logger.error('Checking user password failed')
|
||||
abort(401)
|
||||
else:
|
||||
user = User.query.filter(User.username == username).first()
|
||||
current_user = user # lgtm [py/unused-local-variable]
|
||||
except Exception as e:
|
||||
current_app.logger.error('Error: {0}'.format(e))
|
||||
abort(401)
|
||||
else:
|
||||
if not auth_header:
|
||||
current_app.logger.error('Error: Authorization header missing!')
|
||||
abort(401)
|
||||
|
||||
if auth_header[:6] != "Basic ":
|
||||
current_app.logger.error('Error: Unsupported authorization mechanism!')
|
||||
abort(401)
|
||||
|
||||
# Remove "Basic " from the header value
|
||||
auth_header = auth_header[6:]
|
||||
|
||||
try:
|
||||
auth_header = str(base64.b64decode(auth_header), 'utf-8')
|
||||
# NK: We use auth_components here as we don't know if we'll have a :, we split it maximum 1 times to grab the
|
||||
# username, the rest of the string would be the password.
|
||||
auth_components = auth_header.split(':', maxsplit=1)
|
||||
except (binascii.Error, UnicodeDecodeError) as e:
|
||||
current_app.logger.error(
|
||||
'Invalid base64-encoded of credential. Error {0}'.format(
|
||||
e))
|
||||
abort(401)
|
||||
except TypeError as e:
|
||||
current_app.logger.error('Error: {0}'.format(e))
|
||||
abort(401)
|
||||
|
||||
# If we don't have two auth components (username, password), we can abort
|
||||
if len(auth_components) != 2:
|
||||
abort(401)
|
||||
|
||||
(username, password) = auth_components
|
||||
|
||||
user = User(username=username,
|
||||
password=password,
|
||||
plain_text_password=password)
|
||||
|
||||
try:
|
||||
if Setting().get('verify_user_email') and user.email and not user.confirmed:
|
||||
current_app.logger.warning(
|
||||
'Basic authentication failed for user {} because of unverified email address'
|
||||
.format(username))
|
||||
abort(401)
|
||||
|
||||
auth_method = request.args.get('auth_method', 'LOCAL')
|
||||
auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
|
||||
auth = user.is_validate(method=auth_method, src_ip=request.remote_addr)
|
||||
|
||||
if not auth:
|
||||
current_app.logger.error('Checking user password failed')
|
||||
abort(401)
|
||||
else:
|
||||
user = User.query.filter(User.username == username).first()
|
||||
current_user = user # lgtm [py/unused-local-variable]
|
||||
except Exception as e:
|
||||
current_app.logger.error('Error: {0}'.format(e))
|
||||
abort(401)
|
||||
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
|
Loading…
Reference in New Issue
Block a user