diff --git a/src/dotnet/QuantEngine.Application/Services/DataCollectionService.cs b/src/dotnet/QuantEngine.Application/Services/DataCollectionService.cs new file mode 100644 index 0000000..1fa4702 --- /dev/null +++ b/src/dotnet/QuantEngine.Application/Services/DataCollectionService.cs @@ -0,0 +1,239 @@ +using System.Text.Json; +using QuantEngine.Core.Interfaces; + +namespace QuantEngine.Application.Services; + +public class DataCollectionService +{ + private readonly IKisApiClient _kisApiClient; + private readonly ICollectionRepository _repository; + + public DataCollectionService( + IKisApiClient kisApiClient, + ICollectionRepository repository) + { + _kisApiClient = kisApiClient; + _repository = repository; + } + + public async Task RunCollectionAsync( + string runId, + string account, + List tickers) + { + var result = new CollectionRunResult + { + RunId = runId, + StartedAt = KstNowIso(), + Status = "RUNNING" + }; + + try + { + await _repository.SaveRunAsync(new CollectionRunRecord( + RunId: runId, + Status: "RUNNING", + StartedAt: result.StartedAt + )); + + int successCount = 0; + int errorCount = 0; + + foreach (var ticker in tickers) + { + try + { + var normalized = await CollectOneAsync(ticker, account); + var provenance = new Dictionary + { + { "ticker", ticker }, + { "source", "kis_open_api" } + }; + + await _repository.SaveSnapshotAsync(new CollectionSnapshotRecord( + RunId: runId, + DatasetName: "data_feed", + Ticker: ticker, + SourceName: "kis_open_api", + PayloadJson: JsonSerializer.Serialize(normalized), + CapturedAt: KstNowIso() + )); + + successCount++; + } + catch (Exception ex) + { + errorCount++; + System.Diagnostics.Debug.WriteLine($"Error collecting {ticker}: {ex.Message}"); + + await _repository.SaveErrorAsync(new CollectionErrorRecord( + RunId: runId, + SourceName: "kis_collector", + ErrorKind: ex.GetType().Name, + ErrorMessage: ex.Message, + Ticker: ticker + )); + } + } + + var finishedAt = KstNowIso(); + await _repository.UpdateRunStatusAsync( + runId, + errorCount == 0 ? "COMPLETED" : "COMPLETED_WITH_ERRORS", + finishedAt, + successCount, + errorCount + ); + + result.Status = errorCount == 0 ? "COMPLETED" : "COMPLETED_WITH_ERRORS"; + result.FinishedAt = finishedAt; + result.SuccessCount = successCount; + result.ErrorCount = errorCount; + } + catch (Exception ex) + { + System.Diagnostics.Debug.WriteLine($"Fatal error in collection run {runId}: {ex}"); + await _repository.UpdateRunStatusAsync(runId, "FAILED", KstNowIso()); + result.Status = "FAILED"; + result.ErrorMessage = ex.Message; + } + + return result; + } + + private async Task> CollectOneAsync(string ticker, string account) + { + var normalized = new Dictionary { { "ticker", ticker } }; + + try + { + var price = await _kisApiClient.GetCurrentPriceAsync(ticker, account); + normalized["current_price"] = CoerceFloat(FindFirstValue(price, "stck_prpr", "stck_clpr", "close")); + normalized["open"] = CoerceFloat(FindFirstValue(price, "stck_oprc", "open")); + normalized["high"] = CoerceFloat(FindFirstValue(price, "stck_hgpr", "high")); + normalized["low"] = CoerceFloat(FindFirstValue(price, "stck_lwpr", "low")); + normalized["prev_close"] = CoerceFloat(FindFirstValue(price, "prdy_vrss")); + normalized["volume"] = CoerceFloat(FindFirstValue(price, "acml_vol", "volume")); + normalized["change_pct"] = CoerceFloat(FindFirstValue(price, "prdy_ctrt")); + normalized["price_status"] = "OK"; + } + catch (Exception ex) + { + normalized["price_status"] = "ERROR"; + normalized["price_error"] = ex.Message; + } + + try + { + var orderbook = await _kisApiClient.GetAskingPrice10LevelAsync(ticker, account); + var output1 = ExtractObject(orderbook, "output1"); + normalized["ask_1"] = CoerceFloat(FindFirstValue(output1, "askp1")); + normalized["bid_1"] = CoerceFloat(FindFirstValue(output1, "bidp1")); + normalized["orderbook_status"] = "OK"; + } + catch (Exception ex) + { + normalized["orderbook_status"] = "ERROR"; + normalized["orderbook_error"] = ex.Message; + } + + try + { + var start = DateTime.Now.AddDays(-10).ToString("yyyyMMdd"); + var end = DateTime.Now.ToString("yyyyMMdd"); + var shortSale = await _kisApiClient.GetDailyShortSaleAsync(ticker, start, end, account); + var rows = ExtractArray(shortSale, "output2"); + if (rows.Count > 0 && rows[0] is Dictionary latest) + { + normalized["short_turnover_share"] = CoerceFloat(latest.GetValueOrDefault("ssts_vol_rlim")); + } + normalized["short_sale_status"] = "OK"; + } + catch (Exception ex) + { + normalized["short_sale_status"] = "ERROR"; + normalized["short_sale_error"] = ex.Message; + } + + normalized["collection_as_of"] = KstNowIso(); + return normalized; + } + + private static object? FindFirstValue(Dictionary payload, params string[] keys) + { + var stack = new Stack(); + stack.Push(payload); + + while (stack.Count > 0) + { + var item = stack.Pop(); + if (item is Dictionary dict) + { + foreach (var key in keys) + { + if (dict.TryGetValue(key, out var value) && value != null && !string.IsNullOrEmpty(value.ToString())) + return value; + } + foreach (var value in dict.Values) + if (value != null) stack.Push(value); + } + else if (item is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Object) + { + foreach (var key in keys) + { + if (elem.TryGetProperty(key, out var prop) && prop.ValueKind != System.Text.Json.JsonValueKind.Null) + return prop; + } + foreach (var prop in elem.EnumerateObject()) + stack.Push(prop.Value); + } + } + return null; + } + + private static double? CoerceFloat(object? value) + { + if (value == null || string.IsNullOrEmpty(value.ToString())) + return null; + try + { + var str = value.ToString()?.Replace(",", "").Replace("%", "") ?? ""; + return double.TryParse(str, out var d) ? d : null; + } + catch { return null; } + } + + private static Dictionary ExtractObject(Dictionary payload, string key) + { + if (payload.TryGetValue(key, out var value) && value is Dictionary dict) + return dict; + if (value is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Object) + return JsonSerializer.Deserialize>(elem.GetRawText()) ?? new(); + return new(); + } + + private static List ExtractArray(Dictionary payload, string key) + { + if (payload.TryGetValue(key, out var value)) + { + if (value is List list) return list; + if (value is JsonElement elem && elem.ValueKind == System.Text.Json.JsonValueKind.Array) + return JsonSerializer.Deserialize>(elem.GetRawText()) ?? new(); + } + return new(); + } + + private static string KstNowIso() => + DateTime.Now.ToString("o"); +} + +public class CollectionRunResult +{ + public string RunId { get; set; } = ""; + public string Status { get; set; } = ""; + public string StartedAt { get; set; } = ""; + public string? FinishedAt { get; set; } + public int SuccessCount { get; set; } + public int ErrorCount { get; set; } + public string? ErrorMessage { get; set; } +} diff --git a/src/dotnet/QuantEngine.Web/Endpoints/CollectionEndpoints.cs b/src/dotnet/QuantEngine.Web/Endpoints/CollectionEndpoints.cs index 04222fa..83826a2 100644 --- a/src/dotnet/QuantEngine.Web/Endpoints/CollectionEndpoints.cs +++ b/src/dotnet/QuantEngine.Web/Endpoints/CollectionEndpoints.cs @@ -1,5 +1,5 @@ using QuantEngine.Core.Interfaces; -using System.Diagnostics; +using QuantEngine.Application.Services; namespace QuantEngine.Web.Endpoints; @@ -108,51 +108,30 @@ public static class CollectionEndpoints } } - private static async Task StartCollectionRun(ICollectionRepository repo, ILogger logger) + private static async Task StartCollectionRun( + DataCollectionService collectionService, + HttpRequest request, + ILogger logger) { try { var runId = Guid.NewGuid().ToString("N"); var now = DateTime.UtcNow.ToString("o"); - var run = new CollectionRunRecord( - RunId: runId, - Status: "running", - StartedAt: now, - FinishedAt: null, - TotalSnapshots: null, - TotalErrors: null, - UpdatedAt: now - ); + var body = await request.ReadAsAsync(); + var account = body?.Account ?? "real"; + var tickers = body?.Tickers ?? new List { "005930", "000660" }; - await repo.SaveRunAsync(run); - - // Temp: Invoke Python subprocess for actual collection + // Trigger async collection (fire-and-forget) _ = Task.Run(async () => { try { - var process = new Process - { - StartInfo = new ProcessStartInfo - { - FileName = "python", - Arguments = "tools/run_kis_data_collection_v1.py --input-json GatherTradingData.json --sqlite-db src/quant_engine/kis_data_collection.db --kis-account real", - UseShellExecute = false, - RedirectStandardOutput = true, - CreateNoWindow = true - } - }; - - process.Start(); - await process.WaitForExitAsync(); - - await repo.UpdateRunStatusAsync(runId, "completed", DateTime.UtcNow.ToString("o"), 0, 0); + await collectionService.RunCollectionAsync(runId, account, tickers); } catch (Exception ex) { - logger.LogError(ex, $"Collection run {runId} failed"); - await repo.UpdateRunStatusAsync(runId, "failed", DateTime.UtcNow.ToString("o"), null, null); + logger.LogError(ex, "Collection run {RunId} failed", runId); } }); @@ -160,12 +139,20 @@ public static class CollectionEndpoints { runId, status = "running", - startedAt = now + startedAt = now, + tickerCount = tickers.Count }); } - catch + catch (Exception ex) { + logger.LogError(ex, "Failed to start collection run"); return Results.StatusCode(500); } } + + private class CollectionRunRequest + { + public string? Account { get; set; } + public List? Tickers { get; set; } + } } diff --git a/src/dotnet/QuantEngine.Web/Program.cs b/src/dotnet/QuantEngine.Web/Program.cs index 225b08e..72a6ea4 100644 --- a/src/dotnet/QuantEngine.Web/Program.cs +++ b/src/dotnet/QuantEngine.Web/Program.cs @@ -6,6 +6,7 @@ using QuantEngine.Infrastructure.Services; using QuantEngine.Core.Interfaces; using QuantEngine.Application.Services; using System.Text.Json; +using static QuantEngine.Application.Services.DataCollectionService; using Microsoft.FluentUI.AspNetCore.Components; using Serilog; using QuantEngine.Web.Infrastructure; @@ -41,6 +42,7 @@ builder.Services.AddScoped(); builder.Services.AddScoped(); builder.Services.AddScoped(); builder.Services.AddScoped(); +builder.Services.AddScoped(); // HTTP Client & API Services builder.Services.AddHttpClient();