Commit 5686efd7 by willmcgugan

Work in progress for a new filesystem server

parent 0001fb0a
#!/usr/bin/env python
from fs.errors import FSError
from fs.opener import opener
from fs.path import pathsplit, abspath, isdotfile, iswildcard
from fs.commands.runner import Command
......
@@ -43,26 +43,27 @@ else:
    def ioctl_GWINSZ(fd):
        try:
            import fcntl, termios, struct, os
-            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
-                '1234'))
+            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except:
            return None
        return cr
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
-        import os
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
-    if not cr:
-        try:
-            cr = (env['LINES'], env['COLUMNS'])
-        except:
-            cr = (25, 80)
-    return int(cr[1]), int(cr[0])
+    if cr:
+        return int(cr[1]), int(cr[0])
+    try:
+        h, w = os.popen("stty size", "r").read().split()
+        return int(w), int(h)
+    except:
+        pass
+    return 80, 25
def _unicode(text):
    if not isinstance(text, unicode):
......
# Work in progress
\ No newline at end of file
try:
from json import dumps, loads
except ImportError:
from simplejson import dumps, loads
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
def encode(header='', payload=''):
def textsize(s):
if s:
return str(len(s))
return ''
    return '%s,%s:%s%s' % (textsize(header), textsize(payload), header, payload)
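# Example framing: encode('HDR', 'payload') produces '3,7:HDRpayload'; an empty
# header or payload contributes an empty size field rather than '0'.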
class FileEncoder(object):
def __init__(self, f):
self.f = f
def write(self, header='', payload=''):
fwrite = self.f.write
def textsize(s):
if s:
return str(len(s))
return ''
fwrite('%s,%s:' % (textsize(header), textsize(payload)))
if header:
fwrite(header)
if payload:
fwrite(payload)
class JSONFileEncoder(FileEncoder):
def write(self, header=None, payload=''):
if header is None:
super(JSONFileEncoder, self).write('', payload)
else:
header_json = dumps(header, separators=(',', ':'))
super(JSONFileEncoder, self).write(header_json, payload)
class DecoderError(Exception):
pass
class PreludeError(DecoderError):
pass
class Decoder(object):
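    """Incremental decoder for the '<header size>,<payload size>:<header><payload>'
    packet format, optionally preceded by a single prelude line terminated by '\n'.

    Feed raw bytes with feed(); complete packets are yielded as
    (header, payload) tuples.
    """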
STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4)
MAX_PRELUDE = 255
def __init__(self, no_prelude=False, prelude_callback=None):
self.prelude_callback = prelude_callback
self.stream_broken = False
self.expecting_bytes = None
self.stage = self.STAGE_PRELUDE
self._prelude = []
self._size = []
self._expecting_bytes = None
self.header_size = None
self.payload_size = None
self._header_bytes = None
self._payload_bytes = None
self._header_data = []
self._payload_data = []
self.header = None
self.payload = None
if no_prelude:
self.stage = self.STAGE_SIZE
def feed(self, data):
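        """Feed a chunk of raw data to the decoder, yielding a
        (header, payload) tuple for every packet completed by this chunk."""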
if self.stream_broken:
raise DecoderError('Stream is broken')
STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4)
size_append = self._size.append
header_append = self._header_data.append
payload_append = self._payload_data.append
datafind = data.find
def reset_packet():
self.expecting_bytes = None
del self._header_data[:]
del self._payload_data[:]
self.header = None
self.payload = None
data_len = len(data)
data_pos = 0
expecting_bytes = self.expecting_bytes
stage = self.stage
if stage == STAGE_PRELUDE:
max_find = min(len(data), data_pos + self.MAX_PRELUDE)
cr_pos = datafind('\n', data_pos, max_find)
if cr_pos == -1:
self._prelude.append(data[data_pos:])
data_pos = max_find
if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE:
self.stream_broken = True
raise PreludeError('Prelude not found')
else:
self._prelude.append(data[data_pos:cr_pos])
if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE:
self.stream_broken = True
raise PreludeError('Prelude not found')
data_pos = cr_pos + 1
prelude = ''.join(self._prelude)
del self._prelude[:]
reset_packet()
if not self.on_prelude(prelude):
                    self.stream_broken = True
return
stage = STAGE_SIZE
while data_pos < data_len:
if stage == STAGE_HEADER:
bytes_to_read = min(data_len - data_pos, expecting_bytes)
header_append(data[data_pos:data_pos + bytes_to_read])
data_pos += bytes_to_read
expecting_bytes -= bytes_to_read
if not expecting_bytes:
self.header = ''.join(self._header_data)
if not self.payload_size:
yield self.header, ''
reset_packet()
expecting_bytes = None
stage = STAGE_SIZE
else:
stage = STAGE_PAYLOAD
expecting_bytes = self.payload_size
elif stage == STAGE_PAYLOAD:
bytes_to_read = min(data_len - data_pos, expecting_bytes)
payload_append(data[data_pos:data_pos + bytes_to_read])
data_pos += bytes_to_read
expecting_bytes -= bytes_to_read
if not expecting_bytes:
self.payload = ''.join(self._payload_data)
yield self.header, self.payload
reset_packet()
stage = STAGE_SIZE
expecting_bytes = None
elif stage == STAGE_SIZE:
term_pos = datafind(':', data_pos)
if term_pos == -1:
size_append(data[data_pos:])
break
else:
size_append(data[data_pos:term_pos])
data_pos = term_pos + 1
size = ''.join(self._size)
del self._size[:]
if ',' in size:
header_size, payload_size = size.split(',', 1)
else:
header_size = size
payload_size = ''
try:
self.header_size = int(header_size or '0')
self.payload_size = int(payload_size or '0')
except ValueError:
                        self.stream_broken = True
raise DecoderError('Invalid size in packet (%s)' % size)
if self.header_size:
expecting_bytes = self.header_size
stage = STAGE_HEADER
elif self.payload_size:
expecting_bytes = self.payload_size
stage = STAGE_PAYLOAD
else:
# A completely empty packet, permitted, if a little odd
yield '', ''
reset_packet()
expecting_bytes = None
self.expecting_bytes = expecting_bytes
self.stage = stage
def on_prelude(self, prelude):
if self.prelude_callback and not self.prelude_callback(self, prelude):
return False
#pass
#print "Prelude:", prelude
return True
class JSONDecoder(Decoder):
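    """Decoder that JSON-decodes each packet header into a dict."""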
def feed(self, data):
for header, payload in Decoder.feed(self, data):
if header:
header = loads(header)
else:
header = {}
yield header, payload
if __name__ == "__main__":
f = StringIO()
encoder = JSONFileEncoder(f)
encoder.write(dict(a=1, b=2), 'Payload')
encoder.write(dict(foo="bar", nested=dict(apples="oranges"), alist=range(5)), 'Payload goes here')
encoder.write(None, 'Payload')
encoder.write(dict(a=1))
encoder.write()
stream = 'prelude\n' + f.getvalue()
#print stream
# packets = ['Prelude string\n',
# encode('header', 'payload'),
# encode('header number 2', 'second payload'),
# encode('', '')]
#
# stream = ''.join(packets)
decoder = JSONDecoder()
stream = 'pyfs/0.1\n59,13:{"type":"rpc","method":"ping","client_ref":"-1221142848:1"}Hello, World!'
fdata = StringIO(stream)
while 1:
data = fdata.read(3)
if not data:
break
for header, payload in decoder.feed(data):
print "Header:", repr(header)
print "Payload:", repr(payload)
\ No newline at end of file
from __future__ import with_statement
import socket
import threading
from packetstream import JSONDecoder, JSONFileEncoder
class _SocketFile(object):
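    """Minimal file-like wrapper exposing read/write over a socket."""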
def __init__(self, socket):
self.socket = socket
def read(self, size):
try:
return self.socket.recv(size)
except socket.error:
return ''
def write(self, data):
self.socket.sendall(data)
class ConnectionThread(threading.Thread):
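    """Serves a single client connection: reads data from the socket,
    decodes packets and dispatches them to on_packet."""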
def __init__(self, server, connection_id, socket, address):
super(ConnectionThread, self).__init__()
self.server = server
self.connection_id = connection_id
self.socket = socket
self.transport = _SocketFile(socket)
self.address = address
self.encoder = JSONFileEncoder(self.transport)
self.decoder = JSONDecoder(prelude_callback=self.on_stream_prelude)
self._lock = threading.RLock()
self.socket_error = None
self.fs = None
def run(self):
self.transport.write('pyfs/1.0\n')
while True:
try:
data = self.transport.read(4096)
except socket.error, socket_error:
print socket_error
self.socket_error = socket_error
break
print "data", repr(data)
if data:
for packet in self.decoder.feed(data):
print repr(packet)
self.on_packet(*packet)
else:
break
self.on_connection_close()
def close(self):
with self._lock:
self.socket.close()
def on_connection_close(self):
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
self.server.on_connection_close(self.connection_id)
def on_stream_prelude(self, packet_stream, prelude):
print "prelude", prelude
return True
def on_packet(self, header, payload):
print '-' * 30
print repr(header)
print repr(payload)
if header['method'] == 'ping':
self.encoder.write({'client_ref':header['client_ref']}, payload)
class Server(object):
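    """Listens on a TCP socket and spawns a ConnectionThread per client."""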
def __init__(self, addr='', port=3000):
self.addr = addr
self.port = port
self.socket = None
self.connection_id = 0
self.threads = {}
self._lock = threading.RLock()
def serve_forever(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((self.addr, self.port))
sock.listen(5)
try:
while True:
clientsocket, address = sock.accept()
self.on_connect(clientsocket, address)
except KeyboardInterrupt:
pass
try:
self._close_graceful()
except KeyboardInterrupt:
self._close_harsh()
def _close_graceful(self):
"""Tell all threads to exit and wait for them"""
with self._lock:
for connection in self.threads.itervalues():
connection.close()
for connection in self.threads.itervalues():
connection.join()
self.threads.clear()
def _close_harsh(self):
with self._lock:
for connection in self.threads.itervalues():
connection.close()
self.threads.clear()
def on_connect(self, clientsocket, address):
print "Connection from", address
with self._lock:
self.connection_id += 1
thread = ConnectionThread(self,
self.connection_id,
clientsocket,
address)
self.threads[self.connection_id] = thread
thread.start()
def on_connection_close(self, connection_id):
pass
#with self._lock:
# self.threads[connection_id].join()
# del self.threads[connection_id]
if __name__ == "__main__":
server = Server()
server.serve_forever()
\ No newline at end of file
import threading
import Queue as queue
def make_job(job_callable, *args, **kwargs):
""" Returns a callable that calls the supplied callable with given arguements. """
def job():
return job_callable(*args, **kwargs)
return job
class _PoolThread(threading.Thread):
""" Internal thread class that runs jobs. """
def __init__(self, queue, name):
super(_PoolThread, self).__init__()
self.queue = queue
self.name = name
def __str__(self):
return self.name
def run(self):
while True:
try:
_priority, job = self.queue.get()
except queue.Empty:
break
if job is None:
break
if callable(job):
try:
job()
except Exception, e:
print e
self.queue.task_done()
class ThreadPool(object):
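    """Pool of worker threads fed from a priority queue.

    job() posts work at normal priority; quit() posts higher-priority
    None sentinels so workers exit ahead of any queued jobs, while
    flush_quit() posts them at normal priority so queued jobs finish first.
    """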
def __init__(self, num_threads, size=None, name=''):
self.num_threads = num_threads
self.name = name
self.queue = queue.PriorityQueue(size)
self.job_no = 0
self.threads = [_PoolThread(self.queue, '%s #%i' % (name, i)) for i in xrange(num_threads)]
for thread in self.threads:
thread.start()
def _make_priority_key(self, i):
no = self.job_no
self.job_no += 1
return (i, no)
def job(self, job_callable, *args, **kwargs):
""" Post a job to the queue. """
def job():
return job_callable(*args, **kwargs)
self.queue.put( (self._make_priority_key(1), job), True )
return self.job_no
def flush_quit(self):
""" Quit after all tasks on the queue have been processed. """
for thread in self.threads:
self.queue.put( (self._make_priority_key(1), None) )
for thread in self.threads:
thread.join()
def quit(self):
""" Quit as soon as possible, potentially leaving tasks on the queue. """
for thread in self.threads:
self.queue.put( (self._make_priority_key(0), None) )
for thread in self.threads:
thread.join()
if __name__ == "__main__":
import time
def job(n):
print "Starting #%i" % n
time.sleep(1)
print "Ending #%i" % n
    pool = ThreadPool(5, name='test thread')
for n in range(20):
pool.job(job, n)
pool.flush_quit()
\ No newline at end of file
# Work in Progress - Do not use
from fs.base import FS
from fs.expose.serve import packetstream
from collections import defaultdict
import threading
from threading import Lock, RLock
from json import dumps
import Queue as queue
import socket
class PacketHandler(threading.Thread):
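    """Owns the encoder/decoder for a transport and runs the read loop in a
    background thread, routing each reply to the queue of the thread that
    issued the matching request (via the 'client_ref' header)."""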
def __init__(self, transport, prelude_callback=None):
super(PacketHandler, self).__init__()
self.transport = transport
self.encoder = packetstream.JSONFileEncoder(transport)
        self.decoder = packetstream.JSONDecoder(prelude_callback=prelude_callback)
self.queues = defaultdict(queue.Queue)
self._encoder_lock = threading.Lock()
self._queues_lock = threading.Lock()
self._call_id_lock = threading.Lock()
self.call_id = 0
def run(self):
decoder = self.decoder
read = self.transport.read
on_packet = self.on_packet
while True:
data = read(1024*16)
if not data:
print "No data"
break
print "data", repr(data)
for header, payload in decoder.feed(data):
print repr(header)
print repr(payload)
on_packet(header, payload)
def _new_call_id(self):
with self._call_id_lock:
self.call_id += 1
return self.call_id
def get_thread_queue(self, queue_id=None):
if queue_id is None:
queue_id = threading.current_thread().ident
with self._queues_lock:
return self.queues[queue_id]
def send_packet(self, header, payload=''):
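        """Write a packet whose header is tagged with a 'client_ref' of the
        form '<thread ident>:<call id>', so the reply can be routed back to
        the calling thread. Returns the call id."""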
call_id = self._new_call_id()
queue_id = threading.current_thread().ident
client_ref = "%i:%i" % (queue_id, call_id)
header['client_ref'] = client_ref
with self._encoder_lock:
self.encoder.write(header, payload)
return call_id
def get_packet(self, call_id):
if call_id is not None:
queue_id = threading.current_thread().ident
client_ref = "%i:%i" % (queue_id, call_id)
else:
client_ref = None
queue = self.get_thread_queue()
while True:
header, payload = queue.get()
print repr(header)
print repr(payload)
if client_ref is not None and header.get('client_ref') != client_ref:
continue
break
return header, payload
def on_packet(self, header, payload):
client_ref = header.get('client_ref', '')
queue_id, call_id = client_ref.split(':', 1)
queue_id = int(queue_id)
#queue_id = header.get('queue_id', '')
queue = self.get_thread_queue(queue_id)
queue.put((header, payload))
class _SocketFile(object):
def __init__(self, socket):
self.socket = socket
def read(self, size):
try:
return self.socket.recv(size)
except:
return ''
def write(self, data):
self.socket.sendall(data)
def close(self):
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
class RemoteFS(FS):
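    """Client-side FS that talks to the filesystem server over the packet
    protocol. Work in progress: only ping() is implemented."""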
def __init__(self, addr='', port=3000, transport=None):
self.addr = addr
self.port = port
self.transport = transport
if self.transport is None:
self.transport = self._open_connection()
self.packet_handler = PacketHandler(self.transport)
self.packet_handler.start()
def _open_connection(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.addr, self.port))
socket_file = _SocketFile(sock)
socket_file.write('pyfs/0.1\n')
return socket_file
def ping(self, msg):
call_id = self.packet_handler.send_packet({'type':'rpc', 'method':'ping'}, msg)
header, payload = self.packet_handler.get_packet(call_id)
print "PING"
print header
print payload
def close(self):
self.transport.close()
self.packet_handler.join()
if __name__ == "__main__":
rfs = RemoteFS()
rfs.ping("Hello, World!")
rfs.close()
\ No newline at end of file