View Javadoc

1   /*
2    *  Copyright 2004-2006 Stefan Reuter
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package org.asteriskjava.manager.internal;
18  
19  import static org.asteriskjava.manager.ManagerConnectionState.CONNECTED;
20  import static org.asteriskjava.manager.ManagerConnectionState.CONNECTING;
21  import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTED;
22  import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTING;
23  import static org.asteriskjava.manager.ManagerConnectionState.INITIAL;
24  import static org.asteriskjava.manager.ManagerConnectionState.RECONNECTING;
25  
26  import java.io.IOException;
27  import java.io.Serializable;
28  import java.net.Socket;
29  import java.net.InetAddress;
30  import java.security.MessageDigest;
31  import java.security.NoSuchAlgorithmException;
32  import java.util.ArrayList;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.concurrent.atomic.AtomicLong;
37  import java.util.regex.Matcher;
38  import java.util.regex.Pattern;
39  
40  import org.asteriskjava.AsteriskVersion;
41  import org.asteriskjava.manager.*;
42  import org.asteriskjava.manager.action.ChallengeAction;
43  import org.asteriskjava.manager.action.CommandAction;
44  import org.asteriskjava.manager.action.EventGeneratingAction;
45  import org.asteriskjava.manager.action.LoginAction;
46  import org.asteriskjava.manager.action.LogoffAction;
47  import org.asteriskjava.manager.action.ManagerAction;
48  import org.asteriskjava.manager.event.ConnectEvent;
49  import org.asteriskjava.manager.event.DisconnectEvent;
50  import org.asteriskjava.manager.event.ManagerEvent;
51  import org.asteriskjava.manager.event.ProtocolIdentifierReceivedEvent;
52  import org.asteriskjava.manager.event.ResponseEvent;
53  import org.asteriskjava.manager.response.ChallengeResponse;
54  import org.asteriskjava.manager.response.CommandResponse;
55  import org.asteriskjava.manager.response.ManagerError;
56  import org.asteriskjava.manager.response.ManagerResponse;
57  import org.asteriskjava.util.DateUtil;
58  import org.asteriskjava.util.Log;
59  import org.asteriskjava.util.LogFactory;
60  import org.asteriskjava.util.SocketConnectionFacade;
61  import org.asteriskjava.util.internal.SocketConnectionFacadeImpl;
62  import org.asteriskjava.manager.action.UserEventAction;
63  
64  /**
65   * Internal implementation of the ManagerConnection interface.
66   *
67   * @author srt
68   * @version $Id$
69   * @see org.asteriskjava.manager.ManagerConnectionFactory
70   */
71  public class ManagerConnectionImpl implements ManagerConnection, Dispatcher
72  {
73      private static final int RECONNECTION_INTERVAL_1 = 50;
74      private static final int RECONNECTION_INTERVAL_2 = 5000;
75      private static final String DEFAULT_HOSTNAME = "localhost";
76      private static final int DEFAULT_PORT = 5038;
77      private static final int RECONNECTION_VERSION_INTERVAL = 500;
78      private static final int MAX_VERSION_ATTEMPTS = 4;
79      private static final Pattern SHOW_VERSION_PATTERN = Pattern.compile("^(core )?show version.*");
80  
81      private static final AtomicLong idCounter = new AtomicLong(0);
82  
83      /**
84       * Instance logger.
85       */
86      private final Log logger = LogFactory.getLog(getClass());
87  
88      private final long id;
89  
90      /**
91       * Used to construct the internalActionId.
92       */
93      private AtomicLong actionIdCounter = new AtomicLong(0);
94  
95      /* Config attributes */
96      /**
97       * Hostname of the Asterisk server to connect to.
98       */
99      private String hostname = DEFAULT_HOSTNAME;
100 
101     /**
102      * TCP port to connect to.
103      */
104     private int port = DEFAULT_PORT;
105 
106     /**
107      * <code>true</code> to use SSL for the connection, <code>false</code>
108      * for a plain text connection.
109      */
110     private boolean ssl = false;
111 
112     /**
113      * The username to use for login as defined in Asterisk's
114      * <code>manager.conf</code>.
115      */
116     protected String username;
117 
118     /**
119      * The password to use for login as defined in Asterisk's
120      * <code>manager.conf</code>.
121      */
122     protected String password;
123 
124     /**
125      * The default timeout to wait for a ManagerResponse after sending a
126      * ManagerAction.
127      */
128     private long defaultResponseTimeout = 2000;
129 
130     /**
131      * The default timeout to wait for the last ResponseEvent after sending an
132      * EventGeneratingAction.
133      */
134     private long defaultEventTimeout = 5000;
135 
136     /**
137      * The timeout to use when connecting to the Asterisk server.
138      */
139     private int socketTimeout = 0;
140 
141     /**
142      * Closes the connection (and reconnects) if no input has been read for the given amount
143      * of milliseconds. A timeout of zero is interpreted as an infinite timeout.
144      *
145      * @see Socket#setSoTimeout(int)
146      */
147     private int socketReadTimeout = 0;
148 
149     /**
150      * <code>true</code> to continue to reconnect after an authentication failure.
151      */
152     private boolean keepAliveAfterAuthenticationFailure = true;
153 
154     /**
155      * The socket to use for TCP/IP communication with Asterisk.
156      */
157     private SocketConnectionFacade socket;
158 
159     /**
160      * The thread that runs the reader.
161      */
162     private Thread readerThread;
163     private final AtomicLong readerThreadCounter = new AtomicLong(0);
164 
165     private final AtomicLong reconnectThreadCounter = new AtomicLong(0);
166 
167     /**
168      * The reader to use to receive events and responses from asterisk.
169      */
170     private ManagerReader reader;
171 
172     /**
173      * The writer to use to send actions to asterisk.
174      */
175     private ManagerWriter writer;
176 
177     /**
178      * The protocol identifier Asterisk sends on connect wrapped into an object
179      * to be used as mutex.
180      */
181     private final ProtocolIdentifierWrapper protocolIdentifier;
182 
183     /**
184      * The version of the Asterisk server we are connected to.
185      */
186     private AsteriskVersion version;
187 
188     /**
189      * Contains the registered handlers that process the ManagerResponses.
190      * <p/>
191      * Key is the internalActionId of the Action sent and value the
192      * corresponding ResponseListener.
193      */
194     private final Map<String, SendActionCallback> responseListeners;
195 
196     /**
197      * Contains the event handlers that handle ResponseEvents for the
198      * sendEventGeneratingAction methods.
199      * <p/>
200      * Key is the internalActionId of the Action sent and value the
201      * corresponding EventHandler.
202      */
203     private final Map<String, ManagerEventListener> responseEventListeners;
204 
205     /**
206      * Contains the event handlers that users registered.
207      */
208     private final List<ManagerEventListener> eventListeners;
209 
210     protected ManagerConnectionState state = INITIAL;
211 
212     private String eventMask;
213 
214     /**
215      * Creates a new instance.
216      */
217     public ManagerConnectionImpl()
218     {
219         this.id = idCounter.getAndIncrement();
220         this.responseListeners = new HashMap<String, SendActionCallback>();
221         this.responseEventListeners = new HashMap<String, ManagerEventListener>();
222         this.eventListeners = new ArrayList<ManagerEventListener>();
223         this.protocolIdentifier = new ProtocolIdentifierWrapper();
224     }
225 
226     // the following two methods can be overriden when running test cases to
227     // return a mock object
228     protected ManagerReader createReader(Dispatcher dispatcher, Object source)
229     {
230         return new ManagerReaderImpl(dispatcher, source);
231     }
232 
233     protected ManagerWriter createWriter()
234     {
235         return new ManagerWriterImpl();
236     }
237 
238     /**
239      * Sets the hostname of the asterisk server to connect to.
240      * <p/>
241      * Default is <code>localhost</code>.
242      *
243      * @param hostname the hostname to connect to
244      */
245     public void setHostname(String hostname)
246     {
247         this.hostname = hostname;
248     }
249 
250     /**
251      * Sets the port to use to connect to the asterisk server. This is the port
252      * specified in asterisk's <code>manager.conf</code> file.
253      * <p/>
254      * Default is 5038.
255      *
256      * @param port the port to connect to
257      */
258     public void setPort(int port)
259     {
260         if (port <= 0)
261         {
262             this.port = DEFAULT_PORT;
263         }
264         else
265         {
266             this.port = port;
267         }
268     }
269 
270     /**
271      * Sets whether to use SSL.
272      * <p/>
273      * Default is false.
274      *
275      * @param ssl <code>true</code> to use SSL for the connection,
276      *            <code>false</code> for a plain text connection.
277      * @since 0.3
278      */
279     public void setSsl(boolean ssl)
280     {
281         this.ssl = ssl;
282     }
283 
284     /**
285      * Sets the username to use to connect to the asterisk server. This is the
286      * username specified in asterisk's <code>manager.conf</code> file.
287      *
288      * @param username the username to use for login
289      */
290     public void setUsername(String username)
291     {
292         this.username = username;
293     }
294 
295     /**
296      * Sets the password to use to connect to the asterisk server. This is the
297      * password specified in Asterisk's <code>manager.conf</code> file.
298      *
299      * @param password the password to use for login
300      */
301     public void setPassword(String password)
302     {
303         this.password = password;
304     }
305 
306     /**
307      * Sets the time in milliseconds the synchronous method
308      * {@link #sendAction(ManagerAction)} will wait for a response before
309      * throwing a TimeoutException.
310      * <p/>
311      * Default is 2000.
312      *
313      * @param defaultResponseTimeout default response timeout in milliseconds
314      * @since 0.2
315      */
316     public void setDefaultResponseTimeout(long defaultResponseTimeout)
317     {
318         this.defaultResponseTimeout = defaultResponseTimeout;
319     }
320 
321     /**
322      * Sets the time in milliseconds the synchronous method
323      * {@link #sendEventGeneratingAction(EventGeneratingAction)} will wait for a
324      * response and the last response event before throwing a TimeoutException.
325      * <p/>
326      * Default is 5000.
327      *
328      * @param defaultEventTimeout default event timeout in milliseconds
329      * @since 0.2
330      */
331     public void setDefaultEventTimeout(long defaultEventTimeout)
332     {
333         this.defaultEventTimeout = defaultEventTimeout;
334     }
335 
336     /**
337      * Set to <code>true</code> to try reconnecting to ther asterisk serve
338      * even if the reconnection attempt threw an AuthenticationFailedException.
339      * <p/>
340      * Default is <code>true</code>.
341      *
342      * @param keepAliveAfterAuthenticationFailure
343      *         <code>true</code> to try reconnecting to ther asterisk serve
344      *         even if the reconnection attempt threw an AuthenticationFailedException,
345      *         <code>false</code> otherwise.
346      */
347     public void setKeepAliveAfterAuthenticationFailure(boolean keepAliveAfterAuthenticationFailure)
348     {
349         this.keepAliveAfterAuthenticationFailure = keepAliveAfterAuthenticationFailure;
350     }
351 
352     /* Implementation of ManagerConnection interface */
353 
354     public String getUsername()
355     {
356         return username;
357     }
358 
359     public String getPassword()
360     {
361         return password;
362     }
363 
364     public AsteriskVersion getVersion()
365     {
366         return version;
367     }
368 
369     public String getHostname()
370     {
371         return hostname;
372     }
373 
374     public int getPort()
375     {
376         return port;
377     }
378 
379     public boolean isSsl()
380     {
381         return ssl;
382     }
383 
384     public InetAddress getLocalAddress()
385     {
386         return socket.getLocalAddress();
387     }
388 
389     public int getLocalPort()
390     {
391         return socket.getLocalPort();
392     }
393 
394     public InetAddress getRemoteAddress()
395     {
396         return socket.getRemoteAddress();
397     }
398 
399     public int getRemotePort()
400     {
401         return socket.getRemotePort();
402     }
403 
404     public void registerUserEventClass(Class<? extends ManagerEvent> userEventClass)
405     {
406         if (reader == null)
407         {
408             reader = createReader(this, this);
409         }
410 
411         reader.registerEventClass(userEventClass);
412     }
413 
414     public void setSocketTimeout(int socketTimeout)
415     {
416         this.socketTimeout = socketTimeout;
417     }
418 
419     public void setSocketReadTimeout(int socketReadTimeout)
420     {
421         this.socketReadTimeout = socketReadTimeout;
422     }
423 
424     public synchronized void login() throws IOException, AuthenticationFailedException, TimeoutException
425     {
426         login(null);
427     }
428 
429     public synchronized void login(String eventMask) throws IOException, AuthenticationFailedException, TimeoutException
430     {
431         if (state != INITIAL && state != DISCONNECTED)
432         {
433             throw new IllegalStateException("Login may only be perfomed when in state "
434                     + "INITIAL or DISCONNECTED, but connection is in state " + state);
435         }
436 
437         state = CONNECTING;
438         this.eventMask = eventMask;
439         try
440         {
441             doLogin(defaultResponseTimeout, eventMask);
442         }
443         finally
444         {
445             if (state != CONNECTED)
446             {
447                 state = DISCONNECTED;
448             }
449         }
450     }
451 
452     /**
453      * Does the real login, following the steps outlined below.
454      * <p/>
455      * <ol>
456      * <li>Connects to the asterisk server by calling {@link #connect()} if not
457      * already connected
458      * <li>Waits until the protocol identifier is received but not longer than
459      * timeout ms.
460      * <li>Sends a {@link ChallengeAction} requesting a challenge for authType
461      * MD5.
462      * <li>When the {@link ChallengeResponse} is received a {@link LoginAction}
463      * is sent using the calculated key (MD5 hash of the password appended to
464      * the received challenge).
465      * </ol>
466      *
467      * @param timeout   the maximum time to wait for the protocol identifier (in
468      *                  ms)
469      * @param eventMask the event mask. Set to "on" if all events should be
470      *                  send, "off" if not events should be sent or a combination of
471      *                  "system", "call" and "log" (separated by ',') to specify what
472      *                  kind of events should be sent.
473      * @throws IOException                   if there is an i/o problem.
474      * @throws AuthenticationFailedException if username or password are
475      *                                       incorrect and the login action returns an error or if the MD5
476      *                                       hash cannot be computed. The connection is closed in this
477      *                                       case.
478      * @throws TimeoutException              if a timeout occurs while waiting for the
479      *                                       protocol identifier. The connection is closed in this case.
480      */
481     protected synchronized void doLogin(long timeout, String eventMask) throws IOException, AuthenticationFailedException,
482             TimeoutException
483     {
484         ChallengeAction challengeAction;
485         ManagerResponse challengeResponse;
486         String challenge;
487         String key;
488         LoginAction loginAction;
489         ManagerResponse loginResponse;
490 
491         if (socket == null)
492         {
493             connect();
494         }
495 
496         synchronized (protocolIdentifier)
497         {
498             if (protocolIdentifier.value == null)
499             {
500                 try
501                 {
502                     protocolIdentifier.wait(timeout);
503                 }
504                 catch (InterruptedException e) // NOPMD
505                 {
506                     Thread.currentThread().interrupt();
507                 }
508             }
509 
510             if (protocolIdentifier.value == null)
511             {
512                 disconnect();
513                 if (reader != null && reader.getTerminationException() != null)
514                 {
515                     throw reader.getTerminationException();
516                 }
517                 else
518                 {
519                     throw new TimeoutException("Timeout waiting for protocol identifier");
520                 }
521             }
522         }
523 
524         challengeAction = new ChallengeAction("MD5");
525         try
526         {
527             challengeResponse = sendAction(challengeAction);
528         }
529         catch (Exception e)
530         {
531             disconnect();
532             throw new AuthenticationFailedException("Unable to send challenge action", e);
533         }
534 
535         if (challengeResponse instanceof ChallengeResponse)
536         {
537             challenge = ((ChallengeResponse) challengeResponse).getChallenge();
538         }
539         else
540         {
541             disconnect();
542             throw new AuthenticationFailedException("Unable to get challenge from Asterisk. ChallengeAction returned: "
543                     + challengeResponse.getMessage());
544         }
545 
546         try
547         {
548             MessageDigest md;
549 
550             md = MessageDigest.getInstance("MD5");
551             if (challenge != null)
552             {
553                 md.update(challenge.getBytes());
554             }
555             if (password != null)
556             {
557                 md.update(password.getBytes());
558             }
559             key = ManagerUtil.toHexString(md.digest());
560         }
561         catch (NoSuchAlgorithmException ex)
562         {
563             disconnect();
564             throw new AuthenticationFailedException("Unable to create login key using MD5 Message Digest", ex);
565         }
566 
567         loginAction = new LoginAction(username, "MD5", key, eventMask);
568         try
569         {
570             loginResponse = sendAction(loginAction);
571         }
572         catch (Exception e)
573         {
574             disconnect();
575             throw new AuthenticationFailedException("Unable to send login action", e);
576         }
577 
578         if (loginResponse instanceof ManagerError)
579         {
580             disconnect();
581             throw new AuthenticationFailedException(loginResponse.getMessage());
582         }
583 
584         logger.info("Successfully logged in");
585 
586         version = determineVersion();
587 
588         state = CONNECTED;
589 
590         writer.setTargetVersion(version);
591 
592         logger.info("Determined Asterisk version: " + version);
593 
594         // generate pseudo event indicating a successful login
595         ConnectEvent connectEvent = new ConnectEvent(this);
596         connectEvent.setProtocolIdentifier(getProtocolIdentifier());
597         connectEvent.setDateReceived(DateUtil.getDate());
598         // TODO could this cause a deadlock?
599         fireEvent(connectEvent);
600     }
601 
602     protected AsteriskVersion determineVersion() throws IOException, TimeoutException
603     {
604         int attempts = 0;
605 
606 //        if ("Asterisk Call Manager/1.1".equals(protocolIdentifier.value))
607 //        {
608 //            return AsteriskVersion.ASTERISK_1_6;
609 //        }
610 
611         while (attempts++ < MAX_VERSION_ATTEMPTS)
612         {
613             final ManagerResponse showVersionFilesResponse;
614             final List<String> showVersionFilesResult;
615 
616             // increase timeout as output is quite large
617             showVersionFilesResponse = sendAction(new CommandAction("show version files pbx.c"), defaultResponseTimeout * 2);
618             if (!(showVersionFilesResponse instanceof CommandResponse))
619             {
620                 // return early in case of permission problems
621                 // org.asteriskjava.manager.response.ManagerError:
622                 // actionId='null'; message='Permission denied'; response='Error';
623                 // uniqueId='null'; systemHashcode=15231583
624                 break;
625             }
626 
627             showVersionFilesResult = ((CommandResponse) showVersionFilesResponse).getResult();
628             if (showVersionFilesResult != null && showVersionFilesResult.size() > 0)
629             {
630                 final String line1;
631 
632                 line1 = showVersionFilesResult.get(0);
633                 if (line1 != null && line1.startsWith("File"))
634                 {
635                     final String rawVersion;
636 
637                     rawVersion = getRawVersion();
638                     if (rawVersion != null && rawVersion.startsWith("Asterisk 1.4"))
639                     {
640                         return AsteriskVersion.ASTERISK_1_4;
641                     }
642                     return AsteriskVersion.ASTERISK_1_2;
643                 }
644                 else if (line1 != null && line1.contains("No such command"))
645                 {
646                 	final ManagerResponse coreShowVersionResponse = sendAction(new CommandAction("core show version"), defaultResponseTimeout * 2);
647                 	
648                 	if(coreShowVersionResponse != null && coreShowVersionResponse instanceof CommandResponse){
649                 		final List<String> coreShowVersionResult = ((CommandResponse) coreShowVersionResponse).getResult();
650                 		
651                 		if(coreShowVersionResult != null && coreShowVersionResult.size() > 0){
652                 			final String coreLine = coreShowVersionResult.get(0);
653                 			
654                 			 if (coreLine != null && coreLine.startsWith("Asterisk 1.6"))
655                              {
656                                  return AsteriskVersion.ASTERISK_1_6;
657                              }
658                              else if (coreLine != null && coreLine.startsWith("Asterisk 1.8"))
659                              {
660                                  return AsteriskVersion.ASTERISK_1_8;
661                              }
662                 		}
663                 	}
664                 	
665                     try
666                     {
667                         Thread.sleep(RECONNECTION_VERSION_INTERVAL);
668                     }
669                     catch (Exception ex)
670                     {
671                         // ingnore
672                     } // NOPMD
673                 }
674                 else
675                 {
676                     // if it isn't the "no such command", break and return the lowest version immediately
677                     break;
678                 }
679             }
680         }
681 
682         // as a fallback assume 1.6
683         return AsteriskVersion.ASTERISK_1_6;
684     }
685 
686     protected String getRawVersion()
687     {
688         final ManagerResponse showVersionResponse;
689 
690         try
691         {
692             showVersionResponse = sendAction(new CommandAction("show version"), defaultResponseTimeout * 2);
693         }
694         catch (Exception e)
695         {
696             return null;
697         }
698 
699         if (showVersionResponse instanceof CommandResponse)
700         {
701             final List<String> showVersionResult;
702 
703             showVersionResult = ((CommandResponse) showVersionResponse).getResult();
704             if (showVersionResult != null && showVersionResult.size() > 0)
705             {
706                 return showVersionResult.get(0);
707             }
708         }
709 
710         return null;
711     }
712 
713     protected synchronized void connect() throws IOException
714     {
715         logger.info("Connecting to " + hostname + ":" + port);
716 
717         if (reader == null)
718         {
719             logger.debug("Creating reader for " + hostname + ":" + port);
720             reader = createReader(this, this);
721         }
722 
723         if (writer == null)
724         {
725             logger.debug("Creating writer");
726             writer = createWriter();
727         }
728 
729         logger.debug("Creating socket");
730         socket = createSocket();
731 
732         logger.debug("Passing socket to reader");
733         reader.setSocket(socket);
734 
735         if (readerThread == null || !readerThread.isAlive() || reader.isDead())
736         {
737             logger.debug("Creating and starting reader thread");
738             readerThread = new Thread(reader);
739             readerThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reader-"
740                     + readerThreadCounter.getAndIncrement());
741             readerThread.setDaemon(true);
742             readerThread.start();
743         }
744 
745         logger.debug("Passing socket to writer");
746         writer.setSocket(socket);
747     }
748 
749     protected SocketConnectionFacade createSocket() throws IOException
750     {
751         return new SocketConnectionFacadeImpl(hostname, port, ssl, socketTimeout, socketReadTimeout);
752     }
753 
754     public synchronized void logoff() throws IllegalStateException
755     {
756         if (state != CONNECTED && state != RECONNECTING)
757         {
758             throw new IllegalStateException("Logoff may only be perfomed when in state "
759                     + "CONNECTED or RECONNECTING, but connection is in state " + state);
760         }
761 
762         state = DISCONNECTING;
763 
764         if (socket != null)
765         {
766             try
767             {
768                 sendAction(new LogoffAction());
769             }
770             catch (Exception e)
771             {
772                 logger.warn("Unable to send LogOff action", e);
773             }
774         }
775         cleanup();
776         state = DISCONNECTED;
777     }
778 
779     /**
780      * Closes the socket connection.
781      */
782     protected synchronized void disconnect()
783     {
784         if (socket != null)
785         {
786             logger.info("Closing socket.");
787             try
788             {
789                 socket.close();
790             }
791             catch (IOException ex)
792             {
793                 logger.warn("Unable to close socket: " + ex.getMessage());
794             }
795             socket = null;
796         }
797         protocolIdentifier.value = null;
798     }
799 
800     public ManagerResponse sendAction(ManagerAction action) throws IOException, TimeoutException, IllegalArgumentException,
801             IllegalStateException
802     {
803         return sendAction(action, defaultResponseTimeout);
804     }
805 
806     /*
807      * Implements synchronous sending of "simple" actions.
808      */
809     public ManagerResponse sendAction(ManagerAction action, long timeout) throws IOException, TimeoutException,
810             IllegalArgumentException, IllegalStateException
811     {
812         ResponseHandlerResult result;
813         SendActionCallback callbackHandler;
814 
815         result = new ResponseHandlerResult();
816         callbackHandler = new DefaultSendActionCallback(result);
817 
818         synchronized (result)
819         {
820             sendAction(action, callbackHandler);
821 
822             // definitely return null for the response of user events
823             if (action instanceof UserEventAction)
824             {
825                 return null;
826             }
827 
828             // only wait if we did not yet receive the response.
829             // Responses may be returned really fast.
830             if (result.getResponse() == null)
831             {
832                 try
833                 {
834                     result.wait(timeout);
835                 }
836                 catch (InterruptedException ex)
837                 {
838                     logger.warn("Interrupted while waiting for result");
839                     Thread.currentThread().interrupt();
840                 }
841             }
842         }
843 
844         // still no response?
845         if (result.getResponse() == null)
846         {
847             throw new TimeoutException("Timeout waiting for response to " + action.getAction()
848                     + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"));
849         }
850 
851         return result.getResponse();
852     }
853 
854     public void sendAction(ManagerAction action, SendActionCallback callback) throws IOException, IllegalArgumentException,
855             IllegalStateException
856     {
857         final String internalActionId;
858 
859         if (action == null)
860         {
861             throw new IllegalArgumentException("Unable to send action: action is null.");
862         }
863 
864         // In general sending actions is only allowed while connected, though
865         // there are a few exceptions, these are handled here:
866         if ((state == CONNECTING || state == RECONNECTING)
867                 && (action instanceof ChallengeAction || action instanceof LoginAction || isShowVersionCommandAction(action)))
868         {
869             // when (re-)connecting challenge and login actions are ok.
870         } // NOPMD
871         else if (state == DISCONNECTING && action instanceof LogoffAction)
872         {
873             // when disconnecting logoff action is ok.
874         } // NOPMD
875         else if (state != CONNECTED)
876         {
877             throw new IllegalStateException("Actions may only be sent when in state "
878                     + "CONNECTED, but connection is in state " + state);
879         }
880 
881         if (socket == null)
882         {
883             throw new IllegalStateException("Unable to send " + action.getAction() + " action: socket not connected.");
884         }
885 
886         internalActionId = createInternalActionId();
887 
888         // if the callbackHandler is null the user is obviously not interested
889         // in the response, thats fine.
890         if (callback != null)
891         {
892             synchronized (this.responseListeners)
893             {
894                 this.responseListeners.put(internalActionId, callback);
895             }
896         }
897 
898         Class<? extends ManagerResponse> responseClass = getExpectedResponseClass(action.getClass());
899         if (responseClass != null)
900         {
901             reader.expectResponseClass(internalActionId, responseClass);
902         }
903 
904         writer.sendAction(action, internalActionId);
905     }
906 
907     boolean isShowVersionCommandAction(ManagerAction action)
908     {
909         if (! (action instanceof CommandAction)) {
910             return false;
911         }
912         final Matcher showVersionMatcher = SHOW_VERSION_PATTERN.matcher(((CommandAction)action).getCommand());
913         return showVersionMatcher.matches();
914     }
915 
916     private Class<? extends ManagerResponse> getExpectedResponseClass(Class<? extends ManagerAction> actionClass)
917     {
918         final ExpectedResponse annotation = actionClass.getAnnotation(ExpectedResponse.class);
919         if (annotation == null)
920         {
921             return null;
922         }
923 
924         return annotation.value();
925     }
926 
927     public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action) throws IOException, EventTimeoutException,
928             IllegalArgumentException, IllegalStateException
929     {
930         return sendEventGeneratingAction(action, defaultEventTimeout);
931     }
932 
933     /*
934      * Implements synchronous sending of event generating actions.
935      */
936     public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action, long timeout) throws IOException,
937             EventTimeoutException, IllegalArgumentException, IllegalStateException
938     {
939         final ResponseEventsImpl responseEvents;
940         final ResponseEventHandler responseEventHandler;
941         final String internalActionId;
942 
943         if (action == null)
944         {
945             throw new IllegalArgumentException("Unable to send action: action is null.");
946         }
947         else if (action.getActionCompleteEventClass() == null)
948         {
949             throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass for "
950                     + action.getClass().getName() + " is null.");
951         }
952         else if (!ResponseEvent.class.isAssignableFrom(action.getActionCompleteEventClass()))
953         {
954             throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass ("
955                     + action.getActionCompleteEventClass().getName() + ") for " + action.getClass().getName()
956                     + " is not a ResponseEvent.");
957         }
958 
959         if (state != CONNECTED)
960         {
961             throw new IllegalStateException("Actions may only be sent when in state "
962                     + "CONNECTED but connection is in state " + state);
963         }
964 
965         responseEvents = new ResponseEventsImpl();
966         responseEventHandler = new ResponseEventHandler(responseEvents, action.getActionCompleteEventClass());
967 
968         internalActionId = createInternalActionId();
969 
970         // register response handler...
971         synchronized (this.responseListeners)
972         {
973             this.responseListeners.put(internalActionId, responseEventHandler);
974         }
975 
976         // ...and event handler.
977         synchronized (this.responseEventListeners)
978         {
979             this.responseEventListeners.put(internalActionId, responseEventHandler);
980         }
981 
982         synchronized (responseEvents)
983         {
984             writer.sendAction(action, internalActionId);
985             // only wait if response has not yet arrived.
986             if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
987             {
988                 try
989                 {
990                     responseEvents.wait(timeout);
991                 }
992                 catch (InterruptedException e)
993                 {
994                     logger.warn("Interrupted while waiting for response events.");
995                     Thread.currentThread().interrupt();
996                 }
997             }
998         }
999 
1000         // still no response or not all events received and timed out?
1001         if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
1002         {
1003             // clean up
1004             synchronized (this.responseEventListeners)
1005             {
1006                 this.responseEventListeners.remove(internalActionId);
1007             }
1008 
1009             throw new EventTimeoutException("Timeout waiting for response or response events to " + action.getAction()
1010                     + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"), responseEvents);
1011         }
1012 
1013         // remove the event handler
1014         // Note: The response handler has already been removed
1015         // when the response was received
1016         synchronized (this.responseEventListeners)
1017         {
1018             this.responseEventListeners.remove(internalActionId);
1019         }
1020 
1021         return responseEvents;
1022     }
1023 
1024     /**
1025      * Creates a new unique internal action id based on the hash code of this
1026      * connection and a sequence.
1027      *
1028      * @return a new internal action id
1029      * @see ManagerUtil#addInternalActionId(String,String)
1030      * @see ManagerUtil#getInternalActionId(String)
1031      * @see ManagerUtil#stripInternalActionId(String)
1032      */
1033     private String createInternalActionId()
1034     {
1035         final StringBuffer sb;
1036 
1037         sb = new StringBuffer();
1038         sb.append(this.hashCode());
1039         sb.append("_");
1040         sb.append(actionIdCounter.getAndIncrement());
1041 
1042         return sb.toString();
1043     }
1044 
1045     public void addEventListener(final ManagerEventListener listener)
1046     {
1047         synchronized (this.eventListeners)
1048         {
1049             // only add it if its not already there
1050             if (!this.eventListeners.contains(listener))
1051             {
1052                 this.eventListeners.add(listener);
1053             }
1054         }
1055     }
1056 
1057     public void removeEventListener(final ManagerEventListener listener)
1058     {
1059         synchronized (this.eventListeners)
1060         {
1061             if (this.eventListeners.contains(listener))
1062             {
1063                 this.eventListeners.remove(listener);
1064             }
1065         }
1066     }
1067 
1068     public String getProtocolIdentifier()
1069     {
1070         return protocolIdentifier.value;
1071     }
1072 
1073     public ManagerConnectionState getState()
1074     {
1075         return state;
1076     }
1077 
1078     /* Implementation of Dispatcher: callbacks for ManagerReader */
1079 
1080     /**
1081      * This method is called by the reader whenever a {@link ManagerResponse} is
1082      * received. The response is dispatched to the associated
1083      * {@link SendActionCallback}.
1084      *
1085      * @param response the response received by the reader
1086      * @see ManagerReader
1087      */
1088     public void dispatchResponse(ManagerResponse response)
1089     {
1090         final String actionId;
1091         String internalActionId;
1092         SendActionCallback listener;
1093 
1094         // shouldn't happen
1095         if (response == null)
1096         {
1097             logger.error("Unable to dispatch null response. This should never happen. Please file a bug.");
1098             return;
1099         }
1100 
1101         actionId = response.getActionId();
1102         internalActionId = null;
1103         listener = null;
1104 
1105         if (actionId != null)
1106         {
1107             internalActionId = ManagerUtil.getInternalActionId(actionId);
1108             response.setActionId(ManagerUtil.stripInternalActionId(actionId));
1109         }
1110 
1111         logger.debug("Dispatching response with internalActionId '" + internalActionId + "':\n" + response);
1112 
1113         if (internalActionId != null)
1114         {
1115             synchronized (this.responseListeners)
1116             {
1117                 listener = responseListeners.get(internalActionId);
1118                 if (listener != null)
1119                 {
1120                     this.responseListeners.remove(internalActionId);
1121                 }
1122                 else
1123                 {
1124                     // when using the async sendAction it's ok not to register a
1125                     // callback so if we don't find a response handler thats ok
1126                     logger.debug("No response listener registered for " + "internalActionId '" + internalActionId + "'");
1127                 }
1128             }
1129         }
1130         else
1131         {
1132             logger.error("Unable to retrieve internalActionId from response: " + "actionId '" + actionId + "':\n" + response);
1133         }
1134 
1135         if (listener != null)
1136         {
1137             try
1138             {
1139                 listener.onResponse(response);
1140             }
1141             catch (Exception e)
1142             {
1143                 logger.warn("Unexpected exception in response listener " + listener.getClass().getName(), e);
1144             }
1145         }
1146     }
1147 
1148     /**
1149      * This method is called by the reader whenever a ManagerEvent is received.
1150      * The event is dispatched to all registered ManagerEventHandlers.
1151      *
1152      * @param event the event received by the reader
1153      * @see #addEventListener(ManagerEventListener)
1154      * @see #removeEventListener(ManagerEventListener)
1155      * @see ManagerReader
1156      */
1157     public void dispatchEvent(ManagerEvent event)
1158     {
1159         // shouldn't happen
1160         if (event == null)
1161         {
1162             logger.error("Unable to dispatch null event. This should never happen. Please file a bug.");
1163             return;
1164         }
1165 
1166         logger.debug("Dispatching event:\n" + event.toString());
1167 
1168         // Some events need special treatment besides forwarding them to the
1169         // registered eventListeners (clients)
1170         // These events are handled here at first:
1171 
1172         // Dispatch ResponseEvents to the appropriate responseEventListener
1173         if (event instanceof ResponseEvent)
1174         {
1175             ResponseEvent responseEvent;
1176             String internalActionId;
1177 
1178             responseEvent = (ResponseEvent) event;
1179             internalActionId = responseEvent.getInternalActionId();
1180             if (internalActionId != null)
1181             {
1182                 synchronized (responseEventListeners)
1183                 {
1184                     ManagerEventListener listener;
1185 
1186                     listener = responseEventListeners.get(internalActionId);
1187                     if (listener != null)
1188                     {
1189                         try
1190                         {
1191                             listener.onManagerEvent(event);
1192                         }
1193                         catch (Exception e)
1194                         {
1195                             logger.warn("Unexpected exception in response event listener " + listener.getClass().getName(),
1196                                     e);
1197                         }
1198                     }
1199                 }
1200             }
1201             else
1202             {
1203                 // ResponseEvent without internalActionId:
1204                 // this happens if the same event class is used as response
1205                 // event
1206                 // and as an event that is not triggered by a Manager command
1207                 // Example: QueueMemberStatusEvent.
1208                 // logger.debug("ResponseEvent without "
1209                 // + "internalActionId:\n" + responseEvent);
1210             } // NOPMD
1211         }
1212         if (event instanceof DisconnectEvent)
1213         {
1214             // When we receive get disconnected while we are connected start
1215             // a new reconnect thread and set the state to RECONNECTING.
1216             if (state == CONNECTED)
1217             {
1218                 state = RECONNECTING;
1219                 // close socket if still open and remove reference to
1220                 // readerThread
1221                 // After sending the DisconnectThread that thread will die
1222                 // anyway.
1223                 cleanup();
1224                 Thread reconnectThread = new Thread(new Runnable()
1225                 {
1226                     public void run()
1227                     {
1228                         reconnect();
1229                     }
1230                 });
1231                 reconnectThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reconnect-"
1232                         + reconnectThreadCounter.getAndIncrement());
1233                 reconnectThread.setDaemon(true);
1234                 reconnectThread.start();
1235                 // now the DisconnectEvent is dispatched to registered
1236                 // eventListeners
1237                 // (clients) and after that the ManagerReaderThread is gone.
1238                 // So effectively we replaced the reader thread by a
1239                 // ReconnectThread.
1240             }
1241             else
1242             {
1243                 // when we receive a DisconnectEvent while not connected we
1244                 // ignore it and do not send it to clients
1245                 return;
1246             }
1247         }
1248         if (event instanceof ProtocolIdentifierReceivedEvent)
1249         {
1250             ProtocolIdentifierReceivedEvent protocolIdentifierReceivedEvent;
1251             String protocolIdentifier;
1252 
1253             protocolIdentifierReceivedEvent = (ProtocolIdentifierReceivedEvent) event;
1254             protocolIdentifier = protocolIdentifierReceivedEvent.getProtocolIdentifier();
1255             setProtocolIdentifier(protocolIdentifier);
1256             // no need to send this event to clients
1257             return;
1258         }
1259 
1260         fireEvent(event);
1261     }
1262 
1263     /**
1264      * Notifies all {@link ManagerEventListener}s registered by users.
1265      *
1266      * @param event the event to propagate
1267      */
1268     private void fireEvent(ManagerEvent event)
1269     {
1270         synchronized (eventListeners)
1271         {
1272             for (ManagerEventListener listener : eventListeners)
1273             {
1274                 try
1275                 {
1276                     listener.onManagerEvent(event);
1277                 }
1278                 catch (RuntimeException e)
1279                 {
1280                     logger.warn("Unexpected exception in eventHandler " + listener.getClass().getName(), e);
1281                 }
1282             }
1283         }
1284     }
1285 
1286     /**
1287      * This method is called when a {@link ProtocolIdentifierReceivedEvent} is
1288      * received from the reader. Having received a correct protocol identifier
1289      * is the precodition for logging in.
1290      *
1291      * @param identifier the protocol version used by the Asterisk server.
1292      */
1293     private void setProtocolIdentifier(final String identifier)
1294     {
1295         logger.info("Connected via " + identifier);
1296 
1297         if (!"Asterisk Call Manager/1.0".equals(identifier)
1298                 && !"Asterisk Call Manager/1.1".equals(identifier) // Asterisk 1.6 
1299                 && !"Asterisk Call Manager/1.2".equals(identifier) // bri stuffed
1300                 && !"OpenPBX Call Manager/1.0".equals(identifier)
1301                 && !"CallWeaver Call Manager/1.0".equals(identifier)
1302                 && !(identifier != null && identifier.startsWith("Asterisk Call Manager Proxy/")))
1303         {
1304             logger.warn("Unsupported protocol version '" + identifier + "'. Use at your own risk.");
1305         }
1306 
1307         synchronized (protocolIdentifier)
1308         {
1309             protocolIdentifier.value = identifier;
1310             protocolIdentifier.notifyAll();
1311         }
1312     }
1313 
1314     /**
1315      * Reconnects to the asterisk server when the connection is lost.
1316      * <p/>
1317      * While keepAlive is <code>true</code> we will try to reconnect.
1318      * Reconnection attempts will be stopped when the {@link #logoff()} method
1319      * is called or when the login after a successful reconnect results in an
1320      * {@link AuthenticationFailedException} suggesting that the manager
1321      * credentials have changed and keepAliveAfterAuthenticationFailure is not
1322      * set.
1323      * <p/>
1324      * This method is called when a {@link DisconnectEvent} is received from the
1325      * reader.
1326      */
1327     private void reconnect()
1328     {
1329         int numTries;
1330 
1331         // try to reconnect
1332         numTries = 0;
1333         while (state == RECONNECTING)
1334         {
1335             try
1336             {
1337                 if (numTries < 10)
1338                 {
1339                     // try to reconnect quite fast for the firt 10 times
1340                     // this succeeds if the server has just been restarted
1341                     Thread.sleep(RECONNECTION_INTERVAL_1);
1342                 }
1343                 else
1344                 {
1345                     // slow down after 10 unsuccessful attempts asuming a
1346                     // shutdown of the server
1347                     Thread.sleep(RECONNECTION_INTERVAL_2);
1348                 }
1349             }
1350             catch (InterruptedException e)
1351             {
1352                 Thread.currentThread().interrupt();
1353             }
1354 
1355             try
1356             {
1357                 connect();
1358 
1359                 try
1360                 {
1361                     doLogin(defaultResponseTimeout, eventMask);
1362                     logger.info("Successfully reconnected.");
1363                     // everything is ok again, so we leave
1364                     // when successful doLogin set the state to CONNECTED so no
1365                     // need to adjust it
1366                     break;
1367                 }
1368                 catch (AuthenticationFailedException e1)
1369                 {
1370                     if (keepAliveAfterAuthenticationFailure)
1371                     {
1372                         logger.error("Unable to log in after reconnect: " + e1.getMessage());
1373                     }
1374                     else
1375                     {
1376                         logger.error("Unable to log in after reconnect: " + e1.getMessage() + ". Giving up.");
1377                         state = DISCONNECTED;
1378                     }
1379                 }
1380                 catch (TimeoutException e1)
1381                 {
1382                     // shouldn't happen - but happens!
1383                     logger.error("TimeoutException while trying to log in " + "after reconnect.");
1384                 }
1385             }
1386             catch (IOException e)
1387             {
1388                 // server seems to be still down, just continue to attempt
1389                 // reconnection
1390                 logger.warn("Exception while trying to reconnect: " + e.getMessage());
1391             }
1392             numTries++;
1393         }
1394     }
1395 
1396     private void cleanup()
1397     {
1398         disconnect();
1399         this.readerThread = null;
1400     }
1401 
1402     @Override
1403     public String toString()
1404     {
1405         StringBuffer sb;
1406 
1407         sb = new StringBuffer("ManagerConnection[");
1408         sb.append("id='").append(id).append("',");
1409         sb.append("hostname='").append(hostname).append("',");
1410         sb.append("port=").append(port).append(",");
1411         sb.append("systemHashcode=").append(System.identityHashCode(this)).append("]");
1412 
1413         return sb.toString();
1414     }
1415 
1416     /* Helper classes */
1417 
1418     /**
1419      * A simple data object to store a ManagerResult.
1420      */
1421     private static class ResponseHandlerResult implements Serializable
1422     {
1423         /**
1424          * Serializable version identifier.
1425          */
1426         private static final long serialVersionUID = 7831097958568769220L;
1427         private ManagerResponse response;
1428 
1429         public ResponseHandlerResult()
1430         {
1431         }
1432 
1433         public ManagerResponse getResponse()
1434         {
1435             return this.response;
1436         }
1437 
1438         public void setResponse(ManagerResponse response)
1439         {
1440             this.response = response;
1441         }
1442     }
1443 
1444     /**
1445      * A simple response handler that stores the received response in a
1446      * ResponseHandlerResult for further processing.
1447      */
1448     private static class DefaultSendActionCallback implements SendActionCallback, Serializable
1449     {
1450         /**
1451          * Serializable version identifier.
1452          */
1453         private static final long serialVersionUID = 2926598671855316803L;
1454         private final ResponseHandlerResult result;
1455 
1456         /**
1457          * Creates a new instance.
1458          *
1459          * @param result the result to store the response in
1460          */
1461         public DefaultSendActionCallback(ResponseHandlerResult result)
1462         {
1463             this.result = result;
1464         }
1465 
1466         public void onResponse(ManagerResponse response)
1467         {
1468             synchronized (result)
1469             {
1470                 result.setResponse(response);
1471                 result.notifyAll();
1472             }
1473         }
1474     }
1475 
1476     /**
1477      * A combinded event and response handler that adds received events and the
1478      * response to a ResponseEvents object.
1479      */
1480     private static class ResponseEventHandler implements ManagerEventListener, SendActionCallback
1481     {
1482         private final ResponseEventsImpl events;
1483         private final Class<?> actionCompleteEventClass;
1484 
1485         /**
1486          * Creates a new instance.
1487          *
1488          * @param events                   the ResponseEventsImpl to store the events in
1489          * @param actionCompleteEventClass the type of event that indicates that
1490          *                                 all events have been received
1491          */
1492         public ResponseEventHandler(ResponseEventsImpl events, Class<?> actionCompleteEventClass)
1493         {
1494             this.events = events;
1495             this.actionCompleteEventClass = actionCompleteEventClass;
1496         }
1497 
1498         public void onManagerEvent(ManagerEvent event)
1499         {
1500             synchronized (events)
1501             {
1502                 // should always be a ResponseEvent, anyway...
1503                 if (event instanceof ResponseEvent)
1504                 {
1505                     ResponseEvent responseEvent;
1506 
1507                     responseEvent = (ResponseEvent) event;
1508                     events.addEvent(responseEvent);
1509                 }
1510 
1511                 // finished?
1512                 if (actionCompleteEventClass.isAssignableFrom(event.getClass()))
1513                 {
1514                     events.setComplete(true);
1515                     // notify if action complete event and response have been
1516                     // received
1517                     if (events.getResponse() != null)
1518                     {
1519                         events.notifyAll();
1520                     }
1521                 }
1522             }
1523         }
1524 
1525         public void onResponse(ManagerResponse response)
1526         {
1527             synchronized (events)
1528             {
1529                 events.setRepsonse(response);
1530                 if (response instanceof ManagerError)
1531                 {
1532                     events.setComplete(true);
1533                 }
1534 
1535                 // finished?
1536                 // notify if action complete event and response have been
1537                 // received
1538                 if (events.isComplete())
1539                 {
1540                     events.notifyAll();
1541                 }
1542             }
1543         }
1544     }
1545 
1546     private static class ProtocolIdentifierWrapper
1547     {
1548         String value;
1549     }
1550 }