@@ -2,88 +2,145 @@ package io.github.jan.supabase.realtime
22
33import io.github.jan.supabase.SupabaseSerializer
44import io.github.jan.supabase.annotations.SupabaseInternal
5- import io.github.jan.supabase.collections.AtomicMutableList
65import io.github.jan.supabase.serializer.KotlinXSerializer
6+ import kotlinx.collections.immutable.PersistentList
7+ import kotlinx.collections.immutable.PersistentMap
8+ import kotlinx.collections.immutable.persistentHashMapOf
9+ import kotlinx.collections.immutable.persistentListOf
10+ import kotlinx.collections.immutable.plus
711import kotlinx.serialization.json.JsonObject
812import kotlin.concurrent.atomics.AtomicInt
913import kotlin.concurrent.atomics.AtomicReference
1014import kotlin.concurrent.atomics.fetchAndIncrement
15+ import kotlin.concurrent.atomics.update
1116
1217@SupabaseInternal
13- sealed interface CallbackManager {
18+ sealed class RealtimeCallbackId (val id : Int ) {
19+
20+ class Postgres (id : Int ) : RealtimeCallbackId(id)
21+
22+ class Presence (id : Int ) : RealtimeCallbackId(id)
23+
24+ class Broadcast (id : Int ) : RealtimeCallbackId(id)
25+
26+ }
27+
28+ @SupabaseInternal
29+ interface CallbackManager {
1430
1531 fun triggerPostgresChange (ids : List <Int >, data : PostgresAction )
1632
1733 fun triggerBroadcast (event : String , data : JsonObject )
1834
1935 fun triggerPresenceDiff (joins : Map <String , Presence >, leaves : Map <String , Presence >)
2036
21- fun addBroadcastCallback ( event : String , callback : ( JsonObject ) -> Unit ): Int
37+ fun hasPresenceCallback (): Boolean
2238
23- fun addPostgresCallback ( filter : PostgresJoinConfig , callback : (PostgresAction ) -> Unit ): Int
39+ fun addBroadcastCallback ( event : String , callback : (JsonObject ) -> Unit ): RealtimeCallbackId . Broadcast
2440
25- fun addPresenceCallback ( callback : (PresenceAction ) -> Unit ): Int
41+ fun addPostgresCallback ( filter : PostgresJoinConfig , callback : (PostgresAction ) -> Unit ): RealtimeCallbackId . Postgres
2642
27- fun removeCallbackById ( id : Int )
43+ fun addPresenceCallback ( callback : ( PresenceAction ) -> Unit ): RealtimeCallbackId . Presence
2844
29- fun setServerChanges ( changes : List < PostgresJoinConfig > )
45+ fun removeCallbackById ( id : RealtimeCallbackId )
3046
31- fun getCallbacks () : List <RealtimeCallback < * >>
47+ fun setServerChanges ( changes : List <PostgresJoinConfig >)
3248
3349}
3450
35- internal class CallbackManagerImpl (
51+ typealias BroadcastMap = PersistentMap <String , PersistentList <RealtimeCallback .BroadcastCallback >>
52+
53+
54+ class CallbackManagerImpl (
3655 private val serializer : SupabaseSerializer = KotlinXSerializer ()
3756) : CallbackManager {
3857
3958 private val nextId = AtomicInt (0 )
4059 private val _serverChanges = AtomicReference (listOf<PostgresJoinConfig >())
4160 val serverChanges: List <PostgresJoinConfig > get() = _serverChanges .load()
42- private val callbacks = AtomicMutableList <RealtimeCallback <* >>()
4361
44- override fun getCallbacks (): List <RealtimeCallback <* >> {
45- return callbacks.toList()
46- }
62+ private val presenceCallbacks = AtomicReference <PersistentMap <Int , RealtimeCallback .PresenceCallback >>(persistentHashMapOf())
4763
48- override fun addBroadcastCallback (event : String , callback : (JsonObject ) -> Unit ): Int {
64+ private val broadcastCallbacks = AtomicReference <BroadcastMap >(persistentHashMapOf())
65+ private val broadcastEventId = AtomicReference <PersistentMap <Int , String >>(persistentHashMapOf())
66+
67+ private val postgresCallbacks = AtomicReference <PersistentMap <Int , RealtimeCallback .PostgresCallback >>(persistentHashMapOf())
68+
69+ override fun addBroadcastCallback (event : String , callback : (JsonObject ) -> Unit ): RealtimeCallbackId .Broadcast {
4970 val id = nextId.fetchAndIncrement()
50- callbacks + = RealtimeCallback .BroadcastCallback (callback, event, id)
51- return id
71+ broadcastCallbacks.update {
72+ val current = it[event] ? : persistentListOf()
73+ it.put(event, current + RealtimeCallback .BroadcastCallback (callback, event, id))
74+ }
75+ broadcastEventId.update {
76+ it.put(id, event)
77+ }
78+ return RealtimeCallbackId .Broadcast (id)
5279 }
5380
54- override fun addPostgresCallback (filter : PostgresJoinConfig , callback : (PostgresAction ) -> Unit ): Int {
81+ override fun addPostgresCallback (filter : PostgresJoinConfig , callback : (PostgresAction ) -> Unit ): RealtimeCallbackId . Postgres {
5582 val id = nextId.fetchAndIncrement()
56- callbacks + = RealtimeCallback .PostgresCallback (callback, filter, id)
57- return id
83+ postgresCallbacks.update {
84+ it.put(id, RealtimeCallback .PostgresCallback (callback, filter, id))
85+ }
86+ return RealtimeCallbackId .Postgres (id)
5887 }
5988
6089 override fun triggerPostgresChange (ids : List <Int >, data : PostgresAction ) {
6190 val filter = serverChanges.filter { it.id in ids }
62- val postgresCallbacks = callbacks.filterIsInstance<RealtimeCallback .PostgresCallback >()
6391 val callbacks =
64- postgresCallbacks.filter { cc -> filter.any { sc -> cc.filter == sc } }
92+ postgresCallbacks.load().values. filter { cc -> filter.any { sc -> cc.filter == sc } }
6593 callbacks.forEach { it.callback(data) }
6694 }
6795
6896 override fun triggerBroadcast (event : String , data : JsonObject ) {
69- val broadcastCallbacks = callbacks.filterIsInstance<RealtimeCallback .BroadcastCallback >()
70- val callbacks = broadcastCallbacks.filter { it.event == event }
71- callbacks.forEach { it.callback(data) }
97+ broadcastCallbacks.load()[event]?.forEach { it.callback(data) }
7298 }
7399
74100 override fun triggerPresenceDiff (joins : Map <String , Presence >, leaves : Map <String , Presence >) {
75- val presenceCallbacks = callbacks.filterIsInstance<RealtimeCallback .PresenceCallback >()
76- presenceCallbacks.forEach { it.callback(PresenceActionImpl (serializer, joins, leaves)) }
101+ presenceCallbacks.load().values.forEach { it.callback(PresenceActionImpl (serializer, joins, leaves)) }
77102 }
78103
79- override fun addPresenceCallback (callback : (PresenceAction ) -> Unit ): Int {
104+ override fun hasPresenceCallback (): Boolean {
105+ return presenceCallbacks.load().isNotEmpty()
106+ }
107+
108+ override fun addPresenceCallback (callback : (PresenceAction ) -> Unit ): RealtimeCallbackId .Presence {
80109 val id = nextId.fetchAndIncrement()
81- callbacks + = RealtimeCallback .PresenceCallback (callback, id)
82- return id
110+ presenceCallbacks.update {
111+ it.put(id, RealtimeCallback .PresenceCallback (callback, id))
112+ }
113+ return RealtimeCallbackId .Presence (id)
114+ }
115+
116+ fun removeBroadcastCallbackById (id : Int ) {
117+ val event = broadcastEventId.load()[id] ? : return
118+ broadcastCallbacks.update {
119+ it.put(event, it[event]?.removeAll { c -> c.id == id } ? : persistentListOf())
120+ }
121+ broadcastEventId.update {
122+ it.remove(id)
123+ }
124+ }
125+
126+ fun removePresenceCallbackById (id : Int ) {
127+ presenceCallbacks.update {
128+ it.remove(id)
129+ }
130+ }
131+
132+ fun removePostgresCallbackById (id : Int ) {
133+ postgresCallbacks.update {
134+ it.remove(id)
135+ }
83136 }
84137
85- override fun removeCallbackById (id : Int ) {
86- callbacks.indexOfFirst { it.id == id }.takeIf { it != - 1 }?.let { callbacks.removeAt(it) }
138+ override fun removeCallbackById (id : RealtimeCallbackId ) {
139+ when (id) {
140+ is RealtimeCallbackId .Broadcast -> removeBroadcastCallbackById(id.id)
141+ is RealtimeCallbackId .Presence -> removePresenceCallbackById(id.id)
142+ is RealtimeCallbackId .Postgres -> removePostgresCallbackById(id.id)
143+ }
87144 }
88145
89146 override fun setServerChanges (changes : List <PostgresJoinConfig >) {
0 commit comments