[Lang laufende Arbeit, signierte Zustellung]

Asynchrone Jobs

Reihen Sie lang laufende Arbeit mit `POST /v1/jobs` ein, erhalten Sie eine `id` und pollen Sie dann `GET /v1/jobs/{id}` oder empfangen Sie einen HMAC-signierten Webhook beim Abschluss. Initial unterstützte Kinds sind `intelligence.enrich.bulk`, `export.bulk`, `import.bulk` und `graph.compute`. Jeder Statusübergang trägt dieselbe `trace_id` aus G2 für die durchgängige Korrelation.

Asynchrone Jobs

Job einreihen

`POST /v1/jobs` erfordert den Scope `argus:jobs:write` auf einem Bearer-Token. Der Body muss `kind` (einer der unterstützten Job-Kinds) und `payload` enthalten. Optionale Felder sind `callback_url` (https-URL, an die die Plattform beim Abschluss signiert per POST sendet) und `secrecy_level` (Standard `standard`; der Job erbt den Wert und hebt ihn während der Ausführung niemals an). Bei Erfolg liefert die Antwort 201 mit `{ id, kind, status, created_at, trace_id }`. Der Job startet in `queued` und die `trace_id` ist derselbe Wert, der durch die G2-Antwort-Header fließt, sodass Sie das Einreihen mit der nachgelagerten Arbeit korrelieren können.

Lebenszyklus und Status

Ein Job durchläuft eine strikte Statusmaschine. Worker beanspruchen Zeilen in der Warteschlange mit `SELECT ... FOR UPDATE SKIP LOCKED`, sodass jeder Job nur an einen Worker gleichzeitig zugestellt wird. Während der Ausführung senden Worker alle 30 Sekunden einen Heartbeat und aktualisieren `progress_pct`. Ein Reaper reiht laufende Jobs, deren Heartbeat älter als 2 Minuten ist, neu ein, sodass ein abgestürzter Worker nie einen Job blockiert. Endzustände (`succeeded`, `failed`, `cancelled`) werden genau einmal geschrieben.

Idempotenz

Senden Sie `Idempotency-Key: <uuid>` (UUID v4 empfohlen) auf `POST /v1/jobs`, um den Aufruf gefahrlos wiederholbar zu machen. Innerhalb eines Zeitfensters von 24 Stunden pro Tenant geben Wiederholungen mit demselben Schlüssel 200 mit derselben `id` und demselben Body wie das ursprüngliche 201 zurück, selbst wenn die Originalanfrage Payload-Felder geändert hat. Ohne den Header dedupliziert Argus nicht; ein vorübergehender Netz-Retry könnte zweimal einreihen. Schlüssel sind pro Tenant gescoped, zwei Tenants können dieselbe UUID ohne Konflikt verwenden.

Polling oder Webhooks

Wählen Sie pro Anfrage ein Modell. Polling: lassen Sie `callback_url` weg und rufen Sie `GET /v1/jobs/{id}` auf, bis der Status `queued`/`running` verlässt, dann einmal `GET /v1/jobs/{id}/result` für die signierte URL. Webhooks: geben Sie `callback_url` an, und die Plattform signiert und POSTet das Abschlussereignis an diese URL. Polling ist der einfachste Weg für Batch-Jobs, die Sie selbst gestartet haben; Webhooks sind die richtige Wahl, wenn ein externes System das Ergebnis innerhalb von Sekunden nach Abschluss benötigt. Beide Modi nutzen denselben Datensatz und dieselbe `trace_id`.

Webhook-Payload und Signaturprüfung

Beim Abschluss POSTet Argus den kanonischen JSON-Body an Ihre `callback_url` mit den Headern `Content-Type: application/json`, `X-Argus-Trace-ID` (dieselbe trace_id wie in G2), `X-Argus-Job-ID` und `X-Argus-Signature: sha256=<hex>`. Die Signatur ist HMAC-SHA256 über den Body mit Ihrem registrierten gemeinsamen Geheimnis. Empfänger MÜSSEN die Signatur gegen die rohen Bytes des Bodys prüfen, bevor sie auf das Ereignis reagieren. Verwenden Sie einen konstantzeitigen Vergleich (`hmac.compare_digest` in Python, `crypto.timingSafeEqual` in Node.js), um Timing-Angriffe zu vermeiden. Argus wiederholt fehlgeschlagene Zustellungen bis zu 24 Stunden lang mit exponentiellem Backoff.

Abbruch

`DELETE /v1/jobs/{id}` fordert den Abbruch an und erfordert den Scope `argus:jobs:write`. Ein `queued` oder `running` Job liefert 204 und wechselt nach `cancelled` (der Worker erkennt das Flag an der nächsten Heartbeat-Grenze). Ein bereits `cancelled` Job liefert 200 (idempotenter Replay). Ein Job, der bereits einen erfolgreichen Endzustand (`succeeded` oder `failed`) erreicht hat, liefert 409: abgeschlossene Arbeit kann nicht rückgängig gemacht werden. Wenn ein Webhook registriert war, wird für einen abgebrochenen Job kein Abschluss-Webhook zugestellt.

Kinds-Katalog

Vier Job-Kinds werden im ersten Release ausgeliefert. `intelligence.enrich.bulk` reichert eine Liste von Profil-IDs gegenüber Partner-Konnektoren an. `export.bulk` erzeugt tenant-gescopte Exporte (CSV, JSONL, FHIR Bundle) und schreibt das Ergebnis nach R2; der Result-Endpunkt liefert eine kurzlebige signierte URL. `import.bulk` ingestiert ein Partner-Bundle und liefert Ergebnisse pro Zeile. `graph.compute` führt eine vorberechnete Abfrage gegen den operativen Graphen aus und liefert das materialisierte Ergebnis. Künftige Kinds sind additiv; bestehende Kinds behalten ihre Payload-Form unter SemVer.

Tenant-Isolation

Jede Abfrage der Tabelle jobs enthält `tenant_id` UND `organization_id` aus dem Bearer-Token. Tenant-übergreifende Abfragen liefern 404 (niemals 403), sodass die Existenz eines Jobs in einem anderen Tenant nie offenbart wird. Der Datensatz erbt `secrecy_level` aus der Anfrage, und Lebenszyklusübergänge erhöhen ihn nie. Jeder Statuswechsel schreibt einen Audit-Eintrag mit Benutzer, Organisation, Tenant, Aktion (`jobs.enqueue`, `jobs.start`, `jobs.complete`, `jobs.fail`, `jobs.cancel`, `jobs.read`, `jobs.list`), `resource_id = id`, Geheimhaltungsstufe und Metadaten `{ kind, status, progress_pct, trace_id }`.

Lebenszyklus und Status

queued | running | succeeded | failed | cancelled

Job einreihen

# Enqueue an async job
curl -X POST https://api.knogin.com/v1/jobs \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: 8b9d8a2e-4a8c-4a7e-8a2e-4a8c4a7e8a2e" \
  -d '{
    "kind": "intelligence.enrich.bulk",
    "payload": { "profile_ids": ["p1", "p2"] },
    "callback_url": "https://partner.example.com/argus-webhook",
    "secrecy_level": "standard"
  }'

# 201 Created
# {
#   "id": "01HFXY...",
#   "kind": "intelligence.enrich.bulk",
#   "status": "queued",
#   "created_at": "2026-05-08T12:00:00Z",
#   "trace_id": "0af7651916cd43dd8448eb211c80319c"
# }

Status, Ergebnis und Webhook-Payload

# GET /v1/jobs/{id} while running
{
  "id": "01HFXY...",
  "kind": "intelligence.enrich.bulk",
  "status": "running",
  "progress_pct": 42,
  "created_at": "2026-05-08T12:00:00Z",
  "updated_at": "2026-05-08T12:00:30Z",
  "completed_at": null,
  "error": null,
  "trace_id": "0af7651916cd43dd8448eb211c80319c"
}

# GET /v1/jobs/{id}/result after success
{
  "id": "01HFXY...",
  "result_ref": "r2://argus-jobs/results/01HFXY.json",
  "signed_url": "https://r2-presigned-url..."
}

# GET /v1/jobs?kind=intelligence.enrich.bulk&status=running&limit=50
{
  "jobs": [ /* job objects */ ],
  "next_cursor": "eyJjcmVhdGVkX2F0IjoiLi4uIn0=",
  "total_count": 1234
}

Webhook-Zustellung

# Argus -> partner.example.com/argus-webhook
POST /argus-webhook HTTP/1.1
Host: partner.example.com
Content-Type: application/json
X-Argus-Trace-ID: 0af7651916cd43dd8448eb211c80319c
X-Argus-Job-ID: 01HFXY...
X-Argus-Signature: sha256=<hex of HMAC-SHA256(canonical_json_body, partner_secret)>

{
  "id": "01HFXY...",
  "kind": "intelligence.enrich.bulk",
  "status": "succeeded",
  "trace_id": "0af7651916cd43dd8448eb211c80319c",
  "result_ref": "r2://argus-jobs/results/01HFXY.json"
}

Webhook-Signatur prüfen

// Node.js (Express): verify X-Argus-Signature on the raw body
import crypto from 'node:crypto';

function verifyArgusSignature(rawBody, headerValue, sharedSecret) {
  const expectedHex = crypto
    .createHmac('sha256', sharedSecret)
    .update(rawBody)
    .digest('hex');
  const expected = Buffer.from(`sha256=${expectedHex}`, 'utf8');
  const provided = Buffer.from(headerValue ?? '', 'utf8');
  if (provided.length !== expected.length) return false;
  return crypto.timingSafeEqual(provided, expected);
}
# Python (Flask): canonical-body verification
import hmac, hashlib, json

def verify_argus_signature(raw_body: bytes, header_value: str, shared_secret: bytes) -> bool:
    expected = "sha256=" + hmac.new(shared_secret, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, header_value or "")

# Argus computes the signature over the canonical JSON body it sent:
# canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
# sig = hmac.new(secret, canonical, hashlib.sha256).hexdigest()
# header = f"sha256={sig}"

Bereit, asynchrone Jobs zu integrieren?

Öffnen Sie die API-Referenz für den öffentlichen Vertrag oder sprechen Sie mit Knogin, wenn Sie Hilfe bei einem maßgeschneiderten Kind benötigen.