From 1ae977b3f147149470d64a510ace44defda3663d Mon Sep 17 00:00:00 2001 From: nicoeri Date: Thu, 15 Jan 2026 21:28:31 -0300 Subject: [PATCH] Add extraction models, runs, and extracted data implementation --- .../Controllers/ExtractionModelsController.cs | 46 +++ .../Controllers/ExtractionRunsController.cs | 66 ++++ ScrapperAPI/Dtos/ExtractionModelDtos.cs | 20 + ScrapperAPI/Dtos/ExtractionRunDtos.cs | 61 ++++ .../Interfaces/IExtractedDataRepository.cs | 10 + .../Interfaces/IExtractionCoordinator.cs | 16 + .../Interfaces/IExtractionModelRepository.cs | 10 + .../Interfaces/IExtractionRunRepository.cs | 12 + ScrapperAPI/Interfaces/IQueueRepository.cs | 5 + ScrapperAPI/Program.cs | 9 + .../Repositories/ExtractedDataRepository.cs | 112 ++++++ .../Repositories/ExtractionModelRepository.cs | 92 +++++ .../Repositories/ExtractionRunRepository.cs | 96 +++++ ScrapperAPI/Repositories/QueueRepository.cs | 27 ++ ScrapperAPI/ScrapperAPI.csproj | 1 + ScrapperAPI/Scripts/database.sql | 54 ++- ScrapperAPI/Services/ExtractionEngine.cs | 341 ++++++++++++++++++ ScrapperAPI/Workers/ExtractionCoordinator.cs | 219 +++++++++++ 18 files changed, 1196 insertions(+), 1 deletion(-) create mode 100644 ScrapperAPI/Controllers/ExtractionModelsController.cs create mode 100644 ScrapperAPI/Controllers/ExtractionRunsController.cs create mode 100644 ScrapperAPI/Dtos/ExtractionModelDtos.cs create mode 100644 ScrapperAPI/Dtos/ExtractionRunDtos.cs create mode 100644 ScrapperAPI/Interfaces/IExtractedDataRepository.cs create mode 100644 ScrapperAPI/Interfaces/IExtractionCoordinator.cs create mode 100644 ScrapperAPI/Interfaces/IExtractionModelRepository.cs create mode 100644 ScrapperAPI/Interfaces/IExtractionRunRepository.cs create mode 100644 ScrapperAPI/Repositories/ExtractedDataRepository.cs create mode 100644 ScrapperAPI/Repositories/ExtractionModelRepository.cs create mode 100644 ScrapperAPI/Repositories/ExtractionRunRepository.cs create mode 100644 ScrapperAPI/Services/ExtractionEngine.cs create mode 
100644 ScrapperAPI/Workers/ExtractionCoordinator.cs diff --git a/ScrapperAPI/Controllers/ExtractionModelsController.cs b/ScrapperAPI/Controllers/ExtractionModelsController.cs new file mode 100644 index 0000000..47e7974 --- /dev/null +++ b/ScrapperAPI/Controllers/ExtractionModelsController.cs @@ -0,0 +1,46 @@ +using System.ComponentModel.DataAnnotations; +using System.Text.Json; +using Microsoft.AspNetCore.Mvc; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Controllers; + +public sealed record CreateExtractionModelRequest( + [Required] string Name, + [Required] JsonDocument Definition, + int Version = 1, + string? Description = null); + +[ApiController] +[Route("extraction-models")] +public sealed class ExtractionModelsController : ControllerBase +{ + private readonly IExtractionModelRepository _models; + + public ExtractionModelsController(IExtractionModelRepository models) => _models = models; + + [HttpPost] + public async Task Create([FromBody] CreateExtractionModelRequest req, CancellationToken ct) + { + var id = await _models.CreateAsync(new CreateExtractionModelDto( + Name: req.Name, + Version: req.Version <= 0 ? 1 : req.Version, + Description: req.Description, + Definition: req.Definition + ), ct); + + return Created($"/extraction-models/{id}", new { id }); + } + + [HttpGet] + public async Task List(CancellationToken ct) + => Ok(await _models.GetAllAsync(ct)); + + [HttpGet("{id:long}")] + public async Task GetById(long id, CancellationToken ct) + { + var row = await _models.GetByIdAsync(id, ct); + return row is null ? 
NotFound() : Ok(row); + } +} diff --git a/ScrapperAPI/Controllers/ExtractionRunsController.cs b/ScrapperAPI/Controllers/ExtractionRunsController.cs new file mode 100644 index 0000000..f2c87ef --- /dev/null +++ b/ScrapperAPI/Controllers/ExtractionRunsController.cs @@ -0,0 +1,66 @@ +using Microsoft.AspNetCore.Mvc; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Controllers; + +[ApiController] +[Route("extraction-runs")] +public sealed class ExtractionRunsController : ControllerBase +{ + private readonly IExtractionCoordinator _coord; + private readonly IExtractionRunRepository _runs; + private readonly IExtractedDataRepository _extracted; + + public ExtractionRunsController( + IExtractionCoordinator coord, + IExtractionRunRepository runs, + IExtractedDataRepository extracted) + { + _coord = coord; + _runs = runs; + _extracted = extracted; + } + + /// + /// Inicia uma extração em background (cria um run). + /// + [HttpPost] + public async Task Start([FromBody] StartExtractionRequest req, CancellationToken ct) + { + var runId = await _coord.StartRunAsync(req, ct); + return Accepted(new { runId }); + } + + [HttpGet("{runId:long}")] + public async Task GetRun(long runId, CancellationToken ct) + { + var row = await _runs.GetByIdAsync(runId, ct); + if (row is null) return NotFound(); + + var runtime = _coord.GetRuntimeStatus(runId); + return Ok(new { run = row, runtime }); + } + + /// + /// Lista os dados extraídos de uma sessão por modelo. + /// GET /extraction-runs/session/1/model/10 + /// + [HttpGet("session/{sessionId:int}/model/{modelId:long}")] + public async Task ListExtracted(int sessionId, long modelId, CancellationToken ct) + { + var rows = await _extracted.ListBySessionAsync(sessionId, modelId, ct); + return Ok(rows); + } + + /// + /// Pega o JSON extraído de um item específico. 
+ /// GET /extraction-runs/queue/123/model/10 + /// + [HttpGet("queue/{queueId:int}/model/{modelId:long}")] + public async Task GetByQueue(int queueId, long modelId, CancellationToken ct) + { + var row = await _extracted.GetByQueueIdAsync(queueId, modelId, ct); + return row is null ? NotFound() : Ok(row); + } +} diff --git a/ScrapperAPI/Dtos/ExtractionModelDtos.cs b/ScrapperAPI/Dtos/ExtractionModelDtos.cs new file mode 100644 index 0000000..80d9b4e --- /dev/null +++ b/ScrapperAPI/Dtos/ExtractionModelDtos.cs @@ -0,0 +1,20 @@ +using System.Text.Json; + +namespace ScrapperAPI.Dtos; + +public sealed record CreateExtractionModelDto( + string Name, + int Version, + string? Description, + JsonDocument Definition +); + +public sealed record ExtractionModelRow( + long Id, + string Name, + int Version, + string? Description, + JsonDocument Definition, + DateTimeOffset CreatedAt, + DateTimeOffset UpdatedAt +); diff --git a/ScrapperAPI/Dtos/ExtractionRunDtos.cs b/ScrapperAPI/Dtos/ExtractionRunDtos.cs new file mode 100644 index 0000000..35bac92 --- /dev/null +++ b/ScrapperAPI/Dtos/ExtractionRunDtos.cs @@ -0,0 +1,61 @@ +using System.ComponentModel.DataAnnotations; +using System.Text.Json; + +namespace ScrapperAPI.Dtos; + +public sealed record StartExtractionRequest( + [Required] int SessionId, + [Required] long ModelId, + bool OnlyDone = true +); + +public sealed record CreateExtractionRunDto( + long ModelId, + int SessionId +); + +public sealed record ExtractionRunRow( + long Id, + long ModelId, + int SessionId, + short Status, + DateTimeOffset CreatedAt, + DateTimeOffset? StartedAt, + DateTimeOffset? FinishedAt, + int Total, + int Succeeded, + int Failed, + string? Error +); + +public sealed record ExtractionRuntimeStatus( + long RunId, + bool IsRunning, + int Processed, + int Total, + int Succeeded, + int Failed, + int? 
CurrentQueueId +); + +public sealed record UpsertExtractedDataDto( + long RunId, + long ModelId, + int SessionId, + int QueueId, + JsonDocument ExtractedJson, + bool Success, + string? Error +); + +public sealed record ExtractedDataRow( + long Id, + long RunId, + long ModelId, + int SessionId, + int QueueId, + JsonDocument ExtractedJson, + bool Success, + string? Error, + DateTimeOffset ExtractedAt +); diff --git a/ScrapperAPI/Interfaces/IExtractedDataRepository.cs b/ScrapperAPI/Interfaces/IExtractedDataRepository.cs new file mode 100644 index 0000000..27fee85 --- /dev/null +++ b/ScrapperAPI/Interfaces/IExtractedDataRepository.cs @@ -0,0 +1,10 @@ +using ScrapperAPI.Dtos; + +namespace ScrapperAPI.Interfaces; + +public interface IExtractedDataRepository +{ + Task UpsertAsync(UpsertExtractedDataDto dto, CancellationToken ct); + Task> ListBySessionAsync(int sessionId, long modelId, CancellationToken ct); + Task GetByQueueIdAsync(int queueId, long modelId, CancellationToken ct); +} diff --git a/ScrapperAPI/Interfaces/IExtractionCoordinator.cs b/ScrapperAPI/Interfaces/IExtractionCoordinator.cs new file mode 100644 index 0000000..102ff23 --- /dev/null +++ b/ScrapperAPI/Interfaces/IExtractionCoordinator.cs @@ -0,0 +1,16 @@ +using ScrapperAPI.Dtos; + +namespace ScrapperAPI.Interfaces; + +public interface IExtractionCoordinator +{ + /// + /// Cria um run e inicia o processamento em background. + /// + Task StartRunAsync(StartExtractionRequest request, CancellationToken ct); + + /// + /// Retorna status em tempo real (se estiver rodando). 
+ /// + ExtractionRuntimeStatus GetRuntimeStatus(long runId); +} diff --git a/ScrapperAPI/Interfaces/IExtractionModelRepository.cs b/ScrapperAPI/Interfaces/IExtractionModelRepository.cs new file mode 100644 index 0000000..0db636a --- /dev/null +++ b/ScrapperAPI/Interfaces/IExtractionModelRepository.cs @@ -0,0 +1,10 @@ +using ScrapperAPI.Dtos; + +namespace ScrapperAPI.Interfaces; + +public interface IExtractionModelRepository +{ + Task CreateAsync(CreateExtractionModelDto dto, CancellationToken ct); + Task> GetAllAsync(CancellationToken ct); + Task GetByIdAsync(long id, CancellationToken ct); +} diff --git a/ScrapperAPI/Interfaces/IExtractionRunRepository.cs b/ScrapperAPI/Interfaces/IExtractionRunRepository.cs new file mode 100644 index 0000000..07c5d87 --- /dev/null +++ b/ScrapperAPI/Interfaces/IExtractionRunRepository.cs @@ -0,0 +1,12 @@ +using ScrapperAPI.Dtos; + +namespace ScrapperAPI.Interfaces; + +public interface IExtractionRunRepository +{ + Task CreateAsync(CreateExtractionRunDto dto, CancellationToken ct); + Task GetByIdAsync(long id, CancellationToken ct); + Task MarkRunningAsync(long runId, CancellationToken ct); + Task MarkDoneAsync(long runId, int total, int succeeded, int failed, CancellationToken ct); + Task MarkFailedAsync(long runId, string error, CancellationToken ct); +} diff --git a/ScrapperAPI/Interfaces/IQueueRepository.cs b/ScrapperAPI/Interfaces/IQueueRepository.cs index 7adb0d9..aa81131 100644 --- a/ScrapperAPI/Interfaces/IQueueRepository.cs +++ b/ScrapperAPI/Interfaces/IQueueRepository.cs @@ -21,4 +21,9 @@ public interface IQueueRepository Task RemovePendingByIdAsync(int sessionId, int queueId, CancellationToken ct); Task RemovePendingByUrlAsync(int sessionId, string url, CancellationToken ct); + + /// + /// Lista IDs da fila por sessão e status (ex.: status=2 -> DONE). + /// + Task> ListQueueIdsAsync(int sessionId, IReadOnlyCollection? 
statuses, CancellationToken ct); } \ No newline at end of file diff --git a/ScrapperAPI/Program.cs b/ScrapperAPI/Program.cs index 181ef69..be787db 100644 --- a/ScrapperAPI/Program.cs +++ b/ScrapperAPI/Program.cs @@ -76,11 +76,20 @@ builder.Services.AddScoped(); builder.Services.AddScoped(); builder.Services.AddScoped(); +// Extraction +builder.Services.AddSingleton(); +builder.Services.AddScoped(); +builder.Services.AddScoped(); +builder.Services.AddScoped(); + builder.Services.AddHttpClient("scraper", c => c.Timeout = TimeSpan.FromSeconds(30)); builder.Services.AddSingleton(); builder.Services.AddHostedService(sp => (ScrapeCoordinator)sp.GetRequiredService()); +builder.Services.AddSingleton(); +builder.Services.AddHostedService(sp => (ExtractionCoordinator)sp.GetRequiredService()); + builder.Services.AddCors(options => { options.AddPolicy("AllowReact", diff --git a/ScrapperAPI/Repositories/ExtractedDataRepository.cs b/ScrapperAPI/Repositories/ExtractedDataRepository.cs new file mode 100644 index 0000000..2483f51 --- /dev/null +++ b/ScrapperAPI/Repositories/ExtractedDataRepository.cs @@ -0,0 +1,112 @@ +using System.Text.Json; +using Dapper; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Repositories; + +public sealed class ExtractedDataRepository : IExtractedDataRepository +{ + private readonly IDbConnectionFactory _db; + public ExtractedDataRepository(IDbConnectionFactory db) => _db = db; + + public async Task UpsertAsync(UpsertExtractedDataDto dto, CancellationToken ct) + { + const string sql = """ + insert into extracted_data(run_id, model_id, session_id, queue_id, extracted_json, success, error) + values (@runId, @modelId, @sessionId, @queueId, @json::jsonb, @success, @error) + on conflict (model_id, queue_id) + do update set + run_id = excluded.run_id, + session_id = excluded.session_id, + extracted_json = excluded.extracted_json, + success = excluded.success, + error = excluded.error, + extracted_at = now(); + """; + + using 
var conn = await _db.CreateOpenConnectionAsync(ct); + await conn.ExecuteAsync(new CommandDefinition(sql, new + { + runId = dto.RunId, + modelId = dto.ModelId, + sessionId = dto.SessionId, + queueId = dto.QueueId, + json = dto.ExtractedJson.RootElement.GetRawText(), + success = dto.Success, + error = dto.Error + }, cancellationToken: ct)); + } + + public async Task> ListBySessionAsync(int sessionId, long modelId, CancellationToken ct) + { + const string sql = """ + select + id, + run_id as RunId, + model_id as ModelId, + session_id as SessionId, + queue_id as QueueId, + extracted_json::text as extracted_json, + success, + error, + extracted_at as ExtractedAt + from extracted_data + where session_id = @sessionId + and model_id = @modelId + order by queue_id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + var rows = await conn.QueryAsync(new CommandDefinition(sql, new { sessionId, modelId }, cancellationToken: ct)); + return rows.Select(r => r.ToDto()).ToList(); + } + + public async Task GetByQueueIdAsync(int queueId, long modelId, CancellationToken ct) + { + const string sql = """ + select + id, + run_id as RunId, + model_id as ModelId, + session_id as SessionId, + queue_id as QueueId, + extracted_json::text as extracted_json, + success, + error, + extracted_at as ExtractedAt + from extracted_data + where queue_id = @queueId + and model_id = @modelId + limit 1; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + var row = await conn.QuerySingleOrDefaultAsync(new CommandDefinition(sql, new { queueId, modelId }, cancellationToken: ct)); + return row?.ToDto(); + } + + private sealed record RowRaw( + long Id, + long RunId, + long ModelId, + int SessionId, + int QueueId, + string Extracted_Json, + bool Success, + string? 
Error, + DateTimeOffset ExtractedAt) + { + public ExtractedDataRow ToDto() => new( + Id, + RunId, + ModelId, + SessionId, + QueueId, + JsonDocument.Parse(Extracted_Json), + Success, + Error, + ExtractedAt + ); + } +} diff --git a/ScrapperAPI/Repositories/ExtractionModelRepository.cs b/ScrapperAPI/Repositories/ExtractionModelRepository.cs new file mode 100644 index 0000000..6c0c86c --- /dev/null +++ b/ScrapperAPI/Repositories/ExtractionModelRepository.cs @@ -0,0 +1,92 @@ +using System.Text.Json; +using Dapper; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Repositories; + +public sealed class ExtractionModelRepository : IExtractionModelRepository +{ + private readonly IDbConnectionFactory _db; + + public ExtractionModelRepository(IDbConnectionFactory db) => _db = db; + + public async Task CreateAsync(CreateExtractionModelDto dto, CancellationToken ct) + { + const string sql = """ + insert into extraction_model(name, version, description, definition) + values (@name, @version, @description, @definition::jsonb) + returning id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync(new CommandDefinition(sql, new + { + name = dto.Name, + version = dto.Version, + description = dto.Description, + definition = dto.Definition.RootElement.GetRawText() + }, cancellationToken: ct)); + } + + public async Task> GetAllAsync(CancellationToken ct) + { + const string sql = """ + select + id, + name, + version, + description, + definition::text as definition_json, + created_at, + updated_at + from extraction_model + order by name, version, id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + var rows = await conn.QueryAsync(new CommandDefinition(sql, cancellationToken: ct)); + return rows.Select(r => r.ToDto()).ToList(); + } + + public async Task GetByIdAsync(long id, CancellationToken ct) + { + const string sql = """ + select + id as Id, + name as Name, + version as Version, 
+ description as Description, + definition as DefinitionJson, + created_at as CreatedAt, + updated_at as UpdatedAt + from extraction_model + where id = @id + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + var row = await conn.QuerySingleOrDefaultAsync(new CommandDefinition(sql, new { id }, cancellationToken: ct)); + return row?.ToDto(); + } + + private class ModelRaw + { + public long Id { get; set; } + public string Name { get; set; } = default!; + public int Version { get; set; } + public string? Description { get; set; } + public string DefinitionJson { get; set; } = default!; + public DateTime CreatedAt { get; set; } + public DateTime UpdatedAt { get; set; } + + public ExtractionModelRow ToDto() => new( + Id, + Name, + Version, + Description, + JsonDocument.Parse(DefinitionJson), + CreatedAt, + UpdatedAt + ); + } +} diff --git a/ScrapperAPI/Repositories/ExtractionRunRepository.cs b/ScrapperAPI/Repositories/ExtractionRunRepository.cs new file mode 100644 index 0000000..27bf471 --- /dev/null +++ b/ScrapperAPI/Repositories/ExtractionRunRepository.cs @@ -0,0 +1,96 @@ +using Dapper; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Repositories; + +public sealed class ExtractionRunRepository : IExtractionRunRepository +{ + private readonly IDbConnectionFactory _db; + public ExtractionRunRepository(IDbConnectionFactory db) => _db = db; + + public async Task CreateAsync(CreateExtractionRunDto dto, CancellationToken ct) + { + const string sql = """ + insert into extraction_run(model_id, session_id, status) + values (@modelId, @sessionId, 0) + returning id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync(new CommandDefinition(sql, new + { + modelId = dto.ModelId, + sessionId = dto.SessionId + }, cancellationToken: ct)); + } + + public async Task GetByIdAsync(long id, CancellationToken ct) + { + const string sql = """ + select + id, + model_id as ModelId, + 
session_id as SessionId, + status, + created_at as CreatedAt, + started_at as StartedAt, + finished_at as FinishedAt, + total, + succeeded, + failed, + error + from extraction_run + where id = @id + limit 1; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.QuerySingleOrDefaultAsync(new CommandDefinition(sql, new { id }, cancellationToken: ct)); + } + + public async Task MarkRunningAsync(long runId, CancellationToken ct) + { + const string sql = """ + update extraction_run + set status = 1, + started_at = now(), + error = null + where id = @runId; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + await conn.ExecuteAsync(new CommandDefinition(sql, new { runId }, cancellationToken: ct)); + } + + public async Task MarkDoneAsync(long runId, int total, int succeeded, int failed, CancellationToken ct) + { + const string sql = """ + update extraction_run + set status = 2, + finished_at = now(), + total = @total, + succeeded = @succeeded, + failed = @failed, + error = null + where id = @runId; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + await conn.ExecuteAsync(new CommandDefinition(sql, new { runId, total, succeeded, failed }, cancellationToken: ct)); + } + + public async Task MarkFailedAsync(long runId, string error, CancellationToken ct) + { + const string sql = """ + update extraction_run + set status = 3, + finished_at = now(), + error = @error + where id = @runId; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + await conn.ExecuteAsync(new CommandDefinition(sql, new { runId, error }, cancellationToken: ct)); + } +} diff --git a/ScrapperAPI/Repositories/QueueRepository.cs b/ScrapperAPI/Repositories/QueueRepository.cs index 707c332..47e0b70 100644 --- a/ScrapperAPI/Repositories/QueueRepository.cs +++ b/ScrapperAPI/Repositories/QueueRepository.cs @@ -155,4 +155,31 @@ public sealed class QueueRepository : IQueueRepository return await conn.ExecuteAsync(new 
CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct)); } + public async Task> ListQueueIdsAsync(int sessionId, IReadOnlyCollection? statuses, CancellationToken ct) + { + // Ex.: statuses = [2] -> DONE + var statusFilter = statuses is { Count: > 0 }; + + var sql = statusFilter + ? """ + select id + from queue + where session_id = @sessionId + and status = any(@statuses) + order by id; + """ + : """ + select id + from queue + where session_id = @sessionId + order by id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + var rows = await conn.QueryAsync(new CommandDefinition(sql, + new { sessionId, statuses = statuses?.ToArray() }, + cancellationToken: ct)); + return rows.ToList(); + } + } \ No newline at end of file diff --git a/ScrapperAPI/ScrapperAPI.csproj b/ScrapperAPI/ScrapperAPI.csproj index 355a035..30dc7ce 100644 --- a/ScrapperAPI/ScrapperAPI.csproj +++ b/ScrapperAPI/ScrapperAPI.csproj @@ -11,6 +11,7 @@ + diff --git a/ScrapperAPI/Scripts/database.sql b/ScrapperAPI/Scripts/database.sql index 407fdb8..739a5ad 100644 --- a/ScrapperAPI/Scripts/database.sql +++ b/ScrapperAPI/Scripts/database.sql @@ -37,4 +37,56 @@ alter table content add column content_encoding varchar(20) not null default 'gzip', add column content_bytes bytea null, add column original_length int null, - add column compressed_length int null; \ No newline at end of file + add column compressed_length int null; + +-- ------------------------------------------------------------ +-- Extraction models + runs + extracted json +-- ------------------------------------------------------------ + +drop table if exists extracted_data; +drop table if exists extraction_run; +drop table if exists extraction_model; + +create table extraction_model ( + id bigserial primary key, + name varchar(200) not null, + version int not null default 1, + description text null, + definition jsonb not null, + created_at timestamptz not null default now(), + updated_at timestamptz not null 
default now(), + unique(name, version) +); + +create table extraction_run ( + id bigserial primary key, + model_id bigint not null references extraction_model(id), + session_id int not null references session(id), + status smallint not null default 0, -- 0=queued 1=running 2=done 3=failed + started_at timestamptz null, + finished_at timestamptz null, + total int not null default 0, + succeeded int not null default 0, + failed int not null default 0, + error text null, + created_at timestamptz not null default now() +); + +create index idx_extraction_run_session on extraction_run(session_id); + +create table extracted_data ( + id bigserial primary key, + run_id bigint not null references extraction_run(id), + model_id bigint not null references extraction_model(id), + session_id int not null references session(id), + queue_id int not null references queue(id), + extracted_json jsonb not null, + success boolean not null default true, + error text null, + extracted_at timestamptz not null default now(), + unique(model_id, queue_id) +); + +create index idx_extracted_data_session on extracted_data(session_id); +create index idx_extracted_data_queue on extracted_data(queue_id); +create index idx_extracted_data_json on extracted_data using gin (extracted_json); \ No newline at end of file diff --git a/ScrapperAPI/Services/ExtractionEngine.cs b/ScrapperAPI/Services/ExtractionEngine.cs new file mode 100644 index 0000000..61c722f --- /dev/null +++ b/ScrapperAPI/Services/ExtractionEngine.cs @@ -0,0 +1,341 @@ +using System.Globalization; +using System.Text.Json; +using AngleSharp.Dom; +using AngleSharp.Html.Parser; + +namespace ScrapperAPI.Services; + +/// +/// Engine genérico de extração baseado em CSS selectors (AngleSharp). +/// A definição do modelo vem como JSON (extraction_model.definition). 
+/// +public sealed class ExtractionEngine +{ + private readonly HtmlParser _parser = new(); + + public JsonDocument Extract(string html, JsonElement modelDefinition) + { + var doc = _parser.ParseDocument(html); + + var rootSelector = modelDefinition.TryGetProperty("rootSelector", out var rs) && rs.ValueKind == JsonValueKind.String + ? rs.GetString() + : null; + + IElement root = doc.DocumentElement; + if (!string.IsNullOrWhiteSpace(rootSelector)) + { + root = doc.QuerySelector(rootSelector!) ?? root; + } + + if (!modelDefinition.TryGetProperty("fields", out var fields) || fields.ValueKind != JsonValueKind.Array) + throw new InvalidOperationException("Model definition must contain an array property 'fields'."); + + using var stream = new MemoryStream(); + using (var writer = new Utf8JsonWriter(stream, new JsonWriterOptions { Indented = false })) + { + writer.WriteStartObject(); + + foreach (var field in fields.EnumerateArray()) + { + WriteField(writer, root, field); + } + + writer.WriteEndObject(); + } + + stream.Position = 0; + return JsonDocument.Parse(stream); + } + + private static void WriteField(Utf8JsonWriter writer, IElement context, JsonElement field) + { + var key = field.GetProperty("key").GetString(); + if (string.IsNullOrWhiteSpace(key)) + throw new InvalidOperationException("Field 'key' is required."); + + var type = field.GetProperty("type").GetString()?.ToLowerInvariant(); + if (string.IsNullOrWhiteSpace(type)) + throw new InvalidOperationException($"Field '{key}' missing 'type'."); + + writer.WritePropertyName(key); + + switch (type) + { + case "object": + WriteObject(writer, context, field); + break; + case "array": + WriteArray(writer, context, field); + break; + default: + WritePrimitive(writer, context, field, type); + break; + } + } + + private static void WriteObject(Utf8JsonWriter writer, IElement context, JsonElement field) + { + var objContext = ResolveContext(context, field); + + if (!field.TryGetProperty("fields", out var fields) || 
fields.ValueKind != JsonValueKind.Array) + throw new InvalidOperationException("Object field must contain an array property 'fields'."); + + writer.WriteStartObject(); + foreach (var sub in fields.EnumerateArray()) + { + WriteField(writer, objContext, sub); + } + writer.WriteEndObject(); + } + + private static void WriteArray(Utf8JsonWriter writer, IElement context, JsonElement field) + { + var selector = field.TryGetProperty("selector", out var s) && s.ValueKind == JsonValueKind.String + ? s.GetString() + : null; + + if (string.IsNullOrWhiteSpace(selector)) + { + writer.WriteStartArray(); + writer.WriteEndArray(); + return; + } + + if (!field.TryGetProperty("items", out var items)) + throw new InvalidOperationException("Array field must contain 'items'."); + + var nodes = context.QuerySelectorAll(selector!); + + writer.WriteStartArray(); + foreach (var node in nodes) + { + WriteArrayItem(writer, node, items); + } + writer.WriteEndArray(); + } + + private static void WriteArrayItem(Utf8JsonWriter writer, IElement itemContext, JsonElement items) + { + var type = items.GetProperty("type").GetString()?.ToLowerInvariant(); + if (string.IsNullOrWhiteSpace(type)) + throw new InvalidOperationException("Array 'items.type' is required."); + + switch (type) + { + case "object": + if (!items.TryGetProperty("fields", out var fields) || fields.ValueKind != JsonValueKind.Array) + throw new InvalidOperationException("Array items of type 'object' must contain 'fields'."); + + writer.WriteStartObject(); + foreach (var sub in fields.EnumerateArray()) + { + WriteField(writer, itemContext, sub); + } + writer.WriteEndObject(); + break; + case "array": + // array de array + // items.selector indica onde encontrar os sub-itens dentro de cada itemContext + var tmpField = JsonDocument.Parse($"{{\"type\":\"array\",\"selector\":{JsonSerializer.Serialize(items.GetProperty("selector").GetString())},\"items\":{items.GetProperty("items").GetRawText()}}}").RootElement; + WriteArray(writer, 
itemContext, tmpField); + break; + default: + WritePrimitive(writer, itemContext, items, type); + break; + } + } + + private static void WritePrimitive(Utf8JsonWriter writer, IElement context, JsonElement field, string type) + { + var node = ResolveNode(context, field); + var raw = ReadRawValue(node, field); + raw = ApplyTransforms(raw, field); + + if (raw is null) + { + writer.WriteNullValue(); + return; + } + + switch (type) + { + case "string": + writer.WriteStringValue(raw); + break; + case "number": + if (TryParseNumber(raw, field, out var dec)) + writer.WriteNumberValue(dec); + else + writer.WriteNullValue(); + break; + case "date": + if (TryParseDate(raw, field, out var date)) + writer.WriteStringValue(date); + else + writer.WriteNullValue(); + break; + case "boolean": + case "bool": + if (TryParseBool(raw, out var b)) + writer.WriteBooleanValue(b); + else + writer.WriteNullValue(); + break; + default: + // fallback: string + writer.WriteStringValue(raw); + break; + } + } + + private static IElement ResolveContext(IElement context, JsonElement field) + { + var selector = field.TryGetProperty("selector", out var s) && s.ValueKind == JsonValueKind.String + ? s.GetString() + : null; + + if (string.IsNullOrWhiteSpace(selector)) + return context; + + return context.QuerySelector(selector!) ?? context; + } + + private static IElement? ResolveNode(IElement context, JsonElement field) + { + var selector = field.TryGetProperty("selector", out var s) && s.ValueKind == JsonValueKind.String + ? s.GetString() + : null; + + if (string.IsNullOrWhiteSpace(selector)) + return context; + + return context.QuerySelector(selector!); + } + + private static string? ReadRawValue(IElement? 
node, JsonElement field) + { + if (node is null) return null; + + // default source: text + if (!field.TryGetProperty("source", out var source) || source.ValueKind != JsonValueKind.Object) + return node.TextContent; + + var kind = source.TryGetProperty("kind", out var k) && k.ValueKind == JsonValueKind.String + ? k.GetString()?.ToLowerInvariant() + : "text"; + + return kind switch + { + "text" => node.TextContent, + "html" => node.InnerHtml, + "attr" => source.TryGetProperty("name", out var n) && n.ValueKind == JsonValueKind.String + ? node.GetAttribute(n.GetString()!) + : null, + "value" => node.GetAttribute("value") ?? node.TextContent, + _ => node.TextContent + }; + } + + private static string? ApplyTransforms(string? raw, JsonElement field) + { + if (raw is null) return null; + + if (!field.TryGetProperty("transforms", out var transforms) || transforms.ValueKind != JsonValueKind.Array) + return raw; + + var current = raw; + foreach (var t in transforms.EnumerateArray()) + { + if (t.ValueKind != JsonValueKind.String) continue; + var tr = t.GetString() ?? 
""; + + if (string.Equals(tr, "trim", StringComparison.OrdinalIgnoreCase)) + current = current.Trim(); + else if (string.Equals(tr, "lower", StringComparison.OrdinalIgnoreCase)) + current = current.ToLowerInvariant(); + else if (string.Equals(tr, "upper", StringComparison.OrdinalIgnoreCase)) + current = current.ToUpperInvariant(); + else if (string.Equals(tr, "removeNonDigits", StringComparison.OrdinalIgnoreCase)) + current = new string(current.Where(char.IsDigit).ToArray()); + // transforms mais avançados (regex/replace/etc) você pode adicionar depois + } + + return current; + } + + private static bool TryParseNumber(string raw, JsonElement field, out decimal value) + { + // transform opcional: "number:pt-BR" ou "number:invariant" + var culture = CultureInfo.InvariantCulture; + + if (field.TryGetProperty("transforms", out var transforms) && transforms.ValueKind == JsonValueKind.Array) + { + foreach (var t in transforms.EnumerateArray()) + { + if (t.ValueKind != JsonValueKind.String) continue; + var s = t.GetString() ?? ""; + if (s.StartsWith("number:", StringComparison.OrdinalIgnoreCase)) + { + var arg = s.Substring("number:".Length); + if (string.Equals(arg, "pt-BR", StringComparison.OrdinalIgnoreCase)) + culture = new CultureInfo("pt-BR"); + else if (string.Equals(arg, "invariant", StringComparison.OrdinalIgnoreCase)) + culture = CultureInfo.InvariantCulture; + } + } + } + + return decimal.TryParse(raw, NumberStyles.Any, culture, out value); + } + + private static bool TryParseDate(string raw, JsonElement field, out string iso) + { + // transform opcional: "date:dd/MM/yyyy" etc. + string? format = null; + + if (field.TryGetProperty("transforms", out var transforms) && transforms.ValueKind == JsonValueKind.Array) + { + foreach (var t in transforms.EnumerateArray()) + { + if (t.ValueKind != JsonValueKind.String) continue; + var s = t.GetString() ?? 
"";
+                if (s.StartsWith("date:", StringComparison.OrdinalIgnoreCase))
+                    format = s.Substring("date:".Length);
+            }
+        }
+
+        DateTime dt;
+        if (!string.IsNullOrWhiteSpace(format))
+        {
+            // An explicit "date:<format>" transform must match exactly.
+            if (!DateTime.TryParseExact(raw, format, CultureInfo.InvariantCulture, DateTimeStyles.None, out dt))
+            {
+                iso = "";
+                return false;
+            }
+        }
+        else
+        {
+            if (!DateTime.TryParse(raw, CultureInfo.InvariantCulture, DateTimeStyles.AssumeLocal, out dt))
+            {
+                // fall back to pt-BR
+                if (!DateTime.TryParse(raw, new CultureInfo("pt-BR"), DateTimeStyles.AssumeLocal, out dt))
+                {
+                    iso = "";
+                    return false;
+                }
+            }
+        }
+
+        iso = dt.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture);
+        return true;
+    }
+
+    /// <summary>
+    /// Parses common English and Portuguese boolean spellings ("true"/"1"/"yes"/"sim"/...
+    /// and "false"/"0"/"no"/"não"/...). Returns false when the text matches neither set.
+    /// </summary>
+    private static bool TryParseBool(string raw, out bool value)
+    {
+        var s = raw.Trim().ToLowerInvariant();
+        if (s is "true" or "1" or "yes" or "y" or "sim" or "s") { value = true; return true; }
+        if (s is "false" or "0" or "no" or "n" or "nao" or "não") { value = false; return true; }
+        value = false;
+        return false;
+    }
+}
diff --git a/ScrapperAPI/Workers/ExtractionCoordinator.cs b/ScrapperAPI/Workers/ExtractionCoordinator.cs
new file mode 100644
index 0000000..a61870c
--- /dev/null
+++ b/ScrapperAPI/Workers/ExtractionCoordinator.cs
@@ -0,0 +1,219 @@
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using System.Text.Json;
+using Microsoft.Extensions.Logging;
+using ScrapperAPI.Dtos;
+using ScrapperAPI.Interfaces;
+using ScrapperAPI.Services;
+using ScrapperAPI.Utils;
+
+namespace ScrapperAPI.Workers;
+
+///
+/// Processes extractions (models) over the HTML already captured (content.content_bytes, gzip).
+///
+public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordinator
+{
+    private readonly IServiceScopeFactory _scopeFactory;
+    private readonly ILogger<ExtractionCoordinator> _logger;
+    private readonly ExtractionEngine _engine;
+
+    // Run ids waiting to be picked up by ExecuteAsync: single reader (the background
+    // loop), many writers (callers of StartRunAsync).
+    private readonly Channel<long> _startRequests = Channel.CreateUnbounded<long>(
+        new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
+
+    // In-memory progress per run. Entries are kept after completion so GetRuntimeStatus
+    // can still report final counters; NOTE(review): nothing ever evicts them, so this
+    // grows with the number of runs started over the process lifetime.
+    private readonly ConcurrentDictionary<long, Runtime> _running = new();
+
+    public ExtractionCoordinator(
+        IServiceScopeFactory scopeFactory,
+        ILogger<ExtractionCoordinator> logger,
+        ExtractionEngine engine)
+    {
+        _scopeFactory = scopeFactory;
+        _logger = logger;
+        _engine = engine;
+    }
+
+    /// <summary>
+    /// Persists a new extraction run, registers its in-memory runtime state and
+    /// queues it for the background loop. Returns the new run id.
+    /// </summary>
+    public async Task<long> StartRunAsync(StartExtractionRequest request, CancellationToken ct)
+    {
+        using var scope = _scopeFactory.CreateScope();
+        var runs = scope.ServiceProvider.GetRequiredService<IExtractionRunRepository>();
+
+        var runId = await runs.CreateAsync(new CreateExtractionRunDto(request.ModelId, request.SessionId), ct);
+
+        _running[runId] = new Runtime(runId, request.SessionId, request.ModelId, request.OnlyDone);
+        await _startRequests.Writer.WriteAsync(runId, ct);
+        return runId;
+    }
+
+    /// <summary>
+    /// Snapshot of the in-memory counters for a run; returns an all-zero,
+    /// not-running status when the run id is unknown.
+    /// </summary>
+    public ExtractionRuntimeStatus GetRuntimeStatus(long runId)
+    {
+        if (!_running.TryGetValue(runId, out var r))
+            return new ExtractionRuntimeStatus(runId, false, 0, 0, 0, 0, null);
+
+        return new ExtractionRuntimeStatus(
+            RunId: r.RunId,
+            IsRunning: r.IsRunning,
+            Processed: r.Processed,
+            Total: r.Total,
+            Succeeded: r.Succeeded,
+            Failed: r.Failed,
+            CurrentQueueId: r.CurrentQueueId
+        );
+    }
+
+    /// <summary>
+    /// Background loop: reads queued run ids and dispatches each to RunOnceAsync
+    /// without awaiting, so multiple runs may execute concurrently.
+    /// </summary>
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        _logger.LogInformation("ExtractionCoordinator started.");
+
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            long runId;
+            try
+            {
+                runId = await _startRequests.Reader.ReadAsync(stoppingToken);
+            }
+            catch (OperationCanceledException) { break; }
+
+            if (!_running.TryGetValue(runId, out var runtime))
+                continue;
+
+            // Fire-and-forget by design; RunOnceAsync contains its own error handling.
+            _ = RunOnceAsync(runtime, stoppingToken);
+        }
+    }
+
+    /// <summary>
+    /// Executes one extraction run end-to-end: loads the model, iterates the session's
+    /// queue items, extracts each captured (gzip) page and upserts the result.
+    /// Per-item failures are recorded and do not abort the run.
+    /// </summary>
+    private async Task RunOnceAsync(Runtime runtime, CancellationToken hostToken)
+    {
+        // Guard against the same run being dispatched twice.
+        if (!runtime.TryEnter())
+            return;
+
+        runtime.IsRunning = true;
+
+        try
+        {
+            using var scope = _scopeFactory.CreateScope();
+            var models = scope.ServiceProvider.GetRequiredService<IExtractionModelRepository>();
+            var runs = scope.ServiceProvider.GetRequiredService<IExtractionRunRepository>();
+            var queue = scope.ServiceProvider.GetRequiredService<IQueueRepository>();
+            // NOTE(review): the content repository's type argument was garbled in the
+            // patch text — confirm this matches the project's content interface.
+            var content = scope.ServiceProvider.GetRequiredService<IContentRepository>();
+            var extracted = scope.ServiceProvider.GetRequiredService<IExtractedDataRepository>();
+
+            var modelRow = await models.GetByIdAsync(runtime.ModelId, hostToken);
+            if (modelRow is null)
+            {
+                await runs.MarkFailedAsync(runtime.RunId, $"Model not found: {runtime.ModelId}", hostToken);
+                return;
+            }
+
+            await runs.MarkRunningAsync(runtime.RunId, hostToken);
+
+            // OnlyDone restricts the run to queue items with status 2 ("done").
+            var statuses = runtime.OnlyDone ? new short[] { 2 } : null;
+            var queueIds = await queue.ListQueueIdsAsync(runtime.SessionId, statuses, hostToken);
+
+            runtime.Total = queueIds.Count;
+
+            foreach (var qid in queueIds)
+            {
+                if (hostToken.IsCancellationRequested) break;
+
+                runtime.CurrentQueueId = qid;
+                runtime.Processed++;
+
+                try
+                {
+                    var row = await content.GetCompressedByQueueIdAsync(qid, hostToken);
+                    if (row is null || row.ContentBytes is null || row.ContentBytes.Length == 0)
+                        throw new InvalidOperationException("Content not found");
+
+                    if (!string.Equals(row.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase))
+                        throw new InvalidOperationException($"Unsupported encoding: {row.ContentEncoding}");
+
+                    var html = CompressionUtils.GzipDecompressUtf8(row.ContentBytes);
+
+                    using var json = _engine.Extract(html, modelRow.Definition.RootElement);
+
+                    await extracted.UpsertAsync(new UpsertExtractedDataDto(
+                        RunId: runtime.RunId,
+                        ModelId: runtime.ModelId,
+                        SessionId: runtime.SessionId,
+                        QueueId: qid,
+                        ExtractedJson: json,
+                        Success: true,
+                        Error: null
+                    ), hostToken);
+
+                    runtime.Succeeded++;
+                }
+                catch (OperationCanceledException) when (hostToken.IsCancellationRequested)
+                {
+                    // Host shutdown, not an extraction failure: do NOT record a failed
+                    // item (the previous code wrote a spurious failure row here with an
+                    // already-cancelled token).
+                    throw;
+                }
+                catch (Exception ex)
+                {
+                    using var errJson = JsonDocument.Parse("{}");
+
+                    await extracted.UpsertAsync(new UpsertExtractedDataDto(
+                        RunId: runtime.RunId,
+                        ModelId: runtime.ModelId,
+                        SessionId: runtime.SessionId,
+                        QueueId: qid,
+                        ExtractedJson: errJson,
+                        Success: false,
+                        Error: Truncate(ex.Message, 2000)
+                    ), hostToken);
+
+                    runtime.Failed++;
+                }
+                finally
+                {
+                    runtime.CurrentQueueId = null;
+                }
+            }
+
+            await runs.MarkDoneAsync(runtime.RunId, runtime.Total, runtime.Succeeded, runtime.Failed, hostToken);
+        }
+        catch (OperationCanceledException) when (hostToken.IsCancellationRequested)
+        {
+            // The host is stopping: the cancelled token cannot be used to update the
+            // run row, so just log instead of attempting MarkFailedAsync with it.
+            _logger.LogInformation("Extraction run {RunId} aborted by host shutdown.", runtime.RunId);
+        }
+        catch (Exception ex)
+        {
+            try
+            {
+                using var scope = _scopeFactory.CreateScope();
+                var runs = scope.ServiceProvider.GetRequiredService<IExtractionRunRepository>();
+                await runs.MarkFailedAsync(runtime.RunId, Truncate(ex.ToString(), 8000), hostToken);
+            }
+            catch
+            {
+                // ignore double-fault
+            }
+        }
+        finally
+        {
+            runtime.IsRunning = false;
+            runtime.Exit();
+        }
+    }
+
+    // Clamps s to at most max characters (used for bounded DB error columns).
+    private static string Truncate(string s, int max) => s.Length <= max ? s : s[..max];
+
+    /// <summary>
+    /// Mutable in-memory state for one run. TryEnter/Exit form a simple
+    /// Interlocked-based latch so the same run cannot execute twice concurrently.
+    /// </summary>
+    private sealed class Runtime
+    {
+        private int _entered; // 0 = idle, 1 = executing
+
+        public long RunId { get; }
+        public int SessionId { get; }
+        public long ModelId { get; }
+        public bool OnlyDone { get; }
+
+        public bool IsRunning { get; set; }
+        public int Total { get; set; }
+        public int Processed { get; set; }
+        public int Succeeded { get; set; }
+        public int Failed { get; set; }
+        public int? CurrentQueueId { get; set; }
+
+        public Runtime(long runId, int sessionId, long modelId, bool onlyDone)
+        {
+            RunId = runId;
+            SessionId = sessionId;
+            ModelId = modelId;
+            OnlyDone = onlyDone;
+        }
+
+        public bool TryEnter() => Interlocked.CompareExchange(ref _entered, 1, 0) == 0;
+        public void Exit() => Interlocked.Exchange(ref _entered, 0);
+    }
+}