From 5a561b38354b1711eb79e84875fa1bd2e5f80505 Mon Sep 17 00:00:00 2001 From: Abhishek K <32607604+abhishek9686@users.noreply.github.com> Date: Fri, 16 Aug 2024 15:35:43 +0530 Subject: [PATCH] Net 1440 batchpeerupdate (#3042) * NET-1440 scale test changes * fix UT error and add error info * load metric data into cacha in startup * remove debug info for metric * add server telemetry and hasSuperAdmin to cache * fix user UT case * update sqlite connection string for performance * update check-in TS in cache only if cache enabled * update metric data in cache only if cache enabled and write to DB once in stop * update server status in mq topic * add failover existed to server status update * only send mq messsage when there is server status change * batch peerUpdate * code changes for scale for review * update UT case * update mq client check * mq connection code change * revert server status update changes * revert batch peerUpdate * remove server status update info * batch peerUpdate * code changes based on review and setupmqtt in keepalive * set the mq message order to false for PIN * remove setupmqtt in keepalive * add peerUpdate batch size to config * update batch peerUpdate * recycle ip in node deletion * update ip allocation logic * remove ip addr cap * remove ippool file * update get extClient func * remove ip from cache map when extClient is removed * add batch peerUpdate switch * set batch peerUpdate to true by default --------- Co-authored-by: Max Ma --- controllers/ext_client.go | 8 +----- controllers/network.go | 10 ++------ controllers/node.go | 2 +- mq/handlers.go | 4 +-- mq/publishers.go | 51 +++++++++++++++++++++++++++++-------- mq/util.go | 21 +++++++++++++++ pro/controllers/relay.go | 10 ++------ pro/remote_access_client.go | 2 +- servercfg/serverconf.go | 22 ++++++++++++++++ 9 files changed, 92 insertions(+), 38 deletions(-) diff --git a/controllers/ext_client.go b/controllers/ext_client.go index 2452d49c..d5f16f60 100644 --- a/controllers/ext_client.go +++ b/controllers/ext_client.go @@ -698,13 +698,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) { slog.Error("Failed to get nodes", "error", err) return } - go mq.PublishSingleHostPeerUpdate( - ingressHost, - nodes, - nil, - []models.ExtClient{oldExtClient}, - false, - ) + go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false, nil) } } diff --git a/controllers/network.go b/controllers/network.go index 2c2f2b43..fd8b95de 100644 --- a/controllers/network.go +++ b/controllers/network.go @@ -334,14 +334,8 @@ func updateNetworkACLv2(w http.ResponseWriter, r *http.Request) { } for hostId, clients := range assocClientsToDisconnectPerHost { if host, ok := hostsMap[hostId]; ok { - if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false); err != nil { - slog.Error( - "failed to publish peer update to ingress after ACL update on network", - "network", - netname, - "host", - hostId, - ) + if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false, nil); err != nil { + slog.Error("failed to publish peer update to ingress after ACL update on network", "network", netname, "host", hostId) } } } diff --git a/controllers/node.go b/controllers/node.go index 09cce98f..f7962d2e 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -593,7 +593,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { return } go func() { - if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil { + if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false, nil); err != nil { slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err) } if err := mq.NodeUpdate(&node); err != nil { diff --git a/mq/handlers.go b/mq/handlers.go index 8adb0744..12913326 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -65,7 +65,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) { } allNodes, err := logic.GetAllNodes() if err == nil { - PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false) + PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false, nil) } } else { err = PublishPeerUpdate(false) @@ -117,7 +117,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { if err != nil { return } - if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil { + if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false, nil); err != nil { slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err) return } diff --git a/mq/publishers.go b/mq/publishers.go index 71bb96af..099eb4a0 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" "github.com/gravitl/netmaker/logger" @@ -13,6 +14,9 @@ import ( "golang.org/x/exp/slog" ) +var batchSize = servercfg.GetPeerUpdateBatchSize() +var batchUpdate = servercfg.GetBatchPeerUpdate() + // PublishPeerUpdate --- determines and publishes a peer update to all the hosts func PublishPeerUpdate(replacePeers bool) error { if !servercfg.IsMessageQueueBackend() { @@ -28,15 +32,37 @@ func PublishPeerUpdate(replacePeers bool) error { if err != nil { return err } - for _, host := range hosts { - host := host - go func(host models.Host) { - if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil { - logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) - } - }(host) + + //if batch peer update disabled + if !batchUpdate { + for _, host := range hosts { + host := host + go func(host models.Host) { + if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil { + logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) + } + }(host) + } + return nil } - return err + + //if batch peer update enabled + batchHost := BatchItems(hosts, batchSize) + var wg sync.WaitGroup + for _, v := range batchHost { + hostLen := len(v) + wg.Add(hostLen) + for i := 0; i < hostLen; i++ { + host := hosts[i] + go func(host models.Host) { + if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil { + logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) + } + }(host) + } + wg.Wait() + } + return nil } // PublishDeletedNodePeerUpdate --- determines and publishes a peer update @@ -57,7 +83,7 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error { } for _, host := range hosts { host := host - if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil { + if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } } @@ -83,7 +109,7 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error { for _, host := range hosts { host := host if host.OS != models.OS_Types.IoT { - if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil { + if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } } @@ -92,7 +118,10 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error { } // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host -func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error { +func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error { + if wg != nil { + defer wg.Done() + } peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients) if err != nil { return err diff --git a/mq/util.go b/mq/util.go index 72cf7160..a38cd7d7 100644 --- a/mq/util.go +++ b/mq/util.go @@ -3,6 +3,7 @@ package mq import ( "errors" "fmt" + "math" "strings" "time" @@ -45,6 +46,26 @@ func DecryptMsg(node *models.Node, msg []byte) ([]byte, error) { return decryptMsgWithHost(host, msg) } +func BatchItems[T any](items []T, batchSize int) [][]T { + if batchSize <= 0 { + return nil + } + remainderBatchSize := len(items) % batchSize + nBatches := int(math.Ceil(float64(len(items)) / float64(batchSize))) + batches := make([][]T, nBatches) + for i := range batches { + if i == nBatches-1 && remainderBatchSize > 0 { + batches[i] = make([]T, remainderBatchSize) + } else { + batches[i] = make([]T, batchSize) + } + for j := range batches[i] { + batches[i][j] = items[i*batchSize+j] + } + } + return batches +} + func encryptMsg(host *models.Host, msg []byte) ([]byte, error) { if host.OS == models.OS_Types.IoT { return msg, nil diff --git a/pro/controllers/relay.go b/pro/controllers/relay.go index b8061b4f..289dc531 100644 --- a/pro/controllers/relay.go +++ b/pro/controllers/relay.go @@ -133,14 +133,8 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) { return } node.IsRelay = true // for iot update to recognise that it has to delete relay peer - if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil { - logger.Log( - 1, - "failed to publish peer update to host", - h.ID.String(), - ": ", - err.Error(), - ) + if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false, nil); err != nil { + logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error()) } } } diff --git a/pro/remote_access_client.go b/pro/remote_access_client.go index 68da2baa..5ec6708a 100644 --- a/pro/remote_access_client.go +++ b/pro/remote_access_client.go @@ -78,7 +78,7 @@ func disableExtClient(client *models.ExtClient) error { if err != nil { return err } - go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false) + go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false, nil) } else { return err } diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index 1ab54b7f..3090f33a 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -597,6 +597,28 @@ func GetMetricInterval() string { return mi } +// GetBatchPeerUpdate - if batch peer update +func GetBatchPeerUpdate() bool { + enabled := true + if os.Getenv("PEER_UPDATE_BATCH") != "" { + enabled = os.Getenv("PEER_UPDATE_BATCH") == "true" + } + return enabled +} + +// GetPeerUpdateBatchSize - get the batch size for peer update +func GetPeerUpdateBatchSize() int { + //default 50 + batchSize := 50 + if os.Getenv("PEER_UPDATE_BATCH_SIZE") != "" { + b, e := strconv.Atoi(os.Getenv("PEER_UPDATE_BATCH_SIZE")) + if e == nil && b > 0 && b < 1000 { + batchSize = b + } + } + return batchSize +} + // GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX func GetEmqxRestEndpoint() string { return os.Getenv("EMQX_REST_ENDPOINT")