NM-125: Egress HA by Latency, Allow Tags to be selected as routing peers (#3698)

* enable egress routing peers with tags

* remove tag from egress when deleted

* fix egress tag functionality

* filter duplicate egress ips

* set default stun server if unset

* add version to status api

* sync deleted node udpate host deletion
This commit is contained in:
Abhishek K 2025-10-25 23:49:21 +04:00 committed by GitHub
parent 310e6f28ba
commit c5b48db2a1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 178 additions and 40 deletions

View file

@ -84,8 +84,15 @@ func createEgress(w http.ResponseWriter, r *http.Request) {
CreatedBy: r.Header.Get("user"),
CreatedAt: time.Now().UTC(),
}
for nodeID, metric := range req.Nodes {
e.Nodes[nodeID] = metric
if len(req.Tags) > 0 {
for tagID, metric := range req.Tags {
e.Tags[tagID] = metric
}
e.Nodes = make(datatypes.JSONMap)
} else {
for nodeID, metric := range req.Nodes {
e.Nodes[nodeID] = metric
}
}
if err := logic.ValidateEgressReq(&e); err != nil {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
@ -272,8 +279,15 @@ func updateEgress(w http.ResponseWriter, r *http.Request) {
}
e.Nodes = make(datatypes.JSONMap)
e.Tags = make(datatypes.JSONMap)
for nodeID, metric := range req.Nodes {
e.Nodes[nodeID] = metric
if len(req.Tags) > 0 {
for tagID, metric := range req.Tags {
e.Tags[tagID] = metric
}
e.Nodes = make(datatypes.JSONMap)
} else {
for nodeID, metric := range req.Nodes {
e.Nodes[nodeID] = metric
}
}
if e.Domain != req.Domain {
e.DomainAns = datatypes.JSONSlice[string]{}

View file

@ -403,8 +403,7 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
case models.SignalHost:
mq.SignalPeer(hostUpdate.Signal)
case models.DeleteHost:
mq.DeleteAndCleanupHost(currentHost)
sendPeerUpdate = true
go mq.DeleteAndCleanupHost(currentHost)
}
go func() {
if sendDeletedNodeUpdate {
@ -447,11 +446,7 @@ func deleteHost(w http.ResponseWriter, r *http.Request) {
slog.Error("failed to get node", "nodeid", nodeID, "error", err)
continue
}
var gwClients []models.ExtClient
if node.IsIngressGateway {
gwClients = logic.GetGwExtclients(node.ID.String(), node.Network)
}
go mq.PublishMqUpdatesForDeletedNode(node, false, gwClients)
go mq.PublishMqUpdatesForDeletedNode(node, false)
}
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
@ -705,10 +700,6 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}
var gwClients []models.ExtClient
if node.IsIngressGateway {
gwClients = logic.GetGwExtclients(node.ID.String(), node.Network)
}
logger.Log(1, "deleting node", node.ID.String(), "from host", currHost.Name)
if err := logic.DeleteNode(node, forceDelete); err != nil {
logic.ReturnErrorResponse(
@ -719,7 +710,7 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
return
}
go func() {
mq.PublishMqUpdatesForDeletedNode(*node, true, gwClients)
mq.PublishMqUpdatesForDeletedNode(*node, true)
if servercfg.IsDNSMode() {
logic.SetDNS()
}

View file

@ -729,9 +729,9 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
if err := mq.NodeUpdate(newNode); err != nil {
slog.Error("error publishing node update to node", "node", newNode.ID, "error", err)
}
if !newNode.Connected {
mq.HostUpdate(&models.HostUpdate{Host: *host, Action: models.RequestPull})
}
// if !newNode.Connected {
// mq.HostUpdate(&models.HostUpdate{Host: *host, Action: models.SignalPull})
// }
mq.PublishPeerUpdate(false)
if servercfg.IsDNSMode() {
logic.SetDNS()
@ -759,10 +759,6 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
}
forceDelete := r.URL.Query().Get("force") == "true"
fromNode := r.Header.Get("requestfrom") == "node"
var gwClients []models.ExtClient
if node.IsIngressGateway {
gwClients = logic.GetGwExtclients(node.ID.String(), node.Network)
}
purge := forceDelete || fromNode
if err := logic.DeleteNode(&node, purge); err != nil {
logic.ReturnErrorResponse(
@ -775,5 +771,5 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
go mq.PublishMqUpdatesForDeletedNode(node, !fromNode, gwClients)
go mq.PublishMqUpdatesForDeletedNode(node, !fromNode)
}

View file

@ -101,6 +101,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) {
IsPro bool `json:"is_pro"`
TrialEndDate time.Time `json:"trial_end_date"`
IsOnTrialLicense bool `json:"is_on_trial_license"`
Version string `json:"version"`
}
licenseErr := ""
@ -125,6 +126,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) {
IsBrokerConnOpen: mq.IsConnectionOpen(),
LicenseError: licenseErr,
IsPro: servercfg.IsPro,
Version: servercfg.Version,
//TrialEndDate: trialEndDate,
//IsOnTrialLicense: isOnTrial,
}

View file

@ -13,7 +13,9 @@ import (
"github.com/gravitl/netmaker/servercfg"
)
func ValidateEgressReq(e *schema.Egress) error {
var ValidateEgressReq = validateEgressReq
func validateEgressReq(e *schema.Egress) error {
if e.Network == "" {
return errors.New("network id is empty")
}
@ -162,6 +164,42 @@ func AddEgressInfoToPeerByAccess(node, targetNode *models.Node, eli []schema.Egr
}
}
for tagID := range targetNode.Tags {
if metric, ok := e.Tags[tagID.String()]; ok {
m64, err := metric.(json.Number).Int64()
if err != nil {
m64 = 256
}
m := uint32(m64)
if e.Range != "" {
req.Ranges = append(req.Ranges, e.Range)
} else {
req.Ranges = append(req.Ranges, e.DomainAns...)
}
if e.Range != "" {
req.Ranges = append(req.Ranges, e.Range)
req.RangesWithMetric = append(req.RangesWithMetric, models.EgressRangeMetric{
Network: e.Range,
Nat: e.Nat,
RouteMetric: m,
})
}
if e.Domain != "" && len(e.DomainAns) > 0 {
req.Ranges = append(req.Ranges, e.DomainAns...)
for _, domainAnsI := range e.DomainAns {
req.RangesWithMetric = append(req.RangesWithMetric, models.EgressRangeMetric{
Network: domainAnsI,
Nat: e.Nat,
RouteMetric: m,
})
}
}
break
}
}
}
if targetNode.Mutex != nil {
targetNode.Mutex.Lock()
@ -240,6 +278,41 @@ func GetNodeEgressInfo(targetNode *models.Node, eli []schema.Egress, acls []mode
}
}
for tagID := range targetNode.Tags {
if metric, ok := e.Tags[tagID.String()]; ok {
m64, err := metric.(json.Number).Int64()
if err != nil {
m64 = 256
}
m := uint32(m64)
if e.Range != "" {
req.Ranges = append(req.Ranges, e.Range)
} else {
req.Ranges = append(req.Ranges, e.DomainAns...)
}
if e.Range != "" {
req.Ranges = append(req.Ranges, e.Range)
req.RangesWithMetric = append(req.RangesWithMetric, models.EgressRangeMetric{
Network: e.Range,
Nat: e.Nat,
RouteMetric: m,
})
}
if e.Domain != "" && len(e.DomainAns) > 0 {
req.Ranges = append(req.Ranges, e.DomainAns...)
for _, domainAnsI := range e.DomainAns {
req.RangesWithMetric = append(req.RangesWithMetric, models.EgressRangeMetric{
Network: domainAnsI,
Nat: e.Nat,
RouteMetric: m,
})
}
}
break
}
}
}
if targetNode.Mutex != nil {
targetNode.Mutex.Lock()

View file

@ -223,6 +223,9 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
}
newNode.EgressDetails = models.EgressDetails{}
newNode.SetLastModified()
if !currentNode.Connected && newNode.Connected {
newNode.SetLastCheckIn()
}
if data, err := json.Marshal(newNode); err != nil {
return err
} else {

View file

@ -582,7 +582,7 @@ func filterConflictingEgressRoutes(node, peer models.Node) []string {
}
}
return egressIPs
return UniqueStrings(egressIPs)
}
func filterConflictingEgressRoutesWithMetric(node, peer models.Node) []models.EgressRangeMetric {

View file

@ -821,6 +821,9 @@ func migrateSettings() {
if settings.JwtValidityDurationClients == 0 {
settings.JwtValidityDurationClients = servercfg.GetJwtValidityDurationFromEnv() / 60
}
if settings.StunServers == "" {
settings.StunServers = servercfg.GetStunServers()
}
logic.UpsertServerSettings(settings)
}

View file

@ -6,7 +6,7 @@ type EgressReq struct {
Network string `json:"network"`
Description string `json:"description"`
Nodes map[string]int `json:"nodes"`
Tags []string `json:"tags"`
Tags map[string]int `json:"tags"`
Range string `json:"range"`
Domain string `json:"domain"`
Nat bool `json:"nat"`

View file

@ -160,19 +160,13 @@ func DeleteAndCleanupHost(h *models.Host) {
}
// notify of deleted peer change
go func(host models.Host) {
for _, nodeID := range host.Nodes {
node, err := logic.GetNodeByID(nodeID)
if err == nil {
var gwClients []models.ExtClient
if node.IsIngressGateway {
gwClients = logic.GetGwExtclients(node.ID.String(), node.Network)
}
go PublishMqUpdatesForDeletedNode(node, false, gwClients)
}
for _, nodeID := range h.Nodes {
node, err := logic.GetNodeByID(nodeID)
if err == nil {
PublishMqUpdatesForDeletedNode(node, false)
}
}(*h)
}
if err := logic.DisassociateAllNodesFromHost(h.ID.String()); err != nil {
slog.Error("failed to delete all nodes of host", "id", h.ID, "error", err)

View file

@ -198,7 +198,7 @@ func ServerStartNotify() error {
}
// PublishMqUpdatesForDeletedNode - published all the required updates for deleted node
func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool, gwClients []models.ExtClient) {
func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool) {
// notify of peer change
node.PendingDelete = true
node.Action = models.NODE_DELETE

View file

@ -159,6 +159,7 @@ func InitPro() {
logic.GetNameserversForHost = proLogic.GetNameserversForHost
logic.GetNameserversForNode = proLogic.GetNameserversForNode
logic.ValidateNameserverReq = proLogic.ValidateNameserverReq
logic.ValidateEgressReq = proLogic.ValidateEgressReq
}

56
pro/logic/egress.go Normal file
View file

@ -0,0 +1,56 @@
package logic
import (
"context"
"errors"
"github.com/gravitl/netmaker/db"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/schema"
"github.com/gravitl/netmaker/servercfg"
"gorm.io/datatypes"
)
func ValidateEgressReq(e *schema.Egress) error {
if e.Network == "" {
return errors.New("network id is empty")
}
_, err := logic.GetNetwork(e.Network)
if err != nil {
return errors.New("failed to get network " + err.Error())
}
if !servercfg.IsPro && len(e.Nodes) > 1 {
return errors.New("can only set one routing node on CE")
}
if len(e.Nodes) > 0 {
for k := range e.Nodes {
_, err := logic.GetNodeByID(k)
if err != nil {
return errors.New("invalid routing node " + err.Error())
}
}
}
if len(e.Tags) > 0 {
e.Nodes = make(datatypes.JSONMap)
for tagID := range e.Tags {
_, err := GetTag(models.TagID(tagID))
if err != nil {
return errors.New("invalid tag " + tagID)
}
}
}
return nil
}
func RemoveTagFromEgress(net models.NetworkID, tagID models.TagID) {
eli, _ := (&schema.Egress{Network: net.String()}).ListByNetwork(db.WithContext(context.TODO()))
for _, eI := range eli {
if _, ok := eI.Tags[tagID.String()]; ok {
delete(eI.Tags, tagID.String())
eI.Update(db.WithContext(context.TODO()))
}
}
}

View file

@ -78,6 +78,7 @@ func DeleteTag(tagID models.TagID, removeFromPolicy bool) error {
// remove tag used on acl policy
go RemoveDeviceTagFromAclPolicies(tagID, tag.Network)
}
go RemoveTagFromEgress(tag.Network, tagID)
extclients, _ := logic.GetNetworkExtClients(tag.Network.String())
for _, extclient := range extclients {
if _, ok := extclient.Tags[tagID]; ok {

View file

@ -750,7 +750,11 @@ func IsStunEnabled() bool {
}
func GetStunServers() string {
return os.Getenv("STUN_SERVERS")
stunservers := os.Getenv("STUN_SERVERS")
if stunservers == "" {
stunservers = "stun1.l.google.com:19302,stun2.l.google.com:19302,stun3.l.google.com:19302,stun4.l.google.com:19302"
}
return stunservers
}
// GetEnvironment returns the environment the server is running in (e.g. dev, staging, prod...)