Skip to content

Commit ea50d39

Browse files
authored
fix send to dls and allow produce more types (#137)
1 parent 6684375 commit ea50d39

File tree

1 file changed

+65
-50
lines changed

1 file changed

+65
-50
lines changed

memphis/producer.py

Lines changed: 65 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ async def validate_msg(self, message):
4141
elif schema_type == "graphql":
4242
message = self.validate_graphql(message)
4343
return message
44+
elif hasattr(message, "SerializeToString"):
45+
msgToSend = message.SerializeToString()
46+
return msgToSend
47+
elif isinstance(message, str):
48+
message = message.encode("utf-8")
49+
return message
50+
elif isinstance(message, graphql.language.ast.DocumentNode):
51+
msg = message
52+
message = str(msg.loc.source.body)
53+
message = message.encode("utf-8")
54+
return message
4455
elif not isinstance(message, bytearray) and not isinstance(message, dict):
4556
raise MemphisSchemaError("Unsupported message type")
4657
else:
@@ -181,63 +192,67 @@ async def produce(
181192
except Exception as e:
182193
if hasattr(e, "status_code") and e.status_code == "503":
183194
raise MemphisError(
184-
"Produce operation has failed, please check whether Station/Producer are still exist"
195+
"Produce operation has failed, please check whether Station/Producer still exist"
185196
)
186197
else:
187198
if "Schema validation has failed" in str(
188199
e
189200
) or "Unsupported message type" in str(e):
190-
msgToSend = ""
191-
if hasattr(message, "SerializeToString"):
192-
msgToSend = message.SerializeToString().decode("utf-8")
193-
elif isinstance(message, bytearray):
194-
msgToSend = str(message, "utf-8")
195-
else:
196-
msgToSend = str(message)
197-
if self.connection.station_schemaverse_to_dls[
198-
self.internal_station_name
199-
]:
200-
memphis_headers = {
201-
"$memphis_producedBy": self.producer_name,
202-
"$memphis_connectionId": self.connection.connection_id,
203-
}
204-
205-
if headers != {}:
206-
headers = headers.headers
207-
headers.update(memphis_headers)
201+
if self.connection.schema_updates_data[self.internal_station_name] != {}:
202+
msgToSend = ""
203+
if hasattr(message, "SerializeToString"):
204+
msgToSend = message.SerializeToString().decode("utf-8")
205+
elif isinstance(message, bytearray):
206+
msgToSend = str(message, "utf-8")
208207
else:
209-
headers = memphis_headers
208+
msgToSend = str(message)
209+
if self.connection.station_schemaverse_to_dls[
210+
self.internal_station_name
211+
]:
212+
unix_time = int(time.time())
213+
214+
memphis_headers = {
215+
"$memphis_producedBy": self.producer_name,
216+
"$memphis_connectionId": self.connection.connection_id,
217+
}
218+
219+
if headers != {}:
220+
headers = headers.headers
221+
headers.update(memphis_headers)
222+
else:
223+
headers = memphis_headers
210224

211-
msgToSendEncoded = msgToSend.encode("utf-8")
212-
msgHex = msgToSendEncoded.hex()
213-
buf = {
214-
"station_name": self.internal_station_name,
215-
"producer": {
216-
"name": self.producer_name,
217-
"connection_id": self.connection.connection_id,
218-
},
219-
"message": {
220-
"data": msgHex,
221-
"headers": headers,
222-
},
223-
"validation_error": str(e),
224-
}
225-
buf = json.dumps(buf).encode("utf-8")
226-
await self.connection.broker_manager.publish("$memphis_schemaverse_dls", buf)
227-
if self.connection.cluster_configurations.get(
228-
"send_notification"
229-
):
230-
await self.connection.send_notification(
231-
"Schema validation has failed",
232-
"Station: "
233-
+ self.station_name
234-
+ "\nProducer: "
235-
+ self.producer_name
236-
+ "\nError:"
237-
+ str(e),
238-
msgToSend,
239-
schemaVFailAlertType,
240-
)
225+
msgToSendEncoded = msgToSend.encode("utf-8")
226+
msgHex = msgToSendEncoded.hex()
227+
buf = {
228+
"station_name": self.internal_station_name,
229+
"producer": {
230+
"name": self.producer_name,
231+
"connection_id": self.connection.connection_id,
232+
},
233+
"creation_unix": unix_time,
234+
"message": {
235+
"data": msgHex,
236+
"headers": headers,
237+
},
238+
"validation_error": str(e),
239+
}
240+
buf = json.dumps(buf).encode("utf-8")
241+
await self.connection.broker_manager.publish("$memphis_schemaverse_dls", buf)
242+
if self.connection.cluster_configurations.get(
243+
"send_notification"
244+
):
245+
await self.connection.send_notification(
246+
"Schema validation has failed",
247+
"Station: "
248+
+ self.station_name
249+
+ "\nProducer: "
250+
+ self.producer_name
251+
+ "\nError:"
252+
+ str(e),
253+
msgToSend,
254+
schemaVFailAlertType,
255+
)
241256
raise MemphisError(str(e)) from e
242257

243258
async def destroy(self):

0 commit comments

Comments
 (0)