From b6b4f1cba0ce7ac8acea8a39251416c659c39580 Mon Sep 17 00:00:00 2001 From: nicoeri Date: Thu, 15 Jan 2026 22:16:54 -0300 Subject: [PATCH] Add bulk extraction runs and configurable parallel execution limit --- .../Controllers/ExtractionRunsController.cs | 16 +++++ ScrapperAPI/Dtos/ExtractionRunDtos.cs | 9 +++ .../Interfaces/IExtractionCoordinator.cs | 6 ++ ScrapperAPI/Options/ExtractionOptions.cs | 9 +++ ScrapperAPI/Program.cs | 1 + ScrapperAPI/Workers/ExtractionCoordinator.cs | 66 ++++++++++++++++++- ScrapperAPI/appsettings.Development.json | 3 + ScrapperAPI/appsettings.json | 3 + 8 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 ScrapperAPI/Options/ExtractionOptions.cs diff --git a/ScrapperAPI/Controllers/ExtractionRunsController.cs b/ScrapperAPI/Controllers/ExtractionRunsController.cs index f2c87ef..c711623 100644 --- a/ScrapperAPI/Controllers/ExtractionRunsController.cs +++ b/ScrapperAPI/Controllers/ExtractionRunsController.cs @@ -32,6 +32,22 @@ public sealed class ExtractionRunsController : ControllerBase return Accepted(new { runId }); } + /// + /// Inicia extrações em massa (1 run por session). + /// POST /extraction-runs/bulk + /// { + /// "modelId": 10, + /// "sessionIds": [1,2,3], // opcional; se omitido/vazio, roda para todas as sessions + /// "onlyDone": true + /// } + /// + [HttpPost("bulk")] + public async Task StartBulk([FromBody] BulkStartExtractionRequest req, CancellationToken ct) + { + var runIds = await _coord.StartBulkRunsAsync(req, ct); + return Accepted(new { runIds, count = runIds.Count }); + } + [HttpGet("{runId:long}")] public async Task GetRun(long runId, CancellationToken ct) { diff --git a/ScrapperAPI/Dtos/ExtractionRunDtos.cs b/ScrapperAPI/Dtos/ExtractionRunDtos.cs index 35bac92..71f8150 100644 --- a/ScrapperAPI/Dtos/ExtractionRunDtos.cs +++ b/ScrapperAPI/Dtos/ExtractionRunDtos.cs @@ -9,6 +9,15 @@ public sealed record StartExtractionRequest( bool OnlyDone = true ); +public sealed record BulkStartExtractionRequest( + [Required] long ModelId, + /// + /// Se vazio/nulo, roda para todas as sessions existentes. + /// + int[]? SessionIds = null, + bool OnlyDone = true +); + public sealed record CreateExtractionRunDto( long ModelId, int SessionId diff --git a/ScrapperAPI/Interfaces/IExtractionCoordinator.cs b/ScrapperAPI/Interfaces/IExtractionCoordinator.cs index 102ff23..d24288a 100644 --- a/ScrapperAPI/Interfaces/IExtractionCoordinator.cs +++ b/ScrapperAPI/Interfaces/IExtractionCoordinator.cs @@ -9,6 +9,12 @@ public interface IExtractionCoordinator /// Task StartRunAsync(StartExtractionRequest request, CancellationToken ct); + /// + /// Cria runs para varias sessions e enfileira para processamento. + /// Se SessionIds estiver vazio/nulo, roda para todas as sessions existentes. + /// + Task> StartBulkRunsAsync(BulkStartExtractionRequest request, CancellationToken ct); + /// /// Retorna status em tempo real (se estiver rodando). /// diff --git a/ScrapperAPI/Options/ExtractionOptions.cs b/ScrapperAPI/Options/ExtractionOptions.cs new file mode 100644 index 0000000..2a9439d --- /dev/null +++ b/ScrapperAPI/Options/ExtractionOptions.cs @@ -0,0 +1,9 @@ +namespace ScrapperAPI.Options; + +public sealed class ExtractionOptions +{ + /// + /// Maximum number of extraction runs (sessions) processed in parallel by the ExtractionCoordinator. + /// + public int MaxParallelRuns { get; set; } = 3; +} \ No newline at end of file diff --git a/ScrapperAPI/Program.cs b/ScrapperAPI/Program.cs index be787db..5eeb8b6 100644 --- a/ScrapperAPI/Program.cs +++ b/ScrapperAPI/Program.cs @@ -62,6 +62,7 @@ builder.Services.AddAuthorization(options => }); builder.Services.Configure(builder.Configuration.GetSection("Scraper")); +builder.Services.Configure(builder.Configuration.GetSection("Extraction")); builder.Services.AddSingleton(sp => { diff --git a/ScrapperAPI/Workers/ExtractionCoordinator.cs b/ScrapperAPI/Workers/ExtractionCoordinator.cs index a61870c..a162aa5 100644 --- a/ScrapperAPI/Workers/ExtractionCoordinator.cs +++ b/ScrapperAPI/Workers/ExtractionCoordinator.cs @@ -1,9 +1,12 @@ using System.Collections.Concurrent; +using System.Linq; using System.Threading.Channels; using System.Text.Json; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using ScrapperAPI.Dtos; using ScrapperAPI.Interfaces; +using ScrapperAPI.Options; using ScrapperAPI.Services; using ScrapperAPI.Utils; @@ -18,6 +21,8 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi private readonly ILogger _logger; private readonly ExtractionEngine _engine; + private readonly SemaphoreSlim _parallelRuns; + private readonly Channel _startRequests = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); @@ -26,11 +31,16 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi public ExtractionCoordinator( IServiceScopeFactory scopeFactory, ILogger logger, - ExtractionEngine engine) + ExtractionEngine engine, + IOptions opts) { _scopeFactory = scopeFactory; _logger = logger; _engine = engine; + + var max = opts.Value.MaxParallelRuns; + if (max <= 0) max = 1; + _parallelRuns = new SemaphoreSlim(max, max); } public async Task StartRunAsync(StartExtractionRequest request, CancellationToken ct) @@ -45,6 +55,43 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi return runId; } + public async Task> StartBulkRunsAsync(BulkStartExtractionRequest request, CancellationToken ct) + { + // Descobre sessions alvo (explicitamente ou todas) + int[] sessionIds; + + if (request.SessionIds is { Length: > 0 }) + { + sessionIds = request.SessionIds.Distinct().OrderBy(x => x).ToArray(); + } + else + { + using var scope = _scopeFactory.CreateScope(); + var sessions = scope.ServiceProvider.GetRequiredService(); + sessionIds = (await sessions.GetAllAsync(ct)).Select(s => s.Id).ToArray(); + } + + var runIds = new List(sessionIds.Length); + + // Cria todos os runs e enfileira + using (var scope = _scopeFactory.CreateScope()) + { + var runs = scope.ServiceProvider.GetRequiredService(); + + foreach (var sid in sessionIds) + { + ct.ThrowIfCancellationRequested(); + + var runId = await runs.CreateAsync(new CreateExtractionRunDto(request.ModelId, sid), ct); + _running[runId] = new Runtime(runId, sid, request.ModelId, request.OnlyDone); + await _startRequests.Writer.WriteAsync(runId, ct); + runIds.Add(runId); + } + } + + return runIds; + } + public ExtractionRuntimeStatus GetRuntimeStatus(long runId) { if (!_running.TryGetValue(runId, out var r)) @@ -77,7 +124,22 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi if (!_running.TryGetValue(runId, out var runtime)) continue; - _ = RunOnceAsync(runtime, stoppingToken); + _ = RunWithSemaphoreAsync(runtime, stoppingToken); + } + } + + private async Task RunWithSemaphoreAsync(Runtime runtime, CancellationToken hostToken) + { + await _parallelRuns.WaitAsync(hostToken); + try + { + await RunOnceAsync(runtime, hostToken); + } + finally + { + _parallelRuns.Release(); + // Remove do dicionario para evitar crescer pra sempre + _running.TryRemove(runtime.RunId, out _); } } diff --git a/ScrapperAPI/appsettings.Development.json b/ScrapperAPI/appsettings.Development.json index 033e1bb..60701f5 100644 --- a/ScrapperAPI/appsettings.Development.json +++ b/ScrapperAPI/appsettings.Development.json @@ -9,5 +9,8 @@ "Authority": "https://auth.evolucao.io/application/o/web-scrapper/", "Audience": "qbwOof0fnJzIQhiDsM0Kd41dw7YB0Ab15FbnZxHM", "RequireHttpsMetadata": true + }, + "Extraction": { + "MaxParallelRuns": 3 } } \ No newline at end of file diff --git a/ScrapperAPI/appsettings.json b/ScrapperAPI/appsettings.json index 613ac2e..72e3efd 100644 --- a/ScrapperAPI/appsettings.json +++ b/ScrapperAPI/appsettings.json @@ -20,6 +20,9 @@ "MaxDelayMs": 8000 } }, + "Extraction": { + "MaxParallelRuns": 3 + }, "AllowedHosts": "*", "Authentication": { "Authority": "https://auth.evolucao.io/application/o/web-scrapper/",