Skip to content

Commit e219dd0

Browse files
committed
[IMP] orm: add optional parallelism to iter_browse.__getattr__()
In some cases — e.g. when it is known that calling a certain method on the model will only trigger inserts, or when it is clear that the updates will be disjoint — such method calls can be done in parallel.
1 parent 01d0fba commit e219dd0

File tree

1 file changed

+99
-13
lines changed

1 file changed

+99
-13
lines changed

src/util/orm.py

Lines changed: 99 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,23 @@
99
on this module work along the ORM of *all* supported versions.
1010
"""
1111

12+
import collections
1213
import logging
14+
import multiprocessing
15+
import os
1316
import re
17+
import sys
1418
import uuid
1519
from contextlib import contextmanager
1620
from functools import wraps
17-
from itertools import chain
21+
from itertools import chain, repeat
1822
from textwrap import dedent
1923

24+
try:
25+
from concurrent.futures import ProcessPoolExecutor
26+
except ImportError:
27+
ProcessPoolExecutor = None
28+
2029
try:
2130
from unittest.mock import patch
2231
except ImportError:
@@ -28,9 +37,9 @@
2837
except ImportError:
2938
from odoo import SUPERUSER_ID
3039
from odoo import fields as ofields
31-
from odoo import modules, release
40+
from odoo import modules, release, sql_db
3241
except ImportError:
33-
from openerp import SUPERUSER_ID, modules, release
42+
from openerp import SUPERUSER_ID, modules, release, sql_db
3443

3544
try:
3645
from openerp import fields as ofields
@@ -42,8 +51,8 @@
4251
from .const import BIG_TABLE_THRESHOLD
4352
from .exceptions import MigrationError
4453
from .helpers import table_of_model
45-
from .misc import chunks, log_progress, version_between, version_gte
46-
from .pg import SQLStr, column_exists, format_query, get_columns, named_cursor
54+
from .misc import chunks, log_progress, str2bool, version_between, version_gte
55+
from .pg import SQLStr, column_exists, format_query, get_columns, get_max_workers, named_cursor
4756

4857
# python3 shims
4958
try:
@@ -53,6 +62,10 @@
5362

5463
_logger = logging.getLogger(__name__)
5564

65+
UPG_PARALLEL_ITER_BROWSE = str2bool(os.environ.get("UPG_PARALLEL_ITER_BROWSE", "0"))
66+
# FIXME: for CI! Remove before merge
67+
UPG_PARALLEL_ITER_BROWSE = True
68+
5669

5770
def env(cr):
5871
"""
@@ -342,6 +355,21 @@ def get_ids():
342355
cr.execute("DROP TABLE IF EXISTS _upgrade_rf")
343356

344357

358+
def _mp_iter_browse_cb(ids_or_values, params):
359+
me = _mp_iter_browse_cb
360+
# init upon first call. Done here instead of initializer callback, because py3.6 doesn't have it
361+
if not hasattr(me, "env"):
362+
sql_db._Pool = None # children cannot borrow from copies of the same pool, it will cause protocol error
363+
me.env = env(sql_db.db_connect(params["dbname"]).cursor())
364+
me.env.clear()
365+
# process
366+
if params["mode"] == "browse":
367+
getattr(
368+
me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
369+
)(*params["args"], **params["kwargs"])
370+
me.env.cr.commit()
371+
372+
345373
class iter_browse(object):
346374
"""
347375
Iterate over recordsets.
@@ -389,7 +417,19 @@ class iter_browse(object):
389417
See also :func:`~odoo.upgrade.util.orm.env`
390418
"""
391419

392-
__slots__ = ("_chunk_size", "_cr_uid", "_ids", "_it", "_logger", "_model", "_patch", "_query", "_size", "_strategy")
420+
__slots__ = (
421+
"_chunk_size",
422+
"_cr_uid",
423+
"_ids",
424+
"_it",
425+
"_logger",
426+
"_model",
427+
"_patch",
428+
"_query",
429+
"_size",
430+
"_strategy",
431+
"_superchunk_size",
432+
)
393433

394434
def __init__(self, model, *args, **kw):
395435
assert len(args) in [1, 3] # either (cr, uid, ids) or (ids,)
@@ -399,9 +439,30 @@ def __init__(self, model, *args, **kw):
399439
self._size = kw.pop("size", None)
400440
self._query = kw.pop("query", None)
401441
self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument
442+
self._superchunk_size = self._chunk_size
402443
self._logger = kw.pop("logger", _logger)
403444
self._strategy = kw.pop("strategy", "flush")
404-
assert self._strategy in {"flush", "commit"}
445+
assert self._strategy in {"flush", "commit", "multiprocessing"}
446+
if self._strategy == "multiprocessing":
447+
if not ProcessPoolExecutor:
448+
raise ValueError("multiprocessing strategy can not be used in scripts run by python2")
449+
if UPG_PARALLEL_ITER_BROWSE:
450+
self._superchunk_size = min(get_max_workers() * 10 * self._chunk_size, 1000000)
451+
else:
452+
self._strategy = "commit" # downgrade
453+
if self._size > 100000:
454+
_logger.warning(
455+
"Browsing %d %s, which may take a long time. "
456+
"This can be sped up by setting the env variable UPG_PARALLEL_ITER_BROWSE to 1. "
457+
"If you do, be sure to examine the results carefully.",
458+
self._size,
459+
self._model._name,
460+
)
461+
else:
462+
_logger.info(
463+
"Caller requested multiprocessing strategy, but UPG_PARALLEL_ITER_BROWSE env var is not set. "
464+
"Downgrading strategy to commit.",
465+
)
405466
if kw:
406467
raise TypeError("Unknown arguments: %s" % ", ".join(kw))
407468

@@ -436,7 +497,7 @@ def _ids_query(self):
436497
)
437498

438499
def get_ids():
439-
with named_cursor(cr, itersize=self._chunk_size) as ncr:
500+
with named_cursor(cr, itersize=self._superchunk_size) as ncr:
440501
ncr.execute(format_query(cr, "SELECT id FROM {} ORDER BY id", tmp_tbl))
441502
for (id_,) in ncr:
442503
yield id_
@@ -467,7 +528,7 @@ def _browse(self, ids):
467528
return self._model.browse(*args)
468529

469530
def _end(self):
470-
if self._strategy == "commit":
531+
if self._strategy in ["commit", "multiprocessing"]:
471532
self._model.env.cr.commit()
472533
else:
473534
flush(self._model)
@@ -494,15 +555,40 @@ def __getattr__(self, attr):
494555
if not callable(getattr(self._model, attr)):
495556
raise TypeError("The attribute %r is not callable" % attr)
496557

497-
it = self._it
558+
it = chunks(self._ids, self._superchunk_size, fmt=self._browse)
498559
if self._logger:
499-
sz = (self._size + self._chunk_size - 1) // self._chunk_size
500-
qualifier = "%s[:%d]" % (self._model._name, self._chunk_size)
560+
sz = (self._size + self._superchunk_size - 1) // self._superchunk_size
561+
qualifier = "%s[:%d]" % (self._model._name, self._superchunk_size)
501562
it = log_progress(it, self._logger, qualifier=qualifier, size=sz)
502563

503564
def caller(*args, **kwargs):
504565
args = self._cr_uid + args
505-
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
566+
if self._strategy != "multiprocessing":
567+
return [getattr(chnk, attr)(*args, **kwargs) for chnk in chain(it, self._end())]
568+
params = {
569+
"dbname": self._model.env.cr.dbname,
570+
"model_name": self._model._name,
571+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
572+
"context": dict(self._model.env.context),
573+
"attr_name": attr,
574+
"args": args,
575+
"kwargs": kwargs,
576+
"mode": "browse",
577+
}
578+
self._model.env.cr.commit()
579+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
580+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
581+
for chunk in it:
582+
collections.deque(
583+
executor.map(
584+
_mp_iter_browse_cb, chunks(chunk._ids, self._chunk_size, fmt=tuple), repeat(params)
585+
),
586+
maxlen=0,
587+
)
588+
next(self._end(), None)
589+
# do not return results in // mode, we expect it to be used for huge numbers of
590+
# records and thus would risk MemoryError, also we cannot know if what attr returns is pickleable
591+
return None
506592

507593
self._it = None
508594
return caller

0 commit comments

Comments
 (0)