Python 인메모리 zip 라이브러리
실제 디스크 파일을 사용하지 않고 메모리에서 zip 아카이브를 조작할 수 있는 파이썬 라이브러리가 있습니까?
ZipFile 라이브러리에서는 보관 파일을 업데이트할 수 없습니다.유일한 방법은 디렉토리에 압축을 풀고 변경한 후 해당 디렉토리에서 새 zip을 만드는 것입니다.디스크 액세스 없이 zip 아카이브를 수정하고 싶습니다. 다운로드하고 변경한 후 다시 업로드하기 때문에 저장할 이유가 없습니다.
Java의 ZipInputStream/ZipOutputStream과 유사한 인터페이스를 사용하면 디스크 액세스를 방지할 수 있습니다.
Python 문서에 따르면:
class zipfile.ZipFile(file[, mode[, compression[, allowZip64]]])
Open a ZIP file, where file can be either a path to a file (a string) or a file-like object.
따라서 메모리에서 파일을 열려면 파일과 같은 개체를 만듭니다(바이트 사용).IO).
file_like_object = io.BytesIO(my_zip_data)
zipfile_ob = zipfile.ZipFile(file_like_object)
파이썬 3
import io
import zipfile
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "a",
zipfile.ZIP_DEFLATED, False) as zip_file:
for file_name, data in [('1.txt', io.BytesIO(b'111')),
('2.txt', io.BytesIO(b'222'))]:
zip_file.writestr(file_name, data.getvalue())
with open('C:/1.zip', 'wb') as f:
f.write(zip_buffer.getvalue())
Python의 In-Memory Zip 기사에서:
아래는 Postous가 종료된 이후 Python으로 메모리를 압축하는 것에 대한 2008년 5월의 제 게시물입니다.
최근에 Python으로 파일을 메모리에 압축할 수 있는 유료 구성 요소가 있다는 것을 알게 되었습니다.이것이 무료여야 하는 것임을 고려하여, 저는 다음과 같은 코드를 만들었습니다.아주 기본적인 테스트만 거쳤으니 혹시라도 오류가 발견되면 알려주시면 업데이트하겠습니다.
import zipfile
import StringIO
class InMemoryZip(object):
def __init__(self):
# Create the in-memory file-like object
self.in_memory_zip = StringIO.StringIO()
def append(self, filename_in_zip, file_contents):
'''Appends a file with name filename_in_zip and contents of
file_contents to the in-memory zip.'''
# Get a handle to the in-memory zip in append mode
zf = zipfile.ZipFile(self.in_memory_zip, "a", zipfile.ZIP_DEFLATED, False)
# Write the file to the in-memory zip
zf.writestr(filename_in_zip, file_contents)
# Mark the files as having been created on Windows so that
# Unix permissions are not inferred as 0000
for zfile in zf.filelist:
zfile.create_system = 0
return self
def read(self):
'''Returns a string with the contents of the in-memory zip.'''
self.in_memory_zip.seek(0)
return self.in_memory_zip.read()
def writetofile(self, filename):
'''Writes the in-memory zip to a file.'''
f = file(filename, "w")
f.write(self.read())
f.close()
if __name__ == "__main__":
# Run a test
imz = InMemoryZip()
imz.append("test.txt", "Another test").append("test2.txt", "Still another")
imz.writetofile("test.zip")
Ethier가 제공한 예제에는 몇 가지 문제가 있으며, 그 중 일부는 다음과 같습니다.
- Windows(윈도우)의 실제 데이터에서는 작동하지 않습니다.ZIP 파일은 이진 파일이며 해당 데이터는 항상 'wb' 파일을 연 상태에서 작성해야 합니다.
- 각 파일에 대해 ZIP 파일이 추가되므로 비효율적입니다.열어서 보관할 수 있습니다.
InMemoryZip
기여하다 - 설명서에는 ZIP 파일을 명시적으로 닫아야 한다고 명시되어 있습니다. 이는 추가 기능에서는 수행되지 않습니다(예: zf가 범위를 벗어나 ZIP 파일을 닫기 때문에 (예를 들어) 작동할 수 있습니다).
- create_system 플래그는 파일당 한 번만 추가하는 대신 파일이 추가될 때마다 zip 파일의 모든 파일에 대해 설정됩니다.
- Python < 3cString에서IO가 StringIO보다 훨씬 효율적임
- Python 3에서 작동하지 않습니다(원래 기사는 3.0 릴리스 이전의 것이었지만 코드가 게시되었을 때 3.1은 오랫동안 사용되지 않았습니다).
은 설하면버사전수있을 할 수 .ruamel.std.zipfile
(저는 그 저자입니다).After .
pip install ruamel.std.zipfile
또는 여기서 클래스의 코드를 포함하여 다음 작업을 수행할 수 있습니다.
import ruamel.std.zipfile as zipfile
# Run a test
zipfile.InMemoryZipFile()
imz.append("test.txt", "Another test").append("test2.txt", "Still another")
imz.writetofile("test.zip")
또는 다음을 사용하여 내용을 작성할 수 있습니다.imz.data
필요한 곳이면 어디든 갈 수 있습니다.
은 또한 있다니습수도를 할 수 .with
파일 이름을 제공하는 경우 해당 컨텍스트를 떠날 때 ZIP의 내용이 기록됩니다.
with zipfile.InMemoryZipFile('test.zip') as imz:
imz.append("test.txt", "Another test").append("test2.txt", "Still another")
디스크에 쓰는 것이 지연되었기 때문에, 당신은 실제로 오래된 것을 읽을 수 있습니다.test.zip
그런 맥락에서
Flask를 사용하여 인메모리 zip 파일을 생성하여 다운로드로 반환하고 있습니다.Vladimir의 위의 예를 기반으로 합니다.seek(0)
알아내는데 시간이 좀 걸렸습니다.
import io
import zipfile
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
for file_name, data in [('1.txt', io.BytesIO(b'111')), ('2.txt', io.BytesIO(b'222'))]:
zip_file.writestr(file_name, data.getvalue())
zip_buffer.seek(0)
return send_file(zip_buffer, attachment_filename='filename.zip', as_attachment=True)
디스크 액세스 없이 zip 아카이브를 수정하고 싶습니다. 다운로드하고 변경한 후 다시 업로드하기 때문에 저장할 이유가 없습니다.
이것은 https://github.com/uktrade/stream-unzip 과 https://github.com/uktrade/stream-zip 두 개의 라이브러리를 사용하여 가능합니다(완전 공개: 내가 작성함).또한 변경 사항에 따라 전체 zip을 한 번에 메모리에 저장할 필요가 없을 수도 있습니다.
다운로드, 압축 풀기, 압축 풀기, 재업로드를 원한다고 합니다.약간 무의미하지만 압축 해제된 콘텐츠에 대한 일부 변경 사항을 삽입할 수 있습니다.
from datetime import datetime
import httpx
from stream_unzip import stream_unzip
from stream_zip import stream_zip, ZIP_64
def get_source_bytes_iter(url):
with httpx.stream('GET', url) as r:
yield from r.iter_bytes()
def get_target_files(files):
# stream-unzip doesn't expose perms or modified_at, but stream-zip requires them
modified_at = datetime.now()
perms = 0o600
for name, _, chunks in files:
# Could change name, manipulate chunks, skip a file, or yield a new file
yield name.decode(), modified_at, perms, ZIP_64, chunks
source_url = 'https://source.test/file.zip'
target_url = 'https://target.test/file.zip'
source_bytes_iter = get_source_bytes_iter(source_url)
source_files = stream_unzip(source_bytes_iter)
target_files = get_target_files(source_files)
target_bytes_iter = stream_zip(target_files)
httpx.put(target_url, data=target_bytes_iter)
다음과 같은 데이터를 기반으로 여러 파일로 메모리 내 zip 파일을 만드는 도우미{'1.txt': 'string', '2.txt": b'bytes'}
import io, zipfile
def prepare_zip_file_content(file_name_content: dict) -> bytes:
"""returns Zip bytes ready to be saved with
open('C:/1.zip', 'wb') as f: f.write(bytes)
@file_name_content dict like {'1.txt': 'string', '2.txt": b'bytes'}
"""
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
for file_name, file_data in file_name_content.items():
zip_file.writestr(file_name, file_data)
zip_buffer.seek(0)
return zip_buffer.getvalue()
ctypes를 통해 Python의 라이브러리 라이브러리 아카이브를 사용할 수 있습니다. 이 라이브러리는 스트리밍(적어도 역사적으로)에 중점을 두고 메모리의 ZIP 데이터를 조작하는 방법을 제공합니다.
HTTP 서버에서 다운로드하는 동안 ZIP 파일을 즉시 압축 해제하려고 합니다.아래 코드
from contextlib import contextmanager
from ctypes import CFUNCTYPE, POINTER, create_string_buffer, cdll, byref, c_ssize_t, c_char_p, c_int, c_void_p, c_char
from ctypes.util import find_library
import httpx
def get_zipped_chunks(url, chunk_size=6553):
with httpx.stream('GET', url) as r:
yield from r.iter_bytes()
def stream_unzip(zipped_chunks, chunk_size=65536):
# Library
libarchive = cdll.LoadLibrary(find_library('archive'))
# Callback types
open_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)
read_callback_type = CFUNCTYPE(c_ssize_t, c_void_p, c_void_p, POINTER(POINTER(c_char)))
close_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)
# Function types
libarchive.archive_read_new.restype = c_void_p
libarchive.archive_read_open.argtypes = [c_void_p, c_void_p, open_callback_type, read_callback_type, close_callback_type]
libarchive.archive_read_finish.argtypes = [c_void_p]
libarchive.archive_entry_new.restype = c_void_p
libarchive.archive_read_next_header.argtypes = [c_void_p, c_void_p]
libarchive.archive_read_support_compression_all.argtypes = [c_void_p]
libarchive.archive_read_support_format_all.argtypes = [c_void_p]
libarchive.archive_entry_pathname.argtypes = [c_void_p]
libarchive.archive_entry_pathname.restype = c_char_p
libarchive.archive_read_data.argtypes = [c_void_p, POINTER(c_char), c_ssize_t]
libarchive.archive_read_data.restype = c_ssize_t
libarchive.archive_error_string.argtypes = [c_void_p]
libarchive.archive_error_string.restype = c_char_p
ARCHIVE_EOF = 1
ARCHIVE_OK = 0
it = iter(zipped_chunks)
compressed_bytes = None # Make sure not garbage collected
@contextmanager
def get_archive():
archive = libarchive.archive_read_new()
if not archive:
raise Exception('Unable to allocate archive')
try:
yield archive
finally:
libarchive.archive_read_finish(archive)
def read_callback(archive, client_data, buffer):
nonlocal compressed_bytes
try:
compressed_bytes = create_string_buffer(next(it))
except StopIteration:
return 0
else:
buffer[0] = compressed_bytes
return len(compressed_bytes) - 1
def uncompressed_chunks(archive):
uncompressed_bytes = create_string_buffer(chunk_size)
while (num := libarchive.archive_read_data(archive, uncompressed_bytes, len(uncompressed_bytes))) > 0:
yield uncompressed_bytes.value[:num]
if num < 0:
raise Exception(libarchive.archive_error_string(archive))
with get_archive() as archive:
libarchive.archive_read_support_compression_all(archive)
libarchive.archive_read_support_format_all(archive)
libarchive.archive_read_open(
archive, 0,
open_callback_type(0), read_callback_type(read_callback), close_callback_type(0),
)
entry = c_void_p(libarchive.archive_entry_new())
if not entry:
raise Exception('Unable to allocate entry')
while (status := libarchive.archive_read_next_header(archive, byref(entry))) == ARCHIVE_OK:
yield (libarchive.archive_entry_pathname(entry), uncompressed_chunks(archive))
if status != ARCHIVE_EOF:
raise Exception(libarchive.archive_error_string(archive))
하기 위해 다음과 같이 사용할 수 있습니다.
zipped_chunks = get_zipped_chunks('https://domain.test/file.zip')
files = stream_unzip(zipped_chunks)
for name, uncompressed_chunks in stream_unzip(zipped_chunks):
print(name)
for uncompressed_chunk in uncompressed_chunks:
print(uncompressed_chunk)
실제로 libarchive는 여러 아카이브 형식을 지원하며 위의 내용은 특별히 ZIP에만 해당되지 않으므로 다른 형식과 함께 사용할 수 있습니다.
언급URL : https://stackoverflow.com/questions/2463770/python-in-memory-zip-library
'programing' 카테고리의 다른 글
ScrollView 내부의 TableView 스크롤이 자연스럽게 동작하도록 하는 방법 (0) | 2023.07.31 |
---|---|
Android: 태블릿에 세로와 가로를 허용하지만 전화기에 세로를 강제로 설정하시겠습니까? (0) | 2023.07.31 |
자녀가 Angular 2에서 부모 이벤트를 수신합니다. (0) | 2023.07.31 |
Android: 선형 레이아웃에 테두리를 그리는 방법 (0) | 2023.07.31 |
팬더 콘캣: 가치 오류:전달된 값의 모양은 blah이고 인덱스는 blah2를 의미합니다. (0) | 2023.07.31 |