Commit 4f40e842 by willmcgugan

Work in progress. More tests. Move helper function in to new helpers.py file.

parent a6c0aa73
from itertools import chain
def isabsolutepath(path):
"""Returns True if a given path is absolute.
>>> isabsolutepath("a/b/c")
False
>>> isabsolutepath("/foo/bar")
True
"""
if path:
return path[0] in '\\/'
return False
def normpath(path):
"""Normalizes a path to be in the formated expected by FS objects.
Returns a new path string.
>>> normpath(r"foo\\bar\\baz")
'foo/bar/baz'
"""
return path.replace('\\', '/')
def pathjoin(*paths):
"""Joins any number of paths together. Returns a new path string.
paths -- An iterable of path strings
>>> pathjoin('foo', 'bar', 'baz')
'foo/bar/baz'
>>> pathjoin('foo/bar', '../baz')
'foo/baz'
"""
absolute = False
relpaths = []
for p in paths:
if p:
if p[0] in '\\/':
del relpaths[:]
absolute = True
relpaths.append(p)
pathstack = []
for component in chain(*(normpath(path).split('/') for path in relpaths)):
if component == "..":
if not pathstack:
raise ValueError("Relative path is invalid")
sub = pathstack.pop()
elif component == ".":
pass
elif component:
pathstack.append(component)
if absolute:
return "/" + "/".join(pathstack)
else:
return "/".join(pathstack)
def pathsplit(path):
"""Splits a path on a path separator. Returns a tuple containing the path up
to that last separator and the remaining path component.
>>> pathsplit("foo/bar")
('foo', 'bar')
>>> pathsplit("foo/bar/baz")
('foo/bar', 'baz')
"""
split = normpath(path).rsplit('/', 1)
if len(split) == 1:
return ('', split[0])
return tuple(split)
def getroot(path):
return pathsplit(path)[0]
def getresourcename(path):
return pathsplit(path)[1]
def resolvepath(path):
"""Normalises the path and removes any relative path components.
path -- A path string
>>> resolvepath(r"foo\\bar\\..\\baz")
'foo/baz'
"""
return pathjoin(path)
def makerelative(path):
"""Makes a path relative by removing initial separator.
path -- A path
>>> makerelative("/foo/bar")
'foo/bar'
"""
path = normpath(path)
if path.startswith('/'):
return path[1:]
return path
def makeabsolute(path):
"""Makes a path absolute by adding a separater at the beginning of the path.
path -- A path
>>> makeabsolute("foo/bar/baz")
'/foo/bar/baz'
"""
path = normpath(path)
if not path.startswith('/'):
return '/'+path
return path
def issamedir(path1, path2):
dirname1, name1 = pathsplit(path1)
dirname2, name2 = pathsplit(path2)
return dirname1 == dirname2
......@@ -2,6 +2,7 @@
import os
import datetime
from fs import _iteratepath
from fs import *
try:
from cStringIO import StringIO
......@@ -95,6 +96,9 @@ class MemoryFS(FS):
class DirEntry(object):
def __init__(self, type, name, contents=None):
assert type in ("dir", "file"), "Type must be dir or file!"
self.type = type
self.name = name
self.permissions = None
......@@ -214,7 +218,7 @@ class MemoryFS(FS):
else:
if not allow_recreate:
if dirname in parent_dir.contents:
raise ResourceNotFoundError("NO_DIR", dirname, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s")
raise OperationFailedError("MAKEDIR_FAILED", dirname, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s")
current_dir = self.root
for path_component in _iteratepath(dirpath)[:-1]:
......@@ -354,7 +358,7 @@ class MemoryFS(FS):
if not dir_entry.isdir():
raise ResourceInvalid("WRONG_TYPE", path, msg="Can't remove resource, its not a directory: %(path)s" )
if not recursive and len(dir_entry.contents):
if dir_entry.contents:
raise OperationFailedError("REMOVEDIR_FAILED", "Directory is not empty: %(path)s")
if recursive:
......@@ -372,6 +376,8 @@ class MemoryFS(FS):
self._lock.release()
def rename(self, src, dst):
if not issamedir(src, dst):
raise ValueError("Destination path must the same directory (user the move method for moving to a different directory)")
self._lock.acquire()
try:
dir_entry = self._get_dir_entry(src)
......
#!/usr/bin/env python
from fs import FS, FSError, pathjoin, pathsplit, print_fs, _iteratepath, normpath, makeabsolute, makerelative, _synchronize
from fs import *
from objecttree import ObjectTree
from memoryfs import MemoryFS
......@@ -124,6 +124,17 @@ class MountFS(FS):
finally:
self._lock.release()
def makedir(self, path, mode=0777, recursive=False, allow_recreate=False):
path = normpath(path)
self._lock.acquire()
try:
fs, mount_path, delegate_path = self._delegate(path)
if fs is self:
raise UnsupportedError("UNSUPPORTED", msg="Can only makedir for mounted paths" )
return fs.makedir(delegate_path, mode, recursive=recursive, allow_recreate=allow_recreate)
finally:
self._lock.release()
def open(self, path, mode="r", **kwargs):
self._lock.acquire()
......@@ -163,6 +174,20 @@ class MountFS(FS):
finally:
self._lock.release()
def remove(self, path):
self._lock.acquire()
try:
path = normpath(path)
fs, mount_path, delegate_path = self._delegate(path)
if fs is None:
raise ResourceNotFoundError("NO_FILE", path)
if fs is self:
raise UnsupportedError("UNSUPPORTED", msg="Can only remove paths within a mounted dir" )
return fs.remove(delegate_path)
finally:
self._lock.release()
def removedir(self, path, recursive=False):
self._lock.acquire()
......@@ -171,27 +196,26 @@ class MountFS(FS):
path = normpath(path)
fs, mount_path, delegate_path = self._delegate(path)
if fs is not self:
return fs.removedir(delegate_path, path)
object = self.mount_tree.get(path, None)
if object is None or not isinstance(object, dict):
raise ResourceNotFound("NO_DIR", path)
if fs is None or fs is self:
raise OperationFailedError("REMOVEDIR_FAILED", path, msg="Can not removedir for an un-mounted path")
if not recursive and len(object):
raise OperationFailedError("REMOVEDIR_FAILED", path)
if not fs.isdirempty(delegate_path):
raise OperationFailedError("REMOVEDIR_FAILED", "Directory is not empty: %(path)s")
del self.mount_tree[delegate_path]
return fs.removedir(delegate_path, recursive)
finally:
self._lock.release()
def rename(self, src, dst):
if not issamedir(src, dst):
raise ValueError("Destination path must the same directory (user the move method for moving to a different directory)")
self._lock.acquire()
try:
fs1, mount_path1, delegate_path1 = self._delegate(path)
fs2, mount_path2, delegate_path2 = self._delegate(path)
fs1, mount_path1, delegate_path1 = self._delegate(src)
fs2, mount_path2, delegate_path2 = self._delegate(dst)
if fs1 is not fs2:
raise OperationFailedError("RENAME_FAILED", src)
......@@ -273,123 +297,11 @@ class MountFS(FS):
size = self.mount_tree[path].info_callable(path).get("size", None)
return size
return fs.getinfo(delegate_path).getsize()
return fs.getinfo(delegate_path).get("size", None)
except:
self._lock.release()
#
#class MountFS(FS):
#
# class Mount(object):
# def __init__(self, path, memory_fs, value, mode):
# self.path = path
# memory_fs._on_close_memory_file(path, self)
# self.fs = None
#
# def __str__(self):
# return "Mount pont: %s, %s" % (self.path, str(self.fs))
#
# def get_mount(self, path, memory_fs, value, mode):
#
# dir_entry = memory_fs._get_dir_entry(path)
# if dir_entry is None or dir_entry.data is None:
# return MountFS.Mount(path, memory_fs, value, mode)
# else:
# return dir_entry.data
#
# def __init__(self):
# self.mounts = {}
# self.mem_fs = MemoryFS(file_factory=self.get_mount)
#
# def _delegate(self, path):
# path_components = list(_iteratepath(path))
#
# current_dir = self.mem_fs.root
# for i, path_component in enumerate(path_components):
#
# if current_dir is None:
# return None, None, None
#
# if '.mount' in current_dir.contents:
# break
#
# dir_entry = current_dir.contents.get(path_component, None)
# current_dir = dir_entry
# else:
# i = len(path_components)
#
# if '.mount' in current_dir.contents:
#
# mount_point = '/'.join(path_components[:i])
# mount_filename = pathjoin(mount_point, '.mount')
#
# mount = self.mem_fs.open(mount_filename, 'r')
# delegate_path = '/'.join(path_components[i:])
# return mount.fs, mount_point, delegate_path
#
# return self, "", path
#
# def desc(self, path):
# fs, mount_path, delegate_path = self._delegate(path)
# if fs is self:
# return "Mount dir"
#
# return "Mounted dir, maps to path %s on %s" % (delegate_path, str(fs))
#
# def isdir(self, path):
# fs, mount_path, delegate_path = self._delegate(path)
# if fs is None:
# return False
#
# if fs is self:
# return True
# else:
# return fs.isdir(delegate_path)
#
# def listdir(self, path="/", wildcard=None, full=False, absolute=False, hidden=False, dirs_only=False, files_only=False):
# fs, mount_path, delegate_path = self._delegate(path)
#
# if fs is None:
# raise ResourceNotFoundError("NO_DIR", path)
#
# if fs is self:
# if files_only:
# return []
# return self.mem_fs.listdir(path,
# wildcard=wildcard,
# full=full,
# absolute=absolute,
# hidden=hidden,
# dirs_only=True,
# files_only=False)
# else:
# paths = fs.listdir(delegate_path,
# wildcard=wildcard,
# full=full,
# absolute=absolute,
# hidden=hidden,
# dirs_only=dirs_only,
# files_only=files_only)
# if full or absolute:
# if full:
# mount_path = makeabsolute(mount_path)
# else:
# mount_path = makerelative(mount_path)
# paths = [pathjoin(mount_path, path) for path in paths]
#
# return paths
#
# def mount(self, name, path, fs):
# self.mem_fs.makedir(path, recursive=True)
# mount_filename = pathjoin(path, '.mount')
# mount = self.mem_fs.open(mount_filename, 'w')
# mount.name = name
# mount.fs = fs
#
# self.mounts[name] = (path, fs)
if __name__ == "__main__":
help(MountFS)
......
......@@ -101,7 +101,7 @@ class MultiFS(FS):
try:
fs = self._delegate_search(path)
if fs is not None:
return fs.getsyspath(path, allow_none)
return fs.getsyspath(path, allow_none=allow_none)
raise ResourceNotFoundError("NO_RESOURCE", path)
finally:
self._lock.release()
......@@ -120,12 +120,12 @@ class MultiFS(FS):
self._lock.release()
def open(self, path, mode="r", buffering=-1, **kwargs):
def open(self, path, mode="r",**kwargs):
self._lock.acquire()
try:
for fs in self:
if fs.exists(path):
fs_file = fs.open(path, mode, buffering, **kwargs)
fs_file = fs.open(path, mode, **kwargs)
return fs_file
raise ResourceNotFoundError("NO_FILE", path)
......@@ -206,6 +206,8 @@ class MultiFS(FS):
self._lock.release()
def rename(self, src, dst):
if not issamedir(src, dst):
raise ValueError("Destination path must the same directory (user the move method for moving to a different directory)")
self._lock.acquire()
try:
for fs in self:
......
......@@ -23,11 +23,11 @@ class OSFS(FS):
sys_path = os.path.join(self.root_path, makerelative(self._resolve(path)))
return sys_path
def open(self, path, mode="r", buffering=-1, **kwargs):
def open(self, path, mode="r", **kwargs):
try:
f = open(self.getsyspath(path), mode, buffering)
f = open(self.getsyspath(path), mode, kwargs.get("buffering", -1))
except IOError, e:
raise OperationFailedError("OPEN_FAILED", path, details=e, msg=str(details))
raise OperationFailedError("OPEN_FAILED", path, details=e, msg=str(e))
return f
......@@ -54,19 +54,21 @@ class OSFS(FS):
return self._listdir_helper(path, paths, wildcard, full, absolute, hidden, dirs_only, files_only)
def makedir(self, path, mode=0777, recursive=False):
def makedir(self, path, mode=0777, recursive=False, allow_recreate=False):
sys_path = self.getsyspath(path)
try:
if recursive:
os.makedirs(sys_path, mode)
else:
if not allow_recreate and self.exists(path):
raise OperationFailedError("MAKEDIR_FAILED", dirname, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s")
try:
os.mkdir(sys_path, mode)
except OSError, e:
raise FSError("NO_DIR", sys_path)
raise OperationFailedError("MAKEDIR_FAILED", path)
except OSError, e:
raise FSError("OS_ERROR", sys_path, details=e)
raise OperationFailedError("MAKEDIR_FAILED", path, details=e)
def remove(self, path):
sys_path = self.getsyspath(path)
......@@ -90,6 +92,8 @@ class OSFS(FS):
raise OperationFailedError("REMOVEDIR_FAILED", path, details=e)
def rename(self, src, dst):
if not issamedir(src, dst):
raise ValueError("Destination path must the same directory (user the move method for moving to a different directory)")
path_src = self.getsyspath(src)
path_dst = self.getsyspath(dst)
......
......@@ -2,6 +2,7 @@
import unittest
import fs
from helpers import *
import shutil
class TestHelpers(unittest.TestCase):
......@@ -46,10 +47,10 @@ class TestHelpers(unittest.TestCase):
result = testpaths[-1]
self.assertEqual(fs.pathjoin(*paths), result)
self.assertRaises(fs.PathError, fs.pathjoin, "../")
self.assertRaises(fs.PathError, fs.pathjoin, "./../")
self.assertRaises(fs.PathError, fs.pathjoin, "a/b", "../../..")
self.assertRaises(fs.PathError, fs.pathjoin, "a/b/../../../d")
self.assertRaises(ValueError, fs.pathjoin, "../")
self.assertRaises(ValueError, fs.pathjoin, "./../")
self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..")
self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d")
def test_makerelative(self):
tests = [ ("/a/b", "a/b"),
......@@ -132,7 +133,6 @@ class TestObjectTree(unittest.TestCase):
self.assertEqual(right, "f/g")
import tempfile
import osfs
import os
......@@ -148,7 +148,7 @@ class TestOSFS(unittest.TestCase):
shutil.rmtree(self.temp_dir)
def check(self, p):
return os.path.exists(os.path.join(self.temp_dir, p))
return os.path.exists(os.path.join(self.temp_dir, makerelative(p)))
def test_makedir(self):
check = self.check
......@@ -256,6 +256,35 @@ class TestOSFS(unittest.TestCase):
size = self.fs.getsize("info.txt")
self.assertEqual(size, len(test_str))
def test_movefile(self):
check = self.check
contents = "If the implementation is hard to explain, it's a bad idea."
def makefile(path):
f = self.fs.open(path, "wb")
f.write(contents)
f.close()
def checkcontents(path):
f = self.fs.open(path, "rb")
check_contents = f.read()
f.close()
return contents == check_contents
self.fs.makedir("foo/bar", recursive=True)
makefile("foo/bar/a.txt")
self.assert_(check("foo/bar/a.txt"))
self.assert_(checkcontents("foo/bar/a.txt"))
self.fs.move("foo/bar/a.txt", "foo/b.txt")
self.assert_(not check("foo/bar/a.txt"))
self.assert_(check("foo/b.txt"))
self.assert_(checkcontents("foo/b.txt"))
self.fs.move("foo/b.txt", "c.txt")
fs.print_fs(self.fs)
self.assert_(not check("foo/b.txt"))
self.assert_(check("/c.txt"))
self.assert_(checkcontents("/c.txt"))
class TestSubFS(TestOSFS):
def setUp(self):
......@@ -269,8 +298,9 @@ class TestSubFS(TestOSFS):
shutil.rmtree(self.temp_dir)
def check(self, p):
p = os.path.join("foo/bar", p)
return os.path.exists(os.path.join(self.temp_dir, p))
p = os.path.join("foo/bar", makerelative(p))
full_p = os.path.join(self.temp_dir, p)
return os.path.exists(full_p)
import memoryfs
......@@ -286,6 +316,22 @@ class TestMemoryFS(TestOSFS):
return self.fs.exists(p)
import mountfs
class TestMountFS(TestOSFS):
def setUp(self):
self.mount_fs = mountfs.MountFS()
self.mem_fs = memoryfs.MemoryFS()
self.mount_fs.mountdir("mounted/memfs", self.mem_fs)
self.fs = self.mount_fs.opendir("mounted/memfs")
def tearDown(self):
pass
def check(self, p):
return self.mount_fs.exists(os.path.join("mounted/memfs", makerelative(p)))
if __name__ == "__main__":
#t = TestFS()
#t.setUp()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment