refactor proxy updates

This commit is contained in:
Abhishek Kondur 2022-11-07 01:25:04 +05:30
parent acae6c3aed
commit a7c0abe2fc
14 changed files with 192 additions and 300 deletions

View file

@ -3,7 +3,6 @@ package controller
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"net/http" "net/http"
"strings" "strings"
@ -15,10 +14,8 @@ import (
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/models/promodels" "github.com/gravitl/netmaker/models/promodels"
"github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/nm-proxy/manager"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
"golang.org/x/crypto/bcrypt" "golang.org/x/crypto/bcrypt"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
func nodeHandlers(r *mux.Router) { func nodeHandlers(r *mux.Router) {
@ -1015,25 +1012,6 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
if servercfg.IsDNSMode() { if servercfg.IsDNSMode() {
logic.SetDNS() logic.SetDNS()
} }
wgPubKey, wgErr := wgtypes.ParseKey(newNode.PublicKey)
nodeEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", newNode.Endpoint, newNode.LocalListenPort))
if wgErr == nil && udpErr == nil {
logic.ProxyMgmChan <- &manager.ManagerAction{
Action: manager.UpdatePeer,
Payload: manager.ManagerPayload{
InterfaceName: newNode.Interface,
Peers: []wgtypes.PeerConfig{
{
PublicKey: wgPubKey,
Endpoint: nodeEndpoint,
},
},
},
}
} else {
logger.Log(1, fmt.Sprintf("failed to send node update to proxy, wgErr: %v, udpErr: %v", wgErr, udpErr))
}
logger.Log(1, r.Header.Get("user"), "updated node", node.ID, "on network", node.Network) logger.Log(1, r.Header.Get("user"), "updated node", node.ID, "on network", node.Network)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(newNode) json.NewEncoder(w).Encode(newNode)
@ -1121,20 +1099,6 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
logger.Log(0, "failed to reset failover lists during node delete for node", node.Name, node.Network) logger.Log(0, "failed to reset failover lists during node delete for node", node.Name, node.Network)
} }
} }
wgKey, _ := wgtypes.ParseKey(node.PublicKey)
endpoint, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", node.Endpoint, node.LocalListenPort))
logic.ProxyMgmChan <- &manager.ManagerAction{
Action: manager.DeletePeer,
Payload: manager.ManagerPayload{
InterfaceName: node.Interface,
Peers: []wgtypes.PeerConfig{
{
PublicKey: wgKey,
Endpoint: endpoint,
},
},
},
}
logic.ReturnSuccessResponse(w, r, nodeid+" deleted.") logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
runUpdates(&node, false) runUpdates(&node, false)
@ -1151,6 +1115,7 @@ func runUpdates(node *models.Node, ifaceDelta bool) {
if err := runServerUpdate(node, ifaceDelta); err != nil { if err := runServerUpdate(node, ifaceDelta); err != nil {
logger.Log(1, "error running server update", err.Error()) logger.Log(1, "error running server update", err.Error())
} }
}() }()
} }

View file

@ -10,8 +10,6 @@ import (
"github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/nm-proxy/manager"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
// swagger:route POST /api/nodes/{network}/{nodeid}/createrelay nodes createRelay // swagger:route POST /api/nodes/{network}/{nodeid}/createrelay nodes createRelay
@ -45,49 +43,15 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
return return
} }
relayPeersMap := make(map[string][]wgtypes.PeerConfig)
logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID) logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID)
for _, relayedNode := range updatenodes { for _, relayedNode := range updatenodes {
peers, err := logic.GetPeersForProxy(&relayedNode)
if err == nil {
relayPeersMap[relayedNode.PublicKey] = peers
}
// relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", node.Endpoint, node.LocalListenPort))
// if err != nil {
// logger.Log(1, "failed to resolve relay node endpoint: ", err.Error())
// }
// err = mq.ProxyUpdate(&manager.ManagerAction{
// Action: manager.AddInterface,
// Payload: manager.ManagerPayload{
// InterfaceName: relayedNode.Interface,
// IsRelayed: true,
// Peers: peers,
// RelayedTo: relayEndpoint,
// },
// }, &node)
// if err != nil {
// logger.Log(1, "failed to send proxy update for relayed node: ", err.Error())
// }
err = mq.NodeUpdate(&relayedNode) err = mq.NodeUpdate(&relayedNode)
if err != nil { if err != nil {
logger.Log(1, "error sending update to relayed node ", relayedNode.Name, "on network", relay.NetID, ": ", err.Error()) logger.Log(1, "error sending update to relayed node ", relayedNode.Name, "on network", relay.NetID, ": ", err.Error())
} }
} }
// send proxy update for node that is relaying traffic
logger.Log(0, "--------> sending relay update to proxy")
err = mq.ProxyUpdate(&manager.ManagerAction{
Action: manager.RelayPeers,
Payload: manager.ManagerPayload{
IsRelay: true,
RelayedPeers: relayPeersMap,
},
}, &node)
if err != nil {
logger.Log(1, "failed to send proxy update: ", err.Error())
}
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node) json.NewEncoder(w).Encode(node)
runUpdates(&node, true) runUpdates(&node, true)

View file

@ -3,7 +3,6 @@ package logic
import ( import (
"errors" "errors"
"fmt" "fmt"
"log"
"net" "net"
"strconv" "strconv"
"strings" "strings"
@ -14,24 +13,56 @@ import (
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic/acls/nodeacls" "github.com/gravitl/netmaker/logic/acls/nodeacls"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/nm-proxy/manager"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
func GetPeersForProxy(node *models.Node) ([]wgtypes.PeerConfig, error) { func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ManagerPayload, error) {
proxyPayload := manager.ManagerPayload{}
var peers []wgtypes.PeerConfig var peers []wgtypes.PeerConfig
peerConfMap := make(map[string]manager.PeerConf)
var err error var err error
currentPeers, err := GetNetworkNodes(node.Network) currentPeers, err := GetNetworkNodes(node.Network)
if err != nil { if err != nil {
return peers, err return proxyPayload, err
} }
if !onlyPeers {
if node.IsRelayed == "yes" {
relayNode := FindRelay(node)
relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayNode.Endpoint, relayNode.LocalListenPort))
if err != nil {
logger.Log(1, "failed to resolve relay node endpoint: ", err.Error())
}
proxyPayload.IsRelayed = true
proxyPayload.RelayedTo = relayEndpoint
}
if node.IsRelay == "yes" {
relayedNodes, err := GetRelayedNodes(node)
if err != nil {
logger.Log(1, "failed to relayed nodes: ", node.Name, err.Error())
proxyPayload.IsRelay = false
} else {
relayPeersMap := make(map[string][]wgtypes.PeerConfig)
for _, relayedNode := range relayedNodes {
payload, err := GetPeersForProxy(&relayedNode, true)
if err == nil {
relayPeersMap[relayedNode.PublicKey] = payload.Peers
}
}
proxyPayload.IsRelay = true
proxyPayload.RelayedPeers = relayPeersMap
}
}
}
for _, peer := range currentPeers { for _, peer := range currentPeers {
if peer.ID == node.ID { if peer.ID == node.ID {
//skip yourself //skip yourself
continue continue
} }
log.Printf("----------> PEER: %s, Endpoint: %s, LocalPort: %d", peer.ID, peer.Endpoint, peer.LocalListenPort)
pubkey, err := wgtypes.ParseKey(peer.PublicKey) pubkey, err := wgtypes.ParseKey(peer.PublicKey)
if err != nil { if err != nil {
logger.Log(1, "failed to parse node pub key: ", peer.ID) logger.Log(1, "failed to parse node pub key: ", peer.ID)
@ -48,7 +79,6 @@ func GetPeersForProxy(node *models.Node) ([]wgtypes.PeerConfig, error) {
// set_keepalive // set_keepalive
keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s") keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s")
} }
log.Printf("---------->##### PEER: %s, Endpoint: %s, LocalPort: %d", peer.ID, endpoint, peer.LocalListenPort)
peers = append(peers, wgtypes.PeerConfig{ peers = append(peers, wgtypes.PeerConfig{
PublicKey: pubkey, PublicKey: pubkey,
Endpoint: endpoint, Endpoint: endpoint,
@ -56,8 +86,25 @@ func GetPeersForProxy(node *models.Node) ([]wgtypes.PeerConfig, error) {
PersistentKeepaliveInterval: &keepalive, PersistentKeepaliveInterval: &keepalive,
ReplaceAllowedIPs: true, ReplaceAllowedIPs: true,
}) })
if !onlyPeers && peer.IsRelayed == "yes" {
relayNode := FindRelay(&peer)
if relayNode != nil {
relayTo, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", peer.Endpoint, peer.LocalListenPort))
if err == nil {
peerConfMap[peer.PublicKey] = manager.PeerConf{
IsRelayed: true,
RelayedTo: relayTo,
} }
return peers, nil }
}
}
}
proxyPayload.Peers = peers
proxyPayload.PeerMap = peerConfMap
proxyPayload.InterfaceName = node.Interface
return proxyPayload, nil
} }
// GetPeerUpdate - gets a wireguard peer config for each peer of a node // GetPeerUpdate - gets a wireguard peer config for each peer of a node
@ -237,16 +284,6 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
peerUpdate.ServerAddrs = serverNodeAddresses peerUpdate.ServerAddrs = serverNodeAddresses
peerUpdate.DNS = getPeerDNS(node.Network) peerUpdate.DNS = getPeerDNS(node.Network)
peerUpdate.PeerIDs = peerMap peerUpdate.PeerIDs = peerMap
if node.IsRelayed == "yes" {
relayNode := FindRelay(node)
relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayNode.Endpoint, relayNode.LocalListenPort))
if err != nil {
logger.Log(1, "failed to resolve relay node endpoint: ", err.Error())
}
peerUpdate.IsRelayed = true
peerUpdate.RelayTo = relayEndpoint
}
return peerUpdate, nil return peerUpdate, nil
} }

View file

@ -175,38 +175,12 @@ func ServerJoin(networkSettings *models.Network) (models.Node, error) {
if err != nil { if err != nil {
return returnNode, err return returnNode, err
} }
logger.Log(0, "--------> Hereeeeeee23333") proxyPayload, err := GetPeersForProxy(node, false)
proxyPayload := manager.ManagerPayload{ if err != nil && !ncutils.IsEmptyRecord(err) {
IsRelay: node.IsRelay == "yes", logger.Log(1, "failed to retrieve peers")
InterfaceName: node.Interface, return returnNode, err
Peers: peers.Peers,
} }
// if proxyPayload.IsRelayed {
// relayNode := FindRelay(node)
// relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayNode.Endpoint, relayNode.LocalListenPort))
// if err != nil {
// logger.Log(1, "failed to resolve relay node endpoint: ", err.Error())
// proxyPayload.IsRelayed = false
// }
// proxyPayload.RelayedTo = relayEndpoint
// }
if proxyPayload.IsRelay {
relayedNodes, err := GetRelayedNodes(node)
if err != nil {
logger.Log(1, "failed to relayed nodes: ", node.Name, err.Error())
proxyPayload.IsRelay = false
} else {
relayPeersMap := make(map[string][]wgtypes.PeerConfig)
for _, relayedNode := range relayedNodes {
peers, err := GetPeersForProxy(&relayedNode)
if err == nil {
relayPeersMap[relayedNode.PublicKey] = peers
}
}
proxyPayload.RelayedPeers = relayPeersMap
}
}
ProxyMgmChan <- &manager.ManagerAction{ ProxyMgmChan <- &manager.ManagerAction{
Action: manager.AddInterface, Action: manager.AddInterface,
Payload: proxyPayload, Payload: proxyPayload,

View file

@ -161,31 +161,11 @@ func setWGConfig(node *models.Node, peerupdate bool) error {
} }
logger.Log(0, "--------> ADD/Update INTERFACE TO PROXY.....") logger.Log(0, "--------> ADD/Update INTERFACE TO PROXY.....")
peersP, err := GetPeersForProxy(node) proxyPayload, err := GetPeersForProxy(node, false)
if err != nil { if err != nil {
logger.Log(0, "failed to get peers for proxy: ", err.Error()) logger.Log(0, "failed to get peers for proxy: ", err.Error())
} else { } else {
proxyPayload := manager.ManagerPayload{
IsRelay: node.IsRelay == "yes",
InterfaceName: node.Interface,
Peers: peersP,
}
if proxyPayload.IsRelay {
relayedNodes, err := GetRelayedNodes(node)
if err != nil {
logger.Log(1, "failed to relayed nodes: ", node.Name, err.Error())
proxyPayload.IsRelay = false
} else {
relayPeersMap := make(map[string][]wgtypes.PeerConfig)
for _, relayedNode := range relayedNodes {
peers, err := GetPeersForProxy(&relayedNode)
if err == nil {
relayPeersMap[relayedNode.PublicKey] = peers
}
}
proxyPayload.RelayedPeers = relayPeersMap
}
}
ProxyMgmChan <- &manager.ManagerAction{ ProxyMgmChan <- &manager.ManagerAction{
Action: manager.AddInterface, Action: manager.AddInterface,
Payload: proxyPayload, Payload: proxyPayload,

View file

@ -1,8 +1,7 @@
package models package models
import ( import (
"net" "github.com/gravitl/netmaker/nm-proxy/manager"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
@ -14,8 +13,7 @@ type PeerUpdate struct {
Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"` Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"`
DNS string `json:"dns" bson:"dns" yaml:"dns"` DNS string `json:"dns" bson:"dns" yaml:"dns"`
PeerIDs PeerMap `json:"peerids" bson:"peerids" yaml:"peerids"` PeerIDs PeerMap `json:"peerids" bson:"peerids" yaml:"peerids"`
IsRelayed bool `json:"is_relayed" bson:"is_relayed" yaml:"is_relayed"` ProxyUpdate manager.ManagerAction `josn:"proxy_update"`
RelayTo *net.UDPAddr `json:"relay_to" bson:"relay_to" yaml:"relay_to"`
} }
// KeyUpdate - key update struct // KeyUpdate - key update struct

View file

@ -3,7 +3,6 @@ package mq
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"time" "time"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
@ -12,9 +11,7 @@ import (
"github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/nm-proxy/manager"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
// DefaultHandler default message queue handler -- NOT USED // DefaultHandler default message queue handler -- NOT USED
@ -104,22 +101,6 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
if err = PublishPeerUpdate(&currentNode, true); err != nil { if err = PublishPeerUpdate(&currentNode, true); err != nil {
logger.Log(0, "error updating peers when node", currentNode.Name, currentNode.ID, "informed the server of an interface change", err.Error()) logger.Log(0, "error updating peers when node", currentNode.Name, currentNode.ID, "informed the server of an interface change", err.Error())
} }
pubKey, wgErr := wgtypes.ParseKey(newNode.PublicKey)
endpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", newNode.Endpoint, newNode.LocalListenPort))
if wgErr == nil && udpErr == nil {
logic.ProxyMgmChan <- &manager.ManagerAction{
Action: manager.UpdatePeer,
Payload: manager.ManagerPayload{
InterfaceName: newNode.Interface,
Peers: []wgtypes.PeerConfig{
{
PublicKey: pubKey,
Endpoint: endpoint,
},
},
},
}
}
} }
logger.Log(1, "updated node", id, newNode.Name) logger.Log(1, "updated node", id, newNode.Name)

View file

@ -26,7 +26,10 @@ func PublishPeerUpdate(newNode *models.Node, publishToSelf bool) error {
return err return err
} }
for _, node := range networkNodes { for _, node := range networkNodes {
// err := PublishProxyUpdate(manager.AddInterface, &node)
// if err != nil {
// logger.Log(1, "failed to publish proxy update to node", node.Name, "on network", node.Network, ":", err.Error())
// }
if node.IsServer == "yes" { if node.IsServer == "yes" {
continue continue
} }
@ -38,16 +41,40 @@ func PublishPeerUpdate(newNode *models.Node, publishToSelf bool) error {
if err != nil { if err != nil {
logger.Log(1, "failed to publish peer update to node", node.Name, "on network", node.Network, ":", err.Error()) logger.Log(1, "failed to publish peer update to node", node.Name, "on network", node.Network, ":", err.Error())
} }
} }
return err return err
} }
func PublishProxyUpdate(action manager.ProxyAction, node *models.Node) error {
peerUpdates, err := logic.GetPeersForProxy(node, false)
if err != nil {
return err
}
err = ProxyUpdate(&manager.ManagerAction{
Action: action,
Payload: peerUpdates}, node)
if err != nil {
logger.Log(1, "failed to send proxy update: ", err.Error())
return err
}
return nil
}
// PublishSinglePeerUpdate --- determines and publishes a peer update to one node // PublishSinglePeerUpdate --- determines and publishes a peer update to one node
func PublishSinglePeerUpdate(node *models.Node) error { func PublishSinglePeerUpdate(node *models.Node) error {
peerUpdate, err := logic.GetPeerUpdate(node) peerUpdate, err := logic.GetPeerUpdate(node)
if err != nil { if err != nil {
return err return err
} }
proxyUpdate, err := logic.GetPeersForProxy(node, false)
if err != nil {
return err
}
peerUpdate.ProxyUpdate = manager.ManagerAction{
Action: manager.AddInterface,
Payload: proxyUpdate,
}
data, err := json.Marshal(&peerUpdate) data, err := json.Marshal(&peerUpdate)
if err != nil { if err != nil {
return err return err
@ -104,10 +131,14 @@ func NodeUpdate(node *models.Node) error {
logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
return err return err
} }
err = PublishProxyUpdate(manager.AddInterface, node)
if err != nil {
logger.Log(1, "failed to publish proxy update to node", node.Name, "on network", node.Network, ":", err.Error())
}
return nil return nil
} }
//ProxyUpdate -- publishes updates related to proxy //ProxyUpdate -- publishes updates to peers related to proxy
func ProxyUpdate(proxyPayload *manager.ManagerAction, node *models.Node) error { func ProxyUpdate(proxyPayload *manager.ManagerAction, node *models.Node) error {
if !servercfg.IsMessageQueueBackend() { if !servercfg.IsMessageQueueBackend() {
return nil return nil
@ -122,7 +153,7 @@ func ProxyUpdate(proxyPayload *manager.ManagerAction, node *models.Node) error {
logger.Log(2, "error marshalling node update ", err.Error()) logger.Log(2, "error marshalling node update ", err.Error())
return err return err
} }
if err = publish(node, fmt.Sprintf("update/proxy/%s/%s", node.Network, node.ID), data); err != nil { if err = publish(node, fmt.Sprintf("proxy/%s/%s", node.Network, node.ID), data); err != nil {
logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
return err return err
} }
@ -188,6 +219,7 @@ func sendPeers() {
if errN != nil { if errN != nil {
logger.Log(1, errN.Error()) logger.Log(1, errN.Error())
} }
serverctl.SyncServerNetworkWithProxy()
} }
} }
} }

View file

@ -25,8 +25,8 @@ import (
"github.com/gravitl/netmaker/netclient/local" "github.com/gravitl/netmaker/netclient/local"
"github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/netclient/wireguard" "github.com/gravitl/netmaker/netclient/wireguard"
nmproxy "github.com/gravitl/netmaker/nm-proxy" nmproxy "github.com/gravitl/netmaker/nm-proxy"
"github.com/gravitl/netmaker/nm-proxy/common"
"github.com/gravitl/netmaker/nm-proxy/manager" "github.com/gravitl/netmaker/nm-proxy/manager"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
@ -158,12 +158,12 @@ func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc {
} }
func GetNodeInfo(cfg *config.ClientConfig) (models.NodeGet, error) { func GetNodeInfo(cfg *config.ClientConfig) (models.NodeGet, error) {
var nodeGET models.NodeGet var nodeGET models.NodeGet
token, err := common.Authenticate(cfg) token, err := Authenticate(cfg)
if err != nil { if err != nil {
return nodeGET, err return nodeGET, err
} }
url := fmt.Sprintf("https://%s/api/nodes/%s/%s", cfg.Server.API, cfg.Network, cfg.Node.ID) url := fmt.Sprintf("https://%s/api/nodes/%s/%s", cfg.Server.API, cfg.Network, cfg.Node.ID)
response, err := common.API("", http.MethodGet, url, token) response, err := API("", http.MethodGet, url, token)
if err != nil { if err != nil {
return nodeGET, err return nodeGET, err
} }
@ -217,7 +217,7 @@ func setSubscriptions(client mqtt.Client, nodeCfg *config.ClientConfig) {
} }
return return
} }
if token := client.Subscribe(fmt.Sprintf("update/proxy/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(ProxyUpdate)); token.WaitTimeout(mq.MQ_TIMEOUT*time.Second) && token.Error() != nil { if token := client.Subscribe(fmt.Sprintf("proxy/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(ProxyUpdate)); token.WaitTimeout(mq.MQ_TIMEOUT*time.Second) && token.Error() != nil {
if token.Error() == nil { if token.Error() == nil {
logger.Log(0, "network:", nodeCfg.Node.Network, "connection timeout") logger.Log(0, "network:", nodeCfg.Node.Network, "connection timeout")
} else { } else {

View file

@ -34,7 +34,7 @@ var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
func ProxyUpdate(client mqtt.Client, msg mqtt.Message) { func ProxyUpdate(client mqtt.Client, msg mqtt.Message) {
var nodeCfg config.ClientConfig var nodeCfg config.ClientConfig
var proxyUpdate manager.ManagerAction var proxyUpdate manager.ManagerAction
var network = strings.Split(msg.Topic(), "/")[2] var network = parseNetworkFromTopic(msg.Topic())
nodeCfg.Network = network nodeCfg.Network = network
nodeCfg.ReadConfig() nodeCfg.ReadConfig()
@ -165,12 +165,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
// } // }
// } // }
// } // }
ProxyMgmChan <- &manager.ManagerAction{
Action: manager.AddInterface,
Payload: manager.ManagerPayload{
IsRelayed: newNode.IsRelay == "yes",
},
}
if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
doneErr := publishSignal(&nodeCfg, ncutils.DONE) doneErr := publishSignal(&nodeCfg, ncutils.DONE)
if doneErr != nil { if doneErr != nil {
@ -273,15 +268,7 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "error syncing wg after peer update: "+err.Error()) logger.Log(0, "error syncing wg after peer update: "+err.Error())
return return
} }
ProxyMgmChan <- &manager.ManagerAction{ ProxyMgmChan <- &peerUpdate.ProxyUpdate
Action: manager.AddInterface,
Payload: manager.ManagerPayload{
InterfaceName: cfg.Node.Interface,
Peers: peerUpdate.Peers,
IsRelayed: peerUpdate.IsRelayed,
RelayedTo: peerUpdate.RelayTo,
},
}
logger.Log(0, "network:", cfg.Node.Network, "received peer update for node "+cfg.Node.Name+" "+cfg.Node.Network) logger.Log(0, "network:", cfg.Node.Network, "received peer update for node "+cfg.Node.Name+" "+cfg.Node.Network)
if cfg.Node.DNSOn == "yes" { if cfg.Node.DNSOn == "yes" {
if err := setHostDNS(peerUpdate.DNS, cfg.Node.Interface, ncutils.IsWindows()); err != nil { if err := setHostDNS(peerUpdate.DNS, cfg.Node.Interface, ncutils.IsWindows()); err != nil {
@ -294,6 +281,7 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
return return
} }
} }
_ = UpdateLocalListenPort(&cfg) _ = UpdateLocalListenPort(&cfg)
} }

View file

@ -1,22 +1,11 @@
package common package common
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt"
"io"
"log" "log"
"net" "net"
"net/http"
"os"
"os/exec" "os/exec"
"strings" "strings"
"time"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/netclient/config"
"github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/nm-proxy/wg" "github.com/gravitl/netmaker/nm-proxy/wg"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@ -94,63 +83,3 @@ func RunCmd(command string, printerr bool) (string, error) {
} }
return string(out), err return string(out), err
} }
// API function to interact with netmaker api endpoints. response from endpoint is returned
func API(data interface{}, method, url, authorization string) (*http.Response, error) {
var request *http.Request
var err error
if data != "" {
payload, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("error encoding data %w", err)
}
request, err = http.NewRequest(method, url, bytes.NewBuffer(payload))
if err != nil {
return nil, fmt.Errorf("error creating http request %w", err)
}
request.Header.Set("Content-Type", "application/json")
} else {
request, err = http.NewRequest(method, url, nil)
if err != nil {
return nil, fmt.Errorf("error creating http request %w", err)
}
}
if authorization != "" {
request.Header.Set("authorization", "Bearer "+authorization)
}
request.Header.Set("requestfrom", "node")
var httpClient http.Client
httpClient.Timeout = time.Minute
return httpClient.Do(request)
}
// Authenticate authenticates with api to permit subsequent interactions with the api
func Authenticate(cfg *config.ClientConfig) (string, error) {
pass, err := os.ReadFile(ncutils.GetNetclientPathSpecific() + "secret-" + cfg.Network)
if err != nil {
return "", fmt.Errorf("could not read secrets file %w", err)
}
data := models.AuthParams{
MacAddress: cfg.Node.MacAddress,
ID: cfg.Node.ID,
Password: string(pass),
}
url := "https://" + cfg.Server.API + "/api/nodes/adm/" + cfg.Network + "/authenticate"
response, err := API(data, http.MethodPost, url, "")
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
bodybytes, _ := io.ReadAll(response.Body)
return "", fmt.Errorf("failed to authenticate %s %s", response.Status, string(bodybytes))
}
resp := models.SuccessResponse{}
if err := json.NewDecoder(response.Body).Decode(&resp); err != nil {
return "", fmt.Errorf("error decoding respone %w", err)
}
tokenData := resp.Response.(map[string]interface{})
token := tokenData["AuthToken"]
return token.(string), nil
}

View file

@ -8,7 +8,6 @@ import (
"net" "net"
"runtime" "runtime"
"github.com/gravitl/netmaker/netclient/wireguard"
"github.com/gravitl/netmaker/nm-proxy/common" "github.com/gravitl/netmaker/nm-proxy/common"
peerpkg "github.com/gravitl/netmaker/nm-proxy/peer" peerpkg "github.com/gravitl/netmaker/nm-proxy/peer"
"github.com/gravitl/netmaker/nm-proxy/wg" "github.com/gravitl/netmaker/nm-proxy/wg"
@ -18,12 +17,17 @@ import (
type ProxyAction string type ProxyAction string
type ManagerPayload struct { type ManagerPayload struct {
InterfaceName string InterfaceName string `json:"interface_name"`
Peers []wgtypes.PeerConfig Peers []wgtypes.PeerConfig `json:"peers"`
IsRelayed bool PeerMap map[string]PeerConf `json:"peer_map"`
RelayedTo *net.UDPAddr IsRelayed bool `json:"is_relayed"`
IsRelay bool RelayedTo *net.UDPAddr `json:"relayed_to"`
RelayedPeers map[string][]wgtypes.PeerConfig IsRelay bool `json:"is_relay"`
RelayedPeers map[string][]wgtypes.PeerConfig `json:"relayed_peers"`
}
type PeerConf struct {
IsRelayed bool `json:"is_relayed"`
RelayedTo *net.UDPAddr `json:"relayed_to"`
} }
const ( const (
@ -48,6 +52,10 @@ func StartProxyManager(manageChan chan *ManagerAction) {
log.Printf("-------> PROXY-MANAGER: %+v\n", mI) log.Printf("-------> PROXY-MANAGER: %+v\n", mI)
switch mI.Action { switch mI.Action {
case AddInterface: case AddInterface:
common.IsRelay = mI.Payload.IsRelay
if mI.Payload.IsRelay {
mI.RelayPeers()
}
err := mI.AddInterfaceToProxy() err := mI.AddInterfaceToProxy()
if err != nil { if err != nil {
log.Printf("failed to add interface: [%s] to proxy: %v\n ", mI.Payload.InterfaceName, err) log.Printf("failed to add interface: [%s] to proxy: %v\n ", mI.Payload.InterfaceName, err)
@ -161,7 +169,7 @@ func (m *ManagerAction) AddInterfaceToProxy() error {
ifaceName := m.Payload.InterfaceName ifaceName := m.Payload.InterfaceName
log.Println("--------> IFACE: ", ifaceName) log.Println("--------> IFACE: ", ifaceName)
if runtime.GOOS == "darwin" { if runtime.GOOS == "darwin" {
ifaceName, err = wireguard.GetRealIface(ifaceName) ifaceName, err = wg.GetRealIface(ifaceName)
if err != nil { if err != nil {
log.Println("failed to get real iface: ", err) log.Println("failed to get real iface: ", err)
} }

View file

@ -1,9 +1,13 @@
package wg package wg
import ( import (
"errors"
"fmt" "fmt"
"log" "log"
"net" "net"
"os"
"os/exec"
"strings"
"sync" "sync"
"time" "time"
@ -14,7 +18,7 @@ import (
const ( const (
DefaultMTU = 1280 DefaultMTU = 1280
DefaultWgPort = 51820 DefaultWgPort = 51820
DefaultWgKeepAlive = 25 * time.Second DefaultWgKeepAlive = 20 * time.Second
) )
// WGIface represents a interface instance // WGIface represents a interface instance
@ -103,7 +107,6 @@ func parseAddress(address string) (WGAddress, error) {
} }
// UpdatePeer updates existing Wireguard Peer or creates a new one if doesn't exist // UpdatePeer updates existing Wireguard Peer or creates a new one if doesn't exist
// Endpoint is optional
func (w *WGIface) UpdatePeer(peerKey string, allowedIps []net.IPNet, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error { func (w *WGIface) UpdatePeer(peerKey string, allowedIps []net.IPNet, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
@ -122,8 +125,8 @@ func (w *WGIface) UpdatePeer(peerKey string, allowedIps []net.IPNet, keepAlive t
} }
peer := wgtypes.PeerConfig{ peer := wgtypes.PeerConfig{
PublicKey: peerKeyParsed, PublicKey: peerKeyParsed,
ReplaceAllowedIPs: true, // ReplaceAllowedIPs: true,
AllowedIPs: allowedIps, // AllowedIPs: allowedIps,
PersistentKeepaliveInterval: &keepAlive, PersistentKeepaliveInterval: &keepAlive,
PresharedKey: preSharedKey, PresharedKey: preSharedKey,
Endpoint: endpoint, Endpoint: endpoint,
@ -176,3 +179,58 @@ func (w *WGIface) GetListenPort() (*int, error) {
return &d.ListenPort, nil return &d.ListenPort, nil
} }
// GetRealIface - retrieves tun iface based on reference iface name from config file
func GetRealIface(iface string) (string, error) {
RunCmd("wg show interfaces", false)
ifacePath := "/var/run/wireguard/" + iface + ".name"
if !(FileExists(ifacePath)) {
return "", errors.New(ifacePath + " does not exist")
}
realIfaceName, err := GetFileAsString(ifacePath)
if err != nil {
return "", err
}
realIfaceName = strings.TrimSpace(realIfaceName)
if !(FileExists(fmt.Sprintf("/var/run/wireguard/%s.sock", realIfaceName))) {
return "", errors.New("interface file does not exist")
}
return realIfaceName, nil
}
// FileExists - checks if file exists locally
func FileExists(f string) bool {
info, err := os.Stat(f)
if os.IsNotExist(err) {
return false
}
if err != nil && strings.Contains(err.Error(), "not a directory") {
return false
}
if err != nil {
log.Println(0, "error reading file: "+f+", "+err.Error())
}
return !info.IsDir()
}
// GetFileAsString - returns the string contents of a given file
func GetFileAsString(path string) (string, error) {
content, err := os.ReadFile(path)
if err != nil {
return "", err
}
return string(content), err
}
// RunCmd - runs a local command
func RunCmd(command string, printerr bool) (string, error) {
args := strings.Fields(command)
cmd := exec.Command(args[0], args[1:]...)
cmd.Wait()
out, err := cmd.CombinedOutput()
if err != nil && printerr {
log.Println("error running command: ", command)
log.Println(strings.TrimSuffix(string(out), "\n"))
}
return string(out), err
}

View file

@ -14,7 +14,6 @@ import (
"github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/nm-proxy/manager" "github.com/gravitl/netmaker/nm-proxy/manager"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
const ( const (
@ -83,33 +82,12 @@ func SyncServerNetworkWithProxy() error {
logger.Log(1, "failed to retrieve local server node: ", serverNode.ID) logger.Log(1, "failed to retrieve local server node: ", serverNode.ID)
continue continue
} }
peers, err := logic.GetPeersForProxy(&serverNode) proxyPayload, err := logic.GetPeersForProxy(&serverNode, false)
if err != nil && !ncutils.IsEmptyRecord(err) { if err != nil && !ncutils.IsEmptyRecord(err) {
logger.Log(1, "failed to retrieve peers for server node: ", serverNode.ID) logger.Log(1, "failed to retrieve peers for server node: ", serverNode.ID)
continue continue
} }
logger.Log(0, "----> HEREEEEEEEE1") logger.Log(0, "----> HEREEEEEEEE1")
proxyPayload := manager.ManagerPayload{
IsRelay: serverNode.IsRelay == "yes",
InterfaceName: serverNode.Interface,
Peers: peers,
}
if proxyPayload.IsRelay {
relayedNodes, err := logic.GetRelayedNodes(&serverNode)
if err != nil {
logger.Log(1, "failed to relayed nodes: ", serverNode.Name, err.Error())
proxyPayload.IsRelay = false
} else {
relayPeersMap := make(map[string][]wgtypes.PeerConfig)
for _, relayedNode := range relayedNodes {
peers, err := logic.GetPeersForProxy(&relayedNode)
if err == nil {
relayPeersMap[relayedNode.PublicKey] = peers
}
}
proxyPayload.RelayedPeers = relayPeersMap
}
}
logic.ProxyMgmChan <- &manager.ManagerAction{ logic.ProxyMgmChan <- &manager.ManagerAction{
Action: manager.AddInterface, Action: manager.AddInterface,
Payload: proxyPayload, Payload: proxyPayload,