1
0

Add bulk extraction runs and configurable parallel execution limit

This commit is contained in:
Márcio Eric 2026-01-15 22:16:54 -03:00
parent 1ae977b3f1
commit b6b4f1cba0
8 changed files with 111 additions and 2 deletions

View File

@ -32,6 +32,22 @@ public sealed class ExtractionRunsController : ControllerBase
return Accepted(new { runId }); return Accepted(new { runId });
} }
/// <summary>
/// Starts bulk extractions (one run per session).
/// POST /extraction-runs/bulk
/// {
///   "modelId": 10,
///   "sessionIds": [1,2,3], // optional; when omitted/empty, runs for all sessions
///   "onlyDone": true
/// }
/// </summary>
/// <returns>202 Accepted with the created run ids and their count.</returns>
[HttpPost("bulk")]
public async Task<IActionResult> StartBulk([FromBody] BulkStartExtractionRequest req, CancellationToken ct)
{
// Delegates session resolution and run creation to the coordinator; this action only shapes the response.
var runIds = await _coord.StartBulkRunsAsync(req, ct);
return Accepted(new { runIds, count = runIds.Count });
}
[HttpGet("{runId:long}")] [HttpGet("{runId:long}")]
public async Task<IActionResult> GetRun(long runId, CancellationToken ct) public async Task<IActionResult> GetRun(long runId, CancellationToken ct)
{ {

View File

@ -9,6 +9,15 @@ public sealed record StartExtractionRequest(
bool OnlyDone = true bool OnlyDone = true
); );
/// <summary>
/// Request payload for starting extractions in bulk (one run per session).
/// </summary>
/// <param name="ModelId">Identifier of the extraction model to run for each session.</param>
/// <param name="SessionIds">
/// Target sessions. When null or empty, a run is created for every existing session.
/// </param>
/// <param name="OnlyDone">When true (default), restricts extraction to sessions marked as done.</param>
public sealed record BulkStartExtractionRequest(
    [Required] long ModelId,
    int[]? SessionIds = null,
    bool OnlyDone = true
);
public sealed record CreateExtractionRunDto( public sealed record CreateExtractionRunDto(
long ModelId, long ModelId,
int SessionId int SessionId

View File

@ -9,6 +9,12 @@ public interface IExtractionCoordinator
/// </summary> /// </summary>
Task<long> StartRunAsync(StartExtractionRequest request, CancellationToken ct); Task<long> StartRunAsync(StartExtractionRequest request, CancellationToken ct);
/// <summary>
/// Creates runs for multiple sessions and enqueues them for processing.
/// If SessionIds is empty/null, runs for all existing sessions.
/// </summary>
Task<IReadOnlyList<long>> StartBulkRunsAsync(BulkStartExtractionRequest request, CancellationToken ct);
/// <summary> /// <summary>
/// Retorna status em tempo real (se estiver rodando). /// Retorna status em tempo real (se estiver rodando).
/// </summary> /// </summary>

View File

@ -0,0 +1,9 @@
namespace ScrapperAPI.Options;
/// <summary>
/// Options bound from the "Extraction" configuration section.
/// </summary>
public sealed class ExtractionOptions
{
/// <summary>
/// Maximum number of extraction runs (sessions) processed in parallel by the ExtractionCoordinator.
/// Values below 1 are clamped to 1 by the coordinator. Defaults to 3.
/// </summary>
public int MaxParallelRuns { get; set; } = 3;
}

View File

@ -62,6 +62,7 @@ builder.Services.AddAuthorization(options =>
}); });
builder.Services.Configure<ScraperOptions>(builder.Configuration.GetSection("Scraper")); builder.Services.Configure<ScraperOptions>(builder.Configuration.GetSection("Scraper"));
builder.Services.Configure<ExtractionOptions>(builder.Configuration.GetSection("Extraction"));
builder.Services.AddSingleton<IDomainRateLimiter>(sp => builder.Services.AddSingleton<IDomainRateLimiter>(sp =>
{ {

View File

@ -1,9 +1,12 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels; using System.Threading.Channels;
using System.Text.Json; using System.Text.Json;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScrapperAPI.Dtos; using ScrapperAPI.Dtos;
using ScrapperAPI.Interfaces; using ScrapperAPI.Interfaces;
using ScrapperAPI.Options;
using ScrapperAPI.Services; using ScrapperAPI.Services;
using ScrapperAPI.Utils; using ScrapperAPI.Utils;
@ -18,6 +21,8 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
private readonly ILogger<ExtractionCoordinator> _logger; private readonly ILogger<ExtractionCoordinator> _logger;
private readonly ExtractionEngine _engine; private readonly ExtractionEngine _engine;
private readonly SemaphoreSlim _parallelRuns;
private readonly Channel<long> _startRequests = Channel.CreateUnbounded<long>( private readonly Channel<long> _startRequests = Channel.CreateUnbounded<long>(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
@ -26,11 +31,16 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
public ExtractionCoordinator( public ExtractionCoordinator(
IServiceScopeFactory scopeFactory, IServiceScopeFactory scopeFactory,
ILogger<ExtractionCoordinator> logger, ILogger<ExtractionCoordinator> logger,
ExtractionEngine engine) ExtractionEngine engine,
IOptions<ExtractionOptions> opts)
{ {
_scopeFactory = scopeFactory; _scopeFactory = scopeFactory;
_logger = logger; _logger = logger;
_engine = engine; _engine = engine;
var max = opts.Value.MaxParallelRuns;
if (max <= 0) max = 1;
_parallelRuns = new SemaphoreSlim(max, max);
} }
public async Task<long> StartRunAsync(StartExtractionRequest request, CancellationToken ct) public async Task<long> StartRunAsync(StartExtractionRequest request, CancellationToken ct)
@ -45,6 +55,43 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
return runId; return runId;
} }
/// <summary>
/// Creates one extraction run per target session and enqueues each run id for the
/// background processing loop. Returns the ids of all runs created.
/// </summary>
public async Task<IReadOnlyList<long>> StartBulkRunsAsync(BulkStartExtractionRequest request, CancellationToken ct)
{
// Resolve target sessions (explicit list, or all existing sessions when omitted).
int[] sessionIds;
if (request.SessionIds is { Length: > 0 })
{
// De-duplicate and sort so runs are created in a deterministic order.
// NOTE(review): ids are not validated here — a run is created even for a
// non-existent session id; confirm the repository rejects those.
sessionIds = request.SessionIds.Distinct().OrderBy(x => x).ToArray();
}
else
{
using var scope = _scopeFactory.CreateScope();
var sessions = scope.ServiceProvider.GetRequiredService<ISessionRepository>();
sessionIds = (await sessions.GetAllAsync(ct)).Select(s => s.Id).ToArray();
}
var runIds = new List<long>(sessionIds.Length);
// Create every run and enqueue it for the consumer loop.
using (var scope = _scopeFactory.CreateScope())
{
var runs = scope.ServiceProvider.GetRequiredService<IExtractionRunRepository>();
foreach (var sid in sessionIds)
{
// NOTE(review): cancelling mid-loop abandons already-created runs — their ids
// were enqueued but are never returned to the caller; confirm this is acceptable.
ct.ThrowIfCancellationRequested();
var runId = await runs.CreateAsync(new CreateExtractionRunDto(request.ModelId, sid), ct);
// Register the runtime BEFORE writing to the channel: the consumer looks the
// run up in _running as soon as it reads the id.
_running[runId] = new Runtime(runId, sid, request.ModelId, request.OnlyDone);
await _startRequests.Writer.WriteAsync(runId, ct);
runIds.Add(runId);
}
}
return runIds;
}
public ExtractionRuntimeStatus GetRuntimeStatus(long runId) public ExtractionRuntimeStatus GetRuntimeStatus(long runId)
{ {
if (!_running.TryGetValue(runId, out var r)) if (!_running.TryGetValue(runId, out var r))
@ -77,7 +124,22 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
if (!_running.TryGetValue(runId, out var runtime)) if (!_running.TryGetValue(runId, out var runtime))
continue; continue;
_ = RunOnceAsync(runtime, stoppingToken); _ = RunWithSemaphoreAsync(runtime, stoppingToken);
}
}
/// <summary>
/// Gates RunOnceAsync behind the parallel-runs semaphore so at most
/// MaxParallelRuns extractions execute concurrently, then removes the run
/// from the in-memory registry when it finishes.
/// </summary>
private async Task RunWithSemaphoreAsync(Runtime runtime, CancellationToken hostToken)
{
// NOTE(review): callers fire-and-forget this task, so any exception thrown here
// (including cancellation of WaitAsync at shutdown) is unobserved; if WaitAsync
// throws, the _running entry is also never removed — confirm both are acceptable.
await _parallelRuns.WaitAsync(hostToken);
try
{
await RunOnceAsync(runtime, hostToken);
}
finally
{
_parallelRuns.Release();
// Remove from the dictionary so it does not grow forever.
_running.TryRemove(runtime.RunId, out _);
} }
} }

View File

@ -9,5 +9,8 @@
"Authority": "https://auth.evolucao.io/application/o/web-scrapper/", "Authority": "https://auth.evolucao.io/application/o/web-scrapper/",
"Audience": "qbwOof0fnJzIQhiDsM0Kd41dw7YB0Ab15FbnZxHM", "Audience": "qbwOof0fnJzIQhiDsM0Kd41dw7YB0Ab15FbnZxHM",
"RequireHttpsMetadata": true "RequireHttpsMetadata": true
},
"Extraction": {
"MaxParallelRuns": 3
} }
} }

View File

@ -20,6 +20,9 @@
"MaxDelayMs": 8000 "MaxDelayMs": 8000
} }
}, },
"Extraction": {
"MaxParallelRuns": 3
},
"AllowedHosts": "*", "AllowedHosts": "*",
"Authentication": { "Authentication": {
"Authority": "https://auth.evolucao.io/application/o/web-scrapper/", "Authority": "https://auth.evolucao.io/application/o/web-scrapper/",