Realtime Compute for Apache Flink:OceanBase

Last Updated:Mar 26, 2026

The OceanBase connector integrates Realtime Compute for Apache Flink with OceanBase, a native distributed hybrid transactional and analytical processing (HTAP) database. Use it to read change data capture (CDC) streams, join dimension tables, and write results back to OceanBase.

The OceanBase connector is in public preview.

Supported features

Category Details
Table types Source, dimension, and sink tables
Runtime modes Streaming mode and batch mode
Data format Not applicable
Specific monitoring metrics None
API SQL
Updates and deletes in sink tables Yes

Choose a connector

OceanBase supports two compatibility modes. Your mode determines which connector to use.

OceanBase mode Recommended connector Alternative
Oracle mode OceanBase connector
MySQL mode OceanBase connector or MySQL connector

MySQL mode notes:

  • The MySQL connector supports OceanBase 3.2.4.4 and later. This support is in public preview, so evaluate it thoroughly before using it in production.

  • To read incremental data with the MySQL connector, enable and configure OceanBase Binlog. See OceanBase Binlog service overview and Binlog-related operations.

Prerequisites

Before you begin, make sure that:

Limitations

  • Requires Ververica Runtime (VVR) 8.0.1 or later.

Semantic guarantees:

  • CDC source tables: exactly-once semantics. Data is neither lost nor duplicated when the job switches from reading full historical data to reading the Binlog, even if a fault occurs during the switch.

  • Sink tables: at-least-once semantics. If the sink table has a primary key, idempotence guarantees data correctness.

VVR 11.4.0 CDC architecture change

Starting from VVR 11.4.0, the OceanBase CDC connector uses a new architecture:

  • The original CDC connector based on the OceanBase LogProxy service has been removed.

  • Incremental log capture now requires the OceanBase Binlog service. The OceanBase CDC connector offers better protocol compatibility and connection stability with the Binlog service compared to connecting the standard MySQL CDC connector directly. Connecting the standard MySQL CDC connector to the OceanBase Binlog service for change tracking is not recommended.

  • Incremental change tracking in Oracle compatibility mode is no longer supported. For Oracle mode CDC, contact OceanBase Enterprise Technical Support.

Syntax

CREATE TABLE oceanbase_source (
   order_id     INT,
   order_date   TIMESTAMP(0),
   customer_name STRING,
   price        DECIMAL(10, 5),
   product_id   INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector'  = 'oceanbase',
  'url'        = '<your-jdbc-url>',
  'tableName'  = '<your-table-name>',
  'userName'   = '<your-username>',
  'password'   = '<your-password>'
);

Sink write behavior:

For each incoming record, the connector constructs an SQL statement based on the sink table's schema:

  • No primary key: INSERT INTO

  • Has primary key: UPSERT (based on the database compatibility mode)
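
As a rough illustration of the two cases above, the following statements show what an insert and an upsert could look like in MySQL-compatible mode. The exact SQL that the connector generates is not documented here: the ON DUPLICATE KEY UPDATE form is an assumption based on standard MySQL syntax, and the table name orders and the literal values are hypothetical.

```sql
-- Case 1: the sink table declares no primary key, so each record is a plain insert.
INSERT INTO orders (order_id, customer_name, price)
VALUES (1001, 'Alice', 19.99000);

-- Case 2: the sink table declares PRIMARY KEY (order_id), so each record is an upsert.
-- Assumption: in MySQL-compatible mode an upsert can be expressed as
-- INSERT ... ON DUPLICATE KEY UPDATE. Oracle-compatible mode would need
-- different syntax, which is not shown here.
INSERT INTO orders (order_id, customer_name, price)
VALUES (1001, 'Alice', 24.50000)
ON DUPLICATE KEY UPDATE
  customer_name = VALUES(customer_name),
  price         = VALUES(price);
```

Replaying the second statement leaves the row in the same final state, which is why a primary key keeps the data correct under at-least-once delivery.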

WITH parameters

General parameters

These parameters apply to all table types.

Parameter Description Required Type Default
connector Set to oceanbase. Yes STRING
password The database password. Yes STRING

Source table parameters

Important

Starting from VVR 11.4.0, the OceanBase CDC connector uses the OceanBase Binlog service for incremental log capture. The LogProxy-based connector is removed. Oracle compatibility mode CDC is no longer supported from VVR 11.4.0.

Parameter Description Required Type Default Notes
hostname The IP address or hostname of the OceanBase database. Use a virtual private cloud (VPC) address when possible. Yes STRING If OceanBase and Realtime Compute for Apache Flink are in different VPCs, set up cross-VPC connectivity or use a public endpoint. See Workspace management and Internet access for Flink clusters.
username The OceanBase database username. Yes STRING
database-name The OceanBase database name. Supports regular expressions to read from multiple databases. Avoid ^ and $ anchors. Yes STRING The connector concatenates database-name and table-name with \\. (VVR 8.0.1+) or . (earlier versions) to form a full path regex. For example, db_.* + tb_.+ becomes db_.*\\.tb_.+.
table-name The OceanBase table name. Supports regular expressions to read from multiple tables. Avoid ^ and $ anchors. Yes STRING See database-name note above.
port The OceanBase database port. No INTEGER 3306
server-id A numeric ID for the database client. Must be globally unique. Supports a range, such as 5400-5408, to assign different IDs to concurrent readers. No STRING Random value between 5400 and 6400 Use a different ID for each job connecting to the same database. See Server ID usage.
scan.incremental.snapshot.chunk.size The number of rows per chunk during incremental snapshot reading. Data in each chunk is buffered in memory before it is fully read. Smaller chunks improve fault recovery granularity but may cause out-of-memory (OOM) errors and reduce throughput. No INTEGER 8096 Balance chunk size against memory and throughput requirements.
scan.snapshot.fetch.size The maximum number of records fetched per pull during full table reads. No INTEGER 1024
scan.startup.mode The startup mode for data consumption. No STRING initial Valid values: initial (scan full history, then read Binlog), latest-offset (Binlog tail only), earliest-offset (earliest available Binlog), specific-offset (set via scan.startup.specific-offset.* parameters), timestamp (set via scan.startup.timestamp-millis).
Important

For earliest-offset, specific-offset, and timestamp modes, the table schema must not change between the specified Binlog position and job startup.

scan.startup.specific-offset.file The Binlog filename for the start offset. Example: mysql-bin.000003. No STRING Requires scan.startup.mode=specific-offset.
scan.startup.specific-offset.pos The byte offset within the specified Binlog file. No INTEGER Requires scan.startup.mode=specific-offset.
scan.startup.specific-offset.gtid-set The GTID set for the start offset. Example: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19. No STRING Requires scan.startup.mode=specific-offset.
scan.startup.timestamp-millis The start timestamp in milliseconds. OceanBase CDC reads the initial event of each Binlog file to locate the file matching this timestamp. The Binlog file must not have been purged. No LONG Requires scan.startup.mode=timestamp.
server-time-zone The session time zone used by the database. Controls how TIMESTAMP types are converted to STRING. See Debezium temporal types. Example: Asia/Shanghai. No STRING Flink job runtime timezone
debezium.min.row.count.to.stream.results The row count threshold above which the connector switches from full read (entire table into memory) to batch read (streaming rows in batches). Full read is faster; batch read avoids OOM for large tables. No INTEGER 1000
connect.timeout The maximum wait time before retrying a timed-out connection. No DURATION 30s
connect.max-retries The maximum number of connection retries after failure. No INTEGER 3
connection.pool.size The size of the database connection pool. Reusing connections reduces the total number of open connections. No INTEGER 20
jdbc.properties.* Custom JDBC URL connection parameters. Example: 'jdbc.properties.useSSL' = 'false'. See MySQL configuration properties. No STRING
debezium.* Custom Debezium parameters for Binlog reading. Example: 'debezium.event.deserialization.failure.handling.mode' = 'ignore'. No STRING
heartbeat.interval The interval at which the source emits heartbeat events to advance the Binlog offset. Prevents the Binlog offset from expiring on infrequently updated tables. An expired offset causes the job to fail and requires a stateless restart. No DURATION 30s
scan.incremental.snapshot.chunk.key-column The column used as the chunk key for splitting data during the snapshot phase. Conditional STRING Required for tables without a primary key (must be NOT NULL). Optional for tables with a primary key (select one column from the primary key).
scan.incremental.close-idle-reader.enabled Specifies whether to close idle readers after the snapshot phase completes. No BOOLEAN false VVR 8.0.1+. Also requires execution.checkpointing.checkpoints-after-tasks-finish.enabled=true.
scan.read-changelog-as-append-only.enabled Specifies whether to convert the changelog stream to an append-only stream. When true, all message types (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) are converted to INSERT. Enable only in special scenarios, such as preserving delete messages from an upstream table. No BOOLEAN false VVR 8.0.8+.
scan.only.deserialize.captured.tables.changelog.enabled Specifies whether to deserialize change events only for the captured tables during the incremental phase. Setting to true speeds up Binlog reading. No BOOLEAN false (VVR 8.x), true (VVR 11.1+) VVR 8.0.7+. In VVR 8.0.8 and earlier, use the parameter name debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled Specifies whether to parse DDL events for ApsaraDB RDS lockless changes during the incremental phase. Experimental feature. Take a Flink job snapshot before performing online lockless schema changes. No BOOLEAN false VVR 11.1+.
scan.incremental.snapshot.backfill.skip Specifies whether to skip backfill during the snapshot phase. When enabled, changes that occur during the snapshot are read in the incremental phase instead of being merged into the snapshot. May cause data inconsistency; provides only at-least-once semantics. No BOOLEAN false VVR 11.1+.
scan.incremental.snapshot.unbounded-chunk-first.enabled Specifies whether to distribute the unbounded chunk first during the snapshot phase. Reduces the risk of OOM when a TaskManager processes the last chunk. Experimental feature. Add this parameter before the job starts for the first time. No BOOLEAN false VVR 11.1+.
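
The following sketch combines several of the source table parameters above: regular expressions for sharded databases and tables, a server-id range, and a timestamp startup position. The db_ and tb_ naming pattern, the table name, the column list, and the timestamp value are illustrative assumptions. Replace them with values from your own environment.

```sql
-- Sketch: capture several sharded tables and start from a point in time.
CREATE TEMPORARY TABLE oceanbase_orders_source (
  order_id   INT,
  order_date TIMESTAMP(0),
  price      DECIMAL(10, 5),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'     = 'oceanbase',
  'hostname'      = '<your-hostname>',
  'port'          = '3306',
  'username'      = '<your-username>',
  'password'      = '<your-password>',
  -- Regular expressions without ^ or $ anchors.
  -- The connector joins them into the full path regex db_.*\\.tb_.+
  'database-name' = 'db_.*',
  'table-name'    = 'tb_.+',
  -- A range gives each concurrent reader its own ID.
  -- Use a different range for every job that connects to the same database.
  'server-id'     = '5400-5408',
  -- Start reading the Binlog from this epoch-millisecond timestamp
  -- (1735689600000 is 2025-01-01 00:00:00 UTC, chosen only as an example).
  'scan.startup.mode'             = 'timestamp',
  'scan.startup.timestamp-millis' = '1735689600000',
  -- Keep the Binlog offset advancing on infrequently updated tables.
  'heartbeat.interval'            = '30s'
);
```

With a timestamp startup position, the table schema must not have changed between that position and job startup, and the Binlog file that covers the timestamp must still exist.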

Dimension table parameters

Parameter Description Required Type Default Notes
url The JDBC URL. Must include the MySQL database name or Oracle service name. Yes STRING
userName The database username. Yes STRING
cache The cache policy for dimension table lookups. No STRING ALL ALL: Load all data before the job starts; reload after expiry. Suitable for small tables with many lookup misses. Increase the join node memory to at least twice the table size to support asynchronous loading. LRU: Cache a subset of rows; requires cacheSize. None: No caching.
cacheSize The maximum number of cached entries. No INTEGER 100000 Required when cache=LRU. Ignored when cache=ALL.
cacheTTLMs The cache timeout in milliseconds. Behavior depends on the cache setting: for LRU, entries expire after this duration (no expiry by default); for ALL, the full cache reloads after this duration (no reload by default); for None, this parameter has no effect. No LONG Long.MAX_VALUE
maxRetryTimeout The maximum retry duration. No DURATION 60s
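
For dimension tables that are too large to load in full, the LRU policy can be configured roughly as follows. This is a minimal sketch: the table name, the cache size of 50,000 entries, and the 10-minute expiry are illustrative values, not recommendations.

```sql
-- Sketch: dimension table that caches a bounded subset of rows.
CREATE TEMPORARY TABLE oceanbase_dim_lru (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'  = 'oceanbase',
  'url'        = '<your-jdbc-url>',
  'userName'   = '<your-username>',
  'password'   = '<your-password>',
  'tableName'  = '<your-table-name>',
  -- Cache only recently used rows instead of the whole table.
  'cache'      = 'LRU',
  -- Maximum number of cached entries.
  'cacheSize'  = '50000',
  -- Entries expire after 600,000 ms (10 minutes); without this setting they never expire.
  'cacheTTLMs' = '600000'
);
```

A shorter cacheTTLMs returns fresher dimension data at the cost of more lookups against OceanBase.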

Sink table parameters (JDBC)

Parameter Description Required Type Default Notes
url The JDBC URL. Must include the MySQL database name or Oracle service name. Yes STRING
userName The database username. Yes STRING
tableName The target table name. Yes STRING
sink.mode The write mode. Set to jdbc for standard writes; set to direct-load for bypass import. No STRING jdbc
compatibleMode The OceanBase compatibility mode. Valid values: mysql, oracle. No STRING mysql OceanBase-specific parameter.
maxRetryTimes The maximum number of write retries. No INTEGER 3
poolInitialSize The initial connection pool size. No INTEGER 1
poolMaxActive The maximum number of active connections in the pool. No INTEGER 8
poolMaxWait The maximum wait time (ms) for a connection from the pool. No INTEGER 2000
poolMinIdle The minimum number of idle connections in the pool. No INTEGER 1
connectionProperties JDBC connection properties in k1=v1;k2=v2 format. No STRING
ignoreDelete Specifies whether to ignore delete operations. No BOOLEAN false
excludeUpdateColumns Columns to exclude from updates, comma-separated (for example, column1,column2). Primary key columns are always excluded regardless of this setting. No STRING
partitionKey The partition key. When set, data is grouped by this key before the modRule is applied. No STRING
modRule The grouping rule in the format column_name mod number (for example, user_id mod 8). The column must be numeric. Data is first partitioned by partitionKey, then grouped within each partition by this rule. No STRING
bufferSize The data buffer size (number of records). No INTEGER 1000
flushIntervalMs The buffer flush interval (ms). If the buffer does not meet the output condition within this interval, all buffered data is flushed automatically. No LONG 1000
retryIntervalMs The retry interval (ms). No INTEGER 5000
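
The sketch below shows how some of the JDBC sink parameters above might be combined. The table name, the column names, and the numeric values are assumptions for illustration only. Tune the buffer and flush settings against your own latency and throughput requirements.

```sql
-- Sketch: JDBC sink with explicit buffering, retry, and update behavior.
CREATE TEMPORARY TABLE oceanbase_sink_tuned (
  order_id      INT,
  customer_name STRING,
  price         DECIMAL(10, 5),
  created_at    TIMESTAMP(0),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'            = 'oceanbase',
  'url'                  = '<your-jdbc-url>',
  'userName'             = '<your-username>',
  'password'             = '<your-password>',
  'tableName'            = '<your-table-name>',
  -- Match the compatibility mode of the target tenant (mysql or oracle).
  'compatibleMode'       = 'mysql',
  -- Buffer up to 2000 records, and flush after 500 ms if the buffer has not been written by then.
  'bufferSize'           = '2000',
  'flushIntervalMs'      = '500',
  -- Retry a failed write up to 5 times.
  'maxRetryTimes'        = '5',
  -- Ignore upstream delete operations so that rows are never removed from the sink.
  'ignoreDelete'         = 'true',
  -- Leave created_at untouched when an existing row is updated.
  'excludeUpdateColumns' = 'created_at'
);
```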

Sink table parameters (bypass import)

Bypass import is a high-throughput write method for bulk data loading in OceanBase. Available in VVR 11.5 and later.

Before using bypass import, check the following constraints:

  • Bounded streams only: The data source must be a bounded stream. Use Flink batch mode for best performance.

  • Table locking during import: The target table is locked throughout the import. DML writes and DDL changes are blocked; read queries are unaffected.

  • Not for real-time writes: For streaming or real-time writes, use the JDBC sink instead.

Parameter Description Required Type Default Notes
sink.mode Set to direct-load to use bypass import. No STRING jdbc
host The IP address or hostname of the OceanBase database. Yes STRING
port The RPC port of the OceanBase database. No INTEGER 2882
username The database username. Yes STRING
tenant-name The OceanBase tenant name. Yes STRING
schema-name For MySQL tenants: the database name. For Oracle tenants: the owner name. Yes STRING
table-name The target table name. Yes STRING
parallel The server-side concurrency for the import task. The server caps the actual degree of parallelism based on tenant CPU specifications without returning an error. Formula: MIN(tenant_cores × 2, parallel) × partition_nodes. For example, with 2 CPU cores, parallel=10, and 2 partition nodes: MIN(4, 10) × 2 = 8. No INTEGER 8
buffer-size The number of records buffered before a single write to OceanBase. No INTEGER 1024
dup-action The behavior when duplicate primary keys are encountered. STOP_ON_DUP: fail the import. REPLACE: overwrite the existing row. IGNORE: discard the incoming row. No STRING REPLACE
load-method The import mode. full: standard bypass import. inc: incremental mode, checks for primary key conflicts (observer 4.3.2+, dup-action=REPLACE not supported). inc_replace: incremental replace mode, overwrites existing rows directly without conflict checks (observer 4.3.2+, dup-action is ignored). No STRING full
max-error-rows The maximum number of tolerated error rows. Error rows include: duplicate primary keys when dup-action=STOP_ON_DUP, mismatched column counts, and rows where type conversion fails. No LONG 0
timeout The overall timeout for the bypass import task. No DURATION 7d
heartbeat-timeout The client-side heartbeat timeout. No DURATION 60s
heartbeat-interval The client-side heartbeat interval. No DURATION 10s
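
A bypass import sink that sets the tuning parameters above explicitly could look like the following sketch. The table name and the chosen values (IGNORE for duplicates, 100 tolerated error rows, a one-day timeout) are illustrative assumptions.

```sql
-- Sketch: bypass import sink for a bounded (batch) load.
CREATE TEMPORARY TABLE oceanbase_bulk_sink (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'      = 'oceanbase',
  'sink.mode'      = 'direct-load',
  'host'           = '<your-host>',
  -- RPC port, not the SQL port used by the JDBC sink.
  'port'           = '2882',
  'username'       = '<your-username>',
  'password'       = '<your-password>',
  'tenant-name'    = '<your-tenant-name>',
  'schema-name'    = '<your-schema-name>',
  'table-name'     = '<your-table-name>',
  -- Standard bypass import.
  'load-method'    = 'full',
  -- Discard incoming rows whose primary key already exists.
  'dup-action'     = 'IGNORE',
  -- Tolerate up to 100 error rows before the import fails.
  'max-error-rows' = '100',
  -- Requested server-side concurrency; the server caps it by tenant CPU cores.
  'parallel'       = '8',
  'buffer-size'    = '1024',
  'timeout'        = '1d'
);
```

Because the target table is locked for the whole import, schedule a job that writes to this sink when no other DML or DDL needs to run against the table.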

Type mapping

MySQL-compatible mode

OceanBase type Flink type
TINYINT TINYINT
SMALLINT, TINYINT UNSIGNED SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED INT
BIGINT, INT UNSIGNED BIGINT
BIGINT UNSIGNED DECIMAL(20, 0)
REAL, FLOAT FLOAT
DOUBLE DOUBLE
NUMERIC(p, s), DECIMAL(p, s) DECIMAL(p, s) (p ≤ 38)
BOOLEAN, TINYINT(1) BOOLEAN
DATE DATE
TIME [(p)] TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)], TIMESTAMP [(p)] TIMESTAMP [(p)] [WITHOUT TIME ZONE]
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)
BIT(n) BINARY(⌈n/8⌉)
BINARY(n) BINARY(n)
VARBINARY(N) VARBINARY(N)
TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT STRING
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB BYTES (max 2,147,483,647 bytes)
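
To make the mapping concrete, the sketch below pairs a hypothetical MySQL-compatible OceanBase table with a Flink declaration derived row by row from the table above. The orders table and its columns are invented for illustration.

```sql
-- OceanBase (MySQL-compatible mode) table, shown as a comment for reference:
--   CREATE TABLE orders (
--     id         BIGINT UNSIGNED PRIMARY KEY,
--     quantity   INT UNSIGNED,
--     is_paid    TINYINT(1),
--     amount     DECIMAL(10, 2),
--     created_at DATETIME(3),
--     note       TEXT
--   );

-- Matching Flink source table declaration.
CREATE TEMPORARY TABLE orders_source (
  id         DECIMAL(20, 0),  -- BIGINT UNSIGNED
  quantity   BIGINT,          -- INT UNSIGNED
  is_paid    BOOLEAN,         -- TINYINT(1)
  amount     DECIMAL(10, 2),  -- DECIMAL(p, s)
  created_at TIMESTAMP(3),    -- DATETIME(3)
  note       STRING,          -- TEXT
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'     = 'oceanbase',
  'hostname'      = '<your-hostname>',
  'port'          = '3306',
  'username'      = '<your-username>',
  'password'      = '<your-password>',
  'database-name' = '<your-database-name>',
  'table-name'    = 'orders'
);
```

Unsigned integer types map to the next wider Flink type because Flink integer types are signed.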

Oracle-compatible mode

OceanBase type Flink type
NUMBER(p, s≤0), p−s < 3 TINYINT
NUMBER(p, s≤0), p−s < 5 SMALLINT
NUMBER(p, s≤0), p−s < 10 INT
NUMBER(p, s≤0), p−s < 19 BIGINT
NUMBER(p, s≤0), 19 ≤ p−s ≤ 38 DECIMAL(p−s, 0)
NUMBER(p, s>0) DECIMAL(p, s)
NUMBER(p, s≤0), p−s > 38 STRING
FLOAT, BINARY_FLOAT FLOAT
BINARY_DOUBLE DOUBLE
NUMBER(1) BOOLEAN
DATE, TIMESTAMP [(p)] TIMESTAMP [(p)] [WITHOUT TIME ZONE]
CHAR(n), NCHAR(n), NVARCHAR2(n), VARCHAR(n), VARCHAR2(n), CLOB STRING
BLOB, ROWID BYTES
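
The NUMBER rules above depend on p − s, so a worked example may help. The sketch below maps a hypothetical Oracle-compatible table to a Flink JDBC sink table. The ORDERS table and its columns are invented for illustration.

```sql
-- OceanBase (Oracle-compatible mode) table, shown as a comment for reference:
--   CREATE TABLE ORDERS (
--     ID         NUMBER(18, 0) PRIMARY KEY,
--     QUANTITY   NUMBER(9, 0),
--     AMOUNT     NUMBER(10, 2),
--     CUSTOMER   VARCHAR2(64),
--     CREATED_AT TIMESTAMP(6)
--   );

-- Matching Flink sink table declaration.
CREATE TEMPORARY TABLE orders_oracle_sink (
  ID         BIGINT,          -- NUMBER(18, 0): p - s = 18, which is < 19
  QUANTITY   INT,             -- NUMBER(9, 0):  p - s = 9, which is < 10
  AMOUNT     DECIMAL(10, 2),  -- NUMBER(p, s > 0) keeps its precision and scale
  CUSTOMER   STRING,          -- VARCHAR2(n)
  CREATED_AT TIMESTAMP(6),    -- TIMESTAMP(p)
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector'      = 'oceanbase',
  'url'            = '<your-jdbc-url>',
  'userName'       = '<your-username>',
  'password'       = '<your-password>',
  'tableName'      = 'ORDERS',
  'compatibleMode' = 'oracle'
);
```

The example uses a sink table because incremental change tracking in Oracle compatibility mode is not supported from VVR 11.4.0.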

Examples

Source table and sink table

The following example reads CDC data from an OceanBase source table and writes it to a JDBC sink table. A bypass import sink table definition is also included for reference.

All three tables use 'connector' = 'oceanbase'. The source and JDBC sink use different parameter sets; the direct-load sink sets sink.mode = 'direct-load' and connects via the RPC port.

-- OceanBase CDC source table (reads full history, then Binlog)
CREATE TEMPORARY TABLE oceanbase_source (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'     = 'oceanbase',
  'hostname'      = '<your-hostname>',
  'port'          = '3306',
  'username'      = '<your-username>',
  'password'      = '<your-password>',
  'database-name' = '<your-database-name>',
  'table-name'    = '<your-table-name>'
);

-- OceanBase JDBC sink table (for real-time streaming writes)
CREATE TEMPORARY TABLE oceanbase_sink (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'url'       = '<your-jdbc-url>',
  'userName'  = '<your-username>',
  'password'  = '<your-password>',
  'tableName' = '<your-table-name>'
);

-- OceanBase bypass import sink table (for high-throughput batch writes)
-- Requires a bounded data source; set Flink to batch mode for best performance
CREATE TEMPORARY TABLE oceanbase_directload_sink (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'   = 'oceanbase',
  'sink.mode'   = 'direct-load',
  'host'        = '<your-host>',
  'port'        = '<your-rpc-port>',
  'tenant-name' = '<your-tenant-name>',
  'schema-name' = '<your-schema-name>',
  'table-name'  = '<your-table-name>',
  'username'    = '<your-username>',
  'password'    = '<your-password>'
);

BEGIN STATEMENT SET;
INSERT INTO oceanbase_sink
SELECT * FROM oceanbase_source;
END;

Dimension table

The following example joins a Datagen source with an OceanBase dimension table using a temporal join. The dimension table does not set the cache parameter, so it uses the default ALL cache policy, which loads the entire dimension table into memory before the job starts.

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- OceanBase dimension table with ALL cache policy
-- ALL cache loads the full table at startup — suitable for small, stable tables
CREATE TEMPORARY TABLE oceanbase_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'url'       = '<your-jdbc-url>',
  'userName'  = '<your-username>',
  'password'  = '${secret_values.password}',
  'tableName' = '<your-table-name>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
