このトピックでは、2 つの典型的なシナリオを通して、ノードをまたぐパラメーターの使用方法を紹介します。
シナリオ
ノードをまたぐパラメーターには、2 つの典型的なアプリケーション シナリオがあります。
金融企業は、日次スケジュール タスクの為替変換を行い、日次投資収益を米ドルで決済する必要があります。日次終値の為替レートは業務システムから抽出され、オフライン テーブルに同期されます。複数の日次スケジュール タスクに為替変換が含まれており、データ タイムスタンプに対応する為替レートが必要です。出力ノード タスクとして為替レートを読み取るプログラムを作成し、ノードをまたぐパラメーターとして為替レートを出力できます。為替レートを必要とするノード タスクは、このノードをまたぐパラメーターを入力ノードとして参照できます。
小売流通企業は、毎日サプライヤーのサービスに問い合わせて、製品カタログが変更されたかどうかを確認する必要があります。変更が検出された場合は、最新のカタログ データをすべてプルする必要があります (データ量が多いためコストがかかります)。変更が検出されない場合は、以前にプルされたデータが引き続き使用されます。出力ノード タスクで製品カタログが変更されたかどうかを検出するプログラムを作成し、製品カタログの更新ステータスをノードをまたぐパラメーターとして出力できます。同時に、製品カタログをプルおよび同期するノードを入力ノードとして設定し、ノードをまたぐパラメーターの値に基づいてスケジュールできます (条件付きスケジューリング)。
例 1: 為替レート変換
データ構造
為替レート テーブル exchange_rate_table。テーブル スキーマとサンプル データは次のとおりです。


出力ノード
MAX_COMPUTE_SQL タスク
get_exchange_rateを作成し、ノードをまたぐ出力パラメーターusd2cnyを定義します。
タスク エディターで、右クリックして [ノードをまたぐパラメーターの設定] を選択します。

タスク エディターは、定義されたノードをまたぐ出力パラメーターを自動的に表示します。

フィールドのエイリアスをノードをまたぐ出力パラメーターの名前に設定します。システムは、クエリ結果の最初の行にある対応するフィールドの値を、ノードをまたぐパラメーターに自動的に割り当てます。
説明SQL タスクの場合、複数の文がノードをまたぐパラメーターを出力する場合、各文の前に
set文を省略することはできません。この例では、2 つの SQL 記述方法があります。
ノードをまたぐパラメーターごとに 1 つの個別の SQL。

単一の文を使用する。

タスクを送信します。
入力ノード
次のサンプル コードを使用して、SQL タスク
exchange_usd_to_cnyを作成します。同時に、出力ノードget_exchange_rateをアップストリーム ノードとして追加します。-- 10000 USD を CNY、JPY、EUR、AUD、HKD に変換します select 10000 * ${usd2cny_rate}, 10000 * ${usd2jpy_rate}, 10000 * ${usd2eur_rate}, 10000 * ${usd2aud_rate}, 10000 * ${usd2hkd_rate};
識別された変数パラメーターのタイプを [ノードをまたぐ変数] に変更します。

各ノードをまたぐ変数のパラメーター値で、
get_exchange_rateの対応するノードをまたぐ出力パラメーターを選択します。
データ バックフィル
テーブルで現在の日付の為替レートを確認します。

オペレーション センターに移動し、
get_exchange_rateノードの [データ バックフィル] をクリックし、[現在およびアップストリーム/ダウンストリーム タスクのバックフィル] を選択し、ダウンストリームのexchange_usd_to_cnyを選択して、データを一緒にバックフィルします。説明入力ノードのデータをバックフィルする場合、出力ノードもバックフィルする必要があります。そうしないと、入力ノードはノードをまたぐパラメーターのデフォルト値を使用します。

exchange_usd_to_cnyのランタイム ログは次のとおりです。システムがテーブルからデータを読み取り、ノードをまたぐ出力パラメーターに割り当て、ダウンストリーム タスクに渡していることがわかります。
例 2: 更新ステータスの確認
出力ノード
更新ステータスを検出するためのアナログ Python タスクを作成し、ノードをまたぐ出力パラメーター
update_statusを追加します。
コードを入力し、ランダム関数を使用してステータスを返します。右クリックし、[ノードをまたぐパラメーターの設定] を選択して、タスクを送信します。


from random import randint def check_update(): return randint(0, 1) setv("update_status", check_update()) # ノードをまたぐパラメータ update_status に値を設定
入力ノード
オフライン パイプライン タスク
imp_product_catalogを作成し、check_updateをアップストリーム タスクとして追加します。
imp_product_catalogタスクの条件付きスケジューリングを有効にします。
条件付きスケジューリングで、ドライラン スケジューリングの条件
ノードをまたぐパラメーター-check_update.update_status = 0(更新なしを意味する) を追加します。この条件が満たされない場合、デフォルトの条件がヒットします (通常のスケジューリング)。


異なるノードをまたぐパラメーター値を入力して、スケジューリング プランをプレビューします。



