4949
5050from _pulsar import Result , CompressionType , ConsumerType , InitialPosition , PartitionsRoutingMode , BatchingType , \
5151 LoggerLevel , BatchReceivePolicy , KeySharedPolicy , KeySharedMode , ProducerAccessMode , RegexSubscriptionMode , \
52- DeadLetterPolicyBuilder # noqa: F401
52+ DeadLetterPolicyBuilder , ConsumerCryptoFailureAction # noqa: F401
5353
5454from pulsar .__about__ import __version__
5555
@@ -876,6 +876,7 @@ def subscribe(self, topic, subscription_name,
876876 batch_index_ack_enabled = False ,
877877 regex_subscription_mode : RegexSubscriptionMode = RegexSubscriptionMode .PersistentOnly ,
878878 dead_letter_policy : Union [None , ConsumerDeadLetterPolicy ] = None ,
879+ crypto_failure_action : ConsumerCryptoFailureAction = ConsumerCryptoFailureAction .FAIL ,
879880 ):
880881 """
881882 Subscribe to the given topic and subscription combination.
@@ -979,6 +980,19 @@ def my_listener(consumer, message):
979980 stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
980981 exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
981982 automatically.
983+ crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
984+ Set the behavior when the decryption fails. The default is to fail the message.
985+
986+ Supported actions:
987+
988+ * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
989+ * ConsumerCryptoFailureAction.DISCARD:
990+ Message is silently acknowledged and not delivered to the application.
991+ * ConsumerCryptoFailureAction.CONSUME:
992+ Deliver the encrypted message to the application. It's the application's responsibility
993+ to decrypt the message. If message is also compressed, decompression will fail. If the
994+ message contains batch messages, client will not be able to retrieve individual messages
995+ in the batch.
982996 """
983997 _check_type (str , subscription_name , 'subscription_name' )
984998 _check_type (ConsumerType , consumer_type , 'consumer_type' )
@@ -1002,6 +1016,7 @@ def my_listener(consumer, message):
10021016 _check_type_or_none (ConsumerKeySharedPolicy , key_shared_policy , 'key_shared_policy' )
10031017 _check_type (bool , batch_index_ack_enabled , 'batch_index_ack_enabled' )
10041018 _check_type (RegexSubscriptionMode , regex_subscription_mode , 'regex_subscription_mode' )
1019+ _check_type (ConsumerCryptoFailureAction , crypto_failure_action , 'crypto_failure_action' )
10051020
10061021 conf = _pulsar .ConsumerConfiguration ()
10071022 conf .consumer_type (consumer_type )
@@ -1040,6 +1055,7 @@ def my_listener(consumer, message):
10401055 conf .batch_index_ack_enabled (batch_index_ack_enabled )
10411056 if dead_letter_policy :
10421057 conf .dead_letter_policy (dead_letter_policy .policy ())
1058+ conf .crypto_failure_action (crypto_failure_action )
10431059
10441060 c = Consumer ()
10451061 if isinstance (topic , str ):
@@ -1068,7 +1084,8 @@ def create_reader(self, topic, start_message_id,
10681084 subscription_role_prefix = None ,
10691085 is_read_compacted = False ,
10701086 crypto_key_reader : Union [None , CryptoKeyReader ] = None ,
1071- start_message_id_inclusive = False
1087+ start_message_id_inclusive = False ,
1088+ crypto_failure_action : ConsumerCryptoFailureAction = ConsumerCryptoFailureAction .FAIL ,
10721089 ):
10731090 """
10741091 Create a reader on a particular topic
@@ -1129,6 +1146,19 @@ def my_listener(reader, message):
11291146 and private key decryption messages for the consumer
11301147 start_message_id_inclusive: bool, default=False
11311148 Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
1149+ crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
1150+ Set the behavior when the decryption fails. The default is to fail the message.
1151+
1152+ Supported actions:
1153+
1154+ * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
1155+ * ConsumerCryptoFailureAction.DISCARD:
1156+ Message is silently acknowledged and not delivered to the application.
1157+ * ConsumerCryptoFailureAction.CONSUME:
1158+ Deliver the encrypted message to the application. It's the application's responsibility
1159+ to decrypt the message. If message is also compressed, decompression will fail. If the
1160+ message contains batch messages, client will not be able to retrieve individual messages
1161+ in the batch.
11321162 """
11331163
11341164 # If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -1144,6 +1174,7 @@ def my_listener(reader, message):
11441174 _check_type (bool , is_read_compacted , 'is_read_compacted' )
11451175 _check_type_or_none (CryptoKeyReader , crypto_key_reader , 'crypto_key_reader' )
11461176 _check_type (bool , start_message_id_inclusive , 'start_message_id_inclusive' )
1177+ _check_type (ConsumerCryptoFailureAction , crypto_failure_action , 'crypto_failure_action' )
11471178
11481179 conf = _pulsar .ReaderConfiguration ()
11491180 if reader_listener :
@@ -1158,6 +1189,7 @@ def my_listener(reader, message):
11581189 if crypto_key_reader :
11591190 conf .crypto_key_reader (crypto_key_reader .cryptoKeyReader )
11601191 conf .start_message_id_inclusive (start_message_id_inclusive )
1192+ conf .crypto_failure_action (crypto_failure_action )
11611193
11621194 c = Reader ()
11631195 c ._reader = self ._client .create_reader (topic , start_message_id , conf )
0 commit comments