netmaker/mq/migrate.go
Yabin Ma 5f21c8bb1d
NET-1778: scale test code changes (#3203)
* comment ACL call and add debug message

* add cache for network nodes

* fix load node to network cache issue

* add peerUpdate call 1 min limit

* add debug log for scale test

* release maps

* avoid default policy for node

* 1 min limit for peerUpdate trigger

* mq options

* Revert "mq options"

This reverts commit 10b93d0118.

* set peerUpdate run in sequence

* update for emqx 5.8.2

* remove batch peer update

* change the sleep to 10 millisec to avoid timeout

* add compress and change encrypt for peerUpdate message

* add mem profiling and automaxprocs

* add failover ctx mutex

* ignore request to failover peer

* remove code without called

* remove debug logs

* update emqx to v5.8.2

* change broker keepalive

* add OLD_ACL_SUPPORT setting

* add host version check for message encrypt

* remove debug message

* remove peerUpdate call control

---------

Co-authored-by: abhishek9686 <abhi281342@gmail.com>
2024-12-10 10:15:31 +04:00

140 lines
3.5 KiB
Go

package mq
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
)
func setupmqtt_old() (mqtt.Client, error) {
opts := mqtt.NewClientOptions()
opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
id := logic.RandomString(23)
opts.ClientID = id
opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(time.Second << 2)
opts.SetKeepAlive(time.Minute)
opts.SetWriteTimeout(time.Minute)
mqclient := mqtt.NewClient(opts)
var connecterr error
if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
if token.Error() == nil {
connecterr = errors.New("connect timeout")
} else {
connecterr = token.Error()
}
slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
}
return mqclient, nil
}
func getEmqxAuthTokenOld() (string, error) {
payload, err := json.Marshal(&emqxLogin{
Username: os.Getenv("OLD_MQ_USERNAME"),
Password: os.Getenv("OLD_MQ_PASSWORD"),
})
if err != nil {
return "", err
}
resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
if err != nil {
return "", err
}
defer resp.Body.Close()
msg, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("error during EMQX login %v", string(msg))
}
var loginResp emqxLoginResponse
if err := json.Unmarshal(msg, &loginResp); err != nil {
return "", err
}
return loginResp.Token, nil
}
func SendPullSYN() error {
mqclient, err := setupmqtt_old()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
return err
}
for _, host := range hosts {
host := host
hostUpdate := models.HostUpdate{
Action: models.RequestPull,
Host: host,
}
msg, _ := json.Marshal(hostUpdate)
zipped, err := compressPayload(msg)
if err != nil {
return err
}
encrypted, encryptErr := encryptAESGCM(host.TrafficKeyPublic[0:32], zipped)
if encryptErr != nil {
return encryptErr
}
if encryptErr != nil {
continue
}
logger.Log(0, "sending pull syn to", host.Name)
mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
}
return nil
}
func KickOutClients() error {
authToken, err := getEmqxAuthTokenOld()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
slog.Error("failed to migrate emqx: ", "error", err)
return err
}
for _, host := range hosts {
url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
client := &http.Client{}
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
continue
}
req.Header.Add("Authorization", "Bearer "+authToken)
res, err := client.Do(req)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
continue
}
if res.StatusCode != http.StatusNoContent {
slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
}
res.Body.Close()
}
return nil
}