開發人員在基於GitHub開源專案進行開發時會產生海量事件,GitHub會記錄每次事件的類型、詳情、開發人員和代碼倉庫等資訊,並開放其中的公開事件。DataWorks提供“GitHub十大熱門程式設計語言”模板,通過對GitHub中公開資料集進行加工和分析,並將分析結果以郵件的方式發送給指定使用者。運行本案例後,您將得到GitHub中Top10程式設計語言每小時被提交的次數與排行。
手動設定與實際應用中的ETL模板在工作流程細節上存在一定差異,但兩者實現的案例效果大體相同。
案例說明
DataWorks為您提供了一個公用的MySQL資料來源,儲存來自GitHub的公開即時資料,本案例將此資料進行同步分析,最終將分析結果通過郵件發送至指定郵箱。主要業務過程如下:
通過DataWorks的Data Integration功能,將MySQL上的GitHub即時資料同步至MaxCompute。
將同步至MaxCompute的資料進行分析處理,查詢擷取過去1小時Github中Top10的代碼語言及提交次數,並將處理結果儲存於阿里雲OSS。
在Function Compute中開發一個Python函數,函數邏輯為將OSS中的處理結果發送至指定郵箱。
通過DataWorks的任務調度能力,實現過去1小時GitHub熱門程式設計語言資料自動更新,並將資料處理結果發送至指定郵箱。
操作步驟
資源準備
進行本實踐前,您需先開通涉及的阿里雲產品並完成以下準備工作。
建議將以下涉及的雲產品開通在同一地區,本文以均開通在上海地區為例。
如果您此前未使用過阿里雲的以下產品,可申請免費試用資源,詳情請前往DataWorks免費試用、MaxCompute免費試用、Function Compute免費試用、OSS免費試用。
開通巨量資料開發治理平台DataWorks並建立工作空間(本實踐以使用標準模式工作空間為例,簡單模式的操作類似)。操作詳情請參見開通DataWorks服務、建立工作空間。
開通雲原生MaxCompute,並建立MaxCompute專案。操作詳情請參見開通MaxCompute和DataWorks。
開通Function ComputeFC。操作詳情請參見步驟一:開通Function Compute服務。
開通Object Storage Service並建立OSS Bucket。操作詳情請參見控制台快速入門、步驟一:建立儲存空間。
OSS側操作:建立OSS Bucket
登入OSS控制台,在Bucket列表頁面單擊建立Bucket,配置Bucket名稱和地區後單擊確定,建立OSS Bucket。

Function Compute側操作:建立並開發函數
登入Function Compute控制台建立服務並為服務添加OSS許可權。
由於後續開發的函數代碼邏輯需要讀取OSS Bucket中的資料並將資料發送至指定郵箱,因此需給Function Compute的服務授予OSS的許可權。
在右上方單擊返回Function Compute2.0,返回至Function Compute2.0的工作台頁面。

在Function Compute2.0頁面,單擊切換至服務及函數頁面,並在左上方切換地區,單擊建立服務,佈建服務名稱後單擊確定。
單擊建立好的服務,單擊左側服務詳情頁簽,在角色配置地區單擊編輯,選擇服務角色AliyunFcDefaultRole,單擊儲存,回到服務詳情頁簽,在角色配置地區單擊服務角色AliyunFcDefaultRole,進入RAM存取控制的角色頁面。
單擊新增授權,選擇系統策略中的AliyunOSSReadOnlyAccess許可權,根據介面提示進行添加。完成後即給Function Compute服務授予了OSS的讀許可權。
建立函數並開發函數邏輯。
建立函數。
回到Function Compute中建立的服務頁面,單擊左側函數管理頁簽,單擊建立函數,配置函數名稱,並選擇運行環境為Python3.9,其他參數可保持預設值,完成後單擊建立。
為函數環境安裝相關依賴包。
說明本實踐需使用oss2阿里雲二方包和pandas開源三方包。其中oss2包在python3.9 runtime內建支援無需手動安裝,您需參考以下步驟手動安裝panadas包。
單擊建立好的函數,在函數頁面的函數配置頁簽中,單擊層配置地區後的編輯,單擊添加層,選擇添加官方公用層後,選擇Pandas1.x,完成後單擊確定。

回到函數頁面後,單擊函數代碼頁簽,當WebIDE的Python環境載入完成後,複製以下代碼至Index.py檔案中,並修改其中的OSS內網Endpoint參數、郵箱相關參數。
# -*- coding: utf-8 -*- import logging import json import smtplib import oss2 import pandas as pd from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.text import MIMEText from email.utils import COMMASPACE from email import encoders def handler(event, context): evts = json.loads(event) bucket_name = evts["bucketName"] file_path = evts["filePath"] auth = oss2.StsAuth(context.credentials.access_key_id, context.credentials.access_key_secret, context.credentials.security_token) endpoint = 'https://oss-{}-internal.aliyuncs.com'.format(context.region) bucket = oss2.Bucket(auth, endpoint, bucket_name) file_name = file_path for obj in oss2.ObjectIteratorV2(bucket, prefix=file_path): if not obj.key.endswith('/'): file_name = obj.key csv_file = bucket.get_object(file_name) logger = logging.getLogger() logger.info('event: %s', evts) mail_host = 'smtp.***.com' ## 郵箱服務地址 mail_port = '465'; ## 郵箱smtp協議連接埠號碼 mail_username = 'sender_****@163.com' ## 身份認證使用者名稱:填完整的郵箱名 mail_password = 'EWEL******KRU' ## 身份認證密碼:填郵箱 SMTP 授權碼 mail_sender = 'sender_****@163.com' ## 寄件者郵箱地址 mail_receivers = ['receiver_****@163.com'] ## 收件者郵箱地址 message = MIMEMultipart('alternative') message['Subject'] = 'Github資料加工結果' message['From'] = mail_sender message['To'] = mail_receivers[0] html_message = generate_mail_content(evts, csv_file) message.attach(html_message) # Send email smtpObj = smtplib.SMTP_SSL(mail_host + ':' + mail_port) smtpObj.login(mail_username,mail_password) smtpObj.sendmail(mail_sender,mail_receivers,message.as_string()) smtpObj.quit() return 'mail send success' def generate_mail_title(evt): mail_title='' if 'mailTitle' in evt.keys(): mail_content=evt['mailTitle'] else: logger = logging.getLogger() logger.error('msg not present in event') return mail_title def generate_mail_content(evts, csv_file): headerList = ['Github Repos', 'Stars'] # Read csv file content dumped_file = pd.read_csv(csv_file, names=headerList) # Convert DataFrame to HTML table table_html = dumped_file.to_html(header=headerList,index=False) # Convert DataFrame to HTML table table_html = dumped_file.to_html(index=False) mail_title=generate_mail_title(evts) # Email body html = f""" <html> <body> <h2>{mail_title}</h2> <p>Here are the top 10 languages on GitHub in the past hour:</p> {table_html} </body> </html> """ # Attach HTML message html_message = MIMEText(html, 'html') return html_message說明範例程式碼中使用到了bucketName、filePath、mailTitle這三個變數,此三個變數的取值後續通過DataWorks的Function Compute節點同步取值,無需在代碼中修改。
待修改參數
配置指導
OSS內網Endpoint
(第20行)
根據您當前操作的地區,將其中的
'https://oss-{}-internal.aliyuncs.com'替換為OSS的內網Endpoint取值。以上海地區為例,需修改參數為
'https://oss-cn-shanghai-internal.aliyuncs.com'。各地區的OSS內網Endpoint資訊請參見OSS地區和訪問網域名稱。
郵箱相關參數
(31~36行)
根據實際業務需要:
修改31~35行為後續發送郵件郵箱服務地址、smtp協議連接埠號碼、信箱使用者名及密碼等資訊。
修改36行為後續內送郵件的郵箱地址。
說明您可在您使用的郵箱協助文檔中查看如何擷取相關取值。
完成代碼開發後,單擊部署代碼。
DataWorks側操作:建立資料來源
建立MySQL資料來源。
本實踐使用的公用GitHub資料存放區在公用的MySQL資料庫中,您需要先建立一個MySQL資料來源,用於後續同步資料至MaxCompute時對接MySQL資料庫。
進入資料來源頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入管理中心。
進入工作空間管理中心頁面後,單擊左側導覽列的資料來源,進入資料來源頁面。
單擊新增資料來源,選擇資料來源類型為MySQL,根據介面提示配置資料來源名稱等參數,核心參數如下。
參數
說明
資料來源類型
選擇串連串模式。
資料來源名稱
自訂。本文以github_events_share為例。
JDBC URL
配置為:jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/github_events_share
重要該資料來源僅支援資料同步情境下讀取使用,其他模組不支援。
使用者名稱
配置為:workshop
密碼
配置為:workshop#2017
此密碼僅為本教程樣本,請勿在實際業務中使用。
認證選項
無認證。
資源群組連通性
單擊Data Integration公用資源群組後的測試連通性,等待介面提示測試完成,連通狀態為可連通。
建立MaxCompute資料來源。
後續需將GitHub資料同步至MaxCompute,因此您需建立一個MaxCompute資料來源。
進入資料來源頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入管理中心。
進入工作空間管理中心頁面後,單擊左側導覽列的資料來源,進入資料來源頁面。
單擊新增資料來源,選擇資料來源類型為MaxCompute,根據介面提示配置資料來源名稱、對應的MaxCompute專案等參數,詳細請參見綁定MaxCompute計算資源。
綁定MaxCompute計算資源至資料開發。
後續需建立一個MaxCompute的SQL任務進行資料處理,因此您需要將MaxCompute計算資源綁定到計算資源,便於後續建立ODPS SQL節點進行SQL任務開發。更多資訊,請參見開發前準備:綁定計算資源或叢集。
說明當資料來源資訊發生變更時,若當前介面資料更新不及時,請重新整理當前頁面更新快取資料。
DataWorks側操作:建立商務程序並開發資料處理任務
進入資料開發頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入資料開發。
建立商務程序。
單擊左上方的,配置業務名稱後單擊建立。
建立業務節點並配置依賴關係。

雙擊建立的業務名稱,開啟商務程序頁面。
在商務程序頁面單擊建立節點,拖拽離線同步節點進商務程序頁面,配置節點名稱後單擊確認,建立一個離線同步節點。
重複上述步驟,再建立一個ODPS SQL節點、Function Compute節點。
配置離線同步節點。
雙擊商務程序中建立的離線同步節點,進入離線同步節點頁面。
配置離線同步任務的網路與資源。

配置項
配置說明
資料來源
選擇資料來源為MySQL,資料來源選擇上述步驟建立的MySQL資料來源。
資料去向
選擇資料去向為MaxCompute,資料來源選擇已建立的MaxCompute資料來源。
我的資源
選擇右下角的。
完成後單擊下一步,根據介面提示完成網路連通測試。
配置離線同步任務,核心參數如下,其他參數可保持預設。
配置項
配置說明
資料來源
表:在下拉框中選擇github_public_event。
資料過濾:配置為
created_at >'${day1} ${hour1}' and created_at<'${day2} ${hour2}'
資料去向
表:單擊一鍵產生目標表結構,在彈框中單擊建立表。
分區資訊:配置為
pt=${day_hour}。
單擊頁面右側的調度配置,配置調度參數,核心參數如下,其他參數可保持預設。
配置項
配置說明
調度參數
單擊載入代碼中的參數,新增以下五個參數並配置參數的取值邏輯如下:
day1:$[yyyy-mm-dd-1/24]hour1:$[hh24-1/24]day2:$[yyyy-mm-dd]hour2:$[hh24]day_hour:$[yyyymmddhh24]
時間屬性
調度周期:配置為小時
重跑屬性:配置為運行成功或失敗後皆可重跑
調度依賴
勾選使用工作空間根節點
單擊右上方的儲存按鈕,儲存節點配置。
配置ODPS SQL節點。
雙擊商務程序中建立的ODPS SQL節點,進入ODPS SQL節點頁面。
將以下範例程式碼貼入節點中。
重要以下範例程式碼建立了一個OSS外部表格,用於儲存處理後的資料。如果您是首次使用OSS外部表格,您還需對當前操作帳號進行授權,否則後續商務程序運行會報錯,授權操作請參見OSS的STS模式授權。
-- 1. 建立odps的oss外部表格用於接收Github公用資料集資料加工結果。 -- 本案例建立的oss外表為odps_external,存放於在步驟1建立的OSS Bucket,本案例OSS Bucket名為xc-bucket-demo2,您需要根據實際情況進行修改。 CREATE EXTERNAL TABLE IF NOT EXISTS odps_external( language STRING COMMENT 'repo全名:owner/Repository_name', num STRING COMMENT '提交次數' ) partitioned by ( direction string ) STORED BY 'com.aliyun.odps.CsvStorageHandler' WITH SERDEPROPERTIES( 'odps.text.option.header.lines.count'='0', 'odps.text.option.encoding'='UTF-8', 'odps.text.option.ignore.empty.lines'='false', 'odps.text.option.null.indicator'='') LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/${YOUR_BUCKET_NAME}/odps_external/'; -- 2. 對同步MaxCompute的GitHub資料加工後寫入MaxCompute的oss外表。 -- 查詢擷取過去1小時Github中Top10的代碼語言及提交次數 SET odps.sql.unstructured.oss.commit.mode=true; INSERT INTO TABLE odps_external partition (direction='${day_hour}') SELECT language, COUNT(*) AS num FROM github_public_event WHERE language IS NOT NULL AND pt='${day_hour}' GROUP BY language ORDER BY num DESC limit 10;單擊頁面右側的調度配置,配置調度參數,核心參數如下,其他參數可保持預設。
配置項
配置說明
調度參數
單擊載入代碼中的參數,新增以下幾個參數並配置參數的取值邏輯:
YOUR_BUCKET_NAME:參數值為上述步驟中建立的OSS Bucket名稱。day_hour:$[yyyymmddhh24]
時間屬性
調度周期:配置為小時
重跑屬性:配置為運行成功或失敗後皆可重跑
單擊右上方的儲存按鈕,儲存節點配置。
配置Function Compute節點。
雙擊商務程序中建立的Function Compute節點,進入Function Compute節點頁面。
配置Function Compute節點任務。
配置項
配置說明
選擇服務
選擇上述步驟在Function Compute控制台中建立的服務。
選擇函數
選擇上述步驟在Function Compute控制台中建立的函數。
調用方式
選擇同步。
變數
配置為以下內容。
{ "bucketName": "${YOUR_BUCKET_NAME}", "filePath": "odps_external/direction=${day_hour}/", "mailTitle":"過去1小時Github中Top10的代碼語言及其提交次數" }單擊頁面右側的調度配置,配置調度參數,核心參數如下,其他參數可保持預設。
配置項
配置說明
調度參數
單擊新增參數,新增以下幾個參數並配置參數的取值邏輯如下:
YOUR_BUCKET_NAME:參數值為上述步驟中建立的OSS Bucket名稱。day_hour:$[yyyymmddhh24]
時間屬性
調度周期:配置為小時
重跑屬性:配置為運行成功或失敗後皆可重跑
單擊右上方的儲存按鈕,儲存節點配置。
DataWorks側:調試工作流程
在DataWorks資料開發頁面雙擊建立的業務名稱,開啟商務程序頁面。
單擊頂部的運行按鈕,調試運行整個商務程序。
當介面提示運行完成後,您可登入收取資料處理結果的郵箱查看郵件。
DataWorks側:提交發布工作流程
(可選)後續如果您希望周期性同步資料至MaxCompute進行處理,並周期性發送處理結果到指定郵箱,您需要將商務程序提交發布至DataWorks的營運中心。
在資料開發頁面,雙擊建立的業務名稱,開啟商務程序頁面。
單擊商務程序頁面的提交按鈕,根據介面提示將商務程序提交發布至營運中心,操作詳情請參見發布任務。
後續商務程序將根據配置的調度周期,周期性運行。
後續步驟:釋放資源
如果您使用的是免費試用資源,或後續您不需要繼續使用此實踐的雲產品,可釋放對應的雲產品資源,避免產生額外費用。
釋放OSS資源:前往OSS控制台刪除本案例使用的Bucket。
釋放Function Compute資源:前往Function Compute控制台刪除服務。
釋放DataWorks資源:前往DataWorks控制台刪除DataWorks工作空間。
釋放MaxCompute資源:前往MaxCompute控制台刪除MaxCompute專案。