diff --git a/ScrapperAPI/Controllers/ExtractionRunsController.cs b/ScrapperAPI/Controllers/ExtractionRunsController.cs
index f2c87ef..c711623 100644
--- a/ScrapperAPI/Controllers/ExtractionRunsController.cs
+++ b/ScrapperAPI/Controllers/ExtractionRunsController.cs
@@ -32,6 +32,22 @@ public sealed class ExtractionRunsController : ControllerBase
return Accepted(new { runId });
}
+ /// <summary>
+ /// Starts extractions in bulk (one run per session).
+ /// POST /extraction-runs/bulk
+ /// {
+ /// "modelId": 10,
+ /// "sessionIds": [1,2,3], // optional; if omitted/empty, runs for all sessions
+ /// "onlyDone": true
+ /// }
+ /// </summary>
+ /// <param name="req">Bulk request body; passed unchanged to the coordinator.</param>
+ /// <param name="ct">Cancellation token forwarded to the coordinator call.</param>
+ /// <returns>An Accepted result whose body carries <c>runIds</c> and their <c>count</c>.</returns>
+ [HttpPost("bulk")]
+ public async Task StartBulk([FromBody] BulkStartExtractionRequest req, CancellationToken ct)
+ {
+ // The coordinator creates and enqueues the runs; only their ids come back here.
+ var runIds = await _coord.StartBulkRunsAsync(req, ct);
+ return Accepted(new { runIds, count = runIds.Count });
+ }
+
[HttpGet("{runId:long}")]
public async Task GetRun(long runId, CancellationToken ct)
{
diff --git a/ScrapperAPI/Dtos/ExtractionRunDtos.cs b/ScrapperAPI/Dtos/ExtractionRunDtos.cs
index 35bac92..71f8150 100644
--- a/ScrapperAPI/Dtos/ExtractionRunDtos.cs
+++ b/ScrapperAPI/Dtos/ExtractionRunDtos.cs
@@ -9,6 +9,15 @@ public sealed record StartExtractionRequest(
bool OnlyDone = true
);
+/// <summary>
+/// Request body for starting extraction runs in bulk for a single model.
+/// </summary>
+public sealed record BulkStartExtractionRequest(
+ // NOTE(review): [Required] on a non-nullable long may not reject an omitted value (it would bind as 0) — confirm.
+ [Required] long ModelId,
+ /// <summary>
+ /// If empty/null, runs for all existing sessions.
+ /// </summary>
+ int[]? SessionIds = null,
+ bool OnlyDone = true
+);
+
public sealed record CreateExtractionRunDto(
long ModelId,
int SessionId
diff --git a/ScrapperAPI/Interfaces/IExtractionCoordinator.cs b/ScrapperAPI/Interfaces/IExtractionCoordinator.cs
index 102ff23..d24288a 100644
--- a/ScrapperAPI/Interfaces/IExtractionCoordinator.cs
+++ b/ScrapperAPI/Interfaces/IExtractionCoordinator.cs
@@ -9,6 +9,12 @@ public interface IExtractionCoordinator
///
Task StartRunAsync(StartExtractionRequest request, CancellationToken ct);
+ /// <summary>
+ /// Creates runs for several sessions and enqueues them for processing.
+ /// If SessionIds is empty/null, runs for all existing sessions.
+ /// </summary>
+ Task> StartBulkRunsAsync(BulkStartExtractionRequest request, CancellationToken ct);
+
///
/// Retorna status em tempo real (se estiver rodando).
///
diff --git a/ScrapperAPI/Options/ExtractionOptions.cs b/ScrapperAPI/Options/ExtractionOptions.cs
new file mode 100644
index 0000000..2a9439d
--- /dev/null
+++ b/ScrapperAPI/Options/ExtractionOptions.cs
@@ -0,0 +1,9 @@
+namespace ScrapperAPI.Options;
+
+/// <summary>
+/// Settings that control how extraction runs are scheduled.
+/// </summary>
+public sealed class ExtractionOptions
+{
+ /// <summary>
+ /// Maximum number of extraction runs (sessions) processed in parallel by the ExtractionCoordinator.
+ /// Defaults to 3; the coordinator treats values of zero or less as 1.
+ /// </summary>
+ public int MaxParallelRuns { get; set; } = 3;
+}
\ No newline at end of file
diff --git a/ScrapperAPI/Program.cs b/ScrapperAPI/Program.cs
index be787db..5eeb8b6 100644
--- a/ScrapperAPI/Program.cs
+++ b/ScrapperAPI/Program.cs
@@ -62,6 +62,7 @@ builder.Services.AddAuthorization(options =>
});
builder.Services.Configure(builder.Configuration.GetSection("Scraper"));
+// Register options bound from the "Extraction" configuration section (holds MaxParallelRuns).
+builder.Services.Configure(builder.Configuration.GetSection("Extraction"));
builder.Services.AddSingleton(sp =>
{
diff --git a/ScrapperAPI/Workers/ExtractionCoordinator.cs b/ScrapperAPI/Workers/ExtractionCoordinator.cs
index a61870c..a162aa5 100644
--- a/ScrapperAPI/Workers/ExtractionCoordinator.cs
+++ b/ScrapperAPI/Workers/ExtractionCoordinator.cs
@@ -1,9 +1,12 @@
using System.Collections.Concurrent;
+using System.Linq;
using System.Threading.Channels;
using System.Text.Json;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using ScrapperAPI.Dtos;
using ScrapperAPI.Interfaces;
+using ScrapperAPI.Options;
using ScrapperAPI.Services;
using ScrapperAPI.Utils;
@@ -18,6 +21,8 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
private readonly ILogger _logger;
private readonly ExtractionEngine _engine;
+ // Caps how many runs execute at the same time; sized from MaxParallelRuns in the constructor.
+ private readonly SemaphoreSlim _parallelRuns;
+
private readonly Channel _startRequests = Channel.CreateUnbounded(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
@@ -26,11 +31,16 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
public ExtractionCoordinator(
IServiceScopeFactory scopeFactory,
ILogger logger,
- ExtractionEngine engine)
+ ExtractionEngine engine,
+ IOptions opts)
{
_scopeFactory = scopeFactory;
_logger = logger;
_engine = engine;
+
+ // Fall back to a single slot when the configured limit is zero or negative.
+ var max = opts.Value.MaxParallelRuns;
+ if (max <= 0) max = 1;
+ // Initial and maximum count are both max, so every slot starts out free.
+ _parallelRuns = new SemaphoreSlim(max, max);
}
public async Task StartRunAsync(StartExtractionRequest request, CancellationToken ct)
@@ -45,6 +55,43 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
return runId;
}
+ /// <summary>
+ /// Creates one run per target session and enqueues each of them for processing.
+ /// </summary>
+ /// <param name="request">Model id, optional session ids and the OnlyDone flag applied to every run.</param>
+ /// <param name="ct">Checked before each run is created and passed to the repository and channel calls.</param>
+ /// <returns>Ids of the created runs, in the order they were enqueued.</returns>
+ public async Task> StartBulkRunsAsync(BulkStartExtractionRequest request, CancellationToken ct)
+ {
+ // Resolve the target sessions (explicit list, or all of them)
+ int[] sessionIds;
+
+ if (request.SessionIds is { Length: > 0 })
+ {
+ // Duplicates are dropped and ids sorted ascending, so each session gets exactly one run.
+ sessionIds = request.SessionIds.Distinct().OrderBy(x => x).ToArray();
+ }
+ else
+ {
+ // Null or empty list: fall back to every session returned by GetAllAsync.
+ using var scope = _scopeFactory.CreateScope();
+ var sessions = scope.ServiceProvider.GetRequiredService();
+ sessionIds = (await sessions.GetAllAsync(ct)).Select(s => s.Id).ToArray();
+ }
+
+ var runIds = new List(sessionIds.Length);
+
+ // Create all runs and enqueue them
+ // NOTE(review): if CreateAsync throws or ct is cancelled mid-loop, runs already enqueued stay
+ // queued but their ids never reach the caller — confirm this is acceptable.
+ using (var scope = _scopeFactory.CreateScope())
+ {
+ var runs = scope.ServiceProvider.GetRequiredService();
+
+ foreach (var sid in sessionIds)
+ {
+ ct.ThrowIfCancellationRequested();
+
+ var runId = await runs.CreateAsync(new CreateExtractionRunDto(request.ModelId, sid), ct);
+ // Register the runtime before enqueueing, so the id is already in _running when it is read from the channel.
+ _running[runId] = new Runtime(runId, sid, request.ModelId, request.OnlyDone);
+ await _startRequests.Writer.WriteAsync(runId, ct);
+ runIds.Add(runId);
+ }
+ }
+
+ return runIds;
+ }
+
public ExtractionRuntimeStatus GetRuntimeStatus(long runId)
{
if (!_running.TryGetValue(runId, out var r))
@@ -77,7 +124,22 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
if (!_running.TryGetValue(runId, out var runtime))
continue;
- _ = RunOnceAsync(runtime, stoppingToken);
+ _ = RunWithSemaphoreAsync(runtime, stoppingToken);
+ }
+ }
+
+ /// <summary>
+ /// Runs one extraction while holding a slot of the parallel-run limiter.
+ /// </summary>
+ /// <remarks>
+ /// The caller discards the returned task, so nothing may escape from here: failures are
+ /// logged instead of being left on an unobserved task. The runtime entry is removed in every
+ /// outcome, including cancellation while still waiting for a slot.
+ /// </remarks>
+ /// <param name="runtime">Runtime state of the run to execute.</param>
+ /// <param name="hostToken">Host token; cancels both the wait for a slot and the run itself.</param>
+ private async Task RunWithSemaphoreAsync(Runtime runtime, CancellationToken hostToken)
+ {
+ // Tracks whether a slot was really taken, so a cancelled wait never releases one it does not own.
+ var acquired = false;
+ try
+ {
+ await _parallelRuns.WaitAsync(hostToken);
+ acquired = true;
+ await RunOnceAsync(runtime, hostToken);
+ }
+ catch (OperationCanceledException) when (hostToken.IsCancellationRequested)
+ {
+ // Host is shutting down; cancellation is expected and not an error.
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Extraction run {RunId} failed", runtime.RunId);
+ }
+ finally
+ {
+ if (acquired)
+ {
+ _parallelRuns.Release();
+ }
+
+ // Drop the entry so the dictionary does not grow forever.
+ _running.TryRemove(runtime.RunId, out _);
}
}
diff --git a/ScrapperAPI/appsettings.Development.json b/ScrapperAPI/appsettings.Development.json
index 033e1bb..60701f5 100644
--- a/ScrapperAPI/appsettings.Development.json
+++ b/ScrapperAPI/appsettings.Development.json
@@ -9,5 +9,8 @@
"Authority": "https://auth.evolucao.io/application/o/web-scrapper/",
"Audience": "qbwOof0fnJzIQhiDsM0Kd41dw7YB0Ab15FbnZxHM",
"RequireHttpsMetadata": true
+ },
+ "Extraction": {
+ "MaxParallelRuns": 3
}
}
\ No newline at end of file
diff --git a/ScrapperAPI/appsettings.json b/ScrapperAPI/appsettings.json
index 613ac2e..72e3efd 100644
--- a/ScrapperAPI/appsettings.json
+++ b/ScrapperAPI/appsettings.json
@@ -20,6 +20,9 @@
"MaxDelayMs": 8000
}
},
+ "Extraction": {
+ "MaxParallelRuns": 3
+ },
"AllowedHosts": "*",
"Authentication": {
"Authority": "https://auth.evolucao.io/application/o/web-scrapper/",