Merge pull request #2064 from gravitl/GRA-1264-better-peer-mgmt

Gra 1264 better peer mgmt
This commit is contained in:
dcarns 2023-02-24 11:17:56 -05:00 committed by GitHub
commit 3244472481
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 204 deletions

View file

@ -389,7 +389,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
w.WriteHeader(http.StatusOK)
go func() {
err = mq.PublishExtPeerUpdate(&node)
err = mq.PublishPeerUpdate()
if err != nil {
logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
}
@ -488,7 +488,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
logger.Log(0, r.Header.Get("user"), "updated ext client", newExtClient.ClientID)
if changedEnabled { // need to send a peer update to the ingress node as enablement of one of it's clients has changed
if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
if err = mq.PublishExtPeerUpdate(&ingressNode); err != nil {
if err = mq.PublishPeerUpdate(); err != nil {
logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
}
}
@ -567,11 +567,10 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
}
go func() {
err = mq.PublishExtPeerUpdate(&ingressnode)
if err != nil {
if err := mq.PublishPeerUpdate(); err != nil {
logger.Log(1, "error setting ext peers on "+ingressnode.ID.String()+": "+err.Error())
}
if err := mq.PublishDeleteExtClientDNS(&extclient); err != nil {
if err = mq.PublishDeleteExtClientDNS(&extclient); err != nil {
logger.Log(1, "error publishing dns update for extclient deletion", err.Error())
}
}()

View file

@ -433,7 +433,7 @@ func getNode(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}
hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host)
hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host, nil)
if err != nil && !database.IsEmptyRecord(err) {
logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
@ -616,7 +616,7 @@ func createNode(w http.ResponseWriter, r *http.Request) {
return
}
}
hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host)
hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host, nil)
if err != nil && !database.IsEmptyRecord(err) {
logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", data.Host.ID.String(), err))
@ -985,10 +985,17 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
if !fromNode { // notify node change
runUpdates(&node, false)
}
go func() { // notify of peer change
if err := mq.PublishPeerUpdate(); err != nil {
go func(deletedNode *models.Node, fromNode bool) { // notify of peer change
var err error
if fromNode {
err = mq.PublishDeletedNodePeerUpdate(deletedNode)
} else {
err = mq.PublishPeerUpdate()
}
if err != nil {
logger.Log(1, "error publishing peer update ", err.Error())
}
host, err := logic.GetHost(node.HostID.String())
if err != nil {
logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
@ -996,7 +1003,7 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
if err := mq.PublishDNSDelete(&node, host); err != nil {
logger.Log(1, "error publishing dns update", err.Error())
}
}()
}(&node, fromNode)
}
func runUpdates(node *models.Node, ifaceDelta bool) {

View file

@ -6,7 +6,6 @@ import (
"log"
"net"
"net/netip"
"time"
"github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logger"
@ -17,177 +16,6 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
// GetPeersForProxy calculates the peers for a proxy.
//
// It assembles the proxy manager payload for node from the other nodes in
// node.Network: one WireGuard peer config plus one PeerConf entry per peer
// whose host can be loaded and whose endpoint resolves. When onlyPeers is
// false it additionally fills in the relay information (the relay this node
// is relayed through, the nodes this node relays for, and the relay of each
// relayed peer). External clients are appended when node is an ingress gateway.
//
// An error is returned only when the network nodes cannot be listed or when
// the host of this node's relay cannot be loaded; every other failure is
// logged and the affected peer or relay entry is skipped.
//
// TODO: revisit this logic with new host/node models.
func GetPeersForProxy(node *models.Node, onlyPeers bool) (models.ProxyManagerPayload, error) {
	proxyPayload := models.ProxyManagerPayload{}
	var peers []wgtypes.PeerConfig
	peerConfMap := make(map[string]models.PeerConf)

	currentPeers, err := GetNetworkNodes(node.Network)
	if err != nil {
		return proxyPayload, err
	}

	if !onlyPeers {
		if node.IsRelayed {
			// The relay must be nil-checked before any of its fields are read;
			// dereferencing it first panics when no relay is found.
			if relayNode := FindRelay(node); relayNode != nil {
				relayHost, err := GetHost(relayNode.HostID.String())
				if err != nil {
					return proxyPayload, err
				}
				relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayHost.EndpointIP, relayHost.ListenPort))
				if err != nil {
					logger.Log(1, "failed to resolve relay node endpoint: ", err.Error())
				}
				proxyPayload.IsRelayed = true
				proxyPayload.RelayedTo = relayEndpoint
			} else {
				logger.Log(0, "couldn't find relay node for: ", node.ID.String())
			}
		}
		if node.IsRelay {
			host, err := GetHost(node.HostID.String())
			if err != nil {
				// Without the relay's own host no relayed endpoint can be built,
				// so the relay section is left unset instead of using the host.
				logger.Log(0, "error retrieving host for relay node", node.ID.String(), err.Error())
			} else if relayedNodes, err := GetRelayedNodes(node); err != nil {
				logger.Log(1, "failed to relayed nodes: ", node.ID.String(), err.Error())
				proxyPayload.IsRelay = false
			} else {
				relayPeersMap := make(map[string]models.RelayedConf)
				for _, relayedNode := range relayedNodes {
					relayedNode := relayedNode
					payload, err := GetPeersForProxy(&relayedNode, true)
					if err != nil {
						continue
					}
					relayedHost, err := GetHost(relayedNode.HostID.String())
					if err != nil {
						logger.Log(0, "error retrieving host for relayNode", relayedNode.ID.String(), err.Error())
						continue
					}
					// NOTE(review): the port is the relay's own ListenPort paired with the
					// relayed host's IP; confirm whether the relayed host's port is intended.
					relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, host.ListenPort))
					if udpErr != nil {
						continue
					}
					// Key by the relayed host's public key so every relayed node keeps
					// its own entry; a loop-invariant key would keep only the last one.
					relayPeersMap[relayedHost.PublicKey.String()] = models.RelayedConf{
						RelayedPeerEndpoint: relayedEndpoint,
						RelayedPeerPubKey:   relayedHost.PublicKey.String(),
						Peers:               payload.Peers,
					}
				}
				proxyPayload.IsRelay = true
				proxyPayload.RelayedPeerConf = relayPeersMap
			}
		}
	}

	for _, peer := range currentPeers {
		peer := peer
		if peer.ID == node.ID {
			// skip yourself
			continue
		}
		host, err := GetHost(peer.HostID.String())
		if err != nil {
			continue
		}
		// Proxy-enabled hosts are reached on their proxy port, falling back to
		// the default proxy port when none is configured.
		proxyStatus := host.ProxyEnabled
		listenPort := host.ListenPort
		if proxyStatus {
			listenPort = host.ProxyListenPort
			if listenPort == 0 {
				listenPort = models.NmProxyPort
			}
		}

		endpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host.EndpointIP, listenPort))
		if err != nil {
			logger.Log(1, "failed to resolve udp addr for node: ", peer.ID.String(), host.EndpointIP.String(), err.Error())
			continue
		}
		allowedips := GetAllowedIPs(node, &peer, nil)
		// Declared per iteration so each peer config points at its own value.
		var keepalive time.Duration
		if node.PersistentKeepalive != 0 {
			keepalive = node.PersistentKeepalive
		}
		peers = append(peers, wgtypes.PeerConfig{
			PublicKey:                   host.PublicKey,
			Endpoint:                    endpoint,
			AllowedIPs:                  allowedips,
			PersistentKeepaliveInterval: &keepalive,
			ReplaceAllowedIPs:           true,
		})

		peerKey := host.PublicKey.String()
		peerConf := models.PeerConf{
			Address:          net.ParseIP(peer.PrimaryAddress()),
			Proxy:            proxyStatus,
			PublicListenPort: int32(listenPort),
		}
		peerConfMap[peerKey] = peerConf

		if onlyPeers || !peer.IsRelayed {
			continue
		}
		relayNode := FindRelay(&peer)
		if relayNode == nil {
			continue
		}
		relayHost, err := GetHost(relayNode.HostID.String())
		if err != nil {
			logger.Log(0, "error retrieving host for relayNode", relayNode.ID.String(), err.Error())
			continue
		}
		// Only mark the peer as relayed when the relay endpoint resolves;
		// otherwise the plain entry stored above is kept.
		if relayTo, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayHost.EndpointIP, relayHost.ListenPort)); err == nil {
			peerConf.IsRelayed = true
			peerConf.RelayedTo = relayTo
			peerConfMap[peerKey] = peerConf
		}
	}

	if node.IsIngressGateway {
		var extPeers []wgtypes.PeerConfig
		extPeers, peerConfMap, err = getExtPeersForProxy(node, peerConfMap)
		if err == nil {
			peers = append(peers, extPeers...)
		} else if !database.IsEmptyRecord(err) {
			logger.Log(1, "error retrieving external clients:", err.Error())
		}
	}

	proxyPayload.IsIngress = node.IsIngressGateway
	proxyPayload.Peers = peers
	proxyPayload.PeerMap = peerConfMap
	// TODO: interface name is hardcoded; decide whether it should be read from the host.
	proxyPayload.InterfaceName = models.WIREGUARD_INTERFACE

	return proxyPayload, nil
}
// GetProxyUpdateForHost - gets the proxy update for host
func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error) {
proxyPayload := models.ProxyManagerPayload{
@ -213,7 +41,7 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error
relayPeersMap := make(map[string]models.RelayedConf)
for _, relayedHost := range relayedHosts {
relayedHost := relayedHost
payload, err := GetPeerUpdateForHost("", &relayedHost)
payload, err := GetPeerUpdateForHost("", &relayedHost, nil)
if err == nil {
relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, GetPeerListenPort(&relayedHost)))
if udpErr == nil {
@ -290,10 +118,12 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error
}
// GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpdate, error) {
func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models.Node) (models.HostPeerUpdate, error) {
if host == nil {
return models.HostPeerUpdate{}, errors.New("host is nil")
}
// track which nodes are deleted
// after peer calculation, if peer not in list, add delete config of peer
hostPeerUpdate := models.HostPeerUpdate{
Host: *host,
Server: servercfg.GetServer(),
@ -308,6 +138,10 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
Peers: []wgtypes.PeerConfig{},
NodePeers: []wgtypes.PeerConfig{},
}
var deletedNodes = []models.Node{} // used to track deleted nodes
if deletedNode != nil {
deletedNodes = append(deletedNodes, *deletedNode)
}
logger.Log(1, "peer update for host ", host.ID.String())
peerIndexMap := make(map[string]int)
for _, nodeID := range host.Nodes {
@ -315,7 +149,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
if err != nil {
continue
}
if !node.Connected || node.Action == models.NODE_DELETE || node.PendingDelete {
if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE {
continue
}
currentPeers, err := GetNetworkNodes(node.Network)
@ -328,10 +162,14 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
nodePeerMap = make(map[string]models.PeerRouteInfo)
}
for _, peer := range currentPeers {
peer := peer
if peer.ID == node.ID {
logger.Log(2, "peer update, skipping self")
//skip yourself
continue
}
if peer.Action == models.NODE_DELETE || peer.PendingDelete {
deletedNodes = append(deletedNodes, peer) // track deleted node for peer update
continue
}
var peerConfig wgtypes.PeerConfig
@ -341,7 +179,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
return models.HostPeerUpdate{}, err
}
if !peer.Connected || peer.Action == models.NODE_DELETE || peer.PendingDelete {
if !peer.Connected {
logger.Log(2, "peer update, skipping unconnected node", peer.ID.String())
//skip unconnected nodes
continue
@ -393,6 +231,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
_, extPeerIDAndAddrs, err := getExtPeers(&peer)
if err == nil {
for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
extPeerIdAndAddr := extPeerIdAndAddr
nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
PeerAddr: net.IPNet{
IP: net.ParseIP(extPeerIdAndAddr.Address),
@ -459,6 +298,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
extPeers, extPeerIDAndAddrs, err = getExtPeers(&node)
if err == nil {
for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
extPeerIdAndAddr := extPeerIdAndAddr
nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
PeerAddr: net.IPNet{
IP: net.ParseIP(extPeerIdAndAddr.Address),
@ -470,6 +310,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
}
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, extPeers...)
for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
extPeerIdAndAddr := extPeerIdAndAddr
hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID] = make(map[string]models.IDandAddr)
hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID][extPeerIdAndAddr.ID] = models.IDandAddr{
ID: extPeerIdAndAddr.ID,
@ -514,9 +355,32 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
}
}
// run through delete nodes
if len(deletedNodes) > 0 {
for i := range deletedNodes {
delNode := deletedNodes[i]
delHost, err := GetHost(delNode.HostID.String())
if err != nil {
continue
}
if _, ok := hostPeerUpdate.HostPeerIDs[delHost.PublicKey.String()]; !ok {
var peerConfig = wgtypes.PeerConfig{}
peerConfig.PublicKey = delHost.PublicKey
peerConfig.Endpoint = &net.UDPAddr{
IP: delHost.EndpointIP,
Port: GetPeerListenPort(delHost),
}
peerConfig.Remove = true
peerConfig.AllowedIPs = []net.IPNet{delNode.Address, delNode.Address6}
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
}
}
}
return hostPeerUpdate, nil
}
// GetPeerListenPort - given a host, retrieve its appropriate listening port
func GetPeerListenPort(host *models.Host) int {
peerPort := host.ListenPort
if host.ProxyEnabled {

View file

@ -10,8 +10,8 @@ import (
)
const (
// ZOMBIE_TIMEOUT - timeout in seconds for checking zombie status
ZOMBIE_TIMEOUT = 60
// ZOMBIE_TIMEOUT - timeout in hours for checking zombie status
ZOMBIE_TIMEOUT = 6
// ZOMBIE_DELETE_TIME - timeout in minutes for zombie node deletion
ZOMBIE_DELETE_TIME = 10
)
@ -86,7 +86,7 @@ func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) {
zombies = append(zombies, id)
case id := <-newHostZombie:
hostZombies = append(hostZombies, id)
case <-time.After(time.Second * ZOMBIE_TIMEOUT):
case <-time.After(time.Hour * ZOMBIE_TIMEOUT): // run this check 4 times a day
logger.Log(3, "checking for zombie nodes")
if len(zombies) > 0 {
for i := len(zombies) - 1; i >= 0; i-- {

View file

@ -227,11 +227,10 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
host, err := logic.GetHost(currentNode.HostID.String())
if err == nil {
if err = PublishSingleHostUpdate(host); err != nil {
if err = PublishSingleHostPeerUpdate(host, nil); err != nil {
logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
}
}
}
logger.Log(1, "updated node metrics", id)

View file

@ -25,7 +25,7 @@ func PublishPeerUpdate() error {
}
for _, host := range hosts {
host := host
err = PublishSingleHostUpdate(&host)
err = PublishSingleHostPeerUpdate(&host, nil)
if err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
@ -33,10 +33,31 @@ func PublishPeerUpdate() error {
return err
}
// PublishSingleHostUpdate --- determines and publishes a peer update to one host
func PublishSingleHostUpdate(host *models.Host) error {
// PublishDeletedNodePeerUpdate --- determines and publishes a peer update
// to all the hosts with a deleted node to account for
func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
if !servercfg.IsMessageQueueBackend() {
return nil
}
peerUpdate, err := logic.GetPeerUpdateForHost("", host)
hosts, err := logic.GetAllHosts()
if err != nil {
logger.Log(1, "err getting all hosts", err.Error())
return err
}
for _, host := range hosts {
host := host
if err = PublishSingleHostPeerUpdate(&host, delNode); err != nil {
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}
return err
}
// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
func PublishSingleHostPeerUpdate(host *models.Host, deletedNode *models.Node) error {
peerUpdate, err := logic.GetPeerUpdateForHost("", host, deletedNode)
if err != nil {
return err
}
@ -56,13 +77,6 @@ func PublishSingleHostUpdate(host *models.Host) error {
return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
}
// PublishExtPeerUpdate --- kicks off a peer update for all hosts in the background.
//
// The node argument is not consulted; the update is delegated to
// PublishPeerUpdate, which runs in its own goroutine. The call therefore
// returns immediately and always reports nil.
func PublishExtPeerUpdate(node *models.Node) error {
	go func() {
		// Result intentionally discarded: PublishPeerUpdate logs per-host
		// publish failures itself and nobody waits on this goroutine.
		_ = PublishPeerUpdate()
	}()
	return nil
}
// NodeUpdate -- publishes a node update
func NodeUpdate(node *models.Node) error {
host, err := logic.GetHost(node.HostID.String())
@ -410,7 +424,7 @@ func sendPeers() {
if force {
host := host
logger.Log(2, "sending scheduled peer update (5 min)")
err = PublishSingleHostUpdate(&host)
err = PublishSingleHostPeerUpdate(&host, nil)
if err != nil {
logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
}