Fix race condition in immediate campaign pause-start actions.

Pausing a campaign and starting it within the interval where campaign manager
pipe cleanup happens would cause a campaign to be marked as finished. This
patch addresses that by only doing the 'finished' state update if the campaign
wasn't stopped (cancel, pause) manually.

Potentially closes #2278 as this is the only reproducible scenario where a
campaign prematurely gets marked as finished.
This commit is contained in:
Kailash Nadh 2025-04-24 22:39:44 +05:30
parent d3c8eca446
commit 81b62a4aa9
2 changed files with 29 additions and 10 deletions

View file

@ -289,8 +289,15 @@ func (m *Manager) Run() {
default:
}
} else {
// Mark the pseudo counter that's added in makePipe() that is used
// to force a wait on a pipe.
// The pipe is created with a +1 on the waitgroup pseudo counter
// so that it immediately waits. Subsequently, every message created
// is incremented in the counter in pipe.newMessage(), and when it's'
// processed (or ignored when a campaign is paused or cancelled),
// the count is's reduced in worker().
//
// This marks down the original non-message +1, causing the waitgroup
// to be released and the pipe to end, triggering the pg.Wait()
// in newPipe() that calls pipe.cleanup().
p.wg.Done()
}
}
@ -439,8 +446,9 @@ func (m *Manager) worker() {
return
}
// If the campaign has ended, ignore the message.
// If the campaign has ended or stopped, ignore the message.
if msg.pipe != nil && msg.pipe.stopped.Load() {
// Reduce the message counter on the pipe.
msg.pipe.wg.Done()
continue
}

View file

@ -60,7 +60,6 @@ func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) {
// (successfully or skipped after errors or cancellation).
p.wg.Wait()
p.Stop(false)
p.cleanup()
}()
@ -75,13 +74,14 @@ func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) {
// in the current batch or not. A false indicates that all subscribers
// have been processed, or that a campaign has been paused or cancelled.
func (p *pipe) NextSubscribers() (bool, error) {
// Fetch a batch of subscribers.
// Fetch the next batch of subscribers from a 'running' campaign.
subs, err := p.m.store.NextSubscribers(p.camp.ID, p.m.cfg.BatchSize)
if err != nil {
return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", p.camp.Name, err)
}
// There are no subscribers.
// There are no subscribers from the query. Either all subscribers on the campaign
// have been processed, or the campaign has changed from 'running' to 'paused' or 'cancelled'.
if len(subs) == 0 {
return false, nil
}
@ -167,6 +167,9 @@ func (p *pipe) Stop(withErrors bool) {
p.stopped.Store(true)
}
// newMessage returns a campaign message while internally incrementing the
// number of messages in the pipe wait group so that the status of every
// message can be atomically tracked.
func (p *pipe) newMessage(s models.Subscriber) (CampaignMessage, error) {
msg, err := p.m.NewCampaignMessage(p.camp, s)
if err != nil {
@ -180,7 +183,8 @@ func (p *pipe) newMessage(s models.Subscriber) (CampaignMessage, error) {
}
// cleanup finishes the campaign and updates the campaign status in the DB
// and also triggers a notification to the admin.
// and also triggers a notification to the admin. This only triggers once
// a pipe's wg counter is fully exhausted, draining all messages in its queue.
func (p *pipe) cleanup() {
defer func() {
p.m.pipesMut.Lock()
@ -188,7 +192,7 @@ func (p *pipe) cleanup() {
p.m.pipesMut.Unlock()
}()
// Update campaign's "sent" count.
// Update campaign's 'sent count.
if err := p.m.store.UpdateCampaignCounts(p.camp.ID, 0, int(p.sent.Load()), int(p.lastID.Load())); err != nil {
p.m.log.Printf("error updating campaign counts (%s): %v", p.camp.Name, err)
}
@ -205,6 +209,13 @@ func (p *pipe) cleanup() {
return
}
// The campaign was manually stopped (pause, cancel).
if p.stopped.Load() {
p.m.log.Printf("stop processing campaign (%s)", p.camp.Name)
return
}
// Campaign wasn't manually stopped and subscribers were naturally exhausted.
// Fetch the up-to-date campaign status from the DB.
c, err := p.m.store.GetCampaign(p.camp.ID)
if err != nil {
@ -221,9 +232,9 @@ func (p *pipe) cleanup() {
p.m.log.Printf("campaign (%s) finished", p.camp.Name)
}
} else {
p.m.log.Printf("stop processing campaign (%s)", p.camp.Name)
p.m.log.Printf("finish processing campaign (%s)", p.camp.Name)
}
// Notify the admin.
// Notify admin.
_ = p.m.sendNotif(c, c.Status, "")
}