すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Aggregator実装メカニズム

最終更新日:Jan 07, 2025

このトピックでは、Aggregatorの実装メカニズムと関連するAPI操作について説明します。 また、k-meansクラスタリングを使用してAggregatorを適用する方法についても説明します。

Aggregatorは、MaxCompute Graphジョブの一般的な機能です。 機械学習の問題を処理するのに最も適しています。 MaxCompute Graphでは、Aggregatorを使用してグローバル情報を集計および処理します。

実装メカニズム

Aggregatorのロジックは2つの部分に分かれています。

  • 1つの部分は、分散モードですべてのワーカーに実装されます。

  • 他の部分は、Aggregatorの所有者が単一頂点モードにあるワーカーにのみ実装されます。

初期値が作成され、これらの値の一部が各ワーカーに集計されます。 次に、すべてのワーカーの部分的な集計結果が、Aggregatorの所有者が存在するワーカーに送信されます。 次に、このワーカーは、受け取った部分集約オブジェクトをグローバル集約結果に集約し、反復を終了するかどうかを判断します。 グローバル集計結果は、反復のために次のスーパーステップですべてのワーカーに配布されます。 Implementation mechanism

Aggregatorのプロセス:

  1. 各ワーカーが起動すると、createStartupValueを実行してAggregatorValueを作成します。

  2. 各イテレーションの開始前に、各ワーカーはcreateInitialValueを実行して、イテレーションのAggregatorValueを初期化します。

  3. イテレーションでは、各頂点がcontext.aggregate() を使用してaggregate() を呼び出し、ワーカーで部分イテレーションを実装します。

  4. 各ワーカーは、部分的な反復結果を、Aggregatorの所有者が存在するワーカーに送信します。

  5. Aggregatorの所有者が存在するワーカーは、mergeを複数回実行してグローバル集計を実装します。

  6. Aggregatorの所有者が存在するワーカーは、グローバル集計結果を処理するためにterminateを実行し、反復を終了するかどうかを決定します。

API 操作

Aggregatorは5つのAPI操作を提供します。 このセクションでは、これらのAPI操作を使用するタイミングと方法について説明します。

  • createStartupValue (コンテキスト)

    このAPI操作は、各スーパーステップが開始される前にすべてのワーカーで1回実行されます。 AggregatorValueを初期化するために使用されます。 最初のスーパーステップ (スーパーステップ0) では、WorkerContext.getLastAggregatedValue() またはComputeContext.getLastAggregatedValue() を呼び出して、初期化されたAggregatorValueオブジェクトを取得します。

  • createInitialValue (コンテキスト)

    このAPI操作は、各スーパーステップの開始時にすべてのワーカーで1回実行されます。 現在のイテレーションのAggregatorValueを初期化するために使用されます。 ほとんどの場合、WorkerContext.getLastAggregatedValue() が呼び出されて、前の反復の結果が取得されます。 次に、部分初期化が実施される。

  • 集計 (値、アイテム)

    このAPI操作は、すべてのワーカーに対しても実行されます。 これは、ComputeContext#aggregate(item) への明示的な呼び出しによってトリガーされますが、前の2つのAPI操作はフレームワークによって自動的に呼び出されます。 このAPI操作は、部分集約を実装するために使用されます。 最初のパラメーターは、現在のスーパーステップでのワーカーの集計結果を示します。 初期値は、createInitialValueによって返されるオブジェクトです。 2番目のパラメーターは、コードを使用してComputeContext#aggregate(item) が呼び出されたときに渡されます。 このAPI操作では、ほとんどの場合、itemを使用して集計のを更新します。 すべてのaggregate操作が実行された後、取得されたがワーカーの部分集計結果になります。 次に、結果は、フレームワークによって、アグリゲータの所有者が存在するワーカーに送信される。

  • merge(value, partial)

    このAPI操作は、Aggregatorの所有者が存在するワーカーで実行されます。 ワーカーの部分集計結果をマージしてグローバル集計オブジェクトを取得するために使用されます。 aggregateと同様に、このAPI操作のvalueは集計結果を示し、partialは集計するオブジェクトを示します。 partialvalueの更新に使用されます。

    例えば、3つのワーカーw0, w1, w2は、部分集約結果p0, p1, p2を生成する。 p1、p0、およびp2が、Aggregatorの所有者が存在するワーカーに順番に送信される場合、マージ操作は次の順序で実行されます。

    1. まず、merge(p1, p0) を実行し、p1とp0をp1として集約する。

    2. merge(p1, p2) を実行してp1とp2をp1として集約する。 p1は、このスーパーステップにおけるグローバル集計結果である。

    したがって、ワーカーが1つしか存在しない場合、mergeメソッドは必要ありません。 この場合、merge() は呼び出されません。

  • terminate (コンテキスト、値)

    Aggregatorの所有者が存在するワーカーがmerge() を実行した後、フレームワークはterminate(context, value) を呼び出して最終処理を実行します。 2番目のパラメーターは、merge() を呼び出して取得したグローバル集計結果を示します。 グローバル集計結果は、このAPI操作でさらに変更できます。 terminate() が実行されると、フレームワークは次のスーパーステップのためにグローバル集計オブジェクトをすべてのワーカーに配布します。 terminate() にtrueが返された場合、ジョブ全体のイテレーションが終了します。 そうでない場合、反復は継続します。 収束完了後にtrueが返された場合は、直ちにジョブを終了します。 これは機械学習シナリオに適用されます。

K-meansクラスタリングの例

このセクションでは、例としてk-meansクラスタリングを使用して、Aggregatorの使用方法を示します。

説明

完全なコードが必要な場合は、「Kmeans」をご参照ください。 このセクションでは、コードは解析されており、参照用です。

  • GraphLoader

    GraphLoaderは、入力テーブルを読み込み、それをグラフの頂点またはエッジに変換するために使用されます。 この例では、入力テーブルのデータの各行はサンプルであり、各サンプルは頂点を構成し、頂点値はサンプルを格納するために使用されます。

    書き込み可能なクラスKmeansValueは、頂点の値型として定義されます。

    public static class KmeansValue implements Writable {
        DenseVector sample;
        public KmeansValue() {
        }
        public KmeansValue(DenseVector v) {
            this.sample = v;
        }
        @Override
            public void write(DataOutput out) throws IOException {
            wirteForDenseVector(out, sample);
        }
        @Override
            public void readFields(DataInput in) throws IOException {
            sample = readFieldsForDenseVector(in);
        }
    }

    DenseVectorオブジェクトは、サンプルを格納するためにKmeansValueにカプセル化されます。 DenseVector型は、matrix-toolkits-javaに由来します。 wirteForDenseVector()readFieldsForDenseVector() は、シリアル化と逆シリアル化に使用されます。

    カスタムKmeansReaderコード:

    public static class KmeansReader extends
        GraphLoader<LongWritable, KmeansValue, NullWritable, NullWritable> {
        @Override
            public void load(
            LongWritable recordNum,
            WritableRecord record,
            MutationContext<LongWritable, KmeansValue, NullWritable, NullWritable> context)
            throws IOException {
            KmeansVertex v = new KmeansVertex();
            v.setId(recordNum);
            int n = record.size();
            DenseVector dv = new DenseVector(n);
            for (int i = 0; i < n; i++) {
                dv.set(i, ((DoubleWritable)record.get(i)).get());
            }
            v.setValue(new KmeansValue(dv));
            context.addVertexRequest(v);
        }
    }

    KmeansReaderでは、データの各行 (レコード) を読み取るときに頂点が作成されます。 recordNumが頂点IDとして使用され、recordコンテンツがDenseVectorオブジェクトに変換され、VertexValueにカプセル化されます。

  • Vertex

    カスタムKmeansVertexコード: 上記のコードのロジックは、反復ごとに維持されるサンプルの部分集計を実装することです。 ロジックの詳細については、次のセクションのAggregatorの実装を参照してください。

    public static class KmeansVertex extends
        Vertex<LongWritable, KmeansValue, NullWritable, NullWritable> {
        @Override
            public void compute(
            ComputeContext<LongWritable, KmeansValue, NullWritable, NullWritable> context,
            Iterable<NullWritable> messages) throws IOException {
            context.aggregate(getValue());
        }
    }
  • Aggregator

    k-meansの主なロジックはAggregatorに集中しています。 カスタムKmeansAggrValueは、集約および配布するコンテンツを維持するために使用されます。

    public static class KmeansAggrValue implements Writable {
        DenseMatrix centroids;
        DenseMatrix sums; // used to recalculate new centroids
        DenseVector counts; // used to recalculate new centroids
        @Override
            public void write(DataOutput out) throws IOException {
            wirteForDenseDenseMatrix(out, centroids);
            wirteForDenseDenseMatrix(out, sums);
            wirteForDenseVector(out, counts);
        }
        @Override
            public void readFields(DataInput in) throws IOException {
            centroids = readFieldsForDenseMatrix(in);
            sums = readFieldsForDenseMatrix(in);
            counts = readFieldsForDenseVector(in);
        }
    }

    上記のコードでは、KmeansAggrValueに3つのオブジェクトが保持されています。

    • centroids: 既存のKセンターを示します。 サンプルがm次元である場合、重心はK × mの行列である。

    • sums: centroidsと同じサイズの行列を示します。 各要素は、特定の中心に最も近いサンプルの特定の寸法の合計を記録する。 例えば、sums(i,j) は、中心iに最も近いサンプルの次元jの合計を示す。

    • countsはK次元のベクトルです。 それは、各中心に最も近いサンプルの数を記録する。 countsは、集約されるメインコンテンツである新しい中心を計算するためにsumsとともに使用されます。

    KmeansAggregatorは、カスタムAggregator実装クラスです。 次のセクションでは、上記のAPI操作の実装について説明します。

    1. createStartupValue() の実装

      public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {
          public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException {
              KmeansAggrValue av = new KmeansAggrValue();
              byte[] centers = context.readCacheFile("centers");
              String lines[] = new String(centers).split("\n");
              int rows = lines.length;
              int cols = lines[0].split(",").length; // assumption rows >= 1
              av.centroids = new DenseMatrix(rows, cols);
              av.sums = new DenseMatrix(rows, cols);
              av.sums.zero();
              av.counts = new DenseVector(rows);
              av.counts.zero();
              for (int i = 0; i < lines.length; i++) {
                  String[] ss = lines[i].split(",");
                  for (int j = 0; j < ss.length; j++) {
                      av.centroids.set(i, j, Double.valueOf(ss[j]));
                  }
              }
              return av;
          }
      }

      このメソッドは、KmeansAggrValueオブジェクトを初期化し、centersファイルから初期中心を読み取り、centroidsに値を割り当てます。 sumcountの初期値は0です。

    2. createInitialValue() の実装

      @Override
      public KmeansAggrValue createInitialValue(WorkerContext context)
          throws IOException {
          KmeansAggrValue av = (KmeansAggrValue)context.getLastAggregatedValue(0);
          // reset for next iteration
          av.sums.zero();
          av.counts.zero();
          return av;
      }

      このメソッドは、最初に前の反復のKmeansAggrValueを取得し、sumおよびcountの値をクリアします。 前回の反復の重心値のみが保持されます。

    3. aggregate() の実装

      @Override
      public void aggregate(KmeansAggrValue value, Object item)
          throws IOException {
          DenseVector sample = ((KmeansValue)item).sample;
          // find the nearest centroid
          int min = findNearestCentroid(value.centroids, sample);
          // update sum and count
          for (int i = 0; i < sample.size(); i ++) {
              value.sums.add(min, i, sample.get(i));
          }
          value.counts.add(min, 1.0d);
      }

      このメソッドは、findNearestCentroid() を呼び出して、サンプルitemに最も近い中心のインデックスを見つけ、sumsを使用してすべてのディメンションを合計し、countの値を1だけ増分します。

    上記の3つの方法をすべてのワーカーで実行して、部分集約を実装します。 次のメソッドを使用して、Aggregator所有者が存在するワーカーにグローバル集計を実装できます。

    1. merge() の実装

      @Override
      public void merge(KmeansAggrValue value, KmeansAggrValue partial)
          throws IOException {
          value.sums.add(partial.sums);
          value.counts.add(partial.counts);
      }

      上記の例では、mergeの実装ロジックは、各ワーカーによって集計されたsumscountの値を加算します。

    2. terminate() の実装

      @Override
      public boolean terminate(WorkerContext context, KmeansAggrValue value)
          throws IOException {
          // Calculate the new means to be the centroids (original sums)
          DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids);
          // print old centroids and new centroids for debugging
          System.out.println("\nsuperstep: " + context.getSuperstep() +
                             "\nold centriod:\n" + value.centroids + " new centriod:\n" + newCentriods);
          boolean converged = isConverged(newCentriods, value.centroids, 0.05d);
          System.out.println("superstep: " + context.getSuperstep() + "/"
                             + (context.getMaxIteration() - 1) + " converged: " + converged);
          if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
              // converged or reach max iteration, output centriods
              for (int i = 0; i < newCentriods.numRows(); i++) {
                  Writable[] centriod = new Writable[newCentriods.numColumns()];
                  for (int j = 0; j < newCentriods.numColumns(); j++) {
                      centriod[j] = new DoubleWritable(newCentriods.get(i, j));
                  }
                  context.write(centriod);
              }
              // true means to terminate iteration
              return true;
          }
          // update centriods
          value.centroids.set(newCentriods);
          // false means to continue iteration
          return false;
      }

      上記の例では、teminate() は、sumscountに基づいてcalculateNewCentroids() を呼び出して、平均値を計算し、新しい中心を取得します。 次に、isConverged() を呼び出して、新しい中心と古い中心の間のユークリッド距離に基づいて中心が収束しているかどうかを確認します。 収束または反復の数が上限に達すると、新しい中心が生成され、反復を終了するためにtrueが返されます。 それ以外の場合、センターは更新され、反復を継続するためにfalseが返されます。

  • メインメソッド

    mainメソッドは、GraphJobの構築、関連設定の構成、およびジョブの送信に使用されます。

    public static void main(String[] args) throws IOException {
        if (args.length < 2)
            printUsage();
        GraphJob job = new GraphJob();
        job.setGraphLoaderClass(KmeansReader.class);
        job.setRuntimePartitioning(false);
        job.setVertexClass(KmeansVertex.class);
        job.setAggregatorClass(KmeansAggregator.class);
        job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
        // default max iteration is 30
        job.setMaxIteration(30);
        if (args.length >= 3)
            job.setMaxIteration(Integer.parseInt(args[2]));
        long start = System.currentTimeMillis();
        job.run();
        System.out.println("Job Finished in "
                           + (System.currentTimeMillis() - start) / 1000.0 + " seconds");
    }
    説明

    job.setRuntimePartitioningがfalseに設定されている場合、各ワーカーによって読み込まれたデータはパーティショナーに基づいてパーティション分割されません。 データは同じワーカーによってロードされ、維持されます。