33import io
44import os
55import socket
6+ import sys
67import threading
78import weakref
89from io import SEEK_END
3233 TimeoutError ,
3334)
3435from redis .retry import Retry
35- from redis .utils import CRYPTOGRAPHY_AVAILABLE , HIREDIS_AVAILABLE , str_if_bytes
36+ from redis .utils import (
37+ CRYPTOGRAPHY_AVAILABLE ,
38+ HIREDIS_AVAILABLE ,
39+ HIREDIS_PACK_AVAILABLE ,
40+ str_if_bytes ,
41+ )
3642
3743try :
3844 import ssl
@@ -509,6 +515,75 @@ def read_response(self, disable_decoding=False):
509515 DefaultParser = PythonParser
510516
511517
518+ class HiredisRespSerializer :
519+ def pack (self , * args ):
520+ """Pack a series of arguments into the Redis protocol"""
521+ output = []
522+
523+ if isinstance (args [0 ], str ):
524+ args = tuple (args [0 ].encode ().split ()) + args [1 :]
525+ elif b" " in args [0 ]:
526+ args = tuple (args [0 ].split ()) + args [1 :]
527+ try :
528+ output .append (hiredis .pack_command (args ))
529+ except TypeError :
530+ _ , value , traceback = sys .exc_info ()
531+ raise DataError (value ).with_traceback (traceback )
532+
533+ return output
534+
535+
536+ class PythonRespSerializer :
537+ def __init__ (self , buffer_cutoff , encode ) -> None :
538+ self ._buffer_cutoff = buffer_cutoff
539+ self .encode = encode
540+
541+ def pack (self , * args ):
542+ """Pack a series of arguments into the Redis protocol"""
543+ output = []
544+ # the client might have included 1 or more literal arguments in
545+ # the command name, e.g., 'CONFIG GET'. The Redis server expects these
546+ # arguments to be sent separately, so split the first argument
547+ # manually. These arguments should be bytestrings so that they are
548+ # not encoded.
549+ if isinstance (args [0 ], str ):
550+ args = tuple (args [0 ].encode ().split ()) + args [1 :]
551+ elif b" " in args [0 ]:
552+ args = tuple (args [0 ].split ()) + args [1 :]
553+
554+ buff = SYM_EMPTY .join ((SYM_STAR , str (len (args )).encode (), SYM_CRLF ))
555+
556+ buffer_cutoff = self ._buffer_cutoff
557+ for arg in map (self .encode , args ):
558+ # to avoid large string mallocs, chunk the command into the
559+ # output list if we're sending large values or memoryviews
560+ arg_length = len (arg )
561+ if (
562+ len (buff ) > buffer_cutoff
563+ or arg_length > buffer_cutoff
564+ or isinstance (arg , memoryview )
565+ ):
566+ buff = SYM_EMPTY .join (
567+ (buff , SYM_DOLLAR , str (arg_length ).encode (), SYM_CRLF )
568+ )
569+ output .append (buff )
570+ output .append (arg )
571+ buff = SYM_CRLF
572+ else :
573+ buff = SYM_EMPTY .join (
574+ (
575+ buff ,
576+ SYM_DOLLAR ,
577+ str (arg_length ).encode (),
578+ SYM_CRLF ,
579+ arg ,
580+ SYM_CRLF ,
581+ )
582+ )
583+ output .append (buff )
584+ return output
585+
586+
512587class Connection :
513588 "Manages TCP communication to and from a Redis server"
514589
@@ -536,6 +611,7 @@ def __init__(
536611 retry = None ,
537612 redis_connect_func = None ,
538613 credential_provider : Optional [CredentialProvider ] = None ,
614+ command_packer = None ,
539615 ):
540616 """
541617 Initialize a new Connection.
@@ -590,6 +666,7 @@ def __init__(
590666 self .set_parser (parser_class )
591667 self ._connect_callbacks = []
592668 self ._buffer_cutoff = 6000
669+ self ._command_packer = self ._construct_command_packer (command_packer )
593670
594671 def __repr__ (self ):
595672 repr_args = "," .join ([f"{ k } ={ v } " for k , v in self .repr_pieces ()])
@@ -607,6 +684,14 @@ def __del__(self):
607684 except Exception :
608685 pass
609686
687+ def _construct_command_packer (self , packer ):
688+ if packer is not None :
689+ return packer
690+ elif HIREDIS_PACK_AVAILABLE :
691+ return HiredisRespSerializer ()
692+ else :
693+ return PythonRespSerializer (self ._buffer_cutoff , self .encoder .encode )
694+
610695 def register_connect_callback (self , callback ):
611696 self ._connect_callbacks .append (weakref .WeakMethod (callback ))
612697
@@ -827,7 +912,8 @@ def send_packed_command(self, command, check_health=True):
827912 def send_command (self , * args , ** kwargs ):
828913 """Pack and send a command to the Redis server"""
829914 self .send_packed_command (
830- self .pack_command (* args ), check_health = kwargs .get ("check_health" , True )
915+ self ._command_packer .pack (* args ),
916+ check_health = kwargs .get ("check_health" , True ),
831917 )
832918
833919 def can_read (self , timeout = 0 ):
@@ -872,48 +958,7 @@ def read_response(self, disable_decoding=False):
872958
873959 def pack_command (self , * args ):
874960 """Pack a series of arguments into the Redis protocol"""
875- output = []
876- # the client might have included 1 or more literal arguments in
877- # the command name, e.g., 'CONFIG GET'. The Redis server expects these
878- # arguments to be sent separately, so split the first argument
879- # manually. These arguments should be bytestrings so that they are
880- # not encoded.
881- if isinstance (args [0 ], str ):
882- args = tuple (args [0 ].encode ().split ()) + args [1 :]
883- elif b" " in args [0 ]:
884- args = tuple (args [0 ].split ()) + args [1 :]
885-
886- buff = SYM_EMPTY .join ((SYM_STAR , str (len (args )).encode (), SYM_CRLF ))
887-
888- buffer_cutoff = self ._buffer_cutoff
889- for arg in map (self .encoder .encode , args ):
890- # to avoid large string mallocs, chunk the command into the
891- # output list if we're sending large values or memoryviews
892- arg_length = len (arg )
893- if (
894- len (buff ) > buffer_cutoff
895- or arg_length > buffer_cutoff
896- or isinstance (arg , memoryview )
897- ):
898- buff = SYM_EMPTY .join (
899- (buff , SYM_DOLLAR , str (arg_length ).encode (), SYM_CRLF )
900- )
901- output .append (buff )
902- output .append (arg )
903- buff = SYM_CRLF
904- else :
905- buff = SYM_EMPTY .join (
906- (
907- buff ,
908- SYM_DOLLAR ,
909- str (arg_length ).encode (),
910- SYM_CRLF ,
911- arg ,
912- SYM_CRLF ,
913- )
914- )
915- output .append (buff )
916- return output
961+ return self ._command_packer .pack (* args )
917962
918963 def pack_commands (self , commands ):
919964 """Pack multiple commands into the Redis protocol"""
@@ -923,7 +968,7 @@ def pack_commands(self, commands):
923968 buffer_cutoff = self ._buffer_cutoff
924969
925970 for cmd in commands :
926- for chunk in self .pack_command (* cmd ):
971+ for chunk in self ._command_packer . pack (* cmd ):
927972 chunklen = len (chunk )
928973 if (
929974 buffer_length > buffer_cutoff
0 commit comments