""" Utility methods related to file handling. """ from datetime import datetime import os from pytz import UTC from django.core.exceptions import PermissionDenied from django.core.files.storage import DefaultStorage, get_valid_filename from django.utils.translation import ugettext as _ from django.utils.translation import ungettext class FileValidationException(Exception): """ An exception thrown during file validation. """ pass def store_uploaded_file( request, file_key, allowed_file_types, base_storage_filename, max_file_size, validator=None, ): """ Stores an uploaded file to django file storage. Args: request (HttpRequest): A request object from which a file will be retrieved. file_key (str): The key for retrieving the file from `request.FILES`. If no entry exists with this key, a `ValueError` will be thrown. allowed_file_types (list): a list of allowable file type extensions. These should start with a period and be specified in lower-case. For example, ['.txt', '.csv']. If the uploaded file does not end with one of these extensions, a `PermissionDenied` exception will be thrown. Note that the uploaded file extension does not need to be lower-case. base_storage_filename (str): the filename to be used for the stored file, not including the extension. The same extension as the uploaded file will be appended to this value. max_file_size (int): the maximum file size in bytes that the uploaded file can be. If the uploaded file is larger than this size, a `PermissionDenied` exception will be thrown. validator (function): an optional validation method that, if defined, will be passed the stored file (which is copied from the uploaded file). This method can do validation on the contents of the file and throw a `FileValidationException` if the file is not properly formatted. If any exception is thrown, the stored file will be deleted before the exception is re-raised. Note that the implementor of the validator function should take care to close the stored file if they open it for reading. Returns: Storage: the file storage object where the file can be retrieved from str: stored_file_name: the name of the stored file (including extension) """ if file_key not in request.FILES: raise ValueError("No file uploaded with key '" + file_key + "'.") uploaded_file = request.FILES[file_key] try: file_extension = os.path.splitext(uploaded_file.name)[1].lower() if file_extension not in allowed_file_types: file_types = "', '".join(allowed_file_types) msg = ungettext( "The file must end with the extension '{file_types}'.", "The file must end with one of the following extensions: '{file_types}'.", len(allowed_file_types)).format(file_types=file_types) raise PermissionDenied(msg) if uploaded_file.size > max_file_size: msg = _("Maximum upload file size is {file_size} bytes.").format(file_size=max_file_size) raise PermissionDenied(msg) stored_file_name = base_storage_filename + file_extension file_storage = DefaultStorage() # If a file already exists with the supplied name, file_storage will make the filename unique. stored_file_name = file_storage.save(stored_file_name, uploaded_file) if validator: try: validator(file_storage, stored_file_name) except: file_storage.delete(stored_file_name) raise finally: uploaded_file.close() return file_storage, stored_file_name # pylint: disable=invalid-name def course_filename_prefix_generator(course_id, separator='_'): """ Generates a course-identifying unicode string for use in a file name. Args: course_id (object): A course identification object. Returns: str: A unicode string which can safely be inserted into a filename. """ return get_valid_filename(unicode(separator).join([course_id.org, course_id.course, course_id.run])) # pylint: disable=invalid-name def course_and_time_based_filename_generator(course_id, base_name): """ Generates a filename (without extension) based on the current time and the supplied filename. Args: course_id (object): A course identification object (must have org, course, and run). base_name (str): A name describing what type of file this is. Any characters that are not safe for filenames will be converted per django.core.files.storage.get_valid_filename (Specifically, leading and trailing spaces are removed; other spaces are converted to underscores; and anything that is not a unicode alphanumeric, dash, underscore, or dot, is removed). Returns: str: a concatenation of the org, course and run from the input course_id, the input base_name, and the current time. Note that there will be no extension. """ return u"{course_prefix}_{base_name}_{timestamp_str}".format( course_prefix=course_filename_prefix_generator(course_id), base_name=get_valid_filename(base_name), timestamp_str=datetime.now(UTC).strftime("%Y-%m-%d-%H%M%S") # pylint: disable=maybe-no-member ) class UniversalNewlineIterator(object): """ This iterable class can be used as a wrapper around a file-like object which does not inherently support being read in universal-newline mode. It returns a line at a time. """ def __init__(self, original_file, buffer_size=4096): self.original_file = original_file self.buffer_size = buffer_size def __iter__(self): return self.generate_lines() @staticmethod def sanitize(string): """ Replace CR and CRLF with LF within `string`. """ return string.replace('\r\n', '\n').replace('\r', '\n') def generate_lines(self): """ Return data from `self.original_file` a line at a time, replacing CR and CRLF with LF. """ buf = self.original_file.read(self.buffer_size) line = '' while buf: for char in buf: if line.endswith('\r') and char == '\n': last_line = line line = '' yield self.sanitize(last_line) elif line.endswith('\r') or line.endswith('\n'): last_line = line line = char yield self.sanitize(last_line) else: line += char buf = self.original_file.read(self.buffer_size) if not buf and line: yield self.sanitize(line)