diff --git a/Dockerfile b/Dockerfile index 4be29ea6..c7c6369c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ #first stage - builder -FROM gravitl/go-builder as builder +FROM gravitl/go-builder AS builder ARG tags WORKDIR /app COPY . . diff --git a/auth/host_session.go b/auth/host_session.go index 0113351a..d6869ed0 100644 --- a/auth/host_session.go +++ b/auth/host_session.go @@ -164,10 +164,6 @@ func SessionHandler(conn *websocket.Conn) { logger.Log(0, "failed to create host credentials for EMQX: ", err.Error()) return } - if err := mq.GetEmqxHandler().CreateHostACL(result.Host.ID.String(), servercfg.GetServerInfo().Server); err != nil { - logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error()) - return - } } logic.CheckHostPorts(&result.Host) if err := logic.CreateHost(&result.Host); err != nil { diff --git a/config/config.go b/config/config.go index 522372ef..591c8e3d 100644 --- a/config/config.go +++ b/config/config.go @@ -94,6 +94,7 @@ type ServerConfig struct { CacheEnabled string `yaml:"caching_enabled"` EndpointDetection bool `json:"endpoint_detection"` AllowedEmailDomains string `yaml:"allowed_email_domains"` + MetricInterval string `yaml:"metric_interval"` } // SQLConfig - Generic SQL Config diff --git a/controllers/enrollmentkeys.go b/controllers/enrollmentkeys.go index 2cfb1363..87a189a4 100644 --- a/controllers/enrollmentkeys.go +++ b/controllers/enrollmentkeys.go @@ -315,10 +315,6 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) { logger.Log(0, "failed to create host credentials for EMQX: ", err.Error()) return } - if err := mq.GetEmqxHandler().CreateHostACL(newHost.ID.String(), servercfg.GetServerInfo().Server); err != nil { - logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error()) - return - } } if err = logic.CreateHost(&newHost); err != nil { logger.Log( diff --git a/controllers/hosts.go b/controllers/hosts.go index f6a88aa4..faf56411 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -555,23 +555,10 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) { return } go func() { - // Create EMQX creds and ACLs if not found + // Create EMQX creds if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil { slog.Error("failed to create host credentials for EMQX: ", err.Error()) - } else { - if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil { - slog.Error("failed to add host ACL rules to EMQX: ", err.Error()) - } - for _, nodeID := range host.Nodes { - if node, err := logic.GetNodeByID(nodeID); err == nil { - if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil { - slog.Error("failed to add ACLs for EMQX node", "error", err) - } - } else { - slog.Error("failed to get node", "nodeid", nodeID, "error", err) - } - } } } }() diff --git a/controllers/server.go b/controllers/server.go index 6e96688c..6c8c121e 100644 --- a/controllers/server.go +++ b/controllers/server.go @@ -117,6 +117,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) { type status struct { DB bool `json:"db_connected"` Broker bool `json:"broker_connected"` + IsBrokerConnOpen bool `json:"is_broker_conn_open"` LicenseError string `json:"license_error"` IsPro bool `json:"is_pro"` TrialEndDate time.Time `json:"trial_end_date"` @@ -141,6 +142,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) { currentServerStatus := status{ DB: database.IsConnected(), Broker: mq.IsConnected(), + IsBrokerConnOpen: mq.IsConnectionOpen(), LicenseError: licenseErr, IsPro: servercfg.IsPro, TrialEndDate: trialEndDate, diff --git a/models/host.go b/models/host.go index 684cc7d1..2781dee0 100644 --- a/models/host.go +++ b/models/host.go @@ -116,6 +116,8 @@ const ( UpdateKeys HostMqAction = "UPDATE_KEYS" // RequestPull - request a pull from a host RequestPull HostMqAction = "REQ_PULL" + // SignalPull - request a pull from a host without restart + SignalPull HostMqAction = "SIGNAL_PULL" // UpdateMetrics - updates metrics data UpdateMetrics HostMqAction = "UPDATE_METRICS" ) diff --git a/models/structs.go b/models/structs.go index d4ea1171..59427533 100644 --- a/models/structs.go +++ b/models/structs.go @@ -273,19 +273,20 @@ type NodeJoinResponse struct { // ServerConfig - struct for dealing with the server information for a netclient type ServerConfig struct { - CoreDNSAddr string `yaml:"corednsaddr"` - API string `yaml:"api"` - APIPort string `yaml:"apiport"` - DNSMode string `yaml:"dnsmode"` - Version string `yaml:"version"` - MQPort string `yaml:"mqport"` - MQUserName string `yaml:"mq_username"` - MQPassword string `yaml:"mq_password"` - BrokerType string `yaml:"broker_type"` - Server string `yaml:"server"` - Broker string `yaml:"broker"` - IsPro bool `yaml:"isee" json:"Is_EE"` - TrafficKey []byte `yaml:"traffickey"` + CoreDNSAddr string `yaml:"corednsaddr"` + API string `yaml:"api"` + APIPort string `yaml:"apiport"` + DNSMode string `yaml:"dnsmode"` + Version string `yaml:"version"` + MQPort string `yaml:"mqport"` + MQUserName string `yaml:"mq_username"` + MQPassword string `yaml:"mq_password"` + BrokerType string `yaml:"broker_type"` + Server string `yaml:"server"` + Broker string `yaml:"broker"` + IsPro bool `yaml:"isee" json:"Is_EE"` + TrafficKey []byte `yaml:"traffickey"` + MetricInterval string `yaml:"metric_interval"` } // User.NameInCharset - returns if name is in charset below or not diff --git a/mq/emqx.go b/mq/emqx.go index 8b9b9f09..43b8390b 100644 --- a/mq/emqx.go +++ b/mq/emqx.go @@ -10,10 +10,7 @@ type Emqx interface { CreateEmqxUserforServer() error CreateEmqxDefaultAuthenticator() error CreateEmqxDefaultAuthorizer() error - CreateDefaultDenyRule() error - CreateHostACL(hostID, serverName string) error - AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error - GetUserACL(username string) (*aclObject, error) + CreateDefaultAllowRule() error DeleteEmqxUser(username string) error } diff --git a/mq/emqx_cloud.go b/mq/emqx_cloud.go index 23bfb244..aabac0db 100644 --- a/mq/emqx_cloud.go +++ b/mq/emqx_cloud.go @@ -89,21 +89,10 @@ func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ign func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore -func (e *EmqxCloud) CreateDefaultDenyRule() error { +func (e *EmqxCloud) CreateDefaultAllowRule() error { return nil } -func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error { - return nil -} - -func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error { - return nil - -} - -func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list - func (e *EmqxCloud) DeleteEmqxUser(username string) error { client := &http.Client{} diff --git a/mq/emqx_on_prem.go b/mq/emqx_on_prem.go index f116f0a8..d69067f3 100644 --- a/mq/emqx_on_prem.go +++ b/mq/emqx_on_prem.go @@ -7,7 +7,6 @@ import ( "io" "net/http" "strings" - "sync" "github.com/gravitl/netmaker/servercfg" ) @@ -246,45 +245,14 @@ func (e *EmqxOnPrem) CreateEmqxDefaultAuthorizer() error { return nil } -// GetUserACL - returns ACL rules by username -func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) { - token, err := getEmqxAuthToken() - if err != nil { - return nil, err - } - req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil) - if err != nil { - return nil, err - } - req.Header.Add("content-type", "application/json") - req.Header.Add("authorization", "Bearer "+token) - resp, err := (&http.Client{}).Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - response, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("error fetching ACL rules %v", string(response)) - } - body := new(aclObject) - if err := json.Unmarshal(response, body); err != nil { - return nil, err - } - return body, nil -} - -// CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default +// CreateDefaultAllowRule - creates a rule to deny access to all topics for all users by default // to allow user access to topics use the `mq.CreateUserAccessRule` function -func (e *EmqxOnPrem) CreateDefaultDenyRule() error { +func (e *EmqxOnPrem) CreateDefaultAllowRule() error { token, err := getEmqxAuthToken() if err != nil { return err } - payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}}) + payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "allow", Action: "all"}}}) if err != nil { return err } @@ -308,121 +276,3 @@ func (e *EmqxOnPrem) CreateDefaultDenyRule() error { } return nil } - -// CreateHostACL - create host ACL rules -func (e *EmqxOnPrem) CreateHostACL(hostID, serverName string) error { - token, err := getEmqxAuthToken() - if err != nil { - return err - } - payload, err := json.Marshal(&aclObject{ - Username: hostID, - Rules: []aclRule{ - { - Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName), - Permission: "allow", - Action: "all", - }, - { - Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName), - Permission: "allow", - Action: "all", - }, - { - Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID), - Permission: "allow", - Action: "all", - }, - }, - }) - if err != nil { - return err - } - req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload)) - if err != nil { - return err - } - req.Header.Add("content-type", "application/json") - req.Header.Add("authorization", "Bearer "+token) - resp, err := (&http.Client{}).Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusNoContent { - msg, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg)) - } - return nil -} - -// a lock required for preventing simultaneous updates to the same ACL object leading to overwriting each other -// might occur when multiple nodes belonging to the same host are created at the same time -var nodeAclMux sync.Mutex - -// AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID -func (e *EmqxOnPrem) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error { - nodeAclMux.Lock() - defer nodeAclMux.Unlock() - token, err := getEmqxAuthToken() - if err != nil { - return err - } - aclObject, err := emqx.GetUserACL(hostID) - if err != nil { - return err - } - aclObject.Rules = append(aclObject.Rules, []aclRule{ - { - Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID), - Permission: "allow", - Action: "subscribe", - }, - { - Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID), - Permission: "allow", - Action: "all", - }, - { - Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID), - Permission: "allow", - Action: "all", - }, - { - Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID), - Permission: "allow", - Action: "all", - }, - { - Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID), - Permission: "allow", - Action: "all", - }, - }...) - payload, err := json.Marshal(aclObject) - if err != nil { - return err - } - req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload)) - if err != nil { - return err - } - req.Header.Add("content-type", "application/json") - req.Header.Add("authorization", "Bearer "+token) - resp, err := (&http.Client{}).Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusNoContent { - msg, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg)) - } - return nil -} diff --git a/mq/handlers.go b/mq/handlers.go index 1c087393..8adb0744 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -113,12 +113,6 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { slog.Error("failed to send new node to host", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err) return } else { - if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { - if err = emqx.AppendNodeUpdateACL(hu.Host.ID.String(), hu.Node.Network, hu.Node.ID.String(), servercfg.GetServer()); err != nil { - slog.Error("failed to add ACLs for EMQX node", "error", err) - return - } - } nodes, err := logic.GetAllNodes() if err != nil { return diff --git a/mq/mq.go b/mq/mq.go index 92a6200b..a143b1ab 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -58,7 +58,7 @@ func SetupMQTT(fatal bool) { logger.Log(0, err.Error()) } // create a default deny ACL to all topics for all users - if err := emqx.CreateDefaultDenyRule(); err != nil { + if err := emqx.CreateDefaultAllowRule(); err != nil { log.Fatal(err) } } else { @@ -142,6 +142,11 @@ func Keepalive(ctx context.Context) { // IsConnected - function for determining if the mqclient is connected or not func IsConnected() bool { + return mqclient != nil && mqclient.IsConnected() +} + +// IsConnectionOpen - function for determining if the mqclient is connected or not +func IsConnectionOpen() bool { return mqclient != nil && mqclient.IsConnectionOpen() } diff --git a/mq/publishers.go b/mq/publishers.go index 6c231b01..b3f3efda 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -35,7 +35,6 @@ func PublishPeerUpdate(replacePeers bool) error { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } }(host) - } return err } @@ -217,30 +216,14 @@ func sendPeers() { if err != nil && len(hosts) > 0 { logger.Log(1, "error retrieving networks for keepalive", err.Error()) } - nodes, err := logic.GetAllNodes() - if err != nil { - return - } - var force bool + peer_force_send++ if peer_force_send == 5 { servercfg.SetHost() - force = true peer_force_send = 0 err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed.. if err != nil { logger.Log(3, "error occurred on timer,", err.Error()) } - - //collectServerMetrics(networks[:]) - } - if force { - for _, host := range hosts { - host := host - logger.Log(2, "sending scheduled peer update (5 min)") - if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil, false); err != nil { - logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) - } - } } } diff --git a/pro/controllers/failover.go b/pro/controllers/failover.go index 04262165..42c02734 100644 --- a/pro/controllers/failover.go +++ b/pro/controllers/failover.go @@ -19,12 +19,44 @@ import ( // FailOverHandlers - handlers for FailOver func FailOverHandlers(r *mux.Router) { + r.HandleFunc("/api/v1/node/{nodeid}/failover", http.HandlerFunc(getfailOver)).Methods(http.MethodGet) r.HandleFunc("/api/v1/node/{nodeid}/failover", logic.SecurityCheck(true, http.HandlerFunc(createfailOver))).Methods(http.MethodPost) r.HandleFunc("/api/v1/node/{nodeid}/failover", logic.SecurityCheck(true, http.HandlerFunc(deletefailOver))).Methods(http.MethodDelete) r.HandleFunc("/api/v1/node/{network}/failover/reset", logic.SecurityCheck(true, http.HandlerFunc(resetFailOver))).Methods(http.MethodPost) r.HandleFunc("/api/v1/node/{nodeid}/failover_me", controller.Authorize(true, false, "host", http.HandlerFunc(failOverME))).Methods(http.MethodPost) } +// swagger:route GET /api/v1/node/failover node getfailOver +// +// get failover node. +// +// Schemes: https +// +// Security: +// oauth +// +// Responses: +// 200: nodeResponse +func getfailOver(w http.ResponseWriter, r *http.Request) { + var params = mux.Vars(r) + nodeid := params["nodeid"] + // confirm host exists + node, err := logic.GetNodeByID(nodeid) + if err != nil { + slog.Error("failed to get node:", "error", err.Error()) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) + return + } + + failOverNode, exists := proLogic.FailOverExists(node.Network) + if !exists { + logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover node not found"), "notfound")) + return + } + w.Header().Set("Content-Type", "application/json") + logic.ReturnSuccessResponseWithJson(w, r, failOverNode, "get failover node successfully") +} + // swagger:route POST /api/v1/node/failover node createfailOver // // Create a relay. diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index 56e00e34..1ab54b7f 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -91,7 +91,7 @@ func GetServerConfig() config.ServerConfig { } cfg.JwtValidityDuration = GetJwtValidityDuration() cfg.RacAutoDisable = GetRacAutoDisable() - + cfg.MetricInterval = GetMetricInterval() return cfg } @@ -135,6 +135,7 @@ func GetServerInfo() models.ServerConfig { } cfg.Version = GetVersion() cfg.IsPro = IsPro + cfg.MetricInterval = GetMetricInterval() return cfg } @@ -586,6 +587,16 @@ func GetMqUserName() string { return password } +// GetMetricInterval - get the publish metric interval +func GetMetricInterval() string { + //default 15 minutes + mi := "15" + if os.Getenv("PUBLISH_METRIC_INTERVAL") != "" { + mi = os.Getenv("PUBLISH_METRIC_INTERVAL") + } + return mi +} + // GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX func GetEmqxRestEndpoint() string { return os.Getenv("EMQX_REST_ENDPOINT")