mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-01-25 03:04:39 +00:00
2656242b45
I added the parentheses to the `db.session.rollback` line to call the method, which will now properly roll back any changes made to the database if an error occurs.
143 lines
5.2 KiB
Python
143 lines
5.2 KiB
Python
import secrets
|
|
import string
|
|
import bcrypt
|
|
from flask import current_app
|
|
|
|
from .base import db
|
|
from ..models.role import Role
|
|
from ..models.domain import Domain
|
|
from ..models.account import Account
|
|
|
|
class ApiKey(db.Model):
|
|
__tablename__ = "apikey"
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
key = db.Column(db.String(255), unique=True, nullable=False)
|
|
description = db.Column(db.String(255))
|
|
role_id = db.Column(db.Integer, db.ForeignKey('role.id'))
|
|
role = db.relationship('Role', back_populates="apikeys", lazy=True)
|
|
domains = db.relationship("Domain",
|
|
secondary="domain_apikey",
|
|
back_populates="apikeys")
|
|
accounts = db.relationship("Account",
|
|
secondary="apikey_account",
|
|
back_populates="apikeys")
|
|
|
|
def __init__(self, key=None, desc=None, role_name=None, domains=[], accounts=[]):
|
|
self.id = None
|
|
self.description = desc
|
|
self.role_name = role_name
|
|
self.domains[:] = domains
|
|
self.accounts[:] = accounts
|
|
if not key:
|
|
rand_key = ''.join(
|
|
secrets.choice(string.ascii_letters + string.digits)
|
|
for _ in range(15))
|
|
self.plain_key = rand_key
|
|
self.key = self.get_hashed_password(rand_key).decode('utf-8')
|
|
current_app.logger.debug("Hashed key: {0}".format(self.key))
|
|
else:
|
|
self.key = key
|
|
|
|
def create(self):
|
|
try:
|
|
self.role = Role.query.filter(Role.name == self.role_name).first()
|
|
db.session.add(self)
|
|
db.session.commit()
|
|
except Exception as e:
|
|
current_app.logger.error('Can not update api key table. Error: {0}'.format(e))
|
|
db.session.rollback()
|
|
raise e
|
|
|
|
def delete(self):
|
|
try:
|
|
db.session.delete(self)
|
|
db.session.commit()
|
|
except Exception as e:
|
|
msg_str = 'Can not delete api key template. Error: {0}'
|
|
current_app.logger.error(msg_str.format(e))
|
|
db.session.rollback()
|
|
raise e
|
|
|
|
def update(self, role_name=None, description=None, domains=None, accounts=None):
|
|
try:
|
|
if role_name:
|
|
role = Role.query.filter(Role.name == role_name).first()
|
|
self.role_id = role.id
|
|
|
|
if description:
|
|
self.description = description
|
|
|
|
if domains is not None:
|
|
domain_object_list = Domain.query \
|
|
.filter(Domain.name.in_(domains)) \
|
|
.all()
|
|
self.domains[:] = domain_object_list
|
|
|
|
if accounts is not None:
|
|
account_object_list = Account.query \
|
|
.filter(Account.name.in_(accounts)) \
|
|
.all()
|
|
self.accounts[:] = account_object_list
|
|
|
|
db.session.commit()
|
|
except Exception as e:
|
|
msg_str = 'Update of apikey failed. Error: {0}'
|
|
current_app.logger.error(msg_str.format(e))
|
|
db.session.rollback() # fixed line
|
|
raise e
|
|
|
|
def get_hashed_password(self, plain_text_password=None):
|
|
# Hash a password for the first time
|
|
# (Using bcrypt, the salt is saved into the hash itself)
|
|
if plain_text_password is None:
|
|
return plain_text_password
|
|
|
|
if plain_text_password:
|
|
pw = plain_text_password
|
|
else:
|
|
pw = self.plain_text_password
|
|
|
|
# The salt value is currently re-used here intentionally because
|
|
# the implementation relies on just the API key's value itself
|
|
# for database lookup: ApiKey.is_validate() would have no way of
|
|
# discerning whether any given key is valid if bcrypt.gensalt()
|
|
# was used. As far as is known, this is fine as long as the
|
|
# value of new API keys is randomly generated in a
|
|
# cryptographically secure fashion, as this then makes
|
|
# expendable as an exception the otherwise vital protection of
|
|
# proper salting as provided by bcrypt.gensalt().
|
|
return bcrypt.hashpw(pw.encode('utf-8'),
|
|
current_app.config.get('SALT').encode('utf-8'))
|
|
|
|
def check_password(self, hashed_password):
|
|
# Check hashed password. Using bcrypt,
|
|
# the salt is saved into the hash itself
|
|
if self.plain_text_password:
|
|
return bcrypt.checkpw(self.plain_text_password.encode('utf-8'),
|
|
hashed_password.encode('utf-8'))
|
|
return False
|
|
|
|
def is_validate(self, method, src_ip=''):
|
|
"""
|
|
Validate user credential
|
|
"""
|
|
if method == 'LOCAL':
|
|
passw_hash = self.get_hashed_password(self.plain_text_password)
|
|
apikey = ApiKey.query \
|
|
.filter(ApiKey.key == passw_hash.decode('utf-8')) \
|
|
.first()
|
|
|
|
if not apikey:
|
|
raise Exception("Unauthorized")
|
|
|
|
return apikey
|
|
|
|
def associate_account(self, account):
|
|
return True
|
|
|
|
def dissociate_account(self, account):
|
|
return True
|
|
|
|
def get_accounts(self):
|
|
return True
|