This topic provides the DDL syntax that is used to create an ApsaraDB for MongoDB result table, describes the parameters in the WITH clause, and provides sample code.

What is ApsaraDB for MongoDB?

ApsaraDB for MongoDB is a MongoDB-compatible database service that is developed based on the Apsara distributed operating system and a high-reliability storage engine. ApsaraDB for MongoDB uses a multi-node architecture to ensure high availability, and supports elastic scaling, disaster recovery, backup and restoration, and performance optimization.

Prerequisites

An ApsaraDB for MongoDB instance is created. For more information, see Create a standalone instance.

Limits

  • Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the ApsaraDB for MongoDB connector.
  • You can use ApsaraDB for MongoDB tables only as result tables.
  • You cannot update the primary key in an ApsaraDB for MongoDB result table. Therefore, data that has the same primary key is inserted repeatedly into the table.

DDL syntax

CREATE TABLE mongodb_sink(
  id INT, 
  number INT
) WITH (
  'connector' = 'mongodb',
  'database' = '<yourDatabase>',
  'collection' = '<yourCollection>', 
  'uri' = '<yourUri>',
  'maxConnectionIdleTime' = '<yourMaxConnectionIdleTime>',  
  'batchSize' = '1024'  
);

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to mongodb.
database The name of the database. Yes N/A.
collection A collection of data. Yes N/A.
uri The Uniform Resource Identifier (URI) of the ApsaraDB for MongoDB database. Yes Example: mongodb://root:xxxxx@dds-xxxxxx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xxxxxxx.
maxConnectionIdleTime The connection timeout period. No Default value: 60000. Unit: milliseconds.
batchSize The number of data records that can be written at the same time. No Default value: 1024.

Data type mappings

Data type of ApsaraDB for MongoDB Data type of Flink
String STRING
Double DOUBLE
Boolean BOOLEAN
32-bit integer INT
64-bit integer BIGINT
Array ARRAY
Date DATE
Timestamp TIMESTAMP

Sample code

CREATE TEMPORARY TABLE datagen_source (
  v INT, 
  p INT
) with (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongodb_sink(
  v INT, 
  p INT
) with (
  'connector'='mongodb',
  'database' = '<yourDatabase>',
  'collection' = '<yourCollection>', 
  'uri'='<yourUri>'
);

INSERT INTO mongodb_sink 
   SELECT v, p
FROM datagen_source;