|
1 | 1 | # import Utility modules |
2 | 2 | import os |
3 | | -import logging |
4 | 3 |
|
5 | 4 | # import vendor-specific modules |
6 | 5 | from quixstreams import Application |
|
10 | 9 | from dotenv import load_dotenv |
11 | 10 | load_dotenv() |
12 | 11 |
|
13 | | -logging.basicConfig(level=logging.INFO) |
14 | | -logger = logging.getLogger(__name__) |
15 | 12 |
|
16 | | -# read the consumer group from config |
17 | | -consumer_group_name = os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-writer") |
18 | | - |
19 | | -# read the timestamp column from config |
20 | | -timestamp_column = os.environ.get("TIMESTAMP_COLUMN") if os.environ.get("TIMESTAMP_COLUMN") else None |
| 13 | +tag_keys = keys.split(",") if (keys := os.environ.get("INFLUXDB_TAG_KEYS")) else [] |
| 14 | +field_keys = keys.split(",") if (keys := os.environ.get("INFLUXDB_FIELD_KEYS")) else [] |
| 15 | +measurement_name = os.environ.get("INFLUXDB_MEASUREMENT_NAME", "measurement1") |
| 16 | +time_setter = col if (col := os.environ.get("TIMESTAMP_COLUMN")) else None |
21 | 17 |
|
22 | | -buffer_size = int(os.environ.get("BUFFER_SIZE", "1000")) |
| 18 | +influxdb_v3_sink = InfluxDB3Sink( |
| 19 | + token=os.environ["INFLUXDB_TOKEN"], |
| 20 | + host=os.environ["INFLUXDB_HOST"], |
| 21 | + organization_id=os.environ["INFLUXDB_ORG"], |
| 22 | + tags_keys=tag_keys, |
| 23 | + fields_keys=field_keys, |
| 24 | + time_setter=time_setter, |
| 25 | + database=os.environ["INFLUXDB_DATABASE"], |
| 26 | + measurement=measurement_name, |
| 27 | +) |
23 | 28 |
|
24 | | -buffer_delay = float(os.environ.get("BUFFER_DELAY", "1")) |
25 | 29 |
|
26 | | -# Create a Quix platform-specific application instead |
27 | 30 | app = Application( |
28 | | - consumer_group=consumer_group_name, |
| 31 | + consumer_group=os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-writer"), |
29 | 32 | auto_offset_reset="earliest", |
30 | | - commit_every=buffer_size, |
31 | | - commit_interval=buffer_delay) |
32 | | - |
| 33 | + commit_every=int(os.environ.get("BUFFER_SIZE", "1000")), |
| 34 | + commit_interval=float(os.environ.get("BUFFER_DELAY", "1")), |
| 35 | +) |
33 | 36 | input_topic = app.topic(os.environ["input"]) |
34 | 37 |
|
35 | | -# Read the environment variable and convert it to a dictionary |
36 | | -tag_keys = os.environ.get("INFLUXDB_TAG_KEYS", "").split(",") if os.environ.get("INFLUXDB_TAG_KEYS") else [] |
37 | | -field_keys = os.environ.get("INFLUXDB_FIELD_KEYS", "").split(",")if os.environ.get("INFLUXDB_FIELD_KEYS") else [] |
38 | | -measurement_name = os.environ.get("INFLUXDB_MEASUREMENT_NAME", "measurement1") |
39 | | - |
40 | | -influxdb_v3_sink = InfluxDB3Sink( |
41 | | - token=os.environ["INFLUXDB_TOKEN"], |
42 | | - host=os.environ["INFLUXDB_HOST"], |
43 | | - organization_id=os.environ["INFLUXDB_ORG"], |
44 | | - tags_keys=tag_keys, |
45 | | - fields_keys=field_keys, |
46 | | - time_key=timestamp_column, |
47 | | - database=os.environ["INFLUXDB_DATABASE"], |
48 | | - measurement=measurement_name) |
49 | | - |
50 | 38 | sdf = app.dataframe(input_topic) |
51 | | - |
52 | | -#sdf.print() |
53 | 39 | sdf.sink(influxdb_v3_sink) |
54 | 40 |
|
| 41 | + |
55 | 42 | if __name__ == "__main__": |
56 | | - logger.info("Starting application") |
57 | 43 | app.run() |
58 | | - |
59 | | - |
0 commit comments