Gitea Act Runner — Synology 기동 스크립트 + 토큰 홈 검증 도구
로컬 워크스페이스에서 Gitea Actions를 검증·디스패치할 수 있는 GITEA_TOKEN_HOME API 토큰 계약과, Synology에서 act_runner를 기동하는 스크립트를 추가한다. - validate_gitea_token_home_v1.py: 저장소 메타데이터 조회, 워크플로 접근 확인, workflow_dispatch 트리거 + 최신 실행 결과 폴링 - start_act_runner_synology.sh: Synology 환경에서 act_runner 기동 - setup_act_runner.sh: 기동 절차 갱신
This commit is contained in:
@@ -0,0 +1,144 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
DEFAULT_BASE_URL = "http://192.168.123.100:8418"
|
||||
DEFAULT_OWNER = "KimJaeHyun"
|
||||
DEFAULT_REPO = "myfinance"
|
||||
|
||||
|
||||
def _token() -> str:
|
||||
token = os.environ.get("GITEA_TOKEN_HOME", "")
|
||||
if not token.strip():
|
||||
raise SystemExit("GITEA_TOKEN_HOME missing or empty")
|
||||
return token.strip()
|
||||
|
||||
|
||||
def _request_json(url: str, token: str, method: str = "GET", body: dict[str, Any] | None = None) -> tuple[int, str, Any]:
|
||||
data = None
|
||||
headers = {
|
||||
"Authorization": f"token {token}",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
if body is not None:
|
||||
headers["Content-Type"] = "application/json"
|
||||
data = json.dumps(body).encode("utf-8")
|
||||
req = urllib.request.Request(url, data=data, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
raw = resp.read().decode("utf-8", errors="replace")
|
||||
payload = json.loads(raw) if raw else None
|
||||
return resp.status, resp.reason, payload
|
||||
except urllib.error.HTTPError as exc:
|
||||
raw = exc.read().decode("utf-8", errors="replace")
|
||||
try:
|
||||
payload = json.loads(raw) if raw else None
|
||||
except Exception:
|
||||
payload = raw
|
||||
return exc.code, exc.reason or "", payload
|
||||
|
||||
|
||||
def _repo_base(args: argparse.Namespace) -> str:
|
||||
return f"{args.base_url.rstrip('/')}/api/v1/repos/{args.owner}/{args.repo}"
|
||||
|
||||
|
||||
def _latest_run(workflow_runs: Any, workflow_name: str) -> dict[str, Any] | None:
|
||||
if not isinstance(workflow_runs, dict):
|
||||
return None
|
||||
runs = workflow_runs.get("workflow_runs") or workflow_runs.get("runs") or []
|
||||
if not isinstance(runs, list):
|
||||
return None
|
||||
for item in runs:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
path = str(item.get("path") or "")
|
||||
if workflow_name in path:
|
||||
return item
|
||||
return None
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser(description="Validate and optionally dispatch Gitea Actions using GITEA_TOKEN_HOME.")
|
||||
ap.add_argument("--base-url", default=DEFAULT_BASE_URL)
|
||||
ap.add_argument("--owner", default=DEFAULT_OWNER)
|
||||
ap.add_argument("--repo", default=DEFAULT_REPO)
|
||||
ap.add_argument("--workflow", default="kis_data_collection.yml")
|
||||
ap.add_argument("--ref", default="main")
|
||||
ap.add_argument("--dispatch", action="store_true", help="Trigger workflow_dispatch after the read-only checks pass.")
|
||||
ap.add_argument("--wait-seconds", type=int, default=5, help="Seconds to wait before polling for the dispatched run.")
|
||||
args = ap.parse_args()
|
||||
|
||||
token = _token()
|
||||
base = _repo_base(args)
|
||||
evidence: dict[str, Any] = {
|
||||
"base_url": args.base_url,
|
||||
"owner": args.owner,
|
||||
"repo": args.repo,
|
||||
"workflow": args.workflow,
|
||||
"ref": args.ref,
|
||||
"token_length": len(token),
|
||||
}
|
||||
|
||||
status, reason, repo_payload = _request_json(base, token)
|
||||
evidence["repo_status"] = status
|
||||
evidence["repo_reason"] = reason
|
||||
if status != 200:
|
||||
print(json.dumps({"gate": "FAIL", "evidence": evidence, "error": repo_payload}, ensure_ascii=False, indent=2))
|
||||
return 1
|
||||
|
||||
status, reason, workflow_payload = _request_json(f"{base}/actions/workflows/{args.workflow}", token)
|
||||
evidence["workflow_status"] = status
|
||||
evidence["workflow_reason"] = reason
|
||||
evidence["workflow_payload_type"] = type(workflow_payload).__name__
|
||||
if status != 200:
|
||||
print(json.dumps({"gate": "FAIL", "evidence": evidence, "error": workflow_payload}, ensure_ascii=False, indent=2))
|
||||
return 1
|
||||
|
||||
dispatch_result = None
|
||||
if args.dispatch:
|
||||
status, reason, dispatch_result = _request_json(
|
||||
f"{base}/actions/workflows/{args.workflow}/dispatches",
|
||||
token,
|
||||
method="POST",
|
||||
body={"ref": args.ref},
|
||||
)
|
||||
evidence["dispatch_status"] = status
|
||||
evidence["dispatch_reason"] = reason
|
||||
if status not in (201, 204):
|
||||
print(json.dumps({"gate": "FAIL", "evidence": evidence, "error": dispatch_result}, ensure_ascii=False, indent=2))
|
||||
return 1
|
||||
|
||||
time.sleep(max(0, args.wait_seconds))
|
||||
runs_status, runs_reason, runs_payload = _request_json(
|
||||
f"{base}/actions/runs?per_page=10",
|
||||
token,
|
||||
)
|
||||
evidence["runs_status"] = runs_status
|
||||
evidence["runs_reason"] = runs_reason
|
||||
latest = _latest_run(runs_payload, args.workflow)
|
||||
evidence["latest_run"] = latest
|
||||
if not latest:
|
||||
print(json.dumps({"gate": "FAIL", "evidence": evidence, "error": "latest_run_missing"}, ensure_ascii=False, indent=2))
|
||||
return 1
|
||||
|
||||
result = {
|
||||
"gate": "PASS",
|
||||
"evidence": evidence,
|
||||
"dispatch_result": dispatch_result,
|
||||
}
|
||||
print(json.dumps(result, ensure_ascii=False, indent=2))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user