本指南為您提供了從Amazon DynamoDB遷移至PolarDB PostgreSQL版詳盡的操作步驟和最佳實務。PolarDB提供了一套專用的遷移工具,通過全量同步與增量同步處理相結合的方式,協助您實現平滑、低停機時間的資料移轉。
遷移流程概述
遷移過程主要分為五個階段,由nimo-shake(資料同步,包括全量同步與增量同步處理)、nimo-full-check(資料校正)和PolarDBBackSync(資料反向同步)三個核心工具協同完成。
全量同步(Full Synchronization)
工具:
nimo-shake過程:工具首先自動在目標PolarDB叢集中建立與源端一致的表結構。隨後,通過並發
Scan操作高效讀取源端資料庫的全量資料,並使用BatchWriteItem批量寫入目的地組群。
增量同步處理(Incremental Synchronization)
工具:
nimo-shake過程:全量同步完成後,工具會自動利用AWS DynamoDB Streams機制,即時捕獲源端自遷移啟動以來的所有資料變更(增、刪、改),並將其同步到目的地組群,確保資料最終一致。該過程支援斷點續傳。
一致性校正(Consistency Validation)
工具:
nimo-full-check過程:在資料同步期間或之後,可隨時運行此工具。它會並發地從源端和目標端讀取資料,按主鍵進行比對,並產生詳細的差異報告,以驗證資料完整性。
(可選)反向同步(Reverse Synchronization)
工具:
PolarDBBackSync.jar(基於阿里雲Realtime ComputeFlink版)過程:驗證完資料一致性後,為確保業務復原時資料的完整性,可以建立從PolarDB PostgreSQL版到源端DynamoDB的反向同步。該工具基於Flink即時捕獲源端PolarDB的變更資料,並根據變更類型調用DynamoDB的PutItem或DeleteItem介面同步更新DynamoDB的資料。
業務割接(Business Cutover)
過程:當增量資料延遲極低且一致性校正無差異後,短暫停止業務寫入,待所有資料同步完畢,即可將應用串連切換至PolarDB叢集,完成遷移。
注意事項
效能影響:資料移轉過程,尤其是全量同步階段,會對來源資料庫和目標資料庫產生一定的讀寫負載。建議您在業務低峰期執行遷移,並提前評估資料庫的承載能力。
安全配置:在業務割接前,建議對目標PolarDB叢集的寫入許可權進行管控,僅允許資料同步工具的帳號寫入,防止意外資料汙染。
準備工作
在開始遷移前,請確保您已完成以下準備工作:
擷取工具包:
遷移工具包:NimoShake.tar.gz。其中包含
nimo-shake和nimo-full-check兩種遷移工具包。(可選)反向遷移工具包:PolarDBBackSync.jar。
PolarDB叢集:
為已有叢集或新叢集開啟相容DynamoDB能力,並擷取DynamoDB訪問地址和建立DynamoDB專用帳號用於API訪問的身份憑證(AccessKey)。
(可選)參數配置:若需配置反向同步,則需將PolarDB叢集的
wal_level參數修改為logical。由於該參數的調整需重啟叢集,建議在整體遷移流程開始之前完成此項設定。
AWS DynamoDB:
擷取AWS DynamoDB的訪問憑證(AccessKey ID和Secret Access Key)。
為需要遷移的源端DynamoDB表開啟DynamoDB Streams功能。
運行環境:準備一台ECS執行個體或其他能夠與PolarDB叢集及AWS DynamoDB已連線的服務器,以便運行遷移工具包。
遷移實施步驟
步驟一:配置並啟動資料同步
解壓
nimo-shake工具包,進入NimoShake/nimo-shake目錄,編輯設定檔conf/nimo-shake.conf。以下是核心配置項說明:參數
說明
樣本值
sync_mode同步模式。
all表示全量+增量,full表示僅全量。allsource.access_key_id源端AWS DynamoDB的AccessKey ID。
AKIAIOSFODNN7...source.secret_access_key源端AWS DynamoDB的Secret Access Key。
wJalrXUtnFEMI...source.region源端AWS DynamoDB所在的地區。
cn-north-1source.endpoint_url源端串連地址。對於AWS DynamoDB,此項留空。
-
target.address目標PolarDB的DynamoDB訪問地址(含http://與連接埠)。
http://pe-xxx.rwlb.rds...target.access_key_id目標PolarDB的DynamoDB帳號AccessKey。
your-polardb-access-keytarget.secret_access_key目標PolarDB的DynamoDB帳號SecretKey。
your-polardb-secret-keyfilter.collection.white表過濾白名單,表示僅同步指定表。多個表之間用
;相隔。說明不可與黑名單同時使用,同時指定時,代表全部同步。
c1;c2filter.collection.black表過濾黑名單,表示同步時,過濾指定表不進行同步。多個表之間用
;相隔。說明不可與白名單同時使用,同時指定時,代表全部同步。
c1;c2根據您的作業系統架構,選擇對應的二進位檔案啟動同步任務。
說明建議您在後台
nohup運行,避免因終端服務斷開而導致同步任務中斷。# 返回nimo-shake目錄 cd .. # 以Linux x86-64架構為例啟動任務 nohup ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf > /dev/null 2>&1 &程式將首先進行全量同步,完成後自動轉入增量同步處理階段,並持續運行。
步驟二:執行資料一致性校正
進入
NimoShake/nimo-full-check目錄,編輯其設定檔conf/nimo-full-check.conf。配置項與nimo-shake基本一致,需分別填寫源端和目標端的串連資訊。啟動校正任務。
# 返回nimo-full-check目錄 cd .. # 以Linux x86-64架構為例 nohup ./bin/nimo-full-check.linux.amd64 -conf=./conf/nimo-full-check.conf > /dev/null 2>&1 &校正工具會在終端即時輸出進度,並將詳細日誌和資料差異報告分別存放在預設目錄
logs/和nimo-full-check-diff/中。
(可選)修複不一致資料:
當源端DynamoDB與目前PolarDB PostgreSQL版叢集中的資料不一致時,將在指定的diff目錄中展示不一致的表,例如:
說明diff目錄預設為
nimo-full-check-diff,您可以在conf/nimo-full-check.conf設定檔中進行配置,參數為diff_output_file。nimo-full-check-diff/ └── testtable-0 └── testtable-1執行修複任務:
nohup ./bin/nimo-repair.linux.amd64 -conf=./conf/nimo-repair.conf > /dev/null 2>&1 &
步驟三:(可選)配置反向同步
在準備進行業務割接前,您可以預先配置從PolarDB PostgreSQL版到源端DynamoDB的反向同步鏈路。該鏈路在業務正式切換至PolarDB期間啟動,用於實現資料的反向迴流,為業務復原提供保障。配置流程如下:
環境準備
確保PolarDB參數
wal_level的值為logical。建立高許可權資料庫帳號並授權:
(可選)如果您尚未建立高許可權帳號,請前往PolarDB控制台,在叢集的中建立高許可權帳號。
授權:在叢集的中,找到
polardb_internal_dynamodb資料庫,並修改其Owner為高許可權帳號。
建立邏輯複製槽和發布:使用高許可權帳號串連至
polardb_internal_dynamodb資料庫,執行以下SQL命令以建立一個邏輯複製槽,並向DynamoDB账号授予複製許可權,最終發布包含該資料庫下所有表的訂閱。您可以查看所建立邏輯複製槽的活躍狀態。-- 建立邏輯複製槽,'flink_slot' 名稱需與後續 Flink 配置保持一致 SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput'); -- 為之前建立的 DynamoDB 專用帳號授予 REPLICATION 許可權 -- 將 <your_dynamodb_user> 替換為您的 DynamoDB 專用帳號名 ALTER ROLE <your_dynamodb_user> REPLICATION; -- 為所有表建立發布,以便邏輯複製槽可以捕獲變更 CREATE PUBLICATION dbz_publication FOR ALL TABLES; -- 檢查複製槽狀態,此時 flink_slot 的 active 應為 f (false) SELECT * FROM pg_replication_slots;開通並配置Flink:
開通Realtime ComputeFlink版,並建立一個Flink工作空間。
重要Flink工作空間需與PolarDB叢集位於同一個VPC下。
為Flink工作空間配置公網訪問,使其能夠串連AWS DynamoDB。
配置PolarDB叢集的IP白名單:
在Flink控制台,單擊工作空間的詳情按鈕,在工作空間詳情頁面擷取其網段資訊。

前往PolarDB控制台,在叢集的中新增IP白名单分组,將Flink的網段資訊添加進去。
驗證PolarDB叢集與Flink工作空間連通性:
在Flink控制台,進入工作空間,單擊右上方的網路探測表徵圖。

填寫PolarDB叢集主節點的私人地址與連接埠,單擊探測。
彈窗提示網路探測成功連通,即叢集白名單配置正確。
部署Flink作業
下載反向同步工具:PolarDBBackSync.jar。
準備設定檔:建立名為
application.yaml的設定檔,內容如下:source: # PolarDB主節點的私網地址 hostname: pc-xxx.pg.polardb.rds.aliyuncs.com # PolarDB主節點的私網連接埠 port: 5432 # 之前建立的邏輯複製槽的名稱 slotName: flink_slot target: # 目標 AWS DynamoDB 的 region region: cn-north-1 # (可選) 表過濾配置,whiteTableSet 和 blackTableSet 只能聲明一個 filter: # whiteTableSet: 需要反向迴流的表 # blackTableSet: 不需要反向迴流的表 whiteTableSet: blackTableSet: # Flink 作業的檢查點(checkpoint)間隔,單位毫秒 checkpoint: interval: 3000上傳檔案:進入Flink控制台,找到並進入目標工作空間,在檔案管理頁面上傳
PolarDBBackSync.jar和application.yaml。安全儲存憑證:為避免明文暴露密鑰,建議使用Flink的變數管理功能儲存敏感資訊。在變數管理頁面新增以下四個變數:
變數名稱
變數值
polardbusernamePolarDB的DynamoDB帳號。
polardbpasswordPolarDB的DynamoDB帳號密碼。
dynamodbakAWS DynamoDB的AccessKey。
dynamodbskAWS DynamoDB的SecretKey。
部署並啟動作業:
進入作業營運頁面,選擇。
填寫以下主要參數,其他參數可根據業務環境進行配置。然後單擊部署。
參數名稱
填寫參考
部署模式
固定為流模式。
部署名稱
填寫作業部署名稱,此處以PolarDBBackSync為例。
引擎版本
固定為vvr-11.3-jdk11-flink-1.20。
JAR URI
選擇已上傳的
PolarDBBackSync.jar。Entry Point Class
固定為
org.example.PolarDBCdcJob。Entry Point Main Arguments
固定為:
--polardbusername ${secret_values.polardbusername}--polardbpassword ${secret_values.polardbpassword}--dynamodbak ${secret_values.dynamodbak}--dynamodbsk ${secret_values.dynamodbsk}附加依賴檔案
選擇已上傳的
application.yaml部署成功後,單擊。
驗證與清理
驗證:在作業啟動後,使用高許可權帳號串連至PolarDB叢集的
polardb_internal_dynamodb資料庫,並執行SELECT * FROM pg_replication_slots;。若複製槽flink_slot的active欄位變為t(true),則表示Flink作業已成功串連。此時即可在PolarDB叢集內開始匯入業務流量。清理:當不再需要進行反向同步時,您可執行以下步驟以釋放相關資源節省費用。
Realtime ComputeFlink版:
停止作業:前往Flink控制台,在目標工作空間中,進入作業營運頁面,找到目標作業並單擊停止
釋放執行個體:返回Flink控制台,找到目標工作空間,單擊釋放資源。
PolarDB叢集:使用高許可權帳號串連至
polardb_internal_dynamodb資料庫,執行以下命令刪除邏輯複製槽。SELECT pg_drop_replication_slot('flink_slot');
步驟四:執行業務割接
增量同步處理延遲較低且資料一致性校正無差異後,可計劃業務割接。當您準備好進行最終的業務切換時,請遵循以下嚴謹的步驟:
最終校正:在計劃的停機視窗前,反覆運行一致性校正工具,直至確認增量同步處理延遲極低,且資料差異數量降至0或可接受的範圍內。
停止源端寫入:在停機視窗開始時,暫停所有向源端AWS DynamoDB寫入資料的業務應用。
等待同步完成:觀察
nimo-shake的日誌,確認已無新的增量資料需要同步。最後一次校正:再次運行
nimo-full-check工具,確保源端和目標端的資料完全一致。停止同步工具:在確認資料完全同步後,停止
nimo-shake進程。切換應用串連:停止業務應用,將業務應用的資料庫連接配置,從AWS DynamoDB的地址切換為PolarDB的DynamoDB訪問地址。
(可選)開啟反向同步任務:在業務正式切換至PolarDB期間開啟反向同步任務,基於阿里雲Realtime ComputeFlink版實現資料的反向迴流,為業務復原提供保障。
啟動業務:重啟業務應用。至此,割接完成。
(可選)停止反向同步任務:在割接完成且業務穩定運行一段時間後,確認資料一致性滿足業務需求後,即可安全停止反向同步任務(停止Flink作業與釋放相關資源)。
附錄:為測試環境類比即時業務流量
如果您希望在測試環境中類比一個真實的、持續有資料寫入的遷移情境,可以使用以下Go語言範例程式碼。該代碼會向源端DynamoDB表周期性地寫入和更新資料。
此步驟僅用於測試和驗證遷移流程,在實際生產遷移中無需執行。
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// --- Configuration for your source AWS DynamoDB ---
var (
region = "cn-north-1" // AWS DynamoDB region
accessKey = "your-aws-access-key" // AWS DynamoDB access key
secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)
// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
ctx := context.Background()
sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
if err != nil {
log.Fatalf("Failed to load AWS config: %v", err)
}
client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
})
return client, ctx
}
// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
tableName := "src1" // Example table name
// Create table if not exists
_, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
TableName: &tableName,
AttributeDefinitions: []types.AttributeDefinition{
{AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
},
KeySchema: []types.KeySchemaElement{
{AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(100),
WriteCapacityUnits: aws.Int64(100),
},
})
if err != nil {
// Ignore if table already exists, fail on other errors
if _, ok := err.(*types.ResourceInUseException); !ok {
log.Fatalf("CreateTable failed for %s: %v", tableName, err)
}
}
fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
waiter := dynamodb.NewTableExistsWaiter(client)
err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
if err != nil {
log.Fatalf("Waiter failed for table %s: %v", tableName, err)
}
// Insert 100 sample items
for i := 0; i < 100; i++ {
pk := fmt.Sprintf("%s_user_%03d", tableName, i)
item := map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
"val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
}
client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item})
}
fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}
// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
tableName := "src1"
fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
i := 0
for {
pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
newValue := fmt.Sprintf("%d", rand.Intn(1000))
_, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: &tableName,
Key: map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
},
ExpressionAttributeValues: map[string]types.AttributeValue{
":newval": &types.AttributeValueMemberN{Value: newValue},
},
UpdateExpression: aws.String("SET val = :newval"),
})
if err != nil {
fmt.Printf("Update error: %v\n", err)
} else {
fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
}
i++
time.Sleep(1 * time.Second) // Update one record per second
}
}
func main() {
client, ctx := createClient()
initializeData(client, ctx)
simulateTraffic(client, ctx)
}