diff --git a/.idea/.idea.WebScrapperPro/.idea/dataSources.local.xml b/.idea/.idea.WebScrapperPro/.idea/dataSources.local.xml
new file mode 100644
index 0000000..873b1ce
--- /dev/null
+++ b/.idea/.idea.WebScrapperPro/.idea/dataSources.local.xml
@@ -0,0 +1,48 @@
+
+
+
+
+
+ "
+
+
+ master_key
+ postgres
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ "
+
+
+ master_key
+ admin
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..484a363
--- /dev/null
+++ b/README.md
@@ -0,0 +1,121 @@
+# Voyager (central) + Agentes (opcionais)
+
+Este ZIP adiciona uma arquitetura **distribuída e opcional** para o Voyager:
+
+- **Central (ScrapperAPI)** continua capaz de fazer o scrape localmente.
+- **Agentes (VoyagerAgent)** são *opcionais*: quando habilitados, eles pegam lotes de URLs do central via **gRPC** e devolvem o conteúdo.
+- A coordenação é feita por **lease** no banco (PostgreSQL). Se um agente morrer, o lease expira e outro worker pode recuperar o item.
+
+## Visão geral
+
+- Banco: a tabela `queue` ganhou colunas `leased_by` e `lease_expires_at`, além de `attempts` e `last_error`.
+- Central expõe um gRPC `AgentService` para:
+ - `RegisterAgent` (registro + thumbprint do cert)
+ - `Heartbeat`
+ - `LeaseWork` (lote de URLs)
+ - `SubmitResult` (conteúdo + status)
+- Segurança: recomendado **mTLS** (TLS mútuo) no endpoint gRPC.
+
+## Modos (Workers.Mode)
+
+Em `ScrapperAPI/appsettings*.json`:
+
+- `LocalOnly`: **somente** worker local.
+- `Hybrid` (padrão): local + agentes ao mesmo tempo.
+- `PreferAgents`: local só trabalha quando não há agentes ativos (por uma janela de graça).
+- `PreferLocal`: (reservado) mantenha local sempre ativo.
+
+## Como rodar (dev)
+
+1) Rode o banco e aplique o script:
+
+- `ScrapperAPI/Scripts/database.sql`
+
+2) Rode o central:
+
+- `dotnet run --project ScrapperAPI`
+
+3) (Opcional) Rode um agente:
+
+- ajuste `VoyagerAgent/appsettings.json` com `CentralGrpcAddress` e `SessionIds`
+- `dotnet run --project VoyagerAgent`
+
+> Em dev, você pode deixar `Workers:Agents:RequireMutualTls=false` para testar sem cert.
+
+## Como habilitar mTLS (produção)
+
+### 1) Gere uma CA local e certs (exemplo)
+
+> Ajuste paths conforme seu ambiente.
+
+```bash
+# CA
+openssl genrsa -out ca.key 4096
+openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -subj "/CN=Voyager-CA" -out ca.crt
+
+# Servidor (central)
+openssl genrsa -out server.key 2048
+openssl req -new -key server.key -subj "/CN=voyager-grpc" -out server.csr
+openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 825 -sha256
+
+# Agente
+openssl genrsa -out agent01.key 2048
+openssl req -new -key agent01.key -subj "/CN=agent-01" -out agent01.csr
+openssl x509 -req -in agent01.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out agent01.crt -days 825 -sha256
+
+# PFX do agente
+openssl pkcs12 -export -out agent-01.pfx -inkey agent01.key -in agent01.crt -certfile ca.crt
+```
+
+### 2) Configure o Kestrel do central
+
+A forma mais comum é via `appsettings.Production.json` (exemplo):
+
+```json
+{
+ "Kestrel": {
+ "Endpoints": {
+ "HttpsGrpc": {
+ "Url": "https://0.0.0.0:7443",
+ "Certificate": {
+ "Path": "./certs/server.pfx",
+ "Password": "change-me"
+ }
+ }
+ }
+ },
+ "Workers": {
+ "Agents": {
+ "Enabled": true,
+ "RequireMutualTls": true
+ }
+ }
+}
+```
+
+> Observação: o código do gRPC **exige cert do cliente** quando `RequireMutualTls=true`.
+
+### 3) Configure o agente
+
+Em `VoyagerAgent/appsettings.json`:
+
+- `ClientCertificatePath` -> `./certs/agent-01.pfx`
+- `ClientCertificatePassword` -> senha do PFX
+- `CentralGrpcAddress` -> https do central (porta 7443, por exemplo)
+
+### 4) Registro do agente
+
+Ao iniciar, o agente chama `RegisterAgent` e o central grava:
+- `agent.id`
+- `agent.cert_thumbprint`
+
+Depois disso, os requests são validados pelo thumbprint.
+
+## O que foi adicionado/alterado
+
+- `queue`: lease + tentativas
+- `agent`: tabela para registrar agentes (thumbprint)
+- `IQueueRepository`: lease batch, renew, mark done/failed validando `leased_by`
+- `ScrapperAPI`: gRPC `AgentServiceImpl`
+- `VoyagerAgent`: Worker Service que faz lease + scrape + submit
+
diff --git a/ScrapperAPI/AgentGrpc/AgentServiceImpl.cs b/ScrapperAPI/AgentGrpc/AgentServiceImpl.cs
new file mode 100644
index 0000000..1effbb5
--- /dev/null
+++ b/ScrapperAPI/AgentGrpc/AgentServiceImpl.cs
@@ -0,0 +1,199 @@
+using Grpc.Core;
+using Grpc.AspNetCore.Server;
+using Microsoft.Extensions.Options;
+using ScrapperAPI.AgentGrpc;
+using ScrapperAPI.Interfaces;
+using ScrapperAPI.Options;
+
+namespace ScrapperAPI.AgentGrpc;
+
+public sealed class AgentServiceImpl : AgentService.AgentServiceBase
+{
+ private readonly IAgentRepository _agents;
+ private readonly IQueueRepository _queue;
+ private readonly IContentRepository _content;
+ private readonly WorkerOptions _opts;
+
+ public AgentServiceImpl(
+ IAgentRepository agents,
+ IQueueRepository queue,
+ IContentRepository content,
+ IOptions<WorkerOptions> options)
+ {
+ _agents = agents;
+ _queue = queue;
+ _content = content;
+ _opts = options.Value;
+ }
+
+ public override async Task<RegisterAgentResponse> RegisterAgent(RegisterAgentRequest request, ServerCallContext context)
+ {
+ EnsureAgentsEnabled();
+
+ var (agentId, displayName) = (request.AgentId?.Trim(), request.DisplayName?.Trim());
+ if (string.IsNullOrWhiteSpace(agentId))
+ throw new RpcException(new Status(StatusCode.InvalidArgument, "agent_id is required"));
+
+ var thumbprint = GetClientCertThumbprint(context);
+ await _agents.UpsertAsync(agentId, string.IsNullOrWhiteSpace(displayName) ? null : displayName, thumbprint, context.CancellationToken);
+
+ return new RegisterAgentResponse { Ok = true };
+ }
+
+ public override async Task<HeartbeatResponse> Heartbeat(HeartbeatRequest request, ServerCallContext context)
+ {
+ EnsureAgentsEnabled();
+ var agentId = request.AgentId?.Trim();
+ if (string.IsNullOrWhiteSpace(agentId))
+ throw new RpcException(new Status(StatusCode.InvalidArgument, "agent_id is required"));
+
+ await ValidateAgentAsync(agentId, context);
+ await _agents.TouchAsync(agentId, context.CancellationToken);
+
+ return new HeartbeatResponse { Ok = true };
+ }
+
+ public override async Task<LeaseWorkResponse> LeaseWork(LeaseWorkRequest request, ServerCallContext context)
+ {
+ EnsureAgentsEnabled();
+
+ if (_opts.Mode == DistributedMode.LocalOnly)
+ {
+ return new LeaseWorkResponse
+ {
+ ServerTimeUtcMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+ };
+ }
+
+ var agentId = request.AgentId?.Trim();
+ if (string.IsNullOrWhiteSpace(agentId))
+ throw new RpcException(new Status(StatusCode.InvalidArgument, "agent_id is required"));
+
+ await ValidateAgentAsync(agentId, context);
+ await _agents.TouchAsync(agentId, context.CancellationToken);
+
+ var capacity = Math.Clamp(request.Capacity, 0, 1000);
+ if (capacity == 0)
+ {
+ return new LeaseWorkResponse
+ {
+ ServerTimeUtcMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+ };
+ }
+
+ var workerId = $"agent:{agentId}";
+ var leaseFor = TimeSpan.FromSeconds(Math.Max(5, _opts.LeaseSeconds));
+
+ var batch = await _queue.LeaseBatchAsync(request.SessionId, workerId, capacity, leaseFor, context.CancellationToken);
+
+ var resp = new LeaseWorkResponse
+ {
+ ServerTimeUtcMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+ };
+
+ foreach (var it in batch)
+ {
+ resp.Items.Add(new WorkItem
+ {
+ QueueId = it.Id,
+ SessionId = it.SessionId,
+ Url = it.Url,
+ LeaseExpiresUtcMs = DateTimeOffset.UtcNow.Add(leaseFor).ToUnixTimeMilliseconds()
+ });
+ }
+
+ return resp;
+ }
+
+ public override async Task<SubmitResultResponse> SubmitResult(SubmitResultRequest request, ServerCallContext context)
+ {
+ EnsureAgentsEnabled();
+
+ var agentId = request.AgentId?.Trim();
+ if (string.IsNullOrWhiteSpace(agentId))
+ throw new RpcException(new Status(StatusCode.InvalidArgument, "agent_id is required"));
+
+ await ValidateAgentAsync(agentId, context);
+ await _agents.TouchAsync(agentId, context.CancellationToken);
+
+ if (request.QueueId <= 0)
+ throw new RpcException(new Status(StatusCode.InvalidArgument, "queue_id must be > 0"));
+
+ var workerId = $"agent:{agentId}";
+
+ try
+ {
+ if (request.Success)
+ {
+ if (request.ContentBytes is { Length: > 0 })
+ {
+ var encoding = string.IsNullOrWhiteSpace(request.ContentEncoding) ? "gzip" : request.ContentEncoding;
+ var origLen = request.OriginalLength > 0 ? request.OriginalLength : 0;
+ var compLen = request.CompressedLength > 0 ? request.CompressedLength : request.ContentBytes.Length;
+
+ await _content.SaveCompressedAsync(
+ request.QueueId,
+ encoding,
+ request.ContentBytes.ToByteArray(),
+ origLen,
+ compLen,
+ context.CancellationToken);
+ }
+ else
+ {
+ await _content.SaveAsync(request.QueueId, request.ContentText ?? string.Empty, context.CancellationToken);
+ }
+
+ var ok = await _queue.MarkDoneAsync(request.QueueId, workerId, context.CancellationToken);
+ if (!ok)
+ return new SubmitResultResponse { Ok = false, Message = "Lease is not valid for this agent" };
+
+ return new SubmitResultResponse { Ok = true, Message = "Stored" };
+ }
+
+ var error = string.IsNullOrWhiteSpace(request.Error) ? "unknown error" : request.Error;
+ var failed = await _queue.MarkFailedAsync(request.QueueId, workerId, error, context.CancellationToken);
+ if (!failed)
+ return new SubmitResultResponse { Ok = false, Message = "Lease is not valid for this agent" };
+
+ return new SubmitResultResponse { Ok = true, Message = "Marked failed" };
+ }
+ catch (Exception ex)
+ {
+ throw new RpcException(new Status(StatusCode.Internal, ex.Message));
+ }
+ }
+
+ private void EnsureAgentsEnabled()
+ {
+ if (!_opts.Agents.Enabled)
+ throw new RpcException(new Status(StatusCode.Unavailable, "Agents are disabled"));
+ }
+
+ private async Task ValidateAgentAsync(string agentId, ServerCallContext context)
+ {
+ var row = await _agents.GetAsync(agentId, context.CancellationToken);
+ if (row is null)
+ throw new RpcException(new Status(StatusCode.PermissionDenied, "Agent not registered"));
+
+ if (!row.IsEnabled)
+ throw new RpcException(new Status(StatusCode.PermissionDenied, "Agent disabled"));
+
+ var thumbprint = GetClientCertThumbprint(context);
+ if (!string.Equals(row.CertThumbprint, thumbprint, StringComparison.OrdinalIgnoreCase))
+ throw new RpcException(new Status(StatusCode.PermissionDenied, "Client certificate does not match agent"));
+ }
+
+ private string GetClientCertThumbprint(ServerCallContext context)
+ {
+ if (!_opts.Agents.RequireMutualTls)
+ return "";
+
+ var http = context.GetHttpContext();
+ var cert = http.Connection.ClientCertificate;
+ if (cert is null)
+ throw new RpcException(new Status(StatusCode.Unauthenticated, "Client certificate is required"));
+
+ return (cert.Thumbprint ?? string.Empty).Replace(" ", string.Empty);
+ }
+}
diff --git a/ScrapperAPI/Controllers/ExtractionModelsController.cs b/ScrapperAPI/Controllers/ExtractionModelsController.cs
index 47e7974..16e3578 100644
--- a/ScrapperAPI/Controllers/ExtractionModelsController.cs
+++ b/ScrapperAPI/Controllers/ExtractionModelsController.cs
@@ -24,10 +24,10 @@ public sealed class ExtractionModelsController : ControllerBase
public async Task Create([FromBody] CreateExtractionModelRequest req, CancellationToken ct)
{
var id = await _models.CreateAsync(new CreateExtractionModelDto(
- Name: req.Name,
- Version: req.Version <= 0 ? 1 : req.Version,
- Description: req.Description,
- Definition: req.Definition
+ name: req.Name,
+ version: req.Version <= 0 ? 1 : req.Version,
+ description: req.Description,
+ definition: req.Definition
), ct);
return Created($"/extraction-models/{id}", new { id });
diff --git a/ScrapperAPI/Dtos/AgentRow.cs b/ScrapperAPI/Dtos/AgentRow.cs
new file mode 100644
index 0000000..93f8e4b
--- /dev/null
+++ b/ScrapperAPI/Dtos/AgentRow.cs
@@ -0,0 +1,10 @@
+namespace ScrapperAPI.Dtos;
+
+public sealed record AgentRow(
+ string Id,
+ string? DisplayName,
+ string CertThumbprint,
+ DateTimeOffset CreatedAt,
+ DateTimeOffset LastSeenAt,
+ bool IsEnabled
+);
diff --git a/ScrapperAPI/Dtos/ExtractionModelDtos.cs b/ScrapperAPI/Dtos/ExtractionModelDtos.cs
index 80d9b4e..d44ca1c 100644
--- a/ScrapperAPI/Dtos/ExtractionModelDtos.cs
+++ b/ScrapperAPI/Dtos/ExtractionModelDtos.cs
@@ -2,19 +2,57 @@ using System.Text.Json;
namespace ScrapperAPI.Dtos;
-public sealed record CreateExtractionModelDto(
- string Name,
- int Version,
- string? Description,
- JsonDocument Definition
-);
+public sealed class CreateExtractionModelDto
+{
+ public string Name { get; init; } = null!;
+ public int Version { get; init; }
+ public string? Description { get; init; }
+ public JsonDocument Definition { get; init; } = null!;
-public sealed record ExtractionModelRow(
- long Id,
- string Name,
- int Version,
- string? Description,
- JsonDocument Definition,
- DateTimeOffset CreatedAt,
- DateTimeOffset UpdatedAt
-);
+ public CreateExtractionModelDto()
+ {
+
+ }
+
+ public CreateExtractionModelDto(string name, int version, string? description, JsonDocument definition)
+ {
+ Name = name;
+ Version = version;
+ Description = description;
+ Definition = definition;
+ }
+}
+
+public sealed class ExtractionModelRow
+{
+ public long Id { get; init; }
+ public string Name { get; init; } = null!;
+ public int Version { get; init; }
+ public string? Description { get; init; }
+ public JsonDocument Definition { get; init; } = null!;
+ public DateTimeOffset CreatedAt { get; init; }
+ public DateTimeOffset UpdatedAt { get; init; }
+
+ public ExtractionModelRow()
+ {
+
+ }
+
+ public ExtractionModelRow(
+ long id,
+ string name,
+ int version,
+ string? description,
+ JsonDocument definition,
+ DateTimeOffset createdAt,
+ DateTimeOffset updatedAt)
+ {
+ Id = id;
+ Name = name;
+ Version = version;
+ Description = description;
+ Definition = definition;
+ CreatedAt = createdAt;
+ UpdatedAt = updatedAt;
+ }
+}
diff --git a/ScrapperAPI/Dtos/ExtractionRunDtos.cs b/ScrapperAPI/Dtos/ExtractionRunDtos.cs
index 71f8150..1b2d121 100644
--- a/ScrapperAPI/Dtos/ExtractionRunDtos.cs
+++ b/ScrapperAPI/Dtos/ExtractionRunDtos.cs
@@ -3,68 +3,211 @@ using System.Text.Json;
namespace ScrapperAPI.Dtos;
-public sealed record StartExtractionRequest(
- [Required] int SessionId,
- [Required] long ModelId,
- bool OnlyDone = true
-);
+public sealed class StartExtractionRequest
+{
+ [Required]
+ public int SessionId { get; set; }
+
+ [Required]
+ public long ModelId { get; set; }
+
+ public bool OnlyDone { get; set; }
+
+ public StartExtractionRequest()
+ {
+ }
+
+ public StartExtractionRequest(int sessionId, long modelId, bool onlyDone = true)
+ {
+ SessionId = sessionId;
+ ModelId = modelId;
+ OnlyDone = onlyDone;
+ }
+}
+
+public sealed class BulkStartExtractionRequest
+{
+ [Required]
+ public long ModelId { get; set; }
-public sealed record BulkStartExtractionRequest(
- [Required] long ModelId,
/// <summary>
/// Se vazio/nulo, roda para todas as sessions existentes.
/// </summary>
- int[]? SessionIds = null,
- bool OnlyDone = true
-);
+ public int[]? SessionIds { get; set; }
-public sealed record CreateExtractionRunDto(
- long ModelId,
- int SessionId
-);
+ public bool OnlyDone { get; set; }
-public sealed record ExtractionRunRow(
- long Id,
- long ModelId,
- int SessionId,
- short Status,
- DateTimeOffset CreatedAt,
- DateTimeOffset? StartedAt,
- DateTimeOffset? FinishedAt,
- int Total,
- int Succeeded,
- int Failed,
- string? Error
-);
+ public BulkStartExtractionRequest()
+ {
+ }
-public sealed record ExtractionRuntimeStatus(
- long RunId,
- bool IsRunning,
- int Processed,
- int Total,
- int Succeeded,
- int Failed,
- int? CurrentQueueId
-);
+ public BulkStartExtractionRequest(long modelId, int[]? sessionIds = null, bool onlyDone = true)
+ {
+ ModelId = modelId;
+ SessionIds = sessionIds;
+ OnlyDone = onlyDone;
+ }
+}
-public sealed record UpsertExtractedDataDto(
- long RunId,
- long ModelId,
- int SessionId,
- int QueueId,
- JsonDocument ExtractedJson,
- bool Success,
- string? Error
-);
+public sealed class CreateExtractionRunDto
+{
+ public long ModelId { get; set; }
-public sealed record ExtractedDataRow(
- long Id,
- long RunId,
- long ModelId,
- int SessionId,
- int QueueId,
- JsonDocument ExtractedJson,
- bool Success,
- string? Error,
- DateTimeOffset ExtractedAt
-);
+ public int SessionId { get; set; }
+
+ public CreateExtractionRunDto()
+ {
+ }
+
+ public CreateExtractionRunDto(long modelId, int sessionId)
+ {
+ ModelId = modelId;
+ SessionId = sessionId;
+ }
+}
+
+public sealed class ExtractionRunRow
+{
+ public long Id { get; set; }
+
+ public long ModelId { get; set; }
+
+ public int SessionId { get; set; }
+
+ public short Status { get; set; }
+
+ public DateTimeOffset CreatedAt { get; set; }
+
+ public DateTimeOffset? StartedAt { get; set; }
+
+ public DateTimeOffset? FinishedAt { get; set; }
+
+ public int Total { get; set; }
+
+ public int Succeeded { get; set; }
+
+ public int Failed { get; set; }
+
+ public string? Error { get; set; }
+
+ public ExtractionRunRow()
+ {
+ }
+
+ public ExtractionRunRow(long id, long modelId, int sessionId, short status, DateTimeOffset createdAt, DateTimeOffset? startedAt, DateTimeOffset? finishedAt, int total, int succeeded, int failed, string? error)
+ {
+ Id = id;
+ ModelId = modelId;
+ SessionId = sessionId;
+ Status = status;
+ CreatedAt = createdAt;
+ StartedAt = startedAt;
+ FinishedAt = finishedAt;
+ Total = total;
+ Succeeded = succeeded;
+ Failed = failed;
+ Error = error;
+ }
+}
+
+public sealed class ExtractionRuntimeStatus
+{
+ public long RunId { get; set; }
+
+ public bool IsRunning { get; set; }
+
+ public int Processed { get; set; }
+
+ public int Total { get; set; }
+
+ public int Succeeded { get; set; }
+
+ public int Failed { get; set; }
+
+ public int? CurrentQueueId { get; set; }
+
+ public ExtractionRuntimeStatus()
+ {
+ }
+
+ public ExtractionRuntimeStatus(long runId, bool isRunning, int processed, int total, int succeeded, int failed, int? currentQueueId)
+ {
+ RunId = runId;
+ IsRunning = isRunning;
+ Processed = processed;
+ Total = total;
+ Succeeded = succeeded;
+ Failed = failed;
+ CurrentQueueId = currentQueueId;
+ }
+}
+
+public sealed class UpsertExtractedDataDto
+{
+ public long RunId { get; set; }
+
+ public long ModelId { get; set; }
+
+ public int SessionId { get; set; }
+
+ public int QueueId { get; set; }
+
+ public JsonDocument ExtractedJson { get; set; }
+
+ public bool Success { get; set; }
+
+ public string? Error { get; set; }
+
+ public UpsertExtractedDataDto()
+ {
+ }
+
+ public UpsertExtractedDataDto(long runId, long modelId, int sessionId, int queueId, JsonDocument extractedJson, bool success, string? error)
+ {
+ RunId = runId;
+ ModelId = modelId;
+ SessionId = sessionId;
+ QueueId = queueId;
+ ExtractedJson = extractedJson;
+ Success = success;
+ Error = error;
+ }
+}
+
+public sealed class ExtractedDataRow
+{
+ public long Id { get; set; }
+
+ public long RunId { get; set; }
+
+ public long ModelId { get; set; }
+
+ public int SessionId { get; set; }
+
+ public int QueueId { get; set; }
+
+ public JsonDocument ExtractedJson { get; set; }
+
+ public bool Success { get; set; }
+
+ public string? Error { get; set; }
+
+ public DateTimeOffset ExtractedAt { get; set; }
+
+ public ExtractedDataRow()
+ {
+ }
+
+ public ExtractedDataRow(long id, long runId, long modelId, int sessionId, int queueId, JsonDocument extractedJson, bool success, string? error, DateTimeOffset extractedAt)
+ {
+ Id = id;
+ RunId = runId;
+ ModelId = modelId;
+ SessionId = sessionId;
+ QueueId = queueId;
+ ExtractedJson = extractedJson;
+ Success = success;
+ Error = error;
+ ExtractedAt = extractedAt;
+ }
+}
diff --git a/ScrapperAPI/Interfaces/IAgentRepository.cs b/ScrapperAPI/Interfaces/IAgentRepository.cs
new file mode 100644
index 0000000..41a7da6
--- /dev/null
+++ b/ScrapperAPI/Interfaces/IAgentRepository.cs
@@ -0,0 +1,13 @@
+using ScrapperAPI.Dtos;
+
+namespace ScrapperAPI.Interfaces;
+
+public interface IAgentRepository
+{
+ Task UpsertAsync(string agentId, string? displayName, string certThumbprint, CancellationToken ct);
+ Task<bool> IsEnabledAsync(string agentId, CancellationToken ct);
+ Task<string?> GetThumbprintAsync(string agentId, CancellationToken ct);
+ Task TouchAsync(string agentId, CancellationToken ct);
+ Task<int> CountActiveAsync(TimeSpan seenWithin, CancellationToken ct);
+ Task<AgentRow?> GetAsync(string agentId, CancellationToken ct);
+}
diff --git a/ScrapperAPI/Interfaces/IContentRepository.cs b/ScrapperAPI/Interfaces/IContentRepository.cs
index 57ed479..9925679 100644
--- a/ScrapperAPI/Interfaces/IContentRepository.cs
+++ b/ScrapperAPI/Interfaces/IContentRepository.cs
@@ -6,6 +6,17 @@ namespace ScrapperAPI.Interfaces;
public interface IContentRepository
{
Task SaveAsync(int queueId, string content, CancellationToken ct);
+
+ /// <summary>
+ /// Saves already-compressed content (e.g. from a remote agent) without recompressing.
+ /// </summary>
+ Task<long> SaveCompressedAsync(
+ int queueId,
+ string contentEncoding,
+ byte[] contentBytes,
+ int originalLength,
+ int compressedLength,
+ CancellationToken ct);
Task GetByQueueIdAsync(int queueId, CancellationToken ct);
Task GetCompressedByQueueIdAsync(int queueId, CancellationToken ct);
diff --git a/ScrapperAPI/Interfaces/IQueueRepository.cs b/ScrapperAPI/Interfaces/IQueueRepository.cs
index aa81131..77fc1a8 100644
--- a/ScrapperAPI/Interfaces/IQueueRepository.cs
+++ b/ScrapperAPI/Interfaces/IQueueRepository.cs
@@ -8,13 +8,24 @@ public interface IQueueRepository
Task GetCountsAsync(int sessionId, CancellationToken ct);
///
- /// Pega 1 item pendente e muda para Processing atomica/seguramente.
- /// Retorna null se não houver itens pendentes.
+ /// Pega 1 item pendente e faz "lease" atomico (Processing) para um worker.
+ /// Retorna null se não houver itens disponíveis.
///
- Task TryDequeueAsync(int sessionId, CancellationToken ct);
+ Task TryDequeueAsync(int sessionId, string workerId, TimeSpan leaseFor, CancellationToken ct);
- Task MarkDoneAsync(int queueId, CancellationToken ct);
- Task MarkFailedAsync(int queueId, string error, CancellationToken ct);
+ /// <summary>
+ /// Pega um lote de itens pendentes e faz "lease" atomico (Processing) para um worker.
+ /// Itens com lease expirado também podem ser reprocessados.
+ /// </summary>
+ Task> LeaseBatchAsync(int sessionId, string workerId, int take, TimeSpan leaseFor, CancellationToken ct);
+
+ /// <summary>
+ /// Renova o lease de um item (se ele ainda pertence ao mesmo worker).
+ /// </summary>
+ Task RenewLeaseAsync(int queueId, string workerId, TimeSpan leaseFor, CancellationToken ct);
+
+ Task<bool> MarkDoneAsync(int queueId, string workerId, CancellationToken ct);
+ Task<bool> MarkFailedAsync(int queueId, string workerId, string error, CancellationToken ct);
// Opcional: resetar stuck processing (se quiser depois)
Task RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct);
diff --git a/ScrapperAPI/Options/WorkerOptions.cs b/ScrapperAPI/Options/WorkerOptions.cs
new file mode 100644
index 0000000..2006fb4
--- /dev/null
+++ b/ScrapperAPI/Options/WorkerOptions.cs
@@ -0,0 +1,39 @@
+namespace ScrapperAPI.Options;
+
+public enum DistributedMode
+{
+ LocalOnly = 0,
+ Hybrid = 1,
+ PreferAgents = 2,
+ PreferLocal = 3
+}
+
+public sealed class WorkerOptions
+{
+ public DistributedMode Mode { get; set; } = DistributedMode.Hybrid;
+
+ /// <summary>
+ /// Lease duration for a queue item before it can be recovered by another worker.
+ /// </summary>
+ public int LeaseSeconds { get; set; } = 120;
+
+ public LocalWorkerOptions Local { get; set; } = new();
+ public AgentOptions Agents { get; set; } = new();
+}
+
+public sealed class LocalWorkerOptions
+{
+ public bool Enabled { get; set; } = true;
+ public int Concurrency { get; set; } = 1;
+
+ /// <summary>
+ /// When Mode=PreferAgents, local worker will run only if no agent was seen within this window.
+ /// </summary>
+ public int PreferAgentsGraceSeconds { get; set; } = 30;
+}
+
+public sealed class AgentOptions
+{
+ public bool Enabled { get; set; } = true;
+ public bool RequireMutualTls { get; set; } = true;
+}
diff --git a/ScrapperAPI/Program.cs b/ScrapperAPI/Program.cs
index 5eeb8b6..73f05a0 100644
--- a/ScrapperAPI/Program.cs
+++ b/ScrapperAPI/Program.cs
@@ -10,12 +10,14 @@ using ScrapperAPI.Repositories;
using ScrapperAPI.Services;
using ScrapperAPI.Utils;
using ScrapperAPI.Workers;
+using ScrapperAPI.AgentGrpc;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddOpenApi();
builder.Services.AddSignalR();
builder.Services.AddControllers();
+builder.Services.AddGrpc();
// Authentik (OIDC) - JWT Bearer validation for API + SignalR
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
@@ -63,6 +65,7 @@ builder.Services.AddAuthorization(options =>
builder.Services.Configure(builder.Configuration.GetSection("Scraper"));
builder.Services.Configure(builder.Configuration.GetSection("Extraction"));
+builder.Services.Configure<WorkerOptions>(builder.Configuration.GetSection("Workers"));
builder.Services.AddSingleton(sp =>
{
@@ -76,6 +79,7 @@ builder.Services.AddSingleton();
builder.Services.AddScoped();
builder.Services.AddScoped();
builder.Services.AddScoped();
+builder.Services.AddScoped<IAgentRepository, AgentRepository>();
// Extraction
builder.Services.AddSingleton();
@@ -115,8 +119,10 @@ if (app.Environment.IsDevelopment())
app.MapOpenApi().AllowAnonymous();
}
+app.MapGrpcService<AgentServiceImpl>().AllowAnonymous();
app.MapControllers();
app.MapHub("/ws/scrape").RequireAuthorization();
+// NOTE(review): removed duplicate MapGrpcService() registration — the same AgentServiceImpl was mapped twice.
// app.UseHttpsRedirection();
diff --git a/ScrapperAPI/Protos/agent.proto b/ScrapperAPI/Protos/agent.proto
new file mode 100644
index 0000000..03fd288
--- /dev/null
+++ b/ScrapperAPI/Protos/agent.proto
@@ -0,0 +1,66 @@
+syntax = "proto3";
+
+option csharp_namespace = "ScrapperAPI.AgentGrpc";
+
+package voyager.agent;
+
+message RegisterAgentRequest {
+ string agent_id = 1;
+ string display_name = 2;
+}
+
+message RegisterAgentResponse {
+ bool ok = 1;
+}
+
+message HeartbeatRequest {
+ string agent_id = 1;
+}
+
+message HeartbeatResponse {
+ bool ok = 1;
+}
+
+message LeaseWorkRequest {
+ int32 session_id = 1;
+ string agent_id = 2;
+ int32 capacity = 3;
+}
+
+message LeaseWorkResponse {
+ repeated WorkItem items = 1;
+ int64 server_time_utc_ms = 2;
+}
+
+message WorkItem {
+ int32 queue_id = 1;
+ int32 session_id = 2;
+ string url = 3;
+ int64 lease_expires_utc_ms = 4;
+}
+
+message SubmitResultRequest {
+ int32 queue_id = 1;
+ string agent_id = 2;
+ bool success = 3;
+ string error = 4;
+
+ // Content: either plain text (content_text) or compressed bytes (content_bytes).
+ string content_text = 5;
+ bytes content_bytes = 6;
+ string content_encoding = 7; // e.g. "gzip"
+ int32 original_length = 8;
+ int32 compressed_length = 9;
+}
+
+message SubmitResultResponse {
+ bool ok = 1;
+ string message = 2;
+}
+
+service AgentService {
+ rpc RegisterAgent(RegisterAgentRequest) returns (RegisterAgentResponse);
+ rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
+ rpc LeaseWork(LeaseWorkRequest) returns (LeaseWorkResponse);
+ rpc SubmitResult(SubmitResultRequest) returns (SubmitResultResponse);
+}
diff --git a/ScrapperAPI/Repositories/AgentRepository.cs b/ScrapperAPI/Repositories/AgentRepository.cs
new file mode 100644
index 0000000..bf99c16
--- /dev/null
+++ b/ScrapperAPI/Repositories/AgentRepository.cs
@@ -0,0 +1,84 @@
+using Dapper;
+using ScrapperAPI.Dtos;
+using ScrapperAPI.Interfaces;
+
+namespace ScrapperAPI.Repositories;
+
+public sealed class AgentRepository : IAgentRepository
+{
+ private readonly IDbConnectionFactory _db;
+
+ public AgentRepository(IDbConnectionFactory db) => _db = db;
+
+ public async Task UpsertAsync(string agentId, string? displayName, string certThumbprint, CancellationToken ct)
+ {
+ const string sql = """
+ insert into agent(id, display_name, cert_thumbprint, last_seen_at, is_enabled)
+ values (@agentId, @displayName, @certThumbprint, now(), true)
+ on conflict (id)
+ do update set
+ display_name = excluded.display_name,
+ cert_thumbprint = excluded.cert_thumbprint,
+ last_seen_at = now();
+ """;
+
+ using var conn = await _db.CreateOpenConnectionAsync(ct);
+ await conn.ExecuteAsync(new CommandDefinition(sql, new { agentId, displayName, certThumbprint }, cancellationToken: ct));
+ }
+
+ public async Task<bool> IsEnabledAsync(string agentId, CancellationToken ct)
+ {
+ const string sql = """
+ select is_enabled from agent where id = @agentId;
+ """;
+ using var conn = await _db.CreateOpenConnectionAsync(ct);
+ return await conn.ExecuteScalarAsync<bool>(new CommandDefinition(sql, new { agentId }, cancellationToken: ct));
+ }
+
+ public async Task<string?> GetThumbprintAsync(string agentId, CancellationToken ct)
+ {
+ const string sql = """
+ select cert_thumbprint from agent where id = @agentId;
+ """;
+ using var conn = await _db.CreateOpenConnectionAsync(ct);
+ return await conn.ExecuteScalarAsync<string?>(new CommandDefinition(sql, new { agentId }, cancellationToken: ct));
+ }
+
+ public async Task TouchAsync(string agentId, CancellationToken ct)
+ {
+ const string sql = """
+ update agent set last_seen_at = now() where id = @agentId;
+ """;
+ using var conn = await _db.CreateOpenConnectionAsync(ct);
+ await conn.ExecuteAsync(new CommandDefinition(sql, new { agentId }, cancellationToken: ct));
+ }
+
+ public async Task<int> CountActiveAsync(TimeSpan seenWithin, CancellationToken ct)
+ {
+ const string sql = """
+ select count(*)
+ from agent
+ where is_enabled = true
+ and last_seen_at > now() - (@seenSeconds * interval '1 second');
+ """;
+ using var conn = await _db.CreateOpenConnectionAsync(ct);
+ return await conn.ExecuteScalarAsync<int>(new CommandDefinition(sql, new { seenSeconds = (int)seenWithin.TotalSeconds }, cancellationToken: ct));
+ }
+
+ public async Task<AgentRow?> GetAsync(string agentId, CancellationToken ct)
+ {
+ const string sql = """
+ select
+ id as Id,
+ display_name as DisplayName,
+ cert_thumbprint as CertThumbprint,
+ created_at as CreatedAt,
+ last_seen_at as LastSeenAt,
+ is_enabled as IsEnabled
+ from agent
+ where id = @agentId;
+ """;
+ using var conn = await _db.CreateOpenConnectionAsync(ct);
+ return await conn.QuerySingleOrDefaultAsync<AgentRow>(new CommandDefinition(sql, new { agentId }, cancellationToken: ct));
+ }
+}
diff --git a/ScrapperAPI/Repositories/ContentRepository.cs b/ScrapperAPI/Repositories/ContentRepository.cs
index 44ea7e8..175a83d 100644
--- a/ScrapperAPI/Repositories/ContentRepository.cs
+++ b/ScrapperAPI/Repositories/ContentRepository.cs
@@ -33,6 +33,31 @@ public sealed class ContentRepository : IContentRepository
}, cancellationToken: ct));
}
+ public async Task<long> SaveCompressedAsync(
+ int queueId,
+ string contentEncoding,
+ byte[] contentBytes,
+ int originalLength,
+ int compressedLength,
+ CancellationToken ct)
+ {
+ const string sql = """
+ insert into content(queue_id, content_encoding, content_bytes, original_length, compressed_length)
+ values (@queueId, @contentEncoding, @bytes, @origLen, @compLen)
+ returning id;
+ """;
+
+ using var conn = await _db.CreateOpenConnectionAsync(ct);
+ return await conn.ExecuteScalarAsync<long>(new CommandDefinition(sql, new
+ {
+ queueId,
+ contentEncoding,
+ bytes = contentBytes,
+ origLen = originalLength,
+ compLen = compressedLength
+ }, cancellationToken: ct));
+ }
+
public async Task GetByQueueIdAsync(int queueId, CancellationToken ct)
{
const string sql = """
diff --git a/ScrapperAPI/Repositories/ExtractedDataRepository.cs b/ScrapperAPI/Repositories/ExtractedDataRepository.cs
index 2483f51..a6e15a8 100644
--- a/ScrapperAPI/Repositories/ExtractedDataRepository.cs
+++ b/ScrapperAPI/Repositories/ExtractedDataRepository.cs
@@ -47,7 +47,7 @@ public sealed class ExtractedDataRepository : IExtractedDataRepository
model_id as ModelId,
session_id as SessionId,
queue_id as QueueId,
- extracted_json::text as extracted_json,
+ extracted_json::text as extractedJson,
success,
error,
extracted_at as ExtractedAt
@@ -71,7 +71,7 @@ public sealed class ExtractedDataRepository : IExtractedDataRepository
model_id as ModelId,
session_id as SessionId,
queue_id as QueueId,
- extracted_json::text as extracted_json,
+ extracted_json::text as extractedJson,
success,
error,
extracted_at as ExtractedAt
@@ -86,16 +86,7 @@ public sealed class ExtractedDataRepository : IExtractedDataRepository
return row?.ToDto();
}
- private sealed record RowRaw(
- long Id,
- long RunId,
- long ModelId,
- int SessionId,
- int QueueId,
- string Extracted_Json,
- bool Success,
- string? Error,
- DateTimeOffset ExtractedAt)
+ private sealed class RowRaw
{
public ExtractedDataRow ToDto() => new(
Id,
@@ -103,10 +94,38 @@ public sealed class ExtractedDataRepository : IExtractedDataRepository
ModelId,
SessionId,
QueueId,
- JsonDocument.Parse(Extracted_Json),
+ JsonDocument.Parse(ExtractedJson ?? "{}"),
Success,
Error,
ExtractedAt
);
+
+ public long Id { get; init; }
+ public long RunId { get; init; }
+ public long ModelId { get; init; }
+ public int SessionId { get; init; }
+ public int QueueId { get; init; }
+ public string? ExtractedJson { get; init; }
+ public bool Success { get; init; }
+ public string? Error { get; init; }
+ public DateTimeOffset ExtractedAt { get; init; }
+
+ public RowRaw()
+ {
+
+ }
+
+ public RowRaw(long id, long runId, long modelId, int sessionId, int queueId, string? extractedJson, bool success, string? error, DateTimeOffset extractedAt)
+ {
+ Id = id;
+ RunId = runId;
+ ModelId = modelId;
+ SessionId = sessionId;
+ QueueId = queueId;
+ ExtractedJson = extractedJson;
+ Success = success;
+ Error = error;
+ ExtractedAt = extractedAt;
+ }
}
}
diff --git a/ScrapperAPI/Repositories/ExtractionModelRepository.cs b/ScrapperAPI/Repositories/ExtractionModelRepository.cs
index 6c0c86c..1db03e3 100644
--- a/ScrapperAPI/Repositories/ExtractionModelRepository.cs
+++ b/ScrapperAPI/Repositories/ExtractionModelRepository.cs
@@ -37,7 +37,7 @@ public sealed class ExtractionModelRepository : IExtractionModelRepository
name,
version,
description,
- definition::text as definition_json,
+ definition::text as definitionJson,
created_at,
updated_at
from extraction_model
diff --git a/ScrapperAPI/Repositories/QueueRepository.cs b/ScrapperAPI/Repositories/QueueRepository.cs
index 47e0b70..6fbc054 100644
--- a/ScrapperAPI/Repositories/QueueRepository.cs
+++ b/ScrapperAPI/Repositories/QueueRepository.cs
@@ -41,9 +41,16 @@ public sealed class QueueRepository : IQueueRepository
new CommandDefinition(sql, new { sessionId }, cancellationToken: ct));
}
- public async Task TryDequeueAsync(int sessionId, CancellationToken ct)
+ public async Task TryDequeueAsync(int sessionId, string workerId, TimeSpan leaseFor, CancellationToken ct)
{
- // Importante: 1 transação + SKIP LOCKED (permite multi-worker no futuro)
+ var batch = await LeaseBatchAsync(sessionId, workerId, take: 1, leaseFor, ct);
+ return batch.FirstOrDefault();
+ }
+
+ public async Task> LeaseBatchAsync(int sessionId, string workerId, int take, TimeSpan leaseFor, CancellationToken ct)
+ {
+ if (take <= 0) return Array.Empty();
+
using var conn = await _db.CreateOpenConnectionAsync(ct);
using var tx = conn.BeginTransaction();
@@ -52,15 +59,20 @@ public sealed class QueueRepository : IQueueRepository
select id
from queue
where session_id = @sessionId
- and status = 0
+ and (
+ status = 0
+ or (status = 1 and lease_expires_at is not null and lease_expires_at < now())
+ )
order by id
for update skip locked
- limit 1
+ limit @take
)
update queue q
set status = 1,
- started_date = now(),
- attempts = attempts + 1
+ started_date = coalesce(q.started_date, now()),
+ attempts = q.attempts + 1,
+ leased_by = @workerId,
+ lease_expires_at = now() + (@leaseSeconds * interval '1 second')
from next
where q.id = next.id
returning
@@ -75,39 +87,72 @@ public sealed class QueueRepository : IQueueRepository
q.last_error as LastError;
""";
- var item = await conn.QuerySingleOrDefaultAsync(
- new CommandDefinition(sql, new { sessionId }, transaction: tx, cancellationToken: ct));
+ var rows = await conn.QueryAsync(
+ new CommandDefinition(sql,
+ new
+ {
+ sessionId,
+ workerId,
+ take,
+ leaseSeconds = Math.Max(1, (int)leaseFor.TotalSeconds)
+ },
+ transaction: tx,
+ cancellationToken: ct));
tx.Commit();
- return item;
+ return rows.ToList();
}
- public async Task MarkDoneAsync(int queueId, CancellationToken ct)
+    /// <summary>
+    /// Extends the lease on a queue item still owned by <paramref name="workerId"/>.
+    /// Returns true when a row was updated (lease renewed), false when the item
+    /// is no longer leased by this worker (e.g. it was reclaimed).
+    /// </summary>
+    public async Task<bool> RenewLeaseAsync(int queueId, string workerId, TimeSpan leaseFor, CancellationToken ct)
+    {
+        const string sql = """
+            update queue
+            set lease_expires_at = now() + (@leaseSeconds * interval '1 second')
+            where id = @queueId
+              and status = 1
+              and leased_by = @workerId
+              and (lease_expires_at is null or lease_expires_at > now() - interval '5 minutes');
+            """;
+
+        // The 5-minute grace lets a slow worker renew a just-expired lease;
+        // the leased_by match prevents renewing an item another worker reclaimed.
+        using var conn = await _db.CreateOpenConnectionAsync(ct);
+        var rows = await conn.ExecuteAsync(new CommandDefinition(sql,
+            new { queueId, workerId, leaseSeconds = Math.Max(1, (int)leaseFor.TotalSeconds) },
+            cancellationToken: ct));
+        return rows > 0;
+    }
+
+ public async Task MarkDoneAsync(int queueId, string workerId, CancellationToken ct)
{
const string sql = """
update queue
set status = 2,
finished_date = now(),
- last_error = null
- where id = @queueId;
+ last_error = null,
+ lease_expires_at = null
+ where id = @queueId
+ and leased_by = @workerId;
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
- await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId }, cancellationToken: ct));
+ var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, workerId }, cancellationToken: ct));
+ return rows > 0;
}
- public async Task MarkFailedAsync(int queueId, string error, CancellationToken ct)
+ public async Task MarkFailedAsync(int queueId, string workerId, string error, CancellationToken ct)
{
const string sql = """
update queue
set status = 3,
finished_date = now(),
- last_error = @error
- where id = @queueId;
+ last_error = @error,
+ lease_expires_at = null
+ where id = @queueId
+ and leased_by = @workerId;
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
- await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, error }, cancellationToken: ct));
+ var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, workerId, error }, cancellationToken: ct));
+ return rows > 0;
}
public async Task RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct)
diff --git a/ScrapperAPI/ScrapperAPI.csproj b/ScrapperAPI/ScrapperAPI.csproj
index 30dc7ce..5ec09fe 100644
--- a/ScrapperAPI/ScrapperAPI.csproj
+++ b/ScrapperAPI/ScrapperAPI.csproj
@@ -7,12 +7,21 @@
+
+
+
+ all
+
+
+
+
+
.dockerignore
diff --git a/ScrapperAPI/Scripts/database.sql b/ScrapperAPI/Scripts/database.sql
index 739a5ad..d8ebfde 100644
--- a/ScrapperAPI/Scripts/database.sql
+++ b/ScrapperAPI/Scripts/database.sql
@@ -16,12 +16,30 @@ create table queue(
status smallint not null default 0,
started_date timestamp null,
finished_date timestamp null,
+ leased_by text null,
+ lease_expires_at timestamptz null,
attempts int not null default 0,
last_error text null,
created_date timestamp default now()
);
create index idx_queue_session_status on queue(session_id, status);
+create index idx_queue_lease on queue(session_id, status, lease_expires_at);
+
+-- ------------------------------------------------------------
+-- Agents (optional distributed workers)
+-- ------------------------------------------------------------
+
+drop table if exists agent;
+
+create table agent(
+ id text primary key,
+ display_name text null,
+ cert_thumbprint text not null,
+ created_at timestamptz not null default now(),
+ last_seen_at timestamptz not null default now(),
+ is_enabled boolean not null default true
+);
create table content(
id serial primary key,
diff --git a/ScrapperAPI/Workers/ExtractionCoordinator.cs b/ScrapperAPI/Workers/ExtractionCoordinator.cs
index a162aa5..cd55525 100644
--- a/ScrapperAPI/Workers/ExtractionCoordinator.cs
+++ b/ScrapperAPI/Workers/ExtractionCoordinator.cs
@@ -98,13 +98,13 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
return new ExtractionRuntimeStatus(runId, false, 0, 0, 0, 0, null);
return new ExtractionRuntimeStatus(
- RunId: r.RunId,
- IsRunning: r.IsRunning,
- Processed: r.Processed,
- Total: r.Total,
- Succeeded: r.Succeeded,
- Failed: r.Failed,
- CurrentQueueId: r.CurrentQueueId
+ runId: r.RunId,
+ isRunning: r.IsRunning,
+ processed: r.Processed,
+ total: r.Total,
+ succeeded: r.Succeeded,
+ failed: r.Failed,
+ currentQueueId: r.CurrentQueueId
);
}
@@ -194,13 +194,13 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
using var json = _engine.Extract(html, modelRow.Definition.RootElement);
await extracted.UpsertAsync(new UpsertExtractedDataDto(
- RunId: runtime.RunId,
- ModelId: runtime.ModelId,
- SessionId: runtime.SessionId,
- QueueId: qid,
- ExtractedJson: json,
- Success: true,
- Error: null
+ runId: runtime.RunId,
+ modelId: runtime.ModelId,
+ sessionId: runtime.SessionId,
+ queueId: qid,
+ extractedJson: json,
+ success: true,
+ error: null
), hostToken);
runtime.Succeeded++;
@@ -210,13 +210,13 @@ public sealed class ExtractionCoordinator : BackgroundService, IExtractionCoordi
using var errJson = JsonDocument.Parse("{}");
await extracted.UpsertAsync(new UpsertExtractedDataDto(
- RunId: runtime.RunId,
- ModelId: runtime.ModelId,
- SessionId: runtime.SessionId,
- QueueId: qid,
- ExtractedJson: errJson,
- Success: false,
- Error: Truncate(ex.Message, 2000)
+ runId: runtime.RunId,
+ modelId: runtime.ModelId,
+ sessionId: runtime.SessionId,
+ queueId: qid,
+ extractedJson: errJson,
+ success: false,
+ error: Truncate(ex.Message, 2000)
), hostToken);
runtime.Failed++;
diff --git a/ScrapperAPI/Workers/ScrapeCoordinator.cs b/ScrapperAPI/Workers/ScrapeCoordinator.cs
index 8cdc5f4..be5ba4a 100644
--- a/ScrapperAPI/Workers/ScrapeCoordinator.cs
+++ b/ScrapperAPI/Workers/ScrapeCoordinator.cs
@@ -16,6 +16,7 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator
private readonly IScraperHttpClient _scraperHttp;
private readonly IScrapeEventBus _events;
private readonly ScraperOptions _opts;
+ private readonly WorkerOptions _workerOpts;
private readonly Channel _startRequests = Channel.CreateUnbounded(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
@@ -30,6 +31,7 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator
IHttpClientFactory httpClientFactory,
ILogger logger,
IOptions options,
+ IOptions workerOptions,
IScraperHttpClient scraperHttp,
IScrapeEventBus events)
{
@@ -37,6 +39,7 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator
_httpClientFactory = httpClientFactory;
_logger = logger;
_opts = options.Value;
+ _workerOpts = workerOptions.Value;
_scraperHttp = scraperHttp;
_events = events;
}
@@ -120,84 +123,36 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator
try
{
+ if (!_workerOpts.Local.Enabled)
+ return;
+
var http = _httpClientFactory.CreateClient("scraper");
+ // When PreferAgents: only run local if no agent was recently seen.
while (!hostToken.IsCancellationRequested)
{
+ if (_workerOpts.Mode == DistributedMode.PreferAgents)
+ {
+ var noAgents = await NoAgentsRecentlySeenAsync(_workerOpts.Local.PreferAgentsGraceSeconds, hostToken);
+ if (!noAgents)
+ {
+ await Task.Delay(TimeSpan.FromSeconds(2), hostToken);
+ continue;
+ }
+ }
+
// STOP GRACIOSO: não pega próxima URL
if (runner.StopRequested)
break;
+ var concurrency = Math.Max(1, _workerOpts.Local.Concurrency);
+ var leaseFor = TimeSpan.FromSeconds(Math.Max(5, _workerOpts.LeaseSeconds));
- // cria scope (repos scoped vivem aqui dentro)
- using var scope = _scopeFactory.CreateScope();
- var queue = scope.ServiceProvider.GetRequiredService();
- var content = scope.ServiceProvider.GetRequiredService();
+ var tasks = Enumerable.Range(0, concurrency)
+ .Select(i => RunLocalWorkerLoopAsync(runner, workerId: $"local:{Environment.MachineName}:{i}", leaseFor, hostToken))
+ .ToArray();
- var item = await queue.TryDequeueAsync(runner.SessionId, hostToken);
- if (item is null)
- break;
-
- runner.SetCurrent(item.Id, item.Url);
-
- await _events.PublishAsync(new ScrapeEvent(
- ScrapeEventType.ItemStarted,
- runner.SessionId,
- DateTimeOffset.UtcNow,
- QueueId: item.Id,
- Url: item.Url
- ), hostToken);
-
- try
- {
- var html = await _scraperHttp.GetStringWithRetryAsync(item.Url, hostToken);
-
- await content.SaveAsync(item.Id, html, hostToken);
- await queue.MarkDoneAsync(item.Id, hostToken);
-
- await _events.PublishAsync(new ScrapeEvent(
- ScrapeEventType.ItemSucceeded,
- runner.SessionId,
- DateTimeOffset.UtcNow,
- QueueId: item.Id,
- Url: item.Url
- ), hostToken);
- }
- catch (Exception ex)
- {
- await queue.MarkFailedAsync(item.Id, Truncate(ex.ToString(), 8000), hostToken);
-
- await _events.PublishAsync(new ScrapeEvent(
- ScrapeEventType.ItemFailed,
- runner.SessionId,
- DateTimeOffset.UtcNow,
- QueueId: item.Id,
- Url: item.Url,
- Error: ex.Message
- ), hostToken);
- }
- finally
- {
- // progresso (snapshot do DB) + percent
- var counts = await queue.GetCountsAsync(runner.SessionId, hostToken);
- var percent = counts.Total == 0 ? 0 : (double)counts.Done * 100.0 / (double)counts.Total;
-
- await _events.PublishAsync(new ScrapeEvent(
- ScrapeEventType.Progress,
- runner.SessionId,
- DateTimeOffset.UtcNow,
- Total: counts.Total,
- Done: counts.Done,
- Pending: counts.Pending,
- Processing: counts.Processing,
- Failed: counts.Failed,
- Percent: percent
- ), hostToken);
-
- runner.ClearCurrent();
-
- if (!runner.StopRequested && !hostToken.IsCancellationRequested)
- await PoliteDelayAsync(hostToken);
- }
+ await Task.WhenAll(tasks);
+ break; // no more work (or stop requested)
}
}
finally
@@ -212,6 +167,97 @@ public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator
}
}
+    /// <summary>
+    /// Single local worker loop: leases one queue item at a time, scrapes it,
+    /// records the outcome, and publishes progress events. Exits when the
+    /// session queue is drained, a stop is requested, or the host shuts down.
+    /// </summary>
+    private async Task RunLocalWorkerLoopAsync(Runner runner, string workerId, TimeSpan leaseFor, CancellationToken hostToken)
+    {
+        // NOTE(review): with Local.Concurrency > 1 several loops share one Runner,
+        // so SetCurrent/ClearCurrent interleave — confirm Runner tolerates this.
+        while (!hostToken.IsCancellationRequested && !runner.StopRequested)
+        {
+            // Scoped repositories live only for this single item.
+            using var scope = _scopeFactory.CreateScope();
+            var queue = scope.ServiceProvider.GetRequiredService<IQueueRepository>();
+            var content = scope.ServiceProvider.GetRequiredService<IContentRepository>();
+
+            var item = await queue.TryDequeueAsync(runner.SessionId, workerId, leaseFor, hostToken);
+            if (item is null)
+                return; // queue drained for this session
+
+            runner.SetCurrent(item.Id, item.Url);
+
+            await _events.PublishAsync(new ScrapeEvent(
+                ScrapeEventType.ItemStarted,
+                runner.SessionId,
+                DateTimeOffset.UtcNow,
+                QueueId: item.Id,
+                Url: item.Url
+            ), hostToken);
+
+            try
+            {
+                var html = await _scraperHttp.GetStringWithRetryAsync(item.Url, hostToken);
+
+                await content.SaveAsync(item.Id, html, hostToken);
+                await queue.MarkDoneAsync(item.Id, workerId, hostToken);
+
+                await _events.PublishAsync(new ScrapeEvent(
+                    ScrapeEventType.ItemSucceeded,
+                    runner.SessionId,
+                    DateTimeOffset.UtcNow,
+                    QueueId: item.Id,
+                    Url: item.Url
+                ), hostToken);
+            }
+            catch (Exception ex)
+            {
+                await queue.MarkFailedAsync(item.Id, workerId, Truncate(ex.ToString(), 8000), hostToken);
+
+                await _events.PublishAsync(new ScrapeEvent(
+                    ScrapeEventType.ItemFailed,
+                    runner.SessionId,
+                    DateTimeOffset.UtcNow,
+                    QueueId: item.Id,
+                    Url: item.Url,
+                    Error: ex.Message
+                ), hostToken);
+            }
+            finally
+            {
+                // Progress snapshot comes from the DB so it stays correct across workers/agents.
+                var counts = await queue.GetCountsAsync(runner.SessionId, hostToken);
+                var percent = counts.Total == 0 ? 0 : (double)counts.Done * 100.0 / (double)counts.Total;
+
+                await _events.PublishAsync(new ScrapeEvent(
+                    ScrapeEventType.Progress,
+                    runner.SessionId,
+                    DateTimeOffset.UtcNow,
+                    Total: counts.Total,
+                    Done: counts.Done,
+                    Pending: counts.Pending,
+                    Processing: counts.Processing,
+                    Failed: counts.Failed,
+                    Percent: percent
+                ), hostToken);
+
+                runner.ClearCurrent();
+
+                if (!runner.StopRequested && !hostToken.IsCancellationRequested)
+                    await PoliteDelayAsync(hostToken);
+            }
+        }
+    }
+
+    /// <summary>
+    /// Returns true when no agent has been seen within the given window,
+    /// i.e. the local worker may proceed in PreferAgents mode.
+    /// </summary>
+    private async Task NoAgentsRecentlySeenAsync(int withinSeconds, CancellationToken ct)
+    {
+        try
+        {
+            using var scope = _scopeFactory.CreateScope();
+            // NOTE(review): the generic argument of GetRequiredService appears to have
+            // been stripped from this patch text (presumably the agent repository
+            // service exposing CountActiveAsync) — confirm against the original file.
+            var agents = scope.ServiceProvider.GetRequiredService();
+            // withinSeconds is clamped to at least 1 second.
+            var active = await agents.CountActiveAsync(TimeSpan.FromSeconds(Math.Max(1, withinSeconds)), ct);
+            return active == 0;
+        }
+        catch
+        {
+            // If agents table isn't configured yet, default to "no agents".
+            return true;
+        }
+    }
+
private static async Task FetchHtmlAsync(HttpClient http, string url, CancellationToken ct)
{
using var req = new HttpRequestMessage(HttpMethod.Get, url);
diff --git a/ScrapperAPI/appsettings.Development.json b/ScrapperAPI/appsettings.Development.json
index 60701f5..cdeace0 100644
--- a/ScrapperAPI/appsettings.Development.json
+++ b/ScrapperAPI/appsettings.Development.json
@@ -1,4 +1,16 @@
{
+ "Kestrel": {
+ "Endpoints": {
+ "Http": {
+ "Url": "http://0.0.0.0:5123",
+ "Protocols": "Http1"
+ },
+ "Grpc": {
+ "Url": "https://0.0.0.0:5001",
+ "Protocols": "Http2"
+ }
+ }
+ },
"Logging": {
"LogLevel": {
"Default": "Information",
@@ -12,5 +24,18 @@
},
"Extraction": {
"MaxParallelRuns": 3
+ },
+ "Workers": {
+ "Mode": "Hybrid",
+ "LeaseSeconds": 60,
+ "Local": {
+ "Enabled": true,
+ "Concurrency": 1,
+ "PreferAgentsGraceSeconds": 15
+ },
+ "Agents": {
+ "Enabled": true,
+ "RequireMutualTls": false
+ }
}
}
\ No newline at end of file
diff --git a/ScrapperAPI/appsettings.json b/ScrapperAPI/appsettings.json
index 72e3efd..76b12bb 100644
--- a/ScrapperAPI/appsettings.json
+++ b/ScrapperAPI/appsettings.json
@@ -23,6 +23,19 @@
"Extraction": {
"MaxParallelRuns": 3
},
+ "Workers": {
+ "Mode": "Hybrid",
+ "LeaseSeconds": 120,
+ "Local": {
+ "Enabled": true,
+ "Concurrency": 1,
+ "PreferAgentsGraceSeconds": 30
+ },
+ "Agents": {
+ "Enabled": true,
+ "RequireMutualTls": true
+ }
+ },
"AllowedHosts": "*",
"Authentication": {
"Authority": "https://auth.evolucao.io/application/o/web-scrapper/",
diff --git a/Voyager.sln b/Voyager.sln
index 68a3f05..af9a73f 100644
--- a/Voyager.sln
+++ b/Voyager.sln
@@ -2,6 +2,8 @@
Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScrapperAPI", "ScrapperAPI\ScrapperAPI.csproj", "{206F88EA-2109-4DC0-B1E1-45AA8D3D092F}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "VoyagerAgent", "VoyagerAgent\VoyagerAgent.csproj", "{29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -12,5 +14,9 @@ Global
{206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {29EADEEB-C9EE-483C-80EC-DFDBA98B23FE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
diff --git a/VoyagerAgent/AgentClientOptions.cs b/VoyagerAgent/AgentClientOptions.cs
new file mode 100644
index 0000000..569284e
--- /dev/null
+++ b/VoyagerAgent/AgentClientOptions.cs
@@ -0,0 +1,28 @@
+namespace VoyagerAgent;
+
+/// <summary>
+/// Configuration for a VoyagerAgent instance, bound from the "Agent" section.
+/// </summary>
+public sealed class AgentClientOptions
+{
+    /// <summary>Unique id for this agent (e.g. "agent-01").</summary>
+    public string AgentId { get; set; } = "agent-01";
+
+    /// <summary>Optional human-friendly name sent on registration.</summary>
+    public string? DisplayName { get; set; }
+
+    /// <summary>Central Voyager gRPC endpoint, e.g. "https://voyager.example.com:7443".</summary>
+    public string CentralGrpcAddress { get; set; } = "https://localhost:7443";
+
+    /// <summary>Session ids this agent should pull from.</summary>
+    public int[] SessionIds { get; set; } = Array.Empty<int>();
+
+    /// <summary>How many URLs to request per lease batch.</summary>
+    public int Capacity { get; set; } = 10;
+
+    /// <summary>Client certificate (PFX) path for mTLS.</summary>
+    public string ClientCertificatePath { get; set; } = "";
+
+    /// <summary>Password for the client certificate PFX.</summary>
+    public string ClientCertificatePassword { get; set; } = "";
+
+    /// <summary>If true, skip strict server certificate validation (dev only).</summary>
+    public bool InsecureSkipServerCertificateValidation { get; set; } = false;
+
+    /// <summary>Delay (ms) between polls when no work is available.</summary>
+    public int PollDelayMs { get; set; } = 1500;
+}
diff --git a/VoyagerAgent/AgentWorker.cs b/VoyagerAgent/AgentWorker.cs
new file mode 100644
index 0000000..4e3923e
--- /dev/null
+++ b/VoyagerAgent/AgentWorker.cs
@@ -0,0 +1,154 @@
+using System.IO.Compression;
+using System.Text;
+using Grpc.Core;
+using Microsoft.Extensions.Options;
+using ScrapperAPI.AgentGrpc;
+
+namespace VoyagerAgent;
+
+/// <summary>
+/// Background service that leases batches of URLs from the central Voyager
+/// over gRPC, scrapes each URL, and submits the gzip-compressed result back.
+/// Failures are reported to the central so it can release the lease.
+/// </summary>
+public sealed class AgentWorker : BackgroundService
+{
+    private readonly ILogger<AgentWorker> _logger;
+    private readonly AgentClientOptions _opts;
+    private readonly GrpcAgentClient _grpc;
+
+    public AgentWorker(ILogger<AgentWorker> logger, IOptions<AgentClientOptions> opts, GrpcAgentClient grpc)
+    {
+        _logger = logger;
+        _opts = opts.Value;
+        _grpc = grpc;
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        if (_opts.SessionIds.Length == 0)
+        {
+            _logger.LogWarning("No Agent:SessionIds configured. Agent will idle.");
+        }
+
+        // One long-lived client/channel for the whole agent lifetime.
+        var client = _grpc.CreateClient();
+
+        await TryRegisterAsync(client, stoppingToken);
+
+        // Single shared HttpClient: avoids socket exhaustion from per-request clients.
+        using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(30) };
+        http.DefaultRequestHeaders.UserAgent.ParseAdd("voyager-agent/1.0");
+
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            var didWork = false;
+
+            foreach (var sessionId in _opts.SessionIds)
+            {
+                if (stoppingToken.IsCancellationRequested) break;
+
+                try
+                {
+                    var lease = await client.LeaseWorkAsync(new LeaseWorkRequest
+                    {
+                        AgentId = _opts.AgentId,
+                        SessionId = sessionId,
+                        Capacity = _opts.Capacity
+                    }, cancellationToken: stoppingToken);
+
+                    if (lease.Items.Count == 0)
+                        continue;
+
+                    didWork = true;
+
+                    foreach (var item in lease.Items)
+                    {
+                        if (stoppingToken.IsCancellationRequested) break;
+
+                        try
+                        {
+                            var html = await http.GetStringAsync(item.Url, stoppingToken);
+                            // Compress before shipping to keep gRPC payloads small.
+                            var compressed = GzipCompressUtf8(html, CompressionLevel.Fastest, out var origLen);
+
+                            var submit = await client.SubmitResultAsync(new SubmitResultRequest
+                            {
+                                QueueId = item.QueueId,
+                                AgentId = _opts.AgentId,
+                                Success = true,
+                                ContentEncoding = "gzip",
+                                ContentBytes = Google.Protobuf.ByteString.CopyFrom(compressed),
+                                OriginalLength = origLen,
+                                CompressedLength = compressed.Length
+                            }, cancellationToken: stoppingToken);
+
+                            if (!submit.Ok)
+                                _logger.LogWarning("SubmitResult not ok for queue {QueueId}: {Message}", item.QueueId, submit.Message);
+                        }
+                        catch (Exception ex)
+                        {
+                            _logger.LogError(ex, "Scrape failed for {Url}", item.Url);
+
+                            // Best-effort failure report so the central can fail/requeue the item.
+                            try
+                            {
+                                await client.SubmitResultAsync(new SubmitResultRequest
+                                {
+                                    QueueId = item.QueueId,
+                                    AgentId = _opts.AgentId,
+                                    Success = false,
+                                    Error = ex.Message
+                                }, cancellationToken: stoppingToken);
+                            }
+                            catch (Exception inner)
+                            {
+                                _logger.LogError(inner, "Failed to submit failure status for queue {QueueId}", item.QueueId);
+                            }
+                        }
+                    }
+                }
+                catch (RpcException rpc)
+                {
+                    _logger.LogWarning("gRPC error: {Status} {Detail}", rpc.StatusCode, rpc.Status.Detail);
+                }
+                catch (Exception ex)
+                {
+                    _logger.LogError(ex, "Unhandled error while leasing work.");
+                }
+            }
+
+            // heartbeat (best-effort)
+            try
+            {
+                await client.HeartbeatAsync(new HeartbeatRequest { AgentId = _opts.AgentId }, cancellationToken: stoppingToken);
+            }
+            catch { /* ignore */ }
+
+            // Back off only when every session came back empty.
+            if (!didWork)
+                await Task.Delay(_opts.PollDelayMs, stoppingToken);
+        }
+    }
+
+    /// <summary>Registers this agent with the central; failure is logged, not fatal.</summary>
+    private async Task TryRegisterAsync(AgentService.AgentServiceClient client, CancellationToken ct)
+    {
+        try
+        {
+            await client.RegisterAgentAsync(new RegisterAgentRequest
+            {
+                AgentId = _opts.AgentId,
+                DisplayName = _opts.DisplayName ?? string.Empty
+            }, cancellationToken: ct);
+
+            _logger.LogInformation("Agent registered as {AgentId}", _opts.AgentId);
+        }
+        catch (RpcException rpc)
+        {
+            _logger.LogWarning("RegisterAgent failed: {Status} {Detail}", rpc.StatusCode, rpc.Status.Detail);
+        }
+    }
+
+    /// <summary>
+    /// UTF-8 encodes <paramref name="content"/> and gzip-compresses it,
+    /// returning the compressed bytes and the original (uncompressed) byte length.
+    /// </summary>
+    private static byte[] GzipCompressUtf8(string content, CompressionLevel level, out int originalLength)
+    {
+        var bytes = Encoding.UTF8.GetBytes(content);
+        originalLength = bytes.Length;
+
+        using var ms = new MemoryStream();
+        // Disposing the GZipStream flushes the gzip footer before ToArray();
+        // leaveOpen keeps the MemoryStream usable afterwards.
+        using (var gzip = new GZipStream(ms, level, leaveOpen: true))
+        {
+            gzip.Write(bytes, 0, bytes.Length);
+        }
+        return ms.ToArray();
+    }
+}
diff --git a/VoyagerAgent/GrpcAgentClient.cs b/VoyagerAgent/GrpcAgentClient.cs
new file mode 100644
index 0000000..2451d3b
--- /dev/null
+++ b/VoyagerAgent/GrpcAgentClient.cs
@@ -0,0 +1,37 @@
+using System.Net.Http;
+using System.Security.Cryptography.X509Certificates;
+using Grpc.Net.Client;
+using Microsoft.Extensions.Options;
+using ScrapperAPI.AgentGrpc;
+
+namespace VoyagerAgent;
+
+/// <summary>
+/// Factory for gRPC clients talking to the central Voyager AgentService,
+/// optionally attaching a client certificate for mTLS.
+/// </summary>
+public sealed class GrpcAgentClient
+{
+    private readonly AgentClientOptions _opts;
+
+    public GrpcAgentClient(IOptions<AgentClientOptions> options)
+    {
+        _opts = options.Value;
+    }
+
+    /// <summary>
+    /// Builds a client for <see cref="AgentClientOptions.CentralGrpcAddress"/>.
+    /// NOTE(review): each call allocates a new HttpClient and channel; callers
+    /// should create the client once and reuse it for the process lifetime
+    /// (AgentWorker does exactly that).
+    /// </summary>
+    public AgentService.AgentServiceClient CreateClient()
+    {
+        var handler = new HttpClientHandler();
+
+        if (!string.IsNullOrWhiteSpace(_opts.ClientCertificatePath))
+        {
+            // mTLS: present our client certificate to the central endpoint.
+            var cert = new X509Certificate2(_opts.ClientCertificatePath, _opts.ClientCertificatePassword);
+            handler.ClientCertificates.Add(cert);
+        }
+
+        if (_opts.InsecureSkipServerCertificateValidation)
+        {
+            // Dev-only escape hatch for self-signed server certificates.
+            handler.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;
+        }
+
+        var httpClient = new HttpClient(handler);
+        var channel = GrpcChannel.ForAddress(_opts.CentralGrpcAddress, new GrpcChannelOptions { HttpClient = httpClient });
+        return new AgentService.AgentServiceClient(channel);
+    }
+}
diff --git a/VoyagerAgent/Program.cs b/VoyagerAgent/Program.cs
new file mode 100644
index 0000000..3f65572
--- /dev/null
+++ b/VoyagerAgent/Program.cs
@@ -0,0 +1,19 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+namespace VoyagerAgent;
+
+/// <summary>Agent entry point: wires options, the gRPC client factory, and the worker.</summary>
+public static class Program
+{
+    public static void Main(string[] args)
+    {
+        var builder = Host.CreateApplicationBuilder(args);
+
+        // Bind the "Agent" configuration section to strongly-typed options.
+        builder.Services.Configure<AgentClientOptions>(builder.Configuration.GetSection("Agent"));
+        builder.Services.AddSingleton<GrpcAgentClient>();
+        builder.Services.AddHostedService<AgentWorker>();
+
+        var host = builder.Build();
+        host.Run();
+    }
+}
diff --git a/VoyagerAgent/VoyagerAgent.csproj b/VoyagerAgent/VoyagerAgent.csproj
new file mode 100644
index 0000000..c3e2a1d
--- /dev/null
+++ b/VoyagerAgent/VoyagerAgent.csproj
@@ -0,0 +1,30 @@
+
+
+ net10.0
+ enable
+ enable
+
+
+
+
+
+
+ all
+
+
+
+
+
+ all
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/VoyagerAgent/appsettings.json b/VoyagerAgent/appsettings.json
new file mode 100644
index 0000000..37df65f
--- /dev/null
+++ b/VoyagerAgent/appsettings.json
@@ -0,0 +1,13 @@
+{
+ "Agent": {
+ "AgentId": "agent-01",
+ "DisplayName": "Edge Worker 01",
+ "CentralGrpcAddress": "http://localhost:5001",
+ "SessionIds": [1],
+ "Capacity": 25,
+ "ClientCertificatePath": "",
+ "ClientCertificatePassword": "",
+ "InsecureSkipServerCertificateValidation": true,
+ "PollDelayMs": 1500
+ }
+}