Topik ini menjelaskan cara membangun solusi analitik offline dan real-time terpadu yang menggunakan MaxCompute untuk membangun gudang data offline serta Flink dan Hologres untuk membangun gudang data real-time. Anda dapat melakukan analitik data real-time di Hologres dan analitik data offline di MaxCompute.
Informasi latar belakang
Seiring percepatan transformasi digital, bisnis semakin membutuhkan data yang tepat waktu. Selain skenario offline tradisional yang dirancang untuk pemrosesan data berskala besar, banyak kasus penggunaan kini memerlukan ingest, penyimpanan, dan analisis data secara real-time. Untuk mengatasi kebutuhan tersebut, muncul konsep analitik offline dan real-time terpadu.
Analitik offline dan real-time terpadu mengacu pada pengelolaan dan pemrosesan data real-time maupun offline dalam satu platform tunggal. Pendekatan ini mengintegrasikan pemrosesan data real-time dengan analitik offline secara mulus guna meningkatkan efisiensi dan akurasi. Manfaat utamanya meliputi hal-hal berikut:
-
Efisiensi pemrosesan data yang lebih baik: Mengintegrasikan data real-time dan offline dalam satu platform mengurangi biaya transfer dan transformasi data.
-
Akurasi analitik yang lebih tinggi: Menggabungkan data real-time dan historis memungkinkan wawasan yang lebih tepat dan akurat.
-
Kompleksitas sistem yang berkurang: Menyederhanakan alur kerja manajemen dan pemrosesan data.
-
Nilai data yang lebih besar: Memaksimalkan nilai bisnis dari data untuk mendukung pengambilan keputusan yang lebih baik.
Alibaba Cloud menyediakan solusi terstruktur untuk analitik offline dan real-time terpadu. Arsitektur ini menggunakan MaxCompute untuk beban kerja offline, Hologres untuk analitik real-time, dan Flink untuk transformasi data real-time. Layanan-layanan ini merupakan komponen inti dari solusi gudang data terpadu Alibaba Cloud.
Arsitektur solusi
Diagram berikut menunjukkan alur kerja end-to-end untuk analitik offline dan real-time terpadu menggunakan dataset event publik GitHub dengan MaxCompute dan Hologres.

Sebuah instans ECS mengumpulkan dan mengagregasi data event GitHub, baik real-time maupun offline. Data tersebut kemudian dialirkan ke pipeline real-time dan offline terpisah, yang akhirnya menyatu di Hologres untuk menyediakan lapisan layanan terpadu.
-
Pipeline real-time: Flink memproses data dari Simple Log Service (SLS) secara real-time dan menuliskannya ke Hologres. Flink adalah mesin pemrosesan stream yang andal. Hologres mendukung kueri langsung terhadap data saat diingest dan memiliki integrasi native dengan Flink, sehingga memungkinkan analitik real-time ber-throughput tinggi, latensi rendah, dan berkualitas tinggi. Pipeline ini memenuhi kebutuhan bisnis real-time seperti mengekstrak event terbaru atau menganalisis aktivitas yang sedang tren.
-
Pipeline offline: MaxCompute memproses dan mengarsipkan volume besar data historis. Alibaba Cloud Object Storage Service (OSS) menyediakan penyimpanan cloud yang aman, andal, dan hemat biaya untuk berbagai jenis data. Dalam solusi ini, data mentah disimpan dalam format JSON di OSS. MaxCompute adalah gudang data cloud enterprise-grade yang menggunakan model software-as-a-service (SaaS) dan dioptimalkan untuk analitik. MaxCompute dapat langsung membaca dan mengurai data semi-terstruktur dari OSS menggunakan tabel eksternal, mengintegrasikan data bernilai tinggi ke dalam penyimpanan internalnya, serta menggunakan DataWorks untuk pengembangan data guna membangun gudang data offline.
-
Hologres dan MaxCompute terintegrasi secara native pada lapisan penyimpanan. Hal ini memungkinkan Hologres mempercepat kueri terhadap dataset historis MaxCompute yang sangat besar untuk memenuhi kebutuhan kueri berfrekuensi rendah namun berkinerja tinggi. Integrasi ini juga memungkinkan koreksi data real-time secara mudah menggunakan data offline guna mengatasi potensi celah atau kelalaian dalam pipeline real-time.
Keunggulan utama solusi ini meliputi hal-hal berikut:
-
Pipeline offline yang stabil dan efisien: Mendukung pembaruan data per jam, pemrosesan batch dataset berskala besar, komputasi kompleks, pengurangan biaya komputasi, dan peningkatan efisiensi pemrosesan.
-
Pipeline real-time yang matang: Mendukung ingest real-time, komputasi event, dan analisis dengan arsitektur yang disederhanakan serta waktu respons di bawah satu detik.
-
Penyimpanan dan layanan terpadu: Hologres melayani seluruh data melalui antarmuka yang konsisten (kueri OLAP dan key-value disatukan dalam SQL), dengan penyimpanan terpusat.
-
Integrasi real-time dan offline yang mulus: Meminimalkan redundansi dan perpindahan data sekaligus memungkinkan koreksi data.
Dengan dukungan pengembangan end-to-end, solusi ini memberikan responsivitas data di bawah satu detik, visibilitas penuh pada seluruh pipeline, jumlah komponen arsitektur yang minimal, ketergantungan yang lebih sedikit, serta pengurangan signifikan pada biaya operasional dan tenaga kerja.
Pemahaman bisnis dan data
Developer membuat banyak proyek open source di GitHub dan menghasilkan banyak event selama proses pengembangan. GitHub mencatat tipe dan detail setiap event, developer, repositori, serta informasi lainnya. GitHub juga mengekspos event publik, seperti pemberian star dan commit kode. Untuk informasi lebih lanjut tentang tipe event tertentu, lihat Webhook events and payloads.
-
GitHub menerbitkan event publik real-time melalui OpenAPI. API ini hanya mengekspos event dari lima menit terakhir. Untuk informasi lebih lanjut, lihat Events. Anda dapat menggunakan API ini untuk mendapatkan data real-time.
-
Proyek GH Archive mengagregasi event publik GitHub per jam dan menyediakannya untuk diunduh. Untuk informasi lebih lanjut, lihat GH Archive. Anda dapat menggunakan proyek ini untuk mendapatkan data offline.
Ikhtisar bisnis GitHub
Bisnis inti GitHub berfokus pada manajemen kode dan kolaborasi developer, terutama melibatkan tiga entitas tingkat atas: Developer, Repository, dan Organization.
Dalam solusi ini, Event diperlakukan sebagai entitas tersendiri untuk penyimpanan dan analisis.

Pemahaman data event publik mentah
Contoh berikut menunjukkan sampel event mentah dalam format JSON:
{
"id": "19541192931",
"type": "WatchEvent",
"actor":
{
"id": 23286640,
"login": "herekeo",
"display_login": "herekeo",
"gravatar_id": "",
"url": "https://api.github.com/users/herekeo",
"avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
},
"repo":
{
"id": 52760178,
"name": "crazyguitar/pysheeet",
"url": "https://api.github.com/repos/crazyguitar/pysheeet"
},
"payload":
{
"action": "started"
},
"public": true,
"created_at": "2022-01-01T00:03:04Z"
}
Solusi ini mencakup 15 tipe event publik, tidak termasuk tipe yang sudah tidak digunakan atau tidak terekam. Untuk daftar lengkap dan deskripsinya, lihat GitHub public event types.
Prasyarat
-
Buat instans Elastic Compute Service (ECS) dan kaitkan dengan Elastic IP Address (EIP) untuk mengekstrak data event real-time dari GitHub API. Untuk informasi lebih lanjut, lihat Create an ECS instance dan Elastic IP Address.
-
Aktifkan Object Storage Service (OSS) dan instal tool ossutil pada instans ECS Anda untuk menyimpan file data JSON dari GH Archive. Untuk informasi lebih lanjut, lihat Activate OSS dan Install ossutil.
-
Aktifkan MaxCompute dan buat proyek. Untuk informasi lebih lanjut, lihat Create a MaxCompute project.
-
Aktifkan DataWorks dan buat ruang kerja untuk membangun tugas penjadwalan offline. Untuk informasi lebih lanjut, lihat Create a workspace.
-
Aktifkan Simple Log Service (SLS) dan buat proyek serta Logstore untuk mengumpulkan data yang diekstrak oleh ECS sebagai log. Untuk informasi lebih lanjut, lihat Use LoongCollector to collect and analyze ECS text logs.
-
Aktifkan Realtime Compute for Apache Flink untuk menuliskan data log yang dikumpulkan SLS ke Hologres secara real-time. Untuk informasi lebih lanjut, lihat Activate Realtime Compute for Apache Flink.
-
Aktifkan Hologres. Untuk informasi lebih lanjut, lihat Purchase a Hologres instance.
Membangun gudang data offline (pembaruan per jam)
Mengunduh file data mentah menggunakan ECS dan mengunggahnya ke OSS
Anda dapat menggunakan instans ECS untuk mengunduh file data JSON yang disediakan oleh GH Archive.
-
Untuk data historis, Anda dapat menggunakan perintah
wget. Misalnya, jalankanwget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gzuntuk mengunduh data per jam dari tahun 2012 hingga 2022. -
Untuk data per jam terbaru, Anda dapat menyiapkan tugas terjadwal sebagai berikut.
Catatan-
Pastikan ossutil telah diinstal pada instans ECS Anda. Untuk informasi lebih lanjut, lihat Install ossutil. Kami menyarankan Anda mengunduh paket ossutil langsung ke instans ECS Anda, menginstal unzip dengan
yum install unzip, mengekstrak ossutil, dan memindahkannya ke direktori/usr/bin/. -
Buat bucket OSS di wilayah yang sama dengan instans ECS Anda. Anda dapat menggunakan nama bucket kustom. Dalam contoh ini, nama bucket-nya adalah
githubevents. -
Direktori unduh contoh ECS adalah
/opt/hourlydata/gh_data. Anda dapat menggunakan direktori berbeda.
-
Jalankan perintah berikut untuk membuat file bernama
download_code.shdi direktori/opt/hourlydata.cd /opt/hourlydata vim download_code.sh -
Tekan
iuntuk masuk ke mode edit dan tambahkan skrip berikut.d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H') h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H') url=https://data.gharchive.org/${d}.json.gz echo ${url} # Download data to ./gh_data/. You can customize this path. wget ${url} -P ./gh_data/ # Switch to the gh_data directory. cd gh_data # Decompress the downloaded data into a JSON file. gzip -d ${d}.json echo ${d}.json # Switch to the root directory. cd /root # Use ossutil to upload data to OSS. # Create a directory named hr=${h} in the githubevents OSS bucket. ossutil mkdir oss://githubevents/hr=${h} # Upload data from /opt/hourlydata/gh_data (you can customize this path) to OSS. ossutil cp -r /opt/hourlydata/gh_data oss://githubevents/hr=${h} -u echo oss uploaded successfully! rm -rf /opt/hourlydata/gh_data/${d}.json echo ecs deleted! -
Tekan Esc, ketik
:wq, lalu tekan Enter untuk menyimpan dan keluar. -
Jalankan perintah berikut untuk menjadwalkan skrip
download_code.shagar berjalan pada menit ke-10 setiap jam.#1 Run the following command and press I to enter edit mode. crontab -e #2 Add the following line, then press Esc and type :wq to exit. 10 * * * * cd /opt/hourlydata && sh download_code.sh > download.logSetelah diatur, skrip akan mengunduh file JSON jam sebelumnya pada menit ke-10 setiap jam, mengekstraknya di instans ECS, dan mengunggahnya ke OSS (path:
oss://githubevents). Untuk memastikan hanya data jam terbaru yang diproses nanti, setiap unggahan membuat direktori partisi bernamahr=%Y-%m-%d-%H. Pembacaan selanjutnya hanya akan menargetkan partisi terbaru.
-
Mengimpor data OSS ke MaxCompute menggunakan tabel eksternal
Anda dapat menjalankan perintah berikut di klien MaxCompute atau node ODPS SQL di DataWorks. Untuk informasi lebih lanjut, lihat Connect using the local client (odpscmd) atau Develop an ODPS SQL task.
-
Buat tabel eksternal bernama
githubeventsuntuk memetakan file JSON yang disimpan di OSS:CREATE EXTERNAL TABLE IF NOT EXISTS githubevents ( col STRING ) PARTITIONED BY ( hr STRING ) STORED AS textfile LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/' ;Untuk informasi lebih lanjut tentang cara membuat tabel eksternal OSS di MaxCompute, lihat ORC external tables.
-
Buat tabel fakta bernama
dwd_github_events_odpsuntuk menyimpan data yang telah diurai. Kode berikut menyediakan pernyataan DDL:CREATE TABLE IF NOT EXISTS dwd_github_events_odps ( id BIGINT COMMENT 'Event ID' ,actor_id BIGINT COMMENT 'Actor ID' ,actor_login STRING COMMENT 'Actor login name' ,repo_id BIGINT COMMENT 'Repository ID' ,repo_name STRING COMMENT 'Full repository name: owner/repository_name' ,org_id BIGINT COMMENT 'Organization ID' ,org_login STRING COMMENT 'Organization name' ,`type` STRING COMMENT 'Event type' ,created_at DATETIME COMMENT 'Event occurrence time' ,action STRING COMMENT 'Event action' ,iss_or_pr_id BIGINT COMMENT 'Issue or pull request ID' ,number BIGINT COMMENT 'Issue or pull request number' ,comment_id BIGINT COMMENT 'Comment ID' ,commit_id STRING COMMENT 'Commit ID' ,member_id BIGINT COMMENT 'Member ID' ,rev_or_push_or_rel_id BIGINT COMMENT 'Review, push, or release ID' ,ref STRING COMMENT 'Name of created or deleted resource' ,ref_type STRING COMMENT 'Type of created or deleted resource' ,state STRING COMMENT 'State of issue, pull request, or pull request review' ,author_association STRING COMMENT 'Relationship between actor and repository' ,language STRING COMMENT 'Programming language of merged code' ,merged BOOLEAN COMMENT 'Whether the merge was accepted' ,merged_at DATETIME COMMENT 'Code merge time' ,additions BIGINT COMMENT 'Number of lines added' ,deletions BIGINT COMMENT 'Number of lines deleted' ,changed_files BIGINT COMMENT 'Number of files changed in pull request' ,push_size BIGINT COMMENT 'Number of commits' ,push_distinct_size BIGINT COMMENT 'Number of distinct commits' ,hr STRING COMMENT 'Hour of event occurrence (e.g., 00:23 → hr=00)' ,`month` STRING COMMENT 'Month of event occurrence (e.g., October 2015 → month=2015-10)' ,`year` STRING COMMENT 'Year of event occurrence (e.g., 2015 → year=2015)' ) PARTITIONED BY ( ds STRING COMMENT 'Date of event occurrence (ds=yyyy-mm-dd)' ); -
Uraikan data JSON dan tulis ke tabel fakta.
Jalankan perintah berikut untuk menambahkan partisi dan mengurai data JSON ke tabel
dwd_github_events_odps:msck repair table githubevents add partitions; set odps.sql.hive.compatible = true; set odps.sql.split.hive.bridge = true; INSERT into TABLE dwd_github_events_odps PARTITION(ds) SELECT CAST(GET_JSON_OBJECT(col,'$.id') AS BIGINT ) AS id ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id ,GET_JSON_OBJECT(col,'$.org.login') AS org_login ,GET_JSON_OBJECT(col,'$.type') as type ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at ,GET_JSON_OBJECT(col,'$.payload.action') AS action ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT) END AS iss_or_pr_id ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT) ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT) END AS number ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT) END AS rev_or_push_or_rel_id ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state') WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state') WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state') END AS state ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association') END AS author_association ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT) AS deletions ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT) AS push_size ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT) AS push_distinct_size ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'-','-') as month ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'-','-') as ds from githubevents where hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string); -
Kueri data.
Jalankan perintah berikut untuk mengkueri data dari tabel
dwd_github_events_odps:SET odps.sql.allow.fullscan=true; SELECT * FROM dwd_github_events_odps where ds = '2023-03-31' limit 10;Contoh output:

Membangun gudang data real-time
Mengumpulkan data real-time menggunakan ECS
Anda dapat menggunakan instans ECS untuk mengekstrak data event real-time dari GitHub API. Skrip berikut menunjukkan salah satu metode pengumpulan data real-time melalui GitHub API.
-
Setiap eksekusi skrip berlangsung satu menit dan mengumpulkan event real-time yang tersedia selama interval tersebut, menyimpan setiap event sebagai objek JSON.
-
Skrip ini tidak menjamin penangkapan semua event real-time.
-
Untuk terus-menerus mengumpulkan data dari GitHub API, Anda harus menyediakan header Accept dan token Authorization. Header Accept memiliki nilai tetap. Token Authorization memerlukan personal access token dari GitHub. Untuk informasi lebih lanjut tentang cara membuat token, lihat Creating a personal access token.
-
Jalankan perintah berikut untuk membuat file bernama
download_realtime_data.pydi direktori/opt/realtime.cd /opt/realtime vim download_realtime_data.py -
Tekan
iuntuk masuk ke mode edit dan tambahkan konten berikut.#!python import requests import json import sys import time # Get the next page URL from the response headers def get_next_link(resp): resp_link = resp.headers['link'] link = '' for l in resp_link.split(', '): link = l.split('; ')[0][1:-1] rel = l.split('; ')[1] if rel == 'rel="next"': return link return None # Download one page of data from the API def download(link, fname): # Define GitHub API Accept and Authorization headers headers = {"Accept": "application/vnd.github+json","Authorization": "<Bearer> <github_api_token>"} resp = requests.get(link, headers=headers) if int(resp.status_code) != 200: return None with open(fname, 'a') as f: for j in resp.json(): f.write(json.dumps(j)) f.write('\n') print('downloaded {} events to {}'.format(len(resp.json()), fname)) return resp # Download multiple pages of data from the API def download_all_data(fname): link = 'https://api.github.com/events?per_page=100&page=1' while True: resp = download(link, fname) if resp is None: break link = get_next_link(resp) if link is None: break # Get current timestamp in milliseconds def get_current_ms(): return round(time.time()*1000) # Run the script for exactly 1 minute def main(fname): current_ms = get_current_ms() while get_current_ms() - current_ms < 60*1000: download_all_data(fname) time.sleep(0.1) # Execute the script if __name__ == '__main__': if len(sys.argv) < 2: print('usage: python {} <log_file>'.format(sys.argv[0])) exit(0) main(sys.argv[1]) -
Tekan Esc, ketik
:wq, lalu tekan Enter untuk menyimpan dan keluar. -
Buat file
run_py.shuntuk menjalankandownload_realtime_data.pydan menyimpan data dari setiap eksekusi secara terpisah. File tersebut berisi konten berikut:python /opt/realtime/download_realtime_data.py /opt/realtime/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json -
Buat file
delete_log.shuntuk menghapus data lama. File tersebut berisi konten berikut:d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d') rm -f /opt/realtime/gh_realtime_data/*${d}*.json -
Jalankan perintah berikut untuk mengumpulkan data GitHub setiap menit dan menghapus data lama setiap hari.
#1 Run the following command and press I to enter edit mode. crontab -e #2 Add the following lines, then press Esc and type :wq to exit. * * * * * bash /opt/realtime/run_py.sh 1 1 * * * bash /opt/realtime/delete_log.sh
Mengumpulkan data ECS menggunakan SLS
Anda dapat menggunakan Simple Log Service (SLS) untuk mengumpulkan data event real-time dari instans ECS sebagai log.
SLS mendukung pengumpulan log dari instans ECS melalui Logtail. Karena datanya dalam format JSON, Anda dapat menggunakan mode JSON Logtail untuk dengan cepat mengingest log JSON inkremental dari instans ECS. Untuk informasi lebih lanjut tentang pengumpulan, lihat Collect logs using JSON mode. Dalam solusi ini, SLS mengurai pasangan kunci-nilai tingkat atas dari data mentah.
Parameter path log dalam konfigurasi Logtail diatur ke /opt/realtime/gh_realtime_data/**/*.json.
Setelah dikonfigurasi, SLS terus-menerus mengumpulkan data event inkremental dari instans ECS. Gambar berikut menunjukkan contoh data yang dikumpulkan.
Menuliskan data SLS ke Hologres secara real-time menggunakan Flink
Anda dapat menggunakan Flink untuk menuliskan data log yang dikumpulkan SLS ke Hologres secara real-time. Dengan mendefinisikan tabel sumber SLS dan tabel sink Hologres di Flink, Anda dapat mengalirkan data dari SLS ke Hologres. Untuk informasi lebih lanjut, lihat Import from SLS.
-
Buat tabel internal Hologres.
Tabel ini menyimpan bidang-bidang terpilih dari data JSON mentah. Tetapkan
idevent dan tanggaldssebagai kunci primer,idsebagai kunci distribusi,dssebagai kunci partisi, dancreated_atsebagai kolom waktu event. Anda dapat membuat indeks tambahan berdasarkan pola kueri Anda untuk meningkatkan performa. Untuk informasi lebih lanjut tentang indeks, lihat CREATE TABLE. Kode berikut menyediakan contoh DDL:DROP TABLE IF EXISTS gh_realtime_data; BEGIN; CREATE TABLE gh_realtime_data ( id bigint, actor_id bigint, actor_login text, repo_id bigint, repo_name text, org_id bigint, org_login text, type text, created_at timestamp with time zone NOT NULL, action text, iss_or_pr_id bigint, number bigint, comment_id bigint, commit_id text, member_id bigint, rev_or_push_or_rel_id bigint, ref text, ref_type text, state text, author_association text, language text, merged boolean, merged_at timestamp with time zone, additions bigint, deletions bigint, changed_files bigint, push_size bigint, push_distinct_size bigint, hr text, month text, year text, ds text, PRIMARY KEY (id,ds) ) PARTITION BY LIST (ds); CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id'); CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at'); CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at'); COMMENT ON COLUMN public.gh_realtime_data.id IS 'Event ID'; COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'Actor ID'; COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'Actor login name'; COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'Repository ID'; COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'Repository name'; COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'Organization ID'; COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'Organization name'; COMMENT ON COLUMN public.gh_realtime_data.type IS 'Event type'; COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'Event occurrence time'; COMMENT ON COLUMN public.gh_realtime_data.action IS 'Event action'; COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'Issue or pull request ID'; COMMENT ON COLUMN public.gh_realtime_data.number IS 'Issue or pull request number'; COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'Comment ID'; COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'Commit ID'; COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'Member ID'; COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'Review, push, or release ID'; COMMENT ON COLUMN public.gh_realtime_data.ref IS 'Name of created or deleted resource'; COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'Type of created or deleted resource'; COMMENT ON COLUMN public.gh_realtime_data.state IS 'State of issue, pull request, or pull request review'; COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'Relationship between actor and repository'; COMMENT ON COLUMN public.gh_realtime_data.language IS 'Programming language'; COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Whether the merge was accepted'; COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'Code merge time'; COMMENT ON COLUMN public.gh_realtime_data.additions IS 'Number of lines added'; COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'Number of lines deleted'; COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'Number of files changed in pull request'; COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'Number of commits'; COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'Number of distinct commits'; COMMENT ON COLUMN public.gh_realtime_data.hr IS 'Hour of event occurrence (e.g., 00:23 → hr=00)'; COMMENT ON COLUMN public.gh_realtime_data.month IS 'Month of event occurrence (e.g., October 2015 → month=2015-10)'; COMMENT ON COLUMN public.gh_realtime_data.year IS 'Year of event occurrence (e.g., 2015 → year=2015)'; COMMENT ON COLUMN public.gh_realtime_data.ds IS 'Date of event occurrence (ds=yyyy-mm-dd)'; COMMIT; -
Menuliskan data secara real-time menggunakan Flink.
Anda dapat menggunakan Flink untuk lebih lanjut mengurai data SLS dan menuliskannya ke Hologres secara real-time. SQL Flink berikut menyaring data kotor, yaitu data yang ID event-nya atau
created_at-nya null, serta hanya menyimpan event terbaru.CREATE TEMPORARY TABLE sls_input ( actor varchar, created_at varchar, id bigint, org varchar, payload varchar, public varchar, repo varchar, type varchar ) WITH ( 'connector' = 'sls', 'endpoint' = '<endpoint>',--The private endpoint of SLS 'accessid' = '<accesskey id>',--The AccessKey ID of your account 'accesskey' = '<accesskey secret>',--The AccessKey secret of your account 'project' = '<project name>',--The name of the SLS project 'logstore' = '<logstore name>',--The name of the SLS LogStore 'starttime' = '2023-04-06 00:00:00'--The start time for data ingestion from SLS ); CREATE TEMPORARY TABLE hologres_sink ( id bigint, actor_id bigint, actor_login string, repo_id bigint, repo_name string, org_id bigint, org_login string, type string, created_at timestamp, action string, iss_or_pr_id bigint, number bigint, comment_id bigint, commit_id string, member_id bigint, rev_or_push_or_rel_id bigint, `ref` string, ref_type string, state string, author_association string, `language` string, merged boolean, merged_at timestamp, additions bigint, deletions bigint, changed_files bigint, push_size bigint, push_distinct_size bigint, hr string, `month` string, `year` string, ds string ) WITH ( 'connector' = 'hologres', 'dbname' = '<hologres dbname>', --The name of the Hologres database 'tablename' = '<hologres tablename>', --The name of the destination table in Hologres 'username' = '<accesskey id>', --The AccessKey ID of your Alibaba Cloud account 'password' = '<accesskey secret>', --The AccessKey secret of your Alibaba Cloud account 'endpoint' = '<endpoint>', --The VPC endpoint of the Hologres instance 'jdbcretrycount' = '1', --The number of retries upon a connection failure 'partitionrouter' = 'true', --Specifies whether to write data to a partitioned table 'createparttable' = 'true', --Specifies whether to automatically create partitions 'mutatetype' = 'insertorignore' --The data write mode ); INSERT INTO hologres_sink SELECT id ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id ,JSON_VALUE(actor, '$.login') AS actor_login ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id ,JSON_VALUE(repo, '$.name') AS repo_name ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id ,JSON_VALUE(org, '$.login') AS org_login ,type ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at ,JSON_VALUE(payload, '$.action') AS action ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint) WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint) END AS iss_or_pr_id ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint) WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint) ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint) END AS number ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id ,CASE WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint) WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint) WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint) END AS rev_or_push_or_rel_id ,JSON_VALUE(payload, '$.ref') AS `ref` ,JSON_VALUE(payload, '$.ref_type') AS ref_type ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state') WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state') WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state') END AS state ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association') WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association') WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association') WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association') END AS author_association ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language` ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month` ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year` ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds FROM sls_input WHERE id IS NOT NULL AND created_at IS NOT NULL AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1);Untuk informasi lebih lanjut tentang parameter-parameter tersebut, lihat SLS source table dan Hologres sink table.
CatatanData event mentah dari GitHub berada dalam UTC tetapi tidak menyertakan atribut zona waktu, sedangkan zona waktu default Hologres adalah UTC+8. Oleh karena itu, Anda harus menyesuaikan zona waktu data saat Flink menuliskan data ke Hologres secara real-time. Untuk melakukannya, tetapkan atribut zona waktu UTC pada data tabel sumber di Flink SQL, dan tambahkan pernyataan
table.local-time-zone:Asia/Shanghaidi bagian Flink Configuration pada halaman Job Startup Configuration untuk mengatur zona waktu sistem Flink keAsia/Shanghaisaat pekerjaan dimulai. -
Kueri data.
Anda dapat mengkueri data SLS yang dituliskan ke Hologres oleh Flink. Anda kemudian dapat mengembangkan analitik lebih lanjut sesuai kebutuhan bisnis Anda.
SELECT * FROM public.gh_realtime_data limit 10;Contoh output:

Memperbaiki data real-time menggunakan data offline
Dalam skenario ini, data real-time mungkin memiliki celah. Anda dapat menggunakan data offline untuk memperbaiki data real-time. Langkah-langkah berikut menunjukkan cara memperbaiki data real-time hari sebelumnya. Anda dapat menyesuaikan siklus perbaikan sesuai kebutuhan bisnis Anda.
-
Buat tabel eksternal di Hologres untuk mengakses data offline MaxCompute.
IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to ( <foreign_table_name> ) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');Untuk informasi lebih lanjut tentang parameter-parameter tersebut, lihat IMPORT FOREIGN SCHEMA.
-
Anda dapat menggunakan data offline untuk memperbaiki data real-time hari sebelumnya dengan membuat tabel sementara.
CatatanHologres V2.1.17 dan versi lebih baru mendukung Serverless Computing. Untuk skenario seperti impor data offline berskala besar, pekerjaan extract, transform, and load (ETL) besar, dan kueri volume besar pada tabel eksternal, Anda dapat menggunakan Serverless Computing untuk menjalankan tugas-tugas tersebut. Fitur ini menggunakan sumber daya serverless tambahan alih-alih sumber daya instans Anda sendiri. Anda tidak perlu menyediakan sumber daya komputasi tambahan untuk instans Anda. Hal ini secara signifikan meningkatkan stabilitas instans, mengurangi kemungkinan error kehabisan memori (OOM), dan Anda hanya dikenai biaya untuk tugas individual tersebut. Untuk informasi lebih lanjut tentang Serverless Computing, lihat Serverless Computing. Untuk informasi tentang cara menggunakan Serverless Computing, lihat Guide to using Serverless Computing.
-- Clean up any existing temporary table DROP TABLE IF EXISTS gh_realtime_data_tmp; -- Create a temporary table SET hg_experimental_enable_create_table_like_properties = ON; CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data'); -- (Optional) Use Serverless Computing for large-scale offline imports and ETL SET hg_computing_resource = 'serverless'; -- Insert data into the temporary table and update statistics INSERT INTO gh_realtime_data_tmp SELECT * FROM <foreign_table_name> WHERE ds = current_date - interval '1 day' ON CONFLICT (id, ds) DO NOTHING; ANALYZE gh_realtime_data_tmp; -- Reset configuration to avoid unnecessary use of serverless resources RESET hg_computing_resource; -- Atomically replace the partition with the corrected data BEGIN; DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>"; ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>"; ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>'); COMMIT;
Analitik data
Dengan data yang telah dikumpulkan, Anda dapat melakukan berbagai analitik. Anda dapat merancang lapisan gudang data tambahan berdasarkan rentang waktu yang dibutuhkan untuk mendukung analitik real-time, offline, dan terpadu.
Contoh berikut menunjukkan cara menganalisis data real-time yang dikumpulkan pada langkah-langkah sebelumnya. Anda juga dapat menganalisis repositori atau developer tertentu.
-
Kueri jumlah total event publik hari ini.
SELECT count(*) FROM gh_realtime_data WHERE created_at >= date_trunc('day', now());Contoh output:
count ------ 1006 -
Temukan 5 repositori paling aktif berdasarkan jumlah event dalam sehari terakhir.
SELECT repo_name, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' GROUP BY repo_name ORDER BY events DESC LIMIT 5;Contoh output:
repo_name events ----------------------------------------+------ leo424y/heysiri.ml 29 arm-on/plan 10 Christoffel-T/fiverr-pat-20230331 9 mate-academy/react_dynamic-list-of-goods 9 openvinotoolkit/openvino 7 -
Temukan 5 developer paling aktif berdasarkan jumlah event dalam sehari terakhir.
SELECT actor_login, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' AND actor_login NOT LIKE '%[bot]' GROUP BY actor_login ORDER BY events DESC LIMIT 5;Contoh output:
actor_login events ------------------+------ direwolf-github 13 arm-on 10 sergii-nosachenko 9 Christoffel-T 9 yangwang201911 7 -
Peringkat bahasa pemrograman teratas yang digunakan dalam satu jam terakhir.
SELECT language, count(*) total FROM gh_realtime_data WHERE created_at > now() - interval '1 hour' AND language IS NOT NULL GROUP BY language ORDER BY total DESC LIMIT 10;Contoh output:
language total -----------+---- JavaScript 25 C++ 15 Python 14 TypeScript 13 Java 8 PHP 8 -
Peringkat repositori berdasarkan jumlah star yang ditambahkan dalam sehari terakhir.
CatatanContoh ini tidak memperhitungkan pengguna yang menghapus star.
SELECT repo_id, repo_name, COUNT(actor_login) total FROM gh_realtime_data WHERE type = 'WatchEvent' AND created_at > now() - interval '1 day' GROUP BY repo_id, repo_name ORDER BY total DESC LIMIT 10;Contoh output:
repo_id repo_name total ---------+----------------------------------+----- 618058471 facebookresearch/segment-anything 4 619959033 nomic-ai/gpt4all 1 97249406 denysdovhan/wtfjs 1 9791525 digininja/DVWA 1 168118422 aylei/interview 1 343520006 joehillen/sysz 1 162279822 agalwood/Motrix 1 577723410 huggingface/swift-coreml-diffusers 1 609539715 e2b-dev/e2b 1 254839429 maniackk/KKCallStack 1 -
Kueri jumlah pengguna aktif harian dan repositori hari ini.
SELECT uniq (actor_id) actor_num, uniq (repo_id) repo_num FROM gh_realtime_data WHERE created_at > date_trunc('day', now());Contoh output:
actor_num repo_num ---------+-------- 743 816