listmonk/internal/core/subscribers.go
Kailash Nadh b5cd9498b1 Refactor all CRUD functions to a new core package.
This is a long pending refactor. All the DB, query, CRUD, and related
logic scattered across HTTP handlers are now moved into a central
`core` package with clean, abstracted methods, decoupling HTTP
handlers from executing direct DB queries and other business logic.

eg: `core.CreateList()`, `core.GetLists()` etc.

- Remove obsolete subscriber methods.
- Move optin hook queries to core.
- Move campaign methods to `core`.
- Move all campaign methods to `core`.
- Move public page functions to `core`.
- Move all template functions to `core`.
- Move media and settings function to `core`.
- Move handler middleware functions to `core`.
- Move all bounce functions to `core`.
- Move all dashboard functions to `core`.
- Fix GetLists() not honouring type
- Fix unwrapped JSON responses.
- Clean up obsolete pre-core util function.
- Replace SQL array null check with cardinality check.
- Fix missing validations in `core` queries.
- Remove superfluous deps on internal `subimporter`.
- Add dashboard functions to `core`.
- Fix broken domain ban check.
- Fix broken subscriber check middleware.
- Remove redundant error handling.
- Remove obsolete functions.
- Remove obsolete structs.
- Remove obsolete queries and DB functions.
- Document the `core` package.
2022-05-03 10:50:29 +05:30

437 lines
15 KiB
Go

package core
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/gofrs/uuid"
"github.com/knadh/listmonk/models"
"github.com/labstack/echo/v4"
"github.com/lib/pq"
)
// subQuerySortFields lists the values accepted as the orderBy argument of
// QuerySubscribers. Any other value makes that function fall back to
// sorting by "subscribers.id".
var subQuerySortFields = []string{"email", "name", "created_at", "updated_at"}
// GetSubscriber looks up a single subscriber. The id, uuid, and email
// arguments are all handed to the underlying query, which decides how they
// are matched; an empty uuid is passed as nil rather than as an empty string.
//
// A query failure is logged and returned as an HTTP 500 error. If no row
// matches, an HTTP 400 "not found" error is returned. The subscriber's lists
// are loaded before the record is returned.
func (c *Core) GetSubscriber(id int, uuid, email string) (models.Subscriber, error) {
	// Substitute nil for a blank UUID.
	var uuidArg interface{}
	if uuid != "" {
		uuidArg = uuid
	}

	var subs models.Subscribers
	err := c.q.GetSubscriber.Select(&subs, id, uuidArg, email)
	if err != nil {
		c.log.Printf("error fetching subscriber: %v", err)
		msg := c.i18n.Ts("globals.messages.errorFetching",
			"name", "{globals.terms.subscriber}", "error", pqErrMsg(err))
		return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError, msg)
	}

	if len(subs) < 1 {
		msg := c.i18n.Ts("globals.messages.notFound", "name", "{globals.terms.subscriber}")
		return models.Subscriber{}, echo.NewHTTPError(http.StatusBadRequest, msg)
	}

	// Attach the lists to the fetched records.
	err = subs.LoadLists(c.q.GetSubscriberListsLazy)
	if err != nil {
		c.log.Printf("error loading subscriber lists: %v", err)
		msg := c.i18n.Ts("globals.messages.errorFetching",
			"name", "{globals.terms.lists}", "error", pqErrMsg(err))
		return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError, msg)
	}

	return subs[0], nil
}
// GetSubscribersByEmail fetches the subscribers matching the given e-mail
// addresses and loads their lists.
//
// Query failures are logged and returned as HTTP 500 errors. If none of the
// e-mails match a subscriber, an HTTP 400 error carrying the
// "campaigns.noKnownSubsToTest" message is returned.
func (c *Core) GetSubscribersByEmail(emails []string) (models.Subscribers, error) {
	var subs models.Subscribers

	err := c.q.GetSubscribersByEmails.Select(&subs, pq.Array(emails))
	if err != nil {
		c.log.Printf("error fetching subscriber: %v", err)
		msg := c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscriber}", "error", pqErrMsg(err))
		return nil, echo.NewHTTPError(http.StatusInternalServerError, msg)
	}

	if len(subs) < 1 {
		return nil, echo.NewHTTPError(http.StatusBadRequest, c.i18n.T("campaigns.noKnownSubsToTest"))
	}

	// Attach the lists to the fetched records.
	err = subs.LoadLists(c.q.GetSubscriberListsLazy)
	if err != nil {
		c.log.Printf("error loading subscriber lists: %v", err)
		msg := c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.lists}", "error", pqErrMsg(err))
		return nil, echo.NewHTTPError(http.StatusInternalServerError, msg)
	}

	return subs, nil
}
// QuerySubscribers runs a paginated subscriber search and returns the
// matching page of subscribers along with the total number of matches.
//
// query is an optional raw SQL expression appended to the base queries as an
// extra " AND ..." condition. orderBy is honoured only when it appears in
// subQuerySortFields (otherwise "subscribers.id" is used), and order falls
// back to SortDesc unless it is SortAsc or SortDesc. The count and the data
// query both run in one read-only transaction so that the arbitrary
// expression is executed read-only. When the count is zero, an empty result
// is returned without running the data query.
func (c *Core) QuerySubscribers(query string, listIDs []int, order, orderBy string, offset, limit int) (models.Subscribers, int, error) {
	// Optional free-form filter expression.
	var extraCond string
	if query != "" {
		extraCond = " AND " + query
	}

	// Normalise the sort column and direction.
	if !strSliceContains(orderBy, subQuerySortFields) {
		orderBy = "subscribers.id"
	}
	if !(order == SortAsc || order == SortDesc) {
		order = SortDesc
	}

	// Required for pq.Array()
	if listIDs == nil {
		listIDs = []int{}
	}

	// All data-fetching failures below produce the same HTTP 500 error.
	fetchErr := func(err error) error {
		return echo.NewHTTPError(http.StatusInternalServerError,
			c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
	}

	// The read-only transaction covers both the COUNT() and the data query.
	tx, err := c.db.BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
	if err != nil {
		c.log.Printf("error preparing subscriber query: %v", err)
		return nil, 0, echo.NewHTTPError(http.StatusBadRequest, c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
	}
	defer tx.Rollback()

	// First pass: number of matching rows.
	var total int
	countSQL := fmt.Sprintf(c.q.QuerySubscribersCount, extraCond)
	if err := tx.Get(&total, countSQL, pq.Array(listIDs)); err != nil {
		return nil, 0, fetchErr(err)
	}

	// Nothing matched.
	if total == 0 {
		return models.Subscribers{}, 0, nil
	}

	// Second pass: the requested page of rows, using the raw SQL template.
	var subs models.Subscribers
	dataSQL := fmt.Sprintf(c.q.QuerySubscribers, extraCond, orderBy, order)
	if err := tx.Select(&subs, dataSQL, pq.Array(listIDs), offset, limit); err != nil {
		return nil, 0, fetchErr(err)
	}

	// Lazy load the lists of every subscriber on the page.
	if err := subs.LoadLists(c.q.GetSubscriberListsLazy); err != nil {
		c.log.Printf("error fetching subscriber lists: %v", err)
		return nil, 0, fetchErr(err)
	}

	return subs, total, nil
}
// GetSubscriberLists returns the lists of a subscriber, filtered by the
// given conditions. The subscriber is identified by subID and uuid (a blank
// uuid is passed to the query as nil); listIDs, listUUIDs, subStatus, and
// listType are forwarded to the query as filters.
//
// On failure the error is logged and the original query error is returned
// unwrapped.
func (c *Core) GetSubscriberLists(subID int, uuid string, listIDs []int, listUUIDs []string, subStatus string, listType string) ([]models.List, error) {
	// Substitute nil for a blank UUID.
	var uuidArg interface{}
	if uuid != "" {
		uuidArg = uuid
	}

	// Replace nil slices with empty ones before wrapping them in pq.Array().
	if listUUIDs == nil {
		listUUIDs = []string{}
	}
	if listIDs == nil {
		listIDs = []int{}
	}

	lists := []models.List{}
	err := c.q.GetSubscriberLists.Select(&lists, subID, uuidArg, pq.Array(listIDs), pq.Array(listUUIDs), subStatus, listType)
	if err != nil {
		c.log.Printf("error fetching lists for opt-in: %s", pqErrMsg(err))
		return nil, err
	}

	return lists, nil
}
// GetSubscriberProfileForExport returns a subscriber's profile data in a
// JSON-exportable form. It is a single query that gets the profile, list
// subscriptions, campaign views, and link clicks; names of private lists are
// replaced with "Private list".
//
// The subscriber is identified by id and uuid (a blank uuid is passed to the
// query as nil). A query failure is logged and returned as an HTTP 500 error.
func (c *Core) GetSubscriberProfileForExport(id int, uuid string) (models.SubscriberExportProfile, error) {
	// Substitute nil for a blank UUID.
	var uuidArg interface{}
	if uuid != "" {
		uuidArg = uuid
	}

	var profile models.SubscriberExportProfile
	err := c.q.ExportSubscriberData.Get(&profile, id, uuidArg)
	if err == nil {
		return profile, nil
	}

	c.log.Printf("error fetching subscriber export data: %v", err)
	return models.SubscriberExportProfile{}, echo.NewHTTPError(http.StatusInternalServerError,
		c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", err.Error()))
}
// ExportSubscribers returns an iterator function that provides lists of subscribers based
// on the given criteria in an exportable form. The iterator function returned can be called
// repeatedly until there are nil subscribers. It's an iterator because exports can be extremely
// large and may have to be fetched in batches from the DB and streamed somewhere.
//
// query is an optional raw SQL expression appended to the export query as an
// extra " AND ..." condition. When present, it is first executed once inside
// a read-only transaction so that an expression that is invalid or attempts
// to write is rejected with an HTTP 400 error before any data is exported.
// Each iterator call fetches up to batchSize rows whose ID follows the last
// row of the previous batch.
//
// The iterator owns a prepared statement. It is closed automatically once the
// iterator reports the end of the data (nil, nil) or returns an error; after
// that, further calls return the same terminal result without touching the
// DB. A caller that abandons the iterator before reaching either state leaves
// the statement open.
func (c *Core) ExportSubscribers(query string, subIDs, listIDs []int, batchSize int) (func() ([]models.SubscriberExport, error), error) {
	// There's an arbitrary query condition.
	cond := ""
	if query != "" {
		cond = " AND " + query
	}

	stmt := fmt.Sprintf(c.q.QuerySubscribersForExport, cond)

	// Verify that the arbitrary SQL search expression is read only.
	if cond != "" {
		tx, err := c.db.Unsafe().BeginTxx(context.Background(), &sql.TxOptions{ReadOnly: true})
		if err != nil {
			c.log.Printf("error preparing subscriber query: %v", err)
			return nil, echo.NewHTTPError(http.StatusBadRequest,
				c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
		}

		rows, err := tx.Query(stmt, nil, 0, nil, 1)
		if err == nil {
			// Only the success of the query matters here; the result set is
			// discarded, so a Close() error is of no consequence.
			rows.Close()
		}

		// End the transaction right away (instead of deferring it) so its
		// connection is back in the pool before a statement is prepared
		// below. The transaction is read-only, so there is nothing to commit.
		tx.Rollback()

		if err != nil {
			return nil, echo.NewHTTPError(http.StatusBadRequest,
				c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
		}
	}

	// Required for pq.Array()
	if subIDs == nil {
		subIDs = []int{}
	}
	if listIDs == nil {
		listIDs = []int{}
	}

	// Prepare the actual query statement.
	prepared, err := c.db.Preparex(stmt)
	if err != nil {
		c.log.Printf("error preparing subscriber query: %v", err)
		return nil, echo.NewHTTPError(http.StatusBadRequest,
			c.i18n.Ts("subscribers.errorPreparingQuery", "error", pqErrMsg(err)))
	}

	var (
		lastID   = 0   // ID of the last row returned so far; the next batch starts after it.
		done     bool  // Set once the iterator has ended and the statement is closed.
		finalErr error // Error that ended the iteration, if any.
	)

	return func() ([]models.SubscriberExport, error) {
		// The iteration has already ended; the statement is closed.
		if done {
			return nil, finalErr
		}

		var out []models.SubscriberExport
		if err := prepared.Select(&out, pq.Array(listIDs), lastID, pq.Array(subIDs), batchSize); err != nil {
			c.log.Printf("error exporting subscribers by query: %v", err)

			done = true
			finalErr = echo.NewHTTPError(http.StatusInternalServerError,
				c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
			prepared.Close()
			return nil, finalErr
		}

		// No more rows: release the prepared statement.
		if len(out) == 0 {
			done = true
			prepared.Close()
			return nil, nil
		}

		lastID = out[len(out)-1].ID
		return out, nil
	}, nil
}
// CreateSubscriber inserts a subscriber with a freshly generated UUID and
// returns the full subscriber record. listIDs and listUUIDs are passed to the
// insert query together with the subscription status, which is "confirmed"
// (models.SubscriptionStatusConfirmed) when preconfirm is true and
// "unconfirmed" otherwise. A blank sub.Status defaults to
// models.UserStatusEnabled.
//
// The first bool (isNew) is false when the insert violated the
// "subscribers_email_key" constraint, i.e. the e-mail already existed; the
// existing record is then fetched and returned instead. The second bool
// (optinSent) is true when at least one opt-in confirmation was sent.
func (c *Core) CreateSubscriber(sub models.Subscriber, listIDs []int, listUUIDs []string, preconfirm bool) (models.Subscriber, bool, bool, error) {
	newUUID, err := uuid.NewV4()
	if err != nil {
		c.log.Printf("error generating UUID: %v", err)
		return models.Subscriber{}, false, false, echo.NewHTTPError(http.StatusInternalServerError,
			c.i18n.Ts("globals.messages.errorUUID", "error", err.Error()))
	}
	sub.UUID = newUUID.String()

	if sub.Status == "" {
		sub.Status = models.UserStatusEnabled
	}

	subscriptionStatus := models.SubscriptionStatusUnconfirmed
	if preconfirm {
		subscriptionStatus = models.SubscriptionStatusConfirmed
	}

	// For pq.Array()
	if listUUIDs == nil {
		listUUIDs = []string{}
	}
	if listIDs == nil {
		listIDs = []int{}
	}

	created := true
	err = c.q.InsertSubscriber.Get(&sub.ID,
		sub.UUID,
		sub.Email,
		strings.TrimSpace(sub.Name),
		sub.Status,
		sub.Attribs,
		pq.Array(listIDs),
		pq.Array(listUUIDs),
		subscriptionStatus)
	if err != nil {
		// Anything other than a violation of the unique e-mail constraint is
		// a genuine failure.
		pqErr, isPQErr := err.(*pq.Error)
		if !isPQErr || pqErr.Constraint != "subscribers_email_key" {
			c.log.Printf("error inserting subscriber: %v", err)
			return models.Subscriber{}, false, false, echo.NewHTTPError(http.StatusInternalServerError,
				c.i18n.Ts("globals.messages.errorCreating",
					"name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
		}

		// The e-mail is already registered.
		created = false
	}

	// Fetch the subscriber's full data. If the subscriber already existed and
	// wasn't created, the ID will be empty. Fetch the details by e-mail then.
	out, err := c.GetSubscriber(sub.ID, "", sub.Email)
	if err != nil {
		return models.Subscriber{}, false, false, err
	}

	optinSent := false
	if c.constants.SendOptinConfirmation && !preconfirm {
		// Send a confirmation e-mail (if there are any double opt-in lists).
		// This is best-effort: the error is ignored and only the number of
		// confirmations sent is used.
		num, _ := c.h.SendOptinConfirmation(out, listIDs)
		optinSent = num > 0
	}

	return out, created, optinSent, nil
}
// UpdateSubscriber updates the subscriber identified by id using the e-mail,
// name (whitespace-trimmed), status, and attributes in sub, passes listIDs
// and the subscription status to the update query, and returns the record as
// re-read from the DB.
//
// The subscription status is models.SubscriptionStatusConfirmed when
// preconfirm is true and models.SubscriptionStatusUnconfirmed otherwise.
// sub.Attribs is serialised to JSON, with "{}" used when it is empty.
// Failures are returned as echo HTTP errors.
func (c *Core) UpdateSubscriber(id int, sub models.Subscriber, listIDs []int, preconfirm bool) (models.Subscriber, error) {
	subStatus := models.SubscriptionStatusUnconfirmed
	if preconfirm {
		subStatus = models.SubscriptionStatusConfirmed
	}

	// Format raw JSON attributes, defaulting to an empty object.
	attribs := []byte("{}")
	if len(sub.Attribs) > 0 {
		b, err := json.Marshal(sub.Attribs)
		if err != nil {
			return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
				c.i18n.Ts("globals.messages.errorUpdating",
					"name", "{globals.terms.subscriber}", "error", err.Error()))
		}
		attribs = b
	}

	_, err := c.q.UpdateSubscriber.Exec(id,
		sub.Email,
		strings.TrimSpace(sub.Name),
		sub.Status,
		json.RawMessage(attribs),
		pq.Array(listIDs),
		subStatus)
	if err != nil {
		c.log.Printf("error updating subscriber: %v", err)
		return models.Subscriber{}, echo.NewHTTPError(http.StatusInternalServerError,
			c.i18n.Ts("globals.messages.errorUpdating",
				"name", "{globals.terms.subscriber}", "error", pqErrMsg(err)))
	}

	// Re-read the record using the same id that was just updated. sub.ID comes
	// from the caller-supplied struct and may be zero or differ from id, in
	// which case a different subscriber could be returned.
	out, err := c.GetSubscriber(id, "", sub.Email)
	if err != nil {
		return models.Subscriber{}, err
	}

	return out, nil
}
// BlocklistSubscribers blocklists the subscribers with the given IDs.
// A query failure is logged and returned as an HTTP 500 error.
func (c *Core) BlocklistSubscribers(subIDs []int) error {
	_, err := c.q.BlocklistSubscribers.Exec(pq.Array(subIDs))
	if err == nil {
		return nil
	}

	c.log.Printf("error blocklisting subscribers: %v", err)
	return echo.NewHTTPError(http.StatusInternalServerError,
		c.i18n.Ts("subscribers.errorBlocklisting", "error", err.Error()))
}
// BlocklistSubscribersByQuery blocklists subscribers selected by an arbitrary
// query expression and the given list IDs. The expression is passed through
// sanitizeSQLExp before being executed with the blocklist query template.
// A failure is logged and returned as an HTTP 500 error.
func (c *Core) BlocklistSubscribersByQuery(query string, listIDs []int) error {
	err := c.q.ExecSubscriberQueryTpl(sanitizeSQLExp(query), c.q.BlocklistSubscribersByQuery, listIDs, c.db)
	if err == nil {
		return nil
	}

	c.log.Printf("error blocklisting subscribers: %v", err)
	return echo.NewHTTPError(http.StatusInternalServerError,
		c.i18n.Ts("subscribers.errorBlocklisting", "error", pqErrMsg(err)))
}
// DeleteSubscribers deletes subscribers identified by the given IDs and
// UUIDs. Nil slices are replaced with empty ones before being handed to the
// query. A query failure is logged and returned as an HTTP 500 error.
func (c *Core) DeleteSubscribers(subIDs []int, subUUIDs []string) error {
	// Replace nil slices with empty ones before wrapping them in pq.Array().
	if subUUIDs == nil {
		subUUIDs = []string{}
	}
	if subIDs == nil {
		subIDs = []int{}
	}

	_, err := c.q.DeleteSubscribers.Exec(pq.Array(subIDs), pq.Array(subUUIDs))
	if err == nil {
		return nil
	}

	c.log.Printf("error deleting subscribers: %v", err)
	return echo.NewHTTPError(http.StatusInternalServerError,
		c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
// DeleteSubscribersByQuery deletes subscribers selected by an arbitrary query
// expression and the given list IDs. The expression is passed through
// sanitizeSQLExp before being executed with the delete query template.
// A failure is logged and returned as an HTTP 500 error.
func (c *Core) DeleteSubscribersByQuery(query string, listIDs []int) error {
	if err := c.q.ExecSubscriberQueryTpl(sanitizeSQLExp(query), c.q.DeleteSubscribersByQuery, listIDs, c.db); err != nil {
		c.log.Printf("error deleting subscribers: %v", err)
		return echo.NewHTTPError(http.StatusInternalServerError,
			c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
	}

	return nil
}
// UnsubscribeByCampaign unsubscribes the given subscriber from the lists in
// the given campaign. The blocklist flag is forwarded to the query as-is.
// NOTE(review): presumably it also blocklists the subscriber; confirm
// against the SQL. A query failure is logged and returned as an HTTP 500
// error.
func (c *Core) UnsubscribeByCampaign(subUUID, campUUID string, blocklist bool) error {
	_, err := c.q.Unsubscribe.Exec(campUUID, subUUID, blocklist)
	if err == nil {
		return nil
	}

	c.log.Printf("error unsubscribing: %v", err)
	return echo.NewHTTPError(http.StatusInternalServerError,
		c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
// ConfirmOptionSubscription confirms a subscriber's opt-in subscription for
// the lists with the given UUIDs. A query failure is logged and returned as
// an HTTP 500 error.
func (c *Core) ConfirmOptionSubscription(subUUID string, listUUIDs []string) error {
	_, err := c.q.ConfirmSubscriptionOptin.Exec(subUUID, pq.Array(listUUIDs))
	if err == nil {
		return nil
	}

	c.log.Printf("error confirming subscription: %v", err)
	return echo.NewHTTPError(http.StatusInternalServerError,
		c.i18n.Ts("globals.messages.errorUpdating", "name", "{globals.terms.subscribers}", "error", pqErrMsg(err)))
}
// DeleteSubscriberBounces deletes the bounce records of a subscriber. The
// subscriber is identified by id and uuid (a blank uuid is passed to the
// query as nil). A query failure is logged and returned as an HTTP 500 error.
func (c *Core) DeleteSubscriberBounces(id int, uuid string) error {
	// Substitute nil for a blank UUID.
	var uuidArg interface{}
	if uuid != "" {
		uuidArg = uuid
	}

	_, err := c.q.DeleteBouncesBySubscriber.Exec(id, uuidArg)
	if err == nil {
		return nil
	}

	c.log.Printf("error deleting bounces: %v", err)
	return echo.NewHTTPError(http.StatusInternalServerError,
		c.i18n.Ts("globals.messages.errorDeleting", "name", "{globals.terms.bounces}", "error", pqErrMsg(err)))
}