66import json
77
88from pymysql .charset import charset_by_name
9+ from enum import Enum
910
1011from .event import BinLogEvent
1112from .exceptions import TableMetadataUnavailableError
@@ -552,7 +553,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
552553 super ().__init__ (from_packet , event_size ,
553554 table_map , ctl_connection , ** kwargs )
554555 if self ._processed :
555- #Body
556+ # Body
556557 self .columns_present_bitmap = self .packet .read (
557558 (self .number_of_columns + 7 ) / 8 )
558559 self .columns_present_bitmap2 = self .packet .read (
@@ -577,6 +578,40 @@ def _dump(self):
577578 row ["before_values" ][key ],
578579 row ["after_values" ][key ]))
579580
581+ class OptionalMetaData :
582+ def __init__ (self ):
583+ self .unsigned_column_list = []
584+ self .default_charset_collation = None
585+ self .charset_collation = {}
586+ self .column_charset = []
587+ self .column_name_list = []
588+ self .set_str_value_list = []
589+ self .set_enum_str_value_list = []
590+ self .geometry_type_list = []
591+ self .simple_primary_key_list = []
592+ self .primary_keys_with_prefix = {}
593+ self .enum_and_set_default_charset = None
594+ self .enum_and_set_charset_collation = {}
595+ self .enum_and_set_default_column_charset_list = []
596+ self .charset_collation_list = []
597+ self .enum_and_set_collation_list = []
598+ self .visibility_list = []
599+
600+ def dump (self ):
601+ print ("=== %s ===" % self .__class__ .__name__ )
602+ print ("unsigned_column_list: %s" % self .unsigned_column_list )
603+ print ("default_charset_collation: %s" % self .default_charset_collation )
604+ print ("charset_collation: %s" % self .charset_collation )
605+ print ("column_charset: %s" % self .column_charset )
606+ print ("column_name_list: %s" % self .column_name_list )
607+ print ("set_str_value_list : %s" % self .set_str_value_list )
608+ print ("set_enum_str_value_list : %s" % self .set_enum_str_value_list )
609+ print ("geometry_type_list : %s" % self .geometry_type_list )
610+ print ("simple_primary_key_list: %s" % self .simple_primary_key_list )
611+ print ("primary_keys_with_prefix: %s" % self .primary_keys_with_prefix )
612+ print ("visibility_list: %s" % self .visibility_list )
613+ print ("charset_collation_list: %s" % self .charset_collation_list )
614+ print ("enum_and_set_collation_list: %s" % self .enum_and_set_collation_list )
580615
581616class TableMapEvent (BinLogEvent ):
582617 """This event describes the structure of a table.
@@ -633,6 +668,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
633668 else :
634669 self .column_schemas = self ._ctl_connection ._get_table_information (self .schema , self .table )
635670
671+ self .dbms = self ._ctl_connection ._get_dbms ()
636672 ordinal_pos_loc = 0
637673
638674 if self .column_count != 0 :
@@ -675,6 +711,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
675711 # ith column is nullable if (i - 1)th bit is set to True, not nullable otherwise
676712 ## Refer to definition of and call to row.event._is_null() to interpret bitmap corresponding to columns
677713 self .null_bitmask = self .packet .read ((self .column_count + 7 ) / 8 )
714+ # optional meta Data
715+ self .optional_metadata = self ._get_optional_meta_data ()
678716
679717 def get_table (self ):
680718 return self .table_obj
@@ -685,3 +723,252 @@ def _dump(self):
685723 print ("Schema: %s" % (self .schema ))
686724 print ("Table: %s" % (self .table ))
687725 print ("Columns: %s" % (self .column_count ))
726+ self .optional_metadata .dump ()
727+
728+ def _get_optional_meta_data (self ):
729+ """
730+ DEFAULT_CHARSET and COLUMN_CHARSET don't appear together,
731+ and ENUM_AND_SET_DEFAULT_CHARSET and ENUM_AND_SET_COLUMN_CHARSET don't appear together.
732+ They are just alternative ways to pack character set information.
733+ When binlogging, it logs character sets in the way that occupies least storage.
734+
735+ TLV format data (TYPE, LENGTH, VALUE)
736+ """
737+ optional_metadata = OptionalMetaData ()
738+ while self .packet .bytes_to_read () > BINLOG .BINLOG_CHECKSUM_LEN :
739+ option_metadata_type = self .packet .read (1 )[0 ]
740+ length = self .packet .read_length_coded_binary ()
741+ field_type : MetadataFieldType = MetadataFieldType .by_index (option_metadata_type )
742+
743+ if field_type == MetadataFieldType .SIGNEDNESS :
744+ signed_column_list = self ._convert_include_non_numeric_column (
745+ self ._read_bool_list (length , True ))
746+ optional_metadata .unsigned_column_list = signed_column_list
747+
748+ elif field_type == MetadataFieldType .DEFAULT_CHARSET :
749+ optional_metadata .default_charset_collation , optional_metadata .charset_collation = self ._read_default_charset (
750+ length )
751+ optional_metadata .charset_collation_list = self ._parsed_column_charset_by_default_charset (
752+ optional_metadata .default_charset_collation ,
753+ optional_metadata .charset_collation ,
754+ self ._is_character_column )
755+
756+ elif field_type == MetadataFieldType .COLUMN_CHARSET :
757+ optional_metadata .column_charset = self ._read_ints (length )
758+ optional_metadata .charset_collation_list = self ._parsed_column_charset_by_column_charset (
759+ optional_metadata .column_charset , self ._is_character_column )
760+
761+ elif field_type == MetadataFieldType .COLUMN_NAME :
762+ optional_metadata .column_name_list = self ._read_column_names (length )
763+
764+ elif field_type == MetadataFieldType .SET_STR_VALUE :
765+ optional_metadata .set_str_value_list = self ._read_type_values (length )
766+
767+ elif field_type == MetadataFieldType .ENUM_STR_VALUE :
768+ optional_metadata .set_enum_str_value_list = self ._read_type_values (length )
769+
770+ elif field_type == MetadataFieldType .GEOMETRY_TYPE :
771+ optional_metadata .geometry_type_list = self ._read_ints (length )
772+
773+ elif field_type == MetadataFieldType .SIMPLE_PRIMARY_KEY :
774+ optional_metadata .simple_primary_key_list = self ._read_ints (length )
775+
776+ elif field_type == MetadataFieldType .PRIMARY_KEY_WITH_PREFIX :
777+ optional_metadata .primary_keys_with_prefix = self ._read_primary_keys_with_prefix (length )
778+
779+ elif field_type == MetadataFieldType .ENUM_AND_SET_DEFAULT_CHARSET :
780+ optional_metadata .enum_and_set_default_charset , optional_metadata .enum_and_set_charset_collation = self ._read_default_charset (
781+ length )
782+
783+ optional_metadata .enum_and_set_collation_list = self ._parsed_column_charset_by_default_charset (
784+ optional_metadata .enum_and_set_default_charset ,
785+ optional_metadata .enum_and_set_charset_collation ,
786+ self ._is_enum_or_set_column )
787+
788+ elif field_type == MetadataFieldType .ENUM_AND_SET_COLUMN_CHARSET :
789+ optional_metadata .enum_and_set_default_column_charset_list = self ._read_ints (length )
790+
791+ optional_metadata .enum_and_set_collation_list = self ._parsed_column_charset_by_column_charset (
792+ optional_metadata .enum_and_set_default_column_charset_list , self ._is_enum_or_set_column )
793+
794+ elif field_type == MetadataFieldType .VISIBILITY :
795+ optional_metadata .visibility_list = self ._read_bool_list (length , False )
796+
797+ return optional_metadata
798+
799+ def _convert_include_non_numeric_column (self , signedness_bool_list ):
800+ # The incoming order of columns in the packet represents the indices of the numeric columns.
801+ # Thus, it transforms non-numeric columns to align with the sorting.
802+ bool_list = []
803+ position = 0
804+ for i in range (self .column_count ):
805+ column_type = self .columns [i ].type
806+ if self ._is_numeric_column (column_type ):
807+ if signedness_bool_list [position ]:
808+ bool_list .append (True )
809+ else :
810+ bool_list .append (False )
811+ position += 1
812+ else :
813+ bool_list .append (False )
814+
815+ return bool_list
816+
817+ def _parsed_column_charset_by_default_charset (self , default_charset_collation : int , column_charset_collation : dict ,
818+ column_type_detect_function ):
819+ column_charset = []
820+ for i in range (self .column_count ):
821+ column_type = self .columns [i ].type
822+ if not column_type_detect_function (column_type , dbms = self .dbms ):
823+ continue
824+ elif i not in column_charset_collation .keys ():
825+ column_charset .append (default_charset_collation )
826+ else :
827+ column_charset .append (column_charset_collation [i ])
828+
829+ return column_charset
830+
831+ def _parsed_column_charset_by_column_charset (self , column_charset_list : list , column_type_detect_function ):
832+ column_charset = []
833+ position = 0
834+ if len (column_charset_list ) == 0 :
835+ return
836+ for i in range (self .column_count ):
837+ column_type = self .columns [i ].type
838+ if not column_type_detect_function (column_type , dbms = self .dbms ):
839+ continue
840+ else :
841+ column_charset .append (column_charset_list [position ])
842+ position += 1
843+
844+ return column_charset
845+
846+ def _read_bool_list (self , read_byte_length , signedness_flag ):
847+ # if signedness_flag true
848+ # The order of the index in the packet is only the index between the numeric_columns.
849+ # Therefore, we need to use numeric_column_count when calculating bits.
850+ bool_list = []
851+ bytes_data = self .packet .read (read_byte_length )
852+
853+ byte = 0
854+ byte_idx = 0
855+ bit_idx = 0
856+
857+ for i in range (self .column_count ):
858+ column_type = self .columns [i ].type
859+ if not self ._is_numeric_column (column_type ) and signedness_flag :
860+ continue
861+ if bit_idx == 0 :
862+ byte = bytes_data [byte_idx ]
863+ byte_idx += 1
864+ bool_list .append ((byte & (0b10000000 >> bit_idx )) != 0 )
865+ bit_idx = (bit_idx + 1 ) % 8
866+ return bool_list
867+
868+ def _read_default_charset (self , length ):
869+ charset = {}
870+ read_until = self .packet .read_bytes + length
871+ if self .packet .read_bytes >= read_until :
872+ return
873+ default_charset_collation = self .packet .read_length_coded_binary ()
874+ while self .packet .read_bytes < read_until :
875+ column_index = self .packet .read_length_coded_binary ()
876+ charset_collation = self .packet .read_length_coded_binary ()
877+ charset [column_index ] = charset_collation
878+
879+ return default_charset_collation , charset
880+
881+ def _read_ints (self , length ):
882+ result = []
883+ read_until = self .packet .read_bytes + length
884+ while self .packet .read_bytes < read_until :
885+ result .append (self .packet .read_length_coded_binary ())
886+ return result
887+
888+ def _read_column_names (self , length ):
889+ result = []
890+ read_until = self .packet .read_bytes + length
891+ while self .packet .read_bytes < read_until :
892+ result .append (self .packet .read_variable_length_string ().decode ())
893+ return result
894+
895+ def _read_type_values (self , length ):
896+ result = []
897+ read_until = self .packet .read_bytes + length
898+ if self .packet .read_bytes >= read_until :
899+ return
900+ while self .packet .read_bytes < read_until :
901+ type_value_list = []
902+ value_count = self .packet .read_length_coded_binary ()
903+ for i in range (value_count ):
904+ value = self .packet .read_variable_length_string ()
905+ decode_value = ""
906+ try :
907+ decode_value = value .decode ()
908+ except UnicodeDecodeError :
909+ # ignore not utf-8 decode type
910+ pass
911+ type_value_list .append (decode_value )
912+ result .append (type_value_list )
913+ return result
914+
915+ def _read_primary_keys_with_prefix (self , length ):
916+ ints = self ._read_ints (length )
917+ result = {}
918+ for i in range (0 , len (ints ), 2 ):
919+ result [ints [i ]] = ints [i + 1 ]
920+ return result
921+
922+ @staticmethod
923+ def _is_character_column (column_type , dbms = 'mysql' ):
924+ if column_type in [FIELD_TYPE .STRING , FIELD_TYPE .VAR_STRING , FIELD_TYPE .VARCHAR , FIELD_TYPE .BLOB ]:
925+ return True
926+ if column_type == FIELD_TYPE .GEOMETRY and dbms == 'mariadb' :
927+ return True
928+ return False
929+
930+ @staticmethod
931+ def _is_enum_column (column_type ):
932+ if column_type == FIELD_TYPE .ENUM :
933+ return True
934+ return False
935+
936+ @staticmethod
937+ def _is_set_column (column_type ):
938+ if column_type == FIELD_TYPE .SET :
939+ return True
940+ return False
941+
942+ @staticmethod
943+ def _is_enum_or_set_column (column_type , dbms = 'mysql' ):
944+ if column_type in [FIELD_TYPE .ENUM , FIELD_TYPE .SET ]:
945+ return True
946+ return False
947+
948+ @staticmethod
949+ def _is_numeric_column (column_type ):
950+ if column_type in [FIELD_TYPE .TINY , FIELD_TYPE .SHORT , FIELD_TYPE .INT24 , FIELD_TYPE .LONG ,
951+ FIELD_TYPE .LONGLONG , FIELD_TYPE .NEWDECIMAL , FIELD_TYPE .FLOAT ,
952+ FIELD_TYPE .DOUBLE ,
953+ FIELD_TYPE .YEAR ]:
954+ return True
955+ return False
956+
957+ class MetadataFieldType (Enum ):
958+ SIGNEDNESS = 1 # Signedness of numeric columns
959+ DEFAULT_CHARSET = 2 # Charsets of character columns
960+ COLUMN_CHARSET = 3 # Charsets of character columns
961+ COLUMN_NAME = 4 # Names of columns
962+ SET_STR_VALUE = 5 # The string values of SET columns
963+ ENUM_STR_VALUE = 6 # The string values in ENUM columns
964+ GEOMETRY_TYPE = 7 # The real type of geometry columns
965+ SIMPLE_PRIMARY_KEY = 8 # The primary key without any prefix
966+ PRIMARY_KEY_WITH_PREFIX = 9 # The primary key with some prefix
967+ ENUM_AND_SET_DEFAULT_CHARSET = 10 # Charsets of ENUM and SET columns
968+ ENUM_AND_SET_COLUMN_CHARSET = 11 # Charsets of ENUM and SET columns
969+ VISIBILITY = 12
970+ UNKNOWN_METADATA_FIELD_TYPE = 128
971+
972+ @staticmethod
973+ def by_index (index ):
974+ return MetadataFieldType (index )
0 commit comments