Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
pyfs
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
pyfs
Commits
ea0d2e43
Commit
ea0d2e43
authored
Dec 27, 2011
by
willmcgugan
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Python3 changes
parent
3b900b1c
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
55 additions
and
34 deletions
+55
-34
ChangeLog
+2
-0
fs/expose/fuse/__init__.py
+24
-12
fs/filelike.py
+3
-3
fs/ftpfs.py
+5
-5
fs/s3fs.py
+2
-0
fs/sftpfs.py
+2
-0
fs/tests/__init__.py
+11
-12
fs/tests/test_zipfs.py
+5
-2
tox.ini
+1
-0
No files found.
ChangeLog
View file @
ea0d2e43
...
...
@@ -81,3 +81,5 @@
* Added `appdirfs` module to abstract per-user application directories
0.5:
* Ported to Python 3.X
fs/expose/fuse/__init__.py
View file @
ea0d2e43
...
...
@@ -67,12 +67,14 @@ from fs.path import *
from
fs.local_functools
import
wraps
from
six
import
PY3
from
six
import
b
try
:
if
PY3
:
import
fuse3
as
fuse
else
:
import
fuse
#if PY3:
# import fuse3 as fuse
#else:
# import fuse
import
fuse_ctypes
as
fuse
except
NotImplementedError
:
raise
ImportError
(
"FUSE found but not usable"
)
try
:
...
...
@@ -530,8 +532,12 @@ class MountProcess(subprocess.Popen):
def
__init__
(
self
,
fs
,
path
,
fuse_opts
=
{},
nowait
=
False
,
**
kwds
):
self
.
path
=
path
if
nowait
or
kwds
.
get
(
"close_fds"
,
False
):
cmd
=
'import cPickle; '
cmd
=
cmd
+
'data = cPickle.loads(
%
s); '
if
PY3
:
cmd
=
"from pickle import loads;"
else
:
cmd
=
"from cPickle import loads;"
#cmd = 'import cPickle; '
cmd
=
cmd
+
'data = loads(
%
s); '
cmd
=
cmd
+
'from fs.expose.fuse import MountProcess; '
cmd
=
cmd
+
'MountProcess._do_mount_nowait(data)'
cmd
=
cmd
%
(
repr
(
cPickle
.
dumps
((
fs
,
path
,
fuse_opts
),
-
1
)),)
...
...
@@ -539,15 +545,21 @@ class MountProcess(subprocess.Popen):
super
(
MountProcess
,
self
)
.
__init__
(
cmd
,
**
kwds
)
else
:
(
r
,
w
)
=
os
.
pipe
()
cmd
=
'import cPickle; '
cmd
=
cmd
+
'data = cPickle.loads(
%
s); '
if
PY3
:
cmd
=
"from pickle import loads;"
else
:
cmd
=
"from cPickle import loads;"
#cmd = 'import cPickle; '
cmd
=
cmd
+
'data = loads(
%
s); '
cmd
=
cmd
+
'from fs.expose.fuse import MountProcess; '
cmd
=
cmd
+
'MountProcess._do_mount_wait(data)'
cmd
=
cmd
%
(
repr
(
cPickle
.
dumps
((
fs
,
path
,
fuse_opts
,
r
,
w
),
-
1
)),)
cmd
=
[
sys
.
executable
,
"-c"
,
cmd
]
super
(
MountProcess
,
self
)
.
__init__
(
cmd
,
**
kwds
)
os
.
close
(
w
)
if
os
.
read
(
r
,
1
)
!=
"S"
:
byte
=
os
.
read
(
r
,
1
)
if
byte
!=
b
(
"S"
):
self
.
terminate
()
raise
RuntimeError
(
"FUSE error: "
+
os
.
read
(
r
,
20
)
.
decode
(
NATIVE_ENCODING
))
...
...
@@ -595,7 +607,7 @@ class MountProcess(subprocess.Popen):
successful
=
[]
def
ready_callback
():
successful
.
append
(
True
)
os
.
write
(
w
,
"S"
)
os
.
write
(
w
,
b
(
"S"
)
)
os
.
close
(
w
)
opts
[
"ready_callback"
]
=
ready_callback
def
unmount_callback
():
...
...
@@ -604,11 +616,11 @@ class MountProcess(subprocess.Popen):
try
:
mount
(
fs
,
path
,
**
opts
)
except
Exception
,
e
:
os
.
write
(
w
,
"E"
+
str
(
e
))
os
.
write
(
w
,
b
(
"E"
)
+
b
(
e
))
os
.
close
(
w
)
else
:
if
not
successful
:
os
.
write
(
w
,
"E"
)
os
.
write
(
w
,
b
(
"E"
)
)
os
.
close
(
w
)
...
...
fs/filelike.py
View file @
ea0d2e43
...
...
@@ -49,7 +49,7 @@ import six
from
six
import
PY3
,
b
if
PY3
:
_StringIO
=
six
.
Bytes
IO
from
six
import
BytesIO
as
_String
IO
else
:
try
:
from
cStringIO
import
StringIO
as
_StringIO
...
...
@@ -721,8 +721,8 @@ class SpooledTemporaryFile(FileWrapper):
try
:
stf_args
=
(
max_size
,
mode
,
bufsize
)
+
args
wrapped_file
=
_tempfile
.
SpooledTemporaryFile
(
*
stf_args
,
**
kwds
)
#
wrapped_file._file = StringIO()
wrapped_file
.
_file
=
six
.
BytesIO
()
wrapped_file
.
_file
=
StringIO
()
#
wrapped_file._file = six.BytesIO()
self
.
__is_spooled
=
True
except
AttributeError
:
ntf_args
=
(
mode
,
bufsize
)
+
args
...
...
fs/ftpfs.py
View file @
ea0d2e43
...
...
@@ -30,7 +30,7 @@ from socket import error as socket_error
from
fs.local_functools
import
wraps
import
six
from
six
import
PY3
from
six
import
PY3
,
b
if
PY3
:
from
six
import
BytesIO
as
StringIO
...
...
@@ -654,7 +654,7 @@ class _FTPFile(object):
@fileftperrors
def
read
(
self
,
size
=
None
):
if
self
.
conn
is
None
:
return
''
return
b
(
''
)
chunks
=
[]
if
size
is
None
or
size
<
0
:
...
...
@@ -682,7 +682,7 @@ class _FTPFile(object):
self
.
read_pos
+=
len
(
data
)
remaining_bytes
-=
len
(
data
)
return
''
.
join
(
chunks
)
return
b
(
''
)
.
join
(
chunks
)
@fileftperrors
def
write
(
self
,
data
):
...
...
@@ -812,11 +812,11 @@ class _FTPFile(object):
This isn't terribly efficient. It would probably be better to do
a read followed by splitlines.
"""
endings
=
'
\r\n
'
endings
=
b
(
'
\r\n
'
)
chars
=
[]
append
=
chars
.
append
read
=
self
.
read
join
=
''
.
join
join
=
b
(
''
)
.
join
while
True
:
char
=
read
(
1
)
if
not
char
:
...
...
fs/s3fs.py
View file @
ea0d2e43
...
...
@@ -2,6 +2,8 @@
fs.s3fs
=======
**Currently only avaiable on Python2 due to boto not being available for Python3**
FS subclass accessing files in Amazon S3
This module provides the class 'S3FS', which implements the FS filesystem
...
...
fs/sftpfs.py
View file @
ea0d2e43
...
...
@@ -2,6 +2,8 @@
fs.sftpfs
=========
**Currently only avaiable on Python2 due to paramiko not being available for Python3**
Filesystem accessing an SFTP server (via paramiko)
"""
...
...
fs/tests/__init__.py
View file @
ea0d2e43
...
...
@@ -721,19 +721,18 @@ class FSTestCases(object):
checkcontents
(
"hello"
,
b
(
"hi"
))
self
.
fs
.
setcontents
(
"hello"
,
b
(
"1234567890"
))
checkcontents
(
"hello"
,
b
(
"1234567890"
))
with
self
.
fs
.
open
(
"hello"
,
"r+"
)
as
f
:
with
self
.
fs
.
open
(
"hello"
,
"r
b
+"
)
as
f
:
f
.
truncate
(
7
)
checkcontents
(
"hello"
,
b
(
"1234567"
))
with
self
.
fs
.
open
(
"hello"
,
"r+"
)
as
f
:
with
self
.
fs
.
open
(
"hello"
,
"r
b
+"
)
as
f
:
f
.
seek
(
5
)
f
.
truncate
()
checkcontents
(
"hello"
,
b
(
"12345"
))
def
test_truncate_to_larger_size
(
self
):
print
repr
(
self
.
fs
)
print
self
.
fs
.
__class__
with
self
.
fs
.
open
(
"hello"
,
"wb"
)
as
f
:
f
.
truncate
(
30
)
self
.
assertEquals
(
self
.
fs
.
getsize
(
"hello"
),
30
)
# Some file systems (FTPFS) don't support both reading and writing
...
...
@@ -792,9 +791,9 @@ class FSTestCases(object):
def
test_big_file
(
self
):
"""Test handling of a big file (1MB)"""
chunk_size
=
1024
*
256
num_chunks
=
4
def
chunk_stream
():
chunk_size
=
1024
*
256
num_chunks
=
4
def
chunk_stream
():
"""Generate predictable-but-randomy binary content."""
r
=
random
.
Random
(
0
)
randint
=
r
.
randint
...
...
@@ -842,7 +841,7 @@ def chunk_stream():
self
.
assertTrue
(
cmp_datetimes
(
d2
,
info
[
'modified_time'
]))
#
Disabled - see below
#
May be disabled - see end of file
class
ThreadingTestCases
(
object
):
"""Testcases for thread-safety of FS implementations."""
...
...
@@ -911,7 +910,7 @@ class ThreadingTestCases(object):
def
test_setcontents_threaded_samefile
(
self
):
def
setcontents
(
name
,
contents
):
f
=
self
.
fs
.
open
(
name
,
"w"
)
f
=
self
.
fs
.
open
(
name
,
"w
b
"
)
self
.
_yield
()
try
:
f
.
write
(
contents
)
...
...
@@ -1057,6 +1056,6 @@ class ThreadingTestCases(object):
self
.
assertEquals
(
self
.
fs
.
getcontents
(
"thread2.txt"
,
'rb'
),
c
)
self
.
_runThreads
(
thread1
,
thread2
)
class
ThreadingTestCases
(
object
):
_dont_retest
=
()
# Uncomment to temporarily disable threading tests
#
class ThreadingTestCases(object):
#
_dont_retest = ()
fs/tests/test_zipfs.py
View file @
ea0d2e43
...
...
@@ -15,7 +15,7 @@ import fs.tests
from
fs.path
import
*
from
fs
import
zipfs
from
six
import
b
from
six
import
PY3
,
b
class
TestReadZipFS
(
unittest
.
TestCase
):
...
...
@@ -122,9 +122,12 @@ class TestWriteZipFS(unittest.TestCase):
def
test_creation
(
self
):
zf
=
zipfile
.
ZipFile
(
self
.
temp_filename
,
"r"
)
def
check_contents
(
filename
,
contents
):
if
PY3
:
zcontents
=
zf
.
read
(
filename
)
else
:
zcontents
=
zf
.
read
(
filename
.
encode
(
"CP437"
))
self
.
assertEqual
(
contents
,
zcontents
)
check_contents
(
"a.txt"
,
b
(
)
"Hello, World!"
)
check_contents
(
"a.txt"
,
b
(
"Hello, World!"
)
)
check_contents
(
"b.txt"
,
b
(
"b"
))
check_contents
(
"foo/bar/baz.txt"
,
b
(
"baz"
))
check_contents
(
"foo/second.txt"
,
b
(
"hai"
))
...
...
tox.ini
View file @
ea0d2e43
...
...
@@ -31,4 +31,5 @@ deps = distribute
six
dexml
nose
winpdb
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment