diff --git a/docker-compose-test.yml b/docker-compose-test.yml index 7dcf4a0..77c8dba 100644 --- a/docker-compose-test.yml +++ b/docker-compose-test.yml @@ -1,11 +1,11 @@ -version: "2.1" +version: "3.8" services: powerdns-admin: + image: powerdns-admin-test build: context: . dockerfile: docker-test/Dockerfile - image: powerdns-admin-test container_name: powerdns-admin-test ports: - "9191:80" @@ -17,10 +17,10 @@ services: - pdns-server pdns-server: + image: pdns-server-test build: context: . dockerfile: docker-test/Dockerfile.pdns - image: pdns-server-test ports: - "5053:53" - "5053:53/udp" diff --git a/docker-test/Dockerfile b/docker-test/Dockerfile index 577e120..17f934f 100644 --- a/docker-test/Dockerfile +++ b/docker-test/Dockerfile @@ -1,15 +1,35 @@ -FROM debian:stretch-slim +FROM debian:bullseye-slim LABEL maintainer="k@ndk.name" ENV LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8 LANGUAGE=en_US.UTF-8 RUN apt-get update -y \ - && apt-get install -y --no-install-recommends apt-transport-https locales locales-all python3-pip python3-setuptools python3-dev curl libsasl2-dev libldap2-dev libssl-dev libxml2-dev libxslt1-dev libxmlsec1-dev libffi-dev build-essential libmariadb-dev-compat \ - && curl -sL https://deb.nodesource.com/setup_10.x | bash - \ + && apt-get install -y --no-install-recommends \ + apt-transport-https \ + curl \ + build-essential \ + libffi-dev \ + libldap2-dev \ + libmariadb-dev-compat \ + libsasl2-dev \ + libssl-dev \ + libxml2-dev \ + libxmlsec1-dev \ + libxmlsec1-openssl \ + libxslt1-dev \ + locales \ + locales-all \ + pkg-config \ + python3-dev \ + python3-pip \ + python3-setuptools \ + && curl -sL https://deb.nodesource.com/setup_lts.x | bash - \ && curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \ && echo "deb https://dl.yarnpkg.com/debian/ stable main" > /etc/apt/sources.list.d/yarn.list \ && apt-get update -y \ - && apt-get install -y nodejs yarn \ + && apt-get install -y --no-install-recommends \ + nodejs \ + yarn \ && apt-get clean -y \ && rm -rf /var/lib/apt/lists/* @@ -21,8 +41,6 @@ RUN pip3 install --upgrade pip RUN pip3 install -r requirements.txt COPY . /app -COPY ./docker/entrypoint.sh /usr/local/bin/ -RUN chmod +x /usr/local/bin/entrypoint.sh ENV FLASK_APP=powerdnsadmin/__init__.py RUN yarn install --pure-lockfile --production \ @@ -31,4 +49,4 @@ RUN yarn install --pure-lockfile --production \ COPY ./docker-test/wait-for-pdns.sh /opt RUN chmod u+x /opt/wait-for-pdns.sh -CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest","--capture=no","-vv"] +CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest", "-W", "ignore::DeprecationWarning", "--capture=no", "-vv"] diff --git a/docker-test/start.sh b/docker-test/start.sh index 9a66017..efd1c0e 100644 --- a/docker-test/start.sh +++ b/docker-test/start.sh @@ -10,9 +10,9 @@ fi # Import schema structure if [ -e "/data/pdns.sql" ]; then - rm /data/pdns.db + rm -f /data/pdns.db cat /data/pdns.sql | sqlite3 /data/pdns.db - rm /data/pdns.sql + rm -f /data/pdns.sql echo "Imported schema structure" fi diff --git a/migrations/env.py b/migrations/env.py index 4742e14..8b3fb33 100755 --- a/migrations/env.py +++ b/migrations/env.py @@ -1,8 +1,13 @@ from __future__ import with_statement -from alembic import context -from sqlalchemy import engine_from_config, pool -from logging.config import fileConfig + import logging +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool +from flask import current_app + +from alembic import context # this is the Alembic Config object, which provides # access to the values within the .ini file in use. @@ -17,9 +22,9 @@ logger = logging.getLogger('alembic.env') # for 'autogenerate' support # from myapp import mymodel # target_metadata = mymodel.Base.metadata -from flask import current_app -config.set_main_option('sqlalchemy.url', - current_app.config.get('SQLALCHEMY_DATABASE_URI').replace("%","%%")) +config.set_main_option( + 'sqlalchemy.url', + str(current_app.extensions['migrate'].db.engine.url).replace('%', '%%')) target_metadata = current_app.extensions['migrate'].db.metadata # other values from the config, defined by the needs of env.py, @@ -41,7 +46,9 @@ def run_migrations_offline(): """ url = config.get_main_option("sqlalchemy.url") - context.configure(url=url) + context.configure( + url=url, target_metadata=target_metadata, literal_binds=True + ) with context.begin_transaction(): context.run_migrations() @@ -65,22 +72,23 @@ def run_migrations_online(): directives[:] = [] logger.info('No changes in schema detected.') - engine = engine_from_config(config.get_section(config.config_ini_section), - prefix='sqlalchemy.', - poolclass=pool.NullPool) + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix='sqlalchemy.', + poolclass=pool.NullPool, + ) - connection = engine.connect() - context.configure(connection=connection, - target_metadata=target_metadata, - process_revision_directives=process_revision_directives, - render_as_batch=config.get_main_option('sqlalchemy.url').startswith('sqlite:'), - **current_app.extensions['migrate'].configure_args) + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata, + process_revision_directives=process_revision_directives, + **current_app.extensions['migrate'].configure_args + ) - try: with context.begin_transaction(): context.run_migrations() - finally: - connection.close() + if context.is_offline_mode(): run_migrations_offline() diff --git a/migrations/versions/787bdba9e147_init_db.py b/migrations/versions/787bdba9e147_init_db.py index aa781de..c4c7aa2 100644 --- a/migrations/versions/787bdba9e147_init_db.py +++ b/migrations/versions/787bdba9e147_init_db.py @@ -56,9 +56,9 @@ def seed_data(): op.bulk_insert(template_table, [ - {id: 1, 'name': 'basic_template_1', 'description': 'Basic Template #1'}, - {id: 2, 'name': 'basic_template_2', 'description': 'Basic Template #2'}, - {id: 3, 'name': 'basic_template_3', 'description': 'Basic Template #3'} + {'id': 1, 'name': 'basic_template_1', 'description': 'Basic Template #1'}, + {'id': 2, 'name': 'basic_template_2', 'description': 'Basic Template #2'}, + {'id': 3, 'name': 'basic_template_3', 'description': 'Basic Template #3'} ] ) diff --git a/powerdnsadmin/decorators.py b/powerdnsadmin/decorators.py index 6382141..50e4a3f 100644 --- a/powerdnsadmin/decorators.py +++ b/powerdnsadmin/decorators.py @@ -388,7 +388,7 @@ def apikey_can_configure_dnssec(http_methods=[]): def allowed_record_types(f): @wraps(f) def decorated_function(*args, **kwargs): - if request.method == 'GET': + if request.method in ['GET', 'DELETE', 'PUT']: return f(*args, **kwargs) if g.apikey.role.name in ['Administrator', 'Operator']: diff --git a/powerdnsadmin/models/role.py b/powerdnsadmin/models/role.py index a5cf530..5440f3d 100644 --- a/powerdnsadmin/models/role.py +++ b/powerdnsadmin/models/role.py @@ -5,7 +5,7 @@ class Role(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), index=True, unique=True) description = db.Column(db.String(128)) - users = db.relationship('User', backref='role', lazy=True) + users = db.relationship('User', back_populates='role', lazy=True) apikeys = db.relationship('ApiKey', back_populates='role', lazy=True) def __init__(self, id=None, name=None, description=None): @@ -20,4 +20,4 @@ class Role(db.Model): self.description = description def __repr__(self): - return ''.format(self.name) + return ''.format(self.name) diff --git a/powerdnsadmin/models/user.py b/powerdnsadmin/models/user.py index 1e39569..78104f6 100644 --- a/powerdnsadmin/models/user.py +++ b/powerdnsadmin/models/user.py @@ -34,6 +34,7 @@ class User(db.Model): otp_secret = db.Column(db.String(16)) confirmed = db.Column(db.SmallInteger, nullable=False, default=0) role_id = db.Column(db.Integer, db.ForeignKey('role.id')) + role = db.relationship('Role', back_populates="users", lazy=True) accounts = None def __init__(self, diff --git a/powerdnsadmin/routes/api.py b/powerdnsadmin/routes/api.py index 3df31e1..4a18c1d 100644 --- a/powerdnsadmin/routes/api.py +++ b/powerdnsadmin/routes/api.py @@ -1,22 +1,21 @@ import json -from urllib.parse import urljoin +import secrets +import string from base64 import b64encode -from flask import ( - Blueprint, g, request, abort, current_app, make_response, jsonify, -) +from urllib.parse import urljoin + +from flask import (Blueprint, g, request, abort, current_app, make_response, jsonify) from flask_login import current_user from .base import csrf -from ..models.base import db -from ..models import ( - User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey, - Role, +from ..decorators import ( + api_basic_auth, api_can_create_domain, is_json, apikey_auth, + apikey_can_create_domain, apikey_can_remove_domain, + apikey_is_admin, apikey_can_access_domain, apikey_can_configure_dnssec, + api_role_can, apikey_or_basic_auth, + callback_if_request_body_contains_key, allowed_record_types, allowed_record_ttl ) from ..lib import utils, helper -from ..lib.schema import ( - ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema, - UserDetailedSchema, -) from ..lib.errors import ( StructuredException, DomainNotExists, DomainAlreadyExists, DomainAccessForbidden, @@ -26,15 +25,15 @@ from ..lib.errors import ( UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail, UserUpdateFailEmail, InvalidAccountNameException ) -from ..decorators import ( - api_basic_auth, api_can_create_domain, is_json, apikey_auth, - apikey_can_create_domain, apikey_can_remove_domain, - apikey_is_admin, apikey_can_access_domain, apikey_can_configure_dnssec, - api_role_can, apikey_or_basic_auth, - callback_if_request_body_contains_key, allowed_record_types, allowed_record_ttl +from ..lib.schema import ( + ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema, + UserDetailedSchema, ) -import secrets -import string +from ..models import ( + User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey, + Role, +) +from ..models.base import db api_bp = Blueprint('api', __name__, url_prefix='/api/v1') apilist_bp = Blueprint('apilist', __name__, url_prefix='/') @@ -56,10 +55,10 @@ def get_user_domains(): .outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .filter( - db.or_( - DomainUser.user_id == current_user.id, - AccountUser.user_id == current_user.id - )).all() + db.or_( + DomainUser.user_id == current_user.id, + AccountUser.user_id == current_user.id + )).all() return domains @@ -71,10 +70,10 @@ def get_user_apikeys(domain_name=None): .outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .filter( - db.or_( - DomainUser.user_id == User.id, - AccountUser.user_id == User.id - ) + db.or_( + DomainUser.user_id == User.id, + AccountUser.user_id == User.id + ) ) \ .filter(User.id == current_user.id) @@ -167,12 +166,7 @@ def handle_request_is_not_json(err): def before_request(): # Check site is in maintenance mode maintenance = Setting().get('maintenance') - if ( - maintenance and current_user.is_authenticated and - current_user.role.name not in [ - 'Administrator', 'Operator' - ] - ): + if (maintenance and current_user.is_authenticated and current_user.role.name not in ['Administrator', 'Operator']): return make_response( jsonify({ "status": False, @@ -224,14 +218,13 @@ def api_login_create_zone(): history = History(msg='Add domain {0}'.format( data['name'].rstrip('.')), - detail=json.dumps(data), - created_by=current_user.username, - domain_id=domain_id) + detail=json.dumps(data), + created_by=current_user.username, + domain_id=domain_id) history.add() if current_user.role.name not in ['Administrator', 'Operator']: - current_app.logger.debug( - "User is ordinary user, assigning created domain") + current_app.logger.debug("User is ordinary user, assigning created domain") domain = Domain(name=data['name'].rstrip('.')) domain.update() domain.grant_privileges([current_user.id]) @@ -299,9 +292,9 @@ def api_login_delete_zone(domain_name): history = History(msg='Delete domain {0}'.format( utils.pretty_domain_name(domain_name)), - detail='', - created_by=current_user.username, - domain_id=domain_id) + detail='', + created_by=current_user.username, + domain_id=domain_id) history.add() except Exception as e: @@ -326,14 +319,14 @@ def api_generate_apikey(): if 'domains' not in data: domains = [] - elif not isinstance(data['domains'], (list, )): + elif not isinstance(data['domains'], (list,)): abort(400) else: domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']] if 'accounts' not in data: accounts = [] - elif not isinstance(data['accounts'], (list, )): + elif not isinstance(data['accounts'], (list,)): abort(400) else: accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']] @@ -385,8 +378,7 @@ def api_generate_apikey(): user_domain_list = [item.name for item in user_domain_obj_list] current_app.logger.debug("Input domain list: {0}".format(domain_list)) - current_app.logger.debug( - "User domain list: {0}".format(user_domain_list)) + current_app.logger.debug("User domain list: {0}".format(user_domain_list)) inter = set(domain_list).intersection(set(user_domain_list)) @@ -539,14 +531,14 @@ def api_update_apikey(apikey_id): if 'domains' not in data: domains = None - elif not isinstance(data['domains'], (list, )): + elif not isinstance(data['domains'], (list,)): abort(400) else: domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']] if 'accounts' not in data: accounts = None - elif not isinstance(data['accounts'], (list, )): + elif not isinstance(data['accounts'], (list,)): abort(400) else: accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']] @@ -963,9 +955,7 @@ def api_delete_account(account_id): account = account_list[0] else: abort(404) - current_app.logger.debug( - f'Deleting Account {account.name}' - ) + current_app.logger.debug(f'Deleting Account {account.name}') # Remove account association from domains first if len(account.domains) > 0: @@ -1047,7 +1037,7 @@ def api_remove_account_user(account_id, user_id): user_list = User.query.join(AccountUser).filter( AccountUser.account_id == account_id, AccountUser.user_id == user_id, - ).all() + ).all() if not user_list: abort(404) if not account.remove_user(user): @@ -1123,9 +1113,9 @@ def api_zone_forward(server_id, zone_id): history = History(msg='{0} zone {1} record of {2}'.format( rrset_data['changetype'].lower(), rrset_data['type'], rrset_data['name'].rstrip('.')), - detail=json.dumps(data), - created_by=g.apikey.description, - domain_id=Domain().get_id_by_name(zone_id.rstrip('.'))) + detail=json.dumps(data), + created_by=g.apikey.description, + domain_id=Domain().get_id_by_name(zone_id.rstrip('.'))) history.add() elif request.method == 'DELETE': history = History(msg='Deleted zone {0}'.format(zone_id.rstrip('.')), @@ -1192,17 +1182,13 @@ def api_get_zones(server_id): return jsonify(domain_schema.dump(domain_obj_list)), 200 else: resp = helper.forward_request() - if ( - g.apikey.role.name not in ['Administrator', 'Operator'] - and resp.status_code == 200 - ): + if (g.apikey.role.name not in ['Administrator', 'Operator'] and resp.status_code == 200): domain_list = [d['name'] for d in domain_schema.dump(g.apikey.domains)] accounts_domains = [d.name for a in g.apikey.accounts for d in a.domains] allowed_domains = set(domain_list + accounts_domains) - current_app.logger.debug("Account domains: {}".format( - '/'.join(accounts_domains))) + current_app.logger.debug("Account domains: {}".format('/'.join(accounts_domains))) content = json.dumps([i for i in json.loads(resp.content) if i['name'].rstrip('.') in allowed_domains]) return content, resp.status_code, resp.headers.items() @@ -1223,6 +1209,7 @@ def api_server_config_forward(server_id): resp = helper.forward_request() return resp.content, resp.status_code, resp.headers.items() + # The endpoint to synchronize Domains in background @api_bp.route('/sync_domains', methods=['GET']) @apikey_or_basic_auth @@ -1231,6 +1218,7 @@ def sync_domains(): domain.update() return 'Finished synchronization in background', 200 + @api_bp.route('/health', methods=['GET']) @apikey_auth def health(): @@ -1244,7 +1232,8 @@ def health(): try: domain.get_domain_info(domain_to_query.name) except Exception as e: - current_app.logger.error("Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name)) + current_app.logger.error( + "Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name)) return make_response("Down", 503) return make_response("Up", 200) diff --git a/requirements.txt b/requirements.txt index 1fc2864..11b1421 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,33 +1,41 @@ -Flask==1.1.2 -Flask-Assets==2.0 -Flask-Login==0.5.0 -Flask-SQLAlchemy==2.4.4 -Flask-Migrate==2.5.3 -SQLAlchemy==1.3.19 -mysqlclient==2.0.1 +mysqlclient==2.1.1 configobj==5.0.6 -bcrypt>=3.1.7 -requests==2.24.0 -python-ldap==3.4.2 -pyotp==2.4.0 -qrcode==6.1 +bcrypt==4.0.1 +requests==2.28.1 +python-ldap==3.4.3 +pyotp==2.8.0 +qrcode==7.3.1 dnspython>=1.16.0 -gunicorn==20.0.4 -python3-saml -pytz==2020.1 +gunicorn==20.1.0 +python3-saml==1.14.0 +pytz==2022.7 cssmin==0.2.0 -rjsmin==1.2.0 -Authlib==0.15 +rjsmin==1.2.1 +Authlib==1.2.0 Flask-SeaSurf==1.1.1 -bravado-core==5.17.0 +bravado-core==5.17.1 +jsonschema[format]>=2.5.1,<4.0.0 # until https://github.com/Yelp/bravado-core/pull/385 lima==0.5 -pytest==6.1.1 pytimeparse==1.1.8 -PyYAML==5.4 -Flask-SSLify==0.1.5 +alembic==1.9.0 +certifi==2022.12.7 +cffi==1.15.1 +passlib==1.7.4 +pyasn1==0.4.8 +webcolors==1.12 +zipp==3.11.0 +Flask==2.2.2 +Flask-Assets==2.0 +Flask-Login==0.6.2 +Flask-SQLAlchemy==3.0.2 +Flask-Migrate==4.0.0 +SQLAlchemy==1.4.45 +pyOpenSSL==22.1.0 +Authlib==1.2.0 +Flask-SeaSurf==1.1.1 +PyYAML==6.0 Flask-Mail==0.9.1 -flask-session==0.3.2 -Jinja2==3.0.3 -itsdangerous==2.0.1 -werkzeug==2.0.3 -cryptography==36.0.2 +Flask-SSLify==0.1.5 +Flask-Session==0.4.0 +lxml==4.6.5 +pytest==7.2.0 diff --git a/tests/fixtures.py b/tests/conftest.py similarity index 50% rename from tests/fixtures.py rename to tests/conftest.py index 39b6a70..f5de9e8 100644 --- a/tests/fixtures.py +++ b/tests/conftest.py @@ -1,26 +1,28 @@ import os -import pytest -import flask_migrate from base64 import b64encode -from powerdnsadmin import create_app -from powerdnsadmin.models.base import db -from powerdnsadmin.models.user import User -from powerdnsadmin.models.setting import Setting -from powerdnsadmin.models.api_key import ApiKey +import pytest +from flask_migrate import upgrade as flask_migrate_upgrade -app = create_app('../configs/test.py') -ctx = app.app_context() -ctx.push() +from powerdnsadmin import create_app +from powerdnsadmin.models.api_key import ApiKey +from powerdnsadmin.models.base import db +from powerdnsadmin.models.setting import Setting +from powerdnsadmin.models.user import User + + +@pytest.fixture(scope="session") +def app(): + app = create_app('../configs/test.py') + yield app @pytest.fixture -def client(): +def client(app): app.config['TESTING'] = True client = app.test_client() yield client - def load_data(setting_name, *args, **kwargs): if setting_name == 'maintenance': return 0 @@ -36,20 +38,22 @@ def load_data(setting_name, *args, **kwargs): return 10 if setting_name == 'allow_user_create_domain': return True + if setting_name == 'allow_user_remove_domain': + return True @pytest.fixture -def test_admin_user(): +def test_admin_user(app): return app.config.get('TEST_ADMIN_USER') @pytest.fixture -def test_user(): +def test_user(app): return app.config.get('TEST_USER') @pytest.fixture -def basic_auth_admin_headers(): +def basic_auth_admin_headers(app): test_admin_user = app.config.get('TEST_ADMIN_USER') test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') user_pass = "{0}:{1}".format(test_admin_user, test_admin_pass) @@ -61,7 +65,7 @@ def basic_auth_admin_headers(): @pytest.fixture -def basic_auth_user_headers(): +def basic_auth_user_headers(app): test_user = app.config.get('TEST_USER') test_user_pass = app.config.get('TEST_USER_PASSWORD') user_pass = "{0}:{1}".format(test_user, test_user_pass) @@ -73,7 +77,8 @@ def basic_auth_user_headers(): @pytest.fixture(scope="module") -def initial_data(): +def initial_data(app): + pdns_proto = os.environ['PDNS_PROTO'] pdns_host = os.environ['PDNS_HOST'] pdns_port = os.environ['PDNS_PORT'] @@ -83,46 +88,44 @@ def initial_data(): api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY']) allow_create_domain_setting = Setting('allow_user_create_domain', True) - try: - flask_migrate.upgrade() + with app.app_context(): + try: + flask_migrate_upgrade(directory="migrations") + db.session.add(api_url_setting) + db.session.add(api_key_setting) + db.session.add(allow_create_domain_setting) - db.session.add(api_url_setting) - db.session.add(api_key_setting) - db.session.add(allow_create_domain_setting) + test_user = app.config.get('TEST_USER') + test_user_pass = app.config.get('TEST_USER_PASSWORD') + test_admin_user = app.config.get('TEST_ADMIN_USER') + test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') - test_user = app.config.get('TEST_USER') - test_user_pass = app.config.get('TEST_USER_PASSWORD') - test_admin_user = app.config.get('TEST_ADMIN_USER') - test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD') + admin_user = User(username=test_admin_user, + plain_text_password=test_admin_pass, + email="admin@admin.com") + ret = admin_user.create_local_user() - admin_user = User(username=test_admin_user, - plain_text_password=test_admin_pass, - email="admin@admin.com") - msg = admin_user.create_local_user() + if not ret['status']: + raise Exception("Error occurred creating user {0}".format(ret['msg'])) - if not msg: - raise Exception("Error occurred creating user {0}".format(msg)) + ordinary_user = User(username=test_user, + plain_text_password=test_user_pass, + email="test@test.com") + ret = ordinary_user.create_local_user() - ordinary_user = User(username=test_user, - plain_text_password=test_user_pass, - email="test@test.com") - msg = ordinary_user.create_local_user() + if not ret['status']: + raise Exception("Error occurred creating user {0}".format(ret['msg'])) - if not msg: - raise Exception("Error occurred creating user {0}".format(msg)) - - except Exception as e: - print("Unexpected ERROR: {0}".format(e)) - raise e + except Exception as e: + print("Unexpected ERROR: {0}".format(e)) + raise e yield - - db.session.close() os.unlink(app.config['TEST_DB_LOCATION']) @pytest.fixture(scope="module") -def initial_apikey_data(): +def initial_apikey_data(app): pdns_proto = os.environ['PDNS_PROTO'] pdns_host = os.environ['PDNS_HOST'] pdns_port = os.environ['PDNS_PORT'] @@ -131,42 +134,42 @@ def initial_apikey_data(): api_url_setting = Setting('pdns_api_url', pdns_api_url) api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY']) allow_create_domain_setting = Setting('allow_user_create_domain', True) + allow_remove_domain_setting = Setting('allow_user_remove_domain', True) - try: - flask_migrate.upgrade() + with app.app_context(): + try: + flask_migrate_upgrade(directory="migrations") + db.session.add(api_url_setting) + db.session.add(api_key_setting) + db.session.add(allow_create_domain_setting) + db.session.add(allow_remove_domain_setting) - db.session.add(api_url_setting) - db.session.add(api_key_setting) - db.session.add(allow_create_domain_setting) + test_user_apikey = app.config.get('TEST_USER_APIKEY') + test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY') - test_user_apikey = app.config.get('TEST_USER_APIKEY') - test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY') + dummy_apikey = ApiKey(desc="dummy", role_name="Administrator") - dummy_apikey = ApiKey(desc="dummy", role_name="Administrator") + admin_key = dummy_apikey.get_hashed_password( + plain_text_password=test_admin_apikey).decode('utf-8') - admin_key = dummy_apikey.get_hashed_password( - plain_text_password=test_admin_apikey).decode('utf-8') + admin_apikey = ApiKey(key=admin_key, + desc="test admin apikey", + role_name="Administrator") + admin_apikey.create() - admin_apikey = ApiKey(key=admin_key, - desc="test admin apikey", - role_name="Administrator") - admin_apikey.create() + user_key = dummy_apikey.get_hashed_password( + plain_text_password=test_user_apikey).decode('utf-8') - user_key = dummy_apikey.get_hashed_password( - plain_text_password=test_user_apikey).decode('utf-8') + user_apikey = ApiKey(key=user_key, + desc="test user apikey", + role_name="User") + user_apikey.create() - user_apikey = ApiKey(key=user_key, - desc="test user apikey", - role_name="User") - user_apikey.create() - - except Exception as e: - print("Unexpected ERROR: {0}".format(e)) - raise e + except Exception as e: + print("Unexpected ERROR: {0}".format(e)) + raise e yield - - db.session.close() os.unlink(app.config['TEST_DB_LOCATION']) @@ -183,61 +186,51 @@ def zone_data(): @pytest.fixture def created_zone_data(): data = { - 'url': - '/api/v1/servers/localhost/zones/example.org.', - 'soa_edit_api': - 'DEFAULT', - 'last_check': - 0, + 'url': '/api/v1/servers/localhost/zones/example.org.', + 'soa_edit_api': 'DEFAULT', + 'last_check': 0, 'masters': [], - 'dnssec': - False, - 'notified_serial': - 0, - 'nsec3narrow': - False, - 'serial': - 2019013101, - 'nsec3param': - '', - 'soa_edit': - '', - 'api_rectify': - False, - 'kind': - 'Native', + 'dnssec': False, + 'notified_serial': 0, + 'nsec3narrow': False, + 'serial': 2019013101, + 'nsec3param': '', + 'soa_edit': '', + 'api_rectify': False, + 'kind': 'Native', 'rrsets': [{ 'comments': [], - 'type': - 'SOA', - 'name': - 'example.org.', - 'ttl': - 3600, + 'type': 'SOA', + 'name': 'example.org.', + 'ttl': 3600, 'records': [{ - 'content': - 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600', + 'content': 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600', 'disabled': False }] }, { 'comments': [], - 'type': - 'NS', - 'name': - 'example.org.', - 'ttl': - 3600, + 'type': 'NS', + 'name': 'example.org.', + 'ttl': 3600, 'records': [{ 'content': 'ns1.example.org.', 'disabled': False }] }], - 'name': - 'example.org.', - 'account': - '', - 'id': - 'example.org.' + 'name': 'example.org.', + 'account': '', + 'id': 'example.org.' + } + return data + + +def user_data(app): + test_user = app.config.get('TEST_USER') + test_user_pass = app.config.get('TEST_USER_PASSWORD') + data = { + "username": test_user, + "plain_text_password": test_user_pass, + "email": "test@test.com" } return data @@ -257,37 +250,39 @@ def admin_apikey_data(): @pytest.fixture(scope='module') -def user_apikey_integration(): +def user_apikey_integration(app): test_user_apikey = app.config.get('TEST_USER_APIKEY') headers = create_apikey_headers(test_user_apikey) return headers @pytest.fixture(scope='module') -def admin_apikey_integration(): +def admin_apikey_integration(app): test_user_apikey = app.config.get('TEST_ADMIN_APIKEY') headers = create_apikey_headers(test_user_apikey) return headers @pytest.fixture(scope='module') -def user_apikey(): - data = user_apikey_data() - api_key = ApiKey(desc=data['description'], - role_name=data['role'], - domains=[]) - headers = create_apikey_headers(api_key.plain_key) - return headers +def user_apikey(app): + with app.app_context(): + data = user_apikey_data() + api_key = ApiKey(desc=data['description'], + role_name=data['role'], + domains=[]) + headers = create_apikey_headers(api_key.plain_key) + return headers @pytest.fixture(scope='module') -def admin_apikey(): - data = admin_apikey_data() - api_key = ApiKey(desc=data['description'], - role_name=data['role'], - domains=[]) - headers = create_apikey_headers(api_key.plain_key) - return headers +def admin_apikey(app): + with app.app_context(): + data = admin_apikey_data() + api_key = ApiKey(desc=data['description'], + role_name=data['role'], + domains=[]) + headers = create_apikey_headers(api_key.plain_key) + return headers def create_apikey_headers(passw): diff --git a/tests/integration/api/apikey/test_admin_user.py b/tests/integration/api/apikey/test_admin_user.py index d8d8f29..3bc0c82 100644 --- a/tests/integration/api/apikey/test_admin_user.py +++ b/tests/integration/api/apikey/test_admin_user.py @@ -4,12 +4,11 @@ from collections import namedtuple from powerdnsadmin.lib.validators import validate_apikey from powerdnsadmin.lib.schema import ApiKeySchema -from tests.fixtures import client, initial_data, basic_auth_admin_headers -from tests.fixtures import user_apikey_data, admin_apikey_data, zone_data +from tests.conftest import user_apikey_data, admin_apikey_data class TestIntegrationApiApiKeyAdminUser(object): - def test_empty_get(self, client, initial_data, basic_auth_admin_headers): + def test_empty_get(self, initial_data, client, basic_auth_admin_headers): res = client.get("/api/v1/pdnsadmin/apikeys", headers=basic_auth_admin_headers) data = res.get_json(force=True) @@ -19,7 +18,7 @@ class TestIntegrationApiApiKeyAdminUser(object): @pytest.mark.parametrize( "apikey_data", [user_apikey_data(), admin_apikey_data()]) - def test_create_apikey(self, client, initial_data, apikey_data, zone_data, + def test_create_apikey(self, initial_data, client, apikey_data, zone_data, basic_auth_admin_headers): res = client.post("/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers, @@ -39,7 +38,7 @@ class TestIntegrationApiApiKeyAdminUser(object): assert res.status_code == 201 apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" - apikey_url = apikey_url_format.format(data[0]['id']) + apikey_url = apikey_url_format.format(data['id']) res = client.delete(apikey_url, headers=basic_auth_admin_headers) @@ -54,7 +53,7 @@ class TestIntegrationApiApiKeyAdminUser(object): @pytest.mark.parametrize( "apikey_data", [user_apikey_data(), admin_apikey_data()]) - def test_get_multiple_apikey(self, client, initial_data, apikey_data, + def test_get_multiple_apikey(self, initial_data, client, apikey_data, zone_data, basic_auth_admin_headers): res = client.post("/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers, @@ -103,7 +102,7 @@ class TestIntegrationApiApiKeyAdminUser(object): @pytest.mark.parametrize( "apikey_data", [user_apikey_data(), admin_apikey_data()]) - def test_delete_apikey(self, client, initial_data, apikey_data, zone_data, + def test_delete_apikey(self, initial_data, client, apikey_data, zone_data, basic_auth_admin_headers): res = client.post("/api/v1/pdnsadmin/zones", headers=basic_auth_admin_headers, @@ -123,7 +122,7 @@ class TestIntegrationApiApiKeyAdminUser(object): assert res.status_code == 201 apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}" - apikey_url = apikey_url_format.format(data[0]['id']) + apikey_url = apikey_url_format.format(data['id']) res = client.delete(apikey_url, headers=basic_auth_admin_headers) assert res.status_code == 204 diff --git a/tests/integration/api/apikey/test_user.py b/tests/integration/api/apikey/test_user.py index 17fc970..34fdd86 100644 --- a/tests/integration/api/apikey/test_user.py +++ b/tests/integration/api/apikey/test_user.py @@ -1,14 +1,11 @@ -import pytest import json from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, initial_data, basic_auth_user_headers -from tests.fixtures import zone_data -class TestIntegrationApiZoneUser(object): +class TestIntegrationApiApiKeyUser(object): def test_empty_get(self, initial_data, client, basic_auth_user_headers): res = client.get("/api/v1/pdnsadmin/zones", headers=basic_auth_user_headers) diff --git a/tests/integration/api/management/__init__.py b/tests/integration/api/management/__init__.py index b546f46..af748be 100644 --- a/tests/integration/api/management/__init__.py +++ b/tests/integration/api/management/__init__.py @@ -14,8 +14,10 @@ class IntegrationApiManagement(object): assert res.status_code == status_code if res.status_code == 200: data = res.get_json(force=True) - assert len(data) == 1 - return data[0] + assert isinstance(data, dict) + assert len(data) == 7 + assert data.get('id', None) + return data return None def check_account(self, cmpdata, data=None): @@ -37,8 +39,10 @@ class IntegrationApiManagement(object): assert res.status_code == status_code if status_code == 200: data = res.get_json(force=True) - assert len(data) == 1 - return data[0] + assert isinstance(data, dict) + assert len(data) == 7 + assert data.get('id', None) + return data return None def check_user(self, cmpdata, data=None): @@ -50,5 +54,5 @@ class IntegrationApiManagement(object): elif key == 'role': assert data[key]['name'] == cmpdata['role_name'] else: - assert key in ("id",) + assert key in ("id","accounts",) return data diff --git a/tests/integration/api/management/test_admin_user.py b/tests/integration/api/management/test_admin_user.py index 3df0384..b017320 100644 --- a/tests/integration/api/management/test_admin_user.py +++ b/tests/integration/api/management/test_admin_user.py @@ -1,9 +1,5 @@ - import json -from tests.fixtures import ( # noqa: F401 - client, initial_data, basic_auth_admin_headers, - test_admin_user, test_user, account_data, user1_data, -) + from . import IntegrationApiManagement @@ -89,8 +85,9 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 200 - assert len(data) == 1 - data = data[0] + assert isinstance(data, dict) + assert len(data) == 7 + assert data.get('id', None) account_id = data["id"] for key, value in account_data.items(): assert data[key] == value @@ -142,10 +139,12 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 201 - assert len(data) == 1 + assert isinstance(data, dict) + assert len(data) == 6 + assert data.get('id', None) # Check user - user1 = self.check_user(user1_data, data[0]) + user1 = self.check_user(user1_data, data) user1_id = user1["id"] updated = user1_data.copy() @@ -240,10 +239,12 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 201 - assert len(data) == 1 + assert isinstance(data, dict) + assert len(data) == 6 + assert data.get('id', None) # Check user - user1 = self.check_user(user1_data, data[0]) + user1 = self.check_user(user1_data, data) user1_id = user1["id"] # Assert test account has no users diff --git a/tests/integration/api/management/test_user.py b/tests/integration/api/management/test_user.py index b56652e..65a5869 100644 --- a/tests/integration/api/management/test_user.py +++ b/tests/integration/api/management/test_user.py @@ -1,66 +1,61 @@ - import json -from tests.fixtures import ( # noqa: F401 - client, initial_data, basic_auth_admin_headers, basic_auth_user_headers, - test_admin_user, test_user, account_data, user1_data, -) from . import IntegrationApiManagement class TestIntegrationApiManagementUser(IntegrationApiManagement): - def test_accounts_empty_get( - self, client, initial_data, # noqa: F811 - basic_auth_user_headers): # noqa: F811 + def test_accounts_empty_get(self, initial_data, client, # noqa: F811 + basic_auth_user_headers): # noqa: F811 res = client.get("/api/v1/pdnsadmin/accounts", headers=basic_auth_user_headers) assert res.status_code == 401 - def test_users_empty_get( - self, client, initial_data, # noqa: F811 - test_admin_user, test_user, # noqa: F811 - basic_auth_user_headers): # noqa: F811 + def test_users_empty_get(self, initial_data, client, # noqa: F811 + test_admin_user, test_user, # noqa: F811 + basic_auth_user_headers): # noqa: F811 res = client.get("/api/v1/pdnsadmin/users", headers=basic_auth_user_headers) assert res.status_code == 401 - def test_self_get( - self, initial_data, client, test_user, # noqa: F811 - basic_auth_user_headers): # noqa: F811 - self.user = None + def test_self_get(self, initial_data, client, basic_auth_user_headers, test_user): # noqa: F811 res = client.get("/api/v1/pdnsadmin/users/{}".format(test_user), headers=basic_auth_user_headers) data = res.get_json(force=True) assert res.status_code == 200 - assert len(data) == 1, data - self.user = data + assert data - def test_accounts( - self, client, initial_data, # noqa: F811 - account_data, # noqa: F811 - basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 + def test_create_account_fail(self, client, initial_data, account_data, # noqa: F811 + basic_auth_user_headers): # noqa: F811 + + # Create account (should fail) + res = client.post("/api/v1/pdnsadmin/accounts", + headers=basic_auth_user_headers, + data=json.dumps(account_data), + content_type="application/json") + assert res.status_code == 401 + + def test_create_account_as_admin(self, app, initial_data, client, account_data, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 self.client = client self.basic_auth_admin_headers = basic_auth_admin_headers - # Create account (should fail) - res = client.post( - "/api/v1/pdnsadmin/accounts", - headers=basic_auth_user_headers, - data=json.dumps(account_data), - content_type="application/json", - ) - assert res.status_code == 401 + with app.test_request_context(): + # Create account (as admin) + res = client.post("/api/v1/pdnsadmin/accounts", + headers=basic_auth_admin_headers, + data=json.dumps(account_data), + content_type="application/json") + data = res.get_json(force=True) + assert res.status_code == 201 - # Create account (as admin) - res = client.post( - "/api/v1/pdnsadmin/accounts", - headers=basic_auth_admin_headers, - data=json.dumps(account_data), - content_type="application/json", - ) - data = res.get_json(force=True) - assert res.status_code == 201 + def test_update_account_fail( + self, initial_data, client, # noqa: F811 + account_data, # noqa: F811 + basic_auth_user_headers, + basic_auth_admin_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers # Check account data = self.check_account(account_data) @@ -75,6 +70,18 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): ) assert res.status_code == 401 + def test_delete_account_fail( + self, initial_data, client, # noqa: F811 + account_data, # noqa: F811 + basic_auth_user_headers, + basic_auth_admin_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Check account + data = self.check_account(account_data) + account_id = data["id"] + # Delete account (should fail) res = client.delete( "/api/v1/pdnsadmin/accounts/{}".format(account_id), @@ -84,6 +91,17 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): ) assert res.status_code == 401 + def test_delete_account_as_admin( + self, client, initial_data, # noqa: F811 + account_data, # noqa: F811 + basic_auth_admin_headers): # noqa: F811 + self.client = client + self.basic_auth_admin_headers = basic_auth_admin_headers + + # Check account + data = self.check_account(account_data) + account_id = data["id"] + # Cleanup (delete account as admin) res = client.delete( "/api/v1/pdnsadmin/accounts/{}".format(account_id), @@ -94,8 +112,8 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): assert res.status_code == 204 def test_users( - self, client, initial_data, # noqa: F811 - user1_data, # noqa: F811 + self, client, initial_data, # noqa: F811 + user1_data, # noqa: F811 basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 self.client = client self.basic_auth_admin_headers = basic_auth_admin_headers @@ -118,10 +136,12 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 201 - assert len(data) == 1 + assert isinstance(data, dict) + assert len(data) == 6 + assert data.get('id', None) # Check user - user1 = self.check_user(user1_data, data[0]) + user1 = self.check_user(user1_data, data) user1_id = user1["id"] # Update to defaults (should fail) @@ -152,8 +172,8 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): assert res.status_code == 204 def test_account_users( - self, client, initial_data, # noqa: F811 - account_data, user1_data, # noqa: F811 + self, client, initial_data, # noqa: F811 + account_data, user1_data, # noqa: F811 basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811 self.client = client self.basic_auth_admin_headers = basic_auth_admin_headers @@ -181,10 +201,12 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement): ) data = res.get_json(force=True) assert res.status_code == 201 - assert len(data) == 1 + assert isinstance(data, dict) + assert len(data) == 6 + assert data.get('id', None) # Check user - user1 = self.check_user(user1_data, data[0]) + user1 = self.check_user(user1_data, data) user1_id = user1["id"] # Assert test account has no users diff --git a/tests/integration/api/zone/test_admin_user.py b/tests/integration/api/zone/test_admin_user.py index 2aa9c44..f1c12a7 100644 --- a/tests/integration/api/zone/test_admin_user.py +++ b/tests/integration/api/zone/test_admin_user.py @@ -1,11 +1,8 @@ -import pytest import json from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, initial_data, basic_auth_admin_headers -from tests.fixtures import zone_data class TestIntegrationApiZoneAdminUser(object): diff --git a/tests/integration/api/zone/test_apikey_admin_user.py b/tests/integration/api/zone/test_apikey_admin_user.py index 9090ed6..494366f 100644 --- a/tests/integration/api/zone/test_apikey_admin_user.py +++ b/tests/integration/api/zone/test_apikey_admin_user.py @@ -3,9 +3,6 @@ from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client -from tests.fixtures import zone_data, initial_apikey_data -from tests.fixtures import admin_apikey_integration class TestIntegrationApiZoneAdminApiKey(object): diff --git a/tests/integration/api/zone/test_apikey_user.py b/tests/integration/api/zone/test_apikey_user.py index b0c96bc..94ddac1 100644 --- a/tests/integration/api/zone/test_apikey_user.py +++ b/tests/integration/api/zone/test_apikey_user.py @@ -3,9 +3,6 @@ from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client -from tests.fixtures import zone_data, initial_apikey_data -from tests.fixtures import user_apikey_integration class TestIntegrationApiZoneUserApiKey(object): diff --git a/tests/integration/api/zone/test_user.py b/tests/integration/api/zone/test_user.py index 7392521..1d0751d 100644 --- a/tests/integration/api/zone/test_user.py +++ b/tests/integration/api/zone/test_user.py @@ -3,8 +3,6 @@ from collections import namedtuple from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, initial_data, basic_auth_user_headers -from tests.fixtures import zone_data class TestIntegrationApiZoneUser(object): diff --git a/tests/unit/apikey/__init__.py b/tests/unit/apikey/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/unit/test_decorators.py b/tests/unit/test_decorators.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/unit/zone/test_admin_apikey.py b/tests/unit/zone/test_admin_apikey.py index 6f3a9c3..ccef69c 100644 --- a/tests/unit/zone/test_admin_apikey.py +++ b/tests/unit/zone/test_admin_apikey.py @@ -2,9 +2,6 @@ import json import pytest from unittest.mock import patch from collections import namedtuple -import sys -import os -sys.path.append(os.getcwd()) import powerdnsadmin from powerdnsadmin.models.setting import Setting @@ -12,79 +9,108 @@ from powerdnsadmin.models.domain import Domain from powerdnsadmin.models.api_key import ApiKey from powerdnsadmin.models.role import Role from powerdnsadmin.lib.validators import validate_zone -from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema -from tests.fixtures import client, initial_data, created_zone_data -from tests.fixtures import user_apikey, admin_apikey, zone_data -from tests.fixtures import admin_apikey_data, load_data +from powerdnsadmin.lib.schema import DomainSchema +from tests.conftest import admin_apikey_data, load_data class TestUnitApiZoneAdminApiKey(object): @pytest.fixture - def common_data_mock(self): - self.google_setting_patcher = patch( - 'powerdnsadmin.services.google.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.github_setting_patcher = patch( - 'powerdnsadmin.services.github.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.oidc_setting_patcher = patch( - 'powerdnsadmin.services.oidc.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.helpers_setting_patcher = patch( - 'powerdnsadmin.lib.helper.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.models_setting_patcher = patch( - 'powerdnsadmin.models.setting.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.domain_model_setting_patcher = patch( - 'powerdnsadmin.models.domain.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.record_model_setting_patcher = patch( - 'powerdnsadmin.models.record.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.server_model_setting_patcher = patch( - 'powerdnsadmin.models.server.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.mock_apikey_patcher = patch( - 'powerdnsadmin.decorators.ApiKey', - spec=powerdnsadmin.models.api_key.ApiKey) - self.mock_hist_patcher = patch( - 'powerdnsadmin.routes.api.History', - spec=powerdnsadmin.models.history.History) - self.mock_setting_patcher = patch( - 'powerdnsadmin.routes.api.Setting', - spec=powerdnsadmin.models.setting.Setting) + def common_data_mock(self, app): + with app.app_context(): + self.google_setting_patcher = patch( + 'powerdnsadmin.services.google.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.github_setting_patcher = patch( + 'powerdnsadmin.services.github.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.azure_setting_patcher = patch( + 'powerdnsadmin.services.azure.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.oidc_setting_patcher = patch( + 'powerdnsadmin.services.oidc.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.helpers_setting_patcher = patch( + 'powerdnsadmin.lib.helper.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.models_setting_patcher = patch( + 'powerdnsadmin.models.setting.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.domain_model_setting_patcher = patch( + 'powerdnsadmin.models.domain.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.record_model_setting_patcher = patch( + 'powerdnsadmin.models.record.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.server_model_setting_patcher = patch( + 'powerdnsadmin.models.server.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_apikey_patcher = patch( + 'powerdnsadmin.decorators.ApiKey', + spec=powerdnsadmin.models.api_key.ApiKey) + self.mock_hist_patcher = patch( + 'powerdnsadmin.routes.api.History', + spec=powerdnsadmin.models.history.History) + self.mock_setting_patcher = patch( + 'powerdnsadmin.routes.api.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_decorators_setting_patcher = patch( + 'powerdnsadmin.decorators.Setting', + spec=powerdnsadmin.models.setting.Setting) - data = admin_apikey_data() - api_key = ApiKey(desc=data['description'], - role_name=data['role'], - domains=[]) - api_key.role = Role(name=data['role']) + data = admin_apikey_data() + api_key = ApiKey(desc=data['description'], + role_name=data['role'], + domains=[]) + api_key.role = Role(name=data['role']) - self.mock_google_setting = self.google_setting_patcher.start() - self.mock_github_setting = self.github_setting_patcher.start() - self.mock_oidc_setting = self.oidc_setting_patcher.start() - self.mock_helpers_setting = self.helpers_setting_patcher.start() - self.mock_models_setting = self.models_setting_patcher.start() - self.mock_domain_model_setting = self.domain_model_setting_patcher.start( - ) - self.mock_record_model_setting = self.record_model_setting_patcher.start( - ) - self.mock_server_model_setting = self.server_model_setting_patcher.start( - ) - self.mock_apikey = self.mock_apikey_patcher.start() - self.mock_hist = self.mock_hist_patcher.start() - self.mock_setting = self.mock_setting_patcher.start() + self.mock_google_setting = self.google_setting_patcher.start() + self.mock_github_setting = self.github_setting_patcher.start() + self.mock_azure_setting = self.azure_setting_patcher.start() + self.mock_oidc_setting = self.oidc_setting_patcher.start() + self.mock_helpers_setting = self.helpers_setting_patcher.start() + self.mock_models_setting = self.models_setting_patcher.start() + self.mock_domain_model_setting = self.domain_model_setting_patcher.start( + ) + self.mock_record_model_setting = self.record_model_setting_patcher.start( + ) + self.mock_server_model_setting = self.server_model_setting_patcher.start( + ) + self.mock_apikey = self.mock_apikey_patcher.start() + self.mock_hist = self.mock_hist_patcher.start() + self.mock_setting = self.mock_setting_patcher.start() + self.mock_decorators_setting = self.mock_decorators_setting_patcher.start() + + self.mock_google_setting.return_value.get.side_effect = load_data + self.mock_github_setting.return_value.get.side_effect = load_data + self.mock_azure_setting.return_value.get.side_effect = load_data + self.mock_oidc_setting.return_value.get.side_effect = load_data + self.mock_helpers_setting.return_value.get.side_effect = load_data + self.mock_models_setting.return_value.get.side_effect = load_data + self.mock_domain_model_setting.return_value.get.side_effect = load_data + self.mock_record_model_setting.return_value.get.side_effect = load_data + self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_decorators_setting.return_value.get.side_effect = load_data + self.mock_apikey.return_value.is_validate.return_value = api_key + + yield + + for patcher in [ + self.google_setting_patcher, + self.github_setting_patcher, + self.azure_setting_patcher, + self.oidc_setting_patcher, + self.helpers_setting_patcher, + self.models_setting_patcher, + self.domain_model_setting_patcher, + self.record_model_setting_patcher, + self.server_model_setting_patcher, + self.mock_apikey_patcher, + self.mock_hist_patcher, + self.mock_setting_patcher, + self.mock_decorators_setting_patcher, + ]: + patcher.stop() - self.mock_google_setting.return_value.get.side_effect = load_data - self.mock_github_setting.return_value.get.side_effect = load_data - self.mock_oidc_setting.return_value.get.side_effect = load_data - self.mock_helpers_setting.return_value.get.side_effect = load_data - self.mock_models_setting.return_value.get.side_effect = load_data - self.mock_domain_model_setting.return_value.get.side_effect = load_data - self.mock_record_model_setting.return_value.get.side_effect = load_data - self.mock_server_model_setting.return_value.get.side_effect = load_data - self.mock_apikey.return_value.is_validate.return_value = api_key def test_empty_get(self, client, common_data_mock, admin_apikey): with patch('powerdnsadmin.routes.api.Domain') as mock_domain, \ diff --git a/tests/unit/zone/test_admin_user.py b/tests/unit/zone/test_admin_user.py index 085ba74..c4412de 100644 --- a/tests/unit/zone/test_admin_user.py +++ b/tests/unit/zone/test_admin_user.py @@ -5,37 +5,33 @@ from collections import namedtuple import powerdnsadmin from powerdnsadmin.models.user import User -from powerdnsadmin.models.role import Role from powerdnsadmin.models.domain import Domain from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, basic_auth_admin_headers -from tests.fixtures import zone_data, created_zone_data, load_data +from tests.conftest import load_data class TestUnitApiZoneAdminUser(object): @pytest.fixture - def common_data_mock(self): + def common_data_mock(self, app, initial_data): + self.google_setting_patcher = patch( 'powerdnsadmin.services.google.Setting', spec=powerdnsadmin.models.setting.Setting) self.github_setting_patcher = patch( 'powerdnsadmin.services.github.Setting', spec=powerdnsadmin.models.setting.Setting) + self.azure_setting_patcher = patch( + 'powerdnsadmin.services.azure.Setting', + spec=powerdnsadmin.models.setting.Setting) self.oidc_setting_patcher = patch( 'powerdnsadmin.services.oidc.Setting', spec=powerdnsadmin.models.setting.Setting) - self.api_setting_patcher = patch( - 'powerdnsadmin.routes.api.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.base_route_user_patcher = patch( - 'powerdnsadmin.routes.base.User', - spec=powerdnsadmin.models.user.User) self.helpers_setting_patcher = patch( 'powerdnsadmin.lib.helper.Setting', spec=powerdnsadmin.models.setting.Setting) self.models_setting_patcher = patch( - 'powerdnsadmin.models.Setting', + 'powerdnsadmin.models.setting.Setting', spec=powerdnsadmin.models.setting.Setting) self.domain_model_setting_patcher = patch( 'powerdnsadmin.models.domain.Setting', @@ -46,50 +42,81 @@ class TestUnitApiZoneAdminUser(object): self.server_model_setting_patcher = patch( 'powerdnsadmin.models.server.Setting', spec=powerdnsadmin.models.setting.Setting) - self.decorators_setting_patcher = patch( - 'powerdnsadmin.decorators.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.mock_user_patcher = patch('powerdnsadmin.decorators.User', - spec=powerdnsadmin.models.user.User) + self.mock_user_patcher = patch( + 'powerdnsadmin.decorators.User', + spec=powerdnsadmin.models.user.User) self.mock_hist_patcher = patch( 'powerdnsadmin.routes.api.History', spec=powerdnsadmin.models.history.History) + self.mock_setting_patcher = patch( + 'powerdnsadmin.routes.api.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_decorators_setting_patcher = patch( + 'powerdnsadmin.decorators.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.base_route_user_patcher = patch( + 'powerdnsadmin.routes.base.User', + spec=powerdnsadmin.models.user.User) - self.mock_google_setting = self.google_setting_patcher.start() - self.mock_github_setting = self.github_setting_patcher.start() - self.mock_oidc_setting = self.oidc_setting_patcher.start() - self.mock_base_route_user = self.base_route_user_patcher.start() - self.mock_helpers_setting = self.helpers_setting_patcher.start() - self.mock_models_setting = self.models_setting_patcher.start() - self.mock_domain_model_setting = self.domain_model_setting_patcher.start( - ) - self.mock_record_model_setting = self.record_model_setting_patcher.start( - ) - self.mock_server_model_setting = self.server_model_setting_patcher.start( - ) - self.decorators_setting = self.decorators_setting_patcher.start() - self.api_setting = self.api_setting_patcher.start() - self.mock_user = self.mock_user_patcher.start() - self.mock_hist = self.mock_hist_patcher.start() + with app.app_context(): + self.mock_google_setting = self.google_setting_patcher.start() + self.mock_github_setting = self.github_setting_patcher.start() + self.mock_azure_setting = self.azure_setting_patcher.start() + self.mock_oidc_setting = self.oidc_setting_patcher.start() + self.mock_base_route_user = self.base_route_user_patcher.start() + self.mock_helpers_setting = self.helpers_setting_patcher.start() + self.mock_models_setting = self.models_setting_patcher.start() + self.mock_domain_model_setting = self.domain_model_setting_patcher.start( + ) + self.mock_record_model_setting = self.record_model_setting_patcher.start( + ) + self.mock_server_model_setting = self.server_model_setting_patcher.start( + ) + self.mock_user = self.mock_user_patcher.start() + self.mock_hist = self.mock_hist_patcher.start() + self.mock_setting = self.mock_setting_patcher.start() + self.mock_decorators_setting = self.mock_decorators_setting_patcher.start() - self.mock_google_setting.return_value.get.side_effect = load_data - self.mock_github_setting.return_value.get.side_effect = load_data - self.mock_oidc_setting.return_value.get.side_effect = load_data - self.mock_helpers_setting.return_value.get.side_effect = load_data - self.mock_models_setting.return_value.get.side_effect = load_data - self.mock_domain_model_setting.return_value.get.side_effect = load_data - self.mock_record_model_setting.return_value.get.side_effect = load_data - self.mock_server_model_setting.return_value.get.side_effect = load_data - self.decorators_setting.return_value.get.side_effect = load_data - self.api_setting.return_value.get.side_effect = load_data + self.mock_google_setting.return_value.get.side_effect = load_data + self.mock_github_setting.return_value.get.side_effect = load_data + self.mock_azure_setting.return_value.get.side_effect = load_data + self.mock_oidc_setting.return_value.get.side_effect = load_data + self.mock_helpers_setting.return_value.get.side_effect = load_data + self.mock_models_setting.return_value.get.side_effect = load_data + self.mock_domain_model_setting.return_value.get.side_effect = load_data + self.mock_record_model_setting.return_value.get.side_effect = load_data + self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_decorators_setting.return_value.get.side_effect = load_data + self.mock_setting.return_value.get.side_effect = load_data - self.mockk = MagicMock() - self.mockk.role.name = "Administrator" + self.mockk = MagicMock() + self.mockk.role.name = "Administrator" + + self.mock_user.query.filter.return_value.first.return_value = self.mockk + self.mock_user.return_value.is_validate.return_value = True + self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk + self.mock_base_route_user.return_value.is_validate.return_value = True + + yield + + for patcher in [ + self.google_setting_patcher, + self.github_setting_patcher, + self.azure_setting_patcher, + self.oidc_setting_patcher, + self.base_route_user_patcher, + self.helpers_setting_patcher, + self.models_setting_patcher, + self.domain_model_setting_patcher, + self.record_model_setting_patcher, + self.server_model_setting_patcher, + self.mock_user_patcher, + self.mock_hist_patcher, + self.mock_setting_patcher, + self.mock_decorators_setting_patcher, + ]: + patcher.stop() - self.mock_user.query.filter.return_value.first.return_value = self.mockk - self.mock_user.return_value.is_validate.return_value = True - self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk - self.mock_base_route_user.return_value.is_validate.return_value = True def test_empty_get(self, client, common_data_mock, basic_auth_admin_headers): @@ -135,8 +162,7 @@ class TestUnitApiZoneAdminUser(object): headers=basic_auth_admin_headers) data = res.get_json(force=True) - fake_domain = namedtuple("Domain", - data[0].keys())(*data[0].values()) + fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) domain_schema = DomainSchema(many=True) json.dumps(domain_schema.dump([fake_domain])) diff --git a/tests/unit/zone/test_user.py b/tests/unit/zone/test_user.py index 7a8d8d9..f7fe335 100644 --- a/tests/unit/zone/test_user.py +++ b/tests/unit/zone/test_user.py @@ -5,37 +5,33 @@ from collections import namedtuple import powerdnsadmin from powerdnsadmin.models.user import User -from powerdnsadmin.models.role import Role from powerdnsadmin.models.domain import Domain from powerdnsadmin.lib.validators import validate_zone from powerdnsadmin.lib.schema import DomainSchema -from tests.fixtures import client, basic_auth_user_headers -from tests.fixtures import zone_data, created_zone_data, load_data +from tests.conftest import load_data class TestUnitApiZoneUser(object): @pytest.fixture - def common_data_mock(self): + def common_data_mock(self, app, initial_data): + self.google_setting_patcher = patch( 'powerdnsadmin.services.google.Setting', spec=powerdnsadmin.models.setting.Setting) self.github_setting_patcher = patch( 'powerdnsadmin.services.github.Setting', spec=powerdnsadmin.models.setting.Setting) + self.azure_setting_patcher = patch( + 'powerdnsadmin.services.azure.Setting', + spec=powerdnsadmin.models.setting.Setting) self.oidc_setting_patcher = patch( 'powerdnsadmin.services.oidc.Setting', spec=powerdnsadmin.models.setting.Setting) - self.api_setting_patcher = patch( - 'powerdnsadmin.routes.api.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.base_route_user_patcher = patch( - 'powerdnsadmin.routes.base.User', - spec=powerdnsadmin.models.user.User) self.helpers_setting_patcher = patch( 'powerdnsadmin.lib.helper.Setting', spec=powerdnsadmin.models.setting.Setting) self.models_setting_patcher = patch( - 'powerdnsadmin.models.Setting', + 'powerdnsadmin.models.setting.Setting', spec=powerdnsadmin.models.setting.Setting) self.domain_model_setting_patcher = patch( 'powerdnsadmin.models.domain.Setting', @@ -46,55 +42,86 @@ class TestUnitApiZoneUser(object): self.server_model_setting_patcher = patch( 'powerdnsadmin.models.server.Setting', spec=powerdnsadmin.models.setting.Setting) - self.decorators_setting_patcher = patch( - 'powerdnsadmin.decorators.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.mock_user_patcher = patch('powerdnsadmin.decorators.User', - spec=powerdnsadmin.models.user.User) + self.mock_user_patcher = patch( + 'powerdnsadmin.decorators.User', + spec=powerdnsadmin.models.user.User) self.mock_hist_patcher = patch( 'powerdnsadmin.routes.api.History', spec=powerdnsadmin.models.history.History) + self.mock_setting_patcher = patch( + 'powerdnsadmin.routes.api.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_decorators_setting_patcher = patch( + 'powerdnsadmin.decorators.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.base_route_user_patcher = patch( + 'powerdnsadmin.routes.base.User', + spec=powerdnsadmin.models.user.User) - self.mock_google_setting = self.google_setting_patcher.start() - self.mock_github_setting = self.github_setting_patcher.start() - self.mock_oidc_setting = self.oidc_setting_patcher.start() - self.mock_base_route_user = self.base_route_user_patcher.start() - self.mock_helpers_setting = self.helpers_setting_patcher.start() - self.mock_models_setting = self.models_setting_patcher.start() - self.mock_domain_model_setting = self.domain_model_setting_patcher.start( - ) - self.mock_record_model_setting = self.record_model_setting_patcher.start( - ) - self.mock_server_model_setting = self.server_model_setting_patcher.start( - ) - self.decorators_setting = self.decorators_setting_patcher.start() - self.api_setting = self.api_setting_patcher.start() - self.mock_user = self.mock_user_patcher.start() - self.mock_hist = self.mock_hist_patcher.start() + with app.app_context(): + self.mock_google_setting = self.google_setting_patcher.start() + self.mock_github_setting = self.github_setting_patcher.start() + self.mock_azure_setting = self.azure_setting_patcher.start() + self.mock_oidc_setting = self.oidc_setting_patcher.start() + self.mock_base_route_user = self.base_route_user_patcher.start() + self.mock_helpers_setting = self.helpers_setting_patcher.start() + self.mock_models_setting = self.models_setting_patcher.start() + self.mock_domain_model_setting = self.domain_model_setting_patcher.start( + ) + self.mock_record_model_setting = self.record_model_setting_patcher.start( + ) + self.mock_server_model_setting = self.server_model_setting_patcher.start( + ) + self.mock_user = self.mock_user_patcher.start() + self.mock_hist = self.mock_hist_patcher.start() + self.mock_setting = self.mock_setting_patcher.start() + self.mock_decorators_setting = self.mock_decorators_setting_patcher.start() - self.mock_google_setting.return_value.get.side_effect = load_data - self.mock_github_setting.return_value.get.side_effect = load_data - self.mock_oidc_setting.return_value.get.side_effect = load_data - self.mock_helpers_setting.return_value.get.side_effect = load_data - self.mock_models_setting.return_value.get.side_effect = load_data - self.mock_domain_model_setting.return_value.get.side_effect = load_data - self.mock_record_model_setting.return_value.get.side_effect = load_data - self.mock_server_model_setting.return_value.get.side_effect = load_data - self.decorators_setting.return_value.get.side_effect = load_data - self.api_setting.return_value.get.side_effect = load_data + self.mock_google_setting.return_value.get.side_effect = load_data + self.mock_github_setting.return_value.get.side_effect = load_data + self.mock_azure_setting.return_value.get.side_effect = load_data + self.mock_oidc_setting.return_value.get.side_effect = load_data + self.mock_helpers_setting.return_value.get.side_effect = load_data + self.mock_models_setting.return_value.get.side_effect = load_data + self.mock_domain_model_setting.return_value.get.side_effect = load_data + self.mock_record_model_setting.return_value.get.side_effect = load_data + self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_decorators_setting.return_value.get.side_effect = load_data + self.mock_setting.return_value.get.side_effect = load_data - self.mockk = MagicMock() - self.mockk.role.name = "User" + self.mockk = MagicMock() + self.mockk.role.name = "User" + + self.mock_user.query.filter.return_value.first.return_value = self.mockk + self.mock_user.return_value.is_validate.return_value = True + self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk + self.mock_base_route_user.return_value.is_validate.return_value = True + + yield + + for patcher in [ + self.google_setting_patcher, + self.github_setting_patcher, + self.azure_setting_patcher, + self.oidc_setting_patcher, + self.base_route_user_patcher, + self.helpers_setting_patcher, + self.models_setting_patcher, + self.domain_model_setting_patcher, + self.record_model_setting_patcher, + self.server_model_setting_patcher, + self.mock_user_patcher, + self.mock_hist_patcher, + self.mock_setting_patcher, + self.mock_decorators_setting_patcher, + ]: + patcher.stop() - self.mock_user.query.filter.return_value.first.return_value = self.mockk - self.mock_user.return_value.is_validate.return_value = True - self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk - self.mock_base_route_user.return_value.is_validate.return_value = True def test_create_zone(self, client, common_data_mock, zone_data, basic_auth_user_headers, created_zone_data): with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \ - patch('powerdnsadmin.routes.api.Domain') as mock_domain: + patch('powerdnsadmin.routes.api.Domain') as mock_domain: mock_post.return_value.status_code = 201 mock_post.return_value.content = json.dumps(created_zone_data) mock_post.return_value.headers = {} @@ -115,8 +142,9 @@ class TestUnitApiZoneUser(object): with patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains: test_domain = Domain(1, name=zone_data['name'].rstrip(".")) mock_user_domains.return_value = [test_domain] + res = client.get("/api/v1/pdnsadmin/zones", - headers=basic_auth_user_headers) + headers=basic_auth_user_headers) data = res.get_json(force=True) fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) @@ -127,9 +155,9 @@ class TestUnitApiZoneUser(object): def test_delete_zone(self, client, common_data_mock, zone_data, basic_auth_user_headers): - with patch('powerdnsadmin.lib.utils.requests.request') as mock_delete, \ - patch('powerdnsadmin.routes.api.Domain') as mock_domain, \ - patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains: + with patch('powerdnsadmin.lib.helper.requests.request') as mock_delete, \ + patch('powerdnsadmin.routes.api.Domain') as mock_domain, \ + patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains: test_domain = Domain(1, name=zone_data['name'].rstrip(".")) mock_domain.return_value.update.return_value = True diff --git a/tests/unit/zone/test_user_apikey.py b/tests/unit/zone/test_user_apikey.py index 84cd53c..b1911ce 100644 --- a/tests/unit/zone/test_user_apikey.py +++ b/tests/unit/zone/test_user_apikey.py @@ -1,7 +1,6 @@ import json import pytest from unittest.mock import patch -from base64 import b64encode from collections import namedtuple import powerdnsadmin @@ -10,78 +9,111 @@ from powerdnsadmin.models.domain import Domain from powerdnsadmin.models.api_key import ApiKey from powerdnsadmin.models.role import Role from powerdnsadmin.lib.validators import validate_zone -from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema -from tests.fixtures import client, initial_data, created_zone_data -from tests.fixtures import user_apikey, zone_data -from tests.fixtures import user_apikey_data, load_data +from powerdnsadmin.lib.schema import DomainSchema +from tests.conftest import user_apikey_data, load_data class TestUnitApiZoneUserApiKey(object): @pytest.fixture - def common_data_mock(self): - self.google_setting_patcher = patch( - 'powerdnsadmin.services.google.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.github_setting_patcher = patch( - 'powerdnsadmin.services.github.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.oidc_setting_patcher = patch( - 'powerdnsadmin.services.oidc.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.helpers_setting_patcher = patch( - 'powerdnsadmin.lib.helper.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.models_setting_patcher = patch( - 'powerdnsadmin.models.setting.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.domain_model_setting_patcher = patch( - 'powerdnsadmin.models.domain.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.record_model_setting_patcher = patch( - 'powerdnsadmin.models.record.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.server_model_setting_patcher = patch( - 'powerdnsadmin.models.server.Setting', - spec=powerdnsadmin.models.setting.Setting) - self.mock_apikey_patcher = patch( - 'powerdnsadmin.decorators.ApiKey', - spec=powerdnsadmin.models.api_key.ApiKey) - self.mock_hist_patcher = patch( - 'powerdnsadmin.routes.api.History', - spec=powerdnsadmin.models.history.History) + def common_data_mock(self, app): + with app.app_context(): + self.google_setting_patcher = patch( + 'powerdnsadmin.services.google.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.github_setting_patcher = patch( + 'powerdnsadmin.services.github.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.azure_setting_patcher = patch( + 'powerdnsadmin.services.azure.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.oidc_setting_patcher = patch( + 'powerdnsadmin.services.oidc.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.helpers_setting_patcher = patch( + 'powerdnsadmin.lib.helper.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.models_setting_patcher = patch( + 'powerdnsadmin.models.setting.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.domain_model_setting_patcher = patch( + 'powerdnsadmin.models.domain.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.record_model_setting_patcher = patch( + 'powerdnsadmin.models.record.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.server_model_setting_patcher = patch( + 'powerdnsadmin.models.server.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_apikey_patcher = patch( + 'powerdnsadmin.decorators.ApiKey', + spec=powerdnsadmin.models.api_key.ApiKey) + self.mock_hist_patcher = patch( + 'powerdnsadmin.routes.api.History', + spec=powerdnsadmin.models.history.History) + self.mock_setting_patcher = patch( + 'powerdnsadmin.routes.api.Setting', + spec=powerdnsadmin.models.setting.Setting) + self.mock_decorators_setting_patcher = patch( + 'powerdnsadmin.decorators.Setting', + spec=powerdnsadmin.models.setting.Setting) - self.mock_google_setting = self.google_setting_patcher.start() - self.mock_github_setting = self.github_setting_patcher.start() - self.mock_oidc_setting = self.oidc_setting_patcher.start() - self.mock_helpers_setting = self.helpers_setting_patcher.start() - self.mock_models_setting = self.models_setting_patcher.start() - self.mock_domain_model_setting = self.domain_model_setting_patcher.start( - ) - self.mock_record_model_setting = self.record_model_setting_patcher.start( - ) - self.mock_server_model_setting = self.server_model_setting_patcher.start( - ) - self.mock_apikey = self.mock_apikey_patcher.start() - self.mock_hist = self.mock_hist_patcher.start() + self.mock_google_setting = self.google_setting_patcher.start() + self.mock_github_setting = self.github_setting_patcher.start() + self.mock_azure_setting = self.azure_setting_patcher.start() + self.mock_oidc_setting = self.oidc_setting_patcher.start() + self.mock_helpers_setting = self.helpers_setting_patcher.start() + self.mock_models_setting = self.models_setting_patcher.start() + self.mock_domain_model_setting = self.domain_model_setting_patcher.start( + ) + self.mock_record_model_setting = self.record_model_setting_patcher.start( + ) + self.mock_server_model_setting = self.server_model_setting_patcher.start( + ) + self.mock_apikey = self.mock_apikey_patcher.start() + self.mock_hist = self.mock_hist_patcher.start() + self.mock_setting = self.mock_setting_patcher.start() + self.mock_decorators_setting = self.mock_decorators_setting_patcher.start() - self.mock_google_setting.return_value.get.side_effect = load_data - self.mock_github_setting.return_value.get.side_effect = load_data - self.mock_oidc_setting.return_value.get.side_effect = load_data - self.mock_helpers_setting.return_value.get.side_effect = load_data - self.mock_models_setting.return_value.get.side_effect = load_data - self.mock_domain_model_setting.return_value.get.side_effect = load_data - self.mock_record_model_setting.return_value.get.side_effect = load_data - self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_google_setting.return_value.get.side_effect = load_data + self.mock_github_setting.return_value.get.side_effect = load_data + self.mock_azure_setting.return_value.get.side_effect = load_data + self.mock_oidc_setting.return_value.get.side_effect = load_data + self.mock_helpers_setting.return_value.get.side_effect = load_data + self.mock_models_setting.return_value.get.side_effect = load_data + self.mock_domain_model_setting.return_value.get.side_effect = load_data + self.mock_record_model_setting.return_value.get.side_effect = load_data + self.mock_server_model_setting.return_value.get.side_effect = load_data + self.mock_decorators_setting.return_value.get.side_effect = load_data - data = user_apikey_data() - domain = Domain(name=data['domains'][0]) + data = user_apikey_data() + domain = Domain(name=data['domains'][0]) - api_key = ApiKey(desc=data['description'], - role_name=data['role'], - domains=[domain]) - api_key.role = Role(name=data['role']) + api_key = ApiKey(desc=data['description'], + role_name=data['role'], + domains=[domain]) + api_key.role = Role(name=data['role']) + + self.mock_apikey.return_value.is_validate.return_value = api_key + + yield + + for patcher in [ + self.google_setting_patcher, + self.github_setting_patcher, + self.azure_setting_patcher, + self.oidc_setting_patcher, + self.helpers_setting_patcher, + self.models_setting_patcher, + self.domain_model_setting_patcher, + self.record_model_setting_patcher, + self.server_model_setting_patcher, + self.mock_apikey_patcher, + self.mock_hist_patcher, + self.mock_setting_patcher, + self.mock_decorators_setting_patcher, + ]: + patcher.stop() - self.mock_apikey.return_value.is_validate.return_value = api_key def test_create_zone(self, client, common_data_mock, zone_data, user_apikey, created_zone_data): @@ -112,8 +144,7 @@ class TestUnitApiZoneUserApiKey(object): headers=user_apikey) data = res.get_json(force=True) - fake_domain = namedtuple("Domain", - data[0].keys())(*data[0].values()) + fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values()) domain_schema = DomainSchema(many=True) json.dumps(domain_schema.dump([fake_domain]))