Both the Flink native JDBC connector and flink-connector-clickhouse can be used to write data to a ClickHouse cluster. This topic describes how to use flink-connector-clickhouse to write Flink data to an ApsaraDB for ClickHouse cluster. flink-connector-clickhouse supports Flink SQL statements: you can use a Flink Table DDL statement to define an ApsaraDB for ClickHouse table and then execute an INSERT statement to write data to the table.

Prerequisites

Fully managed Flink method by Alibaba Cloud

You can develop jobs by using fully managed Flink provided by Alibaba Cloud. For more information, see Develop a job. For more information about how to define the ClickHouse connector, see Manage custom connectors.

Table API method

To use the Flink Table API method to write data to an ApsaraDB for ClickHouse cluster, perform the following steps:

  1. Run the mvn archetype:generate command to create a project. You must enter information such as group-id and artifact-id during this process.
    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.12.0
  2. Edit the <dependencies /> section in the pom.xml file to add dependencies.
        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>flink-connector-clickhouse</artifactId>
          <version>1.12.0</version>
        </dependency>
    Note
    • If the Maven dependencies cannot be downloaded, you can download them from flink-connector and install them manually.
    • Run the following command to install the Maven dependency:
      mvn install:install-file -DgroupId=com.aliyun -DartifactId=flink-connector-clickhouse -Dversion=1.12.0 -Dpackaging=jar -Dfile=flink-connector-clickhouse-1.12.0.jar
  3. Create a program file to write data.

    Because flink-connector-clickhouse supports Flink SQL statements, you can execute a Flink Table DDL statement to define the ApsaraDB for ClickHouse sink table.

    CREATE TABLE sink_table (
        name VARCHAR,
        grade BIGINT,
        rate FLOAT,
        more FLOAT,
        PRIMARY KEY (name, grade) NOT ENFORCED /* If a primary key is specified, the sink works in upsert mode. */
    ) WITH (
        'connector' = 'clickhouse',
        'url' = 'clickhouse://<host>:<port>',
        'username' = '<username>',
        'password' = '<password>',
        'database-name' = 'default',        /* The name of the ApsaraDB for ClickHouse database. Default value: default. */
        'table-name' = 'd_sink_table',      /* The name of the ApsaraDB for ClickHouse table. */
        'sink.batch-size' = '1000',         /* The number of rows written in each batch. */
        'sink.flush-interval' = '1000',     /* The flush interval. */
        'sink.max-retries' = '3',           /* The maximum number of retries. */
        'sink.partition-strategy' = 'hash', /* Valid values: hash, random, and balanced. */
        'sink.partition-key' = 'name',      /* The partition key for the hash policy. */
        'sink.ignore-delete' = 'true'       /* Ignore DELETE messages and process UPDATE messages as INSERT messages. */
    )

    Execute the INSERT statement to write data to the ApsaraDB for ClickHouse table.

    INSERT INTO sink_table SELECT name, grade, rate FROM source

    An example of a complete data writer file:

    package org.myorg.quickstart

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.sources.CsvTableSource

    object StreamingJob {
      def main(args: Array[String]): Unit = {
        val SourceCsvPath = "/<your-path-to-test-csv>/source.csv"

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.getConfig.disableClosureCleaner

        val tEnv = StreamTableEnvironment.create(env)

        // Register the CSV file as the source table.
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", DataTypes.STRING())
          .field("age", DataTypes.BIGINT())
          .field("sex", DataTypes.STRING())
          .field("grade", DataTypes.BIGINT())
          .field("rate", DataTypes.FLOAT())
          .build()
        tEnv.registerTableSource("source", csvTableSource)

        // Define the ApsaraDB for ClickHouse sink table.
        val create_sql =
          s"""
             | CREATE TABLE sink_table (
             |    name VARCHAR,
             |    grade BIGINT,
             |    rate FLOAT,
             |    PRIMARY KEY (name) NOT ENFORCED
             |) WITH (
             |    'connector' = 'clickhouse',
             |    'url' = 'clickhouse://<host>:<port>',
             |    'table-name' = 'd_sink_table',
             |    'sink.batch-size' = '1000',
             |    'sink.partition-strategy' = 'hash',
             |    'sink.partition-key' = 'name'
             |)
             |""".stripMargin
        tEnv.executeSql(create_sql)

        // Write the source data to the ApsaraDB for ClickHouse table.
        tEnv.executeSql(
          "INSERT INTO sink_table SELECT name, grade, rate FROM source"
        )
      }
    }
  4. Compile and run the file.
    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

Connector parameters

Parameter                Description
database-name            The name of the ApsaraDB for ClickHouse database to which data is written. Default value: default.
table-name               The name of the ApsaraDB for ClickHouse table.
sink.batch-size          The number of rows written in each batch. Determine the batch size based on your business scenario or by testing. In our tests, a batch size of about 8,000 was ideal.
sink.flush-interval      The flush interval. Default value: 1s.
sink.max-retries         The maximum number of retries.
sink.partition-strategy  The partitioning policy. Valid values:
  • balanced: the round-robin partitioning policy.
  • random: the random partitioning policy.
  • hash: the hash partitioning policy. The partition key is specified by the sink.partition-key parameter.
sink.partition-key       The partition key for the hash policy.
sink.ignore-delete       Default value: true. If this parameter is set to true, DELETE messages are ignored and UPDATE messages are processed as INSERT messages. ApsaraDB for ClickHouse does not fully support the UPDATE and DELETE operations. For example, synchronous updates are not supported. If you explicitly set this parameter to false, ALTER TABLE UPDATE/DELETE statements are executed to update data, which significantly degrades write performance.
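
For reference, the following minimal Scala sketch shows how these parameters appear in the WITH clause of a sink table definition. The host, port, username, password, and table names are placeholders, and the values are only examples that follow the guidance above: a batch size of about 8,000, and sink.ignore-delete set to false so that DELETE and UPDATE messages are applied by using ALTER TABLE UPDATE/DELETE.

    package org.myorg.quickstart

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    object TunedSinkSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = StreamTableEnvironment.create(env)

        // Example sink definition that applies the connector parameters described above.
        tEnv.executeSql(
          s"""
             |CREATE TABLE tuned_sink_table (
             |    name VARCHAR,
             |    grade BIGINT,
             |    rate FLOAT,
             |    PRIMARY KEY (name) NOT ENFORCED
             |) WITH (
             |    'connector' = 'clickhouse',
             |    'url' = 'clickhouse://<host>:<port>',
             |    'username' = '<username>',
             |    'password' = '<password>',
             |    'database-name' = 'default',
             |    'table-name' = 'd_sink_table',
             |    'sink.batch-size' = '8000',         /* About 8,000 rows per batch worked well in our tests. */
             |    'sink.flush-interval' = '1000',
             |    'sink.max-retries' = '3',
             |    'sink.partition-strategy' = 'hash',
             |    'sink.partition-key' = 'name',
             |    'sink.ignore-delete' = 'false'      /* Apply DELETE and UPDATE by using ALTER TABLE UPDATE/DELETE; this degrades performance. */
             |)
             |""".stripMargin
        )
      }
    }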

Data type mapping between Flink and ApsaraDB for ClickHouse

Flink data type              ApsaraDB for ClickHouse data type
BOOLEAN                      UInt8
TINYINT                      Int8
SMALLINT                     Int16
INTEGER                      Int32
INTERVAL_YEAR_MONTH          Int32
BIGINT                       Int64
INTERVAL_DAY_TIME            Int64
FLOAT                        Float32
DOUBLE                       Float64
CHAR                         String
VARCHAR                      String
BINARY                       FixedString
VARBINARY                    FixedString
DATE                         Date
TIME_WITHOUT_TIME_ZONE       DateTime
TIMESTAMP_WITH_TIME_ZONE     DateTime
TIMESTAMP_WITHOUT_TIME_ZONE  DateTime
DECIMAL                      Decimal
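
As an illustration, the following hedged Scala sketch defines a sink table whose Flink column types correspond to ClickHouse types according to the mapping above. The table name, column names, host, and port are placeholders; the ClickHouse types in the comments are taken from the mapping table.

    package org.myorg.quickstart

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    object TypeMappingSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = StreamTableEnvironment.create(env)

        // Hypothetical sink table whose Flink column types map to the ClickHouse types listed above.
        tEnv.executeSql(
          s"""
             |CREATE TABLE typed_sink_table (
             |    id        BIGINT,          /* ClickHouse: Int64 */
             |    flag      BOOLEAN,         /* ClickHouse: UInt8 */
             |    score     DOUBLE,          /* ClickHouse: Float64 */
             |    name      VARCHAR,         /* ClickHouse: String */
             |    birthday  DATE,            /* ClickHouse: Date */
             |    updated   TIMESTAMP(3),    /* ClickHouse: DateTime */
             |    amount    DECIMAL(18, 2)   /* ClickHouse: Decimal */
             |) WITH (
             |    'connector' = 'clickhouse',
             |    'url' = 'clickhouse://<host>:<port>',
             |    'database-name' = 'default',
             |    'table-name' = 'typed_table'
             |)
             |""".stripMargin
        )
      }
    }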

FAQ

1. How do I enable multi-threaded writing?
You can call the Flink setParallelism method to enable multi-threaded writing, as shown in the sketch after this FAQ.
2. What do I do if the dependencies of flink-connector-clickhouse are not found?
You can download the dependencies and manually install them.
3. What do I do if the "Method not found" message is reported or a data type cannot be converted?
This is usually caused by conflicting Java dependencies. Check whether multiple versions of the dependency that contains the affected class are on the classpath, for example, by running mvn dependency:tree. We recommend that you use fully managed Flink provided by Alibaba Cloud to develop and run the data writer program.
4. Does flink-connector-clickhouse support the retraction of update messages?
No, it does not.
5. What do I do if no data is written when I run flink-connector-clickhouse locally?
First, add a print sink to check whether input data is provided, as shown in the sketch after this FAQ. Then, check whether the thresholds specified by the sink.batch-size and sink.flush-interval parameters are reached.
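
The following minimal Scala sketch illustrates FAQ 1 and FAQ 5: it calls setParallelism so that the ClickHouse sink writes with multiple parallel subtasks, and it prints the source table to verify that input data is available. It assumes the same CSV source file as in the Table API example; the parallelism value of 4 and the object name DebugSketch are only examples.

    package org.myorg.quickstart

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.sources.CsvTableSource

    object DebugSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // FAQ 1: the job-level parallelism also applies to the ClickHouse sink,
        // so the sink writes with multiple parallel subtasks. 4 is only an example value.
        env.setParallelism(4)

        val tEnv = StreamTableEnvironment.create(env)

        // Register the same CSV source as in the Table API example.
        val csvTableSource = CsvTableSource
          .builder()
          .path("/<your-path-to-test-csv>/source.csv")
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", DataTypes.STRING())
          .field("age", DataTypes.BIGINT())
          .field("sex", DataTypes.STRING())
          .field("grade", DataTypes.BIGINT())
          .field("rate", DataTypes.FLOAT())
          .build()
        tEnv.registerTableSource("source", csvTableSource)

        // FAQ 5: print the source rows to confirm that input data is available before
        // checking whether the sink.batch-size and sink.flush-interval thresholds are reached.
        tEnv.sqlQuery("SELECT name, grade, rate FROM source").execute().print()
      }
    }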