View Javadoc

1   /*
2    *  Copyright 2004-2006 Stefan Reuter
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package org.asteriskjava.manager.internal;
18  
19  import static org.asteriskjava.manager.ManagerConnectionState.CONNECTED;
20  import static org.asteriskjava.manager.ManagerConnectionState.CONNECTING;
21  import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTED;
22  import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTING;
23  import static org.asteriskjava.manager.ManagerConnectionState.INITIAL;
24  import static org.asteriskjava.manager.ManagerConnectionState.RECONNECTING;
25  
26  import java.io.IOException;
27  import java.io.Serializable;
28  import java.net.Socket;
29  import java.security.MessageDigest;
30  import java.security.NoSuchAlgorithmException;
31  import java.util.ArrayList;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.asteriskjava.AsteriskVersion;
38  import org.asteriskjava.manager.AuthenticationFailedException;
39  import org.asteriskjava.manager.EventTimeoutException;
40  import org.asteriskjava.manager.ManagerConnection;
41  import org.asteriskjava.manager.ManagerConnectionState;
42  import org.asteriskjava.manager.ManagerEventListener;
43  import org.asteriskjava.manager.SendActionCallback;
44  import org.asteriskjava.manager.ResponseEvents;
45  import org.asteriskjava.manager.TimeoutException;
46  import org.asteriskjava.manager.action.ChallengeAction;
47  import org.asteriskjava.manager.action.CommandAction;
48  import org.asteriskjava.manager.action.EventGeneratingAction;
49  import org.asteriskjava.manager.action.LoginAction;
50  import org.asteriskjava.manager.action.LogoffAction;
51  import org.asteriskjava.manager.action.ManagerAction;
52  import org.asteriskjava.manager.event.ConnectEvent;
53  import org.asteriskjava.manager.event.DisconnectEvent;
54  import org.asteriskjava.manager.event.ManagerEvent;
55  import org.asteriskjava.manager.event.ProtocolIdentifierReceivedEvent;
56  import org.asteriskjava.manager.event.ResponseEvent;
57  import org.asteriskjava.manager.response.ChallengeResponse;
58  import org.asteriskjava.manager.response.CommandResponse;
59  import org.asteriskjava.manager.response.ManagerError;
60  import org.asteriskjava.manager.response.ManagerResponse;
61  import org.asteriskjava.util.DateUtil;
62  import org.asteriskjava.util.Log;
63  import org.asteriskjava.util.LogFactory;
64  import org.asteriskjava.util.SocketConnectionFacade;
65  import org.asteriskjava.util.internal.SocketConnectionFacadeImpl;
66  import org.asteriskjava.manager.action.UserEventAction;
67  
68  /***
69   * Default implemention of the ManagerConnection interface.
70   * <p/>
71   * Generelly avoid direct use of this class. Use the ManagerConnectionFactory to
72   * obtain a ManagerConnection instead.
73   * <p/>
74   * When using a dependency injection framework like Spring direct usage for
75   * wiring up beans that require a ManagerConnection property is fine though.
76   * <p/>
77   * Note that the DefaultManagerConnection will create one new Thread for reading
78   * data from Asterisk on the first call to on of the login() methods.
79   *
80   * @author srt
81   * @version $Id: ManagerConnectionImpl.java 837 2007-07-09 06:44:23Z srt $
82   * @see org.asteriskjava.manager.ManagerConnectionFactory
83   */
84  public class ManagerConnectionImpl implements ManagerConnection, Dispatcher
85  {
86      private static final int RECONNECTION_INTERVAL_1 = 50;
87      private static final int RECONNECTION_INTERVAL_2 = 5000;
88      private static final String DEFAULT_HOSTNAME = "localhost";
89      private static final int DEFAULT_PORT = 5038;
90      private static final int RECONNECTION_VERSION_INTERVAL = 500;
91      private static final int MAX_VERSION_ATTEMPTS = 4;
92  
93      private static final AtomicLong idCounter = new AtomicLong(0);
94  
95      /***
96       * Instance logger.
97       */
98      private final Log logger = LogFactory.getLog(getClass());
99  
100     private final long id;
101 
102     /***
103      * Used to construct the internalActionId.
104      */
105     private AtomicLong actionIdCounter = new AtomicLong(0);
106 
107     /* Config attributes */
108     /***
109      * Hostname of the Asterisk server to connect to.
110      */
111     private String hostname = DEFAULT_HOSTNAME;
112 
113     /***
114      * TCP port to connect to.
115      */
116     private int port = DEFAULT_PORT;
117 
118     /***
119      * <code>true</code> to use SSL for the connection, <code>false</code>
120      * for a plain text connection.
121      */
122     private boolean ssl = false;
123 
124     /***
125      * The username to use for login as defined in Asterisk's
126      * <code>manager.conf</code>.
127      */
128     protected String username;
129 
130     /***
131      * The password to use for login as defined in Asterisk's
132      * <code>manager.conf</code>.
133      */
134     protected String password;
135 
136     /***
137      * The default timeout to wait for a ManagerResponse after sending a
138      * ManagerAction.
139      */
140     private long defaultResponseTimeout = 2000;
141 
142     /***
143      * The default timeout to wait for the last ResponseEvent after sending an
144      * EventGeneratingAction.
145      */
146     private long defaultEventTimeout = 5000;
147 
148     /***
149      * The timeout to use when connecting the the Asterisk server.
150      */
151     private int socketTimeout = 0;
152 
153     /***
154      * @see Socket#setSoTimeout(int)
155      */
156     private int socketReadTimeout = 0;
157 
158     /***
159      * Should we continue to reconnect after an authentication failure?
160      */
161     private boolean keepAliveAfterAuthenticationFailure = true;
162 
163     /***
164      * The socket to use for TCP/IP communication with Asterisk.
165      */
166     private SocketConnectionFacade socket;
167 
168     /***
169      * The thread that runs the reader.
170      */
171     private Thread readerThread;
172     private final AtomicLong readerThreadCounter = new AtomicLong(0);
173 
174     /***
175      * The thread that performs reconnect.
176      */
177     private Thread reconnectThread;
178     private final AtomicLong reconnectThreadCounter = new AtomicLong(0);
179 
180     /***
181      * The reader to use to receive events and responses from asterisk.
182      */
183     private ManagerReader reader;
184 
185     /***
186      * The writer to use to send actions to asterisk.
187      */
188     private ManagerWriter writer;
189 
190     /***
191      * The protocol identifer Asterisk sends on connect wrapped into an object
192      * to be used as mutex.
193      */
194     private final ProtocolIdentifierWrapper protocolIdentifier;
195 
196     /***
197      * The version of the Asterisk server we are connected to.
198      */
199     private AsteriskVersion version;
200 
201     /***
202      * Contains the registered handlers that process the ManagerResponses.
203      * <p/>
204      * Key is the internalActionId of the Action sent and value the
205      * corresponding ResponseListener.
206      */
207     private final Map<String, SendActionCallback> responseListeners;
208 
209     /***
210      * Contains the event handlers that handle ResponseEvents for the
211      * sendEventGeneratingAction methods.
212      * <p/>
213      * Key is the internalActionId of the Action sent and value the
214      * corresponding EventHandler.
215      */
216     private final Map<String, ManagerEventListener> responseEventListeners;
217 
218     /***
219      * Contains the event handlers that users registered.
220      */
221     private final List<ManagerEventListener> eventListeners;
222 
223     /***
224      * <code>true</code> while reconnecting.
225      */
226     private boolean reconnecting = false;
227 
228     protected ManagerConnectionState state = INITIAL;
229 
230     private String eventMask;
231 
232     /***
233      * Creates a new instance.
234      */
235     public ManagerConnectionImpl()
236     {
237         this.id = idCounter.getAndIncrement();
238         this.responseListeners = new HashMap<String, SendActionCallback>();
239         this.responseEventListeners = new HashMap<String, ManagerEventListener>();
240         this.eventListeners = new ArrayList<ManagerEventListener>();
241         this.protocolIdentifier = new ProtocolIdentifierWrapper();
242     }
243 
244     // the following two methods can be overriden when running test cases to
245     // return a mock object
246     protected ManagerReader createReader(Dispatcher dispatcher, Object source)
247     {
248         return new ManagerReaderImpl(dispatcher, source);
249     }
250 
251     protected ManagerWriter createWriter()
252     {
253         return new ManagerWriterImpl();
254     }
255 
256     /***
257      * Sets the hostname of the asterisk server to connect to.
258      * <p/>
259      * Default is <code>localhost</code>.
260      *
261      * @param hostname the hostname to connect to
262      */
263     public void setHostname(String hostname)
264     {
265         this.hostname = hostname;
266     }
267 
268     /***
269      * Sets the port to use to connect to the asterisk server. This is the port
270      * specified in asterisk's <code>manager.conf</code> file.
271      * <p/>
272      * Default is 5038.
273      *
274      * @param port the port to connect to
275      */
276     public void setPort(int port)
277     {
278         if (port <= 0)
279         {
280             this.port = DEFAULT_PORT;
281         }
282         else
283         {
284             this.port = port;
285         }
286     }
287 
288     /***
289      * Sets whether to use SSL.
290      * <p/>
291      * Default is false.
292      *
293      * @param ssl <code>true</code> to use SSL for the connection,
294      *            <code>false</code> for a plain text connection.
295      * @since 0.3
296      */
297     public void setSsl(boolean ssl)
298     {
299         this.ssl = ssl;
300     }
301 
302     /***
303      * Sets the username to use to connect to the asterisk server. This is the
304      * username specified in asterisk's <code>manager.conf</code> file.
305      *
306      * @param username the username to use for login
307      */
308     public void setUsername(String username)
309     {
310         this.username = username;
311     }
312 
313     /***
314      * Sets the password to use to connect to the asterisk server. This is the
315      * password specified in Asterisk's <code>manager.conf</code> file.
316      *
317      * @param password the password to use for login
318      */
319     public void setPassword(String password)
320     {
321         this.password = password;
322     }
323 
324     /***
325      * Sets the time in milliseconds the synchronous method
326      * {@link #sendAction(ManagerAction)} will wait for a response before
327      * throwing a TimeoutException.
328      * <p/>
329      * Default is 2000.
330      *
331      * @param defaultResponseTimeout default response timeout in milliseconds
332      * @since 0.2
333      */
334     public void setDefaultResponseTimeout(long defaultResponseTimeout)
335     {
336         this.defaultResponseTimeout = defaultResponseTimeout;
337     }
338 
339     /***
340      * Sets the time in milliseconds the synchronous method
341      * {@link #sendEventGeneratingAction(EventGeneratingAction)} will wait for a
342      * response and the last response event before throwing a TimeoutException.
343      * <p/>
344      * Default is 5000.
345      *
346      * @param defaultEventTimeout default event timeout in milliseconds
347      * @since 0.2
348      */
349     public void setDefaultEventTimeout(long defaultEventTimeout)
350     {
351         this.defaultEventTimeout = defaultEventTimeout;
352     }
353 
354     /***
355      * Set to <code>true</code> to try reconnecting to ther asterisk serve
356      * even if the reconnection attempt threw an AuthenticationFailedException.
357      * <p/>
358      * Default is <code>true</code>.
359      *
360      * @param keepAliveAfterAuthenticationFailure
361      *         <code>true</code> to try reconnecting to ther asterisk serve
362      *         even if the reconnection attempt threw an AuthenticationFailedException,
363      *         <code>false</code> otherwise.
364      */
365     public void setKeepAliveAfterAuthenticationFailure(boolean keepAliveAfterAuthenticationFailure)
366     {
367         this.keepAliveAfterAuthenticationFailure = keepAliveAfterAuthenticationFailure;
368     }
369 
370     /* Implementation of ManagerConnection interface */
371 
372     public String getUsername()
373     {
374         return username;
375     }
376 
377     public String getPassword()
378     {
379         return password;
380     }
381 
382     public String getHostname()
383     {
384         return hostname;
385     }
386 
387     public int getPort()
388     {
389         return port;
390     }
391 
392     public boolean isSsl()
393     {
394         return ssl;
395     }
396 
397     public void registerUserEventClass(Class userEventClass)
398     {
399         if (reader == null)
400         {
401             reader = createReader(this, this);
402         }
403 
404         reader.registerEventClass(userEventClass);
405     }
406 
407     public void setSocketTimeout(int socketTimeout)
408     {
409         this.socketTimeout = socketTimeout;
410     }
411 
412     public void setSocketReadTimeout(int socketReadTimeout)
413     {
414         this.socketReadTimeout = socketReadTimeout;
415     }
416 
417     public synchronized void login() throws IOException, AuthenticationFailedException, TimeoutException
418     {
419         login(null);
420     }
421 
422     public synchronized void login(String eventMask) throws IOException, AuthenticationFailedException, TimeoutException
423     {
424         if (state != INITIAL && state != DISCONNECTED)
425         {
426             throw new IllegalStateException("Login may only be perfomed when in state "
427                     + "INITIAL or DISCONNECTED, but connection is in state " + state);
428         }
429 
430         state = CONNECTING;
431         this.eventMask = eventMask;
432         try
433         {
434             doLogin(defaultResponseTimeout, eventMask);
435         }
436         finally
437         {
438             if (state != CONNECTED)
439             {
440                 state = DISCONNECTED;
441             }
442         }
443     }
444 
445     /***
446      * Does the real login, following the steps outlined below.
447      * <p/>
448      * <ol>
449      * <li>Connects to the asterisk server by calling {@link #connect()} if not
450      * already connected
451      * <li>Waits until the protocol identifier is received but not longer than
452      * timeout ms.
453      * <li>Sends a {@link ChallengeAction} requesting a challenge for authType
454      * MD5.
455      * <li>When the {@link ChallengeResponse} is received a {@link LoginAction}
456      * is sent using the calculated key (MD5 hash of the password appended to
457      * the received challenge).
458      * </ol>
459      *
460      * @param timeout   the maximum time to wait for the protocol identifier (in
461      *                  ms)
462      * @param eventMask the event mask. Set to "on" if all events should be
463      *                  send, "off" if not events should be sent or a combination of
464      *                  "system", "call" and "log" (separated by ',') to specify what
465      *                  kind of events should be sent.
466      * @throws IOException                   if there is an i/o problem.
467      * @throws AuthenticationFailedException if username or password are
468      *                                       incorrect and the login action returns an error or if the MD5
469      *                                       hash cannot be computed. The connection is closed in this
470      *                                       case.
471      * @throws TimeoutException              if a timeout occurs while waiting for the
472      *                                       protocol identifier. The connection is closed in this case.
473      */
474     protected synchronized void doLogin(long timeout, String eventMask) throws IOException, AuthenticationFailedException,
475             TimeoutException
476     {
477         ChallengeAction challengeAction;
478         ManagerResponse challengeResponse;
479         String challenge;
480         String key;
481         LoginAction loginAction;
482         ManagerResponse loginResponse;
483 
484         if (socket == null)
485         {
486             connect();
487         }
488 
489         synchronized (protocolIdentifier)
490         {
491             if (protocolIdentifier.value == null)
492             {
493                 try
494                 {
495                     protocolIdentifier.wait(timeout);
496                 }
497                 catch (InterruptedException e) // NOPMD
498                 {
499                     // swallow
500                 }
501             }
502 
503             if (protocolIdentifier.value == null)
504             {
505                 disconnect();
506                 if (reader != null && reader.getTerminationException() != null)
507                 {
508                     throw reader.getTerminationException();
509                 }
510                 else
511                 {
512                     throw new TimeoutException("Timeout waiting for protocol identifier");
513                 }
514             }
515         }
516 
517         challengeAction = new ChallengeAction("MD5");
518         try
519         {
520             challengeResponse = sendAction(challengeAction);
521         }
522         catch (Exception e)
523         {
524             disconnect();
525             throw new AuthenticationFailedException("Unable to send challenge action", e);
526         }
527 
528         if (challengeResponse instanceof ChallengeResponse)
529         {
530             challenge = ((ChallengeResponse) challengeResponse).getChallenge();
531         }
532         else
533         {
534             disconnect();
535             throw new AuthenticationFailedException("Unable to get challenge from Asterisk. ChallengeAction returned: "
536                     + challengeResponse.getMessage());
537         }
538 
539         try
540         {
541             MessageDigest md;
542 
543             md = MessageDigest.getInstance("MD5");
544             if (challenge != null)
545             {
546                 md.update(challenge.getBytes());
547             }
548             if (password != null)
549             {
550                 md.update(password.getBytes());
551             }
552             key = ManagerUtil.toHexString(md.digest());
553         }
554         catch (NoSuchAlgorithmException ex)
555         {
556             disconnect();
557             throw new AuthenticationFailedException("Unable to create login key using MD5 Message Digest", ex);
558         }
559 
560         loginAction = new LoginAction(username, "MD5", key, eventMask);
561         try
562         {
563             loginResponse = sendAction(loginAction);
564         }
565         catch (Exception e)
566         {
567             disconnect();
568             throw new AuthenticationFailedException("Unable to send login action", e);
569         }
570 
571         if (loginResponse instanceof ManagerError)
572         {
573             disconnect();
574             throw new AuthenticationFailedException(loginResponse.getMessage());
575         }
576 
577         state = CONNECTED;
578 
579         logger.info("Successfully logged in");
580 
581         version = determineVersion();
582         writer.setTargetVersion(version);
583 
584         logger.info("Determined Asterisk version: " + version);
585 
586         // generate pseudo event indicating a successful login
587         ConnectEvent connectEvent = new ConnectEvent(this);
588         connectEvent.setProtocolIdentifier(getProtocolIdentifier());
589         connectEvent.setDateReceived(DateUtil.getDate());
590         // TODO could this cause a deadlock?
591         fireEvent(connectEvent);
592     }
593 
594     protected AsteriskVersion determineVersion() throws IOException, TimeoutException
595     {
596         int attempts = 0;
597         while (attempts < MAX_VERSION_ATTEMPTS)
598         {
599             ManagerResponse showVersionFilesResponse;
600 
601             // increase timeout as output is quite large
602             showVersionFilesResponse = sendAction(new CommandAction("show version files pbx.c"), defaultResponseTimeout * 2);
603             if (showVersionFilesResponse instanceof CommandResponse)
604             {
605                 List<String> showVersionFilesResult;
606 
607                 showVersionFilesResult = ((CommandResponse) showVersionFilesResponse).getResult();
608                 if (showVersionFilesResult != null && showVersionFilesResult.size() > 0)
609                 {
610                     String line1;
611 
612                     line1 = showVersionFilesResult.get(0);
613                     if (line1 != null && line1.startsWith("File"))
614                     {
615                         final String rawVersion;
616 
617                         rawVersion = getRawVersion();
618                         if (rawVersion != null && rawVersion.startsWith("Asterisk 1.4"))
619                         {
620                             return AsteriskVersion.ASTERISK_1_4;
621                         }
622 
623                         return AsteriskVersion.ASTERISK_1_2;
624                     }
625                     else if (line1 != null && line1.contains("No such command"))
626                     {
627                         try
628                         {
629                             attempts++;
630                             Thread.sleep(RECONNECTION_VERSION_INTERVAL);
631                         }
632                         catch (Exception ex)
633                         {
634                             // ingnore
635                         }
636                     }
637                     else
638                     {
639                         // if it isn't the "no such command", break and return the lowest version immediately
640                         break;
641                     }
642                 }
643             }
644         }
645 
646         return AsteriskVersion.ASTERISK_1_0;
647     }
648 
649     protected String getRawVersion()
650     {
651         final ManagerResponse showVersionResponse;
652 
653         try
654         {
655             showVersionResponse = sendAction(new CommandAction("show version"), defaultResponseTimeout * 2);
656         }
657         catch (Exception e)
658         {
659             return null;
660         }
661 
662         if (showVersionResponse instanceof CommandResponse)
663         {
664             final List<String> showVersionResult;
665 
666             showVersionResult = ((CommandResponse) showVersionResponse).getResult();
667             if (showVersionResult != null && showVersionResult.size() > 0)
668             {
669                 return showVersionResult.get(0);
670             }
671         }
672 
673         return null;
674     }
675 
676     protected synchronized void connect() throws IOException
677     {
678         logger.info("Connecting to " + hostname + ":" + port);
679 
680         if (reader == null)
681         {
682             logger.debug("Creating reader for " + hostname + ":" + port);
683             reader = createReader(this, this);
684         }
685 
686         if (writer == null)
687         {
688             logger.debug("Creating writer");
689             writer = createWriter();
690         }
691 
692         logger.debug("Creating socket");
693         socket = createSocket();
694 
695         logger.debug("Passing socket to reader");
696         reader.setSocket(socket);
697 
698         if (readerThread == null || !readerThread.isAlive() || reader.isDead())
699         {
700             logger.debug("Creating and starting reader thread");
701             readerThread = new Thread(reader);
702             readerThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reader-"
703                     + readerThreadCounter.getAndIncrement());
704             readerThread.setDaemon(true);
705             readerThread.start();
706         }
707 
708         logger.debug("Passing socket to writer");
709         writer.setSocket(socket);
710     }
711 
712     protected SocketConnectionFacade createSocket() throws IOException
713     {
714         return new SocketConnectionFacadeImpl(hostname, port, ssl, socketTimeout, socketReadTimeout);
715     }
716 
717     public synchronized void logoff() throws IllegalStateException
718     {
719         if (state != CONNECTED && state != RECONNECTING)
720         {
721             throw new IllegalStateException("Logoff may only be perfomed when in state "
722                     + "CONNECTED or RECONNECTING, but connection is in state " + state);
723         }
724 
725         state = DISCONNECTING;
726 
727         if (socket != null && !reconnecting)
728         {
729             try
730             {
731                 sendAction(new LogoffAction());
732             }
733             catch (Exception e)
734             {
735                 logger.warn("Unable to send LogOff action", e);
736             }
737         }
738         cleanup();
739         state = DISCONNECTED;
740     }
741 
742     /***
743      * Closes the socket connection.
744      */
745     protected synchronized void disconnect()
746     {
747         if (socket != null)
748         {
749             logger.info("Closing socket.");
750             try
751             {
752                 socket.close();
753             }
754             catch (IOException ex)
755             {
756                 logger.warn("Unable to close socket: " + ex.getMessage());
757             }
758             socket = null;
759         }
760         protocolIdentifier.value = null;
761     }
762 
763     public ManagerResponse sendAction(ManagerAction action) throws IOException, TimeoutException, IllegalArgumentException,
764             IllegalStateException
765     {
766         return sendAction(action, defaultResponseTimeout);
767     }
768 
769     /*
770      * Implements synchronous sending of "simple" actions.
771      */
772     public ManagerResponse sendAction(ManagerAction action, long timeout) throws IOException, TimeoutException,
773             IllegalArgumentException, IllegalStateException
774     {
775         ResponseHandlerResult result;
776         SendActionCallback callbackHandler;
777 
778         result = new ResponseHandlerResult();
779         callbackHandler = new DefaultSendActionCallback(result);
780 
781         synchronized (result)
782         {
783             sendAction(action, callbackHandler);
784 
785             // definitely return null for the response of user events
786             if (action instanceof UserEventAction)
787             {
788                 return null;
789             }
790 
791             // only wait if we did not yet receive the response.
792             // Responses may be returned really fast.
793             if (result.getResponse() == null)
794             {
795                 try
796                 {
797                     result.wait(timeout);
798                 }
799                 catch (InterruptedException ex)
800                 {
801                     logger.warn("Interrupted while waiting for result");
802                 }
803             }
804         }
805 
806         // still no response?
807         if (result.getResponse() == null)
808         {
809             throw new TimeoutException("Timeout waiting for response to " + action.getAction()
810                     + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"));
811         }
812 
813         return result.getResponse();
814     }
815 
816     public void sendAction(ManagerAction action, SendActionCallback callback) throws IOException, IllegalArgumentException,
817             IllegalStateException
818     {
819         final String internalActionId;
820 
821         if (action == null)
822         {
823             throw new IllegalArgumentException("Unable to send action: action is null.");
824         }
825 
826         // In general sending actions is only allowed while connected, though
827         // there are a few exceptions, these are handled here:
828         if ((state == CONNECTING || state == RECONNECTING)
829                 && (action instanceof ChallengeAction || action instanceof LoginAction))
830         {
831             // when (re-)connecting challenge and login actions are ok.
832         } // NOPMD
833         else if (state == DISCONNECTING && action instanceof LogoffAction)
834         {
835             // when disconnecting logoff action is ok.
836         } // NOPMD
837         else if (state != CONNECTED)
838         {
839             throw new IllegalStateException("Actions may only be sent when in state "
840                     + "CONNECTED, but connection is in state " + state);
841         }
842 
843         if (socket == null)
844         {
845             throw new IllegalStateException("Unable to send " + action.getAction() + " action: socket not connected.");
846         }
847 
848         internalActionId = createInternalActionId();
849 
850         // if the callbackHandler is null the user is obviously not interested
851         // in the response, thats fine.
852         if (callback != null)
853         {
854             synchronized (this.responseListeners)
855             {
856                 this.responseListeners.put(internalActionId, callback);
857             }
858         }
859 
860         writer.sendAction(action, internalActionId);
861     }
862 
863     public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action) throws IOException, EventTimeoutException,
864             IllegalArgumentException, IllegalStateException
865     {
866         return sendEventGeneratingAction(action, defaultEventTimeout);
867     }
868 
869     /*
870      * Implements synchronous sending of event generating actions.
871      */
872     public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action, long timeout) throws IOException,
873             EventTimeoutException, IllegalArgumentException, IllegalStateException
874     {
875         final ResponseEventsImpl responseEvents;
876         final ResponseEventHandler responseEventHandler;
877         final String internalActionId;
878 
879         if (action == null)
880         {
881             throw new IllegalArgumentException("Unable to send action: action is null.");
882         }
883         else if (action.getActionCompleteEventClass() == null)
884         {
885             throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass for "
886                     + action.getClass().getName() + " is null.");
887         }
888         else if (!ResponseEvent.class.isAssignableFrom(action.getActionCompleteEventClass()))
889         {
890             throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass ("
891                     + action.getActionCompleteEventClass().getName() + ") for " + action.getClass().getName()
892                     + " is not a ResponseEvent.");
893         }
894 
895         if (state != CONNECTED)
896         {
897             throw new IllegalStateException("Actions may only be sent when in state "
898                     + "CONNECTED, but connection is in state " + state);
899         }
900 
901         responseEvents = new ResponseEventsImpl();
902         responseEventHandler = new ResponseEventHandler(responseEvents, action.getActionCompleteEventClass());
903 
904         internalActionId = createInternalActionId();
905 
906         // register response handler...
907         synchronized (this.responseListeners)
908         {
909             this.responseListeners.put(internalActionId, responseEventHandler);
910         }
911 
912         // ...and event handler.
913         synchronized (this.responseEventListeners)
914         {
915             this.responseEventListeners.put(internalActionId, responseEventHandler);
916         }
917 
918         synchronized (responseEvents)
919         {
920             writer.sendAction(action, internalActionId);
921             // only wait if response has not yet arrived.
922             if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
923             {
924                 try
925                 {
926                     responseEvents.wait(timeout);
927                 }
928                 catch (InterruptedException e)
929                 {
930                     logger.warn("Interrupted while waiting for response events.");
931                 }
932             }
933         }
934 
935         // still no response or not all events received and timed out?
936         if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
937         {
938             // clean up
939             synchronized (this.responseEventListeners)
940             {
941                 this.responseEventListeners.remove(internalActionId);
942             }
943 
944             throw new EventTimeoutException("Timeout waiting for response or response events to " + action.getAction()
945                     + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"), responseEvents);
946         }
947 
948         // remove the event handler
949         // Note: The response handler has already been removed
950         // when the response was received
951         synchronized (this.responseEventListeners)
952         {
953             this.responseEventListeners.remove(internalActionId);
954         }
955 
956         return responseEvents;
957     }
958 
959     /***
960      * Creates a new unique internal action id based on the hash code of this
961      * connection and a sequence.
962      *
963      * @return a new internal action id
964      * @see ManagerUtil#addInternalActionId(String,String)
965      * @see ManagerUtil#getInternalActionId(String)
966      * @see ManagerUtil#stripInternalActionId(String)
967      */
968     private String createInternalActionId()
969     {
970         final StringBuffer sb;
971 
972         sb = new StringBuffer();
973         sb.append(this.hashCode());
974         sb.append("_");
975         sb.append(actionIdCounter.getAndIncrement());
976 
977         return sb.toString();
978     }
979 
980     public void addEventListener(final ManagerEventListener listener)
981     {
982         synchronized (this.eventListeners)
983         {
984             // only add it if its not already there
985             if (!this.eventListeners.contains(listener))
986             {
987                 this.eventListeners.add(listener);
988             }
989         }
990     }
991 
992     public void removeEventListener(final ManagerEventListener listener)
993     {
994         synchronized (this.eventListeners)
995         {
996             if (this.eventListeners.contains(listener))
997             {
998                 this.eventListeners.remove(listener);
999             }
1000         }
1001     }
1002 
1003     public String getProtocolIdentifier()
1004     {
1005         return protocolIdentifier.value;
1006     }
1007 
1008     public ManagerConnectionState getState()
1009     {
1010         return state;
1011     }
1012 
1013     /* Implementation of Dispatcher: callbacks for ManagerReader */
1014 
1015     /***
1016      * This method is called by the reader whenever a {@link ManagerResponse} is
1017      * received. The response is dispatched to the associated
1018      * {@link SendActionCallback}.
1019      *
1020      * @param response the response received by the reader
1021      * @see ManagerReader
1022      */
1023     public void dispatchResponse(ManagerResponse response)
1024     {
1025         final String actionId;
1026         String internalActionId;
1027         SendActionCallback listener;
1028 
1029         // shouldn't happen
1030         if (response == null)
1031         {
1032             logger.error("Unable to dispatch null response. This should never happen. Please file a bug.");
1033             return;
1034         }
1035 
1036         actionId = response.getActionId();
1037         internalActionId = null;
1038         listener = null;
1039 
1040         if (actionId != null)
1041         {
1042             internalActionId = ManagerUtil.getInternalActionId(actionId);
1043             response.setActionId(ManagerUtil.stripInternalActionId(actionId));
1044         }
1045 
1046         logger.debug("Dispatching response with internalActionId '" + internalActionId + "':\n" + response);
1047 
1048         if (internalActionId != null)
1049         {
1050             synchronized (this.responseListeners)
1051             {
1052                 listener = responseListeners.get(internalActionId);
1053                 if (listener != null)
1054                 {
1055                     this.responseListeners.remove(internalActionId);
1056                 }
1057                 else
1058                 {
1059                     // when using the async sendAction it's ok not to register a
1060                     // callback so if we don't find a response handler thats ok
1061                     logger.debug("No response listener registered for " + "internalActionId '" + internalActionId + "'");
1062                 }
1063             }
1064         }
1065         else
1066         {
1067             logger.error("Unable to retrieve internalActionId from response: " + "actionId '" + actionId + "':\n" + response);
1068         }
1069 
1070         if (listener != null)
1071         {
1072             try
1073             {
1074                 listener.onResponse(response);
1075             }
1076             catch (Exception e)
1077             {
1078                 logger.warn("Unexpected exception in response listener " + listener.getClass().getName(), e);
1079             }
1080         }
1081     }
1082 
1083     /***
1084      * This method is called by the reader whenever a ManagerEvent is received.
1085      * The event is dispatched to all registered ManagerEventHandlers.
1086      *
1087      * @param event the event received by the reader
1088      * @see #addEventListener(ManagerEventListener)
1089      * @see #removeEventListener(ManagerEventListener)
1090      * @see ManagerReader
1091      */
1092     public void dispatchEvent(ManagerEvent event)
1093     {
1094         // shouldn't happen
1095         if (event == null)
1096         {
1097             logger.error("Unable to dispatch null event. This should never happen. Please file a bug.");
1098             return;
1099         }
1100 
1101         logger.debug("Dispatching event:\n" + event.toString());
1102 
1103         // Some events need special treatment besides forwarding them to the
1104         // registered eventListeners (clients)
1105         // These events are handled here at first:
1106 
1107         // Dispatch ResponseEvents to the appropriate responseEventListener
1108         if (event instanceof ResponseEvent)
1109         {
1110             ResponseEvent responseEvent;
1111             String internalActionId;
1112 
1113             responseEvent = (ResponseEvent) event;
1114             internalActionId = responseEvent.getInternalActionId();
1115             if (internalActionId != null)
1116             {
1117                 synchronized (responseEventListeners)
1118                 {
1119                     ManagerEventListener listener;
1120 
1121                     listener = responseEventListeners.get(internalActionId);
1122                     if (listener != null)
1123                     {
1124                         try
1125                         {
1126                             listener.onManagerEvent(event);
1127                         }
1128                         catch (Exception e)
1129                         {
1130                             logger.warn("Unexpected exception in response event listener " + listener.getClass().getName(),
1131                                     e);
1132                         }
1133                     }
1134                 }
1135             }
1136             else
1137             {
1138                 // ResponseEvent without internalActionId:
1139                 // this happens if the same event class is used as response
1140                 // event
1141                 // and as an event that is not triggered by a Manager command
1142                 // Example: QueueMemberStatusEvent.
1143                 // logger.debug("ResponseEvent without "
1144                 // + "internalActionId:\n" + responseEvent);
1145             } // NOPMD
1146         }
1147         if (event instanceof DisconnectEvent)
1148         {
1149             // When we receive get disconnected while we are connected start
1150             // a new reconnect thread and set the state to RECONNECTING.
1151             if (state == CONNECTED)
1152             {
1153                 state = RECONNECTING;
1154                 // close socket if still open and remove reference to
1155                 // readerThread
1156                 // After sending the DisconnectThread that thread will die
1157                 // anyway.
1158                 cleanup();
1159                 reconnectThread = new Thread(new Runnable()
1160                 {
1161                     public void run()
1162                     {
1163                         reconnect();
1164                     }
1165                 });
1166                 reconnectThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reconnect-"
1167                         + reconnectThreadCounter.getAndIncrement());
1168                 reconnectThread.setDaemon(true);
1169                 reconnectThread.start();
1170                 // now the DisconnectEvent is dispatched to registered
1171                 // eventListeners
1172                 // (clients) and after that the ManagerReaderThread is gone.
1173                 // So effectively we replaced the reader thread by a
1174                 // ReconnectThread.
1175             }
1176             else
1177             {
1178                 // when we receive a DisconnectEvent while not connected we
1179                 // ignore it and do not send it to clients
1180                 return;
1181             }
1182         }
1183         if (event instanceof ProtocolIdentifierReceivedEvent)
1184         {
1185             ProtocolIdentifierReceivedEvent protocolIdentifierReceivedEvent;
1186             String protocolIdentifier;
1187 
1188             protocolIdentifierReceivedEvent = (ProtocolIdentifierReceivedEvent) event;
1189             protocolIdentifier = protocolIdentifierReceivedEvent.getProtocolIdentifier();
1190             setProtocolIdentifier(protocolIdentifier);
1191             // no need to send this event to clients
1192             return;
1193         }
1194 
1195         fireEvent(event);
1196     }
1197 
1198     /***
1199      * Notifies all {@link ManagerEventListener}s registered by users.
1200      *
1201      * @param event the event to propagate
1202      */
1203     private void fireEvent(ManagerEvent event)
1204     {
1205         synchronized (eventListeners)
1206         {
1207             for (ManagerEventListener listener : eventListeners)
1208             {
1209                 try
1210                 {
1211                     listener.onManagerEvent(event);
1212                 }
1213                 catch (RuntimeException e)
1214                 {
1215                     logger.warn("Unexpected exception in eventHandler " + listener.getClass().getName(), e);
1216                 }
1217             }
1218         }
1219     }
1220 
1221     /***
1222      * This method is called when a {@link ProtocolIdentifierReceivedEvent} is
1223      * received from the reader. Having received a correct protocol identifier
1224      * is the precodition for logging in.
1225      *
1226      * @param identifier the protocol version used by the Asterisk server.
1227      */
1228     private void setProtocolIdentifier(final String identifier)
1229     {
1230         logger.info("Connected via " + identifier);
1231 
1232         if (!"Asterisk Call Manager/1.0".equals(identifier)
1233                 && !"Asterisk Call Manager/1.2".equals(identifier) // bri stuffed 
1234                 && !"OpenPBX Call Manager/1.0".equals(identifier)
1235                 && !"CallWeaver Call Manager/1.0".equals(identifier))
1236         {
1237             logger.warn("Unsupported protocol version '" + identifier + "'. Use at your own risk.");
1238         }
1239 
1240         synchronized (protocolIdentifier)
1241         {
1242             protocolIdentifier.value = identifier;
1243             protocolIdentifier.notify();
1244         }
1245     }
1246 
1247     /***
1248      * Reconnects to the asterisk server when the connection is lost.
1249      * <p/>
1250      * While keepAlive is <code>true</code> we will try to reconnect.
1251      * Reconnection attempts will be stopped when the {@link #logoff()} method
1252      * is called or when the login after a successful reconnect results in an
1253      * {@link AuthenticationFailedException} suggesting that the manager
1254      * credentials have changed and keepAliveAfterAuthenticationFailure is not
1255      * set.
1256      * <p/>
1257      * This method is called when a {@link DisconnectEvent} is received from the
1258      * reader.
1259      */
1260     private void reconnect()
1261     {
1262         int numTries;
1263 
1264         // try to reconnect
1265         numTries = 0;
1266         while (state == RECONNECTING)
1267         {
1268             try
1269             {
1270                 if (numTries < 10)
1271                 {
1272                     // try to reconnect quite fast for the firt 10 times
1273                     // this succeeds if the server has just been restarted
1274                     Thread.sleep(RECONNECTION_INTERVAL_1);
1275                 }
1276                 else
1277                 {
1278                     // slow down after 10 unsuccessful attempts asuming a
1279                     // shutdown of the server
1280                     Thread.sleep(RECONNECTION_INTERVAL_2);
1281                 }
1282             }
1283             catch (InterruptedException e1)
1284             {
1285                 // ignore
1286             }
1287 
1288             try
1289             {
1290                 connect();
1291 
1292                 try
1293                 {
1294                     doLogin(defaultResponseTimeout, eventMask);
1295                     logger.info("Successfully reconnected.");
1296                     // everything is ok again, so we leave
1297                     // when successful doLogin set the state to CONNECTED so no
1298                     // need to adjust it
1299                     break;
1300                 }
1301                 catch (AuthenticationFailedException e1)
1302                 {
1303                     if (keepAliveAfterAuthenticationFailure)
1304                     {
1305                         logger.error("Unable to log in after reconnect: " + e1.getMessage());
1306                     }
1307                     else
1308                     {
1309                         logger.error("Unable to log in after reconnect: " + e1.getMessage() + ". Giving up.");
1310                         state = DISCONNECTED;
1311                     }
1312                 }
1313                 catch (TimeoutException e1)
1314                 {
1315                     // shouldn't happen - but happens!
1316                     logger.error("TimeoutException while trying to log in " + "after reconnect.");
1317                 }
1318             }
1319             catch (IOException e)
1320             {
1321                 // server seems to be still down, just continue to attempt
1322                 // reconnection
1323                 logger.warn("Exception while trying to reconnect: " + e.getMessage());
1324             }
1325             numTries++;
1326         }
1327     }
1328 
1329     private void cleanup()
1330     {
1331         disconnect();
1332         this.readerThread = null;
1333     }
1334 
1335     @Override
1336     public String toString()
1337     {
1338         StringBuffer sb;
1339 
1340         sb = new StringBuffer("ManagerConnection[");
1341         sb.append("id='").append(id).append("',");
1342         sb.append("hostname='").append(hostname).append("',");
1343         sb.append("port=").append(port).append(",");
1344         sb.append("systemHashcode=").append(System.identityHashCode(this)).append("]");
1345 
1346         return sb.toString();
1347     }
1348 
1349     /* Helper classes */
1350 
1351     /***
1352      * A simple data object to store a ManagerResult.
1353      */
1354     private class ResponseHandlerResult implements Serializable
1355     {
1356         /***
1357          * Serializable version identifier.
1358          */
1359         private static final long serialVersionUID = 7831097958568769220L;
1360         private ManagerResponse response;
1361 
1362         public ResponseHandlerResult()
1363         {
1364         }
1365 
1366         public ManagerResponse getResponse()
1367         {
1368             return this.response;
1369         }
1370 
1371         public void setResponse(ManagerResponse response)
1372         {
1373             this.response = response;
1374         }
1375     }
1376 
1377     /***
1378      * A simple response handler that stores the received response in a
1379      * ResponseHandlerResult for further processing.
1380      */
1381     private class DefaultSendActionCallback implements SendActionCallback, Serializable
1382     {
1383         /***
1384          * Serializable version identifier.
1385          */
1386         private static final long serialVersionUID = 2926598671855316803L;
1387         private final ResponseHandlerResult result;
1388 
1389         /***
1390          * Creates a new instance.
1391          *
1392          * @param result the result to store the response in
1393          */
1394         public DefaultSendActionCallback(ResponseHandlerResult result)
1395         {
1396             this.result = result;
1397         }
1398 
1399         public void onResponse(ManagerResponse response)
1400         {
1401             synchronized (result)
1402             {
1403                 result.setResponse(response);
1404                 result.notify();
1405             }
1406         }
1407     }
1408 
1409     /***
1410      * A combinded event and response handler that adds received events and the
1411      * response to a ResponseEvents object.
1412      */
1413     @SuppressWarnings("unchecked")
1414     private class ResponseEventHandler implements ManagerEventListener, SendActionCallback, Serializable
1415     {
1416         /***
1417          * Serializable version identifier.
1418          */
1419         private static final long serialVersionUID = 2926598671855316803L;
1420         private final ResponseEventsImpl events;
1421         private final Class actionCompleteEventClass;
1422 
1423         /***
1424          * Creates a new instance.
1425          *
1426          * @param events                   the ResponseEventsImpl to store the events in
1427          * @param actionCompleteEventClass the type of event that indicates that
1428          *                                 all events have been received
1429          */
1430         public ResponseEventHandler(ResponseEventsImpl events, Class actionCompleteEventClass)
1431         {
1432             this.events = events;
1433             this.actionCompleteEventClass = actionCompleteEventClass;
1434         }
1435 
1436         public void onManagerEvent(ManagerEvent event)
1437         {
1438             synchronized (events)
1439             {
1440                 // should always be a ResponseEvent, anyway...
1441                 if (event instanceof ResponseEvent)
1442                 {
1443                     ResponseEvent responseEvent;
1444 
1445                     responseEvent = (ResponseEvent) event;
1446                     events.addEvent(responseEvent);
1447                 }
1448 
1449                 // finished?
1450                 if (actionCompleteEventClass.isAssignableFrom(event.getClass()))
1451                 {
1452                     events.setComplete(true);
1453                     // notify if action complete event and response have been
1454                     // received
1455                     if (events.getResponse() != null)
1456                     {
1457                         events.notify();
1458                     }
1459                 }
1460             }
1461         }
1462 
1463         public void onResponse(ManagerResponse response)
1464         {
1465             synchronized (events)
1466             {
1467                 events.setRepsonse(response);
1468                 if (response instanceof ManagerError)
1469                 {
1470                     events.setComplete(true);
1471                 }
1472 
1473                 // finished?
1474                 // notify if action complete event and response have been
1475                 // received
1476                 if (events.isComplete())
1477                 {
1478                     events.notify();
1479                 }
1480             }
1481         }
1482     }
1483 
1484     private class ProtocolIdentifierWrapper
1485     {
1486         String value;
1487     }
1488 }