Commit 42961816 by willmcgugan

Enhanced copy / move dir functionality, and added tests

parent dc7c650e
......@@ -71,6 +71,7 @@ class FSError(Exception):
self.code = code
self.msg = msg or error_msgs.get(code, error_msgs['UNKNOWN_ERROR'])
self.path = path
self.path2 = path2
self.details = details
def __str__(self):
......@@ -239,6 +240,9 @@ class FS(object):
raise NoSysPathError("NO_SYS_PATH", path)
return None
def hassyspath(self, path):
return self.getsyspath(path, None) is not None
def open(self, path, mode="r", **kwargs):
raise UnsupportedError("UNSUPPORTED")
......@@ -379,7 +383,7 @@ class FS(object):
for path, files in self.walk(path, wildcard, dir_wildcard, search):
for f in files:
yield f
yield pathjoin(path, f)
def walk(self, path="/", wildcard=None, dir_wildcard=None, search="breadth"):
......@@ -401,8 +405,9 @@ class FS(object):
current_path = dirs.pop()
paths = []
for path in self.listdir(current_path, full=True):
for filename in self.listdir(current_path):
path = pathjoin(current_path, filename)
if self.isdir(path):
if dir_wildcard is not None:
if fnmatch.fnmatch(path, dir_wilcard):
......@@ -412,9 +417,9 @@ class FS(object):
else:
if wildcard is not None:
if fnmatch.fnmatch(path, wildcard):
paths.append(path)
paths.append(filename)
else:
paths.append(path)
paths.append(filename)
yield (current_path, paths)
elif search == "depth":
......@@ -423,7 +428,7 @@ class FS(object):
for path in self.listdir(recurse_path, wildcard=dir_wildcard, full=True, dirs_only=True):
for p in recurse(path):
yield p
yield (recurse_path, self.listdir(recurse_path, wildcard=wildcard, full=True, files_only=True))
yield (recurse_path, self.listdir(recurse_path, wildcard=wildcard, files_only=True))
for p in recurse(path):
yield p
......@@ -457,7 +462,7 @@ class FS(object):
"""
if self.isdir(dst):
dst = pathjoin( getroot(dst), getresource(src) )
dst = pathjoin( getroot(dst), getresourcename(src) )
if not self.isfile(src):
raise ResourceInvalid("WRONG_TYPE", src, msg="Source is not a file: %(path)s")
......@@ -508,35 +513,98 @@ class FS(object):
self.remove(src)
def movedir(self, src, dst):
def movedir(self, src, dst, ignore_errors=False):
"""Moves a directory from one location to another.
src -- Source directory path
dst -- Destination directory path
ignore_errors -- If True then this method will ignore FSError exceptions when moving files
"""
src_syspath = self.getsyspath(src, allow_none=True)
dst_syspath = self.getsyspath(dst, allow_none=True)
if not self.isdir(src):
raise ResourceInvalid("WRONG_TYPE", src, msg="Source is not a dst: %(path)s")
if not self.isdir(dst):
raise ResourceInvalid("WRONG_TYPE", dst, msg="Source is not a dst: %(path)s")
src_syspath = self.getsyspath(src, allow_none=True)
dst_syspath = self.getsyspath(dst, allow_none=True)
if src_syspath is not None and dst_syspath is not None:
shutil.move(src_syspath, dst_syspath)
else:
def movefile_noerrors(src, dst):
try:
return self.move(src, dst)
except FSError:
return
if ignore_errors:
movefile = movefile_noerrors
else:
movefile = self.move
silence_fserrors(self.makedir, dst)
for dirname, filename in self.walk(src):
silence_fserrors(self.makedir, dirname)
self.makedir(dst, allow_recreate=True)
for dirname, filenames in self.walk(src, search="depth"):
dst_dirname = makerelative(dirname[len(src):])
dst_dirpath = pathjoin(dst, dst_dirname)
self.makedir(dst_dirpath, allow_recreate=True, recursive=True)
for filename in filenames:
src_filename = pathjoin(dirname, filename)
dst_filename = pathjoin(dst, dirname)
dst_filename = pathjoin(dst_dirpath, filename)
movefile(src_filename, dst_filename)
self.removedir(dirname)
def copydir(self, src, dst, ignore_errors=False):
"""Copies a directory from one location to another.
src -- Source directory path
dst -- Destination directory path
ignore_errors -- If True, exceptions when copying will be ignored
"""
if not self.isdir(src):
raise ResourceInvalid("WRONG_TYPE", src, msg="Source is not a dst: %(path)s")
if not self.isdir(dst):
raise ResourceInvalid("WRONG_TYPE", dst, msg="Source is not a dst: %(path)s")
#
#src_syspath = self.getsyspath(src, allow_none=True)
#dst_syspath = self.getsyspath(dst, allow_none=True)
#
#if src_syspath is not None and dst_syspath is not None:
# shutil.copytree(src_syspath, dst_syspath)
#else:
def copyfile_noerrors(src, dst):
try:
return self.copy(src, dst)
except FSError:
return
if ignore_errors:
copyfile = copyfile_noerrors
else:
copyfile = self.copy
copyfile = self.copy
self.makedir(dst, allow_recreate=True)
for dirname, filenames in self.walk(src):
dst_dirname = makerelative(dirname[len(src):])
dst_dirpath = pathjoin(dst, dst_dirname)
self.makedir(dst_dirpath, allow_recreate=True)
for filename in filenames:
src_filename = pathjoin(dirname, filename)
dst_filename = pathjoin(dst_dirpath, filename)
copyfile(src_filename, dst_filename)
def isdirempty(self, path):
......@@ -651,7 +719,13 @@ if __name__ == "__main__":
fs1 = osfs.OSFS('~/')
fs2 = fs1.opendir("projects").opendir('prettycharts')
print_fs(fs2)
for d, f in fs1.walk('/projects/prettycharts'):
print d, f
for f in fs1.walkfiles("/projects/prettycharts"):
print f
#print_fs(fs2)
#browsewin.browse(fs1)
......
......@@ -38,6 +38,10 @@ class MemoryFile(object):
elif _check_mode(mode, 'r'):
self.mem_file = StringIO(value)
elif _check_mode(mode, "a"):
self.mem_file = StringIO()
self.mem_file.write(value)
else:
if value is not None:
self.mem_file = StringIO(value)
......
......@@ -25,6 +25,9 @@ class MountFS(FS):
FS.__init__(self, thread_syncronize=True)
self.mount_tree = ObjectTree()
def __str__(self):
return "<MountFS>"
def _delegate(self, path):
path = normpath(path)
head_path, object, tail_path = self.mount_tree.partialget(path)
......
......@@ -27,6 +27,8 @@ class OSFS(FS):
try:
f = open(self.getsyspath(path), mode, kwargs.get("buffering", -1))
except IOError, e:
if e.errno == 2:
raise ResourceNotFoundError("NO_FILE", path)
raise OperationFailedError("OPEN_FAILED", path, details=e, msg=str(e))
return f
......@@ -66,6 +68,10 @@ class OSFS(FS):
try:
os.mkdir(sys_path, mode)
except OSError, e:
if allow_recreate:
if e.errno !=17:
raise OperationFailedError("MAKEDIR_FAILED", path)
else:
raise OperationFailedError("MAKEDIR_FAILED", path)
except OSError, e:
raise OperationFailedError("MAKEDIR_FAILED", path, details=e)
......
......@@ -285,6 +285,128 @@ class TestOSFS(unittest.TestCase):
self.assert_(check("/c.txt"))
self.assert_(checkcontents("/c.txt"))
def test_movedir(self):
check = self.check
contents = "If the implementation is hard to explain, it's a bad idea."
def makefile(path):
f = self.fs.open(path, "wb")
f.write(contents)
f.close()
self.fs.makedir("a")
self.fs.makedir("b")
makefile("a/1.txt")
makefile("a/2.txt")
makefile("a/3.txt")
self.fs.makedir("a/foo/bar", recursive=True)
makefile("a/foo/bar/baz.txt")
self.fs.makedir("copy of a")
self.fs.movedir("a", "copy of a")
self.assert_(check("copy of a/1.txt"))
self.assert_(check("copy of a/2.txt"))
self.assert_(check("copy of a/3.txt"))
self.assert_(check("copy of a/foo/bar/baz.txt"))
self.assert_(not check("a/1.txt"))
self.assert_(not check("a/2.txt"))
self.assert_(not check("a/3.txt"))
self.assert_(not check("a/foo/bar/baz.txt"))
self.assert_(not check("a/foo/bar"))
self.assert_(not check("a/foo"))
self.assert_(not check("a"))
def test_copydir(self):
check = self.check
contents = "If the implementation is hard to explain, it's a bad idea."
def makefile(path):
f = self.fs.open(path, "wb")
f.write(contents)
f.close()
self.fs.makedir("a")
self.fs.makedir("b")
makefile("a/1.txt")
makefile("a/2.txt")
makefile("a/3.txt")
self.fs.makedir("a/foo/bar", recursive=True)
makefile("a/foo/bar/baz.txt")
self.fs.makedir("copy of a")
self.fs.copydir("a", "copy of a")
self.assert_(check("copy of a/1.txt"))
self.assert_(check("copy of a/2.txt"))
self.assert_(check("copy of a/3.txt"))
self.assert_(check("copy of a/foo/bar/baz.txt"))
self.assert_(check("a/1.txt"))
self.assert_(check("a/2.txt"))
self.assert_(check("a/3.txt"))
self.assert_(check("a/foo/bar/baz.txt"))
def test_readwriteappendseek(self):
def checkcontents(path, check_contents):
f = self.fs.open(path, "rb")
read_contents = f.read()
f.close()
return read_contents == check_contents
test_strings = ["Beautiful is better than ugly.",
"Explicit is better than implicit.",
"Simple is better than complex."]
all_strings = "".join(test_strings)
self.assertRaises(fs.ResourceNotFoundError, self.fs.open, "a.txt", "r")
self.assert_(not self.fs.exists("a.txt"))
f1 = self.fs.open("a.txt", "wb")
pos = 0
for s in test_strings:
f1.write(s)
pos += len(s)
self.assertEqual(pos, f1.tell())
f1.close()
self.assert_(self.fs.exists("a.txt"))
self.assert_(checkcontents("a.txt", all_strings))
f2 = self.fs.open("b.txt", "wb")
f2.write(test_strings[0])
f2.close()
f3 = self.fs.open("b.txt", "ab")
f3.write(test_strings[1])
f3.write(test_strings[2])
f3.close()
self.assert_(checkcontents("b.txt", all_strings))
f4 = self.fs.open("b.txt", "wb")
f4.write(test_strings[2])
f4.close()
self.assert_(checkcontents("b.txt", test_strings[2]))
f5 = self.fs.open("c.txt", "wt")
for s in test_strings:
f5.write(s+"\n")
f5.close()
f6 = self.fs.open("c.txt", "rt")
for s, t in zip(f6, test_strings):
self.assertEqual(s, t+"\n")
f6.close()
f7 = self.fs.open("c.txt", "rt")
f7.seek(13)
word = f7.read(6)
self.assertEqual(word, "better")
f7.seek(1, os.SEEK_CUR)
word = f7.read(4)
self.assertEqual(word, "than")
f7.seek(-9, os.SEEK_END)
word = f7.read(7)
self.assertEqual(word, "complex")
f7.close()
class TestSubFS(TestOSFS):
def setUp(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment