Skip to content

Commit 8423c5d

Browse files
authored
Merge branch 'master' into actor_ttl
2 parents 8dbf275 + fcdf3c3 commit 8423c5d

File tree

14 files changed

+158
-30
lines changed

14 files changed

+158
-30
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<properties>
1616
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1717
<grpc.version>1.64.0</grpc.version>
18-
<protobuf.version>3.25.0</protobuf.version>
18+
<protobuf.version>3.25.5</protobuf.version>
1919
<protocCommand>protoc</protocCommand>
2020
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.14.4/dapr/proto</dapr.proto.baseurl>
2121
<dapr.sdk.version>1.13.0-SNAPSHOT</dapr.sdk.version>

sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import io.grpc.ManagedChannelBuilder;
2323
import reactor.core.publisher.Mono;
2424

25+
import java.util.Collections;
26+
import java.util.Map;
27+
2528
/**
2629
* Holds a client for Dapr sidecar communication. ActorClient should be reused.
2730
*/
@@ -59,7 +62,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) {
5962
* @param overrideProperties Override properties.
6063
*/
6164
public ActorClient(Properties overrideProperties) {
62-
this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN));
65+
this(overrideProperties, null);
6366
}
6467

6568
/**
@@ -69,21 +72,38 @@ public ActorClient(Properties overrideProperties) {
6972
* @param resiliencyOptions Client resiliency options.
7073
*/
7174
public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) {
72-
this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN));
75+
this(overrideProperties, null, resiliencyOptions);
76+
}
77+
78+
/**
79+
* Instantiates a new channel for Dapr sidecar communication.
80+
*
81+
* @param overrideProperties Override properties.
82+
* @param metadata gRPC metadata or HTTP headers for actor invocation.
83+
* @param resiliencyOptions Client resiliency options.
84+
*/
85+
public ActorClient(Properties overrideProperties, Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
86+
this(buildManagedChannel(overrideProperties),
87+
metadata,
88+
resiliencyOptions,
89+
overrideProperties.getValue(Properties.API_TOKEN));
7390
}
7491

7592
/**
7693
* Instantiates a new channel for Dapr sidecar communication.
7794
*
7895
* @param grpcManagedChannel gRPC channel.
96+
* @param metadata gRPC metadata or HTTP headers for actor invocation.
7997
* @param resiliencyOptions Client resiliency options.
98+
* @param daprApiToken Dapr API token.
8099
*/
81100
private ActorClient(
82101
ManagedChannel grpcManagedChannel,
102+
Map<String, String> metadata,
83103
ResiliencyOptions resiliencyOptions,
84104
String daprApiToken) {
85105
this.grpcManagedChannel = grpcManagedChannel;
86-
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken);
106+
this.daprClient = buildDaprClient(grpcManagedChannel, metadata, resiliencyOptions, daprApiToken);
87107
}
88108

89109
/**
@@ -137,10 +157,12 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties)
137157
*/
138158
private static DaprClient buildDaprClient(
139159
Channel grpcManagedChannel,
160+
Map<String, String> metadata,
140161
ResiliencyOptions resiliencyOptions,
141162
String daprApiToken) {
142163
return new DaprClientImpl(
143164
DaprGrpc.newStub(grpcManagedChannel),
165+
metadata == null ? null : Collections.unmodifiableMap(metadata),
144166
resiliencyOptions,
145167
daprApiToken);
146168
}

sdk-actors/src/main/java/io/dapr/actors/client/DaprClientImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import reactor.core.publisher.MonoSink;
3535
import reactor.util.context.ContextView;
3636

37+
import java.util.Map;
3738
import java.util.concurrent.ExecutionException;
3839
import java.util.function.Consumer;
3940

@@ -57,19 +58,30 @@ class DaprClientImpl implements DaprClient {
5758
*/
5859
private final DaprClientGrpcInterceptors grpcInterceptors;
5960

61+
/**
62+
* Metadata for actor invocation requests.
63+
*/
64+
private final Map<String, String> metadata;
65+
6066
/**
6167
* Internal constructor.
6268
*
6369
* @param grpcClient Dapr's GRPC client.
70+
* @param metadata gRPC metadata or HTTP headers for actor server to receive.
6471
* @param resiliencyOptions Client resiliency options (optional).
6572
* @param daprApiToken Dapr API token (optional).
6673
*/
67-
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) {
74+
DaprClientImpl(
75+
DaprGrpc.DaprStub grpcClient,
76+
Map<String, String> metadata,
77+
ResiliencyOptions resiliencyOptions,
78+
String daprApiToken) {
6879
this.client = grpcClient;
6980
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken,
7081
new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()));
7182
this.retryPolicy = new RetryPolicy(
7283
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
84+
this.metadata = metadata == null ? Map.of() : metadata;
7385
}
7486

7587
/**
@@ -82,6 +94,7 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
8294
.setActorType(actorType)
8395
.setActorId(actorId)
8496
.setMethod(methodName)
97+
.putAllMetadata(this.metadata)
8598
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
8699
.build();
87100
return Mono.deferContextual(

sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void setup() throws IOException {
106106
InProcessChannelBuilder.forName(serverName).directExecutor().build());
107107

108108
// Create a HelloWorldClient using the in-process channel;
109-
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null);
109+
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null, null);
110110
}
111111

112112
@Test

sdk-tests/src/test/java/io/dapr/it/DaprRun.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,19 @@ public DaprClientBuilder newDaprClientBuilder() {
187187
}
188188

189189
public ActorClient newActorClient() {
190-
return this.newActorClient(null);
190+
return this.newActorClient(null, null);
191+
}
192+
193+
public ActorClient newActorClient(Map<String, String> metadata) {
194+
return this.newActorClient(metadata, null);
191195
}
192196

193197
public ActorClient newActorClient(ResiliencyOptions resiliencyOptions) {
194-
return new ActorClient(new Properties(this.getPropertyOverrides()), resiliencyOptions);
198+
return this.newActorClient(null, resiliencyOptions);
199+
}
200+
201+
public ActorClient newActorClient(Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
202+
return new ActorClient(new Properties(this.getPropertyOverrides()), metadata, resiliencyOptions);
195203
}
196204

197205
public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedException {

sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616
import io.dapr.actors.ActorId;
1717
import io.dapr.actors.client.ActorProxyBuilder;
1818
import io.dapr.it.BaseIT;
19+
import io.dapr.it.DaprRun;
1920
import io.dapr.it.actors.app.MyActor;
2021
import io.dapr.it.actors.app.MyActorService;
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.BeforeAll;
2124
import org.junit.jupiter.api.Test;
2225
import org.slf4j.Logger;
2326
import org.slf4j.LoggerFactory;
2427

28+
import java.util.Map;
29+
2530
import static io.dapr.it.Retry.callWithRetry;
2631
import static io.dapr.it.TestUtils.assertThrowsDaprExceptionSubstring;
2732

@@ -30,23 +35,24 @@ public class ActorExceptionIT extends BaseIT {
3035

3136
private static Logger logger = LoggerFactory.getLogger(ActorExceptionIT.class);
3237

33-
@Test
34-
public void exceptionTest() throws Exception {
38+
private static DaprRun run;
39+
40+
@BeforeAll
41+
public static void start() throws Exception {
3542
// The call below will fail if service cannot start successfully.
36-
var run = startDaprApp(
43+
run = startDaprApp(
3744
ActorExceptionIT.class.getSimpleName(),
3845
MyActorService.SUCCESS_MESSAGE,
3946
MyActorService.class,
4047
true,
4148
60000);
49+
}
4250

43-
logger.debug("Creating proxy builder");
51+
@Test
52+
public void exceptionTest() throws Exception {
4453
ActorProxyBuilder<MyActor> proxyBuilder =
4554
new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient()));
46-
logger.debug("Creating actorId");
47-
ActorId actorId1 = new ActorId("1");
48-
logger.debug("Building proxy");
49-
MyActor proxy = proxyBuilder.build(actorId1);
55+
MyActor proxy = proxyBuilder.build(new ActorId("1"));
5056

5157
callWithRetry(() -> {
5258
assertThrowsDaprExceptionSubstring(
@@ -55,4 +61,20 @@ public void exceptionTest() throws Exception {
5561
() -> proxy.throwException());
5662
}, 10000);
5763
}
64+
65+
@Test
66+
public void exceptionDueToMetadataTest() throws Exception {
67+
// Setting this HTTP header via actor metadata will cause the Actor HTTP server to error.
68+
Map<String, String> metadata = Map.of("Content-Length", "9999");
69+
ActorProxyBuilder<MyActor> proxyBuilderMetadataOverride =
70+
new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient(metadata)));
71+
72+
MyActor proxyWithMetadata = proxyBuilderMetadataOverride.build(new ActorId("2"));
73+
callWithRetry(() -> {
74+
assertThrowsDaprExceptionSubstring(
75+
"INTERNAL",
76+
"ContentLength=9999 with Body length 13",
77+
() -> proxyWithMetadata.say("hello world"));
78+
}, 10000);
79+
}
5880
}

sdk/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
<dependency>
5353
<groupId>com.squareup.okhttp3</groupId>
5454
<artifactId>okhttp</artifactId>
55-
<version>4.9.0</version>
55+
<version>4.9.2</version>
5656
</dependency>
5757
<dependency>
5858
<groupId>org.mockito</groupId>

sdk/src/main/java/io/dapr/client/AbstractDaprClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,14 @@ public <T> Mono<T> invokeBinding(
270270
return this.invokeBinding(bindingName, operation, data, metadata, TypeRef.get(clazz));
271271
}
272272

273+
/**
274+
* {@inheritDoc}
275+
*/
276+
@Override
277+
public Mono<Void> invokeBinding(InvokeBindingRequest request) {
278+
return this.invokeBinding(request, TypeRef.VOID);
279+
}
280+
273281
/**
274282
* {@inheritDoc}
275283
*/

sdk/src/main/java/io/dapr/client/DaprClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,14 @@ <T> Mono<T> invokeBinding(String bindingName, String operation, Object data, Map
314314
<T> Mono<T> invokeBinding(String bindingName, String operation, Object data, Map<String, String> metadata,
315315
Class<T> clazz);
316316

317+
/**
318+
* Invokes a Binding operation.
319+
*
320+
* @param request The binding invocation request.
321+
* @return a Mono with void.
322+
*/
323+
Mono<Void> invokeBinding(InvokeBindingRequest request);
324+
317325
/**
318326
* Invokes a Binding operation.
319327
*

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,10 @@ private <T> Subscription<T> buildSubscription(
459459

460460
try {
461461
CloudEvent<T> cloudEvent = new CloudEvent<>();
462-
var object =
463-
DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type);
462+
T object = null;
463+
if (type != null) {
464+
object = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type);
465+
}
464466
cloudEvent.setData(object);
465467
cloudEvent.setDatacontenttype(message.getDataContentType());
466468
cloudEvent.setId(message.getId());
@@ -528,6 +530,10 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
528530

529531
private <T> Mono<T> getMonoForHttpResponse(TypeRef<T> type, DaprHttp.Response r) {
530532
try {
533+
if (type == null) {
534+
return Mono.empty();
535+
}
536+
531537
T object = objectSerializer.deserialize(r.getBody(), type);
532538
if (object == null) {
533539
return Mono.empty();
@@ -585,6 +591,9 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
585591
}
586592

587593
try {
594+
if (type == null) {
595+
return Mono.empty();
596+
}
588597
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
589598
} catch (IOException e) {
590599
throw DaprException.propagate(e);
@@ -706,13 +715,18 @@ private <T> State<T> buildStateKeyValue(
706715
return new State<>(key, error);
707716
}
708717

709-
ByteString payload = item.getData();
710-
byte[] data = payload == null ? null : payload.toByteArray();
711-
T value = stateSerializer.deserialize(data, type);
712718
String etag = item.getEtag();
713719
if (etag.equals("")) {
714720
etag = null;
715721
}
722+
723+
T value = null;
724+
if (type != null) {
725+
ByteString payload = item.getData();
726+
byte[] data = payload == null ? null : payload.toByteArray();
727+
value = stateSerializer.deserialize(data, type);
728+
}
729+
716730
return new State<>(key, value, etag, item.getMetadataMap(), null);
717731
}
718732

@@ -723,7 +737,11 @@ private <T> State<T> buildStateKeyValue(
723737
TypeRef<T> type) throws IOException {
724738
ByteString payload = response.getData();
725739
byte[] data = payload == null ? null : payload.toByteArray();
726-
T value = stateSerializer.deserialize(data, type);
740+
T value = null;
741+
if (type != null) {
742+
value = stateSerializer.deserialize(data, type);
743+
}
744+
727745
String etag = response.getEtag();
728746
if (etag.equals("")) {
729747
etag = null;
@@ -1108,7 +1126,11 @@ private <T> QueryStateItem<T> buildQueryStateKeyValue(
11081126
}
11091127
ByteString payload = item.getData();
11101128
byte[] data = payload == null ? null : payload.toByteArray();
1111-
T value = stateSerializer.deserialize(data, type);
1129+
T value = null;
1130+
if (type != null) {
1131+
value = stateSerializer.deserialize(data, type);
1132+
}
1133+
11121134
String etag = item.getEtag();
11131135
if (etag.equals("")) {
11141136
etag = null;

0 commit comments

Comments
 (0)