mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-07-06 22:24:05 +00:00
enh: Enforce Record Restrictions in API (#1089)
Co-authored-by: Tom <tom@tom.com>
This commit is contained in:
@ -5,9 +5,10 @@ from flask import g, request, abort, current_app, Response
|
||||
from flask_login import current_user
|
||||
|
||||
from .models import User, ApiKey, Setting, Domain, Setting
|
||||
from .lib.errors import RequestIsNotJSON, NotEnoughPrivileges
|
||||
from .lib.errors import RequestIsNotJSON, NotEnoughPrivileges, RecordTTLNotAllowed, RecordTypeNotAllowed
|
||||
from .lib.errors import DomainAccessForbidden, DomainOverrideForbidden
|
||||
|
||||
|
||||
def admin_role_required(f):
|
||||
"""
|
||||
Grant access if user is in Administrator role
|
||||
@ -384,6 +385,60 @@ def apikey_can_configure_dnssec(http_methods=[]):
|
||||
return decorated_function
|
||||
return decorator
|
||||
|
||||
def allowed_record_types(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if request.method == 'GET':
|
||||
return f(*args, **kwargs)
|
||||
|
||||
if g.apikey.role.name in ['Administrator', 'Operator']:
|
||||
return f(*args, **kwargs)
|
||||
|
||||
records_allowed_to_edit = Setting().get_records_allow_to_edit()
|
||||
content = request.get_json()
|
||||
try:
|
||||
for record in content['rrsets']:
|
||||
if 'type' not in record:
|
||||
raise RecordTypeNotAllowed()
|
||||
|
||||
if record['type'] not in records_allowed_to_edit:
|
||||
current_app.logger.error(f"Error: Record type not allowed: {record['type']}")
|
||||
raise RecordTypeNotAllowed(message=f"Record type not allowed: {record['type']}")
|
||||
except (TypeError, KeyError) as e:
|
||||
raise e
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
|
||||
def allowed_record_ttl(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
if not Setting().get('enforce_api_ttl'):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
if request.method == 'GET':
|
||||
return f(*args, **kwargs)
|
||||
|
||||
if g.apikey.role.name in ['Administrator', 'Operator']:
|
||||
return f(*args, **kwargs)
|
||||
|
||||
allowed_ttls = Setting().get_ttl_options()
|
||||
allowed_numeric_ttls = [ ttl[0] for ttl in allowed_ttls ]
|
||||
content = request.get_json()
|
||||
try:
|
||||
for record in content['rrsets']:
|
||||
if 'ttl' not in record:
|
||||
raise RecordTTLNotAllowed()
|
||||
|
||||
if record['ttl'] not in allowed_numeric_ttls:
|
||||
current_app.logger.error(f"Error: Record TTL not allowed: {record['ttl']}")
|
||||
raise RecordTTLNotAllowed(message=f"Record TTL not allowed: {record['ttl']}")
|
||||
except (TypeError, KeyError) as e:
|
||||
raise e
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
|
||||
|
||||
def apikey_auth(f):
|
||||
@wraps(f)
|
||||
|
Reference in New Issue
Block a user