@@ -35,14 +35,13 @@ import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser
3535import io.vertx.kotlin.coroutines.await
3636import kotlinx.coroutines.runBlocking
3737import spp.cli.PlatformCLI
38- import spp.protocol.SourceMarkerServices
38+ import spp.protocol.ProtocolMarshaller.deserializeLiveInstrumentRemoved
39+ import spp.protocol.SourceServices.Provide.toLiveInstrumentSubscriberAddress
3940import spp.protocol.extend.TCPServiceFrameParser
40- import spp.protocol.instrument.LiveInstrumentEvent
41- import spp.protocol.instrument.LiveInstrumentEventType
42- import spp.protocol.instrument.breakpoint.event.LiveBreakpointHit
43- import spp.protocol.instrument.breakpoint.event.LiveBreakpointRemoved
44- import spp.protocol.instrument.log.event.LiveLogHit
45- import spp.protocol.instrument.log.event.LiveLogRemoved
41+ import spp.protocol.instrument.event.LiveBreakpointHit
42+ import spp.protocol.instrument.event.LiveInstrumentEvent
43+ import spp.protocol.instrument.event.LiveInstrumentEventType
44+ import spp.protocol.instrument.event.LiveLogHit
4645
4746class SubscribeEvents : CliktCommand (
4847 help = " Listens for and outputs live events. Subscribes to all events by default"
@@ -89,7 +88,7 @@ class SubscribeEvents : CliktCommand(
8988 ).await()
9089 socket!! .handler(FrameParser (TCPServiceFrameParser (vertx, socket)))
9190
92- vertx.eventBus().consumer<JsonObject >(" local. " + SourceMarkerServices . Provide . LIVE_INSTRUMENT_SUBSCRIBER ) {
91+ vertx.eventBus().consumer<JsonObject >(toLiveInstrumentSubscriberAddress( PlatformCLI .developer.id) ) {
9392 val liveEvent = Json .decodeValue(it.body().toString(), LiveInstrumentEvent ::class .java)
9493
9594 // todo: impl filter on platform
@@ -107,15 +106,9 @@ class SubscribeEvents : CliktCommand(
107106 return @consumer
108107 }
109108 }
110- LiveInstrumentEventType .BREAKPOINT_REMOVED -> {
111- val breakpointRemoved = Json .decodeValue(liveEvent.data, LiveBreakpointRemoved ::class .java)
112- if (breakpointRemoved.breakpointId !in instrumentIds) {
113- return @consumer
114- }
115- }
116- LiveInstrumentEventType .LOG_REMOVED -> {
117- val logRemoved = Json .decodeValue(liveEvent.data, LiveLogRemoved ::class .java)
118- if (logRemoved.logId !in instrumentIds) {
109+ LiveInstrumentEventType .BREAKPOINT_REMOVED , LiveInstrumentEventType .LOG_REMOVED -> {
110+ val logRemoved = deserializeLiveInstrumentRemoved(JsonObject (liveEvent.data))
111+ if (logRemoved.liveInstrument.id !in instrumentIds) {
119112 return @consumer
120113 }
121114 }
@@ -164,9 +157,9 @@ class SubscribeEvents : CliktCommand(
164157 // register listener
165158 FrameHelper .sendFrame(
166159 BridgeEventType .REGISTER .name.lowercase(),
167- SourceMarkerServices . Provide . LIVE_INSTRUMENT_SUBSCRIBER ,
168- JsonObject (),
169- socket
160+ toLiveInstrumentSubscriberAddress( PlatformCLI .developer.id), null ,
161+ JsonObject (). apply { PlatformCLI .developer.accessToken?. let { put( " auth-token " , it) } } ,
162+ null , null , socket
170163 )
171164 println (" Listening for events..." )
172165 }
0 commit comments