ApsaraDB for OceanBase: Synchronize data from OceanBase Database to a Kafka instance

Last Updated: Apr 03, 2024

This topic describes how to use the data transmission service to synchronize data from OceanBase Database to a Kafka instance.

Background

Kafka is a widely used high-performance distributed event streaming platform. The data transmission service supports real-time data synchronization between a self-managed Kafka instance and an Oracle or MySQL tenant of OceanBase Database, extending your message processing capabilities. Data synchronization to Kafka is therefore widely used in business scenarios such as real-time data warehouse building, data queries, and report distribution.

Prerequisites

  • The data transmission service has the privilege to access cloud resources. For more information, see Grant privileges to roles for data transmission.

  • You have created a dedicated database user for data synchronization in the source OceanBase database and granted corresponding privileges to the user. For more information, see Create a database user.
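
If you prefer to script the second prerequisite, the following is a minimal sketch that creates a dedicated user in a MySQL tenant by using Python with the pymysql client. The endpoint, account names, password, and privilege set are placeholders and assumptions; see Create a database user for the authoritative privilege requirements.

```python
import pymysql

# Hypothetical connection to the MySQL tenant of the source OceanBase
# database; replace the host, port, and credentials with your own values.
conn = pymysql.connect(
    host="obmysql.example.com",
    port=3306,
    user="admin@mysql_tenant",
    password="****",
)
try:
    with conn.cursor() as cur:
        # Create the dedicated synchronization user.
        cur.execute("CREATE USER 'dts_sync'@'%' IDENTIFIED BY 'Strong#Passw0rd'")
        # Grant read access to the databases to be synchronized. The exact
        # privileges required by the data transmission service are listed in
        # the linked topic; SELECT here is only an assumed minimum.
        cur.execute("GRANT SELECT ON target_db.* TO 'dts_sync'@'%'")
    conn.commit()
finally:
    conn.close()
```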

Limitations

  • Only physical tables can be synchronized.

  • The data transmission service supports Kafka V0.9, V1.0, and V2.x.

  • During data synchronization, if you rename a source table that is to be synchronized and the new name is beyond the synchronization scope, the data of that table is no longer synchronized to the destination Kafka instance.

  • The name of a table to be synchronized, as well as the names of columns in the table, must not contain Chinese characters.

  • The data transmission service supports the synchronization of only objects whose database names, table names, and column names are ASCII-encoded and contain no special characters. The special characters are line breaks, spaces, and the following characters: . | " ' ` ( ) = ; / & (a name-validation sketch follows this list).

  • The data transmission service does not support a standby OceanBase database as the source.
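
As a quick pre-flight check against the two naming limitations above, the following sketch validates identifiers before you add them to a project. The helper name is hypothetical, and treating all whitespace (not only spaces and line breaks) as disallowed is an assumption.

```python
_SPECIAL = set('.|"\'`()=;/&')

def is_synchronizable_name(name: str) -> bool:
    """Return True if an identifier is ASCII-encoded and contains no
    whitespace or special characters from the list above."""
    if not name.isascii():
        return False
    return not any(ch.isspace() or ch in _SPECIAL for ch in name)

assert is_synchronizable_name("order_items")
assert not is_synchronizable_name("order items")  # contains a space
assert not is_synchronizable_name("订单")          # not ASCII
```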

Considerations

  • In a data synchronization project where the source is an OceanBase database and DDL synchronization is enabled, if a RENAME operation is performed on a table in the source, we recommend that you restart the project to avoid data loss during incremental synchronization.

  • Take note of the following items when an updated row contains a LOB column:

    • If the LOB column is updated, do not use the value stored in the LOB column before the UPDATE or DELETE operation.

      The following data types are stored in LOB columns: JSON, GIS, XML, user-defined type (UDT), and TEXT such as LONGTEXT and MEDIUMTEXT.

    • If the LOB column is not updated, the value stored in the LOB column before and after the UPDATE or DELETE operation is NULL.

  • If the clocks between nodes or between the client and the server are out of synchronization, the latency may be inaccurate during incremental synchronization.

    For example, if the clock is earlier than the standard time, the latency can be negative. If the clock is later than the standard time, the latency can be positive.

  • When data transfer is resumed for a project, some data within the last minute may be duplicated in the Kafka instance, so deduplication is required in downstream systems (a consumer sketch follows this list).

  • When you synchronize data from an OceanBase database to a Kafka instance, if a statement that creates a unique index fails to execute at the source, the Kafka instance still consumes the DDL statements that create and drop the unique index. If the downstream DDL statement for unique index creation fails to execute, ignore the exception.

    Important

    Liboblog V2.2.x does not guarantee the order of DDL or DML statements and may cause data quality issues.
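
Because resumed transfers can re-deliver up to about a minute of data, downstream consumers should deduplicate. The following is a minimal sketch using the kafka-python client; the topic name, broker address, group ID, and the content-fingerprint strategy are assumptions, and a production system would use a bounded or persistent store instead of an in-memory set.

```python
import hashlib

from kafka import KafkaConsumer  # kafka-python client

consumer = KafkaConsumer(
    "ob_sync_topic",                            # hypothetical topic
    bootstrap_servers="kafka.example.com:9092",
    group_id="ob-sync-consumers",
    enable_auto_commit=False,
)

seen = set()  # replace with a bounded or persistent store in production

for msg in consumer:
    # Duplicates re-sent after a resume carry identical key and payload
    # at different offsets, so fingerprint the content, not the offset.
    fp = hashlib.sha1((msg.key or b"") + b"\x00" + (msg.value or b"")).hexdigest()
    if fp in seen:
        continue  # skip the duplicate
    seen.add(fp)
    print(msg.topic, msg.partition, msg.offset)  # replace with business logic
    consumer.commit()
```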

Supported source and destination instance types

The following table lists the supported instance types for the source and destination. OceanBase Database has two types of tenants: MySQL and Oracle. The source can be an OceanBase cluster instance or a self-managed database in a virtual private cloud (VPC).

OceanBase Database is the source in all cases. The destination can be one of the following Kafka instance types:

  • Kafka instance on Alibaba Cloud

  • Self-managed Kafka instance in a VPC

  • Kafka instance in the public network

Supported DDL operations

  • CREATE TABLE

    Important

    The created table must be a synchronization object. To execute the CREATE TABLE statement again on a table that has already been synchronized, execute the DROP TABLE statement on that table first (a sketch follows this list).

  • ALTER TABLE

  • DROP TABLE

  • TRUNCATE TABLE

    Note

    In the case of delayed deletion, the same transaction contains two identical TRUNCATE TABLE DDL statements. In this case, ensure idempotence in downstream consumption.

  • ALTER TABLE…TRUNCATE PARTITION

  • CREATE INDEX

  • DROP INDEX

  • COMMENT ON TABLE

  • RENAME TABLE

    Important

    The renamed table must be a synchronization object.
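
For example, the note on CREATE TABLE above implies the following drop-then-create sequence when you re-create a table that is already a synchronization object. This is a hedged sketch: the connection values and table definition are placeholders, executed here with the pymysql client.

```python
import pymysql

# Placeholder connection to the source OceanBase MySQL tenant.
conn = pymysql.connect(host="obmysql.example.com", port=3306,
                       user="admin@mysql_tenant", password="****",
                       database="target_db")
try:
    with conn.cursor() as cur:
        # Drop the already-synchronized table first, then re-create it,
        # so that the CREATE TABLE DDL can be synchronized.
        cur.execute("DROP TABLE IF EXISTS order_items")
        cur.execute(
            "CREATE TABLE order_items ("
            " id BIGINT PRIMARY KEY,"
            " sku VARCHAR(64),"
            " qty INT"
            ")"
        )
    conn.commit()
finally:
    conn.close()
```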

Procedure

  1. Log on to the ApsaraDB for OceanBase console and purchase a data synchronization project.

    For more information, see Purchase a data synchronization project.

  2. Choose Data Transmission > Data Synchronization. On the page that appears, click Configure for the data synchronization project.


    If you want to reference the configurations of an existing project, click Reference Configuration. For more information, see Reference and clear data synchronization project configurations.

  3. On the Select Source and Destination page, configure the parameters.


    Parameter descriptions:

    • Project Name: We recommend that you set it to a combination of digits and letters. It must not contain any spaces and cannot exceed 64 characters in length.

    • Label: Click the field and select a target tag from the drop-down list. You can also click Manage Tags to create, modify, and delete tags. For more information, see Manage data synchronization projects by using tags.

    • Source: If you have created an OceanBase data source, select it from the drop-down list. Otherwise, click New Data Source in the drop-down list to create one in the dialog box on the right side. For more information about the parameters, see Create an OceanBase data source.

      Important: The source must not be an OceanBase Database tenant instance.

    • Destination: If you have created a Kafka data source, select it from the drop-down list. Otherwise, click New Data Source in the drop-down list to create one in the dialog box on the right side. For more information, see Create a Kafka data source.

  4. Click Next. On the Select Synchronization Type page, specify the synchronization type for the current data synchronization project.


    Valid values: Full Synchronization and Incremental Synchronization. Options for Incremental Synchronization are DML Synchronization and DDL Synchronization. Options for DML Synchronization are Insert, Delete, and Update.

  5. Click Next. On the Select Synchronization Objects page, select the objects to be synchronized in the current data synchronization project.

    When you synchronize data from an OceanBase database to a Kafka instance, you can synchronize data from multiple tables to multiple topics.

    1. In the left-side pane, select the objects to be synchronized.

    2. Click >.

    3. In the Map the Object to the Topic dialog box, select the target topic from the Existing Topics drop-down list.


    4. Click OK.

      The data transmission service allows you to import objects by using text. It also allows you to change the topics of the objects, set row filters, and remove a single object or all objects. Objects in the destination database are listed in the structure of Topic > Database > Table.


      Operation descriptions:

      • Import Objects

        1. In the list on the right, click Import Objects in the upper-right corner.

        2. In the dialog box that appears, click OK.

          Important: This operation overwrites previous selections. Proceed with caution.

        3. In the Import Synchronization Objects dialog box, import the objects to be synchronized. You can import CSV files to set row filter conditions, filter columns, and sharding columns. For more information, see Download and import the settings of synchronization objects.

        4. Click Validate.

        5. After the validation is passed, click OK.

      • Change Topic

        The data transmission service allows you to change topics for objects in the destination database. For more information, see Change topics.

      • Settings

        You can use a WHERE clause to filter data by row, select sharding columns, and select the columns to be synchronized. In the Settings dialog box, you can perform the following operations:

        • In the Row Filters section, specify a standard SQL WHERE clause to filter data by row. For more information, see Use SQL conditions to filter data.

        • Select the sharding columns that you want to use from the Sharding Columns drop-down list. You can select multiple fields as sharding columns. This parameter is optional.

          Unless otherwise specified, select the primary keys as sharding columns. If the primary keys are not load-balanced, select load-balanced fields with unique identifiers as sharding columns to avoid potential performance issues. Sharding columns serve the following purposes:

          • Load balancing: If the destination table supports concurrent writes, the threads used for sending messages are assigned based on the sharding columns.

          • Orderliness: If the values of the sharding columns are the same, the data transmission service ensures that the corresponding messages are received in order. That is, DML statements on rows with the same sharding-column values are consumed in the order in which they were executed.

        • In the Select Columns section, select the columns to be synchronized. For more information, see Column filtering.

      • Remove/Remove All

        The data transmission service allows you to remove a single synchronization object or all synchronization objects that were added to the right-side list during data mapping.

        • To remove a single synchronization object: in the list on the right, move the pointer over the object that you want to remove, and click Remove.

        • To remove all synchronization objects: in the list on the right, click Remove All in the upper-right corner. In the dialog box that appears, click OK.

  6. Click Next. On the Synchronization Options page, configure the parameters.


    Parameter descriptions:

    • Incremental Synchronization Start Timestamp

      • If you have selected Full Synchronization as the synchronization type, this parameter defaults to the start time of incremental synchronization and cannot be modified.

      • If you have not selected Full Synchronization as the synchronization type, set this parameter to a certain point in time. The default value is the current system time. For more information, see Set an incremental synchronization timestamp.

    • Serialization Method

      The message format for synchronizing data to the destination Kafka instance. Valid values: Default, Canal, DataWorks (version 2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro. For more information, see Data formats.

      Important

      • Only MySQL tenants of OceanBase Database support Debezium, DebeziumFlatten, DebeziumSmt, and Avro.

      • If the message format is set to DataWorks, the DDL operations COMMENT ON TABLE and ALTER TABLE…TRUNCATE PARTITION cannot be synchronized.

    • Partitioning Rules

      The rule for delivering data from an OceanBase database to a Kafka topic. The data transmission service supports Hash, Table, and One (a sketch of the Hash semantics follows this parameter list).

      • Hash: The data transmission service uses a hash algorithm to select the partition of a Kafka topic based on the value of the primary key or sharding column.

      • Table: The data transmission service delivers all data in a table to the same partition, using the table name as the hash key.

      • One: JSON messages are delivered to a single partition of the topic to ensure ordering.

    • Business System Identification (Optional)

      Identifies the source business system of the data. The business system identifier consists of 1 to 20 characters.
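
    The following minimal Python sketch illustrates the Hash rule's semantics: rows with identical sharding-column values always map to the same partition, which is what preserves their relative order for consumers. The hash function, key encoding, and function name are illustrative assumptions, not the service's actual implementation.

```python
import hashlib

def choose_partition(sharding_values, num_partitions):
    """Map sharding-column values to a Kafka partition, Hash-rule style.

    Illustrative only: the data transmission service's internal hash
    function and key encoding may differ.
    """
    key = "|".join(str(v) for v in sharding_values).encode("utf-8")
    digest = int.from_bytes(hashlib.md5(key).digest()[:8], "big")
    return digest % num_partitions

# Identical sharding-column values always select the same partition.
assert choose_partition(["user_42"], 16) == choose_partition(["user_42"], 16)
```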

  7. Click Precheck.

    During the precheck, the data transmission service checks its connectivity to the destination Kafka instance. If an error is returned during the precheck, you can perform the following operations:

    • Identify and troubleshoot the problem and then perform the precheck again.

    • Click Skip in the Actions column of the failed precheck item. A dialog box appears, prompting you about the impact. If you want to skip this operation, click OK.

  8. After the precheck is passed, click Start Project.

    If you do not need to start the project now, click Save. You can manually start the project on the Synchronization Projects page or by performing batch operations later. For more information about the batch operations, see Perform batch operations on data synchronization projects.

    The data transmission service allows you to modify the synchronization objects when a synchronization project is running. For more information, see View and modify synchronization objects. After the data synchronization project is started, it will be executed based on the selected synchronization types. For more information, see View synchronization details.

If the data synchronization project encounters an exception while running due to a network failure or slow process startup, you can click Recover on the Synchronization Projects page or on the Details page of the synchronization project.
