diff --git a/controllers/controller.go b/controllers/controller.go index d80093be..4ce41e47 100644 --- a/controllers/controller.go +++ b/controllers/controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "os" "strings" "sync" "time" @@ -11,6 +12,7 @@ import ( "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gravitl/netmaker/logger" + m "github.com/gravitl/netmaker/migrate" "github.com/gravitl/netmaker/servercfg" ) @@ -62,6 +64,11 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) { logger.Log(0, err.Error()) } }() + if os.Getenv("MIGRATE_EMQX") == "true" { + logger.Log(0, "migrating emqx...") + time.Sleep(time.Second * 2) + m.MigrateEmqx() + } logger.Log(0, "REST Server successfully started on port ", port, " (REST)") // Block main routine until a signal is received diff --git a/controllers/node.go b/controllers/node.go index d0a77ce7..2c2ce043 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -645,6 +645,17 @@ func updateNode(w http.ResponseWriter, r *http.Request) { } relayUpdate := logic.RelayUpdates(¤tNode, newNode) + if relayUpdate && newNode.IsRelay { + err = logic.ValidateRelay(models.RelayRequest{ + NodeID: newNode.ID.String(), + NetID: newNode.Network, + RelayedNodes: newNode.RelayedNodes, + }, true) + if err != nil { + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } + } _, err = logic.GetHost(newNode.HostID.String()) if err != nil { logger.Log(0, r.Header.Get("user"), diff --git a/logic/acls/nodeacls/modify.go b/logic/acls/nodeacls/modify.go index 54c898b7..5710dd61 100644 --- a/logic/acls/nodeacls/modify.go +++ b/logic/acls/nodeacls/modify.go @@ -77,14 +77,12 @@ func RemoveNodeACL(networkID NetworkID, nodeID NodeID) (acls.ACLContainer, error if err != nil { return nil, err } - acls.AclMutex.Lock() for currentNodeID := range currentNetworkACL { if NodeID(currentNodeID) != nodeID { currentNetworkACL[currentNodeID].Remove(acls.AclID(nodeID)) } } delete(currentNetworkACL, acls.AclID(nodeID)) - acls.AclMutex.Unlock() return currentNetworkACL.Save(acls.ContainerID(networkID)) } diff --git a/logic/acls/nodeacls/retrieve.go b/logic/acls/nodeacls/retrieve.go index 70870e4d..15397c24 100644 --- a/logic/acls/nodeacls/retrieve.go +++ b/logic/acls/nodeacls/retrieve.go @@ -15,8 +15,10 @@ func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool { } var allowed bool acls.AclMutex.RLock() - allowed = currentNetworkACL[acls.AclID(node1)].IsAllowed(acls.AclID(node2)) && currentNetworkACL[acls.AclID(node2)].IsAllowed(acls.AclID(node1)) + currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)] + currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)] acls.AclMutex.RUnlock() + allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1)) return allowed } diff --git a/logic/hosts.go b/logic/hosts.go index d3dd48bb..6d8c3b28 100644 --- a/logic/hosts.go +++ b/logic/hosts.go @@ -418,7 +418,6 @@ func DissasociateNodeFromHost(n *models.Node, h *models.Host) error { if err := DeleteNodeByID(n); err != nil { return err } - return UpsertHost(h) } diff --git a/logic/nodes.go b/logic/nodes.go index f5e7e125..353a33f1 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -189,7 +189,6 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error { func DeleteNode(node *models.Node, purge bool) error { alreadyDeleted := node.PendingDelete || node.Action == models.NODE_DELETE node.Action = models.NODE_DELETE - //delete ext clients if node is ingress gw if node.IsIngressGateway { if err := DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil { @@ -235,7 +234,6 @@ func DeleteNode(node *models.Node, purge bool) error { if node.IsInternetGateway { UnsetInternetGw(node) } - if !purge && !alreadyDeleted { newnode := *node newnode.PendingDelete = true @@ -281,7 +279,6 @@ func GetNodeByHostRef(hostid, network string) (node models.Node, err error) { func DeleteNodeByID(node *models.Node) error { var err error var key = node.ID.String() - if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil { if !database.IsEmptyRecord(err) { return err diff --git a/logic/relay.go b/logic/relay.go index 181cf511..bd3c80bb 100644 --- a/logic/relay.go +++ b/logic/relay.go @@ -28,3 +28,7 @@ var SetRelayedNodes = func(setRelayed bool, relay string, relayed []string) []mo var RelayUpdates = func(currentNode, newNode *models.Node) bool { return false } + +var ValidateRelay = func(relay models.RelayRequest, update bool) error { + return nil +} diff --git a/migrate/migrate.go b/migrate/migrate.go index ce4cc840..0dd8ba5c 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "time" "golang.org/x/exp/slog" @@ -12,6 +13,7 @@ import ( "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic/acls" "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) @@ -22,6 +24,7 @@ func Run() { updateHosts() updateNodes() updateAcls() + } func assignSuperAdmin() { @@ -292,3 +295,19 @@ func updateAcls() { slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID)) } } + +func MigrateEmqx() { + + err := mq.SendPullSYN() + if err != nil { + logger.Log(0, "failed to send pull syn to clients", "error", err.Error()) + + } + time.Sleep(time.Second * 3) + slog.Info("proceeding to kicking out clients from emqx") + err = mq.KickOutClients() + if err != nil { + logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) + } + +} diff --git a/mq/migrate.go b/mq/migrate.go new file mode 100644 index 00000000..068d785f --- /dev/null +++ b/mq/migrate.go @@ -0,0 +1,131 @@ +package mq + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/servercfg" + "golang.org/x/exp/slog" +) + +func setupmqtt_old() (mqtt.Client, error) { + + opts := mqtt.NewClientOptions() + opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT")) + id := logic.RandomString(23) + opts.ClientID = id + opts.SetUsername(os.Getenv("OLD_MQ_USERNAME")) + opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD")) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(time.Second << 2) + opts.SetKeepAlive(time.Minute) + opts.SetWriteTimeout(time.Minute) + mqclient := mqtt.NewClient(opts) + + var connecterr error + if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { + if token.Error() == nil { + connecterr = errors.New("connect timeout") + } else { + connecterr = token.Error() + } + slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr) + } + return mqclient, nil +} + +func getEmqxAuthTokenOld() (string, error) { + payload, err := json.Marshal(&emqxLogin{ + Username: os.Getenv("OLD_MQ_USERNAME"), + Password: os.Getenv("OLD_MQ_PASSWORD"), + }) + if err != nil { + return "", err + } + resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload)) + if err != nil { + return "", err + } + msg, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("error during EMQX login %v", string(msg)) + } + var loginResp emqxLoginResponse + if err := json.Unmarshal(msg, &loginResp); err != nil { + return "", err + } + return loginResp.Token, nil +} + +func SendPullSYN() error { + mqclient, err := setupmqtt_old() + if err != nil { + return err + } + hosts, err := logic.GetAllHosts() + if err != nil { + return err + } + for _, host := range hosts { + host := host + hostUpdate := models.HostUpdate{ + Action: models.RequestPull, + Host: host, + } + msg, _ := json.Marshal(hostUpdate) + encrypted, encryptErr := encryptMsg(&host, msg) + if encryptErr != nil { + continue + } + logger.Log(0, "sending pull syn to", host.Name) + mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted) + } + return nil +} + +func KickOutClients() error { + authToken, err := getEmqxAuthTokenOld() + if err != nil { + return err + } + hosts, err := logic.GetAllHosts() + if err != nil { + slog.Error("failed to migrate emqx: ", "error", err) + return err + } + + for _, host := range hosts { + url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String()) + client := &http.Client{} + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err) + continue + } + req.Header.Add("Authorization", "Bearer "+authToken) + res, err := client.Do(req) + if err != nil { + slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err) + continue + } + if res.StatusCode != http.StatusNoContent { + slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode) + } + res.Body.Close() + } + return nil +} diff --git a/pro/initialize.go b/pro/initialize.go index 2c06d3bb..13e2bfaa 100644 --- a/pro/initialize.go +++ b/pro/initialize.go @@ -101,6 +101,7 @@ func InitPro() { logic.UpdateRelayed = proLogic.UpdateRelayed logic.SetRelayedNodes = proLogic.SetRelayedNodes logic.RelayUpdates = proLogic.RelayUpdates + logic.ValidateRelay = proLogic.ValidateRelay logic.GetTrialEndDate = getTrialEndDate logic.SetDefaultGw = proLogic.SetDefaultGw logic.SetDefaultGwForRelayedUpdate = proLogic.SetDefaultGwForRelayedUpdate diff --git a/pro/logic/relays.go b/pro/logic/relays.go index 09e56dc0..6c06f8c0 100644 --- a/pro/logic/relays.go +++ b/pro/logic/relays.go @@ -44,7 +44,7 @@ func CreateRelay(relay models.RelayRequest) ([]models.Node, models.Node, error) if host.OS != "linux" { return returnnodes, models.Node{}, fmt.Errorf("only linux machines can be relay nodes") } - err = ValidateRelay(relay) + err = ValidateRelay(relay, false) if err != nil { return returnnodes, models.Node{}, err } @@ -101,14 +101,14 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N // } // ValidateRelay - checks if relay is valid -func ValidateRelay(relay models.RelayRequest) error { +func ValidateRelay(relay models.RelayRequest, update bool) error { var err error node, err := logic.GetNodeByID(relay.NodeID) if err != nil { return err } - if node.IsRelay { + if !update && node.IsRelay { return errors.New("node is already acting as a relay") } for _, relayedNodeID := range relay.RelayedNodes { @@ -119,6 +119,9 @@ func ValidateRelay(relay models.RelayRequest) error { if relayedNode.IsIngressGateway { return errors.New("cannot relay an ingress gateway (" + relayedNodeID + ")") } + if relayedNode.IsInternetGateway { + return errors.New("cannot relay an internet gateway (" + relayedNodeID + ")") + } } return err }