全部產品
Search
文件中心

Realtime Compute for Apache Flink:MaxCompute

更新時間:Jan 21, 2026

本文為您介紹MaxCompute連接器的文法結構、WITH參數和使用樣本等。

背景資訊

MaxComputeMaxCompute(原名ODPS)是一種快速、完全託管的EB級資料倉儲解決方案,致力於批量結構化資料的儲存和計算,提供海量資料倉儲的解決方案及分析建模服務。

MaxCompute連接器支援的資訊如下。

類別

詳情

支援類型

源表、維表和結果表

運行模式

流模式和批模式

資料格式

暫不支援

特有監控指標

監控指標

  • 源表

    numRecordsIn

    numRecordsInPerSecond

    numBytesIn

    numBytesInPerSecond

  • 結果表

    numRecordsOut

    numRecordsOutPerSecond

    numBytesOut

    numBytesOutPerSecond

  • 維表

    dim.odps.cacheSize

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

Batch Tunnel和Stream Tunnel模式僅支援插入資料,Upsert Tunnel模式支援插入、更新和刪除資料。

前提條件

已建立MaxCompute表,詳情請參見建立表

使用限制

  • MaxCompute連接器僅支援At Least Once語義。

    說明

    At Least Once語義會保證資料不缺失,但在少部分情況下,可能會將重複資料寫入MaxCompute。不同的MaxCompute Tunnel出現重複資料的情況不同,MaxCompute Tunnel詳情請參見如何選擇資料通道?

  • 預設情況下源表為全量模式,僅會讀取partition參數中指定的分區,在讀完所有資料後結束運行,狀態轉換為finished,不會監控是否有新分區產生。

    如果您需要持續監控新分區,請通過WITH參數中指定startPartition使用增量源表模式。

    說明
    • 維表每次更新時都會檢查最新分區,不受這一限制。

    • 在源表開始運行後,向分區裡添加的新資料不會被讀取,請在分區資料完整的情況下運行作業。

SQL

MaxCompute連接器可以在SQL作業中使用,作為源表,維表或者結果表。

文法結構

CREATE TEMPORARY TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

WITH參數

通用

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

固定值為odps。

endpoint

MaxCompute服務地址。

String

請參見Endpoint

tunnelEndpoint

MaxCompute Tunnel服務的串連地址。

String

請參見Endpoint

說明

如果未填寫,MaxCompute會根據內部的負載平衡服務分配Tunnel的串連。

project

MaxCompute專案名稱。

String

無。

schemaName

MaxCompute Schema名稱。

String

僅當MaxCompute專案開啟Schema功能時,需填寫該值為MaxCompute表所屬Schema名,詳情請參見 Schema操作

說明

僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

tableName

MaxCompute表名。

String

無。

accessId

MaxCompute AccessKey ID。

String

詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

重要

為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

accessKey

MaxCompute AccessKey Secret。

String

partition

MaxCompute分區名。

String

對於非分區表和增量源表無需填寫。

compressAlgorithm

MaxCompute Tunnel使用的壓縮演算法。

String

SNAPPY

參數取值如下:

  • RAW(無壓縮)

  • ZLIB

  • SNAPPY

    SNAPPY相比ZLIB能帶來明顯的吞吐提升。在測試情境下,吞吐提升約50%。

quotaName

MaxCompute獨享Data Transmission Service的quota名稱。

String

設定該值來使用獨享的MaxComputeData Transmission Service。

重要
  • 僅Realtime Compute引擎VVR 8.0.3及以上版本支援該參數。

  • 設定該值時,必須刪除tunnelEndpoint參數,否則仍將使用tunnelEndpoint中指定的資料通道。

源表專屬

參數

說明

資料類型

是否必填

預設值

備忘

maxPartitionCount

可以讀取的最大分區數量。

Integer

100

如果讀取的分區數量超過了該參數,則會出現報錯The number of matched partitions exceeds the default limit

重要

一次性讀取過多分區會增加 MaxCompute 負載並拖慢作業啟動,建議確認partition參數是否誤配。如確需讀取大量分區,請手動調大maxPartitionCount

useArrow

是否使用Arrow格式讀取資料。

Boolean

false

使用Arrow格式能夠調用MaxCompute的Storage API。

重要
  • 僅在批作業中生效。

  • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

splitSize

在使用Arrow格式讀取資料時,一次拉取的資料大小。

MemorySize

256 MB

僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

重要

僅在批作業中生效。

compressCodec

在使用Arrow格式讀取資料時,採用的壓縮演算法。

String

""

參數取值如下:

  • "" (無壓縮)

  • ZSTD

  • LZ4_FRAME

指定壓縮演算法相比無壓縮能帶來一定的吞吐提升。

重要
  • 僅在批作業中生效。

  • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

dynamicLoadBalance

是否允許動態分配分區。

Boolean

false

參數取值如下:

  • true:允許

  • false:不允許

允許動態分配分區能夠發揮Flink不同節點的處理效能,減少源表整體讀取時間,但也會導致不同節點讀取總資料量不一致,出現資料扭曲情況。

重要
  • 僅在批作業中生效。

  • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

增量源表專屬

增量源表通過間歇輪詢MaxCompute伺服器擷取所有的分區資訊來發現新增的分區,讀取新分區時要求分區內資料已寫入完畢,詳情參見增量MaxCompute源表監聽到新分區時,如果該分區還有資料沒有寫完,如何處理?。通過startPartition可以指定起始點位,但注意唯讀取字典序大於等於起始點位的分區,例如分區year=2023,month=10字典序小於分區year=2023,month=9,對於這種類型的分區聲明可以通過加0補齊的方式來保證字典序正確,例如year=2023,month=09

參數

說明

資料類型

是否必填

預設值

備忘

startPartition

增量讀取的起始MaxCompute分區點位(包含)。

String

  • 使用該參數後啟用增量源表模式,將忽略partition參數。

  • 多級分區必須按分區層級從大到小聲明每個分區列的值。

說明

startPartition參數詳情,請參見如何填寫增量MaxCompute的startPartition參數?

subscribeIntervalInSec

輪詢MaxCompute擷取分區列表的時間間隔。

Integer

30

單位為秒。

modifiedTableOperation

讀取分區過程中遇到分區資料被修改時的處理。

Enum (NONE, SKIP)

NONE

由於下載session被儲存在檢查點中,每次從檢查點恢複時嘗試從該session恢複讀取進度,而該session由於分區資料被修改不可用,Flink任務會陷入不斷重啟。此時您可以設定該參數,參數取值如下:

  • NONE:需要您修改startPartition參數使其大於不可用分區,並從無狀態啟動作業。

  • SKIP:若不希望無狀態啟動,可將模式修改為SKIP,Flink嘗試從檢查點恢複session時將跳過停用分區。

重要
  • 僅Realtime Compute引擎VVR 8.0.3及以上版本支援該參數。

  • NONE和SKIP模式下,被修改分區中已讀取的資料不會被撤回,未讀取的資料將不會被讀取。

結果表專屬

參數

說明

資料類型

是否必填

預設值

備忘

useStreamTunnel

是否使用MaxCompute Stream Tunnel上傳資料。

Boolean

false

參數取值如下:

  • true:使用MaxCompute Stream Tunnel上傳資料。

  • false:使用MaxCompute Batch Tunnel上傳資料。

說明

資料通道選擇詳情請參見如何選擇資料通道?

flushIntervalMs

MaxCompute Tunnel Writer緩衝區flush間隔。

Long

30000(30秒)

先將資料寫入緩衝區,待緩衝區滿或達到 flushIntervalMs 間隔後,批量寫入目標表。

  • Stream Tunnel :資料flush後立即可見。

  • Batch Tunnel :需等待Checkpoint完成後才可見,建議將 flushIntervalMs 設為 0 ,關閉定時 flush,避免延遲。

單位為毫秒。

說明

本參數可以與batchSize一同使用,滿足任一條件即會Flush資料。

batchSize

MaxCompute Tunnel Writer緩衝區flush的大小。

Long

67108864(64 MB)

單位為位元組。

寫入記錄時,先將資料存放區到MaxCompute的緩衝區中,等緩衝區達到一定大小(batchSize),再把緩衝區裡的資料寫到目標MaxCompute表。

說明

本參數可以與flushIntervalMs一同使用,滿足任一條件即會Flush資料。

numFlushThreads

MaxCompute Tunnel Writer緩衝區flush的線程數。

Integer

1

每個MaxCompute Sink並發將建立numFlushThreads個線程用於flush資料。當該值大於1時,將允許不同分區的資料並發Flush,提升Flush的效率。

slotNum

MaxCompute Tunnel Writer使用的slot數。

Integer

0

slot數的限制請參見Data Transmission Service概述

dynamicPartitionLimit

寫入動態分區的最大數量。

Integer

100

當結果表在兩次Checkpoint之間寫入的動態分區數量超過了dynamicPartitionLimit,則會出現報錯Too many dynamic partitions

重要

由於一次性寫入大量分區會給MaxCompute服務帶來一定壓力,同時也會導致結果表flush和作業Checkpoint變慢。因此當報錯出現時,您需要確認是否需要寫入這麼多分區。如果確實需要,需要手動調大dynamicPartitionLimit參數。

retryTimes

向MaxCompute伺服器請求最大重試次數。

Integer

3

建立session、提交session、flush資料時可能存在短暫的MaxCompute服務停用情況,會根據該配置進行重試。

sleepMillis

稍候再試時間。

Integer

1000

單位為毫秒。

enableUpsert

是否使用MaxCompute Upsert Tunnel上傳資料。

Boolean

false

參數取值如下:

  • true:使用Upsert Tunnel,處理Flink中的INSERT、UPDATE_AFTER和DELETE資料。

  • false:根據useStreamTunnel參數使用Batch Tunnel或Stream Tunnel,處理Flink中的INSERT、UPDATE_AFTER資料。

重要
  • 若Upsert模式下MaxCompute sink提交時出現報錯、失敗、耗時間長度等情況,建議限制sink節點的並發數在10以內。

  • 僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

upsertAsyncCommit

Upsert模式下在提交session時是否使用非同步模式。

Boolean

false

參數取值如下:

  • true:使用非同步模式,提交耗時更短,但提交完成時寫入的資料非立即可讀。

  • false:預設為同步模式,提交時將等待服務側處理完session。

說明

僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

upsertCommitTimeoutMs

Upsert模式下提交session逾時時間。

Integer

120000

(120秒)

單位毫秒。

說明

僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。

sink.operation

寫入Delta Table時的寫入模式。

String

insert

參數取值如下:

  • insert: 寫入資料模式為追加

  • upsert:寫入資料模式為更新

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

sink.parallelism

寫入Delta Table時的並行度

Integer

None

  • 寫入的並行度,如果不設定,則預設使用上遊資料並行度。

  • 僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

重要

確保Delta Table表屬性 write.bucket.num 是該配置值的整數倍,這樣可以獲得最佳的寫入效能,並且能夠最有效地節省 Sink 節點記憶體。

sink.file-cached.enable

寫入Delta table動態分區時,是否使用檔案快取模式。

Boolean

false

參數取值如下:

  • true:使用檔案快取模式

  • false:不使用檔案快取模式

使用檔案快取模式能夠減少寫入服務端的小檔案數量,但是寫出資料的延遲更高。在結果表並行度較高時建議使用檔案快取模式。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

sink.file-cached.writer.num

檔案快取模式下,單個Task上傳資料的並發數。

Integer

16

  • 僅在設定了sink.file-cached.enable為 true 的情況下生效。

  • 建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致記憶體溢出(OOM)。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

sink.bucket.check-interval

檔案快取模式下,檢查檔案大小的周期,單位:毫秒(ms)。

Integer

60000

僅在設定了sink.file-cached.enable 為 true 的情況下生效。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

sink.file-cached.rolling.max-size

檔案快取模式下,單個快取檔案的最大值。

MemorySize

16 M

  • 僅在設定了sink.file-cached.enable為 true 的情況下生效。

  • 若檔案大小超過該值,會將該檔案資料上傳到服務端。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

sink.file-cached.memory

檔案快取模式下,寫入檔案使用的最大堆外記憶體大小。

MemorySize

64 M

僅在設定了sink.file-cached.enable為 true 的情況下生效。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

sink.file-cached.memory.segment-size

檔案快取模式下,寫入檔案的使用的buffer大小。

MemorySize

128 KB

僅在設定了sink.file-cached.enable為 true 的情況下生效。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

sink.file-cached.flush.always

檔案快取模式下,寫入檔案是否使用緩衝。

Boolean

true

僅在設定了sink.file-cached.enable為 true 的情況下生效。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

sink.file-cached.write.max-retries

檔案快取模式下,上傳資料的重試次數。

Integer

3

僅在設定了sink.file-cached.enable為 true 的情況下生效。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.writer.max-retries

Upsert Writer寫入Bucket失敗後的重試次數。

Integer

3

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.writer.buffer-size

單個Upsert Writer資料在Flink中的緩衝大小。

MemorySize

64 m

  • 當所有Bucket的緩衝區大小總和達到預設閾值時,系統將自動觸發重新整理操作,將資料更新到伺服器端。

  • 一個upsert writer裡會同時寫入多個Bucket,建議提高該值,以提升寫入效率。

  • 若寫入分區較多時,會存在引發記憶體OOM風險,可考慮降低該參數值。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.writer.bucket.buffer-size

單個Bucket資料在Flink中的緩衝大小。

MemorySize

1 m

當叢集記憶體資源緊張時,可以減小該參數值。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.write.bucket.num

寫入表的bucket數量。

Integer

None

必須與寫入Delta Table表的write.bucket.num屬性值一致。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.write.slot-num

單個Session使用Tunnel slot數量。

Integer

1

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.commit.max-retries

Upsert Session Commit重試次數。

Integer

3

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.commit.thread-num

Upsert Session Commit的並行度。

Integer

16

不建議將此參數值調整得過大,因為當同時進行的提交並發數越多時,會導致資源消耗增加,可能導致效能問題或資源過度消耗。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.commit.timeout

Upsert Session Commit等待逾時時間,單位:秒(s)。

Integer

600

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.flush.concurrent

限制單個分區允許同時寫入的最大Bucket數。

Integer

2

每當一個bucket的資料重新整理時,將會佔用一個Tunnel Slot資源。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

insert.commit.thread-num

Commit Session的並行度。

Integer

16

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

insert.arrow-writer.enable

是否使用Arrow格式。

Boolean

false

參數取值如下:

  • true:使用Arrow模式

  • false:不使用Arrow模式

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

insert.arrow-writer.batch-size

Arrow Batch的最大行數。

Integer

512

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

insert.arrow-writer.flush-interval

Writer Flush間隔,單位毫秒(ms)。

Integer

100000

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

insert.writer.buffer-size

使用Buffered Writer的緩衝大小。

MemorySize

64 M

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援該參數。

upsert.partial-column.enable

是否僅更新部分列。

Boolean

false

只在結果表類型為Delta Table時生效,詳情請參見部分列更新

參數取值如下:

  • true:僅更新部分列

  • false:更新全部列

根據結果表是否存在更新資料的主鍵,資料寫入分以下幾種情況:

  • 結果表存在相同主鍵的資料,按照主鍵更新這條資料,使用指定列不為null的數值進行更新。

  • 結果表不存在相同主鍵的資料,按照主鍵新增一條資料,插入指定列的數值,對於指定列之外的列插入null

說明

僅Realtime Compute引擎VVR 8.0.11及以上版本支援該參數。

維表專屬

MaxCompute維表在作業啟動時從指定的分區拉取全量資料,partition參數支援使用max_pt()等函數。當緩衝到期重新載入時會重新解析partition參數拉取最新的分區,使用max_two_pt()時維表可拉取兩個分區,其他情況下只支援指定單個分區。

參數

說明

資料類型

是否必填

預設值

備忘

cache

緩衝策略。

String

目前MaxCompute維表僅支援ALL策略,必須顯式聲明。 適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。

ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表查詢都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

說明
  • 因為系統會非同步載入維表資料,所以在使用CACHE ALL時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的至少4倍,具體值與MaxCompute儲存壓縮演算法有關。

  • 如果MaxCompute維表資料量較大,可以考慮使用SHUFFLE_HASH註解將維表資料均勻分散到各個並發中。詳情請參見如何使用維表SHUFFLE_HASH註解?

  • 在使用超大MaxCompute維表時,如果JVM頻繁GC導致作業異常,且在增加維表JOIN節點的記憶體仍無改善的情況下,建議改為支援LRU Cache策略的KV型維表,例如雲資料庫Hbase版維表。

cacheSize

最多緩衝的資料條數。

Long

100000

如果維表資料量超過了cacheSize,則會出現報錯Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit

重要

由於維表資料量太大會佔用大量JVM堆記憶體,同時也會讓作業啟動和維表更新變慢,因此您需要確認是否需要緩衝這麼多資料,如果確實需要,需要手動調大該參數。

cacheTTLMs

緩衝逾時時間,也就是緩衝更新的間隔時間。

Long

Long.MAX_VALUE(相當於永不更新)

單位為毫秒。

cacheReloadTimeBlackList

更新時間黑名單。在該參數規定的時間段內不會更新緩衝。

String

用於防止緩衝在關鍵時間段(例如活動流量峰值期間)更新導致作業不穩定。填寫方式詳情請參見如何填寫CacheReloadTimeBlackList參數?

maxLoadRetries

緩衝更新時(包含作業啟動時初次拉取資料)最多嘗試次數,超過該次數後作業運行失敗。

Integer

10

無。

類型映射

MaxCompute支援的類型參見2.0資料類型版本

MaxCompute類型

Flink類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

TIMESTAMP_NTZ

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

重要

當MaxCompute物理表中同時存在嵌套的複合類型欄位(ARRAY、MAP或STRUCT)和JSON類型欄位時,需要在建立MaxCompute物理表時指定tblproperties('columnar.nested.type'='true'),才能被Flink正確讀寫。

資料攝入(公測中)

MaxCompute連接器可以用於資料攝入YAML作業開發,作為目標端寫入。

使用限制

僅Realtime Compute引擎VVR 11.1及以上版本支援。

文法結構

source:
  type: xxx

sink:
   type: maxcompute
   name: MaxComputeSink
   access-id: ${your_accessId}
   access-key: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   buckets-num: 8

配置項

配置項

是否必填

預設值

類型

描述

type

String

指定要使用的連接器,這裡需要設定成 maxcompute

name

String

Sink的名稱。

access-id

String

阿里雲帳號或RAM使用者的AccessKey ID。您可以進入AccessKey管理頁面擷取AccessKey ID。

access-key

String

AccessKey ID對應的AccessKey Secret。

endpoint

String

MaxCompute服務的串連地址。您需要根據建立MaxCompute專案時選擇的地區以及網路連接方式配置Endpoint。各地區及網路對應的Endpoint值,請參見 Endpoint

project

String

MaxCompute專案名稱。您可以登入MaxCompute控制台,在工作區>專案管理頁面擷取MaxCompute專案名稱。

tunnel.endpoint

String

MaxCompute Tunnel服務的串連地址,通常這項配置可以根據指定的專案所在的地區進行自動路由。僅在使用代理等特殊網路環境下使用該配置。

quota.name

String

MaxCompute資料轉送使用的獨享資源群組名稱,如不指定該配置,則使用共用資源組。詳情可以參見購買與使用獨享Data Transmission Service資源群組

sts-token

String

當使用RAM角色頒發的短時有效存取權杖(STS Token)進行鑒權時,需要指定該參數。

buckets-num

16

Integer

自動建立MaxCompute Delta表時使用的桶數。使用方式請參見近即時數倉概述

compress.algorithm

zlib

String

寫入MaxCompute時使用的資料壓縮演算法,當前支援raw(不進行壓縮)、zlibsnappy

total.buffer-size

64MB

String

記憶體中緩衝的資料量大小,單位為分區級(非分區表單位為表級),不同分區(表)的緩衝區相互獨立,達到閾值後資料寫入到MaxCompute。

bucket.buffer-size

4MB

String

記憶體中緩衝的資料量大小,單位為桶級,僅寫入Delta表時生效。不同資料桶的緩衝區相互獨立,達到閾值後將該桶資料寫入到MaxCompute。

commit.thread-num

16

Integer

Checkpoint階段,能夠同時處理的分區(表)數量。

flush.concurrent-num

4

Integer

寫入資料到MaxCompute時,能夠同時寫入的桶數量。僅寫入Delta表時生效。

表位置映射

連接器自動建表時,使用如下映射關係,將源表的位置資訊映射到MaxCompute表中。

重要

當MaxCompute專案不支援Schema模型時,以上遊MySQL為例,每個同步任務僅能同步一個MySQL Database。(其他資料來源同理,連接器Connector會忽略tableId.namespace資訊)。

資料攝入作業中對象

MaxCompute位置

MySQL位置

配置中的Project參數

Project

none

TableId.namespace

Schema(僅當MaxCompute專案支援Schema模型時,如不支援,將忽略該配置)

Database

TableId.tableName

Table

Table

類型映射

CDC類型

MaxCompute類型

CHAR

STRING

VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT

使用樣本

SQL

源表示例

全量讀取

預設情況下源表為全量模式,讀取partition參數中指定的分區。

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=201809*'
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
   cid,
   COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;
增量讀取

從指定的startPartition開始增量讀取。

CREATE TEMPORARY TABLE odps_source (
  cid VARCHAR,
  rt DOUBLE
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpointName>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 從20180905對應分區開始讀取
);

CREATE TEMPORARY TABLE blackhole_sink (
  cid VARCHAR,
  invoke_count BIGINT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT cid, COUNT(*) AS invoke_count
FROM odps_source GROUP BY cid;

結果表示例

寫入固定分區

指定partition固定分區值。

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905' -- 寫入固定分區ds=20180905。
);

INSERT INTO odps_sink
SELECT
  id, len, content
FROM datagen_source;
寫入動態分區

根據表分區欄位指定partition

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  len INT,
  content VARCHAR,
  c TIMESTAMP
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_sink (
  id  INT,
  len INT,
  content VARCHAR,
  ds VARCHAR --需要顯式聲明動態分區列。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds' --不寫分區的值,表示根據ds欄位的值寫入不同分區。
);

INSERT INTO odps_sink
SELECT
   id,
   len,
   content,
   DATE_FORMAT(c, 'yyMMdd') as ds
FROM datagen_source;

維表示例

一對一維表

一對一維表需要聲明主鍵。

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR,
  PRIMARY KEY (k) NOT ENFORCED  -- 一對一維表需要聲明主鍵。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
一對多維表

一對多維表無需聲明主鍵。

CREATE TEMPORARY TABLE datagen_source (
  k INT,
  v VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE odps_dim (
  k INT,
  v VARCHAR
  -- 一對多維表無需聲明主鍵。
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'project' = '<yourProjectName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=20180905',
  'cache' = 'ALL'
);

CREATE TEMPORARY TABLE blackhole_sink (
  k VARCHAR,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT k, s.v, d.v
FROM datagen_source AS s
INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream

重要
  • 通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink,DataStream連接器設定方法請參見DataStream連接器使用方法

  • 為了保護智慧財產權,從Realtime Compute引擎VVR6.0.6版本起,此連接器在本地調試單次運行作業的時間為30分鐘,30分鐘後作業會報錯並退出。本地運行和調試包含MaxCompute連接器的作業,請參見本地運行和調試包含連接器的作業

  • 暫不支援讀取Delta Table,即建表時指定了primary keytransactional=true的表,詳情請參見基本概念

在DataStream中使用MaxCompute連接器推薦使用SQL聲明MaxCompute表,通過Table/DataStream相互轉換來串連MaxCompute表和資料流

串連源表

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

串連結果表

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

XML

MaxCompute連接器的Maven依賴包含了構建全量源表、增量源表、結果表和維表的所需要的類。Maven中央庫中已經放置了MaxCompute DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題