Net 1440 batchpeerupdate (#3042)

* NET-1440 scale test changes

* fix UT error and add error info

* load metric data into cacha in startup

* remove debug info for metric

* add server telemetry and hasSuperAdmin to cache

* fix user UT case

* update sqlite connection string for performance

* update check-in TS in cache only if cache enabled

* update metric data in cache only if cache enabled and write to DB once in stop

* update server status in mq topic

* add failover existed to server status update

* only send mq messsage when there is server status change

* batch peerUpdate

* code changes for scale for review

* update UT case

* update mq client check

* mq connection code change

* revert server status update changes

* revert batch peerUpdate

* remove server status update info

* batch peerUpdate

* code changes based on review and setupmqtt in keepalive

* set the mq message order to false for PIN

* remove setupmqtt in keepalive

* add peerUpdate batch size to config

* update batch peerUpdate

* recycle ip in node deletion

* update ip allocation logic

* remove ip addr cap

* remove ippool file

* update get extClient func

* remove ip from cache map when extClient is removed

* add batch peerUpdate switch

* set batch peerUpdate to true by default

---------

Co-authored-by: Max Ma <mayabin@gmail.com>
This commit is contained in:
Abhishek K 2024-08-16 15:35:43 +05:30 committed by GitHub
parent 22cd0ae446
commit 5a561b3835
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 92 additions and 38 deletions

View file

@ -698,13 +698,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
slog.Error("Failed to get nodes", "error", err)
return
}
go mq.PublishSingleHostPeerUpdate(
ingressHost,
nodes,
nil,
[]models.ExtClient{oldExtClient},
false,
)
go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false, nil)
}
}

View file

@ -334,14 +334,8 @@ func updateNetworkACLv2(w http.ResponseWriter, r *http.Request) {
}
for hostId, clients := range assocClientsToDisconnectPerHost {
if host, ok := hostsMap[hostId]; ok {
if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false); err != nil {
slog.Error(
"failed to publish peer update to ingress after ACL update on network",
"network",
netname,
"host",
hostId,
)
if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false, nil); err != nil {
slog.Error("failed to publish peer update to ingress after ACL update on network", "network", netname, "host", hostId)
}
}
}

View file

@ -593,7 +593,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
return
}
go func() {
if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil {
if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false, nil); err != nil {
slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
}
if err := mq.NodeUpdate(&node); err != nil {

View file

@ -65,7 +65,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
}
allNodes, err := logic.GetAllNodes()
if err == nil {
PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false)
PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false, nil)
}
} else {
err = PublishPeerUpdate(false)
@ -117,7 +117,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
if err != nil {
return
}
if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil {
if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false, nil); err != nil {
slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
return
}

View file

@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/gravitl/netmaker/logger"
@ -13,6 +14,9 @@ import (
"golang.org/x/exp/slog"
)
var batchSize = servercfg.GetPeerUpdateBatchSize()
var batchUpdate = servercfg.GetBatchPeerUpdate()
// PublishPeerUpdate --- determines and publishes a peer update to all the hosts
func PublishPeerUpdate(replacePeers bool) error {
if !servercfg.IsMessageQueueBackend() {
@ -28,15 +32,37 @@ func PublishPeerUpdate(replacePeers bool) error {
if err != nil {
return err
}
for _, host := range hosts {
host := host
go func(host models.Host) {
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}(host)
//if batch peer update disabled
if !batchUpdate {
for _, host := range hosts {
host := host
go func(host models.Host) {
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}(host)
}
return nil
}
return err
//if batch peer update enabled
batchHost := BatchItems(hosts, batchSize)
var wg sync.WaitGroup
for _, v := range batchHost {
hostLen := len(v)
wg.Add(hostLen)
for i := 0; i < hostLen; i++ {
host := hosts[i]
go func(host models.Host) {
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}(host)
}
wg.Wait()
}
return nil
}
// PublishDeletedNodePeerUpdate --- determines and publishes a peer update
@ -57,7 +83,7 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
}
for _, host := range hosts {
host := host
if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil {
if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}
@ -83,7 +109,7 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
for _, host := range hosts {
host := host
if host.OS != models.OS_Types.IoT {
if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil {
if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}
@ -92,7 +118,10 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
}
// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error {
func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
if wg != nil {
defer wg.Done()
}
peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
if err != nil {
return err

View file

@ -3,6 +3,7 @@ package mq
import (
"errors"
"fmt"
"math"
"strings"
"time"
@ -45,6 +46,26 @@ func DecryptMsg(node *models.Node, msg []byte) ([]byte, error) {
return decryptMsgWithHost(host, msg)
}
func BatchItems[T any](items []T, batchSize int) [][]T {
if batchSize <= 0 {
return nil
}
remainderBatchSize := len(items) % batchSize
nBatches := int(math.Ceil(float64(len(items)) / float64(batchSize)))
batches := make([][]T, nBatches)
for i := range batches {
if i == nBatches-1 && remainderBatchSize > 0 {
batches[i] = make([]T, remainderBatchSize)
} else {
batches[i] = make([]T, batchSize)
}
for j := range batches[i] {
batches[i][j] = items[i*batchSize+j]
}
}
return batches
}
func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {
if host.OS == models.OS_Types.IoT {
return msg, nil

View file

@ -133,14 +133,8 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
return
}
node.IsRelay = true // for iot update to recognise that it has to delete relay peer
if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil {
logger.Log(
1,
"failed to publish peer update to host",
h.ID.String(),
": ",
err.Error(),
)
if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false, nil); err != nil {
logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
}
}
}

View file

@ -78,7 +78,7 @@ func disableExtClient(client *models.ExtClient) error {
if err != nil {
return err
}
go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false)
go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false, nil)
} else {
return err
}

View file

@ -597,6 +597,28 @@ func GetMetricInterval() string {
return mi
}
// GetBatchPeerUpdate - if batch peer update
func GetBatchPeerUpdate() bool {
enabled := true
if os.Getenv("PEER_UPDATE_BATCH") != "" {
enabled = os.Getenv("PEER_UPDATE_BATCH") == "true"
}
return enabled
}
// GetPeerUpdateBatchSize - get the batch size for peer update
func GetPeerUpdateBatchSize() int {
//default 50
batchSize := 50
if os.Getenv("PEER_UPDATE_BATCH_SIZE") != "" {
b, e := strconv.Atoi(os.Getenv("PEER_UPDATE_BATCH_SIZE"))
if e == nil && b > 0 && b < 1000 {
batchSize = b
}
}
return batchSize
}
// GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX
func GetEmqxRestEndpoint() string {
return os.Getenv("EMQX_REST_ENDPOINT")