From 81b62a4aa967054f311002347e9c69897fb68210 Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Thu, 24 Apr 2025 22:39:44 +0530 Subject: [PATCH] Fix race condition in immediate campaign pause-start actions. Pausing a campaign and starting it within the interval where campaign manager pipe cleanup happens would cause a campaign to be marked as finished. This patch addresses that by only doing the 'finished' state update if the campaign wasn't stopped (cancel, pause) manually. Potentially closes #2278 as this is the only reproducible scenario where a campaign prematurely gets marked as finished. --- internal/manager/manager.go | 14 +++++++++++--- internal/manager/pipe.go | 25 ++++++++++++++++++------- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index b94528fe..588c7f59 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -289,8 +289,15 @@ func (m *Manager) Run() { default: } } else { - // Mark the pseudo counter that's added in makePipe() that is used - // to force a wait on a pipe. + // The pipe is created with a +1 on the waitgroup pseudo counter + // so that it immediately waits. Subsequently, every message created + // is incremented in the counter in pipe.newMessage(), and when it's + // processed (or ignored when a campaign is paused or cancelled), + // the count is reduced in worker(). + // + // This marks down the original non-message +1, causing the waitgroup + // to be released and the pipe to end, triggering the p.wg.Wait() + // in newPipe() that calls pipe.cleanup(). p.wg.Done() } } @@ -439,8 +446,9 @@ func (m *Manager) worker() { return } - // If the campaign has ended, ignore the message. + // If the campaign has ended or stopped, ignore the message. if msg.pipe != nil && msg.pipe.stopped.Load() { + // Reduce the message counter on the pipe. 
msg.pipe.wg.Done() continue } diff --git a/internal/manager/pipe.go b/internal/manager/pipe.go index 57a3a51e..68d401f2 100644 --- a/internal/manager/pipe.go +++ b/internal/manager/pipe.go @@ -60,7 +60,6 @@ func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) { // (successfully or skipped after errors or cancellation). p.wg.Wait() - p.Stop(false) p.cleanup() }() @@ -75,13 +74,14 @@ func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) { // in the current batch or not. A false indicates that all subscribers // have been processed, or that a campaign has been paused or cancelled. func (p *pipe) NextSubscribers() (bool, error) { - // Fetch a batch of subscribers. + // Fetch the next batch of subscribers from a 'running' campaign. subs, err := p.m.store.NextSubscribers(p.camp.ID, p.m.cfg.BatchSize) if err != nil { return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", p.camp.Name, err) } - // There are no subscribers. + // There are no subscribers from the query. Either all subscribers on the campaign + // have been processed, or the campaign has changed from 'running' to 'paused' or 'cancelled'. if len(subs) == 0 { return false, nil } @@ -167,6 +167,9 @@ func (p *pipe) Stop(withErrors bool) { p.stopped.Store(true) } +// newMessage returns a campaign message while internally incrementing the +// number of messages in the pipe wait group so that the status of every +// message can be atomically tracked. func (p *pipe) newMessage(s models.Subscriber) (CampaignMessage, error) { msg, err := p.m.NewCampaignMessage(p.camp, s) if err != nil { @@ -180,7 +183,8 @@ func (p *pipe) newMessage(s models.Subscriber) (CampaignMessage, error) { } // cleanup finishes the campaign and updates the campaign status in the DB -// and also triggers a notification to the admin. +// and also triggers a notification to the admin. This only triggers once +// a pipe's wg counter is fully exhausted, draining all messages in its queue. 
func (p *pipe) cleanup() { defer func() { p.m.pipesMut.Lock() @@ -188,7 +192,7 @@ func (p *pipe) cleanup() { p.m.pipesMut.Unlock() }() - // Update campaign's "sent" count. + // Update campaign's 'sent' count. if err := p.m.store.UpdateCampaignCounts(p.camp.ID, 0, int(p.sent.Load()), int(p.lastID.Load())); err != nil { p.m.log.Printf("error updating campaign counts (%s): %v", p.camp.Name, err) } @@ -205,6 +209,13 @@ func (p *pipe) cleanup() { return } + // The campaign was manually stopped (pause, cancel). + if p.stopped.Load() { + p.m.log.Printf("stop processing campaign (%s)", p.camp.Name) + return + } + + // Campaign wasn't manually stopped and subscribers were naturally exhausted. // Fetch the up-to-date campaign status from the DB. c, err := p.m.store.GetCampaign(p.camp.ID) if err != nil { @@ -221,9 +232,9 @@ func (p *pipe) cleanup() { p.m.log.Printf("campaign (%s) finished", p.camp.Name) } } else { - p.m.log.Printf("stop processing campaign (%s)", p.camp.Name) + p.m.log.Printf("finish processing campaign (%s)", p.camp.Name) } - // Notify the admin. + // Notify admin. _ = p.m.sendNotif(c, c.Status, "") }