Skip to content

Commit 25f7a06

Browse files
authored
chore: include message offset into logs (#157)
chore: include message offset into logs
1 parent 4dbf40c commit 25f7a06

File tree

3 files changed

+6
-3
lines changed

3 files changed

+6
-3
lines changed

src/KafkaFlow.Retry/Durable/RetryDurableMiddleware.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
8585
WaitMilliseconds = waitTime.TotalMilliseconds,
8686
PartitionNumber = context.ConsumerContext.Partition,
8787
Worker = context.ConsumerContext.WorkerId,
88+
Offset = context.ConsumerContext.Offset,
8889
ExceptionType = exception.GetType().FullName,
8990
ExceptionMessage = exception.Message
9091
});
@@ -151,4 +152,4 @@ await policy
151152
}
152153
}
153154
}
154-
}
155+
}

src/KafkaFlow.Retry/Forever/RetryForeverMiddleware.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
5858
WaitMilliseconds = waitTime.TotalMilliseconds,
5959
PartitionNumber = context.ConsumerContext.Partition,
6060
Worker = context.ConsumerContext.WorkerId,
61+
Offset = context.ConsumerContext.Offset,
6162
//Headers = context.HeadersAsJson(),
6263
//Message = context.Message.ToJson(),
6364
ExceptionType = exception.GetType().FullName
@@ -106,4 +107,4 @@ await policy
106107
}
107108
}
108109
}
109-
}
110+
}

src/KafkaFlow.Retry/Simple/RetrySimpleMiddleware.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
5959
WaitMilliseconds = waitTime.TotalMilliseconds,
6060
PartitionNumber = context.ConsumerContext.Partition,
6161
Worker = context.ConsumerContext.WorkerId,
62+
Offset = context.ConsumerContext.Offset,
6263
//Headers = context.HeadersAsJson(),
6364
//Message = context.Message.ToJson(),
6465
ExceptionType = exception.GetType().FullName
@@ -108,4 +109,4 @@ await policy
108109
}
109110
}
110111
}
111-
}
112+
}

0 commit comments

Comments
 (0)