Skip to content
Open
132 changes: 42 additions & 90 deletions src/main/java/org/usb4java/javax/AbstractIrpQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
package org.usb4java.javax;

import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.usb.UsbControlIrp;
import javax.usb.UsbException;
Expand All @@ -27,18 +28,18 @@
*/
abstract class AbstractIrpQueue<T extends UsbIrp>
{
/** The queued packets. */
private final Queue<T> irps = new ConcurrentLinkedQueue<T>();

/** The queue processor thread. */
private volatile Thread processor;

/** If queue is currently aborting. */
private volatile boolean aborting;

/** The USB device. */
private final AbstractDevice device;

/** The non-parallel ExecutorService we will use for this queue on this device. */
private final ExecutorService singleThreadExecutor;

/** The job counter for active jobs in this queue. */
private final AtomicInteger activeJobs = new AtomicInteger(0);

/**
* Constructor.
*
Expand All @@ -50,6 +51,8 @@ abstract class AbstractIrpQueue<T extends UsbIrp>
if (device == null)
throw new IllegalArgumentException("device must be set");
this.device = device;

this.singleThreadExecutor = Services.getInstance().getConfig().newExecutorService();
}

/**
Expand All @@ -60,73 +63,29 @@ abstract class AbstractIrpQueue<T extends UsbIrp>
*/
public final void add(final T irp)
{
this.irps.add(irp);

// Start the queue processor if not already running.
if (this.processor == null)
{
this.processor = new Thread(new Runnable()
{
@Override
public void run()
{
process();
}
});
this.processor.setDaemon(true);
this.processor.setName("usb4java IRP Queue Processor");
this.processor.start();
}
}

/**
* Processes the queue. Methods returns when the queue is empty.
*/
final void process()
{
// Get the next IRP
T irp = this.irps.poll();

// If there are no IRPs to process then mark the thread as closing
// right away. Otherwise process the IRP (and more IRPs from the queue
// if present).
if (irp == null)
{
this.processor = null;
}
else
{
while (irp != null)
{
// Process the IRP
try
{
processIrp(irp);
singleThreadExecutor.execute(new Runnable() {
final T irp0 = irp;

@Override
public void run() {
activeJobs.incrementAndGet();

try {
if (!aborting) {
try {
processIrp(irp0);
} catch (final UsbException e) {
irp0.setUsbException(e);
}

irp0.complete();
finishIrp(irp0);
}
} finally {
activeJobs.decrementAndGet();
}
catch (final UsbException e)
{
irp.setUsbException(e);
}

// Get next IRP and mark the thread as closing before sending
// the events for the previous IRP
final T nextIrp = this.irps.poll();
if (nextIrp == null) this.processor = null;

// Finish the previous IRP
irp.complete();
finishIrp(irp);

// Process next IRP (if present)
irp = nextIrp;
}
}

// No more IRPs are present in the queue so terminate the thread.
synchronized (this.irps)
{
this.irps.notifyAll();
}
});
}

/**
Expand Down Expand Up @@ -156,22 +115,15 @@ final void process()
public final void abort()
{
this.aborting = true;
this.irps.clear();
while (isBusy())
{
try
{
synchronized (this.irps)
{
if (isBusy()) this.irps.wait();
}
}
catch (final InterruptedException e)
{
Thread.currentThread().interrupt();
}

singleThreadExecutor.shutdown();
try {
singleThreadExecutor.awaitTermination(4, TimeUnit.SECONDS);
this.aborting = false;
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
this.aborting = false;
}

/**
Expand All @@ -180,9 +132,9 @@ public final void abort()
*
* @return True if queue is busy, false if not.
*/
public final boolean isBusy()
public final synchronized boolean isBusy()
{
return !this.irps.isEmpty() || this.processor != null;
return activeJobs.get() > 0;
}

/**
Expand Down
66 changes: 66 additions & 0 deletions src/main/java/org/usb4java/javax/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.usb4java.javax;

import java.util.Properties;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Configuration.
Expand All @@ -29,12 +31,54 @@ final class Config
/** Key name for USB communication timeout. */
private static final String SCAN_INTERVAL_KEY = KEY_BASE + "scanInterval";

/** Key name for USB IRP executor. */
private static final String EXECUTOR_SERVICE_KEY = KEY_BASE + "irpExecutorService";

/** The timeout for USB communication in milliseconds. */
private int timeout = DEFAULT_TIMEOUT;

/** The scan interval in milliseconds. */
private int scanInterval = DEFAULT_SCAN_INTERVAL;

/** The executor service factory. */
private ExecutorServiceProvider executorService = new ExecutorServiceProvider() {
private final AtomicInteger poolNumber = new AtomicInteger(1);

class LocalThreadFactory extends Object implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

LocalThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "usb4java-irp-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
t.setDaemon(true);
if (t.getPriority() != Thread.MAX_PRIORITY)
t.setPriority(Thread.MAX_PRIORITY);
return t;
}
}

public ExecutorService newExecutorService() {
/* The default executor is a pool of max 1 thread, with 3s timeout. */
ThreadPoolExecutor es = new ThreadPoolExecutor(0, 1,
3L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
es.setThreadFactory(new LocalThreadFactory());
return es;
}
};

/**
* Constructs new configuration from the specified properties.
*
Expand All @@ -55,6 +99,22 @@ final class Config
this.scanInterval = Integer.valueOf(properties.getProperty(
SCAN_INTERVAL_KEY));
}

// Read the irp executor class
if (properties.containsKey(EXECUTOR_SERVICE_KEY))
{
try {
Class<?> cls = getClass().getClassLoader().loadClass(properties.getProperty(
EXECUTOR_SERVICE_KEY));
this.executorService = (ExecutorServiceProvider)cls.newInstance();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}

/**
Expand All @@ -76,4 +136,10 @@ public int getScanInterval()
{
return this.scanInterval;
}

/**
* Creates a new non-parallel execution service. Defaults to single thread exec
* @return new non-parallel ExecutorService
*/
public ExecutorService newExecutorService() { return this.executorService.newExecutorService(); }
}
7 changes: 7 additions & 0 deletions src/main/java/org/usb4java/javax/ExecutorServiceProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.usb4java.javax;

import java.util.concurrent.ExecutorService;

public interface ExecutorServiceProvider {
public ExecutorService newExecutorService();
}