Run the following commands to generate the lib.jar and job.py files:
rm -rf job.py
cat>job.py<<EOF
import os
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
provinces = ("beijing", "shanghai", "hangzhou", "shenzhen", "jiangxi", "chongqing", "xizang")
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def province_id_to_name(id):
    return provinces[id]


# Enter the following information based on the created Kafka cluster:
def log_processing():
    kafka_servers = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
    kafka_zookeeper_servers = "xx.xx.xx.xx:2181,xx.xx.xx.xx:2181,xx.xx.xx.xx:2181"
    source_topic = "payment_msg"
    sink_topic = "results"
    kafka_consumer_group_id = "test_3"

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

    source_ddl = f"""
        CREATE TABLE payment_msg(
            createTime VARCHAR,
            rt AS TO_TIMESTAMP(createTime),
            orderId BIGINT,
            payAmount DOUBLE,
            payPlatform INT,
            provinceId INT,
            WATERMARK FOR rt AS rt - INTERVAL '2' SECOND
        ) WITH (
            'connector.type' = 'kafka',
            'connector.version' = 'universal',
            'connector.topic' = '{source_topic}',
            'connector.properties.bootstrap.servers' = '{kafka_servers}',
            'connector.properties.zookeeper.connect' = '{kafka_zookeeper_servers}',
            'connector.properties.group.id' = '{kafka_consumer_group_id}',
            'connector.startup-mode' = 'latest-offset',
            'format.type' = 'json'
        )
        """

    es_sink_ddl = f"""
        CREATE TABLE es_sink (
            province VARCHAR,
            pay_amount DOUBLE,
            rowtime TIMESTAMP(3)
        ) WITH (
            'connector.type' = 'kafka',
            'connector.version' = 'universal',
            'connector.topic' = '{sink_topic}',
            'connector.properties.bootstrap.servers' = '{kafka_servers}',
            'connector.properties.zookeeper.connect' = '{kafka_zookeeper_servers}',
            'connector.properties.group.id' = '{kafka_consumer_group_id}',
            'connector.startup-mode' = 'latest-offset',
            'format.type' = 'json'
        )
        """

    t_env.sql_update(source_ddl)
    t_env.sql_update(es_sink_ddl)
    t_env.register_function('province_id_to_name', province_id_to_name)

    query = """
        SELECT province_id_to_name(provinceId) AS province,
               SUM(payAmount) AS pay_amount,
               TUMBLE_START(rt, INTERVAL '5' SECOND) AS rowtime
        FROM payment_msg
        GROUP BY TUMBLE(rt, INTERVAL '5' SECOND), provinceId
        """

    t_env.sql_query(query).insert_into("es_sink")
    t_env.execute("payment_demo")


if __name__ == '__main__':
    log_processing()
EOF
rm -rf lib
mkdir lib
cd lib
wget https://maven.aliyun.com/nexus/content/groups/public/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.1/flink-sql-connector-kafka_2.11-1.10.1.jar
wget https://maven.aliyun.com/nexus/content/groups/public/org/apache/flink/flink-json/1.10.1/flink-json-1.10.1-sql-jar.jar
cd ../
zip -r lib.jar lib/*
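The job reads from the payment_msg topic, so that topic needs test data before you submit the job. The following is a minimal producer sketch, assuming the kafka-python package is available (pip install kafka-python) and that messages are JSON objects whose fields match the payment_msg source table defined above; the broker addresses and field values are placeholders for illustration.

import json
import random
import time
from datetime import datetime

from kafka import KafkaProducer  # assumption: kafka-python is installed (pip install kafka-python)

# Replace with the internal IP addresses of your Kafka brokers.
producer = KafkaProducer(
    bootstrap_servers=["xx.xx.xx.xx:9092", "xx.xx.xx.xx:9092", "xx.xx.xx.xx:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    # One record per second, shaped like the payment_msg source table.
    record = {
        "createTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "orderId": int(time.time() * 1000),
        "payAmount": round(random.uniform(10, 100000), 2),
        "payPlatform": random.randint(0, 1),
        "provinceId": random.randint(0, 6),  # matches the 7 provinces mapped by the UDF
    }
    producer.send("payment_msg", record)
    time.sleep(1)

With data flowing in, each 5-second tumbling window of the job writes one row per province, with the fields province, pay_amount, and rowtime, to the results topic.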
Set the following parameters in job.py based on the actual configuration of your cluster.

Parameter: kafka_servers
Description: The comma-separated list of internal IP addresses and ports of the Kafka brokers in the Kafka cluster. The default port number is 9092. For more information about the IP addresses, see List of components in the Kafka cluster.

Parameter: kafka_zookeeper_servers
Description: The comma-separated list of internal IP addresses and ports of the ZooKeeper nodes in the Kafka cluster. The default port number is 2181. For more information about the IP addresses, see List of components in the Kafka cluster.

Parameter: source_topic
Description: The Kafka topic of the source table. In this example, the topic is payment_msg.

Parameter: sink_topic
Description: The Kafka topic of the result table. In this example, the topic is results.
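For example, with hypothetical internal IP addresses, the assignments at the top of log_processing() in job.py might look like this:

    kafka_servers = "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"
    kafka_zookeeper_servers = "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181"
    source_topic = "payment_msg"
    sink_topic = "results"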
Figure 1. List of components in the Kafka cluster
The following figure shows an example of the generated lib.jar and job.py files.
Use SSH Secure File Transfer Client to connect to the master node of the Hadoop cluster, and then download lib.jar and job.py to your on-premises machine. In this example, the on-premises machine runs a Windows operating system.
Create an OSS bucket and upload the two files to the bucket. For information about
how to create an OSS bucket, see Create buckets. For information about how to upload a file, see Upload objects.
In this example, the upload paths are oss://emr-logs2/test/lib.jar and oss://emr-logs2/test/job.py.
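If you prefer to upload the two files programmatically instead of through the console, the following sketch uses the OSS Python SDK (oss2); the endpoint, AccessKey pair, and bucket name are placeholders that you must replace with your own values.

import oss2  # assumption: the OSS Python SDK is installed (pip install oss2)

# Replace with your own AccessKey pair, OSS endpoint, and bucket name.
auth = oss2.Auth("<yourAccessKeyId>", "<yourAccessKeySecret>")
bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "emr-logs2")

# Upload lib.jar and job.py to the paths used in this example.
bucket.put_object_from_file("test/lib.jar", "lib.jar")
bucket.put_object_from_file("test/job.py", "job.py")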