Hologres: Implementing unified batch and real-time analytics using the public GitHub events dataset

Last Updated: May 16, 2026

This tutorial shows how to build a unified batch and real-time analytics solution using GitHub event data. You will use MaxCompute to build a batch data warehouse, and Realtime Compute for Apache Flink and Hologres to build a real-time data warehouse. Then, you will use Hologres and MaxCompute to perform real-time and batch data analysis.

Background

As businesses become more digital, the demand for fresher data is growing. In addition to traditional batch processing for large-scale data, many businesses need to handle real-time data processing, storage, and analytics. Unified batch and real-time analytics was developed to meet this need.

Unified batch and real-time analytics manages and processes both real-time and batch data on a single platform. It enables a seamless connection between real-time processing and batch analytics, which improves the efficiency and accuracy of data analysis. The key benefits include:

  • Improved data processing efficiency: Integrating real-time and batch data on a single platform significantly improves processing efficiency and reduces data transfer and conversion costs.

  • Improved analytics accuracy: Analyzing real-time and batch data together improves the accuracy of your results.

  • Simplified data management: A single platform makes data management and processing simpler and more efficient.

  • Better decision support: You can make full use of your data to support business decisions.

Alibaba Cloud offers a simplified, unified data warehouse solution for both batch and real-time scenarios. This solution uses MaxCompute for batch processing and Hologres for real-time analytics. Paired with the real-time processing capabilities of Realtime Compute for Apache Flink, these services form the core engine of Alibaba Cloud's unified data warehouse.

Solution architecture

The following diagram shows the complete pipeline for implementing unified batch and real-time analytics on the GitHub public events dataset using MaxCompute and Hologres.

[Figure: Solution architecture]

In this architecture, an ECS instance collects and aggregates real-time and batch event data from GitHub, which then serves as the data source. This data is fed into a real-time pipeline and a batch pipeline. The data from both pipelines is then consolidated in Hologres, which provides a unified service layer.

  • Real-time pipeline: Realtime Compute for Apache Flink processes data from Simple Log Service (SLS) in real time and writes it to Hologres. Hologres supports real-time writes and updates, and data can be queried as soon as it is written. The native integration between Flink and Hologres supports high-throughput, low-latency real-time data warehouse development for real-time business insights, such as extracting the latest events and analyzing trending events.

  • Batch pipeline: MaxCompute processes and archives large volumes of batch data. The raw data used in this tutorial is in JSON format and is stored in Object Storage Service (OSS), which provides convenient, secure, low-cost, and reliable storage for many data types. MaxCompute, an enterprise-grade SaaS cloud data warehouse designed for data analytics, reads and parses the semi-structured data in OSS directly through an external table, loads high-value data into its internal storage, and works with DataWorks to build the batch data warehouse.

  • Unified service layer: Hologres is integrated with MaxCompute at the storage layer, so you can use Hologres to accelerate queries and analysis on large volumes of historical data in MaxCompute. This meets the need for infrequent but high-performance queries on historical data. You can also use the batch pipeline to correct real-time data, for example to fill gaps caused by events that the real-time pipeline missed.

This solution offers the following advantages:

  • Stable and efficient batch pipeline: Supports hourly data writes and updates and processes large-scale data in batches for complex calculations and analysis, which reduces computing costs and improves processing efficiency.

  • Mature real-time pipeline: Supports real-time ingestion, real-time event computation, and real-time analytics. The simplified real-time pipeline delivers responses within seconds.

  • Unified storage and service: Hologres provides a unified service layer with centralized data storage and a consistent external interface (a single SQL interface for both OLAP and Key-Value queries).

  • Unified batch and real-time analytics: Reduces data redundancy and movement, and allows for data correction.

This one-stop development approach delivers data responses within seconds, end-to-end status visibility, and a simplified architecture with fewer components and dependencies, which reduces both O&M and labor costs.

Understanding the business and data

Developers generate many events when they work on open source projects on GitHub. GitHub records details for each event, including the event type, the developer, and the code repository. Public events, such as starring a repository or pushing code, are openly available. For a complete list of event types, see Webhook events and payloads.

  • GitHub provides public events through its REST API. The API provides near-real-time data with a delay of about five minutes. For more information, see Events.

  • The GH Archive project collects and provides hourly archives of GitHub public events. Use these archives to get offline data. For more information, see GH Archive.

Understanding the GitHub business

GitHub's core business is managing code and interactions. It involves three main entities: Developer, Repository, and Organization.

For this data analysis, an Event is also stored and recorded as an entity.


Understanding raw public event data

The following example shows the JSON data for a raw event:

{
    "id": "19541192931",
    "type": "WatchEvent",
    "actor":
    {
        "id": 23286640,
        "login": "herekeo",
        "display_login": "herekeo",
        "gravatar_id": "",
        "url": "https://api.github.com/users/herekeo",
        "avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
    },
    "repo":
    {
        "id": 52760178,
        "name": "crazyguitar/pysheeet",
        "url": "https://api.github.com/repos/crazyguitar/pysheeet"
    },
    "payload":
    {
        "action": "started"
    },
    "public": true,
    "created_at": "2022-01-01T00:03:04Z"
}
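To make the mapping from this nested raw event to the flat columns used later more concrete, here is a minimal Python sketch that flattens the sample event. It covers only a subset of the columns, and the function name flatten_event is hypothetical; the SQL statements later in this topic perform the actual parsing. It also assumes that a missing org key (as in the sample) simply yields empty organization columns.

```python
import json

# The sample raw event from above, trimmed to the fields used in this sketch.
RAW_EVENT = """
{"id": "19541192931", "type": "WatchEvent",
 "actor": {"id": 23286640, "login": "herekeo"},
 "repo": {"id": 52760178, "name": "crazyguitar/pysheeet"},
 "payload": {"action": "started"},
 "public": true, "created_at": "2022-01-01T00:03:04Z"}
"""


def flatten_event(event: dict) -> dict:
    """Map a nested GitHub event to a few flat columns (subset only)."""
    actor = event.get("actor") or {}
    repo = event.get("repo") or {}
    org = event.get("org") or {}  # absent in the sample event above
    payload = event.get("payload") or {}
    created_at = event["created_at"]  # e.g. "2022-01-01T00:03:04Z"
    return {
        "id": int(event["id"]),  # the raw id is a JSON string
        "type": event["type"],
        "actor_id": actor.get("id"),
        "actor_login": actor.get("login"),
        "repo_id": repo.get("id"),
        "repo_name": repo.get("name"),
        "org_id": org.get("id"),
        "org_login": org.get("login"),
        "action": payload.get("action"),
        "hr": created_at[11:13],   # same as SUBSTR(created_at, 12, 2)
        "month": created_at[0:7],  # yyyy-mm
        "year": created_at[0:4],   # yyyy
        "ds": created_at[0:10],    # yyyy-mm-dd
    }


if __name__ == "__main__":
    print(flatten_event(json.loads(RAW_EVENT)))
```

Python slices are 0-based and end-exclusive, so created_at[11:13] corresponds to the 1-based SUBSTR(created_at, 12, 2) used for the hr column.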

This analysis covers 15 types of public events and excludes event types that have never occurred or are no longer recorded. For details about these event types, see GitHub public event types.

Prerequisites

Build an offline data warehouse (hourly updates)

Download raw data files using an ECS instance and upload them to OSS

Use an Elastic Compute Service (ECS) instance to download JSON data files from GH Archive.

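  • Download historical data using the wget command. For example, run wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz to download hourly data from 2012 to 2022. Bash brace expansion turns this pattern into one URL per hour; combinations that are not valid dates, such as 02-31, simply fail to download.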

  • To download new data generated each hour, set up a scheduled hourly task as follows.

    Note
    • Make sure that ossutil is installed on the ECS instance. For more information, see Install ossutil. To install it, download the ossutil installation package and upload it to the ECS instance, run yum install unzip to install unzip, decompress the ossutil package, and move the executable file to the /usr/bin/ directory.

    • Make sure that you have created an Object Storage Service (OSS) bucket in the same region as your ECS instance. You can use a custom bucket name. This example uses the bucket name githubevents.

    • In this example, the files are downloaded to the /opt/hourlydata/gh_data directory on the ECS instance. You can use a different directory.

    1. Run the following command to create a file named download_code.sh in the /opt/hourlydata directory.

      cd /opt/hourlydata
      vim download_code.sh
    2. Press i to enter edit mode and add the following script.

      d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H')
      h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H')
      url=https://data.gharchive.org/${d}.json.gz
      echo ${url}
      
      # Download the data to the ./gh_data/ directory. You can use a different directory.
      wget ${url} -P ./gh_data/
      
      # Change to the gh_data directory.
      cd gh_data
      
      # Decompress the downloaded archive into a JSON file.
      gzip -d ${d}.json.gz
      
      echo ${d}.json
      
      # Change to the home directory of the root user.
      cd /root
      
      # Use ossutil to upload the data to OSS.
      # Create the hr=${h} directory in the githubevents OSS bucket.
      ossutil mkdir oss://githubevents/hr=${h}
      
      # Upload the data from the /opt/hourlydata/gh_data directory to OSS. You can use a different directory.
      ossutil cp -r /opt/hourlydata/gh_data oss://githubevents/hr=${h} -u
      echo oss uploaded successfully!
      
      rm -rf /opt/hourlydata/gh_data/${d}.json
      echo ecs deleted!
    3. Press the Esc key, enter :wq, and press Enter to save and close the file.

    4. Run the following command to execute the download_code.sh script at 10 minutes past every hour.

      # 1. Run the following command and press i to enter edit mode.
      crontab -e
      
      # 2. Add the following command. Then, press Esc, enter :wq, and press Enter to exit.
      10 * * * * cd /opt/hourlydata && sh download_code.sh > download.log

      Once the cron job is in place, the script runs at 10 minutes past every hour: it downloads the JSON file for the previous hour, decompresses it on the ECS instance, and uploads it to oss://githubevents. Each file is uploaded to its own directory named hr=%Y-%m-%d-%H (for example, hr=2023-03-31-05), which serves as a partition. This ensures that subsequent write operations read only the file in the latest partition.
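The script derives two names from the same previous UTC hour: d uses an unpadded hour (%-H), which matches GH Archive file names, and h uses a zero-padded hour (%H), which names the OSS partition. The following Python sketch mirrors that logic and also shows a calendar-aware alternative to the brace-expansion backfill command. The function names are hypothetical, and the bucket name githubevents is the one assumed in this example.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple

GHARCHIVE_BASE = "https://data.gharchive.org"
OSS_BUCKET = "githubevents"  # bucket name used in this example


def archive_name(ts: datetime) -> str:
    """GH Archive file stem; the hour is NOT zero-padded (like %-H)."""
    return f"{ts:%Y-%m-%d}-{ts.hour}"


def partition_name(ts: datetime) -> str:
    """OSS partition directory; the hour IS zero-padded (like %H)."""
    return f"hr={ts:%Y-%m-%d-%H}"


def previous_hour_job(now_utc: datetime) -> Tuple[str, str]:
    """Return (download URL, OSS partition path) for the hour before now_utc."""
    prev = now_utc - timedelta(hours=1)
    url = f"{GHARCHIVE_BASE}/{archive_name(prev)}.json.gz"
    oss_path = f"oss://{OSS_BUCKET}/{partition_name(prev)}"
    return url, oss_path


def backfill_urls(start: datetime, end: datetime) -> Iterator[str]:
    """Yield one archive URL per hour in [start, end).

    Stepping through real calendar hours never produces impossible dates
    such as 02-31, which the brace-expansion wget command also requests.
    """
    ts = start
    while ts < end:
        yield f"{GHARCHIVE_BASE}/{archive_name(ts)}.json.gz"
        ts += timedelta(hours=1)


if __name__ == "__main__":
    # Cron fires at 10 minutes past the hour; "now" must be expressed in UTC.
    now = datetime(2023, 3, 31, 6, 10, tzinfo=timezone.utc)
    print(previous_hour_job(now))
    # ('https://data.gharchive.org/2023-03-31-5.json.gz', 'oss://githubevents/hr=2023-03-31-05')
```

Keeping the two formats in separate helpers makes the padding difference explicit, which is the detail most likely to break if the shell script is edited.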

Import OSS data into MaxCompute using an external table

Run the following commands in the MaxCompute client or an ODPS SQL node in DataWorks. For more information, see Connect to MaxCompute by using the client (odpscmd) or Develop an ODPS SQL task.

  1. Create the external table githubevents to read the JSON files stored in OSS:

    CREATE EXTERNAL TABLE IF NOT EXISTS githubevents
    (
        col  STRING
    )
    PARTITIONED BY 
    (
        hr   STRING
    )
    STORED AS textfile
    LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/'
    ;

    For more information about creating external tables to access OSS data in MaxCompute, see Access unstructured data in OSS.

  2. Create the fact table dwd_github_events_odps to store the data. The following code shows the Data Definition Language (DDL) statement:

    CREATE TABLE IF NOT EXISTS dwd_github_events_odps
    (
        id                     BIGINT COMMENT 'Event ID'
        ,actor_id              BIGINT COMMENT 'ID of the event initiator'
        ,actor_login           STRING COMMENT 'Logon name of the event initiator'
        ,repo_id               BIGINT COMMENT 'Repository ID'
        ,repo_name             STRING COMMENT 'Full name of the repository in the format of owner/repository_name'
        ,org_id                BIGINT COMMENT 'ID of the organization to which the repository belongs'
        ,org_login             STRING COMMENT 'Name of the organization to which the repository belongs'
        ,`type`                STRING COMMENT 'Event type'
        ,created_at            DATETIME COMMENT 'Time when the event occurred'
        ,action                STRING COMMENT 'Event action'
        ,iss_or_pr_id          BIGINT COMMENT 'ID of the issue or pull request'
        ,number                BIGINT COMMENT 'Number of the issue or pull request'
        ,comment_id            BIGINT COMMENT 'Comment ID'
        ,commit_id             STRING COMMENT 'Commit ID'
        ,member_id             BIGINT COMMENT 'Member ID'
        ,rev_or_push_or_rel_id BIGINT COMMENT 'ID of the review, push, or release'
        ,ref                   STRING COMMENT 'Name of the created or deleted resource'
        ,ref_type              STRING COMMENT 'Type of the created or deleted resource'
        ,state                 STRING COMMENT 'Status of the issue, pull request, or pull request review'
        ,author_association    STRING COMMENT 'Relationship between the actor and the repository'
        ,language              STRING COMMENT 'Language of the code in the pull request'
        ,merged                BOOLEAN COMMENT 'Indicates whether the pull request was merged'
        ,merged_at             DATETIME COMMENT 'Time when the code was merged'
        ,additions             BIGINT COMMENT 'Number of added lines of code'
        ,deletions             BIGINT COMMENT 'Number of deleted lines of code'
        ,changed_files         BIGINT COMMENT 'Number of files changed in the pull request'
        ,push_size             BIGINT COMMENT 'Number of commits'
        ,push_distinct_size    BIGINT COMMENT 'Number of distinct commits'
        ,hr                    STRING COMMENT 'Hour when the event occurred. For example, if the event occurred at 00:23, the value of hr is 00.'
        ,`month`               STRING COMMENT 'Month when the event occurred. For example, if the event occurred in October 2015, the value of month is 2015-10.'
        ,`year`                STRING COMMENT 'Year when the event occurred. For example, if the event occurred in 2015, the value of year is 2015.'
    )
    PARTITIONED BY 
    (
        ds                     STRING COMMENT 'Date when the event occurred, in the yyyy-mm-dd format.'
    );
  3. Parse the JSON data and write it to the fact table.

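    Run the following command to add partitions, parse the JSON data, and write the data to the dwd_github_events_odps table. The WHERE clause reads only the partition for the previous UTC hour, which is the partition that download_code.sh uploads. This assumes that the MaxCompute project uses the UTC+8 time zone (for example, Asia/Shanghai), so subtracting 9 hours from GETDATE() yields the previous hour in UTC. Adjust the offset if your project uses a different time zone.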

    msck repair table githubevents add partitions;
    
    set odps.sql.hive.compatible = true;
    set odps.sql.split.hive.bridge = true;
    INSERT into TABLE dwd_github_events_odps PARTITION(ds)
    SELECT  CAST(GET_JSON_OBJECT(col,'$.id')  AS BIGINT ) AS id
            ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id
            ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login
            ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id
            ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name
            ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id
            ,GET_JSON_OBJECT(col,'$.org.login') AS org_login
            ,GET_JSON_OBJECT(col,'$.type') as type
            ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at
            ,GET_JSON_OBJECT(col,'$.payload.action') AS action
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT) 
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT) 
             END AS iss_or_pr_id
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT) 
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT) 
                     ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT)
             END AS number
            ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id
            ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id
            ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT)
                     WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT)
                     WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT)
             END AS rev_or_push_or_rel_id
            ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref
            ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state')
                     WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state') 
             END AS state
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association') 
             END AS author_association
            ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged
            ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT)  AS deletions
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files
            ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT)  AS push_size
            ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT)   AS push_distinct_size
            ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr
            ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') as month
            ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year
            ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') as ds
    from githubevents 
    where hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);
  4. Query the data.

    Run the following command to query data from the dwd_github_events_odps table:

    SET odps.sql.allow.fullscan=true;
    SELECT * FROM dwd_github_events_odps where ds = '2023-03-31' limit 10;

    The following sample result is returned:

    [Figure: Sample query result]
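Several columns take their value from a different payload path depending on the event type. The following Python sketch restates the CASE expressions of the INSERT statement above as a lookup table, which can help when checking or extending the mapping. It is an illustration only: the names are hypothetical, values are returned without the BIGINT casts, and it is not part of the pipeline.

```python
from typing import Any, Dict, Optional, Tuple

# column -> {event type -> path inside "payload"}; the None key is the ELSE branch.
TYPE_DEPENDENT_PATHS: Dict[str, Dict[Optional[str], Tuple[str, ...]]] = {
    "iss_or_pr_id": {
        "PullRequestEvent": ("pull_request", "id"),
        "IssuesEvent": ("issue", "id"),
    },
    "number": {
        "PullRequestEvent": ("pull_request", "number"),
        "IssuesEvent": ("issue", "number"),
        None: ("number",),
    },
    "rev_or_push_or_rel_id": {
        "PullRequestReviewEvent": ("review", "id"),
        "PushEvent": ("push_id",),
        "ReleaseEvent": ("release", "id"),
    },
    "state": {
        "PullRequestEvent": ("pull_request", "state"),
        "IssuesEvent": ("issue", "state"),
        "PullRequestReviewEvent": ("review", "state"),
    },
    "author_association": {
        "PullRequestEvent": ("pull_request", "author_association"),
        "IssuesEvent": ("issue", "author_association"),
        "IssueCommentEvent": ("comment", "author_association"),
        "PullRequestReviewEvent": ("review", "author_association"),
    },
}


def get_path(obj: Any, path: Tuple[str, ...]) -> Any:
    """Walk nested dicts; return None if any key is missing."""
    for key in path:
        if not isinstance(obj, dict):
            return None
        obj = obj.get(key)
    return obj


def extract(event: Dict[str, Any], column: str) -> Any:
    """Value of one type-dependent column for a raw event (no casting)."""
    rules = TYPE_DEPENDENT_PATHS[column]
    path = rules.get(event.get("type"), rules.get(None))
    if path is None:  # no WHEN matched and there is no ELSE: NULL
        return None
    return get_path(event.get("payload") or {}, path)
```

For example, a PushEvent fills rev_or_push_or_rel_id from payload.push_id and leaves iss_or_pr_id empty, exactly as the SQL CASE expressions do.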

Build a real-time data warehouse

Obtain real-time data using ECS

An Elastic Compute Service (ECS) instance is used to extract real-time event data from the GitHub API. This topic provides a sample script to show how to collect real-time data from the GitHub API.

Note
  • Each time the script runs, it executes for 1 minute. It collects the real-time event data provided by the API during this period and stores each event in JSON format.

  • This script does not guarantee that all real-time event data is collected.

  • To collect data from the GitHub API continuously, provide the Accept and Authorization headers. The value of Accept is fixed. For Authorization, use Bearer followed by the personal access token that you obtained from GitHub. For more information about how to create a personal access token, see the GitHub documentation.

  1. Run the following commands to create a file named download_realtime_data.py in the /opt/realtime directory.

    cd /opt/realtime
    vim download_realtime_data.py
  2. Press i to enter edit mode and add the following sample content to the file.

    #!/usr/bin/env python
    
    import json
    import sys
    import time
    
    import requests
    
    # Headers for the GitHub API. The Accept value is fixed.
    # Replace <github_api_token> with your GitHub personal access token.
    HEADERS = {
        "Accept": "application/vnd.github+json",
        "Authorization": "Bearer <github_api_token>",
    }
    
    # Get the URL of the next page from the Link response header
    def get_next_link(resp):
        resp_link = resp.headers.get('link')  # header lookup is case-insensitive
        if not resp_link:
            return None
        for l in resp_link.split(', '):
            parts = l.split('; ')
            link = parts[0][1:-1]  # strip the surrounding < and >
            if len(parts) > 1 and parts[1] == 'rel="next"':
                return link
        return None
    
    # Collect one page of data from the API and append it to fname
    def download(link, fname):
        try:
            resp = requests.get(link, headers=HEADERS, timeout=10)
        except requests.RequestException as e:
            print('request failed: {}'.format(e))
            return None
    
        if resp.status_code != 200:
            return None
    
        events = resp.json()
        with open(fname, 'a') as f:
            for j in events:
                f.write(json.dumps(j))
                f.write('\n')
    
        print('downloaded {} events to {}'.format(len(events), fname))
        return resp
    
    # Collect all pages of data from the API
    def download_all_data(fname):
        link = 'https://api.github.com/events?per_page=100&page=1'
        while True:
            resp = download(link, fname)
            if resp is None:
                break
            link = get_next_link(resp)
            if link is None:
                break
    
    # Get the current time in milliseconds
    def get_current_ms():
        return round(time.time()*1000)
    
    # Keep collecting data for 1 minute
    def main(fname):
        current_ms = get_current_ms()
        while get_current_ms() - current_ms < 60*1000:
            download_all_data(fname)
            time.sleep(0.1)
    
    # Run the script
    if __name__ == '__main__':
        if len(sys.argv) < 2:
            print('usage: python {} <log_file>'.format(sys.argv[0]))
            sys.exit(1)
        main(sys.argv[1])
  3. Press the Esc key, enter :wq, and press Enter to save and close the file.

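  4. Create a run_py.sh file in the /opt/realtime directory. The script runs download_realtime_data.py and writes the data collected in each run to a separate, timestamped file in the /opt/realtime/gh_realtime_data directory, which must already exist. The content is as follows.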

    python /opt/realtime/download_realtime_data.py /opt/realtime/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
  5. Create a delete_log.sh file in the /opt/realtime directory to delete historical data files, that is, files whose names contain the date two days earlier. The content is as follows.

    d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d')
    rm -f /opt/realtime/gh_realtime_data/*${d}*.json
  6. Run the following commands to collect GitHub data every minute and delete historical data every day at 01:01.

    #1. Run the following command and press i to enter edit mode.
    crontab -e
    
    #2. Add the following commands. Then, press Esc, enter :wq, and press Enter to exit.
    * * * * * bash /opt/realtime/run_py.sh
    1 1 * * * bash /opt/realtime/delete_log.sh
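Because the collector polls the same endpoint repeatedly within each minute, consecutive passes return overlapping pages, so the same event can appear several times across the collected files. Downstream, the Hologres primary key (id, ds) combined with the insertorignore write mode keeps only the first copy. The following Python sketch applies the same keep-first idea locally, for example to check how many unique events a batch of files contains. The function names are hypothetical, and reading the whole directory assumes the file layout created by run_py.sh.

```python
import glob
import json
from typing import Iterable, Iterator


def dedup_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield each event once, keeping the first occurrence of every event id.

    Blank lines and events without an id are skipped.
    """
    seen = set()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        event_id = event.get("id")
        if event_id is None or event_id in seen:
            continue
        seen.add(event_id)
        yield event


def read_lines(paths: Iterable[str]) -> Iterator[str]:
    """Stream lines from several files in the given order."""
    for path in paths:
        with open(path) as f:
            yield from f


if __name__ == "__main__":
    # File names start with %Y-%m-%d-%H:%M:%S, so sorting them is chronological.
    files = sorted(glob.glob("/opt/realtime/gh_realtime_data/*.json"))
    unique = sum(1 for _ in dedup_events(read_lines(files)))
    print("{} files, {} unique events".format(len(files), unique))
```

The set of seen IDs grows with the input, which is fine for a spot check but not a substitute for the primary-key deduplication in Hologres.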

Collect ECS data using SLS

Simple Log Service (SLS) is used to collect the real-time event data extracted from the ECS instance as logs.

SLS supports collecting logs from ECS instances by using Logtail. Because the data in this topic is in JSON format, you can use the JSON mode of Logtail to quickly collect incremental JSON logs from the ECS instance. For more information, see Collect logs in JSON mode. In this topic, SLS is configured to parse the top-level key-value pairs of the raw data.

Note

In this example, the log path parameter for the Logtail configuration is set to /opt/realtime/gh_realtime_data/**/*.json.

After the configuration is complete, SLS continuously collects incremental event data from the ECS instance. The following figure shows an example of the collected data.

[Figure: Sample event data collected by SLS]
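With top-level parsing, each event arrives in SLS as one field per top-level key, so nested objects such as actor, repo, and payload are expected to be plain JSON text. That matches the Flink source table in the following section, which declares these fields as varchar and extracts nested values with JSON_VALUE. The Python sketch below approximates both halves under that assumption; it is not Logtail or Flink code, the function names are hypothetical, and the handling of null values (omitted) is an assumption.

```python
import json
from typing import Dict, Optional


def to_top_level_fields(line: str) -> Dict[str, str]:
    """Approximate a first-level JSON parse: one string field per top-level key.

    Nested objects and arrays are kept as compact JSON text, and keys with
    null values are omitted (both are assumptions for this sketch).
    """
    fields = {}
    for key, value in json.loads(line).items():
        if value is None:
            continue
        if isinstance(value, (dict, list)):
            fields[key] = json.dumps(value, separators=(",", ":"))
        elif isinstance(value, bool):
            fields[key] = "true" if value else "false"
        else:
            fields[key] = str(value)
    return fields


def json_value(text: str, path: str) -> Optional[str]:
    """Tiny stand-in for JSON_VALUE(text, '$.a.b'): scalar as string, else None."""
    if not path.startswith("$."):
        raise ValueError("only simple '$.a.b' paths are supported")
    obj = json.loads(text)
    for key in path[2:].split("."):
        if not isinstance(obj, dict) or key not in obj:
            return None
        obj = obj[key]
    if obj is None or isinstance(obj, (dict, list)):
        return None
    if isinstance(obj, bool):
        return "true" if obj else "false"
    return str(obj)


if __name__ == "__main__":
    raw = ('{"id": "19541192931", "type": "WatchEvent", '
           '"actor": {"id": 23286640, "login": "herekeo"}, "public": true}')
    fields = to_top_level_fields(raw)
    print(fields["actor"])                      # {"id":23286640,"login":"herekeo"}
    print(json_value(fields["actor"], "$.id"))  # 23286640
```

Keeping nested objects as text defers their parsing to Flink, where CAST(JSON_VALUE(...) AS bigint) turns the extracted strings into typed columns.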

Write SLS data to Hologres in real time using Flink

Realtime Compute for Apache Flink writes the log data collected by SLS to Hologres in real time: an SLS source table reads the logs, and a Hologres result table writes them to Hologres. For more information, see Import data from Simple Log Service.

  1. Create a Hologres internal table.

    The internal table created in this topic retains only some key-value pairs from the raw JSON data. The event ID (id) and date (ds) form the primary key; ds is included because Hologres requires the partition key of a partitioned table to be part of its primary key. id is the distribution key and ds is the partition key. The event time created_at is set as both the event_time_column and the clustering key. You can create indexes on other fields as needed to improve query efficiency. For more information about indexes, see CREATE TABLE. The following Data Definition Language (DDL) statement creates the table used in this example.

    DROP TABLE IF EXISTS gh_realtime_data;
    
    BEGIN;
    
    CREATE TABLE gh_realtime_data (
        id bigint,
        actor_id bigint,
        actor_login text,
        repo_id bigint,
        repo_name text,
        org_id bigint,
        org_login text,
        type text,
        created_at timestamp with time zone NOT NULL,
        action text,
        iss_or_pr_id bigint,
        number bigint,
        comment_id bigint,
        commit_id text,
        member_id bigint,
        rev_or_push_or_rel_id bigint,
        ref text,
        ref_type text,
        state text,
        author_association text,
        language text,
        merged boolean,
        merged_at timestamp with time zone,
        additions bigint,
        deletions bigint,
        changed_files bigint,
        push_size bigint,
        push_distinct_size bigint,
        hr text,
        month text,
        year text,
        ds text,
        PRIMARY KEY (id,ds)
    )
    PARTITION BY LIST (ds);
    CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id');
    CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at');
    CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at');
    
    COMMENT ON COLUMN public.gh_realtime_data.id IS 'Event ID';
    COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'ID of the event initiator';
    COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'Logon name of the event initiator';
    COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'repo ID';
    COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'repo name';
    COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'ID of the organization to which the repo belongs';
    COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'Name of the organization to which the repo belongs';
    COMMENT ON COLUMN public.gh_realtime_data.type IS 'Event type';
    COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'Time when the event occurred';
    COMMENT ON COLUMN public.gh_realtime_data.action IS 'Event action';
    COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID';
    COMMENT ON COLUMN public.gh_realtime_data.number IS 'issue/pull_request number';
    COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'comment ID';
    COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'Commit ID';
    COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'Member ID';
    COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID';
    COMMENT ON COLUMN public.gh_realtime_data.ref IS 'Name of the created or deleted resource';
    COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'Type of the created or deleted resource';
    COMMENT ON COLUMN public.gh_realtime_data.state IS 'Status of the issue/pull_request/pull_request_review';
    COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'Relationship between the actor and the repo';
    COMMENT ON COLUMN public.gh_realtime_data.language IS 'Programming language';
    COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Indicates whether the pull request was merged';
    COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'Time when the code was merged';
    COMMENT ON COLUMN public.gh_realtime_data.additions IS 'Number of added lines of code';
    COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'Number of deleted lines of code';
    COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'Number of files changed in the pull request';
    COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'Number of commits in the push';
    COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'Number of distinct commits in the push';
    COMMENT ON COLUMN public.gh_realtime_data.hr IS 'The hour when the event occurred. For example, if the time is 00:23, hr=00.';
    COMMENT ON COLUMN public.gh_realtime_data.month IS 'The month when the event occurred. For example, if the date is October 2015, month=2015-10.';
    COMMENT ON COLUMN public.gh_realtime_data.year IS 'The year when the event occurred. For example, if the year is 2015, year=2015.';
    COMMENT ON COLUMN public.gh_realtime_data.ds IS 'The day when the event occurred. ds=yyyy-mm-dd.';
    
    COMMIT;
  2. Write data in real time using Flink.

    Use Flink to parse the SLS data further and write it to Hologres in real time. The following Flink SQL statements also filter the data before it is written: they discard dirty records whose event ID or event time (created_at) is null and keep only recent events.

    CREATE TEMPORARY TABLE sls_input (
      actor varchar,
      created_at varchar,
      id bigint,
      org varchar,
      payload varchar,
      public varchar,
      repo varchar,
      type varchar
      )
    WITH (
        'connector' = 'sls',
        'endpoint' = '<endpoint>',--The private endpoint of SLS
        'accessid' = '<accesskey id>',--The AccessKey ID of your account
        'accesskey' = '<accesskey secret>',--The AccessKey secret of your account
        'project' = '<project name>',--The name of the SLS project
        'logstore' = '<logstore name>',--The name of the SLS Logstore
        'starttime' = '2023-04-06 00:00:00'--The start time for SLS data collection
    );
    
    CREATE TEMPORARY TABLE hologres_sink (
        id bigint,
        actor_id bigint,
        actor_login string,
        repo_id bigint,
        repo_name string,
        org_id bigint,
        org_login string,
        type string,
        created_at timestamp,
        action string,
        iss_or_pr_id bigint,
        number bigint,
        comment_id bigint,
        commit_id string,
        member_id bigint,
        rev_or_push_or_rel_id bigint,
        `ref` string,
        ref_type string,
        state string,
        author_association string,
        `language` string,
        merged boolean,
        merged_at timestamp,
        additions bigint,
        deletions bigint,
        changed_files bigint,
        push_size bigint,
        push_distinct_size bigint,
        hr string,
        `month` string,
        `year` string,
        ds string
        )
    WITH (
        'connector' = 'hologres',
        'dbname' = '<hologres dbname>', --The name of the Hologres database
        'tablename' = '<hologres tablename>', --The name of the Hologres table that receives data
        'username' = '<accesskey id>', --The AccessKey ID of the current Alibaba Cloud account
        'password' = '<accesskey secret>', --The AccessKey Secret of the current Alibaba Cloud account
        'endpoint' = '<endpoint>', --The VPC endpoint of the current Hologres instance
        'jdbcretrycount' = '1', --The number of retries upon connection failure
        'partitionrouter' = 'true', --Specifies whether to write data to a partitioned table
        'createparttable' = 'true', --Specifies whether to automatically create partitions
        'mutatetype' = 'insertorignore' --The data writing mode
    );
    
    INSERT INTO hologres_sink
    SELECT id
            ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id
            ,JSON_VALUE(actor, '$.login') AS actor_login
            ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id
            ,JSON_VALUE(repo, '$.name') AS repo_name
            ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id
            ,JSON_VALUE(org, '$.login') AS org_login
            ,type
            ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at
            ,JSON_VALUE(payload, '$.action') AS action
            ,CASE    WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint)
                     WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint)
             END AS iss_or_pr_id
            ,CASE    WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint)
                     WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint)
                     ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint)
             END AS number
            ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id
            ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id
            ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id
            ,CASE    WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint)
                     WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint)
                     WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint)
             END AS rev_or_push_or_rel_id
            ,JSON_VALUE(payload, '$.ref') AS `ref`
            ,JSON_VALUE(payload, '$.ref_type') AS ref_type
            ,CASE    WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state')
                     WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state')
                     WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state')
             END AS state
            ,CASE    WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association')
                     WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association')
                     WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association')
                     WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association')
             END AS author_association
            ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language`
            ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged
            ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at
            ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions
            ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions
            ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files
            ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size
            ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr
            ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month`
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year`
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds
    FROM
            sls_input
    WHERE
            id IS NOT NULL
            AND created_at IS NOT NULL
            AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1);

    For more information about the parameters, see Simple Log Service (SLS) and Hologres.

    Note

    The raw GitHub event data is in UTC but carries no time zone attribute, while the default time zone of Hologres is UTC+8. The Flink SQL above therefore parses created_at with TO_TIMESTAMP_TZ(..., 'UTC') to mark the source values as UTC, and the local time zone of the Flink job must be set to match Hologres. To set the Flink time zone:

    Step 1: Go to the job editing page

    • Log on to the Realtime Compute for Apache Flink console.

    • Go to the target workspace.

    • Find your Flink SQL or JAR job and click Edit.

    Step 2: Open the Deployment Details tab

    Note: Flink Configuration is no longer a separate section. It has been merged into Deployment Details.

    • At the top of the job editing page, switch to the Deployment Details tab.

    • Scroll down to the Parameter Configuration section.

    Step 3: Add a custom configuration

    • Click the Edit button to the right of Parameter Configuration.

    • In the dialog box that appears, find the Other Configuration text box.

    • In the text box, add the key-value pair table.local-time-zone: Asia/Shanghai to set the local time zone of the Flink job to Asia/Shanghai (UTC+8).

  3. Query data.

    Query the SLS data written to Hologres through Flink. You can then perform data development as needed.

    SELECT * FROM public.gh_realtime_data LIMIT 10;

    The following result is an example:

    [Figure: sample query result]
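The following Python sketch reproduces part of the Flink INSERT statement above for a single event. You can use it to check the column mapping and the time zone handling. It makes several assumptions: each event has already been parsed from JSON into a dict, the Flink job runs with table.local-time-zone set to Asia/Shanghai, and only a few columns are covered. The helper names get_path, to_bigint, and flatten_event are hypothetical and are not part of the Flink job.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the Flink job runs with table.local-time-zone = Asia/Shanghai.
# Asia/Shanghai observes no daylight saving time, so a fixed UTC+8 offset is used.
LOCAL_TZ = timezone(timedelta(hours=8))

# JSON paths used by the rev_or_push_or_rel_id CASE expression.
REV_OR_PUSH_OR_REL_PATHS = {
    "PullRequestReviewEvent": ["review", "id"],
    "PushEvent": ["push_id"],
    "ReleaseEvent": ["release", "id"],
}


def get_path(payload, path):
    """Walk nested keys; return None if any key is missing, like JSON_VALUE."""
    node = payload
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def to_bigint(value):
    """Mimic CAST(... AS bigint): NULL stays NULL."""
    return None if value is None else int(value)


def flatten_event(event):
    """Flatten one parsed GitHub event into a subset of the sink columns."""
    event_type = event["type"]
    payload = event.get("payload") or {}

    # iss_or_pr_id: CASE without ELSE, so other event types yield NULL.
    if event_type == "PullRequestEvent":
        iss_or_pr_id = to_bigint(get_path(payload, ["pull_request", "id"]))
    elif event_type == "IssuesEvent":
        iss_or_pr_id = to_bigint(get_path(payload, ["issue", "id"]))
    else:
        iss_or_pr_id = None

    path = REV_OR_PUSH_OR_REL_PATHS.get(event_type)
    rev_or_push_or_rel_id = to_bigint(get_path(payload, path)) if path else None

    # created_at is UTC (for example 2023-04-01T18:30:00Z). Convert it to
    # local time before slicing out hr, month, year, and ds.
    utc_time = datetime.strptime(event["created_at"], "%Y-%m-%dT%H:%M:%SZ")
    local = utc_time.replace(tzinfo=timezone.utc).astimezone(LOCAL_TZ)
    text = local.strftime("%Y-%m-%d %H:%M:%S")

    return {
        "id": event["id"],
        "type": event_type,
        "iss_or_pr_id": iss_or_pr_id,
        "rev_or_push_or_rel_id": rev_or_push_or_rel_id,
        "hr": text[11:13],   # SUBSTRING(..., 12, 2)
        "month": text[0:7],  # SUBSTRING(..., 1, 7)
        "year": text[0:4],   # SUBSTRING(..., 1, 4)
        "ds": text[0:10],    # SUBSTRING(..., 1, 10)
    }


row = flatten_event({
    "id": 1,
    "type": "PushEvent",
    "created_at": "2023-04-01T18:30:00Z",
    "payload": {"push_id": "7"},
})
print(row["rev_or_push_or_rel_id"], row["ds"], row["hr"])  # 7 2023-04-02 02
```

In this example, a PushEvent created at 18:30 UTC on April 1 lands in the 2023-04-02 partition with hr 02. That is the result the UTC+8 setting is meant to produce.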

Correct real-time data using offline data

In the scenario described in this topic, the real-time pipeline might miss some events. You can use the offline data in MaxCompute to correct the real-time data. The following steps correct the previous day's real-time data. You can adjust the correction period as needed.

  1. Create a foreign table in Hologres to access the offline data in MaxCompute.

    IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT TO
    (
        <foreign_table_name>
    )
    FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update', if_unsupported_type 'error');

    For more information about the parameters, see IMPORT FOREIGN SCHEMA.

  2. Create a temporary table, load the previous day's offline data into it, and use it to replace the corresponding real-time partition.

    Note

    Hologres V2.1.17 and later support Serverless Computing. For scenarios such as large-scale offline imports, large ETL jobs, and high-volume foreign table queries, you can use Serverless Computing to run these tasks with additional serverless resources. This approach avoids using your instance resources, which significantly improves instance stability and reduces the probability of out-of-memory (OOM) errors. You are charged only for the resources consumed by the task. For more information, see Serverless Computing. To learn how to use this feature, see Guide to Serverless Computing.

    -- Drop the temporary table if a previous run left it behind
    DROP TABLE IF EXISTS gh_realtime_data_tmp;
    
    -- Create a temporary table
    SET hg_experimental_enable_create_table_like_properties = ON;
    CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data');
    
    -- (Optional) Use Serverless Computing to perform large-scale offline data import and ETL jobs.
    SET hg_computing_resource = 'serverless';
    
    -- Insert data into the temporary table and update statistics
    INSERT INTO gh_realtime_data_tmp
    SELECT
        *
    FROM
        <foreign_table_name>
    WHERE
        ds = current_date - interval '1 day'
    ON CONFLICT (id, ds)
        DO NOTHING;
    ANALYZE gh_realtime_data_tmp;
    
    -- Reset the configuration to ensure that non-essential SQL statements do not use Serverless resources.
    RESET hg_computing_resource;
    
    -- Atomically replace yesterday's child partition with the temporary table
    BEGIN;
    DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>');
    COMMIT;
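The correction step builds the corrected data in a separate table and then swaps it in within one transaction. The intent appears to be that queries see either the old partition or the corrected one, never a half-loaded state. The Python sketch below demonstrates that pattern with SQLite under several assumptions. SQLite has no partition ATTACH, so the swap is reduced to DROP TABLE plus RENAME inside a transaction. INSERT OR IGNORE stands in for ON CONFLICT (id, ds) DO NOTHING. The three-column layout, the rows, and the function name replace_partition are hypothetical.

```python
import sqlite3

# Hypothetical three-column layout standing in for gh_realtime_data.
DDL = "CREATE TABLE {name} (id INTEGER, ds TEXT, type TEXT, PRIMARY KEY (id, ds))"


def replace_partition(conn, partition, offline_rows):
    """Load offline rows into a temporary table, then swap it in atomically."""
    conn.execute("DROP TABLE IF EXISTS gh_realtime_data_tmp")
    conn.execute(DDL.format(name="gh_realtime_data_tmp"))
    # INSERT OR IGNORE plays the role of ON CONFLICT (id, ds) DO NOTHING.
    conn.executemany(
        "INSERT OR IGNORE INTO gh_realtime_data_tmp VALUES (?, ?, ?)", offline_rows
    )
    # Drop the old table and rename the new one in a single transaction, so
    # readers see either the old rows or the corrected rows.
    conn.execute("BEGIN")
    try:
        conn.execute(f'DROP TABLE IF EXISTS "{partition}"')
        conn.execute(f'ALTER TABLE gh_realtime_data_tmp RENAME TO "{partition}"')
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


# isolation_level=None puts sqlite3 in autocommit mode so that the explicit
# BEGIN/COMMIT above controls the transaction.
conn = sqlite3.connect(":memory:", isolation_level=None)
partition = "gh_realtime_data_2023-04-01"
conn.execute(DDL.format(name=f'"{partition}"'))
# The real-time pipeline captured only two of the day's three events.
conn.executemany(
    f'INSERT INTO "{partition}" VALUES (?, ?, ?)',
    [(1, "2023-04-01", "PushEvent"), (2, "2023-04-01", "WatchEvent")],
)
offline_rows = [
    (1, "2023-04-01", "PushEvent"),
    (2, "2023-04-01", "WatchEvent"),
    (3, "2023-04-01", "IssuesEvent"),
    (3, "2023-04-01", "IssuesEvent"),  # duplicate key, ignored
]
replace_partition(conn, partition, offline_rows)
print(conn.execute(f'SELECT COUNT(*) FROM "{partition}"').fetchone()[0])  # 3
```

The corrected partition contains all three events. The duplicate offline row was discarded by the primary key, and the temporary table no longer exists after the rename.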

Data analysis

The collected data supports a wide range of analyses. Design your data warehouse in layers based on the time ranges your business requires. This lets it serve real-time analysis, offline analysis, and integrated real-time and offline analysis.

The following examples analyze the real-time data obtained earlier. You can also analyze data for specific code repositories or developers.

  • Query the total number of public events for today.

    SELECT
        count(*)
    FROM
        gh_realtime_data
    WHERE
        created_at >= date_trunc('day', now());

    The following is a sample result:

    count
    ------
    1006

  • Query the most active projects (with the most events) in the last 24 hours.

    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        gh_realtime_data
    WHERE
        created_at >= now() - interval '1 day'
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 5;

    The following is a sample result:

    repo_name                                | events
    -----------------------------------------+-------
    leo424y/heysiri.ml                       |     29
    arm-on/plan                              |     10
    Christoffel-T/fiverr-pat-20230331        |      9
    mate-academy/react_dynamic-list-of-goods |      9
    openvinotoolkit/openvino                 |      7

  • Query the most active developers (with the most events) in the last 24 hours, excluding bot accounts.

    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        gh_realtime_data
    WHERE
        created_at >= now() - interval '1 day'
        AND actor_login NOT LIKE '%[bot]'
    GROUP BY
        actor_login
    ORDER BY
        events DESC
    LIMIT 5;

    The following is a sample result:

    actor_login       | events
    ------------------+-------
    direwolf-github   |     13
    arm-on            |     10
    sergii-nosachenko |      9
    Christoffel-T     |      9
    yangwang201911    |      7

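  • Query the ranking of the most popular programming languages in the last hour.

    The language column is populated only from $.pull_request.base.repo.language, so this ranking counts only events whose payload contains a pull request.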

    SELECT
        language,
        count(*) total
    FROM
        gh_realtime_data
    WHERE
        created_at > now() - interval '1 hour'
        AND language IS NOT NULL
    GROUP BY
        language
    ORDER BY
        total DESC
    LIMIT 10;

    The following is a sample result:

    language   | total
    -----------+------
    JavaScript |    25
    C++        |    15
    Python     |    14
    TypeScript |    13
    Java       |     8
    PHP        |     8

  • Query the ranking of projects by the number of stars received in the last 24 hours.

    Note

    This example counts WatchEvent events, which GitHub generates when a user stars a repository. It does not account for users who later unstar a project.

    SELECT
        repo_id,
        repo_name,
        COUNT(actor_login) total
    FROM
        gh_realtime_data
    WHERE
        type = 'WatchEvent'
        AND created_at > now() - interval '1 day'
    GROUP BY
        repo_id,
        repo_name
    ORDER BY
        total DESC
    LIMIT 10;

    The following is a sample result:

    repo_id   | repo_name                          | total
    ----------+------------------------------------+------
    618058471 | facebookresearch/segment-anything  |     4
    619959033 | nomic-ai/gpt4all                   |     1
     97249406 | denysdovhan/wtfjs                  |     1
      9791525 | digininja/DVWA                     |     1
    168118422 | aylei/interview                    |     1
    343520006 | joehillen/sysz                     |     1
    162279822 | agalwood/Motrix                    |     1
    577723410 | huggingface/swift-coreml-diffusers |     1
    609539715 | e2b-dev/e2b                        |     1
    254839429 | maniackk/KKCallStack               |     1

  • Query the number of active developers and active projects for today.

    SELECT
        uniq(actor_id) actor_num,
        uniq(repo_id) repo_num
    FROM
        gh_realtime_data
    WHERE
        created_at > date_trunc('day', now());

    The following is a sample result:

    actor_num | repo_num
    ----------+---------
          743 |      816
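To try the query patterns above without a Hologres instance, the following Python sketch runs three of them against an in-memory SQLite table of made-up rows. SQLite has no now(), interval type, or UNIQ, so a fixed timestamp and datetime() modifiers stand in for them. Timestamps are stored as UTC+8 local text, matching the time zone used in this topic. Like PostgreSQL, SQLite treats only % and _ as LIKE wildcards, so '%[bot]' matches logins that end with the literal text [bot]. Unlike PostgreSQL, SQLite's LIKE is case-insensitive for ASCII by default.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical sample rows; created_at is stored as UTC+8 local time text.
conn.execute(
    "CREATE TABLE gh_realtime_data (actor_login TEXT, repo_name TEXT, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO gh_realtime_data VALUES (?, ?, ?)",
    [
        ("alice", "org/a", "2023-04-02 01:00:00"),
        ("alice", "org/a", "2023-04-01 20:00:00"),
        ("bob", "org/b", "2023-04-01 15:00:00"),
        ("dependabot[bot]", "org/a", "2023-04-02 02:00:00"),
        ("carol", "org/c", "2023-03-30 09:00:00"),  # older than 24 hours
    ],
)

# Fixed stand-in for now(). The datetime() modifiers replace
# date_trunc('day', ...) and "- interval '1 day'".
now = "2023-04-02 03:00:00"

today_total = conn.execute(
    "SELECT COUNT(*) FROM gh_realtime_data WHERE created_at >= datetime(?, 'start of day')",
    (now,),
).fetchone()[0]

top_repos = conn.execute(
    """
    SELECT repo_name, COUNT(*) AS events
    FROM gh_realtime_data
    WHERE created_at >= datetime(?, '-1 day')
    GROUP BY repo_name
    ORDER BY events DESC
    LIMIT 5
    """,
    (now,),
).fetchall()

top_developers = conn.execute(
    """
    SELECT actor_login, COUNT(*) AS events
    FROM gh_realtime_data
    WHERE created_at >= datetime(?, '-1 day')
      AND actor_login NOT LIKE '%[bot]'
    GROUP BY actor_login
    ORDER BY events DESC
    LIMIT 5
    """,
    (now,),
).fetchall()

print(today_total)     # 2
print(top_repos)       # [('org/a', 3), ('org/b', 1)]
print(top_developers)  # [('alice', 2), ('bob', 1)]
```

The bot account still counts toward org/a's project activity but is excluded from the developer ranking, which mirrors the two Hologres queries.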