using Dapper;
using ScrapperAPI.Dtos;
using ScrapperAPI.Interfaces;

namespace ScrapperAPI.Repositories;

/// <summary>
/// Dapper-based access to the <c>queue</c> table: enqueueing URLs, claiming the next
/// pending item, recording outcomes and removing pending entries.
/// </summary>
/// <remarks>
/// Status codes used by the SQL in this class (see the aliases in
/// <see cref="GetCountsAsync"/>): 0 = pending, 1 = processing, 2 = done, 3 = failed.
/// Every method opens its own connection through the injected factory and disposes it
/// before returning.
/// NOTE(review): result type arguments (on the returned tasks and on the Dapper calls)
/// are reproduced exactly as they appear in the reviewed text; confirm them against
/// <see cref="IQueueRepository"/>.
/// </remarks>
public sealed class QueueRepository : IQueueRepository
{
    private readonly IDbConnectionFactory _db;

    /// <summary>Creates the repository on top of the given connection factory.</summary>
    /// <param name="db">Factory used to open one connection per operation.</param>
    public QueueRepository(IDbConnectionFactory db) => _db = db;

    /// <summary>Inserts a new queue row for the given session and URL.</summary>
    /// <param name="sessionId">Session the row belongs to.</param>
    /// <param name="url">URL stored in the row.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <returns>The scalar produced by the statement's <c>returning id</c> clause.</returns>
    public async Task EnqueueAsync(int sessionId, string url, CancellationToken ct)
    {
        const string sql = """
            insert into queue(session_id, url)
            values (@sessionId, @url)
            returning id;
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        return await conn.ExecuteScalarAsync(
            new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct));
    }

    /// <summary>
    /// Counts the session's queue rows in total and per status
    /// (pending, processing, done, failed).
    /// </summary>
    /// <param name="sessionId">Session whose rows are counted.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <returns>
    /// The single result row with columns <c>total</c>, <c>pending</c>,
    /// <c>processing</c>, <c>done</c> and <c>failed</c>.
    /// </returns>
    public async Task GetCountsAsync(int sessionId, CancellationToken ct)
    {
        const string sql = """
            select
                count(*) as total,
                count(*) filter (where status = 0) as pending,
                count(*) filter (where status = 1) as processing,
                count(*) filter (where status = 2) as done,
                count(*) filter (where status = 3) as failed
            from queue
            where session_id = @sessionId;
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        return await conn.QuerySingleAsync(
            new CommandDefinition(sql, new { sessionId }, cancellationToken: ct));
    }

    /// <summary>
    /// Claims the pending row with the lowest id for the session: sets it to processing,
    /// stamps <c>started_date</c> and increments <c>attempts</c>.
    /// </summary>
    /// <param name="sessionId">Session to take an item from.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <returns>
    /// The updated row, or the default value when the statement returns no row
    /// (nothing pending, or every candidate row was skipped because it is locked).
    /// </returns>
    public async Task TryDequeueAsync(int sessionId, CancellationToken ct)
    {
        // Important: a single transaction plus SKIP LOCKED, so that several workers
        // can dequeue concurrently in the future.
        using var conn = await _db.CreateOpenConnectionAsync(ct);
        using var tx = conn.BeginTransaction();

        const string sql = """
            with next as (
                select id
                from queue
                where session_id = @sessionId and status = 0
                order by id
                for update skip locked
                limit 1
            )
            update queue q
            set status = 1,
                started_date = now(),
                attempts = attempts + 1
            from next
            where q.id = next.id
            returning
                q.id as Id,
                q.session_id as SessionId,
                q.url as Url,
                q.status as Status,
                q.created_date as CreatedDate,
                q.started_date as StartedDate,
                q.finished_date as FinishedDate,
                q.attempts as Attempts,
                q.last_error as LastError;
            """;

        var item = await conn.QuerySingleOrDefaultAsync(
            new CommandDefinition(sql, new { sessionId }, transaction: tx, cancellationToken: ct));

        // If the query throws, the transaction is disposed without being committed.
        tx.Commit();
        return item;
    }

    /// <summary>
    /// Marks a queue row as done: sets status 2, stamps <c>finished_date</c> and clears
    /// <c>last_error</c>.
    /// </summary>
    /// <param name="queueId">Id of the queue row.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    public async Task MarkDoneAsync(int queueId, CancellationToken ct)
    {
        const string sql = """
            update queue
            set status = 2,
                finished_date = now(),
                last_error = null
            where id = @queueId;
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId }, cancellationToken: ct));
    }

    /// <summary>
    /// Marks a queue row as failed: sets status 3, stamps <c>finished_date</c> and stores
    /// the error text in <c>last_error</c>.
    /// </summary>
    /// <param name="queueId">Id of the queue row.</param>
    /// <param name="error">Error text written to <c>last_error</c>.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    public async Task MarkFailedAsync(int queueId, string error, CancellationToken ct)
    {
        const string sql = """
            update queue
            set status = 3,
                finished_date = now(),
                last_error = @error
            where id = @queueId;
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, error }, cancellationToken: ct));
    }

    /// <summary>
    /// Puts the session's rows that have been in processing for longer than
    /// <paramref name="olderThan"/> back to pending and clears their <c>started_date</c>.
    /// The <c>attempts</c> column is left untouched.
    /// </summary>
    /// <param name="sessionId">Session whose rows are examined.</param>
    /// <param name="olderThan">
    /// Minimum time a row must have spent in processing, measured from its
    /// <c>started_date</c> against the database's <c>now()</c>. Must not be negative.
    /// </param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <returns>The number of rows affected, as reported by the update.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="olderThan"/> is negative.
    /// </exception>
    public async Task RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct)
    {
        // A negative threshold moves the cutoff into the future, which would requeue
        // rows that a worker has only just started.
        if (olderThan < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(
                nameof(olderThan), olderThan, "The age threshold must not be negative.");
        }

        // Example: a worker died and left items in processing forever.
        const string sql = """
            update queue
            set status = 0,
                started_date = null
            where session_id = @sessionId
              and status = 1
              and started_date < now() - (@olderThanSeconds * interval '1 second');
            """;

        // Send the seconds as a double: an (int) cast drops the fractional part and
        // gives an unspecified value once the span exceeds int.MaxValue seconds.
        var olderThanSeconds = olderThan.TotalSeconds;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        return await conn.ExecuteAsync(
            new CommandDefinition(sql, new { sessionId, olderThanSeconds }, cancellationToken: ct));
    }

    /// <summary>Deletes one row by id, only if it belongs to the session and is still pending.</summary>
    /// <param name="sessionId">Session the row must belong to.</param>
    /// <param name="queueId">Id of the queue row.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <returns><c>true</c> when at least one row was deleted; otherwise <c>false</c>.</returns>
    public async Task RemovePendingByIdAsync(int sessionId, int queueId, CancellationToken ct)
    {
        const string sql = """
            delete from queue
            where id = @queueId
              and session_id = @sessionId
              and status = 0;
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        var rows = await conn.ExecuteAsync(
            new CommandDefinition(sql, new { sessionId, queueId }, cancellationToken: ct));
        return rows > 0;
    }

    /// <summary>Deletes every pending row of the session whose URL equals <paramref name="url"/>.</summary>
    /// <param name="sessionId">Session whose rows are examined.</param>
    /// <param name="url">URL to match exactly.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <returns>The number of rows affected, as reported by the delete.</returns>
    public async Task RemovePendingByUrlAsync(int sessionId, string url, CancellationToken ct)
    {
        const string sql = """
            delete from queue
            where session_id = @sessionId
              and url = @url
              and status = 0;
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        return await conn.ExecuteAsync(
            new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct));
    }
}