diff --git a/docker-compose-test.yml b/docker-compose-test.yml index 7dcf4a0..77c8dba 100644 --- a/docker-compose-test.yml +++ b/docker-compose-test.yml @@ -1,11 +1,11 @@ -version: "2.1" +version: "3.8" services: powerdns-admin: + image: powerdns-admin-test build: context: . dockerfile: docker-test/Dockerfile - image: powerdns-admin-test container_name: powerdns-admin-test ports: - "9191:80" @@ -17,10 +17,10 @@ services: - pdns-server pdns-server: + image: pdns-server-test build: context: . dockerfile: docker-test/Dockerfile.pdns - image: pdns-server-test ports: - "5053:53" - "5053:53/udp" diff --git a/docker-test/Dockerfile b/docker-test/Dockerfile index 577e120..17f934f 100644 --- a/docker-test/Dockerfile +++ b/docker-test/Dockerfile @@ -1,15 +1,35 @@ -FROM debian:stretch-slim +FROM debian:bullseye-slim LABEL maintainer="k@ndk.name" ENV LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8 LANGUAGE=en_US.UTF-8 RUN apt-get update -y \ - && apt-get install -y --no-install-recommends apt-transport-https locales locales-all python3-pip python3-setuptools python3-dev curl libsasl2-dev libldap2-dev libssl-dev libxml2-dev libxslt1-dev libxmlsec1-dev libffi-dev build-essential libmariadb-dev-compat \ - && curl -sL https://deb.nodesource.com/setup_10.x | bash - \ + && apt-get install -y --no-install-recommends \ + apt-transport-https \ + curl \ + build-essential \ + libffi-dev \ + libldap2-dev \ + libmariadb-dev-compat \ + libsasl2-dev \ + libssl-dev \ + libxml2-dev \ + libxmlsec1-dev \ + libxmlsec1-openssl \ + libxslt1-dev \ + locales \ + locales-all \ + pkg-config \ + python3-dev \ + python3-pip \ + python3-setuptools \ + && curl -sL https://deb.nodesource.com/setup_lts.x | bash - \ && curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \ && echo "deb https://dl.yarnpkg.com/debian/ stable main" > /etc/apt/sources.list.d/yarn.list \ && apt-get update -y \ - && apt-get install -y nodejs yarn \ + && apt-get install -y --no-install-recommends \ + nodejs \ + yarn \ && apt-get clean -y \ && rm -rf /var/lib/apt/lists/* @@ -21,8 +41,6 @@ RUN pip3 install --upgrade pip RUN pip3 install -r requirements.txt COPY . /app -COPY ./docker/entrypoint.sh /usr/local/bin/ -RUN chmod +x /usr/local/bin/entrypoint.sh ENV FLASK_APP=powerdnsadmin/__init__.py RUN yarn install --pure-lockfile --production \ @@ -31,4 +49,4 @@ RUN yarn install --pure-lockfile --production \ COPY ./docker-test/wait-for-pdns.sh /opt RUN chmod u+x /opt/wait-for-pdns.sh -CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest","--capture=no","-vv"] +CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest", "-W", "ignore::DeprecationWarning", "--capture=no", "-vv"] diff --git a/docker-test/start.sh b/docker-test/start.sh index 9a66017..efd1c0e 100644 --- a/docker-test/start.sh +++ b/docker-test/start.sh @@ -10,9 +10,9 @@ fi # Import schema structure if [ -e "/data/pdns.sql" ]; then - rm /data/pdns.db + rm -f /data/pdns.db cat /data/pdns.sql | sqlite3 /data/pdns.db - rm /data/pdns.sql + rm -f /data/pdns.sql echo "Imported schema structure" fi diff --git a/migrations/env.py b/migrations/env.py index 4742e14..8b3fb33 100755 --- a/migrations/env.py +++ b/migrations/env.py @@ -1,8 +1,13 @@ from __future__ import with_statement -from alembic import context -from sqlalchemy import engine_from_config, pool -from logging.config import fileConfig + import logging +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool +from flask import current_app + +from alembic import context # this is the Alembic Config object, which provides # access to the values within the .ini file in use. @@ -17,9 +22,9 @@ logger = logging.getLogger('alembic.env') # for 'autogenerate' support # from myapp import mymodel # target_metadata = mymodel.Base.metadata -from flask import current_app -config.set_main_option('sqlalchemy.url', - current_app.config.get('SQLALCHEMY_DATABASE_URI').replace("%","%%")) +config.set_main_option( + 'sqlalchemy.url', + str(current_app.extensions['migrate'].db.engine.url).replace('%', '%%')) target_metadata = current_app.extensions['migrate'].db.metadata # other values from the config, defined by the needs of env.py, @@ -41,7 +46,9 @@ def run_migrations_offline(): """ url = config.get_main_option("sqlalchemy.url") - context.configure(url=url) + context.configure( + url=url, target_metadata=target_metadata, literal_binds=True + ) with context.begin_transaction(): context.run_migrations() @@ -65,22 +72,23 @@ def run_migrations_online(): directives[:] = [] logger.info('No changes in schema detected.') - engine = engine_from_config(config.get_section(config.config_ini_section), - prefix='sqlalchemy.', - poolclass=pool.NullPool) + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix='sqlalchemy.', + poolclass=pool.NullPool, + ) - connection = engine.connect() - context.configure(connection=connection, - target_metadata=target_metadata, - process_revision_directives=process_revision_directives, - render_as_batch=config.get_main_option('sqlalchemy.url').startswith('sqlite:'), - **current_app.extensions['migrate'].configure_args) + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata, + process_revision_directives=process_revision_directives, + **current_app.extensions['migrate'].configure_args + ) - try: with context.begin_transaction(): context.run_migrations() - finally: - connection.close() + if context.is_offline_mode(): run_migrations_offline() diff --git a/migrations/versions/787bdba9e147_init_db.py b/migrations/versions/787bdba9e147_init_db.py index aa781de..c4c7aa2 100644 --- a/migrations/versions/787bdba9e147_init_db.py +++ b/migrations/versions/787bdba9e147_init_db.py @@ -56,9 +56,9 @@ def seed_data(): op.bulk_insert(template_table, [ - {id: 1, 'name': 'basic_template_1', 'description': 'Basic Template #1'}, - {id: 2, 'name': 'basic_template_2', 'description': 'Basic Template #2'}, - {id: 3, 'name': 'basic_template_3', 'description': 'Basic Template #3'} + {'id': 1, 'name': 'basic_template_1', 'description': 'Basic Template #1'}, + {'id': 2, 'name': 'basic_template_2', 'description': 'Basic Template #2'}, + {'id': 3, 'name': 'basic_template_3', 'description': 'Basic Template #3'} ] ) diff --git a/migrations/versions/b24bf17725d2_add_unique_index_to_settings_table_keys.py b/migrations/versions/b24bf17725d2_add_unique_index_to_settings_table_keys.py new file mode 100644 index 0000000..48cfbe9 --- /dev/null +++ b/migrations/versions/b24bf17725d2_add_unique_index_to_settings_table_keys.py @@ -0,0 +1,24 @@ +"""Add unique index to settings table keys + +Revision ID: b24bf17725d2 +Revises: 0967658d9c0d +Create Date: 2021-12-12 20:29:17.103441 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'b24bf17725d2' +down_revision = '0967658d9c0d' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_index(op.f('ix_setting_name'), 'setting', ['name'], unique=True) + + +def downgrade(): + op.drop_index(op.f('ix_setting_name'), table_name='setting') diff --git a/powerdnsadmin/decorators.py b/powerdnsadmin/decorators.py index 6382141..50e4a3f 100644 --- a/powerdnsadmin/decorators.py +++ b/powerdnsadmin/decorators.py @@ -388,7 +388,7 @@ def apikey_can_configure_dnssec(http_methods=[]): def allowed_record_types(f): @wraps(f) def decorated_function(*args, **kwargs): - if request.method == 'GET': + if request.method in ['GET', 'DELETE', 'PUT']: return f(*args, **kwargs) if g.apikey.role.name in ['Administrator', 'Operator']: diff --git a/powerdnsadmin/models/domain.py b/powerdnsadmin/models/domain.py index ab154c1..500ad74 100644 --- a/powerdnsadmin/models/domain.py +++ b/powerdnsadmin/models/domain.py @@ -2,6 +2,7 @@ import json import re import traceback from flask import current_app +from flask_login import current_user from urllib.parse import urljoin from distutils.util import strtobool @@ -548,11 +549,12 @@ class Domain(db.Model): domain.apikeys[:] = [] # Remove history for domain - domain_history = History.query.filter( - History.domain_id == domain.id - ) - if domain_history: - domain_history.delete() + if not Setting().get('preserve_history'): + domain_history = History.query.filter( + History.domain_id == domain.id + ) + if domain_history: + domain_history.delete() # then remove domain Domain.query.filter(Domain.name == domain_name).delete() @@ -851,6 +853,7 @@ class Domain(db.Model): headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'} + account_name_old = Account().get_name_by_id(domain.account_id) account_name = Account().get_name_by_id(account_id) post_data = {"account": account_name} @@ -874,6 +877,13 @@ class Domain(db.Model): self.update() msg_str = 'Account changed for domain {0} successfully' current_app.logger.info(msg_str.format(domain_name)) + history = History(msg='Update domain {0} associate account {1}'.format(domain.name, 'none' if account_name == '' else account_name), + detail = json.dumps({ + 'assoc_account': 'None' if account_name == '' else account_name, + 'dissoc_account': 'None' if account_name_old == '' else account_name_old + }), + created_by=current_user.username) + history.add() return {'status': 'ok', 'msg': 'account changed successfully'} except Exception as e: diff --git a/powerdnsadmin/models/record.py b/powerdnsadmin/models/record.py index b6c3f54..ae70a9e 100644 --- a/powerdnsadmin/models/record.py +++ b/powerdnsadmin/models/record.py @@ -422,6 +422,25 @@ class Record(object): ] d = Domain() + for r in del_rrsets: + for record in r['records']: + # Format the reverse record name + # It is the reverse of forward record's content. + reverse_host_address = dns.reversename.from_address( + record['content']).to_text() + + # Create the reverse domain name in PDNS + domain_reverse_name = d.get_reverse_domain_name( + reverse_host_address) + d.create_reverse_domain(domain_name, + domain_reverse_name) + + # Delete the reverse zone + self.name = reverse_host_address + self.type = 'PTR' + self.data = record['content'] + self.delete(domain_reverse_name) + for r in new_rrsets: for record in r['records']: # Format the reverse record name @@ -455,25 +474,6 @@ class Record(object): # Format the rrset rrset = {"rrsets": rrset_data} self.add(domain_reverse_name, rrset) - - for r in del_rrsets: - for record in r['records']: - # Format the reverse record name - # It is the reverse of forward record's content. - reverse_host_address = dns.reversename.from_address( - record['content']).to_text() - - # Create the reverse domain name in PDNS - domain_reverse_name = d.get_reverse_domain_name( - reverse_host_address) - d.create_reverse_domain(domain_name, - domain_reverse_name) - - # Delete the reverse zone - self.name = reverse_host_address - self.type = 'PTR' - self.data = record['content'] - self.delete(domain_reverse_name) return { 'status': 'ok', 'msg': 'Auto-PTR record was updated successfully' diff --git a/powerdnsadmin/models/role.py b/powerdnsadmin/models/role.py index a5cf530..5440f3d 100644 --- a/powerdnsadmin/models/role.py +++ b/powerdnsadmin/models/role.py @@ -5,7 +5,7 @@ class Role(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), index=True, unique=True) description = db.Column(db.String(128)) - users = db.relationship('User', backref='role', lazy=True) + users = db.relationship('User', back_populates='role', lazy=True) apikeys = db.relationship('ApiKey', back_populates='role', lazy=True) def __init__(self, id=None, name=None, description=None): @@ -20,4 +20,4 @@ class Role(db.Model): self.description = description def __repr__(self): - return ''.format(self.name) + return ''.format(self.name) diff --git a/powerdnsadmin/models/setting.py b/powerdnsadmin/models/setting.py index a4016bf..71fec98 100644 --- a/powerdnsadmin/models/setting.py +++ b/powerdnsadmin/models/setting.py @@ -11,7 +11,7 @@ from .base import db class Setting(db.Model): id = db.Column(db.Integer, primary_key=True) - name = db.Column(db.String(64)) + name = db.Column(db.String(64), unique=True, index=True) value = db.Column(db.Text()) defaults = { @@ -31,6 +31,7 @@ class Setting(db.Model): 'delete_sso_accounts': False, 'bg_domain_updates': False, 'enable_api_rr_history': True, + 'preserve_history': False, 'site_name': 'PowerDNS-Admin', 'site_url': 'http://localhost:9191', 'session_timeout': 10, diff --git a/powerdnsadmin/models/user.py b/powerdnsadmin/models/user.py index 1e39569..78104f6 100644 --- a/powerdnsadmin/models/user.py +++ b/powerdnsadmin/models/user.py @@ -34,6 +34,7 @@ class User(db.Model): otp_secret = db.Column(db.String(16)) confirmed = db.Column(db.SmallInteger, nullable=False, default=0) role_id = db.Column(db.Integer, db.ForeignKey('role.id')) + role = db.relationship('Role', back_populates="users", lazy=True) accounts = None def __init__(self, diff --git a/powerdnsadmin/routes/admin.py b/powerdnsadmin/routes/admin.py index 5b8af61..98f4ef9 100644 --- a/powerdnsadmin/routes/admin.py +++ b/powerdnsadmin/routes/admin.py @@ -610,14 +610,21 @@ def manage_user(): @operator_role_required def edit_account(account_name=None): users = User.query.all() + account = Account.query.filter( + Account.name == account_name).first() + all_accounts = Account.query.all() + accounts = {acc.id: acc for acc in all_accounts} + domains = Domain.query.all() if request.method == 'GET': - if account_name is None: + if account_name is None or not account: return render_template('admin_edit_account.html', + account=None, account_user_ids=[], users=users, + domains=domains, + accounts=accounts, create=1) - else: account = Account.query.filter( Account.name == account_name).first() @@ -626,11 +633,14 @@ def edit_account(account_name=None): account=account, account_user_ids=account_user_ids, users=users, + domains=domains, + accounts=accounts, create=0) if request.method == 'POST': fdata = request.form new_user_list = request.form.getlist('account_multi_user') + new_domain_list = request.form.getlist('account_domains') # on POST, synthesize account and account_user_ids from form data if not account_name: @@ -654,6 +664,8 @@ def edit_account(account_name=None): account=account, account_user_ids=account_user_ids, users=users, + domains=domains, + accounts=accounts, create=create, invalid_accountname=True) @@ -662,19 +674,33 @@ def edit_account(account_name=None): account=account, account_user_ids=account_user_ids, users=users, + domains=domains, + accounts=accounts, create=create, duplicate_accountname=True) result = account.create_account() - history = History(msg='Create account {0}'.format(account.name), - created_by=current_user.username) - else: result = account.update_account() - history = History(msg='Update account {0}'.format(account.name), - created_by=current_user.username) if result['status']: + account = Account.query.filter( + Account.name == account_name).first() + old_domains = Domain.query.filter(Domain.account_id == account.id).all() + + for domain_name in new_domain_list: + domain = Domain.query.filter( + Domain.name == domain_name).first() + if account.id != domain.account_id: + Domain(name=domain_name).assoc_account(account.id) + + for domain in old_domains: + if domain.name not in new_domain_list: + Domain(name=domain.name).assoc_account(None) + + history = History(msg='{0} account {1}'.format('Create' if create else 'Update', account.name), + created_by=current_user.username) + account.grant_privileges(new_user_list) history.add() return redirect(url_for('admin.manage_account')) @@ -891,6 +917,16 @@ class DetailedHistory(): ''', history_status=DetailedHistory.get_key_val(detail_dict, 'status'), history_msg=DetailedHistory.get_key_val(detail_dict, 'msg')) + + elif 'Update domain' in history.msg and 'associate account' in history.msg: # When an account gets associated or dissociate with domains + self.detailed_msg = render_template_string(''' + + + +
Associate: {{ history_assoc_account }}
Dissociate:{{ history_dissoc_account }}
+ ''', + history_assoc_account=DetailedHistory.get_key_val(detail_dict, 'assoc_account'), + history_dissoc_account=DetailedHistory.get_key_val(detail_dict, 'dissoc_account')) # check for lower key as well for old databases @staticmethod @@ -928,6 +964,13 @@ def history(): 'msg': 'You do not have permission to remove history.' }), 401) + if Setting().get('preserve_history'): + return make_response( + jsonify({ + 'status': 'error', + 'msg': 'History removal is not allowed (toggle preserve_history in settings).' + }), 401) + h = History() result = h.remove_all() if result: @@ -1283,6 +1326,7 @@ def setting_basic(): 'otp_field_enabled', 'otp_force', 'pdns_api_timeout', + 'preserve_history', 'pretty_ipv6_ptr', 'record_helper', 'record_quick_edit', diff --git a/powerdnsadmin/routes/api.py b/powerdnsadmin/routes/api.py index 3df31e1..4a18c1d 100644 --- a/powerdnsadmin/routes/api.py +++ b/powerdnsadmin/routes/api.py @@ -1,22 +1,21 @@ import json -from urllib.parse import urljoin +import secrets +import string from base64 import b64encode -from flask import ( - Blueprint, g, request, abort, current_app, make_response, jsonify, -) +from urllib.parse import urljoin + +from flask import (Blueprint, g, request, abort, current_app, make_response, jsonify) from flask_login import current_user from .base import csrf -from ..models.base import db -from ..models import ( - User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey, - Role, +from ..decorators import ( + api_basic_auth, api_can_create_domain, is_json, apikey_auth, + apikey_can_create_domain, apikey_can_remove_domain, + apikey_is_admin, apikey_can_access_domain, apikey_can_configure_dnssec, + api_role_can, apikey_or_basic_auth, + callback_if_request_body_contains_key, allowed_record_types, allowed_record_ttl ) from ..lib import utils, helper -from ..lib.schema import ( - ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema, - UserDetailedSchema, -) from ..lib.errors import ( StructuredException, DomainNotExists, DomainAlreadyExists, DomainAccessForbidden, @@ -26,15 +25,15 @@ from ..lib.errors import ( UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail, UserUpdateFailEmail, InvalidAccountNameException ) -from ..decorators import ( - api_basic_auth, api_can_create_domain, is_json, apikey_auth, - apikey_can_create_domain, apikey_can_remove_domain, - apikey_is_admin, apikey_can_access_domain, apikey_can_configure_dnssec, - api_role_can, apikey_or_basic_auth, - callback_if_request_body_contains_key, allowed_record_types, allowed_record_ttl +from ..lib.schema import ( + ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema, + UserDetailedSchema, ) -import secrets -import string +from ..models import ( + User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey, + Role, +) +from ..models.base import db api_bp = Blueprint('api', __name__, url_prefix='/api/v1') apilist_bp = Blueprint('apilist', __name__, url_prefix='/') @@ -56,10 +55,10 @@ def get_user_domains(): .outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .filter( - db.or_( - DomainUser.user_id == current_user.id, - AccountUser.user_id == current_user.id - )).all() + db.or_( + DomainUser.user_id == current_user.id, + AccountUser.user_id == current_user.id + )).all() return domains @@ -71,10 +70,10 @@ def get_user_apikeys(domain_name=None): .outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .filter( - db.or_( - DomainUser.user_id == User.id, - AccountUser.user_id == User.id - ) + db.or_( + DomainUser.user_id == User.id, + AccountUser.user_id == User.id + ) ) \ .filter(User.id == current_user.id) @@ -167,12 +166,7 @@ def handle_request_is_not_json(err): def before_request(): # Check site is in maintenance mode maintenance = Setting().get('maintenance') - if ( - maintenance and current_user.is_authenticated and - current_user.role.name not in [ - 'Administrator', 'Operator' - ] - ): + if (maintenance and current_user.is_authenticated and current_user.role.name not in ['Administrator', 'Operator']): return make_response( jsonify({ "status": False, @@ -224,14 +218,13 @@ def api_login_create_zone(): history = History(msg='Add domain {0}'.format( data['name'].rstrip('.')), - detail=json.dumps(data), - created_by=current_user.username, - domain_id=domain_id) + detail=json.dumps(data), + created_by=current_user.username, + domain_id=domain_id) history.add() if current_user.role.name not in ['Administrator', 'Operator']: - current_app.logger.debug( - "User is ordinary user, assigning created domain") + current_app.logger.debug("User is ordinary user, assigning created domain") domain = Domain(name=data['name'].rstrip('.')) domain.update() domain.grant_privileges([current_user.id]) @@ -299,9 +292,9 @@ def api_login_delete_zone(domain_name): history = History(msg='Delete domain {0}'.format( utils.pretty_domain_name(domain_name)), - detail='', - created_by=current_user.username, - domain_id=domain_id) + detail='', + created_by=current_user.username, + domain_id=domain_id) history.add() except Exception as e: @@ -326,14 +319,14 @@ def api_generate_apikey(): if 'domains' not in data: domains = [] - elif not isinstance(data['domains'], (list, )): + elif not isinstance(data['domains'], (list,)): abort(400) else: domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']] if 'accounts' not in data: accounts = [] - elif not isinstance(data['accounts'], (list, )): + elif not isinstance(data['accounts'], (list,)): abort(400) else: accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']] @@ -385,8 +378,7 @@ def api_generate_apikey(): user_domain_list = [item.name for item in user_domain_obj_list] current_app.logger.debug("Input domain list: {0}".format(domain_list)) - current_app.logger.debug( - "User domain list: {0}".format(user_domain_list)) + current_app.logger.debug("User domain list: {0}".format(user_domain_list)) inter = set(domain_list).intersection(set(user_domain_list)) @@ -539,14 +531,14 @@ def api_update_apikey(apikey_id): if 'domains' not in data: domains = None - elif not isinstance(data['domains'], (list, )): + elif not isinstance(data['domains'], (list,)): abort(400) else: domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']] if 'accounts' not in data: accounts = None - elif not isinstance(data['accounts'], (list, )): + elif not isinstance(data['accounts'], (list,)): abort(400) else: accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']] @@ -963,9 +955,7 @@ def api_delete_account(account_id): account = account_list[0] else: abort(404) - current_app.logger.debug( - f'Deleting Account {account.name}' - ) + current_app.logger.debug(f'Deleting Account {account.name}') # Remove account association from domains first if len(account.domains) > 0: @@ -1047,7 +1037,7 @@ def api_remove_account_user(account_id, user_id): user_list = User.query.join(AccountUser).filter( AccountUser.account_id == account_id, AccountUser.user_id == user_id, - ).all() + ).all() if not user_list: abort(404) if not account.remove_user(user): @@ -1123,9 +1113,9 @@ def api_zone_forward(server_id, zone_id): history = History(msg='{0} zone {1} record of {2}'.format( rrset_data['changetype'].lower(), rrset_data['type'], rrset_data['name'].rstrip('.')), - detail=json.dumps(data), - created_by=g.apikey.description, - domain_id=Domain().get_id_by_name(zone_id.rstrip('.'))) + detail=json.dumps(data), + created_by=g.apikey.description, + domain_id=Domain().get_id_by_name(zone_id.rstrip('.'))) history.add() elif request.method == 'DELETE': history = History(msg='Deleted zone {0}'.format(zone_id.rstrip('.')), @@ -1192,17 +1182,13 @@ def api_get_zones(server_id): return jsonify(domain_schema.dump(domain_obj_list)), 200 else: resp = helper.forward_request() - if ( - g.apikey.role.name not in ['Administrator', 'Operator'] - and resp.status_code == 200 - ): + if (g.apikey.role.name not in ['Administrator', 'Operator'] and resp.status_code == 200): domain_list = [d['name'] for d in domain_schema.dump(g.apikey.domains)] accounts_domains = [d.name for a in g.apikey.accounts for d in a.domains] allowed_domains = set(domain_list + accounts_domains) - current_app.logger.debug("Account domains: {}".format( - '/'.join(accounts_domains))) + current_app.logger.debug("Account domains: {}".format('/'.join(accounts_domains))) content = json.dumps([i for i in json.loads(resp.content) if i['name'].rstrip('.') in allowed_domains]) return content, resp.status_code, resp.headers.items() @@ -1223,6 +1209,7 @@ def api_server_config_forward(server_id): resp = helper.forward_request() return resp.content, resp.status_code, resp.headers.items() + # The endpoint to synchronize Domains in background @api_bp.route('/sync_domains', methods=['GET']) @apikey_or_basic_auth @@ -1231,6 +1218,7 @@ def sync_domains(): domain.update() return 'Finished synchronization in background', 200 + @api_bp.route('/health', methods=['GET']) @apikey_auth def health(): @@ -1244,7 +1232,8 @@ def health(): try: domain.get_domain_info(domain_to_query.name) except Exception as e: - current_app.logger.error("Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name)) + current_app.logger.error( + "Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name)) return make_response("Down", 503) return make_response("Up", 200) diff --git a/powerdnsadmin/routes/domain.py b/powerdnsadmin/routes/domain.py index 87d0be4..9742760 100644 --- a/powerdnsadmin/routes/domain.py +++ b/powerdnsadmin/routes/domain.py @@ -89,14 +89,14 @@ def domain(domain_name): # - Find a way to make it consistent, or # - Only allow one comment for that case if StrictVersion(Setting().get('pdns_version')) >= StrictVersion('4.0.0'): + pretty_v6 = Setting().get('pretty_ipv6_ptr') for r in rrsets: if r['type'] in records_allow_to_edit: r_name = r['name'].rstrip('.') # If it is reverse zone and pretty_ipv6_ptr setting # is enabled, we reformat the name for ipv6 records. - if Setting().get('pretty_ipv6_ptr') and r[ - 'type'] == 'PTR' and 'ip6.arpa' in r_name and '*' not in r_name: + if pretty_v6 and r['type'] == 'PTR' and 'ip6.arpa' in r_name and '*' not in r_name: r_name = dns.reversename.to_address( dns.name.from_text(r_name)) diff --git a/powerdnsadmin/routes/index.py b/powerdnsadmin/routes/index.py index 2c2dfbd..14ad275 100644 --- a/powerdnsadmin/routes/index.py +++ b/powerdnsadmin/routes/index.py @@ -581,7 +581,7 @@ def get_azure_groups(uri): def authenticate_user(user, authenticator, remember=False): login_user(user, remember=remember) signin_history(user.username, authenticator, True) - if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret: + if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret and session['authentication_type'] not in ['OAuth']: user.update_profile(enable_otp=True) user_id = current_user.id prepare_welcome_user(user_id) diff --git a/requirements.txt b/requirements.txt index b540289..ca5af41 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,14 @@ -Flask==2.1.3 +Flask==2.2.2 Flask-Assets==2.0 Flask-Login==0.6.2 -Flask-SQLAlchemy==2.5.1 -Flask-Migrate==2.5.3 -SQLAlchemy==1.3.24 -mysqlclient==2.0.1 +Flask-SQLAlchemy==3.0.2 +Flask-Migrate==4.0.0 +Flask-Mail==0.9.1 +Flask-SSLify==0.1.5 +Flask-Session==0.4.0 +Flask-SeaSurf==1.1.1 +SQLAlchemy==1.4.45 +mysqlclient==2.1.1 configobj==5.0.8 bcrypt==4.0.1 requests==2.28.2 @@ -13,22 +17,30 @@ pyotp==2.8.0 qrcode==7.3.1 dnspython>=2.3.0 gunicorn==20.1.0 -python3-saml +python3-saml==1.14.0 pytz==2022.7.1 cssmin==0.2.0 rjsmin==1.2.1 Authlib==1.2.0 -Flask-SeaSurf==1.1.1 bravado-core==5.17.1 lima==0.5 pytest==7.2.1 +jsonschema[format]>=2.5.1,<4.0.0 # until https://github.com/Yelp/bravado-core/pull/385 pytimeparse==1.1.8 -PyYAML==5.4 -Flask-SSLify==0.1.5 -Flask-Mail==0.9.1 -flask-session==0.4.0 +alembic==1.9.0 +certifi==2022.12.7 +cffi==1.15.1 +passlib==1.7.4 +pyasn1==0.4.8 +webcolors==1.12 +zipp==3.11.0 +pyOpenSSL==22.1.0 +Authlib==1.2.0 +Flask-SeaSurf==1.1.1 +PyYAML==6.0 Jinja2==3.1.2 itsdangerous==2.1.2 werkzeug==2.1.2 cryptography==36.0.2 -flask_session_captcha==1.3.0 \ No newline at end of file +flask_session_captcha==1.3.0 +lxml==4.6.5 \ No newline at end of file diff --git a/tests/fixtures.py b/tests/conftest.py similarity index 50% rename from tests/fixtures.py rename to tests/conftest.py index 39b6a70..f5de9e8 100644 --- a/tests/fixtures.py +++ b/tests/conftest.py @@ -1,26 +1,28 @@ import os -import pytest -import flask_migrate from base64 import b64encode -from powerdnsadmin import create_app -from powerdnsadmin.models.base import db -from powerdnsadmin.models.user import User -from powerdnsadmin.models.setting import Setting -from powerdnsadmin.models.api_key import ApiKey +import pytest +from flask_migrate import upgrade as flask_migrate_upgrade -app = create_app('../configs/test.py') -ctx = app.app_context() -ctx.push() +from powerdnsadmin import create_app +from powerdnsadmin.models.api_key import ApiKey +from powerdnsadmin.models.base import db +from powerdnsadmin.models.setting import Setting +from powerdnsadmin.models.user import User + + +@pytest.fixture(scope="session") +def app(): + app = create_app('../configs/test.py') + yield app @pytest.fixture -def client(): +def client(app): app.config['TESTING'] = True client = app.test_client() yield client - def load_data(setting_name, *args, **kwargs): if setting_name == 'maintenance': return 0 @@ -36,20 +38,22 @@ def load_data(setting_name, *args, **kwargs): return 10 if setting_name == 'allow_user_create_domain': return True + if setting_name == 'allow_user_remove_domain': + return True @pytest.fixture -def test_admin_user(): +def test_admin_user(app): return app.config.get('TEST_ADMIN_USER') @pytest.fixture -def test_user(): +def test_user(app): return app.config.get('TEST_USER') @pytest.fixture -def basic_auth_admin_headers(): +def basic_auth_admin_headers(app): test_admin_user = app.config.get('TEST_ADMIN_USER') test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') user_pass = "{0}:{1}".format(test_admin_user, test_admin_pass) @@ -61,7 +65,7 @@ def basic_auth_admin_headers(): @pytest.fixture -def basic_auth_user_headers(): +def basic_auth_user_headers(app): test_user = app.config.get('TEST_USER') test_user_pass = app.config.get('TEST_USER_PASSWORD') user_pass = "{0}:{1}".format(test_user, test_user_pass) @@ -73,7 +77,8 @@ def basic_auth_user_headers(): @pytest.fixture(scope="module") -def initial_data(): +def initial_data(app): + pdns_proto = os.environ['PDNS_PROTO'] pdns_host = os.environ['PDNS_HOST'] pdns_port = os.environ['PDNS_PORT'] @@ -83,46 +88,44 @@ def initial_data(): api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY']) allow_create_domain_setting = Setting('allow_user_create_domain', True) - try: - flask_migrate.upgrade() + with app.app_context(): + try: + flask_migrate_upgrade(directory="migrations") + db.session.add(api_url_setting) + db.session.add(api_key_setting) + db.session.add(allow_create_domain_setting) - db.session.add(api_url_setting) - db.session.add(api_key_setting) - db.session.add(allow_create_domain_setting) + test_user = app.config.get('TEST_USER') + test_user_pass = app.config.get('TEST_USER_PASSWORD') + test_admin_user = app.config.get('TEST_ADMIN_USER') + test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') - test_user = app.config.get('TEST_USER') - test_user_pass = app.config.get('TEST_USER_PASSWORD') - test_admin_user = app.config.get('TEST_ADMIN_USER') - test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') + admin_user = User(username=test_admin_user, + plain_text_password=test_admin_pass, + email="admin@admin.com") + ret = admin_user.create_local_user() - admin_user = User(username=test_admin_user, - plain_text_password=test_admin_pass, - email="admin@admin.com") - msg = admin_user.create_local_user() + if not ret['status']: + raise Exception("Error occurred creating user {0}".format(ret['msg'])) - if not msg: - raise Exception("Error occurred creating user {0}".format(msg)) + ordinary_user = User(username=test_user, + plain_text_password=test_user_pass, + email="test@test.com") + ret = ordinary_user.create_local_user() - ordinary_user = User(username=test_user, - plain_text_password=test_user_pass, - email="test@test.com") - msg = ordinary_user.create_local_user() + if not ret['status']: + raise Exception("Error occurred creating user {0}".format(ret['msg'])) - if not msg: - raise Exception("Error occurred creating user {0}".format(msg)) - - except Exception as e: - print("Unexpected ERROR: {0}".format(e)) - raise e + except Exception as e: + print("Unexpected ERROR: {0}".format(e)) + raise e yield - - db.session.close() os.unlink(app.config['TEST_DB_LOCATION']) @pytest.fixture(scope="module") -def initial_apikey_data(): +def initial_apikey_data(app): pdns_proto = os.environ['PDNS_PROTO'] pdns_host = os.environ['PDNS_HOST'] pdns_port = os.environ['PDNS_PORT'] @@ -131,42 +134,42 @@ def initial_apikey_data(): api_url_setting = Setting('pdns_api_url', pdns_api_url) api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY']) allow_create_domain_setting = Setting('allow_user_create_domain', True) + allow_remove_domain_setting = Setting('allow_user_remove_domain', True) - try: - flask_migrate.upgrade() + with app.app_context(): + try: + flask_migrate_upgrade(directory="migrations") + db.session.add(api_url_setting) + db.session.add(api_key_setting) + db.session.add(allow_create_domain_setting) + db.session.add(allow_remove_domain_setting) - db.session.add(api_url_setting) - db.session.add(api_key_setting) - db.session.add(allow_create_domain_setting) + test_user_apikey = app.config.get('TEST_USER_APIKEY') + test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY') - test_user_apikey = app.config.get('TEST_USER_APIKEY') - test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY') + dummy_apikey = ApiKey(desc="dummy", role_name="Administrator") - dummy_apikey = ApiKey(desc="dummy", role_name="Administrator") + admin_key = dummy_apikey.get_hashed_password( + plain_text_password=test_admin_apikey).decode('utf-8') - admin_key = dummy_apikey.get_hashed_password( - plain_text_password=test_admin_apikey).decode('utf-8') + admin_apikey = ApiKey(key=admin_key, + desc="test admin apikey", + role_name="Administrator") + admin_apikey.create() - admin_apikey = ApiKey(key=admin_key, - desc="test admin apikey", - role_name="Administrator") - admin_apikey.create() + user_key = dummy_apikey.get_hashed_password( + plain_text_password=test_user_apikey).decode('utf-8') - user_key = dummy_apikey.get_hashed_password( - plain_text_password=test_user_apikey).decode('utf-8') + user_apikey = ApiKey(key=user_key, + desc="test user apikey", + role_name="User") + user_apikey.create() - user_apikey = ApiKey(key=user_key, - desc="test user apikey", - role_name="User") - user_apikey.create() - - except Exception as e: - print("Unexpected ERROR: {0}".format(e)) - raise e + except Exception as e: + print("Unexpected ERROR: {0}".format(e)) + raise e yield - - db.session.close() os.unlink(app.config['TEST_DB_LOCATION']) @@ -183,61 +186,51 @@ def zone_data(): @pytest.fixture def created_zone_data(): data = { - 'url': - '/api/v1/servers/localhost/zones/example.org.', - 'soa_edit_api': - 'DEFAULT', - 'last_check': - 0, + 'url': '/api/v1/servers/localhost/zones/example.org.', + 'soa_edit_api': 'DEFAULT', + 'last_check': 0, 'masters': [], - 'dnssec': - False, - 'notified_serial': - 0, - 'nsec3narrow': - False, - 'serial': - 2019013101, - 'nsec3param': - '', - 'soa_edit': - '', - 'api_rectify': - False, - 'kind': - 'Native', + 'dnssec': False, + 'notified_serial': 0, + 'nsec3narrow': False, + 'serial': 2019013101, + 'nsec3param': '', + 'soa_edit': '', + 'api_rectify': False, + 'kind': 'Native', 'rrsets': [{ 'comments': [], - 'type': - 'SOA', - 'name': - 'example.org.', - 'ttl': - 3600, + 'type': 'SOA', + 'name': 'example.org.', + 'ttl': 3600, 'records': [{ - 'content': - 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600', + 'content': 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600', 'disabled': False }] }, { 'comments': [], - 'type': - 'NS', - 'name': - 'example.org.', - 'ttl': - 3600, + 'type': 'NS', + 'name': 'example.org.', + 'ttl': 3600, 'records': [{ 'content': 'ns1.example.org.', 'disabled': False }] }], - 'name': - 'example.org.', - 'account': - '', - 'id': - 'example.org.' + 'name': 'example.org.', + 'account': '', + 'id': 'example.org.' + } + return data + + +def user_data(app): + test_user = app.config.get('TEST_USER') + test_user_pass = app.config.get('TEST_USER_PASSWORD') + data = { + "username": test_user, + "plain_text_password": test_user_pass, + "email": "test@test.com" } return data @@ -257,37 +250,39 @@ def admin_apikey_data(): @pytest.fixture(scope='module') -def user_apikey_integration(): +def user_apikey_integration(app): test_user_apikey = app.config.get('TEST_USER_APIKEY') headers = create_apikey_headers(test_user_apikey) return headers @pytest.fixture(scope='module') -def admin_apikey_integration(): +def admin_apikey_integration(app): test_user_apikey = app.config.get('TEST_ADMIN_APIKEY') headers = create_apikey_headers(test_user_apikey) return headers @pytest.fixture(scope='module') -def user_apikey(): - data = user_apikey_data() - api_key = ApiKey(desc=data['description'], - role_name=data['role'], - domains=[]) - headers = create_apikey_headers(api_key.plain_key) - return headers +def user_apikey(app): + with app.app_context(): + data = user_apikey_data() + api_key = ApiKey(desc=data['description'], + role_name=data['role'], + domains=[]) + headers = create_apikey_headers(api_key.plain_key) + return headers @pytest.fixture(scope='module') -def admin_apikey(): - data = admin_apikey_data() - api_key = ApiKey(desc=data['description'], - role_name=data['role'], - domains=[]) - headers = create_apikey_headers(api_key.plain_key) - return headers +def admin_apikey(app): + with app.app_context(): + data = admin_apikey_data() + api_key = ApiKey(desc=data['description'], + role_name=data['role'], + domains=[]) + headers = create_apikey_headers(api_key.plain_key) + return headers def create_apikey_headers(passw): diff --git a/tests/integration/api/apikey/test_admin_user.py b/tests/integration/api/apikey/test_admin_user.py index d8d8f29..3bc0c82 100644 --- a/tests/integration/api/apikey/test_admin_user.py +++ b/tests/integration/api/apikey/test_admin_user.py @@ -4,12 +4,11 @@ from collections import namedtuple from powerdnsadmin.lib.validators import validate_apikey from powerdnsadmin.lib.schema import ApiKeySchema -from tests.fixtures import client, initial_data, basic_auth_admin_headers -from tests.fixtures import user_apikey_data, admin_apikey_data, zone_data +from tests.conftest import user_apikey_data, admin_apikey_data class TestIntegrationApiApiKeyAdminUser(object): - def test_empty_get(self, client, initial_data, basic_auth_admin_headers): + def test_empty_get(self, initial_data, client, basic_auth_admin_headers): res = client.get("/api/v1/pdnsadmin/apikeys", headers=basic_auth_admin_headers) data = res.get_json(force=True) @@ -19,7 +18,7 @@ class TestIntegrationApiApiKeyAdminUser(object): @pytest.mark.parametrize( "apikey_data", [user_apikey_data(), admin_apikey_data()]) - def test_create_apikey(self, client, initial_data, apikey_data, zone_data, + def test_create_apikey(self, initial_data, client, apikey_data, zone_data, basic_auth_admin_headers): res = client.post("/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers, @@ -39,7 +38,7 @@ class TestIntegrationApiApiKeyAdminUser(object): assert res.status_code == 201 apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" - apikey_url = apikey_url_format.format(data[0]['id']) + apikey_url = apikey_url_format.format(data['id']) res = client.delete(apikey_url, headers=basic_auth_admin_headers) @@ -54,7 +53,7 @@ class TestIntegrationApiApiKeyAdminUser(object): @pytest.mark.parametrize( "apikey_data", [user_apikey_data(), admin_apikey_data()]) - def test_get_multiple_apikey(self, client, initial_data, apikey_data, + def test_get_multiple_apikey(self, initial_data, client, apikey_data, zone_data, basic_auth_admin_headers): res = client.post("/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers, @@ -103,7 +102,7 @@ class TestIntegrationApiApiKeyAdminUser(object): @pytest.mark.parametrize( "apikey_data", [user_apikey_data(), admin_apikey_data()]) - def test_delete_apikey(self, client, initial_data, apikey_data, zone_data, + def test_delete_apikey(self, initial_data, client, apikey_data, zone_data, basic_auth_admin_headers): res = client.post("/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers, @@ -123,7 +122,7 @@ class TestIntegrationApiApiKeyAdminUser(object): assert res.status_code == 201 apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" - apikey_url = apikey_url_format.format(data[0]['id']) + apikey_url = apikey_url_format.format(data['id']) res = client.delete(apikey_url, headers=basic_auth_admin_headers) assert res.status_code == 204 diff --git a/tests/integration/api/apikey/test_user.py b/tests/integration/api/apikey/test_user.py index 17fc970..34fdd86 100644 --- a/tests/integration/api/apikey/test_user.py +++ b/tests/integration/api/apikey/test_user.py @@ -1,14 +1,11 @@ -import pytest import json from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, initial_data, basic_auth_user_headers -from tests.fixtures import zone_data -class TestIntegrationApiZoneUser(object): +class TestIntegrationApiApiKeyUser(object): def test_empty_get(self, initial_data, client, basic_auth_user_headers): res = client.get("/api/v1/pdnsadmin/zones", headers=basic_auth_user_headers) diff --git a/tests/integration/api/management/__init__.py b/tests/integration/api/management/__init__.py index b546f46..af748be 100644 --- a/tests/integration/api/management/__init__.py +++ b/tests/integration/api/management/__init__.py @@ -14,8 +14,10 @@ class IntegrationApiManagement(object): assert res.status_code == status_code if res.status_code == 200: data = res.get_json(force=True) - assert len(data) == 1 - return data[0] + assert isinstance(data, dict) + assert len(data) == 7 + assert data.get('id', None) + return data return None def check_account(self, cmpdata, data=None): @@ -37,8 +39,10 @@ class IntegrationApiManagement(object): assert res.status_code == status_code if status_code == 200: data = res.get_json(force=True) - assert len(data) == 1 - return data[0] + assert isinstance(data, dict) + assert len(data) == 7 + assert data.get('id', None) + return data return None def check_user(self, cmpdata, data=None): @@ -50,5 +54,5 @@ class IntegrationApiManagement(object): elif key == 'role': assert data[key]['name'] == cmpdata['role_name'] else: - assert key in ("id",) + assert key in ("id","accounts",) return data diff --git a/tests/integration/api/management/test_admin_user.py b/tests/integration/api/management/test_admin_user.py index 3df0384..b017320 100644 --- a/tests/integration/api/management/test_admin_user.py +++ b/tests/integration/api/management/test_admin_user.py @@ -1,9 +1,5 @@ - import json -from tests.fixtures import ( # noqa: F401 - client, initial_data, basic_auth_admin_headers, - test_admin_user, test_user, account_data, user1_data, -) + from . import IntegrationApiManagement @@ -89,8 +85,9 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 200 - assert len(data) == 1 - data = data[0] + assert isinstance(data, dict) + assert len(data) == 7 + assert data.get('id', None) account_id = data["id"] for key, value in account_data.items(): assert data[key] == value @@ -142,10 +139,12 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 201 - assert len(data) == 1 + assert isinstance(data, dict) + assert len(data) == 6 + assert data.get('id', None) # Check user - user1 = self.check_user(user1_data, data[0]) + user1 = self.check_user(user1_data, data) user1_id = user1["id"] updated = user1_data.copy() @@ -240,10 +239,12 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 201 - assert len(data) == 1 + assert isinstance(data, dict) + assert len(data) == 6 + assert data.get('id', None) # Check user - user1 = self.check_user(user1_data, data[0]) + user1 = self.check_user(user1_data, data) user1_id = user1["id"] # Assert test account has no users diff --git a/tests/integration/api/management/test_user.py b/tests/integration/api/management/test_user.py index b56652e..65a5869 100644 --- a/tests/integration/api/management/test_user.py +++ b/tests/integration/api/management/test_user.py @@ -1,66 +1,61 @@ - import json -from tests.fixtures import ( # noqa: F401 - client, initial_data, basic_auth_admin_headers, basic_auth_user_headers, - test_admin_user, test_user, account_data, user1_data, -) from . import IntegrationApiManagement class TestIntegrationApiManagementUser(IntegrationApiManagement): - def test_accounts_empty_get( - self, client, initial_data, # noqa: F811 - basic_auth_user_headers): # noqa: F811 + def test_accounts_empty_get(self, initial_data, client, # noqa: F811 + basic_auth_user_headers): # noqa: F811 res = client.get("/api/v1/pdnsadmin/accounts", headers=basic_auth_user_headers) assert res.status_code == 401 - def test_users_empty_get( - self, client, initial_data, # noqa: F811 - test_admin_user, test_user, # noqa: F811 - basic_auth_user_headers): # noqa: F811 + def test_users_empty_get(self, initial_data, client, # noqa: F811 + test_admin_user, test_user, # noqa: F811 + basic_auth_user_headers): # noqa: F811 res = client.get("/api/v1/pdnsadmin/users", headers=basic_auth_user_headers) assert res.status_code == 401 - def test_self_get( - self, initial_data, client, test_user, # noqa: F811 - basic_auth_user_headers): # noqa: F811 - self.user = None + def test_self_get(self, initial_data, client, basic_auth_user_headers, test_user): # noqa: F811 res = client.get("/api/v1/pdnsadmin/users/{}".format(test_user), headers=basic_auth_user_headers) data = res.get_json(force=True) assert res.status_code == 200 - assert len(data) == 1, data - self.user = data + assert data - def test_accounts( - self, client, initial_data, # noqa: F811 - account_data, # noqa: F811 - basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 + def test_create_account_fail(self, client, initial_data, account_data, # noqa: F811 + basic_auth_user_headers): # noqa: F811 + + # Create account (should fail) + res = client.post("/api/v1/pdnsadmin/accounts", + headers=basic_auth_user_headers, + data=json.dumps(account_data), + content_type="application/json") + assert res.status_code == 401 + + def test_create_account_as_admin(self, app, initial_data, client, account_data, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 self.client = client self.basic_auth_admin_headers = basic_auth_admin_headers - # Create account (should fail) - res = client.post( - "/api/v1/pdnsadmin/accounts", - headers=basic_auth_user_headers, - data=json.dumps(account_data), - content_type="application/json", - ) - assert res.status_code == 401 + with app.test_request_context(): + # Create account (as admin) + res = client.post("/api/v1/pdnsadmin/accounts", + headers=basic_auth_admin_headers, + data=json.dumps(account_data), + content_type="application/json") + data = res.get_json(force=True) + assert res.status_code == 201 - # Create account (as admin) - res = client.post( - "/api/v1/pdnsadmin/accounts", - headers=basic_auth_admin_headers, - data=json.dumps(account_data), - content_type="application/json", - ) - data = res.get_json(force=True) - assert res.status_code == 201 + def test_update_account_fail( + self, initial_data, client, # noqa: F811 + account_data, # noqa: F811 + basic_auth_user_headers, + basic_auth_admin_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers # Check account data = self.check_account(account_data) @@ -75,6 +70,18 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): ) assert res.status_code == 401 + def test_delete_account_fail( + self, initial_data, client, # noqa: F811 + account_data, # noqa: F811 + basic_auth_user_headers, + basic_auth_admin_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Check account + data = self.check_account(account_data) + account_id = data["id"] + # Delete account (should fail) res = client.delete( "/api/v1/pdnsadmin/accounts/{}".format(account_id), @@ -84,6 +91,17 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): ) assert res.status_code == 401 + def test_delete_account_as_admin( + self, client, initial_data, # noqa: F811 + account_data, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Check account + data = self.check_account(account_data) + account_id = data["id"] + # Cleanup (delete account as admin) res = client.delete( "/api/v1/pdnsadmin/accounts/{}".format(account_id), @@ -94,8 +112,8 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): assert res.status_code == 204 def test_users( - self, client, initial_data, # noqa: F811 - user1_data, # noqa: F811 + self, client, initial_data, # noqa: F811 + user1_data, # noqa: F811 basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 self.client = client self.basic_auth_admin_headers = basic_auth_admin_headers @@ -118,10 +136,12 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 201 - assert len(data) == 1 + assert isinstance(data, dict) + assert len(data) == 6 + assert data.get('id', None) # Check user - user1 = self.check_user(user1_data, data[0]) + user1 = self.check_user(user1_data, data) user1_id = user1["id"] # Update to defaults (should fail) @@ -152,8 +172,8 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): assert res.status_code == 204 def test_account_users( - self, client, initial_data, # noqa: F811 - account_data, user1_data, # noqa: F811 + self, client, initial_data, # noqa: F811 + account_data, user1_data, # noqa: F811 basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 self.client = client self.basic_auth_admin_headers = basic_auth_admin_headers @@ -181,10 +201,12 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 201 - assert len(data) == 1 + assert isinstance(data, dict) + assert len(data) == 6 + assert data.get('id', None) # Check user - user1 = self.check_user(user1_data, data[0]) + user1 = self.check_user(user1_data, data) user1_id = user1["id"] # Assert test account has no users diff --git a/tests/integration/api/zone/test_admin_user.py b/tests/integration/api/zone/test_admin_user.py index 2aa9c44..f1c12a7 100644 --- a/tests/integration/api/zone/test_admin_user.py +++ b/tests/integration/api/zone/test_admin_user.py @@ -1,11 +1,8 @@ -import pytest import json from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, initial_data, basic_auth_admin_headers -from tests.fixtures import zone_data class TestIntegrationApiZoneAdminUser(object): diff --git a/tests/integration/api/zone/test_apikey_admin_user.py b/tests/integration/api/zone/test_apikey_admin_user.py index 9090ed6..494366f 100644 --- a/tests/integration/api/zone/test_apikey_admin_user.py +++ b/tests/integration/api/zone/test_apikey_admin_user.py @@ -3,9 +3,6 @@ from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client -from tests.fixtures import zone_data, initial_apikey_data -from tests.fixtures import admin_apikey_integration class TestIntegrationApiZoneAdminApiKey(object): diff --git a/tests/integration/api/zone/test_apikey_user.py b/tests/integration/api/zone/test_apikey_user.py index b0c96bc..94ddac1 100644 --- a/tests/integration/api/zone/test_apikey_user.py +++ b/tests/integration/api/zone/test_apikey_user.py @@ -3,9 +3,6 @@ from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client -from tests.fixtures import zone_data, initial_apikey_data -from tests.fixtures import user_apikey_integration class TestIntegrationApiZoneUserApiKey(object): diff --git a/tests/integration/api/zone/test_user.py b/tests/integration/api/zone/test_user.py index 7392521..1d0751d 100644 --- a/tests/integration/api/zone/test_user.py +++ b/tests/integration/api/zone/test_user.py @@ -3,8 +3,6 @@ from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, initial_data, basic_auth_user_headers -from tests.fixtures import zone_data class TestIntegrationApiZoneUser(object): diff --git a/tests/unit/apikey/__init__.py b/tests/unit/apikey/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/unit/test_decorators.py b/tests/unit/test_decorators.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/unit/zone/test_admin_apikey.py b/tests/unit/zone/test_admin_apikey.py index 6f3a9c3..ccef69c 100644 --- a/tests/unit/zone/test_admin_apikey.py +++ b/tests/unit/zone/test_admin_apikey.py @@ -2,9 +2,6 @@ import json import pytest from unittest.mock import patch from collections import namedtuple -import sys -import os -sys.path.append(os.getcwd()) import powerdnsadmin from powerdnsadmin.models.setting import Setting @@ -12,79 +9,108 @@ from powerdnsadmin.models.domain import Domain from powerdnsadmin.models.api_key import ApiKey from powerdnsadmin.models.role import Role from powerdnsadmin.lib.validators import validate_zone -from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema -from tests.fixtures import client, initial_data, created_zone_data -from tests.fixtures import user_apikey, admin_apikey, zone_data -from tests.fixtures import admin_apikey_data, load_data +from powerdnsadmin.lib.schema import DomainSchema +from tests.conftest import admin_apikey_data, load_data class TestUnitApiZoneAdminApiKey(object): @pytest.fixture - def common_data_mock(self): - self.google_setting_patcher = patch( - 'powerdnsadmin.services.google.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.github_setting_patcher = patch( - 'powerdnsadmin.services.github.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.oidc_setting_patcher = patch( - 'powerdnsadmin.services.oidc.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.helpers_setting_patcher = patch( - 'powerdnsadmin.lib.helper.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.models_setting_patcher = patch( - 'powerdnsadmin.models.setting.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.domain_model_setting_patcher = patch( - 'powerdnsadmin.models.domain.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.record_model_setting_patcher = patch( - 'powerdnsadmin.models.record.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.server_model_setting_patcher = patch( - 'powerdnsadmin.models.server.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.mock_apikey_patcher = patch( - 'powerdnsadmin.decorators.ApiKey', - spec=powerdnsadmin.models.api_key.ApiKey) - self.mock_hist_patcher = patch( - 'powerdnsadmin.routes.api.History', - spec=powerdnsadmin.models.history.History) - self.mock_setting_patcher = patch( - 'powerdnsadmin.routes.api.Setting', - spec=powerdnsadmin.models.setting.Setting) + def common_data_mock(self, app): + with app.app_context(): + self.google_setting_patcher = patch( + 'powerdnsadmin.services.google.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.github_setting_patcher = patch( + 'powerdnsadmin.services.github.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.azure_setting_patcher = patch( + 'powerdnsadmin.services.azure.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.oidc_setting_patcher = patch( + 'powerdnsadmin.services.oidc.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.helpers_setting_patcher = patch( + 'powerdnsadmin.lib.helper.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.models_setting_patcher = patch( + 'powerdnsadmin.models.setting.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.domain_model_setting_patcher = patch( + 'powerdnsadmin.models.domain.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.record_model_setting_patcher = patch( + 'powerdnsadmin.models.record.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.server_model_setting_patcher = patch( + 'powerdnsadmin.models.server.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_apikey_patcher = patch( + 'powerdnsadmin.decorators.ApiKey', + spec=powerdnsadmin.models.api_key.ApiKey) + self.mock_hist_patcher = patch( + 'powerdnsadmin.routes.api.History', + spec=powerdnsadmin.models.history.History) + self.mock_setting_patcher = patch( + 'powerdnsadmin.routes.api.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_decorators_setting_patcher = patch( + 'powerdnsadmin.decorators.Setting', + spec=powerdnsadmin.models.setting.Setting) - data = admin_apikey_data() - api_key = ApiKey(desc=data['description'], - role_name=data['role'], - domains=[]) - api_key.role = Role(name=data['role']) + data = admin_apikey_data() + api_key = ApiKey(desc=data['description'], + role_name=data['role'], + domains=[]) + api_key.role = Role(name=data['role']) - self.mock_google_setting = self.google_setting_patcher.start() - self.mock_github_setting = self.github_setting_patcher.start() - self.mock_oidc_setting = self.oidc_setting_patcher.start() - self.mock_helpers_setting = self.helpers_setting_patcher.start() - self.mock_models_setting = self.models_setting_patcher.start() - self.mock_domain_model_setting = self.domain_model_setting_patcher.start( - ) - self.mock_record_model_setting = self.record_model_setting_patcher.start( - ) - self.mock_server_model_setting = self.server_model_setting_patcher.start( - ) - self.mock_apikey = self.mock_apikey_patcher.start() - self.mock_hist = self.mock_hist_patcher.start() - self.mock_setting = self.mock_setting_patcher.start() + self.mock_google_setting = self.google_setting_patcher.start() + self.mock_github_setting = self.github_setting_patcher.start() + self.mock_azure_setting = self.azure_setting_patcher.start() + self.mock_oidc_setting = self.oidc_setting_patcher.start() + self.mock_helpers_setting = self.helpers_setting_patcher.start() + self.mock_models_setting = self.models_setting_patcher.start() + self.mock_domain_model_setting = self.domain_model_setting_patcher.start( + ) + self.mock_record_model_setting = self.record_model_setting_patcher.start( + ) + self.mock_server_model_setting = self.server_model_setting_patcher.start( + ) + self.mock_apikey = self.mock_apikey_patcher.start() + self.mock_hist = self.mock_hist_patcher.start() + self.mock_setting = self.mock_setting_patcher.start() + self.mock_decorators_setting = self.mock_decorators_setting_patcher.start() + + self.mock_google_setting.return_value.get.side_effect = load_data + self.mock_github_setting.return_value.get.side_effect = load_data + self.mock_azure_setting.return_value.get.side_effect = load_data + self.mock_oidc_setting.return_value.get.side_effect = load_data + self.mock_helpers_setting.return_value.get.side_effect = load_data + self.mock_models_setting.return_value.get.side_effect = load_data + self.mock_domain_model_setting.return_value.get.side_effect = load_data + self.mock_record_model_setting.return_value.get.side_effect = load_data + self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_decorators_setting.return_value.get.side_effect = load_data + self.mock_apikey.return_value.is_validate.return_value = api_key + + yield + + for patcher in [ + self.google_setting_patcher, + self.github_setting_patcher, + self.azure_setting_patcher, + self.oidc_setting_patcher, + self.helpers_setting_patcher, + self.models_setting_patcher, + self.domain_model_setting_patcher, + self.record_model_setting_patcher, + self.server_model_setting_patcher, + self.mock_apikey_patcher, + self.mock_hist_patcher, + self.mock_setting_patcher, + self.mock_decorators_setting_patcher, + ]: + patcher.stop() - self.mock_google_setting.return_value.get.side_effect = load_data - self.mock_github_setting.return_value.get.side_effect = load_data - self.mock_oidc_setting.return_value.get.side_effect = load_data - self.mock_helpers_setting.return_value.get.side_effect = load_data - self.mock_models_setting.return_value.get.side_effect = load_data - self.mock_domain_model_setting.return_value.get.side_effect = load_data - self.mock_record_model_setting.return_value.get.side_effect = load_data - self.mock_server_model_setting.return_value.get.side_effect = load_data - self.mock_apikey.return_value.is_validate.return_value = api_key def test_empty_get(self, client, common_data_mock, admin_apikey): with patch('powerdnsadmin.routes.api.Domain') as mock_domain, \ diff --git a/tests/unit/zone/test_admin_user.py b/tests/unit/zone/test_admin_user.py index 085ba74..c4412de 100644 --- a/tests/unit/zone/test_admin_user.py +++ b/tests/unit/zone/test_admin_user.py @@ -5,37 +5,33 @@ from collections import namedtuple import powerdnsadmin from powerdnsadmin.models.user import User -from powerdnsadmin.models.role import Role from powerdnsadmin.models.domain import Domain from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, basic_auth_admin_headers -from tests.fixtures import zone_data, created_zone_data, load_data +from tests.conftest import load_data class TestUnitApiZoneAdminUser(object): @pytest.fixture - def common_data_mock(self): + def common_data_mock(self, app, initial_data): + self.google_setting_patcher = patch( 'powerdnsadmin.services.google.Setting', spec=powerdnsadmin.models.setting.Setting) self.github_setting_patcher = patch( 'powerdnsadmin.services.github.Setting', spec=powerdnsadmin.models.setting.Setting) + self.azure_setting_patcher = patch( + 'powerdnsadmin.services.azure.Setting', + spec=powerdnsadmin.models.setting.Setting) self.oidc_setting_patcher = patch( 'powerdnsadmin.services.oidc.Setting', spec=powerdnsadmin.models.setting.Setting) - self.api_setting_patcher = patch( - 'powerdnsadmin.routes.api.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.base_route_user_patcher = patch( - 'powerdnsadmin.routes.base.User', - spec=powerdnsadmin.models.user.User) self.helpers_setting_patcher = patch( 'powerdnsadmin.lib.helper.Setting', spec=powerdnsadmin.models.setting.Setting) self.models_setting_patcher = patch( - 'powerdnsadmin.models.Setting', + 'powerdnsadmin.models.setting.Setting', spec=powerdnsadmin.models.setting.Setting) self.domain_model_setting_patcher = patch( 'powerdnsadmin.models.domain.Setting', @@ -46,50 +42,81 @@ class TestUnitApiZoneAdminUser(object): self.server_model_setting_patcher = patch( 'powerdnsadmin.models.server.Setting', spec=powerdnsadmin.models.setting.Setting) - self.decorators_setting_patcher = patch( - 'powerdnsadmin.decorators.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.mock_user_patcher = patch('powerdnsadmin.decorators.User', - spec=powerdnsadmin.models.user.User) + self.mock_user_patcher = patch( + 'powerdnsadmin.decorators.User', + spec=powerdnsadmin.models.user.User) self.mock_hist_patcher = patch( 'powerdnsadmin.routes.api.History', spec=powerdnsadmin.models.history.History) + self.mock_setting_patcher = patch( + 'powerdnsadmin.routes.api.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_decorators_setting_patcher = patch( + 'powerdnsadmin.decorators.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.base_route_user_patcher = patch( + 'powerdnsadmin.routes.base.User', + spec=powerdnsadmin.models.user.User) - self.mock_google_setting = self.google_setting_patcher.start() - self.mock_github_setting = self.github_setting_patcher.start() - self.mock_oidc_setting = self.oidc_setting_patcher.start() - self.mock_base_route_user = self.base_route_user_patcher.start() - self.mock_helpers_setting = self.helpers_setting_patcher.start() - self.mock_models_setting = self.models_setting_patcher.start() - self.mock_domain_model_setting = self.domain_model_setting_patcher.start( - ) - self.mock_record_model_setting = self.record_model_setting_patcher.start( - ) - self.mock_server_model_setting = self.server_model_setting_patcher.start( - ) - self.decorators_setting = self.decorators_setting_patcher.start() - self.api_setting = self.api_setting_patcher.start() - self.mock_user = self.mock_user_patcher.start() - self.mock_hist = self.mock_hist_patcher.start() + with app.app_context(): + self.mock_google_setting = self.google_setting_patcher.start() + self.mock_github_setting = self.github_setting_patcher.start() + self.mock_azure_setting = self.azure_setting_patcher.start() + self.mock_oidc_setting = self.oidc_setting_patcher.start() + self.mock_base_route_user = self.base_route_user_patcher.start() + self.mock_helpers_setting = self.helpers_setting_patcher.start() + self.mock_models_setting = self.models_setting_patcher.start() + self.mock_domain_model_setting = self.domain_model_setting_patcher.start( + ) + self.mock_record_model_setting = self.record_model_setting_patcher.start( + ) + self.mock_server_model_setting = self.server_model_setting_patcher.start( + ) + self.mock_user = self.mock_user_patcher.start() + self.mock_hist = self.mock_hist_patcher.start() + self.mock_setting = self.mock_setting_patcher.start() + self.mock_decorators_setting = self.mock_decorators_setting_patcher.start() - self.mock_google_setting.return_value.get.side_effect = load_data - self.mock_github_setting.return_value.get.side_effect = load_data - self.mock_oidc_setting.return_value.get.side_effect = load_data - self.mock_helpers_setting.return_value.get.side_effect = load_data - self.mock_models_setting.return_value.get.side_effect = load_data - self.mock_domain_model_setting.return_value.get.side_effect = load_data - self.mock_record_model_setting.return_value.get.side_effect = load_data - self.mock_server_model_setting.return_value.get.side_effect = load_data - self.decorators_setting.return_value.get.side_effect = load_data - self.api_setting.return_value.get.side_effect = load_data + self.mock_google_setting.return_value.get.side_effect = load_data + self.mock_github_setting.return_value.get.side_effect = load_data + self.mock_azure_setting.return_value.get.side_effect = load_data + self.mock_oidc_setting.return_value.get.side_effect = load_data + self.mock_helpers_setting.return_value.get.side_effect = load_data + self.mock_models_setting.return_value.get.side_effect = load_data + self.mock_domain_model_setting.return_value.get.side_effect = load_data + self.mock_record_model_setting.return_value.get.side_effect = load_data + self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_decorators_setting.return_value.get.side_effect = load_data + self.mock_setting.return_value.get.side_effect = load_data - self.mockk = MagicMock() - self.mockk.role.name = "Administrator" + self.mockk = MagicMock() + self.mockk.role.name = "Administrator" + + self.mock_user.query.filter.return_value.first.return_value = self.mockk + self.mock_user.return_value.is_validate.return_value = True + self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk + self.mock_base_route_user.return_value.is_validate.return_value = True + + yield + + for patcher in [ + self.google_setting_patcher, + self.github_setting_patcher, + self.azure_setting_patcher, + self.oidc_setting_patcher, + self.base_route_user_patcher, + self.helpers_setting_patcher, + self.models_setting_patcher, + self.domain_model_setting_patcher, + self.record_model_setting_patcher, + self.server_model_setting_patcher, + self.mock_user_patcher, + self.mock_hist_patcher, + self.mock_setting_patcher, + self.mock_decorators_setting_patcher, + ]: + patcher.stop() - self.mock_user.query.filter.return_value.first.return_value = self.mockk - self.mock_user.return_value.is_validate.return_value = True - self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk - self.mock_base_route_user.return_value.is_validate.return_value = True def test_empty_get(self, client, common_data_mock, basic_auth_admin_headers): @@ -135,8 +162,7 @@ class TestUnitApiZoneAdminUser(object): headers=basic_auth_admin_headers) data = res.get_json(force=True) - fake_domain = namedtuple("Domain", - data[0].keys())(*data[0].values()) + fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) domain_schema = DomainSchema(many=True) json.dumps(domain_schema.dump([fake_domain])) diff --git a/tests/unit/zone/test_user.py b/tests/unit/zone/test_user.py index 7a8d8d9..f7fe335 100644 --- a/tests/unit/zone/test_user.py +++ b/tests/unit/zone/test_user.py @@ -5,37 +5,33 @@ from collections import namedtuple import powerdnsadmin from powerdnsadmin.models.user import User -from powerdnsadmin.models.role import Role from powerdnsadmin.models.domain import Domain from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, basic_auth_user_headers -from tests.fixtures import zone_data, created_zone_data, load_data +from tests.conftest import load_data class TestUnitApiZoneUser(object): @pytest.fixture - def common_data_mock(self): + def common_data_mock(self, app, initial_data): + self.google_setting_patcher = patch( 'powerdnsadmin.services.google.Setting', spec=powerdnsadmin.models.setting.Setting) self.github_setting_patcher = patch( 'powerdnsadmin.services.github.Setting', spec=powerdnsadmin.models.setting.Setting) + self.azure_setting_patcher = patch( + 'powerdnsadmin.services.azure.Setting', + spec=powerdnsadmin.models.setting.Setting) self.oidc_setting_patcher = patch( 'powerdnsadmin.services.oidc.Setting', spec=powerdnsadmin.models.setting.Setting) - self.api_setting_patcher = patch( - 'powerdnsadmin.routes.api.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.base_route_user_patcher = patch( - 'powerdnsadmin.routes.base.User', - spec=powerdnsadmin.models.user.User) self.helpers_setting_patcher = patch( 'powerdnsadmin.lib.helper.Setting', spec=powerdnsadmin.models.setting.Setting) self.models_setting_patcher = patch( - 'powerdnsadmin.models.Setting', + 'powerdnsadmin.models.setting.Setting', spec=powerdnsadmin.models.setting.Setting) self.domain_model_setting_patcher = patch( 'powerdnsadmin.models.domain.Setting', @@ -46,55 +42,86 @@ class TestUnitApiZoneUser(object): self.server_model_setting_patcher = patch( 'powerdnsadmin.models.server.Setting', spec=powerdnsadmin.models.setting.Setting) - self.decorators_setting_patcher = patch( - 'powerdnsadmin.decorators.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.mock_user_patcher = patch('powerdnsadmin.decorators.User', - spec=powerdnsadmin.models.user.User) + self.mock_user_patcher = patch( + 'powerdnsadmin.decorators.User', + spec=powerdnsadmin.models.user.User) self.mock_hist_patcher = patch( 'powerdnsadmin.routes.api.History', spec=powerdnsadmin.models.history.History) + self.mock_setting_patcher = patch( + 'powerdnsadmin.routes.api.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_decorators_setting_patcher = patch( + 'powerdnsadmin.decorators.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.base_route_user_patcher = patch( + 'powerdnsadmin.routes.base.User', + spec=powerdnsadmin.models.user.User) - self.mock_google_setting = self.google_setting_patcher.start() - self.mock_github_setting = self.github_setting_patcher.start() - self.mock_oidc_setting = self.oidc_setting_patcher.start() - self.mock_base_route_user = self.base_route_user_patcher.start() - self.mock_helpers_setting = self.helpers_setting_patcher.start() - self.mock_models_setting = self.models_setting_patcher.start() - self.mock_domain_model_setting = self.domain_model_setting_patcher.start( - ) - self.mock_record_model_setting = self.record_model_setting_patcher.start( - ) - self.mock_server_model_setting = self.server_model_setting_patcher.start( - ) - self.decorators_setting = self.decorators_setting_patcher.start() - self.api_setting = self.api_setting_patcher.start() - self.mock_user = self.mock_user_patcher.start() - self.mock_hist = self.mock_hist_patcher.start() + with app.app_context(): + self.mock_google_setting = self.google_setting_patcher.start() + self.mock_github_setting = self.github_setting_patcher.start() + self.mock_azure_setting = self.azure_setting_patcher.start() + self.mock_oidc_setting = self.oidc_setting_patcher.start() + self.mock_base_route_user = self.base_route_user_patcher.start() + self.mock_helpers_setting = self.helpers_setting_patcher.start() + self.mock_models_setting = self.models_setting_patcher.start() + self.mock_domain_model_setting = self.domain_model_setting_patcher.start( + ) + self.mock_record_model_setting = self.record_model_setting_patcher.start( + ) + self.mock_server_model_setting = self.server_model_setting_patcher.start( + ) + self.mock_user = self.mock_user_patcher.start() + self.mock_hist = self.mock_hist_patcher.start() + self.mock_setting = self.mock_setting_patcher.start() + self.mock_decorators_setting = self.mock_decorators_setting_patcher.start() - self.mock_google_setting.return_value.get.side_effect = load_data - self.mock_github_setting.return_value.get.side_effect = load_data - self.mock_oidc_setting.return_value.get.side_effect = load_data - self.mock_helpers_setting.return_value.get.side_effect = load_data - self.mock_models_setting.return_value.get.side_effect = load_data - self.mock_domain_model_setting.return_value.get.side_effect = load_data - self.mock_record_model_setting.return_value.get.side_effect = load_data - self.mock_server_model_setting.return_value.get.side_effect = load_data - self.decorators_setting.return_value.get.side_effect = load_data - self.api_setting.return_value.get.side_effect = load_data + self.mock_google_setting.return_value.get.side_effect = load_data + self.mock_github_setting.return_value.get.side_effect = load_data + self.mock_azure_setting.return_value.get.side_effect = load_data + self.mock_oidc_setting.return_value.get.side_effect = load_data + self.mock_helpers_setting.return_value.get.side_effect = load_data + self.mock_models_setting.return_value.get.side_effect = load_data + self.mock_domain_model_setting.return_value.get.side_effect = load_data + self.mock_record_model_setting.return_value.get.side_effect = load_data + self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_decorators_setting.return_value.get.side_effect = load_data + self.mock_setting.return_value.get.side_effect = load_data - self.mockk = MagicMock() - self.mockk.role.name = "User" + self.mockk = MagicMock() + self.mockk.role.name = "User" + + self.mock_user.query.filter.return_value.first.return_value = self.mockk + self.mock_user.return_value.is_validate.return_value = True + self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk + self.mock_base_route_user.return_value.is_validate.return_value = True + + yield + + for patcher in [ + self.google_setting_patcher, + self.github_setting_patcher, + self.azure_setting_patcher, + self.oidc_setting_patcher, + self.base_route_user_patcher, + self.helpers_setting_patcher, + self.models_setting_patcher, + self.domain_model_setting_patcher, + self.record_model_setting_patcher, + self.server_model_setting_patcher, + self.mock_user_patcher, + self.mock_hist_patcher, + self.mock_setting_patcher, + self.mock_decorators_setting_patcher, + ]: + patcher.stop() - self.mock_user.query.filter.return_value.first.return_value = self.mockk - self.mock_user.return_value.is_validate.return_value = True - self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk - self.mock_base_route_user.return_value.is_validate.return_value = True def test_create_zone(self, client, common_data_mock, zone_data, basic_auth_user_headers, created_zone_data): with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \ - patch('powerdnsadmin.routes.api.Domain') as mock_domain: + patch('powerdnsadmin.routes.api.Domain') as mock_domain: mock_post.return_value.status_code = 201 mock_post.return_value.content = json.dumps(created_zone_data) mock_post.return_value.headers = {} @@ -115,8 +142,9 @@ class TestUnitApiZoneUser(object): with patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains: test_domain = Domain(1, name=zone_data['name'].rstrip(".")) mock_user_domains.return_value = [test_domain] + res = client.get("/api/v1/pdnsadmin/zones", - headers=basic_auth_user_headers) + headers=basic_auth_user_headers) data = res.get_json(force=True) fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) @@ -127,9 +155,9 @@ class TestUnitApiZoneUser(object): def test_delete_zone(self, client, common_data_mock, zone_data, basic_auth_user_headers): - with patch('powerdnsadmin.lib.utils.requests.request') as mock_delete, \ - patch('powerdnsadmin.routes.api.Domain') as mock_domain, \ - patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains: + with patch('powerdnsadmin.lib.helper.requests.request') as mock_delete, \ + patch('powerdnsadmin.routes.api.Domain') as mock_domain, \ + patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains: test_domain = Domain(1, name=zone_data['name'].rstrip(".")) mock_domain.return_value.update.return_value = True diff --git a/tests/unit/zone/test_user_apikey.py b/tests/unit/zone/test_user_apikey.py index 84cd53c..b1911ce 100644 --- a/tests/unit/zone/test_user_apikey.py +++ b/tests/unit/zone/test_user_apikey.py @@ -1,7 +1,6 @@ import json import pytest from unittest.mock import patch -from base64 import b64encode from collections import namedtuple import powerdnsadmin @@ -10,78 +9,111 @@ from powerdnsadmin.models.domain import Domain from powerdnsadmin.models.api_key import ApiKey from powerdnsadmin.models.role import Role from powerdnsadmin.lib.validators import validate_zone -from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema -from tests.fixtures import client, initial_data, created_zone_data -from tests.fixtures import user_apikey, zone_data -from tests.fixtures import user_apikey_data, load_data +from powerdnsadmin.lib.schema import DomainSchema +from tests.conftest import user_apikey_data, load_data class TestUnitApiZoneUserApiKey(object): @pytest.fixture - def common_data_mock(self): - self.google_setting_patcher = patch( - 'powerdnsadmin.services.google.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.github_setting_patcher = patch( - 'powerdnsadmin.services.github.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.oidc_setting_patcher = patch( - 'powerdnsadmin.services.oidc.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.helpers_setting_patcher = patch( - 'powerdnsadmin.lib.helper.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.models_setting_patcher = patch( - 'powerdnsadmin.models.setting.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.domain_model_setting_patcher = patch( - 'powerdnsadmin.models.domain.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.record_model_setting_patcher = patch( - 'powerdnsadmin.models.record.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.server_model_setting_patcher = patch( - 'powerdnsadmin.models.server.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.mock_apikey_patcher = patch( - 'powerdnsadmin.decorators.ApiKey', - spec=powerdnsadmin.models.api_key.ApiKey) - self.mock_hist_patcher = patch( - 'powerdnsadmin.routes.api.History', - spec=powerdnsadmin.models.history.History) + def common_data_mock(self, app): + with app.app_context(): + self.google_setting_patcher = patch( + 'powerdnsadmin.services.google.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.github_setting_patcher = patch( + 'powerdnsadmin.services.github.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.azure_setting_patcher = patch( + 'powerdnsadmin.services.azure.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.oidc_setting_patcher = patch( + 'powerdnsadmin.services.oidc.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.helpers_setting_patcher = patch( + 'powerdnsadmin.lib.helper.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.models_setting_patcher = patch( + 'powerdnsadmin.models.setting.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.domain_model_setting_patcher = patch( + 'powerdnsadmin.models.domain.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.record_model_setting_patcher = patch( + 'powerdnsadmin.models.record.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.server_model_setting_patcher = patch( + 'powerdnsadmin.models.server.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_apikey_patcher = patch( + 'powerdnsadmin.decorators.ApiKey', + spec=powerdnsadmin.models.api_key.ApiKey) + self.mock_hist_patcher = patch( + 'powerdnsadmin.routes.api.History', + spec=powerdnsadmin.models.history.History) + self.mock_setting_patcher = patch( + 'powerdnsadmin.routes.api.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_decorators_setting_patcher = patch( + 'powerdnsadmin.decorators.Setting', + spec=powerdnsadmin.models.setting.Setting) - self.mock_google_setting = self.google_setting_patcher.start() - self.mock_github_setting = self.github_setting_patcher.start() - self.mock_oidc_setting = self.oidc_setting_patcher.start() - self.mock_helpers_setting = self.helpers_setting_patcher.start() - self.mock_models_setting = self.models_setting_patcher.start() - self.mock_domain_model_setting = self.domain_model_setting_patcher.start( - ) - self.mock_record_model_setting = self.record_model_setting_patcher.start( - ) - self.mock_server_model_setting = self.server_model_setting_patcher.start( - ) - self.mock_apikey = self.mock_apikey_patcher.start() - self.mock_hist = self.mock_hist_patcher.start() + self.mock_google_setting = self.google_setting_patcher.start() + self.mock_github_setting = self.github_setting_patcher.start() + self.mock_azure_setting = self.azure_setting_patcher.start() + self.mock_oidc_setting = self.oidc_setting_patcher.start() + self.mock_helpers_setting = self.helpers_setting_patcher.start() + self.mock_models_setting = self.models_setting_patcher.start() + self.mock_domain_model_setting = self.domain_model_setting_patcher.start( + ) + self.mock_record_model_setting = self.record_model_setting_patcher.start( + ) + self.mock_server_model_setting = self.server_model_setting_patcher.start( + ) + self.mock_apikey = self.mock_apikey_patcher.start() + self.mock_hist = self.mock_hist_patcher.start() + self.mock_setting = self.mock_setting_patcher.start() + self.mock_decorators_setting = self.mock_decorators_setting_patcher.start() - self.mock_google_setting.return_value.get.side_effect = load_data - self.mock_github_setting.return_value.get.side_effect = load_data - self.mock_oidc_setting.return_value.get.side_effect = load_data - self.mock_helpers_setting.return_value.get.side_effect = load_data - self.mock_models_setting.return_value.get.side_effect = load_data - self.mock_domain_model_setting.return_value.get.side_effect = load_data - self.mock_record_model_setting.return_value.get.side_effect = load_data - self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_google_setting.return_value.get.side_effect = load_data + self.mock_github_setting.return_value.get.side_effect = load_data + self.mock_azure_setting.return_value.get.side_effect = load_data + self.mock_oidc_setting.return_value.get.side_effect = load_data + self.mock_helpers_setting.return_value.get.side_effect = load_data + self.mock_models_setting.return_value.get.side_effect = load_data + self.mock_domain_model_setting.return_value.get.side_effect = load_data + self.mock_record_model_setting.return_value.get.side_effect = load_data + self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_decorators_setting.return_value.get.side_effect = load_data - data = user_apikey_data() - domain = Domain(name=data['domains'][0]) + data = user_apikey_data() + domain = Domain(name=data['domains'][0]) - api_key = ApiKey(desc=data['description'], - role_name=data['role'], - domains=[domain]) - api_key.role = Role(name=data['role']) + api_key = ApiKey(desc=data['description'], + role_name=data['role'], + domains=[domain]) + api_key.role = Role(name=data['role']) + + self.mock_apikey.return_value.is_validate.return_value = api_key + + yield + + for patcher in [ + self.google_setting_patcher, + self.github_setting_patcher, + self.azure_setting_patcher, + self.oidc_setting_patcher, + self.helpers_setting_patcher, + self.models_setting_patcher, + self.domain_model_setting_patcher, + self.record_model_setting_patcher, + self.server_model_setting_patcher, + self.mock_apikey_patcher, + self.mock_hist_patcher, + self.mock_setting_patcher, + self.mock_decorators_setting_patcher, + ]: + patcher.stop() - self.mock_apikey.return_value.is_validate.return_value = api_key def test_create_zone(self, client, common_data_mock, zone_data, user_apikey, created_zone_data): @@ -112,8 +144,7 @@ class TestUnitApiZoneUserApiKey(object): headers=user_apikey) data = res.get_json(force=True) - fake_domain = namedtuple("Domain", - data[0].keys())(*data[0].values()) + fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) domain_schema = DomainSchema(many=True) json.dumps(domain_schema.dump([fake_domain]))