This tutorial shows how to build a unified batch and real-time analytics solution using GitHub event data. You will use MaxCompute to build a batch data warehouse, and Realtime Compute for Apache Flink and Hologres to build a real-time data warehouse. Then, you will use Hologres and MaxCompute to perform real-time and batch data analysis.
Background
As businesses become more digital, the demand for fresher data is growing. In addition to traditional batch processing for large-scale data, many businesses need to handle real-time data processing, storage, and analytics. Unified batch and real-time analytics was developed to meet this need.
Unified batch and real-time analytics manages and processes both real-time and batch data on a single platform. It enables a seamless connection between real-time processing and batch analytics, which improves the efficiency and accuracy of data analysis. The key benefits include:
Improved data processing efficiency: Integrating real-time and batch data on a single platform significantly improves processing efficiency and reduces data transfer and conversion costs.
Improved analytics accuracy: Blending real-time and batch data for analysis improves the precision and accuracy of your results.
Simplified data management: This approach simplifies data management and processing and makes these tasks more efficient.
Better decision support: You can fully leverage the value of your data to support better business decisions.
Alibaba Cloud offers a simplified, unified data warehouse solution for both batch and real-time scenarios. This solution uses MaxCompute for batch processing and Hologres for real-time analytics. Paired with the real-time processing capabilities of Realtime Compute for Apache Flink, these services form the core engine of Alibaba Cloud's unified data warehouse.
Solution architecture
The following diagram shows the complete pipeline for implementing unified batch and real-time analytics on the GitHub public events dataset using MaxCompute and Hologres.

In this architecture, an ECS instance collects and aggregates real-time and batch event data from GitHub, which then serves as the data source. This data is fed into a real-time pipeline and a batch pipeline. The data from both pipelines is then consolidated in Hologres, which provides a unified service layer.
Real-time pipeline: Realtime Compute for Apache Flink processes data from Simple Log Service (SLS) in real time and writes it to Hologres. Flink is a powerful stream processing engine. Hologres supports real-time data writes and updates, with data queryable immediately after it is written. Their native integration supports high-throughput, low-latency, model-driven, and high-quality real-time data warehouse development. This meets the needs of real-time business insights, such as extracting the latest events and analyzing trending events.
Batch pipeline: MaxCompute processes and archives massive amounts of batch data. Object Storage Service (OSS) is an Alibaba Cloud service for storing various data types. The raw data used in this tutorial is in JSON format, and OSS provides convenient, secure, low-cost, and reliable storage for it. MaxCompute is an enterprise-grade SaaS cloud data warehouse designed for data analytics. It can directly read and parse semi-structured data in OSS through external tables, load high-value data into its internal storage, and work with DataWorks to build a batch data warehouse.
Hologres is seamlessly integrated with MaxCompute at the storage layer. This allows you to use Hologres to accelerate queries and analysis on massive volumes of historical data in MaxCompute, meeting business needs for low-frequency, high-performance queries on historical data. You can also easily use the batch pipeline to correct real-time data, to resolve issues like data omissions that can occur in the real-time pipeline.
This solution offers the following advantages:
Stable and efficient batch pipeline: Supports hourly data writes and updates, processes large-scale data in batches, and performs complex calculations and analysis, which reduces computing costs and improves data processing efficiency.
Mature real-time pipeline: Supports real-time ingestion, real-time event computation, and real-time analytics. The simplified real-time pipeline delivers responses within seconds.
Unified storage and service: Hologres provides a unified service layer with centralized data storage and a consistent external interface (a single SQL interface for both OLAP and Key-Value queries).
Unified batch and real-time analytics: Reduces data redundancy and movement, and allows for data correction.
This one-stop development approach delivers data responses within seconds, end-to-end status visibility, and a simpler architecture with fewer components and dependencies, which reduces both O&M and labor costs.
Understanding the business and data
Developers create many events when they work on open source projects on GitHub. GitHub records details for each event. These details include the event type, the developer, and the code repository. GitHub makes public events available, such as starring a repository or committing code. For a complete list of event types, see Webhook events and payloads.
GitHub provides public events through its REST API, which serves near-real-time data with a delay of about five minutes. For more information, see Events.
The GH Archive project collects and provides hourly archives of GitHub public events. Use these archives to get offline data. For more information, see GH Archive.
Understanding the GitHub business
GitHub's core business is managing code and interactions. It involves three main entities: Developer, Repository, and Organization.
For this data analysis, an Event is also stored and recorded as an entity.

Understanding raw public event data
The following example shows the JSON data for a raw event:
```json
{
    "id": "19541192931",
    "type": "WatchEvent",
    "actor":
    {
        "id": 23286640,
        "login": "herekeo",
        "display_login": "herekeo",
        "gravatar_id": "",
        "url": "https://api.github.com/users/herekeo",
        "avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
    },
    "repo":
    {
        "id": 52760178,
        "name": "crazyguitar/pysheeet",
        "url": "https://api.github.com/repos/crazyguitar/pysheeet"
    },
    "payload":
    {
        "action": "started"
    },
    "public": true,
    "created_at": "2022-01-01T00:03:04Z"
}
```

This analysis covers 15 types of public events. It does not include events that never happened or are no longer recorded. For details about these event types, see GitHub public event types.
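The following Python sketch, which is not part of the pipeline, parses a trimmed copy of this sample event and derives a few of the columns used later in this tutorial. The `hr`, `month`, `year`, and `ds` values use the same character positions that the SQL statements take with `SUBSTR` (1-based in SQL, 0-based slices in Python). The function name `flatten_event` is hypothetical.

```python
import json

# A trimmed copy of the sample WatchEvent shown above.
RAW_EVENT = '''{
  "id": "19541192931",
  "type": "WatchEvent",
  "actor": {"id": 23286640, "login": "herekeo"},
  "repo": {"id": 52760178, "name": "crazyguitar/pysheeet"},
  "payload": {"action": "started"},
  "public": true,
  "created_at": "2022-01-01T00:03:04Z"
}'''


def flatten_event(raw: str) -> dict:
    """Pick a few top-level and nested fields and derive the time columns."""
    event = json.loads(raw)
    created_at = event["created_at"]  # UTC, for example "2022-01-01T00:03:04Z"
    org = event.get("org") or {}  # the sample event has no org object
    return {
        "id": int(event["id"]),  # the event ID is a JSON string; cast it to an integer
        "actor_login": event["actor"]["login"],
        "repo_name": event["repo"]["name"],
        "org_login": org.get("login"),
        "type": event["type"],
        "action": event.get("payload", {}).get("action"),
        # Same positions as SUBSTR(created_at, 12, 2), (1, 7), (1, 4), (1, 10) in SQL.
        "hr": created_at[11:13],
        "month": created_at[0:7],
        "year": created_at[0:4],
        "ds": created_at[0:10],
    }


row = flatten_event(RAW_EVENT)
print(row["id"], row["type"], row["hr"], row["month"], row["ds"])
```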
Prerequisites
An Elastic Compute Service (ECS) instance is created and an Elastic IP Address (EIP) is associated with it. The instance is used to extract real-time event data from the GitHub API. For more information, see Creation guide and Elastic IP Address.
Object Storage Service (OSS) is activated, and the ossutil tool is installed on the ECS instance to store the JSON data files from GH Archive. For more information, see Activate OSS and Install ossutil.
MaxCompute is activated and a project is created. For more information, see Create a MaxCompute project.
DataWorks is activated and a workspace is created to create offline scheduling tasks. For more information, see Create a workspace.
Simple Log Service (SLS) is activated, and a project and a Logstore are created to collect data from the ECS instance as logs. For more information, see Collect and analyze ECS text logs using LoongCollector.
A Realtime Compute for Apache Flink instance is activated to write log data from SLS to Hologres in real time. For more information, see Activate Realtime Compute for Apache Flink.
Hologres is activated. For more information, see Purchase a Hologres instance.
Build an offline data warehouse (hourly updates)
Download raw data files using an ECS instance and upload them to OSS
Use an Elastic Compute Service (ECS) instance to download JSON data files from GH Archive.
Download historical data by using the `wget` command. For example, run `wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz` to download the hourly data from 2012 to 2022. To download the new data that is generated each hour, set up a scheduled hourly task as follows.
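The brace expansion in the preceding `wget` example also generates dates that do not exist, such as 2022-02-31, and those downloads simply fail. The following Python sketch is one way to list only valid hourly URLs for a UTC time range. The function name `hourly_archive_urls` is hypothetical; the naming pattern (zero-padded date, unpadded hour) is the one used by the `download_code.sh` script in this tutorial.

```python
from datetime import datetime, timedelta

BASE_URL = "https://data.gharchive.org"


def hourly_archive_urls(start: datetime, end: datetime) -> list[str]:
    """Return one GH Archive URL per hour in [start, end); times are UTC."""
    urls = []
    current = start.replace(minute=0, second=0, microsecond=0)
    while current < end:
        # Date parts are zero-padded; the hour is not (for example -5, not -05).
        urls.append(f"{BASE_URL}/{current:%Y-%m-%d}-{current.hour}.json.gz")
        current += timedelta(hours=1)
    return urls


urls = hourly_archive_urls(datetime(2022, 2, 28, 22), datetime(2022, 3, 1, 1))
for url in urls:
    print(url)
```

You could write the resulting URLs to a file and pass it to `wget -i <file>`.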
Note
Make sure that ossutil is installed on the ECS instance. For more information, see Install ossutil. To install it, download the ossutil installation package, upload it to the ECS instance, run `yum install unzip` to install unzip, decompress the ossutil package, and move the executable file to the `/usr/bin/` directory.
Make sure that you have created an Object Storage Service (OSS) bucket in the same region as your ECS instance. You can use a custom bucket name. This example uses the bucket name `githubevents`.
In this example, the files are downloaded to the `/opt/hourlydata/gh_data` directory on the ECS instance. You can use a different directory.
Run the following commands to create a file named `download_code.sh` in the `/opt/hourlydata` directory.

```shell
mkdir -p /opt/hourlydata
cd /opt/hourlydata
vim download_code.sh
```

Press `i` to enter edit mode and add the following script.

```shell
# GH Archive file names use an unpadded hour, for example 2023-03-31-9.
d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H')
# The OSS partition directory uses a zero-padded hour, for example 2023-03-31-09.
h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H')
url=https://data.gharchive.org/${d}.json.gz
echo ${url}
# Download the data to the ./gh_data/ directory. You can use a different directory.
wget ${url} -P ./gh_data/
# Change to the gh_data directory.
cd gh_data
# Decompress the downloaded archive into a JSON file.
gzip -d ${d}.json.gz
echo ${d}.json
# Change to the home directory of the root user.
cd /root
# Use ossutil to upload the data to OSS.
# Create the hr=${h} directory in the githubevents OSS bucket.
ossutil mkdir oss://githubevents/hr=${h}
# Upload the data from the /opt/hourlydata/gh_data directory to OSS. You can use a different directory.
ossutil cp -r /opt/hourlydata/gh_data oss://githubevents/hr=${h} -u
echo oss uploaded successfully!
rm -rf /opt/hourlydata/gh_data/${d}.json
echo ecs deleted!
```

Press the Esc key, enter `:wq`, and press Enter to save and close the file.

Run the following commands to run the `download_code.sh` script at 10 minutes past every hour.

```shell
# 1. Run the following command and press i to enter edit mode.
crontab -e
# 2. Add the following line. Then, press Esc, enter :wq, and press Enter to exit.
10 * * * * cd /opt/hourlydata && sh download_code.sh > download.log
```

At 10 minutes past every hour, the script downloads the JSON file for the previous hour, decompresses it on the ECS instance, and uploads it to the `oss://githubevents` path. So that later jobs read only the file for the previous hour, each file is uploaded to its own partition directory named `hr=%Y-%m-%d-%H` (for example, `hr=2023-03-31-09`). Subsequent data write operations then read files only from the latest partition.
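The following Python sketch mirrors how `download_code.sh` derives the download URL and the partition directory for the previous hour. The GH Archive file name uses an unpadded hour, while the partition directory uses a zero-padded hour. The function name `previous_hour_targets` is hypothetical, and `githubevents` is the example bucket used in this tutorial.

```python
from datetime import datetime, timedelta, timezone


def previous_hour_targets(now_utc: datetime, bucket: str = "githubevents") -> tuple[str, str]:
    """Mirror the d and h variables of download_code.sh for the hour before now_utc."""
    prev = now_utc - timedelta(hours=1)
    d = f"{prev:%Y-%m-%d}-{prev.hour}"  # like date '+%Y-%m-%d-%-H' (unpadded hour)
    h = f"{prev:%Y-%m-%d-%H}"  # like date '+%Y-%m-%d-%H' (zero-padded hour)
    url = f"https://data.gharchive.org/{d}.json.gz"
    oss_dir = f"oss://{bucket}/hr={h}/"
    return url, oss_dir


url, oss_dir = previous_hour_targets(datetime(2023, 3, 31, 10, 10, tzinfo=timezone.utc))
print(url)  # https://data.gharchive.org/2023-03-31-9.json.gz
print(oss_dir)  # oss://githubevents/hr=2023-03-31-09/
```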
Import OSS data into MaxCompute using an external table
Run the following commands in the MaxCompute client or an ODPS SQL node in DataWorks. For more information, see Connect to MaxCompute by using the client (odpscmd) or Develop an ODPS SQL task.
Create the external table `githubevents` to read the JSON files stored in OSS:

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS githubevents
(
    col STRING
)
PARTITIONED BY
(
    hr STRING
)
STORED AS textfile
LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/'
;
```

The `LOCATION` in this example uses the internal OSS endpoint of the China (Hangzhou) region. Replace it with the endpoint of the region in which your bucket resides. For more information about creating external tables to access OSS data in MaxCompute, see Access unstructured data in OSS.
Create the fact table `dwd_github_events_odps` to store the data. The following code shows the Data Definition Language (DDL) statement:

```sql
CREATE TABLE IF NOT EXISTS dwd_github_events_odps
(
    id BIGINT COMMENT 'Event ID'
    ,actor_id BIGINT COMMENT 'ID of the event initiator'
    ,actor_login STRING COMMENT 'Logon name of the event initiator'
    ,repo_id BIGINT COMMENT 'Repository ID'
    ,repo_name STRING COMMENT 'Full name of the repository in the format of owner/repository_name'
    ,org_id BIGINT COMMENT 'ID of the organization to which the repository belongs'
    ,org_login STRING COMMENT 'Name of the organization to which the repository belongs'
    ,`type` STRING COMMENT 'Event type'
    ,created_at DATETIME COMMENT 'Time when the event occurred'
    ,action STRING COMMENT 'Event action'
    ,iss_or_pr_id BIGINT COMMENT 'ID of the issue or pull request'
    ,number BIGINT COMMENT 'Number of the issue or pull request'
    ,comment_id BIGINT COMMENT 'Comment ID'
    ,commit_id STRING COMMENT 'Commit ID'
    ,member_id BIGINT COMMENT 'Member ID'
    ,rev_or_push_or_rel_id BIGINT COMMENT 'ID of the review, push, or release'
    ,ref STRING COMMENT 'Name of the created or deleted resource'
    ,ref_type STRING COMMENT 'Type of the created or deleted resource'
    ,state STRING COMMENT 'Status of the issue, pull request, or pull request review'
    ,author_association STRING COMMENT 'Relationship between the actor and the repository'
    ,language STRING COMMENT 'Language of the code in the pull request'
    ,merged BOOLEAN COMMENT 'Indicates whether the pull request was merged'
    ,merged_at DATETIME COMMENT 'Time when the code was merged'
    ,additions BIGINT COMMENT 'Number of added lines of code'
    ,deletions BIGINT COMMENT 'Number of deleted lines of code'
    ,changed_files BIGINT COMMENT 'Number of files changed in the pull request'
    ,push_size BIGINT COMMENT 'Number of commits'
    ,push_distinct_size BIGINT COMMENT 'Number of distinct commits'
    ,hr STRING COMMENT 'Hour when the event occurred. For example, if the event occurred at 00:23, the value of hr is 00.'
    ,`month` STRING COMMENT 'Month when the event occurred. For example, if the event occurred in October 2015, the value of month is 2015-10.'
    ,`year` STRING COMMENT 'Year when the event occurred. For example, if the event occurred in 2015, the value of year is 2015.'
)
PARTITIONED BY
(
    ds STRING COMMENT 'Date when the event occurred, in the yyyy-mm-dd format.'
);
```

Parse the JSON data and write it to the fact table.
Run the following commands to add partitions, parse the JSON data, and write the data to the `dwd_github_events_odps` table:

```sql
-- Register the new hr=... directories in OSS as partitions of the external table.
msck repair table githubevents add partitions;

set odps.sql.hive.compatible = true;
set odps.sql.split.hive.bridge = true;

INSERT INTO TABLE dwd_github_events_odps PARTITION(ds)
SELECT  CAST(GET_JSON_OBJECT(col,'$.id') AS BIGINT) AS id
        ,CAST(GET_JSON_OBJECT(col,'$.actor.id') AS BIGINT) AS actor_id
        ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login
        ,CAST(GET_JSON_OBJECT(col,'$.repo.id') AS BIGINT) AS repo_id
        ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name
        ,CAST(GET_JSON_OBJECT(col,'$.org.id') AS BIGINT) AS org_id
        ,GET_JSON_OBJECT(col,'$.org.login') AS org_login
        ,GET_JSON_OBJECT(col,'$.type') AS type
        ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at
        ,GET_JSON_OBJECT(col,'$.payload.action') AS action
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id') AS BIGINT)
              WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id') AS BIGINT)
         END AS iss_or_pr_id
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number') AS BIGINT)
              WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number') AS BIGINT)
              ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number') AS BIGINT)
         END AS number
        ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id') AS BIGINT) AS comment_id
        ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id
        ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id') AS BIGINT) AS member_id
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id') AS BIGINT)
              WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id') AS BIGINT)
              WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id') AS BIGINT)
         END AS rev_or_push_or_rel_id
        ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref
        ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state')
              WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state')
              WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state')
         END AS state
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association')
              WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association')
              WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association')
              WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association')
         END AS author_association
        ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged
        ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions') AS BIGINT) AS additions
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions') AS BIGINT) AS deletions
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files') AS BIGINT) AS changed_files
        ,CAST(GET_JSON_OBJECT(col,'$.payload.size') AS BIGINT) AS push_size
        ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size') AS BIGINT) AS push_distinct_size
        ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) AS hr
        ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') AS month
        ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) AS year
        ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') AS ds
FROM    githubevents
-- Read only the partition for the previous UTC hour (assumes a UTC+8 project time zone).
WHERE   hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);
```

Query the data.
Run the following commands to query data from the `dwd_github_events_odps` table:

```sql
SET odps.sql.allow.fullscan=true;
SELECT * FROM dwd_github_events_odps where ds = '2023-03-31' limit 10;
```

The following sample result is returned:

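The `WHERE` clause of the `INSERT` statement above selects the OSS partition with `dateadd(getdate(), -9, 'hh')`. Assuming that the MaxCompute project uses the UTC+8 (Asia/Shanghai) time zone, subtracting 9 hours from the current local time gives the previous UTC hour, which is the partition that `download_code.sh` uploads. The following Python sketch checks that arithmetic; the function names are hypothetical. If your project uses a different time zone, adjust the offset, and schedule the job to run after the hourly upload has finished.

```python
from datetime import datetime, timedelta, timezone

# Assumed project time zone: fixed UTC+8 (Asia/Shanghai does not currently use DST).
UTC_PLUS_8 = timezone(timedelta(hours=8))


def partition_to_load(now_local: datetime) -> str:
    """Equivalent of to_char(dateadd(getdate(), -9, 'hh'), 'yyyy-mm-dd-hh')."""
    return (now_local - timedelta(hours=9)).strftime("%Y-%m-%d-%H")


def previous_utc_hour(now_local: datetime) -> str:
    """The partition that download_code.sh uploaded, computed directly in UTC."""
    prev = now_local.astimezone(timezone.utc) - timedelta(hours=1)
    return prev.strftime("%Y-%m-%d-%H")


now = datetime(2023, 3, 31, 18, 30, tzinfo=UTC_PLUS_8)
print(partition_to_load(now))  # 2023-03-31-09
print(previous_utc_hour(now))  # 2023-03-31-09
```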
Build a real-time data warehouse
Obtain real-time data using ECS
An Elastic Compute Service (ECS) instance is used to extract real-time event data from the GitHub API. This topic provides a sample script to show how to collect real-time data from the GitHub API.
Each time the script runs, it executes for 1 minute. It collects the real-time event data provided by the API during this period and stores each event in JSON format.
This script does not guarantee that all real-time event data is collected.
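To collect data from the GitHub API continuously, send the Accept and Authorization headers with each request. The value of Accept is fixed (`application/vnd.github+json`). For Authorization, use a personal access token that you created in GitHub; authenticated requests are subject to a much higher rate limit than unauthenticated ones. For more information about how to create a personal access token, see the GitHub documentation on managing personal access tokens.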
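The following Python sketch builds the two headers and shows how a collector could pause when GitHub reports that the rate limit is exhausted, by reading the `x-ratelimit-remaining` and `x-ratelimit-reset` response headers (the reset value is a Unix timestamp in seconds). The function names are hypothetical, and the sample script in this tutorial does not implement this back-off.

```python
import time
from typing import Mapping, Optional


def github_headers(token: str) -> dict:
    """Build the request headers; token is your GitHub personal access token."""
    return {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {token}",
    }


def seconds_until_reset(headers: Mapping[str, str], now: Optional[float] = None) -> float:
    """Return how long to wait before the next request, based on rate-limit headers."""
    # Header names are case-insensitive, so normalize them before the lookup.
    lower = {key.lower(): value for key, value in headers.items()}
    remaining = int(lower.get("x-ratelimit-remaining", "1"))
    if remaining > 0:
        return 0.0  # quota left: no need to wait
    reset_at = int(lower.get("x-ratelimit-reset", "0"))
    now = time.time() if now is None else now
    return max(0.0, reset_at - now)


# Usage with a requests response (not executed here):
#   time.sleep(seconds_until_reset(resp.headers))
```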
Run the following commands to create a file named `download_realtime_data.py` in the `/opt/realtime` directory.

```shell
mkdir -p /opt/realtime
cd /opt/realtime
vim download_realtime_data.py
```

Press `i` to enter edit mode and add the following sample content to the file. The script requires Python 3 and the `requests` library.

```python
#!/usr/bin/env python3
import json
import sys
import time

import requests

# Define the Accept and Authorization headers for the GitHub API.
# Replace <github_api_token> with your personal access token.
HEADERS = {
    "Accept": "application/vnd.github+json",
    "Authorization": "Bearer <github_api_token>",
}


# Get the URL of the next page from the Link response header
def get_next_link(resp):
    # requests parses the Link header into resp.links; the last page has no "next" link.
    return resp.links.get('next', {}).get('url')


# Collect one page of data from the API
def download(link, fname):
    resp = requests.get(link, headers=HEADERS, timeout=10)
    if resp.status_code != 200:
        return None
    events = resp.json()
    with open(fname, 'a') as f:
        for event in events:
            # Write one event per line.
            f.write(json.dumps(event))
            f.write('\n')
    print('downloaded {} events to {}'.format(len(events), fname))
    return resp


# Collect multiple pages of data from the API
def download_all_data(fname):
    link = 'https://api.github.com/events?per_page=100&page=1'
    while link is not None:
        resp = download(link, fname)
        if resp is None:
            break
        link = get_next_link(resp)


# Get the current time in milliseconds
def get_current_ms():
    return round(time.time() * 1000)


# Run for 1 minute per invocation
def main(fname):
    start_ms = get_current_ms()
    while get_current_ms() - start_ms < 60 * 1000:
        download_all_data(fname)
        time.sleep(0.1)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('usage: python3 {} <log_file>'.format(sys.argv[0]))
        sys.exit(1)
    main(sys.argv[1])
```

Press the Esc key, enter `:wq`, and press Enter to save and close the file.

Create a `run_py.sh` file that runs `download_realtime_data.py` and stores the data collected in each run in a separate file. The content is as follows.

```shell
mkdir -p /opt/realtime/gh_realtime_data
python3 /opt/realtime/download_realtime_data.py /opt/realtime/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
```

Create a `delete_log.sh` file to delete historical data. The content is as follows.

```shell
# Delete the files from two days ago.
d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d')
rm -f /opt/realtime/gh_realtime_data/*${d}*.json
```

Run the following commands to collect GitHub data every minute and delete historical data every day.

```shell
# 1. Run the following command and press i to enter edit mode.
crontab -e
# 2. Add the following lines. Then, press Esc, enter :wq, and press Enter to exit.
* * * * * bash /opt/realtime/run_py.sh
1 1 * * * bash /opt/realtime/delete_log.sh
```
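`delete_log.sh` removes only the files from exactly two days ago, so files from a day on which the cron job did not run are never deleted. The following Python sketch shows an alternative that selects every file dated two or more days ago, based on the file names that `run_py.sh` generates. The helper names `stale_files` and `delete_stale_files` are hypothetical.

```python
from datetime import date, datetime, timedelta
from pathlib import Path


def stale_files(names: list[str], today: date, max_age_days: int = 2) -> list[str]:
    """Return file names dated max_age_days or more days before today."""
    cutoff = today - timedelta(days=max_age_days)
    stale = []
    for name in names:
        try:
            # run_py.sh names files like 2023-04-06-12:30:01.json
            file_day = datetime.strptime(name[:10], "%Y-%m-%d").date()
        except ValueError:
            continue  # skip files that do not follow the naming pattern
        if file_day <= cutoff:
            stale.append(name)
    return stale


def delete_stale_files(directory: str, max_age_days: int = 2) -> None:
    """Delete the stale JSON files in directory."""
    folder = Path(directory)
    names = [path.name for path in folder.glob("*.json")]
    for name in stale_files(names, date.today(), max_age_days):
        (folder / name).unlink()


# Usage (not executed here):
#   delete_stale_files("/opt/realtime/gh_realtime_data")
```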
Collect ECS data using SLS
Simple Log Service (SLS) is used to collect the real-time event data extracted from the ECS instance as logs.
SLS supports collecting logs from ECS instances by using Logtail. Because the data in this topic is in JSON format, you can use the JSON mode of Logtail to quickly collect incremental JSON logs from the ECS instance. For more information, see Collect logs in JSON mode. In this topic, SLS is configured to parse the top-level key-value pairs of the raw data.
In this example, the log path parameter for the Logtail configuration is set to /opt/realtime/gh_realtime_data/**/*.json.
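The following Python sketch approximates what parsing only the top-level keys of one JSON log line produces. It assumes that nested objects such as `actor`, `repo`, `org`, and `payload` are kept as JSON text, which is consistent with the Flink source table later in this tutorial declaring those fields as varchar and reading them with `JSON_VALUE`. The function name `top_level_fields` is hypothetical and is not part of Logtail.

```python
import json


def top_level_fields(line: str) -> dict:
    """Parse only the top-level keys of one JSON log line; nested values stay JSON text."""
    event = json.loads(line)
    fields = {}
    for key, value in event.items():
        # Keep strings as they are; serialize objects, numbers, and booleans as JSON text.
        fields[key] = value if isinstance(value, str) else json.dumps(value)
    return fields


line = '{"id": "19541192931", "type": "WatchEvent", "actor": {"id": 23286640, "login": "herekeo"}, "public": true}'
log = top_level_fields(line)
print(log["actor"])  # {"id": 23286640, "login": "herekeo"}
print(json.loads(log["actor"])["login"])  # herekeo
```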
After the configuration is complete, SLS continuously collects incremental event data from the ECS instance. The following figure shows an example of the collected data.
Write SLS data to Hologres in real time using Flink
Flink is used to write log data collected by SLS to Hologres in real time. By using an SLS source table and a Hologres result table in Flink, you can write data from SLS to Hologres in real time. For more information, see Import data from Simple Log Service.
Create a Hologres internal table.
The internal table created in this topic retains only some of the key-value pairs from the raw JSON data. The event `id` and the date `ds` form the primary key, the event `id` is the distribution key, and the date `ds` is the partition key. The event time `created_at` is the event_time_column and the clustering key. You can create indexes on other fields as needed to improve query efficiency. For more information about indexes, see CREATE TABLE. The following Data Definition Language (DDL) statement creates the table used in this example.

```sql
DROP TABLE IF EXISTS gh_realtime_data;

BEGIN;

CREATE TABLE gh_realtime_data (
    id bigint,
    actor_id bigint,
    actor_login text,
    repo_id bigint,
    repo_name text,
    org_id bigint,
    org_login text,
    type text,
    created_at timestamp with time zone NOT NULL,
    action text,
    iss_or_pr_id bigint,
    number bigint,
    comment_id bigint,
    commit_id text,
    member_id bigint,
    rev_or_push_or_rel_id bigint,
    ref text,
    ref_type text,
    state text,
    author_association text,
    language text,
    merged boolean,
    merged_at timestamp with time zone,
    additions bigint,
    deletions bigint,
    changed_files bigint,
    push_size bigint,
    push_distinct_size bigint,
    hr text,
    month text,
    year text,
    ds text,
    PRIMARY KEY (id, ds)
)
PARTITION BY LIST (ds);

CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id');
CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at');
CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at');

COMMENT ON COLUMN public.gh_realtime_data.id IS 'Event ID';
COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'ID of the event initiator';
COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'Logon name of the event initiator';
COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'repo ID';
COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'repo name';
COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'ID of the organization to which the repo belongs';
COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'Name of the organization to which the repo belongs';
COMMENT ON COLUMN public.gh_realtime_data.type IS 'Event type';
COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'Time when the event occurred';
COMMENT ON COLUMN public.gh_realtime_data.action IS 'Event action';
COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID';
COMMENT ON COLUMN public.gh_realtime_data.number IS 'issue/pull_request number';
COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'comment ID';
COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'Commit ID';
COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'Member ID';
COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID';
COMMENT ON COLUMN public.gh_realtime_data.ref IS 'Name of the created or deleted resource';
COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'Type of the created or deleted resource';
COMMENT ON COLUMN public.gh_realtime_data.state IS 'Status of the issue/pull_request/pull_request_review';
COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'Relationship between the actor and the repo';
COMMENT ON COLUMN public.gh_realtime_data.language IS 'Programming language';
COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Indicates whether the pull request was merged';
COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'Time when the code was merged';
COMMENT ON COLUMN public.gh_realtime_data.additions IS 'Number of added lines of code';
COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'Number of deleted lines of code';
COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'Number of files changed in the pull request';
COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'Number of commits in the push';
COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'Number of distinct commits in the push';
COMMENT ON COLUMN public.gh_realtime_data.hr IS 'The hour when the event occurred. For example, if the time is 00:23, hr=00.';
COMMENT ON COLUMN public.gh_realtime_data.month IS 'The month when the event occurred. For example, if the date is October 2015, month=2015-10.';
COMMENT ON COLUMN public.gh_realtime_data.year IS 'The year when the event occurred. For example, if the year is 2015, year=2015.';
COMMENT ON COLUMN public.gh_realtime_data.ds IS 'The day when the event occurred. ds=yyyy-mm-dd.';

COMMIT;
```

Write data in real time using Flink.
Use Flink to further parse the SLS data and write it to Hologres in real time. The following statements filter the data before it is written: they discard dirty data in which the event ID or the event time (`created_at`) is null and retain only recent event data.

```sql
CREATE TEMPORARY TABLE sls_input (
    actor varchar,
    created_at varchar,
    id bigint,
    org varchar,
    payload varchar,
    public varchar,
    repo varchar,
    type varchar
) WITH (
    'connector' = 'sls',
    'endpoint' = '<endpoint>',            --The private endpoint of SLS
    'accessid' = '<accesskey id>',        --The AccessKey ID of your account
    'accesskey' = '<accesskey secret>',   --The AccessKey secret of your account
    'project' = '<project name>',         --The name of the SLS project
    'logstore' = '<logstore name>',       --The name of the SLS Logstore
    'starttime' = '2023-04-06 00:00:00'   --The start time for SLS data collection
);

CREATE TEMPORARY TABLE hologres_sink (
    id bigint,
    actor_id bigint,
    actor_login string,
    repo_id bigint,
    repo_name string,
    org_id bigint,
    org_login string,
    type string,
    created_at timestamp,
    action string,
    iss_or_pr_id bigint,
    number bigint,
    comment_id bigint,
    commit_id string,
    member_id bigint,
    rev_or_push_or_rel_id bigint,
    `ref` string,
    ref_type string,
    state string,
    author_association string,
    `language` string,
    merged boolean,
    merged_at timestamp,
    additions bigint,
    deletions bigint,
    changed_files bigint,
    push_size bigint,
    push_distinct_size bigint,
    hr string,
    `month` string,
    `year` string,
    ds string
) WITH (
    'connector' = 'hologres',
    'dbname' = '<hologres dbname>',         --The name of the Hologres database
    'tablename' = '<hologres tablename>',   --The name of the Hologres table that receives data
    'username' = '<accesskey id>',          --The AccessKey ID of the current Alibaba Cloud account
    'password' = '<accesskey secret>',      --The AccessKey secret of the current Alibaba Cloud account
    'endpoint' = '<endpoint>',              --The VPC endpoint of the current Hologres instance
    'jdbcretrycount' = '1',                 --The number of retries upon connection failure
    'partitionrouter' = 'true',             --Specifies whether to write data to a partitioned table
    'createparttable' = 'true',             --Specifies whether to automatically create partitions
    'mutatetype' = 'insertorignore'         --The data writing mode: ignore rows whose primary key already exists
);

INSERT INTO hologres_sink
SELECT id
    ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id
    ,JSON_VALUE(actor, '$.login') AS actor_login
    ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id
    ,JSON_VALUE(repo, '$.name') AS repo_name
    ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id
    ,JSON_VALUE(org, '$.login') AS org_login
    ,type
    ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at
    ,JSON_VALUE(payload, '$.action') AS action
    ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint)
          WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint)
     END AS iss_or_pr_id
    ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint)
          WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint)
          ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint)
     END AS number
    ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id
    ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id
    ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id
    ,CASE WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint)
          WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint)
          WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint)
     END AS rev_or_push_or_rel_id
    ,JSON_VALUE(payload, '$.ref') AS `ref`
    ,JSON_VALUE(payload, '$.ref_type') AS ref_type
    ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state')
          WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state')
          WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state')
     END AS state
    ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association')
          WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association')
          WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association')
          WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association')
     END AS author_association
    ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language`
    ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged
    ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at
    ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions
    ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions
    ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files
    ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size
    ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size
    ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) AS hr
    ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') AS `month`
    ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) AS `year`
    ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) AS ds
FROM sls_input
-- Discard dirty data and keep only events from yesterday onward.
WHERE id IS NOT NULL
    AND created_at IS NOT NULL
    AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1);
```

For more information about the parameters, see Simple Log Service (SLS) and Hologres.
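The following Python sketch models what this job does to each batch of events. It applies the same filter as the `WHERE` clause, and then writes rows the way the `insertorignore` mode does on the primary key `(id, ds)`: the first copy of each key is kept and later copies are ignored. Because `download_realtime_data.py` polls the same API pages repeatedly, duplicates are expected, and this write mode drops them. The sketch is a simplification: it takes `ds` as the date part of the UTC `created_at` string and uses an in-memory dict in place of the Hologres table. The function names are hypothetical.

```python
from datetime import date, timedelta


def keep_event(event: dict, today: date) -> bool:
    """Mirror the WHERE clause: drop rows without id or created_at; keep yesterday onward."""
    if event.get("id") is None or event.get("created_at") is None:
        return False  # dirty data
    # created_at looks like 2023-04-06T08:00:00Z; the first 10 characters are the date.
    event_day = date.fromisoformat(event["created_at"][:10])
    return event_day >= today - timedelta(days=1)


def insert_or_ignore(table: dict, event: dict) -> bool:
    """Model insert-or-ignore on the primary key (id, ds): the first copy wins."""
    key = (event["id"], event["created_at"][:10])  # simplified ds
    if key in table:
        return False
    table[key] = event
    return True


def run_batch(table: dict, events: list, today: date) -> int:
    """Filter a batch of events, write the survivors, and return how many were inserted."""
    return sum(insert_or_ignore(table, e) for e in events if keep_event(e, today))
```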
Note: The raw GitHub event data uses UTC but carries no time zone attribute, while the default time zone of Hologres is UTC+8. Therefore, you must adjust the time zone when Flink writes data to Hologres in real time. In Flink SQL, assign the UTC time zone attribute to the source table data (the TO_TIMESTAMP_TZ(..., 'UTC') calls in the statement above do this), and then set the Flink system time zone as follows:
Step 1: Go to the job editing page
Log on to the Realtime Compute for Apache Flink console.
Go to the target workspace.
Find your Flink SQL or JAR job and click Edit.
Step 2: Open the Deployment Details tab
Note: Flink Configuration is no longer a separate section. It has been merged into Deployment Details.
At the top of the job editing page, switch to the Deployment Details tab.
Scroll down to the Parameter Configuration section.
Step 3: Add a custom configuration
Click the Edit button to the right of Parameter Configuration.
In the dialog box that appears, find the Other Configuration text box.
In the text box, add the following Flink parameter as a key-value pair to set the Flink system time zone to Asia/Shanghai:
table.local-time-zone:Asia/Shanghai
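To see why the UTC attribute matters, the following Python sketch parses a GitHub created_at value, tags it as UTC (the role TO_TIMESTAMP_TZ(..., 'UTC') plays in the Flink SQL), and derives hr, month, year, and ds in UTC+8. It assumes the derived columns are rendered in the session time zone configured above, and it uses a fixed UTC+8 offset as a stand-in for Asia/Shanghai, which currently observes no daylight saving time. The function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict

# Fixed UTC+8 offset, used here as a stand-in for Asia/Shanghai.
UTC_PLUS_8 = timezone(timedelta(hours=8))


def parse_github_created_at(raw: str) -> datetime:
    """Parse a value such as '2023-04-01T16:30:00Z' and attach UTC explicitly."""
    text = raw.rstrip("Z").replace("T", " ")
    naive = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    return naive.replace(tzinfo=timezone.utc)


def misread_as_local(raw: str) -> datetime:
    """Without a UTC attribute, the wall-clock value is treated as UTC+8 local time."""
    text = raw.rstrip("Z").replace("T", " ")
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC_PLUS_8)


def derived_columns(ts: datetime) -> Dict[str, str]:
    """Render hr, month, year, and ds in UTC+8 (assumed session time zone)."""
    local = ts.astimezone(UTC_PLUS_8)
    return {
        "hr": local.strftime("%H"),
        "month": local.strftime("%Y-%m"),
        "year": local.strftime("%Y"),
        "ds": local.strftime("%Y-%m-%d"),
    }
```

For 2023-04-01T16:30:00Z, the correctly tagged timestamp lands in hour 00 of partition 2023-04-02, while the misread value lands in hour 16 of partition 2023-04-01: an 8-hour error that also places the event in the wrong ds partition.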
Query data.
Query the SLS data written to Hologres through Flink. You can then perform data development as needed.
SELECT * FROM public.gh_realtime_data LIMIT 10;
The following result is an example:

Correct real-time data using offline data
In the scenario described in this topic, real-time data might be missing. You can use offline data to correct the real-time data. The following steps show how to correct the real-time data from the previous day. You can adjust the data correction period as needed.
Create a foreign table in Hologres to obtain MaxCompute offline data.
IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT TO (<foreign_table_name>)
FROM SERVER odps_server
INTO public
OPTIONS (if_table_exist 'update', if_unsupported_type 'error');
For more information about the parameters, see IMPORT FOREIGN SCHEMA.
Create a temporary table to correct the real-time data from the previous day with offline data.
Note: Hologres V2.1.17 and later support Serverless Computing. For scenarios such as large-scale offline imports, large ETL jobs, and high-volume foreign table queries, you can use Serverless Computing to run these tasks with additional serverless resources. This approach avoids using your instance resources, which significantly improves instance stability and reduces the probability of out-of-memory (OOM) errors. You are charged only for the resources consumed by the task. For more information, see Serverless Computing. To learn how to use this feature, see Guide to Serverless Computing.
-- Clean up any leftover temporary table
DROP TABLE IF EXISTS gh_realtime_data_tmp;

-- Create a temporary table like gh_realtime_data, including its table properties
SET hg_experimental_enable_create_table_like_properties = ON;
CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data');

-- (Optional) Use Serverless Computing for large-scale offline imports and ETL jobs
SET hg_computing_resource = 'serverless';

-- Insert yesterday's offline data into the temporary table and update statistics
INSERT INTO gh_realtime_data_tmp
SELECT * FROM <foreign_table_name>
WHERE ds = current_date - interval '1 day'
ON CONFLICT (id, ds) DO NOTHING;
ANALYZE gh_realtime_data_tmp;

-- Reset the setting so that subsequent SQL statements do not use serverless resources
RESET hg_computing_resource;

-- In a single transaction, replace yesterday's child partition with the temporary table
BEGIN;
DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>";
ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>";
ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>');
COMMIT;
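The correction step above is essentially a partition swap. The following Python sketch models it with in-memory lists: it keeps only yesterday's offline rows, drops repeated (id, ds) keys the way ON CONFLICT (id, ds) DO NOTHING does (assuming the temporary table inherits a primary key on (id, ds) from gh_realtime_data), and then replaces the whole partition while leaving other days untouched. The data structures and function names are illustrative only and do not model Hologres transactions.

```python
from typing import Dict, Iterable, List, Set, Tuple

# Illustrative row shape: each row is a dict with at least "id" and "ds".
Row = Dict[str, object]


def build_corrected_partition(offline_rows: Iterable[Row], day: str) -> List[Row]:
    """Model of INSERT ... WHERE ds = <day> ON CONFLICT (id, ds) DO NOTHING:
    keep only rows for `day`; for a repeated (id, ds) key the first row wins."""
    seen: Set[Tuple[object, object]] = set()
    rows: List[Row] = []
    for row in offline_rows:
        if row["ds"] != day:
            continue
        key = (row["id"], row["ds"])
        if key in seen:
            continue  # conflict: do nothing
        seen.add(key)
        rows.append(row)
    return rows


def swap_partition(partitions: Dict[str, List[Row]], day: str, new_rows: List[Row]) -> None:
    """Model of DROP + RENAME + ATTACH PARTITION: the partition for `day`
    is replaced wholesale, and partitions for other days are left as they are."""
    partitions[day] = new_rows
```

Because the partition is replaced rather than merged row by row, the corrected day ends up matching the offline data exactly. Any event that only the real-time pipeline captured for that day is dropped, so this approach relies on the offline data being complete.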
Data analysis
You can perform a wide range of analyses on the large volume of data you have collected. Design your data warehouse layers based on the time ranges your business requires, so that the warehouse supports real-time analysis, offline analysis, and integrated real-time and offline analysis.
The following examples analyze the real-time data obtained earlier. You can also analyze data for specific code repositories or developers.
Query the total number of public events for today.
SELECT count(*) FROM gh_realtime_data WHERE created_at >= date_trunc('day', now());
The following is a sample result:
 count
-------
  1006
Query the most active projects (with the most events) in the last day.
SELECT repo_name, COUNT(*) AS events
FROM gh_realtime_data
WHERE created_at >= now() - interval '1 day'
GROUP BY repo_name
ORDER BY events DESC
LIMIT 5;
The following is a sample result:
 repo_name                                | events
------------------------------------------+--------
 leo424y/heysiri.ml                       |     29
 arm-on/plan                              |     10
 Christoffel-T/fiverr-pat-20230331        |      9
 mate-academy/react_dynamic-list-of-goods |      9
 openvinotoolkit/openvino                 |      7
Query the most active developers (with the most events) in the last day.
SELECT actor_login, COUNT(*) AS events
FROM gh_realtime_data
WHERE created_at >= now() - interval '1 day'
  AND actor_login NOT LIKE '%[bot]'
GROUP BY actor_login
ORDER BY events DESC
LIMIT 5;
The following is a sample result:
 actor_login       | events
-------------------+--------
 direwolf-github   |     13
 arm-on            |     10
 sergii-nosachenko |      9
 Christoffel-T     |      9
 yangwang201911    |      7
Query the ranking of the most popular programming languages in the last hour.
SELECT language, count(*) total
FROM gh_realtime_data
WHERE created_at > now() - interval '1 hour'
  AND language IS NOT NULL
GROUP BY language
ORDER BY total DESC
LIMIT 10;
The following is a sample result:
 language   | total
------------+-------
 JavaScript |    25
 C++        |    15
 Python     |    14
 TypeScript |    13
 Java       |     8
 PHP        |     8
Query the ranking of projects by the number of stars received in the last day.
Note: This example does not account for cases where users unstar a project.
SELECT repo_id, repo_name, COUNT(actor_login) total
FROM gh_realtime_data
WHERE type = 'WatchEvent'
  AND created_at > now() - interval '1 day'
GROUP BY repo_id, repo_name
ORDER BY total DESC
LIMIT 10;
The following is a sample result:
 repo_id   | repo_name                          | total
-----------+------------------------------------+-------
 618058471 | facebookresearch/segment-anything  |     4
 619959033 | nomic-ai/gpt4all                   |     1
  97249406 | denysdovhan/wtfjs                  |     1
   9791525 | digininja/DVWA                     |     1
 168118422 | aylei/interview                    |     1
 343520006 | joehillen/sysz                     |     1
 162279822 | agalwood/Motrix                    |     1
 577723410 | huggingface/swift-coreml-diffusers |     1
 609539715 | e2b-dev/e2b                        |     1
 254839429 | maniackk/KKCallStack               |     1
Query the number of daily active users and projects for today.
SELECT uniq(actor_id) actor_num, uniq(repo_id) repo_num
FROM gh_realtime_data
WHERE created_at > date_trunc('day', now());
The following is a sample result:
 actor_num | repo_num
-----------+----------
       743 |      816
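The queries above are filter-and-count aggregations. To check your expectations on a small sample, the following Python sketch reproduces three of them over an in-memory list: the most active developers in the last day, the star ranking based on WatchEvent, and today's distinct actors and repositories. The bot filter uses endswith("[bot]"), assuming Hologres follows PostgreSQL LIKE semantics, where only % and _ are wildcards and the brackets in '%[bot]' match literally. The Event type and function names are hypothetical, and ties come back in order of first appearance, whereas the SQL leaves tie order unspecified.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, NamedTuple, Tuple


class Event(NamedTuple):
    # Hypothetical in-memory stand-in for a few gh_realtime_data columns.
    actor_id: int
    actor_login: str
    repo_id: int
    repo_name: str
    event_type: str
    created_at: datetime  # timezone-aware


def top_developers(events: Iterable[Event], now: datetime, n: int = 5) -> List[Tuple[str, int]]:
    """Events per login in the last day, skipping logins that end in '[bot]'."""
    since = now - timedelta(days=1)
    counts = Counter(
        e.actor_login
        for e in events
        if e.created_at >= since and not e.actor_login.endswith("[bot]")
    )
    return counts.most_common(n)


def top_starred(events: Iterable[Event], now: datetime, n: int = 10) -> List[Tuple[Tuple[int, str], int]]:
    """WatchEvent count per (repo_id, repo_name) in the last day; unstars are not seen."""
    since = now - timedelta(days=1)
    counts = Counter(
        (e.repo_id, e.repo_name)
        for e in events
        if e.event_type == "WatchEvent" and e.created_at > since
    )
    return counts.most_common(n)


def daily_active(events: Iterable[Event], now: datetime) -> Tuple[int, int]:
    """Distinct actors and repositories with events after midnight of `now`'s day."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    today = [e for e in events if e.created_at > midnight]
    return len({e.actor_id for e in today}), len({e.repo_id for e in today})


if __name__ == "__main__":
    now = datetime(2023, 4, 2, 12, 0, tzinfo=timezone.utc)
    sample = [Event(1, "alice", 10, "a/x", "WatchEvent", now - timedelta(hours=1))]
    print(top_developers(sample, now), top_starred(sample, now), daily_active(sample, now))
```

Note that, like the SQL, daily_active does not exclude bot accounts; only the developer ranking filters them out.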