Function Compute dapat mengotomatiskan dekompresi file yang diunggah ke Object Storage Service (OSS). Saat file ZIP yang memenuhi aturan tertentu diunggah, proses ini secara otomatis memicu Function Compute yang telah ditentukan sebelumnya. Setelah dekompresi, file yang diekstraksi akan diunggah ke direktori yang ditentukan di OSS.
Catatan penggunaan
Gunakan pengkodean UTF-8 atau GB2312 untuk nama file atau folder guna menghindari teks berantakan dan gangguan selama dekompresi.
Untuk file Arsip atau Penyimpanan Arsip Dingin, pulihkan file terlebih dahulu sebelum mendekompresinya.
Waktu maksimum untuk mendekompresi paket ZIP adalah 2 jam. Tugas yang melebihi batas waktu ini akan gagal.
Jaga ukuran file tunggal dalam paket ZIP di bawah 1 GB untuk menghindari kegagalan dekompresi.
Atur batas waktu fungsi lebih dari 2 jam, tetapi tidak lebih dari 24 jam.
Buat fungsi Anda di wilayah yang sama dengan bucket OSS terkait.
Prasyarat
Aktifkan Function Compute. Untuk informasi lebih lanjut, lihat Prasyarat.
Aktifkan OSS dan buat bucket. Untuk informasi lebih lanjut, lihat Mulai Menggunakan Konsol OSS.
Prosedur
Langkah 1: Buat fungsi
Masuk ke Konsol Function Compute. Di panel navigasi sisi kiri, klik Functions.
Di bilah navigasi atas, pilih wilayah. Pada halaman Functions, klik Create Function.
Di halaman Create Function, pilih metode pembuatan fungsi, konfigurasikan item yang diperlukan, lalu klik Create.
Item konfigurasi utama dirinci sebagai berikut. Untuk detail tentang item lainnya, lihat Buat Fungsi.
Runtime: Pilih Python 3.10 sebagai runtime fungsi Anda.
Function Role: Pilih peran yang ada atau buat yang baru untuk memastikan fungsi memiliki izin yang diperlukan untuk mengakses bucket OSS. Lampirkan kebijakan AliyunOSSFullAccess ke peran ini.
Pada halaman Detail Fungsi dari fungsi yang baru dibuat, klik tab Code. Pada tab Code, buat file kode dan tulis kode Anda di editor kode, lalu klik Deploy.
Contoh kode adalah sebagai berikut.
index.pyfileKlik untuk melihat contoh kode
# -*- coding: utf-8 -*- ''' Pernyataan: Fungsi ini memberi nama dan mengkodekan file dan folder sebagai berikut: 1. Untuk MAC/Linux, pengkodean UTF-8 digunakan secara default. 2. Untuk Windows, pengkodean GB2312 atau UTF-8 digunakan secara default. Untuk pengkodean lainnya, deteksi pengkodean dilakukan menggunakan library chardet; namun, akurasi 100% tidak dijamin. Tulis ulang fungsi ini hanya jika diperlukan, dan pastikan itu lolos debugging. ''' import helper import oss2 import json import os import time import logging import chardet """ Saat objek dengan prefix source/ ditempatkan di bucket OSS, diharapkan objek tersebut akan didekompresi dan kemudian disimpan di bucket sebagai processed/ prefixed. Misalnya, source/a.zip akan diproses sebagai processed/a/... "Source /", "processed/" dapat diubah sesuai kebutuhan. """ # Matikan info log yang dicetak oleh SDK OSS logging.getLogger("oss2.api").setLevel(logging.ERROR) logging.getLogger("oss2.auth").setLevel(logging.ERROR) LOGGER = logging.getLogger() # Dekorator untuk mencetak waktu eksekusi suatu fungsi def print_excute_time(func): def wrapper(*args, **kwargs): local_time = time.time() ret = func(*args, **kwargs) LOGGER.info('current Function [%s] excute time is %.2f' % (func.__name__, time.time() - local_time)) return ret return wrapper def get_zipfile_name(origin_name): # Mengatasi masalah teks Cina yang berantakan name = origin_name try: name_bytes = origin_name.encode(encoding="cp437") except: name_bytes = origin_name.encode(encoding="utf-8") # Jika string yang akan dideteksi cukup panjang, hasil deteksi lebih akurat detect = chardet.detect(name_bytes) confidence = detect["confidence"] detect_encoding = detect["encoding"] if confidence > 0.75 and (detect_encoding.lower() in ["gb2312", "gbk", "gb18030", "ascii", "utf-8"]): try: if detect_encoding.lower() in ["gb2312", "gbk", "gb18030"]: detect_encoding = "gb18030" name = name_bytes.decode(detect_encoding) except: name = name_bytes.decode(encoding="gb18030") else: try: name = name_bytes.decode(encoding="gb18030") except: name = name_bytes.decode(encoding="utf-8") # Perbaiki windows \\ sebagai segmen direktori name = name.replace("\\", "/") return name @print_excute_time def handler(event, context): """ Objek dari OSS akan didekompresi secara otomatis. param: event: String JSON acara OSS, termasuk URI objek OSS dan informasi lainnya. param: context: Konteks fungsi, termasuk kredensial dan informasi runtime. """ evt_lst = json.loads(event) creds = context.credentials auth = oss2.StsAuth( creds.access_key_id, creds.access_key_secret, creds.security_token) evt = evt_lst['events'][0] bucket_name = evt['oss']['bucket']['name'] endpoint = 'oss-' + evt['region'] + '-internal.aliyuncs.com' bucket = oss2.Bucket(auth, endpoint, bucket_name) object_name = evt['oss']['object']['key'] if "ObjectCreated:PutSymlink" == evt['eventName']: object_name = bucket.get_symlink(object_name).target_key if object_name == "": raise RuntimeError('{} adalah file symlink yang tidak valid'.format( evt['oss']['object']['key'])) file_type = os.path.splitext(object_name)[1] if file_type != ".zip": raise RuntimeError('{} tipe file bukan zip'.format(object_name)) LOGGER.info("mulai mendekompresi file zip = {}".format(object_name)) lst = object_name.split("/") zip_name = lst[-1] PROCESSED_DIR = os.environ.get("PROCESSED_DIR", "") RETAIN_FILE_NAME = os.environ.get("RETAIN_FILE_NAME", "") if PROCESSED_DIR and PROCESSED_DIR[-1] != "/": PROCESSED_DIR += "/" if RETAIN_FILE_NAME == "false": newKey = PROCESSED_DIR else: newKey = PROCESSED_DIR + zip_name zip_fp = helper.OssStreamFileLikeObject(bucket, object_name) newKey = newKey.replace(".zip", "/") with helper.zipfile_support_oss.ZipFile(zip_fp) as zip_file: for name in zip_file.namelist(): with zip_file.open(name) as file_obj: name = get_zipfile_name(name) bucket.put_object(newKey + name, file_obj)helper.pyfileKlik untuk melihat contoh kode
# -*- coding: utf-8 -*- import oss2 from oss2 import utils, models import ossZipfile as zipfile zipfile_support_oss = zipfile # Dukungan unggah ke OSS sebagai objek seperti file def make_crc_adapter(data, init_crc=0): data = utils.to_bytes(data) # Objek seperti file if hasattr(data, 'read'): return utils._FileLikeAdapter(data, crc_callback=utils.Crc64(init_crc)) utils.make_crc_adapter = make_crc_adapter class OssStreamFileLikeObject(object): def __init__(self, bucket, key): super(OssStreamFileLikeObject, self).__init__() self._bucket = bucket self._key = key self._meta_data = self._bucket.get_object_meta(self._key) @property def bucket(self): return self._bucket @property def key(self): return self._key @property def filesize(self): return self._meta_data.content_length def get_reader(self, begin, end): begin = begin if begin >= 0 else 0 end = end if end > 0 else self.filesize - 1 end = end if end < self.filesize else self.filesize - 1 begin = begin if begin < end else end return self._bucket.get_object(self._key, byte_range=(begin, end)) def get_content_bytes(self, begin, end): reader = self.get_reader(begin, end) return reader.read() def get_last_content_bytes(self, offset): return self.get_content_bytes(self.filesize-offset, self.filesize-1)ossZipfile.pyfileKlik untuk melihat contoh kode
""" Baca dan tulis file ZIP. XXX referensi ke UTF-8 memerlukan investigasi lebih lanjut. """ import io import os import importlib.util import sys import time import stat import shutil import struct import binascii import threading try: import zlib # Kita mungkin memerlukan metode kompresinya crc32 = zlib.crc32 except ImportError: zlib = None crc32 = binascii.crc32 try: import bz2 # Kita mungkin memerlukan metode kompresinya except ImportError: bz2 = None try: import lzma # Kita mungkin memerlukan metode kompresinya except ImportError: lzma = None __all__ = ["BadZipFile", "BadZipfile", "error", "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA", "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"] class BadZipFile(Exception): pass class LargeZipFile(Exception): """ Dilempar saat menulis file ZIP, file ZIP memerlukan ekstensi ZIP64 dan ekstensi tersebut dinonaktifkan. """ error = BadZipfile = BadZipFile # Nama kompatibilitas pra-3.2 ZIP64_LIMIT = (1 << 31) - 1 ZIP_FILECOUNT_LIMIT = (1 << 16) - 1 ZIP_MAX_COMMENT = (1 << 16) - 1 # Konstanta untuk metode kompresi file ZIP ZIP_STORED = 0 ZIP_DEFLATED = 8 ZIP_BZIP2 = 12 ZIP_LZMA = 14 # Metode kompresi ZIP lainnya tidak didukung DEFAULT_VERSION = 20 ZIP64_VERSION = 45 BZIP2_VERSION = 46 LZMA_VERSION = 63 # Kami mengenali (tetapi tidak selalu mendukung) semua fitur hingga versi tersebut MAX_EXTRACT_VERSION = 63 # Di bawah ini adalah beberapa format dan data terkait untuk membaca/menulis header menggunakan # modul struct. Nama dan struktur header/rekaman adalah yang digunakan # dalam deskripsi PKWARE dari format file ZIP: # http://www.pkware.com/documents/casestudies/APPNOTE.TXT # (URL valid pada Januari 2008) # Struktur "akhir direktori pusat", nomor ajaib, ukuran, dan indeks # (bagian V.I dalam dokumen format) structEndArchive = b"<4s4H2LH" stringEndArchive = b"PK\005\006" sizeEndCentDir = struct.calcsize(structEndArchive) _ECD_SIGNATURE = 0 _ECD_DISK_NUMBER = 1 _ECD_DISK_START = 2 _ECD_ENTRIES_THIS_DISK = 3 _ECD_ENTRIES_TOTAL = 4 _ECD_SIZE = 5 _ECD_OFFSET = 6 _ECD_COMMENT_SIZE = 7 # Dua indeks terakhir ini bukan bagian dari struktur seperti yang didefinisikan dalam # spesifikasi, tetapi mereka digunakan secara internal oleh modul ini sebagai kenyamanan _ECD_COMMENT = 8 _ECD_LOCATION = 9 # Struktur "direktori pusat", nomor ajaib, ukuran, dan indeks # entri dalam struktur (bagian V.F dalam dokumen format) structCentralDir = "<4s4B4HL2L5H2L" stringCentralDir = b"PK\001\002" sizeCentralDir = struct.calcsize(structCentralDir) # Indeks entri dalam struktur direktori pusat _CD_SIGNATURE = 0 _CD_CREATE_VERSION = 1 _CD_CREATE_SYSTEM = 2 _CD_EXTRACT_VERSION = 3 _CD_EXTRACT_SYSTEM = 4 _CD_FLAG_BITS = 5 _CD_COMPRESS_TYPE = 6 _CD_TIME = 7 _CD_DATE = 8 _CD_CRC = 9 _CD_COMPRESSED_SIZE = 10 _CD_UNCOMPRESSED_SIZE = 11 _CD_FILENAME_LENGTH = 12 _CD_EXTRA_FIELD_LENGTH = 13 _CD_COMMENT_LENGTH = 14 _CD_DISK_NUMBER_START = 15 _CD_INTERNAL_FILE_ATTRIBUTES = 16 _CD_EXTERNAL_FILE_ATTRIBUTES = 17 _CD_LOCAL_HEADER_OFFSET = 18 # Struktur "header file lokal", nomor ajaib, ukuran, dan indeks # (bagian V.A dalam dokumen format) structFileHeader = "<4s2B4HL2L2H" stringFileHeader = b"PK\003\004" sizeFileHeader = struct.calcsize(structFileHeader) _FH_SIGNATURE = 0 _FH_EXTRACT_VERSION = 1 _FH_EXTRACT_SYSTEM = 2 _FH_GENERAL_PURPOSE_FLAG_BITS = 3 _FH_COMPRESSION_METHOD = 4 _FH_LAST_MOD_TIME = 5 _FH_LAST_MOD_DATE = 6 _FH_CRC = 7 _FH_COMPRESSED_SIZE = 8 _FH_UNCOMPRESSED_SIZE = 9 _FH_FILENAME_LENGTH = 10 _FH_EXTRA_FIELD_LENGTH = 11 # Struktur "lokator akhir direktori pusat ZIP64", nomor ajaib, dan ukuran structEndArchive64Locator = "<4sLQL" stringEndArchive64Locator = b"PK\x06\x07" sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator) # Rekaman "akhir direktori pusat ZIP64", nomor ajaib, ukuran, dan indeks # (bagian V.G dalam dokumen format) structEndArchive64 = "<4sQ2H2L4Q" stringEndArchive64 = b"PK\x06\x06" sizeEndCentDir64 = struct.calcsize(structEndArchive64) _CD64_SIGNATURE = 0 _CD64_DIRECTORY_RECSIZE = 1 _CD64_CREATE_VERSION = 2 _CD64_EXTRACT_VERSION = 3 _CD64_DISK_NUMBER = 4 _CD64_DISK_NUMBER_START = 5 _CD64_NUMBER_ENTRIES_THIS_DISK = 6 _CD64_NUMBER_ENTRIES_TOTAL = 7 _CD64_DIRECTORY_SIZE = 8 _CD64_OFFSET_START_CENTDIR = 9 def _check_zipfile(fp): try: if _EndRecData(fp): return True # File memiliki nomor ajaib yang benar except OSError: pass return False def is_zipfile(filename): """Cepat lihat apakah sebuah file adalah file ZIP dengan memeriksa nomor ajaib. Argumen filename bisa berupa file atau objek file-like juga. """ result = False try: if hasattr(filename, "read"): result = _check_zipfile(fp=filename) else: with open(filename, "rb") as fp: result = _check_zipfile(fp) except OSError: pass return result def _EndRecData64(fpin, offset, endrec): """ Baca rekaman akhir arsip ZIP64 dan gunakan itu untuk memperbarui endrec """ if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes( fpin.filesize+offset-sizeEndCentDir64Locator, fpin.filesize+offset-1) else: try: fpin.seek(offset - sizeEndCentDir64Locator, 2) except OSError: # Jika pencarian gagal, file tidak cukup besar untuk berisi rekaman akhir arsip ZIP64, # jadi cukup kembalikan rekaman akhir yang diberikan. return endrec data = fpin.read(sizeEndCentDir64Locator) if len(data) != sizeEndCentDir64Locator: return endrec sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data) if sig != stringEndArchive64Locator: return endrec if diskno != 0 or disks != 1: raise BadZipFile("file zip yang mencakup beberapa disk tidak didukung") # Asumsikan tidak ada 'data ekstensible zip64' if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes( fpin.filesize + offset - sizeEndCentDir64Locator - sizeEndCentDir64, fpin.filesize+offset-sizeEndCentDir64Locator-1) else: fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2) data = fpin.read(sizeEndCentDir64) if len(data) != sizeEndCentDir64: return endrec sig, sz, create_version, read_version, disk_num, disk_dir, \ dircount, dircount2, dirsize, diroffset = \ struct.unpack(structEndArchive64, data) if sig != stringEndArchive64: return endrec # Perbarui endrec asli menggunakan data dari rekaman ZIP64 endrec[_ECD_SIGNATURE] = sig endrec[_ECD_DISK_NUMBER] = disk_num endrec[_ECD_DISK_START] = disk_dir endrec[_ECD_ENTRIES_THIS_DISK] = dircount endrec[_ECD_ENTRIES_TOTAL] = dircount2 endrec[_ECD_SIZE] = dirsize endrec[_ECD_OFFSET] = diroffset return endrec def _EndRecData(fpin): """Kembalikan data dari rekaman "Akhir Direktori Pusat", atau None. Data adalah daftar sembilan item dalam rekaman ZIP "Akhir direktori pusat" diikuti oleh item kesepuluh, offset pencarian file dari rekaman ini.""" # Tentukan ukuran file if hasattr(fpin, 'bucket'): filesize = fpin.filesize data = fpin.get_last_content_bytes(sizeEndCentDir) else: fpin.seek(0, 2) filesize = fpin.tell() # Periksa apakah ini file ZIP tanpa komentar arsip (struktur # "akhir direktori pusat" harus menjadi item terakhir dalam # file jika ini masalahnya). try: fpin.seek(-sizeEndCentDir, 2) except OSError: return None data = fpin.read() if (len(data) == sizeEndCentDir and data[0:4] == stringEndArchive and data[-2:] == b"\000\000"): # Tanda tangannya benar dan tidak ada komentar, lepaskan struktur endrec = struct.unpack(structEndArchive, data) endrec = list(endrec) # Tambahkan komentar kosong dan catat offset awal endrec.append(b"") endrec.append(filesize - sizeEndCentDir) # Coba baca struktur "Akhir Direktori Pusat ZIP64" return _EndRecData64(fpin, -sizeEndCentDir, endrec) # Entah ini bukan file ZIP, atau ini adalah file ZIP dengan komentar arsip. # Cari di akhir file untuk tanda tangan rekaman "akhir direktori pusat". # Komentar adalah item terakhir dalam file ZIP dan bisa # hingga 64K panjangnya. Diasumsikan bahwa nomor ajaib "akhir direktori pusat" # tidak muncul dalam komentar. maxCommentStart = max(filesize - (1 << 16) - sizeEndCentDir, 0) if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes(maxCommentStart, -1) else: fpin.seek(maxCommentStart, 0) data = fpin.read() start = data.rfind(stringEndArchive) if start >= 0: # Nomor ajaib ditemukan; coba bongkar dan interpretasikan recData = data[start:start+sizeEndCentDir] if len(recData) != sizeEndCentDir: # File ZIP rusak. return None endrec = list(struct.unpack(structEndArchive, recData)) commentSize = endrec[_ECD_COMMENT_SIZE] # Seperti yang diklaim oleh file ZIP comment = data[start+sizeEndCentDir:start+sizeEndCentDir+commentSize] endrec.append(comment) endrec.append(maxCommentStart + start) # Coba baca struktur "Akhir Direktori Pusat ZIP64" return _EndRecData64(fpin, maxCommentStart + start - filesize, endrec) # Tidak dapat menemukan struktur akhir direktori pusat yang valid return None class ZipInfo (object): """Kelas dengan atribut yang menggambarkan setiap file dalam arsip ZIP.""" __slots__ = ( 'orig_filename', 'filename', 'date_time', 'compress_type', 'comment', 'extra', 'create_system', 'create_version', 'extract_version', 'reserved', 'flag_bits', 'volume', 'internal_attr', 'external_attr', 'header_offset', 'CRC', 'compress_size', 'file_size', '_raw_time', ) def __init__(self, filename="NoName", date_time=(1980, 1, 1, 0, 0, 0)): self.orig_filename = filename # Nama file asli dalam arsip # Akhiri nama file pada byte null pertama. Byte null dalam nama file # digunakan sebagai trik oleh virus dalam arsip. null_byte = filename.find(chr(0)) if null_byte >= 0: filename = filename[0:null_byte] # Ini digunakan untuk memastikan jalur dalam file ZIP yang dihasilkan selalu menggunakan # garis miring ke depan sebagai pemisah direktori, seperti yang diminta oleh # spesifikasi format ZIP. if os.sep != "/" and os.sep in filename: filename = filename.replace(os.sep, "/") self.filename = filename # Nama file normal self.date_time = date_time # tahun, bulan, hari, jam, menit, detik if date_time[0] < 1980: raise ValueError('ZIP tidak mendukung timestamp sebelum 1980') # Nilai standar: self.compress_type = ZIP_STORED # Jenis kompresi untuk file self.comment = b"" # Komentar untuk setiap file self.extra = b"" # Data tambahan ZIP if sys.platform == 'win32': self.create_system = 0 # Sistem yang membuat arsip ZIP else: # Anggap semuanya lainnya adalah unix-y self.create_system = 3 # Sistem yang membuat arsip ZIP self.create_version = DEFAULT_VERSION # Versi yang membuat arsip ZIP self.extract_version = DEFAULT_VERSION # Versi yang diperlukan untuk mengekstrak arsip self.reserved = 0 # Harus nol self.flag_bits = 0 # Bit flag ZIP self.volume = 0 # Nomor volume header file self.internal_attr = 0 # Atribut internal self.external_attr = 0 # Atribut file eksternal # Atribut lainnya disetel oleh kelas ZipFile: # header_offset Offset byte ke header file # CRC CRC-32 dari file yang tidak dikompresi # compress_size Ukuran file yang dikompresi # file_size Ukuran file yang tidak dikompresi def __repr__(self): result = ['<%s filename=%r' % (self.__class__.__name__, self.filename)] if self.compress_type != ZIP_STORED: result.append(' compress_type=%s' % compressor_names.get(self.compress_type, self.compress_type)) hi = self.external_attr >> 16 lo = self.external_attr & 0xFFFF if hi: result.append(' filemode=%r' % stat.filemode(hi)) if lo: result.append(' external_attr=%#x' % lo) isdir = self.is_dir() if not isdir or self.file_size: result.append(' file_size=%r' % self.file_size) if ((not isdir or self.compress_size) and (self.compress_type != ZIP_STORED or self.file_size != self.compress_size)): result.append(' compress_size=%r' % self.compress_size) result.append('>') return ''.join(result) def FileHeader(self, zip64=None): """Kembalikan header per file sebagai string.""" dt = self.date_time dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) if self.flag_bits & 0x08: # Setel ini ke nol karena kita menulisnya setelah data file CRC = compress_size = file_size = 0 else: CRC = self.CRC compress_size = self.compress_size file_size = self.file_size extra = self.extra min_version = 0 if zip64 is None: zip64 = file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT if zip64: fmt = '<HHQQ' extra = extra + struct.pack(fmt, 1, struct.calcsize(fmt)-4, file_size, compress_size) if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT: if not zip64: raise LargeZipFile("Ukuran file memerlukan ekstensi ZIP64") # File lebih besar dari yang dapat ditampung dalam integer 4 byte, # kembali ke ekstensi ZIP64 file_size = 0xffffffff compress_size = 0xffffffff min_version = ZIP64_VERSION if self.compress_type == ZIP_BZIP2: min_version = max(BZIP2_VERSION, min_version) elif self.compress_type == ZIP_LZMA: min_version = max(LZMA_VERSION, min_version) self.extract_version = max(min_version, self.extract_version) self.create_version = max(min_version, self.create_version) filename, flag_bits = self._encodeFilenameFlags() header = struct.pack(structFileHeader, stringFileHeader, self.extract_version, self.reserved, flag_bits, self.compress_type, dostime, dosdate, CRC, compress_size, file_size, len(filename), len(extra)) return header + filename + extra def _encodeFilenameFlags(self): try: return self.filename.encode('ascii'), self.flag_bits except UnicodeEncodeError: return self.filename.encode('utf-8'), self.flag_bits | 0x800 def _decodeExtra(self): # Coba dekode bidang tambahan. extra = self.extra unpack = struct.unpack while len(extra) >= 4: tp, ln = unpack('<HH', extra[:4]) if ln+4 > len(extra): raise BadZipFile( "Bidang tambahan rusak %04x (ukuran=%d)" % (tp, ln)) if tp == 0x0001: if ln >= 24: counts = unpack('<QQQ', extra[4:28]) elif ln == 16: counts = unpack('<QQ', extra[4:20]) elif ln == 8: counts = unpack('<Q', extra[4:12]) elif ln == 0: counts = () else: raise BadZipFile( "Bidang tambahan rusak %04x (ukuran=%d)" % (tp, ln)) idx = 0 # Ekstensi ZIP64 (file besar dan/atau arsip besar) if self.file_size in (0xffffffffffffffff, 0xffffffff): self.file_size = counts[idx] idx += 1 if self.compress_size == 0xFFFFFFFF: self.compress_size = counts[idx] idx += 1 if self.header_offset == 0xffffffff: old = self.header_offset self.header_offset = counts[idx] idx += 1 extra = extra[ln+4:] @classmethod def from_file(cls, filename, arcname=None): """Bangun ZipInfo yang sesuai untuk file pada sistem file. filename harus berupa path ke file atau direktori pada sistem file. arcname adalah nama yang akan dimiliki dalam arsip (secara default, ini akan sama dengan filename, tetapi tanpa huruf drive dan dengan pemisah jalur terdepan dihapus). """ if isinstance(filename, os.PathLike): filename = os.fspath(filename) st = os.stat(filename) isdir = stat.S_ISDIR(st.st_mode) mtime = time.localtime(st.st_mtime) date_time = mtime[0:6] # Buat instance ZipInfo untuk menyimpan informasi file if arcname is None: arcname = filename arcname = os.path.normpath(os.path.splitdrive(arcname)[1]) while arcname[0] in (os.sep, os.altsep): arcname = arcname[1:] if isdir: arcname += '/' zinfo = cls(arcname, date_time) zinfo.external_attr = (st.st_mode & 0xFFFF) << 16 # Atribut Unix if isdir: zinfo.file_size = 0 zinfo.external_attr |= 0x10 # Bendera direktori MS-DOS else: zinfo.file_size = st.st_size return zinfo def is_dir(self): """Kembalikan True jika anggota arsip ini adalah direktori.""" return self.filename[-1] == '/' # Enkripsi ZIP menggunakan primitif CRC32 satu-byte untuk mengacak beberapa # kunci internal. Kami memperhatikan bahwa implementasi langsung lebih cepat daripada # bergantung pada binascii.crc32(). _crctable = None def _gen_crc(crc): for j in range(8): if crc & 1: crc = (crc >> 1) ^ 0xEDB88320 else: crc >>= 1 return crc # ZIP mendukung bentuk enkripsi berbasis kata sandi. Meskipun serangan teks biasa dikenal # telah ditemukan terhadapnya, itu masih berguna # untuk dapat mengambil data dari file semacam itu. # # Penggunaan: # zd = _ZipDecrypter(mypwd) # plain_bytes = zd(cypher_bytes) def _ZipDecrypter(pwd): key0 = 305419896 key1 = 591751049 key2 = 878082192 global _crctable if _crctable is None: _crctable = list(map(_gen_crc, range(256))) crctable = _crctable def crc32(ch, crc): """Hitung primitif CRC32 pada satu byte.""" return (crc >> 8) ^ crctable[(crc ^ ch) & 0xFF] def update_keys(c): nonlocal key0, key1, key2 key0 = crc32(c, key0) key1 = (key1 + (key0 & 0xFF)) & 0xFFFFFFFF key1 = (key1 * 134775813 + 1) & 0xFFFFFFFF key2 = crc32(key1 >> 24, key2) for p in pwd: update_keys(p) def decrypter(data): """Dekripsi objek bytes.""" result = bytearray() append = result.append for c in data: k = key2 | 2 c ^= ((k * (k ^ 1)) >> 8) & 0xFF update_keys(c) append(c) return bytes(result) return decrypter class LZMACompressor: def __init__(self): self._comp = None def _init(self): props = lzma._encode_filter_properties({'id': lzma.FILTER_LZMA1}) self._comp = lzma.LZMACompressor(lzma.FORMAT_RAW, filters=[ lzma._decode_filter_properties(lzma.FILTER_LZMA1, props) ]) return struct.pack('<BBH', 9, 4, len(props)) + props def compress(self, data): if self._comp is None: return self._init() + self._comp.compress(data) return self._comp.compress(data) def flush(self): if self._comp is None: return self._init() + self._comp.flush() return self._comp.flush() class LZMADecompressor: def __init__(self): self._decomp = None self._unconsumed = b'' self.eof = False def decompress(self, data): if self._decomp is None: self._unconsumed += data if len(self._unconsumed) <= 4: return b'' psize, = struct.unpack('<H', self._unconsumed[2:4]) if len(self._unconsumed) <= 4 + psize: return b'' self._decomp = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[ lzma._decode_filter_properties(lzma.FILTER_LZMA1, self._unconsumed[4:4 + psize]) ]) data = self._unconsumed[4 + psize:] del self._unconsumed result = self._decomp.decompress(data) self.eof = self._decomp.eof return result compressor_names = { 0: 'store', 1: 'shrink', 2: 'reduce', 3: 'reduce', 4: 'reduce', 5: 'reduce', 6: 'implode', 7: 'tokenize', 8: 'deflate', 9: 'deflate64', 10: 'implode', 12: 'bzip2', 14: 'lzma', 18: 'terse', 19: 'lz77', 97: 'wavpack', 98: 'ppmd', } def _check_compression(compression): if compression == ZIP_STORED: pass elif compression == ZIP_DEFLATED: if not zlib: raise RuntimeError( "Kompresi memerlukan modul (tidak tersedia) zlib") elif compression == ZIP_BZIP2: if not bz2: raise RuntimeError( "Kompresi memerlukan modul (tidak tersedia) bz2") elif compression == ZIP_LZMA: if not lzma: raise RuntimeError( "Kompresi memerlukan modul (tidak tersedia) lzma") else: raise NotImplementedError("Metode kompresi tersebut tidak didukung") def _get_compressor(compress_type): if compress_type == ZIP_DEFLATED: return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15) elif compress_type == ZIP_BZIP2: return bz2.BZ2Compressor() elif compress_type == ZIP_LZMA: return LZMACompressor() else: return None def _get_decompressor(compress_type): if compress_type == ZIP_STORED: return None elif compress_type == ZIP_DEFLATED: return zlib.decompressobj(-15) elif compress_type == ZIP_BZIP2: return bz2.BZ2Decompressor() elif compress_type == ZIP_LZMA: return LZMADecompressor() else: descr = compressor_names.get(compress_type) if descr: raise NotImplementedError( "tipe kompresi %d (%s)" % (compress_type, descr)) else: raise NotImplementedError("tipe kompresi %d" % (compress_type,)) class _SharedFile: def __init__(self, file, pos, close, lock, writing): self._file = file self._pos = pos self._close = close self._lock = lock self._writing = writing if hasattr(self._file, "bucket"): self._fp = self._file.get_reader(pos, -1) def read(self, n=-1): with self._lock: if self._writing(): raise ValueError("Tidak bisa membaca dari file ZIP saat ada " "handle penulisan terbuka padanya. " "Tutup handle penulisan sebelum mencoba membaca.") if hasattr(self._file, "bucket"): data = self._fp.read(n) self._pos += n else: self._file.seek(self._pos) data = self._file.read(n) self._pos = self._file.tell() return data def close(self): if self._file is not None: fileobj = self._file self._file = None self._close(fileobj) # Berikan metode tell untuk aliran yang tidak dapat dicari class _Tellable: def __init__(self, fp): self.fp = fp self.offset = 0 def write(self, data): n = self.fp.write(data) self.offset += n return n def tell(self): return self.offset def flush(self): self.fp.flush() def close(self): self.fp.close() class ZipExtFile(io.BufferedIOBase): """Objek seperti file untuk membaca anggota arsip. Dikembalikan oleh ZipFile.open(). """ # Ukuran maksimum yang didukung oleh dekompresor. MAX_N = 1 << 31 - 1 # Baca dari file terkompresi dalam blok 4k. MIN_READ_SIZE = 4096 def __init__(self, fileobj, mode, zipinfo, decrypter=None, close_fileobj=False): self._fileobj = fileobj self._decrypter = decrypter self._close_fileobj = close_fileobj self._compress_type = zipinfo.compress_type self._compress_left = zipinfo.compress_size self._left = zipinfo.file_size self._decompressor = _get_decompressor(self._compress_type) self._eof = False self._readbuffer = b'' self._offset = 0 self.newlines = None # Sesuaikan ukuran baca untuk file terenkripsi karena 12 byte pertama # digunakan untuk informasi enkripsi/kata sandi. if self._decrypter is not None: self._compress_left -= 12 self.mode = mode self.name = zipinfo.filename if hasattr(zipinfo, 'CRC'): self._expected_crc = zipinfo.CRC self._running_crc = crc32(b'') else: self._expected_crc = None def __repr__(self): result = ['<%s.%s' % (self.__class__.__module__, self.__class__.__qualname__)] if not self.closed: result.append(' name=%r mode=%r' % (self.name, self.mode)) if self._compress_type != ZIP_STORED: result.append(' compress_type=%s' % compressor_names.get(self._compress_type, self._compress_type)) else: result.append(' [closed]') result.append('>') return ''.join(result) def readline(self, limit=-1): """Baca dan kembalikan satu baris dari aliran. Jika limit ditentukan, paling banyak limit byte akan dibaca. """ if limit < 0: # Pintasan kasus umum - baris baru ditemukan di buffer. i = self._readbuffer.find(b'\n', self._offset) + 1 if i > 0: line = self._readbuffer[self._offset: i] self._offset = i return line return io.BufferedIOBase.readline(self, limit) def peek(self, n=1): """Kembalikan byte yang di-buffer tanpa maju posisi.""" if n > len(self._readbuffer) - self._offset: chunk = self.read(n) if len(chunk) > self._offset: self._readbuffer = chunk + self._readbuffer[self._offset:] self._offset = 0 else: self._offset -= len(chunk) # Kembalikan hingga 512 byte untuk mengurangi overhead alokasi untuk loop ketat. return self._readbuffer[self._offset: self._offset + 512] def readable(self): return True def read(self, n=-1): """Baca dan kembalikan hingga n byte. Jika argumen dihilangkan, None, atau negatif, data dibaca dan dikembalikan sampai EOF tercapai.. """ if n is None or n < 0: buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while not self._eof: buf += self._read1(self.MAX_N) return buf end = n + self._offset if end < len(self._readbuffer): buf = self._readbuffer[self._offset:end] self._offset = end return buf n = end - len(self._readbuffer) buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while n > 0 and not self._eof: data = self._read1(n) if n < len(data): self._readbuffer = data self._offset = n buf += data[:n] break buf += data n -= len(data) return buf def _update_crc(self, newdata): # Perbarui CRC menggunakan data yang diberikan. if self._expected_crc is None: # Tidak perlu menghitung CRC jika kita tidak memiliki nilai referensi return self._running_crc = crc32(newdata, self._running_crc) # Periksa CRC jika kita berada di akhir file if self._eof and self._running_crc != self._expected_crc: raise BadZipFile("Bad CRC-32 untuk file %r" % self.name) def read1(self, n): """Baca hingga n byte dengan paling banyak satu panggilan sistem read().""" if n is None or n < 0: buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while not self._eof: data = self._read1(self.MAX_N) if data: buf += data break return buf end = n + self._offset if end < len(self._readbuffer): buf = self._readbuffer[self._offset:end] self._offset = end return buf n = end - len(self._readbuffer) buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 if n > 0: while not self._eof: data = self._read1(n) if n < len(data): self._readbuffer = data self._offset = n buf += data[:n] break if data: buf += data break return buf def _read1(self, n): # Baca hingga n byte terkompresi dengan paling banyak satu panggilan sistem read(), # dekripsi dan dekompresi mereka. if self._eof or n <= 0: return b'' # Baca dari file. if self._compress_type == ZIP_DEFLATED: # Tangani data yang belum dikonsumsi. data = self._decompressor.unconsumed_tail if n > len(data): data += self._read2(n - len(data)) else: data = self._read2(n) if self._compress_type == ZIP_STORED: self._eof = self._compress_left <= 0 elif self._compress_type == ZIP_DEFLATED: n = max(n, self.MIN_READ_SIZE) data = self._decompressor.decompress(data, n) self._eof = (self._decompressor.eof or self._compress_left <= 0 and not self._decompressor.unconsumed_tail) if self._eof: data += self._decompressor.flush() else: data = self._decompressor.decompress(data) self._eof = self._decompressor.eof or self._compress_left <= 0 data = data[:self._left] self._left -= len(data) if self._left <= 0: self._eof = True self._update_crc(data) return data def _read2(self, n): if self._compress_left <= 0: return b'' n = max(n, self.MIN_READ_SIZE) n = min(n, self._compress_left) data = self._fileobj.read(n) self._compress_left -= len(data) if not data: raise EOFError if self._decrypter is not None: data = self._decrypter(data) return data def close(self): try: if self._close_fileobj: self._fileobj.close() finally: super().close() class _ZipWriteFile(io.BufferedIOBase): def __init__(self, zf, zinfo, zip64): self._zinfo = zinfo self._zip64 = zip64 self._zipfile = zf self._compressor = _get_compressor(zinfo.compress_type) self._file_size = 0 self._compress_size = 0 self._crc = 0 @property def _fileobj(self): return self._zipfile.fp def writable(self): return True def write(self, data): if self.closed: raise ValueError('Operasi I/O pada file yang ditutup.') nbytes = len(data) self._file_size += nbytes self._crc = crc32(data, self._crc) if self._compressor: data = self._compressor.compress(data) self._compress_size += len(data) self._fileobj.write(data) return nbytes def close(self): if self.closed: return super().close() # Flush data apa pun dari kompresor, dan perbarui info header if self._compressor: buf = self._compressor.flush() self._compress_size += len(buf) self._fileobj.write(buf) self._zinfo.compress_size = self._compress_size else: self._zinfo.compress_size = self._file_size self._zinfo.CRC = self._crc self._zinfo.file_size = self._file_size # Tulis ulang info header yang diperbarui if self._zinfo.flag_bits & 0x08: # Tulis CRC dan ukuran file setelah data file fmt = '<LQQ' if self._zip64 else '<LLL' self._fileobj.write(struct.pack(fmt, self._zinfo.CRC, self._zinfo.compress_size, self._zinfo.file_size)) self._zipfile.start_dir = self._fileobj.tell() else: if not self._zip64: if self._file_size > ZIP64_LIMIT: raise RuntimeError('Ukuran file melebihi batas ZIP64 secara tak terduga') if self._compress_size > ZIP64_LIMIT: raise RuntimeError('Ukuran terkompresi melebihi batas ZIP64 secara tak terduga') # Mundur dan tulis header file (yang sekarang akan mencakup # CRC dan ukuran file yang benar) # Simpan posisi saat ini dalam file self._zipfile.start_dir = self._fileobj.tell() self._fileobj.seek(self._zinfo.header_offset) self._fileobj.write(self._zinfo.FileHeader(self._zip64)) self._fileobj.seek(self._zipfile.start_dir) self._zipfile._writing = False # Berhasil ditulis: Tambahkan file ke cache kami self._zipfile.filelist.append(self._zinfo) self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo class ZipFile: """ Kelas dengan metode untuk membuka, membaca, menulis, menutup, daftar file zip. z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=True) file: Baik path ke file, atau objek seperti file. Jika itu adalah path, file akan dibuka dan ditutup oleh ZipFile. mode: Mode bisa berupa baca 'r', tulis 'w', buat eksklusif 'x', atau tambahkan 'a'. compression: ZIP_STORED (tanpa kompresi), ZIP_DEFLATED (memerlukan zlib), ZIP_BZIP2 (memerlukan bz2) atau ZIP_LZMA (memerlukan lzma). allowZip64: jika Benar ZipFile akan membuat file dengan ekstensi ZIP64 saat diperlukan, jika tidak akan memunculkan pengecualian saat ini akan diperlukan. """ fp = None # Diatur di sini karena __del__ memeriksanya _windows_illegal_name_trans_table = None def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True): """Buka file ZIP dengan mode baca 'r', tulis 'w', buat eksklusif 'x', atau tambahkan 'a'.""" if mode not in ('r', 'w', 'x', 'a'): raise ValueError("ZipFile memerlukan mode 'r', 'w', 'x', atau 'a'") _check_compression(compression) self._allowZip64 = allowZip64 self._didModify = False self.debug = 0 # Tingkat pencetakan: 0 hingga 3 self.NameToInfo = {} # Temukan informasi file yang diberikan nama self.filelist = [] # Daftar instance ZipInfo untuk arsip self.compression = compression # Metode kompresi self.mode = mode self.pwd = None self._comment = b'' # Periksa apakah kita diberikan objek seperti file if isinstance(file, os.PathLike): file = os.fspath(file) if isinstance(file, str): # Tidak, itu adalah nama file self._filePassed = 0 self.filename = file modeDict = {'r': 'rb', 'w': 'w+b', 'x': 'x+b', 'a': 'r+b', 'r+b': 'w+b', 'w+b': 'wb', 'x+b': 'xb'} filemode = modeDict[mode] while True: try: self.fp = io.open(file, filemode) except OSError: if filemode in modeDict: filemode = modeDict[filemode] continue raise break else: self._filePassed = 1 self.fp = file self.filename = getattr(file, 'name', None) self._fileRefCnt = 1 self._lock = threading.RLock() self._seekable = True self._writing = False try: if mode == 'r': self._RealGetContents() elif mode in ('w', 'x'): # atur bendera dimodifikasi sehingga direktori pusat ditulis # bahkan jika tidak ada file yang ditambahkan ke arsip self._didModify = True try: self.start_dir = self.fp.tell() except (AttributeError, OSError): self.fp = _Tellable(self.fp) self.start_dir = 0 self._seekable = False else: # Beberapa objek seperti file dapat memberikan tell() tetapi tidak seek() try: self.fp.seek(self.start_dir) except (AttributeError, OSError): self._seekable = False elif mode == 'a': try: # Lihat apakah file adalah file zip self._RealGetContents() # cari ke awal direktori dan timpa self.fp.seek(self.start_dir) except BadZipFile: # file bukan file zip, cukup tambahkan self.fp.seek(0, 2) # atur bendera dimodifikasi sehingga direktori pusat ditulis # bahkan jika tidak ada file yang ditambahkan ke arsip self._didModify = True self.start_dir = self.fp.tell() else: raise ValueError("Mode harus 'r', 'w', 'x', atau 'a'") except: fp = self.fp self.fp = None self._fpclose(fp) raise def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def __repr__(self): result = ['<%s.%s' % (self.__class__.__module__, self.__class__.__qualname__)] if self.fp is not None: if self._filePassed: result.append(' file=%r' % self.fp) elif self.filename is not None: result.append(' filename=%r' % self.filename) result.append(' mode=%r' % self.mode) else: result.append(' [closed]') result.append('>') return ''.join(result) def _RealGetContents(self): """Baca isi tabel konten untuk file ZIP.""" fp = self.fp try: endrec = _EndRecData(fp) except OSError: raise BadZipFile("File bukan file zip") if not endrec: raise BadZipFile("File bukan file zip") if self.debug > 1: print(endrec) size_cd = endrec[_ECD_SIZE] # byte dalam direktori pusat offset_cd = endrec[_ECD_OFFSET] # offset direktori pusat self._comment = endrec[_ECD_COMMENT] # komentar arsip # "concat" adalah nol, kecuali jika zip digabungkan dengan file lain concat = endrec[_ECD_LOCATION] - size_cd - offset_cd if endrec[_ECD_SIGNATURE] == stringEndArchive64: # Jika struktur ekstensi Zip64 ada, akun untuk itu concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) if self.debug > 2: inferred = concat + offset_cd print("given, inferred, offset", offset_cd, inferred, concat) # self.start_dir: Posisi awal direktori pusat self.start_dir = offset_cd + concat if hasattr(fp, "bucket"): data = fp.get_content_bytes( self.start_dir, self.start_dir+size_cd-1) else: fp.seek(self.start_dir, 0) data = fp.read(size_cd) fp = io.BytesIO(data) total = 0 while total < size_cd: centdir = fp.read(sizeCentralDir) if len(centdir) != sizeCentralDir: raise BadZipFile("Direktori pusat terpotong") centdir = struct.unpack(structCentralDir, centdir) if centdir[_CD_SIGNATURE] != stringCentralDir: raise BadZipFile("Nomor ajaib buruk untuk direktori pusat") if self.debug > 2: print(centdir) filename = fp.read(centdir[_CD_FILENAME_LENGTH]) flags = centdir[5] if flags & 0x800: # Ekstensi nama file UTF-8 filename = filename.decode('utf-8') else: # Pengkodean nama file ZIP historis filename = filename.decode('cp437') # Buat instance ZipInfo untuk menyimpan informasi file x = ZipInfo(filename) x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH]) x.comment = fp.read(centdir[_CD_COMMENT_LENGTH]) x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET] (x.create_version, x.create_system, x.extract_version, x.reserved, x.flag_bits, x.compress_type, t, d, x.CRC, x.compress_size, x.file_size) = centdir[1:12] if x.extract_version > MAX_EXTRACT_VERSION: raise NotImplementedError("versi file zip %.1f" % (x.extract_version / 10)) x.volume, x.internal_attr, x.external_attr = centdir[15:18] # Ubah kode tanggal/waktu menjadi (tahun, bulan, hari, jam, menit, detik) x._raw_time = t x.date_time = ((d >> 9)+1980, (d >> 5) & 0xF, d & 0x1F, t >> 11, (t >> 5) & 0x3F, (t & 0x1F) * 2) x._decodeExtra() x.header_offset = x.header_offset + concat self.filelist.append(x) self.NameToInfo[x.filename] = x # perbarui total byte yang dibaca dari direktori pusat total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH] + centdir[_CD_EXTRA_FIELD_LENGTH] + centdir[_CD_COMMENT_LENGTH]) if self.debug > 2: print("total", total) def namelist(self): """Kembalikan daftar nama file dalam arsip.""" return [data.filename for data in self.filelist] def infolist(self): """Kembalikan daftar instance kelas ZipInfo untuk file dalam arsip.""" return self.filelist def printdir(self, file=None): """Cetak daftar isi untuk file zip.""" print("%-46s %19s %12s" % ("Nama File", "Dimodifikasi ", "Ukuran"), file=file) for zinfo in self.filelist: date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6] print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size), file=file) def testzip(self): """Baca semua file dan periksa CRC.""" chunk_size = 2 ** 20 for zinfo in self.filelist: try: # Baca dalam blok, untuk menghindari OverflowError atau # MemoryError dengan file tertanam yang sangat besar. with self.open(zinfo.filename, "r") as f: while f.read(chunk_size): # Periksa CRC-32 pass except BadZipFile: return zinfo.filename def getinfo(self, name): """Kembalikan instance ZipInfo yang diberikan 'name'.""" info = self.NameToInfo.get(name) if info is None: raise KeyError( 'Tidak ada item bernama %r dalam arsip' % name) return info def setpassword(self, pwd): """Setel kata sandi default untuk file terenkripsi.""" if pwd and not isinstance(pwd, bytes): raise TypeError("pwd: diharapkan bytes, dapatkan %s" % type(pwd).__name__) if pwd: self.pwd = pwd else: self.pwd = None @property def comment(self): """Teks komentar yang terkait dengan file ZIP.""" return self._comment @comment.setter def comment(self, comment): if not isinstance(comment, bytes): raise TypeError("comment: diharapkan bytes, dapatkan %s" % type(comment).__name__) # periksa panjang komentar yang valid if len(comment) > ZIP_MAX_COMMENT: import warnings warnings.warn('Komentar arsip terlalu panjang; memotong menjadi %d byte' % ZIP_MAX_COMMENT, stacklevel=2) comment = comment[:ZIP_MAX_COMMENT] self._comment = comment self._didModify = True def read(self, name, pwd=None): """Kembalikan byte file (sebagai string) untuk nama.""" with self.open(name, "r", pwd) as fp: return fp.read() def open(self, name, mode="r", pwd=None, *, force_zip64=False): """Kembalikan objek seperti file untuk 'name'. name adalah string untuk nama file dalam file ZIP, atau objek ZipInfo. mode harus 'r' untuk membaca file yang sudah ada dalam file ZIP, atau 'w' untuk menulis ke file yang baru ditambahkan ke arsip. pwd adalah kata sandi untuk mendekripsi file (hanya digunakan untuk membaca). Saat menulis, jika ukuran file tidak diketahui sebelumnya tetapi mungkin melebihi 2 GiB, berikan force_zip64 untuk menggunakan format ZIP64, yang dapat menangani file besar. Jika ukuran diketahui sebelumnya, sebaiknya berikan instance ZipInfo untuk name, dengan zinfo.file_size disetel. """ if mode not in {"r", "w"}: raise ValueError('open() memerlukan mode "r" atau "w"') if pwd and not isinstance(pwd, bytes): raise TypeError("pwd: diharapkan bytes, dapatkan %s" % type(pwd).__name__) if pwd and (mode == "w"): raise ValueError("pwd hanya didukung untuk membaca file") if not self.fp: raise ValueError( "Upaya menggunakan arsip ZIP yang sudah ditutup") # Pastikan kita memiliki objek info if isinstance(name, ZipInfo): # 'name' sudah merupakan objek info zinfo = name elif mode == 'w': zinfo = ZipInfo(name) zinfo.compress_type = self.compression else: # Dapatkan objek info untuk name zinfo = self.getinfo(name) if mode == 'w': return self._open_to_write(zinfo, force_zip64=force_zip64) if self._writing: raise ValueError("Tidak bisa membaca dari file ZIP saat ada " "handle penulisan terbuka padanya. " "Tutup handle penulisan sebelum mencoba membaca.") # Buka untuk membaca: self._fileRefCnt += 1 zef_file = _SharedFile(self.fp, zinfo.header_offset, self._fpclose, self._lock, lambda: self._writing) try: # Lewati header file: fheader = zef_file.read(sizeFileHeader) if len(fheader) != sizeFileHeader: raise BadZipFile("Header file terpotong") fheader = struct.unpack(structFileHeader, fheader) if fheader[_FH_SIGNATURE] != stringFileHeader: raise BadZipFile("Nomor ajaib buruk untuk header file") fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) if fheader[_FH_EXTRA_FIELD_LENGTH]: zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) if zinfo.flag_bits & 0x20: # Data tambalan terkompresi Zip 2.7 raise NotImplementedError( "data tambalan terkompresi (bit flag 5)") if zinfo.flag_bits & 0x40: # enkripsi kuat raise NotImplementedError("enkripsi kuat (bit flag 6)") if zinfo.flag_bits & 0x800: # Nama file UTF-8 fname_str = fname.decode("utf-8") else: fname_str = fname.decode("cp437") if fname_str != zinfo.orig_filename: raise BadZipFile( 'Nama file dalam direktori %r dan header %r berbeda.' % (zinfo.orig_filename, fname)) # periksa flag terenkripsi dan tangani kata sandi is_encrypted = zinfo.flag_bits & 0x1 zd = None if is_encrypted: if not pwd: pwd = self.pwd if not pwd: raise RuntimeError("File %r terenkripsi, kata sandi diperlukan untuk ekstraksi" % name) zd = _ZipDecrypter(pwd) # 12 byte pertama dalam aliran sandi adalah header enkripsi # digunakan untuk memperkuat algoritma. 11 byte pertama sepenuhnya acak, sedangkan byte ke-12 berisi MSB dari CRC, # atau MSB dari waktu file tergantung pada jenis header # dan digunakan untuk memeriksa kebenaran kata sandi. header = zef_file.read(12) h = zd(header[0:12]) if zinfo.flag_bits & 0x8: # bandingkan dengan tipe file dari header lokal yang diperluas check_byte = (zinfo._raw_time >> 8) & 0xff else: # bandingkan dengan CRC jika tidak check_byte = (zinfo.CRC >> 24) & 0xff if h[11] != check_byte: raise RuntimeError("Kata sandi buruk untuk file %r" % name) return ZipExtFile(zef_file, mode, zinfo, zd, True) except: zef_file.close() raise def _open_to_write(self, zinfo, force_zip64=False): if force_zip64 and not self._allowZip64: raise ValueError( "force_zip64 adalah Benar, tetapi allowZip64 adalah Salah saat membuka " "file ZIP." ) if self._writing: raise ValueError("Tidak bisa menulis ke file ZIP saat ada " "handle penulisan lain yang terbuka padanya. " "Tutup handle pertama sebelum membuka yang lain.") # Ukuran dan CRC ditimpa dengan data yang benar setelah memproses file if not hasattr(zinfo, 'file_size'): zinfo.file_size = 0 zinfo.compress_size = 0 zinfo.CRC = 0 zinfo.flag_bits = 0x00 if zinfo.compress_type == ZIP_LZMA: # Data terkompresi mencakup penanda akhir aliran (EOS) zinfo.flag_bits |= 0x02 if not self._seekable: zinfo.flag_bits |= 0x08 if not zinfo.external_attr: zinfo.external_attr = 0o600 << 16 # izin: ?rw------- # Ukuran terkompresi bisa lebih besar dari ukuran tidak terkompresi zip64 = self._allowZip64 and \ (force_zip64 or zinfo.file_size * 1.05 > ZIP64_LIMIT) if self._seekable: self.fp.seek(self.start_dir) zinfo.header_offset = self.fp.tell() self._writecheck(zinfo) self._didModify = True self.fp.write(zinfo.FileHeader(zip64)) self._writing = True return _ZipWriteFile(self, zinfo, zip64) def extract(self, member, path=None, pwd=None): """Ekstrak anggota dari arsip ke direktori kerja saat ini, menggunakan nama lengkapnya. Informasi filenya diekstrak seakurat mungkin. `member` bisa berupa nama file atau objek ZipInfo. Anda dapat tentukan direktori berbeda menggunakan `path`. """ if path is None: path = os.getcwd() else: path = os.fspath(path) return self._extract_member(member, path, pwd) def extractall(self, path=None, members=None, pwd=None): """Ekstrak semua anggota dari arsip ke direktori kerja saat ini. `path` menentukan direktori berbeda untuk ekstraksi. `members` bersifat opsional dan harus merupakan subset dari daftar yang dikembalikan oleh namelist(). """ if members is None: members = self.namelist() if path is None: path = os.getcwd() else: path = os.fspath(path) for zipinfo in members: self._extract_member(zipinfo, path, pwd) @classmethod def _sanitize_windows_name(cls, arcname, pathsep): """Ganti karakter buruk dan hapus titik di bagian akhir.""" table = cls._windows_illegal_name_trans_table if not table: illegal = ':<>|"?*' table = str.maketrans(illegal, '_' * len(illegal)) cls._windows_illegal_name_trans_table = table arcname = arcname.translate(table) # hapus titik di akhir arcname = (x.rstrip('.') for x in arcname.split(pathsep)) # gabungkan kembali, menghapus bagian kosong. arcname = pathsep.join(x for x in arcname if x) return arcname def _extract_member(self, member, targetpath, pwd): """Ekstrak objek ZipInfo 'member' ke file fisik pada jalur targetpath. """ if not isinstance(member, ZipInfo): member = self.getinfo(member) # bangun jalur tujuan, mengganti # garis miring ke pemisah jalur spesifik platform. arcname = member.filename.replace('/', os.path.sep) if os.path.altsep: arcname = arcname.replace(os.path.altsep, os.path.sep) # interpretasikan jalur absolut sebagai relatif, hapus huruf drive atau # jalur UNC, pemisah berlebih, "." dan ".." komponen. arcname = os.path.splitdrive(arcname)[1] invalid_path_parts = ('', os.path.curdir, os.path.pardir) arcname = os.path.sep.join(x for x in arcname.split(os.path.sep) if x not in invalid_path_parts) if os.path.sep == '\\': # filter karakter ilegal di Windows arcname = self._sanitize_windows_name(arcname, os.path.sep) targetpath = os.path.join(targetpath, arcname) targetpath = os.path.normpath(targetpath) # Buat semua direktori atas jika perlu. upperdirs = os.path.dirname(targetpath) if upperdirs and not os.path.exists(upperdirs): os.makedirs(upperdirs) if member.is_dir(): if not os.path.isdir(targetpath): os.mkdir(targetpath) return targetpath with self.open(member, pwd=pwd) as source, \ open(targetpath, "wb") as target: shutil.copyfileobj(source, target) return targetpath def _writecheck(self, zinfo): """Periksa kesalahan sebelum menulis file ke arsip.""" if zinfo.filename in self.NameToInfo: import warnings warnings.warn('Nama duplikat: %r' % zinfo.filename, stacklevel=3) if self.mode not in ('w', 'x', 'a'): raise ValueError("write() memerlukan mode 'w', 'x', atau 'a'") if not self.fp: raise ValueError( "Upaya menulis ke arsip ZIP yang sudah ditutup") _check_compression(zinfo.compress_type) if not self._allowZip64: requires_zip64 = None if len(self.filelist) >= ZIP_FILECOUNT_LIMIT: requires_zip64 = "Jumlah file" elif zinfo.file_size > ZIP64_LIMIT: requires_zip64 = "Ukuran file" elif zinfo.header_offset > ZIP64_LIMIT: requires_zip64 = "Ukuran file zip" if requires_zip64: raise LargeZipFile(requires_zip64 + " akan memerlukan ekstensi ZIP64") def write(self, filename, arcname=None, compress_type=None): """Masukkan byte dari filename ke dalam arsip dengan nama arcname.""" if not self.fp: raise ValueError( "Upaya menulis ke arsip ZIP yang sudah ditutup") if self._writing: raise ValueError( "Tidak bisa menulis ke arsip ZIP saat ada handle penulisan terbuka" ) zinfo = ZipInfo.from_file(filename, arcname) if zinfo.is_dir(): zinfo.compress_size = 0 zinfo.CRC = 0 else: if compress_type is not None: zinfo.compress_type = compress_type else: zinfo.compress_type = self.compression if zinfo.is_dir(): with self._lock: if self._seekable: self.fp.seek(self.start_dir) zinfo.header_offset = self.fp.tell() # Mulai byte header if zinfo.compress_type == ZIP_LZMA: # Data terkompresi mencakup penanda akhir aliran (EOS) zinfo.flag_bits |= 0x02 self._writecheck(zinfo) self._didModify = True self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo self.fp.write(zinfo.FileHeader(False)) self.start_dir = self.fp.tell() else: with open(filename, "rb") as src, self.open(zinfo, 'w') as dest: shutil.copyfileobj(src, dest, 1024*8) def writestr(self, zinfo_or_arcname, data, compress_type=None): """Tulis file ke dalam arsip. Kontennya adalah 'data', yang bisa berupa instance 'str' atau 'bytes'; jika itu adalah 'str', itu akan dienkripsi sebagai UTF-8 terlebih dahulu. 'zinfo_or_arcname' adalah instance ZipInfo atau nama file dalam arsip.""" if isinstance(data, str): data = data.encode("utf-8") if not isinstance(zinfo_or_arcname, ZipInfo): zinfo = ZipInfo(filename=zinfo_or_arcname, date_time=time.localtime(time.time())[:6]) zinfo.compress_type = self.compression if zinfo.filename[-1] == '/': zinfo.external_attr = 0o40775 << 16 # drwxrwxr-x zinfo.external_attr |= 0x10 # Bendera direktori MS-DOS else: zinfo.external_attr = 0o600 << 16 # ?rw------- else: zinfo = zinfo_or_arcname if not self.fp: raise ValueError( "Upaya menulis ke arsip ZIP yang sudah ditutup") if self._writing: raise ValueError( "Tidak bisa menulis ke arsip ZIP saat ada handle penulisan terbuka." ) if compress_type is not None: zinfo.compress_type = compress_type zinfo.file_size = len(data) # Ukuran tidak terkompresi with self._lock: with self.open(zinfo, mode='w') as dest: dest.write(data) def __del__(self): """Panggil metode "close()" jika pengguna lupa.""" self.close() def close(self): """Tutup file, dan untuk mode 'w', 'x' dan 'a' tulis rekaman akhir .""" if self.fp is None: return if self._writing: raise ValueError("Tidak bisa menutup file ZIP saat ada " "handle penulisan terbuka padanya. " "Tutup handle penulisan sebelum menutup zip.") try: if self.mode in ('w', 'x', 'a') and self._didModify: # tulis rekaman akhir with self._lock: if self._seekable: self.fp.seek(self.start_dir) self._write_end_record() finally: fp = self.fp self.fp = None self._fpclose(fp) def _write_end_record(self): for zinfo in self.filelist: # tulis direktori pusat dt = zinfo.date_time dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) extra = [] if zinfo.file_size > ZIP64_LIMIT \ or zinfo.compress_size > ZIP64_LIMIT: extra.append(zinfo.file_size) extra.append(zinfo.compress_size) file_size = 0xffffffff compress_size = 0xffffffff else: file_size = zinfo.file_size compress_size = zinfo.compress_size if zinfo.header_offset > ZIP64_LIMIT: extra.append(zinfo.header_offset) header_offset = 0xffffffff else: header_offset = zinfo.header_offset extra_data = zinfo.extra min_version = 0 if extra: # Tambahkan bidang ZIP64 ke extra's extra_data = struct.pack( '<HHQQQ', 1, 28, zinfo.file_size, zinfo.compress_size, zinfo.header_offset) extra_data = extra_data + zinfo.extra if zinfo.file_size > ZIP64_LIMIT or zinfo.compress_size > ZIP64_LIMIT: file_size = 0xffffffff compress_size = 0xffffffff if zinfo.header_offset > ZIP64_LIMIT: header_offset = 0xffffffff centDirCount = len(self.filelist) centDirSize = self.fp.tell() - self.start_dir centDirOffset = self.start_dir requires_zip64 = None if centDirCount > ZIP_FILECOUNT_LIMIT: requires_zip64 = "Jumlah file" elif centDirOffset > ZIP64_LIMIT: requires_zip64 = "Offset direktori pusat" elif centDirSize > ZIP64_LIMIT: requires_zip64 = "Ukuran direktori pusat" if requires_zip64: # Perlu menulis rekaman akhir arsip ZIP64 if not self._allowZip64: raise LargeZipFile(requires_zip64 + " akan memerlukan ekstensi ZIP64") zip64endrec = struct.pack( structEndArchive64, stringEndArchive64, 44, 45, 45, 0, 0, centDirCount, centDirCount, centDirSize, centDirOffset) self.fp.write(zip64endrec) zip64locrec = struct.pack( structEndArchive64Locator, stringEndArchive64Locator, 0, pos2, 1) self.fp.write(zip64locrec) centDirCount = min(centDirCount, 0xFFFF) centDirSize = min(centDirSize, 0xFFFFFFFF) centDirOffset = min(centDirOffset, 0xFFFFFFFF) endrec = struct.pack(structEndArchive, stringEndArchive, 0, 0, centDirCount, centDirCount, centDirSize, centDirOffset, len(self._comment)) self.fp.write(endrec) self.fp.write(self._comment) self.fp.flush() def _fpclose(self, fp): assert self._fileRefCnt > 0 self._fileRefCnt -= 1 if not self._fileRefCnt and not self._filePassed: fp.close() class PyZipFile(ZipFile): """Kelas untuk membuat arsip ZIP dengan file dan paket pustaka Python.""" def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True, optimize=-1): ZipFile.__init__(self, file, mode=mode, compression=compression, allowZip64=allowZip64) self._optimize = optimize def writepy(self, pathname, basename="", filterfunc=None): """Tambahkan semua file dari "pathname" ke arsip ZIP. Jika pathname adalah direktori paket, telusuri direktori dan semua subdirektori paket secara rekursif untuk semua *.py dan masukkan modul ke dalam arsip. Jika pathname adalah direktori biasa, daftarkan *.py dan masukkan semua modul. Jika tidak, pathname harus berupa file Python *.py dan modul akan dimasukkan ke dalam arsip. Modul yang ditambahkan selalu module.pyc. Metode ini akan mengompilasi module.py menjadi module.pyc jika diperlukan. Jika filterfunc(pathname) diberikan, itu dipanggil dengan setiap argumen. Saat bernilai Salah, file atau direktori dilewati. """ pathname = os.fspath(pathname) if filterfunc and not filterfunc(pathname): if self.debug: label = 'jalur' if os.path.isdir(pathname) else 'file' print('%s %r dilewati oleh filterfunc' % (label, pathname)) return dir, name = os.path.split(pathname) if os.path.isdir(pathname): initname = os.path.join(pathname, "__init__.py") if os.path.isfile(initname): # Ini adalah direktori paket, tambahkan if basename: basename = "%s/%s" % (basename, name) else: basename = name if self.debug: print("Menambahkan paket di", pathname, "sebagai", basename) fname, arcname = self._get_codename(initname[0:-3], basename) if self.debug: print("Menambahkan", arcname) self.write(fname, arcname) dirlist = os.listdir(pathname) dirlist.remove("__init__.py") # Tambahkan semua file *.py dan subdirektori paket for filename in dirlist: path = os.path.join(pathname, filename) root, ext = os.path.splitext(filename) if os.path.isdir(path): if os.path.isfile(os.path.join(path, "__init__.py")): # Ini adalah direktori paket, tambahkan self.writepy(path, basename, filterfunc=filterfunc) # Panggilan rekursif elif ext == ".py": if filterfunc and not filterfunc(path): if self.debug: print('file %r dilewati oleh filterfunc' % path) continue fname, arcname = self._get_codename(path[0:-3], basename) if self.debug: print("Menambahkan", arcname) self.write(fname, arcname) else: # Ini BUKAN direktori paket, tambahkan file-file di level atas if self.debug: print("Menambahkan file dari direktori", pathname) for filename in os.listdir(pathname): path = os.path.join(pathname, filename) root, ext = os.path.splitext(filename) if ext == ".py": if filterfunc and not filterfunc(path): if self.debug: print('file %r dilewati oleh filterfunc' % path) continue fname, arcname = self._get_codename(path[0:-3], basename) if self.debug: print("Menambahkan", arcname) self.write(fname, arcname) else: if pathname[-3:] != ".py": raise RuntimeError( 'File yang ditambahkan dengan writepy() harus diakhiri dengan ".py"') fname, arcname = self._get_codename(pathname[0:-3], basename) if self.debug: print("Menambahkan file", arcname) self.write(fname, arcname) def _get_codename(self, pathname, basename): """Kembalikan (filename, archivename) untuk jalur. Diberikan nama modul path, kembalikan jalur file yang benar dan nama arsip, kompilasi jika perlu. Sebagai contoh, diberikan /python/lib/string, kembalikan (/python/lib/string.pyc, string). """ def _compile(file, optimize=-1): import py_compile if self.debug: print("Mengompilasi", file) try: py_compile.compile(file, doraise=True, optimize=optimize) except py_compile.PyCompileError as err: print(err.msg) return False return True file_py = pathname + ".py" file_pyc = pathname + ".pyc" pycache_opt0 = importlib.util.cache_from_source( file_py, optimization='') pycache_opt1 = importlib.util.cache_from_source( file_py, optimization=1) pycache_opt2 = importlib.util.cache_from_source( file_py, optimization=2) if self._optimize == -1: # mode warisan: gunakan file apa pun yang ada if (os.path.isfile(file_pyc) and os.stat(file_pyc).st_mtime >= os.stat(file_py).st_mtime): # Gunakan file .pyc. arcname = fname = file_pyc elif (os.path.isfile(pycache_opt0) and os.stat(pycache_opt0).st_mtime >= os.stat(file_py).st_mtime): # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc warisan # dalam arsip. fname = pycache_opt0 arcname = file_pyc elif (os.path.isfile(pycache_opt1) and os.stat(pycache_opt1).st_mtime >= os.stat(file_py).st_mtime): # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc warisan # dalam arsip. fname = pycache_opt1 arcname = file_pyc elif (os.path.isfile(pycache_opt2) and os.stat(pycache_opt2).st_mtime >= os.stat(file_py).st_mtime): # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc warisan # dalam arsip. fname = pycache_opt2 arcname = file_pyc else: # Kompilasi py menjadi file PEP 3147 pyc. if _compile(file_py): if sys.flags.optimize == 0: fname = pycache_opt0 elif sys.flags.optimize == 1: fname = pycache_opt1 else: fname = pycache_opt2 arcname = file_pyc else: fname = arcname = file_py else: # mode baru: gunakan tingkat optimasi yang diberikan if self._optimize == 0: fname = pycache_opt0 arcname = file_pyc else: arcname = file_pyc if self._optimize == 1: fname = pycache_opt1 elif self._optimize == 2: fname = pycache_opt2 else: msg = "nilai tidak valid untuk 'optimize': {!r}".format( self._optimize) raise ValueError(msg) if not (os.path.isfile(fname) and os.stat(fname).st_mtime >= os.stat(file_py).st_mtime): if not _compile(file_py, optimize=self._optimize): fname = arcname = file_py archivename = os.path.split(arcname)[1] if basename: archivename = "%s/%s" % (basename, archivename) return (fname, archivename) def main(args=None): import argparse description = 'Antarmuka baris perintah sederhana untuk modul zipfile.' parser = argparse.ArgumentParser(description=description) group = parser.add_mutually_exclusive_group(required=True) group.add_argument('-l', '--list', metavar='', help='Tampilkan daftar zipfile') group.add_argument('-e', '--extract', nargs=2, metavar=('', ''), help='Ekstrak zipfile ke direktori target') group.add_argument('-c', '--create', nargs='+', metavar=('', ''), help='Buat zipfile dari sumber') group.add_argument('-t', '--test', metavar='', help='Tes apakah zipfile valid') args = parser.parse_args(args) if args.test is not None: src = args.test with ZipFile(src, 'r') as zf: badfile = zf.testzip() if badfile: print( "File terlampir berikut rusak: {!r}".format(badfile)) print("Pengujian selesai") elif args.list is not None: src = args.list with ZipFile(src, 'r') as zf: zf.printdir() elif args.extract is not None: src, curdir = args.extract with ZipFile(src, 'r') as zf: zf.extractall(curdir) elif args.create is not None: zip_name = args.create.pop(0) files = args.create def addToZip(zf, path, zippath): if os.path.isfile(path): zf.write(path, zippath, ZIP_DEFLATED) elif os.path.isdir(path): if zippath: zf.write(path, zippath) for nm in os.listdir(path): addToZip(zf, os.path.join(path, nm), os.path.join(zippath, nm)) # jika tidak: abaikan with ZipFile(zip_name, 'w') as zf: for path in files: zippath = os.path.basename(path) if not zippath: zippath = os.path.basename(os.path.dirname(path)) if zippath in ('', os.curdir, os.pardir): zippath = '' addToZip(zf, path, zippath) if __name__ == "__main__": main()
Langkah 2: Buat pemicu OSS
Pada halaman Detail Fungsi, pilih tab Configurations, lalu pilih tab Triggers dari panel navigasi sisi kiri.
Pada halaman Pemicu, klik Create Trigger. Di panel Buat Pemicu, pilih tipe pemicu OSS, konfigurasikan pengaturan sesuai kebutuhan, lalu klik OK.
Berikut adalah konfigurasi beberapa item kunci. Untuk rincian item lainnya, lihat Mengonfigurasi Pemicu OSS Native.
Bucket Name: Pilih nama bucket yang telah dibuat.
Object Prefix: Dalam contoh ini, gunakan
src.Object Suffix: Dalam contoh ini, gunakan
zip.Trigger Event: Dalam contoh ini, gunakan
oss:ObjectCreated:PutObject, oss:ObjectCreated:PostObject, oss:ObjectCreated:CompleteMultipartUpload, oss:ObjectCreated:PutSymlink.Role Name: Pilih peran dan pastikan memiliki izin yang diperlukan untuk memanggil fungsi. Lampirkan kebijakan AliyunFCFullAccess untuk memberikan akses penuh.
Setelah pemicu dibuat, Anda dapat melihatnya di halaman Pemicu.
Langkah 3: Uji dan verifikasi
Anda dapat menguji konfigurasi Anda dengan dua cara:
Metode 1: Unggah file secara manual melalui konsol OSS
Masuk ke Konsol Layanan Penyimpanan Objek. Unggah file ZIP seperti code.zip ke direktori src bucket yang dipilih di Langkah 2: Buat Pemicu OSS. Setelah unggahan selesai, fungsi akan secara otomatis dipicu untuk mendekompresi file ZIP ke direktori root bucket.
Metode 2: Konfigurasikan parameter acara pemicu
Perhatikan bahwa dengan metode ini, Anda perlu memicu fungsi secara manual dengan mengklik Test Function setelah konfigurasi. Kemudian, masuk ke Konsol Object Storage Service untuk memeriksa daftar file di bucket target guna memverifikasi keberhasilan dekompresi file ZIP. Prosedurnya adalah sebagai berikut.
Pada halaman Detail Fungsi, klik tab Code, lalu klik ikon
di sebelah Test Function. Dari daftar drop-down, pilih Configure Test Parameters.<Di panel Configure Test Parameters, pilih Event Template, masukkan Event Name dan konten acara Anda, lalu klik OK.
Contoh acara adalah sebagai berikut.
{ "events": [ { "eventName": "ObjectCreated:PutObject", "eventSource": "acs:oss", "eventTime": "2023-08-13T06:45:43.000Z", "eventVersion": "1.0", "oss": { "bucket": { "arn": "acs:oss:cn-hangzhou:10343546824****:bucket****", "name": "bucket****", "ownerIdentity": "10343546824****" }, "object": { "deltaSize": 122539, "eTag": "688A7BF4F233DC9C88A80BF985AB****", "key": "src/test.zip", "size": 122539 }, "ossSchemaVersion": "1.0", "ruleId": "9adac8e253828f4f7c0466d941fa3db81161****" }, "region": "cn-hangzhou", "requestParameters": { "sourceIPAddress": "140.205.XX.XX" }, "responseElements": { "requestId": "58F9FF2D3DF792092E12044C" }, "userIdentity": { "principalId": "10343546824****" } } ] }Untuk detail tentang parameter objek acara, lihat Langkah 2: (Opsional) Konfigurasikan Parameter Input.
PentingKonten acara di atas diberikan sebagai contoh. Sesuaikan parameter sesuai kebutuhan. Pastikan file yang ditentukan (file
src/test.zipdalam contoh ini) ada di bucket yang ditentukan; jika tidak, fungsi tidak akan dipicu, atau eksekusinya akan gagal.Parameter yang harus dimodifikasi sesuai dengan lingkungan aktual Anda adalah sebagai berikut:
bucket.arn: Gunakan formatacs:oss:<region>:<your_account_id>:<your_bucket>untuk parameter ini. Ganti<region>dengan wilayah yang dipilih saat membuat fungsi,<your_account_id>dengan ID akun Alibaba Cloud Anda, dan<your_bucket>dengan nama sebenarnya dari bucket yang telah dibuat di wilayah tersebut. Anda dapat menemukan ID akun Alibaba Cloud Anda di bagian Referensi pada halaman Overview dari Konsol Function Compute.bucket.name: Ganti dengan nama sebenarnya dari bucket yang telah dibuat di wilayah yang sama dengan fungsi Anda.bucket.ownerIdentity: Ganti dengan ID akun Alibaba Cloud Anda.object.key: Ganti dengan nama file yang diunggah ke bucket tersebut.region: Ganti dengan wilayah yang dipilih saat membuat fungsi.userIdentity.principalId: Ganti dengan ID akun Alibaba Cloud Anda.
Klik Test Function di tab Code untuk menjalankan tes.
Setelah fungsi berhasil dieksekusi, masuk ke Konsol Layanan Penyimpanan Objek dan periksa apakah file ZIP target (file
src/test.zipdalam contoh ini) di bucket yang ditentukan telah didekompresi. Untuk langkah-langkah rinci, lihat Gunakan Konsol OSS untuk Menanyakan Objek.
Setelah pengujian, jika Anda tidak memerlukan aplikasi ini untuk saat ini, pastikan untuk menghapus aplikasi dan sumber daya terkait apa pun untuk menghindari biaya yang tidak perlu.
Referensi
Untuk mempelajari lebih lanjut tentang apa yang dapat Anda lakukan dengan pemicu OSS, lihat Ikhtisar Pemicu OSS.
Untuk menggunakan Function Compute untuk mengemas dan mengunduh file dari OSS, lihat Gunakan Function Compute untuk Mengunduh Beberapa Objek sebagai Paket.
Untuk informasi lebih lanjut tentang operasi OSS yang disebutkan dalam dokumen ini, lihat Bucket dan Objek.