@@ -219,21 +219,22 @@ class R2dbcPostgresPubSub(
219219 }
220220 }
221221
222- private fun listenChannels (channels : Set <String >): Mono <Void > = { connectionRef.get() }.toMono()
223- .flatMap { connection ->
224- channels.toFlux()
225- .concatMap { channel ->
226- connection.createStatement(" LISTEN $channel " ).execute()
227- }
228- .then()
229- }
230- .doOnError { e ->
231- logger.error(
232- " Error during channel listen queries execution for channels: " +
233- channels.joinToString(" , " ),
234- e
235- )
236- }
222+ private fun listenChannels (channels : Set <String >): Mono <Void > =
223+ { connectionRef.get() }.toMono()
224+ .flatMap { connection ->
225+ channels.toFlux()
226+ .concatMap { channel ->
227+ connection.createStatement(" LISTEN $channel " ).execute()
228+ }
229+ .then()
230+ }
231+ .doOnError { e ->
232+ logger.error(
233+ " Error during channel listen queries execution for channels: " +
234+ channels.joinToString(" , " ),
235+ e
236+ )
237+ }
237238
238239 private fun notifyChannels (requests : Collection <NotificationRequest >): Mono <Void > =
239240 getConnection().flatMap { connection ->
@@ -263,8 +264,9 @@ class R2dbcPostgresPubSub(
263264 }
264265 .doOnError { e -> logger.error(" ${requests.size} notification has not been sent" , e) }
265266
266- private fun getConnection (): Mono <PostgresqlConnection > = { connectionRef.get() }.toMono()
267- .switchIfEmpty(NoActiveConnectionException ().toMono())
267+ private fun getConnection (): Mono <PostgresqlConnection > =
268+ { connectionRef.get() }.toMono()
269+ .switchIfEmpty(NoActiveConnectionException ().toMono())
268270
269271 // remove this reflection hack as soon as R2DBC connection interface explicitly provides process id
270272 private fun extractProcessId (connection : PostgresqlConnection ): Int {
0 commit comments