1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.asteriskjava.manager.internal;
18
19 import static org.asteriskjava.manager.ManagerConnectionState.CONNECTED;
20 import static org.asteriskjava.manager.ManagerConnectionState.CONNECTING;
21 import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTED;
22 import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTING;
23 import static org.asteriskjava.manager.ManagerConnectionState.INITIAL;
24 import static org.asteriskjava.manager.ManagerConnectionState.RECONNECTING;
25
26 import java.io.IOException;
27 import java.io.Serializable;
28 import java.net.Socket;
29 import java.security.MessageDigest;
30 import java.security.NoSuchAlgorithmException;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.atomic.AtomicLong;
36
37 import org.asteriskjava.AsteriskVersion;
38 import org.asteriskjava.manager.AuthenticationFailedException;
39 import org.asteriskjava.manager.EventTimeoutException;
40 import org.asteriskjava.manager.ManagerConnection;
41 import org.asteriskjava.manager.ManagerConnectionState;
42 import org.asteriskjava.manager.ManagerEventListener;
43 import org.asteriskjava.manager.SendActionCallback;
44 import org.asteriskjava.manager.ResponseEvents;
45 import org.asteriskjava.manager.TimeoutException;
46 import org.asteriskjava.manager.action.ChallengeAction;
47 import org.asteriskjava.manager.action.CommandAction;
48 import org.asteriskjava.manager.action.EventGeneratingAction;
49 import org.asteriskjava.manager.action.LoginAction;
50 import org.asteriskjava.manager.action.LogoffAction;
51 import org.asteriskjava.manager.action.ManagerAction;
52 import org.asteriskjava.manager.event.ConnectEvent;
53 import org.asteriskjava.manager.event.DisconnectEvent;
54 import org.asteriskjava.manager.event.ManagerEvent;
55 import org.asteriskjava.manager.event.ProtocolIdentifierReceivedEvent;
56 import org.asteriskjava.manager.event.ResponseEvent;
57 import org.asteriskjava.manager.response.ChallengeResponse;
58 import org.asteriskjava.manager.response.CommandResponse;
59 import org.asteriskjava.manager.response.ManagerError;
60 import org.asteriskjava.manager.response.ManagerResponse;
61 import org.asteriskjava.util.DateUtil;
62 import org.asteriskjava.util.Log;
63 import org.asteriskjava.util.LogFactory;
64 import org.asteriskjava.util.SocketConnectionFacade;
65 import org.asteriskjava.util.internal.SocketConnectionFacadeImpl;
66 import org.asteriskjava.manager.action.UserEventAction;
67
68 /***
69 * Default implemention of the ManagerConnection interface.
70 * <p>
71 * Generelly avoid direct use of this class. Use the ManagerConnectionFactory to
72 * obtain a ManagerConnection instead.
73 * <p>
74 * When using a dependency injection framework like Spring direct usage for
75 * wiring up beans that require a ManagerConnection property is fine though.
76 * <p>
77 * Note that the DefaultManagerConnection will create one new Thread for reading
78 * data from Asterisk on the first call to on of the login() methods.
79 *
80 * @see org.asteriskjava.manager.ManagerConnectionFactory
81 * @author srt
82 * @version $Id: ManagerConnectionImpl.java 729 2007-05-26 05:16:57Z sprior $
83 */
84 public class ManagerConnectionImpl implements ManagerConnection, Dispatcher
85 {
86 private static final int RECONNECTION_INTERVAL_1 = 50;
87 private static final int RECONNECTION_INTERVAL_2 = 5000;
88 private static final String DEFAULT_HOSTNAME = "localhost";
89 private static final int DEFAULT_PORT = 5038;
90 private static final int RECONNECTION_VERSION_INTERVAL = 500;
91 private static final int MAX_VERSION_ATTEMPTS = 4;
92
93 private static final AtomicLong idCounter = new AtomicLong(0);
94
95 /***
96 * Instance logger.
97 */
98 private final Log logger = LogFactory.getLog(getClass());
99
100 private final long id;
101
102 /***
103 * Used to construct the internalActionId.
104 */
105 private AtomicLong actionIdCounter = new AtomicLong(0);
106
107
108 /***
109 * Hostname of the Asterisk server to connect to.
110 */
111 private String hostname = DEFAULT_HOSTNAME;
112
113 /***
114 * TCP port to connect to.
115 */
116 private int port = DEFAULT_PORT;
117
118 /***
119 * <code>true</code> to use SSL for the connection, <code>false</code>
120 * for a plain text connection.
121 */
122 private boolean ssl = false;
123
124 /***
125 * The username to use for login as defined in Asterisk's
126 * <code>manager.conf</code>.
127 */
128 protected String username;
129
130 /***
131 * The password to use for login as defined in Asterisk's
132 * <code>manager.conf</code>.
133 */
134 protected String password;
135
136 /***
137 * The default timeout to wait for a ManagerResponse after sending a
138 * ManagerAction.
139 */
140 private long defaultResponseTimeout = 2000;
141
142 /***
143 * The default timeout to wait for the last ResponseEvent after sending an
144 * EventGeneratingAction.
145 */
146 private long defaultEventTimeout = 5000;
147
148 /***
149 * The timeout to use when connecting the the Asterisk server.
150 */
151 private int socketTimeout = 0;
152
153 /***
154 * @see Socket#setSoTimeout(int)
155 */
156 private int socketReadTimeout = 0;
157
158 /***
159 * Should we continue to reconnect after an authentication failure?
160 */
161 private boolean keepAliveAfterAuthenticationFailure = true;
162
163 /***
164 * The socket to use for TCP/IP communication with Asterisk.
165 */
166 private SocketConnectionFacade socket;
167
168 /***
169 * The thread that runs the reader.
170 */
171 private Thread readerThread;
172 private final AtomicLong readerThreadCounter = new AtomicLong(0);
173
174 /***
175 * The thread that performs reconnect.
176 */
177 private Thread reconnectThread;
178 private final AtomicLong reconnectThreadCounter = new AtomicLong(0);
179
180 /***
181 * The reader to use to receive events and responses from asterisk.
182 */
183 private ManagerReader reader;
184
185 /***
186 * The writer to use to send actions to asterisk.
187 */
188 private ManagerWriter writer;
189
190 /***
191 * The protocol identifer Asterisk sends on connect wrapped into an object
192 * to be used as mutex.
193 */
194 private final ProtocolIdentifierWrapper protocolIdentifier;
195
196 /***
197 * The version of the Asterisk server we are connected to.
198 */
199 private AsteriskVersion version;
200
201 /***
202 * Contains the registered handlers that process the ManagerResponses.
203 * <p>
204 * Key is the internalActionId of the Action sent and value the
205 * corresponding ResponseListener.
206 */
207 private final Map<String, SendActionCallback> responseListeners;
208
209 /***
210 * Contains the event handlers that handle ResponseEvents for the
211 * sendEventGeneratingAction methods.
212 * <p>
213 * Key is the internalActionId of the Action sent and value the
214 * corresponding EventHandler.
215 */
216 private final Map<String, ManagerEventListener> responseEventListeners;
217
218 /***
219 * Contains the event handlers that users registered.
220 */
221 private final List<ManagerEventListener> eventListeners;
222
223 /***
224 * <code>true</code> while reconnecting.
225 */
226 private boolean reconnecting = false;
227
228 protected ManagerConnectionState state = INITIAL;
229
230 private String eventMask;
231
232 /***
233 * Creates a new instance.
234 */
235 public ManagerConnectionImpl()
236 {
237 this.id = idCounter.getAndIncrement();
238 this.responseListeners = new HashMap<String, SendActionCallback>();
239 this.responseEventListeners = new HashMap<String, ManagerEventListener>();
240 this.eventListeners = new ArrayList<ManagerEventListener>();
241 this.protocolIdentifier = new ProtocolIdentifierWrapper();
242 }
243
244
245
246 protected ManagerReader createReader(Dispatcher dispatcher, Object source)
247 {
248 return new ManagerReaderImpl(dispatcher, source);
249 }
250
251 protected ManagerWriter createWriter()
252 {
253 return new ManagerWriterImpl();
254 }
255
256 /***
257 * Sets the hostname of the asterisk server to connect to.
258 * <p>
259 * Default is <code>localhost</code>.
260 *
261 * @param hostname the hostname to connect to
262 */
263 public void setHostname(String hostname)
264 {
265 this.hostname = hostname;
266 }
267
268 /***
269 * Sets the port to use to connect to the asterisk server. This is the port
270 * specified in asterisk's <code>manager.conf</code> file.
271 * <p>
272 * Default is 5038.
273 *
274 * @param port the port to connect to
275 */
276 public void setPort(int port)
277 {
278 if (port <= 0)
279 {
280 this.port = DEFAULT_PORT;
281 }
282 else
283 {
284 this.port = port;
285 }
286 }
287
288 /***
289 * Sets whether to use SSL.
290 * <p>
291 * Default is false.
292 *
293 * @param ssl <code>true</code> to use SSL for the connection,
294 * <code>false</code> for a plain text connection.
295 * @since 0.3
296 */
297 public void setSsl(boolean ssl)
298 {
299 this.ssl = ssl;
300 }
301
302 /***
303 * Sets the username to use to connect to the asterisk server. This is the
304 * username specified in asterisk's <code>manager.conf</code> file.
305 *
306 * @param username the username to use for login
307 */
308 public void setUsername(String username)
309 {
310 this.username = username;
311 }
312
313 /***
314 * Sets the password to use to connect to the asterisk server. This is the
315 * password specified in Asterisk's <code>manager.conf</code> file.
316 *
317 * @param password the password to use for login
318 */
319 public void setPassword(String password)
320 {
321 this.password = password;
322 }
323
324 /***
325 * Sets the time in milliseconds the synchronous method
326 * {@link #sendAction(ManagerAction)} will wait for a response before
327 * throwing a TimeoutException.
328 * <p>
329 * Default is 2000.
330 *
331 * @param defaultResponseTimeout default response timeout in milliseconds
332 * @since 0.2
333 */
334 public void setDefaultResponseTimeout(long defaultResponseTimeout)
335 {
336 this.defaultResponseTimeout = defaultResponseTimeout;
337 }
338
339 /***
340 * Sets the time in milliseconds the synchronous method
341 * {@link #sendEventGeneratingAction(EventGeneratingAction)} will wait for a
342 * response and the last response event before throwing a TimeoutException.
343 * <p>
344 * Default is 5000.
345 *
346 * @param defaultEventTimeout default event timeout in milliseconds
347 * @since 0.2
348 */
349 public void setDefaultEventTimeout(long defaultEventTimeout)
350 {
351 this.defaultEventTimeout = defaultEventTimeout;
352 }
353
354 /***
355 * Set to <code>true</code> to try reconnecting to ther asterisk serve
356 * even if the reconnection attempt threw an AuthenticationFailedException.
357 * <p>
358 * Default is <code>true</code>.
359 *
360 * @param keepAliveAfterAuthenticationFailure <code>true</code> to try reconnecting to ther asterisk serve
361 * even if the reconnection attempt threw an AuthenticationFailedException,
362 * <code>false</code> otherwise.
363 */
364 public void setKeepAliveAfterAuthenticationFailure(boolean keepAliveAfterAuthenticationFailure)
365 {
366 this.keepAliveAfterAuthenticationFailure = keepAliveAfterAuthenticationFailure;
367 }
368
369
370
371 public String getUsername()
372 {
373 return username;
374 }
375
376 public String getPassword()
377 {
378 return password;
379 }
380
381 public String getHostname()
382 {
383 return hostname;
384 }
385
386 public int getPort()
387 {
388 return port;
389 }
390
391 public boolean isSsl()
392 {
393 return ssl;
394 }
395
396 public void registerUserEventClass(Class userEventClass)
397 {
398 if (reader == null)
399 {
400 reader = createReader(this, this);
401 }
402
403 reader.registerEventClass(userEventClass);
404 }
405
406 public void setSocketTimeout(int socketTimeout)
407 {
408 this.socketTimeout = socketTimeout;
409 }
410
411 public void setSocketReadTimeout(int socketReadTimeout)
412 {
413 this.socketReadTimeout = socketReadTimeout;
414 }
415
416 public synchronized void login() throws IOException, AuthenticationFailedException, TimeoutException
417 {
418 login(null);
419 }
420
421 public synchronized void login(String eventMask) throws IOException, AuthenticationFailedException, TimeoutException
422 {
423 if (state != INITIAL && state != DISCONNECTED)
424 {
425 throw new IllegalStateException("Login may only be perfomed when in state "
426 + "INITIAL or DISCONNECTED, but connection is in state " + state);
427 }
428
429 state = CONNECTING;
430 this.eventMask = eventMask;
431 try
432 {
433 doLogin(defaultResponseTimeout, eventMask);
434 }
435 finally
436 {
437 if (state != CONNECTED)
438 {
439 state = DISCONNECTED;
440 }
441 }
442 }
443
444 /***
445 * Does the real login, following the steps outlined below.
446 * <p>
447 * <ol>
448 * <li>Connects to the asterisk server by calling {@link #connect()} if not
449 * already connected
450 * <li>Waits until the protocol identifier is received but not longer than
451 * timeout ms.
452 * <li>Sends a {@link ChallengeAction} requesting a challenge for authType
453 * MD5.
454 * <li>When the {@link ChallengeResponse} is received a {@link LoginAction}
455 * is sent using the calculated key (MD5 hash of the password appended to
456 * the received challenge).
457 * </ol>
458 *
459 * @param timeout the maximum time to wait for the protocol identifier (in
460 * ms)
461 * @param eventMask the event mask. Set to "on" if all events should be
462 * send, "off" if not events should be sent or a combination of
463 * "system", "call" and "log" (separated by ',') to specify what
464 * kind of events should be sent.
465 * @throws IOException if there is an i/o problem.
466 * @throws AuthenticationFailedException if username or password are
467 * incorrect and the login action returns an error or if the MD5
468 * hash cannot be computed. The connection is closed in this
469 * case.
470 * @throws TimeoutException if a timeout occurs while waiting for the
471 * protocol identifier. The connection is closed in this case.
472 */
473 protected synchronized void doLogin(long timeout, String eventMask) throws IOException, AuthenticationFailedException,
474 TimeoutException
475 {
476 ChallengeAction challengeAction;
477 ManagerResponse challengeResponse;
478 String challenge;
479 String key;
480 LoginAction loginAction;
481 ManagerResponse loginResponse;
482
483 if (socket == null)
484 {
485 connect();
486 }
487
488 synchronized (protocolIdentifier)
489 {
490 if (protocolIdentifier.value == null)
491 {
492 try
493 {
494 protocolIdentifier.wait(timeout);
495 }
496 catch (InterruptedException e)
497 {
498
499 }
500 }
501
502 if (protocolIdentifier.value == null)
503 {
504 disconnect();
505 if (reader != null && reader.getTerminationException() != null)
506 {
507 throw reader.getTerminationException();
508 }
509 else
510 {
511 throw new TimeoutException("Timeout waiting for protocol identifier");
512 }
513 }
514 }
515
516 challengeAction = new ChallengeAction("MD5");
517 try
518 {
519 challengeResponse = sendAction(challengeAction);
520 }
521 catch (Exception e)
522 {
523 disconnect();
524 throw new AuthenticationFailedException("Unable to send challenge action", e);
525 }
526
527 if (challengeResponse instanceof ChallengeResponse)
528 {
529 challenge = ((ChallengeResponse) challengeResponse).getChallenge();
530 }
531 else
532 {
533 disconnect();
534 throw new AuthenticationFailedException("Unable to get challenge from Asterisk. ChallengeAction returned: "
535 + challengeResponse.getMessage());
536 }
537
538 try
539 {
540 MessageDigest md;
541
542 md = MessageDigest.getInstance("MD5");
543 if (challenge != null)
544 {
545 md.update(challenge.getBytes());
546 }
547 if (password != null)
548 {
549 md.update(password.getBytes());
550 }
551 key = ManagerUtil.toHexString(md.digest());
552 }
553 catch (NoSuchAlgorithmException ex)
554 {
555 disconnect();
556 throw new AuthenticationFailedException("Unable to create login key using MD5 Message Digest", ex);
557 }
558
559 loginAction = new LoginAction(username, "MD5", key, eventMask);
560 try
561 {
562 loginResponse = sendAction(loginAction);
563 }
564 catch (Exception e)
565 {
566 disconnect();
567 throw new AuthenticationFailedException("Unable to send login action", e);
568 }
569
570 if (loginResponse instanceof ManagerError)
571 {
572 disconnect();
573 throw new AuthenticationFailedException(loginResponse.getMessage());
574 }
575
576 state = CONNECTED;
577
578 logger.info("Successfully logged in");
579
580 version = determineVersion();
581 writer.setTargetVersion(version);
582
583 logger.info("Determined Asterisk version: " + version);
584
585
586 ConnectEvent connectEvent = new ConnectEvent(this);
587 connectEvent.setProtocolIdentifier(getProtocolIdentifier());
588 connectEvent.setDateReceived(DateUtil.getDate());
589
590 fireEvent(connectEvent);
591 }
592
593 protected AsteriskVersion determineVersion() throws IOException, TimeoutException
594 {
595 int attempts = 0;
596 while(attempts < MAX_VERSION_ATTEMPTS)
597 {
598 ManagerResponse showVersionFilesResponse;
599
600
601 showVersionFilesResponse = sendAction(new CommandAction("show version files pbx.c"), defaultResponseTimeout * 2);
602 if (showVersionFilesResponse instanceof CommandResponse)
603 {
604 List<String> showVersionFilesResult;
605
606 showVersionFilesResult = ((CommandResponse) showVersionFilesResponse).getResult();
607 if (showVersionFilesResult != null && showVersionFilesResult.size() > 0)
608 {
609 String line1;
610
611 line1 = showVersionFilesResult.get(0);
612 if (line1 != null && line1.startsWith("File"))
613 {
614 final String rawVersion;
615
616 rawVersion = getRawVersion();
617 if (rawVersion != null && rawVersion.startsWith("Asterisk 1.4"))
618 {
619 return AsteriskVersion.ASTERISK_1_4;
620 }
621
622 return AsteriskVersion.ASTERISK_1_2;
623 }
624 else if (line1 != null && line1.contains("No such command"))
625 {
626 try
627 {
628 attempts++;
629 Thread.sleep(RECONNECTION_VERSION_INTERVAL);
630 }
631 catch (Exception ex)
632 {
633
634 }
635 }
636 else
637 {
638
639 break;
640 }
641 }
642 }
643 }
644
645 return AsteriskVersion.ASTERISK_1_0;
646 }
647
648 protected String getRawVersion()
649 {
650 final ManagerResponse showVersionResponse;
651
652 try
653 {
654 showVersionResponse = sendAction(new CommandAction("show version"), defaultResponseTimeout * 2);
655 }
656 catch (Exception e)
657 {
658 return null;
659 }
660
661 if (showVersionResponse instanceof CommandResponse)
662 {
663 final List<String> showVersionResult;
664
665 showVersionResult = ((CommandResponse) showVersionResponse).getResult();
666 if (showVersionResult != null && showVersionResult.size() > 0)
667 {
668 return showVersionResult.get(0);
669 }
670 }
671
672 return null;
673 }
674
675 protected synchronized void connect() throws IOException
676 {
677 logger.info("Connecting to " + hostname + ":" + port);
678
679 if (reader == null)
680 {
681 logger.debug("Creating reader for " + hostname + ":" + port);
682 reader = createReader(this, this);
683 }
684
685 if (writer == null)
686 {
687 logger.debug("Creating writer");
688 writer = createWriter();
689 }
690
691 logger.debug("Creating socket");
692 socket = createSocket();
693
694 logger.debug("Passing socket to reader");
695 reader.setSocket(socket);
696
697 if (readerThread == null || !readerThread.isAlive() || reader.isDead())
698 {
699 logger.debug("Creating and starting reader thread");
700 readerThread = new Thread(reader);
701 readerThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reader-"
702 + readerThreadCounter.getAndIncrement());
703 readerThread.setDaemon(true);
704 readerThread.start();
705 }
706
707 logger.debug("Passing socket to writer");
708 writer.setSocket(socket);
709 }
710
711 protected SocketConnectionFacade createSocket() throws IOException
712 {
713 return new SocketConnectionFacadeImpl(hostname, port, ssl, socketTimeout, socketReadTimeout);
714 }
715
716 public synchronized void logoff() throws IllegalStateException
717 {
718 if (state != CONNECTED && state != RECONNECTING)
719 {
720 throw new IllegalStateException("Logoff may only be perfomed when in state "
721 + "CONNECTED or RECONNECTING, but connection is in state " + state);
722 }
723
724 state = DISCONNECTING;
725
726 if (socket != null && !reconnecting)
727 {
728 try
729 {
730 sendAction(new LogoffAction());
731 }
732 catch (Exception e)
733 {
734 logger.warn("Unable to send LogOff action", e);
735 }
736 }
737 cleanup();
738 state = DISCONNECTED;
739 }
740
741 /***
742 * Closes the socket connection.
743 */
744 protected synchronized void disconnect()
745 {
746 if (socket != null)
747 {
748 logger.info("Closing socket.");
749 try
750 {
751 socket.close();
752 }
753 catch (IOException ex)
754 {
755 logger.warn("Unable to close socket: " + ex.getMessage());
756 }
757 socket = null;
758 }
759 protocolIdentifier.value = null;
760 }
761
762 public ManagerResponse sendAction(ManagerAction action) throws IOException, TimeoutException, IllegalArgumentException,
763 IllegalStateException
764 {
765 return sendAction(action, defaultResponseTimeout);
766 }
767
768
769
770
771 public ManagerResponse sendAction(ManagerAction action, long timeout) throws IOException, TimeoutException,
772 IllegalArgumentException, IllegalStateException
773 {
774 ResponseHandlerResult result;
775 SendActionCallback callbackHandler;
776
777 result = new ResponseHandlerResult();
778 callbackHandler = new DefaultSendActionCallback(result);
779
780 synchronized (result)
781 {
782 sendAction(action, callbackHandler);
783
784
785 if(action instanceof UserEventAction)
786 return null;
787
788
789
790 if (result.getResponse() == null)
791 {
792 try
793 {
794 result.wait(timeout);
795 }
796 catch (InterruptedException ex)
797 {
798 logger.warn("Interrupted while waiting for result");
799 }
800 }
801 }
802
803
804 if (result.getResponse() == null)
805 {
806 throw new TimeoutException("Timeout waiting for response to " + action.getAction()
807 + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"));
808 }
809
810 return result.getResponse();
811 }
812
813 public void sendAction(ManagerAction action, SendActionCallback callback) throws IOException, IllegalArgumentException,
814 IllegalStateException
815 {
816 final String internalActionId;
817
818 if (action == null)
819 {
820 throw new IllegalArgumentException("Unable to send action: action is null.");
821 }
822
823
824
825 if ((state == CONNECTING || state == RECONNECTING)
826 && (action instanceof ChallengeAction || action instanceof LoginAction))
827 {
828
829 }
830 else if (state == DISCONNECTING && action instanceof LogoffAction)
831 {
832
833 }
834 else if (state != CONNECTED)
835 {
836 throw new IllegalStateException("Actions may only be sent when in state "
837 + "CONNECTED, but connection is in state " + state);
838 }
839
840 if (socket == null)
841 {
842 throw new IllegalStateException("Unable to send " + action.getAction() + " action: socket not connected.");
843 }
844
845 internalActionId = createInternalActionId();
846
847
848
849 if (callback != null)
850 {
851 synchronized (this.responseListeners)
852 {
853 this.responseListeners.put(internalActionId, callback);
854 }
855 }
856
857 writer.sendAction(action, internalActionId);
858 }
859
860 public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action) throws IOException, EventTimeoutException,
861 IllegalArgumentException, IllegalStateException
862 {
863 return sendEventGeneratingAction(action, defaultEventTimeout);
864 }
865
866
867
868
869 public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action, long timeout) throws IOException,
870 EventTimeoutException, IllegalArgumentException, IllegalStateException
871 {
872 final ResponseEventsImpl responseEvents;
873 final ResponseEventHandler responseEventHandler;
874 final String internalActionId;
875
876 if (action == null)
877 {
878 throw new IllegalArgumentException("Unable to send action: action is null.");
879 }
880 else if (action.getActionCompleteEventClass() == null)
881 {
882 throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass for "
883 + action.getClass().getName() + " is null.");
884 }
885 else if (!ResponseEvent.class.isAssignableFrom(action.getActionCompleteEventClass()))
886 {
887 throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass ("
888 + action.getActionCompleteEventClass().getName() + ") for " + action.getClass().getName()
889 + " is not a ResponseEvent.");
890 }
891
892 if (state != CONNECTED)
893 {
894 throw new IllegalStateException("Actions may only be sent when in state "
895 + "CONNECTED, but connection is in state " + state);
896 }
897
898 responseEvents = new ResponseEventsImpl();
899 responseEventHandler = new ResponseEventHandler(responseEvents, action.getActionCompleteEventClass());
900
901 internalActionId = createInternalActionId();
902
903
904 synchronized (this.responseListeners)
905 {
906 this.responseListeners.put(internalActionId, responseEventHandler);
907 }
908
909
910 synchronized (this.responseEventListeners)
911 {
912 this.responseEventListeners.put(internalActionId, responseEventHandler);
913 }
914
915 synchronized (responseEvents)
916 {
917 writer.sendAction(action, internalActionId);
918
919 if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
920 {
921 try
922 {
923 responseEvents.wait(timeout);
924 }
925 catch (InterruptedException e)
926 {
927 logger.warn("Interrupted while waiting for response events.");
928 }
929 }
930 }
931
932
933 if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
934 {
935
936 synchronized (this.responseEventListeners)
937 {
938 this.responseEventListeners.remove(internalActionId);
939 }
940
941 throw new EventTimeoutException("Timeout waiting for response or response events to " + action.getAction()
942 + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"), responseEvents);
943 }
944
945
946
947
948 synchronized (this.responseEventListeners)
949 {
950 this.responseEventListeners.remove(internalActionId);
951 }
952
953 return responseEvents;
954 }
955
956 /***
957 * Creates a new unique internal action id based on the hash code of this
958 * connection and a sequence.
959 *
960 * @return a new internal action id
961 * @see ManagerUtil#addInternalActionId(String, String)
962 * @see ManagerUtil#getInternalActionId(String)
963 * @see ManagerUtil#stripInternalActionId(String)
964 */
965 private String createInternalActionId()
966 {
967 final StringBuffer sb;
968
969 sb = new StringBuffer();
970 sb.append(this.hashCode());
971 sb.append("_");
972 sb.append(actionIdCounter.getAndIncrement());
973
974 return sb.toString();
975 }
976
977 public void addEventListener(final ManagerEventListener listener)
978 {
979 synchronized (this.eventListeners)
980 {
981
982 if (!this.eventListeners.contains(listener))
983 {
984 this.eventListeners.add(listener);
985 }
986 }
987 }
988
989 public void removeEventListener(final ManagerEventListener listener)
990 {
991 synchronized (this.eventListeners)
992 {
993 if (this.eventListeners.contains(listener))
994 {
995 this.eventListeners.remove(listener);
996 }
997 }
998 }
999
1000 public String getProtocolIdentifier()
1001 {
1002 return protocolIdentifier.value;
1003 }
1004
1005 public ManagerConnectionState getState()
1006 {
1007 return state;
1008 }
1009
1010
1011
1012 /***
1013 * This method is called by the reader whenever a {@link ManagerResponse} is
1014 * received. The response is dispatched to the associated
1015 * {@link SendActionCallback}.
1016 *
1017 * @param response the response received by the reader
1018 * @see ManagerReader
1019 */
1020 public void dispatchResponse(ManagerResponse response)
1021 {
1022 final String actionId;
1023 String internalActionId;
1024 SendActionCallback listener;
1025
1026
1027 if (response == null)
1028 {
1029 logger.error("Unable to dispatch null response. This should never happen. Please file a bug.");
1030 return;
1031 }
1032
1033 actionId = response.getActionId();
1034 internalActionId = null;
1035 listener = null;
1036
1037 if (actionId != null)
1038 {
1039 internalActionId = ManagerUtil.getInternalActionId(actionId);
1040 response.setActionId(ManagerUtil.stripInternalActionId(actionId));
1041 }
1042
1043 logger.debug("Dispatching response with internalActionId '" + internalActionId + "':\n" + response);
1044
1045 if (internalActionId != null)
1046 {
1047 synchronized (this.responseListeners)
1048 {
1049 listener = responseListeners.get(internalActionId);
1050 if (listener != null)
1051 {
1052 this.responseListeners.remove(internalActionId);
1053 }
1054 else
1055 {
1056
1057
1058 logger.debug("No response listener registered for " + "internalActionId '" + internalActionId + "'");
1059 }
1060 }
1061 }
1062 else
1063 {
1064 logger.error("Unable to retrieve internalActionId from response: " + "actionId '" + actionId + "':\n" + response);
1065 }
1066
1067 if (listener != null)
1068 {
1069 try
1070 {
1071 listener.onResponse(response);
1072 }
1073 catch (Exception e)
1074 {
1075 logger.warn("Unexpected exception in response listener " + listener.getClass().getName(), e);
1076 }
1077 }
1078 }
1079
1080 /***
1081 * This method is called by the reader whenever a ManagerEvent is received.
1082 * The event is dispatched to all registered ManagerEventHandlers.
1083 *
1084 * @param event the event received by the reader
1085 * @see #addEventListener(ManagerEventListener)
1086 * @see #removeEventListener(ManagerEventListener)
1087 * @see ManagerReader
1088 */
1089 public void dispatchEvent(ManagerEvent event)
1090 {
1091
1092 if (event == null)
1093 {
1094 logger.error("Unable to dispatch null event. This should never happen. Please file a bug.");
1095 return;
1096 }
1097
1098 logger.debug("Dispatching event:\n" + event.toString());
1099
1100
1101
1102
1103
1104
1105 if (event instanceof ResponseEvent)
1106 {
1107 ResponseEvent responseEvent;
1108 String internalActionId;
1109
1110 responseEvent = (ResponseEvent) event;
1111 internalActionId = responseEvent.getInternalActionId();
1112 if (internalActionId != null)
1113 {
1114 synchronized (responseEventListeners)
1115 {
1116 ManagerEventListener listener;
1117
1118 listener = responseEventListeners.get(internalActionId);
1119 if (listener != null)
1120 {
1121 try
1122 {
1123 listener.onManagerEvent(event);
1124 }
1125 catch (Exception e)
1126 {
1127 logger.warn("Unexpected exception in response event listener " + listener.getClass().getName(),
1128 e);
1129 }
1130 }
1131 }
1132 }
1133 else
1134 {
1135
1136
1137
1138
1139
1140
1141
1142 }
1143 }
1144 if (event instanceof DisconnectEvent)
1145 {
1146
1147
1148 if (state == CONNECTED)
1149 {
1150 state = RECONNECTING;
1151
1152
1153
1154
1155 cleanup();
1156 reconnectThread = new Thread(new Runnable()
1157 {
1158 public void run()
1159 {
1160 reconnect();
1161 }
1162 });
1163 reconnectThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reconnect-"
1164 + reconnectThreadCounter.getAndIncrement());
1165 reconnectThread.setDaemon(true);
1166 reconnectThread.start();
1167
1168
1169
1170
1171
1172 }
1173 else
1174 {
1175
1176
1177 return;
1178 }
1179 }
1180 if (event instanceof ProtocolIdentifierReceivedEvent)
1181 {
1182 ProtocolIdentifierReceivedEvent protocolIdentifierReceivedEvent;
1183 String protocolIdentifier;
1184
1185 protocolIdentifierReceivedEvent = (ProtocolIdentifierReceivedEvent) event;
1186 protocolIdentifier = protocolIdentifierReceivedEvent.getProtocolIdentifier();
1187 setProtocolIdentifier(protocolIdentifier);
1188
1189 return;
1190 }
1191
1192 fireEvent(event);
1193 }
1194
1195 /***
1196 * Notifies all {@link ManagerEventListener}s registered by users.
1197 *
1198 * @param event the event to propagate
1199 */
1200 private void fireEvent(ManagerEvent event)
1201 {
1202 synchronized (eventListeners)
1203 {
1204 for (ManagerEventListener listener : eventListeners)
1205 {
1206 try
1207 {
1208 listener.onManagerEvent(event);
1209 }
1210 catch (RuntimeException e)
1211 {
1212 logger.warn("Unexpected exception in eventHandler " + listener.getClass().getName(), e);
1213 }
1214 }
1215 }
1216 }
1217
1218 /***
1219 * This method is called when a {@link ProtocolIdentifierReceivedEvent} is
1220 * received from the reader. Having received a correct protocol identifier
1221 * is the precodition for logging in.
1222 *
1223 * @param identifier the protocol version used by the Asterisk server.
1224 */
1225 private void setProtocolIdentifier(final String identifier)
1226 {
1227 logger.info("Connected via " + identifier);
1228
1229 if (!"Asterisk Call Manager/1.0".equals(identifier)
1230 && !"Asterisk Call Manager/1.2".equals(identifier)
1231 && !"OpenPBX Call Manager/1.0".equals(identifier))
1232 {
1233 logger.warn("Unsupported protocol version '" + identifier + "'. Use at your own risk.");
1234 }
1235
1236 synchronized (protocolIdentifier)
1237 {
1238 protocolIdentifier.value = identifier;
1239 protocolIdentifier.notify();
1240 }
1241 }
1242
1243 /***
1244 * Reconnects to the asterisk server when the connection is lost.
1245 * <p>
1246 * While keepAlive is <code>true</code> we will try to reconnect.
1247 * Reconnection attempts will be stopped when the {@link #logoff()} method
1248 * is called or when the login after a successful reconnect results in an
1249 * {@link AuthenticationFailedException} suggesting that the manager
1250 * credentials have changed and keepAliveAfterAuthenticationFailure is not
1251 * set.
1252 * <p>
1253 * This method is called when a {@link DisconnectEvent} is received from the
1254 * reader.
1255 */
1256 private void reconnect()
1257 {
1258 int numTries;
1259
1260
1261 numTries = 0;
1262 while (state == RECONNECTING)
1263 {
1264 try
1265 {
1266 if (numTries < 10)
1267 {
1268
1269
1270 Thread.sleep(RECONNECTION_INTERVAL_1);
1271 }
1272 else
1273 {
1274
1275
1276 Thread.sleep(RECONNECTION_INTERVAL_2);
1277 }
1278 }
1279 catch (InterruptedException e1)
1280 {
1281
1282 }
1283
1284 try
1285 {
1286 connect();
1287
1288 try
1289 {
1290 doLogin(defaultResponseTimeout, eventMask);
1291 logger.info("Successfully reconnected.");
1292
1293
1294
1295 break;
1296 }
1297 catch (AuthenticationFailedException e1)
1298 {
1299 if (keepAliveAfterAuthenticationFailure)
1300 {
1301 logger.error("Unable to log in after reconnect: " + e1.getMessage());
1302 }
1303 else
1304 {
1305 logger.error("Unable to log in after reconnect: " + e1.getMessage() + ". Giving up.");
1306 state = DISCONNECTED;
1307 }
1308 }
1309 catch (TimeoutException e1)
1310 {
1311
1312 logger.error("TimeoutException while trying to log in " + "after reconnect.");
1313 }
1314 }
1315 catch (IOException e)
1316 {
1317
1318
1319 logger.warn("Exception while trying to reconnect: " + e.getMessage());
1320 }
1321 numTries++;
1322 }
1323 }
1324
1325 private void cleanup()
1326 {
1327 disconnect();
1328 this.readerThread = null;
1329 }
1330
1331 @Override
1332 public String toString()
1333 {
1334 StringBuffer sb;
1335
1336 sb = new StringBuffer("ManagerConnection[");
1337 sb.append("id='").append(id).append("',");
1338 sb.append("hostname='").append(hostname).append("',");
1339 sb.append("port=").append(port).append(",");
1340 sb.append("systemHashcode=").append(System.identityHashCode(this)).append("]");
1341
1342 return sb.toString();
1343 }
1344
1345
1346
1347 /***
1348 * A simple data object to store a ManagerResult.
1349 */
1350 private class ResponseHandlerResult implements Serializable
1351 {
1352 /***
1353 * Serializable version identifier.
1354 */
1355 private static final long serialVersionUID = 7831097958568769220L;
1356 private ManagerResponse response;
1357
1358 public ResponseHandlerResult()
1359 {
1360 }
1361
1362 public ManagerResponse getResponse()
1363 {
1364 return this.response;
1365 }
1366
1367 public void setResponse(ManagerResponse response)
1368 {
1369 this.response = response;
1370 }
1371 }
1372
1373 /***
1374 * A simple response handler that stores the received response in a
1375 * ResponseHandlerResult for further processing.
1376 */
1377 private class DefaultSendActionCallback implements SendActionCallback, Serializable
1378 {
1379 /***
1380 * Serializable version identifier.
1381 */
1382 private static final long serialVersionUID = 2926598671855316803L;
1383 private final ResponseHandlerResult result;
1384
1385 /***
1386 * Creates a new instance.
1387 *
1388 * @param result the result to store the response in
1389 */
1390 public DefaultSendActionCallback(ResponseHandlerResult result)
1391 {
1392 this.result = result;
1393 }
1394
1395 public void onResponse(ManagerResponse response)
1396 {
1397 synchronized (result)
1398 {
1399 result.setResponse(response);
1400 result.notify();
1401 }
1402 }
1403 }
1404
1405 /***
1406 * A combinded event and response handler that adds received events and the
1407 * response to a ResponseEvents object.
1408 */
1409 @SuppressWarnings("unchecked")
1410 private class ResponseEventHandler implements ManagerEventListener, SendActionCallback, Serializable
1411 {
1412 /***
1413 * Serializable version identifier.
1414 */
1415 private static final long serialVersionUID = 2926598671855316803L;
1416 private final ResponseEventsImpl events;
1417 private final Class actionCompleteEventClass;
1418
1419 /***
1420 * Creates a new instance.
1421 *
1422 * @param events the ResponseEventsImpl to store the events in
1423 * @param actionCompleteEventClass the type of event that indicates that
1424 * all events have been received
1425 */
1426 public ResponseEventHandler(ResponseEventsImpl events, Class actionCompleteEventClass)
1427 {
1428 this.events = events;
1429 this.actionCompleteEventClass = actionCompleteEventClass;
1430 }
1431
1432 public void onManagerEvent(ManagerEvent event)
1433 {
1434 synchronized (events)
1435 {
1436
1437 if (event instanceof ResponseEvent)
1438 {
1439 ResponseEvent responseEvent;
1440
1441 responseEvent = (ResponseEvent) event;
1442 events.addEvent(responseEvent);
1443 }
1444
1445
1446 if (actionCompleteEventClass.isAssignableFrom(event.getClass()))
1447 {
1448 events.setComplete(true);
1449
1450
1451 if (events.getResponse() != null)
1452 {
1453 events.notify();
1454 }
1455 }
1456 }
1457 }
1458
1459 public void onResponse(ManagerResponse response)
1460 {
1461 synchronized (events)
1462 {
1463 events.setRepsonse(response);
1464 if (response instanceof ManagerError)
1465 {
1466 events.setComplete(true);
1467 }
1468
1469
1470
1471
1472 if (events.isComplete())
1473 {
1474 events.notify();
1475 }
1476 }
1477 }
1478 }
1479
1480 private class ProtocolIdentifierWrapper
1481 {
1482 String value;
1483 }
1484 }