|
| 1 | +from pymysqlreplication.util import * |
| 2 | +from pymysqlreplication.tests.base import PyMySQLReplicationTestCase |
| 3 | + |
| 4 | +""" |
| 5 | +These tests aim to cover some potential input scenarios, including valid inputs, edge cases, and error handling. |
| 6 | +This approach ensures that every line of the corresponding function is executed under test conditions, |
| 7 | +thereby improving the coverage. |
| 8 | +""" |
| 9 | + |
| 10 | + |
| 11 | +class TestIsDataShort(PyMySQLReplicationTestCase): |
| 12 | + def test_data_is_shorter(self): |
| 13 | + # Test with data shorter than expected. |
| 14 | + data = bytearray([0x01]) # 1-byte data |
| 15 | + expected_length = 2 |
| 16 | + self.assertTrue(is_data_short(data, expected_length)) |
| 17 | + |
| 18 | + def test_data_is_equal_length(self): |
| 19 | + # Test with data equal to expected length. |
| 20 | + data = bytearray([0x01, 0x00]) # 2-byte data |
| 21 | + expected_length = 2 |
| 22 | + self.assertFalse(is_data_short(data, expected_length)) |
| 23 | + |
| 24 | + def test_data_is_longer(self): |
| 25 | + # Test with data longer than expected. |
| 26 | + data = bytearray([0x01, 0x00, 0x02]) # 3-byte data |
| 27 | + expected_length = 2 |
| 28 | + self.assertFalse(is_data_short(data, expected_length)) |
| 29 | + |
| 30 | + def test_data_is_empty(self): |
| 31 | + # Test with empty data. |
| 32 | + data = bytearray([]) |
| 33 | + expected_length = 1 |
| 34 | + self.assertTrue(is_data_short(data, expected_length)) |
| 35 | + |
| 36 | + |
| 37 | +class TestDecodeCount(PyMySQLReplicationTestCase): |
| 38 | + def test_small_format(self): |
| 39 | + # Test with 2-byte input and small format. |
| 40 | + data = bytearray([0x01, 0x00]) |
| 41 | + is_small = True |
| 42 | + result = decode_count(data, is_small) |
| 43 | + self.assertEqual(result, 1) |
| 44 | + |
| 45 | + def test_large_format(self): |
| 46 | + # Test with 4-byte input and large format. |
| 47 | + data = bytearray([0x01, 0x00, 0x00, 0x00]) |
| 48 | + is_small = False |
| 49 | + result = decode_count(data, is_small) |
| 50 | + self.assertEqual(result, 1) |
| 51 | + |
| 52 | + |
| 53 | +class TestDecodeUint(PyMySQLReplicationTestCase): |
| 54 | + def test_valid_input(self): |
| 55 | + # Test with a known 2-byte input. |
| 56 | + data = bytearray([0x01, 0x00]) |
| 57 | + result = decode_uint(data) |
| 58 | + self.assertEqual(result, 1) |
| 59 | + |
| 60 | + def test_short_data(self): |
| 61 | + # Test with a 1-byte input, which is less than expected. |
| 62 | + data = bytearray([0x01]) |
| 63 | + result = decode_uint(data) |
| 64 | + self.assertEqual(result, 0) |
| 65 | + |
| 66 | + def test_empty_data(self): |
| 67 | + data = bytearray([]) |
| 68 | + result = decode_uint(data) |
| 69 | + self.assertEqual(result, 0) |
| 70 | + |
| 71 | + |
| 72 | +class TestDecodeVariableLength(PyMySQLReplicationTestCase): |
| 73 | + def test_single_byte(self): |
| 74 | + # Test with a single byte where the high bit is not set (indicating the end) |
| 75 | + data = bytearray([0x05]) # 5 with the high bit not set |
| 76 | + length, pos = decode_variable_length(data) |
| 77 | + self.assertEqual(length, 5) |
| 78 | + self.assertEqual(pos, 1) |
| 79 | + |
| 80 | + def test_multiple_bytes(self): |
| 81 | + # Test with multiple bytes |
| 82 | + # 0x81 -> 1 with the high bit set, indicating more bytes |
| 83 | + # 0x01 -> 1 with the high bit not set, indicating the end |
| 84 | + # Combined value is 1 + (1 << 7) = 129 |
| 85 | + data = bytearray([0x81, 0x01]) |
| 86 | + length, pos = decode_variable_length(data) |
| 87 | + self.assertEqual(length, 129) |
| 88 | + self.assertEqual(pos, 2) |
| 89 | + |
| 90 | + def test_max_length(self): |
| 91 | + # Test with the maximum length (5 bytes) |
| 92 | + # This will test the boundary condition of the loop in the function |
| 93 | + data = bytearray( |
| 94 | + [0x80, 0x80, 0x80, 0x80, 0x01] |
| 95 | + ) # Each 0x80 has the high bit set, 0x01 does not |
| 96 | + # The value is 1 << (7 * 4) = 2**28 |
| 97 | + length, pos = decode_variable_length(data) |
| 98 | + self.assertEqual(length, 2**28) |
| 99 | + self.assertEqual(pos, 5) |
| 100 | + |
| 101 | + |
| 102 | +class TestParseUint16(PyMySQLReplicationTestCase): |
| 103 | + def test_valid_input(self): |
| 104 | + data = bytearray([0x01, 0x00]) |
| 105 | + result = parse_uint16(data) |
| 106 | + self.assertEqual(result, 1) |
| 107 | + |
| 108 | + def test_different_input(self): |
| 109 | + data = bytearray([0xFF, 0x00]) # Represents the unsigned integer 255 |
| 110 | + result = parse_uint16(data) |
| 111 | + self.assertEqual(result, 255) |
| 112 | + |
| 113 | + |
| 114 | +class TestLengthEncodedInt(PyMySQLReplicationTestCase): |
| 115 | + def test_single_byte(self): |
| 116 | + data = bytearray([0x05]) |
| 117 | + result, _, _ = length_encoded_int(data) |
| 118 | + self.assertEqual(result, 5) |
| 119 | + |
| 120 | + def test_two_bytes(self): |
| 121 | + data = bytearray([0xFC, 0x12, 0x34]) |
| 122 | + result, _, _ = length_encoded_int(data) |
| 123 | + self.assertEqual(result, 0x3412) |
| 124 | + |
| 125 | + def test_three_bytes(self): |
| 126 | + data = bytearray([0xFD, 0x01, 0x02, 0x03]) |
| 127 | + result, _, _ = length_encoded_int(data) |
| 128 | + self.assertEqual(result, 0x030201) |
| 129 | + |
| 130 | + |
| 131 | +class TestDecodeTime(PyMySQLReplicationTestCase): |
| 132 | + def test_midnight(self): |
| 133 | + data = bytearray([0x00] * 8) # Represents 00:00:00 |
| 134 | + result = decode_time(data) |
| 135 | + self.assertEqual(result, datetime.time(0, 0, 0)) |
| 136 | + |
| 137 | + def test_valid_time(self): |
| 138 | + data = bytearray( |
| 139 | + [0x00, 0x00, 0x00, 0xC0, 0x18, 0x01, 0x00, 0x00] |
| 140 | + ) # Represents 17:35:00 |
| 141 | + result = decode_time(data) |
| 142 | + self.assertEqual(result, datetime.time(17, 35, 0)) |
| 143 | + |
| 144 | + |
| 145 | +class TestDecodeDatetime(PyMySQLReplicationTestCase): |
| 146 | + def test_zero_datetime(self): |
| 147 | + data = bytearray([0x00] * 8) # Represents 0000-00-00 00:00:00 |
| 148 | + result = decode_datetime(data) |
| 149 | + self.assertEqual(result, "0000-00-00 00:00:00") |
| 150 | + |
| 151 | + def test_valid_datetime(self): |
| 152 | + data = bytearray( |
| 153 | + [0x00, 0x00, 0x00, 0xC0, 0x18, 0x9B, 0xB1, 0x19] |
| 154 | + ) # Represents 2023-11-13 17:35:00 |
| 155 | + expected_datetime = datetime.datetime( |
| 156 | + year=2023, month=11, day=13, hour=17, minute=35, second=0 |
| 157 | + ) |
| 158 | + result = decode_datetime(data) |
| 159 | + self.assertEqual(result, expected_datetime) |
0 commit comments