Skip to content

Commit b89ba18

Browse files
put some backoff in loadgenerator when errors are created for demo
1 parent ec49bcc commit b89ba18

File tree

1 file changed

+127
-5
lines changed

1 file changed

+127
-5
lines changed

src/shop-dc-shim/load-generator/shop_load_generator.py

Lines changed: 127 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@
1414
1515
The generator creates realistic purchase patterns that would originate
1616
from datacenter-deployed point-of-sale systems calling cloud checkout.
17+
18+
ERROR HANDLING & BACKOFF LOGIC:
19+
- Purchase requests are NEVER affected by error handling
20+
- Transaction status polling implements circuit breaker pattern
21+
- 500 errors on status checks trigger exponential backoff (1.5x multiplier)
22+
- After 10 consecutive status check failures, circuit breaker opens for 10 minutes
23+
- Purchase flow continues normally even when status checks are failing
24+
- This prevents overwhelming the service with failed status check requests
25+
- Designed to keep demo errors visible while preventing system overload
1726
"""
1827

1928
import json
@@ -64,6 +73,16 @@ def __init__(self, shop_service_url: str = "http://localhost:8070"):
6473
self.shop_service_url = shop_service_url
6574
self.session = self._create_session()
6675

76+
# Status check error tracking and backoff logic
77+
self.status_check_failures = 0
78+
self.consecutive_failures = 0
79+
self.last_failure_time = 0
80+
self.status_check_enabled = True
81+
self.backoff_delay = 1 # Initial backoff delay in seconds
82+
self.max_backoff_delay = 300 # Maximum backoff delay (5 minutes)
83+
self.circuit_breaker_threshold = 10 # Number of consecutive failures before backing off more
84+
self.circuit_breaker_reset_time = 600 # Time to wait before resetting circuit breaker (10 minutes)
85+
6786
# Store locations (simulating datacenter-deployed stores)
6887
self.stores = [
6988
StoreLocation("DC-NYC-01", "Manhattan Flagship", "New York", "NY",
@@ -207,23 +226,80 @@ def submit_purchase(self, purchase_request: Dict[str, Any]) -> Dict[str, Any]:
207226
return {"success": False, "error": str(e)}
208227

209228
def check_transaction_status(self, transaction_id: str) -> Dict[str, Any]:
    """Check the status of a transaction with backoff logic for failed checks.

    Failure state lives on the instance. While ``status_check_enabled`` is
    False (circuit breaker open) no HTTP request is made; the breaker is
    re-closed once more than ``circuit_breaker_reset_time`` seconds have
    passed since ``last_failure_time``. While the breaker is closed and
    ``backoff_delay`` is above 1, the call sleeps for that many seconds
    before issuing the request.

    Args:
        transaction_id: Identifier appended to the
            ``/api/shop/transaction/`` status endpoint path.

    Returns:
        ``{"success": True, "data": <decoded JSON body>}`` on HTTP 200 with
        a decodable body. Otherwise ``{"success": False, "error": <text>}``,
        with ``"circuit_breaker_open": True`` added when the request was
        skipped because the breaker is open, or ``"server_error": True``
        added on HTTP 500. HTTP 404 is reported as a failure to the caller
        but does not touch the failure counters.
    """
    # Re-close the breaker once the cool-down since the last failure has elapsed.
    current_time = time.time()
    if not self.status_check_enabled and (current_time - self.last_failure_time) > self.circuit_breaker_reset_time:
        logger.info("Resetting status check circuit breaker - attempting to resume normal operation")
        self.status_check_enabled = True
        self.consecutive_failures = 0
        self.backoff_delay = 1

    # Breaker still open: answer immediately without touching the service.
    if not self.status_check_enabled:
        time_until_reset = self.circuit_breaker_reset_time - (current_time - self.last_failure_time)
        return {
            "success": False,
            "error": f"Circuit breaker open - status checks disabled for {time_until_reset:.0f} more seconds",
            "circuit_breaker_open": True
        }

    # Apply the current backoff delay before making the request.
    # NOTE(review): this sleep blocks the calling thread for every call made
    # while backoff is active - confirm callers that poll several
    # transactions in a row can tolerate the accumulated delay.
    if self.backoff_delay > 1:
        logger.debug(f"Applying backoff delay of {self.backoff_delay:.1f}s before status check")
        time.sleep(self.backoff_delay)

    try:
        response = self.session.get(
            f"{self.shop_service_url}/api/shop/transaction/{transaction_id}",
            timeout=10
        )

        if response.status_code == 200:
            # Decode before declaring success. Depending on the installed
            # requests version a bad body raises either a RequestException
            # subclass or a plain ValueError; the latter would otherwise
            # escape this method and abort the caller's polling loop.
            try:
                data = response.json()
            except ValueError as e:
                self._handle_status_check_failure(f"Invalid JSON in status response: {str(e)}")
                return {"success": False, "error": str(e)}

            # Success - reset failure counters
            self.consecutive_failures = 0
            self.backoff_delay = 1
            return {"success": True, "data": data}
        elif response.status_code == 500:
            # Handle 500 errors with backoff logic (but still allow them for demo)
            self._handle_status_check_failure("500 Internal Server Error - Jackson serialization issue (demo)")
            return {"success": False, "error": f"HTTP {response.status_code} (demo error)", "server_error": True}
        elif response.status_code == 404:
            # 404 is expected for new transactions, don't count as failure
            return {"success": False, "error": f"HTTP {response.status_code} - Transaction not found"}
        else:
            # Any other status code counts towards the circuit breaker.
            self._handle_status_check_failure(f"HTTP {response.status_code}")
            return {"success": False, "error": f"HTTP {response.status_code}"}

    except requests.exceptions.RequestException as e:
        # Transport-level problems (timeouts, connection errors, ...) also count.
        self._handle_status_check_failure(f"Request exception: {str(e)}")
        return {"success": False, "error": str(e)}
224278

279+
def _handle_status_check_failure(self, error_msg: str):
    """Record one failed status check and update backoff / breaker state.

    Bumps both the lifetime and the consecutive failure counters, stamps
    ``last_failure_time`` with the current time, and multiplies
    ``backoff_delay`` by 1.5 (capped at ``max_backoff_delay``). Once the
    consecutive count reaches ``circuit_breaker_threshold`` the breaker is
    opened by clearing ``status_check_enabled``.

    Args:
        error_msg: Human-readable description of the failure, used only
            in the log output.
    """
    self.last_failure_time = time.time()
    self.status_check_failures += 1
    self.consecutive_failures += 1

    # Grow the delay geometrically, but never beyond the configured ceiling.
    grown_delay = self.backoff_delay * 1.5
    if grown_delay <= self.max_backoff_delay:
        self.backoff_delay = grown_delay
    else:
        self.backoff_delay = self.max_backoff_delay

    # Below the threshold: just note the failure and the upcoming delay.
    if self.consecutive_failures < self.circuit_breaker_threshold:
        logger.debug(f"Status check failure #{self.consecutive_failures}: {error_msg}. "
                     f"Next request will have {self.backoff_delay:.1f}s backoff delay.")
        return

    # Threshold reached: open the breaker so further checks are skipped.
    self.status_check_enabled = False
    logger.warning(f"Status check circuit breaker opened after {self.consecutive_failures} consecutive failures. "
                   f"Last error: {error_msg}. Status checks disabled for {self.circuit_breaker_reset_time} seconds.")
295+
225296
def run_single_transaction(self) -> Dict[str, Any]:
226-
"""Execute a single transaction (for use with thread pool)"""
297+
"""Execute a single transaction (for use with thread pool)
298+
299+
NOTE: Purchase requests are NEVER affected by status check backoff logic.
300+
Only transaction status polling is subject to circuit breaker protection.
301+
This ensures purchases continue to flow even when status checks are failing.
302+
"""
227303
store = random.choice(self.stores)
228304
terminal = random.choice(store.terminals)
229305

@@ -288,12 +364,20 @@ def run_continuous_load(self, transactions_per_minute: int = 10, duration_minute
288364
if transaction_count % transactions_per_minute == 0:
289365
elapsed_minutes = (time.time() - start_time) / 60
290366
success_rate = (success_count / transaction_count) * 100 if transaction_count > 0 else 0
367+
stats = self.get_status_check_stats()
368+
369+
status_info = ""
370+
if not stats["circuit_breaker_enabled"]:
371+
status_info = f" | Status checks: CIRCUIT BREAKER OPEN ({stats['consecutive_failures']} failures)"
372+
elif stats["total_failures"] > 0:
373+
status_info = f" | Status checks: {stats['total_failures']} total failures, {stats['backoff_delay']:.1f}s backoff"
374+
291375
if run_forever:
292376
logger.info(f"Progress: {transaction_count} transactions in {elapsed_minutes:.1f} minutes "
293-
f"(Success rate: {success_rate:.1f}%) - Running indefinitely")
377+
f"(Success rate: {success_rate:.1f}%) - Running indefinitely{status_info}")
294378
else:
295379
logger.info(f"Progress: {transaction_count} transactions in {elapsed_minutes:.1f} minutes "
296-
f"(Success rate: {success_rate:.1f}%)")
380+
f"(Success rate: {success_rate:.1f}%){status_info}")
297381

298382
# Sleep to maintain target rate
299383
elapsed = time.time() - loop_start
@@ -320,17 +404,27 @@ def run_continuous_load(self, transactions_per_minute: int = 10, duration_minute
320404
return
321405

322406
final_success_rate = (success_count / transaction_count) * 100
407+
stats = self.get_status_check_stats()
323408

324409
logger.info(f"Load generation completed:")
325410
logger.info(f" Total transactions: {transaction_count}")
326411
logger.info(f" Successful submissions: {success_count}")
327412
logger.info(f" Success rate: {final_success_rate:.1f}%")
328413
logger.info(f" Total time: {total_time:.1f} seconds")
329414
logger.info(f" Average TPM: {(transaction_count / (total_time / 60)):.1f}")
415+
logger.info(f"Status check circuit breaker stats:")
416+
logger.info(f" Total status check failures: {stats['total_failures']}")
417+
logger.info(f" Circuit breaker status: {'OPEN' if not stats['circuit_breaker_enabled'] else 'CLOSED'}")
418+
logger.info(f" Final backoff delay: {stats['backoff_delay']:.1f}s")
419+
420+
if stats["total_failures"] > 0:
421+
logger.info("NOTE: Status check failures are expected for this demo (Jackson LocalDateTime serialization errors)")
422+
logger.info("Purchase requests were never affected by status check failures - they continued successfully!")
330423

331424
def _check_pending_transactions(self, pending_transactions: List[Dict[str, Any]]):
332-
"""Check status of pending transactions"""
425+
"""Check status of pending transactions with backoff logic"""
333426
completed_indices = []
427+
status_check_skipped = 0
334428

335429
for i, txn in enumerate(pending_transactions):
336430
# Only check transactions that are at least 30 seconds old
@@ -339,6 +433,11 @@ def _check_pending_transactions(self, pending_transactions: List[Dict[str, Any]]
339433

340434
status_result = self.check_transaction_status(txn["transaction_id"])
341435

436+
# Handle circuit breaker case
437+
if status_result.get("circuit_breaker_open"):
438+
status_check_skipped += 1
439+
continue
440+
342441
if status_result["success"]:
343442
data = status_result["data"]
344443
status = data.get("status")
@@ -347,10 +446,33 @@ def _check_pending_transactions(self, pending_transactions: List[Dict[str, Any]]
347446
if status in ["COMPLETED", "FAILED"]:
348447
logger.info(f"Transaction {txn['transaction_id'][:8]}... from {store} -> {status}")
349448
completed_indices.append(i)
449+
elif status_result.get("server_error"):
450+
# 500 errors are expected for demo - log but continue tracking
451+
logger.debug(f"Transaction {txn['transaction_id'][:8]}... status check failed with expected demo error: {status_result['error']}")
452+
else:
453+
# Other errors (404, network issues, etc.)
454+
logger.debug(f"Transaction {txn['transaction_id'][:8]}... status check failed: {status_result['error']}")
350455

351456
# Remove completed transactions from tracking
352457
for i in reversed(completed_indices):
353458
pending_transactions.pop(i)
459+
460+
# Log circuit breaker status if any checks were skipped
461+
if status_check_skipped > 0:
462+
logger.info(f"Skipped {status_check_skipped} status checks due to circuit breaker protection")
463+
logger.info(f"Status check circuit breaker stats: {self.consecutive_failures} consecutive failures, "
464+
f"next backoff delay: {self.backoff_delay:.1f}s")
465+
466+
def get_status_check_stats(self) -> Dict[str, Any]:
    """Return a snapshot of the status-check failure tracking state.

    Returns:
        A dict with the lifetime and consecutive failure counts, the
        current backoff delay, the raw ``last_failure_time`` and the
        seconds elapsed since it (0 when no failure has been recorded).
        ``circuit_breaker_enabled`` carries ``status_check_enabled``, so it
        is True while status checks are allowed (breaker closed).
    """
    last_failure = self.last_failure_time
    if last_failure > 0:
        seconds_since_failure = time.time() - last_failure
    else:
        # No failure recorded yet - report zero rather than time since epoch.
        seconds_since_failure = 0

    snapshot: Dict[str, Any] = {}
    snapshot["total_failures"] = self.status_check_failures
    snapshot["consecutive_failures"] = self.consecutive_failures
    snapshot["backoff_delay"] = self.backoff_delay
    snapshot["circuit_breaker_enabled"] = self.status_check_enabled
    snapshot["last_failure_time"] = last_failure
    snapshot["time_since_last_failure"] = seconds_since_failure
    return snapshot
354476

355477
def run_burst_load(self, concurrent_transactions: int = 20, total_transactions: int = 100):
356478
"""Run burst load with concurrent transactions"""

0 commit comments

Comments
 (0)