@@ -7,7 +7,7 @@ FactoryMethods. They are not designed for inheritance but rather for
 composition.
 
 ```ruby
-Concurrent::Promises::FactoryMethods.instance_methods false
+Concurrent::Promises::FactoryMethods.instance_methods
 ```
 
 The module can be included or extended where needed.
@@ -438,6 +438,27 @@ future.fulfill 1 rescue $!
 future.fulfill 2, false
 ```
 
+## How are promises executed?
+
+Promises use global thread pools to execute their tasks. Therefore each task
+may run on a different thread, which implies that users have to be careful not
+to depend on thread-local variables (or they have to be set at the beginning
+of the task and cleaned up at its end).
+
+Since the tasks run on many different threads of the thread pool, it is best
+to follow these rules:
+
+- Use only data passed in through arguments or values of parent futures, to
+  keep better control over what the futures access.
+- Data passed in and out of futures is easier to deal with if it is immutable
+  or at least treated as such.
+- Any mutable and mutated object accessed by more than one thread or future
+  must be thread-safe, see {Concurrent::Array}, {Concurrent::Hash}, and
+  {Concurrent::Map}. (The value of a future may be consumed by many futures.)
+- Futures can access outside objects, but those have to be thread-safe.
+
+> *TODO: This part to be extended.*
+
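The thread-local caveat above can be demonstrated with plain Ruby threads (a hypothetical sketch using only the standard library, no concurrent-ruby): a value stored in `Thread.current` by one thread is invisible to another thread, which is exactly the situation of two tasks of one chain landing on different pool threads, while data passed in as arguments always arrives.

```ruby
# A value stored thread-locally in one thread is not visible from another
# thread - the situation of two tasks running on different pool threads.
t1 = Thread.new { Thread.current[:memo] = 42 }
t1.join
t2 = Thread.new { Thread.current[:memo] } # a different thread: :memo is unset
t2.join

# Passing the data in explicitly (as task arguments) is reliable:
t3 = Thread.new(42) { |memo| memo + 1 }
t3.join

p t2.value # => nil
p t3.value # => 43
```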
 # Advanced
 
 ## Callbacks
@@ -470,6 +491,25 @@ Promises.future_on(:fast) { 2 }.
   value.size
 ```
 
+## Run (simulated process)
+
+Running is similar to flattening. When `run` is called on a future it will
+flatten indefinitely, as long as the future keeps fulfilling with another
+`Future` value. It can be used to simulate thread-like processing without
+actually occupying a thread.
+
+```ruby
+count = lambda do |v|
+  v += 1
+  v < 5 ? Promises.future_on(:fast, v, &count) : v
+end
+400.times.
+  map { Promises.future_on(:fast, 0, &count).run.value! }.
+  all? { |v| v == 5 }
+```
+
+Therefore the above example finishes fine on the `:fast` thread pool even
+though it has far fewer threads than there are simulated processes.
+
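The mechanism behind `run` can be mimicked in plain Ruby with a trampoline (a hypothetical sketch, not the actual implementation): a step returns either a final value or another step, and a small loop keeps evaluating steps, so the call stack stays flat and no thread waits inside nested calls.

```ruby
# A Step wraps "the rest of the computation"; run_flat keeps unwrapping
# steps until a plain value is produced, like `run` keeps flattening futures.
Step = Struct.new(:callable)

def run_flat(step)
  step = step.callable.call while step.is_a?(Step)
  step
end

count = lambda do |v|
  v += 1
  v < 5 ? Step.new(-> { count.call(v) }) : v
end

p run_flat(count.call(0)) # => 5
```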
 # Interoperability
 
 ## Actors
@@ -500,10 +540,47 @@ The `ask` method returns future.
 ```ruby
 actor.ask(2).then(&:succ).value!
 ```
+## ProcessingActor
+
+> *TODO: Documentation to be added in a few days.*
+
+## Channel
+
+There is an implementation of a channel as well. Let's start by creating a
+channel with a capacity of 2 messages.
+
+```ruby
+ch1 = Concurrent::Promises::Channel.new 2
+```
 
-## Channels
+We push 3 messages. It can be observed that the last future, representing the
+push, is not fulfilled since the capacity prevents it. When the work which
+fills the channel depends on the futures created by push, this can be used to
+create back pressure – the filling work is delayed until the channel has
+space for more messages.
 
-> *TODO: To be added*
+```ruby
+pushes = 3.times.map { |i| ch1.push i }
+ch1.pop.value!
+pushes
+```
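A blocking cousin of this non-blocking back pressure exists in Ruby's standard library as `SizedQueue` (a rough analogy, hypothetical sketch): a push over capacity blocks the producing thread until a consumer pops, while the channel above never blocks and instead leaves the push future pending.

```ruby
queue = SizedQueue.new(2) # capacity 2, like the channel above

producer = Thread.new do
  3.times { |i| queue.push i } # the third push blocks: the queue is full
  :done
end

sleep 0.2                          # let the producer fill the queue and block
status_when_full = producer.status # "sleep": blocked, like the pending push
queue.pop                          # make room, the third push completes
producer.join

p status_when_full # => "sleep"
p producer.value   # => :done
```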
+
+A selection over channels can be created with the `select_channel` factory
+method. It will be fulfilled with the first message available in any of the
+channels. It returns a pair, so it is possible to find out which channel had
+the message available.
+
+```ruby
+ch2    = Concurrent::Promises::Channel.new 2
+result = Concurrent::Promises.select_channel(ch1, ch2)
+result.value!
+
+Promises.future { 1 + 1 }.then_push_channel(ch1)
+result = (
+  Concurrent::Promises.fulfilled_future('%02d') &
+  Concurrent::Promises.select_channel(ch1, ch2)).
+  then { |format, (channel, value)| format format, value }
+result.value!
+```
 
 # Use-cases
 
@@ -573,7 +650,7 @@ results = 3.times.map { computer.ask [:run, -> { sleep 0.1; :result }] }
 computer.ask(:status).value!
 results.map(&:value!)
 ```
-## Too many threads / fibers
+## Solving the thread-count limit by simulating threads
 
 Sometimes an application requires processing a lot of tasks concurrently. If
 the number of concurrent tasks is high enough, then it is not possible to create
@@ -606,7 +683,7 @@ Promises.future(0, &body).run.value! # => 5
606683
607684This solution works well an any Ruby implementation.
608685
609- > TODO add more complete example
686+ > * TODO: More examples to be added. *
610687
611688## Cancellation
612689
@@ -771,55 +848,116 @@ end #
 futures.map(&:value!)
 ```
 
-## Long stream of tasks
+## Long stream of tasks, applying back pressure
+
+Let's assume that we are querying an API for data and that the queries can be
+faster than we are able to process them. This example shows how to use a
+channel as a buffer and how to apply back pressure to slow down the queries.
+
+```ruby
+require 'json' #
+
+channel       = Promises::Channel.new 6
+source, token = Concurrent::Cancellation.create
+
+def query_random_text(token, channel)
+  Promises.future do
+    # for simplicity the query is omitted
+    # url = 'some api'
+    # Net::HTTP.get(URI(url))
+    sleep 0.1
+    { 'message' =>
+        'Lorem ipsum rhoncus scelerisque vulputate diam inceptos'
+    }.to_json
+  end.then(token) do |value, token|
+    # The push to the channel is fulfilled only after the message is
+    # successfully published to the channel, therefore it will not continue
+    # querying until the current message is pushed.
+    channel.push(value) |
+        # It could wait on the push indefinitely if the token is not checked
+        # here with `or` (the pipe).
+        token.to_future
+  end.flat_future.then(token) do |_, token|
+    # query again after the message is pushed to the buffer
+    query_random_text(token, channel) unless token.canceled?
+  end
+end
+
+words          = []
+words_throttle = Concurrent::Throttle.new 1
+
+def count_words_in_random_text(token, channel, words, words_throttle)
+  channel.pop.then do |response|
+    string = JSON.load(response)['message']
+    # processing is slower than querying
+    sleep 0.2
+    words_count = string.scan(/\w+/).size
+  end.then_throttled_by(words_throttle, words) do |words_count, words|
+    # safe since throttled to only 1 task at a time
+    words << words_count
+  end.then(token) do |_, token|
+    # count words in the next message
+    unless token.canceled?
+      count_words_in_random_text(token, channel, words, words_throttle)
+    end
+  end
+end
+
+query_processes = 3.times.map do
+  Promises.future(token, channel, &method(:query_random_text)).run
+end
+
+word_counter_processes = 2.times.map do
+  Promises.future(token, channel, words, words_throttle,
+                  &method(:count_words_in_random_text)).run
+end
+
+sleep 0.5
+```
+
+Let it run for a while, then cancel it and ensure that the runs all fulfilled
+(and therefore ended) after the cancellation. Finally, print the result.
 
-> TODO Channel
+```ruby
+source.cancel
+query_processes.map(&:wait!)
+word_counter_processes.map(&:wait!)
+words
+```
 
-## Parallel enumerable ?
+Compared to using threads directly, this is a highly configurable and
+composable solution.
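The cancellation token used above can be approximated with the standard library alone (a hypothetical sketch): a shared flag that the workers poll cooperatively, so their loops end cleanly instead of being killed mid-work.

```ruby
cancelled = false
lock      = Mutex.new
processed = Queue.new

worker = Thread.new do
  until lock.synchronize { cancelled } # cooperative check, like token.canceled?
    processed << 1                     # stand-in for one unit of work
    sleep 0.01
  end
  :finished                            # the run ends cleanly
end

sleep 0.1
lock.synchronize { cancelled = true }  # like source.cancel
p worker.value # => :finished
```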
 
-> TODO
 
 ## Periodic task
 
-> TODO revisit, use cancellation, add to library
+By combining `schedule`, `run` and `Cancellation`, a periodically executed
+task can be created easily.
 
 ```ruby
-def schedule_job(interval, &job)
-  # schedule the first execution and chain restart of the job
-  Promises.schedule(interval, &job).chain do |fulfilled, continue, reason|
-    if fulfilled
-      schedule_job(interval, &job) if continue
-    else
-      # handle error
-      reason
-      # retry sooner
-      schedule_job(interval, &job)
-    end
-  end
+repeating_scheduled_task = -> interval, token, task do
+  Promises.
+    # Schedule the task.
+    schedule(interval, token, &task).
+    # If successful, schedule again.
+    # Alternatively use chain to schedule always.
+    then { repeating_scheduled_task.call(interval, token, task) }
 end
 
-queue = Queue.new
-count = 0
-interval = 0.05 # small just not to delay execution of this example
-
-schedule_job interval do
-  queue.push count
-  count += 1
-  # to continue scheduling return true, false will end the task
-  if count < 4
-    # to continue scheduling return true
-    true
-  else
-    # close the queue with nil to simplify reading it
-    queue.push nil
-    # to end the task return false
-    false
+cancellation, token = Concurrent::Cancellation.create
+
+task = -> token do
+  5.times do
+    token.raise_if_canceled
+    # do stuff
+    print '.'
+    sleep 0.01
   end
 end
 
-# read the queue
-arr, v = [], nil; arr << v while (v = queue.pop) #
-# arr has the results from the executed scheduled tasks
-arr
+result = Promises.future(0.1, token, task, &repeating_scheduled_task).run
+sleep 0.2
+cancellation.cancel
+result.result
 ```
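For contrast, the same periodic behavior written with only the standard library occupies a dedicated thread for its whole lifetime (a hypothetical sketch), which is exactly what the `schedule`/`run` combination above avoids.

```ruby
running = true
lock    = Mutex.new
ticks   = Queue.new

# The thread sleeps between runs but still occupies a thread the whole time.
timer = Thread.new do
  while lock.synchronize { running }
    ticks << 1 # the periodic work
    sleep 0.02 # the interval
  end
end

sleep 0.1
lock.synchronize { running = false } # like cancellation.cancel
timer.join
p ticks.size # a few ticks happened before the stop
```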