 ]
 
 from collections import OrderedDict
+from concurrent.futures import ThreadPoolExecutor
+from os import cpu_count
 from typing import Any, Callable, Optional, Sequence, Tuple, TypeVar, Union
-from urllib.parse import urlencode
-from uuid import uuid4
 
 from arango.connection import Connection
 from arango.exceptions import (
     AsyncExecuteError,
-    BatchExecuteError,
     BatchStateError,
     OverloadControlExecutorError,
     TransactionAbortError,
 from arango.request import Request
 from arango.response import Response
 from arango.typings import Fields, Json
-from arango.utils import suppress_warning
 
 ApiExecutor = Union[
     "DefaultApiExecutor",
@@ -126,35 +124,29 @@ class BatchApiExecutor:
         If set to False, API executions return None and no results are tracked
         client-side.
     :type return_result: bool
+    :param max_workers: Use a thread pool of at most `max_workers` threads. If
+        None, the number of CPUs is used instead. For backwards compatibility
+        the parameter defaults to 1, which effectively keeps execution
+        single-threaded.
+    :type max_workers: Optional[int]
     """
 
-    def __init__(self, connection: Connection, return_result: bool) -> None:
+    def __init__(
+        self,
+        connection: Connection,
+        return_result: bool,
+        max_workers: Optional[int] = 1,
+    ) -> None:
         self._conn = connection
         self._return_result: bool = return_result
         self._queue: OrderedDict[str, Tuple[Request, BatchJob[Any]]] = OrderedDict()
         self._committed: bool = False
+        self._max_workers: int = max_workers or cpu_count()  # type: ignore
 
     @property
     def context(self) -> str:
         return "batch"
 
-    def _stringify_request(self, request: Request) -> str:
-        path = request.endpoint
-
-        if request.params is not None:
-            path += f"?{urlencode(request.params)}"
-        buffer = [f"{request.method} {path} HTTP/1.1"]
-
-        if request.headers is not None:
-            for key, value in sorted(request.headers.items()):
-                buffer.append(f"{key}: {value}")
-
-        if request.data is not None:
-            serialized = self._conn.serialize(request.data)
-            buffer.append("\r\n" + serialized)
-
-        return "\r\n".join(buffer)
-
     @property
     def jobs(self) -> Optional[Sequence[BatchJob[Any]]]:
         """Return the queued batch jobs.
@@ -190,7 +182,7 @@ def execute(
         return job if self._return_result else None
 
     def commit(self) -> Optional[Sequence[BatchJob[Any]]]:
-        """Execute the queued requests in a single batch API request.
+        """Execute the queued requests in a batch of requests.
 
         If **return_result** parameter was set to True during initialization,
         :class:`arango.job.BatchJob` instances are populated with results.
@@ -199,9 +191,7 @@ def commit(self) -> Optional[Sequence[BatchJob[Any]]]:
199191 False during initialization.
200192 :rtype: [arango.job.BatchJob] | None
201193 :raise arango.exceptions.BatchStateError: If batch state is invalid
202- (e.g. batch was already committed or size of response from server
203- did not match the expected).
204- :raise arango.exceptions.BatchExecuteError: If commit fails.
194+ (e.g. batch was already committed).
205195 """
206196 if self ._committed :
207197 raise BatchStateError ("batch already committed" )
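Aside (not part of the commit): a minimal, hypothetical illustration of the `BatchStateError` guard documented above. It assumes `commit()` marks the batch as committed before the empty-queue early return, as in the surrounding code; the `None` connection is a stand-in that an empty queue never touches.

```python
from arango.exceptions import BatchStateError
from arango.executor import BatchApiExecutor

executor = BatchApiExecutor(connection=None, return_result=True)  # type: ignore
executor.commit()      # first commit: queue is empty, returns immediately
try:
    executor.commit()  # second commit: the state guard fires
except BatchStateError as err:
    print(err)         # batch already committed
```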
@@ -211,65 +201,18 @@ def commit(self) -> Optional[Sequence[BatchJob[Any]]]:
         if len(self._queue) == 0:
             return self.jobs
 
-        # Boundary used for multipart request
-        boundary = uuid4().hex
+        with ThreadPoolExecutor(
+            max_workers=min(self._max_workers, len(self._queue))
+        ) as executor:
+            for req, job in self._queue.values():
+                job._future = executor.submit(self._conn.send_request, req)
 
-        # Build the batch request payload
-        buffer = []
-        for req, job in self._queue.values():
-            buffer.append(f"--{boundary}")
-            buffer.append("Content-Type: application/x-arango-batchpart")
-            buffer.append(f"Content-Id: {job.id}")
-            buffer.append("\r\n" + self._stringify_request(req))
-        buffer.append(f"--{boundary}--")
-
-        request = Request(
-            method="post",
-            endpoint="/_api/batch",
-            headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
-            data="\r\n".join(buffer),
-        )
-        with suppress_warning("requests.packages.urllib3.connectionpool"):
-            resp = self._conn.send_request(request)
-
-        if not resp.is_success:
-            raise BatchExecuteError(resp, request)
+        for _, job in self._queue.values():
+            job._status = "done"
 
         if not self._return_result:
             return None
 
-        url_prefix = resp.url.strip("/_api/batch")
-        raw_responses = resp.raw_body.split(f"--{boundary}")[1:-1]
-
-        if len(self._queue) != len(raw_responses):
-            raise BatchStateError(
-                "expecting {} parts in batch response but got {}".format(
-                    len(self._queue), len(raw_responses)
-                )
-            )
-        for raw_resp in raw_responses:
-            # Parse and breakdown the batch response body
-            resp_parts = raw_resp.strip().split("\r\n")
-            raw_content_id = resp_parts[1]
-            raw_body = resp_parts[-1]
-            raw_status = resp_parts[3]
-            job_id = raw_content_id.split(" ")[1]
-            _, status_code, status_text = raw_status.split(" ", 2)
-
-            # Update the corresponding batch job
-            queued_req, queued_job = self._queue[job_id]
-
-            queued_job._status = "done"
-            resp = Response(
-                method=queued_req.method,
-                url=url_prefix + queued_req.endpoint,
-                headers={},
-                status_code=int(status_code),
-                status_text=status_text,
-                raw_body=raw_body,
-            )
-            queued_job._response = self._conn.prep_response(resp)
-
         return self.jobs
 
 
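To see the fan-out/fan-in pattern the new `commit()` relies on in isolation, here is a standalone sketch; `send_request` and `Job` are illustrative stand-ins, not python-arango API.

```python
from collections import OrderedDict
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional, Tuple

def send_request(req: str) -> str:
    return f"response:{req}"  # stand-in for Connection.send_request

class Job:
    def __init__(self) -> None:
        self._future: Optional[Future] = None
        self._status = "pending"

queue: "OrderedDict[str, Tuple[str, Job]]" = OrderedDict(
    (f"job{i}", (f"req{i}", Job())) for i in range(10)
)
max_workers = 4

# Cap the pool at the queue size so small batches do not spawn idle threads.
with ThreadPoolExecutor(max_workers=min(max_workers, len(queue))) as executor:
    for req, job in queue.values():
        job._future = executor.submit(send_request, req)

# Exiting the with-block calls shutdown(wait=True), so every future has
# resolved by this point and jobs can be marked done without polling.
for _, job in queue.values():
    job._status = "done"
    print(job._future.result())
```

Because `ThreadPoolExecutor.__exit__` waits for all submitted work, the diff can flip every job to `"done"` right after the block; any per-request exception surfaces later through `Future.result()` when the job's result is read.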