mirror of
https://github.com/gravitl/netmaker.git
synced 2024-11-11 01:54:34 +08:00
Merge pull request #790 from gravitl/bugfix_v0.10.1_client_order
Bugfix v0.10.1 client order
This commit is contained in:
commit
1fc64fb8cf
8 changed files with 73 additions and 31 deletions
|
@ -5,7 +5,6 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gravitl/netmaker/database"
|
||||
|
@ -418,6 +417,7 @@ func createNode(w http.ResponseWriter, r *http.Request) {
|
|||
logger.Log(1, r.Header.Get("user"), "created new node", node.Name, "on network", node.Network)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(node)
|
||||
runForceServerUpdate(&node)
|
||||
}
|
||||
|
||||
// Takes node out of pending state
|
||||
|
@ -606,11 +606,6 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
//send update to node to be deleted before deleting on server otherwise message cannot be sent
|
||||
node.Action = models.NODE_DELETE
|
||||
if err := mq.NodeUpdate(&node); err != nil {
|
||||
logger.Log(1, "error publishing node update", err.Error())
|
||||
returnErrorResponse(w, r, formatError(err, "internal"))
|
||||
return
|
||||
}
|
||||
|
||||
err = logic.DeleteNodeByID(&node, false)
|
||||
if err != nil {
|
||||
|
@ -621,6 +616,7 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
|
||||
runUpdates(&node, false)
|
||||
runForceServerUpdate(&node)
|
||||
}
|
||||
|
||||
func runUpdates(node *models.Node, ifaceDelta bool) {
|
||||
|
@ -629,27 +625,22 @@ func runUpdates(node *models.Node, ifaceDelta bool) {
|
|||
if err != nil {
|
||||
logger.Log(3, "error occurred on timer,", err.Error())
|
||||
}
|
||||
if err := runServerUpdate(node, ifaceDelta); err != nil {
|
||||
logger.Log(1, "error running server update", err.Error())
|
||||
}
|
||||
// publish node update if not server
|
||||
if err := mq.NodeUpdate(node); err != nil {
|
||||
logger.Log(1, "error publishing node update to node", node.Name, node.ID, err.Error())
|
||||
}
|
||||
|
||||
if err := runServerUpdate(node, ifaceDelta); err != nil {
|
||||
logger.Log(1, "error running server update", err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// updates local peers for a server on a given node's network
|
||||
func runServerUpdate(node *models.Node, ifaceDelta bool) error {
|
||||
var mutex sync.Mutex
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
if servercfg.IsClientMode() != "on" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !isServer(node) && ifaceDelta {
|
||||
ifaceDelta = false
|
||||
if servercfg.IsClientMode() != "on" || !isServer(node) {
|
||||
return nil
|
||||
}
|
||||
|
||||
currentServerNode, err := logic.GetNetworkServerLocal(node.Network)
|
||||
|
|
|
@ -107,7 +107,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
|
|||
Type: nodepb.NODE_TYPE,
|
||||
}
|
||||
|
||||
runServerUpdate(&node, true)
|
||||
runForceServerUpdate(&node)
|
||||
|
||||
go func(node *models.Node) {
|
||||
if node.UDPHolePunch == "yes" {
|
||||
|
@ -216,7 +216,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
runUpdates(&node, false)
|
||||
runForceServerUpdate(&node)
|
||||
|
||||
return &nodepb.Object{
|
||||
Data: "success",
|
||||
|
@ -308,3 +308,18 @@ func getNodeFromRequestData(data string) (models.Node, error) {
|
|||
func isServer(node *models.Node) bool {
|
||||
return node.IsServer == "yes"
|
||||
}
|
||||
|
||||
func runForceServerUpdate(node *models.Node) {
|
||||
go func() {
|
||||
if err := mq.PublishPeerUpdate(node); err != nil {
|
||||
logger.Log(1, "failed a peer update after creation of node", node.Name)
|
||||
}
|
||||
|
||||
var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network)
|
||||
if getErr == nil {
|
||||
if err := logic.ServerUpdate(¤tServerNode, false); err != nil {
|
||||
logger.Log(1, "server node:", currentServerNode.ID, "failed update")
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -58,6 +58,7 @@ func IfaceDelta(currentNode *models.Node, newNode *models.Node) bool {
|
|||
newNode.IsRelay != currentNode.IsRelay ||
|
||||
newNode.UDPHolePunch != currentNode.UDPHolePunch ||
|
||||
newNode.IsPending != currentNode.IsPending ||
|
||||
newNode.ListenPort != currentNode.ListenPort ||
|
||||
newNode.MTU != currentNode.MTU ||
|
||||
newNode.PersistentKeepalive != currentNode.PersistentKeepalive ||
|
||||
newNode.DNSOn != currentNode.DNSOn ||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/gravitl/netmaker/logger"
|
||||
"github.com/gravitl/netmaker/logic"
|
||||
"github.com/gravitl/netmaker/models"
|
||||
"github.com/gravitl/netmaker/netclient/ncutils"
|
||||
)
|
||||
|
||||
// DefaultHandler default message queue handler - only called when GetDebug == true
|
||||
|
@ -96,10 +97,30 @@ func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
|
|||
logger.Log(1, "error getting node ", id, err.Error())
|
||||
return
|
||||
}
|
||||
if err := PublishPeerUpdate(¤tNode); err != nil {
|
||||
logger.Log(1, "error publishing peer update ", err.Error())
|
||||
decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
switch decrypted[0] {
|
||||
case ncutils.ACK:
|
||||
currentServerNode, err := logic.GetNetworkServerLocal(currentNode.Network)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err := logic.ServerUpdate(¤tServerNode, false); err != nil {
|
||||
logger.Log(1, "server node:", currentServerNode.ID, "failed update")
|
||||
return
|
||||
}
|
||||
case ncutils.DONE:
|
||||
if err := PublishPeerUpdate(¤tNode); err != nil {
|
||||
logger.Log(1, "error publishing peer update ", err.Error())
|
||||
return
|
||||
}
|
||||
case ncutils.KEY:
|
||||
logger.Log(0, "I should have broke")
|
||||
}
|
||||
|
||||
logger.Log(1, "sent peer updates after signal received from", id, currentNode.Name)
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/go-ping/ping"
|
||||
"github.com/gravitl/netmaker/models"
|
||||
|
@ -199,6 +200,12 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|||
return
|
||||
}
|
||||
if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
|
||||
ackErr := publishSignal(&cfg, ncutils.ACK)
|
||||
if ackErr != nil {
|
||||
ncutils.Log("could not notify server that it received an interface update")
|
||||
} else {
|
||||
ncutils.Log("signalled acknowledgement of change to server")
|
||||
}
|
||||
ncutils.Log("applying WG conf to " + file)
|
||||
err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
|
||||
if err != nil {
|
||||
|
@ -215,11 +222,11 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|||
}
|
||||
}
|
||||
}
|
||||
pubErr := publishClientPeers(&cfg)
|
||||
if pubErr != nil {
|
||||
doneErr := publishSignal(&cfg, ncutils.DONE)
|
||||
if doneErr != nil {
|
||||
ncutils.Log("could not notify server to update peers after interface change")
|
||||
} else {
|
||||
ncutils.Log("signalled peer update to server")
|
||||
ncutils.Log("signalled finshed interface update to server")
|
||||
}
|
||||
}
|
||||
//deal with DNS
|
||||
|
@ -257,6 +264,7 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
|
|||
insert(peerUpdate.Network, lastPeerUpdate, string(data))
|
||||
|
||||
file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
|
||||
spew.Dump(peerUpdate.Peers)
|
||||
err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
|
||||
if err != nil {
|
||||
ncutils.Log("error updating wireguard peers" + err.Error())
|
||||
|
@ -480,9 +488,8 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
|
|||
}
|
||||
|
||||
// publishes a message to server to update peers on this peer's behalf
|
||||
func publishClientPeers(cfg *config.ClientConfig) error {
|
||||
payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different
|
||||
if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), payload, 1); err != nil {
|
||||
func publishSignal(cfg *config.ClientConfig, signal byte) error {
|
||||
if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), []byte{signal}, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
10
netclient/ncutils/constants.go
Normal file
10
netclient/ncutils/constants.go
Normal file
|
@ -0,0 +1,10 @@
|
|||
package ncutils
|
||||
|
||||
const (
|
||||
// ACK - acknowledgement signal for MQ
|
||||
ACK = 1
|
||||
// DONE - done signal for MQ
|
||||
DONE = 2
|
||||
// KEY - key update completed signal for MQ
|
||||
KEY = 3
|
||||
)
|
|
@ -16,6 +16,7 @@ func IfaceDelta(currentNode *models.Node, newNode *models.Node) bool {
|
|||
newNode.IsEgressGateway != currentNode.IsEgressGateway ||
|
||||
newNode.IsIngressGateway != currentNode.IsIngressGateway ||
|
||||
newNode.IsRelay != currentNode.IsRelay ||
|
||||
newNode.ListenPort != currentNode.ListenPort ||
|
||||
newNode.UDPHolePunch != currentNode.UDPHolePunch ||
|
||||
newNode.MTU != currentNode.MTU ||
|
||||
newNode.IsPending != currentNode.IsPending ||
|
||||
|
|
|
@ -131,10 +131,6 @@ func InitWireguard(node *models.Node, privkey string, peers []wgtypes.PeerConfig
|
|||
return err
|
||||
}
|
||||
nodecfg := modcfg.Node
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("failed to open client: %v", err)
|
||||
}
|
||||
var ifacename string
|
||||
if nodecfg.Interface != "" {
|
||||
ifacename = nodecfg.Interface
|
||||
|
|
Loading…
Reference in a new issue