summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-08-14 17:14:51 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-08-14 17:14:51 +0000
commit858ddcc441ca47636a710d93f5084146ce73476c (patch)
tree110a4d66dcaed5a2b180cc0f737f21761cc259d7 /qpid/java/broker/src
parentd84a3a50dbb794c4383de7e5eca730ca602771e7 (diff)
downloadqpid-python-858ddcc441ca47636a710d93f5084146ce73476c.tar.gz
Initial checkin of AMQP 1-0 Java Prototype work
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1157566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java8
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java501
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java339
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java65
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java390
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java426
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java98
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java90
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java59
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java145
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java99
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java51
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java303
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java367
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java406
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java487
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java195
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java39
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java5
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java26
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java3
-rwxr-xr-xqpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java17
40 files changed, 4325 insertions, 69 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
index 895ff643a2..3f6290dedb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
@@ -28,7 +28,7 @@ import org.apache.qpid.transport.codec.BBEncoder;
import java.nio.ByteBuffer;
import java.util.Set;
-public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHeader
+public class QMFMessage implements ServerMessage<QMFMessage>, InboundMessage, AMQMessageHeader
{
private ByteBuffer _content;
@@ -154,7 +154,7 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead
return false;
}
- public MessageReference newReference()
+ public MessageReference<QMFMessage> newReference()
{
return new QMFMessageReference(this);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 4f86c82578..7cad0bc5c1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -70,6 +70,7 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -136,7 +137,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
-
+
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -199,7 +200,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
-
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -209,7 +210,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_txnCount.compareAndSet(0,1);
}
}
-
+
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -308,7 +309,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
try
{
_currentMessage.getStoredMessage().flushToStore();
-
+
final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
@@ -425,7 +426,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
-
+
Subscription subscription =
SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
@@ -933,7 +934,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
finally
{
_rollingBack = false;
-
+
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
@@ -1017,7 +1018,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
throws AMQException
{
getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
- deliveryTag, sub.getConsumerTag());
+ deliveryTag,
+ ((SubscriptionImpl)sub).getConsumerTag());
}
};
@@ -1402,7 +1404,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
return _createTime;
}
-
+
public void mgmtClose() throws AMQException
{
_session.mgmtCloseChannel(_channelId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 4512de6fb4..3a28b65a08 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -167,6 +167,11 @@ public class QueueConfiguration extends ConfigurationPlugin
return getStringValue("lvqKey", null);
}
+ public boolean isTopic()
+ {
+ return getBooleanValue("topic");
+ }
+
public static class QueueConfig extends ConfigurationPlugin
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index e0c181a5fc..f16e30ef92 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage implements ServerMessage
+public class AMQMessage implements ServerMessage<AMQMessage>
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -73,7 +73,7 @@ public class AMQMessage implements ServerMessage
{
this(handle, null);
}
-
+
public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
{
_handle = handle;
@@ -85,7 +85,7 @@ public class AMQMessage implements ServerMessage
{
_flags |= IMMEDIATE;
}
-
+
_channelRef = channelRef;
}
@@ -294,7 +294,7 @@ public class AMQMessage implements ServerMessage
return _expiration;
}
- public MessageReference newReference()
+ public AMQMessageReference newReference()
{
return new AMQMessageReference(this);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
new file mode 100755
index 0000000000..a5841dd94e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
@@ -0,0 +1,501 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.amqp_1_0.codec.ValueHandler;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+
+
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.configuration.Validator;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class MessageMetaData_1_0 implements StorableMessageMetaData
+{
+ // TODO move to somewhere more useful
+ public static final Symbol JMS_TYPE = Symbol.valueOf("x-jms-type");
+
+
+ private Header _header;
+ private Properties _properties;
+ private Map _deliveryAnnotations;
+ private Map _messageAnnotations;
+ private Map _appProperties;
+ private Map _footer;
+
+ private List<ByteBuffer> _encodedSections = new ArrayList<ByteBuffer>(3);
+
+ private volatile ByteBuffer _encoded;
+ private MessageHeader_1_0 _messageHeader;
+
+
+
+ public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder)
+ {
+ this(fragments, decoder, new ArrayList<ByteBuffer>(3));
+ }
+
+ public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List<ByteBuffer> immuatableSections)
+ {
+ this(constructSections(fragments, decoder,immuatableSections), immuatableSections);
+ }
+
+ private MessageMetaData_1_0(List<Section> sections, List<ByteBuffer> encodedSections)
+ {
+ _encodedSections = encodedSections;
+
+ Iterator<Section> sectIter = sections.iterator();
+
+ Section section = sectIter.hasNext() ? sectIter.next() : null;
+ if(section instanceof Header)
+ {
+ _header = (Header) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof DeliveryAnnotations)
+ {
+ _deliveryAnnotations = ((DeliveryAnnotations) section).getValue();
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof MessageAnnotations)
+ {
+ _messageAnnotations = ((MessageAnnotations) section).getValue();
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof Properties)
+ {
+ _properties = (Properties) section;
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof ApplicationProperties)
+ {
+ _appProperties = ((ApplicationProperties) section).getValue();
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ if(section instanceof Footer)
+ {
+ _footer = ((Footer) section).getValue();
+ section = sectIter.hasNext() ? sectIter.next() : null;
+ }
+
+ _messageHeader = new MessageHeader_1_0();
+
+ }
+
+ private static List<Section> constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List<ByteBuffer> encodedSections)
+ {
+ List<Section> sections = new ArrayList<Section>(3);
+
+ ByteBuffer src;
+ if(fragments.length == 1)
+ {
+ src = fragments[0].duplicate();
+ }
+ else
+ {
+ int size = 0;
+ for(ByteBuffer buf : fragments)
+ {
+ size += buf.remaining();
+ }
+ src = ByteBuffer.allocate(size);
+ for(ByteBuffer buf : fragments)
+ {
+ src.put(buf.duplicate());
+ }
+ src.flip();
+
+ }
+
+ try
+ {
+ int startBarePos = -1;
+ int lastPos = src.position();
+ Section s = decoder.readSection(src);
+
+
+
+ if(s instanceof Header)
+ {
+ sections.add(s);
+ lastPos = src.position();
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof DeliveryAnnotations)
+ {
+ sections.add(s);
+ lastPos = src.position();
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof MessageAnnotations)
+ {
+ sections.add(s);
+ lastPos = src.position();
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof Properties)
+ {
+ sections.add(s);
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof ApplicationProperties)
+ {
+ sections.add(s);
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+
+ if(s instanceof AmqpValue)
+ {
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+ else if(s instanceof Data)
+ {
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ do
+ {
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ } while(s instanceof Data);
+ }
+ else if(s instanceof AmqpSequence)
+ {
+ if(startBarePos == -1)
+ {
+ startBarePos = lastPos;
+ }
+ do
+ {
+ s = src.hasRemaining() ? decoder.readSection(src) : null;
+ }
+ while(s instanceof AmqpSequence);
+ }
+
+ if(s instanceof Footer)
+ {
+ sections.add(s);
+ }
+
+
+ int pos = 0;
+ for(ByteBuffer buf : fragments)
+ {
+/*
+ if(pos < startBarePos)
+ {
+ if(pos + buf.remaining() > startBarePos)
+ {
+ ByteBuffer dup = buf.duplicate();
+ dup.position(dup.position()+startBarePos-pos);
+ dup.slice();
+ encodedSections.add(dup);
+ }
+ }
+ else
+*/
+ {
+ encodedSections.add(buf.duplicate());
+ }
+ pos += buf.remaining();
+ }
+
+ return sections;
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+
+ public MessageMetaDataType getType()
+ {
+ return MessageMetaDataType.META_DATA_1_0;
+ }
+
+
+ public int getStorableSize()
+ {
+ int size = 0;
+
+ for(ByteBuffer bin : _encodedSections)
+ {
+ size += bin.limit();
+ }
+
+ return size;
+ }
+
+ private ByteBuffer encodeAsBuffer()
+ {
+ ByteBuffer buf = ByteBuffer.allocate(getStorableSize());
+
+ for(ByteBuffer bin : _encodedSections)
+ {
+ buf.put(bin.duplicate());
+ }
+
+ return buf;
+ }
+
+ public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ {
+ ByteBuffer buf = _encoded;
+
+ if(buf == null)
+ {
+ buf = encodeAsBuffer();
+ _encoded = buf;
+ }
+
+ buf = buf.duplicate();
+
+ buf.position(offsetInMetaData);
+
+ if(dest.remaining() < buf.limit())
+ {
+ buf.limit(dest.remaining());
+ }
+ dest.put(buf);
+ return buf.limit();
+ }
+
+ public boolean isPersistent()
+ {
+ return _header != null && Boolean.TRUE.equals(_header.getDurable());
+ }
+
+ public MessageHeader_1_0 getMessageHeader()
+ {
+ return _messageHeader;
+ }
+
+ public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
+
+
+ private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0>
+ {
+ private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance();
+
+ public MessageMetaData_1_0 createMetaData(ByteBuffer buf)
+ {
+ ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+
+ ArrayList<Section> sections = new ArrayList<Section>(3);
+ ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(3);
+
+ while(buf.hasRemaining())
+ {
+ try
+ {
+ ByteBuffer encodedBuf = buf.duplicate();
+ sections.add((Section) valueHandler.parse(buf));
+ encodedBuf.limit(buf.position());
+ encodedSections.add(encodedBuf);
+
+ }
+ catch (AmqpErrorException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ return new MessageMetaData_1_0(sections,encodedSections);
+
+ }
+ }
+
+ public class MessageHeader_1_0 implements AMQMessageHeader
+ {
+
+ public String getCorrelationId()
+ {
+ if(_properties == null || _properties.getCorrelationId() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.getMessageId().toString();
+ }
+ }
+
+ public long getExpiration()
+ {
+ return 0; //TODO
+ }
+
+ public String getMessageId()
+ {
+ if(_properties == null || _properties.getCorrelationId() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.getCorrelationId().toString();
+ }
+ }
+
+ public String getMimeType()
+ {
+
+ if(_properties == null || _properties.getContentType() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.getContentType().toString();
+ }
+ }
+
+ public String getEncoding()
+ {
+ return null; //TODO
+ }
+
+ public byte getPriority()
+ {
+ if(_header == null || _header.getPriority() == null)
+ {
+ return 4; //javax.jms.Message.DEFAULT_PRIORITY;
+ }
+ else
+ {
+ return _header.getPriority().byteValue();
+ }
+ }
+
+ public long getTimestamp()
+ {
+ if(_properties == null || _properties.getCreationTime() == null)
+ {
+ return 0L;
+ }
+ else
+ {
+ return _properties.getCreationTime().getTime();
+ }
+
+ }
+
+ public String getType()
+ {
+
+ if(_appProperties == null || _appProperties.get(JMS_TYPE) == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _appProperties.get(JMS_TYPE).toString();
+ }
+ }
+
+ public String getReplyTo()
+ {
+ if(_properties == null || _properties.getReplyTo() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.getReplyTo().toString();
+ }
+ }
+
+ public String getReplyToExchange()
+ {
+ return null; //TODO
+ }
+
+ public String getReplyToRoutingKey()
+ {
+ return null; //TODO
+ }
+
+ public Object getHeader(final String name)
+ {
+ return _appProperties == null ? null : _appProperties.get(name);
+ }
+
+ public boolean containsHeaders(final Set<String> names)
+ {
+ if(_appProperties == null)
+ {
+ return false;
+ }
+
+ for(String key : names)
+ {
+ if(!_appProperties.containsKey(key))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean containsHeader(final String name)
+ {
+ return _appProperties != null && _appProperties.containsKey(name);
+ }
+
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index 08006435f8..af78042a63 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -29,7 +29,7 @@ import java.nio.ByteBuffer;
import java.lang.ref.WeakReference;
-public class MessageTransferMessage implements InboundMessage, ServerMessage
+public class MessageTransferMessage implements InboundMessage, ServerMessage<MessageTransferMessage>
{
@@ -90,7 +90,7 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
return getMetaData().getExpiration();
}
- public MessageReference newReference()
+ public MessageReference<MessageTransferMessage> newReference()
{
return new TransferMessageReference(this);
}
@@ -145,5 +145,5 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
{
return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
}
-
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 2f2d39115f..e3cc9879fa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
import org.apache.qpid.server.configuration.SessionConfig;
-public interface ServerMessage extends EnqueableMessage, MessageContentSource
+public interface ServerMessage<T extends ServerMessage> extends EnqueableMessage, MessageContentSource
{
String getRoutingKey();
@@ -38,7 +38,7 @@ public interface ServerMessage extends EnqueableMessage, MessageContentSource
long getExpiration();
- MessageReference newReference();
+ MessageReference<T> newReference();
Long getMessageNumber();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java
new file mode 100644
index 0000000000..c1282555cc
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java
@@ -0,0 +1,339 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.framing.FrameHandler;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+import java.io.PrintWriter;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.UUID;
+
+public class AMQPSASLEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler
+{
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+ private NetworkDriver _networkDriver;
+ private long _readBytes;
+ private long _writtenBytes;
+ private final UUID _id;
+ private final IApplicationRegistry _appRegistry;
+ private long _createTime = System.currentTimeMillis();
+
+ private static final int BUF_SIZE = 8;
+ private static final ByteBuffer HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+ private FrameWriter _frameWriter;
+ private FrameHandler _frameHandler;
+ private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
+ private Object _sendLock = new Object();
+ private byte _major;
+ private byte _minor;
+ private byte _revision;
+ private ConnectionEndpoint _conn;
+
+
+ static enum State {
+ A,
+ M,
+ Q,
+ P,
+ PROTOCOL,
+ MAJOR,
+ MINOR,
+ REVISION,
+ FRAME
+ }
+
+ private State _state = State.A;
+
+ public AMQPSASLEngine_1_0_0(NetworkDriver networkDriver,
+ final IApplicationRegistry appRegistry)
+ {
+
+
+ _networkDriver = networkDriver;
+ _id = appRegistry.getConfigStore().createId();
+ _appRegistry = appRegistry;
+
+
+ // FIXME Two log messages to maintain compatinbility with earlier protocol versions
+// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
+// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+
+
+ _conn = new ConnectionEndpoint(new Container(),_appRegistry.getAuthenticationManager());
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setFrameOutputHandler(this);
+
+
+ _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
+ _frameHandler = new FrameHandler(_conn);
+
+ _networkDriver.send(HEADER.duplicate());
+
+
+
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ public void readerIdle()
+ {
+ //Todo
+ }
+
+ public String getAddress()
+ {
+ return getRemoteAddress().toString();
+ }
+
+
+ public ConfigStore getConfigStore()
+ {
+ return _appRegistry.getConfigStore();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public synchronized void received(ByteBuffer msg)
+ {
+ _readBytes += msg.remaining();
+ switch(_state)
+ {
+ case A:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ break;
+ }
+ case M:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.M;
+ break;
+ }
+
+ case Q:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.Q;
+ break;
+ }
+ case P:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.P;
+ break;
+ }
+ case PROTOCOL:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.PROTOCOL;
+ break;
+ }
+ case MAJOR:
+ if(msg.hasRemaining())
+ {
+ _major = msg.get();
+ }
+ else
+ {
+ _state = State.MAJOR;
+ break;
+ }
+ case MINOR:
+ if(msg.hasRemaining())
+ {
+ _minor = msg.get();
+ }
+ else
+ {
+ _state = State.MINOR;
+ break;
+ }
+ case REVISION:
+ if(msg.hasRemaining())
+ {
+ _revision = msg.get();
+
+ _state = State.FRAME;
+ }
+ else
+ {
+ _state = State.REVISION;
+ break;
+ }
+ case FRAME:
+ _frameHandler.parse(msg);
+ }
+
+ }
+
+ public void exception(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ public void closed()
+ {
+ // todo
+
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+
+ public boolean canSend()
+ {
+ return true;
+ }
+
+ public void send(AMQFrame frame)
+ {
+ send(frame, null);
+ }
+ public void send(AMQFrame frame, ByteBuffer buffer)
+ {
+
+ synchronized(_sendLock)
+ {
+
+ if(_buf.remaining() < MAX_FRAME_SIZE)
+ {
+ _buf = ByteBuffer.allocate(1024*1024);
+ }
+
+ _frameWriter.setValue(frame);
+
+ ByteBuffer dup = _buf.slice();
+
+ _frameWriter.writeToBuffer(dup);
+
+ _buf.position(_buf.position()+dup.position());
+
+ dup.flip();
+ _writtenBytes += dup.limit();
+ _networkDriver.send(dup);
+
+ }
+ }
+
+ public void close()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setLogOutput(final PrintWriter out)
+ {
+ //TODO
+ }
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index eb957ee33c..f8f4929a73 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -153,6 +153,28 @@ private static final byte[] AMQP_0_9_1_HEADER =
(byte) 10
};
+ private static final byte[] AMQP_1_0_0_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ };
+
+ private static final byte[] AMQP_SASL_1_0_0_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 3,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ };
+
private static interface DelegateCreator
{
VERSION getVersion();
@@ -246,8 +268,49 @@ private static final byte[] AMQP_0_9_1_HEADER =
}
};
+ private DelegateCreator creator_1_0_0 = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v1_0_0;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_1_0_0_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ return new ProtocolEngine_1_0_0(_networkDriver, _appRegistry);
+ }
+ };
+
+ private DelegateCreator creator_1_0_0_SASL = new DelegateCreator()
+ {
+
+ public VERSION getVersion()
+ {
+ return VERSION.v1_0_0;
+ }
+
+
+ public byte[] getHeaderIdentifier()
+ {
+ return AMQP_SASL_1_0_0_HEADER;
+ }
+
+ public ProtocolEngine getProtocolEngine()
+ {
+ return new ProtocolEngine_1_0_0_SASL(_networkDriver, _appRegistry);
+ }
+ };
+
+
private final DelegateCreator[] _creators =
- new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
+ new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10, creator_1_0_0_SASL, creator_1_0_0};
private class ClosedDelegateProtocolEngine implements ProtocolEngine
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
new file mode 100755
index 0000000000..aa84552210
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -0,0 +1,390 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.framing.FrameHandler;
+import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
+import org.apache.qpid.amqp_1_0.transport.*;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+import java.io.PrintWriter;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.UUID;
+import java.util.logging.*;
+
+public class ProtocolEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler
+{
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+ private NetworkDriver _networkDriver;
+ private long _readBytes;
+ private long _writtenBytes;
+ private final UUID _id;
+ private final IApplicationRegistry _appRegistry;
+ private long _createTime = System.currentTimeMillis();
+ private ConnectionEndpoint _conn;
+
+ private static final int BUF_SIZE = 8;
+ private static final ByteBuffer HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+ private FrameWriter _frameWriter;
+ private FrameHandler _frameHandler;
+ private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
+ private Object _sendLock = new Object();
+ private byte _major;
+ private byte _minor;
+ private byte _revision;
+
+
+ static enum State {
+ A,
+ M,
+ Q,
+ P,
+ PROTOCOL,
+ MAJOR,
+ MINOR,
+ REVISION,
+ FRAME
+ }
+
+ private State _state = State.A;
+
+ public ProtocolEngine_1_0_0(NetworkDriver networkDriver,
+ final IApplicationRegistry appRegistry)
+ {
+
+
+ _networkDriver = networkDriver;
+ _id = appRegistry.getConfigStore().createId();
+ _appRegistry = appRegistry;
+
+
+ // FIXME Two log messages to maintain compatinbility with earlier protocol versions
+// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
+// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ Container container = new Container();
+
+ Principal principal = new Principal()
+ {
+
+ public String getName()
+ {
+ // TODO
+ return "rob";
+ }
+ };
+ _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager());
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setFrameOutputHandler(this);
+ _conn.setRemoteAddress(driver.getRemoteAddress());
+
+
+ _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
+ _frameHandler = new FrameHandler(_conn);
+
+ _networkDriver.send(HEADER.duplicate());
+
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ public void readerIdle()
+ {
+ //Todo
+ }
+
+ public String getAddress()
+ {
+ return getRemoteAddress().toString();
+ }
+
+
+ public ConfigStore getConfigStore()
+ {
+ return _appRegistry.getConfigStore();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public synchronized void received(ByteBuffer msg)
+ {
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup = msg.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+ _readBytes += msg.remaining();
+ switch(_state)
+ {
+ case A:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ break;
+ }
+ case M:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.M;
+ break;
+ }
+
+ case Q:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.Q;
+ break;
+ }
+ case P:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.P;
+ break;
+ }
+ case PROTOCOL:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.PROTOCOL;
+ break;
+ }
+ case MAJOR:
+ if(msg.hasRemaining())
+ {
+ _major = msg.get();
+ }
+ else
+ {
+ _state = State.MAJOR;
+ break;
+ }
+ case MINOR:
+ if(msg.hasRemaining())
+ {
+ _minor = msg.get();
+ }
+ else
+ {
+ _state = State.MINOR;
+ break;
+ }
+ case REVISION:
+ if(msg.hasRemaining())
+ {
+ _revision = msg.get();
+
+ _state = State.FRAME;
+ }
+ else
+ {
+ _state = State.REVISION;
+ break;
+ }
+ case FRAME:
+ _frameHandler.parse(msg);
+ }
+
+ }
+
+ public void exception(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ public void closed()
+ {
+ _conn.inputClosed();
+ if(_conn != null && _conn.getConnectionEventListener() != null)
+ {
+ ((Connection_1_0)_conn.getConnectionEventListener()).closed();
+ }
+
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+
+ public boolean canSend()
+ {
+ return true;
+ }
+
+ public void send(final AMQFrame amqFrame)
+ {
+ send(amqFrame, null);
+ }
+
+ private final Logger FRAME_LOGGER = Logger.getLogger("FRM");
+ private final Logger RAW_LOGGER = Logger.getLogger("RAW");
+
+
+ public void send(final AMQFrame amqFrame, ByteBuffer buf)
+ {
+ synchronized(_sendLock)
+ {
+
+ if(FRAME_LOGGER.isLoggable(Level.FINE))
+ {
+ FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
+ }
+
+ if(_buf.remaining() < _conn.getMaxFrameSize())
+ {
+ _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024));
+ }
+ _frameWriter.setValue(amqFrame);
+
+
+
+ ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
+ // int pos = _buf.position();
+
+ int size = _frameWriter.writeToBuffer(dup);
+ if(size > _conn.getMaxFrameSize())
+ {
+// _buf.position(pos);
+ throw new OversizeFrameException(amqFrame,size);
+ }
+
+// _buf.position(_buf.position()+dup.position());
+
+ dup.flip();
+ _writtenBytes += dup.limit();
+
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup2 = dup.duplicate();
+ byte[] data = new byte[dup2.remaining()];
+ dup2.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+
+
+ _networkDriver.send(dup);
+
+ }
+ }
+
+ public void send(short channel, FrameBody body)
+ {
+ AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
+ send(frame);
+
+ }
+
+ public void close()
+ {
+ System.out.print(true);
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
new file mode 100644
index 0000000000..d718058072
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -0,0 +1,426 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.framing.FrameHandler;
+import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
+import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+import java.io.PrintWriter;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ProtocolEngine_1_0_0_SASL implements ProtocolEngine, FrameOutputHandler
+{
+ private NetworkDriver _networkDriver;
+ private long _readBytes;
+ private long _writtenBytes;
+ private final UUID _id;
+ private final IApplicationRegistry _appRegistry;
+ private long _createTime = System.currentTimeMillis();
+ private ConnectionEndpoint _conn;
+
+ private static final int BUF_SIZE = 8;
+ private static final ByteBuffer HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 3,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+ private static final ByteBuffer PROTOCOL_HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+
+ private FrameWriter _frameWriter;
+ private ProtocolHandler _frameHandler;
+ private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
+ private Object _sendLock = new Object();
+ private byte _major;
+ private byte _minor;
+ private byte _revision;
+ private PrintWriter _out;
+
+
+ static enum State {
+ A,
+ M,
+ Q,
+ P,
+ PROTOCOL,
+ MAJOR,
+ MINOR,
+ REVISION,
+ FRAME
+ }
+
+ private State _state = State.A;
+
+
+ public ProtocolEngine_1_0_0_SASL(final NetworkDriver networkDriver, final IApplicationRegistry appRegistry)
+ {
+ _networkDriver = networkDriver;
+ _id = appRegistry.getConfigStore().createId();
+ _appRegistry = appRegistry;
+ }
+
+ public void setNetworkDriver(final NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ Container container = new Container();
+
+ Principal principal = new Principal()
+ {
+
+ public String getName()
+ {
+ // TODO
+ return "rob";
+ }
+ };
+ _conn = new ConnectionEndpoint(container, ApplicationRegistry.getInstance().getAuthenticationManager());
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setRemoteAddress(getRemoteAddress());
+
+
+ _conn.setFrameOutputHandler(this);
+ _conn.setSaslFrameOutput(this);
+
+ _conn.setOnSaslComplete(new Runnable()
+ {
+
+
+ public void run()
+ {
+ if(_conn.isAuthenticated())
+ {
+ _networkDriver.send(PROTOCOL_HEADER.duplicate());
+ }
+ else
+ {
+ _networkDriver.close();
+ }
+ }
+ });
+ _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
+ _frameHandler = new SASLFrameHandler(_conn);
+
+ _networkDriver.send(HEADER.duplicate());
+
+ _conn.initiateSASL();
+
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public void writerIdle()
+ {
+ //Todo
+ }
+
+ public void readerIdle()
+ {
+ //Todo
+ }
+
+ public String getAddress()
+ {
+ return getRemoteAddress().toString();
+ }
+
+
+ public ConfigStore getConfigStore()
+ {
+ return _appRegistry.getConfigStore();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ private final Logger RAW_LOGGER = Logger.getLogger("RAW");
+
+
+ public synchronized void received(ByteBuffer msg)
+ {
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup = msg.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+ _readBytes += msg.remaining();
+ switch(_state)
+ {
+ case A:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ break;
+ }
+ case M:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.M;
+ break;
+ }
+
+ case Q:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.Q;
+ break;
+ }
+ case P:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.P;
+ break;
+ }
+ case PROTOCOL:
+ if(msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.PROTOCOL;
+ break;
+ }
+ case MAJOR:
+ if(msg.hasRemaining())
+ {
+ _major = msg.get();
+ }
+ else
+ {
+ _state = State.MAJOR;
+ break;
+ }
+ case MINOR:
+ if(msg.hasRemaining())
+ {
+ _minor = msg.get();
+ }
+ else
+ {
+ _state = State.MINOR;
+ break;
+ }
+ case REVISION:
+ if(msg.hasRemaining())
+ {
+ _revision = msg.get();
+
+ _state = State.FRAME;
+ }
+ else
+ {
+ _state = State.REVISION;
+ break;
+ }
+ case FRAME:
+ _frameHandler = _frameHandler.parse(msg);
+ }
+
+ }
+
+ public void exception(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ public void closed()
+ {
+ // todo
+ _conn.inputClosed();
+ if(_conn != null && _conn.getConnectionEventListener() != null)
+ {
+ ((Connection_1_0)_conn.getConnectionEventListener()).closed();
+ }
+
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+
+ public boolean canSend()
+ {
+ return true;
+ }
+
+ public void send(final AMQFrame amqFrame)
+ {
+ send(amqFrame, null);
+ }
+
+ private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
+
+
+ public void send(final AMQFrame amqFrame, ByteBuffer buf)
+ {
+
+ synchronized(_sendLock)
+ {
+ if(FRAME_LOGGER.isLoggable(Level.FINE))
+ {
+ FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
+ }
+ if(_buf.remaining() < _conn.getMaxFrameSize())
+ {
+ _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024));
+ }
+ _frameWriter.setValue(amqFrame);
+
+ ByteBuffer dup = _buf.slice();
+ int pos = _buf.position();
+
+ int size = _frameWriter.writeToBuffer(dup);
+ if(size > _conn.getMaxFrameSize())
+ {
+ _buf.position(pos);
+ throw new OversizeFrameException(amqFrame,size);
+ }
+
+ _buf.position(_buf.position()+dup.position());
+
+ dup.flip();
+ _writtenBytes += dup.limit();
+
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup2 = dup.duplicate();
+ byte[] data = new byte[dup2.remaining()];
+ dup2.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+
+ _networkDriver.send(dup);
+
+ }
+ }
+
+ public void send(short channel, FrameBody body)
+ {
+ AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
+ send(frame);
+
+ }
+
+ public void close()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setLogOutput(final PrintWriter out)
+ {
+ _out = out;
+ }
+
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
new file mode 100644
index 0000000000..318a240b27
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Connection_1_0 implements ConnectionEventListener
+{
+
+ private IApplicationRegistry _appRegistry;
+ private VirtualHost _vhost;
+
+
+ public static interface Task
+ {
+ public void doTask(Connection_1_0 connection);
+ }
+
+
+ private List<Task> _closeTasks =
+ Collections.synchronizedList(new ArrayList<Task>());
+
+
+
+ public Connection_1_0(IApplicationRegistry appRegistry)
+ {
+ _appRegistry = appRegistry;
+ _vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+ }
+
+ public void remoteSessionCreation(SessionEndpoint endpoint)
+ {
+ Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+ endpoint.setSessionEventListener(session);
+ }
+
+
+ void removeConnectionCloseTask(final Task task)
+ {
+ _closeTasks.remove( task );
+ }
+
+ void addConnectionCloseTask(final Task task)
+ {
+ _closeTasks.add( task );
+ }
+
+ public void closeReceived()
+ {
+ List<Task> taskCopy;
+ synchronized (_closeTasks)
+ {
+ taskCopy = new ArrayList<Task>(_closeTasks);
+ }
+ for(Task task : taskCopy)
+ {
+ task.doTask(this);
+ }
+ synchronized (_closeTasks)
+ {
+ _closeTasks.clear();
+ }
+
+ }
+
+ public void closed()
+ {
+ closeReceived();
+ }
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
new file mode 100644
index 0000000000..d45758391c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+
+public interface Destination
+{
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
new file mode 100644
index 0000000000..c518ee5cc3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class ExchangeDestination implements ReceivingDestination
+{
+ private static final Accepted ACCEPTED = new Accepted();
+ private static final Outcome[] OUTCOMES = { ACCEPTED };
+
+ private Exchange _exchange;
+
+ public ExchangeDestination(Exchange exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public Outcome[] getOutcomes()
+ {
+ return OUTCOMES;
+ }
+
+ public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ {
+ final ArrayList<? extends BaseQueue> queues = _exchange.route(message);
+
+ txn.enqueue(queues,message, new ServerTransaction.Action()
+ {
+
+ BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
+
+ public void postCommit()
+ {
+ for(int i = 0; i < _queues.length; i++)
+ {
+ try
+ {
+ _queues[i].enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ });
+
+ return ACCEPTED;
+ }
+
+ public int getCredit()
+ {
+ // TODO - fix
+ return 20000;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
new file mode 100644
index 0000000000..42eea05d37
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
@@ -0,0 +1,59 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LinkRegistry
+{
+ private final Map<String, SendingLink_1_0> _sendingLinks = new HashMap<String, SendingLink_1_0>();
+ private final Map<String, ReceivingLink_1_0> _receivingLinks = new HashMap<String, ReceivingLink_1_0>();
+
+ public synchronized SendingLink_1_0 getDurableSendingLink(String name)
+ {
+ return _sendingLinks.get(name);
+ }
+
+ public synchronized boolean registerSendingLink(String name, SendingLink_1_0 link)
+ {
+ if(_sendingLinks.containsKey(name))
+ {
+ return false;
+ }
+ else
+ {
+ _sendingLinks.put(name, link);
+ return true;
+ }
+ }
+
+ public synchronized boolean unregisterSendingLink(String name)
+ {
+ if(!_sendingLinks.containsKey(name))
+ {
+ return false;
+ }
+ else
+ {
+ _sendingLinks.remove(name);
+ return true;
+ }
+ }
+
+ public synchronized ReceivingLink_1_0 getDurableReceivingLink(String name)
+ {
+ return _receivingLinks.get(name);
+ }
+
+ public synchronized boolean registerReceivingLink(String name, ReceivingLink_1_0 link)
+ {
+ if(_receivingLinks.containsKey(name))
+ {
+ return false;
+ }
+ else
+ {
+ _receivingLinks.put(name, link);
+ return true;
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
new file mode 100644
index 0000000000..db81d3b205
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+public interface Link_1_0
+{
+ void start();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
new file mode 100644
index 0000000000..d1d6effc2f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.StoredMessage;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+public class Message_1_0 implements ServerMessage<Message_1_0>, InboundMessage
+{
+ private final StoredMessage<MessageMetaData_1_0> _storedMessage;
+ private List<ByteBuffer> _fragments;
+
+
+ public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, final List<ByteBuffer> fragments)
+ {
+ _storedMessage = storedMessage;
+ _fragments = fragments;
+ }
+
+ public String getRoutingKey()
+ {
+ Object routingKey = getMessageHeader().getHeader("routing-key");
+ if(routingKey != null)
+ {
+ return routingKey.toString();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private MessageMetaData_1_0 getMessageMetaData()
+ {
+ return _storedMessage.getMetaData();
+ }
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return getMessageMetaData().getMessageHeader();
+ }
+
+ public boolean isPersistent()
+ {
+ return getMessageMetaData().isPersistent();
+ }
+
+ public boolean isRedelivered()
+ {
+ // TODO
+ return false;
+ }
+
+ public long getSize()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public long getExpiration()
+ {
+ return getMessageHeader().getExpiration();
+ }
+
+ public MessageReference<Message_1_0> newReference()
+ {
+ return new Reference(this);
+ }
+
+ public Long getMessageNumber()
+ {
+ return _storedMessage.getMessageNumber();
+ }
+
+ public long getArrivalTime()
+ {
+ return 0; //TODO
+ }
+
+ public int getContent(final ByteBuffer buf, final int offset)
+ {
+ return _storedMessage.getContent(offset, buf);
+ }
+
+ public SessionConfig getSessionConfig()
+ {
+ return null; //TODO
+ }
+
+ public List<ByteBuffer> getFragments()
+ {
+ return _fragments;
+ }
+
+ public static class Reference extends MessageReference<Message_1_0>
+ {
+ public Reference(Message_1_0 message)
+ {
+ super(message);
+ }
+
+ protected void onReference(Message_1_0 message)
+ {
+
+ }
+
+ protected void onRelease(Message_1_0 message)
+ {
+
+ }
+}
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
new file mode 100644
index 0000000000..7635b4299a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+
+import java.util.Arrays;
+
+public class QueueDestination implements SendingDestination, ReceivingDestination
+{
+ private static final Accepted ACCEPTED = new Accepted();
+ private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
+
+
+ private AMQQueue _queue;
+
+ public QueueDestination(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public Outcome[] getOutcomes()
+ {
+ return OUTCOMES;
+ }
+
+ public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ {
+
+ try
+ {
+ txn.enqueue(_queue,message, new ServerTransaction.Action()
+ {
+
+
+ public void postCommit()
+ {
+ try
+ {
+
+ _queue.enqueue(message);
+ }
+ catch (Exception e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ });
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ return ACCEPTED;
+ }
+
+ public int getCredit()
+ {
+ // TODO - fix
+ return 100;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
new file mode 100644
index 0000000000..4ae0596e25
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.Outcome;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+
+public interface ReceivingDestination extends Destination
+{
+
+ Outcome[] getOutcomes();
+
+ Outcome send(Message_1_0 message, ServerTransaction txn);
+
+ int getCredit();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
new file mode 100644
index 0000000000..6da5081185
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
@@ -0,0 +1,51 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Target;
+
+public class ReceivingLinkAttachment
+{
+ private final Session_1_0 _session;
+ private final ReceivingLinkEndpoint _endpoint;
+
+ public ReceivingLinkAttachment(final Session_1_0 session, final ReceivingLinkEndpoint endpoint)
+ {
+ _session = session;
+ _endpoint = endpoint;
+ }
+
+ public Session_1_0 getSession()
+ {
+ return _session;
+ }
+
+ public ReceivingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
+ }
+
+ public Source getSource()
+ {
+ return getEndpoint().getSource();
+ }
+
+ public void setDeliveryStateHandler(final DeliveryStateHandler handler)
+ {
+ getEndpoint().setDeliveryStateHandler(handler);
+ }
+
+ public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
+ {
+ getEndpoint().updateDisposition(deliveryTag, state, settled);
+ }
+
+ public Target getTarget()
+ {
+ return getEndpoint().getTarget();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
new file mode 100644
index 0000000000..5f3e4602f8
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import org.apache.qpid.server.message.MessageMetaData_1_0;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, DeliveryStateHandler
+{
+ private VirtualHost _vhost;
+
+ private ReceivingDestination _destination;
+ private SectionDecoderImpl _sectionDecoder;
+ private volatile ReceivingLinkAttachment _attachment;
+
+
+ private ArrayList<Transfer> _incompleteMessage;
+ private TerminusDurability _durability;
+
+ private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
+ private boolean _resumedMessage;
+ private Binary _messageDeliveryTag;
+ private ReceiverSettleMode _receivingSettlementMode;
+
+
+ public ReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, VirtualHost vhost,
+ ReceivingDestination destination)
+ {
+ _vhost = vhost;
+ _destination = destination;
+ _attachment = receivingLinkAttachment;
+ receivingLinkAttachment.setDeliveryStateHandler(this);
+
+ _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
+
+ _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry());
+
+
+ }
+
+ public void messageTransfer(Transfer xfr)
+ {
+ // TODO - cope with fragmented messages
+
+ List<ByteBuffer> fragments = null;
+
+
+
+ if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
+ {
+ _incompleteMessage = new ArrayList<Transfer>();
+ _incompleteMessage.add(xfr);
+ _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
+ _messageDeliveryTag = xfr.getDeliveryTag();
+ return;
+ }
+ else if(_incompleteMessage != null)
+ {
+ _incompleteMessage.add(xfr);
+
+ if(Boolean.TRUE.equals(xfr.getMore()))
+ {
+ return;
+ }
+
+ fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size());
+ for(Transfer t : _incompleteMessage)
+ {
+ fragments.add(t.getPayload());
+ }
+ _incompleteMessage=null;
+
+ }
+ else
+ {
+ _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
+ _messageDeliveryTag = xfr.getDeliveryTag();
+ fragments = Collections.singletonList(xfr.getPayload());
+ }
+
+ if(_resumedMessage)
+ {
+ if(_unsettledMap.containsKey(_messageDeliveryTag))
+ {
+ Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
+ boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+ getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
+ if(settled)
+ {
+ _unsettledMap.remove(_messageDeliveryTag);
+ }
+ }
+ else
+ {
+ System.err.println("UNEXPECTED!!");
+ System.err.println("Delivery Tag: " + _messageDeliveryTag);
+ System.err.println("_unsettledMap: " + _unsettledMap);
+
+ }
+ }
+ else
+ {
+ MessageMetaData_1_0 mmd = null;
+ List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3);
+ mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]),
+ _sectionDecoder,
+ immutableSections);
+
+ StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd);
+
+ boolean skipping = true;
+ int offset = 0;
+
+ for(ByteBuffer bareMessageBuf : immutableSections)
+ {
+ storedMessage.addContent(offset, bareMessageBuf.duplicate());
+ offset += bareMessageBuf.remaining();
+ }
+
+ storedMessage.flushToStore();
+
+ Message_1_0 message = new Message_1_0(storedMessage, fragments);
+
+
+ Binary transactionId = null;
+ org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
+ if(xfrState != null)
+ {
+ if(xfrState instanceof TransactionalState)
+ {
+ transactionId = ((TransactionalState)xfrState).getTxnId();
+ }
+ }
+
+ ServerTransaction transaction = null;
+ if(transactionId != null)
+ {
+ transaction = getSession().getTransaction(transactionId);
+ }
+ else
+ {
+ Session_1_0 session = getSession();
+ transaction = session != null ? session.getTransaction(null) : new AutoCommitTransaction(_vhost.getTransactionLog());
+ }
+
+ Outcome outcome = _destination.send(message, transaction);
+
+ DeliveryState resultantState;
+
+ if(transactionId == null)
+ {
+ resultantState = (DeliveryState) outcome;
+ }
+ else
+ {
+ TransactionalState transactionalState = new TransactionalState();
+ transactionalState.setOutcome(outcome);
+ transactionalState.setTxnId(transactionId);
+ resultantState = transactionalState;
+
+ }
+
+
+ boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+
+ final Binary deliveryTag = xfr.getDeliveryTag();
+
+ if(!settled)
+ {
+ _unsettledMap.put(deliveryTag, outcome);
+ }
+
+ getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
+
+ if(!(transaction instanceof AutoCommitTransaction))
+ {
+ ServerTransaction.Action a;
+ transaction.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ getEndpoint().updateDisposition(deliveryTag, null, true);
+ }
+
+ public void onRollback()
+ {
+ getEndpoint().updateDisposition(deliveryTag, null, true);
+ }
+ });
+ }
+ }
+ }
+
+ private ReceiverSettleMode getReceivingSettlementMode()
+ {
+ return _receivingSettlementMode;
+ }
+
+ public void remoteDetached(LinkEndpoint endpoint, Detach detach)
+ {
+ //TODO
+ // if not durable or close
+ if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
+ (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+ {
+ endpoint.detach();
+ }
+ else if(detach == null || detach.getError() != null)
+ {
+ _attachment = null;
+ }
+ }
+
+ public void start()
+ {
+ getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit()));
+ getEndpoint().setCreditWindow();
+ }
+
+ public ReceivingLinkEndpoint getEndpoint()
+ {
+ return _attachment.getEndpoint();
+ }
+
+
+ public Session_1_0 getSession()
+ {
+ ReceivingLinkAttachment attachment = _attachment;
+ return attachment == null ? null : attachment.getSession();
+ }
+
+ public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ {
+ if(Boolean.TRUE.equals(settled))
+ {
+ _unsettledMap.remove(deliveryTag);
+ }
+ }
+
+ public void setLinkAttachment(ReceivingLinkAttachment linkAttachment)
+ {
+ _attachment = linkAttachment;
+ _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode();
+ ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint();
+ Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
+
+ Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
+ for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
+ {
+ Binary deliveryTag = entry.getKey();
+ if(!initialUnsettledMap.containsKey(deliveryTag))
+ {
+ _unsettledMap.remove(deliveryTag);
+ }
+ }
+
+ }
+
+ public Map getUnsettledOutcomeMap()
+ {
+ return _unsettledMap;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
new file mode 100644
index 0000000000..6d601c9dda
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+
+public interface SendingDestination extends Destination
+{
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
new file mode 100644
index 0000000000..9d7af24135
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
@@ -0,0 +1,44 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Source;
+
+public class SendingLinkAttachment
+{
+ private final Session_1_0 _session;
+ private final SendingLinkEndpoint _endpoint;
+
+ public SendingLinkAttachment(final Session_1_0 session, final SendingLinkEndpoint endpoint)
+ {
+ _session = session;
+ _endpoint = endpoint;
+ }
+
+ public Session_1_0 getSession()
+ {
+ return _session;
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
+ }
+
+ public Source getSource()
+ {
+ return getEndpoint().getSource();
+ }
+
+ public void setDeliveryStateHandler(final DeliveryStateHandler handler)
+ {
+ getEndpoint().setDeliveryStateHandler(handler);
+ }
+
+ public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
+ {
+ getEndpoint().updateDisposition(deliveryTag, state, settled);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
new file mode 100644
index 0000000000..152ccc706b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -0,0 +1,367 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
+{
+ private VirtualHost _vhost;
+ private SendingDestination _destination;
+
+ private Subscription_1_0 _subscription;
+ private boolean _draining;
+ private final Map<Binary, QueueEntry> _unsettledMap =
+ new HashMap<Binary, QueueEntry>();
+
+ private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
+ new ConcurrentHashMap<Binary, UnsettledAction>();
+ private volatile SendingLinkAttachment _linkAttachment;
+ private TerminusDurability _durability;
+ private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+ private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
+ private Runnable _closeAction;
+
+ public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
+ final VirtualHost vhost,
+ final SendingDestination destination)
+ {
+ _vhost = vhost;
+ _destination = destination;
+ _linkAttachment = linkAttachment;
+ _durability = ((Source)linkAttachment.getSource()).getDurable();
+ linkAttachment.setDeliveryStateHandler(this);
+
+ if(destination instanceof QueueDestination)
+ {
+ AMQQueue queue = ((QueueDestination) _destination).getQueue();
+ if(queue.getArguments().containsKey("topic"))
+ {
+ ((Source)linkAttachment.getSource()).setDistributionMode(StdDistMode.COPY);
+ }
+ _subscription = new Subscription_1_0(this, (QueueDestination)destination);
+ try
+ {
+
+ queue.registerSubscription(_subscription, false);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+ }
+
+ }
+
+ public void resume(SendingLinkAttachment linkAttachment)
+ {
+ _linkAttachment = linkAttachment;
+
+ }
+
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ //TODO
+ // if not durable or close
+ if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
+ (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+ {
+
+ try
+ {
+ ((QueueDestination)_destination).getQueue().unregisterSubscription(_subscription);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+
+ DeliveryState state = new Released();
+
+ for(UnsettledAction action : _unsettledActionMap.values())
+ {
+
+ action.process(state,Boolean.TRUE);
+ }
+ _unsettledActionMap.clear();
+
+ endpoint.detach();
+ if(_closeAction != null)
+ {
+ _closeAction.run();
+ }
+ }
+ else if(detach == null || detach.getError() != null)
+ {
+ _linkAttachment = null;
+ _subscription.flowStateChanged();
+ }
+ }
+
+ public void start()
+ {
+ //TODO
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _linkAttachment == null ? null : _linkAttachment.getEndpoint() ;
+ }
+
+ public void flowStateChanged()
+ {
+ if(Boolean.TRUE.equals(getEndpoint().getDrain())
+ && getEndpoint().getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0)
+ {
+ _draining = true;
+ }
+
+ while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend())
+ {
+ Accepted accepted = new Accepted();
+ synchronized(getLock())
+ {
+
+ Transfer xfr = new Transfer();
+ Binary dt = _resumeAcceptedTransfers.remove(0);
+ xfr.setDeliveryTag(dt);
+ xfr.setState(accepted);
+ xfr.setResume(Boolean.TRUE);
+ getEndpoint().transfer(xfr);
+ }
+
+ }
+ if(_resumeAcceptedTransfers.isEmpty())
+ {
+ _subscription.flowStateChanged();
+ }
+
+ }
+
+ public boolean isDraining()
+ {
+ return false; //TODO
+ }
+
+ public boolean drained()
+ {
+ if(getEndpoint() != null)
+ {
+ synchronized(getEndpoint().getLock())
+ {
+ if(_draining)
+ {
+ //TODO
+ getEndpoint().drained();
+ _draining = false;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+ {
+ _unsettledActionMap.put(tag,unsettledAction);
+ if(getTransactionId() == null)
+ {
+ _unsettledMap.put(tag, queueEntry);
+ }
+ }
+
+ public void removeUnsettled(Binary tag)
+ {
+ _unsettledActionMap.remove(tag);
+ }
+
+ public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ {
+ UnsettledAction action = _unsettledActionMap.get(deliveryTag);
+ boolean localSettle = false;
+ if(action != null)
+ {
+ localSettle = action.process(state, settled);
+ if(localSettle && !Boolean.TRUE.equals(settled))
+ {
+ _linkAttachment.updateDisposition(deliveryTag, state, true);
+ }
+ }
+ if(Boolean.TRUE.equals(settled) || localSettle)
+ {
+ _unsettledActionMap.remove(deliveryTag);
+ _unsettledMap.remove(deliveryTag);
+ }
+ }
+
+ ServerTransaction getTransaction(Binary transactionId)
+ {
+ return _linkAttachment.getSession().getTransaction(transactionId);
+ }
+
+ public Binary getTransactionId()
+ {
+ return getEndpoint().getTransactionId();
+ }
+
+ public synchronized Object getLock()
+ {
+ return _linkAttachment == null ? this : getEndpoint().getLock();
+ }
+
+ public boolean isDetached()
+ {
+ return _linkAttachment == null || getEndpoint().isDetached();
+ }
+
+ public boolean isAttached()
+ {
+ return _linkAttachment != null && getEndpoint().isAttached();
+ }
+
+ public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
+ {
+
+ if(_subscription.isActive())
+ {
+ _subscription.suspend();
+ }
+
+ _linkAttachment = linkAttachment;
+
+ SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
+ endpoint.setDeliveryStateHandler(this);
+ Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
+ Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ _resumeAcceptedTransfers.clear();
+ _resumeFullTransfers.clear();
+ for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+ {
+ Binary deliveryTag = entry.getKey();
+ final QueueEntry queueEntry = entry.getValue();
+ if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
+ {
+ queueEntry.setRedelivered();
+ queueEntry.release();
+ _unsettledMap.remove(deliveryTag);
+ }
+ else if(initialUnsettledMap != null && (initialUnsettledMap.get(deliveryTag) instanceof Outcome))
+ {
+ Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+
+ if(outcome instanceof Accepted)
+ {
+ AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getTransactionLog());
+ if(_subscription.acquires())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.discard();
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+ }
+ }
+ else if(outcome instanceof Released)
+ {
+ AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getTransactionLog());
+ if(_subscription.acquires())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.release();
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+ }
+ }
+ //_unsettledMap.remove(deliveryTag);
+ initialUnsettledMap.remove(deliveryTag);
+ _resumeAcceptedTransfers.add(deliveryTag);
+ }
+ else
+ {
+ _resumeFullTransfers.add(queueEntry);
+ // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+ }
+ // TODO - else
+ }
+
+ }
+
+ public Map getUnsettledOutcomeMap()
+ {
+ Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+
+ for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+ {
+ entry.setValue(null);
+ }
+
+ return unsettled;
+ }
+
+ public void setCloseAction(Runnable action)
+ {
+ _closeAction = action;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
new file mode 100644
index 0000000000..67b3d25d5e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -0,0 +1,406 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.*;
+
+public class Session_1_0 implements SessionEventListener
+{
+ private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+ private IApplicationRegistry _appRegistry;
+ private VirtualHost _vhost;
+ private AutoCommitTransaction _transaction;
+
+ private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
+ new LinkedHashMap<Integer, ServerTransaction>();
+ private final Connection_1_0 _connection;
+
+
+ public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
+ {
+ _appRegistry = appRegistry;
+ _vhost = vhost;
+ _transaction = new AutoCommitTransaction(vhost.getMessageStore());
+ _connection = connection;
+
+ }
+
+ public void remoteLinkCreation(final LinkEndpoint endpoint)
+ {
+
+
+ Destination destination;
+ Link_1_0 link = null;
+ Error error = null;
+
+ final LinkRegistry linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
+
+
+ if(endpoint.getRole() == Role.SENDER)
+ {
+
+ SendingLink_1_0 previousLink = linkRegistry.getDurableSendingLink(endpoint.getName());
+
+ if(previousLink == null)
+ {
+
+ Target target = (Target) endpoint.getTarget();
+ Source source = (Source) endpoint.getSource();
+
+
+ if(source != null)
+ {
+ if(Boolean.TRUE.equals(source.getDynamic()))
+ {
+ AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties());
+ source.setAddress(tempQueue.getName());
+ }
+ String addr = source.getAddress();
+ AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+ if(queue != null)
+ {
+
+ destination = new QueueDestination(queue);
+ }
+ else
+ {
+ endpoint.setSource(null);
+ destination = null;
+ }
+
+ }
+ else
+ {
+ destination = null;
+ }
+
+ if(destination != null)
+ {
+ final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
+ final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+ _vhost,
+ (SendingDestination) destination
+ );
+ sendingLinkEndpoint.setLinkEventListener(sendingLink);
+ link = sendingLink;
+ if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+ {
+ linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
+ sendingLink.setCloseAction(new Runnable() {
+
+ public void run()
+ {
+ linkRegistry.unregisterSendingLink(endpoint.getName());
+ }
+ });
+ }
+ }
+ }
+ else
+ {
+ SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
+ previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
+ sendingLinkEndpoint.setLinkEventListener(previousLink);
+ link = previousLink;
+ endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+ }
+ }
+ else
+ {
+ if(endpoint.getTarget() instanceof Coordinator)
+ {
+ Coordinator coordinator = (Coordinator) endpoint.getTarget();
+ TxnCapability[] capabilities = coordinator.getCapabilities();
+ boolean localTxn = false;
+ boolean multiplePerSession = false;
+ if(capabilities != null)
+ {
+ for(TxnCapability capability : capabilities)
+ {
+ if(capability.equals(TxnCapability.LOCAL_TXN))
+ {
+ localTxn = true;
+ }
+ else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+ {
+ multiplePerSession = true;
+ }
+ else
+ {
+ error = new Error();
+ error.setCondition(AmqpError.NOT_IMPLEMENTED);
+ error.setDescription("Unsupported capability: " + capability);
+ break;
+ }
+ }
+ }
+
+ /* if(!localTxn)
+ {
+ capabilities.add(TxnCapabilities.LOCAL_TXN);
+ }*/
+
+ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ final TxnCoordinatorLink_1_0 coordinatorLink =
+ new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
+ receivingLinkEndpoint.setLinkEventListener(coordinatorLink);
+ link = coordinatorLink;
+
+
+ }
+ else
+ {
+
+ ReceivingLink_1_0 previousLink = linkRegistry.getDurableReceivingLink(endpoint.getName());
+
+ if(previousLink == null)
+ {
+
+ Target target = (Target) endpoint.getTarget();
+
+ if(target != null)
+ {
+ if(Boolean.TRUE.equals(target.getDynamic()))
+ {
+
+ AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties());
+ target.setAddress(tempQueue.getName());
+ }
+
+ String addr = target.getAddress();
+ Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
+ if(exchg != null)
+ {
+ destination = new ExchangeDestination(exchg);
+ }
+ else
+ {
+ AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+ if(queue != null)
+ {
+
+ destination = new QueueDestination(queue);
+ }
+ else
+ {
+ endpoint.setTarget(null);
+ destination = null;
+ }
+
+ }
+
+
+ }
+ else
+ {
+ destination = null;
+ }
+ if(destination != null)
+ {
+ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
+ (ReceivingDestination) destination);
+ receivingLinkEndpoint.setLinkEventListener(receivingLink);
+ link = receivingLink;
+ if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
+ {
+ linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
+ }
+ }
+ }
+ else
+ {
+ ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+ receivingLinkEndpoint.setLinkEventListener(previousLink);
+ link = previousLink;
+ endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+
+ }
+ }
+ }
+
+ endpoint.attach();
+
+ if(link == null)
+ {
+ endpoint.detach(error);
+ }
+ else
+ {
+ link.start();
+ }
+ }
+
+
+ private AMQQueue createTemporaryQueue(Map properties)
+ {
+ final String queueName = UUID.randomUUID().toString();
+ AMQQueue queue = null;
+ try
+ {
+ LifetimePolicy lifetimePolicy = properties == null
+ ? null
+ : (LifetimePolicy) properties.get(LIFETIME_POLICY);
+
+ final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl(queueName,
+ false, // durable
+ null, // owner
+ false, // autodelete
+ false, // exclusive
+ _vhost,
+ properties);
+
+
+
+ if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
+ {
+ final Connection_1_0.Task deleteQueueTask =
+ new Connection_1_0.Task()
+ {
+ public void doTask(Connection_1_0 session)
+ {
+ if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
+ {
+ try
+ {
+ tempQueue.delete();
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ }
+ }
+ };
+
+ _connection.addConnectionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ _connection.removeConnectionCloseTask(deleteQueueTask);
+ }
+
+
+ });
+ }
+ else if(lifetimePolicy instanceof DeleteOnNoLinks)
+ {
+
+ }
+ else if(lifetimePolicy instanceof DeleteOnNoMessages)
+ {
+
+ }
+ else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
+ {
+
+ }
+ }
+ catch (AMQSecurityException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ return queue;
+ }
+
+ public ServerTransaction getTransaction(Binary transactionId)
+ {
+ // TODO should treat invalid id differently to null
+ ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
+ return transaction == null ? _transaction : transaction;
+ }
+
+ public void remoteEnd(End end)
+ {
+ Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
+ while(iter.hasNext())
+ {
+ Map.Entry<Integer, ServerTransaction> entry = iter.next();
+ entry.getValue().rollback();
+ iter.remove();
+ }
+
+ }
+
+ Integer binaryToInteger(final Binary txnId)
+ {
+ if(txnId == null)
+ {
+ return null;
+ }
+
+ if(txnId.getLength() > 4)
+ throw new IllegalArgumentException();
+
+ int id = 0;
+ byte[] data = txnId.getArray();
+ for(int i = 0; i < txnId.getLength(); i++)
+ {
+ id <<= 8;
+ id += data[i+txnId.getArrayOffset()];
+ }
+
+ return id;
+
+ }
+
+ Binary integerToBinary(final int txnId)
+ {
+ byte[] data = new byte[4];
+ data[3] = (byte) (txnId & 0xff);
+ data[2] = (byte) ((txnId & 0xff00) >> 8);
+ data[1] = (byte) ((txnId & 0xff0000) >> 16);
+ data[0] = (byte) ((txnId & 0xff000000) >> 24);
+ return new Binary(data);
+
+ }
+
+ public void forceEnd()
+ {
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
new file mode 100644
index 0000000000..3797a09009
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -0,0 +1,487 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+class Subscription_1_0 implements Subscription
+{
+ private SendingLink_1_0 _link;
+
+ private AMQQueue _queue;
+
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED);
+
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+ private final long _id;
+ private final boolean _acquires;
+ private AMQQueue.Context _queueContext;
+ private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+ private ReentrantLock _stateChangeLock = new ReentrantLock();
+
+ private long _deliveryTag = 0L;
+ private StateListener _stateListener;
+
+ private Binary _transactionId;
+
+ public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination)
+ {
+ _link = link;
+ _queue = destination.getQueue();
+ _id = getEndpoint().getLocalHandle().longValue();
+ _acquires = !StdDistMode.COPY.equals(((Source) (getEndpoint().getSource())).getDistributionMode());
+ }
+
+ private SendingLinkEndpoint getEndpoint()
+ {
+ return _link.getEndpoint();
+ }
+
+ public LogActor getLogActor()
+ {
+ return null; //TODO
+ }
+
+ public boolean isTransient()
+ {
+ return true; //TODO
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return _owningState;
+ }
+
+ public QueueEntry.SubscriptionAssignedState getAssignedState()
+ {
+ return _assignedState;
+ }
+
+ public void setQueue(final AMQQueue queue, final boolean exclusive)
+ {
+ //TODO
+ }
+
+ public void setNoLocal(final boolean noLocal)
+ {
+ //TODO
+ }
+
+ public long getSubscriptionID()
+ {
+ return _id;
+ }
+
+ public boolean isSuspended()
+ {
+ final boolean isSuspended = !isActive();// || !getEndpoint().hasCreditToSend();
+ return isSuspended;
+ }
+
+ public boolean hasInterest(final QueueEntry msg)
+ {
+ return true; //TODO - filters
+ }
+
+ public boolean isClosed()
+ {
+ return !getEndpoint().isAttached();
+ }
+
+ public boolean acquires()
+ {
+ return _acquires;
+ }
+
+ public boolean seesRequeues()
+ {
+ // TODO
+ return acquires();
+ }
+
+ public void close()
+ {
+ getEndpoint().detach();
+ }
+
+ public void send(final QueueEntry queueEntry) throws AMQException
+ {
+ //TODO
+ ServerMessage serverMessage = queueEntry.getMessage();
+ if(serverMessage instanceof Message_1_0)
+ {
+ Message_1_0 message = (Message_1_0) serverMessage;
+ Transfer transfer = new Transfer();
+ //TODO
+
+
+ List<ByteBuffer> fragments = message.getFragments();
+ ByteBuffer payload;
+ if(fragments.size() == 1)
+ {
+ payload = fragments.get(0);
+ }
+ else
+ {
+ int size = 0;
+ for(ByteBuffer fragment : fragments)
+ {
+ size += fragment.remaining();
+ }
+
+ payload = ByteBuffer.allocate(size);
+
+ for(ByteBuffer fragment : fragments)
+ {
+ payload.put(fragment.duplicate());
+ }
+
+ payload.flip();
+ }
+
+ transfer.setPayload(payload);
+ byte[] data = new byte[8];
+ ByteBuffer.wrap(data).putLong(_deliveryTag++);
+ final Binary tag = new Binary(data);
+
+ transfer.setDeliveryTag(tag);
+
+ synchronized(_link.getLock())
+ {
+ if(_link.isAttached())
+ {
+ if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+ {
+ transfer.setSettled(true);
+ }
+ else
+ {
+ UnsettledAction action = _acquires
+ ? new DispositionAction(tag, queueEntry)
+ : new DoNothingAction(tag, queueEntry);
+
+ _link.addUnsettled(tag, action, queueEntry);
+ }
+
+ if(_transactionId != null)
+ {
+ TransactionalState state = new TransactionalState();
+ state.setTxnId(_transactionId);
+ transfer.setState(state);
+ }
+ // TODO - need to deal with failure here
+ if(_acquires && _transactionId != null)
+ {
+ ServerTransaction txn = _link.getTransaction(_transactionId);
+ if(txn != null)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action(){
+
+ public void postCommit()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void onRollback()
+ {
+ if(queueEntry.isAcquiredBy(Subscription_1_0.this))
+ {
+ queueEntry.release();
+ _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+
+
+ }
+ }
+ });
+ }
+
+ }
+
+ getEndpoint().transfer(transfer);
+ }
+ else
+ {
+ queueEntry.release();
+ }
+ }
+ }
+
+ }
+
+ public void queueDeleted(final AMQQueue queue)
+ {
+ //TODO
+ getEndpoint().setSource(null);
+ getEndpoint().detach();
+ }
+
+ public boolean wouldSuspend(final QueueEntry msg)
+ {
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+ if(!hasCredit && getState() == State.ACTIVE)
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ return !hasCredit;
+ }
+
+ public void suspend()
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ public void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+
+ public void onDequeue(final QueueEntry queueEntry)
+ {
+ //TODO
+ }
+
+ public void restoreCredit(final QueueEntry queueEntry)
+ {
+ //TODO
+ }
+
+ public void setStateListener(final StateListener listener)
+ {
+ _stateListener = listener;
+ }
+
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ public AMQQueue.Context getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ public void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
+ }
+
+
+ public boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public void set(String key, Object value)
+ {
+ _properties.put(key, value);
+ }
+
+ public Object get(String key)
+ {
+ return _properties.get(key);
+ }
+
+ public boolean isSessionTransactional()
+ {
+ return false; //TODO
+ }
+
+ public void queueEmpty()
+ {
+ if(_link.drained())
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ }
+ }
+
+ public void flowStateChanged()
+ {
+ if(isSuspended())
+ {
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ }
+ _transactionId = _link.getTransactionId();
+ }
+ }
+
+ private class DispositionAction implements UnsettledAction
+ {
+
+ private final QueueEntry _queueEntry;
+ private final Binary _deliveryTag;
+
+ public DispositionAction(Binary tag, QueueEntry queueEntry)
+ {
+ _deliveryTag = tag;
+ _queueEntry = queueEntry;
+ }
+
+ public boolean process(DeliveryState state, Boolean settled)
+ {
+
+ Binary transactionId = null;
+ final Outcome outcome;
+ // If disposition is settled this overrides the txn?
+ if(state instanceof TransactionalState)
+ {
+ transactionId = ((TransactionalState)state).getTxnId();
+ outcome = ((TransactionalState)state).getOutcome();
+ }
+ else if (state instanceof Outcome)
+ {
+ outcome = (Outcome) state;
+ }
+ else
+ {
+ outcome = null;
+ }
+
+
+ ServerTransaction txn = _link.getTransaction(transactionId);
+
+ if(outcome instanceof Accepted)
+ {
+ txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(),
+ new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
+ {
+ _queueEntry.discard();
+ }
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ //_link.getEndpoint().settle(_deliveryTag);
+ _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState)outcome, true);
+ _link.getEndpoint().sendFlowConditional();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+ else if(outcome instanceof Released)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ _queueEntry.release();
+ _link.getEndpoint().settle(_deliveryTag);
+ }
+
+ public void onRollback()
+ {
+ _link.getEndpoint().settle(_deliveryTag);
+ }
+ });
+ }
+
+ return (transactionId == null && outcome != null);
+ }
+ }
+
+ private class DoNothingAction implements UnsettledAction
+ {
+ public DoNothingAction(final Binary tag,
+ final QueueEntry queueEntry)
+ {
+ }
+
+ public boolean process(final DeliveryState state, final Boolean settled)
+ {
+ Binary transactionId = null;
+ Outcome outcome = null;
+ // If disposition is settled this overrides the txn?
+ if(state instanceof TransactionalState)
+ {
+ transactionId = ((TransactionalState)state).getTxnId();
+ outcome = ((TransactionalState)state).getOutcome();
+ }
+ else if (state instanceof Outcome)
+ {
+ outcome = (Outcome) state;
+ }
+ return true;
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
new file mode 100644
index 0000000000..0e700dc10b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.Declare;
+import org.apache.qpid.amqp_1_0.type.transaction.Declared;
+import org.apache.qpid.amqp_1_0.type.transaction.Discharge;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class TxnCoordinatorLink_1_0 implements ReceivingLinkListener, Link_1_0
+{
+ private VirtualHost _vhost;
+ private ReceivingLinkEndpoint _endpoint;
+
+ private ArrayList<Transfer> _incompleteMessage;
+ private SectionDecoder _sectionDecoder;
+ private LinkedHashMap<Integer, ServerTransaction> _openTransactions;
+ private Session_1_0 _session;
+
+
+ public TxnCoordinatorLink_1_0(VirtualHost vhost,
+ Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint,
+ LinkedHashMap<Integer, ServerTransaction> openTransactions)
+ {
+ _vhost = vhost;
+ _session = session_1_0;
+ _endpoint = endpoint;
+ _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry());
+ _openTransactions = openTransactions;
+ }
+
+ public void messageTransfer(Transfer xfr)
+ {
+ // TODO - cope with fragmented messages
+
+ ByteBuffer payload = null;
+
+
+ if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
+ {
+ _incompleteMessage = new ArrayList<Transfer>();
+ _incompleteMessage.add(xfr);
+ return;
+ }
+ else if(_incompleteMessage != null)
+ {
+ _incompleteMessage.add(xfr);
+ if(Boolean.TRUE.equals(xfr.getMore()))
+ {
+ return;
+ }
+
+ int size = 0;
+ for(Transfer t : _incompleteMessage)
+ {
+ size += t.getPayload().limit();
+ }
+ payload = ByteBuffer.allocate(size);
+ for(Transfer t : _incompleteMessage)
+ {
+ payload.put(t.getPayload().duplicate());
+ }
+ payload.flip();
+ _incompleteMessage=null;
+
+ }
+ else
+ {
+ payload = xfr.getPayload();
+ }
+
+
+ // Only interested int he amqp-value section that holds the message to the co-ordinator
+ try
+ {
+ List<Section> sections = _sectionDecoder.parseAll(payload);
+
+ for(Section section : sections)
+ {
+ if(section instanceof AmqpValue)
+ {
+ Object command = ((AmqpValue) section).getValue();
+
+ if(command instanceof Declare)
+ {
+ Integer txnId = Integer.valueOf(0);
+ Iterator<Integer> existingTxn = _openTransactions.keySet().iterator();
+ while(existingTxn.hasNext())
+ {
+ txnId = existingTxn.next();
+ }
+ txnId = Integer.valueOf(txnId.intValue() + 1);
+
+ _openTransactions.put(txnId, new LocalTransaction(_vhost.getTransactionLog()));
+
+ Declared state = new Declared();
+
+
+
+ state.setTxnId(_session.integerToBinary(txnId));
+ _endpoint.updateDisposition(xfr.getDeliveryTag(), state, true);
+
+ }
+ else if(command instanceof Discharge)
+ {
+ Discharge discharge = (Discharge) command;
+
+ DeliveryState state = xfr.getState();
+ discharge(_session.binaryToInteger(discharge.getTxnId()), discharge.getFail());
+ _endpoint.updateDisposition(xfr.getDeliveryTag(), new Accepted(), true);
+
+ }
+ }
+ }
+
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ public void remoteDetached(LinkEndpoint endpoint, Detach detach)
+ {
+ //TODO
+ endpoint.detach();
+ }
+
+ private Error discharge(Integer transactionId, boolean fail)
+ {
+ Error error = null;
+ ServerTransaction txn = _openTransactions.get(transactionId);
+ if(txn != null)
+ {
+ if(fail)
+ {
+ txn.rollback();
+ }
+ else
+ {
+ txn.commit();
+ }
+ _openTransactions.remove(transactionId);
+ }
+ else
+ {
+ error = new Error();
+ error.setCondition(AmqpError.NOT_FOUND);
+ error.setDescription("Unkown transactionId" + transactionId);
+ }
+ return error;
+ }
+
+
+
+ public void start()
+ {
+ _endpoint.setLinkCredit(UnsignedInteger.ONE);
+ _endpoint.setCreditWindow();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
new file mode 100644
index 0000000000..c497cc5146
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
@@ -0,0 +1,8 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+
+public interface UnsettledAction
+{
+ boolean process(DeliveryState state, Boolean settled);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index bee55118ba..5fbad74978 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -245,6 +245,15 @@ public class AMQQueueFactory
}
}
+ if(config.isTopic())
+ {
+ if(arguments == null)
+ {
+ arguments = new HashMap<String,Object>();
+ }
+ arguments.put("topic", Boolean.TRUE);
+ }
+
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
q.configure(config);
return q;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
index 76ebea0321..1612d13b1b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainInitialiser;
+import org.apache.qpid.server.security.auth.sasl.anonymous.AnonymousInitialiser;
import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser;
import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser;
@@ -78,6 +79,13 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase
AmqPlainInitialiser amqplain = new AmqPlainInitialiser();
amqplain.initialise(this);
+
+
+ // Accept AMQPlain incomming and compare it to the file.
+ AnonymousInitialiser anonymous = new AnonymousInitialiser();
+ anonymous.initialise(this);
+
+
// Accept Plain incomming and compare it to the file.
PlainInitialiser plain = new PlainInitialiser();
plain.initialise(this);
@@ -89,6 +97,7 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase
_saslServers.put(amqplain.getMechanismName(), amqplain);
_saslServers.put(plain.getMechanismName(), plain);
_saslServers.put(cram.getMechanismName(), cram);
+ _saslServers.put(anonymous.getMechanismName(), anonymous);
}
public void setPasswordFile(String passwordFile) throws IOException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java
index 4a66b74783..e35e999766 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java
@@ -20,18 +20,53 @@
*/
package org.apache.qpid.server.security.auth.sasl.anonymous;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.SaslServerFactory;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.UsernamePasswordInitialiser;
-import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainSaslServerFactory;
-public class AnonymousInitialiser extends UsernamePasswordInitialiser
+import java.io.IOException;
+import java.util.Map;
+
+public class AnonymousInitialiser implements AuthenticationProviderInitialiser
{
public String getMechanismName()
{
return "ANONYMOUS";
}
+ public void initialise(String baseConfigPath, Configuration configuration, Map<String, PrincipalDatabase> principalDatabases) throws Exception
+ {
+ }
+
+ public void initialise(PrincipalDatabase db)
+ {
+ }
+
+ public CallbackHandler getCallbackHandler()
+ {
+ return new CallbackHandler()
+ {
+
+ public Callback[] _callbacks;
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ _callbacks =callbacks;
+ }
+ };
+ }
+
+ public Map<String, ?> getProperties()
+ {
+ return null;
+ }
+
public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration()
{
return AnonymousSaslServerFactory.class;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
index 428bb1e41b..0fab60b6f3 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
@@ -22,13 +22,16 @@ package org.apache.qpid.server.store;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
import java.nio.ByteBuffer;
public enum MessageMetaDataType
{
META_DATA_0_8 { public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } },
- META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } };
+ META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } },
+ META_DATA_1_0 { public Factory<MessageMetaData_1_0> getFactory() { return MessageMetaData_1_0.FACTORY; } };
+
public static interface Factory<M extends StorableMessageMetaData>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
index 12d2a6a6c7..2381301c30 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
@@ -30,7 +30,5 @@ public interface StorableMessageMetaData
int writeToBuffer(int offsetInMetaData, ByteBuffer dest);
- int getContentSize();
-
boolean isPersistent();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 1f5b027b80..fba4745523 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -26,15 +26,13 @@ import java.nio.ByteBuffer;
public class StoredMemoryMessage implements StoredMessage
{
private final long _messageNumber;
- private final ByteBuffer _content;
+ private ByteBuffer _content;
private final StorableMessageMetaData _metaData;
public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData)
{
_messageNumber = messageNumber;
_metaData = metaData;
- _content = ByteBuffer.allocate(metaData.getContentSize());
-
}
public long getMessageNumber()
@@ -45,6 +43,16 @@ public class StoredMemoryMessage implements StoredMessage
public void addContent(int offsetInMessage, ByteBuffer src)
{
src = src.duplicate();
+ if(_content == null || offsetInMessage + src.remaining() > _content.capacity())
+ {
+ ByteBuffer newContent = ByteBuffer.allocate(offsetInMessage+src.remaining());
+ if(_content != null)
+ {
+ newContent.duplicate().put(_content.array());
+ }
+ _content = newContent;
+ }
+
ByteBuffer dst = _content.duplicate();
dst.position(offsetInMessage);
dst.put(src);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 0a3576ff42..5c4e413989 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -55,16 +55,12 @@ public interface Subscription
void setNoLocal(boolean noLocal);
- AMQShortString getConsumerTag();
-
long getSubscriptionID();
boolean isSuspended();
boolean hasInterest(QueueEntry msg);
- boolean isAutoClose();
-
boolean isClosed();
boolean acquires();
@@ -99,11 +95,11 @@ public interface Subscription
boolean isActive();
- void confirmAutoClose();
-
public void set(String key, Object value);
public Object get(String key);
boolean isSessionTransactional();
+
+ void queueEmpty() throws AMQException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index d8f44c9f7f..08867f86ca 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -341,7 +341,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
return getQueue().getConfigStore();
}
-
+
public Long getDelivered()
{
return _deliveredCount.get();
@@ -777,9 +777,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
return _channel.isTransactional();
}
-
+
public long getCreateTime()
{
return _createTime;
}
+
+ public void queueEmpty() throws AMQException
+ {
+ if (isAutoClose())
+ {
+ _queue.unregisterSubscription(this);
+
+ confirmAutoClose();
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index b36ac84cdd..7884565073 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -103,7 +103,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void stateChange(Subscription sub, State oldState, State newState)
{
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+ CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
}
};
private AMQQueue _queue;
@@ -189,12 +189,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
filterLogString.length() > 0));
}
-
- }
- public AMQShortString getConsumerTag()
- {
- return new AMQShortString(_destination);
}
public boolean isSuspended()
@@ -234,12 +229,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return (_filters == null) || _filters.allAllow(entry);
}
- public boolean isAutoClose()
- {
- // no such thing in 0-10
- return false;
- }
-
public boolean isClosed()
{
return getState() == State.CLOSED;
@@ -292,7 +281,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
return getQueue().getConfigStore();
}
-
+
public Long getDelivered()
{
return _deliveredCount.get();
@@ -708,11 +697,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return getState() == State.ACTIVE;
}
- public void confirmAutoClose()
- {
- //No such thing in 0-10
- }
-
public void set(String key, Object value)
{
_properties.put(key, value);
@@ -901,6 +885,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _session.isTransactional();
}
+ public void queueEmpty()
+ {
+ }
+
public long getCreateTime()
{
return _createTime;
@@ -908,7 +896,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public String toLogString()
{
- String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
+ String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
_queue.getNameShortString());
String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
// queueString is "vh(/{0})/qu({1}) " so need to trim
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 4ed0507228..bf7cf831c1 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfig;
import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -99,4 +100,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
ConfigStore getConfigStore();
void removeBrokerConnection(BrokerLink brokerLink);
+
+ LinkRegistry getLinkRegistry(String remoteContainerId);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
index 7dc491de4d..0f812c9c7a 100755
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
@@ -34,7 +34,7 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
{
private long _messageId;
private MessageMetaData _metaData;
- private final ByteBuffer _content;
+ private ByteBuffer _content;
public MockStoredMessage(long messageId)
@@ -46,8 +46,6 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
{
_messageId = messageId;
_metaData = new MessageMetaData(info, chb, 0);
- _content = ByteBuffer.allocate(_metaData.getContentSize());
-
}
public MessageMetaData getMetaData()
@@ -63,6 +61,16 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
public void addContent(int offsetInMessage, ByteBuffer src)
{
src = src.duplicate();
+ if(_content == null || offsetInMessage + src.remaining() > _content.capacity())
+ {
+ ByteBuffer newContent = ByteBuffer.allocate(offsetInMessage+src.remaining());
+ if(_content != null)
+ {
+ newContent.duplicate().put(_content.array());
+ }
+ _content = newContent;
+ }
+
ByteBuffer dst = _content.duplicate();
dst.position(offsetInMessage);
dst.put(src);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 1ec134e90e..1c318aa457 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -129,11 +129,6 @@ public class MockSubscription implements Subscription
return true;
}
- public void confirmAutoClose()
- {
-
- }
-
public void set(String key, Object value)
{
}
@@ -143,11 +138,6 @@ public class MockSubscription implements Subscription
return null;
}
- public boolean isAutoClose()
- {
- return false;
- }
-
public boolean isBrowser()
{
return false;
@@ -214,7 +204,7 @@ public class MockSubscription implements Subscription
}
public void setNoLocal(boolean noLocal)
- {
+ {
}
public void setStateListener(StateListener listener)
@@ -241,4 +231,9 @@ public class MockSubscription implements Subscription
{
return false;
}
+
+ public void queueEmpty() throws AMQException
+ {
+ //TODO
+ }
}