diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-12-13 11:05:28 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-12-13 11:05:28 +0000 |
commit | ceb9cce91c42e69644bca614647099a5d5f4a3c7 (patch) | |
tree | 6250bdfdeb9afe03e7ecb06d277c96d8a062acea /java | |
parent | 67b2d60cd33168e7c2dbcf3f4abd95a6efa9d4b6 (diff) | |
download | qpid-python-ceb9cce91c42e69644bca614647099a5d5f4a3c7.tar.gz |
QPID-172
RejectedExecutionException.
In fireEvent
added additional checks :_poolReference.getPool() != null && !_poolReference.getPool().isShutdown()
to if (job.activate())
As active jobs were being put on a pool that was shutdown.
Included a test to check that the RejectedExecutionException doesn't occur.
(add ignore for example/target)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486596 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 335 insertions, 1 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index ba614a26c2..2ace2b8348 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -58,7 +58,7 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH Job job = getJobForSession(session); job.acquire(); //prevents this job being removed from _jobs job.add(event); - if (job.activate()) + if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) { _poolReference.getPool().execute(job); } diff --git a/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java new file mode 100644 index 0000000000..af1a28e787 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.pool; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.qpid.session.TestSession; + +import java.util.concurrent.RejectedExecutionException; + +public class PoolingFilterTest extends TestCase +{ + private PoolingFilter _pool; + ReferenceCountingExecutorService _executorService; + + public void setUp() + { + //Create Pool + _executorService = ReferenceCountingExecutorService.getInstance(); + _executorService.acquireExecutorService(); + _pool = new PoolingFilter(_executorService, PoolingFilter.WRITE_EVENTS, + "AsynchronousWriteFilter"); + + } + + public void testRejectedExecution() throws Exception + { + _pool.filterWrite(null, new TestSession(), null); + + //Shutdown the pool + _executorService.getPool().shutdownNow(); + + try + { + //prior to fix for QPID-172 this would throw RejectedExecutionException + _pool.filterWrite(null, new TestSession(), null); + } + catch (RejectedExecutionException rje) + { + Assert.fail("RejectedExecutionException should not occur after pool has shutdown:" + rje); + } + } +} diff --git a/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/java/common/src/test/java/org/apache/qpid/session/TestSession.java new file mode 100644 index 0000000000..f10d55e9d0 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/session/TestSession.java @@ -0,0 +1,273 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.session; + +import org.apache.mina.common.*; + +import java.net.SocketAddress; +import java.util.Set; + +public class TestSession implements IoSession +{ + public TestSession() + { + } + + public IoService getService() + { + return null; //TODO + } + + public IoServiceConfig getServiceConfig() + { + return null; //TODO + } + + public IoHandler getHandler() + { + return null; //TODO + } + + public IoSessionConfig getConfig() + { + return null; //TODO + } + + public IoFilterChain getFilterChain() + { + return null; //TODO + } + + public WriteFuture write(Object message) + { + return null; //TODO + } + + public CloseFuture close() + { + return null; //TODO + } + + public Object getAttachment() + { + return null; //TODO + } + + public Object setAttachment(Object attachment) + { + return null; //TODO + } + + public Object getAttribute(String key) + { + return null; //TODO + } + + public Object setAttribute(String key, Object value) + { + return null; //TODO + } + + public Object setAttribute(String key) + { + return null; //TODO + } + + public Object removeAttribute(String key) + { + return null; //TODO + } + + public boolean containsAttribute(String key) + { + return false; //TODO + } + + public Set getAttributeKeys() + { + return null; //TODO + } + + public TransportType getTransportType() + { + return null; //TODO + } + + public boolean isConnected() + { + return false; //TODO + } + + public boolean isClosing() + { + return false; //TODO + } + + public CloseFuture getCloseFuture() + { + return null; //TODO + } + + public SocketAddress getRemoteAddress() + { + return null; //TODO + } + + public SocketAddress getLocalAddress() + { + return null; //TODO + } + + public SocketAddress getServiceAddress() + { + return null; //TODO + } + + public int getIdleTime(IdleStatus status) + { + return 0; //TODO + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //TODO + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //TODO + } + + public int getWriteTimeout() + { + return 0; //TODO + } + + public long getWriteTimeoutInMillis() + { + return 0; //TODO + } + + public void setWriteTimeout(int writeTimeout) + { + //TODO + } + + public TrafficMask getTrafficMask() + { + return null; //TODO + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //TODO + } + + public void suspendRead() + { + //TODO + } + + public void suspendWrite() + { + //TODO + } + + public void resumeRead() + { + //TODO + } + + public void resumeWrite() + { + //TODO + } + + public long getReadBytes() + { + return 0; //TODO + } + + public long getWrittenBytes() + { + return 0; //TODO + } + + public long getReadMessages() + { + return 0; + } + + public long getWrittenMessages() + { + return 0; + } + + public long getWrittenWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //TODO + } + + public long getLastIoTime() + { + return 0; //TODO + } + + public long getLastReadTime() + { + return 0; //TODO + } + + public long getLastWriteTime() + { + return 0; //TODO + } + + public boolean isIdle(IdleStatus status) + { + return false; //TODO + } + + public int getIdleCount(IdleStatus status) + { + return 0; //TODO + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //TODO + } +} |