Commit 84c200f2 by Gabe Mulley

Fix S3 directory reading

Only writing was broken, so don't override the HdfsTarget behavior for reading; overriding it breaks the ability to read a directory from S3 as if it were a single file.

Change-Id: Ibaf6c7f92fdda8c695b3d71a1c19aeb95c338c34
parent 3aded9ec
@@ -6,7 +6,7 @@ from fnmatch import fnmatch
 from urlparse import urlparse
 from boto.s3.key import Key
-from luigi.s3 import S3Client, AtomicS3File, ReadableS3File, FileNotFoundException
+from luigi.s3 import S3Client, AtomicS3File
 import luigi.hdfs
@@ -101,8 +101,9 @@ class RestrictedPermissionsS3Client(S3Client):
 class S3HdfsTarget(luigi.hdfs.HdfsTarget):
     """HDFS target that supports writing and reading files directly in S3."""
-    # Luigi does not support HDFS targets that point to complete URLs like "s3://foo/bar" it only supports HDFS paths
-    # that look like standard file paths "/foo/bar". Once this bug is fixed this class is no longer necessary.
+    # Luigi does not support writing to HDFS targets that point to complete URLs like "s3://foo/bar" it only supports
+    # HDFS paths that look like standard file paths "/foo/bar". Once this bug is fixed this class is no longer
+    # necessary.
     # TODO: Fix the upstream bug in luigi that prevents writing to HDFS files that are specified by complete URLs
@@ -114,13 +115,8 @@ class S3HdfsTarget(luigi.hdfs.HdfsTarget):
         if mode not in ('r', 'w'):
             raise ValueError("Unsupported open mode '{mode}'".format(mode=mode))
-        safe_path = self.path.replace('s3n://', 's3://')
         if mode == 'r':
-            s3_key = self.s3_client.get_key(safe_path)
-            if s3_key:
-                return ReadableS3File(s3_key)
-            else:
-                raise FileNotFoundException("Could not find file at %s" % safe_path)
+            return super(S3HdfsTarget, self).open(mode=mode)
         else:
+            safe_path = self.path.replace('s3n://', 's3://')
             return AtomicS3File(safe_path, self.s3_client)
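For readability, here is a sketch of how the patched open() method reads once the hunks above are applied. It is reconstructed from the diff, not copied whole from the module: the method signature (a mode='r' default) is assumed, and the real class also sets up self.s3_client elsewhere (the hunk context mentions a RestrictedPermissionsS3Client); that wiring is omitted here.

# Sketch only: S3HdfsTarget.open() as it reads after this commit, reconstructed
# from the diff above. self.s3_client is assumed to be provided elsewhere in the
# module, as in the original file.
import luigi.hdfs
from luigi.s3 import AtomicS3File


class S3HdfsTarget(luigi.hdfs.HdfsTarget):
    """HDFS target that supports writing and reading files directly in S3."""

    def open(self, mode='r'):
        if mode not in ('r', 'w'):
            raise ValueError("Unsupported open mode '{mode}'".format(mode=mode))

        if mode == 'r':
            # Reads now delegate to HdfsTarget, so an S3 "directory" of part
            # files can still be read as if it were a single file.
            return super(S3HdfsTarget, self).open(mode=mode)
        else:
            # Writes still bypass HdfsTarget, which cannot write to full URLs
            # like "s3://foo/bar": normalize s3n:// to s3:// and write the
            # object atomically through the S3 client.
            safe_path = self.path.replace('s3n://', 's3://')
            return AtomicS3File(safe_path, self.s3_client)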