This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access ClickHouse.

Prerequisites

  • DLA is activated and a Spark virtual cluster (VC) is created in the DLA console. For more information about how to activate DLA, see Activate Data Lake Analytics.
  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • ClickHouse is activated. For more information, see Activate ClickHouse.
  • vSwitch ID and security group ID that are required by the Spark compute node are obtained. You can select the ID of an existing vSwitch and the ID of an existing security group. You can also create a vSwitch and a security group and use their IDs. Make sure that the vSwitch and security group meet the following conditions:
    • The vSwitch and the ClickHouse cluster must be associated with the same virtual private cloud (VPC). If this condition is met, you can use the vSwitch ID that is displayed in the ClickHouse console.
    • The security group and ClickHouse must be associated with the same VPC. You can go to the ECS console. In the left-side navigation pane, choose Network & Security > Security Groups. On the Security Groups page, enter the VPC ID in the search box to search for the security groups associated with the VPC and select the ID of a security group.
    • Add the CIDR block of the vSwitch you selected to a ClickHouse whitelist.

Procedure

  1. Prepare the ck.csv file that contains test data and upload the file to OSS.
    name,age
    fox,18
    tiger,20
    alice,36
  2. Prepare the following code, which reads data from the ck.csv file in OSS and writes the data to a ClickHouse table that the code creates. The code then reads the data back from the ClickHouse table and returns it to the DLA console.
    • Sample dependencies in the POM file
          <dependencies>
              <dependency>
                  <groupId>ru.yandex.clickhouse</groupId>
                  <artifactId>clickhouse-jdbc</artifactId>
                  <version>0.2.4</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.11</artifactId>
                  <version>2.4.5</version>
                  <scope>provided</scope>
              </dependency>
           </dependencies>
    • Sample code snippet. For the complete code, see alibabacloud-dla-demo.
      package com.aliyun.spark
      
      import java.sql.{Connection, DriverManager}
      import java.util.Properties
      
      import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      object SparkWriteToCK {
        val ckProperties = new Properties()
        val url = "jdbc:clickhouse://<Endpoint of the VPC where ClickHouse is deployed>:8123/default"
        ckProperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
        ckProperties.put("user", "<Username>")
        ckProperties.put("password", "<Password>")
        ckProperties.put("batchsize","100000")
        ckProperties.put("socket_timeout","300000")
        ckProperties.put("numPartitions","8")
        ckProperties.put("rewriteBatchedStatements","true")
      
        // Create a ClickHouse table.
        def createCKTable(table: String): Unit = {
          Class.forName(ckProperties.getProperty("driver"))
          var conn: Connection = null
          try {
            conn = DriverManager.getConnection(url, ckProperties.getProperty("user"), ckProperties.getProperty("password"))
            val stmt = conn.createStatement()
            val sql =
              s"""
                 |create table if not exists default.${table}  on cluster default(
                 |    `name` String,
                 |    `age`  Int32)
                 |ENGINE = MergeTree() ORDER BY `name` SETTINGS index_granularity = 8192;
                 |""".stripMargin
            stmt.execute(sql)
          } finally {
            if (conn != null)
              conn.close()
          }
        }
        
        def main(args: Array[String]): Unit = {
          val table = "ck_test"
          // Create a ClickHouse table by using Java Database Connectivity (JDBC).
          createCKTable(table)
          val spark = SparkSession.builder().getOrCreate()
          // Write data from the ck.csv file into the ClickHouse table.
          val csvDF = spark.read.option("header","true").csv("oss://<path/to>/ck.csv").toDF("name", "age")
          csvDF.printSchema()
          csvDF.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, ckProperties)
          // Read data from the ClickHouse table.
          val ckDF = spark.read.jdbc(url, table, ckProperties)
          ckDF.show()
        }
      
      }
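    • Optional verification (a minimal sketch, not part of the sample project): the following statements can be appended at the end of the main method to read the table back over JDBC and confirm that the write succeeded. They reuse the spark, url, table, and ckProperties values that are already in scope.
      // Sketch only: assumes it is placed at the end of main, after the write completes.
      val verifyDF = spark.read.jdbc(url, table, ckProperties)
      // Apply a simple filter and print the matching rows and the total row count.
      verifyDF.filter("age > 18").show()
      println(s"Rows in ${table}: ${verifyDF.count()}")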
  3. Compile and package the preceding code into a JAR file and upload the JAR file to OSS. Only the spark-sql dependency is marked as provided, so make sure that the clickhouse-jdbc dependency is bundled into the JAR file, for example, by building a shaded (fat) JAR.
  4. Log on to the DLA console. In the left-side navigation pane, choose Serverless Spark > Submit job. On the Parameter Configuration page, select the VC that you created in Prerequisites from the Specify Virtual Cluster drop-down list. Click Create Job. In the Create Job dialog box, configure the parameters and click OK. Then, click Execute. The following sample shows the job configuration:
    {
        "file": "oss://<path/to/your/jar>",
        "name": "SparkWriteToCK",
        "className": "com.aliyun.spark.SparkWriteToCK",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://<Directory where your Spark UI logs are saved/>",
            "spark.dla.connectors": "oss",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.security.group.id": "<ID of the security group you selected in Prerequisites>",
            "spark.dla.eni.vswitch.id": "<ID of the vSwitch you selected in Prerequisites>"
        }
    }
    Note For more configurations, see Spark configuration guide.
  5. After the job succeeds, you can view the job logs and Spark UI in the lower part of the page.
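    The ckDF.show() call at the end of the sample code prints the rows that were written to ClickHouse. With the test data in the ck.csv file, the job logs should contain output similar to the following (the row order may differ):
    +-----+---+
    | name|age|
    +-----+---+
    |  fox| 18|
    |tiger| 20|
    |alice| 36|
    +-----+---+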