本文為您介紹MaxCompute連接器的文法結構、WITH參數和使用樣本等。
背景資訊
MaxComputeMaxCompute(原名ODPS)是一種快速、完全託管的EB級資料倉儲解決方案,致力於批量結構化資料的儲存和計算,提供海量資料倉儲的解決方案及分析建模服務。
MaxCompute連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不支援 |
特有監控指標 | |
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | Batch Tunnel和Stream Tunnel模式僅支援插入資料,Upsert Tunnel模式支援插入、更新和刪除資料。 |
前提條件
已建立MaxCompute表,詳情請參見建立表。
使用限制
MaxCompute連接器僅支援At Least Once語義。
說明At Least Once語義會保證資料不缺失,但在少部分情況下,可能會將重複資料寫入MaxCompute。不同的MaxCompute Tunnel出現重複資料的情況不同,MaxCompute Tunnel詳情請參見如何選擇資料通道?。
預設情況下源表為全量模式,僅會讀取partition參數中指定的分區,在讀完所有資料後結束運行,狀態轉換為finished,不會監控是否有新分區產生。
如果您需要持續監控新分區,請通過WITH參數中指定startPartition使用增量源表模式。
說明維表每次更新時都會檢查最新分區,不受這一限制。
在源表開始運行後,向分區裡添加的新資料不會被讀取,請在分區資料完整的情況下運行作業。
SQL
MaxCompute連接器可以在SQL作業中使用,作為源表,維表或者結果表。
文法結構
CREATE TEMPORARY TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);WITH參數
通用
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 表類型。 | String | 是 | 無 | 固定值為odps。 |
endpoint | MaxCompute服務地址。 | String | 是 | 無 | 請參見Endpoint。 |
tunnelEndpoint | MaxCompute Tunnel服務的串連地址。 | String | 否 | 無 | 請參見Endpoint。 說明 如果未填寫,MaxCompute會根據內部的負載平衡服務分配Tunnel的串連。 |
project | MaxCompute專案名稱。 | String | 是 | 無 | 無。 |
schemaName | MaxCompute Schema名稱。 | String | 否 | 無 | 僅當MaxCompute專案開啟Schema功能時,需填寫該值為MaxCompute表所屬Schema名,詳情請參見 Schema操作。 說明 僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。 |
tableName | MaxCompute表名。 | String | 是 | 無 | 無。 |
accessId | MaxCompute AccessKey ID。 | String | 是 | 無 | 詳情請參見如何查看AccessKey ID和AccessKey Secret資訊? 重要 為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數。 |
accessKey | MaxCompute AccessKey Secret。 | String | 是 | 無 | |
partition | MaxCompute分區名。 | String | 否 | 無 | 對於非分區表和增量源表無需填寫。 說明 分區表詳情請參見在讀取或寫入分區時,如何填寫Partition參數?。 |
compressAlgorithm | MaxCompute Tunnel使用的壓縮演算法。 | String | 否 | SNAPPY | 參數取值如下:
|
quotaName | MaxCompute獨享Data Transmission Service的quota名稱。 | String | 否 | 無 | 設定該值來使用獨享的MaxComputeData Transmission Service。 重要
|
源表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
maxPartitionCount | 可以讀取的最大分區數量。 | Integer | 否 | 100 | 如果讀取的分區數量超過了該參數,則會出現報錯 重要 一次性讀取過多分區會增加 MaxCompute 負載並拖慢作業啟動,建議確認partition參數是否誤配。如確需讀取大量分區,請手動調大maxPartitionCount |
useArrow | 是否使用Arrow格式讀取資料。 | Boolean | 否 | false | 使用Arrow格式能夠調用MaxCompute的Storage API。 重要
|
splitSize | 在使用Arrow格式讀取資料時,一次拉取的資料大小。 | MemorySize | 否 | 256 MB | 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。 重要 僅在批作業中生效。 |
compressCodec | 在使用Arrow格式讀取資料時,採用的壓縮演算法。 | String | 否 | "" | 參數取值如下:
指定壓縮演算法相比無壓縮能帶來一定的吞吐提升。 重要
|
dynamicLoadBalance | 是否允許動態分配分區。 | Boolean | 否 | false | 參數取值如下:
允許動態分配分區能夠發揮Flink不同節點的處理效能,減少源表整體讀取時間,但也會導致不同節點讀取總資料量不一致,出現資料扭曲情況。 重要
|
增量源表專屬
增量源表通過間歇輪詢MaxCompute伺服器擷取所有的分區資訊來發現新增的分區,讀取新分區時要求分區內資料已寫入完畢,詳情參見增量MaxCompute源表監聽到新分區時,如果該分區還有資料沒有寫完,如何處理?。通過startPartition可以指定起始點位,但注意唯讀取字典序大於等於起始點位的分區,例如分區year=2023,month=10字典序小於分區year=2023,month=9,對於這種類型的分區聲明可以通過加0補齊的方式來保證字典序正確,例如year=2023,month=09。
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
startPartition | 增量讀取的起始MaxCompute分區點位(包含)。 | String | 是 | 無 |
說明 startPartition參數詳情,請參見如何填寫增量MaxCompute的startPartition參數?。 |
subscribeIntervalInSec | 輪詢MaxCompute擷取分區列表的時間間隔。 | Integer | 否 | 30 | 單位為秒。 |
modifiedTableOperation | 讀取分區過程中遇到分區資料被修改時的處理。 | Enum (NONE, SKIP) | 否 | NONE | 由於下載session被儲存在檢查點中,每次從檢查點恢複時嘗試從該session恢複讀取進度,而該session由於分區資料被修改不可用,Flink任務會陷入不斷重啟。此時您可以設定該參數,參數取值如下:
重要
|
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
useStreamTunnel | 是否使用MaxCompute Stream Tunnel上傳資料。 | Boolean | 否 | false | 參數取值如下:
說明 資料通道選擇詳情請參見如何選擇資料通道?。 |
flushIntervalMs | MaxCompute Tunnel Writer緩衝區flush間隔。 | Long | 否 | 30000(30秒) | 先將資料寫入緩衝區,待緩衝區滿或達到 flushIntervalMs 間隔後,批量寫入目標表。
單位為毫秒。 說明 本參數可以與batchSize一同使用,滿足任一條件即會Flush資料。 |
batchSize | MaxCompute Tunnel Writer緩衝區flush的大小。 | Long | 否 | 67108864(64 MB) | 單位為位元組。 寫入記錄時,先將資料存放區到MaxCompute的緩衝區中,等緩衝區達到一定大小(batchSize),再把緩衝區裡的資料寫到目標MaxCompute表。 說明 本參數可以與flushIntervalMs一同使用,滿足任一條件即會Flush資料。 |
numFlushThreads | MaxCompute Tunnel Writer緩衝區flush的線程數。 | Integer | 否 | 1 | 每個MaxCompute Sink並發將建立numFlushThreads個線程用於flush資料。當該值大於1時,將允許不同分區的資料並發Flush,提升Flush的效率。 |
slotNum | MaxCompute Tunnel Writer使用的slot數。 | Integer | 否 | 0 | slot數的限制請參見Data Transmission Service概述。 |
dynamicPartitionLimit | 寫入動態分區的最大數量。 | Integer | 否 | 100 | 當結果表在兩次Checkpoint之間寫入的動態分區數量超過了dynamicPartitionLimit,則會出現報錯 重要 由於一次性寫入大量分區會給MaxCompute服務帶來一定壓力,同時也會導致結果表flush和作業Checkpoint變慢。因此當報錯出現時,您需要確認是否需要寫入這麼多分區。如果確實需要,需要手動調大dynamicPartitionLimit參數。 |
retryTimes | 向MaxCompute伺服器請求最大重試次數。 | Integer | 否 | 3 | 建立session、提交session、flush資料時可能存在短暫的MaxCompute服務停用情況,會根據該配置進行重試。 |
sleepMillis | 稍候再試時間。 | Integer | 否 | 1000 | 單位為毫秒。 |
enableUpsert | 是否使用MaxCompute Upsert Tunnel上傳資料。 | Boolean | 否 | false | 參數取值如下:
重要
|
upsertAsyncCommit | Upsert模式下在提交session時是否使用非同步模式。 | Boolean | 否 | false | 參數取值如下:
說明 僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。 |
upsertCommitTimeoutMs | Upsert模式下提交session逾時時間。 | Integer | 否 | 120000 (120秒) | 單位毫秒。 說明 僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。 |
sink.operation | 寫入Delta Table時的寫入模式。 | String | 否 | insert | 參數取值如下:
說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
sink.parallelism | 寫入Delta Table時的並行度 | Integer | 否 | None |
重要 確保Delta Table表屬性 write.bucket.num 是該配置值的整數倍,這樣可以獲得最佳的寫入效能,並且能夠最有效地節省 Sink 節點記憶體。 |
sink.file-cached.enable | 寫入Delta table動態分區時,是否使用檔案快取模式。 | Boolean | 否 | false | 參數取值如下:
使用檔案快取模式能夠減少寫入服務端的小檔案數量,但是寫出資料的延遲更高。在結果表並行度較高時建議使用檔案快取模式。 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
sink.file-cached.writer.num | 檔案快取模式下,單個Task上傳資料的並發數。 | Integer | 否 | 16 |
說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
sink.bucket.check-interval | 檔案快取模式下,檢查檔案大小的周期,單位:毫秒(ms)。 | Integer | 否 | 60000 | 僅在設定了 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
sink.file-cached.rolling.max-size | 檔案快取模式下,單個快取檔案的最大值。 | MemorySize | 否 | 16 M |
說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
sink.file-cached.memory | 檔案快取模式下,寫入檔案使用的最大堆外記憶體大小。 | MemorySize | 否 | 64 M | 僅在設定了 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
sink.file-cached.memory.segment-size | 檔案快取模式下,寫入檔案的使用的buffer大小。 | MemorySize | 否 | 128 KB | 僅在設定了 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
sink.file-cached.flush.always | 檔案快取模式下,寫入檔案是否使用緩衝。 | Boolean | 否 | true | 僅在設定了 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
sink.file-cached.write.max-retries | 檔案快取模式下,上傳資料的重試次數。 | Integer | 否 | 3 | 僅在設定了 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.writer.max-retries | Upsert Writer寫入Bucket失敗後的重試次數。 | Integer | 否 | 3 | 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.writer.buffer-size | 單個Upsert Writer資料在Flink中的緩衝大小。 | MemorySize | 否 | 64 m |
說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.writer.bucket.buffer-size | 單個Bucket資料在Flink中的緩衝大小。 | MemorySize | 否 | 1 m | 當叢集記憶體資源緊張時,可以減小該參數值。 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.write.bucket.num | 寫入表的bucket數量。 | Integer | 是 | None | 必須與寫入Delta Table表的 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.write.slot-num | 單個Session使用Tunnel slot數量。 | Integer | 否 | 1 | 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.commit.max-retries | Upsert Session Commit重試次數。 | Integer | 否 | 3 | 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.commit.thread-num | Upsert Session Commit的並行度。 | Integer | 否 | 16 | 不建議將此參數值調整得過大,因為當同時進行的提交並發數越多時,會導致資源消耗增加,可能導致效能問題或資源過度消耗。 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.commit.timeout | Upsert Session Commit等待逾時時間,單位:秒(s)。 | Integer | 否 | 600 | 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.flush.concurrent | 限制單個分區允許同時寫入的最大Bucket數。 | Integer | 否 | 2 | 每當一個bucket的資料重新整理時,將會佔用一個Tunnel Slot資源。 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
insert.commit.thread-num | Commit Session的並行度。 | Integer | 否 | 16 | 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
insert.arrow-writer.enable | 是否使用Arrow格式。 | Boolean | 否 | false | 參數取值如下:
說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
insert.arrow-writer.batch-size | Arrow Batch的最大行數。 | Integer | 否 | 512 | 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
insert.arrow-writer.flush-interval | Writer Flush間隔,單位毫秒(ms)。 | Integer | 否 | 100000 | 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
insert.writer.buffer-size | 使用Buffered Writer的緩衝大小。 | MemorySize | 否 | 64 M | 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。 |
upsert.partial-column.enable | 是否僅更新部分列。 | Boolean | 否 | false | 只在結果表類型為Delta Table時生效,詳情請參見部分列更新。 參數取值如下:
根據結果表是否存在更新資料的主鍵,資料寫入分以下幾種情況:
說明 僅Realtime Compute引擎VVR 8.0.11及以上版本支援該參數。 |
維表專屬
MaxCompute維表在作業啟動時從指定的分區拉取全量資料,partition參數支援使用max_pt()等函數。當緩衝到期重新載入時會重新解析partition參數拉取最新的分區,使用max_two_pt()時維表可拉取兩個分區,其他情況下只支援指定單個分區。
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
cache | 緩衝策略。 | String | 是 | 無 | 目前MaxCompute維表僅支援 ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表查詢都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。 說明
|
cacheSize | 最多緩衝的資料條數。 | Long | 否 | 100000 | 如果維表資料量超過了cacheSize,則會出現報錯 重要 由於維表資料量太大會佔用大量JVM堆記憶體,同時也會讓作業啟動和維表更新變慢,因此您需要確認是否需要緩衝這麼多資料,如果確實需要,需要手動調大該參數。 |
cacheTTLMs | 緩衝逾時時間,也就是緩衝更新的間隔時間。 | Long | 否 | Long.MAX_VALUE(相當於永不更新) | 單位為毫秒。 |
cacheReloadTimeBlackList | 更新時間黑名單。在該參數規定的時間段內不會更新緩衝。 | String | 否 | 無 | 用於防止緩衝在關鍵時間段(例如活動流量峰值期間)更新導致作業不穩定。填寫方式詳情請參見如何填寫CacheReloadTimeBlackList參數?。 |
maxLoadRetries | 緩衝更新時(包含作業啟動時初次拉取資料)最多嘗試次數,超過該次數後作業運行失敗。 | Integer | 否 | 10 | 無。 |
類型映射
MaxCompute支援的類型參見2.0資料類型版本。
MaxCompute類型 | Flink類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
TIMESTAMP_NTZ | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
當MaxCompute物理表中同時存在嵌套的複合類型欄位(ARRAY、MAP或STRUCT)和JSON類型欄位時,需要在建立MaxCompute物理表時指定tblproperties('columnar.nested.type'='true'),才能被Flink正確讀寫。
資料攝入(公測中)
MaxCompute連接器可以用於資料攝入YAML作業開發,作為目標端寫入。
使用限制
僅Realtime Compute引擎VVR 11.1及以上版本支援。
文法結構
source:
type: xxx
sink:
type: maxcompute
name: MaxComputeSink
access-id: ${your_accessId}
access-key: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
buckets-num: 8配置項
配置項 | 是否必填 | 預設值 | 類型 | 描述 |
type | 是 | 無 | String | 指定要使用的連接器,這裡需要設定成 |
name | 否 | 無 | String | Sink的名稱。 |
access-id | 是 | 無 | String | 阿里雲帳號或RAM使用者的AccessKey ID。您可以進入AccessKey管理頁面擷取AccessKey ID。 |
access-key | 是 | 無 | String | AccessKey ID對應的AccessKey Secret。 |
endpoint | 是 | 無 | String | MaxCompute服務的串連地址。您需要根據建立MaxCompute專案時選擇的地區以及網路連接方式配置Endpoint。各地區及網路對應的Endpoint值,請參見 Endpoint。 |
project | 是 | 無 | String | MaxCompute專案名稱。您可以登入MaxCompute控制台,在工作區>專案管理頁面擷取MaxCompute專案名稱。 |
tunnel.endpoint | 否 | 無 | String | MaxCompute Tunnel服務的串連地址,通常這項配置可以根據指定的專案所在的地區進行自動路由。僅在使用代理等特殊網路環境下使用該配置。 |
quota.name | 否 | 無 | String | MaxCompute資料轉送使用的獨享資源群組名稱,如不指定該配置,則使用共用資源組。詳情可以參見購買與使用獨享Data Transmission Service資源群組。 |
sts-token | 否 | 無 | String | 當使用RAM角色頒發的短時有效存取權杖(STS Token)進行鑒權時,需要指定該參數。 |
buckets-num | 否 | 16 | Integer | 自動建立MaxCompute Delta表時使用的桶數。使用方式請參見近即時數倉概述。 |
compress.algorithm | 否 | zlib | String | 寫入MaxCompute時使用的資料壓縮演算法,當前支援 |
total.buffer-size | 否 | 64MB | String | 記憶體中緩衝的資料量大小,單位為分區級(非分區表單位為表級),不同分區(表)的緩衝區相互獨立,達到閾值後資料寫入到MaxCompute。 |
bucket.buffer-size | 否 | 4MB | String | 記憶體中緩衝的資料量大小,單位為桶級,僅寫入Delta表時生效。不同資料桶的緩衝區相互獨立,達到閾值後將該桶資料寫入到MaxCompute。 |
commit.thread-num | 否 | 16 | Integer | Checkpoint階段,能夠同時處理的分區(表)數量。 |
flush.concurrent-num | 否 | 4 | Integer | 寫入資料到MaxCompute時,能夠同時寫入的桶數量。僅寫入Delta表時生效。 |
表位置映射
連接器自動建表時,使用如下映射關係,將源表的位置資訊映射到MaxCompute表中。
當MaxCompute專案不支援Schema模型時,以上遊MySQL為例,每個同步任務僅能同步一個MySQL Database。(其他資料來源同理,連接器Connector會忽略tableId.namespace資訊)。
資料攝入作業中對象 | MaxCompute位置 | MySQL位置 |
配置中的Project參數 | Project | none |
TableId.namespace | Schema(僅當MaxCompute專案支援Schema模型時,如不支援,將忽略該配置) | Database |
TableId.tableName | Table | Table |
類型映射
CDC類型 | MaxCompute類型 |
CHAR | STRING |
VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |
使用樣本
SQL
源表示例
全量讀取
預設情況下源表為全量模式,讀取partition參數中指定的分區。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=201809*'
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
cid,
COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;增量讀取
從指定的startPartition開始增量讀取。
CREATE TEMPORARY TABLE odps_source (
cid VARCHAR,
rt DOUBLE
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpointName>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 從20180905對應分區開始讀取
);
CREATE TEMPORARY TABLE blackhole_sink (
cid VARCHAR,
invoke_count BIGINT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;結果表示例
寫入固定分區
指定partition固定分區值。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905' -- 寫入固定分區ds=20180905。
);
INSERT INTO odps_sink
SELECT
id, len, content
FROM datagen_source;寫入動態分區
根據表分區欄位指定partition。
CREATE TEMPORARY TABLE datagen_source (
id INT,
len INT,
content VARCHAR,
c TIMESTAMP
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_sink (
id INT,
len INT,
content VARCHAR,
ds VARCHAR --需要顯式聲明動態分區列。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds' --不寫分區的值,表示根據ds欄位的值寫入不同分區。
);
INSERT INTO odps_sink
SELECT
id,
len,
content,
DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;維表示例
一對一維表
一對一維表需要聲明主鍵。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR,
PRIMARY KEY (k) NOT ENFORCED -- 一對一維表需要聲明主鍵。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;一對多維表
一對多維表無需聲明主鍵。
CREATE TEMPORARY TABLE datagen_source (
k INT,
v VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE odps_dim (
k INT,
v VARCHAR
-- 一對多維表無需聲明主鍵。
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'project' = '<yourProjectName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=20180905',
'cache' = 'ALL'
);
CREATE TEMPORARY TABLE blackhole_sink (
k VARCHAR,
v1 VARCHAR,
v2 VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;DataStream
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法。
為了保護智慧財產權,從Realtime Compute引擎VVR6.0.6版本起,此連接器在本地調試單次運行作業的時間為30分鐘,30分鐘後作業會報錯並退出。本地運行和調試包含MaxCompute連接器的作業,請參見本地運行和調試包含連接器的作業。
暫不支援讀取Delta Table,即建表時指定了
primary key和transactional=true的表,詳情請參見基本概念。
在DataStream中使用MaxCompute連接器推薦使用SQL聲明MaxCompute表,通過Table/DataStream相互轉換來串連MaxCompute表和資料流
串連源表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 串連結果表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();XML
MaxCompute連接器的Maven依賴包含了構建全量源表、增量源表、結果表和維表的所需要的類。Maven中央庫中已經放置了MaxCompute DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>