Commit 37c29cb3 by David Robinson

Fixed merge conflicts- new version as per #8 should be ready.

parents 5905c7bb 4d66c30f
...@@ -20,14 +20,125 @@ import collections ...@@ -20,14 +20,125 @@ import collections
import re import re
import logging import logging
from query import QueryManager
API_ROOT = 'https://api.parse.com/1' API_ROOT = 'https://api.parse.com/1'
APPLICATION_ID = '' APPLICATION_ID = ''
REST_API_KEY = '' REST_API_KEY = ''
MASTER_KEY = ''
class ParseBinaryDataWrapper(str): class ParseType(object):
pass
@staticmethod
def convert(parse_data):
is_parse_type = isinstance(parse_data, dict) and '__type' in parse_data
if not is_parse_type:
return parse_data
parse_type = parse_data['__type']
native = {
'Pointer': Pointer,
'Date': Date,
'Bytes': Binary,
'GeoPoint': GeoPoint,
'File': File,
'Relation': Relation
}.get(parse_type)
return native and native.from_native(**parse_data) or parse_data
@classmethod
def from_native(cls, **kw):
return cls(**kw)
def _to_native(self):
return self._value
class Pointer(ParseType):
@classmethod
def from_native(cls, **kw):
klass = Object.factory(kw.get('className'))
return klass.retrieve(kw.get('objectId'))
def _to_native(self):
return {
'__type': 'Pointer',
'className': self.__class__.__name__,
'objectId': self.objectId
}
class Relation(ParseType):
@classmethod
def from_native(cls, **kw):
pass
class Date(ParseType):
FORMAT = '%Y-%m-%dT%H:%M:%S.%f%Z'
@classmethod
def from_native(cls, **kw):
date_str = kw.get('iso', '')[:-1] + 'UTC'
return cls(datetime.datetime.strptime(date_str, Date.FORMAT))
def __init__(self, date):
self._date = date
def _to_native(self):
return {
'__type': 'Date', 'iso': self._date.isoformat()
}
class Binary(ParseType):
@classmethod
def from_native(self, **kw):
return cls(kw.get('base64', ''))
def __init__(self, encoded_string):
self._encoded = encoded_string
self._decoded = str(base64.b64decode(self._encoded))
def _to_native(self):
return {'__type': 'Bytes', 'base64': self._encoded}
class GeoPoint(ParseType):
@classmethod
def from_native(self, **kw):
return cls(kw.get('latitude'), kw.get('longitude'))
def __init__(self, latitude, longitude):
self.latitude = latitude
self.longitude = longitude
def _to_native(self):
return {
'__type': 'GeoPoint',
'latitude': self.latitude,
'longitude': self.longitude
}
class File(ParseType):
@classmethod
def from_native(self, **kw):
return cls(kw.get('url'), kw.get('name'))
def __init__(self, url, name):
request = urllib2.Request(url)
self._name = name
self._url = url
self._file = urllib2.urlopen(request)
class ParseBase(object): class ParseBase(object):
...@@ -48,16 +159,21 @@ class ParseBase(object): ...@@ -48,16 +159,21 @@ class ParseBase(object):
request = urllib2.Request(url, data, headers) request = urllib2.Request(url, data, headers)
request.add_header('Content-type', 'application/json') request.add_header('Content-type', 'application/json')
#auth_header = "Basic %s" % base64.b64encode('%s:%s' %
# (APPLICATION_ID, REST_API_KEY))
#request.add_header("Authorization", auth_header)
request.add_header("X-Parse-Application-Id", APPLICATION_ID) request.add_header("X-Parse-Application-Id", APPLICATION_ID)
request.add_header("X-Parse-REST-API-Key", REST_API_KEY) request.add_header("X-Parse-REST-API-Key", REST_API_KEY)
request.get_method = lambda: http_verb request.get_method = lambda: http_verb
# TODO: add error handling for server response try:
response = urllib2.urlopen(request) response = urllib2.urlopen(request)
except urllib2.HTTPError, e:
raise {
400: ResourceRequestBadRequest,
401: ResourceRequestLoginRequired,
403: ResourceRequestForbidden,
404: ResourceRequestNotFound
}.get(e.code, ParseError)
return json.loads(response.read()) return json.loads(response.read())
@classmethod @classmethod
...@@ -76,46 +192,6 @@ class ParseBase(object): ...@@ -76,46 +192,6 @@ class ParseBase(object):
def DELETE(cls, uri, **kw): def DELETE(cls, uri, **kw):
return cls.execute(uri, 'DELETE', **kw) return cls.execute(uri, 'DELETE', **kw)
@property
def _attributes(self):
# return "public" attributes converted to the base parse representation
return dict([
self._convertToParseType(p) for p in self.__dict__.items()
if p[0][0] != '_'
])
def _isGeoPoint(self, value):
if isinstance(value, str):
return re.search('\\bPOINT\\(([-+]?[0-9]*\\.?[0-9]*) ' +
'([-+]?[0-9]*\\.?[0-9]*)\\)', value, re.I)
def _ISO8601ToDatetime(self, date_string):
# TODO: verify correct handling of timezone
date_string = date_string[:-1] + 'UTC'
return datetime.datetime.strptime(date_string,
'%Y-%m-%dT%H:%M:%S.%f%Z')
def _convertToParseType(self, prop):
key, value = prop
if type(value) == Object:
value = {'__type': 'Pointer',
'className': value._class_name,
'objectId': value._object_id}
elif type(value) == datetime.datetime:
# take off the last 3 digits and add a Z
value = {'__type': 'Date', 'iso': value.isoformat()[:-3] + 'Z'}
elif type(value) == ParseBinaryDataWrapper:
value = {'__type': 'Bytes',
'base64': base64.b64encode(value)}
elif self._isGeoPoint(value):
coordinates = re.findall('[-+]?[0-9]+\\.?[0-9]*', value)
value = {'__type': 'GeoPoint',
'longitude': float(coordinates[0]),
'latitude': float(coordinates[1])}
return (key, value)
class Function(ParseBase): class Function(ParseBase):
ENDPOINT_ROOT = "/".join((API_ROOT, "functions")) ENDPOINT_ROOT = "/".join((API_ROOT, "functions"))
...@@ -128,376 +204,159 @@ class Function(ParseBase): ...@@ -128,376 +204,159 @@ class Function(ParseBase):
class ParseResource(ParseBase): class ParseResource(ParseBase):
def __init__(self, **kw):
self._object_id = kw.pop('objectId', None)
self._updated_at = kw.pop('updatedAt', None)
self._created_at = kw.pop('createdAt', None)
for attr, value in kw.items(): PROTECTED_ATTRIBUTES = ['objectId', 'createdAt', 'updatedAt']
self.__dict__[attr] = value
@classmethod @classmethod
def retrieve(cls, resource_id): def retrieve(cls, resource_id):
return cls(**cls.GET('/' + resource_id)) return cls(**cls.GET('/' + resource_id))
_absolute_url = property(lambda self: '/'.join( def __init__(self, **kw):
[self.__class__.ENDPOINT_ROOT, self._object_id])) for key, value in kw.items():
setattr(self, key, value)
def objectId(self): def _to_native(self):
return self._object_id # serializes all attributes that need to be persisted on Parse
def updatedAt(self): protected_attributes = self.__class__.PROTECTED_ATTRIBUTES
return (self._updated_at and self._ISO8601ToDatetime(self._updated_at) is_protected = lambda a: a in protected_attributes or a.startswith('_')
or None)
def createdAt(self): return dict([(k, v._to_native() if isinstance(v, ParseType) else v)
return (self._created_at and self._ISO8601ToDatetime(self._created_at) for k, v in self.__dict__.items() if not is_protected(k)
or None) ])
def _get_object_id(self):
return getattr(self, '_object_id', None)
class Installation(ParseResource): def _set_object_id(self, value):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'installations']) if hasattr(self, '_object_id'):
raise ValueError('Can not re-set object id')
self._object_id = value
def _get_updated_datetime(self):
return getattr(self, '_updated_at', None) and self._updated_at._date
class Object(ParseResource): def _set_updated_datetime(self, value):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'classes']) self._updated_at = Date(value)
def __init__(self, class_name, attrs_dict=None): def _get_created_datetime(self):
self._class_name = class_name return getattr(self, '_created_at', None) and self._created_at._date
self._object_id = None
self._updated_at = None
self._created_at = None
if attrs_dict: def _set_created_datetime(self, value):
self._populateFromDict(attrs_dict) self._created_at = Date(value)
def save(self): def save(self):
if self._object_id: if self.objectId:
self._update() self._update()
else: else:
self._create() self._create()
def delete(self):
# URL: /1/classes/<className>/<objectId>
# HTTP Verb: DELETE
self.__class__.DELETE('/%s/%s' % (self._class_name, self._object_id))
self = self.__init__(None)
def increment(self, key, amount=1):
"""
Increment one value in the object. Note that this happens immediately:
it does not wait for save() to be called
"""
uri = '/%s/%s' % (self._class_name, self._object_id)
payload = {
key: {
'__op': 'Increment',
'amount': amount
}
}
self._populateFromDict(self.__class__.execute(uri, 'PUT', **payload))
def has(self, attr):
return attr in self.__dict__
def remove(self, attr):
self.__dict__.pop(attr)
# dictionary and list-like methods
def __contains__(self, k):
return not k.startswith("_") and k in self.__dict__
def __iter__(self):
return (k for k in self.__dict__ if not k.startswith("_"))
def __getitem__(self, key):
if key.startswith("_"):
raise KeyError("Cannot access this value in this way")
return self.__dict__[key]
def __setitem__(self, key, value):
if key.startswith("_"):
raise KeyError("Cannot change this value in this way")
self.__dict__[key] = value
def items(self):
return self._attributes.items()
def refresh(self):
uri = '/%s/%s' % (self._class_name, self._object_id)
response_dict = self.__class__.execute(uri, 'GET')
self._populateFromDict(response_dict)
def _populateFromDict(self, attrs_dict):
if 'objectId' in attrs_dict:
self._object_id = attrs_dict['objectId']
del attrs_dict['objectId']
if 'createdAt' in attrs_dict:
self._created_at = attrs_dict['createdAt']
del attrs_dict['createdAt']
if 'updatedAt' in attrs_dict:
self._updated_at = attrs_dict['updatedAt']
del attrs_dict['updatedAt']
attrs_dict = dict(map(self._convertFromParseType, attrs_dict.items()))
self.__dict__.update(attrs_dict)
def _convertFromParseType(self, prop):
key, value = prop
if type(value) == dict and '__type' in value:
if value['__type'] == 'Pointer':
value = ObjectQuery(value['className']).get(value['objectId'])
elif value['__type'] == 'Date':
value = self._ISO8601ToDatetime(value['iso'])
elif value['__type'] == 'Bytes':
value = ParseBinaryDataWrapper(base64.b64decode(
value['base64']))
elif value['__type'] == 'GeoPoint':
value = 'POINT(%s %s)' % (value['longitude'],
value['latitude'])
else:
raise Exception('Invalid __type.')
return (key, value)
def _create(self): def _create(self):
# URL: /1/classes/<className> # URL: /1/classes/<className>
# HTTP Verb: POST # HTTP Verb: POST
uri = '/%s' % self._class_name uri = self.__class__.ENDPOINT_ROOT
response_dict = self.__class__.POST(uri, **self._attributes) response_dict = self.__class__.POST(uri, **self._to_native())
self._created_at = self._updated_at = response_dict['createdAt'] self.createdAt = self.updatedAt = response_dict['createdAt']
self._object_id = response_dict['objectId'] self.objectId = response_dict['objectId']
def _update(self): def _update(self):
# URL: /1/classes/<className>/<objectId> # URL: /1/classes/<className>/<objectId>
# HTTP Verb: PUT # HTTP Verb: PUT
uri = '/%s/%s' % (self._class_name, self._object_id) response = self.__class__.PUT(self._absolute_url, **self._to_native())
self.updatedAt = response['updatedAt']
response_dict = self.__class__.PUT(uri, **self._attributes) def delete(self):
self._updated_at = response_dict['updatedAt'] self.__class__.DELETE(self._absolute_url)
self.__dict__ = {}
_absolute_url = property(
lambda self: '/'.join([self.__class__.ENDPOINT_ROOT, self.objectId])
)
class User(Object): objectId = property(_get_object_id, _set_object_id)
""" createdAt = property(_get_created_datetime, _set_created_datetime)
A User is like a regular Parse object (can be modified and saved) but updatedAt = property(_get_updated_datetime, _set_created_datetime)
it requires additional methods and functionality
"""
ENDPOINT_ROOT = '/'.join([API_ROOT, 'users'])
def __init__(self, username, password=None, **kw):
"""
Initialized with a username and possibly password along with any other
attributes (name, phone number...)
"""
kw["username"] = username
kw["password"] = password
Object.__init__(self, "", kw)
@classmethod class ObjectMetaclass(type):
def retrieve(cls, resource_id): def __new__(cls, name, bases, dct):
"""retrieve a user by its ID""" cls = super(ObjectMetaclass, cls).__new__(cls, name, bases, dct)
ret = cls.GET('/' + resource_id) cls.set_endpoint_root()
username = ret.pop("username") cls.Query = QueryManager(cls)
return cls(username, **ret) return cls
def needs_session(func):
"""decorator describing User methods that need to be logged in"""
def ret(obj, *a, **kw):
if not hasattr(obj, "sessionToken"):
raise ParseError("%s requires a logged-in session" %
(func.__name__, ))
func(obj, *a, **kw)
return ret
def signup(self, **kw):
"""same as creating an object, with handling if user already exists"""
try:
self._create()
except urllib2.HTTPError as e:
if "400" in str(e):
raise ParseError("User already exists")
else:
raise
def login(self):
try:
ret = self.GET('/'.join([API_ROOT, 'login']),
username=self.username, password=self.password)
except urllib2.HTTPError:
raise ParseError("Invalid login")
# update all attributes
self._populateFromDict(ret)
@needs_session
def save(self):
session_header = {'X-Parse-Session-Token': self.sessionToken}
# remove items you can't change
save_dict = {k: v for k, v in self._attributes.items()
if k not in ["username", "password", "sessionToken"]}
return self.__class__.PUT(
self._absolute_url, extra_headers=session_header,
**save_dict)
@needs_session class Object(ParseResource):
def delete(self): __metaclass__ = ObjectMetaclass
session_header = {'X-Parse-Session-Token': self.sessionToken} ENDPOINT_ROOT = '/'.join([API_ROOT, 'classes'])
return self.DELETE("/" + self.objectId(), extra_headers=session_header)
@classmethod @classmethod
def request_password_reset(cls, email): def factory(cls, class_name):
"""reset a user's password using his email""" class DerivedClass(cls):
return self.POST('/'.join([API_ROOT, 'requestPasswordReset']), pass
email=email) DerivedClass.__name__ = class_name
return DerivedClass
class Push(ParseResource):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'push'])
@classmethod @classmethod
def send(cls, message, channels=None, **kw): def set_endpoint_root(cls):
alert_message = {'alert': message} root = '/'.join([API_ROOT, 'classes', cls.__name__])
targets = {} if cls.ENDPOINT_ROOT != root:
if channels: cls.ENDPOINT_ROOT = root
targets['channels'] = channels return cls.ENDPOINT_ROOT
if kw:
targets['where'] = kw def __new__(cls, *args, **kw):
return cls.POST('', data=alert_message, **targets) cls.set_endpoint_root()
manager = getattr(cls, 'Query', QueryManager(cls))
if not (hasattr(cls, 'Query') and manager.model_class is cls):
class Query(ParseBase): cls.Query = manager
return ParseResource.__new__(cls)
def __init__(self):
self._where = collections.defaultdict(dict)
self._options = {}
def eq(self, name, value):
self._where[name] = value
return self
# It's tempting to generate the comparison functions programatically,
# but probably not worth the decrease in readability of the code.
def lt(self, name, value):
self._where[name]['$lt'] = value
return self
def lte(self, name, value):
self._where[name]['$lte'] = value
return self
def gt(self, name, value):
self._where[name]['$gt'] = value
return self
def gte(self, name, value):
self._where[name]['$gte'] = value
return self
def ne(self, name, value):
self._where[name]['$ne'] = value
return self
def order(self, order, decending=False):
# add a minus sign before the order value if decending == True
self._options['order'] = decending and ('-' + order) or order
return self
def limit(self, limit):
self._options['limit'] = limit
return self
def skip(self, skip):
self._options['skip'] = skip
return self
def get(self, object_id):
return self.__class__.QUERY_CLASS.retrieve(object_id)
def fetch(self):
# hide the single_result param of the _fetch method from the library
# user since it's only useful internally
return self._fetch()
def _fetch(self):
options = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
options.update({'where': where})
response = self.__class__.GET('', **options)
return [self.__class__.QUERY_CLASS(**result)
for result in response['results']]
class ObjectQuery(Query):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'classes'])
def __init__(self, class_name):
self._class_name = class_name
self._where = collections.defaultdict(dict)
self._options = {}
self._object_id = ''
def get(self, object_id): @property
self._object_id = object_id def _absolute_url(self):
return self._fetch(single_result=True) if not self.objectId:
return None
def _fetch(self, single_result=False): return '/'.join([self.__class__.ENDPOINT_ROOT, self.objectId])
# URL: /1/classes/<className>/<objectId>
# HTTP Verb: GET
if self._object_id: def increment(self, key, amount=1):
response = self.__class__.GET('/%s/%s' % (self._class_name, """
self._object_id)) Increment one value in the object. Note that this happens immediately:
else: it does not wait for save() to be called
options = dict(self._options) # make a local copy """
if self._where: payload = {
# JSON encode WHERE values key: {
where = json.dumps(self._where) '__op': 'Increment',
options.update({'where': where}) 'amount': amount
}
}
self.__class__.PUT(self._absolute_url, **payload)
self.__dict__[key] += amount
response = self.__class__.GET('/%s' % self._class_name, **options)
if single_result: class ParseError(Exception):
return Object(self._class_name, response) '''Base exceptions from requests made to Parse'''
else: pass
return [Object(self._class_name, result)
for result in response['results']]
class UserQuery(Query): class ResourceRequestBadRequest(ParseError):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'users']) '''Request returns a 400'''
QUERY_CLASS = User pass
class InstallationQuery(Query): class ResourceRequestLoginRequired(ParseError):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'installations']) '''Request returns a 401'''
QUERY_CLASS = Installation pass
def _fetch(self):
options = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
options.update({'where': where})
extra_headers = {'X-Parse-Master-Key': MASTER_KEY} class ResourceRequestForbidden(ParseError):
response = self.__class__.GET('', extra_headers=extra_headers, '''Request returns a 403'''
**options) pass
return [self.__class__.QUERY_CLASS(**result)
for result in response['results']]
class ParseError(Exception): class ResourceRequestNotFound(ParseError):
""" '''Request returns a 404'''
Represents exceptions coming from Parse (e.g. invalid login or signup)
"""
pass pass
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import __init__ as parse_rest
from __init__ import API_ROOT, ParseResource
from query import QueryManager, Query
class InstallationManager(QueryManager):
def __init__(self):
self._model_class = Installation
class InstallationQuery(Query):
def _fetch(self):
opts = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
opts.update({'where': where})
extra_headers = {'X-Parse-Master-Key': parse_rest.MASTER_KEY}
klass = self._manager.model_class
uri = self._manager.model_class.ENDPOINT_ROOT
return [klass(**it) for it in klass.GET(uri, **options).get('results')]
class Installation(ParseResource):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'installations'])
class Push(ParseResource):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'push'])
@classmethod
def send(cls, message, channels=None, **kw):
alert_message = {'alert': message}
targets = {}
if channels:
targets['channels'] = channels
if kw:
targets['where'] = kw
return cls.POST('', data=alert_message, **targets)
Installation.Query = InstallationManager()
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import collections
class QueryResourceDoesNotExist(Exception):
'''Query returned no results'''
pass
class QueryResourceMultipleResultsReturned(Exception):
'''Query was supposed to return unique result, returned more than one'''
pass
class QueryManager(object):
def __init__(self, model_class):
self.model_class = model_class
def all(self):
return Queryset(self)
def where(self, **kw):
return Queryset(self).where(**kw)
def get(self, **kw):
return Queryset(self).where(**kw).get()
class Queryset(object):
def __init__(self, manager):
self._manager = manager
self._where = collections.defaultdict(dict)
self._options = {}
def __iter__(self):
return iter(self._fetch())
def where(self, **kw):
for key, value in kw.items():
self.eq(key, value)
return self
def eq(self, name, value):
self._where[name] = value
return self
# It's tempting to generate the comparison functions programatically,
# but probably not worth the decrease in readability of the code.
def lt(self, name, value):
self._where[name]['$lt'] = value
return self
def lte(self, name, value):
self._where[name]['$lte'] = value
return self
def gt(self, name, value):
self._where[name]['$gt'] = value
return self
def gte(self, name, value):
self._where[name]['$gte'] = value
return self
def ne(self, name, value):
self._where[name]['$ne'] = value
return self
def order(self, order, decending=False):
# add a minus sign before the order value if decending == True
self._options['order'] = decending and ('-' + order) or order
return self
def limit(self, limit):
self._options['limit'] = limit
return self
def skip(self, skip):
self._options['skip'] = skip
return self
def exists(self):
results = self._fetch()
return len(results) > 0
def get(self):
results = self._fetch()
if len(results) == 0:
raise QueryResourceDoesNotExist
if len(results) >= 2:
raise QueryResourceMultipleResultsReturned
return results[0]
def _fetch(self):
options = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
options.update({'where': where})
klass = self._manager.model_class
uri = self._manager.model_class.ENDPOINT_ROOT
return [klass(**it) for it in klass.GET(uri, **options).get('results')]
#!/usr/bin/env python
#-*- coding: utf-8 -*-
""" """
Contains unit tests for the Python Parse REST API wrapper Contains unit tests for the Python Parse REST API wrapper
""" """
...@@ -9,6 +12,9 @@ import urllib2 ...@@ -9,6 +12,9 @@ import urllib2
import datetime import datetime
import __init__ as parse_rest import __init__ as parse_rest
from __init__ import GeoPoint, Object
from user import User
try: try:
import settings_local import settings_local
...@@ -17,9 +23,9 @@ except ImportError: ...@@ -17,9 +23,9 @@ except ImportError:
'APPLICATION_ID, REST_API_KEY, and a MASTER_KEY ' + 'APPLICATION_ID, REST_API_KEY, and a MASTER_KEY ' +
'to run tests.') 'to run tests.')
parse_rest.APPLICATION_ID = settings_local.APPLICATION_ID parse_rest.APPLICATION_ID = getattr(settings_local, 'APPLICATION_ID', '')
parse_rest.REST_API_KEY = settings_local.REST_API_KEY parse_rest.REST_API_KEY = getattr(settings_local, 'REST_API_KEY', '')
parse_rest.MASTER_KEY = getattr(settings_local, 'MASTER_KEY', '')
GLOBAL_JSON_TEXT = """{ GLOBAL_JSON_TEXT = """{
"applications": { "applications": {
...@@ -38,138 +44,69 @@ GLOBAL_JSON_TEXT = """{ ...@@ -38,138 +44,69 @@ GLOBAL_JSON_TEXT = """{
""" """
### FUNCTIONS ### class GameScore(Object):
def test_obj(saved=False): pass
"""Return a test parse_rest.Object (content is from the docs)"""
ret = parse_rest.Object("GameScore")
ret.score = 1337 class City(Object):
ret.playerName = "Sean Plott" pass
ret.cheatMode = False
ret.location = "POINT(-30.0 43.21)" # "POINT(30 -43.21)"
if saved: class Review(Object):
ret.save() pass
return ret
class TestObject(unittest.TestCase):
### CLASSES ### def setUp(self):
class TestObjectAndQuery(unittest.TestCase): self.score = GameScore(
""" score=1337, player_name='John Doe', cheat_mode=False
Tests for the parse_rest.Object interface for creating and updating Parse )
objects, as well as the parse_rest.ObjectQuery interface for retrieving self.sao_paulo = City(
them name='São Paulo', location=GeoPoint(-23.5, -46.6167)
""" )
def check_test_obj(self, o): def tearDown(self):
"""check that the object is consistent with the test object""" city_name = getattr(self.sao_paulo, 'name', None)
self.assertEqual(o.objectId().__class__, unicode) game_score = getattr(self.score, 'score', None)
self.assertEqual(o.updatedAt().__class__, datetime.datetime) if city_name:
self.assertEqual(o.createdAt().__class__, datetime.datetime) for city in City.Query.where(name=city_name):
self.assertEqual(o.score, 1337) city.delete()
# TODO: str vs unicode
#self.assertEqual(o.playerName.__class__, unicode) if game_score:
self.assertEqual(o.cheatMode.__class__, bool) for score in GameScore.Query.where(score=game_score):
self.assertEqual(o.location, "POINT(-30.0 43.21)") score.delete()
def test_object(self): def testCanInitialize(self):
"""Test the creation, retrieval and updating of a Object""" self.assert_(self.score.score == 1337, 'Could not set score')
gameScore = test_obj()
gameScore.save() def testCanInstantiateParseType(self):
self.check_test_obj(gameScore) self.assert_(self.sao_paulo.location.latitude == -23.5)
# retrieve a new one def testCanCreateNewObject(self):
query = parse_rest.ObjectQuery('GameScore') self.score.save()
obj1 = query.get(gameScore.objectId()) self.assert_(self.score.objectId is not None, 'Can not create object')
self.check_test_obj(obj1)
def testCanUpdateExistingObject(self):
# now update it self.sao_paulo.save()
current_updated = obj1.updatedAt() self.sao_paulo.country = 'Brazil'
obj1.score = 1000 self.sao_paulo.save()
obj1.save()
self.assertGreater(obj1.updatedAt(), current_updated) city = City.Query.where(name='São Paulo').get()
self.assertEqual(obj1.score, 1000) self.assert_(city.country == 'Brazil', 'Could not update object')
# test accessing like a dictionary def testCanDeleteExistingObject(self):
self.assertTrue("playerName" in obj1) self.score.save()
self.assertTrue("score" in obj1) object_id = self.score.objectId
self.assertEqual(obj1["score"], 1000) self.score.delete()
self.assertEqual(obj1["playerName"], "Sean Plott") self.assert_(not GameScore.Query.where(objectId=object_id).exists(),
obj1["playerName"] = "Sean Scott" 'Failed to delete object %s on Parse ' % self.score)
self.assertEqual(obj1.playerName, "Sean Scott")
# non-existent or forbidden lookup def testCanIncrementField(self):
self.assertRaises(KeyError, obj1.__getitem__, "nosuchkey") previous_score = self.score.score
self.assertRaises(KeyError, obj1.__getitem__, "_class_name") self.score.save()
self.score.increment('score')
# re-retrieve it self.assert_(GameScore.Query.where(score=previous_score + 1).exists(),
obj2 = query.get(obj1.objectId()) 'Failed to increment score on backend')
self.assertEqual(obj2.score, 1000)
# change one object, check that others can be refreshed
obj2.score = 2000
obj2.save()
self.assertEqual(obj1.score, 1000)
obj1.refresh()
self.assertEqual(obj1.score, 2000)
# try removing a field
obj2.remove("score")
obj2.save()
self.assertEqual(obj2.has("score"), False)
def test_increment(self):
"""Test incrementation of fields"""
o = test_obj(True)
self.check_test_obj(o)
o.save()
o.increment("score")
self.assertEqual(o.score, 1338)
query = parse_rest.ObjectQuery("GameScore")
o2 = query.get(o.objectId())
self.assertEqual(o2.score, 1338)
# one more time
o.increment("score")
self.assertEqual(o.score, 1339)
o3 = query.get(o.objectId())
self.assertEqual(o3.score, 1339)
def test_relationship(self):
"""Test relationship between objects"""
post = parse_rest.Object("Post")
post.title = "I'm Hungry"
post.content = "Where should we go for lunch?"
post.save()
comment = parse_rest.Object("Comment")
comment.content = "Let's do Sushirrito"
comment.parent = post
comment.save()
# that should have saved both post and comment
post_id = post.objectId()
comment_id = comment.objectId()
self.assertEqual(post_id.__class__, unicode)
self.assertEqual(comment_id.__class__, unicode)
# retrieve new ones
post2 = parse_rest.ObjectQuery("Post").get(post_id)
comment2 = parse_rest.ObjectQuery("Comment").get(comment_id)
# check the relationship between the saved post and comment
self.assertEqual(comment2.parent.objectId(), post_id)
self.assertEqual(comment2.parent.title, "I'm Hungry")
def test_delete(self):
"""Test deleting an object"""
o = test_obj(True)
obj_id = o.objectId()
self.check_test_obj(o)
o2 = parse_rest.ObjectQuery("GameScore").get(obj_id)
self.check_test_obj(o2)
o2.delete()
self.assertRaises(urllib2.HTTPError,
parse_rest.ObjectQuery("GameScore").get, obj_id)
class TestFunction(unittest.TestCase): class TestFunction(unittest.TestCase):
...@@ -182,7 +119,7 @@ class TestFunction(unittest.TestCase): ...@@ -182,7 +119,7 @@ class TestFunction(unittest.TestCase):
# write the config file # write the config file
with open("config/global.json", "w") as outf: with open("config/global.json", "w") as outf:
outf.write(GLOBAL_JSON_TEXT % (settings_local.APPLICATION_ID, outf.write(GLOBAL_JSON_TEXT % (settings_local.APPLICATION_ID,
settings_local.MASTER_KEY)) settings_local.MASTER_KEY))
try: try:
subprocess.call(["parse", "deploy"]) subprocess.call(["parse", "deploy"])
except OSError: except OSError:
...@@ -190,8 +127,8 @@ class TestFunction(unittest.TestCase): ...@@ -190,8 +127,8 @@ class TestFunction(unittest.TestCase):
"(see https://www.parse.com/docs/cloud_code_guide)") "(see https://www.parse.com/docs/cloud_code_guide)")
os.chdir(original_dir) os.chdir(original_dir)
# remove all existing Review objects def tearDown(self):
for review in parse_rest.ObjectQuery("Review").fetch(): for review in Review.Query.all():
review.delete() review.delete()
def test_simple_functions(self): def test_simple_functions(self):
...@@ -202,13 +139,10 @@ class TestFunction(unittest.TestCase): ...@@ -202,13 +139,10 @@ class TestFunction(unittest.TestCase):
self.assertEqual(ret["result"], u"Hello world!") self.assertEqual(ret["result"], u"Hello world!")
# Test the averageStars function- takes simple argument # Test the averageStars function- takes simple argument
r1 = parse_rest.Object("Review", {"movie": "The Matrix", r1 = Review(movie="The Matrix", stars=5,
"stars": 5, comment="Too bad they never made any sequels.")
"comment": "Too bad they never made any sequels."})
r1.save() r1.save()
r2 = parse_rest.Object("Review", {"movie": "The Matrix", r2 = Review(movie="The Matrix", stars=4, comment="It's OK.")
"stars": 4,
"comment": "It's OK."})
r2.save() r2.save()
star_func = parse_rest.Function("averageStars") star_func = parse_rest.Function("averageStars")
...@@ -217,54 +151,67 @@ class TestFunction(unittest.TestCase): ...@@ -217,54 +151,67 @@ class TestFunction(unittest.TestCase):
class TestUser(unittest.TestCase): class TestUser(unittest.TestCase):
def setUp(self): USERNAME = "dhelmet@spaceballs.com"
"""remove the test user if he exists""" PASSWORD = "12345"
u = parse_rest.User("dhelmet@spaceballs.com", "12345")
try:
u.login()
u.delete()
except parse_rest.ParseError as e:
# if the user doesn't exist, that's fine
if e.message != "Invalid login":
raise
def test_user(self): def _get_user(self):
"""Test the ability to sign up, log in, and delete users""" try:
u = parse_rest.User("dhelmet@spaceballs.com", "12345") user = User.signup(self.username, self.password)
u.signup() except:
user = User.Query.get(username=self.username)
return user
# can't save or delete until it's logged in def _destroy_user(self):
self.assertRaises(parse_rest.ParseError, u.save, ()) user = self._get_logged_user()
self.assertRaises(parse_rest.ParseError, u.delete, ()) user and user.delete()
u.login() def _get_logged_user(self):
self.assertTrue(hasattr(u, "sessionToken")) if User.Query.where(username=self.username).exists():
self.assertNotEqual(u.sessionToken, None) return User.login(self.username, self.password)
else:
return self._get_user()
# add phone number and save def setUp(self):
u.phone = "555-5555" self.username = TestUser.USERNAME
u.save() self.password = TestUser.PASSWORD
uq = parse_rest.UserQuery() try:
u_retrieved = uq.get(u.objectId()) u = User.login(self.USERNAME, self.PASSWORD)
self.assertEqual(u.username, u_retrieved.username) except parse_rest.ResourceRequestNotFound as e:
self.assertEqual(u_retrieved.phone, "555-5555") # if the user doesn't exist, that's fine
return
u.delete()
# test UserQuery.fetch def tearDown(self):
queried_users = uq.fetch() self._destroy_user()
self.assertTrue(u.username in [qu.username for qu in queried_users])
# test accessing like a dictionary def testCanSignUp(self):
self.assertEqual(u_retrieved["username"], "dhelmet@spaceballs.com") self._destroy_user()
self.assertEqual(u_retrieved["phone"], "555-5555") user = User.signup(self.username, self.password)
self.assert_(user is not None)
# try creating another account with the same user def testCanLogin(self):
u2 = parse_rest.User("dhelmet@spaceballs.com", "12345") self._get_user() # User should be created here.
self.assertRaises(parse_rest.ParseError, u2.signup) user = User.login(self.username, self.password)
self.assert_(user.is_authenticated(), 'Login failed')
# time to delete def testCanUpdate(self):
u.delete() user = self._get_logged_user()
phone_number = '555-5555'
# add phone number and save
user.phone = phone_number
user.save()
self.assert_(User.Query.where(phone=phone_number).exists(),
'Failed to update user data. New info not on Parse')
def testCanQueryBySession(self):
User.signup(self.username, self.password)
logged = User.login(self.username, self.password)
queried = User.Query.where(sessionToken=logged.sessionToken).get()
self.assert_(queried.objectId == logged.objectId,
'Could not find user %s by session' % logged.username)
if __name__ == "__main__": if __name__ == "__main__":
# command line # command line
......
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from __init__ import ParseResource, API_ROOT
from __init__ import ResourceRequestLoginRequired, ResourceRequestBadRequest
from query import QueryManager
def login_required(func):
'''decorator describing User methods that need to be logged in'''
def ret(obj, *args, **kw):
if not hasattr(obj, 'sessionToken'):
message = '%s requires a logged-in session' % func.__name__
raise ResourceRequestLoginRequired(message)
func(obj, *args, **kw)
return ret
class User(ParseResource):
'''
A User is like a regular Parse object (can be modified and saved) but
it requires additional methods and functionality
'''
ENDPOINT_ROOT = '/'.join([API_ROOT, 'users'])
PROTECTED_ATTRIBUTES = ParseResource.PROTECTED_ATTRIBUTES + [
'username', 'sessionToken']
def is_authenticated(self):
return getattr(self, 'sessionToken', None) or False
@login_required
def save(self):
session_header = {'X-Parse-Session-Token': self.sessionToken}
return self.__class__.PUT(
self._absolute_url,
extra_headers=session_header,
**self._to_native())
@login_required
def delete(self):
session_header = {'X-Parse-Session-Token': self.sessionToken}
return self.DELETE(self._absolute_url, extra_headers=session_header)
@staticmethod
def signup(username, password, **kw):
return User(**User.POST('', username=username, password=password,
**kw))
@staticmethod
def login(username, password):
login_url = '/'.join([API_ROOT, 'login'])
return User(**User.GET(login_url, username=username,
password=password))
@staticmethod
def request_password_reset(email):
'''Trigger Parse's Password Process. Return True/False
indicate success/failure on the request'''
url = '/'.join([API_ROOT, 'requestPasswordReset'])
try:
User.POST(url, email=email)
return True
except Exception, why:
return False
User.Query = QueryManager(User)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment