全部产品
Search
文档中心

云原生数据仓库AnalyticDB:基于AnalyticDB Spark+Notebook实现数仓与数据湖间的数据交换

更新时间:Jan 27, 2026

本文将介绍如何利用AnalyticDB Spark,在 Notebook 环境中通过 PySpark 或 Spark SQL 实现数据仓库与数据湖之间的双向数据交换与 ETL 处理。

背景说明

AnalyticDB for MySQL支持两种数据存储模式:数据仓库表(高性能存储)和数据湖表(低成本存储)。根据数据来源与业务需求,应选择不同的存储模式作为贴源层,在生产环境中,这两种表类型通常并存:

  • 数据仓库表:适用于通过 DTS 同步 MySQL、PolarDB 或 MongoDB 的实时数据。其优势在于高并发更新性能。

  • 数据湖表:适用于从 OSS 或数据库定期批量加载数据。其优势在于高吞吐量及数据共享能力。

image

准备工作

AnalyticDB for MySQL

  1. AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  2. 集群的内核需为3.2.6及以上版本。

    说明

    云原生数据仓库AnalyticDB MySQL控制台集群信息页面,配置信息区域,查看和升级内核版本。若集群已是最新默认基线版本但仍需升级,请通过钉钉联系阿里云服务支持处理(钉钉账号:x5v_rm8wqzuqf)。

  3. 已创建数据库账号。

  4. 创建Job型资源组和Spark引擎Interactive型资源组

  5. 已开启ENI访问。

    重要
    • 登录云原生数据仓库AnalyticDB MySQL控制台,在集群管理 > 集群信息网络信息区域,打开ENI网络开关。

    • 开启和关闭ENI网络会导致数据库连接中断大约2分钟,无法读写。请谨慎评估影响后再开启或关闭ENI网络。

Notebook工作空间

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

  2. 单击作业开发 > Notebook开发。确保已完成如下准备工作,然后单击进入DMS Notebook

    image

  3. 新建工作空间

  4. 引入数据源

    image

  5. 创建Spark集群。

    1. 单击image按钮,进入资源管理页面,单击计算集群

    2. 选择Spark集群页签,单击创建集群,并配置如下参数:

      image

      参数

      说明

      示例值

      集群名称

      输入便于识别使用场景的集群名称。

      spark_test

      运行环境

      根据需求选择适合的镜像版本。

      adb-spark:v3.5-python3.9-scala2.12

      AnalyticDB实例

      在下拉框中选择AnalyticDB for MySQL集群。

      amv-uf6i4bi88****

      AnalyticDB MySQL资源组

      在下拉框中选择Job型资源组。

      testjob

      Spark APP Executor规格

      选择Spark Executor的资源规格。

      不同型号的取值对应不同的规格,详情请参见Spark应用配置参数说明的型号列。

      large

      交换机

      选择当前VPC下的交换机。

      vsw-uf6n9ipl6qgo****

  6. 创建Notebook会话。

    创建并启动Notebook会话时,首次启动需要等待大约5分钟。

    image

    参数

    说明

    示例值

    会话名称

    您可自定义会话名称。

    new_session

    所属集群

    选择上一步骤创建的Spark集群。

    spark_test

    镜像

    选择镜像规格。

    Spark3.5_Scala2.12_Python3.9:1.0.9

    规格

    kernel的资源规格。

    4C16G

    配置

    profile资源。

    您可编辑profile的名称、资源释放时长、数据存储位置、Pypi包管理和环境变量信息。

    重要

    资源释放时长:当资源空闲时间超过设置的时长,则会自动释放。资源释放时长设置为0,表示资源永久不会自动释放。

    deault_profile

注意事项

  • 确保 Notebook 绑定的 Spark Cluster 与集群实例位于同一交换机下。

  • 为避免在代码中明文硬编码敏感信息,请在 DMS Notebook 右下角设置中的变量管理,添加数据源变量。

    选择数据源后,需填写username与password。

    image

  • 在加工数据的时候,需要在Notebook里切换不同的SQL计算引擎来实现完整的湖仓加工流程。

    除Spark SQL外,其余为XIHE引擎(无需特意指定数据库,可在SQL中指定库表名)

    image

湖仓ETL

本案例基于 TPC-H 数据集,演示两种核心场景:

  1. 仓转湖:读取实时更新的数据仓库表,加工为宽表写入数据湖,用于数据共享或机器学习。

  2. 湖转仓:读取数据湖数据,构建宽表写入数据仓库,支持亚秒级 BI 报表查询。

场景一:读取数仓表,加工宽表并写入数据湖

场景描述

OLTP 数据实时同步至数仓表(贴源层)。Spark 读取贴源层数据,清洗整合成宽表,并存储于数据湖。

步骤一:初始化 Spark Session

  1. 网络信息页面查看交换机ID,可以前往VPC控制台查询该交换机对应的 IPv4 网段,将其加入数据安全-白名单设置并记录。

  2. ECS 控制台的安全组页面,选择全部类型后,搜索AnalytiDB MySQL实例的集群ID,并记录对应的安全组 ID。

  3. 在 Notebook 的 Python 单元格中执行以下代码。请替换:

    • 数据源变量名dbl_adbmysql_2

    • 交换机IDspark.adb.eni.vswitchId

    • 安全组IDsecurityGroupId

    from pyspark.sql import SparkSession
    
    # 获取环境变量
    host = dms.get_datasource("dbl_adbmysql_2")['host']
    username = dms.get_datasource("dbl_adbmysql_2")['username']
    password = dms.get_datasource("dbl_adbmysql_2")['password']
    
    # 初始化 SparkSession,配置 ENI 和 JDBC Catalog
    spark = SparkSession.builder.appName("FromLakeToWarehouse")\
            .config("spark.adb.eni.enabled", "true")\
            .config("spark.adb.eni.vswitchId", "vsw-*****")\
            .config("spark.adb.eni.securityGroupId", "sg-******") \
            .config("spark.sql.catalog.jdbc", "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog") \
            .config("spark.sql.catalog.jdbc.url", f"jdbc:mysql://{host}:3306/?useServerPrepStmts=false&rewriteBatchedStatements=true") \
            .config("spark.sql.catalog.jdbc.user", username) \
            .config("spark.sql.catalog.jdbc.password", password) \
            .getOrCreate()

步骤二:准备数仓源数据

在 SQL 单元格(使用 XIHE 引擎)中执行,模拟数仓中的实时数据。

选择ADB_External_TPCH_10GB执行。
CREATE DATABASE IF NOT EXISTS DATAWAREHOUSE;

# 创建一张非分区表(数据仓库格式)
CREATE TABLE IF NOT EXISTS DATAWAREHOUSE.ORDERS 
AS SELECT * FROM ADB_External_TPCH_10GB.External_orders;

#创建一张分区表(数据仓库格式),再写入数据
CREATE TABLE IF NOT EXISTS DATAWAREHOUSE.LINEITEM(
`l_orderkey` bigint NOT NULL COMMENT '',
`l_partkey` int NOT NULL COMMENT '',
`l_suppkey` int NOT NULL COMMENT '',
`l_linenumber` int NOT NULL COMMENT '',
`l_quantity` decimal(15, 2) NOT NULL COMMENT '',
`l_extendedprice` decimal(15, 2) NOT NULL COMMENT '',
`l_discount` decimal(15, 2) NOT NULL COMMENT '',
`l_tax` decimal(15, 2) NOT NULL COMMENT '',
`l_returnflag` varchar(1) NOT NULL COMMENT '',
`l_linestatus` varchar(1) NOT NULL COMMENT '',
`l_shipdate` date NOT NULL COMMENT '',
`l_commitdate` date NOT NULL COMMENT '',
`l_receiptdate` date NOT NULL COMMENT '',
`l_shipinstruct` varchar(25) NOT NULL COMMENT '',
`l_shipmode` varchar(10) NOT NULL COMMENT '',
`l_comment` varchar(1024) NOT NULL COMMENT '',
`dummy` varchar(1024),
PRIMARY KEY(l_orderkey,l_commitdate)
)
DISTRIBUTED BY HASH(l_orderkey)
PARTITION BY VALUE(DATE_FORMAT(l_commitdate, '%Y%m%d'));

-- 写入数据
INSERT INTO DATAWAREHOUSE.LINEITEM 
SELECT * FROM ADB_External_TPCH_10GB.External_lineitem;

步骤三:生成数据湖快照

将数仓表的全量快照写入数据湖。以下提供两种等效方案,任选其一。

  • Spark SQL从数据仓库表读取小数据量加载到数据湖。

  • XIHE SQL:推荐用于常规快照生成。

Spark SQL

执行前需创建一个绑定 OSS 路径的数据库,作为数据湖的存储根目录,可以让后续的表自动继承路径。后续建表时无需再次指定路径,可以更好地区分湖仓表(额外声明创建一个库用于管理湖表)。
CREATE DATABASE IF NOT EXISTS LAKEHOUSE_DB LOCATION 'oss://<Bucket>/ORDERS_DATALAKE_SNAPSHOT/';

CREATE TABLE IF NOT EXISTS LAKEHOUSE_DB.ORDERS_DATALAKE_SNAPSHOT 
USING ICEBERG
AS SELECT * FROM jdbc.DATAWAREHOUSE.ORDERS;

CREATE TABLE IF NOT EXISTS LAKEHOUSE_DB.LINEITEM_DATALAKE_SNAPSHOT 
USING ICEBERG
AS SELECT * FROM jdbc.DATAWAREHOUSE.LINEITEM LIMIT 10000;

XIHE SQL

替换LOCATION为OSS实际Bucket。
-- 快照 Orders 表
CREATE TABLE IF NOT EXISTS DATAWAREHOUSE.ORDERS_DATALAKE_SNAPSHOT 
STORED AS ICEBERG
LOCATION 'oss://<Bucket>/ORDERS_DATALAKE_SNAPSHOT/'
AS SELECT * FROM DATAWAREHOUSE.ORDERS;

-- 快照 Lineitem 表
CREATE TABLE IF NOT EXISTS DATAWAREHOUSE.LINEITEM_DATALAKE_SNAPSHOT 
STORED AS ICEBERG
LOCATION 'oss://<Bucket>/ORDERS_DATALAKE_SNAPSHOT/'
AS SELECT * FROM DATAWAREHOUSE.LINEITEM;

步骤四:生成宽表

(Spark SQL)创建目标 Iceberg 宽表结构。

说明

步骤三采用了Spark SQL方式,额外创建了LAKEHOUSE_DB。如果采用XIHE SQL,请注意后续步骤替换对应数据库名称。

CREATE TABLE IF NOT EXISTS LAKEHOUSE_DB.order_fulfillment_detail (
    transaction_id      STRING          COMMENT '交易流水号',
    order_key           LONG            COMMENT '订单号',
    line_number         INT             COMMENT '明细行号',
    order_date          DATE            COMMENT '下单日期',
    ship_date           DATE            COMMENT '实际发货日期',
    commit_date         DATE            COMMENT '承诺送达日期',
    fulfillment_days    INT             COMMENT '履约耗时(天)',
    is_delayed          BOOLEAN         COMMENT '是否逾期',
    ship_mode           STRING          COMMENT '运输方式',
    customer_key        LONG            COMMENT '客户ID',
    clerk_id            STRING          COMMENT '经手员',
    priority            STRING          COMMENT '优先级',
    order_status        STRING          COMMENT '整单状态',
    line_status         STRING          COMMENT '明细状态',
    return_flag         STRING          COMMENT '退货标记',
    quantity            DECIMAL(10, 2)  COMMENT '数量',
    list_amount         DECIMAL(18, 2)  COMMENT '码价总额',
    discount_amount     DECIMAL(18, 2)  COMMENT '折扣金额',
    tax_amount          DECIMAL(18, 2)  COMMENT '税额',
    final_pay_amount    DECIMAL(18, 2)  COMMENT '实付金额'
)
USING iceberg
PARTITIONED BY (months(order_date));

步骤五:写入宽表

选择其中一种方式,将快照数据清洗后写入宽表。

Spark SQL

INSERT OVERWRITE LAKEHOUSE_DB.order_fulfillment_detail
SELECT 
    CONCAT(CAST(o.o_orderkey AS STRING), '-', CAST(l.l_linenumber AS STRING)) AS transaction_id,
    o.o_orderkey            AS order_key,
    l.l_linenumber          AS line_number,
    o.o_orderdate           AS order_date,
    l.l_shipdate            AS ship_date,
    l.l_commitdate          AS commit_date,
    DATEDIFF(l.l_shipdate, o.o_orderdate) AS fulfillment_days,
    CASE 
        WHEN l.l_shipdate > l.l_commitdate THEN true 
        ELSE false 
    END                     AS is_delayed,
    l.l_shipmode            AS ship_mode,
    o.o_custkey             AS customer_key,
    o.o_clerk               AS clerk_id,
    o.o_orderpriority       AS priority,
    o.o_orderstatus         AS order_status,
    l.l_linestatus          AS line_status,
    l.l_returnflag          AS return_flag,
    l.l_quantity            AS quantity,
    l.l_extendedprice       AS list_amount,
    CAST(l.l_extendedprice * l.l_discount AS DECIMAL(18, 2)) AS discount_amount,
    CAST(l.l_extendedprice * (1 - l.l_discount) * l.l_tax AS DECIMAL(18, 2)) AS tax_amount,
    CAST(l.l_extendedprice * (1 - l.l_discount) * (1 + l.l_tax) AS DECIMAL(18, 2)) AS final_pay_amount
FROM LAKEHOUSE_DB.ORDERS_DATALAKE_SNAPSHOT o
JOIN LAKEHOUSE_DB.LINEITEM_DATALAKE_SNAPSHOT l 
    ON o.o_orderkey = l.l_orderkey
WHERE 
    -- 时间过滤,减少 80% 以上的数据扫描
    o.o_orderdate >= '1998-01-01' 
    -- 状态过滤,只看已下单的活跃订单
    AND o.o_orderstatus IN ('O', 'F') 
    -- 进一步缩小范围(只分析高优先级的订单)
    AND o.o_orderpriority IN ('1-URGENT', '2-HIGH');

PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, concat, concat_ws, datediff, 
    when, round, count, sum, avg
)
from pyspark.sql.types import DecimalType


# ==========================================
# 1. 从数据仓库表的Iceberg快照表里,读取源数据 (Reader)
# ==========================================
df_orders = spark.table("LAKEHOUSE_DB.ORDERS_DATALAKE_SNAPSHOT").alias("o")
df_lineitem = spark.table("LAKEHOUSE_DB.LINEITEM_DATALAKE_SNAPSHOT").alias("l")

# ==========================================
# 2. 核心逻辑 (Transformation)
# ==========================================

# 关联逻辑
df_joined = df_orders.join(
    df_lineitem, 
    on=col("o.o_orderkey") == col("l.l_orderkey"), 
    how="inner")

# 计算与字段映射逻辑
df_result = df_joined.select(
    # --- 唯一标识 ---
    # 使用 concat_ws (With Separator) 拼接主键,比 SQL 的 concat 更优雅
    concat_ws("-", col("o.o_orderkey"), col("l.l_linenumber")).alias("transaction_id"),
    col("o.o_orderkey").alias("order_key"),
    col("l.l_linenumber").alias("line_number"),

    # --- 时间与履约 ---
    col("o.o_orderdate").alias("order_date"),
    col("l.l_shipdate").alias("ship_date"),
    col("l.l_commitdate").alias("commit_date"),
    
    # 履约耗时 (Datediff)
    datediff(col("l.l_shipdate"), col("o.o_orderdate")).alias("fulfillment_days"),
    
    # 是否逾期 (Boolean 逻辑)
    (col("l.l_shipdate") > col("l.l_commitdate")).alias("is_delayed"),
    
    col("l.l_shipmode").alias("ship_mode"),

    # --- 业务属性 ---
    col("o.o_custkey").alias("customer_key"),
    col("o.o_clerk").alias("clerk_id"),
    col("o.o_orderpriority").alias("priority"),
    col("o.o_orderstatus").alias("order_status"),
    col("l.l_linestatus").alias("line_status"),
    col("l.l_returnflag").alias("return_flag"),

    # --- 财务指标 (注意类型转换) ---
    col("l.l_quantity").cast(DecimalType(10, 2)).alias("quantity"),
    col("l.l_extendedprice").cast(DecimalType(18, 2)).alias("list_amount"),

    # 折扣金额 = Price * Discount
    (col("l.l_extendedprice") * col("l.l_discount"))
        .cast(DecimalType(18, 2)).alias("discount_amount"),

    # 税额 = Price * (1 - Discount) * Tax
    (col("l.l_extendedprice") * (1 - col("l.l_discount")) * col("l.l_tax"))
        .cast(DecimalType(18, 2)).alias("tax_amount"),

    # 实付金额 = Price * (1 - Discount) * (1 + Tax)
    (col("l.l_extendedprice") * (1 - col("l.l_discount")) * (1 + col("l.l_tax")))
        .cast(DecimalType(18, 2)).alias("final_pay_amount")
)

# ==========================================
# 3. 写入数据到数据湖 (Writer)
# ==========================================
TARGET_TABLE = "LAKEHOUSE_DB.order_fulfillment_detail"

print(f"正在写入表: {TARGET_TABLE} ...")

df_result.writeTo(TARGET_TABLE) \
    .overwritePartitions() # 或者 .overwrite() 视需求而定

print(">>> ETL 完成!")

步骤六:查询数据

Spark SQL查询。
SELECT * FROM LAKEHOUSE_DB.order_fulfillment_detail;

场景二:从数据湖加载数据到数据仓库

场景描述

将数据湖中的原始数据(Part 和 Partsupp)加工成宽表,写入数据仓库(Hot Storage),以支持高性能查询。

步骤一:初始化 Spark Session

  1. 网络信息页面查看交换机ID,可以前往VPC控制台查询该交换机对应的 IPv4 网段,将其加入数据安全-白名单设置并记录。

  2. ECS 控制台的安全组页面,选择全部类型后,搜索AnalytiDB MySQL实例的集群ID,并记录对应的安全组 ID。

  3. 在 Notebook 的 Python 单元格中执行以下代码。请替换:

    • 数据源变量名dbl_adbmysql_2

    • 交换机IDspark.adb.eni.vswitchId

    • 安全组IDsecurityGroupId

    from pyspark.sql import SparkSession
    
    # 获取环境变量
    host = dms.get_datasource("dbl_adbmysql_2")['host']
    username = dms.get_datasource("dbl_adbmysql_2")['username']
    password = dms.get_datasource("dbl_adbmysql_2")['password']
    
    # 初始化 SparkSession,配置 ENI 和 JDBC Catalog
    spark = SparkSession.builder.appName("FromLakeToWarehouse")\
            .config("spark.adb.eni.enabled", "true")\
            .config("spark.adb.eni.vswitchId", "vsw-*****")\
            .config("spark.adb.eni.securityGroupId", "sg-******") \
            .config("spark.sql.catalog.jdbc", "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog") \
            .config("spark.sql.catalog.jdbc.url", f"jdbc:mysql://{host}:3306/?useServerPrepStmts=false&rewriteBatchedStatements=true") \
            .config("spark.sql.catalog.jdbc.user", username) \
            .config("spark.sql.catalog.jdbc.password", password) \
            .getOrCreate()

步骤二:准备湖仓源数据

(Spark SQL)在 LAKEHOUSE 中创建 Iceberg 格式的源表。

CREATE DATABASE IF NOT EXISTS LAKEHOUSE LOCATION 'oss://<bucket>/iceberg/';

CREATE TABLE IF NOT EXISTS LAKEHOUSE.Part
USING ICEBERG
AS SELECT * FROM ADB_External_TPCH_10GB.External_Part;

CREATE TABLE IF NOT EXISTS LAKEHOUSE.partsupp
USING ICEBERG
PARTITIONED BY (bucket(64, ps_partkey)) 
AS SELECT * FROM ADB_External_TPCH_10GB.External_Partsupp;

步骤三:创建目标数仓表

(XIHE SQL)在数据仓库中预先创建目标表。

CREATE DATABASE IF NOT EXISTS DATAWAREHOUSE;

CREATE TABLE IF NOT EXISTS DATAWAREHOUSE.product_inventory_analysis (
`sku_id` varchar NOT NULL COMMENT 'SKU唯一标识(PartKey-SuppKey)',
`part_key` bigint NOT NULL COMMENT '商品ID',
`supplier_key` bigint NOT NULL COMMENT '供应商ID',
`part_name` varchar NOT NULL COMMENT '商品名称',
`brand` varchar NOT NULL COMMENT '品牌',
`type` varchar NOT NULL COMMENT '类型',
`size` int NOT NULL COMMENT '尺寸',
`container` varchar NOT NULL COMMENT '包装方式',
`stock_qty` int NOT NULL COMMENT '当前库存数量',
`supply_cost` decimal(18, 2) NOT NULL COMMENT '进货成本(单价)',
`retail_price` decimal(18, 2) NOT NULL COMMENT '建议零售价',
`total_inv_cost` decimal(18, 2) NOT NULL COMMENT '库存总资金占用 (数量*进货价)',
`potential_revenue` decimal(18, 2) NOT NULL COMMENT '库存潜在营收 (数量*零售价)',
`unit_margin_amt` decimal(18, 2) NOT NULL COMMENT '单件潜在毛利 (零售价-进货价)',
`margin_rate` decimal(10, 4) NOT NULL COMMENT '潜在毛利率 ((零售-进货)/零售)',
`is_underwater` boolean NOT NULL COMMENT '是否价格倒挂 (进货价 > 零售价)'
) DISTRIBUTE BY HASH (`sku_id`);

步骤四:写入数据仓库

通过JDBC连接器写入仓表,目标表名必须包含jdbc.前缀。即jdbc.DATAWAREHOUSE.product_inventory_analysis

Spark SQL

INSERT INTO jdbc.DATAWAREHOUSE.product_inventory_analysis
SELECT 
    -- 1. 生成SKU主键
    CONCAT(CAST(p.p_partkey AS STRING), '-', CAST(ps.ps_suppkey AS STRING)) AS sku_id,
    p.p_partkey             AS part_key,
    ps.ps_suppkey           AS supplier_key,
    -- 2. 商品维度
    p.p_name                AS part_name,
    p.p_brand               AS brand,
    p.p_type                AS type,
    p.p_size                AS size,
    p.p_container           AS container,
    -- 3. 基础度量
    ps.ps_availqty          AS stock_qty,
    ps.ps_supplycost        AS supply_cost,
    p.p_retailprice         AS retail_price,
    -- 4. 资金占用 (Inventory Value)
    CAST(ps.ps_availqty * ps.ps_supplycost AS DECIMAL(18, 2)) AS total_inv_cost,
    -- 5. 潜在营收 (Potential GMV)
    CAST(ps.ps_availqty * p.p_retailprice AS DECIMAL(18, 2))  AS potential_revenue,
    -- 6. 单件毛利 (Unit Margin)
    CAST(p.p_retailprice - ps.ps_supplycost AS DECIMAL(18, 2)) AS unit_margin_amt,
    -- 7. 毛利率 (Margin Rate)
    CAST(
    CASE 
        WHEN p.p_retailprice = 0 THEN 0 
        ELSE (p.p_retailprice - ps.ps_supplycost) / p.p_retailprice 
    END 
    AS DECIMAL(10, 4))      AS margin_rate,
    -- 8. 倒挂风险判断
    CASE 
        WHEN ps.ps_supplycost > p.p_retailprice THEN true 
        ELSE false 
    END                     AS is_underwater
FROM LAKEHOUSE.part p
JOIN LAKEHOUSE.partsupp ps 
ON p.p_partkey = ps.ps_partkey;

PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, lit, concat, when, cast)
from pyspark.sql.types import DecimalType, BooleanType

df_part = spark.table("LAKEHOUSE.part").alias("p")
df_partsupp = spark.table("LAKEHOUSE.partsupp").alias("ps")


df_joined = df_part.join(
    df_partsupp,
    on=col("p.p_partkey") == col("ps.ps_partkey"),
    how="inner"
)

# 核心计算与转换逻辑
df_result = df_joined.select(
    # --- 1. 生成 SKU 主键 ---
    # SQL: CONCAT(CAST(p.p_partkey AS STRING), '-', CAST(ps.ps_suppkey AS STRING))
    concat(
        col("p.p_partkey").cast("string"), 
        lit("-"), 
        col("ps.ps_suppkey").cast("string")
    ).alias("sku_id"),
    
    col("p.p_partkey").alias("part_key"),
    col("ps.ps_suppkey").alias("supplier_key"),

    # --- 2. 商品维度 ---
    col("p.p_name").alias("part_name"),
    col("p.p_brand").alias("brand"),
    col("p.p_type").alias("type"),
    col("p.p_size").alias("size"),
    col("p.p_container").alias("container"),

    # --- 3. 基础度量 ---
    col("ps.ps_availqty").alias("stock_qty"),
    col("ps.ps_supplycost").alias("supply_cost"),
    col("p.p_retailprice").alias("retail_price"),

    # --- 4. 资金占用 (Inventory Value) ---
    # SQL: CAST(ps.ps_availqty * ps.ps_supplycost AS DECIMAL(18, 2))
    (col("ps.ps_availqty") * col("ps.ps_supplycost"))
        .cast(DecimalType(18, 2)).alias("total_inv_cost"),

    # --- 5. 潜在营收 (Potential GMV) ---
    # SQL: CAST(ps.ps_availqty * p.p_retailprice AS DECIMAL(18, 2))
    (col("ps.ps_availqty") * col("p.p_retailprice"))
        .cast(DecimalType(18, 2)).alias("potential_revenue"),

    # --- 6. 单件毛利 (Unit Margin) ---
    # SQL: CAST(p.p_retailprice - ps.ps_supplycost AS DECIMAL(18, 2))
    (col("p.p_retailprice") - col("ps.ps_supplycost"))
        .cast(DecimalType(18, 2)).alias("unit_margin_amt"),

    # --- 7. 毛利率 (Margin Rate) ---
    # 必须处理分母为 0 的情况
    # SQL: CASE WHEN p.p_retailprice = 0 THEN 0 ELSE (P-C)/P END
    when(col("p.p_retailprice") == 0, lit(0))
        .otherwise(
            (col("p.p_retailprice") - col("ps.ps_supplycost")) / col("p.p_retailprice")
        ).cast(DecimalType(10, 4)).alias("margin_rate"),

    # --- 8. 倒挂风险判断 ---
    # SQL: CASE WHEN cost > price THEN true ELSE false END
    when(col("ps.ps_supplycost") > col("p.p_retailprice"), lit(True))
        .otherwise(lit(False))
        .alias("is_underwater")
)

# ==========================================
# 写入数据仓库表 (DATAWAREHOUSE 层)
# ==========================================
# 根据 SQL 的 INSERT INTO jdbc.DATAWAREHOUSE... 映射目标表
TARGET_TABLE = "jdbc.DATAWAREHOUSE.product_inventory_analysis"

print(f"正在准备写入数据至: {TARGET_TABLE} ...")

try:
    df_result.write \
        .format("adb") \
        .mode("append") \
        .saveAsTable(TARGET_TABLE)
    print(">>> ETL 任务执行完成!数据已成功存入仓库。")
except Exception as e:
    print(f">>> ETL 任务失败,报错原因: {str(e)}")

步骤五:查询数据

Spark SQL查询。
SELECT * FROM DATAWAREHOUSE.product_inventory_analysis LIMIT 1000;