mirror of
https://github.com/knadh/listmonk.git
synced 2026-01-20 16:23:07 +08:00
This has been a hair-pulling rabbit hole of an issue (#1931 and others). When the `next-campaign-subscribers` query, which fetches $n subscribers per batch for a campaign, returns no results, the manager assumes that the campaign is done and marks it as finished. Marathon debugging revealed fundamental flaws in the query's logic that would incorrectly return 0 rows under certain conditions: - The "layout" of subscribers, e.g. a series of blocklisted subscribers between confirmed subscribers, or a series of unconfirmed subscribers in a batch belonging to a double opt-in list. - Bulk imports blocklisting users but not marking their subscriptions as 'unsubscribed'. - Conditions spread across multiple CTEs resulted in returning an arbitrary number of rows instead of $N per batch, as the selected $N rows would get filtered out elsewhere, possibly even down to 0. After fixing this and testing it on our prod instance, which has 15 million subscribers and ~70 million subscriptions in the `subscriber_lists` table, we ended up discovering significant inefficiencies in Postgres query planning. When `subscriber_lists` and campaign list IDs are joined dynamically in a query (via a CTE, ANY(), or any kind of JOIN), the Postgres query planner is unable to use the right indexes. After testing dozens of approaches, we discovered that when the values to join on are passed statically (hardcoded or via parametrized $1 vars), the query uses the right indexes. The difference is staggering: for this particular scenario on our large prod DB, pulling a batch takes ~15 seconds vs. ~50ms, a whopping 300x improvement! This patch splits `next-campaign-subscribers` into two separate queries: one that fetches campaign metadata and list_ids, whose values are then passed statically to a second query that fetches subscribers by batch. In addition, it fixes and refactors broken filtering and counting logic in the `create-campaign` and `next-campaign` queries. Closes #1931, #1993, #1986.
153 lines
4.3 KiB
Go
153 lines
4.3 KiB
Go
package main
|
|
|
|
import (
|
|
"net/http"
|
|
|
|
"github.com/gofrs/uuid/v5"
|
|
"github.com/knadh/listmonk/internal/core"
|
|
"github.com/knadh/listmonk/internal/manager"
|
|
"github.com/knadh/listmonk/internal/media"
|
|
"github.com/knadh/listmonk/models"
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
// store implements DataSource over the primary
|
|
// database.
|
|
type store struct {
|
|
queries *models.Queries
|
|
core *core.Core
|
|
media media.Store
|
|
h *http.Client
|
|
}
|
|
|
|
// runningCamp is one row returned by the GetRunningCampaign query. Each row
// carries the campaign's identity and batching cursor (last/max subscriber
// IDs) together with one of the list IDs the campaign targets.
type runningCamp struct {
	CampaignID       int    `db:"campaign_id"`
	CampaignType     string `db:"campaign_type"`
	LastSubscriberID int    `db:"last_subscriber_id"`
	MaxSubscriberID  int    `db:"max_subscriber_id"`
	ListID           int    `db:"list_id"`
}
|
|
|
|
func newManagerStore(q *models.Queries, c *core.Core, m media.Store) *store {
|
|
return &store{
|
|
queries: q,
|
|
core: c,
|
|
media: m,
|
|
}
|
|
}
|
|
|
|
// NextCampaigns retrieves active campaigns ready to be processed excluding
|
|
// campaigns that are also being processed. Additionally, it takes a map of campaignID:sentCount
|
|
// of campaigns that are being processed and updates them in the DB.
|
|
func (s *store) NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models.Campaign, error) {
|
|
var out []*models.Campaign
|
|
err := s.queries.NextCampaigns.Select(&out, pq.Int64Array(currentIDs), pq.Int64Array(sentCounts))
|
|
return out, err
|
|
}
|
|
|
|
// NextSubscribers retrieves a subset of subscribers of a given campaign.
|
|
// Since batches are processed sequentially, the retrieval is ordered by ID,
|
|
// and every batch takes the last ID of the last batch and fetches the next
|
|
// batch above that.
|
|
func (s *store) NextSubscribers(campID, limit int) ([]models.Subscriber, error) {
|
|
var camps []runningCamp
|
|
if err := s.queries.GetRunningCampaign.Select(&camps, campID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var listIDs []int
|
|
for _, c := range camps {
|
|
listIDs = append(listIDs, c.ListID)
|
|
}
|
|
|
|
if len(listIDs) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var out []models.Subscriber
|
|
err := s.queries.NextCampaignSubscribers.Select(&out, camps[0].CampaignID, camps[0].CampaignType, camps[0].LastSubscriberID, camps[0].MaxSubscriberID, pq.Array(listIDs), limit)
|
|
return out, err
|
|
}
|
|
|
|
// GetCampaign fetches a campaign from the database.
|
|
func (s *store) GetCampaign(campID int) (*models.Campaign, error) {
|
|
var out = &models.Campaign{}
|
|
err := s.queries.GetCampaign.Get(out, campID, nil, nil, "default")
|
|
return out, err
|
|
}
|
|
|
|
// UpdateCampaignStatus updates a campaign's status.
|
|
func (s *store) UpdateCampaignStatus(campID int, status string) error {
|
|
_, err := s.queries.UpdateCampaignStatus.Exec(campID, status)
|
|
return err
|
|
}
|
|
|
|
// UpdateCampaignCounts updates a campaign's status.
|
|
func (s *store) UpdateCampaignCounts(campID int, toSend int, sent int, lastSubID int) error {
|
|
_, err := s.queries.UpdateCampaignCounts.Exec(campID, toSend, sent, lastSubID)
|
|
return err
|
|
}
|
|
|
|
// GetAttachment fetches a media attachment blob.
|
|
func (s *store) GetAttachment(mediaID int) (models.Attachment, error) {
|
|
m, err := s.core.GetMedia(mediaID, "", s.media)
|
|
if err != nil {
|
|
return models.Attachment{}, err
|
|
}
|
|
|
|
b, err := s.media.GetBlob(m.URL)
|
|
if err != nil {
|
|
return models.Attachment{}, err
|
|
}
|
|
|
|
return models.Attachment{
|
|
Name: m.Filename,
|
|
Content: b,
|
|
Header: manager.MakeAttachmentHeader(m.Filename, "base64", m.ContentType),
|
|
}, nil
|
|
}
|
|
|
|
// CreateLink registers a URL with a UUID for tracking clicks and returns the UUID.
|
|
func (s *store) CreateLink(url string) (string, error) {
|
|
// Create a new UUID for the URL. If the URL already exists in the DB
|
|
// the UUID in the database is returned.
|
|
uu, err := uuid.NewV4()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var out string
|
|
if err := s.queries.CreateLink.Get(&out, uu, url); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// RecordBounce records a bounce event and returns the bounce count.
// It is meant to return the affected subscriber's ID and the bounce count
// (the subscriber_id and num result columns), plus any error.
//
// NOTE(review): this looks broken and appears unable to succeed as written.
//   - It executes the UpdateCampaignStatus statement, which
//     UpdateCampaignStatus in this file calls with only (campID, status).
//     Here it is given six bounce arguments.
//   - It passes a single struct to Select. sqlx's Select scans into a slice;
//     Get is the single-row form.
//
// The fix is presumably a dedicated bounce-recording query executed with Get.
// That query's parameter list and result columns are not visible here, so
// TODO confirm against the record-bounce query before changing.
func (s *store) RecordBounce(b models.Bounce) (int64, int, error) {
	// res receives the subscriber_id and num columns of the result row.
	var res = struct {
		SubscriberID int64 `db:"subscriber_id"`
		Num          int   `db:"num"`
	}{}

	err := s.queries.UpdateCampaignStatus.Select(&res,
		b.SubscriberUUID,
		b.Email,
		b.CampaignUUID,
		b.Type,
		b.Source,
		b.Meta)

	return res.SubscriberID, res.Num, err
}
|
|
|
|
func (s *store) BlocklistSubscriber(id int64) error {
|
|
_, err := s.queries.BlocklistSubscribers.Exec(pq.Int64Array{id})
|
|
return err
|
|
}
|
|
|
|
func (s *store) DeleteSubscriber(id int64) error {
|
|
_, err := s.queries.DeleteSubscribers.Exec(pq.Int64Array{id})
|
|
return err
|
|
}
|