Commit 8d4f5e41 by rfkelly0

adding latest code for TahoeFS

parent 1728ab6b
...@@ -34,8 +34,9 @@ ...@@ -34,8 +34,9 @@
0.4: 0.4:
* New FS implementations (under fs.contrib): * New FS implementations (under fs.contrib):
* BigFS: read contents of a BIG file (C&C game file format) * BigFS: read contents of a BIG file (C&C game file format)
* DAVFS: access remote files stored on a WebDAV server * DAVFS: access remote files stored on a WebDAV server
* TahoeFS: access files stored in a Tahoe-LAFS grid
* New fs.expose implementations: * New fs.expose implementations:
* dokan: mount an FS object as a drive using Dokan (win32-only) * dokan: mount an FS object as a drive using Dokan (win32-only)
* Modified listdir and walk methods to accept callables as well as strings * Modified listdir and walk methods to accept callables as well as strings
...@@ -52,7 +53,7 @@ ...@@ -52,7 +53,7 @@
* watch_win32: don't create immortal reference cycles. * watch_win32: don't create immortal reference cycles.
* MountFS: added support for mounting at the root directory, and for * MountFS: added support for mounting at the root directory, and for
mounting over an existing mount. mounting over an existing mount.
* Added 'getpathurl' and 'haspathurl' methods * Added 'getpathurl' and 'haspathurl' methods.
* Added utils.isdir(fs,path,info) and utils.isfile(fs,path,info); these * Added utils.isdir(fs,path,info) and utils.isfile(fs,path,info); these
can often determine whether a path is a file or directory by inspecting can often determine whether a path is a file or directory by inspecting
the info dict and avoid an additional query to the filesystem. the info dict and avoid an additional query to the filesystem.
......
'''
Example (it will use publicly available, but slow-as-hell Tahoe-LAFS cloud):
from fs.tahoefs import TahoeFS, Connection
dircap = TahoeFS.createdircap(webapi='http://pubgrid.tahoe-lafs.org')
print "Your dircap (unique key to your storage directory) is", dircap
print "Keep it safe!"
fs = TahoeFS(dircap, autorun=False, timeout=300, webapi='http://pubgrid.tahoe-lafs.org')
f = fs.open("foo.txt", "a")
f.write('bar!')
f.close()
print "Now visit %s and enjoy :-)" % fs.getpathurl('foo.txt')
When any problem occurred, you can turn on internal debugging messages:
import logging
l = logging.getLogger()
l.setLevel(logging.DEBUG)
l.addHandler(logging.StreamHandler(sys.stdout))
... your Python code using TahoeFS ...
TODO:
x unicode support
x try network errors / bad happiness
x colon hack (all occurences of ':' in filenames transparently convert to __colon__)
x logging / cleaning sources from print()
x exceptions
x tests
x create ticket and send initial code
x sanitize all path types (., /)
x rewrite listdir, add listdirinfo
x support for extra large file uploads (poster module)
x Possibility to block write until upload done (Tahoe mailing list)
x Report something sane when Tahoe crashed/unavailable
x solve failed unit tests (makedir_winner, ...)
filetimes
docs & author
python3 support
python 2.3 support
remove creating blank files (depends on FileUploadManager)
TODO (Not TahoeFS specific tasks):
x DebugFS
x RemoteFileBuffer on the fly buffering support
x RemoteFileBuffer unit tests
x RemoteFileBuffer submit to trunk
colon hack -> move outside tahoe, should be in windows-specific FS (maybe in Dokan?)
autorun hack -> move outside tahoe, -||-
Implement FileUploadManager + faking isfile/exists of just processing file
pyfilesystem docs is outdated (rename, movedir, ...)
'''
import logging
from logging import DEBUG, INFO, ERROR, CRITICAL
import fs.errors as errors
from fs.path import abspath, relpath, normpath, dirname, pathjoin
from fs import FS, NullFile, _thread_synchronize_default, SEEK_END
from fs.remote import CacheFS, _cached_method, RemoteFileBuffer
from fs.base import fnmatch
from util import TahoeUtil
from connection import Connection
#from .debugfs import DebugFS
logger = logging.getLogger('fs.tahoefs')
def _fix_path(func):
def wrapper(self, *args, **kwds):
if len(args):
args = list(args)
args[0] = abspath(normpath(args[0]))
return func(self, *args, **kwds)
return wrapper
class TahoeFS(CacheFS):
def __init__(self, dircap, timeout=60, autorun=True, largefilesize=10*1024*1024, webapi='http://127.0.0.1:3456'):
'''
Creates instance of TahoeFS.
dircap - special hash allowing user to work with TahoeLAFS directory.
timeout - how long should underlying CacheFS keep information about files
before asking TahoeLAFS node again.
autorun - Allow listing autorun files? Can be very dangerous on Windows!.
This is temporary hack, as it should be part of Windows-specific middleware,
not Tahoe itself.
largefilesize - Create placeholder file for files larger than this tresholf.
Uploading and processing of large files can last extremely long (many hours),
so placing this placeholder can help you to remember that upload is processing.
Setting this to None will skip creating placeholder files for any uploads.
'''
fs = _TahoeFS(dircap, autorun=autorun, largefilesize=largefilesize, webapi=webapi)
super(TahoeFS, self).__init__(fs, timeout)
def __str__(self):
return "<TahoeFS: %s>" % self.dircap
@classmethod
def createdircap(cls, webapi='http://127.0.0.1:3456'):
return TahoeUtil(webapi).createdircap()
@_fix_path
def open(self, path, mode='r', **kwargs):
self.wrapped_fs._log(INFO, 'Opening file %s in mode %s' % (path, mode))
newfile = False
if not self.exists(path):
if 'w' in mode or 'a' in mode:
newfile = True
else:
self.wrapped_fs._log(DEBUG, "File %s not found while opening for reads" % path)
raise errors.ResourceNotFoundError(path)
elif self.isdir(path):
self.wrapped_fs._log(DEBUG, "Path %s is directory, not a file" % path)
raise errors.ResourceInvalidError(path)
if 'w' in mode:
newfile = True
if newfile:
self.wrapped_fs._log(DEBUG, 'Creating empty file %s' % path)
if self.wrapped_fs.readonly:
raise errors.UnsupportedError('read only filesystem')
self.setcontents(path, '')
handler = NullFile()
else:
self.wrapped_fs._log(DEBUG, 'Opening existing file %s for reading' % path)
handler = self.wrapped_fs._get_file_handler(path)
return RemoteFileBuffer(self, path, mode, handler,
write_on_flush=False)
@_fix_path
def desc(self, path):
try:
return self.getinfo(path)
except:
return ''
@_fix_path
def exists(self, path):
try:
self.getinfo(path)
self.wrapped_fs._log(DEBUG, "Path %s exists" % path)
return True
except errors.ResourceNotFoundError:
self.wrapped_fs._log(DEBUG, "Path %s does not exists" % path)
return False
except errors.ResourceInvalidError:
self.wrapped_fs._log(DEBUG, "Path %s does not exists, probably misspelled URI" % path)
return False
@_fix_path
def getsize(self, path):
try:
size = self.getinfo(path)['size']
self.wrapped_fs._log(DEBUG, "Size of %s is %d" % (path, size))
return size
except errors.ResourceNotFoundError:
return 0
@_fix_path
def isfile(self, path):
try:
isfile = (self.getinfo(path)['type'] == 'filenode')
except errors.ResourceNotFoundError:
#isfile = not path.endswith('/')
isfile = False
self.wrapped_fs._log(DEBUG, "Path %s is file: %d" % (path, isfile))
return isfile
@_fix_path
def isdir(self, path):
try:
isdir = (self.getinfo(path)['type'] == 'dirnode')
except errors.ResourceNotFoundError:
isdir = False
self.wrapped_fs._log(DEBUG, "Path %s is directory: %d" % (path, isdir))
return isdir
@_fix_path
@_cached_method
def listdirinfo(self, path="/", wildcard=None, full=False, absolute=False,
dirs_only=False, files_only=False):
self.wrapped_fs._log(DEBUG, "Listing directory (listdirinfo) %s" % path)
_fixpath = self.wrapped_fs._fixpath
_path = _fixpath(path)
if dirs_only and files_only:
raise errors.ValueError("dirs_only and files_only can not both be True")
result = []
for item in self.wrapped_fs.tahoeutil.list(self.dircap, _path):
if dirs_only and item['type'] == 'filenode':
continue
elif files_only and item['type'] == 'dirnode':
continue
if wildcard is not None and \
not fnmatch.fnmatch(item['name'], wildcard):
continue
if full:
item_path = relpath(pathjoin(_path, item['name']))
elif absolute:
item_path = abspath(pathjoin(_path, item['name']))
else:
item_path = item['name']
cache_name = self.wrapped_fs._fixpath(u"%s/%s" % \
(path, item['name']))
self._cache_set(cache_name, 'getinfo', (), {}, (True, item))
result.append((item_path, item))
return result
def listdir(self, *args, **kwargs):
return [ item[0] for item in self.listdirinfo(*args, **kwargs) ]
@_fix_path
def remove(self, path):
self.wrapped_fs._log(INFO, 'Removing file %s' % path)
if self.wrapped_fs.readonly:
raise errors.UnsupportedError('read only filesystem')
if not self.isfile(path):
if not self.isdir(path):
raise errors.ResourceNotFoundError(path)
raise errors.ResourceInvalidError(path)
try:
self.wrapped_fs.tahoeutil.unlink(self.dircap, path)
except Exception, e:
raise errors.ResourceInvalidError(path)
finally:
self._uncache(path, removed=True)
@_fix_path
def removedir(self, path, recursive=False, force=False):
self.wrapped_fs._log(INFO, "Removing directory %s" % path)
if self.wrapped_fs.readonly:
raise errors.UnsupportedError('read only filesystem')
if not self.isdir(path):
if not self.isfile(path):
raise errors.ResourceNotFoundError(path)
raise errors.ResourceInvalidError(path)
if not force and self.listdir(path):
raise errors.DirectoryNotEmptyError(path)
try:
self.wrapped_fs.tahoeutil.unlink(self.dircap, path)
finally:
self._uncache(path, removed=True)
if recursive and path != '/':
try:
self.removedir(dirname(path), recursive=True)
except errors.DirectoryNotEmptyError:
pass
@_fix_path
def makedir(self, path, recursive=False, allow_recreate=False):
self.wrapped_fs._log(INFO, "Creating directory %s" % path)
if self.wrapped_fs.readonly:
raise errors.UnsupportedError('read only filesystem')
if self.exists(path):
if not self.isdir(path):
raise errors.ResourceInvalidError(path)
if not allow_recreate:
raise errors.DestinationExistsError(path)
if not recursive and not self.exists(dirname(path)):
raise errors.ParentDirectoryMissingError(path)
try:
self.wrapped_fs.tahoeutil.mkdir(self.dircap, path)
finally:
self._uncache(path,added=True)
def movedir(self, src, dst, overwrite=False):
self.move(src, dst, overwrite)
def move(self, src, dst, overwrite=False):
# FIXME: overwrite not documented
self.wrapped_fs._log(INFO, "Moving file from %s to %s" % (src, dst))
if self.wrapped_fs.readonly:
raise errors.UnsupportedError('read only filesystem')
src = self.wrapped_fs._fixpath(src)
dst = self.wrapped_fs._fixpath(dst)
if not self.exists(dirname(dst)):
# FIXME: Why raise exception when it is legal construct?
raise errors.ParentDirectoryMissingError(dst)
if not overwrite and self.exists(dst):
raise errors.DestinationExistsError(dst)
try:
self.wrapped_fs.tahoeutil.move(self.dircap, src, dst)
finally:
self._uncache(src,removed=True)
self._uncache(dst,added=True)
@_fix_path
def setcontents(self, path, file):
try:
self.wrapped_fs.setcontents(path, file)
finally:
self._uncache(path, added=True)
def rename(self, src, dst):
self.move(src, dst)
def copy(self, src, dst, overwrite=False, chunk_size=16384):
if self.wrapped_fs.readonly:
raise errors.UnsupportedError('read only filesystem')
# FIXME: Workaround because isfile() not exists on _TahoeFS
FS.copy(self, src, dst, overwrite, chunk_size)
def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
if self.wrapped_fs.readonly:
raise errors.UnsupportedError('read only filesystem')
# FIXME: Workaround because isfile() not exists on _TahoeFS
FS.copydir(self, src, dst, overwrite, ignore_errors, chunk_size)
class _TahoeFS(FS):
def __init__(self, dircap, autorun, largefilesize, webapi):
self.dircap = dircap if not dircap.endswith('/') else dircap[:-1]
self.autorun = autorun
self.largefilesize = largefilesize
self.connection = Connection(webapi)
self.tahoeutil = TahoeUtil(webapi)
self.readonly = dircap.startswith('URI:DIR2-RO')
super(_TahoeFS, self).__init__(thread_synchronize=_thread_synchronize_default)
def _log(self, level, message):
if not logger.isEnabledFor(level): return
logger.log(level, u'(%d) %s' % (id(self),
unicode(message).encode('ASCII', 'replace')))
def _fixpath(self, path):
return abspath(normpath(path))
def _get_file_handler(self, path):
if not self.autorun:
if path.lower().startswith('/autorun.'):
self._log(DEBUG, 'Access to file %s denied' % path)
return NullFile()
return self.getrange(path, 0)
@_fix_path
def getpathurl(self, path, allow_none=False, webapi=None):
'''
Retrieve URL where the file/directory is stored
'''
if webapi == None:
webapi = self.connection.webapi
self._log(DEBUG, "Retrieving URL for %s over %s" % (path, webapi))
path = self.tahoeutil.fixwinpath(path, False)
return u"%s/uri/%s%s" % (webapi, self.dircap, path)
@_fix_path
def getrange(self, path, offset, length=None):
path = self.tahoeutil.fixwinpath(path, False)
return self.connection.get(u'/uri/%s%s' % (self.dircap, path),
offset=offset, length=length)
@_fix_path
def setcontents(self, path, file):
self._log(INFO, 'Uploading file %s' % path)
path = self.tahoeutil.fixwinpath(path, False)
size=None
if self.readonly:
raise errors.UnsupportedError('read only filesystem')
# Workaround for large files:
# First create zero file placeholder, then
# upload final content.
if self.largefilesize != None and getattr(file, 'read', None):
# As 'file' can be also a string, need to check,
# if 'file' looks like duck. Sorry, file.
file.seek(0, SEEK_END)
size = file.tell()
file.seek(0)
if size > self.largefilesize:
self.connection.put(u'/uri/%s%s' % (self.dircap, path),
"PyFilesystem.TahoeFS: Upload started, final size %d" % size)
self.connection.put(u'/uri/%s%s' % (self.dircap, path), file, size=size)
@_fix_path
def getinfo(self, path):
self._log(INFO, 'Reading meta for %s' % path)
info = self.tahoeutil.info(self.dircap, path)
#import datetime
#info['created_time'] = datetime.datetime.now()
#info['modified_time'] = datetime.datetime.now()
#info['accessed_time'] = datetime.datetime.now()
return info
import platform
import logging
import fs.errors as errors
from fs import SEEK_END
python3 = int(platform.python_version_tuple()[0]) > 2
if python3:
from urllib.parse import urlencode, pathname2url, quote
from urllib.request import Request, urlopen
else:
from urllib import urlencode, pathname2url
from urllib2 import Request, urlopen, quote
class PutRequest(Request):
def __init__(self, *args, **kwargs):
self.get_method = lambda: u'PUT'
Request.__init__(self, *args, **kwargs)
class DeleteRequest(Request):
def __init__(self, *args, **kwargs):
self.get_method = lambda: u'DELETE'
Request.__init__(self, *args, **kwargs)
class Connection:
def __init__(self, webapi):
self.webapi = webapi
self.headers = {'Accept': 'text/plain'}
def _get_headers(self, f, size=None):
'''
Retrieve length of string or file object and prepare HTTP headers.
'''
if isinstance(f, basestring):
# Just set up content length
size = len(f)
elif getattr(f, 'read', None):
if size == None:
# When size is already known, skip this
f.seek(0, SEEK_END)
size = f.tell()
f.seek(0)
else:
raise errors.UnsupportedError("Cannot handle type %s" % type(f))
headers = {'Content-Length': size}
headers.update(self.headers)
return headers
def _urlencode(self, data):
_data = {}
for k, v in data.items():
_data[k.encode('utf-8')] = v.encode('utf-8')
return urlencode(_data)
def _quotepath(self, path, params={}):
q = quote(path.encode('utf-8'), safe='/')
if params:
return u"%s?%s" % (q, self._urlencode(params))
return q
def _urlopen(self, req):
try:
#print req.get_full_url()
return urlopen(req)
except Exception, e:
if not getattr(e, 'getcode', None):
raise errors.RemoteConnectionError(str(e))
code = e.getcode()
if code == 500:
# Probably out of space or unhappiness error
raise errors.StorageSpaceError(e.fp.read())
elif code in (400, 404, 410):
# Standard not found
raise errors.ResourceNotFoundError(e.fp.read())
raise errors.ResourceInvalidError(e.fp.read())
def post(self, path, data={}, params={}):
data = self._urlencode(data)
path = self._quotepath(path, params)
req = Request(''.join([self.webapi, path]), data, headers=self.headers)
return self._urlopen(req)
def get(self, path, data={}, offset=None, length=None):
data = self._urlencode(data)
path = self._quotepath(path)
if data:
path = u'?'.join([path, data])
headers = {}
headers.update(self.headers)
if offset:
if length:
headers['Range'] = 'bytes=%d-%d' % \
(int(offset), int(offset+length))
else:
headers['Range'] = 'bytes=%d-' % int(offset)
req = Request(''.join([self.webapi, path]), headers=headers)
return self._urlopen(req)
def put(self, path, data, size=None, params={}):
path = self._quotepath(path, params)
headers = self._get_headers(data, size=size)
req = PutRequest(''.join([self.webapi, path]), data, headers=headers)
return self._urlopen(req)
def delete(self, path, data={}):
path = self._quotepath(path)
req = DeleteRequest(''.join([self.webapi, path]), data, headers=self.headers)
return self._urlopen(req)
#!/usr/bin/python
"""
Test the TahoeFS
@author: Marek Palatinus <marek@palatinus.cz>
"""
import sys
import logging
import unittest
from fs.base import FS
import fs.errors as errors
from fs.tests import FSTestCases, ThreadingTestCases
from fs.contrib.tahoefs import TahoeFS, Connection
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('fs.tahoefs').addHandler(logging.StreamHandler(sys.stdout))
WEBAPI = 'http://pubgrid.tahoe-lafs.org'
class TestTahoeFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
def setUp(self):
self.dircap = TahoeFS.createdircap(WEBAPI)
self.fs = TahoeFS(self.dircap, timeout=0, webapi=WEBAPI)
def tearDown(self):
self.fs.close()
def test_dircap(self):
# Is dircap in correct format?
self.assert_(self.dircap.startswith('URI:DIR2:') and len(self.dircap) > 50)
def test_concurrent_copydir(self):
# makedir() on TahoeFS is currently not atomic
pass
def test_makedir_winner(self):
# makedir() on TahoeFS is currently not atomic
pass
def test_big_file(self):
pass
if __name__ == '__main__':
unittest.main()
'''
Created on 25.9.2010
@author: marekp
'''
import sys
import platform
import stat as statinfo
import fs.errors as errors
from fs.path import pathsplit
try:
# For non-CPython or older CPython versions.
# Simplejson also comes with C speedup module which
# is not in standard CPython >=2.6 library.
import simplejson as json
except ImportError:
import json
from .connection import Connection
python3 = int(platform.python_version_tuple()[0]) > 2
if python3:
from urllib.error import HTTPError
else:
from urllib2 import HTTPError
class TahoeUtil:
def __init__(self, webapi):
self.connection = Connection(webapi)
def createdircap(self):
return self.connection.post(u'/uri', params={u't': u'mkdir'}).read()
def unlink(self, dircap, path=None):
path = self.fixwinpath(path, False)
self.connection.delete(u'/uri/%s%s' % (dircap, path))
def info(self, dircap, path):
path = self.fixwinpath(path, False)
meta = json.load(self.connection.get(u'/uri/%s%s' % (dircap, path), {u't': u'json'}))
return self._info(path, meta)
def fixwinpath(self, path, direction=True):
'''
No, Tahoe really does not support file streams...
This is ugly hack, because it is not Tahoe-specific.
Should be move to middleware if will be any.
'''
if platform.system() != 'Windows':
return path
if direction and ':' in path:
path = path.replace(':', '__colon__')
elif not direction and '__colon__' in path:
path = path.replace('__colon__', ':')
return path
def _info(self, path, data):
if isinstance(data, list):
type = data[0]
data = data[1]
elif isinstance(data, dict):
type = data['type']
else:
raise errors.ResourceInvalidError('Metadata in unknown format!')
if type == 'unknown':
raise errors.ResourceNotFoundError(path)
info = {'name': unicode(self.fixwinpath(path, True)),
'type': type,
'size': data.get('size', 0),
'ctime': None,
'uri': data.get('rw_uri', data.get('ro_uri'))}
if 'metadata' in data:
info['ctime'] = data['metadata'].get('ctime')
if info['type'] == 'dirnode':
info['st_mode'] = 0777 | statinfo.S_IFDIR
else:
info['st_mode'] = 0644
return info
def list(self, dircap, path=None):
path = self.fixwinpath(path, False)
data = json.load(self.connection.get(u'/uri/%s%s' % (dircap, path), {u't': u'json'}))
if len(data) < 2 or data[0] != 'dirnode':
raise errors.ResourceInvalidError('Metadata in unknown format!')
data = data[1]['children']
for i in data.keys():
x = self._info(i, data[i])
yield x
def mkdir(self, dircap, path):
path = self.fixwinpath(path, False)
path = pathsplit(path)
self.connection.post(u"/uri/%s%s" % (dircap, path[0]), data={u't': u'mkdir', u'name': path[1]})
def move(self, dircap, src, dst):
if src == '/' or dst == '/':
raise errors.UnsupportedError("Too dangerous operation, aborting")
src = self.fixwinpath(src, False)
dst = self.fixwinpath(dst, False)
src_tuple = pathsplit(src)
dst_tuple = pathsplit(dst)
if src_tuple[0] == dst_tuple[0]:
# Move inside one directory
self.connection.post(u"/uri/%s%s" % (dircap, src_tuple[0]), data={u't': u'rename',
u'from_name': src_tuple[1], u'to_name': dst_tuple[1]})
return
# Move to different directory. Firstly create link on dst, then remove from src
try:
self.info(dircap, dst)
except errors.ResourceNotFoundError:
pass
else:
self.unlink(dircap, dst)
uri = self.info(dircap, src)['uri']
self.connection.put(u"/uri/%s%s" % (dircap, dst), data=uri, params={u't': u'uri'})
if uri != self.info(dircap, dst)['uri']:
raise errors.OperationFailedError('Move failed')
self.unlink(dircap, src)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment