diff options
Diffstat (limited to 'java')
8 files changed, 426 insertions, 14 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index aa390d6c26..2a46ee53b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -57,6 +57,12 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.logging.actors.AMQPChannelActor; +import org.apache.qpid.server.logging.actors.CurrentActor; public class AMQChannel { @@ -111,13 +117,22 @@ public class AMQChannel // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - private boolean _closing; + private boolean _closing; + + private LogActor _actor; + private LogSubject _logSubject; public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { _session = session; _channelId = channelId; + + _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); + _logSubject = new ChannelLogSubject(this); + + _actor.message(ChannelMessages.CHN_1001()); + _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); @@ -371,6 +386,8 @@ public class AMQChannel private void setClosing(boolean closing) { _closing = closing; + + CurrentActor.get().message(_logSubject, ChannelMessages.CHN_1003()); } private void unsubscribeAllConsumers() throws AMQException @@ -774,6 +791,8 @@ public class AMQChannel boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { + _actor.message(_logSubject, ChannelMessages.CHN_1002(suspended ? "Stopped" : "Started")); + if (wasSuspended) { // may need to deliver queued messages @@ -865,6 +884,8 @@ public class AMQChannel public void setCredit(final long prefetchSize, final int prefetchCount) { + //fixme +// _actor.message(ChannelMessages.CHN_100X(prefetchSize, prefetchCount); _creditManager.setCreditLimits(prefetchSize, prefetchCount); } @@ -906,4 +927,9 @@ public class AMQChannel { return _recordDeliveryMethod; } + + public LogActor getLogActor() + { + return _actor; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index d9f95ecb8e..24df17683f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -239,8 +239,8 @@ CON-1001 = Open[ : Client ID : {0}][ : Protocol Version : {1}] CON-1002 = Close #Channel -# 0 - count -CHN-1001 = Create : Prefetch {0, number} +CHN-1001 = Create +# : Prefetch Size {0,number} : Count {1,number} # 0 - flow CHN-1002 = Flow {0} CHN-1003 = Close diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 1da5b1c26e..e8ea56bafd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -205,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _sessionID; } + public LogActor getLogActor() + { + return _actor; + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -230,7 +236,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); - CurrentActor.set(_actor); + //Look up the Channel's Actor and set that as the current actor + // If that is not available then we can use the ConnectionActor + // that is associated with this AMQMPSession. + LogActor channelActor = null; + if (_channelMap.get(channelId) != null) + { + channelActor = _channelMap.get(channelId).getLogActor(); + } + CurrentActor.set(channelActor == null ? _actor : channelActor); + try { if (_logger.isDebugEnabled()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index f721730d9c..fff406bb3d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -28,6 +28,8 @@ import org.apache.qpid.framing.*; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -38,6 +40,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession { long getSessionID(); + LogActor getLogActor(); + public static final class ProtocolSessionIdentifier { private final Object _sessionIdentifier; diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java index 2a37eae728..b4dd3da2e6 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java @@ -27,15 +27,12 @@ public class ChannelMessagesTest extends AbstractTestMessages { public void testMessage1001() { - Integer prefetch = 12345; - - _logMessage = ChannelMessages.CHN_1001(prefetch); + _logMessage = ChannelMessages.CHN_1001(); List<Object> log = performLog(); // We use the MessageFormat here as that is what the ChannelMessage // will do, this makes the resulting value 12,345 - String[] expected = {"Create", "Prefetch", - MessageFormat.format("{0, number}", prefetch)}; + String[] expected = {"Create"}; validateLogMessage(log, "CHN-1001", expected); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index b9dcd972b1..c301969ae5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -62,6 +63,11 @@ public class MockProtocolSession implements AMQProtocolSession return _sessionID; } + public LogActor getLogActor() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { } diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java b/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java index 3437fea236..c5015760a5 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.logging; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.LogMonitor; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject; import java.io.IOException; @@ -31,7 +29,6 @@ public class AbstractTestLogging extends QpidTestCase { protected LogMonitor _monitor; - @Override public void setUp() throws Exception { @@ -41,21 +38,107 @@ public class AbstractTestLogging extends QpidTestCase /** * assert that the requested log message has not occured + * * @param log + * * @throws IOException */ public void assertLoggingNotYetOccured(String log) throws IOException { // Ensure the alert has not occured yet - assertEquals("Message has already occured:"+log, 0, + assertEquals("Message has already occured:" + log, 0, _monitor.findMatches(log).size()); } + + protected void validateMessageID(String id, String log) + { + assertEquals("Incorrect CHN message",id, getMessageID(log)); + } + + protected String getMessageID(String log) + { + String message = fromMessage(log); + + return message.substring(0, message.indexOf(" ")); + } + + /** + * Return the first channel id from the log string + * ' ch;X' if there is no channel id return -1. + * + * @param log the log string to search. + * + * @return channel id or -1 if no channel id exists. + */ + protected int getChannelID(String log) + { + int start = log.indexOf("ch:") + 3; + + // If we do a check for ] as the boundary we will get cases where log + // is presented with the bounding. If we don't match a ] then we can use + // the end of the string as the boundary. + int end = log.indexOf("]", start); + if (end == -1) + { + end = log.length(); + } + + try + { + return Integer.parseInt(log.substring(start, end)); + } + catch (Exception e) + { + return -1; + } + } + + protected String fromMessage(String log) + { + int messageStart = log.indexOf("MESSAGE"); + + int startSubject = log.indexOf("]", messageStart) + 1; + int start = log.indexOf("]", startSubject) + 1; + + // If we don't have a subject then the second indexOf will return 0 + // in which case we can use the end of the actor as the index. + if (start == 0) + { + start = startSubject; + } + + return log.substring(start).trim(); + } + + protected String fromSubject(String log) + { + int start = log.indexOf("[") + 1; + // Take the second index + start = log.indexOf("[", start) + 1; + + // There may not be a subject so in that case return nothing. + if (start == -1) + { + return ""; + } + + int end = log.indexOf("]", start); + return log.substring(start, end); + } + + protected String fromActor(String log) + { + int start = log.indexOf("[") + 1; + int end = log.indexOf("]", start); + return log.substring(start, end).trim(); + } + protected int extractConnectionID(String log) { int conIDStart = log.indexOf("con:") + 4; int conIDEnd = log.indexOf("(", conIDStart); return Integer.parseInt(log.substring(conIDStart, conIDEnd)); } - + } diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java new file mode 100644 index 0000000000..9aa3799ca5 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.File; +import java.util.List; + +public class ChannelLoggingTest extends AbstractTestLogging +{ + private static final String CHANNEL_PREFIX = "CHN-"; + + public void setUp() throws Exception + { + // set QPID_WORK to be [QPID_WORK|io.tmpdir]/<testName> + setSystemProperty("QPID_WORK", + System.getProperty("QPID_WORK", + System.getProperty("java.io.tmpdir")) + + File.separator + getName()); + + //Start the broker + super.setUp(); + } + + /** + * Description: + * When a new Channel (JMS Session) is created this will be logged as a CHN-1001 Create message. The messages will contain the prefetch details about this new Channel. + * Input: + * + * 1. Running Broker + * 2. New JMS Session/Channel creation + * + * Output: + * <date> CHN-1001 : Create : Prefetch <count> + * + * Validation Steps: + * 3. The CHN ID is correct + * 4. The prefetch value matches that defined by the requesting client. + * + * @throws Exception - if an error occurs + */ + public void testChannelCreate() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Test that calling session.close gives us the expected output + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + List<String> results = _monitor.findMatches(CHANNEL_PREFIX); + + // Validation + + assertEquals("CHN messages not logged", 1, results.size()); + + String log = results.get(0); + // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create + //1 & 2 + validateMessageID("CHN-1001", log); + assertEquals("Incorrect Channel in actor", 1, getChannelID(fromActor(log))); + + connection.close(); + } + + /** + * Description: + * The Java Broker implements consumer flow control for all ack modes except + * No-Ack. When a client connects the session's flow is initially set to + * Stopped. Verify this message appears + * + * Input: + * 1. Running broker + * 2. Create consumer + * Output: + * + * <date> CHN-1002 : Flow Stopped + * + * Validation Steps: + * 4. The CHN ID is correct + * + * @throws Exception - if an error occurs + */ + + public void testChannelStartsFlowStopped() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session to fill up + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = (Queue) getInitialContext().lookup(QUEUE); + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + List<String> results = _monitor.findMatches(CHANNEL_PREFIX); + + // The last channel message should be: + // + // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow Off + + // Verify + int resultSize = results.size(); + String log = results.get(resultSize - 1); + + validateMessageID("CHN-1002", log); + assertTrue("Message should be Flow Stopped", fromMessage(log).endsWith("Flow Stopped")); + + } + + /** + * Description: + * The Java Broker implements consumer flow control for all ack modes except + * No-Ack. When the client first attempts to receive a message then the Flow + * status of the Session is set to Started. + * + * Input: + * 1. Running broker + * 2. Create a consumer + * 3. Attempt to receive a message + * Output: + * + * <date> CHN-1002 : Flow Started + * + * Validation Steps: + * 4. The CHN ID is correct + * + * @throws Exception - if an error occurs + */ + + public void testChannelStartConsumerFlowStarted() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session to fill up + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = (Queue) getInitialContext().lookup(QUEUE); + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + //Call receive to send the Flow On message + consumer.receiveNoWait(); + + List<String> results = _monitor.findMatches(CHANNEL_PREFIX); + + // The last two channel messages should be: + // + // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On + + // Verify + + int resultSize = results.size(); + String log = results.get(resultSize - 1); + + validateMessageID("CHN-1002", log); + assertTrue("Message should be Flow Started", fromMessage(log).endsWith("Flow Started")); + + } + + /** + * Description: + * When the client gracefully closes the Connection then a CHN-1003 Close + * message will be issued. This must be the last message logged for this + * Channel. + * Input: + * 1. Running Broker + * 2. Connected Client + * 3. Client then requests that the Connection is closed + * Output: + * + * <date> CHN-1003 : Close + * + * Validation Steps: + * 4. The MST ID is correct + * 5. This must be the last message logged for this Channel. + * + * @throws Exception - if an error occurs + */ + public void testChannelCloseViaConnectionClose() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Close the connection to verify the created session closing is logged. + connection.close(); + + List<String> results = _monitor.findMatches(CHANNEL_PREFIX); + + // The last two channel messages should be: + // + // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On + + // Verify + + int resultSize = results.size(); + String log = results.get(resultSize - 1); + + validateMessageID("CHN-1003", log); + assertTrue("Message should be Close:" + fromMessage(log), fromMessage(log).endsWith("Close")); + assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromSubject(log))); + } + + /** + * Description: + * When the client gracefully closes the Connection then a CHN-1003 Close + * message will be issued. This must be the last message logged for this + * Channel. + * Input: + * 1. Running Broker + * 2. Connected Client + * 3. Client then requests that the Channel is closed + * Output: + * + * <date> CHN-1003 : Close + * + * Validation Steps: + * 4. The MST ID is correct + * 5. This must be the last message logged for this Channel. + * + * @throws Exception - if an error occurs + */ + public void testChannelCloseViaChannelClose() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session and then close it + connection.createSession(false, Session.AUTO_ACKNOWLEDGE).close(); + + List<String> results = _monitor.findMatches(CHANNEL_PREFIX); + + // The last two channel messages should be: + // + // INFO - MESSAGE [con:0(guest@anonymous(4205299)/test)/ch:1] [con:0(guest@anonymous(4205299)/test)/ch:1] CHN-1002 : Flow On + + // Verify + + int resultSize = results.size(); + String log = results.get(resultSize - 1); + + validateMessageID("CHN-1003", log); + assertTrue("Message should be Close:" + fromMessage(log), fromMessage(log).endsWith("Close")); + assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel ID closed.", 1, getChannelID(fromSubject(log))); + } + +}
\ No newline at end of file |