From 8019f7a28e50da8a164c7bd461926d38ae5f5b09 Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Wed, 10 Apr 2024 12:08:26 +0530 Subject: [PATCH] comment usage of emqx acls --- mq/emqx_cloud.go | 139 --------------------------------------------- mq/emqx_on_prem.go | 4 ++ mq/handlers.go | 2 +- 3 files changed, 5 insertions(+), 140 deletions(-) diff --git a/mq/emqx_cloud.go b/mq/emqx_cloud.go index c30da9d5..23bfb244 100644 --- a/mq/emqx_cloud.go +++ b/mq/emqx_cloud.go @@ -22,13 +22,6 @@ type userCreateReq struct { Password string `json:"password"` } -type cloudAcl struct { - UserName string `json:"username"` - Topic string `json:"topic"` - Action string `json:"action"` - Access string `json:"access"` -} - func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy } func (e *EmqxCloud) CreateEmqxUser(username, pass string) error { @@ -90,54 +83,6 @@ func (e *EmqxCloud) CreateEmqxUserforServer() error { return errors.New("request failed " + string(body)) } return nil - // add acls - acls := []cloudAcl{ - { - UserName: servercfg.GetMqUserName(), - Topic: fmt.Sprintf("update/%s/#", servercfg.GetServer()), - Access: "allow", - Action: "sub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()), - Access: "allow", - Action: "sub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: fmt.Sprintf("signal/%s/#", servercfg.GetServer()), - Access: "allow", - Action: "sub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: fmt.Sprintf("metrics/%s/#", servercfg.GetServer()), - Access: "allow", - Action: "sub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: "peers/host/#", - Access: "allow", - Action: "pub", - }, - { - UserName: servercfg.GetMqUserName(), - Topic: "node/update/#", - Access: "allow", - Action: "pub", - }, - { - - UserName: servercfg.GetMqUserName(), - Topic: "host/update/#", - Access: "allow", - Action: "pub", - }, - } - - return e.createacls(acls) } func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore @@ -148,97 +93,13 @@ func (e *EmqxCloud) CreateDefaultDenyRule() error { return nil } -func (e *EmqxCloud) createacls(acls []cloudAcl) error { - return nil - payload, err := json.Marshal(acls) - if err != nil { - return err - } - client := &http.Client{} - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload))) - if err != nil { - return err - } - req.Header.Add("Content-Type", "application/json") - req.SetBasicAuth(e.AppID, e.AppSecret) - res, err := client.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - - body, err := io.ReadAll(res.Body) - if err != nil { - return err - } - if res.StatusCode != http.StatusOK { - return errors.New("request failed " + string(body)) - } - return nil -} - func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error { return nil - acls := []cloudAcl{ - { - UserName: hostID, - Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName), - Access: "allow", - Action: "sub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName), - Access: "allow", - Action: "sub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID), - Access: "allow", - Action: "pub", - }, - } - - return e.createacls(acls) } func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error { return nil - acls := []cloudAcl{ - { - UserName: hostID, - Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID), - Access: "allow", - Action: "sub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID), - Access: "allow", - Action: "pubsub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID), - Access: "allow", - Action: "pubsub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID), - Access: "allow", - Action: "pubsub", - }, - { - UserName: hostID, - Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID), - Access: "allow", - Action: "pubsub", - }, - } - return e.createacls(acls) } func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list diff --git a/mq/emqx_on_prem.go b/mq/emqx_on_prem.go index f116f0a8..49cff77e 100644 --- a/mq/emqx_on_prem.go +++ b/mq/emqx_on_prem.go @@ -248,6 +248,7 @@ func (e *EmqxOnPrem) CreateEmqxDefaultAuthorizer() error { // GetUserACL - returns ACL rules by username func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) { + return nil, nil token, err := getEmqxAuthToken() if err != nil { return nil, err @@ -280,6 +281,7 @@ func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) { // CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default // to allow user access to topics use the `mq.CreateUserAccessRule` function func (e *EmqxOnPrem) CreateDefaultDenyRule() error { + return nil token, err := getEmqxAuthToken() if err != nil { return err @@ -311,6 +313,7 @@ func (e *EmqxOnPrem) CreateDefaultDenyRule() error { // CreateHostACL - create host ACL rules func (e *EmqxOnPrem) CreateHostACL(hostID, serverName string) error { + return nil token, err := getEmqxAuthToken() if err != nil { return err @@ -365,6 +368,7 @@ var nodeAclMux sync.Mutex // AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID func (e *EmqxOnPrem) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error { + return nil nodeAclMux.Lock() defer nodeAclMux.Unlock() token, err := getEmqxAuthToken() diff --git a/mq/handlers.go b/mq/handlers.go index 528e5d07..c8ce3801 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -92,7 +92,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { } decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload()) if decryptErr != nil { - slog.Error("failed to decrypt message for host", "id", id, "error", decryptErr) + slog.Error("failed to decrypt message for host", "id", id, "name", currentHost.Name, "error", decryptErr) return } var hostUpdate models.HostUpdate