This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access ApsaraDB for ClickHouse.

Prerequisites

  • DLA is activated and a Spark virtual cluster (VC) is created in the DLA console. For more information about how to activate DLA, see Activate DLA.
  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • ApsaraDB for ClickHouse is activated.
  • The vSwitch ID and security group ID that are required by the Spark compute node are obtained. You can select the ID of an existing vSwitch and the ID of an existing security group. You can also create a vSwitch and a security group and use their IDs. For more information, see Create a vSwitch and Create a security group. The vSwitch and security group that you select must meet the following conditions:
    • The vSwitch and the ApsaraDB for ClickHouse cluster must be associated with the same VPC. If this condition is met, you can use the vSwitch ID that is displayed in the ApsaraDB for ClickHouse console.
    • The security group and ApsaraDB for ClickHouse cluster must be associated with the same VPC. You can log on to the Elastic Compute Service (ECS) console. In the left-side navigation pane, choose Network & Security > Security Groups. On the Security Groups page, enter the VPC ID in the search box to search for the security groups that are associated with the VPC and select the ID of a security group.
    • The CIDR block of the vSwitch that you selected is added to the whitelist of the ApsaraDB for ClickHouse cluster.

Procedure

  1. Prepare the ck.csv file that contains test data and upload the file to OSS.
    name,age
    fox,18
    tiger,20
    alice,36
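    You can upload the file in the OSS console or by using ossutil. If you prefer to upload it programmatically, the following minimal sketch uses the OSS SDK for Java (the aliyun-sdk-oss dependency); the endpoint, AccessKey pair, bucket name, and paths are placeholders that you must replace with your own values.
      import java.io.File
      import com.aliyun.oss.OSSClientBuilder

      object UploadCkCsv {
        def main(args: Array[String]): Unit = {
          // Placeholders: replace the endpoint, AccessKey pair, bucket, and paths with your own values.
          val client = new OSSClientBuilder().build(
            "<OSS endpoint of your region>",
            "<yourAccessKeyId>",
            "<yourAccessKeySecret>")
          try {
            // Upload the local ck.csv file to oss://<your-bucket>/<path/to>/ck.csv.
            client.putObject("<your-bucket>", "<path/to>/ck.csv", new File("/local/path/to/ck.csv"))
          } finally {
            client.shutdown()
          }
        }
      }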
  2. Prepare the following code that is used to read data from the ck.csv file in OSS and write the data into the ApsaraDB for ClickHouse table that you created. Then, read data from the ApsaraDB for ClickHouse table and return the data to the DLA console.
    • Sample dependencies in the POM file.
          <dependencies>
              <dependency>
                  <groupId>ru.yandex.clickhouse</groupId>
                  <artifactId>clickhouse-jdbc</artifactId>
                  <version>0.2.4</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.11</artifactId>
                  <version>2.4.5</version>
                  <scope>provided</scope>
              </dependency>
           </dependencies>
    • Sample code snippets. For the complete code, click here.
      package com.aliyun.spark
      
      import java.sql.{Connection, DriverManager}
      import java.util.Properties
      
      import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      object SparkWriteToCK {
        val ckProperties = new Properties()
        val url = "jdbc:clickhouse://<Endpoint of the VPC where ClickHouse is deployed>:8123/default"
        ckProperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
        ckProperties.put("user", "<Username>")
        ckProperties.put("password", "<Password>")
        ckProperties.put("batchsize","100000")
        ckProperties.put("socket_timeout","300000")
        ckProperties.put("numPartitions","8")
        ckProperties.put("rewriteBatchedStatements","true")
      
        // Create an ApsaraDB for ClickHouse table.
        def createCKTable(table: String): Unit ={
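          // Load the ClickHouse JDBC driver before a connection is opened.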
          Class.forName(ckProperties.getProperty("driver"))
          var conn : Connection = null
          try {
            conn = DriverManager.getConnection(url, ckProperties.getProperty("user"), ckProperties.getProperty("password"))
            val stmt = conn.createStatement()
            val sql =
              s"""
                 |create table if not exists default.${table}  on cluster default(
                 |    `name` String,
                 |    `age`  Int32)
                 |ENGINE = MergeTree() ORDER BY `name` SETTINGS index_granularity = 8192;
                 |""".stripMargin
            stmt.execute(sql)
          } finally {
            if(conn != null)
              conn.close()
          }
        }
        
        def main(args: Array[String]): Unit = {
          val table = "ck_test"
          // Create an ApsaraDB for ClickHouse table by using Java Database Connectivity (JDBC).
          createCKTable(table)
          val spark = SparkSession.builder().getOrCreate()
          // Write data of the ck.csv file to the ApsaraDB for ClickHouse table.
          val csvDF = spark.read.option("header","true").csv("oss://<path/to>/ck.csv").toDF("name", "age")
          csvDF.printSchema()
          csvDF.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, ckProperties)
          // Read data from the ApsaraDB for ClickHouse table.
          val ckDF = spark.read.jdbc(url, table, ckProperties)
          ckDF.show()
        }
      
      }
  3. Compile and package the preceding code and upload the package to OSS.
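    Because the spark-sql dependency is marked as provided, make sure that the ClickHouse JDBC driver is bundled into the job JAR. The following snippet is a minimal sketch of a maven-shade-plugin configuration that you can add to the POM file for this purpose; the plugin version is only an example.
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>3.2.4</version>
                      <executions>
                          <execution>
                              <phase>package</phase>
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
    After the package is built, for example by running mvn clean package, upload the generated JAR file to OSS.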
  4. Log on to the DLA console. In the left-side navigation pane, choose Serverless Spark > Submit job. On the Parameter Configuration page, select the VC that you created in Prerequisites from the Specify Virtual Cluster drop-down list. Click Create Job. In the Create Job dialog box, configure the parameters and click OK. Then, enter the following job configuration and click Execute.
    {
        "file": "oss://<path/to/your/jar>",
        "name": "SparkWriteToCK",
        "className": "com.aliyun.spark.SparkWriteToCK",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://<Directory where your Spark UI logs are stored/>",
            "spark.dla.connectors": "oss",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.security.group.id": "<ID of the security group you selected in Prerequisites>",
            "spark.dla.eni.vswitch.id": "<ID of the vSwitch you selected in Prerequisites>"
        }
    }
    Note: For more information about the configuration parameters, see Configure a Spark job.
  5. After the job succeeds, you can view the job logs and Spark UI in the lower part of the page.
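    If you want to verify the result directly in ApsaraDB for ClickHouse, you can also run a query against the table, for example from clickhouse-client. The table name ck_test matches the table that is created by the sample code:
      SELECT name, age FROM default.ck_test ORDER BY name;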