全部產品
Search
文件中心

MaxCompute:使用Go SDK

更新時間:Nov 13, 2024

成功構建ODPS對象後,您可對專案空間下的Tables和Instances等對象執行後續操作,包括SQL操作、資料上傳/下載、表/分區管理,以及Instance管理等。

您可使用訪問憑證配置方式中的任意一種方法來建立ODPS對象,為了方便,本文的範例程式碼中,均使用config.ini中載入AK的方法。

執行SQL

您可通過SQLTask對象的Run方法或MaxCompute SQL Driver執行各類MaxCompute SQL。

通過SDK執行SQL

您可通過SQLTask對象的Run方法執行各類MaxCompute SQL,該方法會返回Instance對象。當執行SELECT語句時,如果查詢結果超過10000行,需要使用Tunnel下載全部的查詢結果;如果查詢結果不超過10000行,可以直接從Instance對象擷取查詢結果。下面將以SELECT語句為例介紹SQL執行方法。

樣本一:執行SELECT並擷取查詢結果

當查詢結果不超過10000行時,查詢結果可用CSV Reader的形式直接讀取。

package main

import (
	"fmt"
	"io"
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

func main() {
	// Load the AccessKey pair, endpoint and project name from the INI file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Build the ODPS client and make the configured project the default one.
	odpsIns := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// No SQL engine hints (such as odps.sql.skewjoin) are passed.
	var hints map[string]string
	sqlTask := odps.NewSqlTask("select_demo", "select * from all_types_demo where p1>0 or p2 > '';", hints)

	ins, err := sqlTask.Run(odpsIns, odpsIns.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}
	if err = ins.WaitForSuccess(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Read the SELECT result straight from the instance as CSV rows.
	csvReader, err := sqlTask.GetSelectResultAsCsv(ins, true)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Print rows until the reader reports io.EOF.
	for {
		row, readErr := csvReader.Read()
		if readErr == io.EOF {
			return
		}
		if readErr != nil {
			log.Fatalf("%+v", readErr)
		}
		fmt.Printf("%v\n", row)
	}
}

樣本二:執行SELECT,通過Tunnel擷取查詢結果

當查詢結果超過10000行時,查詢結果需要通過Tunnel擷取。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

func main() {
	// Load the AccessKey pair, endpoint and project name from the INI file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Make the configured project the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// No SQL engine hints (such as odps.sql.skewjoin) are passed.
	var hints map[string]string
	task := odps.NewSqlTask(
		"select_demo",
		"select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';",
		hints,
	)

	// Run the SQL under the quota associated with the default project.
	projectName := odpsIns.DefaultProjectName()
	ins, err := task.Run(odpsIns, projectName)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	if err = ins.WaitForSuccess(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Print a LogView URL that shows the job details.
	lvURL, err := odpsIns.LogView().GenerateLogView(ins, 10)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	println(lvURL)

	project := odpsIns.DefaultProject()
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Open a Tunnel session that downloads the result set of this instance.
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	const pageSize = 200000
	recordCount := session.RecordCount()
	schema := session.Schema()
	total := 0

	// Read the result page by page; each page starts where the previous one ended.
	for offset := 0; offset < recordCount; {
		reader, err := session.OpenRecordReader(offset, pageSize, 0, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}

		read := 0
		err = reader.Iterator(func(record data.Record, recErr error) {
			read++
			if recErr != nil {
				return
			}

			// Print one record as "col=value, col=value, ...".
			for i, d := range record {
				name := schema.Columns[i].Name
				if d != nil {
					fmt.Printf("%s=%s", name, d.Sql())
				} else {
					fmt.Printf("%s=null", name)
				}

				if i >= record.Len()-1 {
					fmt.Println()
				} else {
					fmt.Print(", ")
				}
			}
		})
		if err != nil {
			log.Fatalf("%+v", err)
		}

		offset += read
		total += read
		log.Println(read)
		if err = reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}
	}

	println("total count ", total)
}

通過MaxCompute SQL Driver執行SQL

樣本一:執行CREATE TABLE語句

package main

import (
	"database/sql"
	"log"

	// Blank import registers the "odps" database/sql driver.
	_ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
)

func main() {
	// The DSN below carries no AccessKey pair. The credentials are expected in
	// the environment as ALIBABA_CLOUD_ACCESS_KEY_ID and
	// ALIBABA_CLOUD_ACCESS_KEY_SECRET (the standard Alibaba Cloud variable
	// names). NOTE(review): confirm which names the sqldriver version in use reads.
	//
	// sql.Open("odps", ...) only works when the "odps" driver has been
	// registered. Registration happens by importing
	// github.com/aliyun/aliyun-odps-go-sdk/sqldriver; a blank import is enough.
	dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Release the connection pool on normal return.
	// log.Fatalf exits without running deferred calls.
	defer db.Close()

	sqlStr := "create table table_with_date (date_col DATE);"
	if _, err = db.Exec(sqlStr); err != nil {
		log.Fatalf("%+v", err)
	}
}

樣本二:執行SELECT語句,並擷取結果

package main

import (
	"database/sql"
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
	"log"
	"reflect"
)

func main() {
	// Build the DSN from ./config.ini.
	config, err := sqldriver.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	dsn := config.FormatDsn()
	// or dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Release the connection pool on normal return.
	// log.Fatalf exits without running deferred calls.
	defer db.Close()

	// NOTE(review): '@p2' is quoted inside the statement text. This suggests the
	// driver substitutes named arguments verbatim instead of quoting them;
	// confirm against the sqldriver version in use.
	selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"

	rows, err := db.Query(
		selectSql,
		sql.Named("bigint_type", 100000000000),
		sql.Named("p1", 20),
		sql.Named("p2", "hangzhou"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Rows holds a connection until it is closed.
	defer rows.Close()

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Allocate one scan destination per column. Each is a pointer to a fresh
	// value of the column's ScanType.
	record := make([]interface{}, len(columnTypes))
	for i, columnType := range columnTypes {
		record[i] = reflect.New(columnType.ScanType()).Interface()
		t := reflect.TypeOf(record[i])

		fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
	}

	columns, err := rows.Columns()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for rows.Next() {
		if err = rows.Scan(record...); err != nil {
			log.Fatalf("%+v", err)
		}

		for i, r := range record {
			// Every destination is asserted to implement sqldriver.NullAble.
			if r.(sqldriver.NullAble).IsNull() {
				fmt.Printf("%s=NULL", columns[i])
			} else {
				switch v := r.(type) {
				case *sqldriver.NullInt8:
					fmt.Printf("%s=%d", columns[i], v.Int8)
				case *sqldriver.NullInt16:
					fmt.Printf("%s=%d", columns[i], v.Int16)
				case *sqldriver.NullInt32:
					fmt.Printf("%s=%d", columns[i], v.Int32)
				case *sqldriver.NullInt64:
					fmt.Printf("%s=%d", columns[i], v.Int64)
				case *sqldriver.NullFloat32:
					fmt.Printf("%s=%f", columns[i], v.Float32)
				case *sqldriver.NullFloat64:
					fmt.Printf("%s=%f", columns[i], v.Float64)
				case *sqldriver.NullString:
					fmt.Printf("%s=%s", columns[i], v.String)
				case *sqldriver.NullBool:
					fmt.Printf("%s=%v", columns[i], v.Bool)
				case *sqldriver.Binary, *sqldriver.Decimal, *sqldriver.NullDate,
					*sqldriver.NullDateTime, *sqldriver.NullTimeStamp,
					*sqldriver.Map, *sqldriver.Array, *sqldriver.Struct, *sqldriver.Json:
					// Printed with %s. These types presumably implement
					// fmt.Stringer; confirm.
					fmt.Printf("%s=%s", columns[i], r)
				}
			}

			if i < len(record)-1 {
				fmt.Printf(", ")
			} else {
				fmt.Print("\n\n")
			}
		}
	}
	// Next returns false on an error as well as on exhaustion, so report an error if there was one.
	if err = rows.Err(); err != nil {
		log.Fatalf("%+v", err)
	}
}

資料上傳與下載

您可使用Tunnel對表/分區的資料進行批量上傳與下載,也可通過流式資料通道將資料寫入表/分區。

初始化Tunnel

Tunnel的初始化範例程式碼如下。您可使用訪問憑證配置方式中的任意一種方法來建立ODPS對象,為了方便,接下來的範例程式碼中,都使用config.ini中載入AK的方法。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

func main() {
	// Read the configuration from the INI file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS.
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	// Look up the Tunnel endpoint of the project.
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)

	// Initialize the Tunnel.
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

	// The builtin println does not interpret format verbs, so it would print a
	// literal "%+v". Use fmt.Printf to dump the value instead.
	fmt.Printf("%+v\n", tunnelIns)
}

批量資料上傳

上傳表或分區資料時,在初始化Tunnel後,需要進行如下操作:

  1. 建立UploadSession,指定要將資料上傳到哪張表/分區,以及指定上傳資料使用的壓縮演算法等。

  2. 使用UploadSession建立Writer,Writer進行資料上傳,一個Writer上傳的資料被稱為一個Block,使用一個INT類型值作為Block ID。為了提高上傳速度,可以建立多個Writer進行並發資料上傳。

  3. Writer上傳資料完畢後,需要調用UploadSession.Commit來最終完成上傳,Commit需要指定Block ID列表。

    package main
    
    import (
    	"fmt"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
    	"log"
    )
    
    func main() {
    	// Read the configuration from the INI file.
    	conf, err := odps.NewConfigFromIni("./config.ini")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	// Initialize ODPS.
    	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
    	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
    	odpsIns.SetDefaultProjectName(conf.ProjectName)
    	project := odpsIns.DefaultProject()
    
    	// Initialize the Tunnel.
    	tunnelEndpoint, err := project.GetTunnelEndpoint()
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
    	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
    
    	// Create an UploadSession for the target table/partition.
    	session, err := tunnelIns.CreateUploadSession(
    		project.Name(),
    		"all_types_demo",
    		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
    		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
    	)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	writerNum := 3
    	blockIds := make([]int, writerNum)
    	for i := 0; i < writerNum; i++ {
    		blockIds[i] = i
    	}
    
    	// Buffered so that every writer goroutine can report without blocking.
    	errChan := make(chan error, writerNum)
    
    	// Upload concurrently with several writers. Each writer is identified by
    	// the blockId of the data it writes.
    	for _, blockId := range blockIds {
    		blockId := blockId // per-iteration copy for Go versions before 1.22
    		go func() {
    			schema := session.Schema()
    			record, err := makeRecord(schema)
    			if err != nil {
    				errChan <- err
    				return
    			}
    
    			recordWriter, err := session.OpenRecordWriter(blockId)
    			if err != nil {
    				errChan <- err
    				return
    			}
    
    			for i := 0; i < 100; i++ {
    				err = recordWriter.Write(record)
    
    				if err != nil {
    					// The write error takes priority over any Close error.
    					_ = recordWriter.Close()
    					errChan <- err
    					return
    				}
    			}
    			err = recordWriter.Close()
    			if err == nil {
    				fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount())
    			}
    
    			errChan <- err
    		}()
    	}
    
    	// Wait until every writer has finished uploading.
    	for i := 0; i < writerNum; i++ {
    		err := <-errChan
    
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    	}
    
    	// Commit all blocks to finish the upload; the data can then be queried in
    	// the table. Report success only after the commit error has been checked.
    	err = session.Commit(blockIds)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	log.Println("success to commit all blocks")
    }
    
    // makeRecord builds one sample record for the all_types_demo table. It uses
    // the session schema to obtain the concrete map, array and struct types.
    //
    // It returns an error, instead of panicking or inserting a zero value, when
    // a value cannot be constructed or the schema lacks the expected layout.
    //
    // NOTE(review): columns are addressed by position: 15 (map), 16 (array) and
    // 17 (struct). The function produces 18 values, so it assumes an 18-column
    // layout. The all_types_demo table in the table-creation sample declares 20
    // columns, with timestamp_ntz_type at index 14 and json_type at index 19.
    // Confirm the layout against the actual target table.
    // NOTE(review): the "char" value is built with NewVarChar; confirm that the
    // CHAR column accepts it.
    func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
    	const (
    		mapCol    = 15
    		arrayCol  = 16
    		structCol = 17
    	)
    	if len(schema.Columns) <= structCol {
    		return nil, fmt.Errorf("schema has %d columns, need at least %d", len(schema.Columns), structCol+1)
    	}
    
    	varchar, err := data.NewVarChar(500, "varchar")
    	if err != nil {
    		return nil, err
    	}
    	char, err := data.NewVarChar(254, "char")
    	if err != nil {
    		return nil, err
    	}
    	s := data.String("hello world")
    	date, err := data.NewDate("2022-10-19")
    	if err != nil {
    		return nil, err
    	}
    	datetime, err := data.NewDateTime("2022-10-19 17:00:00")
    	if err != nil {
    		return nil, err
    	}
    	timestamp, err := data.NewTimestamp("2022-10-19 17:00:00.000")
    	if err != nil {
    		return nil, err
    	}
    
    	// map column: {"hello": 1, "world": 2}
    	mapType, ok := schema.Columns[mapCol].Type.(datatype.MapType)
    	if !ok {
    		return nil, fmt.Errorf("column %d (%s) is not a map column", mapCol, schema.Columns[mapCol].Name)
    	}
    	mapData := data.NewMapWithType(mapType)
    	if err = mapData.Set("hello", 1); err != nil {
    		return nil, err
    	}
    	if err = mapData.Set("world", 2); err != nil {
    		return nil, err
    	}
    
    	// array column: ["a", "b"]
    	arrayType, ok := schema.Columns[arrayCol].Type.(datatype.ArrayType)
    	if !ok {
    		return nil, fmt.Errorf("column %d (%s) is not an array column", arrayCol, schema.Columns[arrayCol].Name)
    	}
    	arrayData := data.NewArrayWithType(arrayType)
    	if err = arrayData.Append("a"); err != nil {
    		return nil, err
    	}
    	if err = arrayData.Append("b"); err != nil {
    		return nil, err
    	}
    
    	// struct column: {arr: ["x", "y"], name: "tom"}
    	structType, ok := schema.Columns[structCol].Type.(datatype.StructType)
    	if !ok {
    		return nil, fmt.Errorf("column %d (%s) is not a struct column", structCol, schema.Columns[structCol].Name)
    	}
    	structData := data.NewStructWithTyp(structType)
    
    	arrType, ok := structType.FieldType("arr").(datatype.ArrayType)
    	if !ok {
    		return nil, fmt.Errorf("field arr of column %d (%s) is not an array", structCol, schema.Columns[structCol].Name)
    	}
    	arr := data.NewArrayWithType(arrType)
    	if err = arr.Append("x"); err != nil {
    		return nil, err
    	}
    	if err = arr.Append("y"); err != nil {
    		return nil, err
    	}
    	if err = structData.SetField("arr", arr); err != nil {
    		return nil, err
    	}
    	if err = structData.SetField("name", "tom"); err != nil {
    		return nil, err
    	}
    
    	record := []data.Data{
    		data.TinyInt(1),
    		data.SmallInt(32767),
    		data.Int(100),
    		data.BigInt(100000000000),
    		data.Binary("binary"),
    		data.Float(3.14),
    		data.Double(3.1415926),
    		data.NewDecimal(38, 18, "3.1415926"),
    		varchar,
    		char,
    		s,
    		date,
    		datetime,
    		timestamp,
    		data.Bool(true),
    		mapData,
    		arrayData,
    		structData,
    	}
    
    	return record, nil
    }
    

批量資料下載

下載表/分區資料時,在初始化Tunnel後,需要進行如下操作:

  1. 建立DownloadSession,指定從哪個表、分區下載資料,以及指定傳輸資料使用的壓縮演算法等。

  2. 使用DownloadSession建立Reader,Reader可以用分頁的形式,分批將資料下載到本地。

    package main
    
    import (
    	"fmt"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
    	"log"
    )
    
    func main() {
    	// Read the configuration from the INI file.
    	conf, err := odps.NewConfigFromIni("./config.ini")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	// Initialize ODPS.
    	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
    	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
    	odpsIns.SetDefaultProjectName(conf.ProjectName)
    	project := odpsIns.DefaultProject()
    
    	// Look up the Tunnel endpoint.
    	tunnelEndpoint, err := project.GetTunnelEndpoint()
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
    	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
    
    	// Create a DownloadSession for the table/partition to read.
    	session, err := tunnelIns.CreateDownloadSession(
    		project.Name(),
    		"all_types_demo",
    		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
    	)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	recordCount := session.RecordCount()
    	fmt.Printf("record count is %d\n", recordCount)
    
    	start := 0
    	step := 100001
    	total := 0
    	schema := session.Schema()
    
    	// Download page by page. Each reader covers up to `step` records
    	// beginning at `start`.
    	for start < recordCount {
    		reader, err := session.OpenRecordReader(start, step, nil)
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		count := 0
    		err = reader.Iterator(func(record data.Record, _err error) {
    			if _err != nil {
    				return
    			}
    			// Count every delivered record so that `start` advances. Without
    			// this the outer loop never terminates.
    			count++
    
    			for i, d := range record {
    				if d == nil {
    					fmt.Printf("%s=null", schema.Columns[i].Name)
    				} else {
    					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
    				}
    
    				if i < record.Len()-1 {
    					fmt.Printf(", ")
    				} else {
    					fmt.Println()
    				}
    			}
    		})
    
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		start += count
    		total += count
    
    		log.Println(count)
    
    		if err = reader.Close(); err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		// No progress on this page: stop instead of re-reading the same
    		// range forever.
    		if count == 0 {
    			break
    		}
    	}
    
    	println("total count ", total)
    }

流式資料上傳

MaxCompute支援通過流式資料通道將資料寫入表/分區。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

func main() {
	// Read the AccessKey pair, endpoint and project name from the INI file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Build the ODPS client bound to the configured default project.
	odpsIns := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)

	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

	// Open a streaming upload session for partition p1=20,p2='hangzhou'.
	// WithCreatePartition creates the partition when it does not exist yet.
	session, err := tunnelIns.CreateStreamUploadSession(
		project.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
		tunnel.SessionCfg.WithCreatePartition(),
		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	packWriter := session.OpenRecordPackWriter()

	// Two rounds. Each round appends the sample record until the pack's
	// DataSize reaches the threshold of 64, then flushes the pack.
	for round := 0; round < 2; round++ {
		record, err := makeRecord(session.Schema())
		if err != nil {
			log.Fatalf("%+v", err)
		}

		for packWriter.DataSize() < 64 {
			if err := packWriter.Append(record); err != nil {
				log.Fatalf("%+v", err)
			}
		}

		traceId, recordCount, bytesSend, err := packWriter.Flush()
		if err != nil {
			log.Fatalf("%+v", err)
		}

		fmt.Printf(
			"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
			traceId, recordCount, bytesSend,
		)
	}
}

// makeRecord builds one sample record for the all_types_demo table. It uses
// the session schema to obtain the concrete map, array and struct types.
//
// It returns an error, instead of panicking or inserting a zero value, when
// a value cannot be constructed or the schema lacks the expected layout.
//
// NOTE(review): columns are addressed by position: 15 (map), 16 (array) and
// 17 (struct). The function produces 18 values, so it assumes an 18-column
// layout. The all_types_demo table in the table-creation sample declares 20
// columns, with timestamp_ntz_type at index 14 and json_type at index 19.
// Confirm the layout against the actual target table.
// NOTE(review): the "char" value is built with NewVarChar; confirm that the
// CHAR column accepts it.
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
	const (
		mapCol    = 15
		arrayCol  = 16
		structCol = 17
	)
	if len(schema.Columns) <= structCol {
		return nil, fmt.Errorf("schema has %d columns, need at least %d", len(schema.Columns), structCol+1)
	}

	varchar, err := data.NewVarChar(500, "varchar")
	if err != nil {
		return nil, err
	}
	char, err := data.NewVarChar(254, "char")
	if err != nil {
		return nil, err
	}
	s := data.String("hello world")
	date, err := data.NewDate("2022-10-19")
	if err != nil {
		return nil, err
	}
	datetime, err := data.NewDateTime("2022-10-19 17:00:00")
	if err != nil {
		return nil, err
	}
	timestamp, err := data.NewTimestamp("2022-10-19 17:00:00.000")
	if err != nil {
		return nil, err
	}

	// map column: {"hello": 1, "world": 2}
	mapType, ok := schema.Columns[mapCol].Type.(datatype.MapType)
	if !ok {
		return nil, fmt.Errorf("column %d (%s) is not a map column", mapCol, schema.Columns[mapCol].Name)
	}
	mapData := data.NewMapWithType(mapType)
	if err = mapData.Set("hello", 1); err != nil {
		return nil, err
	}
	if err = mapData.Set("world", 2); err != nil {
		return nil, err
	}

	// array column: ["a", "b"]
	arrayType, ok := schema.Columns[arrayCol].Type.(datatype.ArrayType)
	if !ok {
		return nil, fmt.Errorf("column %d (%s) is not an array column", arrayCol, schema.Columns[arrayCol].Name)
	}
	arrayData := data.NewArrayWithType(arrayType)
	if err = arrayData.Append("a"); err != nil {
		return nil, err
	}
	if err = arrayData.Append("b"); err != nil {
		return nil, err
	}

	// struct column: {arr: ["x", "y"], name: "tom"}
	structType, ok := schema.Columns[structCol].Type.(datatype.StructType)
	if !ok {
		return nil, fmt.Errorf("column %d (%s) is not a struct column", structCol, schema.Columns[structCol].Name)
	}
	structData := data.NewStructWithTyp(structType)

	arrType, ok := structType.FieldType("arr").(datatype.ArrayType)
	if !ok {
		return nil, fmt.Errorf("field arr of column %d (%s) is not an array", structCol, schema.Columns[structCol].Name)
	}
	arr := data.NewArrayWithType(arrType)
	if err = arr.Append("x"); err != nil {
		return nil, err
	}
	if err = arr.Append("y"); err != nil {
		return nil, err
	}
	if err = structData.SetField("arr", arr); err != nil {
		return nil, err
	}
	if err = structData.SetField("name", "tom"); err != nil {
		return nil, err
	}

	record := []data.Data{
		data.TinyInt(1),
		data.SmallInt(32767),
		data.Int(100),
		data.BigInt(100000000000),
		data.Binary("binary"),
		data.Float(3.14),
		data.Double(3.1415926),
		data.NewDecimal(38, 18, "3.1415926"),
		varchar,
		char,
		s,
		date,
		datetime,
		timestamp,
		data.Bool(true),
		mapData,
		arrayData,
		structData,
	}

	return record, nil
}

Schema管理

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// schemas gives access to all schemas of the default project.
	schemas := odpsIns.Schemas()
	// List every schema in the default project. When err is set, the schema
	// argument may be nil, so stop instead of dereferencing it.
	schemas.List(func(schema *odps.Schema, err error) {
		if err != nil {
			log.Fatalf("%+v", err)
		}
		print(schema.Name() + "\n")
	})

	// Make default_schema the current schema. Table names without an explicit
	// schema are then resolved inside it; without this call the "default"
	// schema is used.
	odpsIns.SetCurrentSchemaName("default_schema")
	table := odpsIns.Table("table") // actually, the table name is "project.default_schema.table"
	println(table.SchemaName())

	// List all tables in a specific schema (here: schema_A).
	tablesInSchemaA := odps.NewTables(odpsIns, conf.ProjectName, "schema_A")
	tablesInSchemaA.List(func(table *odps.Table, err error) {
		if err != nil {
			log.Fatalf("%+v", err)
		}
		print(table.Name() + "\n")
	})

	// Create a schema with a comment.
	if err := schemas.Create("new_schema", false, "comment"); err != nil {
		log.Fatalf("%+v", err)
	}

	// Delete a schema.
	if err := schemas.Delete("to_delete_schema"); err != nil {
		log.Fatalf("%+v", err)
	}

	// Load the schema metadata and print it.
	schema := schemas.Get("new_schema")
	if err := schema.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	log.Printf(
		"name=%v project=%v type=%v owner=%v comment=%v createTime=%v modifiedTime=%v",
		schema.Name(),
		schema.ProjectName(),
		schema.Type(),
		schema.Owner(),
		schema.Comment(),
		schema.CreateTime(),
		schema.ModifiedTime(),
	)
}

表管理

建立表

建立普通表

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

func main() {
	// Read the AccessKey pair, endpoint and project name from ./config.ini.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Complex column types: map<string,bigint>, array<string>,
	// struct<arr:array<string>, name:string> and json.
	mapType := datatype.NewMapType(datatype.StringType, datatype.BigIntType)
	arrayType := datatype.NewArrayType(datatype.StringType)
	structType := datatype.NewStructType(
		datatype.NewStructFieldType("arr", arrayType),
		datatype.NewStructFieldType("name", datatype.StringType),
	)
	jsonType := datatype.NewJsonType()

	// Data columns, in table order.
	dataColumns := []tableschema.Column{
		{Name: "tiny_int_type", Type: datatype.TinyIntType},
		{Name: "small_int_type", Type: datatype.SmallIntType},
		{Name: "int_type", Type: datatype.IntType},
		{Name: "bigint_type", Type: datatype.BigIntType},
		{Name: "binary_type", Type: datatype.BinaryType},
		{Name: "float_type", Type: datatype.FloatType},
		{Name: "double_type", Type: datatype.DoubleType},
		{Name: "decimal_type", Type: datatype.NewDecimalType(10, 8)},
		{Name: "varchar_type", Type: datatype.NewVarcharType(500)},
		{Name: "char_type", Type: datatype.NewCharType(254)},
		{Name: "string_type", Type: datatype.StringType},
		{Name: "date_type", Type: datatype.DateType},
		{Name: "datetime_type", Type: datatype.DateTimeType},
		{Name: "timestamp_type", Type: datatype.TimestampType},
		{Name: "timestamp_ntz_type", Type: datatype.TimestampNtzType},
		{Name: "boolean_type", Type: datatype.BooleanType},
		{Name: "map_type", Type: mapType},
		{Name: "array_type", Type: arrayType},
		{Name: "struct_type", Type: structType},
		{Name: "json_type", Type: jsonType},
	}

	// Partition columns.
	partitionColumns := []tableschema.Column{
		{Name: "p1", Type: datatype.BigIntType},
		{Name: "p2", Type: datatype.StringType},
	}

	builder := tableschema.NewSchemaBuilder()
	builder.Name("all_types_demo").
		Columns(dataColumns...).
		PartitionColumns(partitionColumns...).
		Lifecycle(2) // unit: days

	schema := builder.Build()
	tablesIns := odpsIns.Tables()

	// If the project's data type version is 1.0, these hints enable the
	// MaxCompute 2.0 data types.
	hints := map[string]string{
		"odps.sql.type.system.odps2": "true",
		"odps.sql.decimal.odps2":     "true",
	}

	if err = tablesIns.Create(schema, true, hints, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}

建立聚簇表

建立Hash聚簇表

Hash聚簇表的詳細介紹請參考Hash Clustering

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

func main() {
	// Load the configuration file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Builds a table equivalent to the DDL:
	//   CREATE TABLE test_hash_clustering (a string, b string, c bigint)
	//   PARTITIONED BY (dt string)
	//   CLUSTERED BY (c)
	//   SORTED BY (c) INTO 1024 BUCKETS;
	colA := tableschema.Column{Name: "a", Type: datatype.StringType}
	colB := tableschema.Column{Name: "b", Type: datatype.StringType}
	colC := tableschema.Column{Name: "c", Type: datatype.BigIntType}
	partDt := tableschema.Column{Name: "dt", Type: datatype.StringType} // partition column

	// The sort key is optional. In most cases it should match the cluster key
	// to get the best optimization.
	sortKey := []tableschema.SortColumn{{Name: colC.Name, Order: tableschema.SORT_ORDER.ASC}}

	sb := tableschema.NewSchemaBuilder()
	sb.Name("test_hash_clustering"). // table name
		Columns(colA, colB, colC).
		PartitionColumns(partDt).
		ClusterType(tableschema.CLUSTER_TYPE.Hash). // hash clustering
		ClusterColumns([]string{colC.Name}).        // cluster key (the hash key)
		ClusterSortColumns(sortKey).
		ClusterBucketNum(1024) // number of hash buckets

	tablesIns := odpsIns.Tables()

	schema := sb.Build()

	println(schema.ToSQLString("test_cluster", "", true))

	if err = tablesIns.Create(schema, true, nil, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}
建立Range聚簇表

Range聚簇表的詳細介紹請參考Range Clustering

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

func main() {
	// Load the configuration file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Builds a table equivalent to the DDL below. The column types match the
	// code: c is BIGINT and the partition column dt is STRING.
	//   CREATE TABLE test_range_clustering (a string, b string, c bigint)
	//   PARTITIONED BY (dt string)
	//   RANGE CLUSTERED BY (c)
	//   SORTED BY (c)
	//   INTO 1024 BUCKETS;

	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}

	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}

	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}

	// Partition column.
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}

	sb := tableschema.NewSchemaBuilder()

	sb.Name("test_range_clustering"). // table name
		Columns(c1, c2, c3).                         // columns
		PartitionColumns(pc).                        // partition column
		ClusterType(tableschema.CLUSTER_TYPE.Range). // range clustering
		ClusterColumns([]string{c3.Name}).           // range cluster key
		// Sort key: optional. In most cases it should match the cluster key to
		// get the best optimization.
		ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
		ClusterBucketNum(1024) // number of buckets (optional)

	tablesIns := odpsIns.Tables()

	schema := sb.Build()

	println(schema.ToSQLString("test_cluster", "", true))

	err = tablesIns.Create(schema, true, nil, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

建立OSS外表

關於建立OSS外表的詳細資料可以參考建立OSS外部表格

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

func main() {
	// Load the configuration file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Builds an external table equivalent to:
	//   create external table if not exists <project>.`testCreateExternalTableWithUserDefinedStorageHandler` (
	//       `a` STRING,
	//       `b` STRING,
	//       `c` BIGINT
	//   )
	//   comment 'External table using user defined TextStorageHandler'
	//   partitioned by (`dt` STRING)
	//   stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
	//   with serdeproperties('odps.text.option.delimiter'='|', 'my.own.option'='value')
	//   location 'oss://full/uri/path/to/oss/directory/'
	//   lifecycle 10;

	tableName := "testCreateExternalTableWithUserDefinedStorageHandler"

	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}

	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}

	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}

	// Partition column.
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}

	sb := tableschema.NewSchemaBuilder()

	sb.Name(tableName). // table name
				Columns(c1, c2, c3).  // columns
				PartitionColumns(pc). // partition columns
				Location("oss://full/uri/path/to/oss/directory/").
				StorageHandler("com.aliyun.odps.udf.example.text.TextStorageHandler").
				Comment("External table using user defined TextStorageHandler").
				Lifecycle(10)

	tablesIns := odpsIns.Tables()

	schema := sb.Build()

	// SerDe properties of the external table.
	serDeProperties := map[string]string{
		"odps.text.option.delimiter": "|",
		"my.own.option":              "value",
	}

	// SQL hints passed along with the CREATE statement.
	hints := map[string]string{
		"odps.sql.preparse.odps2":       "lot",
		"odps.sql.planner.mode":         "lot",
		"odps.sql.planner.parser.odps2": "true",
		"odps.sql.ddl.odps2":            "true",
		"odps.compiler.output.format":   "lot,pot",
	}

	// Render and print the DDL. Check its error here; otherwise the next
	// assignment to err would silently overwrite it.
	ddl, err := schema.ToExternalSQLString(odpsIns.DefaultProjectName(), "", true, serDeProperties, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	println(ddl)

	err = tablesIns.CreateExternal(schema, true, serDeProperties, nil, hints, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

擷取表列表

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Read the AccessKey pair, endpoint and project name from ./config.ini.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()

	// printTable is called once per listed table; any listing error is fatal.
	printTable := func(t *odps.Table, err error) {
		if err != nil {
			log.Fatalf("%+v", err)
		}
		println(t.Name())
	}

	// Keep only tables whose name starts with "all_type" and whose type is
	// ManagedTable. Other table types include VirtualView and ExternalTable.
	tables.List(
		printTable,
		odps.TableFilter.NamePrefix("all_type"),
		odps.TableFilter.Type(odps.ManagedTable),
	)
}

擷取單個表資訊

判斷表是否存在

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Read the AccessKey pair, endpoint and project name from ./config.ini.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Get a handle to the table, then ask whether it exists. An error means
	// the existence check itself failed.
	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	demoTable := tables.Get("all_types_demo")

	exists, err := demoTable.Exists()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	println(exists)
}

擷取表的大小、行數

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads table all_types_demo and prints its size and row count.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	credentials := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(credentials, conf.Endpoint)
	// Make the configured project the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	demo := tableSet.Get("all_types_demo")

	// Load the table before reading its size and row count.
	if err = demo.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Table size in bytes.
	println("size = ", demo.Size())
	// Number of rows in the table.
	println("rowCount = ", demo.RecordNum())
}

擷取表的CreatedTime、LastDDLTime、ModifiedTime

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads table all_types_demo and prints its creation, last-DDL and
// last-modification timestamps.
func main() {
	// Path of the ini file holding credentials, endpoint and project name.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	// Load the table before reading its timestamps.
	err = table.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// The timestamp accessors return struct values (time.Time in the SDK).
	// The builtin println rejects struct operands at compile time, so each
	// value is converted to a string with String() before printing.

	// Time the table was created.
	createTime := table.CreatedTime()
	println("create time = ", createTime.String())

	// Time of the most recent DDL operation on the table.
	lastDDLTime := table.LastDDLTime()
	println("last ddl time = ", lastDDLTime.String())

	// Time the table was last modified.
	lastModifiedTime := table.LastModifiedTime()
	println("last modified time = ", lastModifiedTime.String())
}

擷取表的所有者

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads table all_types_demo and prints its owner.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	credentials := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(credentials, conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	demo := tableSet.Get("all_types_demo")

	// Load the table before reading its owner.
	if err = demo.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	println("owner is ", demo.Owner())
}

擷取表類型

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads table all_types_demo and prints its table type.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	credentials := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(credentials, conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	demo := tableSet.Get("all_types_demo")

	// Load the table before reading its type.
	if err = demo.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	println("type is ", demo.Type())
}

擷取表結構

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
	"strings"
)

// main loads table test_cluster_table and prints its schema: name, lifecycle
// (when set), columns, partition columns and cluster information.
func main() {
	// Path of the ini file holding credentials, endpoint and project name.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("test_cluster_table")

	// Load the table before reading its schema.
	err = table.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// All output goes through fmt (stdout). Mixing in the builtin println,
	// which writes to stderr, would split the report across two streams.
	schema := table.Schema()

	fmt.Printf("table name = %s\n", schema.TableName)
	// Only print the lifecycle when one is set (positive).
	if lifecycle := table.LifeCycle(); lifecycle > 0 {
		fmt.Printf("table lifecycle = %v\n", lifecycle)
	}

	// Regular columns.
	for _, c := range schema.Columns {
		fmt.Printf("column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
	}

	// Partition columns.
	for _, c := range schema.PartitionColumns {
		fmt.Printf("partition column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
	}

	// Cluster information, present only for clustered tables.
	if schema.ClusterInfo.ClusterType != "" {
		ci := schema.ClusterInfo
		fmt.Printf("cluster type = %s\n", ci.ClusterType)
		fmt.Printf("cluster columns = %s\n", strings.Join(ci.ClusterCols, ", "))
		fmt.Printf("cluster bucket num = %v\n", ci.BucketNum)
	}
}

刪除表

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main drops table test_cluster_table from the configured project.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(
		account.NewAliyunAccount(conf.AccessId, conf.AccessKey),
		conf.Endpoint,
	)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	target := tableSet.Get("test_cluster_table")

	if err = target.Delete(); err != nil {
		log.Fatalf("%+v", err)
	}
}

分區管理

擷取分區列表

通過MaxCompute SDK既可以擷取一個表的所有“分區值”列表,也可以擷取一個表的所有“分區對象”列表。“分區對象”中包含分區的一些基本資訊,如size、lastModifiedTime等。

擷取分區列表值

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main prints every partition value of table all_types_demo, one per line.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(
		account.NewAliyunAccount(conf.AccessId, conf.AccessKey),
		conf.Endpoint,
	)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	demo := tableSet.Get("all_types_demo")

	// Only the partition values are fetched here, not full partition objects.
	values, err := demo.GetPartitionValues()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for i := range values {
		println(values[i])
	}
}

擷取分區對象列表

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main lists the partition objects of table all_types_demo and prints the
// basic information of each one.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(
		account.NewAliyunAccount(conf.AccessId, conf.AccessKey),
		conf.Endpoint,
	)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	demo := tableSet.Get("all_types_demo")

	partitions, err := demo.GetPartitions()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("get %d partitions\n", len(partitions))

	// One line per partition.
	const lineFormat = "value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n"
	for _, part := range partitions {
		fmt.Printf(lineFormat, part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size())
	}
}

擷取單個分區資訊

擷取分區的基本資訊

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main prints the basic information of partition p1=20/p2=hangzhou of table
// all_types_demo.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	credentials := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(credentials, conf.Endpoint)
	// Make the configured project the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	demo := tableSet.Get("all_types_demo")

	part, err := demo.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	const lineFormat = "value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n"
	fmt.Printf(lineFormat, part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size())
}

擷取分區的擴充資訊

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main prints the basic and then the extended information of partition
// p1=20/p2=hangzhou of table all_types_demo.
func main() {
	// Path of the ini file holding credentials, endpoint and project name.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	p, err := table.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Basic partition information.
	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
	)

	// The extended information is read only after an explicit LoadExtended call.
	err = p.LoadExtended()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Terminate the line like the basic-info output above, so the last line of
	// output is not left without a newline.
	fmt.Printf(
		"isArchived=%t, lifeCycle=%d, physicalSize=%d\n",
		p.IsArchivedEx(), p.LifeCycleEx(), p.PhysicalSizeEx(),
	)
}

添加分區

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main adds partitions p1=23/p2=beijing and p1=24/p2=shanghai to table
// all_types_demo.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	credentials := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(credentials, conf.Endpoint)
	// Make the configured project the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	demo := tableSet.Get("all_types_demo")

	newPartitions := []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}
	// NOTE(review): the leading `true` presumably tolerates partitions that
	// already exist — confirm against the SDK documentation.
	if err = demo.AddPartitions(true, newPartitions); err != nil {
		log.Fatalf("%+v", err)
	}
}

刪除分區

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main drops partitions p1=23/p2=beijing and p1=24/p2=shanghai from table
// all_types_demo.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	credentials := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(credentials, conf.Endpoint)
	// Make the configured project the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tableSet := proj.Tables()
	demo := tableSet.Get("all_types_demo")

	doomed := []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}
	// NOTE(review): the leading `true` presumably tolerates partitions that do
	// not exist — confirm against the SDK documentation.
	if err = demo.DeletePartitions(true, doomed); err != nil {
		log.Fatalf("%+v", err)
	}
}

Instance管理

MaxCompute執行SQL後返回Instance對象,Instance表示MaxCompute SQL作業,用於追蹤SQL執行狀態、結果。

擷取Instance列表

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
	"time"
)

// main lists the terminated instances whose time falls within a fixed window
// and prints id, owner, start/end time and status for each one.
func main() {
	// Path of the ini file holding credentials, endpoint and project name.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Bounds of the listing window. time.Parse without zone information
	// interprets these timestamps as UTC.
	const timeFormat = "2006-01-02 15:04:05"
	startTime, err := time.Parse(timeFormat, "2024-10-11 02:15:30")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	endTime, err := time.Parse(timeFormat, "2024-10-13 06:22:02")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Called once per listed instance. The callback receives no error, so
	// listing failures are checked on List's result below.
	printInstance := func(i *odps.Instance) {
		fmt.Printf(
			"%s, %s, %s, %s, %s\n",
			i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
		)
	}

	instances := odpsIns.Instances()
	err = instances.List(
		printInstance,
		// Only instances within the time window...
		odps.InstanceFilter.TimeRange(startTime, endTime),
		// ...that have already terminated.
		odps.InstanceFilter.Status(odps.InstanceTerminated),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

擷取Instance資訊

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

// main loads one instance by id and prints its owner, status, start/end time
// and task results.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	credentials := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	client := odps.NewOdps(credentials, conf.Endpoint)
	// Make the configured project the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	// Replace the placeholder with the id of an existing instance.
	instance := client.Instances().Get("<yourInstanceId>")
	if err = instance.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("owner=%s\n", instance.Owner())
	fmt.Printf("status=%s\n", instance.Status())
	fmt.Printf("startTime=%s\n", instance.StartTime())
	fmt.Printf("endTime=%s\n", instance.EndTime())
	fmt.Printf("result=%+v\n", instance.TaskResults())
}

許可權管理

MaxCompute可以通過操作許可權的相關命令進行許可權管理,關於授權方案詳情,請參見授權方案概述。下述範例通過DESC ROLE命令查看角色的相關資訊。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
	"log"
)

// main runs the security command "desc role role_project_admin;" against the
// configured project and prints its result.
func main() {
	// Path of the ini file holding credentials, endpoint and project name.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	var restClient = odpsIns.RestClient()

	// Security commands are executed through a SecurityManager bound to the
	// project. See the SDK documentation for the meaning of RunQuery's
	// remaining arguments.
	sm := security.NewSecurityManager(restClient, conf.ProjectName)
	result, err := sm.RunQuery("desc role role_project_admin;", true, "")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Format directly with fmt instead of wrapping Sprintf in the builtin
	// println, which writes to stderr.
	fmt.Printf("ok: %s\n", result)
}

Logview

您可以通過Logview查看已提交的MaxCompute作業,並進行調試,詳情請參見使用Logview查看作業運行資訊。

package main

import (
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

// main submits a SELECT query and prints the Logview URL of the resulting
// instance.
func main() {
	// Load credentials, endpoint and project name from the local ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(
		account.NewAliyunAccount(conf.AccessId, conf.AccessKey),
		conf.Endpoint,
	)
	// Make the configured project the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	// SQL engine hints (for example odps.sql.skewjoin); none are set here.
	var hints map[string]string

	query := "select * from all_types_demo where p1>0 or p2 > '';"
	task := odps.NewSqlTask("select", query, hints)

	// Run the SQL in the default project, i.e. on the quota associated with it.
	ins, err := task.Run(client, client.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// NOTE(review): the second argument is presumably the link's validity in
	// hours — confirm against the SDK documentation.
	link, err := client.LogView().GenerateLogView(ins, 1)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(link)
}