Commit 9fe3ecd4 by rfkelly0

RemoteFileBuffer: implement on-demand loading of remote data.

Closes Issue #33, thanks to Marek Palatinus for the implementation.
parent 814dcd50
......@@ -21,10 +21,10 @@ FS subclasses interfacing with a remote filesystem. These include:
"""
import os
import sys
import time
import copy
from StringIO import StringIO
from errno import EINVAL
from fs.base import FS, threading
from fs.wrapfs import WrapFS, wrap_fs_methods
......@@ -32,6 +32,7 @@ from fs.wrapfs.lazyfs import LazyFS
from fs.path import *
from fs.errors import *
from fs.local_functools import wraps
from fs import SEEK_SET, SEEK_CUR, SEEK_END
try:
from tempfile import SpooledTemporaryFile
......@@ -40,7 +41,6 @@ except ImportError:
class SpooledTemporaryFile(NamedTemporaryFile):
def __init__(self,max_size=0,*args,**kwds):
NamedTemporaryFile.__init__(self,*args,**kwds)
class RemoteFileBuffer(object):
......@@ -64,14 +64,13 @@ class RemoteFileBuffer(object):
def setcontents(self,path,file):
self._put_remote_file(path,file)
The current implementation reads the entire contents of the file into
the buffer before returning. Future implementations may pull data into
the buffer on demand.
The contents of the remote file are read into the buffer on-demand.
"""
max_size_in_memory = 1024 * 8
def __init__(self,fs,path,mode,rfile=None):
def __init__(self,fs,path,mode,rfile=None,
write_on_flush=True):
"""RemoteFileBuffer constructor.
The owning filesystem, path and mode must be provided. If the
......@@ -83,23 +82,36 @@ class RemoteFileBuffer(object):
self.path = path
self.mode = mode
self.closed = False
self._flushed = False
self.write_on_flush = write_on_flush
self._changed = False
self._readlen = 0 # How many bytes already loaded from rfile
self._rfile = None # Reference to remote file object
self._eof = False # Reached end of rfile?
if getattr(fs,"_lock",None) is not None:
self._lock = fs._lock.__class__()
else:
self._lock = threading.RLock()
if "r" in mode or "+" in mode or "a" in mode:
if rfile is not None:
if hasattr(rfile,"read"):
data = rfile.read(1024*256)
while data:
self.file.write(data)
data = rfile.read(1024*256)
else:
self.file.write(str(rfile))
if "a" not in mode:
self.file.seek(0)
if "r" in mode or "+" in mode or "a" in mode:
if rfile is None:
# File was just created, force to write anything
self._changed = True
self._eof = True
if not hasattr(rfile, "read"):
rfile = StringIO(unicode(rfile))
self._rfile = rfile
# FIXME: What if mode with position on eof?
if "a" in mode:
# Not good enough...
self.seek(0, SEEK_END)
else:
# Do not use remote file object
self._eof = True
self._rfile = None
def __del__(self):
# Don't try to close a partially-constructed file
if "_lock" in self.__dict__:
......@@ -107,7 +119,7 @@ class RemoteFileBuffer(object):
self.close()
def __getattr__(self,name):
if name in ("file","_lock","fs","path","mode","closed","_flushed"):
if name in ("file","_lock","fs","path","mode","closed"):
raise AttributeError(name)
file = self.__dict__['file']
a = getattr(file, name)
......@@ -118,7 +130,14 @@ class RemoteFileBuffer(object):
self._lock.acquire()
try:
if "write" in name:
self._flushed = False
self._changed = True
# Do we need to discard into from the buffer?
toread = len(args[0]) - (self._readlen - self.file.tell())
if toread > 0:
if not self._eof:
self._fillbuffer(toread)
else:
self._readlen += toread
return a(*args,**kwds)
finally:
self._lock.release()
......@@ -133,23 +152,109 @@ class RemoteFileBuffer(object):
self.close()
return False
def __iter__(self):
return iter(self.file)
def __iter__(self):
# TODO: implement this with on-demand loading.
self._fillbuffer()
return self.file.__iter__()
def _read(self, length=None):
"""Read data from the remote file into the local buffer."""
chunklen = 1024 * 256
bytes_read = 0
while True:
toread = chunklen
if length is not None and length - bytes_read < chunklen:
toread = length - bytes_read
if not toread:
break
data = self._rfile.read(toread)
datalen = len(data)
if not datalen:
self._eof = True
break
bytes_read += datalen
self.file.write(data)
if datalen < toread:
# We reached EOF,
# no more reads needed
self._eof = True
break
self._readlen += bytes_read
def _fillbuffer(self, length=None):
"""Fill the local buffer, leaving file position unchanged.
def seek(self,offset,whence=os.SEEK_SET):
This method is used for on-demand loading of data from the remote file
into the buffer. It reads 'length' bytes from rfile and writes them
into the buffer, seeking back to the original file position.
"""
curpos = self.file.tell()
if length == None:
if not self._eof:
# Read all data and we didn't reached EOF
# Merge endpos - tell + bytes from rfile
self.file.seek(0, SEEK_END)
self._read()
self._eof = True
self.file.seek(curpos)
elif not self._eof:
if curpos + length > self._readlen:
# Load endpos - tell() + len bytes from rfile
toload = length - (self._readlen - curpos)
self.file.seek(0, SEEK_END)
self._read(toload)
self.file.seek(curpos)
def read(self, length=None):
if length is None:
self._fillbuffer()
return self.file.read()
else:
toread = self.file.tell() + length - self._readlen
if toread > 0:
self._fillbuffer(toread)
return self.file.read(length)
def seek(self,offset,whence=SEEK_SET):
if isinstance(self.file,SpooledTemporaryFile):
# SpooledTemporaryFile.seek doesn't roll to disk if seeking
# beyond the max in-memory size.
if whence == os.SEEK_SET:
if whence == SEEK_SET:
if offset > self.file._max_size:
self.file.rollover()
elif whence == os.SEEK_CUR:
elif whence == SEEK_CUR:
if offset + self.file.tell() > self.file._max_size:
self.file.rollover()
else:
if offset > 0:
self.file.rollover()
self.file.seek(offset,whence)
if not self._eof:
# Count absolute position of seeking
if whence == SEEK_SET:
abspos = offset
elif whence == SEEK_CUR:
abspos = offset + self.file.tell()
elif whence == SEEK_END:
abspos = None
else:
raise IOError(EINVAL, 'Invalid whence')
if abspos != None:
toread = abspos - self._readlen
if toread > 0:
self.file.seek(self._readlen)
self._fillbuffer(toread)
else:
self.file.seek(self._readlen)
self._fillbuffer()
self.file.seek(offset, whence)
def truncate(self,size=None):
self._lock.acquire()
......@@ -162,6 +267,19 @@ class RemoteFileBuffer(object):
self.file._file.truncate(size)
else:
self.file.truncate(size)
self._changed = True
if not self._eof and self._readlen < size:
# Read the rest of file
self._fillbuffer(size - self._readlen)
# Lock rfile
self._eof = True
elif self._readlen >= size:
# Crop rfile metadata
self._readlen = size if size != None else 0
# Lock rfile
self._eof = True
self.flush()
finally:
self._lock.release()
......@@ -170,27 +288,33 @@ class RemoteFileBuffer(object):
self._lock.acquire()
try:
self.file.flush()
if "w" in self.mode or "a" in self.mode or "+" in self.mode:
if not self._flushed:
pos = self.file.tell()
self.file.seek(0)
self.fs.setcontents(self.path,self.file)
self.file.seek(pos)
self._flushed = True
if self.write_on_flush:
self._setcontents()
finally:
self._lock.release()
def _setcontents(self):
if not self._changed:
# Nothing changed, no need to write data back
return
# If not all data loaded, load until eof
if not self._eof:
self._fillbuffer()
if "w" in self.mode or "a" in self.mode or "+" in self.mode:
pos = self.file.tell()
self.file.seek(0)
self.fs.setcontents(self.path, self.file)
self.file.seek(pos)
def close(self):
self._lock.acquire()
try:
if not self.closed:
self.closed = True
if "w" in self.mode or "a" in self.mode or "+" in self.mode:
if not self._flushed:
self.file.seek(0)
self.file.seek(0)
self.fs.setcontents(self.path,self.file)
self._setcontents()
self.file.close()
self.closed = True
finally:
self._lock.release()
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
fs.tests.test_remote: testcases for FS remote support utilities
......@@ -14,12 +16,182 @@ import sys
from fs.remote import *
from fs import SEEK_END
from fs.wrapfs import WrapFS, wrap_fs_methods
from fs.tempfs import TempFS
from fs.path import *
from fs.local_functools import wraps
class RemoteTempFS(TempFS):
'''
Simple filesystem implementing setfilecontents
for RemoteFileBuffer tests
'''
def open(self, path, mode='rb', write_on_flush=True):
if 'a' in mode or 'r' in mode or '+' in mode:
f = super(RemoteTempFS, self).open(path, 'rb')
else:
f = None
return RemoteFileBuffer(self, path, mode, f,
write_on_flush=write_on_flush)
def setcontents(self, path, content):
f = super(RemoteTempFS, self).open(path, 'wb')
if getattr(content, 'read', False):
f.write(content.read())
else:
f.write(content)
f.close()
class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
class FakeException(Exception): pass
def setUp(self):
self.fs = RemoteTempFS()
self.original_setcontents = self.fs.setcontents
def tearDown(self):
self.fs.close()
def fake_setcontents(self, path, content):
''' Fake replacement for RemoteTempFS setcontents() '''
raise self.FakeException("setcontents should not be called here!")
def fakeOn(self):
'''
Turn on fake_setcontents(). When setcontents on RemoteTempFS
is called, FakeException is raised and nothing is stored.
'''
self.original_setcontents = self.fs.setcontents
self.fs.setcontents = self.fake_setcontents
def fakeOff(self):
''' Switch off fake_setcontents(). '''
self.fs.setcontents = self.original_setcontents
def test_ondemand(self):
'''
Tests on-demand loading of remote content in RemoteFileBuffer
'''
contents = "Tristatricettri stribrnych strikacek strikalo" + \
"pres tristatricettri stribrnych strech."
f = self.fs.open('test.txt', 'wb')
f.write(contents)
f.close()
# During following tests, no setcontents() should be called.
self.fakeOn()
f = self.fs.open('test.txt', 'rb')
self.assertEquals(f.read(10), contents[:10])
f.file.seek(0, SEEK_END)
self.assertEquals(f._rfile.tell(), 10)
f.seek(20)
self.assertEquals(f.tell(), 20)
self.assertEquals(f._rfile.tell(), 20)
f.seek(0, SEEK_END)
self.assertEquals(f._rfile.tell(), len(contents))
f.close()
f = self.fs.open('test.txt', 'ab')
self.assertEquals(f.tell(), len(contents))
f.close()
self.fakeOff()
# Writing over the rfile edge
f = self.fs.open('test.txt', 'wb+')
self.assertEquals(f.tell(), 0)
f.seek(len(contents) - 5)
# Last 5 characters not loaded from remote file
self.assertEquals(f._rfile.tell(), len(contents) - 5)
# Confirm that last 5 characters are still in rfile buffer
self.assertEquals(f._rfile.read(), contents[-5:])
# Rollback position 5 characters before eof
f._rfile.seek(len(contents[:-5]))
# Write 10 new characters (will make contents longer for 5 chars)
f.write(u'1234567890')
# We are on the end of file (and buffer not serve anything anymore)
self.assertEquals(f.read(), '')
f.close()
self.fakeOn()
# Check if we wrote everything OK from
# previous writing over the remote buffer edge
f = self.fs.open('test.txt', 'rb')
self.assertEquals(f.read(), contents[:-5] + u'1234567890')
f.close()
def test_writeonflush(self):
'''
Test 'write_on_flush' switch of RemoteFileBuffer.
When True, flush() should call setcontents and store
to remote destination.
When False, setcontents should be called only on close().
'''
self.fakeOn()
f = self.fs.open('test.txt', 'wb', write_on_flush=True)
f.write('Sample text')
self.assertRaises(self.FakeException, f.flush)
f.write('Second sample text')
self.assertRaises(self.FakeException, f.close)
f = self.fs.open('test.txt', 'wb', write_on_flush=False)
f.write('Sample text')
# FakeException is not raised, because setcontents is not called
f.flush()
f.write('Second sample text')
self.assertRaises(self.FakeException, f.close)
def test_flush_and_continue(self):
'''
This tests if partially loaded remote buffer can be flushed
back to remote destination and opened file is still
in good condition.
'''
contents = "Zlutoucky kun upel dabelske ody."
contents2 = 'Ententyky dva spaliky cert vyletel z elektriky'
f = self.fs.open('test.txt', 'wb')
f.write(contents)
f.close()
f = self.fs.open('test.txt', 'rb+')
# Check if we read just 10 characters
self.assertEquals(f.read(10), contents[:10])
self.assertEquals(f._rfile.tell(), 10)
# Write garbage to file to mark it as _changed
f.write('x')
# This should read the rest of file and store file back to again.
f.flush()
f.seek(0)
# Try if we have unocrrupted file locally...
self.assertEquals(f.read(), contents[:10] + 'x' + contents[11:])
f.close()
# And if we have uncorrupted file also on storage
f = self.fs.open('test.txt', 'rb')
self.assertEquals(f.read(), contents[:10] + 'x' + contents[11:])
f.close()
# Now try it again, but write garbage behind edge of remote file
f = self.fs.open('test.txt', 'rb+')
self.assertEquals(f.read(10), contents[:10])
# Write garbage to file to mark it as _changed
f.write(contents2)
f.flush()
f.seek(0)
# Try if we have unocrrupted file locally...
self.assertEquals(f.read(), contents[:10] + contents2)
f.close()
# And if we have uncorrupted file also on storage
f = self.fs.open('test.txt', 'rb')
self.assertEquals(f.read(), contents[:10] + contents2)
f.close()
class TestCacheFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
"""Test simple operation of CacheFS"""
......@@ -129,4 +301,5 @@ class TestConnectionManagerFS_disconnect(TestConnectionManagerFS):
self.fs.close()
sys.setcheckinterval(self._check_interval)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment