すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Function Computeを使用して、syslogプロトコルを介してSIEMプラットフォームにログを送信する

最終更新日:Aug 30, 2024

Syslogは広く使用されているログ転送プロトコルです。 IBM QRadarやHP ArcSightなど、ほとんどのセキュリティ情報およびイベント管理 (SIEM) プラットフォームはsyslogを使用してログを受信します。 このトピックでは、Function Computeを使用して、syslogプロトコルを介してSimple Log ServiceからSIEMプラットフォームにログを送信する方法について説明します。

背景情報

  • Syslogは、RFC 5424およびRFC 3164で定義されています。 RFC 3164は2001年に公開され、RFC 5424は2009年に公開されたアップグレード版です。 このバージョンはRFC 3164と互換性があり、RFC 3164よりも多くの問題を解決するため、RFC 5424の使用を推奨します。 詳細については、「RFC 5424」および「RFC 3164」をご参照ください。

  • Syslog over TCP/TLS: Syslogは、ログメッセージの標準形式のみを定義します。 SyslogはTCPとUDPをサポートし、データ伝送の安定性を確保します。 RFC 5425では、ログメッセージのトランスポート層プロトコルとしてトランスポート層セキュリティ (TLS) の使用を定義しています。 SIEMプラットフォームがTCPまたはTLSをサポートしている場合は、UDPの代わりにTCPまたはTLSを使用することを推奨します。 詳細については、「RFC 5425」をご参照ください。

  • Simple Log Service: Simple Log Serviceは、クラウドネイティブの監視および分析プラットフォームであり、ログ、メトリック、およびトレースを処理するための大規模で低コストのリアルタイムサービスを提供します。 Simple Log Serviceを使用すると、データを収集、クエリ、分析、変換、視覚化、消費、および出荷できます。 Simple Log Serviceでは、アラートを設定することもできます。

  • Function Compute: Alibaba Cloud Function Computeは、完全マネージド型のイベント駆動型コンピューティングサービスです。 Function Computeを使用すると、サーバーなどのインフラストラクチャリソースを管理する必要なく、コードを作成およびアップロードできます。 Function Computeは、コンピューティングリソースを割り当て、柔軟で信頼性の高い方法でコードを実行し、ログクエリ、パフォーマンスモニタリング、アラートなどの機能を提供します。

  • Simple Log Serviceトリガー: 抽出、変換、および読み込み (ETL) ジョブは、Function ComputeのSimple Log Serviceトリガーに対応し、関数を呼び出すために使用されます。 Simple Log ServiceでLogstoreのETLジョブを作成すると、ジョブ設定に基づいてLogstoreのシャードからデータをポーリングするためのタイマーが開始されます。 データがLogstoreに書き込まれると、<shard_id,begin_cursor,end_cursor > 形式のトリプルデータレコードが関数イベントとして生成されます。 次に、関連するETL関数が呼び出される。

配送プロセス

データをリアルタイムで消費するために、消費者グループに基づいたプログラムをSimple Log Serviceで作成することを推奨します。 Function Computeでコンシューマーグループを管理できます。 次に、syslog over TCP/TLSを使用してSIEMプラットフォームにログを送信するようにSimple Log Serviceトリガーを設定できます。

image.png

配送例

次の例では、データソースとしてSimple Log Service Logstoreが使用され、syslogプロトコルを介してSIEMプラットフォームにログを送信するためにFunction Computeが使用されます。

説明
  • このトピックは、Simple Log Serviceトリガーに基づいています。

  • このトピックでは、Function Compute 2.0を使用します。

前提条件

手順

ステップ1: function Computeで関数を作成する

  1. Function Compute コンソールにログインします。 右上隅の [Function Compute 2に戻る] をクリックします。

    image

  2. 左側のナビゲーションウィンドウで、[サービスと機能] をクリックします。

  3. 上部のナビゲーションバーで、リージョンを選択します。 [サービス] ページで、[サービスの作成] をクリックします。

    [名前] および [サービスロール] パラメーターを設定します。 Simple Log Serviceリソースを管理するロールを作成する場合は、Logstoreのデータを使用する権限をロールに付与します。 その他のパラメータについては、デフォルト設定を保持します。 [詳細オプションの表示] をクリックして、サービスロールパラメーターを検索できます。 詳細については、「他のAlibaba CloudサービスにアクセスするためのFunction Compute権限の付与」および「カスタムポリシーを使用してRAMユーザーに権限を付与する例」をご参照ください。

  4. サービスの作成後、[関数] ページで [関数の作成] をクリックします。

    image

    関数を設定するには、次の手順を実行します。

    1. [組み込みランタイムを使用] を選択して関数を作成します。

    2. 基本設定

      • [関数名] フィールドに名前を入力します。

      • ハンドラタイプに [イベントハンドラ] を選択します。

    3. コード

      • ランタイムに [Python 3.9] を選択します。

      • コードのアップロード方法でサンプルコードを使用を選択します。 サンプルコードの説明セクションで、Log Serviceによってトリガーされる関数を選択します。

    4. 環境変数

      1. [フォームエディター] をクリックします。

      2. [+ 変数の追加] をクリックします。

      3. キーと値のペアを環境変数として設定します。

      • SYSLOG_HOST: データの受信側であるsyslogサーバのホスト。 形式: hostName.Simple Log Serviceエンドポイント エンドポイントの詳細については、「エンドポイント」をご参照ください。

      • SYSLOG_PORT: データの受信側であるsyslogサーバのポート。 デフォルト値: 10009

      • SYSLOG_PROTOCOL: syslogのトランスポート層プロトコル。 デフォルト値: tcp。

        image.png

    5. トリガー設定

      詳細については、「Simple Log Serviceトリガー」をご参照ください。

      image

      1. [トリガータイプ][Log Service] を選択します。

      2. ビジネス要件に基づいて、[Simple Log Service Project][Logstore][トリガー間隔][再試行] 、および [トリガーログ] パラメーターを設定します。

      3. [呼び出しパラメーター] のデフォルト設定を保持します。

      4. ロール名にAliyunLogETLRoleを選択します。

        説明

        このタイプのトリガーを初めて作成するときは、[OK] をクリックした後に表示されるダイアログボックスで、[今すぐ許可] をクリックします。 次に、認証を完了して、Simple Log ServiceトリガーのAliyunLogETLRoleシステムロールを作成します。

  5. [作成] をクリックします。

    image

ステップ2: 関数コードの設定

  1. [コード] タブで、index.pyを次のコードに置き換えます。

    """
    The sample code is used to implement the following features:
    * Parse the event parameter to obtain the trigger information of Simple Log Service events.
    * Initialize the Simple Log Service client based on the obtained information.
    * Obtain real-time logs from the source Logstore.
    * Ship the logs over syslog.
    
    
    This sample code is mainly doing the following things:
    * Get SLS processing related information from event
    * Initiate SLS client
    * Pull logs from source log store
    * Send logs with syslog
    
    """
    # !/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import json
    from aliyun.log import LogClient
    import os
    import logging
    import six
    from datetime import datetime
    
    from aliyun.log import PullLogResponse
    from aliyun.log.ext import syslogclient
    from pysyslogclient import SyslogClientRFC5424 as SyslogClient
    
    logger = logging.getLogger()
    
    
    def get_syslog_config():
        config = {
            'host': os.environ.get('SYSLOG_HOST', ''),
            'port': int(os.environ.get('SYSLOG_PORT', '514')),
            'protocol': os.environ.get('SYSLOG_PROTOCOL', 'tcp'),
            "facility": syslogclient.FAC_USER,  # Optional. You can refer to the values of other syslogclient.FAC_* parameters. 
            "severity": syslogclient.SEV_INFO,  # Optional. You can refer to the values of other syslogclient.SEV_* parameters. 
            "hostname": "aliyun.example.com",  # Optional. Specify a hostname. The default value is the hostname of the local host. 
            "tag": "tag"  # Optional. Specify a tag. The default value is a hyphen (-). 
        }
        return config
    
    
    def init_syslog_client(config):
        client = SyslogClient(config.get('host'), config.get('port'), config.get('protocol'))
        return client
    
    
    def process(shard_id, log_groups, config):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(shard_id, len(logs)))
        try:
            client = init_syslog_client(config)
            for log in logs:
                # suppose we only care about audit log
                timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                del log['__time__']
    
                io = six.StringIO()
                # Modify the content format based on your business requirements. In this example, data is transmitted by using key-value pairs that are separated by two consecutive vertical bars (||). 
                for k, v in six.iteritems(log):
                    io.write("{0}{1}={2}".format('||', k, v))
    
                data = io.getvalue()
    
                # Modify the facility and severity parameters based on your business requirements. 
                client.log(data,
                           facility=config.get("facility", None),
                           severity=config.get("severity", None),
                           timestamp=timestamp,
                           program=config.get("tag", None),
                           hostname=config.get("hostname", None))
    
        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(config, err))
            # Add code to handle errors. For example, you can add code to retry requests or report errors. 
            raise err
    
        logger.info("Complete send data to remote")
    
    
    def handler(event, context):
        # Use context.credentials.to obtain information about keys.
        # Access keys can be fetched through context.credentials
        print("The content in context entity is: \n")
        print(context)
        creds = context.credentials
        access_key_id = creds.access_key_id
        access_key_secret = creds.access_key_secret
        security_token = creds.security_token
    
        # Parse the event parameter to the object data type.
        # parse event in object
        event_obj = json.loads(event.decode())
        print("The content in event entity is: \n")
        print(event_obj)
    
        # Use event.source to obtain the following information: project name, Logstore name, Simple Log Service endpoint, start cursor, end cursor, and shard ID.
        # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source
        source = event_obj['source']
        log_project = source['projectName']
        log_store = source['logstoreName']
        endpoint = source['endpoint']
        begin_cursor = source['beginCursor']
        end_cursor = source['endCursor']
        shard_id = source['shardId']
    
        # Initialize the Simple Log Service client.
        # Initialize client of sls
        client = LogClient(endpoint=endpoint,
                           accessKeyId=access_key_id,
                           accessKey=access_key_secret,
                           securityToken=security_token)
    
        syslog_config = get_syslog_config()
    
        # Read logs based on the start and end cursors in the source Logstore. In this example, the specified cursors cover all logs that trigger the invocation of the function.
        # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation
        while True:
            response = client.pull_logs(project_name=log_project,
                                        logstore_name=log_store,
                                        shard_id=shard_id,
                                        cursor=begin_cursor,
                                        count=100,
                                        end_cursor=end_cursor,
                                        compress=False)
            log_group_cnt = response.get_loggroup_count()
            if log_group_cnt == 0:
                break
            logger.info("get %d log group from %s" % (log_group_cnt, log_store))
            process(shard_id, response.get_loggroup_list(), syslog_config)
    
            begin_cursor = response.get_next_cursor()
    
        return 'success'
    

    image

ステップ3: カスタムレイヤーの作成

このトピックでは、ログはsyslogプロトコルで送信されます。 したがって、syslogの依存ライブラリが必要です。 次の手順では、依存関係ライブラリをインポートするカスタムレイヤーを作成する方法について説明します。

  1. [レイヤーの編集] をクリックします。

  2. 表示されるパネルで、[レイヤーの追加] > [カスタムレイヤーの追加] を選択します。

  3. [カスタムレイヤーの作成] をクリックします。 次に、次のパラメーターを設定します。

    1. 互換ランタイムPython 3.9を選択します。

    2. レイヤーアップロードメソッド[依存レイヤーオンラインのビルド] を選択します。

    3. [ビルド環境][Python 3.9] を選択します。

    4. requirements.txt Fileフィールドにpysyslogclientと入力します。

    5. [作成] をクリックします。

  4. レイヤー変更パネルに戻り、レイヤー1用に3で作成したカスタムレイヤーを選択し、[OK] をクリックします。

ステップ4: 関数のテストとコードのデプロイ

  1. [テスト関数] をクリックします。 successが返された場合、ログの配布は成功です。

    image

  2. [デプロイ] をクリックしてコードをデプロイします。

よくある質問

関数のテスト時にUnhandleInvocationErrorエラーが報告された場合はどうすればよいですか?

  1. サービスロールに必要な権限がないためにエラーが報告された場合は、「ロールの設定」をご参照ください。

  2. 設定された環境変数が無効なためにエラーが報告された場合は、「環境変数の設定」をご参照ください。

  3. ご不明な点がございましたら、DingTalkグループ11721331に参加して、Function Computeエンジニアにお問い合わせください。

次に何をすべきか

  • syslogサーバでログが受信されているかどうかを確認します。

  • [ログ] タブで、関数のトリガーと呼び出し情報を表示します。

    image

関連ドキュメント