All Products
Search
Document Center

Sample code for implementing multipart upload of a data stream by using the OSS SDK for Java

Last Updated:Feb 10, 2023

Overview

The OSS SDK for Java provides a multipart upload sample, but that sample only uploads local files. In practice, a server often receives data as a network stream and needs to upload that data in parts. With the file-based sample, the stream must first be saved as a local file and then uploaded, which adds an intermediate storage step and means the upload is not real-time. This topic provides a sample implementation of multipart upload for a data stream.

Details

In a data stream multipart upload, you must clone the data stream first, because an InputStream cannot be read repeatedly. The following sample code shows how to perform multipart upload of a data stream.

Note

Note: The cloneInputStream method in the code implements the copying of the data stream.

import java.io.ByteArrayInputStream; 
import java.io.ByteArrayOutputStream; 
import java.io.File; 
import java.io.FileInputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.util.ArrayList; 
import java.util.List; 

import com.aliyun.oss.OSS; 
import com.aliyun.oss.OSSClientBuilder; 
import com.aliyun.oss.model.CompleteMultipartUploadRequest; 
import com.aliyun.oss.model.CompleteMultipartUploadResult; 
import com.aliyun.oss.model.InitiateMultipartUploadRequest; 
import com.aliyun.oss.model.InitiateMultipartUploadResult; 
import com.aliyun.oss.model.PartETag; 
import com.aliyun.oss.model.UploadPartRequest; 
import com.aliyun.oss.model.UploadPartResult; 

/**
 * Sample that uploads the content of an {@link InputStream} to OSS by using multipart upload.
 *
 * <p>An input stream cannot be read repeatedly, so the stream is first copied into memory by
 * {@code cloneInputStream}. Each part is then served from its own slice of that in-memory copy.
 * Because the whole payload is buffered, this sample is only suitable for data that fits in heap
 * memory.
 */
public class MutilpartUpload {

    /** Size of every part except possibly the last one: 1 MB. */
    private static final long PART_SIZE = 1 * 1024 * 1024L;

    /**
     * Reads the sample source, splits it into parts and uploads the parts to OSS.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Endpoint: Hangzhou is used as an example. Fill in other regions as needed.
        String endpoint = "http://oss-cn-hangzhou.aliyuncs.com";
        // The AccessKey of the Alibaba Cloud account has access to all APIs, which is risky.
        // We recommend that you use a Resource Access Management (RAM) user's credentials to call
        // API operations or perform routine operations and maintenance. To create a RAM user, log
        // on to the RAM console.
        String accessKeyId = "xxx";
        String accessKeySecret = "xxx";
        String bucketName = "dahecs1";
        // The object name is the complete path of the object in OSS, including the file suffix,
        // for example, abc/efg/123.jpg.
        String objectName = "11/aaaatest11123";

        // A local file stands in for the data stream in this sample.
        String filePath = "/Users/wanghe/Downloads/aaaatest111";

        // Create an OSSClient instance.
        OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
        try {
            // Buffer the stream BEFORE initiating the upload, so that a read failure does not
            // leave an initiated-but-never-completed multipart upload behind.
            byte[] data;
            InputStream instream = new FileInputStream(filePath);
            try {
                data = cloneInputStream(instream).toByteArray();
            } finally {
                instream.close();
            }

            // Create an InitiateMultipartUploadRequest object.
            InitiateMultipartUploadRequest request =
                    new InitiateMultipartUploadRequest(bucketName, objectName);

            // To set the storage class when initiating the upload, see the following sample code.
            // ObjectMetadata metadata = new ObjectMetadata();
            // metadata.setHeader(OSSHeaders.OSS_STORAGE_CLASS, StorageClass.Standard.toString());
            // request.setObjectMetadata(metadata);

            // Initiate the multipart upload.
            InitiateMultipartUploadResult upresult = ossClient.initiateMultipartUpload(request);
            // uploadId is the unique identifier of a multipart upload event. You can initiate
            // related operations based on this uploadId, such as canceling the multipart upload
            // and querying the multipart upload.
            String uploadId = upresult.getUploadId();
            // NOTE(review): if a part upload below fails, the initiated upload is not canceled
            // here; consider canceling it by uploadId in production code.

            // partETags is a collection of PartETags. A PartETag consists of an ETag and a part
            // number.
            List<PartETag> partETags = new ArrayList<PartETag>();

            // The length is taken from the buffered bytes; InputStream.available() is only an
            // estimate and must not be used as the total size of a stream.
            long dataLength = data.length;
            int partCount = (int) ((dataLength + PART_SIZE - 1) / PART_SIZE);
            // NOTE(review): for empty input partCount is 0 and the completion request carries no
            // parts; confirm how the service handles that case.

            for (int i = 0; i < partCount; i++) {
                System.out.println("i= " + i);
                long startPos = i * PART_SIZE;
                long curPartSize = Math.min(PART_SIZE, dataLength - startPos);

                // Wrap only this part's slice of the buffer: no per-part copy of the whole
                // payload and no skip() whose return value would have to be checked.
                // Closing a ByteArrayInputStream has no effect, so no close() is needed.
                InputStream partStream =
                        new ByteArrayInputStream(data, (int) startPos, (int) curPartSize);
                System.out.println("length" + partStream.available());

                UploadPartRequest uploadPartRequest = new UploadPartRequest();
                uploadPartRequest.setBucketName(bucketName);
                uploadPartRequest.setKey(objectName);
                uploadPartRequest.setUploadId(uploadId);
                uploadPartRequest.setInputStream(partStream);
                // Set the part size. Each part except for the last part must be larger than
                // 100 KB in size.
                uploadPartRequest.setPartSize(curPartSize);
                // Set the part number. The number ranges from 1 to 10000. If you configure a
                // number beyond the range, OSS returns an InvalidArgument error code.
                uploadPartRequest.setPartNumber(i + 1);
                // Parts do not need to be uploaded in sequence, and can even be uploaded from
                // different clients. OSS sorts the parts by part number to form a complete file.
                UploadPartResult uploadPartResult = ossClient.uploadPart(uploadPartRequest);
                // After each part upload, the result returned by OSS contains a PartETag, which
                // is stored in partETags.
                partETags.add(uploadPartResult.getPartETag());
            }

            // Create a CompleteMultipartUploadRequest object.
            // To complete the multipart upload, you must provide all valid partETags. After OSS
            // receives the partETags, OSS verifies the validity of all parts one by one. After
            // all parts are verified, OSS combines these parts into a complete object.
            CompleteMultipartUploadRequest completeMultipartUploadRequest =
                    new CompleteMultipartUploadRequest(bucketName, objectName, uploadId, partETags);

            // To set the object ACL at the same time as the object is uploaded, see the
            // following sample code.
            // completeMultipartUploadRequest.setObjectACL(CannedAccessControlList.PublicRead);

            // Complete the upload.
            ossClient.completeMultipartUpload(completeMultipartUploadRequest);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Shut down the OSSClient on every path, including failures.
            ossClient.shutdown();
        }
    }

    /**
     * Copies everything that remains in {@code input} into an in-memory buffer.
     *
     * <p>The stream is read until end of stream; it is not closed by this method.
     *
     * @param input the stream to drain
     * @return a buffer holding every byte read from {@code input}; never {@code null}
     * @throws IOException if reading from {@code input} fails
     */
    private static ByteArrayOutputStream cloneInputStream(InputStream input) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int len;
        while ((len = input.read(buffer)) != -1) {
            baos.write(buffer, 0, len);
        }
        return baos;
    }

}



































































































































References

For more information about multipart upload in OSS, see Multipart upload.

Applies to

  • OSS