ApsaraDB RDS から Lindorm へ既存データを移行する必要がある場合、または完全なデータベース移行を実行する場合、Lindorm Tunnel Service (LTS) は単一のタスクで完全データと増分データの両方を処理できます。LTS は最初に既存のすべての行をインポートし、その後自動的に増分同期に切り替わり、ソースからの継続的な変更をキャプチャします。
ApsaraDB RDS から LTS を介した完全データ同期は、2023 年 3 月 10 日に廃止されました。その日付以降に購入された LTS インスタンスは、ApsaraDB RDS から完全データを同期できません。2023 年 3 月 10 日より前に購入された LTS インスタンスは、引き続きこの機能を使用できます。
仕組み
LTS は、次の 2 つのデータソースを組み合わせて使用します。
完全データ:SQL クエリを使用して ApsaraDB RDS MySQL データベースから直接読み取ります。各クエリは 1 つの読み取りスレッドで実行されます。範囲にわたるクエリを分割することで、スループットを向上させ、クエリが失敗した場合のリトライ範囲を縮小します。
増分データ:完全インポートが完了した後、ソーステーブルの変更を追跡する Data Transmission Service (DTS) タスクを介してキャプチャされます。
LTS は、増分同期に切り替える前に、常に完全データをインポートします。デフォルトでは、ソースからの削除操作は送信先に伝播されません。削除を伝播するには、構成で skipDelete: false を設定します。
前提条件
開始する前に、以下を確認してください。
ご利用の LTS インスタンスが 2023 年 3 月 10 日より前に購入されていること
LTS、送信先 ApsaraDB for HBase クラスター、およびソース ApsaraDB RDS インスタンスが同じ仮想プライベートクラウド (VPC) 内にあること
LTS Web UI にログインしていること (詳細については、「同期タスクの作成」をご参照ください)
移行タスクを作成する前に、次のデータソースも構成する必要があります。
LindormTable 送信先:LindormTable データソースの追加
ApsaraDB RDS ソース:ApsaraDB RDS データソースの追加
DTS ソース:DTS データソースの追加
制限事項
| 制約 | 要件 |
|---|---|
| 完全データソース | MySQL データベースである必要があります |
| 増分データソース | DTS タスクである必要があります |
| 宛先 | SQL エンドポイントまたは HBase 互換エンドポイントを持つ LindormTable ノード |
RDS 移行タスクの作成
LTS Web UI で、[データのインポート] > [RDS マイグレーション] を選択します。
[作成] をクリックします。
RDS データソース、DTS データソース、および送信先データソースを選択します。

「[編集]」をクリックして、デフォルト構成を確認します。必要に応じて変更してください。リファレンスとして、「サンプル構成」をご参照ください。
インポートするテーブルを選択し、次に [構成の生成] をクリックします。次のデフォルトの動作に注意してください:
LTS は、増分データより前に完全データをインポートします。
Cassandra Query Language (CQL) と互換性のある LindormTable ノードの場合、LTS はソース ApsaraDB RDS 列と同じ名前とデータの型を持つ送信先列を自動生成します。構成で列名とマッピングをオーバーライドできます。
LTS は、
fという名前のカラムファミリーを自動生成します。各ソース列は、fカラムファミリー内の列にマッピングされます。行キーは、ソーステーブルのプライマリキー列の連結です。ソースからインポート後に削除された行は、送信先からは削除されません。削除を伝播するには、構成で
skipDelete: falseを設定します (詳細については、「構成例」をご参照ください)。
[作成] をクリックします。
構成例
すべての例では、データ変換に Jtwig テンプレート構文 ({{...}}) を使用します。完全な構文リファレンスについては、「Jtwig リファレンスマニュアル」をご参照ください。
以下の 2 つの例は、サポートされている 2 つの送信先エンドポイントタイプを対象としています。主な違いは、Lindorm SQL エンドポイントの例ではプライマリキー列を識別するために isPk を使用するのに対し、HBase 互換エンドポイントの例では専用の rowkey フィールドを使用することです。
構成パラメーター
| パラメーター | 説明 |
|---|---|
querySql | 完全データインポート用の SQL クエリ。各クエリは 1 つの読み取りスレッドで実行されます。大きなテーブルを範囲ベースのクエリに分割することで、スループットを向上させ、リトライ範囲を縮小します。 |
name | 送信先テーブルの列名。 |
value | ソーステーブルの列名、または計算値用の Jtwig 式。 |
isPk | (Lindorm SQL エンドポイント) 列がプライマリキー列であるかどうか。プライマリキー列の場合は true に設定します。 |
rowkey.value | (HBase 互換エンドポイント) 送信先テーブルの行キー。列名または Jtwig 式を受け入れます。 |
type | 送信先列のデータの型。オプション。デフォルトはソース列のデータの型です。 |
config.skipDelete | true の場合、ソースからの削除操作は送信先に伝播されません。デフォルト:true。 |
table.name | 名前空間:テーブル名 形式の送信先テーブル名。 |
table.parameter.compression | 送信先テーブルの圧縮アルゴリズム。Zstandard (ZSTD) を推奨します。 |
table.parameter.split | 送信先テーブルを事前パーティション分割するための分割キー。 |
sourceTable | データベース.テーブル名 形式のソーステーブル名。 |
Lindorm SQL エンドポイント
送信先 LindormTable ノードが SQL エンドポイントで構成されている場合は、この構成を使用します。
{
"reader": {
"querySql": [
"select * from dts.cluster where id < 1000",
"select * from dts.cluster where id >= 1000"
]
},
"writer": {
"columns": [
{
"name": "f:id",
"value": "id",
"isPk": true,
"type": "BIGINT"
},
{
"name": "cluster_id",
"value": "cluster_id",
"isPk": false
},
{
"name": "id_and_cluster",
"value": "{{concat(id, cluster_id)}}",
"isPk": true
}
],
"config": {
"skipDelete": true
},
"table": {
"name": "dts:cluster",
"parameter": {
"compression": "ZSTD"
}
},
"sourceTable": "dts.cluster"
}
}HBase 互換エンドポイント
送信先 LindormTable ノードが HBase 互換エンドポイントで構成されている場合は、この構成を使用します。行キーは、isPk ではなく rowkey フィールドを使用して明示的に指定されます。
{
"reader": {
"querySql": [
"select * from dts.cluster where id < 1000",
"select * from dts.cluster where id >= 1000"
]
},
"writer": {
"columns": [
{
"name": "f:id",
"value": "id",
"isPk": false
},
{
"name": "f:cluster_id",
"value": "cluster_id",
"isPk": false
},
{
"name": "f:id_and_cluster",
"value": "{{concat(id, cluster_id)}}"
}
],
"rowkey": {
"value": "id"
},
"config": {
"skipDelete": true
},
"table": {
"name": "dts:cluster",
"parameter": {
"compression": "ZSTD",
"split": ["1", "5", "9", "b"]
}
},
"sourceTable": "dts.cluster"
}
}