1
0
voyager-api/ScrapperAPI/Repositories/ExtractedDataRepository.cs

132 lines
4.9 KiB
C#

using System.Text.Json;
using Dapper;
using ScrapperAPI.Dtos;
using ScrapperAPI.Interfaces;
namespace ScrapperAPI.Repositories;
/// <summary>
/// Dapper-based persistence for rows of the <c>extracted_data</c> table.
/// </summary>
public sealed class ExtractedDataRepository : IExtractedDataRepository
{
    // Shared projection for every read query, so the column list and aliases cannot
    // drift between methods. Aliases follow the RowRaw property names; Postgres lowercases
    // unquoted aliases, which is harmless because Dapper matches column names to
    // properties case-insensitively.
    private const string SelectFromExtractedData = """
        select
            id,
            run_id as RunId,
            model_id as ModelId,
            session_id as SessionId,
            queue_id as QueueId,
            extracted_json::text as ExtractedJson,
            success,
            error,
            extracted_at as ExtractedAt
        from extracted_data
        """;

    private readonly IDbConnectionFactory _db;

    public ExtractedDataRepository(IDbConnectionFactory db) => _db = db;

    /// <summary>
    /// Inserts an extraction result, or, when a row already exists for the same
    /// (<c>model_id</c>, <c>queue_id</c>) pair, overwrites its run/session/json/success/error
    /// columns and resets <c>extracted_at</c> to <c>now()</c>.
    /// </summary>
    /// <param name="dto">The extraction result to persist. Must not be null.</param>
    /// <param name="ct">Token flowed to opening the connection and executing the command.</param>
    /// <exception cref="ArgumentNullException"><paramref name="dto"/> is null.</exception>
    public async Task UpsertAsync(UpsertExtractedDataDto dto, CancellationToken ct)
    {
        // Fail fast before acquiring a connection instead of throwing a
        // NullReferenceException while building the parameter object.
        ArgumentNullException.ThrowIfNull(dto);

        // "on conflict (model_id, queue_id)" requires a unique constraint/index on
        // exactly those columns in the database.
        const string sql = """
            insert into extracted_data(run_id, model_id, session_id, queue_id, extracted_json, success, error)
            values (@runId, @modelId, @sessionId, @queueId, @json::jsonb, @success, @error)
            on conflict (model_id, queue_id)
            do update set
                run_id = excluded.run_id,
                session_id = excluded.session_id,
                extracted_json = excluded.extracted_json,
                success = excluded.success,
                error = excluded.error,
                extracted_at = now();
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        await conn.ExecuteAsync(new CommandDefinition(sql, new
        {
            runId = dto.RunId,
            modelId = dto.ModelId,
            sessionId = dto.SessionId,
            queueId = dto.QueueId,
            // Sent as text and cast to jsonb server-side by "@json::jsonb".
            json = dto.ExtractedJson.RootElement.GetRawText(),
            success = dto.Success,
            error = dto.Error
        }, cancellationToken: ct));
    }

    /// <summary>
    /// Lists all extraction rows for a session and model, ordered by <c>queue_id</c>.
    /// </summary>
    /// <param name="sessionId">Session to filter on.</param>
    /// <param name="modelId">Model to filter on.</param>
    /// <param name="ct">Token flowed to opening the connection and running the query.</param>
    /// <returns>The matching rows; an empty list when none match.</returns>
    public async Task<IReadOnlyList<ExtractedDataRow>> ListBySessionAsync(int sessionId, long modelId, CancellationToken ct)
    {
        const string sql = $"""
            {SelectFromExtractedData}
            where session_id = @sessionId
              and model_id = @modelId
            order by queue_id;
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        var rows = await conn.QueryAsync<RowRaw>(new CommandDefinition(sql, new { sessionId, modelId }, cancellationToken: ct));
        return rows.Select(r => r.ToDto()).ToList();
    }

    /// <summary>
    /// Fetches the extraction row for a queue item and model, if one exists.
    /// </summary>
    /// <param name="queueId">Queue item to look up.</param>
    /// <param name="modelId">Model to look up.</param>
    /// <param name="ct">Token flowed to opening the connection and running the query.</param>
    /// <returns>The row, or <c>null</c> when none exists.</returns>
    public async Task<ExtractedDataRow?> GetByQueueIdAsync(int queueId, long modelId, CancellationToken ct)
    {
        // "limit 1" guarantees QuerySingleOrDefault never sees more than one row.
        const string sql = $"""
            {SelectFromExtractedData}
            where queue_id = @queueId
              and model_id = @modelId
            limit 1;
            """;

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        var row = await conn.QuerySingleOrDefaultAsync<RowRaw>(new CommandDefinition(sql, new { queueId, modelId }, cancellationToken: ct));
        return row?.ToDto();
    }

    /// <summary>
    /// Flat materialization target for Dapper. The JSON column arrives as text and is
    /// parsed into a <see cref="JsonDocument"/> in <see cref="ToDto"/>. Dapper constructs
    /// it through the implicit parameterless constructor and the <c>init</c> setters.
    /// </summary>
    private sealed class RowRaw
    {
        public long Id { get; init; }
        public long RunId { get; init; }
        public long ModelId { get; init; }
        public int SessionId { get; init; }
        public int QueueId { get; init; }
        public string? ExtractedJson { get; init; }
        public bool Success { get; init; }
        public string? Error { get; init; }
        public DateTimeOffset ExtractedAt { get; init; }

        /// <summary>
        /// Converts the raw row to the public DTO. A null JSON column becomes an empty
        /// object (<c>{}</c>).
        /// </summary>
        // NOTE(review): JsonDocument is IDisposable; the DTO receives ownership of the
        // parsed document — confirm callers dispose it.
        public ExtractedDataRow ToDto() => new(
            Id,
            RunId,
            ModelId,
            SessionId,
            QueueId,
            JsonDocument.Parse(ExtractedJson ?? "{}"),
            Success,
            Error,
            ExtractedAt
        );
    }
}