From fe448405ec4c5522a447b2a728f27071f4b80708 Mon Sep 17 00:00:00 2001 From: nicoeri Date: Mon, 9 Feb 2026 21:52:34 -0300 Subject: [PATCH] Add distributed scraping architecture with agent-based support via gRPC --- .../.idea/dataSources.local.xml | 48 ++++ README.md | 121 +++++++++ ScrapperAPI/AgentGrpc/AgentServiceImpl.cs | 199 ++++++++++++++ .../Controllers/ExtractionModelsController.cs | 8 +- ScrapperAPI/Dtos/AgentRow.cs | 10 + ScrapperAPI/Dtos/ExtractionModelDtos.cs | 68 +++-- ScrapperAPI/Dtos/ExtractionRunDtos.cs | 255 ++++++++++++++---- ScrapperAPI/Interfaces/IAgentRepository.cs | 13 + ScrapperAPI/Interfaces/IContentRepository.cs | 11 + ScrapperAPI/Interfaces/IQueueRepository.cs | 21 +- ScrapperAPI/Options/WorkerOptions.cs | 39 +++ ScrapperAPI/Program.cs | 6 + ScrapperAPI/Protos/agent.proto | 66 +++++ ScrapperAPI/Repositories/AgentRepository.cs | 84 ++++++ ScrapperAPI/Repositories/ContentRepository.cs | 25 ++ .../Repositories/ExtractedDataRepository.cs | 45 +++- .../Repositories/ExtractionModelRepository.cs | 2 +- ScrapperAPI/Repositories/QueueRepository.cs | 79 ++++-- ScrapperAPI/ScrapperAPI.csproj | 9 + ScrapperAPI/Scripts/database.sql | 18 ++ ScrapperAPI/Workers/ExtractionCoordinator.cs | 42 +-- ScrapperAPI/Workers/ScrapeCoordinator.cs | 184 ++++++++----- ScrapperAPI/appsettings.Development.json | 25 ++ ScrapperAPI/appsettings.json | 13 + Voyager.sln | 6 + VoyagerAgent/AgentClientOptions.cs | 28 ++ VoyagerAgent/AgentWorker.cs | 154 +++++++++++ VoyagerAgent/GrpcAgentClient.cs | 37 +++ VoyagerAgent/Program.cs | 19 ++ VoyagerAgent/VoyagerAgent.csproj | 30 +++ VoyagerAgent/appsettings.json | 13 + 31 files changed, 1477 insertions(+), 201 deletions(-) create mode 100644 .idea/.idea.WebScrapperPro/.idea/dataSources.local.xml create mode 100644 README.md create mode 100644 ScrapperAPI/AgentGrpc/AgentServiceImpl.cs create mode 100644 ScrapperAPI/Dtos/AgentRow.cs create mode 100644 ScrapperAPI/Interfaces/IAgentRepository.cs create mode 100644 
ScrapperAPI/Options/WorkerOptions.cs create mode 100644 ScrapperAPI/Protos/agent.proto create mode 100644 ScrapperAPI/Repositories/AgentRepository.cs create mode 100644 VoyagerAgent/AgentClientOptions.cs create mode 100644 VoyagerAgent/AgentWorker.cs create mode 100644 VoyagerAgent/GrpcAgentClient.cs create mode 100644 VoyagerAgent/Program.cs create mode 100644 VoyagerAgent/VoyagerAgent.csproj create mode 100644 VoyagerAgent/appsettings.json diff --git a/.idea/.idea.WebScrapperPro/.idea/dataSources.local.xml b/.idea/.idea.WebScrapperPro/.idea/dataSources.local.xml new file mode 100644 index 0000000..873b1ce --- /dev/null +++ b/.idea/.idea.WebScrapperPro/.idea/dataSources.local.xml @@ -0,0 +1,48 @@ + + + + + + " + + + master_key + postgres + + + + + + + + + + + + + + + + + " + + + master_key + admin + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..484a363 --- /dev/null +++ b/README.md @@ -0,0 +1,121 @@ +# Voyager (central) + Agentes (opcionais) + +Este ZIP adiciona uma arquitetura **distribuída e opcional** para o Voyager: + +- **Central (ScrapperAPI)** continua capaz de fazer o scrape localmente. +- **Agentes (VoyagerAgent)** são *opcionais*: quando habilitados, eles pegam lotes de URLs do central via **gRPC** e devolvem o conteúdo. +- A coordenação é feita por **lease** no banco (PostgreSQL). Se um agente morrer, o lease expira e outro worker pode recuperar o item. + +## Visão geral + +- Banco: a tabela `queue` ganhou colunas `leased_by` e `lease_expires_at`, além de `attempts` e `last_error`. +- Central expõe um gRPC `AgentService` para: + - `RegisterAgent` (registro + thumbprint do cert) + - `Heartbeat` + - `LeaseWork` (lote de URLs) + - `SubmitResult` (conteúdo + status) +- Segurança: recomendado **mTLS** (TLS mútuo) no endpoint gRPC. + +## Modos (Workers.Mode) + +Em `ScrapperAPI/appsettings*.json`: + +- `LocalOnly`: **somente** worker local. 
+- `Hybrid` (padrão): local + agentes ao mesmo tempo. +- `PreferAgents`: local só trabalha quando não há agentes ativos (por uma janela de graça). +- `PreferLocal`: (reservado) mantenha local sempre ativo. + +## Como rodar (dev) + +1) Rode o banco e aplique o script: + +- `ScrapperAPI/Scripts/database.sql` + +2) Rode o central: + +- `dotnet run --project ScrapperAPI` + +3) (Opcional) Rode um agente: + +- ajuste `VoyagerAgent/appsettings.json` com `CentralGrpcAddress` e `SessionIds` +- `dotnet run --project VoyagerAgent` + +> Em dev, você pode deixar `Workers:Agents:RequireMutualTls=false` para testar sem cert. + +## Como habilitar mTLS (produção) + +### 1) Gere uma CA local e certs (exemplo) + +> Ajuste paths conforme seu ambiente. + +```bash +# CA +openssl genrsa -out ca.key 4096 +openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -subj "/CN=Voyager-CA" -out ca.crt + +# Servidor (central) +openssl genrsa -out server.key 2048 +openssl req -new -key server.key -subj "/CN=voyager-grpc" -out server.csr +openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 825 -sha256 + +# Agente +openssl genrsa -out agent01.key 2048 +openssl req -new -key agent01.key -subj "/CN=agent-01" -out agent01.csr +openssl x509 -req -in agent01.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out agent01.crt -days 825 -sha256 + +# PFX do agente +openssl pkcs12 -export -out agent-01.pfx -inkey agent01.key -in agent01.crt -certfile ca.crt +``` + +### 2) Configure o Kestrel do central + +A forma mais comum é via `appsettings.Production.json` (exemplo): + +```json +{ + "Kestrel": { + "Endpoints": { + "HttpsGrpc": { + "Url": "https://0.0.0.0:7443", + "Certificate": { + "Path": "./certs/server.pfx", + "Password": "change-me" + } + } + } + }, + "Workers": { + "Agents": { + "Enabled": true, + "RequireMutualTls": true + } + } +} +``` + +> Observação: o código do gRPC **exige cert do cliente** quando `RequireMutualTls=true`. 
+ +### 3) Configure o agente + +Em `VoyagerAgent/appsettings.json`: + +- `ClientCertificatePath` -> `./certs/agent-01.pfx` +- `ClientCertificatePassword` -> senha do PFX +- `CentralGrpcAddress` -> https do central (porta 7443, por exemplo) + +### 4) Registro do agente + +Ao iniciar, o agente chama `RegisterAgent` e o central grava: +- `agent.id` +- `agent.cert_thumbprint` + +Depois disso, os requests são validados pelo thumbprint. + +## O que foi adicionado/alterado + +- `queue`: lease + tentativas +- `agent`: tabela para registrar agentes (thumbprint) +- `IQueueRepository`: lease batch, renew, mark done/failed validando `leased_by` +- `ScrapperAPI`: gRPC `AgentServiceImpl` +- `VoyagerAgent`: Worker Service que faz lease + scrape + submit + diff --git a/ScrapperAPI/AgentGrpc/AgentServiceImpl.cs b/ScrapperAPI/AgentGrpc/AgentServiceImpl.cs new file mode 100644 index 0000000..1effbb5 --- /dev/null +++ b/ScrapperAPI/AgentGrpc/AgentServiceImpl.cs @@ -0,0 +1,199 @@ +using Grpc.Core; +using Grpc.AspNetCore.Server; +using Microsoft.Extensions.Options; +using ScrapperAPI.AgentGrpc; +using ScrapperAPI.Interfaces; +using ScrapperAPI.Options; + +namespace ScrapperAPI.AgentGrpc; + +public sealed class AgentServiceImpl : AgentService.AgentServiceBase +{ + private readonly IAgentRepository _agents; + private readonly IQueueRepository _queue; + private readonly IContentRepository _content; + private readonly WorkerOptions _opts; + + public AgentServiceImpl( + IAgentRepository agents, + IQueueRepository queue, + IContentRepository content, + IOptions options) + { + _agents = agents; + _queue = queue; + _content = content; + _opts = options.Value; + } + + public override async Task RegisterAgent(RegisterAgentRequest request, ServerCallContext context) + { + EnsureAgentsEnabled(); + + var (agentId, displayName) = (request.AgentId?.Trim(), request.DisplayName?.Trim()); + if (string.IsNullOrWhiteSpace(agentId)) + throw new RpcException(new Status(StatusCode.InvalidArgument, "agent_id 
is required")); + + var thumbprint = GetClientCertThumbprint(context); + await _agents.UpsertAsync(agentId, string.IsNullOrWhiteSpace(displayName) ? null : displayName, thumbprint, context.CancellationToken); + + return new RegisterAgentResponse { Ok = true }; + } + + public override async Task Heartbeat(HeartbeatRequest request, ServerCallContext context) + { + EnsureAgentsEnabled(); + var agentId = request.AgentId?.Trim(); + if (string.IsNullOrWhiteSpace(agentId)) + throw new RpcException(new Status(StatusCode.InvalidArgument, "agent_id is required")); + + await ValidateAgentAsync(agentId, context); + await _agents.TouchAsync(agentId, context.CancellationToken); + + return new HeartbeatResponse { Ok = true }; + } + + public override async Task LeaseWork(LeaseWorkRequest request, ServerCallContext context) + { + EnsureAgentsEnabled(); + + if (_opts.Mode == DistributedMode.LocalOnly) + { + return new LeaseWorkResponse + { + ServerTimeUtcMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + }; + } + + var agentId = request.AgentId?.Trim(); + if (string.IsNullOrWhiteSpace(agentId)) + throw new RpcException(new Status(StatusCode.InvalidArgument, "agent_id is required")); + + await ValidateAgentAsync(agentId, context); + await _agents.TouchAsync(agentId, context.CancellationToken); + + var capacity = Math.Clamp(request.Capacity, 0, 1000); + if (capacity == 0) + { + return new LeaseWorkResponse + { + ServerTimeUtcMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + }; + } + + var workerId = $"agent:{agentId}"; + var leaseFor = TimeSpan.FromSeconds(Math.Max(5, _opts.LeaseSeconds)); + + var batch = await _queue.LeaseBatchAsync(request.SessionId, workerId, capacity, leaseFor, context.CancellationToken); + + var resp = new LeaseWorkResponse + { + ServerTimeUtcMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + }; + + foreach (var it in batch) + { + resp.Items.Add(new WorkItem + { + QueueId = it.Id, + SessionId = it.SessionId, + Url = it.Url, + LeaseExpiresUtcMs = 
DateTimeOffset.UtcNow.Add(leaseFor).ToUnixTimeMilliseconds() + }); + } + + return resp; + } + + public override async Task SubmitResult(SubmitResultRequest request, ServerCallContext context) + { + EnsureAgentsEnabled(); + + var agentId = request.AgentId?.Trim(); + if (string.IsNullOrWhiteSpace(agentId)) + throw new RpcException(new Status(StatusCode.InvalidArgument, "agent_id is required")); + + await ValidateAgentAsync(agentId, context); + await _agents.TouchAsync(agentId, context.CancellationToken); + + if (request.QueueId <= 0) + throw new RpcException(new Status(StatusCode.InvalidArgument, "queue_id must be > 0")); + + var workerId = $"agent:{agentId}"; + + try + { + if (request.Success) + { + if (request.ContentBytes is { Length: > 0 }) + { + var encoding = string.IsNullOrWhiteSpace(request.ContentEncoding) ? "gzip" : request.ContentEncoding; + var origLen = request.OriginalLength > 0 ? request.OriginalLength : 0; + var compLen = request.CompressedLength > 0 ? request.CompressedLength : request.ContentBytes.Length; + + await _content.SaveCompressedAsync( + request.QueueId, + encoding, + request.ContentBytes.ToByteArray(), + origLen, + compLen, + context.CancellationToken); + } + else + { + await _content.SaveAsync(request.QueueId, request.ContentText ?? string.Empty, context.CancellationToken); + } + + var ok = await _queue.MarkDoneAsync(request.QueueId, workerId, context.CancellationToken); + if (!ok) + return new SubmitResultResponse { Ok = false, Message = "Lease is not valid for this agent" }; + + return new SubmitResultResponse { Ok = true, Message = "Stored" }; + } + + var error = string.IsNullOrWhiteSpace(request.Error) ? 
"unknown error" : request.Error; + var failed = await _queue.MarkFailedAsync(request.QueueId, workerId, error, context.CancellationToken); + if (!failed) + return new SubmitResultResponse { Ok = false, Message = "Lease is not valid for this agent" }; + + return new SubmitResultResponse { Ok = true, Message = "Marked failed" }; + } + catch (Exception ex) + { + throw new RpcException(new Status(StatusCode.Internal, ex.Message)); + } + } + + private void EnsureAgentsEnabled() + { + if (!_opts.Agents.Enabled) + throw new RpcException(new Status(StatusCode.Unavailable, "Agents are disabled")); + } + + private async Task ValidateAgentAsync(string agentId, ServerCallContext context) + { + var row = await _agents.GetAsync(agentId, context.CancellationToken); + if (row is null) + throw new RpcException(new Status(StatusCode.PermissionDenied, "Agent not registered")); + + if (!row.IsEnabled) + throw new RpcException(new Status(StatusCode.PermissionDenied, "Agent disabled")); + + var thumbprint = GetClientCertThumbprint(context); + if (!string.Equals(row.CertThumbprint, thumbprint, StringComparison.OrdinalIgnoreCase)) + throw new RpcException(new Status(StatusCode.PermissionDenied, "Client certificate does not match agent")); + } + + private string GetClientCertThumbprint(ServerCallContext context) + { + if (!_opts.Agents.RequireMutualTls) + return ""; + + var http = context.GetHttpContext(); + var cert = http.Connection.ClientCertificate; + if (cert is null) + throw new RpcException(new Status(StatusCode.Unauthenticated, "Client certificate is required")); + + return (cert.Thumbprint ?? 
string.Empty).Replace(" ", string.Empty); + } +} diff --git a/ScrapperAPI/Controllers/ExtractionModelsController.cs b/ScrapperAPI/Controllers/ExtractionModelsController.cs index 47e7974..16e3578 100644 --- a/ScrapperAPI/Controllers/ExtractionModelsController.cs +++ b/ScrapperAPI/Controllers/ExtractionModelsController.cs @@ -24,10 +24,10 @@ public sealed class ExtractionModelsController : ControllerBase public async Task Create([FromBody] CreateExtractionModelRequest req, CancellationToken ct) { var id = await _models.CreateAsync(new CreateExtractionModelDto( - Name: req.Name, - Version: req.Version <= 0 ? 1 : req.Version, - Description: req.Description, - Definition: req.Definition + name: req.Name, + version: req.Version <= 0 ? 1 : req.Version, + description: req.Description, + definition: req.Definition ), ct); return Created($"/extraction-models/{id}", new { id }); diff --git a/ScrapperAPI/Dtos/AgentRow.cs b/ScrapperAPI/Dtos/AgentRow.cs new file mode 100644 index 0000000..93f8e4b --- /dev/null +++ b/ScrapperAPI/Dtos/AgentRow.cs @@ -0,0 +1,10 @@ +namespace ScrapperAPI.Dtos; + +public sealed record AgentRow( + string Id, + string? DisplayName, + string CertThumbprint, + DateTimeOffset CreatedAt, + DateTimeOffset LastSeenAt, + bool IsEnabled +); diff --git a/ScrapperAPI/Dtos/ExtractionModelDtos.cs b/ScrapperAPI/Dtos/ExtractionModelDtos.cs index 80d9b4e..d44ca1c 100644 --- a/ScrapperAPI/Dtos/ExtractionModelDtos.cs +++ b/ScrapperAPI/Dtos/ExtractionModelDtos.cs @@ -2,19 +2,57 @@ using System.Text.Json; namespace ScrapperAPI.Dtos; -public sealed record CreateExtractionModelDto( - string Name, - int Version, - string? Description, - JsonDocument Definition -); +public sealed class CreateExtractionModelDto +{ + public string Name { get; init; } = null!; + public int Version { get; init; } + public string? 
Description { get; init; } + public JsonDocument Definition { get; init; } = null!; -public sealed record ExtractionModelRow( - long Id, - string Name, - int Version, - string? Description, - JsonDocument Definition, - DateTimeOffset CreatedAt, - DateTimeOffset UpdatedAt -); + public CreateExtractionModelDto() + { + + } + + public CreateExtractionModelDto(string name, int version, string? description, JsonDocument definition) + { + Name = name; + Version = version; + Description = description; + Definition = definition; + } +} + +public sealed class ExtractionModelRow +{ + public long Id { get; init; } + public string Name { get; init; } = null!; + public int Version { get; init; } + public string? Description { get; init; } + public JsonDocument Definition { get; init; } = null!; + public DateTimeOffset CreatedAt { get; init; } + public DateTimeOffset UpdatedAt { get; init; } + + public ExtractionModelRow() + { + + } + + public ExtractionModelRow( + long id, + string name, + int version, + string? 
description, + JsonDocument definition, + DateTimeOffset createdAt, + DateTimeOffset updatedAt) + { + Id = id; + Name = name; + Version = version; + Description = description; + Definition = definition; + CreatedAt = createdAt; + UpdatedAt = updatedAt; + } +} diff --git a/ScrapperAPI/Dtos/ExtractionRunDtos.cs b/ScrapperAPI/Dtos/ExtractionRunDtos.cs index 71f8150..1b2d121 100644 --- a/ScrapperAPI/Dtos/ExtractionRunDtos.cs +++ b/ScrapperAPI/Dtos/ExtractionRunDtos.cs @@ -3,68 +3,211 @@ using System.Text.Json; namespace ScrapperAPI.Dtos; -public sealed record StartExtractionRequest( - [Required] int SessionId, - [Required] long ModelId, - bool OnlyDone = true -); +public sealed class StartExtractionRequest +{ + [Required] + public int SessionId { get; set; } + + [Required] + public long ModelId { get; set; } + + public bool OnlyDone { get; set; } = true; /* keeps the default of the replaced record (OnlyDone = true) when built via the parameterless constructor */ + + public StartExtractionRequest() + { + } + + public StartExtractionRequest(int sessionId, long modelId, bool onlyDone = true) + { + SessionId = sessionId; + ModelId = modelId; + OnlyDone = onlyDone; + } +} + +public sealed class BulkStartExtractionRequest +{ + [Required] + public long ModelId { get; set; } -public sealed record BulkStartExtractionRequest( - [Required] long ModelId, /// /// Se vazio/nulo, roda para todas as sessions existentes. /// - int[]? SessionIds = null, - bool OnlyDone = true -); + public int[]? SessionIds { get; set; } -public sealed record CreateExtractionRunDto( - long ModelId, - int SessionId -); + public bool OnlyDone { get; set; } = true; /* keeps the default of the replaced record (OnlyDone = true) when built via the parameterless constructor */ -public sealed record ExtractionRunRow( - long Id, - long ModelId, - int SessionId, - short Status, - DateTimeOffset CreatedAt, - DateTimeOffset? StartedAt, - DateTimeOffset? FinishedAt, - int Total, - int Succeeded, - int Failed, - string? Error -); + public BulkStartExtractionRequest() + { + } -public sealed record ExtractionRuntimeStatus( - long RunId, - bool IsRunning, - int Processed, - int Total, - int Succeeded, - int Failed, - int? 
CurrentQueueId -); + public BulkStartExtractionRequest(long modelId, int[]? sessionIds = null, bool onlyDone = true) + { + ModelId = modelId; + SessionIds = sessionIds; + OnlyDone = onlyDone; + } +} -public sealed record UpsertExtractedDataDto( - long RunId, - long ModelId, - int SessionId, - int QueueId, - JsonDocument ExtractedJson, - bool Success, - string? Error -); +public sealed class CreateExtractionRunDto +{ + public long ModelId { get; set; } -public sealed record ExtractedDataRow( - long Id, - long RunId, - long ModelId, - int SessionId, - int QueueId, - JsonDocument ExtractedJson, - bool Success, - string? Error, - DateTimeOffset ExtractedAt -); + public int SessionId { get; set; } + + public CreateExtractionRunDto() + { + } + + public CreateExtractionRunDto(long modelId, int sessionId) + { + ModelId = modelId; + SessionId = sessionId; + } +} + +public sealed class ExtractionRunRow +{ + public long Id { get; set; } + + public long ModelId { get; set; } + + public int SessionId { get; set; } + + public short Status { get; set; } + + public DateTimeOffset CreatedAt { get; set; } + + public DateTimeOffset? StartedAt { get; set; } + + public DateTimeOffset? FinishedAt { get; set; } + + public int Total { get; set; } + + public int Succeeded { get; set; } + + public int Failed { get; set; } + + public string? Error { get; set; } + + public ExtractionRunRow() + { + } + + public ExtractionRunRow(long id, long modelId, int sessionId, short status, DateTimeOffset createdAt, DateTimeOffset? startedAt, DateTimeOffset? finishedAt, int total, int succeeded, int failed, string? 
error) + { + Id = id; + ModelId = modelId; + SessionId = sessionId; + Status = status; + CreatedAt = createdAt; + StartedAt = startedAt; + FinishedAt = finishedAt; + Total = total; + Succeeded = succeeded; + Failed = failed; + Error = error; + } +} + +public sealed class ExtractionRuntimeStatus +{ + public long RunId { get; set; } + + public bool IsRunning { get; set; } + + public int Processed { get; set; } + + public int Total { get; set; } + + public int Succeeded { get; set; } + + public int Failed { get; set; } + + public int? CurrentQueueId { get; set; } + + public ExtractionRuntimeStatus() + { + } + + public ExtractionRuntimeStatus(long runId, bool isRunning, int processed, int total, int succeeded, int failed, int? currentQueueId) + { + RunId = runId; + IsRunning = isRunning; + Processed = processed; + Total = total; + Succeeded = succeeded; + Failed = failed; + CurrentQueueId = currentQueueId; + } +} + +public sealed class UpsertExtractedDataDto +{ + public long RunId { get; set; } + + public long ModelId { get; set; } + + public int SessionId { get; set; } + + public int QueueId { get; set; } + + public JsonDocument ExtractedJson { get; set; } + + public bool Success { get; set; } + + public string? Error { get; set; } + + public UpsertExtractedDataDto() + { + } + + public UpsertExtractedDataDto(long runId, long modelId, int sessionId, int queueId, JsonDocument extractedJson, bool success, string? error) + { + RunId = runId; + ModelId = modelId; + SessionId = sessionId; + QueueId = queueId; + ExtractedJson = extractedJson; + Success = success; + Error = error; + } +} + +public sealed class ExtractedDataRow +{ + public long Id { get; set; } + + public long RunId { get; set; } + + public long ModelId { get; set; } + + public int SessionId { get; set; } + + public int QueueId { get; set; } + + public JsonDocument ExtractedJson { get; set; } + + public bool Success { get; set; } + + public string? 
Error { get; set; } + + public DateTimeOffset ExtractedAt { get; set; } + + public ExtractedDataRow() + { + } + + public ExtractedDataRow(long id, long runId, long modelId, int sessionId, int queueId, JsonDocument extractedJson, bool success, string? error, DateTimeOffset extractedAt) + { + Id = id; + RunId = runId; + ModelId = modelId; + SessionId = sessionId; + QueueId = queueId; + ExtractedJson = extractedJson; + Success = success; + Error = error; + ExtractedAt = extractedAt; + } +} diff --git a/ScrapperAPI/Interfaces/IAgentRepository.cs b/ScrapperAPI/Interfaces/IAgentRepository.cs new file mode 100644 index 0000000..41a7da6 --- /dev/null +++ b/ScrapperAPI/Interfaces/IAgentRepository.cs @@ -0,0 +1,13 @@ +using ScrapperAPI.Dtos; + +namespace ScrapperAPI.Interfaces; + +public interface IAgentRepository +{ + Task UpsertAsync(string agentId, string? displayName, string certThumbprint, CancellationToken ct); + Task IsEnabledAsync(string agentId, CancellationToken ct); + Task GetThumbprintAsync(string agentId, CancellationToken ct); + Task TouchAsync(string agentId, CancellationToken ct); + Task CountActiveAsync(TimeSpan seenWithin, CancellationToken ct); + Task GetAsync(string agentId, CancellationToken ct); +} diff --git a/ScrapperAPI/Interfaces/IContentRepository.cs b/ScrapperAPI/Interfaces/IContentRepository.cs index 57ed479..9925679 100644 --- a/ScrapperAPI/Interfaces/IContentRepository.cs +++ b/ScrapperAPI/Interfaces/IContentRepository.cs @@ -6,6 +6,17 @@ namespace ScrapperAPI.Interfaces; public interface IContentRepository { Task SaveAsync(int queueId, string content, CancellationToken ct); + + /// + /// Saves already-compressed content (e.g. from a remote agent) without recompressing. 
+ /// + Task SaveCompressedAsync( + int queueId, + string contentEncoding, + byte[] contentBytes, + int originalLength, + int compressedLength, + CancellationToken ct); Task GetByQueueIdAsync(int queueId, CancellationToken ct); Task GetCompressedByQueueIdAsync(int queueId, CancellationToken ct); diff --git a/ScrapperAPI/Interfaces/IQueueRepository.cs b/ScrapperAPI/Interfaces/IQueueRepository.cs index aa81131..77fc1a8 100644 --- a/ScrapperAPI/Interfaces/IQueueRepository.cs +++ b/ScrapperAPI/Interfaces/IQueueRepository.cs @@ -8,13 +8,24 @@ public interface IQueueRepository Task GetCountsAsync(int sessionId, CancellationToken ct); /// - /// Pega 1 item pendente e muda para Processing atomica/seguramente. - /// Retorna null se não houver itens pendentes. + /// Pega 1 item pendente e faz "lease" atomico (Processing) para um worker. + /// Retorna null se não houver itens disponíveis. /// - Task TryDequeueAsync(int sessionId, CancellationToken ct); + Task TryDequeueAsync(int sessionId, string workerId, TimeSpan leaseFor, CancellationToken ct); - Task MarkDoneAsync(int queueId, CancellationToken ct); - Task MarkFailedAsync(int queueId, string error, CancellationToken ct); + /// + /// Pega um lote de itens pendentes e faz "lease" atomico (Processing) para um worker. + /// Itens com lease expirado também podem ser reprocessados. + /// + Task> LeaseBatchAsync(int sessionId, string workerId, int take, TimeSpan leaseFor, CancellationToken ct); + + /// + /// Renova o lease de um item (se ele ainda pertence ao mesmo worker). 
+ /// + Task RenewLeaseAsync(int queueId, string workerId, TimeSpan leaseFor, CancellationToken ct); + + Task MarkDoneAsync(int queueId, string workerId, CancellationToken ct); + Task MarkFailedAsync(int queueId, string workerId, string error, CancellationToken ct); // Opcional: resetar stuck processing (se quiser depois) Task RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct); diff --git a/ScrapperAPI/Options/WorkerOptions.cs b/ScrapperAPI/Options/WorkerOptions.cs new file mode 100644 index 0000000..2006fb4 --- /dev/null +++ b/ScrapperAPI/Options/WorkerOptions.cs @@ -0,0 +1,39 @@ +namespace ScrapperAPI.Options; + +public enum DistributedMode +{ + LocalOnly = 0, + Hybrid = 1, + PreferAgents = 2, + PreferLocal = 3 +} + +public sealed class WorkerOptions +{ + public DistributedMode Mode { get; set; } = DistributedMode.Hybrid; + + /// + /// Lease duration for a queue item before it can be recovered by another worker. + /// + public int LeaseSeconds { get; set; } = 120; + + public LocalWorkerOptions Local { get; set; } = new(); + public AgentOptions Agents { get; set; } = new(); +} + +public sealed class LocalWorkerOptions +{ + public bool Enabled { get; set; } = true; + public int Concurrency { get; set; } = 1; + + /// + /// When Mode=PreferAgents, local worker will run only if no agent was seen within this window. 
+ /// + public int PreferAgentsGraceSeconds { get; set; } = 30; +} + +public sealed class AgentOptions +{ + public bool Enabled { get; set; } = true; + public bool RequireMutualTls { get; set; } = true; +} diff --git a/ScrapperAPI/Program.cs b/ScrapperAPI/Program.cs index 5eeb8b6..73f05a0 100644 --- a/ScrapperAPI/Program.cs +++ b/ScrapperAPI/Program.cs @@ -10,12 +10,14 @@ using ScrapperAPI.Repositories; using ScrapperAPI.Services; using ScrapperAPI.Utils; using ScrapperAPI.Workers; +using ScrapperAPI.AgentGrpc; var builder = WebApplication.CreateBuilder(args); builder.Services.AddOpenApi(); builder.Services.AddSignalR(); builder.Services.AddControllers(); +builder.Services.AddGrpc(); // Authentik (OIDC) - JWT Bearer validation for API + SignalR builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme) @@ -63,6 +65,7 @@ builder.Services.AddAuthorization(options => builder.Services.Configure(builder.Configuration.GetSection("Scraper")); builder.Services.Configure(builder.Configuration.GetSection("Extraction")); +builder.Services.Configure(builder.Configuration.GetSection("Workers")); builder.Services.AddSingleton(sp => { @@ -76,6 +79,7 @@ builder.Services.AddSingleton(); builder.Services.AddScoped(); builder.Services.AddScoped(); builder.Services.AddScoped(); +builder.Services.AddScoped(); // Extraction builder.Services.AddSingleton(); @@ -115,8 +119,10 @@ if (app.Environment.IsDevelopment()) app.MapOpenApi().AllowAnonymous(); } +app.MapGrpcService(); app.MapControllers(); app.MapHub("/ws/scrape").RequireAuthorization(); +app.MapGrpcService().AllowAnonymous(); // app.UseHttpsRedirection(); diff --git a/ScrapperAPI/Protos/agent.proto b/ScrapperAPI/Protos/agent.proto new file mode 100644 index 0000000..03fd288 --- /dev/null +++ b/ScrapperAPI/Protos/agent.proto @@ -0,0 +1,66 @@ +syntax = "proto3"; + +option csharp_namespace = "ScrapperAPI.AgentGrpc"; + +package voyager.agent; + +message RegisterAgentRequest { + string agent_id = 1; + string display_name 
= 2; +} + +message RegisterAgentResponse { + bool ok = 1; +} + +message HeartbeatRequest { + string agent_id = 1; +} + +message HeartbeatResponse { + bool ok = 1; +} + +message LeaseWorkRequest { + int32 session_id = 1; + string agent_id = 2; + int32 capacity = 3; +} + +message LeaseWorkResponse { + repeated WorkItem items = 1; + int64 server_time_utc_ms = 2; +} + +message WorkItem { + int32 queue_id = 1; + int32 session_id = 2; + string url = 3; + int64 lease_expires_utc_ms = 4; +} + +message SubmitResultRequest { + int32 queue_id = 1; + string agent_id = 2; + bool success = 3; + string error = 4; + + // Content: either plain text (content_text) or compressed bytes (content_bytes). + string content_text = 5; + bytes content_bytes = 6; + string content_encoding = 7; // e.g. "gzip" + int32 original_length = 8; + int32 compressed_length = 9; +} + +message SubmitResultResponse { + bool ok = 1; + string message = 2; +} + +service AgentService { + rpc RegisterAgent(RegisterAgentRequest) returns (RegisterAgentResponse); + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); + rpc LeaseWork(LeaseWorkRequest) returns (LeaseWorkResponse); + rpc SubmitResult(SubmitResultRequest) returns (SubmitResultResponse); +} diff --git a/ScrapperAPI/Repositories/AgentRepository.cs b/ScrapperAPI/Repositories/AgentRepository.cs new file mode 100644 index 0000000..bf99c16 --- /dev/null +++ b/ScrapperAPI/Repositories/AgentRepository.cs @@ -0,0 +1,84 @@ +using Dapper; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Repositories; + +public sealed class AgentRepository : IAgentRepository +{ + private readonly IDbConnectionFactory _db; + + public AgentRepository(IDbConnectionFactory db) => _db = db; + + public async Task UpsertAsync(string agentId, string? 
displayName, string certThumbprint, CancellationToken ct) + { + const string sql = """ + insert into agent(id, display_name, cert_thumbprint, last_seen_at, is_enabled) + values (@agentId, @displayName, @certThumbprint, now(), true) + on conflict (id) + do update set + display_name = excluded.display_name, + cert_thumbprint = excluded.cert_thumbprint, + last_seen_at = now(); + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + await conn.ExecuteAsync(new CommandDefinition(sql, new { agentId, displayName, certThumbprint }, cancellationToken: ct)); + } + + public async Task IsEnabledAsync(string agentId, CancellationToken ct) + { + const string sql = """ + select is_enabled from agent where id = @agentId; + """; + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync(new CommandDefinition(sql, new { agentId }, cancellationToken: ct)); + } + + public async Task GetThumbprintAsync(string agentId, CancellationToken ct) + { + const string sql = """ + select cert_thumbprint from agent where id = @agentId; + """; + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync(new CommandDefinition(sql, new { agentId }, cancellationToken: ct)); + } + + public async Task TouchAsync(string agentId, CancellationToken ct) + { + const string sql = """ + update agent set last_seen_at = now() where id = @agentId; + """; + using var conn = await _db.CreateOpenConnectionAsync(ct); + await conn.ExecuteAsync(new CommandDefinition(sql, new { agentId }, cancellationToken: ct)); + } + + public async Task CountActiveAsync(TimeSpan seenWithin, CancellationToken ct) + { + const string sql = """ + select count(*) + from agent + where is_enabled = true + and last_seen_at > now() - (@seenSeconds * interval '1 second'); + """; + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync(new CommandDefinition(sql, new { seenSeconds = (int)seenWithin.TotalSeconds }, 
cancellationToken: ct)); + } + + public async Task GetAsync(string agentId, CancellationToken ct) + { + const string sql = """ + select + id as Id, + display_name as DisplayName, + cert_thumbprint as CertThumbprint, + created_at as CreatedAt, + last_seen_at as LastSeenAt, + is_enabled as IsEnabled + from agent + where id = @agentId; + """; + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.QuerySingleOrDefaultAsync(new CommandDefinition(sql, new { agentId }, cancellationToken: ct)); + } +} diff --git a/ScrapperAPI/Repositories/ContentRepository.cs b/ScrapperAPI/Repositories/ContentRepository.cs index 44ea7e8..175a83d 100644 --- a/ScrapperAPI/Repositories/ContentRepository.cs +++ b/ScrapperAPI/Repositories/ContentRepository.cs @@ -33,6 +33,31 @@ public sealed class ContentRepository : IContentRepository }, cancellationToken: ct)); } + public async Task SaveCompressedAsync( + int queueId, + string contentEncoding, + byte[] contentBytes, + int originalLength, + int compressedLength, + CancellationToken ct) + { + const string sql = """ + insert into content(queue_id, content_encoding, content_bytes, original_length, compressed_length) + values (@queueId, @contentEncoding, @bytes, @origLen, @compLen) + returning id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync(new CommandDefinition(sql, new + { + queueId, + contentEncoding, + bytes = contentBytes, + origLen = originalLength, + compLen = compressedLength + }, cancellationToken: ct)); + } + public async Task GetByQueueIdAsync(int queueId, CancellationToken ct) { const string sql = """ diff --git a/ScrapperAPI/Repositories/ExtractedDataRepository.cs b/ScrapperAPI/Repositories/ExtractedDataRepository.cs index 2483f51..a6e15a8 100644 --- a/ScrapperAPI/Repositories/ExtractedDataRepository.cs +++ b/ScrapperAPI/Repositories/ExtractedDataRepository.cs @@ -47,7 +47,7 @@ public sealed class ExtractedDataRepository : IExtractedDataRepository 
model_id as ModelId, session_id as SessionId, queue_id as QueueId, - extracted_json::text as extracted_json, + extracted_json::text as extractedJson, success, error, extracted_at as ExtractedAt @@ -71,7 +71,7 @@ public sealed class ExtractedDataRepository : IExtractedDataRepository model_id as ModelId, session_id as SessionId, queue_id as QueueId, - extracted_json::text as extracted_json, + extracted_json::text as extractedJson, success, error, extracted_at as ExtractedAt @@ -86,16 +86,7 @@ public sealed class ExtractedDataRepository : IExtractedDataRepository return row?.ToDto(); } - private sealed record RowRaw( - long Id, - long RunId, - long ModelId, - int SessionId, - int QueueId, - string Extracted_Json, - bool Success, - string? Error, - DateTimeOffset ExtractedAt) + private sealed class RowRaw { public ExtractedDataRow ToDto() => new( Id, @@ -103,10 +94,38 @@ public sealed class ExtractedDataRepository : IExtractedDataRepository ModelId, SessionId, QueueId, - JsonDocument.Parse(Extracted_Json), + JsonDocument.Parse(ExtractedJson ?? "{}"), Success, Error, ExtractedAt ); + + public long Id { get; init; } + public long RunId { get; init; } + public long ModelId { get; init; } + public int SessionId { get; init; } + public int QueueId { get; init; } + public string? ExtractedJson { get; init; } + public bool Success { get; init; } + public string? Error { get; init; } + public DateTimeOffset ExtractedAt { get; init; } + + public RowRaw() + { + + } + + public RowRaw(long id, long runId, long modelId, int sessionId, int queueId, string? extractedJson, bool success, string? 
error, DateTimeOffset extractedAt) + { + Id = id; + RunId = runId; + ModelId = modelId; + SessionId = sessionId; + QueueId = queueId; + ExtractedJson = extractedJson; + Success = success; + Error = error; + ExtractedAt = extractedAt; + } } } diff --git a/ScrapperAPI/Repositories/ExtractionModelRepository.cs b/ScrapperAPI/Repositories/ExtractionModelRepository.cs index 6c0c86c..1db03e3 100644 --- a/ScrapperAPI/Repositories/ExtractionModelRepository.cs +++ b/ScrapperAPI/Repositories/ExtractionModelRepository.cs @@ -37,7 +37,7 @@ public sealed class ExtractionModelRepository : IExtractionModelRepository name, version, description, - definition::text as definition_json, + definition::text as definitionJson, created_at, updated_at from extraction_model diff --git a/ScrapperAPI/Repositories/QueueRepository.cs b/ScrapperAPI/Repositories/QueueRepository.cs index 47e0b70..6fbc054 100644 --- a/ScrapperAPI/Repositories/QueueRepository.cs +++ b/ScrapperAPI/Repositories/QueueRepository.cs @@ -41,9 +41,16 @@ public sealed class QueueRepository : IQueueRepository new CommandDefinition(sql, new { sessionId }, cancellationToken: ct)); } - public async Task TryDequeueAsync(int sessionId, CancellationToken ct) + public async Task TryDequeueAsync(int sessionId, string workerId, TimeSpan leaseFor, CancellationToken ct) { - // Importante: 1 transação + SKIP LOCKED (permite multi-worker no futuro) + var batch = await LeaseBatchAsync(sessionId, workerId, take: 1, leaseFor, ct); + return batch.FirstOrDefault(); + } + + public async Task> LeaseBatchAsync(int sessionId, string workerId, int take, TimeSpan leaseFor, CancellationToken ct) + { + if (take <= 0) return Array.Empty(); + using var conn = await _db.CreateOpenConnectionAsync(ct); using var tx = conn.BeginTransaction(); @@ -52,15 +59,20 @@ public sealed class QueueRepository : IQueueRepository select id from queue where session_id = @sessionId - and status = 0 + and ( + status = 0 + or (status = 1 and lease_expires_at is not null and 
lease_expires_at < now()) + ) order by id for update skip locked - limit 1 + limit @take ) update queue q set status = 1, - started_date = now(), - attempts = attempts + 1 + started_date = coalesce(q.started_date, now()), + attempts = q.attempts + 1, + leased_by = @workerId, + lease_expires_at = now() + (@leaseSeconds * interval '1 second') from next where q.id = next.id returning @@ -75,39 +87,72 @@ public sealed class QueueRepository : IQueueRepository q.last_error as LastError; """; - var item = await conn.QuerySingleOrDefaultAsync( - new CommandDefinition(sql, new { sessionId }, transaction: tx, cancellationToken: ct)); + var rows = await conn.QueryAsync( + new CommandDefinition(sql, + new + { + sessionId, + workerId, + take, + leaseSeconds = Math.Max(1, (int)leaseFor.TotalSeconds) + }, + transaction: tx, + cancellationToken: ct)); tx.Commit(); - return item; + return rows.ToList(); } - public async Task MarkDoneAsync(int queueId, CancellationToken ct) + public async Task RenewLeaseAsync(int queueId, string workerId, TimeSpan leaseFor, CancellationToken ct) + { + const string sql = """ + update queue + set lease_expires_at = now() + (@leaseSeconds * interval '1 second') + where id = @queueId + and status = 1 + and leased_by = @workerId + and (lease_expires_at is null or lease_expires_at > now() - interval '5 minutes'); + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + var rows = await conn.ExecuteAsync(new CommandDefinition(sql, + new { queueId, workerId, leaseSeconds = Math.Max(1, (int)leaseFor.TotalSeconds) }, + cancellationToken: ct)); + return rows > 0; + } + + public async Task MarkDoneAsync(int queueId, string workerId, CancellationToken ct) { const string sql = """ update queue set status = 2, finished_date = now(), - last_error = null - where id = @queueId; + last_error = null, + lease_expires_at = null + where id = @queueId + and leased_by = @workerId; """; using var conn = await _db.CreateOpenConnectionAsync(ct); - await 
conn.ExecuteAsync(new CommandDefinition(sql, new { queueId }, cancellationToken: ct)); + var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, workerId }, cancellationToken: ct)); + return rows > 0; } - public async Task MarkFailedAsync(int queueId, string error, CancellationToken ct) + public async Task MarkFailedAsync(int queueId, string workerId, string error, CancellationToken ct) { const string sql = """ update queue set status = 3, finished_date = now(), - last_error = @error - where id = @queueId; + last_error = @error, + lease_expires_at = null + where id = @queueId + and leased_by = @workerId; """; using var conn = await _db.CreateOpenConnectionAsync(ct); - await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, error }, cancellationToken: ct)); + var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, workerId, error }, cancellationToken: ct)); + return rows > 0; } public async Task RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct) diff --git a/ScrapperAPI/ScrapperAPI.csproj b/ScrapperAPI/ScrapperAPI.csproj index 30dc7ce..5ec09fe 100644 --- a/ScrapperAPI/ScrapperAPI.csproj +++ b/ScrapperAPI/ScrapperAPI.csproj @@ -7,12 +7,21 @@ + + + + all + + + + + .dockerignore diff --git a/ScrapperAPI/Scripts/database.sql b/ScrapperAPI/Scripts/database.sql index 739a5ad..d8ebfde 100644 --- a/ScrapperAPI/Scripts/database.sql +++ b/ScrapperAPI/Scripts/database.sql @@ -16,12 +16,30 @@ create table queue( status smallint not null default 0, started_date timestamp null, finished_date timestamp null, + leased_by text null, + lease_expires_at timestamptz null, attempts int not null default 0, last_error text null, created_date timestamp default now() ); create index idx_queue_session_status on queue(session_id, status); +create index idx_queue_lease on queue(session_id, status, lease_expires_at); + +-- ------------------------------------------------------------ +-- Agents (optional 
distributed workers) +-- ------------------------------------------------------------ + +drop table if exists agent; + +create table agent( + id text primary key, + display_name text null, + cert_thumbprint text not null, + created_at timestamptz not null default now(), + last_seen_at timestamptz not null default now(), + is_enabled boolean not null default true +); create table content( id serial primary key, diff --git a/ScrapperAPI/Workers/ExtractionCoordinator.cs b/ScrapperAPI/Workers/ExtractionCoordinator.cs index a162aa5..cd55525 100644 --- a/ScrapperAPI/Workers/ExtractionCoordinator.cs +++ b/ScrapperAPI/Workers/ExtractionCoordinator.cs @@ -98,13 +98,13 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi return new ExtractionRuntimeStatus(runId, false, 0, 0, 0, 0, null); return new ExtractionRuntimeStatus( - RunId: r.RunId, - IsRunning: r.IsRunning, - Processed: r.Processed, - Total: r.Total, - Succeeded: r.Succeeded, - Failed: r.Failed, - CurrentQueueId: r.CurrentQueueId + runId: r.RunId, + isRunning: r.IsRunning, + processed: r.Processed, + total: r.Total, + succeeded: r.Succeeded, + failed: r.Failed, + currentQueueId: r.CurrentQueueId ); } @@ -194,13 +194,13 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi using var json = _engine.Extract(html, modelRow.Definition.RootElement); await extracted.UpsertAsync(new UpsertExtractedDataDto( - RunId: runtime.RunId, - ModelId: runtime.ModelId, - SessionId: runtime.SessionId, - QueueId: qid, - ExtractedJson: json, - Success: true, - Error: null + runId: runtime.RunId, + modelId: runtime.ModelId, + sessionId: runtime.SessionId, + queueId: qid, + extractedJson: json, + success: true, + error: null ), hostToken); runtime.Succeeded++; @@ -210,13 +210,13 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi using var errJson = JsonDocument.Parse("{}"); await extracted.UpsertAsync(new UpsertExtractedDataDto( - RunId: 
runtime.RunId, - ModelId: runtime.ModelId, - SessionId: runtime.SessionId, - QueueId: qid, - ExtractedJson: errJson, - Success: false, - Error: Truncate(ex.Message, 2000) + runId: runtime.RunId, + modelId: runtime.ModelId, + sessionId: runtime.SessionId, + queueId: qid, + extractedJson: errJson, + success: false, + error: Truncate(ex.Message, 2000) ), hostToken); runtime.Failed++; diff --git a/ScrapperAPI/Workers/ScrapeCoordinator.cs b/ScrapperAPI/Workers/ScrapeCoordinator.cs index 8cdc5f4..be5ba4a 100644 --- a/ScrapperAPI/Workers/ScrapeCoordinator.cs +++ b/ScrapperAPI/Workers/ScrapeCoordinator.cs @@ -16,6 +16,7 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator private readonly IScraperHttpClient _scraperHttp; private readonly IScrapeEventBus _events; private readonly ScraperOptions _opts; + private readonly WorkerOptions _workerOpts; private readonly Channel _startRequests = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); @@ -30,6 +31,7 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator IHttpClientFactory httpClientFactory, ILogger logger, IOptions options, + IOptions workerOptions, IScraperHttpClient scraperHttp, IScrapeEventBus events) { @@ -37,6 +39,7 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator _httpClientFactory = httpClientFactory; _logger = logger; _opts = options.Value; + _workerOpts = workerOptions.Value; _scraperHttp = scraperHttp; _events = events; } @@ -120,84 +123,36 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator try { + if (!_workerOpts.Local.Enabled) + return; + var http = _httpClientFactory.CreateClient("scraper"); + // When PreferAgents: only run local if no agent was recently seen. 
while (!hostToken.IsCancellationRequested) { + if (_workerOpts.Mode == DistributedMode.PreferAgents) + { + var noAgents = await NoAgentsRecentlySeenAsync(_workerOpts.Local.PreferAgentsGraceSeconds, hostToken); + if (!noAgents) + { + await Task.Delay(TimeSpan.FromSeconds(2), hostToken); + continue; + } + } + // STOP GRACIOSO: não pega próxima URL if (runner.StopRequested) break; + var concurrency = Math.Max(1, _workerOpts.Local.Concurrency); + var leaseFor = TimeSpan.FromSeconds(Math.Max(5, _workerOpts.LeaseSeconds)); - // cria scope (repos scoped vivem aqui dentro) - using var scope = _scopeFactory.CreateScope(); - var queue = scope.ServiceProvider.GetRequiredService(); - var content = scope.ServiceProvider.GetRequiredService(); + var tasks = Enumerable.Range(0, concurrency) + .Select(i => RunLocalWorkerLoopAsync(runner, workerId: $"local:{Environment.MachineName}:{i}", leaseFor, hostToken)) + .ToArray(); - var item = await queue.TryDequeueAsync(runner.SessionId, hostToken); - if (item is null) - break; - - runner.SetCurrent(item.Id, item.Url); - - await _events.PublishAsync(new ScrapeEvent( - ScrapeEventType.ItemStarted, - runner.SessionId, - DateTimeOffset.UtcNow, - QueueId: item.Id, - Url: item.Url - ), hostToken); - - try - { - var html = await _scraperHttp.GetStringWithRetryAsync(item.Url, hostToken); - - await content.SaveAsync(item.Id, html, hostToken); - await queue.MarkDoneAsync(item.Id, hostToken); - - await _events.PublishAsync(new ScrapeEvent( - ScrapeEventType.ItemSucceeded, - runner.SessionId, - DateTimeOffset.UtcNow, - QueueId: item.Id, - Url: item.Url - ), hostToken); - } - catch (Exception ex) - { - await queue.MarkFailedAsync(item.Id, Truncate(ex.ToString(), 8000), hostToken); - - await _events.PublishAsync(new ScrapeEvent( - ScrapeEventType.ItemFailed, - runner.SessionId, - DateTimeOffset.UtcNow, - QueueId: item.Id, - Url: item.Url, - Error: ex.Message - ), hostToken); - } - finally - { - // progresso (snapshot do DB) + percent - var counts = 
await queue.GetCountsAsync(runner.SessionId, hostToken); - var percent = counts.Total == 0 ? 0 : (double)counts.Done * 100.0 / (double)counts.Total; - - await _events.PublishAsync(new ScrapeEvent( - ScrapeEventType.Progress, - runner.SessionId, - DateTimeOffset.UtcNow, - Total: counts.Total, - Done: counts.Done, - Pending: counts.Pending, - Processing: counts.Processing, - Failed: counts.Failed, - Percent: percent - ), hostToken); - - runner.ClearCurrent(); - - if (!runner.StopRequested && !hostToken.IsCancellationRequested) - await PoliteDelayAsync(hostToken); - } + await Task.WhenAll(tasks); + break; // no more work (or stop requested) } } finally @@ -212,6 +167,97 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator } } + private async Task RunLocalWorkerLoopAsync(Runner runner, string workerId, TimeSpan leaseFor, CancellationToken hostToken) + { + while (!hostToken.IsCancellationRequested && !runner.StopRequested) + { + using var scope = _scopeFactory.CreateScope(); + var queue = scope.ServiceProvider.GetRequiredService(); + var content = scope.ServiceProvider.GetRequiredService(); + + var item = await queue.TryDequeueAsync(runner.SessionId, workerId, leaseFor, hostToken); + if (item is null) + return; + + runner.SetCurrent(item.Id, item.Url); + + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.ItemStarted, + runner.SessionId, + DateTimeOffset.UtcNow, + QueueId: item.Id, + Url: item.Url + ), hostToken); + + try + { + var html = await _scraperHttp.GetStringWithRetryAsync(item.Url, hostToken); + + await content.SaveAsync(item.Id, html, hostToken); + await queue.MarkDoneAsync(item.Id, workerId, hostToken); + + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.ItemSucceeded, + runner.SessionId, + DateTimeOffset.UtcNow, + QueueId: item.Id, + Url: item.Url + ), hostToken); + } + catch (Exception ex) + { + await queue.MarkFailedAsync(item.Id, workerId, Truncate(ex.ToString(), 8000), hostToken); + + await 
_events.PublishAsync(new ScrapeEvent( + ScrapeEventType.ItemFailed, + runner.SessionId, + DateTimeOffset.UtcNow, + QueueId: item.Id, + Url: item.Url, + Error: ex.Message + ), hostToken); + } + finally + { + var counts = await queue.GetCountsAsync(runner.SessionId, hostToken); + var percent = counts.Total == 0 ? 0 : (double)counts.Done * 100.0 / (double)counts.Total; + + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.Progress, + runner.SessionId, + DateTimeOffset.UtcNow, + Total: counts.Total, + Done: counts.Done, + Pending: counts.Pending, + Processing: counts.Processing, + Failed: counts.Failed, + Percent: percent + ), hostToken); + + runner.ClearCurrent(); + + if (!runner.StopRequested && !hostToken.IsCancellationRequested) + await PoliteDelayAsync(hostToken); + } + } + } + + private async Task NoAgentsRecentlySeenAsync(int withinSeconds, CancellationToken ct) + { + try + { + using var scope = _scopeFactory.CreateScope(); + var agents = scope.ServiceProvider.GetRequiredService(); + var active = await agents.CountActiveAsync(TimeSpan.FromSeconds(Math.Max(1, withinSeconds)), ct); + return active == 0; + } + catch + { + // If agents table isn't configured yet, default to "no agents". 
+ return true; + } + } + private static async Task FetchHtmlAsync(HttpClient http, string url, CancellationToken ct) { using var req = new HttpRequestMessage(HttpMethod.Get, url); diff --git a/ScrapperAPI/appsettings.Development.json b/ScrapperAPI/appsettings.Development.json index 60701f5..cdeace0 100644 --- a/ScrapperAPI/appsettings.Development.json +++ b/ScrapperAPI/appsettings.Development.json @@ -1,4 +1,16 @@ { + "Kestrel": { + "Endpoints": { + "Http": { + "Url": "http://0.0.0.0:5123", + "Protocols": "Http1" + }, + "Grpc": { + "Url": "https://0.0.0.0:5001", + "Protocols": "Http2" + } + } + }, "Logging": { "LogLevel": { "Default": "Information", @@ -12,5 +24,18 @@ }, "Extraction": { "MaxParallelRuns": 3 + }, + "Workers": { + "Mode": "Hybrid", + "LeaseSeconds": 60, + "Local": { + "Enabled": true, + "Concurrency": 1, + "PreferAgentsGraceSeconds": 15 + }, + "Agents": { + "Enabled": true, + "RequireMutualTls": false + } } } \ No newline at end of file diff --git a/ScrapperAPI/appsettings.json b/ScrapperAPI/appsettings.json index 72e3efd..76b12bb 100644 --- a/ScrapperAPI/appsettings.json +++ b/ScrapperAPI/appsettings.json @@ -23,6 +23,19 @@ "Extraction": { "MaxParallelRuns": 3 }, + "Workers": { + "Mode": "Hybrid", + "LeaseSeconds": 120, + "Local": { + "Enabled": true, + "Concurrency": 1, + "PreferAgentsGraceSeconds": 30 + }, + "Agents": { + "Enabled": true, + "RequireMutualTls": true + } + }, "AllowedHosts": "*", "Authentication": { "Authority": "https://auth.evolucao.io/application/o/web-scrapper/", diff --git a/Voyager.sln b/Voyager.sln index 68a3f05..af9a73f 100644 --- a/Voyager.sln +++ b/Voyager.sln @@ -2,6 +2,8 @@ Microsoft Visual Studio Solution File, Format Version 12.00 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScrapperAPI", "ScrapperAPI\ScrapperAPI.csproj", "{206F88EA-2109-4DC0-B1E1-45AA8D3D092F}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "VoyagerAgent", "VoyagerAgent\VoyagerAgent.csproj", 
"{29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -12,5 +14,9 @@ Global {206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Debug|Any CPU.Build.0 = Debug|Any CPU {206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Release|Any CPU.ActiveCfg = Release|Any CPU {206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Release|Any CPU.Build.0 = Release|Any CPU + {29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/VoyagerAgent/AgentClientOptions.cs b/VoyagerAgent/AgentClientOptions.cs new file mode 100644 index 0000000..569284e --- /dev/null +++ b/VoyagerAgent/AgentClientOptions.cs @@ -0,0 +1,28 @@ +namespace VoyagerAgent; + +public sealed class AgentClientOptions +{ + /// Unique id for this agent (e.g. "agent-01"). + public string AgentId { get; set; } = "agent-01"; + + public string? DisplayName { get; set; } + + /// Central Voyager gRPC endpoint, e.g. "https://voyager.example.com:7443". + public string CentralGrpcAddress { get; set; } = "https://localhost:7443"; + + /// Session ids this agent should pull from. + public int[] SessionIds { get; set; } = Array.Empty(); + + /// How many URLs to request per lease batch. + public int Capacity { get; set; } = 10; + + /// Client certificate (PFX) path for mTLS. + public string ClientCertificatePath { get; set; } = ""; + public string ClientCertificatePassword { get; set; } = ""; + + /// If true, skip strict server certificate validation (dev only). + public bool InsecureSkipServerCertificateValidation { get; set; } = false; + + /// Delay between polls when no work is available. 
+ public int PollDelayMs { get; set; } = 1500; +} diff --git a/VoyagerAgent/AgentWorker.cs b/VoyagerAgent/AgentWorker.cs new file mode 100644 index 0000000..4e3923e --- /dev/null +++ b/VoyagerAgent/AgentWorker.cs @@ -0,0 +1,154 @@ +using System.IO.Compression; +using System.Text; +using Grpc.Core; +using Microsoft.Extensions.Options; +using ScrapperAPI.AgentGrpc; + +namespace VoyagerAgent; + +public sealed class AgentWorker : BackgroundService +{ + private readonly ILogger _logger; + private readonly AgentClientOptions _opts; + private readonly GrpcAgentClient _grpc; + + public AgentWorker(ILogger logger, IOptions opts, GrpcAgentClient grpc) + { + _logger = logger; + _opts = opts.Value; + _grpc = grpc; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + if (_opts.SessionIds.Length == 0) + { + _logger.LogWarning("No Agent:SessionIds configured. Agent will idle."); + } + + var client = _grpc.CreateClient(); + + await TryRegisterAsync(client, stoppingToken); + + using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(30) }; + http.DefaultRequestHeaders.UserAgent.ParseAdd("voyager-agent/1.0"); + + while (!stoppingToken.IsCancellationRequested) + { + var didWork = false; + + foreach (var sessionId in _opts.SessionIds) + { + if (stoppingToken.IsCancellationRequested) break; + + try + { + var lease = await client.LeaseWorkAsync(new LeaseWorkRequest + { + AgentId = _opts.AgentId, + SessionId = sessionId, + Capacity = _opts.Capacity + }, cancellationToken: stoppingToken); + + if (lease.Items.Count == 0) + continue; + + didWork = true; + + foreach (var item in lease.Items) + { + if (stoppingToken.IsCancellationRequested) break; + + try + { + var html = await http.GetStringAsync(item.Url, stoppingToken); + var compressed = GzipCompressUtf8(html, CompressionLevel.Fastest, out var origLen); + + var submit = await client.SubmitResultAsync(new SubmitResultRequest + { + QueueId = item.QueueId, + AgentId = _opts.AgentId, + Success = 
true, + ContentEncoding = "gzip", + ContentBytes = Google.Protobuf.ByteString.CopyFrom(compressed), + OriginalLength = origLen, + CompressedLength = compressed.Length + }, cancellationToken: stoppingToken); + + if (!submit.Ok) + _logger.LogWarning("SubmitResult not ok for queue {QueueId}: {Message}", item.QueueId, submit.Message); + } + catch (Exception ex) + { + _logger.LogError(ex, "Scrape failed for {Url}", item.Url); + + try + { + await client.SubmitResultAsync(new SubmitResultRequest + { + QueueId = item.QueueId, + AgentId = _opts.AgentId, + Success = false, + Error = ex.Message + }, cancellationToken: stoppingToken); + } + catch (Exception inner) + { + _logger.LogError(inner, "Failed to submit failure status for queue {QueueId}", item.QueueId); + } + } + } + } + catch (RpcException rpc) + { + _logger.LogWarning("gRPC error: {Status} {Detail}", rpc.StatusCode, rpc.Status.Detail); + } + catch (Exception ex) + { + _logger.LogError(ex, "Unhandled error while leasing work."); + } + } + + // heartbeat (best-effort) + try + { + await client.HeartbeatAsync(new HeartbeatRequest { AgentId = _opts.AgentId }, cancellationToken: stoppingToken); + } + catch { /* ignore */ } + + if (!didWork) + await Task.Delay(_opts.PollDelayMs, stoppingToken); + } + } + + private async Task TryRegisterAsync(AgentService.AgentServiceClient client, CancellationToken ct) + { + try + { + await client.RegisterAgentAsync(new RegisterAgentRequest + { + AgentId = _opts.AgentId, + DisplayName = _opts.DisplayName ?? 
string.Empty + }, cancellationToken: ct); + + _logger.LogInformation("Agent registered as {AgentId}", _opts.AgentId); + } + catch (RpcException rpc) + { + _logger.LogWarning("RegisterAgent failed: {Status} {Detail}", rpc.StatusCode, rpc.Status.Detail); + } + } + + private static byte[] GzipCompressUtf8(string content, CompressionLevel level, out int originalLength) + { + var bytes = Encoding.UTF8.GetBytes(content); + originalLength = bytes.Length; + + using var ms = new MemoryStream(); + using (var gzip = new GZipStream(ms, level, leaveOpen: true)) + { + gzip.Write(bytes, 0, bytes.Length); + } + return ms.ToArray(); + } +} diff --git a/VoyagerAgent/GrpcAgentClient.cs b/VoyagerAgent/GrpcAgentClient.cs new file mode 100644 index 0000000..2451d3b --- /dev/null +++ b/VoyagerAgent/GrpcAgentClient.cs @@ -0,0 +1,37 @@ +using System.Net.Http; +using System.Security.Cryptography.X509Certificates; +using Grpc.Net.Client; +using Microsoft.Extensions.Options; +using ScrapperAPI.AgentGrpc; + +namespace VoyagerAgent; + +public sealed class GrpcAgentClient +{ + private readonly AgentClientOptions _opts; + + public GrpcAgentClient(IOptions options) + { + _opts = options.Value; + } + + public AgentService.AgentServiceClient CreateClient() + { + var handler = new HttpClientHandler(); + + if (!string.IsNullOrWhiteSpace(_opts.ClientCertificatePath)) + { + var cert = new X509Certificate2(_opts.ClientCertificatePath, _opts.ClientCertificatePassword); + handler.ClientCertificates.Add(cert); + } + + if (_opts.InsecureSkipServerCertificateValidation) + { + handler.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator; + } + + var httpClient = new HttpClient(handler); + var channel = GrpcChannel.ForAddress(_opts.CentralGrpcAddress, new GrpcChannelOptions { HttpClient = httpClient }); + return new AgentService.AgentServiceClient(channel); + } +} diff --git a/VoyagerAgent/Program.cs b/VoyagerAgent/Program.cs new file mode 100644 index 
0000000..3f65572 --- /dev/null +++ b/VoyagerAgent/Program.cs @@ -0,0 +1,19 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace VoyagerAgent; + +public static class Program +{ + public static void Main(string[] args) + { + var builder = Host.CreateApplicationBuilder(args); + + builder.Services.Configure(builder.Configuration.GetSection("Agent")); + builder.Services.AddSingleton(); + builder.Services.AddHostedService(); + + var host = builder.Build(); + host.Run(); + } +} diff --git a/VoyagerAgent/VoyagerAgent.csproj b/VoyagerAgent/VoyagerAgent.csproj new file mode 100644 index 0000000..c3e2a1d --- /dev/null +++ b/VoyagerAgent/VoyagerAgent.csproj @@ -0,0 +1,30 @@ + + + net10.0 + enable + enable + + + + + + + all + + + + + + all + + + + + + + + + + + + diff --git a/VoyagerAgent/appsettings.json b/VoyagerAgent/appsettings.json new file mode 100644 index 0000000..37df65f --- /dev/null +++ b/VoyagerAgent/appsettings.json @@ -0,0 +1,13 @@ +{ + "Agent": { + "AgentId": "agent-01", + "DisplayName": "Edge Worker 01", + "CentralGrpcAddress": "http://localhost:5001", + "SessionIds": [1], + "Capacity": 25, + "ClientCertificatePath": "", + "ClientCertificatePassword": "", + "InsecureSkipServerCertificateValidation": true, + "PollDelayMs": 1500 + } +}