オープンソースの XXL-JOB に基づく自己管理型のジョブスケジューリングシステムを使用するアプリケーションは、複雑なジョブ構成、低い実行効率、困難なモニタリングと管理などの課題に直面する可能性があります。Alibaba Cloud は、ジョブスケジューリングやジョブシャーディングなどの機能をサポートするオープンソースソリューションを提供しています。このソリューションは、自己管理型のスケジュールされたジョブを分散タスクスケジューリングプラットフォームに迅速に接続するのに役立ちます。
前提条件
RAM ユーザーは XXL-JOB に必要な権限を持っています。詳細については、「XXL-JOB の権限付与」をご参照ください。
XXL-JOB インスタンスが作成されています。詳細については、「インスタンスの作成」をご参照ください。
ソリューションの概要
このガイドでは、Java を使用して XXL-JOB のスケジュールされたジョブをゼロからビルドする方法を説明します。Docker イメージをビルドし、Alibaba Cloud イメージリポジトリにアップロードし、XXL-JOB 用の分散タスクスケジューリングプラットフォームに接続します。その後、スタンドアロンモードとシャーディングブロードキャストモードでスケジュールされたジョブをテストします。このプロセスは、XXL-JOB の使用方法と構成方法を理解するのに役立ちます。このプロセスには、次のステップが含まれます:
アプリケーションの作成: スケジュールされたジョブを一元管理するためのアプリケーションを作成します。これにより、ジョブの表示、構成、スケジューリングが容易になり、管理効率が向上します。
アプリケーションの開発とデプロイ: スケジュールされたジョブのコードを記述し、Docker イメージをビルドし、そのイメージを Alibaba Cloud イメージリポジトリにアップロードします。このプロセスにより、アプリケーションがコンテナー化され、管理とデプロイが容易になります。
テストと検証: アプリケーションをテストして、XXL-JOB プラットフォームで自動的にスケジュールおよび管理できることを確認します。ジョブが正しく、スケジュールどおりに実行されることを検証します。
手順 1: アプリケーションを作成する
MSE XXL-JOB コンソールにログインし、上部のナビゲーションバーでリージョンを選択します。
ターゲットインスタンスをクリックして詳細ページを開きます。左側のナビゲーションウィンドウで、 を選択します。[アプリケーションの作成] をクリックします。[AppName] と [名前] を入力します。システムが生成した [AccessToken] を使用します。[OK] をクリックします。
手順 2: アプリケーションを開発してデプロイする
1. XXL-JOB ジョブを開発する
分散タスクスケジューリングプラットフォームの XXL-JOB エディションは、Java、Go、および Python アプリケーションをサポートしています。詳細については、オープンソースの XXL-JOB の次のデモプロジェクトをご参照ください:
Java バージョン: xxl-job-executor-sample-springboot。
Go バージョン: xxl-job-executor-go。
Python バージョン: xxl-job-executor-python。
Java
環境の構成: pom.xml ファイルに、
xxl-job-coreの Maven 依存関係を追加します。特定のバージョンについては、「xxl-job-executor-sample-springboot」をご参照ください。<!-- xxl-job-core --> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.2.x</version> </dependency>エグゼキューターを初期化します。
@Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }ジョブ実行コードを記述します。この例ではバージョン 2.2.x を使用します。
説明XXL-JOB API はバージョンによって異なります。詳細については、オープンソースのデモプロジェクトをご参照ください。
@Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class); @XxlJob("helloworld") public ReturnT<String> helloworld(String param) throws Exception { XxlJobLogger.log("XXL-JOB, Hello World, start..."); // XXL-JOB、Hello World、開始... for (int i = 0; i < 5; i++) { XxlJobLogger.log("beat at:" + i); // beat at: + i TimeUnit.SECONDS.sleep(2); } System.out.println("XXL-JOB, Hello World, finished"); // XXL-JOB、Hello World、完了 return ReturnT.SUCCESS; } }
Golang
環境の構成: 次のコマンドを実行して、最新のタグを使用して Go バージョンの XXL-JOB ソフトウェア開発キット (SDK) をプルします。特定のバージョンについては、「xxl-job-executor-go」をご参照ください。
go get github.com/xxl-job/xxl-job-executor-go@{latest_tag}エグゼキュータ初期化コードを記述します。
package main import ( "context" "fmt" xxl "github.com/xxl-job/xxl-job-executor-go" "github.com/xxl-job/xxl-job-executor-go/example/task" "log" ) func main() { exec := xxl.NewExecutor( xxl.ServerAddr("xxxxxx"), // リクエストアドレス。コンソールの接続構成から取得します。 xxl.AccessToken("xxxxxxx"), // リクエストトークン。コンソールの接続構成から取得します。 xxl.ExecutorPort("9999"), // デフォルト: 9999。このパラメーターはオプションです。 xxl.RegistryKey("golang-jobs"), // エグゼキュータの名前。 xxl.SetLogger(&logger{}), // カスタムロガー。 ) exec.Init() exec.Use(customMiddleware) // ログ表示ハンドラを設定します。 exec.LogHandler(customLogHandle) // ジョブハンドラを登録します。 exec.RegTask("task.test", task.Test) exec.RegTask("task.shardingTest", task.ShardingTest) log.Fatal(exec.Run()) } // カスタムログハンドラ。 func customLogHandle(req *xxl.LogReq) *xxl.LogRes { return &xxl.LogRes{Code: xxl.SuccessCode, Msg: "", Content: xxl.LogResContent{ FromLineNum: req.FromLineNum, ToLineNum: 2, LogContent: "This is a custom log handler", IsEnd: true, }} } // xxl.Logger インターフェイスの実装。 type logger struct{} func (l *logger) Info(format string, a ...interface{}) { fmt.Println(fmt.Sprintf("Custom log - "+format, a...)) } func (l *logger) Error(format string, a ...interface{}) { log.Println(fmt.Sprintf("Custom log - "+format, a...)) } // カスタムミドルウェア。 func customMiddleware(tf xxl.TaskFunc) xxl.TaskFunc { return func(cxt context.Context, param *xxl.RunReq) string { log.Println("I am a middleware start") res := tf(cxt, param) log.Println("I am a middleware end") return res } }ジョブ実行コードを記述します。
package task import ( "context" "fmt" xxl "github.com/xxl-job/xxl-job-executor-go" ) func Test(cxt context.Context, param *xxl.RunReq) (msg string) { fmt.Println("test one task" + param.ExecutorHandler + " param: " + param.ExecutorParams + " log_id:" + xxl.Int64ToStr(param.LogID)) return "test done" } func ShardingTest(cxt context.Context, param *xxl.RunReq) (msg string) { fmt.Println("shardingId:" + xxl.Int64ToStr(param.BroadcastIndex) + ", shardingTotal:" + xxl.Int64ToStr(param.BroadcastTotal)) return "ShardingTest done" }
Python
依存関係をプルします。特定のバージョンについては、「xxl-job-executor-python」をご参照ください。
pip install pyxxl # ログを Redis に書き込む必要がある場合 pip install "pyxxl[redis]" # .env から構成をロードする場合 pip install "pyxxl[dotenv]" # すべての機能をインストール pip install "pyxxl[all]"ジョブ実行コードを記述します。
import asyncio import time from pyxxl import ExecutorConfig, PyxxlRunner from pyxxl.ctx import g config = ExecutorConfig( xxl_admin_baseurl="http://xxljob-1b3fd81****.schedulerx.mse.aliyuncs.com/api/", executor_app_name="xueren-test", access_token="default_token", # xxl-admin がエグゼキュータの IP に直接接続できる場合、executor_listen_host を入力する必要はありません ) app = PyxxlRunner(config) @app.register(name="demoJobHandler") async def test_task(): # "g" を使用してジョブパラメーターを取得できます g.logger.info("get executor params: %s" % g.xxl_run_data.executorParams) for i in range(10): g.logger.warning("test logger %s" % i) await asyncio.sleep(5) return "Success..." @app.register(name="sync_func") def test_task4(): # xxl-admin で実行ログを表示するには、g.logger を使用して出力する必要があります。デフォルトでは、INFO レベル以上のログのみが出力されます。 n = 1 g.logger.info("Job %s get executor params: %s" % (g.xxl_run_data.jobId, g.xxl_run_data.executorParams)) # 同期ジョブにループが含まれている場合は、キャンセル操作をサポートするために、各反復で g.cancel_event をチェックする必要があります。 while n <= 10 and not g.cancel_event.is_set(): # xxl-admin からログを表示する必要がない場合は、独自のロガーを使用できます。 g.logger.info( "log to {} logger test_task4.{},params:{}".format( g.xxl_run_data.jobId, n, g.xxl_run_data.executorParams, ) ) time.sleep(2) n += 1 return "Success 3" if __name__ == "__main__": app.run_executor()
2. アプリケーションを Alibaba Cloud にデプロイする
Alibaba Cloud XXL-JOB エディションは、Alibaba Cloud ネットワークのみをサポートします。アプリケーションを Alibaba Cloud にデプロイする必要があります。次の例では、Java アプリケーションを Container Service for Kubernetes (ACK) にデプロイする方法を示します。
ACK クラスターは、XXL-JOB クラスターと同じ Virtual Private Cloud (VPC) 内にある必要があります。
Spring Boot アプリケーションのルートディレクトリに、Dockerfile ファイルを作成します。
# 以下を独自のベースイメージに置き換えます FROM reg.docker.alibaba-inc.com/xxx/xxxx-java:1.0-beta MAINTAINER xueren ENV JAVA_OPTS="" ADD target/xxl-job-executor-sample-springboot-*.jar /app.jar ENTRYPOINT ["sh","-c","java -jar $JAVA_OPTS /app.jar]Docker を使用して Docker イメージをビルドし、Alibaba Cloud Container Registry にアップロードします。
docker login --username=xxx@aliyun.com registry.cn-hangzhou.aliyuncs.com --password=xxxxxx docker buildx build --platform linux/amd64 -t registry.cn-hangzhou.aliyuncs.com/schedulerx/xxljob-demo:2.2.0 . docker push registry.cn-hangzhou.aliyuncs.com/schedulerx/xxljob-demo:2.2.0左側のナビゲーションウィンドウで、 ページに移動します。ターゲットアプリケーションの **[アクション]** 列で、[接続構成] をクリックします。
ACK コンソールにログインし、ターゲットクラスターに移動します。左側のナビゲーションウィンドウで、 を選択します。右上隅にある [YAML から作成] をクリックしてデプロイメントを作成します。この例では、接続構成に接続タイプ 2 を使用します。これにより、`-D` パラメーターを使用してアプリケーションが再起動されます。YAML ファイルの JAVA_OPTS の値を置き換えて、Java 仮想マシン (JVM) パラメーターを環境変数に挿入します。

apiVersion: apps/v1 kind: Deployment metadata: name: xxljob-xueren-test labels: app: xxljob-xueren-test spec: replicas: 2 selector: matchLabels: app: xxljob-xueren-test template: metadata: labels: app: xxljob-xueren-test spec: containers: - name: xxljob-executor image: registry.cn-hangzhou.aliyuncs.com/schedulerx/xxljob-demo:2.2.0 ports: - containerPort: 9999 env: - name: JAVA_OPTS value: >- -Dxxl.job.admin.addresses=http://xxljob-xxxxx.schedulerx.mse.aliyuncs.com -Dxxl.job.executor.appname=xueren_test -Dxxl.job.accessToken=xxxxxxx
ステップ 3: テストと検証
1. エグゼキュータ接続の検証
ターゲットインスタンスの詳細ページに移動します。左側のナビゲーションウィンドウで、[アプリケーション管理] をクリックします。アプリケーションリストで、ターゲットアプリケーションの **[エグゼキュータ]** 列の番号をクリックします。接続されているエグゼキュータのアドレスとオンラインステータスを表示できます。

2. ジョブのテストと検証
スタンドアロンジョブのテスト
スタンドアロンジョブとは、各実行において、ルーティングポリシーに基づいてアプリケーション内のすべての利用可能なエグゼキュータから 1 つのエグゼキュータが選択され、ジョブを実行することを意味します。
左側のナビゲーションウィンドウで、[ジョブ管理] を選択し、[ジョブの作成] をクリックします。基本設定を構成します。[ジョブ名] と [JobHandler 名] を入力します。[関連アプリケーション] で、ターゲットアプリケーションを選択します。[ルーティングポリシー] で、[ポーリング] を選択します。次に、[次へ] をクリックします。

[スケジュール構成] を構成します。[時間タイプ] を cron に設定します。[生成ツールを使用] ボタンをクリックして [Cron 式] を生成します。この例では、ジョブを毎日 12:00 に 1 回実行します。次に、[次へ] をクリックします。

[通知設定] を構成します。タイムアウトアラート、成功通知、失敗アラート、通知方法、および通知オブジェクトを構成できます。この例では、デフォルトのコンソール構成を使用します。

ジョブが作成されたら、ターゲットジョブを見つけ、**[アクション]** 列の [一度実行] をクリックします。手動実行ダイアログボックスで、[マシン] を指定し、[インスタンスパラメーター] を構成して、[OK] をクリックします。

をクリックして、ジョブ実行レコードを表示します。
左側のナビゲーションウィンドウで、[実行リスト] をクリックします。ターゲットの実行レコードを見つけ、**[アクション]** 列の [ログ] をクリックして、このジョブの実行ログを表示します。

シャーディングブロードキャストジョブのテスト
シャーディングブロードキャストとは、各実行がアプリケーション内のすべてのエグゼキュータにブロードキャストされることを意味します。各エグゼキュータは異なるシャード番号を受け取り、これを使用して分散バッチ処理を行うことができます。オープンソースの XXL-JOB のシャーディングブロードキャストは集約をサポートしていません。ただし、Alibaba Cloud XXL-JOB エディションは、各実行のすべてのシャードの実行ステータスを集約して表示できます。
左側のナビゲーションウィンドウで、[ジョブ管理] を選択し、[ジョブの作成] をクリックします。基本設定を構成します。[ジョブ名] と [JobHandler 名] を入力します。[関連アプリケーション] で、ターゲットアプリケーションを選択します。[ルーティングポリシー] で、[シャーディングブロードキャスト] を選択します。次に、[次へ] をクリックします。

[スケジュール構成] を構成します。[時間タイプ] を cron に設定します。[生成ツールを使用] ボタンをクリックして [Cron 式] を生成します。この例では、ジョブを毎時 10 分に実行します。次に、[次へ] をクリックします。

[通知設定] を構成します。タイムアウトアラート、成功通知、失敗アラート、通知方法、および通知オブジェクトを構成できます。この例では、デフォルトのコンソール構成を使用します。

ジョブが作成されたら、ターゲットジョブを見つけ、**[アクション]** 列の [一度実行] をクリックします。手動実行ダイアログボックスで、[マシン] を指定し、[インスタンスパラメーター] を構成して、[OK] をクリックします。

をクリックして、ジョブ実行レコードを表示します。
左側のナビゲーションウィンドウで、[実行リスト] をクリックします。ジョブ実行リストで、ターゲットジョブの実行を見つけ、**[アクション]** 列の [詳細] をクリックします。[シャード詳細] セクションで、各マシンでの実行ステータスの集約されたサマリーを表示できます。

各シャードについて、**[アクション]** 列の [ログ] をクリックして、そのジョブの実行ログを表示します。
