1010import java .util .Map ;
1111import java .util .Map .Entry ;
1212import java .util .Queue ;
13+ import java .util .concurrent .ArrayBlockingQueue ;
14+ import java .util .concurrent .BlockingQueue ;
1315import java .util .concurrent .ConcurrentHashMap ;
1416import java .util .concurrent .ConcurrentLinkedQueue ;
1517import java .util .concurrent .ExecutorService ;
2325
2426public class talk extends AbstractApplication {
2527
26- private static final long TIMEOUT = 200 ;
28+ private static final long TIMEOUT = 10 ;
2729 private static final int DEFAULT_POOL_SIZE = 3 ;
30+ protected static final int DEFAULT_MESSAGE_POOL_SIZE = 10 ;
31+ protected final Map <String , BlockingQueue <Builder >> meetings = new ConcurrentHashMap <String , BlockingQueue <Builder >>();
2832 protected final Map <String , Queue <Builder >> list = new ConcurrentHashMap <String , Queue <Builder >>();
29- protected final Map <String , Queue <Builder >> meetings = new ConcurrentHashMap <String , Queue <Builder >>();
3033 protected final Map <String , List <String >> sessions = new ConcurrentHashMap <String , List <String >>();
3134 private ExecutorService service ;
3235
@@ -89,39 +92,30 @@ public String save(Object meetingCode, String sessionId, String message) {
8992 * @return builder
9093 */
9194 public final String save (final Object meetingCode , final Builder builder ) {
92- final Queue <Builder > messages ;
93- synchronized (this .meetings ) {
94- if (this .meetings .get (meetingCode ) == null ) {
95- this .meetings .put (meetingCode .toString (), new ConcurrentLinkedQueue <Builder >());
96- }
95+ BlockingQueue <Builder > messages ;
96+ if ((messages = this .meetings .get (meetingCode )) == null ) {
97+ this .meetings .put (meetingCode .toString (), messages = new ArrayBlockingQueue <Builder >(DEFAULT_MESSAGE_POOL_SIZE ));
98+ }
9799
98- messages = this .meetings .get (meetingCode );
99- messages .add (builder );
100- this .meetings .notifyAll ();
100+ try {
101+ messages .put (builder );
102+ } catch (InterruptedException e ) {
103+ e .printStackTrace ();
101104 }
102105
103106 this .getService ().execute (new Runnable (){
104107 @ Override
105108 public void run () {
106- synchronized (talk .this .meetings ) {
107109 Builder message ;
108- do {
109- try {
110- talk .this .meetings .wait (TIMEOUT );
111- } catch (InterruptedException e ) {
112- e .printStackTrace ();
113- }
114- } while (talk .this .meetings .get (meetingCode ) == null || (message = talk .this .meetings .get (meetingCode ).poll ()) == null );
115-
110+ if (talk .this .meetings .get (meetingCode ) == null || (message = talk .this .meetings .get (meetingCode ).poll ()) == null ) return ;
116111 talk .this .copy (meetingCode , message );
117- }
118112 }
119113 });
120114 return builder .toString ();
121115 }
122116
123117 private ExecutorService getService () {
124- return this .service !=null ? this .service : Executors .newFixedThreadPool (DEFAULT_POOL_SIZE );
118+ return this .service !=null ? this .service : Executors .newFixedThreadPool (DEFAULT_POOL_SIZE );
125119 }
126120
127121 /**
@@ -133,19 +127,16 @@ private ExecutorService getService() {
133127 */
134128 public final String update (final String sessionId ) throws ApplicationException , IOException {
135129 Builder message ;
136- Queue <Builder > messages ;
137- synchronized (this .list ) {
138- messages = this .list .get (sessionId );
139- while ((message = messages .poll ()) == null ) {
140- try {
141- this .list .wait (TIMEOUT );
142- } catch (InterruptedException e ) {
143- throw new ApplicationException (e .getMessage (), e );
144- }
130+ Queue <Builder > messages = this .list .get (sessionId );
131+ while ((message = messages .poll ()) == null ) {
132+ try {
133+ Thread .sleep (TIMEOUT );
134+ } catch (InterruptedException e ) {
135+ throw new ApplicationException (e .getMessage (), e );
145136 }
146-
147- return message .toString ();
148137 }
138+
139+ return message .toString ();
149140 }
150141
151142 /**
@@ -163,22 +154,17 @@ protected String filter(String text) {
163154 * @param builder
164155 */
165156 private final void copy (Object meetingCode , Builder builder ) {
166- synchronized (this .list ) {
167- final Collection <Entry <String , Queue <Builder >>> set = list .entrySet ();
157+ final Collection <Entry <String , Queue <Builder >>> set = this .list .entrySet ();
168158 final Iterator <Entry <String , Queue <Builder >>> iterator = set .iterator ();
169- final List <String > meeting_session ;
170- if ((meeting_session = this .sessions .get (meetingCode )) != null ) {
159+ final List <String > _sessions ;
160+ if ((_sessions = this .sessions .get (meetingCode )) != null ) {
171161 while (iterator .hasNext ()) {
172- Entry <String , Queue <Builder >> e = iterator .next ();
173- if (meeting_session .contains (e .getKey ())) {
174- e .getValue ().add (builder );
175- this .list .notifyAll ();
162+ Entry <String , Queue <Builder >> list = iterator .next ();
163+ if (_sessions .contains (list .getKey ())) {
164+ list .getValue ().add (builder );
176165 }
177166 }
178167 }
179- else
180- this .list .notifyAll ();
181- }
182168 }
183169
184170 @ Override
@@ -195,7 +181,7 @@ public String version() {
195181 * @throws ApplicationException
196182 */
197183 public boolean testing (final int n ) throws ApplicationException {
198- this .meetings .put ("[M001]" , new ConcurrentLinkedQueue <Builder >());
184+ this .meetings .put ("[M001]" , new ArrayBlockingQueue <Builder >(DEFAULT_MESSAGE_POOL_SIZE ));
199185 this .list .put ("{A}" , new ConcurrentLinkedQueue <Builder >());
200186 this .list .put ("{B}" , new ConcurrentLinkedQueue <Builder >());
201187
0 commit comments