
Hologres: Batch and real-time analytics

Last Updated: Mar 26, 2026

This tutorial shows how to build a unified batch and real-time analytics solution using GitHub public event datasets. MaxCompute handles the offline data warehouse with hourly updates, Flink streams live events into Hologres in real time, and Hologres serves as the unified query layer for both pipelines.

By the end of this tutorial, you will have:

  • Set up an offline pipeline that downloads GitHub event data hourly, parses it in MaxCompute, and stores it in a partitioned fact table

  • Set up a real-time pipeline that collects live GitHub events on an Elastic Compute Service (ECS) instance, ingests them into Simple Log Service (SLS), and streams them to Hologres via Flink

  • Corrected real-time data gaps using offline data

  • Run analytics queries across both real-time and historical data in Hologres

Background

Businesses increasingly need both large-scale historical analysis and up-to-the-minute insights. Unified offline and real-time analytics addresses this by processing both data types on a single platform, reducing data transfer costs, improving analytics accuracy, and simplifying data management.

Alibaba Cloud's unified data warehouse solution uses three core engines:

  • MaxCompute: enterprise-grade cloud data warehouse, optimized for large-scale offline batch processing

  • Realtime Compute for Apache Flink: stream-processing engine for real-time data transformation

  • Hologres: real-time analytics database that natively integrates with both MaxCompute and Flink, serving as the unified query layer

Solution architecture

The following diagram shows the end-to-end architecture for this solution.

image

An ECS instance collects GitHub event data for both pipelines. The data then flows into a real-time path and an offline path, both converging in Hologres as the unified service layer.

Real-time pipeline: The ECS instance pulls live events from the GitHub API every minute and writes them as JSON files. SLS collects these files via Logtail. Flink reads from SLS, parses the JSON, and writes records to a partitioned Hologres table — supporting sub-second query response times.

Offline pipeline: The ECS instance downloads hourly event archives from GH Archive and uploads them to Object Storage Service (OSS). MaxCompute reads the raw JSON via an external table, parses it into a fact table, and updates it hourly.

Unified layer: Hologres and MaxCompute are natively integrated at the storage layer, so Hologres can directly accelerate queries on MaxCompute's historical data. This also enables using offline data to correct gaps in the real-time pipeline.
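
For example, once the MaxCompute fact table built later in this tutorial is mapped into Hologres as a foreign table, a single SQL statement can combine historical and live data. The following is a minimal sketch, assuming a foreign table named dwd_github_events_odps and the real-time table gh_realtime_data, both created in later steps:

-- Daily event counts: historical days served from MaxCompute storage, today from the real-time table.
-- The start date is illustrative.
SELECT ds, count(*) AS events
FROM dwd_github_events_odps
WHERE ds >= '2023-03-01' AND ds < to_char(current_date, 'YYYY-MM-DD')
GROUP BY ds
UNION ALL
SELECT ds, count(*) AS events
FROM gh_realtime_data
WHERE ds = to_char(current_date, 'YYYY-MM-DD')
GROUP BY ds
ORDER BY ds;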

With end-to-end development support, this solution delivers sub-second data responsiveness, full pipeline visibility, minimal architecture components, fewer dependencies, and significantly reduced operational and labor costs.

Key characteristics of this solution:

  • Offline pipeline: hourly data updates, optimized for large-scale batch processing and complex computations, with reduced computing costs

  • Real-time pipeline: sub-second response times, simplified architecture with minimal components

  • Unified storage: Hologres serves both OLAP and key-value queries under a single SQL interface

  • Data correction: offline data can correct real-time pipeline gaps without architectural complexity

Business and data understanding

GitHub records developer activity as typed events — starring a repository, opening a pull request, pushing code, and so on. This solution uses two data sources:

  • GitHub Events API: exposes public events from the past five minutes. See Events.

  • GH Archive: aggregates GitHub public events hourly and makes them available for download. See GH Archive.

This solution covers 15 public event types, excluding deprecated or unrecorded types. For the full list, see GitHub event types.

GitHub data model

GitHub's core entities are Developer, Repository, and Organization.

image

This solution treats Event as a distinct entity for storage and analysis.

image

Raw event structure

Each event is a JSON object. The following is a representative example:

{
    "id": "19541192931",
    "type": "WatchEvent",
    "actor": {
        "id": 23286640,
        "login": "herekeo",
        "display_login": "herekeo",
        "gravatar_id": "",
        "url": "https://api.github.com/users/herekeo",
        "avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
    },
    "repo": {
        "id": 52760178,
        "name": "crazyguitar/pysheeet",
        "url": "https://api.github.com/repos/crazyguitar/pysheeet"
    },
    "payload": {
        "action": "started"
    },
    "public": true,
    "created_at": "2022-01-01T00:03:04Z"
}

Prerequisites

Before you begin, make sure you have the following services activated and resources created.

Services and resources:

  • An ECS instance, an OSS bucket, a MaxCompute project (with DataWorks), a Simple Log Service project and Logstore, a Realtime Compute for Apache Flink workspace, and a Hologres instance.

Account permissions:

  • An AccessKey ID and AccessKey secret with permissions to write to OSS, read/write MaxCompute tables, write to SLS, and read/write Hologres tables.

  • A GitHub personal access token for continuous data collection from the GitHub Events API. See Creating a personal access token.

Build the offline data warehouse

This pipeline downloads hourly GitHub event archives, uploads them to OSS, and parses them into a partitioned MaxCompute fact table.

Download event data and upload to OSS

The ECS instance downloads compressed JSON archives from GH Archive and uploads them to OSS, organized into hourly partitions. Each partition directory is named hr=%Y-%m-%d-%H, which MaxCompute reads directly.

Historical backfill: To download data from 2012 to 2022 in bulk, run:

wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz

Hourly scheduled download: Set up a script that runs at the 10th minute of each hour to download the previous hour's data and upload it to OSS.

Create the OSS bucket in the same region as your ECS instance. This example uses the bucket name githubevents and the ECS download directory /opt/hourlydata/gh_data. To install ossutil on the ECS instance, run yum install unzip, then download and extract the ossutil package, and move the ossutil binary to /usr/bin/.
  1. Create /opt/hourlydata/download_code.sh:

    cd /opt/hourlydata
    vim download_code.sh
  2. Press i and add the following script:

    d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H')
    h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H')
    url=https://data.gharchive.org/${d}.json.gz
    echo ${url}
    
    # Download data to ./gh_data/. You can customize this path.
    wget ${url} -P ./gh_data/
    
    # Switch to the gh_data directory.
    cd gh_data
    
    # Decompress the downloaded data into a JSON file.
    gzip -d ${d}.json
    
    echo ${d}.json
    
    # Return to the /root home directory.
    cd /root
    
    # Use ossutil to upload data to OSS.
    # Create a directory named hr=${h} in the githubevents OSS bucket.
    ossutil mkdir oss://githubevents/hr=${h}
    
    # Upload data from /opt/hourlydata/gh_data (you can customize this path) to OSS.
    ossutil cp -r /opt/hourlydata/gh_data oss://githubevents/hr=${h} -u
    echo oss uploaded successfully!
    
    rm -rf /opt/hourlydata/gh_data/${d}.json
    echo ecs deleted!
  3. Press Esc, type :wq, and press Enter to save.

  4. Schedule the script to run at the 10th minute of every hour:

    # Open the crontab editor.
    crontab -e
    
    # Add the following line, then press Esc and type :wq to exit.
    10 * * * * cd /opt/hourlydata && sh download_code.sh > download.log

After setup, the script downloads and decompresses the previous hour's JSON file and uploads it to oss://githubevents in a partition directory named hr=%Y-%m-%d-%H.

Import OSS data into MaxCompute

Run the following commands in the MaxCompute client or an ODPS SQL node in DataWorks. See Connect using the local client (odpscmd) or Develop an ODPS SQL task.

Step 1: Create an external table to map OSS JSON files.

MaxCompute reads raw JSON from OSS via an external table. The table treats each line as a single string column (col), partitioned by hr to match the OSS directory structure.

CREATE EXTERNAL TABLE IF NOT EXISTS githubevents
(
    col  STRING
)
PARTITIONED BY
(
    hr   STRING
)
STORED AS textfile
LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/'
;

For more information, see OSS external tables.

Step 2: Create a fact table to store parsed event data.

The fact table dwd_github_events_odps stores the parsed event fields and is partitioned by date (ds):

CREATE TABLE IF NOT EXISTS dwd_github_events_odps
(
    id                     BIGINT COMMENT 'Event ID'
    ,actor_id              BIGINT COMMENT 'Actor ID'
    ,actor_login           STRING COMMENT 'Actor login name'
    ,repo_id               BIGINT COMMENT 'Repository ID'
    ,repo_name             STRING COMMENT 'Full repository name: owner/repository_name'
    ,org_id                BIGINT COMMENT 'Organization ID'
    ,org_login             STRING COMMENT 'Organization name'
    ,`type`                STRING COMMENT 'Event type'
    ,created_at            DATETIME COMMENT 'Event occurrence time'
    ,action                STRING COMMENT 'Event action'
    ,iss_or_pr_id          BIGINT COMMENT 'Issue or pull request ID'
    ,number                BIGINT COMMENT 'Issue or pull request number'
    ,comment_id            BIGINT COMMENT 'Comment ID'
    ,commit_id             STRING COMMENT 'Commit ID'
    ,member_id             BIGINT COMMENT 'Member ID'
    ,rev_or_push_or_rel_id BIGINT COMMENT 'Review, push, or release ID'
    ,ref                   STRING COMMENT 'Name of created or deleted resource'
    ,ref_type              STRING COMMENT 'Type of created or deleted resource'
    ,state                 STRING COMMENT 'State of issue, pull request, or pull request review'
    ,author_association    STRING COMMENT 'Relationship between actor and repository'
    ,language              STRING COMMENT 'Programming language of merged code'
    ,merged                BOOLEAN COMMENT 'Whether the merge was accepted'
    ,merged_at             DATETIME COMMENT 'Code merge time'
    ,additions             BIGINT COMMENT 'Number of lines added'
    ,deletions             BIGINT COMMENT 'Number of lines deleted'
    ,changed_files         BIGINT COMMENT 'Number of files changed in pull request'
    ,push_size             BIGINT COMMENT 'Number of commits'
    ,push_distinct_size    BIGINT COMMENT 'Number of distinct commits'
    ,hr                    STRING COMMENT 'Hour of event occurrence (e.g., 00:23 → hr=00)'
    ,`month`               STRING COMMENT 'Month of event occurrence (e.g., October 2015 → month=2015-10)'
    ,`year`                STRING COMMENT 'Year of event occurrence (e.g., 2015 → year=2015)'
)
PARTITIONED BY
(
    ds                     STRING COMMENT 'Date of event occurrence (ds=yyyy-mm-dd)'
);

Step 3: Parse JSON and load into the fact table.

This step discovers new OSS partitions and inserts parsed data for the most recent hour. The GET_JSON_OBJECT function extracts each field from the raw JSON string, and the dateadd(getdate(),-9,'hh') filter converts the project time (UTC+8 by default) to the previous UTC hour so that it matches the hr partition written by the download script.

msck repair table githubevents add partitions;

set odps.sql.hive.compatible = true;
set odps.sql.split.hive.bridge = true;
INSERT into TABLE dwd_github_events_odps PARTITION(ds)
SELECT  CAST(GET_JSON_OBJECT(col,'$.id')  AS BIGINT ) AS id
        ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id
        ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login
        ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id
        ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name
        ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id
        ,GET_JSON_OBJECT(col,'$.org.login') AS org_login
        ,GET_JSON_OBJECT(col,'$.type') as type
        ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at
        ,GET_JSON_OBJECT(col,'$.payload.action') AS action
        ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT)
                 WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT)
         END AS iss_or_pr_id
        ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT)
                 WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT)
                 ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT)
         END AS number
        ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id
        ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id
        ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id
        ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT)
                 WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT)
                 WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT)
         END AS rev_or_push_or_rel_id
        ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref
        ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type
        ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state')
                 WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state')
                 WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state')
         END AS state
        ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association')
                 WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association')
                 WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association')
                 WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association')
         END AS author_association
        ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged
        ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT)  AS deletions
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files
        ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT)  AS push_size
        ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT)   AS push_distinct_size
        ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr
        ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'-','-') as month
        ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year
        ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'-','-') as ds
from githubevents
where hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);

Step 4: Verify the data.

SET odps.sql.allow.fullscan=true;
SELECT * FROM dwd_github_events_odps where ds = '2023-03-31' limit 10;

Sample output:

image
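
To confirm that the hourly loads are landing as expected, you can also count events per hr value within one day's partition. The following is a minimal sketch in MaxCompute SQL; the date is illustrative:

-- Count parsed events per hour for a single day to confirm hourly loads.
SELECT  hr
        ,COUNT(*) AS events
FROM    dwd_github_events_odps
WHERE   ds = '2023-03-31'
GROUP BY hr
ORDER BY hr
LIMIT   24;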

Build the real-time data warehouse

This pipeline collects live GitHub events every minute, ingests them into SLS, and streams them to Hologres via Flink.

Collect real-time data using ECS

The GitHub Events API returns public events from the past five minutes. The following Python script polls the API continuously for one minute per run, paginating through all available events and appending each event as a JSON line to a timestamped output file.

Each script run collects events available during that one-minute window and does not guarantee capturing every event. Continuous collection requires an Accept header and a GitHub personal access token in the Authorization header.
  1. Create /opt/realtime/download_realtime_data.py:

    cd /opt/realtime
    vim download_realtime_data.py
  2. Press i and add the following script:

    #!/usr/bin/env python
    
    import requests
    import json
    import sys
    import time
    
    # Get the next page URL from the response headers
    def get_next_link(resp):
        # Stop paginating cleanly if the response has no Link header
        resp_link = resp.headers.get('link')
        if not resp_link:
            return None
        for l in resp_link.split(', '):
            link = l.split('; ')[0][1:-1]
            rel = l.split('; ')[1]
            if rel == 'rel="next"':
                return link
        return None
    
    # Download one page of data from the API
    def download(link, fname):
        # Define GitHub API Accept and Authorization headers; replace <github_api_token> with your token
        headers = {"Accept": "application/vnd.github+json", "Authorization": "Bearer <github_api_token>"}
        resp = requests.get(link, headers=headers)
    
        if int(resp.status_code) != 200:
            return None
    
        with open(fname, 'a') as f:
            for j in resp.json():
                f.write(json.dumps(j))
                f.write('\n')
    
        print('downloaded {} events to {}'.format(len(resp.json()), fname))
        return resp
    
    # Download multiple pages of data from the API
    def download_all_data(fname):
        link = 'https://api.github.com/events?per_page=100&page=1'
        while True:
            resp = download(link, fname)
            if resp is None:
                break
            link = get_next_link(resp)
            if link is None:
                break
    
    # Get current timestamp in milliseconds
    def get_current_ms():
        return round(time.time()*1000)
    
    # Run the script for exactly 1 minute
    def main(fname):
        current_ms = get_current_ms()
        while get_current_ms() - current_ms < 60*1000:
            download_all_data(fname)
            time.sleep(0.1)
    
    # Execute the script
    if __name__ == '__main__':
        if len(sys.argv) < 2:
            print('usage: python {} <log_file>'.format(sys.argv[0]))
            exit(0)
        main(sys.argv[1])
  3. Press Esc, type :wq, and press Enter to save.

  4. Create run_py.sh to run the script each minute with a timestamped output file:

    python /opt/realtime/download_realtime_data.py /opt/realtime/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
  5. Create delete_log.sh to delete files older than two days:

    d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d')
    rm -f /opt/realtime/gh_realtime_data/*${d}*.json
  6. Schedule both scripts via cron:

    # Open the crontab editor.
    crontab -e
    
    # Add the following lines, then press Esc and type :wq to exit.
    * * * * * bash /opt/realtime/run_py.sh
    1 1 * * * bash /opt/realtime/delete_log.sh

Collect ECS data using SLS

SLS collects the JSON files written by the ECS script using Logtail in JSON mode. Logtail parses top-level key-value pairs from each JSON event and ingests them incrementally as log entries.

Configure the Logtail log path as /opt/realtime/gh_realtime_data/**/*.json. For setup details, see Collect logs using JSON mode.

After configuration, SLS continuously ingests new event data from the ECS instance. Sample collected data:

image

Stream SLS data to Hologres using Flink

Flink reads events from the SLS source table, parses the nested JSON fields, and writes records to a partitioned Hologres table. For the SLS-to-Hologres integration overview, see Import from SLS.

Step 1: Create the Hologres internal table.

The table uses (id, ds) as the primary key, id as the distribution key, ds as the list partition key, and created_at as the event time column. Add additional indexes based on your query patterns.

DROP TABLE IF EXISTS gh_realtime_data;

BEGIN;

CREATE TABLE gh_realtime_data (
    id bigint,
    actor_id bigint,
    actor_login text,
    repo_id bigint,
    repo_name text,
    org_id bigint,
    org_login text,
    type text,
    created_at timestamp with time zone NOT NULL,
    action text,
    iss_or_pr_id bigint,
    number bigint,
    comment_id bigint,
    commit_id text,
    member_id bigint,
    rev_or_push_or_rel_id bigint,
    ref text,
    ref_type text,
    state text,
    author_association text,
    language text,
    merged boolean,
    merged_at timestamp with time zone,
    additions bigint,
    deletions bigint,
    changed_files bigint,
    push_size bigint,
    push_distinct_size bigint,
    hr text,
    month text,
    year text,
    ds text,
    PRIMARY KEY (id,ds)
)
PARTITION BY LIST (ds);
CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id');
CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at');
CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at');

COMMENT ON COLUMN public.gh_realtime_data.id IS 'Event ID';
COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'Actor ID';
COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'Actor login name';
COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'Repository ID';
COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'Repository name';
COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'Organization ID';
COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'Organization name';
COMMENT ON COLUMN public.gh_realtime_data.type IS 'Event type';
COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'Event occurrence time';
COMMENT ON COLUMN public.gh_realtime_data.action IS 'Event action';
COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'Issue or pull request ID';
COMMENT ON COLUMN public.gh_realtime_data.number IS 'Issue or pull request number';
COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'Comment ID';
COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'Commit ID';
COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'Member ID';
COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'Review, push, or release ID';
COMMENT ON COLUMN public.gh_realtime_data.ref IS 'Name of created or deleted resource';
COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'Type of created or deleted resource';
COMMENT ON COLUMN public.gh_realtime_data.state IS 'State of issue, pull request, or pull request review';
COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'Relationship between actor and repository';
COMMENT ON COLUMN public.gh_realtime_data.language IS 'Programming language';
COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Whether the merge was accepted';
COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'Code merge time';
COMMENT ON COLUMN public.gh_realtime_data.additions IS 'Number of lines added';
COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'Number of lines deleted';
COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'Number of files changed in pull request';
COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'Number of commits';
COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'Number of distinct commits';
COMMENT ON COLUMN public.gh_realtime_data.hr IS 'Hour of event occurrence (e.g., 00:23 → hr=00)';
COMMENT ON COLUMN public.gh_realtime_data.month IS 'Month of event occurrence (e.g., October 2015 → month=2015-10)';
COMMENT ON COLUMN public.gh_realtime_data.year IS 'Year of event occurrence (e.g., 2015 → year=2015)';
COMMENT ON COLUMN public.gh_realtime_data.ds IS 'Date of event occurrence (ds=yyyy-mm-dd)';

COMMIT;

For CREATE TABLE options and index configuration, see CREATE TABLE.
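
If your dashboards frequently filter on specific text columns, such as type or actor_login, you can declare bitmap indexes for exactly those columns. The following is a minimal sketch; the column choice is illustrative, and if your Hologres version does not allow changing bitmap_columns on an existing table, include the call in the CREATE TABLE transaction above:

-- Illustrative: maintain bitmap indexes on the columns used in equality filters.
BEGIN;
CALL set_table_property('public.gh_realtime_data', 'bitmap_columns', 'type,actor_login');
COMMIT;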

Step 2: Write data in real time using Flink.

The Flink job defines an SLS source table and a Hologres sink table, then transforms and inserts each event. The WHERE clause filters out records with a null id or created_at, and drops events older than one day to keep the Hologres partition manageable.

GitHub event timestamps are in UTC, but Hologres defaults to UTC+8. To align the two, assign the UTC time zone attribute to the source data in Flink SQL (as the TO_TIMESTAMP_TZ calls below do) and set table.local-time-zone: Asia/Shanghai in the Flink Configuration section on the Job Startup Configuration page.
CREATE TEMPORARY TABLE sls_input (
  actor varchar,
  created_at varchar,
  id bigint,
  org varchar,
  payload varchar,
  public varchar,
  repo varchar,
  type varchar
  )
WITH (
    'connector' = 'sls',
    'endpoint' = '<endpoint>',--The private endpoint of SLS
    'accessid' = '<accesskey id>',--The AccessKey ID of your account
    'accesskey' = '<accesskey secret>',--The AccessKey secret of your account
    'project' = '<project name>',--The name of the SLS project
    'logstore' = '<logstore name>',--The name of the SLS Logstore
    'starttime' = '2023-04-06 00:00:00'--The start time for data ingestion from SLS
);

CREATE TEMPORARY TABLE hologres_sink (
    id bigint,
    actor_id bigint,
    actor_login string,
    repo_id bigint,
    repo_name string,
    org_id bigint,
    org_login string,
    type string,
    created_at timestamp,
    action string,
    iss_or_pr_id bigint,
    number bigint,
    comment_id bigint,
    commit_id string,
    member_id bigint,
    rev_or_push_or_rel_id bigint,
    `ref` string,
    ref_type string,
    state string,
    author_association string,
    `language` string,
    merged boolean,
    merged_at timestamp,
    additions bigint,
    deletions bigint,
    changed_files bigint,
    push_size bigint,
    push_distinct_size bigint,
    hr string,
    `month` string,
    `year` string,
    ds string
    )
WITH (
    'connector' = 'hologres',
    'dbname' = '<hologres dbname>', --The name of the Hologres database
    'tablename' = '<hologres tablename>', --The name of the destination table in Hologres
    'username' = '<accesskey id>', --The AccessKey ID of your Alibaba Cloud account
    'password' = '<accesskey secret>', --The AccessKey secret of your Alibaba Cloud account
    'endpoint' = '<endpoint>', --The VPC endpoint of the Hologres instance
    'jdbcretrycount' = '1', --The number of retries upon a connection failure
    'partitionrouter' = 'true', --Specifies whether to write data to a partitioned table
    'createparttable' = 'true', --Specifies whether to automatically create partitions
    'mutatetype' = 'insertorignore' --The data write mode
);

INSERT INTO hologres_sink
SELECT id
        ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id
        ,JSON_VALUE(actor, '$.login') AS actor_login
        ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id
        ,JSON_VALUE(repo, '$.name') AS repo_name
        ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id
        ,JSON_VALUE(org, '$.login') AS org_login
        ,type
        ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at
        ,JSON_VALUE(payload, '$.action') AS action
        ,CASE    WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint)
                 WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint)
         END AS iss_or_pr_id
        ,CASE    WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint)
                 WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint)
                 ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint)
         END AS number
        ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id
        ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id
        ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id
        ,CASE    WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint)
                 WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint)
                 WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint)
         END AS rev_or_push_or_rel_id
        ,JSON_VALUE(payload, '$.ref') AS `ref`
        ,JSON_VALUE(payload, '$.ref_type') AS ref_type
        ,CASE    WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state')
                 WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state')
                 WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state')
         END AS state
        ,CASE    WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association')
                 WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association')
                 WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association')
                 WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association')
         END AS author_association
        ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language`
        ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged
        ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at
        ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions
        ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions
        ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files
        ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size
        ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr
        ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month`
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year`
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds
FROM
        sls_input
WHERE
        id IS NOT NULL
        AND created_at IS NOT NULL
        AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1);

For connector parameter details, see SLS source table and Hologres sink table.

Step 3: Verify the data.

SELECT * FROM public.gh_realtime_data limit 10;

Sample output:

image
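
Beyond sampling rows, a simple freshness check shows whether events are arriving continuously. The following is a minimal sketch, assuming the Flink job has been running for at least a few minutes:

-- Latest event time and today's event count in the real-time table.
SELECT
    max(created_at) AS latest_event,
    count(*) AS events_today
FROM
    gh_realtime_data
WHERE
    ds = to_char(current_date, 'YYYY-MM-DD');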

Correct real-time data using offline data

Because the real-time pipeline polls an API that exposes only the past five minutes of events, some events may be missed. You can backfill any gaps by replacing the previous day's Hologres partition with data from the MaxCompute fact table.

The correction uses an atomic partition-swap pattern: write corrected data into a temporary table, then atomically replace the live partition in a transaction. This avoids interrupting in-flight queries and eliminates any window where the partition is absent.

Hologres Serverless Computing (available from version 2.1.17) uses extra serverless resources for large-scale offline imports and ETL jobs instead of consuming your instance's capacity. This reduces the risk of out-of-memory (OOM) errors and applies pay-per-task billing. See Serverless Computing overview and Serverless Computing user guide.
  1. Create a foreign table in Hologres to access MaxCompute data. See IMPORT FOREIGN SCHEMA for parameter details.

    IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to
    (
        <foreign_table_name>
    )
    FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');
  2. Run the following SQL to backfill the previous day's partition:

    -- Clean up any existing temporary table
    DROP TABLE IF EXISTS gh_realtime_data_tmp;
    
    -- Create a temporary table with the same schema and properties as the live table
    SET hg_experimental_enable_create_table_like_properties = ON;
    CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data');
    
    -- (Optional) Use Serverless Computing for large-scale offline imports and ETL
    SET hg_computing_resource = 'serverless';
    
    -- Insert offline data; skip rows that already exist in the real-time table
    INSERT INTO gh_realtime_data_tmp
    SELECT
        *
    FROM
        <foreign_table_name>
    WHERE
        ds = current_date - interval '1 day'
    ON CONFLICT (id, ds)
        DO NOTHING;
    ANALYZE gh_realtime_data_tmp;
    
    -- Reset configuration to avoid unnecessary use of serverless resources
    RESET hg_computing_resource;
    
    -- Atomically replace the partition with the corrected data
    BEGIN;
    DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>');
    COMMIT;
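
After the swap, a quick row count of the backfilled partition confirms that the correction took effect. The following is a minimal sketch; replace <yesterday_date> with the same date used above (ds=yyyy-mm-dd):

-- Row count of the corrected partition; <yesterday_date> is a placeholder.
SELECT count(*) FROM gh_realtime_data WHERE ds = '<yesterday_date>';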

Data analytics

With both pipelines running, you can query real-time and historical data directly in Hologres using standard SQL. The following examples show analytics queries on the gh_realtime_data table.

Total public events today:

SELECT
    count(*)
FROM
    gh_realtime_data
WHERE
    created_at >= date_trunc('day', now());

Sample output:

count
------
1006

Top 5 most active repositories in the past day:

SELECT
    repo_name,
    COUNT(*) AS events
FROM
    gh_realtime_data
WHERE
    created_at >= now() - interval '1 day'
GROUP BY
    repo_name
ORDER BY
    events DESC
LIMIT 5;

Sample output:

repo_name	                               events
----------------------------------------+------
leo424y/heysiri.ml	                      29
arm-on/plan	                              10
Christoffel-T/fiverr-pat-20230331	        9
mate-academy/react_dynamic-list-of-goods	9
openvinotoolkit/openvino	                7

Top 5 most active developers in the past day (bot accounts excluded):

SELECT
    actor_login,
    COUNT(*) AS events
FROM
    gh_realtime_data
WHERE
    created_at >= now() - interval '1 day'
    AND actor_login NOT LIKE '%[bot]'
GROUP BY
    actor_login
ORDER BY
    events DESC
LIMIT 5;

Sample output:

actor_login	       events
------------------+------
direwolf-github	    13
arm-on	            10
sergii-nosachenko	  9
Christoffel-T	      9
yangwang201911	    7

Top programming languages used in the past hour:

SELECT
    language,
    count(*) total
FROM
    gh_realtime_data
WHERE
    created_at > now() - interval '1 hour'
    AND language IS NOT NULL
GROUP BY
    language
ORDER BY
    total DESC
LIMIT 10;

Sample output:

language	  total
-----------+----
JavaScript	25
C++	        15
Python	    14
TypeScript	13
Java	      8
PHP	        8

Repositories ranked by stars in the past day:

This query uses WatchEvent as a proxy for stars and does not account for users who remove stars.
SELECT
    repo_id,
    repo_name,
    COUNT(actor_login) total
FROM
    gh_realtime_data
WHERE
    type = 'WatchEvent'
    AND created_at > now() - interval '1 day'
GROUP BY
    repo_id,
    repo_name
ORDER BY
    total DESC
LIMIT 10;

Sample output:

repo_id	   repo_name	                       total
---------+----------------------------------+-----
618058471	facebookresearch/segment-anything	 4
619959033	nomic-ai/gpt4all	                 1
97249406	denysdovhan/wtfjs	                 1
9791525	  digininja/DVWA	                   1
168118422	aylei/interview	                   1
343520006	joehillen/sysz	                   1
162279822	agalwood/Motrix	                   1
577723410	huggingface/swift-coreml-diffusers 1
609539715	e2b-dev/e2b	                       1
254839429	maniackk/KKCallStack	             1

Daily active users and repositories:

SELECT
    uniq(actor_id) AS actor_num,
    uniq(repo_id) AS repo_num
FROM
    gh_realtime_data
WHERE
    created_at > date_trunc('day', now());

Sample output:

actor_num	repo_num
---------+--------
743	      816

What's next

  • Add more data warehouse layers (DWD, DWS, ADS) on top of the fact table to support aggregation, reporting, and dashboards; a sketch of a daily aggregate follows this list.

  • Query MaxCompute historical data directly through Hologres foreign tables to enrich real-time analytics with years of historical context.

  • Extend analytics to specific repositories or developer profiles using the repo_name and actor_login fields.

  • For event type definitions and payload schemas, see Webhook events and payloads.
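
The following is a minimal sketch of a DWS-style daily aggregate in Hologres, as referenced in the first item above. The table name dws_github_events_daily and its columns are illustrative:

-- Illustrative DWS table: daily event, actor, and repository counts per event type.
CREATE TABLE IF NOT EXISTS dws_github_events_daily (
    ds text NOT NULL,
    type text NOT NULL,
    events bigint,
    actors bigint,
    repos bigint,
    PRIMARY KEY (ds, type)
);

-- Refresh today's aggregates; rerunning the statement overwrites the existing rows for the day.
INSERT INTO dws_github_events_daily
SELECT
    ds,
    type,
    count(*) AS events,
    count(DISTINCT actor_id) AS actors,
    count(DISTINCT repo_id) AS repos
FROM
    gh_realtime_data
WHERE
    ds = to_char(current_date, 'YYYY-MM-DD')
GROUP BY
    ds,
    type
ON CONFLICT (ds, type) DO UPDATE
SET events = EXCLUDED.events,
    actors = EXCLUDED.actors,
    repos = EXCLUDED.repos;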