@@ -2,88 +2,147 @@ package io.github.jan.supabase.realtime
22
33import io.github.jan.supabase.SupabaseSerializer
44import io.github.jan.supabase.annotations.SupabaseInternal
5- import io.github.jan.supabase.collections.AtomicMutableList
65import io.github.jan.supabase.serializer.KotlinXSerializer
6+ import kotlinx.collections.immutable.PersistentList
7+ import kotlinx.collections.immutable.PersistentMap
8+ import kotlinx.collections.immutable.persistentHashMapOf
9+ import kotlinx.collections.immutable.persistentListOf
10+ import kotlinx.collections.immutable.plus
711import kotlinx.serialization.json.JsonObject
812import kotlin.concurrent.atomics.AtomicInt
913import kotlin.concurrent.atomics.AtomicReference
1014import kotlin.concurrent.atomics.fetchAndIncrement
15+ import kotlin.concurrent.atomics.update
1116
1217@SupabaseInternal
13- sealed interface CallbackManager {
18+ sealed class RealtimeCallbackId (val value : Int ) {
19+
20+ class Postgres (value : Int ) : RealtimeCallbackId(value)
21+
22+ class Presence (value : Int ) : RealtimeCallbackId(value)
23+
24+ class Broadcast (value : Int ) : RealtimeCallbackId(value)
25+
26+ }
27+
28+ @SupabaseInternal
29+ interface CallbackManager {
1430
1531 fun triggerPostgresChange (ids : List <Int >, data : PostgresAction )
1632
1733 fun triggerBroadcast (event : String , data : JsonObject )
1834
1935 fun triggerPresenceDiff (joins : Map <String , Presence >, leaves : Map <String , Presence >)
2036
21- fun addBroadcastCallback ( event : String , callback : ( JsonObject ) -> Unit ): Int
37+ fun hasPresenceCallback (): Boolean
2238
23- fun addPostgresCallback ( filter : PostgresJoinConfig , callback : (PostgresAction ) -> Unit ): Int
39+ fun addBroadcastCallback ( event : String , callback : (JsonObject ) -> Unit ): RealtimeCallbackId . Broadcast
2440
25- fun addPresenceCallback ( callback : (PresenceAction ) -> Unit ): Int
41+ fun addPostgresCallback ( filter : PostgresJoinConfig , callback : (PostgresAction ) -> Unit ): RealtimeCallbackId . Postgres
2642
27- fun removeCallbackById ( id : Int )
43+ fun addPresenceCallback ( callback : ( PresenceAction ) -> Unit ): RealtimeCallbackId . Presence
2844
29- fun setServerChanges ( changes : List < PostgresJoinConfig > )
45+ fun removeCallbackById ( id : RealtimeCallbackId )
3046
31- fun getCallbacks () : List <RealtimeCallback < * >>
47+ fun setServerChanges ( changes : List <PostgresJoinConfig >)
3248
3349}
3450
51+ private typealias BroadcastMap = PersistentMap <String , PersistentList <RealtimeCallback .BroadcastCallback >>
52+ private typealias PresenceMap = PersistentMap <Int , RealtimeCallback .PresenceCallback >
53+ private typealias PostgresMap = PersistentMap <Int , RealtimeCallback .PostgresCallback >
54+
3555internal class CallbackManagerImpl (
3656 private val serializer : SupabaseSerializer = KotlinXSerializer ()
3757) : CallbackManager {
3858
3959 private val nextId = AtomicInt (0 )
4060 private val _serverChanges = AtomicReference (listOf<PostgresJoinConfig >())
4161 val serverChanges: List <PostgresJoinConfig > get() = _serverChanges .load()
42- private val callbacks = AtomicMutableList <RealtimeCallback <* >>()
4362
44- override fun getCallbacks (): List <RealtimeCallback <* >> {
45- return callbacks.toList()
46- }
63+ private val presenceCallbacks = AtomicReference <PresenceMap >(persistentHashMapOf())
4764
48- override fun addBroadcastCallback (event : String , callback : (JsonObject ) -> Unit ): Int {
65+ private val broadcastCallbacks = AtomicReference <BroadcastMap >(persistentHashMapOf())
66+ // Additional map to know from which list a callback may be removed in broadcastCallbacks without searching through the whole map
67+ private val broadcastEventId = AtomicReference <PersistentMap <Int , String >>(persistentHashMapOf())
68+
69+ private val postgresCallbacks = AtomicReference <PostgresMap >(persistentHashMapOf())
70+
71+ override fun addBroadcastCallback (event : String , callback : (JsonObject ) -> Unit ): RealtimeCallbackId .Broadcast {
4972 val id = nextId.fetchAndIncrement()
50- callbacks + = RealtimeCallback .BroadcastCallback (callback, event, id)
51- return id
73+ broadcastCallbacks.update {
74+ val current = it[event] ? : persistentListOf()
75+ it.put(event, current + RealtimeCallback .BroadcastCallback (callback, event, id))
76+ }
77+ broadcastEventId.update {
78+ it.put(id, event)
79+ }
80+ return RealtimeCallbackId .Broadcast (id)
5281 }
5382
54- override fun addPostgresCallback (filter : PostgresJoinConfig , callback : (PostgresAction ) -> Unit ): Int {
83+ override fun addPostgresCallback (filter : PostgresJoinConfig , callback : (PostgresAction ) -> Unit ): RealtimeCallbackId . Postgres {
5584 val id = nextId.fetchAndIncrement()
56- callbacks + = RealtimeCallback .PostgresCallback (callback, filter, id)
57- return id
85+ postgresCallbacks.update {
86+ it.put(id, RealtimeCallback .PostgresCallback (callback, filter, id))
87+ }
88+ return RealtimeCallbackId .Postgres (id)
5889 }
5990
6091 override fun triggerPostgresChange (ids : List <Int >, data : PostgresAction ) {
6192 val filter = serverChanges.filter { it.id in ids }
62- val postgresCallbacks = callbacks.filterIsInstance<RealtimeCallback .PostgresCallback >()
6393 val callbacks =
64- postgresCallbacks.filter { cc -> filter.any { sc -> cc.filter == sc } }
94+ postgresCallbacks.load().values. filter { cc -> filter.any { sc -> cc.filter == sc } }
6595 callbacks.forEach { it.callback(data) }
6696 }
6797
6898 override fun triggerBroadcast (event : String , data : JsonObject ) {
69- val broadcastCallbacks = callbacks.filterIsInstance<RealtimeCallback .BroadcastCallback >()
70- val callbacks = broadcastCallbacks.filter { it.event == event }
71- callbacks.forEach { it.callback(data) }
99+ broadcastCallbacks.load()[event]?.forEach { it.callback(data) }
72100 }
73101
74102 override fun triggerPresenceDiff (joins : Map <String , Presence >, leaves : Map <String , Presence >) {
75- val presenceCallbacks = callbacks.filterIsInstance<RealtimeCallback .PresenceCallback >()
76- presenceCallbacks.forEach { it.callback(PresenceActionImpl (serializer, joins, leaves)) }
103+ presenceCallbacks.load().values.forEach { it.callback(PresenceActionImpl (serializer, joins, leaves)) }
77104 }
78105
79- override fun addPresenceCallback (callback : (PresenceAction ) -> Unit ): Int {
106+ override fun hasPresenceCallback (): Boolean {
107+ return presenceCallbacks.load().isNotEmpty()
108+ }
109+
110+ override fun addPresenceCallback (callback : (PresenceAction ) -> Unit ): RealtimeCallbackId .Presence {
80111 val id = nextId.fetchAndIncrement()
81- callbacks + = RealtimeCallback .PresenceCallback (callback, id)
82- return id
112+ presenceCallbacks.update {
113+ it.put(id, RealtimeCallback .PresenceCallback (callback, id))
114+ }
115+ return RealtimeCallbackId .Presence (id)
116+ }
117+
118+ fun removeBroadcastCallbackById (id : Int ) {
119+ val event = broadcastEventId.load()[id] ? : return
120+ broadcastCallbacks.update {
121+ it.put(event, it[event]?.removeAll { c -> c.id == id } ? : persistentListOf())
122+ }
123+ broadcastEventId.update {
124+ it.remove(id)
125+ }
126+ }
127+
128+ fun removePresenceCallbackById (id : Int ) {
129+ presenceCallbacks.update {
130+ it.remove(id)
131+ }
132+ }
133+
134+ fun removePostgresCallbackById (id : Int ) {
135+ postgresCallbacks.update {
136+ it.remove(id)
137+ }
83138 }
84139
85- override fun removeCallbackById (id : Int ) {
86- callbacks.indexOfFirst { it.id == id }.takeIf { it != - 1 }?.let { callbacks.removeAt(it) }
140+ override fun removeCallbackById (id : RealtimeCallbackId ) {
141+ when (id) {
142+ is RealtimeCallbackId .Broadcast -> removeBroadcastCallbackById(id.value)
143+ is RealtimeCallbackId .Presence -> removePresenceCallbackById(id.value)
144+ is RealtimeCallbackId .Postgres -> removePostgresCallbackById(id.value)
145+ }
87146 }
88147
89148 override fun setServerChanges (changes : List <PostgresJoinConfig >) {
0 commit comments