using System.Collections.Concurrent;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using ScrapperAPI.Enums;
using ScrapperAPI.Interfaces;
using ScrapperAPI.Options;
using ScrapperAPI.Records;

namespace ScrapperAPI.Workers;

/// <summary>
/// Hosted background service that coordinates scrape sessions: accepts start/stop
/// requests, runs local worker loops that lease queue items, fetches pages via
/// <see cref="IScraperHttpClient"/>, persists the HTML, and publishes lifecycle and
/// progress events on the event bus.
/// </summary>
/// <remarks>
/// NOTE(review): all generic type arguments in this file had been stripped (the source
/// arrived with &lt;...&gt; removed). They were reconstructed from usage — the service
/// types resolved from the DI scope in particular should be confirmed against the
/// project's actual interfaces.
/// </remarks>
public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly ILogger _logger;
    private readonly IScraperHttpClient _scraperHttp;
    private readonly IScrapeEventBus _events;
    private readonly ScraperOptions _opts;
    private readonly WorkerOptions _workerOpts;

    // Start requests are funneled through an unbounded channel so that ExecuteAsync
    // is the single reader; any caller thread may write.
    private readonly Channel<int> _startRequests = Channel.CreateUnbounded<int>(
        new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

    // One Runner per session id, created lazily and kept for status queries after stop.
    private readonly ConcurrentDictionary<int, Runner> _runners = new();

    // FIX: removed unused ThreadLocal<Random> field — delays use Random.Shared.

    public ScrapeCoordinator(
        IServiceScopeFactory scopeFactory,
        IHttpClientFactory httpClientFactory,
        ILogger<ScrapeCoordinator> logger,
        IOptions<ScraperOptions> options,
        IOptions<WorkerOptions> workerOptions,
        IScraperHttpClient scraperHttp,
        IScrapeEventBus events)
    {
        _scopeFactory = scopeFactory;
        _httpClientFactory = httpClientFactory;
        _logger = logger;
        _opts = options.Value;
        _workerOpts = workerOptions.Value;
        _scraperHttp = scraperHttp;
        _events = events;
    }

    /// <summary>
    /// Requests that scraping start for <paramref name="sessionId"/>: clears any
    /// pending stop flag, publishes <see cref="ScrapeEventType.SessionStarted"/>,
    /// and enqueues the session for the coordinator loop to pick up.
    /// </summary>
    public async Task StartAsync(int sessionId, CancellationToken ct = default)
    {
        var runner = _runners.GetOrAdd(sessionId, id => new Runner(id));
        runner.RequestStart();

        await _events.PublishAsync(new ScrapeEvent(
            ScrapeEventType.SessionStarted,
            sessionId,
            DateTimeOffset.UtcNow
        ), ct);

        await _startRequests.Writer.WriteAsync(sessionId, ct);
    }

    /// <summary>
    /// Requests a graceful stop for <paramref name="sessionId"/>. Workers finish the
    /// item in flight and then exit; the stop-requested event is published best-effort
    /// (fire-and-forget, matching the original behavior).
    /// </summary>
    public Task StopAsync(int sessionId)
    {
        if (_runners.TryGetValue(sessionId, out var runner))
        {
            runner.RequestStop();
            _ = _events.PublishAsync(new ScrapeEvent(
                ScrapeEventType.SessionStopRequested,
                sessionId,
                DateTimeOffset.UtcNow
            ));
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Returns the in-memory runtime status for a session; a session that was never
    /// started yields an all-idle status rather than throwing.
    /// </summary>
    public ScrapeRuntimeStatus GetRuntimeStatus(int sessionId)
    {
        if (!_runners.TryGetValue(sessionId, out var r))
            return new(sessionId, false, false, null, null, null);

        return new(sessionId, r.IsRunning, r.StopRequested, r.CurrentQueueId, r.CurrentUrl, r.CurrentStartedAt);
    }

    /// <summary>Snapshots the status of every session whose loop is currently running.</summary>
    public IReadOnlyCollection<ScrapeRuntimeStatus> ListRunningSessions() =>
        _runners.Values
            .Where(r => r.IsRunning)
            .Select(r => new ScrapeRuntimeStatus(r.SessionId, r.IsRunning, r.StopRequested, r.CurrentQueueId, r.CurrentUrl, r.CurrentStartedAt))
            .ToList();

    /// <summary>
    /// Coordinator loop: drains session-start requests from the channel and launches
    /// one session loop per request. Exceptions inside the session loop are observed
    /// and logged there (see <see cref="RunSessionLoopAsync"/>).
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ScrapeCoordinator started.");

        while (!stoppingToken.IsCancellationRequested)
        {
            int sessionId;
            try
            {
                sessionId = await _startRequests.Reader.ReadAsync(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }

            var runner = _runners.GetOrAdd(sessionId, id => new Runner(id));
            runner.RequestStart();

            // Fire-and-forget: Runner.TryEnterLoop guarantees at most one loop per session.
            _ = RunSessionLoopAsync(runner, stoppingToken);
        }
    }

    /// <summary>
    /// Waits a random "polite" delay between requests, bounded by
    /// <see cref="ScraperOptions.DelayMinMs"/>/<see cref="ScraperOptions.DelayMaxMs"/>
    /// (clamped so 0 &lt;= min &lt;= max).
    /// </summary>
    private Task PoliteDelayAsync(CancellationToken ct)
    {
        var min = _opts.DelayMinMs;
        var max = _opts.DelayMaxMs;
        if (min < 0) min = 0;
        if (max < min) max = min;

        // Next's upper bound is exclusive, hence max + 1 to make the range inclusive.
        var delayMs = Random.Shared.Next(min, max + 1);
        return Task.Delay(delayMs, ct);
    }

    /// <summary>
    /// Per-session loop. Guarded by <see cref="Runner.TryEnterLoop"/> so duplicate start
    /// requests are no-ops while a loop is live. In PreferAgents mode, local workers only
    /// run while no remote agent has been seen recently.
    /// </summary>
    private async Task RunSessionLoopAsync(Runner runner, CancellationToken hostToken)
    {
        if (!runner.TryEnterLoop()) return;

        runner.MarkRunning(true);
        try
        {
            if (!_workerOpts.Local.Enabled) return;

            while (!hostToken.IsCancellationRequested)
            {
                if (_workerOpts.Mode == DistributedMode.PreferAgents)
                {
                    var noAgents = await NoAgentsRecentlySeenAsync(_workerOpts.Local.PreferAgentsGraceSeconds, hostToken);
                    if (!noAgents)
                    {
                        // An agent is active: back off and re-check instead of working locally.
                        await Task.Delay(TimeSpan.FromSeconds(2), hostToken);
                        continue;
                    }
                }

                // Graceful stop: do not pick up the next URL.
                if (runner.StopRequested) break;

                var concurrency = Math.Max(1, _workerOpts.Local.Concurrency);
                var leaseFor = TimeSpan.FromSeconds(Math.Max(5, _workerOpts.LeaseSeconds));

                var tasks = Enumerable.Range(0, concurrency)
                    .Select(i => RunLocalWorkerLoopAsync(runner,
                        workerId: $"local:{Environment.MachineName}:{i}",
                        leaseFor,
                        hostToken))
                    .ToArray();

                await Task.WhenAll(tasks);
                break; // no more work (or stop requested)
            }
        }
        catch (OperationCanceledException)
        {
            // Host shutdown — expected, nothing to log.
        }
        catch (Exception ex)
        {
            // FIX: this task is fire-and-forget; without this catch the exception
            // would go unobserved and the session would silently die.
            _logger.LogError(ex, "Session {SessionId} loop failed.", runner.SessionId);
        }
        finally
        {
            runner.MarkRunning(false);
            try
            {
                // FIX: publish with CancellationToken.None — using hostToken here meant
                // the SessionStopped event was thrown away exactly when the host shut down.
                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.SessionStopped,
                    runner.SessionId,
                    DateTimeOffset.UtcNow
                ), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to publish SessionStopped for session {SessionId}.", runner.SessionId);
            }
            runner.ExitLoop();
        }
    }

    /// <summary>
    /// One local worker: leases items from the queue until it is empty, a stop is
    /// requested, or the host shuts down. Each item is fetched, saved, and marked
    /// done/failed; a progress event is published after every attempt.
    /// </summary>
    private async Task RunLocalWorkerLoopAsync(Runner runner, string workerId, TimeSpan leaseFor, CancellationToken hostToken)
    {
        while (!hostToken.IsCancellationRequested && !runner.StopRequested)
        {
            // Fresh DI scope per item so scoped repositories (e.g. DbContext) stay short-lived.
            using var scope = _scopeFactory.CreateScope();
            // TODO(review): generic arguments reconstructed — confirm these service types.
            var queue = scope.ServiceProvider.GetRequiredService<IScrapeQueueService>();
            var content = scope.ServiceProvider.GetRequiredService<IScrapeContentService>();

            var item = await queue.TryDequeueAsync(runner.SessionId, workerId, leaseFor, hostToken);
            if (item is null) return; // queue drained for this worker

            runner.SetCurrent(item.Id, item.Url);

            await _events.PublishAsync(new ScrapeEvent(
                ScrapeEventType.ItemStarted,
                runner.SessionId,
                DateTimeOffset.UtcNow,
                QueueId: item.Id,
                Url: item.Url
            ), hostToken);

            try
            {
                var html = await _scraperHttp.GetStringWithRetryAsync(item.Url, hostToken);
                await content.SaveAsync(item.Id, html, hostToken);
                await queue.MarkDoneAsync(item.Id, workerId, hostToken);

                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.ItemSucceeded,
                    runner.SessionId,
                    DateTimeOffset.UtcNow,
                    QueueId: item.Id,
                    Url: item.Url
                ), hostToken);
            }
            catch (Exception ex)
            {
                // Error text is truncated so oversized stack traces don't blow the column.
                await queue.MarkFailedAsync(item.Id, workerId, Truncate(ex.ToString(), 8000), hostToken);

                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.ItemFailed,
                    runner.SessionId,
                    DateTimeOffset.UtcNow,
                    QueueId: item.Id,
                    Url: item.Url,
                    Error: ex.Message
                ), hostToken);
            }
            finally
            {
                // NOTE(review): if hostToken is already canceled these awaits will throw
                // out of the finally; that matches the original behavior and only occurs
                // during shutdown, when progress events are expendable.
                var counts = await queue.GetCountsAsync(runner.SessionId, hostToken);
                var percent = counts.Total == 0
                    ? 0
                    : (double)counts.Done * 100.0 / (double)counts.Total;

                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.Progress,
                    runner.SessionId,
                    DateTimeOffset.UtcNow,
                    Total: counts.Total,
                    Done: counts.Done,
                    Pending: counts.Pending,
                    Processing: counts.Processing,
                    Failed: counts.Failed,
                    Percent: percent
                ), hostToken);

                runner.ClearCurrent();

                if (!runner.StopRequested && !hostToken.IsCancellationRequested)
                    await PoliteDelayAsync(hostToken);
            }
        }
    }

    /// <summary>
    /// True when no remote agent has reported in within <paramref name="withinSeconds"/>
    /// (clamped to at least 1s). Any failure — e.g. the agents table not existing yet —
    /// deliberately defaults to "no agents" so local scraping can proceed.
    /// </summary>
    private async Task<bool> NoAgentsRecentlySeenAsync(int withinSeconds, CancellationToken ct)
    {
        try
        {
            using var scope = _scopeFactory.CreateScope();
            // TODO(review): generic argument reconstructed — confirm this service type.
            var agents = scope.ServiceProvider.GetRequiredService<IAgentRegistryService>();
            var active = await agents.CountActiveAsync(TimeSpan.FromSeconds(Math.Max(1, withinSeconds)), ct);
            return active == 0;
        }
        catch
        {
            // If agents table isn't configured yet, default to "no agents".
            return true;
        }
    }

    /// <summary>
    /// Direct HTTP fetch helper. NOTE(review): currently unreferenced — the worker loop
    /// fetches through <see cref="IScraperHttpClient"/> instead; kept as the non-retry
    /// fallback but a candidate for removal.
    /// </summary>
    private static async Task<string> FetchHtmlAsync(HttpClient http, string url, CancellationToken ct)
    {
        using var req = new HttpRequestMessage(HttpMethod.Get, url);
        req.Headers.UserAgent.ParseAdd("webscrapper/1.0");
        using var resp = await http.SendAsync(req, HttpCompletionOption.ResponseHeadersRead, ct);
        resp.EnsureSuccessStatusCode();
        return await resp.Content.ReadAsStringAsync(ct);
    }

    /// <summary>Returns <paramref name="s"/> cut to at most <paramref name="max"/> chars.</summary>
    private static string Truncate(string s, int max) =>
        s.Length <= max ? s : s[..max];

    /// <summary>
    /// Mutable per-session runtime state. Written by the session/worker loops and read
    /// by status queries on other threads; the cross-thread flags are volatile.
    /// </summary>
    private sealed class Runner
    {
        private int _loopEntered;
        // FIX: volatile — these flags are written by worker threads and read by
        // status queries / the loop guard without any other synchronization.
        private volatile bool _isRunning;
        private volatile bool _stopRequested;

        public int SessionId { get; }
        public bool IsRunning => _isRunning;
        public bool StopRequested => _stopRequested;
        public int? CurrentQueueId { get; private set; }
        public string? CurrentUrl { get; private set; }
        public DateTimeOffset? CurrentStartedAt { get; private set; }

        public Runner(int sessionId) => SessionId = sessionId;

        public void RequestStart() => _stopRequested = false;
        public void RequestStop() => _stopRequested = true;

        // Compare-exchange gate: only one session loop may be live at a time.
        public bool TryEnterLoop() => Interlocked.CompareExchange(ref _loopEntered, 1, 0) == 0;
        public void ExitLoop() => Interlocked.Exchange(ref _loopEntered, 0);

        public void MarkRunning(bool running) => _isRunning = running;

        public void SetCurrent(int queueId, string url)
        {
            CurrentQueueId = queueId;
            CurrentUrl = url;
            CurrentStartedAt = DateTimeOffset.UtcNow;
        }

        public void ClearCurrent()
        {
            CurrentQueueId = null;
            CurrentUrl = null;
            CurrentStartedAt = null;
        }
    }
}