Data Ingestion Pipeline

Category: Data Integration
Last Updated: Feb 5, 2026
data-integration, ai, real-time, geospatial

Overview

A critical infrastructure operator connects a new threat intelligence feed to the platform. The feed delivers JSON over a REST API, uses field names that differ from the Argus schema, contains duplicate records for entities already in the system, and occasionally delivers records with missing required fields. If any of those issues causes an unhandled failure, the entire feed stops ingesting until someone notices and investigates. If duplicates are accepted silently, analysts start seeing the same entity appear multiple times in search results and cannot tell which record is authoritative.

The Argus Data Ingestion Pipeline is the entry point that handles all of this: schema mapping, entity extraction, validation, deduplication, conflict detection, and dead-letter handling for failed records. It supports file uploads (CSV, JSON, XML), API integrations, database connections, Kafka Streams for real-time streaming, and scheduled synchronisation jobs. Apache NiFi manages the data flow between stages. Everything that enters the platform goes through this pipeline before reaching PostgreSQL. For intelligence agencies, law enforcement, financial crime units, and healthcare data controllers, a reliable ingestion layer is what makes all downstream analysis trustworthy.
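
As a rough illustration of the flow described above, the following minimal sketch renames source fields to a destination schema, checks required fields, and parks failing records in a dead-letter list rather than raising. The field names, the `FIELD_MAP` table, and the `ingest` function are hypothetical and are not part of any Argus API; the sketch only shows the general pattern.

```python
from dataclasses import dataclass, field
from typing import Any

# Hypothetical mapping from feed field names to destination schema names.
FIELD_MAP = {"ioc": "indicator_value", "kind": "indicator_type", "seen": "first_seen"}
# Hypothetical required fields in the destination schema.
REQUIRED = ("indicator_value", "indicator_type")


@dataclass
class IngestResult:
    accepted: list[dict[str, Any]] = field(default_factory=list)
    dead_letter: list[dict[str, Any]] = field(default_factory=list)


def map_record(raw: dict[str, Any]) -> dict[str, Any]:
    """Rename known source fields; fields without a mapping are dropped."""
    return {dest: raw[src] for src, dest in FIELD_MAP.items() if src in raw}


def ingest(records: list[dict[str, Any]]) -> IngestResult:
    result = IngestResult()
    for raw in records:
        mapped = map_record(raw)
        missing = [name for name in REQUIRED if mapped.get(name) in (None, "")]
        if missing:
            # Park the bad record with a reason; the rest of the batch continues.
            result.dead_letter.append(
                {"record": raw, "error": "missing_fields", "fields": missing}
            )
            continue
        result.accepted.append(mapped)
    return result


feed = [
    {"ioc": "203.0.113.7", "kind": "ipv4", "seen": "2026-01-30"},
    {"ioc": "", "kind": "domain"},  # required value is empty
]
outcome = ingest(feed)
print(len(outcome.accepted), len(outcome.dead_letter))
```

The second record ends up in `dead_letter` with the list of missing fields attached, so a reviewer can see why it failed without the first record being held back.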

Key Features

  • Multi-Source Ingestion: Import data from file uploads (CSV, JSON, XML), API integrations, database connections, Kafka Streams for real-time streaming, and scheduled pull jobs. The same pipeline handles batch and streaming inputs.
  • Visual Schema Mapping: Map source fields to destination schemas with a visual interface supporting type conversion, validation rules, default values, and conditional logic. Non-standard source schemas are mapped without code changes.
  • Entity Extraction: Automatically extract named entities, detect relationships, link to existing records, and assign confidence scores during ingestion. Entities identified in free-text narrative fields are extracted alongside structured fields.
  • Deduplication: Identify and handle duplicate records during import to prevent redundant data from entering the system and fragmenting the picture of an entity.
  • Dead Letter Queue: Capture failed records in a review queue for investigation and reprocessing without blocking the overall pipeline. The rest of the batch continues while problem records wait for attention.
  • Incremental Updates: Support incremental synchronisation with external databases to process only changed records on a configurable schedule, reducing load on source systems.
  • Conflict Management: Detect and resolve conflicts when ingested data contradicts existing records, with configurable resolution strategies including source trust ranking and field-level precedence rules.
  • Progress Tracking: Monitor import job status in real-time via GraphQL subscriptions with detailed metrics on processed, successful, and failed records. Long-running jobs report progress without polling.
  • Validation and Error Reporting: Validate every record against schema rules and business logic, with categorised error reporting for missing fields, type mismatches, format errors, and constraint violations.
  • Rollback Capability: Reverse completed import jobs if issues are discovered after processing, restoring the system to its pre-import state. This is useful when a mapping error is discovered after a large batch has been committed.
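
The deduplication and conflict management features can be pictured with a small sketch. It assumes a hypothetical record shape (`type`, `value`, `source`, plus free attributes) and a hypothetical `SOURCE_TRUST` ranking; it is one simple way to apply source trust ranking, not a description of how Argus implements it.

```python
from typing import Any

# Hypothetical trust ranking: a higher number wins when sources disagree.
SOURCE_TRUST = {"internal": 3, "partner": 2, "open_feed": 1}
# Fields that identify the entity; they are compared only in normalised form.
KEY_FIELDS = ("type", "value")


def dedup_key(record: dict[str, Any]) -> tuple[str, ...]:
    """Normalise identifying fields so trivial variants collapse together."""
    return tuple(str(record[name]).strip().lower() for name in KEY_FIELDS)


def merge(existing: dict[str, Any], incoming: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
    """Merge two records for one entity and report the conflicting fields."""
    merged = dict(existing)
    conflicts: list[str] = []
    incoming_wins = (
        SOURCE_TRUST.get(incoming["source"], 0) > SOURCE_TRUST.get(existing["source"], 0)
    )
    for name, value in incoming.items():
        if name == "source" or name in KEY_FIELDS:
            continue
        if existing.get(name) is None:
            merged[name] = value  # fills a gap, so there is no conflict
        elif existing[name] != value:
            conflicts.append(name)
            if incoming_wins:
                merged[name] = value
    if incoming_wins:
        # Simplification: track only the most trusted contributing source.
        merged["source"] = incoming["source"]
    return merged, conflicts


def upsert(store: dict[tuple[str, ...], dict[str, Any]], record: dict[str, Any]) -> list[str]:
    """Insert a new entity or merge into the existing one; return conflicts."""
    key = dedup_key(record)
    if key not in store:
        store[key] = dict(record)
        return []
    store[key], conflicts = merge(store[key], record)
    return conflicts


store: dict[tuple[str, ...], dict[str, Any]] = {}
upsert(store, {"type": "domain", "value": "example.org", "source": "open_feed", "risk": "low"})
conflicts = upsert(
    store, {"type": "domain", "value": "Example.ORG ", "source": "internal", "risk": "high"}
)
print(len(store), conflicts)
```

Both records collapse into a single stored entity, and the `risk` field is reported as a conflict that the more trusted source wins. A field-level precedence rule could replace the single `incoming_wins` flag with a per-field lookup.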

Use Cases

  • Threat Intelligence Feed Integration: Connect MISP, STIX/TAXII, MWDB, and commercial threat feeds as continuous ingestion sources. Entity extraction links indicators to known entities automatically. Duplicates from overlapping feeds are resolved rather than accumulated.
  • External Database Synchronisation: Configure ongoing synchronisation with partner databases using scheduled incremental updates, conflict resolution, and health monitoring to keep data current without manual coordination.
  • Historical Data Migration: Migrate large volumes of historical records with progress tracking, error recovery, validation reporting, and rollback capability to safely onboard legacy datasets.
  • Multi-Source Data Consolidation: Combine data from multiple file formats and API sources into a unified data model with entity linking and deduplication across sources.
  • Real-Time Streaming Ingestion: Process live data streams via Kafka with continuous validation, entity extraction, and automatic correlation to support time-sensitive operational workflows. Updates are visible to subscribers via GraphQL subscriptions as they arrive.
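
For the incremental synchronisation use case, a common approach is to keep a watermark and ask the source only for rows changed since then. The sketch below uses an in-memory SQLite database as a stand-in for a partner database; the `entities` table, its `updated_at` column, and the `fetch_changes` helper are assumptions made for illustration.

```python
import sqlite3


def fetch_changes(conn: sqlite3.Connection, watermark: str):
    """Return rows modified after the watermark, oldest first, and the new watermark."""
    rows = conn.execute(
        "SELECT id, name, updated_at FROM entities "
        "WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    # Advance the watermark only when something was actually read.
    new_watermark = rows[-1][2] if rows else watermark
    return rows, new_watermark


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entities (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO entities VALUES (?, ?, ?)",
    [
        (1, "alpha", "2026-01-01T00:00:00Z"),
        (2, "beta", "2026-01-02T00:00:00Z"),
    ],
)

# First run: an empty watermark sorts before every timestamp, so all rows match.
first_batch, mark = fetch_changes(conn, "")

# The source changes one row between runs.
conn.execute(
    "UPDATE entities SET name = ?, updated_at = ? WHERE id = ?",
    ("alpha-2", "2026-01-03T00:00:00Z", 1),
)

# Second run: only the changed row is returned.
second_batch, mark = fetch_changes(conn, mark)
print(len(first_batch), len(second_batch), mark)
```

The comparison relies on timestamps stored in one consistent ISO 8601 format so that string ordering matches time ordering. The strict `>` comparison can miss rows that share the exact watermark timestamp, so a real job would usually add a tie-breaker such as the primary key.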

Integration

The Data Ingestion Pipeline connects with the platform's entity resolution, data quality monitoring, schema management, and integration hub modules. Apache NiFi manages data flow between pipeline stages. Real-time streaming uses Kafka Streams. All validated records are written to PostgreSQL as the primary data store, with organisation scoping enforced at the ingestion layer.

Open Standards

  • STIX 2.1 (OASIS Structured Threat Information eXpression): The ingestion pipeline's two-phase connector architecture requires connectors to deliver Phase 1 output as validated STIX 2.1 object graphs before any Argus type mapping is applied. Threat indicators, malware records, attack patterns, and vulnerability data from MISP, TAXII feeds, CISA KEV, and NVD enter the pipeline as STIX 2.1 bundles validated against the OASIS JSON schemas.
  • TAXII 2.1 (OASIS Trusted Automated eXchange of Intelligence Information): The STIX connector transport layer communicates with TAXII 2.1 collection endpoints to pull threat intelligence bundles. The taxii_21_client implements the TAXII 2.1 REST API for collection discovery, manifest retrieval, and paginated object fetching.
  • NIEM 6.0 (National Information Exchange Model): Cross-domain connectors for justice, emergency management, and defence data sources produce NIEM-conformant JSON-LD as their Phase 1 output contract. Person, organisation, incident, and case management payloads are serialised against NIEM namespaces before structural type mapping in Phase 2.
  • W3C PROV-DM (Provenance Data Model): Every ingestion job emits a W3C PROV-DM provenance record capturing the prov:Entity (the ingested record), the prov:Activity (the normalisation job), and the prov:Agent (the connector and acting user). Provenance records are serialised as PROV-JSON and stored in PostgreSQL with organization_id scoping.
  • OpenLineage Specification (Linux Foundation): The pipeline emits OpenLineage RunEvent payloads at job START, COMPLETE, and FAIL transitions, identifying each pipeline stage as a named Job and each data asset as a Dataset. This provides interoperable lineage metadata compatible with any OpenLineage-aware data observability tool.
  • Apache Kafka Streams: Real-time streaming ingestion uses the Apache Kafka wire protocol and Streams API. Connectors publishing to Kafka topics feed the pipeline continuously; the kafka_streams_client subscribes, deserialises, and enqueues normalisation jobs with the same validation and deduplication path as batch sources.
  • RFC 4180 (CSV) / JSON / XML: File upload ingestion accepts CSV (per RFC 4180), JSON, and XML as primary file formats. The schema mapper performs structural normalisation on all three formats, and the CSV writer used for export neutralises formula-injection characters before serialisation.
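
The formula-injection neutralisation mentioned for CSV export can be sketched as follows. This is a generic version of the widely used mitigation (prefixing risky cells with a single quote), written with Python's standard `csv` module; the `neutralise` and `write_csv` helpers and the exact character set are assumptions, not the platform's actual export code.

```python
import csv
import io

# Leading characters that spreadsheet applications may treat as a formula.
FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")


def neutralise(value) -> str:
    """Prefix a single quote so the cell is read as text, not as a formula."""
    text = "" if value is None else str(value)
    if text.startswith(FORMULA_PREFIXES):
        return "'" + text
    return text


def write_csv(rows) -> str:
    """Serialise rows with CRLF line endings, neutralising every cell first."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\r\n")
    for row in rows:
        writer.writerow([neutralise(cell) for cell in row])
    return buffer.getvalue()


exported = write_csv(
    [
        ["name", "note"],
        ["acme", "=HYPERLINK(\"http://example.invalid\")"],
    ]
)
print(exported)
```

One trade-off of this simple rule is that legitimate values such as negative numbers also gain a leading quote, so an exporter may prefer to apply it only to text columns.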

Last Reviewed: 2026-02-05 Last Updated: 2026-04-14
