View Javadoc

1   package org.asteriskjava.fastagi;
2   
3   import org.asteriskjava.util.DaemonThreadFactory;
4   import org.asteriskjava.util.Log;
5   import org.asteriskjava.util.LogFactory;
6   import org.asteriskjava.fastagi.internal.AgiChannelFactory;
7   import org.asteriskjava.fastagi.internal.DefaultAgiChannelFactory;
8   
9   import java.util.concurrent.RejectedExecutionException;
10  import java.util.concurrent.ThreadPoolExecutor;
11  import java.util.concurrent.TimeUnit;
12  import java.util.concurrent.SynchronousQueue;
13  
14  /**
15   * Abstract base class for FastAGI and AsyncAGI servers.
16   *
17   * @since 1.0.0
18   */
19  public abstract class AbstractAgiServer
20  {
21      private final Log logger = LogFactory.getLog(getClass());
22  
23      /**
24       * The default thread pool size.
25       */
26      private static final int DEFAULT_POOL_SIZE = 10;
27  
28      /**
29       * The default thread pool size.
30       */
31      private static final int DEFAULT_MAXIMUM_POOL_SIZE = 100;
32  
33      /**
34       * The minimum number of worker threads in the thread pool.
35       */
36      private int poolSize = DEFAULT_POOL_SIZE;
37  
38      /**
39       * The maximum number of worker threads in the thread pool. This equals the maximum number of
40       * concurrent requests this AgiServer can serve.
41       */
42      private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
43  
44      /**
45       * The thread pool that contains the worker threads to process incoming requests.
46       */
47      private ThreadPoolExecutor pool;
48  
49      /**
50       * The strategy to use for mapping AgiRequests to AgiScripts that serve them.
51       */
52      private MappingStrategy mappingStrategy;
53  
54      /**
55       * The factory to use for creating new AgiChannel instances.
56       */
57      private final AgiChannelFactory agiChannelFactory;
58  
59      private volatile boolean die = false;
60  
61      public AbstractAgiServer()
62      {
63          this(new DefaultAgiChannelFactory());
64      }
65  
66      /**
67       * Creates a new AbstractAgiServer with the given channel factory.
68       *
69       * @param agiChannelFactory the AgiChannelFactory to use for creating new AgiChannel instances.
70       * @since 1.0.0
71       */
72      public AbstractAgiServer(AgiChannelFactory agiChannelFactory)
73      {
74          if (agiChannelFactory == null)
75          {
76              throw new IllegalArgumentException("AgiChannelFactory must not be null");
77          }
78  
79          logger.debug("Using channelFactory " + agiChannelFactory.getClass().getCanonicalName());
80          this.agiChannelFactory = agiChannelFactory;
81      }
82  
83      /**
84       * Returns the AgiChannelFactory to use for creating new AgiChannel instances.
85       *
86       * @return the AgiChannelFactory to use for creating new AgiChannel instances.
87       */
88      protected AgiChannelFactory getAgiChannelFactory()
89      {
90          return this.agiChannelFactory;
91      }
92  
93      /**
94       * Returns the default number of worker threads in the thread pool.
95       *
96       * @return the default number of worker threads in the thread pool.
97       * @since 1.0.0
98       */
99      public int getPoolSize()
100     {
101         return poolSize;
102     }
103 
104     /**
105      * Sets the default number of worker threads in the thread pool.
106      * <p/>
107      * This is the number of threads that are available even if they are idle.
108      * <p/>
109      * The default pool size is 10.
110      *
111      * @param poolSize the size of the worker thread pool.
112      * @throws IllegalArgumentException if the new pool size is negative
113      * @see {@link java.util.concurrent.ThreadPoolExecutor#setCorePoolSize(int)}
114      */
115     public synchronized void setPoolSize(int poolSize)
116     {
117         if (poolSize < 0)
118         {
119             throw new IllegalArgumentException("New poolSize (" + poolSize + ") is must not be negative");
120         }
121 
122         if (pool != null)
123         {
124             pool.setCorePoolSize(poolSize);
125         }
126         this.poolSize = poolSize;
127     }
128 
129     /**
130      * Returns the maximum number of worker threads in the thread pool.
131      *
132      * @return the maximum number of worker threads in the thread pool.
133      * @since 1.0.0
134      */
135     public int getMaximumPoolSize()
136     {
137         return maximumPoolSize;
138     }
139 
140     /**
141      * Sets the maximum number of worker threads in the thread pool.
142      * <p/>
143      * This equals the maximum number of concurrent requests this AgiServer can serve.
144      * <p/>
145      * The default maximum pool size is 100.
146      *
147      * @param maximumPoolSize the maximum size of the worker thread pool.
148      * @throws IllegalArgumentException if maximumPoolSize is less than current pool size or less than or equal to 0.
149      * @see {@link java.util.concurrent.ThreadPoolExecutor#setMaximumPoolSize(int)}
150      */
151     public synchronized void setMaximumPoolSize(int maximumPoolSize)
152     {
153         if (maximumPoolSize <= 0)
154         {
155             throw new IllegalArgumentException("New maximumPoolSize (" + maximumPoolSize + ") is must be positive");
156         }
157 
158         if (maximumPoolSize < poolSize)
159         {
160             throw new IllegalArgumentException("New maximumPoolSize (" + maximumPoolSize + ") is less than current pool size (" + poolSize + ")");
161         }
162 
163         if (pool != null)
164         {
165             pool.setMaximumPoolSize(maximumPoolSize);
166         }
167         this.maximumPoolSize = maximumPoolSize;
168     }
169 
170     /**
171      * Sets the strategy to use for mapping AgiRequests to AgiScripts that serve them.
172      *
173      * @param mappingStrategy the mapping strategy to use.
174      */
175     public void setMappingStrategy(MappingStrategy mappingStrategy)
176     {
177         this.mappingStrategy = mappingStrategy;
178     }
179 
180     protected MappingStrategy getMappingStrategy()
181     {
182         return mappingStrategy;
183     }
184 
185     protected boolean isDie()
186     {
187         return die;
188     }
189 
190     protected synchronized void shutdown()
191     {
192         this.die = true;
193         if (pool != null)
194         {
195             pool.shutdown();
196         }
197     }
198 
199     @Override
200     protected void finalize() throws Throwable
201     {
202         super.finalize();
203 
204         this.die = true;
205         if (pool != null)
206         {
207             pool.shutdown();
208         }
209     }
210 
211     /**
212      * Execute the runnable using the configured ThreadPoolExecutor obtained from {@link #getPool()}.
213      *
214      * @param command the command to run.
215      * @throws RejectedExecutionException if the runnable can't be executed
216      */
217     protected void execute(Runnable command) throws RejectedExecutionException
218     {
219         if (isDie())
220         {
221             logger.warn("AgiServer is shutting down: Refused to execute AgiScript");
222             return;
223         }
224 
225         getPool().execute(command);
226     }
227 
228     protected void handleException(String message, Exception e)
229     {
230         logger.warn(message, e);
231     }
232 
233     private synchronized ThreadPoolExecutor getPool()
234     {
235         if (pool == null)
236         {
237             pool = createPool();
238             logger.info("Thread pool started.");
239         }
240 
241         return pool;
242     }
243 
244     /**
245      * Creates a new ThreadPoolExecutor to serve the AGI requests. The nature of this pool
246      * defines how many concurrent requests can be handled. The default implementation
247      * returns a dynamic thread pool defined by the poolSize and maximumPoolSize properties.<p>
248      * You can override this method to change this behavior. For example you can use a cached
249      * pool with
250      * <pre>
251      * return Executors.newCachedThreadPool(new DaemonThreadFactory());
252      * </pre>
253      *
254      * @return the ThreadPoolExecutor to use for serving AGI requests.
255      * @see #setPoolSize(int)
256      * @see #setMaximumPoolSize(int)
257      */
258     protected ThreadPoolExecutor createPool()
259     {
260         return new ThreadPoolExecutor(
261                 poolSize,
262                 (maximumPoolSize < poolSize) ? poolSize : maximumPoolSize,
263                 50000L, TimeUnit.MILLISECONDS,
264                 new SynchronousQueue<Runnable>(),
265                 new DaemonThreadFactory()
266         );
267     }
268 }