import zipfile
import tarfile
import os
import shutil
import posixpath
import contextlib
from distutils.errors import DistutilsError
from ._path import ensure_directory
# Names exported by ``from <this module> import *``.
__all__ = [
"unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
"UnrecognizedFormat", "extraction_drivers", "unpack_directory",
]
class UnrecognizedFormat(DistutilsError):
    """Raised when a path is not in a format that an extraction driver handles.

    Each driver in this module raises it for input it cannot process, and
    ``unpack_archive`` raises it once every driver has rejected the input.
    """
def default_filter(src, dst):
    """Accept every entry and keep its proposed destination unchanged.

    This is the default ``progress_filter`` of the unpack functions in this
    module: they call the filter with an entry's archive-relative name
    (``src``) and the path it would be written to (``dst``), then use the
    return value as the actual destination.
    """
    return dst
def unpack_archive(
        filename, extract_dir, progress_filter=default_filter,
        drivers=None):
    """Unpack `filename` into `extract_dir` using the first driver that fits.

    Each driver is called as ``driver(filename, extract_dir,
    progress_filter)``. A driver signals that it does not handle the input
    by raising ``UnrecognizedFormat``, in which case the next driver is
    tried. Any other exception propagates to the caller.

    `drivers` is an iterable of such callables; when it is omitted (or
    empty) the module-level ``extraction_drivers`` sequence is used.

    Raises ``UnrecognizedFormat`` if no driver accepts `filename`.
    """
    candidates = drivers or extraction_drivers
    for attempt in candidates:
        try:
            attempt(filename, extract_dir, progress_filter)
        except UnrecognizedFormat:
            # Not this driver's format; fall through to the next candidate.
            continue
        # The driver completed without complaint, so we are done.
        return

    raise UnrecognizedFormat(
        "Not a recognized archive type: %s" % filename
    )
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """Copy the directory tree at `filename` into `extract_dir`.

    For every file found, `progress_filter` is called with the file's
    '/'-separated path relative to `filename` and the proposed destination
    path. Its return value is used as the destination; a false return value
    skips the file. File contents are copied with ``shutil.copyfile`` and
    metadata with ``shutil.copystat``.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory.
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % filename)

    # os.walk() always reports directories as plain strings. Normalise
    # path-like input (e.g. pathlib.Path) so the root key stored in `paths`
    # matches the first `base` yielded by the walk instead of raising
    # KeyError.
    filename = os.fspath(filename)

    # Maps each walked source directory to a pair of
    # (archive-style relative prefix, destination directory).
    paths = {
        filename: ('', extract_dir),
    }
    for base, dirs, files in os.walk(filename):
        src, dst = paths[base]
        for d in dirs:
            # Register sub-directories before os.walk descends into them.
            paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
        for f in files:
            target = progress_filter(src + f, os.path.join(dst, f))
            if not target:
                # The filter rejected this entry.
                continue
            # NOTE(review): ensure_directory comes from ._path; presumably it
            # creates the parent directory of `target` -- confirm there.
            ensure_directory(target)
            source = os.path.join(base, f)
            shutil.copyfile(source, target)
            shutil.copystat(source, target)
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack the zip archive at `filename` into `extract_dir`.

    `progress_filter` is forwarded to ``_unpack_zipfile_obj`` together with
    the opened archive and `extract_dir`.

    Raises ``UnrecognizedFormat`` if ``zipfile.is_zipfile`` rejects
    `filename`.
    """
    if zipfile.is_zipfile(filename):
        with zipfile.ZipFile(filename) as archive:
            _unpack_zipfile_obj(archive, extract_dir, progress_filter)
        return

    raise UnrecognizedFormat("%s is not a zip file" % (filename,))
def _unpack_zipfile_obj(zipfile_obj, extract_dir, progress_filter=default_filter):
    """Extract the members of an open ``ZipFile`` object into `extract_dir`.

    Members whose name is absolute or contains a ``..`` component are
    skipped. For the rest, `progress_filter` is called with the member name
    and the proposed destination path; its return value is the destination
    actually used, and a false value skips the member.
    """
    for entry in zipfile_obj.infolist():
        member_name = entry.filename
        segments = member_name.split('/')

        # Refuse absolute names and anything that climbs out via '..'.
        if member_name.startswith('/') or '..' in segments:
            continue

        destination = progress_filter(
            member_name, os.path.join(extract_dir, *segments))
        if not destination:
            continue

        # Needed for directory and file members alike.
        ensure_directory(destination)
        if not member_name.endswith('/'):
            # A regular file member: write its contents out.
            payload = zipfile_obj.read(entry.filename)
            with open(destination, 'wb') as out:
                out.write(payload)

        # The upper 16 bits of external_attr carry the Unix mode, if any.
        mode_bits = entry.external_attr >> 16
        if mode_bits:
            os.chmod(destination, mode_bits)
def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
    """Follow links in a tar archive until a regular file or directory.

    Starting from `tar_member_obj`, hard links and symlinks are followed by
    looking their target up in `tar_obj`. A symlink target is interpreted
    relative to the directory of the link itself; a hard-link target is used
    as given. The resulting path is normalised before lookup.

    Returns the member finally reached when it is a regular file or a
    directory.

    Raises ``LookupError`` when the chain ends at a missing member, at a
    member of any other type, or loops back on itself.
    """
    # Every path already looked up. The lookup is a function of the path
    # alone, so seeing a path twice means the chain is cyclic and would
    # otherwise never terminate.
    visited = set()

    while tar_member_obj is not None and (
            tar_member_obj.islnk() or tar_member_obj.issym()):
        linkpath = tar_member_obj.linkname
        if tar_member_obj.issym():
            # Symlink targets are relative to the link's own directory.
            base = posixpath.dirname(tar_member_obj.name)
            linkpath = posixpath.join(base, linkpath)
        linkpath = posixpath.normpath(linkpath)

        if linkpath in visited:
            raise LookupError('Link cycle detected at %r' % (linkpath,))
        visited.add(linkpath)

        tar_member_obj = tar_obj._getmember(linkpath)

    is_file_or_dir = (
        tar_member_obj is not None and
        (tar_member_obj.isfile() or tar_member_obj.isdir())
    )
    if is_file_or_dir:
        return tar_member_obj

    raise LookupError('Got unknown file type')
def _iter_open_tar(tar_obj, extract_dir, progress_filter):
    """Yield ``(member, destination)`` pairs for an open tar archive.

    `tar_obj` is closed once iteration finishes. Members with an absolute
    name or a ``..`` component are skipped, as are members that
    ``_resolve_tar_file_or_dir`` cannot resolve. `progress_filter` is called
    with the member name and the proposed destination; a false return value
    skips the member, otherwise it is the destination that is yielded (with
    one trailing ``os.sep`` removed, if present).
    """
    # Replace the archive's chown hook with a no-op.
    tar_obj.chown = lambda *args: None

    with contextlib.closing(tar_obj):
        for entry in tar_obj:
            entry_name = entry.name
            segments = entry_name.split('/')

            # Refuse absolute names and anything that climbs out via '..'.
            if entry_name.startswith('/') or '..' in segments:
                continue

            proposed = os.path.join(extract_dir, *segments)

            try:
                resolved = _resolve_tar_file_or_dir(tar_obj, entry)
            except LookupError:
                # Link target is missing or is not a file/directory.
                continue

            destination = progress_filter(entry_name, proposed)
            if not destination:
                continue

            if destination.endswith(os.sep):
                destination = destination[:-1]

            yield resolved, destination
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack the tar archive at `filename` into `extract_dir`.

    The archive is opened with ``tarfile.open``. Members and their
    destinations come from ``_iter_open_tar``, which applies
    `progress_filter`. A ``tarfile.ExtractError`` raised while extracting a
    member is ignored, so extraction continues with the next member.

    Returns ``True``.

    Raises ``UnrecognizedFormat`` if ``tarfile.open`` fails with a
    ``tarfile.TarError``.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError as exc:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        ) from exc

    pairs = _iter_open_tar(archive, extract_dir, progress_filter)
    for entry, destination in pairs:
        # Best effort: extraction errors on one member do not abort the rest.
        with contextlib.suppress(tarfile.ExtractError):
            archive._extract_member(entry, destination)

    return True
# Default drivers for unpack_archive(), tried in this order until one does
# not raise UnrecognizedFormat.
extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile