Kafka is a distributed message queue service that features high throughput and high scalability. It is widely used in big data scenarios such as log collection, monitoring data aggregation, stream processing, and online and offline analysis, and is an important part of the big data ecosystem. This topic describes how to synchronize data from a PolarDB for Oracle cluster to a self-managed Kafka cluster by using Data Transmission Service (DTS). Synchronizing the data to Kafka allows you to extend your message processing capabilities.

Prerequisites

  • The source PolarDB for Oracle cluster uses the latest version. For more information, see Version Management.
  • The tables to be synchronized contain primary keys or UNIQUE NOT NULL indexes.
  • The value of the wal_level parameter is set to logical for the source PolarDB for Oracle cluster. This setting ensures that logical decoding is supported in write-ahead logging (WAL). For more information, see Configure cluster parameters.
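
The prerequisites above can be checked mechanically before you purchase a DTS instance. The following Python code is a minimal sketch, not part of DTS. It assumes that you have already read the wal_level value and the key metadata of each table from the source cluster, for example with a PostgreSQL-style SHOW wal_level statement, which is itself an assumption about your cluster. The names TableInfo, has_usable_key, and check_prerequisites are hypothetical, and "UNIQUE NOT NULL index" is interpreted here as a unique index whose columns are all declared NOT NULL.

```python
from dataclasses import dataclass, field


@dataclass
class TableInfo:
    """Hypothetical container for key metadata read from the source cluster."""
    name: str
    has_primary_key: bool = False
    # One entry per UNIQUE index; each entry is a list of
    # (column_name, is_not_null) pairs for the indexed columns.
    unique_indexes: list = field(default_factory=list)


def has_usable_key(table):
    """Return True if the table has a primary key or a UNIQUE index
    whose columns are all NOT NULL (assumed interpretation)."""
    if table.has_primary_key:
        return True
    return any(
        index and all(not_null for _, not_null in index)
        for index in table.unique_indexes
    )


def check_prerequisites(wal_level, tables):
    """Return a list of human-readable problems; an empty list means
    the checked prerequisites are met."""
    problems = []
    if wal_level.strip().lower() != "logical":
        problems.append(f"wal_level is '{wal_level}', expected 'logical'")
    for table in tables:
        if not has_usable_key(table):
            problems.append(
                f"table {table.name} has no primary key or UNIQUE NOT NULL index"
            )
    return problems
```

An empty result means that the two checked prerequisites are met. The cluster version still needs to be verified separately.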

Precautions

  • In this scenario, DTS supports only incremental data synchronization. Schema synchronization and full data synchronization are not supported.
  • A data synchronization task can synchronize data from only a single database. To synchronize data from multiple databases, you must create a data synchronization task for each database.
  • To ensure that the latency of data synchronization is accurate, DTS adds a heartbeat table named dts_postgres_heartbeat to the source database. The following figure shows the schema of the heartbeat table.
    [Figure: Schema of the heartbeat table]
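
Because each task can read from only one database, it can help to group the tables that you want to synchronize by database before you purchase instances. The following Python code is a minimal planning sketch. The function name plan_tasks and the sample database and table names are hypothetical, and the code does not call any DTS API.

```python
from collections import defaultdict


def plan_tasks(tables):
    """Group (database, table) pairs so that each database maps to the
    tables that one data synchronization task would cover."""
    plan = defaultdict(list)
    for database, table in tables:
        plan[database].append(table)
    # Sort for a stable, readable plan.
    return {db: sorted(names) for db, names in sorted(plan.items())}


if __name__ == "__main__":
    wanted = [("sales", "orders"), ("hr", "staff"), ("sales", "items")]
    for database, names in plan_tasks(wanted).items():
        print(f"task for database {database}: {', '.join(names)}")
```

The number of keys in the returned dictionary is the number of data synchronization tasks that you need to create.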

Procedure

  1. Purchase a data synchronization instance. For more information, see Purchase a DTS instance.
    Note: On the buy page, set Source Instance to PolarDB, Destination Instance to Kafka, and Synchronization Topology to One-way Synchronization.
  2. Log on to the DTS console.
  3. In the left-side navigation pane, click Data Synchronization.
  4. At the top of the Synchronization Tasks page, select the region where the destination instance resides.
  5. Find the data synchronization instance and click Configure Synchronization Channel in the Actions column.
  6. Configure the source and destination instances.
    [Figure: Configure the source and destination instances]
    Section | Parameter | Description
    N/A | Synchronization Task Name | The task name that DTS automatically generates. We recommend that you specify a descriptive name that makes it easy to identify the task. You do not need to use a unique task name.
    Source Instance Details | Instance Type | The value of this parameter is set to PolarDB Instance and cannot be changed.
    Source Instance Details | Instance Region | The source region that you selected on the buy page. The value of this parameter cannot be changed.
    Source Instance Details | PolarDB Instance ID | The ID of the source PolarDB for Oracle cluster.
    Source Instance Details | Database Name | The name of the source database.
    Source Instance Details | Database Account | A privileged account of the source PolarDB for Oracle cluster. For more information about how to create a privileged database account, see Create database accounts.
    Source Instance Details | Database Password | The password of the database account.
    Destination Instance Details | Instance Type | The access method of the self-managed Kafka cluster. In this example, User-Created Database with Public IP Address is selected. Note: If the self-managed Kafka cluster is connected over other methods, you must deploy the network environment for the Kafka cluster. For more information, see Preparation overview.
    Destination Instance Details | Instance Region | The destination region that you selected on the buy page. The value of this parameter cannot be changed.
    Destination Instance Details | ECS Instance ID | The ID of the Elastic Compute Service (ECS) instance on which the Kafka cluster is deployed.
    Destination Instance Details | Database Type | Select Kafka.
    Destination Instance Details | Port Number | The service port number of the Kafka cluster. Default value: 9092.
    Destination Instance Details | Database Account | The username that is used to log on to the Kafka cluster. If no authentication is enabled for the Kafka cluster, you do not need to enter the username.
    Destination Instance Details | Database Password | The password that corresponds to the username. If no authentication is enabled for the Kafka cluster, you do not need to enter the password.
    Destination Instance Details | Topic | Click Get Topic List and select a topic name from the drop-down list.
    Destination Instance Details | Kafka version | The version of the self-managed Kafka cluster.
    Destination Instance Details | Encryption | Select Non-encrypted or SCRAM-SHA-256 based on your business and security requirements.
  7. In the lower-right corner of the page, click Set Whitelist and Next.
    Note
    • You do not need to modify the security settings for ApsaraDB instances (such as ApsaraDB RDS for MySQL and ApsaraDB for MongoDB) and ECS-hosted databases. DTS automatically adds the CIDR blocks of DTS servers to the whitelists of ApsaraDB instances or the security group rules of Elastic Compute Service (ECS) instances. For more information, see Add the CIDR blocks of DTS servers to the security settings of on-premises databases.
    • After data synchronization is complete, we recommend that you remove the CIDR blocks of DTS servers from the whitelists or security groups.
  8. Select the objects to be synchronized.
    [Figure: Select the objects to be synchronized]
    Setting | Description
    Select the objects to be synchronized | Select one or more tables from the Available section and click the Rightwards arrow icon to add the tables to the Selected section. You can select only tables as the objects to synchronize. Note: DTS maps the table names to the topic name that you select in Step 6. If you want to change the topic name, you can move the pointer over the table and click Edit. You must specify a topic that exists in the Kafka cluster. For more information, see Rename an object to be synchronized.
    Data format delivered to Kafka | The data that is synchronized to the Kafka cluster is stored in the Avro or SharePlex JSON format. For more information, see Data formats of a Kafka cluster.
    Policy for Shipping Data to Kafka Partitions | Select a synchronization policy based on your business requirements. For more information, see Specify the policy for synchronizing data to Kafka partitions.
    Rename Databases and Tables | You can use the object name mapping feature to rename the objects that are synchronized to the destination instance. For more information, see Object name mapping.
    Retry Time for Failed Connections | By default, if DTS fails to connect to the source or destination database, DTS retries within the next 720 minutes (12 hours). You can specify the retry time based on your needs. If DTS reconnects to the source and destination databases within the specified time, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails. Note: When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time based on your business needs. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.
  9. In the lower-right corner of the page, click Next.
  10. Select the initial synchronization type and filter options.
    [Figure: Advanced settings]
    Parameter | Description
    Initialize synchronization | Initial Incremental Data Synchronization is selected by default. DTS synchronizes incremental data that is generated in the source database to the destination database.
    Filter options | Ignore DDL in incremental synchronization phase is selected by default. DTS does not synchronize DDL operations that are performed on the source database during incremental data synchronization. Note: The setting of this parameter does not take effect. DTS does not synchronize DDL operations that are performed on the source database regardless of whether you select this option.
  11. In the lower-right corner of the page, click Precheck.
    Note
    • Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.
    • If the task fails to pass the precheck, you can click the Info icon next to each failed item to view details.
      • After you troubleshoot the issues based on the causes, you can run a precheck again.
      • If you do not need to troubleshoot the issues, you can ignore failed items and run a precheck again.
  12. Close the Precheck dialog box after the following message is displayed: The precheck is passed. Then, the data synchronization task starts.
  13. Wait until initial synchronization is completed and the data synchronization task enters the Synchronizing state.
    You can view the state of the data synchronization task on the Synchronization Tasks page.
    [Figure: View the status of a data synchronization task]
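
The destination settings in Step 6 and the retry window in Step 8 can be sanity-checked before you enter them in the console. The following Python code is a minimal sketch under stated assumptions. It does not call DTS or Kafka, and the names KafkaDestination, validate_destination, and task_resumes are hypothetical. It also assumes that SCRAM-SHA-256 always requires a username and a password, and that a reconnection exactly at the end of the retry window still counts as within the specified time. Only the default port 9092, the two encryption options, and the default retry time of 720 minutes come from this topic.

```python
from dataclasses import dataclass
from typing import Optional

# Values taken from the parameter descriptions in this topic.
ALLOWED_ENCRYPTION = {"Non-encrypted", "SCRAM-SHA-256"}
DEFAULT_KAFKA_PORT = 9092
DEFAULT_RETRY_MINUTES = 720  # 12 hours


@dataclass
class KafkaDestination:
    """Hypothetical record of the destination settings from Step 6."""
    topic: str
    kafka_version: str
    port: int = DEFAULT_KAFKA_PORT
    username: Optional[str] = None
    password: Optional[str] = None
    encryption: str = "Non-encrypted"


def validate_destination(dest):
    """Return a list of problems found in the destination settings."""
    errors = []
    if not dest.topic:
        errors.append("a topic that exists in the Kafka cluster must be selected")
    if not 1 <= dest.port <= 65535:
        errors.append(f"port {dest.port} is out of range")
    if dest.encryption not in ALLOWED_ENCRYPTION:
        errors.append(f"unsupported encryption option: {dest.encryption}")
    if bool(dest.username) != bool(dest.password):
        errors.append("username and password must be provided together")
    elif dest.encryption == "SCRAM-SHA-256" and not dest.username:
        # Assumption: SCRAM is a credential-based mechanism.
        errors.append("SCRAM-SHA-256 needs a username and password")
    return errors


def task_resumes(reconnect_after_minutes, retry_minutes=DEFAULT_RETRY_MINUTES):
    """Return True if a reconnection after the given outage falls within
    the retry window (boundary treated as inclusive by assumption)."""
    return reconnect_after_minutes <= retry_minutes
```

For example, a destination without authentication that keeps the default port and encryption setting produces no errors, and an outage of 30 minutes falls well within the default retry window.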