@@ -219,22 +219,21 @@ class R2dbcPostgresPubSub(
219219 }
220220 }
221221
222- private fun listenChannels (channels : Set <String >): Mono <Void > =
223- { connectionRef.get() }.toMono()
224- .flatMap { connection ->
225- channels.toFlux()
226- .concatMap { channel ->
227- connection.createStatement(" LISTEN $channel " ).execute()
228- }
229- .then()
230- }
231- .doOnError { e ->
232- logger.error(
233- " Error during channel listen queries execution for channels: " +
234- channels.joinToString(" , " ),
235- e
236- )
237- }
222+ private fun listenChannels (channels : Set <String >): Mono <Void > = { connectionRef.get() }.toMono()
223+ .flatMap { connection ->
224+ channels.toFlux()
225+ .concatMap { channel ->
226+ connection.createStatement(" LISTEN $channel " ).execute()
227+ }
228+ .then()
229+ }
230+ .doOnError { e ->
231+ logger.error(
232+ " Error during channel listen queries execution for channels: " +
233+ channels.joinToString(" , " ),
234+ e
235+ )
236+ }
238237
239238 private fun notifyChannels (requests : Collection <NotificationRequest >): Mono <Void > =
240239 getConnection().flatMap { connection ->
@@ -264,9 +263,8 @@ class R2dbcPostgresPubSub(
264263 }
265264 .doOnError { e -> logger.error(" ${requests.size} notification has not been sent" , e) }
266265
267- private fun getConnection (): Mono <PostgresqlConnection > =
268- { connectionRef.get() }.toMono()
269- .switchIfEmpty(NoActiveConnectionException ().toMono())
266+ private fun getConnection (): Mono <PostgresqlConnection > = { connectionRef.get() }.toMono()
267+ .switchIfEmpty(NoActiveConnectionException ().toMono())
270268
271269 // remove this reflection hack as soon as R2DBC connection interface explicitly provides process id
272270 private fun extractProcessId (connection : PostgresqlConnection ): Int {
0 commit comments