@@ -397,7 +397,8 @@ defmodule GenStage do
397397 we will serve the existing demand, otherwise the event will be queued in
398398 `GenStage`'s internal buffer. In case events are being queued and not being
399399 consumed, a log message will be emitted when we exceed the `:buffer_size`
400- configuration.
400+ configuration. This behavior can be customized by implementing the optional
401+ `c:format_discarded/2` callback.
401402
402403 While the implementation above is enough to solve the constraints above,
403404 a more robust implementation would have tighter control over the events
@@ -974,6 +975,15 @@ defmodule GenStage do
974975 | { :stop , reason , new_state }
975976 when new_state: term , reason: term
976977
978+ @ doc """
979+ Invoked when items are discarded from the buffer.
980+
981+ It receives the number of excess (discarded) items from this invocation.
982+ This callback returns a boolean that controls whether the default error log for discarded items is printed or not.
983+ Return true to print the log, return false to skip the log.
984+ """
985+ @ callback format_discarded ( discarded :: non_neg_integer , state :: term ) :: boolean
986+
977987 @ doc """
978988 Invoked when a consumer is no longer subscribed to a producer.
979989
@@ -1122,6 +1132,7 @@ defmodule GenStage do
11221132 handle_cancel: 3 ,
11231133 handle_demand: 2 ,
11241134 handle_events: 3 ,
1135+ format_discarded: 2 ,
11251136
11261137 # GenServer
11271138 code_change: 3 ,
@@ -1693,6 +1704,14 @@ defmodule GenStage do
16931704 "GenStage.stream/1 expects a list of subscriptions, got: #{ inspect ( subscriptions ) } "
16941705 end
16951706
1707+ @ doc """
1708+ Returns the estimated number of buffered items for a producer.
1709+ """
1710+ @ spec estimate_buffered_count ( stage , timeout ) :: non_neg_integer
1711+ def estimate_buffered_count ( stage , timeout \\ 5000 ) do
1712+ call ( stage , :"$estimate_buffered_count" , timeout )
1713+ end
1714+
16961715 ## Callbacks
16971716
16981717 @ compile :inline_list_funcs
@@ -1822,15 +1841,19 @@ defmodule GenStage do
18221841 consumer_subscribe ( current , to , opts , stage )
18231842 end
18241843
1844+ def handle_call ( :"$estimate_buffered_count" , _from , stage ) do
1845+ producer_estimate_buffered_count ( stage )
1846+ end
1847+
18251848 def handle_call ( msg , from , % { mod: mod , state: state } = stage ) do
18261849 case mod . handle_call ( msg , from , state ) do
18271850 { :reply , reply , events , state } when is_list ( events ) ->
1828- stage = dispatch_events ( events , length ( events ) , stage )
1829- { :reply , reply , % { stage | state: state } }
1851+ stage = dispatch_events ( events , length ( events ) , % { stage | state: state } )
1852+ { :reply , reply , stage }
18301853
18311854 { :reply , reply , events , state , :hibernate } when is_list ( events ) ->
1832- stage = dispatch_events ( events , length ( events ) , stage )
1833- { :reply , reply , % { stage | state: state } , :hibernate }
1855+ stage = dispatch_events ( events , length ( events ) , % { stage | state: state } )
1856+ { :reply , reply , stage , :hibernate }
18341857
18351858 { :stop , reason , reply , state } ->
18361859 { :stop , reason , reply , % { stage | state: state } }
@@ -2106,12 +2129,12 @@ defmodule GenStage do
21062129 defp handle_noreply_callback ( return , stage ) do
21072130 case return do
21082131 { :noreply , events , state } when is_list ( events ) ->
2109- stage = dispatch_events ( events , length ( events ) , stage )
2110- { :noreply , % { stage | state: state } }
2132+ stage = dispatch_events ( events , length ( events ) , % { stage | state: state } )
2133+ { :noreply , stage }
21112134
21122135 { :noreply , events , state , :hibernate } when is_list ( events ) ->
2113- stage = dispatch_events ( events , length ( events ) , stage )
2114- { :noreply , % { stage | state: state } , :hibernate }
2136+ stage = dispatch_events ( events , length ( events ) , % { stage | state: state } )
2137+ { :noreply , stage , :hibernate }
21152138
21162139 { :stop , reason , state } ->
21172140 { :stop , reason , % { stage | state: state } }
@@ -2210,6 +2233,14 @@ defmodule GenStage do
22102233 { :noreply , stage }
22112234 end
22122235
2236+ defp maybe_format_discarded ( mod , excess , state ) do
2237+ if function_exported? ( mod , :format_discarded , 2 ) do
2238+ mod . format_discarded ( excess , state )
2239+ else
2240+ true
2241+ end
2242+ end
2243+
22132244 defp producer_cancel ( ref , kind , reason , stage ) do
22142245 % { consumers: consumers , monitors: monitors , state: state } = stage
22152246
@@ -2313,21 +2344,41 @@ defmodule GenStage do
23132344 stage
23142345 end
23152346
2316- defp buffer_events ( events , % { buffer: buffer , buffer_keep: keep } = stage ) do
2347+ defp buffer_events (
2348+ events ,
2349+ % {
2350+ mod: mod ,
2351+ buffer: buffer ,
2352+ buffer_keep: keep ,
2353+ state: state
2354+ } = stage
2355+ ) do
23172356 { buffer , excess , perms } = Buffer . store_temporary ( buffer , events , keep )
23182357
23192358 case excess do
23202359 0 ->
23212360 :ok
23222361
23232362 excess ->
2324- error_msg = 'GenStage producer ~tp has discarded ~tp events from buffer'
2325- :error_logger . warning_msg ( error_msg , [ Utils . self_name ( ) , excess ] )
2363+ if maybe_format_discarded ( mod , excess , state ) do
2364+ error_msg = 'GenStage producer ~tp has discarded ~tp events from buffer'
2365+ :error_logger . warning_msg ( error_msg , [ Utils . self_name ( ) , excess ] )
2366+ end
23262367 end
23272368
23282369 :lists . foldl ( & dispatch_info / 2 , % { stage | buffer: buffer } , perms )
23292370 end
23302371
2372+ defp producer_estimate_buffered_count ( % { type: :consumer } = stage ) do
2373+ error_msg = 'Buffered count can only be requested for producers, GenStage ~tp is a consumer'
2374+ :error_logger . error_msg ( error_msg , [ Utils . self_name ( ) ] )
2375+ { :reply , 0 , stage }
2376+ end
2377+
2378+ defp producer_estimate_buffered_count ( % { buffer: buffer } = stage ) do
2379+ { :reply , Buffer . estimate_size ( buffer ) , stage }
2380+ end
2381+
23312382 ## Info helpers
23322383
23332384 defp producer_info ( msg , % { type: :consumer } = stage ) do
0 commit comments