fix: 监控采集方式修改 (#2489)

This commit is contained in:
ssongliu 2023-10-10 17:30:28 +08:00 committed by GitHub
parent 5fd238c429
commit 42e0bff8f7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 90 deletions

View file

@ -1,6 +1,7 @@
package service
import (
"context"
"fmt"
"strconv"
"time"
@ -15,21 +16,28 @@ import (
"github.com/shirou/gopsutil/v3/net"
)
type MonitorService struct{}
type MonitorService struct {
DiskIO chan ([]disk.IOCountersStat)
NetIO chan ([]net.IOCountersStat)
}
var monitorCancel context.CancelFunc
type IMonitorService interface {
Run()
saveIODataToDB(ctx context.Context, interval float64)
saveNetDataToDB(ctx context.Context, interval float64)
}
func NewIMonitorService() IMonitorService {
return &MonitorService{}
return &MonitorService{
DiskIO: make(chan []disk.IOCountersStat, 2),
NetIO: make(chan []net.IOCountersStat, 2),
}
}
func (m *MonitorService) Run() {
monitorStatus, _ := settingRepo.Get(settingRepo.WithByKey("MonitorStatus"))
if monitorStatus.Value == "disable" {
return
}
var itemModel model.MonitorBase
totalPercent, _ := cpu.Percent(3*time.Second, false)
if len(totalPercent) == 1 {
@ -50,8 +58,8 @@ func (m *MonitorService) Run() {
global.LOG.Errorf("Insert basic monitoring data failed, err: %v", err)
}
go loadDiskIO()
go loadNetIO()
m.loadDiskIO()
m.loadNetIO()
MonitorStoreDays, err := settingRepo.Get(settingRepo.WithByKey("MonitorStoreDays"))
if err != nil {
@ -64,115 +72,145 @@ func (m *MonitorService) Run() {
_ = settingRepo.DelMonitorNet(timeForDelete)
}
func loadDiskIO() {
func (m *MonitorService) loadDiskIO() {
ioStat, _ := disk.IOCounters()
var diskIOList []disk.IOCountersStat
for _, io := range ioStat {
diskIOList = append(diskIOList, io)
}
m.DiskIO <- diskIOList
}
time.Sleep(60 * time.Second)
func (m *MonitorService) loadNetIO() {
netStat, _ := net.IOCounters(true)
netStatAll, _ := net.IOCounters(false)
var netList []net.IOCountersStat
netList = append(netList, netStat...)
netList = append(netList, netStatAll...)
m.NetIO <- netList
}
ioStat2, _ := disk.IOCounters()
var ioList []model.MonitorIO
for _, io2 := range ioStat2 {
for _, io1 := range ioStat {
if io2.Name == io1.Name {
var itemIO model.MonitorIO
itemIO.Name = io1.Name
if io2.ReadBytes != 0 && io1.ReadBytes != 0 && io2.ReadBytes > io1.ReadBytes {
itemIO.Read = uint64(float64(io2.ReadBytes-io1.ReadBytes) / 60)
}
if io2.WriteBytes != 0 && io1.WriteBytes != 0 && io2.WriteBytes > io1.WriteBytes {
itemIO.Write = uint64(float64(io2.WriteBytes-io1.WriteBytes) / 60)
}
func (m *MonitorService) saveIODataToDB(ctx context.Context, interval float64) {
defer close(m.DiskIO)
for {
select {
case <-ctx.Done():
return
case ioStat := <-m.DiskIO:
select {
case <-ctx.Done():
return
case ioStat2 := <-m.DiskIO:
var ioList []model.MonitorIO
for _, io2 := range ioStat2 {
for _, io1 := range ioStat {
if io2.Name == io1.Name {
var itemIO model.MonitorIO
itemIO.Name = io1.Name
if io2.ReadBytes != 0 && io1.ReadBytes != 0 && io2.ReadBytes > io1.ReadBytes {
itemIO.Read = uint64(float64(io2.ReadBytes-io1.ReadBytes) / interval / 60)
}
if io2.WriteBytes != 0 && io1.WriteBytes != 0 && io2.WriteBytes > io1.WriteBytes {
itemIO.Write = uint64(float64(io2.WriteBytes-io1.WriteBytes) / interval / 60)
}
if io2.ReadCount != 0 && io1.ReadCount != 0 && io2.ReadCount > io1.ReadCount {
itemIO.Count = uint64(float64(io2.ReadCount-io1.ReadCount) / 60)
}
writeCount := uint64(0)
if io2.WriteCount != 0 && io1.WriteCount != 0 && io2.WriteCount > io1.WriteCount {
writeCount = uint64(float64(io2.WriteCount-io1.WriteCount) / 60)
}
if writeCount > itemIO.Count {
itemIO.Count = writeCount
}
if io2.ReadCount != 0 && io1.ReadCount != 0 && io2.ReadCount > io1.ReadCount {
itemIO.Count = uint64(float64(io2.ReadCount-io1.ReadCount) / interval / 60)
}
writeCount := uint64(0)
if io2.WriteCount != 0 && io1.WriteCount != 0 && io2.WriteCount > io1.WriteCount {
writeCount = uint64(float64(io2.WriteCount-io1.WriteCount) / interval * 60)
}
if writeCount > itemIO.Count {
itemIO.Count = writeCount
}
if io2.ReadTime != 0 && io1.ReadTime != 0 && io2.ReadTime > io1.ReadTime {
itemIO.Time = uint64(float64(io2.ReadTime-io1.ReadTime) / 60)
if io2.ReadTime != 0 && io1.ReadTime != 0 && io2.ReadTime > io1.ReadTime {
itemIO.Time = uint64(float64(io2.ReadTime-io1.ReadTime) / interval / 60)
}
writeTime := uint64(0)
if io2.WriteTime != 0 && io1.WriteTime != 0 && io2.WriteTime > io1.WriteTime {
writeTime = uint64(float64(io2.WriteTime-io1.WriteTime) / interval / 60)
}
if writeTime > itemIO.Time {
itemIO.Time = writeTime
}
ioList = append(ioList, itemIO)
break
}
}
}
writeTime := uint64(0)
if io2.WriteTime != 0 && io1.WriteTime != 0 && io2.WriteTime > io1.WriteTime {
writeTime = uint64(float64(io2.WriteTime-io1.WriteTime) / 60)
if err := settingRepo.BatchCreateMonitorIO(ioList); err != nil {
global.LOG.Errorf("Insert io monitoring data failed, err: %v", err)
}
if writeTime > itemIO.Time {
itemIO.Time = writeTime
}
ioList = append(ioList, itemIO)
break
m.DiskIO <- ioStat2
}
}
}
if err := settingRepo.BatchCreateMonitorIO(ioList); err != nil {
global.LOG.Errorf("Insert io monitoring data failed, err: %v", err)
}
}
func loadNetIO() {
netStat, _ := net.IOCounters(true)
netStatAll, _ := net.IOCounters(false)
func (m *MonitorService) saveNetDataToDB(ctx context.Context, interval float64) {
defer close(m.NetIO)
for {
select {
case <-ctx.Done():
return
case netStat := <-m.NetIO:
select {
case <-ctx.Done():
return
case netStat2 := <-m.NetIO:
var netList []model.MonitorNetwork
for _, net2 := range netStat2 {
for _, net1 := range netStat {
if net2.Name == net1.Name {
var itemNet model.MonitorNetwork
itemNet.Name = net1.Name
time.Sleep(60 * time.Second)
netStat2, _ := net.IOCounters(true)
var netList []model.MonitorNetwork
for _, net2 := range netStat2 {
for _, net1 := range netStat {
if net2.Name == net1.Name {
var itemNet model.MonitorNetwork
itemNet.Name = net1.Name
if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent {
itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / 60
if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent {
itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / interval / 60
}
if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv {
itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / interval / 60
}
netList = append(netList, itemNet)
break
}
}
}
if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv {
itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / 60
if err := settingRepo.BatchCreateMonitorNet(netList); err != nil {
global.LOG.Errorf("Insert network monitoring data failed, err: %v", err)
}
netList = append(netList, itemNet)
break
m.NetIO <- netStat2
}
}
}
netStatAll2, _ := net.IOCounters(false)
for _, net2 := range netStatAll2 {
for _, net1 := range netStatAll {
if net2.Name == net1.Name {
var itemNet model.MonitorNetwork
itemNet.Name = net1.Name
if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent {
itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / 60
}
if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv {
itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / 60
}
netList = append(netList, itemNet)
break
}
}
}
if err := settingRepo.BatchCreateMonitorNet(netList); err != nil {
global.LOG.Errorf("Insert network monitoring data failed, err: %v", err)
}
}
func StartMonitor(removeBefore bool, interval string) error {
if removeBefore {
monitorCancel()
global.Cron.Remove(cron.EntryID(global.MonitorCronID))
}
imservice := NewIMonitorService()
monitorID, err := global.Cron.AddJob(fmt.Sprintf("@every %sm", interval), imservice)
intervalItem, err := strconv.Atoi(interval)
if err != nil {
return err
}
imservice.Run()
service := NewIMonitorService()
ctx, cancel := context.WithCancel(context.Background())
monitorCancel = cancel
monitorID, err := global.Cron.AddJob(fmt.Sprintf("@every %sm", interval), service)
if err != nil {
return err
}
service.Run()
go service.saveIODataToDB(ctx, float64(intervalItem))
go service.saveNetDataToDB(ctx, float64(intervalItem))
global.MonitorCronID = int(monitorID)
return nil
}

View file

@ -89,6 +89,7 @@ func (u *SettingService) Update(key, value string) error {
}
}
if value == "disable" && global.MonitorCronID != 0 {
monitorCancel()
global.Cron.Remove(cron.EntryID(global.MonitorCronID))
global.MonitorCronID = 0
}