1
0
voyager-api/ScrapperAPI/Repositories/ExtractionRunRepository.cs

97 lines
3.7 KiB
C#

using Dapper;
using ScrapperAPI.Dtos;
using ScrapperAPI.Interfaces;
namespace ScrapperAPI.Repositories;
/// <summary>
/// Dapper-backed implementation of <see cref="IExtractionRunRepository"/> that reads and
/// writes rows of the <c>extraction_run</c> table.
/// </summary>
/// <remarks>
/// Every operation opens its own connection through the injected factory and disposes it
/// before returning. The status values written by this class are 0 (on insert),
/// 1 (<see cref="MarkRunningAsync"/>), 2 (<see cref="MarkDoneAsync"/>) and
/// 3 (<see cref="MarkFailedAsync"/>).
/// </remarks>
public sealed class ExtractionRunRepository : IExtractionRunRepository
{
    private readonly IDbConnectionFactory _db;

    /// <summary>Creates the repository on top of the given connection factory.</summary>
    /// <param name="db">Factory used to open a connection for each operation.</param>
    public ExtractionRunRepository(IDbConnectionFactory db)
    {
        _db = db;
    }

    /// <summary>
    /// Inserts a new <c>extraction_run</c> row with status 0 for the model and session
    /// carried by <paramref name="dto"/>.
    /// </summary>
    /// <param name="dto">Supplies the <c>ModelId</c> and <c>SessionId</c> column values.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <returns>The <c>id</c> returned by the insert statement.</returns>
    public async Task<long> CreateAsync(CreateExtractionRunDto dto, CancellationToken ct)
    {
        const string insertSql = """
            insert into extraction_run(model_id, session_id, status)
            values (@modelId, @sessionId, 0)
            returning id;
            """;

        using var connection = await _db.CreateOpenConnectionAsync(ct);

        var insertArgs = new
        {
            modelId = dto.ModelId,
            sessionId = dto.SessionId
        };
        var command = new CommandDefinition(insertSql, insertArgs, cancellationToken: ct);

        return await connection.ExecuteScalarAsync<long>(command);
    }

    /// <summary>Loads a single run by its primary key.</summary>
    /// <param name="id">Value matched against the <c>id</c> column.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <returns>
    /// The result of Dapper's <c>QuerySingleOrDefaultAsync</c>: the mapped row, or the
    /// default value (<c>null</c>) when no row matches.
    /// </returns>
    public async Task<ExtractionRunRow?> GetByIdAsync(long id, CancellationToken ct)
    {
        // Snake-case columns are aliased so they map onto the row type's member names.
        const string selectSql = """
            select
            id,
            model_id as ModelId,
            session_id as SessionId,
            status,
            created_at as CreatedAt,
            started_at as StartedAt,
            finished_at as FinishedAt,
            total,
            succeeded,
            failed,
            error
            from extraction_run
            where id = @id
            limit 1;
            """;

        using var connection = await _db.CreateOpenConnectionAsync(ct);

        var command = new CommandDefinition(selectSql, new { id }, cancellationToken: ct);
        return await connection.QuerySingleOrDefaultAsync<ExtractionRunRow>(command);
    }

    /// <summary>
    /// Sets the run's status to 1, stamps <c>started_at</c> with the database's
    /// <c>now()</c> and clears any stored error.
    /// </summary>
    /// <param name="runId">Id of the run to update.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <remarks>The affected-row count is not checked, so an unknown id is a silent no-op.</remarks>
    public Task MarkRunningAsync(long runId, CancellationToken ct)
    {
        const string updateSql = """
            update extraction_run
            set status = 1,
            started_at = now(),
            error = null
            where id = @runId;
            """;

        return ExecuteUpdateAsync(updateSql, new { runId }, ct);
    }

    /// <summary>
    /// Sets the run's status to 2, stamps <c>finished_at</c> with the database's
    /// <c>now()</c>, stores the supplied counters and clears any stored error.
    /// </summary>
    /// <param name="runId">Id of the run to update.</param>
    /// <param name="total">Value written to the <c>total</c> column.</param>
    /// <param name="succeeded">Value written to the <c>succeeded</c> column.</param>
    /// <param name="failed">Value written to the <c>failed</c> column.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <remarks>The affected-row count is not checked, so an unknown id is a silent no-op.</remarks>
    public Task MarkDoneAsync(long runId, int total, int succeeded, int failed, CancellationToken ct)
    {
        const string updateSql = """
            update extraction_run
            set status = 2,
            finished_at = now(),
            total = @total,
            succeeded = @succeeded,
            failed = @failed,
            error = null
            where id = @runId;
            """;

        return ExecuteUpdateAsync(updateSql, new { runId, total, succeeded, failed }, ct);
    }

    /// <summary>
    /// Sets the run's status to 3, stamps <c>finished_at</c> with the database's
    /// <c>now()</c> and stores <paramref name="error"/> in the <c>error</c> column.
    /// </summary>
    /// <param name="runId">Id of the run to update.</param>
    /// <param name="error">Text written to the <c>error</c> column.</param>
    /// <param name="ct">Token forwarded to the connection factory and to the command.</param>
    /// <remarks>The affected-row count is not checked, so an unknown id is a silent no-op.</remarks>
    public Task MarkFailedAsync(long runId, string error, CancellationToken ct)
    {
        const string updateSql = """
            update extraction_run
            set status = 3,
            finished_at = now(),
            error = @error
            where id = @runId;
            """;

        return ExecuteUpdateAsync(updateSql, new { runId, error }, ct);
    }

    // Opens a connection, runs one non-query statement with the given parameters and
    // disposes the connection; the number of affected rows is intentionally discarded.
    private async Task ExecuteUpdateAsync(string sql, object parameters, CancellationToken ct)
    {
        using var connection = await _db.CreateOpenConnectionAsync(ct);

        var command = new CommandDefinition(sql, parameters, cancellationToken: ct);
        await connection.ExecuteAsync(command);
    }
}