Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@

import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Interface responsible for generating request IDs.
*
* <p>Note that all request IDs have a parent/child relationship. A "parent ID" can loosely be
* thought of as encompassing a sequence of a request + any attendant retries, speculative
* <p>Note that all request IDs have a parent/child relationship. A "session request ID" can loosely
* be thought of as encompassing a sequence of a request + any attendant retries, speculative
* executions etc. It's scope is identical to that of a {@link
* com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "request ID" represents a single
* request within this larger scope. Note that a request corresponding to a request ID may be
* com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "node request ID" represents a
* single request within this larger scope. Note that a request corresponding to a request ID may be
* retried; in that case the retry count will be appended to the corresponding identifier in the
* logs.
*/
Expand Down Expand Up @@ -67,11 +68,17 @@ default String getCustomPayloadKey() {

default Statement<?> getDecoratedStatement(
@NonNull Statement<?> statement, @NonNull String requestId) {
Map<String, ByteBuffer> customPayload =
NullAllowingImmutableMap.<String, ByteBuffer>builder()
.putAll(statement.getCustomPayload())
.put(getCustomPayloadKey(), ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)))
.build();
return statement.setCustomPayload(customPayload);

Map<String, ByteBuffer> existing = new HashMap<>(statement.getCustomPayload());
String key = getCustomPayloadKey();

// Add or overwrite
existing.put(key, ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)));

// Allowing null key/values
// Wrap a map inside to be immutable without instanciating a new map
Map<String, ByteBuffer> unmodifiableMap = Collections.unmodifiableMap(existing);

return statement.setCustomPayload(unmodifiableMap);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know that I want to hold things up any but I'd argue ImmutableMap is more concise (and avoids the need to create at least one of the intermediate maps here):

  default Statement<?> getDecoratedStatement(
      @NonNull Statement<?> statement, @NonNull String requestId) {

      return statement.setCustomPayload(
              ImmutableMap.<String,ByteBuffer>builder()
              .putAll(statement.getCustomPayload())
              .put(getCustomPayloadKey(), ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)))
              .buildKeepingLast());
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original map implementation that was used that caused this issue NullAllowingImmutableMap was presumably being used to allow null keys/values, which ImmutableMap doesn't allow, so if we use that it's possible any existing custom payload map with null keys and values will cause an exception to be thrown here (discussion here: #2060 (comment))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna make some notes here 'cause in all honesty this was pretty perplexing to me... but I think I finally got to something resembling an answer. I don't know how useful all of this will be but brain dump follows.

The doc referenced by @tolbertam above is from the 3.6 Java driver, which was an entirely different code base. The constant Statement.NULL_PAYLOAD_VALUE is indeed present in that version... so far so good. Thing is... there's no corresponding entry in the 4.x manual and the equivalent 4.x class doesn't have such a constant. So... what's going on here?

Let's take a look at what the v4 native protocol spec actually says here. A bytes map is understood to have a string for a key and bytes for a map. A string is understood to be "A [short] n, followed by n bytes representing an UTF-8 string". There's no obvious way for that to be null, which is consistent with the verbiage in the 3.6 manual page. bytes on the other hand is defined as follows: "A [int] n, followed by n bytes if n >= 0. If n < 0, no byte should follow and the value represented is null" That's presumably the point of the marker referenced in the 3.6 manual page.

To confirm all of this I ran a simple test client against C* 5.0.6 using Java driver 4.19.1. In each case a custom payload was set on a simple statement and executed. Results seem to follow the pattern described above: when using a null key I get a weird native-protocol exception (an issue has been created for that elsewhere) while a null value seems to work just fine.

So where does this leave us? In order to leverage ImmutableMap you need to translate null values into zero-length ByteBuffers... but do so without acquiring intermediate state. You can do it with something like the following:

public class KeyspaceCount {

    static class ImmutableEntry<K,V> implements Map.Entry<K,V> {

        private final K key;
        private final V val;

        public ImmutableEntry(K key, V val) {
            this.key = key;
            this.val = val;
        }

        @Override
        public K getKey() {
            return key;
        }

        @Override
        public V getValue() {
            return val;
        }

        @Override
        public V setValue(Object value) {
            throw new UnsupportedOperationException("You can't do that");
        }
    }

    public static void main(String[] args) {

        try (CqlSession session = CqlSession.builder().build()) {

            SimpleStatement stmt = SimpleStatement.builder("select release_version from system.local").build();
            HashMap<String, ByteBuffer> map = new HashMap<>();
            map.put("key", ByteBuffer.allocate(0));

            ImmutableMap.Builder<String,ByteBuffer> builder = ImmutableMap.builder();
            map.entrySet()
                    .stream()
                    .map((Map.Entry<String,ByteBuffer> entry) -> {
                        return entry.getValue() != null ?
                                entry :
                                new ImmutableEntry<>(entry.getKey(), ByteBuffer.allocate(0));
                    })
                    .forEach(builder::put);

            ResultSet rs = session.execute(stmt.setCustomPayload(builder.buildKeepingLast()));
            Row row = rs.one();

            assert row != null;
            String releaseVersion = row.getString("release_version");
            System.out.printf("Cassandra version is: %s%n", releaseVersion);
        }
    }
}

That actually does work, and it's all stream-based so that the only map you create is the one you're actually add to the Statement object. You could pull the immutable Map.Entry impl into a utility class somewhere to help keep the change down but in the end yeah, I agree it's not quite as clean as I'd like. I'd still argue it's an improvement because (a) you avoid the intermediate maps and (b) you get the immutable part reflected in the type system but these maps are pretty short-lived anyway... so it probably doesn't matter that much.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.unmodifiableWrap just wraps the underlying map with some API methods that prevent modification, so it's not like it's creating an expensive copy right? Or is there something I'm missing here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's probably right @tolbertam but in the end I don't think it matters much. As I mentioned in my earlier comment my hope was that the Guava ImmutableMap code would be easier to read and comprehend but the necessity to map null values there makes the code more convoluted. You do get the advantage of having immutability as part of the type system but that doesn't but you much if you just stick the results in a Statement where custom payload is just a Map anyway.

Given all of that I'm okay with the original impl. My primary goal in this exercise was just to better understand (and document) how null handling even entered into play here. The driver docs you cited clearly indicated that null handling is possible (and relevant) here but I wanted to completely understand why that was the case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with Andrew, Collections.unmodifiableMap only wraps and does not create a new map.
I added comments, hope it makes it easier to read.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,22 @@
import com.datastax.oss.driver.api.core.servererrors.ServerError;
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import com.datastax.oss.protocol.internal.response.Error;
import com.datastax.oss.protocol.internal.response.error.ReadTimeout;
import com.datastax.oss.protocol.internal.response.error.Unavailable;
import com.datastax.oss.protocol.internal.response.error.WriteTimeout;
import com.tngtech.java.junit.dataprovider.DataProvider;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

public class CqlRequestHandlerRetryTest extends CqlRequestHandlerTestBase {
Expand Down Expand Up @@ -384,6 +390,63 @@ public void should_rethrow_error_if_not_idempotent_and_error_unsafe_or_policy_re
}
}

@Test
@UseDataProvider("failureAndIdempotent")
public void should_not_fail_with_duplicate_key_when_retrying_with_request_id_generator(
FailureScenario failureScenario, boolean defaultIdempotence, Statement<?> statement) {

// Create a RequestIdGenerator that uses the same key as the statement's custom payload
RequestIdGenerator requestIdGenerator =
new RequestIdGenerator() {
private AtomicInteger counter = new AtomicInteger(0);

@Override
public String getSessionRequestId() {
return "session-123";
}

@Override
public String getNodeRequestId(@NonNull Request request, @NonNull String parentId) {
return parentId + "-" + counter.getAndIncrement();
}
};

RequestHandlerTestHarness.Builder harnessBuilder =
RequestHandlerTestHarness.builder()
.withDefaultIdempotence(defaultIdempotence)
.withRequestIdGenerator(requestIdGenerator);
failureScenario.mockRequestError(harnessBuilder, node1);
harnessBuilder.withResponse(node2, defaultFrameOf(singleRow()));

try (RequestHandlerTestHarness harness = harnessBuilder.build()) {
failureScenario.mockRetryPolicyVerdict(
harness.getContext().getRetryPolicy(anyString()), RetryVerdict.RETRY_NEXT);

CompletionStage<AsyncResultSet> resultSetFuture =
new CqlRequestHandler(statement, harness.getSession(), harness.getContext(), "test")
.handle();

// The test should succeed without throwing a duplicate key exception
assertThatStage(resultSetFuture)
.isSuccess(
resultSet -> {
Iterator<Row> rows = resultSet.currentPage().iterator();
assertThat(rows.hasNext()).isTrue();
assertThat(rows.next().getString("message")).isEqualTo("hello, world");

ExecutionInfo executionInfo = resultSet.getExecutionInfo();
assertThat(executionInfo.getCoordinator()).isEqualTo(node2);
assertThat(executionInfo.getErrors()).hasSize(1);
assertThat(executionInfo.getErrors().get(0).getKey()).isEqualTo(node1);

// Verify that the custom payload still contains the request ID key
// (either the original value or the generated one, depending on implementation)
assertThat(executionInfo.getRequest().getCustomPayload().get("request-id"))
.isEqualTo(ByteBuffer.wrap("session-123-1".getBytes(StandardCharsets.UTF_8)));
});
}
}

/**
* Sets up the mocks to simulate an error from a node, and make the retry policy return a given
* decision for that error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.internal.core.DefaultConsistencyLevelRegistry;
import com.datastax.oss.driver.internal.core.ProtocolFeature;
Expand Down Expand Up @@ -170,7 +171,8 @@ protected RequestHandlerTestHarness(Builder builder) {

when(context.getRequestTracker()).thenReturn(new NoopRequestTracker(context));

when(context.getRequestIdGenerator()).thenReturn(Optional.empty());
when(context.getRequestIdGenerator())
.thenReturn(Optional.ofNullable(builder.requestIdGenerator));
}

public DefaultSession getSession() {
Expand Down Expand Up @@ -203,6 +205,7 @@ public static class Builder {
private final List<PoolBehavior> poolBehaviors = new ArrayList<>();
private boolean defaultIdempotence;
private ProtocolVersion protocolVersion;
private RequestIdGenerator requestIdGenerator;

/**
* Sets the given node as the next one in the query plan; an empty pool will be simulated when
Expand Down Expand Up @@ -258,6 +261,11 @@ public Builder withProtocolVersion(ProtocolVersion protocolVersion) {
return this;
}

public Builder withRequestIdGenerator(RequestIdGenerator requestIdGenerator) {
this.requestIdGenerator = requestIdGenerator;
return this;
}

/**
* Sets the given node as the next one in the query plan; the test code is responsible of
* calling the methods on the returned object to complete the write and the query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
*/
package com.datastax.oss.driver.core.tracker;

import static com.datastax.oss.driver.Assertions.assertThatStage;
import static org.assertj.core.api.Assertions.assertThat;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
Expand Down Expand Up @@ -119,7 +121,24 @@ public void should_not_write_id_to_custom_payload_when_key_is_not_set() {
try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
String query = "SELECT * FROM system.local";
ResultSet rs = session.execute(query);
assertThat(rs.getExecutionInfo().getRequest().getCustomPayload().get("trace_key")).isNull();
assertThat(rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id")).isNull();
}
}

@Test
public void should_succeed_with_null_value_in_custom_payload() {
DriverConfigLoader loader =
SessionUtils.configLoaderBuilder()
.withString(
DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS, "W3CContextRequestIdGenerator")
.build();
try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
String query = "SELECT * FROM system.local";
Map<String, ByteBuffer> customPayload =
new NullAllowingImmutableMap.Builder<String, ByteBuffer>(1).put("my_key", null).build();
SimpleStatement statement =
SimpleStatement.newInstance(query).setCustomPayload(customPayload);
assertThatStage(session.executeAsync(statement)).isSuccess();
}
}
}