Clean up main initialization to remove app interdepencies in init.

`App{}` now is used purely as a container for HTTP handlers.
This commit is contained in:
Kailash Nadh 2025-04-06 00:26:18 +05:30
parent 88489223c9
commit 78366ab7e4
9 changed files with 203 additions and 159 deletions

View file

@ -26,7 +26,7 @@ type campArchive struct {
// GetCampaignArchives renders the public campaign archives page.
func (a *App) GetCampaignArchives(c echo.Context) error {
// Get archives from the DB.
pg := a.paginator.NewFromURL(c.Request().URL.Query())
pg := a.pg.NewFromURL(c.Request().URL.Query())
camps, total, err := a.getCampaignArchives(pg.Offset, pg.Limit, false)
if err != nil {
return err
@ -52,7 +52,7 @@ func (a *App) GetCampaignArchives(c echo.Context) error {
// GetCampaignArchivesFeed renders the public campaign archives RSS feed.
func (a *App) GetCampaignArchivesFeed(c echo.Context) error {
var (
pg = a.paginator.NewFromURL(c.Request().URL.Query())
pg = a.pg.NewFromURL(c.Request().URL.Query())
showFullContent = a.cfg.EnablePublicArchiveRSSContent
)
@ -98,7 +98,7 @@ func (a *App) GetCampaignArchivesFeed(c echo.Context) error {
// CampaignArchivesPage renders the public campaign archives page.
func (a *App) CampaignArchivesPage(c echo.Context) error {
// Get archives from the DB.
pg := a.paginator.NewFromURL(c.Request().URL.Query())
pg := a.pg.NewFromURL(c.Request().URL.Query())
out, total, err := a.getCampaignArchives(pg.Offset, pg.Limit, false)
if err != nil {
return err

View file

@ -26,7 +26,7 @@ func (a *App) GetBounces(c echo.Context) error {
// Query and fetch bounces from the DB.
var (
pg = a.paginator.NewFromURL(c.Request().URL.Query())
pg = a.pg.NewFromURL(c.Request().URL.Query())
campID, _ = strconv.Atoi(c.QueryParam("campaign_id"))
source = c.FormValue("source")
orderBy = c.FormValue("order_by")

View file

@ -68,7 +68,7 @@ func (a *App) GetCampaigns(c echo.Context) error {
}
var (
pg = a.paginator.NewFromURL(c.Request().URL.Query())
pg = a.pg.NewFromURL(c.Request().URL.Query())
status = c.QueryParams()["status"]
tags = c.QueryParams()["tag"]

View file

@ -55,6 +55,7 @@ import (
const (
queryFilePath = "queries.sql"
emailMsgr = "email"
)
// UrlConfig contains various URL constants used in the app.
@ -394,8 +395,8 @@ func initUrlConfig(ko *koanf.Koanf) *UrlConfig {
return &UrlConfig{
RootURL: root,
LogoURL: path.Join(root, ko.String("app.logo_url")),
FaviconURL: path.Join(root, ko.String("app.favicon_url")),
LogoURL: ko.String("app.logo_url"),
FaviconURL: ko.String("app.favicon_url"),
LoginURL: path.Join(uriAdmin, "/login"),
// Static URLS.
@ -495,13 +496,37 @@ func initI18n(lang string, fs stuffbin.FileSystem) *i18n.I18n {
return i
}
// initCore initializes the CRUD DB core .
func initCore(fnNotify func(sub models.Subscriber, listIDs []int) (int, error), queries *models.Queries, db *sqlx.DB, i *i18n.I18n, ko *koanf.Koanf) *core.Core {
opt := &core.Opt{
Constants: core.Constants{
SendOptinConfirmation: ko.Bool("app.send_optin_confirmation"),
CacheSlowQueries: ko.Bool("app.cache_slow_queries"),
},
Queries: queries,
DB: db,
I18n: i,
Log: lo,
}
// Load bounce config.
if err := ko.Unmarshal("bounce.actions", &opt.Constants.BounceActions); err != nil {
lo.Fatalf("error unmarshalling bounce config: %v", err)
}
// Initialize the CRUD core.
return core.New(opt, &core.Hooks{
SendOptinConfirmation: fnNotify,
})
}
// initCampaignManager initializes the campaign manager.
func initCampaignManager(q *models.Queries, u *UrlConfig, co *core.Core, md media.Store, i *i18n.I18n) *manager.Manager {
func initCampaignManager(msgrs []manager.Messenger, q *models.Queries, u *UrlConfig, co *core.Core, md media.Store, i *i18n.I18n, ko *koanf.Koanf) *manager.Manager {
if ko.Bool("passive") {
lo.Println("running in passive mode. won't process campaigns.")
}
return manager.New(manager.Config{
mgr := manager.New(manager.Config{
BatchSize: ko.Int("app.batch_size"),
Concurrency: ko.Int("app.concurrency"),
MessageRate: ko.Int("app.message_rate"),
@ -522,6 +547,13 @@ func initCampaignManager(q *models.Queries, u *UrlConfig, co *core.Core, md medi
ScanInterval: time.Second * 5,
ScanCampaigns: !ko.Bool("passive"),
}, newManagerStore(q, co, md), i, lo)
// Attach all messengers to the campaign manager.
for _, m := range msgrs {
mgr.AddMessenger(m)
}
return mgr
}
// initTxTemplates initializes and compiles the transactional templates and caches them in-memory.
@ -807,7 +839,7 @@ func initAbout(q *models.Queries, db *sqlx.DB) about {
}
// initHTTPServer sets up and runs the app's main HTTP server and blocks forever.
func initHTTPServer(app *App) *echo.Echo {
func initHTTPServer(cfg *Config, urlCfg *UrlConfig, i *i18n.I18n, fs stuffbin.FileSystem, app *App) *echo.Echo {
// Initialize the HTTP server.
var srv = echo.New()
srv.HideBanner = true
@ -820,24 +852,24 @@ func initHTTPServer(app *App) *echo.Echo {
}
})
tpl, err := stuffbin.ParseTemplatesGlob(initTplFuncs(app.i18n, app.urlCfg), app.fs, "/public/templates/*.html")
tpl, err := stuffbin.ParseTemplatesGlob(initTplFuncs(i, urlCfg), fs, "/public/templates/*.html")
if err != nil {
lo.Fatalf("error parsing public templates: %v", err)
}
srv.Renderer = &tplRenderer{
templates: tpl,
SiteName: app.cfg.SiteName,
RootURL: app.urlCfg.RootURL,
LogoURL: app.urlCfg.LogoURL,
FaviconURL: app.urlCfg.FaviconURL,
AssetVersion: app.cfg.AssetVersion,
EnablePublicSubPage: app.cfg.EnablePublicSubPage,
EnablePublicArchive: app.cfg.EnablePublicArchive,
IndividualTracking: app.cfg.Privacy.IndividualTracking,
SiteName: cfg.SiteName,
RootURL: urlCfg.RootURL,
LogoURL: urlCfg.LogoURL,
FaviconURL: urlCfg.FaviconURL,
AssetVersion: cfg.AssetVersion,
EnablePublicSubPage: cfg.EnablePublicSubPage,
EnablePublicArchive: cfg.EnablePublicArchive,
IndividualTracking: cfg.Privacy.IndividualTracking,
}
// Initialize the static file server.
fSrv := app.fs.FileServer()
fSrv := fs.FileServer()
// Public (subscriber) facing static files.
srv.GET("/public/static/*", echo.WrapHandler(fSrv))
@ -876,11 +908,11 @@ func initCaptcha() *captcha.Captcha {
// initCron initializes the cron job for refreshing slow query cache.
func initCron(core *core.Core) {
func initCron(co *core.Core) {
c := cron.New()
_, err := c.Add(ko.MustString("app.cache_slow_queries_interval"), func() {
lo.Println("refreshing slow query cache")
_ = core.RefreshMatViews(true)
_ = co.RefreshMatViews(true)
lo.Println("done refreshing slow query cache")
})
if err != nil {
@ -925,19 +957,6 @@ func awaitReload(sigChan chan os.Signal, closerWait chan bool, closer func()) ch
return out
}
// joinFSPaths joins the given paths with the root path and returns the full paths.
func joinFSPaths(root string, paths []string) []string {
out := make([]string, 0, len(paths))
for _, p := range paths {
// real_path:stuffbin_alias
f := strings.Split(p, ":")
out = append(out, path.Join(root, f[0])+":"+f[1])
}
return out
}
// initTplFuncs returns a generic template func map with custom template
// functions and sprig template functions.
func initTplFuncs(i *i18n.I18n, u *UrlConfig) template.FuncMap {
@ -1038,3 +1057,16 @@ func initAuth(co *core.Core, db *sql.DB, ko *koanf.Koanf) (bool, *auth.Auth) {
return hasUsers, a
}
// joinFSPaths joins the given paths with the root path and returns the full paths.
func joinFSPaths(root string, paths []string) []string {
out := make([]string, 0, len(paths))
for _, p := range paths {
// real_path:stuffbin_alias
f := strings.Split(p, ":")
out = append(out, path.Join(root, f[0])+":"+f[1])
}
return out
}

View file

@ -14,7 +14,7 @@ import (
func (a *App) GetLists(c echo.Context) error {
var (
user = auth.GetUser(c)
pg = a.paginator.NewFromURL(c.Request().URL.Query())
pg = a.pg.NewFromURL(c.Request().URL.Query())
)
// Get the list IDs (or blanket permission) the user has access to.

View file

@ -31,33 +31,30 @@ import (
"github.com/knadh/stuffbin"
)
const (
emailMsgr = "email"
)
// App contains the "global" shared components, controllers and fields.
type App struct {
core *core.Core
fs stuffbin.FileSystem
db *sqlx.DB
queries *models.Queries
cfg *Config
urlCfg *UrlConfig
manager *manager.Manager
importer *subimporter.Importer
messengers []manager.Messenger
emailMessenger manager.Messenger
auth *auth.Auth
media media.Store
i18n *i18n.I18n
bounce *bounce.Manager
paginator *paginator.Paginator
captcha *captcha.Captcha
events *events.Events
optinNotifyHook func(models.Subscriber, []int) (int, error)
about about
log *log.Logger
bufLog *buflog.BufLog
cfg *Config
urlCfg *UrlConfig
fs stuffbin.FileSystem
db *sqlx.DB
queries *models.Queries
core *core.Core
manager *manager.Manager
messengers []manager.Messenger
emailMsgr manager.Messenger
importer *subimporter.Importer
auth *auth.Auth
media media.Store
bounce *bounce.Manager
captcha *captcha.Captcha
i18n *i18n.I18n
pg *paginator.Paginator
events *events.Events
log *log.Logger
bufLog *buflog.BufLog
about about
fnOptinNotify func(models.Subscriber, []int) (int, error)
// Channel for passing reload signals.
chReload chan os.Signal
@ -124,14 +121,15 @@ func init() {
// Load environment variables and merge into the loaded config.
if err := ko.Load(env.Provider("LISTMONK_", ".", func(s string) string {
return strings.Replace(strings.ToLower(
strings.TrimPrefix(s, "LISTMONK_")), "__", ".", -1)
return strings.Replace(strings.ToLower(strings.TrimPrefix(s, "LISTMONK_")), "__", ".", -1)
}), nil); err != nil {
lo.Fatalf("error loading config from env: %v", err)
}
// Connect to the database, load the filesystem to read SQL queries.
// Connect to the database.
db = initDB()
// Initialize the embedded filesystem with static assets.
fs = initFS(appDir, frontendDir, ko.String("static-dir"), ko.String("i18n-dir"))
// Installer mode? This runs before the SQL queries are loaded and prepared
@ -170,21 +168,101 @@ func init() {
}
func main() {
// Initialize the main app controller that wraps all of the app's
// components. This is passed around HTTP handlers.
var (
// Initialize static global config.
cfg = initConstConfig(ko)
// Initialize static URL config.
urlCfg = initUrlConfig(ko)
// Initialize i18n language map.
i18n = initI18n(ko.MustString("app.lang"), fs)
// Initialize the media store.
media = initMediaStore(ko)
fbOptinNotify = makeOptinNotifyHook(ko.Bool("app.send_optin_confirmation"), urlCfg, queries, i18n)
// Crud core.
core = initCore(fbOptinNotify, queries, db, i18n, ko)
// Initialize all messengers, SMTP and postback.
msgrs = append(initSMTPMessengers(), initPostbackMessengers(ko)...)
// Campaign manager.
mgr = initCampaignManager(msgrs, queries, urlCfg, core, media, i18n, ko)
// Bulk importer.
importer = initImporter(queries, db, core, i18n, ko)
// Initialize the auth manager.
hasUsers, auth = initAuth(core, db.DB, ko)
// Initialize the webhook/POP3 bounce processor.
bounce *bounce.Manager
emailMsgr *email.Emailer
chReload = make(chan os.Signal, 1)
)
// Initialize the bounce manager that processes bounces from webhooks and
// POP3 mailbox scanning.
if ko.Bool("bounce.enabled") {
bounce = initBounceManager(core.RecordBounce, queries.RecordBounce, lo, ko)
}
// Assign the default `email` messenger to the app.
for _, m := range msgrs {
if m.Name() == "email" {
emailMsgr = m.(*email.Emailer)
}
}
// Initialize the global admin/sub e-mail notifier.
initNotifs(fs, i18n, emailMsgr, urlCfg, ko)
// Initialize and cache tx templates in memory.
initTxTemplates(mgr, core)
// Initialize the bounce manager that processes bounces from webhooks and
// POP3 mailbox scanning.
if ko.Bool("bounce.enabled") {
go bounce.Run()
}
// Start cronjobs.
if ko.Bool("app.cache_slow_queries") {
initCron(core)
}
// Start the campaign manager workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval.
go mgr.Run()
// =========================================================================
// Initialize the App{} with all the global shared components, controllers and fields.
app := &App{
cfg: cfg,
urlCfg: urlCfg,
fs: fs,
db: db,
cfg: initConstConfig(ko),
urlCfg: initUrlConfig(ko),
media: initMediaStore(ko),
messengers: []manager.Messenger{},
log: lo,
bufLog: bufLog,
queries: queries,
core: core,
manager: mgr,
messengers: msgrs,
emailMsgr: emailMsgr,
importer: importer,
auth: auth,
media: media,
bounce: bounce,
captcha: initCaptcha(),
i18n: i18n,
log: lo,
events: evStream,
bufLog: bufLog,
paginator: paginator.New(paginator.Opt{
pg: paginator.New(paginator.Opt{
DefaultPerPage: 20,
MaxPerPage: 50,
NumPageNums: 10,
@ -192,107 +270,41 @@ func main() {
PerPageParam: "per_page",
AllowAll: true,
}),
fnOptinNotify: fbOptinNotify,
about: initAbout(queries, db),
chReload: chReload,
// If there are no users, then the app needs to prompt for new user setup.
needsUserSetup: !hasUsers,
}
// Load i18n language map.
app.i18n = initI18n(ko.MustString("app.lang"), fs)
cOpt := &core.Opt{
Constants: core.Constants{
SendOptinConfirmation: ko.Bool("app.send_optin_confirmation"),
CacheSlowQueries: ko.Bool("app.cache_slow_queries"),
},
Queries: queries,
DB: db,
I18n: app.i18n,
Log: lo,
}
// Load bounce config into the core.
if err := ko.Unmarshal("bounce.actions", &cOpt.Constants.BounceActions); err != nil {
lo.Fatalf("error unmarshalling bounce config: %v", err)
}
// Initialize the CRUD core.
optinNotify := makeOptinNotifyHook(ko.Bool("app.send_optin_confirmation"), app.urlCfg, queries, app.i18n)
app.optinNotifyHook = optinNotify
app.core = core.New(cOpt, &core.Hooks{SendOptinConfirmation: optinNotify})
app.queries = queries
app.manager = initCampaignManager(app.queries, app.urlCfg, app.core, app.media, app.i18n)
app.importer = initImporter(app.queries, db, app.core, app.i18n, ko)
hasUsers, auth := initAuth(app.core, db.DB, ko)
app.auth = auth
// If there are are no users in the DB who can login, the app has to prompt
// for new user setup.
app.needsUserSetup = !hasUsers
// Initialize the bounce manager that processes bounces from webhooks and
// POP3 mailbox scanning.
if ko.Bool("bounce.enabled") {
app.bounce = initBounceManager(app.core.RecordBounce, app.queries.RecordBounce, lo, ko)
go app.bounce.Run()
}
// Initialize the SMTP messengers.
app.messengers = initSMTPMessengers()
for _, m := range app.messengers {
if m.Name() == emailMsgr {
app.emailMessenger = m
}
}
// Initialize admin email notification templates.
initNotifs(app.fs, app.i18n, app.emailMessenger.(*email.Emailer), app.urlCfg, ko)
initTxTemplates(app.manager, app.core)
// Initialize any additional postback messengers.
app.messengers = append(app.messengers, initPostbackMessengers(ko)...)
// Attach all messengers to the campaign manager.
for _, m := range app.messengers {
app.manager.AddMessenger(m)
}
// Load system information.
app.about = initAbout(queries, db)
// Start cronjobs.
if cOpt.Constants.CacheSlowQueries {
initCron(app.core)
}
// Start the campaign workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval.
go app.manager.Run()
// Start the app server.
srv := initHTTPServer(app)
// Star the update checker.
if ko.Bool("app.check_updates") {
go app.checkUpdates(versionString, time.Hour*24)
}
// Start the app server.
srv := initHTTPServer(cfg, urlCfg, i18n, fs, app)
// =========================================================================
// Wait for the reload signal with a callback to gracefully shut down resources.
// The `wait` channel is passed to awaitReload to wait for the callback to finish
// within N seconds, or do a force reload.
app.chReload = make(chan os.Signal)
signal.Notify(app.chReload, syscall.SIGHUP)
signal.Notify(chReload, syscall.SIGHUP)
closerWait := make(chan bool)
<-awaitReload(app.chReload, closerWait, func() {
<-awaitReload(chReload, closerWait, func() {
// Stop the HTTP server.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
srv.Shutdown(ctx)
// Close the campaign manager.
app.manager.Close()
mgr.Close()
// Close the DB pool.
app.db.DB.Close()
db.Close()
// Close the messenger pool.
for _, m := range app.messengers {

View file

@ -155,7 +155,7 @@ func (a *App) GetMedia(c echo.Context) error {
// Get the media from the DB.
var (
pg = a.paginator.NewFromURL(c.Request().URL.Query())
pg = a.pg.NewFromURL(c.Request().URL.Query())
query = c.FormValue("query")
)
res, total, err := a.core.QueryMedia(a.cfg.MediaUpload.Provider, a.media, query, pg.Offset, pg.Limit)

View file

@ -575,7 +575,7 @@ func (a *App) SelfExportSubscriberData(c echo.Context) error {
// E-mail the data as a JSON attachment to the subscriber.
const fname = "data.json"
if err := a.emailMessenger.Push(models.Message{
if err := a.emailMsgr.Push(models.Message{
From: a.cfg.FromEmail,
To: []string{data.Email},
Subject: subject,

View file

@ -95,7 +95,7 @@ func (a *App) QuerySubscribers(c echo.Context) error {
subStatus = c.FormValue("subscription_status")
orderBy = c.FormValue("order_by")
order = c.FormValue("order")
pg = a.paginator.NewFromURL(c.Request().URL.Query())
pg = a.pg.NewFromURL(c.Request().URL.Query())
)
res, total, err := a.core.QuerySubscribers(query, listIDs, subStatus, order, orderBy, pg.Offset, pg.Limit)
if err != nil {
@ -264,7 +264,7 @@ func (a *App) SubscriberSendOptin(c echo.Context) error {
}
// Trigger the opt-in confirmation e-mail hook.
if _, err := a.optinNotifyHook(out, nil); err != nil {
if _, err := a.fnOptinNotify(out, nil); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, a.i18n.T("subscribers.errorSendingOptin"))
}
@ -657,7 +657,7 @@ func makeOptinNotifyHook(unsubHeader bool, u *UrlConfig, q *models.Queries, i *i
// Fetch double opt-in lists from the given list IDs.
// Get the list of subscription lists where the subscriber hasn't confirmed.
var lists = []models.List{}
if err := q.GetSubscriberLists.Select(&lists, sub.ID, "", pq.Array(listIDs), nil, models.SubscriptionStatusUnconfirmed, models.ListOptinDouble); err != nil {
if err := q.GetSubscriberLists.Select(&lists, sub.ID, nil, pq.Array(listIDs), nil, models.SubscriptionStatusUnconfirmed, models.ListOptinDouble); err != nil {
lo.Printf("error fetching lists for opt-in: %s", err)
return 0, err
}