1717 PartitionTransactionCache ,
1818 StorePartition ,
1919)
20- from quixstreams .state .exceptions import ColumnFamilyDoesNotExist
2120from quixstreams .state .metadata import METADATA_CF_NAME , Marker
2221from quixstreams .state .recovery import ChangelogProducer
2322from quixstreams .state .serialization import int_from_bytes , int_to_bytes
2423
25- from .exceptions import ColumnFamilyAlreadyExists , RocksDBCorruptedError
24+ from .exceptions import RocksDBCorruptedError
2625from .metadata import (
2726 CHANGELOG_OFFSET_KEY ,
2827)
@@ -52,8 +51,6 @@ class RocksDBStorePartition(StorePartition):
5251 :param options: RocksDB options. If `None`, the default options will be used.
5352 """
5453
55- additional_column_families : tuple [str , ...] = ()
56-
5754 def __init__ (
5855 self ,
5956 path : str ,
@@ -72,8 +69,6 @@ def __init__(
7269 self ._db = self ._init_rocksdb ()
7370 self ._cf_cache : Dict [str , Rdict ] = {}
7471 self ._cf_handle_cache : Dict [str , ColumnFamily ] = {}
75- for cf_name in self .additional_column_families :
76- self ._ensure_column_family (cf_name )
7772
7873 def recover_from_changelog_message (
7974 self , key : bytes , value : Optional [bytes ], cf_name : str , offset : int
@@ -149,7 +144,9 @@ def get(
149144 :param cf_name: rocksdb column family name. Default - "default"
150145 :return: a value if the key is present in the DB. Otherwise, `default`
151146 """
152- result = self .get_column_family (cf_name ).get (key , default = Marker .UNDEFINED )
147+ result = self .get_or_create_column_family (cf_name ).get (
148+ key , default = Marker .UNDEFINED
149+ )
153150
154151 # RDict accept Any type as value but we only write bytes so we should only get bytes back.
155152 return cast (Union [bytes , Literal [Marker .UNDEFINED ]], result )
@@ -172,7 +169,7 @@ def iter_items(
172169 Default is "default".
173170 :return: An iterator yielding (key, value) tuples.
174171 """
175- cf = self .get_column_family (cf_name = cf_name )
172+ cf = self .get_or_create_column_family (cf_name = cf_name )
176173
177174 # Set iterator bounds to reduce IO by limiting the range of keys fetched
178175 read_opt = ReadOptions ()
@@ -219,15 +216,15 @@ def exists(self, key: bytes, cf_name: str = "default") -> bool:
219216 :param cf_name: rocksdb column family name. Default - "default"
220217 :return: `True` if the key is present, `False` otherwise.
221218 """
222- cf_dict = self .get_column_family (cf_name )
219+ cf_dict = self .get_or_create_column_family (cf_name )
223220 return key in cf_dict
224221
225222 def get_changelog_offset (self ) -> Optional [int ]:
226223 """
227224 Get offset that the changelog is up-to-date with.
228225 :return: offset or `None` if there's no processed offset yet
229226 """
230- metadata_cf = self .get_column_family (METADATA_CF_NAME )
227+ metadata_cf = self .get_or_create_column_family (METADATA_CF_NAME )
231228 offset_bytes = metadata_cf .get (CHANGELOG_OFFSET_KEY )
232229 if offset_bytes is None :
233230 return None
@@ -288,18 +285,12 @@ def get_column_family_handle(self, cf_name: str) -> ColumnFamily:
288285 :return: instance of `rocksdict.ColumnFamily`
289286 """
290287 if (cf_handle := self ._cf_handle_cache .get (cf_name )) is None :
291- try :
292- cf_handle = self ._db .get_column_family_handle (cf_name )
293- self ._cf_handle_cache [cf_name ] = cf_handle
294- except Exception as exc :
295- if "does not exist" in str (exc ):
296- raise ColumnFamilyDoesNotExist (
297- f'Column family "{ cf_name } " does not exist'
298- )
299- raise
288+ self .get_or_create_column_family (cf_name )
289+ cf_handle = self ._db .get_column_family_handle (cf_name )
290+ self ._cf_handle_cache [cf_name ] = cf_handle
300291 return cf_handle
301292
302- def get_column_family (self , cf_name : str ) -> Rdict :
293+ def get_or_create_column_family (self , cf_name : str ) -> Rdict :
303294 """
304295 Get a column family instance.
305296 This method will cache the CF instance to avoid creating them repeatedly.
@@ -310,38 +301,14 @@ def get_column_family(self, cf_name: str) -> Rdict:
310301 if (cf := self ._cf_cache .get (cf_name )) is None :
311302 try :
312303 cf = self ._db .get_column_family (cf_name )
313- self ._cf_cache [cf_name ] = cf
314304 except Exception as exc :
315- if "does not exist" in str (exc ):
316- raise ColumnFamilyDoesNotExist (
317- f'Column family "{ cf_name } " does not exist'
318- )
319- raise
320- return cf
321-
322- def create_column_family (self , cf_name : str ):
323- try :
324- cf = self ._db .create_column_family (cf_name , options = self ._rocksdb_options )
325- except Exception as exc :
326- if "column family already exists" in str (exc ).lower ():
327- raise ColumnFamilyAlreadyExists (
328- f'Column family already exists: "{ cf_name } "'
329- )
330- raise
331-
332- self ._cf_cache [cf_name ] = cf
333-
334- def drop_column_family (self , cf_name : str ):
335- self ._cf_cache .pop (cf_name , None )
336- self ._cf_handle_cache .pop (cf_name , None )
337- try :
338- self ._db .drop_column_family (cf_name )
339- except Exception as exc :
340- if "invalid column family:" in str (exc ).lower ():
341- raise ColumnFamilyDoesNotExist (
342- f'Column family does not exist: "{ cf_name } "'
305+ if "does not exist" not in str (exc ):
306+ raise
307+ cf = self ._db .create_column_family (
308+ cf_name , options = self ._rocksdb_options
343309 )
344- raise
310+ self ._cf_cache [cf_name ] = cf
311+ return cf
345312
346313 def list_column_families (self ) -> List [str ]:
347314 return self ._db .list_cf (self ._path )
@@ -427,9 +394,3 @@ def _update_changelog_offset(self, batch: WriteBatch, offset: int):
427394 int_to_bytes (offset ),
428395 self .get_column_family_handle (METADATA_CF_NAME ),
429396 )
430-
431- def _ensure_column_family (self , cf_name : str ) -> None :
432- try :
433- self .get_column_family (cf_name )
434- except ColumnFamilyDoesNotExist :
435- self .create_column_family (cf_name )
0 commit comments