查询加速MCQA(MaxCompute Query Acceleration)是MaxCompute内建的功能,使用原生的MaxCompute SQL语言,并且支持所有的MaxCompute内建函数以及权限系统。本文为您介绍如何使用MCQA功能。
背景信息
- MaxCompute客户端:无缝接入,使用说明请参见MaxCompute客户端。
- DataWorks临时查询或数据开发:默认无缝接入,使用说明请参见DataWorks临时查询或数据开发。
- JDBC:无缝接入,使用说明请参见JDBC。
- SDK:需要配置Pom依赖,使用说明请参见基于Java SDK启用MCQA功能。
- MaxCompute Studio:需要配置SQL执行模式,使用说明请参见基于MaxCompute Studio启用MCQA功能
- PyODPS:需要调用查询加速执行方法,使用说明请参见基于PyODPS启用MCQA功能。
- SQLAlchemy:基于PyODPS无缝接入,使用说明请参见基于PyODPS,使用SQLAlchemy或其他支持SQLAlchemy接口的第三方工具实现查询加速。
MaxCompute客户端
您可以通过MaxCompute客户端开启MCQA功能,操作步骤如下:
DataWorks临时查询或数据开发
DataWorks的临时查询及手动业务流程模块默认开启MCQA功能,您无需手动开启。如果您需要关闭MCQA功能,请提工单处理。
JDBC
- 使用JDBC连接MaxCompute,详情请参见JDBC使用说明。如果您需要在该场景下开启MCQA功能,需要修改相关配置,详情请参见配置JDBC启用MCQA功能。
- 使用JDBC连接Tableau,对MaxCompute中的数据进行可视化分析,详情请参见配置JDBC使用Tableau。如果您需要在该场景下开启MCQA功能,需要修改相关配置,详情请参见基于JDBC配置Tableau启用MCQA功能。
- 使用JDBC连接SQL Workbench/J,使用SQL Workbench/J对MaxCompute中的数据执行SQL语句,详情请参见配置JDBC使用SQL Workbench/J。如果您需要在该场景下开启MCQA功能,需要修改相关配置,详情请参见基于JDBC配置SQLWorkBench启用MCQA功能。
配置JDBC启用MCQA功能
使用JDBC连接MaxCompute时,您可以通过执行如下操作开启MCQA功能。使用JDBC连接MaxCompute的操作详情请参见JDBC使用说明。
基于JDBC配置Tableau启用MCQA功能
服务器增加interactiveMode=true
属性,用于开启MCQA功能。建议您同步增加enableOdpsLogger=true
属性,用于打印日志。配置操作详情请参见配置JDBC使用Tableau。
http://service.cn-beijing.maxcompute.aliyun.com/api?project=****_beijing&interactiveMode=true&enableOdpsLogger=true&autoSelectLimit=1000000000"
如果只对项目空间中的部分表进行Tableau操作,您可以在服务器参数中增加table_list=table_name1, table_name2
属性选择需要的表,表之间用英文逗号(,)分隔。如果表过多,会导致Tableau打开缓慢,强烈建议使用此方式只载入需要的表。示例如下。http://service.cn-beijing.maxcompute.aliyun.com/api?project=****_beijing&interactiveMode=true&enableOdpsLogger=true&autoSelectLimit=1000000000"&table_list=orders,customers
对于有大量分区的表不建议把所有分区的数据都设置成数据源,可以筛选需要的分区,或通过自定义SQL获取需要的数据。基于JDBC配置SQLWorkBench启用MCQA功能
完成JDBC驱动配置后,在Profile配置界面修改已填写的JDBC URL,支持SQLWorkBench使用MCQA功能。Profile配置操作详情请参见配置JDBC使用SQL Workbench/J。
jdbc:odps:<MaxCompute_endpoint>?project=<MaxCompute_project_name>&accessId=<AccessKey
ID>&accessKey=<AccessKey Secret>&charset=UTF-8&interactiveMode=true&autoSelectLimit=1000000000"
,参数说明如下:
- maxcompute_endpoint:MaxCompute服务所在区域的Endpoint。详情请参见Endpoint。
- maxcompute_project_name:MaxCompute项目空间名称。
- AccessKey ID:有访问指定项目空间权限的AccessKey ID。
- AccessKey Secret:AccessKey ID对应的AccessKey Secret。
- charset=UTF-8:字符集编码格式。
- interactiveMode:MCQA功能开关,
true
表示开启MCQA功能。 - autoSelectLimit:数据量超过100万限制时,需要配置此参数。
基于Java SDK启用MCQA功能
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.35.5-public</version>
</dependency>
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.ResultSet;
import com.aliyun.odps.sqa.*;
import java.io.IOException;
import java.util.*;
public class SQLExecutorExample {
public static void SimpleExample() {
// 设置账号和项目信息。
Account account = new AliyunAccount("<your_access_id>", "<your_access_key>");
Odps odps = new Odps(account);
odps.setDefaultProject("<your_project_name>");
odps.setEndpoint("http://service.<regionid>.maxcompute.aliyun.com/api");
// 准备构建SQLExecutor。
SQLExecutorBuilder builder = SQLExecutorBuilder.builder();
SQLExecutor sqlExecutor = null;
try {
// run in offline mode or run in interactive mode
if (false) {
// 创建一个默认执行离线SQL的Executor。
sqlExecutor = builder.odps(odps).executeMode(ExecuteMode.OFFLINE).build();
} else {
// 创建一个默认执行查询加速SQL的Executor,并且在查询加速模式失败后,自动回退到离线查询。
sqlExecutor = builder.odps(odps).executeMode(ExecuteMode.INTERACTIVE).fallbackPolicy(FallbackPolicy.alwaysFallbackPolicy()).build();
}
// 如果需要的话可以传入查询的特殊设置。
Map<String, String> queryHint = new HashMap<>();
queryHint.put("odps.sql.mapper.split.size", "128");
// 提交一个查询作业,支持传入Hint。
sqlExecutor.run("select count(1) from test_table;", queryHint);
// 列举一些支持的常用获取信息的接口。
// UUID
System.out.println("ExecutorId:" + sqlExecutor.getId());
// 当前查询作业的logview。
System.out.println("Logview:" + sqlExecutor.getLogView());
// 当前查询作业的Instance对象(Interactive模式多个查询作业可能为同一个Instance)。
System.out.println("InstanceId:" + sqlExecutor.getInstance().getId());
// 当前查询作业的阶段进度(Console的进度条)。
System.out.println("QueryStageProgress:" + sqlExecutor.getProgress());
// 当前查询作业的执行状态变化日志,例如回退信息。
System.out.println("QueryExecutionLog:" + sqlExecutor.getExecutionLog());
// 提供两种获取结果的接口。
if(false) {
// 直接获取全部查询作业结果,同步接口,可能会占用本线程直到查询成功或失败。
// 一次性读取全部结果数据到内存中,当数据量较大时不建议使用,可能会有内存问题。
List<Record> records = sqlExecutor.getResult();
printRecords(records);
} else {
// 获取查询结果的迭代器ResultSet,同步接口,可能会占用本线程直到查询成功或失败。
// 获取大量结果数据时推荐使用,分次读取查询结果。
ResultSet resultSet = sqlExecutor.getResultSet();
while (resultSet.hasNext()) {
printRecord(resultSet.next());
}
}
// run another query
sqlExecutor.run("select * from test_table;", new HashMap<>());
if(false) {
// 直接获取全部查询结果,同步接口,可能会占用本线程直到查询成功或失败。
// 一次性读取全部结果数据到内存中,当数据量较大时不建议使用,可能会有内存问题。
List<Record> records = sqlExecutor.getResult();
printRecords(records);
} else {
// 获取查询结果的迭代器ResultSet,同步接口,可能会占用本线程直到查询成功或失败。
// 获取大量结果数据时推荐使用,分次读取查询结果。
ResultSet resultSet = sqlExecutor.getResultSet();
while (resultSet.hasNext()) {
printRecord(resultSet.next());
}
}
} catch (OdpsException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (sqlExecutor != null) {
// 关闭Executor释放相关资源。
sqlExecutor.close();
}
}
}
// SQLExecutor can be reused by pool mode
public static void ExampleWithPool() {
// 设置账号和项目信息。
Account account = new AliyunAccount("your_access_id", "your_access_key");
Odps odps = new Odps(account);
odps.setDefaultProject("your_project_name");
odps.setEndpoint("http://service.<regionid>.maxcompute.aliyun.com/api");
// 通过连接池方式执行查询。
SQLExecutorPool sqlExecutorPool = null;
SQLExecutor sqlExecutor = null;
try {
// 准备连接池,设置连接池大小和默认执行模式。
SQLExecutorPoolBuilder builder = SQLExecutorPoolBuilder.builder();
builder.odps(odps)
.initPoolSize(1) // init pool executor number
.maxPoolSize(5) // max executors in pool
.executeMode(ExecuteMode.INTERACTIVE); // run in interactive mode
sqlExecutorPool = builder.build();
// 从连接池中获取一个Executor,如果不够将会在Max限制内新增Executor。
sqlExecutor = sqlExecutorPool.getExecutor();
// Executor具体用法和上一示例一致。
sqlExecutor.run("select count(1) from test_table;", new HashMap<>());
System.out.println("InstanceId:" + sqlExecutor.getId());
System.out.println("Logview:" + sqlExecutor.getLogView());
List<Record> records = sqlExecutor.getResult();
printRecords(records);
} catch (OdpsException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
sqlExecutor.close();
}
sqlExecutorPool.close();
}
private static void printRecord(Record record) {
for (int k = 0; k < record.getColumnCount(); k++) {
if (k != 0) {
System.out.print("\t");
}
if (record.getColumns()[k].getType().equals(OdpsType.STRING)) {
System.out.print(record.getString(k));
} else if (record.getColumns()[k].getType().equals(OdpsType.BIGINT)) {
System.out.print(record.getBigint(k));
} else {
System.out.print(record.get(k));
}
}
}
private static void printRecords(List<Record> records) {
for (Record record : records) {
printRecord(record);
System.out.println();
}
}
public static void main(String args[]) {
SimpleExample();
ExampleWithPool();
}
}
基于MaxCompute Studio启用MCQA功能
V3.5.0或以上版本的MaxCompute Studio插件支持MCQA功能,推荐您安装最新版本的MaxCompute Studio插件。更多安装MaxCompute Studio插件的操作,请参见安装MaxCompute Studio。
您在MaxCompute Studio的SQL编辑器中,选择带查询加速功能的SQL执行模式查询加速或加速失败重跑后,执行查询语句即可启用查询加速功能。如下图所示。

- 查询加速:使用查询加速功能执行SQL查询语句。
- 加速失败重跑:使用查询加速功能执行SQL查询语句失败后,回退到离线模式(即默认模式)执行SQL查询语句。
基于PyODPS启用MCQA功能
run_sql_interactive()
方法启用MCQA功能。命令示例如下。odps = ODPS(...)
instance = odps.run_sql_interactive('select count(*) from test_table')
# logview
print(instance.get_logview_address())
# 可选:如果使用Tunnel下载结果,则不需要客户端等待。
# 等待运行完毕,也可以使用wait_for_completion,区别在于wait_for_success将在任务失败时抛出异常。
instance.wait_for_success()
# 下载结果
# 显示ODPS返回的警告
print(instance.get_warnings())
# 用于屏显,若任务失败则返回错误信息
print(instance.get_printable_result())
# 从frontend下载结果
for each_record in instance.open_reader():
pass # each_record 是 odps.Record 类型
# 使用tunnel下载结果
for each_record in instance.open_reader(tunnel=True):
pass
run_sql_interactive()
方法可携带的参数如下:
service_name
:可选。用于指定查询加速会话的名称。service_startup_timeout
:可选。用于指定等待Session Attach的超时时间。force_reattach
:可选。run_sql_interactive()
默认会自动复用Session。用于强制重新Attach Session。
基于PyODPS,使用SQLAlchemy或其他支持SQLAlchemy接口的第三方工具实现查询加速
interactive_mode=true
:必填。查询加速功能总开关。reuse_odps=true
:可选。打开强制复用连接,对于部分第三方工具(例如Apache SuperSet),打开此选项可提高性能。
fallback_policy=<policy1>,<policy2>,...
参数,完善处理逻辑。与JDBC的配置项类似,控制加速失败的回退行为。
generic
:默认为False,设置为True时,表示发生未知错误时回退到离线模式。noresource
:默认为False,设置为True时,表示发生资源不足问题时回退到离线模式。upgrading
:默认为False,设置为True时,表示升级期间回退到离线模式。timeout
:默认为False,设置为True时,表示执行超时时回退到离线模式。unsupported
:默认为False,设置为True时,表示遇到MCQA不支持的场景时回退到离线模式。default
:等同于同时指定unsupported、upgrading、noresource和timeout。如果连接串中未指定fallback_policy
,则此项为默认值。all
:默认为False,设置为True时,表示在以上几种场景下全部回退到离线模式,仅在JDBC 3.2.3及以上版本才支持。
odps://<access_id>:<access_key>@<project>/?endpoint=<endpoint>&interactive_mode=true&reuse_odps=true&fallback_policy=unsupported,upgrading,noresource