Commit ff98abc4 by Sofiya Semenova

Create HTTP video ingest API

parent 6023feb1
......@@ -56,4 +56,9 @@ urlpatterns = [
view=views.heartbeat,
name='heartbeat'
),
url(
r'^api/ingest_from_s3/',
view=views.IngestFromS3View.as_view(),
name='ingest_from_s3'
)
]
"""
Common utils.
"""
from rest_framework.parsers import BaseParser
from VEDA.utils import get_config
from VEDA_OS01.models import Encode, TranscriptStatus, URL, Video
......@@ -107,3 +109,16 @@ def is_video_ready(edx_id, ignore_encodes=list()):
ignore_encodes(list): A list containing the profiles that should not be considered.
"""
return set(get_incomplete_encodes(edx_id)).issubset(set(ignore_encodes))
class PlainTextParser(BaseParser):
"""
Plain text parser.
"""
media_type = 'text/plain'
def parse(self, stream, media_type=None, parser_context=None):
"""
Simply return a string representing the body of the request.
"""
return stream.read()
\ No newline at end of file
......@@ -3,6 +3,9 @@
import json
import logging
import boto
import boto.s3
from boto.exception import S3ResponseError
import requests
from django.db import connection
from django.db.utils import DatabaseError
......@@ -11,11 +14,13 @@ from django.views.decorators.csrf import csrf_exempt
from rest_framework import filters, renderers, status, viewsets
from rest_framework.decorators import (api_view, detail_route,
permission_classes)
from rest_framework.parsers import JSONParser
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
from api import token_finisher
from control.veda_file_discovery import FileDiscovery
from VEDA import utils
from VEDA_OS01.enums import TranscriptionProviderErrorType
from VEDA_OS01.models import (URL, Course, Encode, TranscriptCredentials,
......@@ -23,6 +28,7 @@ from VEDA_OS01.models import (URL, Course, Encode, TranscriptCredentials,
from VEDA_OS01.serializers import (CourseSerializer, EncodeSerializer,
URLSerializer, VideoSerializer)
from VEDA_OS01.transcripts import CIELO24_API_VERSION
from VEDA_OS01.utils import PlainTextParser
LOGGER = logging.getLogger(__name__)
......@@ -33,6 +39,12 @@ CIELO24_LOGIN_URL = utils.build_url(
'/account/login'
)
try:
boto.config.add_section('Boto')
except:
pass
boto.config.set('Boto', 'http_socket_timeout', '100')
class CourseViewSet(viewsets.ModelViewSet):
......@@ -267,6 +279,124 @@ class TranscriptCredentialsView(APIView):
return Response(status=status.HTTP_201_CREATED)
@permission_classes([AllowAny])
class IngestFromS3View(APIView):
"""
Endpoint called by Amazon SNS/SQS to ingest video from the s3 bucket.
"""
parser_classes = (JSONParser, PlainTextParser,)
def _manage_aws_sns_subscription(self, request_body, subscription_type_url):
"""
Manage HTTP endpoint subscription to SNS. There are two subscription_type_urls:
1. subscribeURL
2. unsubscribeURL
Upon receiving a request to subscribe or unsunscribe an SNS to an HTTP endpoint,
the endpoint must visit the URL provided by Amazon to confirm.
"""
url = request_body.get(subscription_type_url)
if not url:
return 400, 'Subscribe/unsubscribe URL not in request body'
requests.get(url)
return 200, ''
def _ingest_from_s3_bucket(self, request_body):
"""
Handle ingest from s3 bucket.
"""
# BUCKET_NAME = self.auth_dict['edx_s3_ingest_bucket']
status = 400
reason = ''
request_message = request_body.get('Message')
try:
message_json = json.loads(request_message)
s3_object = message_json.get('Records')[0].get('s3')
video_s3_key = s3_object.get('object').get('key')
except TypeError:
reason = 'Request message body does not contain expected output'
LOGGER.error('[HTTP INGEST] {reason}').format(
reason=reason,
)
return status, reason
if not video_s3_key:
return status, 'Video does not contain s3 key'
try:
connection = boto.connect_s3()
bucket = connection.get_bucket('sofiyajune21')
vd_key = bucket.get_key(video_s3_key)
file_discovery = FileDiscovery()
file_discovery.bucket = bucket
successful_ingest = file_discovery.validate_metadata_and_feed_to_ingest(
video_s3_key=vd_key
)
status = 200 if successful_ingest else 400
except S3ResponseError:
LOGGER.error('[DISCOVERY] S3 Ingest Connection Failure')
reason = 'S3 ingest connection failure'
return status, reason
@csrf_exempt
def post(self, request):
"""
Endpoint to handle requests from SNS.
Three types of messages can be sent:
1. A SubscriptionConfirmation - a subscription from SNS to this endpoint
2. A UnsubscribeConfirmation - unsubscribing SNS from this endpoint
3. A Notification - a request to ingest a video
"""
amazon_message_type = request.META.get('HTTP_X_AMZ_SNS_MESSAGE_TYPE')
if not amazon_message_type:
return JsonResponse(
{'Reason': 'Malformed header'},
status=400
)
if not request.data:
return JsonResponse(
{'Reason': 'Empty request body'},
status=400
)
json_data = json.loads(request.data)
if amazon_message_type == 'SubscriptionConfirmation':
status, reason = self._manage_aws_sns_subscription(json_data, 'SubscribeURL')
if status == 200:
LOGGER.info('[HTTP INGEST] SNS subscribed to HTTP endpoint')
elif amazon_message_type == 'UnsubscribeConfirmation':
status, reason = self._manage_aws_sns_subscription(json_data, 'UnsubscribeURL')
if status == 200:
LOGGER.info('[HTTP INGEST] SNS unsubscribed to HTTP endpoint')
elif amazon_message_type == 'Notification':
status, reason = self._ingest_from_s3_bucket(json_data)
if status == 200:
LOGGER.info('[HTTP INGEST] Video ingested through HTTP endpoint. Request body = {body}'.format(
body=request.data
))
else:
LOGGER.error('[HTTP INGEST] Video failed ingest through HTTP endpoint. Request body = {body}'.format(
body=request.data
))
else:
status = 400
reason = 'Unsupported or invalid amazon message type'
return JsonResponse(
{'Reason': reason},
status=status
)
@csrf_exempt
def token_auth(request):
"""
......
......@@ -290,7 +290,7 @@ class FileDiscovery(object):
if not file_downloaded:
# S3 Bucket ingest failed, move the file rejected directory.
self.move_video(video_s3_key, destination_dir=self.auth_dict['edx_s3_rejected_prefix'])
return
return False
# Prepare to ingest.
video_metadata = dict(
......@@ -322,6 +322,9 @@ class FileDiscovery(object):
if ingest.complete:
# Move the video file into 'prod-edx/processed' directory, if ingestion is complete.
self.move_video(video_s3_key, destination_dir=self.auth_dict['edx_s3_processed_prefix'])
return ingest.complete
else:
# Reject the video file and update val status to 'invalid_token'
self.reject_file_and_update_val(video_s3_key, filename, client_title, course_id)
return False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment