Commit 2b5a26b0 by Raphael Lullis

Lots of changes. Too many to put in a single commit message. In short,…

Lots of changes. Too many to put in a single commit message. In short, transformed Query objects into a method that belongs to ResourceClass. Also, added classes to represent ParseTypes. It should be possible to use these transparently. Added tests\!
parent 60755bf6
......@@ -20,16 +20,126 @@ import collections
import re
import logging
from query import QueryManager
API_ROOT = 'https://api.parse.com/1'
APPLICATION_ID = ''
REST_API_KEY = ''
MASTER_KEY = ''
class ParseType(object):
@staticmethod
def convert(parse_data):
is_parse_type = isinstance(parse_data, dict) and '__type' in parse_data
if not is_parse_type: return parse_data
parse_type = parse_data['__type']
native = {
'Pointer': Pointer,
'Date': Date,
'Bytes': Binary,
'GeoPoint': GeoPoint,
'File': File,
'Relation': Relation
}.get(parse_type)
return native and native.from_native(**parse_data) or parse_data
@classmethod
def from_native(cls, **kw):
return cls(**kw)
def _to_native(self):
return self._value
class Pointer(ParseType):
@classmethod
def from_native(cls, **kw):
klass = Object.factory(kw.get('className'))
return klass.retrieve(kw.get('objectId'))
def _to_native(self):
return {
'__type': 'Pointer',
'className': self.__class__.__name__,
'objectId': self.objectId
}
class ParseBinaryDataWrapper(str):
class Relation(ParseType):
@classmethod
def from_native(cls, **kw):
pass
class Date(ParseType):
FORMAT = '%Y-%m-%dT%H:%M:%S.%f%Z'
@classmethod
def from_native(cls, **kw):
date_str = kw.get('iso', '')[:-1] + 'UTC'
return cls(datetime.datetime.strptime(date_str, Date.FORMAT))
def __init__(self, date):
self._date = date
def _to_native(self):
return {
'__type': 'Date', 'iso': self._date.isoformat()
}
class Binary(ParseType):
@classmethod
def from_native(self, **kw):
return cls(kw.get('base64', ''))
def __init__(self, encoded_string):
self._encoded = encoded_string
self._decoded = str(base64.b64decode(self._encoded))
def _to_native(self):
return {'__type': 'Bytes', 'base64': self._encoded}
class GeoPoint(ParseType):
@classmethod
def from_native(self, **kw):
return cls(kw.get('latitude'), kw.get('longitude'))
def __init__(self, latitude, longitude):
self.latitude = latitude
self.longitude = longitude
def _to_native(self):
return {
'__type': 'GeoPoint',
'latitude': self.latitude,
'longitude': self.longitude
}
class File(ParseType):
@classmethod
def from_native(self, **kw):
return cls(kw.get('url'), kw.get('name'))
def __init__(self, url, name):
request = urllib2.Request(url)
self._name = name
self._url = url
self._file = urllib2.urlopen(request)
class ParseBase(object):
ENDPOINT_ROOT = API_ROOT
......@@ -48,16 +158,21 @@ class ParseBase(object):
request = urllib2.Request(url, data, headers)
request.add_header('Content-type', 'application/json')
#auth_header = "Basic %s" % base64.b64encode('%s:%s' %
# (APPLICATION_ID, REST_API_KEY))
#request.add_header("Authorization", auth_header)
request.add_header("X-Parse-Application-Id", APPLICATION_ID)
request.add_header("X-Parse-REST-API-Key", REST_API_KEY)
request.get_method = lambda: http_verb
# TODO: add error handling for server response
try:
response = urllib2.urlopen(request)
except urllib2.HTTPError, e:
raise {
400: ResourceRequestBadRequest,
401: ResourceRequestLoginRequired,
403: ResourceRequestForbidden,
404: ResourceRequestNotFound
}.get(e.code, ParseError)
return json.loads(response.read())
@classmethod
......@@ -76,46 +191,6 @@ class ParseBase(object):
def DELETE(cls, uri, **kw):
return cls.execute(uri, 'DELETE', **kw)
@property
def _attributes(self):
# return "public" attributes converted to the base parse representation
return dict([
self._convertToParseType(p) for p in self.__dict__.items()
if p[0][0] != '_'
])
def _isGeoPoint(self, value):
if isinstance(value, str):
return re.search('\\bPOINT\\(([-+]?[0-9]*\\.?[0-9]*) ' +
'([-+]?[0-9]*\\.?[0-9]*)\\)', value, re.I)
def _ISO8601ToDatetime(self, date_string):
# TODO: verify correct handling of timezone
date_string = date_string[:-1] + 'UTC'
return datetime.datetime.strptime(date_string,
'%Y-%m-%dT%H:%M:%S.%f%Z')
def _convertToParseType(self, prop):
key, value = prop
if type(value) == Object:
value = {'__type': 'Pointer',
'className': value._class_name,
'objectId': value._object_id}
elif type(value) == datetime.datetime:
# take off the last 3 digits and add a Z
value = {'__type': 'Date', 'iso': value.isoformat()[:-3] + 'Z'}
elif type(value) == ParseBinaryDataWrapper:
value = {'__type': 'Bytes',
'base64': base64.b64encode(value)}
elif self._isGeoPoint(value):
coordinates = re.findall('[-+]?[0-9]+\\.?[0-9]*', value)
value = {'__type': 'GeoPoint',
'longitude': float(coordinates[0]),
'latitude': float(coordinates[1])}
return (key, value)
class Function(ParseBase):
ENDPOINT_ROOT = "/".join((API_ROOT, "functions"))
......@@ -128,380 +203,148 @@ class Function(ParseBase):
class ParseResource(ParseBase):
def __init__(self, **kw):
self._object_id = kw.pop('objectId', None)
self._updated_at = kw.pop('updatedAt', None)
self._created_at = kw.pop('createdAt', None)
for attr, value in kw.items():
self.__dict__[attr] = value
PROTECTED_ATTRIBUTES = ['objectId', 'createdAt', 'updatedAt']
@classmethod
def retrieve(cls, resource_id):
return cls(**cls.GET('/' + resource_id))
_absolute_url = property(lambda self: '/'.join(
[self.__class__.ENDPOINT_ROOT, self._object_id]))
def __init__(self, **kw):
for key, value in kw.items(): setattr(self, key, value)
def objectId(self):
return self._object_id
def _to_native(self):
# serializes all attributes that need to be persisted on Parse
def updatedAt(self):
return (self._updated_at and self._ISO8601ToDatetime(self._updated_at)
or None)
protected_attributes = self.__class__.PROTECTED_ATTRIBUTES
is_protected = lambda a: a in protected_attributes or a.startswith('_')
def createdAt(self):
return (self._created_at and self._ISO8601ToDatetime(self._created_at)
or None)
return dict([(k, v._to_native() if isinstance(v, ParseType) else v)
for k, v in self.__dict__.items() if not is_protected(k)
])
def _get_object_id(self):
return getattr(self, '_object_id', None)
class Installation(ParseResource):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'installations'])
def _set_object_id(self, value):
if hasattr(self, '_object_id'):
raise ValueError, 'Can not re-set object id'
self._object_id = value
def _get_updated_datetime(self):
return getattr(self, '_updated_at', None) and self._updated_at._date
class Object(ParseResource):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'classes'])
def _set_updated_datetime(self, value):
self._updated_at = Date(value)
def __init__(self, class_name, attrs_dict=None):
self._class_name = class_name
self._object_id = None
self._updated_at = None
self._created_at = None
def _get_created_datetime(self):
return getattr(self, '_created_at', None) and self._created_at._date
if attrs_dict:
self._populateFromDict(attrs_dict)
def _set_created_datetime(self, value):
self._created_at = Date(value)
def save(self):
if self._object_id:
if self.objectId:
self._update()
else:
self._create()
def delete(self):
# URL: /1/classes/<className>/<objectId>
# HTTP Verb: DELETE
self.__class__.DELETE('/%s/%s' % (self._class_name, self._object_id))
self = self.__init__(None)
def increment(self, key, amount=1):
"""
Increment one value in the object. Note that this happens immediately:
it does not wait for save() to be called
"""
uri = '/%s/%s' % (self._class_name, self._object_id)
payload = {
key: {
'__op': 'Increment',
'amount': amount
}
}
self._populateFromDict(self.__class__.execute(uri, 'PUT', **payload))
def has(self, attr):
return attr in self.__dict__
def remove(self, attr):
self.__dict__.pop(attr)
# dictionary and list-like methods
def __contains__(self, k):
return not k.startswith("_") and k in self.__dict__
def __iter__(self):
return (k for k in self.__dict__ if not k.startswith("_"))
def __getitem__(self, key):
if key.startswith("_"):
raise KeyError("Cannot access this value in this way")
return self.__dict__[key]
def __setitem__(self, key, value):
if key.startswith("_"):
raise KeyError("Cannot change this value in this way")
self.__dict__[key] = value
def items(self):
return self._attributes.items()
def refresh(self):
uri = '/%s/%s' % (self._class_name, self._object_id)
response_dict = self.__class__.execute(uri, 'GET')
self._populateFromDict(response_dict)
def _populateFromDict(self, attrs_dict):
if 'objectId' in attrs_dict:
self._object_id = attrs_dict['objectId']
del attrs_dict['objectId']
if 'createdAt' in attrs_dict:
self._created_at = attrs_dict['createdAt']
del attrs_dict['createdAt']
if 'updatedAt' in attrs_dict:
self._updated_at = attrs_dict['updatedAt']
del attrs_dict['updatedAt']
attrs_dict = dict(map(self._convertFromParseType, attrs_dict.items()))
self.__dict__.update(attrs_dict)
def _convertFromParseType(self, prop):
key, value = prop
if type(value) == dict and '__type' in value:
if value['__type'] == 'Pointer':
value = ObjectQuery(value['className']).get(value['objectId'])
elif value['__type'] == 'Date':
value = self._ISO8601ToDatetime(value['iso'])
elif value['__type'] == 'Bytes':
value = ParseBinaryDataWrapper(base64.b64decode(
value['base64']))
elif value['__type'] == 'GeoPoint':
value = 'POINT(%s %s)' % (value['longitude'],
value['latitude'])
else:
raise Exception('Invalid __type.')
return (key, value)
def _create(self):
# URL: /1/classes/<className>
# HTTP Verb: POST
uri = '/%s' % self._class_name
uri = self.__class__.ENDPOINT_ROOT
response_dict = self.__class__.POST(uri, **self._attributes)
response_dict = self.__class__.POST(uri, **self._to_native())
self._created_at = self._updated_at = response_dict['createdAt']
self._object_id = response_dict['objectId']
self.createdAt = self.updatedAt = response_dict['createdAt']
self.objectId = response_dict['objectId']
def _update(self):
# URL: /1/classes/<className>/<objectId>
# HTTP Verb: PUT
uri = '/%s/%s' % (self._class_name, self._object_id)
response_dict = self.__class__.PUT(uri, **self._attributes)
self._updated_at = response_dict['updatedAt']
response = self.__class__.PUT(self._absolute_url, **self._to_native())
self.updatedAt = response['updatedAt']
class User(Object):
"""
A User is like a regular Parse object (can be modified and saved) but
it requires additional methods and functionality
"""
ENDPOINT_ROOT = '/'.join([API_ROOT, 'users'])
def delete(self):
self.__class__.DELETE(self._absolute_url)
self.__dict__ = {}
def __init__(self, username, password=None, **kw):
"""
Initialized with a username and possibly password along with any other
attributes (name, phone number...)
"""
kw["username"] = username
kw["password"] = password
Object.__init__(self, "", kw)
_absolute_url = property(
lambda self: '/'.join([self.__class__.ENDPOINT_ROOT, self.objectId])
)
@classmethod
def retrieve(cls, resource_id):
"""retrieve a user by its ID"""
ret = cls.GET('/' + resource_id)
username = ret.pop("username")
return cls(username, **ret)
def needs_session(func):
"""decorator describing User methods that need to be logged in"""
def ret(obj, *a, **kw):
if not hasattr(obj, "sessionToken"):
raise ParseError("%s requires a logged-in session" %
(func.__name__, ))
func(obj, *a, **kw)
return ret
def signup(self, **kw):
"""same as creating an object, with handling if user already exists"""
try:
self._create()
except urllib2.HTTPError as e:
if "400" in str(e):
raise ParseError("User already exists")
else:
raise
objectId = property(_get_object_id, _set_object_id)
createdAt = property(_get_created_datetime, _set_created_datetime)
updatedAt = property(_get_updated_datetime, _set_created_datetime)
def login(self):
try:
ret = self.GET('/'.join([API_ROOT, 'login']),
username=self.username, password=self.password)
except urllib2.HTTPError:
raise ParseError("Invalid login")
# update all attributes
self._populateFromDict(ret)
@needs_session
def save(self):
session_header = {'X-Parse-Session-Token': self.sessionToken}
# remove items you can't change
save_dict = {k: v for k, v in self._attributes.items()
if k not in ["username", "password", "sessionToken"]}
return self.__class__.PUT(
self._absolute_url, extra_headers=session_header,
**save_dict)
class Object(ParseResource):
@needs_session
def delete(self):
session_header = {'X-Parse-Session-Token': self.sessionToken}
return self.DELETE("/" + self.objectId(), extra_headers=session_header)
ENDPOINT_ROOT = '/'.join([API_ROOT, 'classes'])
@classmethod
def request_password_reset(cls, email):
"""reset a user's password using his email"""
return self.POST('/'.join([API_ROOT, 'requestPasswordReset']),
email=email)
class Push(ParseResource):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'push'])
def factory(cls, class_name):
class DerivedClass(cls): pass
DerivedClass.__name__ = class_name
return DerivedClass
@classmethod
def send(cls, message, channels=None, **kw):
alert_message = {'alert': message}
targets = {}
if channels:
targets['channels'] = channels
if kw:
targets['where'] = kw
return cls.POST('', data=alert_message, **targets)
class Query(ParseBase):
def __init__(self):
self._where = collections.defaultdict(dict)
self._options = {}
def eq(self, name, value):
self._where[name] = value
return self
# It's tempting to generate the comparison functions programatically,
# but probably not worth the decrease in readability of the code.
def lt(self, name, value):
self._where[name]['$lt'] = value
return self
def lte(self, name, value):
self._where[name]['$lte'] = value
return self
def gt(self, name, value):
self._where[name]['$gt'] = value
return self
def gte(self, name, value):
self._where[name]['$gte'] = value
return self
def ne(self, name, value):
self._where[name]['$ne'] = value
return self
def order(self, order, decending=False):
# add a minus sign before the order value if decending == True
self._options['order'] = decending and ('-' + order) or order
return self
def limit(self, limit):
self._options['limit'] = limit
return self
def skip(self, skip):
self._options['skip'] = skip
return self
def get(self, object_id):
return self.__class__.QUERY_CLASS.retrieve(object_id)
def fetch(self):
# hide the single_result param of the _fetch method from the library
# user since it's only useful internally
return self._fetch()
def _fetch(self):
options = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
options.update({'where': where})
response = self.__class__.GET('', **options)
return [self.__class__.QUERY_CLASS(**result)
for result in response['results']]
class ObjectQuery(Query):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'classes'])
def set_endpoint_root(cls):
root = '/'.join([API_ROOT, 'classes', cls.__name__])
if cls.ENDPOINT_ROOT != root:
cls.ENDPOINT_ROOT = root
return cls.ENDPOINT_ROOT
def __new__(cls, *args, **kw):
cls.set_endpoint_root()
manager = getattr(cls, 'Query', QueryManager(cls))
if not (hasattr(cls, 'Query') and manager.model_class is cls):
cls.Query = manager
return ParseResource.__new__(cls)
def __init__(self, class_name):
self._class_name = class_name
self._where = collections.defaultdict(dict)
self._options = {}
self._object_id = ''
def get(self, object_id):
self._object_id = object_id
return self._fetch(single_result=True)
def _fetch(self, single_result=False):
# URL: /1/classes/<className>/<objectId>
# HTTP Verb: GET
@property
def _absolute_url(self):
if not self.objectId: return None
if self._object_id:
response = self.__class__.GET('/%s/%s' % (self._class_name,
self._object_id))
else:
options = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
options.update({'where': where})
return '/'.join([self.__class__.ENDPOINT_ROOT, self.objectId])
response = self.__class__.GET('/%s' % self._class_name, **options)
def increment(self, key, amount=1):
"""
Increment one value in the object. Note that this happens immediately:
it does not wait for save() to be called
"""
payload = {
key: {
'__op': 'Increment',
'amount': amount
}
}
self.__class__.PUT(self._absolute_url, **payload)
self.__dict__[key] += amount
if single_result:
return Object(self._class_name, response)
else:
return [Object(self._class_name, result)
for result in response['results']]
class ParseError(Exception):
'''Base exceptions from requests made to Parse'''
pass
class UserQuery(Query):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'users'])
QUERY_CLASS = User
def __init__(self):
"""UserQuery doesn't need a class name to be passed"""
class ResourceRequestBadRequest(ParseError):
'''Request returns a 400'''
pass
class InstallationQuery(Query):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'installations'])
QUERY_CLASS = Installation
class ResourceRequestLoginRequired(ParseError):
'''Request returns a 401'''
pass
def _fetch(self):
options = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
options.update({'where': where})
extra_headers = {'X-Parse-Master-Key': MASTER_KEY}
response = self.__class__.GET('', extra_headers=extra_headers,
**options)
return [self.__class__.QUERY_CLASS(**result)
for result in response['results']]
class ResourceRequestForbidden(ParseError):
'''Request returns a 403'''
pass
class ParseError(Exception):
"""
Represents exceptions coming from Parse (e.g. invalid login or signup)
"""
class ResourceRequestNotFound(ParseError):
'''Request returns a 404'''
pass
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import __init__ as parse_rest
from __init__ import API_ROOT, ParseResource
from query import QueryManager, Query
class InstallationManager(QueryManager):
def __init__(self):
self._model_class = Installation
class InstallationQuery(Query):
def _fetch(self):
opts = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
opts.update({'where': where})
extra_headers = {'X-Parse-Master-Key': parse_rest.MASTER_KEY}
klass = self._manager.model_class
uri = self._manager.model_class.ENDPOINT_ROOT
return [klass(**it) for it in klass.GET(uri, **options).get('results')]
class Installation(ParseResource):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'installations'])
class Push(ParseResource):
ENDPOINT_ROOT = '/'.join([API_ROOT, 'push'])
@classmethod
def send(cls, message, channels=None, **kw):
alert_message = {'alert': message}
targets = {}
if channels:
targets['channels'] = channels
if kw:
targets['where'] = kw
return cls.POST('', data=alert_message, **targets)
Installation.Query = InstallationManager()
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import collections
class QueryResourceDoesNotExist(Exception):
'''Query returned no results'''
pass
class QueryResourceMultipleResultsReturned(Exception):
'''Query was supposed to return unique result, returned more than one'''
pass
class QueryManager(object):
def __init__(self, model_class):
self.model_class = model_class
def all(self):
return Queryset(self)
def where(self, **kw):
return Queryset(self).where(**kw)
def get(self, **kw):
return Queryset(self).where(**kw).get()
class Queryset(object):
def __init__(self, manager):
self._manager = manager
self._where = collections.defaultdict(dict)
self._options = {}
def __iter__(self):
results = getattr(self, '_results', self._fetch())
self._results = results
if len(self._results) == 0:
raise StopIteration
yield self._results.pop(0)
def where(self, **kw):
for key, value in kw.items(): self.eq(key, value)
return self
def eq(self, name, value):
self._where[name] = value
return self
# It's tempting to generate the comparison functions programatically,
# but probably not worth the decrease in readability of the code.
def lt(self, name, value):
self._where[name]['$lt'] = value
return self
def lte(self, name, value):
self._where[name]['$lte'] = value
return self
def gt(self, name, value):
self._where[name]['$gt'] = value
return self
def gte(self, name, value):
self._where[name]['$gte'] = value
return self
def ne(self, name, value):
self._where[name]['$ne'] = value
return self
def order(self, order, decending=False):
# add a minus sign before the order value if decending == True
self._options['order'] = decending and ('-' + order) or order
return self
def limit(self, limit):
self._options['limit'] = limit
return self
def skip(self, skip):
self._options['skip'] = skip
return self
def exists(self):
results = self._fetch()
return len(results) > 0
def get(self):
results = self._fetch()
if len(results) == 0: raise QueryResourceDoesNotExist
if len(results) >= 2: raise QueryResourceMultipleResultsReturned
return results[0]
def _fetch(self):
options = dict(self._options) # make a local copy
if self._where:
# JSON encode WHERE values
where = json.dumps(self._where)
options.update({'where': where})
klass = self._manager.model_class
uri = self._manager.model_class.ENDPOINT_ROOT
return [klass(**it) for it in klass.GET(uri, **options).get('results')]
#!/usr/bin/env python
#-*- coding: utf-8 -*-
"""
Contains unit tests for the Python Parse REST API wrapper
"""
......@@ -9,6 +12,9 @@ import urllib2
import datetime
import __init__ as parse_rest
from __init__ import GeoPoint, Object
from user import User
try:
import settings_local
......@@ -17,9 +23,9 @@ except ImportError:
'APPLICATION_ID, REST_API_KEY, and a MASTER_KEY ' +
'to run tests.')
parse_rest.APPLICATION_ID = settings_local.APPLICATION_ID
parse_rest.REST_API_KEY = settings_local.REST_API_KEY
parse_rest.APPLICATION_ID = getattr(settings_local, 'APPLICATION_ID', '')
parse_rest.REST_API_KEY = getattr(settings_local, 'REST_API_KEY', '')
parse_rest.MASTER_KEY = getattr(settings_local, 'MASTER_KEY', '')
GLOBAL_JSON_TEXT = """{
"applications": {
......@@ -38,140 +44,68 @@ GLOBAL_JSON_TEXT = """{
"""
### FUNCTIONS ###
def test_obj(saved=False):
"""Return a test parse_rest.Object (content is from the docs)"""
ret = parse_rest.Object("GameScore")
ret.score = 1337
ret.playerName = "Sean Plott"
ret.cheatMode = False
ret.location = "POINT(-30.0 43.21)" # "POINT(30 -43.21)"
if saved:
ret.save()
return ret
### CLASSES ###
class TestObjectAndQuery(unittest.TestCase):
"""
Tests for the parse_rest.Object interface for creating and updating Parse
objects, as well as the parse_rest.ObjectQuery interface for retrieving
them
"""
def check_test_obj(self, o):
"""check that the object is consistent with the test object"""
self.assertEqual(o.objectId().__class__, unicode)
self.assertEqual(o.updatedAt().__class__, datetime.datetime)
self.assertEqual(o.createdAt().__class__, datetime.datetime)
self.assertEqual(o.score, 1337)
# TODO: str vs unicode
#self.assertEqual(o.playerName.__class__, unicode)
self.assertEqual(o.cheatMode.__class__, bool)
self.assertEqual(o.location, "POINT(-30.0 43.21)")
def test_object(self):
"""Test the creation, retrieval and updating of a Object"""
gameScore = test_obj()
gameScore.save()
self.check_test_obj(gameScore)
# retrieve a new one
query = parse_rest.ObjectQuery('GameScore')
obj1 = query.get(gameScore.objectId())
self.check_test_obj(obj1)
# now update it
current_updated = obj1.updatedAt()
obj1.score = 1000
obj1.save()
self.assertGreater(obj1.updatedAt(), current_updated)
self.assertEqual(obj1.score, 1000)
# test accessing like a dictionary
self.assertTrue("playerName" in obj1)
self.assertTrue("score" in obj1)
self.assertEqual(obj1["score"], 1000)
self.assertEqual(obj1["playerName"], "Sean Plott")
obj1["playerName"] = "Sean Scott"
self.assertEqual(obj1.playerName, "Sean Scott")
# non-existent or forbidden lookup
self.assertRaises(KeyError, obj1.__getitem__, "nosuchkey")
self.assertRaises(KeyError, obj1.__getitem__, "_class_name")
# re-retrieve it
obj2 = query.get(obj1.objectId())
self.assertEqual(obj2.score, 1000)
# change one object, check that others can be refreshed
obj2.score = 2000
obj2.save()
self.assertEqual(obj1.score, 1000)
obj1.refresh()
self.assertEqual(obj1.score, 2000)
# try removing a field
obj2.remove("score")
obj2.save()
self.assertEqual(obj2.has("score"), False)
def test_increment(self):
"""Test incrementation of fields"""
o = test_obj(True)
self.check_test_obj(o)
o.save()
o.increment("score")
self.assertEqual(o.score, 1338)
query = parse_rest.ObjectQuery("GameScore")
o2 = query.get(o.objectId())
self.assertEqual(o2.score, 1338)
# one more time
o.increment("score")
self.assertEqual(o.score, 1339)
o3 = query.get(o.objectId())
self.assertEqual(o3.score, 1339)
def test_relationship(self):
"""Test relationship between objects"""
post = parse_rest.Object("Post")
post.title = "I'm Hungry"
post.content = "Where should we go for lunch?"
post.save()
comment = parse_rest.Object("Comment")
comment.content = "Let's do Sushirrito"
comment.parent = post
comment.save()
# that should have saved both post and comment
post_id = post.objectId()
comment_id = comment.objectId()
self.assertEqual(post_id.__class__, unicode)
self.assertEqual(comment_id.__class__, unicode)
# retrieve new ones
post2 = parse_rest.ObjectQuery("Post").get(post_id)
comment2 = parse_rest.ObjectQuery("Comment").get(comment_id)
# check the relationship between the saved post and comment
self.assertEqual(comment2.parent.objectId(), post_id)
self.assertEqual(comment2.parent.title, "I'm Hungry")
def test_delete(self):
"""Test deleting an object"""
o = test_obj(True)
obj_id = o.objectId()
self.check_test_obj(o)
o2 = parse_rest.ObjectQuery("GameScore").get(obj_id)
self.check_test_obj(o2)
o2.delete()
self.assertRaises(urllib2.HTTPError,
parse_rest.ObjectQuery("GameScore").get, obj_id)
class GameScore(Object):
pass
class City(Object):
pass
class TestObject(unittest.TestCase):
def setUp(self):
self.score = GameScore(
score=1337, player_name='John Doe', cheat_mode=False
)
self.sao_paulo = City(
name='São Paulo', location=GeoPoint(-23.5, -46.6167)
)
def tearDown(self):
city_name = getattr(self.sao_paulo, 'name', None)
game_score = getattr(self.score, 'score', None)
if city_name:
for city in City.Query.where(name=city_name):
city.delete()
if game_score:
for score in GameScore.Query.where(score=game_score):
score.delete()
def testCanInitialize(self):
self.assert_(self.score.score == 1337, 'Could not set score')
def testCanInstantiateParseType(self):
self.assert_(self.sao_paulo.location.latitude == -23.5)
def testCanCreateNewObject(self):
self.score.save()
self.assert_(self.score.objectId is not None, 'Can not create object')
def testCanUpdateExistingObject(self):
self.sao_paulo.save()
self.sao_paulo.country = 'Brazil'
self.sao_paulo.save()
city = City.Query.where(name='São Paulo').get()
self.assert_(city.country == 'Brazil', 'Could not update object')
def testCanDeleteExistingObject(self):
self.score.save()
object_id = self.score.objectId
self.score.delete()
self.assert_(not GameScore.Query.where(objectId=object_id).exists(),
'Failed to delete object %s on Parse ' % self.score)
def testCanIncrementField(self):
previous_score = self.score.score
self.score.save()
self.score.increment('score')
self.assert_(GameScore.Query.where(score=previous_score + 1).exists(),
'Failed to increment score on backend')
@unittest.skip("Skipping")
class TestFunction(unittest.TestCase):
def setUp(self):
"""create and deploy cloud functions"""
......@@ -202,11 +136,15 @@ class TestFunction(unittest.TestCase):
self.assertEqual(ret["result"], u"Hello world!")
# Test the averageStars function- takes simple argument
r1 = parse_rest.Object("Review", {"movie": "The Matrix",
r1 = parse_rest.Object(
"Review", {
"movie": "The Matrix",
"stars": 5,
"comment": "Too bad they never made any sequels."})
r1.save()
r2 = parse_rest.Object("Review", {"movie": "The Matrix",
r2 = parse_rest.Object(
"Review", {
"movie": "The Matrix",
"stars": 4,
"comment": "It's OK."})
r2.save()
......@@ -217,50 +155,60 @@ class TestFunction(unittest.TestCase):
class TestUser(unittest.TestCase):
def setUp(self):
"""remove the test user if he exists"""
u = parse_rest.User("dhelmet@spaceballs.com", "12345")
USERNAME = "dhelmet@spaceballs.com"
PASSWORD = "12345"
def _get_user(self):
try:
u.login()
u.delete()
except parse_rest.ParseError as e:
# if the user doesn't exist, that's fine
if e.message != "Invalid login":
raise
def test_user(self):
"""Test the ability to sign up, log in, and delete users"""
u = parse_rest.User("dhelmet@spaceballs.com", "12345")
u.signup()
# can't save or delete until it's logged in
self.assertRaises(parse_rest.ParseError, u.save, ())
self.assertRaises(parse_rest.ParseError, u.delete, ())
u.login()
self.assertTrue(hasattr(u, "sessionToken"))
self.assertNotEqual(u.sessionToken, None)
user = User.signup(self.username, self.password)
except:
user = User.Query.get(username=self.username)
return user
# add phone number and save
u.phone = "555-5555"
u.save()
def _destroy_user(self):
user = self._get_logged_user()
user and user.delete()
def _get_logged_user(self):
if User.Query.where(username=self.username).exists():
return User.login(self.username, self.password)
else:
return self._get_user()
uq = parse_rest.UserQuery()
u_retrieved = uq.get(u.objectId())
self.assertEqual(u.username, u_retrieved.username)
self.assertEqual(u_retrieved.phone, "555-5555")
def setUp(self):
self.username = TestUser.USERNAME
self.password = TestUser.PASSWORD
def tearDown(self):
self._destroy_user()
# test accessing like a dictionary
self.assertEqual(u_retrieved["username"], "dhelmet@spaceballs.com")
self.assertEqual(u_retrieved["phone"], "555-5555")
def testCanSignUp(self):
self._destroy_user()
user = User.signup(self.username, self.password)
self.assert_(user is not None)
# try creating another account with the same user
u2 = parse_rest.User("dhelmet@spaceballs.com", "12345")
self.assertRaises(parse_rest.ParseError, u2.signup)
def testCanLogin(self):
self._get_user() # User should be created here.
user = User.login(self.username, self.password)
self.assert_(user.is_authenticated(), 'Login failed')
# time to delete
u.delete()
def testCanUpdate(self):
user = self._get_logged_user()
phone_number = '555-5555'
# add phone number and save
user.phone = phone_number
user.save()
self.assert_(User.Query.where(phone=phone_number).exists(),
'Failed to update user data. New info not on Parse')
def testCanQueryBySession(self):
User.signup(self.username, self.password)
logged = User.login(self.username, self.password)
queried = User.Query.where(sessionToken=logged.sessionToken).get()
self.assert_(queried.objectId == logged.objectId,
'Could not find user %s by session' % logged.username)
if __name__ == "__main__":
# command line
......
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from __init__ import ParseResource, API_ROOT
from __init__ import ResourceRequestLoginRequired, ResourceRequestBadRequest
from query import QueryManager
def login_required(func):
'''decorator describing User methods that need to be logged in'''
def ret(obj, *args, **kw):
if not hasattr(obj, 'sessionToken'):
message = '%s requires a logged-in session' % func.__name__
raise ResourceRequestLoginRequired(message)
func(obj, *args, **kw)
return ret
class User(ParseResource):
'''
A User is like a regular Parse object (can be modified and saved) but
it requires additional methods and functionality
'''
ENDPOINT_ROOT = '/'.join([API_ROOT, 'users'])
PROTECTED_ATTRIBUTES = ParseResource.PROTECTED_ATTRIBUTES + [
'username', 'sessionToken']
def is_authenticated(self):
return getattr(self, 'sessionToken', None) or False
@login_required
def save(self):
session_header = {'X-Parse-Session-Token': self.sessionToken}
return self.__class__.PUT(
self._absolute_url,
extra_headers=session_header,
**self._to_native())
@login_required
def delete(self):
session_header = {'X-Parse-Session-Token': self.sessionToken}
return self.DELETE(self._absolute_url, extra_headers=session_header)
@staticmethod
def signup(username, password, **kw):
return User(**User.POST('', username=username, password=password, **kw))
@staticmethod
def login(username, password):
login_url = '/'.join([API_ROOT, 'login'])
return User(**User.GET(login_url, username=username, password=password))
@staticmethod
def request_password_reset(email):
'''Trigger Parse's Password Process. Return True/False
indicate success/failure on the request'''
url = '/'.join([API_ROOT, 'requestPasswordReset'])
try:
User.POST(url, email=email)
return True
except Exception, why:
return False
User.Query = QueryManager(User)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment