1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.asteriskjava.manager.internal;
18
19 import static org.asteriskjava.manager.ManagerConnectionState.CONNECTED;
20 import static org.asteriskjava.manager.ManagerConnectionState.CONNECTING;
21 import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTED;
22 import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTING;
23 import static org.asteriskjava.manager.ManagerConnectionState.INITIAL;
24 import static org.asteriskjava.manager.ManagerConnectionState.RECONNECTING;
25
26 import java.io.IOException;
27 import java.io.Serializable;
28 import java.net.Socket;
29 import java.security.MessageDigest;
30 import java.security.NoSuchAlgorithmException;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.atomic.AtomicLong;
36
37 import org.asteriskjava.AsteriskVersion;
38 import org.asteriskjava.manager.AuthenticationFailedException;
39 import org.asteriskjava.manager.EventTimeoutException;
40 import org.asteriskjava.manager.ManagerConnection;
41 import org.asteriskjava.manager.ManagerConnectionState;
42 import org.asteriskjava.manager.ManagerEventListener;
43 import org.asteriskjava.manager.SendActionCallback;
44 import org.asteriskjava.manager.ResponseEvents;
45 import org.asteriskjava.manager.TimeoutException;
46 import org.asteriskjava.manager.action.ChallengeAction;
47 import org.asteriskjava.manager.action.CommandAction;
48 import org.asteriskjava.manager.action.EventGeneratingAction;
49 import org.asteriskjava.manager.action.LoginAction;
50 import org.asteriskjava.manager.action.LogoffAction;
51 import org.asteriskjava.manager.action.ManagerAction;
52 import org.asteriskjava.manager.event.ConnectEvent;
53 import org.asteriskjava.manager.event.DisconnectEvent;
54 import org.asteriskjava.manager.event.ManagerEvent;
55 import org.asteriskjava.manager.event.ProtocolIdentifierReceivedEvent;
56 import org.asteriskjava.manager.event.ResponseEvent;
57 import org.asteriskjava.manager.response.ChallengeResponse;
58 import org.asteriskjava.manager.response.CommandResponse;
59 import org.asteriskjava.manager.response.ManagerError;
60 import org.asteriskjava.manager.response.ManagerResponse;
61 import org.asteriskjava.util.DateUtil;
62 import org.asteriskjava.util.Log;
63 import org.asteriskjava.util.LogFactory;
64 import org.asteriskjava.util.SocketConnectionFacade;
65 import org.asteriskjava.util.internal.SocketConnectionFacadeImpl;
66 import org.asteriskjava.manager.action.UserEventAction;
67
68 /***
69 * Default implementation of the ManagerConnection interface.
70 * <p/>
71 * Generally avoid direct use of this class. Use the ManagerConnectionFactory to
72 * obtain a ManagerConnection instead.
73 * <p/>
74 * When using a dependency injection framework like Spring direct usage for
75 * wiring up beans that require a ManagerConnection property is fine though.
76 * <p/>
77 * Note that the DefaultManagerConnection will create one new Thread for reading
78 * data from Asterisk on the first call to one of the login() methods.
79 *
80 * @author srt
81 * @version $Id: ManagerConnectionImpl.java 837 2007-07-09 06:44:23Z srt $
82 * @see org.asteriskjava.manager.ManagerConnectionFactory
83 */
84 public class ManagerConnectionImpl implements ManagerConnection, Dispatcher
85 {
86 private static final int RECONNECTION_INTERVAL_1 = 50;
87 private static final int RECONNECTION_INTERVAL_2 = 5000;
88 private static final String DEFAULT_HOSTNAME = "localhost";
89 private static final int DEFAULT_PORT = 5038;
90 private static final int RECONNECTION_VERSION_INTERVAL = 500;
91 private static final int MAX_VERSION_ATTEMPTS = 4;
92
93 private static final AtomicLong idCounter = new AtomicLong(0);
94
95 /***
96 * Instance logger.
97 */
98 private final Log logger = LogFactory.getLog(getClass());
99
100 private final long id;
101
102 /***
103 * Used to construct the internalActionId.
104 */
105 private AtomicLong actionIdCounter = new AtomicLong(0);
106
107
108 /***
109 * Hostname of the Asterisk server to connect to.
110 */
111 private String hostname = DEFAULT_HOSTNAME;
112
113 /***
114 * TCP port to connect to.
115 */
116 private int port = DEFAULT_PORT;
117
118 /***
119 * <code>true</code> to use SSL for the connection, <code>false</code>
120 * for a plain text connection.
121 */
122 private boolean ssl = false;
123
124 /***
125 * The username to use for login as defined in Asterisk's
126 * <code>manager.conf</code>.
127 */
128 protected String username;
129
130 /***
131 * The password to use for login as defined in Asterisk's
132 * <code>manager.conf</code>.
133 */
134 protected String password;
135
136 /***
137 * The default timeout to wait for a ManagerResponse after sending a
138 * ManagerAction.
139 */
140 private long defaultResponseTimeout = 2000;
141
142 /***
143 * The default timeout to wait for the last ResponseEvent after sending an
144 * EventGeneratingAction.
145 */
146 private long defaultEventTimeout = 5000;
147
148 /***
149 * The timeout to use when connecting the the Asterisk server.
150 */
151 private int socketTimeout = 0;
152
153 /***
154 * @see Socket#setSoTimeout(int)
155 */
156 private int socketReadTimeout = 0;
157
158 /***
159 * Should we continue to reconnect after an authentication failure?
160 */
161 private boolean keepAliveAfterAuthenticationFailure = true;
162
163 /***
164 * The socket to use for TCP/IP communication with Asterisk.
165 */
166 private SocketConnectionFacade socket;
167
168 /***
169 * The thread that runs the reader.
170 */
171 private Thread readerThread;
172 private final AtomicLong readerThreadCounter = new AtomicLong(0);
173
174 /***
175 * The thread that performs reconnect.
176 */
177 private Thread reconnectThread;
178 private final AtomicLong reconnectThreadCounter = new AtomicLong(0);
179
180 /***
181 * The reader to use to receive events and responses from asterisk.
182 */
183 private ManagerReader reader;
184
185 /***
186 * The writer to use to send actions to asterisk.
187 */
188 private ManagerWriter writer;
189
190 /***
191 * The protocol identifer Asterisk sends on connect wrapped into an object
192 * to be used as mutex.
193 */
194 private final ProtocolIdentifierWrapper protocolIdentifier;
195
196 /***
197 * The version of the Asterisk server we are connected to.
198 */
199 private AsteriskVersion version;
200
201 /***
202 * Contains the registered handlers that process the ManagerResponses.
203 * <p/>
204 * Key is the internalActionId of the Action sent and value the
205 * corresponding ResponseListener.
206 */
207 private final Map<String, SendActionCallback> responseListeners;
208
209 /***
210 * Contains the event handlers that handle ResponseEvents for the
211 * sendEventGeneratingAction methods.
212 * <p/>
213 * Key is the internalActionId of the Action sent and value the
214 * corresponding EventHandler.
215 */
216 private final Map<String, ManagerEventListener> responseEventListeners;
217
218 /***
219 * Contains the event handlers that users registered.
220 */
221 private final List<ManagerEventListener> eventListeners;
222
223 /***
224 * <code>true</code> while reconnecting.
225 */
226 private boolean reconnecting = false;
227
228 protected ManagerConnectionState state = INITIAL;
229
230 private String eventMask;
231
/**
 * Creates a new, not yet connected instance.
 * <p/>
 * Assigns the next connection id and sets up the empty listener registries
 * and the protocol identifier holder.
 */
public ManagerConnectionImpl()
{
    id = idCounter.getAndIncrement();
    protocolIdentifier = new ProtocolIdentifierWrapper();
    eventListeners = new ArrayList<ManagerEventListener>();
    responseListeners = new HashMap<String, SendActionCallback>();
    responseEventListeners = new HashMap<String, ManagerEventListener>();
}
243
244
245
246 protected ManagerReader createReader(Dispatcher dispatcher, Object source)
247 {
248 return new ManagerReaderImpl(dispatcher, source);
249 }
250
251 protected ManagerWriter createWriter()
252 {
253 return new ManagerWriterImpl();
254 }
255
256 /***
257 * Sets the hostname of the asterisk server to connect to.
258 * <p/>
259 * Default is <code>localhost</code>.
260 *
261 * @param hostname the hostname to connect to
262 */
263 public void setHostname(String hostname)
264 {
265 this.hostname = hostname;
266 }
267
268 /***
269 * Sets the port to use to connect to the asterisk server. This is the port
270 * specified in asterisk's <code>manager.conf</code> file.
271 * <p/>
272 * Default is 5038.
273 *
274 * @param port the port to connect to
275 */
276 public void setPort(int port)
277 {
278 if (port <= 0)
279 {
280 this.port = DEFAULT_PORT;
281 }
282 else
283 {
284 this.port = port;
285 }
286 }
287
288 /***
289 * Sets whether to use SSL.
290 * <p/>
291 * Default is false.
292 *
293 * @param ssl <code>true</code> to use SSL for the connection,
294 * <code>false</code> for a plain text connection.
295 * @since 0.3
296 */
297 public void setSsl(boolean ssl)
298 {
299 this.ssl = ssl;
300 }
301
302 /***
303 * Sets the username to use to connect to the asterisk server. This is the
304 * username specified in asterisk's <code>manager.conf</code> file.
305 *
306 * @param username the username to use for login
307 */
308 public void setUsername(String username)
309 {
310 this.username = username;
311 }
312
313 /***
314 * Sets the password to use to connect to the asterisk server. This is the
315 * password specified in Asterisk's <code>manager.conf</code> file.
316 *
317 * @param password the password to use for login
318 */
319 public void setPassword(String password)
320 {
321 this.password = password;
322 }
323
324 /***
325 * Sets the time in milliseconds the synchronous method
326 * {@link #sendAction(ManagerAction)} will wait for a response before
327 * throwing a TimeoutException.
328 * <p/>
329 * Default is 2000.
330 *
331 * @param defaultResponseTimeout default response timeout in milliseconds
332 * @since 0.2
333 */
334 public void setDefaultResponseTimeout(long defaultResponseTimeout)
335 {
336 this.defaultResponseTimeout = defaultResponseTimeout;
337 }
338
339 /***
340 * Sets the time in milliseconds the synchronous method
341 * {@link #sendEventGeneratingAction(EventGeneratingAction)} will wait for a
342 * response and the last response event before throwing a TimeoutException.
343 * <p/>
344 * Default is 5000.
345 *
346 * @param defaultEventTimeout default event timeout in milliseconds
347 * @since 0.2
348 */
349 public void setDefaultEventTimeout(long defaultEventTimeout)
350 {
351 this.defaultEventTimeout = defaultEventTimeout;
352 }
353
354 /***
355 * Set to <code>true</code> to try reconnecting to ther asterisk serve
356 * even if the reconnection attempt threw an AuthenticationFailedException.
357 * <p/>
358 * Default is <code>true</code>.
359 *
360 * @param keepAliveAfterAuthenticationFailure
361 * <code>true</code> to try reconnecting to ther asterisk serve
362 * even if the reconnection attempt threw an AuthenticationFailedException,
363 * <code>false</code> otherwise.
364 */
365 public void setKeepAliveAfterAuthenticationFailure(boolean keepAliveAfterAuthenticationFailure)
366 {
367 this.keepAliveAfterAuthenticationFailure = keepAliveAfterAuthenticationFailure;
368 }
369
370
371
372 public String getUsername()
373 {
374 return username;
375 }
376
377 public String getPassword()
378 {
379 return password;
380 }
381
382 public String getHostname()
383 {
384 return hostname;
385 }
386
387 public int getPort()
388 {
389 return port;
390 }
391
392 public boolean isSsl()
393 {
394 return ssl;
395 }
396
397 public void registerUserEventClass(Class userEventClass)
398 {
399 if (reader == null)
400 {
401 reader = createReader(this, this);
402 }
403
404 reader.registerEventClass(userEventClass);
405 }
406
407 public void setSocketTimeout(int socketTimeout)
408 {
409 this.socketTimeout = socketTimeout;
410 }
411
412 public void setSocketReadTimeout(int socketReadTimeout)
413 {
414 this.socketReadTimeout = socketReadTimeout;
415 }
416
417 public synchronized void login() throws IOException, AuthenticationFailedException, TimeoutException
418 {
419 login(null);
420 }
421
422 public synchronized void login(String eventMask) throws IOException, AuthenticationFailedException, TimeoutException
423 {
424 if (state != INITIAL && state != DISCONNECTED)
425 {
426 throw new IllegalStateException("Login may only be perfomed when in state "
427 + "INITIAL or DISCONNECTED, but connection is in state " + state);
428 }
429
430 state = CONNECTING;
431 this.eventMask = eventMask;
432 try
433 {
434 doLogin(defaultResponseTimeout, eventMask);
435 }
436 finally
437 {
438 if (state != CONNECTED)
439 {
440 state = DISCONNECTED;
441 }
442 }
443 }
444
445 /***
446 * Does the real login, following the steps outlined below.
447 * <p/>
448 * <ol>
449 * <li>Connects to the asterisk server by calling {@link #connect()} if not
450 * already connected
451 * <li>Waits until the protocol identifier is received but not longer than
452 * timeout ms.
453 * <li>Sends a {@link ChallengeAction} requesting a challenge for authType
454 * MD5.
455 * <li>When the {@link ChallengeResponse} is received a {@link LoginAction}
456 * is sent using the calculated key (MD5 hash of the password appended to
457 * the received challenge).
458 * </ol>
459 *
460 * @param timeout the maximum time to wait for the protocol identifier (in
461 * ms)
462 * @param eventMask the event mask. Set to "on" if all events should be
463 * send, "off" if not events should be sent or a combination of
464 * "system", "call" and "log" (separated by ',') to specify what
465 * kind of events should be sent.
466 * @throws IOException if there is an i/o problem.
467 * @throws AuthenticationFailedException if username or password are
468 * incorrect and the login action returns an error or if the MD5
469 * hash cannot be computed. The connection is closed in this
470 * case.
471 * @throws TimeoutException if a timeout occurs while waiting for the
472 * protocol identifier. The connection is closed in this case.
473 */
474 protected synchronized void doLogin(long timeout, String eventMask) throws IOException, AuthenticationFailedException,
475 TimeoutException
476 {
477 ChallengeAction challengeAction;
478 ManagerResponse challengeResponse;
479 String challenge;
480 String key;
481 LoginAction loginAction;
482 ManagerResponse loginResponse;
483
484 if (socket == null)
485 {
486 connect();
487 }
488
489 synchronized (protocolIdentifier)
490 {
491 if (protocolIdentifier.value == null)
492 {
493 try
494 {
495 protocolIdentifier.wait(timeout);
496 }
497 catch (InterruptedException e)
498 {
499
500 }
501 }
502
503 if (protocolIdentifier.value == null)
504 {
505 disconnect();
506 if (reader != null && reader.getTerminationException() != null)
507 {
508 throw reader.getTerminationException();
509 }
510 else
511 {
512 throw new TimeoutException("Timeout waiting for protocol identifier");
513 }
514 }
515 }
516
517 challengeAction = new ChallengeAction("MD5");
518 try
519 {
520 challengeResponse = sendAction(challengeAction);
521 }
522 catch (Exception e)
523 {
524 disconnect();
525 throw new AuthenticationFailedException("Unable to send challenge action", e);
526 }
527
528 if (challengeResponse instanceof ChallengeResponse)
529 {
530 challenge = ((ChallengeResponse) challengeResponse).getChallenge();
531 }
532 else
533 {
534 disconnect();
535 throw new AuthenticationFailedException("Unable to get challenge from Asterisk. ChallengeAction returned: "
536 + challengeResponse.getMessage());
537 }
538
539 try
540 {
541 MessageDigest md;
542
543 md = MessageDigest.getInstance("MD5");
544 if (challenge != null)
545 {
546 md.update(challenge.getBytes());
547 }
548 if (password != null)
549 {
550 md.update(password.getBytes());
551 }
552 key = ManagerUtil.toHexString(md.digest());
553 }
554 catch (NoSuchAlgorithmException ex)
555 {
556 disconnect();
557 throw new AuthenticationFailedException("Unable to create login key using MD5 Message Digest", ex);
558 }
559
560 loginAction = new LoginAction(username, "MD5", key, eventMask);
561 try
562 {
563 loginResponse = sendAction(loginAction);
564 }
565 catch (Exception e)
566 {
567 disconnect();
568 throw new AuthenticationFailedException("Unable to send login action", e);
569 }
570
571 if (loginResponse instanceof ManagerError)
572 {
573 disconnect();
574 throw new AuthenticationFailedException(loginResponse.getMessage());
575 }
576
577 state = CONNECTED;
578
579 logger.info("Successfully logged in");
580
581 version = determineVersion();
582 writer.setTargetVersion(version);
583
584 logger.info("Determined Asterisk version: " + version);
585
586
587 ConnectEvent connectEvent = new ConnectEvent(this);
588 connectEvent.setProtocolIdentifier(getProtocolIdentifier());
589 connectEvent.setDateReceived(DateUtil.getDate());
590
591 fireEvent(connectEvent);
592 }
593
594 protected AsteriskVersion determineVersion() throws IOException, TimeoutException
595 {
596 int attempts = 0;
597 while (attempts < MAX_VERSION_ATTEMPTS)
598 {
599 ManagerResponse showVersionFilesResponse;
600
601
602 showVersionFilesResponse = sendAction(new CommandAction("show version files pbx.c"), defaultResponseTimeout * 2);
603 if (showVersionFilesResponse instanceof CommandResponse)
604 {
605 List<String> showVersionFilesResult;
606
607 showVersionFilesResult = ((CommandResponse) showVersionFilesResponse).getResult();
608 if (showVersionFilesResult != null && showVersionFilesResult.size() > 0)
609 {
610 String line1;
611
612 line1 = showVersionFilesResult.get(0);
613 if (line1 != null && line1.startsWith("File"))
614 {
615 final String rawVersion;
616
617 rawVersion = getRawVersion();
618 if (rawVersion != null && rawVersion.startsWith("Asterisk 1.4"))
619 {
620 return AsteriskVersion.ASTERISK_1_4;
621 }
622
623 return AsteriskVersion.ASTERISK_1_2;
624 }
625 else if (line1 != null && line1.contains("No such command"))
626 {
627 try
628 {
629 attempts++;
630 Thread.sleep(RECONNECTION_VERSION_INTERVAL);
631 }
632 catch (Exception ex)
633 {
634
635 }
636 }
637 else
638 {
639
640 break;
641 }
642 }
643 }
644 }
645
646 return AsteriskVersion.ASTERISK_1_0;
647 }
648
649 protected String getRawVersion()
650 {
651 final ManagerResponse showVersionResponse;
652
653 try
654 {
655 showVersionResponse = sendAction(new CommandAction("show version"), defaultResponseTimeout * 2);
656 }
657 catch (Exception e)
658 {
659 return null;
660 }
661
662 if (showVersionResponse instanceof CommandResponse)
663 {
664 final List<String> showVersionResult;
665
666 showVersionResult = ((CommandResponse) showVersionResponse).getResult();
667 if (showVersionResult != null && showVersionResult.size() > 0)
668 {
669 return showVersionResult.get(0);
670 }
671 }
672
673 return null;
674 }
675
676 protected synchronized void connect() throws IOException
677 {
678 logger.info("Connecting to " + hostname + ":" + port);
679
680 if (reader == null)
681 {
682 logger.debug("Creating reader for " + hostname + ":" + port);
683 reader = createReader(this, this);
684 }
685
686 if (writer == null)
687 {
688 logger.debug("Creating writer");
689 writer = createWriter();
690 }
691
692 logger.debug("Creating socket");
693 socket = createSocket();
694
695 logger.debug("Passing socket to reader");
696 reader.setSocket(socket);
697
698 if (readerThread == null || !readerThread.isAlive() || reader.isDead())
699 {
700 logger.debug("Creating and starting reader thread");
701 readerThread = new Thread(reader);
702 readerThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reader-"
703 + readerThreadCounter.getAndIncrement());
704 readerThread.setDaemon(true);
705 readerThread.start();
706 }
707
708 logger.debug("Passing socket to writer");
709 writer.setSocket(socket);
710 }
711
712 protected SocketConnectionFacade createSocket() throws IOException
713 {
714 return new SocketConnectionFacadeImpl(hostname, port, ssl, socketTimeout, socketReadTimeout);
715 }
716
717 public synchronized void logoff() throws IllegalStateException
718 {
719 if (state != CONNECTED && state != RECONNECTING)
720 {
721 throw new IllegalStateException("Logoff may only be perfomed when in state "
722 + "CONNECTED or RECONNECTING, but connection is in state " + state);
723 }
724
725 state = DISCONNECTING;
726
727 if (socket != null && !reconnecting)
728 {
729 try
730 {
731 sendAction(new LogoffAction());
732 }
733 catch (Exception e)
734 {
735 logger.warn("Unable to send LogOff action", e);
736 }
737 }
738 cleanup();
739 state = DISCONNECTED;
740 }
741
742 /***
743 * Closes the socket connection.
744 */
745 protected synchronized void disconnect()
746 {
747 if (socket != null)
748 {
749 logger.info("Closing socket.");
750 try
751 {
752 socket.close();
753 }
754 catch (IOException ex)
755 {
756 logger.warn("Unable to close socket: " + ex.getMessage());
757 }
758 socket = null;
759 }
760 protocolIdentifier.value = null;
761 }
762
763 public ManagerResponse sendAction(ManagerAction action) throws IOException, TimeoutException, IllegalArgumentException,
764 IllegalStateException
765 {
766 return sendAction(action, defaultResponseTimeout);
767 }
768
769
770
771
772 public ManagerResponse sendAction(ManagerAction action, long timeout) throws IOException, TimeoutException,
773 IllegalArgumentException, IllegalStateException
774 {
775 ResponseHandlerResult result;
776 SendActionCallback callbackHandler;
777
778 result = new ResponseHandlerResult();
779 callbackHandler = new DefaultSendActionCallback(result);
780
781 synchronized (result)
782 {
783 sendAction(action, callbackHandler);
784
785
786 if (action instanceof UserEventAction)
787 {
788 return null;
789 }
790
791
792
793 if (result.getResponse() == null)
794 {
795 try
796 {
797 result.wait(timeout);
798 }
799 catch (InterruptedException ex)
800 {
801 logger.warn("Interrupted while waiting for result");
802 }
803 }
804 }
805
806
807 if (result.getResponse() == null)
808 {
809 throw new TimeoutException("Timeout waiting for response to " + action.getAction()
810 + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"));
811 }
812
813 return result.getResponse();
814 }
815
816 public void sendAction(ManagerAction action, SendActionCallback callback) throws IOException, IllegalArgumentException,
817 IllegalStateException
818 {
819 final String internalActionId;
820
821 if (action == null)
822 {
823 throw new IllegalArgumentException("Unable to send action: action is null.");
824 }
825
826
827
828 if ((state == CONNECTING || state == RECONNECTING)
829 && (action instanceof ChallengeAction || action instanceof LoginAction))
830 {
831
832 }
833 else if (state == DISCONNECTING && action instanceof LogoffAction)
834 {
835
836 }
837 else if (state != CONNECTED)
838 {
839 throw new IllegalStateException("Actions may only be sent when in state "
840 + "CONNECTED, but connection is in state " + state);
841 }
842
843 if (socket == null)
844 {
845 throw new IllegalStateException("Unable to send " + action.getAction() + " action: socket not connected.");
846 }
847
848 internalActionId = createInternalActionId();
849
850
851
852 if (callback != null)
853 {
854 synchronized (this.responseListeners)
855 {
856 this.responseListeners.put(internalActionId, callback);
857 }
858 }
859
860 writer.sendAction(action, internalActionId);
861 }
862
863 public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action) throws IOException, EventTimeoutException,
864 IllegalArgumentException, IllegalStateException
865 {
866 return sendEventGeneratingAction(action, defaultEventTimeout);
867 }
868
869
870
871
872 public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action, long timeout) throws IOException,
873 EventTimeoutException, IllegalArgumentException, IllegalStateException
874 {
875 final ResponseEventsImpl responseEvents;
876 final ResponseEventHandler responseEventHandler;
877 final String internalActionId;
878
879 if (action == null)
880 {
881 throw new IllegalArgumentException("Unable to send action: action is null.");
882 }
883 else if (action.getActionCompleteEventClass() == null)
884 {
885 throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass for "
886 + action.getClass().getName() + " is null.");
887 }
888 else if (!ResponseEvent.class.isAssignableFrom(action.getActionCompleteEventClass()))
889 {
890 throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass ("
891 + action.getActionCompleteEventClass().getName() + ") for " + action.getClass().getName()
892 + " is not a ResponseEvent.");
893 }
894
895 if (state != CONNECTED)
896 {
897 throw new IllegalStateException("Actions may only be sent when in state "
898 + "CONNECTED, but connection is in state " + state);
899 }
900
901 responseEvents = new ResponseEventsImpl();
902 responseEventHandler = new ResponseEventHandler(responseEvents, action.getActionCompleteEventClass());
903
904 internalActionId = createInternalActionId();
905
906
907 synchronized (this.responseListeners)
908 {
909 this.responseListeners.put(internalActionId, responseEventHandler);
910 }
911
912
913 synchronized (this.responseEventListeners)
914 {
915 this.responseEventListeners.put(internalActionId, responseEventHandler);
916 }
917
918 synchronized (responseEvents)
919 {
920 writer.sendAction(action, internalActionId);
921
922 if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
923 {
924 try
925 {
926 responseEvents.wait(timeout);
927 }
928 catch (InterruptedException e)
929 {
930 logger.warn("Interrupted while waiting for response events.");
931 }
932 }
933 }
934
935
936 if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
937 {
938
939 synchronized (this.responseEventListeners)
940 {
941 this.responseEventListeners.remove(internalActionId);
942 }
943
944 throw new EventTimeoutException("Timeout waiting for response or response events to " + action.getAction()
945 + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"), responseEvents);
946 }
947
948
949
950
951 synchronized (this.responseEventListeners)
952 {
953 this.responseEventListeners.remove(internalActionId);
954 }
955
956 return responseEvents;
957 }
958
959 /***
960 * Creates a new unique internal action id based on the hash code of this
961 * connection and a sequence.
962 *
963 * @return a new internal action id
964 * @see ManagerUtil#addInternalActionId(String,String)
965 * @see ManagerUtil#getInternalActionId(String)
966 * @see ManagerUtil#stripInternalActionId(String)
967 */
968 private String createInternalActionId()
969 {
970 final StringBuffer sb;
971
972 sb = new StringBuffer();
973 sb.append(this.hashCode());
974 sb.append("_");
975 sb.append(actionIdCounter.getAndIncrement());
976
977 return sb.toString();
978 }
979
980 public void addEventListener(final ManagerEventListener listener)
981 {
982 synchronized (this.eventListeners)
983 {
984
985 if (!this.eventListeners.contains(listener))
986 {
987 this.eventListeners.add(listener);
988 }
989 }
990 }
991
992 public void removeEventListener(final ManagerEventListener listener)
993 {
994 synchronized (this.eventListeners)
995 {
996 if (this.eventListeners.contains(listener))
997 {
998 this.eventListeners.remove(listener);
999 }
1000 }
1001 }
1002
1003 public String getProtocolIdentifier()
1004 {
1005 return protocolIdentifier.value;
1006 }
1007
1008 public ManagerConnectionState getState()
1009 {
1010 return state;
1011 }
1012
1013
1014
1015 /***
1016 * This method is called by the reader whenever a {@link ManagerResponse} is
1017 * received. The response is dispatched to the associated
1018 * {@link SendActionCallback}.
1019 *
1020 * @param response the response received by the reader
1021 * @see ManagerReader
1022 */
1023 public void dispatchResponse(ManagerResponse response)
1024 {
1025 final String actionId;
1026 String internalActionId;
1027 SendActionCallback listener;
1028
1029
1030 if (response == null)
1031 {
1032 logger.error("Unable to dispatch null response. This should never happen. Please file a bug.");
1033 return;
1034 }
1035
1036 actionId = response.getActionId();
1037 internalActionId = null;
1038 listener = null;
1039
1040 if (actionId != null)
1041 {
1042 internalActionId = ManagerUtil.getInternalActionId(actionId);
1043 response.setActionId(ManagerUtil.stripInternalActionId(actionId));
1044 }
1045
1046 logger.debug("Dispatching response with internalActionId '" + internalActionId + "':\n" + response);
1047
1048 if (internalActionId != null)
1049 {
1050 synchronized (this.responseListeners)
1051 {
1052 listener = responseListeners.get(internalActionId);
1053 if (listener != null)
1054 {
1055 this.responseListeners.remove(internalActionId);
1056 }
1057 else
1058 {
1059
1060
1061 logger.debug("No response listener registered for " + "internalActionId '" + internalActionId + "'");
1062 }
1063 }
1064 }
1065 else
1066 {
1067 logger.error("Unable to retrieve internalActionId from response: " + "actionId '" + actionId + "':\n" + response);
1068 }
1069
1070 if (listener != null)
1071 {
1072 try
1073 {
1074 listener.onResponse(response);
1075 }
1076 catch (Exception e)
1077 {
1078 logger.warn("Unexpected exception in response listener " + listener.getClass().getName(), e);
1079 }
1080 }
1081 }
1082
1083 /***
1084 * This method is called by the reader whenever a ManagerEvent is received.
1085 * The event is dispatched to all registered ManagerEventHandlers.
1086 *
1087 * @param event the event received by the reader
1088 * @see #addEventListener(ManagerEventListener)
1089 * @see #removeEventListener(ManagerEventListener)
1090 * @see ManagerReader
1091 */
1092 public void dispatchEvent(ManagerEvent event)
1093 {
1094
1095 if (event == null)
1096 {
1097 logger.error("Unable to dispatch null event. This should never happen. Please file a bug.");
1098 return;
1099 }
1100
1101 logger.debug("Dispatching event:\n" + event.toString());
1102
1103
1104
1105
1106
1107
1108 if (event instanceof ResponseEvent)
1109 {
1110 ResponseEvent responseEvent;
1111 String internalActionId;
1112
1113 responseEvent = (ResponseEvent) event;
1114 internalActionId = responseEvent.getInternalActionId();
1115 if (internalActionId != null)
1116 {
1117 synchronized (responseEventListeners)
1118 {
1119 ManagerEventListener listener;
1120
1121 listener = responseEventListeners.get(internalActionId);
1122 if (listener != null)
1123 {
1124 try
1125 {
1126 listener.onManagerEvent(event);
1127 }
1128 catch (Exception e)
1129 {
1130 logger.warn("Unexpected exception in response event listener " + listener.getClass().getName(),
1131 e);
1132 }
1133 }
1134 }
1135 }
1136 else
1137 {
1138
1139
1140
1141
1142
1143
1144
1145 }
1146 }
1147 if (event instanceof DisconnectEvent)
1148 {
1149
1150
1151 if (state == CONNECTED)
1152 {
1153 state = RECONNECTING;
1154
1155
1156
1157
1158 cleanup();
1159 reconnectThread = new Thread(new Runnable()
1160 {
1161 public void run()
1162 {
1163 reconnect();
1164 }
1165 });
1166 reconnectThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reconnect-"
1167 + reconnectThreadCounter.getAndIncrement());
1168 reconnectThread.setDaemon(true);
1169 reconnectThread.start();
1170
1171
1172
1173
1174
1175 }
1176 else
1177 {
1178
1179
1180 return;
1181 }
1182 }
1183 if (event instanceof ProtocolIdentifierReceivedEvent)
1184 {
1185 ProtocolIdentifierReceivedEvent protocolIdentifierReceivedEvent;
1186 String protocolIdentifier;
1187
1188 protocolIdentifierReceivedEvent = (ProtocolIdentifierReceivedEvent) event;
1189 protocolIdentifier = protocolIdentifierReceivedEvent.getProtocolIdentifier();
1190 setProtocolIdentifier(protocolIdentifier);
1191
1192 return;
1193 }
1194
1195 fireEvent(event);
1196 }
1197
1198 /***
1199 * Notifies all {@link ManagerEventListener}s registered by users.
1200 *
1201 * @param event the event to propagate
1202 */
1203 private void fireEvent(ManagerEvent event)
1204 {
1205 synchronized (eventListeners)
1206 {
1207 for (ManagerEventListener listener : eventListeners)
1208 {
1209 try
1210 {
1211 listener.onManagerEvent(event);
1212 }
1213 catch (RuntimeException e)
1214 {
1215 logger.warn("Unexpected exception in eventHandler " + listener.getClass().getName(), e);
1216 }
1217 }
1218 }
1219 }
1220
1221 /***
1222 * This method is called when a {@link ProtocolIdentifierReceivedEvent} is
1223 * received from the reader. Having received a correct protocol identifier
1224 * is the precodition for logging in.
1225 *
1226 * @param identifier the protocol version used by the Asterisk server.
1227 */
1228 private void setProtocolIdentifier(final String identifier)
1229 {
1230 logger.info("Connected via " + identifier);
1231
1232 if (!"Asterisk Call Manager/1.0".equals(identifier)
1233 && !"Asterisk Call Manager/1.2".equals(identifier)
1234 && !"OpenPBX Call Manager/1.0".equals(identifier)
1235 && !"CallWeaver Call Manager/1.0".equals(identifier))
1236 {
1237 logger.warn("Unsupported protocol version '" + identifier + "'. Use at your own risk.");
1238 }
1239
1240 synchronized (protocolIdentifier)
1241 {
1242 protocolIdentifier.value = identifier;
1243 protocolIdentifier.notify();
1244 }
1245 }
1246
1247 /***
1248 * Reconnects to the asterisk server when the connection is lost.
1249 * <p/>
1250 * While keepAlive is <code>true</code> we will try to reconnect.
1251 * Reconnection attempts will be stopped when the {@link #logoff()} method
1252 * is called or when the login after a successful reconnect results in an
1253 * {@link AuthenticationFailedException} suggesting that the manager
1254 * credentials have changed and keepAliveAfterAuthenticationFailure is not
1255 * set.
1256 * <p/>
1257 * This method is called when a {@link DisconnectEvent} is received from the
1258 * reader.
1259 */
1260 private void reconnect()
1261 {
1262 int numTries;
1263
1264
1265 numTries = 0;
1266 while (state == RECONNECTING)
1267 {
1268 try
1269 {
1270 if (numTries < 10)
1271 {
1272
1273
1274 Thread.sleep(RECONNECTION_INTERVAL_1);
1275 }
1276 else
1277 {
1278
1279
1280 Thread.sleep(RECONNECTION_INTERVAL_2);
1281 }
1282 }
1283 catch (InterruptedException e1)
1284 {
1285
1286 }
1287
1288 try
1289 {
1290 connect();
1291
1292 try
1293 {
1294 doLogin(defaultResponseTimeout, eventMask);
1295 logger.info("Successfully reconnected.");
1296
1297
1298
1299 break;
1300 }
1301 catch (AuthenticationFailedException e1)
1302 {
1303 if (keepAliveAfterAuthenticationFailure)
1304 {
1305 logger.error("Unable to log in after reconnect: " + e1.getMessage());
1306 }
1307 else
1308 {
1309 logger.error("Unable to log in after reconnect: " + e1.getMessage() + ". Giving up.");
1310 state = DISCONNECTED;
1311 }
1312 }
1313 catch (TimeoutException e1)
1314 {
1315
1316 logger.error("TimeoutException while trying to log in " + "after reconnect.");
1317 }
1318 }
1319 catch (IOException e)
1320 {
1321
1322
1323 logger.warn("Exception while trying to reconnect: " + e.getMessage());
1324 }
1325 numTries++;
1326 }
1327 }
1328
1329 private void cleanup()
1330 {
1331 disconnect();
1332 this.readerThread = null;
1333 }
1334
1335 @Override
1336 public String toString()
1337 {
1338 StringBuffer sb;
1339
1340 sb = new StringBuffer("ManagerConnection[");
1341 sb.append("id='").append(id).append("',");
1342 sb.append("hostname='").append(hostname).append("',");
1343 sb.append("port=").append(port).append(",");
1344 sb.append("systemHashcode=").append(System.identityHashCode(this)).append("]");
1345
1346 return sb.toString();
1347 }
1348
1349
1350
1351 /***
1352 * A simple data object to store a ManagerResult.
1353 */
1354 private class ResponseHandlerResult implements Serializable
1355 {
1356 /***
1357 * Serializable version identifier.
1358 */
1359 private static final long serialVersionUID = 7831097958568769220L;
1360 private ManagerResponse response;
1361
1362 public ResponseHandlerResult()
1363 {
1364 }
1365
1366 public ManagerResponse getResponse()
1367 {
1368 return this.response;
1369 }
1370
1371 public void setResponse(ManagerResponse response)
1372 {
1373 this.response = response;
1374 }
1375 }
1376
1377 /***
1378 * A simple response handler that stores the received response in a
1379 * ResponseHandlerResult for further processing.
1380 */
1381 private class DefaultSendActionCallback implements SendActionCallback, Serializable
1382 {
1383 /***
1384 * Serializable version identifier.
1385 */
1386 private static final long serialVersionUID = 2926598671855316803L;
1387 private final ResponseHandlerResult result;
1388
1389 /***
1390 * Creates a new instance.
1391 *
1392 * @param result the result to store the response in
1393 */
1394 public DefaultSendActionCallback(ResponseHandlerResult result)
1395 {
1396 this.result = result;
1397 }
1398
1399 public void onResponse(ManagerResponse response)
1400 {
1401 synchronized (result)
1402 {
1403 result.setResponse(response);
1404 result.notify();
1405 }
1406 }
1407 }
1408
1409 /***
1410 * A combinded event and response handler that adds received events and the
1411 * response to a ResponseEvents object.
1412 */
1413 @SuppressWarnings("unchecked")
1414 private class ResponseEventHandler implements ManagerEventListener, SendActionCallback, Serializable
1415 {
1416 /***
1417 * Serializable version identifier.
1418 */
1419 private static final long serialVersionUID = 2926598671855316803L;
1420 private final ResponseEventsImpl events;
1421 private final Class actionCompleteEventClass;
1422
1423 /***
1424 * Creates a new instance.
1425 *
1426 * @param events the ResponseEventsImpl to store the events in
1427 * @param actionCompleteEventClass the type of event that indicates that
1428 * all events have been received
1429 */
1430 public ResponseEventHandler(ResponseEventsImpl events, Class actionCompleteEventClass)
1431 {
1432 this.events = events;
1433 this.actionCompleteEventClass = actionCompleteEventClass;
1434 }
1435
1436 public void onManagerEvent(ManagerEvent event)
1437 {
1438 synchronized (events)
1439 {
1440
1441 if (event instanceof ResponseEvent)
1442 {
1443 ResponseEvent responseEvent;
1444
1445 responseEvent = (ResponseEvent) event;
1446 events.addEvent(responseEvent);
1447 }
1448
1449
1450 if (actionCompleteEventClass.isAssignableFrom(event.getClass()))
1451 {
1452 events.setComplete(true);
1453
1454
1455 if (events.getResponse() != null)
1456 {
1457 events.notify();
1458 }
1459 }
1460 }
1461 }
1462
1463 public void onResponse(ManagerResponse response)
1464 {
1465 synchronized (events)
1466 {
1467 events.setRepsonse(response);
1468 if (response instanceof ManagerError)
1469 {
1470 events.setComplete(true);
1471 }
1472
1473
1474
1475
1476 if (events.isComplete())
1477 {
1478 events.notify();
1479 }
1480 }
1481 }
1482 }
1483
1484 private class ProtocolIdentifierWrapper
1485 {
1486 String value;
1487 }
1488 }