All Products
Search
Document Center

MaxCompute:Use the Go SDK

Last Updated:Nov 04, 2024

After you create the ODPS object, you can manage tables and instances within your project. Supported operations include executing SQL statements, uploading and downloading data, managing tables and partitions, and managing instances.

Note

You can use any of the methods described in Configure access credentials of the Go SDK to create an ODPS object. In this topic, all example codes use the method that loads the AccessKey from the config.ini file.

Execute SQL

Execute various MaxCompute SQL commands by using the run method of the SQLTask object or the MaxCompute SQL Driver.

Use the SDK

You can execute various types of MaxCompute SQL statements by using the run method of the SQLTask object, which returns an instance object. When executing a SELECT statement, if the query result contains more than 10,000 rows of data, you need to use the Tunnel to download the entire query result. If the query result contains fewer than 10,000 rows of data, you can directly obtain the query results from the Instance object. The following code demonstrates the SQL execution method by using a SELECT statement as an example:

Example 1: Execute SELECT and get query results

For query results that contain fewer than 10,000 rows of data, you can directly read the data by using a CSV Reader.

package main

import (
	"fmt"
	"io"
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

// main runs a SELECT statement through a SQLTask and streams the result
// set back as CSV records (suitable when the result has fewer than
// 10,000 rows).
func main() {
	// Load endpoint and AccessKey settings from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(acct, conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	query := "select * from all_types_demo where p1>0 or p2 > '';"

	// No extra SQL engine hints (for example, odps.sql.skewjoin) are set here.
	var hints map[string]string

	task := odps.NewSqlTask("select_demo", query, hints)
	ins, err := task.Run(client, client.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	if err = ins.WaitForSuccess(); err != nil {
		log.Fatalf("%+v", err)
	}

	reader, err := task.GetSelectResultAsCsv(ins, true)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Drain the CSV stream one record at a time until EOF.
	for {
		rec, readErr := reader.Read()
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			log.Fatalf("%+v", readErr)
		}
		fmt.Printf("%v\n", rec)
	}
}

Example 2: Execute SELECT and get query results using a Tunnel

For query results exceeding 10,000 rows, a Tunnel is required to obtain the data.

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

// main runs a SELECT statement through a SQLTask and downloads the full
// result set through an instance-result Tunnel download session (required
// when the result exceeds 10,000 rows).
func main() {
	// Configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	sql := "select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';"

	// SQL engine parameters, for example, odps.sql.skewjoin
	var hints map[string]string = nil

	sqlTask := odps.NewSqlTask("select_demo", sql, hints)

	// Run SQL by using the quota associated with the project
	projectName := odpsIns.DefaultProjectName()

	ins, err := sqlTask.Run(odpsIns, projectName)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	err = ins.WaitForSuccess()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Generate a LogView URL to inspect detailed job information.
	lv := odpsIns.LogView()
	lvUrl, err := lv.GenerateLogView(ins, 10)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println(lvUrl)

	project := odpsIns.DefaultProject()
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Create a Tunnel download session bound to the instance result.
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	start := 0
	step := 200000 // page size per record reader
	recordCount := session.RecordCount()
	schema := session.Schema()
	total := 0

	for start < recordCount {
		reader, err := session.OpenRecordReader(start, step, 0, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}

		count := 0
		err = reader.Iterator(func(record data.Record, _err error) {
			if _err != nil {
				return
			}
			// Count only successfully delivered records so the next page
			// starts at the correct offset (previously the counter was
			// incremented even when _err was non-nil).
			count += 1

			for i, d := range record {
				if d == nil {
					fmt.Printf("%s=null", schema.Columns[i].Name)
				} else {
					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
				}

				if i < record.Len()-1 {
					fmt.Printf(", ")
				} else {
					fmt.Println()
				}
			}
		})

		if err != nil {
			log.Fatalf("%+v", err)
		}

		start += count
		total += count
		log.Println(count)
		if err = reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}
	}

	fmt.Println("total count", total)
}

Use MaxCompute SQL Driver

Example 1: Execute CREATE TABLE statement

package main

import (
	"database/sql"
	"log"
)

// main creates a table with a DATE column through the database/sql
// interface using the MaxCompute (ODPS) SQL driver.
//
// NOTE(review): sql.Open("odps", ...) only succeeds if the ODPS driver is
// registered, which normally happens via a blank import such as
// _ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver" — verify the import
// list of the full example.
func main() {
	// Configure AccessKey in environment variables: ALIBABA_CLOUD_ACCESS_KEY_ID
	// and ALIBABA_CLOUD_ACCESS_KEY_SECURITY (the latter is likely meant to be
	// ALIBABA_CLOUD_ACCESS_KEY_SECRET — confirm against the credentials doc).
	// The DSN also enables MaxCompute data type edition 2.0 so DATE is accepted.
	dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Execute the DDL statement; Exec is used because no rows are returned.
	sqlStr := "create table table_with_date (date_col DATE);"
	_, err = db.Exec(sqlStr)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

Example 2: Execute SELECT statement and get results

package main

import (
	"database/sql"
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
	"log"
	"reflect"
)

// main runs a parameterized SELECT statement through database/sql and
// prints every row, dispatching on the driver's concrete nullable types.
func main() {
	// Load the DSN settings (endpoint, AccessKey, project) from config.ini.
	config, err := sqldriver.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	dsn := config.FormatDsn()
	// or dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	defer db.Close()

	selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"

	// Bind the named parameters referenced in the statement.
	rows, err := db.Query(
		selectSql,
		sql.Named("bigint_type", 100000000000),
		sql.Named("p1", 20),
		sql.Named("p2", "hangzhou"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Release the result set so the underlying connection can be reused
	// (previously the rows were never closed).
	defer rows.Close()

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Allocate one scan target per column, using the driver's scan types.
	record := make([]interface{}, len(columnTypes))
	for i, columnType := range columnTypes {
		record[i] = reflect.New(columnType.ScanType()).Interface()
		t := reflect.TypeOf(record[i])
		fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
	}

	columns, err := rows.Columns()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for rows.Next() {
		if err = rows.Scan(record...); err != nil {
			log.Fatalf("%+v", err)
		}

		for i, r := range record {
			rr := r.(sqldriver.NullAble)

			if rr.IsNull() {
				fmt.Printf("%s=NULL", columns[i])
			} else {
				// Print each value according to its concrete driver type.
				switch v := r.(type) {
				case *sqldriver.NullInt8:
					fmt.Printf("%s=%d", columns[i], v.Int8)
				case *sqldriver.NullInt16:
					fmt.Printf("%s=%d", columns[i], v.Int16)
				case *sqldriver.NullInt32:
					fmt.Printf("%s=%d", columns[i], v.Int32)
				case *sqldriver.NullInt64:
					fmt.Printf("%s=%d", columns[i], v.Int64)
				case *sqldriver.Binary:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullFloat32:
					fmt.Printf("%s=%f", columns[i], v.Float32)
				case *sqldriver.NullFloat64:
					fmt.Printf("%s=%f", columns[i], v.Float64)
				case *sqldriver.Decimal:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullString:
					fmt.Printf("%s=%s", columns[i], v.String)
				case *sqldriver.NullDate:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullDateTime:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullTimeStamp:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.NullBool:
					fmt.Printf("%s=%v", columns[i], v.Bool)
				case *sqldriver.Map:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.Array:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.Struct:
					fmt.Printf("%s=%s", columns[i], v)
				case *sqldriver.Json:
					fmt.Printf("%s=%s", columns[i], v)
				}
			}

			if i < len(record)-1 {
				fmt.Printf(", ")
			} else {
				fmt.Print("\n\n")
			}
		}
	}

	// Surface any error that terminated the row iteration (previously unchecked).
	if err = rows.Err(); err != nil {
		log.Fatalf("%+v", err)
	}
}

Data upload and download

You can use the Tunnel to perform batch uploads and downloads of data for tables or partitions, or you can write data into tables or partitions through the MaxCompute Streaming Tunnel. For more information, see Overview of streaming tunnel.

Initialize Tunnel

You can use any of the methods described in Configure access credentials of the Go SDK to create an ODPS object. The following example code uses the method that loads the AccessKey from the config.ini file:

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

// main initializes an ODPS client, resolves the project's Tunnel endpoint,
// and constructs a Tunnel instance.
func main() {
	// Get configuration information from the configuration file
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	// Get Tunnel Endpoint
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)

	// Initialize Tunnel
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

	// The builtin println does not interpret format verbs; use fmt.Printf
	// so "%+v" actually formats the tunnel instance.
	fmt.Printf("%+v\n", tunnelIns)
}

Batch data upload

The following steps are necessary for uploading data to a table or partition after initializing the Tunnel:

  1. To initiate an UploadSession, you need to specify the table or partition where you want to upload the data, along with the compression algorithm used for the data upload.

  2. Create a Writer through the UploadSession to upload data. A Writer is responsible for uploading the data. The data uploaded by a single Writer is referred to as a Block, identified by an INT value as the Block ID. To improve upload speed, you can create multiple Writers to upload data concurrently.

  3. After the Writer uploads the data, you must call UploadSession.commit to finalize the upload process. The commit method requires specifying the block ID list.

    package main
    
    import (
    	"fmt"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
    	"log"
    )
    
    // main uploads data into a partition of all_types_demo with the batch
    // tunnel: open an upload session, write blocks concurrently, then commit.
    func main() {
    	// Get configuration information from the configuration file
    	conf, err := odps.NewConfigFromIni("./config.ini")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	// Initialize ODPS
    	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
    	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
    	odpsIns.SetDefaultProjectName(conf.ProjectName)
    	project := odpsIns.DefaultProject()
    
    	// Initialize Tunnel
    	tunnelEndpoint, err := project.GetTunnelEndpoint()
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
    	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
    
    	// Create an UploadSession, specify the table or partition to write to
    	session, err := tunnelIns.CreateUploadSession(
    		project.Name(),
    		"all_types_demo",
    		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
    		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
    	)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	writerNum := 3
    	blockIds := make([]int, writerNum)
    	for i := 0; i < writerNum; i++ {
    		blockIds[i] = i
    	}
    
    	errChan := make(chan error, writerNum)
    
    	// Concurrently upload data through multiple writers; each writer owns a
    	// blockId that identifies the data it writes.
    	for _, blockId := range blockIds {
    		blockId := blockId // capture a per-iteration copy for the goroutine
    		go func() {
    			schema := session.Schema()
    			record, err := makeRecord(schema)
    			if err != nil {
    				errChan <- err
    				return
    			}
    
    			recordWriter, err := session.OpenRecordWriter(blockId)
    			if err != nil {
    				errChan <- err
    				return
    			}
    
    			for i := 0; i < 100; i++ {
    				err = recordWriter.Write(record)
    				if err != nil {
    					_ = recordWriter.Close()
    					errChan <- err
    					return
    				}
    			}
    
    			err = recordWriter.Close()
    			if err == nil {
    				fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount())
    			}
    			errChan <- err
    		}()
    	}
    
    	// Wait for all writers to finish; fail fast on the first error.
    	for i := 0; i < writerNum; i++ {
    		err := <-errChan
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    	}
    
    	// Commit all blocks to complete the upload. Check the commit error
    	// BEFORE logging success (previously success was logged unconditionally).
    	err = session.Commit(blockIds)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	log.Println("success to commit all blocks")
    }
    
    // makeRecord builds one sample row for all_types_demo. It relies on the
    // schema's column order: index 15 must be a map type, 16 an array type,
    // and 17 a struct type (matching the table created earlier in this topic).
    func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
    	varchar, _ := data.NewVarChar(500, "varchar")
    	// NOTE(review): the char column is built with NewVarChar — possibly
    	// data.NewChar was intended; verify against the SDK's data package.
    	char, _ := data.NewVarChar(254, "char")
    	s := data.String("hello world")
    	date, _ := data.NewDate("2022-10-19")
    	datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
    	timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")
    
    	// Map value for column 15; entries are set one by one.
    	mapType := schema.Columns[15].Type.(datatype.MapType)
    	mapData := data.NewMapWithType(mapType)
    	err := mapData.Set("hello", 1)
    	if err != nil {
    		return nil, err
    	}
    
    	err = mapData.Set("world", 2)
    	if err != nil {
    		return nil, err
    	}
    
    	// Array value for column 16.
    	arrayType := schema.Columns[16].Type.(datatype.ArrayType)
    	arrayData := data.NewArrayWithType(arrayType)
    	err = arrayData.Append("a")
    	if err != nil {
    		return nil, err
    	}
    
    	err = arrayData.Append("b")
    	if err != nil {
    		return nil, err
    	}
    
    	// Struct value for column 17 with fields "arr" (array) and "name" (string).
    	structType := schema.Columns[17].Type.(datatype.StructType)
    	structData := data.NewStructWithTyp(structType)
    
    	arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
    	err = arr.Append("x")
    	if err != nil {
    		return nil, err
    	}
    	err = arr.Append("y")
    	if err != nil {
    		return nil, err
    	}
    	err = structData.SetField("arr", arr)
    	if err != nil {
    		return nil, err
    	}
    	err = structData.SetField("name", "tom")
    	if err != nil {
    		return nil, err
    	}
    
    	// Assemble the row in table column order.
    	record := []data.Data{
    		data.TinyInt(1),
    		data.SmallInt(32767),
    		data.Int(100),
    		data.BigInt(100000000000),
    		data.Binary("binary"),
    		data.Float(3.14),
    		data.Double(3.1415926),
    		data.NewDecimal(38, 18, "3.1415926"),
    		varchar,
    		char,
    		s,
    		date,
    		datetime,
    		timestamp,
    		data.Bool(true),
    		mapData,
    		arrayData,
    		structData,
    	}
    
    	return record, nil
    }
    

Batch data download

The following steps are necessary for downloading data from a table or partition after initializing the Tunnel:

  1. To create a DownloadSession, you need to specify the table or partition from which you want to download the data, along with the compression algorithm for data transfer.

  2. Create a Reader through the DownloadSession to batch download data locally in a paginated form.

    package main
    
    import (
    	"fmt"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
    	"log"
    )
    
    // main downloads all rows of a partition of all_types_demo with the batch
    // tunnel, paging through the data with a record reader.
    func main() {
    	// Read configuration information from the configuration file
    	conf, err := odps.NewConfigFromIni("./config.ini")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	// Initialize ODPS
    	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
    	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
    	odpsIns.SetDefaultProjectName(conf.ProjectName)
    	project := odpsIns.DefaultProject()
    
    	// Get Tunnel Endpoint
    	tunnelEndpoint, err := project.GetTunnelEndpoint()
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
    	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
    
    	// Create a DownloadSession, specify the table or partition to read
    	session, err := tunnelIns.CreateDownloadSession(
    		project.Name(),
    		"all_types_demo",
    		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
    	)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	recordCount := session.RecordCount()
    	fmt.Printf("record count is %d\n", recordCount)
    
    	start := 0
    	step := 100001 // page size per record reader
    	total := 0
    	schema := session.Schema()
    
    	for start < recordCount {
    		reader, err := session.OpenRecordReader(start, step, nil)
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		count := 0
    		err = reader.Iterator(func(record data.Record, _err error) {
    			if _err != nil {
    				return
    			}
    			// BUG FIX: count was never incremented, so "start += count"
    			// never advanced and the paging loop could not terminate.
    			count += 1
    
    			for i, d := range record {
    				if d == nil {
    					fmt.Printf("%s=null", schema.Columns[i].Name)
    				} else {
    					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
    				}
    
    				if i < record.Len()-1 {
    					fmt.Printf(", ")
    				} else {
    					fmt.Println()
    				}
    			}
    		})
    
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		start += count
    		total += count
    
    		log.Println(count)
    
    		if err = reader.Close(); err != nil {
    			log.Fatalf("%+v", err)
    		}
    	}
    
    	fmt.Println("total count", total)
    }

Streaming data upload

MaxCompute supports writing data into tables or partitions by using the streaming tunnel. For more information, see Overview of streaming tunnel.

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

// main writes records into a partition of all_types_demo through the
// MaxCompute streaming tunnel.
func main() {
	// Read endpoint / AccessKey / project settings from config.ini.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(acct, conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)
	project := client.DefaultProject()

	// Resolve the project's Tunnel endpoint.
	endpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + endpoint)

	tunnelClient := tunnel.NewTunnel(client, endpoint)

	// Open a streaming upload session on the target partition, creating the
	// partition on demand and compressing the stream with deflate.
	session, err := tunnelClient.CreateStreamUploadSession(
		project.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
		tunnel.SessionCfg.WithCreatePartition(), // create the partition if missing
		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	writer := session.OpenRecordPackWriter()

	for round := 0; round < 2; round++ {
		record, err := makeRecord(session.Schema())
		if err != nil {
			log.Fatalf("%+v", err)
		}

		// Keep appending the record until the pack reaches the size threshold.
		for writer.DataSize() < 64 {
			if err = writer.Append(record); err != nil {
				log.Fatalf("%+v", err)
			}
		}

		// Flush the accumulated pack to the server.
		traceId, written, sent, err := writer.Flush()
		if err != nil {
			log.Fatalf("%+v", err)
		}

		fmt.Printf(
			"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
			traceId, written, sent,
		)
	}
}

// makeRecord builds one sample row for all_types_demo. It relies on the
// schema's column order: index 15 must be a map type, 16 an array type,
// and 17 a struct type.
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
	vc, _ := data.NewVarChar(500, "varchar")
	ch, _ := data.NewVarChar(254, "char")
	str := data.String("hello world")
	day, _ := data.NewDate("2022-10-19")
	moment, _ := data.NewDateTime("2022-10-19 17:00:00")
	stamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")

	// Map value for column 15.
	mapVal := data.NewMapWithType(schema.Columns[15].Type.(datatype.MapType))
	if err := mapVal.Set("hello", 1); err != nil {
		return nil, err
	}
	if err := mapVal.Set("world", 2); err != nil {
		return nil, err
	}

	// Array value for column 16.
	arrVal := data.NewArrayWithType(schema.Columns[16].Type.(datatype.ArrayType))
	if err := arrVal.Append("a"); err != nil {
		return nil, err
	}
	if err := arrVal.Append("b"); err != nil {
		return nil, err
	}

	// Struct value for column 17 with fields "arr" (array) and "name" (string).
	structType := schema.Columns[17].Type.(datatype.StructType)
	structVal := data.NewStructWithTyp(structType)

	inner := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
	if err := inner.Append("x"); err != nil {
		return nil, err
	}
	if err := inner.Append("y"); err != nil {
		return nil, err
	}
	if err := structVal.SetField("arr", inner); err != nil {
		return nil, err
	}
	if err := structVal.SetField("name", "tom"); err != nil {
		return nil, err
	}

	// Assemble the row in table column order.
	return []data.Data{
		data.TinyInt(1),
		data.SmallInt(32767),
		data.Int(100),
		data.BigInt(100000000000),
		data.Binary("binary"),
		data.Float(3.14),
		data.Double(3.1415926),
		data.NewDecimal(38, 18, "3.1415926"),
		vc,
		ch,
		str,
		day,
		moment,
		stamp,
		data.Bool(true),
		mapVal,
		arrVal,
		structVal,
	}, nil
}

Table management

Create tables

Create a regular table

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

// main creates the partitioned table all_types_demo, which has one column
// for every MaxCompute data type plus two partition columns (p1, p2).
func main() {
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(acct, conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	// Complex types used by the last four columns.
	mapType := datatype.NewMapType(datatype.StringType, datatype.BigIntType)
	arrayType := datatype.NewArrayType(datatype.StringType)
	structType := datatype.NewStructType(
		datatype.NewStructFieldType("arr", arrayType),
		datatype.NewStructFieldType("name", datatype.StringType),
	)

	// One entry per table column, in declaration order.
	columns := []tableschema.Column{
		{Name: "tiny_int_type", Type: datatype.TinyIntType},
		{Name: "small_int_type", Type: datatype.SmallIntType},
		{Name: "int_type", Type: datatype.IntType},
		{Name: "bigint_type", Type: datatype.BigIntType},
		{Name: "binary_type", Type: datatype.BinaryType},
		{Name: "float_type", Type: datatype.FloatType},
		{Name: "double_type", Type: datatype.DoubleType},
		{Name: "decimal_type", Type: datatype.NewDecimalType(10, 8)},
		{Name: "varchar_type", Type: datatype.NewVarcharType(500)},
		{Name: "char_type", Type: datatype.NewCharType(254)},
		{Name: "string_type", Type: datatype.StringType},
		{Name: "date_type", Type: datatype.DateType},
		{Name: "datetime_type", Type: datatype.DateTimeType},
		{Name: "timestamp_type", Type: datatype.TimestampType},
		{Name: "timestamp_ntz_type", Type: datatype.TimestampNtzType},
		{Name: "boolean_type", Type: datatype.BooleanType},
		{Name: "map_type", Type: mapType},
		{Name: "array_type", Type: arrayType},
		{Name: "struct_type", Type: structType},
		{Name: "json_type", Type: datatype.NewJsonType()},
	}

	partitionColumns := []tableschema.Column{
		{Name: "p1", Type: datatype.BigIntType},
		{Name: "p2", Type: datatype.StringType},
	}

	builder := tableschema.NewSchemaBuilder()
	builder.Name("all_types_demo").
		Columns(columns...).
		PartitionColumns(partitionColumns...).
		Lifecycle(2) // unit: days

	// If the project's data type edition is 1.0, these hints enable the
	// 2.0 data types used above.
	hints := map[string]string{
		"odps.sql.type.system.odps2": "true",
		"odps.sql.decimal.odps2":     "true",
	}

	if err = client.Tables().Create(builder.Build(), true, hints, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}

Create a clustered table

Create a hash-clustered table

For more information about the hash-clustered table, see Hash clustering.

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

// main creates a hash-clustered table, equivalent to the DDL:
//
//	CREATE TABLE test_hash_clustering (a string, b string, c bigint)
//	PARTITIONED BY (dt string)
//	CLUSTERED BY (c) SORTED BY (c) INTO 1024 BUCKETS;
func main() {
	// Load the AccessKey configuration from ./config.ini.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(acct, conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	colA := tableschema.Column{Name: "a", Type: datatype.StringType}
	colB := tableschema.Column{Name: "b", Type: datatype.StringType}
	colC := tableschema.Column{Name: "c", Type: datatype.BigIntType}

	// Partition column.
	partDt := tableschema.Column{Name: "dt", Type: datatype.StringType}

	builder := tableschema.NewSchemaBuilder()
	builder.Name("test_hash_clustering").
		Columns(colA, colB, colC).
		PartitionColumns(partDt).
		// Use hash clustering, keyed on column c.
		ClusterType(tableschema.CLUSTER_TYPE.Hash).
		ClusterColumns([]string{colC.Name}).
		// Optional sort key; keeping it identical to the cluster key usually
		// gives the best optimization results.
		ClusterSortColumns([]tableschema.SortColumn{{Name: colC.Name, Order: tableschema.SORT_ORDER.ASC}}).
		// Optional bucket number.
		ClusterBucketNum(1024)

	schema := builder.Build()
	println(schema.ToSQLString("test_cluster", "", true))

	if err = client.Tables().Create(schema, true, nil, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}
Create a range-clustered table

For more information about the range-clustered table, see Range clustering.

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

// main creates a range-clustered table, equivalent to the DDL:
//
//	CREATE TABLE test_range_clustering (a string, b string, c bigint)
//	PARTITIONED BY (dt string)
//	RANGE CLUSTERED BY (c) SORTED BY (c) INTO 1024 BUCKETS;
func main() {
	// Load the AccessKey configuration from ./config.ini.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(acct, conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	colA := tableschema.Column{Name: "a", Type: datatype.StringType}
	colB := tableschema.Column{Name: "b", Type: datatype.StringType}
	colC := tableschema.Column{Name: "c", Type: datatype.BigIntType}

	// Partition column.
	partDt := tableschema.Column{Name: "dt", Type: datatype.StringType}

	builder := tableschema.NewSchemaBuilder()
	builder.Name("test_range_clustering").
		Columns(colA, colB, colC).
		PartitionColumns(partDt).
		// Use range clustering, keyed on column c.
		ClusterType(tableschema.CLUSTER_TYPE.Range).
		ClusterColumns([]string{colC.Name}).
		// Optional sort key; keeping it identical to the cluster key usually
		// gives the best optimization results.
		ClusterSortColumns([]tableschema.SortColumn{{Name: colC.Name, Order: tableschema.SORT_ORDER.ASC}}).
		// Optional bucket number.
		ClusterBucketNum(1024)

	schema := builder.Build()
	println(schema.ToSQLString("test_cluster", "", true))

	if err = client.Tables().Create(schema, true, nil, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}

Create an OSS external table

For more information about the OSS external table, see Create an OSS external table.

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

// main creates an OSS external table that uses a user defined storage
// handler (TextStorageHandler), equivalent to the DDL:
//
//	create external table if not exists `testCreateExternalTableWithUserDefinedStorageHandler` (
//	    `a` STRING, `b` STRING, `c` BIGINT)
//	comment 'External table using user defined TextStorageHandler'
//	partitioned by (`dt` STRING)
//	stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
//	with serdeproperties('odps.text.option.delimiter'='|', 'my.own.option'='value')
//	location 'oss://full/uri/path/to/oss/directory/'
//	lifecycle 10;
func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	tableName := "testCreateExternalTableWithUserDefinedStorageHandler"

	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}

	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}

	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}

	// partition column
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}

	sb := tableschema.NewSchemaBuilder()
	sb.Name(tableName).
		Columns(c1, c2, c3).
		PartitionColumns(pc).
		Location("oss://full/uri/path/to/oss/directory/").
		StorageHandler("com.aliyun.odps.udf.example.text.TextStorageHandler").
		Comment("External table using user defined TextStorageHandler").
		Lifecycle(10)

	tablesIns := odpsIns.Tables()
	schema := sb.Build()

	// Properties passed to the storage handler (serdeproperties).
	serDeProperties := map[string]string{
		"odps.text.option.delimiter": "|",
		"my.own.option":              "value",
	}

	// SQL engine hints used for external-table DDL.
	hints := map[string]string{
		"odps.sql.preparse.odps2":       "lot",
		"odps.sql.planner.mode":         "lot",
		"odps.sql.planner.parser.odps2": "true",
		"odps.sql.ddl.odps2":            "true",
		"odps.compiler.output.format":   "lot,pot",
	}

	// Preview the generated DDL. The returned error was previously ignored;
	// check it before using the SQL string.
	sql, err := schema.ToExternalSQLString(odpsIns.DefaultProjectName(), "", true, serDeProperties, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	print(sql)

	err = tablesIns.CreateExternal(schema, true, serDeProperties, nil, hints, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

Get table list

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Build the ODPS client and point it at the default project.
	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	// Enumerate tables in the project, with server-side filters applied.
	ts := client.Project(cfg.ProjectName).Tables()
	ts.List(
		// Invoked once per listed table.
		func(t *odps.Table, err error) {
			if err != nil {
				log.Fatalf("%+v", err)
			}
			println(t.Name())
		},
		// Only tables whose name starts with "all_type".
		odps.TableFilter.NamePrefix("all_type"),
		// Only managed tables; other types include VirtualView and ExternalTable.
		odps.TableFilter.Type(odps.ManagedTable),
	)
}

Get single table information

Check whether a table exists

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	// Obtain a handle to the table and ask the service whether it exists.
	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	exists, err := tbl.Exists()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(exists)
}

Get table size and row count

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Table attributes are only available after the metadata is loaded.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Table size in bytes.
	size := tbl.Size()
	println("size = ", size)

	// Number of rows in the table.
	rowCount := tbl.RecordNum()
	println("rowCount = ", rowCount)
}

Get table CreatedTime, LastDDLTime, ModifiedTime

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Table attributes are only available after the metadata is loaded.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Creation time of the table.
	createTime := tbl.CreatedTime()
	println("create time = ", createTime)

	// Time of the most recent DDL operation.
	lastDDLTime := tbl.LastDDLTime()
	println("last ddl time = ", lastDDLTime)

	// Time of the most recent modification of the table.
	lastModifiedTime := tbl.LastModifiedTime()
	println("last modified time = ", lastModifiedTime)
}

Get table owner

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Table attributes are only available after the metadata is loaded.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// The account that owns the table.
	owner := tbl.Owner()
	println("owner is ", owner)
}

Get table type

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Table attributes are only available after the metadata is loaded.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// The table type (e.g. managed, view, external).
	t := tbl.Type()
	println("type is ", t)
}

Get table structure

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
	"strings"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("test_cluster_table")

	// The schema is only populated after the metadata is loaded.
	if err = tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	schema := tbl.Schema()

	println("table name = ", schema.TableName)
	if lc := tbl.LifeCycle(); lc > 0 {
		println("table lifecycle = ", lc)
	}

	// Regular columns.
	for _, col := range schema.Columns {
		fmt.Printf("column %s %s comment '%s'\n", col.Name, col.Type, col.Comment)
	}

	// Partition columns.
	for _, col := range schema.PartitionColumns {
		fmt.Printf("partition column %s %s comment '%s'\n", col.Name, col.Type, col.Comment)
	}

	// Clustering information, present only for clustered tables.
	if info := schema.ClusterInfo; info.ClusterType != "" {
		println("cluster type = ", info.ClusterType)
		println("cluster columns = ", strings.Join(info.ClusterCols, ", "))
		println("cluster bucket num = ", info.BucketNum)
	}
}

Delete table

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	// Drop the table from the project.
	tbl := client.Project(cfg.ProjectName).Tables().Get("test_cluster_table")
	if err = tbl.Delete(); err != nil {
		log.Fatalf("%+v", err)
	}
}

Partition management

Get partition list

The MaxCompute SDK supports retrieving a list of all partition values and partition objects for a table. The partition objects contain partition information, such as size and lastModifiedTime.

Get partition list values

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Retrieve only the partition value strings, without the
	// per-partition metadata that GetPartitions would return.
	values, err := tbl.GetPartitionValues()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for _, v := range values {
		println(v)
	}
}

Get partition object list

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	// GetPartitions returns one object per partition, carrying partition
	// metadata such as size and modification times.
	partitions, err := table.GetPartitions()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("get %d partitions\n", len(partitions))

	// FIX: the original loop body used gofmt-violating mixed space/tab
	// indentation; reformatted to standard tabs. Behavior is unchanged.
	for _, p := range partitions {
		fmt.Printf(
			"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
			p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
		)
	}
}

Get single partition information

Get basic partition information

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Look up a single partition by its partition spec string.
	p, err := tbl.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
	)
}

Get extended partition information

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	// Look up a single partition by its partition spec string.
	p, err := table.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Basic partition information.
	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
	)

	// Extended information requires an additional load call.
	err = p.LoadExtended()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// FIX: the original format string had no trailing "\n", so the last
	// output line ran into whatever followed it on the terminal.
	fmt.Printf(
		"isArchived=%t, lifeCycle=%d, physicalSize=%d\n",
		p.IsArchivedEx(), p.LifeCycleEx(), p.PhysicalSizeEx(),
	)
}

Add partition

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Add two partitions in one call. The boolean flag presumably enables
	// "if not exists" semantics — TODO confirm against the SDK docs.
	if err = tbl.AddPartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}); err != nil {
		log.Fatalf("%+v", err)
	}
}

Delete partition

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	tbl := client.Project(cfg.ProjectName).Tables().Get("all_types_demo")

	// Delete two partitions in one call. The boolean flag presumably
	// enables "if exists" semantics — TODO confirm against the SDK docs.
	if err = tbl.DeletePartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}); err != nil {
		log.Fatalf("%+v", err)
	}
}

Instance management

MaxCompute returns an instance object after SQL execution. This instance object represents a MaxCompute job and allows you to track the execution status and results.

Get instance list

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
	"time"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Time window used to filter the instance list. The layout is Go's
	// reference time and the literals match it exactly, so the parse
	// errors are deliberately ignored.
	timeFormat := "2006-01-02 15:04:05"
	startTime, _ := time.Parse(timeFormat, "2024-10-11 02:15:30")
	endTime, _ := time.Parse(timeFormat, "2024-10-13 06:22:02")

	// Callback invoked once per listed instance.
	// BUG FIX: the original re-checked the stale `err` from config loading
	// inside this callback; that check could never observe a listing
	// failure (the callback receives no error), so it has been removed.
	var f = func(i *odps.Instance) {
		println(
			fmt.Sprintf(
				"%s, %s, %s, %s, %s",
				i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
			))
	}

	instances := odpsIns.Instances()
	instances.List(
		f,
		// Only instances within the given time range.
		odps.InstanceFilter.TimeRange(startTime, endTime),
		// Only instances that have terminated.
		odps.InstanceFilter.Status(odps.InstanceTerminated),
	)
}

Get instance information

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	// Fetch the instance by ID, then load its metadata from the service.
	ins := client.Instances().Get("<yourInstanceId>")
	if err = ins.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("owner=%s\n", ins.Owner())
	fmt.Printf("status=%s\n", ins.Status())
	fmt.Printf("startTime=%s\n", ins.StartTime())
	fmt.Printf("endTime=%s\n", ins.EndTime())
	fmt.Printf("result=%+v\n", ins.TaskResults())
}

Permission management

MaxCompute manages permissions through dedicated security commands. For an overview of the authorization model, see Overview. The following example shows how to use the DESC ROLE command to view role-related information:

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
	"log"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	// The security manager issues authorization commands against the
	// project through the client's REST transport.
	sm := security.NewSecurityManager(client.RestClient(), cfg.ProjectName)
	result, err := sm.RunQuery("desc role role_project_admin;", true, "")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(fmt.Sprintf("ok: %s", result))
}

Logview

Logview allows you to view submitted MaxCompute jobs and conduct debugging tests. For more information, see Use LogView to view job information.

package main

import (
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

func main() {
	// Load the AccessKey configuration from the local ini file.
	cfg, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	acct := account.NewAliyunAccount(cfg.AccessId, cfg.AccessKey)
	client := odps.NewOdps(acct, cfg.Endpoint)
	client.SetDefaultProjectName(cfg.ProjectName)

	sql := "select * from all_types_demo where p1>0 or p2 > '';"

	// SQL engine parameters, for example, odps.sql.skewjoin; none needed.
	var hints map[string]string

	// Build the SQL task and run it under the project's quota.
	task := odps.NewSqlTask("select", sql, hints)
	ins, err := task.Run(client, client.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Generate a Logview URL for the submitted instance. The second
	// argument presumably sets the URL's lifetime — TODO confirm its
	// unit (hours vs. days) against the SDK docs.
	logView, err := client.LogView().GenerateLogView(ins, 1)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(logView)
}