[Travail long, livraison signée]

Jobs asynchrones

Mettez en file d’attente du travail long avec `POST /v1/jobs`, récupérez un `id`, puis faites du polling sur `GET /v1/jobs/{id}` ou recevez un webhook signé HMAC à la fin. Les kinds initialement supportés sont `intelligence.enrich.bulk`, `export.bulk`, `import.bulk` et `graph.compute`. Chaque transition d’état porte le même `trace_id` que G2 pour une corrélation de bout en bout.

Jobs asynchrones

Mettre un job en file

`POST /v1/jobs` exige le scope `argus:jobs:write` sur un bearer token. Le corps doit inclure `kind` (l’un des kinds supportés) et `payload`. Les champs optionnels sont `callback_url` (URL https que la plateforme signera et appellera en POST à la complétion) et `secrecy_level` (par défaut `standard`; le job hérite de la valeur et ne l’élève jamais en cours d’exécution). En cas de succès, la réponse est 201 avec `{ id, kind, status, created_at, trace_id }`. Le job démarre en `queued` et le `trace_id` est le même que celui qui circule dans les en-têtes de réponse G2, ce qui permet de corréler la mise en file avec le travail en aval.

Cycle de vie et statuts

Un job parcourt une machine d’états stricte. Les workers réclament les lignes en file avec `SELECT ... FOR UPDATE SKIP LOCKED`, ainsi chaque job n’est livré qu’à un seul worker à la fois. Pendant l’exécution, les workers émettent un heartbeat toutes les 30 secondes et mettent `progress_pct` à jour. Un reaper remet en file les jobs en cours dont le heartbeat date de plus de 2 minutes, ainsi un worker tombé ne bloque jamais un job. Les états terminaux (`succeeded`, `failed`, `cancelled`) sont écrits une seule fois.

Idempotence

Envoyez `Idempotency-Key: <uuid>` (UUID v4 recommandé) sur `POST /v1/jobs` pour rendre l’appel rejouable sans risque. Dans une fenêtre de 24 heures par tenant, les rejeux de la même clé renvoient 200 avec le même `id` et le même corps que la 201 d’origine, même si la requête originale a changé certains champs du payload. Sans l’en-tête, Argus ne déduplique pas; une retransmission réseau peut mettre en file deux fois. Les clés sont scopées par tenant, donc deux tenants peuvent utiliser le même UUID sans interférer.

Polling ou webhooks

Choisissez un modèle par requête. Polling: omettez `callback_url` et appelez `GET /v1/jobs/{id}` jusqu’à ce que le statut quitte `queued`/`running`, puis appelez `GET /v1/jobs/{id}/result` une fois pour l’URL signée. Webhooks: incluez `callback_url` et la plateforme signera et POSTera l’événement de complétion à cette URL. Le polling est la voie la plus simple pour les batchs que vous avez vous-même lancés; les webhooks sont le bon choix quand un système externe a besoin du résultat dans les secondes qui suivent la complétion. Les deux modes utilisent le même enregistrement et le même `trace_id`.

Payload de webhook et vérification de signature

À la complétion, Argus POSTe le corps JSON canonique vers votre `callback_url` avec les en-têtes `Content-Type: application/json`, `X-Argus-Trace-ID` (le même trace_id que G2), `X-Argus-Job-ID` et `X-Argus-Signature: sha256=<hex>`. La signature est un HMAC-SHA256 du corps avec votre secret partagé enregistré. Les destinataires DOIVENT vérifier la signature sur les octets bruts du corps avant d’agir sur l’événement. Utilisez une comparaison à temps constant (`hmac.compare_digest` en Python, `crypto.timingSafeEqual` en Node.js) pour éviter les attaques temporelles. Argus réessaie les livraisons en échec avec backoff exponentiel pendant 24 heures maximum.

Annulation

`DELETE /v1/jobs/{id}` demande l’annulation et exige le scope `argus:jobs:write`. Un job `queued` ou `running` renvoie 204 et passe à `cancelled` (le worker observe le drapeau au prochain boundary de heartbeat). Un job déjà `cancelled` renvoie 200 (replay idempotent). Un job ayant déjà atteint un statut terminal de succès (`succeeded` ou `failed`) renvoie 409: le travail terminé ne peut pas être défait. Si un webhook était enregistré, aucun webhook de complétion n’est livré pour un job annulé.

Catalogue des kinds

Quatre kinds sont livrés dans la première version. `intelligence.enrich.bulk` enrichit une liste de profile IDs auprès des connecteurs partenaires. `export.bulk` produit des exports scopés au tenant (CSV, JSONL, FHIR Bundle) et écrit le résultat dans R2; l’endpoint de résultat renvoie une URL signée de courte durée. `import.bulk` ingère un bundle fourni par un partenaire et renvoie les résultats par ligne. `graph.compute` exécute une requête précomputée sur le graphe opérationnel et renvoie le résultat matérialisé. Les futurs kinds sont additifs; les kinds existants conservent leur forme de payload sous semver.

Isolation par tenant

Chaque requête sur la table jobs inclut `tenant_id` ET `organization_id` issus du bearer token. Les recherches inter-tenant renvoient 404 (jamais 403), si bien que l’existence d’un job d’un autre tenant n’est jamais divulguée. L’enregistrement hérite de `secrecy_level` de la requête et les transitions de cycle de vie ne l’élèvent jamais. Chaque changement d’état écrit une entrée d’audit avec utilisateur, organisation, tenant, action (`jobs.enqueue`, `jobs.start`, `jobs.complete`, `jobs.fail`, `jobs.cancel`, `jobs.read`, `jobs.list`), `resource_id = id`, niveau de secret et métadonnées `{ kind, status, progress_pct, trace_id }`.

Cycle de vie et statuts

queued | running | succeeded | failed | cancelled

Mettre un job en file

# Enqueue an async job
curl -X POST https://api.knogin.com/v1/jobs \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: 8b9d8a2e-4a8c-4a7e-8a2e-4a8c4a7e8a2e" \
  -d '{
    "kind": "intelligence.enrich.bulk",
    "payload": { "profile_ids": ["p1", "p2"] },
    "callback_url": "https://partner.example.com/argus-webhook",
    "secrecy_level": "standard"
  }'

# 201 Created
# {
#   "id": "01HFXY...",
#   "kind": "intelligence.enrich.bulk",
#   "status": "queued",
#   "created_at": "2026-05-08T12:00:00Z",
#   "trace_id": "0af7651916cd43dd8448eb211c80319c"
# }

Statut, résultat et payload de webhook

# GET /v1/jobs/{id} while running
{
  "id": "01HFXY...",
  "kind": "intelligence.enrich.bulk",
  "status": "running",
  "progress_pct": 42,
  "created_at": "2026-05-08T12:00:00Z",
  "updated_at": "2026-05-08T12:00:30Z",
  "completed_at": null,
  "error": null,
  "trace_id": "0af7651916cd43dd8448eb211c80319c"
}

# GET /v1/jobs/{id}/result after success
{
  "id": "01HFXY...",
  "result_ref": "r2://argus-jobs/results/01HFXY.json",
  "signed_url": "https://r2-presigned-url..."
}

# GET /v1/jobs?kind=intelligence.enrich.bulk&status=running&limit=50
{
  "jobs": [ /* job objects */ ],
  "next_cursor": "eyJjcmVhdGVkX2F0IjoiLi4uIn0=",
  "total_count": 1234
}

Livraison de webhook

# Argus -> partner.example.com/argus-webhook
POST /argus-webhook HTTP/1.1
Host: partner.example.com
Content-Type: application/json
X-Argus-Trace-ID: 0af7651916cd43dd8448eb211c80319c
X-Argus-Job-ID: 01HFXY...
X-Argus-Signature: sha256=<hex of HMAC-SHA256(canonical_json_body, partner_secret)>

{
  "id": "01HFXY...",
  "kind": "intelligence.enrich.bulk",
  "status": "succeeded",
  "trace_id": "0af7651916cd43dd8448eb211c80319c",
  "result_ref": "r2://argus-jobs/results/01HFXY.json"
}

Vérifier la signature de webhook

// Node.js (Express): verify X-Argus-Signature on the raw body
import crypto from 'node:crypto';

function verifyArgusSignature(rawBody, headerValue, sharedSecret) {
  const expectedHex = crypto
    .createHmac('sha256', sharedSecret)
    .update(rawBody)
    .digest('hex');
  const expected = Buffer.from(`sha256=${expectedHex}`, 'utf8');
  const provided = Buffer.from(headerValue ?? '', 'utf8');
  if (provided.length !== expected.length) return false;
  return crypto.timingSafeEqual(provided, expected);
}
# Python (Flask): canonical-body verification
import hmac, hashlib, json

def verify_argus_signature(raw_body: bytes, header_value: str, shared_secret: bytes) -> bool:
    expected = "sha256=" + hmac.new(shared_secret, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, header_value or "")

# Argus computes the signature over the canonical JSON body it sent:
# canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
# sig = hmac.new(secret, canonical, hashlib.sha256).hexdigest()
# header = f"sha256={sig}"

Prêt à câbler des jobs asynchrones ?

Ouvrez la référence API pour le contrat public, ou contactez Knogin si vous avez besoin d’aide pour définir un kind sur mesure.