View Javadoc

1   /*
2    *  Copyright 2004-2006 Stefan Reuter
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package org.asteriskjava.live.internal;
18  
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Date;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Timer;
25  import java.util.TimerTask;
26  
27  import org.asteriskjava.live.AsteriskQueue;
28  import org.asteriskjava.live.AsteriskQueueEntry;
29  import org.asteriskjava.live.AsteriskQueueListener;
30  import org.asteriskjava.live.AsteriskQueueMember;
31  import org.asteriskjava.util.Log;
32  import org.asteriskjava.util.LogFactory;
33  
34  /**
35   * Default implementation of the AsteriskQueue interface.
36   *
37   * @author srt
38   * @version $Id$
39   */
40  class AsteriskQueueImpl extends AbstractLiveObject implements AsteriskQueue
41  {
42      /**
43       * TimerTask that monitors exceeding service levels.
44       *
45       * @author <a href="mailto:patrick.breucking{@nospam}gonicus.de">Patrick Breucking</a>
46       */
47      private class ServiceLevelTimerTask extends TimerTask
48      {
49          private final AsteriskQueueEntry entry;
50  
51          ServiceLevelTimerTask(AsteriskQueueEntry entry)
52          {
53              this.entry = entry;
54          }
55  
56          @Override
57          public void run()
58          {
59              fireServiceLevelExceeded(entry);
60          }
61      }
62  
63      private final Log logger = LogFactory.getLog(this.getClass());
64      private final String name;
65      private Integer max;
66      private String strategy;
67      private Integer serviceLevel;
68      private Integer weight;
69      private final ArrayList<AsteriskQueueEntryImpl> entries;
70      private final Timer timer;
71      private final HashMap<String, AsteriskQueueMemberImpl> members;
72      private final List<AsteriskQueueListener> listeners;
73      private final HashMap<AsteriskQueueEntry, ServiceLevelTimerTask> serviceLevelTimerTasks;
74  
75      AsteriskQueueImpl(AsteriskServerImpl server, String name, Integer max,
76                        String strategy, Integer serviceLevel, Integer weight)
77      {
78          super(server);
79          this.name = name;
80          this.max = max;
81          this.strategy = strategy;
82          this.serviceLevel = serviceLevel;
83          this.weight = weight;
84          entries = new ArrayList<AsteriskQueueEntryImpl>(25);
85          listeners = new ArrayList<AsteriskQueueListener>();
86          members = new HashMap<String, AsteriskQueueMemberImpl>();
87          timer = new Timer("ServiceLevelTimer-" + name, true);
88          serviceLevelTimerTasks = new HashMap<AsteriskQueueEntry, ServiceLevelTimerTask>();
89      }
90  
91      void cancelServiceLevelTimer()
92      {
93          timer.cancel();
94      }
95  
96      public String getName()
97      {
98          return name;
99      }
100 
101     public Integer getMax()
102     {
103         return max;
104     }
105 
106     public String getStrategy()
107     {
108         return strategy;
109     }
110 
111     void setMax(Integer max)
112     {
113         this.max = max;
114     }
115 
116     public Integer getServiceLevel()
117     {
118         return serviceLevel;
119     }
120 
121     void setServiceLevel(Integer serviceLevel)
122     {
123         this.serviceLevel = serviceLevel;
124     }
125 
126     public Integer getWeight()
127     {
128         return weight;
129     }
130 
131     void setWeight(Integer weight)
132     {
133         this.weight = weight;
134     }
135 
136     public List<AsteriskQueueEntry> getEntries()
137     {
138         synchronized (entries)
139         {
140             return new ArrayList<AsteriskQueueEntry>(entries);
141         }
142     }
143 
144     /**
145      * Shifts the position of the queue entries if needed
146      * (and fire PCE on queue entries if appropriate).
147      */
148     private void shift()
149     {
150         int currentPos = 1; // Asterisk starts at 1
151 
152         synchronized (entries)
153         {
154             for (AsteriskQueueEntryImpl qe : entries)
155             {
156                 // Only set (and fire PCE on qe) if necessary
157                 if (qe.getPosition() != currentPos)
158                 {
159                     qe.setPosition(currentPos);
160                 }
161                 currentPos++;
162             }
163         }
164     }
165 
166     /**
167      * Creates a new AsteriskQueueEntry, adds it to this queue.<p>
168      * Fires:
169      * <ul>
170      * <li>PCE on channel</li>
171      * <li>NewEntry on this queue</li>
172      * <li>PCE on other queue entries if shifted (never happens)</li>
173      * <li>NewQueueEntry on server</li>
174      * </ul>
175      *
176      * @param channel          the channel that joined the queue
177      * @param reportedPosition the position as given by Asterisk (currently not used)
178      * @param dateReceived     the date the hannel joined the queue
179      */
180     void createNewEntry(AsteriskChannelImpl channel, int reportedPosition, Date dateReceived)
181     {
182         AsteriskQueueEntryImpl qe = new AsteriskQueueEntryImpl(server, this, channel, reportedPosition, dateReceived);
183 
184         long delay = serviceLevel * 1000L;
185         if (delay > 0)
186         {
187             ServiceLevelTimerTask timerTask = new ServiceLevelTimerTask(qe);
188             timer.schedule(timerTask, delay);
189             synchronized (serviceLevelTimerTasks)
190             {
191                 serviceLevelTimerTasks.put(qe, timerTask);
192             }
193         }
194 
195         synchronized (entries)
196         {
197             entries.add(qe); // at the end of the list
198 
199             // Keep the lock !
200             // This will fire PCE on the newly created queue entry
201             // but hopefully this one has no listeners yet
202             shift();
203         }
204 
205         // Set the channel property ony here as queue entries and channels
206         // maintain a reciprocal reference.
207         // That way property change on channel and new entry event on queue will be
208         // lanched when BOTH channel and queue are correctly set.
209         channel.setQueueEntry(qe);
210         fireNewEntry(qe);
211         server.fireNewQueueEntry(qe);
212     }
213 
214     /**
215      * Removes the given queue entry from the queue.<p>
216      * Fires if needed:
217      * <ul>
218      * <li>PCE on channel</li>
219      * <li>EntryLeave on this queue</li>
220      * <li>PCE on other queue entries if shifted</li>
221      * </ul>
222      *
223      * @param entry        an existing entry object.
224      * @param dateReceived the remove event was received.
225      */
226     void removeEntry(AsteriskQueueEntryImpl entry, Date dateReceived)
227     {
228         synchronized (serviceLevelTimerTasks)
229         {
230             if (serviceLevelTimerTasks.containsKey(entry))
231             {
232                 ServiceLevelTimerTask timerTask = serviceLevelTimerTasks.get(entry);
233                 timerTask.cancel();
234                 serviceLevelTimerTasks.remove(entry);
235             }
236         }
237 
238         boolean changed;
239         synchronized (entries)
240         {
241             changed = entries.remove(entry);
242 
243             if (changed)
244             {
245                 // Keep the lock !
246                 shift();
247             }
248         }
249 
250         // Fire outside lock
251         if (changed)
252         {
253             entry.getChannel().setQueueEntry(null);
254             entry.left(dateReceived);
255             fireEntryLeave(entry);
256         }
257     }
258 
259     @Override
260     public String toString()
261     {
262         final StringBuffer sb;
263 
264         sb = new StringBuffer("AsteriskQueue[");
265         sb.append("name='").append(getName()).append("',");
266         sb.append("max='").append(getMax()).append("',");
267         sb.append("strategy='").append(getStrategy()).append("',");
268         sb.append("serviceLevel='").append(getServiceLevel()).append("',");
269         sb.append("weight='").append(getWeight()).append("',");
270         synchronized (entries)
271         {
272             sb.append("entries='").append(entries.toString()).append("',");
273         }
274         synchronized (members)
275         {
276             sb.append("members='").append(members.toString()).append("',");
277         }
278         sb.append("systemHashcode=").append(System.identityHashCode(this));
279         sb.append("]");
280 
281         return sb.toString();
282     }
283 
284     public void addAsteriskQueueListener(AsteriskQueueListener listener)
285     {
286         synchronized (listeners)
287         {
288             listeners.add(listener);
289         }
290     }
291 
292     public void removeAsteriskQueueListener(AsteriskQueueListener listener)
293     {
294         synchronized (listeners)
295         {
296             listeners.remove(listener);
297         }
298     }
299 
300     /**
301      * Notifies all registered listener that an entry joins the queue.
302      *
303      * @param entry that joins the queue
304      */
305     void fireNewEntry(AsteriskQueueEntryImpl entry)
306     {
307         synchronized (listeners)
308         {
309             for (AsteriskQueueListener listener : listeners)
310             {
311                 try
312                 {
313                     listener.onNewEntry(entry);
314                 }
315                 catch (Exception e)
316                 {
317                     logger.warn("Exception in onNewEntry()", e);
318                 }
319             }
320         }
321     }
322 
323     /**
324      * Notifies all registered listener that an entry leaves the queue.
325      *
326      * @param entry that leaves the queue.
327      */
328     void fireEntryLeave(AsteriskQueueEntryImpl entry)
329     {
330         synchronized (listeners)
331         {
332             for (AsteriskQueueListener listener : listeners)
333             {
334                 try
335                 {
336                     listener.onEntryLeave(entry);
337                 }
338                 catch (Exception e)
339                 {
340                     logger.warn("Exception in onEntryLeave()", e);
341                 }
342             }
343         }
344     }
345 
346     /**
347      * Notifies all registered listener that a member has been added to the queue.
348      *
349      * @param member added to the queue
350      */
351     void fireMemberAdded(AsteriskQueueMemberImpl member)
352     {
353         synchronized (listeners)
354         {
355             for (AsteriskQueueListener listener : listeners)
356             {
357                 try
358                 {
359                     listener.onMemberAdded(member);
360                 }
361                 catch (Exception e)
362                 {
363                     logger.warn("Exception in onMemberAdded()", e);
364                 }
365             }
366         }
367     }
368 
369     /**
370      * Notifies all registered listener that a member has been removed from the queue.
371      *
372      * @param member that has been removed.
373      */
374     void fireMemberRemoved(AsteriskQueueMemberImpl member)
375     {
376         synchronized (listeners)
377         {
378             for (AsteriskQueueListener listener : listeners)
379             {
380                 try
381                 {
382                     listener.onMemberRemoved(member);
383                 }
384                 catch (Exception e)
385                 {
386                     logger.warn("Exception in onMemberRemoved()", e);
387                 }
388             }
389         }
390     }
391 
392     /**
393      * Returns a collection of members of this queue.
394      *
395      * @see org.asteriskjava.live.AsteriskQueue#getMembers()
396      */
397     public Collection<AsteriskQueueMember> getMembers()
398     {
399         ArrayList<AsteriskQueueMember> listOfMembers = new ArrayList<AsteriskQueueMember>(members.size());
400         synchronized (members)
401         {
402             for (AsteriskQueueMemberImpl asteriskQueueMember : members.values())
403             {
404                 listOfMembers.add(asteriskQueueMember);
405             }
406         }
407         return listOfMembers;
408     }
409 
410     /**
411      * Returns a member by its location.
412      *
413      * @param location ot the member
414      * @return the member by its location.
415      */
416     AsteriskQueueMemberImpl getMember(String location)
417     {
418         synchronized (members)
419         {
420             if (members.containsKey(location))
421             {
422                 return members.get(location);
423             }
424         }
425         return null;
426     }
427 
428     /**
429      * Add a new member to this queue.
430      *
431      * @param member to add
432      */
433     void addMember(AsteriskQueueMemberImpl member)
434     {
435         synchronized (members)
436         {
437             // Check if member already exists
438             if (members.containsValue(member))
439             {
440                 return;
441             }
442             // If not, add the new member.
443             logger.info("Adding new member to the queue " + getName() + ": " + member.toString());
444             members.put(member.getLocation(), member);
445         }
446         
447         fireMemberAdded(member);
448     }
449 
450     /**
451      * Retrieves a member by its location.
452      *
453      * @param location of the member
454      * @return the requested member.
455      */
456     AsteriskQueueMemberImpl getMemberByLocation(String location)
457     {
458         AsteriskQueueMemberImpl member;
459         synchronized (members)
460         {
461             member = members.get(location);
462         }
463         if (member == null)
464         {
465             logger.error("Requested member at location " + location + " not found!");
466         }
467         return member;
468     }
469 
470     /**
471      * Notifies all registered listener that a queue member changes its state.
472      *
473      * @param member the changed member.
474      */
475     void fireMemberStateChanged(AsteriskQueueMemberImpl member)
476     {
477         synchronized (listeners)
478         {
479             for (AsteriskQueueListener listener : listeners)
480             {
481                 try
482                 {
483                     listener.onMemberStateChange(member);
484                 }
485                 catch (Exception e)
486                 {
487                     logger.warn("Exception in onMemberStateChange()", e);
488                 }
489             }
490         }
491     }
492 
493     /**
494      * Gets an entry of the queue by its channel name.
495      *
496      * @param channelName The entry's channel name.
497      * @return the queue entry if found, null otherwise.
498      */
499     AsteriskQueueEntryImpl getEntry(String channelName)
500     {
501         synchronized (entries)
502         {
503             for (AsteriskQueueEntryImpl entry : entries)
504             {
505                 if (entry.getChannel().getName().equals(channelName))
506                 {
507                     return entry;
508                 }
509             }
510         }
511         return null;
512     }
513 
514 
515     /**
516      * Removes a member from this queue.
517      *
518      * @param member the member to remove.
519      */
520     public void removeMember(AsteriskQueueMemberImpl member)
521     {
522         synchronized (members)
523         {
524             // Check if member exists
525             if (!members.containsValue(member))
526             {
527                 return;
528             }
529             // If so, remove the member.
530             logger.info("Remove member from the queue " + getName() + ": "
531                     + member.toString());
532             members.remove(member.getLocation());
533         }
534         
535         fireMemberRemoved(member);
536     }
537 
538     void fireServiceLevelExceeded(AsteriskQueueEntry entry)
539     {
540         synchronized (listeners)
541         {
542             for (AsteriskQueueListener listener : listeners)
543             {
544                 try
545                 {
546                     listener.onEntryServiceLevelExceeded(entry);
547                 }
548                 catch (Exception e)
549                 {
550                     logger.warn("Exception in fireServiceLevelExceeded()", e);
551                 }
552             }
553         }
554     }
555 
556     /**
557      * Gets an entry by its (estimated) position in the queue.
558      *
559      * @param position the position, starting at 1.
560      * @return the queue entry if exiting at this position, null otherwise.
561      */
562     AsteriskQueueEntryImpl getEntry(int position)
563     {
564         // positions in asterisk start at 1, but list starts at 0
565         position--;
566         AsteriskQueueEntryImpl foundEntry = null;
567         synchronized (entries)
568         {
569             try
570             {
571                 foundEntry = entries.get(position);
572             }
573             catch (IndexOutOfBoundsException e)
574             {
575                 // For consistency with the above method,
576                 // swallow. We might indeed request the 1st one from time to time
577             } // NOPMD
578         }
579         return foundEntry;
580     }
581 }