1
0
voyager-api/ScrapperAPI/Workers/ExtractionCoordinator.cs

220 lines
7.7 KiB
C#

using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using ScrapperAPI.Dtos;
using ScrapperAPI.Interfaces;
using ScrapperAPI.Services;
using ScrapperAPI.Utils;
namespace ScrapperAPI.Workers;
/// <summary>
/// Runs extractions (models) over HTML that has already been captured
/// (content.content_bytes, gzip-compressed).
/// </summary>
/// <remarks>
/// Runs are requested through <see cref="StartRunAsync"/>, queued on an in-memory channel and
/// executed by the background loop in <see cref="ExecuteAsync"/>. Live progress of every run
/// started by this process is kept in memory and exposed through <see cref="GetRuntimeStatus"/>.
/// Entries are never evicted, so the final counters stay queryable after a run has finished.
/// </remarks>
public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordinator
{
    // Queue status passed to the queue repository when a run is restricted via OnlyDone.
    // NOTE(review): presumably 2 == "done" in the queue status enum - confirm against the schema.
    private const short DoneQueueStatus = 2;

    // Maximum number of characters persisted for a per-item error message.
    private const int ItemErrorMaxLength = 2000;

    // Maximum number of characters persisted for a run-level failure (full exception text).
    private const int RunErrorMaxLength = 8000;

    // Upper bound for writing the terminal "failed" state once the host token can no longer be used.
    private static readonly TimeSpan FailureWriteTimeout = TimeSpan.FromSeconds(5);

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<ExtractionCoordinator> _logger;
    private readonly ExtractionEngine _engine;

    private readonly Channel<long> _startRequests = Channel.CreateUnbounded<long>(
        new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

    private readonly ConcurrentDictionary<long, Runtime> _running = new();

    /// <summary>Creates the coordinator with the services it needs to execute runs.</summary>
    /// <param name="scopeFactory">Factory used to create a DI scope per operation.</param>
    /// <param name="logger">Logger for lifecycle and failure messages.</param>
    /// <param name="engine">Engine that applies a model definition to an HTML document.</param>
    public ExtractionCoordinator(
        IServiceScopeFactory scopeFactory,
        ILogger<ExtractionCoordinator> logger,
        ExtractionEngine engine)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
        _engine = engine;
    }

    /// <summary>
    /// Persists a new extraction run and schedules it for background execution.
    /// </summary>
    /// <param name="request">Model, session and filtering options of the run.</param>
    /// <param name="ct">Cancels the creation of the run record.</param>
    /// <returns>The identifier of the newly created run.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The start queue rejected the run.</exception>
    public async Task<long> StartRunAsync(StartExtractionRequest request, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(request);

        using var scope = _scopeFactory.CreateScope();
        var runs = scope.ServiceProvider.GetRequiredService<IExtractionRunRepository>();
        var runId = await runs.CreateAsync(new CreateExtractionRunDto(request.ModelId, request.SessionId), ct);

        _running[runId] = new Runtime(runId, request.SessionId, request.ModelId, request.OnlyDone);

        // Once the run record exists it must always be scheduled: a cancellation arriving after
        // CreateAsync would otherwise leave a persisted run that nothing ever processes.
        // TryWrite on an unbounded channel only fails when the channel has been completed.
        if (!_startRequests.Writer.TryWrite(runId))
        {
            _running.TryRemove(runId, out _);
            throw new InvalidOperationException("Extraction coordinator is not accepting new runs.");
        }

        return runId;
    }

    /// <summary>
    /// Returns the in-memory progress snapshot of a run started by this process.
    /// </summary>
    /// <param name="runId">Identifier returned by <see cref="StartRunAsync"/>.</param>
    /// <returns>
    /// The current counters of the run, or a zeroed, not-running status when the run is unknown
    /// to this process.
    /// </returns>
    public ExtractionRuntimeStatus GetRuntimeStatus(long runId)
    {
        if (!_running.TryGetValue(runId, out var r))
            return new ExtractionRuntimeStatus(runId, false, 0, 0, 0, 0, null);

        return new ExtractionRuntimeStatus(
            RunId: r.RunId,
            IsRunning: r.IsRunning,
            Processed: r.Processed,
            Total: r.Total,
            Succeeded: r.Succeeded,
            Failed: r.Failed,
            CurrentQueueId: r.CurrentQueueId
        );
    }

    /// <summary>
    /// Background loop: dequeues start requests and launches each run without waiting for it,
    /// so several runs can be in progress at once. On shutdown it waits for the runs that are
    /// still in flight so they can record their terminal state.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ExtractionCoordinator started.");

        var inFlight = new List<Task>();

        while (!stoppingToken.IsCancellationRequested)
        {
            long runId;
            try
            {
                runId = await _startRequests.Reader.ReadAsync(stoppingToken);
            }
            catch (OperationCanceledException) { break; }

            if (!_running.TryGetValue(runId, out var runtime))
                continue;

            // Drop finished runs so the list does not grow for the lifetime of the process.
            inFlight.RemoveAll(t => t.IsCompleted);
            inFlight.Add(RunOnceAsync(runtime, stoppingToken));
        }

        // RunOnceAsync handles its own exceptions, so this only waits; it never rethrows.
        await Task.WhenAll(inFlight);
    }

    /// <summary>
    /// Executes one run: loads the model, lists the queue items of the session, extracts every
    /// item and stores one result row per item, then marks the run as done or failed.
    /// </summary>
    /// <param name="runtime">In-memory state of the run; updated as items are processed.</param>
    /// <param name="hostToken">Signalled when the host is shutting down.</param>
    private async Task RunOnceAsync(Runtime runtime, CancellationToken hostToken)
    {
        // Guard against the same run being executed twice concurrently.
        if (!runtime.TryEnter())
            return;

        runtime.IsRunning = true;
        try
        {
            using var scope = _scopeFactory.CreateScope();
            var models = scope.ServiceProvider.GetRequiredService<IExtractionModelRepository>();
            var runs = scope.ServiceProvider.GetRequiredService<IExtractionRunRepository>();
            var queue = scope.ServiceProvider.GetRequiredService<IQueueRepository>();
            var content = scope.ServiceProvider.GetRequiredService<IContentRepository>();
            var extracted = scope.ServiceProvider.GetRequiredService<IExtractedDataRepository>();

            var modelRow = await models.GetByIdAsync(runtime.ModelId, hostToken);
            if (modelRow is null)
            {
                await runs.MarkFailedAsync(runtime.RunId, $"Model not found: {runtime.ModelId}", hostToken);
                return;
            }

            await runs.MarkRunningAsync(runtime.RunId, hostToken);

            var statuses = runtime.OnlyDone ? new[] { DoneQueueStatus } : null;
            var queueIds = await queue.ListQueueIdsAsync(runtime.SessionId, statuses, hostToken);
            runtime.Total = queueIds.Count;

            foreach (var qid in queueIds)
            {
                // Throw instead of breaking out of the loop: an interrupted run must not fall
                // through to MarkDoneAsync and be reported as complete.
                hostToken.ThrowIfCancellationRequested();

                runtime.CurrentQueueId = qid;
                runtime.Processed++;
                try
                {
                    var row = await content.GetCompressedByQueueIdAsync(qid, hostToken);
                    if (row is null || row.ContentBytes is null || row.ContentBytes.Length == 0)
                        throw new InvalidOperationException("Content not found");
                    if (!string.Equals(row.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase))
                        throw new InvalidOperationException($"Unsupported encoding: {row.ContentEncoding}");

                    var html = CompressionUtils.GzipDecompressUtf8(row.ContentBytes);
                    using var json = _engine.Extract(html, modelRow.Definition.RootElement);

                    await extracted.UpsertAsync(new UpsertExtractedDataDto(
                        RunId: runtime.RunId,
                        ModelId: runtime.ModelId,
                        SessionId: runtime.SessionId,
                        QueueId: qid,
                        ExtractedJson: json,
                        Success: true,
                        Error: null
                    ), hostToken);
                    runtime.Succeeded++;
                }
                // Host shutdown is not an item failure: let it propagate to the run-level handler
                // instead of trying to write a failure row with an already cancelled token.
                catch (Exception ex) when (ex is not OperationCanceledException || !hostToken.IsCancellationRequested)
                {
                    _logger.LogWarning(ex,
                        "Extraction failed for queue item {QueueId} in run {RunId}.", qid, runtime.RunId);

                    // A failed item is still recorded, with an empty JSON payload and the error text.
                    using var errJson = JsonDocument.Parse("{}");
                    await extracted.UpsertAsync(new UpsertExtractedDataDto(
                        RunId: runtime.RunId,
                        ModelId: runtime.ModelId,
                        SessionId: runtime.SessionId,
                        QueueId: qid,
                        ExtractedJson: errJson,
                        Success: false,
                        Error: Truncate(ex.Message, ItemErrorMaxLength)
                    ), hostToken);
                    runtime.Failed++;
                }
                finally
                {
                    runtime.CurrentQueueId = null;
                }
            }

            await runs.MarkDoneAsync(runtime.RunId, runtime.Total, runtime.Succeeded, runtime.Failed, hostToken);
        }
        catch (OperationCanceledException) when (hostToken.IsCancellationRequested)
        {
            _logger.LogWarning(
                "Extraction run {RunId} interrupted by host shutdown after {Processed} of {Total} items.",
                runtime.RunId, runtime.Processed, runtime.Total);
            await TryMarkFailedAsync(runtime.RunId, "Extraction run cancelled: host is shutting down.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Extraction run {RunId} failed.", runtime.RunId);
            await TryMarkFailedAsync(runtime.RunId, Truncate(ex.ToString(), RunErrorMaxLength));
        }
        finally
        {
            runtime.IsRunning = false;
            runtime.Exit();
        }
    }

    /// <summary>
    /// Best-effort write of the "failed" state of a run. Never throws: a failure to record the
    /// failure is logged and otherwise ignored.
    /// </summary>
    /// <param name="runId">Run to mark as failed.</param>
    /// <param name="error">Error text to store with the run.</param>
    private async Task TryMarkFailedAsync(long runId, string error)
    {
        try
        {
            // The host token may already be cancelled here, so use an independent, bounded token;
            // otherwise the write would be aborted before it starts.
            using var timeout = new CancellationTokenSource(FailureWriteTimeout);
            using var scope = _scopeFactory.CreateScope();
            var runs = scope.ServiceProvider.GetRequiredService<IExtractionRunRepository>();
            await runs.MarkFailedAsync(runId, error, timeout.Token);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Could not mark extraction run {RunId} as failed.", runId);
        }
    }

    /// <summary>
    /// Returns <paramref name="s"/> limited to at most <paramref name="max"/> characters,
    /// without cutting a surrogate pair in half.
    /// </summary>
    private static string Truncate(string s, int max)
    {
        if (s.Length <= max)
            return s;

        // Cutting right after a high surrogate would leave an unpaired code unit in the result.
        var end = max > 0 && char.IsHighSurrogate(s[max - 1]) ? max - 1 : max;
        return s[..end];
    }

    /// <summary>
    /// Mutable in-memory state of a single run: its identity, progress counters and a
    /// re-entrancy flag.
    /// </summary>
    private sealed class Runtime
    {
        // 0 = free, 1 = a RunOnceAsync invocation currently owns this run.
        private int _entered;

        public long RunId { get; }
        public int SessionId { get; }
        public long ModelId { get; }
        public bool OnlyDone { get; }

        public bool IsRunning { get; set; }
        public int Total { get; set; }
        public int Processed { get; set; }
        public int Succeeded { get; set; }
        public int Failed { get; set; }
        public int? CurrentQueueId { get; set; }

        public Runtime(long runId, int sessionId, long modelId, bool onlyDone)
        {
            RunId = runId;
            SessionId = sessionId;
            ModelId = modelId;
            OnlyDone = onlyDone;
        }

        /// <summary>Atomically claims the run; returns false when it is already claimed.</summary>
        public bool TryEnter() => Interlocked.CompareExchange(ref _entered, 1, 0) == 0;

        /// <summary>Releases the claim taken by <see cref="TryEnter"/>.</summary>
        public void Exit() => Interlocked.Exchange(ref _entered, 0);
    }
}