Overview
In the era of digital operations, game companies need fine-grained analysis of player behavior to improve retention, monetization, and engagement. Traditional data warehouse architectures suffer from high cost, difficult scaling, and fragmentation across multiple engines.

This solution is built on the Alibaba Cloud OpenLake open lakehouse stack and combines the following core components:

- EMR Serverless Spark: a serverless Spark engine for efficient ETL.
- Paimon: a unified stream/batch lake storage format with ACID transactions, schema evolution, and time travel.
- DLF (Data Lake Formation): unified metadata management that connects Spark, StarRocks, Flink, and other engines.
- StarRocks: a high-speed unified analytics engine for high-concurrency point queries and complex OLAP.

With this solution you can:

- Model raw OSS logs into DWD/ADS layers.
- Run flexible, powerful batch ETL with Spark.
- Write the processed results to Paimon lake tables shared by multiple engines.
- Query lake tables or internal tables directly from StarRocks to power BI reports and ad-hoc analysis.
Architecture Diagram
Prerequisites

Make sure you have completed the following preparations:

| Product | Action |
| --- | --- |
| EMR Serverless | A Spark compute cluster has been created and bound with DLF permissions |
| DLF | The service is activated; a Paimon Catalog has been created in the target region and its Catalog ID obtained |
| StarRocks | An EMR Serverless StarRocks instance has been deployed (optional; not required if you only use Spark + Paimon) |
| DataWorks | Used to run the Spark ETL scripts |
Procedure

Step 1: Configure environment variables (run in a notebook)

```python
%emr_serverless_spark
DLF_CATALOG_ID = "clg-paimon-e62c8d1e8fa04ee097be4870af155296"  # ← replace with your DLF Catalog ID
REGION = "cn-hangzhou"  # ← replace with your region
print(f"DLF Catalog ID: {DLF_CATALOG_ID}")
print(f"Region: {REGION}")
```

Step 2: Initialize the Spark Session and verify the data source
```python
from pyspark.sql import SparkSession

OSS_PUBLIC_BUCKET = f"emr-starrocks-benchmark-resource-{REGION}"
PROFILE_SRC_GLOB = f"oss://{OSS_PUBLIC_BUCKET}/sr_game_demo_v2/user_profile/*.parquet"
EVENT_SRC_GLOB = f"oss://{OSS_PUBLIC_BUCKET}/sr_game_demo_v2/user_event/*.parquet"

spark = (
    SparkSession.builder
    .appName("DLF-Paimon-Ingest-sr_game_demo_v2")
    .config("spark.dlf.catalog.id", DLF_CATALOG_ID)
    .config("spark.dlf.region", REGION)
    .config("spark.hadoop.fs.oss.endpoint", f"oss-{REGION}-internal.aliyuncs.com")
    .enableHiveSupport()
    .getOrCreate()
)

# Verify that the source files exist
def glob_count(path_glob: str) -> int:
    """Count the files matching an OSS glob via the Hadoop FileSystem API."""
    jvm = spark._jvm
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    Path = jvm.org.apache.hadoop.fs.Path
    p = Path(path_glob)
    fs = p.getFileSystem(hconf)
    stats = fs.globStatus(p)
    return 0 if stats is None else len(stats)

print("profile matched:", glob_count(PROFILE_SRC_GLOB))
print("event matched:  ", glob_count(EVENT_SRC_GLOB))
```
Step 3: Read the raw data and write it to the Paimon ODS layer

Advantage: Paimon automatically handles file compaction, indexing, and schema evolution, so there is no need to maintain Parquet partitions manually.
```python
df_profile = spark.read.parquet(PROFILE_SRC_GLOB)
df_event = spark.read.parquet(EVENT_SRC_GLOB)

spark.sql("CREATE DATABASE IF NOT EXISTS game_db")

# Write to Paimon tables (ODS layer)
(df_profile.write
    .format("paimon")
    .mode("overwrite")
    .saveAsTable("game_db.ods_user_profile"))

(df_event.write
    .format("paimon")
    .mode("overwrite")
    .saveAsTable("game_db.ods_user_event"))
```
Step 4: Build the DWD detail layer (Spark SQL ETL)

4.1 User detail table dwd_user_details
```sql
DROP TABLE IF EXISTS game_db.dwd_user_details;
CREATE TABLE game_db.dwd_user_details
USING paimon
AS
WITH p AS (
  SELECT
    user_id,
    decode(gender, 'UTF-8') AS gender,
    decode(os_version, 'UTF-8') AS os_version,
    CAST(decode(current_level, 'UTF-8') AS INT) AS current_level,
    decode(device_type, 'UTF-8') AS device_type,
    TO_DATE(decode(last_login_date, 'UTF-8')) AS last_login_date,
    decode(favorite_game_mode, 'UTF-8') AS favorite_game_mode,
    decode(language_preference, 'UTF-8') AS language_preference,
    decode(active_time, 'UTF-8') AS active_time,
    TO_DATE(decode(registration_date, 'UTF-8')) AS registration_date,
    CAST(decode(total_deaths, 'UTF-8') AS INT) AS total_deaths,
    CAST(decode(game_hours, 'UTF-8') AS DOUBLE) AS game_hours,
    decode(location, 'UTF-8') AS location,
    decode(play_frequency, 'UTF-8') AS play_frequency,
    ROW_NUMBER() OVER (
      PARTITION BY user_id
      ORDER BY TO_DATE(decode(last_login_date, 'UTF-8')) DESC NULLS LAST
    ) AS rn
  FROM game_db.ods_user_profile
)
SELECT * EXCEPT (rn)
FROM p
WHERE rn = 1;  -- Deduplicate: keep only the record with the latest last_login_date per user_id
```

4.2 User event detail table dwd_user_event
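The dwd_user_event build below has to normalize a timestamp column that mixes 13-digit millisecond epochs, 10-digit second epochs, and plain datetime strings, and it pulls the purchase amount out of a JSON `event_details` field with a 0.0 default. As a minimal plain-Python sketch of that same branching (the helper names are our own; the production job uses the Spark SQL `CASE`/`get_json_object` shown below):

```python
import json
from datetime import datetime, timezone
from typing import Optional

def parse_event_ts(ts_str: str) -> Optional[datetime]:
    """Mirror the SQL CASE: 13 digits => ms epoch, 10 digits => s epoch, else a datetime string."""
    s = ts_str.strip()  # same role as TRIM(...)
    if s.isdigit() and len(s) == 13:
        return datetime.fromtimestamp(int(s) / 1000, tz=timezone.utc)
    if s.isdigit() and len(s) == 10:
        return datetime.fromtimestamp(int(s), tz=timezone.utc)
    try:
        return datetime.fromisoformat(s)
    except ValueError:
        return None  # dropped later, like WHERE event_ts IS NOT NULL

def extract_amount(event_details: str) -> float:
    """Mirror COALESCE(CAST(get_json_object(details, '$.amount') AS DOUBLE), 0.0)."""
    try:
        details = json.loads(event_details)
    except (ValueError, TypeError):
        return 0.0
    if not isinstance(details, dict):
        return 0.0
    try:
        return float(details.get("amount"))
    except (ValueError, TypeError):
        return 0.0  # missing or non-numeric amount defaults to 0.0
```

Note that both epoch branches resolve to the same instant when the values agree, which is exactly why the SQL can safely divide 13-digit values by 1000.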
```sql
DROP TABLE IF EXISTS game_db.dwd_user_event;
CREATE TABLE game_db.dwd_user_event
USING paimon
AS
WITH e AS (
  SELECT
    user_id,
    LOWER(decode(event_type, 'UTF-8')) AS event_type,
    TRIM(decode(timestamp, 'UTF-8')) AS ts_str,
    decode(event_details, 'UTF-8') AS event_details,
    decode(location, 'UTF-8') AS event_location
  FROM game_db.ods_user_event
),
e_ts AS (
  SELECT *,
    CASE
      WHEN ts_str RLIKE '^[0-9]{13}$' THEN TO_TIMESTAMP(FROM_UNIXTIME(CAST(ts_str AS BIGINT)/1000))
      WHEN ts_str RLIKE '^[0-9]{10}$' THEN TO_TIMESTAMP(FROM_UNIXTIME(CAST(ts_str AS BIGINT)))
      ELSE TO_TIMESTAMP(ts_str)
    END AS event_ts
  FROM e
)
SELECT
  e.user_id,
  e.event_ts,
  TO_DATE(e.event_ts) AS event_date,
  e.event_type,
  COALESCE(NULLIF(e.event_location, ''), d.location) AS location,
  -- Extract the purchase amount from the JSON event details
  COALESCE(CAST(get_json_object(event_details, '$.amount') AS DOUBLE), 0.0) AS amount,
  d.gender, d.device_type
FROM e_ts e
LEFT JOIN game_db.dwd_user_details d ON e.user_id = d.user_id
WHERE e.event_ts IS NOT NULL;
```

Step 5: Build the ADS application layer (metric aggregation)
5.1 Daily retention table ads_retention_daily
```sql
DROP TABLE IF EXISTS game_db.ads_retention_daily;
CREATE TABLE game_db.ads_retention_daily
USING paimon
AS
WITH dau AS (
  SELECT event_date AS dt, user_id
  FROM game_db.dwd_user_event
  GROUP BY event_date, user_id
)
SELECT
  base.dt,
  base.dau,
  COALESCE(d1.d1_retained, 0) AS d1_retained,
  ROUND(COALESCE(d1.d1_retained, 0) * 1.0 / base.dau, 4) AS d1_retention_rate,
  COALESCE(d7.d7_retained, 0) AS d7_retained,
  ROUND(COALESCE(d7.d7_retained, 0) * 1.0 / base.dau, 4) AS d7_retention_rate
FROM (
  SELECT dt, COUNT(DISTINCT user_id) AS dau FROM dau GROUP BY dt
) base
LEFT JOIN (
  SELECT a.dt, COUNT(DISTINCT a.user_id) AS d1_retained
  FROM dau a JOIN dau b ON a.user_id = b.user_id AND b.dt = DATE_ADD(a.dt, 1)
  GROUP BY a.dt
) d1 ON base.dt = d1.dt
LEFT JOIN (
  SELECT a.dt, COUNT(DISTINCT a.user_id) AS d7_retained
  FROM dau a JOIN dau b ON a.user_id = b.user_id AND b.dt = DATE_ADD(a.dt, 7)
  GROUP BY a.dt
) d7 ON base.dt = d7.dt;
```

5.2 Other ADS tables (omitted; see the original code)
- ads_purchase_trends_daily: daily GMV and number of paying users
- ads_device_preference_daily: device distribution and share
- ads_region_distribution_daily: DAU distribution by province and city
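The self-join in the 5.1 retention query (`b.dt = DATE_ADD(a.dt, n)`) can be hard to read at first. A minimal plain-Python sketch of the same D1/D7 logic over (event_date, user_id) activity pairs, using hypothetical sample data:

```python
from datetime import date, timedelta

# Hypothetical activity log: the set of (event_date, user_id) pairs
# that the dau CTE produces after GROUP BY event_date, user_id.
activity = {
    (date(2024, 1, 1), "u1"), (date(2024, 1, 1), "u2"),
    (date(2024, 1, 2), "u1"),                            # u1 comes back one day later
    (date(2024, 1, 8), "u1"), (date(2024, 1, 8), "u2"),  # both come back seven days later
}

def retention(activity, dt, offset):
    """Return (dau, retained, rate) for users active on dt who are also
    active on dt + offset days -- the b.dt = DATE_ADD(a.dt, offset) join."""
    day_users = {u for (d, u) in activity if d == dt}
    later_users = {u for (d, u) in activity if d == dt + timedelta(days=offset)}
    retained = len(day_users & later_users)
    rate = round(retained / len(day_users), 4) if day_users else 0.0
    return len(day_users), retained, rate

print(retention(activity, date(2024, 1, 1), 1))  # D1 for 2024-01-01
print(retention(activity, date(2024, 1, 1), 7))  # D7 for 2024-01-01
```

The set intersection plays the role of the inner join on `user_id`, and the `LEFT JOIN` plus `COALESCE(..., 0)` in the SQL corresponds to days where the later set is empty.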
Step 6: Connect to StarRocks

Create a StarRocks node and query the lake tables directly.

```sql
-- Query the lake table directly
SELECT * FROM paimon_catalog.game_db.ads_retention_daily LIMIT 10;
```
Step 7: Visualization (Quick BI)

Create a StarRocks data source in the Quick BI console, then create datasets with the following SQL:

Dataset SQL 1:

```sql
SELECT * FROM game_db.ADS_MV_USER_RETENTION;
```

Dataset SQL 2:

```sql
SELECT * FROM game_db.ADS_MV_USER_GEOGRAPHIC_DISTRIBUTION;
```

Dataset SQL 3:

```sql
SELECT * FROM game_db.ADS_MV_USER_DEVICE_PREFERENCE;
```

Dataset SQL 4:

```sql
SELECT * FROM game_db.ADS_MV_USER_PURCHASE_TRENDS;
```
Build line charts, maps, and dashboards to monitor the core metrics.
Summary of Solution Advantages

| Dimension | Traditional approach | This solution |
| --- | --- | --- |
| Storage cost | Expensive HDFS | Low-cost OSS + automatic small-file compaction in Paimon |
| Compute elasticity | Always-on clusters | EMR Serverless pay-as-you-go |
| Data consistency | Manual partition/version maintenance | Paimon ACID + Time Travel |
| Multi-engine collaboration | Data silos | Unified metadata in DLF, shared by Spark/Flink/StarRocks |
| Development efficiency | Complex scheduling dependencies | One-stop notebook ETL + SQL modeling |