Skip to content

Commit a41d45d

Browse files
committed
fix(compression): read one command at a time from socket
- fix chunked rowset and decompression, chunk are compressed one by one - chunks are read from socket to complete the message - json is returned as JSON Object
1 parent 5e4e4fb commit a41d45d

File tree

3 files changed

+217
-112
lines changed

3 files changed

+217
-112
lines changed

src/sqlitecloud/driver.py

Lines changed: 120 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -262,15 +262,15 @@ def _internal_pubsub_thread(self, connection: SQLiteCloudConnect) -> None:
262262
blen -= nread
263263
buffer += data
264264

265-
SQLiteCloud_number = self._internal_parse_number(buffer)
266-
clen = SQLiteCloud_number.value
265+
sqlitecloud_number = self._internal_parse_number(buffer)
266+
clen = sqlitecloud_number.value
267267
if clen == 0:
268268
continue
269269

270270
# check if read is complete
271271
# clen is the lenght parsed in the buffer
272272
# cstart is the index of the first space
273-
cstart = SQLiteCloud_number.cstart
273+
cstart = sqlitecloud_number.cstart
274274
if clen + cstart != tread:
275275
continue
276276

@@ -532,61 +532,80 @@ def _internal_socket_read(
532532
slicing the buffer into parts if there are special characters like "ò".
533533
"""
534534
buffer = b""
535-
buffer_size = 8192
535+
command_type = ""
536+
command_length_value = b""
536537
nread = 0
537538

538539
sock = connection.socket if main_socket else connection.pubsub_socket
539540

541+
# read the lenght of the command, eg:
542+
# ?LEN <command>, where `?` is any command type
543+
# _ for null command
544+
# :145 for integer command with value 145
540545
while True:
541546
try:
542-
data = sock.recv(buffer_size)
547+
data = sock.recv(1)
543548
if not data:
544-
raise SQLiteCloudException("Incomplete response from server.")
549+
raise SQLiteCloudException(
550+
"Incomplete response from server. Cannot read the command length."
551+
)
545552
except Exception as exc:
546553
raise SQLiteCloudException(
547-
"An error occurred while reading data from the socket.",
554+
"An error occurred while reading command length from the socket.",
548555
SQLITECLOUD_INTERNAL_ERRCODE.NETWORK,
549556
) from exc
550557

551-
# the expected data length to read
552-
# matches the string size before decoding it
553558
nread += len(data)
554-
# update buffers
555559
buffer += data
556560

557-
c = chr(buffer[0])
561+
# first character is the type of the message
562+
if nread == 1:
563+
command_type = data.decode()
564+
continue
558565

559-
if (
560-
c == SQLITECLOUD_CMD.INT.value
561-
or c == SQLITECLOUD_CMD.FLOAT.value
562-
or c == SQLITECLOUD_CMD.NULL.value
563-
):
564-
if not buffer.endswith(b" "):
565-
continue
566-
elif c == SQLITECLOUD_CMD.ROWSET_CHUNK.value:
567-
isEndOfChunk = buffer.endswith(SQLITECLOUD_ROWSET.CHUNKS_END.value)
568-
if not isEndOfChunk:
569-
continue
570-
else:
571-
SQLiteCloud_number = self._internal_parse_number(buffer)
572-
n = SQLiteCloud_number.value
573-
cstart = SQLiteCloud_number.cstart
566+
# end of len value
567+
if data == b" ":
568+
break
574569

575-
can_be_zerolength = (
576-
c == SQLITECLOUD_CMD.BLOB.value or c == SQLITECLOUD_CMD.STRING.value
577-
)
578-
if n == 0 and not can_be_zerolength:
579-
continue
580-
if n + cstart != nread:
581-
continue
570+
command_length_value += data
582571

572+
if (
573+
command_type == SQLITECLOUD_CMD.INT.value
574+
or command_type == SQLITECLOUD_CMD.FLOAT.value
575+
or command_type == SQLITECLOUD_CMD.NULL.value
576+
):
583577
return self._internal_parse_buffer(connection, buffer, len(buffer))
584578

579+
command_length = int(command_length_value)
580+
581+
# read the command
582+
nread = 0
583+
584+
while nread < command_length:
585+
buffer_size = min(command_length - nread, 8192)
586+
587+
try:
588+
data = sock.recv(buffer_size)
589+
if not data:
590+
raise SQLiteCloudException(
591+
"Incomplete response from server. Cannot read the command."
592+
)
593+
except Exception as exc:
594+
raise SQLiteCloudException(
595+
"An error occurred while reading the command from the socket.",
596+
SQLITECLOUD_INTERNAL_ERRCODE.NETWORK,
597+
) from exc
598+
599+
nread += len(data)
600+
buffer += data
601+
602+
return self._internal_parse_buffer(connection, buffer, len(buffer))
603+
585604
def _internal_parse_number(
586605
self, buffer: bytes, index: int = 1
587606
) -> SQLiteCloudNumber:
588-
SQLiteCloud_number = SQLiteCloudNumber()
589-
SQLiteCloud_number.value = 0
607+
sqlitecloud_number = SQLiteCloudNumber()
608+
sqlitecloud_number.value = 0
590609
extvalue = 0
591610
isext = False
592611
blen = len(buffer)
@@ -602,20 +621,20 @@ def _internal_parse_number(
602621

603622
# check for end of value
604623
if c == " ":
605-
SQLiteCloud_number.cstart = i + 1
606-
SQLiteCloud_number.extcode = extvalue
607-
return SQLiteCloud_number
624+
sqlitecloud_number.cstart = i + 1
625+
sqlitecloud_number.extcode = extvalue
626+
return sqlitecloud_number
608627

609628
val = int(c) if c.isdigit() else 0
610629

611630
# compute numeric value
612631
if isext:
613632
extvalue = (extvalue * 10) + val
614633
else:
615-
SQLiteCloud_number.value = (SQLiteCloud_number.value * 10) + val
634+
sqlitecloud_number.value = (sqlitecloud_number.value * 10) + val
616635

617-
SQLiteCloud_number.value = 0
618-
return SQLiteCloud_number
636+
sqlitecloud_number.value = 0
637+
return sqlitecloud_number
619638

620639
def _internal_parse_buffer(
621640
self, connection: SQLiteCloudConnect, buffer: bytes, blen: int
@@ -665,11 +684,7 @@ def _internal_parse_buffer(
665684
if len_ == 0:
666685
return SQLiteCloudResult(SQLITECLOUD_RESULT_TYPE.RESULT_STRING, "")
667686

668-
tag = (
669-
SQLITECLOUD_RESULT_TYPE.RESULT_JSON
670-
if cmd == SQLITECLOUD_CMD.JSON.value
671-
else SQLITECLOUD_RESULT_TYPE.RESULT_STRING
672-
)
687+
tag = SQLITECLOUD_RESULT_TYPE.RESULT_STRING
673688

674689
if cmd == SQLITECLOUD_CMD.ZEROSTRING.value:
675690
len_ -= 1
@@ -693,6 +708,10 @@ def _internal_parse_buffer(
693708
)
694709
elif cmd == SQLITECLOUD_CMD.BLOB.value:
695710
tag = SQLITECLOUD_RESULT_TYPE.RESULT_BLOB
711+
elif cmd == SQLITECLOUD_CMD.JSON.value:
712+
return SQLiteCloudResult(
713+
SQLITECLOUD_RESULT_TYPE.RESULT_JSON, json.loads(clone)
714+
)
696715

697716
clone = clone.decode() if cmd != SQLITECLOUD_CMD.BLOB.value else clone
698717
return SQLiteCloudResult(tag, clone)
@@ -740,20 +759,19 @@ def _internal_parse_buffer(
740759
rowset_signature.ncols,
741760
)
742761

743-
# continue parsing next chunk in the buffer
744-
sign_len = rowset_signature.len
745-
buffer = buffer[sign_len + len(f"/{sign_len} ") :]
746-
if cmd == SQLITECLOUD_CMD.ROWSET_CHUNK.value and buffer:
747-
return self._internal_parse_buffer(connection, buffer, len(buffer))
762+
# continue reading from the socket
763+
# until the end-of-chunk condition
764+
if cmd == SQLITECLOUD_CMD.ROWSET_CHUNK.value:
765+
return self._internal_socket_read(connection)
748766

749767
return rowset
750768

751769
elif cmd == SQLITECLOUD_CMD.NULL.value:
752770
return SQLiteCloudResult(SQLITECLOUD_RESULT_TYPE.RESULT_NONE, None)
753771

754772
elif cmd in [SQLITECLOUD_CMD.INT.value, SQLITECLOUD_CMD.FLOAT.value]:
755-
SQLiteCloud_value = self._internal_parse_value(buffer)
756-
clone = SQLiteCloud_value.value
773+
sqlitecloud_value = self._internal_parse_value(buffer)
774+
clone = sqlitecloud_value.value
757775

758776
tag = (
759777
SQLITECLOUD_RESULT_TYPE.RESULT_INTEGER
@@ -784,6 +802,10 @@ def _internal_uncompress_data(self, buffer: bytes) -> Optional[bytes]:
784802
Returns:
785803
str: The uncompressed data.
786804
"""
805+
# buffer may contain a sequence of compressed data
806+
# eg, a compressed rowset split in chunks is a sequence of rowset chunks
807+
# compressed individually, each one with its compressed header,
808+
# rowset header and compressed data
787809
space_index = buffer.index(b" ")
788810
buffer = buffer[space_index + 1 :]
789811

@@ -821,14 +843,14 @@ def _internal_parse_array(self, buffer: bytes) -> list:
821843

822844
r: str = []
823845
for i in range(n):
824-
SQLiteCloud_value = self._internal_parse_value(buffer, start)
825-
start += SQLiteCloud_value.cellsize
826-
r.append(SQLiteCloud_value.value)
846+
sqlitecloud_value = self._internal_parse_value(buffer, start)
847+
start += sqlitecloud_value.cellsize
848+
r.append(sqlitecloud_value.value)
827849

828850
return r
829851

830852
def _internal_parse_value(self, buffer: bytes, index: int = 0) -> SQLiteCloudValue:
831-
SQLiteCloud_value = SQLiteCloudValue()
853+
sqlitecloud_value = SQLiteCloudValue()
832854
len = 0
833855
cellsize = 0
834856

@@ -839,35 +861,35 @@ def _internal_parse_value(self, buffer: bytes, index: int = 0) -> SQLiteCloudVal
839861
if cellsize is not None:
840862
cellsize = 2
841863

842-
SQLiteCloud_value.len = len
843-
SQLiteCloud_value.cellsize = cellsize
864+
sqlitecloud_value.len = len
865+
sqlitecloud_value.cellsize = cellsize
844866

845-
return SQLiteCloud_value
867+
return sqlitecloud_value
846868

847-
SQLiteCloud_number = self._internal_parse_number(buffer, index + 1)
848-
blen = SQLiteCloud_number.value
849-
cstart = SQLiteCloud_number.cstart
869+
sqlitecloud_number = self._internal_parse_number(buffer, index + 1)
870+
blen = sqlitecloud_number.value
871+
cstart = sqlitecloud_number.cstart
850872

851873
# handle decimal/float cases
852874
if c == SQLITECLOUD_CMD.INT.value or c == SQLITECLOUD_CMD.FLOAT.value:
853875
nlen = cstart - index
854876
len = nlen - 2
855877
cellsize = nlen
856878

857-
SQLiteCloud_value.value = (buffer[index + 1 : index + 1 + len]).decode()
858-
SQLiteCloud_value.len
859-
SQLiteCloud_value.cellsize = cellsize
879+
sqlitecloud_value.value = (buffer[index + 1 : index + 1 + len]).decode()
880+
sqlitecloud_value.len
881+
sqlitecloud_value.cellsize = cellsize
860882

861-
return SQLiteCloud_value
883+
return sqlitecloud_value
862884

863885
len = blen - 1 if c == SQLITECLOUD_CMD.ZEROSTRING.value else blen
864886
cellsize = blen + cstart - index
865887

866-
SQLiteCloud_value.value = (buffer[cstart : cstart + len]).decode()
867-
SQLiteCloud_value.len = len
868-
SQLiteCloud_value.cellsize = cellsize
888+
sqlitecloud_value.value = (buffer[cstart : cstart + len]).decode()
889+
sqlitecloud_value.len = len
890+
sqlitecloud_value.cellsize = cellsize
869891

870-
return SQLiteCloud_value
892+
return sqlitecloud_value
871893

872894
def _internal_parse_rowset_signature(
873895
self, buffer: bytes
@@ -951,9 +973,9 @@ def _internal_parse_rowset_header(
951973
# parse column names
952974
rowset.colname = []
953975
for i in range(ncols):
954-
SQLiteCloud_number = self._internal_parse_number(buffer, start)
955-
number_len = SQLiteCloud_number.value
956-
cstart = SQLiteCloud_number.cstart
976+
sqlitecloud_number = self._internal_parse_number(buffer, start)
977+
number_len = sqlitecloud_number.value
978+
cstart = sqlitecloud_number.cstart
957979
value = buffer[cstart : cstart + number_len]
958980
rowset.colname.append(value.decode())
959981
start = cstart + number_len
@@ -969,63 +991,63 @@ def _internal_parse_rowset_header(
969991
# parse declared types
970992
rowset.decltype = []
971993
for i in range(ncols):
972-
SQLiteCloud_number = self._internal_parse_number(buffer, start)
973-
number_len = SQLiteCloud_number.value
974-
cstart = SQLiteCloud_number.cstart
994+
sqlitecloud_number = self._internal_parse_number(buffer, start)
995+
number_len = sqlitecloud_number.value
996+
cstart = sqlitecloud_number.cstart
975997
value = buffer[cstart : cstart + number_len]
976998
rowset.decltype.append(value.decode())
977999
start = cstart + number_len
9781000

9791001
# parse database names
9801002
rowset.dbname = []
9811003
for i in range(ncols):
982-
SQLiteCloud_number = self._internal_parse_number(buffer, start)
983-
number_len = SQLiteCloud_number.value
984-
cstart = SQLiteCloud_number.cstart
1004+
sqlitecloud_number = self._internal_parse_number(buffer, start)
1005+
number_len = sqlitecloud_number.value
1006+
cstart = sqlitecloud_number.cstart
9851007
value = buffer[cstart : cstart + number_len]
9861008
rowset.dbname.append(value.decode())
9871009
start = cstart + number_len
9881010

9891011
# parse table names
9901012
rowset.tblname = []
9911013
for i in range(ncols):
992-
SQLiteCloud_number = self._internal_parse_number(buffer, start)
993-
number_len = SQLiteCloud_number.value
994-
cstart = SQLiteCloud_number.cstart
1014+
sqlitecloud_number = self._internal_parse_number(buffer, start)
1015+
number_len = sqlitecloud_number.value
1016+
cstart = sqlitecloud_number.cstart
9951017
value = buffer[cstart : cstart + number_len]
9961018
rowset.tblname.append(value.decode())
9971019
start = cstart + number_len
9981020

9991021
# parse column original names
10001022
rowset.origname = []
10011023
for i in range(ncols):
1002-
SQLiteCloud_number = self._internal_parse_number(buffer, start)
1003-
number_len = SQLiteCloud_number.value
1004-
cstart = SQLiteCloud_number.cstart
1024+
sqlitecloud_number = self._internal_parse_number(buffer, start)
1025+
number_len = sqlitecloud_number.value
1026+
cstart = sqlitecloud_number.cstart
10051027
value = buffer[cstart : cstart + number_len]
10061028
rowset.origname.append(value.decode())
10071029
start = cstart + number_len
10081030

10091031
# parse not null flags
10101032
rowset.notnull = []
10111033
for i in range(ncols):
1012-
SQLiteCloud_number = self._internal_parse_number(buffer, start)
1013-
rowset.notnull.append(SQLiteCloud_number.value)
1014-
start = SQLiteCloud_number.cstart
1034+
sqlitecloud_number = self._internal_parse_number(buffer, start)
1035+
rowset.notnull.append(sqlitecloud_number.value)
1036+
start = sqlitecloud_number.cstart
10151037

10161038
# parse primary key flags
10171039
rowset.prikey = []
10181040
for i in range(ncols):
1019-
SQLiteCloud_number = self._internal_parse_number(buffer, start)
1020-
rowset.prikey.append(SQLiteCloud_number.value)
1021-
start = SQLiteCloud_number.cstart
1041+
sqlitecloud_number = self._internal_parse_number(buffer, start)
1042+
rowset.prikey.append(sqlitecloud_number.value)
1043+
start = sqlitecloud_number.cstart
10221044

10231045
# parse autoincrement flags
10241046
rowset.autoinc = []
10251047
for i in range(ncols):
1026-
SQLiteCloud_number = self._internal_parse_number(buffer, start)
1027-
rowset.autoinc.append(SQLiteCloud_number.value)
1028-
start = SQLiteCloud_number.cstart
1048+
sqlitecloud_number = self._internal_parse_number(buffer, start)
1049+
rowset.autoinc.append(sqlitecloud_number.value)
1050+
start = sqlitecloud_number.cstart
10291051

10301052
return start
10311053

@@ -1034,6 +1056,6 @@ def _internal_parse_rowset_values(
10341056
):
10351057
# loop to parse each individual value
10361058
for i in range(bound):
1037-
SQLiteCloud_value = self._internal_parse_value(buffer, start)
1038-
start += SQLiteCloud_value.cellsize
1039-
rowset.data.append(SQLiteCloud_value.value)
1059+
sqlitecloud_value = self._internal_parse_value(buffer, start)
1060+
start += sqlitecloud_value.cellsize
1061+
rowset.data.append(sqlitecloud_value.value)

0 commit comments

Comments
 (0)