Commit 0a8a21cf by willmcgugan

Some fixes for Python2.5 compatibility

parent a534d7d3
......@@ -12,7 +12,15 @@ import fs
from fs.base import *
from fs.path import pathsplit
from ftplib import FTP, _GLOBAL_DEFAULT_TIMEOUT, error_perm, error_temp, error_proto, error_reply
from ftplib import FTP, error_perm, error_temp, error_proto, error_reply
try:
from ftplib import _GLOBAL_DEFAULT_TIMEOUT
_FTPLIB_TIMEOUT = True
except ImportError:
_GLOBAL_DEFAULT_TIMEOUT = None
_FTPLIB_TIMEOUT = False
import threading
from time import sleep
import datetime
......@@ -882,7 +890,10 @@ class FTPFS(FS):
def _open_ftp(self):
try:
ftp = FTP()
ftp.connect(self.host, self.port, self.timeout)
if _FTPLIB_TIMEOUT:
ftp.connect(self.host, self.port, self.timeout)
else:
ftp.connect(self.host, self.port)
ftp.login(self.user, self.passwd, self.acct)
except socket_error, e:
raise RemoteConnectionError(str(e), details=e)
......
......@@ -69,7 +69,7 @@ class MemoryFile(object):
__repr__ = __str__
def __unicode__(self):
return unicode(self.__str__())
return u"<MemoryFile in %s %s>" % (self.memory_fs, self.path)
def __del__(self):
if not self.closed:
......
......@@ -572,7 +572,7 @@ class ThreadingTestCases:
__lock = threading.RLock()
def _yield(self):
time.sleep(0.01)
time.sleep(0.001)
def _lock(self):
self.__lock.acquire()
......@@ -589,14 +589,19 @@ class ThreadingTestCases:
return threading.Thread(target=runThread)
def _runThreads(self,*funcs):
errors = []
threads = [self._makeThread(f,errors) for f in funcs]
for t in threads:
t.start()
for t in threads:
t.join()
for (c,e,t) in errors:
raise c,e,t
check_interval = sys.getcheckinterval()
sys.setcheckinterval(1)
try:
errors = []
threads = [self._makeThread(f,errors) for f in funcs]
for t in threads:
t.start()
for t in threads:
t.join()
for (c,e,t) in errors:
raise c,e,t
finally:
sys.setcheckinterval(check_interval)
def test_setcontents(self):
def setcontents(name,contents):
......
......@@ -20,7 +20,6 @@ from fs import osfs
class TestOSFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
def setUp(self):
sys.setcheckinterval(1)
self.temp_dir = tempfile.mkdtemp(u"fstest")
self.fs = osfs.OSFS(self.temp_dir)
......@@ -33,8 +32,7 @@ class TestOSFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
class TestSubFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
def setUp(self):
sys.setcheckinterval(1)
def setUp(self):
self.temp_dir = tempfile.mkdtemp(u"fstest")
self.parent_fs = osfs.OSFS(self.temp_dir)
self.parent_fs.makedir("foo/bar", recursive=True)
......@@ -53,7 +51,6 @@ from fs import memoryfs
class TestMemoryFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
def setUp(self):
sys.setcheckinterval(1)
self.fs = memoryfs.MemoryFS()
......@@ -61,15 +58,11 @@ from fs import mountfs
class TestMountFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
def setUp(self):
sys.setcheckinterval(1)
self.mount_fs = mountfs.MountFS()
self.mem_fs = memoryfs.MemoryFS()
self.mount_fs.mountdir("mounted/memfs", self.mem_fs)
self.fs = self.mount_fs.opendir("mounted/memfs")
def tearDown(self):
pass
def check(self, p):
return self.mount_fs.exists(os.path.join("mounted/memfs", relpath(p)))
......
......@@ -28,18 +28,15 @@ class TestFTPFS(unittest.TestCase, FSTestCases, ThreadingTestCases):
#ftp_port += 1
use_port = str(ftp_port)
#ftp_port = 10000
sys.setcheckinterval(1)
self.temp_dir = tempfile.mkdtemp(u"ftpfstests")
self.ftp_server = subprocess.Popen(['python', abspath(__file__), self.temp_dir, str(use_port)])
self.ftp_server = subprocess.Popen([sys.executable, abspath(__file__), self.temp_dir, str(use_port)])
# Need to sleep to allow ftp server to start
time.sleep(.2)
self.fs = ftpfs.FTPFS('127.0.0.1', 'user', '12345', port=use_port, timeout=5.0)
def tearDown(self):
if sys.platform == 'win32':
import win32api
win32api.TerminateProcess(int(process._handle), -1)
......
......@@ -22,22 +22,26 @@ class TestCacheFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
"""Test simple operation of CacheFS"""
def setUp(self):
self._check_interval = sys.getcheckinterval()
sys.setcheckinterval(10)
self.fs = CacheFS(TempFS())
def tearDown(self):
self.fs.close()
sys.setcheckinterval(self._check_interval)
class TestConnectionManagerFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
"""Test simple operation of ConnectionManagerFS"""
def setUp(self):
self._check_interval = sys.getcheckinterval()
sys.setcheckinterval(10)
self.fs = ConnectionManagerFS(TempFS())
def tearDown(self):
self.fs.close()
sys.setcheckinterval(self._check_interval)
class DisconnectingFS(WrapFS):
......@@ -108,11 +112,13 @@ class TestConnectionManagerFS_disconnect(TestConnectionManagerFS):
"""Test ConnectionManagerFS's ability to wait for reconnection."""
def setUp(self):
self._check_interval = sys.getcheckinterval()
sys.setcheckinterval(10)
c_fs = ConnectionManagerFS(DisconnectingFS,poll_interval=0.1)
self.fs = DisconnectRecoveryFS(c_fs)
def tearDown(self):
self.fs.close()
sys.setcheckinterval(self._check_interval)
......@@ -8,7 +8,8 @@ This module provides the class LimitSizeFS, an FS wrapper that can limit the
total size of files stored in the wrapped FS.
"""
# for Python2.5 compatibility
from __future__ import with_statement
from fs.errors import *
from fs.base import FS, threading, synchronize
from fs.wrapfs import WrapFS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment