summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java88
1 files changed, 61 insertions, 27 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 1b0ea41e0b..df2754c16b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -29,15 +29,19 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.flow.*;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Map;
public class ServerSessionDelegate extends SessionDelegate
{
@@ -205,20 +209,19 @@ public class ServerSessionDelegate extends SessionDelegate
}
-
-
- MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference());
- final MessageStore store = getVirtualHost(ssn).getMessageStore();
-
- store.storeMessageHeader(message.getMessageNumber(),message);
- store.storeContent(message.getMessageNumber(), 0, xfr.getBody());
-
DeliveryProperties delvProps = null;
- if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
{
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
+ MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+ final MessageStore store = getVirtualHost(ssn).getMessageStore();
+ StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+ storeMessage.addContent(0,xfr.getBody());
+ storeMessage.flushToStore();
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
+
ArrayList<AMQQueue> queues = exchange.route(message);
@@ -267,8 +270,6 @@ public class ServerSessionDelegate extends SessionDelegate
ssn.processed(xfr);
-
- super.messageTransfer(ssn, xfr); //To change body of overridden methods use File | Settings | File Templates.
}
@Override
@@ -397,6 +398,11 @@ public class ServerSessionDelegate extends SessionDelegate
exchange.setAlternateExchange(alternate);
}
+ if (exchange.isDurable() && !exchange.isAutoDelete())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.createExchange(exchange);
+ }
exchangeRegistry.registerExchange(exchange);
}
@@ -407,7 +413,6 @@ public class ServerSessionDelegate extends SessionDelegate
catch (AMQException e)
{
//TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
@@ -431,7 +436,7 @@ public class ServerSessionDelegate extends SessionDelegate
ex.setDescription(description);
session.invoke(ex);
- //session.close();
+
}
private Exchange getExchange(Session session, String exchangeName)
@@ -487,6 +492,13 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
+
+ if (exchange.isDurable() && !exchange.isAutoDelete())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.removeExchange(exchange);
+ }
+
}
}
catch (ExchangeInUseException e)
@@ -496,7 +508,6 @@ public class ServerSessionDelegate extends SessionDelegate
catch (AMQException e)
{
// TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -585,6 +596,7 @@ public class ServerSessionDelegate extends SessionDelegate
if (!exchange.isBound(routingKey, fieldTable, queue))
{
queue.bind(exchange, routingKey, fieldTable);
+
}
else
{
@@ -607,7 +619,7 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void exchangeUnbind(Session session, ExchangeUnbind method)
{
- VirtualHost virtualHost = getVirtualHost(session);
+ VirtualHost virtualHost = getVirtualHost(session);
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -643,7 +655,6 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (AMQException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -761,6 +772,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
VirtualHost virtualHost = getVirtualHost(session);
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
String queueName = method.getQueue();
@@ -818,10 +830,35 @@ public class ServerSessionDelegate extends SessionDelegate
queue.setAlternateExchange(alternate);
}
+ if(method.hasArguments() && method.getArguments() != null)
+ {
+ if(method.getArguments().containsKey("no-local"))
+ {
+ Object no_local = method.getArguments().get("no-local");
+ if(no_local instanceof Boolean && ((Boolean)no_local))
+ {
+ queue.setNoLocal(true);
+ }
+ }
+ }
+
if (queue.isDurable() && !queue.isAutoDelete())
{
- //store.createQueue(queue, body.getArguments());
+ if(method.hasArguments() && method.getArguments() != null)
+ {
+ Map<String,Object> args = method.getArguments();
+ FieldTable ftArgs = new FieldTable();
+ for(Map.Entry<String, Object> entry : args.entrySet())
+ {
+ ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+ store.createQueue(queue, ftArgs);
+ }
+ else
+ {
+ store.createQueue(queue);
+ }
}
queueRegistry.registerQueue(queue);
boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
@@ -853,8 +890,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (AMQException e)
{
- //TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
}
}
};
@@ -896,7 +932,6 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (AMQException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -948,7 +983,6 @@ public class ServerSessionDelegate extends SessionDelegate
catch (AMQException e)
{
//TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
}
@@ -1018,19 +1052,19 @@ public class ServerSessionDelegate extends SessionDelegate
try
{
int purged = queue.delete();
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.removeQueue(queue);
+ }
+
}
catch (AMQException e)
{
//TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
throw new RuntimeException(e);
}
-
- /* if (queue.isDurable())
- {
- store.removeQueue(queue);
- }*/
}
}