PolarDB: Migrate from Amazon DynamoDB to PolarDB for PostgreSQL

Last Updated: Dec 02, 2025

This guide provides a step-by-step process for migrating data from Amazon DynamoDB to PolarDB for PostgreSQL. The migration uses a dedicated toolset to perform a full data copy followed by continuous, real-time synchronization. This process minimizes downtime and ensures data integrity.

Migration overview

The migration process has five main stages and uses three tools: nimo-shake for full and incremental synchronization, nimo-full-check for validation, and PolarDBBackSync for optional reverse synchronization.

  1. Full synchronization

    • Tool: nimo-shake

    • Process: The nimo-shake tool first connects to your source DynamoDB table and automatically creates a matching table structure in your target PolarDB cluster. It then reads all data from the source table in parallel and writes it to the target cluster in batches.

  2. Incremental synchronization

    • Tool: nimo-shake

    • Process: After the full copy is complete, nimo-shake automatically begins capturing data changes (inserts, updates, and deletes) from the source database in real time. It uses Amazon DynamoDB Streams to achieve this and applies the changes to the target cluster, ensuring the two databases remain in sync. This process supports resumable transfers if interrupted. A conceptual Go sketch of how change records are read from DynamoDB Streams follows this list.

  3. Consistency check

    • Tool: nimo-full-check

    • Process: At any time during or after the synchronization, you can run the nimo-full-check tool. It reads data from both the source and target databases, compares records by their primary key, and generates a detailed report of any differences.

  4. (Optional) Reverse synchronization

    • Tool: PolarDBBackSync.jar (Based on Realtime Compute for Apache Flink)

    • Process: After the consistency check, you can create a reverse synchronization task from PolarDB for PostgreSQL back to DynamoDB as a safety net for rollback. The tool is implemented on Realtime Compute for Apache Flink: it captures data changes in the PolarDB cluster and, based on the type of each change, calls the PutItem or DeleteItem operation to apply it to DynamoDB.

  5. Business cutover

    • Process: Once the incremental data lag is near zero and the validation tool reports no data differences, you can perform the cutover. This involves a brief service pause to stop writes to the source database, followed by redirecting your application's connection to the PolarDB cluster.
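The change capture in stage 2 relies on standard Amazon DynamoDB Streams consumption, which nimo-shake handles internally. The following Go sketch (AWS SDK for Go v2) is only a conceptual illustration of how change records are read from a stream; the region and stream ARN are placeholders, and the sketch uses the default AWS credential chain.

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams"
        "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
    )

    func main() {
        ctx := context.Background()
        cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("cn-north-1")) // placeholder region
        if err != nil {
            log.Fatal(err)
        }
        client := dynamodbstreams.NewFromConfig(cfg)

        // The stream ARN of the source table, as returned by DescribeTable for that table.
        streamArn := "arn:aws:dynamodb:..." // placeholder

        // Discover the shards of the stream.
        desc, err := client.DescribeStream(ctx, &dynamodbstreams.DescribeStreamInput{StreamArn: aws.String(streamArn)})
        if err != nil {
            log.Fatal(err)
        }
        for _, shard := range desc.StreamDescription.Shards {
            // Open an iterator at the oldest record still available in the shard.
            it, err := client.GetShardIterator(ctx, &dynamodbstreams.GetShardIteratorInput{
                StreamArn:         aws.String(streamArn),
                ShardId:           shard.ShardId,
                ShardIteratorType: types.ShardIteratorTypeTrimHorizon,
            })
            if err != nil {
                log.Fatal(err)
            }
            // Read one batch of change records (INSERT, MODIFY, and REMOVE events).
            out, err := client.GetRecords(ctx, &dynamodbstreams.GetRecordsInput{ShardIterator: it.ShardIterator})
            if err != nil {
                log.Fatal(err)
            }
            for _, rec := range out.Records {
                fmt.Printf("event=%s keys=%v\n", rec.EventName, rec.Dynamodb.Keys)
            }
        }
    }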

Notes

  • Performance impact: The data migration process, especially the full synchronization phase, creates I/O load on the source and target databases. We recommend that you run the migration during off-peak hours and evaluate your database capacity beforehand.

  • Security configuration: Before the business cutover, manage the write permissions for the target PolarDB cluster. To prevent accidental data corruption, grant write permissions only to the account used by the data synchronization tool.

Preparations

Before you start the migration, complete the following preparations:

  1. Obtain the tools.

    1. Download the tools for data synchronization: NimoShake.tar.gz. The package includes the nimo-shake and nimo-full-check tools.

    2. (Optional) Download the tool for reverse synchronization: PolarDBBackSync.jar.

  2. Configure the PolarDB cluster.

    1. Enable the DynamoDB-compatible feature for the cluster. Then, obtain the DynamoDB endpoint of the cluster and create a dedicated DynamoDB account for API access.

    2. (Optional) Configure parameters: If you want to configure reverse synchronization, set the wal_level parameter of the PolarDB cluster to logical. The change to this parameter takes effect only after the cluster is restarted. We recommend that you change this parameter before you start the synchronization.

  3. Configure the AWS DynamoDB instance.

    1. Obtain the access credentials (AccessKey ID and Secret Access Key) of the AWS DynamoDB instance.

    2. Enable the DynamoDB Streams feature for the source DynamoDB tables. For one scripted way to do this, see the Go sketch at the end of this section.

  4. Prepare the runtime environment.

    Create an ECS instance or a server that can connect to the PolarDB cluster and the AWS DynamoDB instance.
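If you prefer to script parts of these preparations, the following Go sketch (AWS SDK for Go v2) enables DynamoDB Streams on one source table and then verifies that both the source instance and the PolarDB DynamoDB endpoint are reachable from the server. All endpoints, table names, and credentials are placeholders; the stream view type is an assumption, so confirm what nimo-shake expects before relying on it; and the BaseEndpoint option requires a recent SDK version.

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/credentials"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
    )

    // newClient builds a DynamoDB client for the given endpoint and credentials.
    // An empty endpoint means the default AWS endpoint for the region is used.
    func newClient(region, endpoint, ak, sk string) *dynamodb.Client {
        cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(region))
        if err != nil {
            log.Fatal(err)
        }
        return dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
            o.Credentials = credentials.NewStaticCredentialsProvider(ak, sk, "")
            if endpoint != "" {
                o.BaseEndpoint = aws.String(endpoint) // e.g. the PolarDB DynamoDB endpoint
            }
        })
    }

    func main() {
        ctx := context.Background()

        // Source AWS DynamoDB instance (placeholders).
        src := newClient("cn-north-1", "", "your-aws-access-key", "your-aws-secret-key")
        // Target PolarDB DynamoDB-compatible endpoint (placeholders).
        dst := newClient("cn-north-1", "http://pe-xxx.rwlb.rds...", "your-polardb-access-key", "your-polardb-secret-key")

        // Enable DynamoDB Streams on one source table. The view type below is an assumption.
        // The call fails harmlessly if streams are already enabled.
        _, err := src.UpdateTable(ctx, &dynamodb.UpdateTableInput{
            TableName: aws.String("src1"), // placeholder table name
            StreamSpecification: &types.StreamSpecification{
                StreamEnabled:  aws.Bool(true),
                StreamViewType: types.StreamViewTypeNewAndOldImages,
            },
        })
        if err != nil {
            log.Printf("UpdateTable (enable streams) failed: %v", err)
        }

        // Verify that both endpoints are reachable by listing tables.
        for name, c := range map[string]*dynamodb.Client{"source": src, "target": dst} {
            out, err := c.ListTables(ctx, &dynamodb.ListTablesInput{})
            if err != nil {
                log.Fatalf("%s not reachable: %v", name, err)
            }
            fmt.Printf("%s reachable, tables: %v\n", name, out.TableNames)
        }
    }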

Migration procedure

Step 1: Configure and start data synchronization

  1. Unzip the nimo-shake tool package and edit the conf/nimo-shake.conf configuration file in the NimoShake/nimo-shake directory. The following table describes the core configuration items:

    | Parameter | Description | Example value |
    | --- | --- | --- |
    | sync_mode | Synchronization mode. all specifies full and incremental synchronization. full specifies only full synchronization. | all |
    | source.access_key_id | The AccessKey ID of the source AWS DynamoDB instance. | AKIAIOSFODNN7... |
    | source.secret_access_key | The Secret Access Key of the source AWS DynamoDB instance. | wJalrXUtnFEMI... |
    | source.region | The region where the source AWS DynamoDB instance resides. | cn-north-1 |
    | source.endpoint_url | The source endpoint. For AWS DynamoDB, leave this item empty. | - |
    | target.address | The DynamoDB endpoint of the target PolarDB cluster. Include http:// and the port number. | http://pe-xxx.rwlb.rds... |
    | target.access_key_id | The AccessKey of the DynamoDB account for the target PolarDB cluster. | your-polardb-access-key |
    | target.secret_access_key | The SecretKey of the DynamoDB account for the target PolarDB cluster. | your-polardb-secret-key |
    | filter.collection.white | Whitelist for table filtering. Only the specified tables are synchronized. Separate multiple table names with a semicolon (;). Note: Do not use the whitelist and blacklist at the same time. If you specify both, all tables are synchronized. | c1;c2 |
    | filter.collection.black | Blacklist for table filtering. The specified tables are excluded from synchronization. Separate multiple table names with a semicolon (;). Note: Do not use the whitelist and blacklist at the same time. If you specify both, all tables are synchronized. | c1;c2 |

  2. Select the binary file that corresponds to your operating system architecture and start the sync task.

    Note

    Run the task with the nohup option to prevent the sync task from being interrupted if the terminal session disconnects.

    # Return to the nimo-shake directory
    cd ..
    # For example, run the task on a Linux server with the x86-64 architecture
    nohup ./bin/nimo-shake.linux.amd64 -conf=./conf/nimo-shake.conf > nimo-shake.log 2>&1 &

    The program first performs a full synchronization. After it is complete, the program automatically switches to the incremental synchronization phase and continues to run.

Step 2: Run a data consistency check

  1. Edit the conf/nimo-full-check.conf configuration file in the NimoShake/nimo-full-check directory. The configuration items are similar to those for nimo-shake. Provide the connection information for both the source and target databases.

  2. Start the check task.

    # Return to the nimo-full-check directory
    cd ..
    # For example, run the task on a Linux server with the x86-64 architecture
    nohup ./bin/nimo-full-check.linux.amd64 -conf=./conf/nimo-full-check.conf > nimo-full-check.log 2>&1 &

    The check tool outputs its progress to the terminal in real time. Detailed logs and data difference reports are stored in the logs/ and nimo-full-check-diff/ directories, respectively.

(Optional) Step 3: Configure reverse synchronization

Before the business cutover, you can configure a reverse synchronization task from the PolarDB for PostgreSQL cluster to the DynamoDB instance. You start the task during the cutover so that you can roll back if necessary.

Prepare the environment

  1. Change the wal_level parameter of the PolarDB cluster to logical.

  2. Create a privileged database account and grant permissions to it.

    1. (Optional) Go to the PolarDB console. On the details page of the cluster, choose Settings and Management > Accounts and create a privileged account.

    2. On the details page of the cluster, choose Settings and Management > Databases. Change the owner of the polardb_internal_dynamodb database to a privileged account.

  3. Create a logical replication slot and a publication: Use the privileged account to connect to the polardb_internal_dynamodb database. Then, run the following statements to create a logical replication slot, grant the replication permission to the dedicated DynamoDB account, and create a publication that covers all tables in the database. You can also check whether the logical replication slot is active.

    -- Create a logical replication slot named 'flink_slot'. The name must match the slot name
    -- configured for the Realtime Compute for Apache Flink deployment.
    -- (The 'pgoutput' plugin is assumed here; use the plugin that your Flink connector requires.)
    SELECT pg_create_logical_replication_slot('flink_slot', 'pgoutput');
    
    -- Grant the REPLICATION permission to the dedicated DynamoDB account.
    -- Replace <your_dynamodb_user> with the actual account name.
    ALTER ROLE <your_dynamodb_user> REPLICATION;
    
    -- Create a publication for all tables in the database. The replication slot captures the changes of these tables.
    CREATE PUBLICATION dbz_publication FOR ALL TABLES;
    
    -- Check the status of the replication slot. At this point, the active field of flink_slot should be f (false).
    SELECT * FROM pg_replication_slots;
  4. Activate and configure Realtime Compute for Apache Flink.

    1. Activate Realtime Compute for Apache Flink and create a workspace.

      Important

      The workspace must be in the same VPC as the PolarDB cluster.

    2. Configure an Internet connection for the workspace so that it can connect to the AWS DynamoDB instance.

  5. Configure an IP whitelist for the PolarDB cluster.

    1. Navigate to the Realtime Compute for Apache Flink console and click Details in the Actions column corresponding to the workspace. In the Workspace Details dialog box that appears, obtain the value in the CIDR Block column.

    2. Go to the PolarDB console. On the details page of the PolarDB cluster, choose Settings and Management > Cluster Whitelists in the left-side navigation pane. On the page that appears, click Add Whitelist and add the CIDR block of the workspace to the whitelist.

  6. Check whether the PolarDB cluster is connected to the Realtime Compute for Apache Flink workspace.

    1. Go to the details page of the workspace in the Realtime Compute for Apache Flink console. Click the Network detection icon in the upper-right corner.

    2. Enter the private endpoint and port of the primary node of the PolarDB cluster, and then click Detect.

    3. If the This connection can be reached prompt is displayed, the whitelist is correctly configured.

Create a Flink deployment

  1. Download the reverse synchronization tool: PolarDBBackSync.jar.

  2. Prepare the configuration file: Create a configuration file named application.yaml with the following content:

    source:
      # Specify the private endpoint of the primary node of the PolarDB cluster.
      hostname: pc-xxx.pg.polardb.rds.aliyuncs.com
      # Specify the port of the primary node of the PolarDB cluster.
      port: 5432
      # Specify the name of the logical replication slot created in the preceding step.
      slotName: flink_slot
    
    target:
      # The region of the AWS DynamoDB instance.
      region: cn-north-1
    
    # (Optional) Specify table filtering conditions. You can specify either whiteTableSet or blackTableSet.
    filter:
      # whiteTableSet: Specify the tables to be reversely synchronized.
      # blackTableSet: Specify the tables to be excluded from reverse synchronization.
      whiteTableSet:
      blackTableSet:
    
    # Specify the checkpoint interval of the Flink deployment.
    checkpoint:
      interval: 3000
  3. Upload the files: Go to the details page of the Flink workspace in the Realtime Compute for Apache Flink console. In the left-side navigation pane, click Artifacts. On the page that appears, click Upload Artifact to upload the PolarDBBackSync.jar and application.yaml files.

  4. Configure credentials: We recommend that you save credentials as variables instead of hard-coding them. In the left-side navigation pane, click Variables. On the page that appears, add the following variables:

    | Variable | Value |
    | --- | --- |
    | polardbusername | The dedicated DynamoDB account created for the PolarDB cluster. |
    | polardbpassword | The password of the dedicated DynamoDB account. |
    | dynamodbak | The AccessKey ID of the AWS DynamoDB instance. |
    | dynamodbsk | The Secret Access Key of the AWS DynamoDB instance. |

  5. Create and start the deployment:

    1. In the left-side navigation pane, click O&M > Deployments. On the page that appears, click Create Deployment > JAR Deployment.

    2. Specify the parameters and click Deploy. The following table describes how to configure the main parameters. Configure other parameters based on your business requirements.

      | Parameter | Description |
      | --- | --- |
      | Deployment Mode | Select Stream Mode. |
      | Deployment Name | Specify the name of the deployment. Example: PolarDBBackSync. |
      | Engine Version | Select vvr-11.3-jdk11-flink-1.20. |
      | JAR URI | Select the uploaded PolarDBBackSync.jar file. |
      | Entry Point Class | Set this parameter to org.example.PolarDBCdcJob. |
      | Entry Point Main Arguments | Set the following values: --polardbusername ${secret_values.polardbusername} --polardbpassword ${secret_values.polardbpassword} --dynamodbak ${secret_values.dynamodbak} --dynamodbsk ${secret_values.dynamodbsk} |
      | Additional Dependencies | Select the uploaded application.yaml file. |

    3. After the deployment is created, click Start in the Actions column corresponding to the deployment. On the panel that appears, select Initial Mode and click Start.

Validation and resource releasing

  • Validation: After the deployment is started, use the privileged account to connect to the polardb_internal_dynamodb database of the PolarDB cluster and execute the SELECT * FROM pg_replication_slots; statement. If the active field of flink_slot changes to t (true), the Flink deployment is connected and you can proceed to the next step. A Go sketch that runs this check programmatically appears at the end of this section.

  • Resource releasing: If you no longer need to perform reverse synchronization, perform the following steps to release resources:

    • Realtime Compute for Apache Flink:

      • Stop the deployment: Go to the details page of the workspace in the Realtime Compute for Apache Flink console. Choose O&M > Deployments in the left-side navigation pane. On the page that appears, find the deployment and click Stop in the Actions column.

      • Release the workspace: Go to the home page of the Realtime Compute for Apache Flink console, click Release Resources in the Actions column corresponding to the workspace.

    • PolarDB cluster: Use the privileged account to connect to the polardb_internal_dynamodb database. Then, run the following command to delete the logical replication slot:

      SELECT pg_drop_replication_slot('flink_slot');
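If you want to script the validation check described above instead of running the SQL statement manually, the following Go sketch queries the status of the flink_slot replication slot. It uses database/sql with the github.com/lib/pq driver, which is a common choice but not part of the migration toolset; the connection parameters are placeholders, and sslmode may need to be adjusted for your cluster.

    package main

    import (
        "database/sql"
        "fmt"
        "log"

        _ "github.com/lib/pq" // PostgreSQL driver
    )

    func main() {
        // Placeholder DSN: privileged account, primary endpoint, polardb_internal_dynamodb database.
        dsn := "host=pc-xxx.pg.polardb.rds.aliyuncs.com port=5432 user=<privileged_user> password=<password> dbname=polardb_internal_dynamodb sslmode=disable"
        db, err := sql.Open("postgres", dsn)
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // The slot becomes active (true) once the Flink deployment is connected.
        var active bool
        err = db.QueryRow("SELECT active FROM pg_replication_slots WHERE slot_name = 'flink_slot'").Scan(&active)
        if err != nil {
            log.Fatalf("query failed (is the replication slot created?): %v", err)
        }
        fmt.Printf("flink_slot active = %v\n", active)
    }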

Step 4: Perform the business cutover

After the latency of incremental synchronization stabilizes at a low level and the data consistency check passes, you can plan the business cutover. When you are ready for the final cutover, follow these steps carefully:

  1. Final check: Before the planned downtime window, run the consistency check tool repeatedly. Continue until the incremental synchronization latency is minimal and there are no data discrepancies, or the number of discrepancies is within an acceptable range.

  2. Stop writes to the source: At the beginning of the downtime window, pause all business applications that write data to the source AWS DynamoDB.

  3. Wait for synchronization to complete: Check the nimo-shake logs to confirm that the synchronization of all incremental data is complete.

  4. Final validation: Run the nimo-full-check tool again to ensure that the data in the source and target databases is completely consistent.

  5. Stop the synchronization tool: After you confirm that the data is fully synchronized, stop the nimo-shake process.

  6. Switch the application connection: Stop your business application. Then, update the database connection configuration of your business applications to use the DynamoDB endpoint of PolarDB instead of the AWS DynamoDB address. For an example of what this change looks like in a Go application, see the sketch after this list.

  7. (Optional) Start a reverse synchronization task: During the business switchover to the PolarDB cluster, start the reverse synchronization task based on Realtime Compute for Apache Flink so that you can roll back to AWS DynamoDB if necessary.

  8. Start the business application: Restart the business applications to complete the cutover.

  9. (Optional) Stop the reverse synchronization task: After the business is switched over and keeps running for a period of time, you can stop the reverse synchronization task and release related resources.
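For step 6, if your application uses the AWS SDK for Go v2, redirecting it to PolarDB typically amounts to overriding the client endpoint and credentials. The following is a minimal sketch under that assumption; it requires an SDK version that supports the BaseEndpoint option, and the endpoint, region, and credential values are placeholders.

    package main

    import (
        "context"
        "log"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/credentials"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    )

    func main() {
        cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("cn-north-1")) // placeholder region
        if err != nil {
            log.Fatal(err)
        }

        // Before the cutover, the application used the default AWS DynamoDB endpoint.
        // After the cutover, point the same client at the DynamoDB endpoint of PolarDB
        // and switch to the dedicated DynamoDB account of the cluster (placeholders below).
        client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
            o.BaseEndpoint = aws.String("http://pe-xxx.rwlb.rds...") // PolarDB DynamoDB endpoint, including http:// and the port
            o.Credentials = credentials.NewStaticCredentialsProvider(
                "your-polardb-access-key", "your-polardb-secret-key", "")
        })

        // The rest of the application code (PutItem, GetItem, Query, ...) stays unchanged.
        _ = client
    }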

Appendix: Simulate real-time service traffic for a test environment

To simulate a realistic migration scenario with continuous data writes in a test environment, you can use the following Go sample code. The code periodically writes and updates data in the source DynamoDB table.

Note

This step is only for testing and validating the migration process. Do not perform this step in a production migration.

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// --- Configuration for your source AWS DynamoDB ---
var (
    region    = "cn-north-1"       // AWS DynamoDB region
    accessKey = "your-aws-access-key" // AWS DynamoDB access key
    secretKey = "your-aws-secret-key" // AWS DynamoDB secret key
)

// --- Helper function to create a DynamoDB client ---
func createClient() (*dynamodb.Client, context.Context) {
    ctx := context.Background()
    sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
    if err != nil {
        log.Fatalf("Failed to load AWS config: %v", err)
    }
    client := dynamodb.NewFromConfig(sdkConfig, func(o *dynamodb.Options) {
        o.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
    })
    return client, ctx
}

// --- Function to create a table and populate it with initial data ---
func initializeData(client *dynamodb.Client, ctx context.Context) {
    tableName := "src1" // Example table name

    // Create table if not exists
    _, err := client.CreateTable(ctx, &dynamodb.CreateTableInput{
        TableName: &tableName,
        AttributeDefinitions: []types.AttributeDefinition{
            {AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
        },
        KeySchema: []types.KeySchemaElement{
            {AttributeName: aws.String("pk"), KeyType: types.KeyTypeHash},
        },
        ProvisionedThroughput: &types.ProvisionedThroughput{
            ReadCapacityUnits:  aws.Int64(100),
            WriteCapacityUnits: aws.Int64(100),
        },
    })
    if err != nil {
        // Ignore the error if the table already exists; fail on other errors.
        // errors.As is used because the SDK wraps service errors in operation errors.
        var inUse *types.ResourceInUseException
        if !errors.As(err, &inUse) {
            log.Fatalf("CreateTable failed for %s: %v", tableName, err)
        }
    }

    fmt.Printf("Waiting for table '%s' to become active...\n", tableName)
    waiter := dynamodb.NewTableExistsWaiter(client)
    err = waiter.Wait(ctx, &dynamodb.DescribeTableInput{TableName: &tableName}, 5*time.Minute)
    if err != nil {
        log.Fatalf("Waiter failed for table %s: %v", tableName, err)
    }

    // Insert 100 sample items.
    for i := 0; i < 100; i++ {
        pk := fmt.Sprintf("%s_user_%03d", tableName, i)
        item := map[string]types.AttributeValue{
            "pk":  &types.AttributeValueMemberS{Value: pk},
            "val": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)},
        }
        if _, err := client.PutItem(ctx, &dynamodb.PutItemInput{TableName: &tableName, Item: item}); err != nil {
            log.Printf("PutItem failed for %s: %v", pk, err)
        }
    }
    fmt.Printf("Inserted 100 initial items into '%s'.\n", tableName)
}

// --- Function to simulate continuous business traffic ---
func simulateTraffic(client *dynamodb.Client, ctx context.Context) {
    tableName := "src1"
    fmt.Println("Starting periodic updates to simulate traffic. Press Ctrl+C to stop.")
    i := 0
    for {
        pk := fmt.Sprintf("%s_user_%03d", tableName, i%100)
        newValue := fmt.Sprintf("%d", rand.Intn(1000))
        _, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
            TableName: &tableName,
            Key: map[string]types.AttributeValue{
                "pk": &types.AttributeValueMemberS{Value: pk},
            },
            ExpressionAttributeValues: map[string]types.AttributeValue{
                ":newval": &types.AttributeValueMemberN{Value: newValue},
            },
            UpdateExpression: aws.String("SET val = :newval"),
        })
        if err != nil {
            fmt.Printf("Update error: %v\n", err)
        } else {
            fmt.Printf("Updated pk=%s with new val=%s\n", pk, newValue)
        }
        i++
        time.Sleep(1 * time.Second) // Update one record per second
    }
}

func main() {
    client, ctx := createClient()
    initializeData(client, ctx)
    simulateTraffic(client, ctx)
}