全部產品
Search
文件中心

PolarDB:資料移轉指南

更新時間:Jan 17, 2026

本指南為您提供了從Amazon DynamoDB遷移至PolarDB PostgreSQL版詳盡的操作步驟和最佳實務。PolarDB提供了一套專用的遷移工具,通過全量同步與增量同步處理相結合的方式,協助您實現平滑、低停機時間的資料移轉。

遷移流程概述

遷移過程主要分為五個階段,由nimo-shake(資料同步,包括全量同步與增量同步處理)、nimo-full-check(資料校正)和PolarDBBackSync(資料反向同步)三個核心工具協同完成。

image
  1. 全量同步(Full Synchronization)

    • 工具:nimo-shake

    • 過程:工具首先自動在目標PolarDB叢集中建立與源端一致的表結構。隨後,通過並發Scan操作高效讀取源端資料庫的全量資料,並使用BatchWriteItem批量寫入目的地組群。

  2. 增量同步處理(Incremental Synchronization)

    • 工具:nimo-shake

    • 過程:全量同步完成後,工具會自動利用AWS DynamoDB Streams機制,即時捕獲源端自遷移啟動以來的所有資料變更(增、刪、改),並將其同步到目的地組群,確保資料最終一致。該過程支援斷點續傳。

  3. 一致性校正(Consistency Validation)

    • 工具:nimo-full-check

    • 過程:在資料同步期間或之後,可隨時運行此工具。它會並發地從源端和目標端讀取資料,按主鍵進行比對,並產生詳細的差異報告,以驗證資料完整性。

  4. (可選)反向同步(Reverse Synchronization)

    • 工具:PolarDBBackSync.jar (基於阿里雲Realtime ComputeFlink版)

    • 過程:驗證完資料一致性後,為確保業務復原時資料的完整性,可以建立從PolarDB PostgreSQL版到源端DynamoDB的反向同步。該工具基於Flink即時捕獲源端PolarDB的變更資料,並根據變更類型調用DynamoDB的PutItem或DeleteItem介面同步更新DynamoDB的資料。

  5. 業務割接(Business Cutover)

    • 過程:當增量資料延遲極低且一致性校正無差異後,短暫停止業務寫入,待所有資料同步完畢,即可將應用串連切換至PolarDB叢集,完成遷移。

注意事項

  • 效能影響:資料移轉過程,尤其是全量同步階段,會對來源資料庫和目標資料庫產生一定的讀寫負載。建議您在業務低峰期執行遷移,並提前評估資料庫的承載能力。

  • 安全配置:在業務割接前,建議對目標PolarDB叢集的寫入許可權進行管控,僅允許資料同步工具的帳號寫入,防止意外資料汙染。

準備工作

在開始遷移前,請確保您已完成以下準備工作:

  1. 擷取工具包

    1. 遷移工具包:NimoShake.tar.gz。其中包含nimo-shakenimo-full-check兩種遷移工具包。

    2. (可選)反向遷移工具包:PolarDBBackSync.jar

  2. PolarDB叢集

    1. 為已有叢集或新叢集開啟相容DynamoDB能力,並擷取DynamoDB訪問地址建立DynamoDB專用帳號用於API訪問的身份憑證(AccessKey)。

    2. (可選)參數配置:若需配置反向同步,則需將PolarDB叢集的wal_level參數修改為logical。由於該參數的調整需重啟叢集,建議在整體遷移流程開始之前完成此項設定。

  3. AWS DynamoDB

    1. 擷取AWS DynamoDB的訪問憑證(AccessKey ID和Secret Access Key)。

    2. 為需要遷移的源端DynamoDB表開啟DynamoDB Streams功能。

  4. 運行環境:準備一台ECS執行個體或其他能夠與PolarDB叢集及AWS DynamoDB已連線的服務器,以便運行遷移工具包。

遷移實施步驟

步驟一:配置並啟動資料同步

  1. 解壓nimo-shake工具包,進入NimoShake/nimo-shake目錄,編輯設定檔conf/nimo-shake.conf。以下是核心配置項說明:

    參數

    說明

    樣本值

    sync_mode

    同步模式。all表示全量+增量,full表示僅全量。

    all

    source.access_key_id

    源端AWS DynamoDB的AccessKey ID。

    AKIAIOSFODNN7...

    source.secret_access_key

    源端AWS DynamoDB的Secret Access Key。

    wJalrXUtnFEMI...

    source.region

    源端AWS DynamoDB所在的地區。

    cn-north-1

    source.endpoint_url

    源端串連地址。對於AWS DynamoDB,此項留空。

    -

    target.address

    目標PolarDB的DynamoDB訪問地址(含http://與連接埠)。

    http://pe-xxx.rwlb.rds...

    target.access_key_id

    目標PolarDB的DynamoDB帳號AccessKey。

    your-polardb-access-key

    target.secret_access_key

    目標PolarDB的DynamoDB帳號SecretKey。

    your-polardb-secret-key

    filter.collection.white

    表過濾白名單,表示僅同步指定表。多個表之間用;相隔。

    說明

    不可與黑名單同時使用,同時指定時,代表全部同步。

    c1;c2

    filter.collection.black

    表過濾黑名單,表示同步時,過濾指定表不進行同步。多個表之間用;相隔。

    說明

    不可與白名單同時使用,同時指定時,代表全部同步。

    c1;c2

  2. 根據您的作業系統架構,選擇對應的二進位檔案啟動同步任務。

    說明

    建議您在後台nohup運行,避免因終端服務斷開而導致同步任務中斷。

    # 返回nimo-shake目錄
    cd ..
    # 以Linux x86-64架構為例啟動任務
    nohup ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf > /dev/null 2>&1 &

    程式將首先進行全量同步,完成後自動轉入增量同步處理階段,並持續運行。

步驟二:執行資料一致性校正

  1. 進入NimoShake/nimo-full-check目錄,編輯其設定檔conf/nimo-full-check.conf。配置項與nimo-shake基本一致,需分別填寫源端和目標端的串連資訊。

  2. 啟動校正任務。

    # 返回nimo-full-check目錄
    cd ..
    # 以Linux x86-64架構為例
    nohup ./bin/nimo-full-check.linux.amd64 -conf=./conf/nimo-full-check.conf > /dev/null 2>&1 &

    校正工具會在終端即時輸出進度,並將詳細日誌和資料差異報告分別存放在預設目錄logs/nimo-full-check-diff/中。 image.png

  3. (可選)修複不一致資料:

    1. 當源端DynamoDB與目前PolarDB PostgreSQL版叢集中的資料不一致時,將在指定的diff目錄中展示不一致的表,例如:

      說明

      diff目錄預設為nimo-full-check-diff,您可以在conf/nimo-full-check.conf設定檔中進行配置,參數為diff_output_file

      nimo-full-check-diff/
        └── testtable-0
        └── testtable-1
    2. 執行修複任務:

      nohup ./bin/nimo-repair.linux.amd64 -conf=./conf/nimo-repair.conf > /dev/null 2>&1 &

步驟三:(可選)配置反向同步

在準備進行業務割接前,您可以預先配置從PolarDB PostgreSQL版到源端DynamoDB的反向同步鏈路。該鏈路在業務正式切換至PolarDB期間啟動,用於實現資料的反向迴流,為業務復原提供保障。配置流程如下:

環境準備

  1. 確保PolarDB參數wal_level的值為logical

  2. 建立高許可權資料庫帳號並授權

    1. (可選)如果您尚未建立高許可權帳號,請前往PolarDB控制台,在叢集的配置与管理 > 账号管理中建立高許可權帳號。

    2. 授權:在叢集的配置与管理 > 数据库管理中,找到polardb_internal_dynamodb資料庫,並修改其Owner為高許可權帳號。

  3. 建立邏輯複製槽和發布:使用高許可權帳號串連至polardb_internal_dynamodb資料庫,執行以下SQL命令以建立一個邏輯複製槽,並向DynamoDB账号授予複製許可權,最終發布包含該資料庫下所有表的訂閱。您可以查看所建立邏輯複製槽的活躍狀態。

    -- 建立邏輯複製槽,'flink_slot' 名稱需與後續 Flink 配置保持一致
    SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput');
    
    -- 為之前建立的 DynamoDB 專用帳號授予 REPLICATION 許可權
    -- 將 <your_dynamodb_user> 替換為您的 DynamoDB 專用帳號名
    ALTER ROLE <your_dynamodb_user> REPLICATION;
    
    -- 為所有表建立發布,以便邏輯複製槽可以捕獲變更
    CREATE PUBLICATION dbz_publication FOR ALL TABLES;
    
    -- 檢查複製槽狀態,此時 flink_slot 的 active 應為 f (false)
    SELECT * FROM pg_replication_slots;
  4. 開通並配置Flink

    1. 開通Realtime ComputeFlink版,並建立一個Flink工作空間。

      重要

      Flink工作空間需與PolarDB叢集位於同一個VPC下。

    2. 為Flink工作空間配置公網訪問,使其能夠串連AWS DynamoDB。

  5. 配置PolarDB叢集的IP白名單

    1. 在Flink控制台,單擊工作空間的詳情按鈕,在工作空間詳情頁面擷取其網段資訊。image

    2. 前往PolarDB控制台,在叢集的配置与管理 > 集群白名单新增IP白名单分组,將Flink的網段資訊添加進去。

  6. 驗證PolarDB叢集與Flink工作空間連通性

    1. 在Flink控制台,進入工作空間,單擊右上方的網路探測表徵圖。image

    2. 填寫PolarDB叢集主節點的私人地址與連接埠,單擊探測

    3. 彈窗提示網路探測成功連通,即叢集白名單配置正確。

部署Flink作業

  1. 下載反向同步工具PolarDBBackSync.jar

  2. 準備設定檔:建立名為application.yaml的設定檔,內容如下:

    source:
        # PolarDB主節點的私網地址
        hostname: pc-xxx.pg.polardb.rds.aliyuncs.com
        # PolarDB主節點的私網連接埠
        port: 5432
        # 之前建立的邏輯複製槽的名稱
        slotName: flink_slot
    
    target:
      # 目標 AWS DynamoDB 的 region
      region: cn-north-1
    
    # (可選) 表過濾配置,whiteTableSet 和 blackTableSet 只能聲明一個
    filter:
      # whiteTableSet: 需要反向迴流的表
      # blackTableSet: 不需要反向迴流的表
      whiteTableSet:
      blackTableSet:
    
    # Flink 作業的檢查點(checkpoint)間隔,單位毫秒
    checkpoint:
      interval: 3000
  3. 上傳檔案:進入Flink控制台,找到並進入目標工作空間,在檔案管理頁面上傳PolarDBBackSync.jarapplication.yaml

  4. 安全儲存憑證:為避免明文暴露密鑰,建議使用Flink的變數管理功能儲存敏感資訊。在變數管理頁面新增以下四個變數:

    變數名稱

    變數值

    polardbusername

    PolarDB的DynamoDB帳號。

    polardbpassword

    PolarDB的DynamoDB帳號密碼。

    dynamodbak

    AWS DynamoDB的AccessKey。

    dynamodbsk

    AWS DynamoDB的SecretKey。

  5. 部署並啟動作業

    1. 進入作業營運頁面,選擇部署作業 > JAR作業

    2. 填寫以下主要參數,其他參數可根據業務環境進行配置。然後單擊部署

      參數名稱

      填寫參考

      部署模式

      固定為流模式

      部署名稱

      填寫作業部署名稱,此處以PolarDBBackSync為例。

      引擎版本

      固定為vvr-11.3-jdk11-flink-1.20

      JAR URI

      選擇已上傳的PolarDBBackSync.jar

      Entry Point Class

      固定為org.example.PolarDBCdcJob

      Entry Point Main Arguments

      固定為:

      --polardbusername ${secret_values.polardbusername}

      --polardbpassword ${secret_values.polardbpassword}

      --dynamodbak ${secret_values.dynamodbak}

      --dynamodbsk ${secret_values.dynamodbsk}

      附加依賴檔案

      選擇已上傳的application.yaml

    3. 部署成功後,單擊啟動 > 無狀態啟動

驗證與清理

  • 驗證:在作業啟動後,使用高許可權帳號串連至PolarDB叢集的polardb_internal_dynamodb資料庫,並執行SELECT * FROM pg_replication_slots;。若複製槽flink_slot的active欄位變為t(true),則表示Flink作業已成功串連。此時即可在PolarDB叢集內開始匯入業務流量。

  • 清理:當不再需要進行反向同步時,您可執行以下步驟以釋放相關資源節省費用。

    • Realtime ComputeFlink版

      • 停止作業:前往Flink控制台,在目標工作空間中,進入作業營運頁面,找到目標作業並單擊停止

      • 釋放執行個體:返回Flink控制台,找到目標工作空間,單擊釋放資源

    • PolarDB叢集:使用高許可權帳號串連至polardb_internal_dynamodb資料庫,執行以下命令刪除邏輯複製槽。

      SELECT pg_drop_replication_slot('flink_slot');

步驟四:執行業務割接

增量同步處理延遲較低且資料一致性校正無差異後,可計劃業務割接。當您準備好進行最終的業務切換時,請遵循以下嚴謹的步驟:

  1. 最終校正:在計劃的停機視窗前,反覆運行一致性校正工具,直至確認增量同步處理延遲極低,且資料差異數量降至0或可接受的範圍內。

  2. 停止源端寫入:在停機視窗開始時,暫停所有向源端AWS DynamoDB寫入資料的業務應用。

  3. 等待同步完成:觀察nimo-shake的日誌,確認已無新的增量資料需要同步。

  4. 最後一次校正:再次運行nimo-full-check工具,確保源端和目標端的資料完全一致。

  5. 停止同步工具:在確認資料完全同步後,停止nimo-shake進程。

  6. 切換應用串連:停止業務應用,將業務應用的資料庫連接配置,從AWS DynamoDB的地址切換為PolarDB的DynamoDB訪問地址。

  7. (可選)開啟反向同步任務:在業務正式切換至PolarDB期間開啟反向同步任務,基於阿里雲Realtime ComputeFlink版實現資料的反向迴流,為業務復原提供保障。

  8. 啟動業務:重啟業務應用。至此,割接完成。

  9. (可選)停止反向同步任務:在割接完成且業務穩定運行一段時間後,確認資料一致性滿足業務需求後,即可安全停止反向同步任務(停止Flink作業與釋放相關資源)

附錄:為測試環境類比即時業務流量

如果您希望在測試環境中類比一個真實的、持續有資料寫入的遷移情境,可以使用以下Go語言範例程式碼。該代碼會向源端DynamoDB表周期性地寫入和更新資料。

說明

此步驟僅用於測試和驗證遷移流程,在實際生產遷移中無需執行。

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// --- Configuration for your source AWS DynamoDB ---
var (
    region    = "cn-north-1"       // AWS DynamoDB region
    accessKey = "your-aws-access-key" // AWS DynamoDB access key
    secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)

// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
    ctx := context.Background()
    sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
    if err != nil {
        log.Fatalf("Failed to load AWS config: %v", err)
    }
    client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
        o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
    })
    return client, ctx
}

// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
    tableName := "src1" // Example table name

    // Create table if not exists
    _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
        TableName: &tableName,
        AttributeDefinitions: []types.AttributeDefinition{
            {AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
        },
        KeySchema: []types.KeySchemaElement{
            {AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
        },
        ProvisionedThroughput: &types.ProvisionedThroughput{
            ReadCapacityUnits:  aws.Int64(100),
            WriteCapacityUnits: aws.Int64(100),
        },
    })
    if err != nil {
		// Ignore if table already exists, fail on other errors
        if _, ok := err.(*types.ResourceInUseException); !ok {
			log.Fatalf("CreateTable failed for %s: %v", tableName, err)
		}
    }

    fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
    waiter := dynamodb.NewTableExistsWaiter(client)
    err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
    if err != nil {
        log.Fatalf("Waiter failed for table %s: %v", tableName, err)
    }

    // Insert 100 sample items
    for i := 0; i < 100; i++ {
        pk := fmt.Sprintf("%s_user_%03d", tableName, i)
        item := map[string]types.AttributeValue{
            "pk":  &types.AttributeValueMemberS{Value: pk},
            "val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
        }
        client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item})
    }
    fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}

// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
    tableName := "src1"
    fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
    i := 0
    for {
        pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
        newValue := fmt.Sprintf("%d", rand.Intn(1000))
        _, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
            TableName: &tableName,
            Key: map[string]types.AttributeValue{
                "pk": &types.AttributeValueMemberS{Value: pk},
            },
            ExpressionAttributeValues: map[string]types.AttributeValue{
                ":newval": &types.AttributeValueMemberN{Value: newValue},
            },
            UpdateExpression: aws.String("SET val = :newval"),
        })
        if err != nil {
            fmt.Printf("Update error: %v\n", err)
        } else {
            fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
        }
        i++
        time.Sleep(1 * time.Second) // Update one record per second
    }
}

func main() {
    client, ctx := createClient()
    initializeData(client, ctx)
    simulateTraffic(client, ctx)
}