Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
pyfs
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
pyfs
Commits
c1b2967b
Commit
c1b2967b
authored
Jan 20, 2012
by
willmcgugan
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
archive fs
parent
bc82eb48
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
315 additions
and
0 deletions
+315
-0
ChangeLog
+1
-0
fs/contrib/archivefs.py
+130
-0
fs/tests/test_archivefs.py
+184
-0
No files found.
ChangeLog
View file @
c1b2967b
...
...
@@ -86,3 +86,4 @@
* Added a DeleteRootError to exceptions thrown when trying to delete '/'
* Added a remove_all function to utils
* Added sqlitefs to fs.contrib, contributed by Nitin Bhide
* Added archivefs to fs.contrib, contributed by btimby
fs/contrib/archivefs.py
0 → 100644
View file @
c1b2967b
"""
fs.contrib.archivefs
========
A FS object that represents the contents of an archive.
"""
import
time
import
stat
import
datetime
import
os.path
from
fs.base
import
*
from
fs.path
import
*
from
fs.errors
import
*
from
fs.filelike
import
StringIO
import
libarchive
ENCODING
=
libarchive
.
ENCODING
class
ArchiveFS
(
FS
):
"""A FileSystem that represents an archive supported by libarchive."""
_meta
=
{
'thread_safe'
:
True
,
'virtual'
:
False
,
'read_only'
:
False
,
'unicode_paths'
:
True
,
'case_insensitive_paths'
:
False
,
'network'
:
False
,
'atomic.setcontents'
:
False
}
def
__init__
(
self
,
f
,
mode
=
'r'
,
format
=
None
,
thread_synchronize
=
True
):
"""Create a FS that maps on to a zip file.
:param path: a (system) path, or a file-like object
:param thread_synchronize: set to True (default) to enable thread-safety
"""
super
(
ArchiveFS
,
self
)
.
__init__
(
thread_synchronize
=
thread_synchronize
)
self
.
contents
=
PathMap
()
self
.
archive
=
libarchive
.
SeekableArchive
(
f
,
format
=
format
,
mode
=
mode
)
if
mode
==
'r'
:
for
item
in
self
.
archive
:
for
part
in
recursepath
(
item
.
pathname
)[
1
:]:
part
=
relpath
(
part
)
if
part
==
item
.
pathname
:
self
.
contents
[
part
]
=
item
else
:
self
.
contents
[
part
]
=
libarchive
.
Entry
(
pathname
=
part
,
mode
=
stat
.
S_IFDIR
,
size
=
0
,
mtime
=
item
.
mtime
)
def
__str__
(
self
):
return
"<ArchiveFS>"
def
__unicode__
(
self
):
return
u"<ArchiveFS>"
def
getmeta
(
self
,
meta_name
,
default
=
NoDefaultMeta
):
if
meta_name
==
'read_only'
:
return
self
.
read_only
return
super
(
ZipFS
,
self
)
.
getmeta
(
meta_name
,
default
)
def
close
(
self
):
self
.
archive
.
close
()
@synchronize
def
open
(
self
,
path
,
mode
=
"r"
,
**
kwargs
):
path
=
normpath
(
relpath
(
path
))
if
mode
not
in
(
'r'
,
'w'
,
'wb'
):
raise
Exception
(
'Unsupported mode '
+
mode
)
if
'r'
in
mode
:
return
self
.
archive
.
readstream
(
path
)
else
:
return
self
.
archive
.
writestream
(
path
)
@synchronize
def
getcontents
(
self
,
path
,
mode
=
"rb"
):
if
not
self
.
exists
(
path
):
raise
ResourceNotFoundError
(
path
)
f
=
self
.
open
(
path
)
return
f
.
read
()
def
desc
(
self
,
path
):
return
"
%
s in zip file"
%
path
def
isdir
(
self
,
path
):
info
=
self
.
getinfo
(
path
)
return
stat
.
S_ISDIR
(
info
.
get
(
'mode'
,
0
))
def
isfile
(
self
,
path
):
info
=
self
.
getinfo
(
path
)
return
stat
.
S_ISREG
(
info
.
get
(
'mode'
,
0
))
def
exists
(
self
,
path
):
path
=
normpath
(
path
)
.
lstrip
(
'/'
)
return
path
in
self
.
contents
def
listdir
(
self
,
path
=
"/"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
dirs_only
=
False
,
files_only
=
False
):
return
self
.
_listdir_helper
(
path
,
self
.
contents
.
names
(
path
),
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
def
makedir
(
self
,
dirname
,
recursive
=
False
,
allow_recreate
=
False
):
entry
=
self
.
archive
.
entry_class
(
pathname
=
dirname
,
mode
=
stat
.
S_IFDIR
,
size
=
0
,
mtime
=
time
.
time
())
self
.
archive
.
write
(
entry
)
@synchronize
def
getinfo
(
self
,
path
):
if
not
self
.
exists
(
path
):
raise
ResourceNotFoundError
(
path
)
path
=
normpath
(
path
)
.
lstrip
(
'/'
)
info
=
{
'size'
:
0
}
try
:
entry
=
self
.
contents
.
get
(
path
)
for
attr
in
dir
(
entry
):
if
attr
.
startswith
(
'_'
):
continue
elif
attr
==
'mtime'
:
info
[
'created_time'
]
=
datetime
.
datetime
.
fromtimestamp
(
entry
.
mtime
)
else
:
info
[
attr
]
=
getattr
(
entry
,
attr
)
except
KeyError
:
pass
return
info
def
main
():
ArchiveFS
()
if
__name__
==
'__main__'
:
main
()
fs/tests/test_archivefs.py
0 → 100644
View file @
c1b2967b
"""
fs.tests.test_zipfs: testcases for the ZipFS class
"""
import
unittest
import
os
import
random
import
zipfile
import
tempfile
import
shutil
import
fs.tests
from
fs.path
import
*
from
fs.contrib
import
archivefs
from
six
import
PY3
,
b
class
TestReadZipFS
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
temp_filename
=
""
.
join
(
random
.
choice
(
"abcdefghijklmnopqrstuvwxyz"
)
for
_
in
range
(
6
))
+
".zip"
self
.
temp_filename
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
self
.
temp_filename
)
self
.
zf
=
zipfile
.
ZipFile
(
self
.
temp_filename
,
"w"
)
zf
=
self
.
zf
zf
.
writestr
(
"a.txt"
,
b
(
"Hello, World!"
))
zf
.
writestr
(
"b.txt"
,
b
(
"b"
))
zf
.
writestr
(
"1.txt"
,
b
(
"1"
))
zf
.
writestr
(
"foo/bar/baz.txt"
,
b
(
"baz"
))
zf
.
writestr
(
"foo/second.txt"
,
b
(
"hai"
))
zf
.
close
()
self
.
fs
=
archivefs
.
ArchiveFS
(
self
.
temp_filename
,
"r"
)
def
tearDown
(
self
):
self
.
fs
.
close
()
os
.
remove
(
self
.
temp_filename
)
def
check
(
self
,
p
):
try
:
self
.
zipfile
.
getinfo
(
p
)
return
True
except
:
return
False
def
test_reads
(
self
):
def
read_contents
(
path
):
f
=
self
.
fs
.
open
(
path
)
contents
=
f
.
read
()
return
contents
def
check_contents
(
path
,
expected
):
self
.
assert_
(
read_contents
(
path
)
==
expected
)
check_contents
(
"a.txt"
,
b
(
"Hello, World!"
))
check_contents
(
"1.txt"
,
b
(
"1"
))
check_contents
(
"foo/bar/baz.txt"
,
b
(
"baz"
))
def
test_getcontents
(
self
):
def
read_contents
(
path
):
return
self
.
fs
.
getcontents
(
path
)
def
check_contents
(
path
,
expected
):
self
.
assert_
(
read_contents
(
path
)
==
expected
)
check_contents
(
"a.txt"
,
b
(
"Hello, World!"
))
check_contents
(
"1.txt"
,
b
(
"1"
))
check_contents
(
"foo/bar/baz.txt"
,
b
(
"baz"
))
def
test_is
(
self
):
self
.
assert_
(
self
.
fs
.
isfile
(
'a.txt'
))
self
.
assert_
(
self
.
fs
.
isfile
(
'1.txt'
))
self
.
assert_
(
self
.
fs
.
isfile
(
'foo/bar/baz.txt'
))
self
.
assert_
(
self
.
fs
.
isdir
(
'foo'
))
self
.
assert_
(
self
.
fs
.
isdir
(
'foo/bar'
))
self
.
assert_
(
self
.
fs
.
exists
(
'a.txt'
))
self
.
assert_
(
self
.
fs
.
exists
(
'1.txt'
))
self
.
assert_
(
self
.
fs
.
exists
(
'foo/bar/baz.txt'
))
self
.
assert_
(
self
.
fs
.
exists
(
'foo'
))
self
.
assert_
(
self
.
fs
.
exists
(
'foo/bar'
))
def
test_listdir
(
self
):
def
check_listing
(
path
,
expected
):
dir_list
=
self
.
fs
.
listdir
(
path
)
self
.
assert_
(
sorted
(
dir_list
)
==
sorted
(
expected
))
for
item
in
dir_list
:
self
.
assert_
(
isinstance
(
item
,
unicode
))
check_listing
(
'/'
,
[
'a.txt'
,
'1.txt'
,
'foo'
,
'b.txt'
])
check_listing
(
'foo'
,
[
'second.txt'
,
'bar'
])
check_listing
(
'foo/bar'
,
[
'baz.txt'
])
class
TestWriteZipFS
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
temp_filename
=
""
.
join
(
random
.
choice
(
"abcdefghijklmnopqrstuvwxyz"
)
for
_
in
range
(
6
))
+
".zip"
self
.
temp_filename
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
self
.
temp_filename
)
archive_fs
=
archivefs
.
ArchiveFS
(
self
.
temp_filename
,
format
=
'zip'
,
mode
=
'w'
)
def
makefile
(
filename
,
contents
):
if
dirname
(
filename
):
archive_fs
.
makedir
(
dirname
(
filename
),
recursive
=
True
,
allow_recreate
=
True
)
f
=
archive_fs
.
open
(
filename
,
'wb'
)
f
.
write
(
contents
)
f
.
close
()
makefile
(
"a.txt"
,
b
(
"Hello, World!"
))
makefile
(
"b.txt"
,
b
(
"b"
))
makefile
(
u"
\N{GREEK SMALL LETTER ALPHA}
/
\N{GREEK CAPITAL LETTER OMEGA}
.txt"
,
b
(
"this is the alpha and the omega"
))
makefile
(
"foo/bar/baz.txt"
,
b
(
"baz"
))
makefile
(
"foo/second.txt"
,
b
(
"hai"
))
archive_fs
.
close
()
def
tearDown
(
self
):
os
.
remove
(
self
.
temp_filename
)
def
test_valid
(
self
):
zf
=
zipfile
.
ZipFile
(
self
.
temp_filename
,
"r"
)
self
.
assert_
(
zf
.
testzip
()
is
None
)
zf
.
close
()
def
test_creation
(
self
):
zf
=
zipfile
.
ZipFile
(
self
.
temp_filename
,
"r"
)
def
check_contents
(
filename
,
contents
):
if
PY3
:
zcontents
=
zf
.
read
(
filename
)
else
:
zcontents
=
zf
.
read
(
filename
.
encode
(
archivefs
.
ENCODING
))
self
.
assertEqual
(
contents
,
zcontents
)
check_contents
(
"a.txt"
,
b
(
"Hello, World!"
))
check_contents
(
"b.txt"
,
b
(
"b"
))
check_contents
(
"foo/bar/baz.txt"
,
b
(
"baz"
))
check_contents
(
"foo/second.txt"
,
b
(
"hai"
))
check_contents
(
u"
\N{GREEK SMALL LETTER ALPHA}
/
\N{GREEK CAPITAL LETTER OMEGA}
.txt"
,
b
(
"this is the alpha and the omega"
))
#~ class TestAppendZipFS(TestWriteZipFS):
#~ def setUp(self):
#~ self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
#~ self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename)
#~ zip_fs = zipfs.ZipFS(self.temp_filename, 'w')
#~ def makefile(filename, contents):
#~ if dirname(filename):
#~ zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True)
#~ f = zip_fs.open(filename, 'wb')
#~ f.write(contents)
#~ f.close()
#~ makefile("a.txt", b("Hello, World!"))
#~ makefile("b.txt", b("b"))
#~ zip_fs.close()
#~ zip_fs = zipfs.ZipFS(self.temp_filename, 'a')
#~ makefile("foo/bar/baz.txt", b("baz"))
#~ makefile(u"\N{GREEK SMALL LETTER ALPHA}/\N{GREEK CAPITAL LETTER OMEGA}.txt", b("this is the alpha and the omega"))
#~ makefile("foo/second.txt", b("hai"))
#~ zip_fs.close()
#~ class TestZipFSErrors(unittest.TestCase):
#~ def setUp(self):
#~ self.workdir = tempfile.mkdtemp()
#~ def tearDown(self):
#~ shutil.rmtree(self.workdir)
#~ def test_bogus_zipfile(self):
#~ badzip = os.path.join(self.workdir,"bad.zip")
#~ f = open(badzip,"wb")
#~ f.write(b("I'm not really a zipfile"))
#~ f.close()
#~ self.assertRaises(zipfs.ZipOpenError,zipfs.ZipFS,badzip)
#~ def test_missing_zipfile(self):
#~ missingzip = os.path.join(self.workdir,"missing.zip")
#~ self.assertRaises(zipfs.ZipNotFoundError,zipfs.ZipFS,missingzip)
if
__name__
==
'__main__'
:
unittest
.
main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment