Merge pull request #1905 from gravitl/netclient_refactor_peer_updates

host based peer updates
This commit is contained in:
dcarns 2023-01-09 10:36:31 -05:00 committed by GitHub
commit 48af36a1b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 327 additions and 146 deletions

View file

@ -7,7 +7,6 @@ import (
"net/http"
"strings"
"github.com/google/uuid"
"github.com/gorilla/mux"
proxy_models "github.com/gravitl/netclient/nmproxy/models"
"github.com/gravitl/netmaker/database"
@ -442,6 +441,13 @@ func getNode(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}
hostPeerUpdate, err := logic.GetPeerUpdateForHost(host)
if err != nil && !database.IsEmptyRecord(err) {
logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}
server := servercfg.GetServerInfo()
network, err := logic.GetNetwork(node.Network)
if err != nil {
@ -453,7 +459,9 @@ func getNode(w http.ResponseWriter, r *http.Request) {
legacy := node.Legacy(host, &server, &network)
response := models.NodeGet{
Node: *legacy,
Host: *host,
Peers: peerUpdate.Peers,
HostPeers: hostPeerUpdate.Peers,
ServerConfig: server,
PeerIDs: peerUpdate.PeerIDs,
}
@ -637,19 +645,18 @@ func createNode(w http.ResponseWriter, r *http.Request) {
return
}
}
peerUpdate, err := logic.GetPeerUpdate(&data.Node, &data.Host)
hostPeerUpdate, err := logic.GetPeerUpdateForHost(&data.Host)
if err != nil && !database.IsEmptyRecord(err) {
logger.Log(0, r.Header.Get("user"),
fmt.Sprintf("error fetching wg peers config for node [ %s ]: %v", data.Node.ID.String(), err))
fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", data.Host.ID.String(), err))
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return
}
data.Node.Peers = peerUpdate.Peers
response := models.NodeJoinResponse{
Node: data.Node,
ServerConfig: server,
PeerIDs: peerUpdate.PeerIDs,
Host: data.Host,
Peers: hostPeerUpdate.Peers,
}
logger.Log(1, r.Header.Get("user"), "created new node", data.Host.Name, "on network", networkName)
w.WriteHeader(http.StatusOK)
@ -1061,28 +1068,14 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
}, &node)
}
if fromNode {
// check if server should be removed from mq
// err is irrelevent
nodes, _ := logic.GetAllNodes()
var foundNode models.Node
for _, nodetocheck := range nodes {
if nodetocheck.HostID == node.HostID {
foundNode = nodetocheck
break
}
}
// TODO: Address how to remove host
if foundNode.HostID != uuid.Nil {
if err = logic.DissasociateNodeFromHost(&foundNode, host); err == nil {
currNets := logic.GetHostNetworks(host.ID.String())
if len(currNets) > 0 {
mq.ModifyClient(&mq.MqClient{
ID: host.ID.String(),
Text: host.Name,
Networks: currNets,
})
}
}
// update networks for host mq client
currNets := logic.GetHostNetworks(host.ID.String())
if len(currNets) > 0 {
mq.ModifyClient(&mq.MqClient{
ID: host.ID.String(),
Text: host.Name,
Networks: currNets,
})
}
}
logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
@ -1091,12 +1084,11 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
runUpdates(&node, false)
return
}
go func() {
if err := mq.PublishPeerUpdate(node.Network, false); err != nil {
go func(network string) {
if err := mq.PublishPeerUpdate(network, false); err != nil {
logger.Log(1, "error publishing peer update ", err.Error())
return
}
}()
}(node.Network)
}

View file

@ -48,9 +48,9 @@ func GetNetworkNodes(network string) ([]models.Node, error) {
// UpdateNode - takes a node and updates another node with it's values
func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
if newNode.Address.String() != currentNode.Address.String() {
if newNode.Address.IP.String() != currentNode.Address.IP.String() {
if network, err := GetParentNetwork(newNode.Network); err == nil {
if !IsAddressInCIDR(newNode.Address.String(), network.AddressRange) {
if !IsAddressInCIDR(newNode.Address.IP.String(), network.AddressRange) {
return fmt.Errorf("invalid address provided; out of network range for node %s", newNode.ID)
}
}

View file

@ -193,6 +193,127 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (proxy_models.ProxyMana
return proxyPayload, nil
}
// GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) {
hostPeerUpdate := models.HostPeerUpdate{
Network: make(map[string]models.NetworkInfo),
PeerIDs: make(models.HostPeerMap),
ServerVersion: servercfg.GetVersion(),
ServerAddrs: []models.ServerAddr{},
}
log.Println("peer update for host ", host.ID.String())
peerIndexMap := make(map[string]int)
for _, nodeID := range host.Nodes {
node, err := GetNodeByID(nodeID)
if err != nil {
continue
}
if !node.Connected {
continue
}
hostPeerUpdate.Network[node.Network] = models.NetworkInfo{
DNS: getPeerDNS(node.Network),
}
currentPeers, err := GetNetworkNodes(node.Network)
if err != nil {
log.Println("no network nodes")
return models.HostPeerUpdate{}, err
}
for _, peer := range currentPeers {
if peer.ID == node.ID {
log.Println("peer update, skipping self")
//skip yourself
continue
}
var peerConfig wgtypes.PeerConfig
peerHost, err := GetHost(peer.HostID.String())
if err != nil {
log.Println("no peer host", err)
return models.HostPeerUpdate{}, err
}
if !peer.Connected {
log.Println("peer update, skipping unconnected node")
//skip unconnected nodes
continue
}
if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) {
log.Println("peer update, skipping node for acl")
//skip if not permitted by acl
continue
}
peerConfig.PublicKey = peerHost.PublicKey
peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
peerConfig.ReplaceAllowedIPs = true
uselocal := false
if host.EndpointIP.String() == peerHost.EndpointIP.String() {
//peer is on same network
// set to localaddress
uselocal = true
if node.LocalAddress.IP == nil {
// use public endpint
uselocal = false
}
if node.LocalAddress.String() == peer.LocalAddress.String() {
uselocal = false
}
}
peerConfig.Endpoint = &net.UDPAddr{
IP: peerHost.EndpointIP,
Port: peerHost.ListenPort,
}
if !host.ProxyEnabled && peerHost.ProxyEnabled {
peerConfig.Endpoint.Port = peerHost.ProxyListenPort
}
if uselocal {
peerConfig.Endpoint.IP = peer.LocalAddress.IP
}
allowedips := getNodeAllowedIPs(&peer, &node)
if peer.IsIngressGateway {
for _, entry := range peer.IngressGatewayRange {
_, cidr, err := net.ParseCIDR(string(entry))
if err == nil {
allowedips = append(allowedips, *cidr)
}
}
}
if peer.IsRelay {
allowedips = append(allowedips, getRelayAllowedIPs(&node, &peer)...)
}
if peer.IsEgressGateway {
allowedips = append(allowedips, getEgressIPs(&node, &peer)...)
}
peerConfig.AllowedIPs = allowedips
if _, ok := hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()]; !ok {
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr)
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
ID: peer.ID.String(),
Address: peer.PrimaryAddress(),
Name: peerHost.Name,
Network: peer.Network,
}
} else {
peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
peerAllowedIPs = append(peerAllowedIPs, allowedips...)
hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
ID: peer.ID.String(),
Address: peer.PrimaryAddress(),
Name: peerHost.Name,
Network: peer.Network,
}
}
}
}
return hostPeerUpdate, nil
}
// GetPeerUpdate - gets a wireguard peer config for each peer of a node
func GetPeerUpdate(node *models.Node, host *models.Host) (models.PeerUpdate, error) {
log.Println("peer update for node ", node.ID)
@ -731,6 +852,7 @@ func getPeerDNS(network string) string {
host, err := GetHost(node.HostID.String())
if err != nil {
logger.Log(0, "error retrieving host for node", node.ID.String(), err.Error())
continue
}
dns = dns + fmt.Sprintf("%s %s.%s\n", nodes[i].Address, host.Name, nodes[i].Network)
}

View file

@ -34,7 +34,8 @@ func CheckZombies(newnode *models.Node, mac net.HardwareAddr) {
for _, node := range nodes {
host, err := GetHost(node.HostID.String())
if err != nil {
// should we delete the node if host not found ??
continue
}
if host.MacAddress.String() == mac.String() {
logger.Log(0, "adding ", node.ID.String(), " to zombie list")

View file

@ -59,7 +59,6 @@ func (a *ApiNode) ConvertToServerNode(currentNode *Node) *Node {
convertedNode.IsRelay = a.IsRelay
convertedNode.IsRelayed = a.IsRelayed
convertedNode.PendingDelete = a.PendingDelete
convertedNode.Peers = currentNode.Peers
convertedNode.Failover = a.Failover
convertedNode.IsEgressGateway = a.IsEgressGateway
convertedNode.IsIngressGateway = a.IsIngressGateway

View file

@ -37,11 +37,15 @@ type IDandAddr struct {
Address string `json:"address" bson:"address" yaml:"address"`
Name string `json:"name" bson:"name" yaml:"name"`
IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"`
Network string `json:"network" bson:"network" yaml:"network" validate:"network"`
}
// PeerMap - peer map for ids and addresses in metrics
type PeerMap map[string]IDandAddr
// HostPeerMap - host peer map for ids and addresses
type HostPeerMap map[string]map[string]IDandAddr
// MetricsMap - map for holding multiple metrics in memory
type MetricsMap map[string]Metrics

View file

@ -16,6 +16,21 @@ type PeerUpdate struct {
ProxyUpdate proxy_models.ProxyManagerPayload `json:"proxy_update" bson:"proxy_update" yaml:"proxy_update"`
}
// HostPeerUpdate - struct for host peer updates
type HostPeerUpdate struct {
ServerVersion string `json:"serverversion" bson:"serverversion" yaml:"serverversion"`
ServerAddrs []ServerAddr `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"`
Network map[string]NetworkInfo `json:"network" bson:"network" yaml:"network"`
Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"`
PeerIDs HostPeerMap `json:"peerids" bson:"peerids" yaml:"peerids"`
ProxyUpdate proxy_models.ProxyManagerPayload `json:"proxy_update" bson:"proxy_update" yaml:"proxy_update"`
}
// NetworkInfo - struct for network info
type NetworkInfo struct {
DNS string `json:"dns" bson:"dns" yaml:"dns"`
}
// KeyUpdate - key update struct
type KeyUpdate struct {
Network string `json:"network" bson:"network"`

View file

@ -56,26 +56,25 @@ type Iface struct {
// CommonNode - represents a commonn node data elements shared by netmaker and netclient
type CommonNode struct {
ID uuid.UUID `json:"id" yaml:"id"`
HostID uuid.UUID `json:"hostid" yaml:"hostid"`
Network string `json:"network" yaml:"network"`
NetworkRange net.IPNet `json:"networkrange" yaml:"networkrange"`
NetworkRange6 net.IPNet `json:"networkrange6" yaml:"networkrange6"`
InternetGateway *net.UDPAddr `json:"internetgateway" yaml:"internetgateway"`
Server string `json:"server" yaml:"server"`
Connected bool `json:"connected" yaml:"connected"`
Address net.IPNet `json:"address" yaml:"address"`
Address6 net.IPNet `json:"address6" yaml:"address6"`
PostUp string `json:"postup" yaml:"postup"`
PostDown string `json:"postdown" yaml:"postdown"`
Action string `json:"action" yaml:"action"`
LocalAddress net.IPNet `json:"localaddress" yaml:"localaddress"`
IsLocal bool `json:"islocal" yaml:"islocal"`
IsEgressGateway bool `json:"isegressgateway" yaml:"isegressgateway"`
IsIngressGateway bool `json:"isingressgateway" yaml:"isingressgateway"`
DNSOn bool `json:"dnson" yaml:"dnson"`
PersistentKeepalive time.Duration `json:"persistentkeepalive" yaml:"persistentkeepalive"`
Peers []wgtypes.PeerConfig `json:"peers" yaml:"peers"`
ID uuid.UUID `json:"id" yaml:"id"`
HostID uuid.UUID `json:"hostid" yaml:"hostid"`
Network string `json:"network" yaml:"network"`
NetworkRange net.IPNet `json:"networkrange" yaml:"networkrange"`
NetworkRange6 net.IPNet `json:"networkrange6" yaml:"networkrange6"`
InternetGateway *net.UDPAddr `json:"internetgateway" yaml:"internetgateway"`
Server string `json:"server" yaml:"server"`
Connected bool `json:"connected" yaml:"connected"`
Address net.IPNet `json:"address" yaml:"address"`
Address6 net.IPNet `json:"address6" yaml:"address6"`
PostUp string `json:"postup" yaml:"postup"`
PostDown string `json:"postdown" yaml:"postdown"`
Action string `json:"action" yaml:"action"`
LocalAddress net.IPNet `json:"localaddress" yaml:"localaddress"`
IsLocal bool `json:"islocal" yaml:"islocal"`
IsEgressGateway bool `json:"isegressgateway" yaml:"isegressgateway"`
IsIngressGateway bool `json:"isingressgateway" yaml:"isingressgateway"`
DNSOn bool `json:"dnson" yaml:"dnson"`
PersistentKeepalive time.Duration `json:"persistentkeepalive" yaml:"persistentkeepalive"`
}
// Node - a model of a network node
@ -365,7 +364,7 @@ func (node *LegacyNode) SetDefaultFailover() {
// Node.Fill - fills other node data into calling node data if not set on calling node
func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftables present
newNode.ID = currentNode.ID
newNode.HostID = currentNode.HostID
// Revisit the logic for boolean values
// TODO ---- !!!!!!!!!!!!!!!!!!!!!!!!!!!!
// TODO ---- !!!!!!!!!!!!!!!!!!!!!!!!!!
@ -435,9 +434,6 @@ func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftable
if newNode.Server == "" {
newNode.Server = currentNode.Server
}
if newNode.Connected != currentNode.Connected {
newNode.Connected = currentNode.Connected
}
if newNode.DefaultACL == "" {
newNode.DefaultACL = currentNode.DefaultACL
}
@ -499,17 +495,23 @@ func (ln *LegacyNode) ConvertToNewNode() (*Host, *Node) {
host.HostPass = ln.Password
host.Name = ln.Name
host.ListenPort = int(ln.ListenPort)
_, cidr, _ := net.ParseCIDR(ln.LocalAddress)
_, cidr, _ = net.ParseCIDR(ln.LocalRange)
host.LocalRange = *cidr
if _, cidr, err := net.ParseCIDR(ln.LocalAddress); err == nil {
host.LocalRange = *cidr
} else {
if _, cidr, err := net.ParseCIDR(ln.LocalRange); err == nil {
host.LocalRange = *cidr
}
}
host.LocalListenPort = int(ln.LocalListenPort)
host.ProxyListenPort = int(ln.ProxyListenPort)
host.MTU = int(ln.MTU)
host.PublicKey, _ = wgtypes.ParseKey(ln.PublicKey)
host.MacAddress, _ = net.ParseMAC(ln.MacAddress)
host.TrafficKeyPublic = ln.TrafficKeys.Mine
gateway, _ := net.ResolveUDPAddr("udp", ln.InternetGateway)
host.InternetGateway = *gateway
gateway, err := net.ResolveUDPAddr("udp", ln.InternetGateway)
if err == nil {
host.InternetGateway = *gateway
}
id, _ := uuid.Parse(ln.ID)
host.Nodes = append(host.Nodes, id.String())
host.Interfaces = ln.Interfaces
@ -519,16 +521,26 @@ func (ln *LegacyNode) ConvertToNewNode() (*Host, *Node) {
id, _ := uuid.Parse(ln.ID)
node.ID = id
node.Network = ln.Network
_, cidr, _ := net.ParseCIDR(ln.NetworkSettings.AddressRange)
node.NetworkRange = *cidr
_, cidr, _ = net.ParseCIDR(ln.NetworkSettings.AddressRange6)
node.NetworkRange6 = *cidr
if _, cidr, err := net.ParseCIDR(ln.NetworkSettings.AddressRange); err == nil {
node.NetworkRange = *cidr
}
if _, cidr, err := net.ParseCIDR(ln.NetworkSettings.AddressRange6); err == nil {
node.NetworkRange6 = *cidr
}
node.Server = ln.Server
node.Connected = parseBool(ln.Connected)
_, cidr, _ = net.ParseCIDR(ln.Address)
node.Address = *cidr
_, cidr, _ = net.ParseCIDR(ln.Address6)
node.Address6 = *cidr
if ln.Address != "" {
node.Address = net.IPNet{
IP: net.ParseIP(ln.Address),
Mask: net.CIDRMask(32, 32),
}
}
if ln.Address6 != "" {
node.Address = net.IPNet{
IP: net.ParseIP(ln.Address6),
Mask: net.CIDRMask(128, 128),
}
}
node.PostUp = ln.PostUp
node.PostDown = ln.PostDown
node.Action = ln.Action

View file

@ -205,16 +205,17 @@ type NodeGet struct {
Node LegacyNode `json:"node" bson:"node" yaml:"node"`
Host Host `json:"host" yaml:"host"`
Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"`
HostPeers []wgtypes.PeerConfig `json:"host_peers" bson:"host_peers" yaml:"host_peers"`
ServerConfig ServerConfig `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"`
PeerIDs PeerMap `json:"peerids,omitempty" bson:"peerids,omitempty" yaml:"peerids,omitempty"`
}
// NodeJoinResponse data returned to node in response to join
type NodeJoinResponse struct {
Node Node `json:"node" bson:"node" yaml:"node"`
Host Host `json:"host" yaml:"host"`
ServerConfig ServerConfig `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"`
PeerIDs PeerMap `json:"peerids,omitempty" bson:"peerids,omitempty" yaml:"peerids,omitempty"`
Node Node `json:"node" bson:"node" yaml:"node"`
Host Host `json:"host" yaml:"host"`
ServerConfig ServerConfig `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"`
Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"`
}
// ServerConfig - struct for dealing with the server information for a netclient

View file

@ -13,7 +13,7 @@ func ModifyClient(client *MqClient) error {
roles := []MqDynSecRole{
{
Rolename: HostRole,
Rolename: HostGenericRole,
Priority: -1,
},
}
@ -43,6 +43,7 @@ func ModifyClient(client *MqClient) error {
// DeleteMqClient - removes a client from the DynSec system
func DeleteMqClient(hostID string) error {
deleteHostRole(hostID)
event := MqDynsecPayload{
Commands: []MqDynSecCmd{
{
@ -57,9 +58,17 @@ func DeleteMqClient(hostID string) error {
// CreateMqClient - creates an MQ DynSec client
func CreateMqClient(client *MqClient) error {
err := createHostRole(client.ID)
if err != nil {
return err
}
roles := []MqDynSecRole{
{
Rolename: HostRole,
Rolename: HostGenericRole,
Priority: -1,
},
{
Rolename: getHostRoleName(client.ID),
Priority: -1,
},
}

View file

@ -19,8 +19,8 @@ const (
exporterRole = "exporter"
// constant for node role
NodeRole = "node"
// HostRole constant for host role
HostRole = "host"
// HostGenericRole constant for host role
HostGenericRole = "host"
// const for dynamic security file
dynamicSecurityFile = "dynamic-security.json"
@ -66,7 +66,7 @@ var (
Acls: fetchServerAcls(),
},
{
Rolename: HostRole,
Rolename: HostGenericRole,
Acls: fetchNodeAcls(),
},
exporterMQRole,
@ -169,6 +169,18 @@ func ListClients(client mqtt.Client) (ListClientsData, error) {
return resp, errors.New("resp not found")
}
// fetches host related acls
func fetchHostAcls(hostID string) []Acl {
return []Acl{
{
AclType: "publishClientReceive",
Topic: fmt.Sprintf("peers/host/%s/#", hostID),
Priority: -1,
Allow: true,
},
}
}
// FetchNetworkAcls - fetches network acls
func FetchNetworkAcls(network string) []Acl {
return []Acl{
@ -220,6 +232,20 @@ func DeleteNetworkRole(network string) error {
return publishEventToDynSecTopic(event)
}
func deleteHostRole(hostID string) error {
// Deletes the hostID role from MQ
event := MqDynsecPayload{
Commands: []MqDynSecCmd{
{
Command: DeleteRoleCmd,
RoleName: getHostRoleName(hostID),
},
},
}
return publishEventToDynSecTopic(event)
}
// CreateNetworkRole - createss a network role from DynSec system
func CreateNetworkRole(network string) error {
// Create Role with acls for the network
@ -237,6 +263,27 @@ func CreateNetworkRole(network string) error {
return publishEventToDynSecTopic(event)
}
// creates role for the host with ID.
func createHostRole(hostID string) error {
// Create Role with acls for the host
event := MqDynsecPayload{
Commands: []MqDynSecCmd{
{
Command: CreateRoleCmd,
RoleName: getHostRoleName(hostID),
Textname: "host role with Acls for hosts",
Acls: fetchHostAcls(hostID),
},
},
}
return publishEventToDynSecTopic(event)
}
func getHostRoleName(hostID string) string {
return fmt.Sprintf("host-%s", hostID)
}
// serverAcls - fetches server role related acls
func fetchServerAcls() []Acl {
return []Acl{
@ -252,6 +299,12 @@ func fetchServerAcls() []Acl {
Priority: -1,
Allow: true,
},
{
AclType: "publishClientSend",
Topic: "peers/host/#",
Priority: -1,
Allow: true,
},
{
AclType: "publishClientSend",
Topic: "update/#",

View file

@ -90,19 +90,20 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
return
}
var newNode models.Node
if err := json.Unmarshal(decrypted, &newNode); err != nil {
var oldNode models.LegacyNode
if err := json.Unmarshal(decrypted, &oldNode); err != nil {
logger.Log(1, "error unmarshaling payload ", err.Error())
return
}
ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
_, newNode := oldNode.ConvertToNewNode()
ifaceDelta := logic.IfaceDelta(&currentNode, newNode)
if servercfg.Is_EE && ifaceDelta {
if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID.String(), currentNode.Network); err != nil {
logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
}
}
newNode.SetLastCheckIn()
if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
if err := logic.UpdateNode(&currentNode, newNode); err != nil {
logger.Log(1, "error saving node", err.Error())
return
}
@ -165,9 +166,13 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
if shouldUpdate {
logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
if err = PublishSinglePeerUpdate(&currentNode); err != nil {
logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
host, err := logic.GetHost(currentNode.HostID.String())
if err == nil {
if err = PublishSingleHostUpdate(host); err != nil {
logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
}
}
}
logger.Log(1, "updated node metrics", id)

View file

@ -14,20 +14,21 @@ import (
"github.com/gravitl/netmaker/serverctl"
)
// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
// PublishPeerUpdate --- determines and publishes a peer update to all the hosts
func PublishPeerUpdate(network string, publishToSelf bool) error {
if !servercfg.IsMessageQueueBackend() {
return nil
}
networkNodes, err := logic.GetNetworkNodes(network)
hosts, err := logic.GetAllHosts()
if err != nil {
logger.Log(1, "err getting Network Nodes", err.Error())
logger.Log(1, "err getting all hosts", err.Error())
return err
}
for _, node := range networkNodes {
err = PublishSinglePeerUpdate(&node)
for _, host := range hosts {
err = PublishSingleHostUpdate(&host)
if err != nil {
logger.Log(1, "failed to publish peer update to node", node.ID.String(), "on network", node.Network, ":", err.Error())
logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
}
}
return err
@ -47,24 +48,20 @@ func PublishProxyPeerUpdate(node *models.Node) error {
return nil
}
// PublishSinglePeerUpdate --- determines and publishes a peer update to one node
func PublishSinglePeerUpdate(node *models.Node) error {
host, err := logic.GetHost(node.HostID.String())
if err != nil {
return nil
}
// PublishSingleHostUpdate --- determines and publishes a peer update to one host
func PublishSingleHostUpdate(host *models.Host) error {
peerUpdate, err := logic.GetPeerUpdate(node, host)
peerUpdate, err := logic.GetPeerUpdateForHost(host)
if err != nil {
return err
}
if host.ProxyEnabled {
proxyUpdate, err := logic.GetPeersForProxy(node, false)
if err != nil {
return err
}
proxyUpdate.Action = proxy_models.AddNetwork
peerUpdate.ProxyUpdate = proxyUpdate
// proxyUpdate, err := logic.GetPeersForProxy(node, false)
// if err != nil {
// return err
// }
// proxyUpdate.Action = proxy_models.AddNetwork
// peerUpdate.ProxyUpdate = proxyUpdate
}
@ -72,36 +69,12 @@ func PublishSinglePeerUpdate(node *models.Node) error {
if err != nil {
return err
}
return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data)
return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
}
// PublishPeerUpdate --- publishes a peer update to all the peers of a node
func PublishExtPeerUpdate(node *models.Node) error {
host, err := logic.GetHost(node.HostID.String())
if err != nil {
return nil
}
if !servercfg.IsMessageQueueBackend() {
return nil
}
peerUpdate, err := logic.GetPeerUpdate(node, host)
if err != nil {
return err
}
data, err := json.Marshal(&peerUpdate)
if err != nil {
return err
}
if host.ProxyEnabled {
proxyUpdate, err := logic.GetPeersForProxy(node, false)
if err == nil {
peerUpdate.ProxyUpdate = proxyUpdate
}
}
if err = publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
return err
}
go PublishPeerUpdate(node.Network, false)
return nil
}
@ -126,7 +99,7 @@ func NodeUpdate(node *models.Node) error {
logger.Log(2, "error marshalling node update ", err.Error())
return err
}
if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil {
if err = publish(host, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil {
logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error())
return err
}
@ -156,7 +129,7 @@ func ProxyUpdate(proxyPayload *proxy_models.ProxyManagerPayload, node *models.No
logger.Log(2, "error marshalling node update ", err.Error())
return err
}
if err = publish(node, fmt.Sprintf("proxy/%s/%s", node.Network, node.ID), data); err != nil {
if err = publish(host, fmt.Sprintf("proxy/%s/%s", node.Network, node.ID), data); err != nil {
logger.Log(2, "error publishing proxy update to peer ", node.ID.String(), err.Error())
return err
}
@ -166,7 +139,7 @@ func ProxyUpdate(proxyPayload *proxy_models.ProxyManagerPayload, node *models.No
// sendPeers - retrieve networks, send peer ports to all peers
func sendPeers() {
networks, err := logic.GetNetworks()
hosts, err := logic.GetAllHosts()
if err != nil {
logger.Log(1, "error retrieving networks for keepalive", err.Error())
}
@ -191,13 +164,12 @@ func sendPeers() {
//collectServerMetrics(networks[:])
}
for _, network := range networks {
for _, host := range hosts {
if force {
logger.Log(2, "sending scheduled peer update (5 min)")
err = PublishPeerUpdate(network.NetID, false)
err = PublishSingleHostUpdate(&host)
if err != nil {
logger.Log(1, "error publishing udp port updates for network", network.NetID)
logger.Log(1, err.Error())
logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
}
}
}

View file

@ -40,7 +40,7 @@ func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
return ncutils.DeChunk(msg, nodePubTKey, serverPrivTKey)
}
func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {
// fetch server public key to be certain hasn't changed in transit
trafficKey, trafficErr := logic.RetrievePrivateTrafficKey()
if trafficErr != nil {
@ -52,10 +52,6 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
return nil, err
}
host, err := logic.GetHost(node.HostID.String())
if err != nil {
return nil, err
}
nodePubKey, err := ncutils.ConvertBytesToKey(host.TrafficKeyPublic)
if err != nil {
return nil, err
@ -68,8 +64,8 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
return ncutils.Chunk(msg, nodePubKey, serverPrivKey)
}
func publish(node *models.Node, dest string, msg []byte) error {
encrypted, encryptErr := encryptMsg(node, msg)
func publish(host *models.Host, dest string, msg []byte) error {
encrypted, encryptErr := encryptMsg(host, msg)
if encryptErr != nil {
return encryptErr
}