From ef86724d1c7b862a3670b82519d0d0bc13f96720 Mon Sep 17 00:00:00 2001 From: Damien Lacoste Date: Tue, 20 Oct 2020 09:58:05 +0200 Subject: [PATCH 1/7] Draft for AbstractReader.py. This version is just a draft open for discussion. --- share/AbstractReader.py | 125 +++++++++++++++++++++++++++------------- 1 file changed, 85 insertions(+), 40 deletions(-) diff --git a/share/AbstractReader.py b/share/AbstractReader.py index fba659e..c95c8a7 100644 --- a/share/AbstractReader.py +++ b/share/AbstractReader.py @@ -1,78 +1,123 @@ +class Aggregate(Enum): + """ + Enum to describe aggregation method to use. + Note that this aggregation functions should + be supported at the backend level. + """ + COUNT + COUNT_ERRORS + COUNT_NAN + MIN + MAX + AVG + STD_DEV -import time - class AbstractReader(object): """ Subclass this class to create a PyTangoArchiving Reader for your specific DB - + e.g. TimeDBReader(AbstractReader) """ - def __init__(self,config='',...): + def __init__(self, config='',...): ''' - config must be an string like user:passwd@host + Config can be an string like user:passwd@host or a json-like dictionary "{'user':'...','passwd':'...'}" ''' self.db = YourDb(config) return - def get_database(self,epoch=-1): + def get_attributes(self, active=False, regexp=''): """ - This method should provide the current connection object to DB - - - """ - return self.db - - def get_attributes(self,active=False,regexp=''): - """ Queries the database for the current list of archived attributes. arguments: - active: True/False: attributes currently archived + active: True: only attributes currently archived + False: all attributes, even the one not archiving anymore regexp: '' :filter for attributes to retrieve """ return list() - def is_attribute_archived(self,attribute): + def is_attribute_archived(self, attribute, active=False): """ Returns if an attribute has values in DB. - If active=True, only returns for value currently adding new values + + arguments: + attribute: fqdn for the attribute. + active: if true, only check for active attributes, + otherwise check all. """ return True - def load_last_values(self,attribute): + def get_last_attribute_value(self, attribute): """ Returns last value inserted in DB for an attribute - - (epoch, r_value, w_value, quality) + + arguments: + attribute: fqdn for the attribute. + returns: + (epoch, r_value, w_value, quality, error_desc) """ - return (time.time(), 0., 0., 0) - def get_attribute_values(self,attribute,start_date,stop_date=None, - decimate=False): + return get_last_attributes_values((attribute))[attribute] + + def get_last_attributes_values(self, attributes): """ - Returns attribute values between dates in the format: - [(epoch0, r_value, w_value, quality), - (epoch1, r_value, w_value, quality), - (epoch2, Exception, None, ATTR_INVALID), - ... ] - decimate may be False, True or an aggregation method - w_value and quality are optional, while r_value is mandatory + Returns last values inserted in DB for a list of attributes + + arguments: + attribute: fqdn for the attribute. + returns: + {'att1':(epoch, r_value, w_value, quality, error_desc), + 'att2':(epoch, r_value, w_value, quality, error_desc), + ... + } """ - return [(time.time(), 0., 0., 0)] - def get_attributes_values(self,attribute,start_date,stop_date=None, - decimate=False, correlate=False): + return {attributes[0]: (time.time(), 0., 0., 0, "")} + + def get_attribute_values(self, attribute, + start_date, stop_date=None, + decimate=None): """ - Returns attribute values between dates in the format: - [(epoch0, (r_value,w_value,quality)), - (epoch1, (r_value,w_value,quality)), - (epoch2, (Exception, None, ATTR_INVALID)), + Returns attribute values between start and stop dates. + + arguments: + attribute: fqdn for the attribute. + start_date: datetime, beginning of the period to query. + stop_date: datetime, end of the period to query. + if None, now() is used. + decimate: aggregation function to use in the form: + {'timedelta0':(MIN, MAX, …) + , 'timedelta1':(AVG, COUNT, …) + , …} + if None, returns raw data. + returns: + [(epoch0, r_value, w_value, quality, error_desc), + (epoch1, r_value, w_value, quality, error_desc), ... ] - decimate may be False, True, an aggregation method or just an interval in seconds + """ + attributes[attribute] = {'start': start_date + , 'stop': stop_date + , 'decimation': decimate} + return get_attributes_values(attributes)[attribute] + + def get_attributes_values(self, attributes): + """ + Returns attributes values between start and stop dates + , using decimation or not. + + arguments: + attributes: a dict from the fqdn for the attributes + to the data to extract. + See get_attribute_values for the format to be used. - if correlate is True, attributes with no value in the interval will be correlated + returns: + {'attr0':[(epoch0, r_value, w_value, quality, error_desc), + (epoch1, r_value, w_value, quality, error_desc), + ... ], + 'attr1':[(…),(…)]} """ - return {'attr0':[(time.time(), 0., 0., 0)], 'attr1':[(time.time(), 0., 0., 0)]} + return {'attr0': [(time.time(), 0., 0., 0, '')] + , 'attr1': [(time.time(), 0., 0., 0, '')]} From 85967c553c3bdc3d1534b38bfa49937e172d0371 Mon Sep 17 00:00:00 2001 From: Damien Lacoste Date: Mon, 16 Nov 2020 07:45:48 +0100 Subject: [PATCH 2/7] Add get_connection function. Add the correlate arg and an all passing argument for the get_attributes_values functions. --- share/AbstractReader.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/share/AbstractReader.py b/share/AbstractReader.py index c95c8a7..8fc0012 100644 --- a/share/AbstractReader.py +++ b/share/AbstractReader.py @@ -8,6 +8,8 @@ class Aggregate(Enum): COUNT COUNT_ERRORS COUNT_NAN + FIRST + LAST MIN MAX AVG @@ -29,6 +31,14 @@ def __init__(self, config='',...): self.db = YourDb(config) return + def get_connection(): + """ + Return the connection object to avoid a client + to open one for custom queries. + The returned object will be implementation specific. + """ + return self.db + def get_attributes(self, active=False, regexp=''): """ Queries the database for the current list of archived attributes. @@ -79,7 +89,8 @@ def get_last_attributes_values(self, attributes): def get_attribute_values(self, attribute, start_date, stop_date=None, - decimate=None): + decimate=None, + **params): """ Returns attribute values between start and stop dates. @@ -101,9 +112,10 @@ def get_attribute_values(self, attribute, attributes[attribute] = {'start': start_date , 'stop': stop_date , 'decimation': decimate} - return get_attributes_values(attributes)[attribute] + return get_attributes_values(attributes, params)[attribute] - def get_attributes_values(self, attributes): + def get_attributes_values(self, attributes, + correlate = False, **params): """ Returns attributes values between start and stop dates , using decimation or not. @@ -112,6 +124,9 @@ def get_attributes_values(self, attributes): attributes: a dict from the fqdn for the attributes to the data to extract. See get_attribute_values for the format to be used. + correlate: if True, data is generated so that + there is available data for each timestamp of + each attribute. returns: {'attr0':[(epoch0, r_value, w_value, quality, error_desc), From f62c98856ae092d6e7c3db5ccb81788692c529cb Mon Sep 17 00:00:00 2001 From: Damien Lacoste Date: Mon, 7 Jun 2021 12:40:56 +0200 Subject: [PATCH 3/7] use a similar interface for get_attribute_values and get_attributes_values --- share/AbstractReader.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/share/AbstractReader.py b/share/AbstractReader.py index 8fc0012..1edc1ee 100644 --- a/share/AbstractReader.py +++ b/share/AbstractReader.py @@ -109,21 +109,27 @@ def get_attribute_values(self, attribute, (epoch1, r_value, w_value, quality, error_desc), ... ] """ - attributes[attribute] = {'start': start_date - , 'stop': stop_date - , 'decimation': decimate} - return get_attributes_values(attributes, params)[attribute] + return get_attributes_values((attribute), start_date, stop_date, decimate, False, params)[attribute] def get_attributes_values(self, attributes, - correlate = False, **params): + start_date, stop_date=None, + decimate=None, + correlate = False, + **params): """ Returns attributes values between start and stop dates - , using decimation or not. + , using decimation or not, correlating the values or not. arguments: - attributes: a dict from the fqdn for the attributes - to the data to extract. - See get_attribute_values for the format to be used. + attributes: a list of the attributes' fqdn + start_date: datetime, beginning of the period to query. + stop_date: datetime, end of the period to query. + if None, now() is used. + decimate: aggregation function to use in the form: + {'timedelta0':(MIN, MAX, …) + , 'timedelta1':(AVG, COUNT, …) + , …} + if None, returns raw data. correlate: if True, data is generated so that there is available data for each timestamp of each attribute. From 3032b5bf35baba636134897763a646ffdbf404fc Mon Sep 17 00:00:00 2001 From: srubio Date: Wed, 15 Sep 2021 00:10:15 +0200 Subject: [PATCH 4/7] solve errors, added main method --- share/AbstractReader.py | 112 +++++++++++++++++++++++++++++++++------- 1 file changed, 93 insertions(+), 19 deletions(-) diff --git a/share/AbstractReader.py b/share/AbstractReader.py index 1edc1ee..b98182c 100644 --- a/share/AbstractReader.py +++ b/share/AbstractReader.py @@ -1,19 +1,21 @@ -class Aggregate(Enum): +from enum import Enum + +class Aggregator(Enum): """ Enum to describe aggregation method to use. Note that this aggregation functions should be supported at the backend level. """ - COUNT - COUNT_ERRORS - COUNT_NAN - FIRST - LAST - MIN - MAX - AVG - STD_DEV + COUNT = 1 + COUNT_ERRORS = 2 + COUNT_NAN = 3 + FIRST = 4 + LAST = 5 + MIN = 6 + MAX = 7 + AVG = 8 + STD_DEV = 9 class AbstractReader(object): @@ -23,15 +25,18 @@ class AbstractReader(object): e.g. TimeDBReader(AbstractReader) """ - def __init__(self, config='',...): + def __init__(self, config='',**kwargs): ''' Config can be an string like user:passwd@host - or a json-like dictionary "{'user':'...','passwd':'...'}" + or a json-like dictionary "{'user':'...','password':'...','database':}" ''' - self.db = YourDb(config) + try: + self.db = YourDb(**(config or kwargs)) + except: + raise Exception('WrongDatabaseConfig') return - def get_connection(): + def get_connection(self): """ Return the connection object to avoid a client to open one for custom queries. @@ -39,13 +44,13 @@ def get_connection(): """ return self.db - def get_attributes(self, active=False, regexp=''): + def get_attributes(self, active=False, pattern=''): """ Queries the database for the current list of archived attributes. arguments: active: True: only attributes currently archived False: all attributes, even the one not archiving anymore - regexp: '' :filter for attributes to retrieve + pattern: '' :filter for attributes to retrieve """ return list() @@ -70,14 +75,15 @@ def get_last_attribute_value(self, attribute): (epoch, r_value, w_value, quality, error_desc) """ - return get_last_attributes_values((attribute))[attribute] + return self.get_last_attributes_values([attribute])[attribute] - def get_last_attributes_values(self, attributes): + def get_last_attributes_values(self, attributes, columns = 'time, r_value'): """ Returns last values inserted in DB for a list of attributes arguments: attribute: fqdn for the attribute. + columns: requested columns separated by commas returns: {'att1':(epoch, r_value, w_value, quality, error_desc), 'att2':(epoch, r_value, w_value, quality, error_desc), @@ -109,12 +115,13 @@ def get_attribute_values(self, attribute, (epoch1, r_value, w_value, quality, error_desc), ... ] """ - return get_attributes_values((attribute), start_date, stop_date, decimate, False, params)[attribute] + return self.get_attributes_values([attribute], start_date, stop_date, decimate, False, params)[attribute] def get_attributes_values(self, attributes, start_date, stop_date=None, decimate=None, correlate = False, + columns = 'time, r_value', **params): """ Returns attributes values between start and stop dates @@ -133,6 +140,8 @@ def get_attributes_values(self, attributes, correlate: if True, data is generated so that there is available data for each timestamp of each attribute. + columns: columns separated by commas + time, r_value, w_value, quality, error_desc returns: {'attr0':[(epoch0, r_value, w_value, quality, error_desc), @@ -142,3 +151,68 @@ def get_attributes_values(self, attributes, """ return {'attr0': [(time.time(), 0., 0., 0, '')] , 'attr1': [(time.time(), 0., 0., 0, '')]} + +############################################################################### + +__usage__ = """ +Usage: + +:> reader : print this help + +:> reader [options] list [pattern] : + returns matching attributes from database + +:> reader [options] : + print last value for attribute + +:> reader [options] : + returns values for attribute between given dates + +Options (at least some is needed): + --prompt + --config=user:password@host:port/database + --database= + --host= + --port= + --user= + --password= + +""" + +def main(apiclass=AbstractReader,timeformatter=None): + import sys + + args = [a for a in sys.argv[1:] if not a.startswith('-')] + opts = dict([a.strip('-').split('=') for a in sys.argv[1:] + if a not in args and '=' in a]) + if '--prompt' in sys.argv: + opts['host'] = input('host:') + opts['database'] = input('database:') + opts['user'] = input('user:') + opts['password'] = input('password:') + opts['port'] = input('port(3306):') or '3306' + + if not args or not opts: + print(__usage__) + sys.exit(0) + + reader = apiclass(**opts) + if args[0] == 'list': + pattern = (args[1:] or [''])[0] + print('\n'.join(reader.get_attributes(pattern=pattern))) + else: + if args[1:]: + data = reader.get_attribute_values(args[0],args[1],args[2], + decimate=True) + for d in data: + l = '\t'.join(map(str,d)) + if timeformatter: + print('%s\t%s' % (timeformatter(d[0]),l)) + else: + print(l) + else: + print(reader.get_attribute_id_table(args[0])) + print(reader.get_last_attribute_value(args[0])) + +if __name__ == '__main__': + main() From 78dfa6eff6996a8fe1b4e75b6f0a8faabbe59ad0 Mon Sep 17 00:00:00 2001 From: srubio Date: Wed, 15 Sep 2021 00:10:34 +0200 Subject: [PATCH 5/7] add time conversion methods from fandango --- share/timeutils.py | 227 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 share/timeutils.py diff --git a/share/timeutils.py b/share/timeutils.py new file mode 100644 index 0000000..d40ce9d --- /dev/null +++ b/share/timeutils.py @@ -0,0 +1,227 @@ +######################################################################## +## Time conversion methods from Fandango +######################################################################## + +import time, datetime, re, traceback + +END_OF_TIME = 1024*1024*1024*2-1 #Jan 19 04:14:07 2038 + +TIME_UNITS = { 'ns': 1e-9, 'us': 1e-6, 'ms': 1e-3, '': 1, 's': 1, 'm': 60, + 'h': 3600, 'd': 86.4e3, 'w': 604.8e3, 'M': 30*86.4e3, 'y': 31.536e6 } +TIME_UNITS.update((k.upper(),v) for k,v in list(TIME_UNITS.items()) if k!='m') + +#@todo: RAW_TIME should be capable to parse durations as of ISO 8601 +RAW_TIME = ('^(?:P)?([+-]?[0-9]+[.]?(?:[0-9]+)?)(?: )?(%s)$' + % ('|').join(TIME_UNITS)) # e.g. 3600.5 s + +MYSQL_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' +ISO_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' + +global DEFAULT_TIME_FORMAT +DEFAULT_TIME_FORMAT = MYSQL_TIME_FORMAT + +ALT_TIME_FORMATS = [ ('%s%s%s' % ( + date.replace('-',dash),separator if hour else '',hour)) + for date in ('%Y-%m-%d','%y-%m-%d','%d-%m-%Y', + '%d-%m-%y','%m-%d-%Y','%m-%d-%y') + for dash in ('-','/') + for separator in (' ','T') + for hour in ('%H:%M','%H:%M:%S','%H','')] + +def set_default_time_format(dtf, test = True): + """ + Usages: + + fandango.set_default_time_format('%Y-%m-%d %H:%M:%S') + + or + + fandango.set_default_time_format(fandango.ISO_TIME_FORMAT) + + """ + if test: + str2time(time2str(cad = dtf), cad = dtf) + global DEFAULT_TIME_FORMAT + DEFAULT_TIME_FORMAT = dtf + +def now(): + return time.time() + +def time2tuple(epoch=None, utc=False): + if epoch is None: epoch = now() + elif epoch<0: epoch = now()-epoch + if utc: + return time.gmtime(epoch) + else: + return time.localtime(epoch) + +def tuple2time(tup): + return time.mktime(tup) + +def date2time(date,us=True): + """ + This method would accept both timetuple and timedelta + in order to deal with times coming from different + api's with a single method + """ + try: + t = tuple2time(date.timetuple()) + us = us and getattr(date,'microsecond',0) + if us: t+=us*1e-6 + return t + except Exception as e: + try: + return date.total_seconds() + except: + raise e + +def date2str(date, cad = '', us=False): + #return time.ctime(date2time(date)) + global DEFAULT_TIME_FORMAT + cad = cad or DEFAULT_TIME_FORMAT + t = time.strftime(cad, time2tuple(date2time(date))) + us = us and getattr(date,'microsecond',0) + if us: t+='.%06d'%us + return t + +def time2date(epoch=None): + if epoch is None: epoch = now() + elif epoch<0: epoch = now()-epoch + return datetime.datetime.fromtimestamp(epoch) + +def utcdiff(t=None): + return now() - date2time(datetime.datetime.utcnow()) + +def time2str(epoch=None, cad='', us=False, bt=True, + utc=False, iso=False): + """ + cad: introduce your own custom format (see below) + use DEFAULT_TIME_FORMAT to set a default one + us=False; True to introduce ms precission + bt=True; negative epochs are considered relative from now + utc=False; if True it converts to UTC + iso=False; if True, 'T' will be used to separate date and time + + cad accepts the following formats: + + %a Locale’s abbreviated weekday name. + %A Locale’s full weekday name. + %b Locale’s abbreviated month name. + %B Locale’s full month name. + %c Locale’s appropriate date and time representation. + %d Day of the month as a decimal number [01,31]. + %H Hour (24-hour clock) as a decimal number [00,23]. + %I Hour (12-hour clock) as a decimal number [01,12]. + %j Day of the year as a decimal number [001,366]. + %m Month as a decimal number [01,12]. + %M Minute as a decimal number [00,59]. + %p Locale’s equivalent of either AM or PM. (1) + %S Second as a decimal number [00,61]. (2) + %U Week number of the year (Sunday as the first day of the week) as a decimal number [00,53]. + All days in a new year preceding the first Sunday are considered to be in week 0. (3) + %w Weekday as a decimal number [0(Sunday),6]. + %W Week number of the year (Monday as the first day of the week) as a decimal number [00,53]. + All days in a new year preceding the first Monday are considered to be in week 0. (3) + %x Locale’s appropriate date representation. + %X Locale’s appropriate time representation. + %y Year without century as a decimal number [00,99]. + %Y Year with century as a decimal number. + %Z Time zone name (no characters if no time zone exists). + %% A literal '%' character. + """ + if epoch is None: epoch = now() + elif bt and epoch<0: epoch = now()+epoch + global DEFAULT_TIME_FORMAT + if cad: + cad = 'T'.join(cad.split(' ',1)) if iso else cad + else: + cad = ISO_TIME_FORMAT if iso else DEFAULT_TIME_FORMAT + + t = time.strftime(cad,time2tuple(epoch,utc=utc)) + us = us and epoch%1 + if us: t+='.%06d'%(1e6*us) + return t + +epoch2str = time2str + +def str2time(seq='', cad='', throw=True, relative=False): + """ + :param seq: Date must be in ((Y-m-d|d/m/Y) (H:M[:S]?)) format or -N [d/m/y/s/h] + + See RAW_TIME and TIME_UNITS to see the units used for pattern matching. + + The conversion itself is done by time.strptime method. + + :param cad: You can pass a custom time format + :param relative: negative times will be converted to now()-time + :param throw: if False, None is returned instead of exception + """ + try: + if seq in (None,''): + return time.time() + if 'NOW-' in seq: + seq,relative = seq.replace('NOW',''),True + elif seq=='NOW': + return now() + + t, seq = None, str(seq).strip() + if not cad: + m = re.match(RAW_TIME,seq) + if m: + #Converting from a time(unit) format + value,unit = m.groups() + t = float(value)*TIME_UNITS[unit] + return t # must return here + + #Converting from a date format + ms = re.match('.*(\.[0-9]+)$',seq) #Splitting the decimal part + if ms: + ms,seq = float(ms.groups()[0]),seq.replace(ms.groups()[0],'') + + if t is None: + #tf=None will try default system format + global DEFAULT_TIME_FORMAT + time_fmts = ([cad] if cad else + [DEFAULT_TIME_FORMAT,None] + ALT_TIME_FORMATS) + for tf in time_fmts: + try: + tf = (tf,) if tf else () + t = time.strptime(seq,*tf) + break + except: + pass + + v = time.mktime(t)+(ms or 0) + if relative and v<0: + v = fn.now()-v + return v + except: + if throw: + raise Exception('PARAMS_ERROR','unknown time format: %s' % seq) + else: + return None + + +str2epoch = str2time + +def time2gmt(epoch=None): + if epoch is None: epoch = now() + return tuple2time(time.gmtime(epoch)) + +def timezone(): + t = now() + return old_div(int(t-time2gmt(t)),3600) + +#Auxiliary methods: +def ctime2time(time_struct): + try: + return (float(time_struct.tv_sec)+1e-6*float(time_struct.tv_usec)) + except: + return -1 + +def mysql2time(mysql_time,us=True): + try: + return date2time(mysql_time,us=us) + #t = time.mktime(mysql_time.timetuple()) + except: + return -1 From 843b0257105b68b55a3cce612b6fd740dec063d8 Mon Sep 17 00:00:00 2001 From: srubio Date: Wed, 15 Sep 2021 00:10:56 +0200 Subject: [PATCH 6/7] add MariaDB implementation of AbstractReader --- share/mariadb_reader.py | 303 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 303 insertions(+) create mode 100755 share/mariadb_reader.py diff --git a/share/mariadb_reader.py b/share/mariadb_reader.py new file mode 100755 index 0000000..96a2852 --- /dev/null +++ b/share/mariadb_reader.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 + +import sys, re, traceback +from timeutils import * +import AbstractReader + +try: + import pymysql as mariadb +except: + import MySQLdb as mariadb + + + +class MariadbReader(AbstractReader.AbstractReader): + """ + read-only API for hdb++ databases, based on PyTangoArchiving AbstractReader + """ + + def __init__(self,config='',**kwargs): + """ + Arguments accepted by pymysql connections: + + :param host: Host where the database server is located + :param user: Username to log in as + :param password: Password to use. + :param database: Database to use, None to not use a particular one. + :param port: MySQL port to use, default is usually OK. (default: 3306) + :param bind_address: When the client has multiple network interfaces, specify + the interface from which to connect to the host. Argument can be + a hostname or an IP address. + :param unix_socket: Optionally, you can use a unix socket rather than TCP/IP. + :param read_timeout: The timeout for reading from the connection in seconds (default: None - no timeout) + :param write_timeout: The timeout for writing to the connection in seconds (default: None - no timeout) + :param charset: Charset you want to use. + :param sql_mode: Default SQL_MODE to use. + :param read_default_file: + Specifies my.cnf file to read these parameters from under the [client] section. + :param conv: + Conversion dictionary to use instead of the default one. + This is used to provide custom marshalling and unmarshaling of types. + See converters. + :param use_unicode: + Whether or not to default to unicode strings. + This option defaults to true for Py3k. + :param client_flag: Custom flags to send to MySQL. Find potential values in constants.CLIENT. + :param cursorclass: Custom cursor class to use. + :param init_command: Initial SQL statement to run when connection is established. + :param connect_timeout: Timeout before throwing an exception when connecting. + (default: 10, min: 1, max: 31536000) + :param ssl: + A dict of arguments similar to mysql_ssl_set()'s parameters. + :param read_default_group: Group to read from in the configuration file. + :param compress: Not supported + :param named_pipe: Not supported + :param autocommit: Autocommit mode. None means use server default. (default: False) + :param local_infile: Boolean to enable the use of LOAD DATA LOCAL command. (default: False) + :param max_allowed_packet: Max size of packet sent to server in bytes. (default: 16MB) + Only used to limit size of "LOAD LOCAL INFILE" data packet smaller than default (16KB). + :param defer_connect: Don't explicitly connect on contruction - wait for connect call. + (default: False) + :param auth_plugin_map: A dict of plugin names to a class that processes that plugin. + The class will take the Connection object as the argument to the constructor. + The class needs an authenticate method taking an authentication packet as + an argument. For the dialog plugin, a prompt(echo, prompt) method can be used + (if no authenticate method) for returning a string from the user. (experimental) + :param server_public_key: SHA256 authenticaiton plugin public key value. (default: None) + :param db: Alias for database. (for compatibility to MySQLdb) + :param passwd: Alias for password. (for compatibility to MySQLdb) + :param binary_prefix: Add _binary prefix on bytes and bytearray. (default: False) + """ + if config and isinstance(config,(str,bytes)): + config = self.parse_config(config) + + + self.config = config or {} + self.config.update(kwargs) + + self.database = self.config.get('database','hdbpp') + self.user = self.config.get('user','') + self.password = self.config.get('password','') + self.port = int(self.config.get('port','3306')) + self.host = self.config.get('host','localhost') + + #print([(k,v) for k,v in self.__dict__.items() + #if k not in type(self).__dict__()]) + + self.db = mariadb.connect(database=self.database, + user=self.user, password=self.password, port=self.port, + host=self.host) + self._cursor = self.db.cursor() + + def __del__(self): + self._cursor.close() + self.db.close() + + def _query(self,query,prune=False): + """ + query: SQL code + """ + #print(query) + self._cursor.execute(query) + if prune: + r,l = [],True + while l: + try: + l = self._cursor.fetchone() + if l and (not r or l[1:] != r[-1][1:]): + r.append(l) + except: + print(r[-1:], l) + traceback.print_exc() + break + return r + else: + return self._cursor.fetchall() + + def parse_config(self,config): + """ + config string as user:password@host:port/database + or dictionary like + """ + try: + if re.match('.*[:].*[@].*',config): + h = config.split('@') + u,p = h[0].split(':') + config = {'user':u,'password':p} + if '/' in h[1]: + config['host'],config['database'] = h[1].split('/') + else: + config['host'] = h[1] + if ':' in config['host']: + config['host'],config['port'] = config['host'].split(':') + else: + if '{' not in config: + config = '{%s}' % config.replace(';',',') + config = dict(eval(config)) + except: + raise Exception('Wrong format in config, should be dict-like') + return config + + def get_attributes(self, active=False, pattern=''): + """ + Queries the database for the current list of archived attributes. + arguments: + active: True: only attributes currently archived + False: all attributes, even the one not archiving anymore + regexp: '' :filter for attributes to retrieve + """ + q = 'select att_name from att_conf' + if pattern: + q += " where att_name like '%s'" % pattern.replace('*','%') + print(q) + return [str(a[0]).lower() for a in self._query(q) if a] + + def get_attribute_name(self,attribute): + attribute = str(attribute).lower() + if ':' not in attribute: + attribute = '%' + '/' + attribute + + elif '.' not in attribute: + attribute = attribute.rsplit(':',1) + attribute = attribute[0] + '.%' + attribute[1] + + if 'tango' not in attribute: + attribute = '%' + '/' + attribute + + attrs = self.get_attributes(pattern=attribute) + if len(attrs)!=1: + raise Exception('MultipleAttributeMatches') + + return attrs[0] if attrs else '' + + def is_attribute_archived(self, attribute, active=False): + """ + Returns if an attribute has values in DB. + + arguments: + attribute: fqdn for the attribute. + active: if true, only check for active attributes, + otherwise check all. + """ + return bool(self.get_attribute_name(attribute)) + + def get_attribute_id_table(self, attribute=''): + """ + for each matching attribute returns name, ID and table name + """ + q = "select att_name,att_conf_id,data_type " + q += " from att_conf, att_conf_data_type where " + q += "att_conf.att_conf_data_type_id = att_conf_data_type.att_conf_data_type_id" + if attribute: + q += " and att_name like '%s'" % attribute + + return [(a,i,'att_'+t) for (a,i,t) in self._query(q)] + + def get_last_attributes_values(self, attributes, columns = '', n = 1): + """ + Returns last values inserted in DB for a list of attributes + + arguments: + attribute: fqdn for the attribute. + columns: requested columns separated by commas + returns: + {'att1':(epoch, r_value, w_value, quality, error_desc), + 'att2':(epoch, r_value, w_value, quality, error_desc), + ... + } + """ + data = {} + columns = columns or 'data_time, value_r, quality, att_error_desc_id' + + for a in attributes: + try: + a,i,t = self.get_attribute_id_table(a)[0] + tdesc = str(self._query('describe %s'%t)) + tcol = ('int_time' if 'int_time' in tdesc else 'data_time') + cols = ','.join(c for c in columns.split(',') + if c.strip() in tdesc) + data[a] = self._query('select %s from %s where ' + 'att_conf_id = %s order by %s desc limit %s' + % (cols, t, i, tcol, n)) + except: + raise Exception('AttributeNotFound: %s' % a) + + return data + + def get_attributes_values(self, attributes, + start_date, stop_date=None, + decimate=None, + correlate = False, + columns = '', + **params): + """ + Returns attributes values between start and stop dates + , using decimation or not, correlating the values or not. + + arguments: + attributes: a list of the attributes' fqdn + start_date: datetime, beginning of the period to query. + stop_date: datetime, end of the period to query. + if None, now() is used. + decimate: aggregation function to use in the form: + {'timedelta0':(MIN, MAX, …) + , 'timedelta1':(AVG, COUNT, …) + , …} + if None, returns raw data. + correlate: if True, data is generated so that + there is available data for each timestamp of + each attribute. + columns: columns separated by commas + time, r_value, w_value, quality, error_desc + + returns: + {'attr0':[(epoch0, r_value, w_value, quality, error_desc), + (epoch1, r_value, w_value, quality, error_desc), + ... ], + 'attr1':[(…),(…)]} + """ + data = {} + columns = columns or 'data_time, value_r, quality, att_error_desc_id' + if isinstance(start_date,(int,float)): + start_date = time2str(start_date) + if stop_date is None: + stop_date = now() + if isinstance(stop_date,(int,float)): + stop_date = time2str(stop_date) + + for a in attributes: + try: + a,i,t = self.get_attribute_id_table(a)[0] + tdesc = str(self._query('describe %s'%t)) + tcol = ('int_time' if 'int_time' in tdesc else 'data_time') + if tcol == 'int_time': + b,e = str2time(start_date),str2time(stop_date) + else: + b,e = "'%s'" % start_date, "'%s'" % stop_date + + cols = ','.join(c for c in columns.split(',') + if c.strip() in tdesc) + print(cols) + if 'data_time,' in cols: + cols = cols.replace('data_time,', + 'CAST(UNIX_TIMESTAMP(data_time) AS DOUBLE),') + data[a] = self._query('select %s from %s where ' + 'att_conf_id = %s and %s between %s and %s ' + 'order by data_time' + % (cols, t, i, tcol, b, e), prune=decimate) + except: + import traceback + traceback.print_exc() + #raise Exception('AttributeNotFound: %s' % a) + + return data + + return {'attr0': [(time.time(), 0., 0., 0, '')] + , 'attr1': [(time.time(), 0., 0., 0, '')]} + + +############################################################################## + +if __name__ == '__main__': + AbstractReader.main(apiclass=MariadbReader,timeformatter=time2str) + From 2afcf2e140c86362663e8b053e7feabc41b088f9 Mon Sep 17 00:00:00 2001 From: Sergi Date: Wed, 15 Sep 2021 15:52:15 +0200 Subject: [PATCH 7/7] remove print --- share/mariadb_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/share/mariadb_reader.py b/share/mariadb_reader.py index 96a2852..a291f32 100755 --- a/share/mariadb_reader.py +++ b/share/mariadb_reader.py @@ -149,7 +149,7 @@ def get_attributes(self, active=False, pattern=''): q = 'select att_name from att_conf' if pattern: q += " where att_name like '%s'" % pattern.replace('*','%') - print(q) + #print(q) return [str(a[0]).lower() for a in self._query(q) if a] def get_attribute_name(self,attribute):