// nannyagent/agent.go
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/sashabaranov/go-openai"
)
// DiagnosticResponse represents the diagnostic phase response from AI.
//
// NOTE(review): this legacy (non-eBPF) shape is not referenced anywhere in
// this file — DiagnoseIssue parses EBPFEnhancedDiagnosticResponse instead.
// It may be used elsewhere or be dead code; verify before removing.
type DiagnosticResponse struct {
	ResponseType    string   `json:"response_type"` // presumably "diagnostic" — confirm against the prompt schema
	Phase           string   `json:"phase"`
	Analysis        string   `json:"analysis"`
	Commands        []string `json:"commands"` // shell commands the AI wants executed
	NextSteps       []string `json:"next_steps"`
	Reasoning       string   `json:"reasoning"`
	ConfidenceLevel float64  `json:"confidence_level"`
}
// EBPFRequest represents a request for eBPF program execution as emitted by
// the AI inside a diagnostic response. It is translated to a TraceSpec by
// convertToTraceSpec.
type EBPFRequest struct {
	Name        string            `json:"name"`
	Type        string            `json:"type"`     // "tracepoint" or "syscall" influence probe selection (see convertToTraceSpec)
	Target      string            `json:"target"`   // probe target; may carry a "tracepoint:" or "kprobe:" prefix, which wins over Type
	Duration    int               `json:"duration"` // trace duration in seconds; values <= 0 fall back to a 5s default
	Filters     map[string]string `json:"filters,omitempty"`
	Description string            `json:"description"` // also reused as the trace output format string
}
// EBPFEnhancedDiagnosticResponse represents the enhanced diagnostic response
// with eBPF: the AI's "diagnostic"-phase payload carrying both shell commands
// and eBPF trace requests. DiagnoseIssue tries this shape first.
type EBPFEnhancedDiagnosticResponse struct {
	ResponseType    string        `json:"response_type"` // must be "diagnostic" for this branch to run
	Phase           string        `json:"phase"`
	Analysis        string        `json:"analysis"`
	Commands        []string      `json:"commands"`      // shell commands to execute via CommandExecutor
	EBPFPrograms    []EBPFRequest `json:"ebpf_programs"` // traces to run via the BCC trace manager
	NextSteps       []string      `json:"next_steps"`
	Reasoning       string        `json:"reasoning"` // printed to the operator before executing
	ConfidenceLevel float64       `json:"confidence_level"`
}
// ResolutionResponse represents the resolution phase response from AI.
// Receiving one ends the DiagnoseIssue loop.
type ResolutionResponse struct {
	ResponseType   string `json:"response_type"` // must be "resolution" for this branch to run
	RootCause      string `json:"root_cause"`
	ResolutionPlan string `json:"resolution_plan"`
	Confidence     string `json:"confidence"` // free-form string here, unlike the diagnostic phase's float64
}
// Command represents a command to be executed by the CommandExecutor.
type Command struct {
	ID          string `json:"id"`      // synthetic identifier (e.g. "cmd_0") used in failure reporting
	Command     string `json:"command"` // the shell command line to run
	Description string `json:"description"`
}
// AgentConfig holds configuration for concurrent execution of eBPF traces.
type AgentConfig struct {
	MaxConcurrentTasks int  `json:"max_concurrent_tasks"` // upper bound on simultaneously running trace tasks
	CollectiveResults  bool `json:"collective_results"`   // when true, results are reported as one batch after all tasks finish
}
// DefaultAgentConfig returns the default concurrent-execution settings:
// up to 10 parallel trace forks, with results delivered as a single batch
// once every task has finished.
func DefaultAgentConfig() *AgentConfig {
	return &AgentConfig{
		CollectiveResults:  true,
		MaxConcurrentTasks: 10,
	}
}
// CommandResult represents the result of executing a command.
type CommandResult struct {
	ID       string `json:"id"`        // echoes Command.ID
	Command  string `json:"command"`   // the command line that was run
	Output   string `json:"output"`    // output captured from the command (exact streams depend on CommandExecutor)
	ExitCode int    `json:"exit_code"` // non-zero is treated as failure by DiagnoseIssue
	Error    string `json:"error,omitempty"`
}
// LinuxDiagnosticAgent represents the main agent: it drives the AI
// conversation loop, executes diagnostic shell commands, and manages
// BCC-style eBPF traces.
type LinuxDiagnosticAgent struct {
	client      *openai.Client   // unused (always nil) — requests go over raw HTTP to the Supabase proxy
	model       string           // TensorZero model/function identifier sent with every request
	executor    *CommandExecutor // runs diagnostic shell commands with a per-command timeout
	episodeID   string           // TensorZero episode ID for conversation continuity
	ebpfManager *BCCTraceManager // BCC-style eBPF tracing capabilities
	config      *AgentConfig     // Configuration for concurrent execution
}
// NewLinuxDiagnosticAgent creates a new diagnostic agent wired to the
// TensorZero proxy (via Supabase) with the default concurrency settings.
//
// The previous version assigned a hard-coded fallback Supabase URL here, but
// the value was a dead store: sendRequestWithEpisode re-reads
// SUPABASE_PROJECT_URL from the environment on every call and errors out when
// it is unset. We therefore only warn early instead of pretending a fallback
// exists.
func NewLinuxDiagnosticAgent() *LinuxDiagnosticAgent {
	if os.Getenv("SUPABASE_PROJECT_URL") == "" {
		fmt.Printf("Warning: SUPABASE_PROJECT_URL not set, TensorZero integration will not work\n")
	}

	model := os.Getenv("NANNYAPI_MODEL")
	if model == "" {
		model = "tensorzero::function_name::diagnose_and_heal"
		fmt.Printf("Warning: Using default model '%s'. Set NANNYAPI_MODEL environment variable for your specific function.\n", model)
	}

	agent := &LinuxDiagnosticAgent{
		client:   nil, // not used — requests go straight to the Supabase proxy over HTTP
		model:    model,
		executor: NewCommandExecutor(10 * time.Second), // 10 second timeout for commands
		config:   DefaultAgentConfig(),                 // default concurrent execution config
	}
	// Initialize BCC-style eBPF capabilities.
	agent.ebpfManager = NewBCCTraceManager()
	return agent
}
// DiagnoseIssue starts the diagnostic process for a given issue.
//
// Flow: gather system info, send it together with the issue to the AI, then
// loop — executing whatever diagnostic commands and eBPF traces the AI
// requests, feeding the results back as a user message — until the AI answers
// with a "resolution" payload, or an unparseable response ends the loop.
//
// NOTE(review): the loop has no iteration cap; a model that keeps replying
// "diagnostic" will loop indefinitely — confirm whether a bound is desired.
func (a *LinuxDiagnosticAgent) DiagnoseIssue(issue string) error {
	fmt.Printf("Diagnosing issue: %s\n", issue)
	fmt.Println("Gathering system information...")
	// Gather system information
	systemInfo := GatherSystemInfo()
	// Format the initial prompt with system information
	initialPrompt := FormatSystemInfoForPrompt(systemInfo) + "\n" + issue
	// Start conversation with initial issue including system info
	messages := []openai.ChatCompletionMessage{
		{
			Role:    openai.ChatMessageRoleUser,
			Content: initialPrompt,
		},
	}
	for {
		// Send request to TensorZero via the Supabase proxy; a.episodeID
		// (updated inside sendRequestWithEpisode) keeps conversation continuity.
		response, err := a.sendRequestWithEpisode(messages, a.episodeID)
		if err != nil {
			return fmt.Errorf("failed to send request: %w", err)
		}
		if len(response.Choices) == 0 {
			return fmt.Errorf("no choices in response")
		}
		content := response.Choices[0].Message.Content
		fmt.Printf("\nAI Response:\n%s\n", content)
		// Parse the response to determine next action.
		var diagnosticResp EBPFEnhancedDiagnosticResponse
		var resolutionResp ResolutionResponse
		// Try to parse as a diagnostic response first (with eBPF support).
		// Unknown JSON fields are ignored by Unmarshal, so the ResponseType
		// check is what actually distinguishes the two phases.
		if err := json.Unmarshal([]byte(content), &diagnosticResp); err == nil && diagnosticResp.ResponseType == "diagnostic" {
			// Handle diagnostic phase
			fmt.Printf("\nReasoning: %s\n", diagnosticResp.Reasoning)
			// Execute commands and collect results
			commandResults := make([]CommandResult, 0, len(diagnosticResp.Commands))
			if len(diagnosticResp.Commands) > 0 {
				fmt.Printf("🔧 Executing diagnostic commands...\n")
				for i, cmdStr := range diagnosticResp.Commands {
					// Convert string to Command struct; the ID is synthetic
					// ("cmd_0", "cmd_1", …) and only used for reporting.
					cmd := Command{
						ID:          fmt.Sprintf("cmd_%d", i),
						Command:     cmdStr,
						Description: fmt.Sprintf("Diagnostic command: %s", cmdStr),
					}
					result := a.executor.Execute(cmd)
					commandResults = append(commandResults, result)
					if result.ExitCode != 0 {
						fmt.Printf("❌ Command '%s' failed with exit code %d\n", cmd.ID, result.ExitCode)
					}
				}
			}
			// Execute eBPF programs if present - support both old and new formats
			var ebpfResults []map[string]interface{}
			if len(diagnosticResp.EBPFPrograms) > 0 {
				fmt.Printf("🔬 AI requested %d eBPF traces for enhanced diagnostics\n", len(diagnosticResp.EBPFPrograms))
				// Convert EBPFPrograms to TraceSpecs and execute concurrently
				traceSpecs := a.convertEBPFProgramsToTraceSpecs(diagnosticResp.EBPFPrograms)
				ebpfResults = a.executeBCCTracesConcurrently(traceSpecs)
			}
			// Prepare combined results as user message
			allResults := map[string]interface{}{
				"command_results":   commandResults,
				"executed_commands": len(commandResults),
			}
			// Include eBPF results if any were executed
			if len(ebpfResults) > 0 {
				allResults["ebpf_results"] = ebpfResults
				allResults["executed_ebpf_programs"] = len(ebpfResults)
				// Extract a one-line-per-trace evidence summary for TensorZero.
				// NOTE(review): the keys "name", "data_points", "description"
				// and "status" are NOT set by executeSingleBCCTrace in this
				// file (it sets "target", "event_count", "summary", …), so
				// these lookups yield nil and the summary prints "<nil>" —
				// verify the intended producer of these maps.
				evidenceSummary := make([]string, 0)
				for _, result := range ebpfResults {
					name := result["name"]
					eventCount := result["data_points"]
					description := result["description"]
					status := result["status"]
					summaryStr := fmt.Sprintf("%s: %v events (%s) - %s", name, eventCount, status, description)
					evidenceSummary = append(evidenceSummary, summaryStr)
				}
				allResults["ebpf_evidence_summary"] = evidenceSummary
			}
			resultsJSON, err := json.MarshalIndent(allResults, "", " ")
			if err != nil {
				return fmt.Errorf("failed to marshal command results: %w", err)
			}
			// Append the AI's own reply, then the execution results, so the
			// next iteration sees the full exchange.
			messages = append(messages, openai.ChatCompletionMessage{
				Role:    openai.ChatMessageRoleAssistant,
				Content: content,
			})
			messages = append(messages, openai.ChatCompletionMessage{
				Role:    openai.ChatMessageRoleUser,
				Content: string(resultsJSON),
			})
			continue
		}
		// Try to parse as a resolution response.
		if err := json.Unmarshal([]byte(content), &resolutionResp); err == nil && resolutionResp.ResponseType == "resolution" {
			// Handle resolution phase: report findings and stop.
			fmt.Printf("\n=== DIAGNOSIS COMPLETE ===\n")
			fmt.Printf("Root Cause: %s\n", resolutionResp.RootCause)
			fmt.Printf("Resolution Plan: %s\n", resolutionResp.ResolutionPlan)
			fmt.Printf("Confidence: %s\n", resolutionResp.Confidence)
			break
		}
		// If we can't parse the response, treat it as an error or unexpected
		// format and end the loop (still returns nil to the caller).
		fmt.Printf("Unexpected response format or error from AI:\n%s\n", content)
		break
	}
	return nil
}
// sendRequest sends a request to TensorZero via the Supabase proxy without an
// episode ID, i.e. without tying it to an existing conversation.
func (a *LinuxDiagnosticAgent) sendRequest(messages []openai.ChatCompletionMessage) (*openai.ChatCompletionResponse, error) {
	return a.sendRequestWithEpisode(messages, "")
}
// sendRequestWithEpisode sends a chat request to TensorZero through the
// Supabase edge-function proxy and converts the reply back into an
// OpenAI-compatible response so callers can keep using the SDK types.
//
// episodeID, when non-empty, is forwarded as "tensorzero::episode_id" so the
// proxy stitches successive exchanges into one conversation. If the proxy's
// reply carries an "episode_id" field, it is stored on the agent for the next
// call.
//
// Idiom fix vs. the original: the status check uses http.StatusOK instead of
// the magic number 200.
func (a *LinuxDiagnosticAgent) sendRequestWithEpisode(messages []openai.ChatCompletionMessage, episodeID string) (*openai.ChatCompletionResponse, error) {
	// Flatten the SDK message structs into plain maps for the proxy.
	messageMaps := make([]map[string]interface{}, len(messages))
	for i, msg := range messages {
		messageMaps[i] = map[string]interface{}{
			"role":    msg.Role,
			"content": msg.Content,
		}
	}

	// Build the TensorZero request payload.
	tzRequest := map[string]interface{}{
		"model":    a.model,
		"messages": messageMaps,
	}
	if episodeID != "" {
		tzRequest["tensorzero::episode_id"] = episodeID
	}

	requestBody, err := json.Marshal(tzRequest)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}

	// The proxy URL is read from the environment on every call.
	supabaseURL := os.Getenv("SUPABASE_PROJECT_URL")
	if supabaseURL == "" {
		return nil, fmt.Errorf("SUPABASE_PROJECT_URL not set")
	}

	endpoint := fmt.Sprintf("%s/functions/v1/tensorzero-proxy", supabaseURL)
	req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(requestBody))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")
	// Note: no authentication header — the TensorZero proxy is open per the
	// existing integration pattern.

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body) // best-effort: include the body in the error
		return nil, fmt.Errorf("TensorZero proxy error: %d, body: %s", resp.StatusCode, string(body))
	}

	var tzResponse map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&tzResponse); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}

	// Dig out choices[0].message.content, validating each level of the
	// loosely-typed payload.
	choices, ok := tzResponse["choices"].([]interface{})
	if !ok || len(choices) == 0 {
		return nil, fmt.Errorf("no choices in response")
	}
	firstChoice, ok := choices[0].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid choice format")
	}
	message, ok := firstChoice["message"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid message format")
	}
	content, ok := message["content"].(string)
	if !ok {
		return nil, fmt.Errorf("invalid content format")
	}

	// Re-wrap as an OpenAI-compatible response.
	response := &openai.ChatCompletionResponse{
		Choices: []openai.ChatCompletionChoice{
			{
				Message: openai.ChatCompletionMessage{
					Role:    openai.ChatMessageRoleAssistant,
					Content: content,
				},
			},
		},
	}

	// Persist the episode ID so the next request continues this conversation.
	if respEpisodeID, ok := tzResponse["episode_id"].(string); ok && respEpisodeID != "" {
		a.episodeID = respEpisodeID
	}
	return response, nil
}
// convertEBPFProgramsToTraceSpecs converts the AI's EBPFRequest list into the
// TraceSpec format consumed by the BCC trace manager.
//
// Improvement: the result slice is pre-sized with the known capacity instead
// of growing via repeated append reallocations.
func (a *LinuxDiagnosticAgent) convertEBPFProgramsToTraceSpecs(ebpfPrograms []EBPFRequest) []TraceSpec {
	traceSpecs := make([]TraceSpec, 0, len(ebpfPrograms))
	for _, prog := range ebpfPrograms {
		traceSpecs = append(traceSpecs, a.convertToTraceSpec(prog))
	}
	return traceSpecs
}
// convertToTraceSpec translates a single EBPFRequest into a TraceSpec for
// BCC-style tracing.
//
// Probe selection (first matching rule wins):
//   - a "tracepoint:" / "kprobe:" prefix on Target decides the probe type and
//     is stripped from the target;
//   - otherwise Type "tracepoint" selects a tracepoint probe;
//   - Type "syscall" maps bare syscall names onto their x86-64 kernel symbol
//     (__x64_sys_*) and probes them as kprobes;
//   - anything else defaults to a kprobe on Target as given.
func (a *LinuxDiagnosticAgent) convertToTraceSpec(prog EBPFRequest) TraceSpec {
	probeType := "p" // kprobe unless a rule below says otherwise
	target := prog.Target

	switch {
	case strings.HasPrefix(target, "tracepoint:"):
		probeType = "t"
		target = strings.TrimPrefix(target, "tracepoint:")
	case strings.HasPrefix(target, "kprobe:"):
		probeType = "p"
		target = strings.TrimPrefix(target, "kprobe:")
	case prog.Type == "tracepoint":
		probeType = "t"
	case prog.Type == "syscall":
		// Rewrite bare syscall names to the x86-64 symbol, leaving already
		// qualified or colon-containing targets untouched.
		if !strings.HasPrefix(target, "__x64_sys_") && !strings.Contains(target, ":") {
			if strings.HasPrefix(target, "sys_") {
				target = "__x64_" + target
			} else {
				target = "__x64_sys_" + target
			}
		}
		probeType = "p"
	}

	duration := prog.Duration
	if duration <= 0 {
		duration = 5 // default trace window in seconds
	}

	return TraceSpec{
		ProbeType: probeType,
		Target:    target,
		Format:    prog.Description, // the description doubles as the output format
		Arguments: []string{},       // no probe arguments, for compatibility
		Duration:  duration,
	}
}
// executeBCCTracesConcurrently runs every requested BCC trace in parallel,
// bounded by the agent's MaxConcurrentTasks setting, and returns the
// per-trace result maps once all of them have finished. Result order follows
// completion order, not input order.
func (a *LinuxDiagnosticAgent) executeBCCTracesConcurrently(traceSpecs []TraceSpec) []map[string]interface{} {
	if len(traceSpecs) == 0 {
		return []map[string]interface{}{}
	}
	fmt.Printf("🚀 Executing %d BCC traces with max %d concurrent tasks\n", len(traceSpecs), a.config.MaxConcurrentTasks)

	// Counting semaphore bounding how many traces run at once; the buffered
	// results channel lets every worker report without blocking.
	slots := make(chan struct{}, a.config.MaxConcurrentTasks)
	out := make(chan map[string]interface{}, len(traceSpecs))
	var wg sync.WaitGroup

	for idx, ts := range traceSpecs {
		wg.Add(1)
		go func(taskNum int, spec TraceSpec) {
			defer wg.Done()
			slots <- struct{}{}        // acquire a slot
			defer func() { <-slots }() // release it when done
			out <- a.executeSingleBCCTrace(taskNum, spec)
		}(idx, ts)
	}

	// Close the results channel once every worker has reported in, so the
	// range below terminates.
	go func() {
		wg.Wait()
		close(out)
	}()

	var allResults []map[string]interface{}
	for res := range out {
		allResults = append(allResults, res)
	}

	if a.config.CollectiveResults {
		fmt.Printf("✅ All %d BCC traces completed. Sending collective results to API layer.\n", len(allResults))
	}
	return allResults
}
// executeSingleBCCTrace runs one BCC trace end-to-end and returns a result
// map destined for the AI: it starts the trace, blocks for the requested
// duration, then collects events and statistics from the trace manager.
//
// On failure the map carries success=false plus an "error" string; fields set
// before the failure (e.g. "trace_id") may still be present.
func (a *LinuxDiagnosticAgent) executeSingleBCCTrace(index int, spec TraceSpec) map[string]interface{} {
	// Baseline result; extended/overwritten as the trace progresses.
	result := map[string]interface{}{
		"index":      index,
		"target":     spec.Target,
		"probe_type": spec.ProbeType,
		"success":    false,
		"error":      "",
		"start_time": time.Now().Format(time.RFC3339),
	}
	fmt.Printf("🔍 [Task %d] Starting BCC trace: %s (type: %s)\n", index, spec.Target, spec.ProbeType)
	// Start the trace
	traceID, err := a.ebpfManager.StartTrace(spec)
	if err != nil {
		result["error"] = fmt.Sprintf("Failed to start trace: %v", err)
		fmt.Printf("❌ [Task %d] Failed to start trace %s: %v\n", index, spec.Target, err)
		return result
	}
	result["trace_id"] = traceID
	fmt.Printf("🚀 [Task %d] Trace %s started with ID: %s\n", index, spec.Target, traceID)
	// Block for the whole trace window; parallelism comes from the caller
	// running several of these in separate goroutines.
	time.Sleep(time.Duration(spec.Duration) * time.Second)
	// Get the trace result
	traceResult, err := a.ebpfManager.GetTraceResult(traceID)
	if err != nil {
		// Try to stop the trace if it's still running (best-effort; any stop
		// error is intentionally ignored).
		a.ebpfManager.StopTrace(traceID)
		result["error"] = fmt.Sprintf("Failed to get trace results: %v", err)
		fmt.Printf("❌ [Task %d] Failed to get results for trace %s: %v\n", index, spec.Target, err)
		return result
	}
	// Populate result with trace data
	result["success"] = true
	result["end_time"] = time.Now().Format(time.RFC3339)
	result["event_count"] = traceResult.EventCount
	result["events_per_second"] = traceResult.Statistics.EventsPerSecond
	result["duration"] = traceResult.EndTime.Sub(traceResult.StartTime).Seconds()
	result["summary"] = traceResult.Summary
	// Include sample events, capped to keep the payload sent back to the AI
	// small.
	maxSampleEvents := 10
	if len(traceResult.Events) > 0 {
		sampleCount := len(traceResult.Events)
		if sampleCount > maxSampleEvents {
			sampleCount = maxSampleEvents
		}
		sampleEvents := make([]map[string]interface{}, sampleCount)
		for i := 0; i < sampleCount; i++ {
			event := traceResult.Events[i]
			sampleEvents[i] = map[string]interface{}{
				"pid":          event.PID,
				"tid":          event.TID,
				"process_name": event.ProcessName,
				"message":      event.Message,
				"timestamp":    event.Timestamp,
			}
		}
		result["sample_events"] = sampleEvents
	}
	// Include top processes from the trace statistics, if any.
	if len(traceResult.Statistics.TopProcesses) > 0 {
		topProcesses := make([]map[string]interface{}, len(traceResult.Statistics.TopProcesses))
		for i, proc := range traceResult.Statistics.TopProcesses {
			topProcesses[i] = map[string]interface{}{
				"process_name": proc.ProcessName,
				"event_count":  proc.EventCount,
				"percentage":   proc.Percentage,
			}
		}
		result["top_processes"] = topProcesses
	}
	fmt.Printf("✅ [Task %d] Trace %s completed: %d events (%.2f events/sec)\n",
		index, spec.Target, traceResult.EventCount, traceResult.Statistics.EventsPerSecond)
	return result
}