listmonk/internal/events/events.go
Kailash Nadh 0b2da4c664 Add support for streaming async events via HTTP serverside events.
- `GET /api/events?type=error` opens a long-lived HTTP server side
  event connection that streams error messages.
- async (typically SMTP) errors are now streamed to the frontend and
  disaplyed as an error toast on the admin UI.
2023-05-27 15:44:59 +05:30

101 lines
1.9 KiB
Go

// Package events implements a simple event broadcasting mechanism
// for usage in broadcasting error messages, postbacks etc. various
// channels.
package events
import (
"bytes"
"fmt"
"io"
"sync"
)
const (
TypeError = "error"
)
// Event represents a single event in the system.
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Message string `json:"message"`
Data interface{} `json:"data"`
Channels []string `json:"-"`
}
type Events struct {
subs map[string]chan Event
sync.RWMutex
}
// New returns a new instance of Events.
func New() *Events {
return &Events{
subs: make(map[string]chan Event),
}
}
// Subscribe returns a channel to which the given event `types` are streamed.
// id is the unique identifier for the caller. A caller can only register
// for subscription once.
func (ev *Events) Subscribe(id string) (chan Event, error) {
ev.Lock()
defer ev.Unlock()
if ch, ok := ev.subs[id]; ok {
return ch, nil
}
ch := make(chan Event, 100)
ev.subs[id] = ch
return ch, nil
}
// Unsubscribe unsubscribes a subscriber (obviously).
func (ev *Events) Unsubscribe(id string) {
ev.Lock()
defer ev.Unlock()
delete(ev.subs, id)
}
// Publish publishes an event to all subscribers.
func (ev *Events) Publish(e Event) error {
ev.Lock()
defer ev.Unlock()
for _, ch := range ev.subs {
select {
case ch <- e:
default:
return fmt.Errorf("event queue full for type: %s", e.Type)
}
}
return nil
}
// This implements an io.Writer specifically for receiving error messages
// mirrored (io.MultiWriter) from error log writing.
type wri struct {
ev *Events
}
func (w *wri) Write(b []byte) (n int, err error) {
// Only broadcast error messages.
if !bytes.Contains(b, []byte("error")) {
return 0, nil
}
w.ev.Publish(Event{
Type: TypeError,
Message: string(b),
})
return len(b), nil
}
func (ev *Events) ErrWriter() io.Writer {
return &wri{ev: ev}
}