1818
1919logger = logging .getLogger (__name__ )
2020
21+ REQUEST_CONFIG = configure_opensearch_bulk_settings ()
2122
2223# Cluster functions
2324
@@ -27,11 +28,15 @@ def configure_opensearch_client(url: str) -> OpenSearch:
2728
2829 Includes the appropriate AWS credentials configuration if the URL is not localhost.
2930 """
31+ logger .info ("OpenSearch request configurations: %s" , REQUEST_CONFIG )
3032 if url == "localhost" :
3133 return OpenSearch (
3234 hosts = [{"host" : url , "port" : "9200" }],
3335 http_auth = ("admin" , "admin" ),
3436 connection_class = RequestsHttpConnection ,
37+ max_retries = REQUEST_CONFIG ["OPENSEARCH_BULK_MAX_RETRIES" ],
38+ retry_on_timeout = True ,
39+ timeout = REQUEST_CONFIG ["OPENSEARCH_REQUEST_TIMEOUT" ],
3540 )
3641
3742 credentials = boto3 .Session ().get_credentials ()
@@ -43,6 +48,9 @@ def configure_opensearch_client(url: str) -> OpenSearch:
4348 use_ssl = True ,
4449 verify_certs = True ,
4550 connection_class = RequestsHttpConnection ,
51+ max_retries = REQUEST_CONFIG ["OPENSEARCH_BULK_MAX_RETRIES" ],
52+ retry_on_timeout = True ,
53+ timeout = REQUEST_CONFIG ["OPENSEARCH_REQUEST_TIMEOUT" ],
4654 )
4755
4856
@@ -315,16 +323,13 @@ def bulk_index(
315323 Returns total sums of: records created, records updated, errors, and total records
316324 processed.
317325 """
318- bulk_config = configure_opensearch_bulk_settings ()
319326 result = {"created" : 0 , "updated" : 0 , "errors" : 0 , "total" : 0 }
320327 actions = helpers .generate_bulk_actions (index , records , "index" )
321328 responses = streaming_bulk (
322329 client ,
323330 actions ,
324- max_chunk_bytes = bulk_config ["OPENSEARCH_BULK_MAX_CHUNK_BYTES" ],
325- max_retries = bulk_config ["OPENSEARCH_BULK_MAX_RETRIES" ],
331+ max_chunk_bytes = REQUEST_CONFIG ["OPENSEARCH_BULK_MAX_CHUNK_BYTES" ],
326332 raise_on_error = False ,
327- request_timeout = bulk_config ["OPENSEARCH_REQUEST_TIMEOUT" ],
328333 )
329334 for response in responses :
330335 if response [0 ] is False :
@@ -350,7 +355,6 @@ def bulk_index(
350355 logger .info ("All records ingested, refreshing index." )
351356 response = client .indices .refresh (
352357 index = index ,
353- request_timeout = bulk_config ["OPENSEARCH_REQUEST_TIMEOUT" ],
354358 )
355359 logger .debug (response )
356360 return result
0 commit comments