Transform Functions
Transform functions are optional scripts that allow users to embed custom processing logic into an ingest pipeline.
Conceptual Model
A transform function operates on each incoming record produced by the source of an ingest pipeline. Today, supported sources include Kafka, Iceberg, and object stores such as S3.
Each record is processed independently. The transform function does not maintain shared state across records.
Language and Execution Model
Transform functions currently support JavaScript only.
A transform function is executed as a JavaScript script and is invoked with the following globals:
- arg: The incoming data record.
- meta: Metadata associated with the record.
- Built-in functions: A limited set of built-in helpers is available (currently only decodeUtf8).
There is no module system or external dependency support.
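As a minimal sketch of this execution model, the script below copies the Kafka partition from meta onto the record (assuming the partition is exposed as meta["partition"], per the Kafka metadata listed in the next section; the source_partition field name is chosen for illustration). Following the pattern of the examples at the end of this page, the script's final expression, arg, is the value handed back to the pipeline.
// annotate each record with the Kafka partition it came from
// (meta contents are source-dependent; "partition" is assumed here for Kafka sources)
arg["source_partition"] = meta["partition"];
// the final expression is the transform's result
arg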
Input Variables
arg
Represents the incoming data payload. Its structure depends on the source:
- Kafka: The message payload (value)
- Iceberg: A single table row
- Object stores (e.g. S3): The parsed output produced by the configured parser (for example, one JSON value per line for NDJSON)
meta
Carries source-specific metadata. Examples include:
- Kafka: key, offset, partition, and operation type
- Object stores: file name, line number, and parser-related metadata
The contents of meta vary by source and should be treated as source-dependent.
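For instance, here is a hedged sketch that tags each object-store record with its origin; the property names file_name and line_number are assumptions for illustration, not a documented contract:
// record the file and line an object-store record came from
// NOTE: "file_name" and "line_number" are assumed keys; actual names vary by source
arg["_source_file"] = meta["file_name"];
arg["_source_line"] = meta["line_number"];
arg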
Return Values and Pipeline Actions
The value returned by a transform function determines how the ingest pipeline processes the record:
- Single object: Indexes a single document.
- Array of objects: Indexes multiple documents. Internally, the array is unnested and the same indexing semantics are applied to each object as if it had been returned individually (see the sketch after this list).
- Empty array ([]): Indicates that the record should be ignored and no indexing action should be performed.
- Kafka tombstones: Special delete semantics apply when ingesting Kafka tombstone records. These semantics are described in the Kafka Tombstone Support in Mach5 section.
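As a sketch of the array return, the transform below unnests a nested list into one document per element; items and id are hypothetical field names used for illustration:
// emit one document per element of a nested array
// "items" and "id" are hypothetical field names used for illustration
if (Array.isArray(arg["items"])) {
  const parentId = arg["id"];
  arg = arg["items"].map(item => Object.assign({ parent_id: parentId }, item));
}
arg
Records without an items array fall through unchanged and are indexed as a single document.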
Key _id Semantics
Ingest pipelines apply special handling to the _id field:
- If an object includes an _id field, the pipeline performs an upsert using that _id (or a delete, if the Kafka operation type is delete).
- If an object does not include an _id field, the record is appended and the pipeline automatically generates an _id.
See the Insert _id Field for Upsert Operation example below.
Examples
Update Field Value
This transformation converts database-style timestamp objects into ISO-8601 strings so they can be efficiently indexed and queried.
Input record example:
{
id: "event-123",
extra: {
inserted_date: {
year: 2025,
month: 3,
day: 4,
hour: 9,
minute: 7,
second: 5,
microsecond: 123
},
detected_date: {
year: 2024,
month: 12,
day: 31,
hour: 23,
minute: 59,
second: 59,
microsecond: 999999
},
last_update: {
year: 2025,
month: 1,
day: 1,
hour: 0,
minute: 0,
second: 0,
microsecond: 1
}
}
}
Transformed output record:
{
id: "event-123",
extra: {
inserted_date: "2025-03-04T09:07:05.000123",
detected_date: "2024-12-31T23:59:59.999999",
last_update: "2025-01-01T00:00:00.000001"
}
}
JS transform: for the fields inserted_date, detected_date, and last_update, convert the value to an ISO-8601 string:
function toIsoString(obj) {
// safety check
if (!obj || typeof obj !== "object") return undefined;
let year = obj.year.toString().padStart(4, "0");
let month = obj.month.toString().padStart(2, "0");
let day = obj.day.toString().padStart(2, "0");
let hour = obj.hour.toString().padStart(2, "0");
let minute = obj.minute.toString().padStart(2, "0");
let second = obj.second.toString().padStart(2, "0");
// ensure 6 digits
let microsecond = obj.microsecond.toString().padStart(6, "0");
return `${year}-${month}-${day}T${hour}:${minute}:${second}.${microsecond}`;
}
["inserted_date", "detected_date", "last_update"].forEach(key => {
if (arg["extra"][key]) {
arg["extra"][key] = toIsoString(arg["extra"][key]);
}
});
arg
Drop/Ignore Record
This transform drops records whose log-status field indicates there is no data to index.
Input record example (CSV input records; the first line is the header):
version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
2,44010291029,0a4fa7sda8d7ad,10.0.0.1,10.0.0.3,40658,10009,17,1,303,1755475131000,1755475203000,ACCEPT,OK
2,44010291030,-,-,-,-,-,-,-,-,1755475350,1755475381,-,NODATA
2,44010291031,-,-,-,-,-,-,-,-,1755475350,1755475381,-,SKIPDATA
JS transform to ignore NODATA and SKIPDATA records:
if (arg["log-status"] === "SKIPDATA" || arg["log-status"] === "NODATA") {
  // return an empty array so the pipeline ignores this record
arg = [];
}
arg
Insert _id Field for Upsert Operation
An upsert operation requires the ingest record to contain the _id field. If the record does not have one, the JS transform can insert it.
Input record example:
{
"source_key_hash": "8f3c1a2b9d4e7f6a0c5b2e1d9a8f7c6b",
"ingest_date": "2025-03-04T09:07:05.000123",
"indextype": "emailhash"
}
Transformed output record:
{
"source_key_hash": "8f3c1a2b9d4e7f6a0c5b2e1d9a8f7c6b",
"ingest_date": "2025-03-04T09:07:05.000123",
"indextype": "emailhash",
"_id": "emailhash_8f3c1a2b9d4e7f6a0c5b2e1d9a8f7c6b"
}
JS transform to add the _id field:
if (arg["indextype"] && arg["source_key_hash"]) {
// Generate deterministic _id
arg["_id"] = arg["indextype"] + "_" + arg["source_key_hash"];
}
arg