Add "passive" mode with --passive flag.

Every listmonk instance scans the DB periodically to look for
running campaigns to process. This made running multiple instances of
listmonk impractical as they would all pick up the same running
campaign and process them, resulting in duplicate e-mails.

This commit adds a `--passive` flag to the binary that runs listmonk
in a "passive" mode where campaign processing is disabled. This allows
multiple instances of listmonk to be run to handle different kinds of
requests if there is a requirement (scale/traffic?). It is important
to note that there should only be one non-passive instance running at
any given time. If distributed campaign processing is ever considered,
this will change.
This commit is contained in:
Kailash Nadh 2021-10-29 14:40:22 +05:30
parent 9dd8592fdd
commit 11010393d8
3 changed files with 26 additions and 8 deletions

View file

@ -99,11 +99,12 @@ func initFlags() {
f.Bool("install", false, "setup database (first time)")
f.Bool("idempotent", false, "make --install run only if the databse isn't already setup")
f.Bool("upgrade", false, "upgrade database to the current version")
f.Bool("version", false, "current version of the build")
f.Bool("version", false, "show current version of the build")
f.Bool("new-config", false, "generate sample config file")
f.String("static-dir", "", "(optional) path to directory with static files")
f.String("i18n-dir", "", "(optional) path to directory with i18n language files")
f.Bool("yes", false, "assume 'yes' to prompts during --install/upgrade")
f.Bool("passive", false, "run in passive mode where campaigns are not processed")
if err := f.Parse(os.Args[1:]); err != nil {
lo.Fatalf("error loading flags: %v", err)
}
@ -194,7 +195,7 @@ func initFS(appDir, frontendDir, staticDir, i18nDir string) stuffbin.FileSystem
// Default dir in cwd.
i18nDir = "i18n"
}
lo.Printf("will load i18n files from: %v", i18nDir)
lo.Printf("loading i18n files from: %v", i18nDir)
files = append(files, joinFSPaths(i18nDir, i18nFiles)...)
}
@ -203,7 +204,7 @@ func initFS(appDir, frontendDir, staticDir, i18nDir string) stuffbin.FileSystem
// Default dir in cwd.
staticDir = "static"
}
lo.Printf("will load static files from: %v", staticDir)
lo.Printf("loading static files from: %v", staticDir)
files = append(files, joinFSPaths(staticDir, staticFiles)...)
}
@ -352,6 +353,10 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager {
lo.Fatal("app.message_rate should be at least 1")
}
if ko.Bool("passive") {
lo.Println("running in passive mode. won't process campaigns.")
}
return manager.New(manager.Config{
BatchSize: ko.Int("app.batch_size"),
Concurrency: ko.Int("app.concurrency"),
@ -368,6 +373,8 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager {
SlidingWindow: ko.Bool("app.message_sliding_window"),
SlidingWindowDuration: ko.Duration("app.message_sliding_window_duration"),
SlidingWindowRate: ko.Int("app.message_sliding_window_rate"),
ScanInterval: time.Second * 5,
ScanCampaigns: !ko.Bool("passive"),
}, newManagerStore(q), campNotifCB, app.i18n, lo)
}

View file

@ -188,7 +188,7 @@ func main() {
// Start the campaign workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval.
go app.manager.Run(time.Second * 5)
go app.manager.Run()
// Start the app server.
srv := initHTTPServer(app)

View file

@ -98,8 +98,7 @@ type Message struct {
// Config has parameters for configuring the manager.
type Config struct {
// Number of subscribers to pull from the DB in a single iteration.
BatchSize int
BatchSize int
Concurrency int
MessageRate int
MaxSendErrors int
@ -115,6 +114,16 @@ type Config struct {
MessageURL string
ViewTrackURL string
UnsubHeader bool
// Interval to scan the DB for active campaign checkpoints.
ScanInterval time.Duration
// ScanCampaigns indicates whether this instance of manager will scan the DB
// for active campaigns and process them.
// This can be used to run multiple instances of listmonk
// (exposed to the internet, private etc.) where only one does campaign
// processing while the others handle other kinds of traffic.
ScanCampaigns bool
}
type msgError struct {
@ -234,8 +243,10 @@ func (m *Manager) HasRunningCampaigns() bool {
// subscribers and pushes messages to them for each queued campaign
// until all subscribers are exhausted, at which point, a campaign is marked
// as "finished".
func (m *Manager) Run(tick time.Duration) {
go m.scanCampaigns(tick)
func (m *Manager) Run() {
if m.cfg.ScanCampaigns {
go m.scanCampaigns(m.cfg.ScanInterval)
}
// Spawn N message workers.
for i := 0; i < m.cfg.Concurrency; i++ {