1
0
voyager-api/VoyagerAgent/AgentWorker.cs

155 lines
5.6 KiB
C#

using System.IO.Compression;
using System.Text;
using Grpc.Core;
using Microsoft.Extensions.Options;
using ScrapperAPI.AgentGrpc;
namespace VoyagerAgent;
/// <summary>
/// Background worker that repeatedly leases work items over gRPC for each configured session,
/// downloads every leased URL over HTTP, and submits the gzip-compressed body (or a failure
/// report) back through the same gRPC client.
/// </summary>
public sealed class AgentWorker : BackgroundService
{
    private readonly ILogger<AgentWorker> _logger;
    private readonly AgentClientOptions _opts;
    private readonly GrpcAgentClient _grpc;

    /// <summary>Creates the worker from its logger, bound agent options, and gRPC client factory.</summary>
    /// <param name="logger">Logger used for all diagnostics emitted by the worker.</param>
    /// <param name="opts">Agent options; the snapshot in <c>opts.Value</c> is captured once here.</param>
    /// <param name="grpc">Factory used to create the gRPC service client.</param>
    public AgentWorker(ILogger<AgentWorker> logger, IOptions<AgentClientOptions> opts, GrpcAgentClient grpc)
    {
        _logger = logger;
        _opts = opts.Value;
        _grpc = grpc;
    }

    /// <summary>
    /// Main loop: registers the agent, then until <paramref name="stoppingToken"/> is cancelled
    /// leases work per session, scrapes each item, reports the outcome, sends a heartbeat, and
    /// sleeps for the configured poll delay whenever no session returned any work.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    /// <returns>A task that completes when the worker has stopped.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (_opts.SessionIds.Length == 0)
        {
            _logger.LogWarning("No Agent:SessionIds configured. Agent will idle.");
        }

        var client = _grpc.CreateClient();
        await TryRegisterAsync(client, stoppingToken);

        // One HttpClient for the lifetime of the worker; it is reused for every scrape.
        using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(30) };
        http.DefaultRequestHeaders.UserAgent.ParseAdd("voyager-agent/1.0");

        while (!stoppingToken.IsCancellationRequested)
        {
            var didWork = false;

            foreach (var sessionId in _opts.SessionIds)
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }

                try
                {
                    var lease = await client.LeaseWorkAsync(new LeaseWorkRequest
                    {
                        AgentId = _opts.AgentId,
                        SessionId = sessionId,
                        Capacity = _opts.Capacity
                    }, cancellationToken: stoppingToken);

                    if (lease.Items.Count == 0)
                    {
                        continue;
                    }

                    didWork = true;

                    foreach (var item in lease.Items)
                    {
                        if (stoppingToken.IsCancellationRequested)
                        {
                            break;
                        }

                        try
                        {
                            var html = await http.GetStringAsync(item.Url, stoppingToken);
                            var compressed = GzipCompressUtf8(html, CompressionLevel.Fastest, out var origLen);

                            var submit = await client.SubmitResultAsync(new SubmitResultRequest
                            {
                                QueueId = item.QueueId,
                                AgentId = _opts.AgentId,
                                Success = true,
                                ContentEncoding = "gzip",
                                ContentBytes = Google.Protobuf.ByteString.CopyFrom(compressed),
                                OriginalLength = origLen,
                                CompressedLength = compressed.Length
                            }, cancellationToken: stoppingToken);

                            if (!submit.Ok)
                            {
                                _logger.LogWarning("SubmitResult not ok for queue {QueueId}: {Message}", item.QueueId, submit.Message);
                            }
                        }
                        // The filter keeps shutdown out of this handler: a cancellation caused by
                        // stoppingToken is not a scrape failure and must not be reported as one.
                        // HTTP timeouts (which also surface as a cancellation exception) still land
                        // here because stoppingToken is not signalled in that case.
                        catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
                        {
                            _logger.LogError(ex, "Scrape failed for {Url}", item.Url);

                            try
                            {
                                await client.SubmitResultAsync(new SubmitResultRequest
                                {
                                    QueueId = item.QueueId,
                                    AgentId = _opts.AgentId,
                                    Success = false,
                                    Error = ex.Message
                                }, cancellationToken: stoppingToken);
                            }
                            catch (Exception inner) when (!stoppingToken.IsCancellationRequested)
                            {
                                _logger.LogError(inner, "Failed to submit failure status for queue {QueueId}", item.QueueId);
                            }
                        }
                    }
                }
                catch (Exception) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown in progress: whatever was thrown (OperationCanceledException, or an
                    // RpcException raised for the cancelled call) is expected noise, so leave the
                    // session loop quietly instead of logging it as an error.
                    break;
                }
                catch (RpcException rpc)
                {
                    _logger.LogWarning("gRPC error: {Status} {Detail}", rpc.StatusCode, rpc.Status.Detail);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Unhandled error while leasing work.");
                }
            }

            if (stoppingToken.IsCancellationRequested)
            {
                // Skip the heartbeat and the poll delay; both would only fail on the cancelled token.
                break;
            }

            // Heartbeat is best-effort: a failure must never take the worker down, but it is
            // logged at Debug level so connectivity problems remain diagnosable.
            try
            {
                await client.HeartbeatAsync(new HeartbeatRequest { AgentId = _opts.AgentId }, cancellationToken: stoppingToken);
            }
            catch (Exception ex)
            {
                if (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogDebug(ex, "Heartbeat failed; it will be sent again on the next loop iteration.");
                }
            }

            if (!didWork)
            {
                // Nothing was leased from any session: back off before polling again.
                await Task.Delay(_opts.PollDelayMs, stoppingToken);
            }
        }
    }

    /// <summary>
    /// Registers this agent with the gRPC service. An <see cref="RpcException"/> is logged as a
    /// warning and swallowed so the worker still starts; any other exception type propagates.
    /// </summary>
    /// <param name="client">gRPC client used to send the registration request.</param>
    /// <param name="ct">Token used to cancel the registration call.</param>
    private async Task TryRegisterAsync(AgentService.AgentServiceClient client, CancellationToken ct)
    {
        try
        {
            await client.RegisterAgentAsync(new RegisterAgentRequest
            {
                AgentId = _opts.AgentId,
                DisplayName = _opts.DisplayName ?? string.Empty
            }, cancellationToken: ct);

            _logger.LogInformation("Agent registered as {AgentId}", _opts.AgentId);
        }
        catch (RpcException rpc)
        {
            _logger.LogWarning("RegisterAgent failed: {Status} {Detail}", rpc.StatusCode, rpc.Status.Detail);
        }
    }

    /// <summary>Encodes <paramref name="content"/> as UTF-8 and gzip-compresses the resulting bytes.</summary>
    /// <param name="content">Text to compress.</param>
    /// <param name="level">Compression level passed to <see cref="GZipStream"/>.</param>
    /// <param name="originalLength">Receives the UTF-8 byte count before compression.</param>
    /// <returns>The gzip-compressed bytes.</returns>
    private static byte[] GzipCompressUtf8(string content, CompressionLevel level, out int originalLength)
    {
        var bytes = Encoding.UTF8.GetBytes(content);
        originalLength = bytes.Length;

        using var ms = new MemoryStream();

        // The GZipStream must be disposed before reading the buffer so the gzip trailer is
        // flushed; leaveOpen keeps the MemoryStream usable afterwards.
        using (var gzip = new GZipStream(ms, level, leaveOpen: true))
        {
            gzip.Write(bytes, 0, bytes.Length);
        }

        return ms.ToArray();
    }
}