Virtual Private Cloud (VPC) コンソールでフローログ機能を有効にすると、フローログが収集され、Simple log Serviceに送信されます。 Simple Log Serviceを使用して、フローログを照会および分析できます。 フローログに基づいてネットワークエラーのトラブルシューティングを行うこともできます。 このトピックでは、Simple Log Serviceのデータ変換機能を使用して、インターネットトラフィックログのフローログをフィルタリングする方法について説明します。
前提条件
使用するVPCでフローログ機能が有効になっています。 詳細については、「フローログ機能の有効化」をご参照ください。
データ変換後のVPCフローログのインターネットトラフィックログを保存するためのプロジェクトとLogstoreが作成されます。 詳細については、「プロジェクトの作成」および「Logstore の作成」をご参照ください。
背景情報
VPCはフローログ機能を提供します。 この機能は、elastic network interface (ENI) のインバウンドトラフィックとアウトバウンドトラフィックに関する情報を記録します。 フローログに基づいて、アクセス制御ルールの確認、ネットワークトラフィックの監視、ネットワークエラーのトラブルシューティングを行うことができます。
フローログ機能は、トラフィック情報をキャプチャし、トラフィック情報をログに記録してから、ログをSimple log Serviceに送信します。 各ログは、指定された時間ウィンドウ内にキャプチャされたネットワークトラフィックの指定された5タプルを記録します。 時間ウィンドウは約10分です。 この期間中、フローログ機能はトラフィックデータを集約し、ログとして記録されたトラフィックデータをSimple log Serviceに送信します。 VPCまたはvSwitchのフローログ機能を有効にすると、VPCまたはvSwitchのENIを介して転送されるトラフィックがキャプチャされます。 フローログ機能が有効になった後に作成されるENIが含まれます。
シナリオ
たとえば、VPCのフローログ機能を有効にすると、フローログが収集され、Simple log Serviceに送信されます。 サンプルログ:
{
"vm-id": "i-bp13cg******zs2l",
"srcaddr": "172.16.XX.XX",
"__time__": 1650964251,
"__topic__": "flow_log",
"dstport": "53",
"account-id": "1379******4",
"__source__": "log_service",
"start": "1650862360",
"dstaddr": "100.100.XX.XX",
"vpc-id": "vpc-bp1cznk******vv",
"version": "1",
"packets": "1",
"eni-id": "eni-bp17w******5sfw6m",
"protocol": "17",
"__pack_meta__": "1|MTY1MDk2NDAxOTEyMjczMTQ1NQ==|5|4",
"bytes": "92",
"vswitch-id": "vsw-bp16******wqe6p44",
"srcport": "59986",
"action": "ACCEPT",
"end": "1650862391",
"log-status": "OK",
"direction": "out"
}フローログをクエリおよび分析するときにインターネットトラフィックを分析するには、生ログに対して次の操作を実行する必要があります。
srcaddrまたはdstaddrフィールドがrawログに存在しない場合は、rawログを破棄します。
生ログに記録されたトラフィックが内部ネットワーク経由で転送される場合は、生ログを破棄します。
上記のシナリオでインターネットトラフィックを分析する場合は、データ変換機能を使用して、収集したフローログを変換できます。
手順
[プロジェクト] セクションで、管理するプロジェクトをクリックします。

左側のナビゲーションウィンドウで、[ログストレージ] をクリックします。 Logstoreリストで、管理するLogstoreをクリックします。

データ変換.
表示されるコードエディターで、次の変換ルールを入力します。
# If the srcaddr or dstaddr field does not exist in the raw logs, discard the raw logs. e_if(e_not_has("srcaddr"), e_drop()) e_if(e_not_has("dstaddr"), e_drop()) # If the value of the srcaddr or dstaddr field in the raw logs does not conform to the IP address format, discard the raw logs. e_if(op_not(e_match("srcaddr", grok(r'%{IP}'))), e_drop()); e_if(op_not(e_match("dstaddr", grok(r'%{IP}'))), e_drop()); # If the traffic recorded in the raw logs is transferred over internal networks, discard the raw logs. e_if(op_and( op_or(ip_cidrmatch("10.0.0.0/8", v("srcaddr")), ip_cidrmatch("172.16.0.0/12", v("srcaddr")), ip_cidrmatch("192.168.0.0/16", v("srcaddr")) ), op_or(ip_cidrmatch("10.0.0.0/8", v("dstaddr")), ip_cidrmatch("172.16.0.0/12", v("dstaddr")), ip_cidrmatch("192.168.0.0/16", v("dstaddr")) )),e_drop())e_ifおよびe_not_has関数を使用して、srcaddrまたはdstadrフィールドを含まない生ログを削除します。 詳細については、「e_if」、「e_not_has」、および「e_drop」をご参照ください。
e_if、op_not、およびe_match関数を使用して、srcaddrまたはdstadrフィールドの値がIPアドレス形式に準拠していない生ログを削除します。 詳細については、「op_not」および「e_match」をご参照ください。
e_if、op_and、op_or、およびip_cidrmatch関数を使用して、記録されたトラフィックが内部ネットワーク経由で転送される生のログを削除します。 詳細については、「op_and」、「op_or」、および「ip_cidrmatch」をご参照ください。
クリックデータのプレビュー.
インターネットトラフィックを記録するフローログのみが保持されます。

変換ジョブとして保存.
データ変換ジョブの作成パネル、パラメータを設定し、OK.
基本情報を設定します。
パラメーター
説明
ジョブ名
データ変換ジョブの名前。 例: vpc-flowlog-public
権限付与方法
Simple Log ServiceがソースLogstoreからデータを読み取ることを許可するために使用されるメソッド。 例: デフォルトロール
ストレージの宛先を設定します。
パラメーター
説明
宛先名
保存先の名前。 例: target-a
宛先リージョン
ターゲットプロジェクトが存在するリージョン。 例: 中国 (杭州)
宛先プロジェクト
インターネットトラフィックログの保存に使用されるプロジェクトの名前。 例: project-vpc-flowlog-public
ターゲットストア
インターネットトラフィックログの保存に使用されるLogstoreの名前。 例: logstore-vpc-flowlog-public
権限付与方法
Simple Log Serviceが宛先Logstoreからデータを読み書きすることを許可するために使用されるメソッド。
例: デフォルトロール
データ変換の時間範囲を指定します。
パラメーター
説明
時間範囲
データ変換の時間範囲。 [すべて] を選択した場合、Simple Log ServiceはソースLogstore内のすべてのデータを変換します。
データ変換ジョブの作成後、次の操作を実行できます。
データ変換ジョブの詳細とステータスを表示します。 データ変換ジョブを開始、停止、変更、または削除することもできます。 詳細については、「データ変換ジョブの管理」をご参照ください。
送信先のLogstoreに移動して、VPCフローログのインターネットトラフィックログを表示します。 Logstoreには、インターネットトラフィックログのみが保存されます。
たとえば、次のクエリステートメントを入力して、送信元の都市と送信先の都市ごとにインターネットトラフィックをクエリおよび分析できます。
*|select ip_to_city(srcaddr) as sourceAddr,ip_to_city(dstaddr) as dstAddr,COUNT(*) as pv group by sourceAddr,dstAddr order by pv limit 10