
Write your first MapReduce

Last Updated: Apr 03, 2018

This documentation is intended for organizations that want to try MaxCompute on Alibaba Cloud and use SQL and MapReduce with it.

Introduction

MaxCompute is a big data processing platform developed independently by Alibaba. It is an efficient, low-cost, and highly available platform. It is mainly used for batch processing and storage of structured data, and it provides massive data warehouse solutions and big data modeling.

As data collection becomes more diversified, industrial data keeps accumulating, and data volumes have grown to a massive scale (terabytes or even petabytes) that traditional software cannot handle. A single server has limited processing capacity, so data analysts usually adopt a distributed computing model to analyze massive data. However, the distributed model places higher demands on analysts and is difficult to maintain. Analysts must understand the business requirements and must also be familiar with the underlying computing model.

Prerequisites

  • Java JDK 1.6 or later installed on the client machine
  • Familiarity with SQL queries
  • Access to the Alibaba Cloud console, with credits
  • A MacBook, Linux, or Windows PC
  • Java coding experience
  • MapReduce development experience

Activating and provisioning MaxCompute

  1. Log on to the Alibaba Cloud console and use DataWorks to access MaxCompute.


    Note:

    DataWorks uses MaxCompute as its core computing and storage engine to provide massive offline data processing, analysis, and mining capabilities. For more information, see What is MaxCompute.

  2. Log on to the DataWorks console.

    Make sure you have completed the previous step before you log on to the DataWorks console. Click DataWorks to go to the console.


  3. Create a project.

    For more information about creating a project, see Create a project.

    Note:

    Make sure you select the region as in the International portal. At the time of writing, Southeast Asia was the only available region.

Client configuration on the local machine

  1. Download the MaxCompute local client.

  2. After you download and extract the client, the folder odpscmd_public-X is created. Open the folder and check that it contains the client's subfolders, including bin and conf (this example uses a MacBook Pro).


  3. Go to the conf folder and edit the odps_config.ini file.

    The odps_config.ini file configures the endpoints that let your local environment connect to MaxCompute.


    Replace the values of project_name, access_id, and access_key with your own.

    • project_name is the name of the project that you created in the console.

    • access_id and access_key are your AccessKey ID and AccessKey secret, which you can get from the Alibaba Cloud console.

    Note:

    Make sure you also change end_point. The endpoint differs depending on the region you selected.

  4. After you configure the endpoint, open a terminal, go to the odpscmd_public-X folder of the downloaded client, and run the following commands.

    1. ./bin/odpscmd

      This starts the client and displays the SQL prompt.


    2. Run whoami at the SQL prompt to display your account details. If the command succeeds, the client is configured correctly.
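A quick pre-flight check can catch an unedited configuration before you start odpscmd. The sketch below is hypothetical: the class OdpsConfigCheck is not part of the client. It assumes odps_config.ini uses simple key=value lines, which java.util.Properties can parse. It reports which of project_name, access_id, access_key, and end_point are missing or empty.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for conf/odps_config.ini.
// Assumes simple key=value lines; Properties also skips lines starting with #.
class OdpsConfigCheck {

    // Keys this guide asks you to fill in before connecting.
    static final String[] REQUIRED = {"project_name", "access_id", "access_key", "end_point"};

    // Loads key=value pairs from the given reader.
    static Properties load(Reader in) throws IOException {
        Properties props = new Properties();
        props.load(in);
        return props;
    }

    // Returns the required keys that are absent or blank, in REQUIRED order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<String>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().length() == 0) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "conf/odps_config.ini";
        Reader in = new FileReader(path);
        try {
            List<String> missing = missingKeys(load(in));
            System.out.println(missing.isEmpty()
                    ? "All required keys are set"
                    : "Missing or empty: " + missing);
        } finally {
            in.close();
        }
    }
}
```

Run it from the odpscmd_public-X folder, or pass the path of the configuration file as the first argument.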

Executing SQL queries to verify the connection to the ODPS environment

After configuration, run the following queries at the SQL prompt.

      create table tbl1(id bigint);
      insert overwrite table tbl1 select count(*) from tbl1;
      select 'welcome to MaxCompute!' from tbl1;

All of these queries should run successfully.

Preparing for MapReduce

MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster. For more information, see Reference 1 and 2.

A MapReduce program is composed of a Map() procedure (method) that performs filtering and sorting (such as sorting students by first name into queues, one queue for each name) and a Reduce() method that performs a summary operation (such as counting the number of students in each queue, yielding name frequencies). The “MapReduce System” (also called “infrastructure” or “framework”) orchestrates the processing by marshalling the distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing for redundancy and fault tolerance.
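The students example can be traced in a few lines of plain Java. This is a single-process illustration only, not MaxCompute or Hadoop code. The class name StudentNameCount and the sample names are made up. The map step emits (first name, 1) for each student. The shuffle step groups the pairs into one sorted queue per name, which the framework does across servers in a real cluster. The reduce step sums each queue to get the name frequencies.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process illustration of map -> shuffle/sort -> reduce.
// Not MaxCompute code; the class name and sample names are made up.
class StudentNameCount {

    // Map: emit one (firstName, 1) pair per student.
    static List<Map.Entry<String, Integer>> map(List<String> students) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<Map.Entry<String, Integer>>();
        for (String fullName : students) {
            String firstName = fullName.trim().split("\\s+")[0];
            pairs.add(new AbstractMap.SimpleEntry<String, Integer>(firstName, 1));
        }
        return pairs;
    }

    // Shuffle/sort: one queue per first name; TreeMap keeps the names sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> queues = new TreeMap<String, List<Integer>>();
        for (Map.Entry<String, Integer> pair : pairs) {
            List<Integer> queue = queues.get(pair.getKey());
            if (queue == null) {
                queue = new ArrayList<Integer>();
                queues.put(pair.getKey(), queue);
            }
            queue.add(pair.getValue());
        }
        return queues;
    }

    // Reduce: sum each queue to get the frequency of each name.
    static Map<String, Integer> reduce(Map<String, List<Integer>> queues) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (Map.Entry<String, List<Integer>> queue : queues.entrySet()) {
            int total = 0;
            for (int one : queue.getValue()) {
                total += one;
            }
            counts.put(queue.getKey(), total);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> students = Arrays.asList("Alice Wong", "Bob Li", "Alice Chen");
        System.out.println(reduce(shuffle(map(students)))); // {Alice=2, Bob=1}
    }
}
```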

Upload a file to MaxCompute

  1. Create a file named kv.txt in the client folder (odpscmd_public-X) on your computer.


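  2. Add three lines to kv.txt, one record per line. Separate the key and value with a comma, which is the default column delimiter of the tunnel upload command.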


  3. Start the SQL prompt by running ./bin/odpscmd.

    Then run the following statements.

      CREATE TABLE wc_in (key STRING, value STRING);
      CREATE TABLE wc_out (key STRING, cnt BIGINT);
      tunnel u kv.txt wc_in;

    If the tunnel command displays OK, the file was uploaded successfully.


    Note:

    Tunnel commands are mainly used to upload and download data.

  4. After the file is uploaded, run select * from wc_in; to check the data.

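The wc_in table has two columns, so each line of kv.txt must split into exactly two fields. The hypothetical checker below (KvFileCheck is not part of the client) assumes the tunnel command's default comma column delimiter. It reports the lines of kv.txt that would not map onto wc_in (key, value). Values that themselves contain commas would need a different delimiter, which this sketch does not handle.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-upload check for kv.txt.
// Assumes tunnel's default comma delimiter: each line must be "key,value".
class KvFileCheck {

    // Returns the 1-based numbers of lines that do not have exactly two fields.
    static List<Integer> badLines(List<String> lines) {
        List<Integer> bad = new ArrayList<Integer>();
        for (int i = 0; i < lines.size(); i++) {
            // Limit -1 keeps trailing empty fields, so "k1," still counts as two fields
            String[] fields = lines.get(i).split(",", -1);
            if (fields.length != 2) {
                bad.add(i + 1);
            }
        }
        return bad;
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<String>();
        BufferedReader in = new BufferedReader(new FileReader(args.length > 0 ? args[0] : "kv.txt"));
        try {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        } finally {
            in.close();
        }
        List<Integer> bad = badLines(lines);
        System.out.println(bad.isEmpty()
                ? "Every line has two fields (key, value)"
                : "Lines with the wrong number of fields: " + bad);
    }
}
```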

Write and execute the MapReduce program

The following steps show how to write a MapReduce program that reads data from the wc_in table and writes the result to the wc_out table. The output is the count of each word in wc_in.
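Before building the jar, you can trace the job's data flow on your own machine. The sketch below is a local simulation, not MaxCompute code. The class LocalWordCount and the sample rows are made up. It also assumes that the mapper treats each column value of a wc_in row as one word, which TokenizerMapper in the SDK's WordCount example appears to do; check your copy of WordCount.java.

Each input split is mapped and pre-summed locally, which is the role a summing combiner plays before the shuffle. The reducer then merges the partial counts into the rows that would land in wc_out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local simulation of the word count data flow; not MaxCompute code.
// Assumption: every column value of a wc_in row counts as one word.
class LocalWordCount {

    // Map + combine for one input split: each column value adds 1 to its
    // word's local count, so only one partial count per word leaves the split.
    static Map<String, Long> mapAndCombine(List<String[]> split) {
        Map<String, Long> partial = new HashMap<String, Long>();
        for (String[] row : split) {
            for (String word : row) {
                Long count = partial.get(word);
                partial.put(word, count == null ? 1L : count + 1L);
            }
        }
        return partial;
    }

    // Reduce: merge the partial counts of all splits into (key, cnt) results.
    static Map<String, Long> reduce(List<Map<String, Long>> partials) {
        Map<String, Long> result = new TreeMap<String, Long>();
        for (Map<String, Long> partial : partials) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                Long count = result.get(e.getKey());
                result.put(e.getKey(), count == null ? e.getValue() : count + e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two made-up splits of (key, value) rows
        List<String[]> split1 = new ArrayList<String[]>();
        split1.add(new String[] {"a", "b"});
        List<String[]> split2 = new ArrayList<String[]>();
        split2.add(new String[] {"a", "c"});
        split2.add(new String[] {"b", "a"});
        List<Map<String, Long>> partials = new ArrayList<Map<String, Long>>();
        partials.add(mapAndCombine(split1));
        partials.add(mapAndCombine(split2));
        System.out.println(reduce(partials)); // {a=3, b=2, c=1}
    }
}
```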

  1. Set up the MapReduce development environment.

    For more information, see Install.

  2. Once the environment is configured, creating a new ODPS project gives you the default template programs in its examples folder.

  3. In the examples folder, open WordCount.java, replace the main function with the following code, and build a jar file named wordcount.jar.

    The code is as follows:

      public static void main(String[] args) throws Exception {
          // Uses the imports already present in WordCount.java (JobConf, JobClient,
          // RunningJob, InputUtils, OutputUtils, SchemaUtils, TableInfo)
          JobConf job = new JobConf();
          // Mapper, combiner, and reducer classes defined in WordCount.java
          job.setMapperClass(TokenizerMapper.class);
          job.setCombinerClass(SumCombiner.class);
          job.setReducerClass(SumReducer.class);
          // Schema of the intermediate (word, count) records emitted by the mapper
          job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
          job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
          // Input: the key and value columns of table wc_in
          InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[] {"key", "value"})
              .build(), job);
          // Other input forms (commented out): selected columns of another table,
          // or one partition of a partitioned table
          //InputUtils.addTable(TableInfo.builder().tableName("wc_in1").cols(new String[] {"col2", "col3"}).build(), job);
          //InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
          // Output: the reducer's (key, cnt) records go to table wc_out
          OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
          // Submit the job and wait until it finishes
          RunningJob rj = JobClient.runJob(job);
          rj.waitForCompletion();
      }
  4. Copy the jar file into the folder that contains the ODPS client.

    After you create the jar file, run the following command at the SQL prompt.

      add jar wordcount.jar;
  5. After you add the jar, run the following command at the SQL prompt.

      jar -resources wordcount.jar -classpath wordcount.jar com.nikesh.gogia.wordcount wc_in wc_out;


    When the job finishes, the command returns OK.

  6. Run select * from wc_out; to see the count of each word in wc_in.

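The main method in step 3 hardcodes wc_in and wc_out, so it ignores the two table names passed after the class name on the jar command line. A small helper, such as the hypothetical JobArgs below, could make those arguments take effect. When no arguments are given, it falls back to the tutorial's tables. It assumes that the arguments after the main class are passed to main as args, as with a standard Java entry point.

```java
// Hypothetical helper: resolves input/output table names from main's args,
// defaulting to the tutorial's wc_in and wc_out.
class JobArgs {
    final String inputTable;
    final String outputTable;

    JobArgs(String inputTable, String outputTable) {
        this.inputTable = inputTable;
        this.outputTable = outputTable;
    }

    static JobArgs parse(String[] args) {
        if (args.length == 0) {
            return new JobArgs("wc_in", "wc_out");
        }
        if (args.length == 2) {
            return new JobArgs(args[0], args[1]);
        }
        throw new IllegalArgumentException("Usage: <input_table> <output_table>");
    }

    public static void main(String[] args) {
        JobArgs tables = parse(args);
        // In WordCount.main you would pass these names instead of literals, e.g.
        // InputUtils.addTable(TableInfo.builder().tableName(tables.inputTable).build(), job);
        // OutputUtils.addTable(TableInfo.builder().tableName(tables.outputTable).build(), job);
        System.out.println("input=" + tables.inputTable + ", output=" + tables.outputTable);
    }
}
```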

Conclusion

The purpose of MaxCompute is to provide a convenient way to analyse and process big data. Users can analyse big data without dealing with the details of distributed computing. They can choose SQL, MapReduce, or R to query and analyse data.
