mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-06-15 04:26:05 +00:00
Merge remote-tracking branch 'keesbos/mgmt'
This commit is contained in:
@ -1,20 +1,40 @@
|
||||
import json
|
||||
from urllib.parse import urljoin
|
||||
from flask import Blueprint, g, request, abort, current_app, make_response, jsonify
|
||||
from flask import (
|
||||
Blueprint, g, request, abort, current_app, make_response, jsonify,
|
||||
)
|
||||
from flask_login import current_user
|
||||
|
||||
from ..models.base import db
|
||||
from ..models import User,Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey
|
||||
from ..models import (
|
||||
User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey,
|
||||
Role,
|
||||
)
|
||||
from ..lib import utils, helper
|
||||
from ..lib.schema import ApiKeySchema, DomainSchema, ApiPlainKeySchema
|
||||
from ..lib.errors import DomainNotExists, DomainAlreadyExists, DomainAccessForbidden, RequestIsNotJSON, ApiKeyCreateFail, ApiKeyNotUsable, NotEnoughPrivileges
|
||||
from ..decorators import api_basic_auth, api_can_create_domain, is_json, apikey_auth, apikey_is_admin, apikey_can_access_domain
|
||||
from ..lib.schema import (
|
||||
ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema,
|
||||
)
|
||||
from ..lib.errors import (
|
||||
StructuredException,
|
||||
DomainNotExists, DomainAlreadyExists, DomainAccessForbidden,
|
||||
RequestIsNotJSON, ApiKeyCreateFail, ApiKeyNotUsable, NotEnoughPrivileges,
|
||||
AccountCreateFail, AccountUpdateFail, AccountDeleteFail,
|
||||
UserCreateFail, UserUpdateFail, UserDeleteFail,
|
||||
)
|
||||
from ..decorators import (
|
||||
api_basic_auth, api_can_create_domain, is_json, apikey_auth,
|
||||
apikey_is_admin, apikey_can_access_domain, api_role_can,
|
||||
)
|
||||
import random
|
||||
import string
|
||||
|
||||
api_bp = Blueprint('api', __name__, url_prefix='/api/v1')
|
||||
|
||||
apikey_schema = ApiKeySchema(many=True)
|
||||
domain_schema = DomainSchema(many=True)
|
||||
apikey_plain_schema = ApiPlainKeySchema(many=True)
|
||||
user_schema = UserSchema(many=True)
|
||||
account_schema = AccountSchema(many=True)
|
||||
|
||||
|
||||
def get_user_domains():
|
||||
@ -53,6 +73,22 @@ def get_user_apikeys(domain_name=None):
|
||||
return info
|
||||
|
||||
|
||||
def get_role_id(role_name, role_id=None):
|
||||
if role_id:
|
||||
if role_name:
|
||||
role = Role.query.filter(Role.name == role_name).first()
|
||||
if not role or role.id != role_id:
|
||||
role_id = None
|
||||
else:
|
||||
role = Role.query.filter(Role.id == role_id).first()
|
||||
if not role:
|
||||
role_id = None
|
||||
else:
|
||||
role = Role.query.filter(Role.name == role_name).first()
|
||||
role_id = role.id if role else None
|
||||
return role_id
|
||||
|
||||
|
||||
@api_bp.errorhandler(400)
|
||||
def handle_400(err):
|
||||
return json.dumps({"msg": "Bad Request"}), 400
|
||||
@ -73,6 +109,11 @@ def handle_500(err):
|
||||
return json.dumps({"msg": "Internal Server Error"}), 500
|
||||
|
||||
|
||||
@api_bp.errorhandler(StructuredException)
|
||||
def handle_StructuredException(err):
|
||||
return json.dumps(err.to_dict()), err.status_code
|
||||
|
||||
|
||||
@api_bp.errorhandler(DomainNotExists)
|
||||
def handle_domain_not_exists(err):
|
||||
return json.dumps(err.to_dict()), err.status_code
|
||||
@ -113,9 +154,12 @@ def handle_request_is_not_json(err):
|
||||
def before_request():
|
||||
# Check site is in maintenance mode
|
||||
maintenance = Setting().get('maintenance')
|
||||
if maintenance and current_user.is_authenticated and current_user.role.name not in [
|
||||
if (
|
||||
maintenance and current_user.is_authenticated and
|
||||
current_user.role.name not in [
|
||||
'Administrator', 'Operator'
|
||||
]:
|
||||
]
|
||||
):
|
||||
return make_response(
|
||||
jsonify({
|
||||
"status": False,
|
||||
@ -469,6 +513,375 @@ def api_update_apikey(apikey_id):
|
||||
return '', 204
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/users', defaults={'username': None})
|
||||
@api_bp.route('/pdnsadmin/users/<string:username>')
|
||||
@api_basic_auth
|
||||
@api_role_can('list users', allow_self=True)
|
||||
def api_list_users(username=None):
|
||||
if username is None:
|
||||
user_list = [] or User.query.all()
|
||||
else:
|
||||
user_list = [] or User.query.filter(User.username == username).all()
|
||||
if not user_list:
|
||||
abort(404)
|
||||
|
||||
return json.dumps(user_schema.dump(user_list)), 200
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/users', methods=['POST'])
|
||||
@api_basic_auth
|
||||
@api_role_can('create users', allow_self=True)
|
||||
def api_create_user():
|
||||
"""
|
||||
Create new user
|
||||
"""
|
||||
data = request.get_json()
|
||||
username = data['username'] if 'username' in data else None
|
||||
password = data['password'] if 'password' in data else None
|
||||
plain_text_password = (
|
||||
data['plain_text_password']
|
||||
if 'plain_text_password' in data
|
||||
else None
|
||||
)
|
||||
firstname = data['firstname'] if 'firstname' in data else None
|
||||
lastname = data['lastname'] if 'lastname' in data else None
|
||||
email = data['email'] if 'email' in data else None
|
||||
otp_secret = data['otp_secret'] if 'otp_secret' in data else None
|
||||
confirmed = data['confirmed'] if 'confirmed' in data else None
|
||||
role_name = data['role_name'] if 'role_name' in data else None
|
||||
role_id = data['role_id'] if 'role_id' in data else None
|
||||
|
||||
# Create user
|
||||
if not username:
|
||||
current_app.logger.debug('Invalid username {}'.format(username))
|
||||
abort(400)
|
||||
if not confirmed:
|
||||
confirmed = False
|
||||
elif confirmed is not True:
|
||||
current_app.logger.debug('Invalid confirmed {}'.format(confirmed))
|
||||
abort(400)
|
||||
|
||||
if not plain_text_password and not password:
|
||||
plain_text_password = ''.join(
|
||||
random.choice(string.ascii_letters + string.digits)
|
||||
for _ in range(15))
|
||||
if not role_name and not role_id:
|
||||
role_name = 'User'
|
||||
role_id = get_role_id(role_name, role_id)
|
||||
if not role_id:
|
||||
current_app.logger.debug(
|
||||
'Invalid role {}/{}'.format(role_name, role_id))
|
||||
abort(400)
|
||||
|
||||
user = User(
|
||||
username=username,
|
||||
password=password,
|
||||
plain_text_password=plain_text_password,
|
||||
firstname=firstname,
|
||||
lastname=lastname,
|
||||
role_id=role_id,
|
||||
email=email,
|
||||
otp_secret=otp_secret,
|
||||
confirmed=confirmed,
|
||||
)
|
||||
try:
|
||||
result = user.create_local_user()
|
||||
except Exception as e:
|
||||
current_app.logger.error('Create user ({}, {}) error: {}'.format(
|
||||
username, email, e))
|
||||
raise UserCreateFail(message='User create failed')
|
||||
if not result['status']:
|
||||
current_app.logger.warning('Create user ({}, {}) error: {}'.format(
|
||||
username, email, result['msg']))
|
||||
raise UserCreateFail(message=result['msg'])
|
||||
|
||||
history = History(msg='Created user {0}'.format(user.username),
|
||||
created_by=current_user.username)
|
||||
history.add()
|
||||
return json.dumps(user_schema.dump([user])), 201
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/users/<int:user_id>', methods=['PUT'])
|
||||
@api_basic_auth
|
||||
@api_role_can('update users', allow_self=True)
|
||||
def api_update_user(user_id):
|
||||
"""
|
||||
Update existing user
|
||||
"""
|
||||
data = request.get_json()
|
||||
username = data['username'] if 'username' in data else None
|
||||
password = data['password'] if 'password' in data else None
|
||||
plain_text_password = (
|
||||
data['plain_text_password']
|
||||
if 'plain_text_password' in data
|
||||
else None
|
||||
)
|
||||
firstname = data['firstname'] if 'firstname' in data else None
|
||||
lastname = data['lastname'] if 'lastname' in data else None
|
||||
email = data['email'] if 'email' in data else None
|
||||
otp_secret = data['otp_secret'] if 'otp_secret' in data else None
|
||||
confirmed = data['confirmed'] if 'confirmed' in data else None
|
||||
role_name = data['role_name'] if 'role_name' in data else None
|
||||
role_id = data['role_id'] if 'role_id' in data else None
|
||||
|
||||
user = User.query.get(user_id)
|
||||
if not user:
|
||||
current_app.logger.debug("User not found for id {}".format(user_id))
|
||||
abort(404)
|
||||
if username:
|
||||
if username != user.username:
|
||||
current_app.logger.error(
|
||||
'Cannot change username for {}'.format(user.username)
|
||||
)
|
||||
abort(400)
|
||||
if password is not None:
|
||||
user.password = password
|
||||
user.plain_text_password = plain_text_password or ''
|
||||
if firstname is not None:
|
||||
user.firstname = firstname
|
||||
if lastname is not None:
|
||||
user.lastname = lastname
|
||||
if email is not None:
|
||||
user.email = email
|
||||
if otp_secret is not None:
|
||||
user.otp_secret = otp_secret
|
||||
if confirmed is not None:
|
||||
user.confirmed = confirmed
|
||||
if role_name is not None:
|
||||
user.role_id = get_role_id(role_name, role_id)
|
||||
elif role_id is not None:
|
||||
user.role_id = role_id
|
||||
current_app.logger.debug(
|
||||
"Updating user {} ({})".format(user_id, user.username))
|
||||
try:
|
||||
result = user.update_local_user()
|
||||
except Exception as e:
|
||||
current_app.logger.error('Create user ({}, {}) error: {}'.format(
|
||||
username, email, e))
|
||||
raise UserUpdateFail(message='User update failed')
|
||||
if not result['status']:
|
||||
current_app.logger.warning('Update user ({}, {}) error: {}'.format(
|
||||
username, email, result['msg']))
|
||||
raise UserCreateFail(message=result['msg'])
|
||||
|
||||
history = History(msg='Updated user {0}'.format(user.username),
|
||||
created_by=current_user.username)
|
||||
history.add()
|
||||
return '', 204
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/users/<int:user_id>', methods=['DELETE'])
|
||||
@api_basic_auth
|
||||
@api_role_can('delete users')
|
||||
def api_delete_user(user_id):
|
||||
user = User.query.get(user_id)
|
||||
if not user:
|
||||
current_app.logger.debug("User not found for id {}".format(user_id))
|
||||
abort(404)
|
||||
if user.id == current_user.id:
|
||||
current_app.logger.debug("Cannot delete self (id {})".format(user_id))
|
||||
msg = "Cannot delete self"
|
||||
raise UserDeleteFail(message=msg)
|
||||
|
||||
# Remove account associations first
|
||||
user_accounts = Account.query.join(AccountUser).join(
|
||||
User).filter(AccountUser.user_id == user.id,
|
||||
AccountUser.account_id == Account.id).all()
|
||||
for uc in user_accounts:
|
||||
uc.revoke_privileges_by_id(user.id)
|
||||
|
||||
# Then delete the user
|
||||
result = user.delete()
|
||||
if not result:
|
||||
raise UserDeleteFail("Failed to delete user {}".format(
|
||||
user.username))
|
||||
|
||||
history = History(msg='Delete user {0}'.format(user.username),
|
||||
created_by=current_user.username)
|
||||
history.add()
|
||||
return '', 204
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/accounts', defaults={'account_name': None})
|
||||
@api_bp.route('/pdnsadmin/accounts/<string:account_name>')
|
||||
@api_basic_auth
|
||||
@api_role_can('list accounts')
|
||||
def api_list_accounts(account_name):
|
||||
if current_user.role.name not in ['Administrator', 'Operator']:
|
||||
msg = "{} role cannot list accounts".format(current_user.role.name)
|
||||
raise NotEnoughPrivileges(message=msg)
|
||||
else:
|
||||
if account_name is None:
|
||||
account_list = [] or Account.query.all()
|
||||
else:
|
||||
account_list = [] or Account.query.filter(
|
||||
Account.name == account_name).all()
|
||||
if not account_list:
|
||||
abort(404)
|
||||
return json.dumps(account_schema.dump(account_list)), 200
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/accounts', methods=['POST'])
|
||||
@api_basic_auth
|
||||
def api_create_account():
|
||||
if current_user.role.name not in ['Administrator', 'Operator']:
|
||||
msg = "{} role cannot create accounts".format(current_user.role.name)
|
||||
raise NotEnoughPrivileges(message=msg)
|
||||
data = request.get_json()
|
||||
name = data['name'] if 'name' in data else None
|
||||
description = data['description'] if 'description' in data else None
|
||||
contact = data['contact'] if 'contact' in data else None
|
||||
mail = data['mail'] if 'mail' in data else None
|
||||
if not name:
|
||||
current_app.logger.debug("Account name missing")
|
||||
abort(400)
|
||||
|
||||
account = Account(name=name,
|
||||
description=description,
|
||||
contact=contact,
|
||||
mail=mail)
|
||||
|
||||
try:
|
||||
result = account.create_account()
|
||||
except Exception as e:
|
||||
current_app.logger.error('Error: {0}'.format(e))
|
||||
raise AccountCreateFail(message='Account create failed')
|
||||
if not result['status']:
|
||||
raise AccountCreateFail(message=result['msg'])
|
||||
|
||||
history = History(msg='Create account {0}'.format(account.name),
|
||||
created_by=current_user.username)
|
||||
history.add()
|
||||
return json.dumps(account_schema.dump([account])), 201
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/accounts/<int:account_id>', methods=['PUT'])
|
||||
@api_basic_auth
|
||||
@api_role_can('update accounts')
|
||||
def api_update_account(account_id):
|
||||
data = request.get_json()
|
||||
name = data['name'] if 'name' in data else None
|
||||
description = data['description'] if 'description' in data else None
|
||||
contact = data['contact'] if 'contact' in data else None
|
||||
mail = data['mail'] if 'mail' in data else None
|
||||
|
||||
account = Account.query.get(account_id)
|
||||
|
||||
if not account:
|
||||
abort(404)
|
||||
|
||||
if name and name != account.name:
|
||||
abort(400)
|
||||
|
||||
if current_user.role.name not in ['Administrator', 'Operator']:
|
||||
msg = "User role update accounts"
|
||||
raise NotEnoughPrivileges(message=msg)
|
||||
|
||||
if description is not None:
|
||||
account.description = description
|
||||
if contact is not None:
|
||||
account.contact = contact
|
||||
if mail is not None:
|
||||
account.mail = mail
|
||||
|
||||
current_app.logger.debug(
|
||||
"Updating account {} ({})".format(account_id, account.name))
|
||||
result = account.update_account()
|
||||
if not result['status']:
|
||||
raise AccountDeleteFail(message=result['msg'])
|
||||
history = History(msg='Update account {0}'.format(account.name),
|
||||
created_by=current_user.username)
|
||||
history.add()
|
||||
return '', 204
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/accounts/<int:account_id>', methods=['DELETE'])
|
||||
@api_basic_auth
|
||||
@api_role_can('delete accounts')
|
||||
def api_delete_account(account_id):
|
||||
account_list = [] or Account.query.filter(Account.id == account_id).all()
|
||||
if len(account_list) == 1:
|
||||
account = account_list[0]
|
||||
else:
|
||||
abort(404)
|
||||
current_app.logger.debug(
|
||||
"Deleting account {} ({})".format(account_id, account.name))
|
||||
result = account.delete_account()
|
||||
if not result:
|
||||
raise AccountUpdateFail(message=result['msg'])
|
||||
|
||||
history = History(msg='Delete account {0}'.format(account.name),
|
||||
created_by=current_user.username)
|
||||
history.add()
|
||||
return '', 204
|
||||
|
||||
|
||||
@api_bp.route('/pdnsadmin/accounts/users/<int:account_id>', methods=['GET'])
|
||||
@api_basic_auth
|
||||
@api_role_can('list account users')
|
||||
def api_list_account_users(account_id):
|
||||
account = Account.query.get(account_id)
|
||||
if not account:
|
||||
abort(404)
|
||||
user_list = User.query.join(AccountUser).filter(
|
||||
AccountUser.account_id == account_id).all()
|
||||
return json.dumps(user_schema.dump(user_list)), 200
|
||||
|
||||
|
||||
@api_bp.route(
|
||||
'/pdnsadmin/accounts/users/<int:account_id>/<int:user_id>',
|
||||
methods=['PUT'])
|
||||
@api_basic_auth
|
||||
@api_role_can('add user to account')
|
||||
def api_add_account_user(account_id, user_id):
|
||||
account = Account.query.get(account_id)
|
||||
if not account:
|
||||
abort(404)
|
||||
user = User.query.get(user_id)
|
||||
if not user:
|
||||
abort(404)
|
||||
if not account.add_user(user):
|
||||
raise AccountUpdateFail("Cannot add user {} to {}".format(
|
||||
user.username, account.name))
|
||||
|
||||
history = History(
|
||||
msg='Revoke {} user privileges on {}'.format(
|
||||
user.username, account.name),
|
||||
created_by=current_user.username)
|
||||
history.add()
|
||||
return '', 204
|
||||
|
||||
|
||||
@api_bp.route(
|
||||
'/pdnsadmin/accounts/users/<int:account_id>/<int:user_id>',
|
||||
methods=['DELETE'])
|
||||
@api_basic_auth
|
||||
@api_role_can('remove user from account')
|
||||
def api_remove_account_user(account_id, user_id):
|
||||
account = Account.query.get(account_id)
|
||||
if not account:
|
||||
abort(404)
|
||||
user = User.query.get(user_id)
|
||||
if not user:
|
||||
abort(404)
|
||||
user_list = User.query.join(AccountUser).filter(
|
||||
AccountUser.account_id == account_id,
|
||||
AccountUser.user_id == user_id,
|
||||
).all()
|
||||
if not user_list:
|
||||
abort(404)
|
||||
if not account.remove_user(user):
|
||||
raise AccountUpdateFail("Cannot remove user {} from {}".format(
|
||||
user.username, account.name))
|
||||
|
||||
history = History(
|
||||
msg='Revoke {} user privileges on {}'.format(
|
||||
user.username, account.name),
|
||||
created_by=current_user.username)
|
||||
history.add()
|
||||
return '', 204
|
||||
|
||||
|
||||
@api_bp.route(
|
||||
'/servers/<string:server_id>/zones/<string:zone_id>/<path:subpath>',
|
||||
methods=['GET', 'POST', 'PUT', 'PATCH', 'DELETE'])
|
||||
|
Reference in New Issue
Block a user