すべてのプロダクト
Search
ドキュメントセンター

Lindorm:RDS の完全同期と増分同期

最終更新日:Mar 29, 2026

ApsaraDB RDS から Lindorm へ既存データを移行する必要がある場合、または完全なデータベース移行を実行する場合、Lindorm Tunnel Service (LTS) は単一のタスクで完全データと増分データの両方を処理できます。LTS は最初に既存のすべての行をインポートし、その後自動的に増分同期に切り替わり、ソースからの継続的な変更をキャプチャします。

重要

ApsaraDB RDS から LTS を介した完全データ同期は、2023 年 3 月 10 日に廃止されました。その日付以降に購入された LTS インスタンスは、ApsaraDB RDS から完全データを同期できません。2023 年 3 月 10 日より前に購入された LTS インスタンスは、引き続きこの機能を使用できます。

仕組み

LTS は、次の 2 つのデータソースを組み合わせて使用します。

  • 完全データ:SQL クエリを使用して ApsaraDB RDS MySQL データベースから直接読み取ります。各クエリは 1 つの読み取りスレッドで実行されます。範囲にわたるクエリを分割することで、スループットを向上させ、クエリが失敗した場合のリトライ範囲を縮小します。

  • 増分データ:完全インポートが完了した後、ソーステーブルの変更を追跡する Data Transmission Service (DTS) タスクを介してキャプチャされます。

LTS は、増分同期に切り替える前に、常に完全データをインポートします。デフォルトでは、ソースからの削除操作は送信先に伝播されません。削除を伝播するには、構成で skipDelete: false を設定します。

前提条件

開始する前に、以下を確認してください。

  • ご利用の LTS インスタンスが 2023 年 3 月 10 日より前に購入されていること

  • LTS、送信先 ApsaraDB for HBase クラスター、およびソース ApsaraDB RDS インスタンスが同じ仮想プライベートクラウド (VPC) 内にあること

  • LTS Web UI にログインしていること (詳細については、「同期タスクの作成」をご参照ください)

移行タスクを作成する前に、次のデータソースも構成する必要があります。

制限事項

制約要件
完全データソースMySQL データベースである必要があります
増分データソースDTS タスクである必要があります
宛先SQL エンドポイントまたは HBase 互換エンドポイントを持つ LindormTable ノード

RDS 移行タスクの作成

  1. LTS Web UI で、[データのインポート] > [RDS マイグレーション] を選択します。

  2. [作成] をクリックします。

  3. RDS データソース、DTS データソース、および送信先データソースを選択します。

    Create a task

  4. [編集]」をクリックして、デフォルト構成を確認します。必要に応じて変更してください。リファレンスとして、「サンプル構成」をご参照ください。

  5. インポートするテーブルを選択し、次に [構成の生成] をクリックします。次のデフォルトの動作に注意してください:

    • LTS は、増分データより前に完全データをインポートします。

    • Cassandra Query Language (CQL) と互換性のある LindormTable ノードの場合、LTS はソース ApsaraDB RDS 列と同じ名前とデータの型を持つ送信先列を自動生成します。構成で列名とマッピングをオーバーライドできます。

    • LTS は、f という名前のカラムファミリーを自動生成します。各ソース列は、f カラムファミリー内の列にマッピングされます。行キーは、ソーステーブルのプライマリキー列の連結です。

    • ソースからインポート後に削除された行は、送信先からは削除されません。削除を伝播するには、構成で skipDelete: false を設定します (詳細については、「構成例」をご参照ください)。

  6. [作成] をクリックします。

構成例

すべての例では、データ変換に Jtwig テンプレート構文 ({{...}}) を使用します。完全な構文リファレンスについては、「Jtwig リファレンスマニュアル」をご参照ください。

以下の 2 つの例は、サポートされている 2 つの送信先エンドポイントタイプを対象としています。主な違いは、Lindorm SQL エンドポイントの例ではプライマリキー列を識別するために isPk を使用するのに対し、HBase 互換エンドポイントの例では専用の rowkey フィールドを使用することです。

構成パラメーター

パラメーター説明
querySql完全データインポート用の SQL クエリ。各クエリは 1 つの読み取りスレッドで実行されます。大きなテーブルを範囲ベースのクエリに分割することで、スループットを向上させ、リトライ範囲を縮小します。
name送信先テーブルの列名。
valueソーステーブルの列名、または計算値用の Jtwig 式。
isPk(Lindorm SQL エンドポイント) 列がプライマリキー列であるかどうか。プライマリキー列の場合は true に設定します。
rowkey.value(HBase 互換エンドポイント) 送信先テーブルの行キー。列名または Jtwig 式を受け入れます。
type送信先列のデータの型。オプション。デフォルトはソース列のデータの型です。
config.skipDeletetrue の場合、ソースからの削除操作は送信先に伝播されません。デフォルト:true
table.name名前空間:テーブル名 形式の送信先テーブル名。
table.parameter.compression送信先テーブルの圧縮アルゴリズム。Zstandard (ZSTD) を推奨します。
table.parameter.split送信先テーブルを事前パーティション分割するための分割キー。
sourceTableデータベース.テーブル名 形式のソーステーブル名。

Lindorm SQL エンドポイント

送信先 LindormTable ノードが SQL エンドポイントで構成されている場合は、この構成を使用します。

{
    "reader": {
        "querySql": [
            "select * from dts.cluster where id < 1000",
            "select * from dts.cluster where id >= 1000"
        ]
    },
    "writer": {
        "columns": [
            {
                "name": "f:id",
                "value": "id",
                "isPk": true,
                "type": "BIGINT"
            },
            {
                "name": "cluster_id",
                "value": "cluster_id",
                "isPk": false
            },
            {
                "name": "id_and_cluster",
                "value": "{{concat(id, cluster_id)}}",
                "isPk": true
            }
        ],
        "config": {
            "skipDelete": true
        },
        "table": {
            "name": "dts:cluster",
            "parameter": {
                "compression": "ZSTD"
            }
        },
        "sourceTable": "dts.cluster"
    }
}

HBase 互換エンドポイント

送信先 LindormTable ノードが HBase 互換エンドポイントで構成されている場合は、この構成を使用します。行キーは、isPk ではなく rowkey フィールドを使用して明示的に指定されます。

{
    "reader": {
        "querySql": [
            "select * from dts.cluster where id < 1000",
            "select * from dts.cluster where id >= 1000"
        ]
    },
    "writer": {
        "columns": [
            {
                "name": "f:id",
                "value": "id",
                "isPk": false
            },
            {
                "name": "f:cluster_id",
                "value": "cluster_id",
                "isPk": false
            },
            {
                "name": "f:id_and_cluster",
                "value": "{{concat(id, cluster_id)}}"
            }
        ],
        "rowkey": {
            "value": "id"
        },
        "config": {
            "skipDelete": true
        },
        "table": {
            "name": "dts:cluster",
            "parameter": {
                "compression": "ZSTD",
                "split": ["1", "5", "9", "b"]
            }
        },
        "sourceTable": "dts.cluster"
    }
}

関連トピック