Commit 14f06e4f by willmcgugan

Work in progress for memoryfs

parent 3038fbda
...@@ -122,12 +122,12 @@ def _iteratepath(path, numsplits=None): ...@@ -122,12 +122,12 @@ def _iteratepath(path, numsplits=None):
path = resolvepath(path) path = resolvepath(path)
if not path: if not path:
return [] return []
if numsplits == None: if numsplits == None:
return path.split('/') return filter(lambda p:bool(p), path.split('/'))
else: else:
return path.split('/', numsplits) return filter(lambda p:bool(p), path.split('/', numsplits))
def print_fs(fs, path="/", max_levels=None, indent=' '*2): def print_fs(fs, path="/", max_levels=None, indent=' '*2):
...@@ -135,7 +135,7 @@ def print_fs(fs, path="/", max_levels=None, indent=' '*2): ...@@ -135,7 +135,7 @@ def print_fs(fs, path="/", max_levels=None, indent=' '*2):
def print_dir(fs, path, level): def print_dir(fs, path, level):
try: try:
dir_listing = [(fs.isdir(p), p) for p in fs.listdir(path)] dir_listing = [(fs.isdir(pathjoin(path,p)), p) for p in fs.listdir(path)]
except FSError: except FSError:
print indent*level + "... unabled to retrieve directory list (%s) ..." % str(e) print indent*level + "... unabled to retrieve directory list (%s) ..." % str(e)
return return
...@@ -157,6 +157,20 @@ def print_fs(fs, path="/", max_levels=None, indent=' '*2): ...@@ -157,6 +157,20 @@ def print_fs(fs, path="/", max_levels=None, indent=' '*2):
class FS(object): class FS(object):
def _resolve(self, pathname):
resolved_path = resolvepath(pathname)
return resolved_path
def abspath(self, pathname):
pathname = normpath(pathname)
if not pathname.startswith('/'):
return pathjoin('/', pathname)
return pathname
def open(self, pathname, mode, **kwargs): def open(self, pathname, mode, **kwargs):
...@@ -238,8 +252,6 @@ class OSFS(FS): ...@@ -238,8 +252,6 @@ class OSFS(FS):
expanded_path = normpath(os.path.expanduser(root_path)) expanded_path = normpath(os.path.expanduser(root_path))
print expanded_path
if not os.path.exists(expanded_path): if not os.path.exists(expanded_path):
raise FSError("PATH_NOT_EXIST", "Root path does not exist: %(path)s", expanded_path) raise FSError("PATH_NOT_EXIST", "Root path does not exist: %(path)s", expanded_path)
if not os.path.isdir(expanded_path): if not os.path.isdir(expanded_path):
...@@ -251,26 +263,14 @@ class OSFS(FS): ...@@ -251,26 +263,14 @@ class OSFS(FS):
def __str__(self): def __str__(self):
return "<OSFS \"%s\">" % self.root_path return "<OSFS \"%s\">" % self.root_path
def _resolve(self, pathname):
resolved_path = resolvepath(pathname)
#print "Resolved_path", resolved_path
return resolved_path
def getsyspath(self, pathname): def getsyspath(self, pathname):
#print makerelative(self._resolve(pathname)) sys_path = os.path.join(self.root_path, makerelative(self._resolve(pathname)))
sys_path = os.path.join(self.root_path, makerelative(self._resolve(pathname)))
#print "Sys path", sys_path
return sys_path return sys_path
def abspath(self, pathname):
pathname = normpath(pathname)
if not pathname.startswith('/'):
return pathjoin('/', pathname)
return pathname
def open(self, pathname, mode="r", buffering=-1): def open(self, pathname, mode="r", buffering=-1):
...@@ -319,6 +319,7 @@ class OSFS(FS): ...@@ -319,6 +319,7 @@ class OSFS(FS):
else: else:
makedir(sys_path, mode) makedir(sys_path, mode)
def remove(self, path): def remove(self, path):
sys_path = self.getsyspath(path) sys_path = self.getsyspath(path)
......
#!/usr/bin/env python #!/usr/bin/env python
from fs import FS, pathsplit, _iteratepath, FSError from fs import FS, pathsplit, _iteratepath, FSError, print_fs
class MemoryFS(FS): class MemoryFS(FS):
...@@ -17,11 +17,20 @@ class MemoryFS(FS): ...@@ -17,11 +17,20 @@ class MemoryFS(FS):
self.contents = contents self.contents = contents
def desc_contents(self):
if self.isfile():
return "<file>"
elif self.isdir():
return "<dir %s>"%"".join( "%s: %s"% (k, v.desc_contents()) for k, v in self.contents.iteritems())
def isdir(self): def isdir(self):
return self.type == "dir" return self.type == "dir"
def isfile(self): def isfile(self):
return self.type == "file" return self.type == "file"
def __str__(self):
return "%s: %s" % (self.name, self.desc_contents())
def _make_dir_entry(self, *args, **kwargs): def _make_dir_entry(self, *args, **kwargs):
...@@ -33,9 +42,10 @@ class MemoryFS(FS): ...@@ -33,9 +42,10 @@ class MemoryFS(FS):
self.root = self._make_dir_entry('dir', 'root') self.root = self._make_dir_entry('dir', 'root')
def _get_dir_entry(self, dirpath): def _get_dir_entry(self, dirpath):
current_dir = self.root current_dir = self.root
#print _iteratepath(dirpath)
for path_component in _iteratepath(dirpath): for path_component in _iteratepath(dirpath):
dir_entry = current_dir.contents.get(path_component, None) dir_entry = current_dir.contents.get(path_component, None)
if dir_entry is None: if dir_entry is None:
...@@ -45,56 +55,47 @@ class MemoryFS(FS): ...@@ -45,56 +55,47 @@ class MemoryFS(FS):
current_dir = dir_entry current_dir = dir_entry
return current_dir return current_dir
def getsyspath(self, pathname):
raise FSError("NO_SYS_PATH", "This file-system has not syspath!", pathname)
def isdir(path): def isdir(self, path):
dir_item = self._get_dir_entry(path) dir_item = self._get_dir_entry(self._resolve(path))
if dir_item is None: if dir_item is None:
return False return False
return dir_item.isdir() return dir_item.isdir()
def isfile(path): def isfile(self, path):
dir_item = self._get_dir_entry(path) dir_item = self._get_dir_entry(self._resolve(path))
if dir_item is None: if dir_item is None:
return False return False
return dir_item.isfile() return dir_item.isfile()
def exists(path): def exists(self, path):
return self._getdir(path) is not None return self._getdir(path) is not None
def mkdir(self, dirname, mode=0777, recursive=False, allow_recreate=False): def mkdir(self, dirname, mode=0777, recursive=False, allow_recreate=False):
if not recursive: fullpath = dirname
dirpath, dirname = pathsplit(dirname) dirpath, dirname = pathsplit(dirname)
parent_dir = self._get_dir_entry(dirpath)
if parent_dir is None:
raise FSError("NO_DIR", "Could not make dir, as parent dir does not exist: %(path)s", dirname )
dir_item = parent_dir.contents.get(dirname, None)
if dir_item is not None:
if dir_item.isdir():
if not allow_recreate:
raise FSError("CANNOT_RECREATE_DIR", "Can not create a directory that already exists (try allow_recreate=True): %(path)s", dirname)
else:
raise FSError("CANNOT_CREATE_DIR", "Can not create a directory, because path references a file: %(path)s", dirname)
if dir_item is None:
dir_item.contents[dirname] = self._make_dir_entry("dir", dirname)
else: if recursive:
dirpath, dirname = pathsplit(dirname)
parent_dir = self._get_dir_entry(dirpath) parent_dir = self._get_dir_entry(dirpath)
if parent_dir is not None: if parent_dir is not None:
if parent_dir.isfile(): if parent_dir.isfile():
raise FSError("CANNOT_CREATE_DIR", "Can not create a directory, because path references a file: %(path)s", dirname) raise FSError("CANNOT_CREATE_DIR", "Can not create a directory, because path references a file: %(path)s", dirname)
else: else:
if not allow_recreate: if not allow_recreate:
raise FSError("CANNOT_RECREATE_DIR", "Can not create a directory that already exists (try allow_recreate=True): %(path)s", dirname) if dirname in parent_dir.contents:
raise FSError("CANNOT_RECREATE_DIR", "Can not create a directory that already exists (try allow_recreate=True): %(path)s", dirname)
current_dir = self.root current_dir = self.root
for path_component in list(_iteratepath(dirname))[:-2]: for path_component in _iteratepath(dirpath)[:-1]:
dir_item = current_dir.contents.get(path_component, None) dir_item = current_dir.contents.get(path_component, None)
if dir_item is None: if dir_item is None:
break break
...@@ -102,20 +103,57 @@ class MemoryFS(FS): ...@@ -102,20 +103,57 @@ class MemoryFS(FS):
raise FSError("CANNOT_CREATE_DIR", "Can not create a directory, because path references a file: %(path)s", dirname) raise FSError("CANNOT_CREATE_DIR", "Can not create a directory, because path references a file: %(path)s", dirname)
current_dir = dir_item.contents current_dir = dir_item.contents
current_dir = self.root current_dir = self.root
for path_component in _iteratepath(dirname): for path_component in _iteratepath(dirpath):
dir_item = current_dir.contents.get(path_component, None) dir_item = current_dir.contents.get(path_component, None)
if dir_item is None: if dir_item is None:
new_dir = self._make_dir_entry("dir", path_component) new_dir = self._make_dir_entry("dir", path_component)
current_dir.contents[path_component] = new_dir current_dir.contents[path_component] = new_dir
current_dir = new_dir current_dir = new_dir
else:
current_dir = dir_item
parent_dir = current_dir
else:
parent_dir = self._get_dir_entry(dirpath)
if parent_dir is None:
raise FSError("NO_DIR", "Could not make dir, as parent dir does not exist: %(path)s", dirname )
dir_item = parent_dir.contents.get(dirname, None)
if dir_item is not None:
if dir_item.isdir():
if not allow_recreate:
raise FSError("CANNOT_RECREATE_DIR", "Can not create a directory that already exists (try allow_recreate=True): %(path)s", dirname)
else:
raise FSError("CANNOT_CREATE_DIR", "Can not create a directory, because path references a file: %(path)s", dirname)
if dir_item is None:
parent_dir.contents[dirname] = self._make_dir_entry("dir", dirname)
return self return self
def listdir(self, path="/", wildcard=None, full=False, absolute=False, hidden=False, dirs_only=False, files_only=False):
dir_entry = self._get_dir_entry(path)
paths = dir_entry.contents.keys()
return self._listdir_helper(path, paths, wildcard, full, absolute, hidden, dirs_only, files_only)
def ishidden(self, pathname):
return False
if __name__ == "__main__": if __name__ == "__main__":
mem_fs = MemoryFS() mem_fs = MemoryFS()
mem_fs.mkdir('test') mem_fs.mkdir('test/test2', recursive=True)
mem_fs.mkdir('test/A', recursive=True)
mem_fs.mkdir('test/A/B', recursive=True)
#print mem_fs.listdir('test')
#print mem_fs.isdir("test/test2")
#print mem_fs.root
print_fs(mem_fs)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment