Overview
Raw data from an external connector rarely arrives in a format the platform can use directly. A customs database export might use national identifier formats that need normalisation. A partner nation's intelligence feed might encode dates and locations differently from the platform's internal schema. The Ingestion Pipeline domain moves that raw data through a structured, multi-stage process: validate the structure, extract the relevant fields, normalise the values, map them to internal schemas, enrich them with cross-references, and mark the job complete. Each stage is tracked, retried on failure, and visible to operators monitoring queue health.
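The six stages above run strictly in order, and a failing stage is retried before the job gives up. The following is a minimal sketch of that control flow, assuming each stage is a plain Python function over a dict payload. `Stage`, `StageOutcome`, `run_stages` and the default of three attempts are illustrative names and values, not the platform's actual API.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple


class Stage(Enum):
    """The six documented stages; Enum iteration follows definition order."""
    VALIDATION = "validation"
    EXTRACTION = "extraction"
    NORMALISATION = "normalisation"
    MAPPING = "mapping"
    ENRICHMENT = "enrichment"
    COMPLETION = "completion"


# Hypothetical handler signature: takes the working payload, returns the updated payload.
StageHandler = Callable[[Dict[str, Any]], Dict[str, Any]]


@dataclass
class StageOutcome:
    stage: Stage
    attempts: int
    succeeded: bool
    error: Optional[str] = None


def run_stages(
    payload: Dict[str, Any],
    handlers: Dict[Stage, StageHandler],
    max_attempts: int = 3,
) -> Tuple[Dict[str, Any], List[StageOutcome]]:
    """Run every stage in order, retrying a failing stage up to max_attempts times.

    Processing stops at the first stage that exhausts its attempts; the returned
    outcomes show how far the payload got and why it stopped.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    current = dict(payload)  # never mutate the caller's raw payload
    outcomes: List[StageOutcome] = []
    for stage in Stage:
        handler = handlers.get(stage, lambda p: p)  # stages without a handler pass through
        for attempt in range(1, max_attempts + 1):
            try:
                current = handler(current)
            except Exception as exc:  # a real pipeline would separate transient from permanent errors
                if attempt == max_attempts:
                    outcomes.append(StageOutcome(stage, attempt, False, str(exc)))
                    return current, outcomes
            else:
                outcomes.append(StageOutcome(stage, attempt, True))
                break
    return current, outcomes
```

Recording one `StageOutcome` per stage is what makes each stage's result visible to operators; omitting a handler simply turns that stage into a no-op in this sketch.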
The pipeline layer sits between raw data connectors and the platform's entity and investigation data. It ensures that whatever enters the platform is consistently structured and traceable back to its source payload.
Key Features
- Normalisation job submission with configurable retry limits
- Multi-stage pipeline processing: validation, extraction, normalisation, mapping, enrichment, and completion
- Job status tracking: pending, running, retrying, completed, and failed
- Queue depth monitoring for operational visibility into pipeline throughput
- Manual job processing and retry capabilities for operator intervention
- Correlation key tracking for linking related data items across pipeline runs
- Raw and normalised payload preservation with processing metrics
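The job lifecycle listed above (pending, running, retrying, completed, failed) can be modelled as a small state machine that also keeps the raw payload next to the normalised one. This is a sketch under assumptions: the transition table, the `NormalisationJob` fields, and the rule that a manual retry re-queues a failed job with a fresh retry budget are hypothetical, chosen only to illustrate the documented statuses.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    RETRYING = "retrying"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical transition table over the documented statuses.
TRANSITIONS = {
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.RETRYING, JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.RETRYING: {JobStatus.RUNNING},
    JobStatus.COMPLETED: set(),
    JobStatus.FAILED: {JobStatus.PENDING},  # only via an operator's manual retry
}


@dataclass
class NormalisationJob:
    job_id: str
    correlation_key: str
    raw_payload: Dict[str, Any]  # preserved unchanged for traceability
    max_retries: int = 3         # configurable per submission
    status: JobStatus = JobStatus.PENDING
    retry_count: int = 0
    normalised_payload: Optional[Dict[str, Any]] = None

    def _move(self, new: JobStatus) -> None:
        if new not in TRANSITIONS[self.status]:
            raise ValueError(f"illegal transition {self.status.value} -> {new.value}")
        self.status = new

    def start(self) -> None:
        self._move(JobStatus.RUNNING)

    def complete(self, normalised: Dict[str, Any]) -> None:
        self._move(JobStatus.COMPLETED)
        self.normalised_payload = normalised  # stored alongside, not instead of, the raw payload

    def fail(self) -> None:
        """Schedule an automatic retry while budget remains, otherwise mark the job failed."""
        if self.retry_count < self.max_retries:
            self._move(JobStatus.RETRYING)
            self.retry_count += 1
        else:
            self._move(JobStatus.FAILED)

    def manual_retry(self) -> None:
        """Operator intervention: re-queue a failed job with a fresh retry budget."""
        self._move(JobStatus.PENDING)
        self.retry_count = 0
```

Rejecting illegal transitions in one place keeps a completed job from being restarted by mistake, while still allowing an operator to push a failed job back into the queue.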
Use Cases
- Normalising external data from various connectors into standardised platform formats before entity creation
- Monitoring ingestion queue health and processing throughput to identify bottlenecks or failures
- Retrying failed normalisation jobs after transient errors in source systems or enrichment services
- Tracking data through multi-stage processing pipelines with full visibility into each stage's outcome
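For queue-health monitoring, a depth figure can be derived by counting jobs per status. The sketch assumes statuses arrive as the lowercase strings used in this document and treats pending, running, and retrying jobs as still queued; the names `queue_depth` and `needs_attention`, that definition of depth, and the alert rule are all assumptions.

```python
from __future__ import annotations

from collections import Counter
from typing import Dict, Iterable

STATUSES = ("pending", "running", "retrying", "completed", "failed")
# Assumption: "queue depth" counts jobs that have not reached a terminal status.
ACTIVE_STATUSES = ("pending", "running", "retrying")


def queue_depth(statuses: Iterable[str]) -> Dict[str, int]:
    """Count jobs per documented status and add a total for work still in flight."""
    counts = Counter(statuses)
    unknown = set(counts) - set(STATUSES)
    if unknown:
        raise ValueError(f"unknown statuses: {sorted(unknown)}")
    summary = {status: counts[status] for status in STATUSES}
    summary["queue_depth"] = sum(summary[s] for s in ACTIVE_STATUSES)
    return summary


def needs_attention(summary: Dict[str, int], max_depth: int = 1000) -> bool:
    """Hypothetical alert rule: any failed job, or a backlog above max_depth."""
    return summary["failed"] > 0 or summary["queue_depth"] > max_depth
```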
Industry Context
National intelligence agencies ingest partner data through normalisation pipelines that convert foreign identifier formats and classification markings into their internal schemas. Border control systems normalise passport and travel document records from multiple international formats before watchlist matching. Utility operators normalise telemetry alarm data from different sensor manufacturers through standardised pipeline stages before routing it to work order creation. Financial crime units normalise transaction records from multiple banking systems with different date, currency, and identifier conventions into a unified analytical format.
Integration
The Ingestion Pipeline domain integrates with Connector Registry for data connectors, Data Source Integration for source management, and Entity Resolution for deduplication of normalised records. Pipeline state is persisted in PostgreSQL with organisation-scoped access control.
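Organisation scoping means every query over pipeline state carries the caller's organisation as a mandatory filter. The sketch uses Python's built-in `sqlite3` as a stand-in for PostgreSQL so it runs anywhere; the `ingestion_jobs` table, its columns, and the `create_schema` and `jobs_for_org` helpers are hypothetical.

```python
import sqlite3
from typing import List, Optional


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical table; sqlite3 stands in for PostgreSQL so the sketch runs anywhere.
    conn.execute(
        "CREATE TABLE ingestion_jobs ("
        " job_id TEXT PRIMARY KEY,"
        " organisation_id TEXT NOT NULL,"
        " status TEXT NOT NULL)"
    )


def jobs_for_org(
    conn: sqlite3.Connection, organisation_id: str, status: Optional[str] = None
) -> List[str]:
    """Return job ids visible to one organisation; the organisation filter is never optional."""
    sql = "SELECT job_id FROM ingestion_jobs WHERE organisation_id = ?"
    params: list = [organisation_id]
    if status is not None:
        sql += " AND status = ?"
        params.append(status)
    sql += " ORDER BY job_id"
    # Bound parameters, never string formatting, so caller input cannot widen the filter.
    return [row[0] for row in conn.execute(sql, params)]
```

With a PostgreSQL driver the placeholder syntax differs (psycopg uses `%s`), and the same guarantee could instead be enforced inside the database with row-level security policies; this document does not state which mechanism the platform uses.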
Open Standards
- GraphQL (June 2018 specification): The entire pipeline API surface, including job submission, status queries, dead-letter queue operations, and lineage queries, is exposed exclusively via a Strawberry-GraphQL schema with structural type checking enforced at compile time.
- W3C PROV-DM (Provenance Data Model, W3C Recommendation 2013): Every ingestion event records a `prov:Activity` with `wasGeneratedBy`, `wasAttributedTo`, and `wasDerivedFrom` relationships in PostgreSQL, giving a full audit trail from raw payload to normalised entity.
- OpenLineage Specification (Linux Foundation): Pipeline runs emit `START`, `COMPLETE`, and `FAIL` `RunEvent` records referencing named connector datasets, enabling downstream lineage DAG queries and data-flow observability.
- STIX 2.1 / NIEM / OASIS CAP v1.2: Connectors feeding the pipeline adopt one of these open standards as their primary extraction contract; the ingestion service accepts validated open-standard DTOs before applying structural type mapping.
- RFC 5322 (Internet Message Format): The entity extractor applies an RFC 5322-compliant pattern to detect and normalise email addresses extracted from raw payloads for correlation key generation.
- ITU-T E.164 (International Public Telecommunication Numbering Plan): Phone numbers extracted from ingested payloads are normalised to E.164 digit format before being stored as correlation keys.
- ISO 8601 (Date and Time Representation): Timestamps added at ingestion and timestamps extracted from payload fields are validated and stored in ISO 8601 format; the schema mapper coerces heterogeneous date strings to this format during normalisation.
- STANAG 4774 (Confidentiality Metadata Label Syntax): A post-ingestion classification hook applies STANAG 4774 sensitivity labels to normalised entities; failures in this hook are isolated and never interrupt pipeline completion.
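In PROV-DM, `wasGeneratedBy` links an entity to the activity that produced it, `wasAttributedTo` links an entity to an agent, and `wasDerivedFrom` links one entity to another. Below is a minimal sketch of the record a single ingestion run might produce, plus a walk along `wasDerivedFrom` back to the raw payload. The identifier scheme (`activity:ingest/...`, `agent:connector/...`) and the in-memory structure are assumptions, not the platform's PostgreSQL layout.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List


@dataclass(frozen=True)
class ProvRelation:
    kind: str     # PROV-DM relation name
    subject: str  # the entity the relation is about
    obj: str      # the activity, agent, or source entity it points to


def provenance_for(job_id: str, raw_id: str, normalised_id: str, connector_id: str) -> Dict[str, Any]:
    """Record one ingestion run as a prov:Activity plus the three documented relations."""
    activity_id = f"activity:ingest/{job_id}"  # hypothetical identifier scheme
    return {
        "activity": {
            "id": activity_id,
            "type": "prov:Activity",
            "endTime": datetime.now(timezone.utc).isoformat(),
        },
        "relations": [
            ProvRelation("wasGeneratedBy", normalised_id, activity_id),
            ProvRelation("wasAttributedTo", normalised_id, connector_id),
            ProvRelation("wasDerivedFrom", normalised_id, raw_id),
        ],
    }


def trace_to_source(relations: List[ProvRelation], entity_id: str) -> List[str]:
    """Follow wasDerivedFrom links from an entity back to its original raw payload."""
    derived_from = {r.subject: r.obj for r in relations if r.kind == "wasDerivedFrom"}
    chain = [entity_id]
    # The membership check guards against cycles in malformed provenance data.
    while chain[-1] in derived_from and derived_from[chain[-1]] not in chain:
        chain.append(derived_from[chain[-1]])
    return chain
```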
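An OpenLineage `RunEvent` is a JSON document carrying an `eventType`, an `eventTime`, a `run` with a UUID `runId`, a `job` identified by namespace and name, input and output datasets, a `producer` URI, and a `schemaURL`. The sketch builds the three event types mentioned above as plain dicts. The producer URI, both namespaces, and the spec version in `SCHEMA_URL` are placeholders to check against the deployment and the spec version in use.

```python
from __future__ import annotations

import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Placeholders: confirm the schema version in use and the real producer URI.
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
PRODUCER = "https://example.org/ingestion-pipeline"  # hypothetical


def run_event(
    event_type: str,
    run_id: str | uuid.UUID,
    job_name: str,
    connector_dataset: str,
    output_dataset: Optional[str] = None,
    job_namespace: str = "ingestion",       # hypothetical namespace
    dataset_namespace: str = "connectors",  # hypothetical namespace
) -> Dict[str, Any]:
    """Build a START, COMPLETE, or FAIL RunEvent as a JSON-serialisable dict."""
    if event_type not in {"START", "COMPLETE", "FAIL"}:
        raise ValueError(f"unsupported eventType: {event_type}")
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.UUID(str(run_id)))},  # runId must be a valid UUID
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": dataset_namespace, "name": connector_dataset}],
        "outputs": (
            [{"namespace": dataset_namespace, "name": output_dataset}] if output_dataset else []
        ),
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }
```

Emitting `START` when a run begins and exactly one of `COMPLETE` or `FAIL` when it ends, all sharing one `runId`, is what lets lineage tooling stitch the events into a single run.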
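Correlation keys only link records reliably if equivalent identifiers normalise to the same string. The sketch below uses a simplified email pattern (a practical subset, not the full RFC 5322 grammar) and a basic E.164 normaliser. The `email:` and `tel:` key prefixes, the `default_country_code` parameter, and the single-leading-zero trunk-prefix rule are assumptions. Only the domain is lowercased, because RFC 5321 allows the local part to be case-sensitive.

```python
from __future__ import annotations

import re
from typing import List, Optional

# Simplified address pattern: a practical subset, not the full RFC 5322 grammar.
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9-]+(?:\.[A-Za-z0-9-]+)+\b")


def normalise_email(address: str) -> str:
    """Lowercase the domain only; RFC 5321 lets the local part be case-sensitive."""
    local, domain = address.rsplit("@", 1)
    return f"{local}@{domain.lower()}"


def email_keys(text: str) -> List[str]:
    """Extract addresses from free text; return de-duplicated keys in order of appearance."""
    keys = (f"email:{normalise_email(m.group())}" for m in EMAIL_RE.finditer(text))
    return list(dict.fromkeys(keys))


def phone_key(raw: str, default_country_code: Optional[str] = None) -> Optional[str]:
    """Normalise a phone number to E.164 ('+' then at most 15 digits), or return None."""
    text = raw.strip()
    digits = re.sub(r"\D", "", text)
    if text.startswith("+"):
        pass  # already in international form
    elif digits.startswith("00"):
        digits = digits[2:]  # assumption: "00" is the international call prefix
    elif default_country_code:
        # Assumption: a single leading "0" is a national trunk prefix to drop.
        digits = default_country_code + (digits[1:] if digits.startswith("0") else digits)
    else:
        return None  # national number with no country context: too ambiguous to key
    if not digits or digits[0] == "0" or len(digits) > 15:
        return None  # country codes never start with 0; E.164 caps numbers at 15 digits
    return f"tel:+{digits}"
```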
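Coercing heterogeneous date strings to ISO 8601 amounts to trying a list of known source formats and re-serialising the first match. This is a sketch that assumes the candidate format list is configured per connector (the list here is illustrative), that slash-separated dates are day-first, and that values without an offset are UTC.

```python
from __future__ import annotations

from datetime import datetime, timezone, tzinfo

# Illustrative candidate formats; a real mapper would configure these per connector.
CANDIDATE_FORMATS = (
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d",
    "%d/%m/%Y",        # assumption: day-first, as in many non-US sources
    "%d.%m.%Y %H:%M",
    "%Y%m%d",
)


def to_iso8601(value: str, assume_tz: tzinfo = timezone.utc) -> str:
    """Return value as an ISO 8601 UTC timestamp, or raise ValueError if no format matches."""
    text = value.strip()
    for fmt in CANDIDATE_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue  # try the next candidate format
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=assume_tz)  # assumption: naive values are UTC
        return parsed.astimezone(timezone.utc).isoformat()
    raise ValueError(f"unrecognised date format: {value!r}")
```

Raising on an unrecognised value, rather than guessing, lets the validation stage reject the payload so the failure is visible to operators instead of silently producing a wrong date.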
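Isolating the classification hook means wrapping it so that any exception is logged and recorded rather than propagated, letting the job reach completion either way. The sketch shows only that isolation pattern. `complete_with_classification` and the dict the hook returns are hypothetical stand-ins, and producing an actual STANAG 4774 label is outside its scope.

```python
from __future__ import annotations

import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("ingestion.classification")  # hypothetical logger name


def complete_with_classification(
    entity: Dict[str, Any], label_hook: Callable[[Dict[str, Any]], Dict[str, Any]]
) -> Dict[str, Any]:
    """Apply the post-ingestion labelling hook; a hook failure is logged, never raised."""
    result = dict(entity)
    try:
        result["confidentiality_label"] = label_hook(entity)
        result["label_status"] = "applied"
    except Exception:
        logger.exception("classification hook failed for %s", entity.get("id"))
        result["label_status"] = "hook_failed"  # recorded so operators can re-run labelling
    result["pipeline_status"] = "completed"  # completion proceeds regardless of the hook
    return result
```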
Last Reviewed: 2026-02-05
Last Updated: 2026-04-14