1+ package dotty .tools .dotc .profile
2+
3+ import java .util .concurrent .ThreadPoolExecutor .AbortPolicy
4+ import java .util .concurrent ._
5+ import java .util .concurrent .atomic .AtomicInteger
6+
7+ import dotty .tools .dotc .core .Phases .Phase
8+ import dotty .tools .dotc .core .Contexts .Context
9+
10+ sealed trait AsyncHelper {
11+
12+ def newUnboundedQueueFixedThreadPool
13+ (nThreads : Int ,
14+ shortId : String , priority : Int = Thread .NORM_PRIORITY ) : ThreadPoolExecutor
15+ def newBoundedQueueFixedThreadPool
16+ (nThreads : Int , maxQueueSize : Int , rejectHandler : RejectedExecutionHandler ,
17+ shortId : String , priority : Int = Thread .NORM_PRIORITY ) : ThreadPoolExecutor
18+
19+ }
20+
21+ object AsyncHelper {
22+ def apply (phase : Phase )(implicit ctx : Context ): AsyncHelper = ctx.profiler match {
23+ case NoOpProfiler => new BasicAsyncHelper (phase)
24+ case r : RealProfiler => new ProfilingAsyncHelper (phase, r)
25+ }
26+
27+ private abstract class BaseAsyncHelper (phase : Phase )(implicit ctx : Context ) extends AsyncHelper {
28+ val baseGroup = new ThreadGroup (s " scalac- ${phase.phaseName}" )
29+ private def childGroup (name : String ) = new ThreadGroup (baseGroup, name)
30+
31+ protected def wrapRunnable (r : Runnable ): Runnable
32+
33+ protected class CommonThreadFactory (shortId : String ,
34+ daemon : Boolean = true ,
35+ priority : Int ) extends ThreadFactory {
36+ private val group : ThreadGroup = childGroup(shortId)
37+ private val threadNumber : AtomicInteger = new AtomicInteger (1 )
38+ private val namePrefix = s " ${baseGroup.getName}- $shortId- "
39+
40+ override def newThread (r : Runnable ): Thread = {
41+ val wrapped = wrapRunnable(r)
42+ val t : Thread = new Thread (group, wrapped, namePrefix + threadNumber.getAndIncrement, 0 )
43+ if (t.isDaemon != daemon) t.setDaemon(daemon)
44+ if (t.getPriority != priority) t.setPriority(priority)
45+ t
46+ }
47+ }
48+ }
49+
50+ private final class BasicAsyncHelper (phase : Phase )(implicit ctx : Context ) extends BaseAsyncHelper (phase) {
51+
52+ override def newUnboundedQueueFixedThreadPool (nThreads : Int , shortId : String , priority : Int ): ThreadPoolExecutor = {
53+ val threadFactory = new CommonThreadFactory (shortId, priority = priority)
54+ // like Executors.newFixedThreadPool
55+ new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit .MILLISECONDS , new LinkedBlockingQueue [Runnable ], threadFactory)
56+ }
57+
58+ override def newBoundedQueueFixedThreadPool (nThreads : Int , maxQueueSize : Int , rejectHandler : RejectedExecutionHandler , shortId : String , priority : Int ): ThreadPoolExecutor = {
59+ val threadFactory = new CommonThreadFactory (shortId, priority = priority)
60+ // like Executors.newFixedThreadPool
61+ new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit .MILLISECONDS , new ArrayBlockingQueue [Runnable ](maxQueueSize), threadFactory, rejectHandler)
62+ }
63+
64+ override protected def wrapRunnable (r : Runnable ): Runnable = r
65+ }
66+
67+ private class ProfilingAsyncHelper (phase : Phase , private val profiler : RealProfiler )(implicit ctx : Context ) extends BaseAsyncHelper (phase) {
68+
69+ override def newUnboundedQueueFixedThreadPool (nThreads : Int , shortId : String , priority : Int ): ThreadPoolExecutor = {
70+ val threadFactory = new CommonThreadFactory (shortId, priority = priority)
71+ // like Executors.newFixedThreadPool
72+ new SinglePhaseInstrumentedThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit .MILLISECONDS , new LinkedBlockingQueue [Runnable ], threadFactory, new AbortPolicy )
73+ }
74+
75+ override def newBoundedQueueFixedThreadPool (nThreads : Int , maxQueueSize : Int , rejectHandler : RejectedExecutionHandler , shortId : String , priority : Int ): ThreadPoolExecutor = {
76+ val threadFactory = new CommonThreadFactory (shortId, priority = priority)
77+ // like Executors.newFixedThreadPool
78+ new SinglePhaseInstrumentedThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit .MILLISECONDS , new ArrayBlockingQueue [Runnable ](maxQueueSize), threadFactory, rejectHandler)
79+ }
80+
81+ override protected def wrapRunnable (r : Runnable ): Runnable = () => {
82+ val data = new ThreadProfileData
83+ localData.set(data)
84+
85+ val profileStart = Profiler .emptySnap
86+ try r.run finally {
87+ val snap = profiler.snapThread()
88+ val threadRange = ProfileRange (profileStart, snap, phase, 0 , " " , Thread .currentThread())
89+ profiler.completeBackground(threadRange)
90+ }
91+ }
92+
93+ /**
94+ * data for thread run. Not threadsafe, only written from a single thread
95+ */
96+ final class ThreadProfileData {
97+ var firstStartNs = 0L
98+ var taskCount = 0
99+
100+ var idleNs = 0L
101+ var runningNs = 0L
102+
103+ var lastStartNs = 0L
104+ var lastEndNs = 0L
105+ }
106+
107+ val localData = new ThreadLocal [ThreadProfileData ]
108+
109+ private class SinglePhaseInstrumentedThreadPoolExecutor
110+ ( corePoolSize : Int , maximumPoolSize : Int , keepAliveTime : Long , unit : TimeUnit ,
111+ workQueue : BlockingQueue [Runnable ], threadFactory : ThreadFactory , handler : RejectedExecutionHandler
112+ ) extends ThreadPoolExecutor (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler) {
113+
114+ override def beforeExecute (t : Thread , r : Runnable ): Unit = {
115+ val data = localData.get
116+ data.taskCount += 1
117+ val now = System .nanoTime()
118+
119+ if (data.firstStartNs == 0 ) data.firstStartNs = now
120+ else data.idleNs += now - data.lastEndNs
121+
122+ data.lastStartNs = now
123+
124+ super .beforeExecute(t, r)
125+ }
126+
127+ override def afterExecute (r : Runnable , t : Throwable ): Unit = {
128+ val now = System .nanoTime()
129+ val data = localData.get
130+
131+ data.lastEndNs = now
132+ data.runningNs += now - data.lastStartNs
133+
134+ super .afterExecute(r, t)
135+ }
136+
137+ }
138+ }
139+ }
0 commit comments