diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-12 20:50:07 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-12 20:50:07 +0000 |
commit | f772c4f42999d902722584623910184c8f439a1b (patch) | |
tree | 04f9a03a1e0771ad1c3c23e67eed68eb22629e17 | |
parent | f86519a1181233a5179d8a9e4bbb3f427baaf988 (diff) | |
download | qpid-python-f772c4f42999d902722584623910184c8f439a1b.tar.gz |
Implemented alternate exchanges, some work on integrating IO layer
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@824494 13f79535-47bb-0310-9956-ffa450edef68
36 files changed, 1006 insertions, 349 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java index 572b123676..b228e7270c 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java @@ -74,6 +74,16 @@ public class TestExchange implements Exchange return false; } + public Exchange getAlternateExchange() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setAlternateExchange(Exchange exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void initialise(VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete) throws AMQException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 7b966700fb..6d27c1c07c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -232,7 +232,7 @@ public class AMQChannel if(!checkMessageUserId(_currentMessage.getContentHeader())) { - _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", createAMQMessage(_currentMessage))); + _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage)); } else { @@ -240,7 +240,7 @@ public class AMQChannel { if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) { - _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", createAMQMessage(_currentMessage))); + _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); } else { @@ -898,37 +898,6 @@ public class AMQChannel return _defaultQueue; } - public void processReturns() throws AMQException - { - if (!_returnMessages.isEmpty()) - { - - for (RequiredDeliveryException bouncedMessage : _returnMessages) - { - ServerMessage serverMessage = bouncedMessage.getAMQMessage(); - if(serverMessage instanceof AMQMessage) - { - AMQMessage message = (AMQMessage) serverMessage; - _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message.getBodyFrameIterator(_session,_channelId), - _channelId, - bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); - - } - else - { - // TODO AMQP 0-10 Message - throw new RuntimeException("not yet implemented conversion of 0-10 messages"); - } - bouncedMessage.release(); - } - - - _returnMessages.clear(); - } - } public boolean isClosing() { @@ -1154,12 +1123,12 @@ public class AMQChannel private class WriteReturnAction implements Transaction.Action { private final AMQConstant _errorCode; - private final AMQMessage _message; + private final IncomingMessage _message; private final String _description; public WriteReturnAction(AMQConstant errorCode, String description, - AMQMessage message) + IncomingMessage message) { _errorCode = errorCode; _message = message; @@ -1171,8 +1140,8 @@ public class AMQChannel try { _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(), - _message.getContentHeaderBody(), - _message.getBodyFrameIterator(_session,_channelId), + _message.getContentHeader(), + new BodyFrameIterator(_session,_channelId,_message), _channelId, _errorCode.getCode(), new AMQShortString(_description)); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java new file mode 100755 index 0000000000..314f2761a8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server; + +public interface ExchangeReferrer +{ +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index baa9ba6208..de095a3bc2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -245,7 +245,7 @@ public class Main String logConfig = commandLine.getOptionValue("l"); String logWatchConfig = commandLine.getOptionValue("w", "0"); - + int logWatchTime = 0; try { @@ -256,7 +256,7 @@ public class Main System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " + "a non-negative integer. Using default of zero (no watching configured"); } - + File logConfigFile; if (logConfig != null) { @@ -282,8 +282,10 @@ public class Main BrokerMessages.reload(); // AR.initialise() sets its own actor so we now need to set the actor - // for the remainder of the startup + // for the remainder of the startup CurrentActor.set(new BrokerActor(config.getRootMessageLogger())); + CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger())); + try { configureLoggingManagementMBean(logConfigFile, logWatchTime); @@ -339,40 +341,53 @@ public class Main if (!serverConfig.getSSLOnly()) { NetworkDriver driver = new MINANetworkDriver(); - driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), + driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), null); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), new QpidAcceptor(driver,"TCP")); CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port)); } - + if (serverConfig.getEnableSSL()) { sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); NetworkDriver driver = new MINANetworkDriver(); - driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, + driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), new QpidAcceptor(driver,"TCP")); CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort())); } - + //fixme qpid.AMQP should be using qpidproperties to get value _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); CurrentActor.get().message(BrokerMessages.BRK_1004()); - - // TODO - Fix to use a proper binding int port_0_10 = port + 1; IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - final ConnectionDelegate delegate = - new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, "localhost"); - - + new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, bindAddress.getCanonicalHostName()); + +/* + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(port, new InetAddress[]{bindAddress}, new ProtocolEngineFactory_0_10(delegate), + serverConfig.getNetworkConfiguration(), null); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port)); +*/ + + + // TODO - Fix to use a proper binding + + + + + + ConnectionBinding cb = new ConnectionBinding() { public Connection connection() @@ -382,7 +397,7 @@ public class Main return conn; } }; - + org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor ("0.0.0.0", port_0_10, cb); ioa.start(); @@ -426,11 +441,11 @@ public class Main { System.setProperty("log4j.defaultInitOverride", "true"); } - + //now that the override status is know, we can instantiate the Loggers _logger = Logger.getLogger(Main.class); _brokerLogger = Logger.getLogger("Qpid.Broker"); - + new Main(args); } @@ -466,7 +481,7 @@ public class Main { if (logConfigFile.exists() && logConfigFile.canRead()) { - CurrentActor.get().message(BrokerMessages.BRK_1007(logConfigFile.getAbsolutePath())); + CurrentActor.get().message(BrokerMessages.BRK_1007(logConfigFile.getAbsolutePath())); System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath()); if (logWatchTime > 0) { @@ -498,7 +513,7 @@ public class Main { System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); System.err.println("Using the fallback internal log4j.properties configuration"); - + InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties"); if(propsFile == null) { @@ -516,7 +531,7 @@ public class Main private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception { LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime); - + try { blm.register(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java new file mode 100755 index 0000000000..4c84b47d43 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.network.Disassembler; + +public class ProtocolEngineFactory_0_10 implements ProtocolEngineFactory +{ + private ConnectionDelegate _delegate; + + public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + + + public ProtocolEngineFactory_0_10(ConnectionDelegate delegate) + { + _delegate = delegate; + } + + public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + { + Connection conn = new Connection(); + conn.setConnectionDelegate(_delegate); + Disassembler dis = new Disassembler(networkDriver, MAX_FRAME_SIZE); + conn.setSender(dis); + return new ProtocolEngine_0_10(conn, networkDriver); //To change body of implemented methods use File | Settings | File Templates. + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java new file mode 100755 index 0000000000..7ddd3da1a6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.Assembler; + +import java.net.SocketAddress; + +public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine +{ + private NetworkDriver _networkDriver; + private long _readBytes; + private long _writtenBytes; + + public ProtocolEngine_0_10(Connection conn, NetworkDriver networkDriver) + { + super(new Assembler(conn)); + _networkDriver = networkDriver; + } + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public void writerIdle() + { + //Todo + } + + public void readerIdle() + { + //Todo + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index f3bc543562..cacd294464 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -46,14 +46,16 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.ExchangeReferrer; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public abstract class AbstractExchange implements Exchange, Managable { private AMQShortString _name; - + private Exchange _alternateExchange; protected boolean _durable; protected String _exchangeType; @@ -70,6 +72,7 @@ public abstract class AbstractExchange implements Exchange, Managable //The logSubject for ths exchange private LogSubject _logSubject; + private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>(); /** * Abstract MBean class. This has some of the methods implemented from @@ -197,6 +200,10 @@ public abstract class AbstractExchange implements Exchange, Managable { _exchangeMbean.unregister(); } + if(_alternateExchange != null) + { + _alternateExchange.removeReference(this); + } CurrentActor.get().message(_logSubject, ExchangeMessages.EXH_1002()); } @@ -237,4 +244,37 @@ public abstract class AbstractExchange implements Exchange, Managable return isBound(new AMQShortString(bindingKey)); } + public Exchange getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(Exchange exchange) + { + if(_alternateExchange != null) + { + _alternateExchange.removeReference(this); + } + if(exchange != null) + { + exchange.addReference(this); + } + _alternateExchange = exchange; + + } + + public void removeReference(ExchangeReferrer exchange) + { + _referrers.remove(exchange); + } + + public void addReference(ExchangeReferrer exchange) + { + _referrers.put(exchange, Boolean.TRUE); + } + + public boolean hasReferrers() + { + return !_referrers.isEmpty(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 441996c212..2bcf5e3053 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -27,11 +27,12 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.ExchangeReferrer; import java.util.ArrayList; import java.util.Map; -public interface Exchange +public interface Exchange extends ExchangeReferrer { AMQShortString getName(); @@ -100,4 +101,13 @@ public interface Exchange boolean isBound(String bindingKey); + Exchange getAlternateExchange(); + + void setAlternateExchange(Exchange exchange); + + void removeReference(ExchangeReferrer exchange); + + void addReference(ExchangeReferrer exchange); + + boolean hasReferrers(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java index df4da2a79e..3d31a705fe 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.logging.actors; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.RootMessageLogger; import java.util.EmptyStackException; import java.util.Stack; @@ -66,6 +69,8 @@ public class CurrentActor } }; + private static LogActor _defaultActor; + /** * Set a new LogActor to be the Current Actor * <p/> @@ -105,7 +110,12 @@ public class CurrentActor } catch (EmptyStackException ese) { - return null; + return _defaultActor; } } + + public static void setDefault(LogActor defaultActor) + { + _defaultActor = defaultActor; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java index b9143ece91..580e7d21f0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -41,8 +41,14 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage public MessageTransferMessage(MessageTransfer xfr, WeakReference<Session> sessionRef) { + this(_numberSource.getAndIncrement(), xfr, sessionRef); + } + + public MessageTransferMessage(long messageNumber, MessageTransfer xfr, WeakReference<Session> sessionRef) + { + _xfr = xfr; - _messageNumber = _numberSource.getAndIncrement(); + _messageNumber = messageNumber; Header header = _xfr.getHeader(); if(header != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 7004bac519..1d4fd6b1ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -22,22 +22,17 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.message.*; import java.util.Iterator; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.CopyOnWriteArrayList; /** * A deliverable message. @@ -74,107 +69,6 @@ public class AMQMessage implements ServerMessage /** - * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory - * therefore is memory-efficient. - */ - private class BodyFrameIterator implements Iterator<AMQDataBlock> - { - private int _channel; - - private int _index = -1; - private AMQProtocolSession _protocolSession; - - private BodyFrameIterator(AMQProtocolSession protocolSession, int channel) - { - _channel = channel; - _protocolSession = protocolSession; - } - - public boolean hasNext() - { - try - { - return _index < (_messageHandle.getBodyCount() - 1); - } - catch (AMQException e) - { - _log.error("Unable to get body count: " + e, e); - - return false; - } - } - - public AMQDataBlock next() - { - try - { - - AMQBody cb = - getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk( - ++_index)); - - return new AMQFrame(_channel, cb); - } - catch (AMQException e) - { - // have no choice but to throw a runtime exception - throw new RuntimeException("Error getting content body: " + e, e); - } - - } - - private ProtocolVersionMethodConverter getProtocolVersionMethodConverter() - { - return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - - private class BodyContentIterator implements Iterator<ContentChunk> - { - - private int _index = -1; - - public boolean hasNext() - { - try - { - return _index < (_messageHandle.getBodyCount() - 1); - } - catch (AMQException e) - { - _log.error("Error getting body count: " + e, e); - - return false; - } - } - - public ContentChunk next() - { - try - { - return _messageHandle.getContentChunk(++_index); - } - catch (AMQException e) - { - throw new RuntimeException("Error getting content body: " + e, e); - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - - - /** * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to * queues. @@ -260,12 +154,12 @@ public class AMQMessage implements ServerMessage public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { - return new BodyFrameIterator(protocolSession, channel); + return new BodyFrameIterator(protocolSession, channel, _messageHandle); } public Iterator<ContentChunk> getContentBodyIterator() { - return new BodyContentIterator(); + return new BodyContentIterator(_messageHandle); } public ContentHeaderBody getContentHeaderBody() throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index d8ab6ee9b2..5798dca079 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -29,7 +29,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; * A pluggable way of getting message data. Implementations can provide intelligent caching for example or * even no caching at all to minimise the broker memory footprint. */ -public interface AMQMessageHandle +public interface AMQMessageHandle extends BodyContentHolder { ContentHeaderBody getContentHeaderBody() throws AMQException; @@ -41,23 +41,10 @@ public interface AMQMessageHandle /** - * @return the number of body frames associated with this message - */ - int getBodyCount() throws AMQException; - - /** * @return the size of the body */ long getBodySize() throws AMQException; - /** - * Get a particular content body - * @param index the index of the body to retrieve, must be between 0 and getBodyCount() - 1 - * @return a content body - * @throws IllegalArgumentException if the index is invalid - */ - ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException; - void addContentBodyFrame(ContentChunk contentBody, boolean isLastContentBody) throws AMQException; MessagePublishInfo getMessagePublishInfo() throws AMQException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index c5691792d4..95ca18c778 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.ExchangeReferrer; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -39,7 +40,7 @@ import java.util.List; import java.util.Set; import java.util.Map; -public interface AMQQueue extends Managable, Comparable<AMQQueue> +public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer { @@ -207,6 +208,10 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> boolean isExclusive(); + Exchange getAlternateExchange(); + + void setAlternateExchange(Exchange exchange); + Map<String, Object> getArguments(); void checkCapacity(AMQChannel channel); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentHolder.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentHolder.java new file mode 100755 index 0000000000..4ab6278715 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentHolder.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.abstraction.ContentChunk; + +public interface BodyContentHolder +{ + /** + * @return the number of body frames associated with this message + */ + int getBodyCount() throws AMQException; + + /** + * Get a particular content body + * @param index the index of the body to retrieve, must be between 0 and getBodyCount() - 1 + * @return a content body + * @throws IllegalArgumentException if the index is invalid + */ + ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException; +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentIterator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentIterator.java new file mode 100755 index 0000000000..1483177855 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentIterator.java @@ -0,0 +1,75 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import java.util.Iterator; + + +class BodyContentIterator implements Iterator<ContentChunk> +{ + private static final Logger _log = Logger.getLogger(BodyContentIterator.class); + + + private AMQMessageHandle _messageHandle; + private int _index = -1; + + + public BodyContentIterator(AMQMessageHandle messageHandle) + { + _messageHandle = messageHandle; + } + + + public boolean hasNext() + { + try + { + return _index < (_messageHandle.getBodyCount() - 1); + } + catch (AMQException e) + { + _log.error("Error getting body count: " + e, e); + + return false; + } + } + + public ContentChunk next() + { + try + { + return _messageHandle.getContentChunk(++_index); + } + catch (AMQException e) + { + throw new RuntimeException("Error getting content body: " + e, e); + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyFrameIterator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyFrameIterator.java new file mode 100755 index 0000000000..fd9cfe45bb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyFrameIterator.java @@ -0,0 +1,94 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import java.util.Iterator; + + +public class BodyFrameIterator implements Iterator<AMQDataBlock> +{ + private static final Logger _log = Logger.getLogger(BodyFrameIterator.class); + + + private int _channel; + + private int _index = -1; + private AMQProtocolSession _protocolSession; + private BodyContentHolder _bodyContentHolder; + + public BodyFrameIterator(AMQProtocolSession protocolSession, int channel, BodyContentHolder messageHandle) + { + _channel = channel; + _protocolSession = protocolSession; + _bodyContentHolder = messageHandle; + } + + public boolean hasNext() + { + try + { + return _index < (_bodyContentHolder.getBodyCount() - 1); + } + catch (AMQException e) + { + _log.error("Unable to get body count: " + e, e); + + return false; + } + } + + public AMQDataBlock next() + { + try + { + + AMQBody cb = + getProtocolVersionMethodConverter().convertToBody(_bodyContentHolder.getContentChunk( + ++_index)); + + return new AMQFrame(_channel, cb); + } + catch (AMQException e) + { + // have no choice but to throw a runtime exception + throw new RuntimeException("Error getting content body: " + e, e); + } + + } + + private ProtocolVersionMethodConverter getProtocolVersionMethodConverter() + { + return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java new file mode 100755 index 0000000000..77da08d8c4 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.AMQMessageHeader; + +class InboundMessageAdapter implements InboundMessage +{ + + private QueueEntry _entry; + + InboundMessageAdapter() + { + } + + InboundMessageAdapter(QueueEntry entry) + { + _entry = entry; + } + + public void setEntry(QueueEntry entry) + { + _entry = entry; + } + + + public String getRoutingKey() + { + return _entry.getMessage().getRoutingKey(); + } + + public AMQMessageHeader getMessageHeader() + { + return _entry.getMessageHeader(); + } + + public boolean isPersistent() + { + return _entry.isPersistent(); + } + + public boolean isRedelivered() + { + return _entry.isRedelivered(); + } + + public long getSize() + { + return _entry.getSize(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 726d99f8b3..ac82e1f2b3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -37,8 +37,9 @@ import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import java.util.ArrayList; +import java.util.List; -public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage +public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, BodyContentHolder { /** Used for debugging purposes. */ @@ -73,6 +74,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes private int _receivedChunkCount = 0; + private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>(); public IncomingMessage(final Long messageId, @@ -145,6 +147,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes { _bodyLengthReceived += contentChunk.getSize(); + _contentChunks.add(contentChunk); _messageHandle.addContentBodyFrame(contentChunk, allContentReceived()); return _receivedChunkCount++; } @@ -246,4 +249,13 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes } + public int getBodyCount() throws AMQException + { + return _contentChunks.size(); + } + + public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException + { + return _contentChunks.get(index); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 87b0267b0f..6a303fd156 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.message.ServerMessage; public interface QueueEntry extends Comparable<QueueEntry>, Filterable { + public static enum State { AVAILABLE, @@ -199,6 +200,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable void discard(); + void routeToAlternate(); + boolean isQueueDeleted(); void addStateChangeListener(StateChangeListener listener); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index f10a4175db..74c034d305 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -25,10 +25,14 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.Transaction; import org.apache.log4j.Logger; import java.util.Set; import java.util.HashSet; +import java.util.List; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.CopyOnWriteArraySet; @@ -213,12 +217,21 @@ public class QueueEntryImpl implements QueueEntry public void release() { _stateUpdater.set(this,AVAILABLE_STATE); - getQueue().requeue(this); - if(_stateChangeListeners != null) + if(!getQueue().isDeleted()) { - notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + getQueue().requeue(this); + if(_stateChangeListeners != null) + { + notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + } + + } + else if(acquire()) + { + routeToAlternate(); } + } public boolean releaseButRetain() @@ -386,6 +399,57 @@ public class QueueEntryImpl implements QueueEntry dispose(); } + public void routeToAlternate() + { + final AMQQueue currentQueue = getQueue(); + Exchange alternateExchange = currentQueue.getAlternateExchange(); + + if(alternateExchange != null) + { + final List<AMQQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); + final ServerMessage message = getMessage(); + if(rerouteQueues != null && rerouteQueues.size() != 0) + { + Transaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); + + txn.enqueue(rerouteQueues, message, new Transaction.Action() { + public void postCommit() + { + try + { + for(AMQQueue queue : rerouteQueues) + { + QueueEntry entry = queue.enqueue(message); + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + public void onRollback() + { + + } + }); + txn.dequeue(currentQueue,message, + new Transaction.Action() + { + public void postCommit() + { + discard(); + } + + public void onRollback() + { + + } + }); + } + } + } + public boolean isQueueDeleted() { return getQueue().isDeleted(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 8865c7946a..de74974d2d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -19,8 +19,8 @@ import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -61,12 +61,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); + private int unused; private PrincipalHolder _prinicpalHolder; private Object _exclusiveOwner; + private Exchange _alternateExchange; + static final class QueueContext implements Context { @@ -267,6 +270,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _exclusiveOwner != null; } + public Exchange getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(Exchange exchange) + { + if(_alternateExchange != null) + { + _alternateExchange.removeReference(this); + } + if(exchange != null) + { + exchange.addReference(this); + } + _alternateExchange = exchange; + } + public Map<String, Object> getArguments() { return null; @@ -922,7 +943,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - MessageStore store = getVirtualHost().getMessageStore(); + TransactionLog txnLog = getVirtualHost().getTransactionLog(); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -943,7 +964,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { - store.beginTran(storeContext); + txnLog.beginTran(storeContext); // Move the messages in on the message store. for (QueueEntry entry : entries) @@ -952,7 +973,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); + txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); } // dequeue does not decrement the refence count entry.dequeue(); @@ -961,7 +982,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Commit and flush the move transcations. try { - store.commitTran(storeContext); + txnLog.commitTran(storeContext); } catch (AMQException e) { @@ -972,7 +993,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { try { - store.abortTran(storeContext); + txnLog.abortTran(storeContext); } catch (AMQException rollbackEx) { @@ -1007,7 +1028,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener final StoreContext storeContext) { AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - MessageStore store = getVirtualHost().getMessageStore(); + TransactionLog txnLog = getVirtualHost().getTransactionLog(); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -1027,7 +1048,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { - store.beginTran(storeContext); + txnLog.beginTran(storeContext); // Move the messages in on the message store. for (QueueEntry entry : entries) @@ -1037,7 +1058,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); + txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); } } @@ -1045,7 +1066,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Commit and flush the move transcations. try { - store.commitTran(storeContext); + txnLog.commitTran(storeContext); } catch (AMQException e) { @@ -1056,7 +1077,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { try { - store.abortTran(storeContext); + txnLog.abortTran(storeContext); } catch (AMQException rollbackEx) { @@ -1139,7 +1160,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; - Transaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); + Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); while (queueListIterator.advance()) { @@ -1160,7 +1181,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void dequeueEntry(final QueueEntry node) { - Transaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore()); + Transaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog()); dequeueEntry(node, txn); } @@ -1206,7 +1227,108 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _bindings.deregister(); _virtualHost.getQueueRegistry().unregisterQueue(_name); + List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() + { + + public boolean accept(QueueEntry entry) + { + return entry.acquire(); + } + + public boolean filterComplete() + { + return false; + } + }); + + Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog()); + + if(_alternateExchange != null) + { + + InboundMessageAdapter adapter = new InboundMessageAdapter(); + for(final QueueEntry entry : entries) + { + adapter.setEntry(entry); + final List<AMQQueue> rerouteQueues = _alternateExchange.route(adapter); + final ServerMessage message = entry.getMessage(); + if(rerouteQueues != null & rerouteQueues.size() != 0) + { + txn.enqueue(rerouteQueues, entry.getMessage(), + new Transaction.Action() + { + + public void postCommit() + { + try + { + for(AMQQueue queue : rerouteQueues) + { + QueueEntry entry = queue.enqueue(message); + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + + } + + public void onRollback() + { + + } + }); + txn.dequeue(this, entry.getMessage(), + new Transaction.Action() + { + + public void postCommit() + { + entry.discard(); + } + + public void onRollback() + { + } + }); + } + + } + + _alternateExchange.removeReference(this); + } + else + { + // TODO log discard + + for(final QueueEntry entry : entries) + { + final ServerMessage message = entry.getMessage(); + if(message != null) + { + txn.dequeue(this, message, + new Transaction.Action() + { + + public void postCommit() + { + entry.discard(); + } + + public void onRollback() + { + } + }); + } + } + } + + txn.commit(); + + _managedObject.unregister(); + for (Task task : _deleteTaskList) { task.doTask(this); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 6e86f4b70d..6dce764e63 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; import java.io.ByteArrayInputStream; @@ -1375,6 +1376,21 @@ public class DerbyMessageStore extends AbstractMessageStore return true; } + public void storeMessageHeader(Long messageNumber, ServerMessage message) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void storeContent(Long messageNumber, long offset, java.nio.ByteBuffer body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public ServerMessage getMessage(Long messageNumber) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + private void checkNotClosed() throws MessageStoreClosedException { if (_closed.get()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 8a9c1571ca..ecae3cb794 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.ServerMessage; import java.util.ArrayList; import java.util.Collections; @@ -39,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.nio.ByteBuffer; /** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore extends AbstractMessageStore @@ -240,6 +242,21 @@ public class MemoryMessageStore extends AbstractMessageStore return false; } + public void storeMessageHeader(Long messageNumber, ServerMessage message) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void storeContent(Long messageNumber, long offset, ByteBuffer body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public ServerMessage getMessage(Long messageNumber) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + private void checkNotClosed() throws MessageStoreClosedException { if (_closed.get()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 802c0bc709..a30e6b485c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -25,6 +25,10 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.message.ServerMessage; + +import java.nio.ByteBuffer; /** * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues @@ -134,9 +138,14 @@ public interface MessageStore extends DurableConfigurationStore, TransactionLog /** * Is this store capable of persisting the data - * + * * @return true if this store is capable of persisting data */ boolean isPersistent(); + void storeMessageHeader(Long messageNumber, ServerMessage message); + + void storeContent(Long messageNumber, long offset, ByteBuffer body); + + ServerMessage getMessage(Long messageNumber); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 1b80ede81f..2669475a63 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -108,8 +108,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _creditManager.addStateListener(this); _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED); - _logSubject = new SubscriptionLogSubject(this); - _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); } @@ -135,6 +133,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); } _queue = queue; + _logSubject = new SubscriptionLogSubject(this); + _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); + } public AMQShortString getConsumerTag() @@ -335,7 +336,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { public void onComplete(Method method) { - restoreCredit(entry); + restoreCredit(entry); } }); } @@ -409,7 +410,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private void reject(QueueEntry entry) { entry.setRedelivered(true); - entry.reject(this); + entry.routeToAlternate(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 7891c5214c..1b0ea41e0b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.flow.*; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.*; @@ -87,7 +88,7 @@ public class ServerSessionDelegate extends SessionDelegate session.executionResult((int) method.getId(), result); - + } @Override @@ -154,7 +155,7 @@ public class ServerSessionDelegate extends SessionDelegate // TODO filters - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, + Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination, method.getAcceptMode(), method.getAcquireMode(), @@ -177,7 +178,6 @@ public class ServerSessionDelegate extends SessionDelegate catch (AMQException e) { // TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. throw new RuntimeException(e); } } @@ -204,7 +204,14 @@ public class ServerSessionDelegate extends SessionDelegate exchange = exchangeRegistry.getDefaultExchange(); } + + + MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference()); + final MessageStore store = getVirtualHost(ssn).getMessageStore(); + + store.storeMessageHeader(message.getMessageNumber(),message); + store.storeContent(message.getMessageNumber(), 0, xfr.getBody()); DeliveryProperties delvProps = null; if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) @@ -216,10 +223,47 @@ public class ServerSessionDelegate extends SessionDelegate - if(queues != null) + if(queues != null && queues.size() != 0) { ((ServerSession) ssn).enqueue(message, queues); } + else + { + if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable()) + { + if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + RangeSet rejects = new RangeSet(); + rejects.add(xfr.getId()); + MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable"); + ssn.invoke(reject); + } + else + { + Exchange alternate = exchange.getAlternateExchange(); + if(alternate != null) + { + queues = alternate.route(message); + if(queues != null && queues.size() != 0) + { + ((ServerSession) ssn).enqueue(message, queues); + } + else + { + //TODO - log the message discard + } + } + else + { + //TODO - log the message discard + } + + + } + } + + + } ssn.processed(xfr); @@ -346,6 +390,14 @@ public class ServerSessionDelegate extends SessionDelegate method.getDurable(), method.getAutoDelete()); + String alternateExchangeName = method.getAlternateExchange(); + if(alternateExchangeName != null && alternateExchangeName.length() != 0) + { + Exchange alternate = getExchange(session, alternateExchangeName); + exchange.setAlternateExchange(alternate); + } + + exchangeRegistry.registerExchange(exchange); } catch(AMQUnknownExchangeType e) @@ -426,7 +478,16 @@ public class ServerSessionDelegate extends SessionDelegate try { - exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused()); + Exchange exchange = getExchange(session, method.getExchange()); + + if(exchange != null && exchange.hasReferrers()) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + } + else + { + exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused()); + } } catch (ExchangeInUseException e) { @@ -491,7 +552,7 @@ public class ServerSessionDelegate extends SessionDelegate //TODO - here because of non-compiant python tests if (!method.hasBindingKey()) { - method.setBindingKey(method.getQueue()); + method.setBindingKey(method.getQueue()); } AMQQueue queue = queueRegistry.getQueue(method.getQueue()); Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); @@ -512,7 +573,7 @@ public class ServerSessionDelegate extends SessionDelegate } else if(exchange.getType().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) { - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header"); + exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header"); } else { @@ -622,7 +683,7 @@ public class ServerSessionDelegate extends SessionDelegate { result.setQueueNotFound(true); } - + if(exchange != null && queue != null) { @@ -651,7 +712,7 @@ public class ServerSessionDelegate extends SessionDelegate else if (method.hasArguments()) { // TODO - + } result.setQueueNotMatched(!exchange.isBound(queue)); @@ -748,6 +809,13 @@ public class ServerSessionDelegate extends SessionDelegate if(method.getExclusive()) { queue.setPrincipalHolder((ServerSession)session); + queue.setExclusiveOwner(session); + } + final String alternateExchangeName = method.getAlternateExchange(); + if(alternateExchangeName != null && alternateExchangeName.length() != 0) + { + Exchange alternate = getExchange(session, alternateExchangeName); + queue.setAlternateExchange(alternate); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 8ce9cbc480..eccdce1ebc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -54,6 +54,7 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.TransactionLog; import javax.management.NotCompliantMBeanException; import java.util.Collections; @@ -88,6 +89,7 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; private VirtualHostConfiguration _configuration; + private DurableConfigurationStore _durableConfigurationStore; public void setAccessableName(String name) { @@ -181,7 +183,7 @@ public class VirtualHost implements Accessable StartupRoutingTable configFileRT = new StartupRoutingTable(); - _messageStore = configFileRT; + _durableConfigurationStore = configFileRT; // This needs to be after the RT has been defined as it creates the default durable exchanges. _exchangeRegistry.initialise(); @@ -211,6 +213,7 @@ public class VirtualHost implements Accessable if (store != null) { _messageStore = store; + _durableConfigurationStore = store; } else { @@ -302,6 +305,7 @@ public class VirtualHost implements Accessable MessageStore messageStore = (MessageStore) o; messageStore.configure(this, "store", hostConfig); _messageStore = messageStore; + _durableConfigurationStore = messageStore; } private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException @@ -413,11 +417,16 @@ public class VirtualHost implements Accessable return _messageStore; } - public DurableConfigurationStore getDurableConfigurationStore() + public TransactionLog getTransactionLog() { return _messageStore; } + public DurableConfigurationStore getDurableConfigurationStore() + { + return _durableConfigurationStore; + } + public AuthenticationManager getAuthenticationManager() { return _authenticationManager; @@ -475,7 +484,7 @@ public class VirtualHost implements Accessable * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded. * This should be removed after the _RT has been fully split from the the TL */ - private class StartupRoutingTable implements MessageStore + private class StartupRoutingTable implements DurableConfigurationStore { public List<Exchange> exchange = new LinkedList<Exchange>(); public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); @@ -535,87 +544,6 @@ public class VirtualHost implements Accessable { } - public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void beginTran(StoreContext context) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void commitTran(StoreContext context) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public StoreFuture commitTranAsync(StoreContext context) throws AMQException - { - commitTran(context); - return new StoreFuture() - { - public boolean isComplete() - { - return true; - } - - public void waitForCompletion() - { - - } - }; - - } - - public void abortTran(StoreContext context) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean inTran(StoreContext context) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public Long getNewMessageId() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void storeContentBodyChunk( - Long messageId, - int index, - ContentChunk contentBody, - boolean lastContentBody) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public MessageMetaData getMessageMetaData(Long messageId) throws AMQException - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isPersistent() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } private class CreateQueueTuple { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 2747094caf..268451c74a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -413,6 +413,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } + public void routeToAlternate() + { + //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isQueueDeleted() { return false; //To change body of implemented methods use File | Settings | File Templates. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index bd0f273742..f093e42874 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -33,7 +33,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -public class DestWildExchangeTest extends TestCase +public class TopicExchangeTest extends TestCase { TopicExchange _exchange; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 77a1412f5f..912233f5cd 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -336,6 +336,16 @@ public class MockAMQQueue implements AMQQueue return false; //To change body of implemented methods use File | Settings | File Templates. } + public Exchange getAlternateExchange() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setAlternateExchange(Exchange exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public Map<String, Object> getArguments() { return null; //To change body of implemented methods use File | Settings | File Templates. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index cff6d4d22a..d1a43bc5b6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -69,6 +69,11 @@ public class MockQueueEntry implements QueueEntry } + public void routeToAlternate() + { + + } + public void dispose() { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index eb83f6ae1f..ba04a14e09 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -30,9 +30,11 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.ServerMessage; import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import java.nio.ByteBuffer; /** * A message store that does nothing. Designed to be used in tests that do not want to use any message store @@ -160,6 +162,21 @@ public class SkeletonMessageStore implements MessageStore return false; } + public void storeMessageHeader(Long messageNumber, ServerMessage message) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void storeContent(Long messageNumber, long offset, ByteBuffer body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public ServerMessage getMessage(Long messageNumber) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void removeQueue(final AMQQueue queue) throws AMQException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 06a1fe2696..ac7f441a13 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -110,10 +110,6 @@ import org.slf4j.LoggerFactory; * <tr><td> * </table> * - * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the - * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing - * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec - * filter before it mean not doing the read/write asynchronously but in the main filter thread? * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could @@ -172,10 +168,10 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private NetworkDriver _networkDriver; - + private long _writtenBytes; private long _readBytes; - + /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -215,10 +211,6 @@ public class AMQProtocolHandler implements ProtocolEngine * process will be started, provided that it is the clients policy to allow failover, and provided that a failover * has not already been started or failed. * - * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught} - * may be called first followed by this method. This depends on whether the client was trying to send data at the - * time of the failure. - * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and * not otherwise? The above comment doesn't make that clear. */ @@ -261,7 +253,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("sessionClose() not allowed to failover"); _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", + "Server closed connection and reconnection " + "not permitted.", _stateManager.getLastException())); } else @@ -285,7 +277,6 @@ public class AMQProtocolHandler implements ProtocolEngine failoverThread.start(); } - @Override public void readerIdle() { _logger.debug("Protocol Session [" + this + "] idle: reader"); @@ -294,8 +285,7 @@ public class AMQProtocolHandler implements ProtocolEngine _logger.warn("Timed out while waiting for heartbeat from peer."); _networkDriver.close(); } - - @Override + public void writerIdle() { _logger.debug("Protocol Session [" + this + "] idle: reader"); @@ -368,7 +358,7 @@ public class AMQProtocolHandler implements ProtocolEngine public void propagateExceptionToAllWaiters(Exception e) { getStateManager().error(e); - + propagateExceptionToFrameListeners(e); } @@ -423,7 +413,6 @@ public class AMQProtocolHandler implements ProtocolEngine private static int _messageReceivedCount; - @Override public void received(ByteBuffer msg) { try @@ -433,7 +422,6 @@ public class AMQProtocolHandler implements ProtocolEngine Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { - @Override public void run() { // Decode buffer @@ -568,7 +556,6 @@ public class AMQProtocolHandler implements ProtocolEngine _writtenBytes += buf.remaining(); Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() { - @Override public void run() { _networkDriver.send(buf); @@ -589,7 +576,7 @@ public class AMQProtocolHandler implements ProtocolEngine } _connection.bytesSent(_writtenBytes); - + if (wait) { _networkDriver.flush(); @@ -649,7 +636,7 @@ public class AMQProtocolHandler implements ProtocolEngine _frameListeners.add(listener); //FIXME: At this point here we should check or before add we should check _stateManager is in an open - // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 + // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 } writeFrame(frame); @@ -835,7 +822,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _networkDriver = driver; } - + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 5bfc189b02..31953ea6ab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -28,38 +28,34 @@ import org.apache.qpid.transport.Receiver; /** * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received - * decodes it and then process the result. - */ -public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> -{ - // Sets the network driver providing data for this ProtocolEngine + * decodes it and then process the result. + */ +public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> +{ + // Sets the network driver providing data for this ProtocolEngine void setNetworkDriver (NetworkDriver driver); - - // Returns the remote address of the NetworkDriver + + // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); - // Returns the local address of the NetworkDriver + // Returns the local address of the NetworkDriver SocketAddress getLocalAddress(); - - // Returns number of bytes written + + // Returns number of bytes written long getWrittenBytes(); - - // Returns number of bytes read + + // Returns number of bytes read long getReadBytes(); - - // Called by the NetworkDriver when the socket has been closed for reading + + // Called by the NetworkDriver when the socket has been closed for reading void closed(); - - // Called when the NetworkEngine has not written data for the specified period of time (will trigger a - // heartbeat) + + // Called when the NetworkEngine has not written data for the specified period of time (will trigger a + // heartbeat) void writerIdle(); - - // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) + + // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) void readerIdle(); - - /** - * Accepts an AMQFrame for writing to the network. The ProtocolEngine encodes the frame into bytes and - * passes the data onto the NetworkDriver for sending - */ - void writeFrame(AMQDataBlock frame); -}
\ No newline at end of file + + +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 408c95e075..2132fc2c03 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -39,7 +39,7 @@ import static org.apache.qpid.transport.network.InputHandler.State.*; * @author Rafael H. Schloming */ -public final class InputHandler implements Receiver<ByteBuffer> +public class InputHandler implements Receiver<ByteBuffer> { public enum State diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 5bce98aeb0..bbb889107c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -31,9 +31,11 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.message.ServerMessage; import java.util.HashMap; import java.util.Iterator; +import java.nio.ByteBuffer; public class SlowMessageStore implements MessageStore { @@ -317,4 +319,19 @@ public class SlowMessageStore implements MessageStore return _realStore.isPersistent(); } + public void storeMessageHeader(Long messageNumber, ServerMessage message) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void storeContent(Long messageNumber, long offset, ByteBuffer body) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public ServerMessage getMessage(Long messageNumber) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + } |