diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-08-14 17:14:51 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-08-14 17:14:51 +0000 |
commit | 858ddcc441ca47636a710d93f5084146ce73476c (patch) | |
tree | 110a4d66dcaed5a2b180cc0f737f21761cc259d7 /qpid/java/broker/src | |
parent | d84a3a50dbb794c4383de7e5eca730ca602771e7 (diff) | |
download | qpid-python-858ddcc441ca47636a710d93f5084146ce73476c.tar.gz |
Initial checkin of AMQP 1-0 Java Prototype work
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1157566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
40 files changed, 4325 insertions, 69 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java index 895ff643a2..3f6290dedb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java @@ -28,7 +28,7 @@ import org.apache.qpid.transport.codec.BBEncoder; import java.nio.ByteBuffer; import java.util.Set; -public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHeader +public class QMFMessage implements ServerMessage<QMFMessage>, InboundMessage, AMQMessageHeader { private ByteBuffer _content; @@ -154,7 +154,7 @@ public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHead return false; } - public MessageReference newReference() + public MessageReference<QMFMessage> newReference() { return new QMFMessageReference(this); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 4f86c82578..7cad0bc5c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -70,6 +70,7 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -136,7 +137,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final AtomicBoolean _suspended = new AtomicBoolean(false); private ServerTransaction _transaction; - + private final AtomicLong _txnStarts = new AtomicLong(0); private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); @@ -199,7 +200,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel // theory return !(_transaction instanceof AutoCommitTransaction); } - + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -209,7 +210,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _txnCount.compareAndSet(0,1); } } - + private void decrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -308,7 +309,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel try { _currentMessage.getStoredMessage().flushToStore(); - + final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); if(!checkMessageUserId(_currentMessage.getContentHeader())) @@ -425,7 +426,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { throw new AMQException("Consumer already exists with same tag: " + tag); } - + Subscription subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); @@ -933,7 +934,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel finally { _rollingBack = false; - + _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); @@ -1017,7 +1018,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel throws AMQException { getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), - deliveryTag, sub.getConsumerTag()); + deliveryTag, + ((SubscriptionImpl)sub).getConsumerTag()); } }; @@ -1402,7 +1404,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { return _createTime; } - + public void mgmtClose() throws AMQException { _session.mgmtCloseChannel(_channelId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 4512de6fb4..3a28b65a08 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -167,6 +167,11 @@ public class QueueConfiguration extends ConfigurationPlugin return getStringValue("lvqKey", null); } + public boolean isTopic() + { + return getBooleanValue("topic"); + } + public static class QueueConfig extends ConfigurationPlugin { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index e0c181a5fc..f16e30ef92 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -37,7 +37,7 @@ import java.nio.ByteBuffer; /** * A deliverable message. */ -public class AMQMessage implements ServerMessage +public class AMQMessage implements ServerMessage<AMQMessage> { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -73,7 +73,7 @@ public class AMQMessage implements ServerMessage { this(handle, null); } - + public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef) { _handle = handle; @@ -85,7 +85,7 @@ public class AMQMessage implements ServerMessage { _flags |= IMMEDIATE; } - + _channelRef = channelRef; } @@ -294,7 +294,7 @@ public class AMQMessage implements ServerMessage return _expiration; } - public MessageReference newReference() + public AMQMessageReference newReference() { return new AMQMessageReference(this); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java new file mode 100755 index 0000000000..a5841dd94e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java @@ -0,0 +1,501 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.message; + +import org.apache.qpid.amqp_1_0.codec.ValueHandler; +import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.amqp_1_0.type.messaging.*; + + +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.qpid.configuration.Validator; +import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.store.StorableMessageMetaData; + +import java.nio.ByteBuffer; +import java.util.*; + +public class MessageMetaData_1_0 implements StorableMessageMetaData +{ + // TODO move to somewhere more useful + public static final Symbol JMS_TYPE = Symbol.valueOf("x-jms-type"); + + + private Header _header; + private Properties _properties; + private Map _deliveryAnnotations; + private Map _messageAnnotations; + private Map _appProperties; + private Map _footer; + + private List<ByteBuffer> _encodedSections = new ArrayList<ByteBuffer>(3); + + private volatile ByteBuffer _encoded; + private MessageHeader_1_0 _messageHeader; + + + + public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder) + { + this(fragments, decoder, new ArrayList<ByteBuffer>(3)); + } + + public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List<ByteBuffer> immuatableSections) + { + this(constructSections(fragments, decoder,immuatableSections), immuatableSections); + } + + private MessageMetaData_1_0(List<Section> sections, List<ByteBuffer> encodedSections) + { + _encodedSections = encodedSections; + + Iterator<Section> sectIter = sections.iterator(); + + Section section = sectIter.hasNext() ? sectIter.next() : null; + if(section instanceof Header) + { + _header = (Header) section; + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof DeliveryAnnotations) + { + _deliveryAnnotations = ((DeliveryAnnotations) section).getValue(); + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof MessageAnnotations) + { + _messageAnnotations = ((MessageAnnotations) section).getValue(); + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof Properties) + { + _properties = (Properties) section; + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof ApplicationProperties) + { + _appProperties = ((ApplicationProperties) section).getValue(); + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof Footer) + { + _footer = ((Footer) section).getValue(); + section = sectIter.hasNext() ? sectIter.next() : null; + } + + _messageHeader = new MessageHeader_1_0(); + + } + + private static List<Section> constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List<ByteBuffer> encodedSections) + { + List<Section> sections = new ArrayList<Section>(3); + + ByteBuffer src; + if(fragments.length == 1) + { + src = fragments[0].duplicate(); + } + else + { + int size = 0; + for(ByteBuffer buf : fragments) + { + size += buf.remaining(); + } + src = ByteBuffer.allocate(size); + for(ByteBuffer buf : fragments) + { + src.put(buf.duplicate()); + } + src.flip(); + + } + + try + { + int startBarePos = -1; + int lastPos = src.position(); + Section s = decoder.readSection(src); + + + + if(s instanceof Header) + { + sections.add(s); + lastPos = src.position(); + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof DeliveryAnnotations) + { + sections.add(s); + lastPos = src.position(); + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof MessageAnnotations) + { + sections.add(s); + lastPos = src.position(); + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof Properties) + { + sections.add(s); + if(startBarePos == -1) + { + startBarePos = lastPos; + } + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof ApplicationProperties) + { + sections.add(s); + if(startBarePos == -1) + { + startBarePos = lastPos; + } + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof AmqpValue) + { + if(startBarePos == -1) + { + startBarePos = lastPos; + } + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + else if(s instanceof Data) + { + if(startBarePos == -1) + { + startBarePos = lastPos; + } + do + { + s = src.hasRemaining() ? decoder.readSection(src) : null; + } while(s instanceof Data); + } + else if(s instanceof AmqpSequence) + { + if(startBarePos == -1) + { + startBarePos = lastPos; + } + do + { + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + while(s instanceof AmqpSequence); + } + + if(s instanceof Footer) + { + sections.add(s); + } + + + int pos = 0; + for(ByteBuffer buf : fragments) + { +/* + if(pos < startBarePos) + { + if(pos + buf.remaining() > startBarePos) + { + ByteBuffer dup = buf.duplicate(); + dup.position(dup.position()+startBarePos-pos); + dup.slice(); + encodedSections.add(dup); + } + } + else +*/ + { + encodedSections.add(buf.duplicate()); + } + pos += buf.remaining(); + } + + return sections; + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new IllegalArgumentException(e); + } + } + + + public MessageMetaDataType getType() + { + return MessageMetaDataType.META_DATA_1_0; + } + + + public int getStorableSize() + { + int size = 0; + + for(ByteBuffer bin : _encodedSections) + { + size += bin.limit(); + } + + return size; + } + + private ByteBuffer encodeAsBuffer() + { + ByteBuffer buf = ByteBuffer.allocate(getStorableSize()); + + for(ByteBuffer bin : _encodedSections) + { + buf.put(bin.duplicate()); + } + + return buf; + } + + public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) + { + ByteBuffer buf = _encoded; + + if(buf == null) + { + buf = encodeAsBuffer(); + _encoded = buf; + } + + buf = buf.duplicate(); + + buf.position(offsetInMetaData); + + if(dest.remaining() < buf.limit()) + { + buf.limit(dest.remaining()); + } + dest.put(buf); + return buf.limit(); + } + + public boolean isPersistent() + { + return _header != null && Boolean.TRUE.equals(_header.getDurable()); + } + + public MessageHeader_1_0 getMessageHeader() + { + return _messageHeader; + } + + public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory(); + + + private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0> + { + private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance(); + + public MessageMetaData_1_0 createMetaData(ByteBuffer buf) + { + ValueHandler valueHandler = new ValueHandler(_typeRegistry); + + ArrayList<Section> sections = new ArrayList<Section>(3); + ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(3); + + while(buf.hasRemaining()) + { + try + { + ByteBuffer encodedBuf = buf.duplicate(); + sections.add((Section) valueHandler.parse(buf)); + encodedBuf.limit(buf.position()); + encodedSections.add(encodedBuf); + + } + catch (AmqpErrorException e) + { + //TODO + throw new RuntimeException(e); + } + + } + + return new MessageMetaData_1_0(sections,encodedSections); + + } + } + + public class MessageHeader_1_0 implements AMQMessageHeader + { + + public String getCorrelationId() + { + if(_properties == null || _properties.getCorrelationId() == null) + { + return null; + } + else + { + return _properties.getMessageId().toString(); + } + } + + public long getExpiration() + { + return 0; //TODO + } + + public String getMessageId() + { + if(_properties == null || _properties.getCorrelationId() == null) + { + return null; + } + else + { + return _properties.getCorrelationId().toString(); + } + } + + public String getMimeType() + { + + if(_properties == null || _properties.getContentType() == null) + { + return null; + } + else + { + return _properties.getContentType().toString(); + } + } + + public String getEncoding() + { + return null; //TODO + } + + public byte getPriority() + { + if(_header == null || _header.getPriority() == null) + { + return 4; //javax.jms.Message.DEFAULT_PRIORITY; + } + else + { + return _header.getPriority().byteValue(); + } + } + + public long getTimestamp() + { + if(_properties == null || _properties.getCreationTime() == null) + { + return 0L; + } + else + { + return _properties.getCreationTime().getTime(); + } + + } + + public String getType() + { + + if(_appProperties == null || _appProperties.get(JMS_TYPE) == null) + { + return null; + } + else + { + return _appProperties.get(JMS_TYPE).toString(); + } + } + + public String getReplyTo() + { + if(_properties == null || _properties.getReplyTo() == null) + { + return null; + } + else + { + return _properties.getReplyTo().toString(); + } + } + + public String getReplyToExchange() + { + return null; //TODO + } + + public String getReplyToRoutingKey() + { + return null; //TODO + } + + public Object getHeader(final String name) + { + return _appProperties == null ? null : _appProperties.get(name); + } + + public boolean containsHeaders(final Set<String> names) + { + if(_appProperties == null) + { + return false; + } + + for(String key : names) + { + if(!_appProperties.containsKey(key)) + { + return false; + } + } + return true; + } + + public boolean containsHeader(final String name) + { + return _appProperties != null && _appProperties.containsKey(name); + } + + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java index 08006435f8..af78042a63 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -29,7 +29,7 @@ import java.nio.ByteBuffer; import java.lang.ref.WeakReference; -public class MessageTransferMessage implements InboundMessage, ServerMessage +public class MessageTransferMessage implements InboundMessage, ServerMessage<MessageTransferMessage> { @@ -90,7 +90,7 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage return getMetaData().getExpiration(); } - public MessageReference newReference() + public MessageReference<MessageTransferMessage> newReference() { return new TransferMessageReference(this); } @@ -145,5 +145,5 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage { return _sessionRef == null ? null : (ServerSession) _sessionRef.get(); } - + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 2f2d39115f..e3cc9879fa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; import org.apache.qpid.server.configuration.SessionConfig; -public interface ServerMessage extends EnqueableMessage, MessageContentSource +public interface ServerMessage<T extends ServerMessage> extends EnqueableMessage, MessageContentSource { String getRoutingKey(); @@ -38,7 +38,7 @@ public interface ServerMessage extends EnqueableMessage, MessageContentSource long getExpiration(); - MessageReference newReference(); + MessageReference<T> newReference(); Long getMessageNumber(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java new file mode 100644 index 0000000000..c1282555cc --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPSASLEngine_1_0_0.java @@ -0,0 +1,339 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.amqp_1_0.codec.FrameWriter; +import org.apache.qpid.amqp_1_0.framing.AMQFrame; +import org.apache.qpid.amqp_1_0.framing.FrameHandler; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; +import org.apache.qpid.amqp_1_0.type.FrameBody; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConnectionConfigType; +import org.apache.qpid.server.protocol.v1_0.Connection_1_0; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.transport.NetworkDriver; + +import java.io.PrintWriter; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.UUID; + +public class AMQPSASLEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler +{ + public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + + private NetworkDriver _networkDriver; + private long _readBytes; + private long _writtenBytes; + private final UUID _id; + private final IApplicationRegistry _appRegistry; + private long _createTime = System.currentTimeMillis(); + + private static final int BUF_SIZE = 8; + private static final ByteBuffer HEADER = + ByteBuffer.wrap(new byte[] + { + (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte) 0, + (byte) 1, + (byte) 0, + (byte) 0 + }); + + private FrameWriter _frameWriter; + private FrameHandler _frameHandler; + private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); + private Object _sendLock = new Object(); + private byte _major; + private byte _minor; + private byte _revision; + private ConnectionEndpoint _conn; + + + static enum State { + A, + M, + Q, + P, + PROTOCOL, + MAJOR, + MINOR, + REVISION, + FRAME + } + + private State _state = State.A; + + public AMQPSASLEngine_1_0_0(NetworkDriver networkDriver, + final IApplicationRegistry appRegistry) + { + + + _networkDriver = networkDriver; + _id = appRegistry.getConfigStore().createId(); + _appRegistry = appRegistry; + + + // FIXME Two log messages to maintain compatinbility with earlier protocol versions +// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); +// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); + } + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + + + _conn = new ConnectionEndpoint(new Container(),_appRegistry.getAuthenticationManager()); + _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); + _conn.setFrameOutputHandler(this); + + + _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); + _frameHandler = new FrameHandler(_conn); + + _networkDriver.send(HEADER.duplicate()); + + + + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public void writerIdle() + { + //Todo + } + + public void readerIdle() + { + //Todo + } + + public String getAddress() + { + return getRemoteAddress().toString(); + } + + + public ConfigStore getConfigStore() + { + return _appRegistry.getConfigStore(); + } + + public UUID getId() + { + return _id; + } + + public ConnectionConfigType getConfigType() + { + return ConnectionConfigType.getInstance(); + } + + public boolean isDurable() + { + return false; + } + + public synchronized void received(ByteBuffer msg) + { + _readBytes += msg.remaining(); + switch(_state) + { + case A: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + break; + } + case M: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.M; + break; + } + + case Q: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.Q; + break; + } + case P: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.P; + break; + } + case PROTOCOL: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.PROTOCOL; + break; + } + case MAJOR: + if(msg.hasRemaining()) + { + _major = msg.get(); + } + else + { + _state = State.MAJOR; + break; + } + case MINOR: + if(msg.hasRemaining()) + { + _minor = msg.get(); + } + else + { + _state = State.MINOR; + break; + } + case REVISION: + if(msg.hasRemaining()) + { + _revision = msg.get(); + + _state = State.FRAME; + } + else + { + _state = State.REVISION; + break; + } + case FRAME: + _frameHandler.parse(msg); + } + + } + + public void exception(Throwable t) + { + t.printStackTrace(); + } + + public void closed() + { + // todo + + } + + public long getCreateTime() + { + return _createTime; + } + + + public boolean canSend() + { + return true; + } + + public void send(AMQFrame frame) + { + send(frame, null); + } + public void send(AMQFrame frame, ByteBuffer buffer) + { + + synchronized(_sendLock) + { + + if(_buf.remaining() < MAX_FRAME_SIZE) + { + _buf = ByteBuffer.allocate(1024*1024); + } + + _frameWriter.setValue(frame); + + ByteBuffer dup = _buf.slice(); + + _frameWriter.writeToBuffer(dup); + + _buf.position(_buf.position()+dup.position()); + + dup.flip(); + _writtenBytes += dup.limit(); + _networkDriver.send(dup); + + } + } + + public void close() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void setLogOutput(final PrintWriter out) + { + //TODO + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index eb957ee33c..f8f4929a73 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -153,6 +153,28 @@ private static final byte[] AMQP_0_9_1_HEADER = (byte) 10 }; + private static final byte[] AMQP_1_0_0_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 0, + (byte) 1, + (byte) 0, + (byte) 0 + }; + + private static final byte[] AMQP_SASL_1_0_0_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 3, + (byte) 1, + (byte) 0, + (byte) 0 + }; + private static interface DelegateCreator { VERSION getVersion(); @@ -246,8 +268,49 @@ private static final byte[] AMQP_0_9_1_HEADER = } }; + private DelegateCreator creator_1_0_0 = new DelegateCreator() + { + + public VERSION getVersion() + { + return VERSION.v1_0_0; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_1_0_0_HEADER; + } + + public ProtocolEngine getProtocolEngine() + { + return new ProtocolEngine_1_0_0(_networkDriver, _appRegistry); + } + }; + + private DelegateCreator creator_1_0_0_SASL = new DelegateCreator() + { + + public VERSION getVersion() + { + return VERSION.v1_0_0; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_SASL_1_0_0_HEADER; + } + + public ProtocolEngine getProtocolEngine() + { + return new ProtocolEngine_1_0_0_SASL(_networkDriver, _appRegistry); + } + }; + + private final DelegateCreator[] _creators = - new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10, creator_1_0_0_SASL, creator_1_0_0}; private class ClosedDelegateProtocolEngine implements ProtocolEngine diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java new file mode 100755 index 0000000000..aa84552210 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java @@ -0,0 +1,390 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.amqp_1_0.codec.FrameWriter; +import org.apache.qpid.amqp_1_0.framing.AMQFrame; +import org.apache.qpid.amqp_1_0.framing.FrameHandler; +import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; +import org.apache.qpid.amqp_1_0.transport.*; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.FrameBody; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.configuration.*; +import org.apache.qpid.server.protocol.v1_0.Connection_1_0; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.transport.NetworkDriver; + +import java.io.PrintWriter; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.UUID; +import java.util.logging.*; + +public class ProtocolEngine_1_0_0 implements ProtocolEngine, FrameOutputHandler +{ + public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + + private NetworkDriver _networkDriver; + private long _readBytes; + private long _writtenBytes; + private final UUID _id; + private final IApplicationRegistry _appRegistry; + private long _createTime = System.currentTimeMillis(); + private ConnectionEndpoint _conn; + + private static final int BUF_SIZE = 8; + private static final ByteBuffer HEADER = + ByteBuffer.wrap(new byte[] + { + (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte) 0, + (byte) 1, + (byte) 0, + (byte) 0 + }); + + private FrameWriter _frameWriter; + private FrameHandler _frameHandler; + private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); + private Object _sendLock = new Object(); + private byte _major; + private byte _minor; + private byte _revision; + + + static enum State { + A, + M, + Q, + P, + PROTOCOL, + MAJOR, + MINOR, + REVISION, + FRAME + } + + private State _state = State.A; + + public ProtocolEngine_1_0_0(NetworkDriver networkDriver, + final IApplicationRegistry appRegistry) + { + + + _networkDriver = networkDriver; + _id = appRegistry.getConfigStore().createId(); + _appRegistry = appRegistry; + + + // FIXME Two log messages to maintain compatinbility with earlier protocol versions +// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); +// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); + } + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + Container container = new Container(); + + Principal principal = new Principal() + { + + public String getName() + { + // TODO + return "rob"; + } + }; + _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager()); + _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); + _conn.setFrameOutputHandler(this); + _conn.setRemoteAddress(driver.getRemoteAddress()); + + + _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); + _frameHandler = new FrameHandler(_conn); + + _networkDriver.send(HEADER.duplicate()); + + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public void writerIdle() + { + //Todo + } + + public void readerIdle() + { + //Todo + } + + public String getAddress() + { + return getRemoteAddress().toString(); + } + + + public ConfigStore getConfigStore() + { + return _appRegistry.getConfigStore(); + } + + public UUID getId() + { + return _id; + } + + public ConnectionConfigType getConfigType() + { + return ConnectionConfigType.getInstance(); + } + + public boolean isDurable() + { + return false; + } + + public synchronized void received(ByteBuffer msg) + { + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + ByteBuffer dup = msg.duplicate(); + byte[] data = new byte[dup.remaining()]; + dup.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString()); + } + _readBytes += msg.remaining(); + switch(_state) + { + case A: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + break; + } + case M: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.M; + break; + } + + case Q: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.Q; + break; + } + case P: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.P; + break; + } + case PROTOCOL: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.PROTOCOL; + break; + } + case MAJOR: + if(msg.hasRemaining()) + { + _major = msg.get(); + } + else + { + _state = State.MAJOR; + break; + } + case MINOR: + if(msg.hasRemaining()) + { + _minor = msg.get(); + } + else + { + _state = State.MINOR; + break; + } + case REVISION: + if(msg.hasRemaining()) + { + _revision = msg.get(); + + _state = State.FRAME; + } + else + { + _state = State.REVISION; + break; + } + case FRAME: + _frameHandler.parse(msg); + } + + } + + public void exception(Throwable t) + { + t.printStackTrace(); + } + + public void closed() + { + _conn.inputClosed(); + if(_conn != null && _conn.getConnectionEventListener() != null) + { + ((Connection_1_0)_conn.getConnectionEventListener()).closed(); + } + + } + + public long getCreateTime() + { + return _createTime; + } + + + public boolean canSend() + { + return true; + } + + public void send(final AMQFrame amqFrame) + { + send(amqFrame, null); + } + + private final Logger FRAME_LOGGER = Logger.getLogger("FRM"); + private final Logger RAW_LOGGER = Logger.getLogger("RAW"); + + + public void send(final AMQFrame amqFrame, ByteBuffer buf) + { + synchronized(_sendLock) + { + + if(FRAME_LOGGER.isLoggable(Level.FINE)) + { + FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); + } + + if(_buf.remaining() < _conn.getMaxFrameSize()) + { + _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024)); + } + _frameWriter.setValue(amqFrame); + + + + ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); + // int pos = _buf.position(); + + int size = _frameWriter.writeToBuffer(dup); + if(size > _conn.getMaxFrameSize()) + { +// _buf.position(pos); + throw new OversizeFrameException(amqFrame,size); + } + +// _buf.position(_buf.position()+dup.position()); + + dup.flip(); + _writtenBytes += dup.limit(); + + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + ByteBuffer dup2 = dup.duplicate(); + byte[] data = new byte[dup2.remaining()]; + dup2.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); + } + + + _networkDriver.send(dup); + + } + } + + public void send(short channel, FrameBody body) + { + AMQFrame frame = AMQFrame.createAMQFrame(channel, body); + send(frame); + + } + + public void close() + { + System.out.print(true); + //To change body of implemented methods use File | Settings | File Templates. + } + + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java new file mode 100644 index 0000000000..d718058072 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java @@ -0,0 +1,426 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.amqp_1_0.codec.FrameWriter; +import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; +import org.apache.qpid.amqp_1_0.framing.AMQFrame; +import org.apache.qpid.amqp_1_0.framing.FrameHandler; +import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; +import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.FrameBody; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConnectionConfigType; +import org.apache.qpid.server.protocol.v1_0.Connection_1_0; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.transport.NetworkDriver; + +import java.io.PrintWriter; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ProtocolEngine_1_0_0_SASL implements ProtocolEngine, FrameOutputHandler +{ + private NetworkDriver _networkDriver; + private long _readBytes; + private long _writtenBytes; + private final UUID _id; + private final IApplicationRegistry _appRegistry; + private long _createTime = System.currentTimeMillis(); + private ConnectionEndpoint _conn; + + private static final int BUF_SIZE = 8; + private static final ByteBuffer HEADER = + ByteBuffer.wrap(new byte[] + { + (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte) 3, + (byte) 1, + (byte) 0, + (byte) 0 + }); + + private static final ByteBuffer PROTOCOL_HEADER = + ByteBuffer.wrap(new byte[] + { + (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte) 0, + (byte) 1, + (byte) 0, + (byte) 0 + }); + + + private FrameWriter _frameWriter; + private ProtocolHandler _frameHandler; + private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); + private Object _sendLock = new Object(); + private byte _major; + private byte _minor; + private byte _revision; + private PrintWriter _out; + + + static enum State { + A, + M, + Q, + P, + PROTOCOL, + MAJOR, + MINOR, + REVISION, + FRAME + } + + private State _state = State.A; + + + public ProtocolEngine_1_0_0_SASL(final NetworkDriver networkDriver, final IApplicationRegistry appRegistry) + { + _networkDriver = networkDriver; + _id = appRegistry.getConfigStore().createId(); + _appRegistry = appRegistry; + } + + public void setNetworkDriver(final NetworkDriver driver) + { + _networkDriver = driver; + Container container = new Container(); + + Principal principal = new Principal() + { + + public String getName() + { + // TODO + return "rob"; + } + }; + _conn = new ConnectionEndpoint(container, ApplicationRegistry.getInstance().getAuthenticationManager()); + _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); + _conn.setRemoteAddress(getRemoteAddress()); + + + _conn.setFrameOutputHandler(this); + _conn.setSaslFrameOutput(this); + + _conn.setOnSaslComplete(new Runnable() + { + + + public void run() + { + if(_conn.isAuthenticated()) + { + _networkDriver.send(PROTOCOL_HEADER.duplicate()); + } + else + { + _networkDriver.close(); + } + } + }); + _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); + _frameHandler = new SASLFrameHandler(_conn); + + _networkDriver.send(HEADER.duplicate()); + + _conn.initiateSASL(); + + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public void writerIdle() + { + //Todo + } + + public void readerIdle() + { + //Todo + } + + public String getAddress() + { + return getRemoteAddress().toString(); + } + + + public ConfigStore getConfigStore() + { + return _appRegistry.getConfigStore(); + } + + public UUID getId() + { + return _id; + } + + public ConnectionConfigType getConfigType() + { + return ConnectionConfigType.getInstance(); + } + + public boolean isDurable() + { + return false; + } + + private final Logger RAW_LOGGER = Logger.getLogger("RAW"); + + + public synchronized void received(ByteBuffer msg) + { + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + ByteBuffer dup = msg.duplicate(); + byte[] data = new byte[dup.remaining()]; + dup.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString()); + } + _readBytes += msg.remaining(); + switch(_state) + { + case A: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + break; + } + case M: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.M; + break; + } + + case Q: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.Q; + break; + } + case P: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.P; + break; + } + case PROTOCOL: + if(msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.PROTOCOL; + break; + } + case MAJOR: + if(msg.hasRemaining()) + { + _major = msg.get(); + } + else + { + _state = State.MAJOR; + break; + } + case MINOR: + if(msg.hasRemaining()) + { + _minor = msg.get(); + } + else + { + _state = State.MINOR; + break; + } + case REVISION: + if(msg.hasRemaining()) + { + _revision = msg.get(); + + _state = State.FRAME; + } + else + { + _state = State.REVISION; + break; + } + case FRAME: + _frameHandler = _frameHandler.parse(msg); + } + + } + + public void exception(Throwable t) + { + t.printStackTrace(); + } + + public void closed() + { + // todo + _conn.inputClosed(); + if(_conn != null && _conn.getConnectionEventListener() != null) + { + ((Connection_1_0)_conn.getConnectionEventListener()).closed(); + } + + } + + public long getCreateTime() + { + return _createTime; + } + + + public boolean canSend() + { + return true; + } + + public void send(final AMQFrame amqFrame) + { + send(amqFrame, null); + } + + private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); + + + public void send(final AMQFrame amqFrame, ByteBuffer buf) + { + + synchronized(_sendLock) + { + if(FRAME_LOGGER.isLoggable(Level.FINE)) + { + FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); + } + if(_buf.remaining() < _conn.getMaxFrameSize()) + { + _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024)); + } + _frameWriter.setValue(amqFrame); + + ByteBuffer dup = _buf.slice(); + int pos = _buf.position(); + + int size = _frameWriter.writeToBuffer(dup); + if(size > _conn.getMaxFrameSize()) + { + _buf.position(pos); + throw new OversizeFrameException(amqFrame,size); + } + + _buf.position(_buf.position()+dup.position()); + + dup.flip(); + _writtenBytes += dup.limit(); + + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + ByteBuffer dup2 = dup.duplicate(); + byte[] data = new byte[dup2.remaining()]; + dup2.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); + } + + _networkDriver.send(dup); + + } + } + + public void send(short channel, FrameBody body) + { + AMQFrame frame = AMQFrame.createAMQFrame(channel, body); + send(frame); + + } + + public void close() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void setLogOutput(final PrintWriter out) + { + _out = out; + } + + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java new file mode 100644 index 0000000000..318a240b27 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; +import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; + +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class Connection_1_0 implements ConnectionEventListener +{ + + private IApplicationRegistry _appRegistry; + private VirtualHost _vhost; + + + public static interface Task + { + public void doTask(Connection_1_0 connection); + } + + + private List<Task> _closeTasks = + Collections.synchronizedList(new ArrayList<Task>()); + + + + public Connection_1_0(IApplicationRegistry appRegistry) + { + _appRegistry = appRegistry; + _vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost(); + } + + public void remoteSessionCreation(SessionEndpoint endpoint) + { + Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this); + endpoint.setSessionEventListener(session); + } + + + void removeConnectionCloseTask(final Task task) + { + _closeTasks.remove( task ); + } + + void addConnectionCloseTask(final Task task) + { + _closeTasks.add( task ); + } + + public void closeReceived() + { + List<Task> taskCopy; + synchronized (_closeTasks) + { + taskCopy = new ArrayList<Task>(_closeTasks); + } + for(Task task : taskCopy) + { + task.doTask(this); + } + synchronized (_closeTasks) + { + _closeTasks.clear(); + } + + } + + public void closed() + { + closeReceived(); + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java new file mode 100644 index 0000000000..d45758391c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Destination.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + + +public interface Destination +{ + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java new file mode 100644 index 0000000000..c518ee5cc3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.txn.ServerTransaction; + +import java.util.ArrayList; +import java.util.Arrays; + +public class ExchangeDestination implements ReceivingDestination +{ + private static final Accepted ACCEPTED = new Accepted(); + private static final Outcome[] OUTCOMES = { ACCEPTED }; + + private Exchange _exchange; + + public ExchangeDestination(Exchange exchange) + { + _exchange = exchange; + } + + public Outcome[] getOutcomes() + { + return OUTCOMES; + } + + public Outcome send(final Message_1_0 message, ServerTransaction txn) + { + final ArrayList<? extends BaseQueue> queues = _exchange.route(message); + + txn.enqueue(queues,message, new ServerTransaction.Action() + { + + BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); + + public void postCommit() + { + for(int i = 0; i < _queues.length; i++) + { + try + { + _queues[i].enqueue(message); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + } + } + + public void onRollback() + { + // NO-OP + } + }); + + return ACCEPTED; + } + + public int getCredit() + { + // TODO - fix + return 20000; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java new file mode 100644 index 0000000000..42eea05d37 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java @@ -0,0 +1,59 @@ +package org.apache.qpid.server.protocol.v1_0; + +import java.util.HashMap; +import java.util.Map; + +public class LinkRegistry +{ + private final Map<String, SendingLink_1_0> _sendingLinks = new HashMap<String, SendingLink_1_0>(); + private final Map<String, ReceivingLink_1_0> _receivingLinks = new HashMap<String, ReceivingLink_1_0>(); + + public synchronized SendingLink_1_0 getDurableSendingLink(String name) + { + return _sendingLinks.get(name); + } + + public synchronized boolean registerSendingLink(String name, SendingLink_1_0 link) + { + if(_sendingLinks.containsKey(name)) + { + return false; + } + else + { + _sendingLinks.put(name, link); + return true; + } + } + + public synchronized boolean unregisterSendingLink(String name) + { + if(!_sendingLinks.containsKey(name)) + { + return false; + } + else + { + _sendingLinks.remove(name); + return true; + } + } + + public synchronized ReceivingLink_1_0 getDurableReceivingLink(String name) + { + return _receivingLinks.get(name); + } + + public synchronized boolean registerReceivingLink(String name, ReceivingLink_1_0 link) + { + if(_receivingLinks.containsKey(name)) + { + return false; + } + else + { + _receivingLinks.put(name, link); + return true; + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java new file mode 100644 index 0000000000..db81d3b205 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +public interface Link_1_0 +{ + void start(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java new file mode 100644 index 0000000000..d1d6effc2f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + + +import org.apache.qpid.server.configuration.SessionConfig; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageMetaData_1_0; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.StoredMessage; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +public class Message_1_0 implements ServerMessage<Message_1_0>, InboundMessage +{ + private final StoredMessage<MessageMetaData_1_0> _storedMessage; + private List<ByteBuffer> _fragments; + + + public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, final List<ByteBuffer> fragments) + { + _storedMessage = storedMessage; + _fragments = fragments; + } + + public String getRoutingKey() + { + Object routingKey = getMessageHeader().getHeader("routing-key"); + if(routingKey != null) + { + return routingKey.toString(); + } + else + { + return null; + } + } + + private MessageMetaData_1_0 getMessageMetaData() + { + return _storedMessage.getMetaData(); + } + + public AMQMessageHeader getMessageHeader() + { + return getMessageMetaData().getMessageHeader(); + } + + public boolean isPersistent() + { + return getMessageMetaData().isPersistent(); + } + + public boolean isRedelivered() + { + // TODO + return false; + } + + public long getSize() + { + // TODO + return 0l; + } + + public boolean isImmediate() + { + return false; + } + + public long getExpiration() + { + return getMessageHeader().getExpiration(); + } + + public MessageReference<Message_1_0> newReference() + { + return new Reference(this); + } + + public Long getMessageNumber() + { + return _storedMessage.getMessageNumber(); + } + + public long getArrivalTime() + { + return 0; //TODO + } + + public int getContent(final ByteBuffer buf, final int offset) + { + return _storedMessage.getContent(offset, buf); + } + + public SessionConfig getSessionConfig() + { + return null; //TODO + } + + public List<ByteBuffer> getFragments() + { + return _fragments; + } + + public static class Reference extends MessageReference<Message_1_0> + { + public Reference(Message_1_0 message) + { + super(message); + } + + protected void onReference(Message_1_0 message) + { + + } + + protected void onRelease(Message_1_0 message) + { + + } +} +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java new file mode 100644 index 0000000000..7635b4299a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQQueue; + +import org.apache.qpid.server.txn.ServerTransaction; + +import java.util.Arrays; + +public class QueueDestination implements SendingDestination, ReceivingDestination +{ + private static final Accepted ACCEPTED = new Accepted(); + private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED }; + + + private AMQQueue _queue; + + public QueueDestination(AMQQueue queue) + { + _queue = queue; + } + + public Outcome[] getOutcomes() + { + return OUTCOMES; + } + + public Outcome send(final Message_1_0 message, ServerTransaction txn) + { + + try + { + txn.enqueue(_queue,message, new ServerTransaction.Action() + { + + + public void postCommit() + { + try + { + + _queue.enqueue(message); + } + catch (Exception e) + { + // TODO + throw new RuntimeException(e); + } + + } + + public void onRollback() + { + // NO-OP + } + }); + } + catch(Exception e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + return ACCEPTED; + } + + public int getCredit() + { + // TODO - fix + return 100; + } + + public AMQQueue getQueue() + { + return _queue; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java new file mode 100644 index 0000000000..4ae0596e25 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.type.Outcome; + +import org.apache.qpid.server.txn.ServerTransaction; + +public interface ReceivingDestination extends Destination +{ + + Outcome[] getOutcomes(); + + Outcome send(Message_1_0 message, ServerTransaction txn); + + int getCredit(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java new file mode 100644 index 0000000000..6da5081185 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java @@ -0,0 +1,51 @@ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.amqp_1_0.type.Target; + +public class ReceivingLinkAttachment +{ + private final Session_1_0 _session; + private final ReceivingLinkEndpoint _endpoint; + + public ReceivingLinkAttachment(final Session_1_0 session, final ReceivingLinkEndpoint endpoint) + { + _session = session; + _endpoint = endpoint; + } + + public Session_1_0 getSession() + { + return _session; + } + + public ReceivingLinkEndpoint getEndpoint() + { + return _endpoint; + } + + public Source getSource() + { + return getEndpoint().getSource(); + } + + public void setDeliveryStateHandler(final DeliveryStateHandler handler) + { + getEndpoint().setDeliveryStateHandler(handler); + } + + public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled) + { + getEndpoint().updateDisposition(deliveryTag, state, settled); + } + + public Target getTarget() + { + return getEndpoint().getTarget(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java new file mode 100644 index 0000000000..5f3e4602f8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -0,0 +1,303 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.messaging.Target; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; +import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; +import org.apache.qpid.amqp_1_0.type.transport.Detach; +import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; + +import org.apache.qpid.server.message.MessageMetaData_1_0; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.nio.ByteBuffer; +import java.util.*; + +public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, DeliveryStateHandler +{ + private VirtualHost _vhost; + + private ReceivingDestination _destination; + private SectionDecoderImpl _sectionDecoder; + private volatile ReceivingLinkAttachment _attachment; + + + private ArrayList<Transfer> _incompleteMessage; + private TerminusDurability _durability; + + private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>()); + private boolean _resumedMessage; + private Binary _messageDeliveryTag; + private ReceiverSettleMode _receivingSettlementMode; + + + public ReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, VirtualHost vhost, + ReceivingDestination destination) + { + _vhost = vhost; + _destination = destination; + _attachment = receivingLinkAttachment; + receivingLinkAttachment.setDeliveryStateHandler(this); + + _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable(); + + _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry()); + + + } + + public void messageTransfer(Transfer xfr) + { + // TODO - cope with fragmented messages + + List<ByteBuffer> fragments = null; + + + + if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null) + { + _incompleteMessage = new ArrayList<Transfer>(); + _incompleteMessage.add(xfr); + _resumedMessage = Boolean.TRUE.equals(xfr.getResume()); + _messageDeliveryTag = xfr.getDeliveryTag(); + return; + } + else if(_incompleteMessage != null) + { + _incompleteMessage.add(xfr); + + if(Boolean.TRUE.equals(xfr.getMore())) + { + return; + } + + fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size()); + for(Transfer t : _incompleteMessage) + { + fragments.add(t.getPayload()); + } + _incompleteMessage=null; + + } + else + { + _resumedMessage = Boolean.TRUE.equals(xfr.getResume()); + _messageDeliveryTag = xfr.getDeliveryTag(); + fragments = Collections.singletonList(xfr.getPayload()); + } + + if(_resumedMessage) + { + if(_unsettledMap.containsKey(_messageDeliveryTag)) + { + Outcome outcome = _unsettledMap.get(_messageDeliveryTag); + boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode()); + getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled); + if(settled) + { + _unsettledMap.remove(_messageDeliveryTag); + } + } + else + { + System.err.println("UNEXPECTED!!"); + System.err.println("Delivery Tag: " + _messageDeliveryTag); + System.err.println("_unsettledMap: " + _unsettledMap); + + } + } + else + { + MessageMetaData_1_0 mmd = null; + List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3); + mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]), + _sectionDecoder, + immutableSections); + + StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd); + + boolean skipping = true; + int offset = 0; + + for(ByteBuffer bareMessageBuf : immutableSections) + { + storedMessage.addContent(offset, bareMessageBuf.duplicate()); + offset += bareMessageBuf.remaining(); + } + + storedMessage.flushToStore(); + + Message_1_0 message = new Message_1_0(storedMessage, fragments); + + + Binary transactionId = null; + org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState(); + if(xfrState != null) + { + if(xfrState instanceof TransactionalState) + { + transactionId = ((TransactionalState)xfrState).getTxnId(); + } + } + + ServerTransaction transaction = null; + if(transactionId != null) + { + transaction = getSession().getTransaction(transactionId); + } + else + { + Session_1_0 session = getSession(); + transaction = session != null ? session.getTransaction(null) : new AutoCommitTransaction(_vhost.getTransactionLog()); + } + + Outcome outcome = _destination.send(message, transaction); + + DeliveryState resultantState; + + if(transactionId == null) + { + resultantState = (DeliveryState) outcome; + } + else + { + TransactionalState transactionalState = new TransactionalState(); + transactionalState.setOutcome(outcome); + transactionalState.setTxnId(transactionId); + resultantState = transactionalState; + + } + + + boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode()); + + final Binary deliveryTag = xfr.getDeliveryTag(); + + if(!settled) + { + _unsettledMap.put(deliveryTag, outcome); + } + + getEndpoint().updateDisposition(deliveryTag, resultantState, settled); + + if(!(transaction instanceof AutoCommitTransaction)) + { + ServerTransaction.Action a; + transaction.addPostTransactionAction(new ServerTransaction.Action() + { + public void postCommit() + { + getEndpoint().updateDisposition(deliveryTag, null, true); + } + + public void onRollback() + { + getEndpoint().updateDisposition(deliveryTag, null, true); + } + }); + } + } + } + + private ReceiverSettleMode getReceivingSettlementMode() + { + return _receivingSettlementMode; + } + + public void remoteDetached(LinkEndpoint endpoint, Detach detach) + { + //TODO + // if not durable or close + if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) || + (detach != null && Boolean.TRUE.equals(detach.getClosed()))) + { + endpoint.detach(); + } + else if(detach == null || detach.getError() != null) + { + _attachment = null; + } + } + + public void start() + { + getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit())); + getEndpoint().setCreditWindow(); + } + + public ReceivingLinkEndpoint getEndpoint() + { + return _attachment.getEndpoint(); + } + + + public Session_1_0 getSession() + { + ReceivingLinkAttachment attachment = _attachment; + return attachment == null ? null : attachment.getSession(); + } + + public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) + { + if(Boolean.TRUE.equals(settled)) + { + _unsettledMap.remove(deliveryTag); + } + } + + public void setLinkAttachment(ReceivingLinkAttachment linkAttachment) + { + _attachment = linkAttachment; + _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode(); + ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint(); + Map initialUnsettledMap = endpoint.getInitialUnsettledMap(); + + Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap); + for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet()) + { + Binary deliveryTag = entry.getKey(); + if(!initialUnsettledMap.containsKey(deliveryTag)) + { + _unsettledMap.remove(deliveryTag); + } + } + + } + + public Map getUnsettledOutcomeMap() + { + return _unsettledMap; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java new file mode 100644 index 0000000000..6d601c9dda --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + + +public interface SendingDestination extends Destination +{ + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java new file mode 100644 index 0000000000..9d7af24135 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java @@ -0,0 +1,44 @@ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Source; + +public class SendingLinkAttachment +{ + private final Session_1_0 _session; + private final SendingLinkEndpoint _endpoint; + + public SendingLinkAttachment(final Session_1_0 session, final SendingLinkEndpoint endpoint) + { + _session = session; + _endpoint = endpoint; + } + + public Session_1_0 getSession() + { + return _session; + } + + public SendingLinkEndpoint getEndpoint() + { + return _endpoint; + } + + public Source getSource() + { + return getEndpoint().getSource(); + } + + public void setDeliveryStateHandler(final DeliveryStateHandler handler) + { + getEndpoint().setDeliveryStateHandler(handler); + } + + public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled) + { + getEndpoint().updateDisposition(deliveryTag, state, settled); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java new file mode 100644 index 0000000000..152ccc706b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -0,0 +1,367 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; + +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.transport.Detach; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler +{ + private VirtualHost _vhost; + private SendingDestination _destination; + + private Subscription_1_0 _subscription; + private boolean _draining; + private final Map<Binary, QueueEntry> _unsettledMap = + new HashMap<Binary, QueueEntry>(); + + private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap = + new ConcurrentHashMap<Binary, UnsettledAction>(); + private volatile SendingLinkAttachment _linkAttachment; + private TerminusDurability _durability; + private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>(); + private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>(); + private Runnable _closeAction; + + public SendingLink_1_0(final SendingLinkAttachment linkAttachment, + final VirtualHost vhost, + final SendingDestination destination) + { + _vhost = vhost; + _destination = destination; + _linkAttachment = linkAttachment; + _durability = ((Source)linkAttachment.getSource()).getDurable(); + linkAttachment.setDeliveryStateHandler(this); + + if(destination instanceof QueueDestination) + { + AMQQueue queue = ((QueueDestination) _destination).getQueue(); + if(queue.getArguments().containsKey("topic")) + { + ((Source)linkAttachment.getSource()).setDistributionMode(StdDistMode.COPY); + } + _subscription = new Subscription_1_0(this, (QueueDestination)destination); + try + { + + queue.registerSubscription(_subscription, false); + } + catch (AMQException e) + { + e.printStackTrace(); //TODO + } + } + + } + + public void resume(SendingLinkAttachment linkAttachment) + { + _linkAttachment = linkAttachment; + + } + + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + //TODO + // if not durable or close + if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) || + (detach != null && Boolean.TRUE.equals(detach.getClosed()))) + { + + try + { + ((QueueDestination)_destination).getQueue().unregisterSubscription(_subscription); + } + catch (AMQException e) + { + e.printStackTrace(); //TODO + } + + DeliveryState state = new Released(); + + for(UnsettledAction action : _unsettledActionMap.values()) + { + + action.process(state,Boolean.TRUE); + } + _unsettledActionMap.clear(); + + endpoint.detach(); + if(_closeAction != null) + { + _closeAction.run(); + } + } + else if(detach == null || detach.getError() != null) + { + _linkAttachment = null; + _subscription.flowStateChanged(); + } + } + + public void start() + { + //TODO + } + + public SendingLinkEndpoint getEndpoint() + { + return _linkAttachment == null ? null : _linkAttachment.getEndpoint() ; + } + + public void flowStateChanged() + { + if(Boolean.TRUE.equals(getEndpoint().getDrain()) + && getEndpoint().getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0) + { + _draining = true; + } + + while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend()) + { + Accepted accepted = new Accepted(); + synchronized(getLock()) + { + + Transfer xfr = new Transfer(); + Binary dt = _resumeAcceptedTransfers.remove(0); + xfr.setDeliveryTag(dt); + xfr.setState(accepted); + xfr.setResume(Boolean.TRUE); + getEndpoint().transfer(xfr); + } + + } + if(_resumeAcceptedTransfers.isEmpty()) + { + _subscription.flowStateChanged(); + } + + } + + public boolean isDraining() + { + return false; //TODO + } + + public boolean drained() + { + if(getEndpoint() != null) + { + synchronized(getEndpoint().getLock()) + { + if(_draining) + { + //TODO + getEndpoint().drained(); + _draining = false; + return true; + } + else + { + return false; + } + } + } + else + { + return false; + } + } + + public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry) + { + _unsettledActionMap.put(tag,unsettledAction); + if(getTransactionId() == null) + { + _unsettledMap.put(tag, queueEntry); + } + } + + public void removeUnsettled(Binary tag) + { + _unsettledActionMap.remove(tag); + } + + public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) + { + UnsettledAction action = _unsettledActionMap.get(deliveryTag); + boolean localSettle = false; + if(action != null) + { + localSettle = action.process(state, settled); + if(localSettle && !Boolean.TRUE.equals(settled)) + { + _linkAttachment.updateDisposition(deliveryTag, state, true); + } + } + if(Boolean.TRUE.equals(settled) || localSettle) + { + _unsettledActionMap.remove(deliveryTag); + _unsettledMap.remove(deliveryTag); + } + } + + ServerTransaction getTransaction(Binary transactionId) + { + return _linkAttachment.getSession().getTransaction(transactionId); + } + + public Binary getTransactionId() + { + return getEndpoint().getTransactionId(); + } + + public synchronized Object getLock() + { + return _linkAttachment == null ? this : getEndpoint().getLock(); + } + + public boolean isDetached() + { + return _linkAttachment == null || getEndpoint().isDetached(); + } + + public boolean isAttached() + { + return _linkAttachment != null && getEndpoint().isAttached(); + } + + public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment) + { + + if(_subscription.isActive()) + { + _subscription.suspend(); + } + + _linkAttachment = linkAttachment; + + SendingLinkEndpoint endpoint = linkAttachment.getEndpoint(); + endpoint.setDeliveryStateHandler(this); + Map initialUnsettledMap = endpoint.getInitialUnsettledMap(); + Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap); + _resumeAcceptedTransfers.clear(); + _resumeFullTransfers.clear(); + for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet()) + { + Binary deliveryTag = entry.getKey(); + final QueueEntry queueEntry = entry.getValue(); + if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag)) + { + queueEntry.setRedelivered(); + queueEntry.release(); + _unsettledMap.remove(deliveryTag); + } + else if(initialUnsettledMap != null && (initialUnsettledMap.get(deliveryTag) instanceof Outcome)) + { + Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag); + + if(outcome instanceof Accepted) + { + AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getTransactionLog()); + if(_subscription.acquires()) + { + txn.dequeue(Collections.singleton(queueEntry), + new ServerTransaction.Action() + { + public void postCommit() + { + queueEntry.discard(); + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); + } + } + else if(outcome instanceof Released) + { + AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getTransactionLog()); + if(_subscription.acquires()) + { + txn.dequeue(Collections.singleton(queueEntry), + new ServerTransaction.Action() + { + public void postCommit() + { + queueEntry.release(); + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); + } + } + //_unsettledMap.remove(deliveryTag); + initialUnsettledMap.remove(deliveryTag); + _resumeAcceptedTransfers.add(deliveryTag); + } + else + { + _resumeFullTransfers.add(queueEntry); + // exists in receivers map, but not yet got an outcome ... should resend with resume = true + } + // TODO - else + } + + } + + public Map getUnsettledOutcomeMap() + { + Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap); + + for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet()) + { + entry.setValue(null); + } + + return unsettled; + } + + public void setCloseAction(Runnable action) + { + _closeAction = action; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java new file mode 100644 index 0000000000..67b3d25d5e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -0,0 +1,406 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEventListener; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.LifetimePolicy; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.transaction.Coordinator; +import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; +import org.apache.qpid.amqp_1_0.type.transport.*; + +import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.*; + +public class Session_1_0 implements SessionEventListener +{ + private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + private IApplicationRegistry _appRegistry; + private VirtualHost _vhost; + private AutoCommitTransaction _transaction; + + private final LinkedHashMap<Integer, ServerTransaction> _openTransactions = + new LinkedHashMap<Integer, ServerTransaction>(); + private final Connection_1_0 _connection; + + + public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection) + { + _appRegistry = appRegistry; + _vhost = vhost; + _transaction = new AutoCommitTransaction(vhost.getMessageStore()); + _connection = connection; + + } + + public void remoteLinkCreation(final LinkEndpoint endpoint) + { + + + Destination destination; + Link_1_0 link = null; + Error error = null; + + final LinkRegistry linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId()); + + + if(endpoint.getRole() == Role.SENDER) + { + + SendingLink_1_0 previousLink = linkRegistry.getDurableSendingLink(endpoint.getName()); + + if(previousLink == null) + { + + Target target = (Target) endpoint.getTarget(); + Source source = (Source) endpoint.getSource(); + + + if(source != null) + { + if(Boolean.TRUE.equals(source.getDynamic())) + { + AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties()); + source.setAddress(tempQueue.getName()); + } + String addr = source.getAddress(); + AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); + if(queue != null) + { + + destination = new QueueDestination(queue); + } + else + { + endpoint.setSource(null); + destination = null; + } + + } + else + { + destination = null; + } + + if(destination != null) + { + final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; + final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint), + _vhost, + (SendingDestination) destination + ); + sendingLinkEndpoint.setLinkEventListener(sendingLink); + link = sendingLink; + if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) + { + linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); + sendingLink.setCloseAction(new Runnable() { + + public void run() + { + linkRegistry.unregisterSendingLink(endpoint.getName()); + } + }); + } + } + } + else + { + SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; + previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint)); + sendingLinkEndpoint.setLinkEventListener(previousLink); + link = previousLink; + endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); + } + } + else + { + if(endpoint.getTarget() instanceof Coordinator) + { + Coordinator coordinator = (Coordinator) endpoint.getTarget(); + TxnCapability[] capabilities = coordinator.getCapabilities(); + boolean localTxn = false; + boolean multiplePerSession = false; + if(capabilities != null) + { + for(TxnCapability capability : capabilities) + { + if(capability.equals(TxnCapability.LOCAL_TXN)) + { + localTxn = true; + } + else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN)) + { + multiplePerSession = true; + } + else + { + error = new Error(); + error.setCondition(AmqpError.NOT_IMPLEMENTED); + error.setDescription("Unsupported capability: " + capability); + break; + } + } + } + + /* if(!localTxn) + { + capabilities.add(TxnCapabilities.LOCAL_TXN); + }*/ + + final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; + final TxnCoordinatorLink_1_0 coordinatorLink = + new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions); + receivingLinkEndpoint.setLinkEventListener(coordinatorLink); + link = coordinatorLink; + + + } + else + { + + ReceivingLink_1_0 previousLink = linkRegistry.getDurableReceivingLink(endpoint.getName()); + + if(previousLink == null) + { + + Target target = (Target) endpoint.getTarget(); + + if(target != null) + { + if(Boolean.TRUE.equals(target.getDynamic())) + { + + AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties()); + target.setAddress(tempQueue.getName()); + } + + String addr = target.getAddress(); + Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr); + if(exchg != null) + { + destination = new ExchangeDestination(exchg); + } + else + { + AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); + if(queue != null) + { + + destination = new QueueDestination(queue); + } + else + { + endpoint.setTarget(null); + destination = null; + } + + } + + + } + else + { + destination = null; + } + if(destination != null) + { + final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; + final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost, + (ReceivingDestination) destination); + receivingLinkEndpoint.setLinkEventListener(receivingLink); + link = receivingLink; + if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) + { + linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink); + } + } + } + else + { + ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; + previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint)); + receivingLinkEndpoint.setLinkEventListener(previousLink); + link = previousLink; + endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); + + } + } + } + + endpoint.attach(); + + if(link == null) + { + endpoint.detach(error); + } + else + { + link.start(); + } + } + + + private AMQQueue createTemporaryQueue(Map properties) + { + final String queueName = UUID.randomUUID().toString(); + AMQQueue queue = null; + try + { + LifetimePolicy lifetimePolicy = properties == null + ? null + : (LifetimePolicy) properties.get(LIFETIME_POLICY); + + final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl(queueName, + false, // durable + null, // owner + false, // autodelete + false, // exclusive + _vhost, + properties); + + + + if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) + { + final Connection_1_0.Task deleteQueueTask = + new Connection_1_0.Task() + { + public void doTask(Connection_1_0 session) + { + if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) + { + try + { + tempQueue.delete(); + } + catch (AMQException e) + { + e.printStackTrace(); //TODO. + } + } + } + }; + + _connection.addConnectionCloseTask(deleteQueueTask); + + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) + { + _connection.removeConnectionCloseTask(deleteQueueTask); + } + + + }); + } + else if(lifetimePolicy instanceof DeleteOnNoLinks) + { + + } + else if(lifetimePolicy instanceof DeleteOnNoMessages) + { + + } + else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + { + + } + } + catch (AMQSecurityException e) + { + e.printStackTrace(); //TODO. + } + + return queue; + } + + public ServerTransaction getTransaction(Binary transactionId) + { + // TODO should treat invalid id differently to null + ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId)); + return transaction == null ? _transaction : transaction; + } + + public void remoteEnd(End end) + { + Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator(); + while(iter.hasNext()) + { + Map.Entry<Integer, ServerTransaction> entry = iter.next(); + entry.getValue().rollback(); + iter.remove(); + } + + } + + Integer binaryToInteger(final Binary txnId) + { + if(txnId == null) + { + return null; + } + + if(txnId.getLength() > 4) + throw new IllegalArgumentException(); + + int id = 0; + byte[] data = txnId.getArray(); + for(int i = 0; i < txnId.getLength(); i++) + { + id <<= 8; + id += data[i+txnId.getArrayOffset()]; + } + + return id; + + } + + Binary integerToBinary(final int txnId) + { + byte[] data = new byte[4]; + data[3] = (byte) (txnId & 0xff); + data[2] = (byte) ((txnId & 0xff00) >> 8); + data[1] = (byte) ((txnId & 0xff0000) >> 16); + data[0] = (byte) ((txnId & 0xff000000) >> 24); + return new Binary(data); + + } + + public void forceEnd() + { + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java new file mode 100644 index 0000000000..3797a09009 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -0,0 +1,487 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.Released; +import org.apache.qpid.amqp_1_0.type.messaging.Source; +import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; +import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.ServerTransaction; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +class Subscription_1_0 implements Subscription +{ + private SendingLink_1_0 _link; + + private AMQQueue _queue; + + private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED); + + private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); + private final long _id; + private final boolean _acquires; + private AMQQueue.Context _queueContext; + private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); + private ReentrantLock _stateChangeLock = new ReentrantLock(); + + private long _deliveryTag = 0L; + private StateListener _stateListener; + + private Binary _transactionId; + + public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination) + { + _link = link; + _queue = destination.getQueue(); + _id = getEndpoint().getLocalHandle().longValue(); + _acquires = !StdDistMode.COPY.equals(((Source) (getEndpoint().getSource())).getDistributionMode()); + } + + private SendingLinkEndpoint getEndpoint() + { + return _link.getEndpoint(); + } + + public LogActor getLogActor() + { + return null; //TODO + } + + public boolean isTransient() + { + return true; //TODO + } + + public AMQQueue getQueue() + { + return _queue; + } + + public QueueEntry.SubscriptionAcquiredState getOwningState() + { + return _owningState; + } + + public QueueEntry.SubscriptionAssignedState getAssignedState() + { + return _assignedState; + } + + public void setQueue(final AMQQueue queue, final boolean exclusive) + { + //TODO + } + + public void setNoLocal(final boolean noLocal) + { + //TODO + } + + public long getSubscriptionID() + { + return _id; + } + + public boolean isSuspended() + { + final boolean isSuspended = !isActive();// || !getEndpoint().hasCreditToSend(); + return isSuspended; + } + + public boolean hasInterest(final QueueEntry msg) + { + return true; //TODO - filters + } + + public boolean isClosed() + { + return !getEndpoint().isAttached(); + } + + public boolean acquires() + { + return _acquires; + } + + public boolean seesRequeues() + { + // TODO + return acquires(); + } + + public void close() + { + getEndpoint().detach(); + } + + public void send(final QueueEntry queueEntry) throws AMQException + { + //TODO + ServerMessage serverMessage = queueEntry.getMessage(); + if(serverMessage instanceof Message_1_0) + { + Message_1_0 message = (Message_1_0) serverMessage; + Transfer transfer = new Transfer(); + //TODO + + + List<ByteBuffer> fragments = message.getFragments(); + ByteBuffer payload; + if(fragments.size() == 1) + { + payload = fragments.get(0); + } + else + { + int size = 0; + for(ByteBuffer fragment : fragments) + { + size += fragment.remaining(); + } + + payload = ByteBuffer.allocate(size); + + for(ByteBuffer fragment : fragments) + { + payload.put(fragment.duplicate()); + } + + payload.flip(); + } + + transfer.setPayload(payload); + byte[] data = new byte[8]; + ByteBuffer.wrap(data).putLong(_deliveryTag++); + final Binary tag = new Binary(data); + + transfer.setDeliveryTag(tag); + + synchronized(_link.getLock()) + { + if(_link.isAttached()) + { + if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) + { + transfer.setSettled(true); + } + else + { + UnsettledAction action = _acquires + ? new DispositionAction(tag, queueEntry) + : new DoNothingAction(tag, queueEntry); + + _link.addUnsettled(tag, action, queueEntry); + } + + if(_transactionId != null) + { + TransactionalState state = new TransactionalState(); + state.setTxnId(_transactionId); + transfer.setState(state); + } + // TODO - need to deal with failure here + if(_acquires && _transactionId != null) + { + ServerTransaction txn = _link.getTransaction(_transactionId); + if(txn != null) + { + txn.addPostTransactionAction(new ServerTransaction.Action(){ + + public void postCommit() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void onRollback() + { + if(queueEntry.isAcquiredBy(Subscription_1_0.this)) + { + queueEntry.release(); + _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); + + + } + } + }); + } + + } + + getEndpoint().transfer(transfer); + } + else + { + queueEntry.release(); + } + } + } + + } + + public void queueDeleted(final AMQQueue queue) + { + //TODO + getEndpoint().setSource(null); + getEndpoint().detach(); + } + + public boolean wouldSuspend(final QueueEntry msg) + { + final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); + if(!hasCredit && getState() == State.ACTIVE) + { + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } + } + + return !hasCredit; + } + + public void suspend() + { + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } + } + + public void getSendLock() + { + _stateChangeLock.lock(); + } + + public void releaseSendLock() + { + _stateChangeLock.unlock(); + } + + + public void onDequeue(final QueueEntry queueEntry) + { + //TODO + } + + public void restoreCredit(final QueueEntry queueEntry) + { + //TODO + } + + public void setStateListener(final StateListener listener) + { + _stateListener = listener; + } + + public State getState() + { + return _state.get(); + } + + public AMQQueue.Context getQueueContext() + { + return _queueContext; + } + + public void setQueueContext(AMQQueue.Context queueContext) + { + _queueContext = queueContext; + } + + + public boolean isActive() + { + return getState() == State.ACTIVE; + } + + public void set(String key, Object value) + { + _properties.put(key, value); + } + + public Object get(String key) + { + return _properties.get(key); + } + + public boolean isSessionTransactional() + { + return false; //TODO + } + + public void queueEmpty() + { + if(_link.drained()) + { + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } + } + } + + public void flowStateChanged() + { + if(isSuspended()) + { + if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) + { + _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); + } + _transactionId = _link.getTransactionId(); + } + } + + private class DispositionAction implements UnsettledAction + { + + private final QueueEntry _queueEntry; + private final Binary _deliveryTag; + + public DispositionAction(Binary tag, QueueEntry queueEntry) + { + _deliveryTag = tag; + _queueEntry = queueEntry; + } + + public boolean process(DeliveryState state, Boolean settled) + { + + Binary transactionId = null; + final Outcome outcome; + // If disposition is settled this overrides the txn? + if(state instanceof TransactionalState) + { + transactionId = ((TransactionalState)state).getTxnId(); + outcome = ((TransactionalState)state).getOutcome(); + } + else if (state instanceof Outcome) + { + outcome = (Outcome) state; + } + else + { + outcome = null; + } + + + ServerTransaction txn = _link.getTransaction(transactionId); + + if(outcome instanceof Accepted) + { + txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(), + new ServerTransaction.Action() + { + + public void postCommit() + { + if(_queueEntry.isAcquiredBy(Subscription_1_0.this)) + { + _queueEntry.discard(); + } + } + + public void onRollback() + { + + } + }); + txn.addPostTransactionAction(new ServerTransaction.Action() + { + public void postCommit() + { + //_link.getEndpoint().settle(_deliveryTag); + _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState)outcome, true); + _link.getEndpoint().sendFlowConditional(); + } + + public void onRollback() + { + } + }); + } + else if(outcome instanceof Released) + { + txn.addPostTransactionAction(new ServerTransaction.Action() + { + public void postCommit() + { + _queueEntry.release(); + _link.getEndpoint().settle(_deliveryTag); + } + + public void onRollback() + { + _link.getEndpoint().settle(_deliveryTag); + } + }); + } + + return (transactionId == null && outcome != null); + } + } + + private class DoNothingAction implements UnsettledAction + { + public DoNothingAction(final Binary tag, + final QueueEntry queueEntry) + { + } + + public boolean process(final DeliveryState state, final Boolean settled) + { + Binary transactionId = null; + Outcome outcome = null; + // If disposition is settled this overrides the txn? + if(state instanceof TransactionalState) + { + transactionId = ((TransactionalState)state).getTxnId(); + outcome = ((TransactionalState)state).getOutcome(); + } + else if (state instanceof Outcome) + { + outcome = (Outcome) state; + } + return true; + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java new file mode 100644 index 0000000000..0e700dc10b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java @@ -0,0 +1,195 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.transaction.Declare; +import org.apache.qpid.amqp_1_0.type.transaction.Declared; +import org.apache.qpid.amqp_1_0.type.transaction.Discharge; +import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.nio.ByteBuffer; +import java.util.*; + +public class TxnCoordinatorLink_1_0 implements ReceivingLinkListener, Link_1_0 +{ + private VirtualHost _vhost; + private ReceivingLinkEndpoint _endpoint; + + private ArrayList<Transfer> _incompleteMessage; + private SectionDecoder _sectionDecoder; + private LinkedHashMap<Integer, ServerTransaction> _openTransactions; + private Session_1_0 _session; + + + public TxnCoordinatorLink_1_0(VirtualHost vhost, + Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint, + LinkedHashMap<Integer, ServerTransaction> openTransactions) + { + _vhost = vhost; + _session = session_1_0; + _endpoint = endpoint; + _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry()); + _openTransactions = openTransactions; + } + + public void messageTransfer(Transfer xfr) + { + // TODO - cope with fragmented messages + + ByteBuffer payload = null; + + + if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null) + { + _incompleteMessage = new ArrayList<Transfer>(); + _incompleteMessage.add(xfr); + return; + } + else if(_incompleteMessage != null) + { + _incompleteMessage.add(xfr); + if(Boolean.TRUE.equals(xfr.getMore())) + { + return; + } + + int size = 0; + for(Transfer t : _incompleteMessage) + { + size += t.getPayload().limit(); + } + payload = ByteBuffer.allocate(size); + for(Transfer t : _incompleteMessage) + { + payload.put(t.getPayload().duplicate()); + } + payload.flip(); + _incompleteMessage=null; + + } + else + { + payload = xfr.getPayload(); + } + + + // Only interested int he amqp-value section that holds the message to the co-ordinator + try + { + List<Section> sections = _sectionDecoder.parseAll(payload); + + for(Section section : sections) + { + if(section instanceof AmqpValue) + { + Object command = ((AmqpValue) section).getValue(); + + if(command instanceof Declare) + { + Integer txnId = Integer.valueOf(0); + Iterator<Integer> existingTxn = _openTransactions.keySet().iterator(); + while(existingTxn.hasNext()) + { + txnId = existingTxn.next(); + } + txnId = Integer.valueOf(txnId.intValue() + 1); + + _openTransactions.put(txnId, new LocalTransaction(_vhost.getTransactionLog())); + + Declared state = new Declared(); + + + + state.setTxnId(_session.integerToBinary(txnId)); + _endpoint.updateDisposition(xfr.getDeliveryTag(), state, true); + + } + else if(command instanceof Discharge) + { + Discharge discharge = (Discharge) command; + + DeliveryState state = xfr.getState(); + discharge(_session.binaryToInteger(discharge.getTxnId()), discharge.getFail()); + _endpoint.updateDisposition(xfr.getDeliveryTag(), new Accepted(), true); + + } + } + } + + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + public void remoteDetached(LinkEndpoint endpoint, Detach detach) + { + //TODO + endpoint.detach(); + } + + private Error discharge(Integer transactionId, boolean fail) + { + Error error = null; + ServerTransaction txn = _openTransactions.get(transactionId); + if(txn != null) + { + if(fail) + { + txn.rollback(); + } + else + { + txn.commit(); + } + _openTransactions.remove(transactionId); + } + else + { + error = new Error(); + error.setCondition(AmqpError.NOT_FOUND); + error.setDescription("Unkown transactionId" + transactionId); + } + return error; + } + + + + public void start() + { + _endpoint.setLinkCredit(UnsignedInteger.ONE); + _endpoint.setCreditWindow(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java new file mode 100644 index 0000000000..c497cc5146 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java @@ -0,0 +1,8 @@ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.type.DeliveryState; + +public interface UnsettledAction +{ + boolean process(DeliveryState state, Boolean settled); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index bee55118ba..5fbad74978 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -245,6 +245,15 @@ public class AMQQueueFactory } } + if(config.isTopic()) + { + if(arguments == null) + { + arguments = new HashMap<String,Object>(); + } + arguments.put("topic", Boolean.TRUE); + } + AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); q.configure(config); return q; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java index 76ebea0321..1612d13b1b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainInitialiser; +import org.apache.qpid.server.security.auth.sasl.anonymous.AnonymousInitialiser; import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser; import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser; @@ -78,6 +79,13 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase AmqPlainInitialiser amqplain = new AmqPlainInitialiser(); amqplain.initialise(this); + + + // Accept AMQPlain incomming and compare it to the file. + AnonymousInitialiser anonymous = new AnonymousInitialiser(); + anonymous.initialise(this); + + // Accept Plain incomming and compare it to the file. PlainInitialiser plain = new PlainInitialiser(); plain.initialise(this); @@ -89,6 +97,7 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase _saslServers.put(amqplain.getMechanismName(), amqplain); _saslServers.put(plain.getMechanismName(), plain); _saslServers.put(cram.getMechanismName(), cram); + _saslServers.put(anonymous.getMechanismName(), anonymous); } public void setPasswordFile(String passwordFile) throws IOException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java index 4a66b74783..e35e999766 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java @@ -20,18 +20,53 @@ */ package org.apache.qpid.server.security.auth.sasl.anonymous; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.SaslServerFactory; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import org.apache.qpid.server.security.auth.sasl.UsernamePasswordInitialiser; -import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainSaslServerFactory; -public class AnonymousInitialiser extends UsernamePasswordInitialiser +import java.io.IOException; +import java.util.Map; + +public class AnonymousInitialiser implements AuthenticationProviderInitialiser { public String getMechanismName() { return "ANONYMOUS"; } + public void initialise(String baseConfigPath, Configuration configuration, Map<String, PrincipalDatabase> principalDatabases) throws Exception + { + } + + public void initialise(PrincipalDatabase db) + { + } + + public CallbackHandler getCallbackHandler() + { + return new CallbackHandler() + { + + public Callback[] _callbacks; + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + _callbacks =callbacks; + } + }; + } + + public Map<String, ?> getProperties() + { + return null; + } + public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration() { return AnonymousSaslServerFactory.class; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java index 428bb1e41b..0fab60b6f3 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java @@ -22,13 +22,16 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.MessageMetaData_0_10; +import org.apache.qpid.server.message.MessageMetaData_1_0; import java.nio.ByteBuffer; public enum MessageMetaDataType { META_DATA_0_8 { public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } }, - META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } }; + META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } }, + META_DATA_1_0 { public Factory<MessageMetaData_1_0> getFactory() { return MessageMetaData_1_0.FACTORY; } }; + public static interface Factory<M extends StorableMessageMetaData> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java index 12d2a6a6c7..2381301c30 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java @@ -30,7 +30,5 @@ public interface StorableMessageMetaData int writeToBuffer(int offsetInMetaData, ByteBuffer dest); - int getContentSize(); - boolean isPersistent(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index 1f5b027b80..fba4745523 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -26,15 +26,13 @@ import java.nio.ByteBuffer; public class StoredMemoryMessage implements StoredMessage { private final long _messageNumber; - private final ByteBuffer _content; + private ByteBuffer _content; private final StorableMessageMetaData _metaData; public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData) { _messageNumber = messageNumber; _metaData = metaData; - _content = ByteBuffer.allocate(metaData.getContentSize()); - } public long getMessageNumber() @@ -45,6 +43,16 @@ public class StoredMemoryMessage implements StoredMessage public void addContent(int offsetInMessage, ByteBuffer src) { src = src.duplicate(); + if(_content == null || offsetInMessage + src.remaining() > _content.capacity()) + { + ByteBuffer newContent = ByteBuffer.allocate(offsetInMessage+src.remaining()); + if(_content != null) + { + newContent.duplicate().put(_content.array()); + } + _content = newContent; + } + ByteBuffer dst = _content.duplicate(); dst.position(offsetInMessage); dst.put(src); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 0a3576ff42..5c4e413989 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -55,16 +55,12 @@ public interface Subscription void setNoLocal(boolean noLocal); - AMQShortString getConsumerTag(); - long getSubscriptionID(); boolean isSuspended(); boolean hasInterest(QueueEntry msg); - boolean isAutoClose(); - boolean isClosed(); boolean acquires(); @@ -99,11 +95,11 @@ public interface Subscription boolean isActive(); - void confirmAutoClose(); - public void set(String key, Object value); public Object get(String key); boolean isSessionTransactional(); + + void queueEmpty() throws AMQException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index d8f44c9f7f..08867f86ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -341,7 +341,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { return getQueue().getConfigStore(); } - + public Long getDelivered() { return _deliveredCount.get(); @@ -777,9 +777,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { return _channel.isTransactional(); } - + public long getCreateTime() { return _createTime; } + + public void queueEmpty() throws AMQException + { + if (isAutoClose()) + { + _queue.unregisterSubscription(this); + + confirmAutoClose(); + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index b36ac84cdd..7884565073 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -103,7 +103,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void stateChange(Subscription sub, State oldState, State newState) { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); + CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); } }; private AMQQueue _queue; @@ -189,12 +189,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, filterLogString.length() > 0)); } - - } - public AMQShortString getConsumerTag() - { - return new AMQShortString(_destination); } public boolean isSuspended() @@ -234,12 +229,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return (_filters == null) || _filters.allAllow(entry); } - public boolean isAutoClose() - { - // no such thing in 0-10 - return false; - } - public boolean isClosed() { return getState() == State.CLOSED; @@ -292,7 +281,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { return getQueue().getConfigStore(); } - + public Long getDelivered() { return _deliveredCount.get(); @@ -708,11 +697,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return getState() == State.ACTIVE; } - public void confirmAutoClose() - { - //No such thing in 0-10 - } - public void set(String key, Object value) { _properties.put(key, value); @@ -901,6 +885,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _session.isTransactional(); } + public void queueEmpty() + { + } + public long getCreateTime() { return _createTime; @@ -908,7 +896,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public String toLogString() { - String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), + String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), _queue.getNameShortString()); String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "(" // queueString is "vh(/{0})/qu({1}) " so need to trim diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 4ed0507228..bf7cf831c1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfig; import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.protocol.v1_0.LinkRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeFactory; @@ -99,4 +100,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo ConfigStore getConfigStore(); void removeBrokerConnection(BrokerLink brokerLink); + + LinkRegistry getLinkRegistry(String remoteContainerId); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java index 7dc491de4d..0f812c9c7a 100755 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java @@ -34,7 +34,7 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> { private long _messageId; private MessageMetaData _metaData; - private final ByteBuffer _content; + private ByteBuffer _content; public MockStoredMessage(long messageId) @@ -46,8 +46,6 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> { _messageId = messageId; _metaData = new MessageMetaData(info, chb, 0); - _content = ByteBuffer.allocate(_metaData.getContentSize()); - } public MessageMetaData getMetaData() @@ -63,6 +61,16 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> public void addContent(int offsetInMessage, ByteBuffer src) { src = src.duplicate(); + if(_content == null || offsetInMessage + src.remaining() > _content.capacity()) + { + ByteBuffer newContent = ByteBuffer.allocate(offsetInMessage+src.remaining()); + if(_content != null) + { + newContent.duplicate().put(_content.array()); + } + _content = newContent; + } + ByteBuffer dst = _content.duplicate(); dst.position(offsetInMessage); dst.put(src); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 1ec134e90e..1c318aa457 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -129,11 +129,6 @@ public class MockSubscription implements Subscription return true; } - public void confirmAutoClose() - { - - } - public void set(String key, Object value) { } @@ -143,11 +138,6 @@ public class MockSubscription implements Subscription return null; } - public boolean isAutoClose() - { - return false; - } - public boolean isBrowser() { return false; @@ -214,7 +204,7 @@ public class MockSubscription implements Subscription } public void setNoLocal(boolean noLocal) - { + { } public void setStateListener(StateListener listener) @@ -241,4 +231,9 @@ public class MockSubscription implements Subscription { return false; } + + public void queueEmpty() throws AMQException + { + //TODO + } } |