001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.geronimo.connector.outbound; 018 019 import java.util.ArrayList; 020 import java.util.Timer; 021 import java.util.TimerTask; 022 import java.util.List; 023 import java.util.concurrent.Semaphore; 024 import java.util.concurrent.TimeUnit; 025 import java.util.concurrent.locks.ReadWriteLock; 026 import java.util.concurrent.locks.ReentrantReadWriteLock; 027 028 import javax.resource.ResourceException; 029 import javax.resource.spi.ConnectionRequestInfo; 030 import javax.resource.spi.ManagedConnectionFactory; 031 import javax.resource.spi.ManagedConnection; 032 import javax.security.auth.Subject; 033 034 import org.apache.commons.logging.Log; 035 import org.apache.commons.logging.LogFactory; 036 037 /** 038 * @version $Rev: 620213 $ $Date: 2008-02-10 00:13:09 +0100 (Sun, 10 Feb 2008) $ 039 */ 040 public abstract class AbstractSinglePoolConnectionInterceptor implements ConnectionInterceptor, PoolingAttributes { 041 protected static Log log = LogFactory.getLog(AbstractSinglePoolConnectionInterceptor.class.getName()); 042 protected final ConnectionInterceptor next; 043 private final ReadWriteLock resizeLock = new ReentrantReadWriteLock(); 044 protected Semaphore permits; 045 protected int blockingTimeoutMilliseconds; 046 protected int connectionCount = 0; 047 private long idleTimeoutMilliseconds; 048 private IdleReleaser idleReleaser; 049 protected Timer timer = PoolIdleReleaserTimer.getTimer(); 050 protected int maxSize = 0; 051 protected int minSize = 0; 052 protected int shrinkLater = 0; 053 protected volatile boolean destroyed = false; 054 055 public AbstractSinglePoolConnectionInterceptor(final ConnectionInterceptor next, 056 int maxSize, 057 int minSize, 058 int blockingTimeoutMilliseconds, 059 int idleTimeoutMinutes) { 060 this.next = next; 061 this.maxSize = maxSize; 062 this.minSize = minSize; 063 this.blockingTimeoutMilliseconds = blockingTimeoutMilliseconds; 064 setIdleTimeoutMinutes(idleTimeoutMinutes); 065 permits = new Semaphore(maxSize, true); 066 } 067 068 public void getConnection(ConnectionInfo connectionInfo) throws ResourceException { 069 if (connectionInfo.getManagedConnectionInfo().getManagedConnection() != null) { 070 if (log.isTraceEnabled()) { 071 log.trace("using already assigned connection " + connectionInfo.getConnectionHandle() + " for managed connection " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this); 072 } 073 return; 074 } 075 try { 076 resizeLock.readLock().lock(); 077 try { 078 if (permits.tryAcquire(blockingTimeoutMilliseconds, TimeUnit.MILLISECONDS)) { 079 try { 080 internalGetConnection(connectionInfo); 081 } catch (ResourceException e) { 082 permits.release(); 083 throw e; 084 } 085 } else { 086 throw new ResourceException("No ManagedConnections available " 087 + "within configured blocking timeout ( " 088 + blockingTimeoutMilliseconds 089 + " [ms] ) for pool " + this); 090 091 } 092 } finally { 093 resizeLock.readLock().unlock(); 094 } 095 096 } catch (InterruptedException ie) { 097 throw new ResourceException("Interrupted while requesting permit.", ie); 098 } // end of try-catch 099 } 100 101 protected abstract void internalGetConnection(ConnectionInfo connectionInfo) throws ResourceException; 102 103 public void returnConnection(ConnectionInfo connectionInfo, 104 ConnectionReturnAction connectionReturnAction) { 105 if (log.isTraceEnabled()) { 106 log.trace("returning connection " + connectionInfo.getConnectionHandle() + " for MCI " + connectionInfo.getManagedConnectionInfo() + " and MC " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this); 107 } 108 109 // not strictly synchronized with destroy(), but pooled operations in internalReturn() are... 110 if (destroyed) { 111 try { 112 connectionInfo.getManagedConnectionInfo().getManagedConnection().destroy(); 113 } catch (ResourceException re) { 114 // empty 115 } 116 return; 117 } 118 119 resizeLock.readLock().lock(); 120 try { 121 ManagedConnectionInfo mci = connectionInfo.getManagedConnectionInfo(); 122 if (connectionReturnAction == ConnectionReturnAction.RETURN_HANDLE && mci.hasConnectionHandles()) { 123 if (log.isTraceEnabled()) { 124 log.trace("Return request at pool with connection handles! " + connectionInfo.getConnectionHandle() + " for MCI " + connectionInfo.getManagedConnectionInfo() + " and MC " + connectionInfo.getManagedConnectionInfo().getManagedConnection() + " to pool " + this, new Exception("Stack trace")); 125 } 126 return; 127 } 128 129 boolean releasePermit = internalReturn(connectionInfo, connectionReturnAction); 130 131 if (releasePermit) { 132 permits.release(); 133 } 134 } finally { 135 resizeLock.readLock().unlock(); 136 } 137 } 138 139 protected boolean internalReturn(ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) { 140 ManagedConnectionInfo mci = connectionInfo.getManagedConnectionInfo(); 141 ManagedConnection mc = mci.getManagedConnection(); 142 try { 143 mc.cleanup(); 144 } catch (ResourceException e) { 145 connectionReturnAction = ConnectionReturnAction.DESTROY; 146 } 147 148 boolean releasePermit; 149 synchronized (getPool()) { 150 // a bit redundant, but this closes a small timing hole... 151 if (destroyed) { 152 try { 153 mc.destroy(); 154 } 155 catch (ResourceException re) { 156 //ignore 157 } 158 return doRemove(mci); 159 } 160 if (shrinkLater > 0) { 161 //nothing can get in the pool while shrinkLater > 0, so releasePermit is false here. 162 connectionReturnAction = ConnectionReturnAction.DESTROY; 163 shrinkLater--; 164 releasePermit = false; 165 } else if (connectionReturnAction == ConnectionReturnAction.RETURN_HANDLE) { 166 mci.setLastUsed(System.currentTimeMillis()); 167 doAdd(mci); 168 return true; 169 } else { 170 releasePermit = doRemove(mci); 171 } 172 } 173 //we must destroy connection. 174 next.returnConnection(connectionInfo, connectionReturnAction); 175 connectionCount--; 176 return releasePermit; 177 } 178 179 protected abstract void internalDestroy(); 180 181 // Cancel the IdleReleaser TimerTask (fixes memory leak) and clean up the pool 182 public void destroy() { 183 destroyed = true; 184 if (idleReleaser != null) 185 idleReleaser.cancel(); 186 internalDestroy(); 187 next.destroy(); 188 } 189 190 public int getPartitionCount() { 191 return 1; 192 } 193 194 public int getPartitionMaxSize() { 195 return maxSize; 196 } 197 198 public void setPartitionMaxSize(int newMaxSize) throws InterruptedException { 199 if (newMaxSize <= 0) { 200 throw new IllegalArgumentException("Max size must be positive, not " + newMaxSize); 201 } 202 if (newMaxSize != getPartitionMaxSize()) { 203 resizeLock.writeLock().lock(); 204 try { 205 ResizeInfo resizeInfo = new ResizeInfo(this.minSize, permits.availablePermits(), connectionCount, newMaxSize); 206 permits = new Semaphore(newMaxSize, true); 207 //pre-acquire permits for the existing checked out connections that will not be closed when they are returned. 208 for (int i = 0; i < resizeInfo.getTransferCheckedOut(); i++) { 209 permits.acquire(); 210 } 211 //make sure shrinkLater is 0 while discarding excess connections 212 this.shrinkLater = 0; 213 //transfer connections we are going to keep 214 transferConnections(newMaxSize, resizeInfo.getShrinkNow()); 215 this.shrinkLater = resizeInfo.getShrinkLater(); 216 this.minSize = resizeInfo.getNewMinSize(); 217 this.maxSize = newMaxSize; 218 } finally { 219 resizeLock.writeLock().unlock(); 220 } 221 } 222 } 223 224 protected abstract boolean doRemove(ManagedConnectionInfo mci); 225 226 protected abstract void doAdd(ManagedConnectionInfo mci); 227 228 protected abstract Object getPool(); 229 230 231 static final class ResizeInfo { 232 233 private final int newMinSize; 234 private final int shrinkNow; 235 private final int shrinkLater; 236 private final int transferCheckedOut; 237 238 ResizeInfo(final int oldMinSize, final int oldPermitsAvailable, final int oldConnectionCount, final int newMaxSize) { 239 final int checkedOut = oldConnectionCount - oldPermitsAvailable; 240 int shrinkLater = checkedOut - newMaxSize; 241 if (shrinkLater < 0) { 242 shrinkLater = 0; 243 } 244 this.shrinkLater = shrinkLater; 245 int shrinkNow = oldConnectionCount - newMaxSize - shrinkLater; 246 if (shrinkNow < 0) { 247 shrinkNow = 0; 248 } 249 this.shrinkNow = shrinkNow; 250 if (newMaxSize >= oldMinSize) { 251 newMinSize = oldMinSize; 252 } else { 253 newMinSize = newMaxSize; 254 } 255 this.transferCheckedOut = checkedOut - shrinkLater; 256 } 257 258 public int getNewMinSize() { 259 return newMinSize; 260 } 261 262 public int getShrinkNow() { 263 return shrinkNow; 264 } 265 266 public int getShrinkLater() { 267 return shrinkLater; 268 } 269 270 public int getTransferCheckedOut() { 271 return transferCheckedOut; 272 } 273 274 275 } 276 277 protected abstract void transferConnections(int maxSize, int shrinkNow); 278 279 public abstract int getIdleConnectionCount(); 280 281 public int getConnectionCount() { 282 return connectionCount; 283 } 284 285 public int getPartitionMinSize() { 286 return minSize; 287 } 288 289 public void setPartitionMinSize(int minSize) { 290 this.minSize = minSize; 291 } 292 293 public int getBlockingTimeoutMilliseconds() { 294 return blockingTimeoutMilliseconds; 295 } 296 297 public void setBlockingTimeoutMilliseconds(int blockingTimeoutMilliseconds) { 298 if (blockingTimeoutMilliseconds < 0) { 299 throw new IllegalArgumentException("blockingTimeoutMilliseconds must be positive or 0, not " + blockingTimeoutMilliseconds); 300 } 301 if (blockingTimeoutMilliseconds == 0) { 302 this.blockingTimeoutMilliseconds = Integer.MAX_VALUE; 303 } else { 304 this.blockingTimeoutMilliseconds = blockingTimeoutMilliseconds; 305 } 306 } 307 308 public int getIdleTimeoutMinutes() { 309 return (int) idleTimeoutMilliseconds / (1000 * 60); 310 } 311 312 public void setIdleTimeoutMinutes(int idleTimeoutMinutes) { 313 if (idleTimeoutMinutes < 0) { 314 throw new IllegalArgumentException("idleTimeoutMinutes must be positive or 0, not " + idleTimeoutMinutes); 315 } 316 if (idleReleaser != null) { 317 idleReleaser.cancel(); 318 } 319 if (idleTimeoutMinutes > 0) { 320 this.idleTimeoutMilliseconds = idleTimeoutMinutes * 60 * 1000; 321 idleReleaser = new IdleReleaser(this); 322 timer.schedule(idleReleaser, this.idleTimeoutMilliseconds, this.idleTimeoutMilliseconds); 323 } 324 } 325 326 protected abstract void getExpiredManagedConnectionInfos(long threshold, List<ManagedConnectionInfo> killList); 327 328 protected boolean addToPool(ManagedConnectionInfo mci) { 329 boolean added; 330 synchronized (getPool()) { 331 connectionCount++; 332 added = getPartitionMaxSize() > getIdleConnectionCount(); 333 if (added) { 334 doAdd(mci); 335 } 336 } 337 return added; 338 } 339 340 // static class to permit chain of strong references from preventing ClassLoaders 341 // from being GC'ed. 342 private static class IdleReleaser extends TimerTask { 343 private AbstractSinglePoolConnectionInterceptor parent; 344 345 private IdleReleaser(AbstractSinglePoolConnectionInterceptor parent) { 346 this.parent = parent; 347 } 348 349 public boolean cancel() { 350 this.parent = null; 351 return super.cancel(); 352 } 353 354 public void run() { 355 // protect against interceptor being set to null mid-execution 356 AbstractSinglePoolConnectionInterceptor interceptor = parent; 357 if (interceptor == null) 358 return; 359 360 interceptor.resizeLock.readLock().lock(); 361 try { 362 long threshold = System.currentTimeMillis() - interceptor.idleTimeoutMilliseconds; 363 List<ManagedConnectionInfo> killList = new ArrayList<ManagedConnectionInfo>(interceptor.getPartitionMaxSize()); 364 interceptor.getExpiredManagedConnectionInfos(threshold, killList); 365 for (ManagedConnectionInfo managedConnectionInfo : killList) { 366 ConnectionInfo killInfo = new ConnectionInfo(managedConnectionInfo); 367 interceptor.internalReturn(killInfo, ConnectionReturnAction.DESTROY); 368 } 369 interceptor.permits.release(killList.size()); 370 } catch (Throwable t) { 371 log.error("Error occurred during execution of ExpirationMonitor TimerTask", t); 372 } finally { 373 interceptor.resizeLock.readLock().unlock(); 374 } 375 } 376 377 } 378 379 // Currently only a short-lived (10 millisecond) task. 380 // So, FillTask, unlike IdleReleaser, shouldn't cause GC problems. 381 protected class FillTask extends TimerTask { 382 private final ManagedConnectionFactory managedConnectionFactory; 383 private final Subject subject; 384 private final ConnectionRequestInfo cri; 385 386 public FillTask(ConnectionInfo connectionInfo) { 387 managedConnectionFactory = connectionInfo.getManagedConnectionInfo().getManagedConnectionFactory(); 388 subject = connectionInfo.getManagedConnectionInfo().getSubject(); 389 cri = connectionInfo.getManagedConnectionInfo().getConnectionRequestInfo(); 390 } 391 392 public void run() { 393 resizeLock.readLock().lock(); 394 try { 395 while (connectionCount < minSize) { 396 ManagedConnectionInfo mci = new ManagedConnectionInfo(managedConnectionFactory, cri); 397 mci.setSubject(subject); 398 ConnectionInfo ci = new ConnectionInfo(mci); 399 try { 400 next.getConnection(ci); 401 } catch (ResourceException e) { 402 return; 403 } 404 boolean added = addToPool(mci); 405 if (!added) { 406 internalReturn(ci, ConnectionReturnAction.DESTROY); 407 return; 408 } 409 } 410 } catch (Throwable t) { 411 log.error("FillTask encountered error in run method", t); 412 } finally { 413 resizeLock.readLock().unlock(); 414 } 415 } 416 417 } 418 }