From 935567761b0bbf44488f0abbe321f9a3e83648e8 Mon Sep 17 00:00:00 2001 From: "Matthew R. Kasun" Date: Tue, 12 Apr 2022 10:43:02 -0400 Subject: [PATCH] mq direct to server public ip --- compose/docker-compose.contained.yml | 1 + config/config.go | 2 +- controllers/node_grpc.go | 6 -- logic/accesskeys.go | 7 +- models/accessToken.go | 2 +- models/node.go | 2 +- netclient/command/commands.go | 14 ++-- netclient/config/config.go | 3 +- netclient/functions/daemon.go | 72 +++++++----------- netclient/functions/join.go | 3 +- netclient/functions/mqhandlers.go | 5 +- netclient/functions/mqpublish.go | 108 +++++++++++++-------------- servercfg/serverconf.go | 13 +++- 13 files changed, 104 insertions(+), 134 deletions(-) diff --git a/compose/docker-compose.contained.yml b/compose/docker-compose.contained.yml index c0c8be75..efa0da00 100644 --- a/compose/docker-compose.contained.yml +++ b/compose/docker-compose.contained.yml @@ -16,6 +16,7 @@ services: - net.ipv4.conf.all.src_valid_mark=1 restart: always environment: + SERVER_NAME: "broker.NETMAKER_BASE_DOMAIN" SERVER_HOST: "SERVER_PUBLIC_IP" SERVER_API_CONN_STRING: "api.NETMAKER_BASE_DOMAIN:443" SERVER_GRPC_CONN_STRING: "grpc.NETMAKER_BASE_DOMAIN:443" diff --git a/config/config.go b/config/config.go index afe9cdd3..4d635df7 100644 --- a/config/config.go +++ b/config/config.go @@ -73,7 +73,7 @@ type ServerConfig struct { HostNetwork string `yaml:"hostnetwork"` CommsCIDR string `yaml:"commscidr"` MQPort string `yaml:"mqport"` - CommsID string `yaml:"commsid"` + Server string `yaml:"server"` } // SQLConfig - Generic SQL Config diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index bec6d93b..9e85fdac 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -92,12 +92,6 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) Server: key, } - commID, err := logic.FetchCommsNetID() - if err != nil { - return nil, err - } - node.CommID = commID - err = logic.CreateNode(&node) if err != nil { return nil, err diff --git a/logic/accesskeys.go b/logic/accesskeys.go index 15ffd5b3..1b1cbcef 100644 --- a/logic/accesskeys.go +++ b/logic/accesskeys.go @@ -51,17 +51,12 @@ func CreateAccessKey(accesskey models.AccessKey, network models.Network) (models netID := network.NetID - commsNetID, err := FetchCommsNetID() - if err != nil { - return models.AccessKey{}, errors.New("could not retrieve comms netid") - } - var accessToken models.AccessToken s := servercfg.GetServerConfig() servervals := models.ServerConfig{ GRPCConnString: s.GRPCConnString, GRPCSSL: s.GRPCSSL, - CommsNetwork: commsNetID, + Server: s.Server, } accessToken.ServerConfig = servervals accessToken.ClientConfig.Network = netID diff --git a/models/accessToken.go b/models/accessToken.go index d254411a..e66d932a 100644 --- a/models/accessToken.go +++ b/models/accessToken.go @@ -14,5 +14,5 @@ type ClientConfig struct { type ServerConfig struct { GRPCConnString string `json:"grpcconn"` GRPCSSL string `json:"grpcssl"` - CommsNetwork string `json:"commsnetwork"` + Server string `json:"server"` } diff --git a/models/node.go b/models/node.go index cc1dffe5..5e77b127 100644 --- a/models/node.go +++ b/models/node.go @@ -79,7 +79,7 @@ type Node struct { OS string `json:"os" bson:"os" yaml:"os"` MTU int32 `json:"mtu" bson:"mtu" yaml:"mtu"` Version string `json:"version" bson:"version" yaml:"version"` - CommID string `json:"commid" bson:"commid" yaml:"comid"` + Server string `json:"server" bson:"server" yaml:"server"` TrafficKeys TrafficKeys `json:"traffickeys" bson:"traffickeys" yaml:"traffickeys"` } diff --git a/netclient/command/commands.go b/netclient/command/commands.go index 87c1af5c..40e844bd 100644 --- a/netclient/command/commands.go +++ b/netclient/command/commands.go @@ -98,13 +98,13 @@ func Leave(cfg *config.ClientConfig, force bool) error { } else { logger.Log(0, "success") } - nets, err := ncutils.GetSystemNetworks() - if err == nil && len(nets) == 1 { - if nets[0] == cfg.Node.CommID { - logger.Log(1, "detected comms as remaining network, removing...") - err = functions.LeaveNetwork(nets[0], true) - } - } + //nets, err := ncutils.GetSystemNetworks() + //if err == nil && len(nets) == 1 { + //if nets[0] == cfg.Node.CommID { + //logger.Log(1, "detected comms as remaining network, removing...") + //err = functions.LeaveNetwork(nets[0], true) + //} + //} return err } diff --git a/netclient/config/config.go b/netclient/config/config.go index 350a8701..46c4652b 100644 --- a/netclient/config/config.go +++ b/netclient/config/config.go @@ -34,6 +34,7 @@ type ServerConfig struct { AccessKey string `yaml:"accesskey"` GRPCSSL string `yaml:"grpcssl"` CommsNetwork string `yaml:"commsnetwork"` + Server string `yaml:"server"` } // Write - writes the config of a client to disk @@ -188,7 +189,7 @@ func GetCLIConfig(c *cli.Context) (ClientConfig, string, error) { cfg.Server.AccessKey = accesstoken.ClientConfig.Key cfg.Node.LocalRange = accesstoken.ClientConfig.LocalRange cfg.Server.GRPCSSL = accesstoken.ServerConfig.GRPCSSL - cfg.Server.CommsNetwork = accesstoken.ServerConfig.CommsNetwork + cfg.Server.Server = accesstoken.ServerConfig.Server if c.String("grpcserver") != "" { cfg.Server.GRPCAddress = c.String("grpcserver") } diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 5e1f2cc1..9cc69efd 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -36,9 +36,13 @@ type cachedMessage struct { // Daemon runs netclient daemon from command line func Daemon() error { + var exists = struct{}{} + serverSet := make(map[string]struct{}) // == initial pull of all networks == networks, _ := ncutils.GetSystemNetworks() for _, network := range networks { + serverSet[network] = exists + //temporary code --- remove in version v0.13.0 removeHostDNS(network, ncutils.IsWindows()) // end of code to be removed in version v0.13.0 @@ -48,30 +52,24 @@ func Daemon() error { initialPull(cfg.Network) } - // == get all the comms networks on machine == - commsNetworks, err := getCommsNetworks(networks[:]) - if err != nil { - return errors.New("no comm networks exist") - } - // == subscribe to all nodes on each comms network on machine == - for currCommsNet := range commsNetworks { - logger.Log(1, "started comms network daemon, ", currCommsNet) + for server := range serverSet { + logger.Log(1, "started daemon for server , ", server) ctx, cancel := context.WithCancel(context.Background()) - networkcontext.Store(currCommsNet, cancel) - go messageQueue(ctx, currCommsNet) + networkcontext.Store(server, cancel) + go messageQueue(ctx, server) } // == add waitgroup and cancel for checkin routine == wg := sync.WaitGroup{} ctx, cancel := context.WithCancel(context.Background()) wg.Add(1) - go Checkin(ctx, &wg, commsNetworks) + go Checkin(ctx, &wg, serverSet) quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGTERM, os.Interrupt, os.Kill) <-quit - for currCommsNet := range commsNetworks { - if cancel, ok := networkcontext.Load(currCommsNet); ok { + for server := range serverSet { + if cancel, ok := networkcontext.Load(server); ok { cancel.(context.CancelFunc)() } } @@ -101,16 +99,14 @@ func UpdateKeys(nodeCfg *config.ClientConfig, client mqtt.Client) error { } nodeCfg.Node.PublicKey = key.PublicKey().String() - var commsCfg = getCommsCfgByNode(&nodeCfg.Node) - PublishNodeUpdate(&commsCfg, nodeCfg) + PublishNodeUpdate(nodeCfg) return nil } // PingServer -- checks if server is reachable // use commsCfg only* -func PingServer(commsCfg *config.ClientConfig) error { - node := getServerAddress(commsCfg) - pinger, err := ping.NewPinger(node) +func PingServer(cfg *config.ClientConfig) error { + pinger, err := ping.NewPinger(cfg.Node.Server) if err != nil { return err } @@ -185,9 +181,9 @@ func messageQueue(ctx context.Context, commsNet string) { // setupMQTT creates a connection to broker and return client // utilizes comms client configs to setup connections -func setupMQTT(commsCfg *config.ClientConfig, publish bool) mqtt.Client { +func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client { opts := mqtt.NewClientOptions() - server := getServerAddress(commsCfg) + server := cfg.Node.Server opts.AddBroker(server + ":1883") // TODO get the appropriate port of the comms mq server opts.ClientID = ncutils.MakeRandomString(23) // helps avoid id duplication on broker opts.SetDefaultPublishHandler(All) @@ -213,8 +209,8 @@ func setupMQTT(commsCfg *config.ClientConfig, publish bool) mqtt.Client { opts.SetOrderMatters(true) opts.SetResumeSubs(true) opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { - logger.Log(0, "detected broker connection lost, running pull for ", commsCfg.Node.Network) - _, err := Pull(commsCfg.Node.Network, true) + logger.Log(0, "detected broker connection lost, running pull for ", cfg.Node.Network) + _, err := Pull(cfg.Node.Network, true) if err != nil { logger.Log(0, "could not run pull, server unreachable: ", err.Error()) logger.Log(0, "waiting to retry...") @@ -227,10 +223,10 @@ func setupMQTT(commsCfg *config.ClientConfig, publish bool) mqtt.Client { for { //if after 12 seconds, try a gRPC pull on the last try if time.Now().After(tperiod) { - logger.Log(0, "running pull for ", commsCfg.Node.Network) - _, err := Pull(commsCfg.Node.Network, true) + logger.Log(0, "running pull for ", cfg.Node.Network) + _, err := Pull(cfg.Node.Network, true) if err != nil { - logger.Log(0, "could not run pull, exiting ", commsCfg.Node.Network, " setup: ", err.Error()) + logger.Log(0, "could not run pull, exiting ", cfg.Node.Network, " setup: ", err.Error()) return client } time.Sleep(time.Second) @@ -238,10 +234,10 @@ func setupMQTT(commsCfg *config.ClientConfig, publish bool) mqtt.Client { if token := client.Connect(); token.Wait() && token.Error() != nil { logger.Log(0, "unable to connect to broker, retrying ...") if time.Now().After(tperiod) { - logger.Log(0, "could not connect to broker, exiting ", commsCfg.Node.Network, " setup: ", token.Error().Error()) + logger.Log(0, "could not connect to broker, exiting ", cfg.Node.Network, " setup: ", token.Error().Error()) if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") { logger.Log(0, "connection issue detected.. pulling and restarting daemon") - Pull(commsCfg.Node.Network, true) + Pull(cfg.Node.Network, true) daemon.Restart() } return client @@ -255,8 +251,8 @@ func setupMQTT(commsCfg *config.ClientConfig, publish bool) mqtt.Client { } // publishes a message to server to update peers on this peer's behalf -func publishSignal(commsCfg, nodeCfg *config.ClientConfig, signal byte) error { - if err := publish(commsCfg, nodeCfg, fmt.Sprintf("signal/%s", nodeCfg.Node.ID), []byte{signal}, 1); err != nil { +func publishSignal(nodeCfg *config.ClientConfig, signal byte) error { + if err := publish(nodeCfg, fmt.Sprintf("signal/%s", nodeCfg.Node.ID), []byte{signal}, 1); err != nil { return err } return nil @@ -324,24 +320,6 @@ func getServerAddress(cfg *config.ClientConfig) string { return server.Address } -func getCommsNetworks(networks []string) (map[string]bool, error) { - var cfg config.ClientConfig - var response = make(map[string]bool, 1) - for _, network := range networks { - cfg.Network = network - cfg.ReadConfig() - response[cfg.Node.CommID] = true - } - return response, nil -} - -func getCommsCfgByNode(node *models.Node) config.ClientConfig { - var commsCfg config.ClientConfig - commsCfg.Network = node.CommID - commsCfg.ReadConfig() - return commsCfg -} - // == Message Caches == func insert(network, which, cache string) { diff --git a/netclient/functions/join.go b/netclient/functions/join.go index 9c453c72..a0473244 100644 --- a/netclient/functions/join.go +++ b/netclient/functions/join.go @@ -297,8 +297,7 @@ func setListenPort(oldListenPort int32, cfg *config.ClientConfig) { // if newListenPort has been modified to find an available port, publish to server if cfg.Node.ListenPort != newListenPort { - var currentCommsCfg = getCommsCfgByNode(&cfg.Node) - PublishNodeUpdate(¤tCommsCfg, cfg) + PublishNodeUpdate(cfg) } } } diff --git a/netclient/functions/mqhandlers.go b/netclient/functions/mqhandlers.go index 4c3b7808..40298dc3 100644 --- a/netclient/functions/mqhandlers.go +++ b/netclient/functions/mqhandlers.go @@ -33,7 +33,6 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { var network = parseNetworkFromTopic(msg.Topic()) nodeCfg.Network = network nodeCfg.ReadConfig() - var commsCfg = getCommsCfgByNode(&nodeCfg.Node) data, dataErr := decryptMsg(&nodeCfg, msg.Payload()) if dataErr != nil { @@ -131,14 +130,14 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { // } // } // } - doneErr := publishSignal(&commsCfg, &nodeCfg, ncutils.DONE) + doneErr := publishSignal(&nodeCfg, ncutils.DONE) if doneErr != nil { logger.Log(0, "could not notify server to update peers after interface change") } else { logger.Log(0, "signalled finished interface update to server") } } else if hubChange { - doneErr := publishSignal(&commsCfg, &nodeCfg, ncutils.DONE) + doneErr := publishSignal(&nodeCfg, ncutils.DONE) if doneErr != nil { logger.Log(0, "could not notify server to update peers after hub change") } else { diff --git a/netclient/functions/mqpublish.go b/netclient/functions/mqpublish.go index fc5ce3eb..1ae20b55 100644 --- a/netclient/functions/mqpublish.go +++ b/netclient/functions/mqpublish.go @@ -15,7 +15,7 @@ import ( // Checkin -- go routine that checks for public or local ip changes, publishes changes // if there are no updates, simply "pings" the server as a checkin -func Checkin(ctx context.Context, wg *sync.WaitGroup, currentComms map[string]bool) { +func Checkin(ctx context.Context, wg *sync.WaitGroup, currentComms map[string]struct{}) { defer wg.Done() for { select { @@ -30,58 +30,50 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, currentComms map[string]bo if err != nil { return } - for commsNet := range currentComms { - var currCommsCfg config.ClientConfig - currCommsCfg.Network = commsNet - currCommsCfg.ReadConfig() - for _, network := range networks { - var nodeCfg config.ClientConfig - nodeCfg.Network = network - nodeCfg.ReadConfig() - if nodeCfg.Node.CommID != commsNet { - continue // skip if not on current comms network + for _, network := range networks { + var nodeCfg config.ClientConfig + nodeCfg.Network = network + nodeCfg.ReadConfig() + if nodeCfg.Node.IsStatic != "yes" { + extIP, err := ncutils.GetPublicIP() + if err != nil { + logger.Log(1, "error encountered checking public ip addresses: ", err.Error()) } - if nodeCfg.Node.IsStatic != "yes" { - extIP, err := ncutils.GetPublicIP() - if err != nil { - logger.Log(1, "error encountered checking public ip addresses: ", err.Error()) - } - if nodeCfg.Node.Endpoint != extIP && extIP != "" { - logger.Log(1, "endpoint has changed from ", nodeCfg.Node.Endpoint, " to ", extIP) - nodeCfg.Node.Endpoint = extIP - if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil { - logger.Log(0, "could not publish endpoint change") - } - } - intIP, err := getPrivateAddr() - if err != nil { - logger.Log(1, "error encountered checking private ip addresses: ", err.Error()) - } - if nodeCfg.Node.LocalAddress != intIP && intIP != "" { - logger.Log(1, "local Address has changed from ", nodeCfg.Node.LocalAddress, " to ", intIP) - nodeCfg.Node.LocalAddress = intIP - if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil { - logger.Log(0, "could not publish local address change") - } - } - } else if nodeCfg.Node.IsLocal == "yes" && nodeCfg.Node.LocalRange != "" { - localIP, err := ncutils.GetLocalIP(nodeCfg.Node.LocalRange) - if err != nil { - logger.Log(1, "error encountered checking local ip addresses: ", err.Error()) - } - if nodeCfg.Node.Endpoint != localIP && localIP != "" { - logger.Log(1, "endpoint has changed from "+nodeCfg.Node.Endpoint+" to ", localIP) - nodeCfg.Node.Endpoint = localIP - if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil { - logger.Log(0, "could not publish localip change") - } + if nodeCfg.Node.Endpoint != extIP && extIP != "" { + logger.Log(1, "endpoint has changed from ", nodeCfg.Node.Endpoint, " to ", extIP) + nodeCfg.Node.Endpoint = extIP + if err := PublishNodeUpdate(&nodeCfg); err != nil { + logger.Log(0, "could not publish endpoint change") } } - if err := PingServer(&currCommsCfg); err != nil { - logger.Log(0, "could not ping server on comms net, ", currCommsCfg.Network, "\n", err.Error()) - } else { - Hello(&currCommsCfg, &nodeCfg) + intIP, err := getPrivateAddr() + if err != nil { + logger.Log(1, "error encountered checking private ip addresses: ", err.Error()) } + if nodeCfg.Node.LocalAddress != intIP && intIP != "" { + logger.Log(1, "local Address has changed from ", nodeCfg.Node.LocalAddress, " to ", intIP) + nodeCfg.Node.LocalAddress = intIP + if err := PublishNodeUpdate(&nodeCfg); err != nil { + logger.Log(0, "could not publish local address change") + } + } + } else if nodeCfg.Node.IsLocal == "yes" && nodeCfg.Node.LocalRange != "" { + localIP, err := ncutils.GetLocalIP(nodeCfg.Node.LocalRange) + if err != nil { + logger.Log(1, "error encountered checking local ip addresses: ", err.Error()) + } + if nodeCfg.Node.Endpoint != localIP && localIP != "" { + logger.Log(1, "endpoint has changed from "+nodeCfg.Node.Endpoint+" to ", localIP) + nodeCfg.Node.Endpoint = localIP + if err := PublishNodeUpdate(&nodeCfg); err != nil { + logger.Log(0, "could not publish localip change") + } + } + } + if err := PingServer(&nodeCfg); err != nil { + logger.Log(0, "could not ping server for , ", nodeCfg.Network, "\n", err.Error()) + } else { + Hello(&nodeCfg) } } } @@ -89,7 +81,7 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, currentComms map[string]bo } // PublishNodeUpdates -- saves node and pushes changes to broker -func PublishNodeUpdate(commsCfg, nodeCfg *config.ClientConfig) error { +func PublishNodeUpdate(nodeCfg *config.ClientConfig) error { if err := config.Write(nodeCfg, nodeCfg.Network); err != nil { return err } @@ -97,7 +89,7 @@ func PublishNodeUpdate(commsCfg, nodeCfg *config.ClientConfig) error { if err != nil { return err } - if err = publish(commsCfg, nodeCfg, fmt.Sprintf("update/%s", nodeCfg.Node.ID), data, 1); err != nil { + if err = publish(nodeCfg, fmt.Sprintf("update/%s", nodeCfg.Node.ID), data, 1); err != nil { return err } logger.Log(0, "sent a node update to server for node", nodeCfg.Node.Name, ", ", nodeCfg.Node.ID) @@ -105,20 +97,20 @@ func PublishNodeUpdate(commsCfg, nodeCfg *config.ClientConfig) error { } // Hello -- ping the broker to let server know node it's alive and well -func Hello(commsCfg, nodeCfg *config.ClientConfig) { - if err := publish(commsCfg, nodeCfg, fmt.Sprintf("ping/%s", nodeCfg.Node.ID), []byte(ncutils.Version), 0); err != nil { +func Hello(nodeCfg *config.ClientConfig) { + if err := publish(nodeCfg, fmt.Sprintf("ping/%s", nodeCfg.Node.ID), []byte(ncutils.Version), 0); err != nil { logger.Log(0, fmt.Sprintf("error publishing ping, %v", err)) - logger.Log(0, "running pull on "+commsCfg.Node.Network+" to reconnect") - _, err := Pull(commsCfg.Node.Network, true) + logger.Log(0, "running pull on "+nodeCfg.Node.Network+" to reconnect") + _, err := Pull(nodeCfg.Node.Network, true) if err != nil { - logger.Log(0, "could not run pull on "+commsCfg.Node.Network+", error: "+err.Error()) + logger.Log(0, "could not run pull on "+nodeCfg.Node.Network+", error: "+err.Error()) } } } // requires the commscfg in which to send traffic over and nodecfg of node that is publish the message // node cfg is so that the traffic keys of that node may be fetched for encryption -func publish(commsCfg, nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) error { +func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) error { // setup the keys trafficPrivKey, err := auth.RetrieveTrafficKey(nodeCfg.Node.Network) if err != nil { @@ -130,7 +122,7 @@ func publish(commsCfg, nodeCfg *config.ClientConfig, dest string, msg []byte, qo return err } - client := setupMQTT(commsCfg, true) + client := setupMQTT(nodeCfg, true) defer client.Disconnect(250) encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey) if err != nil { diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index 0712237d..98429478 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -96,7 +96,7 @@ func GetServerConfig() config.ServerConfig { cfg.ManageIPTables = ManageIPTables() services := strings.Join(GetPortForwardServiceList(), ",") cfg.PortForwardServices = services - cfg.CommsID = GetCommsID() + cfg.Server = GetServer() return cfg } @@ -412,6 +412,17 @@ func ManageIPTables() string { return manage } +// GetServer - gets the server name +func GetServer() string { + server := "" + if os.Getenv("SERVER_NAME") != "" { + server = os.Getenv("SERVER_NAME") + } else if config.Config.Server.Server != "" { + server = config.Config.Server.Server + } + return server +} + // IsDNSMode - should it run with DNS func IsDNSMode() bool { isdns := true