mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-01-05 18:05:39 +00:00
Fix API key query
This commit is contained in:
parent
49908b9039
commit
462190a913
@ -57,8 +57,10 @@ class ApiKeyCreateFail(StructuredException):
|
||||
class ApiKeyNotUsable(StructuredException):
|
||||
status_code = 400
|
||||
|
||||
def __init__(self, name=None, message="Api key must have domains or have \
|
||||
administrative role" ):
|
||||
def __init__(
|
||||
self,
|
||||
name=None,
|
||||
message="Api key must have domains or have administrative role"):
|
||||
StructuredException.__init__(self)
|
||||
self.message = message
|
||||
self.name = name
|
||||
|
@ -4,7 +4,7 @@ from flask import Blueprint, g, request, abort, current_app, make_response, json
|
||||
from flask_login import current_user
|
||||
|
||||
from ..models.base import db
|
||||
from ..models import Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey
|
||||
from ..models import User,Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey
|
||||
from ..lib import utils, helper
|
||||
from ..lib.schema import ApiKeySchema, DomainSchema, ApiPlainKeySchema
|
||||
from ..lib.errors import DomainNotExists, DomainAlreadyExists, DomainAccessForbidden, RequestIsNotJSON, ApiKeyCreateFail, ApiKeyNotUsable, NotEnoughPrivileges
|
||||
@ -30,6 +30,29 @@ def get_user_domains():
|
||||
return domains
|
||||
|
||||
|
||||
def get_user_apikeys(domain_name=None):
|
||||
info = []
|
||||
apikey_query = db.session.query(ApiKey) \
|
||||
.join(Domain.apikeys) \
|
||||
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
|
||||
.outerjoin(Account, Domain.account_id == Account.id) \
|
||||
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
|
||||
.filter(
|
||||
db.or_(
|
||||
DomainUser.user_id == User.id,
|
||||
AccountUser.user_id == User.id
|
||||
)
|
||||
) \
|
||||
.filter(User.id == current_user.id)
|
||||
|
||||
if domain_name:
|
||||
info = apikey_query.filter(Domain.name == domain_name).all()
|
||||
else:
|
||||
info = apikey_query.all()
|
||||
|
||||
return info
|
||||
|
||||
|
||||
@api_bp.errorhandler(400)
|
||||
def handle_400(err):
|
||||
return json.dumps({"msg": "Bad Request"}), 400
|
||||
@ -118,10 +141,10 @@ def api_login_create_zone():
|
||||
|
||||
try:
|
||||
resp = utils.fetch_remote(urljoin(pdns_api_url, api_full_uri),
|
||||
method='POST',
|
||||
data=request.get_json(force=True),
|
||||
headers=headers,
|
||||
accept='application/json; q=1')
|
||||
method='POST',
|
||||
data=request.get_json(force=True),
|
||||
headers=headers,
|
||||
accept='application/json; q=1')
|
||||
except Exception as e:
|
||||
current_app.logger.error("Cannot create domain. Error: {}".format(e))
|
||||
abort(500)
|
||||
@ -147,7 +170,7 @@ def api_login_create_zone():
|
||||
domain.update()
|
||||
|
||||
if resp.status_code == 409:
|
||||
raise(DomainAlreadyExists)
|
||||
raise (DomainAlreadyExists)
|
||||
|
||||
return resp.content, resp.status_code, resp.headers.items()
|
||||
|
||||
@ -289,10 +312,10 @@ def api_get_apikeys(domain_name):
|
||||
|
||||
if current_user.role.name not in ['Administrator', 'Operator']:
|
||||
if domain_name:
|
||||
msg = "Check if domain {0} exists and \
|
||||
is allowed for user." .format(domain_name)
|
||||
msg = "Check if domain {0} exists and is allowed for user.".format(
|
||||
domain_name)
|
||||
current_app.logger.debug(msg)
|
||||
apikeys = current_user.get_apikeys(domain_name)
|
||||
apikeys = get_user_apikeys(domain_name)
|
||||
|
||||
if not apikeys:
|
||||
raise DomainAccessForbidden(name=domain_name)
|
||||
@ -304,7 +327,7 @@ def api_get_apikeys(domain_name):
|
||||
current_app.logger.debug(msg)
|
||||
|
||||
try:
|
||||
apikeys = current_user.get_apikeys()
|
||||
apikeys = get_user_apikeys()
|
||||
current_app.logger.debug(apikey_schema.dump(apikeys))
|
||||
except Exception as e:
|
||||
current_app.logger.error('Error: {0}'.format(e))
|
||||
@ -332,7 +355,7 @@ def api_delete_apikey(apikey_id):
|
||||
current_app.logger.debug(current_user.role.name)
|
||||
|
||||
if current_user.role.name not in ['Administrator', 'Operator']:
|
||||
apikeys = current_user.get_apikeys()
|
||||
apikeys = get_user_apikeys()
|
||||
user_domains_obj_list = current_user.get_domain().all()
|
||||
apikey_domains_obj_list = apikey.domains
|
||||
user_domains_list = [item.name for item in user_domains_obj_list]
|
||||
@ -393,7 +416,7 @@ def api_update_apikey(apikey_id):
|
||||
current_app.logger.error(msg)
|
||||
raise NotEnoughPrivileges(message=msg)
|
||||
|
||||
apikeys = current_user.get_apikeys()
|
||||
apikeys = get_user_apikeys()
|
||||
apikey_domains = [item.name for item in apikey.domains]
|
||||
apikeys_ids = [apikey_item.id for apikey_item in apikeys]
|
||||
|
||||
@ -468,20 +491,16 @@ def api_zone_forward(server_id, zone_id):
|
||||
if request.method != 'GET' and request.method != 'DELETE':
|
||||
data = request.get_json(force=True)
|
||||
for rrset_data in data['rrsets']:
|
||||
history = History(
|
||||
msg='{0} zone {1} record of {2}'.format(rrset_data['changetype'].lower(),
|
||||
rrset_data['type'],
|
||||
rrset_data['name'].rstrip('.')),
|
||||
detail=json.dumps(data),
|
||||
created_by=g.apikey.description
|
||||
)
|
||||
history = History(msg='{0} zone {1} record of {2}'.format(
|
||||
rrset_data['changetype'].lower(), rrset_data['type'],
|
||||
rrset_data['name'].rstrip('.')),
|
||||
detail=json.dumps(data),
|
||||
created_by=g.apikey.description)
|
||||
history.add()
|
||||
elif request.method == 'DELETE':
|
||||
history = History(
|
||||
msg='Deleted zone {0}'.format(domain.name),
|
||||
detail='',
|
||||
created_by=g.apikey.description
|
||||
)
|
||||
history = History(msg='Deleted zone {0}'.format(domain.name),
|
||||
detail='',
|
||||
created_by=g.apikey.description)
|
||||
history.add()
|
||||
return resp.content, resp.status_code, resp.headers.items()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user