powerdns-admin/powerdnsadmin/lib/utils.py

258 lines
7.2 KiB
Python
Raw Normal View History

import logging
2015-12-13 09:34:12 +00:00
import re
import json
import requests
2016-07-13 14:33:21 +00:00
import hashlib
import ipaddress
2016-07-13 14:33:21 +00:00
2020-05-25 13:16:33 +00:00
from collections.abc import Iterable
from distutils.version import StrictVersion
2018-03-30 06:49:35 +00:00
from urllib.parse import urlparse
2021-03-16 18:37:05 +00:00
from datetime import datetime, timedelta
2018-03-30 06:49:35 +00:00
2015-12-13 09:34:12 +00:00
def auth_from_url(url):
    """
    Build HTTP basic-auth credentials from the userinfo part of a URL.

    Returns a ``requests.auth.HTTPBasicAuth`` object when the network
    location of ``url`` contains ``user[:password]@``, otherwise ``None``.
    A missing password is treated as an empty string.
    """
    netloc = urlparse(url).netloc
    if '@' not in netloc:
        return None

    # The host never contains '@', so everything before the LAST '@' is the
    # userinfo; the user name ends at the FIRST ':' so that passwords may
    # themselves contain ':' or '@'.
    userinfo = netloc.rpartition('@')[0]
    username, _, password = userinfo.partition(':')
    return requests.auth.HTTPBasicAuth(username, password)
def fetch_remote(remote_url,
                 method='GET',
                 data=None,
                 accept=None,
                 params=None,
                 timeout=None,
                 headers=None,
                 verify=True):
    """
    Perform an HTTP request and return the ``requests`` response object.

    ``data`` that is not already a string is serialised to JSON first.
    The default headers (user-agent, no-cache), the optional ``accept``
    value and the caller supplied ``headers`` are merged, with the caller's
    values taking precedence, and sent with the request.

    Raises RuntimeError when the response status is an HTTP error other
    than the tolerated 400, 409 and 422.
    """
    if data is not None and not isinstance(data, str):
        data = json.dumps(data)

    verify = bool(verify)  # enforce type boolean

    our_headers = {
        'user-agent': 'powerdnsadmin/0',
        'pragma': 'no-cache',
        'cache-control': 'no-cache'
    }
    if accept is not None:
        our_headers['accept'] = accept
    if headers is not None:
        our_headers.update(headers)

    # Send the merged headers; passing the bare `headers` argument here
    # would silently drop the defaults and the accept header built above.
    r = requests.request(method,
                         remote_url,
                         headers=our_headers,
                         verify=verify,
                         auth=auth_from_url(remote_url),
                         timeout=timeout,
                         data=data,
                         params=params)
    logging.debug(
        'Querying remote server "{0}" ({1}) finished with code {2} (took {3}s)'
        .format(remote_url, method, r.status_code, r.elapsed.total_seconds()))

    try:
        if r.status_code not in (200, 201, 204, 400, 409, 422):
            r.raise_for_status()
    except Exception as e:
        msg = "Returned status {0} and content {1}".format(r.status_code, r.text)
        raise RuntimeError('Error while fetching {0}. {1}'.format(
            remote_url, msg)) from e

    return r
def fetch_json(remote_url,
               method='GET',
               data=None,
               params=None,
               headers=None,
               timeout=None,
               verify=True):
    """
    Perform an HTTP request via fetch_remote() and decode the JSON response.

    Returns:
      * ``True`` for any DELETE request,
      * ``{}`` for a 204 response,
      * an error dict (keys ``error`` and ``http_code``) for a 409 response,
      * the decoded JSON document otherwise.

    Raises RuntimeError when the response does not declare a JSON content
    type or when its body cannot be parsed as JSON.
    """
    r = fetch_remote(remote_url,
                     method=method,
                     data=data,
                     params=params,
                     headers=headers,
                     timeout=timeout,
                     verify=verify,
                     accept='application/json; q=1')

    if method == "DELETE":
        return True

    if r.status_code == 204:
        return {}
    elif r.status_code == 409:
        return {
            'error': 'Resource already exists or conflict',
            'http_code': r.status_code
        }

    # Explicit check instead of `assert`, which is removed under `python -O`
    # and would let non-JSON responses through.
    if 'json' not in r.headers.get('content-type', ''):
        raise RuntimeError('Error while fetching {0}'.format(remote_url))

    # don't use r.json here, as it will read from r.text, which will trigger
    # content encoding auto-detection in almost all cases, WHICH IS EXTREMELY
    # SLOOOOOOOOOOOOOOOOOOOOOOW. just don't.
    try:
        return json.loads(r.content.decode('utf-8'))
    except UnicodeDecodeError:
        # If the decoding fails, switch to slower but probably working .json()
        # Errors raised by r.json() propagate to the caller unchanged.
        logging.warning("UTF-8 content.decode failed, switching to slower .json method")
        return r.json()
    except Exception as e:
        raise RuntimeError(
            'Error while loading JSON data from {0}'.format(remote_url)) from e
def display_record_name(data):
    """
    Return the record name relative to its domain.

    input data: a (record_name, domain_name) pair.
    Returns '@' when the record is the domain itself, the record name with
    the trailing '.<domain_name>' removed when it has that suffix, and the
    unchanged record name otherwise.
    """
    record_name, domain_name = data
    if record_name == domain_name:
        return '@'

    # Plain suffix comparison: building a regex from the domain name would
    # let its dots match any character.
    suffix = '.' + domain_name
    if record_name.endswith(suffix):
        return record_name[:-len(suffix)]
    return record_name
2015-12-13 09:34:12 +00:00
2018-03-30 06:49:35 +00:00
2015-12-13 09:34:12 +00:00
def display_master_name(data):
    """
    Join every single-quoted item found in ``data`` with ", ".

    input data: "[u'127.0.0.1', u'8.8.8.8']"
    """
    quoted_item = re.compile(r'\'(.+?)\'')
    return ", ".join(found.group(1) for found in quoted_item.finditer(data))
2018-03-30 06:49:35 +00:00
2015-12-13 09:34:12 +00:00
def display_time(amount, units='s', remove_seconds=True):
    """
    Convert timestamp to normal time format

    ``amount`` is converted with int() and interpreted in ``units``, one of
    'ms', 's', 'm', 'h', 'D', 'W', 'M' or 'Y'; an unknown unit falls back
    to 'Y'. The value is broken down into successively larger units and
    rendered largest first, e.g. 3661 -> '1h 1m 1s '.

    When ``remove_seconds`` is true and a minutes component is shown, the
    smallest shown component is dropped if it is seconds or milliseconds,
    and the result has no trailing space (3661 -> '1h 1m'). In every other
    case each component is followed by a space, including the last one.
    """
    # (how many of this unit make up the next larger one, unit label)
    intervals = [(1000, 'ms'), (60, 's'), (60, 'm'), (24, 'h'),
                 (7, 'D'), (4, 'W'), (12, 'M'), (10, 'Y')]
    labels = [label for _, label in intervals]
    start = labels.index(units) if units in labels else len(intervals) - 1

    remaining = int(amount)
    components = []  # (value, label), smallest unit first
    last_nonzero = 0
    for position, (base, label) in enumerate(intervals[start:]):
        remaining, value = divmod(remaining, base)
        components.append((value, label))
        if value > 0:
            last_nonzero = position

    # Keep everything up to the largest non-zero component, largest first.
    shown = components[:last_nonzero + 1]
    shown.reverse()

    # Compare unit labels rather than searching the rendered text, so that
    # 'ms' is not mistaken for minutes and unit suffixes are never cut off.
    has_minutes = any(label == 'm' for _, label in shown)
    if remove_seconds and has_minutes and shown[-1][1] in ('s', 'ms'):
        return ' '.join('{0}{1}'.format(value, label)
                        for value, label in shown[:-1])
    return ''.join('{0}{1} '.format(value, label) for value, label in shown)
2018-03-30 06:49:35 +00:00
def pdns_api_extended_uri(version):
    """
    Check the pdns version

    Returns "/api/v1" when ``version`` is 4.0.0 or newer and "" otherwise.
    The version must start with "major.minor" and may have a patch number.
    Any trailing text (e.g. "-rc1", "a1") is accepted; it only matters for
    4.0.0 itself, where it marks a pre-release that sorts before 4.0.0.

    Raises ValueError when no "major.minor" prefix can be found.
    """
    parsed = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?(.*)$', str(version))
    if parsed is None:
        raise ValueError("invalid version number '{0}'".format(version))

    major, minor, patch, suffix = parsed.groups()
    release = (int(major), int(minor), int(patch or 0))

    if release > (4, 0, 0):
        return "/api/v1"
    if release == (4, 0, 0) and not suffix.strip():
        return "/api/v1"
    return ""
2016-07-13 14:33:21 +00:00
2018-03-30 06:49:35 +00:00
def email_to_gravatar_url(email="", size=100):
    """
    Build the Gravatar avatar URL for an email address.

    AD doesn't necessarily have email, so ``None`` is treated as an empty
    address. The address is trimmed and lower-cased before hashing, as
    Gravatar requires. ``size`` is passed through as the ``s`` query
    parameter.
    """
    if email is None:
        email = ""

    normalized = email.strip().lower()
    hash_string = hashlib.md5(normalized.encode('utf-8')).hexdigest()
    return "https://s.gravatar.com/avatar/{0}?s={1}".format(hash_string, size)
2017-10-31 18:21:22 +00:00
def display_setting_state(value):
    """
    Translate a setting value into a label: 1 -> "ON", 0 -> "OFF",
    anything else -> "UNKNOWN".
    """
    if value == 1:
        return "ON"
    return "OFF" if value == 0 else "UNKNOWN"
def validate_ipaddress(address):
    """
    Parse ``address`` as an IPv4 or IPv6 address.

    Returns a one-element list holding the parsed address object, or an
    empty list when ``address`` is not a valid IP address.
    """
    try:
        parsed = ipaddress.ip_address(address)
    except ValueError:
        return []

    accepted_types = (ipaddress.IPv4Address, ipaddress.IPv6Address)
    return [parsed] if isinstance(parsed, accepted_types) else []
2019-12-14 07:47:21 +00:00
def pretty_json(data):
    """
    Serialise ``data`` to JSON text with sorted keys and 4-space indentation.
    """
    layout = {'sort_keys': True, 'indent': 4}
    return json.dumps(data, **layout)
2020-05-25 13:16:33 +00:00
def ensure_list(l):
    """
    Generator yielding the items of ``l``.

    A falsy value yields nothing, a string or a non-iterable value is
    yielded as a single item, and any other iterable is yielded item by
    item.
    """
    if not l:
        return

    if isinstance(l, str) or not isinstance(l, Iterable):
        # Scalars (and strings, which must not be split into characters)
        # are treated as a single item.
        yield l
        return

    yield from l
2020-06-19 01:47:51 +00:00
class customBoxes:
    """
    Static table of box definitions.

    ``boxes`` maps a box key to a pair of strings and ``order`` lists the
    box keys in sequence.
    """
    boxes = dict(
        reverse=(" ", " "),
        ip6arpa=("ip6", "%.ip6.arpa"),
        inaddrarpa=("in-addr", "%.in-addr.arpa"),
    )
    order = [
        "reverse",
        "ip6arpa",
        "inaddrarpa",
    ]
2021-03-16 18:37:05 +00:00
def pretty_domain_name(value):
    """
    Display domain name in original format.
    If it is IDN domain (Punycode starts with xn--), do the
    idna decoding.
    Note that any part of the domain name can be individually punycoded

    Raises Exception when ``value`` is not a string or when the IDNA
    decoding fails; in the latter case the underlying error is chained.
    """
    if not isinstance(value, str):
        raise Exception("Require the Punycode in string format")

    if not (value.startswith('xn--') or '.xn--' in value):
        return value

    try:
        return value.encode().decode('idna')
    except Exception as exc:
        # `except Exception` rather than a bare `except`, so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        raise Exception("Cannot decode IDN domain") from exc