mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-06 11:56:39 +08:00
stop context cancel on peer updates
This commit is contained in:
parent
b4081f43d1
commit
4efbe6256f
6 changed files with 192 additions and 154 deletions
|
@ -4,7 +4,7 @@ ARG tags
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
COPY . .
|
COPY . .
|
||||||
|
|
||||||
RUN GOOS=linux CGO_ENABLED=1 go build -ldflags="-s -w " -tags ${tags} .
|
RUN GOOS=linux CGO_ENABLED=1 go build -race -ldflags="-s -w " -tags ${tags} .
|
||||||
# RUN go build -tags=ee . -o netmaker main.go
|
# RUN go build -tags=ee . -o netmaker main.go
|
||||||
FROM alpine:3.18.2
|
FROM alpine:3.18.2
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,13 @@ func pull(w http.ResponseWriter, r *http.Request) {
|
||||||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
hPU, err := logic.GetPeerUpdateForHost(context.Background(), "", host, nil, nil)
|
allNodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
logger.Log(0, "could not pull peers for host", hostID)
|
||||||
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
hPU, err := logic.GetPeerUpdateForHost(context.Background(), "", host, allNodes, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log(0, "could not pull peers for host", hostID)
|
logger.Log(0, "could not pull peers for host", hostID)
|
||||||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
||||||
|
|
|
@ -388,7 +388,14 @@ func getNode(w http.ResponseWriter, r *http.Request) {
|
||||||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
hostPeerUpdate, err := logic.GetPeerUpdateForHost(context.Background(), node.Network, host, nil, nil)
|
allNodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
logger.Log(0, r.Header.Get("user"),
|
||||||
|
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
|
||||||
|
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
hostPeerUpdate, err := logic.GetPeerUpdateForHost(context.Background(), node.Network, host, allNodes, nil, nil)
|
||||||
if err != nil && !database.IsEmptyRecord(err) {
|
if err != nil && !database.IsEmptyRecord(err) {
|
||||||
logger.Log(0, r.Header.Get("user"),
|
logger.Log(0, r.Header.Get("user"),
|
||||||
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
|
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
|
||||||
|
@ -583,9 +590,14 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
|
||||||
if len(removedClients) > 0 {
|
if len(removedClients) > 0 {
|
||||||
host, err := logic.GetHost(node.HostID.String())
|
host, err := logic.GetHost(node.HostID.String())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
allNodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
go mq.PublishSingleHostPeerUpdate(
|
go mq.PublishSingleHostPeerUpdate(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
host,
|
host,
|
||||||
|
allNodes,
|
||||||
nil,
|
nil,
|
||||||
removedClients[:],
|
removedClients[:],
|
||||||
)
|
)
|
||||||
|
|
281
logic/peers.go
281
logic/peers.go
|
@ -3,7 +3,6 @@ package logic
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
|
||||||
|
@ -87,6 +86,7 @@ func GetProxyUpdateForHost(ctx context.Context, host *models.Host) (models.Proxy
|
||||||
|
|
||||||
// ResetPeerUpdateContext - kills any current peer updates and resets the context
|
// ResetPeerUpdateContext - kills any current peer updates and resets the context
|
||||||
func ResetPeerUpdateContext() {
|
func ResetPeerUpdateContext() {
|
||||||
|
return
|
||||||
if PeerUpdateCtx != nil && PeerUpdateStop != nil {
|
if PeerUpdateCtx != nil && PeerUpdateStop != nil {
|
||||||
PeerUpdateStop() // tell any current peer updates to stop
|
PeerUpdateStop() // tell any current peer updates to stop
|
||||||
}
|
}
|
||||||
|
@ -95,14 +95,11 @@ func ResetPeerUpdateContext() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
|
// GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
|
||||||
func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) {
|
func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) {
|
||||||
if host == nil {
|
if host == nil {
|
||||||
return models.HostPeerUpdate{}, errors.New("host is nil")
|
return models.HostPeerUpdate{}, errors.New("host is nil")
|
||||||
}
|
}
|
||||||
allNodes, err := GetAllNodes()
|
|
||||||
if err != nil {
|
|
||||||
return models.HostPeerUpdate{}, err
|
|
||||||
}
|
|
||||||
// track which nodes are deleted
|
// track which nodes are deleted
|
||||||
// after peer calculation, if peer not in list, add delete config of peer
|
// after peer calculation, if peer not in list, add delete config of peer
|
||||||
hostPeerUpdate := models.HostPeerUpdate{
|
hostPeerUpdate := models.HostPeerUpdate{
|
||||||
|
@ -141,150 +138,150 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
|
||||||
nodePeerMap = make(map[string]models.PeerRouteInfo)
|
nodePeerMap = make(map[string]models.PeerRouteInfo)
|
||||||
}
|
}
|
||||||
for _, peer := range currentPeers {
|
for _, peer := range currentPeers {
|
||||||
select {
|
//select {
|
||||||
case <-ctx.Done():
|
// case <-ctx.Done():
|
||||||
logger.Log(2, "cancelled peer update for host", host.Name, host.ID.String())
|
// logger.Log(2, "cancelled peer update for host", host.Name, host.ID.String())
|
||||||
return models.HostPeerUpdate{}, fmt.Errorf("peer update cancelled")
|
// return models.HostPeerUpdate{}, fmt.Errorf("peer update cancelled")
|
||||||
default:
|
//default:
|
||||||
peer := peer
|
peer := peer
|
||||||
if peer.ID.String() == node.ID.String() {
|
if peer.ID.String() == node.ID.String() {
|
||||||
logger.Log(2, "peer update, skipping self")
|
logger.Log(2, "peer update, skipping self")
|
||||||
//skip yourself
|
//skip yourself
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
peerHost, err := GetHost(peer.HostID.String())
|
peerHost, err := GetHost(peer.HostID.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
|
logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
|
||||||
return models.HostPeerUpdate{}, err
|
return models.HostPeerUpdate{}, err
|
||||||
}
|
}
|
||||||
peerConfig := wgtypes.PeerConfig{
|
peerConfig := wgtypes.PeerConfig{
|
||||||
PublicKey: peerHost.PublicKey,
|
PublicKey: peerHost.PublicKey,
|
||||||
PersistentKeepaliveInterval: &peer.PersistentKeepalive,
|
PersistentKeepaliveInterval: &peer.PersistentKeepalive,
|
||||||
ReplaceAllowedIPs: true,
|
ReplaceAllowedIPs: true,
|
||||||
}
|
}
|
||||||
if node.IsIngressGateway || node.IsEgressGateway {
|
if node.IsIngressGateway || node.IsEgressGateway {
|
||||||
if peer.IsIngressGateway {
|
if peer.IsIngressGateway {
|
||||||
_, extPeerIDAndAddrs, err := getExtPeers(&peer)
|
_, extPeerIDAndAddrs, err := getExtPeers(&peer)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
|
for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
|
||||||
extPeerIdAndAddr := extPeerIdAndAddr
|
extPeerIdAndAddr := extPeerIdAndAddr
|
||||||
nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
|
nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
|
||||||
PeerAddr: net.IPNet{
|
PeerAddr: net.IPNet{
|
||||||
IP: net.ParseIP(extPeerIdAndAddr.Address),
|
IP: net.ParseIP(extPeerIdAndAddr.Address),
|
||||||
Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
|
Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
|
||||||
},
|
},
|
||||||
PeerKey: extPeerIdAndAddr.ID,
|
PeerKey: extPeerIdAndAddr.ID,
|
||||||
Allow: true,
|
Allow: true,
|
||||||
ID: extPeerIdAndAddr.ID,
|
ID: extPeerIdAndAddr.ID,
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if node.IsIngressGateway && peer.IsEgressGateway {
|
|
||||||
hostPeerUpdate.IngressInfo.EgressRanges = append(hostPeerUpdate.IngressInfo.EgressRanges,
|
|
||||||
peer.EgressGatewayRanges...)
|
|
||||||
}
|
|
||||||
nodePeerMap[peerHost.PublicKey.String()] = models.PeerRouteInfo{
|
|
||||||
PeerAddr: net.IPNet{
|
|
||||||
IP: net.ParseIP(peer.PrimaryAddress()),
|
|
||||||
Mask: getCIDRMaskFromAddr(peer.PrimaryAddress()),
|
|
||||||
},
|
|
||||||
PeerKey: peerHost.PublicKey.String(),
|
|
||||||
Allow: true,
|
|
||||||
ID: peer.ID.String(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (node.IsRelayed && node.RelayedBy != peer.ID.String()) || (peer.IsRelayed && peer.RelayedBy != node.ID.String()) {
|
if node.IsIngressGateway && peer.IsEgressGateway {
|
||||||
// if node is relayed and peer is not the relay, set remove to true
|
hostPeerUpdate.IngressInfo.EgressRanges = append(hostPeerUpdate.IngressInfo.EgressRanges,
|
||||||
if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; ok {
|
peer.EgressGatewayRanges...)
|
||||||
continue
|
|
||||||
}
|
|
||||||
peerConfig.Remove = true
|
|
||||||
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
|
|
||||||
peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
nodePeerMap[peerHost.PublicKey.String()] = models.PeerRouteInfo{
|
||||||
uselocal := false
|
PeerAddr: net.IPNet{
|
||||||
if host.EndpointIP.String() == peerHost.EndpointIP.String() {
|
IP: net.ParseIP(peer.PrimaryAddress()),
|
||||||
// peer is on same network
|
Mask: getCIDRMaskFromAddr(peer.PrimaryAddress()),
|
||||||
// set to localaddress
|
},
|
||||||
uselocal = true
|
PeerKey: peerHost.PublicKey.String(),
|
||||||
if node.LocalAddress.IP == nil {
|
Allow: true,
|
||||||
// use public endpint
|
ID: peer.ID.String(),
|
||||||
uselocal = false
|
|
||||||
}
|
|
||||||
if node.LocalAddress.String() == peer.LocalAddress.String() {
|
|
||||||
uselocal = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
peerConfig.Endpoint = &net.UDPAddr{
|
|
||||||
IP: peerHost.EndpointIP,
|
|
||||||
Port: getPeerWgListenPort(peerHost),
|
|
||||||
}
|
|
||||||
|
|
||||||
if uselocal {
|
|
||||||
peerConfig.Endpoint.IP = peer.LocalAddress.IP
|
|
||||||
peerConfig.Endpoint.Port = peerHost.ListenPort
|
|
||||||
}
|
|
||||||
allowedips := GetAllowedIPs(&node, &peer, nil)
|
|
||||||
if peer.Action != models.NODE_DELETE &&
|
|
||||||
!peer.PendingDelete &&
|
|
||||||
peer.Connected &&
|
|
||||||
nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
|
|
||||||
(deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
|
|
||||||
peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
|
|
||||||
}
|
|
||||||
|
|
||||||
peerProxyPort := GetProxyListenPort(peerHost)
|
|
||||||
var nodePeer wgtypes.PeerConfig
|
|
||||||
if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok {
|
|
||||||
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr)
|
|
||||||
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
|
|
||||||
peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
|
|
||||||
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
|
|
||||||
ID: peer.ID.String(),
|
|
||||||
Address: peer.PrimaryAddress(),
|
|
||||||
Name: peerHost.Name,
|
|
||||||
Network: peer.Network,
|
|
||||||
ProxyListenPort: peerProxyPort,
|
|
||||||
}
|
|
||||||
hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
|
|
||||||
Interfaces: peerHost.Interfaces,
|
|
||||||
ProxyListenPort: peerProxyPort,
|
|
||||||
}
|
|
||||||
nodePeer = peerConfig
|
|
||||||
} else {
|
|
||||||
peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
|
|
||||||
peerAllowedIPs = append(peerAllowedIPs, peerConfig.AllowedIPs...)
|
|
||||||
hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
|
|
||||||
hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].Remove = false
|
|
||||||
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
|
|
||||||
ID: peer.ID.String(),
|
|
||||||
Address: peer.PrimaryAddress(),
|
|
||||||
Name: peerHost.Name,
|
|
||||||
Network: peer.Network,
|
|
||||||
ProxyListenPort: GetProxyListenPort(peerHost),
|
|
||||||
}
|
|
||||||
hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
|
|
||||||
Interfaces: peerHost.Interfaces,
|
|
||||||
ProxyListenPort: peerProxyPort,
|
|
||||||
}
|
|
||||||
nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
|
|
||||||
}
|
|
||||||
|
|
||||||
if node.Network == network { // add to peers map for metrics
|
|
||||||
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
|
|
||||||
ID: peer.ID.String(),
|
|
||||||
Address: peer.PrimaryAddress(),
|
|
||||||
Name: peerHost.Name,
|
|
||||||
Network: peer.Network,
|
|
||||||
ProxyListenPort: peerHost.ProxyListenPort,
|
|
||||||
}
|
|
||||||
hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (node.IsRelayed && node.RelayedBy != peer.ID.String()) || (peer.IsRelayed && peer.RelayedBy != node.ID.String()) {
|
||||||
|
// if node is relayed and peer is not the relay, set remove to true
|
||||||
|
if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
peerConfig.Remove = true
|
||||||
|
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
|
||||||
|
peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
uselocal := false
|
||||||
|
if host.EndpointIP.String() == peerHost.EndpointIP.String() {
|
||||||
|
// peer is on same network
|
||||||
|
// set to localaddress
|
||||||
|
uselocal = true
|
||||||
|
if node.LocalAddress.IP == nil {
|
||||||
|
// use public endpint
|
||||||
|
uselocal = false
|
||||||
|
}
|
||||||
|
if node.LocalAddress.String() == peer.LocalAddress.String() {
|
||||||
|
uselocal = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
peerConfig.Endpoint = &net.UDPAddr{
|
||||||
|
IP: peerHost.EndpointIP,
|
||||||
|
Port: getPeerWgListenPort(peerHost),
|
||||||
|
}
|
||||||
|
|
||||||
|
if uselocal {
|
||||||
|
peerConfig.Endpoint.IP = peer.LocalAddress.IP
|
||||||
|
peerConfig.Endpoint.Port = peerHost.ListenPort
|
||||||
|
}
|
||||||
|
allowedips := GetAllowedIPs(&node, &peer, nil)
|
||||||
|
if peer.Action != models.NODE_DELETE &&
|
||||||
|
!peer.PendingDelete &&
|
||||||
|
peer.Connected &&
|
||||||
|
nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
|
||||||
|
(deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
|
||||||
|
peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
|
||||||
|
}
|
||||||
|
|
||||||
|
peerProxyPort := GetProxyListenPort(peerHost)
|
||||||
|
var nodePeer wgtypes.PeerConfig
|
||||||
|
if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok {
|
||||||
|
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr)
|
||||||
|
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
|
||||||
|
peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
|
||||||
|
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
|
||||||
|
ID: peer.ID.String(),
|
||||||
|
Address: peer.PrimaryAddress(),
|
||||||
|
Name: peerHost.Name,
|
||||||
|
Network: peer.Network,
|
||||||
|
ProxyListenPort: peerProxyPort,
|
||||||
|
}
|
||||||
|
hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
|
||||||
|
Interfaces: peerHost.Interfaces,
|
||||||
|
ProxyListenPort: peerProxyPort,
|
||||||
|
}
|
||||||
|
nodePeer = peerConfig
|
||||||
|
} else {
|
||||||
|
peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
|
||||||
|
peerAllowedIPs = append(peerAllowedIPs, peerConfig.AllowedIPs...)
|
||||||
|
hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
|
||||||
|
hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].Remove = false
|
||||||
|
hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
|
||||||
|
ID: peer.ID.String(),
|
||||||
|
Address: peer.PrimaryAddress(),
|
||||||
|
Name: peerHost.Name,
|
||||||
|
Network: peer.Network,
|
||||||
|
ProxyListenPort: GetProxyListenPort(peerHost),
|
||||||
|
}
|
||||||
|
hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
|
||||||
|
Interfaces: peerHost.Interfaces,
|
||||||
|
ProxyListenPort: peerProxyPort,
|
||||||
|
}
|
||||||
|
nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
|
||||||
|
}
|
||||||
|
|
||||||
|
if node.Network == network { // add to peers map for metrics
|
||||||
|
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
|
||||||
|
ID: peer.ID.String(),
|
||||||
|
Address: peer.PrimaryAddress(),
|
||||||
|
Name: peerHost.Name,
|
||||||
|
Network: peer.Network,
|
||||||
|
ProxyListenPort: peerHost.ProxyListenPort,
|
||||||
|
}
|
||||||
|
hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
|
||||||
|
}
|
||||||
|
//}
|
||||||
}
|
}
|
||||||
var extPeers []wgtypes.PeerConfig
|
var extPeers []wgtypes.PeerConfig
|
||||||
var extPeerIDAndAddrs []models.IDandAddr
|
var extPeerIDAndAddrs []models.IDandAddr
|
||||||
|
|
|
@ -107,7 +107,11 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nil, nil); err != nil {
|
nodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nodes, nil, nil); err != nil {
|
||||||
slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
|
slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -235,7 +239,11 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
|
||||||
slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
|
slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
|
||||||
host, err := logic.GetHost(currentNode.HostID.String())
|
host, err := logic.GetHost(currentNode.HostID.String())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if err = PublishSingleHostPeerUpdate(context.Background(), host, nil, nil); err != nil {
|
nodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = PublishSingleHostPeerUpdate(context.Background(), host, nodes, nil, nil); err != nil {
|
||||||
slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
|
slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,10 +24,14 @@ func PublishPeerUpdate() error {
|
||||||
logger.Log(1, "err getting all hosts", err.Error())
|
logger.Log(1, "err getting all hosts", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
allNodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
logic.ResetPeerUpdateContext()
|
logic.ResetPeerUpdateContext()
|
||||||
for _, host := range hosts {
|
for _, host := range hosts {
|
||||||
host := host
|
host := host
|
||||||
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
|
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, allNodes, nil, nil); err != nil {
|
||||||
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
|
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -46,10 +50,14 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
|
||||||
logger.Log(1, "err getting all hosts", err.Error())
|
logger.Log(1, "err getting all hosts", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
allNodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
logic.ResetPeerUpdateContext()
|
logic.ResetPeerUpdateContext()
|
||||||
for _, host := range hosts {
|
for _, host := range hosts {
|
||||||
host := host
|
host := host
|
||||||
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, delNode, nil); err != nil {
|
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, allNodes, delNode, nil); err != nil {
|
||||||
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
|
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,10 +76,14 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
|
||||||
logger.Log(1, "err getting all hosts", err.Error())
|
logger.Log(1, "err getting all hosts", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
nodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
logic.ResetPeerUpdateContext()
|
logic.ResetPeerUpdateContext()
|
||||||
for _, host := range hosts {
|
for _, host := range hosts {
|
||||||
host := host
|
host := host
|
||||||
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, []models.ExtClient{*delClient}); err != nil {
|
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nodes, nil, []models.ExtClient{*delClient}); err != nil {
|
||||||
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
|
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,9 +91,9 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
|
// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
|
||||||
func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deletedNode *models.Node, deletedClients []models.ExtClient) error {
|
func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) error {
|
||||||
|
|
||||||
peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, deletedNode, deletedClients)
|
peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, allNodes, deletedNode, deletedClients)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -436,7 +448,10 @@ func sendPeers() {
|
||||||
if err != nil && len(hosts) > 0 {
|
if err != nil && len(hosts) > 0 {
|
||||||
logger.Log(1, "error retrieving networks for keepalive", err.Error())
|
logger.Log(1, "error retrieving networks for keepalive", err.Error())
|
||||||
}
|
}
|
||||||
|
nodes, err := logic.GetAllNodes()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
var force bool
|
var force bool
|
||||||
peer_force_send++
|
peer_force_send++
|
||||||
if peer_force_send == 5 {
|
if peer_force_send == 5 {
|
||||||
|
@ -455,7 +470,7 @@ func sendPeers() {
|
||||||
for _, host := range hosts {
|
for _, host := range hosts {
|
||||||
host := host
|
host := host
|
||||||
logger.Log(2, "sending scheduled peer update (5 min)")
|
logger.Log(2, "sending scheduled peer update (5 min)")
|
||||||
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
|
if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nodes, nil, nil); err != nil {
|
||||||
logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
|
logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue