1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.asteriskjava.manager.internal;
18
19 import static org.asteriskjava.manager.ManagerConnectionState.CONNECTED;
20 import static org.asteriskjava.manager.ManagerConnectionState.CONNECTING;
21 import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTED;
22 import static org.asteriskjava.manager.ManagerConnectionState.DISCONNECTING;
23 import static org.asteriskjava.manager.ManagerConnectionState.INITIAL;
24 import static org.asteriskjava.manager.ManagerConnectionState.RECONNECTING;
25
26 import java.io.IOException;
27 import java.io.Serializable;
28 import java.net.Socket;
29 import java.net.InetAddress;
30 import java.security.MessageDigest;
31 import java.security.NoSuchAlgorithmException;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.regex.Matcher;
38 import java.util.regex.Pattern;
39
40 import org.asteriskjava.AsteriskVersion;
41 import org.asteriskjava.manager.*;
42 import org.asteriskjava.manager.action.ChallengeAction;
43 import org.asteriskjava.manager.action.CommandAction;
44 import org.asteriskjava.manager.action.EventGeneratingAction;
45 import org.asteriskjava.manager.action.LoginAction;
46 import org.asteriskjava.manager.action.LogoffAction;
47 import org.asteriskjava.manager.action.ManagerAction;
48 import org.asteriskjava.manager.event.ConnectEvent;
49 import org.asteriskjava.manager.event.DisconnectEvent;
50 import org.asteriskjava.manager.event.ManagerEvent;
51 import org.asteriskjava.manager.event.ProtocolIdentifierReceivedEvent;
52 import org.asteriskjava.manager.event.ResponseEvent;
53 import org.asteriskjava.manager.response.ChallengeResponse;
54 import org.asteriskjava.manager.response.CommandResponse;
55 import org.asteriskjava.manager.response.ManagerError;
56 import org.asteriskjava.manager.response.ManagerResponse;
57 import org.asteriskjava.util.DateUtil;
58 import org.asteriskjava.util.Log;
59 import org.asteriskjava.util.LogFactory;
60 import org.asteriskjava.util.SocketConnectionFacade;
61 import org.asteriskjava.util.internal.SocketConnectionFacadeImpl;
62 import org.asteriskjava.manager.action.UserEventAction;
63
64
65
66
67
68
69
70
71 public class ManagerConnectionImpl implements ManagerConnection, Dispatcher
72 {
73 private static final int RECONNECTION_INTERVAL_1 = 50;
74 private static final int RECONNECTION_INTERVAL_2 = 5000;
75 private static final String DEFAULT_HOSTNAME = "localhost";
76 private static final int DEFAULT_PORT = 5038;
77 private static final int RECONNECTION_VERSION_INTERVAL = 500;
78 private static final int MAX_VERSION_ATTEMPTS = 4;
79 private static final Pattern SHOW_VERSION_PATTERN = Pattern.compile("^(core )?show version.*");
80
81 private static final AtomicLong idCounter = new AtomicLong(0);
82
83
84
85
86 private final Log logger = LogFactory.getLog(getClass());
87
88 private final long id;
89
90
91
92
93 private AtomicLong actionIdCounter = new AtomicLong(0);
94
95
96
97
98
99 private String hostname = DEFAULT_HOSTNAME;
100
101
102
103
104 private int port = DEFAULT_PORT;
105
106
107
108
109
110 private boolean ssl = false;
111
112
113
114
115
116 protected String username;
117
118
119
120
121
122 protected String password;
123
124
125
126
127
128 private long defaultResponseTimeout = 2000;
129
130
131
132
133
134 private long defaultEventTimeout = 5000;
135
136
137
138
139 private int socketTimeout = 0;
140
141
142
143
144
145
146
147 private int socketReadTimeout = 0;
148
149
150
151
152 private boolean keepAliveAfterAuthenticationFailure = true;
153
154
155
156
157 private SocketConnectionFacade socket;
158
159
160
161
162 private Thread readerThread;
163 private final AtomicLong readerThreadCounter = new AtomicLong(0);
164
165 private final AtomicLong reconnectThreadCounter = new AtomicLong(0);
166
167
168
169
170 private ManagerReader reader;
171
172
173
174
175 private ManagerWriter writer;
176
177
178
179
180
181 private final ProtocolIdentifierWrapper protocolIdentifier;
182
183
184
185
186 private AsteriskVersion version;
187
188
189
190
191
192
193
194 private final Map<String, SendActionCallback> responseListeners;
195
196
197
198
199
200
201
202
203 private final Map<String, ManagerEventListener> responseEventListeners;
204
205
206
207
208 private final List<ManagerEventListener> eventListeners;
209
210 protected ManagerConnectionState state = INITIAL;
211
212 private String eventMask;
213
214
215
216
/**
 * Creates a connection with default settings. No socket is opened here; that happens
 * on login.
 */
public ManagerConnectionImpl()
{
    // listener registries start out empty
    this.eventListeners = new ArrayList<ManagerEventListener>();
    this.responseListeners = new HashMap<String, SendActionCallback>();
    this.responseEventListeners = new HashMap<String, ManagerEventListener>();

    this.protocolIdentifier = new ProtocolIdentifierWrapper();
    this.id = idCounter.getAndIncrement();
}
225
226
227
228 protected ManagerReader createReader(Dispatcher dispatcher, Object source)
229 {
230 return new ManagerReaderImpl(dispatcher, source);
231 }
232
233 protected ManagerWriter createWriter()
234 {
235 return new ManagerWriterImpl();
236 }
237
238
239
240
241
242
243
244
245 public void setHostname(String hostname)
246 {
247 this.hostname = hostname;
248 }
249
250
251
252
253
254
255
256
257
258 public void setPort(int port)
259 {
260 if (port <= 0)
261 {
262 this.port = DEFAULT_PORT;
263 }
264 else
265 {
266 this.port = port;
267 }
268 }
269
270
271
272
273
274
275
276
277
278
279 public void setSsl(boolean ssl)
280 {
281 this.ssl = ssl;
282 }
283
284
285
286
287
288
289
290 public void setUsername(String username)
291 {
292 this.username = username;
293 }
294
295
296
297
298
299
300
301 public void setPassword(String password)
302 {
303 this.password = password;
304 }
305
306
307
308
309
310
311
312
313
314
315
316 public void setDefaultResponseTimeout(long defaultResponseTimeout)
317 {
318 this.defaultResponseTimeout = defaultResponseTimeout;
319 }
320
321
322
323
324
325
326
327
328
329
330
331 public void setDefaultEventTimeout(long defaultEventTimeout)
332 {
333 this.defaultEventTimeout = defaultEventTimeout;
334 }
335
336
337
338
339
340
341
342
343
344
345
346
347 public void setKeepAliveAfterAuthenticationFailure(boolean keepAliveAfterAuthenticationFailure)
348 {
349 this.keepAliveAfterAuthenticationFailure = keepAliveAfterAuthenticationFailure;
350 }
351
352
353
354 public String getUsername()
355 {
356 return username;
357 }
358
359 public String getPassword()
360 {
361 return password;
362 }
363
364 public AsteriskVersion getVersion()
365 {
366 return version;
367 }
368
369 public String getHostname()
370 {
371 return hostname;
372 }
373
374 public int getPort()
375 {
376 return port;
377 }
378
379 public boolean isSsl()
380 {
381 return ssl;
382 }
383
384 public InetAddress getLocalAddress()
385 {
386 return socket.getLocalAddress();
387 }
388
389 public int getLocalPort()
390 {
391 return socket.getLocalPort();
392 }
393
394 public InetAddress getRemoteAddress()
395 {
396 return socket.getRemoteAddress();
397 }
398
399 public int getRemotePort()
400 {
401 return socket.getRemotePort();
402 }
403
404 public void registerUserEventClass(Class<? extends ManagerEvent> userEventClass)
405 {
406 if (reader == null)
407 {
408 reader = createReader(this, this);
409 }
410
411 reader.registerEventClass(userEventClass);
412 }
413
414 public void setSocketTimeout(int socketTimeout)
415 {
416 this.socketTimeout = socketTimeout;
417 }
418
419 public void setSocketReadTimeout(int socketReadTimeout)
420 {
421 this.socketReadTimeout = socketReadTimeout;
422 }
423
424 public synchronized void login() throws IOException, AuthenticationFailedException, TimeoutException
425 {
426 login(null);
427 }
428
429 public synchronized void login(String eventMask) throws IOException, AuthenticationFailedException, TimeoutException
430 {
431 if (state != INITIAL && state != DISCONNECTED)
432 {
433 throw new IllegalStateException("Login may only be perfomed when in state "
434 + "INITIAL or DISCONNECTED, but connection is in state " + state);
435 }
436
437 state = CONNECTING;
438 this.eventMask = eventMask;
439 try
440 {
441 doLogin(defaultResponseTimeout, eventMask);
442 }
443 finally
444 {
445 if (state != CONNECTED)
446 {
447 state = DISCONNECTED;
448 }
449 }
450 }
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481 protected synchronized void doLogin(long timeout, String eventMask) throws IOException, AuthenticationFailedException,
482 TimeoutException
483 {
484 ChallengeAction challengeAction;
485 ManagerResponse challengeResponse;
486 String challenge;
487 String key;
488 LoginAction loginAction;
489 ManagerResponse loginResponse;
490
491 if (socket == null)
492 {
493 connect();
494 }
495
496 synchronized (protocolIdentifier)
497 {
498 if (protocolIdentifier.value == null)
499 {
500 try
501 {
502 protocolIdentifier.wait(timeout);
503 }
504 catch (InterruptedException e)
505 {
506 Thread.currentThread().interrupt();
507 }
508 }
509
510 if (protocolIdentifier.value == null)
511 {
512 disconnect();
513 if (reader != null && reader.getTerminationException() != null)
514 {
515 throw reader.getTerminationException();
516 }
517 else
518 {
519 throw new TimeoutException("Timeout waiting for protocol identifier");
520 }
521 }
522 }
523
524 challengeAction = new ChallengeAction("MD5");
525 try
526 {
527 challengeResponse = sendAction(challengeAction);
528 }
529 catch (Exception e)
530 {
531 disconnect();
532 throw new AuthenticationFailedException("Unable to send challenge action", e);
533 }
534
535 if (challengeResponse instanceof ChallengeResponse)
536 {
537 challenge = ((ChallengeResponse) challengeResponse).getChallenge();
538 }
539 else
540 {
541 disconnect();
542 throw new AuthenticationFailedException("Unable to get challenge from Asterisk. ChallengeAction returned: "
543 + challengeResponse.getMessage());
544 }
545
546 try
547 {
548 MessageDigest md;
549
550 md = MessageDigest.getInstance("MD5");
551 if (challenge != null)
552 {
553 md.update(challenge.getBytes());
554 }
555 if (password != null)
556 {
557 md.update(password.getBytes());
558 }
559 key = ManagerUtil.toHexString(md.digest());
560 }
561 catch (NoSuchAlgorithmException ex)
562 {
563 disconnect();
564 throw new AuthenticationFailedException("Unable to create login key using MD5 Message Digest", ex);
565 }
566
567 loginAction = new LoginAction(username, "MD5", key, eventMask);
568 try
569 {
570 loginResponse = sendAction(loginAction);
571 }
572 catch (Exception e)
573 {
574 disconnect();
575 throw new AuthenticationFailedException("Unable to send login action", e);
576 }
577
578 if (loginResponse instanceof ManagerError)
579 {
580 disconnect();
581 throw new AuthenticationFailedException(loginResponse.getMessage());
582 }
583
584 logger.info("Successfully logged in");
585
586 version = determineVersion();
587
588 state = CONNECTED;
589
590 writer.setTargetVersion(version);
591
592 logger.info("Determined Asterisk version: " + version);
593
594
595 ConnectEvent connectEvent = new ConnectEvent(this);
596 connectEvent.setProtocolIdentifier(getProtocolIdentifier());
597 connectEvent.setDateReceived(DateUtil.getDate());
598
599 fireEvent(connectEvent);
600 }
601
602 protected AsteriskVersion determineVersion() throws IOException, TimeoutException
603 {
604 int attempts = 0;
605
606
607
608
609
610
611 while (attempts++ < MAX_VERSION_ATTEMPTS)
612 {
613 final ManagerResponse showVersionFilesResponse;
614 final List<String> showVersionFilesResult;
615
616
617 showVersionFilesResponse = sendAction(new CommandAction("show version files pbx.c"), defaultResponseTimeout * 2);
618 if (!(showVersionFilesResponse instanceof CommandResponse))
619 {
620
621
622
623
624 break;
625 }
626
627 showVersionFilesResult = ((CommandResponse) showVersionFilesResponse).getResult();
628 if (showVersionFilesResult != null && showVersionFilesResult.size() > 0)
629 {
630 final String line1;
631
632 line1 = showVersionFilesResult.get(0);
633 if (line1 != null && line1.startsWith("File"))
634 {
635 final String rawVersion;
636
637 rawVersion = getRawVersion();
638 if (rawVersion != null && rawVersion.startsWith("Asterisk 1.4"))
639 {
640 return AsteriskVersion.ASTERISK_1_4;
641 }
642 return AsteriskVersion.ASTERISK_1_2;
643 }
644 else if (line1 != null && line1.contains("No such command"))
645 {
646 final ManagerResponse coreShowVersionResponse = sendAction(new CommandAction("core show version"), defaultResponseTimeout * 2);
647
648 if(coreShowVersionResponse != null && coreShowVersionResponse instanceof CommandResponse){
649 final List<String> coreShowVersionResult = ((CommandResponse) coreShowVersionResponse).getResult();
650
651 if(coreShowVersionResult != null && coreShowVersionResult.size() > 0){
652 final String coreLine = coreShowVersionResult.get(0);
653
654 if (coreLine != null && coreLine.startsWith("Asterisk 1.6"))
655 {
656 return AsteriskVersion.ASTERISK_1_6;
657 }
658 else if (coreLine != null && coreLine.startsWith("Asterisk 1.8"))
659 {
660 return AsteriskVersion.ASTERISK_1_8;
661 }
662 }
663 }
664
665 try
666 {
667 Thread.sleep(RECONNECTION_VERSION_INTERVAL);
668 }
669 catch (Exception ex)
670 {
671
672 }
673 }
674 else
675 {
676
677 break;
678 }
679 }
680 }
681
682
683 return AsteriskVersion.ASTERISK_1_6;
684 }
685
686 protected String getRawVersion()
687 {
688 final ManagerResponse showVersionResponse;
689
690 try
691 {
692 showVersionResponse = sendAction(new CommandAction("show version"), defaultResponseTimeout * 2);
693 }
694 catch (Exception e)
695 {
696 return null;
697 }
698
699 if (showVersionResponse instanceof CommandResponse)
700 {
701 final List<String> showVersionResult;
702
703 showVersionResult = ((CommandResponse) showVersionResponse).getResult();
704 if (showVersionResult != null && showVersionResult.size() > 0)
705 {
706 return showVersionResult.get(0);
707 }
708 }
709
710 return null;
711 }
712
713 protected synchronized void connect() throws IOException
714 {
715 logger.info("Connecting to " + hostname + ":" + port);
716
717 if (reader == null)
718 {
719 logger.debug("Creating reader for " + hostname + ":" + port);
720 reader = createReader(this, this);
721 }
722
723 if (writer == null)
724 {
725 logger.debug("Creating writer");
726 writer = createWriter();
727 }
728
729 logger.debug("Creating socket");
730 socket = createSocket();
731
732 logger.debug("Passing socket to reader");
733 reader.setSocket(socket);
734
735 if (readerThread == null || !readerThread.isAlive() || reader.isDead())
736 {
737 logger.debug("Creating and starting reader thread");
738 readerThread = new Thread(reader);
739 readerThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reader-"
740 + readerThreadCounter.getAndIncrement());
741 readerThread.setDaemon(true);
742 readerThread.start();
743 }
744
745 logger.debug("Passing socket to writer");
746 writer.setSocket(socket);
747 }
748
749 protected SocketConnectionFacade createSocket() throws IOException
750 {
751 return new SocketConnectionFacadeImpl(hostname, port, ssl, socketTimeout, socketReadTimeout);
752 }
753
754 public synchronized void logoff() throws IllegalStateException
755 {
756 if (state != CONNECTED && state != RECONNECTING)
757 {
758 throw new IllegalStateException("Logoff may only be perfomed when in state "
759 + "CONNECTED or RECONNECTING, but connection is in state " + state);
760 }
761
762 state = DISCONNECTING;
763
764 if (socket != null)
765 {
766 try
767 {
768 sendAction(new LogoffAction());
769 }
770 catch (Exception e)
771 {
772 logger.warn("Unable to send LogOff action", e);
773 }
774 }
775 cleanup();
776 state = DISCONNECTED;
777 }
778
779
780
781
782 protected synchronized void disconnect()
783 {
784 if (socket != null)
785 {
786 logger.info("Closing socket.");
787 try
788 {
789 socket.close();
790 }
791 catch (IOException ex)
792 {
793 logger.warn("Unable to close socket: " + ex.getMessage());
794 }
795 socket = null;
796 }
797 protocolIdentifier.value = null;
798 }
799
800 public ManagerResponse sendAction(ManagerAction action) throws IOException, TimeoutException, IllegalArgumentException,
801 IllegalStateException
802 {
803 return sendAction(action, defaultResponseTimeout);
804 }
805
806
807
808
809 public ManagerResponse sendAction(ManagerAction action, long timeout) throws IOException, TimeoutException,
810 IllegalArgumentException, IllegalStateException
811 {
812 ResponseHandlerResult result;
813 SendActionCallback callbackHandler;
814
815 result = new ResponseHandlerResult();
816 callbackHandler = new DefaultSendActionCallback(result);
817
818 synchronized (result)
819 {
820 sendAction(action, callbackHandler);
821
822
823 if (action instanceof UserEventAction)
824 {
825 return null;
826 }
827
828
829
830 if (result.getResponse() == null)
831 {
832 try
833 {
834 result.wait(timeout);
835 }
836 catch (InterruptedException ex)
837 {
838 logger.warn("Interrupted while waiting for result");
839 Thread.currentThread().interrupt();
840 }
841 }
842 }
843
844
845 if (result.getResponse() == null)
846 {
847 throw new TimeoutException("Timeout waiting for response to " + action.getAction()
848 + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"));
849 }
850
851 return result.getResponse();
852 }
853
854 public void sendAction(ManagerAction action, SendActionCallback callback) throws IOException, IllegalArgumentException,
855 IllegalStateException
856 {
857 final String internalActionId;
858
859 if (action == null)
860 {
861 throw new IllegalArgumentException("Unable to send action: action is null.");
862 }
863
864
865
866 if ((state == CONNECTING || state == RECONNECTING)
867 && (action instanceof ChallengeAction || action instanceof LoginAction || isShowVersionCommandAction(action)))
868 {
869
870 }
871 else if (state == DISCONNECTING && action instanceof LogoffAction)
872 {
873
874 }
875 else if (state != CONNECTED)
876 {
877 throw new IllegalStateException("Actions may only be sent when in state "
878 + "CONNECTED, but connection is in state " + state);
879 }
880
881 if (socket == null)
882 {
883 throw new IllegalStateException("Unable to send " + action.getAction() + " action: socket not connected.");
884 }
885
886 internalActionId = createInternalActionId();
887
888
889
890 if (callback != null)
891 {
892 synchronized (this.responseListeners)
893 {
894 this.responseListeners.put(internalActionId, callback);
895 }
896 }
897
898 Class<? extends ManagerResponse> responseClass = getExpectedResponseClass(action.getClass());
899 if (responseClass != null)
900 {
901 reader.expectResponseClass(internalActionId, responseClass);
902 }
903
904 writer.sendAction(action, internalActionId);
905 }
906
907 boolean isShowVersionCommandAction(ManagerAction action)
908 {
909 if (! (action instanceof CommandAction)) {
910 return false;
911 }
912 final Matcher showVersionMatcher = SHOW_VERSION_PATTERN.matcher(((CommandAction)action).getCommand());
913 return showVersionMatcher.matches();
914 }
915
916 private Class<? extends ManagerResponse> getExpectedResponseClass(Class<? extends ManagerAction> actionClass)
917 {
918 final ExpectedResponse annotation = actionClass.getAnnotation(ExpectedResponse.class);
919 if (annotation == null)
920 {
921 return null;
922 }
923
924 return annotation.value();
925 }
926
927 public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action) throws IOException, EventTimeoutException,
928 IllegalArgumentException, IllegalStateException
929 {
930 return sendEventGeneratingAction(action, defaultEventTimeout);
931 }
932
933
934
935
936 public ResponseEvents sendEventGeneratingAction(EventGeneratingAction action, long timeout) throws IOException,
937 EventTimeoutException, IllegalArgumentException, IllegalStateException
938 {
939 final ResponseEventsImpl responseEvents;
940 final ResponseEventHandler responseEventHandler;
941 final String internalActionId;
942
943 if (action == null)
944 {
945 throw new IllegalArgumentException("Unable to send action: action is null.");
946 }
947 else if (action.getActionCompleteEventClass() == null)
948 {
949 throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass for "
950 + action.getClass().getName() + " is null.");
951 }
952 else if (!ResponseEvent.class.isAssignableFrom(action.getActionCompleteEventClass()))
953 {
954 throw new IllegalArgumentException("Unable to send action: actionCompleteEventClass ("
955 + action.getActionCompleteEventClass().getName() + ") for " + action.getClass().getName()
956 + " is not a ResponseEvent.");
957 }
958
959 if (state != CONNECTED)
960 {
961 throw new IllegalStateException("Actions may only be sent when in state "
962 + "CONNECTED but connection is in state " + state);
963 }
964
965 responseEvents = new ResponseEventsImpl();
966 responseEventHandler = new ResponseEventHandler(responseEvents, action.getActionCompleteEventClass());
967
968 internalActionId = createInternalActionId();
969
970
971 synchronized (this.responseListeners)
972 {
973 this.responseListeners.put(internalActionId, responseEventHandler);
974 }
975
976
977 synchronized (this.responseEventListeners)
978 {
979 this.responseEventListeners.put(internalActionId, responseEventHandler);
980 }
981
982 synchronized (responseEvents)
983 {
984 writer.sendAction(action, internalActionId);
985
986 if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
987 {
988 try
989 {
990 responseEvents.wait(timeout);
991 }
992 catch (InterruptedException e)
993 {
994 logger.warn("Interrupted while waiting for response events.");
995 Thread.currentThread().interrupt();
996 }
997 }
998 }
999
1000
1001 if ((responseEvents.getResponse() == null || !responseEvents.isComplete()))
1002 {
1003
1004 synchronized (this.responseEventListeners)
1005 {
1006 this.responseEventListeners.remove(internalActionId);
1007 }
1008
1009 throw new EventTimeoutException("Timeout waiting for response or response events to " + action.getAction()
1010 + (action.getActionId() == null ? "" : " (actionId: " + action.getActionId() + ")"), responseEvents);
1011 }
1012
1013
1014
1015
1016 synchronized (this.responseEventListeners)
1017 {
1018 this.responseEventListeners.remove(internalActionId);
1019 }
1020
1021 return responseEvents;
1022 }
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033 private String createInternalActionId()
1034 {
1035 final StringBuffer sb;
1036
1037 sb = new StringBuffer();
1038 sb.append(this.hashCode());
1039 sb.append("_");
1040 sb.append(actionIdCounter.getAndIncrement());
1041
1042 return sb.toString();
1043 }
1044
1045 public void addEventListener(final ManagerEventListener listener)
1046 {
1047 synchronized (this.eventListeners)
1048 {
1049
1050 if (!this.eventListeners.contains(listener))
1051 {
1052 this.eventListeners.add(listener);
1053 }
1054 }
1055 }
1056
1057 public void removeEventListener(final ManagerEventListener listener)
1058 {
1059 synchronized (this.eventListeners)
1060 {
1061 if (this.eventListeners.contains(listener))
1062 {
1063 this.eventListeners.remove(listener);
1064 }
1065 }
1066 }
1067
1068 public String getProtocolIdentifier()
1069 {
1070 return protocolIdentifier.value;
1071 }
1072
1073 public ManagerConnectionState getState()
1074 {
1075 return state;
1076 }
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088 public void dispatchResponse(ManagerResponse response)
1089 {
1090 final String actionId;
1091 String internalActionId;
1092 SendActionCallback listener;
1093
1094
1095 if (response == null)
1096 {
1097 logger.error("Unable to dispatch null response. This should never happen. Please file a bug.");
1098 return;
1099 }
1100
1101 actionId = response.getActionId();
1102 internalActionId = null;
1103 listener = null;
1104
1105 if (actionId != null)
1106 {
1107 internalActionId = ManagerUtil.getInternalActionId(actionId);
1108 response.setActionId(ManagerUtil.stripInternalActionId(actionId));
1109 }
1110
1111 logger.debug("Dispatching response with internalActionId '" + internalActionId + "':\n" + response);
1112
1113 if (internalActionId != null)
1114 {
1115 synchronized (this.responseListeners)
1116 {
1117 listener = responseListeners.get(internalActionId);
1118 if (listener != null)
1119 {
1120 this.responseListeners.remove(internalActionId);
1121 }
1122 else
1123 {
1124
1125
1126 logger.debug("No response listener registered for " + "internalActionId '" + internalActionId + "'");
1127 }
1128 }
1129 }
1130 else
1131 {
1132 logger.error("Unable to retrieve internalActionId from response: " + "actionId '" + actionId + "':\n" + response);
1133 }
1134
1135 if (listener != null)
1136 {
1137 try
1138 {
1139 listener.onResponse(response);
1140 }
1141 catch (Exception e)
1142 {
1143 logger.warn("Unexpected exception in response listener " + listener.getClass().getName(), e);
1144 }
1145 }
1146 }
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157 public void dispatchEvent(ManagerEvent event)
1158 {
1159
1160 if (event == null)
1161 {
1162 logger.error("Unable to dispatch null event. This should never happen. Please file a bug.");
1163 return;
1164 }
1165
1166 logger.debug("Dispatching event:\n" + event.toString());
1167
1168
1169
1170
1171
1172
1173 if (event instanceof ResponseEvent)
1174 {
1175 ResponseEvent responseEvent;
1176 String internalActionId;
1177
1178 responseEvent = (ResponseEvent) event;
1179 internalActionId = responseEvent.getInternalActionId();
1180 if (internalActionId != null)
1181 {
1182 synchronized (responseEventListeners)
1183 {
1184 ManagerEventListener listener;
1185
1186 listener = responseEventListeners.get(internalActionId);
1187 if (listener != null)
1188 {
1189 try
1190 {
1191 listener.onManagerEvent(event);
1192 }
1193 catch (Exception e)
1194 {
1195 logger.warn("Unexpected exception in response event listener " + listener.getClass().getName(),
1196 e);
1197 }
1198 }
1199 }
1200 }
1201 else
1202 {
1203
1204
1205
1206
1207
1208
1209
1210 }
1211 }
1212 if (event instanceof DisconnectEvent)
1213 {
1214
1215
1216 if (state == CONNECTED)
1217 {
1218 state = RECONNECTING;
1219
1220
1221
1222
1223 cleanup();
1224 Thread reconnectThread = new Thread(new Runnable()
1225 {
1226 public void run()
1227 {
1228 reconnect();
1229 }
1230 });
1231 reconnectThread.setName("Asterisk-Java ManagerConnection-" + id + "-Reconnect-"
1232 + reconnectThreadCounter.getAndIncrement());
1233 reconnectThread.setDaemon(true);
1234 reconnectThread.start();
1235
1236
1237
1238
1239
1240 }
1241 else
1242 {
1243
1244
1245 return;
1246 }
1247 }
1248 if (event instanceof ProtocolIdentifierReceivedEvent)
1249 {
1250 ProtocolIdentifierReceivedEvent protocolIdentifierReceivedEvent;
1251 String protocolIdentifier;
1252
1253 protocolIdentifierReceivedEvent = (ProtocolIdentifierReceivedEvent) event;
1254 protocolIdentifier = protocolIdentifierReceivedEvent.getProtocolIdentifier();
1255 setProtocolIdentifier(protocolIdentifier);
1256
1257 return;
1258 }
1259
1260 fireEvent(event);
1261 }
1262
1263
1264
1265
1266
1267
1268 private void fireEvent(ManagerEvent event)
1269 {
1270 synchronized (eventListeners)
1271 {
1272 for (ManagerEventListener listener : eventListeners)
1273 {
1274 try
1275 {
1276 listener.onManagerEvent(event);
1277 }
1278 catch (RuntimeException e)
1279 {
1280 logger.warn("Unexpected exception in eventHandler " + listener.getClass().getName(), e);
1281 }
1282 }
1283 }
1284 }
1285
1286
1287
1288
1289
1290
1291
1292
1293 private void setProtocolIdentifier(final String identifier)
1294 {
1295 logger.info("Connected via " + identifier);
1296
1297 if (!"Asterisk Call Manager/1.0".equals(identifier)
1298 && !"Asterisk Call Manager/1.1".equals(identifier)
1299 && !"Asterisk Call Manager/1.2".equals(identifier)
1300 && !"OpenPBX Call Manager/1.0".equals(identifier)
1301 && !"CallWeaver Call Manager/1.0".equals(identifier)
1302 && !(identifier != null && identifier.startsWith("Asterisk Call Manager Proxy/")))
1303 {
1304 logger.warn("Unsupported protocol version '" + identifier + "'. Use at your own risk.");
1305 }
1306
1307 synchronized (protocolIdentifier)
1308 {
1309 protocolIdentifier.value = identifier;
1310 protocolIdentifier.notifyAll();
1311 }
1312 }
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327 private void reconnect()
1328 {
1329 int numTries;
1330
1331
1332 numTries = 0;
1333 while (state == RECONNECTING)
1334 {
1335 try
1336 {
1337 if (numTries < 10)
1338 {
1339
1340
1341 Thread.sleep(RECONNECTION_INTERVAL_1);
1342 }
1343 else
1344 {
1345
1346
1347 Thread.sleep(RECONNECTION_INTERVAL_2);
1348 }
1349 }
1350 catch (InterruptedException e)
1351 {
1352 Thread.currentThread().interrupt();
1353 }
1354
1355 try
1356 {
1357 connect();
1358
1359 try
1360 {
1361 doLogin(defaultResponseTimeout, eventMask);
1362 logger.info("Successfully reconnected.");
1363
1364
1365
1366 break;
1367 }
1368 catch (AuthenticationFailedException e1)
1369 {
1370 if (keepAliveAfterAuthenticationFailure)
1371 {
1372 logger.error("Unable to log in after reconnect: " + e1.getMessage());
1373 }
1374 else
1375 {
1376 logger.error("Unable to log in after reconnect: " + e1.getMessage() + ". Giving up.");
1377 state = DISCONNECTED;
1378 }
1379 }
1380 catch (TimeoutException e1)
1381 {
1382
1383 logger.error("TimeoutException while trying to log in " + "after reconnect.");
1384 }
1385 }
1386 catch (IOException e)
1387 {
1388
1389
1390 logger.warn("Exception while trying to reconnect: " + e.getMessage());
1391 }
1392 numTries++;
1393 }
1394 }
1395
1396 private void cleanup()
1397 {
1398 disconnect();
1399 this.readerThread = null;
1400 }
1401
1402 @Override
1403 public String toString()
1404 {
1405 StringBuffer sb;
1406
1407 sb = new StringBuffer("ManagerConnection[");
1408 sb.append("id='").append(id).append("',");
1409 sb.append("hostname='").append(hostname).append("',");
1410 sb.append("port=").append(port).append(",");
1411 sb.append("systemHashcode=").append(System.identityHashCode(this)).append("]");
1412
1413 return sb.toString();
1414 }
1415
1416
1417
1418
1419
1420
1421 private static class ResponseHandlerResult implements Serializable
1422 {
1423
1424
1425
1426 private static final long serialVersionUID = 7831097958568769220L;
1427 private ManagerResponse response;
1428
1429 public ResponseHandlerResult()
1430 {
1431 }
1432
1433 public ManagerResponse getResponse()
1434 {
1435 return this.response;
1436 }
1437
1438 public void setResponse(ManagerResponse response)
1439 {
1440 this.response = response;
1441 }
1442 }
1443
1444
1445
1446
1447
1448 private static class DefaultSendActionCallback implements SendActionCallback, Serializable
1449 {
1450
1451
1452
1453 private static final long serialVersionUID = 2926598671855316803L;
1454 private final ResponseHandlerResult result;
1455
1456
1457
1458
1459
1460
1461 public DefaultSendActionCallback(ResponseHandlerResult result)
1462 {
1463 this.result = result;
1464 }
1465
1466 public void onResponse(ManagerResponse response)
1467 {
1468 synchronized (result)
1469 {
1470 result.setResponse(response);
1471 result.notifyAll();
1472 }
1473 }
1474 }
1475
1476
1477
1478
1479
1480 private static class ResponseEventHandler implements ManagerEventListener, SendActionCallback
1481 {
1482 private final ResponseEventsImpl events;
1483 private final Class<?> actionCompleteEventClass;
1484
1485
1486
1487
1488
1489
1490
1491
1492 public ResponseEventHandler(ResponseEventsImpl events, Class<?> actionCompleteEventClass)
1493 {
1494 this.events = events;
1495 this.actionCompleteEventClass = actionCompleteEventClass;
1496 }
1497
1498 public void onManagerEvent(ManagerEvent event)
1499 {
1500 synchronized (events)
1501 {
1502
1503 if (event instanceof ResponseEvent)
1504 {
1505 ResponseEvent responseEvent;
1506
1507 responseEvent = (ResponseEvent) event;
1508 events.addEvent(responseEvent);
1509 }
1510
1511
1512 if (actionCompleteEventClass.isAssignableFrom(event.getClass()))
1513 {
1514 events.setComplete(true);
1515
1516
1517 if (events.getResponse() != null)
1518 {
1519 events.notifyAll();
1520 }
1521 }
1522 }
1523 }
1524
1525 public void onResponse(ManagerResponse response)
1526 {
1527 synchronized (events)
1528 {
1529 events.setRepsonse(response);
1530 if (response instanceof ManagerError)
1531 {
1532 events.setComplete(true);
1533 }
1534
1535
1536
1537
1538 if (events.isComplete())
1539 {
1540 events.notifyAll();
1541 }
1542 }
1543 }
1544 }
1545
1546 private static class ProtocolIdentifierWrapper
1547 {
1548 String value;
1549 }
1550 }