"""Test for async task service status"""

from django.utils import unittest
from django.test.client import Client
from django.core.urlresolvers import reverse
import json


class CeleryConfigTest(unittest.TestCase):
    """
    Test that we can get a response from Celery.

    The test requests the Celery ping status URL through the Django test
    client and validates the JSON payload that comes back.
    """

    def setUp(self):
        """
        Create a django test client and resolve the ping URL.
        """
        super(CeleryConfigTest, self).setUp()
        self.client = Client()
        self.ping_url = reverse('status.service.celery.ping')

    def test_ping(self):
        """
        Try to ping celery.

        Asserts a 200 response whose JSON body reports success, carries
        the value "pong", and includes a text task id and a positive
        float time.
        """

        # Access the service status page, which starts a delayed
        # asynchronous task
        response = self.client.get(self.ping_url)

        # HTTP response should be successful
        self.assertEqual(response.status_code, 200)

        # Expect to get a JSON-serialized dict with
        # task and time information
        result_dict = json.loads(response.content)

        # Was it successful?
        self.assertTrue(result_dict['success'])

        # We should get a "pong" message back
        self.assertEqual(result_dict['value'], "pong")

        # We don't know the other dict values exactly,
        # but we can assert that they take the right form.
        # type(u"") is `unicode` on Python 2 and `str` on Python 3, so the
        # check does not depend on the Python-2-only `unicode` builtin.
        text_type = type(u"")
        self.assertIsInstance(result_dict['task_id'], text_type)
        self.assertIsInstance(result_dict['time'], float)

        # assertGreater reports both operands on failure, unlike
        # assertTrue(a > b), which only reports "False is not true".
        self.assertGreater(result_dict['time'], 0.0)