Alibaba Cloud DataWorks のデータ統合における「ベクトル化」機能は、Object Storage Service (OSS)、MaxCompute、HDFS などの異種ソースからデータを抽出し、それをベクトルに変換して、ベクトル対応の宛先へ書き込むことを可能にします。これらの宛先には、Milvus、Elasticsearch、OpenSearch などのベクトルデータベースおよび Hologres のベクトルテーブルが含まれます。この機能により、ETL プロセスが大幅に簡素化され、ナレッジのベクトル化が加速し、検索拡張生成 (RAG) などの AI アプリケーションの実装を支援します。
ベクトル化の必要性
大規模言語モデル (LLM) 技術が進化する中、企業固有のナレッジをモデルに統合することは、ビジネス価値創出において不可欠です。そのために重要な手法として、検索拡張生成 (RAG) が注目されています。RAG は、データをベクトル表現にエンコードし、ベクトルデータベースを用いて効率的に取得する仕組みです。このアプローチにより、LLM は正確で権威性のある、かつ動的に更新可能なドメイン知識を活用できます。
お客様の業務データは、Object Storage Service (OSS)、MaxCompute、HDFS、MySQL、Oracle、メッセージキューなど、さまざまなデータソースに散在している可能性があります。これらのデータをベクトル埋め込みに変換し、Milvus、OpenSearch、Elasticsearch のベクトルデータベースや Hologres のベクトルテーブルといった宛先へ読み込む必要があります。このプロセスでは、複雑な ETL スクリプトの記述や、多様なソースデータ形式への対応が必要となることが多く、抽出、変換、ベクトル化(埋め込み)、読み込みという複数のステージを経るため、長く密結合されたワークフローとなり、モデルの反復サイクルが大幅に延長されるおそれがあります。
機能概要
DataWorks のデータ統合では、ベクトル化機能がサポートされました。これにより、ベクトルデータベースへデータを抽出・ベクトル化・書き込みする一連のエンドツーエンド自動パイプラインを、単一のタスクで構成できます。この機能により、開発の複雑さが低減され、ナレッジベースの更新遅延が短縮され、RAG、インテリジェントカスタマーサポート、検索・レコメンデーションシステムといったユースケースでの効率的なナレッジ統合が実現されます。
データ統合では、ベクトル化バッチ同期タスクの構成に以下の 2 つのモードを提供しています。
制限事項
この機能は、新バージョンのデータ開発が有効化されたワークスペースでのみ利用可能です。
この機能は、サーバーレスリソースグループのみをサポートします。
この機能は、現在、特定のバッチ同期のソースおよび宛先タイプにのみ対応しています。
課金
AI 機能を活用した処理を行うデータ統合タスクの場合、データ統合タスク自体と、大規模言語モデル (LLM) への呼び出しに対してそれぞれ課金されます。詳細については、「データ統合のシナリオ別課金」をご参照ください。課金の詳細は以下のとおりです。
Alibaba Cloud DataWorks Model Service の場合:「サーバーレスリソースグループの課金 - LLM サービス」をご参照ください。
Alibaba Cloud Bailian プラットフォームの場合:「モデル推論(呼び出し)の課金」をご参照ください。
Alibaba Cloud PAI Model Gallery の場合:「EAS の課金」をご参照ください。
事前準備
新バージョンのデータ開発が有効化されたワークスペースを作成します。
サーバーレスリソースグループを作成し、それをワークスペースにバインドします。
AI 機能を活用した処理に必要な LLM サービスを設定します。手順はモデルプロバイダーによって異なります。
Alibaba Cloud DataWorks Model Service:「大規模モデルサービスの管理」コンソールでモデルをデプロイし、モデルサービスを起動します。
Alibaba Cloud Bailian プラットフォーム:「Bailian」を有効化し、「API キーの取得」を行います。
Alibaba Cloud PAI Model Gallery:「Platform for AI (PAI)」を有効化し、モデルサービス用のトークンを取得します。
バッチ同期タスクに必要なソースおよび宛先のデータソースを作成します。
本チュートリアルでは、ソースとして MaxCompute、宛先として Milvus を使用します。そのため、MaxCompute データソースおよび Milvus データソースの両方を作成する必要があります。
テストデータの準備
本チュートリアルで使用するテーブルデータは、公開データセット(E コマース製品レビューセンチメント予測データセット)から取得しています。ユーザーのレビューをベクトル化し、その後の類似検索のために Milvus へ同期します。
MaxCompute ソース側でテストテーブルを作成し、テストデータを挿入します。
Milvus 宛先側では、ベクトル化されたデータを受け取るターゲットコレクションを作成します。コレクションの構造は以下のとおりです。
ターゲットコレクションでは、ID を自動増分で使用します。
パラメーター
型
説明
id
Int64
プライマリキーであり、自動増分です。
sentence
VarChar(32)
元のテキストを格納します。
sentence_e
FloatVector(128)
COSINE メトリックを使用した類似検索用のベクトルフィールドです。
ウィザードモード
本チュートリアルでは、Data 統合のウィザードモードを用いて、MaxCompute ソースからデータを読み込み、ベクトル埋め込みを作成し、Milvus へ同期するバッチ同期タスクを構成する方法を説明します。
ステップ 1:バッチ同期ノードの作成
DataWorks コンソールの「ワークスペース」ページに移動します。上部ナビゲーションバーで目的のリージョンを選択し、該当するワークスペースを見つけ、「操作」列のを選択します。
プロジェクトディレクトリで、をクリックします。ソースと宛先(本チュートリアルでは、ソースに MaxCompute、宛先に Milvus を指定)を構成し、ノードの名前を入力して、OK をクリックします。
ステップ 2:バッチ同期タスクの構成
基本情報を設定します。
データソース:ソースおよび宛先のデータソースを選択します。
タスク実行用リソースグループ:バッチ同期タスクを実行するリソースグループを選択します。このリソースグループは、現在のワークスペースにバインドされており、データソースとのネットワーク接続が確保されている必要があります。
データソースまたはリソースグループが利用できない場合は、「事前準備」の手順を完了していることを確認してください。
ソースを構成します。
以下は、本チュートリアルにおける MaxCompute データソースの主なパラメーターです。他のデータソースでは構成が異なる場合があります。各データソースに応じて、適切なパラメーターを設定してください。

パラメーター
説明
トンネルリソースグループ
トンネルクォータです。デフォルトでは、
パブリックネットワークリソースが選択されており、これは MaxCompute の無料クォータです。MaxCompute のデータ転送リソースの選択について詳しくは、「Data Transmission Service 専用リソースグループの購入と使用」をご参照ください。重要専用トンネルクォータが未払いまたは期限切れにより利用不可となった場合、タスクは自動的に
パブリックネットワークリソースに切り替わります。テーブル
同期するソーステーブルを選択します。
ソーステーブルが利用できない場合は、「テストデータの準備」の手順を完了していることを確認してください。
フィルター方法
パーティションフィルターおよびデータフィルターをサポートしています。
ソーステーブルがパーティションテーブルである場合、パーティション単位で同期するデータ範囲を選択できます。
ソーステーブルがパーティションテーブルでない場合、
WHERE句を設定して同期するデータ範囲をフィルターできます。
構成が正しいか確認するには、データプレビュー をクリックします。

データ処理を構成します。
データ処理オプションを有効化します。データ処理リストで、 をクリックして、データベクトル化 処理ノードを追加します。

データベクトル化ノードを構成します。
主なパラメーターの説明は以下のとおりです。説明データベクトル化ノードのパフォーマンスは、構成されたモデルのパフォーマンスに依存します。Alibaba Cloud Bailian プラットフォームが提供する Qwen モデルには QPS 制限があります。Alibaba Cloud PAI Model Gallery の場合、モデルを PAI-EAS 上にご自身でデプロイする必要がありますが、そのパフォーマンスはデプロイに使用したリソース仕様に依存します。
固定パラメーターを使用する場合、埋め込みモデルが生成するベクトル結果は決定論的です。このため、DataWorks データ統合では、同一の入力データに対して Least Frequently Used (LFU) キャッシュを活用して同期プロセスを最適化しています。このキャッシュにより、モデルへの重複呼び出しを回避し、処理パフォーマンスを向上させ、埋め込みコストを削減できます。
パラメーター
説明
モデルプロバイダー
LLM プロバイダーです。現在サポートされているプロバイダーは以下のとおりです:Alibaba Cloud DataWorks Model Service、Alibaba Cloud Bailian プラットフォーム、およびAlibaba Cloud PAI Model Galleryです。
モデル名
埋め込みモデルの名前です。必要に応じて選択します。
モデル API キー
モデルにアクセスするための API キーです。モデルプロバイダーから取得してください。
Alibaba Cloud Bailian プラットフォーム:「Bailian API キーの取得」をご参照ください。
Alibaba Cloud PAI Model Gallery:デプロイ済みの EAS タスクに移動し、オンラインデバッグページを開き、ヘッダーから Authorization パラメーターの値を取得します。この値を API キーとして使用します。
モデルエンドポイント
Alibaba Cloud PAI Model Gallery をモデルプロバイダーとして選択した場合、モデルのアクセスエンドポイントを構成する必要があります。
バッチサイズ
各バッチで処理するレコード数です。このパラメーターの有効性は、基盤となるモデルがバッチ処理をサポートしているかどうかに依存します。バッチ処理により、埋め込みパフォーマンスが向上し、コストが削減されます。デフォルト値は 10 です。
ベクトル化対象フィールドの選択
どのカラムをベクトル化するかを定義し、出力フィールド名を指定します。データ統合では、ソースの単一フィールドまたは複数のフィールドを連結したものをベクトル化できます。
ベクトル化結果フィールド
出力ベクトルを格納する新しいフィールドの名前です。
ベクトル次元
出力ベクトルの次元です。これは、選択したモデルがサポートする次元と一致している必要があります。デフォルト値は 1024 です。
NULL 値を空文字列に変換
LLM は NULL 値を処理できません。ソースデータに NULL 値が含まれる場合、このオプションを有効化して NULL 値を空文字列に変換することで、エラーを回避できます。デフォルトでは無効です。
フィールド名の連結
ベクトル化時にフィールド名をテキストに連結するかどうかを指定します。有効化した場合、フィールド名デリミタ も構成する必要があります。デフォルトでは無効です。
空フィールドのスキップ
複数のフィールドを連結してベクトル化する場合、空フィールドをスキップするかどうかを指定します。デフォルトでは有効です。
出力データのプレビュー
データベクトル化ノードの構成エリア右上にある 出力データのプレビュー をクリックし、プレビュー をクリックして、ベクトル化された出力をプレビューし、構成を検証します。
バッチ同期編集ページの上部にある ダミー実行 をクリックしても、ベクトル化結果をプレビューできます。

[送信先] を設定します。
以下は、本チュートリアルにおける Milvus データソースの主なパラメーターです。他のデータソースでは構成が異なる場合があります。各データソースに応じて、適切なパラメーターを設定してください。

パラメーター
説明
コレクション
ベクトルデータを受け取るコレクションです。
パーティションキー
任意項目です。コレクションがパーティション化されている場合、受信するベクトルデータのパーティションを指定できます。
書き込みモード
upsert:
コレクションで自動 ID が有効になっていない場合:プライマリキーに基づいてコレクション内のエンティティを更新します。
コレクションで自動 ID が有効になっている場合:エンティティのプライマリキーを自動生成されたものに置き換え、データを挿入します。
insert:通常、自動 ID が有効になっているコレクションへのデータ挿入に使用されます。Milvus が自動的にプライマリキーを生成します。
insertを自動 ID が有効になっていないコレクションで使用すると、重複データが発生する可能性があります。
宛先フィールドマッピングを構成します。
ソース、処理、宛先の構成が完了すると、バッチ同期タスクは自動的にフィールドマッピングを生成します。宛先は固定スキーマを持たないデータソースであるため、デフォルトでは位置によるマッピングが行われます。不要なフィールドを削除したり、マッピング順序を調整したりするには、ソースフィールドまたは宛先フィールドの横にある 編集 をクリックします。
たとえば、本チュートリアルでは不要なフィールドを手動で削除しています。調整後のマッピングは以下のとおりです。

詳細設定を構成します。
ノード構成ページの右側にある 詳細設定 をクリックします。タスクの同時実行数、同期レート、不正データポリシーなどのパラメーターを必要に応じて設定できます。
ステップ 3:実行とデバッグ
バッチ同期ノード編集ページの右側にある Run Configuration をクリックします。デバッグ実行用に リソースグループ および スクリプトパラメーター を設定し、ツールバー上部の 実行 をクリックしてパイプラインを実行します。
Milvus で、宛先コレクションのデータが正しいことを確認します。
ステップ 4:スケジューリングの構成と公開
バッチ同期タスクの右側にある スケジューリング をクリックします。定期実行用に「スケジューリング構成」パラメーターを設定します。その後、ツールバー上部の 公開 をクリックして公開パネルを開き、画面の指示に従って「公開」プロセスを完了します。
スクリプトモード
本チュートリアルでは、Data 統合のスクリプトモードを用いて、MaxCompute ソースからデータを読み込み、ベクトル埋め込みを作成し、Milvus へ同期するバッチ同期タスクを構成する方法を説明します。
ステップ 1:バッチ同期ノードの作成
DataWorks コンソールの「ワークスペース」ページに移動します。上部ナビゲーションバーで目的のリージョンを選択し、該当するワークスペースを見つけ、「操作」列のを選択します。
プロジェクトディレクトリで、をクリックします。ソースと宛先(本チュートリアルでは、ソースに MaxCompute、宛先に Milvus を指定)を構成し、ノードの名前を入力して、OK をクリックします。
ステップ 2:同期スクリプトの構成
バッチ同期ノードのツールバーにある
アイコンをクリックして、スクリプト編集モードに切り替えます。MaxCompute から Milvus へのバッチ同期タスクを構成します。
「付録 1:スクリプトモードのフォーマット」に従って、バッチ同期タスクの JSON スクリプトを構成します。本例のスクリプトは以下のとおりです。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "odps", "parameter": { "partition": [ "split=dev" ], "datasource": "MaxCompute_Source", "successOnNoPartition": true, "tunnelQuota": "default", "column": [ "sentence" ], "enableWhere": false, "table": "test_tb" }, "name": "Reader", "category": "reader" }, { "category": "flatmap", "stepType": "embedding-transformer", "parameter": { "modelProvider": "bailian", "modelName": "text-embedding-v4", "embeddingColumns": { "sourceColumnNames": [ "sentence" ], "embeddingColumnName": "sentence_e" }, "apiKey": "sk-****", "dimension": 128, "nullAsEmptyString": true }, "displayName": "sentence2emb", "description": "" }, { "stepType": "milvus", "parameter": { "schemaCreateMode": "ignore", "enableDynamicSchema": true, "datasource": "Milvus_Source", "column": [ { "name": "sentence", "type": "VarChar", "elementType": "None", "maxLength": "32" }, { "name": "sentence_e", "type": "FloatVector", "dimension": "128", "elementType": "None", "maxLength": "65535" } ], "writeMode": "insert", "collection": "Milvus_Collection", "batchSize": 1024, "columnMapping": [ { "sourceColName": "sentence", "dstColName": "sentence" }, { "sourceColName": "sentence_e", "dstColName": "sentence_e" } ] }, "name": "Writer", "category": "writer" } ], "setting": { "errorLimit": { "record": "0" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }Reader および Writer セクションのパラメーターの説明については、「MaxCompute データソース」および「Milvus データソース」をご参照ください。
他のタイプのデータソースおよび宛先を使用する場合は、「データソース一覧」をご参照ください。
以下の表は、データベクトル化トランスフォーマー (
embedding-transformer) のスクリプトパラメーターについて説明しています。パラメーター
説明
必須
modelProvider
LLM プロバイダーです。現在サポートされているプロバイダーは以下のとおりです。
dataworksModelService:DataWorks の「大規模モデルサービス」を介してデプロイされたモデルサービスです。
bailian:Qwen モデルをサポートする Alibaba Cloud Bailian プラットフォームです。
paiModelGallery:BGE-M3 モデルをサポートする Alibaba Cloud PAI Model Gallery です。
はい
modelName
埋め込みモデルの名前です。
modelProvider が bailian の場合、
text-embedding-v4またはtext-embedding-v3を選択できます。modelProvider が paiModelGallery の場合、
bge-m3を選択できます。
はい
apiKey
モデルにアクセスするための API キーです。モデルプロバイダーから取得してください。
はい
endpoint
modelProvider が paiModelGallery の場合、モデルのアクセスエンドポイント(Endpoint API アドレス)を構成する必要があります。
いいえ
batchSize
各バッチで処理するレコード数です。このパラメーターの有効性は、基盤となるモデルがバッチ処理をサポートしているかどうかに依存します。バッチ処理により、埋め込みパフォーマンスが向上し、コストが削減されます。デフォルト値は 10 です。
いいえ
embeddingColumns
どのカラムをベクトル化するかを定義し、出力フィールド名を指定します。データ統合では、ソースの単一フィールドまたは複数のフィールドを連結したものをベクトル化できます。
例:
{ "embeddingColumns": { "sourceColumnNames": [ "col1", "col2" ], "embeddingColumnName": "my_vector" } }はい
appendDelimiter
複数のフィールド値をベクトル化用の単一テキストに連結する際に使用するデリミタです。デフォルト値は
\nです。いいえ
skipEmptyValue
複数のフィールドをベクトル化する際に、空フィールドをスキップするかどうかを指定します。デフォルト値は
falseです。いいえ
dimension
出力ベクトルの次元です。これは、選択したモデルがサポートする次元と一致している必要があります。デフォルト値は 1024 です。
いいえ
nullAsEmptyString
LLM は埋め込み処理に NULL 値を許可していません。ソースデータに NULL 値が含まれる場合、このオプションを有効化して NULL 値を空文字列に変換することで、エラーを回避できます。デフォルト値は
falseです。いいえ
appendFieldNameEnable
ベクトル化時に元のデータにフィールド名を連結するかどうかを指定します。有効化した場合、appendFieldNameDelimiter も構成する必要があります。デフォルト値は
falseです。いいえ
appendFieldNameDelimiter
フィールド名を連結する際に使用するデリミタです。これは appendFieldNameEnable が有効化されている場合にのみ有効です。
いいえ
ダミー実行を実行します。
バッチ同期ノード編集ページの上部にある ダミー実行 をクリックし、取得開始 および プレビュー をクリックして、ベクトル化された結果を表示し、構成が正しいことを確認します。

詳細設定を構成します。
ノード構成ページの右側にある 詳細設定 をクリックします。タスクの同時実行数、同期レート、不正データポリシーなどのパラメーターを必要に応じて設定できます。
ステップ 3:実行とデバッグ
バッチ同期ノード編集ページの右側にある Run Configuration をクリックします。デバッグ実行用に リソースグループ および スクリプトパラメーター を設定し、ツールバー上部の 実行 をクリックしてパイプラインを実行します。
Milvus で、宛先コレクションのデータが正しいことを確認します。
ステップ 4:スケジューリングの構成と公開
バッチ同期タスクの右側にある スケジューリング をクリックします。定期実行用に「スケジューリング構成」パラメーターを設定します。その後、ツールバー上部の 公開 をクリックして公開パネルを開き、画面の指示に従って「公開」プロセスを完了します。
付録 1:スクリプトモードのフォーマット
スクリプトモードの基本構造は以下のとおりです。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "xxx",
"parameter": {
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "transformer1",
"category": "map/flatmap"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "transformer2",
"category": "map/flatmap"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
}
}steps 配列は、パイプライン内の各処理オペレーターを定義します。Reader および Writer は最低 1 つずつ必要であり、その間に任意の数の Transformer オペレーターを配置できます。並列度を 2 に設定した場合、ジョブは 2 つの並列データ処理ストリームで実行されます。各オペレーター(Reader、Transformer、Writer)は step オブジェクトとして定義されます。

steps では、各オペレーターのタイプおよびパラメーターを定義します。データ同期および処理は、JSON 構成内の各 step の順序に厳密に従って実行されます。
Data 統合でサポートされる各種データソースの Reader および Writer チャネルの詳細なパラメーター構成については、「サポートされるデータソースおよび同期ソリューション」をご参照ください。
付録 2:OSS から Milvus への例
この例では、Object Storage Service (OSS) から JSONL データを Milvus へ同期する方法を示します。パイプラインは JSON を解析し、指定されたフィールドをベクトル化し、最終的なデータを Milvus へ書き込みます。完全な JSON 構成例は以下のとおりです。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "oss",
"parameter": {
"datasource": "${Your_OSS_Data_Source_Name}",
"column": [
{
"name": "chunk_text",
"index": 0,
"type": "string"
}
],
"fieldDelimiter": ",",
"encoding": "UTF-8",
"fileFormat": "jsonl",
"object": [
"embedding/chunk1.jsonl"
]
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "json-extracting",
"parameter": {
"column": [
{
"name": "text",
"fromColumn": "chunk_text",
"jsonPath": "$.text",
"type": "STRING",
"nullOrInvalidDataAction": "DIRTY_DATA"
}
]
},
"name": "jsonextract",
"category": "flatmap"
},
{
"stepType": "embedding-transformer",
"parameter": {
"modelProvider": "bailian",
"modelName": "text-embedding-v4",
"apiKey": "${Your_API_Key}",
"embeddingColumns": {
"sourceColumnNames": [
"text"
],
"embeddingColumnName": "my_vector"
},
"batchSize": 8,
"dimension": 1024
},
"name": "embedding",
"category": "flatmap"
},
{
"stepType": "milvus",
"parameter": {
"schemaCreateMode": "ignore",
"enableDynamicSchema": true,
"datasource": "${Your_Milvus_Data_Source_Name}",
"column": [
{
"name": "my_vector",
"type": "FloatVector",
"dimension": "1024",
"elementType": "None",
"maxLength": "65535"
},
{
"name": "text",
"type": "VarChar",
"elementType": "None",
"maxLength": "65535"
}
],
"collection": "yunshi_vector_07171130",
"writeMode": "insert",
"batchSize": 1024,
"columnMapping": [
{
"sourceColName": "my_vector",
"dstColName": "my_vector"
},
{
"sourceColName": "text",
"dstColName": "text"
}
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 1
}
}
}
> ノードの作成 > データ統合 > バッチ同期