@@ -19,29 +19,24 @@ package spp.probe.control
1919
2020import io.vertx.core.AbstractVerticle
2121import io.vertx.core.eventbus.Message
22- import io.vertx.core.json.Json
2322import io.vertx.core.json.JsonObject
2423import io.vertx.ext.bridge.BridgeEventType
2524import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper
2625import org.apache.skywalking.apm.agent.core.context.util.ThrowableTransformer
2726import org.apache.skywalking.apm.agent.core.plugin.WitnessFinder
2827import spp.probe.ProbeConfiguration
2928import spp.probe.SourceProbe
30- import spp.protocol.instrument.LiveInstrument
31- import spp.protocol.instrument.breakpoint.LiveBreakpoint
32- import spp.protocol.instrument.log.LiveLog
33- import spp.protocol.instrument.meter.LiveMeter
34- import spp.protocol.instrument.span.LiveSpan
35- import spp.protocol.platform.PlatformAddress
36- import spp.protocol.probe.ProbeAddress
37- import spp.protocol.probe.command.LiveInstrumentCommand
38- import spp.protocol.probe.command.LiveInstrumentCommand.CommandType
29+ import spp.protocol.ProtocolMarshaller
30+ import spp.protocol.instrument.*
31+ import spp.protocol.instrument.command.CommandType
32+ import spp.protocol.instrument.command.LiveInstrumentCommand
33+ import spp.protocol.platform.ProbeAddress
34+ import spp.protocol.platform.ProcessorAddress
3935import java.lang.instrument.Instrumentation
4036import java.lang.reflect.InvocationTargetException
4137import java.lang.reflect.Method
4238import java.util.*
4339import java.util.function.BiConsumer
44- import kotlin.reflect.KClass
4540
4641class LiveInstrumentRemote : AbstractVerticle () {
4742
@@ -99,84 +94,66 @@ class LiveInstrumentRemote : AbstractVerticle() {
9994 e.printStackTrace()
10095 throw RuntimeException (e)
10196 }
102- vertx.eventBus()
103- .localConsumer<JsonObject >(" local." + ProbeAddress .LIVE_BREAKPOINT_REMOTE .address + " :" + SourceProbe .PROBE_ID )
104- .handler { handleInstrumentationRequest(LiveBreakpoint ::class , it) }
105- vertx.eventBus()
106- .localConsumer<JsonObject >(" local." + ProbeAddress .LIVE_LOG_REMOTE .address + " :" + SourceProbe .PROBE_ID )
107- .handler { handleInstrumentationRequest(LiveLog ::class , it) }
108- vertx.eventBus()
109- .localConsumer<JsonObject >(" local." + ProbeAddress .LIVE_METER_REMOTE .address + " :" + SourceProbe .PROBE_ID )
110- .handler { handleInstrumentationRequest(LiveMeter ::class , it) }
111- vertx.eventBus()
112- .localConsumer<JsonObject >(" local." + ProbeAddress .LIVE_SPAN_REMOTE .address + " :" + SourceProbe .PROBE_ID )
113- .handler { handleInstrumentationRequest(LiveSpan ::class , it) }
97+
98+ vertx.eventBus() // global instrument remote
99+ .localConsumer<JsonObject >(ProbeAddress .LIVE_INSTRUMENT_REMOTE )
100+ .handler { handleInstrumentationRequest(it) }
101+ vertx.eventBus() // probe specific instrument remote
102+ .localConsumer<JsonObject >(ProbeAddress .LIVE_INSTRUMENT_REMOTE + " :" + SourceProbe .PROBE_ID )
103+ .handler { handleInstrumentationRequest(it) }
114104 }
115105
116- private fun handleInstrumentationRequest (clazz : KClass < out LiveInstrument >, it : Message <JsonObject >) {
106+ private fun handleInstrumentationRequest (it : Message <JsonObject >) {
117107 try {
118- val command = Json .decodeValue (it.body().toString(), LiveInstrumentCommand :: class .java )
108+ val command = ProtocolMarshaller .deserializeLiveInstrumentCommand (it.body())
119109 when (command.commandType) {
120- CommandType .ADD_LIVE_INSTRUMENT -> addInstrument(clazz, command)
110+ CommandType .ADD_LIVE_INSTRUMENT -> addInstrument(command)
121111 CommandType .REMOVE_LIVE_INSTRUMENT -> removeInstrument(command)
122112 }
123113 } catch (ex: InvocationTargetException ) {
124114 if (ex.cause != null ) {
125- publishCommandError(it, ex.cause!! , clazz )
115+ publishCommandError(it, ex.cause!! )
126116 } else {
127- publishCommandError(it, ex.targetException, clazz )
117+ publishCommandError(it, ex.targetException)
128118 }
129119 } catch (ex: Throwable ) {
130- publishCommandError(it, ex, clazz )
120+ publishCommandError(it, ex)
131121 }
132122 }
133123
134- private fun publishCommandError (it : Message <JsonObject >, ex : Throwable , clazz : KClass < out LiveInstrument > ) {
124+ private fun publishCommandError (it : Message <JsonObject >, ex : Throwable ) {
135125 val map: MutableMap <String , Any > = HashMap ()
136126 map[" command" ] = it.body().toString()
137127 map[" occurredAt" ] = System .currentTimeMillis()
138128 map[" cause" ] = ThrowableTransformer .INSTANCE .convert2String(ex, 4000 )
139129
140- val address = when (clazz) {
141- LiveBreakpoint ::class -> PlatformAddress .LIVE_BREAKPOINT_REMOVED .address
142- LiveLog ::class -> PlatformAddress .LIVE_LOG_REMOVED .address
143- LiveMeter ::class -> PlatformAddress .LIVE_METER_REMOVED .address
144- LiveSpan ::class -> PlatformAddress .LIVE_SPAN_REMOVED .address
145- else -> throw IllegalArgumentException (" Unknown instrument: $clazz " )
146- }
147130 FrameHelper .sendFrame(
148- BridgeEventType .PUBLISH .name.lowercase(), address, JsonObject .mapFrom(map), SourceProbe .tcpSocket
131+ BridgeEventType .PUBLISH .name.lowercase(), ProcessorAddress .LIVE_INSTRUMENT_REMOVED ,
132+ JsonObject .mapFrom(map), SourceProbe .tcpSocket
149133 )
150134 }
151135
152- private fun addInstrument (clazz : KClass < out LiveInstrument >, command : LiveInstrumentCommand ) {
136+ private fun addInstrument (command : LiveInstrumentCommand ) {
153137 if (ProbeConfiguration .isNotQuite) println (" Adding instrument: $command " )
154- val instrumentData = command.context.liveInstruments[0 ]
155- applyInstrument!! .invoke(null , Json .decodeValue(instrumentData, clazz.java))
138+ applyInstrument!! .invoke(null , command.instruments.first()) // todo: check for multiple
156139 }
157140
158141 private fun removeInstrument (command : LiveInstrumentCommand ) {
159- for (breakpointData in command.context.liveInstruments) {
160- val breakpointObject = JsonObject (breakpointData)
161- val breakpointId = breakpointObject.getString(" id" )
162- val location = breakpointObject.getJsonObject(" location" )
163- val source = location.getString(" source" )
164- val line = location.getInteger(" line" )
165- removeInstrument!! .invoke(null , source, line, breakpointId)
142+ for (breakpoint in command.instruments) {
143+ val breakpointId = breakpoint.id
144+ val location = breakpoint.location
145+ removeInstrument!! .invoke(null , location.source, location.line, breakpointId)
166146 }
167- for (locationData in command.context.locations) {
168- val location = JsonObject (locationData)
169- val source = location.getString(" source" )
170- val line = location.getInteger(" line" )
171- removeInstrument!! .invoke(null , source, line, null )
147+ for (location in command.locations) {
148+ removeInstrument!! .invoke(null , location.source, location.line, null )
172149 }
173150 }
174151
175152 companion object {
176153 private val EVENT_CONSUMER = BiConsumer (fun (address : String? , json : String? ) {
177154 if (ProbeConfiguration .isNotQuite) println (" Publishing event: $address , $json " )
178155 FrameHelper .sendFrame(
179- BridgeEventType .PUBLISH .name.lowercase(Locale .getDefault() ),
156+ BridgeEventType .PUBLISH .name.lowercase(),
180157 address,
181158 JsonObject (json),
182159 SourceProbe .tcpSocket
0 commit comments