Skip to content

Commit 3696a90

Browse files
committed
S3 streaming
1 parent 7f95af6 commit 3696a90

File tree

2 files changed

+264
-171
lines changed

2 files changed

+264
-171
lines changed

fs_s3fs/_s3fs.py

Lines changed: 19 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,14 @@
88
from datetime import datetime
99
import io
1010
import itertools
11-
import os
1211
from ssl import SSLError
13-
import tempfile
1412
import threading
1513
import mimetypes
1614

1715
import boto3
1816
from botocore.exceptions import ClientError, EndpointConnectionError
1917

2018
import six
21-
from six import text_type
2219

2320
from fs import ResourceType
2421
from fs.base import FS
@@ -29,6 +26,8 @@
2926
from fs.path import basename, dirname, forcedir, join, normpath, relpath
3027
from fs.time import datetime_to_epoch
3128

29+
from ._s3fs_file import S3InputFile, S3OutputFile
30+
3231

3332
def _make_repr(class_name, *args, **kwargs):
3433
"""
@@ -57,115 +56,6 @@ def __repr__(self):
5756
return "{}({})".format(class_name, ", ".join(arguments))
5857

5958

60-
class S3File(io.IOBase):
61-
"""Proxy for a S3 file."""
62-
63-
@classmethod
64-
def factory(cls, filename, mode, on_close):
65-
"""Create a S3File backed with a temporary file."""
66-
_temp_file = tempfile.TemporaryFile()
67-
proxy = cls(_temp_file, filename, mode, on_close=on_close)
68-
return proxy
69-
70-
def __repr__(self):
71-
return _make_repr(
72-
self.__class__.__name__, self.__filename, text_type(self.__mode)
73-
)
74-
75-
def __init__(self, f, filename, mode, on_close=None):
76-
self._f = f
77-
self.__filename = filename
78-
self.__mode = mode
79-
self._on_close = on_close
80-
81-
def __enter__(self):
82-
return self
83-
84-
def __exit__(self, exc_type, exc_value, traceback):
85-
self.close()
86-
87-
@property
88-
def raw(self):
89-
return self._f
90-
91-
def close(self):
92-
if self._on_close is not None:
93-
self._on_close(self)
94-
95-
@property
96-
def closed(self):
97-
return self._f.closed
98-
99-
def fileno(self):
100-
return self._f.fileno()
101-
102-
def flush(self):
103-
return self._f.flush()
104-
105-
def isatty(self):
106-
return self._f.asatty()
107-
108-
def readable(self):
109-
return self.__mode.reading
110-
111-
def readline(self, limit=-1):
112-
return self._f.readline(limit)
113-
114-
def readlines(self, hint=-1):
115-
if hint == -1:
116-
return self._f.readlines(hint)
117-
else:
118-
size = 0
119-
lines = []
120-
for line in iter(self._f.readline, b""):
121-
lines.append(line)
122-
size += len(line)
123-
if size > hint:
124-
break
125-
return lines
126-
127-
def seek(self, offset, whence=os.SEEK_SET):
128-
if whence not in (os.SEEK_CUR, os.SEEK_END, os.SEEK_SET):
129-
raise ValueError("invalid value for 'whence'")
130-
self._f.seek(offset, whence)
131-
return self._f.tell()
132-
133-
def seekable(self):
134-
return True
135-
136-
def tell(self):
137-
return self._f.tell()
138-
139-
def writable(self):
140-
return self.__mode.writing
141-
142-
def writelines(self, lines):
143-
return self._f.writelines(lines)
144-
145-
def read(self, n=-1):
146-
if not self.__mode.reading:
147-
raise IOError("not open for reading")
148-
return self._f.read(n)
149-
150-
def readall(self):
151-
return self._f.readall()
152-
153-
def readinto(self, b):
154-
return self._f.readinto()
155-
156-
def write(self, b):
157-
if not self.__mode.writing:
158-
raise IOError("not open for reading")
159-
self._f.write(b)
160-
return len(b)
161-
162-
def truncate(self, size=None):
163-
if size is None:
164-
size = self._f.tell()
165-
self._f.truncate(size)
166-
return size
167-
168-
16959
@contextlib.contextmanager
17060
def s3errors(path):
17161
"""Translate S3 errors to FSErrors."""
@@ -527,29 +417,18 @@ def openbin(self, path, mode="r", buffering=-1, **options):
527417
_path = self.validatepath(path)
528418
_key = self._path_to_key(_path)
529419

530-
if _mode.create:
420+
if _mode.appending:
421+
raise errors.ResourceError(path, msg="append mode is not supported")
531422

532-
def on_close_create(s3file):
533-
"""Called when the S3 file closes, to upload data."""
423+
if _mode.create:
424+
if self.strict:
534425
try:
535-
s3file.raw.seek(0)
536-
with s3errors(path):
537-
self.client.upload_fileobj(
538-
s3file.raw,
539-
self._bucket_name,
540-
_key,
541-
ExtraArgs=self._get_upload_args(_key),
542-
)
543-
finally:
544-
s3file.raw.close()
545-
546-
try:
547-
dir_path = dirname(_path)
548-
if dir_path != "/":
549-
_dir_key = self._path_to_dir_key(dir_path)
550-
self._get_object(dir_path, _dir_key)
551-
except errors.ResourceNotFound:
552-
raise errors.ResourceNotFound(path)
426+
dir_path = dirname(_path)
427+
if dir_path != "/":
428+
_dir_key = self._path_to_dir_key(dir_path)
429+
self._get_object(dir_path, _dir_key)
430+
except errors.ResourceNotFound:
431+
raise errors.ResourceNotFound(path)
553432

554433
try:
555434
info = self._getinfo(path)
@@ -561,50 +440,19 @@ def on_close_create(s3file):
561440
if info.is_dir:
562441
raise errors.FileExpected(path)
563442

564-
s3file = S3File.factory(path, _mode, on_close=on_close_create)
565-
if _mode.appending:
566-
try:
567-
with s3errors(path):
568-
self.client.download_fileobj(
569-
self._bucket_name,
570-
_key,
571-
s3file.raw,
572-
ExtraArgs=self.download_args,
573-
)
574-
except errors.ResourceNotFound:
575-
pass
576-
else:
577-
s3file.seek(0, os.SEEK_END)
578-
579-
return s3file
443+
obj = self.s3.Object(self._bucket_name, _key)
444+
return S3OutputFile(
445+
obj,
446+
upload_kwargs=self._get_upload_args(_key)
447+
)
580448

581449
if self.strict:
582450
info = self.getinfo(path)
583451
if info.is_dir:
584452
raise errors.FileExpected(path)
585453

586-
def on_close(s3file):
587-
"""Called when the S3 file closes, to upload the data."""
588-
try:
589-
if _mode.writing:
590-
s3file.raw.seek(0, os.SEEK_SET)
591-
with s3errors(path):
592-
self.client.upload_fileobj(
593-
s3file.raw,
594-
self._bucket_name,
595-
_key,
596-
ExtraArgs=self._get_upload_args(_key),
597-
)
598-
finally:
599-
s3file.raw.close()
600-
601-
s3file = S3File.factory(path, _mode, on_close=on_close)
602-
with s3errors(path):
603-
self.client.download_fileobj(
604-
self._bucket_name, _key, s3file.raw, ExtraArgs=self.download_args
605-
)
606-
s3file.seek(0, os.SEEK_SET)
607-
return s3file
454+
obj = self.s3.Object(self._bucket_name, _key)
455+
return S3InputFile(obj)
608456

609457
def remove(self, path):
610458
self.check()

0 commit comments

Comments
 (0)