Skip to content

Commit ab2d207

Browse files
committed
Add extar fraud detection queries
1 parent b7220ce commit ab2d207

File tree

4 files changed

+391
-3
lines changed

4 files changed

+391
-3
lines changed

kubernetes/opentelemetry-demo.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1429,7 +1429,7 @@ spec:
14291429
containers:
14301430
- name: fraud-detection
14311431
#image: 'ghcr.io/open-telemetry/demo:2.1.3-fraud-detection'
1432-
image: ghcr.io/splunk/opentelemetry-demo/otel-fraud-detection:2.1.3-sql.5
1432+
image: ghcr.io/splunk/opentelemetry-demo/otel-fraud-detection:2.1.3.1
14331433
imagePullPolicy: IfNotPresent
14341434
env:
14351435
- name: OTEL_SERVICE_NAME
@@ -1469,6 +1469,8 @@ spec:
14691469
value: sa
14701470
- name: SQL_SERVER_PASSWORD
14711471
value: "ChangeMe_SuperStrong123!"
1472+
- name: FRAUD_DETECTION_RATE
1473+
value: "80"
14721474
- name: CLEANUP_RETENTION_DAYS
14731475
value: "7"
14741476
- name: CLEANUP_INTERVAL_HOURS
Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package frauddetection
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.apache.logging.log4j.Logger
10+
import kotlin.random.Random
11+
12+
/**
13+
* Executes SQL-based fraud detection queries with varying latency.
14+
* These queries simulate automatic fraud detection analysis that runs
15+
* after an order is logged, demonstrating realistic database monitoring patterns.
16+
*/
17+
class FraudDetectionQueries {
18+
private val logger: Logger = LogManager.getLogger(FraudDetectionQueries::class.java)
19+
20+
/**
21+
* Run fraud detection analysis on a newly inserted order.
22+
* Randomly executes 1-3 fraud detection queries with latency variance.
23+
* @param orderId The order ID to analyze
24+
* @return true if any fraud indicators were found
25+
*/
26+
fun analyzeOrder(orderId: String): Boolean {
27+
val numChecks = Random.nextInt(1, 4) // Run 1-3 checks randomly
28+
var fraudDetected = false
29+
30+
try {
31+
val checksToRun = (0..5).shuffled().take(numChecks)
32+
33+
checksToRun.forEach { checkType ->
34+
val result = when (checkType) {
35+
0 -> checkHighValueOrder(orderId)
36+
1 -> checkDuplicateShippingAddress(orderId)
37+
2 -> checkRapidOrderVelocity(orderId)
38+
3 -> checkSuspiciousCountryPattern(orderId)
39+
4 -> checkAnomalousItemCount(orderId)
40+
5 -> checkHistoricalFraudPatterns(orderId)
41+
else -> false
42+
}
43+
if (result) fraudDetected = true
44+
}
45+
} catch (e: Exception) {
46+
logger.error("Error during fraud detection for order $orderId", e)
47+
}
48+
49+
return fraudDetected
50+
}
51+
52+
/**
53+
* Check 1: High-value order detection with historical comparison
54+
* Latency: 50-200ms (medium complexity query with aggregation)
55+
*/
56+
private fun checkHighValueOrder(orderId: String): Boolean {
57+
DatabaseConfig.getConnection().use { conn ->
58+
// Simulate variable latency
59+
Thread.sleep(Random.nextLong(50, 200))
60+
61+
val sql = """
62+
WITH OrderValue AS (
63+
SELECT
64+
order_id,
65+
shipping_cost_units,
66+
items_count,
67+
(shipping_cost_units + (items_count * 50)) as estimated_value
68+
FROM OrderLogs
69+
WHERE order_id = ?
70+
),
71+
AvgValue AS (
72+
SELECT AVG(shipping_cost_units + (items_count * 50)) as avg_order_value
73+
FROM OrderLogs
74+
WHERE consumed_at >= DATEADD(HOUR, -24, GETDATE())
75+
)
76+
SELECT
77+
CASE
78+
WHEN ov.estimated_value > (av.avg_order_value * 3) THEN 1
79+
ELSE 0
80+
END as is_high_value
81+
FROM OrderValue ov, AvgValue av
82+
""".trimIndent()
83+
84+
conn.prepareStatement(sql).use { stmt ->
85+
stmt.setString(1, orderId)
86+
stmt.executeQuery().use { rs ->
87+
if (rs.next() && rs.getInt("is_high_value") == 1) {
88+
logger.warn("🔍 FRAUD CHECK: High-value order detected for $orderId (>3x avg)")
89+
return true
90+
}
91+
}
92+
}
93+
}
94+
return false
95+
}
96+
97+
/**
98+
* Check 2: Duplicate shipping address with recent orders
99+
* Latency: 100-300ms (complex string matching and temporal query)
100+
*/
101+
private fun checkDuplicateShippingAddress(orderId: String): Boolean {
102+
DatabaseConfig.getConnection().use { conn ->
103+
// Simulate variable latency
104+
Thread.sleep(Random.nextLong(100, 300))
105+
106+
val sql = """
107+
WITH CurrentOrder AS (
108+
SELECT shipping_street, shipping_city, shipping_zip
109+
FROM OrderLogs
110+
WHERE order_id = ?
111+
)
112+
SELECT COUNT(DISTINCT ol.order_id) as duplicate_count
113+
FROM OrderLogs ol, CurrentOrder co
114+
WHERE ol.shipping_street = co.shipping_street
115+
AND ol.shipping_city = co.shipping_city
116+
AND ol.shipping_zip = co.shipping_zip
117+
AND ol.order_id != ?
118+
AND ol.consumed_at >= DATEADD(HOUR, -1, GETDATE())
119+
""".trimIndent()
120+
121+
conn.prepareStatement(sql).use { stmt ->
122+
stmt.setString(1, orderId)
123+
stmt.setString(2, orderId)
124+
stmt.executeQuery().use { rs ->
125+
if (rs.next()) {
126+
val dupes = rs.getInt("duplicate_count")
127+
if (dupes >= 3) {
128+
logger.warn("🔍 FRAUD CHECK: Duplicate shipping address for $orderId ($dupes recent orders)")
129+
insertFraudAlert(orderId, "DUPLICATE_ADDRESS", "MEDIUM", dupes * 0.15)
130+
return true
131+
}
132+
}
133+
}
134+
}
135+
}
136+
return false
137+
}
138+
139+
/**
140+
* Check 3: Rapid order velocity from same location
141+
* Latency: 80-250ms (temporal aggregation with grouping)
142+
*/
143+
private fun checkRapidOrderVelocity(orderId: String): Boolean {
144+
DatabaseConfig.getConnection().use { conn ->
145+
// Simulate variable latency
146+
Thread.sleep(Random.nextLong(80, 250))
147+
148+
val sql = """
149+
WITH CurrentOrder AS (
150+
SELECT shipping_city, shipping_state, shipping_country, consumed_at
151+
FROM OrderLogs
152+
WHERE order_id = ?
153+
)
154+
SELECT
155+
COUNT(*) as order_count,
156+
COUNT(DISTINCT order_id) as unique_orders,
157+
DATEDIFF(MINUTE, MIN(ol.consumed_at), MAX(ol.consumed_at)) as time_span_minutes
158+
FROM OrderLogs ol
159+
INNER JOIN CurrentOrder co ON
160+
ol.shipping_city = co.shipping_city AND
161+
ol.shipping_state = co.shipping_state AND
162+
ol.shipping_country = co.shipping_country
163+
WHERE ol.consumed_at >= DATEADD(MINUTE, -15, GETDATE())
164+
HAVING COUNT(*) >= 5
165+
""".trimIndent()
166+
167+
conn.prepareStatement(sql).use { stmt ->
168+
stmt.setString(1, orderId)
169+
stmt.executeQuery().use { rs ->
170+
if (rs.next()) {
171+
val orderCount = rs.getInt("order_count")
172+
val timeSpan = rs.getInt("time_span_minutes")
173+
if (orderCount >= 5) {
174+
val riskScore = (orderCount / 5.0) * 0.25
175+
logger.warn("🔍 FRAUD CHECK: Rapid order velocity for $orderId ($orderCount orders in $timeSpan mins)")
176+
insertFraudAlert(orderId, "RAPID_VELOCITY", "HIGH", riskScore)
177+
return true
178+
}
179+
}
180+
}
181+
}
182+
}
183+
return false
184+
}
185+
186+
/**
187+
* Check 4: Suspicious country/region pattern analysis
188+
* Latency: 120-350ms (complex geo-pattern with historical joins)
189+
*/
190+
private fun checkSuspiciousCountryPattern(orderId: String): Boolean {
191+
DatabaseConfig.getConnection().use { conn ->
192+
// Simulate variable latency
193+
Thread.sleep(Random.nextLong(120, 350))
194+
195+
val sql = """
196+
WITH OrderCountry AS (
197+
SELECT shipping_country, shipping_state
198+
FROM OrderLogs
199+
WHERE order_id = ?
200+
),
201+
CountryStats AS (
202+
SELECT
203+
shipping_country,
204+
COUNT(*) as total_orders,
205+
AVG(CAST(shipping_cost_units AS FLOAT)) as avg_shipping_cost,
206+
COUNT(DISTINCT shipping_city) as unique_cities
207+
FROM OrderLogs
208+
WHERE consumed_at >= DATEADD(DAY, -7, GETDATE())
209+
GROUP BY shipping_country
210+
)
211+
SELECT
212+
cs.total_orders,
213+
cs.avg_shipping_cost,
214+
cs.unique_cities,
215+
CASE
216+
WHEN cs.total_orders < 5 THEN 1
217+
WHEN cs.avg_shipping_cost > 100 THEN 1
218+
ELSE 0
219+
END as is_suspicious
220+
FROM OrderCountry oc
221+
LEFT JOIN CountryStats cs ON oc.shipping_country = cs.shipping_country
222+
""".trimIndent()
223+
224+
conn.prepareStatement(sql).use { stmt ->
225+
stmt.setString(1, orderId)
226+
stmt.executeQuery().use { rs ->
227+
if (rs.next() && rs.getInt("is_suspicious") == 1) {
228+
val totalOrders = rs.getInt("total_orders")
229+
logger.warn("🔍 FRAUD CHECK: Suspicious country pattern for $orderId (rare country: $totalOrders orders)")
230+
insertFraudAlert(orderId, "SUSPICIOUS_LOCATION", "MEDIUM", 0.35)
231+
return true
232+
}
233+
}
234+
}
235+
}
236+
return false
237+
}
238+
239+
/**
240+
* Check 5: Anomalous item count with statistical analysis
241+
* Latency: 60-180ms (statistical aggregation query)
242+
*/
243+
private fun checkAnomalousItemCount(orderId: String): Boolean {
244+
DatabaseConfig.getConnection().use { conn ->
245+
// Simulate variable latency
246+
Thread.sleep(Random.nextLong(60, 180))
247+
248+
val sql = """
249+
WITH CurrentOrder AS (
250+
SELECT items_count
251+
FROM OrderLogs
252+
WHERE order_id = ?
253+
),
254+
ItemStats AS (
255+
SELECT
256+
AVG(CAST(items_count AS FLOAT)) as avg_items,
257+
STDEV(CAST(items_count AS FLOAT)) as stddev_items
258+
FROM OrderLogs
259+
WHERE consumed_at >= DATEADD(DAY, -1, GETDATE())
260+
)
261+
SELECT
262+
co.items_count,
263+
is_stat.avg_items,
264+
is_stat.stddev_items,
265+
CASE
266+
WHEN co.items_count > (is_stat.avg_items + (2 * is_stat.stddev_items)) THEN 1
267+
ELSE 0
268+
END as is_anomalous
269+
FROM CurrentOrder co, ItemStats is_stat
270+
""".trimIndent()
271+
272+
conn.prepareStatement(sql).use { stmt ->
273+
stmt.setString(1, orderId)
274+
stmt.executeQuery().use { rs ->
275+
if (rs.next() && rs.getInt("is_anomalous") == 1) {
276+
val itemCount = rs.getInt("items_count")
277+
logger.warn("🔍 FRAUD CHECK: Anomalous item count for $orderId (count: $itemCount, >2σ from mean)")
278+
insertFraudAlert(orderId, "ANOMALOUS_ITEMS", "LOW", 0.20)
279+
return true
280+
}
281+
}
282+
}
283+
}
284+
return false
285+
}
286+
287+
/**
288+
* Check 6: Historical fraud pattern matching with correlated subqueries
289+
* Latency: 150-400ms (expensive multi-table joins and correlation)
290+
*/
291+
private fun checkHistoricalFraudPatterns(orderId: String): Boolean {
292+
DatabaseConfig.getConnection().use { conn ->
293+
// Simulate variable latency
294+
Thread.sleep(Random.nextLong(150, 400))
295+
296+
val sql = """
297+
WITH CurrentOrder AS (
298+
SELECT shipping_street, shipping_city, shipping_country, items_count
299+
FROM OrderLogs
300+
WHERE order_id = ?
301+
),
302+
HistoricalFraud AS (
303+
SELECT DISTINCT fa.order_id, ol.shipping_street, ol.shipping_city
304+
FROM FraudAlerts fa
305+
INNER JOIN OrderLogs ol ON fa.order_id = ol.order_id
306+
WHERE fa.severity IN ('HIGH', 'CRITICAL')
307+
AND fa.detected_at >= DATEADD(DAY, -30, GETDATE())
308+
)
309+
SELECT
310+
COUNT(DISTINCT hf.order_id) as matching_fraud_patterns,
311+
STRING_AGG(hf.order_id, ', ') as matching_order_ids
312+
FROM CurrentOrder co
313+
INNER JOIN HistoricalFraud hf ON
314+
(co.shipping_street = hf.shipping_street OR co.shipping_city = hf.shipping_city)
315+
HAVING COUNT(DISTINCT hf.order_id) > 0
316+
""".trimIndent()
317+
318+
conn.prepareStatement(sql).use { stmt ->
319+
stmt.setString(1, orderId)
320+
stmt.executeQuery().use { rs ->
321+
if (rs.next()) {
322+
val matchCount = rs.getInt("matching_fraud_patterns")
323+
if (matchCount > 0) {
324+
val riskScore = Math.min(matchCount * 0.30, 0.90)
325+
logger.warn("🔍 FRAUD CHECK: Historical fraud pattern match for $orderId ($matchCount similar patterns)")
326+
insertFraudAlert(orderId, "HISTORICAL_PATTERN", "HIGH", riskScore)
327+
return true
328+
}
329+
}
330+
}
331+
}
332+
}
333+
return false
334+
}
335+
336+
/**
337+
* Insert a fraud alert record into the FraudAlerts table
338+
*/
339+
private fun insertFraudAlert(orderId: String, alertType: String, severity: String, riskScore: Double) {
340+
try {
341+
DatabaseConfig.getConnection().use { conn ->
342+
val sql = """
343+
INSERT INTO FraudAlerts (order_id, alert_type, severity, risk_score, reason)
344+
VALUES (?, ?, ?, ?, ?)
345+
""".trimIndent()
346+
347+
conn.prepareStatement(sql).use { stmt ->
348+
stmt.setString(1, orderId)
349+
stmt.setString(2, alertType)
350+
stmt.setString(3, severity)
351+
stmt.setDouble(4, riskScore)
352+
stmt.setString(5, "Automatic fraud detection triggered for $alertType")
353+
354+
stmt.executeUpdate()
355+
logger.info("📝 Fraud alert created for order $orderId: $alertType ($severity, risk: $riskScore)")
356+
}
357+
}
358+
} catch (e: Exception) {
359+
logger.error("Failed to insert fraud alert for order $orderId", e)
360+
}
361+
}
362+
}

0 commit comments

Comments
 (0)