Commit b7cccaf

Improve lookup joins (#1057)
1 parent 22b37a0 commit b7cccaf

1 file changed

+142
-22
lines changed

docs/joins.md

Lines changed: 142 additions & 22 deletions
@@ -150,53 +150,173 @@ If you change timestamps of the record during processing, they will be processed
150150
!!! warning
151151
    This is an experimental feature; the API may change in future releases.
152152

153+
`StreamingDataFrame.join_lookup()` is a special type of join that allows you to enrich records in a streaming dataframe with data from external systems. This is particularly useful for enriching streaming data with configuration or reference data from external sources like databases, configuration services, or APIs.
153154

154-
`StreamingDataFrame.join_lookup()` is a special type of join that allows you to enrich records in a streaming dataframe with the data from external systems.
155+
### Key Concepts
155156

156-
You can use it to enriching streaming data with configuration or reference data from an external source, like a database.
157+
Lookup joins are built on three ideas:
157158

158-
### Example
159-
160-
To perform a lookup join, you need:
161-
162-
1. A subclass of [quixstreams.dataframe.joins.lookups.base.BaseLookup](api-reference/dataframe.md#baselookup) to query the external source and cache the results when necessary.
163-
2. A subclass of [quixstreams.dataframe.joins.lookups.base.BaseField](api-reference/dataframe.md#basefield) to define how the data is extracted from the result.
164-
3. To pass the lookup and the fields to the `StreamingDataFrame.join_lookup`.
159+
1. **Lookup Strategy**: A subclass of `BaseLookup` that defines how to query external sources and cache results
160+
2. **Field Definitions**: Subclasses of `BaseField` that specify how to extract and map enrichment data
161+
3. **In-place Enrichment**: Records are updated directly with the enrichment data
165162

163+
### Basic Example with SQLite
166164

167-
See [SQLiteLookup](api-reference/dataframe.md#sqlitelookup) and [SQLiteLookupField](api-reference/dataframe.md#sqlitelookupfield) for the reference implementation.
168-
169-
Here is an example of lookup join with a SQLite database:
165+
Here's a simple example using SQLite for reference data:
170166

171167
```python
172168
from quixstreams import Application
173169
from quixstreams.dataframe.joins.lookups import SQLiteLookup, SQLiteLookupField
174170

175171
app = Application(...)
176172

177-
# An implementation of BaseLookup for SQLite
178-
lookup = SQLiteLookup(path="db.db")
173+
# Create a lookup instance for SQLite database
174+
lookup = SQLiteLookup(path="reference_data.db")
179175

180-
sdf = app.dataframe(app.topic("input"))
176+
sdf = app.dataframe(app.topic("sensor-data"))
181177

178+
# Enrich sensor data with reference information
182179
sdf = sdf.join_lookup(
183180
    lookup,
184-
    on="column", # A column in StreamingDataFrame to join on
181+
    on="device_id", # Column to match on
185182
    fields={
186-
        # A mapping with SQLite fields to join with
187-
        "lookup": SQLiteLookupField(table="table", columns=["column"], on="id"),
183+
        "device_info": SQLiteLookupField(
184+
            table="devices",
185+
            columns=["name", "model", "location"],
186+
            on="device_id"
187+
        ),
188+
        "calibration": SQLiteLookupField(
189+
            table="calibrations",
190+
            columns=["offset", "scale", "last_calibrated"],
191+
            on="device_id"
192+
        )
188193
    },
189194
)
190195

191196
if __name__ == '__main__':
192197
    app.run()
193198
```
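
To try the example above locally, the reference database needs the two tables it queries. The following is a minimal sketch using Python's standard `sqlite3` module. The table and column names come from the example; the column types, the helper name `create_reference_db`, and the sample rows are assumptions made purely for illustration.

```python
import sqlite3


def create_reference_db(path: str) -> None:
    """Create the tables queried by the example and insert one sample row each.

    Column types and sample values are illustrative assumptions.
    """
    conn = sqlite3.connect(path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS devices ("
                "device_id TEXT PRIMARY KEY, name TEXT, model TEXT, location TEXT)"
            )
            # "offset" is quoted because OFFSET is an SQL keyword
            conn.execute(
                "CREATE TABLE IF NOT EXISTS calibrations ("
                'device_id TEXT PRIMARY KEY, "offset" REAL, scale REAL, '
                "last_calibrated TEXT)"
            )
            conn.execute(
                "INSERT OR REPLACE INTO devices VALUES (?, ?, ?, ?)",
                ("dev-1", "Boiler sensor", "TX-100", "plant-a"),
            )
            conn.execute(
                "INSERT OR REPLACE INTO calibrations VALUES (?, ?, ?, ?)",
                ("dev-1", 0.5, 1.02, "2024-01-01"),
            )
    finally:
        conn.close()
```

Calling `create_reference_db("reference_data.db")` once before starting the application should give the lookup something to match against for records whose `device_id` is `"dev-1"`.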
194199

195-
### How it works
200+
### Dynamic Configuration Integration
201+
202+
For real-time configuration management, use the Quix Configuration Service integration:
203+
204+
```python
205+
from quixstreams import Application
206+
from quixstreams.dataframe.joins.lookups import QuixConfigurationService
207+
208+
app = Application()
209+
210+
# Create a lookup instance pointing to your configuration topic
211+
lookup = QuixConfigurationService(
212+
    topic=app.topic("device-configurations"),
213+
    app_config=app.config
214+
)
215+
216+
sdf = app.dataframe(app.topic("sensor-data"))
217+
218+
# Enrich with dynamic configuration data
219+
sdf = sdf.join_lookup(
220+
    lookup=lookup,
221+
    on="device_id",
222+
    fields={
223+
        "device_config": lookup.json_field(
224+
            jsonpath="$.device",
225+
            type="device-config"
226+
        ),
227+
        "calibration_params": lookup.json_field(
228+
            jsonpath="$.calibration",
229+
            type="device-config"
230+
        ),
231+
        "firmware_version": lookup.json_field(
232+
            jsonpath="$.firmware.version",
233+
            type="device-config"
234+
        )
235+
    }
236+
)
237+
238+
if __name__ == '__main__':
239+
    app.run()
240+
```
241+
242+
### Advanced Configuration Matching
243+
244+
Use custom key matching logic for complex scenarios:
245+
246+
```python
247+
def custom_key_matcher(value, key):
248+
    """Custom logic to determine configuration key"""
249+
    device_type = value.get("device_type", "unknown")
250+
    location = value.get("location", "default")
251+
    return f"{device_type}-{location}"
252+
253+
254+
# Use custom key matching
255+
sdf = sdf.join_lookup(
256+
    lookup=lookup,
257+
    on=custom_key_matcher,
258+
    fields={
259+
        "location_config": lookup.json_field(
260+
            jsonpath="$",
261+
            type="location-config"
262+
        )
263+
    }
264+
)
265+
```
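
To make the matcher's behaviour concrete, the sketch below calls it directly on sample records. The function is repeated from the example so the block runs on its own. The sample values and the `key` arguments are hypothetical, and how the library invokes the callable is not shown here.

```python
def custom_key_matcher(value, key):
    """Build a lookup key from the record's device type and location."""
    device_type = value.get("device_type", "unknown")
    location = value.get("location", "default")
    return f"{device_type}-{location}"


# A record with both fields present
full = custom_key_matcher({"device_type": "thermostat", "location": "berlin"}, "k1")
# A record missing both fields falls back to the defaults
fallback = custom_key_matcher({}, "k2")

print(full)      # thermostat-berlin
print(fallback)  # unknown-default
```

Because missing fields collapse to `unknown-default`, it may be worth keeping a configuration under that key, or filtering such records out before the join.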
266+
267+
### Binary Data Support
268+
269+
For non-JSON configurations (firmware files, calibration data, etc.):
270+
271+
```python
272+
sdf = sdf.join_lookup(
273+
    lookup=lookup,
274+
    on="device_id",
275+
    fields={
276+
        "firmware_binary": lookup.bytes_field(
277+
            type="firmware"
278+
        ),
279+
        "calibration_data": lookup.bytes_field(
280+
            type="calibration"
281+
        )
282+
    }
283+
)
284+
```
285+
286+
### How Lookup Joins Work
287+
288+
1. **Record Processing**: For each record in the dataframe, the lookup strategy is called with the matching key and field definitions
289+
2. **External Query**: The lookup strategy queries the external source (database, API, configuration service) based on the key
290+
3. **Data Extraction**: Field definitions specify how to extract and map the enrichment data from the external source
291+
4. **In-place Update**: The record is updated directly with the enrichment data
292+
5. **Caching**: Results are cached locally to minimize external calls and improve performance
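
The five steps above can be sketched in plain Python without the library. This is a toy model, not the Quix Streams implementation: `DictLookup`, its `enrich` method, `EXTERNAL_SOURCE`, and the field extractors are all hypothetical names, and an in-memory dict stands in for the external system.

```python
from typing import Any, Callable, Dict

# Stand-in for an external system (database, API, configuration service).
EXTERNAL_SOURCE: Dict[str, Dict[str, Any]] = {
    "dev-1": {"name": "Boiler sensor", "firmware": {"version": "1.4.2"}},
}

# Field definitions: output column name -> function extracting a value from a source row.
Fields = Dict[str, Callable[[Dict[str, Any]], Any]]


class DictLookup:
    """Toy lookup strategy: queries EXTERNAL_SOURCE and caches rows by key."""

    def __init__(self) -> None:
        self._cache: Dict[str, Dict[str, Any]] = {}
        self.queries = 0  # how many times the external source was hit

    def _query(self, key: str) -> Dict[str, Any]:
        self.queries += 1
        return EXTERNAL_SOURCE.get(key, {})

    def enrich(self, fields: Fields, on: str, value: Dict[str, Any]) -> None:
        if on not in self._cache:  # steps 2 and 5: query once, then reuse
            self._cache[on] = self._query(on)
        row = self._cache[on]
        for column, extract in fields.items():  # step 3: extract each field
            value[column] = extract(row)  # step 4: update the record in place


lookup = DictLookup()
fields: Fields = {
    "device_name": lambda row: row.get("name"),
    "firmware_version": lambda row: row.get("firmware", {}).get("version"),
}
records = [
    {"device_id": "dev-1", "temp": 21.5},
    {"device_id": "dev-1", "temp": 22.0},
]
for record in records:  # step 1: called once per record
    lookup.enrich(fields, on=record["device_id"], value=record)
```

After the loop, both records carry `device_name` and `firmware_version`, and `lookup.queries` is `1` because the second record is served from the cache.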
293+
294+
### Performance Considerations
295+
296+
- **Caching**: Lookup results are cached to minimize external API calls
297+
- **Batch Processing**: Consider batching multiple lookups for better performance
298+
- **Error Handling**: Implement proper fallback behavior for missing data
299+
- **Memory Usage**: Be mindful of cache size for large datasets
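
As one way to act on the caching, fallback, and memory points above in a custom lookup, the sketch below puts a size-bounded LRU cache with a default value in front of a fetch function. It is an illustration in plain Python under stated assumptions: `BoundedLookupCache` and its parameters are hypothetical and are not part of the Quix Streams API, and the fetch function is assumed to raise `KeyError` for missing keys.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class BoundedLookupCache:
    """LRU cache in front of a fetch function, with a fallback for missing keys."""

    def __init__(
        self,
        fetch: Callable[[Hashable], Any],
        max_size: int = 1000,
        default: Any = None,
    ) -> None:
        self._fetch = fetch
        self._max_size = max_size
        self._default = default
        self._entries: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable) -> Any:
        if key in self._entries:
            self._entries.move_to_end(key)  # mark as most recently used
            return self._entries[key]
        try:
            value = self._fetch(key)
        except KeyError:
            return self._default  # fallback for missing data; not cached
        self._entries[key] = value
        if len(self._entries) > self._max_size:
            self._entries.popitem(last=False)  # evict the least recently used entry
        return value

    def __contains__(self, key: Hashable) -> bool:
        return key in self._entries

    def __len__(self) -> int:
        return len(self._entries)


source = {"dev-1": {"scale": 1.02}, "dev-2": {"scale": 0.98}, "dev-3": {"scale": 1.0}}
cache = BoundedLookupCache(fetch=source.__getitem__, max_size=2, default={})
for device_id in ("dev-1", "dev-2", "dev-3"):
    cache.get(device_id)  # the third insert evicts "dev-1"
```

Missing keys are deliberately not cached here, so data that appears later in the source is picked up on the next lookup, at the cost of repeated fetches for keys that never exist.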
300+
301+
### Use Cases
302+
303+
- **Configuration Enrichment**: Enrich streaming data with device configurations, calibration parameters, or system settings
304+
- **Reference Data**: Join with lookup tables for device information, user profiles, or product catalogs
305+
- **Real-time Updates**: Use with Dynamic Configuration for real-time configuration updates
306+
- **Data Validation**: Enrich with validation rules or business logic
307+
- **Multi-source Enrichment**: Combine data from multiple external sources
308+
309+
### Integration with Quix Cloud
310+
311+
For production use cases, consider using the [Quix Dynamic Configuration service](https://quix.io/docs/quix-cloud/managed-services/dynamic-configuration.html) which provides:
312+
313+
- **Real-time Configuration Updates**: Lightweight Kafka events for configuration changes
314+
- **Version Management**: Automatic versioning and timestamp-based lookups
315+
- **Large File Support**: Handle configuration files too large for direct Kafka streaming
316+
- **Binary Data Support**: Support for both JSON and binary configuration content
317+
- **High Performance**: Optimized for high-throughput streaming applications
196318

197-
- For each record in the dataframe, a user-defined lookup strategy (a subclass of `BaseLookup`) is called with a mapping of field names to field definitions (subclasses of `BaseField`).
198-
- The lookup strategy fetches or computes enrichment data based on the provided key and fields, and updates the record in-place.
199-
- The enrichment can come from external sources such as configuration topics, databases, or in-memory data.
319+
👉 See the [Dynamic Configuration documentation](https://quix.io/docs/quix-cloud/managed-services/dynamic-configuration.html) for complete setup and usage details.
200320

201321
## Interval join
202322
