powerdns-admin/app/lib/utils.py
Chuyen Vo 1ccd98a364 Update utils.py
add timeout requests.request for large zones
2016-03-17 12:00:33 +07:00

132 lines
3.9 KiB
Python

import re
import sys
import json
import requests
import urlparse
from app import app
TIMEOUT = app.config['TIMEOUT']
def auth_from_url(url):
auth = None
parsed_url = urlparse.urlparse(url).netloc
if '@' in parsed_url:
auth = parsed_url.split('@')[0].split(':')
auth = requests.auth.HTTPBasicAuth(auth[0], auth[1])
return auth
def fetch_remote(remote_url, method='GET', data=None, accept=None, params=None, timeout=None, headers=None):
if data is not None and type(data) != str:
data = json.dumps(data)
if timeout is None:
timeout = TIMEOUT
verify = False
our_headers = {
'user-agent': 'powerdnsadmin/0',
'pragma': 'no-cache',
'cache-control': 'no-cache'
}
if accept is not None:
our_headers['accept'] = accept
if headers is not None:
our_headers.update(headers)
r = requests.request(
method,
remote_url,
headers=headers,
verify=verify,
auth=auth_from_url(remote_url),
timeout=timeout,
data=data,
params=params
)
try:
if r.status_code not in (200, 400, 422):
r.raise_for_status()
except Exception as e:
raise RuntimeError("While fetching " + remote_url + ": " + str(e)), None, sys.exc_info()[2]
return r
def fetch_json(remote_url, method='GET', data=None, params=None, headers=None):
r = fetch_remote(remote_url, method=method, data=data, params=params, headers=headers, accept='application/json; q=1')
if method == "DELETE":
return True
try:
assert('json' in r.headers['content-type'])
except Exception as e:
raise Exception("While fetching " + remote_url + ": " + str(e)), None, sys.exc_info()[2]
# don't use r.json here, as it will read from r.text, which will trigger
# content encoding auto-detection in almost all cases, WHICH IS EXTREMELY
# SLOOOOOOOOOOOOOOOOOOOOOOW. just don't.
data = None
try:
data = json.loads(r.content)
except UnicodeDecodeError:
data = json.loads(r.content, 'iso-8859-1')
return data
def display_record_name(data):
record_name, domain_name = data
if record_name == domain_name:
return '@'
else:
return record_name.replace('.'+domain_name, '')
def display_master_name(data):
"""
input data: "[u'127.0.0.1', u'8.8.8.8']"
"""
matches = re.findall(r'\'(.+?)\'', data)
return ", ".join(matches)
def display_time(amount, units='s', remove_seconds=True):
"""
Convert timestamp to normal time format
"""
amount = int(amount)
INTERVALS = [(lambda mlsec:divmod(mlsec, 1000), 'ms'),
(lambda seconds:divmod(seconds, 60), 's'),
(lambda minutes:divmod(minutes, 60), 'm'),
(lambda hours:divmod(hours, 24), 'h'),
(lambda days:divmod(days, 7), 'D'),
(lambda weeks:divmod(weeks, 4), 'W'),
(lambda years:divmod(years, 12), 'M'),
(lambda decades:divmod(decades, 10), 'Y')]
for index_start, (interval, unit) in enumerate(INTERVALS):
if unit == units:
break
amount_abrev = []
last_index = 0
amount_temp = amount
for index, (formula, abrev) in enumerate(INTERVALS[index_start: len(INTERVALS)]):
divmod_result = formula(amount_temp)
amount_temp = divmod_result[0]
amount_abrev.append((divmod_result[1], abrev))
if divmod_result[1] > 0:
last_index = index
amount_abrev_partial = amount_abrev[0: last_index + 1]
amount_abrev_partial.reverse()
final_string = ''
for amount, abrev in amount_abrev_partial:
final_string += str(amount) + abrev + ' '
if remove_seconds and 'm' in final_string:
final_string = final_string[:final_string.rfind(' ')]
return final_string[:final_string.rfind(' ')]
return final_string