mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-06-16 04:56:05 +00:00
Merge branch 'PowerDNS-Admin:master' into shine/config_table_key_uniqueness
This commit is contained in:
@ -3,6 +3,7 @@ from flask import current_app
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from ..lib import utils
|
||||
from ..lib.errors import InvalidAccountNameException
|
||||
from .base import db
|
||||
from .setting import Setting
|
||||
from .user import User
|
||||
@ -22,7 +23,7 @@ class Account(db.Model):
|
||||
back_populates="accounts")
|
||||
|
||||
def __init__(self, name=None, description=None, contact=None, mail=None):
|
||||
self.name = name
|
||||
self.name = Account.sanitize_name(name) if name is not None else name
|
||||
self.description = description
|
||||
self.contact = contact
|
||||
self.mail = mail
|
||||
@ -33,9 +34,30 @@ class Account(db.Model):
|
||||
self.PDNS_VERSION = Setting().get('pdns_version')
|
||||
self.API_EXTENDED_URL = utils.pdns_api_extended_uri(self.PDNS_VERSION)
|
||||
|
||||
if self.name is not None:
|
||||
self.name = ''.join(c for c in self.name.lower()
|
||||
if c in "abcdefghijklmnopqrstuvwxyz0123456789")
|
||||
|
||||
@staticmethod
|
||||
def sanitize_name(name):
|
||||
"""
|
||||
Formats the provided name to fit into the constraint
|
||||
"""
|
||||
if not isinstance(name, str):
|
||||
raise InvalidAccountNameException("Account name must be a string")
|
||||
|
||||
allowed_characters = "abcdefghijklmnopqrstuvwxyz0123456789"
|
||||
|
||||
if Setting().get('account_name_extra_chars'):
|
||||
allowed_characters += "_-."
|
||||
|
||||
sanitized_name = ''.join(c for c in name.lower() if c in allowed_characters)
|
||||
|
||||
if len(sanitized_name) > Account.name.type.length:
|
||||
current_app.logger.error("Account name {0} too long. Truncated to: {1}".format(
|
||||
sanitized_name, sanitized_name[:Account.name.type.length]))
|
||||
|
||||
if not sanitized_name:
|
||||
raise InvalidAccountNameException("Empty string is not a valid account name")
|
||||
|
||||
return sanitized_name[:Account.name.type.length]
|
||||
|
||||
def __repr__(self):
|
||||
return '<Account {0}r>'.format(self.name)
|
||||
@ -68,11 +90,9 @@ class Account(db.Model):
|
||||
"""
|
||||
Create a new account
|
||||
"""
|
||||
# Sanity check - account name
|
||||
if self.name == "":
|
||||
return {'status': False, 'msg': 'No account name specified'}
|
||||
self.name = Account.sanitize_name(self.name)
|
||||
|
||||
# check that account name is not already used
|
||||
# Check that account name is not already used
|
||||
account = Account.query.filter(Account.name == self.name).first()
|
||||
if account:
|
||||
return {'status': False, 'msg': 'Account already exists'}
|
||||
|
@ -60,31 +60,31 @@ class ApiKey(db.Model):
|
||||
|
||||
def update(self, role_name=None, description=None, domains=None, accounts=None):
|
||||
try:
|
||||
if role_name:
|
||||
role = Role.query.filter(Role.name == role_name).first()
|
||||
self.role_id = role.id
|
||||
if role_name:
|
||||
role = Role.query.filter(Role.name == role_name).first()
|
||||
self.role_id = role.id
|
||||
|
||||
if description:
|
||||
self.description = description
|
||||
if description:
|
||||
self.description = description
|
||||
|
||||
if domains is not None:
|
||||
domain_object_list = Domain.query \
|
||||
.filter(Domain.name.in_(domains)) \
|
||||
.all()
|
||||
self.domains[:] = domain_object_list
|
||||
if domains is not None:
|
||||
domain_object_list = Domain.query \
|
||||
.filter(Domain.name.in_(domains)) \
|
||||
.all()
|
||||
self.domains[:] = domain_object_list
|
||||
|
||||
if accounts is not None:
|
||||
account_object_list = Account.query \
|
||||
.filter(Account.name.in_(accounts)) \
|
||||
.all()
|
||||
self.accounts[:] = account_object_list
|
||||
if accounts is not None:
|
||||
account_object_list = Account.query \
|
||||
.filter(Account.name.in_(accounts)) \
|
||||
.all()
|
||||
self.accounts[:] = account_object_list
|
||||
|
||||
db.session.commit()
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
msg_str = 'Update of apikey failed. Error: {0}'
|
||||
current_app.logger.error(msg_str.format(e))
|
||||
db.session.rollback
|
||||
raise e
|
||||
msg_str = 'Update of apikey failed. Error: {0}'
|
||||
current_app.logger.error(msg_str.format(e))
|
||||
db.session.rollback() # fixed line
|
||||
raise e
|
||||
|
||||
def get_hashed_password(self, plain_text_password=None):
|
||||
# Hash a password for the first time
|
||||
|
@ -1,3 +1,4 @@
|
||||
import json
|
||||
import re
|
||||
import traceback
|
||||
from flask import current_app
|
||||
@ -19,7 +20,7 @@ class Domain(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
name = db.Column(db.String(255), index=True, unique=True)
|
||||
master = db.Column(db.String(128))
|
||||
type = db.Column(db.String(6), nullable=False)
|
||||
type = db.Column(db.String(8), nullable=False)
|
||||
serial = db.Column(db.BigInteger)
|
||||
notified_serial = db.Column(db.BigInteger)
|
||||
last_check = db.Column(db.Integer)
|
||||
@ -109,6 +110,22 @@ class Domain(db.Model):
|
||||
'Domain does not exist. ERROR: {0}'.format(e))
|
||||
return None
|
||||
|
||||
def search_idn_domains(self, search_string):
|
||||
"""
|
||||
Search for IDN domains using the provided search string.
|
||||
"""
|
||||
# Compile the regular expression pattern for matching IDN domain names
|
||||
idn_pattern = re.compile(r'^xn--')
|
||||
|
||||
# Search for domain names that match the IDN pattern
|
||||
idn_domains = [
|
||||
domain for domain in self.get_domains() if idn_pattern.match(domain)
|
||||
]
|
||||
|
||||
# Filter the search results based on the provided search string
|
||||
return [domain for domain in idn_domains if search_string in domain]
|
||||
|
||||
|
||||
def update(self):
|
||||
"""
|
||||
Fetch zones (domains) from PowerDNS and update into DB
|
||||
@ -142,9 +159,20 @@ class Domain(db.Model):
|
||||
current_app.logger.debug(traceback.format_exc())
|
||||
|
||||
# update/add new domain
|
||||
account_cache = {}
|
||||
for data in jdata:
|
||||
if 'account' in data:
|
||||
account_id = Account().get_id_by_name(data['account'])
|
||||
# if no account is set don't try to query db
|
||||
if data['account'] == '':
|
||||
find_account_id = None
|
||||
else:
|
||||
find_account_id = account_cache.get(data['account'])
|
||||
# if account was not queried in the past and hence not in cache
|
||||
if find_account_id is None:
|
||||
find_account_id = Account().get_id_by_name(data['account'])
|
||||
# add to cache
|
||||
account_cache[data['account']] = find_account_id
|
||||
account_id = find_account_id
|
||||
else:
|
||||
current_app.logger.debug(
|
||||
"No 'account' data found in API result - Unsupported PowerDNS version?"
|
||||
@ -208,7 +236,7 @@ class Domain(db.Model):
|
||||
Add a domain to power dns
|
||||
"""
|
||||
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
|
||||
domain_name = domain_name + '.'
|
||||
domain_ns = [ns + '.' for ns in domain_ns]
|
||||
@ -311,7 +339,7 @@ class Domain(db.Model):
|
||||
if not domain:
|
||||
return {'status': 'error', 'msg': 'Domain does not exist.'}
|
||||
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
|
||||
if soa_edit_api not in ["DEFAULT", "INCREASE", "EPOCH", "OFF"]:
|
||||
soa_edit_api = 'DEFAULT'
|
||||
@ -361,7 +389,7 @@ class Domain(db.Model):
|
||||
if not domain:
|
||||
return {'status': 'error', 'msg': 'Domain does not exist.'}
|
||||
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
|
||||
post_data = {"kind": kind, "masters": masters}
|
||||
|
||||
@ -421,7 +449,7 @@ class Domain(db.Model):
|
||||
if result['status'] == 'ok':
|
||||
history = History(msg='Add reverse lookup domain {0}'.format(
|
||||
domain_reverse_name),
|
||||
detail=str({
|
||||
detail=json.dumps({
|
||||
'domain_type': 'Master',
|
||||
'domain_master_ips': ''
|
||||
}),
|
||||
@ -681,7 +709,7 @@ class Domain(db.Model):
|
||||
"""
|
||||
domain = Domain.query.filter(Domain.name == domain_name).first()
|
||||
if domain:
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
try:
|
||||
# Enable API-RECTIFY for domain, BEFORE activating DNSSEC
|
||||
post_data = {"api_rectify": True}
|
||||
@ -747,7 +775,7 @@ class Domain(db.Model):
|
||||
"""
|
||||
domain = Domain.query.filter(Domain.name == domain_name).first()
|
||||
if domain:
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
try:
|
||||
# Deactivate DNSSEC
|
||||
jdata = utils.fetch_json(
|
||||
@ -806,7 +834,7 @@ class Domain(db.Model):
|
||||
else:
|
||||
return {'status': 'error', 'msg': 'This domain does not exist'}
|
||||
|
||||
def assoc_account(self, account_id):
|
||||
def assoc_account(self, account_id, update=True):
|
||||
"""
|
||||
Associate domain with a domain, specified by account id
|
||||
"""
|
||||
@ -821,7 +849,7 @@ class Domain(db.Model):
|
||||
if not domain:
|
||||
return {'status': False, 'msg': 'Domain does not exist'}
|
||||
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
|
||||
account_name = Account().get_name_by_id(account_id)
|
||||
|
||||
@ -842,7 +870,8 @@ class Domain(db.Model):
|
||||
current_app.logger.error(jdata['error'])
|
||||
return {'status': 'error', 'msg': jdata['error']}
|
||||
else:
|
||||
self.update()
|
||||
if update:
|
||||
self.update()
|
||||
msg_str = 'Account changed for domain {0} successfully'
|
||||
current_app.logger.info(msg_str.format(domain_name))
|
||||
return {'status': 'ok', 'msg': 'account changed successfully'}
|
||||
@ -879,3 +908,18 @@ class Domain(db.Model):
|
||||
DomainUser.user_id == user_id,
|
||||
AccountUser.user_id == user_id
|
||||
)).filter(Domain.id == self.id).first()
|
||||
|
||||
# Return None if this domain does not exist as record,
|
||||
# Return the parent domain that hold the record if exist
|
||||
def is_overriding(self, domain_name):
|
||||
upper_domain_name = '.'.join(domain_name.split('.')[1:])
|
||||
while upper_domain_name != '':
|
||||
if self.get_id_by_name(upper_domain_name.rstrip('.')) != None:
|
||||
upper_domain = self.get_domain_info(upper_domain_name)
|
||||
if 'rrsets' in upper_domain:
|
||||
for r in upper_domain['rrsets']:
|
||||
if domain_name.rstrip('.') in r['name'].rstrip('.'):
|
||||
current_app.logger.error('Domain already exists as a record: {} under domain: {}'.format(r['name'].rstrip('.'), upper_domain_name))
|
||||
return upper_domain_name
|
||||
upper_domain_name = '.'.join(upper_domain_name.split('.')[1:])
|
||||
return None
|
||||
|
@ -99,7 +99,7 @@ class Record(object):
|
||||
}
|
||||
|
||||
# Continue if the record is ready to be added
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
|
||||
try:
|
||||
jdata = utils.fetch_json(urljoin(
|
||||
@ -169,12 +169,12 @@ class Record(object):
|
||||
record['record_data'] = record['record_data'].replace('[ZONE]', domain_name)
|
||||
# Translate record name into punycode (IDN) as that's the only way
|
||||
# to convey non-ascii records to the dns server
|
||||
record['record_name'] = record['record_name'].encode('idna').decode()
|
||||
record['record_name'] = utils.to_idna(record["record_name"], "encode")
|
||||
#TODO: error handling
|
||||
# If the record is an alias (CNAME), we will also make sure that
|
||||
# the target domain is properly converted to punycode (IDN)
|
||||
if record["record_type"] == 'CNAME':
|
||||
record['record_data'] = record['record_data'].encode('idna').decode()
|
||||
if record['record_type'] == 'CNAME' or record['record_type'] == 'SOA':
|
||||
record['record_data'] = utils.to_idna(record['record_data'], 'encode')
|
||||
#TODO: error handling
|
||||
# If it is ipv6 reverse zone and PRETTY_IPV6_PTR is enabled,
|
||||
# We convert ipv6 address back to reverse record format
|
||||
@ -293,7 +293,7 @@ class Record(object):
|
||||
return new_rrsets, del_rrsets
|
||||
|
||||
def apply_rrsets(self, domain_name, rrsets):
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
jdata = utils.fetch_json(urljoin(
|
||||
self.PDNS_STATS_URL, self.API_EXTENDED_URL +
|
||||
'/servers/localhost/zones/{0}'.format(domain_name)),
|
||||
@ -303,10 +303,48 @@ class Record(object):
|
||||
data=rrsets)
|
||||
return jdata
|
||||
|
||||
@staticmethod
|
||||
def to_api_payload(new_rrsets, del_rrsets):
|
||||
"""Turn the given changes into a single api payload."""
|
||||
|
||||
def replace_for_api(rrset):
|
||||
"""Return a modified copy of the given RRset with changetype REPLACE."""
|
||||
if not rrset or rrset.get('changetype', None) != 'REPLACE':
|
||||
return rrset
|
||||
replace_copy = dict(rrset)
|
||||
# For compatibility with some backends: Remove comments from rrset if all are blank
|
||||
if not any((bool(c.get('content', None)) for c in replace_copy.get('comments', []))):
|
||||
replace_copy.pop('comments', None)
|
||||
return replace_copy
|
||||
|
||||
def rrset_in(needle, haystack):
|
||||
"""Return whether the given RRset (identified by name and type) is in the list."""
|
||||
for hay in haystack:
|
||||
if needle['name'] == hay['name'] and needle['type'] == hay['type']:
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete_for_api(rrset):
|
||||
"""Return a minified copy of the given RRset with changetype DELETE."""
|
||||
if not rrset or rrset.get('changetype', None) != 'DELETE':
|
||||
return rrset
|
||||
delete_copy = dict(rrset)
|
||||
delete_copy.pop('ttl', None)
|
||||
delete_copy.pop('records', None)
|
||||
delete_copy.pop('comments', None)
|
||||
return delete_copy
|
||||
|
||||
replaces = [replace_for_api(r) for r in new_rrsets]
|
||||
deletes = [delete_for_api(r) for r in del_rrsets if not rrset_in(r, replaces)]
|
||||
return {
|
||||
# order matters: first deletions, then additions+changes
|
||||
'rrsets': deletes + replaces
|
||||
}
|
||||
|
||||
def apply(self, domain_name, submitted_records):
|
||||
"""
|
||||
Apply record changes to a domain. This function
|
||||
will make 2 calls to the PDNS API to DELETE and
|
||||
will make 1 call to the PDNS API to DELETE and
|
||||
REPLACE records (rrsets)
|
||||
"""
|
||||
current_app.logger.debug(
|
||||
@ -315,68 +353,24 @@ class Record(object):
|
||||
# Get the list of rrsets to be added and deleted
|
||||
new_rrsets, del_rrsets = self.compare(domain_name, submitted_records)
|
||||
|
||||
# Remove blank comments from rrsets for compatibility with some backends
|
||||
def remove_blank_comments(rrset):
|
||||
if not rrset['comments']:
|
||||
del rrset['comments']
|
||||
elif isinstance(rrset['comments'], list):
|
||||
# Merge all non-blank comment values into a list
|
||||
merged_comments = [
|
||||
v
|
||||
for c in rrset['comments']
|
||||
for v in c.values()
|
||||
if v
|
||||
]
|
||||
# Delete comment if all values are blank (len(merged_comments) == 0)
|
||||
if not merged_comments:
|
||||
del rrset['comments']
|
||||
|
||||
for r in new_rrsets['rrsets']:
|
||||
remove_blank_comments(r)
|
||||
|
||||
for r in del_rrsets['rrsets']:
|
||||
remove_blank_comments(r)
|
||||
# The history logic still needs *all* the deletes with full data to display a useful diff.
|
||||
# So create a "minified" copy for the api call, and return the original data back up
|
||||
api_payload = self.to_api_payload(new_rrsets['rrsets'], del_rrsets['rrsets'])
|
||||
current_app.logger.debug(f"api payload: \n{utils.pretty_json(api_payload)}")
|
||||
|
||||
# Submit the changes to PDNS API
|
||||
try:
|
||||
if del_rrsets["rrsets"]:
|
||||
result = self.apply_rrsets(domain_name, del_rrsets)
|
||||
if api_payload["rrsets"]:
|
||||
result = self.apply_rrsets(domain_name, api_payload)
|
||||
if 'error' in result.keys():
|
||||
current_app.logger.error(
|
||||
'Cannot apply record changes with deleting rrsets step. PDNS error: {}'
|
||||
'Cannot apply record changes. PDNS error: {}'
|
||||
.format(result['error']))
|
||||
return {
|
||||
'status': 'error',
|
||||
'msg': result['error'].replace("'", "")
|
||||
}
|
||||
|
||||
if new_rrsets["rrsets"]:
|
||||
result = self.apply_rrsets(domain_name, new_rrsets)
|
||||
if 'error' in result.keys():
|
||||
current_app.logger.error(
|
||||
'Cannot apply record changes with adding rrsets step. PDNS error: {}'
|
||||
.format(result['error']))
|
||||
|
||||
# rollback - re-add the removed record if the adding operation is failed.
|
||||
if del_rrsets["rrsets"]:
|
||||
rollback_rrests = del_rrsets
|
||||
for r in del_rrsets["rrsets"]:
|
||||
r['changetype'] = 'REPLACE'
|
||||
rollback = self.apply_rrsets(domain_name, rollback_rrests)
|
||||
if 'error' in rollback.keys():
|
||||
return dict(status='error',
|
||||
msg='Failed to apply changes. Cannot rollback previous failed operation: {}'
|
||||
.format(rollback['error'].replace("'", "")))
|
||||
else:
|
||||
return dict(status='error',
|
||||
msg='Failed to apply changes. Rolled back previous failed operation: {}'
|
||||
.format(result['error'].replace("'", "")))
|
||||
else:
|
||||
return {
|
||||
'status': 'error',
|
||||
'msg': result['error'].replace("'", "")
|
||||
}
|
||||
|
||||
self.auto_ptr(domain_name, new_rrsets, del_rrsets)
|
||||
self.update_db_serial(domain_name)
|
||||
current_app.logger.info('Record was applied successfully.')
|
||||
@ -500,7 +494,7 @@ class Record(object):
|
||||
"""
|
||||
Delete a record from domain
|
||||
"""
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
data = {
|
||||
"rrsets": [{
|
||||
"name": self.name.rstrip('.') + '.',
|
||||
@ -562,7 +556,7 @@ class Record(object):
|
||||
"""
|
||||
Update single record
|
||||
"""
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY}
|
||||
headers = {'X-API-Key': self.PDNS_API_KEY, 'Content-Type': 'application/json'}
|
||||
|
||||
data = {
|
||||
"rrsets": [{
|
||||
|
@ -28,7 +28,7 @@ class Setting(db.Model):
|
||||
'allow_user_create_domain': False,
|
||||
'allow_user_remove_domain': False,
|
||||
'allow_user_view_history': False,
|
||||
'delete_sso_accounts': False,
|
||||
'delete_sso_accounts': False,
|
||||
'bg_domain_updates': False,
|
||||
'enable_api_rr_history': True,
|
||||
'site_name': 'PowerDNS-Admin',
|
||||
@ -110,6 +110,7 @@ class Setting(db.Model):
|
||||
'oidc_oauth_email': 'email',
|
||||
'oidc_oauth_account_name_property': '',
|
||||
'oidc_oauth_account_description_property': '',
|
||||
'enforce_api_ttl': False,
|
||||
'forward_records_allow_edit': {
|
||||
'A': True,
|
||||
'AAAA': True,
|
||||
@ -189,7 +190,11 @@ class Setting(db.Model):
|
||||
'ttl_options': '1 minute,5 minutes,30 minutes,60 minutes,24 hours',
|
||||
'otp_field_enabled': True,
|
||||
'custom_css': '',
|
||||
'max_history_records': 1000
|
||||
'otp_force': False,
|
||||
'max_history_records': 1000,
|
||||
'deny_domain_override': False,
|
||||
'account_name_extra_chars': False,
|
||||
'gravatar_enabled': False,
|
||||
}
|
||||
|
||||
def __init__(self, id=None, name=None, value=None):
|
||||
@ -270,15 +275,15 @@ class Setting(db.Model):
|
||||
|
||||
def get(self, setting):
|
||||
if setting in self.defaults:
|
||||
|
||||
|
||||
if setting.upper() in current_app.config:
|
||||
result = current_app.config[setting.upper()]
|
||||
else:
|
||||
result = self.query.filter(Setting.name == setting).first()
|
||||
|
||||
|
||||
if result is not None:
|
||||
if hasattr(result,'value'):
|
||||
result = result.value
|
||||
result = result.value
|
||||
return strtobool(result) if result in [
|
||||
'True', 'False'
|
||||
] else result
|
||||
@ -286,7 +291,7 @@ class Setting(db.Model):
|
||||
return self.defaults[setting]
|
||||
else:
|
||||
current_app.logger.error('Unknown setting queried: {0}'.format(setting))
|
||||
|
||||
|
||||
def get_records_allow_to_edit(self):
|
||||
return list(
|
||||
set(self.get_forward_records_allow_to_edit() +
|
||||
|
@ -8,6 +8,9 @@ import ldap.filter
|
||||
from flask import current_app
|
||||
from flask_login import AnonymousUserMixin
|
||||
from sqlalchemy import orm
|
||||
import qrcode as qrc
|
||||
import qrcode.image.svg as qrc_svg
|
||||
from io import BytesIO
|
||||
|
||||
from .base import db
|
||||
from .role import Role
|
||||
@ -80,10 +83,7 @@ class User(db.Model):
|
||||
return False
|
||||
|
||||
def get_id(self):
|
||||
try:
|
||||
return unicode(self.id) # python 2
|
||||
except NameError:
|
||||
return str(self.id) # python 3
|
||||
return str(self.id)
|
||||
|
||||
def __repr__(self):
|
||||
return '<User {0}>'.format(self.username)
|
||||
@ -94,7 +94,7 @@ class User(db.Model):
|
||||
|
||||
def verify_totp(self, token):
|
||||
totp = pyotp.TOTP(self.otp_secret)
|
||||
return totp.verify(token)
|
||||
return totp.verify(token, valid_window = 5)
|
||||
|
||||
def get_hashed_password(self, plain_text_password=None):
|
||||
# Hash a password for the first time
|
||||
@ -107,9 +107,10 @@ class User(db.Model):
|
||||
|
||||
def check_password(self, hashed_password):
|
||||
# Check hashed password. Using bcrypt, the salt is saved into the hash itself
|
||||
if (self.plain_text_password):
|
||||
return bcrypt.checkpw(self.plain_text_password.encode('utf-8'),
|
||||
hashed_password.encode('utf-8'))
|
||||
if hasattr(self, "plain_text_password"):
|
||||
if self.plain_text_password != None:
|
||||
return bcrypt.checkpw(self.plain_text_password.encode('utf-8'),
|
||||
hashed_password.encode('utf-8'))
|
||||
return False
|
||||
|
||||
def get_user_info_by_id(self):
|
||||
@ -125,7 +126,6 @@ class User(db.Model):
|
||||
conn = ldap.initialize(Setting().get('ldap_uri'))
|
||||
conn.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF)
|
||||
conn.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||
conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
|
||||
conn.set_option(ldap.OPT_X_TLS_DEMAND, True)
|
||||
conn.set_option(ldap.OPT_DEBUG_LEVEL, 255)
|
||||
conn.protocol_version = ldap.VERSION3
|
||||
@ -171,28 +171,6 @@ class User(db.Model):
|
||||
current_app.logger.error(e)
|
||||
return False
|
||||
|
||||
def ad_recursive_groups(self, groupDN):
|
||||
"""
|
||||
Recursively list groups belonging to a group. It will allow checking deep in the Active Directory
|
||||
whether a user is allowed to enter or not
|
||||
"""
|
||||
LDAP_BASE_DN = Setting().get('ldap_base_dn')
|
||||
groupSearchFilter = "(&(objectcategory=group)(member=%s))" % ldap.filter.escape_filter_chars(
|
||||
groupDN)
|
||||
result = [groupDN]
|
||||
try:
|
||||
groups = self.ldap_search(groupSearchFilter, LDAP_BASE_DN)
|
||||
for group in groups:
|
||||
result += [group[0][0]]
|
||||
if 'memberOf' in group[0][1]:
|
||||
for member in group[0][1]['memberOf']:
|
||||
result += self.ad_recursive_groups(
|
||||
member.decode("utf-8"))
|
||||
return result
|
||||
except ldap.LDAPError as e:
|
||||
current_app.logger.exception("Recursive AD Group search error")
|
||||
return result
|
||||
|
||||
def is_validate(self, method, src_ip='', trust_user=False):
|
||||
"""
|
||||
Validate user credential
|
||||
@ -304,7 +282,17 @@ class User(db.Model):
|
||||
LDAP_USER_GROUP))
|
||||
return False
|
||||
elif LDAP_TYPE == 'ad':
|
||||
user_ldap_groups = []
|
||||
ldap_admin_group_filter, ldap_operator_group, ldap_user_group = "", "", ""
|
||||
if LDAP_ADMIN_GROUP:
|
||||
ldap_admin_group_filter = "(memberOf:1.2.840.113556.1.4.1941:={0})".format(LDAP_ADMIN_GROUP)
|
||||
if LDAP_OPERATOR_GROUP:
|
||||
ldap_operator_group = "(memberOf:1.2.840.113556.1.4.1941:={0})".format(LDAP_OPERATOR_GROUP)
|
||||
if LDAP_USER_GROUP:
|
||||
ldap_user_group = "(memberOf:1.2.840.113556.1.4.1941:={0})".format(LDAP_USER_GROUP)
|
||||
searchFilter = "(&({0}={1})(|{2}{3}{4}))".format(LDAP_FILTER_USERNAME, self.username,
|
||||
ldap_admin_group_filter,
|
||||
ldap_operator_group, ldap_user_group)
|
||||
ldap_result = self.ldap_search(searchFilter, LDAP_BASE_DN)
|
||||
user_ad_member_of = ldap_result[0][0][1].get(
|
||||
'memberOf')
|
||||
|
||||
@ -314,26 +302,21 @@ class User(db.Model):
|
||||
.format(self.username))
|
||||
return False
|
||||
|
||||
for group in [
|
||||
g.decode("utf-8")
|
||||
for g in user_ad_member_of
|
||||
]:
|
||||
user_ldap_groups += self.ad_recursive_groups(
|
||||
group)
|
||||
user_ad_member_of = [g.decode("utf-8") for g in user_ad_member_of]
|
||||
|
||||
if (LDAP_ADMIN_GROUP in user_ldap_groups):
|
||||
if (LDAP_ADMIN_GROUP in user_ad_member_of):
|
||||
role_name = 'Administrator'
|
||||
current_app.logger.info(
|
||||
'User {0} is part of the "{1}" group that allows admin access to PowerDNS-Admin'
|
||||
.format(self.username,
|
||||
LDAP_ADMIN_GROUP))
|
||||
elif (LDAP_OPERATOR_GROUP in user_ldap_groups):
|
||||
elif (LDAP_OPERATOR_GROUP in user_ad_member_of):
|
||||
role_name = 'Operator'
|
||||
current_app.logger.info(
|
||||
'User {0} is part of the "{1}" group that allows operator access to PowerDNS-Admin'
|
||||
.format(self.username,
|
||||
LDAP_OPERATOR_GROUP))
|
||||
elif (LDAP_USER_GROUP in user_ldap_groups):
|
||||
elif (LDAP_USER_GROUP in user_ad_member_of):
|
||||
current_app.logger.info(
|
||||
'User {0} is part of the "{1}" group that allows user access to PowerDNS-Admin'
|
||||
.format(self.username,
|
||||
@ -439,8 +422,12 @@ class User(db.Model):
|
||||
self.role_id = Role.query.filter_by(
|
||||
name='Administrator').first().id
|
||||
|
||||
self.password = self.get_hashed_password(
|
||||
self.plain_text_password) if self.plain_text_password else '*'
|
||||
if hasattr(self, "plain_text_password"):
|
||||
if self.plain_text_password != None:
|
||||
self.password = self.get_hashed_password(
|
||||
self.plain_text_password)
|
||||
else:
|
||||
self.password = '*'
|
||||
|
||||
if self.password and self.password != '*':
|
||||
self.password = self.password.decode("utf-8")
|
||||
@ -476,9 +463,10 @@ class User(db.Model):
|
||||
user.email = self.email
|
||||
|
||||
# store new password hash (only if changed)
|
||||
if self.plain_text_password:
|
||||
user.password = self.get_hashed_password(
|
||||
self.plain_text_password).decode("utf-8")
|
||||
if hasattr(self, "plain_text_password"):
|
||||
if self.plain_text_password != None:
|
||||
user.password = self.get_hashed_password(
|
||||
self.plain_text_password).decode("utf-8")
|
||||
|
||||
db.session.commit()
|
||||
return {'status': True, 'msg': 'User updated successfully'}
|
||||
@ -493,9 +481,11 @@ class User(db.Model):
|
||||
|
||||
user.firstname = self.firstname if self.firstname else user.firstname
|
||||
user.lastname = self.lastname if self.lastname else user.lastname
|
||||
user.password = self.get_hashed_password(
|
||||
self.plain_text_password).decode(
|
||||
"utf-8") if self.plain_text_password else user.password
|
||||
|
||||
if hasattr(self, "plain_text_password"):
|
||||
if self.plain_text_password != None:
|
||||
user.password = self.get_hashed_password(
|
||||
self.plain_text_password).decode("utf-8")
|
||||
|
||||
if self.email:
|
||||
# Can not update to a new email that
|
||||
@ -634,6 +624,13 @@ class User(db.Model):
|
||||
accounts.append(q[1])
|
||||
return accounts
|
||||
|
||||
def get_qrcode_value(self):
|
||||
img = qrc.make(self.get_totp_uri(),
|
||||
image_factory=qrc_svg.SvgPathImage)
|
||||
stream = BytesIO()
|
||||
img.save(stream)
|
||||
return stream.getvalue()
|
||||
|
||||
|
||||
def read_entitlements(self, key):
|
||||
"""
|
||||
@ -787,14 +784,11 @@ def get_role_names(roles):
|
||||
"""
|
||||
roles_list=[]
|
||||
for role in roles:
|
||||
roles_list.append(role.name)
|
||||
roles_list.append(role.name)
|
||||
return roles_list
|
||||
|
||||
|
||||
def getUserInfo(DomainsOrAccounts):
|
||||
current=[]
|
||||
for DomainOrAccount in DomainsOrAccounts:
|
||||
current.append(DomainOrAccount.name)
|
||||
return current
|
||||
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user