app/shell.py
2023-11-21 16:42:18 +01:00

73 lines
2.1 KiB
Python

import flask_migrate
from IPython import embed
from sqlalchemy_utils import create_database, database_exists, drop_database
from app import models
from app.config import DB_URI
from app.db import Session
from app.log import LOG
from app.models import User, RecoveryCode
# The `if False:` guard keeps the two helpers below from ever being defined at
# runtime; they remain in the file as disabled code.
if False:
    # noinspection PyUnreachableCode
    def create_db():
        """Create the database at DB_URI when it is missing and run migrations."""
        if not database_exists(DB_URI):
            LOG.d("db not exist, create database")
            create_database(DB_URI)
            # Create all tables
            # Use flask-migrate instead of db.create_all()
            # NOTE(review): migrations are assumed to run only for a freshly
            # created database (i.e. inside this branch) — confirm.
            flask_migrate.upgrade()

    # noinspection PyUnreachableCode
    def reset_db():
        """Drop the database at DB_URI if it exists, then call create_db()."""
        if database_exists(DB_URI):
            drop_database(DB_URI)
        # NOTE(review): assumed to run whether or not a database was dropped
        # — confirm.
        create_db()
def change_password(user_id, new_password):
    """Set a new password on one user and commit the change.

    Args:
        user_id: Identifier handed to ``User.get`` to load the user.
        new_password: Value handed to the loaded user's ``set_password``.
    """
    User.get(user_id).set_password(new_password)
    Session.commit()
def migrate_recovery_codes():
    """Replace not-yet-hashed recovery codes with their hash, batch by batch.

    Walks the ``RecoveryCode`` rows in ascending ``id`` order, at most 100 rows
    per batch. A row whose ``code`` has exactly
    ``models._RECOVERY_CODE_LENGTH`` characters is rewritten to
    ``RecoveryCode._hash_code(code)``; every other row is only counted as
    "already updated". Each batch is committed, logged, and then spot-checked
    by reloading the last row rewritten in that batch. The loop stops after
    the first batch that returns no rows.

    Raises:
        AssertionError: If the reloaded row is missing or does not hold the
            expected hash after the commit.
    """
    last_id = -1
    while True:
        batch = (
            RecoveryCode.filter(RecoveryCode.id > last_id)
            .order_by(RecoveryCode.id)
            .limit(100)
            .all()
        )
        batch_codes = len(batch)
        old_codes = 0
        new_codes = 0
        # Original value and id of the last row rewritten in this batch, kept
        # so the stored result can be verified after the commit.
        checked_code = None
        checked_id = None
        for row in batch:
            if len(row.code) == models._RECOVERY_CODE_LENGTH:
                checked_code = row.code
                checked_id = row.id
                row.code = RecoveryCode._hash_code(checked_code)
                old_codes += 1
                Session.flush()
            else:
                new_codes += 1
            # Advance the cursor past every row seen, rewritten or not.
            last_id = row.id
        Session.commit()
        LOG.i(
            f"Updated {old_codes}/{batch_codes} for this batch ({new_codes} already updated)"
        )
        if checked_code is not None:
            stored = RecoveryCode.get_by(id=checked_id)
            expected = RecoveryCode._hash_code(checked_code)
            # Raise explicitly instead of using `assert`: assert statements are
            # removed under `python -O`, which would log success without ever
            # running the check. The message carries only the row id, never
            # the code itself.
            if stored is None or stored.code != expected:
                raise AssertionError(
                    f"Recovery code {checked_id} does not hold the expected hash"
                )
            LOG.i("Check is Good")
        if batch_codes == 0:
            break
# When this module is executed as a script, start an interactive IPython
# session through IPython's embed().
if __name__ == "__main__":
    embed()