View Javadoc

1   /*
2    *  Copyright 2004-2006 Stefan Reuter
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package org.asteriskjava.manager.internal;
18  
19  import static org.asteriskjava.manager.ManagerConnectionState.CONNECTED;
20  import static org.asteriskjava.manager.ManagerConnectionState.CONNECTING;
21  import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTED;
22  import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTING;
23  import static org.asteriskjava.manager.ManagerConnectionState.INITIAL;
24  import static org.asteriskjava.manager.ManagerConnectionState.RECONNECTING;
25  
26  import java.io.IOException;
27  import java.io.Serializable;
28  import java.net.Socket;
29  import java.security.MessageDigest;
30  import java.security.NoSuchAlgorithmException;
31  import java.util.ArrayList;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.asteriskjava.AsteriskVersion;
38  import org.asteriskjava.manager.AuthenticationFailedException;
39  import org.asteriskjava.manager.EventTimeoutException;
40  import org.asteriskjava.manager.ManagerConnection;
41  import org.asteriskjava.manager.ManagerConnectionState;
42  import org.asteriskjava.manager.ManagerEventListener;
43  import org.asteriskjava.manager.SendActionCallback;
44  import org.asteriskjava.manager.ResponseEvents;
45  import org.asteriskjava.manager.TimeoutException;
46  import org.asteriskjava.manager.action.ChallengeAction;
47  import org.asteriskjava.manager.action.CommandAction;
48  import org.asteriskjava.manager.action.EventGeneratingAction;
49  import org.asteriskjava.manager.action.LoginAction;
50  import org.asteriskjava.manager.action.LogoffAction;
51  import org.asteriskjava.manager.action.ManagerAction;
52  import org.asteriskjava.manager.event.ConnectEvent;
53  import org.asteriskjava.manager.event.DisconnectEvent;
54  import org.asteriskjava.manager.event.ManagerEvent;
55  import org.asteriskjava.manager.event.ProtocolIdentifierReceivedEvent;
56  import org.asteriskjava.manager.event.ResponseEvent;
57  import org.asteriskjava.manager.response.ChallengeResponse;
58  import org.asteriskjava.manager.response.CommandResponse;
59  import org.asteriskjava.manager.response.ManagerError;
60  import org.asteriskjava.manager.response.ManagerResponse;
61  import org.asteriskjava.util.DateUtil;
62  import org.asteriskjava.util.Log;
63  import org.asteriskjava.util.LogFactory;
64  import org.asteriskjava.util.SocketConnectionFacade;
65  import org.asteriskjava.util.internal.SocketConnectionFacadeImpl;
66  import org.asteriskjava.manager.action.UserEventAction;
67  
68  /***
69   * Default implementation of the ManagerConnection interface.
70   * <p>
71   * Generally avoid direct use of this class. Use the ManagerConnectionFactory to
72   * obtain a ManagerConnection instead.
73   * <p>
74   * When using a dependency injection framework like Spring direct usage for
75   * wiring up beans that require a ManagerConnection property is fine though.
76   * <p>
77   * Note that a ManagerConnectionImpl will create one new Thread for reading
78   * data from Asterisk on the first call to one of the login() methods.
79   * 
80   * @see org.asteriskjava.manager.ManagerConnectionFactory
81   * @author srt
82   * @version $Id: ManagerConnectionImpl.java 729 2007-05-26 05:16:57Z sprior $
83   */
84  public class ManagerConnectionImpl implements ManagerConnection, Dispatcher
85  {
86      private static final int RECONNECTION_INTERVAL_1 = 50;
87      private static final int RECONNECTION_INTERVAL_2 = 5000;
88      private static final String DEFAULT_HOSTNAME = "localhost";
89      private static final int DEFAULT_PORT = 5038;
90      private static final int RECONNECTION_VERSION_INTERVAL = 500;
91      private static final int MAX_VERSION_ATTEMPTS = 4;
92  
93      private static final AtomicLong idCounter = new AtomicLong(0);
94  
95      /***
96       * Instance logger.
97       */
98      private final Log logger = LogFactory.getLog(getClass());
99  
100     private final long id;
101 
102     /***
103      * Used to construct the internalActionId.
104      */
105     private AtomicLong actionIdCounter = new AtomicLong(0);
106 
107     /* Config attributes */
108     /***
109      * Hostname of the Asterisk server to connect to.
110      */
111     private String hostname = DEFAULT_HOSTNAME;
112 
113     /***
114      * TCP port to connect to.
115      */
116     private int port = DEFAULT_PORT;
117 
118     /***
119      * <code>true</code> to use SSL for the connection, <code>false</code>
120      * for a plain text connection.
121      */
122     private boolean ssl = false;
123 
124     /***
125      * The username to use for login as defined in Asterisk's
126      * <code>manager.conf</code>.
127      */
128     protected String username;
129 
130     /***
131      * The password to use for login as defined in Asterisk's
132      * <code>manager.conf</code>.
133      */
134     protected String password;
135 
136     /***
137      * The default timeout to wait for a ManagerResponse after sending a
138      * ManagerAction.
139      */
140     private long defaultResponseTimeout = 2000;
141 
142     /***
143      * The default timeout to wait for the last ResponseEvent after sending an
144      * EventGeneratingAction.
145      */
146     private long defaultEventTimeout = 5000;
147 
148     /***
149      * The timeout to use when connecting to the Asterisk server.
150      */
151     private int socketTimeout = 0;
152 
153     /***
154      * @see Socket#setSoTimeout(int)
155      */
156     private int socketReadTimeout = 0;
157 
158     /***
159      * Should we continue to reconnect after an authentication failure?
160      */
161     private boolean keepAliveAfterAuthenticationFailure = true;
162 
163     /***
164      * The socket to use for TCP/IP communication with Asterisk.
165      */
166     private SocketConnectionFacade socket;
167 
168     /***
169      * The thread that runs the reader.
170      */
171     private Thread readerThread;
172     private final AtomicLong readerThreadCounter = new AtomicLong(0);
173 
174     /***
175      * The thread that performs reconnect.
176      */
177     private Thread reconnectThread;
178     private final AtomicLong reconnectThreadCounter = new AtomicLong(0);
179 
180     /***
181      * The reader to use to receive events and responses from asterisk.
182      */
183     private ManagerReader reader;
184 
185     /***
186      * The writer to use to send actions to asterisk.
187      */
188     private ManagerWriter writer;
189 
190     /***
191      * The protocol identifier Asterisk sends on connect wrapped into an object
192      * to be used as mutex.
193      */
194     private final ProtocolIdentifierWrapper protocolIdentifier;
195 
196     /***
197      * The version of the Asterisk server we are connected to.
198      */
199     private AsteriskVersion version;
200 
201     /***
202      * Contains the registered handlers that process the ManagerResponses.
203      * <p>
204      * Key is the internalActionId of the Action sent and value the
205      * corresponding ResponseListener.
206      */
207     private final Map<String, SendActionCallback> responseListeners;
208 
209     /***
210      * Contains the event handlers that handle ResponseEvents for the
211      * sendEventGeneratingAction methods.
212      * <p>
213      * Key is the internalActionId of the Action sent and value the
214      * corresponding EventHandler.
215      */
216     private final Map<String, ManagerEventListener> responseEventListeners;
217 
218     /***
219      * Contains the event handlers that users registered.
220      */
221     private final List<ManagerEventListener> eventListeners;
222 
223     /***
224      * <code>true</code> while reconnecting.
225      */
226     private boolean reconnecting = false;
227 
228     protected ManagerConnectionState state = INITIAL;
229 
230     private String eventMask;
231 
232     /***
233      * Creates a new instance.
234      */
235     public ManagerConnectionImpl()
236     {
237         this.id = idCounter.getAndIncrement();
238         this.responseListeners = new HashMap<String, SendActionCallback>();
239         this.responseEventListeners = new HashMap<String, ManagerEventListener>();
240         this.eventListeners = new ArrayList<ManagerEventListener>();
241         this.protocolIdentifier = new ProtocolIdentifierWrapper();
242     }
243 
244     // the following two methods can be overridden when running test cases to
245     // return a mock object
246     protected ManagerReader createReader(Dispatcher dispatcher, Object source)
247     {
248         return new ManagerReaderImpl(dispatcher, source);
249     }
250 
251     protected ManagerWriter createWriter()
252     {
253         return new ManagerWriterImpl();
254     }
255 
256     /***
257      * Sets the hostname of the asterisk server to connect to.
258      * <p>
259      * Default is <code>localhost</code>.
260      * 
261      * @param hostname the hostname to connect to
262      */
263     public void setHostname(String hostname)
264     {
265         this.hostname = hostname;
266     }
267 
268     /***
269      * Sets the port to use to connect to the asterisk server. This is the port
270      * specified in asterisk's <code>manager.conf</code> file.
271      * <p>
272      * Default is 5038.
273      * 
274      * @param port the port to connect to
275      */
276     public void setPort(int port)
277     {
278         if (port <= 0)
279         {
280             this.port = DEFAULT_PORT;
281         }
282         else
283         {
284             this.port = port;
285         }
286     }
287 
288     /***
289      * Sets whether to use SSL.
290      * <p>
291      * Default is false.
292      * 
293      * @param ssl <code>true</code> to use SSL for the connection,
294      *            <code>false</code> for a plain text connection.
295      * @since 0.3
296      */
297     public void setSsl(boolean ssl)
298     {
299         this.ssl = ssl;
300     }
301 
302     /***
303      * Sets the username to use to connect to the asterisk server. This is the
304      * username specified in asterisk's <code>manager.conf</code> file.
305      * 
306      * @param username the username to use for login
307      */
308     public void setUsername(String username)
309     {
310         this.username = username;
311     }
312 
313     /***
314      * Sets the password to use to connect to the asterisk server. This is the
315      * password specified in Asterisk's <code>manager.conf</code> file.
316      * 
317      * @param password the password to use for login
318      */
319     public void setPassword(String password)
320     {
321         this.password = password;
322     }
323 
324     /***
325      * Sets the time in milliseconds the synchronous method
326      * {@link #sendAction(ManagerAction)} will wait for a response before
327      * throwing a TimeoutException.
328      * <p>
329      * Default is 2000.
330      * 
331      * @param defaultResponseTimeout default response timeout in milliseconds
332      * @since 0.2
333      */
334     public void setDefaultResponseTimeout(long defaultResponseTimeout)
335     {
336         this.defaultResponseTimeout = defaultResponseTimeout;
337     }
338 
339     /***
340      * Sets the time in milliseconds the synchronous method
341      * {@link #sendEventGeneratingAction(EventGeneratingAction)} will wait for a
342      * response and the last response event before throwing a TimeoutException.
343      * <p>
344      * Default is 5000.
345      * 
346      * @param defaultEventTimeout default event timeout in milliseconds
347      * @since 0.2
348      */
349     public void setDefaultEventTimeout(long defaultEventTimeout)
350     {
351         this.defaultEventTimeout = defaultEventTimeout;
352     }
353 
354     /***
355      * Set to <code>true</code> to try reconnecting to ther asterisk serve
356      * even if the reconnection attempt threw an AuthenticationFailedException.
357      * <p>
358      * Default is <code>true</code>.
359      *
360      * @param keepAliveAfterAuthenticationFailure <code>true</code> to try reconnecting to ther asterisk serve
361      *                                            even if the reconnection attempt threw an AuthenticationFailedException,
362      *                                            <code>false</code> otherwise.
363      */
364     public void setKeepAliveAfterAuthenticationFailure(boolean keepAliveAfterAuthenticationFailure)
365     {
366         this.keepAliveAfterAuthenticationFailure = keepAliveAfterAuthenticationFailure;
367     }
368 
369     /* Implementation of ManagerConnection interface */
370 
371     public String getUsername()
372     {
373         return username;
374     }
375 
376     public String getPassword()
377     {
378         return password;
379     }
380 
381     public String getHostname()
382     {
383         return hostname;
384     }
385 
386     public int getPort()
387     {
388         return port;
389     }
390 
391     public boolean isSsl()
392     {
393         return ssl;
394     }
395 
396     public void registerUserEventClass(Class userEventClass)
397     {
398         if (reader == null)
399         {
400             reader = createReader(this, this);
401         }
402 
403         reader.registerEventClass(userEventClass);
404     }
405 
406     public void setSocketTimeout(int socketTimeout)
407     {
408         this.socketTimeout = socketTimeout;
409     }
410 
411     public void setSocketReadTimeout(int socketReadTimeout)
412     {
413         this.socketReadTimeout = socketReadTimeout;
414     }
415 
416     public synchronized void login() throws IOException, AuthenticationFailedException, TimeoutException
417     {
418         login(null);
419     }
420 
421     public synchronized void login(String eventMask) throws IOException, AuthenticationFailedException, TimeoutException
422     {
423         if (state != INITIAL && state != DISCONNECTED)
424         {
425             throw new IllegalStateException("Login may only be perfomed when in state "
426                     + "INITIAL or DISCONNECTED, but connection is in state " + state);
427         }
428 
429         state = CONNECTING;
430         this.eventMask = eventMask;
431         try
432         {
433             doLogin(defaultResponseTimeout, eventMask);
434         }
435         finally
436         {
437             if (state != CONNECTED)
438             {
439                 state = DISCONNECTED;
440             }
441         }
442     }
443 
444     /***
445      * Does the real login, following the steps outlined below.
446      * <p>
447      * <ol>
448      * <li>Connects to the asterisk server by calling {@link #connect()} if not
449      * already connected
450      * <li>Waits until the protocol identifier is received but not longer than
451      * timeout ms.
452      * <li>Sends a {@link ChallengeAction} requesting a challenge for authType
453      * MD5.
454      * <li>When the {@link ChallengeResponse} is received a {@link LoginAction}
455      * is sent using the calculated key (MD5 hash of the password appended to
456      * the received challenge).
457      * </ol>
458      * 
459      * @param timeout the maximum time to wait for the protocol identifier (in
460      *            ms)
461      * @param eventMask the event mask. Set to "on" if all events should be
462      *            send, "off" if not events should be sent or a combination of
463      *            "system", "call" and "log" (separated by ',') to specify what
464      *            kind of events should be sent.
465      * @throws IOException if there is an i/o problem.
466      * @throws AuthenticationFailedException if username or password are
467      *             incorrect and the login action returns an error or if the MD5
468      *             hash cannot be computed. The connection is closed in this
469      *             case.
470      * @throws TimeoutException if a timeout occurs while waiting for the
471      *             protocol identifier. The connection is closed in this case.
472      */
473     protected synchronized void doLogin(long timeout, String eventMask) throws IOException, AuthenticationFailedException,
474             TimeoutException
475     {
476         ChallengeAction challengeAction;
477         ManagerResponse challengeResponse;
478         String challenge;
479         String key;
480         LoginAction loginAction;
481         ManagerResponse loginResponse;
482 
483         if (socket == null)
484         {
485             connect();
486         }
487 
488         synchronized (protocolIdentifier)
489         {
490             if (protocolIdentifier.value == null)
491             {
492                 try
493                 {
494                     protocolIdentifier.wait(timeout);
495                 }
496                 catch (InterruptedException e) // NOPMD
497                 {
498                     // swallow
499                 }
500             }
501 
502             if (protocolIdentifier.value == null)
503             {
504                 disconnect();
505                 if (reader != null && reader.getTerminationException() != null)
506                 {
507                     throw reader.getTerminationException();
508                 }
509                 else
510                 {
511                     throw new TimeoutException("Timeout waiting for protocol identifier");
512                 }
513             }
514         }
515 
516         challengeAction = new ChallengeAction("MD5");
517         try
518         {
519             challengeResponse = sendAction(challengeAction);
520         }
521         catch (Exception e)
522         {
523             disconnect();
524             throw new AuthenticationFailedException("Unable to send challenge action", e);
525         }
526 
527         if (challengeResponse instanceof ChallengeResponse)
528         {
529             challenge = ((ChallengeResponse) challengeResponse).getChallenge();
530         }
531         else
532         {
533             disconnect();
534             throw new AuthenticationFailedException("Unable to get challenge from Asterisk. ChallengeAction returned: "
535                     + challengeResponse.getMessage());
536         }
537 
538         try
539         {
540             MessageDigest md;
541 
542             md = MessageDigest.getInstance("MD5");
543             if (challenge != null)
544             {
545                 md.update(challenge.getBytes());
546             }
547             if (password != null)
548             {
549                 md.update(password.getBytes());
550             }
551             key = ManagerUtil.toHexString(md.digest());
552         }
553         catch (NoSuchAlgorithmException ex)
554         {
555             disconnect();
556             throw new AuthenticationFailedException("Unable to create login key using MD5 Message Digest", ex);
557         }
558 
559         loginAction = new LoginAction(username, "MD5", key, eventMask);
560         try
561         {
562             loginResponse = sendAction(loginAction);
563         }
564         catch (Exception e)
565         {
566             disconnect();
567             throw new AuthenticationFailedException("Unable to send login action", e);
568         }
569 
570         if (loginResponse instanceof ManagerError)
571         {
572             disconnect();
573             throw new AuthenticationFailedException(loginResponse.getMessage());
574         }
575 
576         state = CONNECTED;
577 
578         logger.info("Successfully logged in");
579 
580         version = determineVersion();
581         writer.setTargetVersion(version);
582 
583         logger.info("Determined Asterisk version: " + version);
584 
585         // generate pseudo event indicating a successful login
586         ConnectEvent connectEvent = new ConnectEvent(this);
587         connectEvent.setProtocolIdentifier(getProtocolIdentifier());
588         connectEvent.setDateReceived(DateUtil.getDate());
589         // TODO could this cause a deadlock?
590         fireEvent(connectEvent);
591     }
592 
593     protected AsteriskVersion determineVersion() throws IOException, TimeoutException
594     {
595         int attempts = 0;
596         while(attempts < MAX_VERSION_ATTEMPTS)
597         {
598 	    	ManagerResponse showVersionFilesResponse;
599 	
600 	        // increase timeout as output is quite large
601 	        showVersionFilesResponse = sendAction(new CommandAction("show version files pbx.c"), defaultResponseTimeout * 2);
602 	        if (showVersionFilesResponse instanceof CommandResponse)
603 	        {
604 	            List<String> showVersionFilesResult;
605 	
606 	            showVersionFilesResult = ((CommandResponse) showVersionFilesResponse).getResult();
607 	            if (showVersionFilesResult != null && showVersionFilesResult.size() > 0)
608 	            {
609 	                String line1;
610 	
611 	                line1 = showVersionFilesResult.get(0);
612                     if (line1 != null && line1.startsWith("File"))
613                     {
614                         final String rawVersion;
615 
616                         rawVersion = getRawVersion();
617                         if (rawVersion != null && rawVersion.startsWith("Asterisk 1.4"))
618                         {
619                             return AsteriskVersion.ASTERISK_1_4;
620                         }
621                     
622                         return AsteriskVersion.ASTERISK_1_2;
623                     }
624                     else if (line1 != null && line1.contains("No such command"))
625                     {
626                         try
627                         {
628                             attempts++;
629                             Thread.sleep(RECONNECTION_VERSION_INTERVAL);
630                         }
631                         catch (Exception ex)
632                         {
633                             // ingnore
634                         }
635                     }
636                     else
637                     {
638                         // if it isn't the "no such command", break and return the lowest version immediately
639                         break;
640                     }
641 	            }
642 	        }
643         }
644 
645         return AsteriskVersion.ASTERISK_1_0;
646     }
647 
648     protected String getRawVersion()
649     {
650         final ManagerResponse showVersionResponse;
651 
652         try
653         {
654             showVersionResponse = sendAction(new CommandAction("show version"), defaultResponseTimeout * 2);
655         }
656         catch (Exception e)
657         {
658             return null;
659         }
660 
661         if (showVersionResponse instanceof CommandResponse)
662         {
663             final List<String> showVersionResult;
664             
665             showVersionResult = ((CommandResponse) showVersionResponse).getResult();
666             if (showVersionResult != null && showVersionResult.size() > 0)
667             {
668                 return showVersionResult.get(0);
669             }
670         }
671 
672         return null;
673     }
674 
675     protected synchronized void connect() throws IOException
676     {
677         logger.info("Connecting to " + hostname + ":" + port);
678 
679         if (reader == null)
680         {
681             logger.debug("Creating reader for " + hostname + ":" + port);
682             reader = createReader(this, this);
683         }
684 
685         if (writer == null)
686         {
687             logger.debug("Creating writer");
688             writer = createWriter();
689         }
690 
691         logger.debug("Creating socket");
692         socket = createSocket();
693 
694         logger.debug("Passing socket to reader");
695         reader.setSocket(socket);
696 
697         if (readerThread == null || !readerThread.isAlive() || reader.isDead())
698         {
699             logger.debug("Creating and starting reader thread");
700             readerThread = new Thread(reader);
701             readerThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reader-"
702                     + readerThreadCounter.getAndIncrement());
703             readerThread.setDaemon(true);
704             readerThread.start();
705         }
706 
707         logger.debug("Passing socket to writer");
708         writer.setSocket(socket);
709     }
710 
711     protected SocketConnectionFacade createSocket() throws IOException
712     {
713         return new SocketConnectionFacadeImpl(hostname, port, ssl, socketTimeout, socketReadTimeout);
714     }
715 
716     public synchronized void logoff() throws IllegalStateException
717     {
718         if (state != CONNECTED && state != RECONNECTING)
719         {
720             throw new IllegalStateException("Logoff may only be perfomed when in state "
721                     + "CONNECTED or RECONNECTING, but connection is in state " + state);
722         }
723 
724         state = DISCONNECTING;
725 
726         if (socket != null && !reconnecting)
727         {
728             try
729             {
730                 sendAction(new LogoffAction());
731             }
732             catch (Exception e)
733             {
734                 logger.warn("Unable to send LogOff action", e);
735             }
736         }
737         cleanup();
738         state = DISCONNECTED;
739     }
740 
741     /***
742      * Closes the socket connection.
743      */
744     protected synchronized void disconnect()
745     {
746         if (socket != null)
747         {
748             logger.info("Closing socket.");
749             try
750             {
751                 socket.close();
752             }
753             catch (IOException ex)
754             {
755                 logger.warn("Unable to close socket: " + ex.getMessage());
756             }
757             socket = null;
758         }
759         protocolIdentifier.value = null;
760     }
761 
762     public ManagerResponse sendAction(ManagerAction action) throws IOException, TimeoutException, IllegalArgumentException,
763             IllegalStateException
764     {
765         return sendAction(action, defaultResponseTimeout);
766     }
767 
768     /*
769      * Implements synchronous sending of "simple" actions.
770      */
771     public ManagerResponse sendAction(ManagerAction action, long timeout) throws IOException, TimeoutException,
772             IllegalArgumentException, IllegalStateException
773     {
774         ResponseHandlerResult result;
775         SendActionCallback callbackHandler;
776 
777         result = new ResponseHandlerResult();
778         callbackHandler = new DefaultSendActionCallback(result);
779 
780         synchronized (result)
781         {
782             sendAction(action, callbackHandler);
783             
784             // definitely return null for the response of user events
785             if(action instanceof UserEventAction)
786                 return null;
787             
788             // only wait if we did not yet receive the response.
789             // Responses may be returned really fast.
790             if (result.getResponse() == null)
791             {
792                 try
793                 {
794                     result.wait(timeout);
795                 }
796                 catch (InterruptedException ex)
797                 {
798                     logger.warn("Interrupted while waiting for result");
799                 }
800             }
801         }
802 
803         // still no response?
804         if (result.getResponse() == null)
805         {
806             throw new TimeoutException("Timeout waiting for response to " + action.getAction()
807                     + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"));
808         }
809 
810         return result.getResponse();
811     }
812 
813     public void sendAction(ManagerAction action, SendActionCallback callback) throws IOException, IllegalArgumentException,
814             IllegalStateException
815     {
816         final String internalActionId;
817 
818         if (action == null)
819         {
820             throw new IllegalArgumentException("Unable to send action: action is null.");
821         }
822 
823         // In general sending actions is only allowed while connected, though
824         // there are a few exceptions, these are handled here:
825         if ((state == CONNECTING || state == RECONNECTING)
826                 && (action instanceof ChallengeAction || action instanceof LoginAction))
827         {
828             // when (re-)connecting challenge and login actions are ok.
829         } // NOPMD
830         else if (state == DISCONNECTING && action instanceof LogoffAction)
831         {
832             // when disconnecting logoff action is ok.
833         } // NOPMD
834         else if (state != CONNECTED)
835         {
836             throw new IllegalStateException("Actions may only be sent when in state "
837                     + "CONNECTED, but connection is in state " + state);
838         }
839 
840         if (socket == null)
841         {
842             throw new IllegalStateException("Unable to send " + action.getAction() + " action: socket not connected.");
843         }
844 
845         internalActionId = createInternalActionId();
846 
847         // if the callbackHandler is null the user is obviously not interested
848         // in the response, thats fine.
849         if (callback != null)
850         {
851             synchronized (this.responseListeners)
852             {
853                 this.responseListeners.put(internalActionId, callback);
854             }
855         }
856 
857         writer.sendAction(action, internalActionId);
858     }
859 
860     public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action) throws IOException, EventTimeoutException,
861             IllegalArgumentException, IllegalStateException
862     {
863         return sendEventGeneratingAction(action, defaultEventTimeout);
864     }
865 
866     /*
867      * Implements synchronous sending of event generating actions.
868      */
869     public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action, long timeout) throws IOException,
870             EventTimeoutException, IllegalArgumentException, IllegalStateException
871     {
872         final ResponseEventsImpl responseEvents;
873         final ResponseEventHandler responseEventHandler;
874         final String internalActionId;
875 
876         if (action == null)
877         {
878             throw new IllegalArgumentException("Unable to send action: action is null.");
879         }
880         else if (action.getActionCompleteEventClass() == null)
881         {
882             throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass for "
883                     + action.getClass().getName() + " is null.");
884         }
885         else if (!ResponseEvent.class.isAssignableFrom(action.getActionCompleteEventClass()))
886         {
887             throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass ("
888                     + action.getActionCompleteEventClass().getName() + ") for " + action.getClass().getName()
889                     + " is not a ResponseEvent.");
890         }
891 
892         if (state != CONNECTED)
893         {
894             throw new IllegalStateException("Actions may only be sent when in state "
895                     + "CONNECTED, but connection is in state " + state);
896         }
897 
898         responseEvents = new ResponseEventsImpl();
899         responseEventHandler = new ResponseEventHandler(responseEvents, action.getActionCompleteEventClass());
900 
901         internalActionId = createInternalActionId();
902 
903         // register response handler...
904         synchronized (this.responseListeners)
905         {
906             this.responseListeners.put(internalActionId, responseEventHandler);
907         }
908 
909         // ...and event handler.
910         synchronized (this.responseEventListeners)
911         {
912             this.responseEventListeners.put(internalActionId, responseEventHandler);
913         }
914 
915         synchronized (responseEvents)
916         {
917             writer.sendAction(action, internalActionId);
918             // only wait if response has not yet arrived.
919             if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
920             {
921                 try
922                 {
923                     responseEvents.wait(timeout);
924                 }
925                 catch (InterruptedException e)
926                 {
927                     logger.warn("Interrupted while waiting for response events.");
928                 }
929             }
930         }
931 
932         // still no response or not all events received and timed out?
933         if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
934         {
935             // clean up
936             synchronized (this.responseEventListeners)
937             {
938                 this.responseEventListeners.remove(internalActionId);
939             }
940 
941             throw new EventTimeoutException("Timeout waiting for response or response events to " + action.getAction()
942                     + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"), responseEvents);
943         }
944 
945         // remove the event handler
946         // Note: The response handler has already been removed
947         // when the response was received
948         synchronized (this.responseEventListeners)
949         {
950             this.responseEventListeners.remove(internalActionId);
951         }
952 
953         return responseEvents;
954     }
955 
956     /***
957      * Creates a new unique internal action id based on the hash code of this
958      * connection and a sequence.
959      *
960      * @return a new internal action id
961      * @see ManagerUtil#addInternalActionId(String, String)
962      * @see ManagerUtil#getInternalActionId(String)
963      * @see ManagerUtil#stripInternalActionId(String)
964      */
965     private String createInternalActionId()
966     {
967         final StringBuffer sb;
968 
969         sb = new StringBuffer();
970         sb.append(this.hashCode());
971         sb.append("_");
972         sb.append(actionIdCounter.getAndIncrement());
973 
974         return sb.toString();
975     }
976 
977     public void addEventListener(final ManagerEventListener listener)
978     {
979         synchronized (this.eventListeners)
980         {
981             // only add it if its not already there
982             if (!this.eventListeners.contains(listener))
983             {
984                 this.eventListeners.add(listener);
985             }
986         }
987     }
988 
989     public void removeEventListener(final ManagerEventListener listener)
990     {
991         synchronized (this.eventListeners)
992         {
993             if (this.eventListeners.contains(listener))
994             {
995                 this.eventListeners.remove(listener);
996             }
997         }
998     }
999 
1000     public String getProtocolIdentifier()
1001     {
1002         return protocolIdentifier.value;
1003     }
1004 
1005     public ManagerConnectionState getState()
1006     {
1007         return state;
1008     }
1009 
1010     /* Implementation of Dispatcher: callbacks for ManagerReader */
1011 
1012     /***
1013      * This method is called by the reader whenever a {@link ManagerResponse} is
1014      * received. The response is dispatched to the associated
1015      * {@link SendActionCallback}.
1016      * 
1017      * @param response the response received by the reader
1018      * @see ManagerReader
1019      */
1020     public void dispatchResponse(ManagerResponse response)
1021     {
1022         final String actionId;
1023         String internalActionId;
1024         SendActionCallback listener;
1025 
1026         // shouldn't happen
1027         if (response == null)
1028         {
1029             logger.error("Unable to dispatch null response. This should never happen. Please file a bug.");
1030             return;
1031         }
1032 
1033         actionId = response.getActionId();
1034         internalActionId = null;
1035         listener = null;
1036 
1037         if (actionId != null)
1038         {
1039             internalActionId = ManagerUtil.getInternalActionId(actionId);
1040             response.setActionId(ManagerUtil.stripInternalActionId(actionId));
1041         }
1042 
1043         logger.debug("Dispatching response with internalActionId '" + internalActionId + "':\n" + response);
1044 
1045         if (internalActionId != null)
1046         {
1047             synchronized (this.responseListeners)
1048             {
1049                 listener = responseListeners.get(internalActionId);
1050                 if (listener != null)
1051                 {
1052                     this.responseListeners.remove(internalActionId);
1053                 }
1054                 else
1055                 {
1056                     // when using the async sendAction it's ok not to register a
1057                     // callback so if we don't find a response handler thats ok
1058                     logger.debug("No response listener registered for " + "internalActionId '" + internalActionId + "'");
1059                 }
1060             }
1061         }
1062         else
1063         {
1064             logger.error("Unable to retrieve internalActionId from response: " + "actionId '" + actionId + "':\n" + response);
1065         }
1066 
1067         if (listener != null)
1068         {
1069             try
1070             {
1071                 listener.onResponse(response);
1072             }
1073             catch (Exception e)
1074             {
1075                 logger.warn("Unexpected exception in response listener " + listener.getClass().getName(), e);
1076             }
1077         }
1078     }
1079 
1080     /***
1081      * This method is called by the reader whenever a ManagerEvent is received.
1082      * The event is dispatched to all registered ManagerEventHandlers.
1083      * 
1084      * @param event the event received by the reader
1085      * @see #addEventListener(ManagerEventListener)
1086      * @see #removeEventListener(ManagerEventListener)
1087      * @see ManagerReader
1088      */
1089     public void dispatchEvent(ManagerEvent event)
1090     {
1091         // shouldn't happen
1092         if (event == null)
1093         {
1094             logger.error("Unable to dispatch null event. This should never happen. Please file a bug.");
1095             return;
1096         }
1097 
1098         logger.debug("Dispatching event:\n" + event.toString());
1099 
1100         // Some events need special treatment besides forwarding them to the
1101         // registered eventListeners (clients)
1102         // These events are handled here at first:
1103 
1104         // Dispatch ResponseEvents to the appropriate responseEventListener
1105         if (event instanceof ResponseEvent)
1106         {
1107             ResponseEvent responseEvent;
1108             String internalActionId;
1109 
1110             responseEvent = (ResponseEvent) event;
1111             internalActionId = responseEvent.getInternalActionId();
1112             if (internalActionId != null)
1113             {
1114                 synchronized (responseEventListeners)
1115                 {
1116                     ManagerEventListener listener;
1117 
1118                     listener = responseEventListeners.get(internalActionId);
1119                     if (listener != null)
1120                     {
1121                         try
1122                         {
1123                             listener.onManagerEvent(event);
1124                         }
1125                         catch (Exception e)
1126                         {
1127                             logger.warn("Unexpected exception in response event listener " + listener.getClass().getName(),
1128                                     e);
1129                         }
1130                     }
1131                 }
1132             }
1133             else
1134             {
1135                 // ResponseEvent without internalActionId:
1136                 // this happens if the same event class is used as response
1137                 // event
1138                 // and as an event that is not triggered by a Manager command
1139                 // Example: QueueMemberStatusEvent.
1140                 // logger.debug("ResponseEvent without "
1141                 // + "internalActionId:\n" + responseEvent);
1142             } // NOPMD
1143         }
1144         if (event instanceof DisconnectEvent)
1145         {
1146             // When we receive get disconnected while we are connected start
1147             // a new reconnect thread and set the state to RECONNECTING.
1148             if (state == CONNECTED)
1149             {
1150                 state = RECONNECTING;
1151                 // close socket if still open and remove reference to
1152                 // readerThread
1153                 // After sending the DisconnectThread that thread will die
1154                 // anyway.
1155                 cleanup();
1156                 reconnectThread = new Thread(new Runnable()
1157                 {
1158                     public void run()
1159                     {
1160                         reconnect();
1161                     }
1162                 });
1163                 reconnectThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reconnect-"
1164                         + reconnectThreadCounter.getAndIncrement());
1165                 reconnectThread.setDaemon(true);
1166                 reconnectThread.start();
1167                 // now the DisconnectEvent is dispatched to registered
1168                 // eventListeners
1169                 // (clients) and after that the ManagerReaderThread is gone.
1170                 // So effectively we replaced the reader thread by a
1171                 // ReconnectThread.
1172             }
1173             else
1174             {
1175                 // when we receive a DisconnectEvent while not connected we
1176                 // ignore it and do not send it to clients
1177                 return;
1178             }
1179         }
1180         if (event instanceof ProtocolIdentifierReceivedEvent)
1181         {
1182             ProtocolIdentifierReceivedEvent protocolIdentifierReceivedEvent;
1183             String protocolIdentifier;
1184 
1185             protocolIdentifierReceivedEvent = (ProtocolIdentifierReceivedEvent) event;
1186             protocolIdentifier = protocolIdentifierReceivedEvent.getProtocolIdentifier();
1187             setProtocolIdentifier(protocolIdentifier);
1188             // no need to send this event to clients
1189             return;
1190         }
1191 
1192         fireEvent(event);
1193     }
1194 
1195     /***
1196      * Notifies all {@link ManagerEventListener}s registered by users.
1197      * 
1198      * @param event the event to propagate
1199      */
1200     private void fireEvent(ManagerEvent event)
1201     {
1202         synchronized (eventListeners)
1203         {
1204             for (ManagerEventListener listener : eventListeners)
1205             {
1206                 try
1207                 {
1208                     listener.onManagerEvent(event);
1209                 }
1210                 catch (RuntimeException e)
1211                 {
1212                     logger.warn("Unexpected exception in eventHandler " + listener.getClass().getName(), e);
1213                 }
1214             }
1215         }
1216     }
1217 
1218     /***
1219      * This method is called when a {@link ProtocolIdentifierReceivedEvent} is
1220      * received from the reader. Having received a correct protocol identifier
1221      * is the precodition for logging in.
1222      * 
1223      * @param identifier the protocol version used by the Asterisk server.
1224      */
1225     private void setProtocolIdentifier(final String identifier)
1226     {
1227         logger.info("Connected via " + identifier);
1228 
1229         if (!"Asterisk Call Manager/1.0".equals(identifier) 
1230                 && !"Asterisk Call Manager/1.2".equals(identifier) // bri stuffed 
1231                 && !"OpenPBX Call Manager/1.0".equals(identifier))
1232         {
1233             logger.warn("Unsupported protocol version '" + identifier + "'. Use at your own risk.");
1234         }
1235 
1236         synchronized (protocolIdentifier)
1237         {
1238             protocolIdentifier.value = identifier;
1239             protocolIdentifier.notify();
1240         }
1241     }
1242 
1243     /***
1244      * Reconnects to the asterisk server when the connection is lost.
1245      * <p>
1246      * While keepAlive is <code>true</code> we will try to reconnect.
1247      * Reconnection attempts will be stopped when the {@link #logoff()} method
1248      * is called or when the login after a successful reconnect results in an
1249      * {@link AuthenticationFailedException} suggesting that the manager
1250      * credentials have changed and keepAliveAfterAuthenticationFailure is not
1251      * set.
1252      * <p>
1253      * This method is called when a {@link DisconnectEvent} is received from the
1254      * reader.
1255      */
1256     private void reconnect()
1257     {
1258         int numTries;
1259 
1260         // try to reconnect
1261         numTries = 0;
1262         while (state == RECONNECTING)
1263         {
1264             try
1265             {
1266                 if (numTries < 10)
1267                 {
1268                     // try to reconnect quite fast for the firt 10 times
1269                     // this succeeds if the server has just been restarted
1270                     Thread.sleep(RECONNECTION_INTERVAL_1);
1271                 }
1272                 else
1273                 {
1274                     // slow down after 10 unsuccessful attempts asuming a
1275                     // shutdown of the server
1276                     Thread.sleep(RECONNECTION_INTERVAL_2);
1277                 }
1278             }
1279             catch (InterruptedException e1)
1280             {
1281                 // ignore
1282             }
1283 
1284             try
1285             {
1286                 connect();
1287 
1288                 try
1289                 {
1290                     doLogin(defaultResponseTimeout, eventMask);
1291                     logger.info("Successfully reconnected.");
1292                     // everything is ok again, so we leave
1293                     // when successful doLogin set the state to CONNECTED so no
1294                     // need to adjust it
1295                     break;
1296                 }
1297                 catch (AuthenticationFailedException e1)
1298                 {
1299                     if (keepAliveAfterAuthenticationFailure)
1300                     {
1301                         logger.error("Unable to log in after reconnect: " + e1.getMessage());
1302                     }
1303                     else
1304                     {
1305                         logger.error("Unable to log in after reconnect: " + e1.getMessage() + ". Giving up.");
1306                         state = DISCONNECTED;
1307                     }
1308                 }
1309                 catch (TimeoutException e1)
1310                 {
1311                     // shouldn't happen - but happens!
1312                     logger.error("TimeoutException while trying to log in " + "after reconnect.");
1313                 }
1314             }
1315             catch (IOException e)
1316             {
1317                 // server seems to be still down, just continue to attempt
1318                 // reconnection
1319                 logger.warn("Exception while trying to reconnect: " + e.getMessage());
1320             }
1321             numTries++;
1322         }
1323     }
1324 
1325     private void cleanup()
1326     {
1327         disconnect();
1328         this.readerThread = null;
1329     }
1330 
1331     @Override
1332    public String toString()
1333     {
1334         StringBuffer sb;
1335 
1336         sb = new StringBuffer("ManagerConnection[");
1337         sb.append("id='").append(id).append("',");
1338         sb.append("hostname='").append(hostname).append("',");
1339         sb.append("port=").append(port).append(",");
1340         sb.append("systemHashcode=").append(System.identityHashCode(this)).append("]");
1341 
1342         return sb.toString();
1343     }
1344 
1345     /* Helper classes */
1346 
1347     /***
1348      * A simple data object to store a ManagerResult.
1349      */
1350     private class ResponseHandlerResult implements Serializable
1351     {
1352         /***
1353          * Serializable version identifier.
1354          */
1355         private static final long serialVersionUID = 7831097958568769220L;
1356         private ManagerResponse response;
1357 
1358         public ResponseHandlerResult()
1359         {
1360         }
1361 
1362         public ManagerResponse getResponse()
1363         {
1364             return this.response;
1365         }
1366 
1367         public void setResponse(ManagerResponse response)
1368         {
1369             this.response = response;
1370         }
1371     }
1372 
1373     /***
1374      * A simple response handler that stores the received response in a
1375      * ResponseHandlerResult for further processing.
1376      */
1377     private class DefaultSendActionCallback implements SendActionCallback, Serializable
1378     {
1379         /***
1380          * Serializable version identifier.
1381          */
1382         private static final long serialVersionUID = 2926598671855316803L;
1383         private final ResponseHandlerResult result;
1384 
1385         /***
1386          * Creates a new instance.
1387          * 
1388          * @param result the result to store the response in
1389          */
1390         public DefaultSendActionCallback(ResponseHandlerResult result)
1391         {
1392             this.result = result;
1393         }
1394 
1395         public void onResponse(ManagerResponse response)
1396         {
1397             synchronized (result)
1398             {
1399                 result.setResponse(response);
1400                 result.notify();
1401             }
1402         }
1403     }
1404 
1405     /***
1406      * A combinded event and response handler that adds received events and the
1407      * response to a ResponseEvents object.
1408      */
1409     @SuppressWarnings("unchecked")
1410     private class ResponseEventHandler implements ManagerEventListener, SendActionCallback, Serializable
1411     {
1412         /***
1413          * Serializable version identifier.
1414          */
1415         private static final long serialVersionUID = 2926598671855316803L;
1416         private final ResponseEventsImpl events;
1417         private final Class actionCompleteEventClass;
1418 
1419         /***
1420          * Creates a new instance.
1421          * 
1422          * @param events the ResponseEventsImpl to store the events in
1423          * @param actionCompleteEventClass the type of event that indicates that
1424          *            all events have been received
1425          */
1426         public ResponseEventHandler(ResponseEventsImpl events, Class actionCompleteEventClass)
1427         {
1428             this.events = events;
1429             this.actionCompleteEventClass = actionCompleteEventClass;
1430         }
1431 
1432         public void onManagerEvent(ManagerEvent event)
1433         {
1434             synchronized (events)
1435             {
1436                 // should always be a ResponseEvent, anyway...
1437                 if (event instanceof ResponseEvent)
1438                 {
1439                     ResponseEvent responseEvent;
1440 
1441                     responseEvent = (ResponseEvent) event;
1442                     events.addEvent(responseEvent);
1443                 }
1444 
1445                 // finished?
1446                 if (actionCompleteEventClass.isAssignableFrom(event.getClass()))
1447                 {
1448                     events.setComplete(true);
1449                     // notify if action complete event and response have been
1450                     // received
1451                     if (events.getResponse() != null)
1452                     {
1453                         events.notify();
1454                     }
1455                 }
1456             }
1457         }
1458 
1459         public void onResponse(ManagerResponse response)
1460         {
1461             synchronized (events)
1462             {
1463                 events.setRepsonse(response);
1464                 if (response instanceof ManagerError)
1465                 {
1466                     events.setComplete(true);
1467                 }
1468 
1469                 // finished?
1470                 // notify if action complete event and response have been
1471                 // received
1472                 if (events.isComplete())
1473                 {
1474                     events.notify();
1475                 }
1476             }
1477         }
1478     }
1479 
1480     private class ProtocolIdentifierWrapper
1481     {
1482         String value;
1483     }
1484 }