From 513f85ede7a6521e5c821c2176c2438a85fdeeb9 Mon Sep 17 00:00:00 2001 From: Matthew R Kasun Date: Thu, 27 Jan 2022 09:48:32 -0500 Subject: [PATCH] refactor server pings --- controllers/node_grpc.go | 1 + logic/peers.go | 2 +- models/structs.go | 1 + mq/mq.go | 28 ++++++++++++++++++++-------- netclient/functions/daemon.go | 21 ++++++++++++++++----- 5 files changed, 39 insertions(+), 14 deletions(-) diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index 8edebf5f..1dae5714 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -71,6 +71,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) var serverAddrs = make([]models.ServerAddr, len(serverNodes)) for i, server := range serverNodes { serverAddrs[i] = models.ServerAddr{ + ID: server.ID, IsLeader: logic.IsLeader(&server), Address: server.Address, } diff --git a/logic/peers.go b/logic/peers.go index 44e4ae5c..97b741be 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -57,7 +57,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { } peers = append(peers, peerData) if peer.IsServer == "yes" { - serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{IsLeader: IsLeader(&peer), Address: peer.Address}) + serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{ID: peer.ID, IsLeader: IsLeader(&peer), Address: peer.Address}) } } peerUpdate.Network = node.Network diff --git a/models/structs.go b/models/structs.go index 3020c1bd..45b03a3c 100644 --- a/models/structs.go +++ b/models/structs.go @@ -172,6 +172,7 @@ type Telemetry struct { // ServerAddr - to pass to clients to tell server addresses and if it's the leader or not type ServerAddr struct { + ID string `json:"id" bson:"id" yaml:"id"` IsLeader bool `json:"isleader" bson:"isleader" yaml:"isleader"` Address string `json:"address" bson:"address" yaml:"address"` } diff --git a/mq/mq.go b/mq/mq.go index 9d20e146..81417266 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -160,6 +160,7 @@ func SetupMQTT() mqtt.Client { if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } + logger.Log(2, "connected to message queue", broker) return client } @@ -170,18 +171,29 @@ func Keepalive(ctx context.Context) { case <-ctx.Done(): return case <-time.After(time.Second * KEEPALIVE_TIMEOUT): - nodes, err := logic.GetAllNodes() - if err != nil { - logger.Log(1, "error retrieving nodes for keepalive", err.Error()) - } client := SetupMQTT() - for _, node := range nodes { - if token := client.Publish("serverkeepalive/"+node.ID, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { - logger.Log(1, "error publishing server keepalive", token.Error().Error()) + networks, err := logic.GetNetworks() + if err != nil { + logger.Log(1, "error retrieving networks for keepalive", err.Error()) + } + for _, network := range networks { + var id string + for _, servAddr := range network.DefaultServerAddrs { + if servAddr.IsLeader { + id = servAddr.ID + } + } + if id != "" { + logger.Log(0, "leader not defined for network", network.NetID) + continue + } + if token := client.Publish("serverkeepalive/"+id, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { + logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error()) + } else { + logger.Log(2, "keepalive sent for network", network.NetID) } client.Disconnect(MQ_DISCONNECT) } - logger.Log(2, "keepalive sent to all nodes") } } } diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 3e42f006..21906884 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -102,11 +102,21 @@ func MessageQueue(ctx context.Context, network string) { if cfg.DebugOn { ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID) } - if token := client.Subscribe("serverkeepalive/"+cfg.Node.ID, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { - log.Fatal(token.Error()) - } - if cfg.DebugOn { - ncutils.Log("subscribed to server keepalives") + var id string + for _, server := range cfg.NetworkSettings.DefaultServerAddrs { + if server.IsLeader { + id = server.ID + } + if server.Address != "" { + if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + if cfg.DebugOn { + ncutils.Log("subscribed to server keepalives") + } + } else { + ncutils.Log("leader not defined for network" + cfg.Network) + } } defer client.Disconnect(250) go MonitorKeepalive(ctx, client, &cfg) @@ -270,6 +280,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) { serverid := string(msg.Payload()) keepalive <- serverid + ncutils.Log("keepalive from server") } // Resubscribe --- handles resubscribing if needed