Commit 612ef35e by willmcgugan

Added some tests for ZipFS

parent 6c4a5238
...@@ -30,6 +30,7 @@ error_msgs = { ...@@ -30,6 +30,7 @@ error_msgs = {
"REMOVEDIR_FAILED" : "Unable to remove dir: %(path)s", "REMOVEDIR_FAILED" : "Unable to remove dir: %(path)s",
"GETSIZE_FAILED" : "Unable to retrieve size of resource: %(path)s", "GETSIZE_FAILED" : "Unable to retrieve size of resource: %(path)s",
"COPYFILE_FAILED" : "Unable to copy file: %(path)s", "COPYFILE_FAILED" : "Unable to copy file: %(path)s",
"READ_FAILED" : "Unable to read from file: %(path)s",
# NoSysPathError # NoSysPathError
"NO_SYS_PATH" : "No mapping to OS filesytem: %(path)s,", "NO_SYS_PATH" : "No mapping to OS filesytem: %(path)s,",
......
...@@ -472,6 +472,65 @@ class TestTempFS(TestOSFS): ...@@ -472,6 +472,65 @@ class TestTempFS(TestOSFS):
td = self.fs._temp_dir td = self.fs._temp_dir
return os.path.exists(os.path.join(td, makerelative(p))) return os.path.exists(os.path.join(td, makerelative(p)))
import zipfs
import random
import zipfile
class TestReadZipFS(unittest.TestCase):
def setUp(self):
self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
self.temp_filename = os.path.abspath(self.temp_filename)
self.zf = zipfile.ZipFile(self.temp_filename, "w")
zf = self.zf
zf.writestr("a.txt", "Hello, World!")
zf.writestr("b.txt", "b")
zf.writestr("1.txt", "1")
zf.writestr("foo/bar/baz.txt", "baz")
zf.writestr("foo/second.txt", "hai")
zf.close()
self.fs = zipfs.ZipFS(self.temp_filename, "r")
def tearDown(self):
self.fs.close()
os.remove(self.temp_filename)
def check(self, p):
try:
self.zipfile.getinfo(p)
return True
except:
return False
def test_reads(self):
def read_contents(path):
f = self.fs.open(path)
contents = f.read()
return contents
def check_contents(path, expected):
self.assert_(read_contents(path)==expected)
check_contents("a.txt", "Hello, World!")
check_contents("1.txt", "1")
check_contents("foo/bar/baz.txt", "baz")
def test_getcontents(self):
def read_contents(path):
return self.fs.getcontents(path)
def check_contents(path, expected):
self.assert_(read_contents(path)==expected)
check_contents("a.txt", "Hello, World!")
check_contents("1.txt", "1")
check_contents("foo/bar/baz.txt", "baz")
def test_listdir(self):
def check_listing(path, expected):
dir_list = self.fs.listdir(path)
self.assert_(sorted(dir_list) == sorted(expected))
check_listing('/', ['a.txt', '1.txt', 'foo', 'b.txt'])
check_listing('foo', ['second.txt', 'bar'])
check_listing('foo/bar', ['baz.txt'])
if __name__ == "__main__": if __name__ == "__main__":
#t = TestFS() #t = TestFS()
#t.setUp() #t.setUp()
......
...@@ -92,13 +92,10 @@ class ZipFS(FS): ...@@ -92,13 +92,10 @@ class ZipFS(FS):
def close(self): def close(self):
"""Finalizes the zip file so that it can be read. """Finalizes the zip file so that it can be read.
No further operations will work after this method is called.""" No further operations will work after this method is called."""
self._lock.acquire()
try: if hasattr(self, 'zf') and self.zf:
if self.zf: self.zf.close()
self.zf.close() self.zf = _ExceptionProxy()
self.zf = _ExceptionProxy()
finally:
self._lock.release()
def __del__(self): def __del__(self):
self.close() self.close()
...@@ -136,13 +133,15 @@ class ZipFS(FS): ...@@ -136,13 +133,15 @@ class ZipFS(FS):
def getcontents(self, path): def getcontents(self, path):
self._lock.acquire() self._lock.acquire()
try: try:
if not exists(path): if not self.exists(path):
raise ResourceNotFoundError("NO_FILE", path) raise ResourceNotFoundError("NO_FILE", path)
path = normpath(path) path = normpath(path)
try: try:
contents = self.zf.read(path) contents = self.zf.read(path)
except KeyError: except KeyError:
raise ResourceNotFoundError("NO_FILE", path) raise ResourceNotFoundError("NO_FILE", path)
except RuntimeError:
raise OperationFailedError("READ_FAILED", path, "Zip file must be oppened with 'r' or 'a' to read")
return contents return contents
finally: finally:
self._lock.release() self._lock.release()
...@@ -230,6 +229,8 @@ if __name__ == "__main__": ...@@ -230,6 +229,8 @@ if __name__ == "__main__":
zfs = ZipFS("t3.zip", "w") zfs = ZipFS("t3.zip", "w")
zfs.createfile("t.txt", "Hello, World!") zfs.createfile("t.txt", "Hello, World!")
zfs.createfile("foo/bar/baz/t.txt", "Hello, World!") zfs.createfile("foo/bar/baz/t.txt", "Hello, World!")
print zfs.getcontents('t.txt')
#print zfs.isdir("t.txt") #print zfs.isdir("t.txt")
#print zfs.isfile("t.txt") #print zfs.isfile("t.txt")
#print zfs.isfile("foo/bar") #print zfs.isfile("foo/bar")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment