diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java | 88 |
1 files changed, 61 insertions, 27 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 1b0ea41e0b..df2754c16b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -29,15 +29,19 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.flow.*; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.*; import java.util.ArrayList; import java.util.Collection; +import java.util.Map; public class ServerSessionDelegate extends SessionDelegate { @@ -205,20 +209,19 @@ public class ServerSessionDelegate extends SessionDelegate } - - - MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference()); - final MessageStore store = getVirtualHost(ssn).getMessageStore(); - - store.storeMessageHeader(message.getMessageNumber(),message); - store.storeContent(message.getMessageNumber(), 0, xfr.getBody()); - DeliveryProperties delvProps = null; - if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) { delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); } + MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); + final MessageStore store = getVirtualHost(ssn).getMessageStore(); + StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); + storeMessage.addContent(0,xfr.getBody()); + storeMessage.flushToStore(); + MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference()); + ArrayList<AMQQueue> queues = exchange.route(message); @@ -267,8 +270,6 @@ public class ServerSessionDelegate extends SessionDelegate ssn.processed(xfr); - - super.messageTransfer(ssn, xfr); //To change body of overridden methods use File | Settings | File Templates. } @Override @@ -397,6 +398,11 @@ public class ServerSessionDelegate extends SessionDelegate exchange.setAlternateExchange(alternate); } + if (exchange.isDurable() && !exchange.isAutoDelete()) + { + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + store.createExchange(exchange); + } exchangeRegistry.registerExchange(exchange); } @@ -407,7 +413,6 @@ public class ServerSessionDelegate extends SessionDelegate catch (AMQException e) { //TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. throw new RuntimeException(e); } @@ -431,7 +436,7 @@ public class ServerSessionDelegate extends SessionDelegate ex.setDescription(description); session.invoke(ex); - //session.close(); + } private Exchange getExchange(Session session, String exchangeName) @@ -487,6 +492,13 @@ public class ServerSessionDelegate extends SessionDelegate else { exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused()); + + if (exchange.isDurable() && !exchange.isAutoDelete()) + { + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + store.removeExchange(exchange); + } + } } catch (ExchangeInUseException e) @@ -496,7 +508,6 @@ public class ServerSessionDelegate extends SessionDelegate catch (AMQException e) { // TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. throw new RuntimeException(e); } } @@ -585,6 +596,7 @@ public class ServerSessionDelegate extends SessionDelegate if (!exchange.isBound(routingKey, fieldTable, queue)) { queue.bind(exchange, routingKey, fieldTable); + } else { @@ -607,7 +619,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void exchangeUnbind(Session session, ExchangeUnbind method) { - VirtualHost virtualHost = getVirtualHost(session); + VirtualHost virtualHost = getVirtualHost(session); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); @@ -643,7 +655,6 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. throw new RuntimeException(e); } } @@ -761,6 +772,7 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); String queueName = method.getQueue(); @@ -818,10 +830,35 @@ public class ServerSessionDelegate extends SessionDelegate queue.setAlternateExchange(alternate); } + if(method.hasArguments() && method.getArguments() != null) + { + if(method.getArguments().containsKey("no-local")) + { + Object no_local = method.getArguments().get("no-local"); + if(no_local instanceof Boolean && ((Boolean)no_local)) + { + queue.setNoLocal(true); + } + } + } + if (queue.isDurable() && !queue.isAutoDelete()) { - //store.createQueue(queue, body.getArguments()); + if(method.hasArguments() && method.getArguments() != null) + { + Map<String,Object> args = method.getArguments(); + FieldTable ftArgs = new FieldTable(); + for(Map.Entry<String, Object> entry : args.entrySet()) + { + ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue()); + } + store.createQueue(queue, ftArgs); + } + else + { + store.createQueue(queue); + } } queueRegistry.registerQueue(queue); boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister(); @@ -853,8 +890,7 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - //TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); } } }; @@ -896,7 +932,6 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. throw new RuntimeException(e); } } @@ -948,7 +983,6 @@ public class ServerSessionDelegate extends SessionDelegate catch (AMQException e) { //TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. throw new RuntimeException(e); } } @@ -1018,19 +1052,19 @@ public class ServerSessionDelegate extends SessionDelegate try { int purged = queue.delete(); + if (queue.isDurable() && !queue.isAutoDelete()) + { + DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + store.removeQueue(queue); + } + } catch (AMQException e) { //TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. throw new RuntimeException(e); } - - /* if (queue.isDurable()) - { - store.removeQueue(queue); - }*/ } } |