Implement per account domain access

Added the possibility for assigning users to an account, providing access to all domains associated with that account automatically.

This makes management easier, especially in installations with lots of domains and lots of managing entities.

The old style per-domain permissions are still there and working as usual. The two methods work perfectly side-by-side and are analogous to "user" (per-domain) and "group" (account) permissions as we know them from Active Directory and such places.

(cherry picked from commit 34fbc634d2848a7f76dc89a03dd8c0604068cc17)
This commit is contained in:
Thomas M Steenholdt 2018-06-05 16:41:39 -02:00
parent a3a58f16a5
commit a4af4ad4b3
3 changed files with 131 additions and 16 deletions

View File

@ -355,25 +355,48 @@ class User(db.Model):
db.session.rollback()
return False
def get_account_query(self):
"""
Get query for account to which the user is associated.
"""
return db.session.query(Account) \
.outerjoin(AccountUser, Account.id==AccountUser.account_id) \
.filter(AccountUser.user_id==self.id)
def get_account(self):
"""
Get all accounts to which the user is associated.
"""
return self.get_account_query()
def get_domain_query(self):
return db.session.query(User, DomainUser, Domain) \
.filter(User.id == self.id) \
.filter(User.id == DomainUser.user_id) \
.filter(Domain.id == DomainUser.domain_id)
"""
Get query for domain to which the user has access permission.
This includes direct domain permission AND permission through
account membership
"""
return db.session.query(Domain) \
.outerjoin(DomainUser, Domain.id==DomainUser.domain_id) \
.outerjoin(Account, Domain.account_id==Account.id) \
.outerjoin(AccountUser, Account.id==AccountUser.account_id) \
.filter(db.or_(DomainUser.user_id==User.id, AccountUser.user_id==User.id)) \
.filter(User.id==self.id)
def get_domain(self):
"""
Get domains which user has permission to
access
"""
return [q[2] for q in self.get_domain_query()]
return self.get_domain_query()
def delete(self):
"""
Delete a user
"""
# revoke all user privileges first
# revoke all user privileges and account associations first
self.revoke_privilege()
for a in self.get_account():
a.revoke_privileges_by_id(self.id)
try:
User.query.filter(User.username == self.username).delete()
@ -516,8 +539,9 @@ class Account(db.Model):
"""
Delete an account
"""
# unassociate all domains first
# unassociate all domains and users first
self.unassociate_domains()
self.grant_privileges([])
try:
Account.query.filter(Account.name == self.name).delete()
@ -529,6 +553,56 @@ class Account(db.Model):
logging.error('Cannot delete account {0} from DB'.format(self.username))
return False
def get_user(self):
"""
Get users (id) associated with this account
"""
user_ids = []
query = db.session.query(AccountUser, Account).filter(User.id==AccountUser.user_id).filter(Account.id==AccountUser.account_id).filter(Account.name==self.name).all()
for q in query:
user_ids.append(q[0].user_id)
return user_ids
def grant_privileges(self, new_user_list):
"""
Reconfigure account_user table
"""
account_id = self.get_id_by_name(self.name)
account_user_ids = self.get_user()
new_user_ids = [u.id for u in User.query.filter(User.username.in_(new_user_list)).all()] if new_user_list else []
removed_ids = list(set(account_user_ids).difference(new_user_ids))
added_ids = list(set(new_user_ids).difference(account_user_ids))
try:
for uid in removed_ids:
AccountUser.query.filter(AccountUser.user_id == uid).filter(AccountUser.account_id==account_id).delete()
db.session.commit()
except:
db.session.rollback()
logging.error('Cannot revoke user privielges on account {0}'.format(self.name))
try:
for uid in added_ids:
au = AccountUser(account_id, uid)
db.session.add(au)
db.session.commit()
except:
db.session.rollback()
logging.error('Cannot grant user privileges to account {0}'.format(self.name))
def revoke_privileges_by_id(self, user_id):
"""
Remove a single user from prigilege list based on user_id
"""
new_uids = [u for u in self.get_user() if u != user_id]
users = []
for uid in new_uids:
users.append(User(id=uid).get_user_info_by_id().username)
self.grant_privileges(users)
class Role(db.Model):
id = db.Column(db.Integer, primary_key = True)
@ -1061,6 +1135,20 @@ class DomainUser(db.Model):
return '<Domain_User {0} {1}>'.format(self.domain_id, self.user_id)
class AccountUser(db.Model):
__tablename__ = 'account_user'
id = db.Column(db.Integer, primary_key = True)
account_id = db.Column(db.Integer, db.ForeignKey('account.id'), nullable = False)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable = False)
def __init__(self, account_id, user_id):
self.account_id = account_id
self.user_id = user_id
def __repr__(self):
return '<Account_User {0} {1}>'.format(self.account_id, self.user_id)
class Record(object):
"""
This is not a model, it's just an object

View File

@ -67,6 +67,21 @@
<span class="fa fa-envelope form-control-feedback"></span>
</div>
</div>
<div class="box-header with-border">
<h3 class="box-title">Access Control</h3>
</div>
<div class="box-body">
<p>Users on the right have access to manage records in all domains
associated with the account.</p>
<p>Click on users to move between columns.</p>
<div class="form-group col-xs-2">
<select multiple="multiple" class="form-control" id="account_multi_user" name="account_multi_user">
{% for user in users %}
<option {% if user.id in account_user_ids %}selected{% endif %} value="{{ user.username }}">{{ user.username }}</option>
{% endfor %}
</select>
</div>
</div>
<div class="box-footer">
<button type="submit" class="btn btn-flat btn-primary">{% if create %}Create{% else %}Update{% endif %} Account</button>
</div>
@ -95,4 +110,9 @@
</div>
</div>
</section>
{% endblock %}
{% block extrascripts %}
<script>
$("#account_multi_user").multiSelect();
</script>
{% endblock %}

View File

@ -525,9 +525,6 @@ def dashboard_domains():
if length != -1:
domains = domains[start:start + length]
if current_user.role.name != 'Administrator':
domains = [d[2] for d in domains]
data = []
for domain in domains:
data.append([
@ -1179,31 +1176,40 @@ def admin_manageuser():
@login_required
@admin_role_required
def admin_editaccount(account_name=None):
users = User.query.all()
if request.method == 'GET':
if account_name is None:
return render_template('admin_editaccount.html', create=1)
return render_template('admin_editaccount.html', users=users, create=1)
else:
account = Account.query.filter(Account.name == account_name).first()
return render_template('admin_editaccount.html', account=account, create=0)
account_user_ids = account.get_user()
return render_template('admin_editaccount.html', account=account, account_user_ids=account_user_ids, users=users, create=0)
if request.method == 'POST':
fdata = request.form
new_user_list = request.form.getlist('account_multi_user')
# on POST, synthesize account and account_user_ids from form data
if not account_name:
account_name = fdata['accountname']
account = Account(name=account_name, description=fdata['accountdescription'], contact=fdata['accountcontact'], mail=fdata['accountmail'])
account_user_ids = []
for username in new_user_list:
userid = User(username=username).get_user_info_by_username().id
account_user_ids.append(userid)
create = int(fdata['create'])
if create:
# account __init__ sanitizes and lowercases the name, so to manage expectations
# we let the user reenter the name until it's not empty and it's valid (ignoring the case)
if account.name == "" or account.name != account_name.lower():
return render_template('admin_editaccount.html', account=account, create=create, invalid_accountname=True)
return render_template('admin_editaccount.html', account=account, account_user_ids=account_user_ids, users=users, create=create, invalid_accountname=True)
if Account.query.filter(Account.name == account_name).first():
return render_template('admin_editaccount.html', account=account, create=create, duplicate_accountname=True)
if Account.query.filter(Account.name == account.name).first():
return render_template('admin_editaccount.html', account=account, account_user_ids=account_user_ids, users=users, create=create, duplicate_accountname=True)
result = account.create_account()
history = History(msg='Create account {0}'.format(account.name), created_by=current_user.username)
@ -1213,10 +1219,11 @@ def admin_editaccount(account_name=None):
history = History(msg='Update account {0}'.format(account.name), created_by=current_user.username)
if result['status']:
account.grant_privileges(new_user_list)
history.add()
return redirect(url_for('admin_manageaccount'))
return render_template('admin_editaccount.html', account=account, create=create, error=result['msg'])
return render_template('admin_editaccount.html', account=account, account_user_ids=account_user_ids, users=users, create=create, error=result['msg'])
@app.route('/admin/manageaccount', methods=['GET', 'POST'])