using System.Text.Json; using QuantEngine.Core.Interfaces; namespace QuantEngine.Application.Services; public class DataCollectionService { private readonly IKisApiClient _kisApiClient; private readonly ICollectionRepository _repository; public DataCollectionService( IKisApiClient kisApiClient, ICollectionRepository repository) { _kisApiClient = kisApiClient; _repository = repository; } public async Task RunCollectionAsync( string runId, string account, List tickers) { var result = new CollectionRunResult { RunId = runId, StartedAt = KstNowIso(), Status = "RUNNING" }; try { await _repository.SaveRunAsync(new CollectionRunRecord( RunId: runId, Status: "RUNNING", StartedAt: result.StartedAt )); int successCount = 0; int errorCount = 0; foreach (var ticker in tickers) { try { var normalized = await CollectOneAsync(ticker, account); var provenance = new Dictionary { { "ticker", ticker }, { "source", "kis_open_api" } }; await _repository.SaveSnapshotAsync(new CollectionSnapshotRecord( RunId: runId, DatasetName: "data_feed", Ticker: ticker, SourceName: "kis_open_api", PayloadJson: JsonSerializer.Serialize(normalized), CapturedAt: KstNowIso() )); successCount++; } catch (Exception ex) { errorCount++; System.Diagnostics.Debug.WriteLine($"Error collecting {ticker}: {ex.Message}"); await _repository.SaveErrorAsync(new CollectionErrorRecord( RunId: runId, SourceName: "kis_collector", ErrorKind: ex.GetType().Name, ErrorMessage: ex.Message, Ticker: ticker )); } } var finishedAt = KstNowIso(); await _repository.UpdateRunStatusAsync( runId, errorCount == 0 ? "COMPLETED" : "COMPLETED_WITH_ERRORS", finishedAt, successCount, errorCount ); result.Status = errorCount == 0 ? "COMPLETED" : "COMPLETED_WITH_ERRORS"; result.FinishedAt = finishedAt; result.SuccessCount = successCount; result.ErrorCount = errorCount; } catch (Exception ex) { System.Diagnostics.Debug.WriteLine($"Fatal error in collection run {runId}: {ex}"); await _repository.UpdateRunStatusAsync(runId, "FAILED", KstNowIso()); result.Status = "FAILED"; result.ErrorMessage = ex.Message; } return result; } private async Task> CollectOneAsync(string ticker, string account) { var normalized = new Dictionary { { "ticker", ticker } }; try { var price = await _kisApiClient.GetCurrentPriceAsync(ticker, account); normalized["current_price"] = CoerceFloat(FindFirstValue(price, "stck_prpr", "stck_clpr", "close")); normalized["open"] = CoerceFloat(FindFirstValue(price, "stck_oprc", "open")); normalized["high"] = CoerceFloat(FindFirstValue(price, "stck_hgpr", "high")); normalized["low"] = CoerceFloat(FindFirstValue(price, "stck_lwpr", "low")); normalized["prev_close"] = CoerceFloat(FindFirstValue(price, "prdy_vrss")); normalized["volume"] = CoerceFloat(FindFirstValue(price, "acml_vol", "volume")); normalized["change_pct"] = CoerceFloat(FindFirstValue(price, "prdy_ctrt")); normalized["price_status"] = "OK"; } catch (Exception ex) { normalized["price_status"] = "ERROR"; normalized["price_error"] = ex.Message; } try { var orderbook = await _kisApiClient.GetAskingPrice10LevelAsync(ticker, account); var output1 = ExtractObject(orderbook, "output1"); normalized["ask_1"] = CoerceFloat(FindFirstValue(output1, "askp1")); normalized["bid_1"] = CoerceFloat(FindFirstValue(output1, "bidp1")); normalized["orderbook_status"] = "OK"; } catch (Exception ex) { normalized["orderbook_status"] = "ERROR"; normalized["orderbook_error"] = ex.Message; } try { var start = DateTime.Now.AddDays(-10).ToString("yyyyMMdd"); var end = DateTime.Now.ToString("yyyyMMdd"); var shortSale = await _kisApiClient.GetDailyShortSaleAsync(ticker, start, end, account); var rows = ExtractArray(shortSale, "output2"); if (rows.Count > 0 && rows[0] is Dictionary latest) { normalized["short_turnover_share"] = CoerceFloat(latest.GetValueOrDefault("ssts_vol_rlim")); } normalized["short_sale_status"] = "OK"; } catch (Exception ex) { normalized["short_sale_status"] = "ERROR"; normalized["short_sale_error"] = ex.Message; } normalized["collection_as_of"] = KstNowIso(); return normalized; } private static object? FindFirstValue(Dictionary payload, params string[] keys) { var stack = new Stack(); stack.Push(payload); while (stack.Count > 0) { var item = stack.Pop(); if (item is Dictionary dict) { foreach (var key in keys) { if (dict.TryGetValue(key, out var value) && value != null && !string.IsNullOrEmpty(value.ToString())) return value; } foreach (var value in dict.Values) if (value != null) stack.Push(value); } else if (item is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Object) { foreach (var key in keys) { if (elem.TryGetProperty(key, out var prop) && prop.ValueKind != System.Text.Json.JsonValueKind.Null) return prop; } foreach (var prop in elem.EnumerateObject()) stack.Push(prop.Value); } } return null; } private static double? CoerceFloat(object? value) { if (value == null || string.IsNullOrEmpty(value.ToString())) return null; try { var str = value.ToString()?.Replace(",", "").Replace("%", "") ?? ""; return double.TryParse(str, out var d) ? d : null; } catch { return null; } } private static Dictionary ExtractObject(Dictionary payload, string key) { if (payload.TryGetValue(key, out var value) && value is Dictionary dict) return dict; if (value is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Object) return JsonSerializer.Deserialize>(elem.GetRawText()) ?? new(); return new(); } private static List ExtractArray(Dictionary payload, string key) { if (payload.TryGetValue(key, out var value)) { if (value is List list) return list; if (value is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Array) return JsonSerializer.Deserialize>(elem.GetRawText()) ?? new(); } return new(); } private static string KstNowIso() => DateTime.Now.ToString("o"); } public class CollectionRunResult { public string RunId { get; set; } = ""; public string Status { get; set; } = ""; public string StartedAt { get; set; } = ""; public string? FinishedAt { get; set; } public int SuccessCount { get; set; } public int ErrorCount { get; set; } public string? ErrorMessage { get; set; } }