Skip to content

Commit 49d9a1b

Browse files
author
ffffwh
committed
kafka: refactor around datetime and timestamp
timestamp is always in UTC
1 parent 43b1f58 commit 49d9a1b

File tree

2 files changed

+33
-31
lines changed

2 files changed

+33
-31
lines changed

driver/kafka/kafka2.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ const (
5050
)
5151

5252
const (
53-
LAYOUT = "2006-01-02 15:04:05"
53+
MySQLDateTimeFormat = "2006-01-02 15:04:05"
54+
MySQLDateFormat = "2006-01-02"
55+
dbzTimestampFormat = "2006-01-02T15:04:05Z"
5456
)
5557

5658
var (
@@ -465,7 +467,7 @@ func NewDateTimeField(optional bool, field string, defaultValue interface{}, loc
465467
}
466468
}
467469
func DateTimeValue(dateTime string, loc *time.Location) int64 {
468-
tm2, err := time.ParseInLocation(LAYOUT, dateTime, loc)
470+
tm2, err := time.ParseInLocation(MySQLDateTimeFormat, dateTime, loc)
469471
if err != nil {
470472
return 0
471473
}
@@ -477,17 +479,16 @@ func DateTimeValue(dateTime string, loc *time.Location) int64 {
477479
return tm2.Unix()*1000 + ms
478480
}
479481
func DateValue(date string) int64 {
480-
tm2, err := time.Parse(LAYOUT, date+" 00:00:00")
482+
tm2, err := time.Parse(MySQLDateFormat, date)
481483
if err != nil {
482484
return 0
483485
}
484486
return tm2.Unix() / 60 / 60 / 24
485487
}
486-
func TimeStamp(timestamp string, loc *time.Location) string {
487-
tm2, _ := time.ParseInLocation(LAYOUT, timestamp, loc)
488-
value := tm2.In(time.UTC).Format(LAYOUT)
489-
timestamp = value[:10] + "T" + value[11:] + "Z"
490-
return timestamp
488+
func TimeStamp(timestamp string) string {
489+
// dtle always read timestamp in UTC
490+
tm2, _ := time.ParseInLocation(MySQLDateTimeFormat, timestamp, time.UTC)
491+
return tm2.In(time.UTC).Format(dbzTimestampFormat)
491492
}
492493

493494
func NewJsonField(optional bool, field string) *Schema {
@@ -556,13 +557,12 @@ func NewSetField(theType SchemaType, optional bool, field string, allowed string
556557
Version: 1,
557558
}
558559
}
559-
func NewTimeStampField(optional bool, field string, defaultValue interface{}, loc *time.Location) *Schema {
560+
func NewTimeStampField(optional bool, field string, defaultValue interface{}) *Schema {
560561
if defaultValue == "CURRENT_TIMESTAMP" {
561562
defaultValue = "1970-01-01T00:00:00Z"
562563
} else if defaultValue != nil {
563-
tm2, _ := time.ParseInLocation(LAYOUT, defaultValue.(string), loc)
564-
value := tm2.In(time.UTC).Format(LAYOUT)
565-
defaultValue = value[:10] + "T" + value[11:] + "Z"
564+
tm2, _ := time.ParseInLocation(MySQLDateTimeFormat, defaultValue.(string), time.UTC)
565+
defaultValue = tm2.In(time.UTC).Format(dbzTimestampFormat)
566566
}
567567
return &Schema{
568568
Field: field,

driver/kafka/kafka3.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -762,8 +762,9 @@ func (kr *KafkaRunner) kafkaTransformSnapshotData(
762762
value = TimeValue(valueStr)
763763
case mysqlconfig.TimestampColumnType:
764764
if valueStr != "" {
765-
value = TimeStamp(valueStr, kr.location)
765+
value = TimeStamp(valueStr)
766766
} else {
767+
// TODO what?
767768
value = TimeValue(valueStr)
768769
}
769770
case mysqlconfig.BinaryColumnType:
@@ -790,13 +791,14 @@ func (kr *KafkaRunner) kafkaTransformSnapshotData(
790791
valueStr = "char(255)"
791792
}
792793
value = base64.StdEncoding.EncodeToString([]byte(valueStr))
793-
case mysqlconfig.DateColumnType, mysqlconfig.DateTimeColumnType:
794-
if valueStr != "" && columnList[i].ColumnType == "datetime" {
794+
case mysqlconfig.DateTimeColumnType:
795+
if valueStr != "" {
795796
value = DateTimeValue(valueStr, kr.location)
796-
} else if valueStr != "" {
797+
}
798+
case mysqlconfig.DateColumnType:
799+
if valueStr != "" {
797800
value = DateValue(valueStr)
798801
}
799-
800802
case mysqlconfig.YearColumnType:
801803
if valueStr != "" {
802804
value = YearValue(valueStr)
@@ -1072,18 +1074,22 @@ func (kr *KafkaRunner) kafkaConvertArg(column *mysqlconfig.Column, theValue inte
10721074
theValue = int64(theValue.(uint64))
10731075
}
10741076
}
1075-
case mysqlconfig.TimeColumnType, mysqlconfig.TimestampColumnType:
1076-
if theValue != nil && column.ColumnType == "timestamp" {
1077-
theValue = TimeStamp(theValue.(string), kr.location)
1078-
} else if theValue != nil {
1077+
case mysqlconfig.TimeColumnType:
1078+
if theValue != nil {
10791079
theValue = TimeValue(theValue.(string))
10801080
}
1081-
case mysqlconfig.DateColumnType, mysqlconfig.DateTimeColumnType:
1082-
if theValue != nil && column.ColumnType == "datetime" {
1083-
theValue = DateTimeValue(theValue.(string), kr.location)
1084-
} else if theValue != nil {
1081+
case mysqlconfig.TimestampColumnType:
1082+
if theValue != nil {
1083+
theValue = TimeStamp(theValue.(string))
1084+
}
1085+
case mysqlconfig.DateColumnType:
1086+
if theValue != nil {
10851087
theValue = DateValue(theValue.(string))
10861088
}
1089+
case mysqlconfig.DateTimeColumnType:
1090+
if theValue != nil {
1091+
theValue = DateTimeValue(theValue.(string), kr.location)
1092+
}
10871093
case mysqlconfig.VarbinaryColumnType:
10881094
if theValue != nil {
10891095
theValue = encodeStringInterfaceToBase64String(theValue)
@@ -1294,19 +1300,15 @@ func kafkaColumnListToColDefs(colList *common.ColumnList, loc *time.Location) (v
12941300
case mysqlconfig.DecimalColumnType:
12951301
field = NewDecimalField(cols[i].Precision, cols[i].Scale, optional, fieldName, defaultValue)
12961302
case mysqlconfig.DateColumnType:
1297-
if cols[i].ColumnType == "datetime" {
1298-
field = NewDateTimeField(optional, fieldName, defaultValue, loc)
1299-
} else {
1300-
field = NewDateField(SCHEMA_TYPE_INT32, optional, fieldName, defaultValue)
1301-
}
1303+
field = NewDateField(SCHEMA_TYPE_INT32, optional, fieldName, defaultValue)
13021304
case mysqlconfig.YearColumnType:
13031305
field = NewYearField(SCHEMA_TYPE_INT32, optional, fieldName, defaultValue)
13041306
case mysqlconfig.DateTimeColumnType:
13051307
field = NewDateTimeField(optional, fieldName, defaultValue, loc)
13061308
case mysqlconfig.TimeColumnType:
13071309
field = NewTimeField(optional, fieldName, defaultValue)
13081310
case mysqlconfig.TimestampColumnType:
1309-
field = NewTimeStampField(optional, fieldName, defaultValue, loc)
1311+
field = NewTimeStampField(optional, fieldName, defaultValue)
13101312
case mysqlconfig.JSONColumnType:
13111313
field = NewJsonField(optional, fieldName)
13121314
default:

0 commit comments

Comments
 (0)