このドキュメントでは、Realtime Compute で カスタム結果テーブルを作成する方法について説明します。

排他モードでデプロイされた Realtime Compute にのみ適用されます。

多様な出力要件を満たすために、Realtime Compute でシンクプラグインをカスタマイズできるようになりました。 Maven プロジェクトには、次の依存パッケージが必要です。 スコープ設定は <scope>provided</scope> です。

JAR パッケージのダウンロード

  1. blink-connector-common-blink-2.2.4
  2. blink-connector-custom-blink-2.2.4
  3. blink-table-blink-2.2.4
  4. flink-table_2.11-blink-2.2.4
  5. flink-core-blink-2.2.4

Realtime Compute V2.0 以降の依存関係



   <dependency> 
    <dependency> 
      <groupId>com.alibaba.blink</groupId> 
      <artifactId>blink-table</artifactId> 
      <version>blink-2.2.4-SNAPSHOT</version> 
      <scope>provided</scope> 
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_2.11</artifactId>
      <version>blink-2.2.4-SNAPSHOT</version>
      <scope>provided</scope> 
    </dependency>
    <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-core</artifactId>
      <version>blink-2.2.4-SNAPSHOT</version>
      <scope>provided</scope> 
    </dependency> 
    <dependency> 
      <groupId>com.alibaba.blink</groupId>
      <artifactId>blink-connector-common</artifactId>
      <version>blink-2.2.4-SNAPSHOT</version> 
      <scope>provided</scope>
    </dependency> 
    <dependency> 
      <groupId>com.alibaba.blink</groupId> 
      <artifactId>blink-connector-custom</artifactId>
      <version>blink-2.2.4-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency> 
  </dependency>
			

API の説明

カスタム結果テーブルクラスは、カスタムシンクプラグインの基本クラスを継承し、次のメソッドを実装する必要があります。

protected Map<String,String> userParamsMap;// The key-value pairs defined by you in the SQL WITH statement. All keys are in lowercase.
protected Set<String> primaryKeys;// The custom primary key fields.
protected List<String> headerFields;// The list of fields marked as header.
protected RowTypeInfo rowTypeInfo;// The field type and name.
/**
 * The initialization method. This method is called when you create a table for the first time or in the case of failover.
 *
 * @param taskNumber  // The ID of the current operator.
 * @param numTasks  // The total number of sink operators.
 * @throws IOException
 */
public abstract void open(int taskNumber,int numTasks) throws IOException;

/**
 * The close method, which is used to release resources.
 *
 * @throws IOException
 */
public abstract void close() throws IOException;

/**
 * Insert a single row of data.
 *
 * @param row
 * @throws IOException
 */
public abstract void writeAddRecord(Row row) throws IOException;

/**
 * Delete a single row of data.
 *
 * @param row
 * @throws IOException
 */
public abstract void writeDeleteRecord(Row row) throws IOException;

/**
 * If you want to use this method to insert multiple rows at a time, you need to load all data cached in threads to the downstream operator. If you do not want to insert multiple rows at a time, this method is not required.
 *
 * @throws IOException
 */
public abstract void sync() throws IOException;

/** 
* Return the class name. 
*/ 
public String getName();
			
JAR パッケージを Realtime Compute にアップロードしてリソースを参照した後、 type = 'custom' をカスタムシンクプラグインに 指定する必要があります。 さらに、メソッドを実装するクラスを指定します。 以下に、カスタム Redis 結果テーブルの例を示します (ここからデモをダウンロードします)。
create table in_table(
    kv varchar 
)with(
    type = 'random'
);

create table out_table(
    `key` varchar,
    `value` varchar
)with(
    type = 'custom',
    class = 'com.alibaba.blink.connector.custom.demo.RedisSink',
    -- **You can define more user parameters, which can be obtained by using userParamsMap in the open() function.** 
    host = 'r-uf****.redis.rds.aliyuncs.com',
    port = '6379',
    db = '0',
    batsize = '10',
    password = '‘<yourDatabasePassword>'
);

insert into out_table
select
substring(kv,0,4) as `key`,
substring(kv,0,6) as `value`
from in_table;
次の表に、Redis シンクプラグインのパラメーターを示します。
名前 説明 必須/省略可能 備考
host Redis インスタンスのイントラネット接続アドレス。 必須 なし
port Redis インスタンスのポート番号。 必須 なし
password Redis インスタンスへの接続に使用されるパスワード。 必須 なし
db データベース ID。 省略可能 デフォルト値:0。db0 を示します。
batchsize 一度に書き込まれるデータレコードの数。 省略可能 デフォルト値:1。一度に複数のデータレコードの書き込みがサポートされていないことを示します。