mirror of
https://github.com/gravitl/netmaker.git
synced 2024-09-20 15:26:04 +08:00
commit
8227e7899c
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -11,6 +12,7 @@ import (
|
|||
"github.com/gorilla/handlers"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gravitl/netmaker/logger"
|
||||
m "github.com/gravitl/netmaker/migrate"
|
||||
"github.com/gravitl/netmaker/servercfg"
|
||||
)
|
||||
|
||||
|
@ -62,6 +64,11 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) {
|
|||
logger.Log(0, err.Error())
|
||||
}
|
||||
}()
|
||||
if os.Getenv("MIGRATE_EMQX") == "true" {
|
||||
logger.Log(0, "migrating emqx...")
|
||||
time.Sleep(time.Second * 2)
|
||||
m.MigrateEmqx()
|
||||
}
|
||||
logger.Log(0, "REST Server successfully started on port ", port, " (REST)")
|
||||
|
||||
// Block main routine until a signal is received
|
||||
|
|
|
@ -645,6 +645,17 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
}
|
||||
relayUpdate := logic.RelayUpdates(¤tNode, newNode)
|
||||
if relayUpdate && newNode.IsRelay {
|
||||
err = logic.ValidateRelay(models.RelayRequest{
|
||||
NodeID: newNode.ID.String(),
|
||||
NetID: newNode.Network,
|
||||
RelayedNodes: newNode.RelayedNodes,
|
||||
}, true)
|
||||
if err != nil {
|
||||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
|
||||
return
|
||||
}
|
||||
}
|
||||
_, err = logic.GetHost(newNode.HostID.String())
|
||||
if err != nil {
|
||||
logger.Log(0, r.Header.Get("user"),
|
||||
|
|
|
@ -77,14 +77,12 @@ func RemoveNodeACL(networkID NetworkID, nodeID NodeID) (acls.ACLContainer, error
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
acls.AclMutex.Lock()
|
||||
for currentNodeID := range currentNetworkACL {
|
||||
if NodeID(currentNodeID) != nodeID {
|
||||
currentNetworkACL[currentNodeID].Remove(acls.AclID(nodeID))
|
||||
}
|
||||
}
|
||||
delete(currentNetworkACL, acls.AclID(nodeID))
|
||||
acls.AclMutex.Unlock()
|
||||
return currentNetworkACL.Save(acls.ContainerID(networkID))
|
||||
}
|
||||
|
||||
|
|
|
@ -15,8 +15,10 @@ func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
|
|||
}
|
||||
var allowed bool
|
||||
acls.AclMutex.RLock()
|
||||
allowed = currentNetworkACL[acls.AclID(node1)].IsAllowed(acls.AclID(node2)) && currentNetworkACL[acls.AclID(node2)].IsAllowed(acls.AclID(node1))
|
||||
currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
|
||||
currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
|
||||
acls.AclMutex.RUnlock()
|
||||
allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
|
||||
return allowed
|
||||
}
|
||||
|
||||
|
|
|
@ -418,7 +418,6 @@ func DissasociateNodeFromHost(n *models.Node, h *models.Host) error {
|
|||
if err := DeleteNodeByID(n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return UpsertHost(h)
|
||||
}
|
||||
|
||||
|
|
|
@ -189,7 +189,6 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
|
|||
func DeleteNode(node *models.Node, purge bool) error {
|
||||
alreadyDeleted := node.PendingDelete || node.Action == models.NODE_DELETE
|
||||
node.Action = models.NODE_DELETE
|
||||
|
||||
//delete ext clients if node is ingress gw
|
||||
if node.IsIngressGateway {
|
||||
if err := DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil {
|
||||
|
@ -235,7 +234,6 @@ func DeleteNode(node *models.Node, purge bool) error {
|
|||
if node.IsInternetGateway {
|
||||
UnsetInternetGw(node)
|
||||
}
|
||||
|
||||
if !purge && !alreadyDeleted {
|
||||
newnode := *node
|
||||
newnode.PendingDelete = true
|
||||
|
@ -281,7 +279,6 @@ func GetNodeByHostRef(hostid, network string) (node models.Node, err error) {
|
|||
func DeleteNodeByID(node *models.Node) error {
|
||||
var err error
|
||||
var key = node.ID.String()
|
||||
|
||||
if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil {
|
||||
if !database.IsEmptyRecord(err) {
|
||||
return err
|
||||
|
|
|
@ -28,3 +28,7 @@ var SetRelayedNodes = func(setRelayed bool, relay string, relayed []string) []mo
|
|||
var RelayUpdates = func(currentNode, newNode *models.Node) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
var ValidateRelay = func(relay models.RelayRequest, update bool) error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/slog"
|
||||
|
||||
|
@ -12,6 +13,7 @@ import (
|
|||
"github.com/gravitl/netmaker/logic"
|
||||
"github.com/gravitl/netmaker/logic/acls"
|
||||
"github.com/gravitl/netmaker/models"
|
||||
"github.com/gravitl/netmaker/mq"
|
||||
"github.com/gravitl/netmaker/servercfg"
|
||||
)
|
||||
|
||||
|
@ -22,6 +24,7 @@ func Run() {
|
|||
updateHosts()
|
||||
updateNodes()
|
||||
updateAcls()
|
||||
|
||||
}
|
||||
|
||||
func assignSuperAdmin() {
|
||||
|
@ -292,3 +295,19 @@ func updateAcls() {
|
|||
slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID))
|
||||
}
|
||||
}
|
||||
|
||||
func MigrateEmqx() {
|
||||
|
||||
err := mq.SendPullSYN()
|
||||
if err != nil {
|
||||
logger.Log(0, "failed to send pull syn to clients", "error", err.Error())
|
||||
|
||||
}
|
||||
time.Sleep(time.Second * 3)
|
||||
slog.Info("proceeding to kicking out clients from emqx")
|
||||
err = mq.KickOutClients()
|
||||
if err != nil {
|
||||
logger.Log(2, "failed to migrate emqx: ", "kickout-error", err.Error())
|
||||
}
|
||||
|
||||
}
|
||||
|
|
131
mq/migrate.go
Normal file
131
mq/migrate.go
Normal file
|
@ -0,0 +1,131 @@
|
|||
package mq
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/gravitl/netmaker/logger"
|
||||
"github.com/gravitl/netmaker/logic"
|
||||
"github.com/gravitl/netmaker/models"
|
||||
"github.com/gravitl/netmaker/servercfg"
|
||||
"golang.org/x/exp/slog"
|
||||
)
|
||||
|
||||
func setupmqtt_old() (mqtt.Client, error) {
|
||||
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
|
||||
id := logic.RandomString(23)
|
||||
opts.ClientID = id
|
||||
opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
|
||||
opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
|
||||
opts.SetAutoReconnect(true)
|
||||
opts.SetConnectRetry(true)
|
||||
opts.SetConnectRetryInterval(time.Second << 2)
|
||||
opts.SetKeepAlive(time.Minute)
|
||||
opts.SetWriteTimeout(time.Minute)
|
||||
mqclient := mqtt.NewClient(opts)
|
||||
|
||||
var connecterr error
|
||||
if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
||||
if token.Error() == nil {
|
||||
connecterr = errors.New("connect timeout")
|
||||
} else {
|
||||
connecterr = token.Error()
|
||||
}
|
||||
slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
|
||||
}
|
||||
return mqclient, nil
|
||||
}
|
||||
|
||||
func getEmqxAuthTokenOld() (string, error) {
|
||||
payload, err := json.Marshal(&emqxLogin{
|
||||
Username: os.Getenv("OLD_MQ_USERNAME"),
|
||||
Password: os.Getenv("OLD_MQ_PASSWORD"),
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
msg, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return "", fmt.Errorf("error during EMQX login %v", string(msg))
|
||||
}
|
||||
var loginResp emqxLoginResponse
|
||||
if err := json.Unmarshal(msg, &loginResp); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return loginResp.Token, nil
|
||||
}
|
||||
|
||||
func SendPullSYN() error {
|
||||
mqclient, err := setupmqtt_old()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hosts, err := logic.GetAllHosts()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, host := range hosts {
|
||||
host := host
|
||||
hostUpdate := models.HostUpdate{
|
||||
Action: models.RequestPull,
|
||||
Host: host,
|
||||
}
|
||||
msg, _ := json.Marshal(hostUpdate)
|
||||
encrypted, encryptErr := encryptMsg(&host, msg)
|
||||
if encryptErr != nil {
|
||||
continue
|
||||
}
|
||||
logger.Log(0, "sending pull syn to", host.Name)
|
||||
mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func KickOutClients() error {
|
||||
authToken, err := getEmqxAuthTokenOld()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hosts, err := logic.GetAllHosts()
|
||||
if err != nil {
|
||||
slog.Error("failed to migrate emqx: ", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, host := range hosts {
|
||||
url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
|
||||
client := &http.Client{}
|
||||
req, err := http.NewRequest(http.MethodDelete, url, nil)
|
||||
if err != nil {
|
||||
slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
|
||||
continue
|
||||
}
|
||||
req.Header.Add("Authorization", "Bearer "+authToken)
|
||||
res, err := client.Do(req)
|
||||
if err != nil {
|
||||
slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
|
||||
continue
|
||||
}
|
||||
if res.StatusCode != http.StatusNoContent {
|
||||
slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
|
||||
}
|
||||
res.Body.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
1
mq/mq.go
1
mq/mq.go
|
@ -60,6 +60,7 @@ func SetupMQTT() {
|
|||
log.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
emqx.DeleteEmqxUser(servercfg.GetMqUserName())
|
||||
if err := emqx.CreateEmqxUserforServer(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -101,6 +101,7 @@ func InitPro() {
|
|||
logic.UpdateRelayed = proLogic.UpdateRelayed
|
||||
logic.SetRelayedNodes = proLogic.SetRelayedNodes
|
||||
logic.RelayUpdates = proLogic.RelayUpdates
|
||||
logic.ValidateRelay = proLogic.ValidateRelay
|
||||
logic.GetTrialEndDate = getTrialEndDate
|
||||
logic.SetDefaultGw = proLogic.SetDefaultGw
|
||||
logic.SetDefaultGwForRelayedUpdate = proLogic.SetDefaultGwForRelayedUpdate
|
||||
|
|
|
@ -44,7 +44,7 @@ func CreateRelay(relay models.RelayRequest) ([]models.Node, models.Node, error)
|
|||
if host.OS != "linux" {
|
||||
return returnnodes, models.Node{}, fmt.Errorf("only linux machines can be relay nodes")
|
||||
}
|
||||
err = ValidateRelay(relay)
|
||||
err = ValidateRelay(relay, false)
|
||||
if err != nil {
|
||||
return returnnodes, models.Node{}, err
|
||||
}
|
||||
|
@ -101,14 +101,14 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
|
|||
// }
|
||||
|
||||
// ValidateRelay - checks if relay is valid
|
||||
func ValidateRelay(relay models.RelayRequest) error {
|
||||
func ValidateRelay(relay models.RelayRequest, update bool) error {
|
||||
var err error
|
||||
|
||||
node, err := logic.GetNodeByID(relay.NodeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if node.IsRelay {
|
||||
if !update && node.IsRelay {
|
||||
return errors.New("node is already acting as a relay")
|
||||
}
|
||||
for _, relayedNodeID := range relay.RelayedNodes {
|
||||
|
@ -119,6 +119,9 @@ func ValidateRelay(relay models.RelayRequest) error {
|
|||
if relayedNode.IsIngressGateway {
|
||||
return errors.New("cannot relay an ingress gateway (" + relayedNodeID + ")")
|
||||
}
|
||||
if relayedNode.IsInternetGateway {
|
||||
return errors.New("cannot relay an internet gateway (" + relayedNodeID + ")")
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue