
Realtime Compute for Apache Flink: OceanBase connector (public preview)

Last Updated: Apr 22, 2024

This topic describes how to use the OceanBase connector.

Background information

OceanBase is a native distributed hybrid transactional/analytical processing (HTAP) database management system. For more information, visit the OceanBase official website.

The following list describes the capabilities supported by the OceanBase connector.

  • Table type: source table, dimension table, and result table

  • Running mode: streaming mode and batch mode

  • Data format: none

  • Metric: none

  • API type: SQL API

  • Data update or deletion in a result table: supported

Prerequisites

The database and table to which you want to connect are created. For more information about related operations, see the following topics:

Note

To reduce the cost of adapting business systems when you migrate data from a MySQL or Oracle database to an OceanBase database, OceanBase provides MySQL-compatible and Oracle-compatible modes. These modes keep data types, SQL features, and internal views consistent with those of MySQL or Oracle databases.

Limits

  • Dimension table and result table

    • The OceanBase connector is supported only by Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.1 or later.

    • The connector provides at-least-once semantics. If a result table has a primary key, idempotent writes ensure data correctness.

  • Result table: If OceanBase Database Proxy (OBProxy) is not deployed for the OceanBase database, the OceanBase connector uses OceanBase Connector Java (OCJ) to connect to the database directly. In this case, a Config URL is required, OceanBase Cloud Platform (OCP) must be deployed for the OceanBase database, and the database must use the MySQL-compatible mode.

    Note

    OBProxy and OCJ provide the same routing feature. The OCJ driver is integrated into the Java application, whereas OBProxy is an independent proxy service. The OceanBase team recommends that you use OBProxy to connect to OceanBase clusters. OCJ is mainly used for compatibility with specific legacy clusters and applications.

Syntax

CREATE TABLE oceanbase_table (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);

Note

When the connector writes data to a result table, it builds an SQL statement from each received data record and executes the statement based on the following rules:

  • If the result table does not have a primary key, the connector generates an INSERT INTO statement.

  • If the result table has a primary key, the connector generates an upsert statement whose syntax depends on the compatibility mode of the database.
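To make the upsert rule more concrete, the following sketch shows the kind of statement that a primary-key write corresponds to in MySQL-compatible mode. It assumes a hypothetical OceanBase table named orders and assumes that MySQL's INSERT ... ON DUPLICATE KEY UPDATE syntax is used; the exact statement that the connector generates is not documented in this topic and may differ. In Oracle-compatible mode, the comparable construct would typically be a MERGE INTO statement.

```sql
-- Hypothetical OceanBase table (MySQL-compatible mode) with a primary key.
CREATE TABLE orders (
  order_id      INT PRIMARY KEY,
  customer_name VARCHAR(64),
  price         DECIMAL(10, 5)
);

-- Assumed shape of an upsert: insert a new row, or, if a row with the
-- same order_id already exists, update its non-key columns instead.
INSERT INTO orders (order_id, customer_name, price)
VALUES (1, 'Alice', 9.99000)
ON DUPLICATE KEY UPDATE
  customer_name = VALUES(customer_name),
  price = VALUES(price);
```

Because repeating the same upsert leaves the row in the same final state, this is what makes writes to a result table with a primary key idempotent under at-least-once delivery.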

Parameters in the WITH clause

  • Common parameters

    • connector (required, STRING, no default value): The type of the table.

      • If you use the OceanBase connector for a source table or dimension table, set the value to oceanbase.

      • If you use the OceanBase connector for a result table, set the value based on the following rules:

        • If you use OBProxy, set the value to oceanbase.

        • If you want to connect directly to the OceanBase cluster, set the value to oceanbase-ocj.

    • userName (required, STRING, no default value): The username that is used to access the OceanBase database.

    • password (required, STRING, no default value): The password that is used to access the OceanBase database.

  • Parameters only for source tables

    Note

    The connector provides two ways to specify the tables to capture: the database-name and table-name parameters match tables by regular expression, and the table-list parameter matches tables exactly by name. If you use both methods, the connector captures all tables that are matched by either method.

    • logproxy.host (required, STRING, no default value): The IP address or hostname of the OceanBase log proxy (OBLogProxy).

    • logproxy.port (required, INTEGER, no default value): The port number of OBLogProxy.

    • scan.startup.mode (required, STRING, no default value): The mode in which the connector starts to read data from the OceanBase database. Valid values:

      • initial: All existing data is read first, and then change data is read.

      • latest-offset: Only change data is read, starting from the current offset.

      • timestamp: Only change data is read, starting from a specific timestamp.

    • tenant-name (required, STRING, no default value): The name of the tenant to which the OceanBase database belongs.

    • database-name (optional, STRING, no default value): The name of the OceanBase database. You can use a regular expression to specify the database name.

      Note

      This parameter is required only when the scan.startup.mode parameter is set to initial.

    • table-name (optional, STRING, no default value): The name of the table in the OceanBase database. You can use a regular expression to specify the table name.

      Note

      This parameter is required only when the scan.startup.mode parameter is set to initial.

    • table-list (optional, STRING, no default value): The full names of the tables, in the database name.table name format. Separate multiple tables with commas (,). Example: db1.table1,db2.table2.

    • hostname (optional, STRING, no default value): The IP address or hostname of the OceanBase database or OBProxy.

    • port (optional, INTEGER, no default value): The port number of the OceanBase database server or OBProxy. Set this parameter to the SQL port of the OceanBase server, which is 2881 by default, or to the port of OBProxy, which is 2883 by default.

    • connect.timeout (optional, DURATION, default: 30s): The maximum period of time that Realtime Compute for Apache Flink waits for a connection to the OceanBase database server to be established.

    • server-time-zone (optional, STRING, default: +00:00): The session time zone of the database server, in the ±hh:mm format. The value represents the offset from Coordinated Universal Time (UTC).

      Note

      • The session time zone affects how data of time types is displayed and stored. To control how time values in the OceanBase database are converted into strings, specify a session time zone that ensures the correct local time is displayed.

      • If time zone tables already exist in the MySQL-compatible database, you can also set this parameter to a time zone that is defined in those tables.

    • logproxy.client.id (optional, STRING, default: generated by rules): The client ID of OBLogProxy. If you do not specify this parameter, Realtime Compute for Apache Flink generates the client ID based on the {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant} rule.

    • rootserver-list (optional, STRING, no default value): The OceanBase root servers, in the ip:rpc_port:sql_port format. To obtain the root servers, execute the SHOW PARAMETERS LIKE 'rootservice_list'; statement.

      Note

      • This parameter is required for OceanBase Community Edition.

      • Separate multiple root servers with semicolons (;).

    • config-url (optional, STRING, no default value): The URL that is used to obtain server information from the Config server. This parameter is required for OceanBase Enterprise Edition.

    • working-mode (optional, STRING, default: storage): The working mode of libobcdc in OBLogProxy. Valid values:

      • storage: Data is stored on a disk or in other persistent storage media.

      • memory: Data is stored in memory.

    • compatible-mode (optional, STRING, default: mysql): The compatibility mode of the OceanBase database. Valid values: mysql and oracle.

    • jdbc.driver (optional, STRING, default: com.mysql.jdbc.Driver): The class name of the JDBC driver that is used to read the full data of the source table.

    • jdbc.properties.* (optional, STRING, no default value): Custom JDBC URL parameters. For example, if you set the jdbc.properties.useSSL parameter to false, SSL encryption is disabled.

    • obcdc.properties.* (optional, STRING, no default value): Custom parameters that are passed to libobcdc. Example: 'obcdc.properties.sort_trans_participants' = '1'. For more information about the parameters, see obcdc parameters.
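    The following sketch combines several of the source-only parameters above: exact matching with table-list, reading only change data from the latest offset, an explicit session time zone, and pass-through JDBC and libobcdc properties. All addresses, tenant, database, and table names are placeholders, and rootserver-list is used on the assumption that the cluster runs OceanBase Community Edition; for Enterprise Edition, config-url would be configured instead. The username key follows the source table sample later in this topic.

    ```sql
    -- Hypothetical CDC source table; every value in angle brackets is a placeholder.
    CREATE TEMPORARY TABLE oceanbase_orders_source (
      order_id INT,
      customer_name STRING,
      order_date TIMESTAMP(0),
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'oceanbase',
      'username' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tenant-name' = '<yourTenantName>',
      -- Read only change data, starting from the current offset.
      'scan.startup.mode' = 'latest-offset',
      -- Exact matching: full table names in the database.table format.
      'table-list' = 'test_db.orders',
      'hostname' = '<yourHostname>',
      'port' = '2881',
      -- Session time zone as an offset from UTC.
      'server-time-zone' = '+08:00',
      -- Assumed OceanBase Community Edition: ip:rpc_port:sql_port entries.
      'rootserver-list' = '<yourRootServerList>',
      'logproxy.host' = '<yourLogProxyHost>',
      'logproxy.port' = '2983',
      -- Passed through to the JDBC URL and to libobcdc, respectively.
      'jdbc.properties.useSSL' = 'false',
      'obcdc.properties.sort_trans_participants' = '1'
    );
    ```

    Because scan.startup.mode is latest-offset here, database-name and table-name are not needed; with initial, they would be required.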

  • Parameters only for dimension tables

    • url (required, STRING, no default value): The Java Database Connectivity (JDBC) URL or Config URL.

      • If the connector parameter is set to oceanbase, use the JDBC URL. If the connector parameter is set to oceanbase-ocj, use the Config URL.

      • The URL must contain the MySQL database name or Oracle service name.

    • cache (optional, STRING, default: ALL): The cache policy. Valid values:

      • ALL: All data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table into the cache, and all subsequent lookups in the dimension table are served from the cache. If a data record is not found in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire.

        We recommend that you set this parameter to ALL if the remote table is small and many keys from the source table have no match in the dimension table based on the ON clause.

      • LRU: Only part of the data in the dimension table is cached. For each data record read from the source table, the system first searches the cache. If the data is not found, the system queries the physical dimension table. If you set the cache parameter to LRU, you must configure the cacheSize parameter.

      • None: No data is cached.

      Important

      • If you set the cache parameter to ALL, monitor the memory usage of the operator to prevent out of memory (OOM) errors.

      • If you set the cache parameter to ALL, increase the memory of the join operator to twice the size of the remote table, because the system loads data from the dimension table asynchronously.

    • cacheSize (optional, INTEGER, default: 100000): The maximum number of data records that can be cached.

      • You must configure this parameter if the cache parameter is set to LRU.

      • You do not need to configure this parameter if the cache parameter is set to ALL.

    • cacheTTLMs (optional, LONG, default: Long.MAX_VALUE): The cache timeout period, in milliseconds. The meaning of this parameter depends on the value of the cache parameter:

      • None: You can leave cacheTTLMs empty because no data is cached.

      • LRU: cacheTTLMs specifies the cache timeout period. By default, cache entries do not expire.

      • ALL: cacheTTLMs specifies the interval at which the system reloads the cache. By default, the cache is not reloaded.

    • maxRetryTimeout (optional, DURATION, default: 60s): The maximum timeout period for a retry.
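    As a sketch of the LRU policy described above, the following dimension table caches at most 10,000 rows and lets cached entries expire after 10 minutes (600,000 ms). The table name and placeholders are hypothetical, and the values are illustrative rather than tuning recommendations.

    ```sql
    -- Hypothetical dimension table that uses an LRU cache.
    CREATE TEMPORARY TABLE oceanbase_dim_lru (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>',
      -- Cache only part of the table; cacheSize must be set with LRU.
      'cache' = 'LRU',
      'cacheSize' = '10000',
      -- Cached entries expire after 10 minutes (600,000 ms).
      'cacheTTLMs' = '600000',
      'maxRetryTimeout' = '60s'
    );
    ```

    If cache were set to ALL instead, the same cacheTTLMs value would be the interval at which the whole table is reloaded, and cacheSize could be omitted.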

  • Parameters only for result tables

    • compatibleMode (optional, STRING, default: mysql): The compatibility mode of the OceanBase database. Valid values: mysql and oracle.

      Note

      This parameter applies only when the connector parameter is set to oceanbase.

    • databaseName (required when the connector parameter is set to oceanbase-ocj, STRING, no default value): The name of the database. The value must be the same as the database name that is used in the Config URL.

    • passwordEncrypted (optional, BOOLEAN, default: false): Specifies whether the password is encrypted. This parameter applies only when the connector parameter is set to oceanbase-ocj.

    • slowQueryThresholdMs (optional, INTEGER, default: 60000): The threshold above which a query is considered slow. Unit: milliseconds.

      Note

      This parameter applies only when the connector parameter is set to oceanbase-ocj.

    • url (required, STRING, no default value): The JDBC URL or Config URL.

      • If the connector parameter is set to oceanbase, use the JDBC URL. If the connector parameter is set to oceanbase-ocj, use the Config URL.

      • The URL must contain the MySQL database name or Oracle service name.

    • tableName (required, STRING, no default value): The name of the table in the database.

    • maxRetryTimes (optional, INTEGER, default: 3): The maximum number of retries for writing data to the table.

    • poolInitialSize (optional, INTEGER, default: 1): The initial size of the database connection pool.

    • poolMaxActive (optional, INTEGER, default: 8): The maximum number of connections in the database connection pool.

    • poolMaxWait (optional, INTEGER, default: 2000): The maximum wait time for obtaining a connection from the database connection pool. Unit: milliseconds.

    • poolMinIdle (optional, INTEGER, default: 1): The minimum number of idle connections in the database connection pool.

    • connectionProperties (optional, STRING, no default value): The JDBC connection properties, in the k1=v1;k2=v2;k3=v3 format.

    • ignoreDelete (optional, BOOLEAN, default: false): Specifies whether to ignore delete operations.

    • excludeUpdateColumns (optional, STRING, no default value): The columns that are not updated when update operations are performed. Separate multiple column names with commas (,). Example: 'excludeUpdateColumns' = 'column1,column2'.

      Note

      Update operations do not modify the primary key columns or the columns that are specified by this parameter.

    • partitionKey (optional, STRING, no default value): The partition key column. If you specify a partition key, the connector groups data by the partition key and writes the data of each group to the database separately. Grouping by partitionKey is applied before grouping by modRule.

    • modRule (optional, STRING, no default value): The grouping rule, in the <column name> mod <number> format. The specified column must be of a numeric type. If you configure a grouping rule, data is grouped based on the calculated results, and the data of each group is written to the database separately. Grouping by modRule is applied after grouping by partitionKey.

    • bufferSize (optional, INTEGER, default: 1000): The buffer size.

    • flushIntervalMs (optional, LONG, default: 1000): The interval at which buffered data is flushed. Unit: milliseconds. If the number of buffered data records does not reach the value of the bufferSize parameter within this interval, all buffered data is written to the result table.

    • retryIntervalMs (optional, INTEGER, default: 5000): The interval between retries. Unit: milliseconds.
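    The following sketch shows how several result-table options might be combined: batching with bufferSize and flushIntervalMs, retries, excluding a column from updates, and grouping writes with partitionKey and modRule. The table and column names are hypothetical, the modRule value assumes the <column name> mod <number> format described above, and the connectionProperties entries are ordinary MySQL JDBC properties that are assumed to be passed through unchanged.

    ```sql
    -- Hypothetical result table; values in angle brackets are placeholders.
    CREATE TEMPORARY TABLE oceanbase_orders_sink (
      order_id INT,
      region STRING,
      customer_name STRING,
      created_at TIMESTAMP(3),
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>',
      'compatibleMode' = 'mysql',
      -- Assumed: bufferSize counts records; flush at least every 2 seconds.
      'bufferSize' = '500',
      'flushIntervalMs' = '2000',
      'maxRetryTimes' = '5',
      'retryIntervalMs' = '3000',
      -- Keep the original creation time when an existing row is updated.
      'excludeUpdateColumns' = 'created_at',
      -- Group by region first, then by order_id mod 4 (order_id is numeric).
      'partitionKey' = 'region',
      'modRule' = 'order_id mod 4',
      -- k1=v1;k2=v2 format.
      'connectionProperties' = 'useSSL=false;connectTimeout=10000'
    );
    ```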

Data type mappings

  • OceanBase database in MySQL-compatible mode

    OceanBase data type → Realtime Compute for Apache Flink data type

    • TINYINT → TINYINT

    • SMALLINT, TINYINT UNSIGNED → SMALLINT

    • INT, MEDIUMINT, SMALLINT UNSIGNED → INT

    • BIGINT, INT UNSIGNED → BIGINT

    • BIGINT UNSIGNED → DECIMAL(20, 0)

    • REAL, FLOAT → FLOAT

    • DOUBLE → DOUBLE

    • NUMERIC(p, s), DECIMAL(p, s) → DECIMAL(p, s)

      Note

      The value of p is less than or equal to 38.

    • BOOLEAN, TINYINT(1) → BOOLEAN

    • DATE → DATE

    • TIME [(p)] → TIME [(p)] [WITHOUT TIME ZONE]

    • DATETIME [(p)], TIMESTAMP [(p)] → TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    • CHAR(n) → CHAR(n)

    • VARCHAR(n) → VARCHAR(n)

    • BIT(n) → BINARY(⌈n/8⌉)

    • BINARY(n) → BINARY(n)

    • VARBINARY(N) → VARBINARY(N)

    • TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT → STRING

    • TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB → BYTES

      Important

      Flink supports only BLOB values whose size is less than or equal to 2,147,483,647 bytes (2^31 - 1).
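    As a worked example of the mappings above, the following hypothetical OceanBase table in MySQL-compatible mode is paired with a Flink table whose column types follow the list. Table and column names are illustrative, and only the column definitions are the point of the example; the connection options from the parameter lists would still need to be added.

    ```sql
    -- OceanBase (MySQL-compatible mode): hypothetical table.
    CREATE TABLE products (
      id          BIGINT UNSIGNED PRIMARY KEY,
      stock       INT UNSIGNED,
      on_sale     TINYINT(1),
      price       DECIMAL(10, 2),
      updated_at  DATETIME(3),
      description TEXT,
      thumbnail   BLOB
    );

    -- Flink SQL: each column is typed according to the mapping list.
    CREATE TEMPORARY TABLE products_source (
      id          DECIMAL(20, 0),  -- BIGINT UNSIGNED
      stock       BIGINT,          -- INT UNSIGNED
      on_sale     BOOLEAN,         -- TINYINT(1)
      price       DECIMAL(10, 2),  -- DECIMAL(p, s) with p <= 38
      updated_at  TIMESTAMP(3),    -- DATETIME(p)
      description STRING,          -- TEXT
      thumbnail   BYTES,           -- BLOB
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'oceanbase'
      -- Add the remaining connection options from the parameter lists.
    );
    ```

    Note that the unsigned types are widened to the next larger Flink type so that their full value range fits.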

  • OceanBase database in Oracle-compatible mode

    OceanBase data type → Realtime Compute for Apache Flink data type

    • NUMBER(p, s <= 0), p - s < 3 → TINYINT

    • NUMBER(p, s <= 0), 3 <= p - s < 5 → SMALLINT

    • NUMBER(p, s <= 0), 5 <= p - s < 10 → INT

    • NUMBER(p, s <= 0), 10 <= p - s < 19 → BIGINT

    • NUMBER(p, s <= 0), 19 <= p - s <= 38 → DECIMAL(p - s, 0)

    • NUMBER(p, s > 0) → DECIMAL(p, s)

    • NUMBER(p, s <= 0), p - s > 38 → STRING

    • FLOAT, BINARY_FLOAT → FLOAT

    • BINARY_DOUBLE → DOUBLE

    • NUMBER(1) → BOOLEAN

    • DATE, TIMESTAMP [(p)] → TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    • CHAR(n), NCHAR(n), NVARCHAR2(n), VARCHAR(n), VARCHAR2(n), CLOB → STRING

    • BLOB, ROWID → BYTES
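    The NUMBER rules are easiest to apply by computing p - s for each column. The following worked example uses a hypothetical OceanBase table in Oracle-compatible mode; the comments in the Flink table show which rule in the list selects each type. In Oracle syntax, NUMBER(p) is the same as NUMBER(p, 0). Only the column definitions are shown in full; the remaining connection options would still need to be added.

    ```sql
    -- OceanBase (Oracle-compatible mode): hypothetical table.
    CREATE TABLE accounts (
      flag         NUMBER(1),
      level_code   NUMBER(2, 0),
      branch_id    NUMBER(4),
      account_id   NUMBER(12, 0),
      big_id       NUMBER(25, 0),
      balance      NUMBER(12, 2),
      account_name VARCHAR2(64),
      opened_at    DATE
    );

    -- Flink SQL: types derived from the mapping list.
    CREATE TEMPORARY TABLE accounts_source (
      flag         BOOLEAN,         -- NUMBER(1) is listed separately as BOOLEAN
      level_code   TINYINT,         -- p - s = 2, which is < 3
      branch_id    SMALLINT,        -- NUMBER(4, 0): p - s = 4, in [3, 5)
      account_id   BIGINT,          -- p - s = 12, in [10, 19)
      big_id       DECIMAL(25, 0),  -- p - s = 25, in [19, 38]
      balance      DECIMAL(12, 2),  -- s = 2 > 0, so DECIMAL(p, s)
      account_name STRING,          -- VARCHAR2(n)
      opened_at    TIMESTAMP        -- DATE
    ) WITH (
      'connector' = 'oceanbase',
      'compatible-mode' = 'oracle'
      -- Add the remaining connection options from the parameter lists.
    );
    ```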

Sample code

  • Sample code for source and result tables

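    -- Create a source table that reads all existing data and then change data from the test_db.orders table.
    CREATE TEMPORARY TABLE oceanbase_source (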
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'scan.startup.mode' = 'initial',
      'username' = 'user',
      'password' = 'password',
      'tenant-name' = 'tenant',
      'database-name' = '^test_db$',
      'table-name' = '^orders$',
      'hostname' = '11.22.33.44',
      'port' = '2883',
      'config-url' = 'http://11.22.33.44:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
      'logproxy.host' = '11.22.33.44',
      'logproxy.port' = '2983',
      'working-mode' = 'memory'
    );
    
    -- Create a result table for which the connector parameter is set to oceanbase.
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    -- Create a result table for which the connector parameter is set to oceanbase-ocj.
    CREATE TEMPORARY TABLE oceanbase_ocj_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase-ocj',
      'url' = '<yourConfigUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'databaseName' = '<yourDatabaseName>',
      'tableName' = '<yourTableName>'
    );
    
    -- Write the source data to both result tables in a single deployment.
    BEGIN STATEMENT SET;
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    INSERT INTO oceanbase_ocj_sink
    SELECT * FROM oceanbase_source;
    END;

  • Sample code for a dimension table

    -- Generate test data. proctime is a processing-time attribute that is used for the lookup join.
    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    -- Create a dimension table that is backed by an OceanBase table.
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    -- Create a result table that discards all written data, which is useful for testing.
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    -- Look up each generated record in the dimension table by column a at processing time.
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

References

For more information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.