diff --git a/controllers/hosts.go b/controllers/hosts.go index 5b3466a7..485f3503 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" + "github.com/google/uuid" "github.com/gorilla/mux" "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" @@ -99,6 +100,16 @@ func pull(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } + for _, nodeID := range host.Nodes { + node, err := logic.GetNodeByID(nodeID) + if err != nil { + slog.Error("failed to get node:", "id", node.ID, "error", err) + continue + } + if node.FailedOverBy != uuid.Nil { + go logic.ResetFailedOverPeer(&node) + } + } allNodes, err := logic.GetAllNodes() if err != nil { logger.Log(0, "failed to get nodes: ", hostID) @@ -533,39 +544,33 @@ func signalPeer(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) return } - if signal.ToHostPubKey == "" || signal.TurnRelayEndpoint == "" { + if signal.ToHostPubKey == "" || (!servercfg.IsPro && signal.TurnRelayEndpoint == "") { msg := "insufficient data to signal peer" logger.Log(0, r.Header.Get("user"), msg) logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New(msg), "badrequest")) return } - hosts, err := logic.GetAllHosts() + signal.IsPro = servercfg.IsPro + var peerHost *models.Host + if signal.ToHostID == "" { + peerHost, err = logic.GetHostByPubKey(signal.ToHostPubKey) + } else { + peerHost, err = logic.GetHost(signal.ToHostID) + } if err != nil { - logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) - return - } - // push the signal to host through mq - found := false - for _, hostI := range hosts { - if hostI.PublicKey.String() == signal.ToHostPubKey { - // found host publish message and break - found = true - err = mq.HostUpdate(&models.HostUpdate{ - Action: models.SignalHost, - Host: hostI, - Signal: signal, - }) - if err != nil { - logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failed to publish signal to peer: "+err.Error()), "badrequest")) - return - } - break - } - } - if !found { logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failed to signal, peer not found"), "badrequest")) return } + err = mq.HostUpdate(&models.HostUpdate{ + Action: models.SignalHost, + Host: *peerHost, + Signal: signal, + }) + if err != nil { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failed to publish signal to peer: "+err.Error()), "badrequest")) + return + } + w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(signal) } diff --git a/controllers/migrate.go b/controllers/migrate.go index d57b79f2..e3cf729b 100644 --- a/controllers/migrate.go +++ b/controllers/migrate.go @@ -218,7 +218,5 @@ func convertLegacyNode(legacy models.LegacyNode, hostID uuid.UUID) models.Node { node.IngressGatewayRange6 = legacy.IngressGatewayRange6 node.DefaultACL = legacy.DefaultACL node.OwnerID = legacy.OwnerID - node.FailoverNode, _ = uuid.Parse(legacy.FailoverNode) - node.Failover = models.ParseBool(legacy.Failover) return node } diff --git a/controllers/node.go b/controllers/node.go index 4acfd4f7..52441c77 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -341,7 +341,6 @@ func getAllNodes(w http.ResponseWriter, r *http.Request) { func getNode(w http.ResponseWriter, r *http.Request) { // set header. w.Header().Set("Content-Type", "application/json") - nodeRequest := r.Header.Get("requestfrom") == "node" var params = mux.Vars(r) nodeid := params["nodeid"] @@ -386,12 +385,6 @@ func getNode(w http.ResponseWriter, r *http.Request) { PeerIDs: hostPeerUpdate.PeerIDs, } - if servercfg.IsPro && nodeRequest { - if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil { - logger.Log(1, "failed to reset failover list during node config pull", node.ID.String(), node.Network) - } - } - logger.Log(2, r.Header.Get("user"), "fetched node", params["nodeid"]) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(response) @@ -524,12 +517,6 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) { return } - if servercfg.IsPro && request.Failover { - if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil { - logger.Log(1, "failed to reset failover list during failover create", node.ID.String(), node.Network) - } - } - apiNode := node.ConvertToAPINode() logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid) w.WriteHeader(http.StatusOK) @@ -562,7 +549,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "bad request")) return } - node, wasFailover, removedClients, err := logic.DeleteIngressGateway(nodeid) + node, removedClients, err := logic.DeleteIngressGateway(nodeid) if err != nil { logger.Log(0, r.Header.Get("user"), fmt.Sprintf("failed to delete ingress gateway on node [%s] on network [%s]: %v", @@ -572,11 +559,6 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { } if servercfg.IsPro { - if wasFailover { - if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil { - logger.Log(1, "failed to reset failover list during failover create", node.ID.String(), node.Network) - } - } go func() { users, err := logic.GetUsersDB() if err == nil { @@ -662,11 +644,6 @@ func updateNode(w http.ResponseWriter, r *http.Request) { } ifaceDelta := logic.IfaceDelta(¤tNode, newNode) aclUpdate := currentNode.DefaultACL != newNode.DefaultACL - if ifaceDelta && servercfg.IsPro { - if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil { - logger.Log(0, "failed to reset failover lists during node update for node", currentNode.ID.String(), currentNode.Network) - } - } err = logic.UpdateNode(¤tNode, newNode) if err != nil { diff --git a/logic/errors.go b/logic/errors.go index 8259d586..edf2360a 100644 --- a/logic/errors.go +++ b/logic/errors.go @@ -44,6 +44,17 @@ func ReturnSuccessResponse(response http.ResponseWriter, request *http.Request, json.NewEncoder(response).Encode(httpResponse) } +// ReturnSuccessResponseWithJson - processes message and adds header +func ReturnSuccessResponseWithJson(response http.ResponseWriter, request *http.Request, res interface{}, message string) { + var httpResponse models.SuccessResponse + httpResponse.Code = http.StatusOK + httpResponse.Response = res + httpResponse.Message = message + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(http.StatusOK) + json.NewEncoder(response).Encode(httpResponse) +} + // ReturnErrorResponse - processes error and adds header func ReturnErrorResponse(response http.ResponseWriter, request *http.Request, errorMessage models.ErrorResponse) { httpResponse := &models.ErrorResponse{Code: errorMessage.Code, Message: errorMessage.Message} diff --git a/logic/gateway.go b/logic/gateway.go index 342dacb3..c5fbc1d0 100644 --- a/logic/gateway.go +++ b/logic/gateway.go @@ -8,7 +8,6 @@ import ( "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/servercfg" ) // GetInternetGateways - gets all the nodes that are internet gateways @@ -168,9 +167,6 @@ func CreateIngressGateway(netid string, nodeid string, ingress models.IngressReq node.IngressGatewayRange6 = network.AddressRange6 node.IngressDNS = ingress.ExtclientDNS node.SetLastModified() - if ingress.Failover && servercfg.IsPro { - node.Failover = true - } err = UpsertNode(&node) if err != nil { return models.Node{}, err @@ -199,35 +195,33 @@ func GetIngressGwUsers(node models.Node) (models.IngressGwUsers, error) { } // DeleteIngressGateway - deletes an ingress gateway -func DeleteIngressGateway(nodeid string) (models.Node, bool, []models.ExtClient, error) { +func DeleteIngressGateway(nodeid string) (models.Node, []models.ExtClient, error) { removedClients := []models.ExtClient{} node, err := GetNodeByID(nodeid) if err != nil { - return models.Node{}, false, removedClients, err + return models.Node{}, removedClients, err } clients, err := GetExtClientsByID(nodeid, node.Network) if err != nil && !database.IsEmptyRecord(err) { - return models.Node{}, false, removedClients, err + return models.Node{}, removedClients, err } removedClients = clients // delete ext clients belonging to ingress gateway if err = DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil { - return models.Node{}, false, removedClients, err + return models.Node{}, removedClients, err } logger.Log(3, "deleting ingress gateway") - wasFailover := node.Failover node.LastModified = time.Now() node.IsIngressGateway = false node.IngressGatewayRange = "" - node.Failover = false err = UpsertNode(&node) if err != nil { - return models.Node{}, wasFailover, removedClients, err + return models.Node{}, removedClients, err } err = SetNetworkNodesLastModified(node.Network) - return node, wasFailover, removedClients, err + return node, removedClients, err } // DeleteGatewayExtClients - deletes ext clients based on gateway (mac) of ingress node and network diff --git a/logic/hosts.go b/logic/hosts.go index 9778ddf8..ca260de0 100644 --- a/logic/hosts.go +++ b/logic/hosts.go @@ -156,6 +156,20 @@ func GetHost(hostid string) (*models.Host, error) { return &h, nil } +// GetHostByPubKey - gets a host from db given pubkey +func GetHostByPubKey(hostPubKey string) (*models.Host, error) { + hosts, err := GetAllHosts() + if err != nil { + return nil, err + } + for _, host := range hosts { + if host.PublicKey.String() == hostPubKey { + return &host, nil + } + } + return nil, errors.New("host not found") +} + // CreateHost - creates a host if not exist func CreateHost(h *models.Host) error { hosts, hErr := GetAllHosts() diff --git a/logic/nodes.go b/logic/nodes.go index f5889da2..a72043db 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -205,6 +205,9 @@ func DeleteNode(node *models.Node, purge bool) error { UpsertNode(&relayNode) } } + if node.FailedOverBy != uuid.Nil { + ResetFailedOverPeer(node) + } if node.IsRelay { // unset all the relayed nodes SetRelayedNodes(false, node.ID.String(), node.RelayedNodes) @@ -233,11 +236,6 @@ func DeleteNode(node *models.Node, purge bool) error { if err := DissasociateNodeFromHost(node, host); err != nil { return err } - if servercfg.IsPro { - if err := EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil { - logger.Log(0, "failed to reset failover lists during node delete for node", host.Name, node.Network) - } - } return nil } @@ -309,20 +307,6 @@ func ValidateNode(node *models.Node, isUpdate bool) error { return err } -// IsFailoverPresent - checks if a node is marked as a failover in given network -func IsFailoverPresent(network string) bool { - netNodes, err := GetNetworkNodes(network) - if err != nil { - return false - } - for i := range netNodes { - if netNodes[i].Failover { - return true - } - } - return false -} - // GetAllNodes - returns all nodes in the DB func GetAllNodes() ([]models.Node, error) { var nodes []models.Node @@ -385,6 +369,9 @@ func SetNodeDefaults(node *models.Node) { if node.DefaultACL == "" { node.DefaultACL = parentNetwork.DefaultACL } + if node.FailOverPeers == nil { + node.FailOverPeers = make(map[string]struct{}) + } node.SetLastModified() node.SetLastCheckIn() diff --git a/logic/peers.go b/logic/peers.go index 9e7e32ca..bcbb0cb2 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -15,6 +15,17 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) +var ( + // ResetFailOver - function to reset failOvered peers on this node + ResetFailOver = func(failOverNode *models.Node) error { + return nil + } + // ResetFailedOverPeer - removes failed over node from network peers + ResetFailedOverPeer = func(failedOverNode *models.Node) error { + return nil + } +) + // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) { @@ -132,7 +143,9 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N if peer.IsIngressGateway { hostPeerUpdate.EgressRoutes = append(hostPeerUpdate.EgressRoutes, getExtpeersExtraRoutes(peer.Network)...) } - if (node.IsRelayed && node.RelayedBy != peer.ID.String()) || (peer.IsRelayed && peer.RelayedBy != node.ID.String()) { + _, isFailOverPeer := node.FailOverPeers[peer.ID.String()] + if (node.IsRelayed && node.RelayedBy != peer.ID.String()) || + (peer.IsRelayed && peer.RelayedBy != node.ID.String()) || isFailOverPeer { // if node is relayed and peer is not the relay, set remove to true if _, ok := peerIndexMap[peerHost.PublicKey.String()]; ok { continue @@ -197,9 +210,10 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]] } - if node.Network == network { // add to peers map for metrics + if node.Network == network && !peerConfig.Remove { // add to peers map for metrics hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{ ID: peer.ID.String(), + HostID: peerHost.ID.String(), Address: peer.PrimaryAddress(), Name: peerHost.Name, Network: peer.Network, @@ -316,6 +330,31 @@ func GetAllowedIPs(node, peer *models.Node, metrics *models.Metrics) []net.IPNet return allowedips } +func GetFailOverPeerIps(peer, node *models.Node) []net.IPNet { + allowedips := []net.IPNet{} + for failOverpeerID := range node.FailOverPeers { + failOverpeer, err := GetNodeByID(failOverpeerID) + if err == nil && failOverpeer.FailedOverBy == peer.ID { + if failOverpeer.Address.IP != nil { + allowed := net.IPNet{ + IP: failOverpeer.Address.IP, + Mask: net.CIDRMask(32, 32), + } + allowedips = append(allowedips, allowed) + } + if failOverpeer.Address6.IP != nil { + allowed := net.IPNet{ + IP: failOverpeer.Address6.IP, + Mask: net.CIDRMask(128, 128), + } + allowedips = append(allowedips, allowed) + } + + } + } + return allowedips +} + func GetEgressIPs(peer *models.Node) []net.IPNet { peerHost, err := GetHost(peer.HostID.String()) @@ -379,6 +418,9 @@ func getNodeAllowedIPs(peer, node *models.Node) []net.IPNet { if peer.IsRelay { allowedips = append(allowedips, RelayedAllowedIPs(peer, node)...) } + if peer.IsFailOver { + allowedips = append(allowedips, GetFailOverPeerIps(peer, node)...) + } return allowedips } diff --git a/logic/server.go b/logic/server.go index a8d50932..00202026 100644 --- a/logic/server.go +++ b/logic/server.go @@ -1,22 +1,8 @@ package logic -import ( - "github.com/google/uuid" - "github.com/gravitl/netmaker/models" -) - // EnterpriseCheckFuncs - can be set to run functions for EE var EnterpriseCheckFuncs []func() -// EnterpriseFailoverFunc - interface to control failover funcs -var EnterpriseFailoverFunc func(node *models.Node) error - -// EnterpriseResetFailoverFunc - interface to control reset failover funcs -var EnterpriseResetFailoverFunc func(network string) error - -// EnterpriseResetAllPeersFailovers - resets all nodes that are considering a node to be failover worthy (inclusive) -var EnterpriseResetAllPeersFailovers func(nodeid uuid.UUID, network string) error - // == Join, Checkin, and Leave for Server == // KUBERNETES_LISTEN_PORT - starting port for Kubernetes in order to use NodePort range diff --git a/models/api_host.go b/models/api_host.go index 8cfe79af..2dc8c089 100644 --- a/models/api_host.go +++ b/models/api_host.go @@ -26,10 +26,6 @@ type ApiHost struct { MacAddress string `json:"macaddress"` Nodes []string `json:"nodes"` IsDefault bool `json:"isdefault" yaml:"isdefault"` - IsRelayed bool `json:"isrelayed" yaml:"isrelayed" bson:"isrelayed"` - RelayedBy string `json:"relayed_by" yaml:"relayed_by" bson:"relayed_by"` - IsRelay bool `json:"isrelay" yaml:"isrelay" bson:"isrelay"` - RelayedHosts []string `json:"relay_hosts" yaml:"relay_hosts" bson:"relay_hosts"` NatType string `json:"nat_type" yaml:"nat_type"` PersistentKeepalive int `json:"persistentkeepalive" yaml:"persistentkeepalive"` AutoUpdate bool `json:"autoupdate" yaml:"autoupdate"` diff --git a/models/api_node.go b/models/api_node.go index 5dea54ae..5f474b40 100644 --- a/models/api_node.go +++ b/models/api_node.go @@ -30,7 +30,6 @@ type ApiNode struct { IsIngressGateway bool `json:"isingressgateway"` EgressGatewayRanges []string `json:"egressgatewayranges"` EgressGatewayNatEnabled bool `json:"egressgatewaynatenabled"` - FailoverNode string `json:"failovernode"` DNSOn bool `json:"dnson"` IngressDns string `json:"ingressdns"` Server string `json:"server"` @@ -38,8 +37,10 @@ type ApiNode struct { Connected bool `json:"connected"` PendingDelete bool `json:"pendingdelete"` // == PRO == - DefaultACL string `json:"defaultacl,omitempty" validate:"checkyesornoorunset"` - Failover bool `json:"failover"` + DefaultACL string `json:"defaultacl,omitempty" validate:"checkyesornoorunset"` + IsFailOver bool `json:"is_fail_over"` + FailOverPeers map[string]struct{} `json:"fail_over_peers" yaml:"fail_over_peers"` + FailedOverBy uuid.UUID `json:"failed_over_by" yaml:"failed_over_by"` } // ApiNode.ConvertToServerNode - converts an api node to a server node @@ -56,7 +57,8 @@ func (a *ApiNode) ConvertToServerNode(currentNode *Node) *Node { convertedNode.RelayedBy = a.RelayedBy convertedNode.RelayedNodes = a.RelayedNodes convertedNode.PendingDelete = a.PendingDelete - convertedNode.Failover = a.Failover + convertedNode.FailedOverBy = currentNode.FailedOverBy + convertedNode.FailOverPeers = currentNode.FailOverPeers convertedNode.IsEgressGateway = a.IsEgressGateway convertedNode.IsIngressGateway = a.IsIngressGateway // prevents user from changing ranges, must delete and recreate @@ -100,7 +102,6 @@ func (a *ApiNode) ConvertToServerNode(currentNode *Node) *Node { convertedNode.Address6 = *addr6 convertedNode.Address6.IP = ip6 } - convertedNode.FailoverNode, _ = uuid.Parse(a.FailoverNode) convertedNode.LastModified = time.Unix(a.LastModified, 0) convertedNode.LastCheckIn = time.Unix(a.LastCheckIn, 0) convertedNode.LastPeerUpdate = time.Unix(a.LastPeerUpdate, 0) @@ -146,10 +147,6 @@ func (nm *Node) ConvertToAPINode() *ApiNode { apiNode.IsIngressGateway = nm.IsIngressGateway apiNode.EgressGatewayRanges = nm.EgressGatewayRanges apiNode.EgressGatewayNatEnabled = nm.EgressGatewayNatEnabled - apiNode.FailoverNode = nm.FailoverNode.String() - if isUUIDSet(apiNode.FailoverNode) { - apiNode.FailoverNode = "" - } apiNode.DNSOn = nm.DNSOn apiNode.IngressDns = nm.IngressDNS apiNode.Server = nm.Server @@ -160,14 +157,12 @@ func (nm *Node) ConvertToAPINode() *ApiNode { apiNode.Connected = nm.Connected apiNode.PendingDelete = nm.PendingDelete apiNode.DefaultACL = nm.DefaultACL - apiNode.Failover = nm.Failover + apiNode.IsFailOver = nm.IsFailOver + apiNode.FailOverPeers = nm.FailOverPeers + apiNode.FailedOverBy = nm.FailedOverBy return &apiNode } func isEmptyAddr(addr string) bool { return addr == "" || addr == ":0" } - -func isUUIDSet(uuid string) bool { - return uuid != "00000000-0000-0000-0000-000000000000" -} diff --git a/models/host.go b/models/host.go index de55c531..42465a26 100644 --- a/models/host.go +++ b/models/host.go @@ -126,6 +126,8 @@ const ( Disconnect SignalAction = "DISCONNECT" // ConnNegotiation - action to negotiate connection between peers ConnNegotiation SignalAction = "CONNECTION_NEGOTIATION" + // RelayME - action to relay the peer + RelayME SignalAction = "RELAY_ME" ) // HostUpdate - struct for host update @@ -148,8 +150,13 @@ type Signal struct { FromHostPubKey string `json:"from_host_pubkey"` TurnRelayEndpoint string `json:"turn_relay_addr"` ToHostPubKey string `json:"to_host_pubkey"` + FromHostID string `json:"from_host_id"` + ToHostID string `json:"to_host_id"` + FromNodeID string `json:"from_node_id"` + ToNodeID string `json:"to_node_id"` Reply bool `json:"reply"` Action SignalAction `json:"action"` + IsPro bool `json:"is_pro"` TimeStamp int64 `json:"timestamp"` } diff --git a/models/metrics.go b/models/metrics.go index f83c33e9..72a77e1c 100644 --- a/models/metrics.go +++ b/models/metrics.go @@ -29,6 +29,7 @@ type Metric struct { // IDandAddr - struct to hold ID and primary Address type IDandAddr struct { ID string `json:"id" bson:"id" yaml:"id"` + HostID string `json:"host_id"` Address string `json:"address" bson:"address" yaml:"address"` Name string `json:"name" bson:"name" yaml:"name"` IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"` diff --git a/models/mqtt.go b/models/mqtt.go index 7c6072af..5dc1927e 100644 --- a/models/mqtt.go +++ b/models/mqtt.go @@ -70,3 +70,8 @@ type FwUpdate struct { IsEgressGw bool `json:"is_egress_gw"` EgressInfo map[string]EgressInfo `json:"egress_info"` } + +// FailOverMeReq - struct for failover req +type FailOverMeReq struct { + NodeID string `json:"node_id"` +} diff --git a/models/node.go b/models/node.go index 3e230488..4e5dc972 100644 --- a/models/node.go +++ b/models/node.go @@ -90,10 +90,11 @@ type Node struct { IngressGatewayRange string `json:"ingressgatewayrange" bson:"ingressgatewayrange" yaml:"ingressgatewayrange"` IngressGatewayRange6 string `json:"ingressgatewayrange6" bson:"ingressgatewayrange6" yaml:"ingressgatewayrange6"` // == PRO == - DefaultACL string `json:"defaultacl,omitempty" bson:"defaultacl,omitempty" yaml:"defaultacl,omitempty" validate:"checkyesornoorunset"` - OwnerID string `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"` - FailoverNode uuid.UUID `json:"failovernode" bson:"failovernode" yaml:"failovernode"` - Failover bool `json:"failover" bson:"failover" yaml:"failover"` + DefaultACL string `json:"defaultacl,omitempty" bson:"defaultacl,omitempty" yaml:"defaultacl,omitempty" validate:"checkyesornoorunset"` + OwnerID string `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"` + IsFailOver bool `json:"is_fail_over" yaml:"is_fail_over"` + FailOverPeers map[string]struct{} `json:"fail_over_peers" yaml:"fail_over_peers"` + FailedOverBy uuid.UUID `json:"failed_over_by" yaml:"failed_over_by"` } // LegacyNode - legacy struct for node model @@ -432,8 +433,8 @@ func (newNode *Node) Fill(currentNode *Node, isPro bool) { // TODO add new field if newNode.DefaultACL == "" { newNode.DefaultACL = currentNode.DefaultACL } - if newNode.Failover != currentNode.Failover { - newNode.Failover = currentNode.Failover + if newNode.IsFailOver != currentNode.IsFailOver { + newNode.IsFailOver = currentNode.IsFailOver } } diff --git a/mq/handlers.go b/mq/handlers.go index a6ab628e..13ddb638 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -49,11 +49,6 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) { } ifaceDelta := logic.IfaceDelta(¤tNode, &newNode) - if servercfg.IsPro && ifaceDelta { - if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil { - slog.Warn("failed to reset failover list during node update", "nodeid", currentNode.ID, "network", currentNode.Network) - } - } newNode.SetLastCheckIn() if err := logic.UpdateNode(¤tNode, &newNode); err != nil { slog.Error("error saving node", "id", id, "error", err) diff --git a/mq/util.go b/mq/util.go index 4bdc4194..6ecee141 100644 --- a/mq/util.go +++ b/mq/util.go @@ -73,6 +73,7 @@ func encryptMsg(host *models.Host, msg []byte) ([]byte, error) { } func publish(host *models.Host, dest string, msg []byte) error { + encrypted, encryptErr := encryptMsg(host, msg) if encryptErr != nil { return encryptErr @@ -80,6 +81,7 @@ func publish(host *models.Host, dest string, msg []byte) error { if mqclient == nil { return errors.New("cannot publish ... mqclient not connected") } + if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { var err error if token.Error() == nil { diff --git a/pro/controllers/failover.go b/pro/controllers/failover.go new file mode 100644 index 00000000..9519afc7 --- /dev/null +++ b/pro/controllers/failover.go @@ -0,0 +1,201 @@ +package controllers + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + + "github.com/google/uuid" + "github.com/gorilla/mux" + controller "github.com/gravitl/netmaker/controllers" + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/mq" + proLogic "github.com/gravitl/netmaker/pro/logic" + "golang.org/x/exp/slog" +) + +// FailOverHandlers - handlers for FailOver +func FailOverHandlers(r *mux.Router) { + r.HandleFunc("/api/v1/node/{nodeid}/failover", logic.SecurityCheck(true, http.HandlerFunc(createfailOver))).Methods(http.MethodPost) + r.HandleFunc("/api/v1/node/{nodeid}/failover", logic.SecurityCheck(true, http.HandlerFunc(deletefailOver))).Methods(http.MethodDelete) + r.HandleFunc("/api/v1/node/{network}/failover/reset", logic.SecurityCheck(true, http.HandlerFunc(resetFailOver))).Methods(http.MethodPost) + r.HandleFunc("/api/v1/node/{nodeid}/failover_me", controller.Authorize(true, false, "host", http.HandlerFunc(failOverME))).Methods(http.MethodPost) +} + +// swagger:route POST /api/v1/node/failover node createfailOver +// +// Create a relay. +// +// Schemes: https +// +// Security: +// oauth +// +// Responses: +// 200: nodeResponse +func createfailOver(w http.ResponseWriter, r *http.Request) { + var params = mux.Vars(r) + nodeid := params["nodeid"] + // confirm host exists + node, err := logic.GetNodeByID(nodeid) + if err != nil { + slog.Error("failed to get node:", "error", err.Error()) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } + if _, exists := proLogic.FailOverExists(node.Network); exists { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover exists already in the network"), "badrequest")) + return + } + host, err := logic.GetHost(node.HostID.String()) + if err != nil { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("error getting host"+err.Error()), "badrequest")) + return + } + if host.OS != models.OS_Types.Linux { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("only linux nodes can act as failovers"), "badrequest")) + return + } + if node.IsRelayed { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("cannot set relayed node as failover"), "badrequest")) + return + } + node.IsFailOver = true + err = logic.UpsertNode(&node) + if err != nil { + slog.Error("failed to upsert node", "node", node.ID.String(), "error", err) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) + return + } + go mq.PublishPeerUpdate() + w.Header().Set("Content-Type", "application/json") + logic.ReturnSuccessResponseWithJson(w, r, node, "created failover successfully") +} + +func resetFailOver(w http.ResponseWriter, r *http.Request) { + var params = mux.Vars(r) + net := params["network"] + nodes, err := logic.GetNetworkNodes(net) + if err != nil { + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) + return + } + for _, node := range nodes { + if node.FailedOverBy != uuid.Nil { + node.FailedOverBy = uuid.Nil + node.FailOverPeers = make(map[string]struct{}) + logic.UpsertNode(&node) + } + } + go mq.PublishPeerUpdate() + w.Header().Set("Content-Type", "application/json") + logic.ReturnSuccessResponse(w, r, "failover has been reset successfully") +} + +// swagger:route DELETE /api/v1/node/failover node deletefailOver +// +// Create a relay. +// +// Schemes: https +// +// Security: +// oauth +// +// Responses: +// 200: nodeResponse +func deletefailOver(w http.ResponseWriter, r *http.Request) { + var params = mux.Vars(r) + nodeid := params["nodeid"] + // confirm host exists + node, err := logic.GetNodeByID(nodeid) + if err != nil { + slog.Error("failed to get node:", "error", err.Error()) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } + node.IsFailOver = false + // Reset FailOvered Peers + err = logic.UpsertNode(&node) + if err != nil { + slog.Error("failed to upsert node", "node", node.ID.String(), "error", err) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) + return + } + go func() { + proLogic.ResetFailOver(&node) + mq.PublishPeerUpdate() + }() + w.Header().Set("Content-Type", "application/json") + logic.ReturnSuccessResponseWithJson(w, r, node, "deleted failover successfully") +} + +// swagger:route POST /api/node/{nodeid}/failOverME node failOver_me +// +// Create a relay. +// +// Schemes: https +// +// Security: +// oauth +// +// Responses: +// 200: nodeResponse +func failOverME(w http.ResponseWriter, r *http.Request) { + var params = mux.Vars(r) + nodeid := params["nodeid"] + // confirm host exists + node, err := logic.GetNodeByID(nodeid) + if err != nil { + logger.Log(0, r.Header.Get("user"), "failed to get node:", err.Error()) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } + + failOverNode, exists := proLogic.FailOverExists(node.Network) + if !exists { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover node doesn't exist in the network"), "badrequest")) + return + } + var failOverReq models.FailOverMeReq + err = json.NewDecoder(r.Body).Decode(&failOverReq) + if err != nil { + logger.Log(0, r.Header.Get("user"), "error decoding request body: ", err.Error()) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } + var sendPeerUpdate bool + peerNode, err := logic.GetNodeByID(failOverReq.NodeID) + if err != nil { + slog.Error("peer not found: ", "nodeid", failOverReq.NodeID, "error", err) + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("peer not found"), "badrequest")) + return + } + if node.IsRelayed || node.IsFailOver { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("node is relayed or acting as failover"), "badrequest")) + return + } + if peerNode.IsRelayed || peerNode.IsFailOver { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("peer node is relayed or acting as failover"), "badrequest")) + return + } + + err = proLogic.SetFailOverCtx(failOverNode, node, peerNode) + if err != nil { + slog.Error("failed to create failover", "id", node.ID.String(), + "network", node.Network, "error", err) + logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("failed to create failover: %v", err), "internal")) + return + } + slog.Info("[auto-relay] created relay on node", "node", node.ID.String(), "network", node.Network) + sendPeerUpdate = true + + if sendPeerUpdate { + go mq.PublishPeerUpdate() + } + + w.Header().Set("Content-Type", "application/json") + logic.ReturnSuccessResponse(w, r, "relayed successfully") +} diff --git a/pro/controllers/relay.go b/pro/controllers/relay.go index 65b81399..c707be7e 100644 --- a/pro/controllers/relay.go +++ b/pro/controllers/relay.go @@ -3,9 +3,11 @@ package controllers import ( "encoding/json" "fmt" - proLogic "github.com/gravitl/netmaker/pro/logic" "net/http" + "github.com/google/uuid" + proLogic "github.com/gravitl/netmaker/pro/logic" + "github.com/gorilla/mux" controller "github.com/gravitl/netmaker/controllers" "github.com/gravitl/netmaker/logger" @@ -19,6 +21,7 @@ func RelayHandlers(r *mux.Router) { r.HandleFunc("/api/nodes/{network}/{nodeid}/createrelay", controller.Authorize(false, true, "user", http.HandlerFunc(createRelay))).Methods(http.MethodPost) r.HandleFunc("/api/nodes/{network}/{nodeid}/deleterelay", controller.Authorize(false, true, "user", http.HandlerFunc(deleteRelay))).Methods(http.MethodDelete) + r.HandleFunc("/api/v1/host/{hostid}/failoverme", controller.Authorize(true, false, "host", http.HandlerFunc(failOverME))).Methods(http.MethodPost) } // swagger:route POST /api/nodes/{network}/{nodeid}/createrelay nodes createRelay @@ -51,6 +54,15 @@ func createRelay(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } + for _, relayedNodeID := range relayNode.RelayedNodes { + relayedNode, err := logic.GetNodeByID(relayedNodeID) + if err == nil { + if relayedNode.FailedOverBy != uuid.Nil { + go logic.ResetFailedOverPeer(&relayedNode) + } + + } + } go mq.PublishPeerUpdate() logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID) apiNode := relayNode.ConvertToAPINode() diff --git a/pro/initialize.go b/pro/initialize.go index 13346a9e..28db5f54 100644 --- a/pro/initialize.go +++ b/pro/initialize.go @@ -27,6 +27,7 @@ func InitPro() { proControllers.MetricHandlers, proControllers.RelayHandlers, proControllers.UserHandlers, + proControllers.FailOverHandlers, ) logic.EnterpriseCheckFuncs = append(logic.EnterpriseCheckFuncs, func() { // == License Handling == @@ -42,11 +43,9 @@ func InitPro() { if servercfg.GetServerConfig().RacAutoDisable { AddRacHooks() } - resetFailover() }) - logic.EnterpriseFailoverFunc = proLogic.SetFailover - logic.EnterpriseResetFailoverFunc = proLogic.ResetFailover - logic.EnterpriseResetAllPeersFailovers = proLogic.WipeAffectedFailoversOnly + logic.ResetFailOver = proLogic.ResetFailOver + logic.ResetFailedOverPeer = proLogic.ResetFailedOverPeer logic.DenyClientNodeAccess = proLogic.DenyClientNode logic.IsClientNodeAllowed = proLogic.IsClientNodeAllowed logic.AllowClientNodeAccess = proLogic.RemoveDeniedNodeFromClient @@ -65,18 +64,6 @@ func InitPro() { mq.UpdateMetrics = proLogic.MQUpdateMetrics } -func resetFailover() { - nets, err := logic.GetNetworks() - if err == nil { - for _, net := range nets { - err = proLogic.ResetFailover(net.NetID) - if err != nil { - slog.Error("failed to reset failover", "network", net.NetID, "error", err.Error()) - } - } - } -} - func retrieveProLogo() string { return ` __ __ ______ ______ __ __ ______ __ __ ______ ______ diff --git a/pro/logic/failover.go b/pro/logic/failover.go index 71883c5c..d8006334 100644 --- a/pro/logic/failover.go +++ b/pro/logic/failover.go @@ -1,122 +1,97 @@ package logic import ( + "errors" + "github.com/google/uuid" - "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" ) -// SetFailover - finds a suitable failover candidate and sets it -func SetFailover(node *models.Node) error { - failoverNode := determineFailoverCandidate(node) - if failoverNode != nil { - return setFailoverNode(failoverNode, node) +func SetFailOverCtx(failOverNode, victimNode, peerNode models.Node) error { + if peerNode.FailOverPeers == nil { + peerNode.FailOverPeers = make(map[string]struct{}) + } + if victimNode.FailOverPeers == nil { + victimNode.FailOverPeers = make(map[string]struct{}) + } + peerNode.FailOverPeers[victimNode.ID.String()] = struct{}{} + victimNode.FailOverPeers[peerNode.ID.String()] = struct{}{} + victimNode.FailedOverBy = failOverNode.ID + peerNode.FailedOverBy = failOverNode.ID + if err := logic.UpsertNode(&failOverNode); err != nil { + return err + } + if err := logic.UpsertNode(&victimNode); err != nil { + return err + } + if err := logic.UpsertNode(&peerNode); err != nil { + return err } return nil } -// ResetFailover - sets the failover node and wipes disconnected status -func ResetFailover(network string) error { +// GetFailOverNode - gets the host acting as failOver +func GetFailOverNode(network string, allNodes []models.Node) (models.Node, error) { + nodes := logic.GetNetworkNodesMemory(allNodes, network) + for _, node := range nodes { + if node.IsFailOver { + return node, nil + } + } + return models.Node{}, errors.New("auto relay not found") +} + +// FailOverExists - checks if failOver exists already in the network +func FailOverExists(network string) (failOverNode models.Node, exists bool) { nodes, err := logic.GetNetworkNodes(network) + if err != nil { + return + } + for _, node := range nodes { + if node.IsFailOver { + exists = true + failOverNode = node + return + } + } + return +} + +// ResetFailedOverPeer - removes failed over node from network peers +func ResetFailedOverPeer(failedOveredNode *models.Node) error { + nodes, err := logic.GetNetworkNodes(failedOveredNode.Network) + if err != nil { + return err + } + failedOveredNode.FailedOverBy = uuid.Nil + failedOveredNode.FailOverPeers = make(map[string]struct{}) + err = logic.UpsertNode(failedOveredNode) if err != nil { return err } for _, node := range nodes { - node := node - err = SetFailover(&node) - if err != nil { - logger.Log(2, "error setting failover for node", node.ID.String(), ":", err.Error()) - } - err = WipeFailover(node.ID.String()) - if err != nil { - logger.Log(2, "error wiping failover for node", node.ID.String(), ":", err.Error()) + if node.FailOverPeers == nil || node.ID == failedOveredNode.ID { + continue } + delete(node.FailOverPeers, failedOveredNode.ID.String()) + logic.UpsertNode(&node) } return nil } -// determineFailoverCandidate - returns a list of nodes that -// are suitable for relaying a given node -func determineFailoverCandidate(nodeToBeRelayed *models.Node) *models.Node { - - currentNetworkNodes, err := logic.GetNetworkNodes(nodeToBeRelayed.Network) - if err != nil { - return nil - } - - currentMetrics, err := GetMetrics(nodeToBeRelayed.ID.String()) - if err != nil || currentMetrics == nil || currentMetrics.Connectivity == nil { - return nil - } - - minLatency := int64(9223372036854775807) // max signed int64 value - var fastestCandidate *models.Node - for i := range currentNetworkNodes { - if currentNetworkNodes[i].ID == nodeToBeRelayed.ID { - continue - } - - if currentMetrics.Connectivity[currentNetworkNodes[i].ID.String()].Connected && (currentNetworkNodes[i].Failover) { - if currentMetrics.Connectivity[currentNetworkNodes[i].ID.String()].Latency < int64(minLatency) { - fastestCandidate = ¤tNetworkNodes[i] - minLatency = currentMetrics.Connectivity[currentNetworkNodes[i].ID.String()].Latency - } - } - } - - return fastestCandidate -} - -// setFailoverNode - changes node's failover node -func setFailoverNode(failoverNode, node *models.Node) error { - - node.FailoverNode = failoverNode.ID - nodeToUpdate, err := logic.GetNodeByID(node.ID.String()) +// ResetFailOver - reset failovered peers +func ResetFailOver(failOverNode *models.Node) error { + // Unset FailedOverPeers + nodes, err := logic.GetNetworkNodes(failOverNode.Network) if err != nil { return err } - if nodeToUpdate.FailoverNode == failoverNode.ID { - return nil - } - return logic.UpdateNode(&nodeToUpdate, node) -} - -// WipeFailover - removes the failover peers of given node (ID) -func WipeFailover(nodeid string) error { - metrics, err := GetMetrics(nodeid) - if err != nil { - return err - } - if metrics != nil { - metrics.FailoverPeers = make(map[string]string) - return logic.UpdateMetrics(nodeid, metrics) - } - return nil -} - -// WipeAffectedFailoversOnly - wipes failovers for nodes that have given node (ID) -// in their respective failover lists -func WipeAffectedFailoversOnly(nodeid uuid.UUID, network string) error { - currentNetworkNodes, err := logic.GetNetworkNodes(network) - if err != nil { - return nil - } - WipeFailover(nodeid.String()) - - for i := range currentNetworkNodes { - currNodeID := currentNetworkNodes[i].ID - if currNodeID == nodeid { - continue - } - currMetrics, err := GetMetrics(currNodeID.String()) - if err != nil || currMetrics == nil { - continue - } - if currMetrics.FailoverPeers != nil { - if len(currMetrics.FailoverPeers[nodeid.String()]) > 0 { - WipeFailover(currNodeID.String()) - } + for _, node := range nodes { + if node.FailedOverBy == failOverNode.ID { + node.FailedOverBy = uuid.Nil + node.FailOverPeers = make(map[string]struct{}) + logic.UpsertNode(&node) } } return nil diff --git a/pro/logic/metrics.go b/pro/logic/metrics.go index 78d20349..b8ee71ac 100644 --- a/pro/logic/metrics.go +++ b/pro/logic/metrics.go @@ -2,6 +2,9 @@ package logic import ( "encoding/json" + "math" + "time" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logic" @@ -10,8 +13,6 @@ import ( "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" "golang.org/x/exp/slog" - "math" - "time" ) // GetMetrics - gets the metrics @@ -80,13 +81,6 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) { } } - if newMetrics.Connectivity != nil { - err := logic.EnterpriseFailoverFunc(¤tNode) - if err != nil { - slog.Error("failed to failover for node", "id", currentNode.ID, "network", currentNode.Network, "error", err) - } - } - if shouldUpdate { slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network) host, err := logic.GetHost(currentNode.HostID.String()) @@ -170,21 +164,6 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) boo } - // add nodes that need failover - nodes, err := logic.GetNetworkNodes(currentNode.Network) - if err != nil { - slog.Error("failed to retrieve nodes while updating metrics", "error", err) - return false - } - for _, node := range nodes { - if !newMetrics.Connectivity[node.ID.String()].Connected && - len(newMetrics.Connectivity[node.ID.String()].NodeName) > 0 && - node.Connected && - len(node.FailoverNode) > 0 && - !node.Failover { - newMetrics.FailoverPeers[node.ID.String()] = node.FailoverNode.String() - } - } shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0 for k, v := range oldMetrics.FailoverPeers { if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 { diff --git a/pro/logic/relays.go b/pro/logic/relays.go index fb0660c1..88106b4e 100644 --- a/pro/logic/relays.go +++ b/pro/logic/relays.go @@ -69,7 +69,7 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N continue } node.IsRelayed = setRelayed - if node.IsRelayed { + if setRelayed { node.RelayedBy = relay } else { node.RelayedBy = "" @@ -155,6 +155,7 @@ func UpdateRelayed(currentNode, newNode *models.Node) { if len(updatenodes) > 0 { for _, relayedNode := range updatenodes { node := relayedNode + ResetFailedOverPeer(&node) go func() { if err := mq.NodeUpdate(&node); err != nil { slog.Error("error publishing node update to node", "node", node.ID, "error", err) diff --git a/pro/types.go b/pro/types.go index 3e4ef08d..29444d49 100644 --- a/pro/types.go +++ b/pro/types.go @@ -1,3 +1,4 @@ +// go:build ee //go:build ee // +build ee