@@ -367,47 +367,68 @@ await runner.task # This is not needed in a notebook environment!
367367timer.result()
368368```
369369
370- ## Executing an asynchronous runner
370+ ## Custom parallelization using coroutines
371371
372- In this example we will show how to send complex tasks to adaptive as coroutines.
373- We require an asynchronous client to perform the execution of asynchronous tasks.
374- Here we will use ` dask.distributed ` .
372+ Adaptive by itself does not implement a way of sharing partial results between function executions.
373+ Its implementation of parallel computation using executors is minimal by design.
374+ Instead the appropriate way to implement custom parallelization is by using coroutines (asynchronous functions).
375+
376+ We illustrate this approach by using ` dask.distributed ` for parallel computations in part because it supports asynchronous operation out-of-the-box.
377+ Let us consider the computation below which has a slow but reusable part, and a fast part that cannot be reused.
375378
376379``` {code-cell} ipython3
377- from dask.distributed import Client
380+ import time
378381
379- client = await Client(asynchronous=True)
380- ```
381382
382- Once we have an asynchronous client, all its instances will be coroutines and they need to be called accordingly.
383- For example, ` await client.close() ` .
383+ def g(x):
384+ """Slow but reusable function"""
385+ time.sleep(random.randrange(5))
386+ return x**3
387+
388+
389+ def h(x):
390+ """Fast function"""
391+ return x**4
392+ ```
384393
385- In this case, we consider a function ` h ` that has some internal dependency on a function ` g ` .
386- The function to be learned is ` async_h ` , which submits ` h ` as a coroutine to the client .
394+ We need to convert ` f ` into a dask graph by using ` dask.delayed ` .
395+ In this example we will show how to send complex tasks to adaptive as coroutines .
387396
388397``` {code-cell} ipython3
389- import time
398+ from dask import delayed
390399
400+ # Convert f and g to dask.Delayed objects
401+ g, h = delayed(g), delayed(h)
391402
392- def h(x, offset=offset):
393- a = 0.01
394- x = g(x)
395- return x + a**2 / (a**2 + (x - offset) ** 2)
403+ @delayed
404+ def f(x, y):
405+ return (x + y)**2
406+ ```
396407
408+ Next we define a computation using coroutines such that it reuses previously submitted tasks.
397409
398- def g(x):
399- time.sleep(random.randrange(5))
400- return x**2
410+ ``` {code-cell} ipython3
411+ from collections import defaultdict
412+ from dask.distributed import Client
413+
414+ client = await Client(asynchronous=True)
415+
416+ g_futures = {}
417+
418+ async def f_parallel(x):
419+ # Get or sumbit the slow function future
420+ if (g_future := g_futures.get(int(x))) is None:
421+ g_futures[int(x)] = g_future = client.compute(g(int(x)))
401422
423+ future_f = client.compute(f(g_future, h(x % 1)))
402424
403- async def async_h(x):
404- return await client.submit(h, x)
425+ return await future_f
405426```
406427
407- When providing the asynchronous function to the ` learner ` and run it via ` AsyncRunner ` .
428+ Finally we provide the asynchronous function to the ` learner ` and run it via ` AsyncRunner ` .
408429
409430``` {code-cell} ipython3
410- learner = adaptive.Learner1D(async_h, bounds=(-1, 1 ))
431+ learner = adaptive.Learner1D(async_h, bounds=(-3.5, 3.5 ))
411432
412433runner = adaptive.AsyncRunner(learner, goal=lambda l: l.loss() < 0.01, ntasks=20)
413434```
0 commit comments