1+ package com .oembedler .moon .graphql .boot .publishers ;
2+
3+ import java .math .BigDecimal ;
4+ import java .math .RoundingMode ;
5+ import java .time .LocalDateTime ;
6+ import java .util .ArrayList ;
7+ import java .util .List ;
8+ import java .util .Map ;
9+ import java .util .Random ;
10+ import java .util .concurrent .ConcurrentHashMap ;
11+ import java .util .concurrent .Executors ;
12+ import java .util .concurrent .ScheduledExecutorService ;
13+ import java .util .concurrent .TimeUnit ;
14+
15+ import org .slf4j .Logger ;
16+ import org .slf4j .LoggerFactory ;
17+ import org .springframework .stereotype .Component ;
18+ import com .oembedler .moon .graphql .boot .resolvers .StockPriceUpdate ;
19+ import reactor .core .publisher .ConnectableFlux ;
20+ import reactor .core .publisher .Flux ;
21+ import reactor .core .publisher .FluxSink ;
22+
23+ @ Component
24+ public class StockTickerReactorPublisher {
25+ private static final Logger LOG = LoggerFactory .getLogger (StockTickerRxPublisher .class );
26+
27+ private final Flux <StockPriceUpdate > publisher ;
28+
29+ public StockTickerReactorPublisher () {
30+ Flux <StockPriceUpdate > stockPriceUpdateFlux = Flux .create (emitter -> {
31+ ScheduledExecutorService executorService = Executors .newScheduledThreadPool (1 );
32+ executorService .scheduleAtFixedRate (newStockTick (emitter ), 0 , 2 , TimeUnit .SECONDS );
33+ }, FluxSink .OverflowStrategy .BUFFER );
34+ ConnectableFlux <StockPriceUpdate > connectableFlux = stockPriceUpdateFlux .share ().publish ();
35+ connectableFlux .connect ();
36+
37+ publisher = Flux .from (connectableFlux );
38+ }
39+
40+ private Runnable newStockTick (FluxSink <StockPriceUpdate > emitter ) {
41+ return () -> {
42+ List <StockPriceUpdate > stockPriceUpdates = getUpdates (rollDice (0 , 5 ));
43+ if (stockPriceUpdates != null ) {
44+ emitStocks (emitter , stockPriceUpdates );
45+ }
46+ };
47+ }
48+
49+ private void emitStocks (FluxSink <StockPriceUpdate > emitter , List <StockPriceUpdate > stockPriceUpdates ) {
50+ for (StockPriceUpdate stockPriceUpdate : stockPriceUpdates ) {
51+ try {
52+ emitter .next (stockPriceUpdate );
53+ } catch (RuntimeException e ) {
54+ LOG .error ("Cannot send StockUpdate" , e );
55+ }
56+ }
57+ }
58+
59+ public Flux <StockPriceUpdate > getPublisher () {
60+ return publisher ;
61+ }
62+
63+ public Flux <StockPriceUpdate > getPublisher (List <String > stockCodes ) {
64+ if (stockCodes != null ) {
65+ return publisher .filter (stockPriceUpdate -> stockCodes .contains (stockPriceUpdate .getStockCode ()));
66+ }
67+ return publisher ;
68+ }
69+
70+ private List <StockPriceUpdate > getUpdates (int number ) {
71+ List <StockPriceUpdate > updates = new ArrayList <>();
72+ for (int i = 0 ; i < number ; i ++) {
73+ updates .add (rollUpdate ());
74+ }
75+ return updates ;
76+ }
77+
78+ private final static Map <String , BigDecimal > CURRENT_STOCK_PRICES = new ConcurrentHashMap <>();
79+
80+ static {
81+ CURRENT_STOCK_PRICES .put ("TEAM" , dollars (39 , 64 ));
82+ CURRENT_STOCK_PRICES .put ("IBM" , dollars (147 , 10 ));
83+ CURRENT_STOCK_PRICES .put ("AMZN" , dollars (1002 , 94 ));
84+ CURRENT_STOCK_PRICES .put ("MSFT" , dollars (77 , 49 ));
85+ CURRENT_STOCK_PRICES .put ("GOOGL" , dollars (1007 , 87 ));
86+ }
87+
88+ private StockPriceUpdate rollUpdate () {
89+ ArrayList <String > STOCK_CODES = new ArrayList <>(CURRENT_STOCK_PRICES .keySet ());
90+
91+ String stockCode = STOCK_CODES .get (rollDice (0 , STOCK_CODES .size () - 1 ));
92+ BigDecimal currentPrice = CURRENT_STOCK_PRICES .get (stockCode );
93+
94+ BigDecimal incrementDollars = dollars (rollDice (0 , 1 ), rollDice (0 , 99 ));
95+ if (rollDice (0 , 10 ) > 7 ) {
96+ // 0.3 of the time go down
97+ incrementDollars = incrementDollars .negate ();
98+ }
99+ BigDecimal newPrice = currentPrice .add (incrementDollars );
100+
101+ CURRENT_STOCK_PRICES .put (stockCode , newPrice );
102+ return new StockPriceUpdate (stockCode , LocalDateTime .now (), newPrice , incrementDollars );
103+ }
104+
105+ private static BigDecimal dollars (int dollars , int cents ) {
106+ return truncate ("" + dollars + "." + cents );
107+ }
108+
109+ private static BigDecimal truncate (final String text ) {
110+ BigDecimal bigDecimal = new BigDecimal (text );
111+ if (bigDecimal .scale () > 2 )
112+ bigDecimal = new BigDecimal (text ).setScale (2 , RoundingMode .HALF_UP );
113+ return bigDecimal .stripTrailingZeros ();
114+ }
115+
116+ private final static Random rand = new Random ();
117+
118+ private static int rollDice (int min , int max ) {
119+ return rand .nextInt ((max - min ) + 1 ) + min ;
120+ }
121+
122+ }
0 commit comments