// ScrapeCoordinator.cs — background coordinator for local scraping sessions.
using System.Collections.Concurrent;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using ScrapperAPI.Enums;
using ScrapperAPI.Interfaces;
using ScrapperAPI.Options;
using ScrapperAPI.Records;

namespace ScrapperAPI.Workers;

/// <summary>
/// Hosted service that coordinates local scraping sessions. Start requests are
/// queued on an in-memory channel; <see cref="ExecuteAsync"/> is the single
/// consumer that launches one session loop per request. Each session loop fans
/// out into N local worker loops that lease queue items, fetch them via
/// <see cref="IScraperHttpClient"/>, persist the HTML, and publish progress
/// events on <see cref="IScrapeEventBus"/>.
/// </summary>
public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator
{
    private readonly IServiceScopeFactory _scopeFactory;
    // NOTE(review): not used by the scraping path (fetching goes through
    // IScraperHttpClient); kept so the public constructor signature and DI
    // registration remain unchanged.
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly ILogger<ScrapeCoordinator> _logger;
    private readonly IScraperHttpClient _scraperHttp;
    private readonly IScrapeEventBus _events;
    private readonly ScraperOptions _opts;
    private readonly WorkerOptions _workerOpts;

    // Unbounded so StartAsync never blocks; ExecuteAsync is the only reader.
    private readonly Channel<int> _startRequests = Channel.CreateUnbounded<int>(
        new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

    // One Runner per session id, tracking run/stop state and the current item.
    private readonly ConcurrentDictionary<int, Runner> _runners = new();

    public ScrapeCoordinator(
        IServiceScopeFactory scopeFactory,
        IHttpClientFactory httpClientFactory,
        ILogger<ScrapeCoordinator> logger,
        IOptions<ScraperOptions> options,
        IOptions<WorkerOptions> workerOptions,
        IScraperHttpClient scraperHttp,
        IScrapeEventBus events)
    {
        _scopeFactory = scopeFactory;
        _httpClientFactory = httpClientFactory;
        _logger = logger;
        _opts = options.Value;
        _workerOpts = workerOptions.Value;
        _scraperHttp = scraperHttp;
        _events = events;
    }

    /// <summary>
    /// Requests that scraping for <paramref name="sessionId"/> starts (or
    /// restarts — any pending stop request is cleared), publishes
    /// <see cref="ScrapeEventType.SessionStarted"/>, and enqueues the session
    /// for the coordinator loop to pick up.
    /// </summary>
    public async Task StartAsync(int sessionId, CancellationToken ct = default)
    {
        var runner = _runners.GetOrAdd(sessionId, id => new Runner(id));
        runner.RequestStart();

        await _events.PublishAsync(new ScrapeEvent(
            ScrapeEventType.SessionStarted, sessionId, DateTimeOffset.UtcNow
        ), ct);

        await _startRequests.Writer.WriteAsync(sessionId, ct);
    }

    /// <summary>
    /// Requests a graceful stop: in-flight items finish, but no new item is
    /// leased. No-op if the session was never started.
    /// </summary>
    public async Task StopAsync(int sessionId)
    {
        if (!_runners.TryGetValue(sessionId, out var runner))
            return;

        runner.RequestStop();

        // BUGFIX: was fire-and-forget (`_ = PublishAsync(...)`), which silently
        // dropped publish failures; await so errors surface to the caller.
        await _events.PublishAsync(new ScrapeEvent(
            ScrapeEventType.SessionStopRequested, sessionId, DateTimeOffset.UtcNow
        ));
    }

    /// <summary>Returns a point-in-time snapshot of one session's runtime state.</summary>
    public ScrapeRuntimeStatus GetRuntimeStatus(int sessionId)
    {
        if (!_runners.TryGetValue(sessionId, out var r))
            return new(sessionId, false, false, null, null, null);

        return new(sessionId, r.IsRunning, r.StopRequested, r.CurrentQueueId, r.CurrentUrl, r.CurrentStartedAt);
    }

    /// <summary>Snapshots every session whose loop is currently running.</summary>
    public IReadOnlyCollection<ScrapeRuntimeStatus> ListRunningSessions()
        => _runners.Values
            .Where(r => r.IsRunning)
            .Select(r => new ScrapeRuntimeStatus(r.SessionId, r.IsRunning, r.StopRequested, r.CurrentQueueId, r.CurrentUrl, r.CurrentStartedAt))
            .ToList();

    /// <summary>
    /// Coordinator loop: drains start requests and launches one fire-and-forget
    /// session loop per request until the host shuts down.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ScrapeCoordinator started.");

        while (!stoppingToken.IsCancellationRequested)
        {
            int sessionId;
            try
            {
                sessionId = await _startRequests.Reader.ReadAsync(stoppingToken);
            }
            catch (OperationCanceledException) { break; }

            var runner = _runners.GetOrAdd(sessionId, id => new Runner(id));
            runner.RequestStart();

            // Fire-and-forget is safe here: RunSessionLoopAsync catches and logs
            // its own failures, so the task can never fault unobserved.
            _ = RunSessionLoopAsync(runner, stoppingToken);
        }
    }

    /// <summary>
    /// Waits a random "polite" interval between requests, bounded by
    /// ScraperOptions.DelayMinMs/DelayMaxMs (invalid ranges are clamped).
    /// </summary>
    private Task PoliteDelayAsync(CancellationToken ct)
    {
        var min = Math.Max(0, _opts.DelayMinMs);
        var max = Math.Max(min, _opts.DelayMaxMs);

        // Random.Next's upper bound is exclusive, so +1 makes `max` inclusive.
        // BUGFIX: guard the +1 against overflow when max == int.MaxValue.
        var delayMs = max == int.MaxValue
            ? Random.Shared.Next(min, max)
            : Random.Shared.Next(min, max + 1);

        return Task.Delay(delayMs, ct);
    }

    /// <summary>
    /// Session loop: honors the PreferAgents mode gate, then fans out into
    /// Local.Concurrency worker loops and waits for them to drain the queue
    /// (or for a stop request / host shutdown).
    /// </summary>
    private async Task RunSessionLoopAsync(Runner runner, CancellationToken hostToken)
    {
        // Re-entrancy guard: at most one loop per session at a time.
        if (!runner.TryEnterLoop())
            return;

        runner.MarkRunning(true);

        try
        {
            if (!_workerOpts.Local.Enabled)
                return;

            while (!hostToken.IsCancellationRequested)
            {
                // When PreferAgents: only run locally if no agent was recently seen.
                if (_workerOpts.Mode == DistributedMode.PreferAgents)
                {
                    var noAgents = await NoAgentsRecentlySeenAsync(_workerOpts.Local.PreferAgentsGraceSeconds, hostToken);
                    if (!noAgents)
                    {
                        await Task.Delay(TimeSpan.FromSeconds(2), hostToken);
                        continue;
                    }
                }

                // Graceful stop: do not lease the next URL.
                if (runner.StopRequested)
                    break;

                var concurrency = Math.Max(1, _workerOpts.Local.Concurrency);
                var leaseFor = TimeSpan.FromSeconds(Math.Max(5, _workerOpts.LeaseSeconds));

                var tasks = Enumerable.Range(0, concurrency)
                    .Select(i => RunLocalWorkerLoopAsync(runner, workerId: $"local:{Environment.MachineName}:{i}", leaseFor, hostToken))
                    .ToArray();

                await Task.WhenAll(tasks);
                break; // no more work (or stop requested)
            }
        }
        catch (OperationCanceledException) when (hostToken.IsCancellationRequested)
        {
            // Host shutdown — expected, nothing to log.
        }
        catch (Exception ex)
        {
            // BUGFIX: this task is fire-and-forget from ExecuteAsync; without
            // this catch any worker failure would go unobserved.
            _logger.LogError(ex, "Session {SessionId} loop faulted.", runner.SessionId);
        }
        finally
        {
            runner.MarkRunning(false);
            try
            {
                // Publish without hostToken so SessionStopped still goes out
                // during host shutdown (hostToken is often cancelled here).
                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.SessionStopped,
                    runner.SessionId,
                    DateTimeOffset.UtcNow
                ));
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to publish SessionStopped for session {SessionId}.", runner.SessionId);
            }
            finally
            {
                // BUGFIX: ExitLoop must always run. Previously an exception from
                // the publish above skipped it, leaving the re-entrancy flag set
                // forever so the session could never be started again.
                runner.ExitLoop();
            }
        }
    }

    /// <summary>
    /// One local worker: repeatedly leases an item, scrapes it, saves the HTML,
    /// and publishes Item*/Progress events. Returns when the queue is drained,
    /// a stop is requested, or the host shuts down.
    /// </summary>
    private async Task RunLocalWorkerLoopAsync(Runner runner, string workerId, TimeSpan leaseFor, CancellationToken hostToken)
    {
        while (!hostToken.IsCancellationRequested && !runner.StopRequested)
        {
            // Fresh scope per item so scoped repositories (e.g. DbContext-backed)
            // are short-lived.
            using var scope = _scopeFactory.CreateScope();
            var queue = scope.ServiceProvider.GetRequiredService<IQueueRepository>();
            var content = scope.ServiceProvider.GetRequiredService<IContentRepository>();

            var item = await queue.TryDequeueAsync(runner.SessionId, workerId, leaseFor, hostToken);
            if (item is null)
                return; // queue drained

            runner.SetCurrent(item.Id, item.Url);

            await _events.PublishAsync(new ScrapeEvent(
                ScrapeEventType.ItemStarted,
                runner.SessionId,
                DateTimeOffset.UtcNow,
                QueueId: item.Id,
                Url: item.Url
            ), hostToken);

            try
            {
                var html = await _scraperHttp.GetStringWithRetryAsync(item.Url, hostToken);

                await content.SaveAsync(item.Id, html, hostToken);
                await queue.MarkDoneAsync(item.Id, workerId, hostToken);

                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.ItemSucceeded,
                    runner.SessionId,
                    DateTimeOffset.UtcNow,
                    QueueId: item.Id,
                    Url: item.Url
                ), hostToken);
            }
            catch (OperationCanceledException) when (hostToken.IsCancellationRequested)
            {
                // BUGFIX: host shutdown mid-item is not an item failure; let the
                // lease expire so another worker can retry, instead of marking
                // the item failed with a cancellation stack trace.
                throw;
            }
            catch (Exception ex)
            {
                await queue.MarkFailedAsync(item.Id, workerId, Truncate(ex.ToString(), 8000), hostToken);

                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.ItemFailed,
                    runner.SessionId,
                    DateTimeOffset.UtcNow,
                    QueueId: item.Id,
                    Url: item.Url,
                    Error: ex.Message
                ), hostToken);
            }
            finally
            {
                // BUGFIX: clear the "current item" first so a failure in the
                // best-effort progress snapshot below can't leave stale status.
                runner.ClearCurrent();

                try
                {
                    var counts = await queue.GetCountsAsync(runner.SessionId, hostToken);
                    var percent = counts.Total == 0 ? 0 : (double)counts.Done * 100.0 / (double)counts.Total;

                    await _events.PublishAsync(new ScrapeEvent(
                        ScrapeEventType.Progress,
                        runner.SessionId,
                        DateTimeOffset.UtcNow,
                        Total: counts.Total,
                        Done: counts.Done,
                        Pending: counts.Pending,
                        Processing: counts.Processing,
                        Failed: counts.Failed,
                        Percent: percent
                    ), hostToken);
                }
                catch (Exception ex)
                {
                    // BUGFIX: previously an exception here (thrown from `finally`)
                    // masked the original item exception. Progress reporting is
                    // best-effort, so log and continue.
                    _logger.LogWarning(ex, "Progress snapshot failed for session {SessionId}.", runner.SessionId);
                }
            }

            // Polite delay between items (moved out of `finally` so it is not
            // executed while an exception is propagating).
            if (!runner.StopRequested && !hostToken.IsCancellationRequested)
                await PoliteDelayAsync(hostToken);
        }
    }

    /// <summary>
    /// True when no remote agent has checked in within <paramref name="withinSeconds"/>.
    /// Used by PreferAgents mode to decide whether local workers may run.
    /// </summary>
    private async Task<bool> NoAgentsRecentlySeenAsync(int withinSeconds, CancellationToken ct)
    {
        try
        {
            using var scope = _scopeFactory.CreateScope();
            var agents = scope.ServiceProvider.GetRequiredService<IAgentRepository>();
            var active = await agents.CountActiveAsync(TimeSpan.FromSeconds(Math.Max(1, withinSeconds)), ct);
            return active == 0;
        }
        catch (OperationCanceledException)
        {
            // BUGFIX: don't misreport shutdown as "no agents" — propagate it.
            throw;
        }
        catch
        {
            // If the agents table isn't configured yet, default to "no agents"
            // so local scraping still works.
            return true;
        }
    }

    /// <summary>Truncates <paramref name="s"/> to at most <paramref name="max"/> characters.</summary>
    private static string Truncate(string s, int max) => s.Length <= max ? s : s[..max];

    /// <summary>
    /// Mutable per-session runtime state. The flags are written by the session /
    /// worker loops and read by status endpoints on other threads, so the
    /// booleans are volatile-backed.
    /// </summary>
    private sealed class Runner
    {
        private int _loopEntered; // 1 while a session loop owns this runner
        private volatile bool _isRunning;
        private volatile bool _stopRequested;

        public int SessionId { get; }
        public bool IsRunning => _isRunning;
        public bool StopRequested => _stopRequested;

        // NOTE(review): the "current item" triple is written/read cross-thread
        // without a lock; a status snapshot may observe a partial update. That
        // is tolerable for display purposes only.
        public int? CurrentQueueId { get; private set; }
        public string? CurrentUrl { get; private set; }
        public DateTimeOffset? CurrentStartedAt { get; private set; }

        public Runner(int sessionId) => SessionId = sessionId;

        // Starting clears any pending stop so a session can be restarted.
        public void RequestStart() => _stopRequested = false;
        public void RequestStop() => _stopRequested = true;

        // Interlocked guard: only one session loop may run per session.
        public bool TryEnterLoop() => Interlocked.CompareExchange(ref _loopEntered, 1, 0) == 0;
        public void ExitLoop() => Interlocked.Exchange(ref _loopEntered, 0);
        public void MarkRunning(bool running) => _isRunning = running;

        public void SetCurrent(int queueId, string url)
        {
            CurrentQueueId = queueId;
            CurrentUrl = url;
            CurrentStartedAt = DateTimeOffset.UtcNow;
        }

        public void ClearCurrent()
        {
            CurrentQueueId = null;
            CurrentUrl = null;
            CurrentStartedAt = null;
        }
    }
}