Arrow Flight SQL 基于 Apache Arrow 列式内存格式与 gRPC 网络通信框架,相比 MySQL 协议在批量数据读取场景下性能提升明显。本文介绍在云数据库 SelectDB 版实例上启用 Arrow Flight SQL,并通过 Python ADBC、Java ADBC、JDBC 客户端连接的完整步骤。
使用场景
大批量数据读取:从 SelectDB 拉取百万级以上的查询结果到 Pandas、Spark 等下游分析框架时,使用 Arrow 列存格式可避免逐行序列化与反序列化开销,吞吐显著高于 MySQL 协议。
BI 与数据科学集成:Python(ADBC)、Java(ADBC、JDBC)、DBeaver 等通过 arrow-flight-sql 协议连接 SelectDB,可与 Pandas DataFrame、Arrow Table 等内存模型无缝衔接。
前提条件
SelectDB 实例处于运行中状态,且实例版本为 Cloud 4.0.5 及以上。
客户端能通过内网访问 SelectDB 实例。Arrow Flight SQL 协议要求客户端能访问到 BE 节点的 Arrow Flight 端口(8050),该端口仅在 VPC 范围内开放,不通过公网访问。如客户端不在同一 VPC,请通过 VPC 对等连接、云企业网(CEN)或专线打通跨 VPC、IDC 的网络。
实例已开通「集群直连」。Arrow Flight、Connector 等组件依赖该选项放开 VPC 内的内部端口;未开通时 ADBC 客户端拉数据会出现
i/o timeout。详情请参见开通集群直连。客户端 IP(或同 VPC ECS 的内网网段)已加入实例的白名单。
获取连接信息
在页面左上角选择实例所在地域。
在实例列表中点击目标实例 ID 进入实例详情页。
在网络信息面板获取以下信息:
VPC 地址:例如
selectdb-cn-xxxx.selectdbfe.rds.aliyuncs.com。Arrow Flight SQL 端口:默认
8070。可在 SQL 中执行ADMIN SHOW FRONTEND CONFIG LIKE '%arrow_flight%'复核。
通过 Python ADBC 驱动连接
ADBC(Arrow Database Connectivity)是 Apache Arrow 项目的标准化数据库连接接口。Python ADBC 直接返回 Arrow Table,可零拷贝转为 Pandas DataFrame。
安装依赖
使用 pip 安装两个包:通用 ADBC 管理库 + Flight SQL 驱动。
pip install adbc_driver_manager adbc_driver_flightsql连接并查询数据
下面的脚本连接 SelectDB 并将查询结果转为 Pandas DataFrame:
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
# 用 adbc_driver_manager.DatabaseOptions 提供账号密码
conn = flight_sql.connect(
uri="grpc://<VPC地址>:<Arrow Flight SQL端口>",
db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "<数据库账号>",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "<数据库密码>",
},
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM demo.example_table LIMIT 1000")
# fetchallarrow() 返回 Arrow Table,零拷贝转 Pandas
arrow_table = cursor.fetchallarrow()
df = arrow_table.to_pandas()
print(f"got {len(df)} rows")
print(df.head())
cursor.close()
conn.close()用户名 / 密码选项位于 adbc_driver_manager.DatabaseOptions(不是 adbc_driver_flightsql.DatabaseOptions),后者只包含 FlightSQL 特有选项(OAUTH、TLS 等)。导入错误会直接抛 AttributeError: USERNAME。
关于 DDL / DML / 管理语句
CREATE TABLE / INSERT / SET / SHOW VARIABLES 等命令可以通过 cursor.execute() 提交并在服务端正常生效,但获取返回状态需要 ADBC 客户端连到 FE 节点拉取 StatusResult,部分场景下会出现 i/o timeout。建议把这一类操作改用 MySQL 9030 协议(pymysql、JDBC mysql 驱动)执行;Arrow Flight SQL 通道专门用于批量读取查询结果。
通过 JDBC Driver 连接
阿里云 SelectDB 兼容开源 Apache Arrow 提供的 JDBC 驱动 flight-sql-jdbc-core。可以在 Java、Spark JDBC、BI 工具(DBeaver 等)中使用。
添加 Maven 依赖
在 pom.xml 中新增依赖。建议把版本号定义在 properties 中,便于统一升级:
<properties>
<arrow.version>17.0.0</arrow.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql-jdbc-core</artifactId>
<version>${arrow.version}</version>
</dependency>
</dependencies>使用 Java 9 及以上 JDK 时,必须给 JVM 加 --add-opens=java.base/java.nio=ALL-UNNAMED,否则会报 module java.base does not "opens java.nio" to unnamed module。命令行示例:java --add-opens=java.base/java.nio=ALL-UNNAMED -cp ... MainClass;IntelliJ 用户在「Run/Debug Configurations - VM options」中加入相同参数。
连接并查询
JDBC URL 协议头为 jdbc:arrow-flight-sql,其余用法与 jdbc:mysql 接近。返回的 ResultSet 由 Arrow Table 包装而成:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class ArrowFlightSqlExample {
public static void main(String[] args) throws Exception {
String url = "jdbc:arrow-flight-sql://<VPC地址>:<Arrow Flight SQL端口>"
+ "?useServerPrepStmts=false"
+ "&useSSL=false"
+ "&useEncryption=false";
String user = "<数据库账号>";
String password = "<数据库密码>";
Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");
try (Connection conn = DriverManager.getConnection(url, user, password);
Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("SELECT * FROM demo.example_table LIMIT 100");
while (rs.next()) {
System.out.println(rs.getInt(1) + " | " + rs.getString(2));
}
rs.close();
}
}
}通过 Java ADBC Driver 连接
如果下游分析直接基于 Arrow 列存(不需要 JDBC ResultSet 的行存格式),可以使用 Java ADBC Driver,直接拿到 Arrow VectorSchemaRoot,省一次行列转换。
添加 Maven 依赖
<properties>
<adbc.version>0.15.0</adbc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-core</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-manager</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
</dependencies>连接并迭代结果
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.adbc.core.AdbcStatement;
import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import java.util.HashMap;
import java.util.Map;
public class ArrowFlightAdbcExample {
public static void main(String[] args) throws Exception {
BufferAllocator allocator = new RootAllocator();
FlightSqlDriver driver = new FlightSqlDriver(allocator);
Map<String, Object> parameters = new HashMap<>();
AdbcDriver.PARAM_URI.set(parameters,
Location.forGrpcInsecure("<VPC地址>", <Arrow Flight SQL端口>).getUri().toString());
AdbcDriver.PARAM_USERNAME.set(parameters, "<数据库账号>");
AdbcDriver.PARAM_PASSWORD.set(parameters, "<数据库密码>");
try (AdbcDatabase db = driver.open(parameters);
AdbcConnection connection = db.connect();
AdbcStatement stmt = connection.createStatement()) {
stmt.setSqlQuery("SELECT * FROM demo.example_table LIMIT 100");
try (AdbcStatement.QueryResult result = stmt.executeQuery();
ArrowReader reader = result.getReader()) {
while (reader.loadNextBatch()) {
VectorSchemaRoot batch = reader.getVectorSchemaRoot();
System.out.println("rows in batch: " + batch.getRowCount());
// 在这里把 batch 接入 Spark / Flink / 自定义计算
}
}
}
allocator.close();
}
}三种 Java 连接方式如何选
方式 | 返回数据形态 | 适用场景 |
通过 JDBC Driver 连接(flight-sql-jdbc-core) | JDBC ResultSet(行存) | 已有按行处理的 Java 应用,希望换协议获得加速;BI 工具(DBeaver、Tableau 等)。 |
通过 Java ADBC Driver 连接(FlightSqlDriver) | Arrow VectorSchemaRoot(列存) | 下游基于 Arrow 列存或对接 Spark/Flink 等可消费 Arrow 的引擎。 |
jdbc:mysql(对照参考) | JDBC ResultSet(行存) | 仅查询小结果集或执行 DDL/DML,无需追求传输性能。 |
常见问题
Q:连接时报 i/o timeout, dial tcp x.x.x.x:8050 怎么办?
A:客户端拿到的 BE 真实 IP 不可达。请按以下顺序排查:
客户端是否能通过内网访问 SelectDB 实例。Arrow Flight 不支持公网访问;如客户端不在同 VPC,需通过 VPC 对等连接、云企业网(CEN)或专线先打通网络。
实例是否已开通「集群直连」(实例详情页「网络信息」面板),未开通时 BE 端口对客户端不可达。
客户端 IP 是否已加入实例白名单。
Q:Python ADBC 报 AttributeError: USERNAME。
A:请把 DatabaseOptions.USERNAME 改为 adbc_driver_manager.DatabaseOptions.USERNAME,FlightSQL 驱动自身的 DatabaseOptions 不包含账号密码键。
Q:Java 启动时报 module java.base does not "opens java.nio"。
A:JVM 启动参数添加 --add-opens=java.base/java.nio=ALL-UNNAMED。
Q:Arrow Flight 连接相比 jdbc:mysql 没有明显加速。
A:Arrow Flight 的优势在于批量传输,对小结果集(< 1 万行)和 DDL 优势不明显。建议在十万行以上的查询上做对比基准。