Commit 30fc0f27 by rfkelly0

Add testcases for file.truncate(), and fix some failing FS implementations

parent c5e46ddc
......@@ -312,7 +312,7 @@ class FSOperations(Operations):
self.fs.open(path,"w").close()
else:
if fh is None:
f = self.fs.open(path,"w+")
f = self.fs.open(path,"r+")
if not hasattr(f,"truncate"):
raise UnsupportedError("truncate")
f.truncate(length)
......
......@@ -152,8 +152,16 @@ class SFTPServerInterface(paramiko.SFTPServerInterface):
def canonicalize(self, path):
return abspath(normpath(path))
@report_sftp_errors
def chattr(self, path, attr):
return paramiko.SFTP_OP_UNSUPPORTED
# f.truncate() is implemented by setting the size attr.
# Any other attr requests fail out.
if attr._flags:
if attr._flags != attr.FLAG_SIZE:
raise UnsupportedError
with self.fs.open(path,"r+") as f:
f.truncate(attr.st_size)
return paramiko.SFTP_OK
def readlink(self, path):
return paramiko.SFTP_OP_UNSUPPORTED
......
......@@ -142,6 +142,14 @@ class SFTPFS(FS):
# paramiko implements its own buffering and write-back logic,
# so we don't need to use a RemoteFileBuffer here.
f = self.client.open(npath,mode,bufsize)
# Unfortunately it has a broken truncate() method.
# TODO: implement this as a wrapper
old_truncate = f.truncate
def new_truncate(size=None):
if size is None:
size = f.tell()
return old_truncate(size)
f.truncate = new_truncate
return f
@convert_os_errors
......
......@@ -598,6 +598,25 @@ class FSTestCases(object):
f7.close()
self.assertEqual(self.fs.getcontents("a.txt"), all_strings)
def test_truncate(self):
def checkcontents(path, check_contents):
read_contents = self.fs.getcontents(path)
self.assertEqual(read_contents,check_contents)
return read_contents == check_contents
self.fs.setcontents("hello","world")
checkcontents("hello","world")
self.fs.setcontents("hello","hi")
checkcontents("hello","hi")
self.fs.setcontents("hello","1234567890")
checkcontents("hello","1234567890")
with self.fs.open("hello","r+") as f:
f.truncate(7)
checkcontents("hello","1234567")
with self.fs.open("hello","r+") as f:
f.seek(5)
f.truncate()
checkcontents("hello","12345")
def test_with_statement(self):
# This is a little tricky since 'with' is actually new syntax.
# We use eval() to make this method safe for old python versions.
......
......@@ -61,7 +61,7 @@ class LimitSizeFS(WrapFS):
self._file_sizes[path] = 0
return LimitSizeFile(self,path,f,mode,size)
def _ensure_file_size(self, path, size):
def _ensure_file_size(self, path, size, shrink=False):
path = relpath(normpath(path))
with self._size_lock:
if path not in self._file_sizes:
......@@ -73,6 +73,9 @@ class LimitSizeFS(WrapFS):
raise StorageSpaceError("write")
self.cur_size += diff
self._file_sizes[path] = size
elif diff < 0 and shrink:
self.cur_size += diff
self._file_sizes[path] = size
def copy(self, src, dst, **kwds):
FS.copy(self,src,dst,**kwds)
......@@ -141,7 +144,7 @@ class LimitSizeFile(object):
pos = self.file.tell()
if size is None:
size = pos
self.fs._ensure_file_size(self.path,size)
self.fs._ensure_file_size(self.path,size,shrink=True)
self.file.truncate(size)
self.size = size
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment