using System.Collections.Concurrent;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using ScrapperAPI.Enums;
using ScrapperAPI.Interfaces;
using ScrapperAPI.Options;
using ScrapperAPI.Records;

namespace ScrapperAPI.Workers;

/// <summary>
/// Hosted worker that coordinates per-session scrape loops.
/// Start requests are funneled through an unbounded single-reader channel;
/// <see cref="ExecuteAsync"/> drains it and spawns one loop per session,
/// guarded so at most one loop runs per session at a time.
/// </summary>
public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator
{
    private readonly IServiceScopeFactory _scopeFactory;
    // Retained for DI-constructor compatibility; fetching goes through IScraperHttpClient.
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly ILogger<ScrapeCoordinator> _logger;
    private readonly IScraperHttpClient _scraperHttp;
    private readonly IScrapeEventBus _events;
    private readonly ScraperOptions _opts;

    // NOTE(review): generic type arguments were missing throughout the reviewed
    // source (likely stripped by a paste/extraction step). They are restored
    // here from usage; confirm they match the project's actual declarations.
    private readonly Channel<int> _startRequests = Channel.CreateUnbounded<int>(
        new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

    // Keyed by session id; runners are created lazily and never removed.
    private readonly ConcurrentDictionary<int, Runner> _runners = new();

    public ScrapeCoordinator(
        IServiceScopeFactory scopeFactory,
        IHttpClientFactory httpClientFactory,
        ILogger<ScrapeCoordinator> logger,
        IOptions<ScraperOptions> options,
        IScraperHttpClient scraperHttp,
        IScrapeEventBus events)
    {
        _scopeFactory = scopeFactory;
        _httpClientFactory = httpClientFactory;
        _logger = logger;
        _opts = options.Value;
        _scraperHttp = scraperHttp;
        _events = events;
    }

    /// <summary>
    /// Requests that <paramref name="sessionId"/> start (or resume) scraping:
    /// clears any pending stop flag, publishes <c>SessionStarted</c>, and
    /// enqueues the session for <see cref="ExecuteAsync"/> to pick up.
    /// </summary>
    public async Task StartAsync(int sessionId, CancellationToken ct = default)
    {
        var runner = _runners.GetOrAdd(sessionId, id => new Runner(id));
        runner.RequestStart();

        await _events.PublishAsync(new ScrapeEvent(
            ScrapeEventType.SessionStarted,
            sessionId,
            DateTimeOffset.UtcNow
        ), ct);

        await _startRequests.Writer.WriteAsync(sessionId, ct);
    }

    /// <summary>
    /// Requests a graceful stop: the loop finishes the current item and does
    /// not dequeue the next one. No-op if the session has no runner.
    /// </summary>
    public async Task StopAsync(int sessionId)
    {
        if (_runners.TryGetValue(sessionId, out var runner))
        {
            runner.RequestStop();

            // FIX: was fire-and-forget (`_ = PublishAsync(...)`), which silently
            // dropped publish failures; awaiting surfaces them to the caller.
            await _events.PublishAsync(new ScrapeEvent(
                ScrapeEventType.SessionStopRequested,
                sessionId,
                DateTimeOffset.UtcNow
            ));
        }
    }

    /// <summary>
    /// Returns an in-memory snapshot of the session's runtime state, or an
    /// all-idle status when no runner exists for the session.
    /// </summary>
    public ScrapeRuntimeStatus GetRuntimeStatus(int sessionId)
    {
        if (!_runners.TryGetValue(sessionId, out var r))
            return new(sessionId, false, false, null, null, null);

        return new(sessionId, r.IsRunning, r.StopRequested,
                   r.CurrentQueueId, r.CurrentUrl, r.CurrentStartedAt);
    }

    /// <summary>Snapshots the status of every session whose loop is currently running.</summary>
    public IReadOnlyCollection<ScrapeRuntimeStatus> ListRunningSessions() =>
        _runners.Values
            .Where(r => r.IsRunning)
            .Select(r => new ScrapeRuntimeStatus(
                r.SessionId, r.IsRunning, r.StopRequested,
                r.CurrentQueueId, r.CurrentUrl, r.CurrentStartedAt))
            .ToList();

    /// <summary>
    /// Background pump: reads start requests off the channel and spawns one
    /// session loop per request until the host shuts down.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ScrapeCoordinator started.");

        while (!stoppingToken.IsCancellationRequested)
        {
            int sessionId;
            try
            {
                sessionId = await _startRequests.Reader.ReadAsync(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break; // host shutdown
            }

            var runner = _runners.GetOrAdd(sessionId, id => new Runner(id));

            // FIX: the original called runner.RequestStart() again here, which
            // cleared a StopRequested flag set between StartAsync and this
            // dequeue (a start->stop race lost the stop). StartAsync already
            // set the flag; do not reset it.

            // FIX: observe exceptions from the detached loop instead of leaving
            // them as unobserved task exceptions.
            _ = RunSessionLoopSafeAsync(runner, stoppingToken);
        }
    }

    // Wraps RunSessionLoopAsync so a faulted loop is logged rather than lost.
    private async Task RunSessionLoopSafeAsync(Runner runner, CancellationToken hostToken)
    {
        try
        {
            await RunSessionLoopAsync(runner, hostToken);
        }
        catch (OperationCanceledException)
        {
            // Expected during host shutdown.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Scrape loop for session {SessionId} crashed.", runner.SessionId);
        }
    }

    /// <summary>
    /// Waits a random "polite" delay between requests, bounded by
    /// <see cref="ScraperOptions.DelayMinMs"/>/<see cref="ScraperOptions.DelayMaxMs"/>.
    /// Misconfigured bounds are clamped (min &gt;= 0, max &gt;= min).
    /// </summary>
    private Task PoliteDelayAsync(CancellationToken ct)
    {
        var min = Math.Max(0, _opts.DelayMinMs);
        var max = Math.Max(min, _opts.DelayMaxMs);
        var delayMs = Random.Shared.Next(min, max + 1); // upper bound inclusive
        return Task.Delay(delayMs, ct);
    }

    /// <summary>
    /// The per-session work loop: dequeues items, fetches and stores their HTML,
    /// marks success/failure, publishes progress, then waits a polite delay.
    /// Exits when the queue drains, a stop is requested, or the host shuts down.
    /// </summary>
    private async Task RunSessionLoopAsync(Runner runner, CancellationToken hostToken)
    {
        // Only the winner of the enter guard may run this session's loop.
        if (!runner.TryEnterLoop()) return;

        runner.MarkRunning(true);
        try
        {
            while (!hostToken.IsCancellationRequested)
            {
                // Graceful stop: finish the current item, do not take the next URL.
                if (runner.StopRequested) break;

                // Scoped repositories live inside this scope (per-item lifetime).
                using var scope = _scopeFactory.CreateScope();
                // NOTE(review): the service types below were missing from the
                // reviewed source; names inferred from usage — confirm against
                // the project's interfaces in ScrapperAPI.Interfaces.
                var queue = scope.ServiceProvider.GetRequiredService<IScrapeQueueRepository>();
                var content = scope.ServiceProvider.GetRequiredService<IScrapeContentRepository>();

                var item = await queue.TryDequeueAsync(runner.SessionId, hostToken);
                if (item is null) break; // queue drained

                runner.SetCurrent(item.Id, item.Url);
                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.ItemStarted,
                    runner.SessionId,
                    DateTimeOffset.UtcNow,
                    QueueId: item.Id,
                    Url: item.Url
                ), hostToken);

                try
                {
                    var html = await _scraperHttp.GetStringWithRetryAsync(item.Url, hostToken);
                    await content.SaveAsync(item.Id, html, hostToken);
                    await queue.MarkDoneAsync(item.Id, hostToken);

                    await _events.PublishAsync(new ScrapeEvent(
                        ScrapeEventType.ItemSucceeded,
                        runner.SessionId,
                        DateTimeOffset.UtcNow,
                        QueueId: item.Id,
                        Url: item.Url
                    ), hostToken);
                }
                catch (Exception ex)
                {
                    // NOTE(review): an OperationCanceledException from shutdown
                    // also lands here and marks the item failed — confirm that
                    // is the intended semantics for interrupted items.
                    await queue.MarkFailedAsync(item.Id, Truncate(ex.ToString(), 8000), hostToken);
                    await _events.PublishAsync(new ScrapeEvent(
                        ScrapeEventType.ItemFailed,
                        runner.SessionId,
                        DateTimeOffset.UtcNow,
                        QueueId: item.Id,
                        Url: item.Url,
                        Error: ex.Message
                    ), hostToken);
                }
                finally
                {
                    // Progress snapshot from the DB + percent complete.
                    var counts = await queue.GetCountsAsync(runner.SessionId, hostToken);
                    var percent = counts.Total == 0
                        ? 0
                        : (double)counts.Done * 100.0 / (double)counts.Total;

                    await _events.PublishAsync(new ScrapeEvent(
                        ScrapeEventType.Progress,
                        runner.SessionId,
                        DateTimeOffset.UtcNow,
                        Total: counts.Total,
                        Done: counts.Done,
                        Pending: counts.Pending,
                        Processing: counts.Processing,
                        Failed: counts.Failed,
                        Percent: percent
                    ), hostToken);

                    runner.ClearCurrent();

                    if (!runner.StopRequested && !hostToken.IsCancellationRequested)
                        await PoliteDelayAsync(hostToken);
                }
            }
        }
        finally
        {
            runner.MarkRunning(false);
            try
            {
                // FIX: publish with CancellationToken.None — the original used
                // hostToken, so on host shutdown this threw from inside the
                // finally (masking any pending exception) and the SessionStopped
                // event was never delivered.
                await _events.PublishAsync(new ScrapeEvent(
                    ScrapeEventType.SessionStopped,
                    runner.SessionId,
                    DateTimeOffset.UtcNow
                ), CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Failed to publish SessionStopped for session {SessionId}.",
                    runner.SessionId);
            }
            runner.ExitLoop();
        }
    }

    /// <summary>Truncates <paramref name="s"/> to at most <paramref name="max"/> characters.</summary>
    private static string Truncate(string s, int max) => s.Length <= max ? s : s[..max];

    /// <summary>
    /// Per-session runtime state. The boolean flags are written by API calls
    /// (StartAsync/StopAsync) and read by the loop on another thread, so they
    /// are backed by volatile fields; loop ownership uses an Interlocked guard.
    /// </summary>
    private sealed class Runner
    {
        private int _loopEntered;            // 0 = free, 1 = a loop owns this session
        private volatile bool _isRunning;     // FIX: was a plain auto-property read cross-thread
        private volatile bool _stopRequested; // FIX: ditto

        public int SessionId { get; }
        public bool IsRunning => _isRunning;
        public bool StopRequested => _stopRequested;
        public int? CurrentQueueId { get; private set; }
        public string? CurrentUrl { get; private set; }
        public DateTimeOffset? CurrentStartedAt { get; private set; }

        public Runner(int sessionId) => SessionId = sessionId;

        public void RequestStart() => _stopRequested = false;
        public void RequestStop() => _stopRequested = true;

        // Only the winner of the 0->1 exchange may run the session loop.
        public bool TryEnterLoop() => Interlocked.CompareExchange(ref _loopEntered, 1, 0) == 0;
        public void ExitLoop() => Interlocked.Exchange(ref _loopEntered, 0);

        public void MarkRunning(bool running) => _isRunning = running;

        public void SetCurrent(int queueId, string url)
        {
            CurrentQueueId = queueId;
            CurrentUrl = url;
            CurrentStartedAt = DateTimeOffset.UtcNow;
        }

        public void ClearCurrent()
        {
            CurrentQueueId = null;
            CurrentUrl = null;
            CurrentStartedAt = null;
        }
    }
}