// 132 lines, 4.9 KiB, C#
using System.Text.Json;
|
|
using Dapper;
|
|
using ScrapperAPI.Dtos;
|
|
using ScrapperAPI.Interfaces;
|
|
|
|
namespace ScrapperAPI.Repositories;
|
|
|
|
/// <summary>
/// Dapper-based implementation of <see cref="IExtractedDataRepository"/> that writes to and
/// reads from the <c>extracted_data</c> table.
/// </summary>
public sealed class ExtractedDataRepository : IExtractedDataRepository
{
    // Single source of truth for the projection consumed by RowRaw. Both read queries
    // append their own filter to this text, so the column list cannot drift between them.
    private const string SelectRowsSql = """
        select
            id,
            run_id as RunId,
            model_id as ModelId,
            session_id as SessionId,
            queue_id as QueueId,
            extracted_json::text as extractedJson,
            success,
            error,
            extracted_at as ExtractedAt
        from extracted_data
        """;

    private readonly IDbConnectionFactory _db;

    /// <summary>Creates a repository that obtains its connections from <paramref name="db"/>.</summary>
    /// <param name="db">Factory used to open one connection per repository call.</param>
    public ExtractedDataRepository(IDbConnectionFactory db) => _db = db;

    /// <summary>
    /// Inserts an extraction result, or overwrites the existing row that has the same
    /// <c>(model_id, queue_id)</c> pair. On overwrite <c>extracted_at</c> is reset to <c>now()</c>.
    /// </summary>
    /// <param name="dto">Values to persist; the JSON payload is sent as raw text and cast to <c>jsonb</c>.</param>
    /// <param name="ct">Token used to cancel opening the connection and executing the statement.</param>
    /// <exception cref="ArgumentNullException"><paramref name="dto"/> is <c>null</c>.</exception>
    public async Task UpsertAsync(UpsertExtractedDataDto dto, CancellationToken ct)
    {
        // Fail before a connection is opened, with an exception that names the bad argument.
        ArgumentNullException.ThrowIfNull(dto);

        const string sql = """
            insert into extracted_data(run_id, model_id, session_id, queue_id, extracted_json, success, error)
            values (@runId, @modelId, @sessionId, @queueId, @json::jsonb, @success, @error)
            on conflict (model_id, queue_id)
            do update set
                run_id = excluded.run_id,
                session_id = excluded.session_id,
                extracted_json = excluded.extracted_json,
                success = excluded.success,
                error = excluded.error,
                extracted_at = now();
            """;

        // Built up front so any failure reading the DTO happens before a connection is opened.
        var parameters = new
        {
            runId = dto.RunId,
            modelId = dto.ModelId,
            sessionId = dto.SessionId,
            queueId = dto.QueueId,
            json = dto.ExtractedJson.RootElement.GetRawText(),
            success = dto.Success,
            error = dto.Error
        };

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        await conn.ExecuteAsync(new CommandDefinition(sql, parameters, cancellationToken: ct));
    }

    /// <summary>
    /// Returns every stored extraction for the given session and model, ordered by <c>queue_id</c>.
    /// </summary>
    /// <param name="sessionId">Value matched against <c>session_id</c>.</param>
    /// <param name="modelId">Value matched against <c>model_id</c>.</param>
    /// <param name="ct">Token used to cancel opening the connection and running the query.</param>
    /// <returns>The matching rows; empty when nothing matches.</returns>
    public async Task<IReadOnlyList<ExtractedDataRow>> ListBySessionAsync(int sessionId, long modelId, CancellationToken ct)
    {
        const string sql = SelectRowsSql
            + " where session_id = @sessionId and model_id = @modelId order by queue_id;";

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        var rawRows = await conn.QueryAsync<RowRaw>(
            new CommandDefinition(sql, new { sessionId, modelId }, cancellationToken: ct));

        return rawRows.Select(raw => raw.ToDto()).ToList();
    }

    /// <summary>
    /// Returns the stored extraction for the given queue item and model, if there is one.
    /// </summary>
    /// <param name="queueId">Value matched against <c>queue_id</c>.</param>
    /// <param name="modelId">Value matched against <c>model_id</c>.</param>
    /// <param name="ct">Token used to cancel opening the connection and running the query.</param>
    /// <returns>The matching row, or <c>null</c> when none exists.</returns>
    public async Task<ExtractedDataRow?> GetByQueueIdAsync(int queueId, long modelId, CancellationToken ct)
    {
        // "limit 1" caps the result at one row, so QuerySingleOrDefaultAsync cannot see several.
        const string sql = SelectRowsSql
            + " where queue_id = @queueId and model_id = @modelId limit 1;";

        using var conn = await _db.CreateOpenConnectionAsync(ct);
        var raw = await conn.QuerySingleOrDefaultAsync<RowRaw>(
            new CommandDefinition(sql, new { queueId, modelId }, cancellationToken: ct));

        return raw?.ToDto();
    }

    /// <summary>
    /// Materialization target for the read queries: holds the JSON column as text until
    /// <see cref="ToDto"/> parses it.
    /// </summary>
    private sealed class RowRaw
    {
        public RowRaw()
        {
        }

        public RowRaw(long id, long runId, long modelId, int sessionId, int queueId, string? extractedJson, bool success, string? error, DateTimeOffset extractedAt)
        {
            Id = id;
            RunId = runId;
            ModelId = modelId;
            SessionId = sessionId;
            QueueId = queueId;
            ExtractedJson = extractedJson;
            Success = success;
            Error = error;
            ExtractedAt = extractedAt;
        }

        public long Id { get; init; }

        public long RunId { get; init; }

        public long ModelId { get; init; }

        public int SessionId { get; init; }

        public int QueueId { get; init; }

        public string? ExtractedJson { get; init; }

        public bool Success { get; init; }

        public string? Error { get; init; }

        public DateTimeOffset ExtractedAt { get; init; }

        /// <summary>
        /// Converts this row into an <see cref="ExtractedDataRow"/>, parsing the JSON text and
        /// substituting an empty object (<c>{}</c>) when the column value is <c>null</c>.
        /// </summary>
        /// <remarks>
        /// NOTE(review): <see cref="JsonDocument"/> is <see cref="IDisposable"/>; the parsed document
        /// is handed to the returned row, so disposal is presumably the caller's job - confirm.
        /// </remarks>
        public ExtractedDataRow ToDto() => new(
            Id,
            RunId,
            ModelId,
            SessionId,
            QueueId,
            JsonDocument.Parse(ExtractedJson ?? "{}"),
            Success,
            Error,
            ExtractedAt);
    }
}
|