DataWorks は、OpenEvent 機能を通じてメッセージサブスクリプション機能を提供します。サービスプログラムを DataWorks 拡張機能として登録して、サブスクライブしたイベントメッセージをキャプチャして応答できます。これにより、特定のイベントの通知を受信し、プロセスを管理できます。このトピックでは、「1,000 件を超えるデータレコードのダウンロードをリアルタイムでブロックまたは承認する」イベントを例に、リスク識別ルールの開発と検証のプロセスについて説明します。
背景情報
データのダウンロードは、企業のリスク管理における重要な操作です。通常、企業のデータ開発者とアナリストのみがデータプラットフォーム上のデータを閲覧および使用できます。詳細なデータをローカルコンピューターにダウンロードして分析することは許可されていません。データがローカルコンピューターにエクスポートされると、データに対して実行された操作は監査できません。不適切なデータの使用や悪意のある攻撃は、データの誤用や侵害につながる可能性があり、データセキュリティインシデントや否定的な世論を引き起こす可能性があります。このシナリオでは、データのエクスポートをリアルタイムでブロックする方法を示します。
目的
ユーザーが一度に 1,000 行を超えるデータをエクスポートしようとすると、システムは自動的に操作をブロックするか、承認プロセスをトリガーします。
前提条件
DataWorks Enterprise Edition が有効化されていること。このソリューションは、DataWorks Enterprise Edition のオープンプラットフォーム機能に基づいています。
EventBridge が有効化されており、ユーザー操作イベントのメッセージ本文を受信できること。これらのメッセージは、リスク動作拡張機能によって消費されます。
リスク動作拡張機能をデプロイするために、ECS インスタンスまたはオンプレミスのデータセンターが作成されていること。
ステップ 1: メッセージサブスクリプションの有効化と構成
クエリ結果のダウンロードはワークスペースレベルの操作ではないため、このユースケースではデフォルトのバスを使用して操作イベントのメッセージを受信します。
タイプが
dataworks:ResourcesDownload:DownloadResourcesのイベントをクエリします。[アクション] 列の [イベント詳細] をクリックして、イベントメッセージの本文を表示します。以下に例を示します。
{ "datacontenttype": "application/json;charset=utf-8", "aliyunaccountid": "110755000425****", "aliyunpublishtime": "2023-12-05T07:25:31.708Z", "data": { "eventCode": "download-resources", "extensionBizId": "audit_4d7ebb42b805428483148295a97a****", "extensionBizName": "DataWorks_IDE_Query_20231205152530.csv", "requestId": "77cac0c2fc12cecbf1d289128897****@@ac15054317017611303051804e****", "appId": ****, "tenantId": 52425742456****, "blockBusiness": true, "eventBody": { "sqlText": "SELECT * FROM table_1", "queryDwProjectId": "****", "moduleType": "develop_query", "operatorBaseId": "110755000425****", "datasourceId": "1****", "queryDwProjectName": "yongxunQA_emr_chen****", "dataRowSize": 4577, "datasourceName": "odps_source", "operatorUid": "110755000425****" }, "operator": "110755000425****" }, "aliyunoriginalaccountid": "110755000425****", "specversion": "1.0", "aliyuneventbusname": "default", "id": "169d171c-d523-4370-a874-bb0fa083****", "source": "acs.dataworks", "time": "2023-12-05T15:25:31.588Z", "aliyunregionid": "cn-chengdu", "type": "dataworks:ResourcesDownload:DownloadResources" }主要なパラメーターの説明:
パラメーター
説明
sqlText
SQL クエリ。
queryDwProjectId
クエリ対象のデータソースを含むワークスペースの ID。
moduleType
ダウンロードソース。有効な値:
develop_query: データ開発でのクエリ。
sqlx_query: データ分析でのクエリ。
dw_excel: データ分析でのワークブック。
operatorBaseId
オペレーターの UID。
datasourceId
クエリ対象のデータソースの ID。
queryDwProjectName
クエリ対象のデータソースを含むワークスペースの名前。
dataRowSize
ダウンロードするデータ行数。
datasourceName
クエリ対象のデータソースの名前。
ステップ 2: 拡張機能の開発とデプロイ
準備:
メッセージサブスクリプションを有効にし、拡張機能を登録し、拡張機能の開発に必要な情報を取得します。詳細については、「拡張機能の開発とデプロイ: 自己管理型サービス」をご参照ください。
拡張機能の開発とデプロイ。
取得した情報に基づいて、アプリケーションサービスとして拡張機能を開発およびデプロイします。詳細については、「拡張機能の開発とデプロイ: Function Compute」をご参照ください。主要なパラメーター設定とサンプルコードを以下に示します。
拡張機能を登録する際、[処理拡張ポイント] で [データダウンロードの事前イベント] を選択します。
以下のコードは、拡張機能を開発するためのサンプルです。
重要このサンプル拡張機能は、ステップ 1 のイベントメッセージ本文の
dataRowSizeフィールドを使用して、ダウンロードする行数に基づいてリスクをチェックします。応答を構成する際、承認プロセスをトリガーしたい場合は、拡張機能がリスクのあるユーザーの動作を検出したときに
callbackExtensionRequest.setCheckResult()からWARNを返すようにします。操作をブロックしたい場合は、callbackExtensionRequest.setCheckResult()がFAILを返す必要があります。このトピックの拡張コードでは、1,000 件のデータレコードのダウンロードを例として使用しています。ダウンロードされるレコード数に応じて異なる承認フローをトリガーしたい場合は、複数の拡張機能を構成できます。詳細については、「ステップ 3: リスク識別ルールを構成する」をご参照ください。例:
最初の拡張機能は、0 から 2,000 レコードのダウンロードに対してのみトリガーされ、承認フロー 1 に対応します。
2 番目の拡張機能は、2,001 以上のレコードのダウンロードに対してのみトリガーされ、承認フロー 2 に対応します。
package com.aliyun.dataworks.demo; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.aliyun.dataworks.config.Constants; import com.aliyun.dataworks.config.EventCheckEnum; import com.aliyun.dataworks.config.ExtensionParamProperties; import com.aliyun.dataworks.services.DataWorksOpenApiClient; import com.aliyun.dataworks_public20200518.Client; import com.aliyun.dataworks_public20200518.models.CallbackExtensionRequest; import com.aliyun.dataworks_public20200518.models.CallbackExtensionResponse; import com.aliyun.dataworks_public20200518.models.GetOptionValueForProjectRequest; import com.aliyun.dataworks_public20200518.models.GetOptionValueForProjectResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author dataworks demo */ @RestController @RequestMapping("/extensions") public class ExtensionsController { @Autowired(required = false) private DataWorksOpenApiClient dataWorksOpenApiClient; @Autowired private ExtensionParamProperties extensionParamProperties; /** * EventBridge からプッシュされたメッセージを受信します。 * * @param jsonParam */ @PostMapping("/consumer") public void consumerEventBridge(@RequestBody String jsonParam) { JSONObject jsonObj = JSON.parseObject(jsonParam); String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED); if (Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)) { // クライアントを初期化します。 Client client = dataWorksOpenApiClient.createClient(); try { // 現在のイベントのパラメーターに関する情報。 String messageId = jsonObj.getString("id"); JSONObject data = jsonObj.getObject("data", JSONObject.class); // Long projectId = data.getLong("appId"); // イベントコールバックを初期化します。 CallbackExtensionRequest callbackExtensionRequest = new CallbackExtensionRequest(); callbackExtensionRequest.setMessageId(messageId); callbackExtensionRequest.setExtensionCode(extensionParamProperties.getExtensionCode()); JSONObject eventBody = data.getJSONObject("eventBody"); Long dataRowSize = eventBody.getLong("dataRowSize"); // プロジェクト内の拡張機能オプションの構成を取得します。 GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest(); // グローバル拡張ポイントイベントの構成情報のデフォルトのプロジェクト ID は -1 です。 getOptionValueForProjectRequest.setProjectId("-1"); getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode()); GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getOptionValueForProject(getOptionValueForProjectRequest); JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getBody().getOptionValue()); // 注: このパラメーターは、DataWorks で設定されたフォーマットに基づいて構成する必要があります。 Long maxDataRowSize = jsonObject.getLong("dataRowSize"); // コードに制限された機能が含まれているかどうかを確認します。 if (dataRowSize > 1000) { callbackExtensionRequest.setCheckResult(EventCheckEnum.FAIL.getCode()); callbackExtensionRequest.setCheckMessage("ダウンロードする行数が制限を超えています。"); } else { // コールバック成功。 callbackExtensionRequest.setCheckResult(EventCheckEnum.OK.getCode()); } // DataWorks にコールバックします。 CallbackExtensionResponse acsResponse = client.callbackExtension(callbackExtensionRequest); // リクエストの一意の ID。後続のトラブルシューティングに使用されます。 System.out.println("acsResponse:" + acsResponse.getBody().getRequestId()); } catch (Exception e) { // エラーの説明。 System.out.println("ErrMsg:" + e.getMessage()); } } else { // 他のイベントのフィルターに失敗しました。構成ステップを確認してください。 System.out.println("Failed to filter other events. Check the configuration steps."); } } }
ステップ 3: リスク識別ルールを構成する
セキュリティセンターページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、[セキュリティセンターへ] をクリックします。
左側のナビゲーションウィンドウで、 をクリックします。
公開された拡張機能の承認プロセスを構成します。詳細については、「リスク応答を構成する」をご参照ください。

ステップ 4: リスク識別ルールを有効にする
プロンプトに従って [有効化] スイッチをオンにして、拡張機能を有効にします。
ステップ 5: 結果を検証する
[データダウンロード] ページに移動します。
指定したファイルの [アクション] 列にある [ダウンロード] をクリックします。
チェックに合格した場合、ダウンロードを続行できます。
チェックに失敗した場合、ダウンロードはブロックされるか、リクエストの送信を求められます。
他の同様のシナリオ
ワークスペース名、SQL の詳細、データソース名、ユーザー UID など、ダウンロードイベントの他のフィールドを、他のリアルタイムリスク管理シナリオに使用できます。例:
ユーザーの部署 (ワークスペース) に基づいてデータのダウンロードを許可または拒否する。
クエリ SQL に機密フィールドが含まれている場合にダウンロードをブロックする。
階層的なリスク管理を実装する。たとえば、20,000 レコードを超えるダウンロードには承認を要求し、50,000 レコードを超えるダウンロードはブロックする。
ワークスペースのロールに基づいてダウンロード制限を定義する。たとえば、開発者ロールには N レコードのダウンロードを許可し、この制限を超えるダウンロードはブロックします。また、アナリストロールには M レコードのダウンロードを許可し、この制限を超えるダウンロードはブロックすることもできます。これには、ListProjectMembers - ワークスペースメンバーのクエリ API 操作を使用する必要があります。
データ開発とデータ分析のシナリオに対して、異なるダウンロード数量ポリシーを設定する。