すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Aggregator メカニズム

最終更新日:Mar 27, 2026

Aggregator は MaxCompute Graph の一般的な機能であり、分散ジョブにおいてすべてのワーカーにわたるグローバル情報を集約・処理します。これを使用して、グローバル条件が満たされているかどうか(機械学習における収束など)を確認したり、複数のワーカーにまたがる統計情報を維持したりできます。

仕組み

Aggregator のロジックは、部分集約を行うすべてのワーカーと、グローバル集約を行う単一の指定されたワーカー(Aggregator オーナー)の 2 か所で実行されます。

各スーパーステップは以下のシーケンスに従います。

  1. 各ワーカーは起動時に createStartupValue を呼び出して、AggregatorValue を作成します。

  2. 各反復の開始時に、各ワーカーは createInitialValue を呼び出して、その反復用の AggregatorValue を初期化します。

  3. 反復中に、各頂点は context.aggregate() を呼び出し、これにより aggregate() がトリガーされてワーカー上で部分結果が構築されます。

  4. 各ワーカーはその部分結果を Aggregator オーナーワーカーに送信します。

  5. Aggregator オーナーワーカーは merge を繰り返し呼び出して、すべての部分結果をグローバル集約結果に結合します。

  6. Aggregator オーナーワーカーは terminate を呼び出してグローバル結果を確定し、反復を終了するかどうかを決定します。

その後、グローバル結果は次のスーパーステップの開始時にすべてのワーカーに配信されます。

Implementation mechanism

API オペレーション

Aggregator には 5 つの API オペレーションがあります。このうち 3 つはすべてのワーカーで実行され、部分集約を処理します。残りの 2 つは Aggregator オーナーワーカーでのみ実行され、グローバル集約を処理します。

API 実行環境 呼び出し元 目的
createStartupValue(context) すべてのワーカー フレームワーク(各スーパーステップ前に 1 回) 初期化 AggregatorValue
createInitialValue(context) すべてのワーカー フレームワーク(各スーパーステップ開始時に 1 回) 現在の反復用の AggregatorValue の初期化
aggregate(value, item) すべてのワーカー 明示的な呼び出し ComputeContext#aggregate(item) 部分集約
merge(value, partial) Aggregator オーナーのみ フレームワーク 部分結果をマージしてグローバル結果を生成
terminate(context, value) Aggregator オーナーのみ merge() グローバル結果を確定し、true を返して反復を終了

createStartupValue(context)

各スーパーステップ開始前に、すべてのワーカーで 1 回呼び出されます。AggregatorValue を初期化するために使用します。スーパーステップ 0 では、WorkerContext.getLastAggregatedValue() または ComputeContext.getLastAggregatedValue() を呼び出して初期化済みオブジェクトを取得します。

createInitialValue(context)

各スーパーステップの開始時に、すべてのワーカーで 1 回呼び出されます。現在の反復用の AggregatorValue を初期化するために使用します。通常、WorkerContext.getLastAggregatedValue() を呼び出して前回の反復の結果を取得し、それをもとに初期化します。

aggregate(value, item)

すべてのワーカーで呼び出されます。createStartupValuecreateInitialValue とは異なり、このメソッドは自動的に呼び出されません。頂点コードが ComputeContext#aggregate(item) を呼び出したときにトリガーされます。

  • value:このスーパーステップにおけるワーカーの現在の集約結果(createInitialValue によって初期化)

  • itemComputeContext#aggregate(item) によって渡された値

valueitem を使って更新し、部分結果を構築します。すべての aggregate 呼び出しが完了すると、フレームワークは value を Aggregator オーナーワーカーに送信します。

merge(value, partial)

Aggregator オーナーワーカーで呼び出され、すべてのワーカーからの部分結果を結合します。

  • value:実行中のグローバル集約結果

  • partial:ワーカーから受信した部分結果

partial を使用して value を更新します。たとえば、ワーカー w0、w1、w2 が部分結果 p0、p1、p2 を生成し、順に p1、p0、p2 で到着した場合:

  1. merge(p1, p0) — p1 が p0 を含むように更新されます

  2. merge(p1, p2) — p1 が p2 を含むように更新され、p1 がグローバル集約結果となります

ワーカーが 1 つしかない場合は、merge() は呼び出されません。

terminate(context, value)

Aggregator オーナーワーカーで、すべての merge() 呼び出しが完了した後に呼び出されます。value にはグローバル集約結果が含まれます。

必要に応じて value を変更し、次のように返します。

  • true — ジョブ全体の反復を終了

  • false — 次の反復に進む

terminate() が返された後、フレームワークはグローバル集約オブジェクトを次のスーパーステップのためにすべてのワーカーに配信します。収束が完了した時点で true を返すことでジョブを即座に停止します。これは機械学習のシナリオで一般的なパターンです。

k 平均法クラスタリングの例

以下の例では、k 平均法クラスタリング用の Aggregator の実装方法を示します。主なロジックは Aggregator クラスに集中しており、ワーカー間の部分集約を調整し、収束を推進します。

完全なソースコードについては、Kmeans.gz をダウンロードしてください。以下のコードはリファレンスとして抜粋したものです。

GraphLoader

KmeansReader は入力テーブルの各行を頂点としてロードします。recordNum が頂点 ID となり、行データは頂点値として DenseVector に格納されます。(DenseVectormatrix-toolkits-java に由来します。)

public static class KmeansValue implements Writable {
    DenseVector sample;
    public KmeansValue() {
    }
    public KmeansValue(DenseVector v) {
        this.sample = v;
    }
    @Override
        public void write(DataOutput out) throws IOException {
        wirteForDenseVector(out, sample);
    }
    @Override
        public void readFields(DataInput in) throws IOException {
        sample = readFieldsForDenseVector(in);
    }
}
public static class KmeansReader extends
    GraphLoader<LongWritable, KmeansValue, NullWritable, NullWritable> {
    @Override
        public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, KmeansValue, NullWritable, NullWritable> context)
        throws IOException {
        KmeansVertex v = new KmeansVertex();
        v.setId(recordNum);
        int n = record.size();
        DenseVector dv = new DenseVector(n);
        for (int i = 0; i < n; i++) {
            dv.set(i, ((DoubleWritable)record.get(i)).get());
        }
        v.setValue(new KmeansValue(dv));
        context.addVertexRequest(v);
    }
}

頂点

各頂点はサンプルを部分集約に提供します。compute ロジック全体は、単一の context.aggregate() 呼び出しで構成されます。

public static class KmeansVertex extends
    Vertex<LongWritable, KmeansValue, NullWritable, NullWritable> {
    @Override
        public void compute(
        ComputeContext<LongWritable, KmeansValue, NullWritable, NullWritable> context,
        Iterable<NullWritable> messages) throws IOException {
        context.aggregate(getValue()); // この頂点のサンプルを部分集約に提出
    }
}

Aggregator

KmeansAggrValue は、ワーカー間で集約され、各スーパーステップで再配信されるデータを保持します。

public static class KmeansAggrValue implements Writable {
    DenseMatrix centroids; // 現在のクラスターセンターの K x m マトリックス
    DenseMatrix sums;      // センター再計算用のクラスターディメンションごとの累積和
    DenseVector counts;    // 各クラスターセンターに割り当てられたサンプル数
    @Override
        public void write(DataOutput out) throws IOException {
        wirteForDenseDenseMatrix(out, centroids);
        wirteForDenseDenseMatrix(out, sums);
        wirteForDenseVector(out, counts);
    }
    @Override
        public void readFields(DataInput in) throws IOException {
        centroids = readFieldsForDenseMatrix(in);
        sums = readFieldsForDenseMatrix(in);
        counts = readFieldsForDenseVector(in);
    }
}

sums(i,j) は、センター i に最も近いすべてのサンプルのディメンション j の合計を格納します。counts と併用することで、各スーパーステップで新しいセンター位置を再計算します。

createStartupValuecenters キャッシュファイルから初期センターを読み取り、sums および counts をゼロに初期化します。

public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {
    public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException {
        KmeansAggrValue av = new KmeansAggrValue();
        byte[] centers = context.readCacheFile("centers"); // 初期クラスターセンターをロード
        String lines[] = new String(centers).split("\n");
        int rows = lines.length;
        int cols = lines[0].split(",").length; // 行数 >= 1 を前提とする
        av.centroids = new DenseMatrix(rows, cols);
        av.sums = new DenseMatrix(rows, cols);
        av.sums.zero();   // 最初のスーパーステップ前にゼロに初期化
        av.counts = new DenseVector(rows);
        av.counts.zero(); // 最初のスーパーステップ前にゼロに初期化
        for (int i = 0; i < lines.length; i++) {
            String[] ss = lines[i].split(",");
            for (int j = 0; j < ss.length; j++) {
                av.centroids.set(i, j, Double.valueOf(ss[j]));
            }
        }
        return av;
    }
}

createInitialValuesums および counts をゼロにリセットしながら、前回の反復の centroids は保持します。

@Override
public KmeansAggrValue createInitialValue(WorkerContext context)
    throws IOException {
    KmeansAggrValue av = (KmeansAggrValue)context.getLastAggregatedValue(0);
    // アキュムレーターをリセットし、前回の反復の重心を保持
    av.sums.zero();
    av.counts.zero();
    return av;
}

aggregate — 各サンプルに対して最も近い重心を見つけ、sums および counts を累積します(各ワーカーでの部分集約)。

@Override
public void aggregate(KmeansAggrValue value, Object item)
    throws IOException {
    DenseVector sample = ((KmeansValue)item).sample;
    int min = findNearestCentroid(value.centroids, sample); // 最も近いクラスターセンターを検索
    for (int i = 0; i < sample.size(); i ++) {
        value.sums.add(min, i, sample.get(i)); // サンプルディメンションを累積
    }
    value.counts.add(min, 1.0d); // このクラスターのサンプル数をインクリメント
}

merge — すべてのワーカーからの部分結果を、sums および counts を合計することで結合します(Aggregator オーナーワーカーでのグローバル集約)。

@Override
public void merge(KmeansAggrValue value, KmeansAggrValue partial)
    throws IOException {
    value.sums.add(partial.sums);     // このワーカーからの sums を累積
    value.counts.add(partial.counts); // このワーカーからの counts を累積
}

terminate — 新しいクラスターセンターを計算し、ユークリッド距離を用いてしきい値 0.05 で収束をチェックし、反復を終了するかどうかを決定します。

@Override
public boolean terminate(WorkerContext context, KmeansAggrValue value)
    throws IOException {
    // 集約された sums および counts から新しいセンターを計算
    DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids);
    // デバッグ用に古い重心と新しい重心を出力
    System.out.println("\nsuperstep: " + context.getSuperstep() +
                       "\nold centriod:\n" + value.centroids + " new centriod:\n" + newCentriods);
    boolean converged = isConverged(newCentriods, value.centroids, 0.05d); // ユークリッド距離のしきい値
    System.out.println("superstep: " + context.getSuperstep() + "/"
                       + (context.getMaxIteration() - 1) + " converged: " + converged);
    if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
        // 収束した、または最大反復回数に達した — 最終的なセンターを書き込み、停止
        for (int i = 0; i < newCentriods.numRows(); i++) {
            Writable[] centriod = new Writable[newCentriods.numColumns()];
            for (int j = 0; j < newCentriods.numColumns(); j++) {
                centriod[j] = new DoubleWritable(newCentriods.get(i, j));
            }
            context.write(centriod);
        }
        return true;  // 反復を終了
    }
    value.centroids.set(newCentriods); // 次の反復のためにセンターを更新
    return false; // 反復を継続
}

main メソッド

main メソッドは GraphJob を構築し、すべてのコンポーネントクラスを設定してジョブを送信します。デフォルトの最大反復回数は 30 で、第 3 引数で設定可能です。

public static void main(String[] args) throws IOException {
    if (args.length < 2)
        printUsage();
    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(KmeansReader.class);
    job.setRuntimePartitioning(false); // 各ワーカーが独自のデータパーティションをロードして保持
    job.setVertexClass(KmeansVertex.class);
    job.setAggregatorClass(KmeansAggregator.class); // Aggregator 実装を登録
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());
    // デフォルトの最大反復回数は 30
    job.setMaxIteration(30);
    if (args.length >= 3)
        job.setMaxIteration(Integer.parseInt(args[2]));
    long start = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
                       + (System.currentTimeMillis() - start) / 1000.0 + " seconds");
}
job.setRuntimePartitioningfalse に設定されている場合、各ワーカーによってロードされたデータは partitioner によってパーティション分割されません。各ワーカーが独自のデータをロードして保持します。