Merge pull request #440 from p53/create-api

Create api - resolves #416
This commit is contained in:
Khanh Ngo 2019-03-02 09:44:06 +07:00 committed by GitHub
commit ff3b484d84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 5019 additions and 226 deletions

2
.gitignore vendored
View File

@ -42,3 +42,5 @@ node_modules
.webassets-cache
app/static/generated
.venv*
.pytest_cache

View File

@ -1,24 +1,5 @@
language: python
python:
- "3.5.2"
before_install:
- sudo apt-key adv --fetch-keys http://dl.yarnpkg.com/debian/pubkey.gpg
- echo "deb http://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list
- travis_retry sudo apt-get update
- travis_retry sudo apt-get install python3-dev libxml2-dev libxmlsec1-dev yarn
- mysql -e 'CREATE DATABASE pda';
- mysql -e "GRANT ALL PRIVILEGES ON pda.* to pda@'%' IDENTIFIED BY 'changeme'";
install:
- pip install -r requirements.txt
before_script:
- mv config_template.py config.py
- export FLASK_APP=app/__init__.py
- flask db upgrade
- yarn install --pure-lockfile
- flask assets build
language: minimal
script:
- sh run_travis.sh
cache:
yarn: true
- docker-compose -f docker-compose-test.yml up --exit-code-from powerdns-admin --abort-on-container-exit
services:
- mysql
- docker

134
README.md
View File

@ -40,3 +40,137 @@ You can now access PowerDNS-Admin at url http://localhost:9191
### Screenshots
![dashboard](https://user-images.githubusercontent.com/6447444/44068603-0d2d81f6-9fa5-11e8-83af-14e2ad79e370.png)
### Running tests
**NOTE:** Tests will create `__pycache__` folders which will be owned by root, which might be issue during rebuild
thus (e.g. invalid tar headers message) when such situation occurs, you need to remove those folders as root
1. Build images
```
docker-compose -f docker-compose-test.yml build
```
2. Run tests
```
docker-compose -f docker-compose-test.yml up
```
3. Rerun tests
```
docker-compose -f docker-compose-test.yml down
```
To teardown previous environment
```
docker-compose -f docker-compose-test.yml up
```
To run tests again
### API Usage
1. run docker image docker-compose up, go to UI http://localhost:9191, at http://localhost:9191/swagger is swagger API specification
2. click to register user, type e.g. user: admin and password: admin
3. login to UI in settings enable allow domain creation for users,
now you can create and manage domains with admin account and also ordinary users
4. Encode your user and password to base64, in our example we have user admin and password admin so in linux cmd line we type:
```
someuser@somehost:~$echo -n 'admin:admin'|base64
YWRtaW46YWRtaW4=
```
we use generated output in basic authentication, we auhtenticate as user,
with basic authentication, we can create/delete/get zone and create/delete/get/update apikeys
creating domain:
```
curl -L -vvv -H 'Content-Type: application/json' -H 'Authorization: Basic YWRtaW46YWRtaW4=' -X POST http://localhost:9191/api/v1/pdnsadmin/zones --data '{"name": "yourdomain.com.", "kind": "NATIVE", "nameservers": ["ns1.mydomain.com."]}'
```
creating apikey which has Administrator role, apikey can have also User role, when creating such apikey you have to specify also domain for which apikey is valid:
```
curl -L -vvv -H 'Content-Type: application/json' -H 'Authorization: Basic YWRtaW46YWRtaW4=' -X POST http://localhost:9191/api/v1/pdnsadmin/apikeys --data '{"description": "masterkey","domains":[], "role": "Administrator"}'
```
call above will return response like this:
```
[{"description": "samekey", "domains": [], "role": {"name": "Administrator", "id": 1}, "id": 2, "plain_key": "aGCthP3KLAeyjZI"}]
```
we take plain_key and base64 encode it, this is the only time we can get API key in plain text and save it somewhere:
```
someuser@somehost:~$echo -n 'aGCthP3KLAeyjZI'|base64
YUdDdGhQM0tMQWV5alpJ
```
We can use apikey for all calls specified in our API specification (it tries to follow powerdns API 1:1, only tsigkeys endpoints are not yet implemented), don't forget to specify Content-Type!
getting powerdns configuration:
```
curl -L -vvv -H 'Content-Type: application/json' -H 'X-API-KEY: YUdDdGhQM0tMQWV5alpJ' -X GET http://localhost:9191/api/v1/servers/localhost/config
```
creating and updating records:
```
curl -X PATCH -H 'Content-Type: application/json' --data '{"rrsets": [{"name": "test1.yourdomain.com.","type": "A","ttl": 86400,"changetype": "REPLACE","records": [ {"content": "192.0.2.5", "disabled": false} ]},{"name": "test2.yourdomain.com.","type": "AAAA","ttl": 86400,"changetype": "REPLACE","records": [ {"content": "2001:db8::6", "disabled": false} ]}]}' -H 'X-API-Key: YUdDdGhQM0tMQWV5alpJ' http://127.0.0.1:9191/api/v1/servers/localhost/zones/yourdomain.com.
```
getting domain:
```
curl -L -vvv -H 'Content-Type: application/json' -H 'X-API-KEY: YUdDdGhQM0tMQWV5alpJ' -X GET http://localhost:9191/api/v1/servers/localhost/zones/yourdomain.com
```
list zone records:
```
curl -H 'Content-Type: application/json' -H 'X-API-Key: YUdDdGhQM0tMQWV5alpJ' http://localhost:9191/api/v1/servers/localhost/zones/yourdomain.com
```
add new record:
```
curl -H 'Content-Type: application/json' -X PATCH --data '{"rrsets": [ {"name": "test.yourdomain.com.", "type": "A", "ttl": 86400, "changetype": "REPLACE", "records": [ {"content": "192.0.5.4", "disabled": false } ] } ] }' -H 'X-API-Key: YUdDdGhQM0tMQWV5alpJ' http://localhost:9191/api/v1/servers/localhost/zones/yourdomain.com | jq .
```
update record:
```
curl -H 'Content-Type: application/json' -X PATCH --data '{"rrsets": [ {"name": "test.yourdomain.com.", "type": "A", "ttl": 86400, "changetype": "REPLACE", "records": [ {"content": "192.0.2.5", "disabled": false, "name": "test.yourdomain.com.", "ttl": 86400, "type": "A"}]}]}' -H 'X-API-Key: YUdDdGhQM0tMQWV5alpJ' http://localhost:9191/api/v1/servers/localhost/zones/yourdomain.com | jq .
```
delete record:
```
curl -H 'Content-Type: application/json' -X PATCH --data '{"rrsets": [ {"name": "test.yourdomain.com.", "type": "A", "ttl": 86400, "changetype": "DELETE"}]}' -H 'X-API-Key: YUdDdGhQM0tMQWV5alpJ' http://localhost:9191/api/v1/servers/localhost/zones/yourdomain.com | jq
```
### Generate ER diagram
```
apt-get install python-dev graphviz libgraphviz-dev pkg-config
```
```
pip install graphviz mysqlclient ERAlchemy
```
```
docker-compose up -d
```
```
eralchemy -i 'mysql://powerdns_admin:changeme@'$(docker inspect powerdns-admin-mysql|jq -jr '.[0].NetworkSettings.Networks.powerdnsadmin_default.IPAddress')':3306/powerdns_admin' -o /tmp/output.pdf
```

View File

@ -43,4 +43,9 @@ if app.config.get('SAML_ENABLED') and app.config.get('SAML_ENCRYPT'):
certutil.create_self_signed_cert()
from app import models
from app.blueprints.api import api_blueprint
app.register_blueprint(api_blueprint, url_prefix='/api/v1')
from app import views

511
app/blueprints/api.py Normal file
View File

@ -0,0 +1,511 @@
import json
from flask import Blueprint, g, request, abort
from app.models import Domain, History, Setting, ApiKey
from app.lib import utils, helper
from app.decorators import api_basic_auth, api_can_create_domain, is_json
from app.decorators import apikey_auth, apikey_is_admin
from app.decorators import apikey_can_access_domain
from app import csrf
from app.errors import DomainNotExists, DomainAccessForbidden, RequestIsNotJSON
from app.errors import ApiKeyCreateFail, ApiKeyNotUsable, NotEnoughPrivileges
from app.schema import ApiKeySchema, DomainSchema, ApiPlainKeySchema
from urllib.parse import urljoin
from app.lib.log import logging
api_blueprint = Blueprint('api_blueprint', __name__)
apikey_schema = ApiKeySchema(many=True)
domain_schema = DomainSchema(many=True)
apikey_plain_schema = ApiPlainKeySchema(many=True)
@api_blueprint.errorhandler(400)
def handle_400(err):
return json.dumps({"msg": "Bad Request"}), 400
@api_blueprint.errorhandler(401)
def handle_401(err):
return json.dumps({"msg": "Unauthorized"}), 401
@api_blueprint.errorhandler(500)
def handle_500(err):
return json.dumps({"msg": "Internal Server Error"}), 500
@api_blueprint.errorhandler(DomainNotExists)
def handle_domain_not_exists(err):
return json.dumps(err.to_dict()), err.status_code
@api_blueprint.errorhandler(DomainAccessForbidden)
def handle_domain_access_forbidden(err):
return json.dumps(err.to_dict()), err.status_code
@api_blueprint.errorhandler(ApiKeyCreateFail)
def handle_apikey_create_fail(err):
return json.dumps(err.to_dict()), err.status_code
@api_blueprint.errorhandler(ApiKeyNotUsable)
def handle_apikey_not_usable(err):
return json.dumps(err.to_dict()), err.status_code
@api_blueprint.errorhandler(NotEnoughPrivileges)
def handle_not_enough_privileges(err):
return json.dumps(err.to_dict()), err.status_code
@api_blueprint.errorhandler(RequestIsNotJSON)
def handle_request_is_not_json(err):
return json.dumps(err.to_dict()), err.status_code
@api_blueprint.before_request
@is_json
def before_request():
pass
@csrf.exempt
@api_blueprint.route('/pdnsadmin/zones', methods=['POST'])
@api_basic_auth
@api_can_create_domain
def api_login_create_zone():
pdns_api_url = Setting().get('pdns_api_url')
pdns_api_key = Setting().get('pdns_api_key')
pdns_version = Setting().get('pdns_version')
api_uri_with_prefix = utils.pdns_api_extended_uri(pdns_version)
api_full_uri = api_uri_with_prefix + '/servers/localhost/zones'
headers = {}
headers['X-API-Key'] = pdns_api_key
msg_str = "Sending request to powerdns API {0}"
msg = msg_str.format(request.get_json(force=True))
logging.debug(msg)
resp = utils.fetch_remote(
urljoin(pdns_api_url, api_full_uri),
method='POST',
data=request.get_json(force=True),
headers=headers,
accept='application/json; q=1'
)
if resp.status_code == 201:
logging.debug("Request to powerdns API successful")
data = request.get_json(force=True)
history = History(
msg='Add domain {0}'.format(data['name'].rstrip('.')),
detail=json.dumps(data),
created_by=g.user.username
)
history.add()
if g.user.role.name not in ['Administrator', 'Operator']:
logging.debug("User is ordinary user, assigning created domain")
domain = Domain(name=data['name'].rstrip('.'))
domain.update()
domain.grant_privileges([g.user.username])
domain = Domain()
domain.update()
return resp.content, resp.status_code, resp.headers.items()
@csrf.exempt
@api_blueprint.route('/pdnsadmin/zones', methods=['GET'])
@api_basic_auth
def api_login_list_zones():
if g.user.role.name not in ['Administrator', 'Operator']:
domain_obj_list = g.user.get_domains()
else:
domain_obj_list = Domain.query.all()
domain_obj_list = [] if domain_obj_list is None else domain_obj_list
return json.dumps(domain_schema.dump(domain_obj_list)), 200
@csrf.exempt
@api_blueprint.route(
'/pdnsadmin/zones/<string:domain_name>',
methods=['DELETE']
)
@api_basic_auth
@api_can_create_domain
def api_login_delete_zone(domain_name):
pdns_api_url = Setting().get('pdns_api_url')
pdns_api_key = Setting().get('pdns_api_key')
pdns_version = Setting().get('pdns_version')
api_uri_with_prefix = utils.pdns_api_extended_uri(pdns_version)
api_full_uri = api_uri_with_prefix + '/servers/localhost/zones'
api_full_uri += '/' + domain_name
headers = {}
headers['X-API-Key'] = pdns_api_key
domain = Domain.query.filter(Domain.name == domain_name)
if not domain:
abort(404)
if g.user.role.name not in ['Administrator', 'Operator']:
user_domains_obj_list = g.user.get_domains()
user_domains_list = [item.name for item in user_domains_obj_list]
if domain_name not in user_domains_list:
raise DomainAccessForbidden()
msg_str = "Sending request to powerdns API {0}"
logging.debug(msg_str.format(domain_name))
try:
resp = utils.fetch_remote(
urljoin(pdns_api_url, api_full_uri),
method='DELETE',
headers=headers,
accept='application/json; q=1'
)
if resp.status_code == 204:
logging.debug("Request to powerdns API successful")
history = History(
msg='Delete domain {0}'.format(domain_name),
detail='',
created_by=g.user.username
)
history.add()
domain = Domain()
domain.update()
except Exception as e:
logging.error('Error: {0}'.format(e))
abort(500)
return resp.content, resp.status_code, resp.headers.items()
@csrf.exempt
@api_blueprint.route('/pdnsadmin/apikeys', methods=['POST'])
@api_basic_auth
def api_generate_apikey():
data = request.get_json()
description = None
role_name = None
apikey = None
domain_obj_list = []
abort(400) if 'domains' not in data else None
abort(400) if not isinstance(data['domains'], (list,)) else None
abort(400) if 'role' not in data else None
description = data['description'] if 'description' in data else None
role_name = data['role']
domains = data['domains']
if role_name == 'User' and len(domains) == 0:
logging.error("Apikey with User role must have domains")
raise ApiKeyNotUsable()
elif role_name == 'User':
domain_obj_list = Domain.query.filter(Domain.name.in_(domains)).all()
if len(domain_obj_list) == 0:
msg = "One of supplied domains does not exists"
logging.error(msg)
raise DomainNotExists(message=msg)
if g.user.role.name not in ['Administrator', 'Operator']:
# domain list of domain api key should be valid for
# if not any domain error
# role of api key, user cannot assign role above for api key
if role_name != 'User':
msg = "User cannot assign other role than User"
logging.error(msg)
raise NotEnoughPrivileges(message=msg)
user_domain_obj_list = g.user.get_domains()
domain_list = [item.name for item in domain_obj_list]
user_domain_list = [item.name for item in user_domain_obj_list]
logging.debug("Input domain list: {0}".format(domain_list))
logging.debug("User domain list: {0}".format(user_domain_list))
inter = set(domain_list).intersection(set(user_domain_list))
if not (len(inter) == len(domain_list)):
msg = "You don't have access to one of domains"
logging.error(msg)
raise DomainAccessForbidden(message=msg)
apikey = ApiKey(
desc=description,
role_name=role_name,
domains=domain_obj_list
)
try:
apikey.create()
except Exception as e:
logging.error('Error: {0}'.format(e))
raise ApiKeyCreateFail(message='Api key create failed')
return json.dumps(apikey_plain_schema.dump([apikey])), 201
@csrf.exempt
@api_blueprint.route('/pdnsadmin/apikeys', defaults={'domain_name': None})
@api_blueprint.route('/pdnsadmin/apikeys/<string:domain_name>')
@api_basic_auth
def api_get_apikeys(domain_name):
apikeys = []
logging.debug("Getting apikeys")
if g.user.role.name not in ['Administrator', 'Operator']:
if domain_name:
msg = "Check if domain {0} exists and \
is allowed for user." . format(domain_name)
logging.debug(msg)
apikeys = g.user.get_apikeys(domain_name)
if not apikeys:
raise DomainAccessForbidden(name=domain_name)
logging.debug(apikey_schema.dump(apikeys))
else:
msg_str = "Getting all allowed domains for user {0}"
msg = msg_str . format(g.user.username)
logging.debug(msg)
try:
apikeys = g.user.get_apikeys()
logging.debug(apikey_schema.dump(apikeys))
except Exception as e:
logging.error('Error: {0}'.format(e))
abort(500)
else:
logging.debug("Getting all domains for administrative user")
try:
apikeys = ApiKey.query.all()
logging.debug(apikey_schema.dump(apikeys))
except Exception as e:
logging.error('Error: {0}'.format(e))
abort(500)
return json.dumps(apikey_schema.dump(apikeys)), 200
@csrf.exempt
@api_blueprint.route('/pdnsadmin/apikeys/<int:apikey_id>', methods=['DELETE'])
@api_basic_auth
def api_delete_apikey(apikey_id):
apikey = ApiKey.query.get(apikey_id)
if not apikey:
abort(404)
logging.debug(g.user.role.name)
if g.user.role.name not in ['Administrator', 'Operator']:
apikeys = g.user.get_apikeys()
user_domains_obj_list = g.user.get_domain().all()
apikey_domains_obj_list = apikey.domains
user_domains_list = [item.name for item in user_domains_obj_list]
apikey_domains_list = [item.name for item in apikey_domains_obj_list]
apikeys_ids = [apikey_item.id for apikey_item in apikeys]
inter = set(apikey_domains_list).intersection(set(user_domains_list))
if not (len(inter) == len(apikey_domains_list)):
msg = "You don't have access to some domains apikey belongs to"
logging.error(msg)
raise DomainAccessForbidden(message=msg)
if apikey_id not in apikeys_ids:
raise DomainAccessForbidden()
try:
apikey.delete()
except Exception as e:
logging.error('Error: {0}'.format(e))
abort(500)
return '', 204
@csrf.exempt
@api_blueprint.route('/pdnsadmin/apikeys/<int:apikey_id>', methods=['PUT'])
@api_basic_auth
def api_update_apikey(apikey_id):
# if role different and user is allowed to change it, update
# if apikey domains are different and user is allowed to handle
# that domains update domains
data = request.get_json()
description = data['description'] if 'description' in data else None
role_name = data['role'] if 'role' in data else None
domains = data['domains'] if 'domains' in data else None
domain_obj_list = None
apikey = ApiKey.query.get(apikey_id)
if not apikey:
abort(404)
logging.debug('Updating apikey with id {0}'.format(apikey_id))
if role_name == 'User' and len(domains) == 0:
logging.error("Apikey with User role must have domains")
raise ApiKeyNotUsable()
elif role_name == 'User':
domain_obj_list = Domain.query.filter(Domain.name.in_(domains)).all()
if len(domain_obj_list) == 0:
msg = "One of supplied domains does not exists"
logging.error(msg)
raise DomainNotExists(message=msg)
if g.user.role.name not in ['Administrator', 'Operator']:
if role_name != 'User':
msg = "User cannot assign other role than User"
logging.error(msg)
raise NotEnoughPrivileges(message=msg)
apikeys = g.user.get_apikeys()
apikey_domains = [item.name for item in apikey.domains]
apikeys_ids = [apikey_item.id for apikey_item in apikeys]
user_domain_obj_list = g.user.get_domain().all()
domain_list = [item.name for item in domain_obj_list]
user_domain_list = [item.name for item in user_domain_obj_list]
logging.debug("Input domain list: {0}".format(domain_list))
logging.debug("User domain list: {0}".format(user_domain_list))
inter = set(domain_list).intersection(set(user_domain_list))
if not (len(inter) == len(domain_list)):
msg = "You don't have access to one of domains"
logging.error(msg)
raise DomainAccessForbidden(message=msg)
if apikey_id not in apikeys_ids:
msg = 'Apikey does not belong to domain to which user has access'
logging.error(msg)
raise DomainAccessForbidden()
if set(domains) == set(apikey_domains):
logging.debug("Domains are same, apikey domains won't be updated")
domains = None
if role_name == apikey.role:
logging.debug("Role is same, apikey role won't be updated")
role_name = None
if description == apikey.description:
msg = "Description is same, apikey description won't be updated"
logging.debug(msg)
description = None
try:
apikey = ApiKey.query.get(apikey_id)
apikey.update(
role_name=role_name,
domains=domains,
description=description
)
except Exception as e:
logging.error('Error: {0}'.format(e))
abort(500)
return '', 204
@csrf.exempt
@api_blueprint.route(
'/servers/<string:server_id>/zones/<string:zone_id>/<path:subpath>',
methods=['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
)
@apikey_auth
@apikey_can_access_domain
def api_zone_subpath_forward(server_id, zone_id, subpath):
resp = helper.forward_request()
return resp.content, resp.status_code, resp.headers.items()
@csrf.exempt
@api_blueprint.route(
'/servers/<string:server_id>/zones/<string:zone_id>',
methods=['GET', 'PUT', 'PATCH', 'DELETE']
)
@apikey_auth
@apikey_can_access_domain
def api_zone_forward(server_id, zone_id):
resp = helper.forward_request()
domain = Domain()
domain.update()
return resp.content, resp.status_code, resp.headers.items()
@api_blueprint.route(
'/servers',
methods=['GET']
)
@apikey_auth
@apikey_is_admin
def api_server_forward():
resp = helper.forward_request()
return resp.content, resp.status_code, resp.headers.items()
@api_blueprint.route(
'/servers/<path:subpath>',
methods=['GET', 'PUT']
)
@apikey_auth
@apikey_is_admin
def api_server_sub_forward(subpath):
resp = helper.forward_request()
return resp.content, resp.status_code, resp.headers.items()
@csrf.exempt
@api_blueprint.route('/servers/<string:server_id>/zones', methods=['POST'])
@apikey_auth
def api_create_zone(server_id):
resp = helper.forward_request()
if resp.status_code == 201:
logging.debug("Request to powerdns API successful")
data = request.get_json(force=True)
history = History(
msg='Add domain {0}'.format(data['name'].rstrip('.')),
detail=json.dumps(data),
created_by=g.apikey.description
)
history.add()
if g.apikey.role.name not in ['Administrator', 'Operator']:
logging.debug("Apikey is user key, assigning created domain")
domain = Domain(name=data['name'].rstrip('.'))
g.apikey.domains.append(domain)
domain = Domain()
domain.update()
return resp.content, resp.status_code, resp.headers.items()
@csrf.exempt
@api_blueprint.route('/servers/<string:server_id>/zones', methods=['GET'])
@apikey_auth
def api_get_zones(server_id):
if g.apikey.role.name not in ['Administrator', 'Operator']:
domain_obj_list = g.apikey.domains
else:
domain_obj_list = Domain.query.all()
return json.dumps(domain_schema.dump(domain_obj_list)), 200

View File

@ -1,7 +1,12 @@
from functools import wraps
from flask import g, redirect, url_for
from flask import g, redirect, url_for, request, abort
from app.models import Setting
from .models import User, ApiKey
import base64
from app.lib.log import logging
from app.errors import RequestIsNotJSON, NotEnoughPrivileges
from app.errors import DomainAccessForbidden
def admin_role_required(f):
@ -73,6 +78,140 @@ def can_create_domain(f):
def decorated_function(*args, **kwargs):
if g.user.role.name not in ['Administrator', 'Operator'] and not Setting().get('allow_user_create_domain'):
return redirect(url_for('error', code=401))
return f(*args, **kwargs)
return decorated_function
def api_basic_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if auth_header:
auth_header = auth_header.replace('Basic ', '', 1)
try:
auth_header = str(base64.b64decode(auth_header), 'utf-8')
username, password = auth_header.split(":")
except TypeError as e:
logging.error('Error: {0}'.format(e))
abort(401)
user = User(
username=username,
password=password,
plain_text_password=password
)
try:
auth_method = request.args.get('auth_method', 'LOCAL')
auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
auth = user.is_validate(
method=auth_method,
src_ip=request.remote_addr
)
if not auth:
logging.error('Checking user password failed')
abort(401)
else:
user = User.query.filter(User.username == username).first()
g.user = user
except Exception as e:
logging.error('Error: {0}'.format(e))
abort(401)
else:
logging.error('Error: Authorization header missing!')
abort(401)
return f(*args, **kwargs)
return decorated_function
def is_json(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if request.method in ['POST', 'PUT', 'PATCH']:
if not request.is_json:
raise RequestIsNotJSON()
return f(*args, **kwargs)
return decorated_function
def api_can_create_domain(f):
"""
Grant access if:
- user is in Operator role or higher, or
- allow_user_create_domain is on
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user.role.name not in ['Administrator', 'Operator'] and not Setting().get('allow_user_create_domain'):
msg = "User {0} does not have enough privileges to create domain"
logging.error(msg.format(g.user.username))
raise NotEnoughPrivileges()
return f(*args, **kwargs)
return decorated_function
def apikey_is_admin(f):
"""
Grant access if user is in Administrator role
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if g.apikey.role.name != 'Administrator':
msg = "Apikey {0} does not have enough privileges to create domain"
logging.error(msg.format(g.apikey.id))
raise NotEnoughPrivileges()
return f(*args, **kwargs)
return decorated_function
def apikey_can_access_domain(f):
@wraps(f)
def decorated_function(*args, **kwargs):
apikey = g.apikey
if g.apikey.role.name not in ['Administrator', 'Operator']:
domains = apikey.domains
zone_id = kwargs.get('zone_id')
domain_names = [item.name for item in domains]
if zone_id not in domain_names:
raise DomainAccessForbidden()
return f(*args, **kwargs)
return decorated_function
def apikey_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth_header = request.headers.get('X-API-KEY')
if auth_header:
try:
apikey_val = str(base64.b64decode(auth_header), 'utf-8')
except TypeError as e:
logging.error('Error: {0}'.format(e))
abort(401)
apikey = ApiKey(
key=apikey_val
)
apikey.plain_text_password = apikey_val
try:
auth_method = 'LOCAL'
auth = apikey.is_validate(
method=auth_method,
src_ip=request.remote_addr
)
g.apikey = auth
except Exception as e:
logging.error('Error: {0}'.format(e))
abort(401)
else:
logging.error('Error: API key header missing!')
abort(401)
return f(*args, **kwargs)
return decorated_function

73
app/errors.py Normal file
View File

@ -0,0 +1,73 @@
class StructuredException(Exception):
status_code = 0
def __init__(self, name=None, message="You want override this error!"):
Exception.__init__(self)
self.message = message
self.name = name
def to_dict(self):
rv = dict()
msg = ''
if self.name:
msg = '{0} {1}'.format(self.message, self.name)
else:
msg = self.message
rv['msg'] = msg
return rv
class DomainNotExists(StructuredException):
status_code = 1000
def __init__(self, name=None, message="Domain does not exist"):
StructuredException.__init__(self)
self.message = message
self.name = name
class DomainAccessForbidden(StructuredException):
status_code = 1001
def __init__(self, name=None, message="Domain access not allowed"):
StructuredException.__init__(self)
self.message = message
self.name = name
class ApiKeyCreateFail(StructuredException):
status_code = 1002
def __init__(self, name=None, message="Creation of api key failed"):
StructuredException.__init__(self)
self.message = message
self.name = name
class ApiKeyNotUsable(StructuredException):
status_code = 1003
def __init__(self, name=None, message="Api key must have domains or have \
administrative role"):
StructuredException.__init__(self)
self.message = message
self.name = name
class NotEnoughPrivileges(StructuredException):
status_code = 1004
def __init__(self, name=None, message="Not enough privileges"):
StructuredException.__init__(self)
self.message = message
self.name = name
class RequestIsNotJSON(StructuredException):
status_code = 1005
def __init__(self, name=None, message="Request is not json"):
StructuredException.__init__(self)
self.message = message
self.name = name

43
app/lib/helper.py Normal file
View File

@ -0,0 +1,43 @@
from app.models import Setting
import requests
from flask import request
import logging as logger
from urllib.parse import urljoin
logging = logger.getLogger(__name__)
def forward_request():
pdns_api_url = Setting().get('pdns_api_url')
pdns_api_key = Setting().get('pdns_api_key')
headers = {}
data = None
msg_str = "Sending request to powerdns API {0}"
if request.method != 'GET' and request.method != 'DELETE':
msg = msg_str.format(request.get_json(force=True))
logging.debug(msg)
data = request.get_json(force=True)
verify = False
headers = {
'user-agent': 'powerdnsadmin/0',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'accept': 'application/json; q=1',
'X-API-KEY': pdns_api_key
}
url = urljoin(pdns_api_url, request.path)
resp = requests.request(
request.method,
url,
headers=headers,
verify=verify,
json=data
)
return resp

View File

@ -43,4 +43,4 @@ class logger(object):
console_formatter = logging.Formatter('[%(levelname)s] %(message)s')
stderr_log_handler.setFormatter(console_formatter)
return logging.getLogger(self.name)
return logging.getLogger(self.name)

View File

@ -11,6 +11,10 @@ from datetime import datetime, timedelta
from threading import Thread
from .certutil import KEY_FILE, CERT_FILE
import logging as logger
logging = logger.getLogger(__name__)
if app.config['SAML_ENABLED']:
from onelogin.saml2.auth import OneLogin_Saml2_Auth
@ -95,10 +99,12 @@ def fetch_remote(remote_url, method='GET', data=None, accept=None, params=None,
params=params
)
try:
if r.status_code not in (200, 400, 422):
if r.status_code not in (200, 201, 204, 400, 422):
r.raise_for_status()
except Exception as e:
raise RuntimeError('Error while fetching {0}'.format(remote_url)) from e
msg = "Returned status {0} and content {1}"
logging.error(msg.format(r.status_code, r.content))
raise RuntimeError('Error while fetching {0}'.format(remote_url))
return r

File diff suppressed because it is too large Load Diff

27
app/schema.py Normal file
View File

@ -0,0 +1,27 @@
from lima import fields, Schema
class DomainSchema(Schema):
id = fields.Integer()
name = fields.String()
class RoleSchema(Schema):
id = fields.Integer()
name = fields.String()
class ApiKeySchema(Schema):
id = fields.Integer()
role = fields.Embed(schema=RoleSchema)
domains = fields.Embed(schema=DomainSchema, many=True)
description = fields.String()
key = fields.String()
class ApiPlainKeySchema(Schema):
id = fields.Integer()
role = fields.Embed(schema=RoleSchema)
domains = fields.Embed(schema=DomainSchema, many=True)
description = fields.String()
plain_key = fields.String()

1440
app/swagger-spec.yaml Normal file

File diff suppressed because it is too large Load Diff

32
app/validators.py Normal file
View File

@ -0,0 +1,32 @@
import os
from bravado_core.spec import Spec
from bravado_core.validate import validate_object
from yaml import load, Loader
def validate_zone(zone):
validate_object(spec, zone_spec, zone)
def validate_apikey(apikey):
validate_object(spec, apikey_spec, apikey)
def get_swagger_spec(spec_path):
with open(spec_path, 'r') as spec:
return load(spec.read(), Loader)
bravado_config = {
'validate_swagger_spec': False,
'validate_requests': False,
'validate_responses': False,
'use_models': True,
}
dir_path = os.path.dirname(os.path.abspath(__file__))
spec_path = os.path.join(dir_path, "swagger-spec.yaml")
spec_dict = get_swagger_spec(spec_path)
spec = Spec.from_dict(spec_dict, config=bravado_config)
zone_spec = spec_dict['definitions']['Zone']
apikey_spec = spec_dict['definitions']['ApiKey']

View File

@ -23,6 +23,7 @@ from app import app, login_manager, csrf
from app.lib import utils
from app.oauth import github_oauth, google_oauth, oidc_oauth
from app.decorators import admin_role_required, operator_role_required, can_access_domain, can_configure_dnssec, can_create_domain
from yaml import Loader, load
if app.config['SAML_ENABLED']:
from onelogin.saml2.utils import OneLogin_Saml2_Utils
@ -107,7 +108,9 @@ def login_via_authorization_header(request):
return None
user = User(username=username, password=password, plain_text_password=password)
try:
auth = user.is_validate(method='LOCAL', src_ip=request.remote_addr)
auth_method = request.args.get('auth_method', 'LOCAL')
auth_method = 'LDAP' if auth_method != 'LOCAL' else 'LOCAL'
auth = user.is_validate(method=auth_method, src_ip=request.remote_addr)
if auth == False:
return None
else:
@ -130,11 +133,13 @@ def http_bad_request(e):
def http_unauthorized(e):
return redirect(url_for('error', code=401))
@app.errorhandler(404)
def http_internal_server_error(e):
return redirect(url_for('error', code=404))
@app.errorhandler(405)
def _handle_api_error(ex):
if request.path.startswith('/api/'):
return json.dumps({"msg": "NotFound"}), 404
else:
return redirect(url_for('error', code=404))
@app.errorhandler(500)
def http_page_not_found(e):
@ -149,6 +154,22 @@ def error(code, msg=None):
else:
return render_template('errors/404.html'), 404
@app.route('/swagger', methods=['GET'])
def swagger_spec():
try:
dir_path = os.path.dirname(os.path.abspath(__file__))
spec_path = os.path.join(dir_path, "swagger-spec.yaml")
spec = open(spec_path,'r')
loaded_spec = load(spec.read(), Loader)
except Exception as e:
logging.error('Cannot view swagger spec. Error: {0}'.format(e))
logging.debug(traceback.format_exc())
return redirect(url_for('error', code=500))
resp = make_response(json.dumps(loaded_spec), 200)
resp.headers['Content-Type'] = 'application/json'
return resp
@app.route('/register', methods=['GET', 'POST'])
def register():
@ -1817,7 +1838,6 @@ def dyndns_update():
return render_template('dyndns.html', response=response), 200
@app.route('/', methods=['GET', 'POST'])
@login_required
def index():

View File

@ -5,7 +5,7 @@ basedir = os.path.abspath(os.path.dirname(__file__))
SECRET_KEY = 'changeme'
LOG_LEVEL = 'DEBUG'
LOG_FILE = os.path.join(basedir, 'logs/log.txt')
SALT = '$2b$12$yLUMTIfl21FKJQpTkRQXCu'
# TIMEOUT - for large zones
TIMEOUT = 10

104
configs/test.py Normal file
View File

@ -0,0 +1,104 @@
import os
basedir = os.path.abspath(os.path.dirname(__file__))
# BASIC APP CONFIG
SECRET_KEY = 'changeme'
LOG_LEVEL = 'DEBUG'
LOG_FILE = os.path.join(basedir, 'logs/log.txt')
SALT = '$2b$12$yLUMTIfl21FKJQpTkRQXCu'
# TIMEOUT - for large zones
TIMEOUT = 10
# UPLOAD DIR
UPLOAD_DIR = os.path.join(basedir, 'upload')
TEST_USER_PASSWORD = 'test'
TEST_USER = 'test'
TEST_ADMIN_USER = 'admin'
TEST_ADMIN_PASSWORD = 'admin'
TEST_USER_APIKEY = 'wewdsfewrfsfsdf'
TEST_ADMIN_APIKEY = 'nghnbnhtghrtert'
# DATABASE CONFIG FOR MYSQL
# DB_HOST = os.environ.get('PDA_DB_HOST')
# DB_PORT = os.environ.get('PDA_DB_PORT', 3306 )
# DB_NAME = os.environ.get('PDA_DB_NAME')
# DB_USER = os.environ.get('PDA_DB_USER')
# DB_PASSWORD = os.environ.get('PDA_DB_PASSWORD')
# #MySQL
# SQLALCHEMY_DATABASE_URI = 'mysql://'+DB_USER+':'+DB_PASSWORD+'@'+DB_HOST+':'+ str(DB_PORT) + '/'+DB_NAME
# SQLALCHEMY_MIGRATE_REPO = os.path.join(basedir, 'db_repository')
TEST_DB_LOCATION = '/tmp/testing.sqlite'
SQLALCHEMY_DATABASE_URI = 'sqlite:///{0}'.format(TEST_DB_LOCATION)
SQLALCHEMY_TRACK_MODIFICATIONS = False
# SAML Authentication
SAML_ENABLED = False
SAML_DEBUG = True
SAML_PATH = os.path.join(os.path.dirname(__file__), 'saml')
##Example for ADFS Metadata-URL
SAML_METADATA_URL = 'https://<hostname>/FederationMetadata/2007-06/FederationMetadata.xml'
#Cache Lifetime in Seconds
SAML_METADATA_CACHE_LIFETIME = 1
# SAML SSO binding format to use
## Default: library default (urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect)
#SAML_IDP_SSO_BINDING = 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST'
## EntityID of the IdP to use. Only needed if more than one IdP is
## in the SAML_METADATA_URL
### Default: First (only) IdP in the SAML_METADATA_URL
### Example: https://idp.example.edu/idp
#SAML_IDP_ENTITY_ID = 'https://idp.example.edu/idp'
## NameID format to request
### Default: The SAML NameID Format in the metadata if present,
### otherwise urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified
### Example: urn:oid:0.9.2342.19200300.100.1.1
#SAML_NAMEID_FORMAT = 'urn:oid:0.9.2342.19200300.100.1.1'
## Attribute to use for Email address
### Default: email
### Example: urn:oid:0.9.2342.19200300.100.1.3
#SAML_ATTRIBUTE_EMAIL = 'urn:oid:0.9.2342.19200300.100.1.3'
## Attribute to use for Given name
### Default: givenname
### Example: urn:oid:2.5.4.42
#SAML_ATTRIBUTE_GIVENNAME = 'urn:oid:2.5.4.42'
## Attribute to use for Surname
### Default: surname
### Example: urn:oid:2.5.4.4
#SAML_ATTRIBUTE_SURNAME = 'urn:oid:2.5.4.4'
## Attribute to use for username
### Default: Use NameID instead
### Example: urn:oid:0.9.2342.19200300.100.1.1
#SAML_ATTRIBUTE_USERNAME = 'urn:oid:0.9.2342.19200300.100.1.1'
## Attribute to get admin status from
### Default: Don't control admin with SAML attribute
### Example: https://example.edu/pdns-admin
### If set, look for the value 'true' to set a user as an administrator
### If not included in assertion, or set to something other than 'true',
### the user is set as a non-administrator user.
#SAML_ATTRIBUTE_ADMIN = 'https://example.edu/pdns-admin'
## Attribute to get account names from
### Default: Don't control accounts with SAML attribute
### If set, the user will be added and removed from accounts to match
### what's in the login assertion. Accounts that don't exist will
### be created and the user added to them.
SAML_ATTRIBUTE_ACCOUNT = 'https://example.edu/pdns-account'
SAML_SP_ENTITY_ID = 'http://<SAML SP Entity ID>'
SAML_SP_CONTACT_NAME = '<contact name>'
SAML_SP_CONTACT_MAIL = '<contact mail>'
#Configures if SAML tokens should be encrypted.
#If enabled a new app certificate will be generated on restart
SAML_SIGN_REQUEST = False
#Use SAML standard logout mechanism retrieved from idp metadata
#If configured false don't care about SAML session on logout.
#Logout from PowerDNS-Admin only and keep SAML session authenticated.
SAML_LOGOUT = False
#Configure to redirect to a different url then PowerDNS-Admin login after SAML logout
#for example redirect to google.com after successful saml logout
#SAML_LOGOUT_URL = 'https://google.com'

50
docker-compose-test.yml Normal file
View File

@ -0,0 +1,50 @@
version: "2.1"
services:
powerdns-admin:
build:
context: .
dockerfile: docker/PowerDNS-Admin/Dockerfile.test
args:
- ENVIRONMENT=test
image: powerdns-admin-test
env_file:
- ./env-test
container_name: powerdns-admin-test
mem_limit: 256M
memswap_limit: 256M
ports:
- "9191:9191"
volumes:
# Code
- .:/powerdns-admin/
- "./configs/test.py:/powerdns-admin/config.py"
- powerdns-admin-assets3:/powerdns-admin/logs
- ./app/static/custom:/powerdns-admin/app/static/custom
logging:
driver: json-file
options:
max-size: 50m
networks:
- default
depends_on:
- pdns-server
pdns-server:
build:
context: .
dockerfile: docker/PowerDNS-Admin/Dockerfile.pdns.test
image: pdns-server-test
ports:
- "5053:53"
- "5053:53/udp"
networks:
- default
env_file:
- ./env-test
networks:
default:
volumes:
powerdns-admin-assets3:

View File

@ -67,8 +67,8 @@ services:
image: psitrax/powerdns
hostname: ${PDNS_HOST}
ports:
- "53:53"
- "53:53/udp"
- "5053:53"
- "5053:53/udp"
networks:
- default
command: --api=yes --api-key=${PDNS_API_KEY} --webserver-address=0.0.0.0 --webserver-allow-from=0.0.0.0/0

View File

@ -2,7 +2,6 @@ FROM ubuntu:16.04
MAINTAINER Khanh Ngo "k@ndk.name"
ARG ENVIRONMENT=development
ENV ENVIRONMENT=${ENVIRONMENT}
WORKDIR /powerdns-admin
RUN apt-get update -y

View File

@ -0,0 +1,13 @@
FROM ubuntu:latest
RUN apt-get update && apt-get install -y pdns-backend-sqlite3 pdns-server sqlite3
COPY ./docker/PowerDNS-Admin/pdns.sqlite.sql /data/pdns.sql
ADD ./docker/PowerDNS-Admin/start.sh /data/
RUN rm -f /etc/powerdns/pdns.d/pdns.simplebind.conf
RUN rm -f /etc/powerdns/pdns.d/bind.conf
RUN chmod +x /data/start.sh && mkdir -p /var/empty/var/run
CMD /data/start.sh

View File

@ -0,0 +1,46 @@
FROM ubuntu:16.04
MAINTAINER Khanh Ngo "k@ndk.name"
ARG ENVIRONMENT=development
ENV ENVIRONMENT=${ENVIRONMENT}
WORKDIR /powerdns-admin
RUN apt-get update -y
RUN apt-get install -y apt-transport-https
RUN apt-get install -y locales locales-all
ENV LC_ALL en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US.UTF-8
RUN apt-get install -y python3-pip python3-dev supervisor curl mysql-client
RUN curl -sL https://deb.nodesource.com/setup_10.x | bash -
RUN apt-get install -y nodejs
RUN curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add -
RUN echo "deb https://dl.yarnpkg.com/debian/ stable main" > /etc/apt/sources.list.d/yarn.list
# Install yarn
RUN apt-get update -y
RUN apt-get install -y yarn
# Install Netcat for DB healthcheck
RUN apt-get install -y netcat
# lib for building mysql db driver
RUN apt-get install -y libmysqlclient-dev
# lib for building ldap and ssl-based application
RUN apt-get install -y libsasl2-dev libldap2-dev libssl-dev
# lib for building python3-saml
RUN apt-get install -y libxml2-dev libxslt1-dev libxmlsec1-dev libffi-dev pkg-config
COPY ./requirements.txt /powerdns-admin/requirements.txt
COPY ./docker/PowerDNS-Admin/wait-for-pdns.sh /opt
RUN chmod u+x /opt/wait-for-pdns.sh
RUN pip3 install -r requirements.txt
CMD ["/opt/wait-for-pdns.sh", "/usr/local/bin/pytest","--capture=no","-vv"]

View File

@ -0,0 +1,92 @@
PRAGMA foreign_keys = 1;
CREATE TABLE domains (
id INTEGER PRIMARY KEY,
name VARCHAR(255) NOT NULL COLLATE NOCASE,
master VARCHAR(128) DEFAULT NULL,
last_check INTEGER DEFAULT NULL,
type VARCHAR(6) NOT NULL,
notified_serial INTEGER DEFAULT NULL,
account VARCHAR(40) DEFAULT NULL
);
CREATE UNIQUE INDEX name_index ON domains(name);
CREATE TABLE records (
id INTEGER PRIMARY KEY,
domain_id INTEGER DEFAULT NULL,
name VARCHAR(255) DEFAULT NULL,
type VARCHAR(10) DEFAULT NULL,
content VARCHAR(65535) DEFAULT NULL,
ttl INTEGER DEFAULT NULL,
prio INTEGER DEFAULT NULL,
change_date INTEGER DEFAULT NULL,
disabled BOOLEAN DEFAULT 0,
ordername VARCHAR(255),
auth BOOL DEFAULT 1,
FOREIGN KEY(domain_id) REFERENCES domains(id) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX rec_name_index ON records(name);
CREATE INDEX nametype_index ON records(name,type);
CREATE INDEX domain_id ON records(domain_id);
CREATE INDEX orderindex ON records(ordername);
CREATE TABLE supermasters (
ip VARCHAR(64) NOT NULL,
nameserver VARCHAR(255) NOT NULL COLLATE NOCASE,
account VARCHAR(40) NOT NULL
);
CREATE UNIQUE INDEX ip_nameserver_pk ON supermasters(ip, nameserver);
CREATE TABLE comments (
id INTEGER PRIMARY KEY,
domain_id INTEGER NOT NULL,
name VARCHAR(255) NOT NULL,
type VARCHAR(10) NOT NULL,
modified_at INT NOT NULL,
account VARCHAR(40) DEFAULT NULL,
comment VARCHAR(65535) NOT NULL,
FOREIGN KEY(domain_id) REFERENCES domains(id) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX comments_domain_id_index ON comments (domain_id);
CREATE INDEX comments_nametype_index ON comments (name, type);
CREATE INDEX comments_order_idx ON comments (domain_id, modified_at);
CREATE TABLE domainmetadata (
id INTEGER PRIMARY KEY,
domain_id INT NOT NULL,
kind VARCHAR(32) COLLATE NOCASE,
content TEXT,
FOREIGN KEY(domain_id) REFERENCES domains(id) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX domainmetaidindex ON domainmetadata(domain_id);
CREATE TABLE cryptokeys (
id INTEGER PRIMARY KEY,
domain_id INT NOT NULL,
flags INT NOT NULL,
active BOOL,
content TEXT,
FOREIGN KEY(domain_id) REFERENCES domains(id) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX domainidindex ON cryptokeys(domain_id);
CREATE TABLE tsigkeys (
id INTEGER PRIMARY KEY,
name VARCHAR(255) COLLATE NOCASE,
algorithm VARCHAR(50) COLLATE NOCASE,
secret VARCHAR(255)
);
CREATE UNIQUE INDEX namealgoindex ON tsigkeys(name, algorithm);

View File

@ -0,0 +1,24 @@
#!/usr/bin/env sh
if [ -z ${PDNS_API_KEY+x} ]; then
API_KEY=changeme
fi
if [ -z ${PDNS_PORT+x} ]; then
WEB_PORT=8081
fi
# Import schema structure
if [ -e "/data/pdns.sql" ]; then
rm /data/pdns.db
cat /data/pdns.sql | sqlite3 /data/pdns.db
rm /data/pdns.sql
echo "Imported schema structure"
fi
chown -R pdns:pdns /data/
/usr/sbin/pdns_server \
--launch=gsqlite3 --gsqlite3-database=/data/pdns.db \
--webserver=yes --webserver-address=0.0.0.0 --webserver-port=${PDNS_PORT} \
--api=yes --api-key=$PDNS_API_KEY --webserver-allow-from=${PDNS_WEBSERVER_ALLOW_FROM}

View File

@ -0,0 +1,22 @@
#!/bin/sh
set -e
CMD="$1"
shift
CMD_ARGS="$@"
LOOPS=10
until curl -H "X-API-Key: ${PDNS_API_KEY}" "${PDNS_PROTO}://${PDNS_HOST}:${PDNS_PORT}/api/v1/servers"; do
>&2 echo "PDNS is unavailable - sleeping"
sleep 1
if [ $LOOPS -eq 10 ]
then
break
fi
done
sleep 5
>&2 echo "PDNS is up - executing command"
exec $CMD $CMD_ARGS

10
env-test Normal file
View File

@ -0,0 +1,10 @@
PDNS_DB_HOST=pdns-mysql
PDNS_DB_NAME=pdns
PDNS_DB_USER=pdns
PDNS_DB_PASSWORD=changeme
PDNS_PROTO=http
PDNS_PORT=8081
PDNS_HOST=pdns-server
PDNS_API_KEY=changeme
PDNS_WEBSERVER_ALLOW_FROM=0.0.0.0/0

View File

@ -0,0 +1,43 @@
"""Upgrade BD Schema
Revision ID: 654298797277
Revises: 31a4ed468b18
Create Date: 2018-12-23 22:18:01.904885
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '654298797277'
down_revision = '31a4ed468b18'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('apikey',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('key', sa.String(length=255), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('role_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['role_id'], ['role.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('key')
)
op.create_table('domain_apikey',
sa.Column('domain_id', sa.Integer(), nullable=True),
sa.Column('apikey_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['apikey_id'], ['apikey.id'], ),
sa.ForeignKeyConstraint(['domain_id'], ['domain.id'], )
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('domain_apikey')
op.drop_table('apikey')
# ### end Alembic commands ###

View File

@ -21,3 +21,7 @@ jsmin==2.2.2
Authlib==0.10
Flask-Seasurf
pytimeparse
lima
pytest
bravado-core
PyYAML

View File

@ -1,3 +0,0 @@
#!/usr/bin/env bash
python run.py&
nosetests --with-coverage

0
tests/__init__.py Normal file
View File

296
tests/fixtures.py Normal file
View File

@ -0,0 +1,296 @@
import pytest
import sys
import flask_migrate
import os
from base64 import b64encode
from unittest import mock
sys.path.append(os.getcwd())
from app.models import Role, User, Setting, ApiKey, Domain
from app import app, db
from app.blueprints.api import api_blueprint
from app.lib.log import logging
@pytest.fixture
def client():
app.config['TESTING'] = True
client = app.test_client()
yield client
def load_data(setting_name, *args, **kwargs):
if setting_name == 'maintenance':
return 0
if setting_name == 'pdns_api_url':
return 'http://empty'
if setting_name == 'pdns_api_key':
return 'XXXX'
if setting_name == 'pdns_version':
return '4.1.0'
if setting_name == 'google_oauth_enabled':
return False
if setting_name == 'session_timeout':
return 10
if setting_name == 'allow_user_create_domain':
return True
@pytest.fixture
def basic_auth_admin_headers():
test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
user_pass = "{0}:{1}".format(test_admin_user, test_admin_pass)
user_pass_base64 = b64encode(user_pass.encode('utf-8'))
headers = {
"Authorization": "Basic {0}".format(user_pass_base64.decode('utf-8'))
}
return headers
@pytest.fixture
def basic_auth_user_headers():
test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD')
user_pass = "{0}:{1}".format(test_user, test_user_pass)
user_pass_base64 = b64encode(user_pass.encode('utf-8'))
headers = {
"Authorization": "Basic {0}".format(user_pass_base64.decode('utf-8'))
}
return headers
@pytest.fixture(scope="module")
def initial_data():
pdns_proto = os.environ['PDNS_PROTO']
pdns_host = os.environ['PDNS_HOST']
pdns_port = os.environ['PDNS_PORT']
pdns_api_url = '{0}://{1}:{2}'.format(pdns_proto, pdns_host, pdns_port)
api_url_setting = Setting('pdns_api_url', pdns_api_url)
api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY'])
allow_create_domain_setting = Setting('allow_user_create_domain', True)
try:
with app.app_context():
flask_migrate.upgrade()
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
test_user_pass = app.config.get('TEST_USER_PASSWORD')
test_user = app.config.get('TEST_USER')
test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
admin_user = User(
username=test_admin_user,
plain_text_password=test_admin_pass,
email="admin@admin.com"
)
msg = admin_user.create_local_user()
if not msg:
raise Exception("Error occured creating user {0}".format(msg))
ordinary_user = User(
username=test_user,
plain_text_password=test_user_pass,
email="test@test.com"
)
msg = ordinary_user.create_local_user()
if not msg:
raise Exception("Error occured creating user {0}".format(msg))
except Exception as e:
logging.error("Unexpected ERROR: {0}".format(e))
raise e
yield
db.session.close()
os.unlink(app.config['TEST_DB_LOCATION'])
@pytest.fixture(scope="module")
def initial_apikey_data():
pdns_proto = os.environ['PDNS_PROTO']
pdns_host = os.environ['PDNS_HOST']
pdns_port = os.environ['PDNS_PORT']
pdns_api_url = '{0}://{1}:{2}'.format(pdns_proto, pdns_host, pdns_port)
api_url_setting = Setting('pdns_api_url', pdns_api_url)
api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY'])
allow_create_domain_setting = Setting('allow_user_create_domain', True)
try:
with app.app_context():
flask_migrate.upgrade()
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
test_user_apikey = app.config.get('TEST_USER_APIKEY')
test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY')
dummy_apikey = ApiKey(
desc="dummy",
role_name="Administrator"
)
admin_key = dummy_apikey.get_hashed_password(
plain_text_password=test_admin_apikey
).decode('utf-8')
admin_apikey = ApiKey(
key=admin_key,
desc="test admin apikey",
role_name="Administrator"
)
admin_apikey.create()
user_key = dummy_apikey.get_hashed_password(
plain_text_password=test_user_apikey
).decode('utf-8')
user_apikey = ApiKey(
key=user_key,
desc="test user apikey",
role_name="User"
)
user_apikey.create()
except Exception as e:
logging.error("Unexpected ERROR: {0}".format(e))
raise e
yield
db.session.close()
os.unlink(app.config['TEST_DB_LOCATION'])
@pytest.fixture
def zone_data():
data = {
"name": "example.org.",
"kind": "NATIVE",
"nameservers": ["ns1.example.org."]
}
return data
@pytest.fixture
def created_zone_data():
data = {
'url': '/api/v1/servers/localhost/zones/example.org.',
'soa_edit_api': 'DEFAULT',
'last_check': 0,
'masters': [],
'dnssec': False,
'notified_serial': 0,
'nsec3narrow': False,
'serial': 2019013101,
'nsec3param': '',
'soa_edit': '',
'api_rectify': False,
'kind': 'Native',
'rrsets': [
{
'comments': [],
'type': 'SOA',
'name': 'example.org.',
'ttl': 3600,
'records': [
{
'content': 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600',
'disabled': False
}
]
},
{
'comments': [],
'type': 'NS',
'name': 'example.org.',
'ttl': 3600,
'records': [
{
'content': 'ns1.example.org.',
'disabled': False
}
]
}
],
'name': 'example.org.',
'account': '',
'id': 'example.org.'
}
return data
def user_apikey_data():
data = {
"description": "userkey",
"domains": [
"example.org"
],
"role": "User"
}
return data
def admin_apikey_data():
data = {
"description": "masterkey",
"domains": [],
"role": "Administrator"
}
return data
@pytest.fixture(scope='module')
def user_apikey_integration():
test_user_apikey = app.config.get('TEST_USER_APIKEY')
headers = create_apikey_headers(test_user_apikey)
return headers
@pytest.fixture(scope='module')
def admin_apikey_integration():
test_user_apikey = app.config.get('TEST_ADMIN_APIKEY')
headers = create_apikey_headers(test_user_apikey)
return headers
@pytest.fixture(scope='module')
def user_apikey():
data = user_apikey_data()
api_key = ApiKey(
desc=data['description'],
role_name=data['role'],
domains=[]
)
headers = create_apikey_headers(api_key.plain_key)
return headers
@pytest.fixture(scope='module')
def admin_apikey():
data = admin_apikey_data()
api_key = ApiKey(
desc=data['description'],
role_name=data['role'],
domains=[]
)
headers = create_apikey_headers(api_key.plain_key)
return headers
def create_apikey_headers(passw):
user_pass_base64 = b64encode(passw.encode('utf-8'))
headers = {
"X-API-KEY": "{0}".format(user_pass_base64.decode('utf-8'))
}
return headers

View File

View File

View File

View File

@ -0,0 +1,210 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_apikey
from app.models import Setting
from app.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import user_apikey_data, admin_apikey_data, zone_data
class TestIntegrationApiApiKeyAdminUser(object):
def test_empty_get(self, client, initial_data, basic_auth_admin_headers):
res = client.get(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
@pytest.mark.parametrize(
"apikey_data",
[
user_apikey_data(),
admin_apikey_data()
]
)
def test_create_apikey(
self,
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
assert res.status_code == 201
res = client.post(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers,
data=json.dumps(apikey_data),
content_type="application/json"
)
data = res.get_json(force=True)
validate_apikey(data)
assert res.status_code == 201
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id'])
res = client.delete(
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
@pytest.mark.parametrize(
"apikey_data",
[
user_apikey_data(),
admin_apikey_data()
]
)
def test_get_multiple_apikey(
self,
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
assert res.status_code == 201
res = client.post(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers,
data=json.dumps(apikey_data),
content_type="application/json"
)
data = res.get_json(force=True)
validate_apikey(data)
assert res.status_code == 201
res = client.get(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
fake_role = namedtuple(
"Role",
data[0]['role'].keys()
)(*data[0]['role'].values())
data[0]['domains'] = []
data[0]['role'] = fake_role
fake_apikey = namedtuple("ApiKey", data[0].keys())(*data[0].values())
apikey_schema = ApiKeySchema(many=True)
json.dumps(apikey_schema.dump([fake_apikey]))
assert res.status_code == 200
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(fake_apikey.id)
res = client.delete(
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
@pytest.mark.parametrize(
"apikey_data",
[
user_apikey_data(),
admin_apikey_data()
]
)
def test_delete_apikey(
self,
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
assert res.status_code == 201
res = client.post(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers,
data=json.dumps(apikey_data),
content_type="application/json"
)
data = res.get_json(force=True)
validate_apikey(data)
assert res.status_code == 201
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id'])
res = client.delete(
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204

View File

@ -0,0 +1,120 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object):
def test_empty_get(self, initial_data, client, basic_auth_user_headers):
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204
def test_delete_zone(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204

View File

View File

@ -0,0 +1,121 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneAdminUser(object):
def test_empty_get(self, client, initial_data, basic_auth_admin_headers):
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
client,
initial_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
client,
initial_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
def test_delete_zone(
self,
client,
initial_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204

View File

@ -0,0 +1,122 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import admin_apikey_integration
class TestIntegrationApiZoneAdminApiKey(object):
def test_empty_get(self, client, initial_apikey_data, admin_apikey_integration):
res = client.get(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
client,
initial_apikey_data,
zone_data,
admin_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
client,
initial_apikey_data,
zone_data,
admin_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204
def test_delete_zone(
self,
client,
initial_apikey_data,
zone_data,
admin_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204

View File

@ -0,0 +1,125 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import user_apikey_integration
class TestIntegrationApiZoneUserApiKey(object):
def test_empty_get(
self,
initial_apikey_data,
client,
user_apikey_integration
):
res = client.get(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
initial_apikey_data,
client,
zone_data,
user_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=user_apikey_integration
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
initial_apikey_data,
client,
zone_data,
user_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=user_apikey_integration
)
assert res.status_code == 204
def test_delete_zone(
self,
initial_apikey_data,
client,
zone_data,
user_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=user_apikey_integration
)
assert res.status_code == 204

View File

@ -0,0 +1,120 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object):
def test_empty_get(self, initial_data, client, basic_auth_user_headers):
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204
def test_delete_zone(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204

0
tests/unit/__init__.py Normal file
View File

View File

View File

View File

View File

@ -0,0 +1,164 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting, Domain, ApiKey, Role
from app.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, created_zone_data
from tests.fixtures import user_apikey, admin_apikey, zone_data
from tests.fixtures import admin_apikey_data, load_data
class TestUnitApiZoneAdminApiKey(object):
@pytest.fixture
def common_data_mock(self):
self.oauth_setting_patcher = patch(
'app.oauth.Setting',
spec=app.models.Setting
)
self.views_setting_patcher = patch(
'app.views.Setting',
spec=app.models.Setting
)
self.helpers_setting_patcher = patch(
'app.lib.helper.Setting',
spec=app.models.Setting
)
self.models_setting_patcher = patch(
'app.models.Setting',
spec=app.models.Setting
)
self.mock_apikey_patcher = patch(
'app.decorators.ApiKey',
spec=app.models.ApiKey
)
self.mock_hist_patcher = patch(
'app.blueprints.api.History',
spec=app.models.History
)
data = admin_apikey_data()
api_key = ApiKey(
desc=data['description'],
role_name=data['role'],
domains=[]
)
api_key.role = Role(name=data['role'])
self.mock_oauth_setting = self.oauth_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_apikey.return_value.is_validate.return_value = api_key
def test_empty_get(
self,
client,
common_data_mock,
admin_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain, \
patch('app.lib.utils.requests.get') as mock_get:
mock_domain.return_value.domains.return_value = []
mock_domain.query.all.return_value = []
mock_get.return_value.json.return_value = []
mock_get.return_value.status_code = 200
res = client.get(
"/api/v1/servers/localhost/zones",
headers=admin_apikey
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
client,
common_data_mock,
zone_data,
admin_apikey,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
def test_get_multiple_zones(
self,
client,
common_data_mock,
zone_data,
admin_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain]
res = client.get(
"/api/v1/servers/localhost/zones",
headers=admin_apikey
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
admin_apikey
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
mock_delete.return_value.status_code = 204
mock_delete.return_value.content = ''
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=admin_apikey
)
assert res.status_code == 204

View File

@ -0,0 +1,167 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import User, Domain, Role
from app.schema import DomainSchema
from tests.fixtures import client, basic_auth_admin_headers
from tests.fixtures import zone_data, created_zone_data, load_data
class TestUnitApiZoneAdminUser(object):
@pytest.fixture
def common_data_mock(self):
self.oauth_setting_patcher = patch(
'app.oauth.Setting',
spec=app.models.Setting
)
self.api_setting_patcher = patch(
'app.blueprints.api.Setting',
spec=app.models.Setting
)
self.views_setting_patcher = patch(
'app.views.Setting',
spec=app.models.Setting
)
self.helpers_setting_patcher = patch(
'app.lib.helper.Setting',
spec=app.models.Setting
)
self.models_setting_patcher = patch(
'app.models.Setting',
spec=app.models.Setting
)
self.decorators_setting_patcher = patch(
'app.decorators.Setting'
)
self.mock_user_patcher = patch(
'app.decorators.User'
)
self.mock_hist_patcher = patch(
'app.blueprints.api.History',
spec=app.models.History
)
self.mock_oauth_setting = self.oauth_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.decorators_setting = self.decorators_setting_patcher.start()
self.api_setting = self.api_setting_patcher.start()
self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data
mockk = MagicMock()
mockk.role.name = "Administrator"
self.mock_user.query.filter.return_value.first.return_value = mockk
self.mock_user.return_value.is_validate.return_value = True
def test_empty_get(
self,
client,
common_data_mock,
basic_auth_admin_headers
):
with patch('app.blueprints.api.Domain') as mock_domain, \
patch('app.lib.utils.requests.get') as mock_get:
mock_domain.return_value.domains.return_value = []
mock_domain.query.all.return_value = []
mock_get.return_value.json.return_value = []
mock_get.return_value.status_code = 200
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_admin_headers,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
def test_get_multiple_zones(
self,
client,
common_data_mock,
zone_data,
basic_auth_admin_headers
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain]
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_admin_headers
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
mock_domain.query.filter.return_value = True
mock_delete.return_value.status_code = 204
mock_delete.return_value.content = ''
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204

View File

@ -0,0 +1,146 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import User, Domain, Role
from app.schema import DomainSchema
from tests.fixtures import client, basic_auth_user_headers
from tests.fixtures import zone_data, created_zone_data, load_data
class TestUnitApiZoneUser(object):
@pytest.fixture
def common_data_mock(self):
self.oauth_setting_patcher = patch(
'app.oauth.Setting',
spec=app.models.Setting
)
self.api_setting_patcher = patch(
'app.blueprints.api.Setting',
spec=app.models.Setting
)
self.views_setting_patcher = patch(
'app.views.Setting',
spec=app.models.Setting
)
self.helpers_setting_patcher = patch(
'app.lib.helper.Setting',
spec=app.models.Setting
)
self.models_setting_patcher = patch(
'app.models.Setting',
spec=app.models.Setting
)
self.decorators_setting_patcher = patch(
'app.decorators.Setting'
)
self.mock_user_patcher = patch(
'app.decorators.User'
)
self.mock_hist_patcher = patch(
'app.blueprints.api.History',
spec=app.models.History
)
self.mock_oauth_setting = self.oauth_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.decorators_setting = self.decorators_setting_patcher.start()
self.api_setting = self.api_setting_patcher.start()
self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data
self.mockk = MagicMock()
self.mockk.role.name = "User"
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
def test_create_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_user_headers,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
def test_get_multiple_zones(
self,
client,
common_data_mock,
zone_data,
basic_auth_user_headers
):
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
self.mockk.get_domains.return_value = [test_domain]
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_user_headers
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
self.mockk.get_domains.return_value = [test_domain]
mock_delete.return_value.status_code = 204
mock_delete.return_value.content = ''
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204

View File

@ -0,0 +1,145 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting, Domain, ApiKey, Role
from app.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, created_zone_data
from tests.fixtures import user_apikey, zone_data
from tests.fixtures import user_apikey_data, load_data
class TestUnitApiZoneUserApiKey(object):
@pytest.fixture
def common_data_mock(self):
self.oauth_setting_patcher = patch(
'app.oauth.Setting',
spec=app.models.Setting
)
self.views_setting_patcher = patch(
'app.views.Setting',
spec=app.models.Setting
)
self.helpers_setting_patcher = patch(
'app.lib.helper.Setting',
spec=app.models.Setting
)
self.models_setting_patcher = patch(
'app.models.Setting',
spec=app.models.Setting
)
self.mock_apikey_patcher = patch(
'app.decorators.ApiKey',
spec=app.models.ApiKey
)
self.mock_hist_patcher = patch(
'app.blueprints.api.History',
spec=app.models.History
)
self.mock_oauth_setting = self.oauth_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
data = user_apikey_data()
domain = Domain(name=data['domains'][0])
api_key = ApiKey(
desc=data['description'],
role_name=data['role'],
domains=[domain]
)
api_key.role = Role(name=data['role'])
self.mock_apikey.return_value.is_validate.return_value = api_key
def test_create_zone(
self,
client,
common_data_mock,
zone_data,
user_apikey,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True
res = client.post(
"/api/v1/servers/localhost/zones",
headers=user_apikey,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
def test_get_multiple_zones(
self,
client,
common_data_mock,
zone_data,
user_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain]
res = client.get(
"/api/v1/servers/localhost/zones",
headers=user_apikey
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
user_apikey
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
mock_delete.return_value.status_code = 204
mock_delete.return_value.content = ''
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=user_apikey
)
assert res.status_code == 204