全部产品
Search
文档中心

实时计算Flink版:OceanBase(公测中)

更新时间:Apr 09, 2024

本文为您介绍如何使用OceanBase连接器。

背景信息

OceanBase数据库是一款原生分布式的HTAP数据库管理系统,详情请参见OceanBase官网。为了降低您从MySQL数据库或Oracle数据库迁移到OceanBase数据库时引发的业务系统改造成本,OceanBase数据库支持Oracle和MySQL两种兼容模式,两种模式下的数据类型、SQL功能、内部视图等与MySQL数据库或Oracle数据库保持一致。两种模式下建议使用的连接器如下:

  • Oracle模式:只能使用OceanBase连接器。

  • MySQL模式:与原生MySQL保持高度兼容,支持使用OceanBase和MySQL两种连接器,MySQL连接器详情请参见MySQL

    • 在无需高级特性的情况下,维表和结果表建议优先考虑MySQL连接器,配置更简单。

    • 使用OceanBase 3.2.4.4及以上版本时,源表建议优先使用MySQL连接器(因为OceanBase 3.2.4.4及以上版本MySQL模式开始支持开启Binlog服务,输出格式与原生MySQL Binlog一致),业务架构改造成本低。

OceanBase连接器支持的信息如下。

类别

详情

支持类型

源表、维表和结果表

运行模式

流模式和批模式

数据格式

暂不适用

特有监控指标

暂无

API种类

SQL

是否支持更新或删除结果表数据

前提条件

连接的数据库和表都已被创建。具体操作可参考以下文档:

使用限制

  • 维表和结果表

    • Flink计算引擎VVR 8.0.1及以上版本支持OceanBase连接器。

    • 语义上可以保证At-Least-Once,在结果表有主键的情况下,幂等可以保证数据的正确性。

  • 结果表:OceanBase数据库没有部署OceanBase数据库代理(OBProxy)时,连接器会使用OCJ(OceanBase Connector Java)连接OceanBase数据库,该模式需要用到config url,要求OceanBase数据库已部署OceanBase云平台。该工作方式只能用于OceanBase数据库的MySQL兼容模式,不支持Oracle兼容模式。

    说明

    OBProxy与OCJ实现了相同的路由功能,区别在于OCJ驱动集成于Java应用程序,而OBProxy是一个独立的代理服务。目前,OceanBase团队推荐通过OBProxy来连接OceanBase集群,OCJ驱动主要用于兼容一些历史集群和应用程序。

语法结构

CREATE TABLE oceanabse_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);

说明

连接器写入结果表原理:写入结果表时,会将接收到的每条数据拼接成一条SQL去执行。具体执行的SQL情况如下:

  • 对于没有主键的结果表,会拼接成INSERT INTO语句。

  • 对于包含主键的结果表,会根据数据库的兼容模式拼接成UPSERT语句。

WITH参数

  • 通用

    参数

    说明

    是否必填

    数据类型

    默认值

    备注

    connector

    表类型。

    STRING

    • 作为CDC源表或维表时,固定值为oceanbase

    • 作为结果表时,参数取值如下:

      • 如果使用了OceanBase数据库代理OBProxy,则表类型取值为oceanbase

      • 如果直连OceanBase集群,则表类型取值为oceanbase-ocj

    userName

    用户名。

    STRING

    无。

    password

    密码。

    STRING

    无。

  • 源表独有

    说明

    连接器支持通过数据库名称(database-name)和表名(table-name)的正则匹配和表列表(table-list)的精确匹配两种模式来指定需要监听的表。当同时使用两种方式时,将会监听两种方式匹配的所有表。

    参数

    说明

    是否必填

    数据类型

    默认值

    备注

    logproxy.host

    OceanBase日志代理服务器的IP地址或主机名。

    String

    无。

    logproxy.port

    OceanBase日志代理服务器的端口号。

    Integer

    无。

    scan.startup.mode

    OceanBase CDC的启动模式。

    String

    参数取值如下:

    • initial:从初始位点开始拉取全部数据。

    • latest-offset:从当前位点开始拉取变更数据。

    • timestamp:从指定的时间戳开始拉取变更数据。

    tenant-name

    OceanBase数据库的租户名。

    String

    无。

    database-name

    OceanBase数据库名称。

    String

    支持使用正则表达式指定数据库名称。

    说明

    仅支持在启动模式为initial时,使用该参数。

    table-name

    OceanBase数据库的表名称。

    String

    支持使用正则表达式指定表名称。

    说明

    仅支持在启动模式为initial时,使用该参数。

    table-list

    OceanBase数据库的全路径的表名列表。

    String

    可以使用英文逗号(,)分隔,例如db1.table1, db2.table2

    hostname

    OceanBase数据库或 OceanBase 代理OBProxy的IP地址或主机名。

    String

    无。

    port

    OceanBase数据库服务器的端口号。

    Integer

    可以是OceanBase服务器的SQL端口号(默认值为2881)

    或OceanBase代理服务器的端口号(默认值为2883)。

    connect.timeout

    连接到OceanBase数据库服务器之前的最长超时时间。

    Duration

    30s

    无。

    server-time-zone

    数据库服务器中的会话时区。

    String

    +00:00

    会话时区值的合法格式为±hh:mm,表示与协调世界时(UTC)的时区偏移量。

    说明
    • 会话时区的设置会影响到时间类型的显示和存储方式。因此,如果您需要控制OceanBase的时间类型如何转换为字符串,则需要设置合理的会话时区信息,以确保显示正确的本地时间。

    • 如果您在MySQL数据库中已存在一个用于存储时区信息的表,则在设置时区时,可以使用这个表中已经创建的时区作为合法的值。

    logproxy.client.id

    OceanBase日志代理服务器的客户端连接ID。

    String

    规则生成

    如果您没有指定,则Flink会默认按照{flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}规则生成。

    rootserver-list

    OceanBase根服务器列表。

    String

    服务器列表格式为ip:rpc_port:sql_port。您可以执行SHOW PARAMETERS LIKE 'rootservice_list';SQL语句获取服务器列表信息。

    说明
    • OceanBase社区版本必填。

    • 多个服务器地址使用英文分号(;)分隔。

    config-url

    从配置服务器获取服务器信息的url。

    String

    OceanBase企业版本必填。

    working-mode

    日志代理中libobcdc的工作模式。

    String

    storage

    参数取值如下:

    • storage:表示数据将被存储在磁盘或其他持久性存储介质中。

    • memory:表示数据将被存储在内存中。

    compatible-mode

    OceanBase的兼容模式。

    String

    mysql

    参数取值如下:

    • mysql

    • oracle

    jdbc.driver

    全量读取源表数据时使用的jdbc驱动类名。

    String

    com.mysql.jdbc.Driver

    无。

    jdbc.properties.*

    传递自定义的JDBC URL属性。

    String

    例如 'jdbc.properties.useSSL' = 'false'表示不使用SSL加密。

    obcdc.properties.*

    将自定义的 OBCDC参数传递给libobcdc。

    String

    例如'obcdc.properties.sort_trans_participants' = '1'

    更多参数信息见obcdc parameters

  • 维表独有

    参数

    说明

    是否必填

    数据类型

    默认值

    备注

    url

    JDBC url或config url。

    STRING

    • 当连接器类型为oceanbase时使用JDBC url,连接器类型为oceanbase-ocj时,使用config url。

    • url中需要包含MySQL database名或Oracle service名。

    cache

    缓存策略。

    STRING

    ALL

    支持以下三种缓存策略:

    • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

      适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。使用该缓存策略时,必须配置cacheSize参数。

    • None:无缓存。

    重要
    • 使用ALL缓存策略时,请注意节点内存大小,防止出现OOM。

    • 因为系统会异步加载维表数据,所以在使用ALL缓存策略时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

    cacheSize

    最大缓存条数。

    INTEGER

    100000

    • 当选择LRU缓存策略后,必须设置缓存大小。

    • 当选择ALL缓存策略后,可以不设置缓存大小。

    cacheTTLMs

    缓存超时时间。

    LONG

    Long.MAX_VALUE

    cacheTTLMs的配置和cache有关,详情如下:

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。

    maxRetryTimeout

    最大重试时间。

    DURATION

    60s

    无。

  • 结果表独有

    参数

    说明

    是否必填

    数据类型

    默认值

    备注

    compatibleMode

    OceanBase的兼容模式。

    STRING

    mysql

    参数取值如下:

    • mysql

    • oracle

    说明

    oceanabse独有参数。

    databaseName

    数据库名。

    STRING

    应当与config url中保持一致。

    说明

    oceanbase-ocj独有参数。

    passwordEncrypted

    是否使用加密过的密码。

    Boolean

    false

    oceanbase-ocj独有参数。

    slowQueryThresholdMs

    慢查询等待阈值。

    INTEGER

    60000

    单位毫秒。

    说明

    oceanbase-ocj独有参数。

    url

    JDBC url或config url。

    STRING

    • 当连接器类型为oceanbase时使用JDBC url,连接器类型为oceanbase-ocj时,使用config url。

    • url中需要包含MySQL database名或Oracle service名。

    tableName

    表名。

    STRING

    无。

    maxRetryTimes

    最大重试次数。

    INTEGER

    3

    无。

    poolInitialSize

    数据库连接池初始大小。

    INTEGER

    1

    无。

    poolMaxActive

    数据库连接池最大连接数。

    INTEGER

    8

    无。

    poolMaxWait

    从数据库连接池中获取连接的最大等待时间。

    INTEGER

    2000

    单位毫秒。

    poolMinIdle

    数据库连接池中最小空闲连接数。

    INTEGER

    1

    无。

    connectionProperties

    jdbc的连接属性。

    STRING

    格式为"k1=v1;k2=v2;k3=v3"。

    ignoreDelete

    是否忽略数据Delete操作。

    Boolean

    false

    无。

    excludeUpdateColumns

    指定要排除的列名。在执行更新操作时,这些列将不会被更新。

    STRING

    如果忽略指定的字段为多个时,则需要使用英文逗号(,)分隔。例如excludeUpdateColumns=column1,column2

    说明

    该值始终会包含主键列,也就是实际生效的列名为您指定的列名和主键列。

    partitionKey

    分区键。

    STRING

    当设置分区键时,连接器会先将数据按照分区键进行分组,各个分组将分别写入数据库。这里的分组处理早于modRule的处理。

    modRule

    分组规则。

    STRING

    分组规则格式需要为"列名mod数字",列类型必须为数字类型。当设置分组规则时,数据会根据计算所得结果进行分组,各个分组将分别写入数据库。这里的分组处理晚于partitionKey的处理。

    bufferSize

    数据缓冲区大小。

    INTEGER

    1000

    无。

    flushIntervalMs

    清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

    LONG

    1000

    无。

    retryIntervalMs

    最大重试时间。

    INTEGER

    5000

    单位毫秒。

类型映射

  • MySQL兼容模式

    OceanBase字段类型

    Flink字段类型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    说明

    其中p <= 38。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink仅支持小于等于2,147,483,647(2^31 - 1)的BLOB类型的记录。

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Oracle兼容模式

    OceanBase字段类型

    Flink字段类型

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIMEZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

使用示例

  • 源表&结果表

    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'scan.startup.mode' = 'initial',
      'username' = 'user',
      'password' = 'password',
      'tenant-name' = 'tenant',
      'database-name' = '^test_db$',
      'table-name' = '^orders$',
      'hostname' = '11.22.33.44',
      'port' = '2883',
      'config-url' = 'http://11.22.33.44:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
      'logproxy.host' = '11.22.33.44',
      'logproxy.port' = '2983',
      'working-mode' = 'memory'
    );
    
    -- oceanbase结果表
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    --oceanbase-ocj结果表
    CREATE TEMPORARY TABLE oceanbase_ocj_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase-ocj',
      'url' = '<yourConfigUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'databaseName' = '<yourDatabaseName>',
      'tableName' = '<yourTableName>'
    );
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    INSERT INTO oceanbase_ocj_sink
    SELECT * FROM oceanbase_source;
    END; 

  • 维表

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

相关文档

Flink支持的连接器,请参见支持的连接器