全部产品
Search
文档中心

Hologres:Analitik offline dan real-time terpadu menggunakan dataset event publik GitHub

更新时间:Feb 04, 2026

Topik ini menjelaskan cara membangun solusi analitik offline dan real-time terpadu yang menggunakan MaxCompute untuk membangun gudang data offline serta Flink dan Hologres untuk membangun gudang data real-time. Anda dapat melakukan analitik data real-time di Hologres dan analitik data offline di MaxCompute.

Informasi latar belakang

Seiring percepatan transformasi digital, bisnis semakin membutuhkan data yang tepat waktu. Selain skenario offline tradisional yang dirancang untuk pemrosesan data berskala besar, banyak kasus penggunaan kini memerlukan ingest, penyimpanan, dan analisis data secara real-time. Untuk mengatasi kebutuhan tersebut, muncul konsep analitik offline dan real-time terpadu.

Analitik offline dan real-time terpadu mengacu pada pengelolaan dan pemrosesan data real-time maupun offline dalam satu platform tunggal. Pendekatan ini mengintegrasikan pemrosesan data real-time dengan analitik offline secara mulus guna meningkatkan efisiensi dan akurasi. Manfaat utamanya meliputi hal-hal berikut:

  • Efisiensi pemrosesan data yang lebih baik: Mengintegrasikan data real-time dan offline dalam satu platform mengurangi biaya transfer dan transformasi data.

  • Akurasi analitik yang lebih tinggi: Menggabungkan data real-time dan historis memungkinkan wawasan yang lebih tepat dan akurat.

  • Kompleksitas sistem yang berkurang: Menyederhanakan alur kerja manajemen dan pemrosesan data.

  • Nilai data yang lebih besar: Memaksimalkan nilai bisnis dari data untuk mendukung pengambilan keputusan yang lebih baik.

Alibaba Cloud menyediakan solusi terstruktur untuk analitik offline dan real-time terpadu. Arsitektur ini menggunakan MaxCompute untuk beban kerja offline, Hologres untuk analitik real-time, dan Flink untuk transformasi data real-time. Layanan-layanan ini merupakan komponen inti dari solusi gudang data terpadu Alibaba Cloud.

Arsitektur solusi

Diagram berikut menunjukkan alur kerja end-to-end untuk analitik offline dan real-time terpadu menggunakan dataset event publik GitHub dengan MaxCompute dan Hologres.

image

Sebuah instans ECS mengumpulkan dan mengagregasi data event GitHub, baik real-time maupun offline. Data tersebut kemudian dialirkan ke pipeline real-time dan offline terpisah, yang akhirnya menyatu di Hologres untuk menyediakan lapisan layanan terpadu.

  • Pipeline real-time: Flink memproses data dari Simple Log Service (SLS) secara real-time dan menuliskannya ke Hologres. Flink adalah mesin pemrosesan stream yang andal. Hologres mendukung kueri langsung terhadap data saat diingest dan memiliki integrasi native dengan Flink, sehingga memungkinkan analitik real-time ber-throughput tinggi, latensi rendah, dan berkualitas tinggi. Pipeline ini memenuhi kebutuhan bisnis real-time seperti mengekstrak event terbaru atau menganalisis aktivitas yang sedang tren.

  • Pipeline offline: MaxCompute memproses dan mengarsipkan volume besar data historis. Alibaba Cloud Object Storage Service (OSS) menyediakan penyimpanan cloud yang aman, andal, dan hemat biaya untuk berbagai jenis data. Dalam solusi ini, data mentah disimpan dalam format JSON di OSS. MaxCompute adalah gudang data cloud enterprise-grade yang menggunakan model software-as-a-service (SaaS) dan dioptimalkan untuk analitik. MaxCompute dapat langsung membaca dan mengurai data semi-terstruktur dari OSS menggunakan tabel eksternal, mengintegrasikan data bernilai tinggi ke dalam penyimpanan internalnya, serta menggunakan DataWorks untuk pengembangan data guna membangun gudang data offline.

  • Hologres dan MaxCompute terintegrasi secara native pada lapisan penyimpanan. Hal ini memungkinkan Hologres mempercepat kueri terhadap dataset historis MaxCompute yang sangat besar untuk memenuhi kebutuhan kueri berfrekuensi rendah namun berkinerja tinggi. Integrasi ini juga memungkinkan koreksi data real-time secara mudah menggunakan data offline guna mengatasi potensi celah atau kelalaian dalam pipeline real-time.

Keunggulan utama solusi ini meliputi hal-hal berikut:

  • Pipeline offline yang stabil dan efisien: Mendukung pembaruan data per jam, pemrosesan batch dataset berskala besar, komputasi kompleks, pengurangan biaya komputasi, dan peningkatan efisiensi pemrosesan.

  • Pipeline real-time yang matang: Mendukung ingest real-time, komputasi event, dan analisis dengan arsitektur yang disederhanakan serta waktu respons di bawah satu detik.

  • Penyimpanan dan layanan terpadu: Hologres melayani seluruh data melalui antarmuka yang konsisten (kueri OLAP dan key-value disatukan dalam SQL), dengan penyimpanan terpusat.

  • Integrasi real-time dan offline yang mulus: Meminimalkan redundansi dan perpindahan data sekaligus memungkinkan koreksi data.

Dengan dukungan pengembangan end-to-end, solusi ini memberikan responsivitas data di bawah satu detik, visibilitas penuh pada seluruh pipeline, jumlah komponen arsitektur yang minimal, ketergantungan yang lebih sedikit, serta pengurangan signifikan pada biaya operasional dan tenaga kerja.

Pemahaman bisnis dan data

Developer membuat banyak proyek open source di GitHub dan menghasilkan banyak event selama proses pengembangan. GitHub mencatat tipe dan detail setiap event, developer, repositori, serta informasi lainnya. GitHub juga mengekspos event publik, seperti pemberian star dan commit kode. Untuk informasi lebih lanjut tentang tipe event tertentu, lihat Webhook events and payloads.

  • GitHub menerbitkan event publik real-time melalui OpenAPI. API ini hanya mengekspos event dari lima menit terakhir. Untuk informasi lebih lanjut, lihat Events. Anda dapat menggunakan API ini untuk mendapatkan data real-time.

  • Proyek GH Archive mengagregasi event publik GitHub per jam dan menyediakannya untuk diunduh. Untuk informasi lebih lanjut, lihat GH Archive. Anda dapat menggunakan proyek ini untuk mendapatkan data offline.

Ikhtisar bisnis GitHub

Bisnis inti GitHub berfokus pada manajemen kode dan kolaborasi developer, terutama melibatkan tiga entitas tingkat atas: Developer, Repository, dan Organization.image

Dalam solusi ini, Event diperlakukan sebagai entitas tersendiri untuk penyimpanan dan analisis.

image

Pemahaman data event publik mentah

Contoh berikut menunjukkan sampel event mentah dalam format JSON:

{
    "id": "19541192931",
    "type": "WatchEvent",
    "actor":
    {
        "id": 23286640,
        "login": "herekeo",
        "display_login": "herekeo",
        "gravatar_id": "",
        "url": "https://api.github.com/users/herekeo",
        "avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
    },
    "repo":
    {
        "id": 52760178,
        "name": "crazyguitar/pysheeet",
        "url": "https://api.github.com/repos/crazyguitar/pysheeet"
    },
    "payload":
    {
        "action": "started"
    },
    "public": true,
    "created_at": "2022-01-01T00:03:04Z"
}

Solusi ini mencakup 15 tipe event publik, tidak termasuk tipe yang sudah tidak digunakan atau tidak terekam. Untuk daftar lengkap dan deskripsinya, lihat GitHub public event types.

Prasyarat

Membangun gudang data offline (pembaruan per jam)

Mengunduh file data mentah menggunakan ECS dan mengunggahnya ke OSS

Anda dapat menggunakan instans ECS untuk mengunduh file data JSON yang disediakan oleh GH Archive.

  • Untuk data historis, Anda dapat menggunakan perintah wget. Misalnya, jalankan wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz untuk mengunduh data per jam dari tahun 2012 hingga 2022.

  • Untuk data per jam terbaru, Anda dapat menyiapkan tugas terjadwal sebagai berikut.

    Catatan
    • Pastikan ossutil telah diinstal pada instans ECS Anda. Untuk informasi lebih lanjut, lihat Install ossutil. Kami menyarankan Anda mengunduh paket ossutil langsung ke instans ECS Anda, menginstal unzip dengan yum install unzip, mengekstrak ossutil, dan memindahkannya ke direktori /usr/bin/.

    • Buat bucket OSS di wilayah yang sama dengan instans ECS Anda. Anda dapat menggunakan nama bucket kustom. Dalam contoh ini, nama bucket-nya adalah githubevents.

    • Direktori unduh contoh ECS adalah /opt/hourlydata/gh_data. Anda dapat menggunakan direktori berbeda.

    1. Jalankan perintah berikut untuk membuat file bernama download_code.sh di direktori /opt/hourlydata.

      cd /opt/hourlydata
      vim download_code.sh
    2. Tekan i untuk masuk ke mode edit dan tambahkan skrip berikut.

      d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H')
      h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H')
      url=https://data.gharchive.org/${d}.json.gz
      echo ${url}
      
      # Download data to ./gh_data/. You can customize this path.
      wget ${url} -P ./gh_data/
      
      # Switch to the gh_data directory.
      cd gh_data
      
      # Decompress the downloaded data into a JSON file.
      gzip -d ${d}.json
      
      echo ${d}.json
      
      # Switch to the root directory.
      cd /root
      
      # Use ossutil to upload data to OSS.
      # Create a directory named hr=${h} in the githubevents OSS bucket.
      ossutil mkdir oss://githubevents/hr=${h}
      
      # Upload data from /opt/hourlydata/gh_data (you can customize this path) to OSS.
      ossutil cp -r /opt/hourlydata/gh_data oss://githubevents/hr=${h} -u
      echo oss uploaded successfully!
      
      rm -rf /opt/hourlydata/gh_data/${d}.json
      echo ecs deleted!
    3. Tekan Esc, ketik :wq, lalu tekan Enter untuk menyimpan dan keluar.

    4. Jalankan perintah berikut untuk menjadwalkan skrip download_code.sh agar berjalan pada menit ke-10 setiap jam.

      #1 Run the following command and press I to enter edit mode.
      crontab -e
      
      #2 Add the following line, then press Esc and type :wq to exit.
      10 * * * * cd /opt/hourlydata && sh download_code.sh > download.log

      Setelah diatur, skrip akan mengunduh file JSON jam sebelumnya pada menit ke-10 setiap jam, mengekstraknya di instans ECS, dan mengunggahnya ke OSS (path: oss://githubevents). Untuk memastikan hanya data jam terbaru yang diproses nanti, setiap unggahan membuat direktori partisi bernama hr=%Y-%m-%d-%H. Pembacaan selanjutnya hanya akan menargetkan partisi terbaru.

Mengimpor data OSS ke MaxCompute menggunakan tabel eksternal

Anda dapat menjalankan perintah berikut di klien MaxCompute atau node ODPS SQL di DataWorks. Untuk informasi lebih lanjut, lihat Connect using the local client (odpscmd) atau Develop an ODPS SQL task.

  1. Buat tabel eksternal bernama githubevents untuk memetakan file JSON yang disimpan di OSS:

    CREATE EXTERNAL TABLE IF NOT EXISTS githubevents
    (
        col  STRING
    )
    PARTITIONED BY 
    (
        hr   STRING
    )
    STORED AS textfile
    LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/'
    ;

    Untuk informasi lebih lanjut tentang cara membuat tabel eksternal OSS di MaxCompute, lihat ORC external tables.

  2. Buat tabel fakta bernama dwd_github_events_odps untuk menyimpan data yang telah diurai. Kode berikut menyediakan pernyataan DDL:

    CREATE TABLE IF NOT EXISTS dwd_github_events_odps
    (
        id                     BIGINT COMMENT 'Event ID'
        ,actor_id              BIGINT COMMENT 'Actor ID'
        ,actor_login           STRING COMMENT 'Actor login name'
        ,repo_id               BIGINT COMMENT 'Repository ID'
        ,repo_name             STRING COMMENT 'Full repository name: owner/repository_name'
        ,org_id                BIGINT COMMENT 'Organization ID'
        ,org_login             STRING COMMENT 'Organization name'
        ,`type`                STRING COMMENT 'Event type'
        ,created_at            DATETIME COMMENT 'Event occurrence time'
        ,action                STRING COMMENT 'Event action'
        ,iss_or_pr_id          BIGINT COMMENT 'Issue or pull request ID'
        ,number                BIGINT COMMENT 'Issue or pull request number'
        ,comment_id            BIGINT COMMENT 'Comment ID'
        ,commit_id             STRING COMMENT 'Commit ID'
        ,member_id             BIGINT COMMENT 'Member ID'
        ,rev_or_push_or_rel_id BIGINT COMMENT 'Review, push, or release ID'
        ,ref                   STRING COMMENT 'Name of created or deleted resource'
        ,ref_type              STRING COMMENT 'Type of created or deleted resource'
        ,state                 STRING COMMENT 'State of issue, pull request, or pull request review'
        ,author_association    STRING COMMENT 'Relationship between actor and repository'
        ,language              STRING COMMENT 'Programming language of merged code'
        ,merged                BOOLEAN COMMENT 'Whether the merge was accepted'
        ,merged_at             DATETIME COMMENT 'Code merge time'
        ,additions             BIGINT COMMENT 'Number of lines added'
        ,deletions             BIGINT COMMENT 'Number of lines deleted'
        ,changed_files         BIGINT COMMENT 'Number of files changed in pull request'
        ,push_size             BIGINT COMMENT 'Number of commits'
        ,push_distinct_size    BIGINT COMMENT 'Number of distinct commits'
        ,hr                    STRING COMMENT 'Hour of event occurrence (e.g., 00:23 → hr=00)'
        ,`month`               STRING COMMENT 'Month of event occurrence (e.g., October 2015 → month=2015-10)'
        ,`year`                STRING COMMENT 'Year of event occurrence (e.g., 2015 → year=2015)'
    )
    PARTITIONED BY 
    (
        ds                     STRING COMMENT 'Date of event occurrence (ds=yyyy-mm-dd)'
    );
  3. Uraikan data JSON dan tulis ke tabel fakta.

    Jalankan perintah berikut untuk menambahkan partisi dan mengurai data JSON ke tabel dwd_github_events_odps:

    msck repair table githubevents add partitions;
    
    set odps.sql.hive.compatible = true;
    set odps.sql.split.hive.bridge = true;
    INSERT into TABLE dwd_github_events_odps PARTITION(ds)
    SELECT  CAST(GET_JSON_OBJECT(col,'$.id')  AS BIGINT ) AS id
            ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id
            ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login
            ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id
            ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name
            ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id
            ,GET_JSON_OBJECT(col,'$.org.login') AS org_login
            ,GET_JSON_OBJECT(col,'$.type') as type
            ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at
            ,GET_JSON_OBJECT(col,'$.payload.action') AS action
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT) 
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT) 
             END AS iss_or_pr_id
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT) 
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT) 
                     ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT)
             END AS number
            ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id
            ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id
            ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT)
                     WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT)
                     WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT)
             END AS rev_or_push_or_rel_id
            ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref
            ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state')
                     WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state') 
             END AS state
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association') 
             END AS author_association
            ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged
            ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT)  AS deletions
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files
            ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT)  AS push_size
            ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT)   AS push_distinct_size
            ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr
            ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'-','-') as month
            ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year
            ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'-','-') as ds
    from githubevents 
    where hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);
  4. Kueri data.

    Jalankan perintah berikut untuk mengkueri data dari tabel dwd_github_events_odps:

    SET odps.sql.allow.fullscan=true;
    SELECT * FROM dwd_github_events_odps where ds = '2023-03-31' limit 10;

    Contoh output:

    image

Membangun gudang data real-time

Mengumpulkan data real-time menggunakan ECS

Anda dapat menggunakan instans ECS untuk mengekstrak data event real-time dari GitHub API. Skrip berikut menunjukkan salah satu metode pengumpulan data real-time melalui GitHub API.

Catatan
  • Setiap eksekusi skrip berlangsung satu menit dan mengumpulkan event real-time yang tersedia selama interval tersebut, menyimpan setiap event sebagai objek JSON.

  • Skrip ini tidak menjamin penangkapan semua event real-time.

  • Untuk terus-menerus mengumpulkan data dari GitHub API, Anda harus menyediakan header Accept dan token Authorization. Header Accept memiliki nilai tetap. Token Authorization memerlukan personal access token dari GitHub. Untuk informasi lebih lanjut tentang cara membuat token, lihat Creating a personal access token.

  1. Jalankan perintah berikut untuk membuat file bernama download_realtime_data.py di direktori /opt/realtime.

    cd /opt/realtime
    vim download_realtime_data.py
  2. Tekan i untuk masuk ke mode edit dan tambahkan konten berikut.

    #!python
    
    import requests
    import json
    import sys
    import time
    
    # Get the next page URL from the response headers
    def get_next_link(resp):
        resp_link = resp.headers['link']
        link = ''
        for l in resp_link.split(', '):
            link = l.split('; ')[0][1:-1]
            rel = l.split('; ')[1]
            if rel == 'rel="next"':
                return link
        return None
    
    # Download one page of data from the API
    def download(link, fname):
    # Define GitHub API Accept and Authorization headers
        headers = {"Accept": "application/vnd.github+json","Authorization": "<Bearer> <github_api_token>"}
        resp = requests.get(link, headers=headers)
    
        if int(resp.status_code) != 200:
            return None
    
        with open(fname, 'a') as f:
            for j in resp.json():
                f.write(json.dumps(j))
                f.write('\n')
    
        print('downloaded {} events to {}'.format(len(resp.json()), fname))
        return resp
    
    # Download multiple pages of data from the API
    def download_all_data(fname):
        link = 'https://api.github.com/events?per_page=100&page=1'
        while True:
            resp = download(link, fname)
            if resp is None:
                break
            link = get_next_link(resp)
            if link is None:
                break
    
    # Get current timestamp in milliseconds
    def get_current_ms():
        return round(time.time()*1000)
    
    # Run the script for exactly 1 minute
    def main(fname):
        current_ms = get_current_ms()
        while get_current_ms() - current_ms < 60*1000:
            download_all_data(fname)
            time.sleep(0.1)
    
    # Execute the script
    if __name__ == '__main__':
        if len(sys.argv) < 2:
            print('usage: python {} <log_file>'.format(sys.argv[0]))
            exit(0)
        main(sys.argv[1])
  3. Tekan Esc, ketik :wq, lalu tekan Enter untuk menyimpan dan keluar.

  4. Buat file run_py.sh untuk menjalankan download_realtime_data.py dan menyimpan data dari setiap eksekusi secara terpisah. File tersebut berisi konten berikut:

    python /opt/realtime/download_realtime_data.py /opt/realtime/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
  5. Buat file delete_log.sh untuk menghapus data lama. File tersebut berisi konten berikut:

    d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d')
    rm -f /opt/realtime/gh_realtime_data/*${d}*.json
  6. Jalankan perintah berikut untuk mengumpulkan data GitHub setiap menit dan menghapus data lama setiap hari.

    #1 Run the following command and press I to enter edit mode.
    crontab -e
    
    #2 Add the following lines, then press Esc and type :wq to exit.
    * * * * * bash /opt/realtime/run_py.sh
    1 1 * * * bash /opt/realtime/delete_log.sh

Mengumpulkan data ECS menggunakan SLS

Anda dapat menggunakan Simple Log Service (SLS) untuk mengumpulkan data event real-time dari instans ECS sebagai log.

SLS mendukung pengumpulan log dari instans ECS melalui Logtail. Karena datanya dalam format JSON, Anda dapat menggunakan mode JSON Logtail untuk dengan cepat mengingest log JSON inkremental dari instans ECS. Untuk informasi lebih lanjut tentang pengumpulan, lihat Collect logs using JSON mode. Dalam solusi ini, SLS mengurai pasangan kunci-nilai tingkat atas dari data mentah.

Catatan

Parameter path log dalam konfigurasi Logtail diatur ke /opt/realtime/gh_realtime_data/**/*.json.

Setelah dikonfigurasi, SLS terus-menerus mengumpulkan data event inkremental dari instans ECS. Gambar berikut menunjukkan contoh data yang dikumpulkan.image

Menuliskan data SLS ke Hologres secara real-time menggunakan Flink

Anda dapat menggunakan Flink untuk menuliskan data log yang dikumpulkan SLS ke Hologres secara real-time. Dengan mendefinisikan tabel sumber SLS dan tabel sink Hologres di Flink, Anda dapat mengalirkan data dari SLS ke Hologres. Untuk informasi lebih lanjut, lihat Import from SLS.

  1. Buat tabel internal Hologres.

    Tabel ini menyimpan bidang-bidang terpilih dari data JSON mentah. Tetapkan id event dan tanggal ds sebagai kunci primer, id sebagai kunci distribusi, ds sebagai kunci partisi, dan created_at sebagai kolom waktu event. Anda dapat membuat indeks tambahan berdasarkan pola kueri Anda untuk meningkatkan performa. Untuk informasi lebih lanjut tentang indeks, lihat CREATE TABLE. Kode berikut menyediakan contoh DDL:

    DROP TABLE IF EXISTS gh_realtime_data;
    
    BEGIN;
    
    CREATE TABLE gh_realtime_data (
        id bigint,
        actor_id bigint,
        actor_login text,
        repo_id bigint,
        repo_name text,
        org_id bigint,
        org_login text,
        type text,
        created_at timestamp with time zone NOT NULL,
        action text,
        iss_or_pr_id bigint,
        number bigint,
        comment_id bigint,
        commit_id text,
        member_id bigint,
        rev_or_push_or_rel_id bigint,
        ref text,
        ref_type text,
        state text,
        author_association text,
        language text,
        merged boolean,
        merged_at timestamp with time zone,
        additions bigint,
        deletions bigint,
        changed_files bigint,
        push_size bigint,
        push_distinct_size bigint,
        hr text,
        month text,
        year text,
        ds text,
        PRIMARY KEY (id,ds)
    )
    PARTITION BY LIST (ds);
    CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id');
    CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at');
    CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at');
    
    COMMENT ON COLUMN public.gh_realtime_data.id IS 'Event ID';
    COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'Actor ID';
    COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'Actor login name';
    COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'Repository ID';
    COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'Repository name';
    COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'Organization ID';
    COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'Organization name';
    COMMENT ON COLUMN public.gh_realtime_data.type IS 'Event type';
    COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'Event occurrence time';
    COMMENT ON COLUMN public.gh_realtime_data.action IS 'Event action';
    COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'Issue or pull request ID';
    COMMENT ON COLUMN public.gh_realtime_data.number IS 'Issue or pull request number';
    COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'Comment ID';
    COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'Commit ID';
    COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'Member ID';
    COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'Review, push, or release ID';
    COMMENT ON COLUMN public.gh_realtime_data.ref IS 'Name of created or deleted resource';
    COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'Type of created or deleted resource';
    COMMENT ON COLUMN public.gh_realtime_data.state IS 'State of issue, pull request, or pull request review';
    COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'Relationship between actor and repository';
    COMMENT ON COLUMN public.gh_realtime_data.language IS 'Programming language';
    COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Whether the merge was accepted';
    COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'Code merge time';
    COMMENT ON COLUMN public.gh_realtime_data.additions IS 'Number of lines added';
    COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'Number of lines deleted';
    COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'Number of files changed in pull request';
    COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'Number of commits';
    COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'Number of distinct commits';
    COMMENT ON COLUMN public.gh_realtime_data.hr IS 'Hour of event occurrence (e.g., 00:23 → hr=00)';
    COMMENT ON COLUMN public.gh_realtime_data.month IS 'Month of event occurrence (e.g., October 2015 → month=2015-10)';
    COMMENT ON COLUMN public.gh_realtime_data.year IS 'Year of event occurrence (e.g., 2015 → year=2015)';
    COMMENT ON COLUMN public.gh_realtime_data.ds IS 'Date of event occurrence (ds=yyyy-mm-dd)';
    
    COMMIT;
  2. Menuliskan data secara real-time menggunakan Flink.

    Anda dapat menggunakan Flink untuk lebih lanjut mengurai data SLS dan menuliskannya ke Hologres secara real-time. SQL Flink berikut menyaring data kotor, yaitu data yang ID event-nya atau created_at-nya null, serta hanya menyimpan event terbaru.

    CREATE TEMPORARY TABLE sls_input (
      actor varchar,
      created_at varchar,
      id bigint,
      org varchar,
      payload varchar,
      public varchar,
      repo varchar,
      type varchar
      )
    WITH (
        'connector' = 'sls',
        'endpoint' = '<endpoint>',--The private endpoint of SLS
        'accessid' = '<accesskey id>',--The AccessKey ID of your account
        'accesskey' = '<accesskey secret>',--The AccessKey secret of your account
        'project' = '<project name>',--The name of the SLS project
        'logstore' = '<logstore name>',--The name of the SLS LogStore
        'starttime' = '2023-04-06 00:00:00'--The start time for data ingestion from SLS
    );
    
    CREATE TEMPORARY TABLE hologres_sink (
        id bigint,
        actor_id bigint,
        actor_login string,
        repo_id bigint,
        repo_name string,
        org_id bigint,
        org_login string,
        type string,
        created_at timestamp,
        action string,
        iss_or_pr_id bigint,
        number bigint,
        comment_id bigint,
        commit_id string,
        member_id bigint,
        rev_or_push_or_rel_id bigint,
        `ref` string,
        ref_type string,
        state string,
        author_association string,
        `language` string,
        merged boolean,
        merged_at timestamp,
        additions bigint,
        deletions bigint,
        changed_files bigint,
        push_size bigint,
        push_distinct_size bigint,
        hr string,
        `month` string,
        `year` string,
        ds string
        )
    WITH (
        'connector' = 'hologres',
        'dbname' = '<hologres dbname>', --The name of the Hologres database
        'tablename' = '<hologres tablename>', --The name of the destination table in Hologres
        'username' = '<accesskey id>', --The AccessKey ID of your Alibaba Cloud account
        'password' = '<accesskey secret>', --The AccessKey secret of your Alibaba Cloud account
        'endpoint' = '<endpoint>', --The VPC endpoint of the Hologres instance
        'jdbcretrycount' = '1', --The number of retries upon a connection failure
        'partitionrouter' = 'true', --Specifies whether to write data to a partitioned table
        'createparttable' = 'true', --Specifies whether to automatically create partitions
        'mutatetype' = 'insertorignore' --The data write mode
    );
    
    INSERT INTO hologres_sink
    SELECT id
            ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id
            ,JSON_VALUE(actor, '$.login') AS actor_login
            ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id
            ,JSON_VALUE(repo, '$.name') AS repo_name
            ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id
            ,JSON_VALUE(org, '$.login') AS org_login
            ,type
            ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at
            ,JSON_VALUE(payload, '$.action') AS action
            ,CASE    WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint)
                     WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint)
             END AS iss_or_pr_id
            ,CASE    WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint)
                     WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint)
                     ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint)
             END AS number
            ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id
            ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id
            ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id
            ,CASE    WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint)
                     WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint)
                     WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint)
             END AS rev_or_push_or_rel_id
            ,JSON_VALUE(payload, '$.ref') AS `ref`
            ,JSON_VALUE(payload, '$.ref_type') AS ref_type
            ,CASE    WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state')
                     WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state')
                     WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state')
             END AS state
            ,CASE    WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association')
                     WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association')
                     WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association')
                     WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association')
             END AS author_association
            ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language`
            ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged
            ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at
            ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions
            ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions
            ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files
            ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size
            ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr
            ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month`
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year`
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds
    FROM
            sls_input
    WHERE
            id IS NOT NULL
          	AND created_at IS NOT NULL
            AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1); 

    Untuk informasi lebih lanjut tentang parameter-parameter tersebut, lihat SLS source table dan Hologres sink table.

    Catatan

    Data event mentah dari GitHub berada dalam UTC tetapi tidak menyertakan atribut zona waktu, sedangkan zona waktu default Hologres adalah UTC+8. Oleh karena itu, Anda harus menyesuaikan zona waktu data saat Flink menuliskan data ke Hologres secara real-time. Untuk melakukannya, tetapkan atribut zona waktu UTC pada data tabel sumber di Flink SQL, dan tambahkan pernyataan table.local-time-zone:Asia/Shanghai di bagian Flink Configuration pada halaman Job Startup Configuration untuk mengatur zona waktu sistem Flink ke Asia/Shanghai saat pekerjaan dimulai.

  3. Kueri data.

    Anda dapat mengkueri data SLS yang dituliskan ke Hologres oleh Flink. Anda kemudian dapat mengembangkan analitik lebih lanjut sesuai kebutuhan bisnis Anda.

    SELECT * FROM public.gh_realtime_data limit 10;

    Contoh output:

    image

Memperbaiki data real-time menggunakan data offline

Dalam skenario ini, data real-time mungkin memiliki celah. Anda dapat menggunakan data offline untuk memperbaiki data real-time. Langkah-langkah berikut menunjukkan cara memperbaiki data real-time hari sebelumnya. Anda dapat menyesuaikan siklus perbaikan sesuai kebutuhan bisnis Anda.

  1. Buat tabel eksternal di Hologres untuk mengakses data offline MaxCompute.

    IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to
    (
        <foreign_table_name>
    ) 
    FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');

    Untuk informasi lebih lanjut tentang parameter-parameter tersebut, lihat IMPORT FOREIGN SCHEMA.

  2. Anda dapat menggunakan data offline untuk memperbaiki data real-time hari sebelumnya dengan membuat tabel sementara.

    Catatan

    Hologres V2.1.17 dan versi lebih baru mendukung Serverless Computing. Untuk skenario seperti impor data offline berskala besar, pekerjaan extract, transform, and load (ETL) besar, dan kueri volume besar pada tabel eksternal, Anda dapat menggunakan Serverless Computing untuk menjalankan tugas-tugas tersebut. Fitur ini menggunakan sumber daya serverless tambahan alih-alih sumber daya instans Anda sendiri. Anda tidak perlu menyediakan sumber daya komputasi tambahan untuk instans Anda. Hal ini secara signifikan meningkatkan stabilitas instans, mengurangi kemungkinan error kehabisan memori (OOM), dan Anda hanya dikenai biaya untuk tugas individual tersebut. Untuk informasi lebih lanjut tentang Serverless Computing, lihat Serverless Computing. Untuk informasi tentang cara menggunakan Serverless Computing, lihat Guide to using Serverless Computing.

    -- Clean up any existing temporary table
    DROP TABLE IF EXISTS gh_realtime_data_tmp;
    
    -- Create a temporary table
    SET hg_experimental_enable_create_table_like_properties = ON;
    CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data');
    
    -- (Optional) Use Serverless Computing for large-scale offline imports and ETL
    SET hg_computing_resource = 'serverless';
    
    -- Insert data into the temporary table and update statistics
    INSERT INTO gh_realtime_data_tmp
    SELECT
        *
    FROM
        <foreign_table_name>
    WHERE
        ds = current_date - interval '1 day'
    ON CONFLICT (id, ds)
        DO NOTHING;
    ANALYZE gh_realtime_data_tmp;
    
    -- Reset configuration to avoid unnecessary use of serverless resources
    RESET hg_computing_resource;
    
    -- Atomically replace the partition with the corrected data
    BEGIN;
    DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>');
    COMMIT;

Analitik data

Dengan data yang telah dikumpulkan, Anda dapat melakukan berbagai analitik. Anda dapat merancang lapisan gudang data tambahan berdasarkan rentang waktu yang dibutuhkan untuk mendukung analitik real-time, offline, dan terpadu.

Contoh berikut menunjukkan cara menganalisis data real-time yang dikumpulkan pada langkah-langkah sebelumnya. Anda juga dapat menganalisis repositori atau developer tertentu.

  • Kueri jumlah total event publik hari ini.

    SELECT
        count(*)
    FROM
        gh_realtime_data
    WHERE
        created_at >= date_trunc('day', now());

    Contoh output:

    count
    ------
    1006
  • Temukan 5 repositori paling aktif berdasarkan jumlah event dalam sehari terakhir.

    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        gh_realtime_data
    WHERE
        created_at >= now() - interval '1 day'
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 5;

    Contoh output:

    repo_name	                               events
    ----------------------------------------+------
    leo424y/heysiri.ml	                      29
    arm-on/plan	                              10
    Christoffel-T/fiverr-pat-20230331	        9
    mate-academy/react_dynamic-list-of-goods	9
    openvinotoolkit/openvino	                7
  • Temukan 5 developer paling aktif berdasarkan jumlah event dalam sehari terakhir.

    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        gh_realtime_data
    WHERE
        created_at >= now() - interval '1 day'
        AND actor_login NOT LIKE '%[bot]'
    GROUP BY
        actor_login
    ORDER BY
        events DESC
    LIMIT 5;

    Contoh output:

    actor_login	       events
    ------------------+------
    direwolf-github	    13
    arm-on	            10
    sergii-nosachenko	  9
    Christoffel-T	      9
    yangwang201911	    7
  • Peringkat bahasa pemrograman teratas yang digunakan dalam satu jam terakhir.

    SELECT
        language,
        count(*) total
    FROM
        gh_realtime_data
    WHERE
        created_at > now() - interval '1 hour'
        AND language IS NOT NULL
    GROUP BY
        language
    ORDER BY
        total DESC
    LIMIT 10;

    Contoh output:

    language	  total
    -----------+----
    JavaScript	25
    C++	        15
    Python	    14
    TypeScript	13
    Java	      8
    PHP	        8
  • Peringkat repositori berdasarkan jumlah star yang ditambahkan dalam sehari terakhir.

    Catatan

    Contoh ini tidak memperhitungkan pengguna yang menghapus star.

    SELECT
        repo_id,
        repo_name,
        COUNT(actor_login) total
    FROM
        gh_realtime_data
    WHERE
        type = 'WatchEvent'
        AND created_at > now() - interval '1 day'
    GROUP BY
        repo_id,
        repo_name
    ORDER BY
        total DESC
    LIMIT 10;

    Contoh output:

    repo_id	   repo_name	                       total
    ---------+----------------------------------+-----
    618058471	facebookresearch/segment-anything	 4
    619959033	nomic-ai/gpt4all	                 1
    97249406	denysdovhan/wtfjs	                 1
    9791525	  digininja/DVWA	                   1
    168118422	aylei/interview	                   1
    343520006	joehillen/sysz	                   1
    162279822	agalwood/Motrix	                   1
    577723410	huggingface/swift-coreml-diffusers 1
    609539715	e2b-dev/e2b	                       1
    254839429	maniackk/KKCallStack	             1
    
  • Kueri jumlah pengguna aktif harian dan repositori hari ini.

    SELECT
        uniq (actor_id) actor_num,
        uniq (repo_id) repo_num
    FROM
        gh_realtime_data
    WHERE
        created_at > date_trunc('day', now());

    Contoh output:

    actor_num	repo_num
    ---------+--------
    743	      816