同步資料到MaxCompute
準備工作
1.建立MaxCompute表
DataHub支援將資料同步到MaxCompute對應的資料表中,同時支援分區表和非分區表,一般情況下推薦使用者使用分區表進行資料同步以方便MaxCompute資料處理。
目前DataHub支援將TUPLE和BLOB的資料同步到MaxCompute資料表中。
1)針對TUPLE類型topic,MaxCompute目標表資料類型需要和DataHub資料類型相匹配,具體的資料類型映射關係如下:
MaxCompute | DataHub |
BIGINT | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
TINYINT | TINIINT |
SMALLINT | SMALLINT |
INT | INTEGER |
FLOAT | FLOAT |
MAP | 不支援 |
ARRAY | 不支援 |
由於目前DataHub並不能完全支援MaxCompute所有的資料類型,請使用者盡量根據DataHub資料類型建立MaxCompute表結構。
2)針對BLOB資料類型,需要要求MaxCompute表結構僅需要包含一列STRING類型的column即可,DataHub預設會將資料同步到該column中。
DataHub | MaxCompute |
BLOB | STRING |
3)同時為了方便資料追蹤和問題排查,建議使用者在建立MaxCompute表結構時,增加一列__rowkey__ STRING欄位,DataHub會自動將DataHub對應資料的trace資訊同步到該列中,以方便後續資料排查。
2.準備同步任務帳號並授權
1)建立同步MaxCompute任務時,需要使用者手動填寫訪問MaxCompute表的帳號資訊,請使用者確保填入有效帳號資訊(一般情況下採用MaxCompute子帳號即可)。
2)需要給該帳號授予訪問MaxCompute表的響應許可權,具體許可權包括CreateInstance、Describe、Alter以及Update許可權。
使用者可以使用DataWorks管控台進行MaxCompute對應表的許可權管理,參考配置MaxCompute引擎許可權配置MaxCompute引擎許可權,也可以選擇使用MaxCompute的命令列工具進行授權,參考MaxCompute使用及授權管理。
3.確認TimestampUnit單位
1) Connector中TimestampUnit的作用,就是將資料中TIMESTAMP類型的資料(如果有),以TimestampUnit為單位進行轉換後寫入到下遊系統的日期類型(如datetime類型)。
2)如果TIMESTAMP列寫入的是以秒為單位的值,那建立Connector的時候TimestampUnit就選擇“SECOND”;如果寫入的是以毫秒為單位的值,那就選擇“MILLISECOND”;如果寫入的是以微秒為單位的值,那就選擇“MICROSECOND”
注意事項
由於MaxCompute目前的寫入標準原因,分區數越多就會導致DataHub同步資料越慢。因此,在建立MaxCompute同步任務時,請儘可能的控制分區數,尤其是
USER_DEFINE同步模式。同一分區的資料越連續越好,不要頻繁的分區跳變。
同步模式控制建立分區時,請不要建立過多的分區數。
建立同步任務
依次進入
專案列表/Project詳情/Topic詳情頁面。點擊右上方的
+ 同步按鈕進行同步任務建立。

選擇MaxCompute類型作業,如下圖所示:
1)TUPLE類型同步
2)BLOB類型同步
部分配置說明:
下面羅列了部分管控台建立同步任務的配置說明,更多更靈活的操作請參考SDK使用。
匯入欄位
DataHub可以根據使用者佈建將部分column內容同步到MaxCompute表中。
分區模式
分區模式決定了將資料寫入到MaxCompute哪個分區中,目前DataHub支援以下分區方式:
分區模式 | 分區依據 | 支援Topic類型 | 說明 |
USER_DEFINE | Record中的分區列(和MaxCompute的分區欄位同名)的value值 | TUPLE | (1). DataHub schema中必須包含MaxCompute分區欄位 (2). 該列值必須為 |
SYSTEM_TIME | Record寫入DataHub的時間 | TUPLE / BLOB | (1). 分區配置中設定MaxCompute分區的時間轉換Format格式 (2). 設定時區資訊 |
EVENT_TIME | Record中的 | TUPLE | (1). 分區配置中設定MaxCompute分區的時間轉換Format格式 (2). 設定時區資訊 |
META_TIME | Record的屬性欄位 | TUPLE / BLOB | (1). 分區配置中設定MaxCompute分區的時間轉換Format格式 (2). 設定時區資訊 |
其中SYSTEM_TIME、EVENT_TIME和META_TIME均是根據時間Timestamp和時區配置來進行MaxCompute分區的轉換過程,單位預設為微秒。
分區配置決定了根據時間戳記轉換MaxCompute分區時的相關配置。目前管控台預設固定的MaxCompute分區格式,分區配置對應為
分區 | 時間Format | 說明 |
ds | %Y%m%d | day |
hh | %H | hour |
mm | %M | minute |
分區間隔決定了根據時間戳記轉換MaxCompute分區時所採用的時間間隔。時間範圍是
15分鐘 ~ 1440分鐘(1天),跳變間隔15分鐘。時區資訊(TimeZone)時區資訊決定了根據時間戳記轉換MaxCompute分區時所採用的轉換時區。
分隔字元BLOB資料同步時,可以指定16進位分隔字元來決定是否對BLOB資料分割後再同步MaxCompute,比如
0A表示\n(分行符號)。Base64編碼DataHub BLOB預設儲存位元據,而MaxCompute對應的同步列為STRING類型,因此管控台建立同步任務時,預設採用base64編碼後進行同步,更多定製化需求請參考SDK實現。
查看同步任務
可以點擊對應connector的詳情頁面查看同步任務的運行狀態和點位等資訊, 包含同步點位、同步狀態以及重啟和停止等操作,如下圖所示:
同步樣本
1. USER_DEFINE同步模式
建立DataHub Topic
備忘: topic schema中必須需要包含MaxCompute分區欄位,類型為STRING,如下圖所示:

向DataHub Topic寫入資料,可以使用datahub-sdk進行資料寫入
測試過程中使用SDK寫入幾條條資料,其中[ds,hh,mm]分別為:[20210304,01,15]和[20210304,02,15],資料內容如下所示:

3 建立同步任務
USER_DEFINE分區模式可以通過在同步中設定分區配置欄位,如果MaxCompute沒有對應的表,可自動建立。
這裡匯入欄位中設定匯入f1、f2欄位,不同步f3欄位。
4 確認同步資料
可以從DataHub管控台查看對應同步任務的同步資訊。
查詢MaxCompute資料結果,結果如下:
可以看到在USER_DEFINE模式下,DataHub會根據MaxCompute分組欄位所對應的value將DataHub中的資料同步到對應的分區中。
2. SYSTEM_TIME同步模式
建立DataHub Topic
備忘:由於分區是根據寫入DataHub時間來計算的,因此topic schema只需包含資料欄位,不需要包含分區欄位,如下圖所示:

向DataHub Topic寫入資料,可以使用datahub-sdk進行資料寫入。
測試過程中使用SDK寫入幾條資料,DataHub目前對應的寫入時間為
2021-03-04 14:02:45,資料內容如下所示:
建立同步任務

請注意分區配置需要和MaxCompute表分區一致。
4 確認同步資料
可以從DataHub管控台查看對應同步任務的同步資訊,如DoneTime。
查詢MaxCompute資料結果,結果如下:
可以看到在SYSTEM_TIME模式下,DataHub會根據資料寫入DataHub的時間將DataHub中的資料同步到對應的分區中。
常見問題
同步到MaxCompute timestamp欄位時間變為1970-01-19
原因:DataHub同步MaxCompute預設時間戳記單位為微秒,使用者寫入時間戳記為毫秒解決方案:寫入DataHub時間戳記以微秒方式寫入。