Commit 30599d4b by rfkelly0

DAVFS: re-use persistent connections when possible

parent 5686efd7
......@@ -16,6 +16,8 @@ Requires the dexml module:
# Copyright (c) 2009-2010, Cloud Matrix Pty. Ltd.
# All rights reserved; available under the terms of the MIT License.
from __future__ import with_statement
import os
import sys
import httplib
......@@ -26,10 +28,13 @@ from urllib import quote as urlquote
from urllib import unquote as urlunquote
import base64
import re
import time
import datetime
import cookielib
import fnmatch
import xml.dom.pulldom
import threading
from collections import deque
import fs
from fs.base import *
......@@ -72,6 +77,11 @@ class DAVFS(FS):
"http": httplib.HTTPConnection,
"https": httplib.HTTPSConnection,
}
_DEFAULT_PORT_NUMBERS = {
"http": 80,
"https": 443,
}
_meta = { 'virtual' : False,
'read_only' : False,
......@@ -100,6 +110,8 @@ class DAVFS(FS):
self.connection_classes = self.connection_classes.copy()
self.connection_classes.update(connection_classes)
self._connections = []
self._free_connections = {}
self._connection_lock = threading.Lock()
self._cookiejar = cookielib.CookieJar()
super(DAVFS,self).__init__(thread_synchronize=thread_synchronize)
# Check that the server speaks WebDAV, and normalize the URL
......@@ -125,24 +137,75 @@ class DAVFS(FS):
con.close()
super(DAVFS,self).close()
def _add_connection(self,con):
def _take_connection(self,url):
"""Get a connection to the given url's host, re-using if possible."""
scheme = url.scheme.lower()
hostname = url.hostname
port = url.port
if not port:
try:
port = self._DEFAULT_PORT_NUMBERS[scheme]
except KeyError:
msg = "unsupported protocol: '%s'" % (url.scheme,)
raise RemoteConnectionError(msg=msg)
# Can we re-use an existing connection?
with self._connection_lock:
now = time.time()
try:
free_connections = self._free_connections[(hostname,port)]
except KeyError:
self._free_connections[(hostname,port)] = deque()
free_connections = self._free_connections[(hostname,port)]
else:
while free_connections:
(when,con) = free_connections.popleft()
if when + 30 > now:
return (False,con)
self._discard_connection(con)
# Nope, we need to make a fresh one.
try:
ConClass = self.connection_classes[scheme]
except KeyError:
msg = "unsupported protocol: '%s'" % (url.scheme,)
raise RemoteConnectionError(msg=msg)
con = ConClass(url.hostname,url.port,timeout=self.timeout)
self._connections.append(con)
return (True,con)
def _give_connection(self,url,con):
"""Return a connection to the pool, or destroy it if dead."""
scheme = url.scheme.lower()
hostname = url.hostname
port = url.port
if not port:
try:
port = self._DEFAULT_PORT_NUMBERS[scheme]
except KeyError:
msg = "unsupported protocol: '%s'" % (url.scheme,)
raise RemoteConnectionError(msg=msg)
with self._connection_lock:
now = time.time()
try:
free_connections = self._free_connections[(hostname,port)]
except KeyError:
self._free_connections[(hostname,port)] = deque()
free_connections = self._free_connections[(hostname,port)]
free_connections.append((now,con))
def _del_connection(self,con):
try:
self._connections.remove(con)
except ValueError:
pass
else:
con.close()
def _discard_connection(self,con):
con.close()
self._connections.remove(con)
def __str__(self):
return '<DAVFS: %s>' % (self.url,)
__repr__ = __str__
def __getstate__(self):
# Python2.5 cannot load pickled urlparse.ParseResult objects.
state = super(DAVFS,self).__getstate__()
del state["_connection_lock"]
del state["_connections"]
del state["_free_connections"]
# Python2.5 cannot load pickled urlparse.ParseResult objects.
del state["_url_p"]
# CookieJar objects contain a lock, so they can't be pickled.
del state["_cookiejar"]
......@@ -150,6 +213,9 @@ class DAVFS(FS):
def __setstate__(self,state):
super(DAVFS,self).__setstate__(state)
self._connections = []
self._free_connections = {}
self._connection_lock = threading.Lock()
self._url_p = urlparse(self.url)
self._cookiejar = cookielib.CookieJar()
......@@ -165,7 +231,7 @@ class DAVFS(FS):
def _url2path(self,url):
"""Convert a server-side URL into a client-side path."""
path = urlunquote(urlparse(url).path)
root = self._url_p.path
root = urlunquote(self._url_p.path)
path = path[len(root)-1:].decode("utf8")
while path.endswith("/"):
path = path[:-1]
......@@ -232,13 +298,7 @@ class DAVFS(FS):
headers["Authorization"] = creds
(size,chunks) = normalize_req_body(body)
try:
try:
ConClass = self.connection_classes[url.scheme.lower()]
except KeyError:
msg = "unsupported protocol: '%s'" % (url.scheme,)
raise RemoteConnectionError(msg=msg)
con = ConClass(url.hostname,url.port,timeout=self.timeout)
self._add_connection(con)
(fresh,con) = self._take_connection(url)
try:
con.putrequest(method,url.path)
if size is not None:
......@@ -256,17 +316,21 @@ class DAVFS(FS):
raise RemoteConnectionError("",msg="FS is closed")
resp = con.getresponse()
self._cookiejar.extract_cookies(FakeResp(resp),FakeReq(con,url.scheme,url.path))
except Exception, e:
self._del_connection(con)
except Exception:
self._discard_connection(con)
raise
else:
old_close = resp.close
def new_close():
del resp.close
old_close()
self._del_connection(con)
con.close()
self._give_connection(url,con)
resp.close = new_close
return resp
except socket.error, e:
if not fresh:
return self._raw_request(url,method,body,headers,num_tries)
if e.args[0] in _RETRYABLE_ERRORS:
if num_tries < 3:
num_tries += 1
......
......@@ -134,9 +134,9 @@ class response(_davbase):
"""XML model for an individual response in a multi-status message."""
href = HrefField()
# TODO: ensure only one of hrefs/propstats
hrefs = fields.List(HrefField(),minlength=1,required=False)
hrefs = fields.List(HrefField(),required=False)
status = StatusField(required=False)
propstats = fields.List("propstat",minlenth=1,required=False)
propstats = fields.List("propstat",required=False)
description = fields.String(tagname="responsedescription",required=False)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment