Skip to content

Commit 8326dc5

Browse files
committed
Add support for Akka-HTTP
1 parent ec6a2d1 commit 8326dc5

File tree

10 files changed

+204
-2
lines changed

10 files changed

+204
-2
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Traffic can be captured from at least:
2121
- [x] Reactor-Netty v0.9 & v1+
2222
- [x] Spring WebClient
2323
- [x] Ktor-Client
24+
- [x] Akka-HTTP
2425

2526
This will also capture HTTP(S) from any downstream libraries based on each of these clients, and many other untested clients sharing similar implementations, and so should cover a very large percentage of HTTP client usage.
2627

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ dependencies {
4040
compileOnly group: 'io.projectreactor.netty', name: 'reactor-netty', version: '1.0.4'
4141
compileOnly group: 'io.ktor', name: 'ktor-client-core', version: '1.5.2'
4242
compileOnly group: 'io.ktor', name: 'ktor-client-cio', version: '1.5.2'
43+
compileOnly group: 'com.typesafe.akka', name: 'akka-http-core_2.13', version: '10.2.4'
44+
compileOnly group: 'com.typesafe.akka', name: 'akka-actor_2.13', version: '2.6.13'
4345

4446
// Test deps:
4547
testImplementation group: 'io.kotest', name: 'kotest-runner-junit5-jvm', version: '4.4.0'
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package tech.httptoolkit.javaagent.advice.akka;
2+
3+
import akka.http.scaladsl.ClientTransport;
4+
import akka.http.scaladsl.ConnectionContext;
5+
import akka.http.scaladsl.settings.ClientConnectionSettings;
6+
import net.bytebuddy.asm.Advice;
7+
import tech.httptoolkit.javaagent.HttpProxyAgent;
8+
9+
import java.net.InetSocketAddress;
10+
11+
public class OverrideHttpSettingsAdvice {
12+
@Advice.OnMethodEnter
13+
public static void beforeOutgoingConnection(
14+
@Advice.Argument(value = 2, readOnly = false) ClientConnectionSettings clientSettings,
15+
@Advice.Argument(value = 3, readOnly = false) ConnectionContext connectionContext
16+
) {
17+
// Change all new outgoing connections to use the proxy:
18+
clientSettings = clientSettings.withTransport(
19+
ClientTransport.httpsProxy(new InetSocketAddress(
20+
HttpProxyAgent.getAgentProxyHost(),
21+
HttpProxyAgent.getAgentProxyPort()
22+
))
23+
);
24+
25+
// Change all new outgoing connections to trust our certificate:
26+
connectionContext = ConnectionContext.httpsClient(HttpProxyAgent.getInterceptedSslContext());
27+
}
28+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package tech.httptoolkit.javaagent.advice.akka;
2+
3+
import net.bytebuddy.asm.Advice;
4+
5+
import java.lang.reflect.Method;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
8+
public class ResetAllConnectionPoolsAdvice {
9+
10+
public static AtomicBoolean connectionPoolsReset = new AtomicBoolean(false);
11+
12+
@Advice.OnMethodEnter
13+
public static void resetConnectionsBeforeDispatch(
14+
@Advice.This Object thisPoolMaster
15+
) throws Exception {
16+
boolean resetNeeded = ResetAllConnectionPoolsAdvice
17+
.connectionPoolsReset
18+
.compareAndSet(false, true);
19+
20+
if (!resetNeeded) return;
21+
22+
// Just once, when we first try to dispatch a request, reset all existing connection pools.
23+
Method shutdownMethod = thisPoolMaster.getClass().getDeclaredMethod("shutdownAll");
24+
shutdownMethod.invoke(thisPoolMaster);
25+
}
26+
}

src/main/kotlin/tech/httptoolkit/javaagent/AgentMain.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ fun interceptAllHttps(config: Config, instrumentation: Instrumentation) {
103103
ReactorNettyHttpClientSecureTransformer(logger),
104104
KtorClientEngineConfigTransformer(logger),
105105
KtorCioEngineTransformer(logger),
106-
KtorClientTlsTransformer(logger)
106+
KtorClientTlsTransformer(logger),
107+
AkkaHttpTransformer(logger),
108+
AkkaPoolTransformer(logger)
107109
).forEach { matchingAgentTransformer ->
108110
agentBuilder = matchingAgentTransformer.register(agentBuilder)
109111
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package tech.httptoolkit.javaagent
2+
3+
import net.bytebuddy.agent.builder.AgentBuilder
4+
import net.bytebuddy.asm.Advice
5+
import net.bytebuddy.dynamic.DynamicType
6+
import net.bytebuddy.matcher.ElementMatchers.hasMethodName
7+
import net.bytebuddy.matcher.ElementMatchers.named
8+
import tech.httptoolkit.javaagent.advice.akka.OverrideHttpSettingsAdvice
9+
import tech.httptoolkit.javaagent.advice.akka.ResetAllConnectionPoolsAdvice
10+
11+
// First, we hook outgoing connection creation, and ensure that new connections always go via the proxy & trust us:
12+
class AkkaHttpTransformer(logger: TransformationLogger) : MatchingAgentTransformer(logger) {
13+
override fun register(builder: AgentBuilder): AgentBuilder {
14+
return builder
15+
.type(named("akka.http.scaladsl.HttpExt")) // Scala compiles Http()s methods here for some reason
16+
.transform(this)
17+
}
18+
19+
override fun transform(builder: DynamicType.Builder<*>): DynamicType.Builder<*> {
20+
return builder
21+
.visit(
22+
Advice.to(OverrideHttpSettingsAdvice::class.java)
23+
.on(hasMethodName("_outgoingConnection"))
24+
)
25+
}
26+
}
27+
28+
// Then, to ensure that any existing connections trust us too, we do a one-off connection pool reset,
29+
// triggered by the new PoolMaster.dispatchRequest (seems to happen for every request). This seems to
30+
// affects shared pools and individual pools too.
31+
class AkkaPoolTransformer(logger: TransformationLogger) : MatchingAgentTransformer(logger) {
32+
override fun register(builder: AgentBuilder): AgentBuilder {
33+
return builder
34+
.type(named("akka.http.impl.engine.client.PoolMaster"))
35+
.transform(this)
36+
}
37+
38+
override fun transform(builder: DynamicType.Builder<*>): DynamicType.Builder<*> {
39+
return builder
40+
.visit(
41+
Advice.to(ResetAllConnectionPoolsAdvice::class.java)
42+
.on(hasMethodName("dispatchRequest"))
43+
)
44+
}
45+
}

test-app/build.gradle

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ dependencies {
2828

2929
implementation group: 'io.ktor', name: 'ktor-client-core', version: '1.5.2'
3030
implementation group: 'io.ktor', name: 'ktor-client-cio', version: '1.5.2'
31+
32+
implementation group: 'com.typesafe.akka', name: 'akka-actor_2.13', version: '2.6.13'
33+
implementation group: 'com.typesafe.akka', name: 'akka-http_2.13', version: '10.2.4'
34+
implementation group: 'com.typesafe.akka', name: 'akka-stream_2.13', version: '2.6.13'
3135
}
3236

3337
test {
@@ -43,4 +47,13 @@ jar {
4347
compileJava {
4448
sourceCompatibility = '11'
4549
targetCompatibility = '11'
50+
}
51+
52+
import com.github.jengelman.gradle.plugins.shadow.transformers.AppendingTransformer
53+
54+
shadowJar {
55+
transform(AppendingTransformer) {
56+
resource = 'reference.conf' // Required for akka
57+
}
58+
with jar
4659
}

test-app/src/main/java/tech/httptoolkit/testapp/Main.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ public class Main {
2828
entry("jetty-client", new JettyClientCase()),
2929
entry("async-http-client", new AsyncHttpClientCase()),
3030
entry("spring-web", new SpringWebClientCase()),
31-
entry("ktor-cio", new KtorCioCase())
31+
entry("ktor-cio", new KtorCioCase()),
32+
entry("akka-req-http", new AkkaRequestClientCase()),
33+
entry("akka-host-http", new AkkaHostClientCase())
3234
);
3335

3436
public static void main(String[] args) throws Exception {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package tech.httptoolkit.testapp.cases;
2+
3+
import akka.NotUsed;
4+
import akka.actor.ActorSystem;
5+
import akka.actor.ExtendedActorSystem;
6+
import akka.http.javadsl.HostConnectionPool;
7+
import akka.http.javadsl.Http;
8+
import akka.http.javadsl.model.HttpRequest;
9+
import akka.http.javadsl.model.HttpResponse;
10+
import akka.japi.Pair;
11+
import akka.stream.javadsl.Flow;
12+
import akka.stream.javadsl.Sink;
13+
import akka.stream.javadsl.Source;
14+
import scala.util.Try;
15+
16+
import java.net.URI;
17+
import java.net.URISyntaxException;
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.ExecutionException;
20+
21+
// Here we use a host-based persistent Akka connection pool, just to make sure that that case is covered too, in
22+
// addition to the request client case.
23+
public class AkkaHostClientCase extends ClientCase<
24+
Flow<Pair<HttpRequest, NotUsed>, Pair<Try<HttpResponse>, NotUsed>, HostConnectionPool>
25+
> {
26+
27+
private static final ActorSystem system = ExtendedActorSystem.create();
28+
29+
@Override
30+
public Flow<Pair<HttpRequest, NotUsed>, Pair<Try<HttpResponse>, NotUsed>, HostConnectionPool> newClient(String url) throws Exception {
31+
URI uri = new URI(url);
32+
return new Http((ExtendedActorSystem) system)
33+
.cachedHostConnectionPool(uri.getHost());
34+
}
35+
36+
@Override
37+
public int test(
38+
String url,
39+
Flow<Pair<HttpRequest, NotUsed>, Pair<Try<HttpResponse>, NotUsed>, HostConnectionPool> clientFlow
40+
) throws URISyntaxException, ExecutionException, InterruptedException {
41+
Source<Pair<HttpRequest, NotUsed>, NotUsed> requestSource = Source.single(
42+
new Pair<>(HttpRequest.create(url), null)
43+
);
44+
45+
CompletableFuture<Pair<Try<HttpResponse>, NotUsed>> responseFuture = requestSource
46+
.via(clientFlow)
47+
.runWith(Sink.head(), system)
48+
.toCompletableFuture();
49+
50+
HttpResponse response = responseFuture.get().first().get();
51+
response.discardEntityBytes(system);
52+
return response.status().intValue();
53+
}
54+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package tech.httptoolkit.testapp.cases;
2+
3+
import akka.actor.ActorSystem;
4+
import akka.http.javadsl.Http;
5+
import akka.http.javadsl.model.HttpRequest;
6+
import akka.http.javadsl.model.HttpResponse;
7+
8+
import java.net.URISyntaxException;
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.ExecutionException;
11+
12+
public class AkkaRequestClientCase extends ClientCase<Http> {
13+
14+
private static final ActorSystem system = ActorSystem.create();
15+
16+
@Override
17+
public Http newClient(String url) throws Exception {
18+
return Http.get(system);
19+
}
20+
21+
@Override
22+
public int test(String url, Http client) throws URISyntaxException, ExecutionException, InterruptedException {
23+
CompletableFuture<HttpResponse> responseFuture = client
24+
.singleRequest(HttpRequest.create(url))
25+
.toCompletableFuture();
26+
HttpResponse response = responseFuture.get();
27+
return response.status().intValue();
28+
}
29+
}

0 commit comments

Comments
 (0)