mirror of
				https://github.com/gravitl/netmaker.git
				synced 2025-10-31 16:43:07 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			428 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			428 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package mq
 | |
| 
 | |
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"sync"

	"github.com/gravitl/netmaker/servercfg"
)
 | |
| 
 | |
// EmqxOnPrem - receiver type for the on-prem EMQX REST helpers in this file.
// It carries a URL and a user name / password pair.
// NOTE(review): the methods in this file take the endpoint and credentials
// from servercfg rather than from these fields - confirm where the fields are read.
type EmqxOnPrem struct {
	URL, UserName, Password string
}
 | |
| 
 | |
const (
	// already_exists - marker looked for in the body of a failed user-creation
	// response; when present, the failure is treated as success.
	already_exists = "ALREADY_EXISTS"
)
 | |
| 
 | |
// emqxUser - JSON body sent when creating a user in the built-in password database.
type emqxUser struct {
	UserID   string `json:"user_id"`
	Password string `json:"password"`
	Admin    bool   `json:"is_superuser"`
}

// emqxLogin - JSON body sent to the login endpoint.
type emqxLogin struct {
	Username string `json:"username"`
	Password string `json:"password"`
}

// emqxLoginResponse - JSON body decoded from a successful login; Token is the
// value used as the bearer token on the other requests.
type emqxLoginResponse struct {
	License struct {
		Edition string `json:"edition"`
	} `json:"license"`
	Token   string `json:"token"`
	Version string `json:"version"`
}

// aclRule - one ACL entry: a topic, a permission ("allow"/"deny" in this file)
// and an action ("all"/"subscribe" in this file).
type aclRule struct {
	Topic      string `json:"topic"`
	Permission string `json:"permission"`
	Action     string `json:"action"`
}

// aclObject - a set of ACL rules; Username is omitted from the JSON when empty.
type aclObject struct {
	Rules    []aclRule `json:"rules"`
	Username string    `json:"username,omitempty"`
}
 | |
| 
 | |
| func getEmqxAuthToken() (string, error) {
 | |
| 	payload, err := json.Marshal(&emqxLogin{
 | |
| 		Username: servercfg.GetMqUserName(),
 | |
| 		Password: servercfg.GetMqPassword(),
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	resp, err := http.Post(servercfg.GetEmqxRestEndpoint()+"/api/v5/login", "application/json", bytes.NewReader(payload))
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	msg, err := io.ReadAll(resp.Body)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	if resp.StatusCode != http.StatusOK {
 | |
| 		return "", fmt.Errorf("error during EMQX login %v", string(msg))
 | |
| 	}
 | |
| 	var loginResp emqxLoginResponse
 | |
| 	if err := json.Unmarshal(msg, &loginResp); err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	return loginResp.Token, nil
 | |
| }
 | |
| 
 | |
| func (e *EmqxOnPrem) GetType() servercfg.Emqxdeploy { return servercfg.EmqxOnPremDeploy }
 | |
| 
 | |
| // CreateEmqxUser - creates an EMQX user
 | |
| func (e *EmqxOnPrem) CreateEmqxUser(username, password string) error {
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	payload, err := json.Marshal(&emqxUser{
 | |
| 		UserID:   username,
 | |
| 		Password: password,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Add("content-type", "application/json")
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode >= 300 {
 | |
| 		msg, err := io.ReadAll(resp.Body)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if !strings.Contains(string(msg), already_exists) {
 | |
| 			return fmt.Errorf("error creating EMQX user %v", string(msg))
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| func (e *EmqxOnPrem) CreateEmqxUserforServer() error {
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	payload, err := json.Marshal(&emqxUser{
 | |
| 		UserID:   servercfg.GetMqUserName(),
 | |
| 		Password: servercfg.GetMqPassword(),
 | |
| 		Admin:    true,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Add("content-type", "application/json")
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode >= 300 {
 | |
| 		msg, err := io.ReadAll(resp.Body)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if !strings.Contains(string(msg), already_exists) {
 | |
| 			return fmt.Errorf("error creating EMQX user %v", string(msg))
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // DeleteEmqxUser - deletes an EMQX user
 | |
| func (e *EmqxOnPrem) DeleteEmqxUser(username string) error {
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodDelete, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users/"+username, nil)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode >= 300 {
 | |
| 		msg, err := io.ReadAll(resp.Body)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return fmt.Errorf("error deleting EMQX user %v", string(msg))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // CreateEmqxDefaultAuthenticator - creates a default authenticator based on password and using EMQX's built in database as storage
 | |
| func (e *EmqxOnPrem) CreateEmqxDefaultAuthenticator() error {
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	payload, err := json.Marshal(&struct {
 | |
| 		Mechanism  string `json:"mechanism"`
 | |
| 		Backend    string `json:"backend"`
 | |
| 		UserIDType string `json:"user_id_type"`
 | |
| 	}{Mechanism: "password_based", Backend: "built_in_database", UserIDType: "username"})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication", bytes.NewReader(payload))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Add("content-type", "application/json")
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode != http.StatusOK {
 | |
| 		msg, err := io.ReadAll(resp.Body)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return fmt.Errorf("error creating default EMQX authenticator %v", string(msg))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // CreateEmqxDefaultAuthorizer - creates a default ACL authorization mechanism based on the built in database
 | |
| func (e *EmqxOnPrem) CreateEmqxDefaultAuthorizer() error {
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	payload, err := json.Marshal(&struct {
 | |
| 		Enable bool   `json:"enable"`
 | |
| 		Type   string `json:"type"`
 | |
| 	}{Enable: true, Type: "built_in_database"})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources", bytes.NewReader(payload))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Add("content-type", "application/json")
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode != http.StatusNoContent {
 | |
| 		msg, err := io.ReadAll(resp.Body)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return fmt.Errorf("error creating default EMQX ACL authorization mechanism %v", string(msg))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetUserACL - returns ACL rules by username
 | |
| func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) {
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	req.Header.Add("content-type", "application/json")
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	response, err := io.ReadAll(resp.Body)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if resp.StatusCode != http.StatusOK {
 | |
| 		return nil, fmt.Errorf("error fetching ACL rules %v", string(response))
 | |
| 	}
 | |
| 	body := new(aclObject)
 | |
| 	if err := json.Unmarshal(response, body); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return body, nil
 | |
| }
 | |
| 
 | |
| // CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
 | |
| // to allow user access to topics use the `mq.CreateUserAccessRule` function
 | |
| func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/all", bytes.NewReader(payload))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Add("content-type", "application/json")
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode != http.StatusNoContent {
 | |
| 		msg, err := io.ReadAll(resp.Body)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return fmt.Errorf("error creating default ACL rules %v", string(msg))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // CreateHostACL - create host ACL rules
 | |
| func (e *EmqxOnPrem) CreateHostACL(hostID, serverName string) error {
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	payload, err := json.Marshal(&aclObject{
 | |
| 		Username: hostID,
 | |
| 		Rules: []aclRule{
 | |
| 			{
 | |
| 				Topic:      fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
 | |
| 				Permission: "allow",
 | |
| 				Action:     "all",
 | |
| 			},
 | |
| 			{
 | |
| 				Topic:      fmt.Sprintf("host/update/%s/%s", hostID, serverName),
 | |
| 				Permission: "allow",
 | |
| 				Action:     "all",
 | |
| 			},
 | |
| 			{
 | |
| 				Topic:      fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
 | |
| 				Permission: "allow",
 | |
| 				Action:     "all",
 | |
| 			},
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Add("content-type", "application/json")
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode != http.StatusNoContent {
 | |
| 		msg, err := io.ReadAll(resp.Body)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// nodeAclMux - serializes AppendNodeUpdateACL, which reads an ACL object and then
// writes it back; without the lock, simultaneous updates to the same object could
// overwrite each other (e.g. when several nodes of one host are created at once).
var nodeAclMux = sync.Mutex{}
 | |
| 
 | |
| // AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
 | |
| func (e *EmqxOnPrem) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
 | |
| 	nodeAclMux.Lock()
 | |
| 	defer nodeAclMux.Unlock()
 | |
| 	token, err := getEmqxAuthToken()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	aclObject, err := emqx.GetUserACL(hostID)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	aclObject.Rules = append(aclObject.Rules, []aclRule{
 | |
| 		{
 | |
| 			Topic:      fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
 | |
| 			Permission: "allow",
 | |
| 			Action:     "subscribe",
 | |
| 		},
 | |
| 		{
 | |
| 			Topic:      fmt.Sprintf("ping/%s/%s", serverName, nodeID),
 | |
| 			Permission: "allow",
 | |
| 			Action:     "all",
 | |
| 		},
 | |
| 		{
 | |
| 			Topic:      fmt.Sprintf("update/%s/%s", serverName, nodeID),
 | |
| 			Permission: "allow",
 | |
| 			Action:     "all",
 | |
| 		},
 | |
| 		{
 | |
| 			Topic:      fmt.Sprintf("signal/%s/%s", serverName, nodeID),
 | |
| 			Permission: "allow",
 | |
| 			Action:     "all",
 | |
| 		},
 | |
| 		{
 | |
| 			Topic:      fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
 | |
| 			Permission: "allow",
 | |
| 			Action:     "all",
 | |
| 		},
 | |
| 	}...)
 | |
| 	payload, err := json.Marshal(aclObject)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req.Header.Add("content-type", "application/json")
 | |
| 	req.Header.Add("authorization", "Bearer "+token)
 | |
| 	resp, err := (&http.Client{}).Do(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode != http.StatusNoContent {
 | |
| 		msg, err := io.ReadAll(resp.Body)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |