DataWorks は、OpenSearch データソースにデータを書き込むための OpenSearch Writer を提供します。このトピックでは、オフラインモードで OpenSearch データソースにデータを書き込む機能について説明します。
サポートされている OpenSearch のバージョン
OpenSearch V3 は、サードパーティ製パッケージを使用します。POM は com.aliyun.opensearch aliyun-sdk-opensearch 2.1.3 です。
OpenSearch Writer を使用するには、JDK 1.6-32 以降をインストールする必要があります。java-version
コマンドを実行して、JDK のバージョンを確認できます。
次の Alibaba Cloud OpenSearch エディションがサポートされています。業界アルゴリズム版、LLM ベースの会話型検索版、高性能検索版、ベクトル検索版、および検索エンジン版。
制限事項
OpenSearch Writer は、サーバーレスリソースグループとData Integration 専用リソースグループのみをサポートしています。OpenSearch Writer は、カスタムリソースグループをサポートしていません。サーバーレスリソースグループを使用することをお勧めします。
OpenSearch の列は順序付けされていません。OpenSearch Writer は、指定された列の順序に厳密に従ってデータを書き込みます。指定された列の数が OpenSearch の列の数よりも少ない場合、OpenSearch の超過列はデフォルト値または null に設定されます。
たとえば、OpenSearch テーブルに列 a、b、c が含まれており、列 b と c にデータを書き込む場合、column パラメーターを ["c","b"] に設定できます。この場合、OpenSearch Writer は、リーダーから取得したソースデータの 1 番目と 2 番目の列を OpenSearch テーブルの列 c と b にインポートします。OpenSearch テーブルの列 a は、デフォルト値または null に設定されます。
コードエディターのみを使用して、バッチ同期タスクを設定し、OpenSearch データソースにデータを同期できます。
データ型マッピング
OpenSearch Writer は、ほとんどの OpenSearch データ型をサポートしています。データベースのデータ型がサポートされていることを確認してください。次の表に、OpenSearch Writer がデータ型を変換するデータ型マッピングを示します。
カテゴリ | OpenSearch データ型 |
整数 | INT |
浮動小数点 | DOUBLE および FLOAT |
文字列 | TEXT、LITERAL、および SHORT_TEXT |
日付と時刻 | INT |
ブール値 | LITERAL |
データ同期タスクの開発
同期タスクのエントリポイントと設定手順については、次の設定ガイドを参照してください。
追加情報
列設定エラーの処理
冗長な列によるデータ損失を防ぎ、高いデータ信頼性を確保するために、書き込む列の数が宛先テーブルの列の数よりも多い場合、OpenSearch Writer はエラーを返します。たとえば、OpenSearch テーブルに列 a、b、c が含まれているとします。テーブルに 3 つ以上の列を書き込む必要がある場合、OpenSearch Writer はエラーを返します。
テーブル設定
OpenSearch Writer は、一度に 1 つのテーブルにのみデータを書き込むことができます。
タスクの再実行
ノードが再実行されると、ID に基づいてデータが上書きされます。したがって、OpenSearch に書き込まれるデータには ID 列が含まれている必要があります。ID は、OpenSearch の行の一意の識別子です。新しいデータと同じ ID を持つ既存のデータは上書きされます。
付録:コードとパラメーター
コードエディターを使用してバッチ同期タスクを設定する
コードエディターを使用してバッチ同期タスクを設定する場合は、統一スクリプト形式の要件に基づいて、スクリプトに関連パラメーターを設定する必要があります。詳細については、「コードエディターを使用してバッチ同期タスクを設定する」をご参照ください。次の情報は、コードエディターを使用してバッチ同期タスクを設定する場合にデータソースに設定する必要があるパラメーターについて説明しています。
業界アルゴリズム版、LLM ベースの会話型検索版、および高性能検索版の OpenSearch Writer のコード
{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {},
"writer": {
"plugin": "opensearch",
"parameter": {
"accessId": "*********",
"accessKey": "********",
"host": "http://yyyy.aliyuncs.com",
"indexName": "datax_xxx",
"table": "datax_yyy",
"column": [
"appkey",
"id",
"title",
"gmt_create",
"pic_default"
],
"batchSize": 500,
"writeMode": add,
"version":"v2",
"ignoreWriteError": false
}
}
}
}
業界アルゴリズム版、LLM ベースの会話型検索版、および高性能検索版の OpenSearch Writer のコードのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
accessId | OpenSearch に接続するために使用するアカウントの AccessKey ID。 | はい | デフォルト値なし |
accessKey | OpenSearch に接続するために使用するアカウントの AccessKey シークレット。 | はい | デフォルト値なし |
host | OpenSearch のエンドポイント。Alibaba Cloud 管理コンソールでエンドポイントを取得できます。 | はい | デフォルト値なし |
indexName | OpenSearch プロジェクトの名前。 | はい | デフォルト値なし |
table | データを書き込むテーブルの名前。Data Integration は一度に複数のテーブルにデータをインポートできないため、1 つのテーブルのみを指定できます。 | はい | デフォルト値なし |
column | データを書き込む列の名前。宛先テーブルのすべての列にデータを書き込む場合は、このパラメーターをアスタリスク (*) に設定します(例:"column":["*"] )。宛先テーブルの特定の列にのみデータを書き込む場合は、このパラメーターを列名に設定します。列名はコンマ (,) で区切ります(例:"column":["id","name"] )。 OpenSearch Writer は、列をフィルタリングし、列の順序を変更できます。たとえば、OpenSearch テーブルに 3 つの列(a、b、c)が含まれているとします。列 c と b にのみデータを書き込む場合は、column パラメーターを ["c","b"] に設定できます。データ同期の際、列 a は自動的に null に設定されます。 | はい | デフォルト値なし |
batchSize | 一度に書き込むデータレコードの数。OpenSearch Writer は、一度に複数のデータレコードを OpenSearch に書き込みます。OpenSearch はデータクエリ機能を提供します。ほとんどの場合、OpenSearch の 1 秒あたりのトランザクション数 (TPS) は高くありません。OpenSearch に接続するために使用するアカウントで使用可能なリソースに基づいて、このパラメーターを設定します。 ほとんどの場合、データレコードのサイズは 1 MB 未満、一度に書き込むデータレコードの合計サイズは 2 MB 未満である必要があります。 | パーティションテーブルにデータを書き込む場合のみ必須 | 300 |
writeMode | 書き込みモード。書き込み操作の冪等性を確保するには、このパラメーターを add/update に設定します。 | はい | デフォルト値なし |
ignoreWriteError | 失敗した書き込み操作を無視するかどうかを指定します。 例:"ignoreWriteError":true 。OpenSearch Writer が一度に複数のデータレコードを OpenSearch に書き込む場合、このパラメーターは現在のバッチで失敗した書き込み操作を無視するかどうかを指定します。このパラメーターを true に設定すると、OpenSearch Writer は他の書き込み操作を続行します。false に設定すると、同期タスクは終了し、OpenSearch Writer はエラーを返します。デフォルト値を使用することをお勧めします。 | いいえ | false |
version | OpenSearch のバージョン(例:"version":"v3" )。OpenSearch V2 ではプッシュ操作に多くの制限があるため、OpenSearch V3 を使用することをお勧めします。 | いいえ | v2 |
ベクトル検索版および検索エンジン版の OpenSearch Writer のコード
{
"stepType": "opensearch",
"parameter": {
"indexName": "",
"column": [
{
"name": "col3double",
"type": "DOUBLE"
},
{
"name": "col2vector",
"type": "MULTI_FLOAT"
}
],
"datasource": "zm_test_vector_01",
"batchSize": "500",
"table": "demotable"
},
"name": "Writer",
"category": "writer"
}
ベクトル検索版および検索エンジン版の OpenSearch Writer のコードのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
table | データを書き込むテーブルの名前。Data Integration は一度に複数のテーブルにデータをインポートできないため、1 つのテーブルのみを指定できます。 | はい | デフォルト値なし |
column | データを書き込む列の名前。宛先テーブルのすべての列にデータを書き込む場合は、このパラメーターをアスタリスク (*) に設定します(例:"column":["*"] )。宛先テーブルの特定の列にのみデータを書き込む場合は、このパラメーターを列名に設定します。列名はコンマ (,) で区切ります(例:"column":["id","name"] )。 OpenSearch Writer は、列をフィルタリングし、列の順序を変更できます。たとえば、OpenSearch テーブルに 3 つの列(a、b、c)が含まれているとします。列 c と b にのみデータを書き込む場合は、column パラメーターを ["c","b"] に設定できます。データ同期の際、列 a は自動的に null に設定されます。 | はい | デフォルト値なし |
batchSize | 一度に書き込むデータレコードの数。OpenSearch Writer は、一度に複数のデータレコードを OpenSearch に書き込みます。OpenSearch はデータクエリ機能を提供します。ほとんどの場合、OpenSearch の 1 秒あたりのトランザクション数 (TPS) は高くありません。OpenSearch に接続するために使用するアカウントで使用可能なリソースに基づいて、このパラメーターを設定します。 ほとんどの場合、データレコードのサイズは 1 MB 未満、一度に書き込むデータレコードの合計サイズは 2 MB 未満である必要があります。 | パーティションテーブルにデータを書き込む場合のみ必須 | 300 |