ODPSオブジェクトを正常に作成した後、プロジェクトスペース内でテーブルとインスタンスを管理できます。 これは、SQL操作の実行、データのアップロードとダウンロード、テーブルとパーティションの管理、インスタンス管理の処理などの方法で実行できます。
「Go SDKのアクセス資格情報の設定」で説明されている方法のいずれかを使用して、ODPSオブジェクトを作成できます。 このトピックでは、すべてのコード例で、config.ini
ファイルからAccessKeyを読み込むメソッドを使用します。
SQLの実行
SQLTask
オブジェクトまたはMaxCompute SQL Driverのrun
メソッドを使用して、さまざまなMaxCompute SQLコマンドを実行します。
SDKの使用
インスタンスオブジェクトを返すSQLTask
オブジェクトのrun
メソッドを使用して、さまざまな種類のMaxCompute SQLステートメントを実行できます。 SELECT
ステートメントを実行するときに、クエリ結果に10,000行を超えるデータが含まれている場合は、Tunnelを使用してクエリ結果全体をダウンロードする必要があります。 クエリ結果に含まれるデータが10,000行未満の場合、インスタンスオブジェクトから直接クエリ結果を取得できます。 次のコードは、例としてSELECT
ステートメントを使用したSQL実行メソッドを示しています。
例1: SELECTの実行とクエリ結果の取得
10,000行未満のデータを含むクエリ結果の場合、CSVリーダーを使用してデータを直接読み取ることができます。
package main
import (
"fmt"
"io"
"log"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
sql := "select * from all_types_demo where p1>0 or p2 > '';"
// SQL engine parameters, for example, odps.sql.skewjoin
var hints map[string]string = nil
sqlTask := odps.NewSqlTask("select_demo", sql, hints)
ins, err := sqlTask.Run(odpsIns, odpsIns.DefaultProjectName())
if err != nil {
log.Fatalf("%+v", err)
}
err = ins.WaitForSuccess()
if err != nil {
log.Fatalf("%+v", err)
}
csvReader, err := sqlTask.GetSelectResultAsCsv(ins, true)
if err != nil {
log.Fatalf("%+v", err)
}
for {
record, err := csvReader.Read()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf("%v\n", record)
}
}
例2: SELECTを実行し、トンネルを使用してクエリ結果を取得
10,000行を超えるクエリ結果の場合、データを取得するにはトンネルが必要です。
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
func main() {
// Configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
sql := "select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';"
// SQL engine parameters, for example, odps.sql.skewjoin
var hints map[string]string = nil
sqlTask := odps.NewSqlTask("select_demo", sql, hints)
// Run SQL by using the quota associated with the project
projectName := odpsIns.DefaultProjectName()
ins, err := sqlTask.Run(odpsIns, projectName)
if err != nil {
log.Fatalf("%+v", err)
}
err = ins.WaitForSuccess()
if err != nil {
log.Fatalf("%+v", err)
}
// Generate logView to get detailed job information
lv := odpsIns.LogView()
lvUrl, err := lv.GenerateLogView(ins, 10)
if err != nil {
log.Fatalf("%+v", err)
}
println(lvUrl)
project := odpsIns.DefaultProject()
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
log.Fatalf("%+v", err)
}
// Create a Tunnel instance
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
if err != nil {
log.Fatalf("%+v", err)
}
start := 0
step := 200000
recordCount := session.RecordCount()
schema := session.Schema()
total := 0
for start < recordCount {
reader, err := session.OpenRecordReader(start, step, 0, nil)
if err != nil {
log.Fatalf("%+v", err)
}
count := 0
err = reader.Iterator(func(record data.Record, _err error) {
count += 1
if _err != nil {
return
}
for i, d := range record {
if d == nil {
fmt.Printf("%s=null", schema.Columns[i].Name)
} else {
fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
}
if i < record.Len()-1 {
fmt.Printf(", ")
} else {
fmt.Println()
}
}
})
if err != nil {
log.Fatalf("%+v", err)
}
start += count
total += count
log.Println(count)
if err = reader.Close(); err != nil {
log.Fatalf("%+v", err)
}
}
println("total count ", total)
}
MaxCompute SQLドライバーの使用
例1: CREATE TABLEステートメントの実行
package main
import (
"database/sql"
"log"
)
func main() {
// Configure AccessKey in environment variables: ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECURITY
dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"
db, err := sql.Open("odps", dsn)
if err != nil {
log.Fatalf("%+v", err)
}
sqlStr := "create table table_with_date (date_col DATE);"
_, err = db.Exec(sqlStr)
if err != nil {
log.Fatalf("%+v", err)
}
}
例2: SELECTステートメントを実行して結果を取得
package main
import (
"database/sql"
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
"log"
"reflect"
)
func main() {
config, err := sqldriver.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
dsn := config.FormatDsn()
// or dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"
db, err := sql.Open("odps", dsn)
if err != nil {
log.Fatalf("%+v", err)
}
selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"
rows, err := db.Query(
selectSql,
sql.Named("bigint_type", 100000000000),
sql.Named("p1", 20),
sql.Named("p2", "hangzhou"),
)
if err != nil {
log.Fatalf("%+v", err)
}
columnTypes, err := rows.ColumnTypes()
if err != nil {
log.Fatalf("%+v", err)
}
record := make([]interface{}, len(columnTypes))
for i, columnType := range columnTypes {
record[i] = reflect.New(columnType.ScanType()).Interface()
t := reflect.TypeOf(record[i])
fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
}
columns, err := rows.Columns()
if err != nil {
log.Fatalf("%+v", err)
}
for rows.Next() {
err = rows.Scan(record...)
if err != nil {
log.Fatalf("%+v", err)
}
for i, r := range record {
rr := r.(sqldriver.NullAble)
if rr.IsNull() {
fmt.Printf("%s=NULL", columns[i])
} else {
switch r.(type) {
case *sqldriver.NullInt8:
fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt8).Int8)
case *sqldriver.NullInt16:
fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt16).Int16)
case *sqldriver.NullInt32:
fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt32).Int32)
case *sqldriver.NullInt64:
fmt.Printf("%s=%d", columns[i], r.(*sqldriver.NullInt64).Int64)
case *sqldriver.Binary:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullFloat32:
fmt.Printf("%s=%f", columns[i], r.(*sqldriver.NullFloat32).Float32)
case *sqldriver.NullFloat64:
fmt.Printf("%s=%f", columns[i], r.(*sqldriver.NullFloat64).Float64)
case *sqldriver.Decimal:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullString:
fmt.Printf("%s=%s", columns[i], r.(*sqldriver.NullString).String)
case *sqldriver.NullDate:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullDateTime:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullTimeStamp:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullBool:
fmt.Printf("%s=%v", columns[i], r.(*sqldriver.NullBool).Bool)
case *sqldriver.Map:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.Array:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.Struct:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.Json:
fmt.Printf("%s=%s", columns[i], r)
}
}
if i < len(record)-1 {
fmt.Printf(", ")
} else {
fmt.Print("\n\n")
}
}
}
}
データのアップロードとダウンロード
Tunnelを使用して、テーブルまたはパーティションのデータのバッチアップロードおよびダウンロードを実行することも、MaxCompute Streaming Tunnelを介してテーブルまたはパーティションにデータを書き込むこともできます。 詳細については、「ストリーミングトンネルの概要」をご参照ください。
トンネルの初期化
「Go SDKのアクセス資格情報の設定」で説明されている方法のいずれかを使用して、ODPSオブジェクトを作成できます。 次のコード例では、config.ini
ファイルからAccessKeyを読み込むメソッドを使用しています。
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
func main() {
// Get configuration information from the configuration file
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
// Initialize ODPS
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.DefaultProject()
// Get Tunnel Endpoint
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Println("tunnel endpoint: " + tunnelEndpoint)
// Initialize Tunnel
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
println("%+v", tunnelIns)
}
バッチデータのアップロード
トンネルの初期化後にテーブルまたはパーティションにデータをアップロードするには、次の手順が必要です。
UploadSession
を開始するには、データのアップロードに使用される圧縮アルゴリズムとともに、データをアップロードするテーブルまたはパーティションを指定する必要があります。UploadSession
を使用してWriter
を作成し、データをアップロードします。ライター
は、データのアップロードを担当します。 単一のWriter
によってアップロードされたデータはブロックと呼ばれ、ブロックIDとしてINT
値によって識別されます。 アップロード速度を向上させるために、複数のライター
を作成してデータを同時にアップロードできます。Writerがデータをアップロードした後、
UploadSession.com mit
を呼び出してアップロードプロセスを完了する必要があります。 コミットメソッドでは、ブロックIDリストを指定する必要があります。package main import ( "fmt" "github.com/aliyun/aliyun-odps-go-sdk/odps" "github.com/aliyun/aliyun-odps-go-sdk/odps/account" "github.com/aliyun/aliyun-odps-go-sdk/odps/data" "github.com/aliyun/aliyun-odps-go-sdk/odps/datatype" "github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema" "github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel" "log" ) func main() { // Get configuration information from the configuration file conf, err := odps.NewConfigFromIni("./config.ini") if err != nil { log.Fatalf("%+v", err) } // Initialize ODPS aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey) odpsIns := odps.NewOdps(aliAccount, conf.Endpoint) odpsIns.SetDefaultProjectName(conf.ProjectName) project := odpsIns.DefaultProject() // Initialize Tunnel tunnelEndpoint, err := project.GetTunnelEndpoint() if err != nil { log.Fatalf("%+v", err) } fmt.Println("tunnel endpoint: " + tunnelEndpoint) tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint) // Create an UploadSession, specify the table or partition to write to session, err := tunnelIns.CreateUploadSession( project.Name(), "all_types_demo", tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"), tunnel.SessionCfg.WithDefaultDeflateCompressor(), ) if err != nil { log.Fatalf("%+v", err) } writerNum := 3 blockIds := make([]int, writerNum) for i := 0; i < writerNum; i++ { blockIds[i] = i } errChan := make(chan error, writerNum) // Concurrently upload data through multiple writers, each writer has a blockId as its identity for the data it writes for _, blockId := range blockIds { blockId := blockId go func() { schema := session.Schema() record, err := makeRecord(schema) if err != nil { errChan <- err return } recordWriter, err := session.OpenRecordWriter(blockId) if err != nil { errChan <- err return } for i := 0; i < 100; i++ { err = recordWriter.Write(record) if err != nil { _ = recordWriter.Close() errChan <- err return } } err = recordWriter.Close() if err == nil { fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount()) } errChan <- err }() } // Wait for all Writers to complete data upload for i := 0; i < writerNum; i++ { err := <-errChan if err != nil { log.Fatalf("%+v", err) } } // Commit all Blocks to complete the upload, and you can view the data in the table err = session.Commit(blockIds) log.Println("success to commit all blocks") if err != nil { log.Fatalf("%+v", err) } } func makeRecord(schema tableschema.TableSchema) (data.Record, error) { varchar, _ := data.NewVarChar(500, "varchar") char, _ := data.NewVarChar(254, "char") s := data.String("hello world") date, _ := data.NewDate("2022-10-19") datetime, _ := data.NewDateTime("2022-10-19 17:00:00") timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000") mapType := schema.Columns[15].Type.(datatype.MapType) mapData := data.NewMapWithType(mapType) err := mapData.Set("hello", 1) if err != nil { return nil, err } err = mapData.Set("world", 2) if err != nil { return nil, err } arrayType := schema.Columns[16].Type.(datatype.ArrayType) arrayData := data.NewArrayWithType(arrayType) err = arrayData.Append("a") if err != nil { return nil, err } err = arrayData.Append("b") if err != nil { return nil, err } structType := schema.Columns[17].Type.(datatype.StructType) structData := data.NewStructWithTyp(structType) arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType)) err = arr.Append("x") if err != nil { return nil, err } err = arr.Append("y") if err != nil { return nil, err } err = structData.SetField("arr", arr) if err != nil { return nil, err } err = structData.SetField("name", "tom") if err != nil { return nil, err } record := []data.Data{ data.TinyInt(1), data.SmallInt(32767), data.Int(100), data.BigInt(100000000000), data.Binary("binary"), data.Float(3.14), data.Double(3.1415926), data.NewDecimal(38, 18, "3.1415926"), varchar, char, s, date, datetime, timestamp, data.Bool(true), mapData, arrayData, structData, } return record, nil }
バッチデータのダウンロード
トンネルの初期化後にテーブルまたはパーティションからデータをダウンロードするには、次の手順が必要です。
DownloadSession
を作成するには、データをダウンロードするテーブルまたはパーティションと、データ転送の圧縮アルゴリズムを指定する必要があります。DownloadSession
を使用してReader
を作成し、ページ付けされた形式でデータをローカルにバッチダウンロードします。package main import ( "fmt" "github.com/aliyun/aliyun-odps-go-sdk/odps" "github.com/aliyun/aliyun-odps-go-sdk/odps/account" "github.com/aliyun/aliyun-odps-go-sdk/odps/data" "github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel" "log" ) func main() { // Read configuration information from the configuration file conf, err := odps.NewConfigFromIni("./config.ini") if err != nil { log.Fatalf("%+v", err) } // Initialize ODPS aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey) odpsIns := odps.NewOdps(aliAccount, conf.Endpoint) odpsIns.SetDefaultProjectName(conf.ProjectName) project := odpsIns.DefaultProject() // Get Tunnel Endpoint tunnelEndpoint, err := project.GetTunnelEndpoint() if err != nil { log.Fatalf("%+v", err) } fmt.Println("tunnel endpoint: " + tunnelEndpoint) tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint) // Create a DownloadSession, specify the table or partition to read session, err := tunnelIns.CreateDownloadSession( project.Name(), "all_types_demo", tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"), ) if err != nil { log.Fatalf("%+v", err) } recordCount := session.RecordCount() fmt.Printf("record count is %d\n", recordCount) start := 0 step := 100001 total := 0 schema := session.Schema() for start < recordCount { reader, err := session.OpenRecordReader(start, step, nil) if err != nil { log.Fatalf("%+v", err) } count := 0 err = reader.Iterator(func(record data.Record, _err error) { if _err != nil { return } for i, d := range record { if d == nil { fmt.Printf("%s=null", schema.Columns[i].Name) } else { fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql()) } if i < record.Len()-1 { fmt.Printf(", ") } else { fmt.Println() } } }) if err != nil { log.Fatalf("%+v", err) } start += count total += count log.Println(count) if err = reader.Close(); err != nil { log.Fatalf("%+v", err) } } println("total count ", total) }
ストリーミングデータのアップロード
MaxComputeでは、ストリーミングトンネルの概要を使用してテーブルまたはパーティションにデータを書き込むことができます。
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
func main() {
// Get configuration information from the configuration file
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
// Initialize ODPS
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.DefaultProject()
// Get Tunnel Endpoint
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Println("tunnel endpoint: " + tunnelEndpoint)
tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
session, err := tunnelIns.CreateStreamUploadSession(
project.Name(),
"all_types_demo",
tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
tunnel.SessionCfg.WithCreatePartition(), // Create a new partition if the specified partition does not exist
tunnel.SessionCfg.WithDefaultDeflateCompressor(),
)
if err != nil {
log.Fatalf("%+v", err)
}
packWriter := session.OpenRecordPackWriter()
for i := 0; i < 2; i++ {
record, err := makeRecord(session.Schema())
if err != nil {
log.Fatalf("%+v", err)
}
// Add data to packWriter until the data size reaches the threshold
for packWriter.DataSize() < 64 {
err = packWriter.Append(record)
if err != nil {
log.Fatalf("%+v", err)
}
}
// Refresh data
traceId, recordCount, bytesSend, err := packWriter.Flush()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf(
"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
traceId, recordCount, bytesSend,
)
}
}
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
varchar, _ := data.NewVarChar(500, "varchar")
char, _ := data.NewVarChar(254, "char")
s := data.String("hello world")
date, _ := data.NewDate("2022-10-19")
datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")
mapType := schema.Columns[15].Type.(datatype.MapType)
mapData := data.NewMapWithType(mapType)
err := mapData.Set("hello", 1)
if err != nil {
return nil, err
}
err = mapData.Set("world", 2)
if err != nil {
return nil, err
}
arrayType := schema.Columns[16].Type.(datatype.ArrayType)
arrayData := data.NewArrayWithType(arrayType)
err = arrayData.Append("a")
if err != nil {
return nil, err
}
err = arrayData.Append("b")
if err != nil {
return nil, err
}
structType := schema.Columns[17].Type.(datatype.StructType)
structData := data.NewStructWithTyp(structType)
arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
err = arr.Append("x")
if err != nil {
return nil, err
}
err = arr.Append("y")
if err != nil {
return nil, err
}
err = structData.SetField("arr", arr)
if err != nil {
return nil, err
}
err = structData.SetField("name", "tom")
if err != nil {
return nil, err
}
record := []data.Data{
data.TinyInt(1),
data.SmallInt(32767),
data.Int(100),
data.BigInt(100000000000),
data.Binary("binary"),
data.Float(3.14),
data.Double(3.1415926),
data.NewDecimal(38, 18, "3.1415926"),
varchar,
char,
s,
date,
datetime,
timestamp,
data.Bool(true),
mapData,
arrayData,
structData,
}
return record, nil
}
スキーマ管理
package schema
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
// schemas means all Schema in default project
schemas := odpsIns.Schemas()
// Get all Schemas in the default project
schemas.List(func(schema *odps.Schema, err error) {
print(schema.Name() + "\n")
})
// Specify the current Schema
odpsIns.SetCurrentSchemaName("default_schema")
// To directly operate on a table, if no Schema is specified, the operation queries the tables under the "default" Schema.
table := odpsIns.Table("table") // actually, the table name is "project.default_schema.table"
print(table.SchemaName())
// Get all tables of a specified Schema. For example: schema_A
tablesInSchemaA := odps.NewTables(odpsIns, conf.ProjectName, "schema_A")
tablesInSchemaA.List(func(table *odps.Table, err error) {
print(table.Name() + "\n")
})
// Create a Schema
schemas.Create("new_schema", false, "comment")
// Delete the Schema
schemas.Delete("to_delete_schema")
// Get Schema metadata
schema := schemas.Get("new_schema")
schema.Load()
schema.Name()
schema.ProjectName()
schema.Type()
schema.Owner()
schema.Comment()
schema.CreateTime()
schema.ModifiedTime()
}
テーブル管理
テーブルの作成
レギュラーテーブルの作成
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
func main() {
conf, err := odps.NewConfigFromIni("./config.ini")
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
c1 := tableschema.Column{
Name: "tiny_int_type",
Type: datatype.TinyIntType,
}
c2 := tableschema.Column{
Name: "small_int_type",
Type: datatype.SmallIntType,
}
c3 := tableschema.Column{
Name: "int_type",
Type: datatype.IntType,
}
c4 := tableschema.Column{
Name: "bigint_type",
Type: datatype.BigIntType,
}
c5 := tableschema.Column{
Name: "binary_type",
Type: datatype.BinaryType,
}
c6 := tableschema.Column{
Name: "float_type",
Type: datatype.FloatType,
}
c7 := tableschema.Column{
Name: "double_type",
Type: datatype.DoubleType,
}
c8 := tableschema.Column{
Name: "decimal_type",
Type: datatype.NewDecimalType(10, 8),
}
c9 := tableschema.Column{
Name: "varchar_type",
Type: datatype.NewVarcharType(500),
}
c10 := tableschema.Column{
Name: "char_type",
Type: datatype.NewCharType(254),
}
c11 := tableschema.Column{
Name: "string_type",
Type: datatype.StringType,
}
c12 := tableschema.Column{
Name: "date_type",
Type: datatype.DateType,
}
c13 := tableschema.Column{
Name: "datetime_type",
Type: datatype.DateTimeType,
}
c14 := tableschema.Column{
Name: "timestamp_type",
Type: datatype.TimestampType,
}
c15 := tableschema.Column{
Name: "timestamp_ntz_type",
Type: datatype.TimestampNtzType,
}
c16 := tableschema.Column{
Name: "boolean_type",
Type: datatype.BooleanType,
}
mapType := datatype.NewMapType(datatype.StringType, datatype.BigIntType)
arrayType := datatype.NewArrayType(datatype.StringType)
structType := datatype.NewStructType(
datatype.NewStructFieldType("arr", arrayType),
datatype.NewStructFieldType("name", datatype.StringType),
)
jsonType := datatype.NewJsonType()
c17 := tableschema.Column{
Name: "map_type",
Type: mapType,
}
c18 := tableschema.Column{
Name: "array_type",
Type: arrayType,
}
c19 := tableschema.Column{
Name: "struct_type",
Type: structType,
}
c20 := tableschema.Column{
Name: "json_type",
Type: jsonType,
}
p1 := tableschema.Column{
Name: "p1",
Type: datatype.BigIntType,
}
p2 := tableschema.Column{
Name: "p2",
Type: datatype.StringType,
}
schemaBuilder := tableschema.NewSchemaBuilder()
schemaBuilder.Name("all_types_demo").
Columns(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, c20).
PartitionColumns(p1, p2).
Lifecycle(2) // Unit: day
schema := schemaBuilder.Build()
tablesIns := odpsIns.Tables()
// If the data type version of the project is 1.0, you need to use the version 2.0 of data tyep by using the hints.
hints := make(map[string]string)
hints["odps.sql.type.system.odps2"] = "true"
hints["odps.sql.decimal.odps2"] = "true"
err = tablesIns.Create(schema, true, hints, nil)
if err != nil {
log.Fatalf("%+v", err)
}
}
クラスター化テーブルの作成
ハッシュクラスタ化テーブルの作成
ハッシュクラスタ化テーブルの詳細については、「ハッシュクラスタリング」をご参照ください。
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
// Create a table which the DDL statement is "
// CREATE TABLE test_hash_clustering (a string, b string, c bigint)
// PARTITIONED BY (dt string)
// CLUSTERED BY (c)
// SORTED by (c) INTO 1024 BUCKETS;"
c1 := tableschema.Column{
Name: "a",
Type: datatype.StringType,
}
c2 := tableschema.Column{
Name: "b",
Type: datatype.StringType,
}
c3 := tableschema.Column{
Name: "c",
Type: datatype.BigIntType,
}
// partition
pc := tableschema.Column{
Name: "dt",
Type: datatype.StringType,
}
sb := tableschema.NewSchemaBuilder()
sb.Name("test_hash_clustering"). // the table name
Columns(c1, c2, c3). // columns
PartitionColumns(pc). // partition columns
ClusterType(tableschema.CLUSTER_TYPE.Hash). // ClusterType is the hash clustering
ClusterColumns([]string{c3.Name}). // Specify Cluster Key
// Sort key (optional), in most sconario, In most cases, we recommended that keep it consistent with the Cluster Key for the best optimization results.
ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
ClusterBucketNum(1024) // Bucket number (optional)
tablesIns := odpsIns.Tables()
schema := sb.Build()
println(schema.ToSQLString("test_cluster", "", true))
err = tablesIns.Create(schema, true, nil, nil)
if err != nil {
log.Fatalf("%+v", err)
}
}
rangeクラスタ化テーブルの作成
range-clusteredテーブルの詳細については、「Range clustering」をご参照ください。
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
// Create a table which the DDL statement is "
// CREATE TABLE test_range_clustering (a string, b string, c int)
// PARTITIONED BY (dt int)
// RANGE CLUSTERED BY (c)
// SORTED by (c)
// INTO 1024 BUCKETS;"
c1 := tableschema.Column{
Name: "a",
Type: datatype.StringType,
}
c2 := tableschema.Column{
Name: "b",
Type: datatype.StringType,
}
c3 := tableschema.Column{
Name: "c",
Type: datatype.BigIntType,
}
// partition
pc := tableschema.Column{
Name: "dt",
Type: datatype.StringType,
}
sb := tableschema.NewSchemaBuilder()
sb.Name("test_range_clustering"). // the table name
Columns(c1, c2, c3). // columns
PartitionColumns(pc). // partition columns
ClusterType(tableschema.CLUSTER_TYPE.Range). // ClusterType is the Range Clustering
ClusterColumns([]string{c3.Name}). // Specify the Range Cluster Key
// Sort key (optional), in most sconario, In most cases, we recommended that keep it consistent with the Cluster Key for the best optimization results.
ClusterSortColumns([]tableschema.SortColumn{{Name: c3.Name, Order: tableschema.SORT_ORDER.ASC}}).
ClusterBucketNum(1024) // Bucket number (optional)
tablesIns := odpsIns.Tables()
schema := sb.Build()
println(schema.ToSQLString("test_cluster", "", true))
err = tablesIns.Create(schema, true, nil, nil)
if err != nil {
log.Fatalf("%+v", err)
}
}
OSS外部テーブルの作成
OSS外部テーブルの詳細については、「OSS外部テーブルの作成」をご参照ください。
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
// create external table if not exists go_sdk_regression_testing.`testCreateExternalTableWithUserDefinedStorageHandler` (
// `a` STRING ,
// `b` STRING ,
// `c` BIGINT
//)
// comment 'External table using user defined TextStorageHandler'
// partitioned by (`dt` STRING)
// stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
// with serdeproperties('odps.text.option.delimiter'='|', 'my.own.option'='value')
// location 'MOCKoss://full/uri/path/to/oss/directory/'
// lifecycle 10;
tableName := "testCreateExternalTableWithUserDefinedStorageHandler"
c1 := tableschema.Column{
Name: "a",
Type: datatype.StringType,
}
c2 := tableschema.Column{
Name: "b",
Type: datatype.StringType,
}
c3 := tableschema.Column{
Name: "c",
Type: datatype.BigIntType,
}
// partition column
pc := tableschema.Column{
Name: "dt",
Type: datatype.StringType,
}
sb := tableschema.NewSchemaBuilder()
sb.Name(tableName). // the table name
Columns(c1, c2, c3). // columns
PartitionColumns(pc). // partition columns
Location("oss://full/uri/path/to/oss/directory/").
StorageHandler("com.aliyun.odps.udf.example.text.TextStorageHandler").
Comment("External table using user defined TextStorageHandler").
Lifecycle(10)
tablesIns := odpsIns.Tables()
schema := sb.Build()
// define the properties mapping
serDeProperties := map[string]string{
"odps.text.option.delimiter": "|",
"my.own.option": "value",
}
// define the hints mapping
hints := map[string]string{
"odps.sql.preparse.odps2": "lot",
"odps.sql.planner.mode": "lot",
"odps.sql.planner.parser.odps2": "true",
"odps.sql.ddl.odps2": "true",
"odps.compiler.output.format": "lot,pot",
}
sql, err := schema.ToExternalSQLString(odpsIns.DefaultProjectName(), "", true, serDeProperties, nil)
print(sql)
err = tablesIns.CreateExternal(schema, true, serDeProperties, nil, hints, nil)
if err != nil {
log.Fatalf("%+v", err)
}
}
テーブルリストの取得
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
ts := project.Tables()
ts.List(
func(t *odps.Table, err error) {
if err != nil {
log.Fatalf("%+v", err)
}
println(t.Name())
},
// Filter by table name prefix
odps.TableFilter.NamePrefix("all_type"),
// Filter by table type. Other table types include: VirtualView, ExternalTable
odps.TableFilter.Type(odps.ManagedTable),
)
}
シングルテーブル情報の取得
テーブルが存在するかどうかを確認する
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
ok, err := table.Exists()
if err != nil {
log.Fatalf("%+v", err)
}
println(ok)
}
テーブルサイズと行数を取得する
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table size (in bytes)
size := table.Size()
println("size = ", size)
// Get table row count
rowCount := table.RecordNum()
println("rowCount = ", rowCount)
}
テーブルCreatedTime、LastDDLTime、ModifiedTimeを取得する
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table creation time
createTime := table.CreatedTime()
println("create time = ", createTime)
// Get the time of the last DDL operation
lastDDLTime := table.LastDDLTime()
println("last ddl time = ", lastDDLTime)
// Get the last modification time of the table
lastModifiedTime := table.LastModifiedTime()
println("last modified time = ", lastModifiedTime)
}
テーブル所有者を取得
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table Owner
owner := table.Owner()
println("owner is ", owner)
}
テーブルタイプを取得する
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get table type
t := table.Type()
println("type is ", t)
}
テーブル構造を取得するGet table structure
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"strings"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("test_cluster_table")
err = table.Load()
if err != nil {
log.Fatalf("%+v", err)
}
// Get the table Schema
schema := table.Schema()
println("table name = ", schema.TableName)
if table.LifeCycle() > 0 {
println("table lifecycle = ", table.LifeCycle())
}
// Get columns
for _, c := range schema.Columns {
fmt.Printf("column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
}
// Get partition columns
for _, c := range schema.PartitionColumns {
fmt.Printf("partition column %s %s comment '%s'\n", c.Name, c.Type, c.Comment)
}
// Get the cluster information
if schema.ClusterInfo.ClusterType != "" {
ci := schema.ClusterInfo
println("cluster type = ", ci.ClusterType)
println("cluster columns = ", strings.Join(ci.ClusterCols, ", "))
println("cluster bucket num = ", ci.BucketNum)
}
}
テーブルの削除
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("test_cluster_table")
err = table.Delete()
if err != nil {
log.Fatalf("%+v", err)
}
}
パーティション管理
パーティション一覧の取得
MaxCompute SDKは、テーブルのすべてのパーティション値とパーティションオブジェクトのリストの取得をサポートしています。 パーティションオブジェクトには、サイズやlastModifiedTimeなどのパーティション情報が含まれます。
パーティションリストの値を取得するGet partition list values
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Get the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
partitionValues, err := table.GetPartitionValues()
if err != nil {
log.Fatalf("%+v", err)
}
for _, pv := range partitionValues {
println(pv)
}
}
パーティションオブジェクト一覧の取得Get partition object list
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
partitions, err := table.GetPartitions()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf("get %d partitions\n", len(partitions))
for _, p := range partitions {
fmt.Printf(
"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
)
}
}
単一パーティション情報の取得
基本パーティション情報の取得
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
p, err := table.GetPartition("p1=20/p2=hangzhou")
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf(
"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
)
}
拡張パーティション情報の取得
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
p, err := table.GetPartition("p1=20/p2=hangzhou")
if err != nil {
log.Fatalf("%+v", err)
}
// Get basic partition information
fmt.Printf(
"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
)
// Get extended partition information
err = p.LoadExtended()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf(
"isArchived=%t, lifeCycle=%d, physicalSize=%d",
p.IsArchivedEx(), p.LifeCycleEx(), p.PhysicalSizeEx(),
)
}
パーティションの追加
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.AddPartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"})
if err != nil {
log.Fatalf("%+v", err)
}
}
パーティションの削除
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
project := odpsIns.Project(conf.ProjectName)
tables := project.Tables()
table := tables.Get("all_types_demo")
err = table.DeletePartitions(true, []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"})
if err != nil {
log.Fatalf("%+v", err)
}
}
インスタンス管理
MaxComputeは、SQL実行後にインスタンスオブジェクトを返します。 このインスタンスオブジェクトはMaxComputeジョブを表し、実行ステータスと結果を追跡できます。
インスタンスリストの取得
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"time"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
timeFormat := "2006-01-02 15:04:05"
startTime, _ := time.Parse(timeFormat, "2024-10-11 02:15:30")
endTime, _ := time.Parse(timeFormat, "2024-10-13 06:22:02")
var f = func(i *odps.Instance) {
if err != nil {
log.Fatalf("%+v", err)
}
println(
fmt.Sprintf(
"%s, %s, %s, %s, %s",
i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
))
}
instances := odpsIns.Instances()
instances.List(
f,
odps.InstanceFilter.TimeRange(startTime, endTime),
odps.InstanceFilter.Status(odps.InstanceTerminated),
)
}
インスタンス情報の取得
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
ins := odpsIns.Instances().Get("<yourInstanceId>")
err = ins.Load()
if err != nil {
log.Fatalf("%+v", err)
}
fmt.Printf("owner=%s\n", ins.Owner())
fmt.Printf("status=%s\n", ins.Status())
fmt.Printf("startTime=%s\n", ins.StartTime())
fmt.Printf("endTime=%s\n", ins.EndTime())
fmt.Printf("result=%+v\n", ins.TaskResults())
}
権限管理
MaxComputeは、特定のコマンドによる権限管理を容易にします。 承認スキームの概要については、「概要」をご参照ください。 次の例は、DESC ROLE
コマンドを使用してロール関連情報を表示する方法を示しています。
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
"log"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
var restClient = odpsIns.RestClient()
sm := security.NewSecurityManager(restClient, conf.ProjectName)
result, err := sm.RunQuery("desc role role_project_admin;", true, "")
if err != nil {
log.Fatalf("%+v", err)
}
println(fmt.Sprintf("ok: %s", result))
}
ログビュー
Logviewでは、送信されたMaxComputeジョブを表示し、デバッグテストを実行できます。 詳細については、「LogViewを使用したジョブ情報の表示」をご参照ください。
package main
import (
"log"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
func main() {
// Specify the configuration file path
configPath := "./config.ini"
conf, err := odps.NewConfigFromIni(configPath)
if err != nil {
log.Fatalf("%+v", err)
}
aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
// Set the default MaxCompute project
odpsIns.SetDefaultProjectName(conf.ProjectName)
sql := "select * from all_types_demo where p1>0 or p2 > '';"
// SQL engine parameters, for example, odps.sql.skewjoin
var hints map[string]string = nil
// Create a SqlTask
sqlTask := odps.NewSqlTask("select", sql, hints)
// Run SQL by using the quota associated with the project
project := odpsIns.DefaultProjectName()
ins, err := sqlTask.Run(odpsIns, project)
if err != nil {
log.Fatalf("%+v", err)
}
logView, err := odpsIns.LogView().GenerateLogView(ins, 1)
if err != nil {
log.Fatalf("%+v", err)
}
println(logView)
}