
Realtime Compute for Apache Flink: OceanBase (public preview)

Last Updated: Dec 06, 2025

This topic describes how to use the OceanBase connector.

Background information

OceanBase is a native distributed hybrid transactional and analytical processing (HTAP) database management system. For more information, see the official OceanBase website. OceanBase supports both Oracle and MySQL compatibility modes. This reduces the cost of refactoring business systems when you migrate from a MySQL or Oracle database. The data types, SQL features, and internal views in these modes are consistent with those of MySQL or Oracle. The recommended connectors for each mode are as follows:

  • Oracle mode: You can only use the OceanBase connector.

  • MySQL mode: This mode is highly compatible with native MySQL syntax. You can use both the OceanBase and MySQL connectors to read from and write to OceanBase.

    Important
    • The OceanBase connector is in public preview. For OceanBase 3.2.4.4 and later, you can use the MySQL connector to read from and write to OceanBase. This feature is also in public preview. Evaluate this feature carefully before you use it.

    • When you use the MySQL connector to read incremental data from OceanBase, ensure that OceanBase binary logging (Binlog) is enabled and correctly configured. For more information about OceanBase Binlog, see Overview or Binary logging-related operations.

The OceanBase connector supports the following capabilities:

  • Supported table types: source table, dimension table, and sink table

  • Running modes: streaming mode and batch mode

  • Data format: not applicable

  • Connector-specific monitoring metrics: none

  • API type: SQL

  • Updating or deleting data in a sink table: supported

Prerequisites

Limits

  • The OceanBase connector is supported in Realtime Compute for Apache Flink with Ververica Runtime (VVR) 8.0.1 or later.

  • The connector provides at-least-once semantics. If the sink table has a primary key, idempotent writes ensure data correctness.

Syntax

CREATE TABLE oceanbase_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
Note

The connector writes to a sink table by building and executing an SQL statement for each data record that it receives.

  • If a sink table does not have a primary key, an INSERT INTO statement is generated.

  • If a sink table has a primary key, an UPSERT statement is generated based on the database compatibility mode.
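For illustration, the statements generated for a record (1, 'Bob') written to a hypothetical table orders(order_id, customer_name) are equivalent to the following sketches. The exact SQL that the connector generates may differ.

-- Without a primary key: a plain insert.
INSERT INTO orders (order_id, customer_name) VALUES (1, 'Bob');

-- With a primary key, MySQL compatibility mode: an insert-or-update upsert.
INSERT INTO orders (order_id, customer_name) VALUES (1, 'Bob')
ON DUPLICATE KEY UPDATE customer_name = VALUES(customer_name);

-- With a primary key, Oracle compatibility mode: a MERGE-based upsert.
MERGE INTO orders t
USING (SELECT 1 AS order_id, 'Bob' AS customer_name FROM DUAL) s
ON (t.order_id = s.order_id)
WHEN MATCHED THEN UPDATE SET t.customer_name = s.customer_name
WHEN NOT MATCHED THEN INSERT (order_id, customer_name)
  VALUES (s.order_id, s.customer_name);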

WITH parameters

  • General

    connector: The type of the table.
    Required: Yes. Data type: STRING. Default value: none.
    The value must be oceanbase.

    password: The password.
    Required: Yes. Data type: STRING. Default value: none.

  • Source-specific

    Important

    Starting from Realtime Compute for Apache Flink with VVR 11.4.0, the OceanBase CDC connector has undergone major architectural and feature changes. The core changes are described as follows to help you understand the updates and migrate smoothly:

    • The original CDC connector based on the OceanBase LogProxy service has been officially deprecated and removed from the distribution. Starting from VVR 11.4.0, the OceanBase CDC connector captures incremental logs and synchronizes data only through the OceanBase Binlog service.

    • The OceanBase CDC connector provides enhanced protocol compatibility and connection stability with the OceanBase Binlog service. Therefore, we recommend that you use the OceanBase CDC connector.

      The OceanBase Binlog service is fully compatible with the MySQL replication protocol, so you can also use the standard MySQL CDC connector to connect to it for change tracking. However, this is not recommended.

    • Starting from VVR 11.4.0, the OceanBase CDC connector no longer supports incremental data subscription in Oracle-compatible mode. If you require incremental data subscription in Oracle-compatible mode, contact OceanBase enterprise technical support.

    hostname: The IP address or hostname of the OceanBase database.
    Required: Yes. Data type: STRING. Default value: none.
    We recommend that you specify a virtual private cloud (VPC) address.
    Note: If OceanBase and Realtime Compute for Apache Flink are not in the same VPC, you must first establish a cross-VPC network connection or use the Internet for access. For more information, see Storage management and operations and How can a fully managed Flink cluster access the Internet?.

    username: The username for the OceanBase database service.
    Required: Yes. Data type: STRING. Default value: none.

    database-name: The name of the OceanBase database.
    Required: Yes. Data type: STRING. Default value: none.
    • For a source table, the database name supports regular expressions so that data can be read from multiple databases.
    • When you use a regular expression, avoid the ^ and $ anchors that match the beginning and end. For the reason, see the note in the remarks of the table-name parameter.

    table-name: The name of the OceanBase table.
    Required: Yes. Data type: STRING. Default value: none.
    • For a source table, the table name supports regular expressions so that data can be read from multiple tables.
    • When you use a regular expression, avoid the ^ and $ anchors that match the beginning and end. For the reason, see the following note.
    Note: When the OceanBase source table matches table names against a regular expression, it joins the database-name and table-name values that you provide into a full-path regular expression by using the string \\. (the character . was used before VVR 8.0.1), and then uses the combined expression to match the fully qualified names of tables in the OceanBase database.
    For example, if you set 'database-name'='db_.*' and 'table-name'='tb_.+', the connector uses the regular expression db_.*\\.tb_.+ (db_.*.tb_.+ before VVR 8.0.1) to match fully qualified table names and determine which tables to read.

    port: The port number of the OceanBase database service.
    Required: No. Data type: INTEGER. Default value: 3306.

    server-id: A numeric ID for the database client.
    Required: No. Data type: STRING. Default value: a random value between 5400 and 6400.
    The ID must be globally unique. We recommend that you set a different ID for each job that connects to the same database.
    This parameter also accepts an ID range, such as 5400-5408. When incremental snapshot reading is enabled, the source supports parallel reading. In this case, we recommend that you set an ID range so that each parallel task uses a different ID. For more information, see Server ID usage.
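    For example, the following sketch reserves one server ID for each of four parallel source subtasks. The connection values are placeholders.

    CREATE TEMPORARY TABLE oceanbase_parallel_source (
      order_id INT,
      order_status BOOLEAN,
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>',
      -- One ID per parallel task: 5401, 5402, 5403, and 5404.
      'server-id' = '5401-5404'
    );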

    scan.incremental.snapshot.chunk.size: The chunk size, in number of rows.
    Required: No. Data type: INTEGER. Default value: 8096.
    When incremental snapshot reading is enabled, the table is split into multiple chunks for reading. The data of a chunk is buffered in memory until the chunk is fully read.
    A smaller chunk size produces more chunks for the same table. This provides finer-grained fault recovery but can cause out-of-memory (OOM) errors and lower overall throughput. Find a balance and set a reasonable chunk size.

    scan.snapshot.fetch.size: The maximum number of records to fetch in each batch when the full data of a table is read.
    Required: No. Data type: INTEGER. Default value: 1024.

    scan.startup.mode: The startup mode for data consumption.
    Required: No. Data type: STRING. Default value: initial.
    Valid values:
    • initial (default): Scans the full historical data on the first startup, and then reads the latest Binlog data.
    • latest-offset: Does not scan the historical full data on the first startup. Reading starts from the end of the Binlog (the latest position), so only changes made after the connector starts are read.
    • earliest-offset: Does not scan the historical full data. Reading starts from the earliest available Binlog position.
    • specific-offset: Does not scan the historical full data. Reading starts from a Binlog offset that you specify, either by setting both the scan.startup.specific-offset.file and scan.startup.specific-offset.pos parameters, or by setting only the scan.startup.specific-offset.gtid-set parameter to start from a specific GTID set.
    • timestamp: Does not scan the historical full data. Reading starts from the Binlog position that corresponds to the timestamp specified by scan.startup.timestamp-millis, in milliseconds.
    Important: When you use the earliest-offset, specific-offset, or timestamp startup mode, make sure that the schema of the table does not change between the specified Binlog consumption position and the job startup time. This avoids errors caused by schema differences.
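    For example, the following sketch starts consumption from a specific Binlog position. The file name, position, and connection values are placeholders.

    CREATE TEMPORARY TABLE oceanbase_offset_source (
      order_id INT,
      order_status BOOLEAN,
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>',
      -- Skip the full scan and start from a specific Binlog file and position.
      'scan.startup.mode' = 'specific-offset',
      'scan.startup.specific-offset.file' = 'mysql-bin.000003',
      'scan.startup.specific-offset.pos' = '4'
    );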

    scan.startup.specific-offset.file: The Binlog file name of the start offset in specific-offset mode.
    Required: No. Data type: STRING. Default value: none.
    When you use this parameter, scan.startup.mode must be set to specific-offset. Example file name: mysql-bin.000003.

    scan.startup.specific-offset.pos: The offset within the specified Binlog file in specific-offset mode.
    Required: No. Data type: INTEGER. Default value: none.
    When you use this parameter, scan.startup.mode must be set to specific-offset.

    scan.startup.specific-offset.gtid-set: The GTID set of the start offset in specific-offset mode.
    Required: No. Data type: STRING. Default value: none.
    When you use this parameter, scan.startup.mode must be set to specific-offset. Example GTID set: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis: The timestamp of the start offset in timestamp mode, in milliseconds.
    Required: No. Data type: LONG. Default value: none.
    When you use this parameter, scan.startup.mode must be set to timestamp.
    Important: In timestamp mode, OceanBase CDC reads the initial event of each Binlog file to determine the file's timestamp and locate the Binlog file that corresponds to the specified time. Make sure that the Binlog file for the specified timestamp has not been purged from the database and can still be read.

    server-time-zone: The session time zone used by the database.
    Required: No. Data type: STRING. Default value: none.
    If you do not specify this parameter, the environment time zone of the Flink job runtime is used as the database server time zone, that is, the time zone of the region that you selected.
    Example: Asia/Shanghai. This parameter controls how TIMESTAMP values are converted to STRING values. For more information, see Debezium temporal values.

    debezium.min.row.count.to.stream.results: If the number of rows in a table is greater than this value, the batch read mode is used.
    Required: No. Data type: INTEGER. Default value: 1000.
    Flink reads data from an OceanBase source table in one of the following ways:
    • Full read: Reads the entire table into memory at once. This is fast but consumes a corresponding amount of memory. If the source table is very large, there is a risk of OOM errors.
    • Batch read: Reads a fixed number of rows at a time, in multiple batches, until all data is read. This avoids OOM errors for large tables but is relatively slower.

    connect.timeout: The maximum duration to wait before a connection retry after a connection attempt to the OceanBase database server times out.
    Required: No. Data type: DURATION. Default value: 30s.

    connect.max-retries: The maximum number of retries after a failed connection to the OceanBase database service.
    Required: No. Data type: INTEGER. Default value: 3.

    connection.pool.size: The size of the database connection pool.
    Required: No. Data type: INTEGER. Default value: 20.
    The connection pool reuses connections, which reduces the number of database connections.

    jdbc.properties.*: Custom connection parameters in the JDBC URL.
    Required: No. Data type: STRING. Default value: none.
    You can pass custom connection parameters. For example, to disable the SSL protocol, set 'jdbc.properties.useSSL' = 'false'.
    For more information about supported connection parameters, see MySQL Configuration Properties.

    debezium.*: Custom parameters for Debezium to read binary logs.
    Required: No. Data type: STRING. Default value: none.
    You can pass custom Debezium parameters. For example, set 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify how parsing errors are handled.

    heartbeat.interval: The interval at which the source sends heartbeat events to advance the Binlog offset.
    Required: No. Data type: DURATION. Default value: 30s.
    Heartbeat events advance the Binlog offset in the source, which is very useful for slowly updated tables in OceanBase. For such tables, the Binlog offset does not advance on its own; heartbeat events push it forward and prevent it from expiring. An expired Binlog offset causes the job to fail and become unrecoverable, in which case a stateless restart is required.

    scan.incremental.snapshot.chunk.key-column: The column that is used to split chunks in the snapshot phase.
    Required: see the following remarks. Data type: STRING. Default value: none.
    • Required for tables without a primary key. The selected column must be of a non-null type (NOT NULL).
    • Optional for tables with a primary key. You can select only one column from the primary key.
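    For example, for a table without a primary key, the following sketch splits snapshot chunks on a NOT NULL column. The column and connection values are placeholders.

    CREATE TEMPORARY TABLE oceanbase_keyless_source (
      user_id INT NOT NULL,
      user_name STRING
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>',
      -- The table has no primary key, so a NOT NULL column must be specified
      -- to split snapshot chunks.
      'scan.incremental.snapshot.chunk.key-column' = 'user_id'
    );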

    scan.incremental.close-idle-reader.enabled: Specifies whether to close idle readers after the snapshot phase ends.
    Required: No. Data type: BOOLEAN. Default value: false.
    • Supported only in Realtime Compute for Apache Flink that uses VVR 8.0.1 or later.
    • For this parameter to take effect, set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.

    scan.read-changelog-as-append-only.enabled: Specifies whether to convert the changelog data stream to an append-only data stream.
    Required: No. Data type: BOOLEAN. Default value: false.
    Valid values:
    • true: All message types (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted to INSERT messages. Enable this only in special scenarios, such as when you need to keep the delete messages from an upstream table.
    • false (default): All message types are sent downstream as they are.
    Note: Supported only in Realtime Compute for Apache Flink that uses VVR 8.0.8 or later.

    scan.only.deserialize.captured.tables.changelog.enabled: Specifies whether to deserialize change events only for the specified tables during the incremental phase.
    Required: No. Data type: BOOLEAN. Default value: false in VVR 8.x, and true in VVR 11.1 and later.
    Valid values:
    • true: Deserializes change data only for the target tables, which speeds up Binlog reading.
    • false: Deserializes change data for all tables.
    Note:
    • Supported only in Realtime Compute for Apache Flink that uses VVR 8.0.7 or later.
    • In Realtime Compute for Apache Flink that uses VVR 8.0.8 or earlier, use the parameter name debezium.scan.only.deserialize.captured.tables.changelog.enable instead.

    scan.parse.online.schema.changes.enabled: Specifies whether to parse the DDL events of lockless schema changes in RDS during the incremental phase.
    Required: No. Data type: BOOLEAN. Default value: false.
    Valid values:
    • true: Parses the DDL events of lockless changes in RDS.
    • false (default): Does not parse the DDL events of lockless changes in RDS.
    This is an experimental feature. Before you perform an online lockless change, we recommend that you take a snapshot of the Flink job for recovery.
    Note: Supported only in Realtime Compute for Apache Flink that uses VVR 11.1 or later.

    scan.incremental.snapshot.backfill.skip: Specifies whether to skip backfill in the snapshot read phase.
    Required: No. Data type: BOOLEAN. Default value: false.
    Valid values:
    • true: Skips backfill in the snapshot read phase.
    • false (default): Does not skip backfill in the snapshot read phase.
    If you skip backfill, changes made to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot.
    Important: Skipping backfill can cause data inconsistency because changes that occur during the snapshot phase may be replayed. Only at-least-once semantics are guaranteed.
    Note: Supported only in Realtime Compute for Apache Flink that uses VVR 11.1 or later.

    scan.incremental.snapshot.unbounded-chunk-first.enabled: Specifies whether to distribute the unbounded chunk first in the snapshot read phase.
    Required: No. Data type: BOOLEAN. Default value: false.
    Valid values:
    • true: Distributes the unbounded chunk first in the snapshot read phase.
    • false (default): Does not distribute the unbounded chunk first.
    This is an experimental feature. Enabling it reduces the risk of OOM errors when a TaskManager synchronizes the last chunk in the snapshot phase. We recommend that you add this parameter before the job starts for the first time.
    Note: Supported only in Realtime Compute for Apache Flink that uses VVR 11.1 or later.

  • Dimension table-specific

    url: The JDBC URL.
    Required: Yes. Data type: STRING. Default value: none.
    The URL must contain the MySQL database name or the Oracle service name.

    userName: The username.
    Required: Yes. Data type: STRING. Default value: none.

    cache: The cache policy.
    Required: No. Data type: STRING. Default value: ALL.
    The following three cache policies are supported (for a configuration example, see the sketch after this parameter list):
    • ALL: Caches all data in the dimension table. Before the job runs, the system loads all dimension table data into the cache. All subsequent lookups go through the cache; if a key is not found in the cache, the key does not exist. The full cache is reloaded after it expires.
      This policy is suitable when the remote table is small and many keys are missing (rows from the source table often fail to match the dimension table on the ON condition).
    • LRU: Caches a portion of the dimension table. For each record from the source table, the system first looks up the cache; on a cache miss, it queries the physical dimension table. With this policy, you must configure the cacheSize parameter.
    • None: No cache.
    Important:
    • When you use the ALL cache policy, monitor the memory usage of the node to prevent OOM errors.
    • Because the system loads dimension table data asynchronously, when you use the ALL cache policy, increase the memory of the dimension table join node by twice the data volume of the remote table.

    cacheSize: The maximum number of cached entries.
    Required: No. Data type: INTEGER. Default value: 100000.
    • If you select the LRU cache policy, you must set the cache size.
    • If you select the ALL cache policy, you do not need to set the cache size.

    cacheTTLMs: The cache time-to-live (TTL), in milliseconds.
    Required: No. Data type: LONG. Default value: Long.MAX_VALUE.
    How cacheTTLMs takes effect depends on the cache parameter:
    • If cache is set to None, cacheTTLMs does not need to be configured. The cache does not time out.
    • If cache is set to LRU, cacheTTLMs is the TTL of cached entries. By default, the cache does not expire.
    • If cache is set to ALL, cacheTTLMs is the cache reload interval. By default, the cache is not reloaded.

    maxRetryTimeout: The maximum retry time.
    Required: No. Data type: DURATION. Default value: 60s.
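    The following sketch configures an LRU cache for a dimension table. The cache settings are illustrative, and the connection values are placeholders.

    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>',
      -- Cache at most 10,000 rows, each for at most 10 minutes.
      'cache' = 'LRU',
      'cacheSize' = '10000',
      'cacheTTLMs' = '600000'
    );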

  • Sink-specific

    userName: The username.
    Required: Yes. Data type: STRING. Default value: none.

    compatibleMode: The compatibility mode of OceanBase.
    Required: No. Data type: STRING. Default value: mysql.
    Valid values:
    • mysql
    • oracle
    Note: This is an OceanBase-specific parameter.

    url: The JDBC URL.
    Required: Yes. Data type: STRING. Default value: none.
    The URL must contain the MySQL database name or the Oracle service name.

    tableName: The table name.
    Required: Yes. Data type: STRING. Default value: none.

    maxRetryTimes: The maximum number of retries.
    Required: No. Data type: INTEGER. Default value: 3.

    poolInitialSize: The initial size of the database connection pool.
    Required: No. Data type: INTEGER. Default value: 1.

    poolMaxActive: The maximum number of connections in the database connection pool.
    Required: No. Data type: INTEGER. Default value: 8.

    poolMaxWait: The maximum time to wait for a connection from the database connection pool, in milliseconds.
    Required: No. Data type: INTEGER. Default value: 2000.

    poolMinIdle: The minimum number of idle connections in the database connection pool.
    Required: No. Data type: INTEGER. Default value: 1.

    connectionProperties: The JDBC connection properties.
    Required: No. Data type: STRING. Default value: none.
    The format is "k1=v1;k2=v2;k3=v3".

    ignoreDelete: Specifies whether to ignore DELETE operations.
    Required: No. Data type: BOOLEAN. Default value: false.

    excludeUpdateColumns: The names of the columns to exclude. These columns are not updated during update operations.
    Required: No. Data type: STRING. Default value: none.
    Separate multiple columns with commas (,). Example: excludeUpdateColumns=column1,column2.
    Note: The excluded columns always include the primary key column. The columns that actually take effect are the ones you specify plus the primary key column.

    partitionKey: The partition key.
    Required: No. Data type: STRING. Default value: none.
    When a partition key is set, the connector first groups data by the partition key, and each group is written to the database separately. This grouping is applied before the modRule.

    modRule: The grouping rule.
    Required: No. Data type: STRING. Default value: none.
    The grouping rule must be in the format "column_name mod number", such as user_id mod 8. The column must be of a numeric type.
    When a grouping rule is set, data is first partitioned by the partitionKey; within each partition, data is then grouped by the result of the modRule calculation. For a configuration example, see the sketch after this parameter list.

    bufferSize: The size of the data buffer.
    Required: No. Data type: INTEGER. Default value: 1000.

    flushIntervalMs: The interval for flushing the buffer, in milliseconds. If the buffered data does not meet the output condition within this interval, the system automatically flushes all data in the buffer.
    Required: No. Data type: LONG. Default value: 1000.

    retryIntervalMs: The interval between retries, in milliseconds.
    Required: No. Data type: INTEGER. Default value: 5000.
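    The following sketch groups rows by a partition key and then by a modulo of a numeric column before writing, and flushes the buffer every 2,000 rows or every 5 seconds. The table, column, and connection values are illustrative placeholders.

    CREATE TEMPORARY TABLE oceanbase_grouped_sink (
      user_id INT,
      dt VARCHAR,
      amount DOUBLE,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>',
      -- Group rows by the partition key first, and then by user_id mod 8.
      'partitionKey' = 'dt',
      'modRule' = 'user_id mod 8',
      -- Flush when 2,000 rows are buffered or after 5 seconds.
      'bufferSize' = '2000',
      'flushIntervalMs' = '5000'
    );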

Type mapping

  • MySQL-compatible mode

    | OceanBase field type | Flink field type |
    | --- | --- |
    | TINYINT | TINYINT |
    | SMALLINT, TINYINT UNSIGNED | SMALLINT |
    | INT, MEDIUMINT, SMALLINT UNSIGNED | INT |
    | BIGINT, INT UNSIGNED | BIGINT |
    | BIGINT UNSIGNED | DECIMAL(20, 0) |
    | REAL, FLOAT | FLOAT |
    | DOUBLE | DOUBLE |
    | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) (p must be less than or equal to 38) |
    | BOOLEAN, TINYINT(1) | BOOLEAN |
    | DATE | DATE |
    | TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
    | DATETIME [(p)], TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
    | CHAR(n) | CHAR(n) |
    | VARCHAR(n) | VARCHAR(n) |
    | BIT(n) | BINARY(⌈n/8⌉) |
    | BINARY(n) | BINARY(n) |
    | VARBINARY(N) | VARBINARY(N) |
    | TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING |
    | TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES |

    Important: Flink supports only BLOB-type records that are less than or equal to 2,147,483,647 (2^31 - 1) bytes in size.

  • Oracle-compatible mode

    | OceanBase field type | Flink field type |
    | --- | --- |
    | NUMBER(p, s <= 0), p - s < 3 | TINYINT |
    | NUMBER(p, s <= 0), p - s < 5 | SMALLINT |
    | NUMBER(p, s <= 0), p - s < 10 | INT |
    | NUMBER(p, s <= 0), p - s < 19 | BIGINT |
    | NUMBER(p, s <= 0), 19 <= p - s <= 38 | DECIMAL(p - s, 0) |
    | NUMBER(p, s > 0) | DECIMAL(p, s) |
    | NUMBER(p, s <= 0), p - s > 38 | STRING |
    | FLOAT, BINARY_FLOAT | FLOAT |
    | BINARY_DOUBLE | DOUBLE |
    | NUMBER(1) | BOOLEAN |
    | DATE, TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
    | CHAR(n), NCHAR(n), NVARCHAR2(n), VARCHAR(n), VARCHAR2(n), CLOB | STRING |
    | BLOB, ROWID | BYTES |

Usage examples

  • Source and sink tables

    -- OceanBase CDC source table
    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    -- OceanBase sink table
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    END; 

  • Dimension table

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

References

For more information about the connectors that Flink supports, see Supported connectors.