From 66069fbc347f51a4dbad0a76c832b4c8bf19694d Mon Sep 17 00:00:00 2001 From: Abhishek K <32607604+abhishek9686@users.noreply.github.com> Date: Thu, 11 Apr 2024 21:18:57 +0530 Subject: [PATCH] NET-1082: Scale Testing Fixes (#2894) * add additional mutex lock on node acls func * increase verbosity * disable acls on cloud emqx * add emqx creds creation to go routine * add debug log of mq client id * comment port check * uncomment port check * check for connection mq connection open * use username for client id * add write mutex on acl is allowed * add mq connection lost handler on server * spin off zombie init as go routine * get whole api path from config * Revert "get whole api path from config" This reverts commit 392f5f4c5f00530788f09d26a655dcbe03a06ccb. * update extclient acls async * add additional mutex lock on node acls func (cherry picked from commit 5325f0e7d7ff9411f497fdc38c980ac0c3a6847d) * increase verbosity (cherry picked from commit 705b3cf0bfbca4d7f5dccdd579875ebb00f85511) * add emqx creds creation to go routine (cherry picked from commit c8e65f4820771eb0c7c7d62b77334211c6b82adb) * add debug log of mq client id (cherry picked from commit 29c5d6cecad6fcaeb4a57bac85895d6516294e28) * comment port check (cherry picked from commit db8d6d95ead39e9f436ad4dbc4176f5ff9312863) * check for connection mq connection open (cherry picked from commit 13b11033b0795693a0a1a0a225db50b8a5c001ae) * use username for client id (cherry picked from commit e90c7386dea48b560c9060289f1db06f2a8d77c1) * add write mutex on acl is allowed (cherry picked from commit 4cae1b0bb4b4b608fdb76b3ef49c7fd7390c9ccb) * add mq connection lost handler on server (cherry picked from commit c82918ad3564098487d5ac4a223ee5d95e76ac3e) * spin off zombie init as go routine (cherry picked from commit 6d65c44c4375ff7a292c05d2becf6507e7310837) * update extclient acls async (cherry picked from commit 6557ef1ebe87ec7c74e9038a99185616a2b87e89) * additionl logs for oauth user flow (cherry picked from commit 61703038ae3227de6f706e1d1d9a9362e4c258d5) * add more debug logs (cherry picked from commit 5980beacd10e7efa1cfbc12e6c048452d9ceca82) * add more debug logs (cherry picked from commit 4d001f0d2709fcc55ae66e2591a0bd079207a61d) * add set auth secret (cherry picked from commit f41cef5da51a884e68d12ac90ae627428b02b112) * fix fetch pass (cherry picked from commit 825caf4b600133f8365f5ab71e0ecd69cc8a996b) * make sure auth secret is set only once (cherry picked from commit ba33ed02aa52237126904f4bbe17b53dd595d7cf) * make sure auth secret is set only once (cherry picked from commit 920ac4c5073fb2c8805520d37abddffb062896ae) * comment usage of emqx acls * replace read lock with write lock on acls * replace read lock with write lock on acls (cherry picked from commit 808d2135c80461edfa96b61070e09ad136fa4644) * use deadlock pkg for visibility * add additional mutex locks * remove race flag * on mq re-connecting donot exit if failed * on mq re-connecting donot exit if failed * revert mutex package change * set mq clean session * remove debug log * go mod tidy * revert on prem emqx acls del --- controllers/enrollmentkeys.go | 2 +- controllers/ext_client.go | 11 ++- controllers/hosts.go | 33 ++++---- logic/acls/common.go | 6 +- logic/acls/nodeacls/retrieve.go | 9 +- logic/errors.go | 4 +- logic/zombie.go | 2 +- main.go | 2 +- migrate/migrate.go | 2 +- mq/emqx_cloud.go | 141 +------------------------------- mq/handlers.go | 2 +- mq/mq.go | 29 +++++-- pro/controllers/failover.go | 7 +- 13 files changed, 71 insertions(+), 179 deletions(-) diff --git a/controllers/enrollmentkeys.go b/controllers/enrollmentkeys.go index 1eff0808..ec9701e2 100644 --- a/controllers/enrollmentkeys.go +++ b/controllers/enrollmentkeys.go @@ -308,7 +308,7 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) { if !hostExists { newHost.PersistentKeepalive = models.DefaultPersistentKeepAlive // register host - logic.CheckHostPorts(&newHost) + //logic.CheckHostPorts(&newHost) // create EMQX credentials and ACLs for host if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { if err := mq.GetEmqxHandler().CreateEmqxUser(newHost.ID.String(), newHost.HostPass); err != nil { diff --git a/controllers/ext_client.go b/controllers/ext_client.go index 0f1f3db3..c92e7eff 100644 --- a/controllers/ext_client.go +++ b/controllers/ext_client.go @@ -436,15 +436,14 @@ func createExtClient(w http.ResponseWriter, r *http.Request) { return } - if err := logic.SetClientDefaultACLs(&extclient); err != nil { - slog.Error("failed to set default acls for extclient", "user", r.Header.Get("user"), "network", node.Network, "error", err) - logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) - return - } - slog.Info("created extclient", "user", r.Header.Get("user"), "network", node.Network, "clientid", extclient.ClientID) w.WriteHeader(http.StatusOK) go func() { + if err := logic.SetClientDefaultACLs(&extclient); err != nil { + slog.Error("failed to set default acls for extclient", "user", r.Header.Get("user"), "network", node.Network, "error", err) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) + return + } if err := mq.PublishPeerUpdate(false); err != nil { logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error()) } diff --git a/controllers/hosts.go b/controllers/hosts.go index caf579f2..fa390ba9 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -554,26 +554,27 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) { logic.ReturnErrorResponse(response, request, errorResponse) return } - - // Create EMQX creds and ACLs if not found - if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { - if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil { - slog.Error("failed to create host credentials for EMQX: ", err.Error()) - } else { - if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil { - slog.Error("failed to add host ACL rules to EMQX: ", err.Error()) - } - for _, nodeID := range host.Nodes { - if node, err := logic.GetNodeByID(nodeID); err == nil { - if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil { - slog.Error("failed to add ACLs for EMQX node", "error", err) + go func() { + // Create EMQX creds and ACLs if not found + if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { + if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil { + slog.Error("failed to create host credentials for EMQX: ", err.Error()) + } else { + if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil { + slog.Error("failed to add host ACL rules to EMQX: ", err.Error()) + } + for _, nodeID := range host.Nodes { + if node, err := logic.GetNodeByID(nodeID); err == nil { + if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil { + slog.Error("failed to add ACLs for EMQX node", "error", err) + } + } else { + slog.Error("failed to get node", "nodeid", nodeID, "error", err) } - } else { - slog.Error("failed to get node", "nodeid", nodeID, "error", err) } } } - } + }() response.WriteHeader(http.StatusOK) response.Header().Set("Content-Type", "application/json") diff --git a/logic/acls/common.go b/logic/acls/common.go index fd1ba071..9296e3b8 100644 --- a/logic/acls/common.go +++ b/logic/acls/common.go @@ -64,9 +64,9 @@ func (acl ACL) Save(containerID ContainerID, ID AclID) (ACL, error) { // ACL.IsAllowed - sees if ID is allowed in referring ACL func (acl ACL) IsAllowed(ID AclID) (allowed bool) { - AclMutex.RLock() + AclMutex.Lock() allowed = acl[ID] == Allowed - AclMutex.RUnlock() + AclMutex.Unlock() return } @@ -88,6 +88,8 @@ func (aclContainer ACLContainer) RemoveACL(ID AclID) ACLContainer { // ACLContainer.ChangeAccess - changes the relationship between two nodes in memory func (networkACL ACLContainer) ChangeAccess(ID1, ID2 AclID, value byte) { + AclMutex.Lock() + defer AclMutex.Unlock() if _, ok := networkACL[ID1]; !ok { slog.Error("ACL missing for ", "id", ID1) return diff --git a/logic/acls/nodeacls/retrieve.go b/logic/acls/nodeacls/retrieve.go index 15397c24..d5fa68c4 100644 --- a/logic/acls/nodeacls/retrieve.go +++ b/logic/acls/nodeacls/retrieve.go @@ -3,21 +3,26 @@ package nodeacls import ( "encoding/json" "fmt" + "sync" "github.com/gravitl/netmaker/logic/acls" ) +var NodesAllowedACLMutex = &sync.Mutex{} + // AreNodesAllowed - checks if nodes are allowed to communicate in their network ACL func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool { + NodesAllowedACLMutex.Lock() + defer NodesAllowedACLMutex.Unlock() var currentNetworkACL, err = FetchAllACLs(networkID) if err != nil { return false } var allowed bool - acls.AclMutex.RLock() + acls.AclMutex.Lock() currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)] currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)] - acls.AclMutex.RUnlock() + acls.AclMutex.Unlock() allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1)) return allowed } diff --git a/logic/errors.go b/logic/errors.go index edf2360a..1f46e578 100644 --- a/logic/errors.go +++ b/logic/errors.go @@ -4,8 +4,8 @@ import ( "encoding/json" "net/http" - "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/models" + "golang.org/x/exp/slog" ) // FormatError - takes ErrorResponse and uses correct code @@ -62,7 +62,7 @@ func ReturnErrorResponse(response http.ResponseWriter, request *http.Request, er if err != nil { panic(err) } - logger.Log(1, "processed request error:", errorMessage.Message) + slog.Debug("processed request error", "err", errorMessage.Message) response.Header().Set("Content-Type", "application/json") response.WriteHeader(errorMessage.Code) response.Write(jsonResponse) diff --git a/logic/zombie.go b/logic/zombie.go index 2e1692ed..14bb9523 100644 --- a/logic/zombie.go +++ b/logic/zombie.go @@ -76,7 +76,7 @@ func checkForZombieHosts(h *models.Host) { // ManageZombies - goroutine which adds/removes/deletes nodes from the zombie node quarantine list func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) { logger.Log(2, "Zombie management started") - InitializeZombies() + go InitializeZombies() // Zombie Nodes Cleanup Four Times a Day ticker := time.NewTicker(time.Hour * ZOMBIE_TIMEOUT) diff --git a/main.go b/main.go index 97b425db..92f87b28 100644 --- a/main.go +++ b/main.go @@ -155,7 +155,7 @@ func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() brokerHost, _ := servercfg.GetMessageQueueEndpoint() logger.Log(0, "connecting to mq broker at", brokerHost) - mq.SetupMQTT() + mq.SetupMQTT(true) if mq.IsConnected() { logger.Log(0, "connected to MQ Broker") } else { diff --git a/migrate/migrate.go b/migrate/migrate.go index 3363d01d..e1474b86 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -287,7 +287,7 @@ func updateAcls() { } // save new acls - slog.Info(fmt.Sprintf("(migration) saving new acls for network: %s", network.NetID), "networkAcl", networkAcl) + slog.Debug(fmt.Sprintf("(migration) saving new acls for network: %s", network.NetID), "networkAcl", networkAcl) if _, err := networkAcl.Save(acls.ContainerID(network.NetID)); err != nil { slog.Error(fmt.Sprintf("error during acls migration. error saving new acls for network: %s", network.NetID), "error", err) continue diff --git a/mq/emqx_cloud.go b/mq/emqx_cloud.go index 658d84ed..23bfb244 100644 --- a/mq/emqx_cloud.go +++ b/mq/emqx_cloud.go @@ -22,13 +22,6 @@ type userCreateReq struct { Password string `json:"password"` } -type cloudAcl struct { - UserName string `json:"username"` - Topic string `json:"topic"` - Action string `json:"action"` - Access string `json:"access"` -} - func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy } func (e *EmqxCloud) CreateEmqxUser(username, pass string) error { @@ -89,54 +82,7 @@ func (e *EmqxCloud) CreateEmqxUserforServer() error { if res.StatusCode != http.StatusOK { return errors.New("request failed " + string(body)) } - // add acls - acls := []cloudAcl{ - { - UserName: servercfg.GetMqUserName(), - Topic: fmt.Sprintf("update/%s/#", servercfg.GetServer()), - Access: "allow", - Action: "sub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()), - Access: "allow", - Action: "sub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: fmt.Sprintf("signal/%s/#", servercfg.GetServer()), - Access: "allow", - Action: "sub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: fmt.Sprintf("metrics/%s/#", servercfg.GetServer()), - Access: "allow", - Action: "sub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: "peers/host/#", - Access: "allow", - Action: "pub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: "node/update/#", - Access: "allow", - Action: "pub", - }, - { - - UserName: servercfg.GetMqUserName(), - Topic: "host/update/#", - Access: "allow", - Action: "pub", - }, - } - - return e.createacls(acls) + return nil } func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore @@ -147,94 +93,13 @@ func (e *EmqxCloud) CreateDefaultDenyRule() error { return nil } -func (e *EmqxCloud) createacls(acls []cloudAcl) error { - payload, err := json.Marshal(acls) - if err != nil { - return err - } - client := &http.Client{} - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload))) - if err != nil { - return err - } - req.Header.Add("Content-Type", "application/json") - req.SetBasicAuth(e.AppID, e.AppSecret) - res, err := client.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - - body, err := io.ReadAll(res.Body) - if err != nil { - return err - } - if res.StatusCode != http.StatusOK { - return errors.New("request failed " + string(body)) - } +func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error { return nil } -func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error { - acls := []cloudAcl{ - { - UserName: hostID, - Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName), - Access: "allow", - Action: "sub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName), - Access: "allow", - Action: "sub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID), - Access: "allow", - Action: "pub", - }, - } - - return e.createacls(acls) -} - func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error { - acls := []cloudAcl{ - { - UserName: hostID, - Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID), - Access: "allow", - Action: "sub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID), - Access: "allow", - Action: "pubsub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID), - Access: "allow", - Action: "pubsub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID), - Access: "allow", - Action: "pubsub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID), - Access: "allow", - Action: "pubsub", - }, - } + return nil - return e.createacls(acls) } func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list diff --git a/mq/handlers.go b/mq/handlers.go index 974cc5e9..028f9649 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -92,7 +92,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { } decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload()) if decryptErr != nil { - slog.Error("failed to decrypt message for host", "id", id, "error", decryptErr) + slog.Error("failed to decrypt message for host", "id", id, "name", currentHost.Name, "error", decryptErr) return } var hostUpdate models.HostUpdate diff --git a/mq/mq.go b/mq/mq.go index b09128c4..9ab2297b 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -8,8 +8,8 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gravitl/netmaker/logger" - "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/servercfg" + "golang.org/x/exp/slog" ) // KEEPALIVE_TIMEOUT - time in seconds for timeout @@ -27,12 +27,12 @@ var mqclient mqtt.Client func setMqOptions(user, password string, opts *mqtt.ClientOptions) { broker, _ := servercfg.GetMessageQueueEndpoint() opts.AddBroker(broker) - id := logic.RandomString(23) - opts.ClientID = id + opts.ClientID = user opts.SetUsername(user) opts.SetPassword(password) opts.SetAutoReconnect(true) opts.SetConnectRetry(true) + opts.SetCleanSession(true) opts.SetConnectRetryInterval(time.Second * 4) opts.SetKeepAlive(time.Minute) opts.SetCleanSession(true) @@ -40,7 +40,7 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) { } // SetupMQTT creates a connection to broker and return client -func SetupMQTT() { +func SetupMQTT(fatal bool) { if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { if emqx.GetType() == servercfg.EmqxOnPremDeploy { time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready @@ -70,6 +70,7 @@ func SetupMQTT() { opts := mqtt.NewClientOptions() setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts) + logger.Log(0, "Mq Client Connecting with Random ID: ", opts.ClientID) opts.SetOnConnectHandler(func(client mqtt.Client) { serverName := servercfg.GetServer() if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { @@ -92,6 +93,13 @@ func SetupMQTT() { opts.SetOrderMatters(false) opts.SetResumeSubs(true) }) + opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { + slog.Warn("detected broker connection lost", "err", e.Error()) + c.Disconnect(250) + slog.Info("re-initiating MQ connection") + SetupMQTT(false) + + }) mqclient = mqtt.NewClient(opts) tperiod := time.Now().Add(10 * time.Second) for { @@ -99,9 +107,16 @@ func SetupMQTT() { logger.Log(2, "unable to connect to broker, retrying ...") if time.Now().After(tperiod) { if token.Error() == nil { - logger.FatalLog("could not connect to broker, token timeout, exiting ...") + if fatal { + logger.FatalLog("could not connect to broker, token timeout, exiting ...") + } + logger.Log(0, "could not connect to broker, token timeout, exiting ...") + } else { - logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error()) + if fatal { + logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error()) + } + logger.Log(0, "could not connect to broker, exiting ...", token.Error().Error()) } } } else { @@ -125,7 +140,7 @@ func Keepalive(ctx context.Context) { // IsConnected - function for determining if the mqclient is connected or not func IsConnected() bool { - return mqclient != nil && mqclient.IsConnected() + return mqclient != nil && mqclient.IsConnectionOpen() } // CloseClient - function to close the mq connection from server diff --git a/pro/controllers/failover.go b/pro/controllers/failover.go index 8a199c3b..37eaa179 100644 --- a/pro/controllers/failover.go +++ b/pro/controllers/failover.go @@ -134,10 +134,15 @@ func failOverME(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) return } + host, err := logic.GetHost(node.HostID.String()) + if err != nil { + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } failOverNode, exists := proLogic.FailOverExists(node.Network) if !exists { - logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover node doesn't exist in the network"), "badrequest")) + logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("req-from: %s, failover node doesn't exist in the network", host.Name), "badrequest")) return } var failOverReq models.FailOverMeReq