From 0b2da4c66456035c945c1c6e48710ef3940e9bea Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Fri, 26 May 2023 22:07:58 +0530 Subject: [PATCH] Add support for streaming async events via HTTP serverside events. - `GET /api/events?type=error` opens a long-lived HTTP server side event connection that streams error messages. - async (typically SMTP) errors are now streamed to the frontend and disaplyed as an error toast on the admin UI. --- cmd/admin.go | 2 +- cmd/events.go | 54 +++++++++++++++++++ cmd/handlers.go | 2 + cmd/main.go | 16 +++--- cmd/settings.go | 2 +- frontend/src/App.vue | 20 ++++++++ frontend/src/constants.js | 1 + frontend/src/utils.js | 4 +- internal/events/events.go | 100 ++++++++++++++++++++++++++++++++++++ internal/manager/manager.go | 4 +- 10 files changed, 193 insertions(+), 12 deletions(-) create mode 100644 cmd/events.go create mode 100644 internal/events/events.go diff --git a/cmd/admin.go b/cmd/admin.go index abaf6d0f..444eb650 100644 --- a/cmd/admin.go +++ b/cmd/admin.go @@ -89,7 +89,7 @@ func handleReloadApp(c echo.Context) error { app := c.Get("app").(*App) go func() { <-time.After(time.Millisecond * 500) - app.sigChan <- syscall.SIGHUP + app.chReload <- syscall.SIGHUP }() return c.JSON(http.StatusOK, okResp{true}) } diff --git a/cmd/events.go b/cmd/events.go new file mode 100644 index 00000000..732668dc --- /dev/null +++ b/cmd/events.go @@ -0,0 +1,54 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "time" + + "github.com/labstack/echo/v4" +) + +// handleEventStream serves an endpoint that never closes and pushes a +// live event stream (text/event-stream) such as a error messages. +func handleEventStream(c echo.Context) error { + var ( + app = c.Get("app").(*App) + ) + + h := c.Response().Header() + h.Set(echo.HeaderContentType, "text/event-stream") + h.Set(echo.HeaderCacheControl, "no-store") + h.Set(echo.HeaderConnection, "keep-alive") + + // Subscribe to the event stream with a random ID. + id := fmt.Sprintf("api:%v", time.Now().UnixNano()) + sub, err := app.events.Subscribe(id) + if err != nil { + log.Fatalf("error subscribing to events: %v", err) + } + + ctx := c.Request().Context() + for { + select { + case e := <-sub: + b, err := json.Marshal(e) + if err != nil { + app.log.Printf("error marshalling event: %v", err) + continue + } + + fmt.Printf("data: %s\n\n", b) + + c.Response().Write([]byte(fmt.Sprintf("retry: 3000\ndata: %s\n\n", b))) + c.Response().Flush() + + case <-ctx.Done(): + // On HTTP connection close, unsubscribe. + app.events.Unsubscribe(id) + return nil + } + } + + return nil +} diff --git a/cmd/handlers.go b/cmd/handlers.go index adaec435..828d79fb 100644 --- a/cmd/handlers.go +++ b/cmd/handlers.go @@ -161,6 +161,8 @@ func initHTTPHandlers(e *echo.Echo, app *App) { g.POST("/api/tx", handleSendTxMessage) + g.GET("/api/events", handleEventStream) + if app.constants.BounceWebhooksEnabled { // Private authenticated bounce endpoint. g.POST("/webhooks/bounce", handleBounceWebhook) diff --git a/cmd/main.go b/cmd/main.go index bcaec935..67b46529 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -19,6 +19,7 @@ import ( "github.com/knadh/listmonk/internal/buflog" "github.com/knadh/listmonk/internal/captcha" "github.com/knadh/listmonk/internal/core" + "github.com/knadh/listmonk/internal/events" "github.com/knadh/listmonk/internal/i18n" "github.com/knadh/listmonk/internal/manager" "github.com/knadh/listmonk/internal/media" @@ -48,12 +49,13 @@ type App struct { bounce *bounce.Manager paginator *paginator.Paginator captcha *captcha.Captcha + events *events.Events notifTpls *notifTpls log *log.Logger bufLog *buflog.BufLog // Channel for passing reload signals. - sigChan chan os.Signal + chReload chan os.Signal // Global variable that stores the state indicating that a restart is required // after a settings update. @@ -66,8 +68,9 @@ type App struct { var ( // Buffered log writer for storing N lines of log entries for the UI. - bufLog = buflog.New(5000) - lo = log.New(io.MultiWriter(os.Stdout, bufLog), "", + evStream = events.New() + bufLog = buflog.New(5000) + lo = log.New(io.MultiWriter(os.Stdout, bufLog, evStream.ErrWriter()), "", log.Ldate|log.Ltime|log.Lshortfile) ko = koanf.New(".") @@ -170,6 +173,7 @@ func main() { log: lo, bufLog: bufLog, captcha: initCaptcha(), + events: evStream, paginator: paginator.New(paginator.Opt{ DefaultPerPage: 20, @@ -240,11 +244,11 @@ func main() { // Wait for the reload signal with a callback to gracefully shut down resources. // The `wait` channel is passed to awaitReload to wait for the callback to finish // within N seconds, or do a force reload. - app.sigChan = make(chan os.Signal) - signal.Notify(app.sigChan, syscall.SIGHUP) + app.chReload = make(chan os.Signal) + signal.Notify(app.chReload, syscall.SIGHUP) closerWait := make(chan bool) - <-awaitReload(app.sigChan, closerWait, func() { + <-awaitReload(app.chReload, closerWait, func() { // Stop the HTTP server. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() diff --git a/cmd/settings.go b/cmd/settings.go index d0485175..0159dc1b 100644 --- a/cmd/settings.go +++ b/cmd/settings.go @@ -196,7 +196,7 @@ func handleUpdateSettings(c echo.Context) error { // No running campaigns. Reload the app. go func() { <-time.After(time.Millisecond * 500) - app.sigChan <- syscall.SIGHUP + app.chReload <- syscall.SIGHUP }() return c.JSON(http.StatusOK, okResp{true}) diff --git a/frontend/src/App.vue b/frontend/src/App.vue index 29d737cd..418f2978 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -133,6 +133,24 @@ export default Vue.extend({ }; http.send(); }, + + listenEvents() { + const reMatchLog = /(.+?)\.go:\d+:(.+?)$/im; + const evtSource = new EventSource(uris.errorEvents, { withCredentials: true }); + let numEv = 0; + evtSource.onmessage = (e) => { + if (numEv > 50) { + return; + } + numEv += 1; + + const d = JSON.parse(e.data); + if (d && d.type === 'error') { + const msg = reMatchLog.exec(d.message.trim()); + this.$utils.toast(msg[2], 'is-danger', null, true); + } + }; + }, }, computed: { @@ -155,6 +173,8 @@ export default Vue.extend({ window.addEventListener('resize', () => { this.windowWidth = window.innerWidth; }); + + this.listenEvents(); }, }); diff --git a/frontend/src/constants.js b/frontend/src/constants.js index e4cd46ca..f68b234d 100644 --- a/frontend/src/constants.js +++ b/frontend/src/constants.js @@ -22,6 +22,7 @@ export const uris = Object.freeze({ previewTemplate: '/api/templates/:id/preview', previewRawTemplate: '/api/templates/preview', exportSubscribers: '/api/subscribers/export', + errorEvents: '/api/events?type=error', base: `${baseURL}/static`, root: rootURL, static: `${baseURL}/static`, diff --git a/frontend/src/utils.js b/frontend/src/utils.js index 2283c460..cb6caf9c 100644 --- a/frontend/src/utils.js +++ b/frontend/src/utils.js @@ -160,11 +160,11 @@ export default class Utils { }); }; - toast = (msg, typ, duration) => { + toast = (msg, typ, duration, queue) => { Toast.open({ message: this.escapeHTML(msg), type: !typ ? 'is-success' : typ, - queue: false, + queue, duration: duration || 3000, position: 'is-top', pauseOnHover: true, diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 00000000..5c684c4e --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,100 @@ +// Package events implements a simple event broadcasting mechanism +// for usage in broadcasting error messages, postbacks etc. various +// channels. +package events + +import ( + "bytes" + "fmt" + "io" + "sync" +) + +const ( + TypeError = "error" +) + +// Event represents a single event in the system. +type Event struct { + ID string `json:"id"` + Type string `json:"type"` + Message string `json:"message"` + Data interface{} `json:"data"` + Channels []string `json:"-"` +} + +type Events struct { + subs map[string]chan Event + sync.RWMutex +} + +// New returns a new instance of Events. +func New() *Events { + return &Events{ + subs: make(map[string]chan Event), + } +} + +// Subscribe returns a channel to which the given event `types` are streamed. +// id is the unique identifier for the caller. A caller can only register +// for subscription once. +func (ev *Events) Subscribe(id string) (chan Event, error) { + ev.Lock() + defer ev.Unlock() + + if ch, ok := ev.subs[id]; ok { + return ch, nil + } + + ch := make(chan Event, 100) + ev.subs[id] = ch + + return ch, nil +} + +// Unsubscribe unsubscribes a subscriber (obviously). +func (ev *Events) Unsubscribe(id string) { + ev.Lock() + defer ev.Unlock() + delete(ev.subs, id) +} + +// Publish publishes an event to all subscribers. +func (ev *Events) Publish(e Event) error { + ev.Lock() + defer ev.Unlock() + + for _, ch := range ev.subs { + select { + case ch <- e: + default: + return fmt.Errorf("event queue full for type: %s", e.Type) + } + } + + return nil +} + +// This implements an io.Writer specifically for receiving error messages +// mirrored (io.MultiWriter) from error log writing. +type wri struct { + ev *Events +} + +func (w *wri) Write(b []byte) (n int, err error) { + // Only broadcast error messages. + if !bytes.Contains(b, []byte("error")) { + return 0, nil + } + + w.ev.Publish(Event{ + Type: TypeError, + Message: string(b), + }) + + return len(b), nil +} + +func (ev *Events) ErrWriter() io.Writer { + return &wri{ev: ev} +} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 7649bbc3..56d71a93 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -395,8 +395,8 @@ func (m *Manager) worker() { out.Headers = h if err := m.messengers[msg.Campaign.Messenger].Push(out); err != nil { - m.logger.Printf("error sending message in campaign %s: subscriber %s: %v", - msg.Campaign.Name, msg.Subscriber.UUID, err) + m.logger.Printf("error sending message in campaign %s: subscriber %d: %v", + msg.Campaign.Name, msg.Subscriber.ID, err) select { case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}: