全部產品
Search
文件中心

Object Storage Service:Go分區上傳

更新時間:Mar 13, 2025

OSS提供的分區上傳(Multipart Upload)功能,將要上傳的較大檔案(Object)分成多個分區(Part)來分別上傳,上傳完成後再調用CompleteMultipartUpload介面將這些Part組合成一個Object。

注意事項

  • 本文範例程式碼以華東1(杭州)的地區IDcn-hangzhou為例,預設使用外網Endpoint,如果您希望通過與OSS同地區的其他阿里雲產品訪問OSS,請使用內網Endpoint。關於OSS支援的Region與Endpoint的對應關係,請參見OSS地區和訪問網域名稱

  • 本文以從環境變數讀取存取憑證為例。如何配置訪問憑證,請參見配置訪問憑證

  • 要分區上傳,您必須有oss:PutObject許可權。具體操作,請參見為RAM使用者授權自訂的權限原則

分區上傳流程

分區上傳(Multipart Upload)分為以下三個步驟:

  1. 初始化一個分區上傳事件。

    調用Client.InitiateMultipartUpload方法返回OSS建立的全域唯一的uploadID。

  2. 上傳分區。

    調用Client.UploadPart方法上傳分區資料。

    說明
    • 對於同一個uploadID,分區號(partNumber)標識了該分區在整個檔案內的相對位置。如果使用同一個分區號上傳了新的資料,那麼OSS上該分區已有的資料將會被覆蓋。

    • OSS將收到的分區資料的MD5值放在ETag頭內返回給使用者。

    • OSS計算上傳資料的MD5值,並與SDK計算的MD5值比較,如果不一致則返回InvalidDigest錯誤碼。

  3. 完成分區上傳。

    所有分區上傳完成後,調用Client.CompleteMultipartUpload方法將所有分區合并成完整的檔案。

範例程式碼

以下代碼展示如何將本地的大檔案分割成多個分區檔案並發上傳到儲存空間,然後合并成完整的檔案對象。

package main

import (
	"bufio"
	"bytes"
	"context"
	"flag"
	"io"
	"log"
	"os"
	"sync"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// 定義全域變數
var (
	region     string // 儲存地區
	bucketName string // 源儲存空間名稱
	objectName string // 來源物件名稱

)

// init函數用於初始化命令列參數
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the source object.")
}

func main() {
	// 解析命令列參數
	flag.Parse()

	// 定義上傳ID
	var uploadId string

	// 檢查源儲存空間名稱是否為空白
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source bucket name required")
	}

	// 檢查儲存地區是否為空白
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// 檢查來源物件名稱是否為空白
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source object name required")
	}

	// 載入預設配置並設定憑證提供者和地區
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// 建立OSS用戶端
	client := oss.NewClient(cfg)

	// 初始化分區上傳請求
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	}
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// 列印初始化分區上傳的結果
	log.Printf("initiate multipart upload result:%#v\n", *initResult.UploadId)
	uploadId = *initResult.UploadId

	// 初始化等待組和互斥鎖
	var wg sync.WaitGroup
	var parts []oss.UploadPart
	count := 3
	var mu sync.Mutex

	// 讀取本地檔案內容到記憶體,將yourLocalFile替換為實際的本地檔案名稱和路徑
	file, err := os.Open("yourLocalFile")
	if err != nil {
		log.Fatalf("failed to open local file %v", err)
	}
	defer file.Close()

	bufReader := bufio.NewReader(file)
	content, err := io.ReadAll(bufReader)
	if err != nil {
		log.Fatalf("failed to read local file %v", err)
	}
	log.Printf("file size: %d\n", len(content))

	// 計算每個分區的大小
	chunkSize := len(content) / count
	if chunkSize == 0 {
		chunkSize = 1
	}

	// 啟動多個goroutine進行分區上傳
	for i := 0; i < count; i++ {
		start := i * chunkSize
		end := start + chunkSize
		if i == count-1 {
			end = len(content)
		}

		wg.Add(1)
		go func(partNumber int, start, end int) {
			defer wg.Done()

			// 建立分區上傳請求
			partRequest := &oss.UploadPartRequest{
				Bucket:     oss.Ptr(bucketName),                 // 目標儲存空間名稱
				Key:        oss.Ptr(objectName),                 // 目標對象名稱
				PartNumber: int32(partNumber),                   // 分區編號
				UploadId:   oss.Ptr(uploadId),                   // 上傳ID
				Body:       bytes.NewReader(content[start:end]), // 分區內容
			}

			// 發送分區上傳請求
			partResult, err := client.UploadPart(context.TODO(), partRequest)
			if err != nil {
				log.Fatalf("failed to upload part %d: %v", partNumber, err)
			}

			// 記錄分區上傳結果
			part := oss.UploadPart{
				PartNumber: partRequest.PartNumber,
				ETag:       partResult.ETag,
			}

			// 使用互斥鎖保護共用資料
			mu.Lock()
			parts = append(parts, part)
			mu.Unlock()
		}(i+1, start, end)
	}

	// 等待所有goroutine完成
	wg.Wait()

	// 完成分區上傳請求
	request := &oss.CompleteMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName),
		Key:      oss.Ptr(objectName),
		UploadId: oss.Ptr(uploadId),
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: parts,
		},
	}
	result, err := client.CompleteMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to complete multipart upload %v", err)
	}

	// 列印完成分區上傳的結果
	log.Printf("complete multipart upload result:%#v\n", result)
}

常見使用情境

將指定長度的隨機字串進行分區上傳

以下代碼展示了將400kb的隨機字串分割成3個分區檔案並發上傳到儲存空間,然後合并成完整的檔案對象。

package main

import (
	"bufio"
	"context"
	"flag"
	"io"
	"log"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// 定義全域變數
var (
	region     string                                                                     // 儲存地區
	bucketName string                                                                     // 儲存空間名稱
	objectName string                                                                     // 對象名稱
	letters    = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") // 用於產生隨機字串的字元集
)

// init函數用於初始化命令列參數
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the object.")
}

func main() {
	// 解析命令列參數
	flag.Parse()

	// 定義上傳ID
	var uploadId string

	// 檢查bucket名稱是否為空白
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, bucket name required")
	}

	// 檢查region是否為空白
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// 檢查object名稱是否為空白
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, object name required")
	}

	// 載入預設配置並設定憑證提供者和地區
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// 建立OSS用戶端
	client := oss.NewClient(cfg)

	// 建立初始化分區上傳的請求
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName), // 儲存空間名稱
		Key:    oss.Ptr(objectName), // 對象名稱
	}

	// 執行初始化分區上傳的操作並處理結果
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// 列印初始化分區上傳的結果
	log.Printf("initiate multipart upload result:%#v\n", initResult)
	uploadId = *initResult.UploadId

	// 初始化等待組和互斥鎖
	var wg sync.WaitGroup
	var parts []oss.UploadPart
	count := 3
	body := randBody(400000) // 產生400KB的隨機字串
	reader := strings.NewReader(body)
	bufReader := bufio.NewReader(reader)
	content, _ := io.ReadAll(bufReader)
	partSize := len(body) / count
	var mu sync.Mutex

	// 啟動多個goroutine進行分區上傳
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func(partNumber int, partSize int, i int) {
			defer wg.Done()

			// 建立分區上傳請求
			partRequest := &oss.UploadPartRequest{
				Bucket:     oss.Ptr(bucketName),                                             // 儲存空間名稱
				Key:        oss.Ptr(objectName),                                             // 對象名稱
				PartNumber: int32(partNumber),                                               // 分區編號
				UploadId:   oss.Ptr(uploadId),                                               // 上傳ID
				Body:       strings.NewReader(string(content[i*partSize : (i+1)*partSize])), // 分區內容
			}

			// 發送分區上傳請求
			partResult, err := client.UploadPart(context.TODO(), partRequest)
			if err != nil {
				log.Fatalf("failed to upload part %d: %v", partNumber, err)
			}

			// 記錄分區上傳結果
			part := oss.UploadPart{
				PartNumber: partRequest.PartNumber,
				ETag:       partResult.ETag,
			}

			// 使用互斥鎖保護共用資料
			mu.Lock()
			parts = append(parts, part)
			mu.Unlock()
		}(i+1, partSize, i)
	}

	// 等待所有goroutine完成
	wg.Wait()

	// 列印分區上傳成功的訊息
	log.Println("upload part success!")

	// 建立完成分區上傳請求
	request := &oss.CompleteMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName),
		Key:      oss.Ptr(objectName),
		UploadId: oss.Ptr(uploadId),
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: parts,
		},
	}

	// 完成分區上傳並處理結果
	result, err := client.CompleteMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to complete multipart upload %v", err)
	}
	log.Printf("complete multipart upload result:%#v\n", result)
}

// randBody產生指定長度的隨機字串
func randBody(n int) string {
	b := make([]rune, n)
	randMarker := rand.New(rand.NewSource(time.Now().UnixNano()))
	for i := range b {
		b[i] = letters[randMarker.Intn(len(letters))]
	}
	return string(b)
}

取消指定的分區上傳事件

當您遇到以下情境時,可以使用Client.AbortMultipartUpload方法取消分區上傳事件。

  1. 檔案出錯

    • 如果在上傳過程中發現檔案有錯誤,如檔案損壞或包含惡意代碼,您可以選擇取消上傳以避免潛在的風險。

  2. 網路不穩定

    • 當網路連接不穩定或中斷時,可能會導致上傳過程中的分區丟失或損壞,您可以選擇取消上傳並重新開始,以確保資料的完整性和一致性。

  3. 資源限制

    • 當您的儲存空間有限,而上傳的檔案過大,您可以取消上傳以釋放儲存資源,將資源分派給其他更重要的任務。

  4. 誤操作:

    • 當不小心啟動了一個不必要的上傳任務,或者上傳了一個錯誤的檔案版本,您可以取消此次上傳事件

package main

import (
	"context"
	"flag"
	"log"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// 定義全域變數
var (
	region     string // 儲存地區
	bucketName string // 源儲存空間名稱
	objectName string // 來源物件名稱

)

// init函數用於初始化命令列參數
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the source object.")
}

func main() {
	// 解析命令列參數
	flag.Parse()

	// 定義上傳ID
	var uploadId string

	// 檢查源儲存空間名稱是否為空白
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source bucket name required")
	}

	// 檢查儲存地區是否為空白
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// 檢查來源物件名稱是否為空白
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source object name required")
	}

	// 載入預設配置並設定憑證提供者和地區
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// 建立OSS用戶端
	client := oss.NewClient(cfg)

	// 初始化分區上傳請求
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	}

	// 執行初始化分區上傳請求
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// 列印初始化分區上傳的結果
	log.Printf("initiate multipart upload result:%#v\n", *initResult.UploadId)
	uploadId = *initResult.UploadId

	// 建立一個AbortMultipartUploadRequest請求
	request := &oss.AbortMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName), // 儲存空間名稱
		Key:      oss.Ptr(objectName), // 對象名稱
		UploadId: oss.Ptr(uploadId),   // 上傳uploadId
	}
	// 執行請求並處理結果
	result, err := client.AbortMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to abort multipart upload %v", err)
	}
	log.Printf("abort multipart upload result:%#v\n", result)

}

列舉指定的分區上傳事件中已經成功上傳的分區

當您遇到以下情境時,可以使用Client.NewListPartsPaginator分頁器列舉某個分區上傳事件中已經成功上傳的分區。

監控上傳進度:

  1. 大型檔案上傳

    • 當上傳非常大的檔案時,您通過列舉已上傳的分區,確保上傳過程按照預期進行,並及時發現問題。

  2. 斷點續傳

    • 在網路不穩定或上傳過程中發生中斷時,您可以通過查看已上傳的分區來決定是否需要重試上傳未完成的部分,從而實現斷點續傳。

  3. 故障排除

    • 如果上傳過程中出現錯誤,通過檢查已上傳的分區,您可以快速定位問題所在,比如某個特定分區上傳失敗,然後針對性地解決問題。

  4. 資源管理

    • 對於需要嚴格控制資源使用方式的情境,通過監控上傳進度,可以更好地管理儲存空間和頻寬資源,確保資源的有效利用。

package main

import (
	"context"
	"flag"
	"log"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// 定義全域變數
var (
	region     string // 儲存地區
	bucketName string // 源儲存空間名稱
	objectName string // 來源物件名稱

)

// init函數用於初始化命令列參數
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the source object.")
}

func main() {
	// 解析命令列參數
	flag.Parse()

	// 定義上傳ID
	var uploadId string

	// 檢查源儲存空間名稱是否為空白
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source bucket name required")
	}

	// 檢查儲存地區是否為空白
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// 檢查來源物件名稱是否為空白
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source object name required")
	}

	// 載入預設配置並設定憑證提供者和地區
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// 建立OSS用戶端
	client := oss.NewClient(cfg)

	// 初始化分區上傳請求
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	}

	// 執行初始化分區上傳請求
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// 列印初始化分區上傳的結果
	log.Printf("initiate multipart upload result:%#v\n", *initResult.UploadId)
	uploadId = *initResult.UploadId

	// 建立列出分區的請求
	request := &oss.ListPartsRequest{
		Bucket:   oss.Ptr(bucketName), // 儲存空間名稱
		Key:      oss.Ptr(objectName), // 對象名稱
		UploadId: oss.Ptr(uploadId),   // 上傳uploadId
	}

	// 建立分頁器
	p := client.NewListPartsPaginator(request)

	// 初始化頁碼計數器
	var i int
	log.Println("List Parts:")

	// 遍曆分頁器中的每一頁
	for p.HasNext() {
		i++

		// 擷取下一頁的資料
		page, err := p.NextPage(context.TODO())
		if err != nil {
			log.Fatalf("failed to get page %v, %v", i, err)
		}

		// 列印該頁中的每個分區的資訊
		for _, part := range page.Parts {
			log.Printf("Part Number: %v, ETag: %v, Last Modified: %v, Size: %v, HashCRC64: %v\n",
				part.PartNumber,
				oss.ToString(part.ETag),
				oss.ToTime(part.LastModified),
				part.Size,
				oss.ToString(part.HashCRC64))
		}
	}

}

列舉分區上傳事件

當您遇到以下情境時,可以使用Client.NewListMultipartUploadsPaginator分頁器列舉某個儲存空間所有進行中的分區上傳事件。

監控情境:

  1. 批量檔案上傳管理

    • 當需要上傳大量檔案時,為了確保所有檔案都能正確完成分區上傳,您可以使用ListMultipartUploads方法來即時監控所有的分區上傳活動。

  2. 故障檢測與恢複

    • 在上傳過程中如果遇到網路問題或其他故障,可能導致部分分區未能成功上傳。通過監控進行中中的分區上傳事件,可以及時發現這些問題,並採取措施恢複上傳。

  3. 資源最佳化與管理

    • 在大規模的檔案上傳過程中,監控進行中中的分區上傳事件可以協助最佳化資源分派,例如根據上傳進度調整頻寬使用或最佳化上傳策略。

  4. 資料移轉:

    • 在進行大規模的資料移轉專案時,監控所有進行中的分區上傳事件可以確保遷移任務的順利進行,及時發現並解決任何潛在的問題。

參數設定

參數

說明

Delimiter

用於對Object名字進行分組的字元。所有名字包含指定的首碼且第一次出現Delimiter字元之間的Object作為一組元素。

MaxUploads

限定此次返回分區上傳事件的最大數目,預設值和最大值均為1000。

KeyMarker

所有檔案名稱的字母序大於KeyMarker參數值的分區上傳事件,可以與UploadIDMarker參數一同使用來指定返回結果的起始位置。

Prefix

限定返回的檔案名稱必須以指定的Prefix作為首碼。注意使用Prefix查詢時,返回的檔案名稱中仍會包含Prefix。

UploadIDMarker

與KeyMarker參數一同使用來指定返回結果的起始位置。

  • 如果KeyMarker參數未設定,則OSS忽略該參數。

  • 如果KeyMarker參數被設定,查詢結果中包含:

    • 所有Object名字的字典序大於KeyMarker參數值的分區上傳事件。

    • Object名字等於KeyMarker參數值,但是UploadID比UploadIDMarker參數值大的分區上傳事件。

  • 指定首碼為file且最多返回100條結果資料

    package main
    
    import (
    	"context"
    	"flag"
    	"log"
    
    	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
    	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
    )
    
    // 定義全域變數
    var (
    	region     string // 儲存地區
    	bucketName string // 源儲存空間名稱
    	objectName string // 來源物件名稱
    
    )
    
    // init函數用於初始化命令列參數
    func init() {
    	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
    	flag.StringVar(&bucketName, "bucket", "", "The name of the source bucket.")
    	flag.StringVar(&objectName, "object", "", "The name of the source object.")
    }
    
    func main() {
    	// 解析命令列參數
    	flag.Parse()
    
    	// 檢查源儲存空間名稱是否為空白
    	if len(bucketName) == 0 {
    		flag.PrintDefaults()
    		log.Fatalf("invalid parameters, source bucket name required")
    	}
    
    	// 檢查儲存地區是否為空白
    	if len(region) == 0 {
    		flag.PrintDefaults()
    		log.Fatalf("invalid parameters, region required")
    	}
    
    	// 檢查來源物件名稱是否為空白
    	if len(objectName) == 0 {
    		flag.PrintDefaults()
    		log.Fatalf("invalid parameters, source object name required")
    	}
    
    	// 載入預設配置並設定憑證提供者和地區
    	cfg := oss.LoadDefaultConfig().
    		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
    		WithRegion(region)
    
    	// 建立OSS用戶端
    	client := oss.NewClient(cfg)
    
    	// 建立列出分區上傳的請求
    	request := &oss.ListMultipartUploadsRequest{
    		Bucket:     oss.Ptr(bucketName), // 儲存空間名稱
    		MaxUploads: 100,                 // 指定最多返回100條結果資料
    		Prefix:     oss.Ptr("file"),     // 指定首碼為file
    	}
    
    	// 建立分頁器
    	p := client.NewListMultipartUploadsPaginator(request)
    
    	var i int
    	log.Println("List Multipart Uploads:")
    
    	// 遍曆分頁器中的每一頁
    	for p.HasNext() {
    		i++
    
    		// 擷取下一頁的資料
    		page, err := p.NextPage(context.TODO())
    		if err != nil {
    			log.Fatalf("failed to get page %v, %v", i, err)
    		}
    
    		// 列印該頁中的每個分區上傳的資訊
    		for _, u := range page.Uploads {
    			log.Printf("Upload key: %v, upload id: %v, initiated: %v\n", oss.ToString(u.Key), oss.ToString(u.UploadId), oss.ToTime(u.Initiated))
    		}
    	}
    
    }
    

分區上傳時設定回呼函數

以下代碼實現了將一個 400KB 大小的檔案分割成 3 個片段並發上傳到阿里雲 OSS,上傳完成後將這些片段合并成完整的檔案對象,並在合并完成後觸發回調通知。

package main

import (
	"bufio"
	"context"
	"encoding/base64"
	"encoding/json"
	"flag"
	"io"
	"log"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

var (
	region     string
	bucketName string
	objectName string
	letters    = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
)

func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&bucketName, "bucket", "", "The name of the bucket.")
	flag.StringVar(&objectName, "object", "", "The name of the object.")
}

func main() {
	flag.Parse()
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, bucket name required")
	}

	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, object name required")
	}

	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	client := oss.NewClient(cfg)

	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(bucketName),
		Key:    oss.Ptr(objectName),
	}
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)

	// 定義回調參數
	callbackMap := map[string]string{
		"callbackUrl":      "https://example.com:23450",                                                                  // 設定回調伺服器的URL,例如https://example.com:23450。
		"callbackBody":     "bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}", // 設定回調請求體。
		"callbackBodyType": "application/x-www-form-urlencoded",                                                          //設定回調請求體類型。
	}

	// 將回調參數轉換為JSON並進行Base64編碼,以便將其作為回調參數傳遞
	callbackStr, err := json.Marshal(callbackMap)
	if err != nil {
		log.Fatalf("failed to marshal callback map: %v", err)
	}
	callbackBase64 := base64.StdEncoding.EncodeToString(callbackStr)

	callbackVarMap := map[string]string{}
	callbackVarMap["x:my_var1"] = "thi is var 1"
	callbackVarMap["x:my_var2"] = "thi is var 2"
	callbackVarStr, err := json.Marshal(callbackVarMap)
	if err != nil {
		log.Fatalf("failed to marshal callback var: %v", err)
	}
	callbackVarBase64 := base64.StdEncoding.EncodeToString(callbackVarStr)

	var wg sync.WaitGroup
	var parts []oss.UploadPart
	count := 3
	body := randBody(400000)
	reader := strings.NewReader(body)
	bufReader := bufio.NewReader(reader)
	content, _ := io.ReadAll(bufReader)
	partSize := len(body) / count
	var mu sync.Mutex
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func(partNumber int, partSize int, i int) {
			defer wg.Done()
			partRequest := &oss.UploadPartRequest{
				Bucket:     oss.Ptr(bucketName),
				Key:        oss.Ptr(objectName),
				PartNumber: int32(partNumber),
				UploadId:   initResult.UploadId,
				Body:       strings.NewReader(string(content[i*partSize : (i+1)*partSize])),
			}
			partResult, err := client.UploadPart(context.TODO(), partRequest)
			if err != nil {
				log.Fatalf("failed to upload part %d: %v", partNumber, err)
			}
			part := oss.UploadPart{
				PartNumber: partRequest.PartNumber,
				ETag:       partResult.ETag,
			}
			mu.Lock()
			parts = append(parts, part)
			mu.Unlock()
		}(i+1, partSize, i)
	}
	wg.Wait()

	request := &oss.CompleteMultipartUploadRequest{
		Bucket:   oss.Ptr(bucketName),
		Key:      oss.Ptr(objectName),
		UploadId: initResult.UploadId,
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: parts,
		},
		Callback:    oss.Ptr(callbackBase64), // 填寫回調參數
		CallbackVar: oss.Ptr(callbackVarBase64),
	}
	result, err := client.CompleteMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to complete multipart upload %v", err)
	}
	log.Printf("complete multipart upload result:%#v\n", result)
}

func randBody(n int) string {
	b := make([]rune, n)
	randMarker := rand.New(rand.NewSource(time.Now().UnixNano()))
	for i := range b {
		b[i] = letters[randMarker.Intn(len(letters))]
	}
	return string(b)
}

相關文檔

  • 關於分區上傳的完整範例程式碼,請參見GitHub樣本

  • 分區上傳的完整實現涉及三個API介面,詳情如下:

  • 關於取消分區上傳事件的API介面說明,請參見AbortMultipartUpload

  • 關於列舉已上傳分區的API介面說明,請參見NewListPartsPaginator

  • 關於列舉所有執行中的分區上傳事件(即已初始化但尚未完成或已取消的分區上傳事件)的API介面說明,請參見NewListMultipartUploadsPaginator