5c5d9bfee7
Quant Engine CI/CD Pipeline / validate-core (push) Failing after 7s
WBS-9.3 - NULL Policy CI Gate / NULL Policy Validation (push) Failing after 8s
Quant Engine CI/CD Pipeline / validate-ui-and-storage (push) Has been skipped
Snapshot Admin Deployment / build-and-deploy (push) Failing after 46s
Deploy to Production / Build & Deploy to Production (push) Failing after 1m5s
- C# 기반의 DataCollectionService 클래스 구현 - 기존의 파이썬 스크립트 실행 방식을 대체하고 KIS API 클라이언트를 직접 사용하여 주식 시세, 호가, 공매도 정보 수집 - CollectionEndpoints에 비동기 수집 요청 처리 통합 및 Program.cs에 서비스 DI 등록
240 lines
8.6 KiB
C#
240 lines
8.6 KiB
C#
using System.Text.Json;
|
|
using QuantEngine.Core.Interfaces;
|
|
|
|
namespace QuantEngine.Application.Services;
|
|
|
|
public class DataCollectionService
|
|
{
|
|
private readonly IKisApiClient _kisApiClient;
|
|
private readonly ICollectionRepository _repository;
|
|
|
|
public DataCollectionService(
|
|
IKisApiClient kisApiClient,
|
|
ICollectionRepository repository)
|
|
{
|
|
_kisApiClient = kisApiClient;
|
|
_repository = repository;
|
|
}
|
|
|
|
public async Task<CollectionRunResult> RunCollectionAsync(
|
|
string runId,
|
|
string account,
|
|
List<string> tickers)
|
|
{
|
|
var result = new CollectionRunResult
|
|
{
|
|
RunId = runId,
|
|
StartedAt = KstNowIso(),
|
|
Status = "RUNNING"
|
|
};
|
|
|
|
try
|
|
{
|
|
await _repository.SaveRunAsync(new CollectionRunRecord(
|
|
RunId: runId,
|
|
Status: "RUNNING",
|
|
StartedAt: result.StartedAt
|
|
));
|
|
|
|
int successCount = 0;
|
|
int errorCount = 0;
|
|
|
|
foreach (var ticker in tickers)
|
|
{
|
|
try
|
|
{
|
|
var normalized = await CollectOneAsync(ticker, account);
|
|
var provenance = new Dictionary<string, object>
|
|
{
|
|
{ "ticker", ticker },
|
|
{ "source", "kis_open_api" }
|
|
};
|
|
|
|
await _repository.SaveSnapshotAsync(new CollectionSnapshotRecord(
|
|
RunId: runId,
|
|
DatasetName: "data_feed",
|
|
Ticker: ticker,
|
|
SourceName: "kis_open_api",
|
|
PayloadJson: JsonSerializer.Serialize(normalized),
|
|
CapturedAt: KstNowIso()
|
|
));
|
|
|
|
successCount++;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
errorCount++;
|
|
System.Diagnostics.Debug.WriteLine($"Error collecting {ticker}: {ex.Message}");
|
|
|
|
await _repository.SaveErrorAsync(new CollectionErrorRecord(
|
|
RunId: runId,
|
|
SourceName: "kis_collector",
|
|
ErrorKind: ex.GetType().Name,
|
|
ErrorMessage: ex.Message,
|
|
Ticker: ticker
|
|
));
|
|
}
|
|
}
|
|
|
|
var finishedAt = KstNowIso();
|
|
await _repository.UpdateRunStatusAsync(
|
|
runId,
|
|
errorCount == 0 ? "COMPLETED" : "COMPLETED_WITH_ERRORS",
|
|
finishedAt,
|
|
successCount,
|
|
errorCount
|
|
);
|
|
|
|
result.Status = errorCount == 0 ? "COMPLETED" : "COMPLETED_WITH_ERRORS";
|
|
result.FinishedAt = finishedAt;
|
|
result.SuccessCount = successCount;
|
|
result.ErrorCount = errorCount;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
System.Diagnostics.Debug.WriteLine($"Fatal error in collection run {runId}: {ex}");
|
|
await _repository.UpdateRunStatusAsync(runId, "FAILED", KstNowIso());
|
|
result.Status = "FAILED";
|
|
result.ErrorMessage = ex.Message;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
private async Task<Dictionary<string, object>> CollectOneAsync(string ticker, string account)
|
|
{
|
|
var normalized = new Dictionary<string, object> { { "ticker", ticker } };
|
|
|
|
try
|
|
{
|
|
var price = await _kisApiClient.GetCurrentPriceAsync(ticker, account);
|
|
normalized["current_price"] = CoerceFloat(FindFirstValue(price, "stck_prpr", "stck_clpr", "close"));
|
|
normalized["open"] = CoerceFloat(FindFirstValue(price, "stck_oprc", "open"));
|
|
normalized["high"] = CoerceFloat(FindFirstValue(price, "stck_hgpr", "high"));
|
|
normalized["low"] = CoerceFloat(FindFirstValue(price, "stck_lwpr", "low"));
|
|
normalized["prev_close"] = CoerceFloat(FindFirstValue(price, "prdy_vrss"));
|
|
normalized["volume"] = CoerceFloat(FindFirstValue(price, "acml_vol", "volume"));
|
|
normalized["change_pct"] = CoerceFloat(FindFirstValue(price, "prdy_ctrt"));
|
|
normalized["price_status"] = "OK";
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
normalized["price_status"] = "ERROR";
|
|
normalized["price_error"] = ex.Message;
|
|
}
|
|
|
|
try
|
|
{
|
|
var orderbook = await _kisApiClient.GetAskingPrice10LevelAsync(ticker, account);
|
|
var output1 = ExtractObject(orderbook, "output1");
|
|
normalized["ask_1"] = CoerceFloat(FindFirstValue(output1, "askp1"));
|
|
normalized["bid_1"] = CoerceFloat(FindFirstValue(output1, "bidp1"));
|
|
normalized["orderbook_status"] = "OK";
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
normalized["orderbook_status"] = "ERROR";
|
|
normalized["orderbook_error"] = ex.Message;
|
|
}
|
|
|
|
try
|
|
{
|
|
var start = DateTime.Now.AddDays(-10).ToString("yyyyMMdd");
|
|
var end = DateTime.Now.ToString("yyyyMMdd");
|
|
var shortSale = await _kisApiClient.GetDailyShortSaleAsync(ticker, start, end, account);
|
|
var rows = ExtractArray(shortSale, "output2");
|
|
if (rows.Count > 0 && rows[0] is Dictionary<string, object> latest)
|
|
{
|
|
normalized["short_turnover_share"] = CoerceFloat(latest.GetValueOrDefault("ssts_vol_rlim"));
|
|
}
|
|
normalized["short_sale_status"] = "OK";
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
normalized["short_sale_status"] = "ERROR";
|
|
normalized["short_sale_error"] = ex.Message;
|
|
}
|
|
|
|
normalized["collection_as_of"] = KstNowIso();
|
|
return normalized;
|
|
}
|
|
|
|
private static object? FindFirstValue(Dictionary<string, object> payload, params string[] keys)
|
|
{
|
|
var stack = new Stack<object>();
|
|
stack.Push(payload);
|
|
|
|
while (stack.Count > 0)
|
|
{
|
|
var item = stack.Pop();
|
|
if (item is Dictionary<string, object> dict)
|
|
{
|
|
foreach (var key in keys)
|
|
{
|
|
if (dict.TryGetValue(key, out var value) && value != null && !string.IsNullOrEmpty(value.ToString()))
|
|
return value;
|
|
}
|
|
foreach (var value in dict.Values)
|
|
if (value != null) stack.Push(value);
|
|
}
|
|
else if (item is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Object)
|
|
{
|
|
foreach (var key in keys)
|
|
{
|
|
if (elem.TryGetProperty(key, out var prop) && prop.ValueKind != System.Text.Json.JsonValueKind.Null)
|
|
return prop;
|
|
}
|
|
foreach (var prop in elem.EnumerateObject())
|
|
stack.Push(prop.Value);
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
private static double? CoerceFloat(object? value)
|
|
{
|
|
if (value == null || string.IsNullOrEmpty(value.ToString()))
|
|
return null;
|
|
try
|
|
{
|
|
var str = value.ToString()?.Replace(",", "").Replace("%", "") ?? "";
|
|
return double.TryParse(str, out var d) ? d : null;
|
|
}
|
|
catch { return null; }
|
|
}
|
|
|
|
private static Dictionary<string, object> ExtractObject(Dictionary<string, object> payload, string key)
|
|
{
|
|
if (payload.TryGetValue(key, out var value) && value is Dictionary<string, object> dict)
|
|
return dict;
|
|
if (value is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Object)
|
|
return JsonSerializer.Deserialize<Dictionary<string, object>>(elem.GetRawText()) ?? new();
|
|
return new();
|
|
}
|
|
|
|
private static List<object> ExtractArray(Dictionary<string, object> payload, string key)
|
|
{
|
|
if (payload.TryGetValue(key, out var value))
|
|
{
|
|
if (value is List<object> list) return list;
|
|
if (value is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Array)
|
|
return JsonSerializer.Deserialize<List<object>>(elem.GetRawText()) ?? new();
|
|
}
|
|
return new();
|
|
}
|
|
|
|
private static string KstNowIso() =>
|
|
DateTime.Now.ToString("o");
|
|
}
|
|
|
|
public class CollectionRunResult
|
|
{
|
|
public string RunId { get; set; } = "";
|
|
public string Status { get; set; } = "";
|
|
public string StartedAt { get; set; } = "";
|
|
public string? FinishedAt { get; set; }
|
|
public int SuccessCount { get; set; }
|
|
public int ErrorCount { get; set; }
|
|
public string? ErrorMessage { get; set; }
|
|
}
|