From 17a62649faa31b474c9b48946fe5a47a440c0de3 Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Thu, 24 Apr 2014 22:10:51 -0500 Subject: [PATCH 01/10] Update AbstractIrpQueue.java added synchronized blocks to thread control --- src/main/java/org/usb4java/javax/AbstractIrpQueue.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index 39add76..9d5abfb 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -58,7 +58,7 @@ abstract class AbstractIrpQueue * @param irp * The control IRP to queue. */ - public final void add(final T irp) + public final synchronized void add(final T irp) { this.irps.add(irp); @@ -92,7 +92,7 @@ final void process() // if present). if (irp == null) { - this.processor = null; + synchronized { this.processor = null; } } else { @@ -153,7 +153,7 @@ final void process() * aborted. This method returns as soon as no more IRPs are in the queue and * no more are processed. */ - public final void abort() + public final synchronized void abort() { this.aborting = true; this.irps.clear(); @@ -180,7 +180,7 @@ public final void abort() * * @return True if queue is busy, false if not. */ - public final boolean isBusy() + public final synchronized boolean isBusy() { return !this.irps.isEmpty() || this.processor != null; } From 626be960475fb9a4bc329b44f1c9cdfbc5653fe2 Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Thu, 24 Apr 2014 22:12:55 -0500 Subject: [PATCH 02/10] Update AbstractIrpQueue.java added this --- src/main/java/org/usb4java/javax/AbstractIrpQueue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index 9d5abfb..422e555 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -92,7 +92,7 @@ final void process() // if present). if (irp == null) { - synchronized { this.processor = null; } + synchronized(this) { this.processor = null; } } else { From 871993771b299b547f196d8e5a1e51fbe578e248 Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Thu, 24 Apr 2014 23:24:58 -0500 Subject: [PATCH 03/10] Update AbstractIrpQueue.java woops, one more sync spot --- src/main/java/org/usb4java/javax/AbstractIrpQueue.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index 422e555..85f1231 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -92,7 +92,7 @@ final void process() // if present). if (irp == null) { - synchronized(this) { this.processor = null; } + synchronized(this) { this.processor = null; } } else { @@ -111,7 +111,7 @@ final void process() // Get next IRP and mark the thread as closing before sending // the events for the previous IRP final T nextIrp = this.irps.poll(); - if (nextIrp == null) this.processor = null; + if (nextIrp == null) synchronized(this) { this.processor = null; } // Finish the previous IRP irp.complete(); From 551e85a823a3f86e2ec190877e30ba66cc65d05d Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Sat, 26 Apr 2014 10:51:14 -0500 Subject: [PATCH 04/10] removed useless sync point --- .../org/usb4java/javax/AbstractIrpQueue.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index 85f1231..0de6cb6 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -58,24 +58,26 @@ abstract class AbstractIrpQueue * @param irp * The control IRP to queue. */ - public final synchronized void add(final T irp) + public final void add(final T irp) { this.irps.add(irp); // Start the queue processor if not already running. if (this.processor == null) { - this.processor = new Thread(new Runnable() - { - @Override - public void run() - { - process(); + synchronized(this) { + if (this.processor == null) { + this.processor = new Thread(new Runnable() { + @Override + public void run() { + process(); + } + }); + this.processor.setDaemon(true); + this.processor.setName("usb4java IRP Queue Processor"); + this.processor.start(); } - }); - this.processor.setDaemon(true); - this.processor.setName("usb4java IRP Queue Processor"); - this.processor.start(); + } } } From e98dbf5af1cbe8b5646cb41586df0d88e0e59cee Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Sat, 26 Apr 2014 15:34:45 -0500 Subject: [PATCH 05/10] decided to go a step further and implement executor services --- .../org/usb4java/javax/AbstractIrpQueue.java | 120 +++++------------- src/main/java/org/usb4java/javax/Config.java | 36 ++++++ .../javax/ExecutorServiceProvider.java | 7 + 3 files changed, 78 insertions(+), 85 deletions(-) create mode 100644 src/main/java/org/usb4java/javax/ExecutorServiceProvider.java diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index 0de6cb6..240d488 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -5,9 +5,16 @@ package org.usb4java.javax; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.usb.UsbControlIrp; import javax.usb.UsbException; @@ -27,18 +34,16 @@ */ abstract class AbstractIrpQueue { - /** The queued packets. */ - private final Queue irps = new ConcurrentLinkedQueue(); - - /** The queue processor thread. */ - private volatile Thread processor; - /** If queue is currently aborting. */ private volatile boolean aborting; /** The USB device. */ private final AbstractDevice device; + private final ExecutorService singleThreadExecutor; + + private final AtomicInteger jobCounter = new AtomicInteger(0); + /** * Constructor. * @@ -50,6 +55,8 @@ abstract class AbstractIrpQueue if (device == null) throw new IllegalArgumentException("device must be set"); this.device = device; + + this.singleThreadExecutor = Services.getInstance().getConfig().newExecutorService(); } /** @@ -60,75 +67,25 @@ abstract class AbstractIrpQueue */ public final void add(final T irp) { - this.irps.add(irp); - - // Start the queue processor if not already running. - if (this.processor == null) - { - synchronized(this) { - if (this.processor == null) { - this.processor = new Thread(new Runnable() { - @Override - public void run() { - process(); - } - }); - this.processor.setDaemon(true); - this.processor.setName("usb4java IRP Queue Processor"); - this.processor.start(); - } - } - } - } - - /** - * Processes the queue. Methods returns when the queue is empty. - */ - final void process() - { - // Get the next IRP - T irp = this.irps.poll(); - - // If there are no IRPs to process then mark the thread as closing - // right away. Otherwise process the IRP (and more IRPs from the queue - // if present). - if (irp == null) - { - synchronized(this) { this.processor = null; } - } - else - { - while (irp != null) - { - // Process the IRP + jobCounter.incrementAndGet(); + singleThreadExecutor.execute(new Runnable() { + final T irp0 = irp; + @Override + public void run() { try { - processIrp(irp); + processIrp(irp0); } catch (final UsbException e) { - irp.setUsbException(e); + irp0.setUsbException(e); } - - // Get next IRP and mark the thread as closing before sending - // the events for the previous IRP - final T nextIrp = this.irps.poll(); - if (nextIrp == null) synchronized(this) { this.processor = null; } - - // Finish the previous IRP - irp.complete(); - finishIrp(irp); - - // Process next IRP (if present) - irp = nextIrp; - } - } - // No more IRPs are present in the queue so terminate the thread. - synchronized (this.irps) - { - this.irps.notifyAll(); - } + irp0.complete(); + finishIrp(irp0); + jobCounter.decrementAndGet(); + } + }); } /** @@ -155,25 +112,18 @@ final void process() * aborted. This method returns as soon as no more IRPs are in the queue and * no more are processed. */ - public final synchronized void abort() + public final void abort() { this.aborting = true; - this.irps.clear(); - while (isBusy()) - { - try - { - synchronized (this.irps) - { - if (isBusy()) this.irps.wait(); - } - } - catch (final InterruptedException e) - { - Thread.currentThread().interrupt(); - } + + singleThreadExecutor.shutdown(); + try { + singleThreadExecutor.awaitTermination(4, TimeUnit.SECONDS); + this.aborting = false; + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); } - this.aborting = false; } /** @@ -184,7 +134,7 @@ public final synchronized void abort() */ public final synchronized boolean isBusy() { - return !this.irps.isEmpty() || this.processor != null; + return jobCounter.get() > 0; } /** diff --git a/src/main/java/org/usb4java/javax/Config.java b/src/main/java/org/usb4java/javax/Config.java index fef6399..74f74ef 100644 --- a/src/main/java/org/usb4java/javax/Config.java +++ b/src/main/java/org/usb4java/javax/Config.java @@ -6,6 +6,7 @@ package org.usb4java.javax; import java.util.Properties; +import java.util.concurrent.*; /** * Configuration. @@ -29,12 +30,25 @@ final class Config /** Key name for USB communication timeout. */ private static final String SCAN_INTERVAL_KEY = KEY_BASE + "scanInterval"; + /** Key name for USB IRP executor. */ + private static final String EXECUTOR_SERVICE_KEY = KEY_BASE + "irpExecutorService"; + /** The timeout for USB communication in milliseconds. */ private int timeout = DEFAULT_TIMEOUT; /** The scan interval in milliseconds. */ private int scanInterval = DEFAULT_SCAN_INTERVAL; + /** The executor service factory. */ + private ExecutorServiceProvider executorService = new ExecutorServiceProvider() { + public ExecutorService newExecutorService() { + //return Executors.newSingleThreadExecutor(); + return (new ThreadPoolExecutor(0, 1, + 3L, TimeUnit.SECONDS, + new LinkedBlockingQueue())); + } + }; + /** * Constructs new configuration from the specified properties. * @@ -55,6 +69,22 @@ final class Config this.scanInterval = Integer.valueOf(properties.getProperty( SCAN_INTERVAL_KEY)); } + + // Read the irp executor class + if (properties.containsKey(EXECUTOR_SERVICE_KEY)) + { + try { + Class cls = getClass().getClassLoader().loadClass(properties.getProperty( + EXECUTOR_SERVICE_KEY)); + this.executorService = (ExecutorServiceProvider)cls.newInstance(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } catch (InstantiationException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } } /** @@ -76,4 +106,10 @@ public int getScanInterval() { return this.scanInterval; } + + /** + * Creates a new non-parallel execution service. Defaults to single thread exec + * @return new non-parallel ExecutorService + */ + public ExecutorService newExecutorService() { return this.executorService.newExecutorService(); } } diff --git a/src/main/java/org/usb4java/javax/ExecutorServiceProvider.java b/src/main/java/org/usb4java/javax/ExecutorServiceProvider.java new file mode 100644 index 0000000..5215c3c --- /dev/null +++ b/src/main/java/org/usb4java/javax/ExecutorServiceProvider.java @@ -0,0 +1,7 @@ +package org.usb4java.javax; + +import java.util.concurrent.ExecutorService; + +public interface ExecutorServiceProvider { + public ExecutorService newExecutorService(); +} \ No newline at end of file From 387fb4be1fd7e76579a399891c460414787be0ba Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Sun, 27 Apr 2014 10:44:03 -0500 Subject: [PATCH 06/10] queue cleanup using atomic counters --- .../org/usb4java/javax/AbstractIrpQueue.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index 240d488..05238da 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -70,20 +70,21 @@ public final void add(final T irp) jobCounter.incrementAndGet(); singleThreadExecutor.execute(new Runnable() { final T irp0 = irp; + @Override public void run() { - try - { - processIrp(irp0); - } - catch (final UsbException e) - { - irp0.setUsbException(e); - } - - irp0.complete(); - finishIrp(irp0); jobCounter.decrementAndGet(); + + if (!aborting) { + try { + processIrp(irp0); + } catch (final UsbException e) { + irp0.setUsbException(e); + } + + irp0.complete(); + finishIrp(irp0); + } } }); } From 5ed80ae22d175674b7f0c70e79d5a6eb5ab633ca Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Sun, 27 Apr 2014 11:21:25 -0500 Subject: [PATCH 07/10] changed atomic counter over MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit reduces contention between threads, and doesn’t require the executor to execute stale tasks --- .../org/usb4java/javax/AbstractIrpQueue.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index 05238da..e3c61bc 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -40,9 +40,11 @@ abstract class AbstractIrpQueue /** The USB device. */ private final AbstractDevice device; + /** The non-parallel ExecutorService we will use for this queue on this device. */ private final ExecutorService singleThreadExecutor; - private final AtomicInteger jobCounter = new AtomicInteger(0); + /** The job counter for active jobs in this queue. */ + private final AtomicInteger activeJobs = new AtomicInteger(0); /** * Constructor. @@ -67,23 +69,26 @@ abstract class AbstractIrpQueue */ public final void add(final T irp) { - jobCounter.incrementAndGet(); singleThreadExecutor.execute(new Runnable() { final T irp0 = irp; @Override public void run() { - jobCounter.decrementAndGet(); - - if (!aborting) { - try { - processIrp(irp0); - } catch (final UsbException e) { - irp0.setUsbException(e); + activeJobs.incrementAndGet(); + + try { + if (!aborting) { + try { + processIrp(irp0); + } catch (final UsbException e) { + irp0.setUsbException(e); + } + + irp0.complete(); + finishIrp(irp0); } - - irp0.complete(); - finishIrp(irp0); + } finally { + activeJobs.decrementAndGet(); } } }); @@ -135,7 +140,7 @@ public final void abort() */ public final synchronized boolean isBusy() { - return jobCounter.get() > 0; + return activeJobs.get() > 0; } /** From 3aa80b0a7e3c89ec07debeff6579deed960e0ebd Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Sun, 27 Apr 2014 11:23:00 -0500 Subject: [PATCH 08/10] config cleanup --- src/main/java/org/usb4java/javax/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/usb4java/javax/Config.java b/src/main/java/org/usb4java/javax/Config.java index 74f74ef..508519b 100644 --- a/src/main/java/org/usb4java/javax/Config.java +++ b/src/main/java/org/usb4java/javax/Config.java @@ -42,7 +42,7 @@ final class Config /** The executor service factory. */ private ExecutorServiceProvider executorService = new ExecutorServiceProvider() { public ExecutorService newExecutorService() { - //return Executors.newSingleThreadExecutor(); + /* The default executor is a pool of max 1 thread, with 3s timeout. */ return (new ThreadPoolExecutor(0, 1, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue())); From 6decb3cbe21a0f12b528b355dc9701af39c0fc46 Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Sun, 27 Apr 2014 11:30:34 -0500 Subject: [PATCH 09/10] removed useless imports --- src/main/java/org/usb4java/javax/AbstractIrpQueue.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java index e3c61bc..98cf032 100644 --- a/src/main/java/org/usb4java/javax/AbstractIrpQueue.java +++ b/src/main/java/org/usb4java/javax/AbstractIrpQueue.java @@ -5,13 +5,7 @@ package org.usb4java.javax; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.Properties; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; From 401f007658b0fb65142adaef90975573c13c04e2 Mon Sep 17 00:00:00 2001 From: Colin Godsey Date: Sun, 27 Apr 2014 13:22:16 -0500 Subject: [PATCH 10/10] new thread factory for named daemon threads --- src/main/java/org/usb4java/javax/Config.java | 34 ++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/usb4java/javax/Config.java b/src/main/java/org/usb4java/javax/Config.java index 508519b..7ff9a58 100644 --- a/src/main/java/org/usb4java/javax/Config.java +++ b/src/main/java/org/usb4java/javax/Config.java @@ -7,6 +7,7 @@ import java.util.Properties; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * Configuration. @@ -41,11 +42,40 @@ final class Config /** The executor service factory. */ private ExecutorServiceProvider executorService = new ExecutorServiceProvider() { + private final AtomicInteger poolNumber = new AtomicInteger(1); + + class LocalThreadFactory extends Object implements ThreadFactory { + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + LocalThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = "usb4java-irp-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + t.setDaemon(true); + if (t.getPriority() != Thread.MAX_PRIORITY) + t.setPriority(Thread.MAX_PRIORITY); + return t; + } + } + public ExecutorService newExecutorService() { /* The default executor is a pool of max 1 thread, with 3s timeout. */ - return (new ThreadPoolExecutor(0, 1, + ThreadPoolExecutor es = new ThreadPoolExecutor(0, 1, 3L, TimeUnit.SECONDS, - new LinkedBlockingQueue())); + new LinkedBlockingQueue()); + es.setThreadFactory(new LocalThreadFactory()); + return es; } };