mirror of
https://github.com/simple-login/app.git
synced 2024-11-11 01:42:54 +08:00
461 lines
13 KiB
Python
461 lines
13 KiB
Python
import os
|
|
import ssl
|
|
|
|
import arrow
|
|
import flask_profiler
|
|
import sentry_sdk
|
|
from flask import Flask, redirect, url_for, render_template, request, jsonify
|
|
from flask_admin import Admin
|
|
from flask_cors import cross_origin
|
|
from flask_login import current_user
|
|
from sentry_sdk.integrations.flask import FlaskIntegration
|
|
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
|
|
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
|
|
|
|
|
|
from app import paddle_utils
|
|
from app.admin_model import SLModelView, SLAdminIndexView
|
|
from app.api.base import api_bp
|
|
from app.auth.base import auth_bp
|
|
from app.config import (
|
|
DEBUG,
|
|
DB_URI,
|
|
FLASK_SECRET,
|
|
SENTRY_DSN,
|
|
URL,
|
|
SHA1,
|
|
PADDLE_MONTHLY_PRODUCT_ID,
|
|
RESET_DB,
|
|
FLASK_PROFILER_PATH,
|
|
FLASK_PROFILER_PASSWORD,
|
|
SENTRY_FRONT_END_DSN,
|
|
)
|
|
from app.dashboard.base import dashboard_bp
|
|
from app.developer.base import developer_bp
|
|
from app.discover.base import discover_bp
|
|
from app.extensions import db, login_manager, migrate
|
|
from app.jose_utils import get_jwk_key
|
|
from app.log import LOG
|
|
from app.models import (
|
|
Client,
|
|
User,
|
|
ClientUser,
|
|
GenEmail,
|
|
RedirectUri,
|
|
Subscription,
|
|
PlanEnum,
|
|
ApiKey,
|
|
CustomDomain,
|
|
LifetimeCoupon,
|
|
Directory,
|
|
)
|
|
from app.monitor.base import monitor_bp
|
|
from app.oauth.base import oauth_bp
|
|
|
|
# Sentry error reporting is optional: it is only initialised when a DSN is
# configured.
if SENTRY_DSN:
    LOG.d("enable sentry")
    sentry_sdk.init(
        integrations=[
            FlaskIntegration(),
            SqlalchemyIntegration(),
            AioHttpIntegration(),
        ],
        dsn=SENTRY_DSN,
    )

# The app is served behind nginx, which talks plain HTTP (not HTTPS) to it, so
# the OAUTHLIB_INSECURE_TRANSPORT flag is switched on for the whole process.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
|
|
|
|
|
|
def create_app() -> Flask:
    """Build and configure the Flask application.

    Applies the database, secret-key, template and session-cookie settings,
    then runs every setup helper of this module against the new app, in a
    fixed order. flask-profiler is enabled only when ``FLASK_PROFILER_PATH``
    is set.

    Returns:
        The configured ``Flask`` instance.
    """
    app = Flask(__name__)
    app.url_map.strict_slashes = False
    app.secret_key = FLASK_SECRET

    app.config.update(
        SQLALCHEMY_DATABASE_URI=DB_URI,
        SQLALCHEMY_TRACK_MODIFICATIONS=False,
        TEMPLATES_AUTO_RELOAD=True,
        # to avoid conflict with other cookie
        SESSION_COOKIE_NAME="slapp",
    )

    # Order matters: it is the same sequence the helpers were always run in.
    setup_steps = (
        init_extensions,
        register_blueprints,
        set_index_page,
        jinja2_filter,
        setup_error_page,
        setup_favicon_route,
        setup_openid_metadata,
        init_admin,
        setup_paddle_callback,
        setup_do_not_track,
    )
    for step in setup_steps:
        step(app)

    if not FLASK_PROFILER_PATH:
        return app

    LOG.d("Enable flask-profiler")
    app.config["flask_profiler"] = {
        "enabled": True,
        "storage": {"engine": "sqlite", "FILE": FLASK_PROFILER_PATH},
        "basicAuth": {
            "enabled": True,
            "username": "admin",
            "password": FLASK_PROFILER_PASSWORD,
        },
        "ignore": ["^/static/.*", "/git", "/exception"],
    }
    flask_profiler.init_app(app)

    return app
|
|
|
|
|
|
def fake_data():
    """Recreate the database schema and insert a fixed set of demo rows.

    Deletes a local ``db.sqlite`` file if one exists, creates all tables and
    then adds one admin user together with a lifetime coupon, a monthly
    subscription, an API key, three aliases, two custom domains, two
    directories and two published OAuth clients.

    Invoked from the ``__main__`` block (inside an application context) when
    ``RESET_DB`` is set.
    """
    LOG.d("create fake data")

    # Remove db if exist
    sqlite_file = "db.sqlite"
    if os.path.exists(sqlite_file):
        LOG.d("remove existing db file")
        os.remove(sqlite_file)

    # Create all tables
    db.create_all()

    # Create a user
    demo_user = User.create(
        email="john@wick.com",
        name="John Wick",
        password="password",
        activated=True,
        is_admin=True,
        otp_secret="base32secret3232",
    )
    db.session.commit()

    LifetimeCoupon.create(code="coupon", nb_used=10)
    db.session.commit()

    # Create a subscription for user
    Subscription.create(
        user_id=demo_user.id,
        cancel_url="https://checkout.paddle.com/subscription/cancel?user=1234",
        update_url="https://checkout.paddle.com/subscription/update?user=1234",
        subscription_id="123",
        event_time=arrow.now(),
        next_bill_date=arrow.now().shift(days=10).date(),
        plan=PlanEnum.monthly,
    )
    db.session.commit()

    chrome_key = ApiKey.create(user_id=demo_user.id, name="Chrome")
    chrome_key.code = "code"

    for alias_prefix in ("e1@", "e2@", "e3@"):
        GenEmail.create_new(demo_user.id, alias_prefix)

    for domain_name in ("ab.cd", "very-long-domain.com.net.org"):
        CustomDomain.create(user_id=demo_user.id, domain=domain_name, verified=True)
    db.session.commit()

    for directory_name in ("abcd", "xyzt"):
        Directory.create(user_id=demo_user.id, name=directory_name)
    db.session.commit()

    # Create a client
    first_client = Client.create_new(name="Demo", user_id=demo_user.id)
    first_client.oauth_client_id = "client-id"
    first_client.oauth_client_secret = "client-secret"
    first_client.published = True
    db.session.commit()

    RedirectUri.create(client_id=first_client.id, uri="https://ab.com")

    second_client = Client.create_new(name="Demo 2", user_id=demo_user.id)
    second_client.oauth_client_id = "client-id2"
    second_client.oauth_client_secret = "client-secret2"
    second_client.published = True
    db.session.commit()

    # Nothing is added between the previous commit and this one.
    db.session.commit()
|
|
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """User loader registered on ``login_manager``.

    Returns whatever ``User.query.get(user_id)`` yields for the given id.
    """
    return User.query.get(user_id)
|
|
|
|
|
|
def register_blueprints(app: Flask):
    """Attach every blueprint of the project to ``app``.

    The OAuth blueprint is mounted twice, under ``/oauth`` and ``/oauth2``;
    all other blueprints are registered without extra options.
    """
    # (blueprint, register_blueprint keyword options), in registration order
    registrations = (
        (auth_bp, {}),
        (monitor_bp, {}),
        (dashboard_bp, {}),
        (developer_bp, {}),
        (oauth_bp, {"url_prefix": "/oauth"}),
        (oauth_bp, {"url_prefix": "/oauth2"}),
        (discover_bp, {}),
        (api_bp, {}),
    )
    for blueprint, options in registrations:
        app.register_blueprint(blueprint, **options)
|
|
|
|
|
|
def set_index_page(app):
    """Register the ``/`` route and the request-logging ``after_request`` hook.

    ``/`` redirects authenticated users to the dashboard and everybody else to
    the login page. The hook logs each request whose path is not under a
    static/debug-toolbar prefix and sets ``X-Frame-Options: deny`` on every
    response.
    """

    @app.route("/", methods=["GET", "POST"])
    def index():
        endpoint = "dashboard.index" if current_user.is_authenticated else "auth.login"
        return redirect(url_for(endpoint))

    # not logging /static call
    unlogged_prefixes = ("/static", "/admin/static", "/_debug_toolbar")

    @app.after_request
    def after_request(res):
        if not request.path.startswith(unlogged_prefixes):
            LOG.debug(
                "%s %s %s %s %s",
                request.remote_addr,
                request.method,
                request.path,
                request.args,
                res.status_code,
            )

        res.headers["X-Frame-Options"] = "deny"
        return res
|
|
|
|
|
|
def setup_openid_metadata(app):
    """Register the OpenID discovery document and the JWKS endpoint.

    Both routes are wrapped with ``cross_origin()`` and return JSON:
    ``/.well-known/openid-configuration`` lists the endpoints built from
    ``URL``, and ``/jwks`` exposes the key returned by ``get_jwk_key()``.
    """

    @app.route("/.well-known/openid-configuration")
    @cross_origin()
    def openid_config():
        oauth2_base = URL + "/oauth2"
        return jsonify(
            {
                "issuer": URL,
                "authorization_endpoint": oauth2_base + "/authorize",
                "token_endpoint": oauth2_base + "/token",
                "userinfo_endpoint": oauth2_base + "/userinfo",
                "jwks_uri": URL + "/jwks",
                "response_types_supported": [
                    "code",
                    "token",
                    "id_token",
                    "id_token token",
                    "id_token code",
                ],
                "subject_types_supported": ["public"],
                "id_token_signing_alg_values_supported": ["RS256"],
                # todo: add introspection and revocation endpoints
                # "introspection_endpoint": URL + "/oauth2/token/introspection",
                # "revocation_endpoint": URL + "/oauth2/token/revocation",
            }
        )

    @app.route("/jwks")
    @cross_origin()
    def jwks():
        return jsonify({"keys": [get_jwk_key()]})
|
|
|
|
|
|
def setup_error_page(app):
    """Register the HTML error pages on ``app``.

    Handlers are installed for HTTP 400, 401, 403 and 404, plus a catch-all
    for ``Exception`` that logs the error and renders the 500 page. Each
    handler returns a ``(body, status_code)`` tuple.

    Every handler has its own name so that the functions no longer shadow one
    another and show up unambiguously in tracebacks.
    """

    @app.errorhandler(400)
    def bad_request(e):
        return render_template("error/400.html"), 400

    @app.errorhandler(401)
    def unauthorized(e):
        # The template receives the current URL (path + query string).
        return render_template("error/401.html", current_url=request.full_path), 401

    @app.errorhandler(403)
    def forbidden(e):
        return render_template("error/403.html"), 403

    @app.errorhandler(404)
    def page_not_found(e):
        return render_template("error/404.html"), 404

    # NOTE(review): depending on the Flask version, this catch-all may also
    # receive HTTP errors that have no dedicated handler above (e.g. 405) and
    # answer them with a 500 - confirm whether that is intended.
    @app.errorhandler(Exception)
    def error_handler(e):
        LOG.exception(e)
        return render_template("error/500.html"), 500
|
|
|
|
|
|
def setup_favicon_route(app):
    """Register ``/favicon.ico`` as a redirect to the static favicon file."""

    def favicon():
        return redirect("/static/favicon.ico")

    # Endpoint name is derived from the view function, i.e. "favicon".
    app.add_url_rule("/favicon.ico", view_func=favicon)
|
|
|
|
|
|
def jinja2_filter(app):
    """Install the template helpers used by the Jinja templates.

    Adds the ``dt`` filter, which humanizes a date/time value through
    ``arrow``, and a context processor that exposes ``YEAR``, ``URL``,
    ``SENTRY_DSN`` and ``VERSION`` to every template.
    """

    def format_datetime(value):
        return arrow.get(value).humanize()

    app.jinja_env.filters["dt"] = format_datetime

    @app.context_processor
    def inject_template_globals():
        return {
            "YEAR": arrow.now().year,
            "URL": URL,
            "SENTRY_DSN": SENTRY_FRONT_END_DSN,
            "VERSION": SHA1,
        }
|
|
|
|
|
|
def setup_paddle_callback(app: Flask):
    """Register the ``/paddle`` callback that keeps ``Subscription`` rows in sync.

    The view reads the posted form and, after
    ``paddle_utils.verify_incoming_request`` accepts it, dispatches on
    ``alert_name``:

    * ``subscription_created``: create the user's subscription, or overwrite
      the existing one, with the plan derived from ``subscription_plan_id``.
    * ``subscription_updated``: refresh ``event_time`` and ``next_bill_date``.
    * ``subscription_cancelled``: mark the subscription as cancelled.

    Responses: ``"OK"`` on success (and for any other alert name),
    ``("KO", 400)`` when verification fails, and a 400 when the referenced
    user or subscription does not exist (previously an ``AttributeError``).
    """

    def parse_next_bill_date(form):
        # Convert the "YYYY-MM-DD" form value into a date object.
        return arrow.get(form.get("next_bill_date"), "YYYY-MM-DD").date()

    @app.route("/paddle", methods=["GET", "POST"])
    def paddle():
        form = request.form
        alert_name = form.get("alert_name")
        subscription_id = form.get("subscription_id")

        LOG.debug(
            "paddle callback %s %s %s %s %s",
            alert_name,
            form.get("email"),
            form.get("customer_name"),
            subscription_id,
            form.get("subscription_plan_id"),
        )

        # make sure the request comes from Paddle
        if not paddle_utils.verify_incoming_request(dict(form)):
            LOG.error("request not coming from paddle. Request data:%s", dict(form))
            return "KO", 400

        if alert_name == "subscription_created":  # new user subscribes
            user_email = form.get("email")
            user = User.get_by(email=user_email)
            if not user:
                # Without this guard the lookup below fails on ``user.id``.
                LOG.error(
                    "No user with email %s for paddle subscription %s",
                    user_email,
                    subscription_id,
                )
                return "No such user", 400

            if int(form.get("subscription_plan_id")) == PADDLE_MONTHLY_PRODUCT_ID:
                plan = PlanEnum.monthly
            else:
                plan = PlanEnum.yearly

            sub = Subscription.get_by(user_id=user.id)

            if not sub:
                LOG.d("create a new sub")
                Subscription.create(
                    user_id=user.id,
                    cancel_url=form.get("cancel_url"),
                    update_url=form.get("update_url"),
                    subscription_id=subscription_id,
                    event_time=arrow.now(),
                    next_bill_date=parse_next_bill_date(form),
                    plan=plan,
                )
            else:
                LOG.d("update existing sub %s", sub)
                sub.cancel_url = form.get("cancel_url")
                sub.update_url = form.get("update_url")
                sub.subscription_id = subscription_id
                sub.event_time = arrow.now()
                sub.next_bill_date = parse_next_bill_date(form)
                sub.plan = plan

            LOG.debug("User %s upgrades!", user)

            db.session.commit()

        elif alert_name == "subscription_updated":
            LOG.debug("Update subscription %s", subscription_id)

            sub: Subscription = Subscription.get_by(subscription_id=subscription_id)
            if not sub:
                # Unknown subscription: report it instead of failing on None.
                LOG.error("No subscription %s to update", subscription_id)
                return "No such subscription", 400

            sub.event_time = arrow.now()
            sub.next_bill_date = parse_next_bill_date(form)

            db.session.commit()

        elif alert_name == "subscription_cancelled":
            LOG.error("Cancel subscription %s", subscription_id)

            sub: Subscription = Subscription.get_by(subscription_id=subscription_id)
            if sub:
                sub.cancelled = True
                db.session.commit()

        return "OK"
|
|
|
|
|
|
def init_extensions(app: Flask):
    """Bind the login manager, the database and the migration extension to ``app``."""
    LOG.debug("init extensions")
    for extension in (login_manager, db, migrate):
        extension.init_app(app)
|
|
|
|
|
|
def init_admin(app):
    """Set up the Flask-Admin site and expose the main models in it.

    The admin uses ``SLAdminIndexView`` as its index view and gets one
    ``SLModelView`` per model: ``User``, ``Client``, ``GenEmail`` and
    ``ClientUser``.
    """
    admin = Admin(name="SimpleLogin", template_mode="bootstrap3")
    admin.init_app(app, index_view=SLAdminIndexView())

    for model in (User, Client, GenEmail, ClientUser):
        admin.add_view(SLModelView(model, db.session))
|
|
|
|
|
|
def setup_do_not_track(app):
    """Register ``/dnt``, a page that opts the browser out of GoatCounter.

    The returned HTML loads the local-storage polyfill, stores the
    ``goatcounter-ignore`` flag, shows a confirmation alert and then sends the
    browser back to ``/``.
    """
    optout_page = """
<script src="/static/local-storage-polyfill.js"></script>

<script>
// Disable GoatCounter if this script is called

store.set('goatcounter-ignore', 't');

alert("GoatCounter disabled");

window.location.href = "/";

</script>
"""

    @app.route("/dnt")
    def do_not_track():
        return optout_page
|
|
|
|
|
|
# Local development entry point: builds the app, optionally resets the
# database with demo data, and starts the built-in server on port 7777.
if __name__ == "__main__":
    app = create_app()

    # enable flask toolbar
    # app.config["DEBUG_TB_PROFILER_ENABLED"] = True
    # app.config["DEBUG_TB_INTERCEPT_REDIRECTS"] = False
    #
    # toolbar = DebugToolbarExtension(app)

    # enable to print all queries generated by sqlalchemy
    # app.config["SQLALCHEMY_ECHO"] = True

    # warning: only used in local
    if RESET_DB:
        LOG.warning("reset db, add fake data")
        with app.app_context():
            fake_data()

    run_options = {"debug": True, "host": "0.0.0.0", "port": 7777}

    if URL.startswith("https"):
        LOG.d("enable https")
        # NOTE(review): ssl.PROTOCOL_TLSv1_2 is deprecated in the Python ssl
        # docs in favour of PROTOCOL_TLS_SERVER plus a minimum version.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.load_cert_chain("local_data/cert.pem", "local_data/key.pem")
        run_options["ssl_context"] = context

    app.run(**run_options)
|