This topic provides the DDL syntax that is used to create a Hudi result table, describes the background information, limits, and parameters in the WITH clause, and provides examples.

Background information

  • Introduction to Hudi
    The following table describes the definition, features, and common usage scenarios of Hudi.
    Item Description
    Definition Apache Hudi is an open source framework that manages table data in data lakes. Hudi organizes file layouts based on Alibaba Cloud Object Storage Service (OSS) or Hadoop Distributed File System (HDFS) to guarantee atomicity, consistency, isolation, and durability (ACID), and supports efficient row-level data updates and deletions. This simplifies extract, transform, load (ETL) development.
    Hudi automatically manages small files and merges them into larger files of a specified size. This prevents an excessive number of small files from being created during data insertion and updates, which would otherwise degrade query performance and increase the O&M workload required to monitor and rewrite small files.
    You can use Hudi with compute engines such as Apache Flink, Apache Presto, and Apache Spark to ingest data into data lakes and to compute and analyze the data. In most cases, Hudi is used to meet business requirements such as accelerated ingestion of data from databases into data lakes, real-time consumption of incremental data, and data backfilling. For more information, see Apache Hudi.
    Features
    • Supports ACID semantics and the serializable isolation level. The serializable isolation level is the strictest level of SQL transaction isolation.
    • Supports UPSERT semantics. The UPSERT semantics combines INSERT and UPDATE semantics. Fully managed Flink uses the UPSERT semantics to write data to a destination table based on the following rules: If a data record that is read from the source table does not exist in the destination table, fully managed Flink inserts the data record into the destination table. If a data record that is read from the source table already exists in the destination table, fully managed Flink updates the data record. This allows you to use the INSERT INTO statement directly, which significantly simplifies development code and improves data processing efficiency.
    • Provides the time travel feature, which allows you to query historical versions of data as of a point in time. This helps you perform data O&M in an efficient manner and improves data quality.
    • Supports the schema evolution feature. This feature allows you to perform schema-related operations, such as dynamically adding columns and changing data types.
    Scenarios
    • Acceleration of ingesting data from databases into data lakes

      Compared with the traditional method that is used to load and merge a large volume of data, Hudi allows you to update and write streaming data to a super large dataset in real time in a more cost-effective manner. During the real-time ETL process, you can directly write change data capture (CDC) data to a data lake for downstream business. In most cases, you can use the MySQL CDC connector of fully managed Flink to write binary log data of the relational database management system (RDBMS) MySQL to a Hudi table.

    • Incremental ETL

      You can use the incremental extraction method of ETL to obtain the change data streams from Hudi in real time. This method provides better real-time performance and is more lightweight than offline ETL scheduling. In most cases, the online business data is incrementally extracted to an offline storage system. The Flink engine writes the extracted data to a Hudi table, and then the Apache Presto or Apache Spark engine is used to perform efficient online analytical processing (OLAP).

    • Message Queue service

      In scenarios that involve small amounts of data, Hudi can also be used as a message queue in place of Kafka. This simplifies the application development architecture.

    • Data backfilling

      If you want to update historical full data in specific rows and columns of a table, you can perform the update directly in the data lake. This greatly reduces the consumption of computing resources and improves end-to-end performance. In most cases, full data and incremental data are read from Hudi tables in a Hive metastore, and the two tables are joined to generate a wide table.

  • Advantages of fully managed Flink into which Hudi is integrated
    Compared with the open source Hudi community version, the Hudi integration in fully managed Flink provides more advantages. The following table describes these advantages.
    Advantage Description
    O&M-free service based on the integration of Hudi into fully managed Flink Fully managed Flink provides a built-in Hudi connector, which simplifies O&M and provides a service level agreement (SLA) guarantee.
    Improved data connectivity The Hudi connector is connected to multiple Alibaba Cloud big data computing and analytics engines. This way, data is decoupled from compute engines and can be seamlessly accessed by Apache Flink, Apache Spark, Apache Presto, and Apache Hive.
    Optimized ingestion of database data into data lakes The Hudi connector works with the Flink CDC connector to simplify data development.
    Enterprise-class features Enterprise-class features are supported, such as unified metadata views of Data Lake Formation (DLF) and automatic, lightweight table schema changes.
    Low-cost storage and high scalability based on Alibaba Cloud OSS Data is stored in Alibaba Cloud OSS in the Apache Parquet or Apache Avro format. Storage is decoupled from computing, and resources can be scaled in a flexible manner.
  • CDC data synchronization
    CDC data includes complete database changes. You can use one of the following methods to import data to Hudi:
    • Consume Kafka data that is in a specific CDC format and import the data to Hudi.

      Three CDC formats are supported: debezium-json, canal-json, and maxwell-json. This method provides high scalability and requires Kafka and Debezium data synchronization tools.

    • Access binary log data of a database by using the Flink CDC connector and import the data to Hudi.

      This method uses lightweight components to reduce the dependency on tools.

    Note
    • If the order of the upstream data cannot be guaranteed, you must specify the write.precombine.field parameter.
    • In CDC scenarios, you must set changelog.enabled to true to enable the changelog mode. A minimal configuration sketch that reflects these settings is provided after this note.
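    The following Flink SQL sketch, with hypothetical table schemas, topic names, and placeholder values, illustrates the first method: consuming CDC data in the debezium-json format from Kafka and writing it to a Hudi table with write.precombine.field and changelog.enabled configured. It only outlines the required settings.
    -- Hypothetical Kafka source that consumes CDC data in the debezium-json format.
    CREATE TEMPORARY TABLE kafka_cdc_src (
      id   BIGINT,
      name STRING,
      ts   TIMESTAMP(3)
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'format' = 'debezium-json'
    );

    -- Hudi sink for CDC data. Because the upstream order may not be guaranteed,
    -- write.precombine.field is specified, and changelog.enabled is set to true.
    CREATE TEMPORARY TABLE hudi_cdc_sink (
      id   BIGINT PRIMARY KEY NOT ENFORCED,
      name STRING,
      ts   TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      'write.precombine.field' = 'ts',
      'changelog.enabled' = 'true'
    );

    INSERT INTO hudi_cdc_sink SELECT * FROM kafka_cdc_src;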

Limits

  • Only fully managed Flink whose engine version is vvr-4.0.11-flink-1.13 or later supports the Hudi connector.
  • Only HDFS or Alibaba Cloud OSS can be used as a file system.
  • You cannot submit jobs in a session cluster.

DDL syntax

CREATE TEMPORARY TABLE hudi_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'table.type' = 'COPY_ON_WRITE',
  'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'accessKeyId' = '<yourAccessKeyId>',
  'accessKeySecret' = '<yourAccessKeySecret>',
  'hive_sync.enable'='true',
  'hive_sync.db'='<db name>',
  'hive_sync.table' = '<table name>',
  'hive_sync.mode' = 'hms',
  'dlf.catalog.region' = 'cn-hangzhou',
  'dlf.catalog.endpoint' = 'dlf-vpc.cn-hangzhou.aliyuncs.com'
);

Parameters in the WITH clause

  • Basic parameters
    Parameter Description Required Remarks
    connector The type of the result table. Yes Set the value to hudi.
    table.type The type of the table. Yes Valid values:
    • COPY_ON_WRITE: Parquet columnar storage is used. A base file is created each time data is updated.
    • MERGE_ON_READ: Parquet columnar storage and Avro row-based storage are used. Update operations are recorded in delta log files, and the delta log files are asynchronously merged with the Parquet columnar files into new versions of the files.
    path The path in which the table is stored. Yes The table can be stored in an OSS bucket or HDFS. For example, the path can be in the oss://<bucket name>/table or hdfs://<ip>:<port>/table format.
    oss.endpoint The endpoint of your OSS bucket. No If you store the table in OSS, you must configure this parameter. For more information about OSS endpoints, see Regions and endpoints.
    accessKeyId The AccessKey ID of your Alibaba Cloud account. No If you store the table in OSS, you must configure this parameter. For more information about how to obtain an AccessKey ID, see Obtain an AccessKey pair.
    accessKeySecret The AccessKey secret of your Alibaba Cloud account. No If you store the table in OSS, you must configure this parameter. For more information about how to obtain an AccessKey secret, see Obtain an AccessKey pair.
    hive_sync.enable Specifies whether to synchronize metadata to Hive. No Valid values:
    • true: Metadata is synchronized to Hive.
    • false: Metadata is not synchronized to Hive.
    hive_sync.mode The Hive data synchronization mode. No Valid values:
    • hms: If you use a Hive metastore or DLF catalog, set this parameter to hms. This is the recommended value.
    • jdbc: If you use a Java Database Connectivity (JDBC) catalog, set this parameter to jdbc. This is the default value.
    hive_sync.db The name of the Hive database to which data is synchronized. No N/A.
    hive_sync.table The name of the Hive table to which data is synchronized. No N/A.
    dlf.catalog.region The ID of the region in which the DLF service is activated. No
    Note
    • The configuration of the dlf.catalog.region parameter takes effect only when you set hive_sync.mode to hms.
    • Make sure that the value of this parameter matches the endpoint specified by the dlf.catalog.endpoint parameter.
    dlf.catalog.endpoint The endpoint of the DLF service. No
    Note
    • The configuration of the dlf.catalog.endpoint parameter takes effect only when you set hive_sync.mode to hms.
    • We recommend that you set the dlf.catalog.endpoint parameter to a VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set the dlf.catalog.endpoint parameter to dlf-vpc.cn-hangzhou.aliyuncs.com.
    • If you want to access DLF across VPCs, follow the instructions provided in How does Realtime Compute for Apache Flink access storage resources across VPCs? .
    write.operation The mode that is used for write operations. No Valid values:
    • insert: Data is written to the table in append mode.
    • upsert: Data is updated to the table. This is the default value.
    • bulk_insert: Multiple data records are written to the table at a time.
    write.precombine.field The version field. The system determines whether an incoming record overwrites the existing record based on the value of this field. No The default value is ts. If this field does not exist, the system updates data based on the order in which messages are processed by the engine.
  • Advanced parameters
    • Memory parameters
      Note
      • The unit of all memory parameters is MB.
      • Three factors affect memory usage: the number of TaskManagers and their memory configuration, the parallelism of write tasks, and the amount of memory that can be allocated to each write task. Therefore, we recommend that you confirm the amount of memory that can be allocated to each write task before you configure the memory parameters. A configuration sketch is provided after this table.
      Parameter Description Default value Remarks
      write.task.max.size The maximum amount of available memory for a write task. 1024 The size of the memory buffer that is reserved for each write task is the difference between the value of the write.task.max.size parameter and the value of the compaction.max_memory parameter.

      If the memory buffer of the write task reaches the threshold, data in the memory is stored on disks.

      Pay attention to the amount of memory that the TaskManager allocates to each write task, and make sure that each write task can obtain the amount of memory specified by the write.task.max.size parameter. For example, if a TaskManager has 4 GB of memory and runs two StreamWriteFunction tasks, each StreamWriteFunction task can be allocated 2 GB of memory. In this case, reserve some buffer memory because other tasks on the TaskManager, such as BucketAssignFunction tasks, also consume memory.

      compaction.max_memory The maximum amount of available memory for file compaction. 100 If you want to compact files online and memory resources are sufficient, you can increase the value of this parameter. For example, you can set this parameter to 1024 (1 GB).
      Take note of the compaction memory size. The compaction.max_memory parameter specifies the maximum amount of memory that each compaction task can use when it reads logs. If memory resources are sufficient, we recommend that you perform the following operations based on the table type:
      • For a Merge on Read (MoR) table, increase the value of the compaction.max_memory parameter.
      • For a Copy on Write (CoW) table, increase the values of the write.task.max.size and write.merge.max_memory parameters at the same time.
      write.merge.max_memory The maximum amount of heap memory that can be used to cache incremental data during a CoW write operation, in which incremental data is merged with the existing full data files. 100 In most cases, you can retain the default value.
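      The following sketch shows how these memory parameters might be set in the WITH clause of a Hudi result table. The values are illustrative assumptions for a TaskManager that can allocate about 1 GB to each write task; for a CoW table, you would tune write.task.max.size and write.merge.max_memory instead of compaction.max_memory.
      CREATE TEMPORARY TABLE hudi_sink (
        uuid BIGINT,
        data STRING,
        ts   TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
        -- OSS endpoint and AccessKey options are omitted here; see the DDL syntax section.
        -- Each write task can use up to 1024 MB, of which compaction can use up to 256 MB
        -- when it reads logs. The remaining 1024 - 256 = 768 MB serves as the write buffer.
        'write.task.max.size' = '1024',
        'compaction.max_memory' = '256'
      );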
    • Parameters for parallelism
      Parameter Description Default value Remarks
      write.tasks The parallelism of write tasks. In each write task, data is written to one bucket or to a specified number of buckets in sequence. 4 If you increase the value of this parameter, the number of small files does not increase.
      write.bucket_assign.tasks The number of bucket assigners that can run in parallel. 1 If you increase the value of this parameter, the number of buckets increases and the number of small files increases.
      write.index_bootstrap.tasks The parallelism of index bootstrap operators. If you increase the parallelism of index bootstrap operators, the efficiency of the bootstrap phase can be improved. Checkpoints may be blocked in the bootstrap phase. To resolve this issue, you can increase the allowed number of checkpoint failures. If this parameter is not explicitly specified, the parallelism of the Flink job is used by default. This parameter takes effect only when the index.bootstrap.enabled parameter is set to true.
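      The following sketch shows the parallelism-related options. The values are illustrative assumptions only.
      CREATE TEMPORARY TABLE hudi_sink (
        uuid BIGINT,
        data STRING,
        ts   TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
        -- OSS endpoint and AccessKey options are omitted here; see the DDL syntax section.
        'write.tasks' = '8',                  -- parallelism of write tasks
        'write.bucket_assign.tasks' = '2',    -- parallelism of bucket assigners
        'write.index_bootstrap.tasks' = '8'   -- takes effect only when index.bootstrap.enabled is true
      );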
    • Parameters for online compaction
      Parameter Description Default value Remarks
      compaction.tasks The parallelism of operators for online compaction. 4 Online compaction consumes computing resources.
      compaction.trigger.strategy The compaction policy. num_commits Valid values:
      • num_commits: Compaction is triggered when the specified number of delta commits is reached.
      • time_elapsed: Compaction is triggered when the specified period of time elapses.
      • num_and_time: Compaction is triggered when both num_commits and time_elapsed are met.
      • num_or_time: Compaction is triggered when num_commits or time_elapsed is met.
      compaction.delta_commits The maximum number of delta commits before compaction is triggered. 5 The value of this parameter is an integer. We recommend that you set this parameter to a value not greater than 20. The default value 5 indicates that compaction is triggered after five delta commits are generated.
      compaction.delta_seconds The maximum interval to trigger online compaction. 3600 Unit: seconds.
      compaction.target_io The maximum I/O throughput for each compaction task. 500 (GB) N/A.
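      The following sketch combines the online compaction options. The values are assumptions chosen only for illustration.
      CREATE TEMPORARY TABLE hudi_sink (
        uuid BIGINT,
        data STRING,
        ts   TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
        -- OSS endpoint and AccessKey options are omitted here; see the DDL syntax section.
        'compaction.tasks' = '4',
        'compaction.trigger.strategy' = 'num_or_time',   -- trigger on delta commits or elapsed time
        'compaction.delta_commits' = '5',                 -- trigger after 5 delta commits
        'compaction.delta_seconds' = '3600',              -- or after 3600 seconds
        'compaction.max_memory' = '100',
        'compaction.target_io' = '500'
      );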
  • Parameter for enabling the changelog mode

    The Hudi connector allows you to retain all changes to messages. After the Hudi connector is connected to the Flink engine, end-to-end near-real-time data warehousing can be implemented. For a MoR table, the Hudi connector retains all changes to messages in the row-based storage format. This way, the reader can read all change records of the MoR table in streaming mode. In this case, you must set changelog.enabled to true to enable the changelog mode.

    After you set changelog.enabled to true, all changes can be consumed. However, the asynchronous compaction task merges intermediate changes into one record. Therefore, if streaming reads and downstream consumption are not performed in a timely manner, only the last record can be read after compaction. You can increase the compaction buffer time to reserve sufficient time for the reader to read and consume the data. For example, you can adjust the compaction.delta_commits and compaction.delta_seconds parameters.
    Parameter Description Required Remarks
    changelog.enabled Specifies whether to enable the changelog mode. No
    Valid values:
    • true: The changelog mode is enabled.
    • false: The changelog mode is disabled. This is the default value. If the changelog mode is disabled, the UPSERT semantics is used: among all change records of a key, only the last merged record is available, and the intermediate changes may be merged during compaction.
    Note If data is read in streaming mode, each data change is returned. If data is read in batch mode, only the merged result after compaction is returned.
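    The following sketch enables the changelog mode and increases the compaction buffer time so that downstream readers have more time to consume intermediate changes. The values are illustrative assumptions.
    CREATE TEMPORARY TABLE hudi_sink (
      uuid BIGINT,
      data STRING,
      ts   TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      -- OSS endpoint and AccessKey options are omitted here; see the DDL syntax section.
      'changelog.enabled' = 'true',        -- retain all changes for streaming readers
      'compaction.delta_commits' = '10',   -- larger buffer before intermediate changes are compacted
      'compaction.delta_seconds' = '7200'
    );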
  • Parameters for batch import
    If the existing data is obtained from a data source other than Hudi, you can use the batch import feature to quickly import the existing data to a Hudi table. When you use the batch import feature, take note of the following points:
    • During batch import, Avro-based data serialization and data compaction are skipped, and deduplication is not performed after data is imported. If you have high requirements on data uniqueness, you must ensure that the imported data is unique.
    • In batch execution mode, setting the write.operation parameter to bulk_insert is more efficient. By default, the system sorts the input data by partition and then writes the data to the Hudi table. This way, the write operation does not frequently switch among different files, and write performance does not decrease.
    • You can configure the write.tasks parameter to specify the parallelism of bulk_insert write tasks. The number of small files varies based on the parallelism.
    The following table describes the parameters in the WITH clause.
    Parameter Description Required Remarks
    write.tasks The parallelism of write tasks. In each write task, data is written to one bucket or to a specified number of buckets in sequence. No Default value: 4.
    write.bulk_insert.shuffle_by_partition Specifies whether to shuffle data by the partition field before write tasks write the data to partitions. No Default value: true.
    Note If this parameter is set to true, the number of small files is reduced, but data skew may occur.
    write.bulk_insert.sort_by_partition Specifies whether to sort data by the partition field before data is written to the table. No Default value: true.
    Note If a single write task writes data to multiple partitions, setting this parameter to true reduces the number of small files.
    write.sort.memory The available management memory for the sort operator. No Default value: 128. Unit: MB.
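    The following sketch shows how a bulk import of existing data might be configured with these parameters. The values are illustrative assumptions.
    CREATE TEMPORARY TABLE hudi_sink (
      uuid BIGINT,
      data STRING,
      ts   TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'COPY_ON_WRITE',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      -- OSS endpoint and AccessKey options are omitted here; see the DDL syntax section.
      'write.operation' = 'bulk_insert',                 -- batch import of existing data
      'write.tasks' = '8',                               -- parallelism of bulk_insert write tasks
      'write.bulk_insert.shuffle_by_partition' = 'true',
      'write.bulk_insert.sort_by_partition' = 'true',
      'write.sort.memory' = '128'                        -- managed memory for the sort operator, in MB
    );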
  • Parameters for full incremental loading
    If your Hudi result table already contains full offline data and you want to write incremental data to the table with deduplication ensured, you can set the index.bootstrap.enabled parameter to true to enable the full incremental loading feature. This feature loads the index of the existing data before incremental data is written. A configuration sketch is provided after the following table.
    Note If it takes a long period of time to write data to the table, you can increase the amount of resources when you write the full data. After the full data is written, you can decrease the amount of resources or set the write.rate.limit parameter to true when you write the incremental data.
    Parameter Description Required Remarks
    index.bootstrap.enabled Specifies whether to enable the full incremental loading feature. After you enable this feature, the index data of the existing table is loaded into the state storage all at once. Yes Valid values:
    • true: The full incremental loading feature is enabled.
    • false: The full incremental loading feature is disabled. This is the default value.
    index.partition.regex Specifies a regular expression to filter partitions. No By default, all partitions are loaded.
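    The following sketch enables the full incremental loading feature. The partition filter is only a hypothetical example pattern; by default, all partitions are loaded.
    CREATE TEMPORARY TABLE hudi_sink (
      uuid BIGINT,
      data STRING,
      ts   TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      -- OSS endpoint and AccessKey options are omitted here; see the DDL syntax section.
      'index.bootstrap.enabled' = 'true',   -- load the index of the existing table into state once
      'index.partition.regex' = '2023-.*'   -- hypothetical pattern that loads only matching partitions
    );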
  • Parameter for write operation throttling
    High write throughput and out-of-order data that is randomly written to partitions can easily decrease write performance and cause throughput glitches. To keep the write traffic smooth, you can enable the write throttling feature. For example, you can synchronize tens of billions of full and incremental data records to Kafka and then import the database and table data into Hudi databases and tables based on the streaming consumption of Flink. In this scenario, write throttling helps stabilize the write traffic.
    Parameter Description Required Remarks
    write.rate.limit Specifies whether to enable the write throttling feature. No Valid values:
    • true: The write throttling feature is enabled.
    • false: The write throttling feature is disabled. This is the default value.
  • Parameter for writing data in append mode
    Parameter Description Required Remarks
    write.insert.cluster Specifies whether to merge small files during data writing. No Default value: false.
    Note
    • By default, data is written to a CoW table in append mode. In this mode, small files are not merged unless you set this parameter to true to enable the small file merging feature.
    • If you set this parameter to true, small files are merged before data is written, but no deduplication is performed. The throughput is affected.
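    The following sketch writes data to a CoW table in append mode and enables small file merging, as described in the note above. The values are illustrative assumptions.
    CREATE TEMPORARY TABLE hudi_sink (
      uuid BIGINT,
      data STRING,
      ts   TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',
      'table.type' = 'COPY_ON_WRITE',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      -- OSS endpoint and AccessKey options are omitted here; see the DDL syntax section.
      'write.operation' = 'insert',      -- append mode
      'write.insert.cluster' = 'true'    -- merge small files during writing; no deduplication
    );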

Examples

  • Example 1: Write data to a Hudi result table
    This example shows how to use the Datagen connector to generate random data in streaming mode and then write the data to a Hudi table.
    1. Create an OSS bucket.

      For more information, see Create buckets.

    2. On the Draft Editor page, write code for an SQL streaming job in the text editor of the job that you want to edit.
      CREATE TEMPORARY TABLE datagen(
        uuid BIGINT,
        data STRING,
        ts   TIMESTAMP(3)
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE hudi_sink (
        uuid BIGINT,
        data STRING,
        ts TIMESTAMP(3)
      ) WITH (
        'connector' = 'hudi',           
        'oss.endpoint' = '<yourOSSEndpoint>',                     
        'accessKeyId' = '<yourAccessKeyId>',                    
        'accessKeySecret' = '<yourAccessKeySecret>',                    
        'path' = 'oss://<yourOSSBucket>/<Custom storage location>', 
        'table.type' = 'COPY_ON_WRITE'                           
      );
      
      INSERT INTO hudi_sink SELECT * from datagen;
    3. On the right side of the Draft Editor page, click the Advanced tab and set Engine Version to vvr-4.0.11-flink-1.13.
    4. Click Validate.
    5. Click Publish.
    6. On the Deployments page in the console of fully managed Flink, find the job that you want to start and click Start in the Actions column.
    7. View the test data that has been written in the OSS console.

      You can view the test data that has been written in the OSS console after the first checkpointing operation is complete.

  • Example 2: Ingest MySQL CDC data into data lakes
    This example shows how to read data from a MySQL CDC source table and then write data to a Hudi table.
    1. Create an OSS bucket.

      For more information, see Create buckets.

    2. On the Draft Editor page, write code for an SQL streaming job in the text editor of the job that you want to edit.
      CREATE TEMPORARY TABLE mysql_src (
        id BIGINT,
        name STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '<yourRDSHostName>',
        'port' = '3306',
        'username' = '<yourRDSUserName>',
        'password' = '<yourRDSPassword>',
        'database-name' = 'user_db.*', -- Use a regular expression to match multiple database shards. 
        'table-name' = 'user.*'   -- Use a regular expression to match multiple tables in the sharded database. 
      );
      
      CREATE TEMPORARY TABLE hudi_sink (
        id BIGINT PRIMARY KEY NOT ENFORCED,
        name STRING
      ) WITH (
        'connector' = 'hudi',
        'oss.endpoint' = '<yourOSSEndpoint>',
        'accessKeyId' = '<yourAccessKeyId>',
        'accessKeySecret' = '<yourAccessKeySecret>',
        'path' = 'oss://<yourOSSBucket>/<Path to Table>/',
        'table.type' = 'MERGE_ON_READ'
      );
      
      INSERT INTO hudi_sink SELECT * FROM mysql_src;
    3. On the right side of the Draft Editor page, click the Advanced tab and set Engine Version to vvr-4.0.12-flink-1.13.
    4. Click Validate.
    5. Click Publish.
    6. On the Deployments page in the console of fully managed Flink, find the job that you want to start and click Start in the Actions column.
      After the job is published, you can view the vertex graph of the job on the Overview tab to learn the running process of the job.
    7. View the test data that has been written in the OSS console.

      You can view the test data that has been written in the OSS console after the first checkpointing operation is complete.