Add Api to PowerDNS-Admin

This commit is contained in:
Pavol Ipoth
2019-03-01 23:49:31 +01:00
parent 343190b684
commit 1feb77e2f3
49 changed files with 5001 additions and 226 deletions

0
tests/__init__.py Normal file
View File

296
tests/fixtures.py Normal file
View File

@ -0,0 +1,296 @@
import pytest
import sys
import flask_migrate
import os
from base64 import b64encode
from unittest import mock
sys.path.append(os.getcwd())
from app.models import Role, User, Setting, ApiKey, Domain
from app import app, db
from app.blueprints.api import api_blueprint
from app.lib.log import logging
@pytest.fixture
def client():
app.config['TESTING'] = True
client = app.test_client()
yield client
def load_data(setting_name, *args, **kwargs):
if setting_name == 'maintenance':
return 0
if setting_name == 'pdns_api_url':
return 'http://empty'
if setting_name == 'pdns_api_key':
return 'XXXX'
if setting_name == 'pdns_version':
return '4.1.0'
if setting_name == 'google_oauth_enabled':
return False
if setting_name == 'session_timeout':
return 10
if setting_name == 'allow_user_create_domain':
return True
@pytest.fixture
def basic_auth_admin_headers():
test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
user_pass = "{0}:{1}".format(test_admin_user, test_admin_pass)
user_pass_base64 = b64encode(user_pass.encode('utf-8'))
headers = {
"Authorization": "Basic {0}".format(user_pass_base64.decode('utf-8'))
}
return headers
@pytest.fixture
def basic_auth_user_headers():
test_user = app.config.get('TEST_USER')
test_user_pass = app.config.get('TEST_USER_PASSWORD')
user_pass = "{0}:{1}".format(test_user, test_user_pass)
user_pass_base64 = b64encode(user_pass.encode('utf-8'))
headers = {
"Authorization": "Basic {0}".format(user_pass_base64.decode('utf-8'))
}
return headers
@pytest.fixture(scope="module")
def initial_data():
pdns_proto = os.environ['PDNS_PROTO']
pdns_host = os.environ['PDNS_HOST']
pdns_port = os.environ['PDNS_PORT']
pdns_api_url = '{0}://{1}:{2}'.format(pdns_proto, pdns_host, pdns_port)
api_url_setting = Setting('pdns_api_url', pdns_api_url)
api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY'])
allow_create_domain_setting = Setting('allow_user_create_domain', True)
try:
with app.app_context():
flask_migrate.upgrade()
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
test_user_pass = app.config.get('TEST_USER_PASSWORD')
test_user = app.config.get('TEST_USER')
test_admin_user = app.config.get('TEST_ADMIN_USER')
test_admin_pass = app.config.get('TEST_ADMIN_PASSWORD')
admin_user = User(
username=test_admin_user,
plain_text_password=test_admin_pass,
email="admin@admin.com"
)
msg = admin_user.create_local_user()
if not msg:
raise Exception("Error occured creating user {0}".format(msg))
ordinary_user = User(
username=test_user,
plain_text_password=test_user_pass,
email="test@test.com"
)
msg = ordinary_user.create_local_user()
if not msg:
raise Exception("Error occured creating user {0}".format(msg))
except Exception as e:
logging.error("Unexpected ERROR: {0}".format(e))
raise e
yield
db.session.close()
os.unlink(app.config['TEST_DB_LOCATION'])
@pytest.fixture(scope="module")
def initial_apikey_data():
pdns_proto = os.environ['PDNS_PROTO']
pdns_host = os.environ['PDNS_HOST']
pdns_port = os.environ['PDNS_PORT']
pdns_api_url = '{0}://{1}:{2}'.format(pdns_proto, pdns_host, pdns_port)
api_url_setting = Setting('pdns_api_url', pdns_api_url)
api_key_setting = Setting('pdns_api_key', os.environ['PDNS_API_KEY'])
allow_create_domain_setting = Setting('allow_user_create_domain', True)
try:
with app.app_context():
flask_migrate.upgrade()
db.session.add(api_url_setting)
db.session.add(api_key_setting)
db.session.add(allow_create_domain_setting)
test_user_apikey = app.config.get('TEST_USER_APIKEY')
test_admin_apikey = app.config.get('TEST_ADMIN_APIKEY')
dummy_apikey = ApiKey(
desc="dummy",
role_name="Administrator"
)
admin_key = dummy_apikey.get_hashed_password(
plain_text_password=test_admin_apikey
).decode('utf-8')
admin_apikey = ApiKey(
key=admin_key,
desc="test admin apikey",
role_name="Administrator"
)
admin_apikey.create()
user_key = dummy_apikey.get_hashed_password(
plain_text_password=test_user_apikey
).decode('utf-8')
user_apikey = ApiKey(
key=user_key,
desc="test user apikey",
role_name="User"
)
user_apikey.create()
except Exception as e:
logging.error("Unexpected ERROR: {0}".format(e))
raise e
yield
db.session.close()
os.unlink(app.config['TEST_DB_LOCATION'])
@pytest.fixture
def zone_data():
data = {
"name": "example.org.",
"kind": "NATIVE",
"nameservers": ["ns1.example.org."]
}
return data
@pytest.fixture
def created_zone_data():
data = {
'url': '/api/v1/servers/localhost/zones/example.org.',
'soa_edit_api': 'DEFAULT',
'last_check': 0,
'masters': [],
'dnssec': False,
'notified_serial': 0,
'nsec3narrow': False,
'serial': 2019013101,
'nsec3param': '',
'soa_edit': '',
'api_rectify': False,
'kind': 'Native',
'rrsets': [
{
'comments': [],
'type': 'SOA',
'name': 'example.org.',
'ttl': 3600,
'records': [
{
'content': 'a.misconfigured.powerdns.server. hostmaster.example.org. 2019013101 10800 3600 604800 3600',
'disabled': False
}
]
},
{
'comments': [],
'type': 'NS',
'name': 'example.org.',
'ttl': 3600,
'records': [
{
'content': 'ns1.example.org.',
'disabled': False
}
]
}
],
'name': 'example.org.',
'account': '',
'id': 'example.org.'
}
return data
def user_apikey_data():
data = {
"description": "userkey",
"domains": [
"example.org"
],
"role": "User"
}
return data
def admin_apikey_data():
data = {
"description": "masterkey",
"domains": [],
"role": "Administrator"
}
return data
@pytest.fixture(scope='module')
def user_apikey_integration():
test_user_apikey = app.config.get('TEST_USER_APIKEY')
headers = create_apikey_headers(test_user_apikey)
return headers
@pytest.fixture(scope='module')
def admin_apikey_integration():
test_user_apikey = app.config.get('TEST_ADMIN_APIKEY')
headers = create_apikey_headers(test_user_apikey)
return headers
@pytest.fixture(scope='module')
def user_apikey():
data = user_apikey_data()
api_key = ApiKey(
desc=data['description'],
role_name=data['role'],
domains=[]
)
headers = create_apikey_headers(api_key.plain_key)
return headers
@pytest.fixture(scope='module')
def admin_apikey():
data = admin_apikey_data()
api_key = ApiKey(
desc=data['description'],
role_name=data['role'],
domains=[]
)
headers = create_apikey_headers(api_key.plain_key)
return headers
def create_apikey_headers(passw):
user_pass_base64 = b64encode(passw.encode('utf-8'))
headers = {
"X-API-KEY": "{0}".format(user_pass_base64.decode('utf-8'))
}
return headers

View File

View File

View File

View File

@ -0,0 +1,210 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_apikey
from app.models import Setting
from app.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import user_apikey_data, admin_apikey_data, zone_data
class TestIntegrationApiApiKeyAdminUser(object):
def test_empty_get(self, client, initial_data, basic_auth_admin_headers):
res = client.get(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
@pytest.mark.parametrize(
"apikey_data",
[
user_apikey_data(),
admin_apikey_data()
]
)
def test_create_apikey(
self,
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
assert res.status_code == 201
res = client.post(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers,
data=json.dumps(apikey_data),
content_type="application/json"
)
data = res.get_json(force=True)
validate_apikey(data)
assert res.status_code == 201
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id'])
res = client.delete(
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
@pytest.mark.parametrize(
"apikey_data",
[
user_apikey_data(),
admin_apikey_data()
]
)
def test_get_multiple_apikey(
self,
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
assert res.status_code == 201
res = client.post(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers,
data=json.dumps(apikey_data),
content_type="application/json"
)
data = res.get_json(force=True)
validate_apikey(data)
assert res.status_code == 201
res = client.get(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
fake_role = namedtuple(
"Role",
data[0]['role'].keys()
)(*data[0]['role'].values())
data[0]['domains'] = []
data[0]['role'] = fake_role
fake_apikey = namedtuple("ApiKey", data[0].keys())(*data[0].values())
apikey_schema = ApiKeySchema(many=True)
json.dumps(apikey_schema.dump([fake_apikey]))
assert res.status_code == 200
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(fake_apikey.id)
res = client.delete(
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
@pytest.mark.parametrize(
"apikey_data",
[
user_apikey_data(),
admin_apikey_data()
]
)
def test_delete_apikey(
self,
client,
initial_data,
apikey_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
assert res.status_code == 201
res = client.post(
"/api/v1/pdnsadmin/apikeys",
headers=basic_auth_admin_headers,
data=json.dumps(apikey_data),
content_type="application/json"
)
data = res.get_json(force=True)
validate_apikey(data)
assert res.status_code == 201
apikey_url_format = "/api/v1/pdnsadmin/apikeys/{0}"
apikey_url = apikey_url_format.format(data[0]['id'])
res = client.delete(
apikey_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204

View File

@ -0,0 +1,120 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object):
def test_empty_get(self, initial_data, client, basic_auth_user_headers):
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204
def test_delete_zone(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204

View File

View File

@ -0,0 +1,121 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_admin_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneAdminUser(object):
def test_empty_get(self, client, initial_data, basic_auth_admin_headers):
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
client,
initial_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
client,
initial_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204
def test_delete_zone(
self,
client,
initial_data,
zone_data,
basic_auth_admin_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204

View File

@ -0,0 +1,122 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import admin_apikey_integration
class TestIntegrationApiZoneAdminApiKey(object):
def test_empty_get(self, client, initial_apikey_data, admin_apikey_integration):
res = client.get(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
client,
initial_apikey_data,
zone_data,
admin_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
client,
initial_apikey_data,
zone_data,
admin_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204
def test_delete_zone(
self,
client,
initial_apikey_data,
zone_data,
admin_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=admin_apikey_integration
)
assert res.status_code == 204

View File

@ -0,0 +1,125 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client
from tests.fixtures import zone_data, initial_apikey_data
from tests.fixtures import user_apikey_integration
class TestIntegrationApiZoneUserApiKey(object):
def test_empty_get(
self,
initial_apikey_data,
client,
user_apikey_integration
):
res = client.get(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
initial_apikey_data,
client,
zone_data,
user_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=user_apikey_integration
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
initial_apikey_data,
client,
zone_data,
user_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=user_apikey_integration
)
assert res.status_code == 204
def test_delete_zone(
self,
initial_apikey_data,
client,
zone_data,
user_apikey_integration
):
res = client.post(
"/api/v1/servers/localhost/zones",
headers=user_apikey_integration,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=user_apikey_integration
)
assert res.status_code == 204

View File

@ -0,0 +1,120 @@
import os
import pytest
import sys
import json
from base64 import b64encode
from collections import namedtuple
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting
from app.schema import DomainSchema
from tests.fixtures import client, initial_data, basic_auth_user_headers
from tests.fixtures import zone_data
class TestIntegrationApiZoneUser(object):
def test_empty_get(self, initial_data, client, basic_auth_user_headers):
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204
def test_get_multiple_zones(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple("Domain", data[0].keys())(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204
def test_delete_zone(
self,
initial_data,
client,
zone_data,
basic_auth_user_headers
):
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204

0
tests/unit/__init__.py Normal file
View File

View File

View File

View File

View File

@ -0,0 +1,164 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting, Domain, ApiKey, Role
from app.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, created_zone_data
from tests.fixtures import user_apikey, admin_apikey, zone_data
from tests.fixtures import admin_apikey_data, load_data
class TestUnitApiZoneAdminApiKey(object):
@pytest.fixture
def common_data_mock(self):
self.oauth_setting_patcher = patch(
'app.oauth.Setting',
spec=app.models.Setting
)
self.views_setting_patcher = patch(
'app.views.Setting',
spec=app.models.Setting
)
self.helpers_setting_patcher = patch(
'app.lib.helper.Setting',
spec=app.models.Setting
)
self.models_setting_patcher = patch(
'app.models.Setting',
spec=app.models.Setting
)
self.mock_apikey_patcher = patch(
'app.decorators.ApiKey',
spec=app.models.ApiKey
)
self.mock_hist_patcher = patch(
'app.blueprints.api.History',
spec=app.models.History
)
data = admin_apikey_data()
api_key = ApiKey(
desc=data['description'],
role_name=data['role'],
domains=[]
)
api_key.role = Role(name=data['role'])
self.mock_oauth_setting = self.oauth_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.mock_apikey.return_value.is_validate.return_value = api_key
def test_empty_get(
self,
client,
common_data_mock,
admin_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain, \
patch('app.lib.utils.requests.get') as mock_get:
mock_domain.return_value.domains.return_value = []
mock_domain.query.all.return_value = []
mock_get.return_value.json.return_value = []
mock_get.return_value.status_code = 200
res = client.get(
"/api/v1/servers/localhost/zones",
headers=admin_apikey
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
client,
common_data_mock,
zone_data,
admin_apikey,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True
res = client.post(
"/api/v1/servers/localhost/zones",
headers=admin_apikey,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
def test_get_multiple_zones(
self,
client,
common_data_mock,
zone_data,
admin_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain]
res = client.get(
"/api/v1/servers/localhost/zones",
headers=admin_apikey
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
admin_apikey
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
mock_delete.return_value.status_code = 204
mock_delete.return_value.content = ''
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=admin_apikey
)
assert res.status_code == 204

View File

@ -0,0 +1,167 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import User, Domain, Role
from app.schema import DomainSchema
from tests.fixtures import client, basic_auth_admin_headers
from tests.fixtures import zone_data, created_zone_data, load_data
class TestUnitApiZoneAdminUser(object):
@pytest.fixture
def common_data_mock(self):
self.oauth_setting_patcher = patch(
'app.oauth.Setting',
spec=app.models.Setting
)
self.api_setting_patcher = patch(
'app.blueprints.api.Setting',
spec=app.models.Setting
)
self.views_setting_patcher = patch(
'app.views.Setting',
spec=app.models.Setting
)
self.helpers_setting_patcher = patch(
'app.lib.helper.Setting',
spec=app.models.Setting
)
self.models_setting_patcher = patch(
'app.models.Setting',
spec=app.models.Setting
)
self.decorators_setting_patcher = patch(
'app.decorators.Setting'
)
self.mock_user_patcher = patch(
'app.decorators.User'
)
self.mock_hist_patcher = patch(
'app.blueprints.api.History',
spec=app.models.History
)
self.mock_oauth_setting = self.oauth_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.decorators_setting = self.decorators_setting_patcher.start()
self.api_setting = self.api_setting_patcher.start()
self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data
mockk = MagicMock()
mockk.role.name = "Administrator"
self.mock_user.query.filter.return_value.first.return_value = mockk
self.mock_user.return_value.is_validate.return_value = True
def test_empty_get(
self,
client,
common_data_mock,
basic_auth_admin_headers
):
with patch('app.blueprints.api.Domain') as mock_domain, \
patch('app.lib.utils.requests.get') as mock_get:
mock_domain.return_value.domains.return_value = []
mock_domain.query.all.return_value = []
mock_get.return_value.json.return_value = []
mock_get.return_value.status_code = 200
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
assert res.status_code == 200
assert data == []
def test_create_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_admin_headers,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
def test_get_multiple_zones(
self,
client,
common_data_mock,
zone_data,
basic_auth_admin_headers
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain]
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_admin_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_admin_headers
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
mock_domain.query.filter.return_value = True
mock_delete.return_value.status_code = 204
mock_delete.return_value.content = ''
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_admin_headers
)
assert res.status_code == 204

View File

@ -0,0 +1,146 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import User, Domain, Role
from app.schema import DomainSchema
from tests.fixtures import client, basic_auth_user_headers
from tests.fixtures import zone_data, created_zone_data, load_data
class TestUnitApiZoneUser(object):
@pytest.fixture
def common_data_mock(self):
self.oauth_setting_patcher = patch(
'app.oauth.Setting',
spec=app.models.Setting
)
self.api_setting_patcher = patch(
'app.blueprints.api.Setting',
spec=app.models.Setting
)
self.views_setting_patcher = patch(
'app.views.Setting',
spec=app.models.Setting
)
self.helpers_setting_patcher = patch(
'app.lib.helper.Setting',
spec=app.models.Setting
)
self.models_setting_patcher = patch(
'app.models.Setting',
spec=app.models.Setting
)
self.decorators_setting_patcher = patch(
'app.decorators.Setting'
)
self.mock_user_patcher = patch(
'app.decorators.User'
)
self.mock_hist_patcher = patch(
'app.blueprints.api.History',
spec=app.models.History
)
self.mock_oauth_setting = self.oauth_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.decorators_setting = self.decorators_setting_patcher.start()
self.api_setting = self.api_setting_patcher.start()
self.mock_user = self.mock_user_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
self.decorators_setting.return_value.get.side_effect = load_data
self.api_setting.return_value.get.side_effect = load_data
self.mockk = MagicMock()
self.mockk.role.name = "User"
self.mock_user.query.filter.return_value.first.return_value = self.mockk
self.mock_user.return_value.is_validate.return_value = True
def test_create_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_user_headers,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True
res = client.post(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
def test_get_multiple_zones(
self,
client,
common_data_mock,
zone_data,
basic_auth_user_headers
):
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
self.mockk.get_domains.return_value = [test_domain]
res = client.get(
"/api/v1/pdnsadmin/zones",
headers=basic_auth_user_headers
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
basic_auth_user_headers
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
self.mockk.get_domains.return_value = [test_domain]
mock_delete.return_value.status_code = 204
mock_delete.return_value.content = ''
zone_url_format = "/api/v1/pdnsadmin/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=basic_auth_user_headers
)
assert res.status_code == 204

View File

@ -0,0 +1,145 @@
import os
import pytest
from unittest.mock import patch, MagicMock
import sys
import json
from base64 import b64encode
from collections import namedtuple
import logging as logger
sys.path.append(os.getcwd())
import app
from app.validators import validate_zone
from app.models import Setting, Domain, ApiKey, Role
from app.schema import DomainSchema, ApiKeySchema
from tests.fixtures import client, initial_data, created_zone_data
from tests.fixtures import user_apikey, zone_data
from tests.fixtures import user_apikey_data, load_data
class TestUnitApiZoneUserApiKey(object):
@pytest.fixture
def common_data_mock(self):
self.oauth_setting_patcher = patch(
'app.oauth.Setting',
spec=app.models.Setting
)
self.views_setting_patcher = patch(
'app.views.Setting',
spec=app.models.Setting
)
self.helpers_setting_patcher = patch(
'app.lib.helper.Setting',
spec=app.models.Setting
)
self.models_setting_patcher = patch(
'app.models.Setting',
spec=app.models.Setting
)
self.mock_apikey_patcher = patch(
'app.decorators.ApiKey',
spec=app.models.ApiKey
)
self.mock_hist_patcher = patch(
'app.blueprints.api.History',
spec=app.models.History
)
self.mock_oauth_setting = self.oauth_setting_patcher.start()
self.mock_views_setting = self.views_setting_patcher.start()
self.mock_helpers_setting = self.helpers_setting_patcher.start()
self.mock_models_setting = self.models_setting_patcher.start()
self.mock_apikey = self.mock_apikey_patcher.start()
self.mock_hist = self.mock_hist_patcher.start()
self.mock_oauth_setting.return_value.get.side_effect = load_data
self.mock_views_setting.return_value.get.side_effect = load_data
self.mock_helpers_setting.return_value.get.side_effect = load_data
self.mock_models_setting.return_value.get.side_effect = load_data
data = user_apikey_data()
domain = Domain(name=data['domains'][0])
api_key = ApiKey(
desc=data['description'],
role_name=data['role'],
domains=[domain]
)
api_key.role = Role(name=data['role'])
self.mock_apikey.return_value.is_validate.return_value = api_key
def test_create_zone(
self,
client,
common_data_mock,
zone_data,
user_apikey,
created_zone_data
):
with patch('app.lib.helper.requests.request') as mock_post, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_post.return_value.status_code = 201
mock_post.return_value.content = json.dumps(created_zone_data)
mock_post.return_value.headers = {}
mock_domain.return_value.update.return_value = True
res = client.post(
"/api/v1/servers/localhost/zones",
headers=user_apikey,
data=json.dumps(zone_data),
content_type="application/json"
)
data = res.get_json(force=True)
data['rrsets'] = []
validate_zone(data)
assert res.status_code == 201
def test_get_multiple_zones(
self,
client,
common_data_mock,
zone_data,
user_apikey
):
with patch('app.blueprints.api.Domain') as mock_domain:
test_domain = Domain(1, name=zone_data['name'].rstrip("."))
mock_domain.query.all.return_value = [test_domain]
res = client.get(
"/api/v1/servers/localhost/zones",
headers=user_apikey
)
data = res.get_json(force=True)
fake_domain = namedtuple(
"Domain",
data[0].keys()
)(*data[0].values())
domain_schema = DomainSchema(many=True)
json.dumps(domain_schema.dump([fake_domain]))
assert res.status_code == 200
def test_delete_zone(
self,
client,
common_data_mock,
zone_data,
user_apikey
):
with patch('app.lib.utils.requests.request') as mock_delete, \
patch('app.blueprints.api.Domain') as mock_domain:
mock_domain.return_value.update.return_value = True
mock_delete.return_value.status_code = 204
mock_delete.return_value.content = ''
zone_url_format = "/api/v1/servers/localhost/zones/{0}"
zone_url = zone_url_format.format(zone_data['name'].rstrip("."))
res = client.delete(
zone_url,
headers=user_apikey
)
assert res.status_code == 204