After you create an ODPS object, you can manage the tables and instances in your project. For example, you can execute SQL statements, upload and download data, manage tables and partitions, and manage instances.
You can use any of the methods described in Configure access credentials of the Go SDK to create an ODPS object. In this topic, all code examples load the AccessKey from the config.ini
file.
Execute SQL
Execute various MaxCompute SQL commands by using the run
method of the SQLTask
object or the MaxCompute SQL Driver.
Use the SDK
You can execute various types of MaxCompute SQL statements by using the run
method of the SQLTask
object, which returns an instance object. When executing a SELECT
statement, if the query result contains more than 10,000 rows of data, you need to use the Tunnel to download the entire query result. If the query result contains fewer than 10,000 rows of data, you can directly obtain the query results from the Instance object. The following code demonstrates the SQL execution method by using a SELECT
statement as an example:
Example 1: Execute SELECT and get query results
For query results that contain fewer than 10,000 rows of data, you can directly read the data by using a CSV Reader.
package main
import (
"fmt"
"io"
"log"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
// main runs a SELECT statement through SQLTask and prints every row of the
// result, which is read directly from the instance as CSV.
func main() {
	// Location of the INI file that holds the endpoint, project, and AccessKey pair
	const iniPath = "./config.ini"
	cfg, err := odps.NewConfigFromIni(iniPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Use the configured project as the default MaxCompute project
	client.SetDefaultProjectName(cfg.ProjectName)

	// SQL engine parameters (for example, odps.sql.skewjoin) would go here; none are set
	var engineFlags map[string]string = nil
	task := odps.NewSqlTask(
		"select_demo",
		"select * from all_types_demo where p1>0 or p2 > '';",
		engineFlags,
	)

	instance, err := task.Run(client, client.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}
	if err = instance.WaitForSuccess(); err != nil {
		log.Fatalf("%+v", err)
	}

	rows, err := task.GetSelectResultAsCsv(instance, true)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Drain the CSV reader; io.EOF marks the end of the result
	for {
		row, readErr := rows.Read()
		switch {
		case readErr == io.EOF:
			return
		case readErr != nil:
			log.Fatalf("%+v", readErr)
		}
		fmt.Printf("%v\n", row)
	}
}
Example 2: Execute SELECT and get query results using a Tunnel
For query results exceeding 10,000 rows, a Tunnel is required to obtain the data.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
// main runs a SELECT statement with SQLTask, prints the LogView URL of the
// job, and then downloads the complete result through an instance-result
// Tunnel session, printing every row.
func main() {
	// Configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	sql := "select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';"
	// SQL engine parameters, for example, odps.sql.skewjoin
	var hints map[string]string
	sqlTask := odps.NewSqlTask("select_demo", sql, hints)
	// Run SQL by using the quota associated with the project
	projectName := odpsIns.DefaultProjectName()
	ins, err := sqlTask.Run(odpsIns, projectName)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	err = ins.WaitForSuccess()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Generate logView to get detailed job information
	lv := odpsIns.LogView()
	lvUrl, err := lv.GenerateLogView(ins, 10)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	println(lvUrl)
	project := odpsIns.DefaultProject()
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Create a Tunnel instance
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
	if err != nil {
		log.Fatalf("%+v", err)
	}
	start := 0
	step := 200000
	recordCount := session.RecordCount()
	schema := session.Schema()
	total := 0
	// Download the result page by page, at most step records per reader
	for start < recordCount {
		reader, err := session.OpenRecordReader(start, step, 0, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}
		count := 0
		// Remember the first error handed to the callback so that a failed
		// read can never be dropped silently
		var readErr error
		err = reader.Iterator(func(record data.Record, _err error) {
			if _err != nil {
				if readErr == nil {
					readErr = _err
				}
				return
			}
			// Count only records that were actually read
			count++
			for i, d := range record {
				if d == nil {
					fmt.Printf("%s=null", schema.Columns[i].Name)
				} else {
					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
				}
				if i < record.Len()-1 {
					fmt.Printf(", ")
				} else {
					fmt.Println()
				}
			}
		})
		if err == nil {
			err = readErr
		}
		if err != nil {
			log.Fatalf("%+v", err)
		}
		start += count
		total += count
		log.Println(count)
		if err = reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}
		// A page without records would leave start unchanged and loop forever
		if count == 0 {
			break
		}
	}
	println("total count ", total)
}
Use MaxCompute SQL Driver
Example 1: Execute CREATE TABLE statement
package main
import (
"database/sql"
"log"

_ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
)
// main creates the table table_with_date through database/sql by using the
// MaxCompute SQL driver.
//
// database/sql only accepts the driver name "odps" after the sqldriver
// package has been imported, so the file must import it (a blank import is
// enough when no sqldriver identifier is used).
func main() {
	// The DSN below carries no credentials. Configure the AccessKey pair in the environment
	// variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
	// NOTE(review): variable names follow the standard Alibaba Cloud convention — confirm against the SDK credential documentation.
	dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"
	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	sqlStr := "create table table_with_date (date_col DATE);"
	_, err = db.Exec(sqlStr)
	// Close explicitly instead of deferring: log.Fatalf exits without running deferred calls
	if closeErr := db.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		log.Fatalf("%+v", err)
	}
}
Example 2: Execute SELECT statement and get results
package main
import (
"database/sql"
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
"log"
"reflect"
)
// main queries all_types_demo through database/sql with named arguments and
// prints every column of every returned row, handling NULL values and each
// driver-specific scan type.
func main() {
	config, err := sqldriver.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	dsn := config.FormatDsn()
	// or dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"
	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"
	rows, err := db.Query(
		selectSql,
		sql.Named("bigint_type", 100000000000),
		sql.Named("p1", 20),
		sql.Named("p2", "hangzhou"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Allocate one scan destination per column, using the type reported by the driver
	record := make([]interface{}, len(columnTypes))
	for i, columnType := range columnTypes {
		record[i] = reflect.New(columnType.ScanType()).Interface()
		t := reflect.TypeOf(record[i])
		fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
	}
	columns, err := rows.Columns()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	for rows.Next() {
		err = rows.Scan(record...)
		if err != nil {
			log.Fatalf("%+v", err)
		}
		for i, r := range record {
			rr := r.(sqldriver.NullAble)
			if rr.IsNull() {
				fmt.Printf("%s=NULL", columns[i])
			} else {
				// Bind the concrete type once instead of re-asserting it in every case
				switch v := r.(type) {
				case *sqldriver.NullInt8:
					fmt.Printf("%s=%d", columns[i], v.Int8)
				case *sqldriver.NullInt16:
					fmt.Printf("%s=%d", columns[i], v.Int16)
				case *sqldriver.NullInt32:
					fmt.Printf("%s=%d", columns[i], v.Int32)
				case *sqldriver.NullInt64:
					fmt.Printf("%s=%d", columns[i], v.Int64)
				case *sqldriver.NullFloat32:
					fmt.Printf("%s=%f", columns[i], v.Float32)
				case *sqldriver.NullFloat64:
					fmt.Printf("%s=%f", columns[i], v.Float64)
				case *sqldriver.NullString:
					fmt.Printf("%s=%s", columns[i], v.String)
				case *sqldriver.NullBool:
					fmt.Printf("%s=%v", columns[i], v.Bool)
				case *sqldriver.Binary,
					*sqldriver.Decimal,
					*sqldriver.NullDate,
					*sqldriver.NullDateTime,
					*sqldriver.NullTimeStamp,
					*sqldriver.Map,
					*sqldriver.Array,
					*sqldriver.Struct,
					*sqldriver.Json:
					// These values are passed to the %s verb as a whole
					fmt.Printf("%s=%s", columns[i], v)
				}
			}
			if i < len(record)-1 {
				fmt.Printf(", ")
			} else {
				fmt.Print("\n\n")
			}
		}
	}
	// rows.Next also returns false when iteration fails; without this check a
	// failure would look like a complete (but shorter) result
	if err = rows.Err(); err != nil {
		log.Fatalf("%+v", err)
	}
}
Data upload and download
You can use the Tunnel to perform batch uploads and downloads of data for tables or partitions, or you can write data into tables or partitions through the MaxCompute Streaming Tunnel. For more information, see Overview of streaming tunnel.
Initialize Tunnel
You can use any of the methods described in Configure access credentials of the Go SDK to create an ODPS object. The following example code uses the method that loads the AccessKey from the config.ini
file:
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
// main builds an ODPS object from config.ini, looks up the Tunnel endpoint
// of the default project, and initializes a Tunnel with it.
func main() {
	// Get configuration information from the configuration file
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Initialize ODPS
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()
	// Get Tunnel Endpoint
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
	// Initialize Tunnel
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	// The builtin println does not interpret format verbs (it would print the
	// literal "%+v"), so format the value with fmt.Printf
	fmt.Printf("%+v\n", tunnelIns)
}
Batch data upload
The following steps are necessary for uploading data to a table or partition after initializing the Tunnel:
1. Create an
UploadSession
. You need to specify the table or partition to which you want to upload the data, along with the compression algorithm used for the data upload.
2. Create a
Writer
through the UploadSession
to upload data. A Writer
is responsible for uploading the data. The data uploaded by a single Writer
is referred to as a Block, identified by an INT
value as the Block ID. To improve upload speed, you can create multiple Writers
to upload data concurrently.
3. After the Writers upload the data, you must call
UploadSession.Commit
to finalize the upload process. The commit method requires specifying the block ID list.package main import ( "fmt" "github.com/aliyun/aliyun-odps-go-sdk/odps" "github.com/aliyun/aliyun-odps-go-sdk/odps/account" "github.com/aliyun/aliyun-odps-go-sdk/odps/data" "github.com/aliyun/aliyun-odps-go-sdk/odps/datatype" "github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema" "github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel" "log" ) func main() { // Get configuration information from the configuration file conf, err := odps.NewConfigFromIni("./config.ini") if err != nil { log.Fatalf("%+v", err) } // Initialize ODPS aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey) odpsIns := odps.NewOdps(aliAccount, conf.Endpoint) odpsIns.SetDefaultProjectName(conf.ProjectName) project := odpsIns.DefaultProject() // Initialize Tunnel tunnelEndpoint, err := project.GetTunnelEndpoint() if err != nil { log.Fatalf("%+v", err) } fmt.Println("tunnel endpoint: " + tunnelEndpoint) tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint) // Create an UploadSession, specify the table or partition to write to session, err := tunnelIns.CreateUploadSession( project.Name(), "all_types_demo", tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"), tunnel.SessionCfg.WithDefaultDeflateCompressor(), ) if err != nil { log.Fatalf("%+v", err) } writerNum := 3 blockIds := make([]int, writerNum) for i := 0; i < writerNum; i++ { blockIds[i] = i } errChan := make(chan error, writerNum) // Concurrently upload data through multiple writers, each writer has a blockId as its identity for the data it writes for _, blockId := range blockIds { blockId := blockId go func() { schema := session.Schema() record, err := makeRecord(schema) if err != nil { errChan <- err return } recordWriter, err := session.OpenRecordWriter(blockId) if err != nil { errChan <- err return } for i := 0; i < 100; i++ { err = recordWriter.Write(record) if err != nil { _ = recordWriter.Close() errChan <- err return } } err = 
recordWriter.Close() if err == nil { fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount()) } errChan <- err }() } // Wait for all Writers to complete data upload for i := 0; i < writerNum; i++ { err := <-errChan if err != nil { log.Fatalf("%+v", err) } } // Commit all Blocks to complete the upload, and you can view the data in the table err = session.Commit(blockIds) log.Println("success to commit all blocks") if err != nil { log.Fatalf("%+v", err) } } func makeRecord(schema tableschema.TableSchema) (data.Record, error) { varchar, _ := data.NewVarChar(500, "varchar") char, _ := data.NewVarChar(254, "char") s := data.String("hello world") date, _ := data.NewDate("2022-10-19") datetime, _ := data.NewDateTime("2022-10-19 17:00:00") timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000") mapType := schema.Columns[15].Type.(datatype.MapType) mapData := data.NewMapWithType(mapType) err := mapData.Set("hello", 1) if err != nil { return nil, err } err = mapData.Set("world", 2) if err != nil { return nil, err } arrayType := schema.Columns[16].Type.(datatype.ArrayType) arrayData := data.NewArrayWithType(arrayType) err = arrayData.Append("a") if err != nil { return nil, err } err = arrayData.Append("b") if err != nil { return nil, err } structType := schema.Columns[17].Type.(datatype.StructType) structData := data.NewStructWithTyp(structType) arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType)) err = arr.Append("x") if err != nil { return nil, err } err = arr.Append("y") if err != nil { return nil, err } err = structData.SetField("arr", arr) if err != nil { return nil, err } err = structData.SetField("name", "tom") if err != nil { return nil, err } record := []data.Data{ data.TinyInt(1), data.SmallInt(32767), data.Int(100), data.BigInt(100000000000), data.Binary("binary"), data.Float(3.14), data.Double(3.1415926), data.NewDecimal(38, 18, "3.1415926"), varchar, char, s, date, datetime, 
timestamp, data.Bool(true), mapData, arrayData, structData, } return record, nil }
Batch data download
The following steps are necessary for downloading data from a table or partition after initializing the Tunnel:
1. Create a
DownloadSession
. You need to specify the table or partition from which you want to download the data, along with the compression algorithm for data transfer.
2. Create a
Reader
through the DownloadSession
to batch download data locally in a paginated form.package main import ( "fmt" "github.com/aliyun/aliyun-odps-go-sdk/odps" "github.com/aliyun/aliyun-odps-go-sdk/odps/account" "github.com/aliyun/aliyun-odps-go-sdk/odps/data" "github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel" "log" ) func main() { // Read configuration information from the configuration file conf, err := odps.NewConfigFromIni("./config.ini") if err != nil { log.Fatalf("%+v", err) } // Initialize ODPS aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey) odpsIns := odps.NewOdps(aliAccount, conf.Endpoint) odpsIns.SetDefaultProjectName(conf.ProjectName) project := odpsIns.DefaultProject() // Get Tunnel Endpoint tunnelEndpoint, err := project.GetTunnelEndpoint() if err != nil { log.Fatalf("%+v", err) } fmt.Println("tunnel endpoint: " + tunnelEndpoint) tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint) // Create a DownloadSession, specify the table or partition to read session, err := tunnelIns.CreateDownloadSession( project.Name(), "all_types_demo", tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"), ) if err != nil { log.Fatalf("%+v", err) } recordCount := session.RecordCount() fmt.Printf("record count is %d\n", recordCount) start := 0 step := 100001 total := 0 schema := session.Schema() for start < recordCount { reader, err := session.OpenRecordReader(start, step, nil) if err != nil { log.Fatalf("%+v", err) } count := 0 err = reader.Iterator(func(record data.Record, _err error) { if _err != nil { return } for i, d := range record { if d == nil { fmt.Printf("%s=null", schema.Columns[i].Name) } else { fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql()) } if i < record.Len()-1 { fmt.Printf(", ") } else { fmt.Println() } } }) if err != nil { log.Fatalf("%+v", err) } start += count total += count log.Println(count) if err = reader.Close(); err != nil { log.Fatalf("%+v", err) } } println("total count ", total) }
Streaming data upload
MaxCompute supports writing data into tables or partitions by using the streaming tunnel. For more information, see Overview of streaming tunnel.
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
// main writes two packs of sample records into a partition of
// all_types_demo through a streaming Tunnel upload session.
func main() {
	// Load the endpoint, project, and AccessKey pair from the configuration file
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Build the ODPS object and bind it to the configured project
	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)
	proj := client.DefaultProject()

	// Look up the Tunnel endpoint of the project
	endpoint, err := proj.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + endpoint)

	tunnelClient := tunnel.NewTunnel(client, endpoint)
	uploadSession, err := tunnelClient.CreateStreamUploadSession(
		proj.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
		// Create the partition if it does not exist yet
		tunnel.SessionCfg.WithCreatePartition(),
		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	pack := uploadSession.OpenRecordPackWriter()
	for round := 0; round < 2; round++ {
		rec, err := makeRecord(uploadSession.Schema())
		if err != nil {
			log.Fatalf("%+v", err)
		}
		// Keep appending the record until the pack reaches the size threshold
		for pack.DataSize() < 64 {
			if err = pack.Append(rec); err != nil {
				log.Fatalf("%+v", err)
			}
		}
		// Flush the pack to the server
		traceId, recordCount, bytesSend, err := pack.Flush()
		if err != nil {
			log.Fatalf("%+v", err)
		}
		fmt.Printf(
			"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
			traceId, recordCount, bytesSend,
		)
	}
}
// makeRecord builds one sample record for the upload. It returns the first
// error reported while constructing any of the values, including the
// VARCHAR, DATE, DATETIME, and TIMESTAMP values whose errors were
// previously discarded.
//
// NOTE(review): the map, array, and struct types are taken from schema
// positions 15, 16, and 17 — confirm that these positions match the column
// order of the target table.
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
	varchar, err := data.NewVarChar(500, "varchar")
	if err != nil {
		return nil, err
	}
	char, err := data.NewVarChar(254, "char")
	if err != nil {
		return nil, err
	}
	s := data.String("hello world")
	date, err := data.NewDate("2022-10-19")
	if err != nil {
		return nil, err
	}
	datetime, err := data.NewDateTime("2022-10-19 17:00:00")
	if err != nil {
		return nil, err
	}
	timestamp, err := data.NewTimestamp("2022-10-19 17:00:00.000")
	if err != nil {
		return nil, err
	}
	mapType := schema.Columns[15].Type.(datatype.MapType)
	mapData := data.NewMapWithType(mapType)
	err = mapData.Set("hello", 1)
	if err != nil {
		return nil, err
	}
	err = mapData.Set("world", 2)
	if err != nil {
		return nil, err
	}
	arrayType := schema.Columns[16].Type.(datatype.ArrayType)
	arrayData := data.NewArrayWithType(arrayType)
	err = arrayData.Append("a")
	if err != nil {
		return nil, err
	}
	err = arrayData.Append("b")
	if err != nil {
		return nil, err
	}
	structType := schema.Columns[17].Type.(datatype.StructType)
	structData := data.NewStructWithTyp(structType)
	arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
	err = arr.Append("x")
	if err != nil {
		return nil, err
	}
	err = arr.Append("y")
	if err != nil {
		return nil, err
	}
	err = structData.SetField("arr", arr)
	if err != nil {
		return nil, err
	}
	err = structData.SetField("name", "tom")
	if err != nil {
		return nil, err
	}
	record := []data.Data{
		data.TinyInt(1),
		data.SmallInt(32767),
		data.Int(100),
		data.BigInt(100000000000),
		data.Binary("binary"),
		data.Float(3.14),
		data.Double(3.1415926),
		data.NewDecimal(38, 18, "3.1415926"),
		varchar,
		char,
		s,
		date,
		datetime,
		timestamp,
		data.Bool(true),
		mapData,
		arrayData,
		structData,
	}
	return record, nil
}
Table management
Create tables
Create a regular table
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
// main creates the partitioned table all_types_demo, which has one column
// for each of the data types listed below.
func main() {
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	// Complex types that are shared by the column definitions
	mapType := datatype.NewMapType(datatype.StringType, datatype.BigIntType)
	arrayType := datatype.NewArrayType(datatype.StringType)
	structType := datatype.NewStructType(
		datatype.NewStructFieldType("arr", arrayType),
		datatype.NewStructFieldType("name", datatype.StringType),
	)
	jsonType := datatype.NewJsonType()

	// Data columns, in table order
	dataColumns := []tableschema.Column{
		{Name: "tiny_int_type", Type: datatype.TinyIntType},
		{Name: "small_int_type", Type: datatype.SmallIntType},
		{Name: "int_type", Type: datatype.IntType},
		{Name: "bigint_type", Type: datatype.BigIntType},
		{Name: "binary_type", Type: datatype.BinaryType},
		{Name: "float_type", Type: datatype.FloatType},
		{Name: "double_type", Type: datatype.DoubleType},
		{Name: "decimal_type", Type: datatype.NewDecimalType(10, 8)},
		{Name: "varchar_type", Type: datatype.NewVarcharType(500)},
		{Name: "char_type", Type: datatype.NewCharType(254)},
		{Name: "string_type", Type: datatype.StringType},
		{Name: "date_type", Type: datatype.DateType},
		{Name: "datetime_type", Type: datatype.DateTimeType},
		{Name: "timestamp_type", Type: datatype.TimestampType},
		{Name: "timestamp_ntz_type", Type: datatype.TimestampNtzType},
		{Name: "boolean_type", Type: datatype.BooleanType},
		{Name: "map_type", Type: mapType},
		{Name: "array_type", Type: arrayType},
		{Name: "struct_type", Type: structType},
		{Name: "json_type", Type: jsonType},
	}
	// Partition columns
	partitionColumns := []tableschema.Column{
		{Name: "p1", Type: datatype.BigIntType},
		{Name: "p2", Type: datatype.StringType},
	}

	builder := tableschema.NewSchemaBuilder()
	builder.Name("all_types_demo").
		Columns(dataColumns...).
		PartitionColumns(partitionColumns...).
		Lifecycle(2) // Unit: day
	schema := builder.Build()

	// If the project uses the data type edition 1.0, enable the data type edition 2.0 through hints.
	hints := map[string]string{
		"odps.sql.type.system.odps2": "true",
		"odps.sql.decimal.odps2":     "true",
	}
	tables := client.Tables()
	if err = tables.Create(schema, true, hints, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}
Create a clustered table
Create a hash-clustered table
For more information about the hash-clustered table, see Hash clustering.
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
// main creates the hash-clustered table test_hash_clustering and prints the
// DDL statement generated from the schema before the table is created.
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	// The schema built below corresponds to the following DDL statement:
	// CREATE TABLE test_hash_clustering (a string, b string, c bigint)
	// PARTITIONED BY (dt string)
	// CLUSTERED BY (c)
	// SORTED by (c) INTO 1024 BUCKETS;
	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}
	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}
	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}
	// partition
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}
	sb := tableschema.NewSchemaBuilder()
	sb.Name("test_hash_clustering"). // the table name
						Columns(c1, c2, c3).                        // columns
						PartitionColumns(pc).                       // partition columns
						ClusterType(tableschema.CLUSTER_TYPE.Hash). // ClusterType is the hash clustering
						ClusterColumns([]string{c3.Name}).          // Specify Cluster Key
		// Sort key (optional). In most cases, we recommend that you keep it consistent with the Cluster Key for the best optimization results.
		ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
		ClusterBucketNum(1024) // Bucket number (optional)
	tablesIns := odpsIns.Tables()
	schema := sb.Build()
	// Preview the DDL for the default project, which is the project the table is created in
	println(schema.ToSQLString(odpsIns.DefaultProjectName(), "", true))
	err = tablesIns.Create(schema, true, nil, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}
Create a range-clustered table
For more information about the range-clustered table, see Range clustering.
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
// main creates the range-clustered table test_range_clustering and prints
// the DDL statement generated from the schema before the table is created.
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	// The schema built below corresponds to the following DDL statement
	// (column c is BIGINT and partition column dt is STRING, as defined in the code):
	// CREATE TABLE test_range_clustering (a string, b string, c bigint)
	// PARTITIONED BY (dt string)
	// RANGE CLUSTERED BY (c)
	// SORTED by (c)
	// INTO 1024 BUCKETS;
	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}
	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}
	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}
	// partition
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}
	sb := tableschema.NewSchemaBuilder()
	sb.Name("test_range_clustering"). // the table name
						Columns(c1, c2, c3).                         // columns
						PartitionColumns(pc).                        // partition columns
						ClusterType(tableschema.CLUSTER_TYPE.Range). // ClusterType is the Range Clustering
						ClusterColumns([]string{c3.Name}).           // Specify the Range Cluster Key
		// Sort key (optional). In most cases, we recommend that you keep it consistent with the Cluster Key for the best optimization results.
		ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
		ClusterBucketNum(1024) // Bucket number (optional)
	tablesIns := odpsIns.Tables()
	schema := sb.Build()
	// Preview the DDL for the default project, which is the project the table is created in
	println(schema.ToSQLString(odpsIns.DefaultProjectName(), "", true))
	err = tablesIns.Create(schema, true, nil, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}
Create an OSS external table
For more information about the OSS external table, see Create an OSS external table.
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
// main creates an OSS external table that uses a user-defined storage
// handler, and prints the generated DDL statement before creating the table.
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	// create external table if not exists go_sdk_regression_testing.`testCreateExternalTableWithUserDefinedStorageHandler` (
	// `a` STRING ,
	// `b` STRING ,
	// `c` BIGINT
	//)
	// comment 'External table using user defined TextStorageHandler'
	// partitioned by (`dt` STRING)
	// stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
	// with serdeproperties('odps.text.option.delimiter'='|', 'my.own.option'='value')
	// location 'MOCKoss://full/uri/path/to/oss/directory/'
	// lifecycle 10;
	tableName := "testCreateExternalTableWithUserDefinedStorageHandler"
	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}
	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}
	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}
	// partition column
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}
	sb := tableschema.NewSchemaBuilder()
	sb.Name(tableName). // the table name
				Columns(c1, c2, c3).  // columns
				PartitionColumns(pc). // partition columns
				Location("oss://full/uri/path/to/oss/directory/").
				StorageHandler("com.aliyun.odps.udf.example.text.TextStorageHandler").
				Comment("External table using user defined TextStorageHandler").
				Lifecycle(10)
	tablesIns := odpsIns.Tables()
	schema := sb.Build()
	// define the properties mapping
	serDeProperties := map[string]string{
		"odps.text.option.delimiter": "|",
		"my.own.option":              "value",
	}
	// define the hints mapping
	hints := map[string]string{
		"odps.sql.preparse.odps2":       "lot",
		"odps.sql.planner.mode":         "lot",
		"odps.sql.planner.parser.odps2": "true",
		"odps.sql.ddl.odps2":            "true",
		"odps.compiler.output.format":   "lot,pot",
	}
	sql, err := schema.ToExternalSQLString(odpsIns.DefaultProjectName(), "", true, serDeProperties, nil)
	// Check the generation error before it is overwritten by the next call
	if err != nil {
		log.Fatalf("%+v", err)
	}
	print(sql)
	err = tablesIns.CreateExternal(schema, true, serDeProperties, nil, hints, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}
Get table list
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main lists the tables of the configured project that pass the name-prefix
// and table-type filters, and prints the name of each one.
func main() {
	// Location of the configuration file
	const iniPath = "./config.ini"
	cfg, err := odps.NewConfigFromIni(iniPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Use the configured project as the default MaxCompute project
	client.SetDefaultProjectName(cfg.ProjectName)

	// Called for each listed table; aborts the program on the first error
	printName := func(t *odps.Table, err error) {
		if err != nil {
			log.Fatalf("%+v", err)
		}
		println(t.Name())
	}

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbls.List(
		printName,
		// Keep only tables whose name starts with this prefix
		odps.TableFilter.NamePrefix("all_type"),
		// Keep only tables of this type. Other table types include: VirtualView, ExternalTable
		odps.TableFilter.Type(odps.ManagedTable),
	)
}
Get single table information
Check whether a table exists
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main checks whether the table all_types_demo exists in the configured
// project and prints the result.
func main() {
	// Location of the configuration file
	const iniPath = "./config.ini"
	cfg, err := odps.NewConfigFromIni(iniPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Use the configured project as the default MaxCompute project
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	exists, err := tbl.Exists()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	println(exists)
}
Get table size and row count
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main loads the metadata of the table all_types_demo and prints its size
// and its row count.
func main() {
	// Location of the configuration file
	const iniPath = "./config.ini"
	cfg, err := odps.NewConfigFromIni(iniPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey), cfg.Endpoint)
	// Use the configured project as the default MaxCompute project
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")
	// Load the table metadata before reading it
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Table size (in bytes)
	println("size = ", tbl.Size())
	// Number of rows in the table
	println("rowCount = ", tbl.RecordNum())
}
Get table CreatedTime, LastDDLTime, LastModifiedTime
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table creation time
createTime := table.CreatedTime()
println("create time = ", createTime)
// Get the time of the last DDL operation
lastDDLTime := table.LastDDLTime()
println("last ddl time = ", lastDDLTime)
// Get the last modification time of the table
lastModifiedTime := table.LastModifiedTime()
println("last modified time = ", lastModifiedTime)
}
Get table owner
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main loads table "all_types_demo" and prints the value returned by its
// Owner accessor.
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	// Load is called before the owner is read.
	if err := tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Table owner
	println("owner is ", tbl.Owner())
}
Get table type
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table type
t := table.Type()
println("type is ", t)
}
Get table structure
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"strings"
)
// main loads table "test_cluster_table" and prints its schema: the table
// name, the lifecycle (only when positive), every column, every partition
// column and, when a cluster type is set, the clustering information.
//
// All output goes through fmt so the whole report lands on stdout; mixing
// the builtin println (stderr) with fmt.Printf (stdout) would split the
// report across two streams.
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("test_cluster_table")

	// Load is called before the schema and lifecycle are read.
	if err := table.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Get the table Schema
	schema := table.Schema()
	fmt.Println("table name = ", schema.TableName)

	if lifeCycle := table.LifeCycle(); lifeCycle > 0 {
		fmt.Println("table lifecycle = ", lifeCycle)
	}

	// Get columns
	for _, c := range schema.Columns {
		fmt.Printf("column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
	}

	// Get partition columns
	for _, c := range schema.PartitionColumns {
		fmt.Printf("partition column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
	}

	// Get the cluster information; an empty cluster type means there is
	// nothing to report.
	if schema.ClusterInfo.ClusterType != "" {
		ci := schema.ClusterInfo
		fmt.Println("cluster type = ", ci.ClusterType)
		fmt.Println("cluster columns = ", strings.Join(ci.ClusterCols, ", "))
		fmt.Println("cluster bucket num = ", ci.BucketNum)
	}
}
Delete table
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main deletes table "test_cluster_table" from the configured project and
// aborts with the error if the deletion fails.
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("test_cluster_table")

	if err := tbl.Delete(); err != nil {
		log.Fatalf("%+v", err)
	}
}
Partition management
Get partition list
The MaxCompute SDK supports retrieving a list of all partition values and partition objects for a table. The partition objects contain partition information, such as size and lastModifiedTime.
Get partition list values
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main fetches the partition values of table "all_types_demo" and prints
// them one per line.
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	values, err := tbl.GetPartitionValues()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for _, value := range values {
		println(value)
	}
}
Get partition object list
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main fetches the partition objects of table "all_types_demo", prints how
// many were returned, and then prints one summary line per partition.
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	parts, err := tbl.GetPartitions()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("get %d partitions\n", len(parts))

	// One line per partition: value, the three timestamps and the size.
	for _, part := range parts {
		fmt.Printf(
			"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
			part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size(),
		)
	}
}
Get single partition information
Get basic partition information
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main fetches the partition "p1=20/p2=hangzhou" of table "all_types_demo"
// and prints its value, timestamps and size on one line.
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	part, err := tbl.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size(),
	)
}
Get extended partition information
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main fetches the partition "p1=20/p2=hangzhou" of table "all_types_demo",
// prints its basic information, then calls LoadExtended and prints the
// extended information (archived flag, lifecycle, physical size).
//
// Both output lines end with a newline so the program's output is properly
// terminated.
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	p, err := table.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Get basic partition information
	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
	)

	// Get extended partition information; LoadExtended is called before the
	// *Ex accessors are read.
	if err := p.LoadExtended(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"isArchived=%t, lifeCycle=%d, physicalSize=%d\n",
		p.IsArchivedEx(), p.LifeCycleEx(), p.PhysicalSizeEx(),
	)
}
Add partition
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main adds the partitions "p1=23/p2=beijing" and "p1=24/p2=shanghai" to
// table "all_types_demo".
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	// NOTE(review): the leading true is presumably an "if not exists" flag —
	// confirm against the SDK reference.
	specs := []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}
	if err := tbl.AddPartitions(true, specs); err != nil {
		log.Fatalf("%+v", err)
	}
}
Delete partition
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main deletes the partitions "p1=23/p2=beijing" and "p1=24/p2=shanghai"
// from table "all_types_demo".
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	proj := client.Project(cfg.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	// NOTE(review): the leading true is presumably an "if exists" flag —
	// confirm against the SDK reference.
	specs := []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}
	if err := tbl.DeletePartitions(true, specs); err != nil {
		log.Fatalf("%+v", err)
	}
}
Instance management
MaxCompute returns an instance object after SQL execution. This instance object represents a MaxCompute job and allows you to track the execution status and results.
Get instance list
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"time"
)
// main lists the instances of the default project that match a fixed time
// range and the InstanceTerminated status, printing one line per instance
// with its id, owner, start time, end time and status.
//
// Failures while parsing the time-range bounds abort the program instead of
// being discarded, and each line is written with fmt.Printf (stdout) rather
// than the builtin println (stderr).
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	const timeFormat = "2006-01-02 15:04:05"

	startTime, err := time.Parse(timeFormat, "2024-10-11 02:15:30")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	endTime, err := time.Parse(timeFormat, "2024-10-13 06:22:02")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// printInstance is the callback handed to List. It receives no error
	// argument, so there is nothing to check here; the enclosing err has
	// already been handled above.
	printInstance := func(i *odps.Instance) {
		fmt.Printf(
			"%s, %s, %s, %s, %s\n",
			i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
		)
	}

	// NOTE(review): if Instances.List returns an error in the SDK version in
	// use, check it here — confirm against the SDK reference.
	instances := odpsIns.Instances()
	instances.List(
		printInstance,
		odps.InstanceFilter.TimeRange(startTime, endTime),
		odps.InstanceFilter.Status(odps.InstanceTerminated),
	)
}
Get instance information
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main loads the instance identified by "<yourInstanceId>" and prints its
// owner, status, start time, end time and task results.
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	// Replace the placeholder with a real instance id before running.
	instance := client.Instances().Get("<yourInstanceId>")

	// Load is called before any of the accessors below are read.
	if err := instance.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("owner=%s\n", instance.Owner())
	fmt.Printf("status=%s\n", instance.Status())
	fmt.Printf("startTime=%s\n", instance.StartTime())
	fmt.Printf("endTime=%s\n", instance.EndTime())
	fmt.Printf("result=%+v\n", instance.TaskResults())
}
Permission management
MaxCompute facilitates permission management through specific commands. For an overview of authorization schemes, see Overview. The following example shows how to use the DESC ROLE
command to view role-related information:
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
"log"
)
// main runs the security query "desc role role_project_admin;" through a
// SecurityManager built on the client's REST client and prints the result.
//
// The result is written with fmt.Printf (stdout) instead of wrapping
// fmt.Sprintf in the builtin println, which is a bootstrap helper that
// writes to stderr.
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	restClient := odpsIns.RestClient()
	sm := security.NewSecurityManager(restClient, conf.ProjectName)

	// NOTE(review): the meaning of the trailing (true, "") arguments is not
	// visible here — confirm against the SDK reference.
	result, err := sm.RunQuery("desc role role_project_admin;", true, "")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("ok: %s\n", result)
}
Logview
Logview lets you inspect submitted MaxCompute jobs and debug them. For more information, see Use LogView to view job information.
package main
import (
"log"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
// main submits a SELECT statement as a SqlTask in the default project and
// prints the Logview string generated for the resulting instance.
func main() {
	// Read the connection settings from the INI configuration file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	// Use the configured project as the default MaxCompute project.
	client.SetDefaultProjectName(cfg.ProjectName)

	query := "select * from all_types_demo where p1>0 or p2 > '';"

	// SQL engine parameters, for example, odps.sql.skewjoin. None are set
	// here, so the map stays nil.
	var hints map[string]string

	// Build the SqlTask and run it in the default project.
	task := odps.NewSqlTask("select", query, hints)
	projectName := client.DefaultProjectName()

	instance, err := task.Run(client, projectName)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// NOTE(review): the second argument (1) is presumably the validity of
	// the Logview link in hours — confirm against the SDK reference.
	logViewURL, err := client.LogView().GenerateLogView(instance, 1)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(logViewURL)
}