mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-06 11:56:39 +08:00
comment usage of emqx acls
This commit is contained in:
parent
6557ef1ebe
commit
8019f7a28e
3 changed files with 5 additions and 140 deletions
139
mq/emqx_cloud.go
139
mq/emqx_cloud.go
|
@ -22,13 +22,6 @@ type userCreateReq struct {
|
|||
Password string `json:"password"`
|
||||
}
|
||||
|
||||
type cloudAcl struct {
|
||||
UserName string `json:"username"`
|
||||
Topic string `json:"topic"`
|
||||
Action string `json:"action"`
|
||||
Access string `json:"access"`
|
||||
}
|
||||
|
||||
func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
|
||||
|
||||
func (e *EmqxCloud) CreateEmqxUser(username, pass string) error {
|
||||
|
@ -90,54 +83,6 @@ func (e *EmqxCloud) CreateEmqxUserforServer() error {
|
|||
return errors.New("request failed " + string(body))
|
||||
}
|
||||
return nil
|
||||
// add acls
|
||||
acls := []cloudAcl{
|
||||
{
|
||||
UserName: servercfg.GetMqUserName(),
|
||||
Topic: fmt.Sprintf("update/%s/#", servercfg.GetServer()),
|
||||
Access: "allow",
|
||||
Action: "sub",
|
||||
},
|
||||
{
|
||||
UserName: servercfg.GetMqUserName(),
|
||||
Topic: fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()),
|
||||
Access: "allow",
|
||||
Action: "sub",
|
||||
},
|
||||
{
|
||||
UserName: servercfg.GetMqUserName(),
|
||||
Topic: fmt.Sprintf("signal/%s/#", servercfg.GetServer()),
|
||||
Access: "allow",
|
||||
Action: "sub",
|
||||
},
|
||||
{
|
||||
UserName: servercfg.GetMqUserName(),
|
||||
Topic: fmt.Sprintf("metrics/%s/#", servercfg.GetServer()),
|
||||
Access: "allow",
|
||||
Action: "sub",
|
||||
},
|
||||
{
|
||||
UserName: servercfg.GetMqUserName(),
|
||||
Topic: "peers/host/#",
|
||||
Access: "allow",
|
||||
Action: "pub",
|
||||
},
|
||||
{
|
||||
UserName: servercfg.GetMqUserName(),
|
||||
Topic: "node/update/#",
|
||||
Access: "allow",
|
||||
Action: "pub",
|
||||
},
|
||||
{
|
||||
|
||||
UserName: servercfg.GetMqUserName(),
|
||||
Topic: "host/update/#",
|
||||
Access: "allow",
|
||||
Action: "pub",
|
||||
},
|
||||
}
|
||||
|
||||
return e.createacls(acls)
|
||||
}
|
||||
|
||||
func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
|
||||
|
@ -148,97 +93,13 @@ func (e *EmqxCloud) CreateDefaultDenyRule() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *EmqxCloud) createacls(acls []cloudAcl) error {
|
||||
return nil
|
||||
payload, err := json.Marshal(acls)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client := &http.Client{}
|
||||
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
req.SetBasicAuth(e.AppID, e.AppSecret)
|
||||
res, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return errors.New("request failed " + string(body))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
|
||||
return nil
|
||||
acls := []cloudAcl{
|
||||
{
|
||||
UserName: hostID,
|
||||
Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
|
||||
Access: "allow",
|
||||
Action: "sub",
|
||||
},
|
||||
{
|
||||
UserName: hostID,
|
||||
Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
|
||||
Access: "allow",
|
||||
Action: "sub",
|
||||
},
|
||||
{
|
||||
UserName: hostID,
|
||||
Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
|
||||
Access: "allow",
|
||||
Action: "pub",
|
||||
},
|
||||
}
|
||||
|
||||
return e.createacls(acls)
|
||||
}
|
||||
|
||||
func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
|
||||
return nil
|
||||
acls := []cloudAcl{
|
||||
{
|
||||
UserName: hostID,
|
||||
Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
|
||||
Access: "allow",
|
||||
Action: "sub",
|
||||
},
|
||||
{
|
||||
UserName: hostID,
|
||||
Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
|
||||
Access: "allow",
|
||||
Action: "pubsub",
|
||||
},
|
||||
{
|
||||
UserName: hostID,
|
||||
Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
|
||||
Access: "allow",
|
||||
Action: "pubsub",
|
||||
},
|
||||
{
|
||||
UserName: hostID,
|
||||
Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
|
||||
Access: "allow",
|
||||
Action: "pubsub",
|
||||
},
|
||||
{
|
||||
UserName: hostID,
|
||||
Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
|
||||
Access: "allow",
|
||||
Action: "pubsub",
|
||||
},
|
||||
}
|
||||
|
||||
return e.createacls(acls)
|
||||
}
|
||||
|
||||
func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list
|
||||
|
|
|
@ -248,6 +248,7 @@ func (e *EmqxOnPrem) CreateEmqxDefaultAuthorizer() error {
|
|||
|
||||
// GetUserACL - returns ACL rules by username
|
||||
func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) {
|
||||
return nil, nil
|
||||
token, err := getEmqxAuthToken()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -280,6 +281,7 @@ func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) {
|
|||
// CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
|
||||
// to allow user access to topics use the `mq.CreateUserAccessRule` function
|
||||
func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
|
||||
return nil
|
||||
token, err := getEmqxAuthToken()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -311,6 +313,7 @@ func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
|
|||
|
||||
// CreateHostACL - create host ACL rules
|
||||
func (e *EmqxOnPrem) CreateHostACL(hostID, serverName string) error {
|
||||
return nil
|
||||
token, err := getEmqxAuthToken()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -365,6 +368,7 @@ var nodeAclMux sync.Mutex
|
|||
|
||||
// AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
|
||||
func (e *EmqxOnPrem) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
|
||||
return nil
|
||||
nodeAclMux.Lock()
|
||||
defer nodeAclMux.Unlock()
|
||||
token, err := getEmqxAuthToken()
|
||||
|
|
|
@ -92,7 +92,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
|
|||
}
|
||||
decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
slog.Error("failed to decrypt message for host", "id", id, "error", decryptErr)
|
||||
slog.Error("failed to decrypt message for host", "id", id, "name", currentHost.Name, "error", decryptErr)
|
||||
return
|
||||
}
|
||||
var hostUpdate models.HostUpdate
|
||||
|
|
Loading…
Add table
Reference in a new issue