Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
pyfs
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
pyfs
Commits
bc30657b
Commit
bc30657b
authored
Nov 24, 2012
by
willmcgugan@gmail.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fixes for backslashes on Linux issue, see Issue #139
parent
2fbb136c
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
235 additions
and
211 deletions
+235
-211
fs/commands/runner.py
+102
-101
fs/errors.py
+7
-2
fs/osfs/__init__.py
+35
-36
fs/path.py
+84
-65
fs/tests/test_path.py
+7
-7
No files found.
fs/commands/runner.py
View file @
bc30657b
...
...
@@ -16,15 +16,15 @@ if platform.system() == 'Windows':
try
:
## {{{ http://code.activestate.com/recipes/440694/ (r3)
from
ctypes
import
windll
,
create_string_buffer
# stdin handle is -10
# stdout handle is -11
# stderr handle is -12
h
=
windll
.
kernel32
.
GetStdHandle
(
-
12
)
csbi
=
create_string_buffer
(
22
)
res
=
windll
.
kernel32
.
GetConsoleScreenBufferInfo
(
h
,
csbi
)
if
res
:
import
struct
(
bufx
,
bufy
,
curx
,
cury
,
wattr
,
...
...
@@ -36,9 +36,9 @@ if platform.system() == 'Windows':
return
sizex
,
sizey
except
:
return
80
,
25
else
:
def
getTerminalSize
():
def
ioctl_GWINSZ
(
fd
):
try
:
...
...
@@ -57,10 +57,10 @@ else:
except
:
pass
if
cr
:
return
int
(
cr
[
1
]),
int
(
cr
[
0
])
return
int
(
cr
[
1
]),
int
(
cr
[
0
])
try
:
h
,
w
=
os
.
popen
(
"stty size"
,
"r"
)
.
read
()
.
split
()
return
int
(
w
),
int
(
h
)
return
int
(
w
),
int
(
h
)
except
:
pass
return
80
,
25
...
...
@@ -71,11 +71,11 @@ def _unicode(text):
return
text
class
Command
(
object
):
usage
=
''
version
=
''
def
__init__
(
self
,
usage
=
''
,
version
=
''
):
def
__init__
(
self
,
usage
=
''
,
version
=
''
):
self
.
output_file
=
sys
.
stdout
self
.
error_file
=
sys
.
stderr
self
.
encoding
=
getattr
(
self
.
output_file
,
'encoding'
,
'utf-8'
)
or
'utf-8'
...
...
@@ -87,24 +87,24 @@ class Command(object):
else
:
self
.
terminal_width
=
80
self
.
name
=
self
.
__class__
.
__name__
.
lower
()
def
is_terminal
(
self
):
try
:
return
self
.
output_file
.
isatty
()
except
AttributeError
:
return
False
return
False
def
wrap_dirname
(
self
,
dirname
):
if
not
self
.
terminal_colors
:
return
dirname
return
'
\x1b
[1;34m
%
s
\x1b
[0m'
%
dirname
def
wrap_error
(
self
,
msg
):
if
not
self
.
terminal_colors
:
return
msg
return
'
\x1b
[31m
%
s
\x1b
[0m'
%
msg
def
wrap_filename
(
self
,
fname
):
def
wrap_filename
(
self
,
fname
):
fname
=
_unicode
(
fname
)
if
not
self
.
terminal_colors
:
return
fname
...
...
@@ -116,28 +116,28 @@ class Command(object):
if
isdotfile
(
fname
):
fname
=
'
\x1b
[33m
%
s
\x1b
[0m'
%
fname
return
fname
def
wrap_faded
(
self
,
text
):
text
=
_unicode
(
text
)
if
not
self
.
terminal_colors
:
return
text
return
u'
\x1b
[2m
%
s
\x1b
[0m'
%
text
def
wrap_link
(
self
,
text
):
if
not
self
.
terminal_colors
:
return
text
return
u'
\x1b
[1;33m
%
s
\x1b
[0m'
%
text
def
wrap_strong
(
self
,
text
):
if
not
self
.
terminal_colors
:
return
text
return
u'
\x1b
[1m
%
s
\x1b
[0m'
%
text
def
wrap_table_header
(
self
,
name
):
if
not
self
.
terminal_colors
:
return
name
return
'
\x1b
[1;32m
%
s
\x1b
[0m'
%
name
def
highlight_fsurls
(
self
,
text
):
if
not
self
.
terminal_colors
:
return
text
...
...
@@ -146,13 +146,13 @@ class Command(object):
fs_url
=
matchobj
.
group
(
0
)
return
self
.
wrap_link
(
fs_url
)
return
re
.
sub
(
re_fs
,
repl
,
text
)
def
open_fs
(
self
,
fs_url
,
writeable
=
False
,
create_dir
=
False
):
fs
,
path
=
opener
.
parse
(
fs_url
,
writeable
=
writeable
,
create_dir
=
create_dir
)
fs
.
cache_hint
(
True
)
def
open_fs
(
self
,
fs_url
,
writeable
=
False
,
create_dir
=
False
):
fs
,
path
=
opener
.
parse
(
fs_url
,
writeable
=
writeable
,
create_dir
=
create_dir
)
fs
.
cache_hint
(
True
)
return
fs
,
path
def
expand_wildcard
(
self
,
fs
,
path
):
def
expand_wildcard
(
self
,
fs
,
path
):
if
path
is
None
:
return
[],
[]
pathname
,
resourcename
=
pathsplit
(
path
)
...
...
@@ -161,35 +161,35 @@ class Command(object):
wildcard
=
resourcename
,
absolute
=
True
,
dirs_only
=
True
)
file_paths
=
fs
.
listdir
(
pathname
,
wildcard
=
resourcename
,
absolute
=
True
,
files_only
=
True
)
return
dir_paths
,
file_paths
else
:
if
fs
.
isdir
(
path
):
else
:
if
fs
.
isdir
(
path
):
#file_paths = fs.listdir(path,
# absolute=True)
return
[
path
],
[]
return
[],
[
path
]
def
get_resources
(
self
,
fs_urls
,
dirs_only
=
False
,
files_only
=
False
,
single
=
False
):
fs_paths
=
[
self
.
open_fs
(
fs_url
)
for
fs_url
in
fs_urls
]
fs_paths
=
[
self
.
open_fs
(
fs_url
)
for
fs_url
in
fs_urls
]
resources
=
[]
for
fs
,
path
in
fs_paths
:
for
fs
,
path
in
fs_paths
:
if
path
and
iswildcard
(
path
):
if
not
files_only
:
dir_paths
=
fs
.
listdir
(
wildcard
=
path
,
dirs_only
=
True
)
for
path
in
dir_paths
:
resources
.
append
([
fs
,
path
,
True
])
if
not
dirs_only
:
resources
.
append
([
fs
,
path
,
True
])
if
not
dirs_only
:
file_paths
=
fs
.
listdir
(
wildcard
=
path
,
files_only
=
True
)
for
path
in
file_paths
:
resources
.
append
([
fs
,
path
,
False
])
resources
.
append
([
fs
,
path
,
False
])
else
:
path
=
path
or
'/'
is_dir
=
fs
.
isdir
(
path
)
...
...
@@ -199,47 +199,47 @@ class Command(object):
elif
files_only
and
not
is_dir
:
resources
.
append
(
resource
)
elif
dirs_only
and
is_dir
:
resources
.
append
(
resource
)
resources
.
append
(
resource
)
if
single
:
break
return
resources
return
resources
def
ask
(
self
,
msg
):
return
raw_input
(
'
%
s:
%
s '
%
(
self
.
name
,
msg
))
def
text_encode
(
self
,
text
):
return
raw_input
(
'
%
s:
%
s '
%
(
self
.
name
,
msg
))
def
text_encode
(
self
,
text
):
if
not
isinstance
(
text
,
unicode
):
text
=
text
.
decode
(
'ascii'
,
'replace'
)
text
=
text
.
decode
(
'ascii'
,
'replace'
)
text
=
text
.
encode
(
self
.
encoding
,
'replace'
)
return
text
def
output
(
self
,
msgs
,
verbose
=
False
):
def
output
(
self
,
msgs
,
verbose
=
False
):
if
verbose
and
not
self
.
options
.
verbose
:
return
return
if
isinstance
(
msgs
,
basestring
):
msgs
=
(
msgs
,)
for
msg
in
msgs
:
msgs
=
(
msgs
,)
for
msg
in
msgs
:
self
.
output_file
.
write
(
self
.
text_encode
(
msg
))
def
output_table
(
self
,
table
,
col_process
=
None
,
verbose
=
False
):
if
verbose
and
not
self
.
verbose
:
return
if
col_process
is
None
:
col_process
=
{}
max_row_widths
=
defaultdict
(
int
)
for
row
in
table
:
for
col_no
,
col
in
enumerate
(
row
):
max_row_widths
[
col_no
]
=
max
(
max_row_widths
[
col_no
],
len
(
col
))
lines
=
[]
lines
=
[]
for
row
in
table
:
out_col
=
[]
for
col_no
,
col
in
enumerate
(
row
):
...
...
@@ -248,12 +248,12 @@ class Command(object):
td
=
col_process
[
col_no
](
td
)
out_col
.
append
(
td
)
lines
.
append
(
self
.
text_encode
(
'
%
s
\n
'
%
' '
.
join
(
out_col
)
.
rstrip
()))
self
.
output
(
''
.
join
(
lines
))
self
.
output
(
''
.
join
(
lines
))
def
error
(
self
,
*
msgs
):
for
msg
in
msgs
:
self
.
error_file
.
write
(
'
%
s:
%
s'
%
(
self
.
name
,
self
.
text_encode
(
msg
)))
self
.
error_file
.
write
(
'
%
s:
%
s'
%
(
self
.
name
,
self
.
text_encode
(
msg
)))
def
get_optparse
(
self
):
optparse
=
OptionParser
(
usage
=
self
.
usage
,
version
=
self
.
version
)
optparse
.
add_option
(
'--debug'
,
dest
=
'debug'
,
action
=
"store_true"
,
default
=
False
,
...
...
@@ -265,29 +265,29 @@ class Command(object):
optparse
.
add_option
(
'--fs'
,
dest
=
'fs'
,
action
=
'append'
,
type
=
"string"
,
help
=
"import an FS opener e.g --fs foo.bar.MyOpener"
,
metavar
=
"OPENER"
)
return
optparse
def
list_openers
(
self
):
opener_table
=
[]
for
fs_opener
in
opener
.
openers
.
itervalues
():
names
=
fs_opener
.
names
desc
=
getattr
(
fs_opener
,
'desc'
,
''
)
desc
=
getattr
(
fs_opener
,
'desc'
,
''
)
opener_table
.
append
((
names
,
desc
))
opener_table
.
sort
(
key
=
lambda
r
:
r
[
0
])
def
wrap_line
(
text
):
lines
=
text
.
split
(
'
\n
'
)
lines
=
text
.
split
(
'
\n
'
)
for
line
in
lines
:
words
=
[]
line_len
=
0
for
word
in
line
.
split
():
if
word
==
'*'
:
word
=
' *'
word
=
' *'
if
line_len
+
len
(
word
)
>
self
.
terminal_width
:
self
.
output
((
self
.
highlight_fsurls
(
' '
.
join
(
words
)),
'
\n
'
))
self
.
output
((
self
.
highlight_fsurls
(
' '
.
join
(
words
)),
'
\n
'
))
del
words
[:]
line_len
=
0
words
.
append
(
word
)
...
...
@@ -295,73 +295,73 @@ class Command(object):
if
words
:
self
.
output
(
self
.
highlight_fsurls
(
' '
.
join
(
words
)))
self
.
output
(
'
\n
'
)
for
names
,
desc
in
opener_table
:
self
.
output
((
'-'
*
self
.
terminal_width
,
'
\n
'
))
for
names
,
desc
in
opener_table
:
self
.
output
((
'-'
*
self
.
terminal_width
,
'
\n
'
))
proto
=
', '
.
join
([
n
+
'://'
for
n
in
names
])
self
.
output
((
self
.
wrap_dirname
(
'[
%
s]'
%
proto
),
'
\n\n
'
))
self
.
output
((
self
.
wrap_dirname
(
'[
%
s]'
%
proto
),
'
\n\n
'
))
if
not
desc
.
strip
():
desc
=
"No information available"
desc
=
"No information available"
wrap_line
(
desc
)
self
.
output
(
'
\n
'
)
def
run
(
self
):
def
run
(
self
):
parser
=
self
.
get_optparse
()
options
,
args
=
parser
.
parse_args
()
self
.
options
=
options
if
options
.
listopeners
:
self
.
list_openers
()
return
0
ilocals
=
{}
if
options
.
fs
:
if
options
.
fs
:
for
import_opener
in
options
.
fs
:
module_name
,
opener_class
=
import_opener
.
rsplit
(
'.'
,
1
)
module_name
,
opener_class
=
import_opener
.
rsplit
(
'.'
,
1
)
try
:
opener_module
=
__import__
(
module_name
,
globals
(),
ilocals
,
[
opener_class
],
-
1
)
opener_module
=
__import__
(
module_name
,
globals
(),
ilocals
,
[
opener_class
],
-
1
)
except
ImportError
:
self
.
error
(
"Unable to import opener
%
s
\n
"
%
import_opener
)
return
0
new_opener
=
getattr
(
opener_module
,
opener_class
)
try
:
new_opener
=
getattr
(
opener_module
,
opener_class
)
try
:
if
not
issubclass
(
new_opener
,
Opener
):
self
.
error
(
'
%
s is not an fs.opener.Opener
\n
'
%
import_opener
)
return
0
except
TypeError
:
self
.
error
(
'
%
s is not an opener class
\n
'
%
import_opener
)
return
0
if
options
.
verbose
:
self
.
output
(
'Imported opener
%
s
\n
'
%
import_opener
)
opener
.
add
(
new_opener
)
args
=
[
unicode
(
arg
,
sys
.
getfilesystemencoding
())
for
arg
in
args
]
self
.
verbose
=
options
.
verbose
self
.
verbose
=
options
.
verbose
try
:
return
self
.
do_run
(
options
,
args
)
or
0
except
FSError
,
e
:
self
.
error
(
self
.
wrap_error
(
unicode
(
e
))
+
'
\n
'
)
if
options
.
debug
:
raise
return
1
return
1
except
KeyboardInterrupt
:
if
self
.
is_terminal
():
self
.
output
(
"
\n
"
)
return
0
return
0
except
SystemExit
:
return
0
except
Exception
,
e
:
return
0
except
Exception
,
e
:
self
.
error
(
self
.
wrap_error
(
'Error -
%
s
\n
'
%
unicode
(
e
)))
if
options
.
debug
:
raise
return
1
if
__name__
==
"__main__"
:
command
=
Command
()
sys
.
exit
(
command
.
run
())
\ No newline at end of file
fs/errors.py
View file @
bc30657b
...
...
@@ -24,7 +24,7 @@ __all__ = ['FSError',
'NoMetaError'
,
'NoPathURLError'
,
'ResourceNotFoundError'
,
'ResourceInvalidError'
,
'ResourceInvalidError'
,
'DestinationExistsError'
,
'DirectoryNotEmptyError'
,
'ParentDirectoryMissingError'
,
...
...
@@ -42,6 +42,10 @@ from fs.path import *
from
fs.local_functools
import
wraps
class
InvalidPathError
(
Exception
):
pass
class
FSError
(
Exception
):
"""Base exception class for the FS module."""
default_message
=
"Unspecified error"
...
...
@@ -81,7 +85,7 @@ class PathError(FSError):
def
__init__
(
self
,
path
=
""
,
**
kwds
):
self
.
path
=
path
super
(
PathError
,
self
)
.
__init__
(
**
kwds
)
class
OperationFailedError
(
FSError
):
"""Base exception class for errors associated with a specific operation."""
...
...
@@ -184,6 +188,7 @@ class ResourceLockedError(ResourceError):
"""Exception raised when a resource can't be used because it is locked."""
default_message
=
"Resource is locked:
%(path)
s"
class
NoMMapError
(
ResourceError
):
"""Exception raise when getmmap fails to create a mmap"""
default_message
=
"Can't get mmap for
%(path)
s"
...
...
fs/osfs/__init__.py
View file @
bc30657b
...
...
@@ -32,13 +32,15 @@ from fs.osfs.watch import OSFSWatchMixin
@convert_os_errors
def
_os_stat
(
path
):
"""Replacement for os.stat that raises FSError subclasses."""
"""Replacement for os.stat that raises FSError subclasses."""
return
os
.
stat
(
path
)
@convert_os_errors
def
_os_mkdir
(
name
,
mode
=
0777
):
"""Replacement for os.mkdir that raises FSError subclasses."""
return
os
.
mkdir
(
name
,
mode
)
return
os
.
mkdir
(
name
,
mode
)
@convert_os_errors
def
_os_makedirs
(
name
,
mode
=
0777
):
...
...
@@ -64,7 +66,6 @@ def _os_makedirs(name, mode=0777):
if
tail
==
os
.
curdir
:
return
os
.
mkdir
(
name
,
mode
)
class
OSFS
(
OSFSXAttrMixin
,
OSFSWatchMixin
,
FS
):
...
...
@@ -74,7 +75,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
filesystem of the OS. Most of its methods simply defer to the matching
methods in the os and os.path modules.
"""
_meta
=
{
'thread_safe'
:
True
,
'network'
:
False
,
'virtual'
:
False
,
...
...
@@ -90,7 +91,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
"""
Creates an FS object that represents the OS Filesystem under a given root path
:param root_path: The root OS path
:param root_path: The root OS path
:param thread_synchronize: If True, this object will be thread-safe by use of a threading.Lock object
:param encoding: The encoding method for path strings
:param create: If True, then root_path will be created if it doesn't already exist
...
...
@@ -114,7 +115,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
if
root_path
.
startswith
(
"
\\\\
"
):
root_path
=
u"
\\\\
?
\\
UNC
\\
"
+
root_path
[
2
:]
else
:
root_path
=
u"
\\\\
?"
+
root_path
root_path
=
u"
\\\\
?"
+
root_path
# If it points at the root of a drive, it needs a trailing slash.
if
len
(
root_path
)
==
6
and
not
root_path
.
endswith
(
"
\\
"
):
root_path
=
root_path
+
"
\\
"
...
...
@@ -126,9 +127,9 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
pass
if
not
os
.
path
.
exists
(
root_path
):
raise
ResourceNotFoundError
(
root_path
,
msg
=
"Root directory does not exist:
%(path)
s"
)
raise
ResourceNotFoundError
(
root_path
,
msg
=
"Root directory does not exist:
%(path)
s"
)
if
not
os
.
path
.
isdir
(
root_path
):
raise
ResourceInvalidError
(
root_path
,
msg
=
"Root path is not a directory:
%(path)
s"
)
raise
ResourceInvalidError
(
root_path
,
msg
=
"Root path is not a directory:
%(path)
s"
)
self
.
root_path
=
root_path
self
.
dir_mode
=
dir_mode
...
...
@@ -137,20 +138,20 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
def
__repr__
(
self
):
return
"<OSFS:
%
r>"
%
self
.
root_path
def
__unicode__
(
self
):
return
u"<OSFS:
%
s>"
%
self
.
root_path
def
_decode_path
(
self
,
p
):
if
isinstance
(
p
,
unicode
):
return
p
return
p
.
decode
(
self
.
encoding
,
'replace'
)
return
p
return
p
.
decode
(
self
.
encoding
,
'replace'
)
def
getsyspath
(
self
,
path
,
allow_none
=
False
):
path
=
relpath
(
normpath
(
path
))
.
replace
(
"/"
,
os
.
sep
)
path
=
relpath
(
normpath
(
path
))
.
replace
(
"/"
,
os
.
sep
)
path
=
os
.
path
.
join
(
self
.
root_path
,
path
)
if
not
path
.
startswith
(
self
.
root_path
):
raise
PathError
(
path
,
msg
=
"OSFS given path outside root:
%(path)
s"
)
raise
PathError
(
path
,
msg
=
"OSFS given path outside root:
%(path)
s"
)
path
=
self
.
_decode_path
(
path
)
return
path
...
...
@@ -159,11 +160,11 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
This basically the reverse of getsyspath(). If the path does not
refer to a location within this filesystem, ValueError is raised.
:param path: a system path
:returns: a path within this FS object
:rtype: string
"""
path
=
os
.
path
.
normpath
(
os
.
path
.
abspath
(
path
))
path
=
self
.
_decode_path
(
path
)
...
...
@@ -173,11 +174,11 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
if
not
prefix
.
endswith
(
os
.
path
.
sep
):
prefix
+=
os
.
path
.
sep
if
not
os
.
path
.
normcase
(
path
)
.
startswith
(
prefix
):
raise
ValueError
(
"path not within this FS:
%
s (
%
s)"
%
(
os
.
path
.
normcase
(
path
),
prefix
))
raise
ValueError
(
"path not within this FS:
%
s (
%
s)"
%
(
os
.
path
.
normcase
(
path
),
prefix
))
return
normpath
(
path
[
len
(
self
.
root_path
):])
def
getmeta
(
self
,
meta_name
,
default
=
NoDefaultMeta
):
if
meta_name
==
'free_space'
:
if
platform
.
system
()
==
'Windows'
:
try
:
...
...
@@ -204,11 +205,11 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
else
:
stat
=
os
.
statvfs
(
self
.
root_path
)
return
stat
.
f_blocks
*
stat
.
f_bsize
return
super
(
OSFS
,
self
)
.
getmeta
(
meta_name
,
default
)
@convert_os_errors
def
open
(
self
,
path
,
mode
=
"r"
,
**
kwargs
):
def
open
(
self
,
path
,
mode
=
"r"
,
**
kwargs
):
mode
=
''
.
join
(
c
for
c
in
mode
if
c
in
'rwabt+'
)
sys_path
=
self
.
getsyspath
(
path
)
try
:
...
...
@@ -221,25 +222,25 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
raise
@convert_os_errors
def
setcontents
(
self
,
path
,
contents
,
chunk_size
=
64
*
1024
):
return
super
(
OSFS
,
self
)
.
setcontents
(
path
,
contents
,
chunk_size
)
def
setcontents
(
self
,
path
,
contents
,
chunk_size
=
64
*
1024
):
return
super
(
OSFS
,
self
)
.
setcontents
(
path
,
contents
,
chunk_size
)
@convert_os_errors
def
exists
(
self
,
path
):
def
exists
(
self
,
path
):
return
_exists
(
self
.
getsyspath
(
path
))
@convert_os_errors
def
isdir
(
self
,
path
):
def
isdir
(
self
,
path
):
return
_isdir
(
self
.
getsyspath
(
path
))
@convert_os_errors
def
isfile
(
self
,
path
):
def
isfile
(
self
,
path
):
return
_isfile
(
self
.
getsyspath
(
path
))
@convert_os_errors
def
listdir
(
self
,
path
=
"./"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
dirs_only
=
False
,
files_only
=
False
):
_decode_path
=
self
.
_decode_path
paths
=
[
_decode_path
(
p
)
for
p
in
os
.
listdir
(
self
.
getsyspath
(
path
))]
_decode_path
=
self
.
_decode_path
paths
=
[
_decode_path
(
p
)
for
p
in
os
.
listdir
(
self
.
getsyspath
(
path
))]
return
self
.
_listdir_helper
(
path
,
paths
,
wildcard
,
full
,
absolute
,
dirs_only
,
files_only
)
@convert_os_errors
...
...
@@ -252,16 +253,16 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
_os_mkdir
(
sys_path
,
self
.
dir_mode
)
except
DestinationExistsError
:
if
self
.
isfile
(
path
):
raise
ResourceInvalidError
(
path
,
msg
=
"Cannot create directory, there's already a file of that name:
%(path)
s"
)
raise
ResourceInvalidError
(
path
,
msg
=
"Cannot create directory, there's already a file of that name:
%(path)
s"
)
if
not
allow_recreate
:
raise
DestinationExistsError
(
path
,
msg
=
"Can not create a directory that already exists (try allow_recreate=True):
%(path)
s"
)
raise
DestinationExistsError
(
path
,
msg
=
"Can not create a directory that already exists (try allow_recreate=True):
%(path)
s"
)
except
ResourceNotFoundError
:
raise
ParentDirectoryMissingError
(
path
)
@convert_os_errors
def
remove
(
self
,
path
):
sys_path
=
self
.
getsyspath
(
path
)
try
:
try
:
os
.
remove
(
sys_path
)
except
OSError
,
e
:
if
e
.
errno
==
errno
.
EACCES
and
sys
.
platform
==
"win32"
:
...
...
@@ -275,7 +276,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
raise
@convert_os_errors
def
removedir
(
self
,
path
,
recursive
=
False
,
force
=
False
):
def
removedir
(
self
,
path
,
recursive
=
False
,
force
=
False
):
sys_path
=
self
.
getsyspath
(
path
)
if
force
:
for
path2
in
self
.
listdir
(
path
,
absolute
=
True
,
files_only
=
True
):
...
...
@@ -297,7 +298,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
if
recursive
:
try
:
if
dirname
(
path
)
not
in
(
''
,
'/'
):
self
.
removedir
(
dirname
(
path
),
recursive
=
True
)
self
.
removedir
(
dirname
(
path
),
recursive
=
True
)
except
DirectoryNotEmptyError
:
pass
...
...
@@ -320,9 +321,9 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
if
e
.
errno
==
errno
.
ENOENT
:
if
not
os
.
path
.
exists
(
os
.
path
.
dirname
(
path_dst
)):
raise
ParentDirectoryMissingError
(
dst
)
raise
def
_stat
(
self
,
path
):
raise
def
_stat
(
self
,
path
):
"""Stat the given path, normalising error codes."""
sys_path
=
self
.
getsyspath
(
path
)
try
:
...
...
@@ -350,5 +351,3 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
@convert_os_errors
def
getsize
(
self
,
path
):
return
self
.
_stat
(
path
)
.
st_size
fs/path.py
View file @
bc30657b
from
__future__
import
unicode_literals
"""
fs.path
=======
...
...
@@ -11,22 +13,23 @@ by forward slashes and with an optional leading slash).
"""
import
re
import
os
_requires_normalization
=
re
.
compile
(
r'/\.\.|\./|\.|//'
)
.
search
_requires_normalization
=
re
.
compile
(
r'/\.\.|\./|\.|//|\\'
)
.
search
def
normpath
(
path
):
"""Normalizes a path to be in the format expected by FS objects.
This function remove any leading or trailing slashes, collapses
duplicate slashes,
replaces backward with forward slashes, and generally
tries very hard to return a new path string
the canonical FS format.
duplicate slashes,
and generally tries very hard to return a new path
in
the canonical FS format.
If the path is invalid, ValueError will be raised.
:param path: path to normalize
:returns: a valid FS path
>>> normpath(r"foo
\\
bar
\\
baz")
'foo/bar/baz'
>>> normpath("/foo//bar/frob/../baz")
'/foo/bar/baz'
...
...
@@ -40,15 +43,13 @@ def normpath(path):
if
path
in
(
''
,
'/'
):
return
path
path
=
path
.
replace
(
'
\\
'
,
'/'
)
# An early out if there is no need to normalize this path
if
not
_requires_normalization
(
path
):
return
path
.
rstrip
(
'/'
)
components
=
[]
append
=
components
.
append
special
=
(
'..'
,
'.'
,
''
)
.
__contains__
special
=
(
'..'
,
'.'
,
''
)
.
__contains__
try
:
for
component
in
path
.
split
(
'/'
):
if
special
(
component
):
...
...
@@ -66,12 +67,27 @@ def normpath(path):
return
'/'
.
join
(
components
)
if
os
.
sep
!=
'/'
:
def
ospath
(
path
):
"""Replace path separators in an OS path if required"""
return
path
.
replace
(
os
.
sep
,
'/'
)
else
:
def
ospath
(
path
):
"""Replace path separators in an OS path if required"""
return
path
def
normospath
(
path
):
"""Normalizes a path with os separators"""
return
normpath
(
ospath
)
def
iteratepath
(
path
,
numsplits
=
None
):
"""Iterate over the individual components of a path.
:param path: Path to iterate over
:numsplits: Maximum number of splits
"""
path
=
relpath
(
normpath
(
path
))
if
not
path
:
...
...
@@ -84,39 +100,40 @@ def iteratepath(path, numsplits=None):
def
recursepath
(
path
,
reverse
=
False
):
"""Returns intermediate paths from the root to the given path
:param reverse: reverses the order of the paths
>>> recursepath('a/b/c')
['/', u'/a', u'/a/b', u'/a/b/c']
"""
"""
if
path
in
(
''
,
'/'
):
return
[
u'/'
]
path
=
abspath
(
normpath
(
path
))
+
'/'
path
=
abspath
(
normpath
(
path
))
+
'/'
paths
=
[
u'/'
]
find
=
path
.
find
append
=
paths
.
append
append
=
paths
.
append
pos
=
1
len_path
=
len
(
path
)
while
pos
<
len_path
:
pos
=
find
(
'/'
,
pos
)
len_path
=
len
(
path
)
while
pos
<
len_path
:
pos
=
find
(
'/'
,
pos
)
append
(
path
[:
pos
])
pos
+=
1
pos
+=
1
if
reverse
:
return
paths
[::
-
1
]
return
paths
return
paths
def
isabs
(
path
):
"""Return True if path is an absolute path."""
return
path
.
startswith
(
'/'
)
def
abspath
(
path
):
"""Convert the given path to an absolute path.
...
...
@@ -134,9 +151,9 @@ def relpath(path):
This is the inverse of abspath(), stripping a leading '/' from the
path if it is present.
:param path: Path to adjust
>>> relpath('/a/b')
'a/b'
...
...
@@ -146,7 +163,7 @@ def relpath(path):
def
pathjoin
(
*
paths
):
"""Joins any number of paths together, returning a new path string.
:param paths: Paths to join are given in positional arguments
>>> pathjoin('foo', 'bar', 'baz')
...
...
@@ -160,10 +177,10 @@ def pathjoin(*paths):
"""
absolute
=
False
relpaths
=
[]
relpaths
=
[]
for
p
in
paths
:
if
p
:
if
p
[
0
]
in
'
\\
/'
:
if
p
[
0
]
==
'
/'
:
del
relpaths
[:]
absolute
=
True
relpaths
.
append
(
p
)
...
...
@@ -173,24 +190,26 @@ def pathjoin(*paths):
path
=
abspath
(
path
)
return
path
def
pathcombine
(
path1
,
path2
):
"""Joins two paths together.
This is faster than `pathjoin`, but only works when the second path is relative,
and there are no backreferences in either path.
and there are no backreferences in either path.
>>> pathcombine("foo/bar", "baz")
'foo/bar/baz'
"""
'foo/bar/baz'
"""
return
"
%
s/
%
s"
%
(
path1
.
rstrip
(
'/'
),
path2
.
lstrip
(
'/'
))
def
join
(
*
paths
):
"""Joins any number of paths together, returning a new path string.
This is a simple alias for the ``pathjoin`` function, allowing it to be
used as ``fs.path.join`` in direct correspondence with ``os.path.join``.
:param paths: Paths to join are given in positional arguments
"""
return
pathjoin
(
*
paths
)
...
...
@@ -201,7 +220,7 @@ def pathsplit(path):
This function splits a path into a pair (head, tail) where 'tail' is the
last pathname component and 'head' is all preceding components.
:param path: Path to split
>>> pathsplit("foo/bar")
...
...
@@ -209,7 +228,7 @@ def pathsplit(path):
>>> pathsplit("foo/bar/baz")
('foo/bar', 'baz')
>>> pathsplit("/foo/bar/baz")
('/foo/bar', 'baz')
...
...
@@ -234,17 +253,17 @@ def split(path):
def
splitext
(
path
):
"""Splits the extension from the path, and returns the path (up to the last
'.' and the extension).
:param path: A path to split
>>> splitext('baz.txt')
('baz', 'txt')
>>> splitext('foo/bar/baz.txt')
('foo/bar/baz', 'txt')
"""
parent_path
,
pathname
=
pathsplit
(
path
)
if
'.'
not
in
pathname
:
return
path
,
''
...
...
@@ -256,18 +275,18 @@ def splitext(path):
def
isdotfile
(
path
):
"""Detects if a path references a dot file, i.e. a resource who's name
starts with a '.'
:param path: Path to check
>>> isdotfile('.baz')
True
>>> isdotfile('foo/bar/baz')
True
>>> isdotfile('foo/bar.baz').
False
"""
return
basename
(
path
)
.
startswith
(
'.'
)
...
...
@@ -277,15 +296,15 @@ def dirname(path):
This is always equivalent to the 'head' component of the value returned
by pathsplit(path).
:param path: A FS path
>>> dirname('foo/bar/baz')
'foo/bar'
>>> dirname('/foo/bar')
'/foo'
>>> dirname('/foo')
'/'
...
...
@@ -298,15 +317,15 @@ def basename(path):
This is always equivalent to the 'tail' component of the value returned
by pathsplit(path).
:param path: A FS path
>>> basename('foo/bar/baz')
'baz'
>>> basename('foo/bar')
'bar'
>>> basename('foo/bar/')
''
...
...
@@ -316,7 +335,7 @@ def basename(path):
def
issamedir
(
path1
,
path2
):
"""Return true if two paths reference a resource in the same directory.
:param path1: An FS path
:param path2: An FS path
...
...
@@ -332,15 +351,15 @@ def issamedir(path1, path2):
def
isbase
(
path1
,
path2
):
p1
=
forcedir
(
abspath
(
path1
))
p2
=
forcedir
(
abspath
(
path2
))
return
p1
==
p2
or
p1
.
startswith
(
p2
)
return
p1
==
p2
or
p1
.
startswith
(
p2
)
def
isprefix
(
path1
,
path2
):
"""Return true is path1 is a prefix of path2.
:param path1: An FS path
:param path2: An FS path
>>> isprefix("foo/bar", "foo/bar/spam.txt")
True
>>> isprefix("foo/bar/", "foo/bar")
...
...
@@ -365,7 +384,7 @@ def isprefix(path1, path2):
def
forcedir
(
path
):
"""Ensure the path ends with a trailing /
:param path: An FS path
>>> forcedir("foo/bar")
...
...
@@ -602,12 +621,12 @@ class PathMap(object):
_wild_chars
=
frozenset
(
'*?[]!{}'
)
def
iswildcard
(
path
):
"""Check if a path ends with a wildcard
>>> is_wildcard('foo/bar/baz.*')
True
>>> is_wildcard('foo/bar')
False
"""
assert
path
is
not
None
base_chars
=
frozenset
(
basename
(
path
))
...
...
fs/tests/test_path.py
View file @
bc30657b
...
...
@@ -14,7 +14,7 @@ class TestPathFunctions(unittest.TestCase):
"""Testcases for FS path functions."""
def
test_normpath
(
self
):
tests
=
[
(
"
\\
a
\\
b
\\
c"
,
"
/a/b/
c"
),
tests
=
[
(
"
\\
a
\\
b
\\
c"
,
"
\\
a
\\
b
\\
c"
),
(
"."
,
""
),
(
"./"
,
""
),
(
""
,
""
),
...
...
@@ -22,7 +22,7 @@ class TestPathFunctions(unittest.TestCase):
(
"a/b/c"
,
"a/b/c"
),
(
"a/b/../c/"
,
"a/c"
),
(
"/"
,
"/"
),
(
u"a/
\N{GREEK SMALL LETTER BETA}
\\
c"
,
u"a/
\N{GREEK SMALL LETTER BETA}
/c"
),
(
u"a/
\N{GREEK SMALL LETTER BETA}
/
c"
,
u"a/
\N{GREEK SMALL LETTER BETA}
/c"
),
]
for
path
,
result
in
tests
:
self
.
assertEqual
(
normpath
(
path
),
result
)
...
...
@@ -38,7 +38,7 @@ class TestPathFunctions(unittest.TestCase):
(
"a/b/c"
,
"../d"
,
"c"
,
"a/b/d/c"
),
(
"a/b/c"
,
"../d"
,
"/a"
,
"/a"
),
(
"aaa"
,
"bbb/ccc"
,
"aaa/bbb/ccc"
),
(
"aaa"
,
"bbb
\
ccc"
,
"aaa/bbb/
ccc"
),
(
"aaa"
,
"bbb
\
\
ccc"
,
"aaa/bbb
\\
ccc"
),
(
"aaa"
,
"bbb"
,
"ccc"
,
"/aaa"
,
"eee"
,
"/aaa/eee"
),
(
"a/b"
,
"./d"
,
"e"
,
"a/b/d/e"
),
(
"/"
,
"/"
,
"/"
),
...
...
@@ -104,7 +104,7 @@ class TestPathFunctions(unittest.TestCase):
self
.
assertEquals
(
recursepath
(
"/hello/world/"
,
reverse
=
True
),[
"/hello/world"
,
"/hello"
,
"/"
])
self
.
assertEquals
(
recursepath
(
"hello"
,
reverse
=
True
),[
"/hello"
,
"/"
])
self
.
assertEquals
(
recursepath
(
""
,
reverse
=
True
),[
"/"
])
def
test_isdotfile
(
self
):
for
path
in
[
'.foo'
,
'.svn'
,
...
...
@@ -112,14 +112,14 @@ class TestPathFunctions(unittest.TestCase):
'foo/bar/.svn'
,
'/foo/.bar'
]:
self
.
assert_
(
isdotfile
(
path
))
for
path
in
[
'asfoo'
,
'df.svn'
,
'foo/er.svn'
,
'foo/bar/test.txt'
,
'/foo/bar'
]:
self
.
assertFalse
(
isdotfile
(
path
))
def
test_dirname
(
self
):
tests
=
[(
'foo'
,
''
),
(
'foo/bar'
,
'foo'
),
...
...
@@ -129,7 +129,7 @@ class TestPathFunctions(unittest.TestCase):
(
'/'
,
'/'
)]
for
path
,
test_dirname
in
tests
:
self
.
assertEqual
(
dirname
(
path
),
test_dirname
)
def
test_basename
(
self
):
tests
=
[(
'foo'
,
'foo'
),
(
'foo/bar'
,
'bar'
),
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment