This topic describes how to deploy Storm clusters and Kafka clusters on E-MapReduce and run Storm topologies to consume data in Kafka.

Prepare the environment

The test uses an E-MapReduce (EMR) cluster deployed in the China East 1 (Hangzhou) region. The EMR version is 3.8.0. The test requires the following component versions:
  • Kafka: 2.11_1.0.0
  • Storm: 1.0.1

This topic uses Alibaba Cloud E-MapReduce to create the Hadoop cluster and the Kafka cluster automatically. For more information, see Create a cluster.

  • Create a Hadoop cluster
  • Create a Kafka cluster
    • If you choose classic network as the network type, put the Hadoop cluster and the Kafka cluster in the same security group so that you do not have to configure connections between instances.
    • If you choose VPC as the network type, put the Hadoop cluster and the Kafka cluster in the same VPC and the same security group so that you do not have to configure a VPC peering connection.
    • If you are familiar with networking and security groups for ECS, you can create configurations as needed.
  • Configure the environment for Storm
    Storm topologies fail to consume Kafka data in the default environment. To avoid such failures, install the following dependencies in the Storm environment:
    These dependencies have been tested. If you need additional dependencies, perform the following operations to add them to the lib folder of Storm.
    You need to perform the preceding operations on each node in the Hadoop cluster. After the operations are complete, restart Storm in the E-MapReduce console as shown in the following figure.
    You can view operation logs to check the status of Storm:
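
The per-node copy step described above can be scripted. The following is a minimal sketch, not the documented procedure: the node list, the local `./storm-deps` directory, and the `/usr/lib/storm-current/lib` path are assumptions to adjust for your cluster (`emr-worker-2` in particular is a hypothetical host name). By default the function only prints the commands it would run.

```shell
#!/usr/bin/env bash
# Hypothetical values: adjust the node list, jar directory, and lib path.
NODES="emr-header-1 emr-worker-1 emr-worker-2"
JAR_DIR="./storm-deps"
STORM_LIB="/usr/lib/storm-current/lib"   # assumed location of Storm's lib folder

# Print (DRY_RUN=1, the default) or run one scp command per node.
distribute_jars() {
  local node
  for node in $NODES; do
    if [ "${DRY_RUN:-1}" = "1" ]; then
      # Quoted on purpose so the glob is shown, not expanded.
      echo "scp $JAR_DIR/*.jar root@$node:$STORM_LIB/"
    else
      scp "$JAR_DIR"/*.jar "root@$node:$STORM_LIB/"
    fi
  done
}

distribute_jars
```

Run with `DRY_RUN=0` only after checking the printed commands. Storm still has to be restarted in the E-MapReduce console afterwards.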

Create Storm topologies and Kafka topics

  • E-MapReduce provides sample code that you can use directly. The links are as follows:
  • Write data to topics
    1. Log on to the Kafka cluster.
    2. Create a test topic with 10 partitions and 2 replicas.
      /usr/lib/kafka-current/bin/ --partitions 10 --replication-factor 2 --zookeeper emr-header-1:/kafka-1.0.0 --topic test --create
    3. Write 100 records of data to the test topic.
      /usr/lib/kafka-current/bin/ --num-records 100 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test
    Note: The preceding commands are run on the emr-header-1 node in the Kafka cluster. You can also run them on client nodes.
  • Run a Storm topology.
    Log on to the Hadoop cluster, compile the project, and copy the examples-1.1-shaded.jar file from the target/shaded directory to the emr-header-1 node. In this example, the file is stored in the HDFS root directory. Run the following command to submit the topology:
    /usr/lib/storm-current/bin/storm jar examples-1.1-shaded.jar com.aliyun.emr.example.storm.StormKafkaSample test aaa.bbb.ccc.ddd hdfs://emr-header-1:9000 sample
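
After submitting, you may want to confirm from the command line that the topology is running. The sketch below wraps the `storm list` command. It assumes that the last argument of the submit command (`sample`) is the topology name and that the first two columns of the `storm list` output are the topology name and its status. The function name is hypothetical.

```shell
#!/usr/bin/env bash
# Path to the Storm client; override STORM_BIN if yours differs.
STORM_BIN=${STORM_BIN:-/usr/lib/storm-current/bin/storm}

# Succeeds (exit status 0) if `storm list` reports the topology as ACTIVE.
# Assumption: column 1 is the topology name and column 2 is its status.
topology_is_active() {
  "$STORM_BIN" list 2>/dev/null |
    awk -v name="$1" '$1 == name && $2 == "ACTIVE" { found = 1 } END { exit !found }'
}

if topology_is_active sample; then
  echo "topology 'sample' is ACTIVE"
else
  echo "topology 'sample' was not found or is not active"
fi
```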
  • View the running status of a topology
    • View the running status of Storm
      You can use the Web UI to view the services on a cluster in the following ways:
      In this topic, we use SSH to access the Web UI. The endpoint is http://localhost:9999/index.html. You can see the topology that we have submitted. Click the topology to view the running logs:
    • View the output files in HDFS
      • List the output files in HDFS and count the lines in one of them.
        [root@emr-header-1 ~]# hadoop fs -ls /foo/
        -rw-r--r--   3 root hadoop     615000 2018-02-11 13:37 /foo/bolt-2-0-1518327393692.txt
        -rw-r--r--   3 root hadoop     205000 2018-02-11 13:37 /foo/bolt-2-0-1518327441777.txt
        [root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l
      • Write 120 records of data to the test topic in Kafka.
        [root@emr-header-1 ~]# /usr/lib/kafka-current/bin/ --num-records 120 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test
        120 records sent, 816.326531 records/sec (0.80 MB/sec), 35.37 ms avg latency, 134.00 ms max latency, 35 ms 50th, 39 ms 95th, 41 ms 99th, 134 ms 99.9th.
      • Output the number of lines in the HDFS file again.
        [root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l
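
As a rough cross-check of the listing above, the line count of an output file can be estimated from its size. The sketch below assumes that each record is stored as `record-size` bytes followed by one newline. That layout is an assumption rather than something the sample code documents, although both listed sizes (615000 and 205000 bytes) divide evenly by 1025. The function names are hypothetical.

```shell
#!/usr/bin/env bash
# Estimate how many lines an HDFS output file holds from its size in bytes.
# Assumption: every line is record_size bytes of payload plus one newline.
lines_from_size() {
  local bytes=$1 record_size=$2
  echo $(( bytes / (record_size + 1) ))
}

# Line count to expect once additional records have been written.
# Assumption: the new records are appended to the same file.
expected_lines() {
  local before=$1 written=$2
  echo $(( before + written ))
}

lines_from_size 615000 1024   # prints 600
lines_from_size 205000 1024   # prints 200
```

If the 120 new records are appended to the same file, the two `wc -l` results should differ by 120; `expected_lines` simply adds the two numbers for comparison.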


We have deployed a Storm cluster and a Kafka cluster on E-MapReduce, run a Storm topology, and consumed Kafka data. E-MapReduce also supports the Spark Streaming and Flink components, which can run in Hadoop clusters and process Kafka data.


E-MapReduce does not provide a Storm cluster option. Therefore, this topic creates a Hadoop cluster and installs the Storm component on it. If you do not need the other components, you can disable them in the E-MapReduce console, which makes the Hadoop cluster equivalent to a Storm cluster.