summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java561
1 files changed, 462 insertions, 99 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index 8a3d3716c7..8bde913149 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.io.EOFException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,25 +38,46 @@ import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.DeliveryState;
import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.codec.BBDecoder;
class Subscription_1_0 implements Subscription
{
@@ -201,150 +225,489 @@ class Subscription_1_0 implements Subscription
public void send(final QueueEntry queueEntry) throws AMQException
{
- //TODO
ServerMessage serverMessage = queueEntry.getMessage();
+ Message_1_0 message;
if(serverMessage instanceof Message_1_0)
{
- Message_1_0 message = (Message_1_0) serverMessage;
- Transfer transfer = new Transfer();
- //TODO
+ message = (Message_1_0) serverMessage;
+ }
+ else
+ {
+ if(serverMessage instanceof AMQMessage)
+ {
+ message = new Message_1_0(convert08Message((AMQMessage)serverMessage));
+ }
+ else if(serverMessage instanceof MessageTransferMessage)
+ {
+ message = new Message_1_0(convert010Message((MessageTransferMessage)serverMessage));
+ }
+ else
+ {
+ return;
+ }
+ }
+
+ Transfer transfer = new Transfer();
+ //TODO
- List<ByteBuffer> fragments = message.getFragments();
- ByteBuffer payload;
- if(fragments.size() == 1)
+ List<ByteBuffer> fragments = message.getFragments();
+ ByteBuffer payload;
+ if(fragments.size() == 1)
+ {
+ payload = fragments.get(0);
+ }
+ else
+ {
+ int size = 0;
+ for(ByteBuffer fragment : fragments)
{
- payload = fragments.get(0);
+ size += fragment.remaining();
}
- else
+
+ payload = ByteBuffer.allocate(size);
+
+ for(ByteBuffer fragment : fragments)
{
- int size = 0;
- for(ByteBuffer fragment : fragments)
- {
- size += fragment.remaining();
- }
+ payload.put(fragment.duplicate());
+ }
+
+ payload.flip();
+ }
- payload = ByteBuffer.allocate(size);
+ if(queueEntry.getDeliveryCount() != 0)
+ {
+ payload = payload.duplicate();
+ ValueHandler valueHandler = new ValueHandler(_typeRegistry);
- for(ByteBuffer fragment : fragments)
+ Header oldHeader = null;
+ try
+ {
+ ByteBuffer encodedBuf = payload.duplicate();
+ Object value = valueHandler.parse(payload);
+ if(value instanceof Header)
{
- payload.put(fragment.duplicate());
+ oldHeader = (Header) value;
+ }
+ else
+ {
+ payload.position(0);
}
-
- payload.flip();
+ }
+ catch (AmqpErrorException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
}
- if(queueEntry.getDeliveryCount() != 0)
+ Header header = new Header();
+ if(oldHeader != null)
{
- payload = payload.duplicate();
- ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+ header.setDurable(oldHeader.getDurable());
+ header.setPriority(oldHeader.getPriority());
+ header.setTtl(oldHeader.getTtl());
+ }
+ header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
+ _sectionEncoder.reset();
+ _sectionEncoder.encodeObject(header);
+ Binary encodedHeader = _sectionEncoder.getEncoding();
+
+ ByteBuffer oldPayload = payload;
+ payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength());
+ payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
+ payload.put(oldPayload);
+ payload.flip();
+ }
- Header oldHeader = null;
- try
+ transfer.setPayload(payload);
+ byte[] data = new byte[8];
+ ByteBuffer.wrap(data).putLong(_deliveryTag++);
+ final Binary tag = new Binary(data);
+
+ transfer.setDeliveryTag(tag);
+
+ synchronized(_link.getLock())
+ {
+ if(_link.isAttached())
+ {
+ if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
{
- ByteBuffer encodedBuf = payload.duplicate();
- Object value = valueHandler.parse(payload);
- if(value instanceof Header)
- {
- oldHeader = (Header) value;
- }
- else
- {
- payload.position(0);
- }
+ transfer.setSettled(true);
}
- catch (AmqpErrorException e)
+ else
{
- //TODO
- throw new RuntimeException(e);
+ UnsettledAction action = _acquires
+ ? new DispositionAction(tag, queueEntry)
+ : new DoNothingAction(tag, queueEntry);
+
+ _link.addUnsettled(tag, action, queueEntry);
}
- Header header = new Header();
- if(oldHeader != null)
+ if(_transactionId != null)
{
- header.setDurable(oldHeader.getDurable());
- header.setPriority(oldHeader.getPriority());
- header.setTtl(oldHeader.getTtl());
+ TransactionalState state = new TransactionalState();
+ state.setTxnId(_transactionId);
+ transfer.setState(state);
}
- header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
- _sectionEncoder.reset();
- _sectionEncoder.encodeObject(header);
- Binary encodedHeader = _sectionEncoder.getEncoding();
+ // TODO - need to deal with failure here
+ if(_acquires && _transactionId != null)
+ {
+ ServerTransaction txn = _link.getTransaction(_transactionId);
+ if(txn != null)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action(){
+
+ public void postCommit()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void onRollback()
+ {
+ if(queueEntry.isAcquiredBy(Subscription_1_0.this))
+ {
+ queueEntry.release();
+ _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+
+
+ }
+ }
+ });
+ }
- ByteBuffer oldPayload = payload;
- payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength());
- payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
- payload.put(oldPayload);
- payload.flip();
+ }
+ getSession().getConnectionModel().registerMessageDelivered(message.getSize());
+ getEndpoint().transfer(transfer);
}
+ else
+ {
+ queueEntry.release();
+ }
+ }
- transfer.setPayload(payload);
- byte[] data = new byte[8];
- ByteBuffer.wrap(data).putLong(_deliveryTag++);
- final Binary tag = new Binary(data);
+ }
- transfer.setDeliveryTag(tag);
+ private StoredMessage<MessageMetaData_1_0> convert010Message(final MessageTransferMessage serverMessage)
+ {
+ final MessageMetaData_1_0 metaData = convertMetaData(serverMessage);
+
+ return convertServerMessage(metaData, serverMessage);
+
+ }
- synchronized(_link.getLock())
+ private MessageMetaData_1_0 convertMetaData(final MessageTransferMessage serverMessage)
+ {
+ List<Section> sections = new ArrayList<Section>(3);
+ final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties();
+ final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties();
+
+ Header header = new Header();
+ if(deliveryProps != null)
+ {
+ header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
+ if(deliveryProps.hasPriority())
{
- if(_link.isAttached())
+ header.setPriority(UnsignedByte.valueOf((byte)deliveryProps.getPriority().getValue()));
+ }
+ if(deliveryProps.hasTtl())
+ {
+ header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl()));
+ }
+ sections.add(header);
+ }
+
+ Properties props = new Properties();
+ if(msgProps != null)
+ {
+ // props.setAbsoluteExpiryTime();
+ if(msgProps.hasContentEncoding())
+ {
+ props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding()));
+ }
+
+ if(msgProps.hasCorrelationId())
+ {
+ props.setCorrelationId(msgProps.getCorrelationId());
+ }
+ // props.setCreationTime();
+ // props.setGroupId();
+ // props.setGroupSequence();
+ if(msgProps.hasMessageId())
+ {
+ props.setMessageId(msgProps.getMessageId());
+ }
+ if(msgProps.hasReplyTo())
+ {
+ props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey());
+ }
+ if(msgProps.hasContentType())
+ {
+ props.setContentType(Symbol.valueOf(msgProps.getContentType()));
+
+ // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
+ if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
{
- if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
- {
- transfer.setSettled(true);
- }
- else
- {
- UnsettledAction action = _acquires
- ? new DispositionAction(tag, queueEntry)
- : new DoNothingAction(tag, queueEntry);
+ props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+ }
+ }
+ // props.setReplyToGroupId();
+ props.setSubject(serverMessage.getRoutingKey());
+ // props.setTo();
+ if(msgProps.hasUserId())
+ {
+ props.setUserId(new Binary(msgProps.getUserId()));
+ }
- _link.addUnsettled(tag, action, queueEntry);
- }
+ sections.add(props);
- if(_transactionId != null)
- {
- TransactionalState state = new TransactionalState();
- state.setTxnId(_transactionId);
- transfer.setState(state);
- }
- // TODO - need to deal with failure here
- if(_acquires && _transactionId != null)
- {
- ServerTransaction txn = _link.getTransaction(_transactionId);
- if(txn != null)
- {
- txn.addPostTransactionAction(new ServerTransaction.Action(){
+ if(msgProps.getApplicationHeaders() != null)
+ {
+ sections.add(new ApplicationProperties(msgProps.getApplicationHeaders()));
+ }
+ }
+ return new MessageMetaData_1_0(sections, _sectionEncoder);
+ }
- public void postCommit()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ private StoredMessage<MessageMetaData_1_0> convert08Message(final AMQMessage serverMessage)
+ {
+ final MessageMetaData_1_0 metaData = convertMetaData(serverMessage);
- public void onRollback()
- {
- if(queueEntry.isAcquiredBy(Subscription_1_0.this))
- {
- queueEntry.release();
- _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+ return convertServerMessage(metaData, serverMessage);
- }
- }
- });
- }
+ }
- }
+ private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
+ final ServerMessage serverMessage)
+ {
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+ Section bodySection = convertMessageBody(mimeType, data);
- getEndpoint().transfer(transfer);
+ final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection);
+
+ return new StoredMessage<MessageMetaData_1_0>()
+ {
+ @Override
+ public MessageMetaData_1_0 getMetaData()
+ {
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMessage.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ int size;
+ if(dst.remaining()<buf.remaining())
+ {
+ buf.limit(dst.remaining());
+ size = dst.remaining();
}
else
{
- queueEntry.release();
+ size = buf.remaining();
+ }
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ if(size < buf.remaining())
+ {
+ buf.limit(size);
+ }
+ return buf;
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remove()
+ {
+ serverMessage.getStoredMessage().remove();
+ }
+ };
+ }
+
+ private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection)
+ {
+ int headerSize = (int) metaData.getStorableSize();
+
+ _sectionEncoder.reset();
+ _sectionEncoder.encodeObject(bodySection);
+ Binary dataEncoding = _sectionEncoder.getEncoding();
+
+ final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
+ metaData.writeToBuffer(0,allData);
+ allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
+ return allData;
+ }
+
+ private static Section convertMessageBody(String mimeType, byte[] data)
+ {
+ if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+ {
+ String text = new String(data);
+ return new AmqpValue(text);
+ }
+ else if("jms/map-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ LinkedHashMap map = new LinkedHashMap();
+ final int entries = reader.readIntImpl();
+ for (int i = 0; i < entries; i++)
+ {
+ try
+ {
+ String propName = reader.readStringImpl();
+ Object value = reader.readObject();
+ map.put(propName, value);
+ }
+ catch (EOFException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new IllegalArgumentException(e);
}
+
}
+
+ return new AmqpValue(map);
+
+ }
+ else if("amqp/map".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return new AmqpValue(decoder.readMap());
+
+ }
+ else if("amqp/list".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return new AmqpValue(decoder.readList());
}
+ else if("jms/stream-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ List list = new ArrayList();
+ while (reader.remaining() != 0)
+ {
+ try
+ {
+ list.add(reader.readObject());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ catch (EOFException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ }
+ return new AmqpValue(list);
+ }
+ else
+ {
+ return new Data(new Binary(data));
+
+ }
+ }
+
+ private MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage)
+ {
+
+ List<Section> sections = new ArrayList<Section>(3);
+
+ Header header = new Header();
+
+ header.setDurable(serverMessage.isPersistent());
+
+ BasicContentHeaderProperties contentHeader =
+ (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+
+ header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
+ final long expiration = serverMessage.getExpiration();
+ final long arrivalTime = serverMessage.getArrivalTime();
+
+ if(expiration > arrivalTime)
+ {
+ header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
+ }
+ sections.add(header);
+
+
+ Properties props = new Properties();
+
+ props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
+
+ props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString()));
+
+ // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
+ if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
+ {
+ props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+ }
+
+ final AMQShortString correlationId = contentHeader.getCorrelationId();
+ if(correlationId != null)
+ {
+ props.setCorrelationId(new Binary(correlationId.getBytes()));
+ }
+ // props.setCreationTime();
+ // props.setGroupId();
+ // props.setGroupSequence();
+ final AMQShortString messageId = contentHeader.getMessageId();
+ if(messageId != null)
+ {
+ props.setMessageId(new Binary(messageId.getBytes()));
+ }
+ props.setReplyTo(String.valueOf(contentHeader.getReplyTo()));
+
+ // props.setReplyToGroupId();
+ props.setSubject(serverMessage.getRoutingKey());
+ // props.setTo();
+ if(contentHeader.getUserId() != null)
+ {
+ props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
+ }
+ sections.add(props);
+
+ sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders())));
+ return new MessageMetaData_1_0(sections, _sectionEncoder);
}
public void queueDeleted(final AMQQueue queue)