本文介绍如何使用阿里云E-MapReduce创建的Hadoop和Kafka集群,运行PyFlink作业以消费Kafka数据。
前提条件
步骤一:创建Hadoop集群和Kafka集群
创建同一个安全组下的Hadoop和Kafka集群。创建详情请参见创建集群。
说明 本文以EMR-3.29.0为例介绍。
步骤二:在Kafka集群上创建Topic
本示例将创建一个分区数为10、副本数为2、名称为payment_msg和results的Topic。
步骤三:准备测试数据
在步骤二中Kafka集群的命令行窗口,执行如下命令,不断生成测试数据。
python3 -m pip install kafka
rm -rf produce_data.py
cat>produce_data.py<<EOF
import random
import time, calendar
from random import randint
from kafka import KafkaProducer
from json import dumps
from time import sleep
def write_data():
data_cnt = 20000
order_id = calendar.timegm(time.gmtime())
max_price = 100000
topic = "payment_msg"
producer = KafkaProducer(bootstrap_servers=['emr-worker-1:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))
for i in range(data_cnt):
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
rd = random.random()
order_id += 1
pay_amount = max_price * rd
pay_platform = 0 if random.random() < 0.9 else 1
province_id = randint(0, 6)
cur_data = {"createTime": ts, "orderId": order_id, "payAmount": pay_amount, "payPlatform": pay_platform, "provinceId": province_id}
producer.send(topic, value=cur_data)
sleep(0.5)
if __name__ == '__main__':
write_data()
EOF
python3 produce_data.py