
MaxCompute:SQL

Last Updated: Aug 11, 2023

PyODPS supports basic SQL statements that are supported on MaxCompute. This topic describes how to use SQL statements in PyODPS.

Background information

The following table describes the methods that you can use to execute MaxCompute SQL statements in PyODPS.

Method name                Description

execute_sql()/run_sql()    Executes SQL statements. For more information, see Execution of SQL statements.

open_reader()              Obtains the execution results of SQL statements. For more information, see Obtain the execution results of SQL statements.

Note

Not all SQL statements can be executed by using the methods of the MaxCompute entry object. Some SQL statements that can be executed on the MaxCompute client may fail when you call the execute_sql() or run_sql() method. You must use other methods to execute statements that are not DDL or DML statements. For example, use the run_security_query method to execute GRANT or REVOKE statements, and use the run_xflow or execute_xflow method to run XFlow tasks.

When you write a user-defined function (UDF) in Python, if the resources referenced by the UDF dynamically change, you can configure an alias for the old resource in the execute_sql() method and use the alias as the name of the new resource. This way, you do not need to delete the UDF or create another UDF. For more information, see Configure resource aliases.

Execution of SQL statements

This section describes how to execute MaxCompute SQL statements in PyODPS by using the execute_sql() and run_sql() methods:

  • Parameters

    • statement: the SQL statement that you want to execute.

    • hints: the runtime parameters. The hints parameter is of the dict type.

  • Return value

    The execute_sql() and run_sql() methods return a task instance. For more information, see Task instances.

  • Examples

    • Example 1

      Execute SQL statements.

      o.execute_sql('select * from table_name')  # Execute the statement in synchronous mode. The call blocks until the execution of the SQL statement is complete. 
      instance = o.run_sql('select * from table_name')  # Execute the statement in asynchronous mode. The call returns immediately. 
      print(instance.get_logview_address())  # Obtain the Logview URL of the instance. 
      instance.wait_for_success()  # Block until the execution of the SQL statement is complete.

    • Example 2

      Configure the hints parameter for SQL statements.

      o.execute_sql('select * from pyodps_iris', hints={'odps.stage.mapper.split.size': 16})

      You can also configure runtime parameters globally by setting sql.settings. The following code shows an example. After you configure runtime parameters globally, they take effect each time you execute an SQL statement. For more information about the parameters that you can configure globally, see Flag parameters.

      from odps import options
      options.sql.settings = {'odps.stage.mapper.split.size': 16}
      o.execute_sql('select * from pyodps_iris')  # The hints parameter is automatically configured based on global settings.
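Per-call hints can be combined with the global sql.settings. The following sketch assumes an initialized MaxCompute entry object o and assumes that PyODPS merges the per-call hints over the global settings, so a value passed in hints wins for that single call; verify this behavior in your PyODPS version before relying on it.

```python
from odps import options

# Global default applied to every SQL execution in this session.
options.sql.settings = {'odps.stage.mapper.split.size': 16}

# Assumption: the per-call hints dict is merged over the global
# settings, so this call runs with split size 32 while later calls
# without hints fall back to the global value of 16.
o.execute_sql('select * from pyodps_iris',
              hints={'odps.stage.mapper.split.size': 32})
```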

Obtain the execution results of SQL statements

You can call the open_reader() method to read execution results of SQL statements. The return value varies based on the following scenarios:

  • If table data is read, structured data is returned. In this case, use a for loop to traverse each record.

    with o.execute_sql('select * from table_name').open_reader() as reader:
        for record in reader: # Process each record. 
            print(record)
  • If you run commands, such as desc, unstructured data is returned. In this case, read the reader.raw attribute to obtain the command output.

    with o.execute_sql('desc table_name').open_reader() as reader:
        print(reader.raw)

By default, the open_reader() method calls the old Result interface. This may cause a timeout or limit the number of data records that can be obtained. You can use one of the following methods to make PyODPS call InstanceTunnel instead:

  • Add options.tunnel.use_instance_tunnel = True to the script.

  • Configure open_reader(tunnel=True). The following code shows an example. For PyODPS V0.7.7.1 and later, you can use the open_reader() method to read full data.

    with o.execute_sql('select * from table_name').open_reader(tunnel=True) as reader:
        for record in reader:
            print(record)
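The first method above is a single global switch. As a minimal sketch (configuration only; no job is submitted):

```python
from odps import options

# Make open_reader() use InstanceTunnel by default, so full SQL
# execution results can be read instead of going through the
# old Result interface.
options.tunnel.use_instance_tunnel = True
```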
Note
  • If you are using an earlier version of MaxCompute or an error occurs when PyODPS calls InstanceTunnel, PyODPS generates an alert and automatically downgrades the call object to the old Result interface. You can identify the cause of the issue based on the information about the alert.

  • If your MaxCompute version supports only the old Result interface and you want to read all execution results of SQL statements, you can write the execution results to another table and then use the open_reader() method to read data from the table. This operation is subject to the security configurations of your MaxCompute project.

  • For more information about InstanceTunnel, see InstanceTunnel.

By default, PyODPS does not limit the amount of data that can be read from an instance. However, the project owner may configure protection settings for the MaxCompute project to limit the amount of data that can be read from the instance. In this case, data can be read only in read-limit mode. In this mode, the number of rows that can be read is restricted based on project configurations. In most cases, a maximum of 10,000 rows can be read. If PyODPS detects that the amount of data read from the instance is restricted and options.tunnel.limit_instance_tunnel is not configured, PyODPS automatically enables the read-limit mode.

  • If your project is protected and you want to manually enable the read-limit mode, you can add the configuration limit=True to the open_reader() method, such as open_reader(limit=True). You can also add the configuration options.tunnel.limit_instance_tunnel = True to enable the read-limit mode.

  • In specific environments, such as DataWorks, options.tunnel.limit_instance_tunnel may be set to True by default. In this case, if you want to read all data from the instance, you must add the tunnel=True and limit=False configurations for the open_reader() method, such as open_reader(tunnel=True, limit=False).

  • Important

    If your project is protected and the tunnel=True and limit=False settings cannot remove the read restriction, contact the project owner to grant the required read permissions.
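Combining the settings above, reading all data from an instance in an environment such as DataWorks can be sketched as follows, assuming an initialized MaxCompute entry object o, an existing table named table_name, and a project that is not protected:

```python
# Read the full result set through InstanceTunnel, explicitly
# disabling the read-limit mode that some environments enable
# by default.
with o.execute_sql('select * from table_name').open_reader(
        tunnel=True, limit=False) as reader:
    for record in reader:
        print(record)
```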

Configure resource aliases

If the resources referenced by a UDF dynamically change, you can configure an alias for the old resource and use the alias as the name of the new resource. This way, you do not need to delete the UDF or create another UDF.

from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# Write a row of data that contains only one value: 1. 
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# Set the alias of resource res1 as the name of resource res2. You do not need to modify the UDF or resource. 
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])

In specific scenarios, you must specify biz_id when you execute SQL statements; otherwise, an error occurs. In this case, you can configure biz_id in the global options to fix the error.

from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')