Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
pyfs
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
pyfs
Commits
0f20660a
Commit
0f20660a
authored
Nov 20, 2009
by
rfkelly0
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
insist on unicode paths throughout
parent
8a99f8d5
Show whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
218 additions
and
37 deletions
+218
-37
ChangeLog
+3
-0
fs/base.py
+4
-3
fs/expose/sftp.py
+24
-4
fs/expose/xmlrpc.py
+53
-5
fs/memoryfs.py
+3
-0
fs/osfs.py
+9
-2
fs/rpcfs.py
+52
-5
fs/sftpfs.py
+9
-1
fs/tests/__init__.py
+43
-10
fs/tests/test_xattr.py
+3
-0
fs/tests/test_zipfs.py
+6
-1
fs/xattrs.py
+1
-0
fs/zipfs.py
+8
-6
No files found.
ChangeLog
View file @
0f20660a
...
...
@@ -13,6 +13,9 @@
* expose.sftp: expose an FS object SFTP
* expose.django_storage: convert FS object to Django Storage object
* Extended attribute support (getxattr/setxattr/delxattr/listxattrs)
* Insist on unicode paths throughout:
* output paths are always unicode
* bytestring input paths are decoded as early as possible
* Renamed "fs.helpers" to "fs.path", and renamed to contained functions
to match those offered by os.path
* fs.remote: utilities for implementing FS classes that interface
...
...
fs/base.py
View file @
0f20660a
...
...
@@ -195,7 +195,8 @@ class FS(object):
"""Returns the system path (a path recognised by the OS) if present.
If the path does not map to a system path (and allow_none is False)
then a NoSysPathError exception is thrown.
then a NoSysPathError exception is thrown. Otherwise, the system
path will be returned as a unicode string.
path -- A path within the filesystem
allow_none -- If True, this method should return None if there is no
...
...
@@ -267,8 +268,8 @@ class FS(object):
dirs_only -- If True, only return directories
files_only -- If True, only return files
The directory contents are returned as a list of
paths. If the
given path is not found then ResourceNotFoundError is raised;
The directory contents are returned as a list of
unicode paths. If
then
given path is not found then ResourceNotFoundError is raised;
if it exists but is not a directory, ResourceInvalidError is raised.
"""
raise
UnsupportedError
(
"list directory"
)
...
...
fs/expose/sftp.py
View file @
0f20660a
...
...
@@ -77,8 +77,11 @@ class SFTPServerInterface(paramiko.SFTPServerInterface):
paramiko server infrastructure.
"""
def
__init__
(
self
,
server
,
fs
,
*
args
,
**
kwds
):
def
__init__
(
self
,
server
,
fs
,
encoding
=
None
,
*
args
,
**
kwds
):
self
.
fs
=
fs
if
encoding
is
None
:
encoding
=
"utf8"
self
.
encoding
=
encoding
super
(
SFTPServerInterface
,
self
)
.
__init__
(
server
,
*
args
,
**
kwds
)
@report_sftp_errors
...
...
@@ -87,6 +90,8 @@ class SFTPServerInterface(paramiko.SFTPServerInterface):
@report_sftp_errors
def
list_folder
(
self
,
path
):
if
not
isinstance
(
path
,
unicode
):
path
=
path
.
decode
(
self
.
encoding
)
stats
=
[]
for
entry
in
self
.
fs
.
listdir
(
path
,
absolute
=
True
):
stats
.
append
(
self
.
stat
(
entry
))
...
...
@@ -94,9 +99,11 @@ class SFTPServerInterface(paramiko.SFTPServerInterface):
@report_sftp_errors
def
stat
(
self
,
path
):
if
not
isinstance
(
path
,
unicode
):
path
=
path
.
decode
(
self
.
encoding
)
info
=
self
.
fs
.
getinfo
(
path
)
stat
=
paramiko
.
SFTPAttributes
()
stat
.
filename
=
basename
(
path
)
stat
.
filename
=
basename
(
path
)
.
encode
(
self
.
encoding
)
stat
.
st_size
=
info
.
get
(
"size"
)
stat
.
st_atime
=
time
.
mktime
(
info
.
get
(
"accessed_time"
)
.
timetuple
())
stat
.
st_mtime
=
time
.
mktime
(
info
.
get
(
"modified_time"
)
.
timetuple
())
...
...
@@ -111,11 +118,17 @@ class SFTPServerInterface(paramiko.SFTPServerInterface):
@report_sftp_errors
def
remove
(
self
,
path
):
if
not
isinstance
(
path
,
unicode
):
path
=
path
.
decode
(
self
.
encoding
)
self
.
fs
.
remove
(
path
)
return
paramiko
.
SFTP_OK
@report_sftp_errors
def
rename
(
self
,
oldpath
,
newpath
):
if
not
isinstance
(
oldpath
,
unicode
):
oldpath
=
oldpath
.
decode
(
self
.
encoding
)
if
not
isinstance
(
newpath
,
unicode
):
newpath
=
newpath
.
decode
(
self
.
encoding
)
if
self
.
fs
.
isfile
(
oldpath
):
self
.
fs
.
move
(
oldpath
,
newpath
)
else
:
...
...
@@ -124,11 +137,15 @@ class SFTPServerInterface(paramiko.SFTPServerInterface):
@report_sftp_errors
def
mkdir
(
self
,
path
,
attr
):
if
not
isinstance
(
path
,
unicode
):
path
=
path
.
decode
(
self
.
encoding
)
self
.
fs
.
makedir
(
path
)
return
paramiko
.
SFTP_OK
@report_sftp_errors
def
rmdir
(
self
,
path
):
if
not
isinstance
(
path
,
unicode
):
path
=
path
.
decode
(
self
.
encoding
)
self
.
fs
.
removedir
(
path
)
return
paramiko
.
SFTP_OK
...
...
@@ -156,6 +173,8 @@ class SFTPHandle(paramiko.SFTPHandle):
super
(
SFTPHandle
,
self
)
.
__init__
(
flags
)
mode
=
flags_to_mode
(
flags
)
+
"b"
self
.
owner
=
owner
if
not
isinstance
(
path
,
unicode
):
path
=
path
.
decode
(
self
.
owner
.
encoding
)
self
.
path
=
path
self
.
_file
=
owner
.
fs
.
open
(
path
,
mode
)
...
...
@@ -194,7 +213,7 @@ class SFTPRequestHandler(sockserv.StreamRequestHandler):
def
handle
(
self
):
t
=
paramiko
.
Transport
(
self
.
request
)
t
.
add_server_key
(
self
.
server
.
host_key
)
t
.
set_subsystem_handler
(
"sftp"
,
paramiko
.
SFTPServer
,
SFTPServerInterface
,
self
.
server
.
fs
)
t
.
set_subsystem_handler
(
"sftp"
,
paramiko
.
SFTPServer
,
SFTPServerInterface
,
self
.
server
.
fs
,
getattr
(
self
.
server
,
"encoding"
,
None
)
)
# Note that this actually spawns a new thread to handle the requests.
# (Actually, paramiko.Transport is a subclass of Thread)
t
.
start_server
(
server
=
self
.
server
)
...
...
@@ -228,8 +247,9 @@ class BaseSFTPServer(sockserv.TCPServer,paramiko.ServerInterface):
"""
def
__init__
(
self
,
address
,
fs
=
None
,
host_key
=
None
,
RequestHandlerClass
=
None
):
def
__init__
(
self
,
address
,
fs
=
None
,
encoding
=
None
,
host_key
=
None
,
RequestHandlerClass
=
None
):
self
.
fs
=
fs
self
.
encoding
=
encoding
if
host_key
is
None
:
host_key
=
DEFAULT_HOST_KEY
self
.
host_key
=
host_key
...
...
fs/expose/xmlrpc.py
View file @
0f20660a
...
...
@@ -27,59 +27,107 @@ class RPCFSInterface(object):
def
__init__
(
self
,
fs
):
self
.
fs
=
fs
def
encode_path
(
self
,
path
):
"""Encode a filesystem path for sending over the wire.
Unfortunately XMLRPC only supports ASCII strings, so this method
must return something that can be represented in ASCII. The default
is base64-encoded UTF-8.
"""
return
path
.
encode
(
"utf8"
)
.
encode
(
"base64"
)
def
decode_path
(
self
,
path
):
"""Decode paths arriving over the wire."""
return
path
.
decode
(
"base64"
)
.
decode
(
"utf8"
)
def
get_contents
(
self
,
path
):
path
=
self
.
decode_path
(
path
)
data
=
self
.
fs
.
getcontents
(
path
)
return
xmlrpclib
.
Binary
(
data
)
def
set_contents
(
self
,
path
,
data
):
path
=
self
.
decode_path
(
path
)
self
.
fs
.
createfile
(
path
,
data
.
data
)
def
exists
(
self
,
path
):
path
=
self
.
decode_path
(
path
)
return
self
.
fs
.
exists
(
path
)
def
isdir
(
self
,
path
):
path
=
self
.
decode_path
(
path
)
return
self
.
fs
.
isdir
(
path
)
def
isfile
(
self
,
path
):
path
=
self
.
decode_path
(
path
)
return
self
.
fs
.
isfile
(
path
)
def
listdir
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
dirs_only
=
False
,
files_only
=
False
):
return
list
(
self
.
fs
.
listdir
(
path
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
))
path
=
self
.
decode_path
(
path
)
entries
=
self
.
fs
.
listdir
(
path
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
return
[
self
.
encode_path
(
e
)
for
e
in
entries
]
def
makedir
(
self
,
path
,
recursive
=
False
,
allow_recreate
=
False
):
path
=
self
.
decode_path
(
path
)
return
self
.
fs
.
makedir
(
path
,
recursive
,
allow_recreate
)
def
remove
(
self
,
path
):
path
=
self
.
decode_path
(
path
)
return
self
.
fs
.
remove
(
path
)
def
removedir
(
self
,
path
,
recursive
=
False
,
force
=
False
):
path
=
self
.
decode_path
(
path
)
return
self
.
fs
.
removedir
(
path
,
recursive
,
force
)
def
rename
(
self
,
src
,
dst
):
src
=
self
.
decode_path
(
src
)
dst
=
self
.
decode_path
(
dst
)
return
self
.
fs
.
rename
(
src
,
dst
)
def
getinfo
(
self
,
path
):
path
=
self
.
decode_path
(
path
)
return
self
.
fs
.
getinfo
(
path
)
def
desc
(
self
,
path
):
path
=
self
.
decode_path
(
path
)
return
self
.
fs
.
desc
(
path
)
def
getattr
(
self
,
path
,
attr
):
return
self
.
fs
.
getattr
(
path
,
attr
)
def
getxattr
(
self
,
path
,
attr
,
default
=
None
):
path
=
self
.
decode_path
(
path
)
attr
=
self
.
decode_path
(
attr
)
return
self
.
fs
.
getxattr
(
path
,
attr
,
default
)
def
setxattr
(
self
,
path
,
attr
,
value
):
path
=
self
.
decode_path
(
path
)
attr
=
self
.
decode_path
(
attr
)
return
self
.
fs
.
setxattr
(
path
,
attr
,
value
)
def
delxattr
(
self
,
path
,
attr
):
path
=
self
.
decode_path
(
path
)
attr
=
self
.
decode_path
(
attr
)
return
self
.
fs
.
delxattr
(
path
,
attr
)
def
setattr
(
self
,
path
,
attr
,
value
):
return
self
.
fs
.
setattr
(
path
,
attr
,
value
)
def
listxattrs
(
self
,
path
):
path
=
self
.
decode_path
(
path
)
return
[
self
.
encode_path
(
a
)
for
a
in
self
.
fs
.
listxattrs
(
path
)]
def
copy
(
self
,
src
,
dst
,
overwrite
=
False
,
chunk_size
=
16384
):
src
=
self
.
decode_path
(
src
)
dst
=
self
.
decode_path
(
dst
)
return
self
.
fs
.
copy
(
src
,
dst
,
overwrite
,
chunk_size
)
def
move
(
self
,
src
,
dst
,
overwrite
=
False
,
chunk_size
=
16384
):
src
=
self
.
decode_path
(
src
)
dst
=
self
.
decode_path
(
dst
)
return
self
.
fs
.
move
(
src
,
dst
,
overwrite
,
chunk_size
)
def
movedir
(
self
,
src
,
dst
,
overwrite
=
False
,
ignore_errors
=
False
,
chunk_size
=
16384
):
src
=
self
.
decode_path
(
src
)
dst
=
self
.
decode_path
(
dst
)
return
self
.
fs
.
movedir
(
src
,
dst
,
overwrite
,
ignore_errors
,
chunk_size
)
def
copydir
(
self
,
src
,
dst
,
overwrite
=
False
,
ignore_errors
=
False
,
chunk_size
=
16384
):
src
=
self
.
decode_path
(
src
)
dst
=
self
.
decode_path
(
dst
)
return
self
.
fs
.
copydir
(
src
,
dst
,
overwrite
,
ignore_errors
,
chunk_size
)
...
...
fs/memoryfs.py
View file @
0f20660a
...
...
@@ -438,6 +438,9 @@ class MemoryFS(FS):
if
dir_entry
.
isfile
():
raise
ResourceInvalidError
(
path
,
msg
=
"that's a file, not a directory:
%(path)
s"
)
paths
=
dir_entry
.
contents
.
keys
()
for
(
i
,
p
)
in
enumerate
(
paths
):
if
not
isinstance
(
p
,
unicode
):
paths
[
i
]
=
unicode
(
p
)
return
self
.
_listdir_helper
(
path
,
paths
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
@synchronize
...
...
fs/osfs.py
View file @
0f20660a
...
...
@@ -21,8 +21,9 @@ class OSFS(FS):
methods in the os and os.path modules.
"""
def
__init__
(
self
,
root_path
,
dir_mode
=
0700
,
thread_synchronize
=
True
):
def
__init__
(
self
,
root_path
,
dir_mode
=
0700
,
thread_synchronize
=
True
,
encoding
=
None
):
FS
.
__init__
(
self
,
thread_synchronize
=
thread_synchronize
)
self
.
encoding
=
encoding
root_path
=
os
.
path
.
expanduser
(
os
.
path
.
expandvars
(
root_path
))
root_path
=
os
.
path
.
normpath
(
os
.
path
.
abspath
(
root_path
))
# Enable long pathnames on win32
...
...
@@ -41,7 +42,13 @@ class OSFS(FS):
def
getsyspath
(
self
,
path
,
allow_none
=
False
):
path
=
relpath
(
normpath
(
path
))
.
replace
(
"/"
,
os
.
sep
)
return
os
.
path
.
join
(
self
.
root_path
,
path
)
path
=
os
.
path
.
join
(
self
.
root_path
,
path
)
if
not
isinstance
(
path
,
unicode
):
if
self
.
encoding
is
None
:
path
=
path
.
decode
(
sys
.
getfilesystemencoding
())
else
:
path
=
path
.
decode
(
self
.
encoding
)
return
path
@convert_os_errors
def
open
(
self
,
path
,
mode
=
"r"
,
**
kwargs
):
...
...
fs/rpcfs.py
View file @
0f20660a
...
...
@@ -132,8 +132,22 @@ class RPCFS(FS):
self
.
__dict__
[
k
]
=
v
self
.
proxy
=
self
.
_make_proxy
()
def
encode_path
(
self
,
path
):
"""Encode a filesystem path for sending over the wire.
Unfortunately XMLRPC only supports ASCII strings, so this method
must return something that can be represented in ASCII. The default
is base64-encoded UTF8.
"""
return
path
.
encode
(
"utf8"
)
.
encode
(
"base64"
)
def
decode_path
(
self
,
path
):
"""Decode paths arriving over the wire."""
return
path
.
decode
(
"base64"
)
.
decode
(
"utf8"
)
def
open
(
self
,
path
,
mode
=
"r"
):
# TODO: chunked transport of large files
path
=
self
.
encode_path
(
path
)
if
"w"
in
mode
:
self
.
proxy
.
set_contents
(
path
,
xmlrpclib
.
Binary
(
""
))
if
"r"
in
mode
or
"a"
in
mode
or
"+"
in
mode
:
...
...
@@ -165,51 +179,84 @@ class RPCFS(FS):
return
f
def
exists
(
self
,
path
):
path
=
self
.
encode_path
(
path
)
return
self
.
proxy
.
exists
(
path
)
def
isdir
(
self
,
path
):
path
=
self
.
encode_path
(
path
)
return
self
.
proxy
.
isdir
(
path
)
def
isfile
(
self
,
path
):
path
=
self
.
encode_path
(
path
)
return
self
.
proxy
.
isfile
(
path
)
def
listdir
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
dirs_only
=
False
,
files_only
=
False
):
return
self
.
proxy
.
listdir
(
path
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
path
=
self
.
encode_path
(
path
)
entries
=
self
.
proxy
.
listdir
(
path
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
return
[
self
.
decode_path
(
e
)
for
e
in
entries
]
def
makedir
(
self
,
path
,
recursive
=
False
,
allow_recreate
=
False
):
path
=
self
.
encode_path
(
path
)
return
self
.
proxy
.
makedir
(
path
,
recursive
,
allow_recreate
)
def
remove
(
self
,
path
):
path
=
self
.
encode_path
(
path
)
return
self
.
proxy
.
remove
(
path
)
def
removedir
(
self
,
path
,
recursive
=
False
,
force
=
False
):
path
=
self
.
encode_path
(
path
)
return
self
.
proxy
.
removedir
(
path
,
recursive
,
force
)
def
rename
(
self
,
src
,
dst
):
src
=
self
.
encode_path
(
src
)
dst
=
self
.
encode_path
(
dst
)
return
self
.
proxy
.
rename
(
src
,
dst
)
def
getinfo
(
self
,
path
):
path
=
self
.
encode_path
(
path
)
return
self
.
proxy
.
getinfo
(
path
)
def
desc
(
self
,
path
):
path
=
self
.
encode_path
(
path
)
return
self
.
proxy
.
desc
(
path
)
def
getattr
(
self
,
path
,
attr
):
return
self
.
proxy
.
getattr
(
path
,
attr
)
def
getxattr
(
self
,
path
,
attr
,
default
=
None
):
path
=
self
.
encode_path
(
path
)
attr
=
self
.
encode_path
(
attr
)
return
self
.
fs
.
getxattr
(
path
,
attr
,
default
)
def
setxattr
(
self
,
path
,
attr
,
value
):
path
=
self
.
encode_path
(
path
)
attr
=
self
.
encode_path
(
attr
)
return
self
.
fs
.
setxattr
(
path
,
attr
,
value
)
def
delxattr
(
self
,
path
,
attr
):
path
=
self
.
encode_path
(
path
)
attr
=
self
.
encode_path
(
attr
)
return
self
.
fs
.
delxattr
(
path
,
attr
)
def
setattr
(
self
,
path
,
attr
,
value
):
return
self
.
proxy
.
setattr
(
path
,
attr
,
value
)
def
listxattrs
(
self
,
path
):
path
=
self
.
encode_path
(
path
)
return
[
self
.
decode_path
(
a
)
for
a
in
self
.
fs
.
listxattrs
(
path
)]
def
copy
(
self
,
src
,
dst
,
overwrite
=
False
,
chunk_size
=
16384
):
src
=
self
.
encode_path
(
src
)
dst
=
self
.
encode_path
(
dst
)
return
self
.
proxy
.
copy
(
src
,
dst
,
overwrite
,
chunk_size
)
def
move
(
self
,
src
,
dst
,
overwrite
=
False
,
chunk_size
=
16384
):
src
=
self
.
encode_path
(
src
)
dst
=
self
.
encode_path
(
dst
)
return
self
.
proxy
.
move
(
src
,
dst
,
overwrite
,
chunk_size
)
def
movedir
(
self
,
src
,
dst
,
overwrite
=
False
,
ignore_errors
=
False
,
chunk_size
=
16384
):
src
=
self
.
encode_path
(
src
)
dst
=
self
.
encode_path
(
dst
)
return
self
.
proxy
.
movedir
(
src
,
dst
,
overwrite
,
ignore_errors
,
chunk_size
)
def
copydir
(
self
,
src
,
dst
,
overwrite
=
False
,
ignore_errors
=
False
,
chunk_size
=
16384
):
src
=
self
.
encode_path
(
src
)
dst
=
self
.
encode_path
(
dst
)
return
self
.
proxy
.
copydir
(
src
,
dst
,
overwrite
,
ignore_errors
,
chunk_size
)
fs/sftpfs.py
View file @
0f20660a
...
...
@@ -40,7 +40,7 @@ class SFTPFS(FS):
class in the paramiko module.
"""
def
__init__
(
self
,
connection
,
root_path
=
"/"
,
**
credentials
):
def
__init__
(
self
,
connection
,
root_path
=
"/"
,
encoding
=
None
,
**
credentials
):
"""SFTPFS constructor.
The only required argument is 'connection', which must be something
...
...
@@ -57,6 +57,9 @@ class SFTPFS(FS):
other keyword arguments are assumed to be credentials to be used when
connecting the transport.
"""
if
encoding
is
None
:
encoding
=
"utf8"
self
.
encoding
=
encoding
self
.
closed
=
False
self
.
_owns_transport
=
False
self
.
_credentials
=
credentials
...
...
@@ -111,6 +114,8 @@ class SFTPFS(FS):
self
.
_transport
.
close
()
def
_normpath
(
self
,
path
):
if
not
isinstance
(
path
,
unicode
):
path
=
path
.
decode
(
self
.
encoding
)
npath
=
pathjoin
(
self
.
root_path
,
relpath
(
normpath
(
path
)))
if
not
isprefix
(
self
.
root_path
,
npath
):
raise
PathError
(
path
,
msg
=
"Path is outside root:
%(path)
s"
)
...
...
@@ -173,6 +178,9 @@ class SFTPFS(FS):
elif
self
.
isfile
(
path
):
raise
ResourceInvalidError
(
path
,
msg
=
"Can't list directory contents of a file:
%(path)
s"
)
raise
for
(
i
,
p
)
in
enumerate
(
paths
):
if
not
isinstance
(
p
,
unicode
):
paths
[
i
]
=
p
.
decode
(
self
.
encoding
)
return
self
.
_listdir_helper
(
path
,
paths
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
@convert_os_errors
...
...
fs/tests/__init__.py
View file @
0f20660a
...
...
@@ -49,6 +49,17 @@ class FSTestCases:
self
.
fs
.
getinfo
(
""
)
self
.
fs
.
getinfo
(
"/"
)
def
test_getsyspath
(
self
):
try
:
syspath
=
self
.
fs
.
getsyspath
(
"/"
)
except
NoSysPathError
:
pass
else
:
self
.
assertTrue
(
isinstance
(
syspath
,
unicode
))
syspath
=
self
.
fs
.
getsyspath
(
"/"
,
allow_none
=
True
)
if
syspath
is
not
None
:
self
.
assertTrue
(
isinstance
(
syspath
,
unicode
))
def
test_debug
(
self
):
str
(
self
.
fs
)
repr
(
self
.
fs
)
...
...
@@ -87,23 +98,30 @@ class FSTestCases:
self
.
assertFalse
(
self
.
fs
.
exists
(
"a.txt"
))
def
test_listdir
(
self
):
self
.
fs
.
createfile
(
"a"
)
def
check_unicode
(
items
):
for
item
in
items
:
self
.
assertTrue
(
isinstance
(
item
,
unicode
))
self
.
fs
.
createfile
(
u"a"
)
self
.
fs
.
createfile
(
"b"
)
self
.
fs
.
createfile
(
"foo"
)
self
.
fs
.
createfile
(
"bar"
)
# Test listing of the root directory
d1
=
self
.
fs
.
listdir
()
self
.
assertEqual
(
len
(
d1
),
4
)
self
.
assertEqual
(
sorted
(
d1
),
[
"a"
,
"b"
,
"bar"
,
"foo"
])
self
.
assertEqual
(
sorted
(
d1
),
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
check_unicode
(
d1
)
d1
=
self
.
fs
.
listdir
(
""
)
self
.
assertEqual
(
len
(
d1
),
4
)
self
.
assertEqual
(
sorted
(
d1
),
[
"a"
,
"b"
,
"bar"
,
"foo"
])
self
.
assertEqual
(
sorted
(
d1
),
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
check_unicode
(
d1
)
d1
=
self
.
fs
.
listdir
(
"/"
)
self
.
assertEqual
(
len
(
d1
),
4
)
check_unicode
(
d1
)
# Test listing absolute paths
d2
=
self
.
fs
.
listdir
(
absolute
=
True
)
self
.
assertEqual
(
len
(
d2
),
4
)
self
.
assertEqual
(
sorted
(
d2
),
[
"/a"
,
"/b"
,
"/bar"
,
"/foo"
])
self
.
assertEqual
(
sorted
(
d2
),
[
u"/a"
,
u"/b"
,
u"/bar"
,
u"/foo"
])
check_unicode
(
d2
)
# Create some deeper subdirectories, to make sure their
# contents are not inadvertantly included
self
.
fs
.
makedir
(
"p/1/2/3"
,
recursive
=
True
)
...
...
@@ -116,24 +134,39 @@ class FSTestCases:
dirs_only
=
self
.
fs
.
listdir
(
dirs_only
=
True
)
files_only
=
self
.
fs
.
listdir
(
files_only
=
True
)
contains_a
=
self
.
fs
.
listdir
(
wildcard
=
"*a*"
)
self
.
assertEqual
(
sorted
(
dirs_only
),
[
"p"
,
"q"
])
self
.
assertEqual
(
sorted
(
files_only
),
[
"a"
,
"b"
,
"bar"
,
"foo"
])
self
.
assertEqual
(
sorted
(
contains_a
),
[
"a"
,
"bar"
])
self
.
assertEqual
(
sorted
(
dirs_only
),
[
u"p"
,
u"q"
])
self
.
assertEqual
(
sorted
(
files_only
),
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
self
.
assertEqual
(
sorted
(
contains_a
),
[
u"a"
,
u"bar"
])
check_unicode
(
dirs_only
)
check_unicode
(
files_only
)
check_unicode
(
contains_a
)
# Test listing a subdirectory
d3
=
self
.
fs
.
listdir
(
"p/1/2/3"
)
self
.
assertEqual
(
len
(
d3
),
4
)
self
.
assertEqual
(
sorted
(
d3
),
[
"a"
,
"b"
,
"bar"
,
"foo"
])
self
.
assertEqual
(
sorted
(
d3
),
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
check_unicode
(
d3
)
# Test listing a subdirectory with absoliute and full paths
d4
=
self
.
fs
.
listdir
(
"p/1/2/3"
,
absolute
=
True
)
self
.
assertEqual
(
len
(
d4
),
4
)
self
.
assertEqual
(
sorted
(
d4
),
[
"/p/1/2/3/a"
,
"/p/1/2/3/b"
,
"/p/1/2/3/bar"
,
"/p/1/2/3/foo"
])
self
.
assertEqual
(
sorted
(
d4
),
[
u"/p/1/2/3/a"
,
u"/p/1/2/3/b"
,
u"/p/1/2/3/bar"
,
u"/p/1/2/3/foo"
])
check_unicode
(
d4
)
d4
=
self
.
fs
.
listdir
(
"p/1/2/3"
,
full
=
True
)
self
.
assertEqual
(
len
(
d4
),
4
)
self
.
assertEqual
(
sorted
(
d4
),
[
"p/1/2/3/a"
,
"p/1/2/3/b"
,
"p/1/2/3/bar"
,
"p/1/2/3/foo"
])
self
.
assertEqual
(
sorted
(
d4
),
[
u"p/1/2/3/a"
,
u"p/1/2/3/b"
,
u"p/1/2/3/bar"
,
u"p/1/2/3/foo"
])
check_unicode
(
d4
)
# Test that appropriate errors are raised
self
.
assertRaises
(
ResourceNotFoundError
,
self
.
fs
.
listdir
,
"zebra"
)
self
.
assertRaises
(
ResourceInvalidError
,
self
.
fs
.
listdir
,
"foo"
)
def
test_unicode
(
self
):
alpha
=
u"
\N{GREEK SMALL LETTER ALPHA}
"
beta
=
u"
\N{GREEK SMALL LETTER BETA}
"
self
.
fs
.
makedir
(
alpha
)
self
.
fs
.
createfile
(
alpha
+
"/a"
)
self
.
fs
.
createfile
(
alpha
+
"/"
+
beta
)
self
.
check
(
alpha
)
self
.
assertEquals
(
sorted
(
self
.
fs
.
listdir
(
alpha
)),[
"a"
,
beta
])
def
test_makedir
(
self
):
check
=
self
.
check
self
.
fs
.
makedir
(
"a"
)
...
...
fs/tests/test_xattr.py
View file @
0f20660a
...
...
@@ -40,8 +40,11 @@ class XAttrTestCases:
self
.
fs
.
setxattr
(
p
,
"xattr1"
,
"value1"
)
self
.
assertEquals
(
self
.
fs
.
getxattr
(
p
,
"xattr1"
),
"value1"
)
self
.
assertEquals
(
sorted
(
self
.
fs
.
listxattrs
(
p
)),[
"xattr1"
])
self
.
assertTrue
(
isinstance
(
self
.
fs
.
listxattrs
(
p
)[
0
],
unicode
))
self
.
fs
.
setxattr
(
p
,
"attr2"
,
"value2"
)
self
.
assertEquals
(
sorted
(
self
.
fs
.
listxattrs
(
p
)),[
"attr2"
,
"xattr1"
])
self
.
assertTrue
(
isinstance
(
self
.
fs
.
listxattrs
(
p
)[
0
],
unicode
))
self
.
assertTrue
(
isinstance
(
self
.
fs
.
listxattrs
(
p
)[
1
],
unicode
))
self
.
fs
.
delxattr
(
p
,
"xattr1"
)
self
.
assertEquals
(
sorted
(
self
.
fs
.
listxattrs
(
p
)),[
"attr2"
])
self
.
fs
.
delxattr
(
p
,
"attr2"
)
...
...
fs/tests/test_zipfs.py
View file @
0f20660a
...
...
@@ -79,6 +79,8 @@ class TestReadZipFS(unittest.TestCase):
def
check_listing
(
path
,
expected
):
dir_list
=
self
.
fs
.
listdir
(
path
)
self
.
assert_
(
sorted
(
dir_list
)
==
sorted
(
expected
))
for
item
in
dir_list
:
self
.
assert_
(
isinstance
(
item
,
unicode
))
check_listing
(
'/'
,
[
'a.txt'
,
'1.txt'
,
'foo'
,
'b.txt'
])
check_listing
(
'foo'
,
[
'second.txt'
,
'bar'
])
check_listing
(
'foo/bar'
,
[
'baz.txt'
])
...
...
@@ -101,6 +103,7 @@ class TestWriteZipFS(unittest.TestCase):
makefile
(
"a.txt"
,
"Hello, World!"
)
makefile
(
"b.txt"
,
"b"
)
makefile
(
u"
\N{GREEK SMALL LETTER ALPHA}
/
\N{GREEK CAPITAL LETTER OMEGA}
.txt"
,
"this is the alpha and the omega"
)
makefile
(
"foo/bar/baz.txt"
,
"baz"
)
makefile
(
"foo/second.txt"
,
"hai"
)
...
...
@@ -117,12 +120,13 @@ class TestWriteZipFS(unittest.TestCase):
def
test_creation
(
self
):
zf
=
zipfile
.
ZipFile
(
self
.
temp_filename
,
"r"
)
def
check_contents
(
filename
,
contents
):
zcontents
=
zf
.
read
(
filename
)
zcontents
=
zf
.
read
(
filename
.
encode
(
"CP437"
)
)
self
.
assertEqual
(
contents
,
zcontents
)
check_contents
(
"a.txt"
,
"Hello, World!"
)
check_contents
(
"b.txt"
,
"b"
)
check_contents
(
"foo/bar/baz.txt"
,
"baz"
)
check_contents
(
"foo/second.txt"
,
"hai"
)
check_contents
(
u"
\N{GREEK SMALL LETTER ALPHA}
/
\N{GREEK CAPITAL LETTER OMEGA}
.txt"
,
"this is the alpha and the omega"
)
class
TestAppendZipFS
(
TestWriteZipFS
):
...
...
@@ -147,6 +151,7 @@ class TestAppendZipFS(TestWriteZipFS):
zip_fs
=
zipfs
.
ZipFS
(
self
.
temp_filename
,
'a'
)
makefile
(
"foo/bar/baz.txt"
,
"baz"
)
makefile
(
u"
\N{GREEK SMALL LETTER ALPHA}
/
\N{GREEK CAPITAL LETTER OMEGA}
.txt"
,
"this is the alpha and the omega"
)
makefile
(
"foo/second.txt"
,
"hai"
)
zip_fs
.
close
()
...
...
fs/xattrs.py
View file @
0f20660a
...
...
@@ -104,6 +104,7 @@ class SimulateXAttr(WrapFS):
"""Set an extended attribute on the given path."""
if
not
self
.
exists
(
path
):
raise
ResourceNotFoundError
(
path
)
key
=
unicode
(
key
)
attrs
=
self
.
_get_attr_dict
(
path
)
attrs
[
key
]
=
str
(
value
)
self
.
_set_attr_dict
(
path
,
attrs
)
...
...
fs/zipfs.py
View file @
0f20660a
...
...
@@ -49,13 +49,14 @@ class ZipFS(FS):
"""A FileSystem that represents a zip file."""
def
__init__
(
self
,
zip_file
,
mode
=
"r"
,
compression
=
"deflated"
,
allowZip64
=
False
,
thread_synchronize
=
True
):
def
__init__
(
self
,
zip_file
,
mode
=
"r"
,
compression
=
"deflated"
,
allowZip64
=
False
,
encoding
=
"CP437"
,
thread_synchronize
=
True
):
"""Create a FS that maps on to a zip file.
zip_file -- A (system) path, or a file-like object
mode -- Mode to open zip file: 'r' for reading, 'w' for writing or 'a' for appending
compression -- Can be 'deflated' (default) to compress data or 'stored' to just store date
allowZip64 -- Set to True to use zip files greater than 2 MB, default is False
encoding -- The encoding to use for unicode filenames
thread_synchronize -- Set to True (default) to enable thread-safety
"""
...
...
@@ -71,6 +72,7 @@ class ZipFS(FS):
raise
ValueError
(
"mode must be 'r', 'w' or 'a'"
)
self
.
zip_mode
=
mode
self
.
encoding
=
encoding
try
:
self
.
zf
=
ZipFile
(
zip_file
,
mode
,
compression_type
,
allowZip64
)
except
IOError
:
...
...
@@ -93,7 +95,7 @@ class ZipFS(FS):
def
_parse_resource_list
(
self
):
for
path
in
self
.
zf
.
namelist
():
self
.
_add_resource
(
path
)
self
.
_add_resource
(
path
.
decode
(
self
.
encoding
)
)
def
_add_resource
(
self
,
path
):
if
path
.
endswith
(
'/'
):
...
...
@@ -125,7 +127,7 @@ class ZipFS(FS):
if
self
.
zip_mode
not
in
'ra'
:
raise
OperationFailedError
(
"open file"
,
path
=
path
,
msg
=
"Zip file must be opened for reading ('r') or appending ('a')"
)
try
:
contents
=
self
.
zf
.
read
(
path
)
contents
=
self
.
zf
.
read
(
path
.
encode
(
self
.
encoding
)
)
except
KeyError
:
raise
ResourceNotFoundError
(
path
)
return
StringIO
(
contents
)
...
...
@@ -148,7 +150,7 @@ class ZipFS(FS):
raise
ResourceNotFoundError
(
path
)
path
=
normpath
(
path
)
try
:
contents
=
self
.
zf
.
read
(
path
)
contents
=
self
.
zf
.
read
(
path
.
encode
(
self
.
encoding
)
)
except
KeyError
:
raise
ResourceNotFoundError
(
path
)
except
RuntimeError
:
...
...
@@ -158,7 +160,7 @@ class ZipFS(FS):
@synchronize
def
_on_write_close
(
self
,
filename
):
sys_path
=
self
.
temp_fs
.
getsyspath
(
filename
)
self
.
zf
.
write
(
sys_path
,
filename
)
self
.
zf
.
write
(
sys_path
,
filename
.
encode
(
self
.
encoding
)
)
def
desc
(
self
,
path
):
if
self
.
isdir
(
path
):
...
...
@@ -195,7 +197,7 @@ class ZipFS(FS):
return
ResourceNotFoundError
(
path
)
path
=
normpath
(
path
)
.
lstrip
(
'/'
)
try
:
zi
=
self
.
zf
.
getinfo
(
path
)
zi
=
self
.
zf
.
getinfo
(
path
.
encode
(
self
.
encoding
)
)
zinfo
=
dict
((
attrib
,
getattr
(
zi
,
attrib
))
for
attrib
in
dir
(
zi
)
if
not
attrib
.
startswith
(
'_'
))
except
KeyError
:
zinfo
=
{
'file_size'
:
0
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment