Refactor the core concurrent campaign manager logic.

This commit fully refactors the core campaign manager logic.
It applies a whole new approach to campaign state and lifecycle management.

- Create a new "pipeline" abstraction on top of campaign
  for state management.
- Account for every message processed and end campaigns
  based on the actual count.
- Discard in-queue messages in the pipeline of a paused
  or cancelled campaign.
This commit is contained in:
Kailash Nadh 2023-12-22 15:40:13 +05:30
parent 0c9dc07479
commit 414c5c0c99
5 changed files with 467 additions and 384 deletions

View file

@ -299,6 +299,10 @@ func handleUpdateCampaignStatus(c echo.Context) error {
return err
}
if o.Status == models.CampaignStatusPaused || o.Status == models.CampaignStatusCancelled {
app.manager.StopCampaign(id)
}
return c.JSON(http.StatusOK, okResp{out})
}

View file

@ -38,8 +38,6 @@ func handleEventStream(c echo.Context) error {
continue
}
fmt.Printf("data: %s\n\n", b)
c.Response().Write([]byte(fmt.Sprintf("retry: 3000\ndata: %s\n\n", b)))
c.Response().Flush()

View file

@ -1,7 +1,6 @@
package manager
import (
"bytes"
"errors"
"fmt"
"html/template"
@ -14,7 +13,6 @@ import (
"github.com/Masterminds/sprig/v3"
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/models"
"github.com/paulbellamy/ratecounter"
)
const (
@ -62,12 +60,11 @@ type Manager struct {
i18n *i18n.I18n
messengers map[string]Messenger
notifCB models.AdminNotifCallback
logger *log.Logger
log *log.Logger
// Campaigns that are currently running.
camps map[int]*models.Campaign
campRates map[int]*ratecounter.RateCounter
campsMut sync.RWMutex
pipes map[int]*pipe
pipesMut sync.RWMutex
tpls map[int]*models.Template
tplsMut sync.RWMutex
@ -78,17 +75,15 @@ type Manager struct {
links map[string]string
linksMut sync.RWMutex
subFetchQueue chan *models.Campaign
campMsgQueue chan CampaignMessage
campMsgErrorQueue chan msgError
campMsgErrorCounts map[int]int
msgQueue chan models.Message
nextPipes chan *pipe
campMsgQ chan CampaignMessage
msgQ chan models.Message
// Sliding window keeps track of the total number of messages sent in a period
// and on reaching the specified limit, waits until the window is over before
// sending further messages.
slidingWindowNumMsg int
slidingWindowStart time.Time
slidingCount int
slidingStart time.Time
tplFuncs template.FuncMap
}
@ -105,6 +100,8 @@ type CampaignMessage struct {
body []byte
altBody []byte
unsubURL string
pipe *pipe
}
// Config has parameters for configuring the manager.
@ -140,8 +137,8 @@ type Config struct {
}
type msgError struct {
camp *models.Campaign
err error
st *pipe
err error
}
var pushTimeout = time.Second * 3
@ -159,49 +156,25 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18
}
m := &Manager{
cfg: cfg,
store: store,
i18n: i,
notifCB: notifCB,
logger: l,
messengers: make(map[string]Messenger),
camps: make(map[int]*models.Campaign),
campRates: make(map[int]*ratecounter.RateCounter),
tpls: make(map[int]*models.Template),
links: make(map[string]string),
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
msgQueue: make(chan models.Message, cfg.Concurrency),
campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
campMsgErrorCounts: make(map[int]int),
slidingWindowStart: time.Now(),
cfg: cfg,
store: store,
i18n: i,
notifCB: notifCB,
log: l,
messengers: make(map[string]Messenger),
pipes: make(map[int]*pipe),
tpls: make(map[int]*models.Template),
links: make(map[string]string),
nextPipes: make(chan *pipe, cfg.Concurrency),
campMsgQ: make(chan CampaignMessage, cfg.Concurrency*2),
msgQ: make(chan models.Message, cfg.Concurrency),
slidingStart: time.Now(),
}
m.tplFuncs = m.makeGnericFuncMap()
return m
}
// NewCampaignMessage creates and returns a CampaignMessage that is made available
// to message templates while they're compiled. It represents a message from
// a campaign that's bound to a single Subscriber.
func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) (CampaignMessage, error) {
msg := CampaignMessage{
Campaign: c,
Subscriber: s,
subject: c.Subject,
from: c.FromEmail,
to: s.Email,
unsubURL: fmt.Sprintf(m.cfg.UnsubURL, c.UUID, s.UUID),
}
if err := msg.render(); err != nil {
return msg, err
}
return msg, nil
}
// AddMessenger adds a Messenger messaging backend to the manager.
func (m *Manager) AddMessenger(msg Messenger) error {
id := msg.Name()
@ -219,9 +192,9 @@ func (m *Manager) PushMessage(msg models.Message) error {
defer t.Stop()
select {
case m.msgQueue <- msg:
case m.msgQ <- msg:
case <-t.C:
m.logger.Printf("message push timed out: '%s'", msg.Subject)
m.log.Printf("message push timed out: '%s'", msg.Subject)
return errors.New("message push timed out")
}
return nil
@ -239,9 +212,9 @@ func (m *Manager) PushCampaignMessage(msg CampaignMessage) error {
}
select {
case m.campMsgQueue <- msg:
case m.campMsgQ <- msg:
case <-t.C:
m.logger.Printf("message push timed out: '%s'", msg.Subject())
m.log.Printf("message push timed out: '%s'", msg.Subject())
return errors.New("message push timed out")
}
return nil
@ -255,20 +228,20 @@ func (m *Manager) HasMessenger(id string) bool {
// HasRunningCampaigns checks if there are any active campaigns.
func (m *Manager) HasRunningCampaigns() bool {
m.campsMut.Lock()
defer m.campsMut.Unlock()
return len(m.camps) > 0
m.pipesMut.Lock()
defer m.pipesMut.Unlock()
return len(m.pipes) > 0
}
// GetCampaignStats returns campaign statistics.
func (m *Manager) GetCampaignStats(id int) CampStats {
n := 0
m.campsMut.Lock()
if r, ok := m.campRates[id]; ok {
n = int(r.Rate())
m.pipesMut.Lock()
if c, ok := m.pipes[id]; ok {
n = int(c.rate.Rate())
}
m.campsMut.Unlock()
m.pipesMut.Unlock()
return CampStats{SendRate: n}
}
@ -281,6 +254,8 @@ func (m *Manager) GetCampaignStats(id int) CampStats {
// as "finished".
func (m *Manager) Run() {
if m.cfg.ScanCampaigns {
// Periodically scan campaigns and push running campaigns to nextPipes
// to fetch subscribers from the campaign.
go m.scanCampaigns(m.cfg.ScanInterval)
}
@ -289,26 +264,22 @@ func (m *Manager) Run() {
go m.worker()
}
// Fetch the next set of subscribers for a campaign and process them.
for c := range m.subFetchQueue {
has, err := m.nextSubscribers(c, m.cfg.BatchSize)
// Indefinitely wait on the pipe queue to fetch the next set of subscribers
// for any active campaigns.
for p := range m.nextPipes {
has, err := p.NextSubscribers()
if err != nil {
m.logger.Printf("error processing campaign batch (%s): %v", c.Name, err)
m.log.Printf("error processing campaign batch (%s): %v", p.camp.Name, err)
continue
}
if has {
// There are more subscribers to fetch.
m.subFetchQueue <- c
} else if m.isCampaignProcessing(c.ID) {
// There are no more subscribers. Either the campaign status
// has changed or all subscribers have been processed.
newC, err := m.exhaustCampaign(c, "")
if err != nil {
m.logger.Printf("error exhausting campaign (%s): %v", c.Name, err)
continue
}
m.sendNotif(newC, newC.Status, "")
// There are more subscribers to fetch. Queue again.
m.nextPipes <- p
} else {
// Mark the pseudo counter that's added in newPipe() that is used
// to force a wait on a pipe.
p.wg.Done()
}
}
}
@ -340,91 +311,6 @@ func (m *Manager) GetTpl(id int) (*models.Template, error) {
return tpl, nil
}
// worker is a blocking function that perpetually listens to events (message) on different
// queues and processes them.
func (m *Manager) worker() {
// Counter to keep track of the message / sec rate limit.
numMsg := 0
for {
select {
// Campaign message.
case msg, ok := <-m.campMsgQueue:
if !ok {
return
}
// Pause on hitting the message rate.
if numMsg >= m.cfg.MessageRate {
time.Sleep(time.Second)
numMsg = 0
}
numMsg++
// Outgoing message.
out := models.Message{
From: msg.from,
To: []string{msg.to},
Subject: msg.subject,
ContentType: msg.Campaign.ContentType,
Body: msg.body,
AltBody: msg.altBody,
Subscriber: msg.Subscriber,
Campaign: msg.Campaign,
Attachments: msg.Campaign.Attachments,
}
h := textproto.MIMEHeader{}
h.Set(models.EmailHeaderCampaignUUID, msg.Campaign.UUID)
h.Set(models.EmailHeaderSubscriberUUID, msg.Subscriber.UUID)
// Attach List-Unsubscribe headers?
if m.cfg.UnsubHeader {
h.Set("List-Unsubscribe-Post", "List-Unsubscribe=One-Click")
h.Set("List-Unsubscribe", `<`+msg.unsubURL+`>`)
}
// Attach any custom headers.
if len(msg.Campaign.Headers) > 0 {
for _, set := range msg.Campaign.Headers {
for hdr, val := range set {
h.Add(hdr, val)
}
}
}
out.Headers = h
if err := m.messengers[msg.Campaign.Messenger].Push(out); err != nil {
m.logger.Printf("error sending message in campaign %s: subscriber %d: %v",
msg.Campaign.Name, msg.Subscriber.ID, err)
select {
case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
default:
continue
}
}
m.campsMut.Lock()
if r, ok := m.campRates[msg.Campaign.ID]; ok {
r.Incr(1)
}
m.campsMut.Unlock()
// Arbitrary message.
case msg, ok := <-m.msgQueue:
if !ok {
return
}
err := m.messengers[msg.Messenger].Push(msg)
if err != nil {
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
}
}
}
}
// TemplateFuncs returns the template functions to be applied into
// compiled campaign templates.
func (m *Manager) TemplateFuncs(c *models.Campaign) template.FuncMap {
@ -476,15 +362,24 @@ func (m *Manager) GenericTemplateFuncs() template.FuncMap {
return m.tplFuncs
}
// StopCampaign marks a running campaign as stopped so that all its queued messages are ignored.
// It is a no-op if the campaign has no active pipe. Actual teardown happens in the
// pipe once its in-flight messages have all been processed.
func (m *Manager) StopCampaign(id int) {
	m.pipesMut.RLock()
	if p, ok := m.pipes[id]; ok {
		p.Stop(false)
	}
	m.pipesMut.RUnlock()
}
// Close closes and exits the campaign manager.
func (m *Manager) Close() {
close(m.subFetchQueue)
close(m.campMsgErrorQueue)
close(m.msgQueue)
close(m.nextPipes)
close(m.msgQ)
}
// scanCampaigns is a blocking function that periodically scans the data source
// for campaigns to process and dispatches them to the manager.
// for campaigns to process and dispatches them to the manager. It feeds campaigns
// into nextPipes.
func (m *Manager) scanCampaigns(tick time.Duration) {
t := time.NewTicker(tick)
defer t.Stop()
@ -493,205 +388,144 @@ func (m *Manager) scanCampaigns(tick time.Duration) {
select {
// Periodically scan the data source for campaigns to process.
case <-t.C:
campaigns, err := m.store.NextCampaigns(m.getPendingCampaignIDs())
campaigns, err := m.store.NextCampaigns(m.getRunningCampaignIDs())
if err != nil {
m.logger.Printf("error fetching campaigns: %v", err)
m.log.Printf("error fetching campaigns: %v", err)
continue
}
for _, c := range campaigns {
if err := m.addCampaign(c); err != nil {
m.logger.Printf("error processing campaign (%s): %v", c.Name, err)
// Create a new pipe that'll handle this campaign's states.
p, err := m.newPipe(c)
if err != nil {
m.log.Printf("error processing campaign (%s): %v", c.Name, err)
continue
}
m.logger.Printf("start processing campaign (%s)", c.Name)
m.log.Printf("start processing campaign (%s)", c.Name)
// If subscriber processing is busy, move on. Blocking and waiting
// can end up in a race condition where the waiting campaign's
// state in the data source has changed.
select {
case m.subFetchQueue <- c:
case m.nextPipes <- p:
default:
}
}
}
}
}
// Aggregate errors from sending messages to check against the error threshold
// after which a campaign is paused.
case e, ok := <-m.campMsgErrorQueue:
// worker is a blocking function that perpetually listens to events (message) on different
// queues and processes them.
func (m *Manager) worker() {
// Counter to keep track of the message / sec rate limit.
numMsg := 0
for {
select {
// Campaign message.
case msg, ok := <-m.campMsgQ:
if !ok {
return
}
if m.cfg.MaxSendErrors < 1 {
// If the campaign has ended, ignore the message.
if msg.pipe != nil && msg.pipe.stopped.Load() {
msg.pipe.wg.Done()
continue
}
// If the error threshold is met, pause the campaign.
m.campMsgErrorCounts[e.camp.ID]++
if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors {
m.logger.Printf("error counted exceeded %d. pausing campaign %s",
m.cfg.MaxSendErrors, e.camp.Name)
// Pause on hitting the message rate.
if numMsg >= m.cfg.MessageRate {
time.Sleep(time.Second)
numMsg = 0
}
numMsg++
if m.isCampaignProcessing(e.camp.ID) {
m.exhaustCampaign(e.camp, models.CampaignStatusPaused)
// Outgoing message.
out := models.Message{
From: msg.from,
To: []string{msg.to},
Subject: msg.subject,
ContentType: msg.Campaign.ContentType,
Body: msg.body,
AltBody: msg.altBody,
Subscriber: msg.Subscriber,
Campaign: msg.Campaign,
Attachments: msg.Campaign.Attachments,
}
h := textproto.MIMEHeader{}
h.Set(models.EmailHeaderCampaignUUID, msg.Campaign.UUID)
h.Set(models.EmailHeaderSubscriberUUID, msg.Subscriber.UUID)
// Attach List-Unsubscribe headers?
if m.cfg.UnsubHeader {
h.Set("List-Unsubscribe-Post", "List-Unsubscribe=One-Click")
h.Set("List-Unsubscribe", `<`+msg.unsubURL+`>`)
}
// Attach any custom headers.
if len(msg.Campaign.Headers) > 0 {
for _, set := range msg.Campaign.Headers {
for hdr, val := range set {
h.Add(hdr, val)
}
}
delete(m.campMsgErrorCounts, e.camp.ID)
}
// Notify admins.
m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors")
out.Headers = h
err := m.messengers[msg.Campaign.Messenger].Push(out)
if err != nil {
m.log.Printf("error sending message in campaign %s: subscriber %d: %v", msg.Campaign.Name, msg.Subscriber.ID, err)
}
// Increment the send rate or the error counter if there was an error.
if msg.pipe != nil {
// Mark the message as done.
msg.pipe.wg.Done()
if err != nil {
msg.pipe.OnError()
} else {
msg.pipe.rate.Incr(1)
}
}
// Arbitrary message.
case msg, ok := <-m.msgQ:
if !ok {
return
}
err := m.messengers[msg.Messenger].Push(msg)
if err != nil {
m.log.Printf("error sending message '%s': %v", msg.Subject, err)
}
}
}
}
// addCampaign adds a campaign to the process queue.
func (m *Manager) addCampaign(c *models.Campaign) error {
// Validate messenger.
if _, ok := m.messengers[c.Messenger]; !ok {
m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
return fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name)
}
// Load the template.
if err := c.CompileTemplate(m.TemplateFuncs(c)); err != nil {
return err
}
// Load any media/attachments.
if err := m.attachMedia(c); err != nil {
return err
}
// Add the campaign to the active map.
m.campsMut.Lock()
m.camps[c.ID] = c
m.campRates[c.ID] = ratecounter.NewRateCounter(time.Minute)
m.campsMut.Unlock()
return nil
}
// getPendingCampaignIDs returns the IDs of campaigns currently being processed.
func (m *Manager) getPendingCampaignIDs() []int64 {
// getRunningCampaignIDs returns the IDs of campaigns currently being processed.
func (m *Manager) getRunningCampaignIDs() []int64 {
// Needs to return an empty slice in case there are no campaigns.
m.campsMut.RLock()
ids := make([]int64, 0, len(m.camps))
for _, c := range m.camps {
ids = append(ids, int64(c.ID))
m.pipesMut.RLock()
ids := make([]int64, 0, len(m.pipes))
for _, p := range m.pipes {
ids = append(ids, int64(p.camp.ID))
}
m.campsMut.RUnlock()
m.pipesMut.RUnlock()
return ids
}
// nextSubscribers processes the next batch of subscribers in a given campaign.
// It returns a bool indicating whether any subscribers were processed
// in the current batch or not. A false indicates that all subscribers
// have been processed, or that a campaign has been paused or cancelled.
func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, error) {
// Fetch a batch of subscribers.
subs, err := m.store.NextSubscribers(c.ID, batchSize)
if err != nil {
return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", c.Name, err)
}
// There are no subscribers.
if len(subs) == 0 {
return false, nil
}
// Is there a sliding window limit configured?
hasSliding := m.cfg.SlidingWindow &&
m.cfg.SlidingWindowRate > 0 &&
m.cfg.SlidingWindowDuration.Seconds() > 1
// Push messages.
for _, s := range subs {
// Send the message.
msg, err := m.NewCampaignMessage(c, s)
if err != nil {
m.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
continue
}
// Push the message to the queue while blocking and waiting until
// the queue is drained.
m.campMsgQueue <- msg
// Check if the sliding window is active.
if hasSliding {
diff := time.Now().Sub(m.slidingWindowStart)
// Window has expired. Reset the clock.
if diff >= m.cfg.SlidingWindowDuration {
m.slidingWindowStart = time.Now()
m.slidingWindowNumMsg = 0
continue
}
// Have the messages exceeded the limit?
m.slidingWindowNumMsg++
if m.slidingWindowNumMsg >= m.cfg.SlidingWindowRate {
wait := m.cfg.SlidingWindowDuration - diff
m.logger.Printf("messages exceeded (%d) for the window (%v since %s). Sleeping for %s.",
m.slidingWindowNumMsg,
m.cfg.SlidingWindowDuration,
m.slidingWindowStart.Format(time.RFC822Z),
wait.Round(time.Second)*1)
m.slidingWindowNumMsg = 0
time.Sleep(wait)
}
}
}
return true, nil
}
// isCampaignProcessing checks if the campaign is being processed.
func (m *Manager) isCampaignProcessing(id int) bool {
m.campsMut.RLock()
_, ok := m.camps[id]
m.campsMut.RUnlock()
m.pipesMut.RLock()
_, ok := m.pipes[id]
m.pipesMut.RUnlock()
return ok
}
func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) {
m.campsMut.Lock()
delete(m.camps, c.ID)
delete(m.campRates, c.ID)
m.campsMut.Unlock()
// A status has been passed. Change the campaign's status
// without further checks.
if status != "" {
if err := m.store.UpdateCampaignStatus(c.ID, status); err != nil {
m.logger.Printf("error updating campaign (%s) status to %s: %v", c.Name, status, err)
} else {
m.logger.Printf("set campaign (%s) to %s", c.Name, status)
}
return c, nil
}
// Fetch the up-to-date campaign status from the source.
cm, err := m.store.GetCampaign(c.ID)
if err != nil {
return nil, err
}
// If a running campaign has exhausted subscribers, it's finished.
if cm.Status == models.CampaignStatusRunning {
cm.Status = models.CampaignStatusFinished
if err := m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusFinished); err != nil {
m.logger.Printf("error finishing campaign (%s): %v", c.Name, err)
} else {
m.logger.Printf("campaign (%s) finished", c.Name)
}
} else {
m.logger.Printf("stop processing campaign (%s)", c.Name)
}
return cm, nil
}
// trackLink register a URL and return its UUID to be used in message templates
// for tracking links.
func (m *Manager) trackLink(url, campUUID, subUUID string) string {
@ -707,7 +541,7 @@ func (m *Manager) trackLink(url, campUUID, subUUID string) string {
// Register link.
uu, err := m.store.CreateLink(url)
if err != nil {
m.logger.Printf("error registering tracking for link '%s': %v", url, err)
m.log.Printf("error registering tracking for link '%s': %v", url, err)
// If the registration fails, fail over to the original URL.
return url
@ -736,61 +570,6 @@ func (m *Manager) sendNotif(c *models.Campaign, status, reason string) error {
return m.notifCB(subject, data)
}
// render takes a Message, executes its pre-compiled Campaign.Tpl
// and applies the resultant bytes to Message.body to be used in messages.
func (m *CampaignMessage) render() error {
out := bytes.Buffer{}
// Render the subject if it's a template.
if m.Campaign.SubjectTpl != nil {
if err := m.Campaign.SubjectTpl.ExecuteTemplate(&out, models.ContentTpl, m); err != nil {
return err
}
m.subject = out.String()
out.Reset()
}
// Compile the main template.
if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil {
return err
}
m.body = out.Bytes()
// Is there an alt body?
if m.Campaign.ContentType != models.CampaignContentTypePlain && m.Campaign.AltBody.Valid {
if m.Campaign.AltBodyTpl != nil {
b := bytes.Buffer{}
if err := m.Campaign.AltBodyTpl.ExecuteTemplate(&b, models.ContentTpl, m); err != nil {
return err
}
m.altBody = b.Bytes()
} else {
m.altBody = []byte(m.Campaign.AltBody.String)
}
}
return nil
}
// Subject returns a copy of the message subject
func (m *CampaignMessage) Subject() string {
return m.subject
}
// Body returns a copy of the message body.
func (m *CampaignMessage) Body() []byte {
out := make([]byte, len(m.body))
copy(out, m.body)
return out
}
// AltBody returns a copy of the message's alt body.
func (m *CampaignMessage) AltBody() []byte {
out := make([]byte, len(m.altBody))
copy(out, m.altBody)
return out
}
func (m *Manager) makeGnericFuncMap() template.FuncMap {
f := template.FuncMap{
"Date": func(layout string) string {

View file

@ -0,0 +1,84 @@
package manager
import (
"bytes"
"fmt"
"github.com/knadh/listmonk/models"
)
// NewCampaignMessage creates and returns a CampaignMessage that is made available
// to message templates while they're compiled. It represents a message from
// a campaign that's bound to a single Subscriber.
func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) (CampaignMessage, error) {
	out := CampaignMessage{
		Campaign:   c,
		Subscriber: s,

		subject:  c.Subject,
		from:     c.FromEmail,
		to:       s.Email,
		unsubURL: fmt.Sprintf(m.cfg.UnsubURL, c.UUID, s.UUID),
	}

	// Render the subject/body templates immediately so that the caller gets
	// either a fully materialized message or an error.
	err := out.render()
	return out, err
}
// render takes a Message, executes its pre-compiled Campaign.Tpl
// and applies the resultant bytes to Message.body to be used in messages.
func (m *CampaignMessage) render() error {
	var buf bytes.Buffer

	// The subject may itself be a template; render it first.
	if m.Campaign.SubjectTpl != nil {
		if err := m.Campaign.SubjectTpl.ExecuteTemplate(&buf, models.ContentTpl, m); err != nil {
			return err
		}
		m.subject = buf.String()
		buf.Reset()
	}

	// Execute the main body template.
	if err := m.Campaign.Tpl.ExecuteTemplate(&buf, models.BaseTpl, m); err != nil {
		return err
	}
	m.body = buf.Bytes()

	// Plain-text campaigns, or campaigns without an alt body, are done.
	if m.Campaign.ContentType == models.CampaignContentTypePlain || !m.Campaign.AltBody.Valid {
		return nil
	}

	// No alt-body template: use the raw alt body text as-is.
	if m.Campaign.AltBodyTpl == nil {
		m.altBody = []byte(m.Campaign.AltBody.String)
		return nil
	}

	var alt bytes.Buffer
	if err := m.Campaign.AltBodyTpl.ExecuteTemplate(&alt, models.ContentTpl, m); err != nil {
		return err
	}
	m.altBody = alt.Bytes()

	return nil
}
// Subject returns a copy of the message subject.
// (Go strings are immutable, so returning the value itself is a safe copy.)
func (m *CampaignMessage) Subject() string {
	return m.subject
}
// Body returns a copy of the message body so that callers cannot
// mutate the original rendered bytes.
func (m *CampaignMessage) Body() []byte {
	b := make([]byte, len(m.body))
	copy(b, m.body)
	return b
}
// AltBody returns a copy of the message's alt body so that callers cannot
// mutate the original rendered bytes.
func (m *CampaignMessage) AltBody() []byte {
	b := make([]byte, len(m.altBody))
	copy(b, m.altBody)
	return b
}

218
internal/manager/pipe.go Normal file
View file

@ -0,0 +1,218 @@
package manager
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/knadh/listmonk/models"
"github.com/paulbellamy/ratecounter"
)
// pipe holds the ephemeral, in-memory state of a single running campaign:
// its send rate, pending-message accounting, and stop/error flags.
type pipe struct {
	camp *models.Campaign
	rate *ratecounter.RateCounter

	// wg counts every queued message (plus one pseudo count added in newPipe)
	// and is used to detect when the campaign has been fully processed.
	wg *sync.WaitGroup

	// stopped marks the campaign as stopped; queued messages belonging to a
	// stopped pipe are discarded by the worker.
	stopped atomic.Bool

	// errors counts failed sends; withErrors records that the campaign was
	// stopped because the send-error threshold was crossed.
	errors     atomic.Uint64
	withErrors atomic.Bool

	m *Manager
}
// newPipe adds a campaign to the process queue. It validates the campaign's
// messenger, compiles its templates, attaches media, registers the pipe on
// the manager, and spawns a goroutine that finalizes the campaign once all
// of its messages are accounted for.
func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) {
	// Validate messenger. An unknown messenger cancels the campaign outright
	// (the status-update error is intentionally best-effort here).
	if _, ok := m.messengers[c.Messenger]; !ok {
		m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
		return nil, fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name)
	}

	// Load the template.
	if err := c.CompileTemplate(m.TemplateFuncs(c)); err != nil {
		return nil, err
	}

	// Load any media/attachments.
	if err := m.attachMedia(c); err != nil {
		return nil, err
	}

	// Add the campaign to the active map.
	p := &pipe{
		camp: c,
		rate: ratecounter.NewRateCounter(time.Minute),
		wg:   &sync.WaitGroup{},
		m:    m,
	}

	// Increment the waitgroup so that Wait() blocks immediately. This is necessary
	// as a campaign pipe is created first and subscribers/messages under it are
	// fetched asynchronously later. The messages each add to the wg and that
	// count is used to determine the exhaustion/completion of all messages.
	p.wg.Add(1)

	go func() {
		// Wait for all the messages in the campaign to be processed
		// (successfully or skipped after errors or cancellation).
		p.wg.Wait()

		p.Stop(false)
		p.cleanup()
	}()

	m.pipesMut.Lock()
	m.pipes[c.ID] = p
	m.pipesMut.Unlock()

	return p, nil
}
// NextSubscribers processes the next batch of subscribers in a given campaign.
// It returns a bool indicating whether any subscribers were processed
// in the current batch or not. A false indicates that all subscribers
// have been processed, or that a campaign has been paused or cancelled.
//
// NOTE(review): the sliding-window fields on Manager (slidingStart/slidingCount)
// are read and written here without a lock — presumably safe because pipes are
// drained serially by Run(); confirm before calling this concurrently.
func (p *pipe) NextSubscribers() (bool, error) {
	// Fetch a batch of subscribers.
	subs, err := p.m.store.NextSubscribers(p.camp.ID, p.m.cfg.BatchSize)
	if err != nil {
		return false, fmt.Errorf("error fetching campaign subscribers (%s): %v", p.camp.Name, err)
	}

	// There are no subscribers.
	if len(subs) == 0 {
		return false, nil
	}

	// Is there a sliding window limit configured?
	hasSliding := p.m.cfg.SlidingWindow &&
		p.m.cfg.SlidingWindowRate > 0 &&
		p.m.cfg.SlidingWindowDuration.Seconds() > 1

	// Push messages.
	for _, s := range subs {
		// Render the message for this subscriber and attach it to the pipe
		// (this also adds the message to the pipe's waitgroup).
		msg, err := p.newMessage(s)
		if err != nil {
			p.m.log.Printf("error rendering message (%s) (%s): %v", p.camp.Name, s.Email, err)
			continue
		}

		// Push the message to the queue while blocking and waiting until
		// the queue is drained.
		p.m.campMsgQ <- msg

		// Check if the sliding window is active.
		if hasSliding {
			diff := time.Now().Sub(p.m.slidingStart)

			// Window has expired. Reset the clock.
			if diff >= p.m.cfg.SlidingWindowDuration {
				p.m.slidingStart = time.Now()
				p.m.slidingCount = 0
				continue
			}

			// Have the messages exceeded the limit?
			p.m.slidingCount++
			if p.m.slidingCount >= p.m.cfg.SlidingWindowRate {
				wait := p.m.cfg.SlidingWindowDuration - diff

				p.m.log.Printf("messages exceeded (%d) for the window (%v since %s). Sleeping for %s.",
					p.m.slidingCount,
					p.m.cfg.SlidingWindowDuration,
					p.m.slidingStart.Format(time.RFC822Z),
					wait.Round(time.Second)*1)

				p.m.slidingCount = 0
				time.Sleep(wait)
			}
		}
	}

	return true, nil
}
// OnError records a single send failure and stops (pauses) the campaign once
// the configured MaxSendErrors threshold is crossed. A threshold below 1
// disables error tracking entirely.
func (p *pipe) OnError() {
	// Error tracking is disabled.
	if p.m.cfg.MaxSendErrors < 1 {
		return
	}

	// Count this failure; once the cumulative total reaches the threshold,
	// mark the campaign as stopped-with-errors.
	if n := p.errors.Add(1); int(n) >= p.m.cfg.MaxSendErrors {
		p.Stop(true)
		p.m.log.Printf("error count exceeded %d. pausing campaign %s", p.m.cfg.MaxSendErrors, p.camp.Name)
	}
}
// Stop "marks" a campaign as stopped. It doesn't actually stop the processing
// of messages. That happens when every queued message in the campaign is processed,
// marking .wg, the waitgroup counter as done. That triggers cleanup().
//
// Once stopped, later calls return immediately, so a subsequent Stop(true)
// cannot flip the error flag on a campaign that was already stopped normally.
func (p *pipe) Stop(withErrors bool) {
	// Already stopped.
	if p.stopped.Load() {
		return
	}

	// NOTE(review): withErrors is stored before the stopped flag, presumably so
	// that any observer that sees stopped==true also sees withErrors — confirm
	// before reordering these stores.
	if withErrors {
		p.withErrors.Store(true)
	}

	p.stopped.Store(true)
}
// newMessage renders a CampaignMessage for the given subscriber, ties it to
// this pipe, and registers it on the pipe's waitgroup so the campaign is only
// considered finished after the worker has processed it.
func (p *pipe) newMessage(s models.Subscriber) (CampaignMessage, error) {
	out, err := p.m.NewCampaignMessage(p.camp, s)
	if err != nil {
		return out, err
	}

	// Account for this message on the pipe.
	out.pipe = p
	p.wg.Add(1)

	return out, nil
}
// cleanup finalizes a pipe once all of its messages have been processed:
// it persists the campaign's terminal status, notifies admins, and always
// deregisters the pipe from the manager.
func (p *pipe) cleanup() {
	defer func() {
		p.m.pipesMut.Lock()
		delete(p.m.pipes, p.camp.ID)
		p.m.pipesMut.Unlock()
	}()

	// The campaign was auto-paused due to errors.
	if p.withErrors.Load() {
		if err := p.m.store.UpdateCampaignStatus(p.camp.ID, models.CampaignStatusPaused); err != nil {
			p.m.log.Printf("error updating campaign (%s) status to %s: %v", p.camp.Name, models.CampaignStatusPaused, err)
		} else {
			p.m.log.Printf("set campaign (%s) to %s", p.camp.Name, models.CampaignStatusPaused)
		}

		// Notification failure is non-fatal here.
		_ = p.m.sendNotif(p.camp, models.CampaignStatusPaused, "Too many errors")
		return
	}

	// Fetch the up-to-date campaign status from the DB.
	c, err := p.m.store.GetCampaign(p.camp.ID)
	if err != nil {
		p.m.log.Printf("error fetching campaign (%s) for ending", p.camp.Name)
		return
	}

	// If a running campaign has exhausted subscribers, it's finished.
	// Any other status (paused/cancelled) is left as-is in the DB.
	if c.Status == models.CampaignStatusRunning {
		c.Status = models.CampaignStatusFinished
		if err := p.m.store.UpdateCampaignStatus(p.camp.ID, models.CampaignStatusFinished); err != nil {
			p.m.log.Printf("error finishing campaign (%s): %v", p.camp.Name, err)
		} else {
			p.m.log.Printf("campaign (%s) finished", p.camp.Name)
		}
	} else {
		p.m.log.Printf("stop processing campaign (%s)", p.camp.Name)
	}

	// Notify the admin.
	_ = p.m.sendNotif(c, c.Status, "")
}