summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-12 20:50:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-12 20:50:07 +0000
commitf772c4f42999d902722584623910184c8f439a1b (patch)
tree04f9a03a1e0771ad1c3c23e67eed68eb22629e17
parentf86519a1181233a5179d8a9e4bbb3f427baaf988 (diff)
downloadqpid-python-f772c4f42999d902722584623910184c8f439a1b.tar.gz
Implemented alternate exchanges, some work on integrating IO layer
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@824494 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java43
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java57
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java50
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java110
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java7
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentHolder.java41
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentIterator.java75
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyFrameIterator.java94
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java71
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java70
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java148
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java86
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java96
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java)2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java17
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java29
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java17
36 files changed, 1006 insertions, 349 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
index 572b123676..b228e7270c 100644
--- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
+++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
@@ -74,6 +74,16 @@ public class TestExchange implements Exchange
return false;
}
+ public Exchange getAlternateExchange()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void initialise(VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete)
throws AMQException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 7b966700fb..6d27c1c07c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -232,7 +232,7 @@ public class AMQChannel
if(!checkMessageUserId(_currentMessage.getContentHeader()))
{
- _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", createAMQMessage(_currentMessage)));
+ _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage));
}
else
{
@@ -240,7 +240,7 @@ public class AMQChannel
{
if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
{
- _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", createAMQMessage(_currentMessage)));
+ _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage));
}
else
{
@@ -898,37 +898,6 @@ public class AMQChannel
return _defaultQueue;
}
- public void processReturns() throws AMQException
- {
- if (!_returnMessages.isEmpty())
- {
-
- for (RequiredDeliveryException bouncedMessage : _returnMessages)
- {
- ServerMessage serverMessage = bouncedMessage.getAMQMessage();
- if(serverMessage instanceof AMQMessage)
- {
- AMQMessage message = (AMQMessage) serverMessage;
- _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message.getBodyFrameIterator(_session,_channelId),
- _channelId,
- bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
-
- }
- else
- {
- // TODO AMQP 0-10 Message
- throw new RuntimeException("not yet implemented conversion of 0-10 messages");
- }
- bouncedMessage.release();
- }
-
-
- _returnMessages.clear();
- }
- }
public boolean isClosing()
{
@@ -1154,12 +1123,12 @@ public class AMQChannel
private class WriteReturnAction implements Transaction.Action
{
private final AMQConstant _errorCode;
- private final AMQMessage _message;
+ private final IncomingMessage _message;
private final String _description;
public WriteReturnAction(AMQConstant errorCode,
String description,
- AMQMessage message)
+ IncomingMessage message)
{
_errorCode = errorCode;
_message = message;
@@ -1171,8 +1140,8 @@ public class AMQChannel
try
{
_session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(),
- _message.getContentHeaderBody(),
- _message.getBodyFrameIterator(_session,_channelId),
+ _message.getContentHeader(),
+ new BodyFrameIterator(_session,_channelId,_message),
_channelId,
_errorCode.getCode(),
new AMQShortString(_description));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java
new file mode 100755
index 0000000000..314f2761a8
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server;
+
+public interface ExchangeReferrer
+{
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index baa9ba6208..de095a3bc2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -245,7 +245,7 @@ public class Main
String logConfig = commandLine.getOptionValue("l");
String logWatchConfig = commandLine.getOptionValue("w", "0");
-
+
int logWatchTime = 0;
try
{
@@ -256,7 +256,7 @@ public class Main
System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+ "a non-negative integer. Using default of zero (no watching configured");
}
-
+
File logConfigFile;
if (logConfig != null)
{
@@ -282,8 +282,10 @@ public class Main
BrokerMessages.reload();
// AR.initialise() sets its own actor so we now need to set the actor
- // for the remainder of the startup
+ // for the remainder of the startup
CurrentActor.set(new BrokerActor(config.getRootMessageLogger()));
+ CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
+
try
{
configureLoggingManagementMBean(logConfigFile, logWatchTime);
@@ -339,40 +341,53 @@ public class Main
if (!serverConfig.getSSLOnly())
{
NetworkDriver driver = new MINANetworkDriver();
- driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(),
+ driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(),
serverConfig.getNetworkConfiguration(), null);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
new QpidAcceptor(driver,"TCP"));
CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
}
-
+
if (serverConfig.getEnableSSL())
{
sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
NetworkDriver driver = new MINANetworkDriver();
- driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+ driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
new QpidAcceptor(driver,"TCP"));
CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort()));
}
-
+
//fixme qpid.AMQP should be using qpidproperties to get value
_brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
CurrentActor.get().message(BrokerMessages.BRK_1004());
-
- // TODO - Fix to use a proper binding
int port_0_10 = port + 1;
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
-
final ConnectionDelegate delegate =
- new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, "localhost");
-
-
+ new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, bindAddress.getCanonicalHostName());
+
+/*
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(port, new InetAddress[]{bindAddress}, new ProtocolEngineFactory_0_10(delegate),
+ serverConfig.getNetworkConfiguration(), null);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+*/
+
+
+ // TODO - Fix to use a proper binding
+
+
+
+
+
+
ConnectionBinding cb = new ConnectionBinding()
{
public Connection connection()
@@ -382,7 +397,7 @@ public class Main
return conn;
}
};
-
+
org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor
("0.0.0.0", port_0_10, cb);
ioa.start();
@@ -426,11 +441,11 @@ public class Main
{
System.setProperty("log4j.defaultInitOverride", "true");
}
-
+
//now that the override status is know, we can instantiate the Loggers
_logger = Logger.getLogger(Main.class);
_brokerLogger = Logger.getLogger("Qpid.Broker");
-
+
new Main(args);
}
@@ -466,7 +481,7 @@ public class Main
{
if (logConfigFile.exists() && logConfigFile.canRead())
{
- CurrentActor.get().message(BrokerMessages.BRK_1007(logConfigFile.getAbsolutePath()));
+ CurrentActor.get().message(BrokerMessages.BRK_1007(logConfigFile.getAbsolutePath()));
System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
if (logWatchTime > 0)
{
@@ -498,7 +513,7 @@ public class Main
{
System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
System.err.println("Using the fallback internal log4j.properties configuration");
-
+
InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
if(propsFile == null)
{
@@ -516,7 +531,7 @@ public class Main
private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
{
LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
-
+
try
{
blm.register();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java
new file mode 100755
index 0000000000..4c84b47d43
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.network.Disassembler;
+
+public class ProtocolEngineFactory_0_10 implements ProtocolEngineFactory
+{
+ private ConnectionDelegate _delegate;
+
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+
+ public ProtocolEngineFactory_0_10(ConnectionDelegate delegate)
+ {
+ _delegate = delegate;
+ }
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ Connection conn = new Connection();
+ conn.setConnectionDelegate(_delegate);
+ Disassembler dis = new Disassembler(networkDriver, MAX_FRAME_SIZE);
+ conn.setSender(dis);
+ return new ProtocolEngine_0_10(conn, networkDriver); //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java
new file mode 100755
index 0000000000..7ddd3da1a6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.Assembler;
+
+import java.net.SocketAddress;
+
+public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
+{
+ private NetworkDriver _networkDriver;
+ private long _readBytes;
+ private long _writtenBytes;
+
+ public ProtocolEngine_0_10(Connection conn, NetworkDriver networkDriver)
+ {
+ super(new Assembler(conn));
+ _networkDriver = networkDriver;
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ public void readerIdle()
+ {
+ //Todo
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index f3bc543562..cacd294464 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -46,14 +46,16 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.ExchangeReferrer;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public abstract class AbstractExchange implements Exchange, Managable
{
private AMQShortString _name;
-
+ private Exchange _alternateExchange;
protected boolean _durable;
protected String _exchangeType;
@@ -70,6 +72,7 @@ public abstract class AbstractExchange implements Exchange, Managable
//The logSubject for ths exchange
private LogSubject _logSubject;
+ private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
/**
* Abstract MBean class. This has some of the methods implemented from
@@ -197,6 +200,10 @@ public abstract class AbstractExchange implements Exchange, Managable
{
_exchangeMbean.unregister();
}
+ if(_alternateExchange != null)
+ {
+ _alternateExchange.removeReference(this);
+ }
CurrentActor.get().message(_logSubject, ExchangeMessages.EXH_1002());
}
@@ -237,4 +244,37 @@ public abstract class AbstractExchange implements Exchange, Managable
return isBound(new AMQShortString(bindingKey));
}
+ public Exchange getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ if(_alternateExchange != null)
+ {
+ _alternateExchange.removeReference(this);
+ }
+ if(exchange != null)
+ {
+ exchange.addReference(this);
+ }
+ _alternateExchange = exchange;
+
+ }
+
+ public void removeReference(ExchangeReferrer exchange)
+ {
+ _referrers.remove(exchange);
+ }
+
+ public void addReference(ExchangeReferrer exchange)
+ {
+ _referrers.put(exchange, Boolean.TRUE);
+ }
+
+ public boolean hasReferrers()
+ {
+ return !_referrers.isEmpty();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 441996c212..2bcf5e3053 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -27,11 +27,12 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.ExchangeReferrer;
import java.util.ArrayList;
import java.util.Map;
-public interface Exchange
+public interface Exchange extends ExchangeReferrer
{
AMQShortString getName();
@@ -100,4 +101,13 @@ public interface Exchange
boolean isBound(String bindingKey);
+ Exchange getAlternateExchange();
+
+ void setAlternateExchange(Exchange exchange);
+
+ void removeReference(ExchangeReferrer exchange);
+
+ void addReference(ExchangeReferrer exchange);
+
+ boolean hasReferrers();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
index df4da2a79e..3d31a705fe 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.logging.actors;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.RootMessageLogger;
import java.util.EmptyStackException;
import java.util.Stack;
@@ -66,6 +69,8 @@ public class CurrentActor
}
};
+ private static LogActor _defaultActor;
+
/**
* Set a new LogActor to be the Current Actor
* <p/>
@@ -105,7 +110,12 @@ public class CurrentActor
}
catch (EmptyStackException ese)
{
- return null;
+ return _defaultActor;
}
}
+
+ public static void setDefault(LogActor defaultActor)
+ {
+ _defaultActor = defaultActor;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index b9143ece91..580e7d21f0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -41,8 +41,14 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
public MessageTransferMessage(MessageTransfer xfr, WeakReference<Session> sessionRef)
{
+ this(_numberSource.getAndIncrement(), xfr, sessionRef);
+ }
+
+ public MessageTransferMessage(long messageNumber, MessageTransfer xfr, WeakReference<Session> sessionRef)
+ {
+
_xfr = xfr;
- _messageNumber = _numberSource.getAndIncrement();
+ _messageNumber = messageNumber;
Header header = _xfr.getHeader();
if(header != null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 7004bac519..1d4fd6b1ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -22,22 +22,17 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.message.*;
import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CopyOnWriteArrayList;
/**
* A deliverable message.
@@ -74,107 +69,6 @@ public class AMQMessage implements ServerMessage
/**
- * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
- * therefore is memory-efficient.
- */
- private class BodyFrameIterator implements Iterator<AMQDataBlock>
- {
- private int _channel;
-
- private int _index = -1;
- private AMQProtocolSession _protocolSession;
-
- private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- _channel = channel;
- _protocolSession = protocolSession;
- }
-
- public boolean hasNext()
- {
- try
- {
- return _index < (_messageHandle.getBodyCount() - 1);
- }
- catch (AMQException e)
- {
- _log.error("Unable to get body count: " + e, e);
-
- return false;
- }
- }
-
- public AMQDataBlock next()
- {
- try
- {
-
- AMQBody cb =
- getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(
- ++_index));
-
- return new AMQFrame(_channel, cb);
- }
- catch (AMQException e)
- {
- // have no choice but to throw a runtime exception
- throw new RuntimeException("Error getting content body: " + e, e);
- }
-
- }
-
- private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
- {
- return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
-
- private class BodyContentIterator implements Iterator<ContentChunk>
- {
-
- private int _index = -1;
-
- public boolean hasNext()
- {
- try
- {
- return _index < (_messageHandle.getBodyCount() - 1);
- }
- catch (AMQException e)
- {
- _log.error("Error getting body count: " + e, e);
-
- return false;
- }
- }
-
- public ContentChunk next()
- {
- try
- {
- return _messageHandle.getContentChunk(++_index);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Error getting content body: " + e, e);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
-
-
- /**
* Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
* enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
* queues.
@@ -260,12 +154,12 @@ public class AMQMessage implements ServerMessage
public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
- return new BodyFrameIterator(protocolSession, channel);
+ return new BodyFrameIterator(protocolSession, channel, _messageHandle);
}
public Iterator<ContentChunk> getContentBodyIterator()
{
- return new BodyContentIterator();
+ return new BodyContentIterator(_messageHandle);
}
public ContentHeaderBody getContentHeaderBody() throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index d8ab6ee9b2..5798dca079 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -29,7 +29,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
* A pluggable way of getting message data. Implementations can provide intelligent caching for example or
* even no caching at all to minimise the broker memory footprint.
*/
-public interface AMQMessageHandle
+public interface AMQMessageHandle extends BodyContentHolder
{
ContentHeaderBody getContentHeaderBody() throws AMQException;
@@ -41,23 +41,10 @@ public interface AMQMessageHandle
/**
- * @return the number of body frames associated with this message
- */
- int getBodyCount() throws AMQException;
-
- /**
* @return the size of the body
*/
long getBodySize() throws AMQException;
- /**
- * Get a particular content body
- * @param index the index of the body to retrieve, must be between 0 and getBodyCount() - 1
- * @return a content body
- * @throws IllegalArgumentException if the index is invalid
- */
- ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException;
-
void addContentBodyFrame(ContentChunk contentBody, boolean isLastContentBody) throws AMQException;
MessagePublishInfo getMessagePublishInfo() throws AMQException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index c5691792d4..95ca18c778 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.ExchangeReferrer;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -39,7 +40,7 @@ import java.util.List;
import java.util.Set;
import java.util.Map;
-public interface AMQQueue extends Managable, Comparable<AMQQueue>
+public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer
{
@@ -207,6 +208,10 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
boolean isExclusive();
+ Exchange getAlternateExchange();
+
+ void setAlternateExchange(Exchange exchange);
+
Map<String, Object> getArguments();
void checkCapacity(AMQChannel channel);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentHolder.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentHolder.java
new file mode 100755
index 0000000000..4ab6278715
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentHolder.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+public interface BodyContentHolder
+{
+ /**
+ * @return the number of body frames associated with this message
+ */
+ int getBodyCount() throws AMQException;
+
+ /**
+ * Get a particular content body
+ * @param index the index of the body to retrieve, must be between 0 and getBodyCount() - 1
+ * @return a content body
+ * @throws IllegalArgumentException if the index is invalid
+ */
+ ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException;
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentIterator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentIterator.java
new file mode 100755
index 0000000000..1483177855
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentIterator.java
@@ -0,0 +1,75 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+
+
+class BodyContentIterator implements Iterator<ContentChunk>
+{
+ private static final Logger _log = Logger.getLogger(BodyContentIterator.class);
+
+
+ private AMQMessageHandle _messageHandle;
+ private int _index = -1;
+
+
+ public BodyContentIterator(AMQMessageHandle messageHandle)
+ {
+ _messageHandle = messageHandle;
+ }
+
+
+ public boolean hasNext()
+ {
+ try
+ {
+ return _index < (_messageHandle.getBodyCount() - 1);
+ }
+ catch (AMQException e)
+ {
+ _log.error("Error getting body count: " + e, e);
+
+ return false;
+ }
+ }
+
+ public ContentChunk next()
+ {
+ try
+ {
+ return _messageHandle.getContentChunk(++_index);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Error getting content body: " + e, e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyFrameIterator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyFrameIterator.java
new file mode 100755
index 0000000000..fd9cfe45bb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyFrameIterator.java
@@ -0,0 +1,94 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+
+
+public class BodyFrameIterator implements Iterator<AMQDataBlock>
+{
+ private static final Logger _log = Logger.getLogger(BodyFrameIterator.class);
+
+
+ private int _channel;
+
+ private int _index = -1;
+ private AMQProtocolSession _protocolSession;
+ private BodyContentHolder _bodyContentHolder;
+
+ public BodyFrameIterator(AMQProtocolSession protocolSession, int channel, BodyContentHolder messageHandle)
+ {
+ _channel = channel;
+ _protocolSession = protocolSession;
+ _bodyContentHolder = messageHandle;
+ }
+
+ public boolean hasNext()
+ {
+ try
+ {
+ return _index < (_bodyContentHolder.getBodyCount() - 1);
+ }
+ catch (AMQException e)
+ {
+ _log.error("Unable to get body count: " + e, e);
+
+ return false;
+ }
+ }
+
+ public AMQDataBlock next()
+ {
+ try
+ {
+
+ AMQBody cb =
+ getProtocolVersionMethodConverter().convertToBody(_bodyContentHolder.getContentChunk(
+ ++_index));
+
+ return new AMQFrame(_channel, cb);
+ }
+ catch (AMQException e)
+ {
+ // have no choice but to throw a runtime exception
+ throw new RuntimeException("Error getting content body: " + e, e);
+ }
+
+ }
+
+ private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
new file mode 100755
index 0000000000..77da08d8c4
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+
+class InboundMessageAdapter implements InboundMessage
+{
+
+ private QueueEntry _entry;
+
+ InboundMessageAdapter()
+ {
+ }
+
+ InboundMessageAdapter(QueueEntry entry)
+ {
+ _entry = entry;
+ }
+
+ public void setEntry(QueueEntry entry)
+ {
+ _entry = entry;
+ }
+
+
+ public String getRoutingKey()
+ {
+ return _entry.getMessage().getRoutingKey();
+ }
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return _entry.getMessageHeader();
+ }
+
+ public boolean isPersistent()
+ {
+ return _entry.isPersistent();
+ }
+
+ public boolean isRedelivered()
+ {
+ return _entry.isRedelivered();
+ }
+
+ public long getSize()
+ {
+ return _entry.getSize();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 726d99f8b3..ac82e1f2b3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -37,8 +37,9 @@ import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
+import java.util.List;
-public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage
+public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, BodyContentHolder
{
/** Used for debugging purposes. */
@@ -73,6 +74,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private int _receivedChunkCount = 0;
+ private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
public IncomingMessage(final Long messageId,
@@ -145,6 +147,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
{
_bodyLengthReceived += contentChunk.getSize();
+ _contentChunks.add(contentChunk);
_messageHandle.addContentBodyFrame(contentChunk, allContentReceived());
return _receivedChunkCount++;
}
@@ -246,4 +249,13 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
}
+ public int getBodyCount() throws AMQException
+ {
+ return _contentChunks.size();
+ }
+
+ public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+ {
+ return _contentChunks.get(index);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 87b0267b0f..6a303fd156 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.message.ServerMessage;
public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
+
public static enum State
{
AVAILABLE,
@@ -199,6 +200,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
void discard();
+ void routeToAlternate();
+
boolean isQueueDeleted();
void addStateChangeListener(StateChangeListener listener);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index f10a4175db..74c034d305 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -25,10 +25,14 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.Transaction;
import org.apache.log4j.Logger;
import java.util.Set;
import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -213,12 +217,21 @@ public class QueueEntryImpl implements QueueEntry
public void release()
{
_stateUpdater.set(this,AVAILABLE_STATE);
- getQueue().requeue(this);
- if(_stateChangeListeners != null)
+ if(!getQueue().isDeleted())
{
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
+
+ }
+ else if(acquire())
+ {
+ routeToAlternate();
}
+
}
public boolean releaseButRetain()
@@ -386,6 +399,57 @@ public class QueueEntryImpl implements QueueEntry
dispose();
}
+ public void routeToAlternate()
+ {
+ final AMQQueue currentQueue = getQueue();
+ Exchange alternateExchange = currentQueue.getAlternateExchange();
+
+ if(alternateExchange != null)
+ {
+ final List<AMQQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ final ServerMessage message = getMessage();
+ if(rerouteQueues != null && rerouteQueues.size() != 0)
+ {
+ Transaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+
+ txn.enqueue(rerouteQueues, message, new Transaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ for(AMQQueue queue : rerouteQueues)
+ {
+ QueueEntry entry = queue.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.dequeue(currentQueue,message,
+ new Transaction.Action()
+ {
+ public void postCommit()
+ {
+ discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+ }
+ }
+
public boolean isQueueDeleted()
{
return getQueue().isDeleted();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 8865c7946a..de74974d2d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -19,8 +19,8 @@ import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -61,12 +61,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+ private int unused;
private PrincipalHolder _prinicpalHolder;
private Object _exclusiveOwner;
+ private Exchange _alternateExchange;
+
static final class QueueContext implements Context
{
@@ -267,6 +270,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _exclusiveOwner != null;
}
+ public Exchange getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ if(_alternateExchange != null)
+ {
+ _alternateExchange.removeReference(this);
+ }
+ if(exchange != null)
+ {
+ exchange.addReference(this);
+ }
+ _alternateExchange = exchange;
+ }
+
public Map<String, Object> getArguments()
{
return null;
@@ -922,7 +943,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- MessageStore store = getVirtualHost().getMessageStore();
+ TransactionLog txnLog = getVirtualHost().getTransactionLog();
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -943,7 +964,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- store.beginTran(storeContext);
+ txnLog.beginTran(storeContext);
// Move the messages in on the message store.
for (QueueEntry entry : entries)
@@ -952,7 +973,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
+ txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
}
// dequeue does not decrement the refence count
entry.dequeue();
@@ -961,7 +982,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Commit and flush the move transcations.
try
{
- store.commitTran(storeContext);
+ txnLog.commitTran(storeContext);
}
catch (AMQException e)
{
@@ -972,7 +993,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
try
{
- store.abortTran(storeContext);
+ txnLog.abortTran(storeContext);
}
catch (AMQException rollbackEx)
{
@@ -1007,7 +1028,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
final StoreContext storeContext)
{
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- MessageStore store = getVirtualHost().getMessageStore();
+ TransactionLog txnLog = getVirtualHost().getTransactionLog();
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -1027,7 +1048,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- store.beginTran(storeContext);
+ txnLog.beginTran(storeContext);
// Move the messages in on the message store.
for (QueueEntry entry : entries)
@@ -1037,7 +1058,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
+ txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
}
}
@@ -1045,7 +1066,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Commit and flush the move transcations.
try
{
- store.commitTran(storeContext);
+ txnLog.commitTran(storeContext);
}
catch (AMQException e)
{
@@ -1056,7 +1077,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
try
{
- store.abortTran(storeContext);
+ txnLog.abortTran(storeContext);
}
catch (AMQException rollbackEx)
{
@@ -1139,7 +1160,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
- Transaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
+ Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
while (queueListIterator.advance())
{
@@ -1160,7 +1181,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private void dequeueEntry(final QueueEntry node)
{
- Transaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
+ Transaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
dequeueEntry(node, txn);
}
@@ -1206,7 +1227,108 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_bindings.deregister();
_virtualHost.getQueueRegistry().unregisterQueue(_name);
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry entry)
+ {
+ return entry.acquire();
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+ Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+
+ if(_alternateExchange != null)
+ {
+
+ InboundMessageAdapter adapter = new InboundMessageAdapter();
+ for(final QueueEntry entry : entries)
+ {
+ adapter.setEntry(entry);
+ final List<AMQQueue> rerouteQueues = _alternateExchange.route(adapter);
+ final ServerMessage message = entry.getMessage();
+ if(rerouteQueues != null & rerouteQueues.size() != 0)
+ {
+ txn.enqueue(rerouteQueues, entry.getMessage(),
+ new Transaction.Action()
+ {
+
+ public void postCommit()
+ {
+ try
+ {
+ for(AMQQueue queue : rerouteQueues)
+ {
+ QueueEntry entry = queue.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.dequeue(this, entry.getMessage(),
+ new Transaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+
+ }
+
+ _alternateExchange.removeReference(this);
+ }
+ else
+ {
+ // TODO log discard
+
+ for(final QueueEntry entry : entries)
+ {
+ final ServerMessage message = entry.getMessage();
+ if(message != null)
+ {
+ txn.dequeue(this, message,
+ new Transaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+ }
+ }
+
+ txn.commit();
+
+
_managedObject.unregister();
+
for (Task task : _deleteTaskList)
{
task.doTask(this);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 6e86f4b70d..6dce764e63 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.ServerMessage;
import java.io.ByteArrayInputStream;
@@ -1375,6 +1376,21 @@ public class DerbyMessageStore extends AbstractMessageStore
return true;
}
+ public void storeMessageHeader(Long messageNumber, ServerMessage message)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void storeContent(Long messageNumber, long offset, java.nio.ByteBuffer body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public ServerMessage getMessage(Long messageNumber)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
private void checkNotClosed() throws MessageStoreClosedException
{
if (_closed.get())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 8a9c1571ca..ecae3cb794 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.ServerMessage;
import java.util.ArrayList;
import java.util.Collections;
@@ -39,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
public class MemoryMessageStore extends AbstractMessageStore
@@ -240,6 +242,21 @@ public class MemoryMessageStore extends AbstractMessageStore
return false;
}
+ public void storeMessageHeader(Long messageNumber, ServerMessage message)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void storeContent(Long messageNumber, long offset, ByteBuffer body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public ServerMessage getMessage(Long messageNumber)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
private void checkNotClosed() throws MessageStoreClosedException
{
if (_closed.get())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 802c0bc709..a30e6b485c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -25,6 +25,10 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.ServerMessage;
+
+import java.nio.ByteBuffer;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues
@@ -134,9 +138,14 @@ public interface MessageStore extends DurableConfigurationStore, TransactionLog
/**
* Is this store capable of persisting the data
- *
+ *
* @return true if this store is capable of persisting data
*/
boolean isPersistent();
+ void storeMessageHeader(Long messageNumber, ServerMessage message);
+
+ void storeContent(Long messageNumber, long offset, ByteBuffer body);
+
+ ServerMessage getMessage(Long messageNumber);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 1b80ede81f..2669475a63 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -108,8 +108,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_creditManager.addStateListener(this);
_state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
- _logSubject = new SubscriptionLogSubject(this);
- _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
}
@@ -135,6 +133,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
}
_queue = queue;
+ _logSubject = new SubscriptionLogSubject(this);
+ _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
+
}
public AMQShortString getConsumerTag()
@@ -335,7 +336,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
public void onComplete(Method method)
{
- restoreCredit(entry);
+ restoreCredit(entry);
}
});
}
@@ -409,7 +410,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private void reject(QueueEntry entry)
{
entry.setRedelivered(true);
- entry.reject(this);
+ entry.routeToAlternate();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 7891c5214c..1b0ea41e0b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.flow.*;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
@@ -87,7 +88,7 @@ public class ServerSessionDelegate extends SessionDelegate
session.executionResult((int) method.getId(), result);
-
+
}
@Override
@@ -154,7 +155,7 @@ public class ServerSessionDelegate extends SessionDelegate
// TODO filters
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
destination,
method.getAcceptMode(),
method.getAcquireMode(),
@@ -177,7 +178,6 @@ public class ServerSessionDelegate extends SessionDelegate
catch (AMQException e)
{
// TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -204,7 +204,14 @@ public class ServerSessionDelegate extends SessionDelegate
exchange = exchangeRegistry.getDefaultExchange();
}
+
+
+
MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference());
+ final MessageStore store = getVirtualHost(ssn).getMessageStore();
+
+ store.storeMessageHeader(message.getMessageNumber(),message);
+ store.storeContent(message.getMessageNumber(), 0, xfr.getBody());
DeliveryProperties delvProps = null;
if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -216,10 +223,47 @@ public class ServerSessionDelegate extends SessionDelegate
- if(queues != null)
+ if(queues != null && queues.size() != 0)
{
((ServerSession) ssn).enqueue(message, queues);
}
+ else
+ {
+ if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable())
+ {
+ if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ RangeSet rejects = new RangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
+ {
+ Exchange alternate = exchange.getAlternateExchange();
+ if(alternate != null)
+ {
+ queues = alternate.route(message);
+ if(queues != null && queues.size() != 0)
+ {
+ ((ServerSession) ssn).enqueue(message, queues);
+ }
+ else
+ {
+ //TODO - log the message discard
+ }
+ }
+ else
+ {
+ //TODO - log the message discard
+ }
+
+
+ }
+ }
+
+
+ }
ssn.processed(xfr);
@@ -346,6 +390,14 @@ public class ServerSessionDelegate extends SessionDelegate
method.getDurable(),
method.getAutoDelete());
+ String alternateExchangeName = method.getAlternateExchange();
+ if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+ {
+ Exchange alternate = getExchange(session, alternateExchangeName);
+ exchange.setAlternateExchange(alternate);
+ }
+
+
exchangeRegistry.registerExchange(exchange);
}
catch(AMQUnknownExchangeType e)
@@ -426,7 +478,16 @@ public class ServerSessionDelegate extends SessionDelegate
try
{
- exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
+ Exchange exchange = getExchange(session, method.getExchange());
+
+ if(exchange != null && exchange.hasReferrers())
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ }
+ else
+ {
+ exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
+ }
}
catch (ExchangeInUseException e)
{
@@ -491,7 +552,7 @@ public class ServerSessionDelegate extends SessionDelegate
//TODO - here because of non-compiant python tests
if (!method.hasBindingKey())
{
- method.setBindingKey(method.getQueue());
+ method.setBindingKey(method.getQueue());
}
AMQQueue queue = queueRegistry.getQueue(method.getQueue());
Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
@@ -512,7 +573,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else if(exchange.getType().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
{
- exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
}
else
{
@@ -622,7 +683,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
result.setQueueNotFound(true);
}
-
+
if(exchange != null && queue != null)
{
@@ -651,7 +712,7 @@ public class ServerSessionDelegate extends SessionDelegate
else if (method.hasArguments())
{
// TODO
-
+
}
result.setQueueNotMatched(!exchange.isBound(queue));
@@ -748,6 +809,13 @@ public class ServerSessionDelegate extends SessionDelegate
if(method.getExclusive())
{
queue.setPrincipalHolder((ServerSession)session);
+ queue.setExclusiveOwner(session);
+ }
+ final String alternateExchangeName = method.getAlternateExchange();
+ if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+ {
+ Exchange alternate = getExchange(session, alternateExchangeName);
+ queue.setAlternateExchange(alternate);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 8ce9cbc480..eccdce1ebc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -54,6 +54,7 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.TransactionLog;
import javax.management.NotCompliantMBeanException;
import java.util.Collections;
@@ -88,6 +89,7 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private VirtualHostConfiguration _configuration;
+ private DurableConfigurationStore _durableConfigurationStore;
public void setAccessableName(String name)
{
@@ -181,7 +183,7 @@ public class VirtualHost implements Accessable
StartupRoutingTable configFileRT = new StartupRoutingTable();
- _messageStore = configFileRT;
+ _durableConfigurationStore = configFileRT;
// This needs to be after the RT has been defined as it creates the default durable exchanges.
_exchangeRegistry.initialise();
@@ -211,6 +213,7 @@ public class VirtualHost implements Accessable
if (store != null)
{
_messageStore = store;
+ _durableConfigurationStore = store;
}
else
{
@@ -302,6 +305,7 @@ public class VirtualHost implements Accessable
MessageStore messageStore = (MessageStore) o;
messageStore.configure(this, "store", hostConfig);
_messageStore = messageStore;
+ _durableConfigurationStore = messageStore;
}
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -413,11 +417,16 @@ public class VirtualHost implements Accessable
return _messageStore;
}
- public DurableConfigurationStore getDurableConfigurationStore()
+ public TransactionLog getTransactionLog()
{
return _messageStore;
}
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return _durableConfigurationStore;
+ }
+
public AuthenticationManager getAuthenticationManager()
{
return _authenticationManager;
@@ -475,7 +484,7 @@ public class VirtualHost implements Accessable
* This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
* This should be removed after the _RT has been fully split from the the TL
*/
- private class StartupRoutingTable implements MessageStore
+ private class StartupRoutingTable implements DurableConfigurationStore
{
public List<Exchange> exchange = new LinkedList<Exchange>();
public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
@@ -535,87 +544,6 @@ public class VirtualHost implements Accessable
{
}
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void beginTran(StoreContext context) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void commitTran(StoreContext context) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public StoreFuture commitTranAsync(StoreContext context) throws AMQException
- {
- commitTran(context);
- return new StoreFuture()
- {
- public boolean isComplete()
- {
- return true;
- }
-
- public void waitForCompletion()
- {
-
- }
- };
-
- }
-
- public void abortTran(StoreContext context) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean inTran(StoreContext context)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Long getNewMessageId()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void storeContentBodyChunk(
- Long messageId,
- int index,
- ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isPersistent()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
private class CreateQueueTuple
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 2747094caf..268451c74a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -413,6 +413,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void routeToAlternate()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isQueueDeleted()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index bd0f273742..f093e42874 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -33,7 +33,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-public class DestWildExchangeTest extends TestCase
+public class TopicExchangeTest extends TestCase
{
TopicExchange _exchange;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 77a1412f5f..912233f5cd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -336,6 +336,16 @@ public class MockAMQQueue implements AMQQueue
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public Exchange getAlternateExchange()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public Map<String, Object> getArguments()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index cff6d4d22a..d1a43bc5b6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -69,6 +69,11 @@ public class MockQueueEntry implements QueueEntry
}
+ public void routeToAlternate()
+ {
+
+ }
+
public void dispose()
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index eb83f6ae1f..ba04a14e09 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -30,9 +30,11 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.ServerMessage;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
/**
* A message store that does nothing. Designed to be used in tests that do not want to use any message store
@@ -160,6 +162,21 @@ public class SkeletonMessageStore implements MessageStore
return false;
}
+ public void storeMessageHeader(Long messageNumber, ServerMessage message)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void storeContent(Long messageNumber, long offset, ByteBuffer body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public ServerMessage getMessage(Long messageNumber)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void removeQueue(final AMQQueue queue) throws AMQException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 06a1fe2696..ac7f441a13 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -110,10 +110,6 @@ import org.slf4j.LoggerFactory;
* <tr><td>
* </table>
*
- * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- * filter before it mean not doing the read/write asynchronously but in the main filter thread?
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
* failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
* AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
@@ -172,10 +168,10 @@ public class AMQProtocolHandler implements ProtocolEngine
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private NetworkDriver _networkDriver;
-
+
private long _writtenBytes;
private long _readBytes;
-
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -215,10 +211,6 @@ public class AMQProtocolHandler implements ProtocolEngine
* process will be started, provided that it is the clients policy to allow failover, and provided that a failover
* has not already been started or failed.
*
- * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught}
- * may be called first followed by this method. This depends on whether the client was trying to send data at the
- * time of the failure.
- *
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
* not otherwise? The above comment doesn't make that clear.
*/
@@ -261,7 +253,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("sessionClose() not allowed to failover");
_connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.",
+ "Server closed connection and reconnection " + "not permitted.",
_stateManager.getLastException()));
}
else
@@ -285,7 +277,6 @@ public class AMQProtocolHandler implements ProtocolEngine
failoverThread.start();
}
- @Override
public void readerIdle()
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
@@ -294,8 +285,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_logger.warn("Timed out while waiting for heartbeat from peer.");
_networkDriver.close();
}
-
- @Override
+
public void writerIdle()
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
@@ -368,7 +358,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public void propagateExceptionToAllWaiters(Exception e)
{
getStateManager().error(e);
-
+
propagateExceptionToFrameListeners(e);
}
@@ -423,7 +413,6 @@ public class AMQProtocolHandler implements ProtocolEngine
private static int _messageReceivedCount;
- @Override
public void received(ByteBuffer msg)
{
try
@@ -433,7 +422,6 @@ public class AMQProtocolHandler implements ProtocolEngine
Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
- @Override
public void run()
{
// Decode buffer
@@ -568,7 +556,6 @@ public class AMQProtocolHandler implements ProtocolEngine
_writtenBytes += buf.remaining();
Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
{
- @Override
public void run()
{
_networkDriver.send(buf);
@@ -589,7 +576,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
_connection.bytesSent(_writtenBytes);
-
+
if (wait)
{
_networkDriver.flush();
@@ -649,7 +636,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_frameListeners.add(listener);
//FIXME: At this point here we should check or before add we should check _stateManager is in an open
- // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
+ // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
}
writeFrame(frame);
@@ -835,7 +822,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_networkDriver = driver;
}
-
+
/** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 5bfc189b02..31953ea6ab 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -28,38 +28,34 @@ import org.apache.qpid.transport.Receiver;
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
- * decodes it and then process the result.
- */
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
-{
- // Sets the network driver providing data for this ProtocolEngine
+ * decodes it and then process the result.
+ */
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
+{
+ // Sets the network driver providing data for this ProtocolEngine
void setNetworkDriver (NetworkDriver driver);
-
- // Returns the remote address of the NetworkDriver
+
+ // Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
- // Returns the local address of the NetworkDriver
+ // Returns the local address of the NetworkDriver
SocketAddress getLocalAddress();
-
- // Returns number of bytes written
+
+ // Returns number of bytes written
long getWrittenBytes();
-
- // Returns number of bytes read
+
+ // Returns number of bytes read
long getReadBytes();
-
- // Called by the NetworkDriver when the socket has been closed for reading
+
+ // Called by the NetworkDriver when the socket has been closed for reading
void closed();
-
- // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
- // heartbeat)
+
+ // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
+ // heartbeat)
void writerIdle();
-
- // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
+
+ // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
void readerIdle();
-
- /**
- * Accepts an AMQFrame for writing to the network. The ProtocolEngine encodes the frame into bytes and
- * passes the data onto the NetworkDriver for sending
- */
- void writeFrame(AMQDataBlock frame);
-} \ No newline at end of file
+
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
index 408c95e075..2132fc2c03 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -39,7 +39,7 @@ import static org.apache.qpid.transport.network.InputHandler.State.*;
* @author Rafael H. Schloming
*/
-public final class InputHandler implements Receiver<ByteBuffer>
+public class InputHandler implements Receiver<ByteBuffer>
{
public enum State
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 5bce98aeb0..bbb889107c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -31,9 +31,11 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.message.ServerMessage;
import java.util.HashMap;
import java.util.Iterator;
+import java.nio.ByteBuffer;
public class SlowMessageStore implements MessageStore
{
@@ -317,4 +319,19 @@ public class SlowMessageStore implements MessageStore
return _realStore.isPersistent();
}
+ public void storeMessageHeader(Long messageNumber, ServerMessage message)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void storeContent(Long messageNumber, long offset, ByteBuffer body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public ServerMessage getMessage(Long messageNumber)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
}