Commit 20640d1a by willmcgugan

Work in progress

parent 109c7f5d
...@@ -140,14 +140,27 @@ class NullFile(object): ...@@ -140,14 +140,27 @@ class NullFile(object):
def isabsolutepath(path): def isabsolutepath(path):
"""Returns True if a given path is absolute.""" """Returns True if a given path is absolute.
>>> isabsolutepath("a/b/c")
False
>>> isabsolutepath("/foo/bar")
True
"""
if path: if path:
return path[0] in '\\/' return path[0] in '\\/'
return False return False
def normpath(path): def normpath(path):
"""Normalizes a path to be in the formated expected by FS objects. """Normalizes a path to be in the formated expected by FS objects.
Returns a new path string.""" Returns a new path string.
>>> normpath(r"foo\\bar\\baz")
'foo/bar/baz'
"""
return path.replace('\\', '/') return path.replace('\\', '/')
...@@ -156,6 +169,12 @@ def pathjoin(*paths): ...@@ -156,6 +169,12 @@ def pathjoin(*paths):
paths -- An iterable of path strings paths -- An iterable of path strings
>>> pathjoin('foo', 'bar', 'baz')
'foo/bar/baz'
>>> pathjoin('foo/bar', '../baz')
'foo/baz'
""" """
absolute = False absolute = False
...@@ -193,7 +212,7 @@ def pathsplit(path): ...@@ -193,7 +212,7 @@ def pathsplit(path):
('foo', 'bar') ('foo', 'bar')
>>> pathsplit("foo/bar/baz") >>> pathsplit("foo/bar/baz")
('foo/bar', 'bar') ('foo/bar', 'baz')
""" """
...@@ -203,9 +222,25 @@ def pathsplit(path): ...@@ -203,9 +222,25 @@ def pathsplit(path):
return tuple(split) return tuple(split)
def resolvepath(path): def resolvepath(path):
"""Normalises the path and removes any relative path components.
path -- A path string
>>> resolvepath(r"foo\\bar\\..\\baz")
'foo/baz'
"""
return pathjoin(path) return pathjoin(path)
def makerelative(path): def makerelative(path):
"""Makes a path relative by removing initial separator.
path -- A normalised path
>>> makerelative("/foo/bar")
'foo/bar'
"""
if path.startswith('/'): if path.startswith('/'):
return path[1:] return path[1:]
return path return path
...@@ -261,8 +296,10 @@ class FS(object): ...@@ -261,8 +296,10 @@ class FS(object):
return pathjoin('/', pathname) return pathjoin('/', pathname)
return pathname return pathname
def getsyspath(self, path): def getsyspath(self, path, default=None):
raise NoSysPathError("NO_SYS_PATH", path) if default is None:
raise NoSysPathError("NO_SYS_PATH", path)
return default
def open(self, path, mode="r", buffering=-1, **kwargs): def open(self, path, mode="r", buffering=-1, **kwargs):
raise UnsupportedError("UNSUPPORTED") raise UnsupportedError("UNSUPPORTED")
...@@ -400,17 +437,6 @@ class FS(object): ...@@ -400,17 +437,6 @@ class FS(object):
def getsize(self, path): def getsize(self, path):
return self.getinfo(path)['size'] return self.getinfo(path)['size']
def makefile(self, path, data):
f = None
try:
f = self.open(path, "wb")
f.write(data)
finally:
if f is not None:
f.close()
return True
class SubFS(FS): class SubFS(FS):
......
...@@ -18,7 +18,7 @@ class OSFS(FS): ...@@ -18,7 +18,7 @@ class OSFS(FS):
def __str__(self): def __str__(self):
return "<OSFS \"%s\">" % self.root_path return "<OSFS \"%s\">" % self.root_path
def getsyspath(self, path): def getsyspath(self, path, default=None):
sys_path = os.path.join(self.root_path, makerelative(self._resolve(path))) sys_path = os.path.join(self.root_path, makerelative(self._resolve(path)))
return sys_path return sys_path
......
...@@ -6,6 +6,7 @@ import fs ...@@ -6,6 +6,7 @@ import fs
class TestHelpers(unittest.TestCase): class TestHelpers(unittest.TestCase):
def test_isabsolutepath(self): def test_isabsolutepath(self):
"""fs.isabsolutepath tests"""
tests = [ ('', False), tests = [ ('', False),
('/', True), ('/', True),
('/A/B', True), ('/A/B', True),
...@@ -16,6 +17,7 @@ class TestHelpers(unittest.TestCase): ...@@ -16,6 +17,7 @@ class TestHelpers(unittest.TestCase):
self.assertEqual(fs.isabsolutepath(path), result) self.assertEqual(fs.isabsolutepath(path), result)
def test_normpath(self): def test_normpath(self):
"""fs.normpath tests"""
tests = [ ("\\a\\b\\c", "/a/b/c"), tests = [ ("\\a\\b\\c", "/a/b/c"),
("", ""), ("", ""),
("/a/b/c", "/a/b/c"), ("/a/b/c", "/a/b/c"),
...@@ -24,6 +26,7 @@ class TestHelpers(unittest.TestCase): ...@@ -24,6 +26,7 @@ class TestHelpers(unittest.TestCase):
self.assertEqual(fs.normpath(path), result) self.assertEqual(fs.normpath(path), result)
def test_pathjon(self): def test_pathjon(self):
"""fs.pathjoin tests"""
tests = [ ("", "a", "a"), tests = [ ("", "a", "a"),
("a", "a", "a/a"), ("a", "a", "a/a"),
("a/b", "../c", "a/c"), ("a/b", "../c", "a/c"),
...@@ -51,6 +54,7 @@ class TestHelpers(unittest.TestCase): ...@@ -51,6 +54,7 @@ class TestHelpers(unittest.TestCase):
self.assertRaises(fs.PathError, fs.pathjoin, "a/b/../../../d") self.assertRaises(fs.PathError, fs.pathjoin, "a/b/../../../d")
def test_makerelative(self): def test_makerelative(self):
"""fs.makerelative tests"""
tests = [ ("/a/b", "a/b"), tests = [ ("/a/b", "a/b"),
("a/b", "a/b"), ("a/b", "a/b"),
("/", "") ] ("/", "") ]
...@@ -59,7 +63,8 @@ class TestHelpers(unittest.TestCase): ...@@ -59,7 +63,8 @@ class TestHelpers(unittest.TestCase):
print path, result print path, result
self.assertEqual(fs.makerelative(path), result) self.assertEqual(fs.makerelative(path), result)
def test_absolute(self): def test_makeabsolute(self):
"""fs.makeabsolute tests"""
tests = [ ("/a/b", "/a/b"), tests = [ ("/a/b", "/a/b"),
("a/b", "/a/b"), ("a/b", "/a/b"),
("/", "/") ] ("/", "/") ]
...@@ -68,6 +73,7 @@ class TestHelpers(unittest.TestCase): ...@@ -68,6 +73,7 @@ class TestHelpers(unittest.TestCase):
self.assertEqual(fs.makeabsolute(path), result) self.assertEqual(fs.makeabsolute(path), result)
def test_iteratepath(self): def test_iteratepath(self):
"""fs.iteratepath tests"""
tests = [ ("a/b", ["a", "b"]), tests = [ ("a/b", ["a", "b"]),
("", [] ), ("", [] ),
("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]), ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]),
...@@ -82,6 +88,7 @@ class TestHelpers(unittest.TestCase): ...@@ -82,6 +88,7 @@ class TestHelpers(unittest.TestCase):
self.assertEqual(list(fs._iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"]) self.assertEqual(list(fs._iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"])
def test_pathsplit(self): def test_pathsplit(self):
"""fs.pathsplit tests"""
tests = [ ("a/b", ("a", "b")), tests = [ ("a/b", ("a", "b")),
("a/b/c", ("a/b", "c")), ("a/b/c", ("a/b", "c")),
("a", ("", "a")), ("a", ("", "a")),
...@@ -119,7 +126,7 @@ class TestFS(unittest.TestCase): ...@@ -119,7 +126,7 @@ class TestFS(unittest.TestCase):
return os.path.exists(os.path.join(self.temp_dir, p)) return os.path.exists(os.path.join(self.temp_dir, p))
def test_makedir(self): def test_makedir(self):
"""osfs.makedir tests"""
check = self.check check = self.check
self.fs.makedir("a") self.fs.makedir("a")
...@@ -137,7 +144,7 @@ class TestFS(unittest.TestCase): ...@@ -137,7 +144,7 @@ class TestFS(unittest.TestCase):
def test_removedir(self): def test_removedir(self):
"""osfs.removedir tests"""
check = self.check check = self.check
self.fs.makedir("a") self.fs.makedir("a")
self.assert_(check("a")) self.assert_(check("a"))
...@@ -159,20 +166,13 @@ class TestFS(unittest.TestCase): ...@@ -159,20 +166,13 @@ class TestFS(unittest.TestCase):
self.assert_(not check("foo")) self.assert_(not check("foo"))
def test_rename(self): def test_rename(self):
"""osfs.rename tests"""
check = self.check check = self.check
self.fs.open("foo.txt", 'wt').write("Hello, World!") self.fs.open("foo.txt", 'wt').write("Hello, World!")
self.assert_(check("foo.txt")) self.assert_(check("foo.txt"))
self.fs.rename("foo.txt", "bar.txt") self.fs.rename("foo.txt", "bar.txt")
self.assert_(check("bar.txt")) self.assert_(check("bar.txt"))
def test_makefile(self):
check = self.check
self.fs.makefile("foo.txt", "Hello, World!")
data = self.fs.open("foo.txt").read()
self.assertEqual(data, "Hello, World!")
if __name__ == "__main__": if __name__ == "__main__":
#t = TestFS() #t = TestFS()
......
import shutil import shutil
def copyfile(src_fs, src_path, dst_fs, dst_path, chunk_size=1024*16): def copy_file(src_fs, src_path, dst_fs, dst_path, chunk_size=1024*16):
def getsyspath(_fs, path): """Copy a file from one filesystem to another. Will use system copyfile, if both files have a syspath.
try: Otherwise file will be copied a chunk at a time.
return _fs.getsyspath(path)
except NoSysPathError:
return ""
src_syspath = src_fs.getsyspath(src_path) src_fs -- Source filesystem object
dst_syspath = dst_fs.getsyspath(dst_path) src_path -- Source path
dst_fs -- Destination filesystem object
dst_path -- Destination filesystem object
chunk_size -- Size of chunks to move if system copyfile is not available (default 16K)
"""
src_syspath = src_fs.getsyspath(src_path, default="")
dst_syspath = dst_fs.getsyspath(dst_path, default="")
# System copy if there are two sys paths # System copy if there are two sys paths
if src_syspath and dst_syspath: if src_syspath and dst_syspath:
...@@ -28,9 +33,22 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, chunk_size=1024*16): ...@@ -28,9 +33,22 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, chunk_size=1024*16):
if not chunk: if not chunk:
break break
dst.write(chunk) dst.write(chunk)
finally: finally:
if src is not None: if src is not None:
src.close() src.close()
if dst is not None: if dst is not None:
dst.close() dst.close()
\ No newline at end of file
def get_total_data(count_fs):
"""Returns the total number of bytes contained within files.
count_fs -- A filesystem object
"""
total = 0
for f in count_fs.walkfiles(absolute=True):
total += count_fs.getsize(f)
return total
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment