The OceanBase connector integrates Realtime Compute for Apache Flink with OceanBase, a native distributed hybrid transactional and analytical processing (HTAP) database. Use it to read change data capture (CDC) streams, join dimension tables, and write results back to OceanBase.
The OceanBase connector is in public preview.
Supported features
| Category | Details |
|---|---|
| Table types | Source, dimension, and sink tables |
| Runtime modes | Streaming mode and batch mode |
| Data format | Not applicable |
| Specific monitoring metrics | None |
| API | SQL |
| Updates and deletes in sink tables | Yes |
Choose a connector
OceanBase supports two compatibility modes. Your mode determines which connector to use.
| OceanBase mode | Recommended connector | Alternative |
|---|---|---|
| Oracle mode | OceanBase connector | — |
| MySQL mode | OceanBase connector | MySQL connector |
MySQL mode notes:
- The MySQL connector is available for OceanBase 3.2.4.4 and later (public preview). Evaluate it thoroughly before using it in production.
- To read incremental data with the MySQL connector, enable and configure OceanBase Binlog. See OceanBase Binlog service overview and Binlog-related operations.
Prerequisites
Before you begin, make sure that:
- The target database and tables exist in OceanBase.
- An IP address whitelist is configured. See Configure a whitelist group.
- (For CDC source tables) The OceanBase Binlog service is enabled. See Binlog-related operations.
- (For bypass import sink tables) The bypass import port is enabled. See Bypass import.
Limitations
- Requires Ververica Runtime (VVR) 8.0.1 or later.
- Semantic guarantees:
  - CDC source tables: exactly-once semantics. No data is lost or duplicated during the transition from reading full historical data to reading the Binlog, even after a fault.
  - Sink tables: at-least-once semantics. If the sink table has a primary key, idempotent writes keep the data correct when records are replayed.
VVR 11.4.0 CDC architecture change
Starting from VVR 11.4.0, the OceanBase CDC connector is upgraded as follows:
- The original CDC connector based on the OceanBase LogProxy service is deprecated and removed.
- Incremental log capture now requires the OceanBase Binlog service. Compared with connecting the standard MySQL CDC connector directly to the Binlog service, the OceanBase CDC connector offers better protocol compatibility and connection stability. Connecting the standard MySQL CDC connector to the OceanBase Binlog service for change tracking is not recommended.
- Incremental change tracking in Oracle compatibility mode is no longer supported. For Oracle mode CDC, contact OceanBase Enterprise Technical Support.
Syntax
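```sql
-- JDBC-style parameters (url, tableName, userName), as used by dimension and sink tables.
-- CDC source tables use hostname, username, database-name, and table-name instead.
CREATE TABLE oceanbase_table (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<your-jdbc-url>',
  'tableName' = '<your-table-name>',
  'userName' = '<your-username>',
  'password' = '<your-password>'
);
```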
Sink write behavior:
For each incoming record, the connector constructs an SQL statement based on the sink table's schema:
- No primary key: the connector generates an INSERT INTO statement.
- Has primary key: the connector generates an UPSERT statement whose form depends on the database compatibility mode.
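The following minimal sketch shows a sink table that declares a primary key. Under the rules above, the connector would therefore write upserts instead of plain INSERT INTO statements. When a record is replayed under at-least-once delivery, the upsert rewrites the same row rather than adding a duplicate. The table name orders_sink and its columns are hypothetical. The placeholders follow the Syntax example.

```sql
-- Hypothetical sink table. Declaring a primary key makes the connector
-- generate upserts instead of INSERT INTO statements.
CREATE TEMPORARY TABLE orders_sink (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<your-jdbc-url>',
  'userName' = '<your-username>',
  'password' = '<your-password>',
  'tableName' = '<your-table-name>'
);
```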
WITH parameters
General parameters
These parameters apply to all table types.
| Parameter | Description | Required | Type | Default |
|---|---|---|---|---|
| connector | Set to oceanbase. | Yes | STRING | — |
| password | The database password. | Yes | STRING | — |
Source table parameters
Starting from VVR 11.4.0, the OceanBase CDC connector uses the OceanBase Binlog service for incremental log capture. The LogProxy-based connector is removed. Oracle compatibility mode CDC is no longer supported from VVR 11.4.0.
| Parameter | Description | Required | Type | Default | Notes |
|---|---|---|---|---|---|
| hostname | The IP address or hostname of the OceanBase database. Use a virtual private cloud (VPC) address when possible. | Yes | STRING | — | If OceanBase and Realtime Compute for Apache Flink are in different VPCs, set up cross-VPC connectivity or use a public endpoint. See Workspace management and Internet access for Flink clusters. |
| username | The OceanBase database username. | Yes | STRING | — | — |
| database-name | The OceanBase database name. Supports regular expressions to read from multiple databases. Avoid ^ and $ anchors. | Yes | STRING | — | The connector concatenates database-name and table-name with \\. (VVR 8.0.1+) or . (earlier versions) to form a full path regex. For example, db_.* and tb_.+ become db_.*\\.tb_.+. |
| table-name | The OceanBase table name. Supports regular expressions to read from multiple tables. Avoid ^ and $ anchors. | Yes | STRING | — | See the database-name note. |
| port | The OceanBase database port. | No | INTEGER | 3306 | — |
| server-id | A numeric ID for the database client. Must be globally unique. Supports a range, such as 5400-5408, to assign different IDs to concurrent readers. | No | STRING | Random value between 5400 and 6400 | Use a different ID for each job that connects to the same database. See Server ID usage. |
| scan.incremental.snapshot.chunk.size | The number of rows per chunk during incremental snapshot reading. Data in each chunk is buffered in memory until the chunk is fully read. Smaller chunks improve fault recovery granularity but may cause out-of-memory (OOM) errors and reduce throughput. | No | INTEGER | 8096 | Balance chunk size against memory and throughput requirements. |
| scan.snapshot.fetch.size | The maximum number of records fetched per pull during full table reads. | No | INTEGER | 1024 | — |
| scan.startup.mode | The startup mode for data consumption. | No | STRING | initial | Valid values: initial (scan full history, then read the Binlog), latest-offset (read only from the end of the Binlog), earliest-offset (read from the earliest available Binlog), specific-offset (start from the offset set by the scan.startup.specific-offset.* parameters), timestamp (start from the time set by scan.startup.timestamp-millis). Important: For |
| scan.startup.specific-offset.file | The Binlog filename for the start offset. Example: mysql-bin.000003. | No | STRING | — | Requires scan.startup.mode=specific-offset. |
| scan.startup.specific-offset.pos | The byte offset within the specified Binlog file. | No | INTEGER | — | Requires scan.startup.mode=specific-offset. |
| scan.startup.specific-offset.gtid-set | The GTID set for the start offset. Example: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19. | No | STRING | — | Requires scan.startup.mode=specific-offset. |
| scan.startup.timestamp-millis | The start timestamp in milliseconds. OceanBase CDC reads the initial event of each Binlog file to locate the file that matches this timestamp. The Binlog file must not have been purged. | No | LONG | — | Requires scan.startup.mode=timestamp. |
| server-time-zone | The session time zone used by the database. Controls how TIMESTAMP types are converted to STRING. See Debezium temporal types. Example: Asia/Shanghai. | No | STRING | Time zone of the Flink job runtime | — |
| debezium.min.row.count.to.stream.results | The row count above which the connector switches from full read (loading the entire table into memory) to batch read (streaming rows in batches). Full read is faster. Batch read avoids OOM errors on large tables. | No | INTEGER | 1000 | — |
| connect.timeout | The maximum time to wait for a database connection before it times out and is retried. | No | DURATION | 30s | — |
| connect.max-retries | The maximum number of connection retries after a failure. | No | INTEGER | 3 | — |
| connection.pool.size | The size of the database connection pool. Reusing connections reduces the total number of open connections. | No | INTEGER | 20 | — |
| jdbc.properties.* | Custom JDBC URL connection parameters. Example: 'jdbc.properties.useSSL' = 'false'. See MySQL configuration properties. | No | STRING | — | — |
| debezium.* | Custom Debezium parameters for Binlog reading. Example: 'debezium.event.deserialization.failure.handling.mode' = 'ignore'. | No | STRING | — | — |
| heartbeat.interval | The interval at which the source emits heartbeat events to advance the Binlog offset. This prevents the Binlog offset from expiring on infrequently updated tables. An expired offset causes the job to fail and requires a stateless restart. | No | DURATION | 30s | — |
| scan.incremental.snapshot.chunk.key-column | The column used as the chunk key for splitting data during the snapshot phase. | Conditional | STRING | — | Required for tables without a primary key (the column must be NOT NULL). Optional for tables with a primary key (select one column from the primary key). |
| scan.incremental.close-idle-reader.enabled | Specifies whether to close idle readers after the snapshot phase completes. | No | BOOLEAN | false | VVR 8.0.1+. Also requires execution.checkpointing.checkpoints-after-tasks-finish.enabled=true. |
| scan.read-changelog-as-append-only.enabled | Specifies whether to convert the changelog stream to an append-only stream. When set to true, all message types (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) are converted to INSERT. Enable this only in special scenarios, such as preserving delete messages from an upstream table. | No | BOOLEAN | false | VVR 8.0.8+. |
| scan.only.deserialize.captured.tables.changelog.enabled | Specifies whether to deserialize change events only for the captured tables during the incremental phase. Setting it to true speeds up Binlog reading. | No | BOOLEAN | false (VVR 8.x), true (VVR 11.1+) | VVR 8.0.7+. In VVR 8.0.8 and earlier, use the parameter name debezium.scan.only.deserialize.captured.tables.changelog.enable. |
| scan.parse.online.schema.changes.enabled | Specifies whether to parse DDL events for ApsaraDB RDS lockless changes during the incremental phase. Experimental feature. Take a Flink job snapshot before you perform online lockless schema changes. | No | BOOLEAN | false | VVR 11.1+. |
| scan.incremental.snapshot.backfill.skip | Specifies whether to skip backfill during the snapshot phase. When enabled, changes that occur during the snapshot are read in the incremental phase instead of being merged into the snapshot. This may cause data inconsistency and provides only at-least-once semantics. | No | BOOLEAN | false | VVR 11.1+. |
| scan.incremental.snapshot.unbounded-chunk-first.enabled | Specifies whether to distribute the unbounded chunk first during the snapshot phase. This reduces the risk of OOM errors when a TaskManager processes the last chunk. Experimental feature. Add this parameter before the job starts for the first time. | No | BOOLEAN | false | VVR 11.1+. |
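The following sketch illustrates how several source options combine. It reads every table that matches orders_.* in databases that match shop_.* and assigns a server ID range to the parallel readers. It also closes idle readers once the snapshot phase ends. The table name, columns, regular expressions, and ID range are hypothetical. Sizing the range assumes one ID per parallel reader (parallelism 4 here). The SET statement assumes your environment accepts SET in SQL drafts. If it does not, set the same key in the deployment configuration.

```sql
-- Assumption: required by scan.incremental.close-idle-reader.enabled.
SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';

-- Hypothetical CDC source over several databases and tables.
CREATE TEMPORARY TABLE orders_cdc_source (
  order_id INT,
  customer_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'hostname' = '<your-hostname>',
  'port' = '3306',
  'username' = '<your-username>',
  'password' = '<your-password>',
  -- Regular expressions without ^ or $ anchors; the connector joins them
  -- into one full-path regex (see the database-name note).
  'database-name' = 'shop_.*',
  'table-name' = 'orders_.*',
  -- Assumption: one unique server ID per parallel reader (parallelism 4).
  'server-id' = '5401-5404',
  -- Default mode: full historical scan, then Binlog.
  'scan.startup.mode' = 'initial',
  'scan.incremental.close-idle-reader.enabled' = 'true'
);
```

To start from a point in time instead, set 'scan.startup.mode' = 'timestamp' and add 'scan.startup.timestamp-millis'. That mode skips the full historical scan, so closing idle readers after the snapshot phase would no longer apply.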
Dimension table parameters
| Parameter | Description | Required | Type | Default | Notes |
|---|---|---|---|---|---|
| url | The JDBC URL. Must include the MySQL database name or Oracle service name. | Yes | STRING | — | — |
| userName | The database username. | Yes | STRING | — | — |
| cache | The cache policy for dimension table lookups. | No | STRING | ALL | ALL: Loads all data before the job starts and reloads it after the cache expires. Suitable for small tables with many lookup misses. Set the join node memory to at least twice the table size to support asynchronous loading. LRU: Caches a subset of rows; requires cacheSize. None: No caching. |
| cacheSize | The maximum number of cached entries. | No | INTEGER | 100000 | Required when cache=LRU. Ignored when cache=ALL. |
| cacheTTLMs | The cache timeout in milliseconds. Behavior depends on the cache setting. For LRU, entries expire after this duration (no expiry by default). For ALL, the full cache reloads after this duration (no reload by default). For None, this parameter has no effect. | No | LONG | Long.MAX_VALUE | — |
| maxRetryTimeout | The maximum retry duration. | No | DURATION | 60s | — |
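A dimension table may be too large to load fully under the default ALL policy. In that case, an LRU cache is one possible alternative. The sketch below is illustrative only. customer_dim, its columns, the cache size, and the TTL are hypothetical values, not tuning recommendations.

```sql
-- Hypothetical dimension table that caches only recently used rows.
CREATE TEMPORARY TABLE customer_dim (
  customer_id INT,
  customer_name STRING
) WITH (
  'connector' = 'oceanbase',
  'url' = '<your-jdbc-url>',
  'userName' = '<your-username>',
  'password' = '<your-password>',
  'tableName' = '<your-table-name>',
  'cache' = 'LRU',
  -- Keep at most 50,000 rows in memory.
  'cacheSize' = '50000',
  -- Expire cached entries after 10 minutes (600,000 ms).
  'cacheTTLMs' = '600000'
);
```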
Sink table parameters (JDBC)
| Parameter | Description | Required | Type | Default | Notes |
|---|---|---|---|---|---|
| url | The JDBC URL. Must include the MySQL database name or Oracle service name. | Yes | STRING | — | — |
| userName | The database username. | Yes | STRING | — | — |
| tableName | The target table name. | Yes | STRING | — | — |
| sink.mode | The write mode. Set to jdbc for standard writes or to direct-load for bypass import. | No | STRING | jdbc | — |
| compatibleMode | The OceanBase compatibility mode. Valid values: mysql, oracle. | No | STRING | mysql | OceanBase-specific parameter. |
| maxRetryTimes | The maximum number of write retries. | No | INTEGER | 3 | — |
| poolInitialSize | The initial connection pool size. | No | INTEGER | 1 | — |
| poolMaxActive | The maximum number of active connections in the pool. | No | INTEGER | 8 | — |
| poolMaxWait | The maximum time to wait for a connection from the pool, in milliseconds. | No | INTEGER | 2000 | — |
| poolMinIdle | The minimum number of idle connections in the pool. | No | INTEGER | 1 | — |
| connectionProperties | JDBC connection properties in k1=v1;k2=v2 format. | No | STRING | — | — |
| ignoreDelete | Specifies whether to ignore delete operations. | No | BOOLEAN | false | — |
| excludeUpdateColumns | A comma-separated list of columns to exclude from updates (for example, column1,column2). Primary key columns are always excluded, regardless of this setting. | No | STRING | — | — |
| partitionKey | The partition key. When set, data is grouped by this key before modRule is applied. | No | STRING | — | — |
| modRule | The grouping rule, in the format column_name mod number (for example, user_id mod 8). The column must be numeric. Data is first partitioned by partitionKey and then grouped within each partition by this rule. | No | STRING | — | — |
| bufferSize | The data buffer size, in number of records. | No | INTEGER | 1000 | — |
| flushIntervalMs | The buffer flush interval, in milliseconds. If the buffer does not meet the output condition within this interval, all buffered data is flushed automatically. | No | LONG | 1000 | — |
| retryIntervalMs | The retry interval, in milliseconds. | No | INTEGER | 5000 | — |
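The following sketch combines several JDBC sink options for a tenant in Oracle compatibility mode. The table and column names and the buffer values are hypothetical. The table declares a primary key. Updates therefore overwrite existing rows but leave created_by unchanged, and upstream delete messages are dropped instead of being applied.

```sql
-- Hypothetical JDBC sink for an OceanBase tenant in Oracle compatibility mode.
CREATE TEMPORARY TABLE orders_oracle_sink (
  order_id INT,
  order_status STRING,
  created_by STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'sink.mode' = 'jdbc',
  'url' = '<your-jdbc-url>',          -- must include the Oracle service name
  'userName' = '<your-username>',
  'password' = '<your-password>',
  'tableName' = '<your-table-name>',
  'compatibleMode' = 'oracle',
  -- Leave created_by unchanged when an existing row is updated.
  'excludeUpdateColumns' = 'created_by',
  -- Drop DELETE messages instead of deleting rows in OceanBase.
  'ignoreDelete' = 'true',
  -- Buffer up to 500 records; whatever is still buffered is flushed after 2,000 ms.
  'bufferSize' = '500',
  'flushIntervalMs' = '2000'
);
```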
Sink table parameters (bypass import)
Bypass import is a high-throughput write method for bulk data loading in OceanBase. Available in VVR 11.5 and later.
Before using bypass import, check the following constraints:
- Bounded streams only: The data source must be a bounded stream. Use Flink batch mode for best performance.
- Table locking during import: The target table is locked throughout the import. DML writes and DDL changes are blocked; read queries are unaffected.
- Not for real-time writes: For streaming or real-time writes, use the JDBC sink instead.
| Parameter | Description | Required | Type | Default | Notes |
|---|---|---|---|---|---|
| sink.mode | Set to direct-load to use bypass import. | No | STRING | jdbc | — |
| host | The IP address or hostname of the OceanBase database. | Yes | STRING | — | — |
| port | The RPC port of the OceanBase database. | No | INTEGER | 2882 | — |
| username | The database username. | Yes | STRING | — | — |
| tenant-name | The OceanBase tenant name. | Yes | STRING | — | — |
| schema-name | For MySQL tenants, the database name. For Oracle tenants, the owner name. | Yes | STRING | — | — |
| table-name | The target table name. | Yes | STRING | — | — |
| parallel | The server-side concurrency for the import task. The server caps the actual degree of parallelism based on the tenant's CPU specifications without returning an error. Formula: MIN(tenant_cores × 2, parallel) × partition_nodes. For example, with 2 CPU cores, parallel=10, and 2 partition nodes: MIN(4, 10) × 2 = 8. | No | INTEGER | 8 | — |
| buffer-size | The number of records buffered before a single write to OceanBase. | No | INTEGER | 1024 | — |
| dup-action | The behavior when duplicate primary keys are encountered. STOP_ON_DUP: fail the import. REPLACE: overwrite the existing row. IGNORE: discard the incoming row. | No | STRING | REPLACE | — |
| load-method | The import mode. full: standard bypass import. inc: incremental mode that checks for primary key conflicts (requires observer 4.3.2+; dup-action=REPLACE is not supported). inc_replace: incremental replace mode that overwrites existing rows directly without conflict checks (requires observer 4.3.2+; dup-action is ignored). | No | STRING | full | — |
| max-error-rows | The maximum number of tolerated error rows. Error rows include duplicate primary keys when dup-action=STOP_ON_DUP, rows with mismatched column counts, and rows whose type conversion fails. | No | LONG | 0 | — |
| timeout | The overall timeout for the bypass import task. | No | DURATION | 7d | — |
| heartbeat-timeout | The client-side heartbeat timeout. | No | DURATION | 60s | — |
| heartbeat-interval | The client-side heartbeat interval. | No | DURATION | 10s | — |
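Here is a minimal end-to-end sketch of bypass import, assuming a bounded input. The Flink datagen connector with number-of-rows stands in for a real bounded source. Its sequence generator keeps primary keys unique so that 'dup-action' = 'STOP_ON_DUP' does not abort the import. All table names and option values are hypothetical. The SET statement assumes your environment accepts SET in SQL drafts. If it does not, select batch mode in the deployment settings.

```sql
-- Assumption: run the whole job in batch mode, as recommended for bypass import.
SET 'execution.runtime-mode' = 'batch';

-- Bounded stand-in source: 1,000,000 rows with unique keys 1..1000000.
CREATE TEMPORARY TABLE bounded_source (
  a INT,
  b STRING,
  c STRING
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '1000000',
  'fields.a.kind' = 'sequence',
  'fields.a.start' = '1',
  'fields.a.end' = '1000000'
);

-- Hypothetical bypass import sink.
CREATE TEMPORARY TABLE orders_directload_sink (
  a INT,
  b STRING,
  c STRING,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'sink.mode' = 'direct-load',
  'host' = '<your-host>',
  'port' = '2882',                    -- RPC port, not the SQL port
  'username' = '<your-username>',
  'password' = '<your-password>',
  'tenant-name' = '<your-tenant-name>',
  'schema-name' = '<your-schema-name>',
  'table-name' = '<your-table-name>',
  -- Effective parallelism: MIN(tenant_cores * 2, 16) * partition_nodes.
  'parallel' = '16',
  -- Fail the import on the first duplicate primary key.
  'dup-action' = 'STOP_ON_DUP',
  'max-error-rows' = '0',
  'timeout' = '2h'
);

INSERT INTO orders_directload_sink
SELECT a, b, c FROM bounded_source;
```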
Type mapping
MySQL-compatible mode
| OceanBase type | Flink type |
|---|---|
| TINYINT | TINYINT |
| SMALLINT, TINYINT UNSIGNED | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED | INT |
| BIGINT, INT UNSIGNED | BIGINT |
| BIGINT UNSIGNED | DECIMAL(20, 0) |
| REAL, FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) (p ≤ 38) |
| BOOLEAN, TINYINT(1) | BOOLEAN |
| DATE | DATE |
| TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
| DATETIME [(p)], TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| CHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| BIT(n) | BINARY(⌈n/8⌉) |
| BINARY(n) | BINARY(n) |
| VARBINARY(N) | VARBINARY(N) |
| TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES (max 2,147,483,647 bytes) |
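To show how the mapping applies in practice, the sketch below declares Flink columns for a hypothetical MySQL-mode table. The OceanBase table definition appears as a comment, and each Flink column follows the corresponding row of the table above. Table and column names are hypothetical.

```sql
-- Hypothetical OceanBase (MySQL mode) table, for reference only:
--   CREATE TABLE accounts (
--     id        BIGINT UNSIGNED PRIMARY KEY,
--     is_active TINYINT(1),
--     balance   DECIMAL(18, 2),
--     updated   DATETIME(3),
--     avatar    MEDIUMBLOB,
--     bio       TEXT
--   );
-- Matching Flink DDL, one column per mapping row:
CREATE TEMPORARY TABLE accounts_source (
  id        DECIMAL(20, 0),   -- BIGINT UNSIGNED
  is_active BOOLEAN,          -- TINYINT(1)
  balance   DECIMAL(18, 2),   -- DECIMAL(p, s) with p <= 38
  updated   TIMESTAMP(3),     -- DATETIME(3)
  avatar    BYTES,            -- MEDIUMBLOB
  bio       STRING,           -- TEXT
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'hostname' = '<your-hostname>',
  'port' = '3306',
  'username' = '<your-username>',
  'password' = '<your-password>',
  'database-name' = '<your-database-name>',
  'table-name' = 'accounts'
);
```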
Oracle-compatible mode
| OceanBase type | Flink type |
|---|---|
| NUMBER(p, s≤0), p−s < 3 | TINYINT |
| NUMBER(p, s≤0), p−s < 5 | SMALLINT |
| NUMBER(p, s≤0), p−s < 10 | INT |
| NUMBER(p, s≤0), p−s < 19 | BIGINT |
| NUMBER(p, s≤0), 19 ≤ p−s ≤ 38 | DECIMAL(p−s, 0) |
| NUMBER(p, s>0) | DECIMAL(p, s) |
| NUMBER(p, s≤0), p−s > 38 | STRING |
| FLOAT, BINARY_FLOAT | FLOAT |
| BINARY_DOUBLE | DOUBLE |
| NUMBER(1) | BOOLEAN |
| DATE, TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| CHAR(n), NCHAR(n), NVARCHAR2(n), VARCHAR(n), VARCHAR2(n), CLOB | STRING |
| BLOB, ROWID | BYTES |
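The NUMBER rules depend on p − s, so boundary values are easy to misread. CDC in Oracle compatibility mode is no longer supported from VVR 11.4.0. This sketch therefore applies the mapping to a JDBC sink instead. The column names and the Oracle column definitions in the comments are hypothetical. Match them to your actual table.

```sql
-- Hypothetical JDBC sink for an Oracle-mode table. Each comment gives the
-- assumed Oracle column type and the mapping rule that selects the Flink type.
CREATE TEMPORARY TABLE accounts_oracle_sink (
  id        INT,             -- NUMBER(9, 0):  p - s = 9,  not < 5 but < 10
  region    SMALLINT,        -- NUMBER(4, 0):  p - s = 4,  not < 3 but < 5
  ext_id    BIGINT,          -- NUMBER(10, 0): p - s = 10, not < 10 but < 19
  balance   DECIMAL(12, 2),  -- NUMBER(12, 2): s > 0
  acct_name STRING,          -- VARCHAR2(64)
  created   TIMESTAMP(6),    -- TIMESTAMP(6)
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<your-jdbc-url>',
  'userName' = '<your-username>',
  'password' = '<your-password>',
  'tableName' = '<your-table-name>',
  'compatibleMode' = 'oracle'
);
```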
Examples
Source table and sink table
The following example reads CDC data from an OceanBase source table and writes it to a JDBC sink table. A bypass import sink table definition is also included for reference.
All three tables use 'connector' = 'oceanbase'. The source and JDBC sink use different parameter sets; the direct-load sink sets sink.mode = 'direct-load' and connects via the RPC port.
```sql
-- OceanBase CDC source table (reads full history, then Binlog)
CREATE TEMPORARY TABLE oceanbase_source (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'hostname' = '<your-hostname>',
  'port' = '3306',
  'username' = '<your-username>',
  'password' = '<your-password>',
  'database-name' = '<your-database-name>',
  'table-name' = '<your-table-name>'
);

-- OceanBase JDBC sink table (for real-time streaming writes)
CREATE TEMPORARY TABLE oceanbase_sink (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'url' = '<your-jdbc-url>',
  'userName' = '<your-username>',
  'password' = '<your-password>',
  'tableName' = '<your-table-name>'
);

-- OceanBase bypass import sink table (for high-throughput batch writes)
-- Requires a bounded data source; set Flink to batch mode for best performance
CREATE TEMPORARY TABLE oceanbase_directload_sink (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'sink.mode' = 'direct-load',
  'host' = '<your-host>',
  'port' = '<your-rpc-port>',
  'tenant-name' = '<your-tenant-name>',
  'schema-name' = '<your-schema-name>',
  'table-name' = '<your-table-name>',
  'username' = '<your-username>',
  'password' = '<your-password>'
);

BEGIN STATEMENT SET;
INSERT INTO oceanbase_sink
SELECT * FROM oceanbase_source;
END;
```
Dimension table
The following example joins a Datagen source with an OceanBase dimension table using a temporal join. The ALL cache policy loads the entire dimension table into memory before the job starts.
```sql
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- OceanBase dimension table with ALL cache policy (the default)
-- ALL cache loads the full table at startup; suitable for small, stable tables
CREATE TEMPORARY TABLE oceanbase_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'url' = '<your-jdbc-url>',
  'userName' = '<your-username>',
  'password' = '${secret_values.password}',
  'tableName' = '<your-table-name>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
```
What's next
- Supported connectors: full list of connectors available in Realtime Compute for Apache Flink
- MySQL connector: alternative connector for OceanBase MySQL mode
- OceanBase Binlog service overview: required for CDC incremental reads