From a3fd856dd85ae23d53362086d91dc4960c3e87e7 Mon Sep 17 00:00:00 2001 From: Khanh Ngo Date: Fri, 19 Jun 2020 08:47:51 +0700 Subject: [PATCH] Code refactoring and bug fixes --- powerdnsadmin/default_config.py | 6 +- powerdnsadmin/lib/utils.py | 1 + powerdnsadmin/models/account_user.py | 2 +- powerdnsadmin/models/api_key.py | 4 +- powerdnsadmin/models/domain.py | 215 ++++++++++++------------- powerdnsadmin/models/domain_setting.py | 4 +- powerdnsadmin/models/domain_user.py | 2 +- powerdnsadmin/models/history.py | 2 + powerdnsadmin/models/record.py | 65 ++++---- powerdnsadmin/models/record_entry.py | 2 +- powerdnsadmin/models/role.py | 2 +- powerdnsadmin/models/server.py | 9 +- powerdnsadmin/models/setting.py | 2 + powerdnsadmin/routes/user.py | 2 +- 14 files changed, 152 insertions(+), 166 deletions(-) diff --git a/powerdnsadmin/default_config.py b/powerdnsadmin/default_config.py index 7b82935..ce09bcc 100644 --- a/powerdnsadmin/default_config.py +++ b/powerdnsadmin/default_config.py @@ -16,12 +16,12 @@ SQLA_DB_HOST = '127.0.0.1' SQLA_DB_NAME = 'pda' SQLALCHEMY_TRACK_MODIFICATIONS = True -### DATBASE - MySQL +### DATABASE - MySQL SQLALCHEMY_DATABASE_URI = 'mysql://'+SQLA_DB_USER+':'+SQLA_DB_PASSWORD+'@'+SQLA_DB_HOST+'/'+SQLA_DB_NAME -### DATABSE - SQLite +### DATABASE - SQLite # SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'pdns.db') # SAML Authnetication SAML_ENABLED = False -SAML_ASSERTION_ENCRYPTED = True \ No newline at end of file +SAML_ASSERTION_ENCRYPTED = True diff --git a/powerdnsadmin/lib/utils.py b/powerdnsadmin/lib/utils.py index 82e6b6b..43c7617 100644 --- a/powerdnsadmin/lib/utils.py +++ b/powerdnsadmin/lib/utils.py @@ -221,6 +221,7 @@ def ensure_list(l): yield from l + class customBoxes: boxes = { "reverse": (" ", " "), diff --git a/powerdnsadmin/models/account_user.py b/powerdnsadmin/models/account_user.py index a582884..81bb43a 100644 --- a/powerdnsadmin/models/account_user.py +++ b/powerdnsadmin/models/account_user.py @@ -14,4 +14,4 @@ class AccountUser(db.Model): self.user_id = user_id def __repr__(self): - return ''.format(self.account_id, self.user_id) \ No newline at end of file + return ''.format(self.account_id, self.user_id) diff --git a/powerdnsadmin/models/api_key.py b/powerdnsadmin/models/api_key.py index 52602c0..9c7d9d3 100644 --- a/powerdnsadmin/models/api_key.py +++ b/powerdnsadmin/models/api_key.py @@ -91,9 +91,9 @@ class ApiKey(db.Model): current_app.config.get('SALT').encode('utf-8')) def check_password(self, hashed_password): - # Check hased password. Using bcrypt, + # Check hashed password. Using bcrypt, # the salt is saved into the hash itself - if (self.plain_text_password): + if self.plain_text_password: return bcrypt.checkpw(self.plain_text_password.encode('utf-8'), hashed_password.encode('utf-8')) return False diff --git a/powerdnsadmin/models/domain.py b/powerdnsadmin/models/domain.py index d17d31c..40453b5 100644 --- a/powerdnsadmin/models/domain.py +++ b/powerdnsadmin/models/domain.py @@ -74,23 +74,21 @@ class Domain(db.Model): """ Get all domains which has in PowerDNS """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} jdata = utils.fetch_json(urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}'.format(domain_name)), - headers=headers, - timeout=int( - Setting().get('pdns_api_timeout')), - verify=Setting().get('verify_ssl_connections')) + '/servers/localhost/zones/{0}'.format(domain_name)), + headers=headers, + timeout=int( + Setting().get('pdns_api_timeout')), + verify=Setting().get('verify_ssl_connections')) return jdata def get_domains(self): """ Get all domains which has in PowerDNS """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} jdata = utils.fetch_json( urljoin(self.PDNS_STATS_URL, self.API_EXTENDED_URL + '/servers/localhost/zones'), @@ -120,8 +118,7 @@ class Domain(db.Model): dict_db_domain = dict((x.name, x) for x in db_domain) current_app.logger.info("Found {} entries in PowerDNS-Admin".format( len(list_db_domain))) - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: jdata = utils.fetch_json( urljoin(self.PDNS_STATS_URL, @@ -211,8 +208,7 @@ class Domain(db.Model): Add a domain to power dns """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} domain_name = domain_name + '.' domain_ns = [ns + '.' for ns in domain_ns] @@ -262,15 +258,14 @@ class Domain(db.Model): """ Read Domain from PowerDNS and add into PDNS-Admin """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} if not domain: try: domain = utils.fetch_json( urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}'.format( - domain_dict['name'])), + '/servers/localhost/zones/{0}'.format( + domain_dict['name'])), headers=headers, timeout=int(Setting().get('pdns_api_timeout')), verify=Setting().get('verify_ssl_connections')) @@ -315,8 +310,8 @@ class Domain(db.Model): domain = Domain.query.filter(Domain.name == domain_name).first() if not domain: return {'status': 'error', 'msg': 'Domain does not exist.'} - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + + headers = {'X-API-Key': self.PDNS_API_KEY} if soa_edit_api not in ["DEFAULT", "INCREASE", "EPOCH", "OFF"]: soa_edit_api = 'DEFAULT' @@ -329,13 +324,13 @@ class Domain(db.Model): try: jdata = utils.fetch_json(urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}'.format(domain.name)), - headers=headers, - timeout=int( - Setting().get('pdns_api_timeout')), - method='PUT', - verify=Setting().get('verify_ssl_connections'), - data=post_data) + '/servers/localhost/zones/{0}'.format(domain.name)), + headers=headers, + timeout=int( + Setting().get('pdns_api_timeout')), + method='PUT', + verify=Setting().get('verify_ssl_connections'), + data=post_data) if 'error' in jdata.keys(): current_app.logger.error(jdata['error']) return {'status': 'error', 'msg': jdata['error']} @@ -365,21 +360,21 @@ class Domain(db.Model): domain = Domain.query.filter(Domain.name == domain_name).first() if not domain: return {'status': 'error', 'msg': 'Domain does not exist.'} - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + + headers = {'X-API-Key': self.PDNS_API_KEY} post_data = {"kind": kind, "masters": masters} try: jdata = utils.fetch_json(urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}'.format(domain.name)), - headers=headers, - timeout=int( - Setting().get('pdns_api_timeout')), - method='PUT', - verify=Setting().get('verify_ssl_connections'), - data=post_data) + '/servers/localhost/zones/{0}'.format(domain.name)), + headers=headers, + timeout=int( + Setting().get('pdns_api_timeout')), + method='PUT', + verify=Setting().get('verify_ssl_connections'), + data=post_data) if 'error' in jdata.keys(): current_app.logger.error(jdata['error']) return {'status': 'error', 'msg': jdata['error']} @@ -410,27 +405,27 @@ class Domain(db.Model): domain_obj = Domain.query.filter(Domain.name == domain_name).first() domain_auto_ptr = DomainSetting.query.filter( DomainSetting.domain == domain_obj).filter( - DomainSetting.setting == 'auto_ptr').first() + DomainSetting.setting == 'auto_ptr').first() domain_auto_ptr = strtobool( domain_auto_ptr.value) if domain_auto_ptr else False system_auto_ptr = Setting().get('auto_ptr') self.name = domain_name domain_id = self.get_id_by_name(domain_reverse_name) - if None == domain_id and \ - ( - system_auto_ptr or - domain_auto_ptr - ): - result = self.add(domain_reverse_name, 'Master', 'DEFAULT', '', '') + if domain_id is None and \ + ( + system_auto_ptr or + domain_auto_ptr + ): + result = self.add(domain_reverse_name, 'Master', 'DEFAULT', [], []) self.update() if result['status'] == 'ok': history = History(msg='Add reverse lookup domain {0}'.format( domain_reverse_name), - detail=str({ - 'domain_type': 'Master', - 'domain_master_ips': '' - }), - created_by='System') + detail=str({ + 'domain_type': 'Master', + 'domain_master_ips': '' + }), + created_by='System') history.add() else: return { @@ -443,9 +438,9 @@ class Domain(db.Model): self.grant_privileges(domain_user_ids) return { 'status': - 'ok', + 'ok', 'msg': - 'New reverse lookup domain created with granted privileges' + 'New reverse lookup domain created with granted privileges' } return { 'status': 'ok', @@ -497,16 +492,15 @@ class Domain(db.Model): """ Delete a single domain name from powerdns """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} utils.fetch_json(urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}'.format(domain_name)), - headers=headers, - timeout=int(Setting().get('pdns_api_timeout')), - method='DELETE', - verify=Setting().get('verify_ssl_connections')) + '/servers/localhost/zones/{0}'.format(domain_name)), + headers=headers, + timeout=int(Setting().get('pdns_api_timeout')), + method='DELETE', + verify=Setting().get('verify_ssl_connections')) current_app.logger.info( 'Deleted domain successfully from PowerDNS: {0}'.format( domain_name)) @@ -540,8 +534,8 @@ class Domain(db.Model): user_ids = [] query = db.session.query( DomainUser, Domain).filter(User.id == DomainUser.user_id).filter( - Domain.id == DomainUser.domain_id).filter( - Domain.name == self.name).all() + Domain.id == DomainUser.domain_id).filter( + Domain.name == self.name).all() for q in query: user_ids.append(q[0].user_id) return user_ids @@ -566,7 +560,7 @@ class Domain(db.Model): db.session.rollback() current_app.logger.error( 'Cannot revoke user privileges on domain {0}. DETAIL: {1}'. - format(self.name, e)) + format(self.name, e)) current_app.logger.debug(print(traceback.format_exc())) try: @@ -578,7 +572,7 @@ class Domain(db.Model): db.session.rollback() current_app.logger.error( 'Cannot grant user privileges to domain {0}. DETAIL: {1}'. - format(self.name, e)) + format(self.name, e)) current_app.logger.debug(print(traceback.format_exc())) def update_from_master(self, domain_name): @@ -587,27 +581,26 @@ class Domain(db.Model): """ domain = Domain.query.filter(Domain.name == domain_name).first() if domain: - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: r = utils.fetch_json(urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}/axfr-retrieve'.format( - domain.name)), - headers=headers, - timeout=int( - Setting().get('pdns_api_timeout')), - method='PUT', - verify=Setting().get('verify_ssl_connections')) + '/servers/localhost/zones/{0}/axfr-retrieve'.format( + domain.name)), + headers=headers, + timeout=int( + Setting().get('pdns_api_timeout')), + method='PUT', + verify=Setting().get('verify_ssl_connections')) return {'status': 'ok', 'msg': r.get('result')} except Exception as e: current_app.logger.error( 'Cannot update from master. DETAIL: {0}'.format(e)) return { 'status': - 'error', + 'error', 'msg': - 'There was something wrong, please contact administrator' + 'There was something wrong, please contact administrator' } else: return {'status': 'error', 'msg': 'This domain does not exist'} @@ -618,14 +611,13 @@ class Domain(db.Model): """ domain = Domain.query.filter(Domain.name == domain_name).first() if domain: - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: jdata = utils.fetch_json( urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}/cryptokeys'.format( - domain.name)), + '/servers/localhost/zones/{0}/cryptokeys'.format( + domain.name)), headers=headers, timeout=int(Setting().get('pdns_api_timeout')), method='GET', @@ -642,9 +634,9 @@ class Domain(db.Model): 'Cannot get domain dnssec. DETAIL: {0}'.format(e)) return { 'status': - 'error', + 'error', 'msg': - 'There was something wrong, please contact administrator' + 'There was something wrong, please contact administrator' } else: return {'status': 'error', 'msg': 'This domain does not exist'} @@ -655,15 +647,14 @@ class Domain(db.Model): """ domain = Domain.query.filter(Domain.name == domain_name).first() if domain: - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: # Enable API-RECTIFY for domain, BEFORE activating DNSSEC post_data = {"api_rectify": True} jdata = utils.fetch_json( urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}'.format(domain.name)), + '/servers/localhost/zones/{0}'.format(domain.name)), headers=headers, timeout=int(Setting().get('pdns_api_timeout')), method='PUT', @@ -673,7 +664,7 @@ class Domain(db.Model): return { 'status': 'error', 'msg': - 'API-RECTIFY could not be enabled for this domain', + 'API-RECTIFY could not be enabled for this domain', 'jdata': jdata } @@ -682,8 +673,8 @@ class Domain(db.Model): jdata = utils.fetch_json( urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}/cryptokeys'.format( - domain.name)), + '/servers/localhost/zones/{0}/cryptokeys'.format( + domain.name)), headers=headers, timeout=int(Setting().get('pdns_api_timeout')), method='POST', @@ -692,12 +683,12 @@ class Domain(db.Model): if 'error' in jdata: return { 'status': - 'error', + 'error', 'msg': - 'Cannot enable DNSSEC for this domain. Error: {0}'. - format(jdata['error']), + 'Cannot enable DNSSEC for this domain. Error: {0}'. + format(jdata['error']), 'jdata': - jdata + jdata } return {'status': 'ok'} @@ -708,9 +699,9 @@ class Domain(db.Model): current_app.logger.debug(traceback.format_exc()) return { 'status': - 'error', + 'error', 'msg': - 'There was something wrong, please contact administrator' + 'There was something wrong, please contact administrator' } else: @@ -722,15 +713,14 @@ class Domain(db.Model): """ domain = Domain.query.filter(Domain.name == domain_name).first() if domain: - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: # Deactivate DNSSEC jdata = utils.fetch_json( urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}/cryptokeys/{1}'.format( - domain.name, key_id)), + '/servers/localhost/zones/{0}/cryptokeys/{1}'.format( + domain.name, key_id)), headers=headers, timeout=int(Setting().get('pdns_api_timeout')), method='DELETE', @@ -738,12 +728,12 @@ class Domain(db.Model): if jdata != True: return { 'status': - 'error', + 'error', 'msg': - 'Cannot disable DNSSEC for this domain. Error: {0}'. - format(jdata['error']), + 'Cannot disable DNSSEC for this domain. Error: {0}'. + format(jdata['error']), 'jdata': - jdata + jdata } # Disable API-RECTIFY for domain, AFTER deactivating DNSSEC @@ -751,7 +741,7 @@ class Domain(db.Model): jdata = utils.fetch_json( urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}'.format(domain.name)), + '/servers/localhost/zones/{0}'.format(domain.name)), headers=headers, timeout=int(Setting().get('pdns_api_timeout')), method='PUT', @@ -761,7 +751,7 @@ class Domain(db.Model): return { 'status': 'error', 'msg': - 'API-RECTIFY could not be disabled for this domain', + 'API-RECTIFY could not be disabled for this domain', 'jdata': jdata } @@ -774,7 +764,7 @@ class Domain(db.Model): return { 'status': 'error', 'msg': - 'There was something wrong, please contact administrator', + 'There was something wrong, please contact administrator', 'domain': domain.name, 'id': key_id } @@ -797,8 +787,7 @@ class Domain(db.Model): if not domain: return {'status': False, 'msg': 'Domain does not exist'} - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} account_name = Account().get_name_by_id(account_id) @@ -807,13 +796,13 @@ class Domain(db.Model): try: jdata = utils.fetch_json(urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + - '/servers/localhost/zones/{0}'.format(domain_name)), - headers=headers, - timeout=int( - Setting().get('pdns_api_timeout')), - method='PUT', - verify=Setting().get('verify_ssl_connections'), - data=post_data) + '/servers/localhost/zones/{0}'.format(domain_name)), + headers=headers, + timeout=int( + Setting().get('pdns_api_timeout')), + method='PUT', + verify=Setting().get('verify_ssl_connections'), + data=post_data) if 'error' in jdata.keys(): current_app.logger.error(jdata['error']) @@ -852,7 +841,7 @@ class Domain(db.Model): .outerjoin(Account, Domain.account_id == Account.id) \ .outerjoin(AccountUser, Account.id == AccountUser.account_id) \ .filter( - db.or_( - DomainUser.user_id == user_id, - AccountUser.user_id == user_id - )).filter(Domain.id == self.id).first() + db.or_( + DomainUser.user_id == user_id, + AccountUser.user_id == user_id + )).filter(Domain.id == self.id).first() diff --git a/powerdnsadmin/models/domain_setting.py b/powerdnsadmin/models/domain_setting.py index 849239c..91dcb2e 100644 --- a/powerdnsadmin/models/domain_setting.py +++ b/powerdnsadmin/models/domain_setting.py @@ -1,3 +1,5 @@ +import traceback + from flask import current_app from .base import db @@ -32,4 +34,4 @@ class DomainSetting(db.Model): 'Unable to set DomainSetting value. DETAIL: {0}'.format(e)) current_app.logger.debug(traceback.format_exc()) db.session.rollback() - return False \ No newline at end of file + return False diff --git a/powerdnsadmin/models/domain_user.py b/powerdnsadmin/models/domain_user.py index e556339..7b24a53 100644 --- a/powerdnsadmin/models/domain_user.py +++ b/powerdnsadmin/models/domain_user.py @@ -14,4 +14,4 @@ class DomainUser(db.Model): self.user_id = user_id def __repr__(self): - return ''.format(self.domain_id, self.user_id) \ No newline at end of file + return ''.format(self.domain_id, self.user_id) diff --git a/powerdnsadmin/models/history.py b/powerdnsadmin/models/history.py index 70ef43a..709a911 100644 --- a/powerdnsadmin/models/history.py +++ b/powerdnsadmin/models/history.py @@ -1,3 +1,5 @@ +import traceback + from flask import current_app from datetime import datetime diff --git a/powerdnsadmin/models/record.py b/powerdnsadmin/models/record.py index 2a9ff71..201ad82 100644 --- a/powerdnsadmin/models/record.py +++ b/powerdnsadmin/models/record.py @@ -14,12 +14,11 @@ from .setting import Setting from .domain import Domain from .domain_setting import DomainSetting -def byRecordContent(e): - return e['content'] -def byRecordContentPair(e): +def by_record_content_pair(e): return e[0]['content'] + class Record(object): """ This is not a model, it's just an object @@ -49,8 +48,7 @@ class Record(object): """ Query domain's rrsets via PDNS API """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: jdata = utils.fetch_json(urljoin( self.PDNS_STATS_URL, self.API_EXTENDED_URL + @@ -68,8 +66,8 @@ class Record(object): rrsets=[] for r in jdata['rrsets']: while len(r['comments'])'.format(self.name) \ No newline at end of file + return ''.format(self.name) diff --git a/powerdnsadmin/models/server.py b/powerdnsadmin/models/server.py index cdcd6f0..ce5b27f 100644 --- a/powerdnsadmin/models/server.py +++ b/powerdnsadmin/models/server.py @@ -24,8 +24,7 @@ class Server(object): """ Get server config """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: jdata = utils.fetch_json(urljoin( @@ -46,8 +45,7 @@ class Server(object): """ Get server statistics """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: jdata = utils.fetch_json(urljoin( @@ -68,8 +66,7 @@ class Server(object): """ Search zone/record/comment directly from PDNS API """ - headers = {} - headers['X-API-Key'] = self.PDNS_API_KEY + headers = {'X-API-Key': self.PDNS_API_KEY} try: jdata = utils.fetch_json(urljoin( diff --git a/powerdnsadmin/models/setting.py b/powerdnsadmin/models/setting.py index 0f6f35d..9e5aba3 100644 --- a/powerdnsadmin/models/setting.py +++ b/powerdnsadmin/models/setting.py @@ -1,4 +1,6 @@ import sys +import traceback + import pytimeparse from ast import literal_eval from distutils.util import strtobool diff --git a/powerdnsadmin/routes/user.py b/powerdnsadmin/routes/user.py index b623736..a6abce3 100644 --- a/powerdnsadmin/routes/user.py +++ b/powerdnsadmin/routes/user.py @@ -50,7 +50,7 @@ def profile(): 'password'] if 'password' in request.form else '' else: firstname = lastname = email = new_password = '' - logging.warning( + current_app.logger.warning( 'Authenticated externally. User {0} information will not allowed to update the profile' .format(current_user.username))