すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:パーティションテーブルを入力として使用する例

最終更新日:Jan 07, 2025

このトピックでは、パーティション分割テーブルをMapReduceジョブの入力として使用する方法の例を示します。

  • 例 1:

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        ...
            LinkedHashMap<String, String> input = new LinkedHashMap<String, String>();
        input.put("pt", "123456");
        InputUtils.addTable(TableInfo.builder().tableName("input_table").partSpec(input).build(), job);
        LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
        output.put("ds", "654321");
        OutputUtils.addTable(TableInfo.builder().tableName("output_table").partSpec(output).build(), job);
        JobClient.runJob(job);
    }
  • 例 2:

    package com.aliyun.odps.mapred.open.example;
    ...
        public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <in_table> <out_table>");
            System.exit(2);
        }
        JobConf job = new JobConf();
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(SumReducer.class);
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the Resource Access Management (RAM) console.
        // In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey pair in the configuration file based on your business requirements.
        // We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks.
        Account account = new AliyunAccount(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        Odps odps = new Odps(account);
        odps.setEndpoint("odps_endpoint_url");
        odps.setDefaultProject("my_project");
        Table table = odps.tables().get(tblname);
        TableInfoBuilder builder = TableInfo.builder().tableName(tblname);
        for (Partition p : table.getPartitions()) {
            if (applicable(p)) {
                LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
                for (String key : p.getPartitionSpec().keys()) {
                    partSpec.put(key, p.getPartitionSpec().get(key));
                }
                InputUtils.addTable(builder.partSpec(partSpec).build(), job);
            }
        }
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
        JobClient.runJob(job);
    }
説明
  • 例2では、MaxCompute SDKとMapReduce SDKを組み合わせて、特定のパーティションからデータを読み取るMapReduceタスクを実装しています。

  • 上記のコードは実行用にコンパイルできません。 これは主な機能の一例にすぎません。

  • 適用可能な関数は、パーティションをMapReduceジョブの入力として使用できるかどうかを決定するカスタムコードロジックです。