Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
pyfs
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
pyfs
Commits
aece1874
Commit
aece1874
authored
Jul 26, 2008
by
willmcgugan
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Refinements to the memoryfs class, and added a 'walk' generator to the base class
parent
36b626d8
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
236 additions
and
171 deletions
+236
-171
fs/browsewin.py
+8
-8
fs/fs.py
+34
-10
fs/memoryfs.py
+194
-153
No files found.
fs/browsewin.py
View file @
aece1874
...
...
@@ -9,7 +9,7 @@ import fs
class
BrowseFrame
(
wx
.
Frame
):
def
__init__
(
self
,
fs
):
wx
.
Frame
.
__init__
(
self
,
None
)
self
.
fs
=
fs
...
...
@@ -44,7 +44,7 @@ class BrowseFrame(wx.Frame):
if
not
self
.
fs
.
isdir
(
path
):
return
if
item_data
[
'expanded'
]:
if
item_data
[
'expanded'
]:
return
paths
=
[(
self
.
fs
.
isdir
(
p
),
p
)
for
p
in
self
.
fs
.
listdir
(
path
,
absolute
=
True
)]
...
...
@@ -55,16 +55,16 @@ class BrowseFrame(wx.Frame):
return
paths
.
sort
(
key
=
lambda
p
:(
not
p
[
0
],
p
[
1
]
.
lower
()))
for
is_dir
,
new_path
in
paths
:
name
=
fs
.
pathsplit
(
new_path
)[
-
1
]
if
not
is_dir
and
name
.
endswith
(
'.txt'
):
txt
=
self
.
fs
.
open
(
new_path
)
.
read
(
50
)
txt
=
self
.
fs
.
open
(
new_path
)
.
read
line
()[:
50
]
.
rstrip
(
)
name
+=
" - "
+
txt
new_item
=
self
.
tree
.
AppendItem
(
item_id
,
name
,
data
=
wx
.
TreeItemData
({
'path'
:
new_path
,
'expanded'
:
False
}))
if
is_dir
:
...
...
fs/fs.py
View file @
aece1874
...
...
@@ -13,7 +13,8 @@ error_msgs = {
"LISTDIR_FAILED"
:
"Unable to get directory listing:
%(path)
s"
,
"NO_SYS_PATH"
:
"No mapping to OS filesytem:
%(path)
s,"
,
"DIR_EXISTS"
:
"Directory exists (try allow_recreate=True):
%(path)
s"
,
"OPEN_FAILED"
:
"Unable to open file:
%(path)
s"
"OPEN_FAILED"
:
"Unable to open file:
%(path)
s"
,
"FILE_LOCKED"
:
"File is locked:
%(path)
s"
,
}
error_codes
=
error_msgs
.
keys
()
...
...
@@ -29,9 +30,9 @@ class FSError(Exception):
def
__str__
(
self
):
msg
=
self
.
msg
%
dict
((
k
,
str
(
v
))
for
k
,
v
in
self
.
__dict__
.
iteritems
())
msg
=
self
.
msg
%
dict
((
k
,
str
(
v
))
for
k
,
v
in
self
.
__dict__
.
iteritems
())
return
'
%
s
%
s'
%
(
self
.
code
,
msg
)
return
'
%
s
.
%
s'
%
(
self
.
code
,
msg
)
class
NullFile
:
...
...
@@ -224,17 +225,40 @@ class FS(object):
elif
absolute
:
paths
=
[
self
.
abspath
(
pathjoin
(
path
,
p
))
for
p
in
paths
]
return
paths
def
walk_files
(
self
,
path
=
"/"
,
wildcard
=
None
,
dir_wildcard
=
None
):
dirs
=
[
path
]
files
=
[]
while
dirs
:
path
=
dirs
.
pop
()
for
path
in
self
.
listdir
(
path
,
full
=
True
):
if
self
.
isdir
(
path
):
if
dir_wildcard
is
not
None
:
if
fnmatch
.
fnmatch
(
path
,
dir_wilcard
):
dirs
.
append
(
path
)
else
:
dirs
.
append
(
path
)
else
:
if
wildcard
is
not
None
:
if
fnmatch
.
fnmatch
(
path
,
wildcard
):
yield
path
else
:
yield
path
class
SubFS
(
FS
):
def
__init__
(
self
,
parent
,
sub_dir
):
self
.
parent
=
parent
self
.
sub_dir
=
parent
.
abspath
(
sub_dir
)
#print "sub_dir", self.sub_dir
def
__str__
(
self
):
return
"<SubFS
\"
%
s
\"
of
%
s>"
%
(
self
.
sub_dir
,
self
.
parent
)
...
...
@@ -242,7 +266,6 @@ class SubFS(FS):
def
_delegate
(
self
,
dirname
):
delegate_path
=
pathjoin
(
self
.
sub_dir
,
resolvepath
(
makerelative
(
dirname
)))
#print "delegate path", delegate_path
return
delegate_path
def
getsyspath
(
self
,
pathname
):
...
...
@@ -274,7 +297,6 @@ class OSFS(FS):
raise
FSError
(
"NO_DIR"
,
expanded_path
,
msg
=
"Root path is not a directory:
%(path)
s"
)
self
.
root_path
=
normpath
(
os
.
path
.
abspath
(
expanded_path
))
#print "Root path", self.root_path
def
__str__
(
self
):
return
"<OSFS
\"
%
s
\"
>"
%
self
.
root_path
...
...
@@ -320,7 +342,7 @@ class OSFS(FS):
try
:
paths
=
os
.
listdir
(
self
.
getsyspath
(
path
))
except
IOError
,
e
:
except
(
OSError
,
IOError
)
,
e
:
raise
FSError
(
"LISTDIR_FAILED"
,
path
,
details
=
e
,
msg
=
"Unable to get directory listing:
%(path)
s - (
%(details)
s)"
)
return
self
.
_listdir_helper
(
path
,
paths
,
wildcard
,
full
,
absolute
,
hidden
,
dirs_only
,
files_only
)
...
...
@@ -470,9 +492,11 @@ class MountFS(FS):
if
__name__
==
"__main__"
:
osfs
=
OSFS
(
"~/"
)
osfs
=
OSFS
(
"~/
projects
"
)
print
osfs
#print osfs
for
filename
in
osfs
.
walk_files
(
"/prettycharts"
,
"*.pov"
):
print
filename
import
browsewin
browsewin
.
browse
(
osfs
)
...
...
fs/memoryfs.py
View file @
aece1874
#!/usr/bin/env python
import
os
from
fs
import
FS
,
pathsplit
,
_iteratepath
,
FSError
,
print_fs
try
:
...
...
@@ -7,199 +8,209 @@ try:
except
ImportError
:
from
StringIO
import
StringIO
def
_check_mode
(
mode
,
mode_chars
):
for
c
in
mode_chars
:
if
c
not
in
mode
:
return
False
return
True
class
MemoryFile
(
object
):
def
__init__
(
self
,
path
,
memory_fs
,
value
,
mode
):
self
.
path
=
path
self
.
memory_fs
=
memory_fs
self
.
mode
=
mode
def
check_mode
(
mode_chars
):
for
c
in
mode_chars
:
if
c
not
in
mode
:
return
False
return
True
self
.
mem_file
=
None
if
check_mode
(
'w'
):
if
check_mode
(
'+'
):
if
_check_mode
(
mode
,
'wa'
):
self
.
mem_file
=
StringIO
()
self
.
mem_file
.
write
(
value
)
elif
_check_mode
(
mode
,
'w'
):
self
.
mem_file
=
StringIO
()
elif
_check_mode
(
mode
,
'ra'
):
self
.
mem_file
=
StringIO
()
self
.
mem_file
.
write
(
value
)
elif
_check_mode
(
mode
,
'r'
):
self
.
mem_file
=
StringIO
(
value
)
else
:
if
value
is
not
None
:
self
.
mem_file
=
StringIO
(
value
)
else
:
self
.
mem_file
=
StringIO
()
elif
check_mode
(
'r'
):
if
check_mode
(
'+'
):
self
.
mem_file
=
StringIO
(
value
)
else
:
self
.
mem_file
=
StringIO
(
value
)
if
check_mode
(
'a'
):
self
.
mem_file
.
seek
(
0
,
os
.
SEEK_END
)
assert
self
.
mem_file
is
not
None
,
"self.mem_file should have a value"
self
.
closed
=
False
def
__del__
(
self
):
if
not
self
.
closed
:
self
.
close
()
def
flush
(
self
):
pass
def
__iter__
(
self
):
return
iter
(
self
.
mem_file
)
def
next
(
self
):
return
self
.
mem_file
.
next
()
def
readline
(
self
,
*
args
,
**
kwargs
):
return
self
.
mem_file
.
readline
(
*
args
,
**
kwargs
)
def
close
(
self
):
value
=
self
.
mem_file
.
getvalue
()
if
'w'
in
self
.
mode
:
def
close
(
self
):
if
not
self
.
closed
:
value
=
self
.
mem_file
.
getvalue
()
self
.
memory_fs
.
_on_close_memory_file
(
self
.
path
,
value
)
self
.
mem_file
.
close
()
self
.
closed
=
True
self
.
mem_file
.
close
()
self
.
closed
=
True
def
read
(
self
,
size
=
None
):
if
size
is
None
:
size
=
-
1
return
self
.
mem_file
.
read
(
size
)
def
seek
(
self
,
*
args
,
**
kwargs
):
return
self
.
mem_file
.
seek
(
*
args
,
**
kwargs
)
def
tell
(
self
):
return
self
.
mem_file
.
tell
()
def
truncate
(
self
,
*
args
,
**
kwargs
):
return
self
.
mem_file
.
truncate
(
*
args
,
**
kwargs
)
def
write
(
self
,
data
):
return
self
.
mem_file
.
write
(
data
)
def
writelines
(
self
,
*
args
,
**
kwargs
):
return
self
.
mem_file
.
writelines
(
*
args
,
**
kwargs
)
class
MemoryFS
(
FS
):
class
DirEntry
(
object
):
class
MemoryFS
(
FS
):
class
DirEntry
(
object
):
def
__init__
(
self
,
type
,
name
,
contents
=
None
):
self
.
type
=
type
self
.
name
=
name
self
.
name
=
name
self
.
permissions
=
None
if
contents
is
None
and
type
==
"dir"
:
contents
=
{}
self
.
contents
=
contents
self
.
num_reading
=
0
self
.
num_writing
=
0
self
.
data
=
None
self
.
locks
=
0
def
lock
(
self
):
return
self
.
locks
+=
1
def
unlock
(
self
):
return
self
.
locks
-=
1
assert
self
.
locks
>=
0
,
"Lock / Unlock mismatch!"
def
desc_contents
(
self
):
if
self
.
isfile
():
return
"<file
%
s>"
%
self
.
name
elif
self
.
isdir
():
return
"<dir
%
s>"
%
""
.
join
(
"
%
s:
%
s"
%
(
k
,
v
.
desc_contents
())
for
k
,
v
in
self
.
contents
.
iteritems
())
def
isdir
(
self
):
return
self
.
type
==
"dir"
def
isfile
(
self
):
return
self
.
type
==
"file"
def
islocked
(
self
):
return
self
.
locks
>
0
def
__str__
(
self
):
return
"
%
s:
%
s"
%
(
self
.
name
,
self
.
desc_contents
())
class
FileEntry
(
object
):
def
__init__
(
self
):
self
.
memory_file
=
None
self
.
value
=
""
def
_make_dir_entry
(
self
,
*
args
,
**
kwargs
):
return
self
.
dir_entry_factory
(
*
args
,
**
kwargs
)
def
__init__
(
self
):
self
.
dir_entry_factory
=
MemoryFS
.
DirEntry
self
.
root
=
self
.
_make_dir_entry
(
'dir'
,
'root'
)
self
.
root
=
self
.
_make_dir_entry
(
'dir'
,
'root'
)
def
_get_dir_entry
(
self
,
dirpath
):
current_dir
=
self
.
root
for
path_component
in
_iteratepath
(
dirpath
):
dir_entry
=
current_dir
.
contents
.
get
(
path_component
,
None
)
if
dir_entry
is
None
:
return
None
return
None
current_dir
=
dir_entry
return
current_dir
def
getsyspath
(
self
,
pathname
):
raise
FSError
(
"NO_SYS_PATH"
,
pathname
,
msg
=
"This file-system has no syspath"
)
def
isdir
(
self
,
path
):
dir_item
=
self
.
_get_dir_entry
(
self
.
_resolve
(
path
))
if
dir_item
is
None
:
if
dir_item
is
None
:
return
False
return
dir_item
.
isdir
()
def
isfile
(
self
,
path
):
dir_item
=
self
.
_get_dir_entry
(
self
.
_resolve
(
path
))
if
dir_item
is
None
:
return
False
return
dir_item
.
isfile
()
def
exists
(
self
,
path
):
return
self
.
_getdir
(
path
)
is
not
None
def
mkdir
(
self
,
dirname
,
mode
=
0777
,
recursive
=
False
,
allow_recreate
=
False
):
fullpath
=
dirname
dirpath
,
dirname
=
pathsplit
(
dirname
)
if
recursive
:
dirpath
,
dirname
=
pathsplit
(
dirname
)
if
recursive
:
parent_dir
=
self
.
_get_dir_entry
(
dirpath
)
if
parent_dir
is
not
None
:
if
parent_dir
.
isfile
():
if
parent_dir
.
isfile
():
raise
FSError
(
"NO_DIR"
,
dirname
,
msg
=
"Can not create a directory, because path references a file:
%(path)
s"
)
else
:
else
:
if
not
allow_recreate
:
if
dirname
in
parent_dir
.
contents
:
raise
FSError
(
"NO_DIR"
,
dirname
,
msg
=
"Can not create a directory that already exists (try allow_recreate=True):
%(path)
s"
)
current_dir
=
self
.
root
for
path_component
in
_iteratepath
(
dirpath
)[:
-
1
]:
dir_item
=
current_dir
.
contents
.
get
(
path_component
,
None
)
...
...
@@ -208,8 +219,8 @@ class MemoryFS(FS):
if
not
dir_item
.
isdir
():
raise
FSError
(
"NO_DIR"
,
dirname
,
msg
=
"Can not create a directory, because path references a file:
%(path)
s"
)
current_dir
=
dir_item
.
contents
current_dir
=
self
.
root
current_dir
=
self
.
root
for
path_component
in
_iteratepath
(
dirpath
):
dir_item
=
current_dir
.
contents
.
get
(
path_component
,
None
)
if
dir_item
is
None
:
...
...
@@ -218,114 +229,145 @@ class MemoryFS(FS):
current_dir
=
new_dir
else
:
current_dir
=
dir_item
parent_dir
=
current_dir
else
:
parent_dir
=
self
.
_get_dir_entry
(
dirpath
)
parent_dir
=
current_dir
else
:
parent_dir
=
self
.
_get_dir_entry
(
dirpath
)
if
parent_dir
is
None
:
raise
FSError
(
"NO_DIR"
,
dirname
,
msg
=
"Could not make dir, as parent dir does not exist:
%(path)
s"
)
dir_item
=
parent_dir
.
contents
.
get
(
dirname
,
None
)
if
dir_item
is
not
None
:
dir_item
=
parent_dir
.
contents
.
get
(
dirname
,
None
)
if
dir_item
is
not
None
:
if
dir_item
.
isdir
():
if
not
allow_recreate
:
raise
FSError
(
"DIR_EXISTS"
,
dirname
)
else
:
raise
FSError
(
"NO_DIR"
,
dirname
,
msg
=
"Can not create a directory, because path references a file:
%(path)
s"
)
if
dir_item
is
None
:
parent_dir
.
contents
[
dirname
]
=
self
.
_make_dir_entry
(
"dir"
,
dirname
)
return
self
def
_lock_dir
(
self
,
dir
path
):
dir_entry
=
self
.
_get_dir_entry
(
dir
path
)
def
_lock_dir
_entry
(
self
,
path
):
dir_entry
=
self
.
_get_dir_entry
(
path
)
dir_entry
.
lock
()
def
_unlock_dir
(
self
,
dir
path
):
dir_entry
=
self
.
_get_dir_entry
(
dir
path
)
def
_unlock_dir
_entry
(
self
,
path
):
dir_entry
=
self
.
_get_dir_entry
(
path
)
dir_entry
.
unlock
()
def
_is_dir_locked
(
self
,
dir
path
):
dir_entry
=
self
.
_get_dir_entry
(
dir
path
)
def
_is_dir_locked
(
self
,
path
):
dir_entry
=
self
.
_get_dir_entry
(
path
)
return
dir_entry
.
islocked
()
def
open
(
self
,
path
,
mode
=
"r"
,
**
kwargs
):
filepath
,
filename
=
pathsplit
(
path
)
dir_entry
=
self
.
_get_dir_entry
(
path
)
filepath
,
filename
=
pathsplit
(
path
)
parent_dir_entry
=
self
.
_get_dir_entry
(
filepath
)
if
parent_dir_entry
is
None
or
not
parent_dir_entry
.
isdir
():
raise
FSError
(
"NO_FILE"
,
path
)
if
'r'
in
mode
or
'a'
in
mode
:
if
filename
not
in
parent_dir_entry
.
contents
:
raise
FSError
(
"NO_FILE"
,
path
)
self
.
_lock_dir
(
filepath
)
file_dir_entry
=
parent_dir_entry
.
contents
[
filename
]
if
'a'
in
mode
and
file_dir_entry
.
islocked
():
raise
FSError
(
"FILE_LOCKED"
,
path
)
self
.
_lock_dir_entry
(
path
)
mem_file
=
MemoryFile
(
path
,
self
,
file_dir_entry
.
data
,
mode
)
file_dir_entry
.
num_reading
+=
1
return
mem_file
elif
'w'
in
mode
:
if
filename
not
in
parent_dir_entry
.
contents
:
if
filename
not
in
parent_dir_entry
.
contents
:
file_dir_entry
=
self
.
_make_dir_entry
(
"file"
,
filename
)
parent_dir_entry
.
contents
[
filename
]
=
file_dir_entry
else
:
file_dir_entry
=
parent_dir_entry
.
contents
[
fielname
]
file_dir_entry
=
parent_dir_entry
.
contents
[
filename
]
if
file_dir_entry
.
islocked
():
raise
FSError
(
"FILE_LOCKED"
,
path
)
self
.
_lock_dir_entry
(
path
)
mem_file
=
MemoryFile
(
path
,
self
,
None
,
mode
)
file_dir_entry
.
num_writing
=
1
return
mem_file
if
parent_dir_entry
is
None
:
raise
FSError
(
"NO_FILE"
,
path
)
def
_on_close_memory_file
(
self
,
path
,
value
):
filepath
,
filename
=
pathsplit
(
path
)
self
.
_unlock_dir
(
filepath
)
dir_entry
=
self
.
_get_dir_entry
(
path
)
dir_entry
=
self
.
_get_dir_entry
(
path
)
dir_entry
.
data
=
value
self
.
_unlock_dir_entry
(
path
)
def
listdir
(
self
,
path
=
"/"
,
wildcard
=
None
,
full
=
False
,
absolute
=
False
,
hidden
=
False
,
dirs_only
=
False
,
files_only
=
False
):
dir_entry
=
self
.
_get_dir_entry
(
path
)
paths
=
dir_entry
.
contents
.
keys
()
paths
=
dir_entry
.
contents
.
keys
()
return
self
.
_listdir_helper
(
path
,
paths
,
wildcard
,
full
,
absolute
,
hidden
,
dirs_only
,
files_only
)
def
ishidden
(
self
,
pathname
):
return
False
if
__name__
==
"__main__"
:
def
main
():
mem_fs
=
MemoryFS
()
mem_fs
.
mkdir
(
'test/test2'
,
recursive
=
True
)
mem_fs
.
mkdir
(
'test/A'
,
recursive
=
True
)
mem_fs
.
mkdir
(
'test/A/B'
,
recursive
=
True
)
mem_fs
.
open
(
"test/readme.txt"
,
'w'
)
.
write
(
"Hello, World!"
)
mem_fs
.
open
(
"test/readme.txt"
,
'wa'
)
.
write
(
"
\n
Second Line"
)
print
mem_fs
.
open
(
"test/readme.txt"
,
'r'
)
.
read
()
f1
=
mem_fs
.
open
(
"/test/readme.txt"
,
'r'
)
f2
=
mem_fs
.
open
(
"/test/readme.txt"
,
'r'
)
print
f1
.
read
(
10
)
print
f2
.
read
(
10
)
f1
.
close
()
f2
.
close
()
f3
=
mem_fs
.
open
(
"/test/readme.txt"
,
'w'
)
#print mem_fs.listdir('test')
#print mem_fs.isdir("test/test2")
#print mem_fs.root
print_fs
(
mem_fs
)
from
browsewin
import
browse
browse
(
mem_fs
)
\ No newline at end of file
browse
(
mem_fs
)
if
__name__
==
"__main__"
:
main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment