Anda dapat menggunakan Function Compute untuk secara otomatis mendekompresi file terkompresi yang Anda unggah ke OSS. Ketika file ZIP yang sesuai dengan aturan dekompresi tertentu diunggah ke Object Storage Service, Function Compute akan secara otomatis dipicu untuk mendekompresi file tersebut. Setelah proses dekompresi selesai, Function Compute mengunggah file hasil dekompresi ke direktori yang ditentukan di Object Storage Service.
Penggunaan
Gunakan pengkodean UTF-8 atau GB 2312 untuk nama file dan folder guna mencegah kerusakan karakter atau kegagalan dekompresi.
File jenis Archive atau Cold Archive harus dipulihkan terlebih dahulu sebelum didekompresi.
Waktu maksimum dekompresi untuk satu paket ZIP adalah 2 jam. Tugas yang melebihi batas ini akan gagal.
Untuk mencegah kegagalan dekompresi, pastikan setiap file dalam paket ZIP tidak melebihi 1 GB.
Atur timeout fungsi lebih dari 2 jam, hingga maksimal 24 jam.
Buat fungsi di wilayah yang sama dengan bucket Object Storage Service.
Prasyarat
Prosedur
Langkah 1: Buat fungsi
Masuk ke Konsol Function Compute, lalu pada panel navigasi kiri, pilih Function Management > Functions.
Pada bilah navigasi atas, pilih wilayah. Di halaman Functions, klik Create Function.
Di halaman Create Function, pilih metode pembuatan, konfigurasikan pengaturan berikut, lalu klik Create.
Bagian ini menjelaskan pengaturan utama. Untuk pengaturan lainnya, lihat Buat fungsi.
Runtime: Pilih Python 3.10.
Function Role: Pilih peran yang sudah ada atau buat peran baru. Pastikan peran tersebut memiliki izin untuk mengakses bucket OSS dengan melampirkan kebijakan AliyunOSSFullAccess.
Di halaman Detail Fungsi, pilih tab Code, buat file, masukkan kode Anda, lalu klik Deploy.
Berikut contoh kodenya.
index.pyfileKode contoh
# -*- coding: utf-8 -*- """ CATATAN: Fungsi ini menangani pengkodean karakter untuk nama file dan folder dalam arsip ZIP. Pengkodean bervariasi tergantung sistem operasi yang digunakan untuk membuat arsip. - Untuk arsip yang dibuat di macOS atau Linux, nama file biasanya dikodekan dalam UTF-8. - Untuk arsip yang dibuat di Windows, nama file sering dikodekan dalam GB2312 tetapi juga bisa UTF-8. Untuk menangani variasi ini, fungsi ini menggunakan pustaka 'chardet' untuk mendeteksi pengkodean secara otomatis. Namun, deteksi tidak dijamin akurat 100%. Jika Anda mengalami masalah dengan nama file yang rusak setelah dekompresi, Anda mungkin perlu memodifikasi logika pengkodean dalam fungsi ini dan menerapkannya ulang. """ import helper import oss2 import json import os import time import logging import chardet """ Fungsi ini dijalankan ketika file .zip diunggah ke awalan sumber yang ditentukan dalam bucket OSS. Fungsi ini mendekompresi file tersebut dan mengunggah isinya ke awalan tujuan. Misalnya, file yang diunggah sebagai `source/archive.zip` akan didekompresi ke `processed/archive/`. Awalan sumber dan tujuan dapat dikonfigurasi menggunakan variabel lingkungan. """ # Menekan log info yang dicetak oleh SDK OSS. logging.getLogger("oss2.api").setLevel(logging.ERROR) logging.getLogger("oss2.auth").setLevel(logging.ERROR) LOGGER = logging.getLogger() # Mencetak waktu eksekusi fungsi. def print_excute_time(func): def wrapper(*args, **kwargs): local_time = time.time() ret = func(*args, **kwargs) LOGGER.info('current Function [%s] excute time is %.2f' % (func.__name__, time.time() - local_time)) return ret return wrapper def get_zipfile_name(origin_name): # Menangani kemungkinan teks rusak pada nama file, terutama untuk pengkodean non-UTF-8. name = origin_name try: name_bytes = origin_name.encode(encoding="cp437") except: name_bytes = origin_name.encode(encoding="utf-8") # Deteksi lebih akurat untuk string yang lebih panjang. detect = chardet.detect(name_bytes) confidence = detect["confidence"] detect_encoding = detect["encoding"] if confidence > 0.75 and (detect_encoding.lower() in ["gb2312", "gbk", "gb18030", "ascii", "utf-8"]): try: if detect_encoding.lower() in ["gb2312", "gbk", "gb18030"]: detect_encoding = "gb18030" name = name_bytes.decode(detect_encoding) except: name = name_bytes.decode(encoding="gb18030") else: try: name = name_bytes.decode(encoding="gb18030") except: name = name_bytes.decode(encoding="utf-8") # Mengganti backslash gaya Windows dengan garis miring maju untuk konsistensi path. name = name.replace("\\", "/") return name @print_excute_time def handler(event, context): """ Fungsi penanganan utama yang dieksekusi oleh Function Compute. Fungsi ini secara otomatis mendekompresi objek .zip dari bucket OSS. :param event: String JSON yang merepresentasikan event OSS, termasuk kunci objek dan metadata lainnya. :param context: Konteks fungsi, yang menyediakan informasi kredensial dan runtime. """ evt_lst = json.loads(event) creds = context.credentials auth = oss2.StsAuth( creds.access_key_id, creds.access_key_secret, creds.security_token) evt = evt_lst['events'][0] bucket_name = evt['oss']['bucket']['name'] endpoint = 'oss-' + evt['region'] + '-internal.aliyuncs.com' bucket = oss2.Bucket(auth, endpoint, bucket_name) object_name = evt['oss']['object']['key'] if "ObjectCreated:PutSymlink" == evt['eventName']: object_name = bucket.get_symlink(object_name).target_key if not object_name: raise RuntimeError('{} is an invalid symlink file'.format( evt['oss']['object']['key'])) file_type = os.path.splitext(object_name)[1] if file_type != ".zip": raise RuntimeError('{} is not a .zip file'.format(object_name)) LOGGER.info("Starting to decompress zip file: {}".format(object_name)) lst = object_name.split("/") zip_name = lst[-1] PROCESSED_DIR = os.environ.get("PROCESSED_DIR", "") RETAIN_FILE_NAME = os.environ.get("RETAIN_FILE_NAME", "") if PROCESSED_DIR and PROCESSED_DIR[-1] != "/": PROCESSED_DIR += "/" if RETAIN_FILE_NAME == "false": newKey = PROCESSED_DIR else: newKey = PROCESSED_DIR + zip_name zip_fp = helper.OssStreamFileLikeObject(bucket, object_name) newKey = newKey.replace(".zip", "/") with helper.zipfile_support_oss.ZipFile(zip_fp) as zip_file: for name in zip_file.namelist(): with zip_file.open(name) as file_obj: name = get_zipfile_name(name) bucket.put_object(newKey + name, file_obj)helper.pyfileKode contoh
# -*- coding: utf-8 -*- import oss2 from oss2 import utils, models import ossZipfile as zipfile zipfile_support_oss = zipfile # Mendukung pengunggahan ke OSS sebagai objek mirip file. def make_crc_adapter(data, init_crc=0): data = utils.to_bytes(data) # objek mirip file if hasattr(data, 'read'): return utils._FileLikeAdapter(data, crc_callback=utils.Crc64(init_crc)) utils.make_crc_adapter = make_crc_adapter class OssStreamFileLikeObject(object): def __init__(self, bucket, key): super(OssStreamFileLikeObject, self).__init__() self._bucket = bucket self._key = key self._meta_data = self._bucket.get_object_meta(self._key) @property def bucket(self): return self._bucket @property def key(self): return self._key @property def filesize(self): return self._meta_data.content_length def get_reader(self, begin, end): begin = begin if begin >= 0 else 0 end = end if end > 0 else self.filesize - 1 end = end if end < self.filesize else self.filesize - 1 begin = begin if begin < end else end return self._bucket.get_object(self._key, byte_range=(begin, end)) def get_content_bytes(self, begin, end): reader = self.get_reader(begin, end) return reader.read() def get_last_content_bytes(self, offset): return self.get_content_bytes(self.filesize-offset, self.filesize-1)ossZipfile.pyKode contoh
""" Baca dan tulis file ZIP. Referensi XXX ke UTF-8 perlu investigasi lebih lanjut. """ import io import os import importlib.util import sys import time import stat import shutil import struct import binascii import threading try: import zlib # Kami mungkin memerlukan metode kompresinya crc32 = zlib.crc32 except ImportError: zlib = None crc32 = binascii.crc32 try: import bz2 # Kami mungkin memerlukan metode kompresinya except ImportError: bz2 = None try: import lzma # Kami mungkin memerlukan metode kompresinya except ImportError: lzma = None __all__ = ["BadZipFile", "BadZipfile", "error", "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA", "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"] class BadZipFile(Exception): pass class LargeZipFile(Exception): """ Dilempar saat menulis file ZIP yang memerlukan ekstensi ZIP64, tetapi ekstensi tersebut dinonaktifkan. """ error = BadZipfile = BadZipFile # Nama kompatibilitas pra-3.2 ZIP64_LIMIT = (1 << 31) - 1 ZIP_FILECOUNT_LIMIT = (1 << 16) - 1 ZIP_MAX_COMMENT = (1 << 16) - 1 # Konstanta untuk metode kompresi file ZIP ZIP_STORED = 0 ZIP_DEFLATED = 8 ZIP_BZIP2 = 12 ZIP_LZMA = 14 # Metode kompresi ZIP lainnya tidak didukung DEFAULT_VERSION = 20 ZIP64_VERSION = 45 BZIP2_VERSION = 46 LZMA_VERSION = 63 # Kami mengenali (tetapi belum tentu mendukung) semua fitur hingga versi tersebut MAX_EXTRACT_VERSION = 63 # Di bawah ini adalah beberapa format dan data terkait untuk membaca/menulis header menggunakan # modul struct. Nama dan struktur header/catatan adalah yang digunakan # dalam deskripsi format file ZIP PKWARE: # http://www.pkware.com/documents/casestudies/APPNOTE.TXT # (URL valid per Januari 2008) # Struktur "akhir direktori pusat", nomor ajaib, ukuran, dan indeks # (bagian V.I dalam dokumen format) structEndArchive = b"<4s4H2LH" stringEndArchive = b"PK\005\006" sizeEndCentDir = struct.calcsize(structEndArchive) _ECD_SIGNATURE = 0 _ECD_DISK_NUMBER = 1 _ECD_DISK_START = 2 _ECD_ENTRIES_THIS_DISK = 3 _ECD_ENTRIES_TOTAL = 4 _ECD_SIZE = 5 _ECD_OFFSET = 6 _ECD_COMMENT_SIZE = 7 # Dua indeks terakhir ini bukan bagian dari struktur seperti yang didefinisikan dalam # spesifikasi, tetapi digunakan secara internal oleh modul ini untuk kenyamanan _ECD_COMMENT = 8 _ECD_LOCATION = 9 # Struktur "direktori pusat", nomor ajaib, ukuran, dan indeks # entri dalam struktur (bagian V.F dalam dokumen format) structCentralDir = "<4s4B4HL2L5H2L" stringCentralDir = b"PK\001\002" sizeCentralDir = struct.calcsize(structCentralDir) # Indeks entri dalam struktur direktori pusat _CD_SIGNATURE = 0 _CD_CREATE_VERSION = 1 _CD_CREATE_SYSTEM = 2 _CD_EXTRACT_VERSION = 3 _CD_EXTRACT_SYSTEM = 4 _CD_FLAG_BITS = 5 _CD_COMPRESS_TYPE = 6 _CD_TIME = 7 _CD_DATE = 8 _CD_CRC = 9 _CD_COMPRESSED_SIZE = 10 _CD_UNCOMPRESSED_SIZE = 11 _CD_FILENAME_LENGTH = 12 _CD_EXTRA_FIELD_LENGTH = 13 _CD_COMMENT_LENGTH = 14 _CD_DISK_NUMBER_START = 15 _CD_INTERNAL_FILE_ATTRIBUTES = 16 _CD_EXTERNAL_FILE_ATTRIBUTES = 17 _CD_LOCAL_HEADER_OFFSET = 18 # Struktur "header file lokal", nomor ajaib, ukuran, dan indeks # (bagian V.A dalam dokumen format) structFileHeader = "<4s2B4HL2L2H" stringFileHeader = b"PK\003\004" sizeFileHeader = struct.calcsize(structFileHeader) _FH_SIGNATURE = 0 _FH_EXTRACT_VERSION = 1 _FH_EXTRACT_SYSTEM = 2 _FH_GENERAL_PURPOSE_FLAG_BITS = 3 _FH_COMPRESSION_METHOD = 4 _FH_LAST_MOD_TIME = 5 _FH_LAST_MOD_DATE = 6 _FH_CRC = 7 _FH_COMPRESSED_SIZE = 8 _FH_UNCOMPRESSED_SIZE = 9 _FH_FILENAME_LENGTH = 10 _FH_EXTRA_FIELD_LENGTH = 11 # Struktur "lokator akhir direktori pusat Zip64", nomor ajaib, dan ukuran structEndArchive64Locator = "<4sLQL" stringEndArchive64Locator = b"PK\x06\x07" sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator) # Catatan "akhir direktori pusat Zip64", nomor ajaib, ukuran, dan indeks # (bagian V.G dalam dokumen format) structEndArchive64 = "<4sQ2H2L4Q" stringEndArchive64 = b"PK\x06\x06" sizeEndCentDir64 = struct.calcsize(structEndArchive64) _CD64_SIGNATURE = 0 _CD64_DIRECTORY_RECSIZE = 1 _CD64_CREATE_VERSION = 2 _CD64_EXTRACT_VERSION = 3 _CD64_DISK_NUMBER = 4 _CD64_DISK_NUMBER_START = 5 _CD64_NUMBER_ENTRIES_THIS_DISK = 6 _CD64_NUMBER_ENTRIES_TOTAL = 7 _CD64_DIRECTORY_SIZE = 8 _CD64_OFFSET_START_CENTDIR = 9 def _check_zipfile(fp): try: if _EndRecData(fp): return True # file memiliki nomor ajaib yang benar except OSError: pass return False def is_zipfile(filename): """Cepat menentukan apakah suatu file adalah file ZIP dengan memeriksa nomor ajaib. Argumen filename bisa berupa file atau objek mirip file. """ result = False try: if hasattr(filename, "read"): result = _check_zipfile(fp=filename) else: with open(filename, "rb") as fp: result = _check_zipfile(fp) except OSError: pass return result def _EndRecData64(fpin, offset, endrec): """ Baca catatan akhir arsip ZIP64 dan gunakan untuk memperbarui endrec. """ if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes( fpin.filesize+offset-sizeEndCentDir64Locator, fpin.filesize+offset-1) else: try: fpin.seek(offset - sizeEndCentDir64Locator, 2) except OSError: # Jika pencarian gagal, file tidak cukup besar untuk berisi catatan # akhir arsip ZIP64, jadi cukup kembalikan catatan akhir yang diberikan. return endrec data = fpin.read(sizeEndCentDir64Locator) if len(data) != sizeEndCentDir64Locator: return endrec sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data) if sig != stringEndArchive64Locator: return endrec if diskno != 0 or disks != 1: raise BadZipFile("File ZIP yang mencakup beberapa disk tidak didukung") # Asumsikan tidak ada 'data ekstensi zip64' if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes( fpin.filesize + offset - sizeEndCentDir64Locator - sizeEndCentDir64, fpin.filesize+offset-sizeEndCentDir64Locator-1) else: fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2) data = fpin.read(sizeEndCentDir64) if len(data) != sizeEndCentDir64: return endrec sig, sz, create_version, read_version, disk_num, disk_dir, \ dircount, dircount2, dirsize, diroffset = \ struct.unpack(structEndArchive64, data) if sig != stringEndArchive64: return endrec # Perbarui endrec asli menggunakan data dari catatan ZIP64 endrec[_ECD_SIGNATURE] = sig endrec[_ECD_DISK_NUMBER] = disk_num endrec[_ECD_DISK_START] = disk_dir endrec[_ECD_ENTRIES_THIS_DISK] = dircount endrec[_ECD_ENTRIES_TOTAL] = dircount2 endrec[_ECD_SIZE] = dirsize endrec[_ECD_OFFSET] = diroffset return endrec def _EndRecData(fpin): """Mengembalikan data dari catatan "Akhir Direktori Pusat", atau None. Datanya adalah daftar sembilan item dalam catatan "Akhir Direktori Pusat" ZIP diikuti oleh item kesepuluh, yaitu offset pencarian file dari catatan ini.""" # Tentukan ukuran file if hasattr(fpin, 'bucket'): filesize = fpin.filesize data = fpin.get_last_content_bytes(sizeEndCentDir) else: fpin.seek(0, 2) filesize = fpin.tell() # Periksa apakah ini file ZIP tanpa komentar arsip (struktur # "akhir direktori pusat" harus menjadi item terakhir dalam # file jika demikian). try: fpin.seek(-sizeEndCentDir, 2) except OSError: return None data = fpin.read() if (len(data) == sizeEndCentDir and data[0:4] == stringEndArchive and data[-2:] == b"\000\000"): # Tanda tangan benar dan tidak ada komentar; uraikan strukturnya endrec = struct.unpack(structEndArchive, data) endrec = list(endrec) # Tambahkan komentar kosong dan offset awal catatan endrec.append(b"") endrec.append(filesize - sizeEndCentDir) # Coba baca struktur "akhir direktori pusat Zip64" return _EndRecData64(fpin, -sizeEndCentDir, endrec) # Ini bukan file ZIP, atau file ZIP dengan komentar arsip. # Cari akhir file untuk tanda tangan catatan "akhir direktori pusat". # Komentar adalah item terakhir dalam file ZIP dan bisa mencapai 64K. # Diasumsikan bahwa nomor ajaib "akhir direktori pusat" tidak muncul dalam komentar. maxCommentStart = max(filesize - (1 << 16) - sizeEndCentDir, 0) if hasattr(fpin, 'bucket'): data = fpin.get_content_bytes(maxCommentStart, -1) else: fpin.seek(maxCommentStart, 0) data = fpin.read() start = data.rfind(stringEndArchive) if start >= 0: # Ditemukan nomor ajaib; coba uraikan dan interpretasikan recData = data[start:start+sizeEndCentDir] if len(recData) != sizeEndCentDir: # File ZIP rusak. return None endrec = list(struct.unpack(structEndArchive, recData)) commentSize = endrec[_ECD_COMMENT_SIZE] # seperti yang diklaim oleh file ZIP comment = data[start+sizeEndCentDir:start+sizeEndCentDir+commentSize] endrec.append(comment) endrec.append(maxCommentStart + start) # Coba baca struktur "akhir direktori pusat Zip64" return _EndRecData64(fpin, maxCommentStart + start - filesize, endrec) # Tidak dapat menemukan struktur akhir direktori pusat yang valid return None class ZipInfo (object): """Kelas dengan atribut yang menggambarkan setiap file dalam arsip ZIP.""" __slots__ = ( 'orig_filename', 'filename', 'date_time', 'compress_type', 'comment', 'extra', 'create_system', 'create_version', 'extract_version', 'reserved', 'flag_bits', 'volume', 'internal_attr', 'external_attr', 'header_offset', 'CRC', 'compress_size', 'file_size', '_raw_time', ) def __init__(self, filename="NoName", date_time=(1980, 1, 1, 0, 0, 0)): self.orig_filename = filename # Nama file asli dalam arsip # Hentikan nama file pada byte null pertama. Byte null dalam nama file # digunakan sebagai trik oleh virus dalam arsip. null_byte = filename.find(chr(0)) if null_byte >= 0: filename = filename[0:null_byte] # Ini digunakan untuk memastikan path dalam file ZIP yang dihasilkan selalu menggunakan # garis miring maju sebagai pemisah direktori, seperti yang disyaratkan oleh # spesifikasi format ZIP. if os.sep != "/" and os.sep in filename: filename = filename.replace(os.sep, "/") self.filename = filename # Nama file ternormalisasi self.date_time = date_time # tahun, bulan, hari, jam, menit, detik if date_time[0] < 1980: raise ValueError('ZIP tidak mendukung stempel waktu sebelum 1980') # Nilai standar: self.compress_type = ZIP_STORED # Jenis kompresi untuk file self.comment = b"" # Komentar untuk setiap file self.extra = b"" # Data tambahan ZIP if sys.platform == 'win32': self.create_system = 0 # Sistem yang membuat arsip ZIP else: # Asumsikan semuanya adalah unix-y self.create_system = 3 # Sistem yang membuat arsip ZIP self.create_version = DEFAULT_VERSION # Versi yang membuat arsip ZIP self.extract_version = DEFAULT_VERSION # Versi yang dibutuhkan untuk mengekstrak arsip self.reserved = 0 # Harus nol self.flag_bits = 0 # Bit flag ZIP self.volume = 0 # Nomor volume header file self.internal_attr = 0 # Atribut internal self.external_attr = 0 # Atribut file eksternal # Atribut lainnya diatur oleh kelas ZipFile: # header_offset Offset byte ke header file # CRC CRC-32 dari file yang tidak terkompresi # compress_size Ukuran file terkompresi # file_size Ukuran file yang tidak terkompresi def __repr__(self): result = ['<%s filename=%r' % (self.__class__.__name__, self.filename)] if self.compress_type != ZIP_STORED: result.append(' compress_type=%s' % compressor_names.get(self.compress_type, self.compress_type)) hi = self.external_attr >> 16 lo = self.external_attr & 0xFFFF if hi: result.append(' filemode=%r' % stat.filemode(hi)) if lo: result.append(' external_attr=%#x' % lo) isdir = self.is_dir() if not isdir or self.file_size: result.append(' file_size=%r' % self.file_size) if ((not isdir or self.compress_size) and (self.compress_type != ZIP_STORED or self.file_size != self.compress_size)): result.append(' compress_size=%r' % self.compress_size) result.append('>') return ''.join(result) def FileHeader(self, zip64=None): """Mengembalikan header per-file sebagai string.""" dt = self.date_time dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) if self.flag_bits & 0x08: # Atur ini ke nol karena kami menulisnya setelah data file CRC = compress_size = file_size = 0 else: CRC = self.CRC compress_size = self.compress_size file_size = self.file_size extra = self.extra min_version = 0 if zip64 is None: zip64 = file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT if zip64: fmt = '<hhqq'> ZIP64_LIMIT or compress_size > ZIP64_LIMIT: if not zip64: raise LargeZipFile("Ukuran file memerlukan ekstensi ZIP64") # File lebih besar dari yang muat dalam integer 4-byte; # kembali ke ekstensi ZIP64. file_size = 0xffffffff compress_size = 0xffffffff min_version = ZIP64_VERSION if self.compress_type == ZIP_BZIP2: min_version = max(BZIP2_VERSION, min_version) elif self.compress_type == ZIP_LZMA: min_version = max(LZMA_VERSION, min_version) self.extract_version = max(min_version, self.extract_version) self.create_version = max(min_version, self.create_version) filename, flag_bits = self._encodeFilenameFlags() header = struct.pack(structFileHeader, stringFileHeader, self.extract_version, self.reserved, flag_bits, self.compress_type, dostime, dosdate, CRC, compress_size, file_size, len(filename), len(extra)) return header + filename + extra def _encodeFilenameFlags(self): try: return self.filename.encode('ascii'), self.flag_bits except UnicodeEncodeError: return self.filename.encode('utf-8'), self.flag_bits | 0x800 def _decodeExtra(self): # Coba uraikan bidang tambahan. extra = self.extra unpack = struct.unpack while len(extra) >= 4: tp, ln = unpack('<hh',> len(extra): raise BadZipFile( "Bidang tambahan %04x rusak (ukuran=%d)" % (tp, ln)) if tp == 0x0001: if ln >= 24: counts = unpack('<qqq',>> 1) ^ 0xEDB88320 else: crc >>= 1 return crc # ZIP mendukung bentuk enkripsi berbasis kata sandi. Meskipun serangan plaintext yang diketahui # telah ditemukan terhadapnya, masih berguna untuk dapat mengambil data dari file tersebut. # # Penggunaan: # zd = _ZipDecrypter(mypwd) # plain_bytes = zd(cipher_bytes) def _ZipDecrypter(pwd): key0 = 305419896 key1 = 591751049 key2 = 878082192 global _crctable if _crctable is None: _crctable = list(map(_gen_crc, range(256))) crctable = _crctable def crc32(ch, crc): """Hitung primitif CRC32 pada satu byte.""" return (crc >> 8) ^ crctable[(crc ^ ch) & 0xFF] def update_keys(c): nonlocal key0, key1, key2 key0 = crc32(c, key0) key1 = (key1 + (key0 & 0xFF)) & 0xFFFFFFFF key1 = (key1 * 134775813 + 1) & 0xFFFFFFFF key2 = crc32(key1 >> 24, key2) for p in pwd: update_keys(p) def decrypter(data): """Dekripsi objek bytes.""" result = bytearray() append = result.append for c in data: k = key2 | 2 c ^= ((k * (k ^ 1)) >> 8) & 0xFF update_keys(c) append(c) return bytes(result) return decrypter class LZMACompressor: def __init__(self): self._comp = None def _init(self): props = lzma._encode_filter_properties({'id': lzma.FILTER_LZMA1}) self._comp = lzma.LZMACompressor(lzma.FORMAT_RAW, filters=[ lzma._decode_filter_properties(lzma.FILTER_LZMA1, props) ]) return struct.pack('<bbh',><< 31 - 1 # Baca dari file terkompresi dalam blok 4k. MIN_READ_SIZE = 4096 def __init__(self, fileobj, mode, zipinfo, decrypter=None, close_fileobj=False): self._fileobj = fileobj self._decrypter = decrypter self._close_fileobj = close_fileobj self._compress_type = zipinfo.compress_type self._compress_left = zipinfo.compress_size self._left = zipinfo.file_size self._decompressor = _get_decompressor(self._compress_type) self._eof = False self._readbuffer = b'' self._offset = 0 self.newlines = None # Sesuaikan ukuran baca untuk file terenkripsi, karena 12 byte pertama # digunakan untuk informasi enkripsi/kata sandi. if self._decrypter is not None: self._compress_left -= 12 self.mode = mode self.name = zipinfo.filename if hasattr(zipinfo, 'CRC'): self._expected_crc = zipinfo.CRC self._running_crc = crc32(b'') else: self._expected_crc = None def __repr__(self): result = ['<%s.%s' % (self.__class__.__module__, self.__class__.__qualname__)] if not self.closed: result.append(' name=%r mode=%r' % (self.name, self.mode)) if self._compress_type != ZIP_STORED: result.append(' compress_type=%s' % compressor_names.get(self._compress_type, self._compress_type)) else: result.append(' [closed]') result.append('>') return ''.join(result) def readline(self, limit=-1): """Baca dan kembalikan satu baris dari aliran. Jika limit ditentukan, paling banyak limit byte akan dibaca. """ if limit < 0: # Jalan pintas untuk kasus umum: baris baru ditemukan dalam buffer. i = self._readbuffer.find(b'\n', self._offset) + 1 if i > 0: line = self._readbuffer[self._offset: i] self._offset = i return line return io.BufferedIOBase.readline(self, limit) def peek(self, n=1): """Kembalikan byte yang dibuffer tanpa memajukan posisi.""" if n > len(self._readbuffer) - self._offset: chunk = self.read(n) if len(chunk) > self._offset: self._readbuffer = chunk + self._readbuffer[self._offset:] self._offset = 0 else: self._offset -= len(chunk) # Kembalikan hingga 512 byte untuk mengurangi overhead alokasi untuk loop ketat. return self._readbuffer[self._offset: self._offset + 512] def readable(self): return True def read(self, n=-1): """Baca dan kembalikan hingga n byte. Jika argumen dihilangkan, None, atau negatif, data dibaca dan dikembalikan hingga EOF tercapai. """ if n is None or n < 0: buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while not self._eof: buf += self._read1(self.MAX_N) return buf end = n + self._offset if end < len(self._readbuffer): buf = self._readbuffer[self._offset:end] self._offset = end return buf n = end - len(self._readbuffer) buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while n > 0 and not self._eof: data = self._read1(n) if n < len(data): self._readbuffer = data self._offset = n buf += data[:n] break buf += data n -= len(data) return buf def _update_crc(self, newdata): # Perbarui CRC menggunakan data yang diberikan. if self._expected_crc is None: # Tidak perlu menghitung CRC jika tidak memiliki nilai referensi. return self._running_crc = crc32(newdata, self._running_crc) # Periksa CRC jika kita berada di akhir file. if self._eof and self._running_crc != self._expected_crc: raise BadZipFile("CRC-32 buruk untuk file %r" % self.name) def read1(self, n): """Baca hingga n byte dengan paling banyak satu panggilan sistem read().""" if n is None or n < 0: buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 while not self._eof: data = self._read1(self.MAX_N) if data: buf += data break return buf end = n + self._offset if end < len(self._readbuffer): buf = self._readbuffer[self._offset:end] self._offset = end return buf n = end - len(self._readbuffer) buf = self._readbuffer[self._offset:] self._readbuffer = b'' self._offset = 0 if n > 0: while not self._eof: data = self._read1(n) if n < len(data): self._readbuffer = data self._offset = n buf += data[:n] break if data: buf += data break return buf def _read1(self, n): # Baca hingga n byte terkompresi dengan paling banyak satu panggilan sistem read(), # lalu dekripsi dan dekompresinya. if self._eof or n <= 0: return b'' # Baca dari file. if self._compress_type == ZIP_DEFLATED: # Tangani data yang belum dikonsumsi. data = self._decompressor.unconsumed_tail if n > len(data): data += self._read2(n - len(data)) else: data = self._read2(n) if self._compress_type == ZIP_STORED: self._eof = self._compress_left <= 0 elif self._compress_type == ZIP_DEFLATED: n = max(n, self.MIN_READ_SIZE) data = self._decompressor.decompress(data, n) self._eof = (self._decompressor.eof or self._compress_left <= 0 and not self._decompressor.unconsumed_tail) if self._eof: data += self._decompressor.flush() else: data = self._decompressor.decompress(data) self._eof = self._decompressor.eof or self._compress_left <= 0 data = data[:self._left] self._left -= len(data) if self._left <= 0: self._eof = True self._update_crc(data) return data def _read2(self, n): if self._compress_left <= 0: return b'' n = max(n, self.MIN_READ_SIZE) n = min(n, self._compress_left) data = self._fileobj.read(n) self._compress_left -= len(data) if not data: raise EOFError if self._decrypter is not None: data = self._decrypter(data) return data def close(self): try: if self._close_fileobj: self._fileobj.close() finally: super().close() class _ZipWriteFile(io.BufferedIOBase): def __init__(self, zf, zinfo, zip64): self._zinfo = zinfo self._zip64 = zip64 self._zipfile = zf self._compressor = _get_compressor(zinfo.compress_type) self._file_size = 0 self._compress_size = 0 self._crc = 0 @property def _fileobj(self): return self._zipfile.fp def writable(self): return True def write(self, data): if self.closed: raise ValueError('Operasi I/O pada file yang sudah ditutup.') nbytes = len(data) self._file_size += nbytes self._crc = crc32(data, self._crc) if self._compressor: data = self._compressor.compress(data) self._compress_size += len(data) self._fileobj.write(data) return nbytes def close(self): if self.closed: return super().close() # Kosongkan data apa pun dari kompresor dan perbarui info header. if self._compressor: buf = self._compressor.flush() self._compress_size += len(buf) self._fileobj.write(buf) self._zinfo.compress_size = self._compress_size else: self._zinfo.compress_size = self._file_size self._zinfo.CRC = self._crc self._zinfo.file_size = self._file_size # Tulis info header yang diperbarui. if self._zinfo.flag_bits & 0x08: # Tulis CRC dan ukuran file setelah data file fmt = '<lqq'> ZIP64_LIMIT: raise RuntimeError('Ukuran file melebihi batas ZIP64 ' 'secara tak terduga') if self._compress_size > ZIP64_LIMIT: raise RuntimeError('Ukuran terkompresi melebihi batas ' 'ZIP64 secara tak terduga') # Mundur dan tulis header file (yang sekarang akan mencakup # CRC dan ukuran file yang benar). # Simpan posisi saat ini dalam file. self._zipfile.start_dir = self._fileobj.tell() self._fileobj.seek(self._zinfo.header_offset) self._fileobj.write(self._zinfo.FileHeader(self._zip64)) self._fileobj.seek(self._zipfile.start_dir) self._zipfile._writing = False # Berhasil ditulis: Tambahkan file ke cache kami. self._zipfile.filelist.append(self._zinfo) self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo class ZipFile: """Kelas dengan metode untuk membuka, membaca, menulis, menutup, dan mendaftar file ZIP. z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=True) file: Baik path ke file, atau objek mirip file. Jika berupa path, file dibuka dan ditutup oleh ZipFile. mode: Bisa 'r' untuk membaca, 'w' untuk menulis, 'x' untuk pembuatan eksklusif, atau 'a' untuk menambahkan. compression: ZIP_STORED (tanpa kompresi), ZIP_DEFLATED (memerlukan zlib), ZIP_BZIP2 (memerlukan bz2), atau ZIP_LZMA (memerlukan lzma). allowZip64: Jika True, ZipFile membuat file dengan ekstensi ZIP64 saat diperlukan. Jika tidak, akan menimbulkan pengecualian jika ZIP64 diperlukan. """ fp = None # Diatur di sini karena __del__ memeriksanya _windows_illegal_name_trans_table = None def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True): """Buka file ZIP dalam mode baca ('r'), tulis ('w'), buat eksklusif ('x'), atau tambahkan ('a').""" if mode not in ('r', 'w', 'x', 'a'): raise ValueError("ZipFile memerlukan mode 'r', 'w', 'x', atau 'a'") _check_compression(compression) self._allowZip64 = allowZip64 self._didModify = False self.debug = 0 # Tingkat pencetakan: 0 hingga 3 self.NameToInfo = {} # Memetakan nama file ke instance ZipInfo self.filelist = [] # Daftar instance ZipInfo untuk arsip self.compression = compression # Metode kompresi default self.mode = mode self.pwd = None self._comment = b'' # Periksa apakah kami diberikan objek mirip file if isinstance(file, os.PathLike): file = os.fspath(file) if isinstance(file, str): # Ini adalah nama file self._filePassed = 0 self.filename = file modeDict = {'r': 'rb', 'w': 'w+b', 'x': 'x+b', 'a': 'r+b', 'r+b': 'w+b', 'w+b': 'wb', 'x+b': 'xb'} filemode = modeDict[mode] while True: try: self.fp = io.open(file, filemode) except OSError: if filemode in modeDict: filemode = modeDict[filemode] continue raise break else: self._filePassed = 1 self.fp = file self.filename = getattr(file, 'name', None) self._fileRefCnt = 1 self._lock = threading.RLock() self._seekable = True self._writing = False try: if mode == 'r': self._RealGetContents() elif mode in ('w', 'x'): # Atur flag dimodifikasi sehingga direktori pusat ditulis # bahkan jika tidak ada file yang ditambahkan ke arsip. self._didModify = True try: self.start_dir = self.fp.tell() except (AttributeError, OSError): self.fp = _Tellable(self.fp) self.start_dir = 0 self._seekable = False else: # Beberapa objek mirip file dapat memberikan tell() tetapi tidak seek(). try: self.fp.seek(self.start_dir) except (AttributeError, OSError): self._seekable = False elif mode == 'a': try: # Lihat apakah file tersebut adalah file ZIP. self._RealGetContents() # Arahkan ke awal direktori pusat dan timpa. self.fp.seek(self.start_dir) except BadZipFile: # File bukan file ZIP, cukup tambahkan ke dalamnya. self.fp.seek(0, 2) # Atur flag dimodifikasi sehingga direktori pusat ditulis # bahkan jika tidak ada file yang ditambahkan ke arsip. self._didModify = True self.start_dir = self.fp.tell() else: raise ValueError("Mode harus 'r', 'w', 'x', atau 'a'") except: fp = self.fp self.fp = None self._fpclose(fp) raise def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def __repr__(self): result = ['<%s.%s' % (self.__class__.__module__, self.__class__.__qualname__)] if self.fp is not None: if self._filePassed: result.append(' file=%r' % self.fp) elif self.filename is not None: result.append(' filename=%r' % self.filename) result.append(' mode=%r' % self.mode) else: result.append(' [closed]') result.append('>') return ''.join(result) def _RealGetContents(self): """Baca daftar isi dari file ZIP.""" fp = self.fp try: endrec = _EndRecData(fp) except OSError: raise BadZipFile("File bukan file ZIP") if not endrec: raise BadZipFile("File bukan file ZIP") if self.debug > 1: print(endrec) size_cd = endrec[_ECD_SIZE] # Byte dalam direktori pusat offset_cd = endrec[_ECD_OFFSET] # Offset direktori pusat self._comment = endrec[_ECD_COMMENT] # Komentar arsip # "concat" adalah nol, kecuali file ZIP digabungkan ke file lain. concat = endrec[_ECD_LOCATION] - size_cd - offset_cd if endrec[_ECD_SIGNATURE] == stringEndArchive64: # Jika struktur ekstensi ZIP64 ada, pertimbangkan itu. concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) if self.debug > 2: inferred = concat + offset_cd print("given, inferred, offset", offset_cd, inferred, concat) # self.start_dir: Posisi awal direktori pusat. self.start_dir = offset_cd + concat if hasattr(fp, "bucket"): data = fp.get_content_bytes( self.start_dir, self.start_dir+size_cd-1) else: fp.seek(self.start_dir, 0) data = fp.read(size_cd) fp = io.BytesIO(data) total = 0 while total < size_cd: centdir = fp.read(sizeCentralDir) if len(centdir) != sizeCentralDir: raise BadZipFile("Direktori pusat terpotong") centdir = struct.unpack(structCentralDir, centdir) if centdir[_CD_SIGNATURE] != stringCentralDir: raise BadZipFile("Nomor ajaib buruk untuk direktori pusat") if self.debug > 2: print(centdir) filename = fp.read(centdir[_CD_FILENAME_LENGTH]) flags = centdir[5] if flags & 0x800: # Ekstensi nama file UTF-8 filename = filename.decode('utf-8') else: # Pengkodean nama file ZIP historis filename = filename.decode('cp437') # Buat instance ZipInfo untuk menyimpan informasi file. x = ZipInfo(filename) x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH]) x.comment = fp.read(centdir[_CD_COMMENT_LENGTH]) x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET] (x.create_version, x.create_system, x.extract_version, x.reserved, x.flag_bits, x.compress_type, t, d, x.CRC, x.compress_size, x.file_size) = centdir[1:12] if x.extract_version > MAX_EXTRACT_VERSION: raise NotImplementedError("Versi file ZIP %.1f" % (x.extract_version / 10)) x.volume, x.internal_attr, x.external_attr = centdir[15:18] # Konversi kode tanggal/waktu ke (tahun, bulan, hari, jam, menit, detik). x._raw_time = t x.date_time = ((d >> 9)+1980, (d >> 5) & 0xF, d & 0x1F, t >> 11, (t >> 5) & 0x3F, (t & 0x1F) * 2) x._decodeExtra() x.header_offset = x.header_offset + concat self.filelist.append(x) self.NameToInfo[x.filename] = x # Perbarui total byte yang dibaca dari direktori pusat. total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH] + centdir[_CD_EXTRA_FIELD_LENGTH] + centdir[_CD_COMMENT_LENGTH]) if self.debug > 2: print("total", total) def namelist(self): """Kembalikan daftar nama file dalam arsip.""" return [data.filename for data in self.filelist] def infolist(self): """Kembalikan daftar instance ZipInfo untuk file dalam arsip.""" return self.filelist def printdir(self, file=None): """Cetak daftar isi untuk file ZIP.""" print("%-46s %19s %12s" % ("Nama File", "Dimodifikasi ", "Ukuran"), file=file) for zinfo in self.filelist: date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6] print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size), file=file) def testzip(self): """Baca semua file dan periksa CRC-nya.""" chunk_size = 2 ** 20 for zinfo in self.filelist: try: # Baca per bagian untuk menghindari OverflowError atau # MemoryError dengan file tersemat yang sangat besar. with self.open(zinfo.filename, "r") as f: while f.read(chunk_size): # Periksa CRC-32 pass except BadZipFile: return zinfo.filename def getinfo(self, name): """Kembalikan ZipInfo untuk anggota `name`.""" info = self.NameToInfo.get(name) if info is None: raise KeyError( 'Tidak ada item bernama %r dalam arsip' % name) return info def setpassword(self, pwd): """Atur kata sandi default untuk file terenkripsi.""" if pwd and not isinstance(pwd, bytes): raise TypeError("pwd: diharapkan bytes, mendapatkan %s" % type(pwd).__name__) if pwd: self.pwd = pwd else: self.pwd = None @property def comment(self): """Teks komentar yang terkait dengan file ZIP.""" return self._comment @comment.setter def comment(self, comment): if not isinstance(comment, bytes): raise TypeError("komentar: diharapkan bytes, mendapatkan %s" % type(comment).__name__) # Periksa panjang komentar yang valid. if len(comment) > ZIP_MAX_COMMENT: import warnings warnings.warn('Komentar arsip terlalu panjang; dipotong menjadi %d byte' % ZIP_MAX_COMMENT, stacklevel=2) comment = comment[:ZIP_MAX_COMMENT] self._comment = comment self._didModify = True def read(self, name, pwd=None): """Kembalikan byte dari file `name`.""" with self.open(name, "r", pwd) as fp: return fp.read() def open(self, name, mode="r", pwd=None, *, force_zip64=False): """Kembalikan objek mirip file untuk `name`. `name` bisa berupa string yang menentukan nama file dalam file ZIP, atau objek ZipInfo. `mode` harus 'r' untuk membaca file dari arsip, atau 'w' untuk menulis file baru ke arsip. `pwd` adalah kata sandi untuk mendekripsi file (hanya digunakan untuk membaca). Saat menulis, jika ukuran file tidak diketahui sebelumnya tetapi mungkin melebihi 2 GiB, berikan `force_zip64=True` untuk menggunakan format ZIP64, yang mendukung file besar. Jika ukuran diketahui sebelumnya, sebaiknya berikan instance ZipInfo untuk `name` dengan `zinfo.file_size` diatur. """ if mode not in {"r", "w"}: raise ValueError('open() memerlukan mode "r" atau "w"') if pwd and not isinstance(pwd, bytes): raise TypeError("pwd: diharapkan bytes, mendapatkan %s" % type(pwd).__name__) if pwd and (mode == "w"): raise ValueError("pwd hanya didukung untuk membaca file") if not self.fp: raise ValueError( "Upaya menggunakan arsip ZIP yang sudah ditutup") # Pastikan kami memiliki objek info if isinstance(name, ZipInfo): # 'name' sudah merupakan objek info zinfo = name elif mode == 'w': zinfo = ZipInfo(name) zinfo.compress_type = self.compression else: # Dapatkan objek info untuk nama zinfo = self.getinfo(name) if mode == 'w': return self._open_to_write(zinfo, force_zip64=force_zip64) if self._writing: raise ValueError("Tidak dapat membaca dari file ZIP saat ada " "handle penulisan yang terbuka di atasnya. " "Tutup handle penulisan sebelum mencoba membaca.") # Buka untuk membaca: self._fileRefCnt += 1 zef_file = _SharedFile(self.fp, zinfo.header_offset, self._fpclose, self._lock, lambda: self._writing) try: # Lewati header file: fheader = zef_file.read(sizeFileHeader) if len(fheader) != sizeFileHeader: raise BadZipFile("Header file terpotong") fheader = struct.unpack(structFileHeader, fheader) if fheader[_FH_SIGNATURE] != stringFileHeader: raise BadZipFile("Nomor ajaib buruk untuk header file") fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) if fheader[_FH_EXTRA_FIELD_LENGTH]: zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) if zinfo.flag_bits & 0x20: # Zip 2.7: data patch terkompresi raise NotImplementedError( "data patch terkompresi (bit flag 5)") if zinfo.flag_bits & 0x40: # enkripsi kuat raise NotImplementedError("enkripsi kuat (bit flag 6)") if zinfo.flag_bits & 0x800: # nama file UTF-8 fname_str = fname.decode("utf-8") else: fname_str = fname.decode("cp437") if fname_str != zinfo.orig_filename: raise BadZipFile( 'Nama file dalam direktori %r dan header %r berbeda.' % (zinfo.orig_filename, fname)) # periksa flag terenkripsi & tangani kata sandi is_encrypted = zinfo.flag_bits & 0x1 zd = None if is_encrypted: if not pwd: pwd = self.pwd if not pwd: raise RuntimeError("File %r terenkripsi, kata sandi " "diperlukan untuk ekstraksi" % name) zd = _ZipDecrypter(pwd) # 12 byte pertama dalam aliran cipher membentuk header enkripsi # yang digunakan untuk memperkuat algoritma. 11 byte pertama sepenuhnya # acak, sedangkan byte ke-12 berisi MSB dari CRC # atau waktu file, tergantung jenis headernya, dan digunakan untuk # memeriksa kebenaran kata sandi. header = zef_file.read(12) h = zd(header[0:12]) if zinfo.flag_bits & 0x8: # bandingkan dengan jenis file dari header lokal ekstensi check_byte = (zinfo._raw_time >> 8) & 0xff else: # bandingkan dengan CRC sebaliknya check_byte = (zinfo.CRC >> 24) & 0xff if h[11] != check_byte: raise RuntimeError("Kata sandi buruk untuk file %r" % name) return ZipExtFile(zef_file, mode, zinfo, zd, True) except: zef_file.close() raise def _open_to_write(self, zinfo, force_zip64=False): if force_zip64 and not self._allowZip64: raise ValueError( "force_zip64 adalah True, tetapi allowZip64 adalah False saat membuka " "file ZIP." ) if self._writing: raise ValueError("Tidak dapat menulis ke file ZIP saat handle penulisan lain " "terbuka di atasnya. " "Tutup handle pertama sebelum membuka yang lain.") # Ukuran dan CRC akan ditimpa dengan data yang benar setelah memproses file. if not hasattr(zinfo, 'file_size'): zinfo.file_size = 0 zinfo.compress_size = 0 zinfo.CRC = 0 zinfo.flag_bits = 0x00 if zinfo.compress_type == ZIP_LZMA: # Data terkompresi mencakup penanda akhir-aliran (EOS) zinfo.flag_bits |= 0x02 if not self._seekable: zinfo.flag_bits |= 0x08 if not zinfo.external_attr: zinfo.external_attr = 0o600 << 16 # izin: ?rw------- # Ukuran terkompresi bisa lebih besar dari ukuran tidak terkompresi. zip64 = self._allowZip64 and \ (force_zip64 or zinfo.file_size * 1.05 > ZIP64_LIMIT) if self._seekable: self.fp.seek(self.start_dir) zinfo.header_offset = self.fp.tell() self._writecheck(zinfo) self._didModify = True self.fp.write(zinfo.FileHeader(zip64)) self._writing = True return _ZipWriteFile(self, zinfo, zip64) def extract(self, member, path=None, pwd=None): """Ekstrak anggota dari arsip ke direktori kerja saat ini. `member` bisa berupa nama file atau objek ZipInfo. Anda dapat menentukan direktori berbeda menggunakan `path`. Informasi file-nya diekstrak seakurat mungkin. """ if path is None: path = os.getcwd() else: path = os.fspath(path) return self._extract_member(member, path, pwd) def extractall(self, path=None, members=None, pwd=None): """Ekstrak semua anggota dari arsip ke direktori kerja saat ini. `path` menentukan direktori berbeda untuk diekstrak. `members` opsional dan harus merupakan subset dari daftar yang dikembalikan oleh `namelist()`. """ if members is None: members = self.namelist() if path is None: path = os.getcwd() else: path = os.fspath(path) for zipinfo in members: self._extract_member(zipinfo, path, pwd) @classmethod def _sanitize_windows_name(cls, arcname, pathsep): """Ganti karakter ilegal dan hapus titik akhir dari bagian-bagiannya.""" table = cls._windows_illegal_name_trans_table if not table: illegal = ':<>|"?*' table = str.maketrans(illegal, '_' * len(illegal)) cls._windows_illegal_name_trans_table = table arcname = arcname.translate(table) # hapus titik akhir arcname = (x.rstrip('.') for x in arcname.split(pathsep)) # gabungkan kembali, menghapus bagian kosong arcname = pathsep.join(x for x in arcname if x) return arcname def _extract_member(self, member, targetpath, pwd): """Ekstrak objek ZipInfo `member` ke file fisik pada path `targetpath`. """ if not isinstance(member, ZipInfo): member = self.getinfo(member) # bangun pathname tujuan, mengganti # garis miring maju dengan pemisah khusus platform. arcname = member.filename.replace('/', os.path.sep) if os.path.altsep: arcname = arcname.replace(os.path.altsep, os.path.sep) # interpretasikan pathname absolut sebagai relatif, hapus huruf drive atau # path UNC, dan pemisah berlebihan, komponen "." dan "..". arcname = os.path.splitdrive(arcname)[1] invalid_path_parts = ('', os.path.curdir, os.path.pardir) arcname = os.path.sep.join(x for x in arcname.split(os.path.sep) if x not in invalid_path_parts) if os.path.sep == '\\': # filter karakter ilegal di Windows arcname = self._sanitize_windows_name(arcname, os.path.sep) targetpath = os.path.join(targetpath, arcname) targetpath = os.path.normpath(targetpath) # Buat semua direktori atas jika perlu. upperdirs = os.path.dirname(targetpath) if upperdirs and not os.path.exists(upperdirs): os.makedirs(upperdirs) if member.is_dir(): if not os.path.isdir(targetpath): os.mkdir(targetpath) return targetpath with self.open(member, pwd=pwd) as source, \ open(targetpath, "wb") as target: shutil.copyfileobj(source, target) return targetpath def _writecheck(self, zinfo): """Periksa kesalahan sebelum menulis file ke arsip.""" if zinfo.filename in self.NameToInfo: import warnings warnings.warn('Nama duplikat: %r' % zinfo.filename, stacklevel=3) if self.mode not in ('w', 'x', 'a'): raise ValueError("write() memerlukan mode 'w', 'x', atau 'a'") if not self.fp: raise ValueError( "Upaya menulis ke arsip ZIP yang sudah ditutup") _check_compression(zinfo.compress_type) if not self._allowZip64: requires_zip64 = None if len(self.filelist) >= ZIP_FILECOUNT_LIMIT: requires_zip64 = "Jumlah file" elif zinfo.file_size > ZIP64_LIMIT: requires_zip64 = "Ukuran file" elif zinfo.header_offset > ZIP64_LIMIT: requires_zip64 = "Ukuran file ZIP" if requires_zip64: raise LargeZipFile(requires_zip64 + " akan memerlukan ekstensi ZIP64") def write(self, filename, arcname=None, compress_type=None): """Tulis file `filename` ke arsip dengan nama `arcname`.""" if not self.fp: raise ValueError( "Upaya menulis ke arsip ZIP yang sudah ditutup") if self._writing: raise ValueError( "Tidak dapat menulis ke arsip ZIP saat handle penulisan terbuka ada" ) zinfo = ZipInfo.from_file(filename, arcname) if zinfo.is_dir(): zinfo.compress_size = 0 zinfo.CRC = 0 else: if compress_type is not None: zinfo.compress_type = compress_type else: zinfo.compress_type = self.compression if zinfo.is_dir(): with self._lock: if self._seekable: self.fp.seek(self.start_dir) zinfo.header_offset = self.fp.tell() # Awal byte header if zinfo.compress_type == ZIP_LZMA: # Data terkompresi mencakup penanda akhir-aliran (EOS) zinfo.flag_bits |= 0x02 self._writecheck(zinfo) self._didModify = True self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo self.fp.write(zinfo.FileHeader(False)) self.start_dir = self.fp.tell() else: with open(filename, "rb") as src, self.open(zinfo, 'w') as dest: shutil.copyfileobj(src, dest, 1024*8) def writestr(self, zinfo_or_arcname, data, compress_type=None): """Tulis file ke dalam arsip. Isinya adalah `data`, yang bisa berupa instance `str` atau `bytes`; jika berupa `str`, pertama-tama diencode sebagai UTF-8. `zinfo_or_arcname` adalah instance ZipInfo atau nama file dalam arsip.""" if isinstance(data, str): data = data.encode("utf-8") if not isinstance(zinfo_or_arcname, ZipInfo): zinfo = ZipInfo(filename=zinfo_or_arcname, date_time=time.localtime(time.time())[:6]) zinfo.compress_type = self.compression if zinfo.filename[-1] == '/': zinfo.external_attr = 0o40775 << 16 # drwxrwxr-x zinfo.external_attr |= 0x10 # flag direktori MS-DOS else: zinfo.external_attr = 0o600 << 16 # ?rw------- else: zinfo = zinfo_or_arcname if not self.fp: raise ValueError( "Upaya menulis ke arsip ZIP yang sudah ditutup") if self._writing: raise ValueError( "Tidak dapat menulis ke arsip ZIP saat handle penulisan terbuka ada." ) if compress_type is not None: zinfo.compress_type = compress_type zinfo.file_size = len(data) # Ukuran tidak terkompresi with self._lock: with self.open(zinfo, mode='w') as dest: dest.write(data) def __del__(self): """Panggil metode "close()" jika pengguna lupa.""" self.close() def close(self): """Tutup file, dan untuk mode 'w', 'x', dan 'a', tulis catatan akhir.""" if self.fp is None: return if self._writing: raise ValueError("Tidak dapat menutup file ZIP saat ada " "handle penulisan yang terbuka di atasnya. " "Tutup handle penulisan sebelum menutup file ZIP.") try: if self.mode in ('w', 'x', 'a') and self._didModify: # tulis catatan akhir with self._lock: if self._seekable: self.fp.seek(self.start_dir) self._write_end_record() finally: fp = self.fp self.fp = None self._fpclose(fp) def _write_end_record(self): for zinfo in self.filelist: # tulis direktori pusat dt = zinfo.date_time dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) extra = [] if zinfo.file_size > ZIP64_LIMIT \ or zinfo.compress_size > ZIP64_LIMIT: extra.append(zinfo.file_size) extra.append(zinfo.compress_size) file_size = 0xffffffff compress_size = 0xffffffff else: file_size = zinfo.file_size compress_size = zinfo.compress_size if zinfo.header_offset > ZIP64_LIMIT: extra.append(zinfo.header_offset) header_offset = 0xffffffff else: header_offset = zinfo.header_offset extra_data = zinfo.extra min_version = 0 if extra: # Tambahkan bidang ZIP64 ke data tambahan extra_data = struct.pack( '<hh'> ZIP_FILECOUNT_LIMIT: requires_zip64 = "Jumlah file" elif centDirOffset > ZIP64_LIMIT: requires_zip64 = "Offset direktori pusat" elif centDirSize > ZIP64_LIMIT: requires_zip64 = "Ukuran direktori pusat" if requires_zip64: # Perlu menulis catatan akhir arsip ZIP64 if not self._allowZip64: raise LargeZipFile(requires_zip64 + " akan memerlukan ekstensi ZIP64") zip64endrec = struct.pack( structEndArchive64, stringEndArchive64, 44, 45, 45, 0, 0, centDirCount, centDirCount, centDirSize, centDirOffset) self.fp.write(zip64endrec) zip64locrec = struct.pack( structEndArchive64Locator, stringEndArchive64Locator, 0, pos2, 1) self.fp.write(zip64locrec) centDirCount = min(centDirCount, 0xFFFF) centDirSize = min(centDirSize, 0xFFFFFFFF) centDirOffset = min(centDirOffset, 0xFFFFFFFF) endrec = struct.pack(structEndArchive, stringEndArchive, 0, 0, centDirCount, centDirCount, centDirSize, centDirOffset, len(self._comment)) self.fp.write(endrec) self.fp.write(self._comment) self.fp.flush() def _fpclose(self, fp): assert self._fileRefCnt > 0 self._fileRefCnt -= 1 if not self._fileRefCnt and not self._filePassed: fp.close() class PyZipFile(ZipFile): """Kelas untuk membuat arsip ZIP dengan file dan paket pustaka Python.""" def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True, optimize=-1): ZipFile.__init__(self, file, mode=mode, compression=compression, allowZip64=allowZip64) self._optimize = optimize def writepy(self, pathname, basename="", filterfunc=None): """Tambahkan semua file dari `pathname` ke arsip ZIP. Jika `pathname` adalah direktori paket, cari secara rekursif direktori dan semua subdirektorinya untuk file *.py dan tambahkan modul-modulnya ke arsip. Jika `pathname` adalah direktori biasa, tambahkan semua file *.py dari direktori tersebut. Jika `pathname` adalah file Python *.py, metode ini menambahkan modul ke arsip. Metode ini selalu menyimpan modul yang ditambahkan sebagai `module.pyc`. Metode ini akan mengompilasi `module.py` menjadi `module.pyc` jika diperlukan. Jika `filterfunc(pathname)` disediakan, dipanggil untuk setiap path. Metode ini melewati file atau direktori jika `filterfunc` mengembalikan False. """ pathname = os.fspath(pathname) if filterfunc and not filterfunc(pathname): if self.debug: label = 'path' if os.path.isdir(pathname) else 'file' print('%s %r dilewati oleh filterfunc' % (label, pathname)) return dir, name = os.path.split(pathname) if os.path.isdir(pathname): initname = os.path.join(pathname, "__init__.py") if os.path.isfile(initname): # Ini adalah direktori paket, tambahkan if basename: basename = "%s/%s" % (basename, name) else: basename = name if self.debug: print("Menambahkan paket di", pathname, "sebagai", basename) fname, arcname = self._get_codename(initname[0:-3], basename) if self.debug: print("Menambahkan", arcname) self.write(fname, arcname) dirlist = os.listdir(pathname) dirlist.remove("__init__.py") # Tambahkan semua file *.py dan subdirektori paket for filename in dirlist: path = os.path.join(pathname, filename) root, ext = os.path.splitext(filename) if os.path.isdir(path): if os.path.isfile(os.path.join(path, "__init__.py")): # Ini adalah direktori paket, tambahkan self.writepy(path, basename, filterfunc=filterfunc) # Panggilan rekursif elif ext == ".py": if filterfunc and not filterfunc(path): if self.debug: print('file %r dilewati oleh filterfunc' % path) continue fname, arcname = self._get_codename(path[0:-3], basename) if self.debug: print("Menambahkan", arcname) self.write(fname, arcname) else: # Ini BUKAN direktori paket; tambahkan file-filenya di tingkat atas if self.debug: print("Menambahkan file dari direktori", pathname) for filename in os.listdir(pathname): path = os.path.join(pathname, filename) root, ext = os.path.splitext(filename) if ext == ".py": if filterfunc and not filterfunc(path): if self.debug: print('file %r dilewati oleh filterfunc' % path) continue fname, arcname = self._get_codename(path[0:-3], basename) if self.debug: print("Menambahkan", arcname) self.write(fname, arcname) else: if pathname[-3:] != ".py": raise RuntimeError( 'File yang ditambahkan dengan writepy() harus diakhiri dengan ".py"') fname, arcname = self._get_codename(pathname[0:-3], basename) if self.debug: print("Menambahkan file", arcname) self.write(fname, arcname) def _get_codename(self, pathname, basename): """Kembalikan (nama file, nama arsip) untuk path yang diberikan. Diberikan path modul, kembalikan path file yang benar dan nama arsip, mengompilasi jika perlu. Misalnya, diberikan `/python/lib/string`, kembalikan (`/python/lib/string.pyc`, `string`). """ def _compile(file, optimize=-1): import py_compile if self.debug: print("Mengompilasi", file) try: py_compile.compile(file, doraise=True, optimize=optimize) except py_compile.PyCompileError as err: print(err.msg) return False return True file_py = pathname + ".py" file_pyc = pathname + ".pyc" pycache_opt0 = importlib.util.cache_from_source( file_py, optimization='') pycache_opt1 = importlib.util.cache_from_source( file_py, optimization=1) pycache_opt2 = importlib.util.cache_from_source( file_py, optimization=2) if self._optimize == -1: # mode lama: gunakan file apa pun yang ada if (os.path.isfile(file_pyc) and os.stat(file_pyc).st_mtime >= os.stat(file_py).st_mtime): # Gunakan file .pyc. arcname = fname = file_pyc elif (os.path.isfile(pycache_opt0) and os.stat(pycache_opt0).st_mtime >= os.stat(file_py).st_mtime): # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc lama # dalam arsip. fname = pycache_opt0 arcname = file_pyc elif (os.path.isfile(pycache_opt1) and os.stat(pycache_opt1).st_mtime >= os.stat(file_py).st_mtime): # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc lama # dalam arsip. fname = pycache_opt1 arcname = file_pyc elif (os.path.isfile(pycache_opt2) and os.stat(pycache_opt2).st_mtime >= os.stat(file_py).st_mtime): # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc lama # dalam arsip. fname = pycache_opt2 arcname = file_pyc else: # Kompilasi .py menjadi file .pyc PEP 3147. if _compile(file_py): if sys.flags.optimize == 0: fname = pycache_opt0 elif sys.flags.optimize == 1: fname = pycache_opt1 else: fname = pycache_opt2 arcname = file_pyc else: fname = arcname = file_py else: # mode baru: gunakan tingkat optimasi yang diberikan if self._optimize == 0: fname = pycache_opt0 arcname = file_pyc else: arcname = file_pyc if self._optimize == 1: fname = pycache_opt1 elif self._optimize == 2: fname = pycache_opt2 else: msg = "nilai tidak valid untuk 'optimize': {!r}".format( self._optimize) raise ValueError(msg) if not (os.path.isfile(fname) and os.stat(fname).st_mtime >= os.stat(file_py).st_mtime): if not _compile(file_py, optimize=self._optimize): fname = arcname = file_py archivename = os.path.split(arcname)[1] if basename: archivename = "%s/%s" % (basename, archivename) return (fname, archivename) def main(args=None): import argparse description = 'Antarmuka baris perintah sederhana untuk modul zipfile.' parser = argparse.ArgumentParser(description=description) group = parser.add_mutually_exclusive_group(required=True) group.add_argument('-l', '--list', metavar='<zipfile>', help='Tampilkan daftar isi file ZIP') group.add_argument('-e', '--extract', nargs=2, metavar=('<zipfile>', '<output_dir>'), help='Ekstrak file ZIP ke direktori target') group.add_argument('-c', '--create', nargs='+', metavar=('<name>', '<file>'), help='Buat file ZIP dari sumber-sumber') group.add_argument('-t', '--test', metavar='<zipfile>', help='Uji apakah file ZIP valid') args = parser.parse_args(args) if args.test is not None: src = args.test with ZipFile(src, 'r') as zf: badfile = zf.testzip() if badfile: print( "File terlampir berikut rusak: {!r}".format(badfile)) print("Selesai menguji") elif args.list is not None: src = args.list with ZipFile(src, 'r') as zf: zf.printdir() elif args.extract is not None: src, curdir = args.extract with ZipFile(src, 'r') as zf: zf.extractall(curdir) elif args.create is not None: zip_name = args.create.pop(0) files = args.create def addToZip(zf, path, zippath): if os.path.isfile(path): zf.write(path, zippath, ZIP_DEFLATED) elif os.path.isdir(path): if zippath: zf.write(path, zippath) for nm in os.listdir(path): addToZip(zf, os.path.join(path, nm), os.path.join(zippath, nm)) # else: abaikan with ZipFile(zip_name, 'w') as zf: for path in files: zippath = os.path.basename(path) if not zippath: zippath = os.path.basename(os.path.dirname(path)) if zippath in ('', os.curdir, os.pardir): zippath = '' addToZip(zf, path, zippath) if __name__ == "__main__": main()</zipfile></file></name></output_dir></zipfile></zipfile></hh'></lqq'></bbh',></qqq',></hh',></hhqq'>
Langkah 2: Buat pemicu OSS
Di halaman detail fungsi, pilih tab Trigger dan klik Create Trigger.
Di panel Buat Pemicu, pilih jenis pemicu OSS, konfigurasikan pengaturan, lalu klik OK.
Pengaturan utama dijelaskan di bawah ini. Untuk pengaturan lainnya, lihat Konfigurasi pemicu OSS native.
Bucket Name: Pilih bucket yang telah Anda buat.
Awalan Objek: Masukkan
srcuntuk contoh ini.Akhiran Objek: Masukkan
zipuntuk contoh ini.Event Pemicu: Untuk contoh ini, pilih event berikut:
oss:ObjectCreated:PutObject, oss:ObjectCreated:PostObject, oss:ObjectCreated:CompleteMultipartUpload, oss:ObjectCreated:PutSymlink.Role Name: Pilih peran dengan izin untuk memanggil fungsi. Anda dapat memberikan izin ini dengan melampirkan kebijakan AliyunFCFullAccess ke peran tersebut.
Setelah dibuat, pemicu akan muncul di halaman Pemicu.
Langkah 3: Verifikasi
Anda dapat menguji konfigurasi validasi Anda dengan dua cara:
Metode 1: Unggah file melalui konsol
Masuk ke Konsol Object Storage Service dan unggah file ZIP, seperti code.zip, ke direktori src bucket yang Anda pilih di Langkah 2: Buat pemicu OSS. Pengunggahan secara otomatis memicu fungsi untuk mengekstrak file ZIP ke direktori root bucket.
Metode 2: Uji dengan parameter event
Setelah mengonfigurasi parameter event, klik Test Function untuk menjalankan fungsi. Kemudian, buka Konsol Object Storage Service dan periksa daftar file di bucket target untuk memverifikasi bahwa file ZIP telah didekompresi.
Di halaman detail fungsi, klik tab Code, klik ikon
di samping Test Function, lalu pilih Configure Test Parameters dari daftar drop-down.Di panel Configure Test Parameters, pilih Event Template, masukkan Event Name dan konten event, lalu klik OK.
Berikut ini contoh event.
{ "events": [ { "eventName": "ObjectCreated:PutObject", "eventSource": "acs:oss", "eventTime": "2023-08-13T06:45:43.000Z", "eventVersion": "1.0", "oss": { "bucket": { "arn": "acs:oss:cn-hangzhou:10343546824****:bucket****", "name": "bucket****", "ownerIdentity": "10343546824****" }, "object": { "deltaSize": 122539, "eTag": "688A7BF4F233DC9C88A80BF985AB****", "key": "src/test.zip", "size": 122539 }, "ossSchemaVersion": "1.0", "ruleId": "9adac8e253828f4f7c0466d941fa3db81161****" }, "region": "cn-hangzhou", "requestParameters": { "sourceIPAddress": "140.205.XX.XX" }, "responseElements": { "requestId": "58F9FF2D3DF792092E12044C" }, "userIdentity": { "principalId": "10343546824****" } } ] }Untuk penjelasan tentang bidang dalam parameter event, lihat Konfigurasi Parameter Input Fungsi.
PentingKonten event di atas hanya untuk tujuan demonstrasi. Anda harus memodifikasi beberapa parameter berdasarkan informasi aktual Anda dan memastikan bahwa file yang ditentukan (dalam contoh ini,
src/test.zip) ada di bucket yang dikonfigurasi. Jika tidak, fungsi tidak dapat dipicu atau eksekusi fungsi akan gagal.Parameter berikut harus dimodifikasi berdasarkan konfigurasi aktual Anda:
bucket.arn: Misalnya,acs:oss:<region>:<your_account_id>:<your_bucket>. Ganti<region>dengan wilayah tempat Anda membuat fungsi,<your_account_id>dengan ID akun Alibaba Cloud Anda, dan<your_bucket>dengan nama bucket yang telah Anda buat di wilayah yang sama. Anda dapat menemukan ID akun Alibaba Cloud Anda di bagian Informasi Umum halaman Ikhtisar di Konsol Function Compute.bucket.name: Ganti dengan nama bucket Anda.bucket.ownerIdentity: Ganti dengan ID akun Alibaba Cloud Anda.object.key: Ganti dengan kunci objek di bucket target.region: Ganti dengan wilayah fungsi.userIdentity.principalId: Ganti dengan ID akun Alibaba Cloud Anda.
Di tab Code, klik Test Function untuk menjalankan uji coba.
Setelah operasi selesai, masuk ke Konsol Object Storage Service dan periksa apakah file ZIP target (
src/test.zipdalam contoh ini) di bucket target telah didekompresi. Untuk informasi lebih lanjut, lihat Kueri file menggunakan konsol OSS.
Jika pengujian telah selesai, hapus aplikasi dan sumber daya terkaitnya jika Anda tidak lagi membutuhkannya untuk menghindari biaya yang tidak perlu.
Referensi
Untuk mempelajari lebih lanjut tentang pemicu OSS, lihat Ikhtisar Pemicu OSS.
Untuk menggunakan Function Compute untuk mengemas dan mengunduh file dari OSS, lihat Unduh beberapa file sebagai paket ZIP.
Untuk informasi lebih lanjut tentang penyimpanan objek, lihat Bucket dan Objek.