このトピックでは、Aggregatorの実装メカニズムと関連するAPI操作について説明します。 また、k-meansクラスタリングを使用してAggregatorを適用する方法についても説明します。
Aggregatorは、MaxCompute Graphジョブの一般的な機能です。 機械学習の問題を処理するのに最も適しています。 MaxCompute Graphでは、Aggregatorを使用してグローバル情報を集計および処理します。
実装メカニズム
Aggregatorのロジックは2つの部分に分かれています。
1つの部分は、分散モードですべてのワーカーに実装されます。
他の部分は、Aggregatorの所有者が単一頂点モードにあるワーカーにのみ実装されます。
初期値が作成され、これらの値の一部が各ワーカーに集計されます。 次に、すべてのワーカーの部分的な集計結果が、Aggregatorの所有者が存在するワーカーに送信されます。 次に、このワーカーは、受け取った部分集約オブジェクトをグローバル集約結果に集約し、反復を終了するかどうかを判断します。 グローバル集計結果は、反復のために次のスーパーステップですべてのワーカーに配布されます。 
Aggregatorのプロセス:
各ワーカーが起動すると、createStartupValueを実行してAggregatorValueを作成します。
各イテレーションの開始前に、各ワーカーはcreateInitialValueを実行して、イテレーションのAggregatorValueを初期化します。
イテレーションでは、各頂点が
context.aggregate()を使用してaggregate()を呼び出し、ワーカーで部分イテレーションを実装します。各ワーカーは、部分的な反復結果を、Aggregatorの所有者が存在するワーカーに送信します。
Aggregatorの所有者が存在するワーカーは、
mergeを複数回実行してグローバル集計を実装します。Aggregatorの所有者が存在するワーカーは、グローバル集計結果を処理するために
terminateを実行し、反復を終了するかどうかを決定します。
API 操作
Aggregatorは5つのAPI操作を提供します。 このセクションでは、これらのAPI操作を使用するタイミングと方法について説明します。
createStartupValue (コンテキスト)
このAPI操作は、各スーパーステップが開始される前にすべてのワーカーで1回実行されます。
AggregatorValueを初期化するために使用されます。 最初のスーパーステップ (スーパーステップ0) では、WorkerContext.getLastAggregatedValue()またはComputeContext.getLastAggregatedValue()を呼び出して、初期化されたAggregatorValueオブジェクトを取得します。createInitialValue (コンテキスト)
このAPI操作は、各スーパーステップの開始時にすべてのワーカーで1回実行されます。 現在のイテレーションの
AggregatorValueを初期化するために使用されます。 ほとんどの場合、WorkerContext.getLastAggregatedValue()が呼び出されて、前の反復の結果が取得されます。 次に、部分初期化が実施される。集計 (値、アイテム)
このAPI操作は、すべてのワーカーに対しても実行されます。 これは、
ComputeContext#aggregate(item)への明示的な呼び出しによってトリガーされますが、前の2つのAPI操作はフレームワークによって自動的に呼び出されます。 このAPI操作は、部分集約を実装するために使用されます。 最初のパラメーター値は、現在のスーパーステップでのワーカーの集計結果を示します。 初期値は、createInitialValueによって返されるオブジェクトです。 2番目のパラメーターは、コードを使用してComputeContext#aggregate(item)が呼び出されたときに渡されます。 このAPI操作では、ほとんどの場合、itemを使用して集計の値を更新します。 すべてのaggregate操作が実行された後、取得された値がワーカーの部分集計結果になります。 次に、結果は、フレームワークによって、アグリゲータの所有者が存在するワーカーに送信される。merge(value, partial)
このAPI操作は、Aggregatorの所有者が存在するワーカーで実行されます。 ワーカーの部分集計結果をマージしてグローバル集計オブジェクトを取得するために使用されます。
aggregateと同様に、このAPI操作のvalueは集計結果を示し、partialは集計するオブジェクトを示します。partialはvalueの更新に使用されます。例えば、3つのワーカーw0, w1, w2は、部分集約結果p0, p1, p2を生成する。 p1、p0、およびp2が、Aggregatorの所有者が存在するワーカーに順番に送信される場合、
マージ操作は次の順序で実行されます。まず、
merge(p1, p0)を実行し、p1とp0をp1として集約する。merge(p1, p2)を実行してp1とp2をp1として集約する。 p1は、このスーパーステップにおけるグローバル集計結果である。
したがって、ワーカーが1つしか存在しない場合、
mergeメソッドは必要ありません。 この場合、merge()は呼び出されません。terminate (コンテキスト、値)
Aggregatorの所有者が存在するワーカーが
merge()を実行した後、フレームワークはterminate(context, value)を呼び出して最終処理を実行します。 2番目のパラメーター値は、merge()を呼び出して取得したグローバル集計結果を示します。 グローバル集計結果は、このAPI操作でさらに変更できます。terminate()が実行されると、フレームワークは次のスーパーステップのためにグローバル集計オブジェクトをすべてのワーカーに配布します。terminate()にtrueが返された場合、ジョブ全体のイテレーションが終了します。 そうでない場合、反復は継続します。 収束完了後にtrueが返された場合は、直ちにジョブを終了します。 これは機械学習シナリオに適用されます。
K-meansクラスタリングの例
このセクションでは、例としてk-meansクラスタリングを使用して、Aggregatorの使用方法を示します。
完全なコードが必要な場合は、「Kmeans」をご参照ください。 このセクションでは、コードは解析されており、参照用です。
GraphLoader
GraphLoaderは、入力テーブルを読み込み、それをグラフの頂点またはエッジに変換するために使用されます。 この例では、入力テーブルのデータの各行はサンプルであり、各サンプルは頂点を構成し、頂点値はサンプルを格納するために使用されます。
書き込み可能なクラス
KmeansValueは、頂点の値型として定義されます。public static class KmeansValue implements Writable { DenseVector sample; public KmeansValue() { } public KmeansValue(DenseVector v) { this.sample = v; } @Override public void write(DataOutput out) throws IOException { wirteForDenseVector(out, sample); } @Override public void readFields(DataInput in) throws IOException { sample = readFieldsForDenseVector(in); } }DenseVectorオブジェクトは、サンプルを格納するためにKmeansValueにカプセル化されます。DenseVector型は、matrix-toolkits-javaに由来します。wirteForDenseVector()とreadFieldsForDenseVector()は、シリアル化と逆シリアル化に使用されます。カスタム
KmeansReaderコード:public static class KmeansReader extends GraphLoader<LongWritable, KmeansValue, NullWritable, NullWritable> { @Override public void load( LongWritable recordNum, WritableRecord record, MutationContext<LongWritable, KmeansValue, NullWritable, NullWritable> context) throws IOException { KmeansVertex v = new KmeansVertex(); v.setId(recordNum); int n = record.size(); DenseVector dv = new DenseVector(n); for (int i = 0; i < n; i++) { dv.set(i, ((DoubleWritable)record.get(i)).get()); } v.setValue(new KmeansValue(dv)); context.addVertexRequest(v); } }KmeansReaderでは、データの各行 (レコード) を読み取るときに頂点が作成されます。recordNumが頂点IDとして使用され、recordコンテンツがDenseVectorオブジェクトに変換され、VertexValueにカプセル化されます。Vertex
カスタム
KmeansVertexコード: 上記のコードのロジックは、反復ごとに維持されるサンプルの部分集計を実装することです。 ロジックの詳細については、次のセクションのAggregatorの実装を参照してください。public static class KmeansVertex extends Vertex<LongWritable, KmeansValue, NullWritable, NullWritable> { @Override public void compute( ComputeContext<LongWritable, KmeansValue, NullWritable, NullWritable> context, Iterable<NullWritable> messages) throws IOException { context.aggregate(getValue()); } }Aggregator
k-meansの主なロジックはAggregatorに集中しています。 カスタム
KmeansAggrValueは、集約および配布するコンテンツを維持するために使用されます。public static class KmeansAggrValue implements Writable { DenseMatrix centroids; DenseMatrix sums; // used to recalculate new centroids DenseVector counts; // used to recalculate new centroids @Override public void write(DataOutput out) throws IOException { wirteForDenseDenseMatrix(out, centroids); wirteForDenseDenseMatrix(out, sums); wirteForDenseVector(out, counts); } @Override public void readFields(DataInput in) throws IOException { centroids = readFieldsForDenseMatrix(in); sums = readFieldsForDenseMatrix(in); counts = readFieldsForDenseVector(in); } }上記のコードでは、
KmeansAggrValueに3つのオブジェクトが保持されています。centroids: 既存のKセンターを示します。 サンプルがm次元である場合、重心はK × mの行列である。sums:centroidsと同じサイズの行列を示します。 各要素は、特定の中心に最も近いサンプルの特定の寸法の合計を記録する。 例えば、sums(i,j)は、中心iに最も近いサンプルの次元jの合計を示す。countsはK次元のベクトルです。 それは、各中心に最も近いサンプルの数を記録する。countsは、集約されるメインコンテンツである新しい中心を計算するためにsumsとともに使用されます。
KmeansAggregatorは、カスタムAggregator実装クラスです。 次のセクションでは、上記のAPI操作の実装について説明します。createStartupValue()の実装public static class KmeansAggregator extends Aggregator<KmeansAggrValue> { public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException { KmeansAggrValue av = new KmeansAggrValue(); byte[] centers = context.readCacheFile("centers"); String lines[] = new String(centers).split("\n"); int rows = lines.length; int cols = lines[0].split(",").length; // assumption rows >= 1 av.centroids = new DenseMatrix(rows, cols); av.sums = new DenseMatrix(rows, cols); av.sums.zero(); av.counts = new DenseVector(rows); av.counts.zero(); for (int i = 0; i < lines.length; i++) { String[] ss = lines[i].split(","); for (int j = 0; j < ss.length; j++) { av.centroids.set(i, j, Double.valueOf(ss[j])); } } return av; } }このメソッドは、
KmeansAggrValueオブジェクトを初期化し、centersファイルから初期中心を読み取り、centroidsに値を割り当てます。sumとcountの初期値は0です。createInitialValue()の実装@Override public KmeansAggrValue createInitialValue(WorkerContext context) throws IOException { KmeansAggrValue av = (KmeansAggrValue)context.getLastAggregatedValue(0); // reset for next iteration av.sums.zero(); av.counts.zero(); return av; }このメソッドは、最初に前の反復の
KmeansAggrValueを取得し、sumおよびcountの値をクリアします。 前回の反復の重心値のみが保持されます。aggregate()の実装@Override public void aggregate(KmeansAggrValue value, Object item) throws IOException { DenseVector sample = ((KmeansValue)item).sample; // find the nearest centroid int min = findNearestCentroid(value.centroids, sample); // update sum and count for (int i = 0; i < sample.size(); i ++) { value.sums.add(min, i, sample.get(i)); } value.counts.add(min, 1.0d); }このメソッドは、
findNearestCentroid()を呼び出して、サンプルitemに最も近い中心のインデックスを見つけ、sumsを使用してすべてのディメンションを合計し、countの値を1だけ増分します。
上記の3つの方法をすべてのワーカーで実行して、部分集約を実装します。 次のメソッドを使用して、Aggregator所有者が存在するワーカーにグローバル集計を実装できます。
merge()の実装@Override public void merge(KmeansAggrValue value, KmeansAggrValue partial) throws IOException { value.sums.add(partial.sums); value.counts.add(partial.counts); }上記の例では、
mergeの実装ロジックは、各ワーカーによって集計されたsumsとcountの値を加算します。terminate()の実装@Override public boolean terminate(WorkerContext context, KmeansAggrValue value) throws IOException { // Calculate the new means to be the centroids (original sums) DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids); // print old centroids and new centroids for debugging System.out.println("\nsuperstep: " + context.getSuperstep() + "\nold centriod:\n" + value.centroids + " new centriod:\n" + newCentriods); boolean converged = isConverged(newCentriods, value.centroids, 0.05d); System.out.println("superstep: " + context.getSuperstep() + "/" + (context.getMaxIteration() - 1) + " converged: " + converged); if (converged || context.getSuperstep() == context.getMaxIteration() - 1) { // converged or reach max iteration, output centriods for (int i = 0; i < newCentriods.numRows(); i++) { Writable[] centriod = new Writable[newCentriods.numColumns()]; for (int j = 0; j < newCentriods.numColumns(); j++) { centriod[j] = new DoubleWritable(newCentriods.get(i, j)); } context.write(centriod); } // true means to terminate iteration return true; } // update centriods value.centroids.set(newCentriods); // false means to continue iteration return false; }上記の例では、
teminate()は、sumsとcountに基づいてcalculateNewCentroids()を呼び出して、平均値を計算し、新しい中心を取得します。 次に、isConverged()を呼び出して、新しい中心と古い中心の間のユークリッド距離に基づいて中心が収束しているかどうかを確認します。 収束または反復の数が上限に達すると、新しい中心が生成され、反復を終了するためにtrueが返されます。 それ以外の場合、センターは更新され、反復を継続するためにfalseが返されます。
メインメソッドmainメソッドは、GraphJobの構築、関連設定の構成、およびジョブの送信に使用されます。public static void main(String[] args) throws IOException { if (args.length < 2) printUsage(); GraphJob job = new GraphJob(); job.setGraphLoaderClass(KmeansReader.class); job.setRuntimePartitioning(false); job.setVertexClass(KmeansVertex.class); job.setAggregatorClass(KmeansAggregator.class); job.addInput(TableInfo.builder().tableName(args[0]).build()); job.addOutput(TableInfo.builder().tableName(args[1]).build()); // default max iteration is 30 job.setMaxIteration(30); if (args.length >= 3) job.setMaxIteration(Integer.parseInt(args[2])); long start = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - start) / 1000.0 + " seconds"); }説明job.setRuntimePartitioningがfalseに設定されている場合、各ワーカーによって読み込まれたデータはパーティショナーに基づいてパーティション分割されません。 データは同じワーカーによってロードされ、維持されます。