このトピックでは、GitHubのリアルタイムイベントデータに基づいて、MaxComputeを使用してオフラインデータウェアハウスを構築し、Realtime Compute for Apache FlinkとHologresを使用してリアルタイムデータウェアハウスを構築する方法について説明します。また、HologresとMaxComputeを使用してリアルタイムおよびオフラインデータ分析を実行する方法についても説明します。このプロセスは、オフラインデータ処理とリアルタイムデータ処理の統合と呼ばれます。
背景情報
デジタル化の発展に伴い、企業のデータ適時性への需要はますます高まっています。オフラインモードでの大規模データ処理を伴う従来のビジネスを除き、ほとんどのビジネスではリアルタイム処理、リアルタイムストレージ、およびリアルタイム分析が必要です。これらの要件を満たすために、Alibaba Cloudはオフラインデータ処理とリアルタイムデータ処理の統合を導入しています。
オフラインデータ処理とリアルタイムデータ処理の統合により、リアルタイムデータとオフラインデータを同じプラットフォームで管理および処理できます。このシームレスな統合は、データ分析の効率と精度を向上させるのに役立ちます。オフラインデータ処理とリアルタイムデータ処理の統合には、次の利点があります。
高いデータ処理効率:リアルタイムデータとオフラインデータの両方が同じプラットフォームで管理および処理されます。これにより、データ処理効率が大幅に向上し、データ転送と変換のコストが削減されます。
高いデータ分析精度:リアルタイムデータとオフラインデータのハイブリッド分析により、より高い精度と正確性が実現します。
低いシステム複雑度:データ管理とデータ処理がよりシンプルで効率的になります。
高いデータ価値:データ価値が十分に活用され、企業の意思決定が促進されます。
Alibaba Cloudの統合データウェアハウスソリューションでは、MaxComputeはオフラインデータ分析に使用され、Hologresはリアルタイムデータ分析に使用され、Realtime Compute for Apache Flinkはリアルタイムデータ処理に使用されます。
アーキテクチャ
次の図は、MaxComputeとHologresを一緒に使用してGitHubパブリックイベントデータセットのデータをリアルタイムおよびオフラインで処理する統合データウェアハウスソリューションのアーキテクチャを示しています。

Elastic Compute Service(ECS)インスタンスは、GitHubからリアルタイムおよびオフラインイベントデータをデータソースとして収集します。リアルタイムデータとオフラインデータは個別に処理され、Hologresに書き込まれます。 Hologresは、外部アプリケーションにデータサービスを提供します。
リアルタイム処理:Realtime Compute for Apache Flinkは、Simple Log Serviceのデータをリアルタイムで処理し、処理されたデータをHologresに書き込みます。 Realtime Compute for Apache Flinkは強力なストリームコンピューティングエンジンです。 Hologresは、リアルタイムのデータ書き込みと更新をサポートしています。 Hologresのデータは、書き込まれた直後にクエリできます。 Realtime Compute for Apache FlinkとHologresはネイティブに統合されており、高スループット、低レイテンシ、モデリング、高品質のパフォーマンスを提供するリアルタイムデータウェアハウスをサポートします。リアルタイムデータウェアハウスは、最新のイベント抽出やホットイベント分析などのシナリオにおけるビジネスインサイトのリアルタイム要件を満たします。
オフライン処理:MaxComputeは、大量のオフラインデータを処理およびアーカイブします。 Object Storage Service(OSS)は、Alibaba Cloudが提供するクラウドストレージサービスです。 OSSを使用して、さまざまな種類のデータを保存できます。このプラクティスで参照される生データはJSON形式です。 OSSは、便利で安全、低コスト、信頼性の高いストレージ機能を提供します。 MaxComputeは、データ分析に適したエンタープライズレベルのSoftware as a Service(SaaS)データウェアハウスです。 MaxComputeは、外部テーブルを使用してOSSの半構造化データを直接読み取って解析し、高価値データを保存してから、DataWorksを使用してデータを開発し、オフラインデータウェアハウスを構築できます。
Hologresは、基盤レイヤーでMaxComputeとシームレスに統合されています。 Hologresを使用して、MaxComputeに保存されている大量の履歴データのクエリと分析を高速化し、履歴データの低頻度で高パフォーマンスのクエリに対するビジネス要件を満たすことができます。また、オフライン処理によってリアルタイムデータを簡単に変更し、リアルタイムリンクで発生する可能性のあるデータ損失などの問題を解決することもできます。
統合データウェアハウスソリューションには、次の利点があります。
安定した効率的なオフライン処理:データは1時間ごとに書き込みおよび更新できます。大量のデータをバッチ処理して、複雑な計算と分析を実行できます。これにより、計算コストが削減され、データ処理効率が向上します。
成熟したリアルタイム処理:リアルタイム書き込み、リアルタイムイベントコンピューティング、リアルタイム分析がサポートされています。リアルタイムデータは数秒でクエリされます。
統合ストレージとサービス:Hologresはサービスを提供するために使用されます。 Hologresはデータを一元的に保存し、オンライン分析処理(OLAP)クエリとキーバリューペアのポイントクエリ用の統一SQLインターフェースを提供します。
リアルタイムデータとオフラインデータの統合:冗長データは少なくなります。データの移行はほとんどなく、修正できます。
ワンストップ開発により、データクエリへの数秒以内の応答、データ処理ステータスの視覚化、必要なコンポーネントと依存関係の削減が実現します。これにより、O&Mコストと人件費が大幅に削減されます。
ビジネスとデータの紹介
多数の開発者がGitHubでオープンソースプロジェクトを開発し、プロジェクトの開発中に多数のイベントを生成します。 GitHubは、各イベントの種類と詳細、開発者、およびコードリポジトリを記録します。 GitHubは、お気に入りにアイテムを追加したり、コードを送信したりするときに生成されるイベントなど、パブリックイベントも公開します。イベントタイプの詳細については、「Webhookイベントとペイロード」をご参照ください。
GitHubは、APIを使用してパブリックイベントを公開します。 APIは、5分前に発生したイベントのみを公開します。詳細については、「イベント」をご参照ください。 APIを使用してリアルタイムデータを取得できます。
GH Archiveプロジェクトは、GitHubパブリックイベントを1時間ごとに集計し、開発者からのアクセスを許可します。 GH Archiveプロジェクトの詳細については、「GH Archive」をご参照ください。 GH Archiveプロジェクトからオフラインデータを取得できます。
GitHubの紹介
GitHubはコード管理に使用され、開発者間のコミュニケーションプラットフォームとしても機能します。 GitHubには、開発者、リポジトリ、組織の3つのレベル1エンティティオブジェクトが関係します。
このプラクティスでは、eventsはエンティティオブジェクトとして保存および記録されます。

元のパブリックイベントのデータの紹介
元のイベントのJSON形式のサンプルデータ:
{
"id": "19541192931",
"type": "WatchEvent",
"actor":
{
"id": 23286640,
"login": "herekeo",
"display_login": "herekeo",
"gravatar_id": "",
"url": "https://api.github.com/users/herekeo",
"avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
},
"repo":
{
"id": 52760178,
"name": "crazyguitar/pysheeet",
"url": "https://api.github.com/repos/crazyguitar/pysheeet"
},
"payload":
{
"action": "started"
},
"public": true,
"created_at": "2022-01-01T00:03:04Z"
}この例では、発生しないイベントまたは記録されなくなったイベントを除いて、15種類のパブリックイベントが関係しています。イベントの種類と説明の詳細については、「GitHubイベントの種類」をご参照ください。
前提条件
ECSインスタンスが作成され、Elastic IP Address(EIP)に関連付けられています。 ECSインスタンスを使用して、GitHub APIからリアルタイムイベントデータを取得できます。詳細については、「インスタンスの作成」および「EIPの関連付けまたは関連付け解除」をご参照ください。
OSS がアクティブ化され、ossutil ツールが ECS インスタンスにインストールされています。 ossutil ツールを使用して、GH Archive から JSON 形式のデータを OSS に保存できます。 詳細については、「OSS のアクティブ化」および「ossutil のインストール」をご参照ください。
MaxComputeがアクティブ化され、MaxComputeプロジェクトが作成されています。詳細については、「MaxComputeプロジェクトの作成」をご参照ください。
DataWorksがアクティブ化され、オフラインスケジュールタスクを作成するためのワークスペースが作成されています。詳細については、「ワークスペースの作成」をご参照ください。
Simple Log Serviceがアクティブ化されています。 ECSインスタンスによって抽出されたデータを収集し、ログとして保存するために、プロジェクトとログストアが作成されています。詳細については、「はじめに」をご参照ください。
Realtime Compute for Apache Flinkがアクティブ化され、Simple Log Serviceによって収集されたログデータをリアルタイムでHologresに書き込みます。詳細については、「Realtime Compute for Apache Flinkのアクティブ化」をご参照ください。
Hologresインスタンスが購入されています。詳細については、「Hologresインスタンスの購入」をご参照ください。
オフラインデータウェアハウスの構築(1時間ごとに更新)
ECSインスタンスを使用して生データファイルをダウンロードし、ファイルをOSSにアップロードする
ECSインスタンスを使用して、GH ArchiveからJSON形式のデータファイルをダウンロードできます。
履歴データをダウンロードする場合は、
wgetコマンドを実行できます。たとえば、wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gzコマンドを実行して、2012年から2022年まで1時間ごとにアーカイブされたデータをダウンロードできます。1時間ごとにアーカイブされるリアルタイムデータをダウンロードする場合は、次の手順を実行して、1時間ごとに実行されるスケジュールタスクを設定します。
説明ossutilツールがECSインスタンスにインストールされていることを確認してください。詳細については、「ossutilのインストール」をご参照ください。 ossutilインストールパッケージをダウンロードしてECSインスタンスにアップロードすることをお勧めします。
yum install unzipコマンドを実行してunzipソフトウェアをインストールし、ossutilインストールパッケージを解凍して、解凍されたファイルを/usr/bin/ディレクトリに移動できます。ECSインスタンスが存在するリージョンにOSSバケットが作成されていることを確認してください。この例では、
githubeventsという名前のOSSバケットを使用します。この例では、ECSファイルのダウンロードディレクトリは
/opt/hourlydata/gh_dataです。ビジネス要件に基づいてディレクトリを選択できます。
次のコマンドを実行して
/opt/hourlydataディレクトリに移動し、download_code.shという名前のファイルを作成します。cd /opt/hourlydata vim download_code.shファイルに
iと入力して、編集モードに入ります。次に、次のサンプルコードを追加します。d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H') h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H') url=https://data.gharchive.org/${d}.json.gz echo ${url} # データを./gh_data/ディレクトリにダウンロードします。ビジネス要件に基づいてディレクトリを指定できます。 wget ${url} -P ./gh_data/ # gh_dataディレクトリに切り替えます。 cd gh_data # ダウンロードしたデータをJSONファイルに解凍します。 gzip -d ${d}.json echo ${d}.json # ルートディレクトリに切り替えます。 cd /root # ossutilツールを使用して、データをOSSにアップロードします。 # githubeventsという名前のOSSバケットにhr=${h}という名前のディレクトリを作成します。 ossutil mkdir oss://githubevents/hr=${h} # /hourlydata/gh_dataディレクトリのデータをOSSにアップロードします。ビジネス要件に基づいてディレクトリを指定できます。 ossutil cp -r /opt/hourlydata/gh_data oss://githubevents/hr=${h} -u echo oss uploaded successfully! rm -rf /opt/hourlydata/gh_data/${d}.json echo ecs deleted![esc]キーを押して、編集モードを終了します。
:wqと入力し、Enterキーを押してファイルを保存して閉じます。次のコマンドを実行して、
download_code.shファイルが毎時10分に実行されるスケジュールタスクを設定します。#1.次のコマンドを実行し、iを入力して編集モードに入ります。 crontab -e #2.次のコマンドを追加します。次に、Escキーを押して編集モードを終了し、:wqと入力してEnterキーを押してファイルを保存して閉じます。 10 * * * * cd /opt/hourlydata && sh download_code.sh > download.log毎時10分に、前の時間にアーカイブされたJSONファイルがダウンロードされ、OSSの
oss://githubeventsディレクトリに解凍されます。 MaxComputeが前の時間にアーカイブされたファイルのみを読み取るようにするには、'hr=%Y-%M-%D-%H'形式の名前で各ファイルのパーティションを作成することをお勧めします。これにより、MaxComputeは最新のパーティションからデータを読み取ります。
外部テーブルを使用してOSSからMaxComputeにデータをインポートする
MaxComputeクライアントまたはDataWorksコンソールのODPS SQLノードで次の手順を実行します。詳細については、「MaxComputeクライアント(odpscmd)」または「MaxCompute SQLタスクの開発」をご参照ください。
OSSに保存されているJSONファイルを変換するために、
githubeventsという名前の外部テーブルを作成します。CREATE EXTERNAL TABLE IF NOT EXISTS githubevents ( col STRING ) PARTITIONED BY ( hr STRING ) STORED AS textfile LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/' ;MaxComputeでOSS外部テーブルを作成する方法の詳細については、「OSS外部テーブルの作成」をご参照ください。
データを保存するために、
dwd_github_events_odpsという名前のファクトテーブルを作成します。サンプルデータ定義言語(DDL)ステートメント:CREATE TABLE IF NOT EXISTS dwd_github_events_odps ( id BIGINT COMMENT 'イベントID' ,actor_id BIGINT COMMENT 'イベントアクターのID' ,actor_login STRING COMMENT 'イベントアクターのユーザー名' ,repo_id BIGINT COMMENT 'repoID' ,repo_name STRING COMMENT 'owner/Repository_name形式の完全なリポジトリ名' ,org_id BIGINT COMMENT 'リポジトリが属する組織のID' ,org_login STRING COMMENT 'リポジトリが属する組織の名前' ,`type` STRING COMMENT 'イベントタイプ' ,created_at DATETIME COMMENT 'イベントが発生した時刻' ,action STRING COMMENT 'イベントアクション' ,iss_or_pr_id BIGINT COMMENT 'issue/pull_request ID' ,number BIGINT COMMENT 'issueまたはpull_requestリクエストのシーケンス番号' ,comment_id BIGINT COMMENT 'コメントID' ,commit_id STRING COMMENT 'コミットID' ,member_id BIGINT COMMENT 'メンバーID' ,rev_or_push_or_rel_id BIGINT COMMENT 'review/push/release ID' ,ref STRING COMMENT '作成または削除されたリソースの名前' ,ref_type STRING COMMENT '作成または削除されたリソースのタイプ' ,state STRING COMMENT 'issue、pull_request、またはpull_request_reviewリクエストのステータス' ,author_association STRING COMMENT 'アクターとリポジトリの関係' ,language STRING COMMENT 'マージリクエストコードに使用される言語' ,merged BOOLEAN COMMENT 'マージが許可されているかどうかを指定します' ,merged_at DATETIME COMMENT 'コードがマージされた時刻' ,additions BIGINT COMMENT 'コードに追加された行数' ,deletions BIGINT COMMENT 'コードから削除された行数' ,changed_files BIGINT COMMENT 'pull requestによって変更されたファイルの数' ,push_size BIGINT COMMENT 'プッシュサイズ' ,push_distinct_size BIGINT COMMENT '異なるプッシュサイズ' ,hr STRING COMMENT 'イベントが発生した時間。たとえば、イベントが00:23に発生した場合、このパラメーターの値は00です。' ,`month` STRING COMMENT 'イベントが発生した月。たとえば、イベントが2015年10月に発生した場合、このパラメーターの値は2015-10です。' ,`year` STRING COMMENT 'イベントが発生した年。たとえば、イベントが2015年に発生した場合、このパラメーターの値は2015です。' ) PARTITIONED BY ( ds STRING COMMENT 'イベントが発生した日。このパラメーターの値はyyyy-mm-dd形式です。' );JSON形式のデータを解析し、ファクトテーブルに書き込みます。
次のコマンドを実行して、パーティションを追加し、JSON形式のデータを解析し、解析されたデータを
dwd_github_events_odpsテーブルに書き込みます。msck repair table githubevents add partitions; set odps.sql.hive.compatible = true; set odps.sql.split.hive.bridge = true; INSERT into TABLE dwd_github_events_odps PARTITION(ds) SELECT CAST(GET_JSON_OBJECT(col,'$.id') AS BIGINT ) AS id ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id ,GET_JSON_OBJECT(col,'$.org.login') AS org_login ,GET_JSON_OBJECT(col,'$.type') as type ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at ,GET_JSON_OBJECT(col,'$.payload.action') AS action ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT) END AS iss_or_pr_id ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT) ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT) END AS number ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT) END AS rev_or_push_or_rel_id ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state') WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state') WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state') END AS state ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association') END AS author_association ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT) AS deletions ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT) AS push_size ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT) AS push_distinct_size ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') as month ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') as ds from githubevents where hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);データをクエリします。
次のコマンドを実行して、
dwd_github_events_odpsテーブルからデータをクエリします。SET odps.sql.allow.fullscan=true; SELECT * FROM dwd_github_events_odps where ds = '2023-03-31' limit 10;次の図は、返された結果を示しています。

リアルタイムデータウェアハウスの構築
ECSインスタンスを使用してリアルタイムデータを収集する
ECSインスタンスを使用して、GitHub APIからリアルタイムイベントデータを抽出できます。このセクションでは、GitHub APIからリアルタイムデータを収集する方法について説明します。
この例では、1分以内に生成されたリアルタイムイベントデータがGitHub APIから収集され、JSON形式で保存されます。
スクリプトを実行して収集されたリアルタイムイベントデータは、完全ではない場合があります。
GitHub APIから継続的にデータを収集する場合は、AcceptおよびAuthorizationパラメーターを指定する必要があります。 Acceptの値は固定です。 Authorizationの値は、GitHubから申請したアクセストークンから取得できます。アクセストークンの作成方法の詳細については、「パーソナルアクセストークンの作成」をご参照ください。
次のコマンドを実行して
/opt/realtimeディレクトリに移動し、download_realtime_data.pyという名前のファイルを作成します。cd /opt/realtime vim download_realtime_data.pyファイルに
iと入力して、編集モードに入ります。次に、次のサンプルコードを追加します。#!python import requests import json import sys import time # GitHub APIのURLを取得します。 def get_next_link(resp): resp_link = resp.headers['link'] link = '' for l in resp_link.split(', '): link = l.split('; ')[0][1:-1] rel = l.split('; ')[1] if rel == 'rel="next"': return link return None # GitHub APIから1ページのデータを収集します。 def download(link, fname): # GitHub APIのAcceptとAuthorizationを定義します。 headers = {"Accept": "application/vnd.github+json","Authorization": "<Bearer> <github_api_token>"} resp = requests.get(link, headers=headers) if int(resp.status_code) != 200: return None with open(fname, 'a') as f: for j in resp.json(): f.write(json.dumps(j)) f.write('\n') print('downloaded {} events to {}'.format(len(resp.json()), fname)) return resp # GitHub APIから複数ページのデータを収集します。 def download_all_data(fname): link = 'https://api.github.com/events?per_page=100&page=1' while True: resp = download(link, fname) if resp is None: break link = get_next_link(resp) if link is None: break # 現在時刻を定義します。 def get_current_ms(): return round(time.time()*1000) # 各スクリプトの実行時間を1分に設定します。 def main(fname): current_ms = get_current_ms() while get_current_ms() - current_ms < 60*1000: download_all_data(fname) time.sleep(0.1) # スクリプトを実行します。 if __name__ == '__main__': if len(sys.argv) < 2: print('usage: python {} <log_file>'.format(sys.argv[0])) exit(0) main(sys.argv[1])[esc]キーを押して、編集モードを終了します。
:wqと入力し、Enterキーを押してスクリプトを保存して閉じます。run_py.shという名前のファイルを作成し、このファイルを使用してdownload_realtime_data.pyファイルのスクリプトを実行し、収集されたデータをス実行時間に基づいて個別に保存します。サンプルコード:python /opt/realtime/download_realtime_data.py /opt/realtime/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json履歴データを削除するために使用される
delete_log.shファイルを作成します。サンプルコード:d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d') rm -f /opt/realtime/gh_realtime_data/*${d}*.json次のコマンドを実行して、1分ごとにGitHubデータを収集し、毎日履歴データを削除します。
#1.次のコマンドを実行し、iを入力して編集モードに入ります。 crontab -e #2.次のコマンドを追加します。次に、Escキーを押して編集モードを終了し、:wqと入力してEnterキーを押してファイルを保存して閉じます。 * * * * * bash /opt/realtime/run_py.sh 1 1 * * * bash /opt/realtime/delete_log.sh
Simple Log Serviceを使用してECSインスタンスからデータを収集する
Simple Log Serviceを使用して、ECSインスタンスによって抽出されたリアルタイムイベントデータを収集し、データをログとして保存できます。
Simple Log Serviceは、ECSインスタンスからログデータを収集するためのLogtailを提供します。このトピックのサンプルデータはJSON形式です。 LogtailのJSONモードを使用して、ECSインスタンスから増分JSONログをすばやく収集できます。詳細については、「サーバーからのテキストログの収集」をご参照ください。このトピックでは、Simple Log Serviceを使用して、トップレベルに属する生データキーバリューペアを解析します。
この例では、Logtailは/opt/realtime/gh_realtime_data/**/*.jsonディレクトリの*.jsonファイルに収集されたデータを記録します。
設定が完了すると、Simple Log ServiceはECSインスタンスから増分イベントデータを継続的に収集します。次の図は、収集されたデータの例を示しています。
Realtime Compute for Apache Flinkを使用してSimple Log ServiceからHologresにデータを書き込む
Realtime Compute for Apache Flinkを使用して、Simple Log Serviceによって収集されたログデータをリアルタイムでHologresに書き込むことができます。このプロセスでは、Realtime Compute for Apache FlinkでSimple Log ServiceソーステーブルとHologres結果テーブルが使用されます。詳細については、「Simple Log Serviceからのデータのインポート」をご参照ください。
Hologres内部テーブルを作成します。
この例では、JSON形式の生データの一部のキーのみがHologres内部テーブルに保持されます。
idで指定されたイベントIDは配布キーとして設定され、dsで指定された日付はパーティションキーとして設定され、created_atで指定されたイベント発生時刻はevent_time_columnとして設定されます。idとdsはプライマリキーです。クエリ要件に基づいて、Hologres内部テーブルの他のフィールドにインデックスを作成できます。これにより、クエリの効率が向上します。インデックスの詳細については、「CREATE TABLE」をご参照ください。サンプルDDLステートメント:DROP TABLE IF EXISTS gh_realtime_data; BEGIN; CREATE TABLE gh_realtime_data ( id bigint, actor_id bigint, actor_login text, repo_id bigint, repo_name text, org_id bigint, org_login text, type text, created_at timestamp with time zone NOT NULL, action text, iss_or_pr_id bigint, number bigint, comment_id bigint, commit_id text, member_id bigint, rev_or_push_or_rel_id bigint, ref text, ref_type text, state text, author_association text, language text, merged boolean, merged_at timestamp with time zone, additions bigint, deletions bigint, changed_files bigint, push_size bigint, push_distinct_size bigint, hr text, month text, year text, ds text, PRIMARY KEY (id,ds) ) PARTITION BY LIST (ds); CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id'); CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at'); CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at'); COMMENT ON COLUMN public.gh_realtime_data.id IS 'イベントID'; COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'アクターID'; COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'イベントアクターのユーザー名'; COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'repoID'; COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'リポジトリ名'; COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'リポジトリが属する組織のID'; COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'リポジトリが属する組織の名前'; COMMENT ON COLUMN public.gh_realtime_data.type IS 'イベントタイプ'; COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'イベントが発生した時刻。'; COMMENT ON COLUMN public.gh_realtime_data.action IS 'イベントアクション'; COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID'; COMMENT ON COLUMN public.gh_realtime_data.number IS 'issueまたはpull_requestのシーケンス番号'; COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'コメントID'; COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'コミットID'; COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'メンバーID'; COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID'; COMMENT ON COLUMN public.gh_realtime_data.ref IS '作成または削除されたリソースの名前'; COMMENT ON COLUMN public.gh_realtime_data.ref_type IS '作成または削除されたリソースのタイプ'; COMMENT ON COLUMN public.gh_realtime_data.state IS 'issue、pull_request、またはpull_request_reviewリクエストのステータス'; COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'アクターとリポジトリの関係'; COMMENT ON COLUMN public.gh_realtime_data.language IS 'プログラミング言語'; COMMENT ON COLUMN public.gh_realtime_data.merged IS 'マージが許可されているかどうかを指定します'; COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'コードがマージされた時刻'; COMMENT ON COLUMN public.gh_realtime_data.additions IS 'コードに追加された行数'; COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'コードから削除された行数'; COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'pull requestによって変更されたファイルの数'; COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'プッシュサイズ'; COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS '異なるプッシュサイズ'; COMMENT ON COLUMN public.gh_realtime_data.hr IS 'イベントが発生した時間。たとえば、イベントが00:23に発生した場合、このパラメーターの値は00です。'; COMMENT ON COLUMN public.gh_realtime_data.month IS 'イベントが発生した月。たとえば、イベントが2015年10月に発生した場合、このパラメーターの値は2015-10です。'; COMMENT ON COLUMN public.gh_realtime_data.year IS 'イベントが発生した年。たとえば、イベントが2015年に発生した場合、このパラメーターの値は2015です。'; COMMENT ON COLUMN public.gh_realtime_data.ds IS 'イベントが発生した日。このパラメーターの値はyyyy-mm-dd形式です。'; COMMIT;Realtime Compute for Apache Flinkを使用してリアルタイムデータを書き込みます。
Realtime Compute for Apache Flinkを使用して、Simple Log Serviceからのデータをさらに解析し、解析されたデータをリアルタイムでHologresに書き込むことができます。 Realtime Compute for Apache Flinkで次のステートメントを実行して、Hologresに書き込むデータをフィルタリングします。データフィルタリングでは、
created_atで指定されたイベントIDやイベント発生時刻などのダーティデータが破棄されます。最近発生したイベントのデータのみが保存されます。CREATE TEMPORARY TABLE sls_input ( actor varchar, created_at varchar, id bigint, org varchar, payload varchar, public varchar, repo varchar, type varchar ) WITH ( 'connector' = 'sls', 'endpoint' = '<endpoint>',-- Simple Log Serviceにアクセスするために使用される内部エンドポイント。 'accesssid'= '<accesskey id>',-- アカウントのAccessKey ID。 'accesskey' = '<accesskey secret>',-- アカウントのAccessKeyシークレット。 'project' = '<project name>',-- Simple Log Serviceのプロジェクト名。 'logstore' = '<logstore name>'-- Simple Log Serviceのログストア名。 'starttime' = '2023-04-06 00:00:00',-- Simple Log Serviceでデータの収集が開始された時刻。 ); CREATE TEMPORARY TABLE hologres_sink ( id bigint, actor_id bigint, actor_login string, repo_id bigint, repo_name string, org_id bigint, org_login string, type string, created_at timestamp, action string, iss_or_pr_id bigint, number bigint, comment_id bigint, commit_id string, member_id bigint, rev_or_push_or_rel_id bigint, `ref` string, ref_type string, state string, author_association string, `language` string, merged boolean, merged_at timestamp, additions bigint, deletions bigint, changed_files bigint, push_size bigint, push_distinct_size bigint, hr string, `month` string, `year` string, ds string ) WITH ( 'connector' = 'hologres', 'dbname' = '<hologres dbname>', -- Hologresデータベースの名前。 'tablename' = '<hologres tablename>', -- データを書き込むHologresテーブルの名前。 'username' = '<accesskey id>', -- Alibaba CloudアカウントのAccessKey ID。 'password' = '<accesskey secret>', -- Alibaba CloudアカウントのAccessKeyシークレット。 'endpoint' = '<endpoint>', -- HologresインスタンスのVirtual Private Cloud(VPC)エンドポイント。 'jdbcretrycount' = '1', -- 接続に失敗した場合に許可される最大再試行回数。 'partitionrouter' = 'true', -- パーティションテーブルにデータを書き込むかどうかを指定します。 'createparttable' = 'true', -- パーティションを自動的に作成するかどうかを指定します。 'mutatetype' = 'insertorignore' -- データ書き込みモード。 ); INSERT INTO hologres_sink SELECT id ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id ,JSON_VALUE(actor, '$.login') AS actor_login ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id ,JSON_VALUE(repo, '$.name') AS repo_name ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id ,JSON_VALUE(org, '$.login') AS org_login ,type ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at ,JSON_VALUE(payload, '$.action') AS action ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint) WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint) END AS iss_or_pr_id ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint) WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint) ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint) END AS number ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id ,CASE WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint) WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint) WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint) END AS rev_or_push_or_rel_id ,JSON_VALUE(payload, '$.ref') AS `ref` ,JSON_VALUE(payload, '$.ref_type') AS ref_type ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state') WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state') WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state') END AS state ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association') WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association') WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association') WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association') END AS author_association ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language` ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month` ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year` ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds FROM sls_input WHERE id IS NOT NULL AND created_at IS NOT NULL AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1);パラメーターの詳細については、「Simple Log Serviceコネクタ」および「Hologres コネクタ」をご参照ください。
説明GitHubは生のイベントデータをUTCタイムゾーンで記録し、データにタイムゾーンプロパティを含めません。ただし、HologresはデフォルトでUTC + 8タイムゾーンを使用します。 Realtime Compute for Apache Flinkを使用してリアルタイムデータをHologresに書き込むときは、次の操作を実行してデータのタイムゾーンを変換する必要があります。Flink SQLのソーステーブルのデータにUTCタイムゾーンプロパティを追加します。 [デプロイ開始設定]ページの[flink設定]セクションで、
table.local-time-zone:Asia/Shanghaiステートメントを追加して、Realtime Compute for Apache FlinkのタイムゾーンをAsia/Shanghaiに設定します。データをクエリします。
Realtime Compute for Apache Flinkを使用して、Simple Log ServiceからHologresに書き込まれたデータをクエリできます。また、ビジネス要件に基づいてデータを開発することもできます。
SELECT * FROM public.gh_realtime_data limit 10;次の図は、返された結果を示しています。

オフラインデータを使用してリアルタイムデータを修正する
このトピックで説明されているシナリオでは、リアルタイムデータは完全ではない場合があります。オフラインデータを使用してリアルタイムデータを修正できます。次のセクションでは、前日に生成されたリアルタイムデータを修正する方法について説明します。ビジネス要件に基づいてデータ修正期間を変更できます。
Hologresに外部テーブルを作成して、MaxComputeからオフラインデータを取得します。
IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to ( <foreign_table_name> ) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');パラメーターの詳細については、「IMPORT FOREIGN SCHEMA」をご参照ください。
一時テーブルを作成し、オフラインデータを使用して前日に生成されたリアルタイムデータを修正します。
説明Hologres V2.1.17以降では、サーバーレスコンピューティング機能がサポートされています。サーバーレスコンピューティング機能は、大量のデータをオフラインでインポートしたり、大規模な抽出、変換、読み込み(ETL)ジョブを実行したり、外部テーブルから大量のデータをクエリしたりするシナリオに適しています。サーバーレスコンピューティング機能を使用して、追加のサーバーレスコンピューティングリソースに基づいて前述の操作を実行できます。これにより、インスタンスに追加のコンピューティングリソースを予約する必要がなくなります。これにより、インスタンスの安定性が向上し、メモリ不足(OOM)エラーの発生が減少します。タスクで使用される追加のサーバーレスコンピューティングリソースに対してのみ課金されます。サーバーレスコンピューティング機能の詳細については、「サーバーレスコンピューティング」をご参照ください。サーバーレスコンピューティング機能の使用方法の詳細については、「サーバーレスコンピューティングのユーザーガイド」をご参照ください。
-- 一時テーブルが存在する場合は削除します。 DROP TABLE IF EXISTS gh_realtime_data_tmp; -- 一時テーブルを作成します。 SET hg_experimental_enable_create_table_like_properties = ON; CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data'); -- オプション。大量のデータをオフラインでインポートし、抽出、変換、読み込み(ETL)ジョブを実行するには、サーバーレスコンピューティング機能を使用することをお勧めします。 SET hg_computing_resource = 'serverless'; -- 一時テーブルにデータを挿入し、統計を更新します。 INSERT INTO gh_realtime_data_tmp SELECT * FROM <foreign_table_name> WHERE ds = current_date - interval '1 day' ON CONFLICT (id, ds) DO NOTHING; ANALYZE gh_realtime_data_tmp; -- 設定をリセットします。これにより、後続のSQLステートメントでサーバーレスコンピューティングリソースが使用されなくなります。 RESET hg_computing_resource; -- 元の子テーブルを一時子テーブルに置き換えます。 BEGIN; DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>"; ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>"; ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>'); COMMIT;
データ分析
大量のデータは、さまざまなデータ分析に使用できます。データウェアハウスを設計する際には、ビジネスに必要な時間範囲に基づいてデータレイヤーを定義できます。データウェアハウスは、リアルタイムデータ分析、オフラインデータ分析、およびオフラインデータ処理とリアルタイムデータ処理の統合に関する要件を満たすことができます。
このセクションでは、前のセクションで取得したリアルタイムデータを分析する方法について説明します。指定されたコードリポジトリのデータを分析したり、開発者としてデータ分析を実行したりすることもできます。
現在発生したパブリックイベントの総数をクエリします。
SELECT count(*) FROM gh_realtime_data WHERE created_at >= date_trunc('day', now());次の結果が返されます。
count ------ 1006前日に最も多くのイベントが発生した上位プロジェクトをクエリします。
SELECT repo_name, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' GROUP BY repo_name ORDER BY events DESC LIMIT 5;次の結果が返されます。
repo_name events ----------------------------------------+------ leo424y/heysiri.ml 29 arm-on/plan 10 Christoffel-T/fiverr-pat-20230331 9 mate-academy/react_dynamic-list-of-goods 9 openvinotoolkit/openvino 7前日に最も多くのイベントを開始した上位開発者をクエリします。
SELECT actor_login, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' AND actor_login NOT LIKE '%[bot]' GROUP BY actor_login ORDER BY events DESC LIMIT 5;次の結果が返されます。
actor_login events ------------------+------ direwolf-github 13 arm-on 10 sergii-nosachenko 9 Christoffel-T 9 yangwang201911 7過去1時間で最も人気のあるプログラミング言語をクエリします。
SELECT language, count(*) total FROM gh_realtime_data WHERE created_at > now() - interval '1 hour' AND language IS NOT NULL GROUP BY language ORDER BY total DESC LIMIT 10;次の結果が返されます。
language total -----------+---- JavaScript 25 C++ 15 Python 14 TypeScript 13 Java 8 PHP 8前日にリポジトリがお気に入りに追加された回数でリポジトリを降順にランク付けします。
説明この例では、リポジトリがお気に入りから削除された回数は計算されません。
SELECT repo_id, repo_name, COUNT(actor_login) total FROM gh_realtime_data WHERE type = 'WatchEvent' AND created_at > now() - interval '1 day' GROUP BY repo_id, repo_name ORDER BY total DESC LIMIT 10;次の結果が返されます。
repo_id repo_name total ---------+----------------------------------+----- 618058471 facebookresearch/segment-anything 4 619959033 nomic-ai/gpt4all 1 97249406 denysdovhan/wtfjs 1 9791525 digininja/DVWA 1 168118422 aylei/interview 1 343520006 joehillen/sysz 1 162279822 agalwood/Motrix 1 577723410 huggingface/swift-coreml-diffusers 1 609539715 e2b-dev/e2b 1 254839429 maniackk/KKCallStack 1現在関与しているアクティブなアクターの数とアクティブなリポジトリの数をクエリします。
SELECT uniq (actor_id) actor_num, uniq (repo_id) repo_num FROM gh_realtime_data WHERE created_at > date_trunc('day', now());次の結果が返されます。
actor_num repo_num ---------+-------- 743 816