diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 2e31d37a..43dae42f 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -18,15 +18,18 @@ import ( //Daemon runs netclient daemon from command line func Daemon() error { + ctx, cancel := context.WithCancel(context.Background()) networks, err := ncutils.GetSystemNetworks() if err != nil { return err } for _, network := range networks { - go Netclient(network) - } - for { + go Netclient(ctx, network) } + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGTERM, os.Interrupt) + <-quit + cancel() return nil } @@ -44,29 +47,30 @@ func SetupMQTT(cfg config.ClientConfig) mqtt.Client { } //Netclient sets up Message Queue and subsribes/publishes updates to/from server -func Netclient(network string) { - ctx, cancel := context.WithCancel(context.Background()) - var cfg config.ClientConfig - cfg.Network = network - cfg.ReadConfig() - //fix NodeID to remove ### so NodeID can be used as message topic - //remove with GRA-73 - cfg.Node.ID = strings.ReplaceAll(cfg.Node.ID, "###", "-") - ncutils.Log("daemon started for network:" + network) - client := SetupMQTT(cfg) - if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { - log.Fatal(token.Error()) +func Netclient(ctx context.Context, network string) { + select { + case <-ctx.Done(): + ncutils.Log("shutting down daemon") + return + default: + var cfg config.ClientConfig + cfg.Network = network + cfg.ReadConfig() + //fix NodeID to remove ### so NodeID can be used as message topic + //remove with GRA-73 + cfg.Node.ID = strings.ReplaceAll(cfg.Node.ID, "###", "-") + ncutils.Log("daemon started for network:" + network) + client := SetupMQTT(cfg) + if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + client.AddRoute("update/"+cfg.Node.ID, NodeUpdate) + client.AddRoute("update/peers/"+cfg.Node.ID, UpdatePeers) + client.AddRoute("update/keys/"+cfg.Node.ID, UpdateKeys) + defer client.Disconnect(250) + go Checkin(ctx, cfg, network) + go Metrics(ctx, cfg, network) } - client.AddRoute("update/"+cfg.Node.ID, NodeUpdate) - client.AddRoute("update/peers/"+cfg.Node.ID, UpdatePeers) - client.AddRoute("update/keys/"+cfg.Node.ID, UpdateKeys) - defer client.Disconnect(250) - go Checkin(ctx, cfg, network) - go Metrics(ctx, cfg, network) - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGTERM, os.Interrupt) - <-quit - cancel() } //All -- mqtt message hander for all ('#') topics @@ -80,12 +84,12 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) ncutils.Log("received message to update node " + string(msg.Payload())) } -//NodeUpdate -- mqtt message handler for /update/peers/ topic +//UpdatePeers -- mqtt message handler for /update/peers/ topic var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { ncutils.Log("received message to update peers " + string(msg.Payload())) } -//NodeUpdate -- mqtt message handler for /update/keys/ topic +//UpdateKeys -- mqtt message handler for /update/keys/ topic var UpdateKeys mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { ncutils.Log("received message to update keys " + string(msg.Payload())) }