本文以一个示例为您演示开源Flink如何实时写入数据至Hologres。
前提条件
- 开通Hologres实例,并连接开发工具。本次示例使用psql客户端连接Hologres,详情请参见PSQL客户端。
- 创建Flink集群。您可以进入Flink官网下载二进制文件,启动一个Standalone集群,详细操作步骤请参见集群搭建。本次示例使用Flink1.10集群。
- 请您根据实际业务准备Flink的输入数据源。
操作步骤
- Hologres创建结果表。
Holoogres实例连接开发工具后,您需要创建一张结果表,用于接收实时写入的数据。示例语句如下。
begin;
create table order_details(user_id bigint, user_name text, item_id bigint, item_name text, price numeric(38, 2), province text, city text, ip text, longitude text, latitude text, sale_timestamp timestamptz not null, primary key(user_id, item_id));
call set_table_property ('order_details', 'segment_key', 'sale_timestamp');
commit;
- 下载并编译Flink的JAR文件。
- 下载并安装Hologres Connector依赖的JAR文件hologres-flink-connector-1.10-jar-with-dependencies.jar,示例语句如下。
mvn install:install-file -Dfile=hologres-flink-connector-1.10-jar-with-dependencies.jar -DgroupId=org.apache.flink -DartifactId=hologres-flink-connector -Dversion=1.10 -Dpackaging=jar -DgeneratePom=true
- 进入Hologres的Git官方示例库,下载并编译JAR文件,示例语句如下。
git clone https://github.com/hologres/hologres-flink-examples.git
cd hologres-flink-examples
git checkout -b example
mvn package -DskipTests
- 提交Flink作业。
编译完JAR文件后,配置作业参数,提交Flink作业,示例语句如下。
说明 示例使用命令行方式提交Flink作业,您也可以选择使用Flink Web页面提交作业。
flink run -c io.hologres.flink.example.HologresSinkExample ../hologres-flink-example/target/hologres-flink-examples-1.0.0-jar-with-dependencies.jar --endpoint $ENDPOINT --username $USERNAME --password $PASSWORD --database $DATABASE --tablename order_details
参数说明如下表所示。
参数 |
描述 |
示例 |
endpoint |
Hologres的Endpoint地址。
进入Hologres管理控制台的实例详情页,从实例配置获取Endpoint。
说明 本地Flink请使用Hologres的公共网络地址,阿里云VPC网络请使用Hologres的VPC网络地址。
|
ssseeee-cn-hangzhou.hologres.aliyuncs.com:80 |
username |
当前阿里云账号的AccessKey ID。
您可以单击AccessKey 管理,获取AccessKey ID。
|
无 |
password |
当前阿里云账号的AccessKey Secret。
您可以单击AccessKey 管理,获取AccessKey Secret。
|
无 |
database |
连接的Hologres数据库名称。 |
hologres_demo |
tablename |
Hologres接收数据的表名称。 |
order_details |
- Hologres查询数据。
成功启动任务后,您可以在Hologres中实时查询写入的数据。示例语句如下。
select count(1) from order_details;
select item_id, sum(price) as total from order_details group by item_id limit 10;