Skip to content

Commit 31c27f4

Browse files
author
ffffwh
committed
handle timestamp default value for kafka #537
1 parent 7a98826 commit 31c27f4

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

driver/mysql/base/utils.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -543,11 +543,10 @@ func GetTableColumnsSqle(sqleContext *sqle.Context, schema string,
543543
newColumn.Nullable = false
544544
case ast.ColumnOptionAutoIncrement:
545545
case ast.ColumnOptionDefaultValue:
546-
value, ok := colOpt.Expr.(ast.ValueExpr)
547-
if !ok {
548-
newColumn.Default = nil
549-
} else {
550-
switch v := value.GetValue().(type) {
546+
expectNil := false
547+
switch exp := colOpt.Expr.(type) {
548+
case ast.ValueExpr:
549+
switch v := exp.GetValue().(type) {
551550
case int64, uint64, float32, float64, string, []byte:
552551
newColumn.Default = v
553552
case types.BinaryLiteral:
@@ -556,11 +555,20 @@ func GetTableColumnsSqle(sqleContext *sqle.Context, schema string,
556555
newColumn.Default = v.String()
557556
case nil:
558557
newColumn.Default = nil
559-
default:
560-
newColumn.Default = nil
561-
g.Logger.Warn("GetTableColumnsSqle: unknown type for a default value",
562-
"schema", schema, "table", table, "type", hclog.Fmt("%T", v))
558+
expectNil = true
563559
}
560+
case *ast.FuncCallExpr:
561+
if col.Tp.Tp == parsermysql.TypeTimestamp {
562+
if exp.FnName.L == "current_timestamp" {
563+
// See # 537. "1970-01-01 00:00:00" is debezium's representation for CURRENT_TIMESTAMP.
564+
// TODO It would be better to put it in dest/kafka.
565+
newColumn.Default = "1970-01-01 00:00:00"
566+
}
567+
}
568+
}
569+
if newColumn.Default == nil && !expectNil {
570+
g.Logger.Warn("GetTableColumnsSqle: cannot handle a default value",
571+
"schema", schema, "table", table, "column", col.Name, "type", hclog.Fmt("%T", colOpt.Expr))
564572
}
565573
case ast.ColumnOptionUniqKey:
566574
newColumn.Key = "UNI"

driver/mysql/base/utils_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ func TestGetTableColumnsSqle(t *testing.T) {
255255
sqls := []string{
256256
"create table a.text_columns(id int(11) not null primary key,c_text longtext)",
257257
"create table a.binary_columns(id int(11) not null primary key, c_binary varbinary(255))",
258+
"create table a.timestamp1(id int(11) primary key, val timestamp default current_timestamp)",
258259
}
259260
for i := range sqls {
260261
stmt, err := p.ParseOneStmt(sqls[i], "", "")
@@ -293,6 +294,15 @@ func TestGetTableColumnsSqle(t *testing.T) {
293294
},
294295
want: nil,
295296
wantErr: false,
297+
}, {
298+
name: "timestamp",
299+
args: args{
300+
sqleContext: sqleCtx,
301+
schema: "a",
302+
table: "timestamp1",
303+
},
304+
want: nil,
305+
wantErr: false,
296306
},
297307
}
298308
for _, tt := range tests {

0 commit comments

Comments
 (0)