diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index ada77e28..79f6353c 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -8,12 +8,10 @@ jobs:
strategy:
matrix:
- php: [8.4, 8.3, 8.2]
- laravel: [12.*, 11.*, 10.*]
+ php: [8.4, 8.3]
+ laravel: [12.*, 11.*]
dependency-version: [prefer-stable]
include:
- - laravel: 10.*
- testbench: 8.*
- laravel: 11.*
testbench: 9.*
- laravel: 12.*
diff --git a/composer.json b/composer.json
index 740ab8f3..c424d2aa 100644
--- a/composer.json
+++ b/composer.json
@@ -3,18 +3,18 @@
"description": "A kafka driver for laravel",
"type": "library",
"require": {
- "php": "^8.2|^8.3|^8.4",
+ "php": "^8.3|^8.4",
"ext-rdkafka": "^6.0",
"monolog/monolog": "^3",
"mateusjunges/avro-serde-php": "^3.0",
- "illuminate/support": "^10.0|^11.0|^12.0",
- "illuminate/contracts": "^10.0|^11.0|^12.0"
+ "illuminate/support": "^11.0|^12.0",
+ "illuminate/contracts": "^11.0|^12.0"
},
"require-dev": {
"phpunit/phpunit": "^10.5|^11.5.3",
- "orchestra/testbench": "^7.16|^8.0|^9.0|^10.0",
+ "orchestra/testbench": "^9.0|^10.0",
"predis/predis": "^1",
- "rector/rector": "^0.19.8",
+ "rector/rector": "^2.1",
"laravel/pint": "dev-main"
},
"minimum-stability": "dev",
@@ -37,7 +37,7 @@
}
],
"scripts": {
- "test": "vendor/bin/phpunit tests",
+ "test": "vendor/bin/phpunit",
"format": "vendor/bin/pint"
},
"extra": {
diff --git a/docs/consuming-messages/queueable-handlers.md b/docs/consuming-messages/queueable-handlers.md
deleted file mode 100644
index 7fa1208c..00000000
--- a/docs/consuming-messages/queueable-handlers.md
+++ /dev/null
@@ -1,61 +0,0 @@
----
-title: Queueable handlers
-weight: 11
----
-
-Queueable handlers allow you to handle your kafka messages in a queue. This will put a job into the Laravel queue system for each message received by your Kafka consumer.
-
-```+parse
-
-```
-
-This only requires you to implements the `Illuminate\Contracts\Queue\ShouldQueue` interface in your Handler.
-
-This is how a queueable handler looks like:
-
-```php
-use Illuminate\Contracts\Queue\ShouldQueue;
-use Junges\Kafka\Contracts\Handler as HandlerContract;
-use Junges\Kafka\Contracts\KafkaConsumerMessage;
-
-class Handler implements HandlerContract, ShouldQueue
-{
- public function __invoke(KafkaConsumerMessage $message): void
- {
- // Handle the consumed message.
- }
-}
-```
-
-As you can see on the `__invoke` method, queued handlers does not have access to a `MessageConsumer` instance when handling the message,
-because it's running on a laravel queue and there are no actions that can be performed asynchronously on Kafka message consumer.
-
-You can specify which queue connection and queue name to use for your handler by implementing the `onConnection` and `onQueue` methods:
-
-```php
-use Illuminate\Contracts\Queue\ShouldQueue;
-use Junges\Kafka\Contracts\Handler as HandlerContract;
-use Junges\Kafka\Contracts\KafkaConsumerMessage;
-
-class Handler implements HandlerContract, ShouldQueue
-{
- public function __invoke(KafkaConsumerMessage $message): void
- {
- // Handle the consumed message.
- }
-
- public function onConnection(): string
- {
- return 'sqs'; // Specify your queue connection
- }
-
- public function onQueue(): string
- {
- return 'kafka-handlers'; // Specify your queue name
- }
-}
-```
-
-After creating your handler class, you can use it just as a normal handler, and `laravel-kafka` will know how to handle it under the hoods 😄.
-
-
diff --git a/docs/producing-messages/producing-messages.md b/docs/producing-messages/producing-messages.md
index 25b12a89..70adca49 100644
--- a/docs/producing-messages/producing-messages.md
+++ b/docs/producing-messages/producing-messages.md
@@ -14,16 +14,24 @@ Kafka::publish('broker')->onTopic('topic-name')
This method returns a `ProducerBuilder` instance, which contains a few methods to configure your kafka producer.
The following lines describes these methods.
-If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class:
+The default `publish()` method now uses asynchronous publishing for better performance. Messages are queued and flushed when the application terminates:
```php
use Junges\Kafka\Facades\Kafka;
-Kafka::asyncPublish('broker')->onTopic('topic-name')
+Kafka::publish('broker')->onTopic('topic-name')
```
-The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send.
-This reduces the overhead when you want to send a lot of messages in your request handlers.
+The async producer is a singleton and will only flush messages when the application is shutting down, instead of after each send.
+This reduces overhead when you want to send a lot of messages in your request handlers.
+
+If you need immediate message flushing (synchronous publishing), use the `publishSync()` method:
+
+```php
+use Junges\Kafka\Facades\Kafka;
+
+Kafka::publishSync('broker')->onTopic('topic-name')
+```
```+parse
@@ -37,6 +45,6 @@ available on the `Kafka` facade (added in v2.2.0). This method will return a fre
use Junges\Kafka\Facades\Kafka;
Kafka::fresh()
- ->asyncPublish('broker')
+ ->publish('broker')
->onTopic('topic-name')
```
\ No newline at end of file
diff --git a/docs/producing-messages/publishing-to-kafka.md b/docs/producing-messages/publishing-to-kafka.md
index e95d473f..03c65878 100644
--- a/docs/producing-messages/publishing-to-kafka.md
+++ b/docs/producing-messages/publishing-to-kafka.md
@@ -18,8 +18,19 @@ $producer = Kafka::publish('broker')
$producer->send();
```
-If you want to send multiple messages, consider using the async producer instead. The default `send` method is recommended for low-throughput systems only, as it
-flushes the producer after every message that is sent.
+The `publish()` method uses asynchronous publishing for better performance, batching messages and flushing them when the application terminates.
+If you need immediate message flushing, use `publishSync()` instead:
+
+```php
+use Junges\Kafka\Facades\Kafka;
+
+// For immediate flush (synchronous)
+$producer = Kafka::publishSync('broker')
+ ->onTopic('topic')
+ ->withKafkaKey('kafka-key');
+
+$producer->send();
+```
```+parse
diff --git a/docs/upgrade-guide.md b/docs/upgrade-guide.md
index 7e28b71f..3699e10d 100644
--- a/docs/upgrade-guide.md
+++ b/docs/upgrade-guide.md
@@ -3,6 +3,36 @@ title: Upgrade guide
weight: 6
---
+## Upgrade to v3.0 from v2.9
+
+### Breaking Changes
+
+- `publish()` is now asynchronous by default. Messages are queued and flushed when the application terminates for better performance
+- Removed `asyncPublish()` and `publishAsync()` methods - use `publish()` for async behavior (default) or `publishSync()` for immediate flushing
+- Minimum PHP version raised to 8.3
+- Minimum Laravel version raised to 11.0
+- **NEW**: Added `publishSync()` method for synchronous message publishing with immediate flush
+
+### Migration Guide
+
+**Before (v2.9):**
+```php
+// Async publishing
+Kafka::asyncPublish()->onTopic('topic')->withBody(['data' => 'value'])->send();
+
+// Sync publishing
+Kafka::publish()->onTopic('topic')->withBody(['data' => 'value'])->send();
+```
+
+**After (v3.0):**
+```php
+// Async publishing (default behavior)
+Kafka::publish()->onTopic('topic')->withBody(['data' => 'value'])->send();
+
+// Sync publishing (immediate flush)
+Kafka::publishSync()->onTopic('topic')->withBody(['data' => 'value'])->send();
+```
+
## Upgrade to v2.9 from v2.8
- **BREAKING CHANGE**: Deprecated producer batch messages feature has been removed (`MessageBatch`, `sendBatch`, `produceBatch`). Use `Kafka::asyncPublish()` instead for better performance
diff --git a/rector.php b/rector.php
index 6f75d570..529cb230 100644
--- a/rector.php
+++ b/rector.php
@@ -3,20 +3,22 @@
use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector;
use Rector\Config\RectorConfig;
use Rector\Set\ValueObject\LevelSetList;
+use Rector\TypeDeclaration\Rector\ClassMethod\ReturnNeverTypeRector;
return static function (RectorConfig $rectorConfig): void {
$rectorConfig->paths([
__DIR__.'/config',
- __DIR__.'/dev',
__DIR__.'/src',
__DIR__.'/tests',
]);
- // register a single rule
$rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class);
- // define sets of rules
$rectorConfig->sets([
- LevelSetList::UP_TO_PHP_81,
+ LevelSetList::UP_TO_PHP_83,
+ ]);
+
+ $rectorConfig->skip([
+ ReturnNeverTypeRector::class,
]);
};
diff --git a/src/Commit/RetryableCommitter.php b/src/Commit/RetryableCommitter.php
index 6ec60567..df1adc89 100644
--- a/src/Commit/RetryableCommitter.php
+++ b/src/Commit/RetryableCommitter.php
@@ -10,7 +10,7 @@
class RetryableCommitter implements Committer
{
- private const RETRYABLE_ERRORS = [
+ private const array RETRYABLE_ERRORS = [
RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
diff --git a/src/Concerns/HandleConsumedMessage.php b/src/Concerns/HandleConsumedMessage.php
deleted file mode 100644
index e5f112a0..00000000
--- a/src/Concerns/HandleConsumedMessage.php
+++ /dev/null
@@ -1,28 +0,0 @@
- $this->wrapMiddleware($middleware, $consumer), $middlewares);
- $middlewares = array_reverse($middlewares);
-
- foreach ($middlewares as $middleware) {
- $handler = $middleware($handler);
- }
-
- $handler($message, $consumer);
- }
-}
diff --git a/src/Concerns/PrepareMiddlewares.php b/src/Concerns/PrepareMiddlewares.php
deleted file mode 100644
index 0ddd4d64..00000000
--- a/src/Concerns/PrepareMiddlewares.php
+++ /dev/null
@@ -1,24 +0,0 @@
- new $middleware,
- $middleware instanceof Middleware => $middleware,
- is_callable($middleware) => $middleware,
- default => throw new LogicException('Invalid middleware.')
- };
-
- return static fn (callable $handler) => static fn ($message) => $middleware($message, fn ($message) => $handler($message, $consumer));
- }
-}
diff --git a/src/Config/Config.php b/src/Config/Config.php
index c6b60522..cc6a66c7 100644
--- a/src/Config/Config.php
+++ b/src/Config/Config.php
@@ -9,11 +9,11 @@
class Config
{
- final public const SASL_PLAINTEXT = 'SASL_PLAINTEXT';
+ final public const string SASL_PLAINTEXT = 'SASL_PLAINTEXT';
- final public const SASL_SSL = 'SASL_SSL';
+ final public const string SASL_SSL = 'SASL_SSL';
- final public const PRODUCER_ONLY_CONFIG_OPTIONS = [
+ final public const array PRODUCER_ONLY_CONFIG_OPTIONS = [
'transactional.id',
'transaction.timeout.ms',
'enable.idempotence',
@@ -36,7 +36,7 @@ class Config
'sticky.partitioning.linger.ms',
];
- final public const CONSUMER_ONLY_CONFIG_OPTIONS = [
+ final public const array CONSUMER_ONLY_CONFIG_OPTIONS = [
'partition.assignment.strategy',
'session.timeout.ms',
'heartbeat.interval.ms',
diff --git a/src/Consumers/CallableConsumer.php b/src/Consumers/CallableConsumer.php
index 430d220d..bbd7323b 100644
--- a/src/Consumers/CallableConsumer.php
+++ b/src/Consumers/CallableConsumer.php
@@ -3,80 +3,49 @@
namespace Junges\Kafka\Consumers;
use Closure;
-use Illuminate\Contracts\Bus\Dispatcher;
-use Illuminate\Contracts\Queue\ShouldQueue;
-use Illuminate\Support\Facades\App;
-use Junges\Kafka\Concerns\HandleConsumedMessage;
-use Junges\Kafka\Concerns\PrepareMiddlewares;
use Junges\Kafka\Contracts\Consumer;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\Handler;
use Junges\Kafka\Contracts\MessageConsumer;
+use Junges\Kafka\Contracts\Middleware;
+use LogicException;
class CallableConsumer extends Consumer
{
- use HandleConsumedMessage;
- use PrepareMiddlewares;
-
- private Dispatcher $dispatcher;
-
public function __construct(private Closure|Handler $handler, private readonly array $middlewares)
{
- $this->handler = $this->handler instanceof Handler
- ? $handler
- : $handler(...);
-
- $this->dispatcher = App::make(Dispatcher::class);
+ $this->handler = match (true) {
+ $handler instanceof Handler => $handler,
+ default => $handler(...),
+ };
}
- /** Handle the received message. */
public function handle(ConsumerMessage $message, MessageConsumer $consumer): void
- {
- // If the message handler should be queued, we will dispatch a job to handle this message.
- // Otherwise, the message will be handled synchronously.
- if ($this->shouldQueueHandler()) {
- $this->queueHandler($this->handler, $message, $this->middlewares);
-
- return;
- }
-
- $this->handleMessageSynchronously($message, $consumer);
- }
-
- private function shouldQueueHandler(): bool
- {
- return $this->handler instanceof ShouldQueue;
- }
-
- private function handleMessageSynchronously(ConsumerMessage $message, MessageConsumer $consumer): void
{
$this->handleConsumedMessage($message, $this->handler, $consumer, $this->middlewares);
}
- /**
- * This method dispatches a job to handle the consumed message. You can customize the connection and
- * queue in which it will be dispatched using the onConnection and onQueue methods. If this
- * methods doesn't exist in the handler class, we will use the default configuration accordingly to
- * your queue.php config file.
- */
- private function queueHandler(Handler $handler, ConsumerMessage $message, array $middlewares): void
+ private function handleConsumedMessage(ConsumerMessage $message, Handler|Closure $handler, ?MessageConsumer $consumer = null, array $middlewares = []): void
{
- $connection = config('queue.default');
+ $middlewares = array_map(fn ($middleware) => $this->wrapMiddleware($middleware, $consumer), $middlewares);
+ $middlewares = array_reverse($middlewares);
- if (method_exists($handler, 'onConnection')) {
- $connection = $handler->onConnection();
+ foreach ($middlewares as $middleware) {
+ $handler = $middleware($handler);
}
- $queue = config("queue.$connection.queue", 'default');
-
- if (method_exists($handler, 'onQueue')) {
- $queue = $handler->onQueue();
- }
+ $handler($message, $consumer);
+ }
- $this->dispatcher->dispatch(
- (new DispatchQueuedHandler($handler, $message, $middlewares))
- ->onQueue($queue)
- ->onConnection($connection)
- );
+ private function wrapMiddleware(Middleware|string|callable $middleware, ?MessageConsumer $consumer = null): callable
+ {
+ $middleware = match (true) {
+ is_string($middleware) && is_subclass_of($middleware, Middleware::class) => new $middleware,
+ $middleware instanceof Middleware => $middleware,
+ is_callable($middleware) => $middleware,
+ default => throw new LogicException('Invalid middleware.')
+ };
+
+ return static fn (callable $handler) => static fn ($message) => $middleware($message, fn ($message) => $handler($message, $consumer));
}
}
diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php
index a6ef021e..311d6ae2 100644
--- a/src/Consumers/Consumer.php
+++ b/src/Consumers/Consumer.php
@@ -33,23 +33,23 @@
class Consumer implements MessageConsumer
{
- private const IGNORABLE_CONSUMER_ERRORS = [
+ private const array IGNORABLE_CONSUMER_ERRORS = [
RD_KAFKA_RESP_ERR__PARTITION_EOF,
RD_KAFKA_RESP_ERR__TRANSPORT,
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
RD_KAFKA_RESP_ERR__TIMED_OUT,
];
- private const CONSUME_STOP_EOF_ERRORS = [
+ private const array CONSUME_STOP_EOF_ERRORS = [
RD_KAFKA_RESP_ERR__PARTITION_EOF,
RD_KAFKA_RESP_ERR__TIMED_OUT,
];
- private const TIMEOUT_ERRORS = [
+ private const array TIMEOUT_ERRORS = [
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
];
- private const IGNORABLE_COMMIT_ERRORS = [
+ private const array IGNORABLE_COMMIT_ERRORS = [
RD_KAFKA_RESP_ERR__NO_OFFSET,
];
@@ -73,9 +73,9 @@ class Consumer implements MessageConsumer
private bool $stopRequested = false;
- private ?Closure $whenStopConsuming;
+ private readonly ?Closure $whenStopConsuming;
- private Dispatcher $dispatcher;
+ private readonly Dispatcher $dispatcher;
public function __construct(private readonly Config $config, private readonly MessageDeserializer $deserializer, ?CommitterFactory $committerFactory = null)
{
@@ -390,7 +390,7 @@ private function buildHeadersForDlq(Message $message, ?Throwable $throwable = nu
$throwableHeaders['kafka_throwable_message'] = $throwable->getMessage();
$throwableHeaders['kafka_throwable_code'] = $throwable->getCode();
- $throwableHeaders['kafka_throwable_class_name'] = get_class($throwable);
+ $throwableHeaders['kafka_throwable_class_name'] = $throwable::class;
return array_merge($message->headers ?? [], $throwableHeaders);
}
diff --git a/src/Consumers/DispatchQueuedHandler.php b/src/Consumers/DispatchQueuedHandler.php
deleted file mode 100644
index 6c2fdbed..00000000
--- a/src/Consumers/DispatchQueuedHandler.php
+++ /dev/null
@@ -1,37 +0,0 @@
-handleConsumedMessage(
- message: $this->message,
- handler: $this->handler,
- middlewares: $this->middlewares
- );
- }
-}
diff --git a/src/Contracts/Manager.php b/src/Contracts/Manager.php
index a651e97f..87621d94 100644
--- a/src/Contracts/Manager.php
+++ b/src/Contracts/Manager.php
@@ -7,9 +7,12 @@ interface Manager
/** Returns a new fresh instance of the Manager. */
public function fresh(): self;
- /** Creates a new ProducerBuilder instance, setting brokers and topic. */
+ /** Creates a new async ProducerBuilder instance, setting brokers and topic. */
public function publish(?string $broker = null): MessageProducer;
+ /** Creates a synchronous ProducerBuilder instance for immediate message flushing. */
+ public function publishSync(?string $broker = null): MessageProducer;
+
/** Return a ConsumerBuilder instance. */
public function consumer(array $topics = [], ?string $groupId = null, ?string $brokers = null): ConsumerBuilder;
diff --git a/src/Events/CouldNotPublishMessage.php b/src/Events/CouldNotPublishMessage.php
index 6acfddee..654fcb1f 100644
--- a/src/Events/CouldNotPublishMessage.php
+++ b/src/Events/CouldNotPublishMessage.php
@@ -4,11 +4,11 @@
use Throwable;
-final class CouldNotPublishMessage
+final readonly class CouldNotPublishMessage
{
public function __construct(
- public readonly int $errorCode,
- public readonly string $message,
- public readonly Throwable $throwable,
+ public int $errorCode,
+ public string $message,
+ public Throwable $throwable,
) {}
}
diff --git a/src/Events/MessageConsumed.php b/src/Events/MessageConsumed.php
index f2a07874..521d9b33 100644
--- a/src/Events/MessageConsumed.php
+++ b/src/Events/MessageConsumed.php
@@ -4,10 +4,10 @@
use Junges\Kafka\Contracts\ConsumerMessage;
-final class MessageConsumed
+final readonly class MessageConsumed
{
public function __construct(
- public readonly ConsumerMessage $message
+ public ConsumerMessage $message
) {}
public function getMessageIdentifier(): string
diff --git a/src/Events/MessagePublished.php b/src/Events/MessagePublished.php
index 390d5cef..ad4abc92 100644
--- a/src/Events/MessagePublished.php
+++ b/src/Events/MessagePublished.php
@@ -4,10 +4,10 @@
use Junges\Kafka\Contracts\ProducerMessage;
-final class MessagePublished
+final readonly class MessagePublished
{
public function __construct(
- public readonly ProducerMessage $message,
+ public ProducerMessage $message,
) {}
public function getMessageIdentifier(): string
diff --git a/src/Events/PublishingMessage.php b/src/Events/PublishingMessage.php
index b89ee37d..df348d78 100644
--- a/src/Events/PublishingMessage.php
+++ b/src/Events/PublishingMessage.php
@@ -4,10 +4,10 @@
use Junges\Kafka\Contracts\ProducerMessage;
-final class PublishingMessage
+final readonly class PublishingMessage
{
public function __construct(
- public readonly ProducerMessage $message,
+ public ProducerMessage $message,
) {}
public function getMessageIdentifier(): string
diff --git a/src/Exceptions/SchemaRegistryException.php b/src/Exceptions/SchemaRegistryException.php
index 2b0d2798..366b88cc 100644
--- a/src/Exceptions/SchemaRegistryException.php
+++ b/src/Exceptions/SchemaRegistryException.php
@@ -4,5 +4,5 @@
class SchemaRegistryException extends LaravelKafkaException
{
- final public const SCHEMA_MAPPING_NOT_FOUND = 'There is no schema mapping topic: %s, type: %s';
+ final public const string SCHEMA_MAPPING_NOT_FOUND = 'There is no schema mapping topic: %s, type: %s';
}
diff --git a/src/Exceptions/Serializers/AvroSerializerException.php b/src/Exceptions/Serializers/AvroSerializerException.php
index 92d86a49..15f7c5bb 100644
--- a/src/Exceptions/Serializers/AvroSerializerException.php
+++ b/src/Exceptions/Serializers/AvroSerializerException.php
@@ -6,7 +6,7 @@
class AvroSerializerException extends LaravelKafkaException
{
- final public const NO_SCHEMA_FOR_TOPIC_MESSAGE = 'There is no %s avro schema defined for the topic %s';
+ final public const string NO_SCHEMA_FOR_TOPIC_MESSAGE = 'There is no %s avro schema defined for the topic %s';
- final public const UNABLE_TO_LOAD_DEFINITION_MESSAGE = 'Was unable to load definition for schema %s';
+ final public const string UNABLE_TO_LOAD_DEFINITION_MESSAGE = 'Was unable to load definition for schema %s';
}
diff --git a/src/Exceptions/Transactions/TransactionFatalErrorException.php b/src/Exceptions/Transactions/TransactionFatalErrorException.php
index 08bb7e79..b3430597 100644
--- a/src/Exceptions/Transactions/TransactionFatalErrorException.php
+++ b/src/Exceptions/Transactions/TransactionFatalErrorException.php
@@ -7,7 +7,7 @@
final class TransactionFatalErrorException extends LaravelKafkaException
{
- private const FATAL_EXCEPTION_MESSAGE = 'Transaction failed with a fatal error. You must create a new producer as this one can not be used anymore. [%s]';
+ private const string FATAL_EXCEPTION_MESSAGE = 'Transaction failed with a fatal error. You must create a new producer as this one can not be used anymore. [%s]';
public static function new(KafkaErrorException $baseException): self
{
diff --git a/src/Exceptions/Transactions/TransactionShouldBeAbortedException.php b/src/Exceptions/Transactions/TransactionShouldBeAbortedException.php
index f03cbdcb..944b8562 100644
--- a/src/Exceptions/Transactions/TransactionShouldBeAbortedException.php
+++ b/src/Exceptions/Transactions/TransactionShouldBeAbortedException.php
@@ -7,7 +7,7 @@
final class TransactionShouldBeAbortedException extends LaravelKafkaException
{
- private const ABORTABLE_EXCEPTION_MESSAGE = 'Transaction failed. You must abort your current transaction and start a new one. [%s]';
+ private const string ABORTABLE_EXCEPTION_MESSAGE = 'Transaction failed. You must abort your current transaction and start a new one. [%s]';
public static function new(KafkaErrorException $baseException): self
{
diff --git a/src/Exceptions/Transactions/TransactionShouldBeRetriedException.php b/src/Exceptions/Transactions/TransactionShouldBeRetriedException.php
index fd780236..51622582 100644
--- a/src/Exceptions/Transactions/TransactionShouldBeRetriedException.php
+++ b/src/Exceptions/Transactions/TransactionShouldBeRetriedException.php
@@ -7,7 +7,7 @@
final class TransactionShouldBeRetriedException extends LaravelKafkaException
{
- private const RETRIABLE_EXCEPTION_MESSAGE = 'This transaction failed, but can be retried. [%s]';
+ private const string RETRIABLE_EXCEPTION_MESSAGE = 'This transaction failed, but can be retried. [%s]';
public static function new(KafkaErrorException $baseException): self
{
diff --git a/src/Facades/Kafka.php b/src/Facades/Kafka.php
index 032aaa69..7eb9a7f6 100644
--- a/src/Facades/Kafka.php
+++ b/src/Facades/Kafka.php
@@ -8,7 +8,7 @@
/**
* @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null)
- * @method static \Junges\Kafka\Contracts\MessageProducer asyncPublish(string $broker = null)
+ * @method static \Junges\Kafka\Contracts\MessageProducer publishSync(string $broker = null)
* @method static \Junges\Kafka\Factory fresh(string $broker = null)
* @method static \Junges\Kafka\Consumers\Builder consumer(array $topics = [], string $groupId = null, string $brokers = null)
* @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null)
diff --git a/src/Factory.php b/src/Factory.php
index d7eedebe..ac0b7e3f 100644
--- a/src/Factory.php
+++ b/src/Factory.php
@@ -28,44 +28,33 @@ public function publish(?string $broker = null): MessageProducer
return Kafka::fake()->publish($broker);
}
+ if ($this->builder instanceof ProducerBuilder) {
+ return $this->builder;
+ }
+
return new ProducerBuilder(
- broker: $broker ?? config('kafka.brokers')
+ broker: $broker ?? config('kafka.brokers'),
+ asyncProducer: true,
);
}
- /** Returns a fresh factory instance. */
- public function fresh(): self
- {
- return new self;
- }
-
- /**
- * Creates a new ProducerBuilder instance, optionally setting the brokers.
- * The producer will be flushed only when the application terminates,
- * and doing SEND does not mean that the message was flushed!
- */
- public function asyncPublish(?string $broker = null): MessageProducer
+ /** Creates a synchronous ProducerBuilder instance for immediate message flushing. */
+ public function publishSync(?string $broker = null): MessageProducer
{
if ($this->shouldFake) {
return Kafka::fake()->publish($broker);
}
- if ($this->builder instanceof ProducerBuilder) {
- return $this->builder;
- }
-
- $this->builder = new ProducerBuilder(
+ return new ProducerBuilder(
broker: $broker ?? config('kafka.brokers'),
- asyncProducer: true
+ asyncProducer: false
);
-
- return $this->builder;
}
- /** This is an alias for the asyncPublish method. */
- public function publishAsync(?string $broker = null): MessageProducer
+ /** Returns a fresh factory instance. */
+ public function fresh(): self
{
- return $this->asyncPublish($broker);
+ return new self;
}
/** Return a ConsumerBuilder instance. */
diff --git a/src/Message/Message.php b/src/Message/Message.php
index ca2c71eb..0c431b9c 100644
--- a/src/Message/Message.php
+++ b/src/Message/Message.php
@@ -8,6 +8,7 @@
use JetBrains\PhpStorm\Pure;
use Junges\Kafka\AbstractMessage;
use Junges\Kafka\Contracts\ProducerMessage;
+use Override;
class Message extends AbstractMessage implements Arrayable, ProducerMessage
{
@@ -81,6 +82,7 @@ public function withHeader(string $key, string|int|float $value): ProducerMessag
return $this;
}
+ #[Override]
public function getHeaders(): ?array
{
// Here we insert an uuid to be used to uniquely identify this message. If the
diff --git a/src/Providers/LaravelKafkaServiceProvider.php b/src/Providers/LaravelKafkaServiceProvider.php
index 08c08738..4a782af8 100644
--- a/src/Providers/LaravelKafkaServiceProvider.php
+++ b/src/Providers/LaravelKafkaServiceProvider.php
@@ -17,6 +17,7 @@
use Junges\Kafka\Message\Deserializers\JsonDeserializer;
use Junges\Kafka\Message\Message;
use Junges\Kafka\Message\Serializers\JsonSerializer;
+use Override;
class LaravelKafkaServiceProvider extends ServiceProvider
{
@@ -32,6 +33,7 @@ public function boot(): void
}
}
+ #[Override]
public function register(): void
{
$this->app->bind(MessageSerializer::class, fn () => new JsonSerializer);
diff --git a/src/Support/InfiniteTimer.php b/src/Support/InfiniteTimer.php
index 8dd75281..849b2e0f 100644
--- a/src/Support/InfiniteTimer.php
+++ b/src/Support/InfiniteTimer.php
@@ -2,8 +2,11 @@
namespace Junges\Kafka\Support;
+use Override;
+
class InfiniteTimer extends Timer
{
+ #[Override]
public function isTimedOut(): bool
{
return false;
diff --git a/src/Support/Testing/Fakes/BuilderFake.php b/src/Support/Testing/Fakes/BuilderFake.php
index 65b79d29..dc202bb2 100644
--- a/src/Support/Testing/Fakes/BuilderFake.php
+++ b/src/Support/Testing/Fakes/BuilderFake.php
@@ -7,6 +7,7 @@
use Junges\Kafka\Consumers\CallableConsumer;
use Junges\Kafka\Contracts\ConsumerBuilder as ConsumerBuilderContract;
use Junges\Kafka\Contracts\MessageConsumer;
+use Override;
class BuilderFake extends Builder implements ConsumerBuilderContract
{
@@ -14,6 +15,7 @@ class BuilderFake extends Builder implements ConsumerBuilderContract
private array $messages = [];
/** {@inheritDoc} */
+ #[Override]
public static function create(?string $brokers, array $topics = [], ?string $groupId = null): self
{
return new self(
@@ -32,6 +34,7 @@ public function setMessages(array $messages): self
}
/** Build the Kafka consumer. */
+ #[Override]
public function build(): MessageConsumer
{
$config = new Config(
diff --git a/src/Support/Testing/Fakes/KafkaFake.php b/src/Support/Testing/Fakes/KafkaFake.php
index e44289ae..faa4550f 100644
--- a/src/Support/Testing/Fakes/KafkaFake.php
+++ b/src/Support/Testing/Fakes/KafkaFake.php
@@ -46,7 +46,7 @@ public function publish(?string $broker = null): ProducerBuilderFake
return $this->makeProducerBuilderFake($broker);
}
- public function asyncPublish(?string $broker = null): ProducerBuilderFake
+ public function publishSync(?string $broker = null): ProducerBuilderFake
{
return $this->publish($broker);
}
diff --git a/tests/Commit/KafkaCommitterTest.php b/tests/Commit/KafkaCommitterTest.php
index 43b162a4..9f00765e 100644
--- a/tests/Commit/KafkaCommitterTest.php
+++ b/tests/Commit/KafkaCommitterTest.php
@@ -24,9 +24,7 @@ public function it_can_commit(): void
->shouldReceive('commit')->once()
->andReturnSelf();
- $this->app->bind(KafkaConsumer::class, function () use ($kafkaConsumer) {
- return $kafkaConsumer->getMock();
- });
+ $this->app->bind(KafkaConsumer::class, fn () => $kafkaConsumer->getMock());
$config = new Config(
broker: 'broker',
@@ -54,9 +52,7 @@ public function it_can_commit_to_dlq(): void
->shouldReceive('commit')->once()
->andReturnSelf();
- $this->app->bind(KafkaConsumer::class, function () use ($kafkaConsumer) {
- return $kafkaConsumer->getMock();
- });
+ $this->app->bind(KafkaConsumer::class, fn () => $kafkaConsumer->getMock());
$config = new Config(
broker: 'broker',
diff --git a/tests/Commit/RetryableCommitterTest.php b/tests/Commit/RetryableCommitterTest.php
index 090c255a..dce1bbe1 100644
--- a/tests/Commit/RetryableCommitterTest.php
+++ b/tests/Commit/RetryableCommitterTest.php
@@ -68,7 +68,7 @@ public function it_should_progressively_wait_for_the_next_retry(): void
try {
$retryableCommitter->commitMessage(new Message, true);
- } catch (RdKafkaException $exception) {
+ } catch (RdKafkaException) {
}
$expectedSleeps = [1e6, 2e6, 4e6, 8e6, 16e6, 32e6];
diff --git a/tests/Commit/SeekToCurrentErrorCommitterTest.php b/tests/Commit/SeekToCurrentErrorCommitterTest.php
index 2341a92c..cbb79529 100644
--- a/tests/Commit/SeekToCurrentErrorCommitterTest.php
+++ b/tests/Commit/SeekToCurrentErrorCommitterTest.php
@@ -60,6 +60,6 @@ public function it_passes_dlq_commits(): void
$seekToCurrentErrorCommitter = new SeekToCurrentErrorCommitter($mockedKafkaConsumer, $mockedCommitter);
- $seekToCurrentErrorCommitter->commitDlq(new Message, true);
+ $seekToCurrentErrorCommitter->commitDlq(new Message);
}
}
diff --git a/tests/Console/Consumers/OptionsTest.php b/tests/Console/Consumers/OptionsTest.php
index 87437c5d..9ffa0e86 100644
--- a/tests/Console/Consumers/OptionsTest.php
+++ b/tests/Console/Consumers/OptionsTest.php
@@ -5,12 +5,14 @@
use Junges\Kafka\Console\Commands\KafkaConsumer\Options;
use Junges\Kafka\Tests\Fakes\FakeHandler;
use Junges\Kafka\Tests\LaravelKafkaTestCase;
+use Override;
use PHPUnit\Framework\Attributes\Test;
class OptionsTest extends LaravelKafkaTestCase
{
private array $config;
+ #[Override]
protected function setUp(): void
{
parent::setUp();
diff --git a/tests/Consumers/ConsumerTest.php b/tests/Consumers/ConsumerTest.php
index 62ec9a40..d089d51a 100644
--- a/tests/Consumers/ConsumerTest.php
+++ b/tests/Consumers/ConsumerTest.php
@@ -2,13 +2,11 @@
namespace Junges\Kafka\Tests\Consumers;
-use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Event;
use Junges\Kafka\Commit\VoidCommitter;
use Junges\Kafka\Config\Config;
use Junges\Kafka\Consumers\CallableConsumer;
use Junges\Kafka\Consumers\Consumer;
-use Junges\Kafka\Consumers\DispatchQueuedHandler;
use Junges\Kafka\Contracts\CommitterFactory;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\Handler;
@@ -105,34 +103,6 @@ public function it_can_consume_messages(): void
Event::assertDispatched(MessageConsumed::class, fn (MessageConsumed $e) => $e->message === $fakeConsumer->getMessage());
}
- #[Test]
- public function it_can_consume_messages_with_queueable_handlers(): void
- {
- Bus::fake();
- $message = new Message;
- $message->err = 0;
- $message->key = 'key';
- $message->topic_name = 'test';
- $message->payload = '{"body": "message payload"}';
- $message->headers = [];
- $message->partition = 1;
- $message->offset = 0;
-
- $this->mockConsumerWithMessage($message);
-
- $this->mockProducer();
-
- $consumer = Kafka::consumer(['test'])
- ->withHandler($fakeConsumer = new SimpleQueueableHandler)
- ->withAutoCommit()
- ->withMaxMessages(1)
- ->build();
-
- $consumer->consume();
-
- Bus::assertDispatched(DispatchQueuedHandler::class);
- }
-
#[Test]
public function consume_message_with_error(): void
{
@@ -589,7 +559,7 @@ public function commitAsync(mixed $messageOrOffsets = null): void
}
};
- $customCommitterFactory = new class($customCommitter) implements CommitterFactory
+ $customCommitterFactory = new readonly class($customCommitter) implements CommitterFactory
{
public function __construct(private \Junges\Kafka\Contracts\Committer $committer) {}
@@ -731,8 +701,6 @@ private function mockConsumerWithMessageAndPartitions(Message $message, array $p
->andReturn($partitions)
->getMock();
- $this->app->bind(KafkaConsumer::class, function () use ($mockedKafkaConsumer) {
- return $mockedKafkaConsumer;
- });
+ $this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
}
}
diff --git a/tests/Consumers/ManualCommitTest.php b/tests/Consumers/ManualCommitTest.php
index 5ff2c89a..d7ecf934 100644
--- a/tests/Consumers/ManualCommitTest.php
+++ b/tests/Consumers/ManualCommitTest.php
@@ -440,7 +440,7 @@ public function it_handles_commit_errors_gracefully(): void
function (ConsumerMessage $message, Consumer $consumer) use (&$exceptionThrown) {
try {
$consumer->commit($message);
- } catch (Throwable $e) {
+ } catch (Throwable) {
$exceptionThrown = true;
}
},
@@ -495,7 +495,7 @@ public function it_ignores_no_offset_commit_errors(): void
function (ConsumerMessage $message, Consumer $consumer) use (&$noExceptionThrown) {
try {
$consumer->commit($message);
- } catch (\RdKafka\Exception $e) {
+ } catch (\RdKafka\Exception) {
$noExceptionThrown = false;
}
},
diff --git a/tests/Consumers/SimpleQueueableHandler.php b/tests/Consumers/SimpleQueueableHandler.php
deleted file mode 100644
index 9e01307c..00000000
--- a/tests/Consumers/SimpleQueueableHandler.php
+++ /dev/null
@@ -1,16 +0,0 @@
-failure = $failure;
- $this->timesToFail = $timesToFail;
- }
+ public function __construct(private readonly Exception $failure, private readonly int $timesToFail) {}
/**
* @throws Exception
diff --git a/tests/KafkaFakeTest.php b/tests/KafkaFakeTest.php
index a680e86c..411b20bc 100644
--- a/tests/KafkaFakeTest.php
+++ b/tests/KafkaFakeTest.php
@@ -11,6 +11,7 @@
use Junges\Kafka\Message\ConsumedMessage;
use Junges\Kafka\Message\Message;
use Junges\Kafka\Support\Testing\Fakes\KafkaFake;
+use Override;
use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\Constraint\ExceptionMessageIsOrContains;
use PHPUnit\Framework\ExpectationFailedException;
@@ -21,6 +22,7 @@ final class KafkaFakeTest extends LaravelKafkaTestCase
private MessageConsumer $consumer;
+ #[Override]
protected function setUp(): void
{
parent::setUp();
@@ -58,7 +60,7 @@ public function it_stores_multiple_messages(): void
public function it_stores_multiple_messages_when_publishing_async(): void
{
for ($i = 0; $i < 3; $i++) {
- $this->fake->asyncPublish()
+ $this->fake->publish()
->onTopic('topic')
->withBody('test')
->send();
@@ -129,18 +131,12 @@ public function it_can_perform_assertions_on_published_messages(): void
$this->fake->assertPublished($producer->getMessage());
- $this->fake->assertPublished($producer->getMessage(), function ($message) use ($uuid) {
- return $message->getKey() === $uuid;
- });
+ $this->fake->assertPublished($producer->getMessage(), fn ($message) => $message->getKey() === $uuid);
- $this->fake->assertPublished($message = $producer->getMessage(), function () use ($message, $uuid) {
- return $message->getKey() === $uuid;
- });
+ $this->fake->assertPublished($message = $producer->getMessage(), fn () => $message->getKey() === $uuid);
try {
- $this->fake->assertPublished($message = $producer->getMessage(), function () use ($message) {
- return $message->getKey() === 'not-published-uuid';
- });
+ $this->fake->assertPublished($message = $producer->getMessage(), fn () => $message->getKey() === 'not-published-uuid');
} catch (ExpectationFailedException $exception) {
$this->assertThat($exception, new ExceptionMessageIsOrContains('The expected message was not published.'));
}
@@ -226,16 +222,12 @@ public function i_can_perform_assertions_using_assert_published_on(): void
$this->fake->assertPublishedOn('topic', $producer->getMessage());
try {
- $this->fake->assertPublishedOn('topic', $producer->getMessage(), function ($message) {
- return $message->getKey() === 'different-key';
- });
+ $this->fake->assertPublishedOn('topic', $producer->getMessage(), fn ($message) => $message->getKey() === 'different-key');
} catch (ExpectationFailedException $exception) {
$this->assertThat($exception, new ExceptionMessageIsOrContains('The expected message was not published.'));
}
- $this->fake->assertPublishedOn('topic', $producer->getMessage(), function ($message) use ($uuid) {
- return $message->getKey() === $uuid;
- });
+ $this->fake->assertPublishedOn('topic', $producer->getMessage(), fn ($message) => $message->getKey() === $uuid);
}
#[Test]
diff --git a/tests/KafkaTest.php b/tests/KafkaTest.php
index f2154e5e..1cfb7d0b 100644
--- a/tests/KafkaTest.php
+++ b/tests/KafkaTest.php
@@ -56,6 +56,57 @@ public function it_can_publish_messages_to_kafka(): void
$this->assertTrue($test);
}
+ #[Test]
+ public function it_can_publish_messages_synchronously(): void
+ {
+ Event::fake();
+
+ $mockedProducerTopic = m::mock(ProducerTopic::class)
+ ->shouldReceive('producev')->twice()
+ ->andReturn(m::self())
+ ->getMock();
+
+ $mockedProducer = m::mock(Producer::class)
+ ->shouldReceive('newTopic')->with('test')->twice()->andReturn($mockedProducerTopic)
+ ->shouldReceive('poll')->twice()
+ ->shouldReceive('flush')->twice()
+ ->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
+ ->getMock();
+
+ $this->app->bind(Producer::class, fn () => $mockedProducer);
+
+ $test1 = Kafka::publishSync()
+ ->onTopic('test')
+ ->withConfigOptions([
+ 'metadata.broker.list' => 'broker',
+ ])
+ ->withKafkaKey(Str::uuid()->toString())
+ ->withBodyKey('test', ['test'])
+ ->withHeaders(['custom' => 'header'])
+ ->withDebugEnabled()
+ ->send();
+
+ $test2 = Kafka::publishSync()
+ ->onTopic('test')
+ ->withConfigOptions([
+ 'metadata.broker.list' => 'broker',
+ ])
+ ->withKafkaKey(Str::uuid()->toString())
+ ->withBodyKey('test', ['test'])
+ ->withHeaders(['custom' => 'header'])
+ ->withDebugEnabled()
+ ->send();
+
+ Event::assertDispatched(MessagePublished::class);
+
+ $this->assertTrue($test1);
+ $this->assertTrue($test2);
+
+ Kafka::clearResolvedInstances();
+
+ Event::assertDispatched(MessagePublished::class);
+ }
+
#[Test]
public function it_can_publish_messages_asynchronously(): void
{
@@ -69,13 +120,13 @@ public function it_can_publish_messages_asynchronously(): void
$mockedProducer = m::mock(Producer::class)
->shouldReceive('newTopic')->with('test')->twice()->andReturn($mockedProducerTopic)
->shouldReceive('poll')->twice()
- ->shouldReceive('flush')->once()
+ ->shouldReceive('flush')->atLeast()->once()
->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
->getMock();
$this->app->bind(Producer::class, fn () => $mockedProducer);
- $test1 = Kafka::asyncPublish()
+ $test1 = Kafka::publish()
->onTopic('test')
->withConfigOptions([
'metadata.broker.list' => 'broker',
@@ -86,7 +137,7 @@ public function it_can_publish_messages_asynchronously(): void
->withDebugEnabled()
->send();
- $test2 = Kafka::asyncPublish()
+ $test2 = Kafka::publish()
->onTopic('test')
->withConfigOptions([
'metadata.broker.list' => 'broker',
@@ -188,9 +239,7 @@ public function i_can_set_the_entire_message_with_message_object(): void
->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
->getMock();
- $this->app->bind(Producer::class, function () use ($mockedProducer) {
- return $mockedProducer;
- });
+ $this->app->bind(Producer::class, fn () => $mockedProducer);
$message = Message::create()
->withHeaders(['foo' => 'bar'])
@@ -241,9 +290,7 @@ public function i_can_disable_debug_using_with_debug_disabled_method(): void
->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
->getMock();
- $this->app->bind(Producer::class, function () use ($mockedProducer) {
- return $mockedProducer;
- });
+ $this->app->bind(Producer::class, fn () => $mockedProducer);
/** @var ProducerBuilder $producer */
$producer = Kafka::publish()
@@ -332,11 +379,9 @@ public function producer_throws_exception_if_message_could_not_be_published(): v
Kafka::publish()->onTopic('test')->withBodyKey('foo', 'bar')->send();
- Event::assertDispatched(CouldNotPublishMessageEvent::class, function (CouldNotPublishMessageEvent $event) use ($expectedMessage) {
- return $event->throwable instanceof CouldNotPublishMessage
- && $event->errorCode === RD_KAFKA_RESP_ERR__FAIL
- && $event->message === $expectedMessage;
- });
+ Event::assertDispatched(CouldNotPublishMessageEvent::class, fn (CouldNotPublishMessageEvent $event) => $event->throwable instanceof CouldNotPublishMessage
+ && $event->errorCode === RD_KAFKA_RESP_ERR__FAIL
+ && $event->message === $expectedMessage);
}
#[Test]
@@ -344,13 +389,11 @@ public function macro(): void
{
$sasl = new Sasl(username: 'username', password: 'password', mechanisms: 'mechanisms');
- Kafka::macro('defaultProducer', function () {
- return $this->publish()->withSasl(
- username: 'username',
- password: 'password',
- mechanisms: 'mechanisms',
- );
- });
+ Kafka::macro('defaultProducer', fn () => $this->publish()->withSasl(
+ username: 'username',
+ password: 'password',
+ mechanisms: 'mechanisms',
+ ));
$producer = Kafka::defaultProducer();
@@ -367,9 +410,7 @@ public function it_stores_published_messages_when_using_macros(): void
->onTopic('topic')
->withKey($uuid = Str::uuid()->toString());
- Kafka::macro('testProducer', function () use ($expectedMessage) {
- return $this->publish()->withMessage($expectedMessage);
- });
+ Kafka::macro('testProducer', fn () => $this->publish()->withMessage($expectedMessage));
Kafka::fake();
Kafka::testProducer()->send();
diff --git a/tests/LaravelKafkaTestCase.php b/tests/LaravelKafkaTestCase.php
index c70d0eb8..9fbcadd8 100644
--- a/tests/LaravelKafkaTestCase.php
+++ b/tests/LaravelKafkaTestCase.php
@@ -8,6 +8,7 @@
use Junges\Kafka\Providers\LaravelKafkaServiceProvider;
use Mockery as m;
use Orchestra\Testbench\TestCase as Orchestra;
+use Override;
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
@@ -16,6 +17,7 @@
abstract class LaravelKafkaTestCase extends Orchestra
{
+ #[Override]
protected function setUp(): void
{
parent::setUp();
@@ -57,9 +59,7 @@ protected function mockProducer(): void
->shouldReceive('produce')
->andReturn();
- $this->app->bind(Producer::class, function () use ($mockedProducer) {
- return $mockedProducer->getMock();
- });
+ $this->app->bind(Producer::class, fn () => $mockedProducer->getMock());
$this->mockKafkaProducer();
}
@@ -82,9 +82,7 @@ protected function mockKafkaProducer(): void
->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
->getMock();
- $this->app->bind(KafkaProducer::class, function () use ($mockedKafkaProducer) {
- return $mockedKafkaProducer;
- });
+ $this->app->bind(KafkaProducer::class, fn () => $mockedKafkaProducer);
}
protected function mockConsumerWithMessageFailingCommit(Message $message): void
@@ -99,9 +97,7 @@ protected function mockConsumerWithMessageFailingCommit(Message $message): void
->never()
->getMock();
- $this->app->bind(KafkaConsumer::class, function () use ($mockedKafkaConsumer) {
- return $mockedKafkaConsumer;
- });
+ $this->app->bind(KafkaConsumer::class, fn () => $mockedKafkaConsumer);
}
protected function mockConsumerWithMessage(Message ...$message): void
@@ -125,7 +121,6 @@ protected function getPropertyWithReflection(string $property, object $object):
{
$reflection = new ReflectionClass($object);
$reflectionProperty = $reflection->getProperty($property);
- $reflectionProperty->setAccessible(true);
return $reflectionProperty->getValue($object);
}
diff --git a/tests/Message/MessageTest.php b/tests/Message/MessageTest.php
index bed221d1..0f20db35 100644
--- a/tests/Message/MessageTest.php
+++ b/tests/Message/MessageTest.php
@@ -5,12 +5,14 @@
use Illuminate\Support\Str;
use Junges\Kafka\Message\Message;
use Junges\Kafka\Tests\LaravelKafkaTestCase;
+use Override;
use PHPUnit\Framework\Attributes\Test;
final class MessageTest extends LaravelKafkaTestCase
{
private Message $message;
+ #[Override]
protected function setUp(): void
{
parent::setUp();
diff --git a/tests/Message/Registry/AvroSchemaRegistryTest.php b/tests/Message/Registry/AvroSchemaRegistryTest.php
index b5ba113e..b2f72c26 100644
--- a/tests/Message/Registry/AvroSchemaRegistryTest.php
+++ b/tests/Message/Registry/AvroSchemaRegistryTest.php
@@ -25,7 +25,6 @@ public function add_body_schema_mapping_for_topic(): void
$registry->addBodySchemaMappingForTopic('test', $schema);
$reflectionProperty = new ReflectionProperty($registry, 'schemaMapping');
- $reflectionProperty->setAccessible(true);
$schemaMapping = $reflectionProperty->getValue($registry);
@@ -46,7 +45,6 @@ public function add_key_schema_mapping_for_topic(): void
$registry->addKeySchemaMappingForTopic('test2', $schema);
$reflectionProperty = new ReflectionProperty($registry, 'schemaMapping');
- $reflectionProperty->setAccessible(true);
$schemaMapping = $reflectionProperty->getValue($registry);