The MaxCompute Go SDK lets you run SQL, upload and download data, and manage tables, partitions, schemas, and instances — all from Go code. This topic covers each capability with working examples.
All examples load credentials from a config.ini file. Any of the authentication methods described in Configure access credentials of the Go SDK work with an ODPS object.
Prerequisites
Before you begin, ensure that you have:
A MaxCompute project
An ODPS object created from valid credentials
The Go SDK installed (
github.com/aliyun/aliyun-odps-go-sdk/odps)
Execute SQL
The SDK provides two ways to run MaxCompute SQL: the SQLTask object and the MaxCompute SQL Driver.
Use SQLTask
SQLTask.Run() submits a SQL statement and returns an instance object representing the job. How you retrieve results depends on the size of the result set:
| Result size | Retrieval method |
|---|---|
| Fewer than 10,000 rows | Read directly from the instance using a CSV Reader |
| More than 10,000 rows | Download via Tunnel |
Execute SELECT and get results (fewer than 10,000 rows)
package main
import (
"fmt"
"io"
"log"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
sql := "select * from all_types_demo where p1>0 or p2 > '';"
// SQL engine parameters, for example, odps.sql.skewjoin
var hints map[string]string = nil
sqlTask := odps.NewSqlTask("select_demo", sql, hints)
ins, err := sqlTask.Run(odpsIns, odpsIns.DefaultProjectName())
if err != nil {
log.Fatalf("%+v", err)
}
err = ins.WaitForSuccess()
if err != nil {
log.Fatalf("%+v", err)
}
csvReader, err := sqlTask.GetSelectResultAsCsv(ins, true)
if err != nil {
log.Fatalf("%+v", err)
}
for {
record, err := csvReader.Read()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf("%v\n", record)
}
}Execute SELECT and download results via Tunnel (more than 10,000 rows)
For large result sets, create a Tunnel download session against the instance ID after the job completes.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
func main() {
// Configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
sql := "select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';"
// SQL engine parameters, for example, odps.sql.skewjoin
var hints map[string]string = nil
sqlTask := odps.NewSqlTask("select_demo", sql, hints)
// Run SQL using the quota associated with the project
projectName := odpsIns.DefaultProjectName()
ins, err := sqlTask.Run(odpsIns, projectName)
if err != nil {
log.Fatalf("%+v", err)
}
err = ins.WaitForSuccess()
if err != nil {
log.Fatalf("%+v", err)
}
// Generate a LogView URL for detailed job information
lv := odpsIns.LogView()
lvUrl, err := lv.GenerateLogView(ins, 10)
if err != nil {
log.Fatalf("%+v", err)
}
println(lvUrl)
project := odpsIns.DefaultProject()
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
log.Fatalf("%+v", err)
}
// Create a Tunnel instance
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
if err != nil {
log.Fatalf("%+v", err)
}
start := 0
step := 200000
recordCount := session.RecordCount()
schema := session.Schema()
total := 0
for start < recordCount {
reader, err := session.OpenRecordReader(start, step, 0, nil)
if err != nil {
log.Fatalf("%+v", err)
}
count := 0
err = reader.Iterator(func(record data.Record, _err error) {
count += 1
if _err != nil {
return
}
for i, d := range record {
if d == nil {
fmt.Printf("%s=null", schema.Columns[i].Name)
} else {
fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
}
if i < record.Len()-1 {
fmt.Printf(", ")
} else {
fmt.Println()
}
}
})
if err != nil {
log.Fatalf("%+v", err)
}
start += count
total += count
log.Println(count)
if err = reader.Close(); err != nil {
log.Fatalf("%+v", err)
}
}
println("total count ", total)
}Use MaxCompute SQL Driver
The MaxCompute SQL Driver implements the standard database/sql interface, so you can use familiar Go patterns like db.Exec and db.Query.
Execute a CREATE TABLE statement
The DSN format is http://<endpoint>?project=<project>&<hints>. Set your AccessKey in the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECURITY environment variables.
package main
import (
"database/sql"
"log"
)
func main() {
// Configure AccessKey in environment variables: ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECURITY
dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"
db, err := sql.Open("odps", dsn)
if err != nil {
log.Fatalf("%+v", err)
}
sqlStr := "create table table_with_date (date_col DATE);"
_, err = db.Exec(sqlStr)
if err != nil {
log.Fatalf("%+v", err)
}
}Execute a SELECT statement and read results
This example reads the DSN from a config.ini file and uses named parameters to bind query values.
package main
import (
"database/sql"
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
"log"
"reflect"
)
func main() {
config, err := sqldriver.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
dsn := config.FormatDsn()
// or dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"
db, err := sql.Open("odps", dsn)
if err != nil {
log.Fatalf("%+v", err)
}
selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"
rows, err := db.Query(
selectSql,
sql.Named("bigint_type", 100000000000),
sql.Named("p1", 20),
sql.Named("p2", "hangzhou"),
)
if err != nil {
log.Fatalf("%+v", err)
}
columnTypes, err := rows.ColumnTypes()
if err != nil {
log.Fatalf("%+v", err)
}
record := make([]interface{}, len(columnTypes))
for i, columnType := range columnTypes {
record[i] = reflect.New(columnType.ScanType()).Interface()
t := reflect.TypeOf(record[i])
fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
}
columns, err := rows.Columns()
if err != nil {
log.Fatalf("%+v", err)
}
for rows.Next() {
err = rows.Scan(record...)
if err != nil {
log.Fatalf("%+v", err)
}
for i, r := range record {
rr := r.(sqldriver.NullAble)
if rr.IsNull() {
fmt.Printf("%s=NULL", columns[i])
} else {
switch r.(type) {
case *sqldriver.NullInt8:
fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt8).Int8)
case *sqldriver.NullInt16:
fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt16).Int16)
case *sqldriver.NullInt32:
fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt32).Int32)
case *sqldriver.NullInt64:
fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt64).Int64)
case *sqldriver.Binary:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullFloat32:
fmt.Printf("%s=%f", columns[i], r.(*sqldriver.NullFloat32).Float32)
case *sqldriver.NullFloat64:
fmt.Printf("%s=%f", columns[i], r.(*sqldriver.NullFloat64).Float64)
case *sqldriver.Decimal:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullString:
fmt.Printf("%s=%s", columns[i], r.(*sqldriver.NullString).String)
case *sqldriver.NullDate:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullDateTime:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullTimeStamp:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullBool:
fmt.Printf("%s=%v", columns[i], r.(*sqldriver.NullBool).Bool)
case *sqldriver.Map:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.Array:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.Struct:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.Json:
fmt.Printf("%s=%s", columns[i], r)
}
}
if i < len(record)-1 {
fmt.Printf(", ")
} else {
fmt.Print("\n\n")
}
}
}
}Go data type mapping
When scanning query results, use the following Go types for each MaxCompute SQL type:
| MaxCompute SQL type | Go type (via sqldriver) |
|---|---|
| TINYINT | *sqldriver.NullInt8 |
| SMALLINT | *sqldriver.NullInt16 |
| INT | *sqldriver.NullInt32 |
| BIGINT | *sqldriver.NullInt64 |
| FLOAT | *sqldriver.NullFloat32 |
| DOUBLE | *sqldriver.NullFloat64 |
| DECIMAL | *sqldriver.Decimal |
| VARCHAR | *sqldriver.NullString |
| CHAR | *sqldriver.NullString |
| STRING | *sqldriver.NullString |
| BINARY | *sqldriver.Binary |
| BOOLEAN | *sqldriver.NullBool |
| DATE | *sqldriver.NullDate |
| DATETIME | *sqldriver.NullDateTime |
| TIMESTAMP | *sqldriver.NullTimeStamp |
| MAP | *sqldriver.Map |
| ARRAY | *sqldriver.Array |
| STRUCT | *sqldriver.Struct |
| JSON | *sqldriver.Json |
Use sqldriver.NullAble.IsNull() to check for null values before reading the underlying field.
Data upload and download
Use the Tunnel for batch uploads and downloads. For streaming writes, see Overview of streaming tunnel.
Initialize the Tunnel
All upload and download operations start with a Tunnel instance. The following example gets the Tunnel endpoint from the project and creates a Tunnel object.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
func main() {
// Get configuration information from the configuration file
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
// Initialize ODPS
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.DefaultProject()
// Get Tunnel endpoint
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Println("tunnel endpoint: " + tunnelEndpoint)
// Initialize Tunnel
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
println("%+v", tunnelIns)
}Batch upload
Batch upload follows three steps:
Create an
UploadSession, specifying the target table or partition and a compression algorithm.Open one or more
Writerobjects from the session. Each Writer uploads one data block, identified by an integer block ID. Multiple Writers can run concurrently to increase throughput.After all Writers finish, call
UploadSession.Commitwith the list of block IDs to finalize the upload.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
func main() {
// Get configuration information from the configuration file
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
// Initialize ODPS
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.DefaultProject()
// Initialize Tunnel
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Println("tunnel endpoint: " + tunnelEndpoint)
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
// Create an UploadSession, specify the table or partition to write to
session, err := tunnelIns.CreateUploadSession(
project.Name(),
"all_types_demo",
tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
tunnel.SessionCfg.WithDefaultDeflateCompressor(),
)
if err != nil {
log.Fatalf("%+v", err)
}
writerNum := 3
blockIds := make([]int, writerNum)
for i := 0; i < writerNum; i++ {
blockIds[i] = i
}
errChan := make(chan error, writerNum)
// Concurrently upload data through multiple writers; each writer has a blockId as its identity
for _, blockId := range blockIds {
blockId := blockId
go func() {
schema := session.Schema()
record, err := makeRecord(schema)
if err != nil {
errChan <- err
return
}
recordWriter, err := session.OpenRecordWriter(blockId)
if err != nil {
errChan <- err
return
}
for i := 0; i < 100; i++ {
err = recordWriter.Write(record)
if err != nil {
_ = recordWriter.Close()
errChan <- err
return
}
}
err = recordWriter.Close()
if err == nil {
fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount())
}
errChan <- err
}()
}
// Wait for all Writers to finish
for i := 0; i < writerNum; i++ {
err := <-errChan
if err != nil {
log.Fatalf("%+v", err)
}
}
// Commit all blocks to finalize the upload
err = session.Commit(blockIds)
log.Println("success to commit all blocks")
if err != nil {
log.Fatalf("%+v", err)
}
}
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
varchar, _ := data.NewVarChar(500, "varchar")
char, _ := data.NewVarChar(254, "char")
s := data.String("hello world")
date, _ := data.NewDate("2022-10-19")
datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")
mapType := schema.Columns[15].Type.(datatype.MapType)
mapData := data.NewMapWithType(mapType)
err := mapData.Set("hello", 1)
if err != nil {
return nil, err
}
err = mapData.Set("world", 2)
if err != nil {
return nil, err
}
arrayType := schema.Columns[16].Type.(datatype.ArrayType)
arrayData := data.NewArrayWithType(arrayType)
err = arrayData.Append("a")
if err != nil {
return nil, err
}
err = arrayData.Append("b")
if err != nil {
return nil, err
}
structType := schema.Columns[17].Type.(datatype.StructType)
structData := data.NewStructWithTyp(structType)
arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
err = arr.Append("x")
if err != nil {
return nil, err
}
err = arr.Append("y")
if err != nil {
return nil, err
}
err = structData.SetField("arr", arr)
if err != nil {
return nil, err
}
err = structData.SetField("name", "tom")
if err != nil {
return nil, err
}
record := []data.Data{
data.TinyInt(1),
data.SmallInt(32767),
data.Int(100),
data.BigInt(100000000000),
data.Binary("binary"),
data.Float(3.14),
data.Double(3.1415926),
data.NewDecimal(38, 18, "3.1415926"),
varchar,
char,
s,
date,
datetime,
timestamp,
data.Bool(true),
mapData,
arrayData,
structData,
}
return record, nil
}Batch download
Batch download follows two steps:
Create a
DownloadSession, specifying the source table or partition and a compression algorithm.Open a
Readerfrom the session and iterate records in pages.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
func main() {
// Read configuration information from the configuration file
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
// Initialize ODPS
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.DefaultProject()
// Get Tunnel endpoint
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Println("tunnel endpoint: " + tunnelEndpoint)
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
// Create a DownloadSession, specify the table or partition to read
session, err := tunnelIns.CreateDownloadSession(
project.Name(),
"all_types_demo",
tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
)
if err != nil {
log.Fatalf("%+v", err)
}
recordCount := session.RecordCount()
fmt.Printf("record count is %d\n", recordCount)
start := 0
step := 100001
total := 0
schema := session.Schema()
for start < recordCount {
reader, err := session.OpenRecordReader(start, step, nil)
if err != nil {
log.Fatalf("%+v", err)
}
count := 0
err = reader.Iterator(func(record data.Record, _err error) {
if _err != nil {
return
}
for i, d := range record {
if d == nil {
fmt.Printf("%s=null", schema.Columns[i].Name)
} else {
fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
}
if i < record.Len()-1 {
fmt.Printf(", ")
} else {
fmt.Println()
}
}
})
if err != nil {
log.Fatalf("%+v", err)
}
start += count
total += count
log.Println(count)
if err = reader.Close(); err != nil {
log.Fatalf("%+v", err)
}
}
println("total count ", total)
}Streaming upload
Write records to a table or partition without a commit step. The PackWriter buffers records and flushes them to MaxCompute when the buffer reaches the size threshold.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
func main() {
// Get configuration information from the configuration file
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
// Initialize ODPS
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.DefaultProject()
// Get Tunnel endpoint
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Println("tunnel endpoint: " + tunnelEndpoint)
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
session, err := tunnelIns.CreateStreamUploadSession(
project.Name(),
"all_types_demo",
tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
tunnel.SessionCfg.WithCreatePartition(), // Create a new partition if the specified partition does not exist
tunnel.SessionCfg.WithDefaultDeflateCompressor(),
)
if err != nil {
log.Fatalf("%+v", err)
}
packWriter := session.OpenRecordPackWriter()
for i := 0; i < 2; i++ {
record, err := makeRecord(session.Schema())
if err != nil {
log.Fatalf("%+v", err)
}
// Add records to packWriter until the buffer reaches the size threshold
for packWriter.DataSize() < 64 {
err = packWriter.Append(record)
if err != nil {
log.Fatalf("%+v", err)
}
}
// Flush buffered data to MaxCompute
traceId, recordCount, bytesSend, err := packWriter.Flush()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf(
"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
traceId, recordCount, bytesSend,
)
}
}
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
varchar, _ := data.NewVarChar(500, "varchar")
char, _ := data.NewVarChar(254, "char")
s := data.String("hello world")
date, _ := data.NewDate("2022-10-19")
datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")
mapType := schema.Columns[15].Type.(datatype.MapType)
mapData := data.NewMapWithType(mapType)
err := mapData.Set("hello", 1)
if err != nil {
return nil, err
}
err = mapData.Set("world", 2)
if err != nil {
return nil, err
}
arrayType := schema.Columns[16].Type.(datatype.ArrayType)
arrayData := data.NewArrayWithType(arrayType)
err = arrayData.Append("a")
if err != nil {
return nil, err
}
err = arrayData.Append("b")
if err != nil {
return nil, err
}
structType := schema.Columns[17].Type.(datatype.StructType)
structData := data.NewStructWithTyp(structType)
arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
err = arr.Append("x")
if err != nil {
return nil, err
}
err = arr.Append("y")
if err != nil {
return nil, err
}
err = structData.SetField("arr", arr)
if err != nil {
return nil, err
}
err = structData.SetField("name", "tom")
if err != nil {
return nil, err
}
record := []data.Data{
data.TinyInt(1),
data.SmallInt(32767),
data.Int(100),
data.BigInt(100000000000),
data.Binary("binary"),
data.Float(3.14),
data.Double(3.1415926),
data.NewDecimal(38, 18, "3.1415926"),
varchar,
char,
s,
date,
datetime,
timestamp,
data.Bool(true),
mapData,
arrayData,
structData,
}
return record, nil
}Schema management
A schema is a namespace within a MaxCompute project that organizes tables into logical groups. The following example shows how to list, create, delete, and inspect schemas.
package schema
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
// schemas represents all schemas in the default project
schemas := odpsIns.Schemas()
// List all schemas in the default project
schemas.List(func(schema *odps.Schema, err error) {
print(schema.Name() + "\n")
})
// Set the current schema
odpsIns.SetCurrentSchemaName("default_schema")
// If no schema is specified, the operation queries tables under the "default" schema.
table := odpsIns.Table("table") // actually, the table name is "project.default_schema.table"
print(table.SchemaName())
// List all tables in a specified schema, for example, schema_A
tablesInSchemaA := odps.NewTables(odpsIns, conf.ProjectName, "schema_A")
tablesInSchemaA.List(func(table *odps.Table, err error) {
print(table.Name() + "\n")
})
// Create a schema
schemas.Create("new_schema", false, "comment")
// Delete a schema
schemas.Delete("to_delete_schema")
// Get schema metadata
schema := schemas.Get("new_schema")
schema.Load()
schema.Name()
schema.ProjectName()
schema.Type()
schema.Owner()
schema.Comment()
schema.CreateTime()
schema.ModifiedTime()
}Table management
Create tables
Create a regular table
The following example creates a partitioned table named all_types_demo with columns covering all supported MaxCompute data types.
If your project uses data type version 1.0, pass the odps.sql.type.system.odps2=true and odps.sql.decimal.odps2=true hints to enable type version 2.0 support.
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
func main() {
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
c1 := tableschema.Column{
Name: "tiny_int_type",
Type: datatype.TinyIntType,
}
c2 := tableschema.Column{
Name: "small_int_type",
Type: datatype.SmallIntType,
}
c3 := tableschema.Column{
Name: "int_type",
Type: datatype.IntType,
}
c4 := tableschema.Column{
Name: "bigint_type",
Type: datatype.BigIntType,
}
c5 := tableschema.Column{
Name: "binary_type",
Type: datatype.BinaryType,
}
c6 := tableschema.Column{
Name: "float_type",
Type: datatype.FloatType,
}
c7 := tableschema.Column{
Name: "double_type",
Type: datatype.DoubleType,
}
c8 := tableschema.Column{
Name: "decimal_type",
Type: datatype.NewDecimalType(10, 8),
}
c9 := tableschema.Column{
Name: "varchar_type",
Type: datatype.NewVarcharType(500),
}
c10 := tableschema.Column{
Name: "char_type",
Type: datatype.NewCharType(254),
}
c11 := tableschema.Column{
Name: "string_type",
Type: datatype.StringType,
}
c12 := tableschema.Column{
Name: "date_type",
Type: datatype.DateType,
}
c13 := tableschema.Column{
Name: "datetime_type",
Type: datatype.DateTimeType,
}
c14 := tableschema.Column{
Name: "timestamp_type",
Type: datatype.TimestampType,
}
c15 := tableschema.Column{
Name: "timestamp_ntz_type",
Type: datatype.TimestampNtzType,
}
c16 := tableschema.Column{
Name: "boolean_type",
Type: datatype.BooleanType,
}
mapType := datatype.NewMapType(datatype.StringType, datatype.BigIntType)
arrayType := datatype.NewArrayType(datatype.StringType)
structType := datatype.NewStructType(
datatype.NewStructFieldType("arr", arrayType),
datatype.NewStructFieldType("name", datatype.StringType),
)
jsonType := datatype.NewJsonType()
c17 := tableschema.Column{
Name: "map_type",
Type: mapType,
}
c18 := tableschema.Column{
Name: "array_type",
Type: arrayType,
}
c19 := tableschema.Column{
Name: "struct_type",
Type: structType,
}
c20 := tableschema.Column{
Name: "json_type",
Type: jsonType,
}
p1 := tableschema.Column{
Name: "p1",
Type: datatype.BigIntType,
}
p2 := tableschema.Column{
Name: "p2",
Type: datatype.StringType,
}
schemaBuilder := tableschema.NewSchemaBuilder()
schemaBuilder.Name("all_types_demo").
Columns(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, c20).
PartitionColumns(p1, p2).
Lifecycle(2) // Unit: day
schema := schemaBuilder.Build()
tablesIns := odpsIns.Tables()
// If the project uses data type version 1.0, enable version 2.0 via hints.
hints := make(map[string]string)
hints["odps.sql.type.system.odps2"] = "true"
hints["odps.sql.decimal.odps2"] = "true"
err = tablesIns.Create(schema, true, hints, nil)
if err != nil {
log.Fatalf("%+v", err)
}
}Create a hash-clustered table
For details about hash clustering, see Hash clustering.
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
// Create a table equivalent to:
// CREATE TABLE test_hash_clustering (a string, b string, c bigint)
// PARTITIONED BY (dt string)
// CLUSTERED BY (c)
// SORTED by (c) INTO 1024 BUCKETS;
c1 := tableschema.Column{
Name: "a",
Type: datatype.StringType,
}
c2 := tableschema.Column{
Name: "b",
Type: datatype.StringType,
}
c3 := tableschema.Column{
Name: "c",
Type: datatype.BigIntType,
}
// Partition column
pc := tableschema.Column{
Name: "dt",
Type: datatype.StringType,
}
sb := tableschema.NewSchemaBuilder()
sb.Name("test_hash_clustering"). // table name
Columns(c1, c2, c3). // columns
PartitionColumns(pc). // partition columns
ClusterType(tableschema.CLUSTER_TYPE.Hash). // hash clustering
ClusterColumns([]string{c3.Name}). // cluster key
// Sort key (optional). In most cases, keep it consistent with the cluster key for best optimization results.
ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
ClusterBucketNum(1024) // bucket count (optional)
tablesIns := odpsIns.Tables()
schema := sb.Build()
println(schema.ToSQLString("test_cluster", "", true))
err = tablesIns.Create(schema, true, nil, nil)
if err != nil {
log.Fatalf("%+v", err)
}
}Create a range-clustered table
For details about range clustering, see Range clustering.
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
// Create a table equivalent to:
// CREATE TABLE test_range_clustering (a string, b string, c int)
// PARTITIONED BY (dt int)
// RANGE CLUSTERED BY (c)
// SORTED by (c)
// INTO 1024 BUCKETS;
c1 := tableschema.Column{
Name: "a",
Type: datatype.StringType,
}
c2 := tableschema.Column{
Name: "b",
Type: datatype.StringType,
}
c3 := tableschema.Column{
Name: "c",
Type: datatype.BigIntType,
}
// Partition column
pc := tableschema.Column{
Name: "dt",
Type: datatype.StringType,
}
sb := tableschema.NewSchemaBuilder()
sb.Name("test_range_clustering"). // table name
Columns(c1, c2, c3). // columns
PartitionColumns(pc). // partition columns
ClusterType(tableschema.CLUSTER_TYPE.Range). // range clustering
ClusterColumns([]string{c3.Name}). // range cluster key
// Sort key (optional). In most cases, keep it consistent with the cluster key for best optimization results.
ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
ClusterBucketNum(1024) // bucket count (optional)
tablesIns := odpsIns.Tables()
schema := sb.Build()
println(schema.ToSQLString("test_cluster", "", true))
err = tablesIns.Create(schema, true, nil, nil)
if err != nil {
log.Fatalf("%+v", err)
}
}Create an OSS external table
For details about external tables, see Create an OSS external table.
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
// create external table if not exists go_sdk_regression_testing.`testCreateExternalTableWithUserDefinedStorageHandler` (
// `a` STRING ,
// `b` STRING ,
// `c` BIGINT
//)
// comment 'External table using user defined TextStorageHandler'
// partitioned by (`dt` STRING)
// stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
// with serdeproperties('odps.text.option.delimiter'='|', 'my.own.option'='value')
// location 'MOCKoss://full/uri/path/to/oss/directory/'
// lifecycle 10;
tableName := "testCreateExternalTableWithUserDefinedStorageHandler"
c1 := tableschema.Column{
Name: "a",
Type: datatype.StringType,
}
c2 := tableschema.Column{
Name: "b",
Type: datatype.StringType,
}
c3 := tableschema.Column{
Name: "c",
Type: datatype.BigIntType,
}
// Partition column
pc := tableschema.Column{
Name: "dt",
Type: datatype.StringType,
}
sb := tableschema.NewSchemaBuilder()
sb.Name(tableName). // table name
Columns(c1, c2, c3). // columns
PartitionColumns(pc). // partition columns
Location("oss://full/uri/path/to/oss/directory/").
StorageHandler("com.aliyun.odps.udf.example.text.TextStorageHandler").
Comment("External table using user defined TextStorageHandler").
Lifecycle(10)
tablesIns := odpsIns.Tables()
schema := sb.Build()
// Define the SerDe properties
serDeProperties := map[string]string{
"odps.text.option.delimiter": "|",
"my.own.option": "value",
}
// Define the hints
hints := map[string]string{
"odps.sql.preparse.odps2": "lot",
"odps.sql.planner.mode": "lot",
"odps.sql.planner.parser.odps2": "true",
"odps.sql.ddl.odps2": "true",
"odps.compiler.output.format": "lot,pot",
}
sql, err := schema.ToExternalSQLString(odpsIns.DefaultProjectName(), "", true, serDeProperties, nil)
print(sql)
err = tablesIns.CreateExternal(schema, true, serDeProperties, nil, hints, nil)
if err != nil {
log.Fatalf("%+v", err)
}
}List tables
Filter the list by name prefix or table type (ManagedTable, VirtualView, ExternalTable).
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
ts := project.Tables()
ts.List(
func(t *odps.Table, err error) {
if err != nil {
log.Fatalf("%+v", err)
}
println(t.Name())
},
// Filter by table name prefix
odps.TableFilter.NamePrefix("all_type"),
// Filter by table type. Other table types include: VirtualView, ExternalTable
odps.TableFilter.Type(odps.ManagedTable),
)
}Get table information
Check whether a table exists
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
ok, err := table.Exists()
if err != nil {
log.Fatalf("%+v", err)
}
println(ok)
}Get table size and row count
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table size (in bytes)
size := table.Size()
println("size = ", size)
// Get table row count
rowCount := table.RecordNum()
println("rowCount = ", rowCount)
}Get table timestamps
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table creation time
createTime := table.CreatedTime()
println("create time = ", createTime)
// Get the time of the last DDL operation
lastDDLTime := table.LastDDLTime()
println("last ddl time = ", lastDDLTime)
// Get the last modification time of the table
lastModifiedTime := table.LastModifiedTime()
println("last modified time = ", lastModifiedTime)
}Get table owner
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table owner
owner := table.Owner()
println("owner is ", owner)
}Get table type
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table type
t := table.Type()
println("type is ", t)
}Get table structure
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"strings"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("test_cluster_table")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get the table schema
schema := table.Schema()
println("table name = ", schema.TableName)
if table.LifeCycle() > 0 {
println("table lifecycle = ", table.LifeCycle())
}
// Get columns
for _, c := range schema.Columns {
fmt.Printf("column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
}
// Get partition columns
for _, c := range schema.PartitionColumns {
fmt.Printf("partition column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
}
// Get cluster information
if schema.ClusterInfo.ClusterType != "" {
ci := schema.ClusterInfo
println("cluster type = ", ci.ClusterType)
println("cluster columns = ", strings.Join(ci.ClusterCols, ", "))
println("cluster bucket num = ", ci.BucketNum)
}
}Delete a table
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("test_cluster_table")
err = table.Delete()
if err != nil {
log.Fatalf("%+v", err)
}
}Partition management
The SDK supports two levels of partition retrieval: partition values (lightweight, returns strings) and partition objects (full metadata including size and timestamps).
List partition values
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Get the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
partitionValues, err := table.GetPartitionValues()
if err != nil {
log.Fatalf("%+v", err)
}
for _, pv := range partitionValues {
println(pv)
}
}List partition objects
Each partition object includes size, CreatedTime, LastDDLTime, and LastModifiedTime.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
partitions, err := table.GetPartitions()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf("get %d partitions\n", len(partitions))
for _, p := range partitions {
fmt.Printf(
"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
)
}
}Get a single partition
Get basic partition information
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
p, err := table.GetPartition("p1=20/p2=hangzhou")
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf(
"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
)
}Get extended partition information
Call LoadExtended() to retrieve additional fields: archive status, lifecycle, and physical size.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
p, err := table.GetPartition("p1=20/p2=hangzhou")
if err != nil {
log.Fatalf("%+v", err)
}
// Get basic partition information
fmt.Printf(
"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
)
// Get extended partition information
err = p.LoadExtended()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf(
"isArchived=%t, lifeCycle=%d, physicalSize=%d",
p.IsArchivedEx(), p.LifeCycleEx(), p.PhysicalSizeEx(),
)
}Add partitions
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.AddPartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"})
if err != nil {
log.Fatalf("%+v", err)
}
}Delete partitions
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.DeletePartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"})
if err != nil {
log.Fatalf("%+v", err)
}
}Instance management
MaxCompute returns an instance object each time you submit a SQL job. The instance tracks job status and results.
List instances
Filter instances by time range and status. The example below lists terminated instances within a specific window.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"time"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
timeFormat := "2006-01-02 15:04:05"
startTime, _ := time.Parse(timeFormat, "2024-10-11 02:15:30")
endTime, _ := time.Parse(timeFormat, "2024-10-13 06:22:02")
var f = func(i *odps.Instance) {
if err != nil {
log.Fatalf("%+v", err)
}
println(
fmt.Sprintf(
"%s, %s, %s, %s, %s",
i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
))
}
instances := odpsIns.Instances()
instances.List(
f,
odps.InstanceFilter.TimeRange(startTime, endTime),
odps.InstanceFilter.Status(odps.InstanceTerminated),
)
}Get instance information
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
ins := odpsIns.Instances().Get("<yourInstanceId>")
err = ins.Load()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf("owner=%s\n", ins.Owner())
fmt.Printf("status=%s\n", ins.Status())
fmt.Printf("startTime=%s\n", ins.StartTime())
fmt.Printf("endTime=%s\n", ins.EndTime())
fmt.Printf("result=%+v\n", ins.TaskResults())
}Permission management
Run permission management commands through the SDK using SecurityManager.RunQuery. For an overview of the MaxCompute authorization model, see Overview. The following example runs DESC ROLE to inspect a role's permissions.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
var restClient = odpsIns.RestClient()
sm := security.NewSecurityManager(restClient, conf.ProjectName)
result, err := sm.RunQuery("desc role role_project_admin;", true, "")
if err != nil {
log.Fatalf("%+v", err)
}
println(fmt.Sprintf("ok: %s", result))
}LogView
LogView lets you inspect submitted jobs and diagnose execution issues. For more information, see Use LogView to view job information. The following example generates a LogView URL for a SQL job.
package main
import (
"log"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
sql := "select * from all_types_demo where p1>0 or p2 > '';"
// SQL engine parameters, for example, odps.sql.skewjoin
var hints map[string]string = nil
// Create a SqlTask
sqlTask := odps.NewSqlTask("select", sql, hints)
// Run SQL using the quota associated with the project
project := odpsIns.DefaultProjectName()
ins, err := sqlTask.Run(odpsIns, project)
if err != nil {
log.Fatalf("%+v", err)
}
logView, err := odpsIns.LogView().GenerateLogView(ins, 1)
if err != nil {
log.Fatalf("%+v", err)
}
println(logView)
}