Commit 87674ab0 by rfkelly0

fix concurrency bug in fs.expose.fuse

parent 0c7a2b8c
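
The change gives every registered file handle its own lock: _reg_file() now stores (file, lock) tuples in self._fhmap, and flush(), read(), release(), truncate() and write() do their seek/read/write/close work while holding that handle's lock, so two threads servicing FUSE callbacks on the same handle can no longer interleave one thread's seek() with another's read() or write(). A minimal, self-contained sketch of the same pattern (HandleTable, register and read are illustrative names, not the module's API, and it uses with-statements where the commit uses explicit acquire/release):

import io
import threading

class HandleTable(object):
    """Sketch of a handle table that pairs each open file with its own lock."""

    def __init__(self):
        self._fhmap = {}                     # fh -> (file_obj, per-handle lock)
        self._fhmap_lock = threading.Lock()  # guards the map and the counter
        self._fhmap_next = 1

    def register(self, f):
        # Allocate the next integer handle under the map lock; every handle
        # gets a private lock that later serializes operations on its file.
        with self._fhmap_lock:
            fh = self._fhmap_next
            self._fhmap_next += 1
            self._fhmap[fh] = (f, threading.Lock())
            return fh

    def read(self, fh, size, offset):
        # seek() and read() must not interleave with another thread that is
        # repositioning the same file, so both run under the handle's lock.
        f, lock = self._fhmap[fh]
        with lock:
            f.seek(offset)
            return f.read(size)

# Example: positioned reads through the table stay consistent even if several
# threads share the handle, because each seek()+read() pair is atomic.
table = HandleTable()
fh = table.register(io.BytesIO(b"hello world"))
assert table.read(fh, 5, 6) == b"world"
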
@@ -138,8 +138,8 @@ class FSOperations(Operations):
         self._on_init = on_init
         self._on_destroy = on_destroy
         self._fhmap = {}
-        self._fh_lock = threading.Lock()
-        self._fh_next = 1
+        self._fhmap_lock = threading.Lock()
+        self._fhmap_next = 1
 
     def _get_file(self,fh):
         try:
@@ -148,14 +148,14 @@ class FSOperations(Operations):
             raise FSError("invalid file handle")
 
     def _reg_file(self,f):
-        self._fh_lock.acquire()
+        self._fhmap_lock.acquire()
         try:
-            fh = self._fh_next
-            self._fh_next += 1
-            self._fhmap[fh] = f
+            fh = self._fhmap_next
+            self._fhmap_next += 1
+            self._fhmap[fh] = (f,threading.Lock())
             return fh
         finally:
-            self._fh_lock.release()
+            self._fhmap_lock.release()
 
     def init(self,conn):
         if self._on_init:
@@ -181,7 +181,12 @@ class FSOperations(Operations):
 
     @handle_fs_errors
     def flush(self,path,fh):
-        self._get_file(fh).flush()
+        (file,lock) = self._get_file(fh)
+        lock.acquire()
+        try:
+            file.flush()
+        finally:
+            lock.release()
 
     @handle_fs_errors
     def getattr(self,path,fh=None):
@@ -227,9 +232,13 @@ class FSOperations(Operations):
 
     @handle_fs_errors
     def read(self,path,size,offset,fh):
-        f = self._get_file(fh)
-        f.seek(offset)
-        return f.read(size)
+        (file,lock) = self._get_file(fh)
+        lock.acquire()
+        try:
+            file.seek(offset)
+            return file.read(size)
+        finally:
+            lock.release()
 
     @handle_fs_errors
     def readdir(self,path,fh=None):
@@ -253,8 +262,13 @@ class FSOperations(Operations):
 
     @handle_fs_errors
     def release(self,path,fh):
-        self._get_file(fh).close()
-        del self._fhmap[fh]
+        (file,lock) = self._get_file(fh)
+        lock.acquire()
+        try:
+            file.close()
+            del self._fhmap[fh]
+        finally:
+            lock.release()
 
     @handle_fs_errors
     def removexattr(self,path,name):
@@ -299,11 +313,18 @@ class FSOperations(Operations):
         else:
             if fh is None:
                 f = self.fs.open(path,"w+")
-            else:
-                f = self._get_file(fh)
-            if not hasattr(f,"truncate"):
-                raise UnsupportedError("truncate")
-            f.truncate(length)
+                if not hasattr(f,"truncate"):
+                    raise UnsupportedError("truncate")
+                f.truncate(length)
+            else:
+                (file,lock) = self._get_file(fh)
+                lock.acquire()
+                try:
+                    if not hasattr(file,"truncate"):
+                        raise UnsupportedError("truncate")
+                    file.truncate(length)
+                finally:
+                    lock.release()
 
     @handle_fs_errors
     def unlink(self, path):
@@ -315,10 +336,14 @@ class FSOperations(Operations):
 
     @handle_fs_errors
     def write(self, path, data, offset, fh):
-        f = self._get_file(fh)
-        f.seek(offset)
-        f.write(data)
-        return len(data)
+        (file,lock) = self._get_file(fh)
+        lock.acquire()
+        try:
+            file.seek(offset)
+            file.write(data)
+            return len(data)
+        finally:
+            lock.release()
 
 
 def mount(fs,path,foreground=False,ready_callback=None,unmount_callback=None,**kwds):
...
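
Each acquire()/try/finally block above is the long-hand form of a with-statement on the same lock; a standalone sketch of the write path in that style (locked_write is an illustrative helper, and the (file, lock) pair mirrors what _reg_file() stores):

import io
import threading

def locked_write(file_and_lock, data, offset):
    """Write data at offset while holding the handle's private lock."""
    f, lock = file_and_lock
    with lock:                      # equivalent to lock.acquire()/try/finally
        f.seek(offset)
        f.write(data)
        return len(data)

# The tuple has the same shape as the values stored in self._fhmap.
pair = (io.BytesIO(b"xxxxxxxxxx"), threading.Lock())
assert locked_write(pair, b"hello", 2) == 5
assert pair[0].getvalue() == b"xxhelloxxx"
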
@@ -16,6 +16,7 @@ from fs.base import *
 import unittest
 import os, os.path
 import pickle
+import random
 import time
 
 try:
@@ -461,6 +462,33 @@ class FSTestCases:
         fs3 = pickle.loads(pickle.dumps(self.fs,-1))
         self.assert_(fs3.isfile("test1"))
 
+    def test_big_file(self):
+        chunk_size = 1024 * 256
+        num_chunks = 4
+        def chunk_stream():
+            """Generate predictable-but-randomy binary content."""
+            r = random.Random(0)
+            for i in xrange(num_chunks):
+                yield "".join(chr(r.randint(0,255)) for j in xrange(chunk_size))
+        for i in xrange(5):
+            f = self.fs.open("bigfile","wb")
+            try:
+                for chunk in chunk_stream():
+                    f.write(chunk)
+            finally:
+                f.close()
+            chunks = chunk_stream()
+            f = self.fs.open("bigfile","rb")
+            try:
+                try:
+                    while True:
+                        if chunks.next() != f.read(chunk_size):
+                            assert False, "bigfile was corrupted"
+                except StopIteration:
+                    if f.read() != "":
+                        assert False, "bigfile was corrupted"
+            finally:
+                f.close()
 
 
 class ThreadingTestCases:
...
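
The new test_big_file test never keeps the expected contents in memory: because chunk_stream() seeds random.Random(0), the write pass and the verify pass regenerate identical byte sequences on demand. A Python 3 flavoured sketch of the same idea (bytes instead of str, range instead of xrange, smaller sizes; the helper name is illustrative):

import random

def chunk_stream(num_chunks=4, chunk_size=1024 * 256):
    """Yield a deterministic pseudo-random byte stream, one chunk at a time."""
    r = random.Random(0)   # fixed seed, so every call yields the same bytes
    for _ in range(num_chunks):
        yield bytes(r.randint(0, 255) for _ in range(chunk_size))

# Two independent passes produce identical data, which is what lets the test
# compare what it reads back against a freshly recreated stream.
first = b"".join(chunk_stream(num_chunks=2, chunk_size=1024))
second = b"".join(chunk_stream(num_chunks=2, chunk_size=1024))
assert first == second
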