Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 114 additions & 1 deletion caso/messenger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,78 @@
"""Module containing the base class and manager for the cASO messengers."""

import abc
import typing

from oslo_config import cfg
from oslo_log import log
import six

import caso.record
from caso import loading

CONF = cfg.CONF

LOG = log.getLogger(__name__)

# Valid record types that can be configured
VALID_RECORD_TYPES = frozenset(["cloud", "ip", "accelerator", "storage", "energy"])

# Default record types for SSM messenger (records from default extractors)
# Default extractors are: nova, cinder, neutron
# nova -> CloudRecord, AcceleratorRecord
# cinder -> StorageRecord
# neutron -> IPRecord
DEFAULT_SSM_RECORD_TYPES = ["cloud", "ip", "accelerator", "storage"]


def get_messenger_opts(messenger_name: str) -> typing.List[cfg.Opt]:
"""Get the configuration options for a specific messenger.

:param messenger_name: Name of the messenger.
:returns: List of configuration options for the messenger.
"""
# SSM messengers have a different default (only records from default extractors)
if messenger_name in ("ssm", "ssmv4"):
default_record_types = DEFAULT_SSM_RECORD_TYPES
else:
default_record_types = []

return [
cfg.ListOpt(
"record_types",
default=default_record_types,
help="List of record types to publish to this messenger. "
"Valid values are: cloud, ip, accelerator, storage, energy. "
"If empty, all record types will be published. "
f"Default for {messenger_name}: "
f"{default_record_types if default_record_types else 'all record types'}.",
),
]


def register_messenger_opts(messenger_names: typing.Optional[typing.List[str]] = None):
"""Register configuration options for the specified messengers.

:param messenger_names: List of messenger names to register options for.
If None, registers options for all available messengers.
"""
if messenger_names is None:
messenger_names = list(loading.get_available_messenger_names())

for messenger_name in messenger_names:
group_name = f"messenger_{messenger_name}"
CONF.register_opts(get_messenger_opts(messenger_name), group=group_name)


# Mapping from record type names to record classes
RECORD_TYPE_MAP: typing.Dict[str, type] = {
"cloud": caso.record.CloudRecord,
"ip": caso.record.IPRecord,
"accelerator": caso.record.AcceleratorRecord,
"storage": caso.record.StorageRecord,
"energy": caso.record.EnergyRecord,
}


@six.add_metaclass(abc.ABCMeta)
class BaseMessenger(object):
Expand All @@ -38,22 +99,74 @@ def push(self, records):
"""Push the records."""


def _filter_records(
records: typing.List,
record_types: typing.Optional[typing.List[str]],
) -> typing.List:
"""Filter records based on allowed record types.

:param records: List of records to filter.
:param record_types: List of allowed record type names. If None or empty,
all records are returned.
:returns: Filtered list of records.
"""
if not record_types:
return records

allowed_classes = tuple(
RECORD_TYPE_MAP[rt] for rt in record_types if rt in RECORD_TYPE_MAP
)
if not allowed_classes:
return records

return [r for r in records if isinstance(r, allowed_classes)]


class Manager(object):
"""Manager for all cASO messengers."""

def __init__(self):
"""Init the manager with all the configured messengers."""
# Register messenger options for all configured messengers
register_messenger_opts(CONF.messengers)

try:
self.mgr = loading.get_enabled_messengers(CONF.messengers)
except Exception as e:
# Capture exception so that we can continue working
LOG.error(e)
raise e

# Build mapping of messenger name to allowed record types
self.messenger_record_types: typing.Dict[
str, typing.Optional[typing.List[str]]
] = {}
for messenger_name in CONF.messengers:
group_name = f"messenger_{messenger_name}"
self.messenger_record_types[messenger_name] = None
if hasattr(CONF, group_name):
group = getattr(CONF, group_name)
if hasattr(group, "record_types"):
record_types = group.record_types
if record_types:
self.messenger_record_types[messenger_name] = list(record_types)

def push_to_all(self, records):
"""Push records to all the configured messengers."""
try:
self.mgr.map_method("push", records)
for ext in self.mgr:
messenger_name = ext.name
record_types = self.messenger_record_types.get(messenger_name)
filtered_records = _filter_records(records, record_types)
if filtered_records:
ext.obj.push(filtered_records)
else:
LOG.debug(
"No records to push to messenger %s "
"after filtering for record types: %s",
messenger_name,
record_types,
)
except Exception as e:
# Capture exception so that we can continue working
LOG.error("Something happeneded when pushing records.")
Expand Down
11 changes: 10 additions & 1 deletion caso/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
import caso.extract.openstack.nova
import caso.extract.prometheus
import caso.keystone_client
import caso.loading
import caso.manager
import caso.messenger
import caso.messenger.logstash
import caso.messenger.ssm


def list_opts():
"""Get the list of all configured options."""
return [
opts = [
(
"DEFAULT",
itertools.chain(
Expand All @@ -47,3 +49,10 @@ def list_opts():
("prometheus", caso.extract.prometheus.opts),
("ssm", caso.messenger.ssm.opts),
]

# Add messenger-specific record_types options for all available messengers
for messenger_name in caso.loading.get_available_messenger_names():
group_name = f"messenger_{messenger_name}"
opts.append((group_name, caso.messenger.get_messenger_opts(messenger_name)))

return opts
189 changes: 189 additions & 0 deletions caso/tests/test_messenger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# -*- coding: utf-8 -*-

# Copyright 2014 Spanish National Research Council (CSIC)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Tests for messenger module."""

import caso.messenger
import caso.record


class TestFilterRecords:
"""Test cases for the _filter_records function."""

def test_filter_records_returns_all_when_no_filter(
self, cloud_record, ip_record, storage_record
):
"""Test that all records are returned when no filter is specified."""
records = [cloud_record, ip_record, storage_record]
result = caso.messenger._filter_records(records, None)
assert result == records

def test_filter_records_returns_all_when_empty_filter(
self, cloud_record, ip_record, storage_record
):
"""Test that all records are returned when filter list is empty."""
records = [cloud_record, ip_record, storage_record]
result = caso.messenger._filter_records(records, [])
assert result == records

def test_filter_records_filters_by_cloud(
self, cloud_record, ip_record, storage_record
):
"""Test filtering for cloud records only."""
records = [cloud_record, ip_record, storage_record]
result = caso.messenger._filter_records(records, ["cloud"])
assert len(result) == 1
assert isinstance(result[0], caso.record.CloudRecord)

def test_filter_records_filters_by_ip(
self, cloud_record, ip_record, storage_record
):
"""Test filtering for IP records only."""
records = [cloud_record, ip_record, storage_record]
result = caso.messenger._filter_records(records, ["ip"])
assert len(result) == 1
assert isinstance(result[0], caso.record.IPRecord)

def test_filter_records_filters_by_storage(
self, cloud_record, ip_record, storage_record
):
"""Test filtering for storage records only."""
records = [cloud_record, ip_record, storage_record]
result = caso.messenger._filter_records(records, ["storage"])
assert len(result) == 1
assert isinstance(result[0], caso.record.StorageRecord)

def test_filter_records_filters_by_multiple_types(
self, cloud_record, ip_record, storage_record
):
"""Test filtering for multiple record types."""
records = [cloud_record, ip_record, storage_record]
result = caso.messenger._filter_records(records, ["cloud", "ip"])
assert len(result) == 2
assert any(isinstance(r, caso.record.CloudRecord) for r in result)
assert any(isinstance(r, caso.record.IPRecord) for r in result)

def test_filter_records_handles_accelerator(self, accelerator_record):
"""Test filtering for accelerator records."""
records = [accelerator_record]
result = caso.messenger._filter_records(records, ["accelerator"])
assert len(result) == 1
assert isinstance(result[0], caso.record.AcceleratorRecord)

def test_filter_records_handles_energy(self, energy_record):
"""Test filtering for energy records."""
records = [energy_record]
result = caso.messenger._filter_records(records, ["energy"])
assert len(result) == 1
assert isinstance(result[0], caso.record.EnergyRecord)

def test_filter_records_returns_empty_when_no_match(self, cloud_record, ip_record):
"""Test that empty list is returned when no records match filter."""
records = [cloud_record, ip_record]
result = caso.messenger._filter_records(records, ["storage", "energy"])
assert result == []

def test_filter_records_ignores_invalid_types(self, cloud_record, ip_record):
"""Test that invalid record types are ignored in the filter."""
records = [cloud_record, ip_record]
result = caso.messenger._filter_records(records, ["invalid_type"])
# When all filter types are invalid, return all records
assert result == records

def test_filter_records_mixed_valid_invalid_types(
self, cloud_record, ip_record, storage_record
):
"""Test filtering with mix of valid and invalid types."""
records = [cloud_record, ip_record, storage_record]
result = caso.messenger._filter_records(records, ["cloud", "invalid_type"])
assert len(result) == 1
assert isinstance(result[0], caso.record.CloudRecord)


class TestGetMessengerOpts:
"""Test cases for the get_messenger_opts function."""

def test_ssm_messenger_has_default_record_types(self):
"""Test that SSM messenger has default record types."""
opts = caso.messenger.get_messenger_opts("ssm")
assert len(opts) == 1
opt = opts[0]
assert opt.name == "record_types"
assert opt.default == caso.messenger.DEFAULT_SSM_RECORD_TYPES

def test_ssmv4_messenger_has_default_record_types(self):
"""Test that SSMv4 messenger has default record types."""
opts = caso.messenger.get_messenger_opts("ssmv4")
assert len(opts) == 1
opt = opts[0]
assert opt.name == "record_types"
assert opt.default == caso.messenger.DEFAULT_SSM_RECORD_TYPES

def test_noop_messenger_has_empty_default(self):
"""Test that noop messenger has empty default (all record types)."""
opts = caso.messenger.get_messenger_opts("noop")
assert len(opts) == 1
opt = opts[0]
assert opt.name == "record_types"
assert opt.default == []

def test_logstash_messenger_has_empty_default(self):
"""Test that logstash messenger has empty default (all record types)."""
opts = caso.messenger.get_messenger_opts("logstash")
assert len(opts) == 1
opt = opts[0]
assert opt.name == "record_types"
assert opt.default == []


class TestDefaultSsmRecordTypes:
"""Test cases for default SSM record types."""

def test_default_ssm_record_types_includes_cloud(self):
"""Test that default SSM record types includes cloud."""
assert "cloud" in caso.messenger.DEFAULT_SSM_RECORD_TYPES

def test_default_ssm_record_types_includes_ip(self):
"""Test that default SSM record types includes ip."""
assert "ip" in caso.messenger.DEFAULT_SSM_RECORD_TYPES

def test_default_ssm_record_types_includes_accelerator(self):
"""Test that default SSM record types includes accelerator."""
assert "accelerator" in caso.messenger.DEFAULT_SSM_RECORD_TYPES

def test_default_ssm_record_types_includes_storage(self):
"""Test that default SSM record types includes storage."""
assert "storage" in caso.messenger.DEFAULT_SSM_RECORD_TYPES

def test_default_ssm_record_types_excludes_energy(self):
"""Test that default SSM record types excludes energy."""
assert "energy" not in caso.messenger.DEFAULT_SSM_RECORD_TYPES


class TestValidRecordTypes:
"""Test cases for VALID_RECORD_TYPES constant."""

def test_valid_record_types_contains_all_types(self):
"""Test that VALID_RECORD_TYPES contains all expected types."""
expected = {"cloud", "ip", "accelerator", "storage", "energy"}
assert caso.messenger.VALID_RECORD_TYPES == expected

def test_record_type_map_matches_valid_record_types(self):
"""Test that RECORD_TYPE_MAP keys match VALID_RECORD_TYPES."""
assert (
set(caso.messenger.RECORD_TYPE_MAP.keys())
== caso.messenger.VALID_RECORD_TYPES
)
Loading