Skip to content

Commit 41535c7

Browse files
committed
Add support for older Akka HTTP versions
1 parent 653e008 commit 41535c7

File tree

6 files changed

+98
-13
lines changed

6 files changed

+98
-13
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Traffic can be captured from at least:
2121
- [x] Reactor-Netty v0.9 & v1+
2222
- [x] Spring WebClient
2323
- [x] Ktor-Client
24-
- [x] Akka-HTTP
24+
- [x] Akka-HTTP v10.1.6+
2525

2626
This will also capture HTTP(S) from any downstream libraries based on each of these clients, and many other untested clients sharing similar implementations, and so should cover a very large percentage of HTTP client usage.
2727

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,31 @@
11
package tech.httptoolkit.javaagent.advice.akka;
22

3-
import akka.http.scaladsl.ClientTransport;
4-
import akka.http.scaladsl.ConnectionContext;
5-
import akka.http.scaladsl.settings.ClientConnectionSettings;
3+
import akka.http.javadsl.ClientTransport;
4+
import akka.http.javadsl.ConnectionContext;
5+
import akka.http.javadsl.settings.ClientConnectionSettings;
66
import net.bytebuddy.asm.Advice;
7+
import net.bytebuddy.implementation.bytecode.assign.Assigner;
78
import tech.httptoolkit.javaagent.HttpProxyAgent;
89

910
import java.net.InetSocketAddress;
11+
import java.util.Arrays;
1012

1113
public class OverrideHttpSettingsAdvice {
14+
15+
public static final boolean hasHttpsSettingsMethod =
16+
Arrays.stream(ConnectionContext.class.getDeclaredMethods())
17+
.anyMatch(method -> method.getName().equals("httpsClient"));
18+
19+
public static final ConnectionContext interceptedConnectionContext = hasHttpsSettingsMethod
20+
// For 10.2+:
21+
? ConnectionContext.httpsClient(HttpProxyAgent.getInterceptedSslContext())
22+
// For everything before then:
23+
: akka.http.javadsl.ConnectionContext.https(HttpProxyAgent.getInterceptedSslContext());
24+
1225
@Advice.OnMethodEnter
1326
public static void beforeOutgoingConnection(
14-
@Advice.Argument(value = 2, readOnly = false) ClientConnectionSettings clientSettings,
15-
@Advice.Argument(value = 3, readOnly = false) ConnectionContext connectionContext
27+
@Advice.Argument(value = 2, readOnly = false, typing = Assigner.Typing.DYNAMIC) ClientConnectionSettings clientSettings,
28+
@Advice.Argument(value = 3, readOnly = false, typing = Assigner.Typing.DYNAMIC) ConnectionContext connectionContext
1629
) {
1730
// Change all new outgoing connections to use the proxy:
1831
clientSettings = clientSettings.withTransport(
@@ -23,6 +36,6 @@ public static void beforeOutgoingConnection(
2336
);
2437

2538
// Change all new outgoing connections to trust our certificate:
26-
connectionContext = ConnectionContext.httpsClient(HttpProxyAgent.getInterceptedSslContext());
39+
connectionContext = interceptedConnectionContext;
2740
}
2841
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package tech.httptoolkit.javaagent.advice.akka;
2+
3+
import akka.http.impl.settings.HostConnectionPoolSetup;
4+
import akka.http.scaladsl.ClientTransport;
5+
import net.bytebuddy.asm.Advice;
6+
import scala.concurrent.Await;
7+
import scala.concurrent.Future;
8+
import scala.concurrent.duration.Duration;
9+
10+
import java.lang.reflect.Method;
11+
import java.util.Collections;
12+
import java.util.Set;
13+
import java.util.WeakHashMap;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.TimeoutException;
16+
17+
// This is very similar to ResetOldPoolsAdvice, but applies to older Akka setups, which use many PoolGateway instances,
18+
// one per config, rather than one PoolMaster instance. Otherwise the logic should be identical.
19+
public class ResetOldGatewaysAdvice {
20+
21+
public static Set<HostConnectionPoolSetup> resetPoolSetups = Collections.newSetFromMap(new WeakHashMap<>());
22+
23+
@Advice.OnMethodEnter
24+
public static void beforeDispatchRequest(
25+
@Advice.This Object thisPoolGateway,
26+
@Advice.FieldValue(value = "hcps") HostConnectionPoolSetup poolSetup
27+
) throws Exception {
28+
// If a pool config has been changed to use our proxy already, then we're perfect
29+
ClientTransport transport = poolSetup.setup().settings().transport();
30+
boolean alreadyIntercepted = transport == ResetPoolSetupAdvice.interceptedProxyTransport;
31+
// If not, it's still OK, as long as we've previously reset the pool to ensure the connection was
32+
// re-established (we hook connection setup too, so all new conns are intercepted, even with old config)
33+
boolean alreadyReset = resetPoolSetups.contains(poolSetup);
34+
35+
if (alreadyIntercepted || alreadyReset) return;
36+
37+
// Otherwise this is a request to use a pre-existing connection pool which probably has connections open that
38+
// aren't using our proxy. We shutdown the pool before the request. It'll be restarted automatically when
39+
// the request does go through, but this ensures we re-establish connections (so it definitely gets intercepted)
40+
Method shutdownMethod = thisPoolGateway.getClass().getDeclaredMethod("shutdown");
41+
42+
Future<?> shutdownFuture = (Future<?>) shutdownMethod.invoke(thisPoolGateway);
43+
44+
// We wait a little, just to ensure the shutdown is definitely started before this request is dispatched.
45+
try {
46+
Await.result(shutdownFuture, Duration.apply(10, TimeUnit.MILLISECONDS));
47+
} catch (TimeoutException ignored) {}
48+
49+
// Lastly, we remember this pool setup, so that we don't unnecessarily reset it again in future:
50+
resetPoolSetups.add(poolSetup);
51+
}
52+
53+
}

src/main/java/tech/httptoolkit/javaagent/advice/akka/ResetPoolSetupAdvice.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package tech.httptoolkit.javaagent.advice.akka;
22

33
import akka.http.scaladsl.ClientTransport;
4-
import akka.http.scaladsl.ConnectionContext;
54
import akka.http.scaladsl.settings.ConnectionPoolSettings;
5+
import akka.http.javadsl.ConnectionContext;
66
import net.bytebuddy.asm.Advice;
7+
import net.bytebuddy.implementation.bytecode.assign.Assigner;
78
import tech.httptoolkit.javaagent.HttpProxyAgent;
89

910
import java.net.InetSocketAddress;
@@ -21,13 +22,13 @@ public class ResetPoolSetupAdvice {
2122

2223
@Advice.OnMethodExit
2324
public static void afterConstructor(
24-
@Advice.FieldValue(value = "settings", readOnly = false) ConnectionPoolSettings settings,
25-
@Advice.FieldValue(value = "connectionContext", readOnly = false) ConnectionContext connContext
25+
@Advice.FieldValue(value = "settings", readOnly = false, typing = Assigner.Typing.DYNAMIC) ConnectionPoolSettings settings,
26+
@Advice.FieldValue(value = "connectionContext", readOnly = false, typing = Assigner.Typing.DYNAMIC) ConnectionContext connContext
2627
) {
2728
// Change all new outgoing connections to use the proxy:
2829
settings = settings.withTransport(interceptedProxyTransport);
2930

3031
// Change all new outgoing connections to trust our certificate:
31-
connContext = ConnectionContext.httpsClient(HttpProxyAgent.getInterceptedSslContext());
32+
connContext = OverrideHttpSettingsAdvice.interceptedConnectionContext;
3233
}
3334
}

src/main/kotlin/tech/httptoolkit/javaagent/AgentMain.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ fun interceptAllHttps(config: Config, instrumentation: Instrumentation) {
106106
KtorClientTlsTransformer(logger),
107107
AkkaHttpTransformer(logger),
108108
AkkaPoolSettingsTransformer(logger),
109-
AkkaPoolTransformer(logger)
109+
AkkaPoolTransformer(logger),
110+
AkkaGatewayTransformer(logger)
110111
).forEach { matchingAgentTransformer ->
111112
agentBuilder = matchingAgentTransformer.register(agentBuilder)
112113
}

src/main/kotlin/tech/httptoolkit/javaagent/AkkaClientTransformers.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,21 @@ class AkkaPoolTransformer(logger: TransformationLogger) : MatchingAgentTransform
5858
.on(hasMethodName("dispatchRequest"))
5959
)
6060
}
61-
}
61+
}
62+
63+
// The above works perfectly for new Akka, but as a last step we duplicate the 3rd step for slightly older versions:
64+
class AkkaGatewayTransformer(logger: TransformationLogger) : MatchingAgentTransformer(logger) {
65+
override fun register(builder: AgentBuilder): AgentBuilder {
66+
return builder
67+
.type(named("akka.http.impl.engine.client.PoolGateway")) // Exists on <10.2.0
68+
.transform(this)
69+
}
70+
71+
override fun transform(builder: DynamicType.Builder<*>): DynamicType.Builder<*> {
72+
return builder
73+
.visit(
74+
Advice.to(ResetOldGatewaysAdvice::class.java)
75+
.on(hasMethodName("apply"))
76+
)
77+
}
78+
}

0 commit comments

Comments
 (0)