1
0
voyager-api/ScrapperAPI/Repositories/QueueRepository.cs

158 lines
5.4 KiB
C#

using Dapper;
using ScrapperAPI.Dtos;
using ScrapperAPI.Interfaces;
namespace ScrapperAPI.Repositories;
public sealed class QueueRepository : IQueueRepository
{
private readonly IDbConnectionFactory _db;
public QueueRepository(IDbConnectionFactory db) => _db = db;
public async Task<int> EnqueueAsync(int sessionId, string url, CancellationToken ct)
{
const string sql = """
insert into queue(session_id, url)
values (@sessionId, @url)
returning id;
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
return await conn.ExecuteScalarAsync<int>(
new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct));
}
public async Task<QueueCounts> GetCountsAsync(int sessionId, CancellationToken ct)
{
const string sql = """
select
count(*) as total,
count(*) filter (where status = 0) as pending,
count(*) filter (where status = 1) as processing,
count(*) filter (where status = 2) as done,
count(*) filter (where status = 3) as failed
from queue
where session_id = @sessionId;
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
return await conn.QuerySingleAsync<QueueCounts>(
new CommandDefinition(sql, new { sessionId }, cancellationToken: ct));
}
public async Task<QueueItem?> TryDequeueAsync(int sessionId, CancellationToken ct)
{
// Importante: 1 transação + SKIP LOCKED (permite multi-worker no futuro)
using var conn = await _db.CreateOpenConnectionAsync(ct);
using var tx = conn.BeginTransaction();
const string sql = """
with next as (
select id
from queue
where session_id = @sessionId
and status = 0
order by id
for update skip locked
limit 1
)
update queue q
set status = 1,
started_date = now(),
attempts = attempts + 1
from next
where q.id = next.id
returning
q.id as Id,
q.session_id as SessionId,
q.url as Url,
q.status as Status,
q.created_date as CreatedDate,
q.started_date as StartedDate,
q.finished_date as FinishedDate,
q.attempts as Attempts,
q.last_error as LastError;
""";
var item = await conn.QuerySingleOrDefaultAsync<QueueItem>(
new CommandDefinition(sql, new { sessionId }, transaction: tx, cancellationToken: ct));
tx.Commit();
return item;
}
public async Task MarkDoneAsync(int queueId, CancellationToken ct)
{
const string sql = """
update queue
set status = 2,
finished_date = now(),
last_error = null
where id = @queueId;
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId }, cancellationToken: ct));
}
public async Task MarkFailedAsync(int queueId, string error, CancellationToken ct)
{
const string sql = """
update queue
set status = 3,
finished_date = now(),
last_error = @error
where id = @queueId;
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, error }, cancellationToken: ct));
}
public async Task<int> RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct)
{
// Ex.: worker morreu e deixou itens em processing pra sempre.
const string sql = """
update queue
set status = 0,
started_date = null
where session_id = @sessionId
and status = 1
and started_date < now() - (@olderThanSeconds * interval '1 second');
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
return await conn.ExecuteAsync(new CommandDefinition(sql,
new { sessionId, olderThanSeconds = (int)olderThan.TotalSeconds },
cancellationToken: ct));
}
public async Task<bool> RemovePendingByIdAsync(int sessionId, int queueId, CancellationToken ct)
{
const string sql = """
delete from queue
where id = @queueId
and session_id = @sessionId
and status = 0;
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, queueId }, cancellationToken: ct));
return rows > 0;
}
public async Task<int> RemovePendingByUrlAsync(int sessionId, string url, CancellationToken ct)
{
const string sql = """
delete from queue
where session_id = @sessionId
and url = @url
and status = 0;
""";
using var conn = await _db.CreateOpenConnectionAsync(ct);
return await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct));
}
}