全部產品
Search
文件中心

DataHub:建立同步MaxCompute

更新時間:Jan 24, 2025

同步資料到MaxCompute

準備工作

1.建立MaxCompute表

DataHub支援將資料同步到MaxCompute對應的資料表中,同時支援分區表和非分區表,一般情況下推薦使用者使用分區表進行資料同步以方便MaxCompute資料處理。

目前DataHub支援將TUPLE和BLOB的資料同步到MaxCompute資料表中。

1)針對TUPLE類型topic,MaxCompute目標表資料類型需要和DataHub資料類型相匹配,具體的資料類型映射關係如下:

MaxCompute

DataHub

BIGINT

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

TINYINT

TINIINT

SMALLINT

SMALLINT

INT

INTEGER

FLOAT

FLOAT

MAP

不支援

ARRAY

不支援

由於目前DataHub並不能完全支援MaxCompute所有的資料類型,請使用者盡量根據DataHub資料類型建立MaxCompute表結構。

2)針對BLOB資料類型,需要要求MaxCompute表結構僅需要包含一列STRING類型的column即可,DataHub預設會將資料同步到該column中。

DataHub

MaxCompute

BLOB

STRING

3)同時為了方便資料追蹤和問題排查,建議使用者在建立MaxCompute表結構時,增加一列__rowkey__ STRING欄位,DataHub會自動將DataHub對應資料的trace資訊同步到該列中,以方便後續資料排查。

2.準備同步任務帳號並授權

1)建立同步MaxCompute任務時,需要使用者手動填寫訪問MaxCompute表的帳號資訊,請使用者確保填入有效帳號資訊(一般情況下採用MaxCompute子帳號即可)。

2)需要給該帳號授予訪問MaxCompute表的響應許可權,具體許可權包括CreateInstanceDescribeAlter以及Update許可權。

使用者可以使用DataWorks管控台進行MaxCompute對應表的許可權管理,參考配置MaxCompute引擎許可權配置MaxCompute引擎許可權,也可以選擇使用MaxCompute的命令列工具進行授權,參考MaxCompute使用及授權管理

3.確認TimestampUnit單位

1) Connector中TimestampUnit的作用,就是將資料中TIMESTAMP類型的資料(如果有),以TimestampUnit為單位進行轉換後寫入到下遊系統的日期類型(如datetime類型)。

2)如果TIMESTAMP列寫入的是以秒為單位的值,那建立Connector的時候TimestampUnit就選擇“SECOND”;如果寫入的是以毫秒為單位的值,那就選擇“MILLISECOND”;如果寫入的是以微秒為單位的值,那就選擇“MICROSECOND”

注意事項

  1. 由於MaxCompute目前的寫入標準原因,分區數越多就會導致DataHub同步資料越慢。因此,在建立MaxCompute同步任務時,請儘可能的控制分區數,尤其是USER_DEFINE同步模式。

    • 同一分區的資料越連續越好,不要頻繁的分區跳變。

    • 同步模式控制建立分區時,請不要建立過多的分區數。

建立同步任務

  1. 依次進入專案列表/Project詳情/Topic詳情頁面。

  2. 點擊右上方的 + 同步按鈕進行同步任務建立。

c

  1. 選擇MaxCompute類型作業,如下圖所示:

    1)TUPLE類型同步5-22)BLOB類型同步

5-3部分配置說明

下面羅列了部分管控台建立同步任務的配置說明,更多更靈活的操作請參考SDK使用。

  1. 匯入欄位

    DataHub可以根據使用者佈建將部分column內容同步到MaxCompute表中。

  2. 分區模式

    分區模式決定了將資料寫入到MaxCompute哪個分區中,目前DataHub支援以下分區方式:

分區模式

分區依據

支援Topic類型

說明

USER_DEFINE

Record中的分區列(和MaxCompute的分區欄位同名)的value值

TUPLE

(1). DataHub schema中必須包含MaxCompute分區欄位 (2). 該列值必須為UTF8字串 欄位值可以為空白,表示不分區

SYSTEM_TIME

Record寫入DataHub的時間

TUPLE / BLOB

(1). 分區配置中設定MaxCompute分區的時間轉換Format格式 (2). 設定時區資訊

EVENT_TIME

Record中的event_time(TIMESTAMP)列的value值

TUPLE

(1). 分區配置中設定MaxCompute分區的時間轉換Format格式 (2). 設定時區資訊

META_TIME

Record的屬性欄位__dh_meta_time__的value值

TUPLE / BLOB

(1). 分區配置中設定MaxCompute分區的時間轉換Format格式 (2). 設定時區資訊

其中SYSTEM_TIMEEVENT_TIMEMETA_TIME均是根據時間Timestamp和時區配置來進行MaxCompute分區的轉換過程,單位預設為微秒。

  1. 分區配置決定了根據時間戳記轉換MaxCompute分區時的相關配置。目前管控台預設固定的MaxCompute分區格式,分區配置對應為

分區

時間Format

說明

ds

%Y%m%d

day

hh

%H

hour

mm

%M

minute

  1. 分區間隔決定了根據時間戳記轉換MaxCompute分區時所採用的時間間隔。時間範圍是15分鐘 ~ 1440分鐘(1天),跳變間隔15分鐘

  2. 時區資訊(TimeZone)時區資訊決定了根據時間戳記轉換MaxCompute分區時所採用的轉換時區。

  3. 分隔字元BLOB資料同步時,可以指定16進位分隔字元來決定是否對BLOB資料分割後再同步MaxCompute,比如 0A表示\n(分行符號)。

  4. Base64編碼DataHub BLOB預設儲存位元據,而MaxCompute對應的同步列為STRING類型,因此管控台建立同步任務時,預設採用base64編碼後進行同步,更多定製化需求請參考SDK實現。

查看同步任務

可以點擊對應connector的詳情頁面查看同步任務的運行狀態和點位等資訊, 包含同步點位、同步狀態以及重啟和停止等操作,如下圖所示:5-4

同步樣本

1. USER_DEFINE同步模式

  1. 建立DataHub Topic

    備忘: topic schema中必須需要包含MaxCompute分區欄位,類型為STRING,如下圖所示:5-5

  2. 向DataHub Topic寫入資料,可以使用datahub-sdk進行資料寫入

    測試過程中使用SDK寫入幾條條資料,其中[ds,hh,mm]分別為:[20210304,01,15]和[20210304,02,15],資料內容如下所示:

5-6

3 建立同步任務

USER_DEFINE分區模式可以通過在同步中設定分區配置欄位,如果MaxCompute沒有對應的表,可自動建立。5-7 這裡匯入欄位中設定匯入f1、f2欄位,不同步f3欄位。

4 確認同步資料

可以從DataHub管控台查看對應同步任務的同步資訊。5-8 查詢MaxCompute資料結果,結果如下:5-9 可以看到在USER_DEFINE模式下,DataHub會根據MaxCompute分組欄位所對應的value將DataHub中的資料同步到對應的分區中。

2. SYSTEM_TIME同步模式

  1. 建立DataHub Topic

    備忘:由於分區是根據寫入DataHub時間來計算的,因此topic schema只需包含資料欄位,不需要包含分區欄位,如下圖所示:

a

  1. 向DataHub Topic寫入資料,可以使用datahub-sdk進行資料寫入。

    測試過程中使用SDK寫入幾條資料,DataHub目前對應的寫入時間為2021-03-04 14:02:45,資料內容如下所示:5-11

  2. 建立同步任務

    5-12

    • 請注意分區配置需要和MaxCompute表分區一致。

4 確認同步資料

可以從DataHub管控台查看對應同步任務的同步資訊,如DoneTime。 5-13 查詢MaxCompute資料結果,結果如下: 5-14 可以看到在SYSTEM_TIME模式下,DataHub會根據資料寫入DataHub的時間將DataHub中的資料同步到對應的分區中。

常見問題

  • 同步到MaxCompute timestamp欄位時間變為1970-01-19

    原因:DataHub同步MaxCompute預設時間戳記單位為微秒,使用者寫入時間戳記為毫秒解決方案:寫入DataHub時間戳記以微秒方式寫入。