From 11010393d874572fe0704ab5a3b045399d5cf96e Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Fri, 29 Oct 2021 14:40:22 +0530 Subject: [PATCH] Add "passive" mode with `--passive` flag. Every listmonk instance scans the DB periodically to look for running campaigns to process. This made running multiple instances of listmonk impractical as they would all pick up the same running campaign and process them, resulting in duplicate e-mails. This commit adds a `--passive` flag to the binary that runs listmonk in a "passive" mode where campaign processing is disabled. This allows multiple instances of listmonk to be run to handle different kinds of requests if there is a requirement (scale/traffic?). It is important to note that there should only be one non-passive instance running at any given time. If distributed campaign processing is ever considered, this will change. --- cmd/init.go | 13 ++++++++++--- cmd/main.go | 2 +- internal/manager/manager.go | 19 +++++++++++++++---- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/cmd/init.go b/cmd/init.go index c49e0bc2..44ff490a 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -99,11 +99,12 @@ func initFlags() { f.Bool("install", false, "setup database (first time)") f.Bool("idempotent", false, "make --install run only if the databse isn't already setup") f.Bool("upgrade", false, "upgrade database to the current version") - f.Bool("version", false, "current version of the build") + f.Bool("version", false, "show current version of the build") f.Bool("new-config", false, "generate sample config file") f.String("static-dir", "", "(optional) path to directory with static files") f.String("i18n-dir", "", "(optional) path to directory with i18n language files") f.Bool("yes", false, "assume 'yes' to prompts during --install/upgrade") + f.Bool("passive", false, "run in passive mode where campaigns are not processed") if err := f.Parse(os.Args[1:]); err != nil { lo.Fatalf("error loading flags: %v", err) } @@ -194,7 +195,7 @@ func initFS(appDir, frontendDir, staticDir, i18nDir string) stuffbin.FileSystem // Default dir in cwd. i18nDir = "i18n" } - lo.Printf("will load i18n files from: %v", i18nDir) + lo.Printf("loading i18n files from: %v", i18nDir) files = append(files, joinFSPaths(i18nDir, i18nFiles)...) } @@ -203,7 +204,7 @@ func initFS(appDir, frontendDir, staticDir, i18nDir string) stuffbin.FileSystem // Default dir in cwd. staticDir = "static" } - lo.Printf("will load static files from: %v", staticDir) + lo.Printf("loading static files from: %v", staticDir) files = append(files, joinFSPaths(staticDir, staticFiles)...) } @@ -352,6 +353,10 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager { lo.Fatal("app.message_rate should be at least 1") } + if ko.Bool("passive") { + lo.Println("running in passive mode. won't process campaigns.") + } + return manager.New(manager.Config{ BatchSize: ko.Int("app.batch_size"), Concurrency: ko.Int("app.concurrency"), @@ -368,6 +373,8 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager { SlidingWindow: ko.Bool("app.message_sliding_window"), SlidingWindowDuration: ko.Duration("app.message_sliding_window_duration"), SlidingWindowRate: ko.Int("app.message_sliding_window_rate"), + ScanInterval: time.Second * 5, + ScanCampaigns: !ko.Bool("passive"), }, newManagerStore(q), campNotifCB, app.i18n, lo) } diff --git a/cmd/main.go b/cmd/main.go index 739fa191..0fb251a0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -188,7 +188,7 @@ func main() { // Start the campaign workers. The campaign batches (fetch from DB, push out // messages) get processed at the specified interval. - go app.manager.Run(time.Second * 5) + go app.manager.Run() // Start the app server. srv := initHTTPServer(app) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 1cd9a63d..a4e3965a 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -98,8 +98,7 @@ type Message struct { // Config has parameters for configuring the manager. type Config struct { // Number of subscribers to pull from the DB in a single iteration. - BatchSize int - + BatchSize int Concurrency int MessageRate int MaxSendErrors int @@ -115,6 +114,16 @@ type Config struct { MessageURL string ViewTrackURL string UnsubHeader bool + + // Interval to scan the DB for active campaign checkpoints. + ScanInterval time.Duration + + // ScanCampaigns indicates whether this instance of manager will scan the DB + // for active campaigns and process them. + // This can be used to run multiple instances of listmonk + // (exposed to the internet, private etc.) where only one does campaign + // processing while the others handle other kinds of traffic. + ScanCampaigns bool } type msgError struct { @@ -234,8 +243,10 @@ func (m *Manager) HasRunningCampaigns() bool { // subscribers and pushes messages to them for each queued campaign // until all subscribers are exhausted, at which point, a campaign is marked // as "finished". -func (m *Manager) Run(tick time.Duration) { - go m.scanCampaigns(tick) +func (m *Manager) Run() { + if m.cfg.ScanCampaigns { + go m.scanCampaigns(m.cfg.ScanInterval) + } // Spawn N message workers. for i := 0; i < m.cfg.Concurrency; i++ {