remove peer update context

This commit is contained in:
Abhishek Kondur 2023-06-29 00:17:42 +05:30
parent c53386d744
commit 086ef25ac6
7 changed files with 19 additions and 43 deletions

View file

@ -4,7 +4,7 @@ ARG tags
WORKDIR /app WORKDIR /app
COPY . . COPY . .
RUN GOOS=linux CGO_ENABLED=1 go build -race -ldflags="-s -w " -tags ${tags} . RUN GOOS=linux CGO_ENABLED=1 go build -ldflags="-s -w " -tags ${tags} .
# RUN go build -tags=ee . -o netmaker main.go # RUN go build -tags=ee . -o netmaker main.go
FROM alpine:3.18.2 FROM alpine:3.18.2

View file

@ -1,7 +1,6 @@
package controller package controller
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -87,7 +86,7 @@ func pull(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return return
} }
hPU, err := logic.GetPeerUpdateForHost(context.Background(), "", host, allNodes, nil, nil) hPU, err := logic.GetPeerUpdateForHost("", host, allNodes, nil, nil)
if err != nil { if err != nil {
logger.Log(0, "could not pull peers for host", hostID) logger.Log(0, "could not pull peers for host", hostID)
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))

View file

@ -1,7 +1,6 @@
package controller package controller
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
@ -395,7 +394,7 @@ func getNode(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return return
} }
hostPeerUpdate, err := logic.GetPeerUpdateForHost(context.Background(), node.Network, host, allNodes, nil, nil) hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host, allNodes, nil, nil)
if err != nil && !database.IsEmptyRecord(err) { if err != nil && !database.IsEmptyRecord(err) {
logger.Log(0, r.Header.Get("user"), logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err)) fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
@ -595,7 +594,6 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
return return
} }
go mq.PublishSingleHostPeerUpdate( go mq.PublishSingleHostPeerUpdate(
context.Background(),
host, host,
allNodes, allNodes,
nil, nil,

View file

@ -1,7 +1,6 @@
package ee_controllers package ee_controllers
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
@ -91,8 +90,12 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
h, err := logic.GetHost(relayedNode.HostID.String()) h, err := logic.GetHost(relayedNode.HostID.String())
if err == nil { if err == nil {
if h.OS == models.OS_Types.IoT { if h.OS == models.OS_Types.IoT {
nodes, err := logic.GetAllNodes()
if err != nil {
return
}
node.IsRelay = true // for iot update to recognise that it has to delete relay peer node.IsRelay = true // for iot update to recognise that it has to delete relay peer
if err = mq.PublishSingleHostPeerUpdate(context.Background(), h, &node, nil); err != nil { if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil); err != nil {
logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error()) logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
} }
} }

View file

@ -1,7 +1,6 @@
package logic package logic
import ( import (
"context"
"errors" "errors"
"net" "net"
"net/netip" "net/netip"
@ -16,15 +15,8 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
var (
// PeerUpdateCtx context to send to host peer updates
PeerUpdateCtx context.Context
// PeerUpdateStop - the cancel for PeerUpdateCtx
PeerUpdateStop context.CancelFunc
)
// GetProxyUpdateForHost - gets the proxy update for host // GetProxyUpdateForHost - gets the proxy update for host
func GetProxyUpdateForHost(ctx context.Context, host *models.Host) (models.ProxyManagerPayload, error) { func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error) {
proxyPayload := models.ProxyManagerPayload{ proxyPayload := models.ProxyManagerPayload{
Action: models.ProxyUpdate, Action: models.ProxyUpdate,
} }
@ -84,18 +76,8 @@ func GetProxyUpdateForHost(ctx context.Context, host *models.Host) (models.Proxy
return proxyPayload, nil return proxyPayload, nil
} }
// ResetPeerUpdateContext - kills any current peer updates and resets the context
func ResetPeerUpdateContext() {
return
if PeerUpdateCtx != nil && PeerUpdateStop != nil {
PeerUpdateStop() // tell any current peer updates to stop
}
PeerUpdateCtx, PeerUpdateStop = context.WithCancel(context.Background())
}
// GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) { func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) {
if host == nil { if host == nil {
return models.HostPeerUpdate{}, errors.New("host is nil") return models.HostPeerUpdate{}, errors.New("host is nil")
} }

View file

@ -1,7 +1,6 @@
package mq package mq
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
@ -111,7 +110,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
if err != nil { if err != nil {
return return
} }
if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nodes, nil, nil); err != nil { if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil); err != nil {
slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err) slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
return return
} }
@ -243,7 +242,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
if err != nil { if err != nil {
return return
} }
if err = PublishSingleHostPeerUpdate(context.Background(), host, nodes, nil, nil); err != nil { if err = PublishSingleHostPeerUpdate(host, nodes, nil, nil); err != nil {
slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err) slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
} }
} }

View file

@ -1,7 +1,6 @@
package mq package mq
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -28,10 +27,9 @@ func PublishPeerUpdate() error {
if err != nil { if err != nil {
return err return err
} }
logic.ResetPeerUpdateContext()
for _, host := range hosts { for _, host := range hosts {
host := host host := host
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, allNodes, nil, nil); err != nil { if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
} }
} }
@ -54,10 +52,9 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
if err != nil { if err != nil {
return err return err
} }
logic.ResetPeerUpdateContext()
for _, host := range hosts { for _, host := range hosts {
host := host host := host
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, allNodes, delNode, nil); err != nil { if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
} }
} }
@ -80,10 +77,9 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
if err != nil { if err != nil {
return err return err
} }
logic.ResetPeerUpdateContext()
for _, host := range hosts { for _, host := range hosts {
host := host host := host
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nodes, nil, []models.ExtClient{*delClient}); err != nil { if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
} }
} }
@ -91,9 +87,9 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
} }
// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) error { func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) error {
peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, allNodes, deletedNode, deletedClients) peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
if err != nil { if err != nil {
return err return err
} }
@ -101,7 +97,7 @@ func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, allNode
return nil return nil
} }
if host.OS != models.OS_Types.IoT { if host.OS != models.OS_Types.IoT {
proxyUpdate, err := logic.GetProxyUpdateForHost(ctx, host) proxyUpdate, err := logic.GetProxyUpdateForHost(host)
if err != nil { if err != nil {
return err return err
} }
@ -468,11 +464,10 @@ func sendPeers() {
//collectServerMetrics(networks[:]) //collectServerMetrics(networks[:])
} }
if force { if force {
logic.ResetPeerUpdateContext()
for _, host := range hosts { for _, host := range hosts {
host := host host := host
logger.Log(2, "sending scheduled peer update (5 min)") logger.Log(2, "sending scheduled peer update (5 min)")
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nodes, nil, nil); err != nil { if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil); err != nil {
logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
} }
} }