全部产品
Search
文档中心

MaxCompute:Contoh Pipeline

更新时间:Jun 19, 2025

Topik ini menyajikan contoh pipeline MapReduce untuk membantu Anda memahami cara mencapai pemrosesan dan analisis data yang efisien melalui pipeline.

Prasyarat

Selesaikan konfigurasi lingkungan pengujian dengan merujuk ke Memulai.

Persiapan

  1. Siapkan paket JAR program uji. Dalam contoh ini, paket JAR bernama mapreduce-examples.jar disimpan di direktori bin\data\resources dalam jalur instalasi lokal MaxCompute.

  2. Siapkan tabel uji dan sumber daya:

    1. Buat tabel.

      CREATE TABLE wc_in (key STRING, value STRING);
      CREATE TABLE wc_out(key STRING, cnt BIGINT);
    2. Tambahkan sumber daya.

      -- Saat menambahkan paket JAR untuk pertama kali, Anda dapat mengabaikan flag -f.
      add jar data\resources\mapreduce-examples.jar -f;
  3. Gunakan Perintah Tunnel untuk mengimpor file data.txt dari direktori bin klien MaxCompute ke tabel wc_in.

    tunnel upload data.txt wc_in;

    Konten berikut dari file data.txt diimpor ke tabel wc_in:

    hello,odps

Prosedur

Jalankan pipeline WordCount pada klien MaxCompute.

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCountPipeline wc_in wc_out;

Hasil yang Diharapkan

Jika pekerjaan berhasil, hasil berikut akan dikembalikan:

+------------+------------+
| key        | cnt        |
+------------+------------+
| hello      | 1          |
| odps       | 1          |
+------------+------------+

Contoh Kode

Untuk informasi tentang dependensi Project Object Model (POM), lihat Peringatan.

Berikut adalah contoh kode Java yang menunjukkan cara melakukan statistik frekuensi kata menggunakan MapReduce dan pipeline:

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.Job;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.pipeline.Pipeline;
public class WordCountPipelineTest {
    public static class TokenizerMapper extends MapperBase {
        Record word;
        Record one;
        @Override
            public void setup(TaskContext context) throws IOException {
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.setBigint(0, 1L);
        }
        @Override
            public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
            for (int i = 0; i < record.getColumnCount(); i++) {
                String[] words = record.get(i).toString().split("\\s+");
                for (String w : words) {
                    word.setString(0, w);
                    context.write(word, one);
                }
            }
        }
    }
    public static class SumReducer extends ReducerBase {
        private Record value;
        @Override
            public void setup(TaskContext context) throws IOException {
            value = context.createOutputValueRecord();
        }
        @Override
            public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            value.set(0, count);
            context.write(key, value);
        }
    }
    public static class IdentityReducer extends ReducerBase {
        private Record result;
        @Override
            public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }
        @Override
            public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
            while (values.hasNext()) {
                result.set(0, key.get(0));
                result.set(1, values.next().get(0));
                context.write(result);
            }
        }
    }
    public static void main(String[] args) throws OdpsException {
        if (args.length != 2) {
            System.err.println("Usage: WordCountPipeline <in_table> <out_table>");
            System.exit(2);
        }
        Job job = new Job();
        /** Selama pembuatan pipeline, jika Anda tidak menentukan OutputKeySortColumns, PartitionColumns, dan OutputGroupingColumns untuk mapper, kerangka kerja menggunakan OutputKey dari mapper sebagai nilai default parameter-parameter tersebut.
         */
        Pipeline pipeline = Pipeline.builder()
            .addMapper(TokenizerMapper.class)
            .setOutputKeySchema(
            new Column[] { new Column("word", OdpsType.STRING) })
            .setOutputValueSchema(
            new Column[] { new Column("count", OdpsType.BIGINT) })
            .setOutputKeySortColumns(new String[] { "word" })
            .setPartitionColumns(new String[] { "word" })
            .setOutputGroupingColumns(new String[] { "word" })
            .addReducer(SumReducer.class)
            .setOutputKeySchema(
            new Column[] { new Column("word", OdpsType.STRING) })
            .setOutputValueSchema(
            new Column[] { new Column("count", OdpsType.BIGINT)})
            .addReducer(IdentityReducer.class).createPipeline();
        /** Tambahkan pipeline ke jobconf. Jika Anda ingin mengonfigurasi combiner, gunakan jobconf. */
        job.setPipeline(pipeline);
        /** Konfigurasikan tabel input dan output. */
        job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
        /**Kirim pekerjaan dan tunggu hingga pekerjaan selesai. */
        job.submit();
        job.waitForCompletion();
        System.exit(job.isSuccessful() == true ? 0 : 1);
    }
}