Transform Functions

Transform functions are optional scripts that allow users to embed custom processing logic into an ingest pipeline.

Conceptual Model

A transform function operates on each incoming record produced by the source of an ingest pipeline. Today, supported sources include Kafka, Iceberg, and object stores such as S3.

Each record is processed independently. The transform function does not maintain shared state across records.

Language and Execution Model

Transform functions can currently be written only in JavaScript.

A transform function is a JavaScript script that is evaluated with the following globals in scope:

  • arg: The incoming data record.
  • meta: Metadata associated with the record.
  • Built-in functions: A limited set of built-in helpers is available (currently only decodeUtf8).

There is no module system or external dependency support.
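
For illustration, the overall shape of a transform script looks like the following; the last expression the script evaluates is the value handed back to the pipeline (a sketch, with a hypothetical enrichment field):

// Read the incoming record, modify it, and return it.
// The script's final expression is its return value.
let doc = arg;
doc.processed = true;  // hypothetical flag added during ingest

doc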

Input Variables

arg

Represents the incoming data payload. Its structure depends on the source:

  • Kafka: The message payload (value)
  • Iceberg: A single table row
  • Object stores (e.g. S3): The parsed output produced by the configured parser (for example, one JSON value per line for NDJSON)
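
When the payload arrives as raw bytes rather than parsed JSON, the decodeUtf8 built-in can convert it to a string first. A minimal sketch, assuming decodeUtf8 accepts the raw payload and returns a UTF-8 string (the exact signature may differ in your deployment):

// Sketch: decode a raw byte payload and parse it as JSON.
// Assumes decodeUtf8(bytes) returns a UTF-8 string.
let text = decodeUtf8(arg);
let doc = JSON.parse(text);

doc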

meta

Carries source-specific metadata. Examples include:

  • Kafka: key, offset, partition, and operation type
  • Object stores: file name, line number, and parser-related metadata

The contents of meta vary by source and should be treated as source-dependent.
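
For example, a transform on a Kafka source could copy source coordinates into the indexed document. A minimal sketch, assuming the metadata fields are exposed under the names listed above:

// Sketch: record where this document came from.
// Assumes meta.partition and meta.offset are present for Kafka sources.
if (meta.partition !== undefined) {
  arg.kafka_partition = meta.partition;
  arg.kafka_offset = meta.offset;
}

arg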

Return Values and Pipeline Actions

The value returned by a transform function determines how the ingest pipeline processes the record:

  • Single object: Indexes a single document.
  • Array of objects: Indexes multiple documents. Internally, the array is unnested, and the same indexing semantics are applied to each object as if it had been returned individually (see the sketch after this list).
  • Empty array ([]): Indicates that the record should be ignored and no indexing action should be performed.
  • Kafka tombstones: Special delete semantics apply when ingesting Kafka tombstone records. These semantics are described in the Kafka Tombstone Support in Mach5 section.
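
A minimal sketch of the fan-out and drop behaviors above, assuming the incoming record carries a hypothetical items array:

// Sketch: emit one document per item, or [] to skip the record.
if (!arg.items || arg.items.length === 0) {
  // Nothing to index for this record.
  arg = [];
} else {
  // Each array element is indexed as its own document.
  arg = arg.items.map(item => {
    item.parent_id = arg.id;  // hypothetical back-reference to the parent record
    return item;
  });
}

arg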

Key _id Semantics

Ingest pipelines apply special handling to the _id field:

  • If an object includes an _id field, the pipeline performs an upsert using that _id (or a delete, if the Kafka operation type is delete).
  • If an object does not include an _id field, the record is appended and the pipeline automatically generates an _id.
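
A minimal sketch that derives _id from the Kafka message key so that repeated messages for the same key upsert the same document (assumes the key is exposed as meta.key and is a string):

// Sketch: use the Kafka key as a stable document identity.
if (meta.key) {
  // Records sharing a key upsert (or delete) the same document.
  arg._id = meta.key;
}

arg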

Examples

Update Field Value

This transformation converts database-style timestamp objects into ISO-8601 strings so they can be efficiently indexed and queried.

Input record example:

{
  id: "event-123",
  extra: {
    inserted_date: {
      year: 2025,
      month: 3,
      day: 4,
      hour: 9,
      minute: 7,
      second: 5,
      microsecond: 123
    },
    detected_date: {
      year: 2024,
      month: 12,
      day: 31,
      hour: 23,
      minute: 59,
      second: 59,
      microsecond: 999999
    },
    last_update: {
      year: 2025,
      month: 1,
      day: 1,
      hour: 0,
      minute: 0,
      second: 0,
      microsecond: 1
    }
  }
}

Transformed output record:

{
  id: "event-123",
  extra: {
    inserted_date: "2025-03-04T09:07:05.000123",
    detected_date: "2024-12-31T23:59:59.999999",
    last_update: "2025-01-01T00:00:00.000001"
  }
}

JS transform that converts the inserted_date, detected_date, and last_update fields to ISO date format:

function toIsoString(obj) {
  // Skip values that are missing or not timestamp objects.
  if (!obj || typeof obj !== "object") return undefined;

  let year = obj.year.toString().padStart(4, "0");
  let month = obj.month.toString().padStart(2, "0");
  let day = obj.day.toString().padStart(2, "0");
  let hour = obj.hour.toString().padStart(2, "0");
  let minute = obj.minute.toString().padStart(2, "0");
  let second = obj.second.toString().padStart(2, "0");

  // Zero-pad microseconds to 6 digits (e.g. 123 -> "000123").
  let microsecond = obj.microsecond.toString().padStart(6, "0");

  return `${year}-${month}-${day}T${hour}:${minute}:${second}.${microsecond}`;
}

["inserted_date", "detected_date", "last_update"].forEach(key => {
  if (arg["extra"] && arg["extra"][key]) {
    let iso = toIsoString(arg["extra"][key]);
    // Only overwrite the field when conversion succeeded.
    if (iso) {
      arg["extra"][key] = iso;
    }
  }
});

arg

Drop/Ignore Record

This transform drops records that should not be indexed.

Input record example (CSV input records):

version,account-id,interface-id,srcaddr,dstaddr,srcport,dstport,protocol,packets,bytes,start,end,action,log-status
2,44010291029,0a4fa7sda8d7ad,10.0.0.1,10.0.0.3,40658,10009,17,1,303,1755475131000,1755475203000,ACCEPT,OK
2,44010291030,-,-,-,-,-,-,-,-,1755475350,1755475381,-,NODATA
2,44010291031,-,-,-,-,-,-,-,-,1755475350,1755475381,-,SKIPDATA

JS transform to ignore NODATA and SKIPDATA records:

if (arg["log-status"] === "SKIPDATA" || arg["log-status"] === "NODATA") {
  //empty array to ignore the record
  arg = [];
}

arg

Insert _id Field for Upsert Operation

An upsert operation requires the ingest record to contain the _id field. If the record does not have an _id field, one can be inserted using the JS transform.

Input record example:

{
  "source_key_hash": "8f3c1a2b9d4e7f6a0c5b2e1d9a8f7c6b",
  "ingest_date": "2025-03-04T09:07:05.000123",
  "indextype": "emailhash"
}

Transformed output record:

{
  "source_key_hash": "8f3c1a2b9d4e7f6a0c5b2e1d9a8f7c6b",
  "ingest_date": "2025-03-04T09:07:05.000123",
  "indextype": "emailhash",
  "_id": "emailhash_8f3c1a2b9d4e7f6a0c5b2e1d9a8f7c6b"
}

JS transform to add the _id field:

if (arg["indextype"] && arg["source_key_hash"]) {
    // Generate deterministic _id
    arg["_id"] = arg["indextype"] + "_" + arg["source_key_hash"];
}

arg