Integrate with supabase backend
This commit is contained in:
315
internal/metrics/collector.go
Normal file
315
internal/metrics/collector.go
Normal file
@@ -0,0 +1,315 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/shirou/gopsutil/v3/cpu"
|
||||
"github.com/shirou/gopsutil/v3/disk"
|
||||
"github.com/shirou/gopsutil/v3/host"
|
||||
"github.com/shirou/gopsutil/v3/load"
|
||||
"github.com/shirou/gopsutil/v3/mem"
|
||||
psnet "github.com/shirou/gopsutil/v3/net"
|
||||
|
||||
"nannyagentv2/internal/types"
|
||||
)
|
||||
|
||||
// Collector handles system metrics collection
|
||||
type Collector struct {
|
||||
agentVersion string
|
||||
}
|
||||
|
||||
// NewCollector creates a new metrics collector
|
||||
func NewCollector(agentVersion string) *Collector {
|
||||
return &Collector{
|
||||
agentVersion: agentVersion,
|
||||
}
|
||||
}
|
||||
|
||||
// GatherSystemMetrics collects comprehensive system metrics
|
||||
func (c *Collector) GatherSystemMetrics() (*types.SystemMetrics, error) {
|
||||
metrics := &types.SystemMetrics{
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// System Information
|
||||
if hostInfo, err := host.Info(); err == nil {
|
||||
metrics.Hostname = hostInfo.Hostname
|
||||
metrics.Platform = hostInfo.Platform
|
||||
metrics.PlatformFamily = hostInfo.PlatformFamily
|
||||
metrics.PlatformVersion = hostInfo.PlatformVersion
|
||||
metrics.KernelVersion = hostInfo.KernelVersion
|
||||
metrics.KernelArch = hostInfo.KernelArch
|
||||
}
|
||||
|
||||
// CPU Metrics
|
||||
if percentages, err := cpu.Percent(time.Second, false); err == nil && len(percentages) > 0 {
|
||||
metrics.CPUUsage = math.Round(percentages[0]*100) / 100
|
||||
}
|
||||
|
||||
if cpuInfo, err := cpu.Info(); err == nil && len(cpuInfo) > 0 {
|
||||
metrics.CPUCores = len(cpuInfo)
|
||||
metrics.CPUModel = cpuInfo[0].ModelName
|
||||
}
|
||||
|
||||
// Memory Metrics
|
||||
if memInfo, err := mem.VirtualMemory(); err == nil {
|
||||
metrics.MemoryUsage = math.Round(float64(memInfo.Used)/(1024*1024)*100) / 100 // MB
|
||||
metrics.MemoryTotal = memInfo.Total
|
||||
metrics.MemoryUsed = memInfo.Used
|
||||
metrics.MemoryFree = memInfo.Free
|
||||
metrics.MemoryAvailable = memInfo.Available
|
||||
}
|
||||
|
||||
if swapInfo, err := mem.SwapMemory(); err == nil {
|
||||
metrics.SwapTotal = swapInfo.Total
|
||||
metrics.SwapUsed = swapInfo.Used
|
||||
metrics.SwapFree = swapInfo.Free
|
||||
}
|
||||
|
||||
// Disk Metrics
|
||||
if diskInfo, err := disk.Usage("/"); err == nil {
|
||||
metrics.DiskUsage = math.Round(diskInfo.UsedPercent*100) / 100
|
||||
metrics.DiskTotal = diskInfo.Total
|
||||
metrics.DiskUsed = diskInfo.Used
|
||||
metrics.DiskFree = diskInfo.Free
|
||||
}
|
||||
|
||||
// Load Averages
|
||||
if loadAvg, err := load.Avg(); err == nil {
|
||||
metrics.LoadAvg1 = math.Round(loadAvg.Load1*100) / 100
|
||||
metrics.LoadAvg5 = math.Round(loadAvg.Load5*100) / 100
|
||||
metrics.LoadAvg15 = math.Round(loadAvg.Load15*100) / 100
|
||||
}
|
||||
|
||||
// Process Count (simplified - using a constant for now)
|
||||
// Note: gopsutil doesn't have host.Processes(), would need process.Processes()
|
||||
metrics.ProcessCount = 0 // Placeholder
|
||||
|
||||
// Network Metrics
|
||||
netIn, netOut := c.getNetworkStats()
|
||||
metrics.NetworkInKbps = netIn
|
||||
metrics.NetworkOutKbps = netOut
|
||||
|
||||
if netIOCounters, err := psnet.IOCounters(false); err == nil && len(netIOCounters) > 0 {
|
||||
netIO := netIOCounters[0]
|
||||
metrics.NetworkInBytes = netIO.BytesRecv
|
||||
metrics.NetworkOutBytes = netIO.BytesSent
|
||||
}
|
||||
|
||||
// IP Address and Location
|
||||
metrics.IPAddress = c.getIPAddress()
|
||||
metrics.Location = c.getLocation() // Placeholder
|
||||
|
||||
// Filesystem Information
|
||||
metrics.FilesystemInfo = c.getFilesystemInfo()
|
||||
|
||||
// Block Devices
|
||||
metrics.BlockDevices = c.getBlockDevices()
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// getNetworkStats returns network input/output rates in Kbps
|
||||
func (c *Collector) getNetworkStats() (float64, float64) {
|
||||
netIOCounters, err := psnet.IOCounters(false)
|
||||
if err != nil || len(netIOCounters) == 0 {
|
||||
return 0.0, 0.0
|
||||
}
|
||||
|
||||
// Use the first interface for aggregate stats
|
||||
netIO := netIOCounters[0]
|
||||
|
||||
// Convert bytes to kilobits per second (simplified - cumulative bytes to kilobits)
|
||||
netInKbps := float64(netIO.BytesRecv) * 8 / 1024
|
||||
netOutKbps := float64(netIO.BytesSent) * 8 / 1024
|
||||
|
||||
return netInKbps, netOutKbps
|
||||
}
|
||||
|
||||
// getIPAddress returns the primary IP address of the system
|
||||
func (c *Collector) getIPAddress() string {
|
||||
interfaces, err := psnet.Interfaces()
|
||||
if err != nil {
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
for _, iface := range interfaces {
|
||||
if len(iface.Addrs) > 0 && !strings.Contains(iface.Addrs[0].Addr, "127.0.0.1") {
|
||||
return strings.Split(iface.Addrs[0].Addr, "/")[0] // Remove CIDR if present
|
||||
}
|
||||
}
|
||||
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
// getLocation returns basic location information (placeholder)
|
||||
func (c *Collector) getLocation() string {
|
||||
return "unknown" // Would integrate with GeoIP service
|
||||
}
|
||||
|
||||
// getFilesystemInfo returns information about mounted filesystems
|
||||
func (c *Collector) getFilesystemInfo() []types.FilesystemInfo {
|
||||
partitions, err := disk.Partitions(false)
|
||||
if err != nil {
|
||||
return []types.FilesystemInfo{}
|
||||
}
|
||||
|
||||
var filesystems []types.FilesystemInfo
|
||||
for _, partition := range partitions {
|
||||
usage, err := disk.Usage(partition.Mountpoint)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
fs := types.FilesystemInfo{
|
||||
Mountpoint: partition.Mountpoint,
|
||||
Fstype: partition.Fstype,
|
||||
Total: usage.Total,
|
||||
Used: usage.Used,
|
||||
Free: usage.Free,
|
||||
UsagePercent: math.Round(usage.UsedPercent*100) / 100,
|
||||
}
|
||||
filesystems = append(filesystems, fs)
|
||||
}
|
||||
|
||||
return filesystems
|
||||
}
|
||||
|
||||
// getBlockDevices returns information about block devices
|
||||
func (c *Collector) getBlockDevices() []types.BlockDevice {
|
||||
partitions, err := disk.Partitions(true)
|
||||
if err != nil {
|
||||
return []types.BlockDevice{}
|
||||
}
|
||||
|
||||
var devices []types.BlockDevice
|
||||
deviceMap := make(map[string]bool)
|
||||
|
||||
for _, partition := range partitions {
|
||||
// Only include actual block devices
|
||||
if strings.HasPrefix(partition.Device, "/dev/") {
|
||||
deviceName := partition.Device
|
||||
if !deviceMap[deviceName] {
|
||||
deviceMap[deviceName] = true
|
||||
|
||||
device := types.BlockDevice{
|
||||
Name: deviceName,
|
||||
Model: "unknown",
|
||||
Size: 0,
|
||||
SerialNumber: "unknown",
|
||||
}
|
||||
devices = append(devices, device)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return devices
|
||||
}
|
||||
|
||||
// SendMetrics sends system metrics to the agent-auth-api endpoint
|
||||
func (c *Collector) SendMetrics(agentAuthURL, accessToken, agentID string, metrics *types.SystemMetrics) error {
|
||||
// Create flattened metrics request for agent-auth-api
|
||||
metricsReq := c.CreateMetricsRequest(agentID, metrics)
|
||||
|
||||
return c.sendMetricsRequest(agentAuthURL, accessToken, metricsReq)
|
||||
}
|
||||
|
||||
// CreateMetricsRequest converts SystemMetrics to the flattened format expected by agent-auth-api
|
||||
func (c *Collector) CreateMetricsRequest(agentID string, systemMetrics *types.SystemMetrics) *types.MetricsRequest {
|
||||
return &types.MetricsRequest{
|
||||
AgentID: agentID,
|
||||
CPUUsage: systemMetrics.CPUUsage,
|
||||
MemoryUsage: systemMetrics.MemoryUsage,
|
||||
DiskUsage: systemMetrics.DiskUsage,
|
||||
NetworkInKbps: systemMetrics.NetworkInKbps,
|
||||
NetworkOutKbps: systemMetrics.NetworkOutKbps,
|
||||
IPAddress: systemMetrics.IPAddress,
|
||||
Location: systemMetrics.Location,
|
||||
AgentVersion: c.agentVersion,
|
||||
KernelVersion: systemMetrics.KernelVersion,
|
||||
DeviceFingerprint: c.generateDeviceFingerprint(systemMetrics),
|
||||
LoadAverages: map[string]float64{
|
||||
"load1": systemMetrics.LoadAvg1,
|
||||
"load5": systemMetrics.LoadAvg5,
|
||||
"load15": systemMetrics.LoadAvg15,
|
||||
},
|
||||
OSInfo: map[string]string{
|
||||
"platform": systemMetrics.Platform,
|
||||
"platform_family": systemMetrics.PlatformFamily,
|
||||
"platform_version": systemMetrics.PlatformVersion,
|
||||
"kernel_version": systemMetrics.KernelVersion,
|
||||
"kernel_arch": systemMetrics.KernelArch,
|
||||
},
|
||||
FilesystemInfo: systemMetrics.FilesystemInfo,
|
||||
BlockDevices: systemMetrics.BlockDevices,
|
||||
NetworkStats: map[string]uint64{
|
||||
"bytes_sent": systemMetrics.NetworkOutBytes,
|
||||
"bytes_recv": systemMetrics.NetworkInBytes,
|
||||
"total_bytes": systemMetrics.NetworkInBytes + systemMetrics.NetworkOutBytes,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// sendMetricsRequest sends the metrics request to the agent-auth-api
|
||||
func (c *Collector) sendMetricsRequest(agentAuthURL, accessToken string, metricsReq *types.MetricsRequest) error {
|
||||
// Wrap metrics in the expected payload structure
|
||||
payload := map[string]interface{}{
|
||||
"metrics": metricsReq,
|
||||
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
jsonData, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal metrics: %w", err)
|
||||
}
|
||||
|
||||
// Send to /metrics endpoint
|
||||
metricsURL := fmt.Sprintf("%s/metrics", agentAuthURL)
|
||||
req, err := http.NewRequest("POST", metricsURL, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", accessToken))
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send metrics: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Read response
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response: %w", err)
|
||||
}
|
||||
|
||||
// Check response status
|
||||
if resp.StatusCode == http.StatusUnauthorized {
|
||||
return fmt.Errorf("unauthorized")
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("metrics request failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// generateDeviceFingerprint creates a unique device identifier
|
||||
func (c *Collector) generateDeviceFingerprint(metrics *types.SystemMetrics) string {
|
||||
fingerprint := fmt.Sprintf("%s-%s-%s", metrics.Hostname, metrics.Platform, metrics.KernelVersion)
|
||||
hasher := sha256.New()
|
||||
hasher.Write([]byte(fingerprint))
|
||||
return fmt.Sprintf("%x", hasher.Sum(nil))[:16]
|
||||
}
|
||||
Reference in New Issue
Block a user