すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Lindorm データソース

最終更新日:Dec 05, 2025

DataWorks の Data Integration は、Lindorm Reader と Lindorm Writer プラグインを使用して、Lindorm との間でデータの読み書きを行います。このトピックでは、DataWorks が Lindorm に対して提供するデータの読み書き機能について説明します。

適用範囲

説明

Lindorm はマルチモデルデータベースです。詳細については、「Lindorm ドキュメント」をご参照ください。DataWorks は現在、LindormTable とコンピュートエンジンのみをサポートしています。

サポートされるフィールドタイプ

Lindorm Reader と Lindorm Writer は、ほとんどの Lindorm データ型をサポートしますが、一部サポートされていない型もあります。ご利用のデータ型がサポートされていることを確認する必要があります。

次の表に、Lindorm Reader と Lindorm Writer のデータ型変換を示します。

型の分類

データ型

整数

INT、LONG、SHORT

浮動小数点

DOUBLE、FLOAT、DOUBLE

文字列

STRING

日付と時刻

DATE

ブール値

BOOLEAN

バイナリ

BINARYSTRING

データ同期タスクの開発

同期タスクの設定のエントリポイントと手順については、次の設定ガイドをご参照ください。

オフライン単一テーブル同期

リアルタイム単一テーブル同期

リアルタイム全データベース同期

付録:スクリプトデモとパラメーター

コードエディタを使用したバッチ同期タスクの設定

コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスク設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader スクリプトデモ

  • LindormTable の Lindorm SQL テーブルからローカルマシンにデータを抽出するジョブを設定します。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "caching": 128,
                    "column": [
                       "id",
                      "value" 
                    ],
                    "envType": 1,
                    "datasource": "lindorm",
                    "tableMode": "tableService",
                    "table": "lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "lindorm",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value" 
                    ],
                    "socketTimeout": 3600000,
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            // 転送速度を byte/s 単位で設定します。DataX はこの速度に達するように試みますが、超えることはありません。
            "byte": 1048576
          }
          // エラー制限
          "errorLimit": {
            // エラーレコードの最大数。エラーレコードの数がこの値を超えると、ジョブは失敗します。
            "record": 0,
            // エラーレコードの最大許容率。たとえば、1.0 は 100% を意味し、0.02 は 2% を意味します。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • LindormTable の Lindorm HBaseLike (WideColumn) テーブルからローカルマシンにデータを抽出するジョブを設定します。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "column":  [
                         "STRING|rowkey",
                          "INT|f:a"
                    ],
                    "envType": 1,
                    "datasource": "lindorm",
                    "tableMode": "wideColumn",
                    "table":"lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            // 転送速度を byte/s 単位で設定します。DataX はこの速度に達するように試みますが、超えることはありません。
            "byte": 1048576
          }
            // エラー制限
            "errorLimit": {
            // エラーレコードの最大数。エラーレコードの数がこの値を超えると、ジョブは失敗します。
            "record": 0,
            // エラーレコードの最大許容率。たとえば、1.0 は 100% を意味し、0.02 は 2% を意味します。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • コンピュートエンジンテーブルからローカルマシンにデータを抽出するジョブを設定します。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                   "datasource": "lindorm_datasource",
                    "column": [
                       "id",
                       "value"
                    ],
                    "tableComment": "",
                    "where": "",
                    "session": [],
                    "splitPk": "id",
                    "table": "auto_ob_149912212480"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            // 転送速度を byte/s 単位で設定します。DataX はこの速度に達するように試みますが、超えることはありません。
            "byte": 1048576
          }
            // エラー制限
            "errorLimit": {
            // エラーレコードの最大数。エラーレコードの数がこの値を超えると、ジョブは失敗します。
            "record": 0,
            // エラーレコードの最大許容率。たとえば、1.0 は 100% を意味し、0.02 は 2% を意味します。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }

Reader スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

mode

LindormTable に固有です。データの読み取りモードを指定します。有効な値は FixedColumnDynamicColumn です。

はい

FixedColumn

tableMode

LindormTable に固有です。有効な値は、標準テーブル SQL モードの場合は table、ワイドテーブルモードの場合は wideColumn です。デフォルト値は table です。table モードを選択した場合、このパラメーターを指定する必要はありません。

いいえ

デフォルトでは指定されません

table

データの読み取り元となる Lindorm テーブルの名前。テーブル名では大文字と小文字が区別されます。

はい

なし

encoding

LindormTable に固有です。コーデック。有効な値は UTF-8 と GBK です。このパラメーターは通常、バイナリで格納された Lindorm の byte[] 型を String 型に変換するために使用されます。

いいえ

UTF-8

caching

LindormTable に固有です。1 回のバッチで取得するレコード数。値を大きくすると、データ同期システムと Lindorm 間のネットワーク対話が大幅に減少し、全体のスループットが向上します。この値が大きすぎると、Lindorm サーバーに過度の負荷がかかったり、データ同期プロセスでメモリ不足 (OOM) エラーが発生したりする可能性があります。

いいえ

100

selects

LindormTable に固有です。現在読み取り中のテーブルタイプでは、システムは自動的にデータをシャーディングしません。デフォルトでは、タスクは単一の同時実行プロセスで実行されます。データをシャーディングするには、selects パラメーターを手動で設定する必要があります。例:

selects": [
    "where(compare(\"id\", LESS, 5))",
    "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
    "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
    ],

制限事項

  • クエリ条件として使用できるのは、プライマリキー列とインデックス列のみです。標準列をクエリ条件として使用すると、全表スキャンがトリガーされ、ソースクラスターの安定性に影響します。

  • テーブルに複数のプライマリキー列がある場合、クエリ条件は最左一致の原則に従う必要があります。つまり、左側の連続する n-1 個のプライマリキー列は等価条件を使用する必要があります。たとえば、テーブルのプライマリキーが [id, order_time] で、標準列が [type, data] であるとします。type 列にはセカンダリインデックスが作成されています。

    • 推奨される構文の例

      SQL 構文

      プラグイン構文

      where id >= 1 and id < 100

      where(and(compare(\"id\", GREATER_OR_EQUAL, 1), compare(\"id\", LESS, 100)))

      where id = 1 and order_time > 1234567

      where(and(compare(\"id\", EQUAL, 1), compare(\"order_time\", GREATER, 1234567)))

      where type = 'pay'

      where(compare(\"type\", EQUAL, \"pay\"))
    • 推奨されない構文の例

      SQL 構文

      プラグイン構文

      推奨されない理由

      where order_time >= 1234567 and order_time < 5678910

      where(and(compare(\"order_time\", GREATER_OR_EQUAL, 1234567), compare(\"order_time\", LESS, 5678910)))

      最左のプライマリキー列 id がありません。

      where id > 1 and order_time > 1234567

      where(and(compare(\"id\", GREATER, 1), compare(\"order_time\", GREATER, 1234567)))

      最左のプライマリキー列 id が等価条件を使用していません。

      where data > 'xxx'

      where(compare(\"data\", GREATER, \"xxx\"))

      data フィールドはプライマリキー列ではありません。

いいえ

なし

session

コンピュートエンジンに固有です。セッションレベルのジョブパラメーター (例:set hive.execution.engine=tez)。

いいえ

なし

splitPk

コンピュートエンジンに固有です。シャードキー。このパラメーターは、コンピュートエンジンテーブルからのデータ読み取りに固有です。splitPk を指定すると、指定されたフィールドに基づいてデータがシャーディングされます。データ同期は同時実行タスクを開始してデータを同期するため、効率が向上します。

  • splitPk を指定しない場合、または splitPk の値が空の場合、データ同期は単一のチャネルを使用してテーブルデータを同期します。

  • 現在、splitPk は整数データのシャーディングのみをサポートしています。文字列、浮動小数点数、日付、その他のデータ型はサポートしていません。

いいえ

なし

columns

読み取るフィールドのリスト。列のトリミングや並べ替えができます。列のトリミングとは、エクスポートする列のサブセットを選択できることを意味します。列の並べ替えとは、テーブルスキーマとは異なる順序で列をエクスポートできることを意味します。

  • テーブルタイプのテーブルの場合、列名のみを指定します。スキーマ情報はテーブルのメタデータから自動的に取得されます。例:

    [
        "id",
        "name",
        "age",
        "birthday",
        "gender"
    ]
  • HBaseLike (widecolumn) タイプのテーブルの場合。例:

    [
        "STRING|rowkey",
        "INT|f:a",
        "DOUBLE|f:b"
    ]

はい

なし

Writer スクリプトデモ

  • MySQL データソースから LindormTable の Lindorm SQL テーブルにデータを書き込むジョブを設定します。

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "checkSlave": true,
            "datasource": " ",
            "envType": 1,
            "column": [
              "id",
              "value"
            ],
            "socketTimeout": 3600000,
            "masterSlave": "slave",
            "connection": [
              {
                "datasource": " ",
                "table": []
              }
            ],
            "where": "",
            "splitPk": "",
            "encoding": "UTF-8",
            "print": true
          },
          "name": "mysqlReader",
          "category": "reader"
        },
        {
          "stepType": "lindorm",
          "parameter": {
            "nullMode": "skip",
            "datasource": "lindorm_datasource",
            "envType": 1,
            "column": [
              "id",
              "value"
            ],
            "dynamicColumn": "false",
            "table": "lindorm_table",
            "encoding": "utf8"
          },
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "jvmOption": "",
        "executeMode": null,
        "speed": {
          // 転送速度を byte/s 単位で設定します。DataX はこの速度に達するように試みますが、超えることはありません。
          "byte": 1048576
        },
        // エラー制限
        "errorLimit": {
          // エラーレコードの最大数。エラーレコードの数がこの値を超えると、ジョブは失敗します。
          "record": 0,
          // エラーレコードの最大許容率。たとえば、1.0 は 100% を意味し、0.02 は 2% を意味します。
          "percentage": 0.02
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • MySQL データソースから LindormTable の Lindorm HBaseLike (WideColumn) テーブルにデータを書き込むジョブを設定します。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                         "value"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                "datasource": "lindorm_datasource",
                "table": "xxxxxx",
                "encoding": "utf8",
                "nullMode": "skip",
                "dynamicColumn": "false",
                "caching": 128,
                "column": [  // ソースからのフィールドを順にマッピングします。
                      "ROW|STRING", // 行キー。これは固定設定です。ソースの最初のフィールドが行キーにマッピングされます。この例では、id フィールドが行キーにマッピングされます。
                      "cf:name|STRING" // cf はカラムファミリー名を指定します (変更可能)。name は宛先の列名を指定します (変更可能)。
                ]
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }
  • MySQL データソースからコンピュートエンジンテーブルにデータを書き込むジョブを設定します。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                         "value"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                  "datasource": "lindorm_datasource",
                  "table": "xxxxxx",
                  "column": [ 
                     "id",
                    "value"
                   ],
                  "formatType": "ICEBERG"
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }

Writer スクリプトのパラメーター

パラメーター

説明

必須

デフォルト値

table

データの書き込み先となる Lindorm テーブルの名前。テーブル名では大文字と小文字が区別されます。

はい

なし

encoding

LindormTable のコーデックを指定します。有効な値は UTF-8 と GBK です。このパラメーターは通常、バイナリで格納された Lindorm の byte[] 型を String 型に変換するために使用されます。

いいえ

UTF-8

columns

書き込むフィールドのリストを指定します。列をトリミングできます。つまり、エクスポートする列のサブセットを選択できます。また、列を並べ替えることもできます。つまり、テーブルスキーマとは異なる順序で列をエクスポートできます。

  • `table` タイプのテーブルの場合、列名のみを指定する必要があります。スキーマ情報はテーブルのメタデータから自動的に取得されます。

  • ワイドカラムテーブルの場合。

はい

なし

nullMode

このパラメーターは LindormTable にのみ適用されます。ソースデータからの null 値の処理方法を指定します。

  • SKIP:Lindorm に列を書き込みません。

  • EMPTY_BYTES:フィールド値が null の場合、Lindorm の対応するフィールドに空のバイト配列を書き込みます。

  • NULL:null 値を書き込みます。

  • DELETE:フィールド値が null の場合、Lindorm の対応するフィールドを削除します。

いいえ

EMPTY_BYTES

formatType

このパラメーターはコンピュートエンジンにのみ適用されます。同期タスクのテーブルのフォーマットを指定します。有効な値:

  • iceberg

  • parquet

  • orc

いいえ

なし