mirror of
https://github.com/gravitl/netmaker.git
synced 2025-09-10 15:14:22 +08:00
266 lines
7.6 KiB
Go
266 lines
7.6 KiB
Go
package logic
|
|
|
|
import (
|
|
"encoding/json"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/gravitl/netmaker/database"
|
|
"github.com/gravitl/netmaker/logic"
|
|
"github.com/gravitl/netmaker/models"
|
|
"github.com/gravitl/netmaker/mq"
|
|
"github.com/gravitl/netmaker/netclient/ncutils"
|
|
"github.com/gravitl/netmaker/servercfg"
|
|
"golang.org/x/exp/slog"
|
|
)
|
|
|
|
var (
|
|
metricsCacheMutex = &sync.RWMutex{}
|
|
metricsCacheMap = make(map[string]models.Metrics)
|
|
)
|
|
|
|
func getMetricsFromCache(key string) (metrics models.Metrics, ok bool) {
|
|
metricsCacheMutex.RLock()
|
|
metrics, ok = metricsCacheMap[key]
|
|
metricsCacheMutex.RUnlock()
|
|
return
|
|
}
|
|
|
|
func storeMetricsInCache(key string, metrics models.Metrics) {
|
|
metricsCacheMutex.Lock()
|
|
metricsCacheMap[key] = metrics
|
|
metricsCacheMutex.Unlock()
|
|
}
|
|
|
|
func deleteNetworkFromCache(key string) {
|
|
metricsCacheMutex.Lock()
|
|
delete(metricsCacheMap, key)
|
|
metricsCacheMutex.Unlock()
|
|
}
|
|
|
|
func LoadNodeMetricsToCache() error {
|
|
slog.Info("loading metrics to cache")
|
|
if metricsCacheMap == nil {
|
|
metricsCacheMap = map[string]models.Metrics{}
|
|
}
|
|
|
|
collection, err := database.FetchRecords(database.METRICS_TABLE_NAME)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for key, value := range collection {
|
|
var metrics models.Metrics
|
|
if err := json.Unmarshal([]byte(value), &metrics); err != nil {
|
|
slog.Error("parse metric record error", "error", err.Error())
|
|
continue
|
|
}
|
|
if servercfg.CacheEnabled() {
|
|
storeMetricsInCache(key, metrics)
|
|
}
|
|
}
|
|
|
|
slog.Info("metrics loading done")
|
|
return nil
|
|
}
|
|
|
|
// GetMetrics - gets the metrics
|
|
func GetMetrics(nodeid string) (*models.Metrics, error) {
|
|
var metrics models.Metrics
|
|
if servercfg.CacheEnabled() {
|
|
if metrics, ok := getMetricsFromCache(nodeid); ok {
|
|
return &metrics, nil
|
|
}
|
|
}
|
|
record, err := database.FetchRecord(database.METRICS_TABLE_NAME, nodeid)
|
|
if err != nil {
|
|
if database.IsEmptyRecord(err) {
|
|
return &metrics, nil
|
|
}
|
|
return &metrics, err
|
|
}
|
|
err = json.Unmarshal([]byte(record), &metrics)
|
|
if err != nil {
|
|
return &metrics, err
|
|
}
|
|
if servercfg.CacheEnabled() {
|
|
storeMetricsInCache(nodeid, metrics)
|
|
}
|
|
return &metrics, nil
|
|
}
|
|
|
|
// UpdateMetrics - updates the metrics of a given client
|
|
func UpdateMetrics(nodeid string, metrics *models.Metrics) error {
|
|
data, err := json.Marshal(metrics)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = database.Insert(nodeid, string(data), database.METRICS_TABLE_NAME)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if servercfg.CacheEnabled() {
|
|
storeMetricsInCache(nodeid, *metrics)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteMetrics - deletes metrics of a given node
|
|
func DeleteMetrics(nodeid string) error {
|
|
err := database.DeleteRecord(database.METRICS_TABLE_NAME, nodeid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if servercfg.CacheEnabled() {
|
|
deleteNetworkFromCache(nodeid)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MQUpdateMetricsFallBack - called when mq fallback thread is triggered on client
|
|
func MQUpdateMetricsFallBack(nodeid string, newMetrics models.Metrics) {
|
|
|
|
currentNode, err := logic.GetNodeByID(nodeid)
|
|
if err != nil {
|
|
slog.Error("error getting node", "id", nodeid, "error", err)
|
|
return
|
|
}
|
|
|
|
updateNodeMetrics(¤tNode, &newMetrics)
|
|
if err = logic.UpdateMetrics(nodeid, &newMetrics); err != nil {
|
|
slog.Error("failed to update node metrics", "id", nodeid, "error", err)
|
|
return
|
|
}
|
|
if servercfg.IsMetricsExporter() {
|
|
if err := mq.PushMetricsToExporter(newMetrics); err != nil {
|
|
slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err)
|
|
}
|
|
}
|
|
slog.Debug("updated node metrics", "id", nodeid)
|
|
}
|
|
|
|
func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
|
|
id, err := mq.GetID(msg.Topic())
|
|
if err != nil {
|
|
slog.Error("error getting ID sent on ", "topic", msg.Topic(), "error", err)
|
|
return
|
|
}
|
|
currentNode, err := logic.GetNodeByID(id)
|
|
if err != nil {
|
|
slog.Error("error getting node", "id", id, "error", err)
|
|
return
|
|
}
|
|
decrypted, decryptErr := mq.DecryptMsg(¤tNode, msg.Payload())
|
|
if decryptErr != nil {
|
|
slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
|
|
return
|
|
}
|
|
|
|
var newMetrics models.Metrics
|
|
if err := json.Unmarshal(decrypted, &newMetrics); err != nil {
|
|
slog.Error("error unmarshaling payload", "error", err)
|
|
return
|
|
}
|
|
updateNodeMetrics(¤tNode, &newMetrics)
|
|
if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
|
|
slog.Error("failed to update node metrics", "id", id, "error", err)
|
|
return
|
|
}
|
|
if servercfg.IsMetricsExporter() {
|
|
if err := mq.PushMetricsToExporter(newMetrics); err != nil {
|
|
slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err)
|
|
}
|
|
}
|
|
slog.Debug("updated node metrics", "id", id)
|
|
}
|
|
|
|
func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) {
|
|
oldMetrics, err := logic.GetMetrics(currentNode.ID.String())
|
|
if err != nil {
|
|
slog.Error("error finding old metrics for node", "id", currentNode.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
var attachedClients []models.ExtClient
|
|
if currentNode.IsIngressGateway {
|
|
clients, err := logic.GetExtClientsByID(currentNode.ID.String(), currentNode.Network)
|
|
if err == nil {
|
|
attachedClients = clients
|
|
}
|
|
}
|
|
if newMetrics.Connectivity == nil {
|
|
newMetrics.Connectivity = make(map[string]models.Metric)
|
|
}
|
|
for i := range attachedClients {
|
|
slog.Debug("[metrics] processing attached client", "client", attachedClients[i].ClientID, "public key", attachedClients[i].PublicKey)
|
|
clientMetric := newMetrics.Connectivity[attachedClients[i].PublicKey]
|
|
clientMetric.NodeName = attachedClients[i].ClientID
|
|
newMetrics.Connectivity[attachedClients[i].ClientID] = clientMetric
|
|
delete(newMetrics.Connectivity, attachedClients[i].PublicKey)
|
|
slog.Debug("[metrics] attached client metric", "metric", clientMetric)
|
|
}
|
|
|
|
// run through metrics for each peer
|
|
for k := range newMetrics.Connectivity {
|
|
currMetric := newMetrics.Connectivity[k]
|
|
oldMetric := oldMetrics.Connectivity[k]
|
|
currMetric.TotalTime += oldMetric.TotalTime
|
|
currMetric.Uptime += oldMetric.Uptime // get the total uptime for this connection
|
|
|
|
totalRecv := currMetric.TotalReceived
|
|
totalSent := currMetric.TotalSent
|
|
if currMetric.TotalReceived < oldMetric.TotalReceived && currMetric.TotalReceived < oldMetric.LastTotalReceived {
|
|
currMetric.TotalReceived += oldMetric.TotalReceived
|
|
} else {
|
|
currMetric.TotalReceived = currMetric.TotalReceived - oldMetric.LastTotalReceived + oldMetric.TotalReceived
|
|
}
|
|
if currMetric.TotalSent < oldMetric.TotalSent && currMetric.TotalSent < oldMetric.LastTotalSent {
|
|
currMetric.TotalSent += oldMetric.TotalSent
|
|
} else {
|
|
currMetric.TotalSent = currMetric.TotalSent - oldMetric.LastTotalSent + oldMetric.TotalSent
|
|
}
|
|
|
|
if currMetric.Uptime == 0 || currMetric.TotalTime == 0 {
|
|
currMetric.PercentUp = 0
|
|
} else {
|
|
currMetric.PercentUp = 100.0 * (float64(currMetric.Uptime) / float64(currMetric.TotalTime))
|
|
}
|
|
totalUpMinutes := currMetric.Uptime * ncutils.CheckInInterval
|
|
currMetric.ActualUptime = time.Duration(totalUpMinutes) * time.Minute
|
|
delete(oldMetrics.Connectivity, k) // remove from old data
|
|
currMetric.LastTotalReceived = totalRecv
|
|
currMetric.LastTotalSent = totalSent
|
|
newMetrics.Connectivity[k] = currMetric
|
|
|
|
}
|
|
|
|
slog.Debug("[metrics] node metrics data", "node ID", currentNode.ID, "metrics", newMetrics)
|
|
}
|
|
|
|
func GetHostLocInfo(ip, token string) string {
|
|
url := "https://ipinfo.io/"
|
|
if ip != "" {
|
|
url += ip
|
|
}
|
|
url += "/json"
|
|
if token != "" {
|
|
url += "?token=" + token
|
|
}
|
|
|
|
client := http.Client{Timeout: 3 * time.Second}
|
|
resp, err := client.Get(url)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
var data struct {
|
|
Loc string `json:"loc"` // Format: "lat,lon"
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
|
|
return ""
|
|
}
|
|
return data.Loc
|
|
}
|