2828 # Python built without zlib support.
2929 _HAVE_ZLIB = False
3030
31+ try :
32+ from zstandard import ZstdCompressor , ZstdDecompressor
33+ _HAVE_ZSTD = True
34+ except ImportError :
35+ _HAVE_ZSTD = False
36+
3137from pymongo .monitoring import _SENSITIVE_COMMANDS
3238
33- _SUPPORTED_COMPRESSORS = set (["snappy" , "zlib" ])
39+ _SUPPORTED_COMPRESSORS = set (["snappy" , "zlib" , "zstd" ])
3440_NO_COMPRESSION = set (['ismaster' ])
3541_NO_COMPRESSION .update (_SENSITIVE_COMMANDS )
3642
@@ -57,6 +63,11 @@ def validate_compressors(dummy, value):
5763 warnings .warn (
5864 "Wire protocol compression with zlib is not available. "
5965 "The zlib module is not available." )
66+ elif compressor == "zstd" and not _HAVE_ZSTD :
67+ compressors .remove (compressor )
68+ warnings .warn (
69+ "Wire protocol compression with zstandard is not available. "
70+ "You must install the zstandard module for zstandard support." )
6071 return compressors
6172
6273
@@ -83,6 +94,8 @@ def get_compression_context(self, compressors):
8394 return SnappyContext ()
8495 elif chosen == "zlib" :
8596 return ZlibContext (self .zlib_compression_level )
97+ elif chosen == "zstd" :
98+ return ZstdContext ()
8699
87100
88101def _zlib_no_compress (data ):
@@ -113,6 +126,16 @@ def __init__(self, level):
113126 self .compress = lambda data : zlib .compress (data , level )
114127
115128
129+ class ZstdContext (object ):
130+ compressor_id = 3
131+
132+ @staticmethod
133+ def compress (data ):
134+ # ZstdCompressor is not thread safe.
135+ # TODO: Use a pool?
136+ return ZstdCompressor ().compress (data )
137+
138+
116139def decompress (data , compressor_id ):
117140 if compressor_id == SnappyContext .compressor_id :
118141 # python-snappy doesn't support the buffer interface.
@@ -126,5 +149,9 @@ def decompress(data, compressor_id):
126149 return snappy .uncompress (bytes (data ))
127150 elif compressor_id == ZlibContext .compressor_id :
128151 return zlib .decompress (data )
152+ elif compressor_id == ZstdContext .compressor_id :
153+ # ZstdDecompressor is not thread safe.
154+ # TODO: Use a pool?
155+ return ZstdDecompressor ().decompress (data )
129156 else :
130157 raise ValueError ("Unknown compressorId %d" % (compressor_id ,))
0 commit comments