全部產品
Search
文件中心

Tablestore:使用教程

更新時間:Jan 14, 2025

本文為您介紹如何使用Flink計算Tablestore的資料,Tablestore中的資料表或時序表均可作為Realtime ComputeFlink的源表或結果表進行使用。

前提條件

Realtime Compute作業開發流程

步驟一:建立作業

  1. 進入SQL作業建立頁面。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 在左側導覽列,單擊資料開發 > ETL

  2. 單擊建立後,在新增作業草稿對話方塊,選擇空白的流作業草稿,單擊下一步

    說明

    Flink也為您提供了豐富的代碼模板和資料同步,每種代碼模板都為您提供了具體的使用情境、程式碼範例和使用指導。您可以直接單擊對應的模板快速地瞭解Flink產品功能和相關文法,實現您的商務邏輯,詳情請參見代碼模板資料同步模板

  3. 填寫作業資訊

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    flink-test

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本,引擎版本詳情請參見功能發布記錄引擎版本介紹

    vvr-8.0.10-flink-1.17

  4. 單擊建立

步驟二:編寫SQL作業

說明

此處以將資料表中的資料同步至另一個資料表為例,為您介紹如何編寫SQL作業。更多SQL樣本,請參考SQL樣本

  1. 分別建立源表(資料表)和結果表(資料表)的暫存資料表。

    詳細配置資訊,請參見附錄1:Tablestore連接器

    -- 建立源表(資料表)的暫存資料表 tablestore_stream
    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector' = 'ots', -- 源表的連接器類型。固定取值為ots。
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Table Store執行個體的VPC地址。
        'instanceName' = 'xxx', -- Table Store的執行個體名稱。
        'tableName' = 'flink_source_table', -- Table Store的源表名稱。
        'tunnelName' = 'flink_source_tunnel', -- Table Store源表的資料通道名稱。
        'accessId' = 'xxxxxxxxxxx', -- 阿里雲帳號或者RAM使用者的AccessKey ID。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里雲帳號或者RAM使用者的AccessKey Secret。
        'ignoreDelete' = 'false' -- 是否忽略DELETE操作類型的即時資料:不忽略。
    );
    
    -- 建立結果表(資料表)的暫存資料表 tablestore_sink
    CREATE TEMPORARY TABLE tablestore_sink(
       `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED -- 主鍵。
    ) WITH (
        'connector' = 'ots', -- 結果表的連接器類型。固定取值為ots。
        'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Table Store執行個體的VPC地址。
        'instanceName' = 'xxx', -- Table Store的執行個體名稱。
        'tableName' = 'flink_sink_table', -- Table Store的結果表名稱。
        'accessId' = 'xxxxxxxxxxx',  -- 阿里雲帳號或者RAM使用者的AccessKey ID。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里雲帳號或者RAM使用者的AccessKey Secret。
        'valueColumns'='customerid,customername' --插入欄位的列名。
    );
  2. 編寫作業邏輯。

    將源表資料插入到結果表的程式碼範例如下:

    --將源表資料插入到結果表
    INSERT INTO tablestore_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

步驟三:(可選)查看配置資訊

在SQL編輯地區右側頁簽,您可以查看或上傳相關配置。

頁簽名稱

配置說明

更多配置

  • 引擎版本:當前作業使用的Flink的引擎版本。

  • 附加依賴檔案:作業中需要使用到的附加依賴,例如臨時函數等。

    您可以下載VVR依賴,並在資源檔頁簽進行上傳,然後選擇附加依賴檔案為上傳的VVR依賴即可。具體操作,請參見附錄2:配置VVR依賴

代碼結構

  • 資料流向圖:您可以通過資料流向圖快速查看資料的流向。

  • 樹狀結構圖:您可以通過樹狀結構圖快速查看資料的來源。

版本資訊

您可以在此處查看作業版本資訊,操作列下的功能詳情請參見管理作業版本

步驟四:(可選)進行深度檢查

深度檢查能夠檢查作業的SQL語義、網路連通性以及作業使用的表的中繼資料資訊。同時,您可以單擊結果地區的SQL最佳化,展開查看SQL風險問題提示以及對應的SQL最佳化建議。

  1. 在SQL編輯地區右上方,單擊深度檢查

  2. 深度檢查對話方塊,單擊確認

步驟五:(可選)進行作業調試

您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險。

  1. 在SQL編輯地區右上方,單擊調試

  2. 調試對話方塊,選擇調試叢集後,單擊下一步

    如果沒有可用叢集則需要建立新的Session叢集,Session叢集與SQL作業引擎版本需要保持一致並處於運行中。詳情請參見建立Session叢集

  3. 配置調試資料。

    • 如果您使用線上資料,無需處理。

    • 如果您需要使用調試資料,需要先單擊下載調試資料範本,填寫調試資料後,上傳調試資料。詳情請參見作業調試

  4. 確定好調試資料後,單擊確定

步驟六:進行作業部署

在SQL編輯地區右上方,單擊部署,在部署新版本對話方塊,可根據需要填寫或選中相關內容,單擊確定

說明

Session叢集適用於非生產環境的開發測試環境,通過部署或調試作業提高作業JM(Job Manager)資源使用率和提高作業啟動速度。但不推薦您將生產作業提交至Session叢集中,可能會導致業務穩定性問題。

步驟七:啟動並查看Flink計算結果

  1. 在左側導覽列,單擊營運中心 > 作業營運

  2. 單擊目標作業操作列中的啟動

    選擇無狀態啟動後,單擊啟動。當作業狀態轉變為運行中時,代表作業運行正常。作業啟動參數配置,詳情請參見作業啟動

    說明
    • Flink中的每個TaskManager建議配置2CPU和4GB記憶體,此配置可以充分發揮每個TaskManager的計算能力。單個TaskManager能達到1萬/s的寫入速率。

    • 在source表分區數目足夠多的情況下,建議Flink中並發配置在16以內,寫入速率隨並發線性增長。

  3. 在作業營運詳情頁面,查看Flink計算結果。

    1. 營運中心 > 作業營運頁面,單擊目標作業名稱。

    2. 作業日誌頁簽,單擊運行Task Managers頁簽下Path,ID列的目標任務。

    3. 單擊日誌,在頁面查看相關的日誌資訊。

  4. (可選)停止作業。

    如果您對作業進行了修改(例如更改代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要重新部署作業,然後停止再啟動。另外,如果作業無法複用State,希望作業全新啟動時,或者更新非動態生效的參數配置時,也需要停止後再啟動作業。作業停止詳情請參見作業停止

附錄

附錄1:Tablestore連接器

Realtime ComputeFlink版內建了Table StoreTablestore連接器,用於Tablestore的資料讀寫與同步。

源表

DDL定義
資料表

資料表作為源表的DDL定義樣本如下:

-- 建立源表(資料表)的暫存資料表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false'
);
時序表

時序表作為源表的DDL定義樣本如下:

-- 建立源表(時序表)的暫存資料表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

屬性列支援讀取待消費欄位和Tunnel Service,以及返回資料中的OtsRecordTypeOtsRecordTimestamp兩個欄位。欄位說明請參見下表。

欄位名

Flink映射名

描述

OtsRecordType

type

資料操作類型。

OtsRecordTimestamp

timestamp

資料操作時間,單位為微秒。

說明

全量讀取資料時,取值為0。

WITH參數

參數

適用表

是否必填

描述

connector

通用參數

源表的連接器類型。固定取值為ots。

endPoint

通用參數

Tablestore執行個體的服務地址,必須使用VPC地址。更多資訊,請參見服務地址

instanceName

通用參數

Tablestore的執行個體名稱。

tableName

通用參數

Tablestore的源表名稱。

tunnelName

通用參數

Tablestore源表的通道名稱。關於建立通道的具體操作,請參見建立資料通道

accessId

通用參數

阿里雲帳號或者RAM使用者的AccessKey(包括AccessKey ID和AccessKey Secret)。

重要

為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID和AccessKey Secret,詳情請參見變數管理

accessKey

通用參數

connectTimeout

通用參數

連接器串連Tablestore的逾時時間,單位為毫秒。預設值為30000。

socketTimeout

通用參數

連接器串連Tablestore的Socket逾時時間,單位為毫秒。預設值為30000。

ioThreadCount

通用參數

IO線程數量。預設值為4。

callbackThreadPoolSize

通用參數

回調線程池大小。預設值為4。

ignoreDelete

資料表

是否忽略DELETE操作類型的即時資料。預設值為false,表示不忽略DELETE操作類型的即時資料。

skipInvalidData

通用參數

是否忽略髒資料。預設值為false,表示不忽略髒資料。如果不忽略髒資料,則處理髒資料時會報錯。

重要

僅Realtime Compute引擎VVR 8.0.4及以上版本支援該參數。

retryStrategy

通用參數

重試策略。參數取值如下:

  • TIME(預設值):在逾時時間retryTimeoutMs內持續進行重試。

  • COUNT:在最大重試次數retryCount內持續進行重試。

retryCount

通用參數

重試次數。當retryStrategy設定為COUNT時,可以設定重試次數。預設值為3。

retryTimeoutMs

通用參數

重試的逾時時間,單位為毫秒。當retryStrategy設定為TIME時,可以設定重試的逾時時間。預設值為180000。

streamOriginColumnMapping

通用參數

原始列名到真實列名的映射。

說明

原始列名與真實列名之間,請使用半形冒號(:)隔開;多組映射之間,請使用半形逗號(,)隔開。例如origin_col1:col1,origin_col2:col2

outputSpecificRowType

通用參數

是否透傳具體的RowType。參數取值如下:

  • false(預設值):不透傳,所有資料RowType均為INSERT。

  • true:透傳,將根據透傳的類型相應設定為INSERT、DELETE或UPDATE_AFTER。

類型映射

Tablestore欄位類型

Flink欄位類型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

結果表

DDL定義
資料表

資料表作為結果表的DDL定義樣本如下:

-- 建立結果表(資料表)的暫存資料表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
說明

Tablestore結果表必須定義主鍵(Primary Key)和至少一個屬性列,輸出資料以Update方式追加到Tablestore表。

時序表

時序模型結果表需要指定_m_name_data_source_tags_time四個主鍵,其餘配置與資料表的結果表配置相同。目前支援WITH參數,SINK表主鍵和Map格式主鍵三種方式指定時序表主鍵。三種方式_tags列的轉換優先順序為WITH參數方式的優先順序最高,Map格式主鍵與SINK表主鍵方式次之。

使用WITH參數方式

使用WITH參數方式定義DDL的樣本如下。

-- 建立結果表(時序表)的暫存資料表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);

-- 將源表資料插入到結果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
使用Map格式主鍵方式

使用Map格式主鍵方式定義DDL的樣本如下。

說明

Tablestore引入了Flink的Map類型,以便於產生時序模型中時序表的_tags列,Map類型可以支援列的改名、簡單函數等映射操作。使用Map時必須保證其中的_tags主鍵聲明位置在第三位。

-- 建立結果表(時序表)的暫存資料表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- 將源表資料插入到結果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
使用SINK表主鍵方式

使用SINK表主鍵方式定義DDL的樣本如下。主鍵定義中的第一位measurement為_m_name列,第二位datasource為_data_source列,最後一位time為time列,中間的多列為tag列。

-- 建立結果表(時序表)的暫存資料表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- 將源表資料插入到結果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
WITH參數

參數

適用表

是否必填

說明

connector

通用參數

結果表的連接器類型。固定取值為ots。

endPoint

通用參數

Tablestore執行個體的服務地址,必須使用VPC地址。更多資訊,請參見服務地址

instanceName

通用參數

Tablestore的執行個體名稱。

tableName

通用參數

Tablestore的時序表名稱。

accessId

通用參數

阿里雲帳號或者RAM使用者的AccessKey(包括AccessKey ID和AccessKey Secret)。

重要

為了避免您的AK資訊泄露,建議您通過密鑰管理的方式填寫AccessKey ID和AccessKey Secret,詳情請參見變數管理

accessKey

通用參數

valueColumns

資料表

插入欄位的列名。多個欄位以半形逗號(,)分割,例如ID,NAME

storageType

通用參數

重要

當時序表作為結果表時,必須配置為TIMESERIES。

資料存放區類型。取值範圍如下:

  • WIDE_COLUMN(預設值):資料表

  • TIMESERIES:時序表

timeseriesSchema

時序表

重要

當時序表作為結果表時,如果使用WITH參數的方式指定時序表主鍵,則必須配置該參數。

需要指定為時序表主鍵的列。

  • 以JSON的key-value格式來指定時序表主鍵,例如{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}

  • 配置的主鍵類型必須與時序表中主鍵類型一致。其中tags主鍵可以支援同時包含多列。

connectTimeout

通用參數

連接器串連Tablestore的逾時時間,單位為毫秒。預設值為30000。

socketTimeout

通用參數

連接器串連Tablestore的Socket逾時時間,單位為毫秒。預設值為30000。

ioThreadCount

通用參數

IO線程數量。預設值為4。

callbackThreadPoolSize

通用參數

回調線程池大小。預設值為4。

retryIntervalMs

通用參數

稍候再試時間,單位為毫秒。預設值為1000。

maxRetryTimes

通用參數

最大重試次數。預設值為10。

bufferSize

通用參數

流入多少條資料後開始輸出。預設值為5000,表示輸入的資料達到5000條就開始輸出。

batchWriteTimeoutMs

通用參數

寫入逾時的時間,單位為毫秒。預設值為5000,表示如果緩衝中的資料在等待5秒後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。

batchSize

通用參數

一次批量寫入的條數。預設值為100,最大值為200。

ignoreDelete

通用參數

是否忽略DELETE操作類型的即時資料。預設值為false,表示不忽略DELETE操作類型的即時資料。

重要

僅資料表作為源表時可以根據需要配置。

autoIncrementKey

資料表

當結果表中包含主鍵自增列時,通過該參數指定主鍵自增列的列名稱。當結果表沒有主鍵自增列時,請不要設定此參數。

重要

僅Realtime Compute引擎VVR 8.0.4及以上版本支援該參數。

overwriteMode

通用參數

資料覆蓋模式。參數取值如下:

  • PUT(預設值):以PUT方式將資料寫入到Tablestore表。

  • UPDATE:以UPDATE方式寫入到Tablestore表。

說明

動態列模式下只支援UPDATE模式。

defaultTimestampInMillisecond

通用參數

設定寫入Tablestore資料的預設時間戳記。如果不指定,則使用系統當前的毫秒時間戳記。

dynamicColumnSink

通用參數

是否開啟動態列模式。預設值為false,表示不開啟動態列模式。

說明
  • 動態列模式適用於在表定義中無需指定列名,根據作業運行情況動態插入資料列的情境。建表語句中主鍵需要定義為前若干列,最後兩列中前一列的值作為列名變數,且類型必須為String,後一列的值作為該列對應的值。

  • 開啟動態列模式時,不支援主鍵自增列,且參數overwriteMode必須設定為UPDATE。

checkSinkTableMeta

通用參數

是否檢查結果表中繼資料。預設值為true,表示檢查Tablestore表的主鍵列和此處的建表語句中指定的主鍵是否一致。

enableRequestCompression

通用參數

資料寫入過程中是否開啟資料壓縮。預設值為false,表示不開啟資料壓縮。

類型映射

Flink欄位類型

Tablestore欄位類型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

SQL樣本

同步源表資料到結果表
同步資料表資料到時序表

從源表(資料表)flink_source_table中讀取資料,並將結果寫入結果表(時序表)flink_sink_table。

SQL樣本如下:

-- 建立源表(資料表)的暫存資料表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 使用With參數方式建立結果表(時序表)的暫存資料表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
     measurement STRING,
     datasource STRING,
     tag_a STRING,
     `time` BIGINT,
     binary_value BINARY,
     bool_value BOOLEAN,
     double_value DOUBLE,
     long_value BIGINT,
     string_value STRING,
     tag_b STRING,
     tag_c STRING,
     tag_d STRING,
     tag_e STRING,
     tag_f STRING,
     PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
 ) WITH (
     'connector' = 'ots',
     'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
     'instanceName' = 'xxx',
     'tableName' = 'flink_sink_table',
     'accessId' = 'xxxxxxxxxxx',
     'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
     'storageType' = 'TIMESERIES',
     'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
 );
 
--將源表資料插入到結果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
同步時序表資料到資料表

從源表(時序表)flink_source_table中讀取資料,並將結果寫入結果表(資料表)flink_sink_table。

SQL樣本如下:

-- 建立源表(時序表)的暫存資料表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

-- 建立結果表(資料表)的暫存資料表 print_table。
CREATE TEMPORARY TABLE tablestore_target(
    measurement STRING,
    datasource STRING,
    tags STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);

--將源表資料插入到結果表
INSERT INTO tablestore_target
SELECT
    _m_name,
    _data_source,
    _tags,
    _time,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from tablestore_stream;
讀取源表資料並列印到控制台

批量從源表flink_source_table中讀取資料,您可以使用作業調試功能類比作業運行,調試結果將顯示在SQL編輯器下方。

SQL樣本如下:

-- 建立源表(資料表)的暫存資料表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 從源表讀取資料
SELECT * FROM tablestore_stream LIMIT 100;
讀取源表資料並列印到TaskManager日誌

從源表flink_source_table中讀取資料,並通過Print連接器將結果列印到TaskManager日誌中。

SQL樣本如下:

-- 建立源表(資料表)的暫存資料表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 建立結果表的暫存資料表 print_table。
CREATE TEMPORARY TABLE print_table(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
  'connector' = 'print',   -- print連接器
  'logger' = 'true'        -- 控制台顯示計算結果
);

-- 列印源表的欄位
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;

附錄2:配置VVR依賴

  1. 下載VVR依賴

  2. 上傳VVR依賴。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 在左側導覽列,單擊檔案管理

    4. 資源檔頁簽,單擊上傳資源,選擇要上傳的VVR依賴的JAR包。

  3. 在目標作業的SQL編輯地區右側頁簽,單擊更多配置。在附加依賴檔案項,選擇目標VVR依賴的JAR包。