summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java7
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java93
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java281
8 files changed, 426 insertions, 14 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index aa390d6c26..2a46ee53b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -57,6 +57,12 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.logging.actors.AMQPChannelActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
public class AMQChannel
{
@@ -111,13 +117,22 @@ public class AMQChannel
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- private boolean _closing;
+ private boolean _closing;
+
+ private LogActor _actor;
+ private LogSubject _logSubject;
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
_session = session;
_channelId = channelId;
+
+ _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
+ _logSubject = new ChannelLogSubject(this);
+
+ _actor.message(ChannelMessages.CHN_1001());
+
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
@@ -371,6 +386,8 @@ public class AMQChannel
private void setClosing(boolean closing)
{
_closing = closing;
+
+ CurrentActor.get().message(_logSubject, ChannelMessages.CHN_1003());
}
private void unsubscribeAllConsumers() throws AMQException
@@ -774,6 +791,8 @@ public class AMQChannel
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
+ _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started"));
+
if (wasSuspended)
{
// may need to deliver queued messages
@@ -865,6 +884,8 @@ public class AMQChannel
public void setCredit(final long prefetchSize, final int prefetchCount)
{
+ //fixme
+// _actor.message(ChannelMessages.CHN_100X(prefetchSize, prefetchCount);
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
@@ -906,4 +927,9 @@ public class AMQChannel
{
return _recordDeliveryMethod;
}
+
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
index d9f95ecb8e..24df17683f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
@@ -239,8 +239,8 @@ CON-1001 = Open[ : Client ID : {0}][ : Protocol Version : {1}]
CON-1002 = Close
#Channel
-# 0 - count
-CHN-1001 = Create : Prefetch {0, number}
+CHN-1001 = Create
+# : Prefetch Size {0,number} : Count {1,number}
# 0 - flow
CHN-1002 = Flow {0}
CHN-1003 = Close
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 1da5b1c26e..e8ea56bafd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -205,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _sessionID;
}
+ public LogActor getLogActor()
+ {
+ return _actor;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -230,7 +236,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
- CurrentActor.set(_actor);
+ //Look up the Channel's Actor and set that as the current actor
+ // If that is not available then we can use the ConnectionActor
+ // that is associated with this AMQMPSession.
+ LogActor channelActor = null;
+ if (_channelMap.get(channelId) != null)
+ {
+ channelActor = _channelMap.get(channelId).getLogActor();
+ }
+ CurrentActor.set(channelActor == null ? _actor : channelActor);
+
try
{
if (_logger.isDebugEnabled())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index f721730d9c..fff406bb3d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -28,6 +28,8 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -38,6 +40,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
long getSessionID();
+ LogActor getLogActor();
+
public static final class ProtocolSessionIdentifier
{
private final Object _sessionIdentifier;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
index 2a37eae728..b4dd3da2e6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
@@ -27,15 +27,12 @@ public class ChannelMessagesTest extends AbstractTestMessages
{
public void testMessage1001()
{
- Integer prefetch = 12345;
-
- _logMessage = ChannelMessages.CHN_1001(prefetch);
+ _logMessage = ChannelMessages.CHN_1001();
List<Object> log = performLog();
// We use the MessageFormat here as that is what the ChannelMessage
// will do, this makes the resulting value 12,345
- String[] expected = {"Create", "Prefetch",
- MessageFormat.format("{0, number}", prefetch)};
+ String[] expected = {"Create"};
validateLogMessage(log, "CHN-1001", expected);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index b9dcd972b1..c301969ae5 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -62,6 +63,11 @@ public class MockProtocolSession implements AMQProtocolSession
return _sessionID;
}
+ public LogActor getLogActor()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java b/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
index 3437fea236..c5015760a5 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
@@ -22,8 +22,6 @@ package org.apache.qpid.server.logging;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.LogMonitor;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
import java.io.IOException;
@@ -31,7 +29,6 @@ public class AbstractTestLogging extends QpidTestCase
{
protected LogMonitor _monitor;
-
@Override
public void setUp() throws Exception
{
@@ -41,21 +38,107 @@ public class AbstractTestLogging extends QpidTestCase
/**
* assert that the requested log message has not occured
+ *
* @param log
+ *
* @throws IOException
*/
public void assertLoggingNotYetOccured(String log) throws IOException
{
// Ensure the alert has not occured yet
- assertEquals("Message has already occured:"+log, 0,
+ assertEquals("Message has already occured:" + log, 0,
_monitor.findMatches(log).size());
}
+
+ protected void validateMessageID(String id, String log)
+ {
+ assertEquals("Incorrect CHN message",id, getMessageID(log));
+ }
+
+ protected String getMessageID(String log)
+ {
+ String message = fromMessage(log);
+
+ return message.substring(0, message.indexOf(" "));
+ }
+
+ /**
+ * Return the first channel id from the log string
+ * ' ch;X' if there is no channel id return -1.
+ *
+ * @param log the log string to search.
+ *
+ * @return channel id or -1 if no channel id exists.
+ */
+ protected int getChannelID(String log)
+ {
+ int start = log.indexOf("ch:") + 3;
+
+ // If we do a check for ] as the boundary we will get cases where log
+ // is presented with the bounding. If we don't match a ] then we can use
+ // the end of the string as the boundary.
+ int end = log.indexOf("]", start);
+ if (end == -1)
+ {
+ end = log.length();
+ }
+
+ try
+ {
+ return Integer.parseInt(log.substring(start, end));
+ }
+ catch (Exception e)
+ {
+ return -1;
+ }
+ }
+
+ protected String fromMessage(String log)
+ {
+ int messageStart = log.indexOf("MESSAGE");
+
+ int startSubject = log.indexOf("]", messageStart) + 1;
+ int start = log.indexOf("]", startSubject) + 1;
+
+ // If we don't have a subject then the second indexOf will return 0
+ // in which case we can use the end of the actor as the index.
+ if (start == 0)
+ {
+ start = startSubject;
+ }
+
+ return log.substring(start).trim();
+ }
+
+ protected String fromSubject(String log)
+ {
+ int start = log.indexOf("[") + 1;
+ // Take the second index
+ start = log.indexOf("[", start) + 1;
+
+ // There may not be a subject so in that case return nothing.
+ if (start == -1)
+ {
+ return "";
+ }
+
+ int end = log.indexOf("]", start);
+ return log.substring(start, end);
+ }
+
+ protected String fromActor(String log)
+ {
+ int start = log.indexOf("[") + 1;
+ int end = log.indexOf("]", start);
+ return log.substring(start, end).trim();
+ }
+
protected int extractConnectionID(String log)
{
int conIDStart = log.indexOf("con:") + 4;
int conIDEnd = log.indexOf("(", conIDStart);
return Integer.parseInt(log.substring(conIDStart, conIDEnd));
}
-
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
new file mode 100644
index 0000000000..9aa3799ca5
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+import java.util.List;
+
+public class ChannelLoggingTest extends AbstractTestLogging
+{
+ private static final String CHANNEL_PREFIX = "CHN-";
+
+ public void setUp() throws Exception
+ {
+ // set QPID_WORK to be [QPID_WORK|io.tmpdir]/<testName>
+ setSystemProperty("QPID_WORK",
+ System.getProperty("QPID_WORK",
+ System.getProperty("java.io.tmpdir"))
+ + File.separator + getName());
+
+ //Start the broker
+ super.setUp();
+ }
+
+ /**
+ * Description:
+ * When a new Channel (JMS Session) is created this will be logged as a CHN-1001 Create message. The messages will contain the prefetch details about this new Channel.
+ * Input:
+ *
+ * 1. Running Broker
+ * 2. New JMS Session/Channel creation
+ *
+ * Output:
+ * <date> CHN-1001 : Create : Prefetch <count>
+ *
+ * Validation Steps:
+ * 3. The CHN ID is correct
+ * 4. The prefetch value matches that defined by the requesting client.
+ *
+ * @throws Exception - if an error occurs
+ */
+ public void testChannelCreate() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Test that calling session.close gives us the expected output
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // Validation
+
+ assertEquals("CHN messages not logged", 1, results.size());
+
+ String log = results.get(0);
+ // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create
+ //1 & 2
+ validateMessageID("CHN-1001", log);
+ assertEquals("Incorrect Channel in actor", 1, getChannelID(fromActor(log)));
+
+ connection.close();
+ }
+
+ /**
+ * Description:
+ * The Java Broker implements consumer flow control for all ack modes except
+ * No-Ack. When a client connects the session's flow is initially set to
+ * Stopped. Verify this message appears
+ *
+ * Input:
+ * 1. Running broker
+ * 2. Create consumer
+ * Output:
+ *
+ * <date> CHN-1002 : Flow Stopped
+ *
+ * Validation Steps:
+ * 4. The CHN ID is correct
+ *
+ * @throws Exception - if an error occurs
+ */
+
+ public void testChannelStartsFlowStopped() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Create a session to fill up
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = (Queue) getInitialContext().lookup(QUEUE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connection.start();
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // The last channel message should be:
+ //
+ // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow Off
+
+ // Verify
+ int resultSize = results.size();
+ String log = results.get(resultSize - 1);
+
+ validateMessageID("CHN-1002", log);
+ assertTrue("Message should be Flow Stopped", fromMessage(log).endsWith("Flow Stopped"));
+
+ }
+
+ /**
+ * Description:
+ * The Java Broker implements consumer flow control for all ack modes except
+ * No-Ack. When the client first attempts to receive a message then the Flow
+ * status of the Session is set to Started.
+ *
+ * Input:
+ * 1. Running broker
+ * 2. Create a consumer
+ * 3. Attempt to receive a message
+ * Output:
+ *
+ * <date> CHN-1002 : Flow Started
+ *
+ * Validation Steps:
+ * 4. The CHN ID is correct
+ *
+ * @throws Exception - if an error occurs
+ */
+
+ public void testChannelStartConsumerFlowStarted() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Create a session to fill up
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = (Queue) getInitialContext().lookup(QUEUE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connection.start();
+
+ //Call receive to send the Flow On message
+ consumer.receiveNoWait();
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // The last two channel messages should be:
+ //
+ // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On
+
+ // Verify
+
+ int resultSize = results.size();
+ String log = results.get(resultSize - 1);
+
+ validateMessageID("CHN-1002", log);
+ assertTrue("Message should be Flow Started", fromMessage(log).endsWith("Flow Started"));
+
+ }
+
+ /**
+ * Description:
+ * When the client gracefully closes the Connection then a CHN-1003 Close
+ * message will be issued. This must be the last message logged for this
+ * Channel.
+ * Input:
+ * 1. Running Broker
+ * 2. Connected Client
+ * 3. Client then requests that the Connection is closed
+ * Output:
+ *
+ * <date> CHN-1003 : Close
+ *
+ * Validation Steps:
+ * 4. The MST ID is correct
+ * 5. This must be the last message logged for this Channel.
+ *
+ * @throws Exception - if an error occurs
+ */
+ public void testChannelCloseViaConnectionClose() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Create a session
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Close the connection to verify the created session closing is logged.
+ connection.close();
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // The last two channel messages should be:
+ //
+ // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On
+
+ // Verify
+
+ int resultSize = results.size();
+ String log = results.get(resultSize - 1);
+
+ validateMessageID("CHN-1003", log);
+ assertTrue("Message should be Close:" + fromMessage(log), fromMessage(log).endsWith("Close"));
+ assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromActor(log)));
+ assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromSubject(log)));
+ }
+
+ /**
+ * Description:
+ * When the client gracefully closes the Connection then a CHN-1003 Close
+ * message will be issued. This must be the last message logged for this
+ * Channel.
+ * Input:
+ * 1. Running Broker
+ * 2. Connected Client
+ * 3. Client then requests that the Channel is closed
+ * Output:
+ *
+ * <date> CHN-1003 : Close
+ *
+ * Validation Steps:
+ * 4. The MST ID is correct
+ * 5. This must be the last message logged for this Channel.
+ *
+ * @throws Exception - if an error occurs
+ */
+ public void testChannelCloseViaChannelClose() throws Exception
+ {
+ assertLoggingNotYetOccured(CHANNEL_PREFIX);
+
+ Connection connection = getConnection();
+
+ // Create a session and then close it
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close();
+
+ List<String> results = _monitor.findMatches(CHANNEL_PREFIX);
+
+ // The last two channel messages should be:
+ //
+ // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On
+
+ // Verify
+
+ int resultSize = results.size();
+ String log = results.get(resultSize - 1);
+
+ validateMessageID("CHN-1003", log);
+ assertTrue("Message should be Close:" + fromMessage(log), fromMessage(log).endsWith("Close"));
+ assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromActor(log)));
+ assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromSubject(log)));
+ }
+
+} \ No newline at end of file