using Dapper; using ScrapperAPI.Dtos; using ScrapperAPI.Interfaces; namespace ScrapperAPI.Repositories; public sealed class QueueRepository : IQueueRepository { private readonly IDbConnectionFactory _db; public QueueRepository(IDbConnectionFactory db) => _db = db; public async Task EnqueueAsync(int sessionId, string url, CancellationToken ct) { const string sql = """ insert into queue(session_id, url) values (@sessionId, @url) returning id; """; using var conn = await _db.CreateOpenConnectionAsync(ct); return await conn.ExecuteScalarAsync( new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct)); } public async Task GetCountsAsync(int sessionId, CancellationToken ct) { const string sql = """ select count(*) as total, count(*) filter (where status = 0) as pending, count(*) filter (where status = 1) as processing, count(*) filter (where status = 2) as done, count(*) filter (where status = 3) as failed from queue where session_id = @sessionId; """; using var conn = await _db.CreateOpenConnectionAsync(ct); return await conn.QuerySingleAsync( new CommandDefinition(sql, new { sessionId }, cancellationToken: ct)); } public async Task TryDequeueAsync(int sessionId, string workerId, TimeSpan leaseFor, CancellationToken ct) { var batch = await LeaseBatchAsync(sessionId, workerId, take: 1, leaseFor, ct); return batch.FirstOrDefault(); } public async Task> LeaseBatchAsync(int sessionId, string workerId, int take, TimeSpan leaseFor, CancellationToken ct) { if (take <= 0) return Array.Empty(); using var conn = await _db.CreateOpenConnectionAsync(ct); using var tx = conn.BeginTransaction(); const string sql = """ with next as ( select id from queue where session_id = @sessionId and ( status = 0 or (status = 1 and lease_expires_at is not null and lease_expires_at < now()) ) order by id for update skip locked limit @take ) update queue q set status = 1, started_date = coalesce(q.started_date, now()), attempts = q.attempts + 1, leased_by = @workerId, lease_expires_at = now() + (@leaseSeconds * interval '1 second') from next where q.id = next.id returning q.id as Id, q.session_id as SessionId, q.url as Url, q.status as Status, q.created_date as CreatedDate, q.started_date as StartedDate, q.finished_date as FinishedDate, q.attempts as Attempts, q.last_error as LastError; """; var rows = await conn.QueryAsync( new CommandDefinition(sql, new { sessionId, workerId, take, leaseSeconds = Math.Max(1, (int)leaseFor.TotalSeconds) }, transaction: tx, cancellationToken: ct)); tx.Commit(); return rows.ToList(); } public async Task RenewLeaseAsync(int queueId, string workerId, TimeSpan leaseFor, CancellationToken ct) { const string sql = """ update queue set lease_expires_at = now() + (@leaseSeconds * interval '1 second') where id = @queueId and status = 1 and leased_by = @workerId and (lease_expires_at is null or lease_expires_at > now() - interval '5 minutes'); """; using var conn = await _db.CreateOpenConnectionAsync(ct); var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, workerId, leaseSeconds = Math.Max(1, (int)leaseFor.TotalSeconds) }, cancellationToken: ct)); return rows > 0; } public async Task MarkDoneAsync(int queueId, string workerId, CancellationToken ct) { const string sql = """ update queue set status = 2, finished_date = now(), last_error = null, lease_expires_at = null where id = @queueId and leased_by = @workerId; """; using var conn = await _db.CreateOpenConnectionAsync(ct); var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, workerId }, cancellationToken: ct)); return rows > 0; } public async Task MarkFailedAsync(int queueId, string workerId, string error, CancellationToken ct) { const string sql = """ update queue set status = 3, finished_date = now(), last_error = @error, lease_expires_at = null where id = @queueId and leased_by = @workerId; """; using var conn = await _db.CreateOpenConnectionAsync(ct); var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, workerId, error }, cancellationToken: ct)); return rows > 0; } public async Task RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct) { // Ex.: worker morreu e deixou itens em processing pra sempre. const string sql = """ update queue set status = 0, started_date = null where session_id = @sessionId and status = 1 and started_date < now() - (@olderThanSeconds * interval '1 second'); """; using var conn = await _db.CreateOpenConnectionAsync(ct); return await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, olderThanSeconds = (int)olderThan.TotalSeconds }, cancellationToken: ct)); } public async Task RemovePendingByIdAsync(int sessionId, int queueId, CancellationToken ct) { const string sql = """ delete from queue where id = @queueId and session_id = @sessionId and status = 0; """; using var conn = await _db.CreateOpenConnectionAsync(ct); var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, queueId }, cancellationToken: ct)); return rows > 0; } public async Task RemovePendingByUrlAsync(int sessionId, string url, CancellationToken ct) { const string sql = """ delete from queue where session_id = @sessionId and url = @url and status = 0; """; using var conn = await _db.CreateOpenConnectionAsync(ct); return await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct)); } public async Task> ListQueueIdsAsync(int sessionId, IReadOnlyCollection? statuses, CancellationToken ct) { // Ex.: statuses = [2] -> DONE var statusFilter = statuses is { Count: > 0 }; var sql = statusFilter ? """ select id from queue where session_id = @sessionId and status = any(@statuses) order by id; """ : """ select id from queue where session_id = @sessionId order by id; """; using var conn = await _db.CreateOpenConnectionAsync(ct); var rows = await conn.QueryAsync(new CommandDefinition(sql, new { sessionId, statuses = statuses?.ToArray() }, cancellationToken: ct)); return rows.ToList(); } }