mirror of
https://github.com/knadh/listmonk.git
synced 2024-11-15 12:05:50 +08:00
414c5c0c99
This commit fully refactors the core campaign manager logic. It applies a whole new approach to campaign state and lifecycle management. - Create a new "pipeline" abstraction on top of campaign for state management. - Account for every message processed and end campaigns based on the actual count. - Discard in-queue messages in the pipeline of a paused or cancelled campaign.
52 lines
1.1 KiB
Go
52 lines
1.1 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/labstack/echo/v4"
|
|
)
|
|
|
|
// handleEventStream serves an endpoint that never closes and pushes a
|
|
// live event stream (text/event-stream) such as a error messages.
|
|
func handleEventStream(c echo.Context) error {
|
|
var (
|
|
app = c.Get("app").(*App)
|
|
)
|
|
|
|
h := c.Response().Header()
|
|
h.Set(echo.HeaderContentType, "text/event-stream")
|
|
h.Set(echo.HeaderCacheControl, "no-store")
|
|
h.Set(echo.HeaderConnection, "keep-alive")
|
|
|
|
// Subscribe to the event stream with a random ID.
|
|
id := fmt.Sprintf("api:%v", time.Now().UnixNano())
|
|
sub, err := app.events.Subscribe(id)
|
|
if err != nil {
|
|
log.Fatalf("error subscribing to events: %v", err)
|
|
}
|
|
|
|
ctx := c.Request().Context()
|
|
for {
|
|
select {
|
|
case e := <-sub:
|
|
b, err := json.Marshal(e)
|
|
if err != nil {
|
|
app.log.Printf("error marshalling event: %v", err)
|
|
continue
|
|
}
|
|
|
|
c.Response().Write([]byte(fmt.Sprintf("retry: 3000\ndata: %s\n\n", b)))
|
|
c.Response().Flush()
|
|
|
|
case <-ctx.Done():
|
|
// On HTTP connection close, unsubscribe.
|
|
app.events.Unsubscribe(id)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|