本指南為您提供了從Amazon DynamoDB遷移至PolarDB PostgreSQL版詳盡的操作步驟和最佳實務。PolarDB提供了一套專用的遷移工具,通過全量同步與增量同步處理相結合的方式,協助您實現平滑、低停機時間的資料移轉。
遷移流程概述
遷移過程主要分為五個階段,由nimo-shake(資料同步,包括全量同步與增量同步處理)、nimo-full-check(資料校正)和PolarDBBackSync(資料反向同步)三個核心工具協同完成。

全量同步(Full Synchronization)
工具:
nimo-shake過程:工具首先自動在目標PolarDB叢集中建立與源端一致的表結構。隨後,通過並發
Scan操作高效讀取源端資料庫的全量資料,並使用BatchWriteItem批量寫入目的地組群。
增量同步處理(Incremental Synchronization)
工具:
nimo-shake過程:全量同步完成後,工具會自動利用AWS DynamoDB Streams機制,即時捕獲源端自遷移啟動以來的所有資料變更(增、刪、改),並將其同步到目的地組群,確保資料最終一致。該過程支援斷點續傳。
一致性校正(Consistency Validation)
工具:
nimo-full-check過程:在資料同步期間或之後,可隨時運行此工具。它會並發地從源端和目標端讀取資料,按主鍵進行比對,並產生詳細的差異報告,以驗證資料完整性。
(可選)反向同步(Reverse Synchronization)
工具:
PolarDBBackSync.jar(基於阿里雲Realtime ComputeFlink版)過程:驗證完資料一致性後,為確保業務復原時資料的完整性,可以建立從PolarDB PostgreSQL版到源端DynamoDB的反向同步。該工具基於Flink即時捕獲源端PolarDB的變更資料,並根據變更類型調用DynamoDB的PutItem或DeleteItem介面同步更新DynamoDB的資料。
業務割接(Business Cutover)
過程:當增量資料延遲極低且一致性校正無差異後,短暫停止業務寫入,待所有資料同步完畢,即可將應用串連切換至PolarDB叢集,完成遷移。
注意事項
效能影響:資料移轉過程,尤其是全量同步階段,會對來源資料庫和目標資料庫產生一定的讀寫負載。建議您在業務低峰期執行遷移,並提前評估資料庫的承載能力。
安全配置:在業務割接前,建議對目標PolarDB叢集的寫入許可權進行管控,僅允許資料同步工具的帳號寫入,防止意外資料汙染。
準備工作
在開始遷移前,請確保您已完成以下準備工作:
擷取工具包:
遷移工具包:NimoShake_v20260616.zip。其中包含
nimo-shake,nimo-full-check和nimo-repair三種遷移工具包。(可選)反向遷移工具包:PolarDBBackSync.jar。
PolarDB叢集:
為已有叢集或新叢集開啟相容DynamoDB能力,並擷取DynamoDB訪問地址和建立DynamoDB專用帳號用於API訪問的身份憑證(AccessKey)。
(可選)參數配置:若需配置反向同步,則需將PolarDB叢集的
wal_level參數修改為logical。由於該參數的調整需重啟叢集,建議在整體遷移流程開始之前完成此項設定。
AWS DynamoDB:
擷取AWS DynamoDB的訪問憑證(AccessKey ID和Secret Access Key)。
運行環境:準備一台ECS執行個體或其他能夠與PolarDB叢集及AWS DynamoDB已連線的服務器,以便運行遷移工具包。
遷移實施步驟
步驟一:配置並啟動資料同步
解壓
NimoShake.zip,進入NimoShake目錄,編輯統一設定檔conf/nimo.conf。nimo-shake、nimo-full-check、nimo-repair三個工具共用此檔案,每個工具唯讀取自己能識別的參數。以下是啟動同步所需的核心配置項:參數
說明
樣本值
sync_mode同步模式。
all表示全量+增量,full表示僅全量。allsource.access_key_id源端AWS DynamoDB的AccessKey ID。
AKIAIOSFODNN7...source.secret_access_key源端AWS DynamoDB的Secret Access Key。
wJalrXUtnFEMI...source.region源端AWS DynamoDB所在的地區。
cn-north-1target.endpoint_url目標PolarDB的DynamoDB訪問地址(含連接埠)。
http://pe-xxx.rwlb.rds...target.access_key_id目標PolarDB的DynamoDB帳號AccessKey。
your-polardb-access-keytarget.secret_access_key目標PolarDB的DynamoDB帳號SecretKey。
your-polardb-secret-keyfilter.collection.white表過濾白名單,多個表之間用
;相隔,與黑名單不可同時使用。c1;c2filter.collection.black表過濾黑名單,多個表之間用
;相隔,與白名單不可同時使用。c1;c2若您希望使用 基於 S3 快照的校正模式(推薦,詳見步驟二),還需配置以下 PostgreSQL 原生串連參數:
參數
說明
樣本值
s3.export_state_file匯出狀態檔案路徑,由
nimo-shake寫入,nimo-full-check讀取。../nimo-shake-s3-exports.jsons3.export_bucketDynamoDB資料匯出的目標S3儲存桶名稱。
my-export-buckets3.export_prefixS3路徑首碼,用於組織匯出檔案。
exports/my-project/target.pg.endpoint_urlPolarDB PostgreSQL原生連接埠地址(
host:port格式)。pc-xxx.pg.polardb.rds...:5432target.pg.userPolarDB PostgreSQL使用者名稱(同時作為schema名)。
說明DynamoDB 帳號本質上是普通資料庫帳號,其存取金鑰(SK)由使用者設定的密碼按相容演算法加密產生,因此 target.pg.user 與 target.access_key_id 實際對應同一帳號,而 target.pg.password 是使用者自訂密碼,SK 則是基於該密碼派生的密鑰。
your-usernametarget.pg.passwordPolarDB PostgreSQL密碼。
your-password(可選)全量同步 PG 直寫入模式
預設情況下,全量同步使用原生 PostgreSQL 直寫入模式,通過標準 PostgreSQL 協議直接將資料寫入 PolarDB,繞過 DynamoDB 相容層 API,效能更高。如需回退到 DynamoDB 層 API(
BatchWriteItem,每次最多寫入 25 條),可手動將該參數改為dynamodb。可在conf/nimo.conf的[SHAKE]區塊中配置以下參數:參數
說明
預設值
full.write_protocol全量寫入協議。
postgresql(預設)為 PG 直寫高效能模式;dynamodb為 DynamoDB 層 API(BatchWriteItem,單次最大 25 條)。postgresqlfull.document.write.batch每次批量寫入的文檔數。PG 直寫時不受 25 條限制,可適當調大以進一步提升寫入吞吐。如切回
dynamodb協議,請將此值改回 25。1000full.document.concurrency每張表的並發寫入線程數。PG 直寫入模式下,工具會自動按此值設定串連池大小,確保線程與串連一一對應。
4說明PG 直寫入模式同樣需要配置
target.pg.*三項參數(與 s3/incr 校正模式共用同一組配置,無需重複填寫)。該模式僅影響全量同步階段的寫入路徑;增量同步處理始終通過 DynamoDB 相容層 API 寫入。
根據您的作業系統架構,選擇對應的二進位檔案啟動同步任務。
說明建議您在後台
nohup運行,避免因終端服務斷開而導致同步任務中斷。# 在NimoShake目錄下執行 nohup ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo.conf > /dev/null 2>&1 &程式將首先進行全量同步,完成後自動轉入增量同步處理階段,並持續運行。
步驟二:執行資料一致性校正
nimo-full-check與nimo-shake共用同一份conf/nimo.conf,無需切換目錄或編輯額外的設定檔。通過mode參數選擇校正模式:scan:即時掃描源端與目標端資料進行對比。配置最簡單,無需額外基礎設施,但校正期間會持續消耗 DynamoDB 讀取配額。s3(推薦):全量同步完成時,nimo-shake會自動將源端資料匯出至 S3,並產生一份時間點快照。nimo-full-check以該快照為基準進行校正,無需再次掃描源端 DynamoDB。需在步驟一中配置 S3 相關參數。incr(配合s3模式使用):增量同步處理階段,nimo-shake會將每條變更記錄的主鍵寫入 PolarDB 中的檢查表(ct_{user}_{table})。nimo-full-check讀取檢查表,僅對這些發生過變更的記錄進行定向校正。兩種校正方式對比
scan 模式
s3 / incr 模式
所需停機時間
較長。校正期間須停止業務寫入,避免源端資料在掃描過程中發生變化。
更短。基於快照校正,停機後僅需確認增量追平即可割接。
對源端的影響
持續消耗 DynamoDB 讀取配額。
全量校正基於 S3 快照,不消耗 DynamoDB 配額;增量校正(incr 模式)僅對有變更的記錄發起查詢,配額消耗較少。
配置複雜度
低。
較高,需額外配置 S3 和 PolarDB PostgreSQL 串連。
啟動校正任務。
nohup ./bin/nimo-full-check.linux.amd64 -conf=./conf/nimo.conf > /dev/null 2>&1 &校正工具會將詳細日誌和資料差異報告分別存放在預設目錄
logs/和nimo-full-check-diff/中。
(可選)修複不一致資料:
當源端DynamoDB與目前PolarDB PostgreSQL版叢集中的資料不一致時,將在指定的diff目錄中展示不一致的表,例如:
說明diff目錄預設為
nimo-full-check-diff,您可以在conf/nimo.conf中進行配置,參數為diff_output_file。nimo-full-check-diff/ └── testtable-0 └── testtable-1執行修複任務:
nohup ./bin/nimo-repair.linux.amd64 -conf=./conf/nimo-repair.conf > /dev/null 2>&1 &
步驟三:(可選)配置反向同步
在準備進行業務割接前,您可以預先配置從PolarDB PostgreSQL版到源端DynamoDB的反向同步鏈路。該鏈路在業務正式切換至PolarDB期間啟動,用於實現資料的反向迴流,為業務復原提供保障。配置流程如下:
環境準備
確保PolarDB參數
wal_level的值為logical。建立高許可權資料庫帳號並授權:
(可選)如果您尚未建立高許可權帳號,請前往PolarDB控制台,在叢集的中建立高許可權帳號。
建立邏輯複製槽和發布:使用高許可權帳號串連至
polardb_internal_dynamodb資料庫,執行以下SQL命令以建立一個邏輯複製槽,並向DynamoDB账号授予複製許可權,最終發布包含該資料庫下所有表的訂閱。您可以查看所建立邏輯複製槽的活躍狀態。-- 建立邏輯複製槽,'flink_slot' 名稱需與後續 Flink 配置保持一致 SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput'); -- 為之前建立的 DynamoDB 專用帳號授予 REPLICATION 許可權 -- 將 <your_dynamodb_user> 替換為您的 DynamoDB 專用帳號名 ALTER ROLE <your_dynamodb_user> REPLICATION; -- 檢查複製槽狀態,此時 flink_slot 的 active 應為 f (false) SELECT * FROM pg_replication_slots;開通並配置Flink:
開通Realtime ComputeFlink版,並建立一個Flink工作空間。
重要Flink工作空間需與PolarDB叢集位於同一個VPC下。
為Flink工作空間配置公網訪問,使其能夠串連AWS DynamoDB。
配置PolarDB叢集的IP白名單:
在Flink控制台,單擊工作空間的详情按鈕,在工作空間詳情頁面擷取其網段資訊。
前往PolarDB控制台,在叢集的中新增IP白名单分组,將Flink的網段資訊添加進去。
驗證PolarDB叢集與Flink工作空間連通性:
在Flink控制台,進入工作空間,單擊右上方的網路探測表徵圖。
填寫PolarDB叢集主節點的私人地址與連接埠,單擊探測。
彈窗提示網路探測成功連通,即叢集白名單配置正確。
部署Flink作業
下載反向同步工具:PolarDBBackSync.jar。
準備設定檔:建立名為
application.yaml的設定檔,內容如下:snapshot: mode: never source: # PolarDB主節點的私網地址 hostname: pc-xxx.pg.polardb.rds.aliyuncs.com # PolarDB主節點的私網連接埠 port: 5432 # 之前建立的邏輯複製槽的名稱 slotName: flink_slot target: # 目標 AWS DynamoDB 的 region region: cn-north-1 # (可選) 表過濾配置,whiteTableSet 和 blackTableSet 只能聲明一個 filter: # whiteTableSet: 需要反向迴流的表 # blackTableSet: 不需要反向迴流的表 whiteTableSet: blackTableSet: # Flink 作業的檢查點(checkpoint)間隔,單位毫秒 checkpoint: interval: 3000上傳檔案:進入Flink控制台,找到並進入目標工作空間,在檔案管理頁面上傳
PolarDBBackSync.jar和application.yaml。安全儲存憑證:為避免明文暴露密鑰,建議使用Flink的變數管理功能儲存敏感資訊。在變數管理頁面新增以下四個變數:
變數名稱
變數值
polardbusernamePolarDB的DynamoDB帳號。
polardbpasswordPolarDB的DynamoDB帳號的原始密碼,並非DynamoDB帳號密鑰。
dynamodbakAWS DynamoDB的AccessKey。
dynamodbskAWS DynamoDB的SecretKey。
configfilename(可選)附加依賴檔案名稱,預設為
application.yaml。部署並啟動作業:
進入作業營運頁面,選擇部署作業 > JAR作業。
填寫以下主要參數,其他參數可根據業務環境進行配置。然後單擊部署。
參數名稱
填寫參考
部署模式
固定為流模式。
部署名稱
填寫作業部署名稱,此處以PolarDBBackSync為例。
引擎版本
固定為vvr-11.3-jdk11-flink-1.20。
JAR URI
選擇已上傳的
PolarDBBackSync.jar。Entry Point Class
固定為
org.example.PolarDBCdcJob。Entry Point Main Arguments
固定為:
--polardbusername ${secret_values.polardbusername}--polardbpassword ${secret_values.polardbpassword}--dynamodbak ${secret_values.dynamodbak}--dynamodbsk ${secret_values.dynamodbsk}(可選)
--configfilename ${secret_values.configfilename}附加依賴檔案
選擇已上傳的
application.yaml。重要如果您的密碼或其他參數值中包含特殊字元,可能會導致 Flink 作業解析參數失敗。為防止此問題,請在作業建立完成後,在部署詳情 > 運行參數配置 中點擊編輯按鈕,在其他配置中,添加以下配置來防止此類問題:
env.java.opts: -Dconfig.disable-inline-comment=true。部署成功後,單擊啟動 > 無狀態。
驗證與清理
驗證:在作業啟動後,使用高許可權帳號串連至PolarDB叢集的
polardb_internal_dynamodb資料庫,並執行SELECT * FROM pg_replication_slots;。若複製槽flink_slot的active欄位變為t(true),則表示Flink作業已成功串連。此時即可在PolarDB叢集內開始匯入業務流量。清理:當不再需要進行反向同步時,您可執行以下步驟以釋放相關資源節省費用。
Realtime ComputeFlink版:
停止作業:前往Flink控制台,在目標工作空間中,進入作業營運頁面,找到目標作業並單擊停止
釋放執行個體:返回Flink控制台,找到目標工作空間,單擊釋放資源。
PolarDB叢集:使用高許可權帳號串連至
polardb_internal_dynamodb資料庫,執行以下命令刪除邏輯複製槽。SELECT pg_drop_replication_slot('flink_slot');
步驟四:執行業務割接
增量同步處理延遲較低且資料一致性校正無差異後,可計劃業務割接。當您準備好進行最終的業務切換時,請遵循以下嚴謹的步驟:
最終校正:在計劃的停機視窗前,反覆運行一致性校正工具,直至確認增量同步處理延遲極低,且資料差異數量降至0或可接受的範圍內。
停止源端寫入:在停機視窗開始時,暫停所有向源端AWS DynamoDB寫入資料的業務應用。
等待同步完成:觀察
nimo-shake的日誌,確認已無新的增量資料需要同步。最後一次校正:再次運行
nimo-full-check工具,確保源端和目標端的資料完全一致。停止同步工具:在確認資料完全同步後,停止
nimo-shake進程。切換應用串連:停止業務應用,將業務應用的資料庫連接配置,從AWS DynamoDB的地址切換為PolarDB的DynamoDB訪問地址。
(可選)開啟反向同步任務:在業務正式切換至PolarDB期間開啟反向同步任務,基於阿里雲Realtime ComputeFlink版實現資料的反向迴流,為業務復原提供保障。
啟動業務:重啟業務應用。至此,割接完成。
(可選)停止反向同步任務:在割接完成且業務穩定運行一段時間後,確認資料一致性滿足業務需求後,即可安全停止反向同步任務(停止Flink作業與釋放相關資源)。
附錄:為測試環境類比即時業務流量
如果您希望在測試環境中類比一個真實的、持續有資料寫入的遷移情境,可以使用以下Go語言範例程式碼。該代碼會向源端DynamoDB表周期性地寫入和更新資料。
此步驟僅用於測試和驗證遷移流程,在實際生產遷移中無需執行。
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// --- Configuration for your source AWS DynamoDB ---
var (
region = "cn-north-1" // AWS DynamoDB region
accessKey = "your-aws-access-key" // AWS DynamoDB access key
secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)
// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
ctx := context.Background()
sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
if err != nil {
log.Fatalf("Failed to load AWS config: %v", err)
}
client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
})
return client, ctx
}
// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
tableName := "src1" // Example table name
// Create table if not exists
_, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
TableName: &tableName,
AttributeDefinitions: [ ]types.AttributeDefinition{
{AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
},
KeySchema: [ ]types.KeySchemaElement{
{AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(100),
WriteCapacityUnits: aws.Int64(100),
},
})
if err != nil {
// Ignore if table already exists, fail on other errors
if _, ok := err.(*types.ResourceInUseException); !ok {
log.Fatalf("CreateTable failed for %s: %v", tableName, err)
}
}
fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
waiter := dynamodb.NewTableExistsWaiter(client)
err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
if err != nil {
log.Fatalf("Waiter failed for table %s: %v", tableName, err)
}
// Insert 100 sample items
for i := 0; i < 100; i++ {
pk := fmt.Sprintf("%s_user_%03d", tableName, i)
item := map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
"val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
}
client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item})
}
fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}
// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
tableName := "src1"
fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
i := 0
for {
pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
newValue := fmt.Sprintf("%d", rand.Intn(1000))
_, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: &tableName,
Key: map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
},
ExpressionAttributeValues: map[string]types.AttributeValue{
":newval": &types.AttributeValueMemberN{Value: newValue},
},
UpdateExpression: aws.String("SET val = :newval"),
})
if err != nil {
fmt.Printf("Update error: %v\n", err)
} else {
fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
}
i++
time.Sleep(1 * time.Second) // Update one record per second
}
}
func main() {
client, ctx := createClient()
initializeData(client, ctx)
simulateTraffic(client, ctx)
}