mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-06-14 12:06:06 +00:00
Provision PDA user privileges based On LDAP Attributes (#980)
This commit is contained in:

committed by
GitHub

parent
32983635c6
commit
6e04d0419b
@ -1,6 +1,7 @@
|
||||
import json
|
||||
import datetime
|
||||
import traceback
|
||||
import re
|
||||
from base64 import b64encode
|
||||
from ast import literal_eval
|
||||
from flask import Blueprint, render_template, make_response, url_for, current_app, request, redirect, jsonify, abort, flash, session
|
||||
@ -829,6 +830,27 @@ def setting_authentication():
|
||||
Setting().set('ldap_user_group',
|
||||
request.form.get('ldap_user_group'))
|
||||
Setting().set('ldap_domain', request.form.get('ldap_domain'))
|
||||
Setting().set(
|
||||
'autoprovisioning', True
|
||||
if request.form.get('autoprovisioning') == 'ON' else False)
|
||||
Setting().set('autoprovisioning_attribute',
|
||||
request.form.get('autoprovisioning_attribute'))
|
||||
|
||||
if request.form.get('autoprovisioning')=='ON':
|
||||
if validateURN(request.form.get('urn_value')):
|
||||
Setting().set('urn_value',
|
||||
request.form.get('urn_value'))
|
||||
else:
|
||||
return render_template('admin_setting_authentication.html',
|
||||
error="Invalid urn")
|
||||
else:
|
||||
Setting().set('urn_value',
|
||||
request.form.get('urn_value'))
|
||||
|
||||
Setting().set('purge', True
|
||||
if request.form.get('purge') == 'ON' else False)
|
||||
|
||||
|
||||
result = {'status': True, 'msg': 'Saved successfully'}
|
||||
elif conf_type == 'google':
|
||||
google_oauth_enabled = True if request.form.get(
|
||||
@ -1286,3 +1308,29 @@ def global_search():
|
||||
pass
|
||||
|
||||
return render_template('admin_global_search.html', domains=domains, records=records, comments=comments)
|
||||
|
||||
def validateURN(value):
|
||||
NID_PATTERN = re.compile(r'^[0-9a-z][0-9a-z-]{1,31}$', flags=re.IGNORECASE)
|
||||
NSS_PCHAR = '[a-z0-9-._~]|%[a-f0-9]{2}|[!$&\'()*+,;=]|:|@'
|
||||
NSS_PATTERN = re.compile(fr'^({NSS_PCHAR})({NSS_PCHAR}|/|\?)*$', re.IGNORECASE)
|
||||
|
||||
prefix=value.split(':')
|
||||
if (len(prefix)<3):
|
||||
current_app.logger.warning( "Too small urn prefix" )
|
||||
return False
|
||||
|
||||
urn=prefix[0]
|
||||
nid=prefix[1]
|
||||
nss=value.replace(urn+":"+nid+":", "")
|
||||
|
||||
if not urn.lower()=="urn":
|
||||
current_app.logger.warning( urn + ' contains invalid characters ' )
|
||||
return False
|
||||
if not re.match(NID_PATTERN, nid.lower()):
|
||||
current_app.logger.warning( nid + ' contains invalid characters ' )
|
||||
return False
|
||||
if not re.match(NSS_PATTERN, nss):
|
||||
current_app.logger.warning( nss + ' contains invalid characters ' )
|
||||
return False
|
||||
|
||||
return True
|
||||
|
@ -473,10 +473,39 @@ def login():
|
||||
saml_enabled=SAML_ENABLED,
|
||||
error='Token required')
|
||||
|
||||
if Setting().get('autoprovisioning') and auth_method!='LOCAL':
|
||||
urn_value=Setting().get('urn_value')
|
||||
Entitlements=user.read_entitlements(Setting().get('autoprovisioning_attribute'))
|
||||
if len(Entitlements)==0 and Setting().get('purge'):
|
||||
user.set_role("User")
|
||||
user.revoke_privilege(True)
|
||||
|
||||
elif len(Entitlements)!=0:
|
||||
if checkForPDAEntries(Entitlements, urn_value):
|
||||
user.updateUser(Entitlements)
|
||||
else:
|
||||
current_app.logger.warning('Not a single powerdns-admin record was found, possibly a typo in the prefix')
|
||||
if Setting().get('purge'):
|
||||
user.set_role("User")
|
||||
user.revoke_privilege(True)
|
||||
current_app.logger.warning('Procceding to revoke every privilige from ' + user.username + '.' )
|
||||
|
||||
login_user(user, remember=remember_me)
|
||||
signin_history(user.username, 'LOCAL', True)
|
||||
return redirect(session.get('next', url_for('index.index')))
|
||||
|
||||
def checkForPDAEntries(Entitlements, urn_value):
|
||||
"""
|
||||
Run through every record located in the ldap attribute given and determine if there are any valid powerdns-admin records
|
||||
"""
|
||||
urnArguments=[x.lower() for x in urn_value.split(':')]
|
||||
for Entitlement in Entitlements:
|
||||
entArguments=Entitlement.split(':powerdns-admin')
|
||||
entArguments=[x.lower() for x in entArguments[0].split(':')]
|
||||
if (entArguments==urnArguments):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def clear_session():
|
||||
session.pop('user_id', None)
|
||||
|
Reference in New Issue
Block a user