@@ -68,6 +68,12 @@ This is a short guide on core features of `kotlinx.coroutines` with a series of
6868 * [ Fan-out] ( #fan-out )
6969 * [ Fan-in] ( #fan-in )
7070 * [ Buffered channels] ( #buffered-channels )
71+ * [ Shared mutable state and concurrency] ( #shared-mutable-state-and-concurrency )
72+ * [ The problem] ( #the-problem )
73+ * [ Thread-safe data structures] ( #thread-safe-data-structures )
74+ * [ Thread confinement] ( #thread-confinement )
75+ * [ Mutual exclusion] ( #mutual-exclusion )
76+ * [ Actors] ( #actors )
7177* [ Select expression] ( #select-expression )
7278 * [ Selecting from channels] ( #selecting-from-channels )
7379 * [ Selecting on close] ( #selecting-on-close )
@@ -1328,6 +1334,181 @@ Sending 4
13281334
13291335The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
13301336
1337+ ## Shared mutable state and concurrency
1338+
1339+ Coroutines can be executed concurrently using a multi-threaded dispatcher like [ CommonPool] . It presents
1340+ all the usual concurrency problems. The main problem being synchronization of access to ** shared mutable state** .
1341+ Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
1342+ but others are unique.
1343+
1344+ ### The problem
1345+
1346+ Let us launch 100k coroutines all doing the same action. We'll also measure their completion time for
1347+ further comparisons:
1348+
1349+ <!-- - INCLUDE .*/example-sync-([0-9]+).kt
1350+ import kotlin.system.measureTimeMillis
1351+ -->
1352+
1353+ <!-- - INCLUDE .*/example-sync-02.kt
1354+ import java.util.concurrent.atomic.AtomicInteger
1355+ -->
1356+
1357+ <!-- - INCLUDE .*/example-sync-04.kt
1358+ import kotlinx.coroutines.experimental.sync.Mutex
1359+ -->
1360+
1361+ <!-- - INCLUDE .*/example-sync-05.kt
1362+ import kotlinx.coroutines.experimental.channels.*
1363+ -->
1364+
1365+ ``` kotlin
1366+ suspend fun massiveRun (action : suspend () -> Unit ) {
1367+ val n = 100_000
1368+ val time = measureTimeMillis {
1369+ val jobs = List (n) {
1370+ launch(CommonPool ) {
1371+ action()
1372+ }
1373+ }
1374+ jobs.forEach { it.join() }
1375+ }
1376+ println (" Completed in $time ms" )
1377+ }
1378+ ```
1379+
1380+ <!-- - INCLUDE .*/example-sync-([0-9]+).kt -->
1381+
1382+ We start with a very simple action, that increments a shared mutable variable.
1383+
1384+ ``` kotlin
1385+ var counter = 0
1386+
1387+ fun main (args : Array <String >) = runBlocking<Unit > {
1388+ massiveRun {
1389+ counter++
1390+ }
1391+ println (" Counter = $counter " )
1392+ }
1393+ ```
1394+
1395+ > You can get full code [ here] ( kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-01.kt )
1396+
1397+ What does it print at the end? It is highly unlikely to ever print "100000", because all the
1398+ 100k coroutines increment the ` counter ` concurrently without any synchronization.
1399+
1400+ ### Thread-safe data structures
1401+
1402+ The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
1403+ linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding
1404+ operations that needs to be performed on a shared state.
1405+ In the case of a simple counter we can use ` AtomicInteger ` class:
1406+
1407+ ``` kotlin
1408+ var counter = AtomicInteger ()
1409+
1410+ fun main (args : Array <String >) = runBlocking<Unit > {
1411+ massiveRun {
1412+ counter.incrementAndGet()
1413+ }
1414+ println (" Counter = ${counter.get()} " )
1415+ }
1416+ ```
1417+
1418+ > You can get full code [ here] ( kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-02.kt )
1419+
1420+ This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
1421+ standard data structures and basic operations on them. However, it does not easily scale to complex
1422+ state or to complex operations that do not have ready-to-use thread-safe implementations.
1423+
1424+ ### Thread confinement
1425+
1426+ Thread confinement is an approach to the problem of shared mutable state where all access to the particular shared
1427+ state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
1428+ the single event-dispatch/application thread. It is easy to apply with coroutines by using a
1429+ single-threaded context:
1430+
1431+ ``` kotlin
1432+ val counterContext = newSingleThreadContext(" CounterContext" )
1433+ var counter = 0
1434+
1435+ fun main (args : Array <String >) = runBlocking<Unit > {
1436+ massiveRun {
1437+ run (counterContext) {
1438+ counter++
1439+ }
1440+ }
1441+ println (" Counter = $counter " )
1442+ }
1443+ ```
1444+
1445+ > You can get full code [ here] ( kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-03.kt )
1446+
1447+ ### Mutual exclusion
1448+
1449+ Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _ critical section_
1450+ that is never executed concurrently. In a blocking world you'd typically use ` synchronized ` or ` ReentrantLock ` for that.
1451+ Coroutine's alternative is called [ Mutex] . It has [ lock] [ Mutex.lock ] and [ unlock] [ Mutex.unlock ] functions to
1452+ delimit a critical section. The key difference is that ` Mutex.lock ` is a suspending function. It does not block a thread.
1453+
1454+ ``` kotlin
1455+ val mutex = Mutex ()
1456+ var counter = 0
1457+
1458+ fun main (args : Array <String >) = runBlocking<Unit > {
1459+ massiveRun {
1460+ mutex.lock()
1461+ try { counter++ }
1462+ finally { mutex.unlock() }
1463+ }
1464+ println (" Counter = $counter " )
1465+ }
1466+ ```
1467+
1468+ > You can get full code [ here] ( kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-04.kt )
1469+
1470+ ### Actors
1471+
1472+ An actor is a combination of a coroutine, the state that is confined and is encapsulated into this coroutine,
1473+ and a channel to communicate with other coroutines. A simple actor can be written as a function,
1474+ but an actor with a complex state is better suited for a class.
1475+
1476+ ``` kotlin
1477+ // Message types for counterActor
1478+ sealed class CounterMsg
1479+ object IncCounter : CounterMsg() // one-way message to increment counter
1480+ class GetCounter (val response : SendChannel <Int >) : CounterMsg() // a request with reply
1481+
1482+ // This function launches a new counter actor
1483+ fun counterActor (request : ReceiveChannel <CounterMsg >) = launch(CommonPool ) {
1484+ var counter = 0 // actor state
1485+ while (true ) { // main loop of the actor
1486+ val msg = request.receive()
1487+ when (msg) {
1488+ is IncCounter -> counter++
1489+ is GetCounter -> msg.response.send(counter)
1490+ }
1491+ }
1492+ }
1493+
1494+ fun main (args : Array <String >) = runBlocking<Unit > {
1495+ val request = Channel <CounterMsg >()
1496+ counterActor(request)
1497+ massiveRun {
1498+ request.send(IncCounter )
1499+ }
1500+ val response = Channel <Int >()
1501+ request.send(GetCounter (response))
1502+ println (" Counter = ${response.receive()} " )
1503+ }
1504+ ```
1505+
1506+ > You can get full code [ here] ( kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-05.kt )
1507+
1508+ Notice, that it does not matter (for correctness) what context the actor itself is executed in. An actor is
1509+ a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
1510+ works as a solution to the problem of shared mutable state.
1511+
13311512## Select expression
13321513
13331514Select expression makes it possible to await multiple suspending function simultaneously and _ select_
@@ -1684,6 +1865,10 @@ Channel was closed
16841865[ CoroutineName ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
16851866[ Job.invoke ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/invoke.html
16861867[ Job.cancel ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/cancel.html
1868+ <!-- - INDEX kotlinx.coroutines.experimental.sync -->
1869+ [ Mutex ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
1870+ [ Mutex.lock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/lock.html
1871+ [ Mutex.unlock ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/unlock.html
16871872<!-- - INDEX kotlinx.coroutines.experimental.channels -->
16881873[ Channel ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
16891874[ SendChannel.send ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/send.html
0 commit comments