すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Elasticsearch データソース

最終更新日:Oct 24, 2025

Elasticsearch データソースは、Elasticsearch からのデータの読み取りと Elasticsearch へのデータの書き込みを行うための双方向チャネルを提供します。このトピックでは、DataWorks が Elasticsearch に対してサポートするデータ同期機能について説明します。

適用範囲

Elasticsearch 5.x は、パブリックリソースグループでサポートされています。Elasticsearch 5.x、6.x、7.x、および 8.x は、サーバーレスリソースグループ (推奨) およびデータ統合専用リソースグループでサポートされています。

説明

Elasticsearch は、Lucene に基づく分散検索およびデータ分析エンジンです。主流のエンタープライズレベルの検索エンジンとして、Apache オープンソースライセンスに準拠したオープンソースプロダクトです。次の表は、Elasticsearch のコアコンセプトとリレーショナルデータベースのコアコンセプトをマッピングしたものです。

Elasticsearch

リレーショナルデータベース

Elasticsearch (インスタンス)

リレーショナル DB (インスタンス)

インデックス

データベース

タイプ

テーブル

ドキュメント

フィールド

Elasticsearch インスタンスには、複数のインデックス (データベース) を含めることができます。各インデックスには、複数のタイプ (テーブル) を含めることができます。各タイプには、複数のドキュメント (行) を含めることができます。各ドキュメントには、複数のフィールド (列) を含めることができます。Elasticsearch writer プラグインは、Elasticsearch の REST API を使用して、reader から読み取ったデータをバッチで Elasticsearch に書き込みます。

サポートされているバージョン

DataWorks は、Alibaba Cloud Elasticsearch 5.x、6.x、7.x、および 8.x データソースのみをサポートします。セルフマネージドの Elasticsearch データソースを追加することはできません。

制限事項

Elasticsearch データソースでオフラインの読み取りおよび書き込み操作を実行する場合、次の制限が適用されます。

  • Elasticsearch Reader は、データ同期のためにサーバーからシャード情報をフェッチします。データ同期中にサーバー上のシャードがアクティブであることを確認する必要があります。そうしないと、データの不整合が発生する可能性があります。

  • Elasticsearch 6.x 以降を使用する場合は、サーバーレスリソースグループ (推奨) または データ統合専用リソースグループ を使用する必要があります。

  • scaled_float 型のフィールドは同期できません。

  • フィールドにキーワード $ref を含むインデックスは同期できません。

サポートされているフィールドタイプ

タイプ

オフライン読み取り (Elasticsearch Reader)

オフライン書き込み (Elasticsearch Writer)

リアルタイム書き込み

binary

サポート済み

サポート済み

サポート済み

boolean

サポート済み

サポート

サポート済み

keyword

サポート済み

サポート済み

サポート済み

constant_keyword

サポートされていません

サポートされていません

サポートされていません

wildcard

サポートされていません

サポートされていません

サポートされていません

long

サポート済み

サポート済み

サポート済み

integer

サポート済み

サポート済み

サポート済み

short

サポート済み

サポート済み

サポート済み

byte

サポート済み

サポート済み

サポート済み

double

サポート済み

サポート済み

サポート済み

float

サポート

サポート済み

サポート済み

half_float

サポートされていません

サポートされていません

サポートされていません

scaled_float

サポートされていません

サポートされていません

サポートされていません

unsigned_long

サポートされていません

サポートされていません

サポートされていません

date

サポート済み

サポート済み

サポート済み

date_nanos

サポートされていません

サポートされていません

サポートされていません

alias

サポートされていません

サポートされていません

サポートされていません

object

サポート

サポート済み

サポート済み

flattened

サポートされていません

サポートされていません

サポートされていません

nested

サポート済み

サポート

サポート済み

join

サポートされていません

サポートされていません

サポートされていません

integer_range

サポート済み

サポート

サポート済み

float_range

サポート済み

サポート済み

サポート済み

long_range

サポート済み

サポート済み

サポート済み

double_range

サポート済み

サポート済み

サポート済み

date_range

サポート済み

サポート済み

サポート済み

ip_range

サポートされていません

サポート済み

サポート済み

ip

サポート

サポート済み

サポート済み

version

サポート

サポート済み

サポート済み

murmur3

サポートされていません

サポートされていません

サポートされていません

aggregate_metric_double

サポートされていません

サポートされていません

サポートされていません

histogram

サポートされていません

サポートされていません

サポートされていません

text

サポート済み

サポート済み

サポート済み

annotated-text

サポートされていません

サポートされていません

サポートされていません

completion

サポート済み

サポートされていません

サポートされていません

search_as_you_type

サポートされていません

サポートされていません

サポートされていません

token_count

サポート済み

サポートされていません

サポートされていません

dense_vector

サポートされていません

サポートされていません

サポートされていません

rank_feature

サポートされていません

サポートされていません

サポートされていません

rank_features

サポートされていません

サポートされていません

サポートされていません

geo_point

サポート済み

サポート済み

サポート済み

geo_shape

サポート済み

サポート済み

サポート済み

point

サポートされていません

サポートされていません

サポートされていません

shape

サポートされていません

サポートされていません

サポートされていません

percolator

サポートされていません

サポートされていません

サポートされていません

string

サポート

サポート

サポート済み

仕組み

Elasticsearch Reader は次のように動作します。

  • Elasticsearch の _search scroll slice メソッドを使用します。slice 機能は、Data Integration タスクのマルチスレッドシャーディングメカニズムと連携して動作します。

  • Elasticsearch のマッピング構成に基づいてデータ型を変換します。

詳細については、「Elasticsearch 公式ドキュメント」をご参照ください。

説明

Elasticsearch Reader は、データ同期のためにサーバーからシャード情報をフェッチします。データ同期中にサーバー上のシャードがアクティブであることを確認する必要があります。そうしないと、データの不整合が発生する可能性があります。

基本設定

重要

コードを実行する前にコメントを削除する必要があります。

{
 "order":{
  "hops":[
   {
    "from":"Reader",
    "to":"Writer"
   }
  ]
 },
 "setting":{
  "errorLimit":{
   "record":"0" // エラーレコードの数。
  },
  "jvmOption":"",
  "speed":{
   "concurrent":3,// 同時実行スレッドの数。
   "throttle":true,//
                     "mbps":"12",// 速度制限。1 Mbps = 1 MB/s。
  }
 },
 "steps":[
  {
   "category":"reader",
   "name":"Reader",
   "parameter":{
    "column":[ // 読み取る列。
     "id",
     "name"
    ],
    "endpoint":"", // エンドポイント。
    "index":"",  // インデックス。
    "password":"",  // パスワード。
    "scroll":"",  // スクロールフラグ。
    "search":"",  // クエリパラメーター。これは Elasticsearch のクエリコンテンツと同じです。_search API が使用され、search に名前が変更されます。
    "type":"default",
    "username":""  // ユーザー名。
   },
   "stepType":"elasticsearch"
  },
  {
   "stepType": "elasticsearch",
            "parameter": {
                "column": [ // 書き込む列。
                    {
                        "name": "id",
                        "type": "integer"
                    },
                    {
                        "name": "name",
                        "type": "text"
                    }
                ],
                "index": "test",   // 宛先インデックス。
                 "indexType": "",   // 宛先インデックスのタイプ。Elasticsearch 7.x の場合は空のままにします。
                "actionType": "index",  // 書き込みモード。
                "cleanup": false,         // インデックスを再作成するかどうかを指定します。
                "datasource": "test",   // データソースの名前。
                "primaryKeyInfo": {     // プライマリキーの値を取得するメソッド。
                    "fieldDelimiterOrigin": ",",
                    "column": [
                        "id"
                    ],
                    "type": "specific",
                    "fieldDelimiter": ","
                },
                "dynamic": false,  // 動的マッピング。
                "batchSize": 1024   // バッチで書き込むドキュメントの数。
            },
            "name": "Writer",
            "category": "writer"
  }
 ],
 "type":"job",
 "version":"2.0" // バージョン番号。
}

高度な機能

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールのパラメーターの infotip を表示して、パラメーターの意味を理解できます

データ同期タスクの開発

同期タスクを構成するためのエントリポイントと手順については、次の構成ガイドをご参照ください。

単一テーブルのオフライン同期タスクの構成

単一テーブルのリアルタイム書き込みタスクの構成

手順の詳細については、「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。

データベース全体のオフライン書き込みタスク、または単一テーブルまたはデータベース全体の完全/増分リアルタイム書き込みタスクの構成

手順の詳細については、「データベース全体のリアルタイム同期タスクを構成する」をご参照ください。

付録 1: スクリプトデモとパラメーターの説明

コードエディタを使用したバッチ同期タスクの構成

コードエディタを使用してバッチ同期タスクを構成する場合は、統一されたスクリプト形式の要件に基づいてスクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタでタスクを構成する」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを構成する際にデータソースに構成する必要があるパラメーターについて説明しています。

Reader スクリプトデモ

{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" // エラーレコードの数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ // 読み取る列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", // エンドポイント。
                "index":"aliyun_es_xx",  // インデックス。
                "password":"*******",  // パスワード。
                "multiThread":true,
                "scroll":"5m",  // スクロールフラグ。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  // クエリパラメーター。これは Elasticsearch のクエリコンテンツと同じです。_search API が使用され、search に名前が変更されます。
                "type":"doc",
                "username":"aliyun_di"  // ユーザー名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" // バージョン番号。
}

Reader スクリプトパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。

はい

なし

index

Elasticsearch の index の名前。

はい

なし

type

Elasticsearch の index type

いいえ

インデックス名

search

Elasticsearch のクエリパラメーター。

はい

なし

pageSize

一度に読み取るデータエントリの数。

いいえ

100

scroll

Elasticsearch のページングパラメーター。スクロールを維持する期間を指定します。

  • このパラメーターを小さい値に設定すると、2 ページのデータを読み取る間のアイドル時間が長すぎるとスクロールが期限切れになる可能性があります。これにより、データが失われる可能性があります。

  • このパラメーターを大きい値に設定すると、同時クエリの数がサーバーの max_open_scroll_context パラメーターの値を超えた場合にデータクエリエラーが発生する可能性があります。

はい

なし

strictMode

厳密モードで Elasticsearch からデータを読み取るかどうかを指定します。Elasticsearch で shard.failed エラーが発生した場合、少量のデータが読み取られるのを防ぐために読み取り操作が停止します。

いいえ

true

sort

結果をソートするフィールド。

いいえ

なし

retryCount

失敗後のリトライ回数。

いいえ

300

connTimeOut

クライアントの接続タイムアウト期間。

いいえ

600,000

readTimeOut

クライアントの読み取りタイムアウト期間。

いいえ

600,000

multiThread

HTTP リクエストに複数のスレッドを使用するかどうかを指定します。

いいえ

true

preemptiveAuth

HTTP リクエストにプリエンプティブ認証を使用するかどうかを指定します。

いいえ

false

retrySleepTime

失敗後のリトライ間隔。

いいえ

1000

discovery

ノード検出を有効にするかどうかを指定します。

  • true: クラスター内のランダムなノードに接続します。ノード検出を有効にすると、システムはクライアントのサーバーリストをポーリングして定期的に更新し、検出されたノードにクエリリクエストを送信します。

  • false: 構成されたエンドポイントにクエリリクエストを送信します。

いいえ

false

compression

GZIP を使用してリクエスト本文を圧縮するかどうかを指定します。この機能を使用する場合は、Elasticsearch ノードで http.compression 設定を有効にする必要があります。

いいえ

false

dateFormat

同期するフィールドが date 型で、このフィールドのマッピングにフォーマット構成がない場合は、dateFormat パラメーターを構成する必要があります。構成は次の形式です: "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss"。この構成には、同期する date 型フィールドのすべてのフォーマットを含める必要があります。

いいえ

なし

full

ドキュメントの全コンテンツを単一のフィールドとして宛先に同期するかどうかを指定します。Elasticsearch からクエリされたデータは、単一のフィールドとして扱われます。設定の詳細については、「シナリオ 1: 完全なデータプル」をご参照ください。

いいえ

なし

multi

これは 5 つの使用シナリオを持つ高度な機能です。これには 2 つのサブプロパティがあります: multi.keymulti.mult。設定の詳細については、「高度な機能」の表をご参照ください。

いいえ

なし

Writer スクリプトデモ

{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle":true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、スロットリングは行われません。throttle が true に設定されている場合、スロットリングが有効になります。
            "concurrent":1, // 同時実行ジョブの数。
            "mbps":"12"// 速度制限。1 Mbps = 1 MB/s。
        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {

            },
            "stepType": "stream"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {
                "datasource":"xxx",
                "index": "test-1",
                "type": "default",
                "cleanup": true,
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "discovery": false,
                "primaryKeyInfo":{
                    "type":"pk",    
                     "fieldDelimiter":",",
                     "column":[]
                    },
                "batchSize": 1000,
                "dynamic":false,
                "esPartitionColumn":[
                    {
                        "name":"col1",  
                        "comment":"xx", 
                        "type":"STRING" 
                        }
                     ],
                "column": [
                    {
                        "name": "pk",
                        "type": "id"
                    },
                    {
                        "name": "col_ip",
                        "type": "ip"
                    },
                    {
                        "name": "col_array",
                        "type": "long",
                        "array": true
                    },
                    {
                        "name": "col_double",
                        "type": "double"
                    },
                    {
                        "name": "col_long",
                        "type": "long"
                    },
                    {
                        "name": "col_integer",
                        "type": "integer"
                    },
                    {
                        "name": "col_keyword",
                        "type": "keyword"
                    },
                    {
                        "name": "col_text",
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "other_params":
                            {
                                "doc_values": false
                            }
                    },
                    {
                        "name": "col_geo_point",
                        "type": "geo_point"
                    },
                    {
                        "name": "col_date",
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    {
                        "name": "col_nested1",
                        "type": "nested"
                    },
                    {
                        "name": "col_nested2",
                        "type": "nested"
                    },
                    {
                        "name": "col_object1",
                        "type": "object"
                    },
                    {
                        "name": "col_object2",
                        "type": "object"
                    },
                    {
                        "name": "col_integer_array",
                        "type": "integer",
                        "array": true
                    },
                    {
                        "name": "col_geo_shape",
                        "type": "geo_shape",
                        "tree": "quadtree",
                        "precision": "10m"
                    }
                ]
            },
            "stepType": "elasticsearch"
        }
    ],
    "type": "job",
    "version": "2.0"
}
説明

VPC 内の Elasticsearch インスタンスは、ネットワークの分離により、デフォルトのリソースグループからはアクセスできません。データ同期のために VPC にアクセスするには、サーバーレスリソースグループ (推奨) またはデータ統合専用リソースグループを使用する必要があります。リソースグループの追加方法の詳細については、「サーバーレスリソースグループ」をご参照ください。

Writer スクリプトパラメーター

パラメーター

説明

必須

デフォルト値

datasource

データを同期する Elasticsearch データソース。DataWorks でデータソースを作成していない場合は、作成してください。詳細については、「Elasticsearch データソースを構成する」をご参照ください。

はい

なし

index

Elasticsearch のインデックスの名前。

はい

なし

indexType

Elasticsearch のインデックスのタイプ。

いいえ

Elasticsearch

cleanup

既存のインデックスからデータを削除するかどうかを指定します。

  • true: データをインポートする前に、元のインデックスが削除され、同じ名前のインデックスが再作成されます。この操作により、インデックス内のデータが削除されます。

  • false: データをインポートする前に、インデックス内の既存のデータが保持されます。

いいえ

false

batchSize

同期タスクで一度に Elasticsearch に挿入するドキュメントの数。

いいえ

1,000

trySize

Elasticsearch へのデータ書き込みに失敗した後のリトライ回数。

いいえ

30

timeout

クライアントのタイムアウト期間。

いいえ

600,000

discovery

タスクのノード検出機能を有効にするかどうかを指定します。

  • true: クラスター内のランダムなノードに接続します。ノード検出を有効にすると、システムはクライアントのサーバーリストをポーリングして定期的に更新します。

  • false: Elasticsearch クラスターに接続します。

いいえ

false

compression

HTTP リクエストの圧縮を有効にします。

いいえ

true

multiThread

HTTP リクエストに複数のスレッドを使用するかどうかを指定します。

いいえ

true

ignoreWriteError

書き込みエラーを無視し、リトライせず、書き込みを続行します。

いいえ

false

ignoreParseError

データ形式の解析エラーを無視し、書き込みを続行します。

いいえ

true

alias

Elasticsearch のエイリアスは、データベースのビューに似ています。たとえば、my_index という名前のインデックスに my_index_alias という名前のエイリアスを作成した場合、my_index_alias に対する操作は my_index に対する操作と同じです。

エイリアスを構成すると、データのインポート後に指定されたインデックスのエイリアスが作成されます。

いいえ

なし

aliasMode

データのインポート後にエイリアスを追加するモード。有効な値: append および exclusive

  • aliasModeappend に設定すると、現在のインデックスがエイリアスマッピングに追加されます。1 つのエイリアスは複数のインデックスに対応できます。

  • aliasModeexclusive に設定すると、エイリアスが最初に削除され、次に現在のインデックスがエイリアスマッピングに追加されます。1 つのエイリアスは 1 つのインデックスに対応します。

その後、エイリアスは実際のインデックス名に変換されます。エイリアスは、インデックスの移行や複数のインデックスの統一されたクエリに使用できます。また、ビューの機能を実装するためにも使用できます。

いいえ

append

settings

インデックスを作成するための設定。これは公式の Elasticsearch 設定と一致します。

いいえ

なし

column

column パラメーターは、複数のドキュメントのフィールド情報を構成するために使用されます。各フィールドについて、nametype などの基本設定、および AnalyzerFormatArray などの拡張設定を構成できます。

Elasticsearch でサポートされているフィールドタイプは次のとおりです。

- id  // id タイプは Elasticsearch の _id に対応し、一意のプライマリキーとして理解できます。データを書き込む際、同じ ID のデータは上書きされ、インデックスは作成されません。
- string
- text
- keyword
- long
- integer
- short
- byte
- double
- float
- date
- boolean
- binary
- integer_range
- float_range
- long_range
- double_range
- date_range
- geo_point
- geo_shape
- ip
- token_count
- array
- object
- nested

次のセクションでは、列のタイプについて説明します。

  • 列のタイプが text の場合、analyzernormsindex_options などのパラメーターを構成できます。次の例は構成を示しています。

    {
        "name": "col_text",
        "type": "text",
        "analyzer": "ik_max_word"
        }
  • 列のタイプが Date の場合、次の 2 つの方法のいずれかを構成してソースデータを解析できます。構成方法が一貫していることを確認してください。

    • 方法 1: reader によって読み取られたフィールドのコンテンツを es data フィールドに直接書き込みます。

      • origin:true を設定します。これは必須です。これにより、読み取られたフィールドのコンテンツを es data に直接書き込むことができます。

      • "format" を構成します。これは、es writermapping を作成するときに、このフィールドに format プロパティを設定する必要があることを示します。次の例は構成を示しています。

          {
             "parameter":{
               "column":[{
                   "name": "col_date",
                   "type": "date",
                   "format": "yyyy-MM-dd HH:mm:ss",
                   "origin": true
                }]
           }
        }
    • 方法 2: (タイムゾーン変換) Data Integration にタイムゾーン変換を実行させる必要がある場合は、Timezone パラメーターを追加します。次の例は構成を示しています。

      構成された "format" は、Data Integration がタイムゾーン変換中に解析する時間形式を示します。

        {
           "parameter" :{
             "column": [{
                "name": "col_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss",
               "Timezone": "UTC"
             }]
         }
      }
  • 列のタイプが geo_shape の場合、tree (geohash または quadtree) や precision などのプロパティを構成できます。次の例は構成を示しています。

    {
        "name": "col_geo_shape",
        "type": "geo_shape",
        "tree": "quadtree",
        "precision": "10m"
        }

列の type 以外のプロパティを構成するには、[other_params] パラメーターを使用できます。このパラメーターは [column] で構成され、マッピングを更新する際に列の type 以外の Elasticsearch プロパティを記述するために使用されます。

 {
   "name": "guid",
   "other_params":
    {
       "doc_values": false
      },
    "type": "text"
  }

ソースデータを配列として Elasticsearch に書き込む場合は、JSON 形式で、または区切り文字を指定してソースデータを解析できます。設定の詳細については、「付録 2: 配列形式で Elasticsearch にデータを書き込む」をご参照ください。

はい

なし

dynamic

同期タスクが、Elasticsearch の動的マッピングメカニズムを使用して、ドキュメント内で見つかった存在しないフィールドのマッピングを追加するかどうかを指定します。

  • true: Elasticsearch の自動マッピングを保持します。

  • false: これはデフォルト値です。このパラメーターを指定しない場合、値は false になります。Elasticsearch のマッピングは、同期タスクの列構成に基づいて生成および更新されます。

Elasticsearch 7.x のデフォルトの type_doc です。Elasticsearch で自動マッピングを使用する場合は、_docesVersion を 7 に設定します。

コードエディタに切り替えて、バージョンパラメーターを追加する必要があります: "esVersion": "7"

重要

フィールドマッピングエラーが発生した場合は、このパラメーターを有効にして問題を解決しようとすることができます。ただし、この方法では、フィールドタイプが期待と一致しなくなったり、データ例外が発生したりする可能性があります。有効にするかどうかを決定する前に、データ構造に基づいてリスクを評価してください。

いいえ

false

actionType

Elasticsearch にデータを書き込むためのアクションタイプ。Data Integration は indexupdate をサポートします。actionType のデフォルト値は index です。

  • index: Elasticsearch SDK の Index.Builder が下位層で使用され、バッチリクエストを構築します。Elasticsearch index アクションでデータを挿入する場合、まず挿入するドキュメントデータに ID が指定されているかどうかを判断する必要があります。

    • ID が指定されていない場合、Elasticsearch はデフォルトで一意の ID を生成します。この場合、ドキュメントは直接 Elasticsearch に追加されます。

    • ID が指定されている場合、ドキュメント全体が更新 (置換) されます。特定のフィールドの変更はサポートされていません。

      説明

      ここでの更新は、指定された一部の列を置き換える Elasticsearch の更新とは異なります。

  • update: ユーザーが指定した ID に基づいてドキュメントを更新します。ID がインデックスに存在しない場合、ドキュメントは挿入されます。ID が存在する場合、指定された column フィールドのコンテンツが更新されます。他のドキュメントフィールドのコンテンツは変更されません。各 update 操作は、特定のフィールドを変更するためにドキュメント全体の情報を取得します。update アクションは条件付きフィルタリングをサポートせず、指定された ID に基づいてのみ更新します。各更新は元のドキュメントを取得する必要があるため、パフォーマンスが大幅に影響を受ける可能性があります。

    説明

    アクションタイプを update に設定する場合は、[primaryKeyInfo] パラメーターを設定する必要があります。

いいえ

index

primaryKeyInfo

Elasticsearch に書き込むときにプライマリキーの値を取得する方法を定義します。

  • ビジネスプライマリキー (pk): _id の値が特定のフィールドに設定されます。

    "parameter":{
    "primaryKeyInfo":{
    "type":"pk",
    "column":["id"]}
    }
  • 複合プライマリキー (specific): _id の値は、いくつかのフィールドの値の連結です。区切り文字は、[プライマリキー区切り文字] (fieldDelimiter) に設定した値です。

    説明

    フィールド名は、Elasticsearch writer の宛先フィールドです。コードレス UI では、[プライマリキー列の構成] には、Elasticsearch インデックスに既に存在するフィールドのみが含まれます。

    "parameter":{
    "primaryKeyInfo":{
    "type":"specific",
    "fieldDelimiter":",",
    "column":["col1","col2"]}
    }
  • プライマリキーなし (nopk): _id は、Elasticsearch に書き込むときにシステムによって自動的に生成されます。

    "primaryKeyInfo":{
    "type":"nopk"
    }

はい

specific

esPartitionColumn

Elasticsearch に書き込むときにパーティショニングを有効にするかどうかを指定します。これは Elasticsearch のルーティングパラメーターを変更するために使用されます。

  • パーティショニングを有効にする: 指定された列の値が空の文字列を区切り文字として連結され、ルーティング値として設定されます。データを書き込むとき、これにより指定されたシャードにドキュメントが挿入または更新されます。パーティショニングを有効にする場合は、パーティションキー列を指定する必要があります。

    {    "esPartitionColumn": [
            {
                "name":"col1",
                "comment":"xx",
                "type":"STRING"
                }
            ],
        }
  • パーティショニングを無効にする: このパラメーターを指定しないでください。デフォルトでは、_id がルーティングキーとして使用され、データスキューを防ぐために複数のシャードにドキュメントが均等に分散されます。

いいえ

false

enableWriteNull

ソースからの null 値フィールドを Elasticsearch に同期するかどうかを指定します。有効な値:

  • true: はい。同期後、Elasticsearch の対応するフィールドの値は null になります。

  • false: いいえ。ソースからの null 値フィールドは Elasticsearch に同期されません。フィールドは Elasticsearch に表示されません。

いいえ

true

付録 2: 配列形式で Elasticsearch にデータを書き込む

次の 2 つの方法を使用して、ソースデータを配列として Elasticsearch に書き込むことができます。

  • JSON 形式でソースデータを解析する

    たとえば、ソースデータが "[1,2,3,4,5]" の場合、json_array=true を設定してデータを解析します。その後、データは配列形式で Elasticsearch に書き込まれます。

    "parameter" : {
      {
        "name":"docs_1",
        "type":"keyword",
        "json_array":true
      }
    }
  • 区切り文字を使用してソースデータを解析する

    たとえば、ソースデータが "1,2,3,4,5" の場合、区切り文字 splitter="," を設定してデータを解析します。その後、データは配列形式で Elasticsearch に書き込まれます。

    説明

    タスクは 1 種類の区切り文字のみをサポートします。splitter パラメーターはグローバルに一意です。複数の配列フィールドに異なる区切り文字を構成することはできません。たとえば、ソースフィールドが col1="1,2,3,4,5"col2="6-7-8-9-10" の場合、各フィールドに個別の splitter を構成することはできません。

    "parameter" : {
          "column": [
            {
              "name": "docs_2",
              "array": true,
              "type": "long"
            }
          ],
          "splitter":","// 注: splitter 構成は column 構成と同じレベルにあります。
    }

付録 3: シナリオ

シナリオ 1: 完全なデータプル

  • 背景: Elasticsearch のドキュメントのクエリ結果を単一のフィールドとしてプルできます。

  • 例:

    
    ## Reader: Elasticsearch の生データ
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "IXgdO4MB4GR_1DmrjTXP",
            "_score": 1.0,
            "_source": {
                "feature1": "value1",
                "feature2": "value2",
                "feature3": "value3"
            }
        }]
    
    ## Data Integration Elasticsearch Reader プラグインの構成
    "parameter": {
      "column": [
          "content"
      ],
      "full":true
    }
    
    ## Writer の結果: 1 行 1 列が宛先に同期されます。
    {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}

シナリオ 2: ネストされたフィールドまたはオブジェクトフィールドのプロパティを同期する

  • 背景: Object またはネストされたフィールドのプロパティを同期する場合、パスを使用してそれらを解決できます。

  • 構成:

    • property

    • property.sub-property

    • property[0].sub-property

  • スクリプト構成:

    "multi":{
        "multi":true
    }
    説明

    現在、コードレス UI での構成は利用できません。

  • 例:

    ## Reader: Elasticsearch の生データ
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "7XAOOoMB4GR_1Dmrrust",
            "_score": 1.0,
            "_source": {
                "level1": {
                    "level2": [
                        {
                            "level3": "testlevel3_1"
                        },
                        {
                            "level3": "testlevel3_2"
                        }
                    ]
                }
            }
        }
    ]
    ## Data Integration Elasticsearch reader プラグインの構成
    "parameter": {
      "column": [
          "level1",
          "level1.level2",
          "level1.level2[0]",
          "level1.level2.level3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ## Writer の結果: 4 列のデータが 1 行
    column1(level1):            {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
    column2(level1.level2):     [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
    column3(level1.level2[0]):  {"level3":"testlevel3_1"}
    column4(level1.level2.level3):  null
    説明
    • 取得したノードの親ノードに配列が含まれている場合、結果は null になります。たとえば、`level1.level2.level3` を取得してもエラーは報告されませんが、同期結果は null になります。パスを `level1.level2[0].level3` または `level1.level2[1].level3` として構成する必要があります。`level1.level2[*].level3` 形式は現在サポートされていません。

    • キーにピリオド (.) を含むデータはサポートされていません。たとえば、`{"level1.level2":{"level3":"testlevel3_1"}}` のようなデータです。この場合、このデータエントリの取得結果は null になります。

シナリオ 3: 配列プロパティを複数の行に分割する

  • 背景: 1 対多の関係が存在する場合、配列列を複数の行に分割できます。

  • 構成: property[*].sub-property

  • 効果: `{ "splitKey" :[1,2,3,4,5]}` のようなソースデータは、5 つの別々の行として分割されて宛先に書き込まれます。

  • スクリプト構成:

    "multi":{   
           "multi":true,    
            "key": "headers"
    }
    説明
    • コードレス UI で [多行配列列名の分割] を構成すると、同等のスクリプト構成が自動的に生成されます。

    • 値はリストである必要があります。そうでない場合、エラーが発生します。

  • 例:

    ## Reader: Elasticsearch の生データ
    [
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "nhxmIYMBKDL4VkVLyXRN",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.1"
                    },
                    {
                        "remoteip": "192.0.2.2"
                    }
                ]
            }
        },
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "wRxsIYMBKDL4VkVLcXqf",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.3"
                    },
                    {
                        "remoteip": "192.0.2.4"
                    }
                ]
            }
        }
    ]
    ## Data Integration Elasticsearch reader プラグインの構成
    {
       "column":[
          "headers[*].remoteip"
      ]
      "multi":{
          "multi":true,
          "key": "headers"
      }
    }
    
    ## Writer の結果: 4 行
    192.0.2.1
    192.0.2.2
    192.0.2.3
    192.0.2.4

シナリオ 4: 配列プロパティの重複排除とマージ

  • 背景: 配列プロパティを重複排除してマージし、その結果を文字列プロパティとして書き込むことができます。配列プロパティは、`name1.name2` のようなサブプロパティにすることができます。`toString` の結果が重複排除の基準として使用されます。

  • 構成: `property[]`。

    列名に `[]` が含まれている場合、このプロパティに対して重複排除とマージが実行されます。

  • スクリプト構成:

    "multi":{
        "multi":true
    }
    説明

    このパラメーターはコードレス UI では構成できません。

  • 例:

    ## Reader: Elasticsearch の生データ
    "hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "4nbUOoMB4GR_1Dmryj8O",
        "_score": 1.0,
        "_source": {
            "feature1": [
                "value1",
                "value1",
                "value2",
                "value2",
                "value3"
            ]
        }
    }
    ]
    ## Data Integration Elasticsearch reader プラグインの構成
    "parameter": {
      "column":[
            "feature1[]"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ## Writer の結果: 1 行 1 列のデータ
    "value1,value2,value3"

シナリオ 5: 複数のプロパティをマージして同期する

  • 背景: 複数のプロパティを選択的に処理できます。値を持つ最初のプロパティが返されます。指定されたプロパティのいずれも存在しない場合、null が書き込まれます。

  • 構成: `property1|property2|...`

    列名に `|` 区切り文字が含まれている場合、この項目に対して複数のプロパティが選択されます。

  • スクリプト構成:

    "multi":{    
        "multi":true
    }
    説明

    このパラメーターはコードレス UI では構成できません。

  • 例:

    ## Reader: Elasticsearch の生データ
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    
    ## Data Integration Elasticsearch reader プラグインの構成
    "parameter": {
      "column":[
            "feature1|feature2|feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ## Writer の結果: 1 行 1 列のデータ
    "feature1"

シナリオ 6: 複数のプロパティを選択的に同期する

  • 背景: 複数のプロパティを選択的に処理できます。値を持つ最初のプロパティが返されます。指定されたプロパティのいずれも存在しない場合、null が書き込まれます。

  • 構成: `property1|property2|...`

    列名に `|` 区切り文字が含まれている場合、この項目に対して複数のプロパティが選択されます。

  • スクリプト構成:

    "multi":{
        "multi":true
    }
    説明

    このパラメーターはコードレス UI では構成できません。

  • 例:

    ## Reader: Elasticsearch の生データ
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    ## Data Integration Elasticsearch reader プラグインの構成
    "parameter": {
      "column":[
            "feature1,feature2,feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ## Writer の結果: 1 行 1 列のデータ
    "feature1,[1,2,3],{"child":"feature3"}"

リファレンス

Data Integration は追加のデータソースをサポートしています。詳細については、「サポートされているデータソースと同期ソリューション」をご参照ください。