Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package net.laprun.sustainability.power.nuprocess;

import java.io.InputStream;
import java.nio.ByteBuffer;

import com.zaxxer.nuprocess.NuProcess;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.subscription.MultiEmitter;
import net.laprun.sustainability.power.sensors.macos.powermetrics.ProcessWrapper;

public class EmitterProcessHandler extends BaseProcessHandler {
private final boolean debug = true;
private MultiEmitter<? super InputStream> emitter;
private long start;
private short ignoreCount = 2;

public EmitterProcessHandler(long periodInMilliSecondsAsString) {
super(ProcessWrapper.preparePowermetricsCommandVarArgs("cpu_power,tasks",
"--show-process-samp-norm", "-b", "65536", "--show-process-gpu", "-i",
"" + periodInMilliSecondsAsString));
}

public void setEmitter(MultiEmitter<? super InputStream> emitter) {
this.emitter = emitter;
}

@Override
public void onPreStart(NuProcess nuProcess) {
start = System.currentTimeMillis();
super.onPreStart(nuProcess);
}

@Override
public void onStdout(ByteBuffer buffer, boolean closed) {
if (ignoreCount-- > 0) {
return;
}
final var now = System.currentTimeMillis();
if (buffer.hasRemaining() && !closed) {
final var remaining = buffer.remaining();

final var readOnlyBuffer = buffer.asReadOnlyBuffer();
final var first3Chars = new byte[4];
readOnlyBuffer.get(first3Chars);
Log.infof("1st 4 bytes: '%s'", new String(first3Chars));
final var last3Chars = new byte[4];
readOnlyBuffer.get(remaining - 4, last3Chars, 0, 3);
Log.infof("last 4 bytes: '%s'", new String(last3Chars));

if (debug) {
var bytes = new byte[remaining];
buffer.duplicate().get(bytes);
Log.infof("=== read %d after %d:\n\n'%s'\n\n===\n", remaining, now - start,
new String(bytes));
start = now;
}
final var shouldBuffer = remaining > 16384;
if (!shouldBuffer) {
return;
}
// log("read", remaining);
// stdOutBuffer.put(buffer);
// Log.warnf("=== read %s", new String(stdOutBuffer.array()));
//emitter.emit(new ByteArrayInputStream(bytes));
// stdOutBuffer.clear();
}

if (closed) {
log("closed", 0);
//emitter.complete();
}
}

private void log(String op, int remaining) {
final var end = System.currentTimeMillis();
Log.infof("%s after %dms, size: %d", op, end - start, remaining);
start = end;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import net.laprun.sustainability.power.SensorMetadata;

public abstract class AbstractPowerSensor implements PowerSensor {
protected final Measures measures = new MapMeasures();
public abstract class AbstractPowerSensor<T> implements PowerSensor {
private final PIDRegistry registry = new PIDRegistry();
private final Measures current = new MapMeasures();
private long lastUpdateEpoch;
private boolean started;
private Multi<Measures> measures;
@ConfigProperty(name = "power-server.enable-cpu-share-sampling", defaultValue = "false")
protected boolean cpuSharesEnabled;
private SensorMetadata metadata;
Expand Down Expand Up @@ -65,38 +68,52 @@ public Set<String> registeredPIDsAsStrings() {
}

@Override
public void start() throws Exception {
public Multi<Measures> start() throws Exception {
if (!started) {
lastUpdateEpoch = System.currentTimeMillis();
started = true;
doStart();
measures = doStart()
.broadcast()
.withCancellationAfterLastSubscriberDeparture()
.toAtLeast(1)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.onItem()
.transform(this::update);
}
return measures;
}

protected abstract void doStart();

@Override
public boolean isStarted() {
return started;
}

@Override
public void stop() {
PowerSensor.super.stop();
started = false;
if (started) {
PowerSensor.super.stop();
started = false;
}
}

public Measures update(long tick) {
public Measures update(T tick) {
// reset revolving measure so that we don't get values for pids that are not tracked anymore
current.clear();
final long newUpdateStartEpoch = System.currentTimeMillis();
Log.debugf("Sensor update last called: %dms ago", newUpdateStartEpoch - lastUpdateEpoch);
final var measures = doUpdate(lastUpdateEpoch, newUpdateStartEpoch);
Log.infof("Sensor update last called: %dms ago", newUpdateStartEpoch - lastUpdateEpoch);
Log.infof("input %s", tick);
// extract current values into revolving measure
doUpdate(tick, current, lastUpdateEpoch, newUpdateStartEpoch);
Log.infof("Last recorded measure: %s", current);
lastUpdateEpoch = newUpdateStartEpoch;
return measures;
return current;
}

protected abstract Multi<T> doStart();

protected long lastUpdateEpoch() {
return lastUpdateEpoch;
}

abstract protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch);
abstract protected void doUpdate(T tick, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Set;

import io.smallrye.mutiny.Multi;
import net.laprun.sustainability.power.SensorMetadata;

/**
Expand Down Expand Up @@ -52,7 +53,7 @@ default void stop() {
*
* @throws Exception if the sensor couldn't be started for some reason
*/
void start() throws Exception;
Multi<Measures> start() throws Exception;

/**
* Registers the provided process identifier (pid) with the sensor in case it can provide per-process measures. For sensors
Expand All @@ -64,15 +65,6 @@ default void stop() {
*/
RegisteredPID register(long pid);

/**
* Updates the ongoing {@link Measures} being recorded by this sensor for the given tick
*
* @param tick an ordinal value tracking the number of recorded measures being taken by the sensor since it started
* measuring power consumption
* @return the {@link Measures} object recording the measures this sensor has taken since it started measuring
*/
Measures update(long tick);

/**
* Unregisters the specified {@link RegisteredPID} with this sensor thus signaling that clients are not interested in
* tracking the consumption of the associated process anymore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ private void startSamplingIfNeeded() throws Exception {
Log.infof("%s sensor adjusted its sampling period to %dms", sensor.getClass().getSimpleName(), adjusted);
}

// start sensor
sensor.start();
// start periodic power sampling, measuring sensor values over the sampling period
final var sensorSamplerMulti = sensor.start();

// manage external CPU share sampling
final var overSamplingFactor = 3;
final Multi<Map<String, Double>> cpuSharesMulti;
final var cpuSharesTicks = Multi.createFrom().ticks()
// over sample but over a shorter period to ensure we have an average that covers most of the sampling period
.every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor))
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
.emitOn(Infrastructure.getDefaultWorkerPool());
if (sensor.wantsCPUShareSamplingEnabled()) {
// if enabled, record a cpu share for each tick, group by the over sampling factor and average over these aggregates to produce one value for the power measure interval
cpuSharesMulti = cpuSharesTicks
Expand Down Expand Up @@ -132,15 +132,6 @@ private void startSamplingIfNeeded() throws Exception {
cpuSharesMulti = cpuSharesTicks.map(unused -> Map.of());
}

// manage periodic power sampling, measuring sensor values over the sampling period
final var sensorSamplerMulti = Multi.createFrom().ticks()
.every(samplingPeriod)
.map(sensor::update)
.broadcast()
.withCancellationAfterLastSubscriberDeparture()
.toAtLeast(1)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());

// combine both multis
periodicSensorCheck = Multi.createBy()
.combining()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.function.BiConsumer;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Multi;
import net.laprun.sustainability.power.SensorMetadata;
import net.laprun.sustainability.power.measures.NoDurationSensorMeasure;
import net.laprun.sustainability.power.sensors.AbstractPowerSensor;
Expand All @@ -16,11 +18,12 @@
/**
* A sensor using Intel's RAPL accessed via Linux' powercap system.
*/
public class IntelRAPLSensor extends AbstractPowerSensor {
public class IntelRAPLSensor extends AbstractPowerSensor<Long> {
private final RAPLFile[] raplFiles;
private final int rawOffset;
private SensorMetadata nativeMetadata;
private final long[] lastMeasuredSensorValues;
private Duration samplingPeriod;

/**
* Initializes the RAPL sensor
Expand Down Expand Up @@ -101,9 +104,17 @@ private static boolean addFileIfReadable(String raplFileAsString, SortedMap<Stri
}

@Override
public void doStart() {
public long adjustSamplingPeriodIfNeeded(long requestedSamplingPeriodInMillis) {
samplingPeriod = Duration.ofMillis(requestedSamplingPeriodInMillis);
return super.adjustSamplingPeriodIfNeeded(requestedSamplingPeriodInMillis);
}

@Override
public Multi<Long> doStart() {
// perform an initial measure to prime the data
readAndRecordSensor(null, lastUpdateEpoch());
return Multi.createFrom().ticks()
.every(samplingPeriod);
}

/**
Expand Down Expand Up @@ -138,7 +149,7 @@ protected SensorMetadata nativeMetadata() {
}

@Override
protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) {
protected void doUpdate(Long tick, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch) {
final var measure = new double[metadata().componentCardinality()];
readAndRecordSensor((value, index) -> {
measure[index] = computePowerInMilliWatts(index, value, newUpdateStartEpoch);
Expand All @@ -147,9 +158,7 @@ protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) {
newUpdateStartEpoch);

final var single = new NoDurationSensorMeasure(measure, lastUpdateEpoch, newUpdateStartEpoch);
registeredPIDs().forEach(pid -> measures.record(pid, single));

return measures;
registeredPIDs().forEach(pid -> current.record(pid, single));
}

protected void readAndRecordSensor(BiConsumer<Long, Integer> onReadingSensorValueAtIndex, long newUpdateStartEpoch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.io.FileInputStream;
import java.io.InputStream;

import io.smallrye.mutiny.Multi;

/**
* The aim of this sensor is to only perform one long measure and then read the power information from it once done,
*/
Expand All @@ -15,7 +17,11 @@ public FileMacOSPowermetricsSensor(File file) {
}

@Override
protected InputStream getInputStream() {
protected Multi<InputStream> getInputStream() {
return Multi.createFrom().item(fromFile());
}

private FileInputStream fromFile() {
try {
return new FileInputStream(file);
} catch (Exception e) {
Expand All @@ -26,7 +32,7 @@ protected InputStream getInputStream() {
@Override
public void stop() {
// need to defer reading metadata until we know the file has been populated
initMetadata(getInputStream());
initMetadata(fromFile());
super.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.InputStream;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Multi;
import net.laprun.sustainability.power.SensorMetadata;
import net.laprun.sustainability.power.sensors.AbstractPowerSensor;
import net.laprun.sustainability.power.sensors.Measures;
Expand All @@ -12,7 +13,7 @@
/**
* A macOS powermetrics based {@link PowerSensor} implementation.
*/
public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor {
public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor<InputStream> {
/**
* The Central Processing Unit component name
*/
Expand Down Expand Up @@ -42,7 +43,7 @@ public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor {
public static final String CPU_SHARE = "cpuShare";

private CPU cpu;
private long lastCalled;
private long lastCalled = System.currentTimeMillis();

@Override
public boolean supportsProcessAttribution() {
Expand All @@ -59,30 +60,26 @@ protected SensorMetadata nativeMetadata() {
}

@Override
protected void doStart() {
// nothing to do here by default
if (Log.isDebugEnabled()) {
lastCalled = System.currentTimeMillis();
}
protected Multi<InputStream> doStart() {
return getInputStream();
}

Measures extractPowerMeasure(InputStream powerMeasureInput, long lastUpdateEpoch, long newUpdateEpoch) {
void extractPowerMeasure(InputStream powerMeasureInput, Measures current, long lastUpdateEpoch, long newUpdateEpoch) {
if (Log.isDebugEnabled()) {
final var start = System.currentTimeMillis();
Log.debugf("powermetrics measure extraction last called %dms ago", (start - lastCalled));
lastCalled = start;
}
PowerMetricsParser.extractPowerMeasure(powerMeasureInput, measures, lastUpdateEpoch, newUpdateEpoch, registeredPIDs(),
PowerMetricsParser.extractPowerMeasure(powerMeasureInput, current, lastUpdateEpoch, newUpdateEpoch, registeredPIDs(),
metadata(), cpu);
return measures;
}

@Override
protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) {
return extractPowerMeasure(getInputStream(), lastUpdateEpoch, newUpdateStartEpoch);
protected void doUpdate(InputStream inputStream, Measures current, long lastUpdateEpoch, long newUpdateStartEpoch) {
extractPowerMeasure(inputStream, current, lastUpdateEpoch, newUpdateStartEpoch);
}

protected abstract InputStream getInputStream();
protected abstract Multi<InputStream> getInputStream();

@Override
public void unregister(RegisteredPID registeredPID) {
Expand Down
Loading
Loading