From 0f6a0376dafd4a2fb89d05c4058258915f375de0 Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Sun, 6 Feb 2022 11:38:02 +0530 Subject: [PATCH] Add accurate realtime message rate counter. The `rate` field `/api/campaigns/running/stats` returned was computed based on the total time spent from the start of the campaign to the current time. This meant that for large campaigns, if there were pauses or slowdowns in between, the rate would be skewed heavily making it useless to figure out the current send rate. This commit introduces a realtime running rate counter in the campaign manager that returns accurate (running) send rates for the last minute. The `rate` field in the API now shows the live running rate and a new `net_rate` field shows the rate from the beginning of the campaign. --- cmd/campaigns.go | 27 ++++++++++++++----------- frontend/src/views/Campaigns.vue | 11 ++++++++--- go.mod | 1 + go.sum | 4 ++-- i18n/cs-cz.json | 1 + i18n/de.json | 1 + i18n/en.json | 1 + i18n/es.json | 1 + i18n/fr.json | 1 + i18n/hu.json | 1 + i18n/it.json | 1 + i18n/ml.json | 1 + i18n/nl.json | 1 + i18n/pl.json | 1 + i18n/pt-BR.json | 1 + i18n/pt.json | 1 + i18n/ro.json | 1 + i18n/ru.json | 1 + i18n/tr.json | 1 + internal/manager/manager.go | 34 ++++++++++++++++++++++++++++++-- 20 files changed, 74 insertions(+), 18 deletions(-) diff --git a/cmd/campaigns.go b/cmd/campaigns.go index a494a128..8c8686c3 100644 --- a/cmd/campaigns.go +++ b/cmd/campaigns.go @@ -67,7 +67,8 @@ type campaignStats struct { Sent int `db:"sent" json:"sent"` Started null.Time `db:"started_at" json:"started_at"` UpdatedAt null.Time `db:"updated_at" json:"updated_at"` - Rate float64 `json:"rate"` + Rate int `json:"rate"` + NetRate int `json:"net_rate"` } type campsWrap struct { @@ -522,17 +523,21 @@ func handleGetRunningCampaignStats(c echo.Context) error { // Compute rate. for i, c := range out { if c.Started.Valid && c.UpdatedAt.Valid { - diff := c.UpdatedAt.Time.Sub(c.Started.Time).Minutes() - if diff > 0 { - var ( - sent = float64(c.Sent) - rate = sent / diff - ) - if rate > sent || rate > float64(c.ToSend) { - rate = sent - } - out[i].Rate = rate + diff := int(c.UpdatedAt.Time.Sub(c.Started.Time).Minutes()) + if diff < 1 { + diff = 1 } + + rate := c.Sent / diff + if rate > c.Sent || rate > c.ToSend { + rate = c.Sent + } + + // Rate since the starting of the campaign. + out[i].NetRate = rate + + // Realtime running rate over the last minute. + out[i].Rate = app.manager.GetCampaignStats(c.ID).SendRate } } diff --git a/frontend/src/views/Campaigns.vue b/frontend/src/views/Campaigns.vue index f6c97c0c..cb886a15 100644 --- a/frontend/src/views/Campaigns.vue +++ b/frontend/src/views/Campaigns.vue @@ -110,7 +110,7 @@ {{ $utils.niceDate(stats.updatedAt, true) }}

+ class="is-capitalized"> {{ $utils.duration(stats.startedAt, stats.updatedAt) }}

@@ -142,10 +142,15 @@

-

+

- {{ stats.rate.toFixed(0) }} / min + + {{ stats.rate.toFixed(0) }} / {{ $t('campaigns.rateMinuteShort') }} +

diff --git a/go.mod b/go.mod index 29ff7e7f..f35992f8 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/mattn/go-colorable v0.1.12 // indirect github.com/mitchellh/mapstructure v1.4.2 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/paulbellamy/ratecounter v0.2.0 // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/rhnvrm/simples3 v0.8.1 github.com/spf13/cast v1.4.1 // indirect diff --git a/go.sum b/go.sum index c4f6427a..b99419f7 100644 --- a/go.sum +++ b/go.sum @@ -134,6 +134,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs= +github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= @@ -142,8 +144,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= -github.com/rhnvrm/simples3 v0.8.0 h1:SAjJtsqObltKkejIGl3WgyySq2xdrfwZWXi6njFluuA= -github.com/rhnvrm/simples3 v0.8.0/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/rhnvrm/simples3 v0.8.1 h1:jL2yCi9P0pA8hFYkyVWZ4cs5RX3AMgcVsXTOqnCj0/w= github.com/rhnvrm/simples3 v0.8.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= diff --git a/i18n/cs-cz.json b/i18n/cs-cz.json index 0b38b056..945d6d7e 100644 --- a/i18n/cs-cz.json +++ b/i18n/cs-cz.json @@ -57,6 +57,7 @@ "campaigns.preview": "Náhled", "campaigns.progress": "Průběh", "campaigns.queryPlaceholder": "Jméno nebo předmět", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "Prvotní HTML", "campaigns.removeAltText": "Odebrat alternativní zprávu ve formátu prostého textu", "campaigns.richText": "Formátovaný text", diff --git a/i18n/de.json b/i18n/de.json index 0af346d3..2e4f9c65 100644 --- a/i18n/de.json +++ b/i18n/de.json @@ -57,6 +57,7 @@ "campaigns.preview": "Vorschau", "campaigns.progress": "Fortschritt", "campaigns.queryPlaceholder": "Name oder Betreff", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "HTML Code", "campaigns.removeAltText": "Lösche den alternativen unformatierten Text", "campaigns.richText": "Rich-Text", diff --git a/i18n/en.json b/i18n/en.json index 93bc2132..419a8c69 100644 --- a/i18n/en.json +++ b/i18n/en.json @@ -57,6 +57,7 @@ "campaigns.preview": "Preview", "campaigns.progress": "Progress", "campaigns.queryPlaceholder": "Name or subject", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "Raw HTML", "campaigns.removeAltText": "Remove alternate plain text message", "campaigns.richText": "Rich text", diff --git a/i18n/es.json b/i18n/es.json index 83e581c7..69329bd7 100644 --- a/i18n/es.json +++ b/i18n/es.json @@ -57,6 +57,7 @@ "campaigns.preview": "Vista previa", "campaigns.progress": "Progreso", "campaigns.queryPlaceholder": "Nombre o asunto", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "HTML crudo", "campaigns.removeAltText": "Eliminar mensaje en texto plano alternativo", "campaigns.richText": "Texto enriquecido", diff --git a/i18n/fr.json b/i18n/fr.json index d501f445..82223fa7 100644 --- a/i18n/fr.json +++ b/i18n/fr.json @@ -57,6 +57,7 @@ "campaigns.preview": "Aperçu", "campaigns.progress": "Avancement", "campaigns.queryPlaceholder": "Nom ou objet", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "HTML brut", "campaigns.removeAltText": "Supprimer le message alternatif en texte brut", "campaigns.richText": "Texte riche", diff --git a/i18n/hu.json b/i18n/hu.json index 8e78cf11..6a1be2b5 100644 --- a/i18n/hu.json +++ b/i18n/hu.json @@ -57,6 +57,7 @@ "campaigns.preview": "Előnézet", "campaigns.progress": "Folyamatban", "campaigns.queryPlaceholder": "Név vagy tárgy", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "Nyers (Raw) HTML", "campaigns.removeAltText": "Alternatív egyszerű szöveges üzenet eltávolítása", "campaigns.richText": "Rich text", diff --git a/i18n/it.json b/i18n/it.json index 95421257..16f01341 100644 --- a/i18n/it.json +++ b/i18n/it.json @@ -57,6 +57,7 @@ "campaigns.preview": "Anteprima", "campaigns.progress": "Avanzamento", "campaigns.queryPlaceholder": "Nome o oggetto", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "HTML semplice", "campaigns.removeAltText": "Cancellare il messaggio sostitutivo in testo semplice", "campaigns.richText": "Testo formattato", diff --git a/i18n/ml.json b/i18n/ml.json index d07fb616..5fd349f1 100644 --- a/i18n/ml.json +++ b/i18n/ml.json @@ -57,6 +57,7 @@ "campaigns.preview": "പ്രിവ്യൂ", "campaigns.progress": "പുരോഗതി", "campaigns.queryPlaceholder": "പേരോ വിഷയമോ", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "അസംസ്കൃത എച്. ടി. എം. എൽ", "campaigns.removeAltText": "Remove alternate plain text message", "campaigns.richText": "റിച്ച് ടെക്സ്റ്റ്", diff --git a/i18n/nl.json b/i18n/nl.json index 8d762405..3611cc19 100644 --- a/i18n/nl.json +++ b/i18n/nl.json @@ -57,6 +57,7 @@ "campaigns.preview": "Voorbeeld", "campaigns.progress": "Voortgang", "campaigns.queryPlaceholder": "Naam of onderwerp", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "HTML code", "campaigns.removeAltText": "Verwijder plain text bericht", "campaigns.richText": "Rich text", diff --git a/i18n/pl.json b/i18n/pl.json index bc5cb62f..addef714 100644 --- a/i18n/pl.json +++ b/i18n/pl.json @@ -57,6 +57,7 @@ "campaigns.preview": "Podgląd", "campaigns.progress": "Postęp", "campaigns.queryPlaceholder": "Nazwa lub temat", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "Raw HTML", "campaigns.removeAltText": "Usuń alternatywną treść typu plain text", "campaigns.richText": "Wzbogacony format tekstowy (Rich text)", diff --git a/i18n/pt-BR.json b/i18n/pt-BR.json index aad638ea..8ac7be03 100644 --- a/i18n/pt-BR.json +++ b/i18n/pt-BR.json @@ -57,6 +57,7 @@ "campaigns.preview": "Pré-visualizar", "campaigns.progress": "Progresso", "campaigns.queryPlaceholder": "Nome ou assunto", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "Código HTML", "campaigns.removeAltText": "Remover mensagem alternativa em texto simples", "campaigns.richText": "Texto com formatação", diff --git a/i18n/pt.json b/i18n/pt.json index a4101d52..55f56fd6 100644 --- a/i18n/pt.json +++ b/i18n/pt.json @@ -57,6 +57,7 @@ "campaigns.preview": "Pré-visualizar", "campaigns.progress": "Progresso", "campaigns.queryPlaceholder": "Nome ou assunto", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "HTML simples", "campaigns.removeAltText": "Remover mensagem alternativa em texto simples", "campaigns.richText": "Texto rico", diff --git a/i18n/ro.json b/i18n/ro.json index 76c9642f..ad4d77c0 100644 --- a/i18n/ro.json +++ b/i18n/ro.json @@ -57,6 +57,7 @@ "campaigns.preview": "Previzualizare", "campaigns.progress": "Progres", "campaigns.queryPlaceholder": "Numele sau subiectul", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "HTML brut", "campaigns.removeAltText": "Eliminați un mesaj text alternativ", "campaigns.richText": "Text îmbogățit", diff --git a/i18n/ru.json b/i18n/ru.json index 482401a0..2fb613b0 100644 --- a/i18n/ru.json +++ b/i18n/ru.json @@ -57,6 +57,7 @@ "campaigns.preview": "Предпросмотр", "campaigns.progress": "Прогресс", "campaigns.queryPlaceholder": "Имя темы", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "Необработанный HTML", "campaigns.removeAltText": "Удалить альтернативное простое текстовое сообщение", "campaigns.richText": "Форматированный текст", diff --git a/i18n/tr.json b/i18n/tr.json index d4b11dfb..e8e0cff8 100644 --- a/i18n/tr.json +++ b/i18n/tr.json @@ -57,6 +57,7 @@ "campaigns.preview": "Önizleme", "campaigns.progress": "İlerleme durumu", "campaigns.queryPlaceholder": "İsim veya konu", + "campaigns.rateMinuteShort": "min", "campaigns.rawHTML": "Ham HTML", "campaigns.removeAltText": "Alternatif düz yazıyı kaldır", "campaigns.richText": "Zengin metin", diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 024048c1..059296c9 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -15,6 +15,7 @@ import ( "github.com/knadh/listmonk/internal/i18n" "github.com/knadh/listmonk/internal/messenger" "github.com/knadh/listmonk/models" + "github.com/paulbellamy/ratecounter" ) const ( @@ -39,6 +40,11 @@ type Store interface { DeleteSubscriber(id int64) error } +// CampStats contains campaign stats like per minute send rate. +type CampStats struct { + SendRate int +} + // Manager handles the scheduling, processing, and queuing of campaigns // and message pushes. type Manager struct { @@ -50,8 +56,9 @@ type Manager struct { logger *log.Logger // Campaigns that are currently running. - camps map[int]*models.Campaign - campsMut sync.RWMutex + camps map[int]*models.Campaign + campRates map[int]*ratecounter.RateCounter + campsMut sync.RWMutex // Links generated using Track() are cached here so as to not query // the database for the link UUID for every message sent. This has to @@ -153,6 +160,7 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18 logger: l, messengers: make(map[string]messenger.Messenger), camps: make(map[int]*models.Campaign), + campRates: make(map[int]*ratecounter.RateCounter), links: make(map[string]string), subFetchQueue: make(chan *models.Campaign, cfg.Concurrency), campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2), @@ -237,6 +245,19 @@ func (m *Manager) HasRunningCampaigns() bool { return len(m.camps) > 0 } +// GetCampaignStats returns campaign statistics. +func (m *Manager) GetCampaignStats(id int) CampStats { + n := 0 + + m.campsMut.Lock() + if r, ok := m.campRates[id]; ok { + n = int(r.Rate()) + } + m.campsMut.Unlock() + + return CampStats{SendRate: n} +} + // Run is a blocking function (that should be invoked as a goroutine) // that scans the data source at regular intervals for pending campaigns, // and queues them for processing. The process queue fetches batches of @@ -337,9 +358,16 @@ func (m *Manager) worker() { select { case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}: default: + continue } } + m.campsMut.Lock() + if r, ok := m.campRates[msg.Campaign.ID]; ok { + r.Incr(1) + } + m.campsMut.Unlock() + // Arbitrary message. case msg, ok := <-m.msgQueue: if !ok { @@ -497,6 +525,7 @@ func (m *Manager) addCampaign(c *models.Campaign) error { // Add the campaign to the active map. m.campsMut.Lock() m.camps[c.ID] = c + m.campRates[c.ID] = ratecounter.NewRateCounter(time.Minute) m.campsMut.Unlock() return nil } @@ -589,6 +618,7 @@ func (m *Manager) isCampaignProcessing(id int) bool { func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) { m.campsMut.Lock() delete(m.camps, c.ID) + delete(m.campRates, c.ID) m.campsMut.Unlock() // A status has been passed. Change the campaign's status