Merge branch 'master' into AdminLTE-Upgrade

This commit is contained in:
Matt Scott 2023-02-17 18:17:32 -05:00 committed by GitHub
commit 2ff01fbfe9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 824 additions and 599 deletions

View File

@ -1,11 +1,11 @@
version: "2.1"
version: "3.8"
services:
powerdns-admin:
image: powerdns-admin-test
build:
context: .
dockerfile: docker-test/Dockerfile
image: powerdns-admin-test
container_name: powerdns-admin-test
ports:
- "9191:80"
@ -17,10 +17,10 @@ services:
- pdns-server
pdns-server:
image: pdns-server-test
build:
context: .
dockerfile: docker-test/Dockerfile.pdns
image: pdns-server-test
ports:
- "5053:53"
- "5053:53/udp"

View File

@ -1,15 +1,35 @@
FROM debian:stretch-slim
FROM debian:bullseye-slim
LABEL maintainer="k@ndk.name"
ENV LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8 LANGUAGE=en_US.UTF-8
RUN apt-get update -y \
&& apt-get install -y --no-install-recommends apt-transport-https locales locales-all python3-pip python3-setuptools python3-dev curl libsasl2-dev libldap2-dev libssl-dev libxml2-dev libxslt1-dev libxmlsec1-dev libffi-dev build-essential libmariadb-dev-compat \
&& curl -sL https://deb.nodesource.com/setup_10.x | bash - \
&& apt-get install -y --no-install-recommends \
apt-transport-https \
curl \
build-essential \
libffi-dev \
libldap2-dev \
libmariadb-dev-compat \
libsasl2-dev \
libssl-dev \
libxml2-dev \
libxmlsec1-dev \
libxmlsec1-openssl \
libxslt1-dev \
locales \
locales-all \
pkg-config \
python3-dev \
python3-pip \
python3-setuptools \
&& curl -sL https://deb.nodesource.com/setup_lts.x | bash - \
&& curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb https://dl.yarnpkg.com/debian/ stable main" > /etc/apt/sources.list.d/yarn.list \
&& apt-get update -y \
&& apt-get install -y nodejs yarn \
&& apt-get install -y --no-install-recommends \
nodejs \
yarn \
&& apt-get clean -y \
&& rm -rf /var/lib/apt/lists/*
@ -21,8 +41,6 @@ RUN pip3 install --upgrade pip
RUN pip3 install -r requirements.txt
COPY . /app
COPY ./docker/entrypoint.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/entrypoint.sh
ENV FLASK_APP=powerdnsadmin/__init__.py
RUN yarn install --pure-lockfile --production \
@ -31,4 +49,4 @@ RUN yarn install --pure-lockfile --production \
COPY ./docker-test/wait-for-pdns.sh /opt
RUN chmod u+x /opt/wait-for-pdns.sh
CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest","--capture=no","-vv"]
CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest", "-W", "ignore::DeprecationWarning", "--capture=no", "-vv"]

View File

@ -10,9 +10,9 @@ fi
# Import schema structure
if [ -e "/data/pdns.sql" ]; then
rm /data/pdns.db
rm -f /data/pdns.db
cat /data/pdns.sql | sqlite3 /data/pdns.db
rm /data/pdns.sql
rm -f /data/pdns.sql
echo "Imported schema structure"
fi

View File

@ -1,8 +1,13 @@
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
import logging
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from flask import current_app
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
@ -17,9 +22,9 @@ logger = logging.getLogger('alembic.env')
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from flask import current_app
config.set_main_option('sqlalchemy.url',
current_app.config.get('SQLALCHEMY_DATABASE_URI').replace("%","%%"))
config.set_main_option(
'sqlalchemy.url',
str(current_app.extensions['migrate'].db.engine.url).replace('%', '%%'))
target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py,
@ -41,7 +46,9 @@ def run_migrations_offline():
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url)
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True
)
with context.begin_transaction():
context.run_migrations()
@ -65,22 +72,23 @@ def run_migrations_online():
directives[:] = []
logger.info('No changes in schema detected.')
engine = engine_from_config(config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool)
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
connection = engine.connect()
context.configure(connection=connection,
target_metadata=target_metadata,
process_revision_directives=process_revision_directives,
render_as_batch=config.get_main_option('sqlalchemy.url').startswith('sqlite:'),
**current_app.extensions['migrate'].configure_args)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=process_revision_directives,
**current_app.extensions['migrate'].configure_args
)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
if context.is_offline_mode():
run_migrations_offline()

View File

@ -56,9 +56,9 @@ def seed_data():
op.bulk_insert(template_table,
[
{id: 1, 'name': 'basic_template_1', 'description': 'Basic Template #1'},
{id: 2, 'name': 'basic_template_2', 'description': 'Basic Template #2'},
{id: 3, 'name': 'basic_template_3', 'description': 'Basic Template #3'}
{'id': 1, 'name': 'basic_template_1', 'description': 'Basic Template #1'},
{'id': 2, 'name': 'basic_template_2', 'description': 'Basic Template #2'},
{'id': 3, 'name': 'basic_template_3', 'description': 'Basic Template #3'}
]
)

View File

@ -0,0 +1,24 @@
"""Add unique index to settings table keys
Revision ID: b24bf17725d2
Revises: 0967658d9c0d
Create Date: 2021-12-12 20:29:17.103441
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'b24bf17725d2'
down_revision = '0967658d9c0d'
branch_labels = None
depends_on = None
def upgrade():
op.create_index(op.f('ix_setting_name'), 'setting', ['name'], unique=True)
def downgrade():
op.drop_index(op.f('ix_setting_name'), table_name='setting')

View File

@ -388,7 +388,7 @@ def apikey_can_configure_dnssec(http_methods=[]):
def allowed_record_types(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if request.method == 'GET':
if request.method in ['GET', 'DELETE', 'PUT']:
return f(*args, **kwargs)
if g.apikey.role.name in ['Administrator', 'Operator']:

View File

@ -2,6 +2,7 @@ import json
import re
import traceback
from flask import current_app
from flask_login import current_user
from urllib.parse import urljoin
from distutils.util import strtobool
@ -548,11 +549,12 @@ class Domain(db.Model):
domain.apikeys[:] = []
# Remove history for domain
domain_history = History.query.filter(
History.domain_id == domain.id
)
if domain_history:
domain_history.delete()
if not Setting().get('preserve_history'):
domain_history = History.query.filter(
History.domain_id == domain.id
)
if domain_history:
domain_history.delete()
# then remove domain
Domain.query.filter(Domain.name == domain_name).delete()
@ -851,6 +853,7 @@ class Domain(db.Model):
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
account_name_old = Account().get_name_by_id(domain.account_id)
account_name = Account().get_name_by_id(account_id)
post_data = {"account": account_name}
@ -874,6 +877,13 @@ class Domain(db.Model):
self.update()
msg_str = 'Account changed for domain {0} successfully'
current_app.logger.info(msg_str.format(domain_name))
history = History(msg='Update domain {0} associate account {1}'.format(domain.name, 'none' if account_name == '' else account_name),
detail = json.dumps({
'assoc_account': 'None' if account_name == '' else account_name,
'dissoc_account': 'None' if account_name_old == '' else account_name_old
}),
created_by=current_user.username)
history.add()
return {'status': 'ok', 'msg': 'account changed successfully'}
except Exception as e:

View File

@ -422,6 +422,25 @@ class Record(object):
]
d = Domain()
for r in del_rrsets:
for record in r['records']:
# Format the reverse record name
# It is the reverse of forward record's content.
reverse_host_address = dns.reversename.from_address(
record['content']).to_text()
# Create the reverse domain name in PDNS
domain_reverse_name = d.get_reverse_domain_name(
reverse_host_address)
d.create_reverse_domain(domain_name,
domain_reverse_name)
# Delete the reverse zone
self.name = reverse_host_address
self.type = 'PTR'
self.data = record['content']
self.delete(domain_reverse_name)
for r in new_rrsets:
for record in r['records']:
# Format the reverse record name
@ -455,25 +474,6 @@ class Record(object):
# Format the rrset
rrset = {"rrsets": rrset_data}
self.add(domain_reverse_name, rrset)
for r in del_rrsets:
for record in r['records']:
# Format the reverse record name
# It is the reverse of forward record's content.
reverse_host_address = dns.reversename.from_address(
record['content']).to_text()
# Create the reverse domain name in PDNS
domain_reverse_name = d.get_reverse_domain_name(
reverse_host_address)
d.create_reverse_domain(domain_name,
domain_reverse_name)
# Delete the reverse zone
self.name = reverse_host_address
self.type = 'PTR'
self.data = record['content']
self.delete(domain_reverse_name)
return {
'status': 'ok',
'msg': 'Auto-PTR record was updated successfully'

View File

@ -5,7 +5,7 @@ class Role(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), index=True, unique=True)
description = db.Column(db.String(128))
users = db.relationship('User', backref='role', lazy=True)
users = db.relationship('User', back_populates='role', lazy=True)
apikeys = db.relationship('ApiKey', back_populates='role', lazy=True)
def __init__(self, id=None, name=None, description=None):
@ -20,4 +20,4 @@ class Role(db.Model):
self.description = description
def __repr__(self):
return '<Role {0}r>'.format(self.name)
return '<Role {0}>'.format(self.name)

View File

@ -11,7 +11,7 @@ from .base import db
class Setting(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64))
name = db.Column(db.String(64), unique=True, index=True)
value = db.Column(db.Text())
defaults = {
@ -31,6 +31,7 @@ class Setting(db.Model):
'delete_sso_accounts': False,
'bg_domain_updates': False,
'enable_api_rr_history': True,
'preserve_history': False,
'site_name': 'PowerDNS-Admin',
'site_url': 'http://localhost:9191',
'session_timeout': 10,

View File

@ -34,6 +34,7 @@ class User(db.Model):
otp_secret = db.Column(db.String(16))
confirmed = db.Column(db.SmallInteger, nullable=False, default=0)
role_id = db.Column(db.Integer, db.ForeignKey('role.id'))
role = db.relationship('Role', back_populates="users", lazy=True)
accounts = None
def __init__(self,

View File

@ -610,14 +610,21 @@ def manage_user():
@operator_role_required
def edit_account(account_name=None):
users = User.query.all()
account = Account.query.filter(
Account.name == account_name).first()
all_accounts = Account.query.all()
accounts = {acc.id: acc for acc in all_accounts}
domains = Domain.query.all()
if request.method == 'GET':
if account_name is None:
if account_name is None or not account:
return render_template('admin_edit_account.html',
account=None,
account_user_ids=[],
users=users,
domains=domains,
accounts=accounts,
create=1)
else:
account = Account.query.filter(
Account.name == account_name).first()
@ -626,11 +633,14 @@ def edit_account(account_name=None):
account=account,
account_user_ids=account_user_ids,
users=users,
domains=domains,
accounts=accounts,
create=0)
if request.method == 'POST':
fdata = request.form
new_user_list = request.form.getlist('account_multi_user')
new_domain_list = request.form.getlist('account_domains')
# on POST, synthesize account and account_user_ids from form data
if not account_name:
@ -654,6 +664,8 @@ def edit_account(account_name=None):
account=account,
account_user_ids=account_user_ids,
users=users,
domains=domains,
accounts=accounts,
create=create,
invalid_accountname=True)
@ -662,19 +674,33 @@ def edit_account(account_name=None):
account=account,
account_user_ids=account_user_ids,
users=users,
domains=domains,
accounts=accounts,
create=create,
duplicate_accountname=True)
result = account.create_account()
history = History(msg='Create account {0}'.format(account.name),
created_by=current_user.username)
else:
result = account.update_account()
history = History(msg='Update account {0}'.format(account.name),
created_by=current_user.username)
if result['status']:
account = Account.query.filter(
Account.name == account_name).first()
old_domains = Domain.query.filter(Domain.account_id == account.id).all()
for domain_name in new_domain_list:
domain = Domain.query.filter(
Domain.name == domain_name).first()
if account.id != domain.account_id:
Domain(name=domain_name).assoc_account(account.id)
for domain in old_domains:
if domain.name not in new_domain_list:
Domain(name=domain.name).assoc_account(None)
history = History(msg='{0} account {1}'.format('Create' if create else 'Update', account.name),
created_by=current_user.username)
account.grant_privileges(new_user_list)
history.add()
return redirect(url_for('admin.manage_account'))
@ -892,6 +918,16 @@ class DetailedHistory():
history_status=DetailedHistory.get_key_val(detail_dict, 'status'),
history_msg=DetailedHistory.get_key_val(detail_dict, 'msg'))
elif 'Update domain' in history.msg and 'associate account' in history.msg: # When an account gets associated or dissociate with domains
self.detailed_msg = render_template_string('''
<table class="table table-bordered table-striped">
<tr><td>Associate: </td><td>{{ history_assoc_account }}</td></tr>
<tr><td>Dissociate:</td><td>{{ history_dissoc_account }}</td></tr>
</table>
''',
history_assoc_account=DetailedHistory.get_key_val(detail_dict, 'assoc_account'),
history_dissoc_account=DetailedHistory.get_key_val(detail_dict, 'dissoc_account'))
# check for lower key as well for old databases
@staticmethod
def get_key_val(_dict, key):
@ -928,6 +964,13 @@ def history():
'msg': 'You do not have permission to remove history.'
}), 401)
if Setting().get('preserve_history'):
return make_response(
jsonify({
'status': 'error',
'msg': 'History removal is not allowed (toggle preserve_history in settings).'
}), 401)
h = History()
result = h.remove_all()
if result:
@ -1283,6 +1326,7 @@ def setting_basic():
'otp_field_enabled',
'otp_force',
'pdns_api_timeout',
'preserve_history',
'pretty_ipv6_ptr',
'record_helper',
'record_quick_edit',

View File

@ -1,22 +1,21 @@
import json
from urllib.parse import urljoin
import secrets
import string
from base64 import b64encode
from flask import (
Blueprint, g, request, abort, current_app, make_response, jsonify,
)
from urllib.parse import urljoin
from flask import (Blueprint, g, request, abort, current_app, make_response, jsonify)
from flask_login import current_user
from .base import csrf
from ..models.base import db
from ..models import (
User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey,
Role,
from ..decorators import (
api_basic_auth, api_can_create_domain, is_json, apikey_auth,
apikey_can_create_domain, apikey_can_remove_domain,
apikey_is_admin, apikey_can_access_domain, apikey_can_configure_dnssec,
api_role_can, apikey_or_basic_auth,
callback_if_request_body_contains_key, allowed_record_types, allowed_record_ttl
)
from ..lib import utils, helper
from ..lib.schema import (
ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema,
UserDetailedSchema,
)
from ..lib.errors import (
StructuredException,
DomainNotExists, DomainAlreadyExists, DomainAccessForbidden,
@ -26,15 +25,15 @@ from ..lib.errors import (
UserCreateFail, UserCreateDuplicate, UserUpdateFail, UserDeleteFail,
UserUpdateFailEmail, InvalidAccountNameException
)
from ..decorators import (
api_basic_auth, api_can_create_domain, is_json, apikey_auth,
apikey_can_create_domain, apikey_can_remove_domain,
apikey_is_admin, apikey_can_access_domain, apikey_can_configure_dnssec,
api_role_can, apikey_or_basic_auth,
callback_if_request_body_contains_key, allowed_record_types, allowed_record_ttl
from ..lib.schema import (
ApiKeySchema, DomainSchema, ApiPlainKeySchema, UserSchema, AccountSchema,
UserDetailedSchema,
)
import secrets
import string
from ..models import (
User, Domain, DomainUser, Account, AccountUser, History, Setting, ApiKey,
Role,
)
from ..models.base import db
api_bp = Blueprint('api', __name__, url_prefix='/api/v1')
apilist_bp = Blueprint('apilist', __name__, url_prefix='/')
@ -56,10 +55,10 @@ def get_user_domains():
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).all()
db.or_(
DomainUser.user_id == current_user.id,
AccountUser.user_id == current_user.id
)).all()
return domains
@ -71,10 +70,10 @@ def get_user_apikeys(domain_name=None):
.outerjoin(Account, Domain.account_id == Account.id) \
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
.filter(
db.or_(
DomainUser.user_id == User.id,
AccountUser.user_id == User.id
)
db.or_(
DomainUser.user_id == User.id,
AccountUser.user_id == User.id
)
) \
.filter(User.id == current_user.id)
@ -167,12 +166,7 @@ def handle_request_is_not_json(err):
def before_request():
# Check site is in maintenance mode
maintenance = Setting().get('maintenance')
if (
maintenance and current_user.is_authenticated and
current_user.role.name not in [
'Administrator', 'Operator'
]
):
if (maintenance and current_user.is_authenticated and current_user.role.name not in ['Administrator', 'Operator']):
return make_response(
jsonify({
"status": False,
@ -224,14 +218,13 @@ def api_login_create_zone():
history = History(msg='Add domain {0}'.format(
data['name'].rstrip('.')),
detail=json.dumps(data),
created_by=current_user.username,
domain_id=domain_id)
detail=json.dumps(data),
created_by=current_user.username,
domain_id=domain_id)
history.add()
if current_user.role.name not in ['Administrator', 'Operator']:
current_app.logger.debug(
"User is ordinary user, assigning created domain")
current_app.logger.debug("User is ordinary user, assigning created domain")
domain = Domain(name=data['name'].rstrip('.'))
domain.update()
domain.grant_privileges([current_user.id])
@ -299,9 +292,9 @@ def api_login_delete_zone(domain_name):
history = History(msg='Delete domain {0}'.format(
utils.pretty_domain_name(domain_name)),
detail='',
created_by=current_user.username,
domain_id=domain_id)
detail='',
created_by=current_user.username,
domain_id=domain_id)
history.add()
except Exception as e:
@ -326,14 +319,14 @@ def api_generate_apikey():
if 'domains' not in data:
domains = []
elif not isinstance(data['domains'], (list, )):
elif not isinstance(data['domains'], (list,)):
abort(400)
else:
domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']]
if 'accounts' not in data:
accounts = []
elif not isinstance(data['accounts'], (list, )):
elif not isinstance(data['accounts'], (list,)):
abort(400)
else:
accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']]
@ -385,8 +378,7 @@ def api_generate_apikey():
user_domain_list = [item.name for item in user_domain_obj_list]
current_app.logger.debug("Input domain list: {0}".format(domain_list))
current_app.logger.debug(
"User domain list: {0}".format(user_domain_list))
current_app.logger.debug("User domain list: {0}".format(user_domain_list))
inter = set(domain_list).intersection(set(user_domain_list))
@ -539,14 +531,14 @@ def api_update_apikey(apikey_id):
if 'domains' not in data:
domains = None
elif not isinstance(data['domains'], (list, )):
elif not isinstance(data['domains'], (list,)):
abort(400)
else:
domains = [d['name'] if isinstance(d, dict) else d for d in data['domains']]
if 'accounts' not in data:
accounts = None
elif not isinstance(data['accounts'], (list, )):
elif not isinstance(data['accounts'], (list,)):
abort(400)
else:
accounts = [a['name'] if isinstance(a, dict) else a for a in data['accounts']]
@ -963,9 +955,7 @@ def api_delete_account(account_id):
account = account_list[0]
else:
abort(404)
current_app.logger.debug(
f'Deleting Account {account.name}'
)
current_app.logger.debug(f'Deleting Account {account.name}')
# Remove account association from domains first
if len(account.domains) > 0:
@ -1047,7 +1037,7 @@ def api_remove_account_user(account_id, user_id):
user_list = User.query.join(AccountUser).filter(
AccountUser.account_id == account_id,
AccountUser.user_id == user_id,
).all()
).all()
if not user_list:
abort(404)
if not account.remove_user(user):
@ -1123,9 +1113,9 @@ def api_zone_forward(server_id, zone_id):
history = History(msg='{0} zone {1} record of {2}'.format(
rrset_data['changetype'].lower(), rrset_data['type'],
rrset_data['name'].rstrip('.')),
detail=json.dumps(data),
created_by=g.apikey.description,
domain_id=Domain().get_id_by_name(zone_id.rstrip('.')))
detail=json.dumps(data),
created_by=g.apikey.description,
domain_id=Domain().get_id_by_name(zone_id.rstrip('.')))
history.add()
elif request.method == 'DELETE':
history = History(msg='Deleted zone {0}'.format(zone_id.rstrip('.')),
@ -1192,17 +1182,13 @@ def api_get_zones(server_id):
return jsonify(domain_schema.dump(domain_obj_list)), 200
else:
resp = helper.forward_request()
if (
g.apikey.role.name not in ['Administrator', 'Operator']
and resp.status_code == 200
):
if (g.apikey.role.name not in ['Administrator', 'Operator'] and resp.status_code == 200):
domain_list = [d['name']
for d in domain_schema.dump(g.apikey.domains)]
accounts_domains = [d.name for a in g.apikey.accounts for d in a.domains]
allowed_domains = set(domain_list + accounts_domains)
current_app.logger.debug("Account domains: {}".format(
'/'.join(accounts_domains)))
current_app.logger.debug("Account domains: {}".format('/'.join(accounts_domains)))
content = json.dumps([i for i in json.loads(resp.content)
if i['name'].rstrip('.') in allowed_domains])
return content, resp.status_code, resp.headers.items()
@ -1223,6 +1209,7 @@ def api_server_config_forward(server_id):
resp = helper.forward_request()
return resp.content, resp.status_code, resp.headers.items()
# The endpoint to synchronize Domains in background
@api_bp.route('/sync_domains', methods=['GET'])
@apikey_or_basic_auth
@ -1231,6 +1218,7 @@ def sync_domains():
domain.update()
return 'Finished synchronization in background', 200
@api_bp.route('/health', methods=['GET'])
@apikey_auth
def health():
@ -1244,7 +1232,8 @@ def health():
try:
domain.get_domain_info(domain_to_query.name)
except Exception as e:
current_app.logger.error("Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name))
current_app.logger.error(
"Health Check - Failed to query authoritative server for domain {}".format(domain_to_query.name))
return make_response("Down", 503)
return make_response("Up", 200)

View File

@ -89,14 +89,14 @@ def domain(domain_name):
# - Find a way to make it consistent, or
# - Only allow one comment for that case
if StrictVersion(Setting().get('pdns_version')) >= StrictVersion('4.0.0'):
pretty_v6 = Setting().get('pretty_ipv6_ptr')
for r in rrsets:
if r['type'] in records_allow_to_edit:
r_name = r['name'].rstrip('.')
# If it is reverse zone and pretty_ipv6_ptr setting
# is enabled, we reformat the name for ipv6 records.
if Setting().get('pretty_ipv6_ptr') and r[
'type'] == 'PTR' and 'ip6.arpa' in r_name and '*' not in r_name:
if pretty_v6 and r['type'] == 'PTR' and 'ip6.arpa' in r_name and '*' not in r_name:
r_name = dns.reversename.to_address(
dns.name.from_text(r_name))

View File

@ -581,7 +581,7 @@ def get_azure_groups(uri):
def authenticate_user(user, authenticator, remember=False):
login_user(user, remember=remember)
signin_history(user.username, authenticator, True)
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret:
if Setting().get('otp_force') and Setting().get('otp_field_enabled') and not user.otp_secret and session['authentication_type'] not in ['OAuth']:
user.update_profile(enable_otp=True)
user_id = current_user.id
prepare_welcome_user(user_id)

View File

@ -1,10 +1,14 @@
Flask==2.1.3
Flask==2.2.2
Flask-Assets==2.0
Flask-Login==0.6.2
Flask-SQLAlchemy==2.5.1
Flask-Migrate==2.5.3
SQLAlchemy==1.3.24
mysqlclient==2.0.1
Flask-SQLAlchemy==3.0.2
Flask-Migrate==4.0.0
Flask-Mail==0.9.1
Flask-SSLify==0.1.5
Flask-Session==0.4.0
Flask-SeaSurf==1.1.1
SQLAlchemy==1.4.45
mysqlclient==2.1.1
configobj==5.0.8
bcrypt==4.0.1
requests==2.28.2
@ -13,22 +17,30 @@ pyotp==2.8.0
qrcode==7.3.1
dnspython>=2.3.0
gunicorn==20.1.0
python3-saml
python3-saml==1.14.0
pytz==2022.7.1
cssmin==0.2.0
rjsmin==1.2.1
Authlib==1.2.0
Flask-SeaSurf==1.1.1
bravado-core==5.17.1
lima==0.5
pytest==7.2.1
jsonschema[format]>=2.5.1,<4.0.0 # until https://github.com/Yelp/bravado-core/pull/385
pytimeparse==1.1.8
PyYAML==5.4
Flask-SSLify==0.1.5
Flask-Mail==0.9.1
flask-session==0.4.0
alembic==1.9.0
certifi==2022.12.7
cffi==1.15.1
passlib==1.7.4
pyasn1==0.4.8
webcolors==1.12
zipp==3.11.0
pyOpenSSL==22.1.0
Authlib==1.2.0
Flask-SeaSurf==1.1.1
PyYAML==6.0
Jinja2==3.1.2
itsdangerous==2.1.2
werkzeug==2.1.2
cryptography==36.0.2
flask_session_captcha==1.3.0
lxml==4.6.5

View File

@ -1,26 +1,28 @@
import os
import pytest
import flask_migrate
from base64 import b64encode
from powerdnsadmin import create_app
from powerdnsadmin.models.base import db
from powerdnsadmin.models.user import User
from powerdnsadmin.models.setting import Setting
from powerdnsadmin.models.api_key import ApiKey
import pytest
from flask_migrate import upgrade as flask_migrate_upgrade
app = create_app('../configs/test.py')
ctx = app.app_context()
ctx.push()
from powerdnsadmin import create_app
from powerdnsadmin.models.api_key import ApiKey
from powerdnsadmin.models.base import db
from powerdnsadmin.models.setting import Setting
from powerdnsadmin.models.user import User
@pytest.fixture(scope="session")
def app():
app = create_app('../configs/test.py')
yield app
@pytest.fixture
def client():
def client(app):
app.config['TESTING'] = True
client = app.test_client()
yield client
def load_data(setting_name, *args, **kwargs):
if setting_name == 'maintenance':
return 0
@ -36,20 +38,22 @@ def load_data(setting_name, *args, **kwargs):
return 10
if setting_name == 'allow_user_create_domain':
return True
if setting_name == 'allow_user_remove_domain':
return True
@pytest.fixture
def test_admin_user():
def test_admin_user(app):
return app.config.get('TEST_ADMIN_USER')
@pytest.fixture
def test_user():
def test_user(app):
return app.config.get('TEST_USER')
@pytest.fixture
def basic_auth_admin_headers():
def basic_auth_admin_headers(app):
test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
user_pass = "{0}:{1}".format(test_admin_user, test_admin_pass)
@ -61,7 +65,7 @@ def basic_auth_admin_headers():
@pytest.fixture
def basic_auth_user_headers():
def basic_auth_user_headers(app):
test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD')
user_pass = "{0}:{1}".format(test_user, test_user_pass)
@ -73,7 +77,8 @@ def basic_auth_user_headers():
@pytest.fixture(scope="module")
def initial_data():
def initial_data(app):
pdns_proto = os.environ['PDNS_PROTO']
pdns_host = os.environ['PDNS_HOST']
pdns_port = os.environ['PDNS_PORT']
@ -83,46 +88,44 @@ def initial_data():
api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY'])
allow_create_domain_setting = Setting('allow_user_create_domain', True)
try:
flask_migrate.upgrade()
with app.app_context():
try:
flask_migrate_upgrade(directory="migrations")
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD')
test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD')
test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
admin_user = User(username=test_admin_user,
plain_text_password=test_admin_pass,
email="admin@admin.com")
ret = admin_user.create_local_user()
admin_user = User(username=test_admin_user,
plain_text_password=test_admin_pass,
email="admin@admin.com")
msg = admin_user.create_local_user()
if not ret['status']:
raise Exception("Error occurred creating user {0}".format(ret['msg']))
if not msg:
raise Exception("Error occurred creating user {0}".format(msg))
ordinary_user = User(username=test_user,
plain_text_password=test_user_pass,
email="test@test.com")
ret = ordinary_user.create_local_user()
ordinary_user = User(username=test_user,
plain_text_password=test_user_pass,
email="test@test.com")
msg = ordinary_user.create_local_user()
if not ret['status']:
raise Exception("Error occurred creating user {0}".format(ret['msg']))
if not msg:
raise Exception("Error occurred creating user {0}".format(msg))
except Exception as e:
print("Unexpected ERROR: {0}".format(e))
raise e
except Exception as e:
print("Unexpected ERROR: {0}".format(e))
raise e
yield
db.session.close()
os.unlink(app.config['TEST_DB_LOCATION'])
@pytest.fixture(scope="module")
def initial_apikey_data():
def initial_apikey_data(app):
pdns_proto = os.environ['PDNS_PROTO']
pdns_host = os.environ['PDNS_HOST']
pdns_port = os.environ['PDNS_PORT']
@ -131,42 +134,42 @@ def initial_apikey_data():
api_url_setting = Setting('pdns_api_url', pdns_api_url)
api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY'])
allow_create_domain_setting = Setting('allow_user_create_domain', True)
allow_remove_domain_setting = Setting('allow_user_remove_domain', True)
try:
flask_migrate.upgrade()
with app.app_context():
try:
flask_migrate_upgrade(directory="migrations")
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
db.session.add(allow_remove_domain_setting)
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
test_user_apikey = app.config.get('TEST_USER_APIKEY')
test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY')
test_user_apikey = app.config.get('TEST_USER_APIKEY')
test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY')
dummy_apikey = ApiKey(desc="dummy", role_name="Administrator")
dummy_apikey = ApiKey(desc="dummy", role_name="Administrator")
admin_key = dummy_apikey.get_hashed_password(
plain_text_password=test_admin_apikey).decode('utf-8')
admin_key = dummy_apikey.get_hashed_password(
plain_text_password=test_admin_apikey).decode('utf-8')
admin_apikey = ApiKey(key=admin_key,
desc="test admin apikey",
role_name="Administrator")
admin_apikey.create()
admin_apikey = ApiKey(key=admin_key,
desc="test admin apikey",
role_name="Administrator")
admin_apikey.create()
user_key = dummy_apikey.get_hashed_password(
plain_text_password=test_user_apikey).decode('utf-8')
user_key = dummy_apikey.get_hashed_password(
plain_text_password=test_user_apikey).decode('utf-8')
user_apikey = ApiKey(key=user_key,
desc="test user apikey",
role_name="User")
user_apikey.create()
user_apikey = ApiKey(key=user_key,
desc="test user apikey",
role_name="User")
user_apikey.create()
except Exception as e:
print("Unexpected ERROR: {0}".format(e))
raise e
except Exception as e:
print("Unexpected ERROR: {0}".format(e))
raise e
yield
db.session.close()
os.unlink(app.config['TEST_DB_LOCATION'])
@ -183,61 +186,51 @@ def zone_data():
@pytest.fixture
def created_zone_data():
data = {
'url':
'/api/v1/servers/localhost/zones/example.org.',
'soa_edit_api':
'DEFAULT',
'last_check':
0,
'url': '/api/v1/servers/localhost/zones/example.org.',
'soa_edit_api': 'DEFAULT',
'last_check': 0,
'masters': [],
'dnssec':
False,
'notified_serial':
0,
'nsec3narrow':
False,
'serial':
2019013101,
'nsec3param':
'',
'soa_edit':
'',
'api_rectify':
False,
'kind':
'Native',
'dnssec': False,
'notified_serial': 0,
'nsec3narrow': False,
'serial': 2019013101,
'nsec3param': '',
'soa_edit': '',
'api_rectify': False,
'kind': 'Native',
'rrsets': [{
'comments': [],
'type':
'SOA',
'name':
'example.org.',
'ttl':
3600,
'type': 'SOA',
'name': 'example.org.',
'ttl': 3600,
'records': [{
'content':
'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600',
'content': 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600',
'disabled': False
}]
}, {
'comments': [],
'type':
'NS',
'name':
'example.org.',
'ttl':
3600,
'type': 'NS',
'name': 'example.org.',
'ttl': 3600,
'records': [{
'content': 'ns1.example.org.',
'disabled': False
}]
}],
'name':
'example.org.',
'account':
'',
'id':
'example.org.'
'name': 'example.org.',
'account': '',
'id': 'example.org.'
}
return data
def user_data(app):
test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD')
data = {
"username": test_user,
"plain_text_password": test_user_pass,
"email": "test@test.com"
}
return data
@ -257,37 +250,39 @@ def admin_apikey_data():
@pytest.fixture(scope='module')
def user_apikey_integration():
def user_apikey_integration(app):
test_user_apikey = app.config.get('TEST_USER_APIKEY')
headers = create_apikey_headers(test_user_apikey)
return headers
@pytest.fixture(scope='module')
def admin_apikey_integration():
def admin_apikey_integration(app):
test_user_apikey = app.config.get('TEST_ADMIN_APIKEY')
headers = create_apikey_headers(test_user_apikey)
return headers
@pytest.fixture(scope='module')
def user_apikey():
data = user_apikey_data()
api_key = ApiKey(desc=data['description'],
role_name=data['role'],
domains=[])
headers = create_apikey_headers(api_key.plain_key)
return headers
def user_apikey(app):
with app.app_context():
data = user_apikey_data()
api_key = ApiKey(desc=data['description'],
role_name=data['role'],
domains=[])
headers = create_apikey_headers(api_key.plain_key)
return headers
@pytest.fixture(scope='module')
def admin_apikey():
data = admin_apikey_data()
api_key = ApiKey(desc=data['description'],
role_name=data['role'],
domains=[])
headers = create_apikey_headers(api_key.plain_key)
return headers
def admin_apikey(app):
with app.app_context():
data = admin_apikey_data()
api_key = ApiKey(desc=data['description'],
role_name=data['role'],
domains=[])
headers = create_apikey_headers(api_key.plain_key)
return headers
def create_apikey_headers(passw):

View File

@ -4,12 +4,11 @@ from collections import namedtuple
from powerdnsadmin.lib.validators import validate_apikey
from powerdnsadmin.lib.schema import ApiKeySchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import user_apikey_data, admin_apikey_data, zone_data
from tests.conftest import user_apikey_data, admin_apikey_data
class TestIntegrationApiApiKeyAdminUser(object):
def test_empty_get(self, client, initial_data, basic_auth_admin_headers):
def test_empty_get(self, initial_data, client, basic_auth_admin_headers):
res = client.get("/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers)
data = res.get_json(force=True)
@ -19,7 +18,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
@pytest.mark.parametrize(
"apikey_data",
[user_apikey_data(), admin_apikey_data()])
def test_create_apikey(self, client, initial_data, apikey_data, zone_data,
def test_create_apikey(self, initial_data, client, apikey_data, zone_data,
basic_auth_admin_headers):
res = client.post("/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
@ -39,7 +38,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
assert res.status_code == 201
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id'])
apikey_url = apikey_url_format.format(data['id'])
res = client.delete(apikey_url, headers=basic_auth_admin_headers)
@ -54,7 +53,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
@pytest.mark.parametrize(
"apikey_data",
[user_apikey_data(), admin_apikey_data()])
def test_get_multiple_apikey(self, client, initial_data, apikey_data,
def test_get_multiple_apikey(self, initial_data, client, apikey_data,
zone_data, basic_auth_admin_headers):
res = client.post("/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
@ -103,7 +102,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
@pytest.mark.parametrize(
"apikey_data",
[user_apikey_data(), admin_apikey_data()])
def test_delete_apikey(self, client, initial_data, apikey_data, zone_data,
def test_delete_apikey(self, initial_data, client, apikey_data, zone_data,
basic_auth_admin_headers):
res = client.post("/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
@ -123,7 +122,7 @@ class TestIntegrationApiApiKeyAdminUser(object):
assert res.status_code == 201
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id'])
apikey_url = apikey_url_format.format(data['id'])
res = client.delete(apikey_url, headers=basic_auth_admin_headers)
assert res.status_code == 204

View File

@ -1,14 +1,11 @@
import pytest
import json
from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object):
class TestIntegrationApiApiKeyUser(object):
def test_empty_get(self, initial_data, client, basic_auth_user_headers):
res = client.get("/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers)

View File

@ -14,8 +14,10 @@ class IntegrationApiManagement(object):
assert res.status_code == status_code
if res.status_code == 200:
data = res.get_json(force=True)
assert len(data) == 1
return data[0]
assert isinstance(data, dict)
assert len(data) == 7
assert data.get('id', None)
return data
return None
def check_account(self, cmpdata, data=None):
@ -37,8 +39,10 @@ class IntegrationApiManagement(object):
assert res.status_code == status_code
if status_code == 200:
data = res.get_json(force=True)
assert len(data) == 1
return data[0]
assert isinstance(data, dict)
assert len(data) == 7
assert data.get('id', None)
return data
return None
def check_user(self, cmpdata, data=None):
@ -50,5 +54,5 @@ class IntegrationApiManagement(object):
elif key == 'role':
assert data[key]['name'] == cmpdata['role_name']
else:
assert key in ("id",)
assert key in ("id","accounts",)
return data

View File

@ -1,9 +1,5 @@
import json
from tests.fixtures import ( # noqa: F401
client, initial_data, basic_auth_admin_headers,
test_admin_user, test_user, account_data, user1_data,
)
from . import IntegrationApiManagement
@ -89,8 +85,9 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement):
)
data = res.get_json(force=True)
assert res.status_code == 200
assert len(data) == 1
data = data[0]
assert isinstance(data, dict)
assert len(data) == 7
assert data.get('id', None)
account_id = data["id"]
for key, value in account_data.items():
assert data[key] == value
@ -142,10 +139,12 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement):
)
data = res.get_json(force=True)
assert res.status_code == 201
assert len(data) == 1
assert isinstance(data, dict)
assert len(data) == 6
assert data.get('id', None)
# Check user
user1 = self.check_user(user1_data, data[0])
user1 = self.check_user(user1_data, data)
user1_id = user1["id"]
updated = user1_data.copy()
@ -240,10 +239,12 @@ class TestIntegrationApiManagementAdminUser(IntegrationApiManagement):
)
data = res.get_json(force=True)
assert res.status_code == 201
assert len(data) == 1
assert isinstance(data, dict)
assert len(data) == 6
assert data.get('id', None)
# Check user
user1 = self.check_user(user1_data, data[0])
user1 = self.check_user(user1_data, data)
user1_id = user1["id"]
# Assert test account has no users

View File

@ -1,66 +1,61 @@
import json
from tests.fixtures import ( # noqa: F401
client, initial_data, basic_auth_admin_headers, basic_auth_user_headers,
test_admin_user, test_user, account_data, user1_data,
)
from . import IntegrationApiManagement
class TestIntegrationApiManagementUser(IntegrationApiManagement):
def test_accounts_empty_get(
self, client, initial_data, # noqa: F811
basic_auth_user_headers): # noqa: F811
def test_accounts_empty_get(self, initial_data, client, # noqa: F811
basic_auth_user_headers): # noqa: F811
res = client.get("/api/v1/pdnsadmin/accounts",
headers=basic_auth_user_headers)
assert res.status_code == 401
def test_users_empty_get(
self, client, initial_data, # noqa: F811
test_admin_user, test_user, # noqa: F811
basic_auth_user_headers): # noqa: F811
def test_users_empty_get(self, initial_data, client, # noqa: F811
test_admin_user, test_user, # noqa: F811
basic_auth_user_headers): # noqa: F811
res = client.get("/api/v1/pdnsadmin/users",
headers=basic_auth_user_headers)
assert res.status_code == 401
def test_self_get(
self, initial_data, client, test_user, # noqa: F811
basic_auth_user_headers): # noqa: F811
self.user = None
def test_self_get(self, initial_data, client, basic_auth_user_headers, test_user): # noqa: F811
res = client.get("/api/v1/pdnsadmin/users/{}".format(test_user),
headers=basic_auth_user_headers)
data = res.get_json(force=True)
assert res.status_code == 200
assert len(data) == 1, data
self.user = data
assert data
def test_accounts(
self, client, initial_data, # noqa: F811
account_data, # noqa: F811
basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811
def test_create_account_fail(self, client, initial_data, account_data, # noqa: F811
basic_auth_user_headers): # noqa: F811
# Create account (should fail)
res = client.post("/api/v1/pdnsadmin/accounts",
headers=basic_auth_user_headers,
data=json.dumps(account_data),
content_type="application/json")
assert res.status_code == 401
def test_create_account_as_admin(self, app, initial_data, client, account_data, # noqa: F811
basic_auth_admin_headers): # noqa: F811
self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers
# Create account (should fail)
res = client.post(
"/api/v1/pdnsadmin/accounts",
headers=basic_auth_user_headers,
data=json.dumps(account_data),
content_type="application/json",
)
assert res.status_code == 401
with app.test_request_context():
# Create account (as admin)
res = client.post("/api/v1/pdnsadmin/accounts",
headers=basic_auth_admin_headers,
data=json.dumps(account_data),
content_type="application/json")
data = res.get_json(force=True)
assert res.status_code == 201
# Create account (as admin)
res = client.post(
"/api/v1/pdnsadmin/accounts",
headers=basic_auth_admin_headers,
data=json.dumps(account_data),
content_type="application/json",
)
data = res.get_json(force=True)
assert res.status_code == 201
def test_update_account_fail(
self, initial_data, client, # noqa: F811
account_data, # noqa: F811
basic_auth_user_headers,
basic_auth_admin_headers): # noqa: F811
self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers
# Check account
data = self.check_account(account_data)
@ -75,6 +70,18 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
)
assert res.status_code == 401
def test_delete_account_fail(
self, initial_data, client, # noqa: F811
account_data, # noqa: F811
basic_auth_user_headers,
basic_auth_admin_headers): # noqa: F811
self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers
# Check account
data = self.check_account(account_data)
account_id = data["id"]
# Delete account (should fail)
res = client.delete(
"/api/v1/pdnsadmin/accounts/{}".format(account_id),
@ -84,6 +91,17 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
)
assert res.status_code == 401
def test_delete_account_as_admin(
self, client, initial_data, # noqa: F811
account_data, # noqa: F811
basic_auth_admin_headers): # noqa: F811
self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers
# Check account
data = self.check_account(account_data)
account_id = data["id"]
# Cleanup (delete account as admin)
res = client.delete(
"/api/v1/pdnsadmin/accounts/{}".format(account_id),
@ -94,8 +112,8 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
assert res.status_code == 204
def test_users(
self, client, initial_data, # noqa: F811
user1_data, # noqa: F811
self, client, initial_data, # noqa: F811
user1_data, # noqa: F811
basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811
self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers
@ -118,10 +136,12 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
)
data = res.get_json(force=True)
assert res.status_code == 201
assert len(data) == 1
assert isinstance(data, dict)
assert len(data) == 6
assert data.get('id', None)
# Check user
user1 = self.check_user(user1_data, data[0])
user1 = self.check_user(user1_data, data)
user1_id = user1["id"]
# Update to defaults (should fail)
@ -152,8 +172,8 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
assert res.status_code == 204
def test_account_users(
self, client, initial_data, # noqa: F811
account_data, user1_data, # noqa: F811
self, client, initial_data, # noqa: F811
account_data, user1_data, # noqa: F811
basic_auth_admin_headers, basic_auth_user_headers): # noqa: F811
self.client = client
self.basic_auth_admin_headers = basic_auth_admin_headers
@ -181,10 +201,12 @@ class TestIntegrationApiManagementUser(IntegrationApiManagement):
)
data = res.get_json(force=True)
assert res.status_code == 201
assert len(data) == 1
assert isinstance(data, dict)
assert len(data) == 6
assert data.get('id', None)
# Check user
user1 = self.check_user(user1_data, data[0])
user1 = self.check_user(user1_data, data)
user1_id = user1["id"]
# Assert test account has no users

View File

@ -1,11 +1,8 @@
import pytest
import json
from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneAdminUser(object):

View File

@ -3,9 +3,6 @@ from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import admin_apikey_integration
class TestIntegrationApiZoneAdminApiKey(object):

View File

@ -3,9 +3,6 @@ from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import user_apikey_integration
class TestIntegrationApiZoneUserApiKey(object):

View File

@ -3,8 +3,6 @@ from collections import namedtuple
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object):

View File

@ -2,9 +2,6 @@ import json
import pytest
from unittest.mock import patch
from collections import namedtuple
import sys
import os
sys.path.append(os.getcwd())
import powerdnsadmin
from powerdnsadmin.models.setting import Setting
@ -12,79 +9,108 @@ from powerdnsadmin.models.domain import Domain
from powerdnsadmin.models.api_key import ApiKey
from powerdnsadmin.models.role import Role
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, created_zone_data
from tests.fixtures import user_apikey, admin_apikey, zone_data
from tests.fixtures import admin_apikey_data, load_data
from powerdnsadmin.lib.schema import DomainSchema
from tests.conftest import admin_apikey_data, load_data
class TestUnitApiZoneAdminApiKey(object):
@pytest.fixture
def common_data_mock(self):
self.google_setting_patcher = patch(
'powerdnsadmin.services.google.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.helpers_setting_patcher = patch(
'powerdnsadmin.lib.helper.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.models_setting_patcher = patch(
'powerdnsadmin.models.setting.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.record_model_setting_patcher = patch(
'powerdnsadmin.models.record.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_apikey_patcher = patch(
'powerdnsadmin.decorators.ApiKey',
spec=powerdnsadmin.models.api_key.ApiKey)
self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
def common_data_mock(self, app):
with app.app_context():
self.google_setting_patcher = patch(
'powerdnsadmin.services.google.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.azure_setting_patcher = patch(
'powerdnsadmin.services.azure.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.helpers_setting_patcher = patch(
'powerdnsadmin.lib.helper.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.models_setting_patcher = patch(
'powerdnsadmin.models.setting.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.record_model_setting_patcher = patch(
'powerdnsadmin.models.record.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_apikey_patcher = patch(
'powerdnsadmin.decorators.ApiKey',
spec=powerdnsadmin.models.api_key.ApiKey)
self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
data = admin_apikey_data()
api_key = ApiKey(desc=data['description'],
role_name=data['role'],
domains=[])
api_key.role = Role(name=data['role'])
data = admin_apikey_data()
api_key = ApiKey(desc=data['description'],
role_name=data['role'],
domains=[])
api_key.role = Role(name=data['role'])
self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start()
self.mock_azure_setting = self.azure_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_decorators_setting = self.mock_decorators_setting_patcher.start()
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_azure_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_decorators_setting.return_value.get.side_effect = load_data
self.mock_apikey.return_value.is_validate.return_value = api_key
yield
for patcher in [
self.google_setting_patcher,
self.github_setting_patcher,
self.azure_setting_patcher,
self.oidc_setting_patcher,
self.helpers_setting_patcher,
self.models_setting_patcher,
self.domain_model_setting_patcher,
self.record_model_setting_patcher,
self.server_model_setting_patcher,
self.mock_apikey_patcher,
self.mock_hist_patcher,
self.mock_setting_patcher,
self.mock_decorators_setting_patcher,
]:
patcher.stop()
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_apikey.return_value.is_validate.return_value = api_key
def test_empty_get(self, client, common_data_mock, admin_apikey):
with patch('powerdnsadmin.routes.api.Domain') as mock_domain, \

View File

@ -5,37 +5,33 @@ from collections import namedtuple
import powerdnsadmin
from powerdnsadmin.models.user import User
from powerdnsadmin.models.role import Role
from powerdnsadmin.models.domain import Domain
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, basic_auth_admin_headers
from tests.fixtures import zone_data, created_zone_data, load_data
from tests.conftest import load_data
class TestUnitApiZoneAdminUser(object):
@pytest.fixture
def common_data_mock(self):
def common_data_mock(self, app, initial_data):
self.google_setting_patcher = patch(
'powerdnsadmin.services.google.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.azure_setting_patcher = patch(
'powerdnsadmin.services.azure.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.api_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.base_route_user_patcher = patch(
'powerdnsadmin.routes.base.User',
spec=powerdnsadmin.models.user.User)
self.helpers_setting_patcher = patch(
'powerdnsadmin.lib.helper.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.models_setting_patcher = patch(
'powerdnsadmin.models.Setting',
'powerdnsadmin.models.setting.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
@ -46,50 +42,81 @@ class TestUnitApiZoneAdminUser(object):
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_user_patcher = patch('powerdnsadmin.decorators.User',
spec=powerdnsadmin.models.user.User)
self.mock_user_patcher = patch(
'powerdnsadmin.decorators.User',
spec=powerdnsadmin.models.user.User)
self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.base_route_user_patcher = patch(
'powerdnsadmin.routes.base.User',
spec=powerdnsadmin.models.user.User)
self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_base_route_user = self.base_route_user_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.decorators_setting = self.decorators_setting_patcher.start()
self.api_setting = self.api_setting_patcher.start()
self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
with app.app_context():
self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start()
self.mock_azure_setting = self.azure_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_base_route_user = self.base_route_user_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_decorators_setting = self.mock_decorators_setting_patcher.start()
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_azure_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_decorators_setting.return_value.get.side_effect = load_data
self.mock_setting.return_value.get.side_effect = load_data
self.mockk = MagicMock()
self.mockk.role.name = "Administrator"
self.mockk = MagicMock()
self.mockk.role.name = "Administrator"
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
yield
for patcher in [
self.google_setting_patcher,
self.github_setting_patcher,
self.azure_setting_patcher,
self.oidc_setting_patcher,
self.base_route_user_patcher,
self.helpers_setting_patcher,
self.models_setting_patcher,
self.domain_model_setting_patcher,
self.record_model_setting_patcher,
self.server_model_setting_patcher,
self.mock_user_patcher,
self.mock_hist_patcher,
self.mock_setting_patcher,
self.mock_decorators_setting_patcher,
]:
patcher.stop()
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
def test_empty_get(self, client, common_data_mock,
basic_auth_admin_headers):
@ -135,8 +162,7 @@ class TestUnitApiZoneAdminUser(object):
headers=basic_auth_admin_headers)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain",
data[0].keys())(*data[0].values())
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))

View File

@ -5,37 +5,33 @@ from collections import namedtuple
import powerdnsadmin
from powerdnsadmin.models.user import User
from powerdnsadmin.models.role import Role
from powerdnsadmin.models.domain import Domain
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema
from tests.fixtures import client, basic_auth_user_headers
from tests.fixtures import zone_data, created_zone_data, load_data
from tests.conftest import load_data
class TestUnitApiZoneUser(object):
@pytest.fixture
def common_data_mock(self):
def common_data_mock(self, app, initial_data):
self.google_setting_patcher = patch(
'powerdnsadmin.services.google.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.azure_setting_patcher = patch(
'powerdnsadmin.services.azure.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.api_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.base_route_user_patcher = patch(
'powerdnsadmin.routes.base.User',
spec=powerdnsadmin.models.user.User)
self.helpers_setting_patcher = patch(
'powerdnsadmin.lib.helper.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.models_setting_patcher = patch(
'powerdnsadmin.models.Setting',
'powerdnsadmin.models.setting.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
@ -46,55 +42,86 @@ class TestUnitApiZoneUser(object):
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_user_patcher = patch('powerdnsadmin.decorators.User',
spec=powerdnsadmin.models.user.User)
self.mock_user_patcher = patch(
'powerdnsadmin.decorators.User',
spec=powerdnsadmin.models.user.User)
self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.base_route_user_patcher = patch(
'powerdnsadmin.routes.base.User',
spec=powerdnsadmin.models.user.User)
self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_base_route_user = self.base_route_user_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.decorators_setting = self.decorators_setting_patcher.start()
self.api_setting = self.api_setting_patcher.start()
self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
with app.app_context():
self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start()
self.mock_azure_setting = self.azure_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_base_route_user = self.base_route_user_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_decorators_setting = self.mock_decorators_setting_patcher.start()
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_azure_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_decorators_setting.return_value.get.side_effect = load_data
self.mock_setting.return_value.get.side_effect = load_data
self.mockk = MagicMock()
self.mockk.role.name = "User"
self.mockk = MagicMock()
self.mockk.role.name = "User"
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
yield
for patcher in [
self.google_setting_patcher,
self.github_setting_patcher,
self.azure_setting_patcher,
self.oidc_setting_patcher,
self.base_route_user_patcher,
self.helpers_setting_patcher,
self.models_setting_patcher,
self.domain_model_setting_patcher,
self.record_model_setting_patcher,
self.server_model_setting_patcher,
self.mock_user_patcher,
self.mock_hist_patcher,
self.mock_setting_patcher,
self.mock_decorators_setting_patcher,
]:
patcher.stop()
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
self.mock_base_route_user.query.filter.return_value.first.return_value = self.mockk
self.mock_base_route_user.return_value.is_validate.return_value = True
def test_create_zone(self, client, common_data_mock, zone_data,
basic_auth_user_headers, created_zone_data):
with patch('powerdnsadmin.lib.helper.requests.request') as mock_post, \
patch('powerdnsadmin.routes.api.Domain') as mock_domain:
patch('powerdnsadmin.routes.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
@ -115,8 +142,9 @@ class TestUnitApiZoneUser(object):
with patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_user_domains.return_value = [test_domain]
res = client.get("/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers)
headers=basic_auth_user_headers)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
@ -127,9 +155,9 @@ class TestUnitApiZoneUser(object):
def test_delete_zone(self, client, common_data_mock, zone_data,
basic_auth_user_headers):
with patch('powerdnsadmin.lib.utils.requests.request') as mock_delete, \
patch('powerdnsadmin.routes.api.Domain') as mock_domain, \
patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains:
with patch('powerdnsadmin.lib.helper.requests.request') as mock_delete, \
patch('powerdnsadmin.routes.api.Domain') as mock_domain, \
patch('powerdnsadmin.routes.api.get_user_domains') as mock_user_domains:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.return_value.update.return_value = True

View File

@ -1,7 +1,6 @@
import json
import pytest
from unittest.mock import patch
from base64 import b64encode
from collections import namedtuple
import powerdnsadmin
@ -10,78 +9,111 @@ from powerdnsadmin.models.domain import Domain
from powerdnsadmin.models.api_key import ApiKey
from powerdnsadmin.models.role import Role
from powerdnsadmin.lib.validators import validate_zone
from powerdnsadmin.lib.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, created_zone_data
from tests.fixtures import user_apikey, zone_data
from tests.fixtures import user_apikey_data, load_data
from powerdnsadmin.lib.schema import DomainSchema
from tests.conftest import user_apikey_data, load_data
class TestUnitApiZoneUserApiKey(object):
@pytest.fixture
def common_data_mock(self):
self.google_setting_patcher = patch(
'powerdnsadmin.services.google.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.helpers_setting_patcher = patch(
'powerdnsadmin.lib.helper.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.models_setting_patcher = patch(
'powerdnsadmin.models.setting.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.record_model_setting_patcher = patch(
'powerdnsadmin.models.record.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_apikey_patcher = patch(
'powerdnsadmin.decorators.ApiKey',
spec=powerdnsadmin.models.api_key.ApiKey)
self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History)
def common_data_mock(self, app):
with app.app_context():
self.google_setting_patcher = patch(
'powerdnsadmin.services.google.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.github_setting_patcher = patch(
'powerdnsadmin.services.github.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.azure_setting_patcher = patch(
'powerdnsadmin.services.azure.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.oidc_setting_patcher = patch(
'powerdnsadmin.services.oidc.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.helpers_setting_patcher = patch(
'powerdnsadmin.lib.helper.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.models_setting_patcher = patch(
'powerdnsadmin.models.setting.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.domain_model_setting_patcher = patch(
'powerdnsadmin.models.domain.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.record_model_setting_patcher = patch(
'powerdnsadmin.models.record.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.server_model_setting_patcher = patch(
'powerdnsadmin.models.server.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_apikey_patcher = patch(
'powerdnsadmin.decorators.ApiKey',
spec=powerdnsadmin.models.api_key.ApiKey)
self.mock_hist_patcher = patch(
'powerdnsadmin.routes.api.History',
spec=powerdnsadmin.models.history.History)
self.mock_setting_patcher = patch(
'powerdnsadmin.routes.api.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_decorators_setting_patcher = patch(
'powerdnsadmin.decorators.Setting',
spec=powerdnsadmin.models.setting.Setting)
self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_google_setting = self.google_setting_patcher.start()
self.mock_github_setting = self.github_setting_patcher.start()
self.mock_azure_setting = self.azure_setting_patcher.start()
self.mock_oidc_setting = self.oidc_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_domain_model_setting = self.domain_model_setting_patcher.start(
)
self.mock_record_model_setting = self.record_model_setting_patcher.start(
)
self.mock_server_model_setting = self.server_model_setting_patcher.start(
)
self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_setting = self.mock_setting_patcher.start()
self.mock_decorators_setting = self.mock_decorators_setting_patcher.start()
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_google_setting.return_value.get.side_effect = load_data
self.mock_github_setting.return_value.get.side_effect = load_data
self.mock_azure_setting.return_value.get.side_effect = load_data
self.mock_oidc_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_domain_model_setting.return_value.get.side_effect = load_data
self.mock_record_model_setting.return_value.get.side_effect = load_data
self.mock_server_model_setting.return_value.get.side_effect = load_data
self.mock_decorators_setting.return_value.get.side_effect = load_data
data = user_apikey_data()
domain = Domain(name=data['domains'][0])
data = user_apikey_data()
domain = Domain(name=data['domains'][0])
api_key = ApiKey(desc=data['description'],
role_name=data['role'],
domains=[domain])
api_key.role = Role(name=data['role'])
api_key = ApiKey(desc=data['description'],
role_name=data['role'],
domains=[domain])
api_key.role = Role(name=data['role'])
self.mock_apikey.return_value.is_validate.return_value = api_key
yield
for patcher in [
self.google_setting_patcher,
self.github_setting_patcher,
self.azure_setting_patcher,
self.oidc_setting_patcher,
self.helpers_setting_patcher,
self.models_setting_patcher,
self.domain_model_setting_patcher,
self.record_model_setting_patcher,
self.server_model_setting_patcher,
self.mock_apikey_patcher,
self.mock_hist_patcher,
self.mock_setting_patcher,
self.mock_decorators_setting_patcher,
]:
patcher.stop()
self.mock_apikey.return_value.is_validate.return_value = api_key
def test_create_zone(self, client, common_data_mock, zone_data,
user_apikey, created_zone_data):
@ -112,8 +144,7 @@ class TestUnitApiZoneUserApiKey(object):
headers=user_apikey)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain",
data[0].keys())(*data[0].values())
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))