This topic provides the DDL syntax that is used to create an ApsaraDB for Lindorm dimension table, describes the parameters in the WITH clause, and provides the data type mapping method and sample code.

What is ApsaraDB for Lindorm?

ApsaraDB for Lindorm is a cloud-native multi-mode hyper-converged database service that is designed and optimized for the Internet of Things (IoT), the Internet, and the Internet of Vehicles (IoV). ApsaraDB for Lindorm is suitable for various scenarios, such as logging, monitoring, billing, advertising, social networking, travel, and risk management. ApsaraDB for Lindorm is also one of the database services that support the core business of Alibaba Cloud.

ApsaraDB for Lindorm provides the following features:
  • Supports unified access and processing of various data, such as wide tables, time series, text, objects, streams, and spatial data.
  • Is compatible with multiple standard interfaces, such as SQL, Apache HBase, Apache Cassandra, Amazon S3, Time Series Database (TSDB), Hadoop Distributed File System (HDFS), Apache Solr, and Kafka. ApsaraDB for Lindorm can also be seamlessly integrated with third-party ecosystem tools.

Prerequisites

  • An ApsaraDB for Lindorm wide table engine and an ApsaraDB for Lindorm table are created.
  • A network connection is established between the ApsaraDB for Lindorm cluster and the fully managed Flink cluster. For example, the ApsaraDB for Lindorm cluster and the fully managed Flink cluster reside in the same virtual private cloud (VPC).

Limits

Only fully managed Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the ApsaraDB for Lindorm connector.

Precautions

When you declare a dimension table, you must specify the primary key. When you join a dimension table with another table, the ON condition must contain an equality condition on the primary key of the dimension table.

DDL syntax

CREATE TABLE white_list (
 id varchar,
 name varchar,
 age int,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'lindorm', 
 'seedserver' = '<yourSeedServer>',
 'namespace' = '<yourNamespace>',
 'username' = '<yourUsername>',
 'password' = '<yourPassword>',
 'tableName' = '<yourTableName>',
 'columnFamily' = '<yourColumnFamily>'
);

Parameters in the WITH clause

Parameter | Description | Required | Remarks
seedserver | The endpoint of the ApsaraDB for Lindorm server. | Yes | Fully managed Flink calls Java APIs to access ApsaraDB for Lindorm. The endpoint of the ApsaraDB for Lindorm server is in the host:port format.
namespace | The namespace of the ApsaraDB for Lindorm database. | Yes | N/A
username | The username that is used to access the ApsaraDB for Lindorm database. | Yes | N/A
password | The password that is used to access the ApsaraDB for Lindorm database. | Yes | N/A
tableName | The name of the ApsaraDB for Lindorm table. | Yes | N/A
columnFamily | The name of the column family of the ApsaraDB for Lindorm table. | Yes | If the column family name is not specified when you create the ApsaraDB for Lindorm table, enter f.
retryIntervalMs | The interval at which the read operation is retried when data reading fails. | No | Default value: 1000. Unit: milliseconds.
maxRetryTimes | The maximum number of retries for reading data. | No | Default value: 5.
partitionedJoin | Specifies whether to use the JoinKey for partitioning. | No | Valid values:
  • true: The JoinKey is used for partitioning. Data is distributed to each JOIN node to improve the cache hit rate.
  • false: The JoinKey is not used for partitioning. This is the default value.
shuffleEmptyKey | Specifies whether to randomly shuffle empty upstream keys to downstream nodes. | No | Valid values:
  • true: The system randomly shuffles empty upstream keys to downstream nodes.
  • false: The system shuffles empty upstream keys to the first parallel thread of each downstream node. The first parallel thread is numbered 0. This is the default value.
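
As a rough sketch of how the optional parameters fit into a declaration, the following DDL adds the retry and partitioning options to the required ones. The table name, column names, and chosen values are illustrative assumptions, not recommendations.

```sql
-- Hypothetical dimension table; names and values are placeholders for illustration only.
CREATE TEMPORARY TABLE lindorm_dim_with_retry (
  id VARCHAR,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'retryIntervalMs' = '2000',  -- wait 2000 milliseconds between read retries (default: 1000)
  'maxRetryTimes' = '3',       -- retry a failed read at most 3 times (default: 5)
  'partitionedJoin' = 'true'   -- partition by the JoinKey (default: false)
);
```

Because the stated benefit of partitionedJoin is a higher cache hit rate, enabling it is most likely to pay off when a cache policy is also configured for the table.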

Cache parameters

Parameter | Description | Required | Remarks
cache | The cache policy. | No | Valid values:
  • None: indicates that no data is cached. This is the default value.
  • LRU: indicates that only recently accessed data in the dimension table is cached.

    If you set the cache parameter to LRU, you can configure the cacheSize and cacheTTLMs parameters.

cacheSize | The number of rows of data that can be cached. | No | Default value: 10000. If you set the cache parameter to LRU, you can configure the cacheSize parameter.
cacheTTLMs | The cache timeout period. | No | Unit: milliseconds. If you set the cache parameter to LRU, you can configure the cacheTTLMs parameter. By default, cache entries do not expire.
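
The cache parameters can be combined as in the following minimal sketch, which enables the LRU policy for a dimension table. The table name, column names, and the size and timeout values are assumptions chosen for illustration; suitable values depend on the size of the dimension table and how stale a cached row is allowed to be.

```sql
-- Hypothetical dimension table with an LRU cache; names and values are placeholders.
CREATE TEMPORARY TABLE lindorm_dim_cached (
  id VARCHAR,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'cache' = 'LRU',         -- cache only recently accessed rows (default: None)
  'cacheSize' = '50000',   -- cache up to 50000 rows (default: 10000)
  'cacheTTLMs' = '60000'   -- expire entries after 60000 milliseconds; by default entries do not expire
);
```

If cacheTTLMs is omitted, cached rows do not expire, so updates made to the ApsaraDB for Lindorm table may not be seen for rows that remain in the cache.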

Data type mapping method

Fully managed Flink parses the ApsaraDB for Lindorm data (byte[] bytes) of the related field based on the data type that is defined in a table. The following table describes how fully managed Flink parses ApsaraDB for Lindorm data.
Flink SQL data type | Operations that are performed on ApsaraDB for Lindorm data
CHAR, VARCHAR | org.apache.flink.table.data.StringData::fromBytes
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBoolean
BINARY, VARBINARY | Return bytes.
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal
TINYINT | Return bytes[0].
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toShort
INT | com.alibaba.lindorm.client.core.utils.Bytes::toInt
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toLong
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toFloat
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toDouble
DATE | Call com.alibaba.lindorm.client.core.utils.Bytes::toInt to obtain the number of days since January 1, 1970.
TIME | Call com.alibaba.lindorm.client.core.utils.Bytes::toInt to obtain the number of milliseconds since 00:00:00 of the current day.
TIMESTAMP | Call com.alibaba.lindorm.client.core.utils.Bytes::toLong to obtain the number of milliseconds since 00:00:00 on January 1, 1970.
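
To make the mapping concrete, the following sketch declares a dimension table whose columns use several of the types above, with a comment on each column that notes how its bytes are parsed according to the table. The table and column names are hypothetical.

```sql
-- Hypothetical dimension table; each comment restates the parsing rule for that column type.
CREATE TEMPORARY TABLE lindorm_typed_dim (
  id BIGINT,             -- parsed with Bytes::toLong
  name VARCHAR,          -- parsed with StringData::fromBytes
  score DOUBLE,          -- parsed with Bytes::toDouble
  birth_date DATE,       -- parsed with Bytes::toInt as days since January 1, 1970
  updated_at TIMESTAMP,  -- parsed with Bytes::toLong as milliseconds since 00:00:00 on January 1, 1970
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>'
);
```

Because the connector interprets the raw bytes according to the declared type, the data must have been written in the matching encoding. For example, a birth_date value stored as the integer 0 is read as January 1, 1970, and a value of 1 as January 2, 1970.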

Sample code

CREATE TEMPORARY TABLE lindorm_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
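 'tableName'='${lindorm_dim_table}',
 'columnFamily'='f', -- required; f applies if no column family name was specified when the table was created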
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_password}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
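 'tableName'='${lindorm_sink_table}',
 'columnFamily'='f', -- required; f applies if no column family name was specified when the table was created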
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_password}'
);

INSERT INTO lindorm_hbase_sink
SELECT
 lindorm_source.id AS id,
 lindorm_hbase_dim.name AS name,
 lindorm_hbase_dim.birth AS birth
FROM lindorm_source
JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF lindorm_source.proc_time
ON lindorm_source.id = lindorm_hbase_dim.id;