From 804eff6ed7543068a3e5e918ae61c253b4efb700 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 28 Nov 2025 16:24:05 +0100 Subject: [PATCH] feat(?): attempt to model stream of measures as Multi --- .../nuprocess/EmitterProcessHandler.java | 80 +++++++++++++++++++ .../power/sensors/AbstractPowerSensor.java | 43 +++++++--- .../power/sensors/PowerSensor.java | 12 +-- .../power/sensors/SamplingMeasurer.java | 15 +--- .../sensors/linux/rapl/IntelRAPLSensor.java | 21 +++-- .../FileMacOSPowermetricsSensor.java | 10 ++- .../powermetrics/MacOSPowermetricsSensor.java | 23 +++--- .../macos/powermetrics/NuProcessWrapper.java | 59 +++++--------- .../powermetrics/PowerMetricsParser.java | 1 + .../ProcessMacOSPowermetricsSensor.java | 32 +++++--- .../ResourceMacOSPowermetricsSensor.java | 13 ++- .../power/sensors/test/TestPowerSensor.java | 11 +-- .../linux/rapl/IntelRAPLSensorTest.java | 1 + .../MacOSPowermetricsSensorTest.java | 8 +- 14 files changed, 212 insertions(+), 117 deletions(-) create mode 100644 backend/src/main/java/net/laprun/sustainability/power/nuprocess/EmitterProcessHandler.java diff --git a/backend/src/main/java/net/laprun/sustainability/power/nuprocess/EmitterProcessHandler.java b/backend/src/main/java/net/laprun/sustainability/power/nuprocess/EmitterProcessHandler.java new file mode 100644 index 00000000..4bdb7806 --- /dev/null +++ b/backend/src/main/java/net/laprun/sustainability/power/nuprocess/EmitterProcessHandler.java @@ -0,0 +1,80 @@ +package net.laprun.sustainability.power.nuprocess; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +import com.zaxxer.nuprocess.NuProcess; + +import io.quarkus.logging.Log; +import io.smallrye.mutiny.subscription.MultiEmitter; +import net.laprun.sustainability.power.sensors.macos.powermetrics.ProcessWrapper; + +public class EmitterProcessHandler extends BaseProcessHandler { + private final boolean debug = true; + private MultiEmitter emitter; + private long start; + private short ignoreCount = 2; + + public EmitterProcessHandler(long periodInMilliSecondsAsString) { + super(ProcessWrapper.preparePowermetricsCommandVarArgs("cpu_power,tasks", + "--show-process-samp-norm", "-b", "65536", "--show-process-gpu", "-i", + "" + periodInMilliSecondsAsString)); + } + + public void setEmitter(MultiEmitter emitter) { + this.emitter = emitter; + } + + @Override + public void onPreStart(NuProcess nuProcess) { + start = System.currentTimeMillis(); + super.onPreStart(nuProcess); + } + + @Override + public void onStdout(ByteBuffer buffer, boolean closed) { + if (ignoreCount-- > 0) { + return; + } + final var now = System.currentTimeMillis(); + if (buffer.hasRemaining() && !closed) { + final var remaining = buffer.remaining(); + + final var readOnlyBuffer = buffer.asReadOnlyBuffer(); + final var first3Chars = new byte[4]; + readOnlyBuffer.get(first3Chars); + Log.infof("1st 4 bytes: '%s'", new String(first3Chars)); + final var last3Chars = new byte[4]; + readOnlyBuffer.get(remaining - 4, last3Chars, 0, 3); + Log.infof("last 4 bytes: '%s'", new String(last3Chars)); + + if (debug) { + var bytes = new byte[remaining]; + buffer.duplicate().get(bytes); + Log.infof("=== read %d after %d:\n\n'%s'\n\n===\n", remaining, now - start, + new String(bytes)); + start = now; + } + final var shouldBuffer = remaining > 16384; + if (!shouldBuffer) { + return; + } + // log("read", remaining); + // stdOutBuffer.put(buffer); + // Log.warnf("=== read %s", new String(stdOutBuffer.array())); + //emitter.emit(new ByteArrayInputStream(bytes)); + // stdOutBuffer.clear(); + } + + if (closed) { + log("closed", 0); + //emitter.complete(); + } + } + + private void log(String op, int remaining) { + final var end = System.currentTimeMillis(); + Log.infof("%s after %dms, size: %d", op, end - start, remaining); + start = end; + } +} diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/AbstractPowerSensor.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/AbstractPowerSensor.java index a9960117..4e2d3846 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/AbstractPowerSensor.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/AbstractPowerSensor.java @@ -5,13 +5,16 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import io.quarkus.logging.Log; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.infrastructure.Infrastructure; import net.laprun.sustainability.power.SensorMetadata; -public abstract class AbstractPowerSensor implements PowerSensor { - protected final Measures measures = new MapMeasures(); +public abstract class AbstractPowerSensor implements PowerSensor { private final PIDRegistry registry = new PIDRegistry(); + private final Measures current = new MapMeasures(); private long lastUpdateEpoch; private boolean started; + private Multi measures; @ConfigProperty(name = "power-server.enable-cpu-share-sampling", defaultValue = "false") protected boolean cpuSharesEnabled; private SensorMetadata metadata; @@ -65,16 +68,21 @@ public Set registeredPIDsAsStrings() { } @Override - public void start() throws Exception { + public Multi start() throws Exception { if (!started) { lastUpdateEpoch = System.currentTimeMillis(); started = true; - doStart(); + measures = doStart() + .broadcast() + .withCancellationAfterLastSubscriberDeparture() + .toAtLeast(1) + .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) + .onItem() + .transform(this::update); } + return measures; } - protected abstract void doStart(); - @Override public boolean isStarted() { return started; @@ -82,21 +90,30 @@ public boolean isStarted() { @Override public void stop() { - PowerSensor.super.stop(); - started = false; + if (started) { + PowerSensor.super.stop(); + started = false; + } } - public Measures update(long tick) { + public Measures update(T tick) { + // reset revolving measure so that we don't get values for pids that are not tracked anymore + current.clear(); final long newUpdateStartEpoch = System.currentTimeMillis(); - Log.debugf("Sensor update last called: %dms ago", newUpdateStartEpoch - lastUpdateEpoch); - final var measures = doUpdate(lastUpdateEpoch, newUpdateStartEpoch); + Log.infof("Sensor update last called: %dms ago", newUpdateStartEpoch - lastUpdateEpoch); + Log.infof("input %s", tick); + // extract current values into revolving measure + doUpdate(tick, current, lastUpdateEpoch, newUpdateStartEpoch); + Log.infof("Last recorded measure: %s", current); lastUpdateEpoch = newUpdateStartEpoch; - return measures; + return current; } + protected abstract Multi doStart(); + protected long lastUpdateEpoch() { return lastUpdateEpoch; } - abstract protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch); + abstract protected void doUpdate(T tick, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch); } diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/PowerSensor.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/PowerSensor.java index c733362c..09f41fbf 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/PowerSensor.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/PowerSensor.java @@ -2,6 +2,7 @@ import java.util.Set; +import io.smallrye.mutiny.Multi; import net.laprun.sustainability.power.SensorMetadata; /** @@ -52,7 +53,7 @@ default void stop() { * * @throws Exception if the sensor couldn't be started for some reason */ - void start() throws Exception; + Multi start() throws Exception; /** * Registers the provided process identifier (pid) with the sensor in case it can provide per-process measures. For sensors @@ -64,15 +65,6 @@ default void stop() { */ RegisteredPID register(long pid); - /** - * Updates the ongoing {@link Measures} being recorded by this sensor for the given tick - * - * @param tick an ordinal value tracking the number of recorded measures being taken by the sensor since it started - * measuring power consumption - * @return the {@link Measures} object recording the measures this sensor has taken since it started measuring - */ - Measures update(long tick); - /** * Unregisters the specified {@link RegisteredPID} with this sensor thus signaling that clients are not interested in * tracking the consumption of the associated process anymore diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/SamplingMeasurer.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/SamplingMeasurer.java index 2a1f1a89..50a11a9c 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/SamplingMeasurer.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/SamplingMeasurer.java @@ -95,8 +95,8 @@ private void startSamplingIfNeeded() throws Exception { Log.infof("%s sensor adjusted its sampling period to %dms", sensor.getClass().getSimpleName(), adjusted); } - // start sensor - sensor.start(); + // start periodic power sampling, measuring sensor values over the sampling period + final var sensorSamplerMulti = sensor.start(); // manage external CPU share sampling final var overSamplingFactor = 3; @@ -104,7 +104,7 @@ private void startSamplingIfNeeded() throws Exception { final var cpuSharesTicks = Multi.createFrom().ticks() // over sample but over a shorter period to ensure we have an average that covers most of the sampling period .every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor)) - .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()); + .emitOn(Infrastructure.getDefaultWorkerPool()); if (sensor.wantsCPUShareSamplingEnabled()) { // if enabled, record a cpu share for each tick, group by the over sampling factor and average over these aggregates to produce one value for the power measure interval cpuSharesMulti = cpuSharesTicks @@ -132,15 +132,6 @@ private void startSamplingIfNeeded() throws Exception { cpuSharesMulti = cpuSharesTicks.map(unused -> Map.of()); } - // manage periodic power sampling, measuring sensor values over the sampling period - final var sensorSamplerMulti = Multi.createFrom().ticks() - .every(samplingPeriod) - .map(sensor::update) - .broadcast() - .withCancellationAfterLastSubscriberDeparture() - .toAtLeast(1) - .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()); - // combine both multis periodicSensorCheck = Multi.createBy() .combining() diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java index 2e0a6c47..bcb82336 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensor.java @@ -4,10 +4,12 @@ import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.*; import java.util.function.BiConsumer; import io.quarkus.logging.Log; +import io.smallrye.mutiny.Multi; import net.laprun.sustainability.power.SensorMetadata; import net.laprun.sustainability.power.measures.NoDurationSensorMeasure; import net.laprun.sustainability.power.sensors.AbstractPowerSensor; @@ -16,11 +18,12 @@ /** * A sensor using Intel's RAPL accessed via Linux' powercap system. */ -public class IntelRAPLSensor extends AbstractPowerSensor { +public class IntelRAPLSensor extends AbstractPowerSensor { private final RAPLFile[] raplFiles; private final int rawOffset; private SensorMetadata nativeMetadata; private final long[] lastMeasuredSensorValues; + private Duration samplingPeriod; /** * Initializes the RAPL sensor @@ -101,9 +104,17 @@ private static boolean addFileIfReadable(String raplFileAsString, SortedMap doStart() { // perform an initial measure to prime the data readAndRecordSensor(null, lastUpdateEpoch()); + return Multi.createFrom().ticks() + .every(samplingPeriod); } /** @@ -138,7 +149,7 @@ protected SensorMetadata nativeMetadata() { } @Override - protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) { + protected void doUpdate(Long tick, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch) { final var measure = new double[metadata().componentCardinality()]; readAndRecordSensor((value, index) -> { measure[index] = computePowerInMilliWatts(index, value, newUpdateStartEpoch); @@ -147,9 +158,7 @@ protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) { newUpdateStartEpoch); final var single = new NoDurationSensorMeasure(measure, lastUpdateEpoch, newUpdateStartEpoch); - registeredPIDs().forEach(pid -> measures.record(pid, single)); - - return measures; + registeredPIDs().forEach(pid -> current.record(pid, single)); } protected void readAndRecordSensor(BiConsumer onReadingSensorValueAtIndex, long newUpdateStartEpoch) { diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/FileMacOSPowermetricsSensor.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/FileMacOSPowermetricsSensor.java index 0843eba8..b2845399 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/FileMacOSPowermetricsSensor.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/FileMacOSPowermetricsSensor.java @@ -4,6 +4,8 @@ import java.io.FileInputStream; import java.io.InputStream; +import io.smallrye.mutiny.Multi; + /** * The aim of this sensor is to only perform one long measure and then read the power information from it once done, */ @@ -15,7 +17,11 @@ public FileMacOSPowermetricsSensor(File file) { } @Override - protected InputStream getInputStream() { + protected Multi getInputStream() { + return Multi.createFrom().item(fromFile()); + } + + private FileInputStream fromFile() { try { return new FileInputStream(file); } catch (Exception e) { @@ -26,7 +32,7 @@ protected InputStream getInputStream() { @Override public void stop() { // need to defer reading metadata until we know the file has been populated - initMetadata(getInputStream()); + initMetadata(fromFile()); super.stop(); } } diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java index 53c941c3..a04b9b39 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensor.java @@ -3,6 +3,7 @@ import java.io.InputStream; import io.quarkus.logging.Log; +import io.smallrye.mutiny.Multi; import net.laprun.sustainability.power.SensorMetadata; import net.laprun.sustainability.power.sensors.AbstractPowerSensor; import net.laprun.sustainability.power.sensors.Measures; @@ -12,7 +13,7 @@ /** * A macOS powermetrics based {@link PowerSensor} implementation. */ -public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor { +public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor { /** * The Central Processing Unit component name */ @@ -42,7 +43,7 @@ public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor { public static final String CPU_SHARE = "cpuShare"; private CPU cpu; - private long lastCalled; + private long lastCalled = System.currentTimeMillis(); @Override public boolean supportsProcessAttribution() { @@ -59,30 +60,26 @@ protected SensorMetadata nativeMetadata() { } @Override - protected void doStart() { - // nothing to do here by default - if (Log.isDebugEnabled()) { - lastCalled = System.currentTimeMillis(); - } + protected Multi doStart() { + return getInputStream(); } - Measures extractPowerMeasure(InputStream powerMeasureInput, long lastUpdateEpoch, long newUpdateEpoch) { + void extractPowerMeasure(InputStream powerMeasureInput, Measures current, long lastUpdateEpoch, long newUpdateEpoch) { if (Log.isDebugEnabled()) { final var start = System.currentTimeMillis(); Log.debugf("powermetrics measure extraction last called %dms ago", (start - lastCalled)); lastCalled = start; } - PowerMetricsParser.extractPowerMeasure(powerMeasureInput, measures, lastUpdateEpoch, newUpdateEpoch, registeredPIDs(), + PowerMetricsParser.extractPowerMeasure(powerMeasureInput, current, lastUpdateEpoch, newUpdateEpoch, registeredPIDs(), metadata(), cpu); - return measures; } @Override - protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) { - return extractPowerMeasure(getInputStream(), lastUpdateEpoch, newUpdateStartEpoch); + protected void doUpdate(InputStream inputStream, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch) { + extractPowerMeasure(inputStream, current, lastUpdateEpoch, newUpdateStartEpoch); } - protected abstract InputStream getInputStream(); + protected abstract Multi getInputStream(); @Override public void unregister(RegisteredPID registeredPID) { diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/NuProcessWrapper.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/NuProcessWrapper.java index 7e0474d3..097c48aa 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/NuProcessWrapper.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/NuProcessWrapper.java @@ -1,17 +1,16 @@ package net.laprun.sustainability.power.sensors.macos.powermetrics; import java.io.InputStream; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import com.zaxxer.nuprocess.NuProcess; import com.zaxxer.nuprocess.NuProcessBuilder; +import io.quarkus.logging.Log; import net.laprun.sustainability.power.nuprocess.BaseProcessHandler; -public class NuProcessWrapper implements ProcessWrapper { - private PowermetricsProcessHandler measureHandler; - private String periodInMilliSecondsAsString; - +public class NuProcessWrapper { @SuppressWarnings("UnusedReturnValue") public static NuProcess exec(BaseProcessHandler handler) { if (handler == null) @@ -20,46 +19,30 @@ public static NuProcess exec(BaseProcessHandler handler) { } public static InputStream metadataInputStream() { - final var metadataHandler = new PowermetricsProcessHandler(6500, "cpu_power", "-i", "10", "-n", "1"); - exec(metadataHandler); - try { - return metadataHandler.getInputStream().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + return waitForOutput(new PowermetricsProcessHandler(6500, "cpu_power", "-i", "10", "-n", "1")); } - @Override - public void start(long periodInMilliSeconds) { - this.periodInMilliSecondsAsString = Long.toString(periodInMilliSeconds); + public static CompletableFuture asyncGetOutput(PowermetricsProcessHandler handler) { + exec(handler); + return handler.getInputStream(); } - @Override - public void stop() { - if (measureHandler != null) { - measureHandler.stop(); - measureHandler = null; + public static InputStream waitForOutput(PowermetricsProcessHandler handler) { + try { + final var start = System.currentTimeMillis(); + final var inputStream = asyncGetOutput(handler).get(); + Log.infof("Waited for output: %dms", System.currentTimeMillis() - start); + return inputStream; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); } } - @Override - public boolean isRunning() { - return measureHandler != null && measureHandler.isRunning(); - } - - @Override - public InputStream streamForMeasure() { - if (!isRunning()) { - measureHandler = new PowermetricsProcessHandler(27000, "cpu_power,tasks", - "--show-process-samp-norm", "--show-process-gpu", "-i", - periodInMilliSecondsAsString, "-n", "1"); - exec(measureHandler); - try { - return measureHandler.getInputStream().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } - throw new IllegalStateException("Measure is still running"); + public static InputStream measureInputStream(String periodInMilliSecondsAsString) { + final var inputStream = waitForOutput(new PowermetricsProcessHandler(27000, "cpu_power,tasks", + "--show-process-samp-norm", "--show-process-gpu", "-i", + periodInMilliSecondsAsString, "-n", "1")); + Log.infof("powermetrics output: %s", inputStream); + return inputStream; } } diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/PowerMetricsParser.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/PowerMetricsParser.java index 76404339..ba6783c6 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/PowerMetricsParser.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/PowerMetricsParser.java @@ -73,6 +73,7 @@ static void extractPowerMeasure(InputStream powerMeasureInput, Measures current, Set registeredPIDs, SensorMetadata metadata, CPU cpu) { final long startMs = lastUpdateEpoch; try { + Log.infof("Parsing: %s", powerMeasureInput); // Should not be closed since it closes the process BufferedReader input = new BufferedReader(new InputStreamReader(powerMeasureInput)); String line; diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/ProcessMacOSPowermetricsSensor.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/ProcessMacOSPowermetricsSensor.java index 7d5ba2bd..710e9fab 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/ProcessMacOSPowermetricsSensor.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/ProcessMacOSPowermetricsSensor.java @@ -2,10 +2,12 @@ import java.io.InputStream; +import io.smallrye.mutiny.Multi; + public class ProcessMacOSPowermetricsSensor extends MacOSPowermetricsSensor { private static final int MINIMAL_PERIOD_MS = 700; - private final ProcessWrapper processWrapper = new NuProcessWrapper(); private long samplingPeriodInMillis; + private String periodInMilliSecondsAsString; public ProcessMacOSPowermetricsSensor() { // extract metadata @@ -25,24 +27,34 @@ public long adjustSamplingPeriodIfNeeded(long requestedSamplingPeriodInMillis) { MINIMAL_PERIOD_MS + " milliseconds to get useful results to also account for processing time."); } samplingPeriodInMillis = requestedSamplingPeriodInMillis - 600; + this.periodInMilliSecondsAsString = Long.toString(samplingPeriodInMillis); return samplingPeriodInMillis; } @Override - public void doStart() { - super.doStart(); - processWrapper.start(samplingPeriodInMillis); - } - - @Override - protected InputStream getInputStream() { - return processWrapper.streamForMeasure(); + protected Multi getInputStream() { + /* + * return Multi.createBy() + * .repeating() + * .completionStage(() -> { + * Log.info("getInputStream() called"); + * final var measureHandler = new PowermetricsProcessHandler(27000, "cpu_power,tasks", + * "--show-process-samp-norm", "--show-process-gpu", "-i", + * periodInMilliSecondsAsString, "-n", "1"); + * NuProcessWrapper.exec(measureHandler); + * return measureHandler.getInputStream(); + * }) + * .indefinitely(); + */ + return Multi.createBy() + .repeating() + .supplier(() -> NuProcessWrapper.measureInputStream(periodInMilliSecondsAsString)) + .indefinitely(); } @Override public void stop() { if (isStarted()) { - processWrapper.stop(); super.stop(); } } diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/ResourceMacOSPowermetricsSensor.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/ResourceMacOSPowermetricsSensor.java index 7e5ca351..fe699add 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/ResourceMacOSPowermetricsSensor.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/macos/powermetrics/ResourceMacOSPowermetricsSensor.java @@ -2,6 +2,7 @@ import java.io.InputStream; +import io.smallrye.mutiny.Multi; import net.laprun.sustainability.power.sensors.Measures; public class ResourceMacOSPowermetricsSensor extends MacOSPowermetricsSensor { @@ -16,17 +17,21 @@ public ResourceMacOSPowermetricsSensor(String resourceName) { cpuSharesEnabled = false; this.resourceName = resourceName; this.start = expectedStartUpdateEpoch; - initMetadata(getInputStream()); + initMetadata(fromResource()); } @Override - protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) { + protected void doUpdate(InputStream inputStream, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch) { // use the expected start measured time (if provided) for the measure instead of using the provided current epoch - return super.doUpdate(start != -1 ? start : lastUpdateEpoch, newUpdateStartEpoch); + super.doUpdate(inputStream, current, start != -1 ? start : lastUpdateEpoch, newUpdateStartEpoch); } @Override - protected InputStream getInputStream() { + protected Multi getInputStream() { + return Multi.createFrom().item(fromResource()); + } + + InputStream fromResource() { return Thread.currentThread().getContextClassLoader().getResourceAsStream(resourceName); } } diff --git a/backend/src/main/java/net/laprun/sustainability/power/sensors/test/TestPowerSensor.java b/backend/src/main/java/net/laprun/sustainability/power/sensors/test/TestPowerSensor.java index 7f0894fe..b76e8dcf 100644 --- a/backend/src/main/java/net/laprun/sustainability/power/sensors/test/TestPowerSensor.java +++ b/backend/src/main/java/net/laprun/sustainability/power/sensors/test/TestPowerSensor.java @@ -4,13 +4,14 @@ import java.util.List; +import io.smallrye.mutiny.Multi; import net.laprun.sustainability.power.SensorMetadata; import net.laprun.sustainability.power.measures.NoDurationSensorMeasure; import net.laprun.sustainability.power.sensors.AbstractPowerSensor; import net.laprun.sustainability.power.sensors.Measures; @SuppressWarnings("unused") -public class TestPowerSensor extends AbstractPowerSensor { +public class TestPowerSensor extends AbstractPowerSensor { public static final String CPU = "cpu"; public static final SensorMetadata DEFAULT = new SensorMetadata( List.of(new SensorMetadata.ComponentMetadata(CPU, 0, "CPU", true, mW)), @@ -31,14 +32,14 @@ protected SensorMetadata nativeMetadata() { } @Override - public void doStart() { + public Multi doStart() { // nothing to do + return null; } @Override - protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) { - registeredPIDs().forEach(pid -> measures.record(pid, + protected void doUpdate(Void tick, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch) { + registeredPIDs().forEach(pid -> current.record(pid, new NoDurationSensorMeasure(new double[] { Math.random() }, lastUpdateEpoch, newUpdateStartEpoch))); - return measures; } } diff --git a/backend/src/test/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensorTest.java b/backend/src/test/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensorTest.java index 83064b8d..77a32ea9 100644 --- a/backend/src/test/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensorTest.java +++ b/backend/src/test/java/net/laprun/sustainability/power/sensors/linux/rapl/IntelRAPLSensorTest.java @@ -100,6 +100,7 @@ protected void readAndRecordSensor(BiConsumer onReadingSensorValu void wattComputationShouldWork() throws Exception { final var raplFile = new TestRAPLFile(10000L, 20000L, 30000L); final var sensor = new TestIntelRAPLSensor(new TreeMap<>(Map.of("sensor", raplFile))); + sensor.adjustSamplingPeriodIfNeeded(100); sensor.start(); Thread.sleep(10); // ensure we get enough time between the measure performed during start and the first update final var pid = sensor.register(1234L); diff --git a/backend/src/test/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensorTest.java b/backend/src/test/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensorTest.java index e5b828ac..54a4730e 100644 --- a/backend/src/test/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensorTest.java +++ b/backend/src/test/java/net/laprun/sustainability/power/sensors/macos/powermetrics/MacOSPowermetricsSensorTest.java @@ -65,7 +65,7 @@ void extractPowerMeasureForM4() { final var cpu = metadata.metadataFor(MacOSPowermetricsSensor.CPU); // re-open the stream to read the measure this time - final var measure = sensor.update(0L); + final var measure = sensor.update(sensor.fromResource()); final var totalCPUPower = 420; final var totalCPUTime = 1287.34; @@ -84,7 +84,7 @@ void checkTotalPowerMeasureEvenWhenRegisteredProcessIsNotFound() { sensor.register(-666); // re-open the stream to read the measure this time - final var measure = sensor.update(0L); + final var measure = sensor.update(sensor.fromResource()); assertEquals(0, getTotalSystemComponent(measure, metadata, MacOSPowermetricsSensor.ANE)); assertEquals(19, getTotalSystemComponent(measure, metadata, MacOSPowermetricsSensor.DRAM)); @@ -109,7 +109,7 @@ void extractionShouldWorkForLowProcessIds() { final var cpu = metadata.metadataFor(MacOSPowermetricsSensor.CPU); // re-open the stream to read the measure this time - final var measure = sensor.update(0L); + final var measure = sensor.update(sensor.fromResource()); // Process CPU power should be equal to sample ms/s divided for process (here: 116.64) by total samples (1222.65) times total CPU power var pidCPUShare = 116.64 / 1222.65; assertEquals(pidCPUShare * 211, getComponent(measure, pid0, cpu)); @@ -126,7 +126,7 @@ private static void checkPowerMeasure(String testFileName, float total, String t final var pid2 = sensor.register(391); // re-open the stream to read the measure this time - final var measure = sensor.update(0L); + final var measure = sensor.update(sensor.fromResource()); final var totalMeasureMetadata = metadata.metadataFor(totalMeasureName); final var pid1CPUShare = 23.88 / 1222.65; assertEquals((pid1CPUShare * total), getComponent(measure, pid1, totalMeasureMetadata));