This topic describes how to access the Zeppelin web UI from E-MapReduce (EMR) and use different analytics engines in Zeppelin to perform big data analytics and visualization.

Prerequisites

  • An EMR cluster is created, and Zeppelin is selected from the optional services during the cluster creation. For more information, see Create a cluster.
  • Port 8080 is enabled in the security group where the EMR cluster resides. For more information, see Access the web UIs of open source components.
  • The services that you want to use in Zeppelin, such as Presto, Flink, and Impala, are added to the EMR cluster.

    For more information about how to add a service to an EMR cluster, see Add a service.

Access the Zeppelin web UI

  1. Log on to the Alibaba Cloud EMR console.
  2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
  3. Click the Cluster Management tab.
  4. On the Cluster Management page, find your cluster and click Details in the Actions column.
  5. In the left-side navigation pane, click Connect Strings.
  6. Click the Zeppelin link.
    The Zeppelin web UI appears.

Examples of using analytics engines in Zeppelin

The following examples apply only to EMR V3.33.0 and later V3.X.X versions, and EMR V4.6.0 and later V4.X.X versions.

Use Spark in Zeppelin

  1. On the Zeppelin web UI, click Create new note.
  2. In the Create new note dialog box, specify Note Name and select spark for Default Interpreter.
  3. Click create.
  4. On the Notebook page, run commands to analyze data.
    You do not need to configure an interpreter because the Spark interpreter is configured in EMR Zeppelin by default. Spark runs in yarn-cluster mode by default; you can confirm the deploy mode with the sketch after this list. The following three types of code are supported:
    • Spark Scala
      %spark indicates that Spark Scala code is used.
      %spark
      
      // Read a semicolon-delimited CSV file, using the first row as the header
      // and inferring column types.
      val df = spark.read.options(Map("inferSchema"->"true","delimiter"->";","header"->"true"))
        .csv("file:///usr/lib/spark-current/examples/src/main/resources/people.csv")
      z.show(df)                            // render the DataFrame as a Zeppelin table
      df.createOrReplaceTempView("people")  // expose the DataFrame to %spark.sql
      Information similar to that shown in the following figure is returned.
    • PySpark
      %spark.pyspark indicates that PySpark code is used.
      %spark.pyspark
      
      # Read the same CSV file with the Python DataFrame API.
      df = spark.read.csv('file:///usr/lib/spark-current/examples/src/main/resources/people.csv', header=True, sep=';')
      df.show()
      Information similar to that shown in the following figure is returned.
    • Spark SQL
      %spark.sql indicates that Spark SQL code is used.
      %spark.sql
      
      show tables;
      select * from people;
      Information similar to that shown in the following figure is returned.
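    To confirm the deploy mode mentioned at the beginning of this step, you can inspect the Spark configuration from a note. The following is a minimal sketch; spark.submit.deployMode is a standard Spark configuration key, and the values in the comments are assumptions for a default EMR setup.
    %spark
    
    // Print where the Spark driver runs. On a default EMR Zeppelin setup this
    // is expected to report the YARN master and the cluster deploy mode.
    println(sc.master)                                             // for example: yarn
    println(sc.getConf.get("spark.submit.deployMode", "unknown"))  // for example: cluster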

Use Flink in Zeppelin

  1. On the Zeppelin web UI, click Create new note.
  2. In the Create new note dialog box, specify Note Name and select flink for Default Interpreter.
  3. Click create.
  4. On the Notebook page, run commands to analyze data.
    You do not need to configure an interpreter because Flink Interpreter is configured in EMR Zeppelin by default. The following three types of code are supported:
    • Flink Scala
      %flink indicates that Flink Scala code is used.
      %flink
      
      // Word count with the batch DataSet API; benv is the built-in batch
      // execution environment that EMR Zeppelin provides.
      val data = benv.fromElements("hello world","hello flink","hello hadoop")
      data.flatMap(line => line.split("\\s"))
          .map(w => (w, 1))
          .groupBy(0)  // group by the word
          .sum(1)      // sum the counts per word
          .print()
      Information similar to that shown in the following figure is returned.
    • PyFlink
      %flink.pyflink indicates that PyFlink code is used.
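      For example, you can build a small in-memory table with the Table API and display it. The following is a minimal sketch; it assumes the built-in batch table environment variable bt_env and the z.show helper that open source Zeppelin exposes to PyFlink paragraphs.
      %flink.pyflink
      
      # Minimal sketch: create an in-memory table and render it as a Zeppelin table.
      # bt_env is the batch table environment provided by the Flink interpreter
      # (assumption: default interpreter settings).
      t = bt_env.from_elements([(1, 'hello'), (2, 'flink'), (3, 'hadoop')], ['id', 'word'])
      z.show(t)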
    • Flink SQL

      %flink.ssql indicates that Flink SQL code is used.

      Before you run Flink SQL code, you must run the following code to build simulated log data.
      %flink
      
      import org.apache.flink.streaming.api.functions.source.SourceFunction
      import org.apache.flink.table.api.TableEnvironment
      import org.apache.flink.streaming.api.TimeCharacteristic
      import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
      import org.apache.flink.runtime.state.filesystem.FsStateBackend
      
      import java.util.Collections
      import scala.collection.JavaConversions._
      
      // Use event time and checkpoint every 5 seconds to the local file system.
      senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      senv.enableCheckpointing(5000)
      senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"))
      
      // A checkpointed source that emits one (timestamp, page) pair every 100 ms.
      val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
      
        val pages = Seq("home", "search", "search", "product", "product", "product")
        var count: Long = 0
        var running : Boolean = true
        // startTime is 2020/1/1
        var startTime: Long = new java.util.Date(2020 - 1900, 0, 1).getTime
        var sleepInterval = 100
      
        override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
          val lock = ctx.getCheckpointLock
      
          while (count < 1000000 && running) {
            lock.synchronized({
              ctx.collect((startTime + count * sleepInterval, pages(count.toInt % pages.size)))
              count += 1
              Thread.sleep(sleepInterval)
            })
          }
        }
      
        override def cancel(): Unit = {
          running = false
        }
      
        // Persist and restore the emit counter so the source survives failover.
        override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] = {
          Collections.singletonList(count)
        }
      
        override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
          state.foreach(s => count = s)
        }
      
      }).assignAscendingTimestamps(_._1)
      
      // Register the stream as table "log"; rowtime is the event-time attribute.
      stenv.registerDataStream("log", data, 'time, 'url, 'rowtime.rowtime)
      Zeppelin supports all types of Flink SQL statements, including DDL and DML statements; a DDL and DML sketch follows after this list. You can also use Flink to visualize streaming data in Zeppelin. The Flink interpreter supports the following visualization modes:
      • Single mode

        The Single mode applies to the scenario where only one row of output is returned. This mode is not suitable for graphical display. The following SELECT statement is used as an example. It returns a single row of data that is continuously updated. The output of this statement is in the HTML format. You can use the template parameter to specify an output template. {i} indicates the placeholder for the i-th column.

        %flink.ssql(type=single,parallelism=1,refreshInterval=1000,template=<h1>{1}</h1> until <h2>{0}</h2>)
        
        select max(rowtime),count(1) from log
        Information similar to that shown in the following figure is returned.
      • Update mode
        The Update mode applies to the scenario where multiple rows of output are returned. The following SELECT GROUP BY statement is used as an example. The rows are updated at a regular interval. The output is in the Zeppelin table format, so you can use the built-in visualization controls of Zeppelin to visualize the streaming data.
        %flink.ssql(type=update,parallelism=1,refreshInterval=2000)
        
        select url,count(1) as pv from log group by url
        Information similar to that shown in the following figure is returned.
      • Append mode
        The Append mode applies to the scenario in which new output is continuously returned and appended to the existing data. The following window-based GROUP BY statement is used as an example. The Append mode requires that the data type of the first column be TIMESTAMP. In this example, the start_time column is of the TIMESTAMP type.
        %flink.ssql(type=append,parallelism=1,refreshInterval=2000,threshold=60000)
        
        select TUMBLE_START(rowtime,INTERVAL '5' SECOND) start_time,url,count(1) as pv from log
        group by TUMBLE(rowtime,INTERVAL '5' SECOND),url
        Information similar to that shown in the following figure is returned.
        If the chart shown in the preceding figure is not displayed, your chart settings may be incorrect. Click settings, configure the required parameters shown in the following figure, and run the code again.
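      As noted before the preceding list, %flink.ssql also accepts DDL and DML statements. The following is a minimal sketch that writes the aggregated page views to a print sink. The table name pv_sink is hypothetical, and the sketch assumes a Flink version that ships the print connector.
      %flink.ssql
      
      -- Minimal DDL and DML sketch. Assumptions: the print connector is
      -- available, and the log table from the earlier paragraph exists.
      CREATE TABLE pv_sink (
        url VARCHAR,
        pv BIGINT
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO pv_sink
      SELECT url, count(1) AS pv FROM log GROUP BY url;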

Use Presto in Zeppelin

  1. On the Zeppelin web UI, click Create new note.
  2. In the Create new note dialog box, specify Note Name and select presto for Default Interpreter.
  3. Click create.
  4. On the Notebook page, run commands to view table information.
    %presto indicates that Presto SQL code is used. You do not need to configure Presto. Zeppelin automatically connects to the Presto service of the EMR cluster.
    %presto
    
    show tables;
    select * from test_1;
    Information similar to that shown in the following figure is returned.

Use Impala in Zeppelin

  1. On the Zeppelin web UI, click Create new note.
  2. In the Create new note dialog box, specify Note Name and select impala for Default Interpreter.
  3. Click create.
  4. On the Notebook page, run commands to view table information.
    %impala indicates that Impala SQL code is used. You do not need to configure Impala. Zeppelin automatically connects to the Impala service of the EMR cluster.
    %impala
    
    drop table if exists test_1;
    create table test_1(id int, name string);
    insert into test_1 values(1,'test1');
    insert into test_1 values(2,'test2');
    select * from test_1;
    Information similar to that shown in the following figure is returned.

Use Hive in Zeppelin

  1. On the Zeppelin web UI, click Create new note.
  2. In the Create new note dialog box, specify Note Name and select hive for Default Interpreter.
  3. Click create.
  4. On the Notebook page, run commands to view table information.
    %hive indicates that Hive SQL code is used. You do not need to configure Hive. Zeppelin automatically connects to the Hive Thrift Server service of the EMR cluster.
    %hive
    
    show tables;
    select * from test_1;
    Information similar to that shown in the following figure is returned.