Lindorm: Import data in batches

Last Updated: Oct 27, 2023

Lindorm provides the bulkload feature, which allows you to import data in batches quickly and reliably. This topic describes how to use the bulkload feature to import data in batches.

Feature

The bulkload feature loads data in bypass mode: it does not write data by calling API operations or consume the computing resources of your Lindorm instance. Compared with importing data by calling API operations, the bulkload feature provides the following advantages:

  • Bulkload imports data more than 10 times faster than importing data by calling API operations.

  • Bulkload jobs do not affect your online services because bulkload jobs do not require the computing resources of your online services.

  • Resource usage for bulkload jobs is more flexible because the system uses Spark computing resources to run bulkload jobs.

  • The bulkload feature can be used to import data from various types of data sources, including Comma Separated Values (CSV) files, Optimized Row Columnar (ORC) files, Parquet files, and tables in Alibaba Cloud MaxCompute.

  • The bulkload feature is easy to use. You do not need to write any code to load data in batches in bypass mode.

  • Cost-effectiveness. Lindorm Tunnel Service (LTS) runs bulkload jobs on serverless Spark and uses its cloud-native elasticity to scale computing resources based on your business requirements. The resources that are used for bulkload jobs are billed on a pay-as-you-go basis, so you do not need to purchase computing resources for bulkload jobs for a long period of time. This helps you reduce resource costs.

Prerequisites

Supported data sources

  • Sources: MaxCompute tables, HDFS or OSS CSV files, HDFS or OSS Parquet files, and HDFS or OSS ORC files.

  • Destination: wide tables in the Lindorm wide table engine service (LindormTable).

How to create a bulkload job

You can create a bulkload job by using one of the following methods:

Create a bulkload job in the LTS console

  1. Log on to the LTS web UI of your Lindorm instance. For more information, see Activate and log on to LTS.

  2. In the left-side navigation pane, choose Data Source Management > Add Data Source.

    • Create a MaxCompute data source. For information about how to create a MaxCompute data source, see ODPS data source.

    • Create a LindormTable data source. For information about how to create a LindormTable data source, see Add a LindormTable data source.

    • Create a Hadoop Distributed File System (HDFS) data source. For information about how to create an HDFS data source, see Add an HDFS data source.

  3. In the left-side navigation pane, choose Data Import > Bulkload.

  4. Click Create New Job. Then, configure the parameters on the page that appears. The parameters are described as follows:

    Select Datasource section

    • Source: Select the MaxCompute data source or HDFS data source from which you want to import data to Lindorm.

    • Target: Select the LindormTable data source to which you want to import data.

    Configuration section

    • Reader Config: the configuration of the reader.

      • If the data source is a table in MaxCompute, the reader configuration must include the following parameters:

        • table: the name of the source table in MaxCompute.

        • column: the names of the columns from which you want to import data to Lindorm.

        • partition: the partitions from which you want to import data to Lindorm. If the source table is a non-partitioned table, this parameter can be left empty.

        • numPartitions: the number of threads that you want the reader to use to read data from the source table.

      • If the data source is a CSV file in an HDFS system, the reader configuration must include the following parameters:

        • filePath: the directory where the CSV file resides.

        • header: specifies whether the CSV file includes a header line.

        • delimiter: the delimiter that is used in the CSV file.

        • column: the names and data types of the columns in the CSV file.

      • If the data source is a Parquet file in an HDFS system, the reader configuration must include the following parameters:

        • filePath: the directory where the Parquet file resides.

        • column: the names of the columns in the Parquet file.

      Note: For more information about how to configure a reader, see Sample configurations.

    • Writer Config: the configuration of the writer, which includes the following parameters:

      • namespace: the namespace of the LindormTable cluster.

      • lindormTable: the name of the destination wide table in LindormTable.

      • compression: the compression algorithm that you want to use. Only the Zstandard (Zstd) compression algorithm is supported. If you do not want to use a compression algorithm, set this parameter to none.

      • columns: the names of the columns in the destination wide table.

        • If the destination is a Lindorm wide table that supports SQL queries, specify the names of the columns in the wide table. The columns must correspond to the columns that are specified in the reader configuration.

        • If the destination is a Lindorm wide table that is compatible with ApsaraDB for HBase, specify the standard names of the columns in the destination table. The columns must correspond to the columns that are specified in the reader configuration.

      • timestamp: the timestamp of the data in the wide table. The following formats are supported:

        • 13-digit Long values.

        • Strings in the yyyy-MM-dd HH:mm:ss or yyyy-MM-dd HH:mm:ss SSS format.

      Note: For more information about how to configure a writer, see Sample configurations.

    Job Config section

    • Spark Driver Spec: the instance type of the Spark driver of the bulkload job.

    • Spark Executor Spec: the instance type of the Spark executors of the bulkload job.

    • Spark Executor Instances: the number of Spark executors that you want to use to run the bulkload job.

    • Spark Config: optional. The extended configurations of the bulkload job.

  5. Click Submit.

  6. On the Bulkload page, click the name of the bulkload job to view information about the job.

    • Click the name of the bulkload job. Then, view information about the job on the Spark web UI of the job.

    • Click Details to view the execution logs of the job.

    Note

    For example, if data is evenly distributed across the partitions of the destination Lindorm wide table, the data size is 100 GB, and the compression ratio is 1:4, the import takes approximately 1 hour. In actual business scenarios, the import duration depends on the size of the data that is imported.

Create a bulkload job by calling API operations

Create a bulkload job

  • API operation: http://{LTSMaster}:12311/pro/proc/bulkload/create. The HTTP request method is POST. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS web UI of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.

  • The request parameters are described as follows:

    • src: the name of the source data source.

    • dst: the name of the destination data source.

    • readerConfig: the configuration of the reader, in the JSON format. For more information, see Sample configurations.

    • writerConfig: the configuration of the writer, in the JSON format. For more information, see Sample configurations.

    • driverSpec: the instance type of the driver of the bulkload job. Valid values: small, medium, large, and xlarge. We recommend that you set this parameter to large.

    • instances: the number of Spark executors that you want to use to run the bulkload job.

    • fileType: the type of the source file. If the source data source is an HDFS data source, the file type is CSV or Parquet.

    • sparkAdditionalParams: optional. Extension parameters.

  • Example:

    curl -d "src=hdfs&dst=ld&readerConfig={\"filePath\":\"parquet/\",\"column\":[\"id\",\"intcol\",\"doublecol\",\"stringcol\",\"string1col\",\"decimalcol\"]}&writerConfig={\"columns\":[\"ROW||String\",\"f:intcol||Int\",\"f:doublecol||Double\",\"f:stringcol||String\",\"f:string1col||String\",\"f:decimalcol||Decimal\"],\"namespace\":\"default\",\"lindormTable\":\"bulkload_test\",\"compression\":\"zstd\"}&driverSpec=large&instances=5&fileType=Parquet" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{LTSMaster}:12311/pro/proc/bulkload/create

    The following result is returned. The message parameter in the result indicates the ID of the job that is created.

    {"success":"true","message":"proc-91-ff383c616e5242888b398e51359c****"}

Query information about a bulkload job

  • API operation: http://{LTSMaster}:12311/pro/proc/{procId}/info. The HTTP request method is GET. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS web UI of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.

  • Request parameter: procId. The procId parameter specifies the ID of the job.

  • Example:

    curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/info

    The following result is returned:

    {
        "data":{
            "checkJobs":Array,
            "procId":"proc-91-ff383c616e5242888b398e51359c****",  //Indicates the job ID.
            "incrJobs":Array,
            "procConfig":Object,
            "stage":"WAIT_FOR_SUCCESS",
            "fullJobs":Array,
            "mergeJobs":Array,
            "srcDS":"hdfs",    //Indicates the source data source.
            "sinkDS":"ld-uf6el41jkba96****",  //Indicates the destination data source.
            "state":"RUNNING",   //Indicates the status of the job.
            "schemaJob":Object,   
            "procType":"SPARK_BULKLOAD"   //Indicates the type of the job.
        },
        "success":"true"
    }
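
    If you run bulkload jobs from scripts, you can poll this operation until the job leaves the RUNNING state. The following sketch is a minimal example; it assumes that jq is installed, that {LTSMaster} is replaced with the actual domain name, and that the PROC_ID variable holds the job ID that is returned by the create operation.

    # Poll the job status every 30 seconds until the job is no longer RUNNING.
    # Assumptions: jq is installed; PROC_ID holds the job ID returned by the create operation.
    while true; do
      STATE=$(curl -s http://{LTSMaster}:12311/pro/proc/${PROC_ID}/info | jq -r '.data.state')
      echo "$(date '+%F %T') bulkload job ${PROC_ID} state: ${STATE}"
      if [ "$STATE" != "RUNNING" ]; then
        break
      fi
      sleep 30
    done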

Terminate a bulkload job

  • API operation: http://{LTSMaster}:12311/pro/proc/{procId}/abort. The HTTP request method is GET. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS web UI of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.

  • Request parameter: procId. The procId parameter specifies the ID of the job.

  • Example:

    curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/abort

    The following result is returned:

    {"success":"true","message":"ok"}

Retry a bulkload job

  • API operation: http://{LTSMaster}:12311/pro/proc/{procId}/retry. The HTTP request method is GET. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS web UI of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.

  • Request parameter: procId. The procId parameter specifies the ID of the job.

  • Example:

    curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/retry

    The following result is returned:

    {"success":"true","message":"ok"}

Delete a bulkload job

  • API operation: http://{LTSMaster}:12311/pro/proc/{procId}/delete. The HTTP request method is GET. {LTSMaster}: specifies the domain name of the primary node of your Lindorm instance. To obtain the domain name, log on to the LTS web UI of the Lindorm instance and view the information in the Basic Info section on the Cluster Info page.

  • Request parameter: procId. The procId parameter specifies the ID of the job.

  • Example:

    curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/delete

    The following result is returned:

    {"success":"true","message":"ok"}

Sample configurations

  • Sample configurations that are used to read data from a data source.

    • The following sample code provides an example on how to configure the reader to read data from a MaxCompute data source.

      {
        "table": "test",
        "column": [ 
          "id",
          "intcol",
          "doublecol",
          "stringcol",
          "string1col",
          "decimalcol"
        ],
        "partition": [
          "pt=1" 
        ],
        "numPartitions":10 
      }
    • The following sample code provides an example on how to configure the reader to read data from a CSV file in an HDFS data source.

      {
        "filePath":"csv/",
        "header": false,
        "delimiter": ",",
        "column": [
          "id|string",
          "intcol|int",
          "doublecol|double",
          "stringcol|string",
          "string1col|string",
          "decimalcol|decimal"
        ]
      }
    • The following sample code provides an example on how to configure the reader to read data from a Parquet file in an HDFS data source.

      {
        "filePath":"parquet/",
        "column": [   // The names of columns in the Parquet file.
          "id",
          "intcol",
          "doublecol",
          "stringcol",
          "string1col",
          "decimalcol"
        ]
      }
  • Sample configurations that are used to write data to a destination table.

    • The following sample code provides an example on how to configure the writer to write data to a Lindorm wide table that supports SQL queries.

      {
        "namespace": "default",
        "lindormTable": "xxx",
        "compression":"zstd",
        "timestamp":"2022-07-01 10:00:00",
        "columns": [
             "id",
             "intcol",
             "doublecol",
             "stringcol",
              "string1col",
              "decimalcol"
        ]
      }
    • The following sample code provides an example on how to configure the writer to write data to a Lindorm wide table that is compatible with ApsaraDB for HBase.

      {
        "namespace": "default",
        "lindormTable": "xxx",
        "compression":"zstd",
        "timestamp":"2022-07-01 10:00:00",
        "columns": [
          "ROW||String",    //ROW indicates the row key, and String indicates the data type of the column.
          "f:intcol||Int", // Column family:Column name||Data type
          "f:doublecol||Double",
          "f:stringcol||String",
          "f:string1col||String",
          "f:decimalcol||Decimal"
        ]
      }
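
  • The reader and writer configurations are combined in a single request when you create a bulkload job. The following command is a hypothetical sketch that pairs the preceding CSV reader configuration with the writer configuration for a Lindorm wide table that supports SQL queries. The data source names hdfs and ld, the file path csv/, and the table name bulkload_sql_test are placeholder values, and the fileType value CSV is assumed based on the parameter description; replace {LTSMaster} with the actual domain name.

      curl -d "src=hdfs&dst=ld&readerConfig={\"filePath\":\"csv/\",\"header\":false,\"delimiter\":\",\",\"column\":[\"id|string\",\"intcol|int\",\"doublecol|double\",\"stringcol|string\",\"string1col|string\",\"decimalcol|decimal\"]}&writerConfig={\"namespace\":\"default\",\"lindormTable\":\"bulkload_sql_test\",\"compression\":\"zstd\",\"columns\":[\"id\",\"intcol\",\"doublecol\",\"stringcol\",\"string1col\",\"decimalcol\"]}&driverSpec=large&instances=5&fileType=CSV" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{LTSMaster}:12311/pro/proc/bulkload/create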