diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 28 |
1 files changed, 27 insertions, 1 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index c6e370206e..e963fb23ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -58,6 +58,12 @@ import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.logging.actors.AMQPChannelActor; +import org.apache.qpid.server.logging.actors.CurrentActor; public class AMQChannel { @@ -112,13 +118,22 @@ public class AMQChannel // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - private boolean _closing; + private boolean _closing; + + private LogActor _actor; + private LogSubject _logSubject; public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { _session = session; _channelId = channelId; + + _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); + _logSubject = new ChannelLogSubject(this); + + _actor.message(ChannelMessages.CHN_1001()); + _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); @@ -386,6 +401,8 @@ public class AMQChannel private void setClosing(boolean closing) { _closing = closing; + + CurrentActor.get().message(_logSubject, ChannelMessages.CHN_1003()); } private void unsubscribeAllConsumers() throws AMQException @@ -789,6 +806,8 @@ public class AMQChannel boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { + _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started")); + if (wasSuspended) { // may need to deliver queued messages @@ -891,6 +910,8 @@ public class AMQChannel public void setCredit(final long prefetchSize, final int prefetchCount) { + //fixme +// _actor.message(ChannelMessages.CHN_100X(prefetchSize, prefetchCount); _creditManager.setCreditLimits(prefetchSize, prefetchCount); } @@ -942,4 +963,9 @@ public class AMQChannel { return _recordDeliveryMethod; } + + public LogActor getLogActor() + { + return _actor; + } } |