このトピックでは、パラメーターの説明や関数の例など、リソース関数の構文規則について説明します。

関数

データ型 機能 説明
Local res_local 現在のデータ変換タスクの詳細パラメーターの情報を取得します。
ApsaraDB for RDS res_rds_mysql ApsaraDB RDS for MySQL から指定されたデータベーステーブルのデータを取得します。 データは定期的に更新することができます。
Log Service res_log_logstore_pull ログストアの変換中に、別のログストアからデータをプルします。 データをプルし、継続的にテーブルに保持することができます。
OSS res_oss_file 指定されたバケット内のオブジェクトのコンテンツを OSS から取得します。

res_local

  • 関数の構文
    res_local(param, default=None, type="auto")
  • 関数のパラメーター
    パラメーター データ型 必須/任意 説明
    param String 必須 データ変換タスクの詳細パラメーターの名前です。
    default Arbitrary type 任意 詳細パラメーターが存在しない場合のデフォルト値です。 デフォルト値は None です。
    type String 任意 出力データのフォーマットです。 設定可能な値は以下のとおりです。
    • auto:デフォルト値です。 生データは JSON 文字列に変換されます。 変換が失敗した場合は、データは生文字列として返されます。
    • JSON:生データは JSON 文字列に変換されます。 変換が失敗した場合は、default パラメーターの値が返されます。
    • raw:生文字列が返されます。
  • レスポンス

    対応するパラメーター設定に応じて、JSON また は生文字列が返されます。

  • JSON 文字列への変換
    • 下表に、変換の成功例を示します。
      生文字列 戻り値 戻り値のデータ型
      1 1 Integer
      1.2 1.2 Float
      true True Boolean
      false False Boolean
      "123" 123 String
      null None None
      ["v1", "v2", "v3"] ["v1", "v2", "v3"] List
      ["v1", 3, 4.0] ["v1", 3, 4.0] List
      {"v1": 100, "v2": "good"} {"v1": 100, "v2": "good"} List
      {"v1": {"v11": 100, "v2": 200}, "v3": "good"} {"v1": {"v11": 100, "v2": 200}, "v3": "good"} List
    • 次の表は、失敗した変換の例を示しています。 生文字列は、JSON 文字列への変換に失敗した後に返されます。
      生文字列 戻り値 説明
      (1,2,3) "(1,2,3)" タプルはサポートされていません。 データ型はリストを使用する必要があります。
      True "True", データ型がブールの値は、true と false のみです。
      {1: 2, 3: 4} "{1: 2, 3: 4}" 辞書のキーは文字列のみです。
  • 関数の例:次の例では、詳細パラメーターの情報を local パラメーターに再割り当てします。
    下図に、コンソールで設定する詳細パラメーターを示します。詳細パラメーター
    生ログ:
    content: 1
    変換ルール:
    e_set("local", res_local('endpoint'))
    変換結果:
    content: 1
    local: hangzhou

res_rds_mysql

  • 関数の構文
    res_rds_mysql(address, username, password, database, table=None, sql=None, fields=None, fetch_include_data=None, fetch_exclude_data=None, refresh_interval=0,base_retry_back_off=1,max_retry_back_off=60,use_ssl=False)
  • 関数のパラメーター
    パラメーター データ型 必須/任意 説明
    address String 必須 ドメイン名または IP アドレスです。 ポート番号が 3306 でない場合は、このパラメータを IP address:portの形式で設定します。 IP アドレスのみがサポートされています。
    username String 必須 ApsaraDB RDS for MySQL にアクセスするユーザーの名前です。 プレーンテキストのユーザー名はサポートされていません。 詳細については、「AccessKey:」をご参照ください。
    password String 必須 ユーザーのパスワードです。 プレーンテキストのパスワードはサポートされていません。 詳細については、「AccessKey:」をご参照ください。
    database String 必須 接続するデータベースの名前です。
    table String 必須 操作するテーブルの名前です。 sql パラメーターが指定された場合は、このパラメーターは必要ありません。
    sql String 必須 操作するテーブルの名前です。 table パラメーターが指定された場合は、このパラメーターは必要ありません。
    fields String list or string alias list 任意 文字列リストまたは文字列エイリアスリストです。 このパラメーターが指定されていない場合は、 table または sql パラメーターによって返されるすべての列が使用されます。 たとえば、["user_id", "province", "city", "name", "age"] リストの name パラメーターを user_name に変更したい場合は、フィールドのパラメーターを ["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"] に設定します。
    fetch_include_data Search string 任意 文字列のホワイトリストです。 fetch_include_data パラメーターに一致する文字列は保持されます。 このパラメーターに一致しない文字列は破棄されます。
    fetch_exclude_data Search string 任意 文字列のブラックリストです。 fetch_exclude_data パラメーターに一致する文字列は破棄されます。 このパラメーターに一致しない文字列は保持されます。
    refresh_interval Numeric string or number 任意 データがプルされる間隔です。 単位:秒。 デフォルト値は 0 です。これは、検索条件に一致するすべてのデータを 1 回だけロードすることを意味します。 この場合、データは更新されません。
    base_retry_back_off Number 任意 プルの失敗後にデータが再プルされる間隔です。 デフォルト値: 1。 単位:秒。
    max_retry_back_off int 任意 デフォルト値は 60 です。 単位:秒。 変換タスクが失敗した後の再試行間の最大間隔です。 デフォルト値の使用を推奨します。
    use_ssl Bool 任意 SSL プロトコルを使用して ApsaraDB RDS for MySQL に接続するかどうかを示します。 デフォルト値:false 。
    ApsaraDB RDS for MySQL で SSL が有効であり、このパラメーターが指定されている場合は、SSL チャネルは ApsaraDB RDS for MySQL への接続に使用されますが、サーバーの CA 証明書は検証されません。 サーバーが提供する CA 証明書を接続に使用することはできません。
  • レスポンス

    複数の列を含むテーブルが返されます。 列は、fields パラメーターで定義されます。

  • エラー処理

    データのプル処理中にエラーが生じた場合は、エラーメッセージが表示されますが、データ変換タスクは続行されます。 アニールと再試行は、refresh_interval_max パラメーターの設定に従って実行されます。 最初の再試行間隔は 1 秒です。 最初の再試行が失敗した場合は 、2 番目の再試行間隔は最初の再試行の長さの 2 倍になり、間隔が refresh_interval_max パラメーターの値に達するまで繰り返されます。 以降の再試行は、refresh_interval_max パラメーターの値に従って実行されます。 再試行が成功すると、再試行間隔が初期値 (1 秒) に戻ります。

  • 関数の例
    • 次の例では、test_db データベースの test_table テーブルから 300 秒ごとにデータを取得します。
      res_rds_mysql(address="rds mysql database host IP address",username="xxx",password="xxx",database="test_db",table="test_table",refresh_interval=300)
    • 次の例では、status 値が delete であるデータを除いて、test_table テーブルからデータをプルします。
      res_rds_mysql(address="rds mysql database host IP address",username="xxx",password="xxx",database="test_db",table="test_table",refresh_interval=300,fetch_exclude_data="'status':'delete'")
    • 次の例では、status 値が exit であるデータを test_table テーブルからプルします。
      res_rds_mysql(address="rds mysql database host IP address",username="xxx",password="xxx",database="test_db",table="test_table",refresh_interval=300,fetch_include_data="'status':'exit'")
    • 次の例では、 status 値が exit であるデータを test_table テーブルからプルします。name の値が aliyun であるデータは除外されます。
      res_rds_mysql(address="rds mysql database host IP address",username="xxx",password="xxx",database="test_db",table="test_table",refresh_interval=300,fetch_exclude_data="'status':'exit'",fetch_exclude_data="'name':'aliyun'")

res_log_logstore_pull

  • 関数の構文
    res_log_logstore_pull(endpoint, ak_id, ak_secret, project, logstore, fields, from_time="begin", to_time="end", fetch_include_data=None, fetch_exclude_data=None, primary_keys=None, upsert_data=None, delete_data=None,max_retry_back_off=60,fetch_interval=2,base_retry_back_off=1)
  • 関数パラメーター
    パラメーター データ型 必須項目 説明
    endpoint String 必須 サービスエンドポイントです。 Log Service エンドポイントの詳細については、「 サービスエンドポイント」をご参照ください。 デフォルト値は HTTPS アドレスです。 このパラメーターは、 HTTP アドレスにすることもできます。 ポート 80 または 443 を使用することができます。 たとえば、このパラメーターは http://endpoint:port と設定することができます。
    ak_id String 必須 ご利用中のアカウントの AccessKey ID です。 プレーンテキストはサポートされていません。
    ak_secret String 必須 ご利用中のアカウントの AccessKey secret です。 プレーンテキストはサポートされていません。
    project String 必須 データをプルするプロジェクトの名前です。
    logstore String 必須 データをプルするプロジェクトの Logstore の名前です。
    fields String list or string alias list 必須 文字列リストまたは文字列エイリアスリストです。 Logstore に指定された列が含まれていない場合は、この列の値は null となります。 たとえば、["user_id", "province", "city", "name", "age"] リストの "name" パラメーターを "user_name" に変更したい場合は、フィールドのパラメーターを ["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"] と設定します。
    from_time String 任意 Logstore からのデータプルが開始するサーバー時刻です。 以下の時間フォーマットをサポートしています。
    • Unix タイムスタンプ。
    • 時間文字列。
    • 特定の文字列、たとえば begin または end
    • 式: dt_関数によって返される時間。 たとえば、式dt_totimestamp(dt_truncate(dt_today(tz="Aisa/Shanghai"), day=op_neg(-1))) は、昨日のデータプルの開始時刻を示します。 現在時刻が 2019-5-5 10:10:10+8:00 の場合は、この式は時間 2019-5-4 0:0:0+8:00 を示します。
    デフォルト値は begin、つまり、データプルは最初のログエントリから始まります。
    to_time String 任意 Logstore からのデータプルが終了するサーバー時刻です。 以下の時間フォーマットをサポートしています。
    • Unix タイムスタンプ。
    • 時間文字列。
    • 特定の文字列、たとえば begin または end
    • 式: dt_関数によって返される時間。
    デフォルト値は end つまり、データプルは最後のログで終了します。

    このパラメーターが指定されていないか、または None に設定されている場合は、継続的に最新のログエントリからデータをプルすることを意味します。

    このパラメーターを将来の時刻に設定すると、ログストア内の既存のデータのみがプルされます。
    fetch_include_data Search string 任意 文字列のホワイトリストです。 fetch_include_data パラメーターに一致する文字列は保持されます。 このパラメーターに一致しない文字列は破棄されます。
    fetch_exclude_data Search string 任意 文字列のブラックリストです。 fetch_exclude_data パラメーターに一致する文字列は破棄されます。 このパラメーターに一致しない文字列は保持されます。
    primary_keys String list 任意 テーブルの維持に使用される主キー列のリストです。 fields パラメーターで主キー列の名前を変更する場合は、このパラメーターを主キーとして変更した列に設定する必要があります。
    primary_keys パラメーターの値は、fields パラメーターで指定された列名である必要があります。 データがプルされるログストアにシャードが 1 つだけ存在する場合、このパラメーターは有効です。
    fetch_interval Int 任意 継続的なデータプルリクエストの間隔です。 デフォルト値は 2 秒です。 値は 1 秒以上でなければなりません。
    delete_data Search string 任意 テーブルからデータを削除します。 primary_keys 列が指定されていないログエントリは削除できません。
    base_retry_back_off Number 任意 プルの失敗後にデータが再プルされる間隔です。 デフォルト値:1です。 単位:秒。
    max_retry_back_off Int 任意 デフォルト値は 60 です。 単位:秒。 変換タスクが失敗した後の再試行間の最大間隔です。 デフォルト値を使用することを推奨します。
  • レスポンス

    複数の列を含むテーブルが返されます。

  • エラー処理

    データプル処理中にエラーが生じた場合は、エラーメッセージが表示されますが、データ変換タスクは続行されます。 アニールと再試行は、refresh_interval_max パラメーターの設定に従って実行されます。 最初の再試行間隔は 1 秒です。 最初の再試行が失敗した場合は、2 番目の再試行間隔は最初の再試行の長さの 2 倍になり、間隔が refresh_interval_max パラメーターの値に達するまで繰り替えされます。 以降の再試行は、refresh_interval_max パラメーターの値に従って実行されます。 再試行が成功すると、再試行間隔が初期値 (1 秒) に戻ります。

  • 関数の例
    # Sample data in the Logstore
    
    time:1234567
    __topic__:None
    key1:value1
    key2:value2
    
    ...
    • 次の例では、test_project プロジェクトの test_logstore Logstore から列 key1 と列 key2 のデータをプルします。 データプルは、ログデータがログストアに書き込まれた時点から開始され、データの書き込みが終了した時点で終了します。 データプルは 1 回だけ実行されます。
      res_log_logstore_pull("endpoint", "ak_id", "ak_secret", "test_project", "test_logstore", ["key1","key2"], from_time="begin", to_time="end")
    • この例では、30 秒間隔でログストアからデータをプルします。
      res_log_logstore_pull("endpoint", "ak_id", "ak_secret", "test_project", "test_logstore", ["key1","key2"], from_time="begin", to_time=None,fetch_interval=30)
    • 次の例では、Logstore からデータをプルし、key1:value1 列を含むデータを除外するブラックリストを指定します。
      res_log_logstore_pull("endpoint", "ak_id", "ak_secret", "test_project", "test_logstore", ["key1","key2"], from_time="begin", to_time=None,fetch_interval=30,fetch_exclude_data="key1:value1")
    • 次の例では、ホワイトリストを指定して、key1: value1 列 を含むデータをログストアからプルします。
      res_log_logstore_pull("endpoint", "ak_id", "ak_secret", "test_project", "test_logstore", ["key1","key2"], from_time="begin", to_time=None,fetch_interval=30,fetch_include_data="key1:value1")

res_oss_file

  • 関数の構文
    
    res_oss_file(endpoint, ak_id, ak_key, bucket, file, format='text', change_detect_interval=0,base_retry_back_off=1, max_retry_back_off=60,encoding='utf8',error='ignore')
  • 関数パラメーター
    パラメーター データ型 必須/任意 説明
    endpoint String 必須 サービスエンドポイントです。 OSS サービスエンドポイントの詳細については、「リージョンとエンドポイント」をご参照ください。 デフォルト値は HTTPS アドレスです。 このパラメーターは、HTTP アドレスにすることもできます。 ポート 80 または 443 を使用できます。 たとえば、このパラメーターは http://endpoint:port に設定できます。
    ネットワークが安定した高速のイントラネットとなるよう、OSS バケットとデータ変換 Logstore を同じリージョンに配置することを推奨します。 それ以外の場合は、ネットワークはパブリックネットワークであり、より多くの帯域幅を消費し、安定性が低下します。
    ak_id String 必須 ご利用中のアカウントの AccessKey ID です。 プレーンテキストはサポートされていません。
    ak_key String 必須 ご利用中のアカウントの AccessKey secret です。 プレーンテキストはサポートされていません。
    bucket String 必須 オブジェクトデータを抽出するバケットの名前です。
    file String 必須 プロジェクトから抽出されるファイルのパスです。 このパラメーターはスラッシュで始めることはできません (/)。 フォーマットはtest/data.txt です。
    format String 必須 出力ファイルの形式です。 デフォルト値は text、つまり、出力ファイルが text ファイルです。 もしformat パラメーターが binary に設定されている場合は 、出力ファイルはbyte 形式です。
    change_detect_interval String 任意 ファイルの change-tag パラメーターに従って、更新間隔が存在するかどうかを確認します。 更新間隔が存在する場合は、ファイルはこの間隔で更新されます。 デフォルト値は 0 で、ファイルを更新しないことを意味します。
    base_retry_back_off Number 任意 プルの失敗後にデータが再プルされる間隔です。 デフォルト値:1。 単位:秒。
    max_retry_back_off int 任意 デフォルト値は 60 です。 単位:秒。 変換タスクが失敗した後の再試行間の最大間隔です。 デフォルト値を使用することを推奨します。
    encoding String 任意 エンコード形式は次のとおりです。[DO NOT TRANSLATE] format パラメーターが textに設定されている場合は、encoding パラメーターはデフォルトで、utf8 に設定されています。
    error String 任意 エラーの処理方法です。 デフォルト値は ignore です。 他の有効な値は strict 、replace、および xmlcharrefreplace です。 このパラメーターは、 UnicodeError メッセージが報告された場合にのみ有効です。
  • レスポンス
    • ファイルデータはバイトストリーム形式で返されます。
    • ファイルデータはテキスト形式で返されます。
  • エラー処理

    データプル処理中にエラーが生じた場合は、エラーメッセージが表示されますが、データ変換タスクは続行されます。 アニールと再試行は、refresh_interval_max パラメーターの設定に従って実行されます。 最初の再試行間隔は 1 秒です。 最初の再試行が失敗した場合は、2 番目の再試行間隔は最初の再試行の長さの 2 倍になり、間隔が refresh_interval_max パラメーターの値に達するまで繰り返されます。 以降の再試行は、refresh_interval_max パラメーターの値に従って実行されます。 再試行が成功すると、再試行間隔が初期値 (1 秒) に戻ります。

  • 例 1:この例では、OSS から JSON データファイルを抽出します。
    • OSS 内の JSON ファイルのコンテンツ
      {
        "users": [
          {
              "name": "user1",
              "login_historys": [
                {
                  "date": "2019-10-10 0:0:0",
                  "login_ip": "1.1.1.1"
                },
                {
                  "date": "2019-10-10 1:0:0",
                  "login_ip": "1.1.1.1"
                }
              ]
          },
          {
              "name": "user2",
              "login_historys": [
                {
                  "date": "2019-10-11 0:0:0",
                  "login_ip": "1.1.1.2"
                },
                {
                  "date": "2019-10-11 1:0:0",
                  "login_ip": "1.1.1.3"
                },
                {
                  "date": "2019-10-11 1:1:0",
                  "login_ip": "1.1.1.5"
                }
              ]
          }
        ]
      }
    • 生ログ:
      content: 123
    • 変換ルール:
      e_set("json_parse",json_parse(res_oss_file(endpoint='http://oss-cn-hangzhou.aliyuncs.com',ak_id='your ak_id',
                                                                 ak_key='your ak_key',
                                                                 bucket='your bucket', file='testjson.json')))
    • 変換結果:
      content: 123
          prjson_parse: 
      '{
        "users": [
          {
              "name": "user1",
              "login_historys": [
                {
                  "date": "2019-10-10 0:0:0",
                  "login_ip": "1.1.1.1"
                },
                {
                  "date": "2019-10-10 1:0:0",
                  "login_ip": "1.1.1.1"
                }
              ]
          },
          {
              "name": "user2",
              "login_historys": [
                {
                  "date": "2019-10-11 0:0:0",
                  "login_ip": "1.1.1.2"
                },
                {
                  "date": "2019-10-11 1:0:0",
                  "login_ip": "1.1.1.3"
                },
                {
                  "date": "2019-10-11 1:1:0",
                  "login_ip": "1.1.1.5"
                }
              ]
          }
        ]
      }'
  • 例 2:この例では、test.txt ファイルのコンテンツを OSS から抽出します。
    • OSS 内のファイル test.txt のコンテンツ[DO NOT TRANSLATE]
      Test bytes
    • 生ログ:
      content: 123
    • 変換ルール:
      e_set("test_txt",res_oss_file(endpoint='http://oss-cn-hangzhou.aliyuncs.com',ak_id='your ak_id',
                                                                 ak_key='your ak_key',
                                                                 bucket='your bucket', file='test.txt'))
    • 変換結果:
      content: 123
      test_txt: Test bytes