Commit d5fb43e5 by willmcgugan

Moved OSFS to own file, added tests

parent 61eb97ee
......@@ -40,6 +40,14 @@ class FSError(Exception):
return '%s. %s' % (self.code, msg)
class PathError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
class NullFile:
......@@ -88,7 +96,7 @@ class NullFile:
def isabsolutepath(path):
if path:
return path[1] in '\\/'
return path[0] in '\\/'
return False
def normpath(path):
......@@ -112,7 +120,7 @@ def pathjoin(*paths):
for component in chain(*(normpath(path).split('/') for path in relpaths)):
if component == "..":
if not pathstack:
raise PathError("INVALID_PATH", str(paths))
raise PathError("relative path is invalid")
sub = pathstack.pop()
elif component == ".":
pass
......@@ -131,7 +139,6 @@ def pathsplit(path):
return ('', split[0])
return split
def resolvepath(path):
return pathjoin(path)
......@@ -140,6 +147,11 @@ def makerelative(path):
return path[1:]
return path
def makeabsolute(path):
if not path.startswith('/'):
return '/'+path
return path
def _iteratepath(path, numsplits=None):
path = resolvepath(path)
......@@ -197,26 +209,26 @@ class FS(object):
def getsyspath(self, path):
raise FSError("NO_SYS_PATH", path)
def safeopen(self, *args, **kwargs):
try:
f = self.open(*args, **kwargs)
except FSError, e:
if e.code == "NO_FILE":
return NullFile()
raise
def desc(self, path):
if not self.exists(path):
if not self.exists(path):
return "No description available"
try:
sys_path = self.getsyspath(path)
except FSError:
return "No description available"
if self.isdir(path):
return "OS dir, maps to %s" % sys_path
else:
......@@ -364,182 +376,31 @@ class SubFS(FS):
return self.parent.listdir(self._delegate(path), wildcard, full, absolute, hidden, dirs_only, files_only)
class OSFS(FS):
def __init__(self, root_path):
expanded_path = normpath(os.path.expanduser(os.path.expandvars(root_path)))
if not os.path.exists(expanded_path):
raise FSError("NO_DIR", expanded_path, msg="Root directory does not exist: %(path)s")
if not os.path.isdir(expanded_path):
raise FSError("NO_DIR", expanded_path, msg="Root path is not a directory: %(path)s")
self.root_path = normpath(os.path.abspath(expanded_path))
def __str__(self):
return "<OSFS \"%s\">" % self.root_path
def getsyspath(self, pathname):
sys_path = os.path.join(self.root_path, makerelative(self._resolve(pathname)))
return sys_path
def open(self, pathname, mode="r", buffering=-1, **kwargs):
try:
f = open(self.getsyspath(pathname), mode, buffering)
except IOError, e:
raise FSError("OPEN_FAILED", pathname, details=e, msg=str(details))
return f
def exists(self, pathname):
pathname = self.getsyspath(pathname)
return os.path.exists(pathname)
def isdir(self, pathname):
path = self.getsyspath(pathname)
return os.path.isdir(path)
def isfile(self, pathname):
path = self.getsyspath(pathname)
return os.path.isfile(path)
def ishidden(self, pathname):
return pathname.startswith('.')
def listdir(self, path="./", wildcard=None, full=False, absolute=False, hidden=False, dirs_only=False, files_only=False):
try:
paths = os.listdir(self.getsyspath(path))
except (OSError, IOError), e:
raise FSError("LISTDIR_FAILED", path, details=e, msg="Unable to get directory listing: %(path)s - (%(details)s)")
return self._listdir_helper(path, paths, wildcard, full, absolute, hidden, dirs_only, files_only)
def mkdir(self, path, mode=0777, recursive=False):
sys_path = self.getsyspath(path)
if recursive:
os.makedirs(sys_path, mode)
def validatefs(fs):
expected_methods = [ "abspath",
"getsyspath",
"open",
"exists",
"isdir",
"isfile",
"ishidden",
"listdir",
"mkdir",
"remove",
"removedir",
"getinfo",
"getsize",
]
pad_size = len(max(expected_methods, key=str.__len__))
count = 0
for method_name in sorted(expected_methods):
method = getattr(fs, method_name, None)
if method is None:
print method_name.ljust(pad_size), '?'
else:
os.makedir(sys_path, mode)
def remove(self, path):
sys_path = self.getsyspath(path)
try:
os.remove(sys_path)
except OSError, e:
raise FSError("FILE_DELETE_FAILED", path, details=e)
def removedir(self, path, recursive=False):
sys_path = self.getsyspath(path)
if recursive:
try:
os.rmdir(sys_path)
except OSError, e:
raise FSError("DIR_DELETE_FAILED", path, details=e)
else:
try:
os.removedirs(sys_path)
except OSError, e:
raise FSError("DIR_DELETE_FAILED", path, details=e)
def getinfo(self, path):
sys_path = self.getsyspath(path)
try:
stats = os.stat(sys_path)
except OSError, e:
raise FSError("UNKNOWN_ERROR", path, details=e)
info = dict((k, getattr(stats, k)) for k in dir(stats) if not k.startswith('__') )
info['size'] = info['st_size']
ct = info.get('st_ctime', None)
if ct is not None:
info['created_time'] = datetime.datetime.fromtimestamp(ct)
at = info.get('st_atime', None)
if at is not None:
info['accessed_time'] = datetime.datetime.fromtimestamp(at)
mt = info.get('st_mtime', None)
if mt is not None:
info['modified_time'] = datetime.datetime.fromtimestamp(at)
return info
def getsize(self, path):
sys_path = self.getsyspath(path)
try:
stats = os.stat(sys_path)
except OSError, e:
raise FSError("UNKNOWN_ERROR", path, details=e)
return stats.st_size
if __name__ == "__main__":
osfs = OSFS("~/projects")
print osfs
for filename in osfs.walk_files("/", "*.pov"):
print filename
print osfs.getinfo(filename)
import browsewin
browsewin.browse(osfs)
#print_fs(osfs)
#print osfs.listdir("/projects/fs")
#sub_fs = osfs.open_dir("projects/")
#print sub_fs
#sub_fs.open('test.txt')
#print sub_fs.listdir(dirs_only=True)
#print sub_fs.listdir()
#print_fs(sub_fs, max_levels=2)
#for f in osfs.listdir():
# print f
#print osfs.listdir('projects', dirs_only=True, wildcard="d*")
#print_fs(osfs, 'projects/')
print pathjoin('/', 'a')
print pathjoin('a/b/c', '../../e/f')
\ No newline at end of file
print method_name.ljust(pad_size), 'X'
count += 1
print
print "%i out of %i methods" % (count, len(expected_methods))
......@@ -155,8 +155,8 @@ class MemoryFS(FS):
self.dir_entry_factory = MemoryFS.DirEntry
self.file_factory = file_factory or MemoryFile
self.root = self._make_dir_entry('dir', 'root')
self.root = self._make_dir_entry('dir', 'root')
def __str__(self):
return "<MemoryFS>"
......@@ -178,7 +178,7 @@ class MemoryFS(FS):
raise FSError("NO_SYS_PATH", pathname, msg="This file-system has no syspath")
def desc(self, path):
if self.isdir(path):
return "Memory dir"
elif self.isfile(path):
......@@ -367,7 +367,6 @@ class MemoryFS(FS):
return info
def ishidden(self, pathname):
return False
......
......@@ -36,7 +36,7 @@ class MountFS(FS):
for i, path_component in enumerate(path_components):
if current_dir is None:
return None, None
return None, None, None
if '.mount' in current_dir.contents:
break
......@@ -53,14 +53,14 @@ class MountFS(FS):
mount = self.mem_fs.open(mount_filename, 'r')
delegate_path = '/'.join(path_components[i:])
return mount.fs, delegate_path
return mount.fs, mount_point, delegate_path
return self, path
return self, "", path
def desc(self, path):
fs, delegate_path = self._delegate(path)
fs, mount_path, delegate_path = self._delegate(path)
if fs is self:
return "Mount dir"
......@@ -69,7 +69,7 @@ class MountFS(FS):
def isdir(self, path):
fs, delegate_path = self._delegate(path)
fs, mount_path, delegate_path = self._delegate(path)
if fs is None:
return False
......@@ -80,16 +80,37 @@ class MountFS(FS):
def listdir(self, path="/", wildcard=None, full=False, absolute=False, hidden=False, dirs_only=False, files_only=False):
fs, delegate_path = self._delegate(path)
fs, mount_path, delegate_path = self._delegate(path)
if fs is None:
raise FSError("NO_DIR", path)
if fs is self:
return self.mem_fs.listdir(path, wildcard=wildcard, full=full, absolute=absolute, hidden=hidden, dirs_only=True, files_only=False)
if files_only:
return []
return self.mem_fs.listdir(path,
wildcard=wildcard,
full=full,
absolute=absolute,
hidden=hidden,
dirs_only=True,
files_only=False)
else:
return fs.listdir(delegate_path, wildcard=wildcard, full=full, absolute=absolute, hidden=hidden, dirs_only=dirs_only, files_only=files_only)
paths = fs.listdir(delegate_path,
wildcard=wildcard,
full=full,
absolute=absolute,
hidden=hidden,
dirs_only=dirs_only,
files_only=files_only)
if full or absolute:
if full:
mount_path = makeabsolute(mount_path)
else:
mount_path = makerelative(mount_path)
paths = [pathjoin(mount_path, path) for path in paths]
return paths
def mount(self, name, path, fs):
......@@ -110,6 +131,7 @@ if __name__ == "__main__":
#print_fs(fs1)
mountfs = MountFS()
mountfs.mount("fs1", '1/2', fs1)
mountfs.mount("fs1", '1/another', fs1)
......
......@@ -45,16 +45,16 @@ class MultiFS(FS):
if fs.exists(path):
return fs
return None
def which(self, path):
for fs in self:
if fs.exists(path):
for fs_name, fs_object in self.fs_lookup.iteritems():
if fs is fs_object:
return fs_name, fs
return None, None
def getsyspath(self, path):
......@@ -66,16 +66,16 @@ class MultiFS(FS):
def desc(self, path):
if not self.exists(path):
raise FSError("NO_RESOURCE", path)
name, fs = self.which(path)
if name is None:
return ""
return "%s, on %s (%s)" % (fs.desc(path), name, fs)
def open(self, path, mode="r", buffering=-1, **kwargs):
......@@ -149,9 +149,10 @@ class MultiFS(FS):
if __name__ == "__main__":
import fs
osfs = fs.OSFS('~/')
import osfs
osfs = osfs.OSFS('~/')
import memoryfs
mem_fs = memoryfs.MemoryFS()
......@@ -162,11 +163,11 @@ if __name__ == "__main__":
mem_fs.open("projects/test2/readme.txt", 'w').write("Hello, World!")
mem_fs.open("projects/A/readme.txt", 'w').write("\nSecond Line")
multifs = MultiFS()
multifs.addfs("osfs", osfs)
multifs.addfs("mem_fs", mem_fs)
import browsewin
browsewin.browse(multifs)
\ No newline at end of file
#!/usr/bin/env python
from fs import *
class OSFS(FS):
def __init__(self, root_path):
expanded_path = normpath(os.path.expanduser(os.path.expandvars(root_path)))
if not os.path.exists(expanded_path):
raise FSError("NO_DIR", expanded_path, msg="Root directory does not exist: %(path)s")
if not os.path.isdir(expanded_path):
raise FSError("NO_DIR", expanded_path, msg="Root path is not a directory: %(path)s")
self.root_path = normpath(os.path.abspath(expanded_path))
def __str__(self):
return "<OSFS \"%s\">" % self.root_path
def getsyspath(self, pathname):
sys_path = os.path.join(self.root_path, makerelative(self._resolve(pathname)))
return sys_path
def open(self, pathname, mode="r", buffering=-1, **kwargs):
try:
f = open(self.getsyspath(pathname), mode, buffering)
except IOError, e:
raise FSError("OPEN_FAILED", pathname, details=e, msg=str(details))
return f
def exists(self, pathname):
pathname = self.getsyspath(pathname)
return os.path.exists(pathname)
def isdir(self, pathname):
path = self.getsyspath(pathname)
return os.path.isdir(path)
def isfile(self, pathname):
path = self.getsyspath(pathname)
return os.path.isfile(path)
def ishidden(self, pathname):
return pathname.startswith('.')
def listdir(self, path="./", wildcard=None, full=False, absolute=False, hidden=False, dirs_only=False, files_only=False):
try:
paths = os.listdir(self.getsyspath(path))
except (OSError, IOError), e:
raise FSError("LISTDIR_FAILED", path, details=e, msg="Unable to get directory listing: %(path)s - (%(details)s)")
return self._listdir_helper(path, paths, wildcard, full, absolute, hidden, dirs_only, files_only)
def mkdir(self, path, mode=0777, recursive=False):
sys_path = self.getsyspath(path)
if recursive:
os.makedirs(sys_path, mode)
else:
os.makedir(sys_path, mode)
def remove(self, path):
sys_path = self.getsyspath(path)
try:
os.remove(sys_path)
except OSError, e:
raise FSError("FILE_DELETE_FAILED", path, details=e)
def removedir(self, path, recursive=False):
sys_path = self.getsyspath(path)
if recursive:
try:
os.rmdir(sys_path)
except OSError, e:
raise FSError("DIR_DELETE_FAILED", path, details=e)
else:
try:
os.removedirs(sys_path)
except OSError, e:
raise FSError("DIR_DELETE_FAILED", path, details=e)
def getinfo(self, path):
sys_path = self.getsyspath(path)
try:
stats = os.stat(sys_path)
except OSError, e:
raise FSError("UNKNOWN_ERROR", path, details=e)
info = dict((k, getattr(stats, k)) for k in dir(stats) if not k.startswith('__') )
info['size'] = info['st_size']
ct = info.get('st_ctime', None)
if ct is not None:
info['created_time'] = datetime.datetime.fromtimestamp(ct)
at = info.get('st_atime', None)
if at is not None:
info['accessed_time'] = datetime.datetime.fromtimestamp(at)
mt = info.get('st_mtime', None)
if mt is not None:
info['modified_time'] = datetime.datetime.fromtimestamp(at)
return info
def getsize(self, path):
sys_path = self.getsyspath(path)
try:
stats = os.stat(sys_path)
except OSError, e:
raise FSError("UNKNOWN_ERROR", path, details=e)
return stats.st_size
if __name__ == "__main__":
osfs = OSFS("~/projects")
print osfs
for filename in osfs.walk_files("/", "*.pov"):
print filename
print osfs.getinfo(filename)
validatefs(osfs)
import browsewin
browsewin.browse(osfs)
#print_fs(osfs)
#print osfs.listdir("/projects/fs")
#sub_fs = osfs.open_dir("projects/")
#print sub_fs
#sub_fs.open('test.txt')
#print sub_fs.listdir(dirs_only=True)
#print sub_fs.listdir()
#print_fs(sub_fs, max_levels=2)
#for f in osfs.listdir():
# print f
#print osfs.listdir('projects', dirs_only=True, wildcard="d*")
#print_fs(osfs, 'projects/')
print pathjoin('/', 'a')
print pathjoin('a/b/c', '../../e/f')
\ No newline at end of file
#!/usr/bin/env python
import nose
nose.run()
#!/usr/bin/env python
import unittest
import fs
class TestHelpers(unittest.TestCase):
def test_isabsolutepath(self):
tests = [ ('', False),
('/', True),
('/A/B', True),
('/asdasd', True),
('a/b/c', False),
]
for path, result in tests:
self.assertEqual(fs.isabsolutepath(path), result)
def test_normpath(self):
tests = [ ("\\a\\b\\c", "/a/b/c"),
("", ""),
("/a/b/c", "/a/b/c"),
]
for path, result in tests:
self.assertEqual(fs.normpath(path), result)
def test_pathjon(self):
tests = [ ("", "a", "a"),
("a", "a", "a/a"),
("a/b", "../c", "a/c"),
("a/b/../c", "d", "a/c/d"),
("/a/b/c", "d", "/a/b/c/d"),
("/a/b/c", "../../../d", "/d"),
("a", "b", "c", "a/b/c"),
("a/b/c", "../d", "c", "a/b/d/c"),
("a/b/c", "../d", "/a", "/a"),
("aaa", "bbb/ccc", "aaa/bbb/ccc"),
("aaa", "bbb\ccc", "aaa/bbb/ccc"),
("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"),
("a/b", "./d", "e", "a/b/d/e"),
("/", "/", "/"),
("/", "", "/"),
]
for testpaths in tests:
paths = testpaths[:-1]
result = testpaths[-1]
self.assertEqual(fs.pathjoin(*paths), result)
self.assertRaises(fs.PathError, fs.pathjoin, "a/b", "../../..")
self.assertRaises(fs.PathError, fs.pathjoin, "a/b/../../../d")
def test_makerelative(self):
tests = [ ("/a/b", "a/b"),
("a/b", "a/b"),
("/", "") ]
for path, result in tests:
print path, result
self.assertEqual(fs.makerelative(path), result)
def test_absolute(self):
tests = [ ("/a/b", "/a/b"),
("a/b", "/a/b"),
("/", "/") ]
for path, result in tests:
self.assertEqual(fs.makeabsolute(path), result)
def test_iteratepath(self):
tests = [ ("a/b", ["a", "b"]),
("", [] ),
("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]),
("a/b/c/../d", ["a", "b", "d"]) ]
for path, results in tests:
print repr(path), results
for path_component, expected in zip(fs._iteratepath(path), results):
self.assertEqual(path_component, expected)
self.assertEqual(list(fs._iteratepath("a/b/c/d", 1)), ["a", "b/c/d"])
self.assertEqual(list(fs._iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"])
if __name__ == "__main__":
import nose
nose.run()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment