Merge pull request #2058 from gravitl/feature_emqx

Add emqx boilerplate
This commit is contained in:
dcarns 2023-02-28 08:41:39 -05:00 committed by GitHub
commit dbf6f1034b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 326 additions and 82 deletions

View file

@ -0,0 +1,83 @@
version: "3.4"
services:
netmaker:
container_name: netmaker
image: gravitl/netmaker:v0.18.2
restart: always
volumes:
- dnsconfig:/root/config/dnsconfig
- sqldata:/root/data
environment:
BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN/mqtt"
BROKER_TYPE: "emqx"
EMQX_REST_ENDPOINT: "http://mq:18083"
SERVER_NAME: "NETMAKER_BASE_DOMAIN"
STUN_DOMAIN: "stun.NETMAKER_BASE_DOMAIN"
SERVER_HOST: "SERVER_PUBLIC_IP"
SERVER_API_CONN_STRING: "api.NETMAKER_BASE_DOMAIN:443"
COREDNS_ADDR: "SERVER_PUBLIC_IP"
DNS_MODE: "on"
SERVER_HTTP_HOST: "api.NETMAKER_BASE_DOMAIN"
API_PORT: "8081"
MASTER_KEY: "REPLACE_MASTER_KEY"
CORS_ALLOWED_ORIGIN: "*"
DISPLAY_KEYS: "on"
DATABASE: "sqlite"
NODE_ID: "netmaker-server-1"
SERVER_BROKER_ENDPOINT: "ws://mq:8083/mqtt"
STUN_PORT: "3478"
VERBOSITY: "1"
MQ_PASSWORD: "REPLACE_MQ_PASSWORD"
MQ_USERNAME: "REPLACE_MQ_USERNAME"
ports:
- "3478:3478/udp"
netmaker-ui:
container_name: netmaker-ui
image: gravitl/netmaker-ui:v0.18.2
depends_on:
- netmaker
links:
- "netmaker:api"
restart: always
environment:
BACKEND_URL: "https://api.NETMAKER_BASE_DOMAIN"
caddy:
image: caddy:2.6.2
container_name: caddy
restart: unless-stopped
volumes:
- /root/Caddyfile:/etc/caddy/Caddyfile
- caddy_data:/data
- caddy_conf:/config
ports:
- "80:80"
- "443:443"
coredns:
container_name: coredns
image: coredns/coredns
command: -conf /root/dnsconfig/Corefile
depends_on:
- netmaker
restart: always
volumes:
- dnsconfig:/root/dnsconfig
mq:
container_name: mq
image: emqx/emqx:5.0.17
restart: unless-stopped
environment:
EMQX_NAME: "emqx"
EMQX_DASHBOARD__DEFAULT_PASSWORD: "REPLACE_MQ_PASSWORD"
EMQX_DASHBOARD__DEFAULT_USERNAME: "REPLACE_MQ_USERNAME"
ports:
- "1883:1883" # MQTT
- "8883:8883" # SSL MQTT
- "8083:8083" # Websockets
- "18083:18083" # Dashboard/REST_API
volumes:
caddy_data: {}
caddy_conf: {}
sqldata: {}
dnsconfig: {}
mosquitto_logs: {}

View file

@ -9,7 +9,9 @@ services:
- dnsconfig:/root/config/dnsconfig
- sqldata:/root/data
environment:
BROKER_NAME: "broker.NETMAKER_BASE_DOMAIN"
BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN/mqtt"
BROKER_TYPE: "emqx"
EMQX_REST_ENDPOINT: "http://mq:18083"
SERVER_NAME: "NETMAKER_BASE_DOMAIN"
STUN_DOMAIN: "stun.NETMAKER_BASE_DOMAIN"
SERVER_HOST: "SERVER_PUBLIC_IP"
@ -23,9 +25,7 @@ services:
DISPLAY_KEYS: "on"
DATABASE: "sqlite"
NODE_ID: "netmaker-server-1"
MQ_HOST: "mq"
MQ_PORT: "443"
MQ_SERVER_PORT: "1883"
SERVER_BROKER_ENDPOINT: "ws://mq:8083/mqtt"
MQ_USERNAME: "REPLACE_MQ_USERNAME"
MQ_PASSWORD: "REPLACE_MQ_PASSWORD"
STUN_PORT: "3478"
@ -67,21 +67,17 @@ services:
- dnsconfig:/root/dnsconfig
mq:
container_name: mq
image: eclipse-mosquitto:2.0.15-openssl
depends_on:
- netmaker
image: emqx/emqx:5.0.17
restart: unless-stopped
command: ["/mosquitto/config/wait.sh"]
environment:
MQ_PASSWORD: "REPLACE_MQ_PASSWORD"
MQ_USERNAME: "REPLACE_MQ_USERNAME"
volumes:
- /root/mosquitto.conf:/mosquitto/config/mosquitto.conf
- /root/wait.sh:/mosquitto/config/wait.sh
- mosquitto_logs:/mosquitto/log
EMQX_NAME: "emqx"
EMQX_DASHBOARD__DEFAULT_PASSWORD: "REPLACE_MQ_PASSWORD"
EMQX_DASHBOARD__DEFAULT_USERNAME: "REPLACE_MQ_USERNAME"
ports:
- "1883:1883"
- "8883:8883"
- "1883:1883" # MQTT
- "8883:8883" # SSL MQTT
- "8083:8083" # Websockets
- "18083:18083" # Dashboard/REST_API
prometheus:
container_name: prometheus
image: gravitl/netmaker-prometheus:latest
@ -115,9 +111,8 @@ services:
depends_on:
- netmaker
environment:
MQ_HOST: "mq"
MQ_PORT: "443"
MQ_SERVER_PORT: "1883"
SERVER_BROKER_ENDPOINT: "ws://mq:8083/mqtt"
BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN/mqtt"
PROMETHEUS: "on"
VERBOSITY: "1"
API_PORT: "8085"

View file

@ -10,7 +10,7 @@ services:
- sqldata:/root/data
- shared_certs:/etc/netmaker
environment: # Necessary capabilities to set iptables when running in container
BROKER_NAME: "broker.NETMAKER_BASE_DOMAIN" # The domain/host IP indicating the mq broker address
BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN" # The domain/host IP indicating the mq broker address
SERVER_NAME: "NETMAKER_BASE_DOMAIN" # The base domain of netmaker
SERVER_HOST: "SERVER_PUBLIC_IP" # Set to public IP of machine.
SERVER_HTTP_HOST: "api.NETMAKER_BASE_DOMAIN" # Overrides SERVER_HOST if set. Useful for making HTTP available via different interfaces/networks.
@ -26,9 +26,7 @@ services:
DISPLAY_KEYS: "on" # Show keys permanently in UI (until deleted) as opposed to 1-time display.
DATABASE: "sqlite" # Database to use - sqlite, postgres, or rqlite
NODE_ID: "netmaker-server-1" # used for HA - identifies this server vs other servers
MQ_HOST: "mq" # the address of the mq server. If running from docker compose it will be "mq". Otherwise, need to input address. If using "host networking", it will find and detect the IP of the mq container.
MQ_PORT: "443" # the reachable port of MQ - change if external MQ port changes (port on proxy, not necessarily the one exposed in docker-compose)
MQ_SERVER_PORT: "1883" # the reachable port of MQ by the server - change if internal MQ port changes (or use external port if MQ is not on the same machine)
SERVER_BROKER_ENDPOINT: ""ws://mq:1883"" # the address of the mq server. If running from docker compose it will be "mq". Otherwise, need to input address. If using "host networking", it will find and detect the IP of the mq container.
MQ_USERNAME: "REPLACE_MQ_USERNAME" # the username to set for MQ access
MQ_PASSWORD: "REPLACE_MQ_PASSWORD" # the password to set for MQ access
STUN_PORT: "3478" # the reachable port of STUN on the server

View file

@ -9,7 +9,7 @@ services:
- dnsconfig:/root/config/dnsconfig
- sqldata:/root/data
environment:
BROKER_NAME: "broker.NETMAKER_BASE_DOMAIN"
BROKER_ENDPOINT: "wss://broker.NETMAKER_BASE_DOMAIN"
SERVER_NAME: "NETMAKER_BASE_DOMAIN"
STUN_DOMAIN: "stun.NETMAKER_BASE_DOMAIN"
SERVER_HOST: "SERVER_PUBLIC_IP"
@ -23,9 +23,7 @@ services:
DISPLAY_KEYS: "on"
DATABASE: "sqlite"
NODE_ID: "netmaker-server-1"
MQ_HOST: "mq"
MQ_PORT: "443"
MQ_SERVER_PORT: "1883"
SERVER_BROKER_ENDPOINT: "ws://mq:1883"
STUN_PORT: "3478"
VERBOSITY: "1"
MQ_PASSWORD: "REPLACE_MQ_PASSWORD"

View file

@ -36,7 +36,10 @@ type ServerConfig struct {
APIConnString string `yaml:"apiconn"`
APIHost string `yaml:"apihost"`
APIPort string `yaml:"apiport"`
MQHOST string `yaml:"mqhost"`
Broker string `yam:"broker"`
ServerBrokerEndpoint string `yaml:"serverbrokerendpoint"`
BrokerType string `yaml:"brokertype"`
EmqxRestEndpoint string `yaml:"emqxrestendpoint"`
MasterKey string `yaml:"masterkey"`
DNSKey string `yaml:"dnskey"`
AllowedOrigin string `yaml:"allowedorigin"`
@ -59,10 +62,7 @@ type ServerConfig struct {
AzureTenant string `yaml:"azuretenant"`
Telemetry string `yaml:"telemetry"`
HostNetwork string `yaml:"hostnetwork"`
MQPort string `yaml:"mqport"`
MQServerPort string `yaml:"mqserverport"`
Server string `yaml:"server"`
Broker string `yam:"broker"`
PublicIPService string `yaml:"publicipservice"`
MQPassword string `yaml:"mqpassword"`
MQUserName string `yaml:"mqusername"`

View file

@ -564,6 +564,13 @@ func createNode(w http.ResponseWriter, r *http.Request) {
data.Node.Server = servercfg.GetServer()
if !logic.HostExists(&data.Host) {
logic.CheckHostPorts(&data.Host)
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
// create EMQX credentials for host if it doesn't exists
if err := mq.CreateEmqxUser(data.Host.ID.String(), data.Host.HostPass, false); err != nil {
logger.Log(0, "failed to add host credentials to EMQX: ", data.Host.ID.String(), err.Error())
return
}
}
}
if err := logic.CreateHost(&data.Host); err != nil {
if errors.Is(err, logic.ErrHostExists) {
@ -589,7 +596,6 @@ func createNode(w http.ResponseWriter, r *http.Request) {
return
}
}
err = logic.AssociateNodeToHost(&data.Node, &data.Host)
if err != nil {
logger.Log(0, r.Header.Get("user"),

View file

@ -38,5 +38,5 @@ https://stun.NETMAKER_BASE_DOMAIN {
# MQ
wss://broker.NETMAKER_BASE_DOMAIN {
reverse_proxy ws://mq:8883
reverse_proxy ws://mq:8883 # For EMQX websockets use `reverse_proxy ws://mq:8083`
}

View file

@ -71,12 +71,10 @@ spec:
value: REPLACE_MASTER_KEY
- name: CORS_ALLOWED_ORIGIN
value: '*'
- name: MQ_HOST
value: "mq"
- name: MQ_PORT
value: "443"
- name: MQ_SERVER_PORT
value: "1883"
- name: SERVER_BROKER_ENDPOINT
value: "ws://mq:1883"
- name: BROKER_ENDPOINT
value: "wss://broker.NETMAKER_BASE_DOMAIN"
- name: PLATFORM
value: "Kubernetes"
- name: VERBOSITY

154
mq/emqx.go Normal file
View file

@ -0,0 +1,154 @@
package mq
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/gravitl/netmaker/servercfg"
)
type (
emqxUser struct {
UserID string `json:"user_id"`
Password string `json:"password"`
Admin bool `json:"is_superuser"`
}
emqxLogin struct {
Username string `json:"username"`
Password string `json:"password"`
}
emqxLoginResponse struct {
License struct {
Edition string `json:"edition"`
} `json:"license"`
Token string `json:"token"`
Version string `json:"version"`
}
)
func getEmqxAuthToken() (string, error) {
payload, err := json.Marshal(&emqxLogin{
Username: servercfg.GetMqUserName(),
Password: servercfg.GetMqPassword(),
})
if err != nil {
return "", err
}
resp, err := http.Post(servercfg.GetEmqxRestEndpoint()+"/api/v5/login", "application/json", bytes.NewReader(payload))
if err != nil {
return "", err
}
msg, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("error during EMQX login %v", string(msg))
}
var loginResp emqxLoginResponse
if err := json.Unmarshal(msg, &loginResp); err != nil {
return "", err
}
return loginResp.Token, nil
}
// CreateEmqxUser - creates an EMQX user
func CreateEmqxUser(username, password string, admin bool) error {
token, err := getEmqxAuthToken()
if err != nil {
return err
}
payload, err := json.Marshal(&emqxUser{
UserID: username,
Password: password,
Admin: admin,
})
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Add("content-type", "application/json")
req.Header.Add("authorization", "Bearer "+token)
resp, err := (&http.Client{}).Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
msg, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("error creating EMQX user %v", string(msg))
}
return nil
}
// DeleteEmqxUser - deletes an EMQX user
func DeleteEmqxUser(username string) error {
token, err := getEmqxAuthToken()
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodDelete, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users/"+username, nil)
if err != nil {
return err
}
req.Header.Add("authorization", "Bearer "+token)
resp, err := (&http.Client{}).Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
msg, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("error deleting EMQX user %v", string(msg))
}
return nil
}
// CreateEmqxDefaultAuthenticator - creates a default authenticator based on password and using EMQX's built in database as storage
func CreateEmqxDefaultAuthenticator() error {
token, err := getEmqxAuthToken()
if err != nil {
return err
}
payload, err := json.Marshal(&struct {
Mechanism string `json:"mechanism"`
Backend string `json:"backend"`
UserIDType string `json:"user_id_type"`
}{Mechanism: "password_based", Backend: "built_in_database", UserIDType: "username"})
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication", bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Add("content-type", "application/json")
req.Header.Add("authorization", "Bearer "+token)
resp, err := (&http.Client{}).Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
msg, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("error creating default EMQX authenticator %v", string(msg))
}
return nil
}

View file

@ -152,6 +152,13 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
return
}
case models.DeleteHost:
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
// delete EMQX credentials for host
if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error())
return
}
}
if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
return

View file

@ -2,6 +2,7 @@ package mq
import (
"context"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
@ -38,6 +39,17 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
// SetupMQTT creates a connection to broker and return client
func SetupMQTT() {
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
// setup authenticator and create admin user
if err := CreateEmqxDefaultAuthenticator(); err != nil {
logger.Log(0, err.Error())
}
DeleteEmqxUser(servercfg.GetMqUserName())
if err := CreateEmqxUser(servercfg.GetMqUserName(), servercfg.GetMqPassword(), true); err != nil {
log.Fatal(err)
}
}
opts := mqtt.NewClientOptions()
setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
opts.SetOnConnectHandler(func(client mqtt.Client) {

View file

@ -13,6 +13,9 @@ import (
"github.com/gravitl/netmaker/models"
)
// EmqxBrokerType denotes the broker type for EMQX MQTT
const EmqxBrokerType = "emqx"
var (
Version = "dev"
Is_EE = false
@ -35,7 +38,6 @@ func GetServerConfig() config.ServerConfig {
cfg.CoreDNSAddr = GetCoreDNSAddr()
cfg.APIHost = GetAPIHost()
cfg.APIPort = GetAPIPort()
cfg.MQPort = GetMQPort()
cfg.MasterKey = "(hidden)"
cfg.DNSKey = "(hidden)"
cfg.AllowedOrigin = GetAllowedOrigin()
@ -43,6 +45,8 @@ func GetServerConfig() config.ServerConfig {
cfg.NodeID = GetNodeID()
cfg.StunHost = GetStunAddr()
cfg.StunPort = GetStunPort()
cfg.BrokerType = GetBrokerType()
cfg.EmqxRestEndpoint = GetEmqxRestEndpoint()
if IsRestBackend() {
cfg.RestBackend = "on"
}
@ -83,14 +87,13 @@ func GetServerConfig() config.ServerConfig {
func GetServerInfo() models.ServerConfig {
var cfg models.ServerConfig
cfg.Server = GetServer()
cfg.Broker = GetBroker()
cfg.MQUserName = GetMqUserName()
cfg.MQPassword = GetMqPassword()
cfg.API = GetAPIConnString()
cfg.CoreDNSAddr = GetCoreDNSAddr()
cfg.APIPort = GetAPIPort()
cfg.MQPort = GetMQPort()
cfg.DNSMode = "off"
cfg.Broker = GetPublicBrokerEndpoint()
if IsDNSMode() {
cfg.DNSMode = "on"
}
@ -196,32 +199,39 @@ func GetCoreDNSAddr() string {
return addr
}
// GetMQPort - gets the mq port
func GetMQPort() string {
port := "8883" //default
if os.Getenv("MQ_PORT") != "" {
port = os.Getenv("MQ_PORT")
} else if config.Config.Server.MQPort != "" {
port = config.Config.Server.MQPort
// GetPublicBrokerEndpoint - returns the public broker endpoint which shall be used by netclient
func GetPublicBrokerEndpoint() string {
if os.Getenv("BROKER_ENDPOINT") != "" {
return os.Getenv("BROKER_ENDPOINT")
} else {
return config.Config.Server.Broker
}
return port
}
// GetMessageQueueEndpoint - gets the message queue endpoint
func GetMessageQueueEndpoint() (string, bool) {
host, _ := GetPublicIP()
if os.Getenv("MQ_HOST") != "" {
host = os.Getenv("MQ_HOST")
} else if config.Config.Server.MQHOST != "" {
host = config.Config.Server.MQHOST
}
secure := strings.Contains(host, "wss") || strings.Contains(host, "ssl")
if secure {
host = "wss://" + host
if os.Getenv("SERVER_BROKER_ENDPOINT") != "" {
host = os.Getenv("SERVER_BROKER_ENDPOINT")
} else if config.Config.Server.ServerBrokerEndpoint != "" {
host = config.Config.Server.ServerBrokerEndpoint
} else if os.Getenv("BROKER_ENDPOINT") != "" {
host = os.Getenv("BROKER_ENDPOINT")
} else if config.Config.Server.Broker != "" {
host = config.Config.Server.Broker
} else {
host = "ws://" + host
host += ":1883" // default
}
return host, strings.Contains(host, "wss") || strings.Contains(host, "ssl") || strings.Contains(host, "mqtts")
}
// GetBrokerType - returns the type of MQ broker
func GetBrokerType() string {
if os.Getenv("BROKER_TYPE") != "" {
return os.Getenv("BROKER_TYPE")
} else {
return "mosquitto"
}
return host + ":" + GetMQServerPort(), secure
}
// GetMasterKey - gets the configured master key of server
@ -325,17 +335,6 @@ func GetServer() string {
return server
}
// GetBroker - gets the broker name
func GetBroker() string {
server := ""
if os.Getenv("BROKER_NAME") != "" {
server = os.Getenv("BROKER_NAME")
} else if config.Config.Server.Broker != "" {
server = config.Config.Server.Broker
}
return server
}
func GetVerbosity() int32 {
var verbosity = 0
var err error
@ -527,17 +526,6 @@ func GetAzureTenant() string {
return azureTenant
}
// GetMQServerPort - get mq port for server
func GetMQServerPort() string {
port := "1883" //default
if os.Getenv("MQ_SERVER_PORT") != "" {
port = os.Getenv("MQ_SERVER_PORT")
} else if config.Config.Server.MQServerPort != "" {
port = config.Config.Server.MQServerPort
}
return port
}
// GetMqPassword - fetches the MQ password
func GetMqPassword() string {
password := ""
@ -560,6 +548,11 @@ func GetMqUserName() string {
return password
}
// GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX
func GetEmqxRestEndpoint() string {
return os.Getenv("EMQX_REST_ENDPOINT")
}
// IsBasicAuthEnabled - checks if basic auth has been configured to be turned off
func IsBasicAuthEnabled() bool {
var enabled = true //default