Skip to content

Commit c3c615f

Browse files
authored
Merge pull request #423 from jbbarth/feat-boto3
boto2 -> boto3
2 parents 1666e92 + ae7dddb commit c3c615f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1419
-777
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ dynamic = ["version"]
3131

3232
dependencies = [
3333
"attrs",
34-
"boto>=2.49.0",
34+
"boto3>=1.28.20",
3535
"dill>=0.3.6",
3636
"diskcache>=4.1.0",
3737
"Jinja2",
@@ -49,6 +49,7 @@ dependencies = [
4949
[project.optional-dependencies]
5050
dev = [
5151
"black",
52+
"boto3-stubs[s3,swf]",
5253
"flaky",
5354
"invoke",
5455
"moto<3.0.0",

simpleflow/command.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from typing import TYPE_CHECKING
1010
from uuid import uuid4
1111

12-
import boto.connection
1312
import click
1413
import multiprocess
1514

@@ -34,24 +33,6 @@
3433
from simpleflow.swf.mapper.models.workflow import WorkflowType
3534

3635

37-
def disable_boto_connection_pooling():
38-
# boto connection pooling doesn't work very well with multiprocessing, it
39-
# provokes some errors like this:
40-
#
41-
# [Errno 1] _ssl.c:1429: error:1408F119:SSL routines:SSL3_GET_RECORD:decryption failed or bad record mac
42-
# when polling on analysis-testjbb-repair-a61ff96e854344748e308fefc9ddff61
43-
#
44-
# It's because when forking, file handles are copied and sockets are shared.
45-
# Even sockets that handle SSL conections to AWS services, but SSL
46-
# connections are stateful! So with multiple workers, it collides.
47-
#
48-
# To disable boto's connection pooling (which in practice makes boto open a
49-
# *NEW* connection for each call), we make make boto believe we run on
50-
# Google App Engine, where it disables connection pooling. There's no
51-
# "direct" setting, so that's a hack but that works.
52-
boto.connection.ON_APP_ENGINE = True
53-
54-
5536
def comma_separated_list(value):
5637
"""
5738
Transforms a comma-separated list into a list of strings.
@@ -577,8 +558,6 @@ def standalone(
577558
with a single main process.
578559
579560
"""
580-
disable_boto_connection_pooling()
581-
582561
if force_activities and not repair:
583562
raise ValueError("You should only use --force-activities with --repair.")
584563

simpleflow/metrology.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,16 +163,16 @@ def push_metrology(self, history):
163163
"""
164164
Fetch workflow history and merge it with metrology
165165
"""
166-
activity_keys = [obj for obj in storage.list_keys(settings.METROLOGY_BUCKET, self.metrology_path)]
166+
activity_keys = storage.list_keys(settings.METROLOGY_BUCKET, self.metrology_path)
167167
history_dumped = dump_history_to_json(history)
168168
history = json.loads(history_dumped)
169169

170170
for key in activity_keys:
171171
if not key.key.startswith(os.path.join(self.metrology_path, "activity.")):
172172
continue
173-
contents = key.get_contents_as_string(encoding="utf-8")
173+
contents = key.get()["Body"].read().decode("utf-8")
174174
result = json.loads(contents)
175-
search = ACTIVITY_KEY_RE.search(key.name)
175+
search = ACTIVITY_KEY_RE.search(key.key)
176176
name = search.group(1)
177177
for h in history:
178178
if h[0] == name:

simpleflow/storage.py

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,34 @@
11
from __future__ import annotations
22

3+
import io
34
from typing import TYPE_CHECKING
45

5-
from boto.exception import S3ResponseError
6-
from boto.s3 import connect_to_region, connection
7-
from boto.s3.key import Key
6+
import boto3
7+
from botocore.exceptions import ClientError
88

99
from . import logger, settings
10+
from .swf.mapper.exceptions import extract_error_code
1011

1112
if TYPE_CHECKING:
1213
from typing import Optional, Tuple # NOQA
1314

14-
from boto.s3.bucket import Bucket # NOQA
15-
from boto.s3.bucketlistresultset import BucketListResultSet # NOQA
15+
from mypy_boto3_s3.service_resource import Bucket, ObjectSummary # NOQA
1616

1717
BUCKET_CACHE = {}
1818
BUCKET_LOCATIONS_CACHE = {}
1919

2020

21-
def get_connection(host_or_region: str) -> connection.S3Connection:
21+
def get_client() -> boto3.session.Session.client:
22+
return boto3.session.Session().client("s3")
23+
24+
25+
def get_resource(host_or_region: str) -> boto3.session.Session.resource:
2226
# first case: we got a valid DNS (host)
2327
if "." in host_or_region:
24-
return connection.S3Connection(host=host_or_region)
28+
return boto3.resource("s3", endpoint_url=f"https://{host_or_region}")
2529

2630
# second case: we got a region
27-
return connect_to_region(host_or_region)
31+
return boto3.resource("s3", region_name=host_or_region)
2832

2933

3034
def sanitize_bucket_and_host(bucket: str) -> tuple[str, str]:
@@ -48,20 +52,18 @@ def sanitize_bucket_and_host(bucket: str) -> tuple[str, str]:
4852

4953
# second case: we got a bucket name, we need to figure out which region it's in
5054
try:
51-
conn0 = connection.S3Connection()
52-
bucket_obj = conn0.get_bucket(bucket, validate=False)
53-
54-
# get_location() returns a region or an empty string for us-east-1,
55-
# historically named "US Standard" in some places. Maybe other S3
56-
# calls support an empty string as region, but I prefer to be
57-
# explicit here.
58-
location = bucket_obj.get_location() or "us-east-1"
55+
# get_bucket_location() returns a region or an empty string for us-east-1,
56+
# historically named "US Standard" in some places. Maybe other S3 calls
57+
# support an empty string as region, but I prefer to be explicit here.
58+
location = get_client().get_bucket_location(Bucket=bucket)["LocationConstraint"] or "us-east-1"
5959

6060
# save location for later use
6161
BUCKET_LOCATIONS_CACHE[bucket] = location
62-
except S3ResponseError as e:
63-
if e.error_code == "AccessDenied":
62+
except ClientError as e:
63+
error_code = extract_error_code(e)
64+
if error_code == "AccessDenied":
6465
# probably not allowed to perform GetBucketLocation on this bucket
66+
# TODO: consider raising instead? who forbids GetBucketLocation anyway?
6567
logger.warning(f"Access denied while trying to get location of bucket {bucket}")
6668
location = ""
6769
else:
@@ -74,45 +76,47 @@ def sanitize_bucket_and_host(bucket: str) -> tuple[str, str]:
7476
return bucket, location
7577

7678

77-
def get_bucket(bucket_name: str) -> Bucket:
79+
def get_bucket(bucket_name: str) -> "Bucket":
7880
bucket_name, location = sanitize_bucket_and_host(bucket_name)
79-
conn = get_connection(location)
81+
s3 = get_resource(location)
8082
if bucket_name not in BUCKET_CACHE:
81-
bucket = conn.get_bucket(bucket_name, validate=False)
83+
bucket = s3.Bucket(bucket_name)
8284
BUCKET_CACHE[bucket_name] = bucket
8385
return BUCKET_CACHE[bucket_name]
8486

8587

8688
def pull(bucket: str, path: str, dest_file: str) -> None:
87-
bucket = get_bucket(bucket)
88-
key = bucket.get_key(path)
89-
key.get_contents_to_filename(dest_file)
89+
bucket_resource = get_bucket(bucket)
90+
bucket_resource.download_file(path, dest_file)
9091

9192

9293
def pull_content(bucket: str, path: str) -> str:
93-
bucket = get_bucket(bucket)
94-
key = bucket.get_key(path)
95-
return key.get_contents_as_string(encoding="utf-8")
94+
bucket_resource = get_bucket(bucket)
95+
bytes_buffer = io.BytesIO()
96+
bucket_resource.download_fileobj(path, bytes_buffer)
97+
return bytes_buffer.getvalue().decode()
9698

9799

98100
def push(bucket: str, path: str, src_file: str, content_type: str | None = None) -> None:
99-
bucket = get_bucket(bucket)
100-
key = Key(bucket, path)
101-
headers = {}
101+
bucket_resource = get_bucket(bucket)
102+
extra_args = {}
102103
if content_type:
103-
headers["content_type"] = content_type
104-
key.set_contents_from_filename(src_file, headers=headers, encrypt_key=settings.SIMPLEFLOW_S3_SSE)
104+
extra_args["ContentType"] = content_type
105+
if settings.SIMPLEFLOW_S3_SSE:
106+
extra_args["ServerSideEncryption"] = "AES256"
107+
bucket_resource.upload_file(src_file, path, ExtraArgs=extra_args)
105108

106109

107110
def push_content(bucket: str, path: str, content: str, content_type: str | None = None) -> None:
108-
bucket = get_bucket(bucket)
109-
key = Key(bucket, path)
110-
headers = {}
111+
bucket_resource = get_bucket(bucket)
112+
extra_args = {}
111113
if content_type:
112-
headers["content_type"] = content_type
113-
key.set_contents_from_string(content, headers=headers, encrypt_key=settings.SIMPLEFLOW_S3_SSE)
114+
extra_args["ContentType"] = content_type
115+
if settings.SIMPLEFLOW_S3_SSE:
116+
extra_args["ServerSideEncryption"] = "AES256"
117+
bucket_resource.upload_fileobj(io.BytesIO(content.encode()), path, ExtraArgs=extra_args)
114118

115119

116-
def list_keys(bucket: str, path: str = None) -> BucketListResultSet:
117-
bucket = get_bucket(bucket)
118-
return bucket.list(path)
120+
def list_keys(bucket: str, path: str = None) -> list["ObjectSummary"]:
121+
bucket_resource = get_bucket(bucket)
122+
return [obj for obj in bucket_resource.objects.filter(Prefix=path or "").all()]

simpleflow/swf/executor.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@
4848
# doesn't get the scheduled task while it should...
4949
@retry.with_delay(nb_times=3, delay=retry.exponential, on_exceptions=KeyError)
5050
def run_fake_activity_task(domain, task_list, result):
51-
conn = ConnectedSWFObject().connection
52-
resp = conn.poll_for_activity_task(
51+
obj = ConnectedSWFObject()
52+
resp = obj.poll_for_activity_task(
5353
domain,
5454
task_list,
5555
identity=swf_identity(),
5656
)
57-
conn.respond_activity_task_completed(
57+
obj.respond_activity_task_completed(
5858
resp["taskToken"],
5959
result,
6060
)
@@ -63,13 +63,13 @@ def run_fake_activity_task(domain, task_list, result):
6363
# Same retry condition as run_fake_activity_task
6464
@retry.with_delay(nb_times=3, delay=retry.exponential, on_exceptions=KeyError)
6565
def run_fake_child_workflow_task(domain, task_list, result=None):
66-
conn = ConnectedSWFObject().connection
67-
resp = conn.poll_for_decision_task(
66+
obj = ConnectedSWFObject()
67+
resp = obj.poll_for_decision_task(
6868
domain,
6969
task_list,
7070
identity=swf_identity(),
7171
)
72-
conn.respond_decision_task_completed(
72+
obj.respond_decision_task_completed(
7373
resp["taskToken"],
7474
decisions=[
7575
{

simpleflow/swf/mapper/actors/core.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
from __future__ import annotations
22

3-
from typing import TYPE_CHECKING
43

54
from simpleflow.swf.mapper.core import ConnectedSWFObject
65
from simpleflow.swf.mapper.models.domain import Domain
76

8-
if TYPE_CHECKING:
9-
from boto.exception import SWFResponseError
10-
117

128
class Actor(ConnectedSWFObject):
139
"""SWF Actor base class.
@@ -50,11 +46,3 @@ def stop(self):
5046
shutting down.
5147
"""
5248
raise NotImplementedError
53-
54-
def get_error_message(self, e: SWFResponseError) -> str:
55-
message = e.error_message
56-
if not message:
57-
if e.body:
58-
# Expected 'message', got 'Message' ¯\_(ツ)_/¯
59-
message = e.body.get("Message")
60-
return message

simpleflow/swf/mapper/actors/decider.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,18 @@
22

33
from typing import TYPE_CHECKING, Any
44

5-
import boto.exception
5+
from botocore.exceptions import ClientError
66

77
from simpleflow import format, logging_context
88
from simpleflow.utils import json_dumps
99
from simpleflow.swf.mapper.actors.core import Actor
10-
from simpleflow.swf.mapper.exceptions import DoesNotExistError, PollTimeout, ResponseError
10+
from simpleflow.swf.mapper.exceptions import (
11+
DoesNotExistError,
12+
PollTimeout,
13+
ResponseError,
14+
extract_error_code,
15+
extract_message,
16+
)
1117
from simpleflow.swf.mapper.models.decision.base import Decision
1218
from simpleflow.swf.mapper.models.history.base import History
1319
from simpleflow.swf.mapper.models.workflow import WorkflowExecution, WorkflowType
@@ -43,14 +49,15 @@ def complete(
4349
if execution_context is not None and not isinstance(execution_context, str):
4450
execution_context = json_dumps(execution_context)
4551
try:
46-
self.connection.respond_decision_task_completed(
52+
self.respond_decision_task_completed(
4753
task_token,
4854
decisions,
49-
format.execution_context(execution_context),
55+
execution_context=format.execution_context(execution_context),
5056
)
51-
except boto.exception.SWFResponseError as e:
52-
message = self.get_error_message(e)
53-
if e.error_code == "UnknownResourceFault":
57+
except ClientError as e:
58+
error_code = extract_error_code(e)
59+
message = extract_message(e)
60+
if error_code == "UnknownResourceFault":
5461
raise DoesNotExistError(
5562
f"Unable to complete decision task with token={task_token}",
5663
message,
@@ -79,7 +86,7 @@ def poll(self, task_list=None, identity=None, **kwargs):
7986
logging_context.reset()
8087
task_list = task_list or self.task_list
8188

82-
task = self.connection.poll_for_decision_task(
89+
task = self.poll_for_decision_task(
8390
self.domain.name,
8491
task_list=task_list,
8592
identity=format.identity(identity),
@@ -97,16 +104,17 @@ def poll(self, task_list=None, identity=None, **kwargs):
97104
next_page = task.get("nextPageToken")
98105
while next_page:
99106
try:
100-
task = self.connection.poll_for_decision_task(
107+
task = self.poll_for_decision_task(
101108
self.domain.name,
102109
task_list=task_list,
103110
identity=format.identity(identity),
104111
next_page_token=next_page,
105112
**kwargs,
106113
)
107-
except boto.exception.SWFResponseError as e:
108-
message = self.get_error_message(e)
109-
if e.error_code == "UnknownResourceFault":
114+
except ClientError as e:
115+
error_code = extract_error_code(e)
116+
message = extract_message(e)
117+
if error_code == "UnknownResourceFault":
110118
raise DoesNotExistError(
111119
"Unable to poll decision task",
112120
message,

0 commit comments

Comments
 (0)