import argparse
import pika
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS, WritePrecision
from os import getenv
import logging
import ssl
import wagglemsg as message
from contextlib import ExitStack
from prometheus_client import start_http_server, Counter

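# Incremented once per delivery pulled into a batch, including messages that
# are later dropped for parse or validation errors (see MessageHandler.flush).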
messages_processed_total = Counter("loader_messages_processed_total", "Total messages processed by data loader.")


def assert_type(obj, t):
    if not isinstance(obj, t):
        raise TypeError(f"{obj!r} must be {t}")


def assert_maxlen(s, n):
    if len(s) > n:
        raise ValueError(f"len({s!r}) must be <= {n}")


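# Example of a message that passes the checks below (illustrative values,
# assuming wagglemsg's Message type with name/timestamp/value/meta fields):
#   Message(name="env.temperature", timestamp=1638218237123456789,
#           value=23.4, meta={"node": "000048b02d15bc7c"})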
def assert_valid_message(msg):
    assert_type(msg.name, str)
    assert_maxlen(msg.name, 64)
    assert_type(msg.timestamp, int)
    assert_type(msg.value, (int, float, str))
    assert_type(msg.meta, dict)
    for k, v in msg.meta.items():
        assert_type(k, str)
        assert_maxlen(k, 64)
        assert_type(v, str)
        assert_maxlen(v, 64)
    if "node" not in msg.meta:
        raise KeyError("message missing node meta field")


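# InfluxDB types each field per measurement, so mixing integer and float writes
# to the same field causes a field type conflict. Coercing ints to floats keeps
# the "value" field consistently float-typed.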
def coerce_value(x):
    if isinstance(x, int):
        return float(x)
    return x


class MessageHandler:

    def __init__(self, rabbitmq_conn: pika.BlockingConnection, influxdb_client: InfluxDBClient, influxdb_bucket: str,
                 influxdb_org: str, max_flush_interval: float, max_batch_size: int):
        self.rabbitmq_conn = rabbitmq_conn
        self.influxdb_client = influxdb_client
        self.influxdb_bucket = influxdb_bucket
        self.influxdb_org = influxdb_org
        self.max_flush_interval = max_flush_interval
        self.max_batch_size = max_batch_size
        self.batch = []

    def flush(self):
        if len(self.batch) == 0:
            return

        logging.info("flushing batch with %d records", len(self.batch))
        records = []

        # create records from batch
        for ch, method, properties, body in self.batch:
            try:
                msg = message.load(body)
            except Exception:
                logging.exception("failed to parse message")
                continue

            try:
                assert_valid_message(msg)
            except Exception:
                logging.exception("dropping invalid message: %s", msg)
                continue

            # # check that meta["node"] matches user_id
            # if "node-"+msg.meta["node"] != properties.user_id:
            #     logging.info("dropping invalid message: username (%s) doesn't match node meta (%s)", msg.meta["node"], properties.user_id)
            #     continue

            logging.debug("creating record for msg: %s value-type: %s", msg, type(msg.value))
            records.append({
                "measurement": msg.name,
                "tags": msg.meta,
                "fields": {
                    "value": coerce_value(msg.value),
                },
                "time": msg.timestamp,
            })

        # write entire batch to influxdb
        logging.info("writing %d records to influxdb", len(records))
        with self.influxdb_client.write_api(write_options=SYNCHRONOUS) as write_api:
            try:
                write_api.write(self.influxdb_bucket, self.influxdb_org, records, write_precision=WritePrecision.NS)
            except InfluxDBError as exc:
                # TODO(sean) InfluxDB only responds with single invalid data point message.
                # Although the write goes through for the valid data points, getting this info
                # could be helpful for debugging. We may need to leverage a known schema later
                # to be more proactive about the problem.
                logging.error("error when writing batch: %s", exc.message)

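        # NOTE: the batch is acked even when the InfluxDB write above failed;
        # failed records are logged but not redelivered or retried.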
        # ack entire batch
        logging.info("acking batch")
        for ch, method, properties, body in self.batch:
            ch.basic_ack(method.delivery_tag)

        messages_processed_total.inc(len(self.batch))
        self.batch.clear()
        logging.info("flushed batch")

    def handle(self, ch, method, properties, body):
        # ensure we flush new batch within max flush interval
        if len(self.batch) == 0:
            self.rabbitmq_conn.call_later(self.max_flush_interval, self.flush)
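            # the timer is armed only when the batch goes from empty to
            # non-empty, so each batch flushes within max_flush_interval
            # of its first message.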

        self.batch.append((ch, method, properties, body))

        # ensure we flush when batch is large enough
        if len(self.batch) >= self.max_batch_size:
            self.flush()


def get_pika_credentials(args):
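    # an explicit username selects PLAIN auth; otherwise fall back to
    # EXTERNAL auth (e.g. a TLS client certificate).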
    if args.rabbitmq_username != "":
        return pika.PlainCredentials(args.rabbitmq_username, args.rabbitmq_password)
    return pika.credentials.ExternalCredentials()


def get_ssl_options(args):
    if args.rabbitmq_cacertfile == "":
        return None
    context = ssl.create_default_context(cafile=args.rabbitmq_cacertfile)
    # HACK this allows the host and baked in host to be configured independently
    context.check_hostname = False
    if args.rabbitmq_certfile != "":
        context.load_cert_chain(args.rabbitmq_certfile, args.rabbitmq_keyfile)
    return pika.SSLOptions(context, args.rabbitmq_host)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--rabbitmq_host", default=getenv("RABBITMQ_HOST", "localhost"))
    parser.add_argument("--rabbitmq_port", default=getenv("RABBITMQ_PORT", "5672"), type=int)
    parser.add_argument("--rabbitmq_username", default=getenv("RABBITMQ_USERNAME", ""))
    parser.add_argument("--rabbitmq_password", default=getenv("RABBITMQ_PASSWORD", ""))
    parser.add_argument("--rabbitmq_cacertfile", default=getenv("RABBITMQ_CACERTFILE", ""))
    parser.add_argument("--rabbitmq_certfile", default=getenv("RABBITMQ_CERTFILE", ""))
    parser.add_argument("--rabbitmq_keyfile", default=getenv("RABBITMQ_KEYFILE", ""))
    parser.add_argument("--rabbitmq_exchange", default=getenv("RABBITMQ_EXCHANGE", "waggle.msg"))
    parser.add_argument("--rabbitmq_queue", default=getenv("RABBITMQ_QUEUE", "influx-messages"))
    parser.add_argument("--influxdb_url", default=getenv("INFLUXDB_URL", "http://localhost:8086"))
    parser.add_argument("--influxdb_token", default=getenv("INFLUXDB_TOKEN"))
    parser.add_argument("--influxdb_bucket", default=getenv("INFLUXDB_BUCKET", "waggle"))
    parser.add_argument("--influxdb_org", default=getenv("INFLUXDB_ORG", "waggle"))
    parser.add_argument("--max_flush_interval", default=getenv("MAX_FLUSH_INTERVAL", "1.0"), type=float, help="max flush interval")
    parser.add_argument("--max_batch_size", default=getenv("MAX_BATCH_SIZE", "5000"), type=int, help="max batch size")
    parser.add_argument("--metrics_port", default=getenv("METRICS_PORT", "8080"), type=int, help="port to expose metrics")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)s %(message)s",
        datefmt="%Y/%m/%d %H:%M:%S")
    # pika logging is too verbose, so we turn it down.
    logging.getLogger("pika").setLevel(logging.CRITICAL)

    credentials = get_pika_credentials(args)
    ssl_options = get_ssl_options(args)

    params = pika.ConnectionParameters(
        host=args.rabbitmq_host,
        port=args.rabbitmq_port,
        credentials=credentials,
        ssl_options=ssl_options,
        retry_delay=60,
        socket_timeout=10.0)

    start_http_server(args.metrics_port)

    with ExitStack() as es:
        logging.info("connecting to influxdb at %s", args.influxdb_url)
        client = es.enter_context(InfluxDBClient(
            url=args.influxdb_url,
            token=args.influxdb_token,
            org=args.influxdb_org,
            enable_gzip=True,
        ))
        logging.info("connected to influxdb")

        logging.info("connecting to rabbitmq")
        conn = es.enter_context(pika.BlockingConnection(params))
        logging.info("connected to rabbitmq")

        ch = conn.channel()
        ch.queue_declare(args.rabbitmq_queue, durable=True)
        ch.queue_bind(args.rabbitmq_queue, args.rabbitmq_exchange, "#")
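        # binding key "#" matches every routing key, so this queue receives all
        # messages published to the exchange (assuming waggle.msg is a topic exchange)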

        handler = MessageHandler(
            rabbitmq_conn=conn,
            influxdb_client=client,
            influxdb_bucket=args.influxdb_bucket,
            influxdb_org=args.influxdb_org,
            max_flush_interval=args.max_flush_interval,
            max_batch_size=args.max_batch_size,
        )
        ch.basic_consume(args.rabbitmq_queue, handler.handle)
        ch.start_consuming()

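# Example local invocation (hypothetical filename and endpoints; adjust to your setup):
#   python loader.py --rabbitmq_host localhost --rabbitmq_username guest \
#       --rabbitmq_password guest --influxdb_url http://localhost:8086 \
#       --influxdb_token my-token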

if __name__ == "__main__":
    main()