This topic describes the background information and limits of Hudi source tables, provides the DDL syntax that is used to create a Hudi source table, describes the parameters in the WITH clause, and provides an example.

Background information

  • Introduction to Hudi
    The following table describes the definition, features, and common usage scenarios of Hudi.
    Item Description
    Definition Apache Hudi is an open source framework that manages table data in data lakes. Hudi organizes file layouts based on Alibaba Cloud Object Storage Service (OSS) or Hadoop Distributed File System (HDFS) to ensure atomicity, consistency, isolation, and durability (ACID), and supports efficient row-level data updates and deletions. This simplifies extract, transform, and load (ETL) development. Hudi can also automatically manage small files and merge them into a larger file of a specified size. This prevents an excessive number of small files from being created during data insertion and updates, which would otherwise degrade query performance and generate O&M workloads for monitoring and rewriting small files. You can use Hudi with compute engines such as Apache Flink, Apache Presto, and Apache Spark to ingest data into data lakes for computing and analysis. In most cases, Hudi is used to meet business requirements such as accelerated ingestion of data from databases into data lakes, real-time consumption of incremental data, and data backfilling. For more information, see Apache Hudi.
    Features
    • Supports ACID semantics and the serializable isolation level. The serializable isolation level is the strictest level of SQL transaction isolation.
    • Supports UPSERT semantics, which combine INSERT and UPDATE semantics. Fully managed Flink uses UPSERT semantics to write data to a destination table based on the following rules: if a data record that is read from the source table does not exist in the destination table, fully managed Flink inserts the record into the destination table; if the record already exists, fully managed Flink updates it. Writing data with a single INSERT INTO statement in this way significantly simplifies development code and improves data processing efficiency. A minimal sketch is provided after the scenario list below.
    • Provides historical details of the data versions at a point in time based on the time travel feature. This helps you perform data O&M in an efficient manner and improves data quality.
    • Provides the schema evolution feature. This feature allows you to perform schema-related operations. For example, you can use this feature to dynamically add columns and change data types.
    Scenarios
    • Acceleration of ingesting data from databases into data lakes

      Compared with the traditional method that is used to load and merge a large amount of data, Hudi allows you to update and write streaming data to a super large dataset in real time with lower costs. During the real-time ETL process, you can directly write change data capture (CDC) data to a data lake for downstream business. In most cases, you can use the MySQL CDC connector of fully managed Flink to write binary log data of the relational database management system (RDBMS) MySQL to a Hudi table.

    • Incremental ETL

      You can use the incremental extraction method of ETL to obtain the change data streams from Hudi in real time. This method provides better real-time performance and is more lightweight than offline ETL scheduling. In most cases, the online business data is incrementally extracted to an offline storage system. The Flink engine writes the extracted data to a Hudi table, and then the Apache Presto or Apache Spark engine is used to perform efficient online analytical processing (OLAP).

    • Message Queue service

      In scenarios where you need to process only a small amount of data, Hudi can also be used as a Message Queue service to replace Kafka. This simplifies the application development architecture.

    • Data backfilling

      If you want to update the historical full data in specific rows and columns of a table, you can perform the update directly in the data lake. This greatly reduces the consumption of computing resources and improves end-to-end performance. In most cases, the full data and the incremental data are read from two Hudi tables that are registered in a Hive metastore, and the two tables are joined to generate a wide table.
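
    The following sketch illustrates the UPSERT semantics that are described in the Features item above. The table name, columns, and values are hypothetical; replace the OSS placeholders as described in the DDL syntax section of this topic.
    CREATE TEMPORARY TABLE user_profile (
      id   BIGINT,
      name STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'table.type' = 'MERGE_ON_READ'
    );
    
    -- The first statement inserts a new record because no record with id = 1 exists yet.
    -- The second statement updates that record because a record with id = 1 already exists.
    INSERT INTO user_profile VALUES (1, 'alice', TIMESTAMP '2022-01-01 00:00:00');
    INSERT INTO user_profile VALUES (1, 'alice_v2', TIMESTAMP '2022-01-01 00:00:01');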

  • Advantages of fully managed Flink into which Hudi is integrated
    Compared with the open source Hudi community, fully managed Flink into which Hudi is integrated provides more advantages. The following table describes these advantages.
    Advantage Description
    Maintenance-free based on integration with the fully managed Flink platform Fully managed Flink provides a built-in Hudi connector, which simplifies O&M and provides a service level agreement (SLA) guarantee.
    Improved data connectivity The Hudi connector is connected to multiple Alibaba Cloud big data computing and analytics engines. This way, data is decoupled from computing engines and can be seamlessly migrated among Apache Flink, Apache Spark, Apache Presto, and Apache Hive.
    Optimized data ingestion from databases to data lakes The Hudi connector works with the Flink CDC connector to simplify data development.
    Enterprise-class features Enterprise-class features are supported, such as unified metadata views of Data Lake Formation (DLF) and automatic and lightweight table schema changes.
    Low-cost storage and high scalability by using Alibaba Cloud OSS Data is stored in the Apache Parquet or Apache Avro format in Alibaba Cloud OSS. Storage and computing are isolated and resources can be scaled in a flexible manner.
  • CDC data synchronization
    CDC data includes complete database changes. You can use one of the following methods to import data to Hudi:
    • Consume and import Kafka data that is in a specific CDC format to Hudi at the same time.

      Three CDC formats are supported: debezium-json, canal-json, and maxwell-json. This method provides high scalability, but requires data synchronization tools such as Kafka and Debezium.

    • Access binary log data of a database by using the Flink CDC connector and import the data to Hudi.

      This method uses lightweight components to reduce the dependency on tools.

    Note
    • If the upstream data order cannot be ensured, you must specify the write.precombine.field parameter.
    • In CDC scenarios, you must set changelog.enabled to true to enable the changelog mode.
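
    The following sketch shows the second method: the MySQL CDC connector reads binary log data and writes the data to a Hudi table with the two settings from the preceding note. The connection values and table names are hypothetical placeholders, and the sketch assumes that the MySQL CDC connector is available in your environment.
    CREATE TEMPORARY TABLE mysql_orders (
      order_id   BIGINT,
      order_data STRING,
      ts         TIMESTAMP(3),
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '<yourMySQLHost>',
      'port' = '3306',
      'username' = '<yourUserName>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabase>',
      'table-name' = 'orders'
    );
    
    CREATE TEMPORARY TABLE hudi_orders (
      order_id   BIGINT,
      order_data STRING,
      ts         TIMESTAMP(3),
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'table.type' = 'MERGE_ON_READ',
      -- The upstream order cannot be guaranteed, so a version field is specified.
      'write.precombine.field' = 'ts',
      -- CDC scenario: enable the changelog mode to retain all change records.
      'changelog.enabled' = 'true'
    );
    
    INSERT INTO hudi_orders SELECT * FROM mysql_orders;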

Limits

  • Only fully managed Flink that runs vvr-4.0.11-flink-1.13 or later supports the Hudi connector.
  • Only HDFS or Alibaba Cloud OSS can be used as a file system.
  • You cannot submit jobs in a session cluster.

DDL syntax

CREATE TEMPORARY TABLE hudi_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
  'accessKeyId' = '<yourAccessKeyId>',
  'accessKeySecret' = '<yourAccessKeySecret>',
  'read.streaming.enabled' = 'true'
);
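
The source table that is declared by the preceding DDL statement can then be queried in streaming mode in the same draft. The following usage sketch writes the rows to a hypothetical blackhole table that only consumes the stream:

CREATE TEMPORARY TABLE discard_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'
);

-- Continuously read the Hudi source table in streaming mode and discard the rows.
INSERT INTO discard_sink SELECT uuid, data, ts FROM hudi_sink;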

Parameters in the WITH clause

  • Basic parameters
    Parameter Description Required Remarks
    connector The type of the source table. Yes Set the value to hudi.
    table.type The type of the table. Yes Valid values:
    • COPY_ON_WRITE: Parquet columnar storage is used. A base file is created each time data is updated.
    • MERGE_ON_READ: Parquet columnar storage and Avro row-based storage are used. Update operations are recorded in the delta log file, and the delta log file and the Parquet columnar file are asynchronously merged to generate a new version file.
    path The path in which the table is stored. Yes The table can be stored in an OSS bucket or HDFS. For example, the path can be in the oss://<bucket name>/table or hdfs://<ip>:<port>/table format.
    read.streaming.enabled Specifies whether to enable data reading in streaming mode. Yes Valid values:
    • true: Data reading in streaming mode is enabled.
    • false: Data reading in streaming mode is disabled. This is the default value.
    read.start-commit The start offset from which data is read in streaming mode. No Specify the offset in the yyyyMMddHHmmss format. Data is read from the specified time point, which is included.
    oss.endpoint The endpoint of OSS. No If you store the table in OSS, you must configure this parameter. For more information about OSS endpoints, see Regions and endpoints.
    accessKeyId The AccessKey ID of your Alibaba Cloud account. No If you store the table in OSS, you must configure this parameter. For more information about how to obtain the AccessKey ID, see Obtain an AccessKey pair.
    accessKeySecret The AccessKey secret of your Alibaba Cloud account. No If you store the table in OSS, you must configure this parameter. For more information about how to obtain the AccessKey secret, see Obtain an AccessKey pair.
    hive_sync.enable Specifies whether to synchronize metadata to Hive. No Valid values:
    • true: Metadata is synchronized to Hive.
    • false: Metadata is not synchronized to Hive.
    hive_sync.mode The Hive data synchronization mode. No Valid values:
    • hms: If you use a Hive metastore or DLF catalog, set this parameter to hms. This is the recommended value.
    • jdbc: If you use a Java Database Connectivity (JDBC) catalog, set this parameter to jdbc. This is the default value.
    hive_sync.db The name of the Hive database to which data is synchronized. No N/A.
    hive_sync.table The name of the Hive table to which data is synchronized. No N/A.
    dlf.catalog.region The name of the region in which the DLF service is activated. No
    Note Make sure that the value of this parameter matches the endpoint that is specified by the dlf.catalog.endpoint parameter.
    dlf.catalog.endpoint The endpoint of the DLF service. No
    Note Make sure that the value of this parameter matches the region that is specified by the dlf.catalog.region parameter.
    write.precombine.field The version field. The system determines whether to update a message based on the value of this field. No The default value is ts. If you do not configure this parameter, the system updates data based on the order in which messages are processed in the engine.
    changelog.enabled Specifies whether to enable the changelog mode. No Valid values:
    • true: The changelog mode is enabled.

      The Hudi connector allows you to retain all changes to messages. When the Hudi connector works with the Flink engine, end-to-end near-real-time data warehousing can be implemented. For a Merge on Read (MOR) table, the Hudi connector retains all changes to messages in the row-based storage format. This way, the reader can read all changes on the MOR table in streaming mode and consume every change record. In this case, you must set changelog.enabled to true to enable the changelog mode.

      After you set changelog.enabled to true, all changes can be consumed. However, the asynchronous compaction task still compacts intermediate changes into a single record. Therefore, if the data is not read and consumed in streaming mode in a timely manner, only the last record can be read after compaction. You can increase the compaction buffer time to reserve sufficient time for the reader to read and consume the changes, for example, by adjusting the compaction.delta_commits and compaction.delta_seconds parameters. A minimal sketch of these settings is provided after this parameter description.

    • false: The changelog mode is disabled. This is the default value. If the changelog mode is disabled, UPSERT semantics are used: only the last merged record is retained, and the intermediate changes may be merged away.
    Note If data is read in streaming mode, each data change is visible. If data is read in batch mode, only the merged result after compaction is visible.
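
    The following sketch shows a WITH clause for a Merge on Read result table in which the changelog mode is enabled and the compaction buffer is extended, as described above. The values of compaction.delta_commits and compaction.delta_seconds are hypothetical and should be tuned based on how quickly the downstream reader consumes the changes; the OSS access parameters are the placeholders from the Basic parameters table.
    CREATE TEMPORARY TABLE hudi_changelog (
      id   BIGINT,
      data STRING,
      ts   TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'oss.endpoint' = '<yourOSSEndpoint>',
      'accessKeyId' = '<yourAccessKeyId>',
      'accessKeySecret' = '<yourAccessKeySecret>',
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'table.type' = 'MERGE_ON_READ',
      -- Retain intermediate changes so that the streaming reader can consume them.
      'changelog.enabled' = 'true',
      -- Hypothetical values: delay compaction to give the reader more time to consume changes.
      'compaction.delta_commits' = '10',
      'compaction.delta_seconds' = '1800'
    );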

Example

This example shows how to use the Datagen connector to randomly generate streaming data, write the data to a Hudi table, and then read the data from the Hudi table in streaming mode and write it to a blackhole table.
  1. Create an OSS bucket.

    For more information, see Create buckets.

  2. On the Draft Editor page in the console of fully managed Flink, write code for an SQL streaming job in the text editor of the job that you want to edit.
    CREATE TEMPORARY TABLE datagen(
      id INT NOT NULL PRIMARY KEY NOT ENFORCED,
      data  STRING,
      ts TIMESTAMP(3)
    ) WITH (
      'connector' = 'datagen' ,
      'rows-per-second'='100' 
    );
    
    CREATE TEMPORARY TABLE datasink (
      id INT NOT NULL PRIMARY KEY NOT ENFORCED,
      data STRING,
      ts TIMESTAMP(3)
    ) WITH (
      'connector' = 'blackhole'      
    );
    
    CREATE TEMPORARY TABLE hudi (
      id INT NOT NULL PRIMARY KEY NOT ENFORCED,
      data STRING,
      ts TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',                         
      'oss.endpoint' = '<yourOSSEndpoint>',                         
      'accessKeyId' = '<yourAccessKeyId>',           
      'accessKeySecret' = '<yourAccessKeySecret>',      
      'path' = 'oss://<yourOSSBucket>/<Custom storage location>',
      'table.type' = 'MERGE_ON_READ',     
      'read.streaming.enabled' = 'true'
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hudi SELECT * from datagen;
    INSERT INTO datasink SELECT * FROM hudi;
    END;
    Note You can also split the read and write operations into two separate streaming jobs. In the preceding sample code, a single job writes data to two sinks. If a job writes data to multiple sinks, make sure that the INSERT statements start with BEGIN STATEMENT SET; and end with END;. For more information, see INSERT INTO statement.
  3. On the right side of the Draft Editor page, click the Advanced tab and set Engine Version to vvr-4.0.11-flink-1.13.
  4. Click Validate.
  5. Click Publish.
  6. On the Deployments page in the console of fully managed Flink, find the job that you want to start and click Start in the Actions column.
  7. View the test data that has been written in the OSS console.

    You can view the test data that has been written after the first checkpointing operation is complete.
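
    Because the Hudi connector commits data when a checkpoint is complete, you can shorten the checkpoint interval if you want the test data to become visible sooner. The following sketch uses the standard Flink configuration key execution.checkpointing.interval; whether a SET statement is accepted in an SQL draft depends on your engine version, so you may need to configure the same key in the deployment configuration instead.
    -- Hypothetical tuning: complete a checkpoint, and therefore a Hudi commit, every 60 seconds.
    SET 'execution.checkpointing.interval' = '60s';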