Merge branch 'master' of github.com:PowerDNS-Admin/PowerDNS-Admin

This commit is contained in:
Matt Scott 2023-02-17 12:19:43 -05:00
commit 5cdacc2e71
27 changed files with 721 additions and 579 deletions

View File

@ -1,11 +1,11 @@
version: "2.1" version: "3.8"
services: services:
powerdns-admin: powerdns-admin:
image: powerdns-admin-test
build: build:
context: . context: .
dockerfile: docker-test/Dockerfile dockerfile: docker-test/Dockerfile
image: powerdns-admin-test
container_name: powerdns-admin-test container_name: powerdns-admin-test
ports: ports:
- "9191:80" - "9191:80"
@ -17,10 +17,10 @@ services:
- pdns-server - pdns-server
pdns-server: pdns-server:
image: pdns-server-test
build: build:
context: . context: .
dockerfile: docker-test/Dockerfile.pdns dockerfile: docker-test/Dockerfile.pdns
image: pdns-server-test
ports: ports:
- "5053:53" - "5053:53"
- "5053:53/udp" - "5053:53/udp"

View File

@ -1,15 +1,35 @@
FROM debian:stretch-slim FROM debian:bullseye-slim
LABEL maintainer="k@ndk.name" LABEL maintainer="k@ndk.name"
ENV LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8 LANGUAGE=en_US.UTF-8 ENV LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8 LANGUAGE=en_US.UTF-8
RUN apt-get update -y \ RUN apt-get update -y \
&& apt-get install -y --no-install-recommends apt-transport-https locales locales-all python3-pip python3-setuptools python3-dev curl libsasl2-dev libldap2-dev libssl-dev libxml2-dev libxslt1-dev libxmlsec1-dev libffi-dev build-essential libmariadb-dev-compat \ && apt-get install -y --no-install-recommends \
&& curl -sL https://deb.nodesource.com/setup_10.x | bash - \ apt-transport-https \
curl \
build-essential \
libffi-dev \
libldap2-dev \
libmariadb-dev-compat \
libsasl2-dev \
libssl-dev \
libxml2-dev \
libxmlsec1-dev \
libxmlsec1-openssl \
libxslt1-dev \
locales \
locales-all \
pkg-config \
python3-dev \
python3-pip \
python3-setuptools \
&& curl -sL https://deb.nodesource.com/setup_lts.x | bash - \
&& curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \ && curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb https://dl.yarnpkg.com/debian/ stable main" > /etc/apt/sources.list.d/yarn.list \ && echo "deb https://dl.yarnpkg.com/debian/ stable main" > /etc/apt/sources.list.d/yarn.list \
&& apt-get update -y \ && apt-get update -y \
&& apt-get install -y nodejs yarn \ && apt-get install -y --no-install-recommends \
nodejs \
yarn \
&& apt-get clean -y \ && apt-get clean -y \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
@ -21,8 +41,6 @@ RUN pip3 install --upgrade pip
RUN pip3 install -r requirements.txt RUN pip3 install -r requirements.txt
COPY . /app COPY . /app
COPY ./docker/entrypoint.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/entrypoint.sh
ENV FLASK_APP=powerdnsadmin/__init__.py ENV FLASK_APP=powerdnsadmin/__init__.py
RUN yarn install --pure-lockfile --production \ RUN yarn install --pure-lockfile --production \
@ -31,4 +49,4 @@ RUN yarn install --pure-lockfile --production \
COPY ./docker-test/wait-for-pdns.sh /opt COPY ./docker-test/wait-for-pdns.sh /opt
RUN chmod u+x /opt/wait-for-pdns.sh RUN chmod u+x /opt/wait-for-pdns.sh
CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest","--capture=no","-vv"] CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest", "-W", "ignore::DeprecationWarning", "--capture=no", "-vv"]

View File

@ -10,9 +10,9 @@ fi
# Import schema structure # Import schema structure
if [ -e "/data/pdns.sql" ]; then if [ -e "/data/pdns.sql" ]; then
rm /data/pdns.db rm -f /data/pdns.db
cat /data/pdns.sql | sqlite3 /data/pdns.db cat /data/pdns.sql | sqlite3 /data/pdns.db
rm /data/pdns.sql rm -f /data/pdns.sql
echo "Imported schema structure" echo "Imported schema structure"
fi fi

View File

@ -1,8 +1,13 @@
from __future__ import with_statement from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
import logging import logging
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from flask import current_app
from alembic import context
# this is the Alembic Config object, which provides # this is the Alembic Config object, which provides
# access to the values within the .ini file in use. # access to the values within the .ini file in use.
@ -17,9 +22,9 @@ logger = logging.getLogger('alembic.env')
# for 'autogenerate' support # for 'autogenerate' support
# from myapp import mymodel # from myapp import mymodel
# target_metadata = mymodel.Base.metadata # target_metadata = mymodel.Base.metadata
from flask import current_app config.set_main_option(
config.set_main_option('sqlalchemy.url', 'sqlalchemy.url',
current_app.config.get('SQLALCHEMY_DATABASE_URI').replace("%","%%")) str(current_app.extensions['migrate'].db.engine.url).replace('%', '%%'))
target_metadata = current_app.extensions['migrate'].db.metadata target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py, # other values from the config, defined by the needs of env.py,
@ -41,7 +46,9 @@ def run_migrations_offline():
""" """
url = config.get_main_option("sqlalchemy.url") url = config.get_main_option("sqlalchemy.url")
context.configure(url=url) context.configure(
url=url, target_metadata=target_metadata, literal_binds=True
)
with context.begin_transaction(): with context.begin_transaction():
context.run_migrations() context.run_migrations()
@ -65,22 +72,23 @@ def run_migrations_online():
directives[:] = [] directives[:] = []
logger.info('No changes in schema detected.') logger.info('No changes in schema detected.')
engine = engine_from_config(config.get_section(config.config_ini_section), connectable = engine_from_config(
prefix='sqlalchemy.', config.get_section(config.config_ini_section),
poolclass=pool.NullPool) prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
connection = engine.connect() with connectable.connect() as connection:
context.configure(connection=connection, context.configure(
target_metadata=target_metadata, connection=connection,
process_revision_directives=process_revision_directives, target_metadata=target_metadata,
render_as_batch=config.get_main_option('sqlalchemy.url').startswith('sqlite:'), process_revision_directives=process_revision_directives,
**current_app.extensions['migrate'].configure_args) **current_app.extensions['migrate'].configure_args
)
try:
with context.begin_transaction(): with context.begin_transaction():
context.run_migrations() context.run_migrations()
finally:
connection.close()
if context.is_offline_mode(): if context.is_offline_mode():
run_migrations_offline() run_migrations_offline()

View File

@ -56,9 +56,9 @@ def seed_data():
op.bulk_insert(template_table, op.bulk_insert(template_table,
[ [
{id: 1, 'name': 'basic_template_1', 'description': 'Basic Template #1'}, {'id': 1, 'name': 'basic_template_1', 'description': 'Basic Template #1'},
{id: 2, 'name': 'basic_template_2', 'description': 'Basic Template #2'}, {'id': 2, 'name': 'basic_template_2', 'description': 'Basic Template #2'},
{id: 3, 'name': 'basic_template_3', 'description': 'Basic Template #3'} {'id': 3, 'name': 'basic_template_3', 'description': 'Basic Template #3'}
] ]
) )

View File

@ -388,7 +388,7 @@ def apikey_can_configure_dnssec(http_methods=[]):
def allowed_record_types(f): def allowed_record_types(f):
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if request.method == 'GET': if request.method in ['GET', 'DELETE', 'PUT']:
return f(*args, **kwargs) return f(*args, **kwargs)
if g.apikey.role.name in ['Administrator', 'Operator']: if g.apikey.role.name in ['Administrator', 'Operator']:

View File

@ -5,7 +5,7 @@ class Role(db.Model):
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), index=True, unique=True) name = db.Column(db.String(64), index=True, unique=True)
description = db.Column(db.String(128)) description = db.Column(db.String(128))
users = db.relationship('User', backref='role', lazy=True) users = db.relationship('User', back_populates='role', lazy=True)
apikeys = db.relationship('ApiKey', back_populates='role', lazy=True) apikeys = db.relationship('ApiKey', back_populates='role', lazy=True)
def __init__(self, id=None, name=None, description=None): def __init__(self, id=None, name=None, description=None):
@ -20,4 +20,4 @@ class Role(db.Model):
self.description = description self.description = description
def __repr__(self): def __repr__(self):
return '<Role {0}r>'.format(self.name) return '<Role {0}>'.format(self.name)

View File

@ -34,6 +34,7 @@ class User(db.Model):
otp_secret = db.Column(db.String(16)) otp_secret = db.Column(db.String(16))
confirmed = db.Column(db.SmallInteger, nullable=False, default=0) confirmed = db.Column(db.SmallInteger, nullable=False, default=0)
role_id = db.Column(db.Integer, db.ForeignKey('role.id')) role_id = db.Column(db.Integer, db.ForeignKey('role.id'))
role = db.relationship('Role', back_populates="users", lazy=True)
accounts = None accounts = None
def __init__(self, def __init__(self,

View File

@ -1,22 +1,21 @@
import json import json
from urllib.parse import urljoin import secrets
import string
from base64 import b64encode from base64 import b64encode
from flask import ( from urllib.parse import urljoin
Blueprint, g, request, abort, current_app, make_response, jsonify,
) from flask import (Blueprint, g, request, abort, current_app, make_response, jsonify)
from flask_login import current_user from flask_login import current_user
from .base import csrf from .base import csrf
from ..models.base import db from ..decorators import (
from ..models import ( api_basic_auth, api_can_create_domain, is_json, apikey_auth,
User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey, apikey_can_create_domain, apikey_can_remove_domain,
Role, apikey_is_admin, apikey_can_access_domain, apikey_can_configure_dnssec,
api_role_can, apikey_or_basic_auth,
callback_if_request_body_contains_key, allowed_record_types, allowed_record_ttl
) )
from ..lib import utils, helper from ..lib import utils, helper
from ..lib.schema import (
ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema,
UserDetailedSchema,
)
from ..lib.errors import ( from ..lib.errors import (
StructuredException, StructuredException,
DomainNotExists, DomainAlreadyExists, DomainAccessForbidden, DomainNotExists, DomainAlreadyExists, DomainAccessForbidden,
@ -26,15 +25,15 @@ from ..lib.errors import (
UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail, UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail,
UserUpdateFailEmail, InvalidAccountNameException UserUpdateFailEmail, InvalidAccountNameException
) )
from ..decorators import ( from ..lib.schema import (
api_basic_auth, api_can_create_domain, is_json, apikey_auth, ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema,
apikey_can_create_domain, apikey_can_remove_domain, UserDetailedSchema,
apikey_is_admin, apikey_can_access_domain, apikey_can_configure_dnssec,
api_role_can, apikey_or_basic_auth,
callback_if_request_body_contains_key, allowed_record_types, allowed_record_ttl
) )
import secrets from ..models import (
import string User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey,
Role,
)
from ..models.base import db
api_bp = Blueprint('api', __name__, url_prefix='/api/v1') api_bp = Blueprint('api', __name__, url_prefix='/api/v1')
apilist_bp = Blueprint('apilist', __name__, url_prefix='/') apilist_bp = Blueprint('apilist', __name__, url_prefix='/')
@ -56,10 +55,10 @@ def get_user_domains():
.outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter( .filter(
db.or_( db.or_(
DomainUser.user_id == current_user.id, DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id AccountUser.user_id == current_user.id
)).all() )).all()
return domains return domains
@ -71,10 +70,10 @@ def get_user_apikeys(domain_name=None):
.outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter( .filter(
db.or_( db.or_(
DomainUser.user_id == User.id, DomainUser.user_id == User.id,
AccountUser.user_id == User.id AccountUser.user_id == User.id
) )
) \ ) \
.filter(User.id == current_user.id) .filter(User.id == current_user.id)
@ -167,12 +166,7 @@ def handle_request_is_not_json(err):
def before_request(): def before_request():
# Check site is in maintenance mode # Check site is in maintenance mode
maintenance = Setting().get('maintenance') maintenance = Setting().get('maintenance')
if ( if (maintenance and current_user.is_authenticated and current_user.role.name not in ['Administrator', 'Operator']):
maintenance and current_user.is_authenticated and
current_user.role.name not in [
'Administrator', 'Operator'
]
):
return make_response( return make_response(
jsonify({ jsonify({
"status": False, "status": False,
@ -224,14 +218,13 @@ def api_login_create_zone():
history = History(msg='Add domain {0}'.format( history = History(msg='Add domain {0}'.format(
data['name'].rstrip('.')), data['name'].rstrip('.')),
detail=json.dumps(data), detail=json.dumps(data),
created_by=current_user.username, created_by=current_user.username,
domain_id=domain_id) domain_id=domain_id)
history.add() history.add()
if current_user.role.name not in ['Administrator', 'Operator']: if current_user.role.name not in ['Administrator', 'Operator']:
current_app.logger.debug( current_app.logger.debug("User is ordinary user, assigning created domain")
"User is ordinary user, assigning created domain")
domain = Domain(name=data['name'].rstrip('.')) domain = Domain(name=data['name'].rstrip('.'))
domain.update() domain.update()
domain.grant_privileges([current_user.id]) domain.grant_privileges([current_user.id])
@ -299,9 +292,9 @@ def api_login_delete_zone(domain_name):
history = History(msg='Delete domain {0}'.format( history = History(msg='Delete domain {0}'.format(
utils.pretty_domain_name(domain_name)), utils.pretty_domain_name(domain_name)),
detail='', detail='',
created_by=current_user.username, created_by=current_user.username,
domain_id=domain_id) domain_id=domain_id)
history.add() history.add()
except Exception as e: except Exception as e:
@ -326,14 +319,14 @@ def api_generate_apikey():
if 'domains' not in data: if 'domains' not in data:
domains = [] domains = []
elif not isinstance(data['domains'], (list, )): elif not isinstance(data['domains'], (list,)):
abort(400) abort(400)
else: else:
domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']] domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']]
if 'accounts' not in data: if 'accounts' not in data:
accounts = [] accounts = []
elif not isinstance(data['accounts'], (list, )): elif not isinstance(data['accounts'], (list,)):
abort(400) abort(400)
else: else:
accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']] accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']]
@ -385,8 +378,7 @@ def api_generate_apikey():
user_domain_list = [item.name for item in user_domain_obj_list] user_domain_list = [item.name for item in user_domain_obj_list]
current_app.logger.debug("Input domain list: {0}".format(domain_list)) current_app.logger.debug("Input domain list: {0}".format(domain_list))
current_app.logger.debug( current_app.logger.debug("User domain list: {0}".format(user_domain_list))
"User domain list: {0}".format(user_domain_list))
inter = set(domain_list).intersection(set(user_domain_list)) inter = set(domain_list).intersection(set(user_domain_list))
@ -539,14 +531,14 @@ def api_update_apikey(apikey_id):
if 'domains' not in data: if 'domains' not in data:
domains = None domains = None
elif not isinstance(data['domains'], (list, )): elif not isinstance(data['domains'], (list,)):
abort(400) abort(400)
else: else:
domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']] domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']]
if 'accounts' not in data: if 'accounts' not in data:
accounts = None accounts = None
elif not isinstance(data['accounts'], (list, )): elif not isinstance(data['accounts'], (list,)):
abort(400) abort(400)
else: else:
accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']] accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']]
@ -963,9 +955,7 @@ def api_delete_account(account_id):
account = account_list[0] account = account_list[0]
else: else:
abort(404) abort(404)
current_app.logger.debug( current_app.logger.debug(f'Deleting Account {account.name}')
f'Deleting Account {account.name}'
)
# Remove account association from domains first # Remove account association from domains first
if len(account.domains) > 0: if len(account.domains) > 0:
@ -1047,7 +1037,7 @@ def api_remove_account_user(account_id, user_id):
user_list = User.query.join(AccountUser).filter( user_list = User.query.join(AccountUser).filter(
AccountUser.account_id == account_id, AccountUser.account_id == account_id,
AccountUser.user_id == user_id, AccountUser.user_id == user_id,
).all() ).all()
if not user_list: if not user_list:
abort(404) abort(404)
if not account.remove_user(user): if not account.remove_user(user):
@ -1123,9 +1113,9 @@ def api_zone_forward(server_id, zone_id):
history = History(msg='{0} zone {1} record of {2}'.format( history = History(msg='{0} zone {1} record of {2}'.format(
rrset_data['changetype'].lower(), rrset_data['type'], rrset_data['changetype'].lower(), rrset_data['type'],
rrset_data['name'].rstrip('.')), rrset_data['name'].rstrip('.')),
detail=json.dumps(data), detail=json.dumps(data),
created_by=g.apikey.description, created_by=g.apikey.description,
domain_id=Domain().get_id_by_name(zone_id.rstrip('.'))) domain_id=Domain().get_id_by_name(zone_id.rstrip('.')))
history.add() history.add()
elif request.method == 'DELETE': elif request.method == 'DELETE':
history = History(msg='Deleted zone {0}'.format(zone_id.rstrip('.')), history = History(msg='Deleted zone {0}'.format(zone_id.rstrip('.')),
@ -1192,17 +1182,13 @@ def api_get_zones(server_id):
return jsonify(domain_schema.dump(domain_obj_list)), 200 return jsonify(domain_schema.dump(domain_obj_list)), 200
else: else:
resp = helper.forward_request() resp = helper.forward_request()
if ( if (g.apikey.role.name not in ['Administrator', 'Operator'] and resp.status_code == 200):
g.apikey.role.name not in ['Administrator', 'Operator']
and resp.status_code == 200
):
domain_list = [d['name'] domain_list = [d['name']
for d in domain_schema.dump(g.apikey.domains)] for d in domain_schema.dump(g.apikey.domains)]
accounts_domains = [d.name for a in g.apikey.accounts for d in a.domains] accounts_domains = [d.name for a in g.apikey.accounts for d in a.domains]
allowed_domains = set(domain_list + accounts_domains) allowed_domains = set(domain_list + accounts_domains)
current_app.logger.debug("Account domains: {}".format( current_app.logger.debug("Account domains: {}".format('/'.join(accounts_domains)))
'/'.join(accounts_domains)))
content = json.dumps([i for i in json.loads(resp.content) content = json.dumps([i for i in json.loads(resp.content)
if i['name'].rstrip('.') in allowed_domains]) if i['name'].rstrip('.') in allowed_domains])
return content, resp.status_code, resp.headers.items() return content, resp.status_code, resp.headers.items()
@ -1223,6 +1209,7 @@ def api_server_config_forward(server_id):
resp = helper.forward_request() resp = helper.forward_request()
return resp.content, resp.status_code, resp.headers.items() return resp.content, resp.status_code, resp.headers.items()
# The endpoint to synchronize Domains in background # The endpoint to synchronize Domains in background
@api_bp.route('/sync_domains', methods=['GET']) @api_bp.route('/sync_domains', methods=['GET'])
@apikey_or_basic_auth @apikey_or_basic_auth
@ -1231,6 +1218,7 @@ def sync_domains():
domain.update() domain.update()
return 'Finished synchronization in background', 200 return 'Finished synchronization in background', 200
@api_bp.route('/health', methods=['GET']) @api_bp.route('/health', methods=['GET'])
@apikey_auth @apikey_auth
def health(): def health():
@ -1244,7 +1232,8 @@ def health():
try: try:
domain.get_domain_info(domain_to_query.name) domain.get_domain_info(domain_to_query.name)
except Exception as e: except Exception as e:
current_app.logger.error("Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name)) current_app.logger.error(
"Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name))
return make_response("Down", 503) return make_response("Down", 503)
return make_response("Up", 200) return make_response("Up", 200)

View File

@ -581,7 +581,7 @@ def get_azure_groups(uri):
def authenticate_user(user, authenticator, remember=False): def authenticate_user(user, authenticator, remember=False):
login_user(user, remember=remember) login_user(user, remember=remember)
signin_history(user.username, authenticator, True) signin_history(user.username, authenticator, True)
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret: if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret and session['authentication_type'] not in ['OAuth']:
user.update_profile(enable_otp=True) user.update_profile(enable_otp=True)
user_id = current_user.id user_id = current_user.id
prepare_welcome_user(user_id) prepare_welcome_user(user_id)

View File

@ -1,33 +1,41 @@
Flask==1.1.2 mysqlclient==2.1.1
Flask-Assets==2.0
Flask-Login==0.5.0
Flask-SQLAlchemy==2.4.4
Flask-Migrate==2.5.3
SQLAlchemy==1.3.19
mysqlclient==2.0.1
configobj==5.0.6 configobj==5.0.6
bcrypt>=3.1.7 bcrypt==4.0.1
requests==2.24.0 requests==2.28.1
python-ldap==3.4.2 python-ldap==3.4.3
pyotp==2.4.0 pyotp==2.8.0
qrcode==6.1 qrcode==7.3.1
dnspython>=1.16.0 dnspython>=1.16.0
gunicorn==20.0.4 gunicorn==20.1.0
python3-saml python3-saml==1.14.0
pytz==2020.1 pytz==2022.7
cssmin==0.2.0 cssmin==0.2.0
rjsmin==1.2.0 rjsmin==1.2.1
Authlib==0.15 Authlib==1.2.0
Flask-SeaSurf==1.1.1 Flask-SeaSurf==1.1.1
bravado-core==5.17.0 bravado-core==5.17.1
jsonschema[format]>=2.5.1,<4.0.0 # until https://github.com/Yelp/bravado-core/pull/385
lima==0.5 lima==0.5
pytest==6.1.1
pytimeparse==1.1.8 pytimeparse==1.1.8
PyYAML==5.4 alembic==1.9.0
Flask-SSLify==0.1.5 certifi==2022.12.7
cffi==1.15.1
passlib==1.7.4
pyasn1==0.4.8
webcolors==1.12
zipp==3.11.0
Flask==2.2.2
Flask-Assets==2.0
Flask-Login==0.6.2
Flask-SQLAlchemy==3.0.2
Flask-Migrate==4.0.0
SQLAlchemy==1.4.45
pyOpenSSL==22.1.0
Authlib==1.2.0
Flask-SeaSurf==1.1.1
PyYAML==6.0
Flask-Mail==0.9.1 Flask-Mail==0.9.1
flask-session==0.3.2 Flask-SSLify==0.1.5
Jinja2==3.0.3 Flask-Session==0.4.0
itsdangerous==2.0.1 lxml==4.6.5
werkzeug==2.0.3 pytest==7.2.0
cryptography==36.0.2

View File

@ -1,26 +1,28 @@
import os import os
import pytest
import flask_migrate
from base64 import b64encode from base64 import b64encode
from powerdnsadmin import create_app import pytest
from powerdnsadmin.models.base import db from flask_migrate import upgrade as flask_migrate_upgrade
from powerdnsadmin.models.user import User
from powerdnsadmin.models.setting import Setting
from powerdnsadmin.models.api_key import ApiKey
app = create_app('../configs/test.py') from powerdnsadmin import create_app
ctx = app.app_context() from powerdnsadmin.models.api_key import ApiKey
ctx.push() from powerdnsadmin.models.base import db
from powerdnsadmin.models.setting import Setting
from powerdnsadmin.models.user import User
@pytest.fixture(scope="session")
def app():
app = create_app('../configs/test.py')
yield app
@pytest.fixture @pytest.fixture
def client(): def client(app):
app.config['TESTING'] = True app.config['TESTING'] = True
client = app.test_client() client = app.test_client()
yield client yield client
def load_data(setting_name, *args, **kwargs): def load_data(setting_name, *args, **kwargs):
if setting_name == 'maintenance': if setting_name == 'maintenance':
return 0 return 0
@ -36,20 +38,22 @@ def load_data(setting_name, *args, **kwargs):
return 10 return 10
if setting_name == 'allow_user_create_domain': if setting_name == 'allow_user_create_domain':
return True return True
if setting_name == 'allow_user_remove_domain':
return True
@pytest.fixture @pytest.fixture
def test_admin_user(): def test_admin_user(app):
return app.config.get('TEST_ADMIN_USER') return app.config.get('TEST_ADMIN_USER')
@pytest.fixture @pytest.fixture
def test_user(): def test_user(app):
return app.config.get('TEST_USER') return app.config.get('TEST_USER')
@pytest.fixture @pytest.fixture
def basic_auth_admin_headers(): def basic_auth_admin_headers(app):
test_admin_user = app.config.get('TEST_ADMIN_USER') test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
user_pass = "{0}:{1}".format(test_admin_user, test_admin_pass) user_pass = "{0}:{1}".format(test_admin_user, test_admin_pass)
@ -61,7 +65,7 @@ def basic_auth_admin_headers():
@pytest.fixture @pytest.fixture
def basic_auth_user_headers(): def basic_auth_user_headers(app):
test_user = app.config.get('TEST_USER') test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD') test_user_pass = app.config.get('TEST_USER_PASSWORD')
user_pass = "{0}:{1}".format(test_user, test_user_pass) user_pass = "{0}:{1}".format(test_user, test_user_pass)
@ -73,7 +77,8 @@ def basic_auth_user_headers():
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def initial_data(): def initial_data(app):
pdns_proto = os.environ['PDNS_PROTO'] pdns_proto = os.environ['PDNS_PROTO']
pdns_host = os.environ['PDNS_HOST'] pdns_host = os.environ['PDNS_HOST']
pdns_port = os.environ['PDNS_PORT'] pdns_port = os.environ['PDNS_PORT']
@ -83,46 +88,44 @@ def initial_data():
api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY']) api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY'])
allow_create_domain_setting = Setting('allow_user_create_domain', True) allow_create_domain_setting = Setting('allow_user_create_domain', True)
try: with app.app_context():
flask_migrate.upgrade() try:
flask_migrate_upgrade(directory="migrations")
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
db.session.add(api_url_setting) test_user = app.config.get('TEST_USER')
db.session.add(api_key_setting) test_user_pass = app.config.get('TEST_USER_PASSWORD')
db.session.add(allow_create_domain_setting) test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
test_user = app.config.get('TEST_USER') admin_user = User(username=test_admin_user,
test_user_pass = app.config.get('TEST_USER_PASSWORD') plain_text_password=test_admin_pass,
test_admin_user = app.config.get('TEST_ADMIN_USER') email="admin@admin.com")
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') ret = admin_user.create_local_user()
admin_user = User(username=test_admin_user, if not ret['status']:
plain_text_password=test_admin_pass, raise Exception("Error occurred creating user {0}".format(ret['msg']))
email="admin@admin.com")
msg = admin_user.create_local_user()
if not msg: ordinary_user = User(username=test_user,
raise Exception("Error occurred creating user {0}".format(msg)) plain_text_password=test_user_pass,
email="test@test.com")
ret = ordinary_user.create_local_user()
ordinary_user = User(username=test_user, if not ret['status']:
plain_text_password=test_user_pass, raise Exception("Error occurred creating user {0}".format(ret['msg']))
email="test@test.com")
msg = ordinary_user.create_local_user()
if not msg: except Exception as e:
raise Exception("Error occurred creating user {0}".format(msg)) print("Unexpected ERROR: {0}".format(e))
raise e
except Exception as e:
print("Unexpected ERROR: {0}".format(e))
raise e
yield yield
db.session.close()
os.unlink(app.config['TEST_DB_LOCATION']) os.unlink(app.config['TEST_DB_LOCATION'])
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def initial_apikey_data(): def initial_apikey_data(app):
pdns_proto = os.environ['PDNS_PROTO'] pdns_proto = os.environ['PDNS_PROTO']
pdns_host = os.environ['PDNS_HOST'] pdns_host = os.environ['PDNS_HOST']
pdns_port = os.environ['PDNS_PORT'] pdns_port = os.environ['PDNS_PORT']
@ -131,42 +134,42 @@ def initial_apikey_data():
api_url_setting = Setting('pdns_api_url', pdns_api_url) api_url_setting = Setting('pdns_api_url', pdns_api_url)
api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY']) api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY'])
allow_create_domain_setting = Setting('allow_user_create_domain', True) allow_create_domain_setting = Setting('allow_user_create_domain', True)
allow_remove_domain_setting = Setting('allow_user_remove_domain', True)
try: with app.app_context():
flask_migrate.upgrade() try:
flask_migrate_upgrade(directory="migrations")
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
db.session.add(allow_remove_domain_setting)
db.session.add(api_url_setting) test_user_apikey = app.config.get('TEST_USER_APIKEY')
db.session.add(api_key_setting) test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY')
db.session.add(allow_create_domain_setting)
test_user_apikey = app.config.get('TEST_USER_APIKEY') dummy_apikey = ApiKey(desc="dummy", role_name="Administrator")
test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY')
dummy_apikey = ApiKey(desc="dummy", role_name="Administrator") admin_key = dummy_apikey.get_hashed_password(
plain_text_password=test_admin_apikey).decode('utf-8')
admin_key = dummy_apikey.get_hashed_password( admin_apikey = ApiKey(key=admin_key,
plain_text_password=test_admin_apikey).decode('utf-8') desc="test admin apikey",
role_name="Administrator")
admin_apikey.create()
admin_apikey = ApiKey(key=admin_key, user_key = dummy_apikey.get_hashed_password(
desc="test admin apikey", plain_text_password=test_user_apikey).decode('utf-8')
role_name="Administrator")
admin_apikey.create()
user_key = dummy_apikey.get_hashed_password( user_apikey = ApiKey(key=user_key,
plain_text_password=test_user_apikey).decode('utf-8') desc="test user apikey",
role_name="User")
user_apikey.create()
user_apikey = ApiKey(key=user_key, except Exception as e:
desc="test user apikey", print("Unexpected ERROR: {0}".format(e))
role_name="User") raise e
user_apikey.create()
except Exception as e:
print("Unexpected ERROR: {0}".format(e))
raise e
yield yield
db.session.close()
os.unlink(app.config['TEST_DB_LOCATION']) os.unlink(app.config['TEST_DB_LOCATION'])
@ -183,61 +186,51 @@ def zone_data():
@pytest.fixture @pytest.fixture
def created_zone_data(): def created_zone_data():
data = { data = {
'url': 'url': '/api/v1/servers/localhost/zones/example.org.',
'/api/v1/servers/localhost/zones/example.org.', 'soa_edit_api': 'DEFAULT',
'soa_edit_api': 'last_check': 0,
'DEFAULT',
'last_check':
0,
'masters': [], 'masters': [],
'dnssec': 'dnssec': False,
False, 'notified_serial': 0,
'notified_serial': 'nsec3narrow': False,
0, 'serial': 2019013101,
'nsec3narrow': 'nsec3param': '',
False, 'soa_edit': '',
'serial': 'api_rectify': False,
2019013101, 'kind': 'Native',
'nsec3param':
'',
'soa_edit':
'',
'api_rectify':
False,
'kind':
'Native',
'rrsets': [{ 'rrsets': [{
'comments': [], 'comments': [],
'type': 'type': 'SOA',
'SOA', 'name': 'example.org.',
'name': 'ttl': 3600,
'example.org.',
'ttl':
3600,
'records': [{ 'records': [{
'content': 'content': 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600',
'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600',
'disabled': False 'disabled': False
}] }]
}, { }, {
'comments': [], 'comments': [],
'type': 'type': 'NS',
'NS', 'name': 'example.org.',
'name': 'ttl': 3600,
'example.org.',
'ttl':
3600,
'records': [{ 'records': [{
'content': 'ns1.example.org.', 'content': 'ns1.example.org.',
'disabled': False 'disabled': False
}] }]
}], }],
'name': 'name': 'example.org.',
'example.org.', 'account': '',
'account': 'id': 'example.org.'
'', }
'id': return data
'example.org.'
def user_data(app):
test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD')
data = {
"username": test_user,
"plain_text_password": test_user_pass,
"email": "test@test.com"
} }
return data return data
@ -257,37 +250,39 @@ def admin_apikey_data():
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def user_apikey_integration(): def user_apikey_integration(app):
test_user_apikey = app.config.get('TEST_USER_APIKEY') test_user_apikey = app.config.get('TEST_USER_APIKEY')
headers = create_apikey_headers(test_user_apikey) headers = create_apikey_headers(test_user_apikey)
return headers return headers
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def admin_apikey_integration(): def admin_apikey_integration(app):
test_user_apikey = app.config.get('TEST_ADMIN_APIKEY') test_user_apikey = app.config.get('TEST_ADMIN_APIKEY')
headers = create_apikey_headers(test_user_apikey) headers = create_apikey_headers(test_user_apikey)
return headers return headers
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def user_apikey(): def user_apikey(app):
data = user_apikey_data() with app.app_context():
api_key = ApiKey(desc=data['description'], data = user_apikey_data()
role_name=data['role'], api_key = ApiKey(desc=data['description'],
domains=[]) role_name=data['role'],
headers = create_apikey_headers(api_key.plain_key) domains=[])
return headers headers = create_apikey_headers(api_key.plain_key)
return headers
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def admin_apikey(): def admin_apikey(app):
data = admin_apikey_data() with app.app_context():
api_key = ApiKey(desc=data['description'], data = admin_apikey_data()
role_name=data['role'], api_key = ApiKey(desc=data['description'],
domains=[]) role_name=data['role'],
headers = create_apikey_headers(api_key.plain_key) domains=[])
return headers headers = create_apikey_headers(api_key.plain_key)
return headers
def create_apikey_headers(passw): def create_apikey_headers(passw):

View File

@ -4,12 +4,11 @@ from collections import namedtuple
from powerdnsadmin.lib.validators import validate_apikey from powerdnsadmin.lib.validators import validate_apikey
from powerdnsadmin.lib.schema import ApiKeySchema from powerdnsadmin.lib.schema import ApiKeySchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers from tests.conftest import user_apikey_data, admin_apikey_data
from tests.fixtures import user_apikey_data, admin_apikey_data, zone_data
class TestIntegrationApiApiKeyAdminUser(object): class TestIntegrationApiApiKeyAdminUser(object):
def test_empty_get(self, client, initial_data, basic_auth_admin_headers): def test_empty_get(self, initial_data, client, basic_auth_admin_headers):
res = client.get("/api/v1/pdnsadmin/apikeys", res = client.get("/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers) headers=basic_auth_admin_headers)
data = res.get_json(force=True) data = res.get_json(force=True)
@ -19,7 +18,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"apikey_data", "apikey_data",
[user_apikey_data(), admin_apikey_data()]) [user_apikey_data(), admin_apikey_data()])
def test_create_apikey(self, client, initial_data, apikey_data, zone_data, def test_create_apikey(self, initial_data, client, apikey_data, zone_data,
basic_auth_admin_headers): basic_auth_admin_headers):
res = client.post("/api/v1/pdnsadmin/zones", res = client.post("/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers, headers=basic_auth_admin_headers,
@ -39,7 +38,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
assert res.status_code == 201 assert res.status_code == 201
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id']) apikey_url = apikey_url_format.format(data['id'])
res = client.delete(apikey_url, headers=basic_auth_admin_headers) res = client.delete(apikey_url, headers=basic_auth_admin_headers)
@ -54,7 +53,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"apikey_data", "apikey_data",
[user_apikey_data(), admin_apikey_data()]) [user_apikey_data(), admin_apikey_data()])
def test_get_multiple_apikey(self, client, initial_data, apikey_data, def test_get_multiple_apikey(self, initial_data, client, apikey_data,
zone_data, basic_auth_admin_headers): zone_data, basic_auth_admin_headers):
res = client.post("/api/v1/pdnsadmin/zones", res = client.post("/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers, headers=basic_auth_admin_headers,
@ -103,7 +102,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"apikey_data", "apikey_data",
[user_apikey_data(), admin_apikey_data()]) [user_apikey_data(), admin_apikey_data()])
def test_delete_apikey(self, client, initial_data, apikey_data, zone_data, def test_delete_apikey(self, initial_data, client, apikey_data, zone_data,
basic_auth_admin_headers): basic_auth_admin_headers):
res = client.post("/api/v1/pdnsadmin/zones", res = client.post("/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers, headers=basic_auth_admin_headers,
@ -123,7 +122,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
assert res.status_code == 201 assert res.status_code == 201
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id']) apikey_url = apikey_url_format.format(data['id'])
res = client.delete(apikey_url, headers=basic_auth_admin_headers) res = client.delete(apikey_url, headers=basic_auth_admin_headers)
assert res.status_code == 204 assert res.status_code == 204

View File

@ -1,14 +1,11 @@
import pytest
import json import json
from collections import namedtuple from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object): class TestIntegrationApiApiKeyUser(object):
def test_empty_get(self, initial_data, client, basic_auth_user_headers): def test_empty_get(self, initial_data, client, basic_auth_user_headers):
res = client.get("/api/v1/pdnsadmin/zones", res = client.get("/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers) headers=basic_auth_user_headers)

View File

@ -14,8 +14,10 @@ class IntegrationApiManagement(object):
assert res.status_code == status_code assert res.status_code == status_code
if res.status_code == 200: if res.status_code == 200:
data = res.get_json(force=True) data = res.get_json(force=True)
assert len(data) == 1 assert isinstance(data, dict)
return data[0] assert len(data) == 7
assert data.get('id', None)
return data
return None return None
def check_account(self, cmpdata, data=None): def check_account(self, cmpdata, data=None):
@ -37,8 +39,10 @@ class IntegrationApiManagement(object):
assert res.status_code == status_code assert res.status_code == status_code
if status_code == 200: if status_code == 200:
data = res.get_json(force=True) data = res.get_json(force=True)
assert len(data) == 1 assert isinstance(data, dict)
return data[0] assert len(data) == 7
assert data.get('id', None)
return data
return None return None
def check_user(self, cmpdata, data=None): def check_user(self, cmpdata, data=None):
@ -50,5 +54,5 @@ class IntegrationApiManagement(object):
elif key == 'role': elif key == 'role':
assert data[key]['name'] == cmpdata['role_name'] assert data[key]['name'] == cmpdata['role_name']
else: else:
assert key in ("id",) assert key in ("id","accounts",)
return data return data

View File

@ -1,9 +1,5 @@
import json import json
from tests.fixtures import ( # noqa: F401
client, initial_data, basic_auth_admin_headers,
test_admin_user, test_user, account_data, user1_data,
)
from . import IntegrationApiManagement from . import IntegrationApiManagement
@ -89,8 +85,9 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement):
) )
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert len(data) == 1 assert isinstance(data, dict)
data = data[0] assert len(data) == 7
assert data.get('id', None)
account_id = data["id"] account_id = data["id"]
for key, value in account_data.items(): for key, value in account_data.items():
assert data[key] == value assert data[key] == value
@ -142,10 +139,12 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement):
) )
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 201 assert res.status_code == 201
assert len(data) == 1 assert isinstance(data, dict)
assert len(data) == 6
assert data.get('id', None)
# Check user # Check user
user1 = self.check_user(user1_data, data[0]) user1 = self.check_user(user1_data, data)
user1_id = user1["id"] user1_id = user1["id"]
updated = user1_data.copy() updated = user1_data.copy()
@ -240,10 +239,12 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement):
) )
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 201 assert res.status_code == 201
assert len(data) == 1 assert isinstance(data, dict)
assert len(data) == 6
assert data.get('id', None)
# Check user # Check user
user1 = self.check_user(user1_data, data[0]) user1 = self.check_user(user1_data, data)
user1_id = user1["id"] user1_id = user1["id"]
# Assert test account has no users # Assert test account has no users

View File

@ -1,66 +1,61 @@
import json import json
from tests.fixtures import ( # noqa: F401
client, initial_data, basic_auth_admin_headers, basic_auth_user_headers,
test_admin_user, test_user, account_data, user1_data,
)
from . import IntegrationApiManagement from . import IntegrationApiManagement
class TestIntegrationApiManagementUser(IntegrationApiManagement): class TestIntegrationApiManagementUser(IntegrationApiManagement):
def test_accounts_empty_get( def test_accounts_empty_get(self, initial_data, client, # noqa: F811
self, client, initial_data, # noqa: F811 basic_auth_user_headers): # noqa: F811
basic_auth_user_headers): # noqa: F811
res = client.get("/api/v1/pdnsadmin/accounts", res = client.get("/api/v1/pdnsadmin/accounts",
headers=basic_auth_user_headers) headers=basic_auth_user_headers)
assert res.status_code == 401 assert res.status_code == 401
def test_users_empty_get( def test_users_empty_get(self, initial_data, client, # noqa: F811
self, client, initial_data, # noqa: F811 test_admin_user, test_user, # noqa: F811
test_admin_user, test_user, # noqa: F811 basic_auth_user_headers): # noqa: F811
basic_auth_user_headers): # noqa: F811
res = client.get("/api/v1/pdnsadmin/users", res = client.get("/api/v1/pdnsadmin/users",
headers=basic_auth_user_headers) headers=basic_auth_user_headers)
assert res.status_code == 401 assert res.status_code == 401
def test_self_get( def test_self_get(self, initial_data, client, basic_auth_user_headers, test_user): # noqa: F811
self, initial_data, client, test_user, # noqa: F811
basic_auth_user_headers): # noqa: F811
self.user = None
res = client.get("/api/v1/pdnsadmin/users/{}".format(test_user), res = client.get("/api/v1/pdnsadmin/users/{}".format(test_user),
headers=basic_auth_user_headers) headers=basic_auth_user_headers)
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 200 assert res.status_code == 200
assert len(data) == 1, data assert data
self.user = data
def test_accounts( def test_create_account_fail(self, client, initial_data, account_data, # noqa: F811
self, client, initial_data, # noqa: F811 basic_auth_user_headers): # noqa: F811
account_data, # noqa: F811
basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 # Create account (should fail)
res = client.post("/api/v1/pdnsadmin/accounts",
headers=basic_auth_user_headers,
data=json.dumps(account_data),
content_type="application/json")
assert res.status_code == 401
def test_create_account_as_admin(self, app, initial_data, client, account_data, # noqa: F811
basic_auth_admin_headers): # noqa: F811
self.client = client self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers self.basic_auth_admin_headers = basic_auth_admin_headers
# Create account (should fail) with app.test_request_context():
res = client.post( # Create account (as admin)
"/api/v1/pdnsadmin/accounts", res = client.post("/api/v1/pdnsadmin/accounts",
headers=basic_auth_user_headers, headers=basic_auth_admin_headers,
data=json.dumps(account_data), data=json.dumps(account_data),
content_type="application/json", content_type="application/json")
) data = res.get_json(force=True)
assert res.status_code == 401 assert res.status_code == 201
# Create account (as admin) def test_update_account_fail(
res = client.post( self, initial_data, client, # noqa: F811
"/api/v1/pdnsadmin/accounts", account_data, # noqa: F811
headers=basic_auth_admin_headers, basic_auth_user_headers,
data=json.dumps(account_data), basic_auth_admin_headers): # noqa: F811
content_type="application/json", self.client = client
) self.basic_auth_admin_headers = basic_auth_admin_headers
data = res.get_json(force=True)
assert res.status_code == 201
# Check account # Check account
data = self.check_account(account_data) data = self.check_account(account_data)
@ -75,6 +70,18 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
) )
assert res.status_code == 401 assert res.status_code == 401
def test_delete_account_fail(
self, initial_data, client, # noqa: F811
account_data, # noqa: F811
basic_auth_user_headers,
basic_auth_admin_headers): # noqa: F811
self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers
# Check account
data = self.check_account(account_data)
account_id = data["id"]
# Delete account (should fail) # Delete account (should fail)
res = client.delete( res = client.delete(
"/api/v1/pdnsadmin/accounts/{}".format(account_id), "/api/v1/pdnsadmin/accounts/{}".format(account_id),
@ -84,6 +91,17 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
) )
assert res.status_code == 401 assert res.status_code == 401
def test_delete_account_as_admin(
self, client, initial_data, # noqa: F811
account_data, # noqa: F811
basic_auth_admin_headers): # noqa: F811
self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers
# Check account
data = self.check_account(account_data)
account_id = data["id"]
# Cleanup (delete account as admin) # Cleanup (delete account as admin)
res = client.delete( res = client.delete(
"/api/v1/pdnsadmin/accounts/{}".format(account_id), "/api/v1/pdnsadmin/accounts/{}".format(account_id),
@ -94,8 +112,8 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
assert res.status_code == 204 assert res.status_code == 204
def test_users( def test_users(
self, client, initial_data, # noqa: F811 self, client, initial_data, # noqa: F811
user1_data, # noqa: F811 user1_data, # noqa: F811
basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811
self.client = client self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers self.basic_auth_admin_headers = basic_auth_admin_headers
@ -118,10 +136,12 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
) )
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 201 assert res.status_code == 201
assert len(data) == 1 assert isinstance(data, dict)
assert len(data) == 6
assert data.get('id', None)
# Check user # Check user
user1 = self.check_user(user1_data, data[0]) user1 = self.check_user(user1_data, data)
user1_id = user1["id"] user1_id = user1["id"]
# Update to defaults (should fail) # Update to defaults (should fail)
@ -152,8 +172,8 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
assert res.status_code == 204 assert res.status_code == 204
def test_account_users( def test_account_users(
self, client, initial_data, # noqa: F811 self, client, initial_data, # noqa: F811
account_data, user1_data, # noqa: F811 account_data, user1_data, # noqa: F811
basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811
self.client = client self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers self.basic_auth_admin_headers = basic_auth_admin_headers
@ -181,10 +201,12 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
) )
data = res.get_json(force=True) data = res.get_json(force=True)
assert res.status_code == 201 assert res.status_code == 201
assert len(data) == 1 assert isinstance(data, dict)
assert len(data) == 6
assert data.get('id', None)
# Check user # Check user
user1 = self.check_user(user1_data, data[0]) user1 = self.check_user(user1_data, data)
user1_id = user1["id"] user1_id = user1["id"]
# Assert test account has no users # Assert test account has no users

View File

@ -1,11 +1,8 @@
import pytest
import json import json
from collections import namedtuple from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneAdminUser(object): class TestIntegrationApiZoneAdminUser(object):

View File

@ -3,9 +3,6 @@ from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import admin_apikey_integration
class TestIntegrationApiZoneAdminApiKey(object): class TestIntegrationApiZoneAdminApiKey(object):

View File

@ -3,9 +3,6 @@ from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import user_apikey_integration
class TestIntegrationApiZoneUserApiKey(object): class TestIntegrationApiZoneUserApiKey(object):

View File

@ -3,8 +3,6 @@ from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object): class TestIntegrationApiZoneUser(object):

View File

@ -2,9 +2,6 @@ import json
import pytest import pytest
from unittest.mock import patch from unittest.mock import patch
from collections import namedtuple from collections import namedtuple
import sys
import os
sys.path.append(os.getcwd())
import powerdnsadmin import powerdnsadmin
from powerdnsadmin.models.setting import Setting from powerdnsadmin.models.setting import Setting
@ -12,79 +9,108 @@ from powerdnsadmin.models.domain import Domain
from powerdnsadmin.models.api_key import ApiKey from powerdnsadmin.models.api_key import ApiKey
from powerdnsadmin.models.role import Role from powerdnsadmin.models.role import Role
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, initial_data, created_zone_data from tests.conftest import admin_apikey_data, load_data
from tests.fixtures import user_apikey, admin_apikey, zone_data
from tests.fixtures import admin_apikey_data, load_data
class TestUnitApiZoneAdminApiKey(object): class TestUnitApiZoneAdminApiKey(object):
@pytest.fixture @pytest.fixture
def common_data_mock(self): def common_data_mock(self, app):
self.google_setting_patcher = patch( with app.app_context():
'powerdnsadmin.services.google.Setting', self.google_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.services.google.Setting',
self.github_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.services.github.Setting', self.github_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.services.github.Setting',
self.oidc_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.services.oidc.Setting', self.azure_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.services.azure.Setting',
self.helpers_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.lib.helper.Setting', self.oidc_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.services.oidc.Setting',
self.models_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.models.setting.Setting', self.helpers_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.lib.helper.Setting',
self.domain_model_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.models.domain.Setting', self.models_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.models.setting.Setting',
self.record_model_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.models.record.Setting', self.domain_model_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.models.domain.Setting',
self.server_model_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.models.server.Setting', self.record_model_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.models.record.Setting',
self.mock_apikey_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.decorators.ApiKey', self.server_model_setting_patcher = patch(
spec=powerdnsadmin.models.api_key.ApiKey) 'powerdnsadmin.models.server.Setting',
self.mock_hist_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.routes.api.History', self.mock_apikey_patcher = patch(
spec=powerdnsadmin.models.history.History) 'powerdnsadmin.decorators.ApiKey',
self.mock_setting_patcher = patch( spec=powerdnsadmin.models.api_key.ApiKey)
'powerdnsadmin.routes.api.Setting', self.mock_hist_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
data = admin_apikey_data() data = admin_apikey_data()
api_key = ApiKey(desc=data['description'], api_key = ApiKey(desc=data['description'],
role_name=data['role'], role_name=data['role'],
domains=[]) domains=[])
api_key.role = Role(name=data['role']) api_key.role = Role(name=data['role'])
self.mock_google_setting = self.google_setting_patcher.start() self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start() self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start() self.mock_azure_setting = self.azure_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start() self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start() self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start( self.mock_models_setting = self.models_setting_patcher.start()
) self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
self.mock_record_model_setting = self.record_model_setting_patcher.start( )
) self.mock_record_model_setting = self.record_model_setting_patcher.start(
self.mock_server_model_setting = self.server_model_setting_patcher.start( )
) self.mock_server_model_setting = self.server_model_setting_patcher.start(
self.mock_apikey = self.mock_apikey_patcher.start() )
self.mock_hist = self.mock_hist_patcher.start() self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_setting = self.mock_setting_patcher.start() self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_decorators_setting = self.mock_decorators_setting_patcher.start()
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_azure_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_decorators_setting.return_value.get.side_effect = load_data
self.mock_apikey.return_value.is_validate.return_value = api_key
yield
for patcher in [
self.google_setting_patcher,
self.github_setting_patcher,
self.azure_setting_patcher,
self.oidc_setting_patcher,
self.helpers_setting_patcher,
self.models_setting_patcher,
self.domain_model_setting_patcher,
self.record_model_setting_patcher,
self.server_model_setting_patcher,
self.mock_apikey_patcher,
self.mock_hist_patcher,
self.mock_setting_patcher,
self.mock_decorators_setting_patcher,
]:
patcher.stop()
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_apikey.return_value.is_validate.return_value = api_key
def test_empty_get(self, client, common_data_mock, admin_apikey): def test_empty_get(self, client, common_data_mock, admin_apikey):
with patch('powerdnsadmin.routes.api.Domain') as mock_domain, \ with patch('powerdnsadmin.routes.api.Domain') as mock_domain, \

View File

@ -5,37 +5,33 @@ from collections import namedtuple
import powerdnsadmin import powerdnsadmin
from powerdnsadmin.models.user import User from powerdnsadmin.models.user import User
from powerdnsadmin.models.role import Role
from powerdnsadmin.models.domain import Domain from powerdnsadmin.models.domain import Domain
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, basic_auth_admin_headers from tests.conftest import load_data
from tests.fixtures import zone_data, created_zone_data, load_data
class TestUnitApiZoneAdminUser(object): class TestUnitApiZoneAdminUser(object):
@pytest.fixture @pytest.fixture
def common_data_mock(self): def common_data_mock(self, app, initial_data):
self.google_setting_patcher = patch( self.google_setting_patcher = patch(
'powerdnsadmin.services.google.Setting', 'powerdnsadmin.services.google.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.github_setting_patcher = patch( self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting', 'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.azure_setting_patcher = patch(
'powerdnsadmin.services.azure.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch( self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting', 'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.api_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.base_route_user_patcher = patch(
'powerdnsadmin.routes.base.User',
spec=powerdnsadmin.models.user.User)
self.helpers_setting_patcher = patch( self.helpers_setting_patcher = patch(
'powerdnsadmin.lib.helper.Setting', 'powerdnsadmin.lib.helper.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.models_setting_patcher = patch( self.models_setting_patcher = patch(
'powerdnsadmin.models.Setting', 'powerdnsadmin.models.setting.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.domain_model_setting_patcher = patch( self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting', 'powerdnsadmin.models.domain.Setting',
@ -46,50 +42,81 @@ class TestUnitApiZoneAdminUser(object):
self.server_model_setting_patcher = patch( self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting', 'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.decorators_setting_patcher = patch( self.mock_user_patcher = patch(
'powerdnsadmin.decorators.Setting', 'powerdnsadmin.decorators.User',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.user.User)
self.mock_user_patcher = patch('powerdnsadmin.decorators.User',
spec=powerdnsadmin.models.user.User)
self.mock_hist_patcher = patch( self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History', 'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History) spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.base_route_user_patcher = patch(
'powerdnsadmin.routes.base.User',
spec=powerdnsadmin.models.user.User)
self.mock_google_setting = self.google_setting_patcher.start() with app.app_context():
self.mock_github_setting = self.github_setting_patcher.start() self.mock_google_setting = self.google_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start() self.mock_github_setting = self.github_setting_patcher.start()
self.mock_base_route_user = self.base_route_user_patcher.start() self.mock_azure_setting = self.azure_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start() self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start() self.mock_base_route_user = self.base_route_user_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start( self.mock_helpers_setting = self.helpers_setting_patcher.start()
) self.mock_models_setting = self.models_setting_patcher.start()
self.mock_record_model_setting = self.record_model_setting_patcher.start( self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
) )
self.mock_server_model_setting = self.server_model_setting_patcher.start( self.mock_record_model_setting = self.record_model_setting_patcher.start(
) )
self.decorators_setting = self.decorators_setting_patcher.start() self.mock_server_model_setting = self.server_model_setting_patcher.start(
self.api_setting = self.api_setting_patcher.start() )
self.mock_user = self.mock_user_patcher.start() self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start() self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_decorators_setting = self.mock_decorators_setting_patcher.start()
self.mock_google_setting.return_value.get.side_effect = load_data self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data self.mock_azure_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data self.mock_record_model_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data self.mock_server_model_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data self.mock_decorators_setting.return_value.get.side_effect = load_data
self.mock_setting.return_value.get.side_effect = load_data
self.mockk = MagicMock() self.mockk = MagicMock()
self.mockk.role.name = "Administrator" self.mockk.role.name = "Administrator"
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
yield
for patcher in [
self.google_setting_patcher,
self.github_setting_patcher,
self.azure_setting_patcher,
self.oidc_setting_patcher,
self.base_route_user_patcher,
self.helpers_setting_patcher,
self.models_setting_patcher,
self.domain_model_setting_patcher,
self.record_model_setting_patcher,
self.server_model_setting_patcher,
self.mock_user_patcher,
self.mock_hist_patcher,
self.mock_setting_patcher,
self.mock_decorators_setting_patcher,
]:
patcher.stop()
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
def test_empty_get(self, client, common_data_mock, def test_empty_get(self, client, common_data_mock,
basic_auth_admin_headers): basic_auth_admin_headers):
@ -135,8 +162,7 @@ class TestUnitApiZoneAdminUser(object):
headers=basic_auth_admin_headers) headers=basic_auth_admin_headers)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple("Domain", fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain])) json.dumps(domain_schema.dump([fake_domain]))

View File

@ -5,37 +5,33 @@ from collections import namedtuple
import powerdnsadmin import powerdnsadmin
from powerdnsadmin.models.user import User from powerdnsadmin.models.user import User
from powerdnsadmin.models.role import Role
from powerdnsadmin.models.domain import Domain from powerdnsadmin.models.domain import Domain
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, basic_auth_user_headers from tests.conftest import load_data
from tests.fixtures import zone_data, created_zone_data, load_data
class TestUnitApiZoneUser(object): class TestUnitApiZoneUser(object):
@pytest.fixture @pytest.fixture
def common_data_mock(self): def common_data_mock(self, app, initial_data):
self.google_setting_patcher = patch( self.google_setting_patcher = patch(
'powerdnsadmin.services.google.Setting', 'powerdnsadmin.services.google.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.github_setting_patcher = patch( self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting', 'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.azure_setting_patcher = patch(
'powerdnsadmin.services.azure.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch( self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting', 'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.api_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.base_route_user_patcher = patch(
'powerdnsadmin.routes.base.User',
spec=powerdnsadmin.models.user.User)
self.helpers_setting_patcher = patch( self.helpers_setting_patcher = patch(
'powerdnsadmin.lib.helper.Setting', 'powerdnsadmin.lib.helper.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.models_setting_patcher = patch( self.models_setting_patcher = patch(
'powerdnsadmin.models.Setting', 'powerdnsadmin.models.setting.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.domain_model_setting_patcher = patch( self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting', 'powerdnsadmin.models.domain.Setting',
@ -46,55 +42,86 @@ class TestUnitApiZoneUser(object):
self.server_model_setting_patcher = patch( self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting', 'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.setting.Setting)
self.decorators_setting_patcher = patch( self.mock_user_patcher = patch(
'powerdnsadmin.decorators.Setting', 'powerdnsadmin.decorators.User',
spec=powerdnsadmin.models.setting.Setting) spec=powerdnsadmin.models.user.User)
self.mock_user_patcher = patch('powerdnsadmin.decorators.User',
spec=powerdnsadmin.models.user.User)
self.mock_hist_patcher = patch( self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History', 'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History) spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.base_route_user_patcher = patch(
'powerdnsadmin.routes.base.User',
spec=powerdnsadmin.models.user.User)
self.mock_google_setting = self.google_setting_patcher.start() with app.app_context():
self.mock_github_setting = self.github_setting_patcher.start() self.mock_google_setting = self.google_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start() self.mock_github_setting = self.github_setting_patcher.start()
self.mock_base_route_user = self.base_route_user_patcher.start() self.mock_azure_setting = self.azure_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start() self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start() self.mock_base_route_user = self.base_route_user_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start( self.mock_helpers_setting = self.helpers_setting_patcher.start()
) self.mock_models_setting = self.models_setting_patcher.start()
self.mock_record_model_setting = self.record_model_setting_patcher.start( self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
) )
self.mock_server_model_setting = self.server_model_setting_patcher.start( self.mock_record_model_setting = self.record_model_setting_patcher.start(
) )
self.decorators_setting = self.decorators_setting_patcher.start() self.mock_server_model_setting = self.server_model_setting_patcher.start(
self.api_setting = self.api_setting_patcher.start() )
self.mock_user = self.mock_user_patcher.start() self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start() self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_decorators_setting = self.mock_decorators_setting_patcher.start()
self.mock_google_setting.return_value.get.side_effect = load_data self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data self.mock_azure_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data self.mock_record_model_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data self.mock_server_model_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data self.mock_decorators_setting.return_value.get.side_effect = load_data
self.mock_setting.return_value.get.side_effect = load_data
self.mockk = MagicMock() self.mockk = MagicMock()
self.mockk.role.name = "User" self.mockk.role.name = "User"
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
yield
for patcher in [
self.google_setting_patcher,
self.github_setting_patcher,
self.azure_setting_patcher,
self.oidc_setting_patcher,
self.base_route_user_patcher,
self.helpers_setting_patcher,
self.models_setting_patcher,
self.domain_model_setting_patcher,
self.record_model_setting_patcher,
self.server_model_setting_patcher,
self.mock_user_patcher,
self.mock_hist_patcher,
self.mock_setting_patcher,
self.mock_decorators_setting_patcher,
]:
patcher.stop()
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
def test_create_zone(self, client, common_data_mock, zone_data, def test_create_zone(self, client, common_data_mock, zone_data,
basic_auth_user_headers, created_zone_data): basic_auth_user_headers, created_zone_data):
with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \ with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \
patch('powerdnsadmin.routes.api.Domain') as mock_domain: patch('powerdnsadmin.routes.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201 mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data) mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {} mock_post.return_value.headers = {}
@ -115,8 +142,9 @@ class TestUnitApiZoneUser(object):
with patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains: with patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains:
test_domain = Domain(1, name=zone_data['name'].rstrip(".")) test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_user_domains.return_value = [test_domain] mock_user_domains.return_value = [test_domain]
res = client.get("/api/v1/pdnsadmin/zones", res = client.get("/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers) headers=basic_auth_user_headers)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
@ -127,9 +155,9 @@ class TestUnitApiZoneUser(object):
def test_delete_zone(self, client, common_data_mock, zone_data, def test_delete_zone(self, client, common_data_mock, zone_data,
basic_auth_user_headers): basic_auth_user_headers):
with patch('powerdnsadmin.lib.utils.requests.request') as mock_delete, \ with patch('powerdnsadmin.lib.helper.requests.request') as mock_delete, \
patch('powerdnsadmin.routes.api.Domain') as mock_domain, \ patch('powerdnsadmin.routes.api.Domain') as mock_domain, \
patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains: patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains:
test_domain = Domain(1, name=zone_data['name'].rstrip(".")) test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.return_value.update.return_value = True mock_domain.return_value.update.return_value = True

View File

@ -1,7 +1,6 @@
import json import json
import pytest import pytest
from unittest.mock import patch from unittest.mock import patch
from base64 import b64encode
from collections import namedtuple from collections import namedtuple
import powerdnsadmin import powerdnsadmin
@ -10,78 +9,111 @@ from powerdnsadmin.models.domain import Domain
from powerdnsadmin.models.api_key import ApiKey from powerdnsadmin.models.api_key import ApiKey
from powerdnsadmin.models.role import Role from powerdnsadmin.models.role import Role
from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, initial_data, created_zone_data from tests.conftest import user_apikey_data, load_data
from tests.fixtures import user_apikey, zone_data
from tests.fixtures import user_apikey_data, load_data
class TestUnitApiZoneUserApiKey(object): class TestUnitApiZoneUserApiKey(object):
@pytest.fixture @pytest.fixture
def common_data_mock(self): def common_data_mock(self, app):
self.google_setting_patcher = patch( with app.app_context():
'powerdnsadmin.services.google.Setting', self.google_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.services.google.Setting',
self.github_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.services.github.Setting', self.github_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.services.github.Setting',
self.oidc_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.services.oidc.Setting', self.azure_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.services.azure.Setting',
self.helpers_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.lib.helper.Setting', self.oidc_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.services.oidc.Setting',
self.models_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.models.setting.Setting', self.helpers_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.lib.helper.Setting',
self.domain_model_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.models.domain.Setting', self.models_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.models.setting.Setting',
self.record_model_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.models.record.Setting', self.domain_model_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.models.domain.Setting',
self.server_model_setting_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.models.server.Setting', self.record_model_setting_patcher = patch(
spec=powerdnsadmin.models.setting.Setting) 'powerdnsadmin.models.record.Setting',
self.mock_apikey_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.decorators.ApiKey', self.server_model_setting_patcher = patch(
spec=powerdnsadmin.models.api_key.ApiKey) 'powerdnsadmin.models.server.Setting',
self.mock_hist_patcher = patch( spec=powerdnsadmin.models.setting.Setting)
'powerdnsadmin.routes.api.History', self.mock_apikey_patcher = patch(
spec=powerdnsadmin.models.history.History) 'powerdnsadmin.decorators.ApiKey',
spec=powerdnsadmin.models.api_key.ApiKey)
self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_google_setting = self.google_setting_patcher.start() self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start() self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start() self.mock_azure_setting = self.azure_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start() self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start() self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start( self.mock_models_setting = self.models_setting_patcher.start()
) self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
self.mock_record_model_setting = self.record_model_setting_patcher.start( )
) self.mock_record_model_setting = self.record_model_setting_patcher.start(
self.mock_server_model_setting = self.server_model_setting_patcher.start( )
) self.mock_server_model_setting = self.server_model_setting_patcher.start(
self.mock_apikey = self.mock_apikey_patcher.start() )
self.mock_hist = self.mock_hist_patcher.start() self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_decorators_setting = self.mock_decorators_setting_patcher.start()
self.mock_google_setting.return_value.get.side_effect = load_data self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data self.mock_azure_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_decorators_setting.return_value.get.side_effect = load_data
data = user_apikey_data() data = user_apikey_data()
domain = Domain(name=data['domains'][0]) domain = Domain(name=data['domains'][0])
api_key = ApiKey(desc=data['description'], api_key = ApiKey(desc=data['description'],
role_name=data['role'], role_name=data['role'],
domains=[domain]) domains=[domain])
api_key.role = Role(name=data['role']) api_key.role = Role(name=data['role'])
self.mock_apikey.return_value.is_validate.return_value = api_key
yield
for patcher in [
self.google_setting_patcher,
self.github_setting_patcher,
self.azure_setting_patcher,
self.oidc_setting_patcher,
self.helpers_setting_patcher,
self.models_setting_patcher,
self.domain_model_setting_patcher,
self.record_model_setting_patcher,
self.server_model_setting_patcher,
self.mock_apikey_patcher,
self.mock_hist_patcher,
self.mock_setting_patcher,
self.mock_decorators_setting_patcher,
]:
patcher.stop()
self.mock_apikey.return_value.is_validate.return_value = api_key
def test_create_zone(self, client, common_data_mock, zone_data, def test_create_zone(self, client, common_data_mock, zone_data,
user_apikey, created_zone_data): user_apikey, created_zone_data):
@ -112,8 +144,7 @@ class TestUnitApiZoneUserApiKey(object):
headers=user_apikey) headers=user_apikey)
data = res.get_json(force=True) data = res.get_json(force=True)
fake_domain = namedtuple("Domain", fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True) domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain])) json.dumps(domain_schema.dump([fake_domain]))