This topic describes how to create a custom result table in Realtime Compute to meet diverse output requirements.

Notice This topic only applies to Realtime Compute in exclusive mode.

Build a development environment

You can use the following two methods to build a development environment for a custom result table:
  • Use the environment in the demo.
    Realtime Compute provides a demo that you can refer to when you create a custom result table. This allows you to quickly develop your business logic.
    Note The demo includes the development environment for the relevant version, so you do not need to build the environment yourself.
  • Download a JAR package and build your own environment.
    Note If you reference the following dependency packages in a Maven project, you must set the Scope parameter to <scope>provided</scope>.
    • Realtime Compute V3.0.0
      • JAR packages that you need to download
        Note You must add the following configuration to the pom.xml file to enable automatic downloading of the flink-table_2.11 JAR package.
        <profiles>
          <profile>
            <id>allow-snapshots</id>
            <activation><activeByDefault>true</activeByDefault></activation>
            <repositories>
              <repository>
                <id>snapshots-repo</id>
                <url>https://oss.sonatype.org/content/repositories/snapshots</url>
                <releases><enabled>false</enabled></releases>
                <snapshots><enabled>true</enabled></snapshots>
              </repository>
            </repositories>
          </profile>
        </profiles>
      • Dependency packages
        <dependencies>
          <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>blink-connector-common</artifactId>
            <version>blink-3.2.1-SNAPSHOT</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>blink-connector-custom</artifactId>
            <version>blink-3.2.1-SNAPSHOT</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>blink-3.2.1-SNAPSHOT</version>
            <scope>provided</scope>
          </dependency>
        </dependencies>
    • Realtime Compute V2.0.0
      • JAR packages that you need to download
      • Dependency packages
          <dependencies>
            <dependency>
              <groupId>com.alibaba.blink</groupId>
              <artifactId>blink-table</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table_2.11</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-core</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>com.alibaba.blink</groupId>
              <artifactId>blink-connector-common</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>com.alibaba.blink</groupId>
              <artifactId>blink-connector-custom</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
          </dependencies>
    • Realtime Compute V1.0.0
      • JAR packages that you need to download
      • Dependency packages
              <dependencies>
                <dependency>
                    <groupId>com.alibaba.blink</groupId>
                    <artifactId>blink-connector-common</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>com.alibaba.blink</groupId>
                    <artifactId>blink-connector-custom</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-core</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>com.alibaba.blink</groupId>
                    <artifactId>blink-table</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
            </dependencies>

API description

The class of a custom result table must extend CustomSinkBase, the base class of the custom sink plug-in, and implement the following methods:
protected Map<String,String> userParamsMap; // The key-value pairs defined in the WITH clause of the DDL statement. All keys are in lowercase letters.
protected Set<String> primaryKeys; // The custom primary key fields.
protected List<String> headerFields; // The list of fields that are marked as header.
protected RowTypeInfo rowTypeInfo; // The field types and names.
/**
 * The initialization method. This method is called when the table is created for the first time or when the job recovers from a failover.
 *
 * @param taskNumber The ID of the current node.
 * @param numTasks The total number of sink nodes.
 * @throws IOException
 */
public abstract void open(int taskNumber, int numTasks) throws IOException;

/**
 * The close method, which is used to release resources.
 *
 * @throws IOException
 */
public abstract void close() throws IOException;

/**
 * Inserts a single row of data.
 *
 * @param row The row of data to insert.
 * @throws IOException
 */
public abstract void writeAddRecord(Row row) throws IOException;

/**
 * Deletes a single row of data.
 *
 * @param row The row of data to delete.
 * @throws IOException
 */
public abstract void writeDeleteRecord(Row row) throws IOException;

/**
 * If the sink writes data in batches, this method must flush all data that is cached in the threads to the downstream storage system. If data is not written in batches, this method can be left empty.
 *
 * @throws IOException
 */
public abstract void sync() throws IOException;

/**
 * Returns the class name.
 */
public String getName();
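
The following minimal sketch shows one possible implementation of these methods. It is for illustration only and is not part of the official demo: the class prints each record instead of writing it to a data store, and the prefix parameter is a hypothetical WITH-clause parameter. Import CustomSinkBase from the custom connector package of your Realtime Compute version.

import java.io.IOException;
import org.apache.flink.types.Row;

// Assumption: CustomSinkBase is imported from the custom connector package of your version.
public class PrintSink extends CustomSinkBase {

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Read custom parameters from the WITH clause. All keys are lowercase.
        String prefix = userParamsMap.getOrDefault("prefix", ""); // "prefix" is a hypothetical parameter
        System.out.println("Opened sink task " + taskNumber + " of " + numTasks + " with prefix " + prefix);
    }

    @Override
    public void close() throws IOException {
        // Release connections and other resources here. This sketch holds none.
    }

    @Override
    public void writeAddRecord(Row row) throws IOException {
        // Write a single row to the target data store. This sketch only prints it.
        System.out.println("ADD " + row.toString());
    }

    @Override
    public void writeDeleteRecord(Row row) throws IOException {
        // Delete a single row from the target data store. This sketch only prints it.
        System.out.println("DELETE " + row.toString());
    }

    @Override
    public void sync() throws IOException {
        // This sketch does not cache data, so there is nothing to flush downstream.
    }

    @Override
    public String getName() {
        return getClass().getName();
    }
}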
After you upload the JAR packages to Realtime Compute and reference them as resources, you must specify type = 'custom' for the custom sink plug-in and set the class parameter to the class that implements CustomSinkBase. The following code shows an example of a custom Redis result table. Download the demo.
create table in_table(
    kv varchar 
)with(
    type = 'random'
);

create table out_table(
    `key` varchar,
    `value` varchar
)with(
    type = 'custom',
    class = 'com.alibaba.blink.connector.custom.demo.RedisSink',
    -- 1. You can define more custom parameters. These parameters can be obtained from userParamsMap in the open method.
    -- 2. Keys in the WITH clause are not case-sensitive. Realtime Compute processes all keys as lowercase. We recommend that you declare keys in lowercase letters in the data definition language (DDL) statement that references the data store.
    host = 'r-uf****.redis.rds.aliyuncs.com',
    port = '6379',
    db = '0',
    batchsize = '10',
    password = '<yourHostPassword>'
);

insert into out_table
select
substring(kv,0,4) as `key`,
substring(kv,0,6) as `value`
from in_table;
The following table describes the parameters of the Redis sink plug-in.

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| host | The internal endpoint of the ApsaraDB for Redis instance. | Yes | None |
| port | The port that is used to connect to the ApsaraDB for Redis database. | Yes | None |
| password | The password that is used to connect to the ApsaraDB for Redis instance. | Yes | None |
| db | The number of the ApsaraDB for Redis database. | No | Default value: 0, which indicates db0. |
| batchsize | The number of data records to be written at a time. | No | Default value: 1, which indicates that data is not written in batches. |
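
As an illustration of how these parameters might be consumed, the open method of such a sink could read them from userParamsMap and apply the documented defaults. The following lines are a sketch only; the variable names are illustrative.

// Inside open(): keys in userParamsMap are lowercase.
String host = userParamsMap.get("host");                                        // required
int port = Integer.parseInt(userParamsMap.get("port"));                         // required
String password = userParamsMap.get("password");                                // required
int db = Integer.parseInt(userParamsMap.getOrDefault("db", "0"));               // default: 0 (db0)
int batchSize = Integer.parseInt(userParamsMap.getOrDefault("batchsize", "1")); // default: 1 (no batching)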