
PolarDB: Data migration

Last Updated: Mar 28, 2026

Migrate data from Amazon DynamoDB to PolarDB for PostgreSQL with minimal downtime. The migration combines full synchronization, incremental synchronization, and an optional reverse synchronization fallback — all managed by a dedicated toolset.

How it works

The migration uses three tools across five stages:

Tool | Role
--- | ---
nimo-shake | Full and incremental synchronization from DynamoDB to PolarDB
nimo-full-check | Consistency check between source and target
PolarDBBackSync | Reverse synchronization from PolarDB back to DynamoDB (optional, for rollback)

Stage 1 — Full synchronization (nimo-shake): The tool auto-creates table structures in the target PolarDB cluster that mirror the source, then reads all data concurrently via DynamoDB Scan operations and writes it in batches using BatchWriteItem.
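The full-synchronization pattern (parallel scan, then batched writes) can be sketched with the standard library alone. This is an illustration under stated assumptions, not nimo-shake's implementation. The Item type, the in-memory table, and the scan and write callbacks are hypothetical stand-ins for a DynamoDB parallel Scan (the Segment and TotalSegments parameters) and for BatchWriteItem requests. The one real constraint it encodes is that a single BatchWriteItem call accepts at most 25 put or delete requests.

```go
package main

import (
	"fmt"
	"sync"
)

// maxBatchSize is the per-request limit of BatchWriteItem (25 put or delete requests).
const maxBatchSize = 25

// Item is a hypothetical, simplified stand-in for a DynamoDB item.
type Item map[string]string

// chunk splits items into consecutive batches of at most size items.
func chunk(items []Item, size int) [][]Item {
	var batches [][]Item
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

// fullSync scans totalSegments segments concurrently and writes each segment's
// items in batches. scanSegment and writeBatch are hypothetical callbacks that
// stand in for Scan (with Segment/TotalSegments) and BatchWriteItem.
// writeBatch must be safe for concurrent use.
func fullSync(totalSegments int, scanSegment func(segment, total int) []Item, writeBatch func([]Item)) {
	var wg sync.WaitGroup
	for seg := 0; seg < totalSegments; seg++ {
		wg.Add(1)
		go func(seg int) {
			defer wg.Done()
			for _, batch := range chunk(scanSegment(seg, totalSegments), maxBatchSize) {
				writeBatch(batch)
			}
		}(seg)
	}
	wg.Wait()
}

// simulateFullSync runs fullSync against an in-memory table of n items and
// returns how many items and batches reached the fake target.
func simulateFullSync(n, totalSegments int) (written, batches int) {
	table := make([]Item, n)
	for i := range table {
		table[i] = Item{"pk": fmt.Sprintf("user_%03d", i)}
	}
	// Fake Scan: segment s returns the items at indexes s, s+total, s+2*total, ...
	scan := func(segment, total int) []Item {
		var out []Item
		for i := segment; i < len(table); i += total {
			out = append(out, table[i])
		}
		return out
	}
	var mu sync.Mutex
	write := func(batch []Item) {
		mu.Lock()
		defer mu.Unlock()
		written += len(batch)
		batches++
	}
	fullSync(totalSegments, scan, write)
	return written, batches
}

func main() {
	written, batches := simulateFullSync(100, 4)
	fmt.Printf("wrote %d items in %d batches\n", written, batches)
}
```

A production writer would also retry any UnprocessedItems that BatchWriteItem returns. The sketch leaves that out to keep the batching logic visible.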

Stage 2 — Incremental synchronization (nimo-shake): After full synchronization completes, the tool switches automatically to capture ongoing changes (inserts, deletes, and updates) via AWS DynamoDB Streams and applies them to PolarDB. This phase supports resumable transfers.
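The following minimal sketch shows one way to apply incremental changes so that a restart can resume where it stopped. INSERT, MODIFY, and REMOVE are the event names that DynamoDB Streams uses. Everything else is an illustrative assumption: the StreamRecord and Applier types, the integer sequence numbers (real ones are numeric strings), and the in-memory checkpoint (a resumable tool would persist it). nimo-shake may track its progress differently.

```go
package main

import "fmt"

// StreamRecord is a hypothetical, simplified DynamoDB Streams record.
// EventName uses the real DynamoDB Streams values INSERT, MODIFY, and REMOVE.
type StreamRecord struct {
	Seq       uint64 // simplification: real sequence numbers are numeric strings
	EventName string
	Key       string            // primary key of the changed item
	NewImage  map[string]string // new item image; unused for REMOVE
}

// Applier applies records to an in-memory target and remembers the sequence
// number of the last applied record, so a restarted run skips work already done.
type Applier struct {
	Target     map[string]map[string]string
	Checkpoint uint64 // a real tool would persist this durably
}

// Apply applies records in order and skips any at or below the checkpoint.
func (a *Applier) Apply(records []StreamRecord) error {
	for _, r := range records {
		if r.Seq <= a.Checkpoint {
			continue // already applied before a restart
		}
		switch r.EventName {
		case "INSERT", "MODIFY":
			a.Target[r.Key] = r.NewImage
		case "REMOVE":
			delete(a.Target, r.Key)
		default:
			return fmt.Errorf("unknown event %q at sequence %d", r.EventName, r.Seq)
		}
		a.Checkpoint = r.Seq
	}
	return nil
}

// demoBatch returns a small sample of changes to two items.
func demoBatch() []StreamRecord {
	return []StreamRecord{
		{Seq: 1, EventName: "INSERT", Key: "u1", NewImage: map[string]string{"val": "1"}},
		{Seq: 2, EventName: "MODIFY", Key: "u1", NewImage: map[string]string{"val": "2"}},
		{Seq: 3, EventName: "INSERT", Key: "u2", NewImage: map[string]string{"val": "9"}},
		{Seq: 4, EventName: "REMOVE", Key: "u2"},
	}
}

func main() {
	a := &Applier{Target: map[string]map[string]string{}}
	for run := 1; run <= 2; run++ { // the second run simulates a replay after a restart
		if err := a.Apply(demoBatch()); err != nil {
			fmt.Println("apply failed:", err)
			return
		}
		fmt.Printf("run %d: target=%v checkpoint=%d\n", run, a.Target, a.Checkpoint)
	}
}
```

Because replayed records at or below the checkpoint are skipped, reading the same stream range twice leaves the target unchanged.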

Stage 3 — Consistency check (nimo-full-check): Run at any point during or after synchronization. The tool reads both source and target concurrently, compares records by primary key, and produces a detailed diff report.
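The key-by-key comparison can be sketched as follows. This is an illustration only. Items are modeled as canonical strings keyed by primary key, the Diff type is hypothetical, and the concurrent reads from source and target are omitted. The sketch does not reproduce nimo-full-check's report format.

```go
package main

import (
	"fmt"
	"sort"
)

// Diff summarizes differences between source and target, keyed by primary key.
type Diff struct {
	MissingInTarget []string // key exists only in the source
	ExtraInTarget   []string // key exists only in the target
	Mismatched      []string // key exists on both sides, but the items differ
}

// compareByKey compares two snapshots keyed by primary key. Items are
// represented as canonical strings, a simplification of real attribute values.
func compareByKey(source, target map[string]string) Diff {
	var d Diff
	for k, sv := range source {
		tv, ok := target[k]
		switch {
		case !ok:
			d.MissingInTarget = append(d.MissingInTarget, k)
		case tv != sv:
			d.Mismatched = append(d.Mismatched, k)
		}
	}
	for k := range target {
		if _, ok := source[k]; !ok {
			d.ExtraInTarget = append(d.ExtraInTarget, k)
		}
	}
	// Sort so that reports are stable across runs despite map iteration order.
	sort.Strings(d.MissingInTarget)
	sort.Strings(d.ExtraInTarget)
	sort.Strings(d.Mismatched)
	return d
}

// Consistent reports whether source and target matched exactly.
func (d Diff) Consistent() bool {
	return len(d.MissingInTarget)+len(d.ExtraInTarget)+len(d.Mismatched) == 0
}

func main() {
	src := map[string]string{"u1": `{"val":1}`, "u2": `{"val":2}`, "u3": `{"val":3}`}
	dst := map[string]string{"u1": `{"val":1}`, "u2": `{"val":5}`, "u4": `{"val":4}`}
	d := compareByKey(src, dst)
	fmt.Printf("missing=%v extra=%v mismatched=%v consistent=%v\n",
		d.MissingInTarget, d.ExtraInTarget, d.Mismatched, d.Consistent())
}
```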

Stage 4 — Reverse synchronization (PolarDBBackSync, optional): Before cutover, set up a reverse link from PolarDB back to DynamoDB. If cutover causes issues, this link lets data flow back to the source as a rollback safeguard. PolarDBBackSync uses Flink to capture real-time changes from PolarDB and applies them to DynamoDB using PutItem or DeleteItem operations based on the change type. It is built on Realtime Compute for Apache Flink.
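The mapping from change type to DynamoDB request can be sketched as follows. The Change and DynamoWrite types are hypothetical and are not PolarDBBackSync's API. The sketch also assumes that updates, like inserts, are written as a full-item PutItem, because PutItem and DeleteItem are the only two operations named above.

```go
package main

import "fmt"

// ChangeKind is the type of row change captured from PolarDB.
type ChangeKind int

const (
	Insert ChangeKind = iota
	Update
	Delete
)

// Change is a hypothetical, simplified row change captured from PolarDB.
type Change struct {
	Kind  ChangeKind
	Table string
	Key   string
	Row   map[string]string // full new row image; nil for deletes
}

// DynamoWrite describes the DynamoDB request that a change maps to.
type DynamoWrite struct {
	Operation string // "PutItem" or "DeleteItem"
	Table     string
	Key       string
	Item      map[string]string // set only for PutItem
}

// toDynamoWrite maps inserts and updates to PutItem (which replaces the whole
// item) and deletes to DeleteItem.
func toDynamoWrite(c Change) (DynamoWrite, error) {
	switch c.Kind {
	case Insert, Update:
		return DynamoWrite{Operation: "PutItem", Table: c.Table, Key: c.Key, Item: c.Row}, nil
	case Delete:
		return DynamoWrite{Operation: "DeleteItem", Table: c.Table, Key: c.Key}, nil
	default:
		return DynamoWrite{}, fmt.Errorf("unsupported change kind %d", c.Kind)
	}
}

func main() {
	changes := []Change{
		{Kind: Insert, Table: "src1", Key: "src1_user_001", Row: map[string]string{"val": "1"}},
		{Kind: Update, Table: "src1", Key: "src1_user_001", Row: map[string]string{"val": "7"}},
		{Kind: Delete, Table: "src1", Key: "src1_user_001"},
	}
	for _, c := range changes {
		w, err := toDynamoWrite(c)
		if err != nil {
			fmt.Println("skip:", err)
			continue
		}
		fmt.Printf("%s %s key=%s item=%v\n", w.Operation, w.Table, w.Key, w.Item)
	}
}
```

PutItem replaces the entire item. If each update carries the full new row image, re-applying the same change after a retry leaves DynamoDB in the same state.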

Stage 5 — Business cutover: When incremental synchronization lag is near-zero and the consistency check shows no differences, stop writes to the source, confirm the last sync, then switch your application connections to PolarDB.

Figure: Data migration flowchart

Usage notes

  • Schedule during off-peak hours: Assess database capacity and run the migration when load is lowest to minimize performance impact.

  • Restrict write access before cutover: Grant write permissions to the target PolarDB cluster only for the account used by the synchronization tool. This prevents accidental data contamination before the official switch.

Prerequisites

Before you begin, ensure that you have:

  • Migration tools: Download NimoShake.tar.gz. This archive contains both nimo-shake and nimo-full-check.

  • (Optional) Reverse migration tool: Download PolarDBBackSync.jar if you plan to configure a rollback path.

  • A PolarDB for PostgreSQL cluster: Enable DynamoDB compatibility, obtain the DynamoDB endpoint, and create a dedicated DynamoDB account to get the AccessKey credentials for API access.

  • (Optional) `wal_level` set to `logical`: Required only if you plan to configure reverse synchronization. Changing this parameter requires a cluster restart, so complete it before you start the migration.

  • AWS DynamoDB credentials: The AccessKey ID and Secret Access Key for the source DynamoDB.

  • A runtime host: An ECS instance or any other server that can reach both the PolarDB cluster and AWS DynamoDB.

Migrate data

Step 1: Start data synchronization

  1. Unpack NimoShake.tar.gz, go to the NimoShake/nimo-shake directory, and edit conf/nimo-shake.conf. The following table describes the core parameters.

    Parameter | Description | Example
    --- | --- | ---
    sync_mode | Synchronization scope. all runs full synchronization followed by incremental synchronization; full runs full synchronization only. | all
    source.access_key_id | AccessKey ID for the source AWS DynamoDB | AKIAIOSFODNN7...
    source.secret_access_key | Secret Access Key for the source AWS DynamoDB | wJalrXUtnFEMI...
    source.region | Region where the source AWS DynamoDB is located | cn-north-1
    source.endpoint_url | Source endpoint. Leave blank for AWS DynamoDB. | -
    target.address | DynamoDB endpoint of the target PolarDB cluster, including the http:// prefix and the port | http://pe-xxx.rwlb.rds...
    target.access_key_id | AccessKey ID for the PolarDB DynamoDB account | your-polardb-access-key
    target.secret_access_key | Secret Access Key for the PolarDB DynamoDB account | your-polardb-secret-key
    filter.collection.white | Whitelist of tables to synchronize, separated by semicolons (;). Note: Do not use together with filter.collection.black. If both are set, all tables are synchronized. | c1;c2
    filter.collection.black | Blacklist of tables to exclude, separated by semicolons (;). Note: Do not use together with filter.collection.white. If both are set, all tables are synchronized. | c1;c2

  2. Start the synchronization task. Run the binary that matches your operating system architecture from the NimoShake/nimo-shake directory.

    Run the process with nohup so the synchronization task continues if your terminal session disconnects.
    # Start on Linux x86-64
    nohup ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf > /dev/null 2>&1 &

    The tool completes full synchronization first, then transitions automatically to incremental synchronization and continues running.
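The whitelist and blacklist rules from the parameter table can be expressed as a small function. This is an illustrative sketch, not nimo-shake's code. One behavior is an assumption not stated in the table: when neither list is set, every table is synchronized.

```go
package main

import (
	"fmt"
	"strings"
)

// parseList turns a semicolon-separated list such as "c1;c2" into a set,
// ignoring empty entries and surrounding spaces.
func parseList(s string) map[string]bool {
	set := map[string]bool{}
	for _, name := range strings.Split(s, ";") {
		if name = strings.TrimSpace(name); name != "" {
			set[name] = true
		}
	}
	return set
}

// shouldSync applies the documented filter rules: a whitelist keeps only the
// listed tables, a blacklist drops the listed tables, and setting both means
// every table is synchronized. Syncing everything when neither is set is an
// assumption of this sketch.
func shouldSync(table, white, black string) bool {
	w, b := parseList(white), parseList(black)
	switch {
	case len(w) > 0 && len(b) > 0:
		return true // both set: the filters are ignored
	case len(w) > 0:
		return w[table]
	case len(b) > 0:
		return !b[table]
	default:
		return true
	}
}

func main() {
	for _, t := range []string{"c1", "c2", "c3"} {
		fmt.Printf("%s: white=c1;c2 -> %v, black=c1;c2 -> %v\n",
			t, shouldSync(t, "c1;c2", ""), shouldSync(t, "", "c1;c2"))
	}
}
```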

Step 2: Check data consistency

  1. Go to the NimoShake/nimo-full-check directory and edit conf/nimo-full-check.conf. The configuration parameters follow the same structure as nimo-shake.conf — provide the connection information for both the source and the target.

  2. Start the consistency check.

    # Start on Linux x86-64
    nohup ./bin/nimo-full-check.linux.amd64 -conf=./conf/nimo-full-check.conf > /dev/null 2>&1 &

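    The tool reports progress in real time. Detailed logs are written to logs/, and diff reports are written to nimo-full-check-diff/. Because the command above redirects standard output to /dev/null, follow progress in logs/. To watch progress in the terminal instead, run the binary in the foreground without nohup and the redirect.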


  3. (Optional) Fix inconsistent data. If any tables have inconsistencies, the diff directory lists them by table name. For example:

    nimo-full-check-diff/
      ├── testtable-0
      └── testtable-1

    The default diff directory is nimo-full-check-diff. To change it, set the diff_output_file parameter in conf/nimo-full-check.conf.
    Run the repair tool to fix the differences.

    nohup ./bin/nimo-repair.linux.amd64 -conf=./conf/nimo-repair.conf > /dev/null 2>&1 &
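If the diff directory holds many files, a small helper can group them by table before you run the repair tool. This sketch assumes that each diff file is named <table>-<number>. That assumption comes only from the example listing, and nimo-full-check may name files differently in practice. The helper names are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"sort"
	"strconv"
	"strings"
)

// tableFromDiffFile strips a trailing "-<number>" suffix, assuming diff files
// are named like "testtable-0". Names without such a suffix are returned as-is.
func tableFromDiffFile(name string) string {
	i := strings.LastIndex(name, "-")
	if i <= 0 {
		return name
	}
	if _, err := strconv.Atoi(name[i+1:]); err != nil {
		return name
	}
	return name[:i]
}

// groupDiffFiles groups diff file names by the table they belong to.
func groupDiffFiles(names []string) map[string][]string {
	groups := map[string][]string{}
	for _, n := range names {
		t := tableFromDiffFile(n)
		groups[t] = append(groups[t], n)
	}
	for _, files := range groups {
		sort.Strings(files) // sorts the slice stored in the map in place
	}
	return groups
}

func main() {
	entries, err := os.ReadDir("nimo-full-check-diff")
	if err != nil {
		fmt.Println("cannot read diff directory:", err)
		return
	}
	var names []string
	for _, e := range entries {
		names = append(names, e.Name())
	}
	for table, files := range groupDiffFiles(names) {
		fmt.Printf("%s: %d diff file(s) %v\n", table, len(files), files)
	}
}
```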

Step 3: (Optional) Configure reverse synchronization

Set up a reverse synchronization link from PolarDB for PostgreSQL back to the source DynamoDB before you perform the cutover. When you start the cutover, activate this link so data can flow back to the source if you need to roll back.

Prepare the environment

  1. Confirm that the PolarDB cluster's wal_level parameter is set to logical.

  2. (Optional) If you have not yet created a privileged account, go to the PolarDB console and create one under Settings and Management > Accounts.

  3. Connect to the polardb_internal_dynamodb database using the privileged account and run the following SQL to create a logical replication slot, grant replication permissions to the DynamoDB account, and create a publication covering all tables.

    -- Create a logical replication slot. The name 'flink_slot' must match the Flink configuration.
    SELECT * FROM pg_create_logical_replication_slot('flink_slot', 'pgoutput');
    
    -- Grant the REPLICATION permission to the dedicated DynamoDB account.
    -- Replace <your_dynamodb_user> with the actual account name.
    ALTER ROLE <your_dynamodb_user> REPLICATION;
    
    -- Verify the slot was created. The 'active' field for flink_slot should be 'f' (false) at this point.
    SELECT * FROM pg_replication_slots;
  4. Set up Flink:

    1. Enable Realtime Compute for Apache Flink and create a Flink workspace.

       Important: The Flink workspace must be in the same VPC as the PolarDB cluster.

    2. Configure public network access for the Flink workspace so it can reach AWS DynamoDB.

  5. Add the Flink workspace CIDR block to the PolarDB cluster's IP address whitelist:

    1. In the Flink console, click Details for the workspace and copy the CIDR block from the Workspace Details page.

    2. In the PolarDB console, go to Settings and Management > Cluster Whitelists, click Add Whitelist, and add the CIDR block.

  6. Verify connectivity between the Flink workspace and the PolarDB cluster:

    1. In the Flink console, open the workspace and click the Network Probe icon in the upper-right corner.

    2. Enter the private address and port of the PolarDB primary node, then click Detect.

    3. Confirm the message Network probe connected successfully appears.
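A plain TCP dial gives a comparable reachability check from another host in the VPC, such as the runtime host. It confirms that the PolarDB primary node's port accepts connections from that host. The probeTCP helper below is hypothetical and is not part of the Flink console's Network Probe, and the endpoint in main is a placeholder. It checks only that a TCP connection can be established and does not log in to the database.

```go
package main

import (
	"fmt"
	"net"
	"os"
	"time"
)

// defaultProbeTimeout bounds how long a single probe waits for the TCP handshake.
const defaultProbeTimeout = 3 * time.Second

// probeTCP opens and immediately closes a TCP connection to addr ("host:port").
// A nil error means the port accepted a connection from this machine.
func probeTCP(addr string, timeout time.Duration) error {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return err
	}
	return conn.Close()
}

func main() {
	// Placeholder endpoint: pass the private address and port of the PolarDB
	// primary node as the first argument instead.
	addr := "pc-xxx.pg.polardb.rds.aliyuncs.com:5432"
	if len(os.Args) > 1 {
		addr = os.Args[1]
	}
	if err := probeTCP(addr, defaultProbeTimeout); err != nil {
		fmt.Println("probe failed:", err)
		os.Exit(1)
	}
	fmt.Println("probe succeeded:", addr)
}
```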

Deploy the Flink job

  1. Create a configuration file named application.yaml with the following content. Replace the placeholder values with your actual cluster details.

    snapshot:
      mode: never
    
    source:
      # Private endpoint of the PolarDB primary node
      hostname: pc-xxx.pg.polardb.rds.aliyuncs.com
      # Private port of the PolarDB primary node
      port: 5432
      # Name of the logical replication slot created earlier
      slotName: flink_slot
    
    target:
      # Region of the target AWS DynamoDB
      region: cn-north-1
    
    # (Optional) Table filtering. Declare only one of whiteTableSet or blackTableSet.
    filter:
      # whiteTableSet: Tables to include in reverse synchronization.
      whiteTableSet:
      # blackTableSet: Tables to exclude from reverse synchronization.
      blackTableSet:
    
    # Checkpoint interval for the Flink job, in milliseconds
    checkpoint:
      interval: 3000
  2. In the Flink console, go to the target workspace. On the File Management page, upload both PolarDBBackSync.jar and application.yaml.

  3. Store credentials securely using Flink's Variable Management feature to avoid exposing keys in plaintext. On the Variable Management page, add the following variables.

    Variable name | Variable value
    --- | ---
    polardbusername | DynamoDB account name for PolarDB
    polardbpassword | Password for the PolarDB DynamoDB account. This is the original account password, not the DynamoDB account secret key.
    dynamodbak | AccessKey ID for AWS DynamoDB
    dynamodbsk | Secret Access Key for AWS DynamoDB
    configfilename | (Optional) Name of the configuration file. Defaults to application.yaml.
  4. Deploy and start the job:

    1. Go to Job O&M > Deploy Job > JAR Job.

    2. Set the following parameters, configure other settings as needed, and click Deploy.

      Parameter | Value
      --- | ---
      Deployment mode | Fixed: Stream mode
      Deployment name | A descriptive name, for example, PolarDBBackSync
      Engine version | Fixed: vvr-11.3-jdk11-flink-1.20
      JAR URI | The uploaded PolarDBBackSync.jar file
      Entry point class | org.example.PolarDBCdcJob
      Entry point main arguments | --polardbusername ${secret_values.polardbusername} --polardbpassword ${secret_values.polardbpassword} --dynamodbak ${secret_values.dynamodbak} --dynamodbsk ${secret_values.dynamodbsk} (if you set the configfilename variable, also append --configfilename ${secret_values.configfilename})
      Additional dependency files | The uploaded application.yaml file
    3. After deployment succeeds, click Start > Stateless Start.
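Because --configfilename is optional, the entry point arguments take one of two forms. The hypothetical helper below builds both from the variable names in the Variable Management table, using the ${secret_values.<name>} reference syntax from the deployment parameters. It only produces the string that you paste into the console.

```go
package main

import (
	"fmt"
	"strings"
)

// secretRef returns a Flink variable reference such as ${secret_values.dynamodbak}.
func secretRef(name string) string {
	return "${secret_values." + name + "}"
}

// mainArgs builds the "Entry point main arguments" string. Each flag name
// matches its variable name; --configfilename is appended only when requested.
func mainArgs(withConfigFileName bool) string {
	names := []string{"polardbusername", "polardbpassword", "dynamodbak", "dynamodbsk"}
	if withConfigFileName {
		names = append(names, "configfilename")
	}
	parts := make([]string, 0, len(names))
	for _, n := range names {
		parts = append(parts, "--"+n+" "+secretRef(n))
	}
	return strings.Join(parts, " ")
}

func main() {
	fmt.Println(mainArgs(false)) // default configuration file (application.yaml)
	fmt.Println(mainArgs(true))  // custom configuration file name set in configfilename
}
```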

Verify reverse synchronization

Connect to the polardb_internal_dynamodb database using the privileged account and run:

SELECT * FROM pg_replication_slots;

If the active field for flink_slot has changed to t (true), the Flink job has connected successfully, and you can direct business traffic to the PolarDB cluster.

Clean up reverse synchronization resources

When reverse synchronization is no longer needed, release the associated resources to reduce costs.

  • Stop the Flink job: In the Flink console, go to Job O&M, find the job, and click Stop.

  • Release the Flink workspace: In the Flink console, find the workspace and click Release Resources.

  • Delete the replication slot: Connect to the polardb_internal_dynamodb database with the privileged account and run:

    SELECT pg_drop_replication_slot('flink_slot');

Step 4: Perform the business cutover

When incremental synchronization lag is low and the consistency check shows no discrepancies, follow these steps to cut over.

  1. Final validation: Run the consistency check tool repeatedly until incremental synchronization lag is minimal and data discrepancies are zero (or within an acceptable range).

  2. Stop writes to the source: Pause all business application writes to the source AWS DynamoDB.

  3. Wait for the last sync to complete: Monitor the nimo-shake logs until no new incremental data remains to be synchronized.

  4. Final verification: Run nimo-full-check once more to confirm source and target data are identical.

  5. Stop the synchronization tool: Stop the nimo-shake process.

  6. Switch application connections: Change the database connection configuration in your business applications from the AWS DynamoDB endpoint to the DynamoDB-compatible endpoint of the PolarDB cluster.

  7. (Optional) Start reverse synchronization: Start the reverse synchronization task to replicate data back to the source DynamoDB, providing a rollback safeguard during the cutover period.

  8. Restart business applications: Restart your applications so that they connect to the PolarDB cluster. The cutover is complete.

  9. (Optional) Stop reverse synchronization: After services have run stably and data consistency meets your requirements, stop the Flink job and release the associated resources.
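The order of these steps matters. Each step should succeed before the next one begins, and the wait in step 3 is safer if the pending backlog stays at zero for several consecutive checks, not just one reading. The sketch below models both rules with hypothetical helpers. pendingChanges stands in for whatever signal you read from the nimo-shake logs, and the step actions in main are placeholders for manual or scripted work.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForDrain polls pendingChanges until it returns zero for `stable`
// consecutive checks, and gives up after maxChecks polls.
func waitForDrain(pendingChanges func() int, stable, maxChecks int, interval time.Duration) error {
	zeros := 0
	for i := 0; i < maxChecks; i++ {
		if pendingChanges() == 0 {
			zeros++
			if zeros >= stable {
				return nil
			}
		} else {
			zeros = 0 // new changes arrived; start counting again
		}
		time.Sleep(interval)
	}
	return errors.New("incremental synchronization did not drain in time")
}

// step is one cutover action; a non-nil error halts the sequence.
type step struct {
	name string
	run  func() error
}

// runCutover runs steps in order and stops at the first failure, so later
// steps (such as switching connections) never run after a failed check.
func runCutover(steps []step) ([]string, error) {
	var completed []string
	for _, s := range steps {
		if err := s.run(); err != nil {
			return completed, fmt.Errorf("step %q failed: %w", s.name, err)
		}
		completed = append(completed, s.name)
	}
	return completed, nil
}

func main() {
	// Simulated backlog readings: changes drain, then stay at zero.
	readings := []int{3, 1, 0, 0, 0}
	i := 0
	pending := func() int {
		v := readings[len(readings)-1]
		if i < len(readings) {
			v = readings[i]
		}
		i++
		return v
	}
	noop := func() error { return nil } // placeholder for a manual or scripted action
	steps := []step{
		{"stop writes to the source", noop},
		{"wait for the last sync", func() error { return waitForDrain(pending, 3, 10, 100*time.Millisecond) }},
		{"final consistency check", noop},
		{"stop nimo-shake", noop},
		{"switch application connections", noop},
	}
	done, err := runCutover(steps)
	fmt.Println("completed:", done)
	if err != nil {
		fmt.Println("halted:", err)
	}
}
```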

Appendix: Simulate real-time traffic for testing

To validate the migration process in a test environment with continuous data writes, use the following Go sample code. It creates a DynamoDB table, inserts 100 initial items, then updates one record per second to simulate business traffic.

This code is for testing only. Do not run it during a production migration.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// --- Configuration for your source AWS DynamoDB ---
const (
	region    = "cn-north-1"          // AWS DynamoDB region
	accessKey = "your-aws-access-key" // AWS DynamoDB access key
	secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
	tableName = "src1"                // Example table name
)

// createClient builds a DynamoDB client that uses the static credentials above.
func createClient(ctx context.Context) *dynamodb.Client {
	sdkConfig, err := config.LoadDefaultConfig(ctx,
		config.WithRegion(region),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
	)
	if err != nil {
		log.Fatalf("Failed to load AWS config: %v", err)
	}
	return dynamodb.NewFromConfig(sdkConfig)
}

// initializeData creates the table if it does not exist and inserts 100 sample items.
func initializeData(ctx context.Context, client *dynamodb.Client) {
	_, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
		},
		KeySchema: []types.KeySchemaElement{
			{AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
		},
		ProvisionedThroughput: &types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(100),
			WriteCapacityUnits: aws.Int64(100),
		},
	})
	if err != nil {
		// The SDK wraps service errors, so match with errors.As instead of a direct
		// type assertion. ResourceInUseException means the table already exists,
		// which is fine; any other error is fatal.
		var inUse *types.ResourceInUseException
		if !errors.As(err, &inUse) {
			log.Fatalf("CreateTable failed for %s: %v", tableName, err)
		}
	}

	fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
	waiter := dynamodb.NewTableExistsWaiter(client)
	err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: aws.String(tableName)}, 5*time.Minute)
	if err != nil {
		log.Fatalf("Waiter failed for table %s: %v", tableName, err)
	}

	// Insert 100 sample items and stop on the first failed write.
	for i := 0; i < 100; i++ {
		pk := fmt.Sprintf("%s_user_%03d", tableName, i)
		item := map[string]types.AttributeValue{
			"pk":  &types.AttributeValueMemberS{Value: pk},
			"val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
		}
		_, err := client.PutItem(ctx, &dynamodb.PutItemInput{TableName: aws.String(tableName), Item: item})
		if err != nil {
			log.Fatalf("PutItem failed for pk=%s: %v", pk, err)
		}
	}
	fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}

// simulateTraffic updates one item per second to simulate continuous business writes.
func simulateTraffic(ctx context.Context, client *dynamodb.Client) {
	fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
	for i := 0; ; i++ {
		pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
		newValue := fmt.Sprintf("%d", rand.Intn(1000))
		_, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
			TableName: aws.String(tableName),
			Key: map[string]types.AttributeValue{
				"pk": &types.AttributeValueMemberS{Value: pk},
			},
			// #val is an attribute-name placeholder, which keeps the expression valid
			// even if an attribute name collides with a DynamoDB reserved word.
			UpdateExpression:         aws.String("SET #val = :newval"),
			ExpressionAttributeNames: map[string]string{"#val": "val"},
			ExpressionAttributeValues: map[string]types.AttributeValue{
				":newval": &types.AttributeValueMemberN{Value: newValue},
			},
		})
		if err != nil {
			fmt.Printf("Update error: %v\n", err)
		} else {
			fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
		}
		time.Sleep(1 * time.Second) // Update one record per second
	}
}

func main() {
	ctx := context.Background()
	client := createClient(ctx)
	initializeData(ctx, client)
	simulateTraffic(ctx, client)
}