Lindorm uses Debezium Format V2.0 to represent operation records generated when data in subscribed tables is inserted, updated, or deleted. Configure your consumer client to parse these records based on the format described in this topic.
Message structure
Each operation record is a JSON object with two top-level fields: payload and schema.
{
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"name": "Jane"
},
"after": {
"id": 1004,
"name": "Anne"
},
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
},
"schema": {
"type": "struct",
"fields": [
{ "type": "string", "optional": false, "field": "op" },
{ "type": "int64", "optional": false, "field": "ts_ms" },
{
"type": "struct",
"fields": [
{ "type": "int32", "optional": false, "field": "id" },
{ "type": "string", "optional": false, "field": "name" }
],
"optional": true,
"field": "before"
},
{
"type": "struct",
"fields": [
{ "type": "int32", "optional": false, "field": "id" },
{ "type": "string", "optional": false, "field": "name" }
],
"optional": true,
"field": "after"
},
{
"type": "struct",
"fields": [
{ "type": "string", "optional": false, "field": "version" },
{ "type": "string", "optional": false, "field": "db" },
{ "type": "string", "optional": false, "field": "namespace" },
{ "type": "string", "optional": false, "field": "table" },
{ "type": "int64", "optional": false, "field": "ts_ms" }
],
"optional": false,
"field": "source"
}
],
"optional": false
}
}Payload fields
The payload field carries the operation data.
| Field | Data type | Description |
|---|---|---|
op | string | Operation type: c = insert, u = update, d = delete, r = full export (not available). |
ts_ms | int64 | UNIX timestamp in milliseconds when the record is written to Kafka. |
before | struct | Row data before the operation. null for insert operations. |
after | struct | Row data after the operation. null for delete operations. |
source.version | string | Version number of the Lindorm database. |
source.db | string | ID of the source Lindorm instance. |
source.namespace | string | Namespace the table belongs to. |
source.table | string | Name of the Lindorm database table. |
source.ts_ms | int64 | UNIX timestamp in milliseconds when the operation occurs in the Lindorm database table. |
The before and after fields vary by operation type:
| Operation | op | before | after |
|---|---|---|---|
| Insert | c | null | Row data after insert |
| Update | u | Row data before update | Row data after update |
| Delete | d | Row data before delete | null |
| Column delete | u | Row data before delete | Row data with the deleted column omitted |
Schema field
The schema field is automatically generated from the payload and included in every operation record by default. It describes the data types of all fields in the message using a recursive structure.
| Sub-field | Description |
|---|---|
field | Name of the field. |
type | Data type of the field. |
name | Description of the schema. |
fields | Nested fields stored in a recursive structure. |
optional | Whether the field is optional. |
HBase tables
The data format for HBase operation records follows the same structure as SQL operation records. However, HBase tables have two structural differences:
Binary data encoding: HBase stores raw binary data. Consumed records represent this data as Base64-encoded strings.
Column naming: HBase uses column families. Non-primary key column names follow the format
column family_column name. The primary key column is always namedROW.
Sample records
SQL table samples
The following samples are based on this table schema:
CREATE TABLE customers (id VARCHAR, first_name VARCHAR, last_name VARCHAR, PRIMARY KEY(id));Insert (op: "c", before is null)
{
"schema": {},
"payload": {
"op": "c",
"ts_ms": 1465491411815,
"before": null,
"after": {
"id": "1004",
"first_name": "Anne",
"last_name": "Kretchmar"
},
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
}
}Update (op: "u", both before and after are populated)
{
"schema": {},
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": "1004",
"first_name": "Anne Marie",
"last_name": "Kretchmar"
},
"after": {
"id": "1004",
"first_name": "Anne",
"last_name": "Kretchmar"
},
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
}
}Row delete (op: "d", after is null)
{
"schema": {},
"payload": {
"op": "d",
"ts_ms": 1465491411815,
"before": {
"id": "1004",
"first_name": "Anne Marie",
"last_name": "Kretchmar"
},
"after": null,
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
}
}Column delete (op: "u", deleted column is omitted from after)
{
"schema": {},
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": "1004",
"first_name": "Anne Marie",
"last_name": "Kretchmar"
},
"after": {
"id": "1004",
"first_name": "Anne Marie"
},
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
}
}HBase table samples
The following Java code inserts a row into an HBase table:
Put put = new Put(Bytes.toBytes("user1"));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lucky"));
table.put(put);The corresponding consumed record uses Base64-encoded strings. The ROW field contains the Base64-encoded row key, and the f_name field (column family f, column name) contains the Base64-encoded column value.
{
"schema": {},
"payload": {
"op": "c",
"ts_ms": 1725258859839,
"after": {
"ROW": "dXNlcjE=",
"f_name": "bHVja3k="
},
"source": {
"version": "v2.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1725258833727
}
}
}