mirror of
https://github.com/cwinfo/powerdns-admin.git
synced 2025-07-02 04:16:19 +00:00
Add OTP authentication feature
This commit is contained in:
@ -1,10 +1,12 @@
|
||||
import os
|
||||
import os
|
||||
import ldap
|
||||
import time
|
||||
import base64
|
||||
import bcrypt
|
||||
import urlparse
|
||||
import itertools
|
||||
import traceback
|
||||
import onetimepass
|
||||
|
||||
from datetime import datetime
|
||||
from distutils.version import StrictVersion
|
||||
@ -51,9 +53,10 @@ class User(db.Model):
|
||||
lastname = db.Column(db.String(64))
|
||||
email = db.Column(db.String(128))
|
||||
avatar = db.Column(db.String(128))
|
||||
otp_secret = db.Column(db.String(16))
|
||||
role_id = db.Column(db.Integer, db.ForeignKey('role.id'))
|
||||
|
||||
def __init__(self, id=None, username=None, password=None, plain_text_password=None, firstname=None, lastname=None, role_id=None, email=None, avatar=None, reload_info=True):
|
||||
def __init__(self, id=None, username=None, password=None, plain_text_password=None, firstname=None, lastname=None, role_id=None, email=None, avatar=None, otp_secret=None, reload_info=True):
|
||||
self.id = id
|
||||
self.username = username
|
||||
self.password = password
|
||||
@ -63,6 +66,7 @@ class User(db.Model):
|
||||
self.role_id = role_id
|
||||
self.email = email
|
||||
self.avatar = avatar
|
||||
self.otp_secret = otp_secret
|
||||
|
||||
if reload_info:
|
||||
user_info = self.get_user_info_by_id() if id else self.get_user_info_by_username()
|
||||
@ -74,6 +78,7 @@ class User(db.Model):
|
||||
self.lastname = user_info.lastname
|
||||
self.email = user_info.email
|
||||
self.role_id = user_info.role_id
|
||||
self.otp_secret = user_info.otp_secret
|
||||
|
||||
def is_authenticated(self):
|
||||
return True
|
||||
@ -93,6 +98,12 @@ class User(db.Model):
|
||||
def __repr__(self):
|
||||
return '<User %r>' % (self.username)
|
||||
|
||||
def get_totp_uri(self):
|
||||
return 'otpauth://totp/PowerDNS-Admin:%s?secret=%s&issuer=PowerDNS-Admin' % (self.username, self.otp_secret)
|
||||
|
||||
def verify_totp(self, token):
|
||||
return onetimepass.valid_totp(token, self.otp_secret)
|
||||
|
||||
def get_hashed_password(self, plain_text_password=None):
|
||||
# Hash a password for the first time
|
||||
# (Using bcrypt, the salt is saved into the hash itself)
|
||||
@ -260,7 +271,7 @@ class User(db.Model):
|
||||
except Exception, e:
|
||||
raise
|
||||
|
||||
def update_profile(self):
|
||||
def update_profile(self, enable_otp=None):
|
||||
"""
|
||||
Update user profile
|
||||
"""
|
||||
@ -277,6 +288,16 @@ class User(db.Model):
|
||||
if self.avatar:
|
||||
user.avatar = self.avatar
|
||||
|
||||
if enable_otp == True:
|
||||
# generate the opt secret key
|
||||
user.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8')
|
||||
elif enable_otp == False:
|
||||
# set otp_secret="" means we want disable the otp authenticaion.
|
||||
user.otp_secret = ""
|
||||
else:
|
||||
# do nothing.
|
||||
pass
|
||||
|
||||
try:
|
||||
db.session.commit()
|
||||
return True
|
||||
|
Reference in New Issue
Block a user