commit e8f06e46f024da4fb6ca8e56456c8d2b55a64d1f Author: nicoeri Date: Wed Dec 24 16:43:24 2025 -0300 Implement initial version of web scraping API. diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..38bece4 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,25 @@ +**/.dockerignore +**/.env +**/.git +**/.gitignore +**/.project +**/.settings +**/.toolstarget +**/.vs +**/.vscode +**/.idea +**/*.*proj.user +**/*.dbmdl +**/*.jfm +**/azds.yaml +**/bin +**/charts +**/docker-compose* +**/Dockerfile* +**/node_modules +**/npm-debug.log +**/obj +**/secrets.dev.yaml +**/values.dev.yaml +LICENSE +README.md \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..add57be --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +bin/ +obj/ +/packages/ +riderModule.iml +/_ReSharper.Caches/ \ No newline at end of file diff --git a/ScrapperAPI/Bus/SignalRScrapeEventBus.cs b/ScrapperAPI/Bus/SignalRScrapeEventBus.cs new file mode 100644 index 0000000..e45528e --- /dev/null +++ b/ScrapperAPI/Bus/SignalRScrapeEventBus.cs @@ -0,0 +1,37 @@ +using Microsoft.AspNetCore.SignalR; +using ScrapperAPI.Enums; +using ScrapperAPI.Hub; +using ScrapperAPI.Interfaces; +using ScrapperAPI.Records; + +namespace ScrapperAPI.Bus; + +public sealed class SignalRScrapeEventBus : IScrapeEventBus +{ + private readonly IHubContext _hub; + + public SignalRScrapeEventBus(IHubContext hub) => _hub = hub; + + public Task PublishAsync(ScrapeEvent ev, CancellationToken ct = default) + { + var tasks = new List(2); + + // Detalhes só para a sessão + if (ev.Type is ScrapeEventType.ItemStarted or ScrapeEventType.ItemSucceeded or ScrapeEventType.ItemFailed) + { + tasks.Add(_hub.Clients.Group(ScrapeHub.GroupName(ev.SessionId)) + .SendAsync("scrapeEvent", ev, ct)); + return Task.WhenAll(tasks); + } + + // Overview recebe eventos de "estado/progresso" + tasks.Add(_hub.Clients.Group(ScrapeHub.OverviewGroup) + .SendAsync("scrapeEvent", ev, ct)); + + // E a própria sessão também recebe (pra tela da sessão atualizar sem depender do overview) + tasks.Add(_hub.Clients.Group(ScrapeHub.GroupName(ev.SessionId)) + .SendAsync("scrapeEvent", ev, ct)); + + return Task.WhenAll(tasks); + } +} \ No newline at end of file diff --git a/ScrapperAPI/Controllers/ContentController.cs b/ScrapperAPI/Controllers/ContentController.cs new file mode 100644 index 0000000..7e9bca6 --- /dev/null +++ b/ScrapperAPI/Controllers/ContentController.cs @@ -0,0 +1,66 @@ +using Microsoft.AspNetCore.Mvc; +using ScrapperAPI.Interfaces; +using ScrapperAPI.Utils; + +namespace ScrapperAPI.Controllers; + +[ApiController] +public sealed class ContentController : ControllerBase +{ + private readonly IContentRepository _content; + + public ContentController(IContentRepository content) + { + _content = content; + } + + // ✅ Retorna HTML DESCOMPRIMIDO + // GET /queue/{queueId}/content + [HttpGet("queue/{queueId:int}/content")] + public async Task GetDecompressedHtml(int queueId, CancellationToken ct) + { + var row = await _content.GetCompressedByQueueIdAsync(queueId, ct); + if (row is null || row.ContentBytes is null || row.ContentBytes.Length == 0) + return NotFound(new { message = "Content not found for this queueId." }); + + if (!string.Equals(row.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase)) + return StatusCode(415, new { message = $"Unsupported encoding: {row.ContentEncoding}" }); + + string html; + try + { + html = CompressionUtils.GzipDecompressUtf8(row.ContentBytes); + } + catch (Exception ex) + { + // Se o payload estiver corrompido/errado + return StatusCode(500, new { message = "Failed to decompress content.", error = ex.Message }); + } + + // Headers úteis pra debug + Response.Headers["X-Content-Id"] = row.Id.ToString(); + Response.Headers["X-Queue-Id"] = row.QueueId.ToString(); + Response.Headers["X-Content-Encoding"] = row.ContentEncoding; + if (row.OriginalLength is not null) Response.Headers["X-Original-Length"] = row.OriginalLength.Value.ToString(); + if (row.CompressedLength is not null) Response.Headers["X-Compressed-Length"] = row.CompressedLength.Value.ToString(); + + // Retorna como HTML (o browser / front consegue renderizar se quiser) + return Content(html, "text/html; charset=utf-8"); + } + + // (Opcional) debug: retorna descomprimido como texto + // GET /queue/{queueId}/content/raw + [HttpGet("queue/{queueId:int}/content/raw")] + public async Task GetDecompressedRaw(int queueId, CancellationToken ct) + { + var row = await _content.GetCompressedByQueueIdAsync(queueId, ct); + if (row is null || row.ContentBytes is null || row.ContentBytes.Length == 0) + return NotFound(new { message = "Content not found for this queueId." }); + + if (!string.Equals(row.ContentEncoding, "gzip", StringComparison.OrdinalIgnoreCase)) + return StatusCode(415, new { message = $"Unsupported encoding: {row.ContentEncoding}" }); + + var text = CompressionUtils.GzipDecompressUtf8(row.ContentBytes); + return Content(text, "text/plain; charset=utf-8"); + } +} \ No newline at end of file diff --git a/ScrapperAPI/Controllers/ScrapeController.cs b/ScrapperAPI/Controllers/ScrapeController.cs new file mode 100644 index 0000000..5004827 --- /dev/null +++ b/ScrapperAPI/Controllers/ScrapeController.cs @@ -0,0 +1,41 @@ +using Microsoft.AspNetCore.Mvc; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Controllers; + +[ApiController] +[Route("sessions/{sessionId:int}/scrap")] +public sealed class ScrapeController : ControllerBase +{ + private readonly IScrapeCoordinator _coord; + private readonly IQueueRepository _queue; + + public ScrapeController(IScrapeCoordinator coord, IQueueRepository queue) + { + _coord = coord; + _queue = queue; + } + + [HttpPost("start")] + public async Task Start(int sessionId, CancellationToken ct) + { + await _coord.StartAsync(sessionId, ct); + return Accepted(); + } + + [HttpPost("stop")] + public async Task Stop(int sessionId) + { + await _coord.StopAsync(sessionId); + return Accepted(); + } + + [HttpGet("status")] + public async Task Status(int sessionId, CancellationToken ct) + { + var runtime = _coord.GetRuntimeStatus(sessionId); + var counts = await _queue.GetCountsAsync(sessionId, ct); + + return Ok(new { runtime, counts }); + } +} diff --git a/ScrapperAPI/Controllers/ScrapeMonitoringController.cs b/ScrapperAPI/Controllers/ScrapeMonitoringController.cs new file mode 100644 index 0000000..1ea7a55 --- /dev/null +++ b/ScrapperAPI/Controllers/ScrapeMonitoringController.cs @@ -0,0 +1,54 @@ +using Microsoft.AspNetCore.Mvc; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Controllers; + +[ApiController] +[Route("scrap")] +public sealed class ScrapeMonitoringController : ControllerBase +{ + private readonly IScrapeCoordinator _coord; + private readonly IQueueRepository _queue; + + public ScrapeMonitoringController( + IScrapeCoordinator coord, + IQueueRepository queue) + { + _coord = coord; + _queue = queue; + } + + [HttpGet("running-sessions")] + public async Task ListRunningSessions(CancellationToken ct) + { + var running = _coord.ListRunningSessions(); + + // Opcional: enriquecer com progresso do banco + var result = new List(); + + foreach (var r in running) + { + var counts = await _queue.GetCountsAsync(r.SessionId, ct); + + result.Add(new + { + r.SessionId, + r.IsRunning, + r.StopRequested, + r.CurrentQueueId, + r.CurrentUrl, + r.CurrentStartedAt, + Progress = new + { + counts.Total, + counts.Pending, + counts.Processing, + counts.Done, + counts.Failed + } + }); + } + + return Ok(result); + } +} diff --git a/ScrapperAPI/Controllers/SessionsController.cs b/ScrapperAPI/Controllers/SessionsController.cs new file mode 100644 index 0000000..0ed2358 --- /dev/null +++ b/ScrapperAPI/Controllers/SessionsController.cs @@ -0,0 +1,83 @@ +using System.ComponentModel.DataAnnotations; +using Microsoft.AspNetCore.Mvc; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Controllers; + +public sealed record AddUrlRequest( + [Required, Url] string Url +); + +[ApiController] +[Route("sessions")] +public sealed class SessionsController : ControllerBase +{ + private readonly ISessionRepository _sessions; + private readonly IQueueRepository _queue; + private readonly IScrapeCoordinator _coord; + + public SessionsController( + ISessionRepository sessions, + IQueueRepository queue, + IScrapeCoordinator coord) + { + _sessions = sessions; + _queue = queue; + _coord = coord; + } + + // ✅ Adicionar URL + // POST /sessions/{sessionId}/queue + [HttpPost("/{sessionId:int}/queue")] + public async Task AddUrl(int sessionId, [FromBody] AddUrlRequest req, CancellationToken ct) + { + // (Opcional) valida se session existe + var session = await _sessions.FindByNameAsync( + name: (await _sessions.GetAllAsync(ct)).FirstOrDefault(s => s.Id == sessionId)?.Name ?? "", + ct: ct + ); + + // Melhor: crie um método GetByIdAsync no repo. Por enquanto: + if (session is null) + { + // Se você não quiser validar aqui, pode remover esse bloco. + // Eu recomendo validar. + } + + var id = await _queue.EnqueueAsync(sessionId, req.Url, ct); + return Created($"/sessions/{sessionId}/queue/{id}", new { id, sessionId, req.Url }); + } + + // ✅ Remover por ID (seguro) + // DELETE /sessions/{sessionId}/queue/{queueId} + [HttpDelete("/{sessionId:int}/queue/{queueId:int}")] + public async Task RemoveById(int sessionId, int queueId, CancellationToken ct) + { + var removed = await _queue.RemovePendingByIdAsync(sessionId, queueId, ct); + return removed ? NoContent() : NotFound(new { message = "Queue item not found (or not pending)." }); + } + + [HttpGet] + public async Task GetAllSessions(CancellationToken ct) + { + var allSessions = await _sessions.GetAllAsync(ct); + var result = new List(); + + foreach (var s in allSessions) + { + var counts = await _queue.GetCountsAsync(s.Id, ct); + var runtime = _coord.GetRuntimeStatus(s.Id); + + result.Add(new SessionOverviewDto( + SessionId: s.Id, + Name: s.Name, + IsRunning: runtime.IsRunning, + StopRequested: runtime.StopRequested, + Queue: counts + )); + } + + return Ok(result); + } +} \ No newline at end of file diff --git a/ScrapperAPI/Dockerfile b/ScrapperAPI/Dockerfile new file mode 100644 index 0000000..4b27fb5 --- /dev/null +++ b/ScrapperAPI/Dockerfile @@ -0,0 +1,23 @@ +FROM mcr.microsoft.com/dotnet/aspnet:10.0 AS base +USER $APP_UID +WORKDIR /app +EXPOSE 8080 +EXPOSE 8081 + +FROM mcr.microsoft.com/dotnet/sdk:10.0 AS build +ARG BUILD_CONFIGURATION=Release +WORKDIR /src +COPY ["ScrapperAPI/ScrapperAPI.csproj", "ScrapperAPI/"] +RUN dotnet restore "ScrapperAPI/ScrapperAPI.csproj" +COPY . . +WORKDIR "/src/ScrapperAPI" +RUN dotnet build "./ScrapperAPI.csproj" -c $BUILD_CONFIGURATION -o /app/build + +FROM build AS publish +ARG BUILD_CONFIGURATION=Release +RUN dotnet publish "./ScrapperAPI.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "ScrapperAPI.dll"] diff --git a/ScrapperAPI/Dtos/ContentRow.cs b/ScrapperAPI/Dtos/ContentRow.cs new file mode 100644 index 0000000..dd209fd --- /dev/null +++ b/ScrapperAPI/Dtos/ContentRow.cs @@ -0,0 +1,8 @@ +namespace ScrapperAPI.Dtos; + +public sealed record ContentRow( + int Id, + int QueueId, + string Content, + DateTime CreatedDate +); \ No newline at end of file diff --git a/ScrapperAPI/Dtos/QueueCounts.cs b/ScrapperAPI/Dtos/QueueCounts.cs new file mode 100644 index 0000000..222f78a --- /dev/null +++ b/ScrapperAPI/Dtos/QueueCounts.cs @@ -0,0 +1,9 @@ +namespace ScrapperAPI.Dtos; + +public sealed record QueueCounts( + long Total, + long Pending, + long Processing, + long Done, + long Failed +); \ No newline at end of file diff --git a/ScrapperAPI/Dtos/QueueItem.cs b/ScrapperAPI/Dtos/QueueItem.cs new file mode 100644 index 0000000..7425c34 --- /dev/null +++ b/ScrapperAPI/Dtos/QueueItem.cs @@ -0,0 +1,13 @@ +namespace ScrapperAPI.Dtos; + +public sealed record QueueItem( + int Id, + int SessionId, + string Url, + short Status, + DateTime CreatedDate, + DateTime? StartedDate, + DateTime? FinishedDate, + int Attempts, + string? LastError +); \ No newline at end of file diff --git a/ScrapperAPI/Dtos/SessionOverviewDto.cs b/ScrapperAPI/Dtos/SessionOverviewDto.cs new file mode 100644 index 0000000..68c7711 --- /dev/null +++ b/ScrapperAPI/Dtos/SessionOverviewDto.cs @@ -0,0 +1,9 @@ +namespace ScrapperAPI.Dtos; + +public sealed record SessionOverviewDto( + int SessionId, + string Name, + bool IsRunning, + bool StopRequested, + QueueCounts Queue +); \ No newline at end of file diff --git a/ScrapperAPI/Dtos/SessionRow.cs b/ScrapperAPI/Dtos/SessionRow.cs new file mode 100644 index 0000000..81657e8 --- /dev/null +++ b/ScrapperAPI/Dtos/SessionRow.cs @@ -0,0 +1,6 @@ +namespace ScrapperAPI.Dtos; + +public sealed record SessionRow( + int Id, + string Name +); \ No newline at end of file diff --git a/ScrapperAPI/Enums/ScrapeEventType.cs b/ScrapperAPI/Enums/ScrapeEventType.cs new file mode 100644 index 0000000..ee3bf95 --- /dev/null +++ b/ScrapperAPI/Enums/ScrapeEventType.cs @@ -0,0 +1,12 @@ +namespace ScrapperAPI.Enums; + +public enum ScrapeEventType +{ + SessionStarted, + SessionStopRequested, + SessionStopped, + ItemStarted, + ItemSucceeded, + ItemFailed, + Progress +} \ No newline at end of file diff --git a/ScrapperAPI/Factories/NpgsqlConnectionFactory.cs b/ScrapperAPI/Factories/NpgsqlConnectionFactory.cs new file mode 100644 index 0000000..79773b9 --- /dev/null +++ b/ScrapperAPI/Factories/NpgsqlConnectionFactory.cs @@ -0,0 +1,20 @@ +using System.Data; +using Npgsql; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Factories; + +public sealed class NpgsqlConnectionFactory : IDbConnectionFactory +{ + private readonly string _cs; + + public NpgsqlConnectionFactory(IConfiguration cfg) + => _cs = cfg.GetConnectionString("Default")!; + + public async Task CreateOpenConnectionAsync(CancellationToken ct) + { + var conn = new NpgsqlConnection(_cs); + await conn.OpenAsync(ct); + return conn; + } +} \ No newline at end of file diff --git a/ScrapperAPI/Hub/ScrapeHub.cs b/ScrapperAPI/Hub/ScrapeHub.cs new file mode 100644 index 0000000..a8bd398 --- /dev/null +++ b/ScrapperAPI/Hub/ScrapeHub.cs @@ -0,0 +1,19 @@ +namespace ScrapperAPI.Hub; + +public sealed class ScrapeHub : Microsoft.AspNetCore.SignalR.Hub +{ + public Task Subscribe(int sessionId) + => Groups.AddToGroupAsync(Context.ConnectionId, GroupName(sessionId)); + + public Task Unsubscribe(int sessionId) + => Groups.RemoveFromGroupAsync(Context.ConnectionId, GroupName(sessionId)); + + public Task SubscribeOverview() + => Groups.AddToGroupAsync(Context.ConnectionId, OverviewGroup); + + public Task UnsubscribeOverview() + => Groups.RemoveFromGroupAsync(Context.ConnectionId, OverviewGroup); + + public static string GroupName(int sessionId) => $"session:{sessionId}"; + public const string OverviewGroup = "overview"; +} \ No newline at end of file diff --git a/ScrapperAPI/Interfaces/IContentRepository.cs b/ScrapperAPI/Interfaces/IContentRepository.cs new file mode 100644 index 0000000..57ed479 --- /dev/null +++ b/ScrapperAPI/Interfaces/IContentRepository.cs @@ -0,0 +1,12 @@ +using ScrapperAPI.Dtos; +using ScrapperAPI.Records; + +namespace ScrapperAPI.Interfaces; + +public interface IContentRepository +{ + Task SaveAsync(int queueId, string content, CancellationToken ct); + Task GetByQueueIdAsync(int queueId, CancellationToken ct); + Task GetCompressedByQueueIdAsync(int queueId, CancellationToken ct); + +} \ No newline at end of file diff --git a/ScrapperAPI/Interfaces/IDbConnectionFactory.cs b/ScrapperAPI/Interfaces/IDbConnectionFactory.cs new file mode 100644 index 0000000..50b88cf --- /dev/null +++ b/ScrapperAPI/Interfaces/IDbConnectionFactory.cs @@ -0,0 +1,8 @@ +using System.Data; + +namespace ScrapperAPI.Interfaces; + +public interface IDbConnectionFactory +{ + Task CreateOpenConnectionAsync(CancellationToken ct); +} \ No newline at end of file diff --git a/ScrapperAPI/Interfaces/IDomainRateLimiter.cs b/ScrapperAPI/Interfaces/IDomainRateLimiter.cs new file mode 100644 index 0000000..82094c4 --- /dev/null +++ b/ScrapperAPI/Interfaces/IDomainRateLimiter.cs @@ -0,0 +1,6 @@ +namespace ScrapperAPI.Interfaces; + +public interface IDomainRateLimiter +{ + Task WaitAsync(string host, CancellationToken ct); +} \ No newline at end of file diff --git a/ScrapperAPI/Interfaces/IQueueRepository.cs b/ScrapperAPI/Interfaces/IQueueRepository.cs new file mode 100644 index 0000000..7adb0d9 --- /dev/null +++ b/ScrapperAPI/Interfaces/IQueueRepository.cs @@ -0,0 +1,24 @@ +using ScrapperAPI.Dtos; + +namespace ScrapperAPI.Interfaces; + +public interface IQueueRepository +{ + Task EnqueueAsync(int sessionId, string url, CancellationToken ct); + Task GetCountsAsync(int sessionId, CancellationToken ct); + + /// + /// Pega 1 item pendente e muda para Processing atomica/seguramente. + /// Retorna null se não houver itens pendentes. + /// + Task TryDequeueAsync(int sessionId, CancellationToken ct); + + Task MarkDoneAsync(int queueId, CancellationToken ct); + Task MarkFailedAsync(int queueId, string error, CancellationToken ct); + + // Opcional: resetar stuck processing (se quiser depois) + Task RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct); + + Task RemovePendingByIdAsync(int sessionId, int queueId, CancellationToken ct); + Task RemovePendingByUrlAsync(int sessionId, string url, CancellationToken ct); +} \ No newline at end of file diff --git a/ScrapperAPI/Interfaces/IScrapeCoordinator.cs b/ScrapperAPI/Interfaces/IScrapeCoordinator.cs new file mode 100644 index 0000000..9018680 --- /dev/null +++ b/ScrapperAPI/Interfaces/IScrapeCoordinator.cs @@ -0,0 +1,11 @@ +using ScrapperAPI.Records; + +namespace ScrapperAPI.Interfaces; + +public interface IScrapeCoordinator +{ + Task StartAsync(int sessionId, CancellationToken ct = default); + Task StopAsync(int sessionId); + ScrapeRuntimeStatus GetRuntimeStatus(int sessionId); + IReadOnlyCollection ListRunningSessions(); +} \ No newline at end of file diff --git a/ScrapperAPI/Interfaces/IScrapeEventBus.cs b/ScrapperAPI/Interfaces/IScrapeEventBus.cs new file mode 100644 index 0000000..5c300b8 --- /dev/null +++ b/ScrapperAPI/Interfaces/IScrapeEventBus.cs @@ -0,0 +1,8 @@ +using ScrapperAPI.Records; + +namespace ScrapperAPI.Interfaces; + +public interface IScrapeEventBus +{ + Task PublishAsync(ScrapeEvent ev, CancellationToken ct = default); +} \ No newline at end of file diff --git a/ScrapperAPI/Interfaces/IScraperHttpClient.cs b/ScrapperAPI/Interfaces/IScraperHttpClient.cs new file mode 100644 index 0000000..9ae4f1b --- /dev/null +++ b/ScrapperAPI/Interfaces/IScraperHttpClient.cs @@ -0,0 +1,6 @@ +namespace ScrapperAPI.Interfaces; + +public interface IScraperHttpClient +{ + Task GetStringWithRetryAsync(string url, CancellationToken ct); +} \ No newline at end of file diff --git a/ScrapperAPI/Interfaces/ISessionRepository.cs b/ScrapperAPI/Interfaces/ISessionRepository.cs new file mode 100644 index 0000000..c96f3d7 --- /dev/null +++ b/ScrapperAPI/Interfaces/ISessionRepository.cs @@ -0,0 +1,11 @@ +using ScrapperAPI.Dtos; + +namespace ScrapperAPI.Interfaces; + +public interface ISessionRepository +{ + Task CreateAsync(string name, CancellationToken ct); + Task FindByNameAsync(string name, CancellationToken ct); + Task GetIdByNameAsync(string name, CancellationToken ct); + Task> GetAllAsync(CancellationToken ct); +} \ No newline at end of file diff --git a/ScrapperAPI/Options/ScraperOptions.cs b/ScrapperAPI/Options/ScraperOptions.cs new file mode 100644 index 0000000..577815d --- /dev/null +++ b/ScrapperAPI/Options/ScraperOptions.cs @@ -0,0 +1,22 @@ +namespace ScrapperAPI.Options; + +public class ScraperOptions +{ + public int DelayMinMs { get; init; } = 100; + public int DelayMaxMs { get; init; } = 3000; + + public RateLimitOptions RateLimit { get; init; } = new(); + public RetryOptions Retry { get; init; } = new(); +} + +public sealed class RateLimitOptions +{ + public int PerDomainMinDelayMs { get; init; } = 500; +} + +public sealed class RetryOptions +{ + public int MaxAttempts { get; init; } = 5; + public int BaseDelayMs { get; init; } = 250; + public int MaxDelayMs { get; init; } = 8000; +} \ No newline at end of file diff --git a/ScrapperAPI/Program.cs b/ScrapperAPI/Program.cs new file mode 100644 index 0000000..e057e48 --- /dev/null +++ b/ScrapperAPI/Program.cs @@ -0,0 +1,63 @@ +using ScrapperAPI.Bus; +using ScrapperAPI.Factories; +using ScrapperAPI.Hub; +using ScrapperAPI.Interfaces; +using ScrapperAPI.Options; +using ScrapperAPI.Repositories; +using ScrapperAPI.Services; +using ScrapperAPI.Utils; +using ScrapperAPI.Workers; + +var builder = WebApplication.CreateBuilder(args); + +builder.Services.AddOpenApi(); +builder.Services.AddSignalR(); +builder.Services.AddControllers(); + +builder.Services.Configure(builder.Configuration.GetSection("Scraper")); + +builder.Services.AddSingleton(sp => +{ + var opts = sp.GetRequiredService>().Value; + return new DomainRateLimiter(opts.RateLimit.PerDomainMinDelayMs); +}); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); + +builder.Services.AddScoped(); +builder.Services.AddScoped(); +builder.Services.AddScoped(); + +builder.Services.AddHttpClient("scraper", c => c.Timeout = TimeSpan.FromSeconds(30)); + +builder.Services.AddSingleton(); +builder.Services.AddHostedService(sp => (ScrapeCoordinator)sp.GetRequiredService()); + +builder.Services.AddCors(options => +{ + options.AddPolicy("AllowReact", + policy => + { + policy.WithOrigins("http://localhost:3000") + .AllowAnyHeader() + .AllowAnyMethod() + .AllowCredentials(); + }); +}); + +var app = builder.Build(); + +app.UseCors("AllowReact"); + +if (app.Environment.IsDevelopment()) +{ + app.MapOpenApi(); +} + +app.MapControllers(); +app.MapHub("/ws/scrape"); + +// app.UseHttpsRedirection(); + +app.Run(); \ No newline at end of file diff --git a/ScrapperAPI/Properties/launchSettings.json b/ScrapperAPI/Properties/launchSettings.json new file mode 100644 index 0000000..fe42aa1 --- /dev/null +++ b/ScrapperAPI/Properties/launchSettings.json @@ -0,0 +1,24 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "http://localhost:5123", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "https://localhost:7285;http://localhost:5123", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "ConnectionStrings__Default": "Host=localhost;Port=5432;Database=webscrapper_dev;Username=postgres;Password=devpassword;" + } + } + } +} diff --git a/ScrapperAPI/Records/CompressedContent.cs b/ScrapperAPI/Records/CompressedContent.cs new file mode 100644 index 0000000..d0880ac --- /dev/null +++ b/ScrapperAPI/Records/CompressedContent.cs @@ -0,0 +1,11 @@ +namespace ScrapperAPI.Records; + +public sealed record CompressedContent( + int Id, + int QueueId, + string ContentEncoding, + byte[] ContentBytes, + int? OriginalLength, + int? CompressedLength, + DateTime CreatedDate +); \ No newline at end of file diff --git a/ScrapperAPI/Records/ScrapeEvent.cs b/ScrapperAPI/Records/ScrapeEvent.cs new file mode 100644 index 0000000..8d50544 --- /dev/null +++ b/ScrapperAPI/Records/ScrapeEvent.cs @@ -0,0 +1,19 @@ +using ScrapperAPI.Enums; + +namespace ScrapperAPI.Records; + +public sealed record ScrapeEvent( + ScrapeEventType Type, + int SessionId, + DateTimeOffset At, + int? QueueId = null, + string? Url = null, + int? StatusCode = null, + string? Error = null, + long? Total = null, + long? Done = null, + long? Pending = null, + long? Processing = null, + long? Failed = null, + double? Percent = null +); \ No newline at end of file diff --git a/ScrapperAPI/Records/ScrapeRuntimeStatus.cs b/ScrapperAPI/Records/ScrapeRuntimeStatus.cs new file mode 100644 index 0000000..904ae95 --- /dev/null +++ b/ScrapperAPI/Records/ScrapeRuntimeStatus.cs @@ -0,0 +1,10 @@ +namespace ScrapperAPI.Records; + +public sealed record ScrapeRuntimeStatus( + int SessionId, + bool IsRunning, + bool StopRequested, + int? CurrentQueueId, + string? CurrentUrl, + DateTimeOffset? CurrentStartedAt +); \ No newline at end of file diff --git a/ScrapperAPI/Repositories/ContentRepository.cs b/ScrapperAPI/Repositories/ContentRepository.cs new file mode 100644 index 0000000..44ea7e8 --- /dev/null +++ b/ScrapperAPI/Repositories/ContentRepository.cs @@ -0,0 +1,81 @@ +using System.IO.Compression; +using Dapper; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; +using ScrapperAPI.Records; +using ScrapperAPI.Utils; + +namespace ScrapperAPI.Repositories; + +public sealed class ContentRepository : IContentRepository +{ + private readonly IDbConnectionFactory _db; + + public ContentRepository(IDbConnectionFactory db) => _db = db; + + public async Task SaveAsync(int queueId, string content, CancellationToken ct) + { + var compressed = CompressionUtils.GzipCompressUtf8(content, CompressionLevel.Fastest); + + const string sql = """ + insert into content(queue_id, content_encoding, content_bytes, original_length, compressed_length) + values (@queueId, 'gzip', @bytes, @origLen, @compLen) + returning id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync(new CommandDefinition(sql, new + { + queueId, + bytes = compressed, + origLen = content.Length, // chars (ok) + compLen = compressed.Length // bytes + }, cancellationToken: ct)); + } + + public async Task GetByQueueIdAsync(int queueId, CancellationToken ct) + { + const string sql = """ + select id, queue_id as QueueId, content, created_date as CreatedDate + from content + where queue_id = @queueId + order by id desc + limit 1; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.QuerySingleOrDefaultAsync( + new CommandDefinition(sql, new { queueId }, cancellationToken: ct)); + } + + public async Task GetCompressedByQueueIdAsync( + int queueId, + CancellationToken ct + ) + { + const string sql = """ + select + id, + queue_id as QueueId, + content_encoding as ContentEncoding, + content_bytes as ContentBytes, + original_length as OriginalLength, + compressed_length as CompressedLength, + created_date as CreatedDate + from content + where queue_id = @queueId + order by id desc + limit 1; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + + return await conn.QuerySingleOrDefaultAsync( + new CommandDefinition( + sql, + new { queueId }, + cancellationToken: ct + ) + ); + } +} \ No newline at end of file diff --git a/ScrapperAPI/Repositories/QueueRepository.cs b/ScrapperAPI/Repositories/QueueRepository.cs new file mode 100644 index 0000000..707c332 --- /dev/null +++ b/ScrapperAPI/Repositories/QueueRepository.cs @@ -0,0 +1,158 @@ +using Dapper; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Repositories; + +public sealed class QueueRepository : IQueueRepository +{ + private readonly IDbConnectionFactory _db; + + public QueueRepository(IDbConnectionFactory db) => _db = db; + + public async Task EnqueueAsync(int sessionId, string url, CancellationToken ct) + { + const string sql = """ + insert into queue(session_id, url) + values (@sessionId, @url) + returning id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync( + new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct)); + } + + public async Task GetCountsAsync(int sessionId, CancellationToken ct) + { + const string sql = """ + select + count(*) as total, + count(*) filter (where status = 0) as pending, + count(*) filter (where status = 1) as processing, + count(*) filter (where status = 2) as done, + count(*) filter (where status = 3) as failed + from queue + where session_id = @sessionId; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.QuerySingleAsync( + new CommandDefinition(sql, new { sessionId }, cancellationToken: ct)); + } + + public async Task TryDequeueAsync(int sessionId, CancellationToken ct) + { + // Importante: 1 transação + SKIP LOCKED (permite multi-worker no futuro) + using var conn = await _db.CreateOpenConnectionAsync(ct); + using var tx = conn.BeginTransaction(); + + const string sql = """ + with next as ( + select id + from queue + where session_id = @sessionId + and status = 0 + order by id + for update skip locked + limit 1 + ) + update queue q + set status = 1, + started_date = now(), + attempts = attempts + 1 + from next + where q.id = next.id + returning + q.id as Id, + q.session_id as SessionId, + q.url as Url, + q.status as Status, + q.created_date as CreatedDate, + q.started_date as StartedDate, + q.finished_date as FinishedDate, + q.attempts as Attempts, + q.last_error as LastError; + """; + + var item = await conn.QuerySingleOrDefaultAsync( + new CommandDefinition(sql, new { sessionId }, transaction: tx, cancellationToken: ct)); + + tx.Commit(); + return item; + } + + public async Task MarkDoneAsync(int queueId, CancellationToken ct) + { + const string sql = """ + update queue + set status = 2, + finished_date = now(), + last_error = null + where id = @queueId; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId }, cancellationToken: ct)); + } + + public async Task MarkFailedAsync(int queueId, string error, CancellationToken ct) + { + const string sql = """ + update queue + set status = 3, + finished_date = now(), + last_error = @error + where id = @queueId; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, error }, cancellationToken: ct)); + } + + public async Task RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct) + { + // Ex.: worker morreu e deixou itens em processing pra sempre. + const string sql = """ + update queue + set status = 0, + started_date = null + where session_id = @sessionId + and status = 1 + and started_date < now() - (@olderThanSeconds * interval '1 second'); + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteAsync(new CommandDefinition(sql, + new { sessionId, olderThanSeconds = (int)olderThan.TotalSeconds }, + cancellationToken: ct)); + } + + public async Task RemovePendingByIdAsync(int sessionId, int queueId, CancellationToken ct) + { + const string sql = """ + delete from queue + where id = @queueId + and session_id = @sessionId + and status = 0; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, queueId }, cancellationToken: ct)); + return rows > 0; + } + + public async Task RemovePendingByUrlAsync(int sessionId, string url, CancellationToken ct) + { + const string sql = """ + delete from queue + where session_id = @sessionId + and url = @url + and status = 0; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct)); + } + +} \ No newline at end of file diff --git a/ScrapperAPI/Repositories/SessionRepository.cs b/ScrapperAPI/Repositories/SessionRepository.cs new file mode 100644 index 0000000..4553516 --- /dev/null +++ b/ScrapperAPI/Repositories/SessionRepository.cs @@ -0,0 +1,54 @@ +using Dapper; +using ScrapperAPI.Dtos; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Repositories; + +public sealed class SessionRepository : ISessionRepository +{ + private readonly IDbConnectionFactory _db; + + public SessionRepository(IDbConnectionFactory db) => _db = db; + + public async Task CreateAsync(string name, CancellationToken ct) + { + const string sql = """ + insert into session(name) values (@name) + returning id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.ExecuteScalarAsync(new CommandDefinition(sql, new { name }, cancellationToken: ct)); + } + + public async Task FindByNameAsync(string name, CancellationToken ct) + { + const string sql = """ + select id, name + from session + where name = @name + limit 1; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return await conn.QuerySingleOrDefaultAsync( + new CommandDefinition(sql, new { name }, cancellationToken: ct)); + } + + public async Task GetIdByNameAsync(string name, CancellationToken ct) + => (await FindByNameAsync(name, ct))?.Id; + + public async Task> GetAllAsync(CancellationToken ct) + { + const string sql = """ + select id, name + from session + order by id; + """; + + using var conn = await _db.CreateOpenConnectionAsync(ct); + return (await conn.QueryAsync( + new CommandDefinition(sql, cancellationToken: ct) + )).ToList(); + } +} \ No newline at end of file diff --git a/ScrapperAPI/ScrapperAPI.csproj b/ScrapperAPI/ScrapperAPI.csproj new file mode 100644 index 0000000..7ea7e5f --- /dev/null +++ b/ScrapperAPI/ScrapperAPI.csproj @@ -0,0 +1,23 @@ + + + + net10.0 + enable + enable + Linux + + + + + + + + + + + + .dockerignore + + + + diff --git a/ScrapperAPI/ScrapperAPI.http b/ScrapperAPI/ScrapperAPI.http new file mode 100644 index 0000000..a33c422 --- /dev/null +++ b/ScrapperAPI/ScrapperAPI.http @@ -0,0 +1,70 @@ +@baseUrl = http://localhost:5123 +@sessionId = 1 + +### +# ================================ +# START SCRAP FOR A SESSION +# ================================ +POST {{baseUrl}}/sessions/{{sessionId}}/scrap/start +Accept: application/json + +### + +# ================================ +# STOP SCRAP (GRACEFUL) +# Termina a URL atual e para +# ================================ +POST {{baseUrl}}/sessions/{{sessionId}}/scrap/stop +Accept: application/json + +### + +# ================================ +# GET STATUS FOR ONE SESSION +# Runtime + DB progress +# ================================ +GET {{baseUrl}}/sessions/{{sessionId}}/scrap/status +Accept: application/json + +### + +# ================================ +# LIST ALL RUNNING SESSIONS +# (runtime state) +# ================================ +GET {{baseUrl}}/scrap/running-sessions +Accept: application/json + +### +# ================================ +# LIST ALL SESSIONS +# ================================ +GET {{baseUrl}}/sessions +Accept: application/json + +### +# ================================ +# ADD URL TO SESSION +# ================================ +POST {{baseUrl}}/sessions/{{sessionId}}/queue +Content-Type: application/json +Accept: application/json + +{ + "url": "https://example.com/page-1" +} + +### +# ================================ +# REMOVE URL BY QUEUE ID (only if pending) +# ================================ +DELETE {{baseUrl}}/sessions/{{sessionId}}/queue/2 +Accept: application/json + +### +# HTML descomprimido +GET {{baseUrl}}/queue/22/content + +### +# Texto descomprimido (debug) +GET {{baseUrl}}/queue/22/content/raw \ No newline at end of file diff --git a/ScrapperAPI/Scripts/database.sql b/ScrapperAPI/Scripts/database.sql new file mode 100644 index 0000000..407fdb8 --- /dev/null +++ b/ScrapperAPI/Scripts/database.sql @@ -0,0 +1,40 @@ +create database webscrapper_dev; + +drop table content; +drop table queue; +drop table session; + +create table session( + id serial primary key, + name varchar(255) +); + +create table queue( + id serial primary key, + session_id int references session(id), + url varchar(255), + status smallint not null default 0, + started_date timestamp null, + finished_date timestamp null, + attempts int not null default 0, + last_error text null, + created_date timestamp default now() +); + +create index idx_queue_session_status on queue(session_id, status); + +create table content( + id serial primary key, + queue_id int references queue(id), + content text, + created_date timestamp default now() +); + +create unique index if not exists ux_queue_session_url + on queue(session_id, url); + +alter table content + add column content_encoding varchar(20) not null default 'gzip', + add column content_bytes bytea null, + add column original_length int null, + add column compressed_length int null; \ No newline at end of file diff --git a/ScrapperAPI/Services/ScraperHttpClient.cs b/ScrapperAPI/Services/ScraperHttpClient.cs new file mode 100644 index 0000000..b2a2566 --- /dev/null +++ b/ScrapperAPI/Services/ScraperHttpClient.cs @@ -0,0 +1,146 @@ +using System.Net; +using Microsoft.Extensions.Options; +using ScrapperAPI.Interfaces; +using ScrapperAPI.Options; + +namespace ScrapperAPI.Services; + +public sealed class ScraperHttpClient : IScraperHttpClient +{ + private readonly IHttpClientFactory _httpClientFactory; + private readonly IDomainRateLimiter _rateLimiter; + private readonly ILogger _logger; + private readonly ScraperOptions _opts; + + public ScraperHttpClient( + IHttpClientFactory httpClientFactory, + IDomainRateLimiter rateLimiter, + ILogger logger, + IOptions options) + { + _httpClientFactory = httpClientFactory; + _rateLimiter = rateLimiter; + _logger = logger; + _opts = options.Value; + } + + public async Task GetStringWithRetryAsync(string url, CancellationToken ct) + { + if (!Uri.TryCreate(url, UriKind.Absolute, out var uri)) + throw new ArgumentException("Invalid URL", nameof(url)); + + var host = uri.Host; + var http = _httpClientFactory.CreateClient("scraper"); + + var maxAttempts = Math.Max(1, _opts.Retry.MaxAttempts); + var baseDelay = Math.Max(0, _opts.Retry.BaseDelayMs); + var maxDelay = Math.Max(baseDelay, _opts.Retry.MaxDelayMs); + + Exception? lastEx = null; + + for (var attempt = 1; attempt <= maxAttempts; attempt++) + { + ct.ThrowIfCancellationRequested(); + + // Rate limit por host antes de iniciar a request + var before = DateTimeOffset.UtcNow; + await _rateLimiter.WaitAsync(host, ct); + var waitedMs = (int)(DateTimeOffset.UtcNow - before).TotalMilliseconds; + + if (waitedMs > 0) + _logger.LogDebug("RateLimit applied: waited {WaitedMs}ms for host {Host}", waitedMs, host); + + try + { + using var req = new HttpRequestMessage(HttpMethod.Get, uri); + req.Headers.UserAgent.ParseAdd("webscrapper/1.0"); + req.Headers.Accept.ParseAdd("text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"); + + using var resp = await http.SendAsync(req, HttpCompletionOption.ResponseHeadersRead, ct); + + if (IsTransientStatus(resp.StatusCode)) + { + lastEx = new HttpRequestException($"Transient status code {(int)resp.StatusCode} ({resp.StatusCode})"); + await LogAndDelayRetryAsync(url, host, attempt, maxAttempts, lastEx, baseDelay, maxDelay, ct, resp.StatusCode); + continue; + } + + resp.EnsureSuccessStatusCode(); + return await resp.Content.ReadAsStringAsync(ct); + } + catch (Exception ex) when (IsTransientException(ex, ct)) + { + lastEx = ex; + + if (attempt >= maxAttempts) + break; + + await LogAndDelayRetryAsync(url, host, attempt, maxAttempts, ex, baseDelay, maxDelay, ct, statusCode: null); + } + } + + throw lastEx ?? new Exception("Request failed"); + } + + private static bool IsTransientStatus(HttpStatusCode statusCode) + { + // Transientes típicos: + // 408 Request Timeout + // 429 Too Many Requests + // 5xx Server errors + var code = (int)statusCode; + return code == 408 || code == 429 || (code >= 500 && code <= 599); + } + + private static bool IsTransientException(Exception ex, CancellationToken ct) + { + // HttpRequestException (DNS, socket, etc.) + // TaskCanceledException pode ser timeout (mas se foi cancelamento do host, não retry) + if (ex is OperationCanceledException && ct.IsCancellationRequested) + return false; + + return ex is HttpRequestException + || ex is TaskCanceledException; // timeout de HttpClient costuma cair aqui + } + + private async Task LogAndDelayRetryAsync( + string url, + string host, + int attempt, + int maxAttempts, + Exception ex, + int baseDelayMs, + int maxDelayMs, + CancellationToken ct, + HttpStatusCode? statusCode) + { + var delayMs = ComputeBackoffWithJitterMs(attempt, baseDelayMs, maxDelayMs); + + if (statusCode is not null) + { + _logger.LogWarning( + ex, + "Retrying ({Attempt}/{MaxAttempts}) in {DelayMs}ms due to status {StatusCode} for host {Host}. Url={Url}", + attempt, maxAttempts, delayMs, (int)statusCode.Value, host, url); + } + else + { + _logger.LogWarning( + ex, + "Retrying ({Attempt}/{MaxAttempts}) in {DelayMs}ms due to transient error for host {Host}. Url={Url}", + attempt, maxAttempts, delayMs, host, url); + } + + await Task.Delay(delayMs, ct); + } + + private static int ComputeBackoffWithJitterMs(int attempt, int baseDelayMs, int maxDelayMs) + { + // Exponential backoff: base * 2^(attempt-1), com jitter [0..base) + // clamp em maxDelay + var exp = baseDelayMs * (1 << Math.Clamp(attempt - 1, 0, 30)); + var clamped = Math.Min(exp, maxDelayMs); + var jitter = Random.Shared.Next(0, Math.Max(1, baseDelayMs)); + return Math.Min(clamped + jitter, maxDelayMs); + } +} \ No newline at end of file diff --git a/ScrapperAPI/Utils/CompressionUtils.cs b/ScrapperAPI/Utils/CompressionUtils.cs new file mode 100644 index 0000000..2eb3af6 --- /dev/null +++ b/ScrapperAPI/Utils/CompressionUtils.cs @@ -0,0 +1,29 @@ +using System.IO.Compression; +using System.Text; + +namespace ScrapperAPI.Utils; + +public static class CompressionUtils +{ + public static byte[] GzipCompressUtf8(string text, CompressionLevel level = CompressionLevel.Fastest) + { + var inputBytes = Encoding.UTF8.GetBytes(text); + + using var output = new MemoryStream(); + using (var gzip = new GZipStream(output, level, leaveOpen: true)) + { + gzip.Write(inputBytes, 0, inputBytes.Length); + } + + return output.ToArray(); + } + + public static string GzipDecompressUtf8(byte[] gzBytes) + { + using var input = new MemoryStream(gzBytes); + using var gzip = new GZipStream(input, CompressionMode.Decompress); + using var reader = new StreamReader(gzip, Encoding.UTF8); + + return reader.ReadToEnd(); + } +} \ No newline at end of file diff --git a/ScrapperAPI/Utils/DomainRateLimiter.cs b/ScrapperAPI/Utils/DomainRateLimiter.cs new file mode 100644 index 0000000..1603b7d --- /dev/null +++ b/ScrapperAPI/Utils/DomainRateLimiter.cs @@ -0,0 +1,48 @@ +using System.Collections.Concurrent; +using ScrapperAPI.Interfaces; + +namespace ScrapperAPI.Utils; + +public sealed class DomainRateLimiter : IDomainRateLimiter +{ + private readonly ConcurrentDictionary _hosts = new(); + private readonly int _minDelayMs; + + public DomainRateLimiter(int minDelayMs) + { + _minDelayMs = Math.Max(0, minDelayMs); + } + + public async Task WaitAsync(string host, CancellationToken ct) + { + if (_minDelayMs == 0) return; + + var limiter = _hosts.GetOrAdd(host, _ => new HostLimiter()); + + await limiter.Gate.WaitAsync(ct); + try + { + var now = DateTimeOffset.UtcNow; + var next = limiter.NextAllowedUtc; + + if (next > now) + { + var delay = next - now; + await Task.Delay(delay, ct); + now = DateTimeOffset.UtcNow; + } + + limiter.NextAllowedUtc = now.AddMilliseconds(_minDelayMs); + } + finally + { + limiter.Gate.Release(); + } + } + + private sealed class HostLimiter + { + public SemaphoreSlim Gate { get; } = new(1, 1); + public DateTimeOffset NextAllowedUtc { get; set; } = DateTimeOffset.MinValue; + } +} \ No newline at end of file diff --git a/ScrapperAPI/Workers/ScrapeCoordinator.cs b/ScrapperAPI/Workers/ScrapeCoordinator.cs new file mode 100644 index 0000000..8cdc5f4 --- /dev/null +++ b/ScrapperAPI/Workers/ScrapeCoordinator.cs @@ -0,0 +1,261 @@ +using System.Collections.Concurrent; +using System.Threading.Channels; +using Microsoft.Extensions.Options; +using ScrapperAPI.Enums; +using ScrapperAPI.Interfaces; +using ScrapperAPI.Options; +using ScrapperAPI.Records; + +namespace ScrapperAPI.Workers; + +public sealed class ScrapeCoordinator : BackgroundService, IScrapeCoordinator +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly IHttpClientFactory _httpClientFactory; + private readonly ILogger _logger; + private readonly IScraperHttpClient _scraperHttp; + private readonly IScrapeEventBus _events; + private readonly ScraperOptions _opts; + + private readonly Channel _startRequests = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); + + private readonly ConcurrentDictionary _runners = new(); + + private static readonly ThreadLocal _rng = + new(() => new Random()); + + public ScrapeCoordinator( + IServiceScopeFactory scopeFactory, + IHttpClientFactory httpClientFactory, + ILogger logger, + IOptions options, + IScraperHttpClient scraperHttp, + IScrapeEventBus events) + { + _scopeFactory = scopeFactory; + _httpClientFactory = httpClientFactory; + _logger = logger; + _opts = options.Value; + _scraperHttp = scraperHttp; + _events = events; + } + + public async Task StartAsync(int sessionId, CancellationToken ct = default) + { + var runner = _runners.GetOrAdd(sessionId, id => new Runner(id)); + runner.RequestStart(); + + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.SessionStarted, sessionId, DateTimeOffset.UtcNow + ), ct); + + await _startRequests.Writer.WriteAsync(sessionId, ct); + } + + public Task StopAsync(int sessionId) + { + if (_runners.TryGetValue(sessionId, out var runner)) + { + runner.RequestStop(); + _ = _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.SessionStopRequested, sessionId, DateTimeOffset.UtcNow + )); + } + return Task.CompletedTask; + } + + public ScrapeRuntimeStatus GetRuntimeStatus(int sessionId) + { + if (!_runners.TryGetValue(sessionId, out var r)) + return new(sessionId, false, false, null, null, null); + + return new(sessionId, r.IsRunning, r.StopRequested, r.CurrentQueueId, r.CurrentUrl, r.CurrentStartedAt); + } + + public IReadOnlyCollection ListRunningSessions() + => _runners.Values + .Where(r => r.IsRunning) + .Select(r => new ScrapeRuntimeStatus(r.SessionId, r.IsRunning, r.StopRequested, r.CurrentQueueId, r.CurrentUrl, r.CurrentStartedAt)) + .ToList(); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("ScrapeCoordinator started."); + + while (!stoppingToken.IsCancellationRequested) + { + int sessionId; + try + { + sessionId = await _startRequests.Reader.ReadAsync(stoppingToken); + } + catch (OperationCanceledException) { break; } + + var runner = _runners.GetOrAdd(sessionId, id => new Runner(id)); + runner.RequestStart(); + + _ = RunSessionLoopAsync(runner, stoppingToken); + } + } + + private Task PoliteDelayAsync(CancellationToken ct) + { + var min = _opts.DelayMinMs; + var max = _opts.DelayMaxMs; + + if (min < 0) min = 0; + if (max < min) max = min; + + var delayMs = Random.Shared.Next(min, max + 1); + return Task.Delay(delayMs, ct); + } + + private async Task RunSessionLoopAsync(Runner runner, CancellationToken hostToken) + { + if (!runner.TryEnterLoop()) + return; + + runner.MarkRunning(true); + + try + { + var http = _httpClientFactory.CreateClient("scraper"); + + while (!hostToken.IsCancellationRequested) + { + // STOP GRACIOSO: não pega próxima URL + if (runner.StopRequested) + break; + + // cria scope (repos scoped vivem aqui dentro) + using var scope = _scopeFactory.CreateScope(); + var queue = scope.ServiceProvider.GetRequiredService(); + var content = scope.ServiceProvider.GetRequiredService(); + + var item = await queue.TryDequeueAsync(runner.SessionId, hostToken); + if (item is null) + break; + + runner.SetCurrent(item.Id, item.Url); + + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.ItemStarted, + runner.SessionId, + DateTimeOffset.UtcNow, + QueueId: item.Id, + Url: item.Url + ), hostToken); + + try + { + var html = await _scraperHttp.GetStringWithRetryAsync(item.Url, hostToken); + + await content.SaveAsync(item.Id, html, hostToken); + await queue.MarkDoneAsync(item.Id, hostToken); + + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.ItemSucceeded, + runner.SessionId, + DateTimeOffset.UtcNow, + QueueId: item.Id, + Url: item.Url + ), hostToken); + } + catch (Exception ex) + { + await queue.MarkFailedAsync(item.Id, Truncate(ex.ToString(), 8000), hostToken); + + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.ItemFailed, + runner.SessionId, + DateTimeOffset.UtcNow, + QueueId: item.Id, + Url: item.Url, + Error: ex.Message + ), hostToken); + } + finally + { + // progresso (snapshot do DB) + percent + var counts = await queue.GetCountsAsync(runner.SessionId, hostToken); + var percent = counts.Total == 0 ? 0 : (double)counts.Done * 100.0 / (double)counts.Total; + + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.Progress, + runner.SessionId, + DateTimeOffset.UtcNow, + Total: counts.Total, + Done: counts.Done, + Pending: counts.Pending, + Processing: counts.Processing, + Failed: counts.Failed, + Percent: percent + ), hostToken); + + runner.ClearCurrent(); + + if (!runner.StopRequested && !hostToken.IsCancellationRequested) + await PoliteDelayAsync(hostToken); + } + } + } + finally + { + runner.MarkRunning(false); + await _events.PublishAsync(new ScrapeEvent( + ScrapeEventType.SessionStopped, + runner.SessionId, + DateTimeOffset.UtcNow + ), hostToken); + runner.ExitLoop(); + } + } + + private static async Task FetchHtmlAsync(HttpClient http, string url, CancellationToken ct) + { + using var req = new HttpRequestMessage(HttpMethod.Get, url); + req.Headers.UserAgent.ParseAdd("webscrapper/1.0"); + using var resp = await http.SendAsync(req, HttpCompletionOption.ResponseHeadersRead, ct); + resp.EnsureSuccessStatusCode(); + return await resp.Content.ReadAsStringAsync(ct); + } + + private static string Truncate(string s, int max) => s.Length <= max ? s : s[..max]; + + private sealed class Runner + { + private int _loopEntered; + + public int SessionId { get; } + public bool IsRunning { get; private set; } + public bool StopRequested { get; private set; } + + public int? CurrentQueueId { get; private set; } + public string? CurrentUrl { get; private set; } + public DateTimeOffset? CurrentStartedAt { get; private set; } + + public Runner(int sessionId) => SessionId = sessionId; + + public void RequestStart() => StopRequested = false; + public void RequestStop() => StopRequested = true; + + public bool TryEnterLoop() => Interlocked.CompareExchange(ref _loopEntered, 1, 0) == 0; + public void ExitLoop() => Interlocked.Exchange(ref _loopEntered, 0); + public void MarkRunning(bool running) => IsRunning = running; + + public void SetCurrent(int queueId, string url) + { + CurrentQueueId = queueId; + CurrentUrl = url; + CurrentStartedAt = DateTimeOffset.UtcNow; + } + + public void ClearCurrent() + { + CurrentQueueId = null; + CurrentUrl = null; + CurrentStartedAt = null; + } + } +} diff --git a/ScrapperAPI/appsettings.Development.json b/ScrapperAPI/appsettings.Development.json new file mode 100644 index 0000000..0c208ae --- /dev/null +++ b/ScrapperAPI/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/ScrapperAPI/appsettings.json b/ScrapperAPI/appsettings.json new file mode 100644 index 0000000..51adbe8 --- /dev/null +++ b/ScrapperAPI/appsettings.json @@ -0,0 +1,24 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "ConnectionStrings": { + "Default": "Host=localhost;Port=5432;Database=webscrapper_dev;Username=postgres;Password=devpassword;" + }, + "Scraper": { + "DelayMinMs": 100, + "DelayMaxMs": 3000, + "RateLimit": { + "PerDomainMinDelayMs": 500 + }, + "Retry": { + "MaxAttempts": 5, + "BaseDelayMs": 250, + "MaxDelayMs": 8000 + } + }, + "AllowedHosts": "*" +} diff --git a/WebScrapperPro.sln b/WebScrapperPro.sln new file mode 100644 index 0000000..68a3f05 --- /dev/null +++ b/WebScrapperPro.sln @@ -0,0 +1,16 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScrapperAPI", "ScrapperAPI\ScrapperAPI.csproj", "{206F88EA-2109-4DC0-B1E1-45AA8D3D092F}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {206F88EA-2109-4DC0-B1E1-45AA8D3D092F}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal