Skip to content

Commit 94f0b0b

Browse files
committed
[IMP] orm: add optional parallelism to iter_browse.create()
Like the same support added to `__attr__` in the parent commit, this can only be used by callers when it is known that database modifications will be distinct, not causing concurrency issues or side-effects on the results. `create` returns an `iter_browse` object for the caller to browse created records. With the multiprocessing strategy, we make the following changes to it: - To support vast amounts of created records in multiprocessing strategy, we process values in a generator and initialize the returned `iter_browse` object with it. As this requires the caller of `create` to always consume/iterate the result (otherwise records will not be created), it is not applied to the other strategies as it would break existing API. - make __iter__ yield chunks if strategy is multiprocessing. This way, a caller can process chunks of freshly created records `for records in util.iter_browse(strategy="multiprocessing").create(SQLStr)` and since everything from input to output is a generator, will be perfectly memory efficient. - do not pass the logger to the returned `iter_browse` object from `create`, if the strategy is multiprocessing, because it will lead to interleaved logging from the input generator and this one when the caller iterates it.
1 parent e219dd0 commit 94f0b0b

File tree

1 file changed

+52
-1
lines changed

1 file changed

+52
-1
lines changed

src/util/orm.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,12 @@ def _mp_iter_browse_cb(ids_or_values, params):
367367
getattr(
368368
me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
369369
)(*params["args"], **params["kwargs"])
370+
if params["mode"] == "create":
371+
new_ids = me.env[params["model_name"]].with_context(params["context"]).create(ids_or_values).ids
370372
me.env.cr.commit()
373+
if params["mode"] == "create":
374+
return new_ids
375+
return None
371376

372377

373378
class iter_browse(object):
@@ -543,8 +548,12 @@ def __iter__(self):
543548
raise RuntimeError("%r ran twice" % (self,))
544549

545550
it = chain.from_iterable(self._it)
551+
sz = self._size
552+
if self._strategy == "multiprocessing":
553+
it = self._it
554+
sz = (self._size + self._chunk_size - 1) // self._chunk_size
546555
if self._logger:
547-
it = log_progress(it, self._logger, qualifier=self._model._name, size=self._size)
556+
it = log_progress(it, self._logger, qualifier=self._model._name, size=sz)
548557
self._it = None
549558
return chain(it, self._end())
550559

@@ -626,6 +635,12 @@ def create(self, values=None, query=None, **kw):
626635
except TypeError:
627636
raise ValueError("When passing a generator of values, the size kwarg is mandatory")
628637

638+
if self._strategy == "multiprocessing":
639+
return self._create_multiprocess(values, size, multi)
640+
641+
return self._create(values, size, multi)
642+
643+
def _create(self, values, size, multi):
629644
it = chunks(values, self._chunk_size, fmt=list)
630645
if self._logger:
631646
sz = (size + self._chunk_size - 1) // self._chunk_size
@@ -651,6 +666,42 @@ def create(self, values=None, query=None, **kw):
651666
self._model, *args, chunk_size=self._chunk_size, logger=self._logger, strategy=self._strategy
652667
)
653668

669+
def _create_multiprocess(self, values, size, multi):
670+
if not multi:
671+
raise ValueError("The multiprocessing strategy only supports the multi version of `create`")
672+
673+
it = chunks(values, self._superchunk_size, fmt=list)
674+
if self._logger:
675+
sz = (size + self._superchunk_size - 1) // self._superchunk_size
676+
qualifier = "env[%r].create([:%d])" % (self._model._name, self._superchunk_size)
677+
it = log_progress(it, self._logger, qualifier=qualifier, size=sz)
678+
679+
def iter_proc():
680+
params = {
681+
"dbname": self._model.env.cr.dbname,
682+
"model_name": self._model._name,
683+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
684+
"context": dict(self._model.env.context),
685+
"mode": "create",
686+
}
687+
self._model.env.cr.commit()
688+
self._patch.start()
689+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
690+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
691+
for sub_values in it:
692+
for task_result in executor.map(
693+
_mp_iter_browse_cb, chunks(sub_values, self._chunk_size, fmt=tuple), repeat(params)
694+
):
695+
self._model.env.cr.commit() # make task_result visible on main cursor before yielding ids
696+
for new_id in task_result:
697+
yield new_id
698+
next(self._end(), None)
699+
700+
self._patch = no_selection_cache_validation()
701+
args = self._cr_uid + (iter_proc(),)
702+
kwargs = {"size": size, "chunk_size": self._chunk_size, "logger": None, "strategy": self._strategy}
703+
return iter_browse(self._model, *args, **kwargs)
704+
654705

655706
@contextmanager
656707
def custom_module_field_as_manual(env, rollback=True, do_flush=False):

0 commit comments

Comments
 (0)