Topik ini menjelaskan cara menggunakan konektor Java Database Connectivity (JDBC) untuk menulis data dari Flink ke ClickHouse. Metode yang dijelaskan berlaku untuk berbagai versi Flink.
Informasi latar belakang
Flink melakukan refaktorisasi signifikan terhadap konektor JDBC-nya pada versi 1.11.0:
-
Sebelum refaktorisasi, pada versi 1.10.1 dan sebelumnya, nama paketnya adalah flink-jdbc.
-
Setelah refaktorisasi, pada versi 1.11.0 dan seterusnya, nama paketnya adalah flink-connector-jdbc.
Tabel berikut menunjukkan bagaimana masing-masing paket mendukung penulisan data ke sink ClickHouse.
|
Nama API |
flink-jdbc |
flink-connector-jdbc |
|
DataStream |
Tidak didukung |
Dukungan |
|
Table API (Legacy) |
Dukungan |
Tidak didukung |
|
Table API (DDL) |
Tidak didukung |
Tidak didukung |
Paket flink-connector-jdbc sepenuhnya menghapus dukungan untuk Table API legacy. Anda hanya dapat memanggil Table API menggunakan Data Definition Language (DDL). Namun, metode Table DDL menyematkan driver JDBC yang didukung secara langsung dalam kodenya dan tidak mencakup dukungan untuk ClickHouse. Topik ini menggunakan contoh Flink 1.10.1 dengan flink-jdbc dan Flink 1.11.0 dengan flink-connector-jdbc untuk menunjukkan cara menulis data dari Flink ke ClickHouse.
Flink 1.10.1 + flink-jdbc
Pada Flink 1.10.1 dan versi sebelumnya, Anda harus menggunakan paket flink-jdbc dan Table API untuk menulis data ke ClickHouse. Bagian ini memberikan contoh yang menggunakan Maven dan Flink 1.10.1.
-
Buat proyek menggunakan perintah mvn archetype:generate. Selama proses pembuatan, masukkan group-id, artifact-id, dan informasi lainnya saat diminta.
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.10.1 -
Edit bagian <dependencies /> dalam file
pom.xmluntuk menambahkan dependensi yang diperlukan.<!--//Tambahkan dependensi untuk Flink Table API --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!--//Tambahkan dependensi untuk Flink JDBC dan ClickHouse JDBC Driver --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4</version> </dependency> -
Buat file program untuk menulis data.
Program contoh menggunakan
CsvTableSourceuntuk membaca file CSV dan membuat sumber tabel. Program tersebut kemudian menggunakanJDBCAppendTableSinkuntuk menulis data ke sink ClickHouse.Catatan-
Karena ClickHouse memiliki latensi tinggi untuk operasi insert tunggal, atur
BatchSizeuntuk melakukan bulk insert dan meningkatkan performa. -
Dalam implementasi JDBCAppendTableSink, jika jumlah catatan dalam batch terakhir kurang dari
BatchSize, data yang tersisa tidak dimasukkan.
package org.myorg.example import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.sources._ import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.api._ import org.apache.flink.types.Row import org.apache.flink.table.api.{ TableEnvironment, TableSchema, Types, ValidationException } import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.api.common.typeinfo.TypeInformation object StreamingJob { def main(args: Array[String]) { val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv" val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>" val CkUsername = "<YOUR-USERNAME>" val CkPassword = "<YOUR-PASSWORD>" val BatchSize = 500 // Atur ukuran batch Anda val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val csvTableSource = CsvTableSource .builder() .path(SourceCsvPath) .ignoreFirstLine() .fieldDelimiter(",") .field("name", Types.STRING) .field("age", Types.LONG) .field("sex", Types.STRING) .field("grade", Types.LONG) .field("rate", Types.FLOAT) .build() tEnv.registerTableSource("source", csvTableSource) val resultTable = tEnv.scan("source").select("name, grade, rate") val insertIntoCkSql = """ | INSERT INTO sink_table ( | name, grade, rate | ) VALUES ( | ?, ?, ? | ) """.stripMargin // Menulis data ke sink ClickHouse val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(CkJdbcUrl) .setUsername(CkUsername) .setPassword(CkPassword) .setQuery(insertIntoCkSql) .setBatchSize(BatchSize) .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT) .build() tEnv.registerTableSink( "sink", Array("name", "grade", "rate"), Array(Types.STRING, Types.LONG, Types.FLOAT), sink ) tEnv.insertInto(resultTable, "sink") env.execute("Flink Table API to ClickHouse Example") } }Parameter:
-
SourceCsvPath: Jalur file CSV sumber. -
CkJdbcUrl: Alamat kluster ClickHouse tujuan. -
CkUsername: Username untuk kluster ClickHouse tujuan. -
CkPassword: Password untuk kluster ApsaraDB for ClickHouse tujuan.
-
-
Kompilasi dan jalankan program.
$ mvn clean package $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
Flink 1.11.0 + flink-connector-jdbc
Pada Flink 1.11.0 dan versi selanjutnya, Anda harus menggunakan paket flink-connector-jdbc dan DataStream API untuk menulis data ke ClickHouse. Bagian ini memberikan contoh yang menggunakan Maven dan Flink 1.11.0.
-
Buat proyek menggunakan perintah mvn archetype:generate. Saat diminta, masukkan group-id, artifact-id, dan informasi lain yang diperlukan.
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.11.0 -
Edit bagian <dependencies /> dalam file
pom.xmluntuk menambahkan dependensi yang diperlukan.<!--//Tambahkan dependensi untuk Flink Table API --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!--//Tambahkan dependensi untuk Flink JDBC Connector dan ClickHouse JDBC Driver --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4</version> </dependency> -
Buat file program untuk menulis data.
Program contoh menggunakan
CsvTableSourceuntuk membaca file CSV dan membuat sumber tabel. Program tersebut kemudian mengubah tabel menjadi DataStream menggunakanTableEnvironment.toAppendStream. Terakhir, program menggunakanJdbcSinkuntuk menulis data ke ClickHouse.Catatan-
Karena ClickHouse memiliki latensi tinggi untuk operasi insert tunggal, atur
BatchSizeuntuk melakukan bulk insert dan meningkatkan performa. -
Pada versi saat ini dari flink-connector-jdbc, terjadi masalah serialisasi dengan fungsi lambda ketika Anda memanggil JdbcSink menggunakan Scala API. Untuk mengatasi masalah ini, Anda harus mengimplementasikan antarmuka secara manual untuk meneruskan fungsi pembuat pernyataan JDBC. Kelas
CkSinkBuildermerupakan contoh implementasi tersebut.class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] { def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = { ps.setString(1, v._1) ps.setLong(2, v._2) ps.setFloat(3, v._3) } }
package org.myorg.example import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.sources._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api._ import org.apache.flink.types.Row import org.apache.flink.table.api.{ TableEnvironment, TableSchema, Types, ValidationException } import org.apache.flink.connector.jdbc._ import java.sql.PreparedStatement // Implementasikan antarmuka secara manual untuk meneruskan fungsi pembuat pernyataan JDBC class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] { def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = { ps.setString(1, v._1) ps.setLong(2, v._2) ps.setFloat(3, v._3) } } object StreamingJob { def main(args: Array[String]) { val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv" val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>" val CkUsername = "<YOUR-USERNAME>" val CkPassword = "<YOUR-PASSWORD>" val BatchSize = 500 // Atur ukuran batch Anda val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val csvTableSource = CsvTableSource .builder() .path(SourceCsvPath) .ignoreFirstLine() .fieldDelimiter(",") .field("name", Types.STRING) .field("age", Types.LONG) .field("sex", Types.STRING) .field("grade", Types.LONG) .field("rate", Types.FLOAT) .build() tEnv.registerTableSource("source", csvTableSource) val resultTable = tEnv.scan("source").select("name, grade, rate") // Mengubah tabel menjadi DataStream val resultDataStream = tEnv.toAppendStream[(String, Long, Float)](resultTable) val insertIntoCkSql = """ | INSERT INTO sink_table ( | name, grade, rate | ) VALUES ( | ?, ?, ? | ) """.stripMargin // Menulis data ke sink JDBC ClickHouse resultDataStream.addSink( JdbcSink.sink[(String, Long, Float)]( insertIntoCkSql, new CkSinkBuilder, new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") .withUrl(CkJdbcUrl) .withUsername(CkUsername) .withPassword(CkPassword) .build() ) ) env.execute("Flink DataStream to ClickHouse Example") } }Parameter:
-
SourceCsvPath: Jalur file CSV sumber. -
CkJdbcUrl: Alamat kluster ClickHouse tujuan. -
CkUsername: Username untuk kluster ClickHouse tujuan. -
CkPassword: Password untuk kluster ClickHouse tujuan.
-
-
Kompilasi dan jalankan program.
$ mvn clean package $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar