Remove obsolete bounce routines from manager package.

This commit is contained in:
Kailash Nadh 2021-09-26 18:57:25 +05:30
parent 4056187fec
commit 3ffd88f0df

View file

@ -21,9 +21,6 @@ const (
// BaseTPL is the name of the base template.
BaseTPL = "base"
BounceTypeBlocklist = "blocklist"
BounceTypeDelete = "delete"
// ContentTpl is the name of the compiled message.
ContentTpl = "content"
@ -38,11 +35,6 @@ type Store interface {
GetCampaign(campID int) (*models.Campaign, error)
UpdateCampaignStatus(campID int, status string) error
CreateLink(url string) (string, error)
// RecordBounce records an external bounce event identified by
// a user's UUID/e-mail and a campaign UUID.
RecordBounce(b models.Bounce) (int64, int, error)
BlocklistSubscriber(id int64) error
DeleteSubscriber(id int64) error
}
@ -72,7 +64,6 @@ type Manager struct {
campMsgErrorQueue chan msgError
campMsgErrorCounts map[int]int
msgQueue chan Message
bounceQueue chan models.Bounce
// Sliding window keeps track of the total number of messages sent in a period
// and on reaching the specified limit, waits until the window is over before
@ -124,8 +115,6 @@ type Config struct {
MessageURL string
ViewTrackURL string
UnsubHeader bool
BounceCount int
BounceAction string
}
type msgError struct {
@ -159,7 +148,6 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
msgQueue: make(chan Message, cfg.Concurrency),
bounceQueue: make(chan models.Bounce, cfg.Concurrency),
campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
campMsgErrorCounts: make(map[int]int),
slidingWindowStart: time.Now(),
@ -240,20 +228,6 @@ func (m *Manager) HasRunningCampaigns() bool {
return len(m.camps) > 0
}
// PushBounce records a bounce event.
func (m *Manager) PushBounce(b models.Bounce) error {
t := time.NewTicker(pushTimeout)
defer t.Stop()
select {
case m.bounceQueue <- b:
case <-t.C:
m.logger.Printf("bounce pushed timed out: %s / %s", b.SubscriberUUID, b.Email)
return errors.New("bounce push timed out")
}
return nil
}
// Run is a blocking function (that should be invoked as a goroutine)
// that scans the data source at regular intervals for pending campaigns,
// and queues them for processing. The process queue fetches batches of
@ -292,7 +266,7 @@ func (m *Manager) Run(tick time.Duration) {
}
}
// worker is a blocking function that perpetually listents to events (message, bounce) on different
// worker is a blocking function that perpetually listents to events (message) on different
// queues and processes them.
func (m *Manager) worker() {
// Counter to keep track of the message / sec rate limit.
@ -365,30 +339,6 @@ func (m *Manager) worker() {
if err != nil {
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
}
// Bounce event.
case b, ok := <-m.bounceQueue:
if !ok {
return
}
subID, count, err := m.store.RecordBounce(b)
if err != nil {
m.logger.Printf("error recording bounce %s / %s", b.SubscriberUUID, b.Email)
}
if count >= m.cfg.BounceCount {
switch m.cfg.BounceAction {
case BounceTypeBlocklist:
err = m.store.BlocklistSubscriber(subID)
case BounceTypeDelete:
err = m.store.DeleteSubscriber(subID)
}
if err != nil {
m.logger.Printf("error executing bounce for subscriber: %s", b.SubscriberUUID)
}
}
}
}
}