Add accurate sent count, last sent subscriber tracking on campaigns.

- Sent count is no longer the batch size fetched from the DB but is
  the actual count of messages sent.
- Pausing and resuming now accurately tracks the last subscriber that
  was processed and resumes from there.
- Fix multiple concurrent campaigns blocking.

Closes #1616. Closes #905. Closes #1496. Closes #1250. Closes #1010.
This commit is contained in:
Kailash Nadh 2023-12-22 19:35:53 +05:30
parent 414c5c0c99
commit 772476c0df
4 changed files with 68 additions and 16 deletions

View file

@ -28,10 +28,12 @@ func newManagerStore(q *models.Queries, c *core.Core, m media.Store) *store {
}
}
// NextCampaigns retrieves active campaigns ready to be processed.
func (s *store) NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error) {
// NextCampaigns retrieves active campaigns ready to be processed excluding
// campaigns that are also being processed. Additionally, it takes a map of campaignID:sentCount
// of campaigns that are being processed and updates them in the DB.
func (s *store) NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models.Campaign, error) {
var out []*models.Campaign
err := s.queries.NextCampaigns.Select(&out, pq.Int64Array(excludeIDs))
err := s.queries.NextCampaigns.Select(&out, pq.Int64Array(currentIDs), pq.Int64Array(sentCounts))
return out, err
}
@ -58,6 +60,12 @@ func (s *store) UpdateCampaignStatus(campID int, status string) error {
return err
}
// UpdateCampaignStatus updates a campaign's status.
func (s *store) UpdateCampaignCounts(campID int, toSend int, sent int, lastSubID int) error {
_, err := s.queries.UpdateCampaignCounts.Exec(campID, toSend, sent, lastSubID)
return err
}
// GetAttachment fetches a media attachment blob.
func (s *store) GetAttachment(mediaID int) (models.Attachment, error) {
m, err := s.core.GetMedia(mediaID, "", s.media)

View file

@ -28,11 +28,12 @@ const (
// Store represents a data backend, such as a database,
// that provides subscriber and campaign records.
type Store interface {
NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models.Campaign, error)
NextSubscribers(campID, limit int) ([]models.Subscriber, error)
GetCampaign(campID int) (*models.Campaign, error)
GetAttachment(mediaID int) (models.Attachment, error)
UpdateCampaignStatus(campID int, status string) error
UpdateCampaignCounts(campID int, toSend int, sent int, lastSubID int) error
CreateLink(url string) (string, error)
BlocklistSubscriber(id int64) error
DeleteSubscriber(id int64) error
@ -165,9 +166,9 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18
pipes: make(map[int]*pipe),
tpls: make(map[int]*models.Template),
links: make(map[string]string),
nextPipes: make(chan *pipe, cfg.Concurrency),
campMsgQ: make(chan CampaignMessage, cfg.Concurrency*2),
msgQ: make(chan models.Message, cfg.Concurrency),
nextPipes: make(chan *pipe, 1000),
campMsgQ: make(chan CampaignMessage, cfg.Concurrency*cfg.MessageRate*2),
msgQ: make(chan models.Message, cfg.Concurrency*cfg.MessageRate*2),
slidingStart: time.Now(),
}
m.tplFuncs = m.makeGnericFuncMap()
@ -275,7 +276,10 @@ func (m *Manager) Run() {
if has {
// There are more subscribers to fetch. Queue again.
m.nextPipes <- p
select {
case m.nextPipes <- p:
default:
}
} else {
// Mark the pseudo counter that's added in makePipe() that is used
// to force a wait on a pipe.
@ -388,7 +392,8 @@ func (m *Manager) scanCampaigns(tick time.Duration) {
select {
// Periodically scan the data source for campaigns to process.
case <-t.C:
campaigns, err := m.store.NextCampaigns(m.getRunningCampaignIDs())
ids, counts := m.getCurrentCampaigns()
campaigns, err := m.store.NextCampaigns(ids, counts)
if err != nil {
m.log.Printf("error fetching campaigns: %v", err)
continue
@ -488,7 +493,12 @@ func (m *Manager) worker() {
if err != nil {
msg.pipe.OnError()
} else {
id := uint64(msg.Subscriber.ID)
if id > msg.pipe.lastID.Load() {
msg.pipe.lastID.Store(uint64(msg.Subscriber.ID))
}
msg.pipe.rate.Incr(1)
msg.pipe.sent.Add(1)
}
}
@ -518,6 +528,29 @@ func (m *Manager) getRunningCampaignIDs() []int64 {
return ids
}
// getCurrentCampaigns returns the IDs of campaigns currently being processed
// and their sent counts.
func (m *Manager) getCurrentCampaigns() ([]int64, []int64) {
// Needs to return an empty slice in case there are no campaigns.
m.pipesMut.RLock()
defer m.pipesMut.RUnlock()
var (
ids = make([]int64, 0, len(m.pipes))
counts = make([]int64, 0, len(m.pipes))
)
for _, p := range m.pipes {
ids = append(ids, int64(p.camp.ID))
// Get the sent counts for campaigns and reset them to 0
// as in the database, they're stored cumulatively (sent += $newSent).
counts = append(counts, p.sent.Load())
p.sent.Store(0)
}
return ids, counts
}
// isCampaignProcessing checks if the campaign is being processed.
func (m *Manager) isCampaignProcessing(id int) bool {
m.pipesMut.RLock()

View file

@ -14,8 +14,10 @@ type pipe struct {
camp *models.Campaign
rate *ratecounter.RateCounter
wg *sync.WaitGroup
stopped atomic.Bool
sent atomic.Int64
lastID atomic.Uint64
errors atomic.Uint64
stopped atomic.Bool
withErrors atomic.Bool
m *Manager
@ -182,6 +184,11 @@ func (p *pipe) cleanup() {
p.m.pipesMut.Unlock()
}()
// Update campaign's "sent" count.
if err := p.m.store.UpdateCampaignCounts(p.camp.ID, 0, int(p.sent.Load()), int(p.lastID.Load())); err != nil {
p.m.log.Printf("error updating campaign counts (%s): %v", p.camp.Name, err)
}
// The campaign was auto-paused due to errors.
if p.withErrors.Load() {
if err := p.m.store.UpdateCampaignStatus(p.camp.ID, models.CampaignStatusPaused); err != nil {

View file

@ -621,7 +621,7 @@ SELECT id, status, to_send, sent, started_at, updated_at
-- that is, the total number of subscribers to be processed across all lists of a campaign.
-- Thus, it has a sideaffect.
-- In addition, it finds the max_subscriber_id, the upper limit across all lists of
-- a campaign. This is used to fetch and slice subscribers for the campaign in next-subscriber-campaigns.
-- a campaign. This is used to fetch and slice subscribers for the campaign in next-campaign-subscribers.
WITH camps AS (
-- Get all running campaigns and their template bodies (if the template's deleted, the default template body instead)
SELECT campaigns.*, COALESCE(templates.body, (SELECT body FROM templates WHERE is_default = true LIMIT 1)) AS template_body
@ -666,6 +666,12 @@ counts AS (
)
GROUP BY camps.id
),
updateCounts AS (
WITH uc (campaign_id, sent_count) AS (SELECT * FROM unnest($1::INT[], $2::INT[]))
UPDATE campaigns
SET sent = sent + uc.sent_count
FROM uc WHERE campaigns.id = uc.campaign_id
),
u AS (
-- For each campaign, update the to_send count and set the max_subscriber_id.
UPDATE campaigns AS ca
@ -767,9 +773,7 @@ subs AS (
),
u AS (
UPDATE campaigns
SET last_subscriber_id = (SELECT MAX(id) FROM subs),
sent = sent + (SELECT COUNT(id) FROM subs),
updated_at = NOW()
SET last_subscriber_id = (SELECT MAX(id) FROM subs), updated_at = NOW()
WHERE (SELECT COUNT(id) FROM subs) > 0 AND id=$1
)
SELECT * FROM subs;
@ -829,8 +833,8 @@ INSERT INTO campaign_lists (campaign_id, list_id, list_name)
-- name: update-campaign-counts
UPDATE campaigns SET
to_send=(CASE WHEN $2 != 0 THEN $2 ELSE to_send END),
sent=(CASE WHEN $3 != 0 THEN $3 ELSE sent END),
last_subscriber_id=(CASE WHEN $4 != 0 THEN $4 ELSE last_subscriber_id END),
sent=sent+$3,
last_subscriber_id=(CASE WHEN $4 > 0 THEN $4 ELSE to_send END),
updated_at=NOW()
WHERE id=$1;