Migrate data from Amazon DynamoDB to PolarDB for PostgreSQL with minimal downtime. The migration combines full synchronization, incremental synchronization, and an optional reverse synchronization fallback — all managed by a dedicated toolset.
How it works
The migration uses three tools across five stages:
| Tool | Role |
|---|---|
| `nimo-shake` | Full and incremental synchronization from DynamoDB to PolarDB |
| `nimo-full-check` | Consistency check between source and target |
| `PolarDBBackSync` | Reverse synchronization from PolarDB back to DynamoDB (optional, for rollback) |
Stage 1 — Full synchronization (nimo-shake): The tool auto-creates table structures in the target PolarDB cluster that mirror the source, then reads all data concurrently via DynamoDB Scan operations and writes it in batches using BatchWriteItem.
Stage 2 — Incremental synchronization (nimo-shake): After full synchronization completes, the tool switches automatically to capture ongoing changes (inserts, deletes, and updates) via AWS DynamoDB Streams and applies them to PolarDB. This phase supports resumable transfers.
Stage 3 — Consistency check (nimo-full-check): Run at any point during or after synchronization. The tool reads both source and target concurrently, compares records by primary key, and produces a detailed diff report.
Stage 4 — Reverse synchronization (PolarDBBackSync, optional): Before cutover, set up a reverse link from PolarDB back to DynamoDB. If cutover causes issues, this link lets data flow back to the source as a rollback safeguard. PolarDBBackSync uses Flink to capture real-time changes from PolarDB and applies them to DynamoDB using PutItem or DeleteItem operations based on the change type. It is built on Realtime Compute for Apache Flink.
Stage 5 — Business cutover: When incremental synchronization lag is near-zero and the consistency check shows no differences, stop writes to the source, confirm the last sync, then switch your application connections to PolarDB.
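Stage 2 says that incremental synchronization supports resumable transfers. The internal checkpoint format of `nimo-shake` is not described here, so the following Go sketch only illustrates one common approach, under that assumption: keep the sequence number of the last applied DynamoDB Streams record for each shard, and after a restart skip every record at or below it. `StreamRecord`, `Checkpoint`, `newer`, `pending`, and `markApplied` are hypothetical names; persisting the checkpoint and following shard lineage (parent and child shards) are left out.

```go
package main

import "math/big"

// StreamRecord is a hypothetical, trimmed-down view of one DynamoDB Streams
// record: its shard, its sequence number, and the event name
// ("INSERT", "MODIFY", or "REMOVE").
type StreamRecord struct {
	ShardID        string
	SequenceNumber string
	EventName      string
}

// Checkpoint maps a shard ID to the sequence number of the last record that
// was applied to the target. Persisting it is what makes a restart resumable.
type Checkpoint map[string]string

// newer reports whether sequence number a is greater than b. Stream sequence
// numbers are decimal strings that can exceed the range of uint64, so they
// are compared as big integers. Unparseable input is treated as not newer.
func newer(a, b string) bool {
	x, okX := new(big.Int).SetString(a, 10)
	y, okY := new(big.Int).SetString(b, 10)
	if !okX || !okY {
		return false
	}
	return x.Cmp(y) > 0
}

// pending returns the records that are newer than the checkpoint for their
// shard, that is, the ones not yet applied before a restart.
func pending(cp Checkpoint, records []StreamRecord) []StreamRecord {
	var out []StreamRecord
	for _, r := range records {
		if last, seen := cp[r.ShardID]; seen && !newer(r.SequenceNumber, last) {
			continue // already applied before the restart
		}
		out = append(out, r)
	}
	return out
}

// markApplied advances the checkpoint past r. Call it only after the write
// to the target has succeeded.
func markApplied(cp Checkpoint, r StreamRecord) {
	cp[r.ShardID] = r.SequenceNumber
}
```

Advancing the checkpoint only after the target write succeeds means a crash between the two steps replays a change instead of losing it; replaying is safe here because applying the same item state twice leaves the target unchanged.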

Usage notes
Schedule during off-peak hours: Assess database capacity and run the migration when load is lowest to minimize performance impact.
Restrict write access before cutover: Grant write permissions to the target PolarDB cluster only for the account used by the synchronization tool. This prevents accidental data contamination before the official switch.
Prerequisites
Before you begin, ensure that you have:
Migration tools: Download NimoShake.tar.gz. This archive contains both `nimo-shake` and `nimo-full-check`.
(Optional) Reverse migration tool: Download PolarDBBackSync.jar if you plan to configure a rollback path.
A PolarDB for PostgreSQL cluster: Enable DynamoDB compatibility, obtain the DynamoDB endpoint, and create a dedicated DynamoDB account to get the AccessKey credentials for API access.
(Optional) `wal_level` set to `logical`: Required only if you plan to configure reverse synchronization. This change requires a cluster restart, so complete it before starting the migration.
AWS DynamoDB credentials: AccessKey ID and Secret Access Key for the source DynamoDB.
A runtime host: An ECS instance or any server that can reach both the PolarDB cluster and AWS DynamoDB.
Migrate data
Step 1: Start data synchronization
Unpack `NimoShake.tar.gz`, go to the `NimoShake/nimo-shake` directory, and edit `conf/nimo-shake.conf`. The following table describes the core parameters.

| Parameter | Description | Example |
|---|---|---|
| `sync_mode` | Synchronization scope. `all` runs full synchronization and then incremental synchronization; `full` runs full synchronization only. | `all` |
| `source.access_key_id` | AccessKey ID for the source AWS DynamoDB | `AKIAIOSFODNN7...` |
| `source.secret_access_key` | Secret Access Key for the source AWS DynamoDB | `wJalrXUtnFEMI...` |
| `source.region` | Region where the source AWS DynamoDB is located | `cn-north-1` |
| `source.endpoint_url` | Source endpoint. Leave blank for AWS DynamoDB. | - |
| `target.address` | DynamoDB endpoint of the target PolarDB cluster, including the `http://` prefix and the port | `http://pe-xxx.rwlb.rds...` |
| `target.access_key_id` | AccessKey ID for the PolarDB DynamoDB account | `your-polardb-access-key` |
| `target.secret_access_key` | Secret Access Key for the PolarDB DynamoDB account | `your-polardb-secret-key` |
| `filter.collection.white` | Whitelist of tables to synchronize, separated by `;`. Note: Cannot be used together with the blacklist. If both are set, all tables are synchronized. | `c1;c2` |
| `filter.collection.black` | Blacklist of tables to exclude, separated by `;`. Note: Cannot be used together with the whitelist. If both are set, all tables are synchronized. | `c1;c2` |

Start the synchronization task. Run the binary that matches your operating system architecture from the `NimoShake/nimo-shake` directory. Run the process with `nohup` so the synchronization task continues if your terminal session disconnects.

```shell
# Start on Linux x86-64
nohup ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf > /dev/null 2>&1 &
```

The tool completes full synchronization first, then transitions automatically to incremental synchronization and continues running.
Step 2: Check data consistency
Go to the `NimoShake/nimo-full-check` directory and edit `conf/nimo-full-check.conf`. The configuration parameters follow the same structure as `nimo-shake.conf`: provide the connection information for both the source and the target.
Start the consistency check.

```shell
# Start on Linux x86-64
nohup ./bin/nimo-full-check.linux.amd64 -conf=./conf/nimo-full-check.conf > /dev/null 2>&1 &
```

The tool prints progress in real time. Because the command above redirects output to `/dev/null`, run the binary in the foreground (without `nohup` and the redirect) if you want to watch it. Detailed logs are written to `logs/`, and diff reports are written to `nimo-full-check-diff/`.
(Optional) Fix inconsistent data. If any tables have inconsistencies, the diff directory lists them by table name, for example:

```text
nimo-full-check-diff/
├── testtable-0
└── testtable-1
```

The default diff directory is `nimo-full-check-diff`. To change it, set the `diff_output_file` parameter in `conf/nimo-full-check.conf`.
Run the repair tool to fix the differences.

```shell
nohup ./bin/nimo-repair.linux.amd64 -conf=./conf/nimo-repair.conf > /dev/null 2>&1 &
```
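Stage 3 describes the check as a comparison of records by primary key. As a rough illustration of that idea (not the `nimo-full-check` implementation, which reads both sides concurrently and writes per-table diff files), the Go sketch below compares two in-memory snapshots keyed by primary key and sorts the differences into three buckets. `DiffReport` and `compareByKey` are hypothetical, and each value is assumed to be a canonical serialization of the item so that equal items compare equal.

```go
package main

import "sort"

// DiffReport lists the primary keys that differ between source and target.
type DiffReport struct {
	MissingInTarget []string // present in the source only
	ExtraInTarget   []string // present in the target only
	ValueMismatch   []string // present in both, with different content
}

// compareByKey compares two snapshots keyed by primary key. Values are
// assumed to be canonical serializations of each item.
func compareByKey(source, target map[string]string) DiffReport {
	var r DiffReport
	for pk, sv := range source {
		tv, ok := target[pk]
		switch {
		case !ok:
			r.MissingInTarget = append(r.MissingInTarget, pk)
		case tv != sv:
			r.ValueMismatch = append(r.ValueMismatch, pk)
		}
	}
	for pk := range target {
		if _, ok := source[pk]; !ok {
			r.ExtraInTarget = append(r.ExtraInTarget, pk)
		}
	}
	// Map iteration order is random, so sort for a stable report.
	sort.Strings(r.MissingInTarget)
	sort.Strings(r.ExtraInTarget)
	sort.Strings(r.ValueMismatch)
	return r
}

// Empty reports whether the two snapshots matched completely.
func (r DiffReport) Empty() bool {
	return len(r.MissingInTarget) == 0 && len(r.ExtraInTarget) == 0 && len(r.ValueMismatch) == 0
}
```

Keying both sides by primary key makes the comparison independent of scan order, which matters because concurrent scans of the source and the target return items in different orders.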
Step 3: (Optional) Configure reverse synchronization
Set up a reverse synchronization link from PolarDB for PostgreSQL back to the source DynamoDB before you perform the cutover. When you start the cutover, activate this link so data can flow back to the source if you need to roll back.
Prepare the environment
Confirm that the PolarDB cluster's `wal_level` parameter is set to `logical`.
(Optional) If you have not yet created a privileged account, go to the PolarDB console and create one under Settings and Management > Accounts.
Connect to the `polardb_internal_dynamodb` database using the privileged account and run the following SQL to create a logical replication slot, grant replication permissions to the DynamoDB account, and create a publication covering all tables.

```sql
-- Create a logical replication slot. The name 'flink_slot' must match the Flink configuration.
SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput');

-- Grant the REPLICATION permission to the dedicated DynamoDB account.
-- Replace <your_dynamodb_user> with the actual account name.
ALTER ROLE <your_dynamodb_user> REPLICATION;

-- Verify the slot was created. The 'active' field for flink_slot should be 'f' (false) at this point.
SELECT * FROM pg_replication_slots;
```

Set up Flink:
Enable Realtime Compute for Apache Flink and create a Flink workspace.
> Important: The Flink workspace must be in the same VPC as the PolarDB cluster.
Configure public network access for the Flink workspace so it can reach AWS DynamoDB.
Add the Flink workspace CIDR block to the PolarDB cluster's IP address whitelist:
In the Flink console, click Details for the workspace and copy the CIDR block from the Workspace Details page.
In the PolarDB console, go to Settings and Management > Cluster Whitelists, click Add Whitelist, and add the CIDR block.
Verify connectivity between the Flink workspace and the PolarDB cluster:
In the Flink console, open the workspace and click the Network Probe icon in the upper-right corner.
Enter the private address and port of the PolarDB primary node, then click Detect.
Confirm that the message "Network probe connected successfully" appears.
Deploy the Flink job
Create a configuration file named `application.yaml` with the following content. Replace the placeholder values with your actual cluster details.

```yaml
snapshot:
  mode: never
source:
  # Private endpoint of the PolarDB primary node
  hostname: pc-xxx.pg.polardb.rds.aliyuncs.com
  # Private port of the PolarDB primary node
  port: 5432
  # Name of the logical replication slot created earlier
  slotName: flink_slot
target:
  # Region of the target AWS DynamoDB
  region: cn-north-1
# (Optional) Table filtering. Declare only one of whiteTableSet or blackTableSet.
filter:
  # whiteTableSet: Tables to include in reverse synchronization.
  whiteTableSet:
  # blackTableSet: Tables to exclude from reverse synchronization.
  blackTableSet:
# Checkpoint interval for the Flink job, in milliseconds
checkpoint:
  interval: 3000
```

In the Flink console, go to the target workspace. On the File Management page, upload both `PolarDBBackSync.jar` and `application.yaml`.
Store credentials securely using Flink's Variable Management feature to avoid exposing keys in plaintext. On the Variable Management page, add the following variables.
| Variable name | Variable value |
|---|---|
| `polardbusername` | DynamoDB account name for PolarDB |
| `polardbpassword` | Password for the PolarDB DynamoDB account. This is the original account password, not the DynamoDB account secret key. |
| `dynamodbak` | AccessKey ID for AWS DynamoDB |
| `dynamodbsk` | Secret Access Key for AWS DynamoDB |
| `configfilename` | (Optional) Name of the configuration file. Defaults to `application.yaml`. |

Deploy and start the job:
Go to Job O&M > Deploy Job > JAR Job.
Set the following parameters, configure other settings as needed, and click Deploy.
| Parameter | Value |
|---|---|
| Deployment mode | Fixed: Stream mode |
| Deployment name | Enter a descriptive name, for example, `PolarDBBackSync`. |
| Engine version | Fixed: `vvr-11.3-jdk11-flink-1.20` |
| JAR URI | Select the uploaded `PolarDBBackSync.jar` file. |
| Entry point class | `org.example.PolarDBCdcJob` |
| Entry point main arguments | `--polardbusername ${secret_values.polardbusername} --polardbpassword ${secret_values.polardbpassword} --dynamodbak ${secret_values.dynamodbak} --dynamodbsk ${secret_values.dynamodbsk}`, optionally followed by `--configfilename ${secret_values.configfilename}` |
| Additional dependency files | Select the uploaded `application.yaml` file. |

After deployment succeeds, click Start > Stateless Start.
Verify reverse synchronization
Connect to the polardb_internal_dynamodb database using the privileged account and run:
```sql
SELECT * FROM pg_replication_slots;
```

If the `active` field for `flink_slot` has changed to `t` (true), the Flink job has connected successfully. You can then direct business traffic to the PolarDB cluster.
Clean up reverse synchronization resources
When reverse synchronization is no longer needed, release the associated resources to reduce costs.
Stop the Flink job: In the Flink console, go to Job O&M, find the job, and click Stop.
Release the Flink workspace: In the Flink console, find the workspace and click Release Resources.
Delete the replication slot: Connect to the `polardb_internal_dynamodb` database with the privileged account and run:

```sql
SELECT pg_drop_replication_slot('flink_slot');
```
Step 4: Perform the business cutover
When incremental synchronization lag is low and the consistency check shows no discrepancies, follow these steps to cut over.
Final validation: Run the consistency check tool repeatedly until incremental synchronization lag is minimal and data discrepancies are zero (or within an acceptable range).
Stop writes to the source: Pause all business application writes to the source AWS DynamoDB.
Wait for the last sync to complete: Monitor the `nimo-shake` logs until no new incremental data remains to be synchronized.
Final verification: Run `nimo-full-check` once more to confirm that source and target data are identical.
Stop the synchronization tool: Stop the `nimo-shake` process.
Switch application connections: Change the database connection configuration in your business applications from the AWS DynamoDB endpoint to the DynamoDB-compatible endpoint of the PolarDB cluster.
(Optional) Start reverse synchronization: Start the reverse synchronization task to replicate data back to the source DynamoDB, providing a rollback safeguard during the cutover period.
Restart business applications: The cutover is complete.
(Optional) Stop reverse synchronization: After services have run stably and data consistency meets your requirements, stop the Flink job and release the associated resources.
Appendix: Simulate real-time traffic for testing
To validate the migration process in a test environment with continuous data writes, use the following Go sample code. It creates a DynamoDB table, inserts 100 initial items, then updates one record per second to simulate business traffic.
This code is for testing only. Do not run it during a production migration.
```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// --- Configuration for your source AWS DynamoDB ---
var (
	region    = "cn-north-1"          // AWS DynamoDB region
	accessKey = "your-aws-access-key" // AWS DynamoDB access key
	secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)

// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
	ctx := context.Background()
	sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
	if err != nil {
		log.Fatalf("Failed to load AWS config: %v", err)
	}
	// Use the static credentials above instead of the default credential chain.
	client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
		o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
	})
	return client, ctx
}

// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
	tableName := "src1" // Example table name

	// Create the table if it does not exist yet.
	_, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: &tableName,
		AttributeDefinitions: []types.AttributeDefinition{
			{AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
		},
		KeySchema: []types.KeySchemaElement{
			{AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
		},
		ProvisionedThroughput: &types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(100),
			WriteCapacityUnits: aws.Int64(100),
		},
		// Enable DynamoDB Streams so that the simulated updates can be
		// captured by incremental synchronization.
		StreamSpecification: &types.StreamSpecification{
			StreamEnabled:  aws.Bool(true),
			StreamViewType: types.StreamViewTypeNewAndOldImages,
		},
	})
	if err != nil {
		// Ignore "table already exists"; fail on any other error. SDK v2 wraps
		// service errors, so use errors.As instead of a direct type assertion.
		var inUse *types.ResourceInUseException
		if !errors.As(err, &inUse) {
			log.Fatalf("CreateTable failed for %s: %v", tableName, err)
		}
	}

	fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
	waiter := dynamodb.NewTableExistsWaiter(client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
	if err != nil {
		log.Fatalf("Waiter failed for table %s: %v", tableName, err)
	}

	// Insert 100 sample items.
	for i := 0; i < 100; i++ {
		pk := fmt.Sprintf("%s_user_%03d", tableName, i)
		item := map[string]types.AttributeValue{
			"pk":  &types.AttributeValueMemberS{Value: pk},
			"val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
		}
		if _, err := client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item}); err != nil {
			log.Fatalf("PutItem failed for pk=%s: %v", pk, err)
		}
	}
	fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}

// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
	tableName := "src1"
	fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
	i := 0
	for {
		// Cycle through the 100 initial items, setting a random value each time.
		pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
		newValue := fmt.Sprintf("%d", rand.Intn(1000))
		_, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
			TableName: &tableName,
			Key: map[string]types.AttributeValue{
				"pk": &types.AttributeValueMemberS{Value: pk},
			},
			ExpressionAttributeValues: map[string]types.AttributeValue{
				":newval": &types.AttributeValueMemberN{Value: newValue},
			},
			UpdateExpression: aws.String("SET val = :newval"),
		})
		if err != nil {
			fmt.Printf("Update error: %v\n", err)
		} else {
			fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
		}
		i++
		time.Sleep(1 * time.Second) // Update one record per second
	}
}

func main() {
	client, ctx := createClient()
	initializeData(client, ctx)
	simulateTraffic(client, ctx)
}
```