netmaker/pro/logic/metrics.go
2024-11-10 15:30:21 +04:00

239 lines
7.1 KiB
Go

package logic
import (
"encoding/json"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
)
var (
	// metricsCacheMutex guards every read and write of metricsCacheMap.
	metricsCacheMutex = new(sync.RWMutex)
	// metricsCacheMap is the in-memory metrics cache; GetMetrics/UpdateMetrics
	// use node IDs as keys.
	metricsCacheMap = map[string]models.Metrics{}
)
// getMetricsFromCache looks up the cached metrics stored under key while
// holding the cache read lock. ok reports whether an entry existed; when it
// did not, metrics is the zero value.
func getMetricsFromCache(key string) (metrics models.Metrics, ok bool) {
	metricsCacheMutex.RLock()
	defer metricsCacheMutex.RUnlock()
	metrics, ok = metricsCacheMap[key]
	return metrics, ok
}
// storeMetricsInCache saves metrics under key, replacing any previous entry,
// while holding the cache write lock.
func storeMetricsInCache(key string, metrics models.Metrics) {
	metricsCacheMutex.Lock()
	defer metricsCacheMutex.Unlock()
	metricsCacheMap[key] = metrics
}
// deleteNetworkFromCache drops the metrics cache entry stored under key while
// holding the cache write lock. Despite the name, DeleteMetrics passes a node
// ID here, not a network name. Removing a missing key is a no-op.
func deleteNetworkFromCache(key string) {
	metricsCacheMutex.Lock()
	defer metricsCacheMutex.Unlock()
	delete(metricsCacheMap, key)
}
func LoadNodeMetricsToCache() error {
slog.Info("loading metrics to cache")
if metricsCacheMap == nil {
metricsCacheMap = map[string]models.Metrics{}
}
collection, err := database.FetchRecords(database.METRICS_TABLE_NAME)
if err != nil {
return err
}
for key, value := range collection {
var metrics models.Metrics
if err := json.Unmarshal([]byte(value), &metrics); err != nil {
slog.Error("parse metric record error", "error", err.Error())
continue
}
if servercfg.CacheEnabled() {
storeMetricsInCache(key, metrics)
}
}
slog.Info("metrics loading done")
return nil
}
// GetMetrics returns the stored metrics of the node identified by nodeid.
//
// With caching enabled, the in-memory cache is consulted first, and a miss is
// filled from the metrics table and then cached. A missing record is not an
// error: a pointer to an empty Metrics value is returned. On any other error a
// non-nil pointer (empty or partially decoded) is still returned along with
// the error.
//
// The returned value never shares its Connectivity map with the cache. Callers
// may therefore modify it without racing concurrent cache readers or silently
// altering the cached entry.
func GetMetrics(nodeid string) (*models.Metrics, error) {
	if servercfg.CacheEnabled() {
		if cached, ok := getMetricsFromCache(nodeid); ok {
			cached.Connectivity = cloneMetricsConnectivity(cached.Connectivity)
			return &cached, nil
		}
	}
	var metrics models.Metrics
	record, err := database.FetchRecord(database.METRICS_TABLE_NAME, nodeid)
	if err != nil {
		if database.IsEmptyRecord(err) {
			return &metrics, nil
		}
		return &metrics, err
	}
	if err := json.Unmarshal([]byte(record), &metrics); err != nil {
		return &metrics, err
	}
	if servercfg.CacheEnabled() {
		// Cache a private copy of the map so the value returned below stays
		// independent of the cached entry.
		cached := metrics
		cached.Connectivity = cloneMetricsConnectivity(metrics.Connectivity)
		storeMetricsInCache(nodeid, cached)
	}
	return &metrics, nil
}

// cloneMetricsConnectivity returns a shallow copy of src (nil stays nil), so
// that a caller's map and a cached map never alias each other.
func cloneMetricsConnectivity(src map[string]models.Metric) map[string]models.Metric {
	if src == nil {
		return nil
	}
	dst := make(map[string]models.Metric, len(src))
	for k, v := range src {
		dst[k] = v
	}
	return dst
}
// UpdateMetrics persists the metrics of the node identified by nodeid as JSON
// in the metrics table. When caching is enabled it then refreshes the cached
// copy. Marshalling or database errors are returned and leave the cache
// untouched.
func UpdateMetrics(nodeid string, metrics *models.Metrics) error {
	payload, err := json.Marshal(metrics)
	if err != nil {
		return err
	}
	if err := database.Insert(nodeid, string(payload), database.METRICS_TABLE_NAME); err != nil {
		return err
	}
	if servercfg.CacheEnabled() {
		storeMetricsInCache(nodeid, *metrics)
	}
	return nil
}
// DeleteMetrics removes the stored metrics of the node identified by nodeid.
// When caching is enabled, it also evicts the cached entry. If the database
// delete fails, its error is returned and the cache is left as is.
func DeleteMetrics(nodeid string) error {
	if err := database.DeleteRecord(database.METRICS_TABLE_NAME, nodeid); err != nil {
		return err
	}
	if servercfg.CacheEnabled() {
		deleteNetworkFromCache(nodeid)
	}
	return nil
}
// MQUpdateMetricsFallBack processes a metrics snapshot delivered through the
// client's MQ fallback path instead of an MQTT message.
//
// It looks up the node, merges newMetrics with the node's stored metrics via
// updateNodeMetrics, and saves the result with logic.UpdateMetrics. When the
// server is a metrics exporter, it also pushes the result to the exporter. A
// failed node lookup or save is logged and stops processing; a failed exporter
// push is only logged.
func MQUpdateMetricsFallBack(nodeid string, newMetrics models.Metrics) {
	node, err := logic.GetNodeByID(nodeid)
	if err != nil {
		slog.Error("error getting node", "id", nodeid, "error", err)
		return
	}
	updateNodeMetrics(&node, &newMetrics)
	if err := logic.UpdateMetrics(nodeid, &newMetrics); err != nil {
		slog.Error("failed to update node metrics", "id", nodeid, "error", err)
		return
	}
	if servercfg.IsMetricsExporter() {
		if pushErr := mq.PushMetricsToExporter(newMetrics); pushErr != nil {
			slog.Error("failed to push node metrics to exporter", "id", node.ID, "error", pushErr)
		}
	}
	slog.Debug("updated node metrics", "id", nodeid)
}
// MQUpdateMetrics handles a metrics message published by a node over MQTT.
//
// The processing steps are:
//   - identify the sending node from the message topic;
//   - decrypt the payload for that node and decode it into models.Metrics;
//   - merge it with the node's stored metrics via updateNodeMetrics;
//   - save the result with logic.UpdateMetrics;
//   - when the server is a metrics exporter, push the result to the exporter.
//
// Failures up to and including the save are logged and stop processing. A
// failed exporter push is only logged. The client argument is unused.
func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
	topic := msg.Topic()
	id, err := mq.GetID(topic)
	if err != nil {
		slog.Error("error getting ID sent on ", "topic", topic, "error", err)
		return
	}
	node, err := logic.GetNodeByID(id)
	if err != nil {
		slog.Error("error getting node", "id", id, "error", err)
		return
	}
	payload, err := mq.DecryptMsg(&node, msg.Payload())
	if err != nil {
		slog.Error("failed to decrypt message for node", "id", id, "error", err)
		return
	}
	var metrics models.Metrics
	if err = json.Unmarshal(payload, &metrics); err != nil {
		slog.Error("error unmarshaling payload", "error", err)
		return
	}
	updateNodeMetrics(&node, &metrics)
	if err = logic.UpdateMetrics(id, &metrics); err != nil {
		slog.Error("failed to update node metrics", "id", id, "error", err)
		return
	}
	if servercfg.IsMetricsExporter() {
		if err = mq.PushMetricsToExporter(metrics); err != nil {
			slog.Error("failed to push node metrics to exporter", "id", node.ID, "error", err)
		}
	}
	slog.Debug("updated node metrics", "id", id)
}
// updateNodeMetrics merges a freshly reported metrics snapshot for currentNode
// into the node's previously stored metrics. It modifies newMetrics in place.
//
// For ingress gateways, a connectivity entry reported under an attached
// external client's public key is re-keyed by the client's ID and labelled
// with it. Then, for every peer in newMetrics.Connectivity:
//   - TotalTime and Uptime are added to the stored values.
//   - TotalReceived/TotalSent arrive as the raw counters and are turned into
//     running totals. A raw counter below the previous raw reading is treated
//     as a counter reset and added to the total as-is.
//   - PercentUp and ActualUptime are recomputed.
//   - LastTotalReceived/LastTotalSent record the raw counters for the next
//     merge.
//
// If the stored metrics cannot be read, the error is logged and the snapshot
// is merged against an empty baseline.
func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) {
	oldMetrics, err := logic.GetMetrics(currentNode.ID.String())
	if err != nil {
		slog.Error("error finding old metrics for node", "id", currentNode.ID, "error", err)
	}
	if err != nil || oldMetrics == nil {
		// Merge against an empty baseline instead of returning early. Otherwise
		// the raw snapshot would be persisted without the LastTotal* bookkeeping
		// that the next merge relies on.
		oldMetrics = &models.Metrics{}
	}
	if newMetrics.Connectivity == nil {
		newMetrics.Connectivity = make(map[string]models.Metric)
	}
	if currentNode.IsIngressGateway {
		// A lookup failure just means no entries are re-keyed (best effort).
		clients, err := logic.GetExtClientsByID(currentNode.ID.String(), currentNode.Network)
		if err == nil {
			for i := range clients {
				client := &clients[i]
				slog.Debug("[metrics] processing attached client", "client", client.ClientID, "public key", client.PublicKey)
				clientMetric := newMetrics.Connectivity[client.PublicKey]
				clientMetric.NodeName = client.ClientID
				// Delete before inserting, so an entry whose public key equals its
				// client ID is kept rather than removed.
				delete(newMetrics.Connectivity, client.PublicKey)
				newMetrics.Connectivity[client.ClientID] = clientMetric
				slog.Debug("[metrics] attached client metric", "metric", clientMetric)
			}
		}
	}
	// Run through the metrics for each peer. Only existing keys are
	// overwritten, which is safe during range.
	for peer, currMetric := range newMetrics.Connectivity {
		// oldMetrics is only read here, never modified. Its Connectivity map may
		// be shared with a cached entry, and mutating it would race with other
		// readers of the cache.
		oldMetric := oldMetrics.Connectivity[peer]
		currMetric.TotalTime += oldMetric.TotalTime
		currMetric.Uptime += oldMetric.Uptime // total uptime for this connection
		rawRecv := currMetric.TotalReceived
		rawSent := currMetric.TotalSent
		if currMetric.TotalReceived < oldMetric.TotalReceived && currMetric.TotalReceived < oldMetric.LastTotalReceived {
			currMetric.TotalReceived += oldMetric.TotalReceived
		} else {
			currMetric.TotalReceived = currMetric.TotalReceived - oldMetric.LastTotalReceived + oldMetric.TotalReceived
		}
		if currMetric.TotalSent < oldMetric.TotalSent && currMetric.TotalSent < oldMetric.LastTotalSent {
			currMetric.TotalSent += oldMetric.TotalSent
		} else {
			currMetric.TotalSent = currMetric.TotalSent - oldMetric.LastTotalSent + oldMetric.TotalSent
		}
		if currMetric.Uptime == 0 || currMetric.TotalTime == 0 {
			currMetric.PercentUp = 0
		} else {
			currMetric.PercentUp = 100.0 * (float64(currMetric.Uptime) / float64(currMetric.TotalTime))
		}
		totalUpMinutes := currMetric.Uptime * ncutils.CheckInInterval
		currMetric.ActualUptime = time.Duration(totalUpMinutes) * time.Minute
		currMetric.LastTotalReceived = rawRecv
		currMetric.LastTotalSent = rawSent
		newMetrics.Connectivity[peer] = currMetric
	}
	slog.Debug("[metrics] node metrics data", "node ID", currentNode.ID, "metrics", newMetrics)
}