全部产品
Search
文档中心

Function Compute:Gunakan Function Compute untuk mendekompresi otomatis file ZIP yang diunggah ke OSS

更新时间:Jul 09, 2025

Function Compute dapat mengotomatiskan dekompresi file yang diunggah ke Object Storage Service (OSS). Saat file ZIP yang memenuhi aturan tertentu diunggah, proses ini secara otomatis memicu Function Compute yang telah ditentukan sebelumnya. Setelah dekompresi, file yang diekstraksi akan diunggah ke direktori yang ditentukan di OSS.

Catatan penggunaan

  • Gunakan pengkodean UTF-8 atau GB2312 untuk nama file atau folder guna menghindari teks berantakan dan gangguan selama dekompresi.

  • Untuk file Arsip atau Penyimpanan Arsip Dingin, pulihkan file terlebih dahulu sebelum mendekompresinya.

  • Waktu maksimum untuk mendekompresi paket ZIP adalah 2 jam. Tugas yang melebihi batas waktu ini akan gagal.

  • Jaga ukuran file tunggal dalam paket ZIP di bawah 1 GB untuk menghindari kegagalan dekompresi.

  • Atur batas waktu fungsi lebih dari 2 jam, tetapi tidak lebih dari 24 jam.

  • Buat fungsi Anda di wilayah yang sama dengan bucket OSS terkait.

Prasyarat

Prosedur

Langkah 1: Buat fungsi

  1. Masuk ke Konsol Function Compute. Di panel navigasi sisi kiri, klik Functions.

  2. Di bilah navigasi atas, pilih wilayah. Pada halaman Functions, klik Create Function.

  3. Di halaman Create Function, pilih metode pembuatan fungsi, konfigurasikan item yang diperlukan, lalu klik Create.

    Item konfigurasi utama dirinci sebagai berikut. Untuk detail tentang item lainnya, lihat Buat Fungsi.

    • Runtime: Pilih Python 3.10 sebagai runtime fungsi Anda.

    • Function Role: Pilih peran yang ada atau buat yang baru untuk memastikan fungsi memiliki izin yang diperlukan untuk mengakses bucket OSS. Lampirkan kebijakan AliyunOSSFullAccess ke peran ini.

  4. Pada halaman Detail Fungsi dari fungsi yang baru dibuat, klik tab Code. Pada tab Code, buat file kode dan tulis kode Anda di editor kode, lalu klik Deploy.

    Contoh kode adalah sebagai berikut.

    • index.py file

      Klik untuk melihat contoh kode

      # -*- coding: utf-8 -*-
      '''
      Pernyataan:
      Fungsi ini memberi nama dan mengkodekan file dan folder sebagai berikut:
      1. Untuk MAC/Linux, pengkodean UTF-8 digunakan secara default.
      2. Untuk Windows, pengkodean GB2312 atau UTF-8 digunakan secara default.
      
      Untuk pengkodean lainnya, deteksi pengkodean dilakukan menggunakan library chardet; 
      namun, akurasi 100% tidak dijamin. 
      Tulis ulang fungsi ini hanya jika diperlukan, dan pastikan itu lolos debugging.
      '''
      
      import helper
      import oss2
      import json
      import os
      import time
      import logging
      import chardet
      
      """
      Saat objek dengan prefix source/ ditempatkan di bucket OSS, diharapkan objek tersebut akan didekompresi dan kemudian disimpan di bucket sebagai processed/ prefixed.
      Misalnya, source/a.zip akan diproses sebagai processed/a/... 
      "Source /", "processed/" dapat diubah sesuai kebutuhan.
      """
      # Matikan info log yang dicetak oleh SDK OSS
      logging.getLogger("oss2.api").setLevel(logging.ERROR)
      logging.getLogger("oss2.auth").setLevel(logging.ERROR)
      
      LOGGER = logging.getLogger()
      
      # Dekorator untuk mencetak waktu eksekusi suatu fungsi
      
      
      def print_excute_time(func):
          def wrapper(*args, **kwargs):
              local_time = time.time()
              ret = func(*args, **kwargs)
              LOGGER.info('current Function [%s] excute time is %.2f' %
                          (func.__name__, time.time() - local_time))
              return ret
          return wrapper
      
      
      def get_zipfile_name(origin_name):  # Mengatasi masalah teks Cina yang berantakan
          name = origin_name
          try:
              name_bytes = origin_name.encode(encoding="cp437")
          except:
              name_bytes = origin_name.encode(encoding="utf-8")
      
          # Jika string yang akan dideteksi cukup panjang, hasil deteksi lebih akurat
          detect = chardet.detect(name_bytes)
          confidence = detect["confidence"]
          detect_encoding = detect["encoding"]
          if confidence > 0.75 and (detect_encoding.lower() in ["gb2312", "gbk", "gb18030", "ascii", "utf-8"]):
              try:
                  if detect_encoding.lower() in ["gb2312", "gbk", "gb18030"]:
                      detect_encoding = "gb18030"
                  name = name_bytes.decode(detect_encoding)
              except:
                  name = name_bytes.decode(encoding="gb18030")
          else:
              try:
                  name = name_bytes.decode(encoding="gb18030")
              except:
                  name = name_bytes.decode(encoding="utf-8")
          # Perbaiki windows \\ sebagai segmen direktori
          name = name.replace("\\", "/")
          return name
      
      
      @print_excute_time
      def handler(event, context):
          """
          Objek dari OSS akan didekompresi secara otomatis.
          param: event:   String JSON acara OSS, termasuk URI objek OSS dan informasi lainnya.
      
          param: context: Konteks fungsi, termasuk kredensial dan informasi runtime.
      
          """
          evt_lst = json.loads(event)
          creds = context.credentials
          auth = oss2.StsAuth(
              creds.access_key_id,
              creds.access_key_secret,
              creds.security_token)
      
          evt = evt_lst['events'][0]
          bucket_name = evt['oss']['bucket']['name']
          endpoint = 'oss-' + evt['region'] + '-internal.aliyuncs.com'
          bucket = oss2.Bucket(auth, endpoint, bucket_name)
          object_name = evt['oss']['object']['key']
      
          if "ObjectCreated:PutSymlink" == evt['eventName']:
              object_name = bucket.get_symlink(object_name).target_key
              if object_name == "":
                  raise RuntimeError('{} adalah file symlink yang tidak valid'.format(
                      evt['oss']['object']['key']))
      
          file_type = os.path.splitext(object_name)[1]
      
          if file_type != ".zip":
              raise RuntimeError('{} tipe file bukan zip'.format(object_name))
      
          LOGGER.info("mulai mendekompresi file zip = {}".format(object_name))
      
          lst = object_name.split("/")
          zip_name = lst[-1]
          PROCESSED_DIR = os.environ.get("PROCESSED_DIR", "")
          RETAIN_FILE_NAME = os.environ.get("RETAIN_FILE_NAME", "")
          if PROCESSED_DIR and PROCESSED_DIR[-1] != "/":
              PROCESSED_DIR += "/"
          if RETAIN_FILE_NAME == "false":
              newKey = PROCESSED_DIR
          else:
              newKey = PROCESSED_DIR + zip_name
      
          zip_fp = helper.OssStreamFileLikeObject(bucket, object_name)
          newKey = newKey.replace(".zip", "/")
      
          with helper.zipfile_support_oss.ZipFile(zip_fp) as zip_file:
              for name in zip_file.namelist():
                  with zip_file.open(name) as file_obj:
                      name = get_zipfile_name(name)
                      bucket.put_object(newKey + name, file_obj)
    • helper.py file

      Klik untuk melihat contoh kode

      # -*- coding: utf-8 -*-
      import oss2
      from oss2 import utils, models
      import ossZipfile as zipfile
      
      zipfile_support_oss = zipfile
      
      # Dukungan unggah ke OSS sebagai objek seperti file
      
      
      def make_crc_adapter(data, init_crc=0):
          data = utils.to_bytes(data)
          # Objek seperti file
          if hasattr(data, 'read'):
              return utils._FileLikeAdapter(data, crc_callback=utils.Crc64(init_crc))
      
      
      utils.make_crc_adapter = make_crc_adapter
      
      
      class OssStreamFileLikeObject(object):
          def __init__(self, bucket, key):
              super(OssStreamFileLikeObject, self).__init__()
              self._bucket = bucket
              self._key = key
              self._meta_data = self._bucket.get_object_meta(self._key)
      
          @property
          def bucket(self):
              return self._bucket
      
          @property
          def key(self):
              return self._key
      
          @property
          def filesize(self):
              return self._meta_data.content_length
      
          def get_reader(self, begin, end):
              begin = begin if begin >= 0 else 0
              end = end if end > 0 else self.filesize - 1
              end = end if end < self.filesize else self.filesize - 1
              begin = begin if begin < end else end
              return self._bucket.get_object(self._key, byte_range=(begin, end))
      
          def get_content_bytes(self, begin, end):
              reader = self.get_reader(begin, end)
              return reader.read()
      
          def get_last_content_bytes(self, offset):
              return self.get_content_bytes(self.filesize-offset, self.filesize-1)
    • ossZipfile.py file

      Klik untuk melihat contoh kode

      """
      Baca dan tulis file ZIP.
      
      XXX referensi ke UTF-8 memerlukan investigasi lebih lanjut.
      """
      import io
      import os
      import importlib.util
      import sys
      import time
      import stat
      import shutil
      import struct
      import binascii
      import threading
      
      try:
          import zlib  # Kita mungkin memerlukan metode kompresinya
          crc32 = zlib.crc32
      except ImportError:
          zlib = None
          crc32 = binascii.crc32
      
      try:
          import bz2  # Kita mungkin memerlukan metode kompresinya
      except ImportError:
          bz2 = None
      
      try:
          import lzma  # Kita mungkin memerlukan metode kompresinya
      except ImportError:
          lzma = None
      
      __all__ = ["BadZipFile", "BadZipfile", "error",
                 "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA",
                 "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"]
      
      
      class BadZipFile(Exception):
          pass
      
      
      class LargeZipFile(Exception):
          """
          Dilempar saat menulis file ZIP, file ZIP memerlukan ekstensi ZIP64
          dan ekstensi tersebut dinonaktifkan.
          """
      
      
      error = BadZipfile = BadZipFile      # Nama kompatibilitas pra-3.2
      
      
      ZIP64_LIMIT = (1 << 31) - 1
      ZIP_FILECOUNT_LIMIT = (1 << 16) - 1
      ZIP_MAX_COMMENT = (1 << 16) - 1
      
      # Konstanta untuk metode kompresi file ZIP
      ZIP_STORED = 0
      ZIP_DEFLATED = 8
      ZIP_BZIP2 = 12
      ZIP_LZMA = 14
      # Metode kompresi ZIP lainnya tidak didukung
      
      DEFAULT_VERSION = 20
      ZIP64_VERSION = 45
      BZIP2_VERSION = 46
      LZMA_VERSION = 63
      # Kami mengenali (tetapi tidak selalu mendukung) semua fitur hingga versi tersebut
      MAX_EXTRACT_VERSION = 63
      
      # Di bawah ini adalah beberapa format dan data terkait untuk membaca/menulis header menggunakan
      # modul struct. Nama dan struktur header/rekaman adalah yang digunakan
      # dalam deskripsi PKWARE dari format file ZIP:
      #     http://www.pkware.com/documents/casestudies/APPNOTE.TXT
      # (URL valid pada Januari 2008)
      
      # Struktur "akhir direktori pusat", nomor ajaib, ukuran, dan indeks
      # (bagian V.I dalam dokumen format)
      structEndArchive = b"<4s4H2LH"
      stringEndArchive = b"PK\005\006"
      sizeEndCentDir = struct.calcsize(structEndArchive)
      
      _ECD_SIGNATURE = 0
      _ECD_DISK_NUMBER = 1
      _ECD_DISK_START = 2
      _ECD_ENTRIES_THIS_DISK = 3
      _ECD_ENTRIES_TOTAL = 4
      _ECD_SIZE = 5
      _ECD_OFFSET = 6
      _ECD_COMMENT_SIZE = 7
      # Dua indeks terakhir ini bukan bagian dari struktur seperti yang didefinisikan dalam
      # spesifikasi, tetapi mereka digunakan secara internal oleh modul ini sebagai kenyamanan
      _ECD_COMMENT = 8
      _ECD_LOCATION = 9
      
      # Struktur "direktori pusat", nomor ajaib, ukuran, dan indeks
      # entri dalam struktur (bagian V.F dalam dokumen format)
      structCentralDir = "<4s4B4HL2L5H2L"
      stringCentralDir = b"PK\001\002"
      sizeCentralDir = struct.calcsize(structCentralDir)
      
      # Indeks entri dalam struktur direktori pusat
      _CD_SIGNATURE = 0
      _CD_CREATE_VERSION = 1
      _CD_CREATE_SYSTEM = 2
      _CD_EXTRACT_VERSION = 3
      _CD_EXTRACT_SYSTEM = 4
      _CD_FLAG_BITS = 5
      _CD_COMPRESS_TYPE = 6
      _CD_TIME = 7
      _CD_DATE = 8
      _CD_CRC = 9
      _CD_COMPRESSED_SIZE = 10
      _CD_UNCOMPRESSED_SIZE = 11
      _CD_FILENAME_LENGTH = 12
      _CD_EXTRA_FIELD_LENGTH = 13
      _CD_COMMENT_LENGTH = 14
      _CD_DISK_NUMBER_START = 15
      _CD_INTERNAL_FILE_ATTRIBUTES = 16
      _CD_EXTERNAL_FILE_ATTRIBUTES = 17
      _CD_LOCAL_HEADER_OFFSET = 18
      
      # Struktur "header file lokal", nomor ajaib, ukuran, dan indeks
      # (bagian V.A dalam dokumen format)
      structFileHeader = "<4s2B4HL2L2H"
      stringFileHeader = b"PK\003\004"
      sizeFileHeader = struct.calcsize(structFileHeader)
      
      _FH_SIGNATURE = 0
      _FH_EXTRACT_VERSION = 1
      _FH_EXTRACT_SYSTEM = 2
      _FH_GENERAL_PURPOSE_FLAG_BITS = 3
      _FH_COMPRESSION_METHOD = 4
      _FH_LAST_MOD_TIME = 5
      _FH_LAST_MOD_DATE = 6
      _FH_CRC = 7
      _FH_COMPRESSED_SIZE = 8
      _FH_UNCOMPRESSED_SIZE = 9
      _FH_FILENAME_LENGTH = 10
      _FH_EXTRA_FIELD_LENGTH = 11
      
      # Struktur "lokator akhir direktori pusat ZIP64", nomor ajaib, dan ukuran
      structEndArchive64Locator = "<4sLQL"
      stringEndArchive64Locator = b"PK\x06\x07"
      sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator)
      
      # Rekaman "akhir direktori pusat ZIP64", nomor ajaib, ukuran, dan indeks
      # (bagian V.G dalam dokumen format)
      structEndArchive64 = "<4sQ2H2L4Q"
      stringEndArchive64 = b"PK\x06\x06"
      sizeEndCentDir64 = struct.calcsize(structEndArchive64)
      
      _CD64_SIGNATURE = 0
      _CD64_DIRECTORY_RECSIZE = 1
      _CD64_CREATE_VERSION = 2
      _CD64_EXTRACT_VERSION = 3
      _CD64_DISK_NUMBER = 4
      _CD64_DISK_NUMBER_START = 5
      _CD64_NUMBER_ENTRIES_THIS_DISK = 6
      _CD64_NUMBER_ENTRIES_TOTAL = 7
      _CD64_DIRECTORY_SIZE = 8
      _CD64_OFFSET_START_CENTDIR = 9
      
      
      def _check_zipfile(fp):
          try:
              if _EndRecData(fp):
                  return True         # File memiliki nomor ajaib yang benar
          except OSError:
              pass
          return False
      
      
      def is_zipfile(filename):
          """Cepat lihat apakah sebuah file adalah file ZIP dengan memeriksa nomor ajaib.
      
          Argumen filename bisa berupa file atau objek file-like juga.
          """
          result = False
          try:
              if hasattr(filename, "read"):
                  result = _check_zipfile(fp=filename)
              else:
                  with open(filename, "rb") as fp:
                      result = _check_zipfile(fp)
          except OSError:
              pass
          return result
      
      
      def _EndRecData64(fpin, offset, endrec):
          """
          Baca rekaman akhir arsip ZIP64 dan gunakan itu untuk memperbarui endrec
          """
          if hasattr(fpin, 'bucket'):
              data = fpin.get_content_bytes(
                  fpin.filesize+offset-sizeEndCentDir64Locator, fpin.filesize+offset-1)
          else:
              try:
                  fpin.seek(offset - sizeEndCentDir64Locator, 2)
              except OSError:
                  # Jika pencarian gagal, file tidak cukup besar untuk berisi rekaman akhir arsip ZIP64,
                  # jadi cukup kembalikan rekaman akhir yang diberikan.
                  return endrec
      
              data = fpin.read(sizeEndCentDir64Locator)
      
          if len(data) != sizeEndCentDir64Locator:
              return endrec
          sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
          if sig != stringEndArchive64Locator:
              return endrec
      
          if diskno != 0 or disks != 1:
              raise BadZipFile("file zip yang mencakup beberapa disk tidak didukung")
      
          # Asumsikan tidak ada 'data ekstensible zip64'
          if hasattr(fpin, 'bucket'):
              data = fpin.get_content_bytes(
                  fpin.filesize + offset - sizeEndCentDir64Locator - sizeEndCentDir64,
                  fpin.filesize+offset-sizeEndCentDir64Locator-1)
          else:
              fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2)
              data = fpin.read(sizeEndCentDir64)
          if len(data) != sizeEndCentDir64:
              return endrec
          sig, sz, create_version, read_version, disk_num, disk_dir, \
              dircount, dircount2, dirsize, diroffset = \
              struct.unpack(structEndArchive64, data)
          if sig != stringEndArchive64:
              return endrec
      
          # Perbarui endrec asli menggunakan data dari rekaman ZIP64
          endrec[_ECD_SIGNATURE] = sig
          endrec[_ECD_DISK_NUMBER] = disk_num
          endrec[_ECD_DISK_START] = disk_dir
          endrec[_ECD_ENTRIES_THIS_DISK] = dircount
          endrec[_ECD_ENTRIES_TOTAL] = dircount2
          endrec[_ECD_SIZE] = dirsize
          endrec[_ECD_OFFSET] = diroffset
          return endrec
      
      
      def _EndRecData(fpin):
          """Kembalikan data dari rekaman "Akhir Direktori Pusat", atau None.
      
          Data adalah daftar sembilan item dalam rekaman ZIP "Akhir direktori pusat"
          diikuti oleh item kesepuluh, offset pencarian file dari rekaman ini."""
      
          # Tentukan ukuran file
          if hasattr(fpin, 'bucket'):
              filesize = fpin.filesize
              data = fpin.get_last_content_bytes(sizeEndCentDir)
          else:
              fpin.seek(0, 2)
              filesize = fpin.tell()
      
              # Periksa apakah ini file ZIP tanpa komentar arsip (struktur
              # "akhir direktori pusat" harus menjadi item terakhir dalam
              # file jika ini masalahnya).
              try:
                  fpin.seek(-sizeEndCentDir, 2)
              except OSError:
                  return None
              data = fpin.read()
      
          if (len(data) == sizeEndCentDir and
              data[0:4] == stringEndArchive and
                  data[-2:] == b"\000\000"):
              # Tanda tangannya benar dan tidak ada komentar, lepaskan struktur
              endrec = struct.unpack(structEndArchive, data)
              endrec = list(endrec)
      
              # Tambahkan komentar kosong dan catat offset awal
              endrec.append(b"")
              endrec.append(filesize - sizeEndCentDir)
      
              # Coba baca struktur "Akhir Direktori Pusat ZIP64"
              return _EndRecData64(fpin, -sizeEndCentDir, endrec)
      
          # Entah ini bukan file ZIP, atau ini adalah file ZIP dengan komentar arsip.
          # Cari di akhir file untuk tanda tangan rekaman "akhir direktori pusat".
          # Komentar adalah item terakhir dalam file ZIP dan bisa
          # hingga 64K panjangnya. Diasumsikan bahwa nomor ajaib "akhir direktori pusat"
          # tidak muncul dalam komentar.
          maxCommentStart = max(filesize - (1 << 16) - sizeEndCentDir, 0)
      
          if hasattr(fpin, 'bucket'):
              data = fpin.get_content_bytes(maxCommentStart, -1)
          else:
              fpin.seek(maxCommentStart, 0)
              data = fpin.read()
      
          start = data.rfind(stringEndArchive)
          if start >= 0:
              # Nomor ajaib ditemukan; coba bongkar dan interpretasikan
              recData = data[start:start+sizeEndCentDir]
              if len(recData) != sizeEndCentDir:
                  # File ZIP rusak.
                  return None
              endrec = list(struct.unpack(structEndArchive, recData))
              commentSize = endrec[_ECD_COMMENT_SIZE]  # Seperti yang diklaim oleh file ZIP
              comment = data[start+sizeEndCentDir:start+sizeEndCentDir+commentSize]
              endrec.append(comment)
              endrec.append(maxCommentStart + start)
      
              # Coba baca struktur "Akhir Direktori Pusat ZIP64"
              return _EndRecData64(fpin, maxCommentStart + start - filesize,
                                   endrec)
      
          # Tidak dapat menemukan struktur akhir direktori pusat yang valid
          return None
      
      
      class ZipInfo (object):
          """Kelas dengan atribut yang menggambarkan setiap file dalam arsip ZIP."""
      
          __slots__ = (
              'orig_filename',
              'filename',
              'date_time',
              'compress_type',
              'comment',
              'extra',
              'create_system',
              'create_version',
              'extract_version',
              'reserved',
              'flag_bits',
              'volume',
              'internal_attr',
              'external_attr',
              'header_offset',
              'CRC',
              'compress_size',
              'file_size',
              '_raw_time',
          )
      
          def __init__(self, filename="NoName", date_time=(1980, 1, 1, 0, 0, 0)):
              self.orig_filename = filename   # Nama file asli dalam arsip
      
              # Akhiri nama file pada byte null pertama. Byte null dalam nama file
              # digunakan sebagai trik oleh virus dalam arsip.
              null_byte = filename.find(chr(0))
              if null_byte >= 0:
                  filename = filename[0:null_byte]
              # Ini digunakan untuk memastikan jalur dalam file ZIP yang dihasilkan selalu menggunakan
              # garis miring ke depan sebagai pemisah direktori, seperti yang diminta oleh
              # spesifikasi format ZIP.
              if os.sep != "/" and os.sep in filename:
                  filename = filename.replace(os.sep, "/")
      
              self.filename = filename        # Nama file normal
              self.date_time = date_time      # tahun, bulan, hari, jam, menit, detik
      
              if date_time[0] < 1980:
                  raise ValueError('ZIP tidak mendukung timestamp sebelum 1980')
      
              # Nilai standar:
              self.compress_type = ZIP_STORED  # Jenis kompresi untuk file
              self.comment = b""              # Komentar untuk setiap file
              self.extra = b""                # Data tambahan ZIP
              if sys.platform == 'win32':
                  self.create_system = 0          # Sistem yang membuat arsip ZIP
              else:
                  # Anggap semuanya lainnya adalah unix-y
                  self.create_system = 3          # Sistem yang membuat arsip ZIP
              self.create_version = DEFAULT_VERSION  # Versi yang membuat arsip ZIP
              self.extract_version = DEFAULT_VERSION  # Versi yang diperlukan untuk mengekstrak arsip
              self.reserved = 0               # Harus nol
              self.flag_bits = 0              # Bit flag ZIP
              self.volume = 0                 # Nomor volume header file
              self.internal_attr = 0          # Atribut internal
              self.external_attr = 0          # Atribut file eksternal
              # Atribut lainnya disetel oleh kelas ZipFile:
              # header_offset         Offset byte ke header file
              # CRC                   CRC-32 dari file yang tidak dikompresi
              # compress_size         Ukuran file yang dikompresi
              # file_size             Ukuran file yang tidak dikompresi
      
          def __repr__(self):
              result = ['<%s filename=%r' % (self.__class__.__name__, self.filename)]
              if self.compress_type != ZIP_STORED:
                  result.append(' compress_type=%s' %
                                compressor_names.get(self.compress_type,
                                                     self.compress_type))
              hi = self.external_attr >> 16
              lo = self.external_attr & 0xFFFF
              if hi:
                  result.append(' filemode=%r' % stat.filemode(hi))
              if lo:
                  result.append(' external_attr=%#x' % lo)
              isdir = self.is_dir()
              if not isdir or self.file_size:
                  result.append(' file_size=%r' % self.file_size)
              if ((not isdir or self.compress_size) and
                  (self.compress_type != ZIP_STORED or
                   self.file_size != self.compress_size)):
                  result.append(' compress_size=%r' % self.compress_size)
              result.append('>')
              return ''.join(result)
      
          def FileHeader(self, zip64=None):
              """Kembalikan header per file sebagai string."""
              dt = self.date_time
              dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
              dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
              if self.flag_bits & 0x08:
                  # Setel ini ke nol karena kita menulisnya setelah data file
                  CRC = compress_size = file_size = 0
              else:
                  CRC = self.CRC
                  compress_size = self.compress_size
                  file_size = self.file_size
      
              extra = self.extra
      
              min_version = 0
              if zip64 is None:
                  zip64 = file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT
              if zip64:
                  fmt = '<HHQQ'
                  extra = extra + struct.pack(fmt,
                                              1, struct.calcsize(fmt)-4, file_size, compress_size)
              if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT:
                  if not zip64:
                      raise LargeZipFile("Ukuran file memerlukan ekstensi ZIP64")
                  # File lebih besar dari yang dapat ditampung dalam integer 4 byte,
                  # kembali ke ekstensi ZIP64
                  file_size = 0xffffffff
                  compress_size = 0xffffffff
                  min_version = ZIP64_VERSION
      
              if self.compress_type == ZIP_BZIP2:
                  min_version = max(BZIP2_VERSION, min_version)
              elif self.compress_type == ZIP_LZMA:
                  min_version = max(LZMA_VERSION, min_version)
      
              self.extract_version = max(min_version, self.extract_version)
              self.create_version = max(min_version, self.create_version)
              filename, flag_bits = self._encodeFilenameFlags()
              header = struct.pack(structFileHeader, stringFileHeader,
                                   self.extract_version, self.reserved, flag_bits,
                                   self.compress_type, dostime, dosdate, CRC,
                                   compress_size, file_size,
                                   len(filename), len(extra))
              return header + filename + extra
      
          def _encodeFilenameFlags(self):
              try:
                  return self.filename.encode('ascii'), self.flag_bits
              except UnicodeEncodeError:
                  return self.filename.encode('utf-8'), self.flag_bits | 0x800
      
          def _decodeExtra(self):
              # Coba dekode bidang tambahan.
              extra = self.extra
              unpack = struct.unpack
              while len(extra) >= 4:
                  tp, ln = unpack('<HH', extra[:4])
                  if ln+4 > len(extra):
                      raise BadZipFile(
                          "Bidang tambahan rusak %04x (ukuran=%d)" % (tp, ln))
                  if tp == 0x0001:
                      if ln >= 24:
                          counts = unpack('<QQQ', extra[4:28])
                      elif ln == 16:
                          counts = unpack('<QQ', extra[4:20])
                      elif ln == 8:
                          counts = unpack('<Q', extra[4:12])
                      elif ln == 0:
                          counts = ()
                      else:
                          raise BadZipFile(
                              "Bidang tambahan rusak %04x (ukuran=%d)" % (tp, ln))
      
                      idx = 0
      
                      # Ekstensi ZIP64 (file besar dan/atau arsip besar)
                      if self.file_size in (0xffffffffffffffff, 0xffffffff):
                          self.file_size = counts[idx]
                          idx += 1
      
                      if self.compress_size == 0xFFFFFFFF:
                          self.compress_size = counts[idx]
                          idx += 1
      
                      if self.header_offset == 0xffffffff:
                          old = self.header_offset
                          self.header_offset = counts[idx]
                          idx += 1
      
                  extra = extra[ln+4:]
      
          @classmethod
          def from_file(cls, filename, arcname=None):
              """Bangun ZipInfo yang sesuai untuk file pada sistem file.
      
              filename harus berupa path ke file atau direktori pada sistem file.
      
              arcname adalah nama yang akan dimiliki dalam arsip (secara default,
              ini akan sama dengan filename, tetapi tanpa huruf drive dan dengan
              pemisah jalur terdepan dihapus).
              """
              if isinstance(filename, os.PathLike):
                  filename = os.fspath(filename)
              st = os.stat(filename)
              isdir = stat.S_ISDIR(st.st_mode)
              mtime = time.localtime(st.st_mtime)
              date_time = mtime[0:6]
              # Buat instance ZipInfo untuk menyimpan informasi file
              if arcname is None:
                  arcname = filename
              arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
              while arcname[0] in (os.sep, os.altsep):
                  arcname = arcname[1:]
              if isdir:
                  arcname += '/'
              zinfo = cls(arcname, date_time)
              zinfo.external_attr = (st.st_mode & 0xFFFF) << 16  # Atribut Unix
              if isdir:
                  zinfo.file_size = 0
                  zinfo.external_attr |= 0x10  # Bendera direktori MS-DOS
              else:
                  zinfo.file_size = st.st_size
      
              return zinfo
      
          def is_dir(self):
              """Kembalikan True jika anggota arsip ini adalah direktori."""
              return self.filename[-1] == '/'
      
      
      # Enkripsi ZIP menggunakan primitif CRC32 satu-byte untuk mengacak beberapa
      # kunci internal. Kami memperhatikan bahwa implementasi langsung lebih cepat daripada
      # bergantung pada binascii.crc32().
      
      _crctable = None
      
      
      def _gen_crc(crc):
          for j in range(8):
              if crc & 1:
                  crc = (crc >> 1) ^ 0xEDB88320
              else:
                  crc >>= 1
          return crc
      
      # ZIP mendukung bentuk enkripsi berbasis kata sandi. Meskipun serangan teks biasa dikenal
      # telah ditemukan terhadapnya, itu masih berguna
      # untuk dapat mengambil data dari file semacam itu.
      #
      # Penggunaan:
      #     zd = _ZipDecrypter(mypwd)
      #     plain_bytes = zd(cypher_bytes)
      
      
      def _ZipDecrypter(pwd):
          key0 = 305419896
          key1 = 591751049
          key2 = 878082192
      
          global _crctable
          if _crctable is None:
              _crctable = list(map(_gen_crc, range(256)))
          crctable = _crctable
      
          def crc32(ch, crc):
              """Hitung primitif CRC32 pada satu byte."""
              return (crc >> 8) ^ crctable[(crc ^ ch) & 0xFF]
      
          def update_keys(c):
              nonlocal key0, key1, key2
              key0 = crc32(c, key0)
              key1 = (key1 + (key0 & 0xFF)) & 0xFFFFFFFF
              key1 = (key1 * 134775813 + 1) & 0xFFFFFFFF
              key2 = crc32(key1 >> 24, key2)
      
          for p in pwd:
              update_keys(p)
      
          def decrypter(data):
              """Dekripsi objek bytes."""
              result = bytearray()
              append = result.append
              for c in data:
                  k = key2 | 2
                  c ^= ((k * (k ^ 1)) >> 8) & 0xFF
                  update_keys(c)
                  append(c)
              return bytes(result)
      
          return decrypter
      
      
      class LZMACompressor:
      
          def __init__(self):
              self._comp = None
      
          def _init(self):
              props = lzma._encode_filter_properties({'id': lzma.FILTER_LZMA1})
              self._comp = lzma.LZMACompressor(lzma.FORMAT_RAW, filters=[
                  lzma._decode_filter_properties(lzma.FILTER_LZMA1, props)
              ])
              return struct.pack('<BBH', 9, 4, len(props)) + props
      
          def compress(self, data):
              if self._comp is None:
                  return self._init() + self._comp.compress(data)
              return self._comp.compress(data)
      
          def flush(self):
              if self._comp is None:
                  return self._init() + self._comp.flush()
              return self._comp.flush()
      
      
      class LZMADecompressor:
      
          def __init__(self):
              self._decomp = None
              self._unconsumed = b''
              self.eof = False
      
          def decompress(self, data):
              if self._decomp is None:
                  self._unconsumed += data
                  if len(self._unconsumed) <= 4:
                      return b''
                  psize, = struct.unpack('<H', self._unconsumed[2:4])
                  if len(self._unconsumed) <= 4 + psize:
                      return b''
      
                  self._decomp = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[
                      lzma._decode_filter_properties(lzma.FILTER_LZMA1,
                                                     self._unconsumed[4:4 + psize])
                  ])
                  data = self._unconsumed[4 + psize:]
                  del self._unconsumed
      
              result = self._decomp.decompress(data)
              self.eof = self._decomp.eof
              return result
      
      
      compressor_names = {
          0: 'store',
          1: 'shrink',
          2: 'reduce',
          3: 'reduce',
          4: 'reduce',
          5: 'reduce',
          6: 'implode',
          7: 'tokenize',
          8: 'deflate',
          9: 'deflate64',
          10: 'implode',
          12: 'bzip2',
          14: 'lzma',
          18: 'terse',
          19: 'lz77',
          97: 'wavpack',
          98: 'ppmd',
      }
      
      
      def _check_compression(compression):
          if compression == ZIP_STORED:
              pass
          elif compression == ZIP_DEFLATED:
              if not zlib:
                  raise RuntimeError(
                      "Kompresi memerlukan modul (tidak tersedia) zlib")
          elif compression == ZIP_BZIP2:
              if not bz2:
                  raise RuntimeError(
                      "Kompresi memerlukan modul (tidak tersedia) bz2")
          elif compression == ZIP_LZMA:
              if not lzma:
                  raise RuntimeError(
                      "Kompresi memerlukan modul (tidak tersedia) lzma")
          else:
              raise NotImplementedError("Metode kompresi tersebut tidak didukung")
      
      
      def _get_compressor(compress_type):
          if compress_type == ZIP_DEFLATED:
              return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                      zlib.DEFLATED, -15)
          elif compress_type == ZIP_BZIP2:
              return bz2.BZ2Compressor()
          elif compress_type == ZIP_LZMA:
              return LZMACompressor()
          else:
              return None
      
      
      def _get_decompressor(compress_type):
          if compress_type == ZIP_STORED:
              return None
          elif compress_type == ZIP_DEFLATED:
              return zlib.decompressobj(-15)
          elif compress_type == ZIP_BZIP2:
              return bz2.BZ2Decompressor()
          elif compress_type == ZIP_LZMA:
              return LZMADecompressor()
          else:
              descr = compressor_names.get(compress_type)
              if descr:
                  raise NotImplementedError(
                      "tipe kompresi %d (%s)" % (compress_type, descr))
              else:
                  raise NotImplementedError("tipe kompresi %d" % (compress_type,))
      
      
      class _SharedFile:
          def __init__(self, file, pos, close, lock, writing):
              self._file = file
              self._pos = pos
              self._close = close
              self._lock = lock
              self._writing = writing
              if hasattr(self._file, "bucket"):
                  self._fp = self._file.get_reader(pos, -1)
      
          def read(self, n=-1):
              with self._lock:
                  if self._writing():
                      raise ValueError("Tidak bisa membaca dari file ZIP saat ada "
                                       "handle penulisan terbuka padanya. "
                                       "Tutup handle penulisan sebelum mencoba membaca.")
      
                  if hasattr(self._file, "bucket"):
                      data = self._fp.read(n)
                      self._pos += n
                  else:
                      self._file.seek(self._pos)
                      data = self._file.read(n)
                      self._pos = self._file.tell()
                  return data
      
          def close(self):
              if self._file is not None:
                  fileobj = self._file
                  self._file = None
                  self._close(fileobj)
      
      # Berikan metode tell untuk aliran yang tidak dapat dicari
      
      
      class _Tellable:
          def __init__(self, fp):
              self.fp = fp
              self.offset = 0
      
          def write(self, data):
              n = self.fp.write(data)
              self.offset += n
              return n
      
          def tell(self):
              return self.offset
      
          def flush(self):
              self.fp.flush()
      
          def close(self):
              self.fp.close()
      
      
      class ZipExtFile(io.BufferedIOBase):
          """Objek seperti file untuk membaca anggota arsip.
             Dikembalikan oleh ZipFile.open().
          """
      
          # Ukuran maksimum yang didukung oleh dekompresor.
          MAX_N = 1 << 31 - 1
      
          # Baca dari file terkompresi dalam blok 4k.
          MIN_READ_SIZE = 4096
      
          def __init__(self, fileobj, mode, zipinfo, decrypter=None,
                       close_fileobj=False):
              self._fileobj = fileobj
              self._decrypter = decrypter
              self._close_fileobj = close_fileobj
      
              self._compress_type = zipinfo.compress_type
              self._compress_left = zipinfo.compress_size
              self._left = zipinfo.file_size
      
              self._decompressor = _get_decompressor(self._compress_type)
      
              self._eof = False
              self._readbuffer = b''
              self._offset = 0
      
              self.newlines = None
      
              # Sesuaikan ukuran baca untuk file terenkripsi karena 12 byte pertama
              # digunakan untuk informasi enkripsi/kata sandi.
              if self._decrypter is not None:
                  self._compress_left -= 12
      
              self.mode = mode
              self.name = zipinfo.filename
      
              if hasattr(zipinfo, 'CRC'):
                  self._expected_crc = zipinfo.CRC
                  self._running_crc = crc32(b'')
              else:
                  self._expected_crc = None
      
          def __repr__(self):
              result = ['<%s.%s' % (self.__class__.__module__,
                                    self.__class__.__qualname__)]
              if not self.closed:
                  result.append(' name=%r mode=%r' % (self.name, self.mode))
                  if self._compress_type != ZIP_STORED:
                      result.append(' compress_type=%s' %
                                    compressor_names.get(self._compress_type,
                                                         self._compress_type))
              else:
                  result.append(' [closed]')
              result.append('>')
              return ''.join(result)
      
          def readline(self, limit=-1):
              """Baca dan kembalikan satu baris dari aliran.
      
              Jika limit ditentukan, paling banyak limit byte akan dibaca.
              """
      
              if limit < 0:
                  # Pintasan kasus umum - baris baru ditemukan di buffer.
                  i = self._readbuffer.find(b'\n', self._offset) + 1
                  if i > 0:
                      line = self._readbuffer[self._offset: i]
                      self._offset = i
                      return line
      
              return io.BufferedIOBase.readline(self, limit)
      
          def peek(self, n=1):
              """Kembalikan byte yang di-buffer tanpa maju posisi."""
              if n > len(self._readbuffer) - self._offset:
                  chunk = self.read(n)
                  if len(chunk) > self._offset:
                      self._readbuffer = chunk + self._readbuffer[self._offset:]
                      self._offset = 0
                  else:
                      self._offset -= len(chunk)
      
              # Kembalikan hingga 512 byte untuk mengurangi overhead alokasi untuk loop ketat.
              return self._readbuffer[self._offset: self._offset + 512]
      
          def readable(self):
              return True
      
          def read(self, n=-1):
              """Baca dan kembalikan hingga n byte.
              Jika argumen dihilangkan, None, atau negatif, data dibaca dan dikembalikan sampai EOF tercapai..
              """
              if n is None or n < 0:
                  buf = self._readbuffer[self._offset:]
                  self._readbuffer = b''
                  self._offset = 0
                  while not self._eof:
                      buf += self._read1(self.MAX_N)
                  return buf
      
              end = n + self._offset
              if end < len(self._readbuffer):
                  buf = self._readbuffer[self._offset:end]
                  self._offset = end
                  return buf
      
              n = end - len(self._readbuffer)
              buf = self._readbuffer[self._offset:]
              self._readbuffer = b''
              self._offset = 0
              while n > 0 and not self._eof:
                  data = self._read1(n)
                  if n < len(data):
                      self._readbuffer = data
                      self._offset = n
                      buf += data[:n]
                      break
                  buf += data
                  n -= len(data)
              return buf
      
          def _update_crc(self, newdata):
              # Perbarui CRC menggunakan data yang diberikan.
              if self._expected_crc is None:
                  # Tidak perlu menghitung CRC jika kita tidak memiliki nilai referensi
                  return
              self._running_crc = crc32(newdata, self._running_crc)
              # Periksa CRC jika kita berada di akhir file
              if self._eof and self._running_crc != self._expected_crc:
                  raise BadZipFile("Bad CRC-32 untuk file %r" % self.name)
      
          def read1(self, n):
              """Baca hingga n byte dengan paling banyak satu panggilan sistem read()."""
      
              if n is None or n < 0:
                  buf = self._readbuffer[self._offset:]
                  self._readbuffer = b''
                  self._offset = 0
                  while not self._eof:
                      data = self._read1(self.MAX_N)
                      if data:
                          buf += data
                          break
                  return buf
      
              end = n + self._offset
              if end < len(self._readbuffer):
                  buf = self._readbuffer[self._offset:end]
                  self._offset = end
                  return buf
      
              n = end - len(self._readbuffer)
              buf = self._readbuffer[self._offset:]
              self._readbuffer = b''
              self._offset = 0
              if n > 0:
                  while not self._eof:
                      data = self._read1(n)
                      if n < len(data):
                          self._readbuffer = data
                          self._offset = n
                          buf += data[:n]
                          break
                      if data:
                          buf += data
                          break
              return buf
      
          def _read1(self, n):
              # Baca hingga n byte terkompresi dengan paling banyak satu panggilan sistem read(),
              # dekripsi dan dekompresi mereka.
              if self._eof or n <= 0:
                  return b''
      
              # Baca dari file.
              if self._compress_type == ZIP_DEFLATED:
                  # Tangani data yang belum dikonsumsi.
                  data = self._decompressor.unconsumed_tail
                  if n > len(data):
                      data += self._read2(n - len(data))
              else:
                  data = self._read2(n)
      
              if self._compress_type == ZIP_STORED:
                  self._eof = self._compress_left <= 0
              elif self._compress_type == ZIP_DEFLATED:
                  n = max(n, self.MIN_READ_SIZE)
                  data = self._decompressor.decompress(data, n)
                  self._eof = (self._decompressor.eof or
                               self._compress_left <= 0 and
                               not self._decompressor.unconsumed_tail)
                  if self._eof:
                      data += self._decompressor.flush()
              else:
                  data = self._decompressor.decompress(data)
                  self._eof = self._decompressor.eof or self._compress_left <= 0
      
              data = data[:self._left]
              self._left -= len(data)
              if self._left <= 0:
                  self._eof = True
              self._update_crc(data)
              return data
      
          def _read2(self, n):
              if self._compress_left <= 0:
                  return b''
      
              n = max(n, self.MIN_READ_SIZE)
              n = min(n, self._compress_left)
      
              data = self._fileobj.read(n)
              self._compress_left -= len(data)
              if not data:
                  raise EOFError
      
              if self._decrypter is not None:
                  data = self._decrypter(data)
              return data
      
          def close(self):
              try:
                  if self._close_fileobj:
                      self._fileobj.close()
              finally:
                  super().close()
      
      
      class _ZipWriteFile(io.BufferedIOBase):
          def __init__(self, zf, zinfo, zip64):
              self._zinfo = zinfo
              self._zip64 = zip64
              self._zipfile = zf
              self._compressor = _get_compressor(zinfo.compress_type)
              self._file_size = 0
              self._compress_size = 0
              self._crc = 0
      
          @property
          def _fileobj(self):
              return self._zipfile.fp
      
          def writable(self):
              return True
      
          def write(self, data):
              if self.closed:
                  raise ValueError('Operasi I/O pada file yang ditutup.')
              nbytes = len(data)
              self._file_size += nbytes
              self._crc = crc32(data, self._crc)
              if self._compressor:
                  data = self._compressor.compress(data)
                  self._compress_size += len(data)
              self._fileobj.write(data)
              return nbytes
      
          def close(self):
              if self.closed:
                  return
              super().close()
              # Flush data apa pun dari kompresor, dan perbarui info header
              if self._compressor:
                  buf = self._compressor.flush()
                  self._compress_size += len(buf)
                  self._fileobj.write(buf)
                  self._zinfo.compress_size = self._compress_size
              else:
                  self._zinfo.compress_size = self._file_size
              self._zinfo.CRC = self._crc
              self._zinfo.file_size = self._file_size
      
              # Tulis ulang info header yang diperbarui
              if self._zinfo.flag_bits & 0x08:
                  # Tulis CRC dan ukuran file setelah data file
                  fmt = '<LQQ' if self._zip64 else '<LLL'
                  self._fileobj.write(struct.pack(fmt, self._zinfo.CRC,
                                                  self._zinfo.compress_size, self._zinfo.file_size))
                  self._zipfile.start_dir = self._fileobj.tell()
              else:
                  if not self._zip64:
                      if self._file_size > ZIP64_LIMIT:
                          raise RuntimeError('Ukuran file melebihi batas ZIP64 secara tak terduga')
                      if self._compress_size > ZIP64_LIMIT:
                          raise RuntimeError('Ukuran terkompresi melebihi batas ZIP64 secara tak terduga')
                  # Mundur dan tulis header file (yang sekarang akan mencakup
                  # CRC dan ukuran file yang benar)
      
                  # Simpan posisi saat ini dalam file
                  self._zipfile.start_dir = self._fileobj.tell()
                  self._fileobj.seek(self._zinfo.header_offset)
                  self._fileobj.write(self._zinfo.FileHeader(self._zip64))
                  self._fileobj.seek(self._zipfile.start_dir)
      
              self._zipfile._writing = False
      
              # Berhasil ditulis: Tambahkan file ke cache kami
              self._zipfile.filelist.append(self._zinfo)
              self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo
      
      
      class ZipFile:
          """ Kelas dengan metode untuk membuka, membaca, menulis, menutup, daftar file zip.
      
          z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=True)
      
          file: Baik path ke file, atau objek seperti file.
                Jika itu adalah path, file akan dibuka dan ditutup oleh ZipFile.
          mode: Mode bisa berupa baca 'r', tulis 'w', buat eksklusif 'x',
                atau tambahkan 'a'.
          compression: ZIP_STORED (tanpa kompresi), ZIP_DEFLATED (memerlukan zlib),
                       ZIP_BZIP2 (memerlukan bz2) atau ZIP_LZMA (memerlukan lzma).
          allowZip64: jika Benar ZipFile akan membuat file dengan ekstensi ZIP64 saat
                      diperlukan, jika tidak akan memunculkan pengecualian saat ini akan
                      diperlukan.
      
          """
      
          fp = None                   # Diatur di sini karena __del__ memeriksanya
          _windows_illegal_name_trans_table = None
      
          def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True):
              """Buka file ZIP dengan mode baca 'r', tulis 'w', buat eksklusif 'x',
              atau tambahkan 'a'."""
              if mode not in ('r', 'w', 'x', 'a'):
                  raise ValueError("ZipFile memerlukan mode 'r', 'w', 'x', atau 'a'")
      
              _check_compression(compression)
      
              self._allowZip64 = allowZip64
              self._didModify = False
              self.debug = 0  # Tingkat pencetakan: 0 hingga 3
              self.NameToInfo = {}    # Temukan informasi file yang diberikan nama
              self.filelist = []      # Daftar instance ZipInfo untuk arsip
              self.compression = compression  # Metode kompresi
              self.mode = mode
              self.pwd = None
              self._comment = b''
      
              # Periksa apakah kita diberikan objek seperti file
              if isinstance(file, os.PathLike):
                  file = os.fspath(file)
              if isinstance(file, str):
                  # Tidak, itu adalah nama file
                  self._filePassed = 0
                  self.filename = file
                  modeDict = {'r': 'rb', 'w': 'w+b', 'x': 'x+b', 'a': 'r+b',
                              'r+b': 'w+b', 'w+b': 'wb', 'x+b': 'xb'}
                  filemode = modeDict[mode]
                  while True:
                      try:
                          self.fp = io.open(file, filemode)
                      except OSError:
                          if filemode in modeDict:
                              filemode = modeDict[filemode]
                              continue
                          raise
                      break
              else:
                  self._filePassed = 1
                  self.fp = file
                  self.filename = getattr(file, 'name', None)
              self._fileRefCnt = 1
              self._lock = threading.RLock()
              self._seekable = True
              self._writing = False
      
              try:
                  if mode == 'r':
                      self._RealGetContents()
                  elif mode in ('w', 'x'):
                      # atur bendera dimodifikasi sehingga direktori pusat ditulis
                      # bahkan jika tidak ada file yang ditambahkan ke arsip
                      self._didModify = True
                      try:
                          self.start_dir = self.fp.tell()
                      except (AttributeError, OSError):
                          self.fp = _Tellable(self.fp)
                          self.start_dir = 0
                          self._seekable = False
                      else:
                          # Beberapa objek seperti file dapat memberikan tell() tetapi tidak seek()
                          try:
                              self.fp.seek(self.start_dir)
                          except (AttributeError, OSError):
                              self._seekable = False
                  elif mode == 'a':
                      try:
                          # Lihat apakah file adalah file zip
                          self._RealGetContents()
                          # cari ke awal direktori dan timpa
                          self.fp.seek(self.start_dir)
                      except BadZipFile:
                          # file bukan file zip, cukup tambahkan
                          self.fp.seek(0, 2)
      
                          # atur bendera dimodifikasi sehingga direktori pusat ditulis
                          # bahkan jika tidak ada file yang ditambahkan ke arsip
                          self._didModify = True
                          self.start_dir = self.fp.tell()
                  else:
                      raise ValueError("Mode harus 'r', 'w', 'x', atau 'a'")
              except:
                  fp = self.fp
                  self.fp = None
                  self._fpclose(fp)
                  raise
      
          def __enter__(self):
              return self
      
          def __exit__(self, type, value, traceback):
              self.close()
      
          def __repr__(self):
              result = ['<%s.%s' % (self.__class__.__module__,
                                    self.__class__.__qualname__)]
              if self.fp is not None:
                  if self._filePassed:
                      result.append(' file=%r' % self.fp)
                  elif self.filename is not None:
                      result.append(' filename=%r' % self.filename)
                  result.append(' mode=%r' % self.mode)
              else:
                  result.append(' [closed]')
              result.append('>')
              return ''.join(result)
      
          def _RealGetContents(self):
              """Baca isi tabel konten untuk file ZIP."""
              fp = self.fp
              try:
                  endrec = _EndRecData(fp)
              except OSError:
                  raise BadZipFile("File bukan file zip")
              if not endrec:
                  raise BadZipFile("File bukan file zip")
              if self.debug > 1:
                  print(endrec)
              size_cd = endrec[_ECD_SIZE]             # byte dalam direktori pusat
              offset_cd = endrec[_ECD_OFFSET]         # offset direktori pusat
              self._comment = endrec[_ECD_COMMENT]    # komentar arsip
      
              # "concat" adalah nol, kecuali jika zip digabungkan dengan file lain
              concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
              if endrec[_ECD_SIGNATURE] == stringEndArchive64:
                  # Jika struktur ekstensi Zip64 ada, akun untuk itu
                  concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator)
      
              if self.debug > 2:
                  inferred = concat + offset_cd
                  print("given, inferred, offset", offset_cd, inferred, concat)
              # self.start_dir: Posisi awal direktori pusat
              self.start_dir = offset_cd + concat
      
              if hasattr(fp, "bucket"):
                  data = fp.get_content_bytes(
                      self.start_dir, self.start_dir+size_cd-1)
              else:
                  fp.seek(self.start_dir, 0)
                  data = fp.read(size_cd)
      
              fp = io.BytesIO(data)
              total = 0
              while total < size_cd:
                  centdir = fp.read(sizeCentralDir)
                  if len(centdir) != sizeCentralDir:
                      raise BadZipFile("Direktori pusat terpotong")
                  centdir = struct.unpack(structCentralDir, centdir)
                  if centdir[_CD_SIGNATURE] != stringCentralDir:
                      raise BadZipFile("Nomor ajaib buruk untuk direktori pusat")
                  if self.debug > 2:
                      print(centdir)
                  filename = fp.read(centdir[_CD_FILENAME_LENGTH])
                  flags = centdir[5]
                  if flags & 0x800:
                      # Ekstensi nama file UTF-8
                      filename = filename.decode('utf-8')
                  else:
                      # Pengkodean nama file ZIP historis
                      filename = filename.decode('cp437')
                  # Buat instance ZipInfo untuk menyimpan informasi file
                  x = ZipInfo(filename)
                  x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
                  x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
                  x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET]
                  (x.create_version, x.create_system, x.extract_version, x.reserved,
                   x.flag_bits, x.compress_type, t, d,
                   x.CRC, x.compress_size, x.file_size) = centdir[1:12]
                  if x.extract_version > MAX_EXTRACT_VERSION:
                      raise NotImplementedError("versi file zip %.1f" %
                                                (x.extract_version / 10))
                  x.volume, x.internal_attr, x.external_attr = centdir[15:18]
                  # Ubah kode tanggal/waktu menjadi (tahun, bulan, hari, jam, menit, detik)
                  x._raw_time = t
                  x.date_time = ((d >> 9)+1980, (d >> 5) & 0xF, d & 0x1F,
                                 t >> 11, (t >> 5) & 0x3F, (t & 0x1F) * 2)
      
                  x._decodeExtra()
                  x.header_offset = x.header_offset + concat
                  self.filelist.append(x)
                  self.NameToInfo[x.filename] = x
      
                  # perbarui total byte yang dibaca dari direktori pusat
                  total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH]
                           + centdir[_CD_EXTRA_FIELD_LENGTH]
                           + centdir[_CD_COMMENT_LENGTH])
      
                  if self.debug > 2:
                      print("total", total)
      
          def namelist(self):
              """Kembalikan daftar nama file dalam arsip."""
              return [data.filename for data in self.filelist]
      
          def infolist(self):
              """Kembalikan daftar instance kelas ZipInfo untuk file dalam
              arsip."""
              return self.filelist
      
          def printdir(self, file=None):
              """Cetak daftar isi untuk file zip."""
              print("%-46s %19s %12s" % ("Nama File", "Dimodifikasi    ", "Ukuran"),
                    file=file)
              for zinfo in self.filelist:
                  date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6]
                  print("%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size),
                        file=file)
      
          def testzip(self):
              """Baca semua file dan periksa CRC."""
              chunk_size = 2 ** 20
              for zinfo in self.filelist:
                  try:
                      # Baca dalam blok, untuk menghindari OverflowError atau
                      # MemoryError dengan file tertanam yang sangat besar.
                      with self.open(zinfo.filename, "r") as f:
                          while f.read(chunk_size):     # Periksa CRC-32
                              pass
                  except BadZipFile:
                      return zinfo.filename
      
          def getinfo(self, name):
              """Kembalikan instance ZipInfo yang diberikan 'name'."""
              info = self.NameToInfo.get(name)
              if info is None:
                  raise KeyError(
                      'Tidak ada item bernama %r dalam arsip' % name)
      
              return info
      
          def setpassword(self, pwd):
              """Setel kata sandi default untuk file terenkripsi."""
              if pwd and not isinstance(pwd, bytes):
                  raise TypeError("pwd: diharapkan bytes, dapatkan %s" % type(pwd).__name__)
              if pwd:
                  self.pwd = pwd
              else:
                  self.pwd = None
      
          @property
          def comment(self):
              """Teks komentar yang terkait dengan file ZIP."""
              return self._comment
      
          @comment.setter
          def comment(self, comment):
              if not isinstance(comment, bytes):
                  raise TypeError("comment: diharapkan bytes, dapatkan %s" %
                                  type(comment).__name__)
              # periksa panjang komentar yang valid
              if len(comment) > ZIP_MAX_COMMENT:
                  import warnings
                  warnings.warn('Komentar arsip terlalu panjang; memotong menjadi %d byte'
                                % ZIP_MAX_COMMENT, stacklevel=2)
                  comment = comment[:ZIP_MAX_COMMENT]
              self._comment = comment
              self._didModify = True
      
          def read(self, name, pwd=None):
              """Kembalikan byte file (sebagai string) untuk nama."""
              with self.open(name, "r", pwd) as fp:
                  return fp.read()
      
          def open(self, name, mode="r", pwd=None, *, force_zip64=False):
              """Kembalikan objek seperti file untuk 'name'.
      
              name adalah string untuk nama file dalam file ZIP, atau objek ZipInfo.
      
              mode harus 'r' untuk membaca file yang sudah ada dalam file ZIP, atau 'w' untuk
              menulis ke file yang baru ditambahkan ke arsip.
      
              pwd adalah kata sandi untuk mendekripsi file (hanya digunakan untuk membaca).
      
              Saat menulis, jika ukuran file tidak diketahui sebelumnya tetapi mungkin melebihi
              2 GiB, berikan force_zip64 untuk menggunakan format ZIP64, yang dapat menangani file besar. Jika ukuran diketahui sebelumnya, sebaiknya berikan instance ZipInfo
              untuk name, dengan zinfo.file_size disetel.
              """
              if mode not in {"r", "w"}:
                  raise ValueError('open() memerlukan mode "r" atau "w"')
              if pwd and not isinstance(pwd, bytes):
                  raise TypeError("pwd: diharapkan bytes, dapatkan %s" % type(pwd).__name__)
              if pwd and (mode == "w"):
                  raise ValueError("pwd hanya didukung untuk membaca file")
              if not self.fp:
                  raise ValueError(
                      "Upaya menggunakan arsip ZIP yang sudah ditutup")
      
              # Pastikan kita memiliki objek info
              if isinstance(name, ZipInfo):
                  # 'name' sudah merupakan objek info
                  zinfo = name
              elif mode == 'w':
                  zinfo = ZipInfo(name)
                  zinfo.compress_type = self.compression
              else:
                  # Dapatkan objek info untuk name
                  zinfo = self.getinfo(name)
      
              if mode == 'w':
                  return self._open_to_write(zinfo, force_zip64=force_zip64)
      
              if self._writing:
                  raise ValueError("Tidak bisa membaca dari file ZIP saat ada "
                                   "handle penulisan terbuka padanya. "
                                   "Tutup handle penulisan sebelum mencoba membaca.")
      
              # Buka untuk membaca:
              self._fileRefCnt += 1
              zef_file = _SharedFile(self.fp, zinfo.header_offset,
                                     self._fpclose, self._lock, lambda: self._writing)
              try:
                  # Lewati header file:
                  fheader = zef_file.read(sizeFileHeader)
                  if len(fheader) != sizeFileHeader:
                      raise BadZipFile("Header file terpotong")
                  fheader = struct.unpack(structFileHeader, fheader)
                  if fheader[_FH_SIGNATURE] != stringFileHeader:
                      raise BadZipFile("Nomor ajaib buruk untuk header file")
      
                  fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
                  if fheader[_FH_EXTRA_FIELD_LENGTH]:
                      zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])
      
                  if zinfo.flag_bits & 0x20:
                      # Data tambalan terkompresi Zip 2.7
                      raise NotImplementedError(
                          "data tambalan terkompresi (bit flag 5)")
      
                  if zinfo.flag_bits & 0x40:
                      # enkripsi kuat
                      raise NotImplementedError("enkripsi kuat (bit flag 6)")
      
                  if zinfo.flag_bits & 0x800:
                      # Nama file UTF-8
                      fname_str = fname.decode("utf-8")
                  else:
                      fname_str = fname.decode("cp437")
      
                  if fname_str != zinfo.orig_filename:
                      raise BadZipFile(
                          'Nama file dalam direktori %r dan header %r berbeda.'
                          % (zinfo.orig_filename, fname))
      
                  # periksa flag terenkripsi dan tangani kata sandi
                  is_encrypted = zinfo.flag_bits & 0x1
                  zd = None
                  if is_encrypted:
                      if not pwd:
                          pwd = self.pwd
                      if not pwd:
                          raise RuntimeError("File %r terenkripsi, kata sandi diperlukan untuk ekstraksi" % name)
      
                      zd = _ZipDecrypter(pwd)
                      # 12 byte pertama dalam aliran sandi adalah header enkripsi
                      # digunakan untuk memperkuat algoritma. 11 byte pertama sepenuhnya acak, sedangkan byte ke-12 berisi MSB dari CRC,
                      # atau MSB dari waktu file tergantung pada jenis header
                      # dan digunakan untuk memeriksa kebenaran kata sandi.
                      header = zef_file.read(12)
                      h = zd(header[0:12])
                      if zinfo.flag_bits & 0x8:
                          # bandingkan dengan tipe file dari header lokal yang diperluas
                          check_byte = (zinfo._raw_time >> 8) & 0xff
                      else:
                          # bandingkan dengan CRC jika tidak
                          check_byte = (zinfo.CRC >> 24) & 0xff
                      if h[11] != check_byte:
                          raise RuntimeError("Kata sandi buruk untuk file %r" % name)
      
                  return ZipExtFile(zef_file, mode, zinfo, zd, True)
              except:
                  zef_file.close()
                  raise
      
          def _open_to_write(self, zinfo, force_zip64=False):
              if force_zip64 and not self._allowZip64:
                  raise ValueError(
                      "force_zip64 adalah Benar, tetapi allowZip64 adalah Salah saat membuka "
                      "file ZIP."
                  )
              if self._writing:
                  raise ValueError("Tidak bisa menulis ke file ZIP saat ada "
                                   "handle penulisan lain yang terbuka padanya. "
                                   "Tutup handle pertama sebelum membuka yang lain.")
      
              # Ukuran dan CRC ditimpa dengan data yang benar setelah memproses file
              if not hasattr(zinfo, 'file_size'):
                  zinfo.file_size = 0
              zinfo.compress_size = 0
              zinfo.CRC = 0
      
              zinfo.flag_bits = 0x00
              if zinfo.compress_type == ZIP_LZMA:
                  # Data terkompresi mencakup penanda akhir aliran (EOS)
                  zinfo.flag_bits |= 0x02
              if not self._seekable:
                  zinfo.flag_bits |= 0x08
      
              if not zinfo.external_attr:
                  zinfo.external_attr = 0o600 << 16  # izin: ?rw-------
      
              # Ukuran terkompresi bisa lebih besar dari ukuran tidak terkompresi
              zip64 = self._allowZip64 and \
                  (force_zip64 or zinfo.file_size * 1.05 > ZIP64_LIMIT)
      
              if self._seekable:
                  self.fp.seek(self.start_dir)
              zinfo.header_offset = self.fp.tell()
      
              self._writecheck(zinfo)
              self._didModify = True
      
              self.fp.write(zinfo.FileHeader(zip64))
      
              self._writing = True
              return _ZipWriteFile(self, zinfo, zip64)
      
          def extract(self, member, path=None, pwd=None):
              """Ekstrak anggota dari arsip ke direktori kerja saat ini,
                 menggunakan nama lengkapnya. Informasi filenya diekstrak seakurat mungkin. `member` bisa berupa nama file atau objek ZipInfo. Anda dapat
                 tentukan direktori berbeda menggunakan `path`.
              """
              if path is None:
                  path = os.getcwd()
              else:
                  path = os.fspath(path)
      
              return self._extract_member(member, path, pwd)
      
          def extractall(self, path=None, members=None, pwd=None):
              """Ekstrak semua anggota dari arsip ke direktori kerja saat ini. `path` menentukan direktori berbeda untuk ekstraksi.
                 `members` bersifat opsional dan harus merupakan subset dari daftar yang dikembalikan
                 oleh namelist().
              """
              if members is None:
                  members = self.namelist()
      
              if path is None:
                  path = os.getcwd()
              else:
                  path = os.fspath(path)
      
              for zipinfo in members:
                  self._extract_member(zipinfo, path, pwd)
      
          @classmethod
          def _sanitize_windows_name(cls, arcname, pathsep):
              """Ganti karakter buruk dan hapus titik di bagian akhir."""
              table = cls._windows_illegal_name_trans_table
              if not table:
                  illegal = ':<>|"?*'
                  table = str.maketrans(illegal, '_' * len(illegal))
                  cls._windows_illegal_name_trans_table = table
              arcname = arcname.translate(table)
              # hapus titik di akhir
              arcname = (x.rstrip('.') for x in arcname.split(pathsep))
              # gabungkan kembali, menghapus bagian kosong.
              arcname = pathsep.join(x for x in arcname if x)
              return arcname
      
          def _extract_member(self, member, targetpath, pwd):
              """Ekstrak objek ZipInfo 'member' ke file fisik
                 pada jalur targetpath.
              """
              if not isinstance(member, ZipInfo):
                  member = self.getinfo(member)
      
              # bangun jalur tujuan, mengganti
              # garis miring ke pemisah jalur spesifik platform.
              arcname = member.filename.replace('/', os.path.sep)
      
              if os.path.altsep:
                  arcname = arcname.replace(os.path.altsep, os.path.sep)
              # interpretasikan jalur absolut sebagai relatif, hapus huruf drive atau
              # jalur UNC, pemisah berlebih, "." dan ".." komponen.
              arcname = os.path.splitdrive(arcname)[1]
              invalid_path_parts = ('', os.path.curdir, os.path.pardir)
              arcname = os.path.sep.join(x for x in arcname.split(os.path.sep)
                                         if x not in invalid_path_parts)
              if os.path.sep == '\\':
                  # filter karakter ilegal di Windows
                  arcname = self._sanitize_windows_name(arcname, os.path.sep)
      
              targetpath = os.path.join(targetpath, arcname)
              targetpath = os.path.normpath(targetpath)
      
              # Buat semua direktori atas jika perlu.
              upperdirs = os.path.dirname(targetpath)
              if upperdirs and not os.path.exists(upperdirs):
                  os.makedirs(upperdirs)
      
              if member.is_dir():
                  if not os.path.isdir(targetpath):
                      os.mkdir(targetpath)
                  return targetpath
      
              with self.open(member, pwd=pwd) as source, \
                      open(targetpath, "wb") as target:
                  shutil.copyfileobj(source, target)
      
              return targetpath
      
          def _writecheck(self, zinfo):
              """Periksa kesalahan sebelum menulis file ke arsip."""
              if zinfo.filename in self.NameToInfo:
                  import warnings
                  warnings.warn('Nama duplikat: %r' % zinfo.filename, stacklevel=3)
              if self.mode not in ('w', 'x', 'a'):
                  raise ValueError("write() memerlukan mode 'w', 'x', atau 'a'")
              if not self.fp:
                  raise ValueError(
                      "Upaya menulis ke arsip ZIP yang sudah ditutup")
              _check_compression(zinfo.compress_type)
              if not self._allowZip64:
                  requires_zip64 = None
                  if len(self.filelist) >= ZIP_FILECOUNT_LIMIT:
                      requires_zip64 = "Jumlah file"
                  elif zinfo.file_size > ZIP64_LIMIT:
                      requires_zip64 = "Ukuran file"
                  elif zinfo.header_offset > ZIP64_LIMIT:
                      requires_zip64 = "Ukuran file zip"
                  if requires_zip64:
                      raise LargeZipFile(requires_zip64 +
                                         " akan memerlukan ekstensi ZIP64")
      
          def write(self, filename, arcname=None, compress_type=None):
              """Masukkan byte dari filename ke dalam arsip dengan nama
              arcname."""
              if not self.fp:
                  raise ValueError(
                      "Upaya menulis ke arsip ZIP yang sudah ditutup")
              if self._writing:
                  raise ValueError(
                      "Tidak bisa menulis ke arsip ZIP saat ada handle penulisan terbuka"
                  )
      
              zinfo = ZipInfo.from_file(filename, arcname)
      
              if zinfo.is_dir():
                  zinfo.compress_size = 0
                  zinfo.CRC = 0
              else:
                  if compress_type is not None:
                      zinfo.compress_type = compress_type
                  else:
                      zinfo.compress_type = self.compression
      
              if zinfo.is_dir():
                  with self._lock:
                      if self._seekable:
                          self.fp.seek(self.start_dir)
                      zinfo.header_offset = self.fp.tell()  # Mulai byte header
                      if zinfo.compress_type == ZIP_LZMA:
                          # Data terkompresi mencakup penanda akhir aliran (EOS)
                          zinfo.flag_bits |= 0x02
      
                      self._writecheck(zinfo)
                      self._didModify = True
      
                      self.filelist.append(zinfo)
                      self.NameToInfo[zinfo.filename] = zinfo
                      self.fp.write(zinfo.FileHeader(False))
                      self.start_dir = self.fp.tell()
              else:
                  with open(filename, "rb") as src, self.open(zinfo, 'w') as dest:
                      shutil.copyfileobj(src, dest, 1024*8)
      
          def writestr(self, zinfo_or_arcname, data, compress_type=None):
              """Tulis file ke dalam arsip. Kontennya adalah 'data', yang
              bisa berupa instance 'str' atau 'bytes'; jika itu adalah 'str',
              itu akan dienkripsi sebagai UTF-8 terlebih dahulu.
              'zinfo_or_arcname' adalah instance ZipInfo atau
              nama file dalam arsip."""
              if isinstance(data, str):
                  data = data.encode("utf-8")
              if not isinstance(zinfo_or_arcname, ZipInfo):
                  zinfo = ZipInfo(filename=zinfo_or_arcname,
                                  date_time=time.localtime(time.time())[:6])
                  zinfo.compress_type = self.compression
                  if zinfo.filename[-1] == '/':
                      zinfo.external_attr = 0o40775 << 16   # drwxrwxr-x
                      zinfo.external_attr |= 0x10           # Bendera direktori MS-DOS
                  else:
                      zinfo.external_attr = 0o600 << 16     # ?rw-------
              else:
                  zinfo = zinfo_or_arcname
      
              if not self.fp:
                  raise ValueError(
                      "Upaya menulis ke arsip ZIP yang sudah ditutup")
              if self._writing:
                  raise ValueError(
                      "Tidak bisa menulis ke arsip ZIP saat ada handle penulisan terbuka."
                  )
      
              if compress_type is not None:
                  zinfo.compress_type = compress_type
      
              zinfo.file_size = len(data)            # Ukuran tidak terkompresi
              with self._lock:
                  with self.open(zinfo, mode='w') as dest:
                      dest.write(data)
      
          def __del__(self):
              """Panggil metode "close()" jika pengguna lupa."""
              self.close()
      
          def close(self):
              """Tutup file, dan untuk mode 'w', 'x' dan 'a' tulis rekaman akhir
              ."""
              if self.fp is None:
                  return
      
              if self._writing:
                  raise ValueError("Tidak bisa menutup file ZIP saat ada "
                                   "handle penulisan terbuka padanya. "
                                   "Tutup handle penulisan sebelum menutup zip.")
      
              try:
                  if self.mode in ('w', 'x', 'a') and self._didModify:  # tulis rekaman akhir
                      with self._lock:
                          if self._seekable:
                              self.fp.seek(self.start_dir)
                          self._write_end_record()
              finally:
                  fp = self.fp
                  self.fp = None
                  self._fpclose(fp)
      
          def _write_end_record(self):
              for zinfo in self.filelist:         # tulis direktori pusat
                  dt = zinfo.date_time
                  dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
                  dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
                  extra = []
                  if zinfo.file_size > ZIP64_LIMIT \
                     or zinfo.compress_size > ZIP64_LIMIT:
                      extra.append(zinfo.file_size)
                      extra.append(zinfo.compress_size)
                      file_size = 0xffffffff
                      compress_size = 0xffffffff
                  else:
                      file_size = zinfo.file_size
                      compress_size = zinfo.compress_size
      
                  if zinfo.header_offset > ZIP64_LIMIT:
                      extra.append(zinfo.header_offset)
                      header_offset = 0xffffffff
                  else:
                      header_offset = zinfo.header_offset
      
                  extra_data = zinfo.extra
                  min_version = 0
                  if extra:
                      # Tambahkan bidang ZIP64 ke extra's
                      extra_data = struct.pack(
                          '<HHQQQ', 1, 28, zinfo.file_size, zinfo.compress_size, zinfo.header_offset)
                      extra_data = extra_data + zinfo.extra
                  if zinfo.file_size > ZIP64_LIMIT or zinfo.compress_size > ZIP64_LIMIT:
                      file_size = 0xffffffff
                      compress_size = 0xffffffff
                  if zinfo.header_offset > ZIP64_LIMIT:
                      header_offset = 0xffffffff
      
                  centDirCount = len(self.filelist)
                  centDirSize = self.fp.tell() - self.start_dir
                  centDirOffset = self.start_dir
                  requires_zip64 = None
                  if centDirCount > ZIP_FILECOUNT_LIMIT:
                      requires_zip64 = "Jumlah file"
                  elif centDirOffset > ZIP64_LIMIT:
                      requires_zip64 = "Offset direktori pusat"
                  elif centDirSize > ZIP64_LIMIT:
                      requires_zip64 = "Ukuran direktori pusat"
                  if requires_zip64:
                      # Perlu menulis rekaman akhir arsip ZIP64
                      if not self._allowZip64:
                          raise LargeZipFile(requires_zip64 +
                                             " akan memerlukan ekstensi ZIP64")
                      zip64endrec = struct.pack(
                          structEndArchive64, stringEndArchive64,
                          44, 45, 45, 0, 0, centDirCount, centDirCount,
                          centDirSize, centDirOffset)
                      self.fp.write(zip64endrec)
      
                      zip64locrec = struct.pack(
                          structEndArchive64Locator,
                          stringEndArchive64Locator, 0, pos2, 1)
                      self.fp.write(zip64locrec)
                      centDirCount = min(centDirCount, 0xFFFF)
                      centDirSize = min(centDirSize, 0xFFFFFFFF)
                      centDirOffset = min(centDirOffset, 0xFFFFFFFF)
      
              endrec = struct.pack(structEndArchive, stringEndArchive,
                                   0, 0, centDirCount, centDirCount,
                                   centDirSize, centDirOffset, len(self._comment))
              self.fp.write(endrec)
              self.fp.write(self._comment)
              self.fp.flush()
      
          def _fpclose(self, fp):
              assert self._fileRefCnt > 0
              self._fileRefCnt -= 1
              if not self._fileRefCnt and not self._filePassed:
                  fp.close()
      
      
      class PyZipFile(ZipFile):
          """Kelas untuk membuat arsip ZIP dengan file dan paket pustaka Python."""
      
          def __init__(self, file, mode="r", compression=ZIP_STORED,
                       allowZip64=True, optimize=-1):
              ZipFile.__init__(self, file, mode=mode, compression=compression,
                               allowZip64=allowZip64)
              self._optimize = optimize
      
          def writepy(self, pathname, basename="", filterfunc=None):
              """Tambahkan semua file dari "pathname" ke arsip ZIP.
      
              Jika pathname adalah direktori paket, telusuri direktori dan
              semua subdirektori paket secara rekursif untuk semua *.py dan masukkan
              modul ke dalam arsip. Jika pathname adalah direktori biasa, daftarkan *.py dan masukkan
              semua modul. Jika tidak, pathname
              harus berupa file Python *.py dan modul akan dimasukkan ke dalam
              arsip. Modul yang ditambahkan selalu module.pyc.
              Metode ini akan mengompilasi module.py menjadi module.pyc jika
              diperlukan.
              Jika filterfunc(pathname) diberikan, itu dipanggil dengan setiap argumen.
              Saat bernilai Salah, file atau direktori dilewati.
              """
              pathname = os.fspath(pathname)
              if filterfunc and not filterfunc(pathname):
                  if self.debug:
                      label = 'jalur' if os.path.isdir(pathname) else 'file'
                      print('%s %r dilewati oleh filterfunc' % (label, pathname))
                  return
              dir, name = os.path.split(pathname)
              if os.path.isdir(pathname):
                  initname = os.path.join(pathname, "__init__.py")
                  if os.path.isfile(initname):
                      # Ini adalah direktori paket, tambahkan
                      if basename:
                          basename = "%s/%s" % (basename, name)
                      else:
                          basename = name
                      if self.debug:
                          print("Menambahkan paket di", pathname, "sebagai", basename)
                      fname, arcname = self._get_codename(initname[0:-3], basename)
                      if self.debug:
                          print("Menambahkan", arcname)
                      self.write(fname, arcname)
                      dirlist = os.listdir(pathname)
                      dirlist.remove("__init__.py")
                      # Tambahkan semua file *.py dan subdirektori paket
                      for filename in dirlist:
                          path = os.path.join(pathname, filename)
                          root, ext = os.path.splitext(filename)
                          if os.path.isdir(path):
                              if os.path.isfile(os.path.join(path, "__init__.py")):
                                  # Ini adalah direktori paket, tambahkan
                                  self.writepy(path, basename,
                                               filterfunc=filterfunc)  # Panggilan rekursif
                          elif ext == ".py":
                              if filterfunc and not filterfunc(path):
                                  if self.debug:
                                      print('file %r dilewati oleh filterfunc' % path)
                                  continue
                              fname, arcname = self._get_codename(path[0:-3],
                                                                  basename)
                              if self.debug:
                                  print("Menambahkan", arcname)
                              self.write(fname, arcname)
                  else:
                      # Ini BUKAN direktori paket, tambahkan file-file di level atas
                      if self.debug:
                          print("Menambahkan file dari direktori", pathname)
                      for filename in os.listdir(pathname):
                          path = os.path.join(pathname, filename)
                          root, ext = os.path.splitext(filename)
                          if ext == ".py":
                              if filterfunc and not filterfunc(path):
                                  if self.debug:
                                      print('file %r dilewati oleh filterfunc' % path)
                                  continue
                              fname, arcname = self._get_codename(path[0:-3],
                                                                  basename)
                              if self.debug:
                                  print("Menambahkan", arcname)
                              self.write(fname, arcname)
              else:
                  if pathname[-3:] != ".py":
                      raise RuntimeError(
                          'File yang ditambahkan dengan writepy() harus diakhiri dengan ".py"')
                  fname, arcname = self._get_codename(pathname[0:-3], basename)
                  if self.debug:
                      print("Menambahkan file", arcname)
                  self.write(fname, arcname)
      
          def _get_codename(self, pathname, basename):
              """Kembalikan (filename, archivename) untuk jalur.
      
              Diberikan nama modul path, kembalikan jalur file yang benar dan
              nama arsip, kompilasi jika perlu. Sebagai contoh, diberikan
              /python/lib/string, kembalikan (/python/lib/string.pyc, string).
              """
              def _compile(file, optimize=-1):
                  import py_compile
                  if self.debug:
                      print("Mengompilasi", file)
                  try:
                      py_compile.compile(file, doraise=True, optimize=optimize)
                  except py_compile.PyCompileError as err:
                      print(err.msg)
                      return False
                  return True
      
              file_py = pathname + ".py"
              file_pyc = pathname + ".pyc"
              pycache_opt0 = importlib.util.cache_from_source(
                  file_py, optimization='')
              pycache_opt1 = importlib.util.cache_from_source(
                  file_py, optimization=1)
              pycache_opt2 = importlib.util.cache_from_source(
                  file_py, optimization=2)
              if self._optimize == -1:
                  # mode warisan: gunakan file apa pun yang ada
                  if (os.path.isfile(file_pyc) and
                          os.stat(file_pyc).st_mtime >= os.stat(file_py).st_mtime):
                      # Gunakan file .pyc.
                      arcname = fname = file_pyc
                  elif (os.path.isfile(pycache_opt0) and
                        os.stat(pycache_opt0).st_mtime >= os.stat(file_py).st_mtime):
                      # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc warisan
                      # dalam arsip.
                      fname = pycache_opt0
                      arcname = file_pyc
                  elif (os.path.isfile(pycache_opt1) and
                        os.stat(pycache_opt1).st_mtime >= os.stat(file_py).st_mtime):
                      # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc warisan
                      # dalam arsip.
                      fname = pycache_opt1
                      arcname = file_pyc
                  elif (os.path.isfile(pycache_opt2) and
                        os.stat(pycache_opt2).st_mtime >= os.stat(file_py).st_mtime):
                      # Gunakan file __pycache__/*.pyc, tetapi tulis ke nama file pyc warisan
                      # dalam arsip.
                      fname = pycache_opt2
                      arcname = file_pyc
                  else:
                      # Kompilasi py menjadi file PEP 3147 pyc.
                      if _compile(file_py):
                          if sys.flags.optimize == 0:
                              fname = pycache_opt0
                          elif sys.flags.optimize == 1:
                              fname = pycache_opt1
                          else:
                              fname = pycache_opt2
                          arcname = file_pyc
                      else:
                          fname = arcname = file_py
              else:
                  # mode baru: gunakan tingkat optimasi yang diberikan
                  if self._optimize == 0:
                      fname = pycache_opt0
                      arcname = file_pyc
                  else:
                      arcname = file_pyc
                      if self._optimize == 1:
                          fname = pycache_opt1
                      elif self._optimize == 2:
                          fname = pycache_opt2
                      else:
                          msg = "nilai tidak valid untuk 'optimize': {!r}".format(
                              self._optimize)
                          raise ValueError(msg)
                  if not (os.path.isfile(fname) and
                          os.stat(fname).st_mtime >= os.stat(file_py).st_mtime):
                      if not _compile(file_py, optimize=self._optimize):
                          fname = arcname = file_py
              archivename = os.path.split(arcname)[1]
              if basename:
                  archivename = "%s/%s" % (basename, archivename)
              return (fname, archivename)
      
      
      def main(args=None):
          import argparse
      
          description = 'Antarmuka baris perintah sederhana untuk modul zipfile.'
          parser = argparse.ArgumentParser(description=description)
          group = parser.add_mutually_exclusive_group(required=True)
          group.add_argument('-l', '--list', metavar='',
                             help='Tampilkan daftar zipfile')
          group.add_argument('-e', '--extract', nargs=2,
                             metavar=('', ''),
                             help='Ekstrak zipfile ke direktori target')
          group.add_argument('-c', '--create', nargs='+',
                             metavar=('', ''),
                             help='Buat zipfile dari sumber')
          group.add_argument('-t', '--test', metavar='',
                             help='Tes apakah zipfile valid')
          args = parser.parse_args(args)
      
          if args.test is not None:
              src = args.test
              with ZipFile(src, 'r') as zf:
                  badfile = zf.testzip()
              if badfile:
                  print(
                      "File terlampir berikut rusak: {!r}".format(badfile))
              print("Pengujian selesai")
      
          elif args.list is not None:
              src = args.list
              with ZipFile(src, 'r') as zf:
                  zf.printdir()
      
          elif args.extract is not None:
              src, curdir = args.extract
              with ZipFile(src, 'r') as zf:
                  zf.extractall(curdir)
      
          elif args.create is not None:
              zip_name = args.create.pop(0)
              files = args.create
      
              def addToZip(zf, path, zippath):
                  if os.path.isfile(path):
                      zf.write(path, zippath, ZIP_DEFLATED)
                  elif os.path.isdir(path):
                      if zippath:
                          zf.write(path, zippath)
                      for nm in os.listdir(path):
                          addToZip(zf,
                                   os.path.join(path, nm), os.path.join(zippath, nm))
                  # jika tidak: abaikan
      
              with ZipFile(zip_name, 'w') as zf:
                  for path in files:
                      zippath = os.path.basename(path)
                      if not zippath:
                          zippath = os.path.basename(os.path.dirname(path))
                      if zippath in ('', os.curdir, os.pardir):
                          zippath = ''
                      addToZip(zf, path, zippath)
      
      
      if __name__ == "__main__":
          main()

Langkah 2: Buat pemicu OSS

  1. Pada halaman Detail Fungsi, pilih tab Configurations, lalu pilih tab Triggers dari panel navigasi sisi kiri.

  2. Pada halaman Pemicu, klik Create Trigger. Di panel Buat Pemicu, pilih tipe pemicu OSS, konfigurasikan pengaturan sesuai kebutuhan, lalu klik OK.

    Berikut adalah konfigurasi beberapa item kunci. Untuk rincian item lainnya, lihat Mengonfigurasi Pemicu OSS Native.

    • Bucket Name: Pilih nama bucket yang telah dibuat.

    • Object Prefix: Dalam contoh ini, gunakan src.

    • Object Suffix: Dalam contoh ini, gunakan zip.

    • Trigger Event: Dalam contoh ini, gunakan oss:ObjectCreated:PutObject, oss:ObjectCreated:PostObject, oss:ObjectCreated:CompleteMultipartUpload, oss:ObjectCreated:PutSymlink.

    • Role Name: Pilih peran dan pastikan memiliki izin yang diperlukan untuk memanggil fungsi. Lampirkan kebijakan AliyunFCFullAccess untuk memberikan akses penuh.

Setelah pemicu dibuat, Anda dapat melihatnya di halaman Pemicu.

Langkah 3: Uji dan verifikasi

Anda dapat menguji konfigurasi Anda dengan dua cara:

Metode 1: Unggah file secara manual melalui konsol OSS

Masuk ke Konsol Layanan Penyimpanan Objek. Unggah file ZIP seperti code.zip ke direktori src bucket yang dipilih di Langkah 2: Buat Pemicu OSS. Setelah unggahan selesai, fungsi akan secara otomatis dipicu untuk mendekompresi file ZIP ke direktori root bucket.

Metode 2: Konfigurasikan parameter acara pemicu

Perhatikan bahwa dengan metode ini, Anda perlu memicu fungsi secara manual dengan mengklik Test Function setelah konfigurasi. Kemudian, masuk ke Konsol Object Storage Service untuk memeriksa daftar file di bucket target guna memverifikasi keberhasilan dekompresi file ZIP. Prosedurnya adalah sebagai berikut.

  1. Pada halaman Detail Fungsi, klik tab Code, lalu klik ikon xialatubiao di sebelah Test Function. Dari daftar drop-down, pilih Configure Test Parameters.

  2. <Di panel Configure Test Parameters, pilih Event Template, masukkan Event Name dan konten acara Anda, lalu klik OK.

    Contoh acara adalah sebagai berikut.

    {
        "events": [
            {
                "eventName": "ObjectCreated:PutObject",
                "eventSource": "acs:oss",
                "eventTime": "2023-08-13T06:45:43.000Z",
                "eventVersion": "1.0",
                "oss": {
                    "bucket": {
                        "arn": "acs:oss:cn-hangzhou:10343546824****:bucket****",
                        "name": "bucket****",
                        "ownerIdentity": "10343546824****"
                    },
                    "object": {
                        "deltaSize": 122539,
                        "eTag": "688A7BF4F233DC9C88A80BF985AB****",
                        "key": "src/test.zip",
                        "size": 122539
                    },
                    "ossSchemaVersion": "1.0",
                    "ruleId": "9adac8e253828f4f7c0466d941fa3db81161****"
                },
                "region": "cn-hangzhou",
                "requestParameters": {
                    "sourceIPAddress": "140.205.XX.XX"
                },
                "responseElements": {
                    "requestId": "58F9FF2D3DF792092E12044C"
                },
                "userIdentity": {
                    "principalId": "10343546824****"
                }
            }
        ]
    }

    Untuk detail tentang parameter objek acara, lihat Langkah 2: (Opsional) Konfigurasikan Parameter Input.

    Penting

    Konten acara di atas diberikan sebagai contoh. Sesuaikan parameter sesuai kebutuhan. Pastikan file yang ditentukan (file src/test.zip dalam contoh ini) ada di bucket yang ditentukan; jika tidak, fungsi tidak akan dipicu, atau eksekusinya akan gagal.

    Parameter yang harus dimodifikasi sesuai dengan lingkungan aktual Anda adalah sebagai berikut:

    • bucket.arn: Gunakan format acs:oss:<region>:<your_account_id>:<your_bucket> untuk parameter ini. Ganti <region> dengan wilayah yang dipilih saat membuat fungsi, <your_account_id> dengan ID akun Alibaba Cloud Anda, dan <your_bucket> dengan nama sebenarnya dari bucket yang telah dibuat di wilayah tersebut. Anda dapat menemukan ID akun Alibaba Cloud Anda di bagian Referensi pada halaman Overview dari Konsol Function Compute.

    • bucket.name: Ganti dengan nama sebenarnya dari bucket yang telah dibuat di wilayah yang sama dengan fungsi Anda.

    • bucket.ownerIdentity: Ganti dengan ID akun Alibaba Cloud Anda.

    • object.key: Ganti dengan nama file yang diunggah ke bucket tersebut.

    • region: Ganti dengan wilayah yang dipilih saat membuat fungsi.

    • userIdentity.principalId: Ganti dengan ID akun Alibaba Cloud Anda.

  3. Klik Test Function di tab Code untuk menjalankan tes.

    Setelah fungsi berhasil dieksekusi, masuk ke Konsol Layanan Penyimpanan Objek dan periksa apakah file ZIP target (file src/test.zip dalam contoh ini) di bucket yang ditentukan telah didekompresi. Untuk langkah-langkah rinci, lihat Gunakan Konsol OSS untuk Menanyakan Objek.

Setelah pengujian, jika Anda tidak memerlukan aplikasi ini untuk saat ini, pastikan untuk menghapus aplikasi dan sumber daya terkait apa pun untuk menghindari biaya yang tidak perlu.

Referensi