Commit 8a7ae4fc by Oleg Marshev Committed by Tim Babych

TNL-901 Add status endpoints.

parent 162bfb5e
...@@ -5,3 +5,6 @@ ...@@ -5,3 +5,6 @@
.coverage .coverage
.tox .tox
coverage/ coverage/
# Sqlite Database
*.db
...@@ -8,5 +8,4 @@ urlpatterns = patterns( ...@@ -8,5 +8,4 @@ urlpatterns = patterns(
url(r'^annotations/$', AnnotationListView.as_view(), name='annotations'), url(r'^annotations/$', AnnotationListView.as_view(), name='annotations'),
url(r'^annotations/(?P<annotation_id>[a-zA-Z0-9_-]+)/?$', AnnotationDetailView.as_view(), name='annotations_detail'), url(r'^annotations/(?P<annotation_id>[a-zA-Z0-9_-]+)/?$', AnnotationDetailView.as_view(), name='annotations_detail'),
url(r'^search/$', AnnotationSearchView.as_view(), name='annotations_search'), url(r'^search/$', AnnotationSearchView.as_view(), name='annotations_search'),
url(r'^status/$', RedirectView.as_view(url=reverse_lazy('status')), name='status'),
) )
import datetime
import base64
from mock import patch, Mock
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from rest_framework.test import APITestCase from rest_framework.test import APITestCase
from elasticsearch.exceptions import TransportError
class OperationalEndpointsTest(APITestCase): class OperationalEndpointsTest(APITestCase):
""" """
Tests for operational endpoints. Tests for operational endpoints.
""" """
def test_status(self): def test_heartbeat(self):
""" """
Test if server is alive. Heartbeat endpoint success.
""" """
response = self.client.get(reverse('status')) response = self.client.get(reverse('heartbeat'))
self.assertEquals(response.status_code, 200) self.assertEquals(response.status_code, 200)
self.assertEquals(response.data, {"OK": True})
@patch('annotator.elasticsearch.ElasticSearch.conn')
def test_heartbeat_failure(self, mocked_conn):
"""
Elasticsearch is not reachable.
"""
mocked_conn.ping.return_value = False
response = self.client.get(reverse('heartbeat'))
self.assertEquals(response.status_code, 500)
self.assertEquals(response.data, {"OK": False, "check": "es"})
def test_root(self): def test_root(self):
""" """
...@@ -26,3 +40,42 @@ class OperationalEndpointsTest(APITestCase): ...@@ -26,3 +40,42 @@ class OperationalEndpointsTest(APITestCase):
"version": "1" "version": "1"
} }
) )
class OperationalAuthEndpointsTest(APITestCase):
"""
Tests for operational authenticated endpoints.
"""
def test_selftest_status(self):
"""
Test status on authorization success.
"""
response = self.client.get(reverse('selftest'))
self.assertEquals(response.status_code, 200)
@patch('notesserver.views.datetime', datetime=Mock(wraps=datetime.datetime))
@patch('annotator.elasticsearch.ElasticSearch.conn')
def test_selftest_data(self, mocked_conn, mocked_datetime):
"""
Test returned data on success.
"""
mocked_datetime.datetime.now.return_value = datetime.datetime(2014, 12, 11)
mocked_conn.info.return_value = {}
response = self.client.get(reverse('selftest'))
self.assertEquals(response.status_code, 200)
self.assertEquals(
response.data,
{
"es": {},
"time_elapsed": 0.0
}
)
@patch('annotator.elasticsearch.ElasticSearch.conn')
def test_selftest_failure(self, mocked_conn):
"""
Elasticsearch is not reachable on selftest.
"""
mocked_conn.info.side_effect = TransportError()
response = self.client.get(reverse('selftest'))
self.assertEquals(response.status_code, 500)
from django.conf.urls import patterns, url, include from django.conf.urls import patterns, url, include
from notesserver.views import StatusView
urlpatterns = patterns( urlpatterns = patterns(
'', '',
url(r'^status/$', StatusView.as_view(), name='status'), url(r'^heartbeat/$', 'notesserver.views.heartbeat', name='heartbeat'),
url(r'^selftest/$', 'notesserver.views.selftest', name='selftest'),
url(r'^$', 'notesserver.views.root', name='root'), url(r'^$', 'notesserver.views.root', name='root'),
url(r'^api/', include('notesapi.urls', namespace='api')), url(r'^api/', include('notesapi.urls', namespace='api')),
) )
import traceback
import datetime
from rest_framework import status
from rest_framework.permissions import AllowAny from rest_framework.permissions import AllowAny
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.views import APIView from rest_framework.views import APIView
from rest_framework.decorators import api_view, permission_classes from rest_framework.decorators import api_view, permission_classes, authentication_classes
from elasticsearch.exceptions import TransportError
from annotator import es
@api_view(['GET']) @api_view(['GET'])
...@@ -16,14 +22,37 @@ def root(request): # pylint: disable=unused-argument ...@@ -16,14 +22,37 @@ def root(request): # pylint: disable=unused-argument
}) })
@api_view(['GET'])
@permission_classes([AllowAny]) @permission_classes([AllowAny])
class StatusView(APIView): def heartbeat(request): # pylint: disable=unused-argument
""" """
Determine if server is alive. ElasticSearch is reachable and ready to handle requests.
""" """
if es.conn.ping():
return Response({"OK": True})
else:
return Response({"OK": False, "check": "es"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def get(self, *args, **kwargs): # pylint: disable=unused-argument @api_view(['GET'])
@permission_classes([AllowAny])
def selftest(request): # pylint: disable=unused-argument
""" """
Service status. Manual test endpoint.
""" """
return Response() start = datetime.datetime.now()
try:
es_status = es.conn.info()
except TransportError:
return Response(
{"es_error": traceback.format_exc()},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
end = datetime.datetime.now()
delta = end - start
return Response({
"es": es_status,
"time_elapsed": int(delta.total_seconds() * 1000) # In milliseconds.
})
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment