diff --git a/controllers/hosts.go b/controllers/hosts.go index 3e624d6b..a4a064d0 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -108,7 +108,7 @@ func updateHost(w http.ResponseWriter, r *http.Request) { logger.Log(0, r.Header.Get("user"), "failed to update host networks roles in DynSec:", err.Error()) } } - + // TODO: publish host update through MQ go func() { if err := mq.PublishPeerUpdate(); err != nil { logger.Log(0, "fail to publish peer update: ", err.Error()) @@ -148,6 +148,7 @@ func deleteHost(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } + // TODO: publish host update with delete action using MQ if err = mq.DeleteMqClient(currHost.ID.String()); err != nil { logger.Log(0, "error removing DynSec credentials for host:", currHost.Name, err.Error()) diff --git a/models/host.go b/models/host.go index da24dbe2..223ca749 100644 --- a/models/host.go +++ b/models/host.go @@ -64,3 +64,22 @@ func ParseBool(s string) bool { } return b } + +// HostMqAction - type for host update action +type HostMqAction string + +const ( + // UpdateHost - constant for host update action + UpdateHost = "UPDATE_HOST" + // DeleteHost - constant for host delete action + DeleteHost = "DELETE_HOST" + // JoinHostToNetwork - constant for host network join action + JoinHostToNetwork = "JOIN_HOST_TO_NETWORK" +) + +// HostUpdate - struct for host update +type HostUpdate struct { + Action HostMqAction + Host Host + Node Node +} diff --git a/mq/dynsec_helper.go b/mq/dynsec_helper.go index cd4767ed..f854dc90 100644 --- a/mq/dynsec_helper.go +++ b/mq/dynsec_helper.go @@ -180,7 +180,13 @@ func fetchHostAcls(hostID string) []Acl { }, { AclType: "publishClientReceive", - Topic: fmt.Sprintf("host/update/%s", hostID), + Topic: fmt.Sprintf("host/update/%s/#", hostID), + Priority: -1, + Allow: true, + }, + { + AclType: "publishClientSend", + Topic: fmt.Sprintf("host/serverupdate/%s", hostID), Priority: -1, Allow: true, }, @@ -323,6 +329,12 @@ func fetchServerAcls() []Acl { Priority: -1, Allow: true, }, + { + AclType: "publishClientSend", + Topic: "host/update/#", + Priority: -1, + Allow: true, + }, { AclType: "publishClientReceive", Topic: "ping/#", @@ -361,7 +373,7 @@ func fetchServerAcls() []Acl { }, { AclType: "publishClientReceive", - Topic: "host/update/#", + Topic: "host/serverupdate/#", Priority: -1, Allow: true, }, diff --git a/mq/handlers.go b/mq/handlers.go index 1faeeb72..34490b79 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -118,43 +118,44 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) { }() } -// UpdateHost message Handler -- handles updates from client hosts +// UpdateHost message Handler -- handles host updates from clients func UpdateHost(client mqtt.Client, msg mqtt.Message) { - go func() { - id, err := getID(msg.Topic()) + go func(msg mqtt.Message) { + id, err := getHostID(msg.Topic()) if err != nil { logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error()) return } currentHost, err := logic.GetHost(id) if err != nil { - logger.Log(1, "error getting node ", id, err.Error()) + logger.Log(1, "error getting host ", id, err.Error()) return } decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload()) if decryptErr != nil { - logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) + logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error()) return } - var newHost models.Host - if err := json.Unmarshal(decrypted, &newHost); err != nil { + var hostUpdate models.HostUpdate + if err := json.Unmarshal(decrypted, &hostUpdate); err != nil { logger.Log(1, "error unmarshaling payload ", err.Error()) return } - // ifaceDelta := logic.IfaceDelta(¤tHost, newNode) + logger.Log(0, "recieved host update for host: ", id) + switch hostUpdate.Action { + case models.UpdateHost: + // TODO: logic to update host recieved from client + case models.DeleteHost: + // TODO: logic to delete host on the server + + } // if servercfg.Is_EE && ifaceDelta { // if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil { // logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network) // } // } - logic.UpdateHost(&newHost, currentHost) - if err := logic.UpsertHost(&newHost); err != nil { - logger.Log(1, "error saving host", err.Error()) - return - } - logger.Log(1, "updated host", newHost.ID.String()) - }() + }(msg) } // UpdateMetrics message Handler -- handles updates from client nodes for metrics diff --git a/mq/mq.go b/mq/mq.go index a3c32ae4..d0ccc420 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -83,7 +83,7 @@ func SetupMQTT() { client.Disconnect(240) logger.Log(0, "node update subscription failed") } - if token := client.Subscribe("host/update/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + if token := client.Subscribe("host/serverupdate/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { client.Disconnect(240) logger.Log(0, "host update subscription failed") } diff --git a/mq/publishers.go b/mq/publishers.go index 614798f0..e2423544 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -92,20 +92,20 @@ func NodeUpdate(node *models.Node) error { return nil } -// HostUpdate -- publishes a host topic update -func HostUpdate(host *models.Host) error { +// HostUpdate -- publishes a host update to clients +func HostUpdate(hostUpdate *models.HostUpdate) error { if !servercfg.IsMessageQueueBackend() { return nil } - logger.Log(3, "publishing host update to "+host.ID.String()) + logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String()) - data, err := json.Marshal(host) + data, err := json.Marshal(hostUpdate) if err != nil { logger.Log(2, "error marshalling node update ", err.Error()) return err } - if err = publish(host, fmt.Sprintf("host/update/%s", host.ID.String()), data); err != nil { - logger.Log(2, "error publishing host update to", host.ID.String(), err.Error()) + if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil { + logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error()) return err } diff --git a/mq/util.go b/mq/util.go index 0ab59ee9..ac0caedc 100644 --- a/mq/util.go +++ b/mq/util.go @@ -94,3 +94,13 @@ func getID(topic string) (string, error) { //the last part of the topic will be the node.ID return parts[count-1], nil } + +// decodes a message queue topic and returns the embedded host.ID +func getHostID(topic string) (string, error) { + parts := strings.Split(topic, "/") + count := len(parts) + if count < 4 { + return "", fmt.Errorf("invalid topic") + } + return parts[2], nil +}