Alibaba Cloud DataWorks データ統合は、埋め込みベクトル化機能を導入しました。この機能を使用すると、OSS、MaxCompute、Hadoop 分散ファイルシステム (HDFS) などの異種データソースからデータを抽出し、ベクトルに変換できます。その後、ベクトルを Milvus、Elasticsearch、OpenSearch などのベクトルデータベースや Hologres ベクトルテーブルなど、ベクトルストレージ機能を持つ宛先に書き込むことができます。これにより、抽出・変換・書き出し (ETL) プロセスが簡素化され、知識が効率的にベクトル化され、検索拡張生成 (RAG) などの AI シナリオの実装に役立ちます。
埋め込みを使用する理由
大規模言語モデル (LLM) 技術が進化し続ける中で、プライベートナレッジをモデルシステムに深く統合することは、企業がこれらのモデルを効果的に適用し、価値を創造するための鍵となります。検索拡張生成 (RAG) は、この目的のための主要な技術的アプローチとなっています。データをベクトル表現にエンコードし、ベクトルデータベースを使用して効率的な取得を行うことで、RAG は LLM に正確で、信頼でき、動的に更新されるドメイン知識を提供します。
ビジネスデータは、OSS、MaxCompute、HDFS、MySQL、Oracle、メッセージキューなど、多くの異種データソースに散在している可能性があります。このデータは、埋め込みを使用してベクトル化し、Milvus、OpenSearch、Elasticsearch などのベクトルデータベースや Hologres ベクトルテーブルなど、ベクトルストレージ機能を持つさまざまな宛先に書き込む必要があります。このプロセスでは、複雑な ETL スクリプトを作成し、さまざまなソースデータ型に適応させる必要があります。データは、抽出、変換、ベクトル化 (埋め込み)、書き込みなど、複数の段階を経ます。この長く、密結合したワークフローは、モデルの反復サイクルを大幅に延長させます。
機能紹介
DataWorks データ統合は、埋め込みベクトル化をサポートしています。単一のデータチャネル内でデータを抽出、ベクトル化し、ベクトルデータベースに書き込むためのワンストップソリューションを提供し、エンドツーエンドの自動処理を可能にします。この機能により、開発の複雑さが大幅に軽減され、知識の更新レイテンシーが短縮され、RAG、インテリジェントなカスタマーサービス、検索と推奨などのシナリオで効率的に知識を取り込むことができます。
Data Integration のオフラインデータ同期のための埋め込みベクトル化機能は、2 つの構成モードをサポートしています。
コードレス UI 構成: ビジュアルインターフェイスを使用して、オフライン埋め込み同期を迅速に構成できます。
コードエディター構成: コードエディターは、より複雑で高度な構成機能をサポートしています。コードエディターを使用して、カスタムニーズに合わせてさまざまな同期パイプラインを構成できます。
制限事項
この機能は、新バージョンのデータ開発が有効になっているワークスペースでのみ利用できます。
Serverless リソースグループのみがサポートされています。
この機能は現在、一部のオフライン同期チャネルでのみ利用可能です。
課金
AI 支援処理を使用するデータ統合タスクでは、データ統合タスク自体のコストに加えて、大規模言語モデルの呼び出しによるコストが発生します。詳細については、「データ統合シナリオ」をご参照ください。
Alibaba Cloud DataWorks モデルサービスの課金の詳細については、「Serverless リソースグループの課金 - 大規模言語モデルサービス」をご参照ください。
Alibaba Cloud Model Studio の課金の詳細については、「モデル推論 (呼び出し) の課金」をご参照ください。
Alibaba Cloud PAI モデルマーケットプレイスの課金の詳細については、「Elastic Algorithm Service (EAS) の課金」をご参照ください。
準備
新バージョンのデータ開発が有効になっているワークスペースを作成します。
Serverless リソースグループを作成し、ワークスペースにアタッチします。
AI 支援処理に必要な大規模言語モデルサービスを準備します。準備は、選択した大規模言語モデルサービスプロバイダーによって異なります。
Alibaba Cloud DataWorks モデルサービス: 大規模言語モデルサービス管理でモデルをデプロイし、モデルサービスを開始します。
Alibaba Cloud Model Studio: Alibaba Cloud Model Studio をアクティブ化し、API キーを取得して構成します。
Alibaba Cloud PAI モデルマーケットプレイス: Platform for AI (PAI) をアクティブ化し、モデルサービスのトークンを取得します。
オフライン同期タスクに必要なソースと宛先のデータソースを作成します。
このチュートリアルでは、MaxCompute をソースとして、Milvus を宛先として使用します。したがって、まず MaxCompute データソースと Milvus データソースを作成する必要があります。
テストデータの準備
このチュートリアルで使用されるテーブルデータは、公開データセット (E コマース製品レビュー感情予測データセット) から取得したものです。製品のユーザーレビューをベクトル化し、その後の類似検索のために Milvus に同期します。
MaxCompute ソース側: テストテーブルを作成し、テストデータを挿入します。
Milvus 宛先側: ベクトル化されたデータを受け取るための宛先テーブルを作成します。テーブルスキーマは次のとおりです。
宛先テーブルでは autoid 機能が有効になっています。
フィールド名
タイプ
説明
id
Int64
プライマリキー。自動インクリメント。
sentence
VarChar(32)
生テキストを格納します。
sentence_e
FloatVector(128)
類似検索用のベクトルフィールド。COSINE メジャーを使用します。
コードレス UI 構成
このチュートリアルでは、MaxCompute (ODPS) ソースからデータを読み取り、埋め込みを使用してベクトル化し、Milvus に同期する例を使用します。Data Integration でコードレス UI を使用してオフライン同期タスクを構成する方法を示します。
1. オフライン同期ノードの作成
DataWorks コンソールの ワークスペースページに移動します。上部のナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[操作] 列で を選択します。
プロジェクトフォルダで、 をクリックします。[データソースと宛先] (このチュートリアルではソース: MaxCompute、宛先: Milvus) とノードの [名前] を構成します。次に、[確認] をクリックします。
2. オフライン同期タスクの構成
基本情報を構成します。
[データソース]: ソースと宛先のデータソースを選択します。
[実行用リソースグループ]: オフライン同期タスクを実行するリソースグループを選択します。リソースグループは現在のワークスペースにアタッチされ、データソースに接続できる必要があります。
利用可能なデータソースまたはリソースグループがない場合は、準備が完了していることを確認してください。
[データソース] を構成します。
以下は、このチュートリアルの MaxCompute データソースの主要なパラメーターです。別のデータソースを使用する場合、構成が異なる場合があります。必要に応じてパラメーターを構成できます。

パラメーター
説明
[トンネルリソースグループ]
Tunnel Quota のデフォルト値は
パブリック伝送リソースで、これは MaxCompute の無料クォータです。データ伝送リソースの選択の詳細については、「専用の Data Transmission Service リソースグループの購入と使用」をご参照ください。重要支払い遅延や有効期限切れにより専用の Tunnel Quota が利用できなくなった場合、実行中のジョブは自動的に
パブリック伝送リソースに切り替わります。[テーブル]
同期するソーステーブルを選択します。
利用可能なソーステーブルがない場合は、テストデータを準備したことを確認してください。
[フィルター方法]
[パーティションフィルタリング] と [データフィルタリング] をサポートしています。
ソーステーブルがパーティションテーブルの場合、パーティションごとに同期するデータを選択できます。
ソーステーブルが非パーティションテーブルの場合、
WHERE句を使用して同期するデータを選択できます。
[データプレビュー] をクリックして、構成が正しいかどうかを確認します。

[データ処理] を構成します。
データ処理スイッチを有効にします。次に、[データ処理リスト] で をクリックして、[データベクトル化] 処理ノードを追加します。

データベクトル化ノードを構成します。
主要なパラメーターは次のとおりです。説明データベクトル化ノードのパフォーマンスは、構成されたモデルのパフォーマンスに依存します。Alibaba Cloud Model Studio が提供する QWen モデルには、クエリ/秒 (QPS) の制限があります。Alibaba Cloud PAI モデルマーケットプレイスの場合、モデルを PAI Elastic Algorithm Service (EAS) にデプロイする必要があります。そのパフォーマンスは、デプロイメントに使用されるリソース仕様に依存します。
特定のパラメーターセットに対して、埋め込みモデルによって生成されるベクトルは決定論的です。したがって、DataWorks データ統合は、同期中に同一の生データに対して Least Frequently Used (LFU) キャッシュを使用します。この最適化により、同じデータに対する埋め込みモデルの冗長な呼び出しが回避され、処理パフォーマンスが向上し、埋め込みコストが削減されます。
パラメーター
説明
[モデルプロバイダー]
大規模言語モデルプロバイダー。現在、Alibaba Cloud DataWorks モデルサービス、Alibaba Cloud Model Studio、Alibaba Cloud PAI モデルマーケットプレイス がサポートされています。
[モデル名]
埋め込みモデルの名前。必要に応じて選択します。
[モデル API キー]
モデルにアクセスするための API キー。このキーはモデルプロバイダーから取得します。
Alibaba Cloud Model Studio: Model Studio API キーの取得。
Alibaba Cloud PAI モデルマーケットプレイス: デプロイされた EAS タスクに移動し、オンラインデバッグを開きます。ヘッダーから Authorization パラメーターの値を取得します。この値を API キーとして入力します。
[モデルエンドポイント]
[モデルプロバイダー] を [Alibaba Cloud PAI モデルマーケットプレイス] に設定した場合、モデルにアクセスするためのエンドポイント (Endpoint API アドレス) を構成する必要があります。
[バッチサイズ]
ベクトル化のバッチサイズ。これは、埋め込みモデルがバッチ処理をサポートしているかどうかによって異なります。バッチ処理は、埋め込みのパフォーマンスを向上させ、コストを削減するのに役立ちます。デフォルト値は 10 です。
[ベクトル化するフィールドの選択]
ベクトル化する列を定義し、出力フィールドの名前を指定します。Data Integration は、単一のソースフィールドまたは複数のフィールドの連結のベクトル化をサポートしています。
[ベクトル化結果フィールド]
ソーステーブルフィールドをベクトル化した後に定義されるベクトル化されたフィールドの名前。
[ベクトルディメンション]
出力ベクトルのディメンション。構成された埋め込みモデルは、定義されたベクトルディメンションをサポートしている必要があります。デフォルト値は 1024 です。
[NULL を空の文字列に変換]
大規模言語モデルは、ベクトル化のために NULL データを許可しません。ソースデータに NULL 値が含まれている場合、それらを空の文字列に変換してベクトル化エラーを防ぐことができます。このオプションはデフォルトで無効になっています。
[フィールド名の連結]
ベクトル化のためにフィールド名をテキストと連結するかどうかを指定します。このオプションを選択した場合は、[フィールド名デリミタ] も構成する必要があります。このオプションはデフォルトで無効になっています。
[空のフィールドをスキップ]
ベクトル化のために複数のフィールドを連結する場合に、空のフィールドをスキップするかどうかを指定します。デフォルトでは、このオプションは選択されており、空のフィールドはスキップされます。
データ出力のプレビュー
データベクトル化ノード構成エリアの右上隅にある [データ出力プレビュー] をクリックします。次に、[プレビュー] をクリックしてベクトル化された結果を表示し、構成が正しいことを確認します。
オフライン同期編集ページの上部にある [シミュレーション実行] をクリックして、ベクトル化された結果をプレビューすることもできます。

[データ宛先] を構成します。
以下は、このチュートリアルの Milvus データソースの主要なパラメーターです。別のデータソースを使用する場合、構成が異なる場合があります。必要に応じてパラメーターを構成できます。

パラメーター
説明
[コレクション]
ベクトルデータを受け取るコレクション。
[パーティションキー]
オプション。コレクションがパーティション分割されている場合、受信したベクトルデータのパーティションを指定できます。
[書き込みモード]
[upsert]:
テーブルで autoid が有効になっていない場合: プライマリキーに基づいてコレクション内のエンティティを更新します。
テーブルで autoid が有効になっている場合: エンティティ内のプライマリキーを自動生成されたプライマリキーに置き換え、データを挿入します。
[insert]: 主に autoid が有効になっているテーブルにデータを挿入するために使用されます。Milvus は自動的にプライマリキーを生成します。
autoid が有効になっていないテーブルで
insertを使用すると、データが重複します。
[宛先フィールドマッピング] を構成します。
データソース、データ処理、データ宛先を構成すると、オフライン同期タスクは自動的にフィールドマッピングを生成します。宛先は固定スキーマのないデータソースであるため、マッピングはデフォルトで行ごとに実行されます。[ソースフィールド] または [宛先フィールド] の横にある [編集] をクリックして、マッピング順序を調整したり、不要なフィールドを削除したりして、マッピングが正しいことを確認します。
たとえば、このチュートリアルでは、不要なフィールドが手動で削除されます。調整後のマッピングは次のとおりです。

[詳細設定] を構成します。
ノード構成ページの右側にある [詳細設定] をクリックします。タスクの同時実行数、同期レート、ダーティデータポリシーなどのパラメーターを必要に応じて設定できます。
3. テスト実行
オフライン同期ノード編集ページの右側にある [デバッグ構成] をクリックします。テスト実行の [リソースグループ] と [スクリプトパラメーター] を設定します。次に、上部のツールバーで [実行] をクリックして、同期パイプラインが正常に実行されるかどうかをテストします。
Milvus に移動し、宛先コレクションのデータが期待どおりであるかどうかを確認します。
4. スケジューリングの構成と公開
オフライン同期タスクの右側にある [スケジューリング構成] をクリックします。定期実行のための スケジューリング構成 パラメーターを設定します。次に、上部のツールバーで [公開] をクリックします。公開パネルで、画面の指示に従ってタスクを 公開 します。
コードエディター構成
このチュートリアルでは、MaxCompute (ODPS) ソースからデータを読み取り、埋め込みを使用してベクトル化し、Milvus に同期する例を使用します。Data Integration でコードエディターを使用してオフライン同期タスクを構成する方法を示します。
1. オフライン同期ノードの作成
DataWorks コンソールの ワークスペースページに移動します。上部のナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[操作] 列で を選択します。
プロジェクトフォルダで、 をクリックします。[データソースと宛先] (このチュートリアルではソース: MaxCompute、宛先: Milvus) とノードの [名前] を構成します。次に、[確認] をクリックします。
2. 同期スクリプトの構成
オフライン同期ノードの上部にあるツールバーの
をクリックして、コードエディターに切り替えます。MaxCompute から Milvus へのオフライン同期タスクを構成します。
付録 1: コードエディターのフォーマット説明 に基づいて、オフライン同期タスクの JSON スクリプトを構成します。この例のスクリプトは次のとおりです。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "odps", "parameter": { "partition": [ "split=dev" ], "datasource": "MaxCompute_Source", "successOnNoPartition": true, "tunnelQuota": "default", "column": [ "sentence" ], "enableWhere": false, "table": "test_tb" }, "name": "Reader", "category": "reader" }, { "category": "flatmap", "stepType": "embedding-transformer", "parameter": { "modelProvider": "bailian", "modelName": "text-embedding-v4", "embeddingColumns": { "sourceColumnNames": [ "sentence" ], "embeddingColumnName": "sentence_e" }, "apiKey": "sk-****", "dimension": 128, "nullAsEmptyString": true }, "displayName": "sentence2emb", "description": "" }, { "stepType": "milvus", "parameter": { "schemaCreateMode": "ignore", "enableDynamicSchema": true, "datasource": "Milvus_Source", "column": [ { "name": "sentence", "type": "VarChar", "elementType": "None", "maxLength": "32" }, { "name": "sentence_e", "type": "FloatVector", "dimension": "128", "elementType": "None", "maxLength": "65535" } ], "writeMode": "insert", "collection": "Milvus_Collection", "batchSize": 1024, "columnMapping": [ { "sourceColName": "sentence", "dstColName": "sentence" }, { "sourceColName": "sentence_e", "dstColName": "sentence_e" } ] }, "name": "Writer", "category": "writer" } ], "setting": { "errorLimit": { "record": "0" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }Reader および Writer セクションのパラメーターの説明については、「MaxCompute データソース」および「Milvus データソース」をご参照ください。
他のタイプのソースと宛先を使用する場合は、「データソースリスト」をご参照ください。
スクリプト内のデータベクトル化処理ノードのパラメーターは、次のように説明されます。
パラメーター
説明
必須
モデルプロバイダー
大規模言語モデルプロバイダー。現在、以下のプロバイダーがサポートされています。
[dataworksModelService]: DataWorks 大規模言語モデルサービスを介してデプロイされたモデルサービス。
[bailian]: Alibaba Cloud Model Studio。QWen モデルをサポートします。
[paiModelGallery]: Alibaba Cloud PAI モデルマーケットプレイス。BGE-M3 モデルをサポートします。
はい
[modelName]
埋め込みモデルの名前。
[modelProvider] が [bailian] の場合、
text-embedding-v4またはtext-embedding-v3を選択できます。[modelProvider] が [paiModelGallery] の場合、
bge-m3を選択できます。
はい
[apiKey]
モデルにアクセスするための API キー。このキーはモデルプロバイダーから取得します。
はい
[endpoint]
[modelProvider] が [paiModelGallery] の場合、モデルにアクセスするためのエンドポイント (Endpoint API アドレス) を構成する必要があります。
いいえ
[batchSize]
ベクトル化のバッチサイズ。これは、埋め込みモデルがバッチ処理をサポートしているかどうかによって異なります。バッチ処理は、埋め込みのパフォーマンスを向上させ、コストを削減するのに役立ちます。デフォルト値は 10 です。
いいえ
[embeddingColumns]
ベクトル化する列を定義し、ベクトル化されたデータの出力フィールド名を指定します。Data Integration は、単一のソースフィールドまたは複数の連結されたフィールドの組み合わせのベクトル化をサポートしています。
例:
{ "embeddingColumns": { "sourceColumnNames": [ "col1", "col2" ], "embeddingColumnName": "my_vector" } }はい
[appendDelimiter]
複数のフィールド値を単一のテキストに連結してベクトル化するために使用されるデリミタ。デフォルト値は
\nです。いいえ
[skipEmptyValue]
ベクトル化のために複数のフィールドを連結する場合に、空のフィールドをスキップするかどうかを指定します。デフォルト値は False です。
いいえ
[dimension]
出力ベクトルのディメンション。構成された埋め込みモデルは、定義されたベクトルディメンションをサポートしている必要があります。デフォルト値は 1024 です。
いいえ
[nullAsEmptyString]
大規模言語モデルは、埋め込みのために NULL データを許可しません。ソースデータに NULL 値が含まれている場合、それらを空の文字列に変換してベクトル化エラーを防ぐことができます。デフォルト値は
Falseです。いいえ
[appendFieldNameEnable]
ベクトル化のために生データをフィールド名と連結するかどうかを指定します。有効にした場合、[appendFieldNameDelimiter] も構成する必要があります。デフォルト値は
Falseです。いいえ
[appendFieldNameDelimiter]
フィールド名を連結するためのデリミタ。このパラメーターは、[appendFieldNameEnable] が有効な場合にのみ有効になります。
いいえ
シミュレーション実行
オフライン同期ノード編集ページの上部にある [シミュレーション実行] をクリックします。次に、[コレクションの開始] と [プレビュー] をクリックして、ベクトル化された結果を表示し、構成が正しいことを確認します。

[詳細設定] を構成します。
ノード構成ページの右側にある [詳細設定] をクリックします。タスクの同時実行数、同期レート、ダーティデータポリシーなどのパラメーターを必要に応じて設定できます。
3. テスト実行
オフライン同期ノード編集ページの右側にある [デバッグ構成] をクリックします。テスト実行の [リソースグループ] と [スクリプトパラメーター] を設定します。次に、上部のツールバーで [実行] をクリックして、同期パイプラインが正常に実行されるかどうかをテストします。
Milvus に移動し、宛先コレクションのデータが期待どおりであるかどうかを確認します。
4. スケジューリングの構成と公開
オフライン同期タスクの右側にある [スケジューリング構成] をクリックします。定期実行のための スケジューリング構成 パラメーターを設定します。次に、上部のツールバーで [公開] をクリックします。公開パネルで、画面の指示に従ってタスクを 公開 します。
付録 1: コードエディターのフォーマット説明
コードエディターの基本的な構造は次のとおりです。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "xxx",
"parameter": {
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "transformer1",
"category": "map/flatmap"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "transformer2",
"category": "map/flatmap"
},
{
"stepType": "xxx",
"parameter": {
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
}
}steps 配列は、各プロセスのオペレーターを定義します。少なくとも 1 つの Reader と 1 つの Writer を含める必要があります。その間に複数の Transformer オペレーターを含めることができます。たとえば、同時実行数を 2 に設定すると、ジョブは 2 つの並列データ処理ストリームを持つことになります。各 Reader、Transformer、Writer は、タスク構成の step です。

steps 配列は、各オペレーターのタイプとパラメーターを定義します。データ同期と処理は、JSON 構成の各 step の順序に厳密に従います。
Data Integration がサポートするさまざまなデータソースの読み取りおよび書き込みチャネルの詳細なパラメーター構成については、「サポートされているデータソースと同期ソリューション」をご参照ください。
付録 2: OSS から Milvus へのデータ同期の例
この例では、OSS から JSONL フォーマットのデータを同期し、JSON データを解析し、JSON 内の指定されたフィールドをベクトル化し、最後にデータを Milvus に同期する方法を示します。完全な JSON 構成は次のとおりです。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "oss",
"parameter": {
"datasource": "${OSS_Data_Source_Name}",
"column": [
{
"name": "chunk_text",
"index": 0,
"type": "string"
}
],
"fieldDelimiter": ",",
"encoding": "UTF-8",
"fileFormat": "jsonl",
"object": [
"embedding/chunk1.jsonl"
]
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "json-extracting",
"parameter": {
"column": [
{
"name": "text",
"fromColumn": "chunk_text",
"jsonPath": "$.text",
"type": "STRING",
"nullOrInvalidDataAction": "DIRTY_DATA"
}
]
},
"name": "jsonextract",
"category": "flatmap"
},
{
"stepType": "embedding-transformer",
"parameter": {
"modelProvider": "bailian",
"modelName": "text-embedding-v4",
"apiKey": "${Your_API_Key}",
"embeddingColumns": {
"sourceColumnNames": [
"text"
],
"embeddingColumnName": "my_vector"
},
"batchSize": 8,
"dimension": 1024
},
"name": "embedding",
"category": "flatmap"
},
{
"stepType": "milvus",
"parameter": {
"schemaCreateMode": "ignore",
"enableDynamicSchema": true,
"datasource": "${Milvus_Data_Source_Name}",
"column": [
{
"name": "my_vector",
"type": "FloatVector",
"dimension": "1024",
"elementType": "None",
"maxLength": "65535"
},
{
"name": "text",
"type": "VarChar",
"elementType": "None",
"maxLength": "65535"
}
],
"collection": "yunshi_vector_07171130",
"writeMode": "insert",
"batchSize": 1024,
"columnMapping": [
{
"sourceColName": "my_vector",
"dstColName": "my_vector"
},
{
"sourceColName": "text",
"dstColName": "text"
}
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 1
}
}
}
> [ノードの作成] > [データ統合] > [オフライン同期]