Simple Log Service (SLS) トリガーを構成することで、Simple Log Service (SLS) を Function Compute と統合できます。SLS トリガーは、新しいログが生成されると自動的に関数を実行し、Simple Log Service の Logstore からデータを増分的に消費してカスタム処理タスクを実行します。
ユースケース
データクレンジングおよび処理
Simple Log Service を使用すると、ログを迅速に収集・処理・照会・分析できます。
データ転送
この機能は、さまざまな送信先へのデータ配信をサポートし、クラウドベースのビッグデータプロダクト間でデータパイプラインを構築します。
データ処理関数
関数の種類
テンプレート関数
詳細については、「aliyun-log-fc-functions」をご参照ください。
カスタム関数
関数の構成フォーマットは、関数の具体的な実装によって異なります。詳細については、「ETL 関数開発ガイド」をご参照ください。
トリガーの仕組み
Simple Log Service の ETL ジョブは、Function Compute のトリガーに対応します。ETL ジョブを作成すると、Simple Log Service はタイマーを開始し、定期的に Logstore 内のシャード情報をポーリングします。新しいデータが書き込まれると、システムは <shard_id,begin_cursor,end_cursor> 形式のトリプレットとしてイベントを生成し、関数をトリガーします。
ストレージシステムがスペックアップされた場合、新しいデータが書き込まれていなくてもカーソルが変更されることがあります。この場合、各シャードは空のペイロードで 1 回トリガーされます。関数内では、カーソルを使用してシャードからデータをプルし、データが返されない場合は空のトリガーであるため、呼び出しを無視してください。詳細については、「カスタム関数開発ガイド」をご参照ください。
Simple Log Service の ETL タスクのトリガーの仕組みは、時間ベースです。たとえば、ETL ジョブのトリガー間隔を 60 秒に設定し、Logstore のシャード 0 に継続的にデータが書き込まれている場合、シャードは 60 秒ごとに関数の実行をトリガーします。シャードに新しいデータが書き込まれていない場合、関数はトリガーされません。関数の入力は、直近 60 秒間のカーソル範囲です。関数内では、このカーソルに基づいてシャード 0 からデータを読み取り、さらに処理できます。
制限事項
単一のプロジェクトに関連付けられる Simple Log Service トリガーの最大数は、そのプロジェクト内の Logstore 数の 5 倍です。
各 Logstore につき、Simple Log Service トリガーを 5 つ以下に設定することをお勧めします。そうしないと、Function Compute へのデータ転送の効率に影響が出る可能性があります。
例となるシナリオ
Simple Log Service トリガーを構成して、更新されたデータを定期的にフェッチし、関数を呼び出すことで、Logstore からデータを増分的に消費できます。関数内で、データクレンジングや処理などのカスタムタスクを実行し、データをサードパーティサービスに転送できます。この例では、ログデータの取得と出力方法のみを示します。
データ処理に使用する関数は、Simple Log Service が提供するテンプレートまたはユーザーが作成したカスタム関数のいずれかです。
前提条件
Function Compute
Simple Log Service (SLS)
1 つのプロジェクトと 2 つの Logstore を作成します。収集されたログ用のソース Logstore と、トリガーの実行ログ用の別の Logstore です。Function Compute は新しいデータによってトリガーされるため、ログが継続的に収集されていることを確認する必要があります。
Log プロジェクトと Function Compute サービスは、同じリージョンに配置されている必要があります。
入力パラメーター
eventSimple Log Service トリガーが起動すると、イベントデータを表す JSON オブジェクトを関数の
event入力パラメーターに渡します。形式は次のとおりです。{ "parameter": {}, "source": { "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "fc-test-project", "logstoreName": "fc-test-logstore", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }, "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****", "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****", "cursorTime": 1529486425 }以下の表は、パラメーターの説明です。
パラメーター
説明
parameter
トリガー構成時に指定したInvocation Parametersの値です。
source
関数が読み取るログブロックに関する情報です。
endpoint:Simple Log Service プロジェクトが存在するリージョンです。
projectName:プロジェクトの名前です。
logstoreName:Function Compute が消費する Logstore の名前です。現在のトリガーはこの Logstore からデータをサブスクライブし、定期的に関数サービスに送信してカスタム処理を行います。
shardId:Logstore 内の特定のシャードです。
beginCursor:データ消費を開始する位置です。
endCursor:データ消費を停止する位置です。
説明関数をデバッグする際は、GetCursor by time API オペレーションを呼び出してbeginCursorおよびendCursorを取得し、上記の例に基づいて関数イベントを構築してテストできます。
jobName
Simple Log Service ETL ジョブの名前です。関数に構成された Simple Log Service トリガーは、Simple Log Service の ETL ジョブに対応します。
このパラメーターは Function Compute によって自動的に生成され、ユーザーによる構成は不要です。
taskId
ETL ジョブにおいて、taskId は関数呼び出しの一意な識別子です。
このパラメーターは Function Compute によって自動的に生成され、ユーザーによる構成は不要です。
cursorTime
最新のログが Simple Log Service サーバーに到着した UNIX タイムスタンプ(秒単位)です。
contextFunction Compute が関数を実行する際、
contextオブジェクトを関数に渡します。このオブジェクトには、呼び出し、サービス、関数、トレーシング、および実行環境に関する情報が含まれます。本トピックでは、
context.credentialsを使用して認証情報のキーを取得します。その他のフィールドの詳細については、「Context」をご参照ください。
ステップ 1:SLS トリガーの作成
Function Compute コンソールにログインします。左側のナビゲーションウィンドウで、を選択します。
上部のナビゲーションバーでリージョンを選択します。関数ページで、対象の関数をクリックします。
関数詳細ページで、トリガータブをクリックし、次にトリガーの作成をクリックします。[トリガーの作成] パネルで、トリガータイプにLog Serviceを選択し、他のパラメーターを構成して、OKをクリックします。
パラメーター
操作
例
名前
トリガーのカスタム名を入力します。このパラメーターを空のままにすると、Function Compute が自動的に名前を生成します。
log_trigger
バージョンまたはエイリアス
デフォルト値はLATESTです。別のバージョンまたはエイリアス用のトリガーを作成する場合は、関数詳細ページの右上隅でそのバージョンまたはエイリアスに切り替える必要があります。バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
Log Service プロジェクト
データを消費する Simple Log Service プロジェクトを選択します。
aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****
Logstore
データを消費する Logstore を選択します。トリガーはこの Logstore から定期的にデータをサブスクライブし、関数サービスに送信してカスタム処理を行います。
function-log
トリガー間隔
Simple Log Service が関数をトリガーする間隔を指定します。
有効値:3~600。単位:秒。デフォルト値:60。
60
再試行
1 回の呼び出しで許可される最大リトライ回数を指定します。
有効値:0~100。デフォルト値:3。
説明ステータスが 200 で、ヘッダーの
X-Fc-Error-Typeパラメーターの値がUnhandledInvocationErrorまたはHandledInvocationErrorでない場合、成功した実行とみなされます。それ以外の場合は失敗した実行となり、リトライがトリガーされます。X-Fc-Error-Typeパラメーターの詳細については、「応答パラメーター」をご参照ください。実行が失敗した場合、構成に基づいてシステムがリトライを実行します。初期のすべてのリトライが失敗した場合は、間隔を長くしたバックオフ再試行フェーズに入ります。
3
トリガーログ
作成済みの Logstore を選択します。Simple Log Service によってトリガーされた関数実行のログは、この Logstore に記録されます。
function-log2
呼び出しパラメーター
関数に渡すカスタムパラメーターです。値は JSON 形式の文字列である必要があり、イベントのparameterフィールドとして渡されます。
このパラメーターはデフォルトで空です。
なし
ロール名
AliyunLogETLRoleを選択します。
説明このタイプのトリガーを初めて作成する場合は、OKをクリックした後、表示されるダイアログボックスで今すぐ承認を選択する必要があります。
AliyunLogETLRole
トリガーを作成すると、トリガー名リストに表示されます。トリガーを変更または削除するには、「トリガーの管理」をご参照ください。
ステップ 2:権限の構成
関数の詳細ページで、設定タブを選択します。詳細設定セクションで、変更をクリックします。詳細設定パネルで、関数ロールを選択します。
デフォルトロールAliyunFCServerlessDevsRoleを使用できます。このロールは、デフォルトで Simple Log Service の読み取り専用権限を持ちます。
カスタム RAM ロールを作成することもできます。カスタム RAM ロールは、次の 2 つの要件を満たす必要があります。
RAM ロールを作成する際、{key, select, RAM {アカウント} Service {クラウドサービス} Federated {アイデンティティプロバイダー者} Account {アカウント} CurrentAccount {現在のアカウント} OtherAccount {他のアカウント} AllAccounts {すべてのアカウント} RAMType {アイデンティティの種類} UserName {ユーザー名} RoleName {ロール名} FederatedType {ID プロバイダーのタイプ} other { {key} } }を信頼できるエンティティとして選択し、Function Computeを信頼できるサービスとして選択します。詳細については、「信頼できる Alibaba Cloud サービス向けの RAM ロールの作成」をご参照ください。
関数の具体的な要件に基づいて、RAM ロールに必要な Simple Log Service 権限を付与します。詳細については、「カスタム RAM ポリシーの例」をご参照ください。
完了したら、デプロイをクリックします。
ステップ 3:デプロイとログの確認
コードタブの関数詳細ページで、コードエディタにコードを入力し、デプロイメントコードをクリックします。
この例では、次の操作を行う Python 関数をデプロイします。
eventパラメーターから、endpoint、projectName、logstoreName、beginCursorなどの Simple Log Service イベントトリガー情報を取得します。contextパラメーターから、accessKeyId、accessKey、securityTokenなどの認証情報を取得します。取得した情報に基づいて SLS クライアントを初期化します。
指定されたカーソル位置でソース Logstore からログデータを取得します。
説明以下のサンプルコードは、ほとんどの論理ログを抽出するためのテンプレートとして使用できます。
""" This sample code shows how to perform the following actions: * Parse SLS event trigger information from the event. * Initialize the SLS client based on the obtained information. * Pull real-time log data from the source Logstore. """ #!/usr/bin/env python # -*- coding: utf-8 -*- import logging import json import os from aliyun.log import LogClient logger = logging.getLogger() def handler(event, context): # Access keys can be fetched through context.credentials. print("The content in context entity is: ", context) creds = context.credentials access_key_id = creds.access_key_id access_key_secret = creds.access_key_secret security_token = creds.security_token # Parse event into an object. event_obj = json.loads(event.decode()) print("The content in event entity is: ", event_obj) # Get the name of the Project, the name of the Logstore, the endpoint of SLS, beginCursor, endCursor, and shardId from event.source. source = event_obj['source'] log_project = source['projectName'] log_store = source['logstoreName'] endpoint = source['endpoint'] begin_cursor = source['beginCursor'] end_cursor = source['endCursor'] shard_id = source['shardId'] # Initialize the SLS client. client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token) # Read logs from the source Logstore within cursor range [begin_cursor, end_cursor). In this example, the range contains all logs that trigger the invocation. while True: response = client.pull_logs(project_name=log_project, logstore_name=log_store, shard_id=shard_id, cursor=begin_cursor, count=100, end_cursor=end_cursor, compress=False) log_group_cnt = response.get_loggroup_count() if log_group_cnt == 0: break logger.info("get %d log group from %s" % (log_group_cnt, log_store)) logger.info(response.get_loggroup_list()) begin_cursor = response.get_next_cursor() return 'success'関数の詳細ページで、を選択して、関数実行時に取得された最新のデータを確認します。メッセージ「現在の関数ではログ機能が有効になっていません。」が表示された場合は、有効化。
これで、Simple Log Service トリガーの構成が完了しました。コンソールでコードをデバッグするには、次の手順に従ってください。
(オプション)ステップ 4:シミュレートイベントでのテスト
関数の詳細ページの [コード] タブで、[テスト関数] の横にある
アイコンをクリックし、ドロップダウンリストから [テストパラメーターの設定] を選択します。テストパラメーターの設定パネルで、新規テストイベントの作成または既存のテストイベントの変更を選択し、イベント名と内容を入力して OK をクリックします。 新規テストイベントを作成する場合は、Log Serviceテンプレートを選択することを推奨します。テストデータの構成方法の詳細については、「event」をご参照ください。
シミュレートイベントを構成したら、関数のテストをクリックします。
実行が完了すると、コードタブの上部に実行結果が表示されます。
よくある質問
トラブルシューティング:SLS トリガーが呼び出されない
次の項目を確認して問題を解決できます。
トリガーに構成された Logstore に新しいデータが存在することを確認します。シャードデータが変更されると関数がトリガーされます。
トリガーログおよび関数実行ログを確認し、例外がないかチェックします。
呼び出し頻度が高いのはなぜですか?
各シャードは個別にトリガーされます。Logstore 全体のトリガー回数が多く見える場合がありますが、各シャードのトリガーは指定された間隔内で発生します。
単一シャードのトリガー間隔と、1 回の処理で対象となるデータ範囲(時間間隔)は同一です。関数実行中のトリガー間隔には、次の 2 つのシナリオがあります。トリガー間隔を 60 秒と仮定します。
トリガー遅延なし:関数は設定どおり 60 秒ごとに定期的にトリガーされ、
[now -60s, now)の範囲のデータを処理します。説明関数トリガーは各シャードごとに独立しています。Logstore に 10 個のシャードがあると仮定すると、リアルタイムデータ処理中(トリガー遅延なし)は、60 秒ごとに 10 回の関数呼び出しが発生します。
トリガー遅延が発生(Simple Log Service シャードの現在の処理位置が最新の書き込みデータより 10 秒以上遅れている場合):遅れを取り戻すためにトリガーが加速し、トリガー間隔が 2 秒に短縮される可能性がありますが、引き続き 60 秒間のウィンドウでデータを処理します。
エラー:「denied by sts or ram」
このエラーが関数ログに表示される場合、関数に必要な権限が構成されていないか、権限ポリシーが正しく設定されていない可能性があります。詳細については、「ステップ 2:権限の構成」をご参照ください。