summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java28
1 files changed, 27 insertions, 1 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index c6e370206e..e963fb23ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -58,6 +58,12 @@ import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.logging.actors.AMQPChannelActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
public class AMQChannel
{
@@ -112,13 +118,22 @@ public class AMQChannel
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- private boolean _closing;
+ private boolean _closing;
+
+ private LogActor _actor;
+ private LogSubject _logSubject;
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
_session = session;
_channelId = channelId;
+
+ _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
+ _logSubject = new ChannelLogSubject(this);
+
+ _actor.message(ChannelMessages.CHN_1001());
+
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
@@ -386,6 +401,8 @@ public class AMQChannel
private void setClosing(boolean closing)
{
_closing = closing;
+
+ CurrentActor.get().message(_logSubject, ChannelMessages.CHN_1003());
}
private void unsubscribeAllConsumers() throws AMQException
@@ -789,6 +806,8 @@ public class AMQChannel
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
+ _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+
if (wasSuspended)
{
// may need to deliver queued messages
@@ -891,6 +910,8 @@ public class AMQChannel
public void setCredit(final long prefetchSize, final int prefetchCount)
{
+ //fixme
+// _actor.message(ChannelMessages.CHN_100X(prefetchSize, prefetchCount);
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
@@ -942,4 +963,9 @@ public class AMQChannel
{
return _recordDeliveryMethod;
}
+
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
}