ClickHouse是面向联机分析处理(OLAP)和列式存储的开源的数据库管理系统(DBMS),本文为您介绍ClickHouse Writer的实现原理、参数说明及配置示例。
使用限制
- ClickHouse Writer仅支持使用独享数据集成资源组,不支持使用公共资源组和自定义资源组。
- ClickHouse Writer使用JDBC连接ClickHouse,且仅支持使用JDBC Statement插入数据。
- ClickHouse Writer支持筛选部分列、列换序等功能,您可以自行填写列。
- 考虑到ClickHouse负载问题,ClickHouse Writer使用INSERT模式时,建议您限流系统吞吐量(TPS)最高为1,000。
- ClickHouse Writer在所有Task写入任务后,Job Post单进程执行Flush工作,保证数据在ClickHouse整体更新。
- 您需要确认驱动和您的ClickHouse服务之间的兼容能力,数据库驱动使用如下版本。
<dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4.ali2-SNAPSHOT</version> </dependency>
背景信息
ClickHouse Writer实现了向ClickHouse写出数据的功能。在底层实现上,ClickHouse Writer通过JDBC连接远程ClickHouse数据库,并执行相应的insert into
语句,写入数据至ClickHouse。
ClickHouse Writer面向ETL开发工程师,通过ClickHouse Writer从数仓导入数据至ClickHouse。同时ClickHouse Writer可以作为数据迁移工具,为数据库管理员等用户提供服务。
ClickHouse Writer通过数据集成框架获取Reader生成的协议数据,并利用ClickHouse暴露的INSERT接口写入ClickHouse,根据您的配置生成相应的SQL插入语句。
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
jdbcUrl | 到对端数据库的JDBC连接信息,jdbcUrl包含在connection配置单元中。
|
是 | 无 |
username | 数据源的用户名。 | 是 | 无 |
password | 数据源指定用户名的密码。 | 是 | 无 |
table | 需要同步写出的表名称,使用JSON的数组进行描述。
说明 table必须包含在connection配置单元中。
|
是 | 无 |
column | 目标表需要写入数据的字段,字段之间用英文所逗号分隔。例如"column": ["id", "name", "age"] 。
说明 column配置项必须指定,不能为空。
|
是 | 无 |
preSql | 写入数据至目标表前,会先执行此处的标准语句。如果SQL中有需要操作的表名称,请使用@table 表示,以便在实际执行SQL语句时,对变量按照实际表名称进行替换。
|
否 | 无 |
postSql | 写入数据至目标表后,会执行此处的标准语句。 | 否 | 无 |
batchSize | 一次性批量提交的记录数大小,该值可以极大减少数据同步系统与ClickHouse的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程OOM异常。 | 否 | 1,024 |
向导开发介绍
暂不支持向导模式开发。
脚本开发介绍
使用脚本模式开发的详情请参见通过脚本模式配置任务。
说明 实际执行时,请删除下述代码中的注释。
向ClickHouse写出数据的JSON配置,如下所示。
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"clickhouse",//插件名。
"parameter":{
"username": "",
"password": "",
"column": [//字段。
"id",
"name"
],
"connection": [
{
"table": [//表名。
"ClickHouse_table"
],
"jdbcUrl": "jdbc:clickhouse://ip:port/database"
}
],
"preSql": [ //执行数据同步任务之前率先执行的SQL语句。
"delete from @table where db_id = -1"
],
"postSql": [//执行数据同步任务之后率先执行的SQL语句。
"update @table set db_modify_time = now() where db_id = 1"
],
"batchSize": "1024",
"batchByteSize": "67108864",
"writeMode": "insert"
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
"concurrent":1,//作业并发数。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}