1- """
2- InfluxDB Data Push Utility
3- Provides convenient InfluxDB data writing functionality
4- """
5-
6- from datetime import datetime
7- from typing import Dict , Any , Optional , Union
1+ import json
2+ import time
3+ from datetime import datetime , timezone
4+ import threading
5+ from typing import Dict , Any , Optional
86from influxdb_client import InfluxDBClient , Point , WritePrecision
97from influxdb_client .client .write_api import SYNCHRONOUS
10- from config_utils import config_utils as config_instance
8+ from common .config_utils import config_utils as config_instance
9+ import logging
10+ import os
11+
12+ logger = logging .getLogger (__name__ )
13+
14+
15+
1116
class InfluxDBUtils:
    """
    InfluxDB utility with singleton construction, connection retry, periodic
    health checks, and local backup/resend of metrics that failed to write.

    Writes use the synchronous write API so failures surface immediately and
    can be redirected to a local JSON-lines backup file.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance (double-checked locking)."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, url: str, token: str, org: str, bucket: str,
                 backup_file: str = "failed_push.jsonl",
                 retry_interval: int = 5, heartbeat_interval: int = 30):
        """
        Connect to InfluxDB and start the heartbeat thread.

        Args:
            url: InfluxDB server URL.
            token: API token used for authentication.
            org: Organization name.
            bucket: Default bucket metrics are written to.
            backup_file: JSON-lines file storing metrics whose write failed.
            retry_interval: Seconds to wait between connection attempts.
            heartbeat_interval: Seconds between periodic health checks.

        Raises:
            ConnectionError: if the server cannot be reached after all retries.
        """
        # __new__ always hands back the same object, so skip re-initialization
        # when the singleton is "constructed" a second time.
        if hasattr(self, "_initialized") and self._initialized:
            return

        self.url = url
        self.token = token
        self.org = org
        self.bucket = bucket
        self.backup_file = backup_file
        self.retry_interval = retry_interval
        self.heartbeat_interval = heartbeat_interval
        self._client: Optional[InfluxDBClient] = None
        self._write_api = None
        self._stop_heartbeat = threading.Event()

        self._connect()
        self._start_heartbeat()

        self._initialized = True

    def _connect(self):
        """Establish the InfluxDB connection, retrying a few times.

        Raises:
            ConnectionError: if every attempt fails.
        """
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                self._client = InfluxDBClient(url=self.url, token=self.token, org=self.org)

                # Check server health right after client initialization.
                self._client.ping()

                self._write_api = self._client.write_api(write_options=SYNCHRONOUS)

                # Resend anything that was backed up while we were offline.
                self._resend_backed_up_metrics()
                return
            except Exception as e:
                logger.error(f"InfluxDB connection failed (attempt {attempt + 1}/{max_attempts}): {e}")
                if attempt < max_attempts - 1:
                    time.sleep(self.retry_interval)

        # If all attempts fail, raise an error.
        raise ConnectionError(f"Failed to connect to InfluxDB after {max_attempts} attempts.")

    def _start_heartbeat(self):
        """Start a daemon thread that periodically pings the server."""
        def heartbeat():
            while not self._stop_heartbeat.is_set():
                try:
                    # ping() is a lightweight connectivity check.
                    if not self._client.ping():
                        logger.warning("InfluxDB health check failed.")
                except Exception as e:
                    # Likely a lost connection; subsequent pushes fall back to
                    # the local backup file.
                    logger.warning(f"InfluxDB heartbeat error: {e}")

                # Event.wait instead of time.sleep so stop() takes effect
                # immediately rather than after up to a full interval.
                self._stop_heartbeat.wait(self.heartbeat_interval)

        threading.Thread(target=heartbeat, daemon=True).start()

    def stop(self):
        """Stop the heartbeat and close the InfluxDB client connection."""
        self._stop_heartbeat.set()
        if self._client:
            self._client.close()
            logger.info("InfluxDB client connection closed.")

    def _resend_backed_up_metrics(self):
        """Resend metrics from the backup file; keep only still-pending lines."""
        if not os.path.exists(self.backup_file) or os.path.getsize(self.backup_file) == 0:
            logger.debug("No backup file found or file is empty to resend.")
            return

        successful_resends = []
        failed_resends = []

        logger.info(f"Attempting to resend metrics from backup file: {self.backup_file}")

        try:
            with open(self.backup_file, "r", encoding="utf-8") as f:
                lines = f.readlines()

            for index, line in enumerate(lines):
                try:
                    data = json.loads(line)

                    # Reconstruct the point with its original timestamp and a
                    # "resend" marker tag.
                    point = Point(data["measurement"])
                    point.time(data["time"], WritePrecision.NS)

                    point.tag("resend", "true")
                    for k, v in data.get("tags", {}).items():
                        point.tag(k, v)

                    point.field(data["field_key"], data["field_value"])

                    # Attempt to write (synchronous).
                    self._write_api.write(bucket=self.bucket, org=self.org, record=point)
                    successful_resends.append(line)
                except Exception as e:
                    logger.error(f"Failed to resend backed-up metric: {e}. Data: {line.strip()}")
                    # Stop on the first failure, but keep this line AND every
                    # line not yet processed — previously the unprocessed tail
                    # was dropped when the file was rewritten (data loss).
                    failed_resends.extend(lines[index:])
                    break

            # Rewrite the backup file with only the lines still pending.
            if successful_resends or failed_resends:
                with open(self.backup_file, "w", encoding="utf-8") as f:
                    f.writelines(failed_resends)

            if not failed_resends:
                logger.info("Successfully resent all backed-up metrics and cleared the backup file.")
            else:
                logger.warning(f"Resent {len(successful_resends)} metrics. {len(failed_resends)} metrics remain in backup.")

        except Exception as fe:
            logger.error(f"Error reading or processing backup file: {fe}")

    def push_metric(self, measurement: str, field_key: str, field_value: Any,
                    tags: Optional[Dict[str, str]] = None):
        """Public interface: push a single metric point to InfluxDB.

        On write failure the metric is appended to the local backup file so it
        can be resent on the next successful connection.
        """
        # Current UTC time with nanosecond precision.
        point = Point(measurement).time(datetime.now(timezone.utc), WritePrecision.NS)

        if tags:
            for k, v in tags.items():
                point.tag(k, v)

        point.field(field_key, field_value)

        try:
            self._write_api.write(bucket=self.bucket, org=self.org, record=point)
            logger.debug(f"Pushed metric: {measurement}={field_value} with tags {tags}")
        except Exception as e:
            # If the write fails, log the error and trigger a local backup.
            logger.error(f"Failed to push metric {measurement}: {e}. Backing up.")
            self._backup_metric(measurement, field_key, field_value, tags)

    def _backup_metric(self, measurement, field_key, field_value, tags):
        """Append the failed metric to the backup file in JSON-lines format."""
        data = {
            # Time of the push failure; reused as the point time on resend.
            "time": datetime.now(timezone.utc).isoformat(),
            "measurement": measurement,
            "field_key": field_key,
            "field_value": field_value,
            "tags": tags or {},
        }
        try:
            # os.path.dirname is "" for a bare filename, and makedirs("")
            # raises — only create the directory when there is one.
            backup_dir = os.path.dirname(self.backup_file)
            if backup_dir:
                os.makedirs(backup_dir, exist_ok=True)
            with open(self.backup_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(data, ensure_ascii=False) + "\n")
            logger.warning(f"Saved failed metric to backup file: {self.backup_file}")
        except Exception as fe:
            logger.error(f"Failed to write backup file: {fe}")
# Module-level singleton, configured from the shared config (with fallbacks).
influxdb_utils = InfluxDBUtils(
    url=config_instance.get_nested_config("influxdb.url", "localhost"),
    token=config_instance.get_nested_config("influxdb.token", os.getenv("INFLUXDB_TOKEN")),
    org=config_instance.get_nested_config("influxdb.org", "ucm"),
    bucket=config_instance.get_nested_config("influxdb.bucket", "ucm_test_bucket"),
    backup_file=config_instance.get_nested_config("influxdb.offline_file", ".cache/influxdb_offline.jsonl"),
)
39207
# Example usage
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    try:
        # Pushing metrics; connecting also resends any backed-up metrics.
        influxdb_utils.push_metric("cpu_usage", "value", 0.88, {"host": "server1", "region": "us-west"})
        influxdb_utils.push_metric("memory_free", "bytes", 1024 * 1024 * 512, {"host": "server2"})

        # Simulate a connection failure (change the URL to something invalid
        # and rerun): the metric will be backed up locally.
        # influxdb_utils.push_metric("fail_test", "value", 999, {"host": "test_server"})

    except ConnectionError as ce:
        logger.error(f"Application cannot start: {ce}")
    finally:
        # Ensures a clean shutdown.
        influxdb_utils.stop()
0 commit comments