listmonk/internal/core/subscribers.go

648 lines
22 KiB
Go

package core
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/gofrs/uuid/v5"
"github.com/jmoiron/sqlx"
"github.com/knadh/listmonk/internal/auth"
"github.com/knadh/listmonk/models"
"github.com/labstack/echo/v4"
"github.com/lib/pq"
)
var (
allowedSubQueryTables = map[string]struct{}{
"subscribers": {},
"lists": {},
"subscribers_lists": {},
"campaigns": {},
"campaign_lists": {},
"campaign_views": {},
"links": {},
"link_clicks": {},
"bounces": {},
}
)
// GetSubscriber fetches a subscriber by one of the given params.
func (c *Core) GetSubscriber(id int, uuid, email string) (models.Subscriber, error) {
var uu any
if uuid != "" {
uu = uuid
}
var out models.Subscribers
if err := c.q.GetSubscriber.Select(&out, id, uu, email); err != nil {
c.log.Printf("error fetching subscriber: %v", err)
return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching",
"name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
}
if len(out) == 0 {
return models.Subscriber{}, echo.NewHTTPError(http.StatusBadRequest,
c.i18n.Ts("globals.messages.notFound", "name",
fmt.Sprintf("{globals.terms.subscriber} (%d: %s%s)", id, uuid, email)))
}
if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
c.log.Printf("error loading subscriber lists: %v", err)
return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching",
"name", "{globals.terms.lists}", "error", pqErrMsg(err)))
}
return out[0], nil
}
// HasSubscriberLists checks if the given subscribers have at least one of the given lists.
func (c *Core) HasSubscriberLists(subIDs []int, listIDs []int) (map[int]bool, error) {
res := []struct {
SubID int `db:"subscriber_id"`
Has bool `db:"has"`
}{}
if err := c.q.HasSubscriberLists.Select(&res, pq.Array(subIDs), pq.Array(listIDs)); err != nil {
c.log.Printf("error fetching subscriber: %v", err)
return nil, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
}
out := make(map[int]bool, len(res))
for _, r := range res {
out[r.SubID] = r.Has
}
return out, nil
}
// GetSubscribersByEmail fetches a subscriber by one of the given params.
func (c *Core) GetSubscribersByEmail(emails []string) (models.Subscribers, error) {
var out models.Subscribers
if err := c.q.GetSubscribersByEmails.Select(&out, pq.Array(emails)); err != nil {
c.log.Printf("error fetching subscriber: %v", err)
return nil, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
}
if len(out) == 0 {
return nil, echo.NewHTTPError(http.StatusBadRequest, c.i18n.T("campaigns.noKnownSubsToTest"))
}
if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
c.log.Printf("error loading subscriber lists: %v", err)
return nil, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.lists}", "error", pqErrMsg(err)))
}
return out, nil
}
// QuerySubscribers queries and returns paginated subscrribers based on the given params including the total count.
func (c *Core) QuerySubscribers(searchStr, queryExp string, listIDs []int, subStatus string, order, orderBy string, offset, limit int) (models.Subscribers, int, error) {
// Sort params.
if !strSliceContains(orderBy, subQuerySortFields) {
orderBy = "subscribers.id"
}
if order != SortAsc && order != SortDesc {
order = SortDesc
}
// Required for pq.Array()
if listIDs == nil {
listIDs = []int{}
}
// There's an arbitrary query condition.
cond := "TRUE"
if queryExp != "" {
cond = queryExp
}
// stmt is the raw SQL query.
stmt := strings.ReplaceAll(c.q.QuerySubscribers, "%query%", cond)
stmt = strings.ReplaceAll(stmt, "%order%", orderBy+" "+order)
// Validate the tables used in the query.
if err := validateQueryTables(c.db, stmt, allowedSubQueryTables); err != nil {
c.log.Printf("error validating query tables: %v", err)
return nil, 0, echo.NewHTTPError(http.StatusBadRequest,
c.i18n.Ts("subscribers.errorPreparingQuery", "error", err.Error()))
}
// Create a readonly transaction that just does COUNT() to obtain the count of results
// and to ensure that the arbitrary query is indeed readonly.
total, err := c.getSubscriberCount(searchStr, cond, subStatus, listIDs)
if err != nil {
c.log.Printf("error getting subscriber count: %v", err)
return nil, 0, err
}
// No results.
if total == 0 {
return models.Subscribers{}, 0, nil
}
tx, err := c.db.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
if err != nil {
c.log.Printf("error preparing subscriber query: %v", err)
return nil, 0, echo.NewHTTPError(http.StatusBadRequest, c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
}
defer tx.Rollback()
var out models.Subscribers
if err := tx.Select(&out, stmt, pq.Array(listIDs), subStatus, searchStr, offset, limit); err != nil {
return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
// Lazy load lists for each subscriber.
if err := out.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
c.log.Printf("error fetching subscriber lists: %v", err)
return nil, 0, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
return out, total, nil
}
// GetSubscriberLists returns a subscriber's lists based on the given conditions.
func (c *Core) GetSubscriberLists(subID int, uuid string, listIDs []int, listUUIDs []string, subStatus string, listType string) ([]models.List, error) {
if listIDs == nil {
listIDs = []int{}
}
if listUUIDs == nil {
listUUIDs = []string{}
}
var uu any
if uuid != "" {
uu = uuid
}
// Fetch double opt-in lists from the given list IDs.
// Get the list of subscription lists where the subscriber hasn't confirmed.
out := []models.List{}
if err := c.q.GetSubscriberLists.Select(&out, subID, uu, pq.Array(listIDs), pq.Array(listUUIDs), subStatus, listType); err != nil {
c.log.Printf("error fetching lists for opt-in: %s", pqErrMsg(err))
return nil, err
}
return out, nil
}
// GetSubscriberProfileForExport returns the subscriber's profile data as a JSON exportable.
// Get the subscriber's data. A single query that gets the profile, list subscriptions, campaign views,
// and link clicks. Names of private lists are replaced with "Private list".
func (c *Core) GetSubscriberProfileForExport(id int, uuid string) (models.SubscriberExportProfile, error) {
var uu any
if uuid != "" {
uu = uuid
}
var out models.SubscriberExportProfile
if err := c.q.ExportSubscriberData.Get(&out, id, uu); err != nil {
c.log.Printf("error fetching subscriber export data: %v", err)
return models.SubscriberExportProfile{}, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", err.Error()))
}
return out, nil
}
// ExportSubscribers returns an iterator function that provides lists of subscribers based
// on the given criteria in an exportable form. The iterator function returned can be called
// repeatedly until there are nil subscribers. It's an iterator because exports can be extremely
// large and may have to be fetched in batches from the DB and streamed somewhere.
func (c *Core) ExportSubscribers(searchStr, query string, subIDs, listIDs []int, subStatus string, batchSize int) (func() ([]models.SubscriberExport, error), error) {
if subIDs == nil {
subIDs = []int{}
}
if listIDs == nil {
listIDs = []int{}
}
// There's an arbitrary query condition.
cond := "TRUE"
if query != "" {
cond = query
}
stmt := strings.ReplaceAll(c.q.QuerySubscribersForExport, "%query%", cond)
// Create a readonly transaction that just does COUNT() to obtain the count of results
// and to ensure that the arbitrary query is indeed readonly.
if _, err := c.getSubscriberCount(searchStr, cond, subStatus, listIDs); err != nil {
c.log.Printf("error getting subscriber count: %v", err)
return nil, err
}
// Prepare the actual query statement.
tx, err := c.db.Preparex(stmt)
if err != nil {
c.log.Printf("error preparing subscriber query: %v", err)
return nil, echo.NewHTTPError(http.StatusBadRequest,
c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
}
id := 0
return func() ([]models.SubscriberExport, error) {
var out []models.SubscriberExport
if err := tx.Select(&out, pq.Array(listIDs), id, pq.Array(subIDs), subStatus, searchStr, batchSize); err != nil {
c.log.Printf("error exporting subscribers by query: %v", err)
return nil, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
if len(out) == 0 {
return nil, nil
}
id = out[len(out)-1].ID
return out, nil
}, nil
}
// InsertSubscriber inserts a subscriber and returns the ID. The first bool indicates if
// it was a new subscriber, and the second bool indicates if the subscriber was sent an optin confirmation.
// bool = optinSent?
func (c *Core) InsertSubscriber(sub models.Subscriber, listIDs []int, listUUIDs []string, preconfirm bool) (models.Subscriber, bool, error) {
uu, err := uuid.NewV4()
if err != nil {
c.log.Printf("error generating UUID: %v", err)
return models.Subscriber{}, false, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorUUID", "error", err.Error()))
}
sub.UUID = uu.String()
subStatus := models.SubscriptionStatusUnconfirmed
if preconfirm {
subStatus = models.SubscriptionStatusConfirmed
}
if sub.Status == "" {
sub.Status = auth.UserStatusEnabled
}
// For pq.Array()
if listIDs == nil {
listIDs = []int{}
}
if listUUIDs == nil {
listUUIDs = []string{}
}
if err = c.q.InsertSubscriber.Get(&sub.ID,
sub.UUID,
sub.Email,
strings.TrimSpace(sub.Name),
sub.Status,
sub.Attribs,
pq.Array(listIDs),
pq.Array(listUUIDs),
subStatus); err != nil {
if pqErr, ok := err.(*pq.Error); ok && pqErr.Constraint == "subscribers_email_key" {
return models.Subscriber{}, false, echo.NewHTTPError(http.StatusConflict, c.i18n.T("subscribers.emailExists"))
} else {
// return sub.Subscriber, errSubscriberExists
c.log.Printf("error inserting subscriber: %v", err)
return models.Subscriber{}, false, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorCreating", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
}
}
// Fetch the subscriber's full data. If the subscriber already existed and wasn't
// created, the id will be empty. Fetch the details by e-mail then.
out, err := c.GetSubscriber(sub.ID, "", sub.Email)
if err != nil {
return models.Subscriber{}, false, err
}
hasOptin := false
if !preconfirm && c.consts.SendOptinConfirmation {
// Send a confirmation e-mail (if there are any double opt-in lists).
num, err := c.h.SendOptinConfirmation(out, listIDs)
if err != nil {
return out, hasOptin, err
}
hasOptin = num > 0
}
return out, hasOptin, nil
}
// UpdateSubscriber updates a subscriber's properties.
func (c *Core) UpdateSubscriber(id int, sub models.Subscriber) (models.Subscriber, error) {
// Format raw JSON attributes.
attribs := []byte("{}")
if len(sub.Attribs) > 0 {
if b, err := json.Marshal(sub.Attribs); err != nil {
return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorUpdating",
"name", "{globals.terms.subscriber}", "error", err.Error()))
} else {
attribs = b
}
}
_, err := c.q.UpdateSubscriber.Exec(id,
sub.Email,
strings.TrimSpace(sub.Name),
sub.Status,
json.RawMessage(attribs),
)
if err != nil {
c.log.Printf("error updating subscriber: %v", err)
return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
}
out, err := c.GetSubscriber(sub.ID, "", sub.Email)
if err != nil {
return models.Subscriber{}, err
}
return out, nil
}
// UpdateSubscriberWithLists updates a subscriber's properties.
// If deleteLists is set to true, all existing subscriptions are deleted and only
// the ones provided are added or retained.
func (c *Core) UpdateSubscriberWithLists(id int, sub models.Subscriber, listIDs []int, listUUIDs []string, preconfirm, deleteLists bool) (models.Subscriber, bool, error) {
subStatus := models.SubscriptionStatusUnconfirmed
if preconfirm {
subStatus = models.SubscriptionStatusConfirmed
}
// Format raw JSON attributes.
attribs := []byte("{}")
if len(sub.Attribs) > 0 {
if b, err := json.Marshal(sub.Attribs); err != nil {
return models.Subscriber{}, false, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorUpdating",
"name", "{globals.terms.subscriber}", "error", err.Error()))
} else {
attribs = b
}
}
_, err := c.q.UpdateSubscriberWithLists.Exec(id,
sub.Email,
strings.TrimSpace(sub.Name),
sub.Status,
json.RawMessage(attribs),
pq.Array(listIDs),
pq.Array(listUUIDs),
subStatus,
deleteLists)
if err != nil {
c.log.Printf("error updating subscriber: %v", err)
return models.Subscriber{}, false, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
}
out, err := c.GetSubscriber(sub.ID, "", sub.Email)
if err != nil {
return models.Subscriber{}, false, err
}
hasOptin := false
if !preconfirm && c.consts.SendOptinConfirmation {
// Send a confirmation e-mail (if there are any double opt-in lists).
num, err := c.h.SendOptinConfirmation(out, listIDs)
if err != nil {
return out, hasOptin, err
}
hasOptin = num > 0
}
return out, hasOptin, nil
}
// BlocklistSubscribers blocklists the given list of subscribers.
func (c *Core) BlocklistSubscribers(subIDs []int) error {
if _, err := c.q.BlocklistSubscribers.Exec(pq.Array(subIDs)); err != nil {
c.log.Printf("error blocklisting subscribers: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("subscribers.errorBlocklisting", "error", err.Error()))
}
return nil
}
// BlocklistSubscribersByQuery blocklists the given list of subscribers.
func (c *Core) BlocklistSubscribersByQuery(searchStr, queryExp string, listIDs []int, subStatus string) error {
if err := c.q.ExecSubQueryTpl(searchStr, sanitizeSQLExp(queryExp), c.q.BlocklistSubscribersByQuery, listIDs, c.db, subStatus); err != nil {
c.log.Printf("error blocklisting subscribers: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("subscribers.errorBlocklisting", "error", pqErrMsg(err)))
}
return nil
}
// DeleteSubscribers deletes the given list of subscribers.
func (c *Core) DeleteSubscribers(subIDs []int, subUUIDs []string) error {
if subIDs == nil {
subIDs = []int{}
}
if subUUIDs == nil {
subUUIDs = []string{}
}
if _, err := c.q.DeleteSubscribers.Exec(pq.Array(subIDs), pq.Array(subUUIDs)); err != nil {
c.log.Printf("error deleting subscribers: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
return nil
}
// DeleteSubscribersByQuery deletes subscribers by a given arbitrary query expression.
func (c *Core) DeleteSubscribersByQuery(searchStr, queryExp string, listIDs []int, subStatus string) error {
err := c.q.ExecSubQueryTpl(searchStr, sanitizeSQLExp(queryExp), c.q.DeleteSubscribersByQuery, listIDs, c.db, subStatus)
if err != nil {
c.log.Printf("error deleting subscribers: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
return err
}
// UnsubscribeByCampaign unsubscribes a given subscriber from lists in a given campaign.
func (c *Core) UnsubscribeByCampaign(subUUID, campUUID string, blocklist bool) error {
if _, err := c.q.UnsubscribeByCampaign.Exec(campUUID, subUUID, blocklist); err != nil {
c.log.Printf("error unsubscribing: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
return nil
}
// ConfirmOptionSubscription confirms a subscriber's optin subscription.
func (c *Core) ConfirmOptionSubscription(subUUID string, listUUIDs []string, meta models.JSON) error {
if meta == nil {
meta = models.JSON{}
}
if _, err := c.q.ConfirmSubscriptionOptin.Exec(subUUID, pq.Array(listUUIDs), meta); err != nil {
c.log.Printf("error confirming subscription: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
return nil
}
// DeleteSubscriberBounces deletes the given list of subscribers.
func (c *Core) DeleteSubscriberBounces(id int, uuid string) error {
var uu any
if uuid != "" {
uu = uuid
}
if _, err := c.q.DeleteBouncesBySubscriber.Exec(id, uu); err != nil {
c.log.Printf("error deleting bounces: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.bounces}", "error", pqErrMsg(err)))
}
return nil
}
// DeleteOrphanSubscribers deletes orphan subscriber records (subscribers without lists).
func (c *Core) DeleteOrphanSubscribers() (int, error) {
res, err := c.q.DeleteOrphanSubscribers.Exec()
if err != nil {
c.log.Printf("error deleting orphan subscribers: %v", err)
return 0, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
n, _ := res.RowsAffected()
return int(n), nil
}
// DeleteBlocklistedSubscribers deletes blocklisted subscribers.
func (c *Core) DeleteBlocklistedSubscribers() (int, error) {
res, err := c.q.DeleteBlocklistedSubscribers.Exec()
if err != nil {
c.log.Printf("error deleting blocklisted subscribers: %v", err)
return 0, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
n, _ := res.RowsAffected()
return int(n), nil
}
func (c *Core) getSubscriberCount(searchStr, queryExp, subStatus string, listIDs []int) (int, error) {
// If there's no condition, it's a "get all" call which can probably be optionally pulled from cache.
if queryExp == "" {
_ = c.refreshCache(matListSubStats, false)
total := 0
if err := c.q.QuerySubscribersCountAll.Get(&total, pq.Array(listIDs), subStatus); err != nil {
return 0, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
return total, nil
}
// Create a readonly transaction that just does COUNT() to obtain the count of results
// and to ensure that the arbitrary query is indeed readonly.
stmt := strings.ReplaceAll(c.q.QuerySubscribersCount, "%query%", queryExp)
tx, err := c.db.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
if err != nil {
c.log.Printf("error preparing subscriber query: %v", err)
return 0, echo.NewHTTPError(http.StatusBadRequest, c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
}
defer tx.Rollback()
// Execute the readonly query and get the count of results.
total := 0
if err := tx.Get(&total, stmt, pq.Array(listIDs), subStatus, searchStr); err != nil {
return 0, echo.NewHTTPError(http.StatusInternalServerError,
c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
return total, nil
}
// validateQueryTables checks if the query accesses only allowed tables.
func validateQueryTables(db *sqlx.DB, query string, allowedTables map[string]struct{}) error {
// Get the EXPLAIN (FORMAT JSON) output.
tx, err := db.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
if err != nil {
return err
}
defer tx.Rollback()
var plan string
if err = tx.QueryRow("EXPLAIN (FORMAT JSON) "+query, nil, models.SubscriberStatusEnabled, "", 0, 10).Scan(&plan); err != nil {
return err
}
// Extract all relation names from the JSON plan.
tables, err := getTablesFromQueryPlan(plan)
if err != nil {
return fmt.Errorf("error getting tables from query: %v", err)
}
// Validate against allowed tables.
for _, table := range tables {
if _, ok := allowedTables[table]; !ok {
return fmt.Errorf("table '%s' is not allowed", table)
}
}
return nil
}
// getTablesFromQueryPlan parses the EXPLAIN JSON to find all "Relation Name" entries.
func getTablesFromQueryPlan(explainJSON string) ([]string, error) {
var plans []map[string]any
if err := json.Unmarshal([]byte(explainJSON), &plans); err != nil {
return nil, err
}
// Collect table names in `tables` recursively.
tables := make(map[string]struct{})
for _, plan := range plans {
traverseQueryPlan(plan, tables)
}
result := make([]string, 0, len(tables))
for table := range tables {
result = append(result, table)
}
return result, nil
}
func traverseQueryPlan(node map[string]any, tables map[string]struct{}) {
if relName, ok := node["Relation Name"].(string); ok {
tables[relName] = struct{}{}
}
// Recursively check nested plans (e.g., subqueries, CTEs).
for _, v := range node {
switch v := v.(type) {
case map[string]any:
traverseQueryPlan(v, tables)
case []any:
for _, item := range v {
if m, ok := item.(map[string]any); ok {
traverseQueryPlan(m, tables)
}
}
}
}
}