このドキュメントでは、Realtime Compute で カスタム結果テーブルを作成する方法について説明します。
注 排他モードでデプロイされた Realtime Compute にのみ適用されます。
多様な出力要件を満たすために、Realtime Compute でシンクプラグインをカスタマイズできるようになりました。 Maven プロジェクトには、次の依存パッケージが必要です。
スコープ設定は <scope>provided</scope>
です。
JAR パッケージのダウンロード
Realtime Compute V2.0 以降の依存関係
<dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>blink-table</artifactId>
<version>blink-2.2.4-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>blink-2.2.4-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>blink-2.2.4-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>blink-connector-common</artifactId>
<version>blink-2.2.4-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>blink-connector-custom</artifactId>
<version>blink-2.2.4-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependency>
API の説明
カスタム結果テーブルクラスは、カスタムシンクプラグインの基本クラスを継承し、次のメソッドを実装する必要があります。
protected Map<String,String> userParamsMap;// The key-value pairs defined by you in the SQL WITH statement. All keys are in lowercase.
protected Set<String> primaryKeys;// The custom primary key fields.
protected List<String> headerFields;// The list of fields marked as header.
protected RowTypeInfo rowTypeInfo;// The field type and name.
/**
* The initialization method. This method is called when you create a table for the first time or in the case of failover.
*
* @param taskNumber // The ID of the current operator.
* @param numTasks // The total number of sink operators.
* @throws IOException
*/
public abstract void open(int taskNumber,int numTasks) throws IOException;
/**
* The close method, which is used to release resources.
*
* @throws IOException
*/
public abstract void close() throws IOException;
/**
* Insert a single row of data.
*
* @param row
* @throws IOException
*/
public abstract void writeAddRecord(Row row) throws IOException;
/**
* Delete a single row of data.
*
* @param row
* @throws IOException
*/
public abstract void writeDeleteRecord(Row row) throws IOException;
/**
* If you want to use this method to insert multiple rows at a time, you need to load all data cached in threads to the downstream operator. If you do not want to insert multiple rows at a time, this method is not required.
*
* @throws IOException
*/
public abstract void sync() throws IOException;
/**
* Return the class name.
*/
public String getName();
JAR パッケージを Realtime Compute にアップロードしてリソースを参照した後、
type = 'custom'
をカスタムシンクプラグインに 指定する必要があります。 さらに、メソッドを実装するクラスを指定します。 以下に、カスタム Redis 結果テーブルの例を示します
(ここからデモをダウンロードします)。
create table in_table(
kv varchar
)with(
type = 'random'
);
create table out_table(
`key` varchar,
`value` varchar
)with(
type = 'custom',
class = 'com.alibaba.blink.connector.custom.demo.RedisSink',
-- **You can define more user parameters, which can be obtained by using userParamsMap in the open() function.**
host = 'r-uf****.redis.rds.aliyuncs.com',
port = '6379',
db = '0',
batsize = '10',
password = '‘<yourDatabasePassword>'
);
insert into out_table
select
substring(kv,0,4) as `key`,
substring(kv,0,6) as `value`
from in_table;
次の表に、Redis シンクプラグインのパラメーターを示します。
名前 | 説明 | 必須/省略可能 | 備考 |
---|---|---|---|
host | Redis インスタンスのイントラネット接続アドレス。 | 必須 | なし |
port | Redis インスタンスのポート番号。 | 必須 | なし |
password | Redis インスタンスへの接続に使用されるパスワード。 | 必須 | なし |
db | データベース ID。 | 省略可能 | デフォルト値:0。db0 を示します。 |
batchsize | 一度に書き込まれるデータレコードの数。 | 省略可能 | デフォルト値:1。一度に複数のデータレコードの書き込みがサポートされていないことを示します。 |