diff --git a/compose/docker-compose-emqx.yml b/compose/docker-compose-emqx.yml new file mode 100644 index 00000000..b599b009 --- /dev/null +++ b/compose/docker-compose-emqx.yml @@ -0,0 +1,83 @@ +version: "3.4" + +services: + netmaker: + container_name: netmaker + image: gravitl/netmaker:v0.18.2 + restart: always + volumes: + - dnsconfig:/root/config/dnsconfig + - sqldata:/root/data + environment: + BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN/mqtt" + BROKER_TYPE: "emqx" + EMQX_REST_ENDPOINT: "http://mq:18083" + SERVER_NAME: "NETMAKER_BASE_DOMAIN" + STUN_DOMAIN: "stun.NETMAKER_BASE_DOMAIN" + SERVER_HOST: "SERVER_PUBLIC_IP" + SERVER_API_CONN_STRING: "api.NETMAKER_BASE_DOMAIN:443" + COREDNS_ADDR: "SERVER_PUBLIC_IP" + DNS_MODE: "on" + SERVER_HTTP_HOST: "api.NETMAKER_BASE_DOMAIN" + API_PORT: "8081" + MASTER_KEY: "REPLACE_MASTER_KEY" + CORS_ALLOWED_ORIGIN: "*" + DISPLAY_KEYS: "on" + DATABASE: "sqlite" + NODE_ID: "netmaker-server-1" + SERVER_BROKER_ENDPOINT: "ws://mq:8083/mqtt" + STUN_PORT: "3478" + VERBOSITY: "1" + MQ_PASSWORD: "REPLACE_MQ_PASSWORD" + MQ_USERNAME: "REPLACE_MQ_USERNAME" + ports: + - "3478:3478/udp" + netmaker-ui: + container_name: netmaker-ui + image: gravitl/netmaker-ui:v0.18.2 + depends_on: + - netmaker + links: + - "netmaker:api" + restart: always + environment: + BACKEND_URL: "https://api.NETMAKER_BASE_DOMAIN" + caddy: + image: caddy:2.6.2 + container_name: caddy + restart: unless-stopped + volumes: + - /root/Caddyfile:/etc/caddy/Caddyfile + - caddy_data:/data + - caddy_conf:/config + ports: + - "80:80" + - "443:443" + coredns: + container_name: coredns + image: coredns/coredns + command: -conf /root/dnsconfig/Corefile + depends_on: + - netmaker + restart: always + volumes: + - dnsconfig:/root/dnsconfig + mq: + container_name: mq + image: emqx/emqx:5.0.17 + restart: unless-stopped + environment: + EMQX_NAME: "emqx" + EMQX_DASHBOARD__DEFAULT_PASSWORD: "REPLACE_MQ_PASSWORD" + EMQX_DASHBOARD__DEFAULT_USERNAME: "REPLACE_MQ_USERNAME" + ports: + - "1883:1883" # MQTT + - "8883:8883" # SSL MQTT + - "8083:8083" # Websockets + - "18083:18083" # Dashboard/REST_API +volumes: + caddy_data: {} + caddy_conf: {} + sqldata: {} + dnsconfig: {} + mosquitto_logs: {} diff --git a/compose/docker-compose.ee.yml b/compose/docker-compose.ee.yml index fee3a412..d5567772 100644 --- a/compose/docker-compose.ee.yml +++ b/compose/docker-compose.ee.yml @@ -9,7 +9,9 @@ services: - dnsconfig:/root/config/dnsconfig - sqldata:/root/data environment: - BROKER_NAME: "broker.NETMAKER_BASE_DOMAIN" + BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN/mqtt" + BROKER_TYPE: "emqx" + EMQX_REST_ENDPOINT: "http://mq:18083" SERVER_NAME: "NETMAKER_BASE_DOMAIN" STUN_DOMAIN: "stun.NETMAKER_BASE_DOMAIN" SERVER_HOST: "SERVER_PUBLIC_IP" @@ -23,9 +25,7 @@ services: DISPLAY_KEYS: "on" DATABASE: "sqlite" NODE_ID: "netmaker-server-1" - MQ_HOST: "mq" - MQ_PORT: "443" - MQ_SERVER_PORT: "1883" + SERVER_BROKER_ENDPOINT: "ws://mq:8083/mqtt" MQ_USERNAME: "REPLACE_MQ_USERNAME" MQ_PASSWORD: "REPLACE_MQ_PASSWORD" STUN_PORT: "3478" @@ -67,21 +67,17 @@ services: - dnsconfig:/root/dnsconfig mq: container_name: mq - image: eclipse-mosquitto:2.0.15-openssl - depends_on: - - netmaker + image: emqx/emqx:5.0.17 restart: unless-stopped - command: ["/mosquitto/config/wait.sh"] environment: - MQ_PASSWORD: "REPLACE_MQ_PASSWORD" - MQ_USERNAME: "REPLACE_MQ_USERNAME" - volumes: - - /root/mosquitto.conf:/mosquitto/config/mosquitto.conf - - /root/wait.sh:/mosquitto/config/wait.sh - - mosquitto_logs:/mosquitto/log + EMQX_NAME: "emqx" + EMQX_DASHBOARD__DEFAULT_PASSWORD: "REPLACE_MQ_PASSWORD" + EMQX_DASHBOARD__DEFAULT_USERNAME: "REPLACE_MQ_USERNAME" ports: - - "1883:1883" - - "8883:8883" + - "1883:1883" # MQTT + - "8883:8883" # SSL MQTT + - "8083:8083" # Websockets + - "18083:18083" # Dashboard/REST_API prometheus: container_name: prometheus image: gravitl/netmaker-prometheus:latest @@ -115,9 +111,8 @@ services: depends_on: - netmaker environment: - MQ_HOST: "mq" - MQ_PORT: "443" - MQ_SERVER_PORT: "1883" + SERVER_BROKER_ENDPOINT: "ws://mq:8083/mqtt" + BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN/mqtt" PROMETHEUS: "on" VERBOSITY: "1" API_PORT: "8085" diff --git a/compose/docker-compose.reference.yml b/compose/docker-compose.reference.yml index ed51f789..a67c2335 100644 --- a/compose/docker-compose.reference.yml +++ b/compose/docker-compose.reference.yml @@ -10,7 +10,7 @@ services: - sqldata:/root/data - shared_certs:/etc/netmaker environment: # Necessary capabilities to set iptables when running in container - BROKER_NAME: "broker.NETMAKER_BASE_DOMAIN" # The domain/host IP indicating the mq broker address + BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN" # The domain/host IP indicating the mq broker address SERVER_NAME: "NETMAKER_BASE_DOMAIN" # The base domain of netmaker SERVER_HOST: "SERVER_PUBLIC_IP" # Set to public IP of machine. SERVER_HTTP_HOST: "api.NETMAKER_BASE_DOMAIN" # Overrides SERVER_HOST if set. Useful for making HTTP available via different interfaces/networks. @@ -26,9 +26,7 @@ services: DISPLAY_KEYS: "on" # Show keys permanently in UI (until deleted) as opposed to 1-time display. DATABASE: "sqlite" # Database to use - sqlite, postgres, or rqlite NODE_ID: "netmaker-server-1" # used for HA - identifies this server vs other servers - MQ_HOST: "mq" # the address of the mq server. If running from docker compose it will be "mq". Otherwise, need to input address. If using "host networking", it will find and detect the IP of the mq container. - MQ_PORT: "443" # the reachable port of MQ - change if external MQ port changes (port on proxy, not necessarily the one exposed in docker-compose) - MQ_SERVER_PORT: "1883" # the reachable port of MQ by the server - change if internal MQ port changes (or use external port if MQ is not on the same machine) + SERVER_BROKER_ENDPOINT: ""ws://mq:1883"" # the address of the mq server. If running from docker compose it will be "mq". Otherwise, need to input address. If using "host networking", it will find and detect the IP of the mq container. MQ_USERNAME: "REPLACE_MQ_USERNAME" # the username to set for MQ access MQ_PASSWORD: "REPLACE_MQ_PASSWORD" # the password to set for MQ access STUN_PORT: "3478" # the reachable port of STUN on the server diff --git a/compose/docker-compose.yml b/compose/docker-compose.yml index 16b8dfe3..a9937df6 100644 --- a/compose/docker-compose.yml +++ b/compose/docker-compose.yml @@ -9,7 +9,7 @@ services: - dnsconfig:/root/config/dnsconfig - sqldata:/root/data environment: - BROKER_NAME: "broker.NETMAKER_BASE_DOMAIN" + BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN" SERVER_NAME: "NETMAKER_BASE_DOMAIN" STUN_DOMAIN: "stun.NETMAKER_BASE_DOMAIN" SERVER_HOST: "SERVER_PUBLIC_IP" @@ -23,9 +23,7 @@ services: DISPLAY_KEYS: "on" DATABASE: "sqlite" NODE_ID: "netmaker-server-1" - MQ_HOST: "mq" - MQ_PORT: "443" - MQ_SERVER_PORT: "1883" + SERVER_BROKER_ENDPOINT: "ws://mq:1883" STUN_PORT: "3478" VERBOSITY: "1" MQ_PASSWORD: "REPLACE_MQ_PASSWORD" diff --git a/config/config.go b/config/config.go index 70868550..e2c148d6 100644 --- a/config/config.go +++ b/config/config.go @@ -36,7 +36,10 @@ type ServerConfig struct { APIConnString string `yaml:"apiconn"` APIHost string `yaml:"apihost"` APIPort string `yaml:"apiport"` - MQHOST string `yaml:"mqhost"` + Broker string `yam:"broker"` + ServerBrokerEndpoint string `yaml:"serverbrokerendpoint"` + BrokerType string `yaml:"brokertype"` + EmqxRestEndpoint string `yaml:"emqxrestendpoint"` MasterKey string `yaml:"masterkey"` DNSKey string `yaml:"dnskey"` AllowedOrigin string `yaml:"allowedorigin"` @@ -59,10 +62,7 @@ type ServerConfig struct { AzureTenant string `yaml:"azuretenant"` Telemetry string `yaml:"telemetry"` HostNetwork string `yaml:"hostnetwork"` - MQPort string `yaml:"mqport"` - MQServerPort string `yaml:"mqserverport"` Server string `yaml:"server"` - Broker string `yam:"broker"` PublicIPService string `yaml:"publicipservice"` MQPassword string `yaml:"mqpassword"` MQUserName string `yaml:"mqusername"` diff --git a/controllers/node.go b/controllers/node.go index be9d69e8..7570a92c 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -564,6 +564,13 @@ func createNode(w http.ResponseWriter, r *http.Request) { data.Node.Server = servercfg.GetServer() if !logic.HostExists(&data.Host) { logic.CheckHostPorts(&data.Host) + if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { + // create EMQX credentials for host if it doesn't exists + if err := mq.CreateEmqxUser(data.Host.ID.String(), data.Host.HostPass, false); err != nil { + logger.Log(0, "failed to add host credentials to EMQX: ", data.Host.ID.String(), err.Error()) + return + } + } } if err := logic.CreateHost(&data.Host); err != nil { if errors.Is(err, logic.ErrHostExists) { @@ -589,7 +596,6 @@ func createNode(w http.ResponseWriter, r *http.Request) { return } } - err = logic.AssociateNodeToHost(&data.Node, &data.Host) if err != nil { logger.Log(0, r.Header.Get("user"), diff --git a/docker/Caddyfile b/docker/Caddyfile index 51a6f98a..d8392b7d 100644 --- a/docker/Caddyfile +++ b/docker/Caddyfile @@ -38,5 +38,5 @@ https://stun.NETMAKER_BASE_DOMAIN { # MQ wss://broker.NETMAKER_BASE_DOMAIN { - reverse_proxy ws://mq:8883 + reverse_proxy ws://mq:8883 # For EMQX websockets use `reverse_proxy ws://mq:8083` } diff --git a/k8s/server/netmaker-server.yaml b/k8s/server/netmaker-server.yaml index 16aff312..ae77df58 100644 --- a/k8s/server/netmaker-server.yaml +++ b/k8s/server/netmaker-server.yaml @@ -71,12 +71,10 @@ spec: value: REPLACE_MASTER_KEY - name: CORS_ALLOWED_ORIGIN value: '*' - - name: MQ_HOST - value: "mq" - - name: MQ_PORT - value: "443" - - name: MQ_SERVER_PORT - value: "1883" + - name: SERVER_BROKER_ENDPOINT + value: "ws://mq:1883" + - name: BROKER_ENDPOINT + value: "wss://broker.NETMAKER_BASE_DOMAIN" - name: PLATFORM value: "Kubernetes" - name: VERBOSITY diff --git a/mq/emqx.go b/mq/emqx.go new file mode 100644 index 00000000..e2e0b773 --- /dev/null +++ b/mq/emqx.go @@ -0,0 +1,154 @@ +package mq + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/gravitl/netmaker/servercfg" +) + +type ( + emqxUser struct { + UserID string `json:"user_id"` + Password string `json:"password"` + Admin bool `json:"is_superuser"` + } + + emqxLogin struct { + Username string `json:"username"` + Password string `json:"password"` + } + + emqxLoginResponse struct { + License struct { + Edition string `json:"edition"` + } `json:"license"` + Token string `json:"token"` + Version string `json:"version"` + } +) + +func getEmqxAuthToken() (string, error) { + payload, err := json.Marshal(&emqxLogin{ + Username: servercfg.GetMqUserName(), + Password: servercfg.GetMqPassword(), + }) + if err != nil { + return "", err + } + resp, err := http.Post(servercfg.GetEmqxRestEndpoint()+"/api/v5/login", "application/json", bytes.NewReader(payload)) + if err != nil { + return "", err + } + msg, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("error during EMQX login %v", string(msg)) + } + var loginResp emqxLoginResponse + if err := json.Unmarshal(msg, &loginResp); err != nil { + return "", err + } + return loginResp.Token, nil +} + +// CreateEmqxUser - creates an EMQX user +func CreateEmqxUser(username, password string, admin bool) error { + token, err := getEmqxAuthToken() + if err != nil { + return err + } + payload, err := json.Marshal(&emqxUser{ + UserID: username, + Password: password, + Admin: admin, + }) + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload)) + if err != nil { + return err + } + req.Header.Add("content-type", "application/json") + req.Header.Add("authorization", "Bearer "+token) + resp, err := (&http.Client{}).Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + msg, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + return fmt.Errorf("error creating EMQX user %v", string(msg)) + } + return nil +} + +// DeleteEmqxUser - deletes an EMQX user +func DeleteEmqxUser(username string) error { + token, err := getEmqxAuthToken() + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodDelete, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users/"+username, nil) + if err != nil { + return err + } + req.Header.Add("authorization", "Bearer "+token) + resp, err := (&http.Client{}).Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + msg, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + return fmt.Errorf("error deleting EMQX user %v", string(msg)) + } + return nil +} + +// CreateEmqxDefaultAuthenticator - creates a default authenticator based on password and using EMQX's built in database as storage +func CreateEmqxDefaultAuthenticator() error { + token, err := getEmqxAuthToken() + if err != nil { + return err + } + payload, err := json.Marshal(&struct { + Mechanism string `json:"mechanism"` + Backend string `json:"backend"` + UserIDType string `json:"user_id_type"` + }{Mechanism: "password_based", Backend: "built_in_database", UserIDType: "username"}) + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication", bytes.NewReader(payload)) + if err != nil { + return err + } + req.Header.Add("content-type", "application/json") + req.Header.Add("authorization", "Bearer "+token) + resp, err := (&http.Client{}).Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + msg, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + return fmt.Errorf("error creating default EMQX authenticator %v", string(msg)) + } + return nil +} diff --git a/mq/handlers.go b/mq/handlers.go index e4b79611..94951a77 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -152,6 +152,13 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { return } case models.DeleteHost: + if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { + // delete EMQX credentials for host + if err := DeleteEmqxUser(currentHost.ID.String()); err != nil { + logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error()) + return + } + } if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil { logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error()) return diff --git a/mq/mq.go b/mq/mq.go index 9fabc90f..b8e0d458 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -2,6 +2,7 @@ package mq import ( "context" + "log" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -38,6 +39,17 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) { // SetupMQTT creates a connection to broker and return client func SetupMQTT() { + if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { + time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready + // setup authenticator and create admin user + if err := CreateEmqxDefaultAuthenticator(); err != nil { + logger.Log(0, err.Error()) + } + DeleteEmqxUser(servercfg.GetMqUserName()) + if err := CreateEmqxUser(servercfg.GetMqUserName(), servercfg.GetMqPassword(), true); err != nil { + log.Fatal(err) + } + } opts := mqtt.NewClientOptions() setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts) opts.SetOnConnectHandler(func(client mqtt.Client) { diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index 4d1f2a51..3685af87 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -13,6 +13,9 @@ import ( "github.com/gravitl/netmaker/models" ) +// EmqxBrokerType denotes the broker type for EMQX MQTT +const EmqxBrokerType = "emqx" + var ( Version = "dev" Is_EE = false @@ -35,7 +38,6 @@ func GetServerConfig() config.ServerConfig { cfg.CoreDNSAddr = GetCoreDNSAddr() cfg.APIHost = GetAPIHost() cfg.APIPort = GetAPIPort() - cfg.MQPort = GetMQPort() cfg.MasterKey = "(hidden)" cfg.DNSKey = "(hidden)" cfg.AllowedOrigin = GetAllowedOrigin() @@ -43,6 +45,8 @@ func GetServerConfig() config.ServerConfig { cfg.NodeID = GetNodeID() cfg.StunHost = GetStunAddr() cfg.StunPort = GetStunPort() + cfg.BrokerType = GetBrokerType() + cfg.EmqxRestEndpoint = GetEmqxRestEndpoint() if IsRestBackend() { cfg.RestBackend = "on" } @@ -83,14 +87,13 @@ func GetServerConfig() config.ServerConfig { func GetServerInfo() models.ServerConfig { var cfg models.ServerConfig cfg.Server = GetServer() - cfg.Broker = GetBroker() cfg.MQUserName = GetMqUserName() cfg.MQPassword = GetMqPassword() cfg.API = GetAPIConnString() cfg.CoreDNSAddr = GetCoreDNSAddr() cfg.APIPort = GetAPIPort() - cfg.MQPort = GetMQPort() cfg.DNSMode = "off" + cfg.Broker = GetPublicBrokerEndpoint() if IsDNSMode() { cfg.DNSMode = "on" } @@ -196,32 +199,39 @@ func GetCoreDNSAddr() string { return addr } -// GetMQPort - gets the mq port -func GetMQPort() string { - port := "8883" //default - if os.Getenv("MQ_PORT") != "" { - port = os.Getenv("MQ_PORT") - } else if config.Config.Server.MQPort != "" { - port = config.Config.Server.MQPort +// GetPublicBrokerEndpoint - returns the public broker endpoint which shall be used by netclient +func GetPublicBrokerEndpoint() string { + if os.Getenv("BROKER_ENDPOINT") != "" { + return os.Getenv("BROKER_ENDPOINT") + } else { + return config.Config.Server.Broker } - return port } // GetMessageQueueEndpoint - gets the message queue endpoint func GetMessageQueueEndpoint() (string, bool) { host, _ := GetPublicIP() - if os.Getenv("MQ_HOST") != "" { - host = os.Getenv("MQ_HOST") - } else if config.Config.Server.MQHOST != "" { - host = config.Config.Server.MQHOST - } - secure := strings.Contains(host, "wss") || strings.Contains(host, "ssl") - if secure { - host = "wss://" + host + if os.Getenv("SERVER_BROKER_ENDPOINT") != "" { + host = os.Getenv("SERVER_BROKER_ENDPOINT") + } else if config.Config.Server.ServerBrokerEndpoint != "" { + host = config.Config.Server.ServerBrokerEndpoint + } else if os.Getenv("BROKER_ENDPOINT") != "" { + host = os.Getenv("BROKER_ENDPOINT") + } else if config.Config.Server.Broker != "" { + host = config.Config.Server.Broker } else { - host = "ws://" + host + host += ":1883" // default + } + return host, strings.Contains(host, "wss") || strings.Contains(host, "ssl") || strings.Contains(host, "mqtts") +} + +// GetBrokerType - returns the type of MQ broker +func GetBrokerType() string { + if os.Getenv("BROKER_TYPE") != "" { + return os.Getenv("BROKER_TYPE") + } else { + return "mosquitto" } - return host + ":" + GetMQServerPort(), secure } // GetMasterKey - gets the configured master key of server @@ -325,17 +335,6 @@ func GetServer() string { return server } -// GetBroker - gets the broker name -func GetBroker() string { - server := "" - if os.Getenv("BROKER_NAME") != "" { - server = os.Getenv("BROKER_NAME") - } else if config.Config.Server.Broker != "" { - server = config.Config.Server.Broker - } - return server -} - func GetVerbosity() int32 { var verbosity = 0 var err error @@ -527,17 +526,6 @@ func GetAzureTenant() string { return azureTenant } -// GetMQServerPort - get mq port for server -func GetMQServerPort() string { - port := "1883" //default - if os.Getenv("MQ_SERVER_PORT") != "" { - port = os.Getenv("MQ_SERVER_PORT") - } else if config.Config.Server.MQServerPort != "" { - port = config.Config.Server.MQServerPort - } - return port -} - // GetMqPassword - fetches the MQ password func GetMqPassword() string { password := "" @@ -560,6 +548,11 @@ func GetMqUserName() string { return password } +// GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX +func GetEmqxRestEndpoint() string { + return os.Getenv("EMQX_REST_ENDPOINT") +} + // IsBasicAuthEnabled - checks if basic auth has been configured to be turned off func IsBasicAuthEnabled() bool { var enabled = true //default