[Długotrwałe zadania, podpisana dostawa]

Joby asynchroniczne

Kolejkuj długotrwałe zadania przez `POST /v1/jobs`, otrzymuj `id`, a następnie pobieraj status z `GET /v1/jobs/{id}` lub odbieraj webhook podpisany HMAC po zakończeniu. Początkowo wspierane kindy to `intelligence.enrich.bulk`, `export.bulk`, `import.bulk` i `graph.compute`. Każde przejście stanu niesie ten sam `trace_id` z G2 dla korelacji end-to-end.

Joby asynchroniczne

Kolejkowanie joba

`POST /v1/jobs` wymaga scope `argus:jobs:write` na bearer tokenie. Body musi zawierać `kind` (jeden z wspieranych kindów) i `payload`. Pola opcjonalne to `callback_url` (URL https, na który platforma wykona podpisany POST po zakończeniu) i `secrecy_level` (domyślnie `standard`; job dziedziczy wartość i nigdy nie podnosi jej w trakcie wykonania). W razie sukcesu odpowiedź to 201 z `{ id, kind, status, created_at, trace_id }`. Job startuje w `queued`, a `trace_id` jest tą samą wartością, która przepływa przez nagłówki odpowiedzi G2, więc możesz skorelować skolejkowanie z pracą downstream.

Cykl życia i statusy

Job przechodzi przez ścisłą maszynę stanów. Workery zajmują wiersze w kolejce z `SELECT ... FOR UPDATE SKIP LOCKED`, dzięki czemu każdy job trafia tylko do jednego workera naraz. W trakcie wykonania workery wysyłają heartbeat co 30 sekund i aktualizują `progress_pct`. Reaper przywraca do kolejki running joby, których heartbeat jest starszy niż 2 minuty, więc upadek workera nigdy nie zablokuje joba. Stany końcowe (`succeeded`, `failed`, `cancelled`) zapisywane są dokładnie raz.

Idempotencja

Wyślij `Idempotency-Key: <uuid>` (zalecany UUID v4) w `POST /v1/jobs`, aby uczynić wywołanie bezpiecznie powtarzalnym. W oknie 24 godzin per tenant powtórzenia z tym samym kluczem zwracają 200 z tym samym `id` i body co oryginalna 201, nawet jeśli oryginalne żądanie zmieniło pola payload. Bez nagłówka Argus nie deduplikuje; przejściowy retry sieciowy mógłby zakolejkować dwukrotnie. Klucze są scoped per tenant, dwa tenanty mogą używać tego samego UUID bez konfliktu.

Polling lub webhooki

Wybierz model per żądanie. Polling: pomiń `callback_url` i wywołuj `GET /v1/jobs/{id}` aż status opuści `queued`/`running`, potem wywołaj raz `GET /v1/jobs/{id}/result` po podpisany URL. Webhooki: dołącz `callback_url`, a platforma podpisze i wykona POST zdarzenia ukończenia na ten URL. Polling to najprostsza ścieżka dla batchy, które sam zainicjowałeś; webhooki są właściwym wyborem, gdy zewnętrzny system potrzebuje wyniku w ciągu sekund od ukończenia. Oba tryby używają tego samego rekordu i tego samego `trace_id`.

Payload webhooka i weryfikacja podpisu

Po ukończeniu Argus wykonuje POST kanonicznego JSON body na twój `callback_url` z nagłówkami `Content-Type: application/json`, `X-Argus-Trace-ID` (ten sam trace_id co w G2), `X-Argus-Job-ID` i `X-Argus-Signature: sha256=<hex>`. Podpis to HMAC-SHA256 nad body z twoim zarejestrowanym współdzielonym sekretem. Odbiorcy MUSZĄ zweryfikować podpis względem surowych bajtów body, zanim podejmą akcję. Używaj porównania w stałym czasie (`hmac.compare_digest` w Pythonie, `crypto.timingSafeEqual` w Node.js), aby uniknąć ataków czasowych. Argus ponawia nieudane dostawy z wykładniczym backoffem do 24 godzin.

Anulowanie

`DELETE /v1/jobs/{id}` żąda anulowania i wymaga scope `argus:jobs:write`. Job `queued` lub `running` zwraca 204 i przechodzi do `cancelled` (worker zauważa flagę przy najbliższej granicy heartbeatu). Job już `cancelled` zwraca 200 (idempotentny replay). Job, który osiągnął końcowy sukces (`succeeded` lub `failed`), zwraca 409: ukończonej pracy nie można cofnąć. Jeśli zarejestrowano webhook, dla anulowanego joba nie jest dostarczany webhook ukończenia.

Katalog kindów

Cztery kindy jobów dostarczamy w pierwszym wydaniu. `intelligence.enrich.bulk` wzbogaca listę profile ID o dane z konektorów partnerskich. `export.bulk` wytwarza eksporty scoped per tenant (CSV, JSONL, FHIR Bundle) i zapisuje wynik do R2; endpoint wyniku zwraca krótko żyjący podpisany URL. `import.bulk` przyjmuje bundle dostarczony przez partnera i zwraca wyniki per wiersz. `graph.compute` uruchamia prekomputowane zapytanie po grafie operacyjnym i zwraca zmaterializowany wynik. Przyszłe kindy są addytywne; istniejące kindy zachowują kształt payload pod semver.

Izolacja per tenant

Każde zapytanie tabeli jobs zawiera `tenant_id` ORAZ `organization_id` z bearer tokena. Wyszukiwania cross-tenant zwracają 404 (nigdy 403), więc istnienie joba w innym tenancie nigdy nie jest ujawniane. Rekord dziedziczy `secrecy_level` z żądania i przejścia cyklu życia nigdy go nie podnoszą. Każda zmiana stanu zapisuje wpis audytu z użytkownikiem, organizacją, tenantem, akcją (`jobs.enqueue`, `jobs.start`, `jobs.complete`, `jobs.fail`, `jobs.cancel`, `jobs.read`, `jobs.list`), `resource_id = id`, poziomem tajności i metadanymi `{ kind, status, progress_pct, trace_id }`.

Cykl życia i statusy

queued | running | succeeded | failed | cancelled

Zakolejkować joba

# Enqueue an async job
curl -X POST https://api.knogin.com/v1/jobs \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: 8b9d8a2e-4a8c-4a7e-8a2e-4a8c4a7e8a2e" \
  -d '{
    "kind": "intelligence.enrich.bulk",
    "payload": { "profile_ids": ["p1", "p2"] },
    "callback_url": "https://partner.example.com/argus-webhook",
    "secrecy_level": "standard"
  }'

# 201 Created
# {
#   "id": "01HFXY...",
#   "kind": "intelligence.enrich.bulk",
#   "status": "queued",
#   "created_at": "2026-05-08T12:00:00Z",
#   "trace_id": "0af7651916cd43dd8448eb211c80319c"
# }

Status, wynik i payload webhooka

# GET /v1/jobs/{id} while running
{
  "id": "01HFXY...",
  "kind": "intelligence.enrich.bulk",
  "status": "running",
  "progress_pct": 42,
  "created_at": "2026-05-08T12:00:00Z",
  "updated_at": "2026-05-08T12:00:30Z",
  "completed_at": null,
  "error": null,
  "trace_id": "0af7651916cd43dd8448eb211c80319c"
}

# GET /v1/jobs/{id}/result after success
{
  "id": "01HFXY...",
  "result_ref": "r2://argus-jobs/results/01HFXY.json",
  "signed_url": "https://r2-presigned-url..."
}

# GET /v1/jobs?kind=intelligence.enrich.bulk&status=running&limit=50
{
  "jobs": [ /* job objects */ ],
  "next_cursor": "eyJjcmVhdGVkX2F0IjoiLi4uIn0=",
  "total_count": 1234
}

Dostarczenie webhooka

# Argus -> partner.example.com/argus-webhook
POST /argus-webhook HTTP/1.1
Host: partner.example.com
Content-Type: application/json
X-Argus-Trace-ID: 0af7651916cd43dd8448eb211c80319c
X-Argus-Job-ID: 01HFXY...
X-Argus-Signature: sha256=<hex of HMAC-SHA256(canonical_json_body, partner_secret)>

{
  "id": "01HFXY...",
  "kind": "intelligence.enrich.bulk",
  "status": "succeeded",
  "trace_id": "0af7651916cd43dd8448eb211c80319c",
  "result_ref": "r2://argus-jobs/results/01HFXY.json"
}

Zweryfikować podpis webhooka

// Node.js (Express): verify X-Argus-Signature on the raw body
import crypto from 'node:crypto';

function verifyArgusSignature(rawBody, headerValue, sharedSecret) {
  const expectedHex = crypto
    .createHmac('sha256', sharedSecret)
    .update(rawBody)
    .digest('hex');
  const expected = Buffer.from(`sha256=${expectedHex}`, 'utf8');
  const provided = Buffer.from(headerValue ?? '', 'utf8');
  if (provided.length !== expected.length) return false;
  return crypto.timingSafeEqual(provided, expected);
}
# Python (Flask): canonical-body verification
import hmac, hashlib, json

def verify_argus_signature(raw_body: bytes, header_value: str, shared_secret: bytes) -> bool:
    expected = "sha256=" + hmac.new(shared_secret, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, header_value or "")

# Argus computes the signature over the canonical JSON body it sent:
# canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
# sig = hmac.new(secret, canonical, hashlib.sha256).hexdigest()
# header = f"sha256={sig}"

Gotowy podpiąć joby asynchroniczne?

Otwórz referencję API dla publicznego kontraktu lub skontaktuj się z Knogin, jeśli potrzebujesz pomocy z definicją niestandardowego kind.