// Package worker generates wizard questions for the form fields of a PDF.
// It tries a vision-capable model first, then a text model, and finally
// falls back to templated default questions.
package worker

import (
	"database/sql"
	"fmt"
	"log"
	"os"
	"sync"

	"pdf-form-api/ai"
	"pdf-form-api/db"
	"pdf-form-api/models"
)

// Config holds the endpoint, credential and model name for the text client
// (BaseURL/APIKey/Model) and for the vision client (Vision* fields).
type Config struct {
	BaseURL       string
	APIKey        string
	Model         string
	VisionBaseURL string
	VisionAPIKey  string
	VisionModel   string
}

const (
	// maxVisionRetries is the number of vision generation attempts made
	// before giving up on the vision path.
	maxVisionRetries = 3

	// dbUpdateConcurrency bounds how many per-field database updates run
	// at the same time in applyVisionResults and applyTextResults.
	dbUpdateConcurrency = 5
)

// ProcessPDFQuestions generates and stores a question for every field of the
// PDF identified by pdfID.
//
// Order of attempts:
//  1. Vision API (when cfg.VisionAPIKey is set), retried up to
//     maxVisionRetries times until every field is covered.
//  2. Text API (when cfg.APIKey is set); fields it misses get defaults.
//  3. Templated defaults for all fields.
//
// Nothing is done when neither API key is configured or fields is empty.
// Errors are logged rather than returned.
func ProcessPDFQuestions(dbConn *sql.DB, pdfID int64, filePath string, fields []models.FormField, cfg Config, storeDir string) {
	if cfg.APIKey == "" && cfg.VisionAPIKey == "" {
		log.Printf("[worker] skipping question generation: no API key configured")
		return
	}
	if len(fields) == 0 {
		log.Printf("[worker] pdf %d: no fields to process", pdfID)
		return
	}

	log.Printf("[worker] pdf %d: processing %d fields for question generation", pdfID, len(fields))

	if cfg.VisionAPIKey != "" {
		if visionResult := generateWithVision(pdfID, filePath, fields, cfg, storeDir); visionResult != nil {
			if visionResult.Description != "" {
				if err := db.UpdatePDFDescription(dbConn, pdfID, visionResult.Description); err != nil {
					log.Printf("[worker] pdf %d: error updating description: %v", pdfID, err)
				}
			}
			labelMap := visionResult.LabelMap
			if labelMap == nil {
				labelMap = buildLabelIDMap(fields)
			}
			applyVisionResults(dbConn, pdfID, visionResult, labelMap, len(fields))
			return
		}
	}

	// Text fallback.
	if cfg.APIKey != "" && generateWithText(dbConn, pdfID, filePath, fields, cfg) {
		return
	}

	log.Printf("[worker] pdf %d: all generation methods failed, applying defaults", pdfID)
	applyDefaults(dbConn, pdfID, fields)
}

// generateWithVision calls the vision API up to maxVisionRetries times and
// returns the first result that covers every field, or nil if no attempt did.
func generateWithVision(pdfID int64, filePath string, fields []models.FormField, cfg Config, storeDir string) *ai.VisionResult {
	log.Printf("[worker] pdf %d: using vision API (%s/%s)", pdfID, cfg.VisionBaseURL, cfg.VisionModel)
	visionClient := ai.NewClient(cfg.VisionBaseURL, cfg.VisionAPIKey, cfg.VisionModel)

	for attempt := 1; attempt <= maxVisionRetries; attempt++ {
		visionResult, err := visionClient.GenerateQuestionsWithVisionAndImages(filePath, fields, storeDir, pdfID)
		if err != nil {
			log.Printf("[worker] pdf %d: vision generation attempt %d failed: %v", pdfID, attempt, err)
			continue
		}

		// Validate: every field must be covered. A nil result is reported
		// as fully uncovered, so it is retried like any incomplete answer.
		uncovered := validateCoverage(visionResult, fields)
		if len(uncovered) == 0 {
			log.Printf("[worker] pdf %d: vision generation successful on attempt %d (%d/%d fields covered)", pdfID, attempt, len(visionResult.Fields), len(fields))
			return visionResult
		}
		log.Printf("[worker] pdf %d: attempt %d incomplete - %d uncovered fields: %v, retrying...", pdfID, attempt, len(uncovered), uncovered)
	}

	log.Printf("[worker] pdf %d: vision generation failed after %d attempts, falling back to text", pdfID, maxVisionRetries)
	return nil
}

// generateWithText runs the text API and stores its results. It reports
// whether the PDF has been fully handled:
//   - API error: defaults are applied to all fields, returns true.
//   - At least one result: results are stored, fields the API did not name
//     get defaults, returns true.
//   - No results and no error: nothing is written, returns false so the
//     caller can apply defaults.
//
// NOTE(review): this path stores only the question; unlike the vision and
// default paths it never sets a wizard page — confirm that is intended.
func generateWithText(dbConn *sql.DB, pdfID int64, filePath string, fields []models.FormField, cfg Config) bool {
	log.Printf("[worker] pdf %d: falling back to text API (%s/%s)", pdfID, cfg.BaseURL, cfg.Model)
	client := ai.NewClient(cfg.BaseURL, cfg.APIKey, cfg.Model)

	fieldResults, err := client.GenerateQuestions(filePath, fields)
	if err != nil {
		log.Printf("[worker] pdf %d: text generation failed: %v", pdfID, err)
		applyDefaults(dbConn, pdfID, fields)
		return true
	}
	if len(fieldResults) == 0 {
		return false
	}

	fieldIDMap := buildFieldIDMap(fields)
	applyTextResults(dbConn, pdfID, fieldResults, fieldIDMap)

	// Check for uncovered fields and apply defaults.
	coveredIDs := make(map[int64]bool, len(fieldResults))
	for _, r := range fieldResults {
		if id, ok := fieldIDMap[r.Name]; ok {
			coveredIDs[id] = true
		}
	}
	uncoveredFields := fieldsIDsNotIn(fields, coveredIDs)
	if len(uncoveredFields) > 0 {
		log.Printf("[worker] pdf %d: text fallback missed %d fields, applying defaults", pdfID, len(uncoveredFields))
		applyDefaults(dbConn, pdfID, uncoveredFields)
	}
	return true
}

// validateCoverage returns the IDs of fields that have no usable entry in the
// vision result. An entry counts only if its label resolves to a field ID, its
// question is non-empty and its wizard page is positive. A nil vr yields every
// field ID, so callers never dereference a missing result.
func validateCoverage(vr *ai.VisionResult, fields []models.FormField) []int64 {
	if vr == nil {
		uncovered := make([]int64, 0, len(fields))
		for _, f := range fields {
			uncovered = append(uncovered, f.ID)
		}
		return uncovered
	}

	labelMap := vr.LabelMap
	if labelMap == nil {
		labelMap = buildLabelIDMap(fields)
	}

	covered := make(map[int64]bool, len(vr.Fields))
	for _, entry := range vr.Fields {
		if id, ok := labelMap[entry.Label]; ok && entry.Question != "" && entry.WizardPage > 0 {
			covered[id] = true
		}
	}

	var uncovered []int64
	for _, f := range fields {
		if !covered[f.ID] {
			uncovered = append(uncovered, f.ID)
		}
	}
	return uncovered
}

// fieldsIDsNotIn returns the fields whose ID is not marked true in ids,
// preserving their original order.
func fieldsIDsNotIn(fields []models.FormField, ids map[int64]bool) []models.FormField {
	var out []models.FormField
	for _, f := range fields {
		if !ids[f.ID] {
			out = append(out, f)
		}
	}
	return out
}

// applyVisionResults writes each vision entry's question, and its value group
// and wizard page when present, to the field that labelMap resolves the
// entry's label to. Updates run concurrently, at most dbUpdateConcurrency at
// a time. Entries with an unknown label or a failed question update are
// logged and skipped; totalFields is used only for the summary log line.
func applyVisionResults(dbConn *sql.DB, pdfID int64, vr *ai.VisionResult, labelMap map[string]int64, totalFields int) {
	covered := make(map[int64]bool)
	var mu sync.Mutex
	var wg sync.WaitGroup
	updateLimit := make(chan struct{}, dbUpdateConcurrency)

	for _, entry := range vr.Fields {
		updateLimit <- struct{}{} // blocks while the limit is reached
		wg.Add(1)
		go func(entry ai.VisionFieldEntry) {
			defer wg.Done()
			defer func() { <-updateLimit }()

			fieldID, ok := labelMap[entry.Label]
			if !ok {
				log.Printf("[worker] pdf %d: label %s not found in map, skipping", pdfID, entry.Label)
				return
			}

			if err := db.UpdateFormFieldQuestion(dbConn, fieldID, entry.Question); err != nil {
				log.Printf("[worker] pdf %d: error updating question for %s: %v", pdfID, entry.Label, err)
				return
			}

			if entry.ValueGroup != "" {
				if err := db.UpdateFormFieldValueGroup(dbConn, fieldID, entry.ValueGroup); err != nil {
					log.Printf("[worker] pdf %d: error setting value_group for %s: %v", pdfID, entry.Label, err)
				}
			}

			if entry.WizardPage > 0 {
				if err := db.UpdateFormFieldWizardPage(dbConn, fieldID, entry.WizardPage); err != nil {
					log.Printf("[worker] pdf %d: error setting wizard_page for %s: %v", pdfID, entry.Label, err)
				}
			}

			// covered is written from several goroutines, so guard it.
			mu.Lock()
			covered[fieldID] = true
			log.Printf("[worker] pdf %d: updated %s (q=%q, vg=%q, wp=%d)", pdfID, entry.Label, entry.Question, entry.ValueGroup, entry.WizardPage)
			mu.Unlock()
		}(entry)
	}

	wg.Wait()
	log.Printf("[worker] pdf %d: vision complete (%d/%d fields updated)", pdfID, len(covered), totalFields)
}

// applyTextResults stores the question of each text result on the field that
// fieldIDMap resolves the result's name to. Updates run concurrently, at most
// dbUpdateConcurrency at a time. The summary log reports how many questions
// were actually written, excluding unknown names and failed updates.
func applyTextResults(dbConn *sql.DB, pdfID int64, results []ai.FieldResult, fieldIDMap map[string]int64) {
	var (
		mu    sync.Mutex // guards saved
		saved int
		wg    sync.WaitGroup
	)
	updateLimit := make(chan struct{}, dbUpdateConcurrency)

	for _, r := range results {
		updateLimit <- struct{}{} // blocks while the limit is reached
		wg.Add(1)
		go func(name, question string) {
			defer wg.Done()
			defer func() { <-updateLimit }()

			// fieldIDMap is only read here, never written, so the lookup
			// needs no lock.
			fieldID, ok := fieldIDMap[name]
			if !ok {
				log.Printf("[worker] pdf %d: field %s not found in map, skipping", pdfID, name)
				return
			}

			if err := db.UpdateFormFieldQuestion(dbConn, fieldID, question); err != nil {
				log.Printf("[worker] pdf %d: error updating question for %s: %v", pdfID, name, err)
				return
			}

			mu.Lock()
			saved++
			mu.Unlock()
			log.Printf("[worker] pdf %d: updated question for %s", pdfID, name)
		}(r.Name, r.Question)
	}

	wg.Wait()
	log.Printf("[worker] pdf %d: text generation complete (%d questions saved)", pdfID, saved)
}

// applyDefaults gives every field the question "What is the <name>?" and a
// wizard page equal to its 1-based position in fields. Update errors are
// logged and do not stop processing of the remaining fields.
func applyDefaults(dbConn *sql.DB, pdfID int64, fields []models.FormField) {
	for i, f := range fields {
		question := fmt.Sprintf("What is the %s?", f.Name)
		if err := db.UpdateFormFieldQuestion(dbConn, f.ID, question); err != nil {
			log.Printf("[worker] pdf %d: error setting default question for %s: %v", pdfID, f.Name, err)
		}

		wp := i + 1
		if err := db.UpdateFormFieldWizardPage(dbConn, f.ID, wp); err != nil {
			log.Printf("[worker] pdf %d: error setting default wizard_page for %s: %v", pdfID, f.Name, err)
		}
	}
	log.Printf("[worker] pdf %d: applied defaults to %d fields", pdfID, len(fields))
}

// buildFieldIDMap maps each field's Name to its ID. If two fields share a
// name, the later one in fields wins.
func buildFieldIDMap(fields []models.FormField) map[string]int64 {
	m := make(map[string]int64, len(fields))
	for _, f := range fields {
		m[f.Name] = f.ID
	}
	return m
}

// buildLabelIDMap maps the label "F<ID>" to the ID of each field. It is used
// when a vision result does not carry its own label map.
func buildLabelIDMap(fields []models.FormField) map[string]int64 {
	m := make(map[string]int64, len(fields))
	for _, f := range fields {
		m[fmt.Sprintf("F%d", f.ID)] = f.ID
	}
	return m
}

// DefaultConfig builds a Config from environment variables, using built-in
// defaults for base URLs and model names that are unset or empty.
//
// Text key:   AI_API_KEY, then DEEPSEEK_API_KEY.
// Vision key: OPENAI_API_KEY, then VISION_API_KEY.
//
// NOTE(review): OPENAI_API_KEY outranks VISION_API_KEY, so with both set the
// OpenAI key is used even when VISION_BASE_URL points elsewhere. The text key
// resolves the other way round (generic name first). Confirm this precedence
// is intended before relying on VISION_API_KEY as an override.
func DefaultConfig() Config {
	visionURL := envOrDefault("VISION_BASE_URL", "https://api.openai.com/v1")
	visionKey := envOrDefault("OPENAI_API_KEY", envOrDefault("VISION_API_KEY", ""))
	visionModel := envOrDefault("VISION_MODEL", "gpt-4o-mini")

	return Config{
		BaseURL:       envOrDefault("AI_BASE_URL", "https://api.deepseek.com"),
		APIKey:        envOrDefault("AI_API_KEY", envOrDefault("DEEPSEEK_API_KEY", "")),
		Model:         envOrDefault("AI_MODEL", "deepseek-chat"),
		VisionBaseURL: visionURL,
		VisionAPIKey:  visionKey,
		VisionModel:   visionModel,
	}
}

// envOrDefault returns the value of the environment variable key, or
// defaultVal when the variable is unset or empty.
func envOrDefault(key, defaultVal string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return defaultVal
}