package main import ( "net/http" "github.com/gofrs/uuid/v5" "github.com/knadh/listmonk/internal/core" "github.com/knadh/listmonk/internal/manager" "github.com/knadh/listmonk/internal/media" "github.com/knadh/listmonk/models" "github.com/lib/pq" ) // store implements DataSource over the primary // database. type store struct { queries *models.Queries core *core.Core media media.Store h *http.Client } func newManagerStore(q *models.Queries, c *core.Core, m media.Store) *store { return &store{ queries: q, core: c, media: m, } } // NextCampaigns retrieves active campaigns ready to be processed excluding // campaigns that are also being processed. Additionally, it takes a map of campaignID:sentCount // of campaigns that are being processed and updates them in the DB. func (s *store) NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models.Campaign, error) { var out []*models.Campaign err := s.queries.NextCampaigns.Select(&out, pq.Int64Array(currentIDs), pq.Int64Array(sentCounts)) return out, err } // NextSubscribers retrieves a subset of subscribers of a given campaign. // Since batches are processed sequentially, the retrieval is ordered by ID, // and every batch takes the last ID of the last batch and fetches the next // batch above that. func (s *store) NextSubscribers(campID, limit int) ([]models.Subscriber, error) { var out []models.Subscriber err := s.queries.NextCampaignSubscribers.Select(&out, campID, limit) return out, err } // GetCampaign fetches a campaign from the database. func (s *store) GetCampaign(campID int) (*models.Campaign, error) { var out = &models.Campaign{} err := s.queries.GetCampaign.Get(out, campID, nil, nil, "default") return out, err } // UpdateCampaignStatus updates a campaign's status. func (s *store) UpdateCampaignStatus(campID int, status string) error { _, err := s.queries.UpdateCampaignStatus.Exec(campID, status) return err } // UpdateCampaignCounts updates a campaign's status. func (s *store) UpdateCampaignCounts(campID int, toSend int, sent int, lastSubID int) error { _, err := s.queries.UpdateCampaignCounts.Exec(campID, toSend, sent, lastSubID) return err } // GetAttachment fetches a media attachment blob. func (s *store) GetAttachment(mediaID int) (models.Attachment, error) { m, err := s.core.GetMedia(mediaID, "", s.media) if err != nil { return models.Attachment{}, err } b, err := s.media.GetBlob(m.URL) if err != nil { return models.Attachment{}, err } return models.Attachment{ Name: m.Filename, Content: b, Header: manager.MakeAttachmentHeader(m.Filename, "base64", m.ContentType), }, nil } // CreateLink registers a URL with a UUID for tracking clicks and returns the UUID. func (s *store) CreateLink(url string) (string, error) { // Create a new UUID for the URL. If the URL already exists in the DB // the UUID in the database is returned. uu, err := uuid.NewV4() if err != nil { return "", err } var out string if err := s.queries.CreateLink.Get(&out, uu, url); err != nil { return "", err } return out, nil } // RecordBounce records a bounce event and returns the bounce count. func (s *store) RecordBounce(b models.Bounce) (int64, int, error) { var res = struct { SubscriberID int64 `db:"subscriber_id"` Num int `db:"num"` }{} err := s.queries.UpdateCampaignStatus.Select(&res, b.SubscriberUUID, b.Email, b.CampaignUUID, b.Type, b.Source, b.Meta) return res.SubscriberID, res.Num, err } func (s *store) BlocklistSubscriber(id int64) error { _, err := s.queries.BlocklistSubscribers.Exec(pq.Int64Array{id}) return err } func (s *store) DeleteSubscriber(id int64) error { _, err := s.queries.DeleteSubscribers.Exec(pq.Int64Array{id}) return err }