すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Go SDKの使用

最終更新日:Jan 17, 2025

ODPSオブジェクトを正常に作成した後、プロジェクトスペース内でテーブルとインスタンスを管理できます。 これは、SQL操作の実行、データのアップロードとダウンロード、テーブルとパーティションの管理、インスタンス管理の処理などの方法で実行できます。

説明

Go SDKのアクセス資格情報の設定」で説明されている方法のいずれかを使用して、ODPSオブジェクトを作成できます。 このトピックでは、すべてのコード例で、config.iniファイルからAccessKeyを読み込むメソッドを使用します。

SQLの実行

SQLTaskオブジェクトまたはMaxCompute SQL Driverのrunメソッドを使用して、さまざまなMaxCompute SQLコマンドを実行します。

SDKの使用

インスタンスオブジェクトを返すSQLTaskオブジェクトのrunメソッドを使用して、さまざまな種類のMaxCompute SQLステートメントを実行できます。 SELECTステートメントを実行するときに、クエリ結果に10,000行を超えるデータが含まれている場合は、Tunnelを使用してクエリ結果全体をダウンロードする必要があります。 クエリ結果に含まれるデータが10,000行未満の場合、インスタンスオブジェクトから直接クエリ結果を取得できます。 次のコードは、例としてSELECTステートメントを使用したSQL実行メソッドを示しています。

例1: SELECTの実行とクエリ結果の取得

10,000行未満のデータを含むクエリ結果の場合、CSVリーダーを使用してデータを直接読み取ることができます。

package main

import (
	"fmt"
	"io"
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	sql := "select * from all_types_demo where p1>0 or p2 > '';"

	// SQL engine parameters, for example, odps.sql.skewjoin 
	var hints map[string]string = nil

	sqlTask := odps.NewSqlTask("select_demo", sql, hints)
	ins, err := sqlTask.Run(odpsIns, odpsIns.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	err = ins.WaitForSuccess()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	csvReader, err := sqlTask.GetSelectResultAsCsv(ins, true)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for {
		record, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("%+v", err)
		}
		fmt.Printf("%v\n", record)
	}
}

例2: SELECTを実行し、トンネルを使用してクエリ結果を取得

10,000行を超えるクエリ結果の場合、データを取得するにはトンネルが必要です。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

func main() {
	// Configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
	    log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	sql := "select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';"

	// SQL engine parameters, for example, odps.sql.skewjoin
	var hints map[string]string = nil

	sqlTask := odps.NewSqlTask("select_demo", sql, hints)

	// Run SQL by using the quota associated with the project
	projectName := odpsIns.DefaultProjectName()

	ins, err := sqlTask.Run(odpsIns, projectName)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	err = ins.WaitForSuccess()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Generate logView to get detailed job information
	lv := odpsIns.LogView()
	lvUrl, err := lv.GenerateLogView(ins, 10)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(lvUrl)

	project := odpsIns.DefaultProject()
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Create a Tunnel instance
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	start := 0
	step := 200000
	recordCount := session.RecordCount()
	schema := session.Schema()
	total := 0

	for start < recordCount {
		reader, err := session.OpenRecordReader(start, step, 0, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}

		count := 0
		err = reader.Iterator(func(record data.Record, _err error) {
			count += 1

			if _err != nil {
				return
			}
			
			for i, d := range record {
				if d == nil {
					fmt.Printf("%s=null", schema.Columns[i].Name)
				} else {
					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
				}
			
				if i < record.Len()-1 {
					fmt.Printf(", ")
				} else {
					fmt.Println()
				}
			}
		})

		if err != nil {
			log.Fatalf("%+v", err)
		}

		start += count
		total += count
		log.Println(count)
		if err = reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}
	}

	println("total count ", total)
}

MaxCompute SQLドライバーの使用

例1: CREATE TABLEステートメントの実行

package main

import (
	"database/sql"
	"log"
)

func main() {
	// Configure AccessKey in environment variables: ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECURITY
	dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	sqlStr := "create table table_with_date (date_col DATE);"
	_, err = db.Exec(sqlStr)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

例2: SELECTステートメントを実行して結果を取得

package main

import (
	"database/sql"
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
	"log"
	"reflect"
)

func main() {
	config, err := sqldriver.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	dsn := config.FormatDsn()
	// or dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"

	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"

	rows, err := db.Query(
		selectSql,
		sql.Named("bigint_type", 100000000000),
		sql.Named("p1", 20),
		sql.Named("p2", "hangzhou"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	record := make([]interface{}, len(columnTypes))

	for i, columnType := range columnTypes {
		record[i] = reflect.New(columnType.ScanType()).Interface()
		t := reflect.TypeOf(record[i])

		fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
	}

	columns, err := rows.Columns()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for rows.Next() {
		err = rows.Scan(record...)
		if err != nil {
			log.Fatalf("%+v", err)
		}

		for i, r := range record {
			rr := r.(sqldriver.NullAble)

			if rr.IsNull() {
				fmt.Printf("%s=NULL", columns[i])
			} else {
				switch r.(type) {
				case *sqldriver.NullInt8:
					fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt8).Int8)
				case *sqldriver.NullInt16:
					fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt16).Int16)
				case *sqldriver.NullInt32:
					fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt32).Int32)
				case *sqldriver.NullInt64:
					fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt64).Int64)
				case *sqldriver.Binary:
					fmt.Printf("%s=%s", columns[i], r)
				case *sqldriver.NullFloat32:
					fmt.Printf("%s=%f", columns[i], r.(*sqldriver.NullFloat32).Float32)
				case *sqldriver.NullFloat64:
					fmt.Printf("%s=%f", columns[i], r.(*sqldriver.NullFloat64).Float64)
				case *sqldriver.Decimal:
					fmt.Printf("%s=%s", columns[i], r)
				case *sqldriver.NullString:
					fmt.Printf("%s=%s", columns[i], r.(*sqldriver.NullString).String)
				case *sqldriver.NullDate:
					fmt.Printf("%s=%s", columns[i], r)
				case *sqldriver.NullDateTime:
					fmt.Printf("%s=%s", columns[i], r)
				case *sqldriver.NullTimeStamp:
					fmt.Printf("%s=%s", columns[i], r)
				case *sqldriver.NullBool:
					fmt.Printf("%s=%v", columns[i], r.(*sqldriver.NullBool).Bool)
				case *sqldriver.Map:
					fmt.Printf("%s=%s", columns[i], r)
				case *sqldriver.Array:
					fmt.Printf("%s=%s", columns[i], r)
				case *sqldriver.Struct:
					fmt.Printf("%s=%s", columns[i], r)
				case *sqldriver.Json:
					fmt.Printf("%s=%s", columns[i], r)
				}
			}

			if i < len(record)-1 {
				fmt.Printf(", ")
			} else {
				fmt.Print("\n\n")
			}
		}
	}
}

データのアップロードとダウンロード

Tunnelを使用して、テーブルまたはパーティションのデータのバッチアップロードおよびダウンロードを実行することも、MaxCompute Streaming Tunnelを介してテーブルまたはパーティションにデータを書き込むこともできます。 詳細については、「ストリーミングトンネルの概要」をご参照ください。

トンネルの初期化

Go SDKのアクセス資格情報の設定」で説明されている方法のいずれかを使用して、ODPSオブジェクトを作成できます。 次のコード例では、config.iniファイルからAccessKeyを読み込むメソッドを使用しています。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

func main() {
	// Get configuration information from the configuration file
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	// Get Tunnel Endpoint
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
	
	// Initialize Tunnel
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	
	println("%+v", tunnelIns)
}

バッチデータのアップロード

トンネルの初期化後にテーブルまたはパーティションにデータをアップロードするには、次の手順が必要です。

  1. UploadSessionを開始するには、データのアップロードに使用される圧縮アルゴリズムとともに、データをアップロードするテーブルまたはパーティションを指定する必要があります。

  2. UploadSessionを使用してWriterを作成し、データをアップロードします。 ライターは、データのアップロードを担当します。 単一のWriterによってアップロードされたデータはブロックと呼ばれ、ブロックIDとしてINT値によって識別されます。 アップロード速度を向上させるために、複数のライターを作成してデータを同時にアップロードできます。

  3. Writerがデータをアップロードした後、UploadSession.com mitを呼び出してアップロードプロセスを完了する必要があります。 コミットメソッドでは、ブロックIDリストを指定する必要があります。

    package main
    
    import (
    	"fmt"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
    	"log"
    )
    
    func main() {
    	// Get configuration information from the configuration file
    	conf, err := odps.NewConfigFromIni("./config.ini")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	// Initialize ODPS
    	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
    	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
    	odpsIns.SetDefaultProjectName(conf.ProjectName)
    	project := odpsIns.DefaultProject()
    
    	// Initialize Tunnel
    	tunnelEndpoint, err := project.GetTunnelEndpoint()
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
    	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
    
            // Create an UploadSession, specify the table or partition to write to
    	session, err := tunnelIns.CreateUploadSession(
    		project.Name(),
    		"all_types_demo",
    		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
    		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
    	)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	writerNum := 3
    	blockIds := make([]int, writerNum)
    	for i := 0; i < writerNum; i++ {
    		blockIds[i] = i
    	}
    
    	errChan := make(chan error, writerNum)
    
    	// Concurrently upload data through multiple writers, each writer has a blockId as its identity for the data it writes
    	for _, blockId := range blockIds {
    		blockId := blockId
    		go func() {
    			schema := session.Schema()
    			record, err := makeRecord(schema)
    			if err != nil {
    				errChan <- err
    				return
    			}
    
    			recordWriter, err := session.OpenRecordWriter(blockId)
    			if err != nil {
    				errChan <- err
    				return
    			}
    
    			for i := 0; i < 100; i++ {
    				err = recordWriter.Write(record)
    
    				if err != nil {
    					_ = recordWriter.Close()
    					errChan <- err
    					return
    				}
    			}
    			err = recordWriter.Close()
    			if err == nil {
    				fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount())
    			}
    
    			errChan <- err
    		}()
    	}
    
    	// Wait for all Writers to complete data upload
    	for i := 0; i < writerNum; i++ {
    		err := <-errChan
    
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    	}
    
    	// Commit all Blocks to complete the upload, and you can view the data in the table
    	err = session.Commit(blockIds)
    	log.Println("success to commit all blocks")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    }
    
    func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
    	varchar, _ := data.NewVarChar(500, "varchar")
    	char, _ := data.NewVarChar(254, "char")
    	s := data.String("hello world")
    	date, _ := data.NewDate("2022-10-19")
    	datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
    	timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")
    
    	mapType := schema.Columns[15].Type.(datatype.MapType)
    	mapData := data.NewMapWithType(mapType)
    	err := mapData.Set("hello", 1)
    	if err != nil {
    		return nil, err
    	}
    
    	err = mapData.Set("world", 2)
    	if err != nil {
    		return nil, err
    	}
    
    	arrayType := schema.Columns[16].Type.(datatype.ArrayType)
    	arrayData := data.NewArrayWithType(arrayType)
    	err = arrayData.Append("a")
    	if err != nil {
    		return nil, err
    	}
    
    	err = arrayData.Append("b")
    	if err != nil {
    		return nil, err
    	}
    
    	structType := schema.Columns[17].Type.(datatype.StructType)
    	structData := data.NewStructWithTyp(structType)
    
    	arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
    	err = arr.Append("x")
    	if err != nil {
    		return nil, err
    	}
    	err = arr.Append("y")
    	if err != nil {
    		return nil, err
    	}
    	err = structData.SetField("arr", arr)
    	if err != nil {
    		return nil, err
    	}
    	err = structData.SetField("name", "tom")
    	if err != nil {
    		return nil, err
    	}
    
    	record := []data.Data{
    		data.TinyInt(1),
    		data.SmallInt(32767),
    		data.Int(100),
    		data.BigInt(100000000000),
    		data.Binary("binary"),
    		data.Float(3.14),
    		data.Double(3.1415926),
    		data.NewDecimal(38, 18, "3.1415926"),
    		varchar,
    		char,
    		s,
    		date,
    		datetime,
    		timestamp,
    		data.Bool(true),
    		mapData,
    		arrayData,
    		structData,
    	}
    
    	return record, nil
    }
    

バッチデータのダウンロード

トンネルの初期化後にテーブルまたはパーティションからデータをダウンロードするには、次の手順が必要です。

  1. DownloadSessionを作成するには、データをダウンロードするテーブルまたはパーティションと、データ転送の圧縮アルゴリズムを指定する必要があります。

  2. DownloadSessionを使用してReaderを作成し、ページ付けされた形式でデータをローカルにバッチダウンロードします。

    package main
    
    import (
    	"fmt"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
    	"log"
    )
    
    func main() {
    	// Read configuration information from the configuration file
    	conf, err := odps.NewConfigFromIni("./config.ini")
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	// Initialize ODPS
    	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
    	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
    	odpsIns.SetDefaultProjectName(conf.ProjectName)
    	project := odpsIns.DefaultProject()
    
    	// Get Tunnel Endpoint
    	tunnelEndpoint, err := project.GetTunnelEndpoint()
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
            tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
    
    	// Create a DownloadSession, specify the table or partition to read
    	session, err := tunnelIns.CreateDownloadSession(
    		project.Name(),
    		"all_types_demo",
    		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
    	)
    	if err != nil {
    		log.Fatalf("%+v", err)
    	}
    
    	recordCount := session.RecordCount()
    	fmt.Printf("record count is %d\n", recordCount)
    
    	start := 0
    	step := 100001
    	total := 0
    	schema := session.Schema()
    
    	for start < recordCount {
    		reader, err := session.OpenRecordReader(start, step, nil)
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		count := 0
    		err = reader.Iterator(func(record data.Record, _err error) {
    			if _err != nil {
    				return
    			}
    
    			for i, d := range record {
    				if d == nil {
    					fmt.Printf("%s=null", schema.Columns[i].Name)
    				} else {
    					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
    				}
    
    				if i < record.Len()-1 {
    					fmt.Printf(", ")
    				} else {
    					fmt.Println()
    				}
    			}
    		})
    
    		if err != nil {
    			log.Fatalf("%+v", err)
    		}
    
    		start += count
    		total += count
    
    		log.Println(count)
    
    		if err = reader.Close(); err != nil {
    			log.Fatalf("%+v", err)
    		}
    	}
    
    	println("total count ", total)
    }

ストリーミングデータのアップロード

MaxComputeでは、ストリーミングトンネルの概要を使用してテーブルまたはパーティションにデータを書き込むことができます。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
	"log"
)

func main() {
	// Get configuration information from the configuration file
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()

	// Get Tunnel Endpoint
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)

	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)

	session, err := tunnelIns.CreateStreamUploadSession(
		project.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
		tunnel.SessionCfg.WithCreatePartition(), // Create a new partition if the specified partition does not exist
		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
	)

	if err != nil {
		log.Fatalf("%+v", err)
	}

	packWriter := session.OpenRecordPackWriter()

	for i := 0; i < 2; i++ {
		record, err := makeRecord(session.Schema())
		if err != nil {
			log.Fatalf("%+v", err)
		}

		// Add data to packWriter until the data size reaches the threshold
		for packWriter.DataSize() < 64 {
			err = packWriter.Append(record)
			if err != nil {
				log.Fatalf("%+v", err)
			}
		}

		// Refresh data
		traceId, recordCount, bytesSend, err := packWriter.Flush()
		if err != nil {
			log.Fatalf("%+v", err)
		}

		fmt.Printf(
			"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
			traceId, recordCount, bytesSend,
		)
	}
}

func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
	varchar, _ := data.NewVarChar(500, "varchar")
	char, _ := data.NewVarChar(254, "char")
	s := data.String("hello world")
	date, _ := data.NewDate("2022-10-19")
	datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
	timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")

	mapType := schema.Columns[15].Type.(datatype.MapType)
	mapData := data.NewMapWithType(mapType)
	err := mapData.Set("hello", 1)
	if err != nil {
		return nil, err
	}

	err = mapData.Set("world", 2)
	if err != nil {
		return nil, err
	}

	arrayType := schema.Columns[16].Type.(datatype.ArrayType)
	arrayData := data.NewArrayWithType(arrayType)
	err = arrayData.Append("a")
	if err != nil {
		return nil, err
	}

	err = arrayData.Append("b")
	if err != nil {
		return nil, err
	}

	structType := schema.Columns[17].Type.(datatype.StructType)
	structData := data.NewStructWithTyp(structType)

	arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
	err = arr.Append("x")
	if err != nil {
		return nil, err
	}
	err = arr.Append("y")
	if err != nil {
		return nil, err
	}
	err = structData.SetField("arr", arr)
	if err != nil {
		return nil, err
	}
	err = structData.SetField("name", "tom")
	if err != nil {
		return nil, err
	}

	record := []data.Data{
		data.TinyInt(1),
		data.SmallInt(32767),
		data.Int(100),
		data.BigInt(100000000000),
		data.Binary("binary"),
		data.Float(3.14),
		data.Double(3.1415926),
		data.NewDecimal(38, 18, "3.1415926"),
		varchar,
		char,
		s,
		date,
		datetime,
		timestamp,
		data.Bool(true),
		mapData,
		arrayData,
		structData,
	}

	return record, nil
}

スキーマ管理

package schema

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// schemas means all Schema in default project
	schemas := odpsIns.Schemas()
	// Get all Schemas in the default project
	schemas.List(func(schema *odps.Schema, err error) {
		print(schema.Name() + "\n")
	})

	// Specify the current Schema
	odpsIns.SetCurrentSchemaName("default_schema")
	// To directly operate on a table, if no Schema is specified, the operation queries the tables under the "default" Schema.
	table := odpsIns.Table("table") // actually, the table name is "project.default_schema.table"
	print(table.SchemaName())

	// Get all tables of a specified Schema. For example: schema_A
	tablesInSchemaA := odps.NewTables(odpsIns, conf.ProjectName, "schema_A")
	tablesInSchemaA.List(func(table *odps.Table, err error) {
		print(table.Name() + "\n")
	})

	// Create a Schema
	schemas.Create("new_schema", false, "comment")

	// Delete the Schema
	schemas.Delete("to_delete_schema")

	// Get Schema metadata
	schema := schemas.Get("new_schema")
	schema.Load()

	schema.Name()
	schema.ProjectName()
	schema.Type()
	schema.Owner()
	schema.Comment()
	schema.CreateTime()
	schema.ModifiedTime()
}

テーブル管理

テーブルの作成

レギュラーテーブルの作成

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

func main() {
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	c1 := tableschema.Column{
		Name: "tiny_int_type",
		Type: datatype.TinyIntType,
	}

	c2 := tableschema.Column{
		Name: "small_int_type",
		Type: datatype.SmallIntType,
	}

	c3 := tableschema.Column{
		Name: "int_type",
		Type: datatype.IntType,
	}

	c4 := tableschema.Column{
		Name: "bigint_type",
		Type: datatype.BigIntType,
	}

	c5 := tableschema.Column{
		Name: "binary_type",
		Type: datatype.BinaryType,
	}

	c6 := tableschema.Column{
		Name: "float_type",
		Type: datatype.FloatType,
	}

	c7 := tableschema.Column{
		Name: "double_type",
		Type: datatype.DoubleType,
	}

	c8 := tableschema.Column{
		Name: "decimal_type",
		Type: datatype.NewDecimalType(10, 8),
	}

	c9 := tableschema.Column{
		Name: "varchar_type",
		Type: datatype.NewVarcharType(500),
	}

	c10 := tableschema.Column{
		Name: "char_type",
		Type: datatype.NewCharType(254),
	}

	c11 := tableschema.Column{
		Name: "string_type",
		Type: datatype.StringType,
	}

	c12 := tableschema.Column{
		Name: "date_type",
		Type: datatype.DateType,
	}

	c13 := tableschema.Column{
		Name: "datetime_type",
		Type: datatype.DateTimeType,
	}

	c14 := tableschema.Column{
		Name: "timestamp_type",
		Type: datatype.TimestampType,
	}

	c15 := tableschema.Column{
		Name: "timestamp_ntz_type",
		Type: datatype.TimestampNtzType,
	}

	c16 := tableschema.Column{
		Name: "boolean_type",
		Type: datatype.BooleanType,
	}

	mapType := datatype.NewMapType(datatype.StringType, datatype.BigIntType)
	arrayType := datatype.NewArrayType(datatype.StringType)
	structType := datatype.NewStructType(
		datatype.NewStructFieldType("arr", arrayType),
		datatype.NewStructFieldType("name", datatype.StringType),
	)
	jsonType := datatype.NewJsonType()

	c17 := tableschema.Column{
		Name: "map_type",
		Type: mapType,
	}

	c18 := tableschema.Column{
		Name: "array_type",
		Type: arrayType,
	}

	c19 := tableschema.Column{
		Name: "struct_type",
		Type: structType,
	}

	c20 := tableschema.Column{
		Name: "json_type",
		Type: jsonType,
	}

	p1 := tableschema.Column{
		Name: "p1",
		Type: datatype.BigIntType,
	}

	p2 := tableschema.Column{
		Name: "p2",
		Type: datatype.StringType,
	}

	schemaBuilder := tableschema.NewSchemaBuilder()
	schemaBuilder.Name("all_types_demo").
		Columns(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, c20).
		PartitionColumns(p1, p2).
		Lifecycle(2) // Unit: day

	schema := schemaBuilder.Build()
	tablesIns := odpsIns.Tables()
	// If the data type version of the project is 1.0, you need to use the version 2.0 of data tyep by using the hints.
	hints := make(map[string]string)
	hints["odps.sql.type.system.odps2"] = "true"
	hints["odps.sql.decimal.odps2"] = "true"

	err = tablesIns.Create(schema, true, hints, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

クラスター化テーブル作成

ハッシュクラスタ化テーブルの作成

ハッシュクラスタ化テーブルの詳細については、「ハッシュクラスタリング」をご参照ください。

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)

	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Create a table which the DDL statement is "
	// CREATE TABLE test_hash_clustering (a string, b string, c bigint)
	// PARTITIONED BY (dt string)
	// CLUSTERED BY (c)
	// SORTED by (c) INTO 1024 BUCKETS;"

	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}

	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}

	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}

	// partition
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}

	sb := tableschema.NewSchemaBuilder()

	sb.Name("test_hash_clustering"). // the table name
						Columns(c1, c2, c3).                        // columns
						PartitionColumns(pc).                       // partition columns
						ClusterType(tableschema.CLUSTER_TYPE.Hash). // ClusterType is the hash clustering
						ClusterColumns([]string{c3.Name}).          // Specify Cluster Key
		// Sort key (optional), in most sconario, In most cases, we recommended that keep it consistent with the Cluster Key for the best optimization results.
		ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
		ClusterBucketNum(1024) // Bucket number (optional)

	tablesIns := odpsIns.Tables()

	schema := sb.Build()

	println(schema.ToSQLString("test_cluster", "", true))

	err = tablesIns.Create(schema, true, nil, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}

}
rangeクラスタ化テーブルの作成

range-clusteredテーブルの詳細については、「Range clustering」をご参照ください。

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)

	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Create a table which the DDL statement is "
	// CREATE TABLE test_range_clustering (a string, b string, c int)
	// PARTITIONED BY (dt int)
	// RANGE CLUSTERED BY (c)
	// SORTED by (c)
	// INTO 1024 BUCKETS;"

	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}

	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}

	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}

	// partition
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}

	sb := tableschema.NewSchemaBuilder()

	sb.Name("test_range_clustering"). // the table name
						Columns(c1, c2, c3).                         // columns
						PartitionColumns(pc).                        // partition columns
						ClusterType(tableschema.CLUSTER_TYPE.Range). // ClusterType is the Range Clustering
						ClusterColumns([]string{c3.Name}).           // Specify the Range Cluster Key
		// Sort key (optional), in most sconario, In most cases, we recommended that keep it consistent with the Cluster Key for the best optimization results.
		ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
		ClusterBucketNum(1024) // Bucket number (optional)

	tablesIns := odpsIns.Tables()

	schema := sb.Build()

	println(schema.ToSQLString("test_cluster", "", true))

	err = tablesIns.Create(schema, true, nil, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}

}

OSS外部テーブルの作成

OSS外部テーブルの詳細については、「OSS外部テーブルの作成」をご参照ください。

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
        if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// create external table if not exists go_sdk_regression_testing.`testCreateExternalTableWithUserDefinedStorageHandler` (
	//    `a` STRING ,
	//    `b` STRING ,
	//    `c` BIGINT
	//)
	//	comment 'External table using user defined TextStorageHandler'
	//	partitioned by (`dt` STRING)
	//	stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
	//	with serdeproperties('odps.text.option.delimiter'='|', 'my.own.option'='value')
	//	location 'MOCKoss://full/uri/path/to/oss/directory/'
	//	lifecycle 10;

	tableName := "testCreateExternalTableWithUserDefinedStorageHandler"

	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}

	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}

	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}

	// partition column
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}

	sb := tableschema.NewSchemaBuilder()

	sb.Name(tableName). // the table name
		Columns(c1, c2, c3). // columns
		PartitionColumns(pc). // partition columns
		Location("oss://full/uri/path/to/oss/directory/").
		StorageHandler("com.aliyun.odps.udf.example.text.TextStorageHandler").
		Comment("External table using user defined TextStorageHandler").
		Lifecycle(10)

	tablesIns := odpsIns.Tables()

	schema := sb.Build()

	// define the properties mapping
	serDeProperties := map[string]string{
		"odps.text.option.delimiter": "|",
		"my.own.option":              "value",
	}

	// define the hints mapping
	hints := map[string]string{
		"odps.sql.preparse.odps2":       "lot",
		"odps.sql.planner.mode":         "lot",
		"odps.sql.planner.parser.odps2": "true",
		"odps.sql.ddl.odps2":            "true",
		"odps.compiler.output.format":   "lot,pot",
	}

	sql, err := schema.ToExternalSQLString(odpsIns.DefaultProjectName(), "", true, serDeProperties, nil)
	print(sql)

	err = tablesIns.CreateExternal(schema, true, serDeProperties, nil, hints, nil)

	if err != nil {
		log.Fatalf("%+v", err)
	}
}

テーブルリストの取得

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)

	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	ts := project.Tables()

	ts.List(
		func(t *odps.Table, err error) {
			if err != nil {
				log.Fatalf("%+v", err)
			}

			println(t.Name())
		},
		// Filter by table name prefix
		odps.TableFilter.NamePrefix("all_type"),
		// Filter by table type. Other table types include: VirtualView, ExternalTable
		odps.TableFilter.Type(odps.ManagedTable),
	)
}

シングルテーブル情報の取得

テーブルが存在するかどうかを確認する

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	ok, err := table.Exists()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(ok)
}

テーブルサイズと行数を取得する

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")
	
	err = table.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	
	// Get table size (in bytes)
	size := table.Size()
	println("size = ", size)
	
	// Get table row count
	rowCount := table.RecordNum()
	println("rowCount = ", rowCount)
}

テーブルCreatedTime、LastDDLTime、ModifiedTimeを取得する

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	err = table.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Get table creation time
	createTime := table.CreatedTime()
	println("create time = ", createTime)
	
	// Get the time of the last DDL operation
	lastDDLTime := table.LastDDLTime()
	println("last ddl time = ", lastDDLTime)
	
	// Get the last modification time of the table
	lastModifiedTime := table.LastModifiedTime()
	println("last modified time = ", lastModifiedTime)
}

テーブル所有者を取得

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	err = table.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Get table Owner
	owner := table.Owner()
	println("owner is ", owner)
}

テーブルタイプを取得する

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	err = table.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Get table type
	t := table.Type()
	println("type is ", t)
}

テーブル構造を取得するGet table structure

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
	"strings"
)

func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("test_cluster_table")

	err = table.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Get the table Schema
	schema := table.Schema()

	println("table name = ", schema.TableName)
	if table.LifeCycle() > 0 {
		println("table lifecycle = ", table.LifeCycle())		
	}

	// Get columns
	for _, c := range schema.Columns {
		fmt.Printf("column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
	}

	// Get partition columns
	for _, c := range schema.PartitionColumns {
		fmt.Printf("partition column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
	}

	// Get the cluster information
	if schema.ClusterInfo.ClusterType != "" {
		ci := schema.ClusterInfo
		println("cluster type = ", ci.ClusterType)
		println("cluster columns = ", strings.Join(ci.ClusterCols, ", "))
		println("cluster bucket num = ", ci.BucketNum)
	}
}

テーブルの削除

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("test_cluster_table")

	err = table.Delete()
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

パーティション管理

パーティション一覧の取得

MaxCompute SDKは、テーブルのすべてのパーティション値とパーティションオブジェクトのリストの取得をサポートしています。 パーティションオブジェクトには、サイズやlastModifiedTimeなどのパーティション情報が含まれます。

パーティションリストの値を取得するGet partition list values

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Get the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	partitionValues, err := table.GetPartitionValues()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for _, pv := range partitionValues {
		println(pv)
	}

}

パーティションオブジェクト一覧の取得Get partition object list

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	partitions, err := table.GetPartitions()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	
	fmt.Printf("get %d partitions\n", len(partitions))
	
	for _, p := range partitions {
        fmt.Printf(	
    		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
    		p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
    	)
	}
}

単一パーティション情報の取得

基本パーティション情報の取得

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	p, err := table.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
	)
}

拡張パーティション情報の取得

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	p, err := table.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Get basic partition information
	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
	)

	// Get extended partition information
	err = p.LoadExtended()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf(
		"isArchived=%t, lifeCycle=%d, physicalSize=%d",
		p.IsArchivedEx(), p.LifeCycleEx(), p.PhysicalSizeEx(),
	)
}

パーティションの追加

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	err = table.AddPartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"})
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

パーティションの削除

package main

import (
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	err = table.DeletePartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"})
	if err != nil {
		log.Fatalf("%+v", err)
	}
}

インスタンス管理

MaxComputeは、SQL実行後にインスタンスオブジェクトを返します。 このインスタンスオブジェクトはMaxComputeジョブを表し、実行ステータスと結果を追跡できます。

インスタンスリストの取得

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
	"time"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	timeFormat := "2006-01-02 15:04:05"
	startTime, _ := time.Parse(timeFormat, "2024-10-11 02:15:30")
	endTime, _ := time.Parse(timeFormat, "2024-10-13 06:22:02")

	var f = func(i *odps.Instance) {
		if err != nil {
			log.Fatalf("%+v", err)
		}

		println(
			fmt.Sprintf(
				"%s, %s, %s, %s, %s",
				i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
			))
	}

	instances := odpsIns.Instances()
	instances.List(
		f,
		odps.InstanceFilter.TimeRange(startTime, endTime),
		odps.InstanceFilter.Status(odps.InstanceTerminated),
	)
}

インスタンス情報の取得

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	ins := odpsIns.Instances().Get("<yourInstanceId>")
	err = ins.Load()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("owner=%s\n", ins.Owner())
	fmt.Printf("status=%s\n", ins.Status())
	fmt.Printf("startTime=%s\n", ins.StartTime())
	fmt.Printf("endTime=%s\n", ins.EndTime())
	fmt.Printf("result=%+v\n", ins.TaskResults())
}

権限管理

MaxComputeは、特定のコマンドによる権限管理を容易にします。 承認スキームの概要については、「概要」をご参照ください。 次の例は、DESC ROLEコマンドを使用してロール関連情報を表示する方法を示しています。

package main

import (
	"fmt"
	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
	"log"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	var restClient = odpsIns.RestClient()

	sm := security.NewSecurityManager(restClient, conf.ProjectName)
	result, err := sm.RunQuery("desc role role_project_admin;", true, "")

	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(fmt.Sprintf("ok: %s", result))
}

ログビュー

Logviewでは、送信されたMaxComputeジョブを表示し、デバッグテストを実行できます。 詳細については、「LogViewを使用したジョブ情報の表示」をご参照ください。

package main

import (
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

func main() {
	// Specify the configuration file path
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	sql := "select * from all_types_demo where p1>0 or p2 > '';"

	// SQL engine parameters, for example, odps.sql.skewjoin
	var hints map[string]string = nil

	// Create a SqlTask
	sqlTask := odps.NewSqlTask("select", sql, hints)

	// Run SQL by using the quota associated with the project
	project := odpsIns.DefaultProjectName()
	ins, err := sqlTask.Run(odpsIns, project)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	logView, err := odpsIns.LogView().GenerateLogView(ins, 1)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(logView)
}