using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScrapperAPI.Dtos;
using ScrapperAPI.Interfaces;
using ScrapperAPI.Options;
using ScrapperAPI.Services;
using ScrapperAPI.Utils;

namespace ScrapperAPI.Workers;

/// <summary>
/// Processes extractions (models) over already-captured HTML (content.content_bytes, gzip).
/// Runs are created in the database, queued on an in-memory channel, and executed in the
/// background with at most <c>MaxParallelRuns</c> runs in flight at once.
/// </summary>
public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordinator
{
    // Queue status filter used when a run is started with OnlyDone = true.
    // NOTE(review): presumably the "done" value of the queue status column — confirm.
    private const short QueueStatusDone = 2;

    // Maximum length of the error text persisted per extracted item.
    private const int ItemErrorMaxLength = 2000;

    // Maximum length of the error text persisted for a failed run.
    private const int RunErrorMaxLength = 8000;

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger _logger;
    private readonly ExtractionEngine _engine;
    private readonly SemaphoreSlim _parallelRuns;

    // Run ids waiting to be picked up by ExecuteAsync (single reader, many writers).
    private readonly Channel _startRequests = Channel.CreateUnbounded(
        new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

    // In-memory progress for runs that were queued and have not finished yet.
    private readonly ConcurrentDictionary _running = new();

    /// <summary>
    /// Creates the coordinator. A non-positive <c>MaxParallelRuns</c> is treated as 1.
    /// </summary>
    public ExtractionCoordinator(
        IServiceScopeFactory scopeFactory,
        ILogger logger,
        ExtractionEngine engine,
        IOptions opts)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
        _engine = engine;

        // SemaphoreSlim rejects a zero/negative capacity; fall back to serial execution.
        var max = opts.Value.MaxParallelRuns;
        if (max <= 0) max = 1;
        _parallelRuns = new SemaphoreSlim(max, max);
    }

    /// <summary>
    /// Creates a single extraction run for one session and queues it for background execution.
    /// </summary>
    /// <returns>The id of the created run.</returns>
    public async Task StartRunAsync(StartExtractionRequest request, CancellationToken ct)
    {
        using var scope = _scopeFactory.CreateScope();
        var runs = scope.ServiceProvider.GetRequiredService();

        var runId = await runs.CreateAsync(new CreateExtractionRunDto(request.ModelId, request.SessionId), ct);
        _running[runId] = new Runtime(runId, request.SessionId, request.ModelId, request.OnlyDone);
        await _startRequests.Writer.WriteAsync(runId, ct);
        return runId;
    }

    /// <summary>
    /// Creates and queues one extraction run per target session. When
    /// <c>request.SessionIds</c> is null or empty, every session is targeted.
    /// </summary>
    /// <returns>The ids of the created runs, in ascending session-id order for explicit lists.</returns>
    public async Task> StartBulkRunsAsync(BulkStartExtractionRequest request, CancellationToken ct)
    {
        // Resolve target sessions (explicit list, de-duplicated and sorted, or all sessions).
        int[] sessionIds;
        if (request.SessionIds is { Length: > 0 })
        {
            sessionIds = request.SessionIds.Distinct().OrderBy(x => x).ToArray();
        }
        else
        {
            using var scope = _scopeFactory.CreateScope();
            var sessions = scope.ServiceProvider.GetRequiredService();
            sessionIds = (await sessions.GetAllAsync(ct)).Select(s => s.Id).ToArray();
        }

        var runIds = new List(sessionIds.Length);

        // Create every run and queue it.
        using (var scope = _scopeFactory.CreateScope())
        {
            var runs = scope.ServiceProvider.GetRequiredService();
            foreach (var sid in sessionIds)
            {
                ct.ThrowIfCancellationRequested();

                var runId = await runs.CreateAsync(new CreateExtractionRunDto(request.ModelId, sid), ct);
                _running[runId] = new Runtime(runId, sid, request.ModelId, request.OnlyDone);
                await _startRequests.Writer.WriteAsync(runId, ct);
                runIds.Add(runId);
            }
        }

        return runIds;
    }

    /// <summary>
    /// Returns the in-memory progress of a run. Runs that are not tracked (never queued here,
    /// or already finished and removed) report a not-running, all-zero status.
    /// </summary>
    public ExtractionRuntimeStatus GetRuntimeStatus(long runId)
    {
        if (!_running.TryGetValue(runId, out var r))
            return new ExtractionRuntimeStatus(runId, false, 0, 0, 0, 0, null);

        return new ExtractionRuntimeStatus(
            RunId: r.RunId,
            IsRunning: r.IsRunning,
            Processed: r.Processed,
            Total: r.Total,
            Succeeded: r.Succeeded,
            Failed: r.Failed,
            CurrentQueueId: r.CurrentQueueId
        );
    }

    /// <summary>
    /// Reads queued run ids and starts each one in the background, throttled by the
    /// parallel-runs semaphore.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ExtractionCoordinator started.");

        while (!stoppingToken.IsCancellationRequested)
        {
            long runId;
            try
            {
                runId = await _startRequests.Reader.ReadAsync(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }

            if (!_running.TryGetValue(runId, out var runtime))
                continue;

            // Fire-and-forget: RunWithSemaphoreAsync handles all of its own exceptions,
            // so the discarded task never faults unobserved.
            _ = RunWithSemaphoreAsync(runtime, stoppingToken);
        }
    }

    /// <summary>
    /// Waits for a parallel-run slot, executes the run, then releases the slot and drops the
    /// run's in-memory state. Never throws.
    /// </summary>
    private async Task RunWithSemaphoreAsync(Runtime runtime, CancellationToken hostToken)
    {
        var acquired = false;
        try
        {
            await _parallelRuns.WaitAsync(hostToken);
            acquired = true;

            await RunOnceAsync(runtime, hostToken);
        }
        catch (OperationCanceledException) when (hostToken.IsCancellationRequested)
        {
            // The host is stopping while this run was still waiting for a slot.
            _logger.LogInformation(
                "Extraction run {RunId} was not started because the host is stopping.", runtime.RunId);
            if (!acquired)
                await TryMarkFailedAsync(runtime.RunId, "Cancelled: host stopped before the run started.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unexpected error while executing extraction run {RunId}.", runtime.RunId);
        }
        finally
        {
            // Release only a slot that was actually taken.
            if (acquired)
                _parallelRuns.Release();

            // Remove from the dictionary so it does not grow forever.
            _running.TryRemove(runtime.RunId, out _);
        }
    }

    /// <summary>
    /// Executes one run: loads the model, lists the session's queue items, extracts each
    /// item's decompressed HTML, and upserts the result (success or per-item failure).
    /// Marks the run done on completion, or failed on an unhandled error or host shutdown.
    /// </summary>
    private async Task RunOnceAsync(Runtime runtime, CancellationToken hostToken)
    {
        // Guard against the same run being executed twice concurrently.
        if (!runtime.TryEnter()) return;

        runtime.IsRunning = true;
        try
        {
            using var scope = _scopeFactory.CreateScope();
            var models = scope.ServiceProvider.GetRequiredService();
            var runs = scope.ServiceProvider.GetRequiredService();
            var queue = scope.ServiceProvider.GetRequiredService();
            var content = scope.ServiceProvider.GetRequiredService();
            var extracted = scope.ServiceProvider.GetRequiredService();

            var modelRow = await models.GetByIdAsync(runtime.ModelId, hostToken);
            if (modelRow is null)
            {
                await runs.MarkFailedAsync(runtime.RunId, $"Model not found: {runtime.ModelId}", hostToken);
                return;
            }

            await runs.MarkRunningAsync(runtime.RunId, hostToken);

            // null = no status filter.
            var statuses = runtime.OnlyDone ? new short[] { QueueStatusDone } : null;
            var queueIds = await queue.ListQueueIdsAsync(runtime.SessionId, statuses, hostToken);
            runtime.Total = queueIds.Count;

            foreach (var qid in queueIds)
            {
                // On shutdown, abort the run instead of marking a partial run as done;
                // the outer handler records it as failed.
                hostToken.ThrowIfCancellationRequested();

                runtime.CurrentQueueId = qid;
                runtime.Processed++;
                try
                {
                    var row = await content.GetCompressedByQueueIdAsync(qid, hostToken);
                    if (row is null || row.ContentBytes is null || row.ContentBytes.Length == 0)
                        throw new InvalidOperationException("Content not found");

                    if (!string.Equals(row.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase))
                        throw new InvalidOperationException($"Unsupported encoding: {row.ContentEncoding}");

                    var html = CompressionUtils.GzipDecompressUtf8(row.ContentBytes);
                    using var json = _engine.Extract(html, modelRow.Definition.RootElement);

                    await extracted.UpsertAsync(new UpsertExtractedDataDto(
                        RunId: runtime.RunId,
                        ModelId: runtime.ModelId,
                        SessionId: runtime.SessionId,
                        QueueId: qid,
                        ExtractedJson: json,
                        Success: true,
                        Error: null
                    ), hostToken);

                    runtime.Succeeded++;
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !hostToken.IsCancellationRequested)
                {
                    // Item-level failure: record it and continue with the next item.
                    // Host cancellation is excluded by the filter and aborts the whole run.
                    using var errJson = JsonDocument.Parse("{}");
                    await extracted.UpsertAsync(new UpsertExtractedDataDto(
                        RunId: runtime.RunId,
                        ModelId: runtime.ModelId,
                        SessionId: runtime.SessionId,
                        QueueId: qid,
                        ExtractedJson: errJson,
                        Success: false,
                        Error: Truncate(ex.Message, ItemErrorMaxLength)
                    ), hostToken);

                    runtime.Failed++;
                }
                finally
                {
                    runtime.CurrentQueueId = null;
                }
            }

            await runs.MarkDoneAsync(runtime.RunId, runtime.Total, runtime.Succeeded, runtime.Failed, hostToken);
        }
        catch (Exception ex)
        {
            var cancelled = ex is OperationCanceledException && hostToken.IsCancellationRequested;
            if (cancelled)
                _logger.LogWarning("Extraction run {RunId} cancelled because the host is stopping.", runtime.RunId);
            else
                _logger.LogError(ex, "Extraction run {RunId} failed.", runtime.RunId);

            var error = cancelled
                ? "Cancelled: host is stopping."
                : Truncate(ex.ToString(), RunErrorMaxLength);
            await TryMarkFailedAsync(runtime.RunId, error);
        }
        finally
        {
            runtime.IsRunning = false;
            runtime.Exit();
        }
    }

    /// <summary>
    /// Best-effort: persists a failure for the run in a fresh scope. Errors are logged, not thrown.
    /// </summary>
    private async Task TryMarkFailedAsync(long runId, string error)
    {
        try
        {
            using var scope = _scopeFactory.CreateScope();
            var runs = scope.ServiceProvider.GetRequiredService();

            // CancellationToken.None: this is typically reached after the host token was
            // cancelled, and the final status should still be written.
            await runs.MarkFailedAsync(runId, error, CancellationToken.None);
        }
        catch (Exception ex)
        {
            // Double fault: nothing else can be done beyond logging it.
            _logger.LogError(ex, "Could not mark extraction run {RunId} as failed.", runId);
        }
    }

    /// <summary>Returns at most <paramref name="max"/> leading characters of <paramref name="s"/>.</summary>
    private static string Truncate(string s, int max) => s.Length <= max ? s : s[..max];

    /// <summary>
    /// Mutable in-memory progress of a single run, read by <c>GetRuntimeStatus</c>
    /// and written by the executing run.
    /// </summary>
    private sealed class Runtime
    {
        // 0 = not executing, 1 = executing (set via Interlocked in TryEnter/Exit).
        private int _entered;

        public long RunId { get; }
        public int SessionId { get; }
        public long ModelId { get; }
        public bool OnlyDone { get; }

        public bool IsRunning { get; set; }
        public int Total { get; set; }
        public int Processed { get; set; }
        public int Succeeded { get; set; }
        public int Failed { get; set; }
        public int? CurrentQueueId { get; set; }

        public Runtime(long runId, int sessionId, long modelId, bool onlyDone)
        {
            RunId = runId;
            SessionId = sessionId;
            ModelId = modelId;
            OnlyDone = onlyDone;
        }

        /// <summary>Atomically claims execution; returns false if already claimed.</summary>
        public bool TryEnter() => Interlocked.CompareExchange(ref _entered, 1, 0) == 0;

        /// <summary>Releases the execution claim taken by <see cref="TryEnter"/>.</summary>
        public void Exit() => Interlocked.Exchange(ref _entered, 0);
    }
}