From 77c901e4d93fc0e578565cdb33c98d33e672bf1f Mon Sep 17 00:00:00 2001 From: Abhishek Kondur Date: Wed, 5 Oct 2022 19:48:55 +0530 Subject: [PATCH 1/5] fix for node connect/disconnect issue --- netclient/functions/daemon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index d3a1c99f..afcf70fd 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -239,7 +239,7 @@ func setupMQTTSingleton(cfg *config.ClientConfig) error { opts.AddBroker("mqtts://" + server + ":" + port) opts.SetUsername(cfg.Node.ID) opts.SetPassword(string(pass)) - mqclient := mqtt.NewClient(opts) + mqclient = mqtt.NewClient(opts) var connecterr error opts.SetClientID(ncutils.MakeRandomString(23)) if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { From fe5fd7fce91de768af761514ba7315e687150200 Mon Sep 17 00:00:00 2001 From: Abhishek Kondur Date: Wed, 5 Oct 2022 21:21:34 +0530 Subject: [PATCH 2/5] do checkin after mq is connected, publish metrics only when node is connected --- netclient/functions/daemon.go | 11 ++++++++++- netclient/functions/mqpublish.go | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index afcf70fd..ae626564 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -68,12 +68,14 @@ func Daemon() error { cancel() logger.Log(0, "shutting down netclient daemon") wg.Wait() + mqclient.Disconnect(250) logger.Log(0, "shutdown complete") return nil case <-reset: logger.Log(0, "received reset") cancel() wg.Wait() + mqclient.Disconnect(250) logger.Log(0, "restarting daemon") cancel = startGoRoutines(&wg) } @@ -109,7 +111,14 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc { } } wg.Add(1) - go Checkin(ctx, wg) + for { + if mqclient != nil && mqclient.IsConnected() { + go Checkin(ctx, wg) + break + } + time.Sleep(time.Second) + } + return cancel } diff --git a/netclient/functions/mqpublish.go b/netclient/functions/mqpublish.go index e2ef7bc9..fe33a3fd 100644 --- a/netclient/functions/mqpublish.go +++ b/netclient/functions/mqpublish.go @@ -107,7 +107,7 @@ func checkin() { config.Write(&nodeCfg, nodeCfg.Network) } Hello(&nodeCfg) - if nodeCfg.Server.Is_EE { + if nodeCfg.Server.Is_EE && nodeCfg.Node.Connected == "yes" { logger.Log(0, "collecting metrics for node", nodeCfg.Node.Name) publishMetrics(&nodeCfg) } From 28ca66b32d116e79aebd03725d192ef94c0868d5 Mon Sep 17 00:00:00 2001 From: Abhishek Kondur Date: Thu, 6 Oct 2022 00:54:05 +0530 Subject: [PATCH 3/5] delete node api fix --- controllers/node.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/controllers/node.go b/controllers/node.go index ab2a67c9..95245f93 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -83,12 +83,15 @@ func authenticate(response http.ResponseWriter, request *http.Request) { var err error result, err = logic.GetNodeByID(authRequest.ID) if err != nil { - errorResponse.Code = http.StatusBadRequest - errorResponse.Message = err.Error() - logger.Log(0, request.Header.Get("user"), - fmt.Sprintf("failed to get node info [%s]: %v", authRequest.ID, err)) - logic.ReturnErrorResponse(response, request, errorResponse) - return + result, err = logic.GetDeletedNodeByID(authRequest.ID) + if err != nil { + errorResponse.Code = http.StatusBadRequest + errorResponse.Message = err.Error() + logger.Log(0, request.Header.Get("user"), + fmt.Sprintf("failed to get node info [%s]: %v", authRequest.ID, err)) + logic.ReturnErrorResponse(response, request, errorResponse) + return + } } err = bcrypt.CompareHashAndPassword([]byte(result.Password), []byte(authRequest.Password)) @@ -256,7 +259,6 @@ func authorize(nodesAllowed, networkCheck bool, authNetwork string, next http.Ha logic.ReturnErrorResponse(w, r, errorResponse) return } - r.Header.Set("requestfrom", "") //check if node instead of user if nodesAllowed { // TODO --- should ensure that node is only operating on itself @@ -264,7 +266,6 @@ func authorize(nodesAllowed, networkCheck bool, authNetwork string, next http.Ha // this indicates request is from a node // used for failover - if a getNode comes from node, this will trigger a metrics wipe - r.Header.Set("requestfrom", "node") next.ServeHTTP(w, r) return } @@ -1040,10 +1041,20 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { fromNode := r.Header.Get("requestfrom") == "node" var node, err = logic.GetNodeByID(nodeid) if err != nil { - logger.Log(0, r.Header.Get("user"), - fmt.Sprintf("error fetching node [ %s ] info: %v", nodeid, err)) - logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) - return + if fromNode { + node, err = logic.GetDeletedNodeByID(nodeid) + if err != nil { + logger.Log(0, r.Header.Get("user"), + fmt.Sprintf("error fetching node from deleted nodes [ %s ] info: %v", nodeid, err)) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } + } else { + logger.Log(0, r.Header.Get("user"), + fmt.Sprintf("error fetching node [ %s ] info: %v", nodeid, err)) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } } if isServer(&node) { err := fmt.Errorf("cannot delete server node") From b3df6d1a6835c13c37ce20e2819da9c9b607bffe Mon Sep 17 00:00:00 2001 From: Abhishek Kondur Date: Thu, 6 Oct 2022 00:58:13 +0530 Subject: [PATCH 4/5] check for empty record --- logic/nodes.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/logic/nodes.go b/logic/nodes.go index 38851142..12e4aabf 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -186,7 +186,9 @@ func DeleteNodeByID(node *models.Node, exterminate bool) error { } } if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil { - return err + if !database.IsEmptyRecord(err) { + return err + } } if servercfg.IsDNSMode() { From f23b0a11ba8ccd9ab7313b9e18820be6054d4bbe Mon Sep 17 00:00:00 2001 From: Abhishek Kondur Date: Thu, 6 Oct 2022 00:59:13 +0530 Subject: [PATCH 5/5] check mq is connected before checkin, rm unused functions --- netclient/functions/common.go | 28 +------------------ netclient/functions/daemon.go | 47 +++++--------------------------- netclient/functions/mqpublish.go | 8 ++++-- 3 files changed, 14 insertions(+), 69 deletions(-) diff --git a/netclient/functions/common.go b/netclient/functions/common.go index 7b78d7ba..34ead956 100644 --- a/netclient/functions/common.go +++ b/netclient/functions/common.go @@ -192,37 +192,10 @@ func LeaveNetwork(network string) error { if err := removeHostDNS(cfg.Node.Interface, ncutils.IsWindows()); err != nil { logger.Log(0, "failed to delete dns entries for", cfg.Node.Interface, err.Error()) } - logger.Log(2, "deleting broker keys as required") - if !brokerInUse(cfg.Server.Server) { - if err := deleteBrokerFiles(cfg.Server.Server); err != nil { - logger.Log(0, "failed to deleter certs for", cfg.Server.Server, err.Error()) - } - } logger.Log(2, "restarting daemon") return daemon.Restart() } -func brokerInUse(broker string) bool { - networks, _ := ncutils.GetSystemNetworks() - for _, net := range networks { - cfg := config.ClientConfig{} - cfg.Network = net - cfg.ReadConfig() - if cfg.Server.Server == broker { - return true - } - } - return false -} - -func deleteBrokerFiles(broker string) error { - dir := ncutils.GetNetclientServerPath(broker) - if err := os.RemoveAll(dir); err != nil { - return err - } - return nil -} - func deleteNodeFromServer(cfg *config.ClientConfig) error { node := cfg.Node if node.IsServer == "yes" { @@ -340,6 +313,7 @@ func API(data any, method, url, authorization string) (*http.Response, error) { if authorization != "" { request.Header.Set("authorization", "Bearer "+authorization) } + request.Header.Set("requestfrom", "node") return HTTPClient.Do(request) } diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index ae626564..12b3ddbe 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -2,8 +2,6 @@ package functions import ( "context" - "crypto/tls" - "crypto/x509" "errors" "fmt" "os" @@ -68,14 +66,18 @@ func Daemon() error { cancel() logger.Log(0, "shutting down netclient daemon") wg.Wait() - mqclient.Disconnect(250) + if mqclient != nil { + mqclient.Disconnect(250) + } logger.Log(0, "shutdown complete") return nil case <-reset: logger.Log(0, "received reset") cancel() wg.Wait() - mqclient.Disconnect(250) + if mqclient != nil { + mqclient.Disconnect(250) + } logger.Log(0, "restarting daemon") cancel = startGoRoutines(&wg) } @@ -111,14 +113,7 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc { } } wg.Add(1) - for { - if mqclient != nil && mqclient.IsConnected() { - go Checkin(ctx, wg) - break - } - time.Sleep(time.Second) - } - + go Checkin(ctx, wg) return cancel } @@ -207,34 +202,6 @@ func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientCon logger.Log(0, "shutting down message queue for server", cfg.Server.Server) } -// NewTLSConf sets up tls configuration to connect to broker securely -func NewTLSConfig(server string) (*tls.Config, error) { - file := ncutils.GetNetclientServerPath(server) + ncutils.GetSeparator() + "root.pem" - certpool := x509.NewCertPool() - ca, err := os.ReadFile(file) - if err != nil { - logger.Log(0, "could not read CA file", err.Error()) - } - ok := certpool.AppendCertsFromPEM(ca) - if !ok { - logger.Log(0, "failed to append cert") - } - clientKeyPair, err := tls.LoadX509KeyPair(ncutils.GetNetclientServerPath(server)+ncutils.GetSeparator()+"client.pem", ncutils.GetNetclientPath()+ncutils.GetSeparator()+"client.key") - if err != nil { - logger.Log(0, "could not read client cert/key", err.Error()) - return nil, err - } - certs := []tls.Certificate{clientKeyPair} - return &tls.Config{ - RootCAs: certpool, - ClientAuth: tls.NoClientCert, - ClientCAs: nil, - Certificates: certs, - InsecureSkipVerify: false, - }, nil - -} - // func setMQTTSingenton creates a connection to broker for single use (ie to publish a message) // only to be called from cli (eg. connect/disconnect, join, leave) and not from daemon --- func setupMQTTSingleton(cfg *config.ClientConfig) error { diff --git a/netclient/functions/mqpublish.go b/netclient/functions/mqpublish.go index fe33a3fd..233bc2f5 100644 --- a/netclient/functions/mqpublish.go +++ b/netclient/functions/mqpublish.go @@ -29,7 +29,6 @@ var metricsCache = new(sync.Map) func Checkin(ctx context.Context, wg *sync.WaitGroup) { logger.Log(2, "starting checkin goroutine") defer wg.Done() - checkin() ticker := time.NewTicker(time.Minute * ncutils.CheckInInterval) defer ticker.Stop() for { @@ -38,7 +37,12 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup) { logger.Log(0, "checkin routine closed") return case <-ticker.C: - checkin() + if mqclient != nil && mqclient.IsConnected() { + checkin() + } else { + logger.Log(0, "MQ client is not connected, skipping checkin...") + } + } } }