All Products
Search
Document Center

MaxCompute:Use the Go SDK

Last Updated:Mar 25, 2026

The MaxCompute Go SDK lets you run SQL, upload and download data, and manage tables, partitions, schemas, and instances — all from Go code. This topic covers each capability with working examples.

Note

All examples load credentials from a config.ini file. Any of the authentication methods described in Configure access credentials of the Go SDK work with an ODPS object.

Prerequisites

Before you begin, ensure that you have:

  • A MaxCompute project

  • An ODPS object created from valid credentials

  • The Go SDK installed (github.com/aliyun/aliyun-odps-go-sdk/odps)

Execute SQL

The SDK provides two ways to run MaxCompute SQL: the SQLTask object and the MaxCompute SQL Driver.

Use SQLTask

SQLTask.Run() submits a SQL statement and returns an instance object representing the job. How you retrieve results depends on the size of the result set:

| Result size            | Retrieval method                                   |
| ---------------------- | -------------------------------------------------- |
| Fewer than 10,000 rows | Read directly from the instance using a CSV Reader |
| More than 10,000 rows  | Download via Tunnel                                |

Execute SELECT and get results (fewer than 10,000 rows)

package main

import (
"fmt"
"io"
"log"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

// main submits a SELECT statement through SQLTask, waits for the job to
// finish, then reads the result set back as CSV records. This path is
// suitable only for result sets smaller than 10,000 rows.
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
	log.Fatalf("%+v", err)
}

// Build an ODPS client from an AccessKey pair and endpoint.
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)

sql := "select * from all_types_demo where p1>0 or p2 > '';"

// SQL engine parameters, for example, odps.sql.skewjoin
var hints map[string]string = nil

// Run submits the job asynchronously and returns a handle to the instance.
sqlTask := odps.NewSqlTask("select_demo", sql, hints)
ins, err := sqlTask.Run(odpsIns, odpsIns.DefaultProjectName())
if err != nil {
	log.Fatalf("%+v", err)
}

// Block until the job terminates successfully.
err = ins.WaitForSuccess()
if err != nil {
	log.Fatalf("%+v", err)
}

// NOTE(review): the boolean argument presumably controls whether a header
// row is included — confirm against the SDK docs.
csvReader, err := sqlTask.GetSelectResultAsCsv(ins, true)
if err != nil {
	log.Fatalf("%+v", err)
}

// Read rows until io.EOF; any other read error is fatal.
for {
	record, err := csvReader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Printf("%v\n", record)
}
}

Execute SELECT and download results via Tunnel (more than 10,000 rows)

For large result sets, create a Tunnel download session against the instance ID after the job completes.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)

// main runs a SELECT with SQLTask and downloads the result through a Tunnel
// instance-result download session, paging through records with
// OpenRecordReader. Intended for result sets larger than 10,000 rows.
func main() {
// Configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
    log.Fatalf("%+v", err)
}

aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)

sql := "select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';"

// SQL engine parameters, for example, odps.sql.skewjoin
var hints map[string]string = nil

sqlTask := odps.NewSqlTask("select_demo", sql, hints)

// Run SQL using the quota associated with the project
projectName := odpsIns.DefaultProjectName()

ins, err := sqlTask.Run(odpsIns, projectName)
if err != nil {
	log.Fatalf("%+v", err)
}

err = ins.WaitForSuccess()
if err != nil {
	log.Fatalf("%+v", err)
}

// Generate a LogView URL for detailed job information
// (the second argument is the URL's validity period — confirm its unit
// against the SDK docs).
lv := odpsIns.LogView()
lvUrl, err := lv.GenerateLogView(ins, 10)
if err != nil {
	log.Fatalf("%+v", err)
}

println(lvUrl)

project := odpsIns.DefaultProject()
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
	log.Fatalf("%+v", err)
}

// Create a Tunnel instance
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
// The download session reads the finished instance's result by instance ID.
session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
if err != nil {
	log.Fatalf("%+v", err)
}

// Page through the result: read up to `step` records per reader, advancing
// `start` by the number of callbacks observed.
start := 0
step := 200000
recordCount := session.RecordCount()
schema := session.Schema()
total := 0

for start < recordCount {
	reader, err := session.OpenRecordReader(start, step, 0, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	count := 0
	err = reader.Iterator(func(record data.Record, _err error) {
		// Count every callback so `start` advances past this page.
		count += 1

		if _err != nil {
			return
		}

		// Print the record as comma-separated name=value pairs.
		for i, d := range record {
			if d == nil {
				fmt.Printf("%s=null", schema.Columns[i].Name)
			} else {
				fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
			}

			if i < record.Len()-1 {
				fmt.Printf(", ")
			} else {
				fmt.Println()
			}
		}
	})

	if err != nil {
		log.Fatalf("%+v", err)
	}

	start += count
	total += count
	log.Println(count)
	if err = reader.Close(); err != nil {
		log.Fatalf("%+v", err)
	}
}

println("total count ", total)
}

Use MaxCompute SQL Driver

The MaxCompute SQL Driver implements the standard database/sql interface, so you can use familiar Go patterns like db.Exec and db.Query.

Execute a CREATE TABLE statement

The DSN format is http://<endpoint>?project=<project>&<hints>. Set your AccessKey in the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECURITY environment variables.

package main

import (
"database/sql"
"log"

// Blank import registers the "odps" driver with database/sql; without it,
// sql.Open("odps", ...) fails with "unknown driver".
_ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
)

// main creates a table through Go's standard database/sql interface using
// the MaxCompute ("odps") driver and a DSN that carries engine hints.
//
// NOTE(review): sql.Open("odps", ...) requires the odps driver to be
// registered, which happens in the sqldriver package's init — confirm this
// program imports github.com/aliyun/aliyun-odps-go-sdk/sqldriver (a blank
// import is enough).
func main() {
// Configure AccessKey in environment variables: ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECURITY
dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"

// sql.Open validates lazily; connection errors may only surface on first use.
db, err := sql.Open("odps", dsn)
if err != nil {
	log.Fatalf("%+v", err)
}

sqlStr := "create table table_with_date (date_col DATE);"
_, err = db.Exec(sqlStr)
if err != nil {
	log.Fatalf("%+v", err)
}
}

Execute a SELECT statement and read results

This example reads the DSN from a config.ini file and uses named parameters to bind query values.

package main

import (
"database/sql"
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
"log"
"reflect"
)

// main runs a parameterized SELECT through the database/sql interface and
// scans each row into the driver's nullable types, printing every column as
// name=value.
//
// Fixes over the original example:
//   - rows.Err() is checked after the loop, so an error that terminates
//     rows.Next() early is no longer silently dropped.
//   - rows.Close() (deferred) releases the result set.
//   - the type switch binds the asserted value once (switch v := r.(type))
//     instead of re-asserting in every case.
func main() {
	config, err := sqldriver.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	dsn := config.FormatDsn()
	// or dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Named parameters (@name) are bound via sql.Named below.
	selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"

	rows, err := db.Query(
		selectSql,
		sql.Named("bigint_type", 100000000000),
		sql.Named("p1", 20),
		sql.Named("p2", "hangzhou"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	defer rows.Close()

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Allocate one scan destination per column using the driver's ScanType.
	record := make([]interface{}, len(columnTypes))

	for i, columnType := range columnTypes {
		record[i] = reflect.New(columnType.ScanType()).Interface()
		t := reflect.TypeOf(record[i])

		fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
	}

	columns, err := rows.Columns()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for rows.Next() {
		err = rows.Scan(record...)
		if err != nil {
			log.Fatalf("%+v", err)
		}

		for i, r := range record {
			rr := r.(sqldriver.NullAble)

			if rr.IsNull() {
				fmt.Printf("%s=NULL", columns[i])
			} else {
				// Print according to the concrete destination type.
				switch v := r.(type) {
				case *sqldriver.NullInt8:
					fmt.Printf("%s=%d", columns[i], v.Int8)
				case *sqldriver.NullInt16:
					fmt.Printf("%s=%d", columns[i], v.Int16)
				case *sqldriver.NullInt32:
					fmt.Printf("%s=%d", columns[i], v.Int32)
				case *sqldriver.NullInt64:
					fmt.Printf("%s=%d", columns[i], v.Int64)
				case *sqldriver.Binary:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullFloat32:
					fmt.Printf("%s=%f", columns[i], v.Float32)
				case *sqldriver.NullFloat64:
					fmt.Printf("%s=%f", columns[i], v.Float64)
				case *sqldriver.Decimal:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullString:
					fmt.Printf("%s=%s", columns[i], v.String)
				case *sqldriver.NullDate:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullDateTime:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullTimeStamp:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullBool:
					fmt.Printf("%s=%v", columns[i], v.Bool)
				case *sqldriver.Map:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.Array:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.Struct:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.Json:
					fmt.Printf("%s=%s", columns[i], v)
				}
			}

			if i < len(record)-1 {
				fmt.Printf(", ")
			} else {
				fmt.Print("\n\n")
			}
		}
	}

	// Surface any error that ended the iteration early.
	if err := rows.Err(); err != nil {
		log.Fatalf("%+v", err)
	}
}

Go data type mapping

When scanning query results, use the following Go types for each MaxCompute SQL type:

| MaxCompute SQL type | Go type (via sqldriver)  |
| ------------------- | ------------------------ |
| TINYINT             | *sqldriver.NullInt8      |
| SMALLINT            | *sqldriver.NullInt16     |
| INT                 | *sqldriver.NullInt32     |
| BIGINT              | *sqldriver.NullInt64     |
| FLOAT               | *sqldriver.NullFloat32   |
| DOUBLE              | *sqldriver.NullFloat64   |
| DECIMAL             | *sqldriver.Decimal       |
| VARCHAR             | *sqldriver.NullString    |
| CHAR                | *sqldriver.NullString    |
| STRING              | *sqldriver.NullString    |
| BINARY              | *sqldriver.Binary        |
| BOOLEAN             | *sqldriver.NullBool      |
| DATE                | *sqldriver.NullDate      |
| DATETIME            | *sqldriver.NullDateTime  |
| TIMESTAMP           | *sqldriver.NullTimeStamp |
| MAP                 | *sqldriver.Map           |
| ARRAY               | *sqldriver.Array         |
| STRUCT              | *sqldriver.Struct        |
| JSON                | *sqldriver.Json          |

Use sqldriver.NullAble.IsNull() to check for null values before reading the underlying field.

Data upload and download

Use the Tunnel for batch uploads and downloads. For streaming writes, see Overview of streaming tunnel.

Initialize the Tunnel

All upload and download operations start with a Tunnel instance. The following example gets the Tunnel endpoint from the project and creates a Tunnel object.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)

// main resolves the project's Tunnel endpoint and constructs a Tunnel
// instance — the starting point for all batch upload/download operations.
//
// Fix over the original example: the final print used the builtin
// println("%+v", tunnelIns), which does not interpret format verbs (it
// would print the literal string "%+v" followed by the value's address).
// fmt.Printf formats the value as intended.
func main() {
	// Get configuration information from the configuration file
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	// Get Tunnel endpoint
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)

	// Initialize Tunnel
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

	fmt.Printf("%+v\n", tunnelIns)
}

Batch upload

Batch upload follows three steps:

  1. Create an UploadSession, specifying the target table or partition and a compression algorithm.

  2. Open one or more Writer objects from the session. Each Writer uploads one data block, identified by an integer block ID. Multiple Writers can run concurrently to increase throughput.

  3. After all Writers finish, call UploadSession.Commit with the list of block IDs to finalize the upload.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)

// main uploads demo data to a partition of all_types_demo using a Tunnel
// upload session: three goroutines each write one block (identified by an
// integer block ID), then all blocks are committed together.
//
// Fix over the original example: "success to commit all blocks" was logged
// BEFORE the error from session.Commit was checked, so a failed commit was
// reported as a success. The log now runs only after the error check.
func main() {
	// Get configuration information from the configuration file
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	// Initialize Tunnel
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

	// Create an UploadSession, specify the table or partition to write to
	session, err := tunnelIns.CreateUploadSession(
		project.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	writerNum := 3
	blockIds := make([]int, writerNum)
	for i := 0; i < writerNum; i++ {
		blockIds[i] = i
	}

	// Each writer reports its final status (nil on success) on this channel.
	errChan := make(chan error, writerNum)

	// Concurrently upload data through multiple writers; each writer has a blockId as its identity
	for _, blockId := range blockIds {
		blockId := blockId // capture a per-iteration copy for the goroutine
		go func() {
			schema := session.Schema()
			record, err := makeRecord(schema)
			if err != nil {
				errChan <- err
				return
			}

			recordWriter, err := session.OpenRecordWriter(blockId)
			if err != nil {
				errChan <- err
				return
			}

			// Write the same demo record 100 times into this block.
			for i := 0; i < 100; i++ {
				err = recordWriter.Write(record)

				if err != nil {
					_ = recordWriter.Close()
					errChan <- err
					return
				}
			}
			err = recordWriter.Close()
			if err == nil {
				fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount())
			}

			errChan <- err
		}()
	}

	// Wait for all Writers to finish
	for i := 0; i < writerNum; i++ {
		err := <-errChan

		if err != nil {
			log.Fatalf("%+v", err)
		}
	}

	// Commit all blocks to finalize the upload; report success only after
	// the commit has actually succeeded.
	err = session.Commit(blockIds)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	log.Println("success to commit all blocks")
}

// makeRecord builds one demo record whose values line up, in order, with the
// columns of the all_types_demo table described by schema. The value order
// must match the schema's column order exactly.
//
// NOTE(review): columns 15, 16, and 17 are assumed to be the map, array,
// and struct columns — confirm against the table definition.
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
varchar, _ := data.NewVarChar(500, "varchar")
char, _ := data.NewVarChar(254, "char")
s := data.String("hello world")
date, _ := data.NewDate("2022-10-19")
datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")

// Map column value: {"hello": 1, "world": 2}
mapType := schema.Columns[15].Type.(datatype.MapType)
mapData := data.NewMapWithType(mapType)
err := mapData.Set("hello", 1)
if err != nil {
	return nil, err
}

err = mapData.Set("world", 2)
if err != nil {
	return nil, err
}

// Array column value: ["a", "b"]
arrayType := schema.Columns[16].Type.(datatype.ArrayType)
arrayData := data.NewArrayWithType(arrayType)
err = arrayData.Append("a")
if err != nil {
	return nil, err
}

err = arrayData.Append("b")
if err != nil {
	return nil, err
}

// Struct column value: {arr: ["x", "y"], name: "tom"}
structType := schema.Columns[17].Type.(datatype.StructType)
structData := data.NewStructWithTyp(structType)

arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
err = arr.Append("x")
if err != nil {
	return nil, err
}
err = arr.Append("y")
if err != nil {
	return nil, err
}
err = structData.SetField("arr", arr)
if err != nil {
	return nil, err
}
err = structData.SetField("name", "tom")
if err != nil {
	return nil, err
}

// One value per column, in column order.
record := []data.Data{
	data.TinyInt(1),
	data.SmallInt(32767),
	data.Int(100),
	data.BigInt(100000000000),
	data.Binary("binary"),
	data.Float(3.14),
	data.Double(3.1415926),
	data.NewDecimal(38, 18, "3.1415926"),
	varchar,
	char,
	s,
	date,
	datetime,
	timestamp,
	data.Bool(true),
	mapData,
	arrayData,
	structData,
}

return record, nil
}

Batch download

Batch download follows two steps:

  1. Create a DownloadSession, specifying the source table or partition and a compression algorithm.

  2. Open a Reader from the session and iterate records in pages.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)

// main downloads all records from a partition of all_types_demo through a
// Tunnel download session, reading them page by page.
//
// Fix over the original example: the iterator callback never incremented
// `count`, so `start += count` added 0 on every pass and the outer loop
// spun forever, re-reading the first page. The callback now counts each
// record it sees (matching the instance-result download example).
func main() {
	// Read configuration information from the configuration file
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	// Get Tunnel endpoint
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

	// Create a DownloadSession, specify the table or partition to read
	session, err := tunnelIns.CreateDownloadSession(
		project.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	recordCount := session.RecordCount()
	fmt.Printf("record count is %d\n", recordCount)

	// Page through the data: up to `step` records per reader, advancing
	// `start` by the number of records actually seen.
	start := 0
	step := 100001
	total := 0
	schema := session.Schema()

	for start < recordCount {
		reader, err := session.OpenRecordReader(start, step, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}

		count := 0
		err = reader.Iterator(func(record data.Record, _err error) {
			// Count every callback so pagination advances (bug fix).
			count += 1

			if _err != nil {
				return
			}

			// Print the record as comma-separated name=value pairs.
			for i, d := range record {
				if d == nil {
					fmt.Printf("%s=null", schema.Columns[i].Name)
				} else {
					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
				}

				if i < record.Len()-1 {
					fmt.Printf(", ")
				} else {
					fmt.Println()
				}
			}
		})

		if err != nil {
			log.Fatalf("%+v", err)
		}

		start += count
		total += count

		log.Println(count)

		if err = reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}
	}

	println("total count ", total)
}

Streaming upload

Write records to a table or partition without a commit step. The PackWriter buffers records and flushes them to MaxCompute when the buffer reaches the size threshold.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)

// main streams records into a partition of all_types_demo using a
// stream-upload session. Unlike batch upload there is no commit step:
// each Flush of the PackWriter delivers the buffered records to MaxCompute.
func main() {
// Get configuration information from the configuration file
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
	log.Fatalf("%+v", err)
}

// Initialize ODPS
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.DefaultProject()

// Get Tunnel endpoint
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
	log.Fatalf("%+v", err)
}
fmt.Println("tunnel endpoint: " + tunnelEndpoint)

tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

session, err := tunnelIns.CreateStreamUploadSession(
	project.Name(),
	"all_types_demo",
	tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
	tunnel.SessionCfg.WithCreatePartition(), // Create a new partition if the specified partition does not exist
	tunnel.SessionCfg.WithDefaultDeflateCompressor(),
)

if err != nil {
	log.Fatalf("%+v", err)
}

packWriter := session.OpenRecordPackWriter()

// Perform two buffer-and-flush rounds.
for i := 0; i < 2; i++ {
	record, err := makeRecord(session.Schema())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Add records to packWriter until the buffer reaches the size threshold
	// (64 is a small demo threshold; presumably in bytes — TODO confirm).
	for packWriter.DataSize() < 64 {
		err = packWriter.Append(record)
		if err != nil {
			log.Fatalf("%+v", err)
		}
	}

	// Flush buffered data to MaxCompute
	traceId, recordCount, bytesSend, err := packWriter.Flush()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
		traceId, recordCount, bytesSend,
	)
}
}

// makeRecord builds one demo record whose values line up, in order, with the
// columns of the all_types_demo table described by schema. The value order
// must match the schema's column order exactly.
//
// NOTE(review): columns 15, 16, and 17 are assumed to be the map, array,
// and struct columns — confirm against the table definition.
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
varchar, _ := data.NewVarChar(500, "varchar")
char, _ := data.NewVarChar(254, "char")
s := data.String("hello world")
date, _ := data.NewDate("2022-10-19")
datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")

// Map column value: {"hello": 1, "world": 2}
mapType := schema.Columns[15].Type.(datatype.MapType)
mapData := data.NewMapWithType(mapType)
err := mapData.Set("hello", 1)
if err != nil {
	return nil, err
}

err = mapData.Set("world", 2)
if err != nil {
	return nil, err
}

// Array column value: ["a", "b"]
arrayType := schema.Columns[16].Type.(datatype.ArrayType)
arrayData := data.NewArrayWithType(arrayType)
err = arrayData.Append("a")
if err != nil {
	return nil, err
}

err = arrayData.Append("b")
if err != nil {
	return nil, err
}

// Struct column value: {arr: ["x", "y"], name: "tom"}
structType := schema.Columns[17].Type.(datatype.StructType)
structData := data.NewStructWithTyp(structType)

arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
err = arr.Append("x")
if err != nil {
	return nil, err
}
err = arr.Append("y")
if err != nil {
	return nil, err
}
err = structData.SetField("arr", arr)
if err != nil {
	return nil, err
}
err = structData.SetField("name", "tom")
if err != nil {
	return nil, err
}

// One value per column, in column order.
record := []data.Data{
	data.TinyInt(1),
	data.SmallInt(32767),
	data.Int(100),
	data.BigInt(100000000000),
	data.Binary("binary"),
	data.Float(3.14),
	data.Double(3.1415926),
	data.NewDecimal(38, 18, "3.1415926"),
	varchar,
	char,
	s,
	date,
	datetime,
	timestamp,
	data.Bool(true),
	mapData,
	arrayData,
	structData,
}

return record, nil
}

Schema management

A schema is a namespace within a MaxCompute project that organizes tables into logical groups. The following example shows how to list, create, delete, and inspect schemas.

package schema

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

// main demonstrates schema (namespace) operations: listing schemas and
// tables, switching the current schema, creating and deleting a schema,
// and reading schema metadata.
//
// NOTE(review): the List callbacks below ignore their err parameter, and
// the Create/Delete/Load calls discard their results; production code
// should check each of these. Also note the enclosing file declares
// "package schema" while defining func main — confirm the intended package.
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
	log.Fatalf("%+v", err)
}

aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)

// schemas represents all schemas in the default project
schemas := odpsIns.Schemas()
// List all schemas in the default project
schemas.List(func(schema *odps.Schema, err error) {
	print(schema.Name() + "\n")
})

// Set the current schema
odpsIns.SetCurrentSchemaName("default_schema")
// If no schema is specified, the operation queries tables under the "default" schema.
table := odpsIns.Table("table") // actually, the table name is "project.default_schema.table"
print(table.SchemaName())

// List all tables in a specified schema, for example, schema_A
tablesInSchemaA := odps.NewTables(odpsIns, conf.ProjectName, "schema_A")
tablesInSchemaA.List(func(table *odps.Table, err error) {
	print(table.Name() + "\n")
})

// Create a schema
schemas.Create("new_schema", false, "comment")

// Delete a schema
schemas.Delete("to_delete_schema")

// Get schema metadata
schema := schemas.Get("new_schema")
schema.Load()

// Metadata accessors (results discarded here; shown for reference only).
schema.Name()
schema.ProjectName()
schema.Type()
schema.Owner()
schema.Comment()
schema.CreateTime()
schema.ModifiedTime()
}

Table management

Create tables

Create a regular table

The following example creates a partitioned table named all_types_demo with columns covering all supported MaxCompute data types.

Note

If your project uses data type version 1.0, pass the odps.sql.type.system.odps2=true and odps.sql.decimal.odps2=true hints to enable type version 2.0 support.

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)

// main creates the partitioned table all_types_demo with one column for
// each supported MaxCompute data type, partitioned by (p1 bigint, p2 string)
// and a 2-day lifecycle. The column order here is the order later examples
// (makeRecord) rely on.
func main() {
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
	log.Fatalf("%+v", err)
}

aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)

// Column definitions, one per data type (c1..c20).
c1 := tableschema.Column{
	Name: "tiny_int_type",
	Type: datatype.TinyIntType,
}

c2 := tableschema.Column{
	Name: "small_int_type",
	Type: datatype.SmallIntType,
}

c3 := tableschema.Column{
	Name: "int_type",
	Type: datatype.IntType,
}

c4 := tableschema.Column{
	Name: "bigint_type",
	Type: datatype.BigIntType,
}

c5 := tableschema.Column{
	Name: "binary_type",
	Type: datatype.BinaryType,
}

c6 := tableschema.Column{
	Name: "float_type",
	Type: datatype.FloatType,
}

c7 := tableschema.Column{
	Name: "double_type",
	Type: datatype.DoubleType,
}

c8 := tableschema.Column{
	Name: "decimal_type",
	Type: datatype.NewDecimalType(10, 8),
}

c9 := tableschema.Column{
	Name: "varchar_type",
	Type: datatype.NewVarcharType(500),
}

c10 := tableschema.Column{
	Name: "char_type",
	Type: datatype.NewCharType(254),
}

c11 := tableschema.Column{
	Name: "string_type",
	Type: datatype.StringType,
}

c12 := tableschema.Column{
	Name: "date_type",
	Type: datatype.DateType,
}

c13 := tableschema.Column{
	Name: "datetime_type",
	Type: datatype.DateTimeType,
}

c14 := tableschema.Column{
	Name: "timestamp_type",
	Type: datatype.TimestampType,
}

c15 := tableschema.Column{
	Name: "timestamp_ntz_type",
	Type: datatype.TimestampNtzType,
}

c16 := tableschema.Column{
	Name: "boolean_type",
	Type: datatype.BooleanType,
}

// Complex types: map<string,bigint>, array<string>,
// struct<arr:array<string>, name:string>, and json.
mapType := datatype.NewMapType(datatype.StringType, datatype.BigIntType)
arrayType := datatype.NewArrayType(datatype.StringType)
structType := datatype.NewStructType(
	datatype.NewStructFieldType("arr", arrayType),
	datatype.NewStructFieldType("name", datatype.StringType),
)
jsonType := datatype.NewJsonType()

c17 := tableschema.Column{
	Name: "map_type",
	Type: mapType,
}

c18 := tableschema.Column{
	Name: "array_type",
	Type: arrayType,
}

c19 := tableschema.Column{
	Name: "struct_type",
	Type: structType,
}

c20 := tableschema.Column{
	Name: "json_type",
	Type: jsonType,
}

// Partition columns.
p1 := tableschema.Column{
	Name: "p1",
	Type: datatype.BigIntType,
}

p2 := tableschema.Column{
	Name: "p2",
	Type: datatype.StringType,
}

// Assemble the table schema from the columns above.
schemaBuilder := tableschema.NewSchemaBuilder()
schemaBuilder.Name("all_types_demo").
	Columns(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, c20).
	PartitionColumns(p1, p2).
	Lifecycle(2) // Unit: day

schema := schemaBuilder.Build()
tablesIns := odpsIns.Tables()
// If the project uses data type version 1.0, enable version 2.0 via hints.
hints := make(map[string]string)
hints["odps.sql.type.system.odps2"] = "true"
hints["odps.sql.decimal.odps2"] = "true"

// Create the table (the boolean presumably means "if not exists" — TODO confirm).
err = tablesIns.Create(schema, true, hints, nil)
if err != nil {
	log.Fatalf("%+v", err)
}
}

Create a hash-clustered table

For details about hash clustering, see Hash clustering.

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)

// main creates a hash-clustered partitioned table, equivalent to the SQL
// statement shown in the comment below.
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)

if err != nil {
	log.Fatalf("%+v", err)
}

aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)

// Create a table equivalent to:
// CREATE TABLE test_hash_clustering (a string, b string, c bigint)
// PARTITIONED BY (dt string)
// CLUSTERED BY (c)
// SORTED BY (c) INTO 1024 BUCKETS;

c1 := tableschema.Column{
	Name: "a",
	Type: datatype.StringType,
}

c2 := tableschema.Column{
	Name: "b",
	Type: datatype.StringType,
}

c3 := tableschema.Column{
	Name: "c",
	Type: datatype.BigIntType,
}

// Partition column
pc := tableschema.Column{
	Name: "dt",
	Type: datatype.StringType,
}

sb := tableschema.NewSchemaBuilder()

sb.Name("test_hash_clustering"). // table name
				Columns(c1, c2, c3).                        // columns
				PartitionColumns(pc).                       // partition columns
				ClusterType(tableschema.CLUSTER_TYPE.Hash). // hash clustering
				ClusterColumns([]string{c3.Name}).          // cluster key
	// Sort key (optional). In most cases, keep it consistent with the cluster key for best optimization results.
	ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
	ClusterBucketNum(1024) // bucket count (optional)

tablesIns := odpsIns.Tables()

schema := sb.Build()

// Print the generated DDL for inspection. NOTE(review): "test_cluster"
// looks like a placeholder project name — confirm it matches your project.
println(schema.ToSQLString("test_cluster", "", true))

err = tablesIns.Create(schema, true, nil, nil)
if err != nil {
	log.Fatalf("%+v", err)
}

}

Create a range-clustered table

For details about range clustering, see Range clustering.

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)

// main creates a range-clustered partitioned table, equivalent to the SQL
// statement shown in the comment below.
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)

if err != nil {
	log.Fatalf("%+v", err)
}

aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)

// Create a table equivalent to:
// CREATE TABLE test_range_clustering (a string, b string, c bigint)
// PARTITIONED BY (dt string)
// RANGE CLUSTERED BY (c)
// SORTED BY (c)
// INTO 1024 BUCKETS;

c1 := tableschema.Column{
	Name: "a",
	Type: datatype.StringType,
}

c2 := tableschema.Column{
	Name: "b",
	Type: datatype.StringType,
}

c3 := tableschema.Column{
	Name: "c",
	Type: datatype.BigIntType,
}

// Partition column
pc := tableschema.Column{
	Name: "dt",
	Type: datatype.StringType,
}

sb := tableschema.NewSchemaBuilder()

sb.Name("test_range_clustering"). // table name
				Columns(c1, c2, c3).                         // columns
				PartitionColumns(pc).                        // partition columns
				ClusterType(tableschema.CLUSTER_TYPE.Range). // range clustering
				ClusterColumns([]string{c3.Name}).           // range cluster key
	// Sort key (optional). In most cases, keep it consistent with the cluster key for best optimization results.
	ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
	ClusterBucketNum(1024) // bucket count (optional)

tablesIns := odpsIns.Tables()

schema := sb.Build()

// Print the generated DDL for inspection. NOTE(review): "test_cluster"
// looks like a placeholder project name — confirm it matches your project.
println(schema.ToSQLString("test_cluster", "", true))

err = tablesIns.Create(schema, true, nil, nil)
if err != nil {
	log.Fatalf("%+v", err)
}

}

Create an OSS external table

For details about external tables, see Create an OSS external table.

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)

// main creates an OSS external table that uses a user-defined storage
// handler, equivalent to:
//
//	create external table if not exists `testCreateExternalTableWithUserDefinedStorageHandler` (
//	    `a` STRING,
//	    `b` STRING,
//	    `c` BIGINT
//	)
//	comment 'External table using user defined TextStorageHandler'
//	partitioned by (`dt` STRING)
//	stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
//	with serdeproperties('odps.text.option.delimiter'='|', 'my.own.option'='value')
//	location 'oss://full/uri/path/to/oss/directory/'
//	lifecycle 10;
//
// Fix over the original example: the error returned by ToExternalSQLString
// is now checked before the generated DDL is printed, instead of being
// silently overwritten by the next assignment.
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	tableName := "testCreateExternalTableWithUserDefinedStorageHandler"

	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}

	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}

	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}

	// Partition column
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}

	sb := tableschema.NewSchemaBuilder()

	sb.Name(tableName). // table name
		Columns(c1, c2, c3).  // columns
		PartitionColumns(pc). // partition columns
		Location("oss://full/uri/path/to/oss/directory/").
		StorageHandler("com.aliyun.odps.udf.example.text.TextStorageHandler").
		Comment("External table using user defined TextStorageHandler").
		Lifecycle(10)

	tablesIns := odpsIns.Tables()

	schema := sb.Build()

	// Define the SerDe properties
	serDeProperties := map[string]string{
		"odps.text.option.delimiter": "|",
		"my.own.option":              "value",
	}

	// Define the hints
	hints := map[string]string{
		"odps.sql.preparse.odps2":       "lot",
		"odps.sql.planner.mode":         "lot",
		"odps.sql.planner.parser.odps2": "true",
		"odps.sql.ddl.odps2":            "true",
		"odps.compiler.output.format":   "lot,pot",
	}

	// Preview the DDL that will be executed; check the error before using sql.
	sql, err := schema.ToExternalSQLString(odpsIns.DefaultProjectName(), "", true, serDeProperties, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	print(sql)

	err = tablesIns.CreateExternal(schema, true, serDeProperties, nil, hints, nil)

	if err != nil {
		log.Fatalf("%+v", err)
	}
}

List tables

Filter the list by name prefix or table type (ManagedTable, VirtualView, ExternalTable).

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Read credentials and endpoint settings from the ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	ts := client.Project(cfg.ProjectName).Tables()

	// Print the name of every managed table whose name starts with "all_type".
	ts.List(
		func(t *odps.Table, err error) {
			if err != nil {
				log.Fatalf("%+v", err)
			}
			println(t.Name())
		},
		// Keep only tables whose name starts with this prefix.
		odps.TableFilter.NamePrefix("all_type"),
		// Keep only managed tables; VirtualView and ExternalTable are the other types.
		odps.TableFilter.Type(odps.ManagedTable),
	)
}

Get table information

Check whether a table exists

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	// Look up the table handle and ask the service whether it exists.
	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	ok, err := tbl.Exists()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(ok)
}

Get table size and row count

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Fetch table metadata from the service before reading any property.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Table size in bytes.
	size := tbl.Size()
	println("size = ", size)

	// Number of rows in the table.
	rowCount := tbl.RecordNum()
	println("rowCount = ", rowCount)
}

Get table timestamps

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Fetch table metadata from the service before reading any property.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// When the table was created.
	createTime := tbl.CreatedTime()
	println("create time = ", createTime)

	// When the last DDL statement touched the table.
	lastDDLTime := tbl.LastDDLTime()
	println("last ddl time = ", lastDDLTime)

	// When the table data was last modified.
	lastModifiedTime := tbl.LastModifiedTime()
	println("last modified time = ", lastModifiedTime)
}

Get table owner

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Fetch table metadata from the service before reading any property.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Account that owns the table.
	owner := tbl.Owner()
	println("owner is ", owner)
}

Get table type

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Fetch table metadata from the service before reading any property.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Table type (managed, virtual view, external, ...).
	t := tbl.Type()
	println("type is ", t)
}

Get table structure

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"strings"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("test_cluster_table")

	// Fetch table metadata from the service before reading the schema.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Full schema: name, lifecycle, columns, partitions, clustering.
	schema := tbl.Schema()

	println("table name = ", schema.TableName)
	if tbl.LifeCycle() > 0 {
		println("table lifecycle = ", tbl.LifeCycle())
	}

	// Regular (data) columns.
	for _, col := range schema.Columns {
		fmt.Printf("column %s %s comment '%s'\n", col.Name, col.Type, col.Comment)
	}

	// Partition columns.
	for _, col := range schema.PartitionColumns {
		fmt.Printf("partition column %s %s comment '%s'\n", col.Name, col.Type, col.Comment)
	}

	// Clustering details, present only for clustered tables.
	if info := schema.ClusterInfo; info.ClusterType != "" {
		println("cluster type = ", info.ClusterType)
		println("cluster columns = ", strings.Join(info.ClusterCols, ", "))
		println("cluster bucket num = ", info.BucketNum)
	}
}

Delete a table

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	// Drop the table; fails if it does not exist.
	tbl := client.Project(cfg.ProjectName).Tables().Get("test_cluster_table")
	if err = tbl.Delete(); err != nil {
		log.Fatalf("%+v", err)
	}
}

Partition management

The SDK supports two levels of partition retrieval: partition values (lightweight, returns strings) and partition objects (full metadata including size and timestamps).

List partition values

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Lightweight listing: only the partition value strings, no metadata.
	values, err := tbl.GetPartitionValues()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for _, pv := range values {
		println(pv)
	}
}

List partition objects

Each partition object includes size, CreatedTime, LastDDLTime, and LastModifiedTime.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Full partition objects, including size and timestamps.
	partitions, err := tbl.GetPartitions()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("get %d partitions\n", len(partitions))

	for _, part := range partitions {
		fmt.Printf(
			"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
			part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size(),
		)
	}
}

Get a single partition

Get basic partition information

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Fetch one partition by its full partition spec.
	part, err := tbl.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size(),
	)
}

Get extended partition information

Call LoadExtended() to retrieve additional fields: archive status, lifecycle, and physical size.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	part, err := tbl.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Basic partition metadata, available without an extra round trip.
	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size(),
	)

	// Extended fields (archive status, lifecycle, physical size) require a
	// separate load call.
	if err = part.LoadExtended(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"isArchived=%t, lifeCycle=%d, physicalSize=%d",
		part.IsArchivedEx(), part.LifeCycleEx(), part.PhysicalSizeEx(),
	)
}

Add partitions

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Add two partitions; the first argument (true) means "if not exists".
	if err = tbl.AddPartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}); err != nil {
		log.Fatalf("%+v", err)
	}
}

Delete partitions

package main

import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Drop two partitions; the first argument (true) means "if exists".
	if err = tbl.DeletePartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}); err != nil {
		log.Fatalf("%+v", err)
	}
}

Instance management

MaxCompute returns an instance object each time you submit a SQL job. The instance tracks job status and results.

List instances

Filter instances by time range and status. The example below lists terminated instances within a specific window.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"time"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Time window used to filter instances. The layout string is Go's
	// reference time; the literals below are known-valid, so the parse
	// errors are intentionally ignored.
	timeFormat := "2006-01-02 15:04:05"
	startTime, _ := time.Parse(timeFormat, "2024-10-11 02:15:30")
	endTime, _ := time.Parse(timeFormat, "2024-10-13 06:22:02")

	// Callback invoked once per matching instance. Unlike Tables.List, the
	// callback receives no error argument; the original example re-checked
	// the captured outer `err`, which is always nil at this point — that
	// dead check has been removed.
	var f = func(i *odps.Instance) {
		println(
			fmt.Sprintf(
				"%s, %s, %s, %s, %s",
				i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
			))
	}

	instances := odpsIns.Instances()
	instances.List(
		f,
		odps.InstanceFilter.TimeRange(startTime, endTime),
		odps.InstanceFilter.Status(odps.InstanceTerminated),
	)
}

Get instance information

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	// Look up an instance by ID and pull its metadata from the service.
	ins := client.Instances().Get("<yourInstanceId>")
	if err = ins.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("owner=%s\n", ins.Owner())
	fmt.Printf("status=%s\n", ins.Status())
	fmt.Printf("startTime=%s\n", ins.StartTime())
	fmt.Printf("endTime=%s\n", ins.EndTime())
	fmt.Printf("result=%+v\n", ins.TaskResults())
}

Permission management

Run permission management commands through the SDK using SecurityManager.RunQuery. For an overview of the MaxCompute authorization model, see Overview. The following example runs DESC ROLE to inspect a role's permissions.

package main

import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
"log"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	// The security manager issues permission-management commands over the
	// same REST client used by the ODPS object.
	sm := security.NewSecurityManager(client.RestClient(), cfg.ProjectName)

	// Inspect the permissions granted to a role.
	result, err := sm.RunQuery("desc role role_project_admin;", true, "")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(fmt.Sprintf("ok: %s", result))
}

LogView

LogView lets you inspect submitted jobs and diagnose execution issues. For more information, see Use LogView to view job information. The following example generates a LogView URL for a SQL job.

package main

import (
"log"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

func main() {
	// Load connection settings from the local config file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Make the configured project the default one.
	client.SetDefaultProjectName(cfg.ProjectName)

	sql := "select * from all_types_demo where p1>0 or p2 > '';"

	// SQL engine parameters, for example, odps.sql.skewjoin
	var hints map[string]string = nil

	// Submit the query as a named SQL task.
	task := odps.NewSqlTask("select", sql, hints)

	// Run under the quota associated with the default project.
	ins, err := task.Run(client, client.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Build a LogView URL (valid for 1 hour) for the submitted instance.
	url, err := client.LogView().GenerateLogView(ins, 1)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(url)
}