1515
1616import com .rabbitmq .stream .ChunkChecksum ;
1717import com .rabbitmq .stream .ChunkChecksumValidationException ;
18+ import com .rabbitmq .stream .StreamException ;
1819import io .netty .buffer .ByteBuf ;
1920import io .netty .util .ByteProcessor ;
21+ import java .lang .reflect .InvocationTargetException ;
22+ import java .lang .reflect .Method ;
23+ import java .nio .ByteBuffer ;
2024import java .util .function .Supplier ;
2125import java .util .zip .CRC32 ;
2226import java .util .zip .Checksum ;
27+ import org .slf4j .Logger ;
28+ import org .slf4j .LoggerFactory ;
2329
2430class JdkChunkChecksum implements ChunkChecksum {
2531
32+ static final ChunkChecksum CRC32_SINGLETON ;
33+ private static final Logger LOGGER = LoggerFactory .getLogger (JdkChunkChecksum .class );
2634 private static final Supplier <Checksum > CRC32_SUPPLIER = CRC32 ::new ;
27- static final ChunkChecksum CRC32_SINGLETON = new JdkChunkChecksum (CRC32_SUPPLIER );
35+
36+ static {
37+ if (isChecksumUpdateByteBufferAvailable ()) {
38+ LOGGER .debug ("Checksum#update(ByteBuffer) method not available, using it for direct buffers" );
39+ CRC32_SINGLETON = new ByteBufferDirectByteBufChecksum (CRC32_SUPPLIER );
40+ } else {
41+ LOGGER .debug (
42+ "Checksum#update(ByteBuffer) method not available, using byte-by-byte CRC calculation for direct buffers" );
43+ CRC32_SINGLETON = new JdkChunkChecksum (CRC32_SUPPLIER );
44+ }
45+ }
46+
2847 private final Supplier <Checksum > checksumSupplier ;
2948
3049 JdkChunkChecksum () {
@@ -35,6 +54,15 @@ class JdkChunkChecksum implements ChunkChecksum {
3554 this .checksumSupplier = checksumSupplier ;
3655 }
3756
57+ private static boolean isChecksumUpdateByteBufferAvailable () {
58+ try {
59+ Checksum .class .getDeclaredMethod ("update" , ByteBuffer .class );
60+ return true ;
61+ } catch (Exception e ) {
62+ return false ;
63+ }
64+ }
65+
3866 @ Override
3967 public void checksum (ByteBuf byteBuf , long dataLength , long expected ) {
4068 Checksum checksum = checksumSupplier .get ();
@@ -50,6 +78,44 @@ public void checksum(ByteBuf byteBuf, long dataLength, long expected) {
5078 }
5179 }
5280
81+ private static class ByteBufferDirectByteBufChecksum implements ChunkChecksum {
82+
83+ private final Supplier <Checksum > checksumSupplier ;
84+ private final Method updateMethod ;
85+
86+ private ByteBufferDirectByteBufChecksum (Supplier <Checksum > checksumSupplier ) {
87+ this .checksumSupplier = checksumSupplier ;
88+ try {
89+ this .updateMethod = Checksum .class .getDeclaredMethod ("update" , ByteBuffer .class );
90+ } catch (NoSuchMethodException e ) {
91+ throw new StreamException ("Error while looking up Checksum#update(ByteBuffer) method" , e );
92+ }
93+ }
94+
95+ @ Override
96+ public void checksum (ByteBuf byteBuf , long dataLength , long expected ) {
97+ Checksum checksum = checksumSupplier .get ();
98+ if (byteBuf .hasArray ()) {
99+ checksum .update (
100+ byteBuf .array (),
101+ byteBuf .arrayOffset () + byteBuf .readerIndex (),
102+ byteBuf .readableBytes ());
103+ } else {
104+ try {
105+ this .updateMethod .invoke (
106+ checksum , byteBuf .nioBuffer (byteBuf .readerIndex (), byteBuf .readableBytes ()));
107+ } catch (IllegalAccessException e ) {
108+ throw new StreamException ("Error while calculating CRC" , e );
109+ } catch (InvocationTargetException e ) {
110+ throw new StreamException ("Error while calculating CRC" , e );
111+ }
112+ }
113+ if (expected != checksum .getValue ()) {
114+ throw new ChunkChecksumValidationException (expected , checksum .getValue ());
115+ }
116+ }
117+ }
118+
53119 private static class UpdateProcessor implements ByteProcessor {
54120
55121 private final Checksum checksum ;
0 commit comments