本文介绍如何创建使用MaxCompute Sink Connector,您可以通过MaxCompute Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至MaxCompute的表中。
前提条件
详细步骤,请参见创建前提。注意事项
如需使用MaxCompute分区功能,创建表时需额外创建一个分区列,列名为time,类型为STRING。
步骤一:创建目标资源
通过MaxCompute客户端创建表。更多信息,请参见创建表。
本文以名称为kafka_to_maxcompute的表为例。表中有3列数据,并使用分区功能。该表的建表语句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);
如不使用分区功能,语句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);
执行成功后,如下图所示:
在表管理页面,查看创建的表信息。
步骤二:创建MaxCompute Sink Connector并启动
- 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
- 在左侧导航栏,选择 。
- 在消息流出(Sink)页面,单击创建任务。
- 在消息流出创建面板,配置以下参数,单击确定。完成上述配置后,在消息流出(Sink)页面,找到刚创建的MaxCompute Sink Connector任务,单击其右侧操作列的启动。当状态栏由启动中变为运行中时,Connector创建成功。
步骤三:测试MaxCompute Sink Connector
- 在消息流出(Sink)页面,在MaxCompute Sink Connector任务的事件源列单击源Topic。
- 在Topic详情页面,单击体验发送消息。
- 在快速体验消息收发面板,按照下图配置消息内容,然后单击确定。
- 进入MaxCompute控制台,执行以下SQL语句查看分区信息。
show PARTITIONS kafka_to_maxcompute;
查询结果如下所示: - 根据分区信息,执行以下语句,查看分区内数据。
SELECT * FROM kafka_to_maxcompute WHERE time="2023-04-13 17";
查询结果如下所示: