Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
pyfs
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
pyfs
Commits
21dff279
Commit
21dff279
authored
Sep 27, 2010
by
rfkelly0
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
make listdirinfo() an official part of the FS API
parent
d4e33c63
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
174 additions
and
71 deletions
+174
-71
ChangeLog
+2
-0
docs/interface.rst
+1
-0
fs/base.py
+5
-6
fs/contrib/davfs/__init__.py
+60
-14
fs/expose/dokan/__init__.py
+9
-25
fs/expose/fuse/__init__.py
+4
-13
fs/s3fs.py
+20
-12
fs/tests/__init__.py
+67
-1
fs/tests/test_expose.py
+6
-0
No files found.
ChangeLog
View file @
21dff279
...
...
@@ -40,6 +40,8 @@
* dokan: mount an FS object as a drive using Dokan (win32-only)
* Modified listdir and walk methods to accept callables as well as strings
for wildcards.
* Added listdirinfo method, which retrieves both the entry names and the
corresponding info dicts in a single operation.
* Fixed operation of OSFS on win32 when it points to the root of a drive.
* Made SubFS a subclass of WrapFS, and moved it into its own module at
fs.wrapfs.subfs.
...
...
docs/interface.rst
View file @
21dff279
...
...
@@ -34,6 +34,7 @@ The following methods have default implementations in fs.base.FS and aren't requ
* :meth:`~fs.base.FS.copydir` Recursively copy a directory to a new location
* :meth:`~fs.base.FS.desc` Return a short destriptive text regarding a path
* :meth:`~fs.base.FS.exists` Check whether a path exists as file or directory
* :meth:`~fs.base.FS.listdirinfo` Get a directory listing along with the info dict for each entry
* :meth:`~fs.base.FS.getsyspath` Get a file's name in the local filesystem, if possible
* :meth:`~fs.base.FS.hassyspath` Check if a path maps to a system path (recognised by the OS)
* :meth:`~fs.base.FS.move` Move a file to a new location
...
...
fs/base.py
View file @
21dff279
...
...
@@ -323,17 +323,16 @@ class FS(object):
"""
def
get_path
(
p
):
if
not
full
or
absolute
:
return
pathjoin
(
path
,
p
)
def
getinfo
(
p
):
try
:
return
self
.
getinfo
(
get_path
(
p
))
if
full
or
absolute
:
return
self
.
getinfo
(
p
)
else
:
return
self
.
getinfo
(
pathjoin
(
path
,
p
))
except
FSError
:
return
{}
return
[(
p
,
getinfo
(
get_path
(
p
)
))
return
[(
p
,
getinfo
(
p
))
for
p
in
self
.
listdir
(
path
,
wildcard
=
wildcard
,
full
=
full
,
...
...
fs/contrib/davfs/__init__.py
View file @
21dff279
...
...
@@ -27,6 +27,7 @@ import base64
import
re
import
datetime
import
cookielib
import
fnmatch
from
fs.base
import
*
from
fs.path
import
*
...
...
@@ -348,11 +349,53 @@ class DAVFS(FS):
finally
:
response
.
close
()
def
listdir
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
info
=
False
,
dirs_only
=
False
,
files_only
=
False
):
if
info
:
pf
=
propfind
(
prop
=
"<prop xmlns='DAV:'><resourcetype /><getcontentlength /><getlastmodified /><getetag /></prop>"
)
else
:
pf
=
propfind
(
prop
=
"<prop xmlns='DAV:'><resourcetype /></prop>"
)
def
listdir
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
dirs_only
=
False
,
files_only
=
False
):
pf
=
propfind
(
prop
=
"<prop xmlns='DAV:'><resourcetype /></prop>"
)
response
=
self
.
_request
(
path
,
"PROPFIND"
,
pf
.
render
(),{
"Depth"
:
"1"
})
try
:
if
response
.
status
==
404
:
raise
ResourceNotFoundError
(
path
)
if
response
.
status
!=
207
:
raise_generic_error
(
response
,
"listdir"
,
path
)
entries
=
[]
msres
=
multistatus
.
parse
(
response
.
read
())
dir_ok
=
False
for
res
in
msres
.
responses
:
if
self
.
_isurl
(
path
,
res
.
href
):
# The directory itself, check it's actually a directory
for
ps
in
res
.
propstats
:
if
ps
.
props
.
getElementsByTagNameNS
(
"DAV:"
,
"collection"
):
dir_ok
=
True
break
else
:
nm
=
basename
(
self
.
_url2path
(
res
.
href
))
if
dirs_only
:
for
ps
in
res
.
propstats
:
if
ps
.
props
.
getElementsByTagNameNS
(
"DAV:"
,
"collection"
):
entries
.
append
(
nm
)
break
elif
files_only
:
for
ps
in
res
.
propstats
:
if
ps
.
props
.
getElementsByTagNameNS
(
"DAV:"
,
"collection"
):
break
else
:
entries
.
append
(
nm
)
else
:
entries
.
append
(
nm
)
if
not
dir_ok
:
raise
ResourceInvalidError
(
path
)
if
wildcard
is
not
None
:
entries
=
[
e
for
e
in
entries
if
fnmatch
.
fnmatch
(
e
,
wildcard
)]
if
full
:
entries
=
[
relpath
(
pathjoin
(
path
,
e
))
for
e
in
entries
]
elif
absolute
:
entries
=
[
abspath
(
pathjoin
(
path
,
e
))
for
e
in
entries
]
return
entries
finally
:
response
.
close
()
def
listdirinfo
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
dirs_only
=
False
,
files_only
=
False
):
pf
=
propfind
(
prop
=
"<prop xmlns='DAV:'><resourcetype /><getcontentlength /><getlastmodified /><getetag /></prop>"
)
response
=
self
.
_request
(
path
,
"PROPFIND"
,
pf
.
render
(),{
"Depth"
:
"1"
})
try
:
if
response
.
status
==
404
:
...
...
@@ -372,28 +415,31 @@ class DAVFS(FS):
else
:
# An entry in the directory, check if it's of the
# appropriate type and add to entries list as required.
if
info
:
e_info
=
self
.
_info_from_propfind
(
res
)
e_info
[
"name"
]
=
basename
(
self
.
_url2path
(
res
.
href
))
else
:
# TODO: technically, should use displayname for this
e_info
=
basename
(
self
.
_url2path
(
res
.
href
))
info
=
self
.
_info_from_propfind
(
res
)
nm
=
basename
(
self
.
_url2path
(
res
.
href
))
if
dirs_only
:
for
ps
in
res
.
propstats
:
if
ps
.
props
.
getElementsByTagNameNS
(
"DAV:"
,
"collection"
):
entries
.
append
(
e_info
)
entries
.
append
(
(
nm
,
info
)
)
break
elif
files_only
:
for
ps
in
res
.
propstats
:
if
ps
.
props
.
getElementsByTagNameNS
(
"DAV:"
,
"collection"
):
break
else
:
entries
.
append
(
e_info
)
entries
.
append
(
(
nm
,
info
)
)
else
:
entries
.
append
(
e_info
)
entries
.
append
(
(
nm
,
info
)
)
if
not
dir_ok
:
raise
ResourceInvalidError
(
path
)
return
self
.
_listdir_helper
(
path
,
entries
,
wildcard
,
full
,
absolute
,
False
,
False
)
if
wildcard
is
not
None
:
entries
=
[(
e
,
info
)
for
(
e
,
info
)
in
entries
if
fnmatch
.
fnmatch
(
e
,
wildcard
)]
if
full
:
entries
=
[(
relpath
(
pathjoin
(
path
,
e
)),
info
)
for
(
e
,
info
)
in
entries
]
elif
absolute
:
entries
=
[(
abspath
(
pathjoin
(
path
,
e
)),
info
)
for
(
e
,
info
)
in
entries
]
return
entries
finally
:
response
.
close
()
...
...
fs/expose/dokan/__init__.py
View file @
21dff279
...
...
@@ -417,8 +417,6 @@ class FSOperations(object):
def
GetFileInformation
(
self
,
path
,
buffer
,
info
):
path
=
normpath
(
path
)
finfo
=
self
.
fs
.
getinfo
(
path
)
if
"name"
not
in
finfo
:
finfo
[
"name"
]
=
basename
(
path
)
data
=
buffer
.
contents
self
.
_info2finddataw
(
path
,
finfo
,
data
,
info
)
try
:
...
...
@@ -434,37 +432,23 @@ class FSOperations(object):
@handle_fs_errors
def
FindFiles
(
self
,
path
,
fillFindData
,
info
):
path
=
normpath
(
path
)
for
nm
in
self
.
fs
.
listdir
(
path
):
for
(
nm
,
finfo
)
in
self
.
fs
.
listdirinfo
(
path
):
fpath
=
pathjoin
(
path
,
nm
)
if
self
.
_is_pending_delete
(
fpath
):
continue
data
=
self
.
_info2finddataw
(
fpath
,
self
.
fs
.
getinfo
(
fpath
)
)
data
=
self
.
_info2finddataw
(
fpath
,
finfo
)
fillFindData
(
ctypes
.
byref
(
data
),
info
)
@handle_fs_errors
def
FindFilesWithPattern
(
self
,
path
,
pattern
,
fillFindData
,
info
):
path
=
normpath
(
path
)
infolist
=
[]
try
:
for
finfo
in
self
.
fs
.
listdir
(
path
,
info
=
True
):
nm
=
finfo
[
"name"
]
if
self
.
_is_pending_delete
(
pathjoin
(
path
,
nm
)):
continue
if
not
libdokan
.
DokanIsNameInExpression
(
pattern
,
nm
,
True
):
continue
infolist
.
append
(
finfo
)
except
(
TypeError
,
KeyError
,
UnsupportedError
):
filtered
=
True
for
nm
in
self
.
fs
.
listdir
(
path
):
if
self
.
_is_pending_delete
(
pathjoin
(
path
,
nm
)):
continue
if
not
libdokan
.
DokanIsNameInExpression
(
pattern
,
nm
,
True
):
continue
finfo
=
self
.
fs
.
getinfo
(
pathjoin
(
path
,
nm
))
finfo
[
"name"
]
=
nm
infolist
.
append
(
finfo
)
for
finfo
in
infolist
:
fpath
=
pathjoin
(
path
,
finfo
[
"name"
])
for
(
nm
,
finfo
)
in
self
.
fs
.
listdirinfo
(
path
):
fpath
=
pathjoin
(
path
,
nm
)
if
self
.
_is_pending_delete
(
fpath
):
continue
if
not
libdokan
.
DokanIsNameInExpression
(
pattern
,
nm
,
True
):
continue
data
=
self
.
_info2finddataw
(
fpath
,
finfo
,
None
)
fillFindData
(
ctypes
.
byref
(
data
),
info
)
...
...
@@ -584,7 +568,7 @@ class FSOperations(object):
data
.
ftWriteTime
=
_datetime2filetime
(
info
.
get
(
"modified_time"
,
None
))
data
.
nFileSizeHigh
=
info
.
get
(
"size"
,
0
)
>>
32
data
.
nFileSizeLow
=
info
.
get
(
"size"
,
0
)
&
0xffffffff
data
.
cFileName
=
info
.
get
(
"name"
,
""
)
data
.
cFileName
=
basename
(
path
)
data
.
cAlternateFileName
=
""
return
data
...
...
fs/expose/fuse/__init__.py
View file @
21dff279
...
...
@@ -245,19 +245,10 @@ class FSOperations(Operations):
@handle_fs_errors
def
readdir
(
self
,
path
,
fh
=
None
):
path
=
path
.
decode
(
NATIVE_ENCODING
)
# If listdir() can return info dicts directly, it will save FUSE
# having to call getinfo() on each entry individually.
try
:
entries
=
self
.
fs
.
listdir
(
path
,
info
=
True
)
except
TypeError
:
entries
=
[]
for
name
in
self
.
fs
.
listdir
(
path
):
name
=
name
.
encode
(
NATIVE_ENCODING
)
entries
.
append
(
name
)
else
:
entries
=
[(
e
[
"name"
]
.
encode
(
NATIVE_ENCODING
),
e
,
0
)
for
e
in
entries
]
for
(
name
,
attrs
,
offset
)
in
entries
:
self
.
_fill_stat_dict
(
pathjoin
(
path
,
name
.
decode
(
NATIVE_ENCODING
)),
attrs
)
entries
=
[]
for
(
nm
,
info
)
in
self
.
fs
.
listdirinfo
(
path
):
self
.
_fill_stat_dict
(
pathjoin
(
path
,
nm
),
info
)
entries
.
append
((
nm
.
encode
(
NATIVE_ENCODING
),
info
,
0
))
entries
=
[
"."
,
".."
]
+
entries
return
entries
...
...
fs/s3fs.py
View file @
21dff279
...
...
@@ -257,8 +257,18 @@ class S3FS(FS):
return
True
return
False
def
listdir
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
info
=
False
,
dirs_only
=
False
,
files_only
=
False
):
def
listdir
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
dirs_only
=
False
,
files_only
=
False
):
"""List contents of a directory."""
keys
=
self
.
_list_keys
(
self
,
path
)
entries
=
self
.
_filter_keys
(
path
,
keys
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
return
[
nm
for
(
nm
,
k
)
in
entries
]
def
listdirinfo
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
dirs_only
=
False
,
files_only
=
False
):
keys
=
self
.
_list_keys
(
self
,
path
)
entries
=
self
.
_listdir_helper
(
path
,
keys
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
return
[(
nm
,
self
.
_get_key_info
(
k
))
for
(
nm
,
k
)
in
entries
]
def
_list_keys
(
self
,
path
):
s3path
=
self
.
_s3path
(
path
)
+
self
.
_separator
if
s3path
==
"/"
:
s3path
=
""
...
...
@@ -277,34 +287,32 @@ class S3FS(FS):
if
self
.
isfile
(
path
):
raise
ResourceInvalidError
(
path
,
msg
=
"that's not a directory:
%(path)
s"
)
raise
ResourceNotFoundError
(
path
)
return
self
.
_listdir_helper
(
path
,
keys
,
wildcard
,
full
,
absolute
,
info
,
dirs_only
,
files_only
)
return
keys
def
_listdir_helper
(
self
,
path
,
keys
,
wildcard
,
full
,
absolute
,
info
,
dirs_only
,
files_only
):
"""Modify listdir helper to avoid additional calls to the server."""
def
_filter_keys
(
self
,
path
,
keys
,
wildcard
,
full
,
absolute
,
info
,
dirs_only
,
files_only
):
"""Filter out keys not matching the given criteria.
Returns a list of (name,key) pairs.
"""
if
dirs_only
and
files_only
:
raise
ValueError
(
"dirs_only and files_only can not both be True"
)
if
dirs_only
:
keys
=
[
k
for
k
in
keys
if
k
.
name
.
endswith
(
self
.
_separator
)]
elif
files_only
:
keys
=
[
k
for
k
in
keys
if
not
k
.
name
.
endswith
(
self
.
_separator
)]
for
k
in
keys
:
if
k
.
name
.
endswith
(
self
.
_separator
):
k
.
name
=
k
.
name
[:
-
1
]
if
type
(
path
)
is
not
unicode
:
k
.
name
=
k
.
name
.
encode
()
if
wildcard
is
not
None
:
keys
=
[
k
for
k
in
keys
if
fnmatch
.
fnmatch
(
k
.
name
,
wildcard
)]
if
full
:
entries
=
[
relpath
(
pathjoin
(
path
,
k
.
name
)
)
for
k
in
keys
]
entries
=
[
(
relpath
(
pathjoin
(
path
,
k
.
name
)),
k
)
for
k
in
keys
]
elif
absolute
:
entries
=
[
abspath
(
pathjoin
(
path
,
k
.
name
))
for
k
in
keys
]
elif
info
:
entries
=
[
self
.
_get_key_info
(
k
)
for
k
in
keys
]
entries
=
[(
abspath
(
pathjoin
(
path
,
k
.
name
)),
k
)
for
k
in
keys
]
else
:
entries
=
[
k
.
name
for
k
in
keys
]
entries
=
[
(
k
.
name
,
k
)
for
k
in
keys
]
return
entries
def
makedir
(
self
,
path
,
recursive
=
False
,
allow_recreate
=
False
):
...
...
fs/tests/__init__.py
View file @
21dff279
...
...
@@ -71,13 +71,14 @@ class FSTestCases(object):
def
test_open_on_directory
(
self
):
self
.
fs
.
makedir
(
"testdir"
)
try
:
self
.
fs
.
open
(
"testdir"
)
f
=
self
.
fs
.
open
(
"testdir"
)
except
ResourceInvalidError
:
pass
except
Exception
:
ecls
=
sys
.
exc_info
[
0
]
assert
False
,
"
%
s raised instead of ResourceInvalidError"
%
(
ecls
,)
else
:
f
.
close
()
assert
False
,
"ResourceInvalidError was not raised"
def
test_writefile
(
self
):
...
...
@@ -174,6 +175,71 @@ class FSTestCases(object):
self
.
assertRaises
(
ResourceNotFoundError
,
self
.
fs
.
listdir
,
"zebra"
)
self
.
assertRaises
(
ResourceInvalidError
,
self
.
fs
.
listdir
,
"foo"
)
def
test_listdirinfo
(
self
):
def
check_unicode
(
items
):
for
(
nm
,
info
)
in
items
:
self
.
assertTrue
(
isinstance
(
nm
,
unicode
))
def
check_equal
(
items
,
target
):
names
=
[
nm
for
(
nm
,
info
)
in
items
]
self
.
assertEqual
(
sorted
(
names
),
sorted
(
target
))
self
.
fs
.
createfile
(
u"a"
)
self
.
fs
.
createfile
(
"b"
)
self
.
fs
.
createfile
(
"foo"
)
self
.
fs
.
createfile
(
"bar"
)
# Test listing of the root directory
d1
=
self
.
fs
.
listdirinfo
()
self
.
assertEqual
(
len
(
d1
),
4
)
check_equal
(
d1
,
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
check_unicode
(
d1
)
d1
=
self
.
fs
.
listdirinfo
(
""
)
self
.
assertEqual
(
len
(
d1
),
4
)
check_equal
(
d1
,
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
check_unicode
(
d1
)
d1
=
self
.
fs
.
listdirinfo
(
"/"
)
self
.
assertEqual
(
len
(
d1
),
4
)
check_equal
(
d1
,
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
check_unicode
(
d1
)
# Test listing absolute paths
d2
=
self
.
fs
.
listdirinfo
(
absolute
=
True
)
self
.
assertEqual
(
len
(
d2
),
4
)
check_equal
(
d2
,
[
u"/a"
,
u"/b"
,
u"/bar"
,
u"/foo"
])
check_unicode
(
d2
)
# Create some deeper subdirectories, to make sure their
# contents are not inadvertantly included
self
.
fs
.
makedir
(
"p/1/2/3"
,
recursive
=
True
)
self
.
fs
.
createfile
(
"p/1/2/3/a"
)
self
.
fs
.
createfile
(
"p/1/2/3/b"
)
self
.
fs
.
createfile
(
"p/1/2/3/foo"
)
self
.
fs
.
createfile
(
"p/1/2/3/bar"
)
self
.
fs
.
makedir
(
"q"
)
# Test listing just files, just dirs, and wildcards
dirs_only
=
self
.
fs
.
listdirinfo
(
dirs_only
=
True
)
files_only
=
self
.
fs
.
listdirinfo
(
files_only
=
True
)
contains_a
=
self
.
fs
.
listdirinfo
(
wildcard
=
"*a*"
)
check_equal
(
dirs_only
,
[
u"p"
,
u"q"
])
check_equal
(
files_only
,
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
check_equal
(
contains_a
,
[
u"a"
,
u"bar"
])
check_unicode
(
dirs_only
)
check_unicode
(
files_only
)
check_unicode
(
contains_a
)
# Test listing a subdirectory
d3
=
self
.
fs
.
listdirinfo
(
"p/1/2/3"
)
self
.
assertEqual
(
len
(
d3
),
4
)
check_equal
(
d3
,
[
u"a"
,
u"b"
,
u"bar"
,
u"foo"
])
check_unicode
(
d3
)
# Test listing a subdirectory with absoliute and full paths
d4
=
self
.
fs
.
listdirinfo
(
"p/1/2/3"
,
absolute
=
True
)
self
.
assertEqual
(
len
(
d4
),
4
)
check_equal
(
d4
,
[
u"/p/1/2/3/a"
,
u"/p/1/2/3/b"
,
u"/p/1/2/3/bar"
,
u"/p/1/2/3/foo"
])
check_unicode
(
d4
)
d4
=
self
.
fs
.
listdirinfo
(
"p/1/2/3"
,
full
=
True
)
self
.
assertEqual
(
len
(
d4
),
4
)
check_equal
(
d4
,
[
u"p/1/2/3/a"
,
u"p/1/2/3/b"
,
u"p/1/2/3/bar"
,
u"p/1/2/3/foo"
])
check_unicode
(
d4
)
# Test that appropriate errors are raised
self
.
assertRaises
(
ResourceNotFoundError
,
self
.
fs
.
listdirinfo
,
"zebra"
)
self
.
assertRaises
(
ResourceInvalidError
,
self
.
fs
.
listdirinfo
,
"foo"
)
def
test_unicode
(
self
):
alpha
=
u"
\N{GREEK SMALL LETTER ALPHA}
"
beta
=
u"
\N{GREEK SMALL LETTER BETA}
"
...
...
fs/tests/test_expose.py
View file @
21dff279
...
...
@@ -158,11 +158,17 @@ if dokan.is_available:
self
.
fs
.
remove
(
"dir1/a.txt"
)
self
.
assertFalse
(
self
.
check
(
"/dir1/a.txt"
))
def
test_open_on_directory
(
self
):
# Dokan seems quite happy to ask me to open a directory and
# then treat it like a file.
pass
def
test_settimes
(
self
):
# Setting the times does actually work, but there's some sort
# of caching effect which prevents them from being read back
# out. Disabling the test for now.
pass
class
TestDokan
(
unittest
.
TestCase
,
DokanTestCases
,
ThreadingTestCases
):
def
setUp
(
self
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment