1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.asteriskjava.live.internal;
18
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26
27 import org.asteriskjava.live.AsteriskQueue;
28 import org.asteriskjava.live.AsteriskQueueEntry;
29 import org.asteriskjava.live.AsteriskQueueListener;
30 import org.asteriskjava.live.AsteriskQueueMember;
31 import org.asteriskjava.util.Log;
32 import org.asteriskjava.util.LogFactory;
33
34
35
36
37
38
39
40 class AsteriskQueueImpl extends AbstractLiveObject implements AsteriskQueue
41 {
42
43
44
45
46
47 private class ServiceLevelTimerTask extends TimerTask
48 {
49 private final AsteriskQueueEntry entry;
50
51 ServiceLevelTimerTask(AsteriskQueueEntry entry)
52 {
53 this.entry = entry;
54 }
55
56 @Override
57 public void run()
58 {
59 fireServiceLevelExceeded(entry);
60 }
61 }
62
/** Logger obtained for the runtime class of this object. */
private final Log logger = LogFactory.getLog(getClass());

/** Name of this queue; also used to name the service level timer thread. */
private final String name;

/** Value exposed through {@code getMax()}; may be replaced via {@code setMax()}. */
private Integer max;

/** Value exposed through {@code getStrategy()}. */
private String strategy;

/** Service level; multiplied by 1000 to obtain the timer delay for new entries. */
private Integer serviceLevel;

/** Value exposed through {@code getWeight()}; may be replaced via {@code setWeight()}. */
private Integer weight;

/** Entries of this queue in position order; guarded by synchronizing on the list itself. */
private final ArrayList<AsteriskQueueEntryImpl> entries;

/** Timer that runs the service level tasks of this queue. */
private final Timer timer;

/** Members keyed by their location; guarded by synchronizing on the map itself. */
private final HashMap<String, AsteriskQueueMemberImpl> members;

/** Registered listeners; guarded by synchronizing on the list itself. */
private final List<AsteriskQueueListener> listeners;

/** Service level tasks keyed by their entry; guarded by synchronizing on the map itself. */
private final HashMap<AsteriskQueueEntry, ServiceLevelTimerTask> serviceLevelTimerTasks;

/**
 * Creates a new queue with empty entry, member and listener collections
 * and a daemon timer named {@code "ServiceLevelTimer-" + name}.
 *
 * @param server       the server passed on to the superclass constructor.
 * @param name         the name of the queue.
 * @param max          initial value returned by {@code getMax()}.
 * @param strategy     value returned by {@code getStrategy()}.
 * @param serviceLevel initial value returned by {@code getServiceLevel()}.
 * @param weight       initial value returned by {@code getWeight()}.
 */
AsteriskQueueImpl(AsteriskServerImpl server, String name, Integer max,
        String strategy, Integer serviceLevel, Integer weight)
{
    super(server);

    // Plain configuration values.
    this.name = name;
    this.max = max;
    this.strategy = strategy;
    this.serviceLevel = serviceLevel;
    this.weight = weight;

    // Internal collections, all starting out empty.
    this.entries = new ArrayList<AsteriskQueueEntryImpl>(25);
    this.listeners = new ArrayList<AsteriskQueueListener>();
    this.members = new HashMap<String, AsteriskQueueMemberImpl>();

    // Daemon timer (second argument true) used for the service level tasks.
    this.timer = new Timer("ServiceLevelTimer-" + name, true);
    this.serviceLevelTimerTasks = new HashMap<AsteriskQueueEntry, ServiceLevelTimerTask>();
}
90
91 void cancelServiceLevelTimer()
92 {
93 timer.cancel();
94 }
95
96 public String getName()
97 {
98 return name;
99 }
100
101 public Integer getMax()
102 {
103 return max;
104 }
105
106 public String getStrategy()
107 {
108 return strategy;
109 }
110
111 void setMax(Integer max)
112 {
113 this.max = max;
114 }
115
116 public Integer getServiceLevel()
117 {
118 return serviceLevel;
119 }
120
121 void setServiceLevel(Integer serviceLevel)
122 {
123 this.serviceLevel = serviceLevel;
124 }
125
126 public Integer getWeight()
127 {
128 return weight;
129 }
130
131 void setWeight(Integer weight)
132 {
133 this.weight = weight;
134 }
135
136 public List<AsteriskQueueEntry> getEntries()
137 {
138 synchronized (entries)
139 {
140 return new ArrayList<AsteriskQueueEntry>(entries);
141 }
142 }
143
144
145
146
147
148 private void shift()
149 {
150 int currentPos = 1;
151
152 synchronized (entries)
153 {
154 for (AsteriskQueueEntryImpl qe : entries)
155 {
156
157 if (qe.getPosition() != currentPos)
158 {
159 qe.setPosition(currentPos);
160 }
161 currentPos++;
162 }
163 }
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180 void createNewEntry(AsteriskChannelImpl channel, int reportedPosition, Date dateReceived)
181 {
182 AsteriskQueueEntryImpl qe = new AsteriskQueueEntryImpl(server, this, channel, reportedPosition, dateReceived);
183
184 long delay = serviceLevel * 1000L;
185 if (delay > 0)
186 {
187 ServiceLevelTimerTask timerTask = new ServiceLevelTimerTask(qe);
188 timer.schedule(timerTask, delay);
189 synchronized (serviceLevelTimerTasks)
190 {
191 serviceLevelTimerTasks.put(qe, timerTask);
192 }
193 }
194
195 synchronized (entries)
196 {
197 entries.add(qe);
198
199
200
201
202 shift();
203 }
204
205
206
207
208
209 channel.setQueueEntry(qe);
210 fireNewEntry(qe);
211 server.fireNewQueueEntry(qe);
212 }
213
214
215
216
217
218
219
220
221
222
223
224
225
226 void removeEntry(AsteriskQueueEntryImpl entry, Date dateReceived)
227 {
228 synchronized (serviceLevelTimerTasks)
229 {
230 if (serviceLevelTimerTasks.containsKey(entry))
231 {
232 ServiceLevelTimerTask timerTask = serviceLevelTimerTasks.get(entry);
233 timerTask.cancel();
234 serviceLevelTimerTasks.remove(entry);
235 }
236 }
237
238 boolean changed;
239 synchronized (entries)
240 {
241 changed = entries.remove(entry);
242
243 if (changed)
244 {
245
246 shift();
247 }
248 }
249
250
251 if (changed)
252 {
253 entry.getChannel().setQueueEntry(null);
254 entry.left(dateReceived);
255 fireEntryLeave(entry);
256 }
257 }
258
259 @Override
260 public String toString()
261 {
262 final StringBuffer sb;
263
264 sb = new StringBuffer("AsteriskQueue[");
265 sb.append("name='").append(getName()).append("',");
266 sb.append("max='").append(getMax()).append("',");
267 sb.append("strategy='").append(getStrategy()).append("',");
268 sb.append("serviceLevel='").append(getServiceLevel()).append("',");
269 sb.append("weight='").append(getWeight()).append("',");
270 synchronized (entries)
271 {
272 sb.append("entries='").append(entries.toString()).append("',");
273 }
274 synchronized (members)
275 {
276 sb.append("members='").append(members.toString()).append("',");
277 }
278 sb.append("systemHashcode=").append(System.identityHashCode(this));
279 sb.append("]");
280
281 return sb.toString();
282 }
283
284 public void addAsteriskQueueListener(AsteriskQueueListener listener)
285 {
286 synchronized (listeners)
287 {
288 listeners.add(listener);
289 }
290 }
291
292 public void removeAsteriskQueueListener(AsteriskQueueListener listener)
293 {
294 synchronized (listeners)
295 {
296 listeners.remove(listener);
297 }
298 }
299
300
301
302
303
304
305 void fireNewEntry(AsteriskQueueEntryImpl entry)
306 {
307 synchronized (listeners)
308 {
309 for (AsteriskQueueListener listener : listeners)
310 {
311 try
312 {
313 listener.onNewEntry(entry);
314 }
315 catch (Exception e)
316 {
317 logger.warn("Exception in onNewEntry()", e);
318 }
319 }
320 }
321 }
322
323
324
325
326
327
328 void fireEntryLeave(AsteriskQueueEntryImpl entry)
329 {
330 synchronized (listeners)
331 {
332 for (AsteriskQueueListener listener : listeners)
333 {
334 try
335 {
336 listener.onEntryLeave(entry);
337 }
338 catch (Exception e)
339 {
340 logger.warn("Exception in onEntryLeave()", e);
341 }
342 }
343 }
344 }
345
346
347
348
349
350
351 void fireMemberAdded(AsteriskQueueMemberImpl member)
352 {
353 synchronized (listeners)
354 {
355 for (AsteriskQueueListener listener : listeners)
356 {
357 try
358 {
359 listener.onMemberAdded(member);
360 }
361 catch (Exception e)
362 {
363 logger.warn("Exception in onMemberAdded()", e);
364 }
365 }
366 }
367 }
368
369
370
371
372
373
374 void fireMemberRemoved(AsteriskQueueMemberImpl member)
375 {
376 synchronized (listeners)
377 {
378 for (AsteriskQueueListener listener : listeners)
379 {
380 try
381 {
382 listener.onMemberRemoved(member);
383 }
384 catch (Exception e)
385 {
386 logger.warn("Exception in onMemberRemoved()", e);
387 }
388 }
389 }
390 }
391
392
393
394
395
396
397 public Collection<AsteriskQueueMember> getMembers()
398 {
399 ArrayList<AsteriskQueueMember> listOfMembers = new ArrayList<AsteriskQueueMember>(members.size());
400 synchronized (members)
401 {
402 for (AsteriskQueueMemberImpl asteriskQueueMember : members.values())
403 {
404 listOfMembers.add(asteriskQueueMember);
405 }
406 }
407 return listOfMembers;
408 }
409
410
411
412
413
414
415
416 AsteriskQueueMemberImpl getMember(String location)
417 {
418 synchronized (members)
419 {
420 if (members.containsKey(location))
421 {
422 return members.get(location);
423 }
424 }
425 return null;
426 }
427
428
429
430
431
432
433 void addMember(AsteriskQueueMemberImpl member)
434 {
435 synchronized (members)
436 {
437
438 if (members.containsValue(member))
439 {
440 return;
441 }
442
443 logger.info("Adding new member to the queue " + getName() + ": " + member.toString());
444 members.put(member.getLocation(), member);
445 }
446
447 fireMemberAdded(member);
448 }
449
450
451
452
453
454
455
456 AsteriskQueueMemberImpl getMemberByLocation(String location)
457 {
458 AsteriskQueueMemberImpl member;
459 synchronized (members)
460 {
461 member = members.get(location);
462 }
463 if (member == null)
464 {
465 logger.error("Requested member at location " + location + " not found!");
466 }
467 return member;
468 }
469
470
471
472
473
474
475 void fireMemberStateChanged(AsteriskQueueMemberImpl member)
476 {
477 synchronized (listeners)
478 {
479 for (AsteriskQueueListener listener : listeners)
480 {
481 try
482 {
483 listener.onMemberStateChange(member);
484 }
485 catch (Exception e)
486 {
487 logger.warn("Exception in onMemberStateChange()", e);
488 }
489 }
490 }
491 }
492
493
494
495
496
497
498
499 AsteriskQueueEntryImpl getEntry(String channelName)
500 {
501 synchronized (entries)
502 {
503 for (AsteriskQueueEntryImpl entry : entries)
504 {
505 if (entry.getChannel().getName().equals(channelName))
506 {
507 return entry;
508 }
509 }
510 }
511 return null;
512 }
513
514
515
516
517
518
519
520 public void removeMember(AsteriskQueueMemberImpl member)
521 {
522 synchronized (members)
523 {
524
525 if (!members.containsValue(member))
526 {
527 return;
528 }
529
530 logger.info("Remove member from the queue " + getName() + ": "
531 + member.toString());
532 members.remove(member.getLocation());
533 }
534
535 fireMemberRemoved(member);
536 }
537
538 void fireServiceLevelExceeded(AsteriskQueueEntry entry)
539 {
540 synchronized (listeners)
541 {
542 for (AsteriskQueueListener listener : listeners)
543 {
544 try
545 {
546 listener.onEntryServiceLevelExceeded(entry);
547 }
548 catch (Exception e)
549 {
550 logger.warn("Exception in fireServiceLevelExceeded()", e);
551 }
552 }
553 }
554 }
555
556
557
558
559
560
561
562 AsteriskQueueEntryImpl getEntry(int position)
563 {
564
565 position--;
566 AsteriskQueueEntryImpl foundEntry = null;
567 synchronized (entries)
568 {
569 try
570 {
571 foundEntry = entries.get(position);
572 }
573 catch (IndexOutOfBoundsException e)
574 {
575
576
577 }
578 }
579 return foundEntry;
580 }
581 }