185 lines
6.3 KiB
C#
185 lines
6.3 KiB
C#
using Dapper;
|
|
using ScrapperAPI.Dtos;
|
|
using ScrapperAPI.Interfaces;
|
|
|
|
namespace ScrapperAPI.Repositories;
|
|
|
|
public sealed class QueueRepository : IQueueRepository
|
|
{
|
|
private readonly IDbConnectionFactory _db;
|
|
|
|
public QueueRepository(IDbConnectionFactory db) => _db = db;
|
|
|
|
public async Task<int> EnqueueAsync(int sessionId, string url, CancellationToken ct)
|
|
{
|
|
const string sql = """
|
|
insert into queue(session_id, url)
|
|
values (@sessionId, @url)
|
|
returning id;
|
|
""";
|
|
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
return await conn.ExecuteScalarAsync<int>(
|
|
new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct));
|
|
}
|
|
|
|
public async Task<QueueCounts> GetCountsAsync(int sessionId, CancellationToken ct)
|
|
{
|
|
const string sql = """
|
|
select
|
|
count(*) as total,
|
|
count(*) filter (where status = 0) as pending,
|
|
count(*) filter (where status = 1) as processing,
|
|
count(*) filter (where status = 2) as done,
|
|
count(*) filter (where status = 3) as failed
|
|
from queue
|
|
where session_id = @sessionId;
|
|
""";
|
|
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
return await conn.QuerySingleAsync<QueueCounts>(
|
|
new CommandDefinition(sql, new { sessionId }, cancellationToken: ct));
|
|
}
|
|
|
|
public async Task<QueueItem?> TryDequeueAsync(int sessionId, CancellationToken ct)
|
|
{
|
|
// Importante: 1 transação + SKIP LOCKED (permite multi-worker no futuro)
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
using var tx = conn.BeginTransaction();
|
|
|
|
const string sql = """
|
|
with next as (
|
|
select id
|
|
from queue
|
|
where session_id = @sessionId
|
|
and status = 0
|
|
order by id
|
|
for update skip locked
|
|
limit 1
|
|
)
|
|
update queue q
|
|
set status = 1,
|
|
started_date = now(),
|
|
attempts = attempts + 1
|
|
from next
|
|
where q.id = next.id
|
|
returning
|
|
q.id as Id,
|
|
q.session_id as SessionId,
|
|
q.url as Url,
|
|
q.status as Status,
|
|
q.created_date as CreatedDate,
|
|
q.started_date as StartedDate,
|
|
q.finished_date as FinishedDate,
|
|
q.attempts as Attempts,
|
|
q.last_error as LastError;
|
|
""";
|
|
|
|
var item = await conn.QuerySingleOrDefaultAsync<QueueItem>(
|
|
new CommandDefinition(sql, new { sessionId }, transaction: tx, cancellationToken: ct));
|
|
|
|
tx.Commit();
|
|
return item;
|
|
}
|
|
|
|
public async Task MarkDoneAsync(int queueId, CancellationToken ct)
|
|
{
|
|
const string sql = """
|
|
update queue
|
|
set status = 2,
|
|
finished_date = now(),
|
|
last_error = null
|
|
where id = @queueId;
|
|
""";
|
|
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId }, cancellationToken: ct));
|
|
}
|
|
|
|
public async Task MarkFailedAsync(int queueId, string error, CancellationToken ct)
|
|
{
|
|
const string sql = """
|
|
update queue
|
|
set status = 3,
|
|
finished_date = now(),
|
|
last_error = @error
|
|
where id = @queueId;
|
|
""";
|
|
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
await conn.ExecuteAsync(new CommandDefinition(sql, new { queueId, error }, cancellationToken: ct));
|
|
}
|
|
|
|
public async Task<int> RequeueStuckProcessingAsync(int sessionId, TimeSpan olderThan, CancellationToken ct)
|
|
{
|
|
// Ex.: worker morreu e deixou itens em processing pra sempre.
|
|
const string sql = """
|
|
update queue
|
|
set status = 0,
|
|
started_date = null
|
|
where session_id = @sessionId
|
|
and status = 1
|
|
and started_date < now() - (@olderThanSeconds * interval '1 second');
|
|
""";
|
|
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
return await conn.ExecuteAsync(new CommandDefinition(sql,
|
|
new { sessionId, olderThanSeconds = (int)olderThan.TotalSeconds },
|
|
cancellationToken: ct));
|
|
}
|
|
|
|
public async Task<bool> RemovePendingByIdAsync(int sessionId, int queueId, CancellationToken ct)
|
|
{
|
|
const string sql = """
|
|
delete from queue
|
|
where id = @queueId
|
|
and session_id = @sessionId
|
|
and status = 0;
|
|
""";
|
|
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
var rows = await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, queueId }, cancellationToken: ct));
|
|
return rows > 0;
|
|
}
|
|
|
|
public async Task<int> RemovePendingByUrlAsync(int sessionId, string url, CancellationToken ct)
|
|
{
|
|
const string sql = """
|
|
delete from queue
|
|
where session_id = @sessionId
|
|
and url = @url
|
|
and status = 0;
|
|
""";
|
|
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
return await conn.ExecuteAsync(new CommandDefinition(sql, new { sessionId, url }, cancellationToken: ct));
|
|
}
|
|
|
|
public async Task<IReadOnlyList<int>> ListQueueIdsAsync(int sessionId, IReadOnlyCollection<short>? statuses, CancellationToken ct)
|
|
{
|
|
// Ex.: statuses = [2] -> DONE
|
|
var statusFilter = statuses is { Count: > 0 };
|
|
|
|
var sql = statusFilter
|
|
? """
|
|
select id
|
|
from queue
|
|
where session_id = @sessionId
|
|
and status = any(@statuses)
|
|
order by id;
|
|
"""
|
|
: """
|
|
select id
|
|
from queue
|
|
where session_id = @sessionId
|
|
order by id;
|
|
""";
|
|
|
|
using var conn = await _db.CreateOpenConnectionAsync(ct);
|
|
var rows = await conn.QueryAsync<int>(new CommandDefinition(sql,
|
|
new { sessionId, statuses = statuses?.ToArray() },
|
|
cancellationToken: ct));
|
|
return rows.ToList();
|
|
}
|
|
|
|
} |