mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-06-15 12:36:05 +00:00
Initial commit
This commit is contained in:
0
app/lib/__init__.py
Normal file
0
app/lib/__init__.py
Normal file
44
app/lib/log.py
Normal file
44
app/lib/log.py
Normal file
@ -0,0 +1,44 @@
|
||||
import os
|
||||
import logging
|
||||
|
||||
class logger(object):
|
||||
def __init__(self, name, level, logfile):
|
||||
self.name = name
|
||||
self.level = level
|
||||
self.logfile = logfile
|
||||
|
||||
def config(self):
|
||||
# define logger and set logging level
|
||||
logger = logging.getLogger()
|
||||
|
||||
if self.level == 'CRITICAL':
|
||||
level = logging.CRITICAL
|
||||
elif self.level == 'ERROR':
|
||||
level = logging.ERROR
|
||||
elif self.level == 'WARNING':
|
||||
level = logging.WARNING
|
||||
elif self.level == 'DEBUG':
|
||||
level = logging.DEBUG
|
||||
else:
|
||||
level = logging.INFO
|
||||
|
||||
logger.setLevel(level)
|
||||
|
||||
# set request requests module log level
|
||||
logging.getLogger("requests").setLevel(logging.CRITICAL)
|
||||
|
||||
# define handler to log into file
|
||||
file_log_handler = logging.FileHandler(self.logfile)
|
||||
logger.addHandler(file_log_handler)
|
||||
|
||||
# define handler to log into console
|
||||
stderr_log_handler = logging.StreamHandler()
|
||||
logger.addHandler(stderr_log_handler)
|
||||
|
||||
# define logging format for file and console
|
||||
file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
console_formatter = logging.Formatter('[%(levelname)s] %(message)s')
|
||||
file_log_handler.setFormatter(file_formatter)
|
||||
stderr_log_handler.setFormatter(console_formatter)
|
||||
|
||||
return logging.getLogger(self.name)
|
128
app/lib/utils.py
Normal file
128
app/lib/utils.py
Normal file
@ -0,0 +1,128 @@
|
||||
import re
|
||||
import sys
|
||||
import json
|
||||
import requests
|
||||
import urlparse
|
||||
|
||||
def auth_from_url(url):
|
||||
auth = None
|
||||
parsed_url = urlparse.urlparse(url).netloc
|
||||
if '@' in parsed_url:
|
||||
auth = parsed_url.split('@')[0].split(':')
|
||||
auth = requests.auth.HTTPBasicAuth(auth[0], auth[1])
|
||||
return auth
|
||||
|
||||
|
||||
def fetch_remote(remote_url, method='GET', data=None, accept=None, params=None, timeout=None, headers=None):
|
||||
if data is not None and type(data) != str:
|
||||
data = json.dumps(data)
|
||||
|
||||
if timeout is None:
|
||||
timeout = 1.5
|
||||
|
||||
verify = False
|
||||
|
||||
our_headers = {
|
||||
'user-agent': 'powerdnsadmin/0',
|
||||
'pragma': 'no-cache',
|
||||
'cache-control': 'no-cache'
|
||||
}
|
||||
if accept is not None:
|
||||
our_headers['accept'] = accept
|
||||
if headers is not None:
|
||||
our_headers.update(headers)
|
||||
|
||||
r = requests.request(
|
||||
method,
|
||||
remote_url,
|
||||
headers=headers,
|
||||
verify=verify,
|
||||
auth=auth_from_url(remote_url),
|
||||
timeout=timeout,
|
||||
data=data,
|
||||
params=params
|
||||
)
|
||||
try:
|
||||
if r.status_code not in (200, 400, 422):
|
||||
r.raise_for_status()
|
||||
except Exception as e:
|
||||
raise RuntimeError("While fetching " + remote_url + ": " + str(e)), None, sys.exc_info()[2]
|
||||
|
||||
return r
|
||||
|
||||
|
||||
def fetch_json(remote_url, method='GET', data=None, params=None, headers=None):
|
||||
r = fetch_remote(remote_url, method=method, data=data, params=params, headers=headers, accept='application/json; q=1')
|
||||
|
||||
if method == "DELETE":
|
||||
return True
|
||||
|
||||
try:
|
||||
assert('json' in r.headers['content-type'])
|
||||
except Exception as e:
|
||||
raise Exception("While fetching " + remote_url + ": " + str(e)), None, sys.exc_info()[2]
|
||||
|
||||
# don't use r.json here, as it will read from r.text, which will trigger
|
||||
# content encoding auto-detection in almost all cases, WHICH IS EXTREMELY
|
||||
# SLOOOOOOOOOOOOOOOOOOOOOOW. just don't.
|
||||
data = None
|
||||
try:
|
||||
data = json.loads(r.content)
|
||||
except UnicodeDecodeError:
|
||||
data = json.loads(r.content, 'iso-8859-1')
|
||||
return data
|
||||
|
||||
|
||||
def display_record_name(data):
|
||||
record_name, domain_name = data
|
||||
if record_name == domain_name:
|
||||
return '@'
|
||||
else:
|
||||
return record_name.replace('.'+domain_name, '')
|
||||
|
||||
def display_master_name(data):
|
||||
"""
|
||||
input data: "[u'127.0.0.1', u'8.8.8.8']"
|
||||
"""
|
||||
matches = re.findall(r'\'(.+?)\'', data)
|
||||
return ", ".join(matches)
|
||||
|
||||
def display_time(amount, units='s', remove_seconds=True):
|
||||
"""
|
||||
Convert timestamp to normal time format
|
||||
"""
|
||||
amount = int(amount)
|
||||
INTERVALS = [(lambda mlsec:divmod(mlsec, 1000), 'ms'),
|
||||
(lambda seconds:divmod(seconds, 60), 's'),
|
||||
(lambda minutes:divmod(minutes, 60), 'm'),
|
||||
(lambda hours:divmod(hours, 24), 'h'),
|
||||
(lambda days:divmod(days, 7), 'D'),
|
||||
(lambda weeks:divmod(weeks, 4), 'W'),
|
||||
(lambda years:divmod(years, 12), 'M'),
|
||||
(lambda decades:divmod(decades, 10), 'Y')]
|
||||
|
||||
for index_start, (interval, unit) in enumerate(INTERVALS):
|
||||
if unit == units:
|
||||
break
|
||||
|
||||
amount_abrev = []
|
||||
last_index = 0
|
||||
amount_temp = amount
|
||||
for index, (formula, abrev) in enumerate(INTERVALS[index_start: len(INTERVALS)]):
|
||||
divmod_result = formula(amount_temp)
|
||||
amount_temp = divmod_result[0]
|
||||
amount_abrev.append((divmod_result[1], abrev))
|
||||
if divmod_result[1] > 0:
|
||||
last_index = index
|
||||
amount_abrev_partial = amount_abrev[0: last_index + 1]
|
||||
amount_abrev_partial.reverse()
|
||||
|
||||
final_string = ''
|
||||
for amount, abrev in amount_abrev_partial:
|
||||
final_string += str(amount) + abrev + ' '
|
||||
|
||||
if remove_seconds and 'm' in final_string:
|
||||
final_string = final_string[:final_string.rfind(' ')]
|
||||
return final_string[:final_string.rfind(' ')]
|
||||
|
||||
return final_string
|
Reference in New Issue
Block a user