summaryrefslogtreecommitdiff
path: root/common/src/org/apache/qpid/pool/PoolingFilter.java
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/org/apache/qpid/pool/PoolingFilter.java')
-rw-r--r--common/src/org/apache/qpid/pool/PoolingFilter.java186
1 files changed, 0 insertions, 186 deletions
diff --git a/common/src/org/apache/qpid/pool/PoolingFilter.java b/common/src/org/apache/qpid/pool/PoolingFilter.java
deleted file mode 100644
index ba614a26c2..0000000000
--- a/common/src/org/apache/qpid/pool/PoolingFilter.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pool;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoSession;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
-{
- private static final Logger _logger = Logger.getLogger(PoolingFilter.class);
- public static final Set<EventType> READ_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.RECEIVED));
- public static final Set<EventType> WRITE_EVENTS = new HashSet<EventType>(Arrays.asList(EventType.WRITE));
-
- private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>();
- private final ReferenceCountingExecutorService _poolReference;
- private final Set<EventType> _asyncTypes;
-
- private final String _name;
- private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
-
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, Set<EventType> asyncTypes, String name)
- {
- _poolReference = refCountingPool;
- _asyncTypes = asyncTypes;
- _name = name;
- }
-
- private void fireEvent(IoSession session, Event event)
- {
- if (_asyncTypes.contains(event.getType()))
- {
- Job job = getJobForSession(session);
- job.acquire(); //prevents this job being removed from _jobs
- job.add(event);
- if (job.activate())
- {
- _poolReference.getPool().execute(job);
- }
- }
- else
- {
- event.process(session);
- }
- }
-
- private Job getJobForSession(IoSession session)
- {
- Job job = _jobs.get(session);
- return job == null ? createJobForSession(session) : job;
- }
-
- private Job createJobForSession(IoSession session)
- {
- return addJobForSession(session, new Job(session, this, _maxEvents));
- }
-
- private Job addJobForSession(IoSession session, Job job)
- {
- //atomic so ensures all threads agree on the same job
- Job existing = _jobs.putIfAbsent(session, job);
- return existing == null ? job : existing;
- }
-
- //Job.JobCompletionHandler
- public void completed(IoSession session, Job job)
- {
- if (job.isComplete())
- {
- job.release();
- if (!job.isReferenced())
- {
- _jobs.remove(session);
- }
- }
- else
- {
- if (job.activate())
- {
- _poolReference.getPool().execute(job);
- }
- }
- }
-
- //IoFilter methods that are processed by threads on the pool
-
- public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception
- {
- fireEvent(session, new Event(nextFilter, EventType.OPENED, null));
- }
-
- public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception
- {
- fireEvent(session, new Event(nextFilter, EventType.CLOSED, null));
- }
-
- public void sessionIdle(NextFilter nextFilter, IoSession session,
- IdleStatus status) throws Exception
- {
- fireEvent(session, new Event(nextFilter, EventType.IDLE, status));
- }
-
- public void exceptionCaught(NextFilter nextFilter, IoSession session,
- Throwable cause) throws Exception
- {
- fireEvent(session, new Event(nextFilter, EventType.EXCEPTION, cause));
- }
-
- public void messageReceived(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
- {
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.RECEIVED, message));
- }
-
- public void messageSent(NextFilter nextFilter, IoSession session,
- Object message) throws Exception
- {
- //ByteBufferUtil.acquireIfPossible( message );
- fireEvent(session, new Event(nextFilter, EventType.SENT, message));
- }
-
- public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
- {
- fireEvent(session, new Event(nextFilter, EventType.WRITE, writeRequest));
- }
-
- //IoFilter methods that are processed on current thread (NOT on pooled thread)
-
- public void filterClose(NextFilter nextFilter, IoSession session) throws Exception
- {
- nextFilter.filterClose(session);
- }
-
- public void sessionCreated(NextFilter nextFilter, IoSession session)
- {
- nextFilter.sessionCreated(session);
- }
-
- public String toString()
- {
- return _name;
- }
-
- // LifeCycle methods
-
- public void init()
- {
- _logger.info("Init called on PoolingFilter " + toString());
- // called when the filter is initialised in the chain. If the reference count is
- // zero this acquire will initialise the pool
- _poolReference.acquireExecutorService();
- }
-
- public void destroy()
- {
- _logger.info("Destroy called on PoolingFilter " + toString());
- // when the reference count gets to zero we release the executor service
- _poolReference.releaseExecutorService();
- }
-}