全部产品
Search
文档中心

MaxCompute:Contoh WordCount

更新时间:Jun 19, 2025

Topik ini menjelaskan cara menjalankan WordCount di MapReduce.

Prasyarat

Selesaikan konfigurasi lingkungan untuk pengujian, lihat Memulai.

Persiapan

  1. Siapkan paket JAR program uji. Dalam topik ini, paket JAR diberi nama mapreduce-examples.jar dan disimpan di direktori bin\data\resources dalam jalur instalasi lokal MaxCompute.

    1. Buat tabel uji.

      CREATE TABLE wc_in (key STRING, value STRING);
      CREATE TABLE wc_out (key STRING, cnt BIGINT);
    2. Tambahkan sumber daya uji.

      -- Saat menambahkan paket JAR untuk pertama kali, Anda dapat mengabaikan flag -f.
      add jar data\resources\mapreduce-examples.jar -f;
  2. Gunakan Tunnel untuk mengimpor file data.txt dari direktori bin klien MaxCompute ke tabel wc_in.

    tunnel upload data.txt wc_in;

    Data berikut diimpor ke tabel wc_in:

    hello,odps

Prosedur

Jalankan WordCount pada klien MaxCompute.

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out

Hasil yang Diharapkan

Pekerjaan berjalan normal. Data berikut dikembalikan di tabel wc_out:

+------------+------------+
| key        | cnt        |
+------------+------------+
| hello      | 1          |
| odps       | 1          |
+------------+------------+

Kode Contoh

Untuk informasi tentang dependensi Project Object Model (POM), lihat Peringatan.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class WordCount {
    public static class TokenizerMapper extends MapperBase {
        private Record word;
        private Record one;
        @Override
            public void setup(TaskContext context) throws IOException {
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.set(new Object[] { 1L });
            System.out.println("TaskID:" + context.getTaskID().toString());
        }
        @Override
            public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
            for (int i = 0; i < record.getColumnCount(); i++) {
                word.set(new Object[] { record.get(i).toString() });
                context.write(word, one);
            }
        }
    }
    /**
       * Kelas combiner yang menggabungkan output peta dengan menjumlahkannya.
       **/
    public static class SumCombiner extends ReducerBase {
        private Record count;
        @Override
            public void setup(TaskContext context) throws IOException {
            count = context.createMapOutputValueRecord();
        }
        /** Combiner mengimplementasikan antarmuka yang sama seperti reducer. Combiner memungkinkan Anda segera menjalankan pekerjaan reduce lokal pada mapper untuk mengurangi output mapper. */
        @Override
            public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
            long c = 0;
            while (values.hasNext()) {
                Record val = values.next();
                c += (Long) val.get(0);
            }
            count.set(0, c);
            context.write(key, count);
        }
    }
    /**
       * Kelas reducer yang hanya mengeluarkan jumlah nilai input.
       **/
    public static class SumReducer extends ReducerBase {
        private Record result = null;
        @Override
            public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }
        @Override
            public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            result.set(0, key.get(0));
            result.set(1, count);
            context.write(result);
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <in_table> <out_table>");
            System.exit(2);
        }
        JobConf job = new JobConf();
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(SumReducer.class);
        /** Konfigurasikan skema yang mendefinisikan output perantara mapper sebagai pasangan key-value. Output perantara mapper ada sebagai catatan. */
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        /** Konfigurasikan informasi tentang tabel input dan output. */
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
        JobClient.runJob(job);
    }
}