Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
pyfs
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
pyfs
Commits
9ff3da0c
Commit
9ff3da0c
authored
Dec 01, 2012
by
willmcgugan@gmail.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Implemented generic validatepath method and optimized normpath
parent
eac17257
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
127 additions
and
58 deletions
+127
-58
fs/base.py
+33
-0
fs/errors.py
+8
-1
fs/osfs/__init__.py
+3
-9
fs/path.py
+41
-43
fs/tempfs.py
+6
-0
fs/tests/__init__.py
+10
-1
fs/tests/test_fs.py
+9
-0
fs/tests/test_path.py
+9
-0
fs/wrapfs/__init__.py
+8
-4
No files found.
fs/base.py
View file @
9ff3da0c
...
...
@@ -252,6 +252,7 @@ class FS(object):
* *free_space* The free space (in bytes) available on the file system
* *total_space* The total space (in bytes) available on the file system
* *virtual* True if the filesystem defers to other filesystems
* *invalid_path_chars* A string containing characters that may not be used in paths
FS implementations may expose non-generic meta data through a self-named namespace. e.g. ``"somefs.some_meta"``
...
...
@@ -282,6 +283,38 @@ class FS(object):
return
False
return
True
def
validatepath
(
self
,
path
):
"""Validate an fs path, throws an :class:`~fs.errors.InvalidPathError` exception if validation fails.
A path is invalid if it fails to map to a path on the underlaying filesystem. The default
implementation checks for the presence of any of the characters in the meta value 'invalid_path_chars',
but implementations may have other requirements for paths.
:param path: an fs path to validatepath
:raises `fs.errors.InvalidPathError`: if `path` does not map on to a valid path on this filesystem
"""
invalid_chars
=
self
.
getmeta
(
'invalid_path_chars'
,
default
=
None
)
if
invalid_chars
:
re_invalid_chars
=
getattr
(
self
,
'_re_invalid_chars'
,
None
)
if
re_invalid_chars
is
None
:
self
.
_re_invalid_chars
=
re_invalid_chars
=
re
.
compile
(
'|'
.
join
(
re
.
escape
(
c
)
for
c
in
invalid_chars
),
re
.
UNICODE
)
if
re_invalid_chars
.
search
(
path
):
raise
InvalidCharsInPathError
(
path
)
def
isvalidpath
(
self
,
path
):
"""Check if a path is valid on this filesystem
:param path: an fs path
"""
try
:
self
.
validatepath
(
path
)
except
InvalidPathError
:
return
False
else
:
return
True
def
getsyspath
(
self
,
path
,
allow_none
=
False
):
"""Returns the system path (a path recognized by the OS) if one is present.
...
...
fs/errors.py
View file @
9ff3da0c
...
...
@@ -11,6 +11,7 @@ catch-all exception.
__all__
=
[
'FSError'
,
'CreateFailedError'
,
'PathError'
,
'InvalidPathError'
,
'InvalidCharsInPathError'
,
'OperationFailedError'
,
'UnsupportedError'
,
...
...
@@ -83,7 +84,13 @@ class PathError(FSError):
super
(
PathError
,
self
)
.
__init__
(
**
kwds
)
class
InvalidCharsInPathError
(
PathError
):
class
InvalidPathError
(
PathError
):
"""Base exception for fs paths that can't be mapped on to the underlaying filesystem."""
default_message
=
"Path is invalid on this filesystem
%(path)
s"
class
InvalidCharsInPathError
(
InvalidPathError
):
"""The path contains characters that are invalid on this filesystem"""
default_message
=
"Path contains invalid characters:
%(path)
s"
...
...
fs/osfs/__init__.py
View file @
9ff3da0c
...
...
@@ -88,10 +88,9 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
}
if
platform
.
system
()
==
'Windows'
:
_
invalid_path_chars
=
''
.
join
(
chr
(
n
)
for
n
in
xrange
(
31
))
+
'
\\
:*?"<>|'
_
meta
[
"invalid_path_chars"
]
=
''
.
join
(
chr
(
n
)
for
n
in
xrange
(
31
))
+
'
\\
:*?"<>|'
else
:
_invalid_path_chars
=
'
\0
'
_re_invalid_path_chars
=
re
.
compile
(
'|'
.
join
(
re
.
escape
(
c
)
for
c
in
_invalid_path_chars
),
re
.
UNICODE
)
_meta
[
"invalid_path_chars"
]
=
'
\0
'
def
__init__
(
self
,
root_path
,
thread_synchronize
=
_thread_synchronize_default
,
encoding
=
None
,
create
=
False
,
dir_mode
=
0700
,
use_long_paths
=
True
):
"""
...
...
@@ -153,13 +152,8 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
return
p
return
p
.
decode
(
self
.
encoding
,
'replace'
)
def
_validate_path
(
self
,
path
):
"""Raise an error if there are any invalid characters in the path"""
if
self
.
_re_invalid_path_chars
.
search
(
path
):
raise
InvalidCharsInPathError
(
path
)
def
getsyspath
(
self
,
path
,
allow_none
=
False
):
self
.
_validate_
path
(
path
)
self
.
validate
path
(
path
)
path
=
relpath
(
normpath
(
path
))
.
replace
(
u"/"
,
os
.
sep
)
path
=
os
.
path
.
join
(
self
.
root_path
,
path
)
if
not
path
.
startswith
(
self
.
root_path
):
...
...
fs/path.py
View file @
9ff3da0c
...
...
@@ -14,7 +14,9 @@ import re
import
os
_requires_normalization
=
re
.
compile
(
r'/\.\.|\./|\.|//'
)
.
search
#_requires_normalization = re.compile(r'/\.\.|\./|\.|//').search
# New improved re that avoids normalizing paths that don't need it - WM
_requires_normalization
=
re
.
compile
(
r'/\.\.|\./|^\.$|\.$|//'
)
.
search
def
normpath
(
path
):
...
...
@@ -75,11 +77,6 @@ else:
return
path
def
normospath
(
path
):
"""Normalizes a path with os separators"""
return
normpath
(
ospath
(
path
))
def
iteratepath
(
path
,
numsplits
=
None
):
"""Iterate over the individual components of a path.
...
...
@@ -374,7 +371,7 @@ def isprefix(path1, path2):
bits1
.
pop
()
if
len
(
bits1
)
>
len
(
bits2
):
return
False
for
(
bit1
,
bit2
)
in
zip
(
bits1
,
bits2
):
for
(
bit1
,
bit2
)
in
zip
(
bits1
,
bits2
):
if
bit1
!=
bit2
:
return
False
return
True
...
...
@@ -434,7 +431,7 @@ class PathMap(object):
def
__init__
(
self
):
self
.
_map
=
{}
def
__getitem__
(
self
,
path
):
def
__getitem__
(
self
,
path
):
"""Get the value stored under the given path."""
m
=
self
.
_map
for
name
in
iteratepath
(
path
):
...
...
@@ -447,7 +444,7 @@ class PathMap(object):
except
KeyError
:
raise
KeyError
(
path
)
def
__contains__
(
self
,
path
):
def
__contains__
(
self
,
path
):
"""Check whether the given path has a value stored in the map."""
try
:
self
[
path
]
...
...
@@ -456,22 +453,22 @@ class PathMap(object):
else
:
return
True
def
__setitem__
(
self
,
path
,
value
):
def
__setitem__
(
self
,
path
,
value
):
"""Set the value stored under the given path."""
m
=
self
.
_map
for
name
in
iteratepath
(
path
):
try
:
m
=
m
[
name
]
except
KeyError
:
m
=
m
.
setdefault
(
name
,{})
m
=
m
.
setdefault
(
name
,
{})
m
[
""
]
=
value
def
__delitem__
(
self
,
path
):
def
__delitem__
(
self
,
path
):
"""Delete the value stored under the given path."""
ms
=
[[
self
.
_map
,
None
]]
ms
=
[[
self
.
_map
,
None
]]
for
name
in
iteratepath
(
path
):
try
:
ms
.
append
([
ms
[
-
1
][
0
][
name
],
None
])
ms
.
append
([
ms
[
-
1
][
0
][
name
],
None
])
except
KeyError
:
raise
KeyError
(
path
)
else
:
...
...
@@ -485,19 +482,19 @@ class PathMap(object):
del
ms
[
-
1
]
del
ms
[
-
1
][
0
][
ms
[
-
1
][
1
]]
def
get
(
self
,
path
,
default
=
None
):
def
get
(
self
,
path
,
default
=
None
):
"""Get the value stored under the given path, or the given default."""
try
:
return
self
[
path
]
except
KeyError
:
return
default
def
pop
(
self
,
path
,
default
=
None
):
def
pop
(
self
,
path
,
default
=
None
):
"""Pop the value stored under the given path, or the given default."""
ms
=
[[
self
.
_map
,
None
]]
ms
=
[[
self
.
_map
,
None
]]
for
name
in
iteratepath
(
path
):
try
:
ms
.
append
([
ms
[
-
1
][
0
][
name
],
None
])
ms
.
append
([
ms
[
-
1
][
0
][
name
],
None
])
except
KeyError
:
return
default
else
:
...
...
@@ -512,16 +509,16 @@ class PathMap(object):
del
ms
[
-
1
][
0
][
ms
[
-
1
][
1
]]
return
val
def
setdefault
(
self
,
path
,
value
):
def
setdefault
(
self
,
path
,
value
):
m
=
self
.
_map
for
name
in
iteratepath
(
path
):
try
:
m
=
m
[
name
]
except
KeyError
:
m
=
m
.
setdefault
(
name
,{})
return
m
.
setdefault
(
""
,
value
)
m
=
m
.
setdefault
(
name
,
{})
return
m
.
setdefault
(
""
,
value
)
def
clear
(
self
,
root
=
"/"
):
def
clear
(
self
,
root
=
"/"
):
"""Clear all entries beginning with the given root path."""
m
=
self
.
_map
for
name
in
iteratepath
(
root
):
...
...
@@ -531,7 +528,7 @@ class PathMap(object):
return
m
.
clear
()
def
iterkeys
(
self
,
root
=
"/"
,
m
=
None
):
def
iterkeys
(
self
,
root
=
"/"
,
m
=
None
):
"""Iterate over all keys beginning with the given root path."""
if
m
is
None
:
m
=
self
.
_map
...
...
@@ -540,12 +537,12 @@ class PathMap(object):
m
=
m
[
name
]
except
KeyError
:
return
for
(
nm
,
subm
)
in
m
.
iteritems
():
for
(
nm
,
subm
)
in
m
.
iteritems
():
if
not
nm
:
yield
abspath
(
root
)
else
:
k
=
pathcombine
(
root
,
nm
)
for
subk
in
self
.
iterkeys
(
k
,
subm
):
k
=
pathcombine
(
root
,
nm
)
for
subk
in
self
.
iterkeys
(
k
,
subm
):
yield
subk
def
__iter__
(
self
):
...
...
@@ -554,7 +551,7 @@ class PathMap(object):
def
keys
(
self
,
root
=
"/"
):
return
list
(
self
.
iterkeys
(
root
))
def
itervalues
(
self
,
root
=
"/"
,
m
=
None
):
def
itervalues
(
self
,
root
=
"/"
,
m
=
None
):
"""Iterate over all values whose keys begin with the given root path."""
root
=
normpath
(
root
)
if
m
is
None
:
...
...
@@ -564,18 +561,18 @@ class PathMap(object):
m
=
m
[
name
]
except
KeyError
:
return
for
(
nm
,
subm
)
in
m
.
iteritems
():
for
(
nm
,
subm
)
in
m
.
iteritems
():
if
not
nm
:
yield
subm
else
:
k
=
pathcombine
(
root
,
nm
)
for
subv
in
self
.
itervalues
(
k
,
subm
):
k
=
pathcombine
(
root
,
nm
)
for
subv
in
self
.
itervalues
(
k
,
subm
):
yield
subv
def
values
(
self
,
root
=
"/"
):
def
values
(
self
,
root
=
"/"
):
return
list
(
self
.
itervalues
(
root
))
def
iteritems
(
self
,
root
=
"/"
,
m
=
None
):
def
iteritems
(
self
,
root
=
"/"
,
m
=
None
):
"""Iterate over all (key,value) pairs beginning with the given root."""
root
=
normpath
(
root
)
if
m
is
None
:
...
...
@@ -585,18 +582,18 @@ class PathMap(object):
m
=
m
[
name
]
except
KeyError
:
return
for
(
nm
,
subm
)
in
m
.
iteritems
():
for
(
nm
,
subm
)
in
m
.
iteritems
():
if
not
nm
:
yield
(
abspath
(
normpath
(
root
)),
subm
)
yield
(
abspath
(
normpath
(
root
)),
subm
)
else
:
k
=
pathcombine
(
root
,
nm
)
for
(
subk
,
subv
)
in
self
.
iteritems
(
k
,
subm
):
yield
(
subk
,
subv
)
k
=
pathcombine
(
root
,
nm
)
for
(
subk
,
subv
)
in
self
.
iteritems
(
k
,
subm
):
yield
(
subk
,
subv
)
def
items
(
self
,
root
=
"/"
):
def
items
(
self
,
root
=
"/"
):
return
list
(
self
.
iteritems
(
root
))
def
iternames
(
self
,
root
=
"/"
):
def
iternames
(
self
,
root
=
"/"
):
"""Iterate over all names beneath the given root path.
This is basically the equivalent of listdir() for a PathMap - it yields
...
...
@@ -608,15 +605,17 @@ class PathMap(object):
m
=
m
[
name
]
except
KeyError
:
return
for
(
nm
,
subm
)
in
m
.
iteritems
():
for
(
nm
,
subm
)
in
m
.
iteritems
():
if
nm
and
subm
:
yield
nm
def
names
(
self
,
root
=
"/"
):
def
names
(
self
,
root
=
"/"
):
return
list
(
self
.
iternames
(
root
))
_wild_chars
=
frozenset
(
'*?[]!{}'
)
def
iswildcard
(
path
):
"""Check if a path ends with a wildcard
...
...
@@ -627,8 +626,7 @@ def iswildcard(path):
"""
assert
path
is
not
None
base_chars
=
frozenset
(
basename
(
path
))
return
bool
(
base_chars
.
intersection
(
_wild_chars
))
return
not
_wild_chars
.
isdisjoint
(
path
)
if
__name__
==
"__main__"
:
print
recursepath
(
'a/b/c'
)
fs/tempfs.py
View file @
9ff3da0c
...
...
@@ -10,6 +10,7 @@ import os
import
os.path
import
time
import
tempfile
import
platform
from
fs.osfs
import
OSFS
from
fs.errors
import
*
...
...
@@ -35,6 +36,11 @@ class TempFS(OSFS):
'atomic.setcontents'
:
False
}
if
platform
.
system
()
==
'Windows'
:
_meta
[
"invalid_path_chars"
]
=
''
.
join
(
chr
(
n
)
for
n
in
xrange
(
31
))
+
'
\\
:*?"<>|'
else
:
_meta
[
"invalid_path_chars"
]
=
'
\0
'
def
__init__
(
self
,
identifier
=
None
,
temp_dir
=
None
,
dir_mode
=
0700
,
thread_synchronize
=
_thread_synchronize_default
):
"""Creates a temporary Filesystem
...
...
fs/tests/__init__.py
View file @
9ff3da0c
...
...
@@ -52,10 +52,19 @@ class FSTestCases(object):
"""Check that a file exists within self.fs"""
return
self
.
fs
.
exists
(
p
)
def
test_invalid_chars
(
self
):
"""Check paths validate ok"""
# Will have to be overriden selectively for custom validepath methods
self
.
assertEqual
(
self
.
fs
.
validatepath
(
''
),
None
)
self
.
assertEqual
(
self
.
fs
.
validatepath
(
'.foo'
),
None
)
self
.
assertEqual
(
self
.
fs
.
validatepath
(
'foo'
),
None
)
self
.
assertEqual
(
self
.
fs
.
validatepath
(
'foo/bar'
),
None
)
self
.
assert_
(
self
.
fs
.
isvalidpath
(
'foo/bar'
))
def
test_meta
(
self
):
"""Checks getmeta / hasmeta are functioning"""
# getmeta / hasmeta are hard to test, since there is no way to validate
# the implementations response
# the implementation
'
s response
meta_names
=
[
"read_only"
,
"network"
,
"unicode_paths"
]
...
...
fs/tests/test_fs.py
View file @
9ff3da0c
...
...
@@ -31,7 +31,16 @@ class TestOSFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
return
os
.
path
.
exists
(
os
.
path
.
join
(
self
.
temp_dir
,
relpath
(
p
)))
def
test_invalid_chars
(
self
):
self
.
assertEqual
(
self
.
fs
.
validatepath
(
''
),
None
)
self
.
assertEqual
(
self
.
fs
.
validatepath
(
'.foo'
),
None
)
self
.
assertEqual
(
self
.
fs
.
validatepath
(
'foo'
),
None
)
self
.
assertEqual
(
self
.
fs
.
validatepath
(
'foo/bar'
),
None
)
self
.
assert_
(
self
.
fs
.
isvalidpath
(
'foo/bar'
))
self
.
assertRaises
(
errors
.
InvalidCharsInPathError
,
self
.
fs
.
open
,
'invalid
\0
file'
,
'wb'
)
self
.
assertFalse
(
self
.
fs
.
isvalidpath
(
'invalid
\0
file'
))
self
.
assert_
(
self
.
fs
.
isvalidpath
(
'validfile'
))
self
.
assert_
(
self
.
fs
.
isvalidpath
(
'completely_valid/path/foo.bar'
))
class
TestSubFS
(
unittest
.
TestCase
,
FSTestCases
,
ThreadingTestCases
):
...
...
fs/tests/test_path.py
View file @
9ff3da0c
...
...
@@ -138,6 +138,15 @@ class TestPathFunctions(unittest.TestCase):
for
path
,
test_basename
in
tests
:
self
.
assertEqual
(
basename
(
path
),
test_basename
)
def
test_iswildcard
(
self
):
self
.
assert_
(
iswildcard
(
'*'
))
self
.
assert_
(
iswildcard
(
'*.jpg'
))
self
.
assert_
(
iswildcard
(
'foo/*'
))
self
.
assert_
(
iswildcard
(
'foo/{}'
))
self
.
assertFalse
(
iswildcard
(
'foo'
))
self
.
assertFalse
(
iswildcard
(
'img.jpg'
))
self
.
assertFalse
(
iswildcard
(
'foo/bar'
))
class
Test_PathMap
(
unittest
.
TestCase
):
...
...
fs/wrapfs/__init__.py
View file @
9ff3da0c
...
...
@@ -116,7 +116,7 @@ class WrapFS(FS):
transparent file compression - in this case files from the wrapped
FS cannot be opened in append mode.
"""
return
(
mode
,
mode
)
return
(
mode
,
mode
)
def
__unicode__
(
self
):
return
u"<
%
s:
%
s>"
%
(
self
.
__class__
.
__name__
,
self
.
wrapped_fs
,)
...
...
@@ -134,12 +134,16 @@ class WrapFS(FS):
return
self
.
wrapped_fs
.
hasmeta
(
meta_name
)
@rewrite_errors
def
validatepath
(
self
,
path
):
return
self
.
wrapped_fs
.
validatepath
(
self
.
_encode
(
path
))
@rewrite_errors
def
getsyspath
(
self
,
path
,
allow_none
=
False
):
return
self
.
wrapped_fs
.
getsyspath
(
self
.
_encode
(
path
),
allow_none
)
return
self
.
wrapped_fs
.
getsyspath
(
self
.
_encode
(
path
),
allow_none
)
@rewrite_errors
def
getpathurl
(
self
,
path
,
allow_none
=
False
):
return
self
.
wrapped_fs
.
getpathurl
(
self
.
_encode
(
path
),
allow_none
)
return
self
.
wrapped_fs
.
getpathurl
(
self
.
_encode
(
path
),
allow_none
)
@rewrite_errors
def
hassyspath
(
self
,
path
):
...
...
@@ -469,7 +473,7 @@ def wrap_fs_methods(decorator, cls=None, exclude=[]):
wrap_fs_methods
.
method_names
=
[
"open"
,
"exists"
,
"isdir"
,
"isfile"
,
"listdir"
,
"makedir"
,
"remove"
,
"setcontents"
,
"removedir"
,
"rename"
,
"getinfo"
,
"copy"
,
"move"
,
"copydir"
,
"movedir"
,
"close"
,
"getxattr"
,
"setxattr"
,
"delxattr"
,
"listxattrs"
,
"getsyspath"
,
"createfile"
,
"hasmeta"
,
"getmeta"
,
"listdirinfo"
,
"listxattrs"
,
"
validatepath"
,
"
getsyspath"
,
"createfile"
,
"hasmeta"
,
"getmeta"
,
"listdirinfo"
,
"ilistdir"
,
"ilistdirinfo"
]
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment