diff --git a/campaigns.go b/campaigns.go index b8ef154f..cb61bdb3 100644 --- a/campaigns.go +++ b/campaigns.go @@ -180,7 +180,7 @@ func handlePreviewCampaign(c echo.Context) error { } // Render the message body. - m := app.manager.NewMessage(camp, sub) + m := app.manager.NewCampaignMessage(camp, sub) if err := m.Render(); err != nil { app.log.Printf("error rendering message: %v", err) return echo.NewHTTPError(http.StatusBadRequest, @@ -555,7 +555,7 @@ func sendTestMessage(sub models.Subscriber, camp *models.Campaign, app *App) err } // Render the message body. - m := app.manager.NewMessage(camp, sub) + m := app.manager.NewCampaignMessage(camp, sub) if err := m.Render(); err != nil { app.log.Printf("error rendering message: %v", err) return echo.NewHTTPError(http.StatusBadRequest, diff --git a/init.go b/init.go index a7f69ed6..51c2ee87 100644 --- a/init.go +++ b/init.go @@ -159,7 +159,7 @@ func initConstants() *constants { // initCampaignManager initializes the campaign manager. func initCampaignManager(app *App) *manager.Manager { campNotifCB := func(subject string, data interface{}) error { - return sendNotification(app.constants.NotifyEmails, subject, notifTplCampaign, data, app) + return app.sendNotification(app.constants.NotifyEmails, subject, notifTplCampaign, data) } return manager.New(manager.Config{ Concurrency: ko.Int("app.concurrency"), @@ -180,7 +180,7 @@ func initImporter(app *App) *subimporter.Importer { app.queries.UpdateListsDate.Stmt, app.db.DB, func(subject string, data interface{}) error { - go sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data, app) + app.sendNotification(app.constants.NotifyEmails, subject, notifTplImport, data) return nil }) } diff --git a/internal/manager/manager.go b/internal/manager/manager.go index e833ef3a..6a8d0704 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -2,6 +2,7 @@ package manager import ( "bytes" + "errors" "fmt" "html/template" "log" @@ -43,7 +44,8 @@ type Manager struct { logger *log.Logger // Campaigns that are currently running. - camps map[int]*models.Campaign + camps map[int]*models.Campaign + campsMutex sync.RWMutex // Links generated using Track() are cached here so as to not query // the database for the link UUID for every message sent. This has to @@ -51,14 +53,16 @@ type Manager struct { links map[string]string linksMutex sync.RWMutex - subFetchQueue chan *models.Campaign - msgQueue chan Message - msgErrorQueue chan msgError - msgErrorCounts map[int]int + subFetchQueue chan *models.Campaign + campMsgQueue chan CampaignMessage + campMsgErrorQueue chan msgError + campMsgErrorCounts map[int]int + msgQueue chan Message } -// Message represents an active subscriber that's being processed. -type Message struct { +// CampaignMessage represents an instance of campaign message to be pushed out, +// specific to a subscriber, via the campaign's messenger. +type CampaignMessage struct { Campaign *models.Campaign Subscriber models.Subscriber Body []byte @@ -68,6 +72,15 @@ type Message struct { unsubURL string } +// Message represents a generic message to be pushed to a messenger. +type Message struct { + From string + To []string + Subject string + Body []byte + Messenger string +} + // Config has parameters for configuring the manager. type Config struct { Concurrency int @@ -88,24 +101,26 @@ type msgError struct { // New returns a new instance of Mailer. func New(cfg Config, src DataSource, notifCB models.AdminNotifCallback, l *log.Logger) *Manager { return &Manager{ - cfg: cfg, - src: src, - notifCB: notifCB, - logger: l, - messengers: make(map[string]messenger.Messenger), - camps: make(map[int]*models.Campaign), - links: make(map[string]string), - subFetchQueue: make(chan *models.Campaign, cfg.Concurrency), - msgQueue: make(chan Message, cfg.Concurrency), - msgErrorQueue: make(chan msgError, cfg.MaxSendErrors), - msgErrorCounts: make(map[int]int), + cfg: cfg, + src: src, + notifCB: notifCB, + logger: l, + messengers: make(map[string]messenger.Messenger), + camps: make(map[int]*models.Campaign), + links: make(map[string]string), + subFetchQueue: make(chan *models.Campaign, cfg.Concurrency), + campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2), + msgQueue: make(chan Message, cfg.Concurrency), + campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors), + campMsgErrorCounts: make(map[int]int), } } -// NewMessage creates and returns a Message that is made available -// to message templates while they're compiled. -func (m *Manager) NewMessage(c *models.Campaign, s models.Subscriber) Message { - return Message{ +// NewCampaignMessage creates and returns a CampaignMessage that is made available +// to message templates while they're compiled. It represents a message from +// a campaign that's bound to a single Subscriber. +func (m *Manager) NewCampaignMessage(c *models.Campaign, s models.Subscriber) CampaignMessage { + return CampaignMessage{ Campaign: c, Subscriber: s, @@ -125,6 +140,17 @@ func (m *Manager) AddMessenger(msg messenger.Messenger) error { return nil } +// PushMessage pushes a Message to be sent out by the workers. +func (m *Manager) PushMessage(msg Message) error { + select { + case m.msgQueue <- msg: + case <-time.After(time.Second * 3): + m.logger.Println("message push timed out: %'s'", msg.Subject) + return errors.New("message push timed out") + } + return nil +} + // GetMessengerNames returns the list of registered messengers. func (m *Manager) GetMessengerNames() []string { names := make([]string, 0, len(m.messengers)) @@ -177,21 +203,21 @@ func (m *Manager) Run(tick time.Duration) { // Aggregate errors from sending messages to check against the error threshold // after which a campaign is paused. - case e := <-m.msgErrorQueue: + case e := <-m.campMsgErrorQueue: if m.cfg.MaxSendErrors < 1 { continue } // If the error threshold is met, pause the campaign. - m.msgErrorCounts[e.camp.ID]++ - if m.msgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors { + m.campMsgErrorCounts[e.camp.ID]++ + if m.campMsgErrorCounts[e.camp.ID] >= m.cfg.MaxSendErrors { m.logger.Printf("error counted exceeded %d. pausing campaign %s", m.cfg.MaxSendErrors, e.camp.Name) if m.isCampaignProcessing(e.camp.ID) { m.exhaustCampaign(e.camp, models.CampaignStatusPaused) } - delete(m.msgErrorCounts, e.camp.ID) + delete(m.campMsgErrorCounts, e.camp.ID) // Notify admins. m.sendNotif(e.camp, models.CampaignStatusPaused, "Too many errors") @@ -228,23 +254,31 @@ func (m *Manager) Run(tick time.Duration) { func (m *Manager) SpawnWorkers() { for i := 0; i < m.cfg.Concurrency; i++ { go func() { - for msg := range m.msgQueue { - if !m.isCampaignProcessing(msg.Campaign.ID) { - continue - } + for { + select { + // Campaign message. + case msg := <-m.campMsgQueue: + if !m.isCampaignProcessing(msg.Campaign.ID) { + continue + } - err := m.messengers[msg.Campaign.MessengerID].Push( - msg.from, - []string{msg.to}, - msg.Campaign.Subject, - msg.Body, nil) - if err != nil { - m.logger.Printf("error sending message in campaign %s: %v", - msg.Campaign.Name, err) + err := m.messengers[msg.Campaign.MessengerID].Push( + msg.from, []string{msg.to}, msg.Campaign.Subject, msg.Body, nil) + if err != nil { + m.logger.Printf("error sending message in campaign %s: %v", msg.Campaign.Name, err) - select { - case m.msgErrorQueue <- msgError{camp: msg.Campaign, err: err}: - default: + select { + case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}: + default: + } + } + + // Arbitrary message. + case msg := <-m.msgQueue: + err := m.messengers[msg.Messenger].Push( + msg.From, msg.To, msg.Subject, msg.Body, nil) + if err != nil { + m.logger.Printf("error sending message '%s': %v", msg.Subject, err) } } } @@ -256,17 +290,17 @@ func (m *Manager) SpawnWorkers() { // compiled campaign templates. func (m *Manager) TemplateFuncs(c *models.Campaign) template.FuncMap { return template.FuncMap{ - "TrackLink": func(url string, msg *Message) string { + "TrackLink": func(url string, msg *CampaignMessage) string { return m.trackLink(url, msg.Campaign.UUID, msg.Subscriber.UUID) }, - "TrackView": func(msg *Message) template.HTML { + "TrackView": func(msg *CampaignMessage) template.HTML { return template.HTML(fmt.Sprintf(``, fmt.Sprintf(m.cfg.ViewTrackURL, msg.Campaign.UUID, msg.Subscriber.UUID))) }, - "UnsubscribeURL": func(msg *Message) string { + "UnsubscribeURL": func(msg *CampaignMessage) string { return msg.unsubURL }, - "OptinURL": func(msg *Message) string { + "OptinURL": func(msg *CampaignMessage) string { // Add list IDs. // TODO: Show private lists list on optin e-mail return fmt.Sprintf(m.cfg.OptinURL, msg.Subscriber.UUID, "") @@ -294,17 +328,21 @@ func (m *Manager) addCampaign(c *models.Campaign) error { } // Add the campaign to the active map. + m.campsMutex.Lock() m.camps[c.ID] = c + m.campsMutex.Unlock() return nil } // getPendingCampaignIDs returns the IDs of campaigns currently being processed. func (m *Manager) getPendingCampaignIDs() []int64 { // Needs to return an empty slice in case there are no campaigns. - ids := make([]int64, 0) + m.campsMutex.RLock() + ids := make([]int64, 0, len(m.camps)) for _, c := range m.camps { ids = append(ids, int64(c.ID)) } + m.campsMutex.RUnlock() return ids } @@ -326,7 +364,7 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro // Push messages. for _, s := range subs { - msg := m.NewMessage(c, s) + msg := m.NewCampaignMessage(c, s) if err := msg.Render(); err != nil { m.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err) continue @@ -334,7 +372,7 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro // Push the message to the queue while blocking and waiting until // the queue is drained. - m.msgQueue <- msg + m.campMsgQueue <- msg } return true, nil @@ -342,12 +380,16 @@ func (m *Manager) nextSubscribers(c *models.Campaign, batchSize int) (bool, erro // isCampaignProcessing checks if the campaign is bing processed. func (m *Manager) isCampaignProcessing(id int) bool { + m.campsMutex.RLock() _, ok := m.camps[id] + m.campsMutex.RUnlock() return ok } func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) { + m.campsMutex.Lock() delete(m.camps, c.ID) + m.campsMutex.Unlock() // A status has been passed. Change the campaign's status // without further checks. @@ -420,13 +462,12 @@ func (m *Manager) sendNotif(c *models.Campaign, status, reason string) error { "Reason": reason, } ) - return m.notifCB(subject, data) } // Render takes a Message, executes its pre-compiled Campaign.Tpl // and applies the resultant bytes to Message.body to be used in messages. -func (m *Message) Render() error { +func (m *CampaignMessage) Render() error { out := bytes.Buffer{} if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil { return err diff --git a/main.go b/main.go index a3cb4be3..eadb695e 100644 --- a/main.go +++ b/main.go @@ -138,7 +138,8 @@ func main() { app.messenger = initMessengers(app.manager) app.notifTpls = initNotifTemplates("/email-templates/*.html", fs, app.constants) - // Start the campaign workers. + // Start the campaign workers. The campaign batches (fetch from DB, push out + // messages) get processed at the specified interval. go app.manager.Run(time.Second * 5) app.manager.SpawnWorkers() diff --git a/notifications.go b/notifications.go index 9b70daa1..a74d0416 100644 --- a/notifications.go +++ b/notifications.go @@ -2,6 +2,8 @@ package main import ( "bytes" + + "github.com/knadh/listmonk/internal/manager" ) const ( @@ -19,18 +21,20 @@ type notifData struct { } // sendNotification sends out an e-mail notification to admins. -func sendNotification(toEmails []string, subject, tplName string, data interface{}, app *App) error { +func (app *App) sendNotification(toEmails []string, subject, tplName string, data interface{}) error { var b bytes.Buffer if err := app.notifTpls.ExecuteTemplate(&b, tplName, data); err != nil { app.log.Printf("error compiling notification template '%s': %v", tplName, err) return err } - err := app.messenger.Push(app.constants.FromEmail, - toEmails, - subject, - b.Bytes(), - nil) + err := app.manager.PushMessage(manager.Message{ + From: app.constants.FromEmail, + To: toEmails, + Subject: subject, + Body: b.Bytes(), + Messenger: "email", + }) if err != nil { app.log.Printf("error sending admin notification (%s): %v", subject, err) return err diff --git a/subscribers.go b/subscribers.go index 3d7032e1..ed8b687f 100644 --- a/subscribers.go +++ b/subscribers.go @@ -12,8 +12,8 @@ import ( "github.com/asaskevich/govalidator" "github.com/gofrs/uuid" - "github.com/knadh/listmonk/models" "github.com/knadh/listmonk/internal/subimporter" + "github.com/knadh/listmonk/models" "github.com/labstack/echo" "github.com/lib/pq" ) @@ -181,7 +181,7 @@ func handleCreateSubscriber(c echo.Context) error { // If the lists are double-optins, send confirmation e-mails. // Todo: This arbitrary goroutine should be moved to a centralised pool. - go sendOptinConfirmation(req.Subscriber, []int64(req.Lists), app) + _ = sendOptinConfirmation(req.Subscriber, []int64(req.Lists), app) // Hand over to the GET handler to return the last insertion. c.SetParamNames("id") @@ -536,7 +536,7 @@ func insertSubscriber(req subimporter.SubReq, app *App) (int, error) { // If the lists are double-optins, send confirmation e-mails. // Todo: This arbitrary goroutine should be moved to a centralised pool. - go sendOptinConfirmation(req.Subscriber, []int64(req.Lists), app) + sendOptinConfirmation(req.Subscriber, []int64(req.Lists), app) return req.ID, nil } @@ -613,13 +613,11 @@ func sendOptinConfirmation(sub models.Subscriber, listIDs []int64, app *App) err out.OptinURL = fmt.Sprintf(app.constants.OptinURL, sub.UUID, qListIDs.Encode()) // Send the e-mail. - if err := sendNotification([]string{sub.Email}, - "Confirm subscription", - notifSubscriberOptin, out, app); err != nil { + if err := app.sendNotification([]string{sub.Email}, + "Confirm subscription", notifSubscriberOptin, out); err != nil { app.log.Printf("error e-mailing subscriber profile: %s", err) return err } - return nil } diff --git a/templates.go b/templates.go index cf6a6103..31d7d71d 100644 --- a/templates.go +++ b/templates.go @@ -114,7 +114,7 @@ func handlePreviewTemplate(c echo.Context) error { } // Render the message body. - m := app.manager.NewMessage(&camp, dummySubscriber) + m := app.manager.NewCampaignMessage(&camp, dummySubscriber) if err := m.Render(); err != nil { return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Error rendering message: %v", err))