Skip to content

Commit fcdf3c3

Browse files
artursouzacicoyle
andauthored
Implement actor client metadata. (#1165)
Signed-off-by: Artur Souza <asouza.pro@gmail.com> Co-authored-by: Cassie Coyle <cassie@diagrid.io>
1 parent bd1667b commit fcdf3c3

File tree

8 files changed

+92
-20
lines changed

8 files changed

+92
-20
lines changed

sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import io.grpc.ManagedChannelBuilder;
2323
import reactor.core.publisher.Mono;
2424

25+
import java.util.Collections;
26+
import java.util.Map;
27+
2528
/**
2629
* Holds a client for Dapr sidecar communication. ActorClient should be reused.
2730
*/
@@ -59,7 +62,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) {
5962
* @param overrideProperties Override properties.
6063
*/
6164
public ActorClient(Properties overrideProperties) {
62-
this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN));
65+
this(overrideProperties, null);
6366
}
6467

6568
/**
@@ -69,21 +72,38 @@ public ActorClient(Properties overrideProperties) {
6972
* @param resiliencyOptions Client resiliency options.
7073
*/
7174
public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) {
72-
this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN));
75+
this(overrideProperties, null, resiliencyOptions);
76+
}
77+
78+
/**
79+
* Instantiates a new channel for Dapr sidecar communication.
80+
*
81+
* @param overrideProperties Override properties.
82+
* @param metadata gRPC metadata or HTTP headers for actor invocation.
83+
* @param resiliencyOptions Client resiliency options.
84+
*/
85+
public ActorClient(Properties overrideProperties, Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
86+
this(buildManagedChannel(overrideProperties),
87+
metadata,
88+
resiliencyOptions,
89+
overrideProperties.getValue(Properties.API_TOKEN));
7390
}
7491

7592
/**
7693
* Instantiates a new channel for Dapr sidecar communication.
7794
*
7895
* @param grpcManagedChannel gRPC channel.
96+
* @param metadata gRPC metadata or HTTP headers for actor invocation.
7997
* @param resiliencyOptions Client resiliency options.
98+
* @param daprApiToken Dapr API token.
8099
*/
81100
private ActorClient(
82101
ManagedChannel grpcManagedChannel,
102+
Map<String, String> metadata,
83103
ResiliencyOptions resiliencyOptions,
84104
String daprApiToken) {
85105
this.grpcManagedChannel = grpcManagedChannel;
86-
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken);
106+
this.daprClient = buildDaprClient(grpcManagedChannel, metadata, resiliencyOptions, daprApiToken);
87107
}
88108

89109
/**
@@ -137,10 +157,12 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties)
137157
*/
138158
private static DaprClient buildDaprClient(
139159
Channel grpcManagedChannel,
160+
Map<String, String> metadata,
140161
ResiliencyOptions resiliencyOptions,
141162
String daprApiToken) {
142163
return new DaprClientImpl(
143164
DaprGrpc.newStub(grpcManagedChannel),
165+
metadata == null ? null : Collections.unmodifiableMap(metadata),
144166
resiliencyOptions,
145167
daprApiToken);
146168
}

sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import reactor.core.publisher.MonoSink;
3535
import reactor.util.context.ContextView;
3636

37+
import java.util.Map;
3738
import java.util.concurrent.ExecutionException;
3839
import java.util.function.Consumer;
3940

@@ -57,19 +58,30 @@ class DaprClientImpl implements DaprClient {
5758
*/
5859
private final DaprClientGrpcInterceptors grpcInterceptors;
5960

61+
/**
62+
* Metadata for actor invocation requests.
63+
*/
64+
private final Map<String, String> metadata;
65+
6066
/**
6167
* Internal constructor.
6268
*
6369
* @param grpcClient Dapr's GRPC client.
70+
* @param metadata gRPC metadata or HTTP headers for actor server to receive.
6471
* @param resiliencyOptions Client resiliency options (optional).
6572
* @param daprApiToken Dapr API token (optional).
6673
*/
67-
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) {
74+
DaprClientImpl(
75+
DaprGrpc.DaprStub grpcClient,
76+
Map<String, String> metadata,
77+
ResiliencyOptions resiliencyOptions,
78+
String daprApiToken) {
6879
this.client = grpcClient;
6980
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken,
7081
new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()));
7182
this.retryPolicy = new RetryPolicy(
7283
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
84+
this.metadata = metadata == null ? Map.of() : metadata;
7385
}
7486

7587
/**
@@ -82,6 +94,7 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
8294
.setActorType(actorType)
8395
.setActorId(actorId)
8496
.setMethod(methodName)
97+
.putAllMetadata(this.metadata)
8598
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
8699
.build();
87100
return Mono.deferContextual(

sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void setup() throws IOException {
106106
InProcessChannelBuilder.forName(serverName).directExecutor().build());
107107

108108
// Create a HelloWorldClient using the in-process channel;
109-
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null);
109+
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null, null);
110110
}
111111

112112
@Test

sdk-tests/src/test/java/io/dapr/it/DaprRun.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,19 @@ public DaprClientBuilder newDaprClientBuilder() {
187187
}
188188

189189
public ActorClient newActorClient() {
190-
return this.newActorClient(null);
190+
return this.newActorClient(null, null);
191+
}
192+
193+
public ActorClient newActorClient(Map<String, String> metadata) {
194+
return this.newActorClient(metadata, null);
191195
}
192196

193197
public ActorClient newActorClient(ResiliencyOptions resiliencyOptions) {
194-
return new ActorClient(new Properties(this.getPropertyOverrides()), resiliencyOptions);
198+
return this.newActorClient(null, resiliencyOptions);
199+
}
200+
201+
public ActorClient newActorClient(Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
202+
return new ActorClient(new Properties(this.getPropertyOverrides()), metadata, resiliencyOptions);
195203
}
196204

197205
public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedException {

sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616
import io.dapr.actors.ActorId;
1717
import io.dapr.actors.client.ActorProxyBuilder;
1818
import io.dapr.it.BaseIT;
19+
import io.dapr.it.DaprRun;
1920
import io.dapr.it.actors.app.MyActor;
2021
import io.dapr.it.actors.app.MyActorService;
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.BeforeAll;
2124
import org.junit.jupiter.api.Test;
2225
import org.slf4j.Logger;
2326
import org.slf4j.LoggerFactory;
2427

28+
import java.util.Map;
29+
2530
import static io.dapr.it.Retry.callWithRetry;
2631
import static io.dapr.it.TestUtils.assertThrowsDaprExceptionSubstring;
2732

@@ -30,23 +35,24 @@ public class ActorExceptionIT extends BaseIT {
3035

3136
private static Logger logger = LoggerFactory.getLogger(ActorExceptionIT.class);
3237

33-
@Test
34-
public void exceptionTest() throws Exception {
38+
private static DaprRun run;
39+
40+
@BeforeAll
41+
public static void start() throws Exception {
3542
// The call below will fail if service cannot start successfully.
36-
var run = startDaprApp(
43+
run = startDaprApp(
3744
ActorExceptionIT.class.getSimpleName(),
3845
MyActorService.SUCCESS_MESSAGE,
3946
MyActorService.class,
4047
true,
4148
60000);
49+
}
4250

43-
logger.debug("Creating proxy builder");
51+
@Test
52+
public void exceptionTest() throws Exception {
4453
ActorProxyBuilder<MyActor> proxyBuilder =
4554
new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient()));
46-
logger.debug("Creating actorId");
47-
ActorId actorId1 = new ActorId("1");
48-
logger.debug("Building proxy");
49-
MyActor proxy = proxyBuilder.build(actorId1);
55+
MyActor proxy = proxyBuilder.build(new ActorId("1"));
5056

5157
callWithRetry(() -> {
5258
assertThrowsDaprExceptionSubstring(
@@ -55,4 +61,20 @@ public void exceptionTest() throws Exception {
5561
() -> proxy.throwException());
5662
}, 10000);
5763
}
64+
65+
@Test
66+
public void exceptionDueToMetadataTest() throws Exception {
67+
// Setting this HTTP header via actor metadata will cause the Actor HTTP server to error.
68+
Map<String, String> metadata = Map.of("Content-Length", "9999");
69+
ActorProxyBuilder<MyActor> proxyBuilderMetadataOverride =
70+
new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient(metadata)));
71+
72+
MyActor proxyWithMetadata = proxyBuilderMetadataOverride.build(new ActorId("2"));
73+
callWithRetry(() -> {
74+
assertThrowsDaprExceptionSubstring(
75+
"INTERNAL",
76+
"ContentLength=9999 with Body length 13",
77+
() -> proxyWithMetadata.say("hello world"));
78+
}, 10000);
79+
}
5880
}

sdk/src/main/java/io/dapr/internal/grpc/DaprClientGrpcInterceptors.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import io.dapr.internal.grpc.interceptors.DaprApiTokenInterceptor;
1717
import io.dapr.internal.grpc.interceptors.DaprAppIdInterceptor;
18-
import io.dapr.internal.grpc.interceptors.DaprMetadataInterceptor;
18+
import io.dapr.internal.grpc.interceptors.DaprMetadataReceiverInterceptor;
1919
import io.dapr.internal.grpc.interceptors.DaprTimeoutInterceptor;
2020
import io.dapr.internal.grpc.interceptors.DaprTracingInterceptor;
2121
import io.dapr.internal.resiliency.TimeoutPolicy;
@@ -35,10 +35,18 @@ public class DaprClientGrpcInterceptors {
3535

3636
private final TimeoutPolicy timeoutPolicy;
3737

38+
/**
39+
* Instantiates a holder of all gRPC interceptors.
40+
*/
3841
public DaprClientGrpcInterceptors() {
3942
this(null, null);
4043
}
4144

45+
/**
46+
* Instantiates a holder of all gRPC interceptors.
47+
* @param daprApiToken Dapr API token.
48+
* @param timeoutPolicy Timeout Policy.
49+
*/
4250
public DaprClientGrpcInterceptors(String daprApiToken, TimeoutPolicy timeoutPolicy) {
4351
this.daprApiToken = daprApiToken;
4452
this.timeoutPolicy = timeoutPolicy;
@@ -118,7 +126,7 @@ public <T extends AbstractStub<T>> T intercept(
118126
new DaprApiTokenInterceptor(this.daprApiToken),
119127
new DaprTimeoutInterceptor(this.timeoutPolicy),
120128
new DaprTracingInterceptor(context),
121-
new DaprMetadataInterceptor(metadataConsumer));
129+
new DaprMetadataReceiverInterceptor(metadataConsumer));
122130
}
123131

124132
}

sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprApiTokenInterceptor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package io.dapr.internal.grpc.interceptors;
1515

1616
import io.dapr.client.Headers;
17-
import io.dapr.config.Properties;
1817
import io.grpc.CallOptions;
1918
import io.grpc.Channel;
2019
import io.grpc.ClientCall;

sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataInterceptor.java renamed to sdk/src/main/java/io/dapr/internal/grpc/interceptors/DaprMetadataReceiverInterceptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@
2727
/**
2828
* Consumes gRPC metadata.
2929
*/
30-
public class DaprMetadataInterceptor implements ClientInterceptor {
30+
public class DaprMetadataReceiverInterceptor implements ClientInterceptor {
3131

3232
private final Consumer<Metadata> metadataConsumer;
3333

3434
/**
3535
* Creates an instance of the consumer for gRPC metadata.
3636
* @param metadataConsumer gRPC metadata consumer
3737
*/
38-
public DaprMetadataInterceptor(Consumer<Metadata> metadataConsumer) {
38+
public DaprMetadataReceiverInterceptor(Consumer<Metadata> metadataConsumer) {
3939
this.metadataConsumer = metadataConsumer;
4040
}
4141

0 commit comments

Comments
 (0)