すべてのプロダクト
Search
ドキュメントセンター

Microservices Engine:MapReduce

最終更新日:Jan 13, 2025

MapReduceは、SchedulerXによって開発された軽量モデルであり、バッチデータ処理を分散方式で実行します。 MapJobProcessorまたはMapReduceJobProcessorインターフェースを呼び出して、接続されたワーカーを分散コンピューティングエンジンとして使用し、大量のデータをバッチで処理できます。 MapReduceには、大量のデータのバッチ処理に使用されるHadoopやSparkなどの従来の方法に比べて、低コスト、高速、シンプルなプログラミングなどの利点があります。 MapReduceは、ビッグデータプラットフォームにデータをインポートする必要がなく、追加のストレージとコンピューティングコストをかけずに、数秒でデータを処理します。

注意事項

  • タスクのサイズは 64 KB を超えることはできません。

  • ProcessResultのresultフィールドの戻り値のサイズは 1,000 バイトを超えることはできません。

  • reduceメソッドを使用する場合、すべてのタスクの結果はマスターノードにキャッシュされます。 この場合、マスターノードのメモリ負荷が高くなります。 タスク数を少なくし、resultフィールドの戻り値を小さくすることをお勧めします。 reduceメソッドが不要な場合は、MapJobProcessorインターフェースを直接呼び出すことができます。

  • フェイルオーバーがトリガーされると、SchedulerXはタスクを複数回実行する場合があります。 この場合、タスクの冪等性を実装する必要があります。

メソッド

  • MapJobProcessor 派生クラス

    メソッド

    説明

    必須

    public ProcessResult process(JobContext context) throws Exception;

    タスクを実行するためのエントリです。 コンテキストからタスク名を取得するために使用するロジックを開発する必要があります。 このメソッドが呼び出されると、システムはProcessResultを返します。

    はい

    public ProcessResult map(List<? extends Object> taskList, String taskName);

    mapメソッドは、複数のワーカーにタスクのバッチを分散するために使用されます。 mapメソッドは複数回呼び出すことができます。 taskListが空の場合、エラーが返されます。 このメソッドが呼び出されると、システムはProcessResultを返します。

    はい

    public void kill(JobContext context);

    このメソッドは、フロントエンドでジョブが終了したときにトリガーされます。 ワークロードを中断するために使用するロジックを開発する必要があります。

    いいえ

  • MapReduceJobProcessor 派生クラス

    メソッド

    説明

    必須

    public ProcessResult process(JobContext context) throws Exception;

    タスクを実行するためのエントリです。 コンテキストからタスク名を取得するために使用するロジックを開発する必要があります。 このメソッドが呼び出されると、システムはProcessResultを返します。

    はい

    public ProcessResult map(List<? extends Object> taskList, String taskName);

    mapメソッドは、複数のワーカーにタスクのバッチを分散するために使用されます。 mapメソッドは複数回呼び出すことができます。 taskListが空の場合、エラーが返されます。 このメソッドが呼び出されると、システムはProcessResultを返します。

    はい

    public ProcessResult reduce(JobContext context);

    reduceメソッドは、すべてのワーカーノードのタスクが実行された後にコールバックされます。 reduceメソッドはマスターノードによって実行され、通常はデータの集計、ダウンストリームへの通知の送信、またはワークフローでのアップストリームアプリケーションとダウンストリームアプリケーション間のデータの受け渡しに使用されます。

    reduceメソッドは、すべてのタスクの結果を処理します。

    1. システムは、return ProcessResult(true, result)を使用して、注文 ID などのタスクの実行結果を返します。

    2. システムがreduceメソッドを呼び出すと、システムはコンテキストからすべてのタスクのステータス(context.getTaskStatuses())と実行結果(context.getTaskResults())を取得し、実行結果を処理します。

    はい

    public void kill(JobContext context);

    このメソッドは、フロントエンドでジョブが終了したときにトリガーされます。 ワークロードを中断するために使用するロジックを開発する必要があります。

    いいえ

    public boolean runReduceIfFail(JobContext context)

    タスクが失敗した場合に reduce メソッドを呼び出すかどうかを指定します。 デフォルト設定:タスクが失敗した場合、reduceメソッドが実行されます。

    いいえ

手順

  1. SchedulerX コンソールにログインします。 左側のナビゲーションペインで、[タスク管理] をクリックします。

  2. [タスク管理] ページで、[タスクの作成] をクリックします。

  3. [タスクの作成] パネルで、[実行モード] ドロップダウンリストから [mapreduce] を選択し、[詳細設定] セクションで関連パラメータを設定します。 次の表にパラメータを示します。

    パラメータ

    説明

    分散ポリシー

    説明

    エージェントバージョンは 1.10.3 以降である必要があります。

    • ポーリングスキーム:システムは、各ワーカーに同じ数のタスクを均等に分散します。 このポリシーは、各ワーカーがタスクの処理にほぼ同じ時間を必要とするシナリオに適しています。 これはデフォルト値です。

    • Workerload最適戦略:マスターノードはワーカーノードの負荷を自動的に検出します。 このポリシーは、各ワーカーがタスクの処理に必要とする時間に大きな差があるシナリオに適しています。

    単一マシン並列サブタスク数

    ワーカーの実行スレッド数です。 デフォルト値:5。実行を高速化するために、より大きな値を指定できます。 ダウンストリームまたはデータベースが指定した値に耐えられない場合は、より小さい値を指定できます。

    サブタスクの失敗再試行回数

    タスクが失敗した場合、タスクは自動的に再試行されます。 デフォルト値:0。

    サブタスクの失敗再試行間隔

    タスクが失敗した場合の2回連続の再試行の間隔です。 単位:秒。 デフォルト値:0。

    サブタスクフェイルオーバー戦略

    説明

    エージェントバージョンは 1.8.12 以降である必要があります。

    ワーカーがタスクの実行に失敗して停止した後、新しいワーカーにタスクを分散するかどうかを指定します。 スイッチをオンにすると、フェイルオーバーがトリガーされたときにシステムがタスクを複数回実行する場合があります。 タスクの冪等性を実装する必要があります。

    マスターノードが実行に参加する

    説明

    エージェントバージョンは 1.8.12 以降である必要があります。

    マスターノードがタスクの実行に参加するかどうかを指定します。 タスクを実行するには、少なくとも 2 つのワーカーが使用可能である必要があります。 非常に多数のタスクが存在する場合は、スイッチをオフにすることをお勧めします。

    サブタスク分散方法

    • プッシュモデル:システムはタスクを各ワーカーに均等に分散します。

    • プルモデル:各ワーカーは自動的にタスクをプルします。 このモデルには木桶理論は適用されません。 このモデルは、タスクをプルするための動的なスケールアップをサポートしています。 プルプロセス中に、すべてのタスクはマスターノードにキャッシュされます。 これにより、メモリに大きな負荷がかかります。 同時に 10,000 を超えるタスクを分散しないことをお勧めします。

    パラメータの詳細については、「ジョブ管理の詳細パラメータ」をご参照ください。

デモ

単一テーブルのデータ処理(連続 ID)

  1. ジョブは最小 ID と最大 ID を読み取ります。

    select min(id), max(id) from Tab1;
  2. ID の範囲に基づいてページネーションが実行されます。 各タスクには、startId フィールドと endId フィールドが含まれています。

  3. 各タスクは ID 範囲でデータを取得します。

    select * from Tab1 where id >= startId and id < endId;

例:

class PageTask {
    private long startId;
    private long endId;

    public PageTask(long startId, long endId) {
        this.startId = startId;
        this.endId = endId;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName=context.getJobParameters(); // 複数のジョブのバックエンドコードは一貫性があります。SchedulerX コンソールでジョブパラメータを設定して、テーブル名を指定します。
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); // ページ数を計算します。
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO レコードを処理します
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

単一テーブルのデータ処理(不連続 ID)

  1. データベースはバケッティング戦略を採用し、バケットフィールドをインデックスとして追加します。

  2. たとえば、1,024 個のバケットが存在します。 データベースにレコードの行が追加されるたびに、注文番号または ID がハッシュされます。 たとえば、注文番号 %1024 はバケットフィールドに含まれます。

  3. 基本的に、各バケットは平均化されます。 各バケットについて、次の SQL ステートメントを実行して結果の完全クエリを実行できます。

    select * from Tab1 where bucket=xxx;

例:

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); // 複数のジョブのバックエンドコードは一貫性があります。SchedulerX コンソールでジョブパラメータを設定して、テーブル名を指定します。
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            List<Integer> tasks = Lists.newArrayList();
            for (int i = 0; i< 1024; i++) {
                tasks.add(i);
            }    
            return map(tasks, "BucketTask");
        } else if (taskName.equals("BucketTask")) {
            int bucketId = (int)task;
            List<Record> records = queryRecord(tableName, bucketId);
            //TODO レコードを処理します
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<Record> queryRecord(String tableName, int bucketId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from #{tableName} where bucket= #{bucketId}
        return records;
    }

}

データベースシャーディングとテーブルシャーディングのデータを処理する

class PageTask {
    private String tableName;
    private long startId;
    private long endId;

    public PageTask(String tableName, long startId, long endId) {
        this.tableName = tableName;
        this.startId = startId;
        this.endId = endId;
    }

    public String getTableName() {
        return tableName;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            // データベースシャーディングを実行します。
            List<String> dbList = getDbList();
            return map(dbList, "DbTask");
        } else if (taskName.equals("DbTask")) {
            // データベースシャーディング結果に基づいてテーブルシャーディングを実行します。
            String dbName = (String)task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, "TableTask");
        } else if (taskName.equals("TableTask")) {
            // テーブルシャードが大きい場合はページネーションを実行します。
            String tableName = (String)task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); // ページ数を計算します。
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(tableName, minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO レコードを処理します
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO データベースシャードのリストを返します。
        return dbList;
    }

    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        // TODO テーブルシャードのリストを返します。
        return tableList;
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

50 メッセージを処理し、reduce メソッドを呼び出して結果を返す

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum=50;
        if (isRootTask(context)) {
            System.out.println("ルートタスクを開始します"); //ルートタスクを開始します
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}

50 メッセージを処理し、reduce メソッドを呼び出してすべてのタスクの実行結果を集計する

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("ルートタスクを開始します"); //ルートタスクを開始します
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());  // taskId: + result.getKey() + , result: + result.getValue()
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }
}