diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java | 561 |
1 files changed, 462 insertions, 99 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 8a3d3716c7..8bde913149 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.io.EOFException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,25 +38,46 @@ import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.DeliveryState; import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.Modified; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesFormatException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData_1_0; +import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.codec.BBDecoder; class Subscription_1_0 implements Subscription { @@ -201,150 +225,489 @@ class Subscription_1_0 implements Subscription public void send(final QueueEntry queueEntry) throws AMQException { - //TODO ServerMessage serverMessage = queueEntry.getMessage(); + Message_1_0 message; if(serverMessage instanceof Message_1_0) { - Message_1_0 message = (Message_1_0) serverMessage; - Transfer transfer = new Transfer(); - //TODO + message = (Message_1_0) serverMessage; + } + else + { + if(serverMessage instanceof AMQMessage) + { + message = new Message_1_0(convert08Message((AMQMessage)serverMessage)); + } + else if(serverMessage instanceof MessageTransferMessage) + { + message = new Message_1_0(convert010Message((MessageTransferMessage)serverMessage)); + } + else + { + return; + } + } + + Transfer transfer = new Transfer(); + //TODO - List<ByteBuffer> fragments = message.getFragments(); - ByteBuffer payload; - if(fragments.size() == 1) + List<ByteBuffer> fragments = message.getFragments(); + ByteBuffer payload; + if(fragments.size() == 1) + { + payload = fragments.get(0); + } + else + { + int size = 0; + for(ByteBuffer fragment : fragments) { - payload = fragments.get(0); + size += fragment.remaining(); } - else + + payload = ByteBuffer.allocate(size); + + for(ByteBuffer fragment : fragments) { - int size = 0; - for(ByteBuffer fragment : fragments) - { - size += fragment.remaining(); - } + payload.put(fragment.duplicate()); + } + + payload.flip(); + } - payload = ByteBuffer.allocate(size); + if(queueEntry.getDeliveryCount() != 0) + { + payload = payload.duplicate(); + ValueHandler valueHandler = new ValueHandler(_typeRegistry); - for(ByteBuffer fragment : fragments) + Header oldHeader = null; + try + { + ByteBuffer encodedBuf = payload.duplicate(); + Object value = valueHandler.parse(payload); + if(value instanceof Header) { - payload.put(fragment.duplicate()); + oldHeader = (Header) value; + } + else + { + payload.position(0); } - - payload.flip(); + } + catch (AmqpErrorException e) + { + //TODO + throw new RuntimeException(e); } - if(queueEntry.getDeliveryCount() != 0) + Header header = new Header(); + if(oldHeader != null) { - payload = payload.duplicate(); - ValueHandler valueHandler = new ValueHandler(_typeRegistry); + header.setDurable(oldHeader.getDurable()); + header.setPriority(oldHeader.getPriority()); + header.setTtl(oldHeader.getTtl()); + } + header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); + _sectionEncoder.reset(); + _sectionEncoder.encodeObject(header); + Binary encodedHeader = _sectionEncoder.getEncoding(); + + ByteBuffer oldPayload = payload; + payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength()); + payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); + payload.put(oldPayload); + payload.flip(); + } - Header oldHeader = null; - try + transfer.setPayload(payload); + byte[] data = new byte[8]; + ByteBuffer.wrap(data).putLong(_deliveryTag++); + final Binary tag = new Binary(data); + + transfer.setDeliveryTag(tag); + + synchronized(_link.getLock()) + { + if(_link.isAttached()) + { + if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) { - ByteBuffer encodedBuf = payload.duplicate(); - Object value = valueHandler.parse(payload); - if(value instanceof Header) - { - oldHeader = (Header) value; - } - else - { - payload.position(0); - } + transfer.setSettled(true); } - catch (AmqpErrorException e) + else { - //TODO - throw new RuntimeException(e); + UnsettledAction action = _acquires + ? new DispositionAction(tag, queueEntry) + : new DoNothingAction(tag, queueEntry); + + _link.addUnsettled(tag, action, queueEntry); } - Header header = new Header(); - if(oldHeader != null) + if(_transactionId != null) { - header.setDurable(oldHeader.getDurable()); - header.setPriority(oldHeader.getPriority()); - header.setTtl(oldHeader.getTtl()); + TransactionalState state = new TransactionalState(); + state.setTxnId(_transactionId); + transfer.setState(state); } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); - _sectionEncoder.reset(); - _sectionEncoder.encodeObject(header); - Binary encodedHeader = _sectionEncoder.getEncoding(); + // TODO - need to deal with failure here + if(_acquires && _transactionId != null) + { + ServerTransaction txn = _link.getTransaction(_transactionId); + if(txn != null) + { + txn.addPostTransactionAction(new ServerTransaction.Action(){ + + public void postCommit() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void onRollback() + { + if(queueEntry.isAcquiredBy(Subscription_1_0.this)) + { + queueEntry.release(); + _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); + + + } + } + }); + } - ByteBuffer oldPayload = payload; - payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength()); - payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); - payload.put(oldPayload); - payload.flip(); + } + getSession().getConnectionModel().registerMessageDelivered(message.getSize()); + getEndpoint().transfer(transfer); } + else + { + queueEntry.release(); + } + } - transfer.setPayload(payload); - byte[] data = new byte[8]; - ByteBuffer.wrap(data).putLong(_deliveryTag++); - final Binary tag = new Binary(data); + } - transfer.setDeliveryTag(tag); + private StoredMessage<MessageMetaData_1_0> convert010Message(final MessageTransferMessage serverMessage) + { + final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); + + return convertServerMessage(metaData, serverMessage); + + } - synchronized(_link.getLock()) + private MessageMetaData_1_0 convertMetaData(final MessageTransferMessage serverMessage) + { + List<Section> sections = new ArrayList<Section>(3); + final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties(); + final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties(); + + Header header = new Header(); + if(deliveryProps != null) + { + header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); + if(deliveryProps.hasPriority()) { - if(_link.isAttached()) + header.setPriority(UnsignedByte.valueOf((byte)deliveryProps.getPriority().getValue())); + } + if(deliveryProps.hasTtl()) + { + header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl())); + } + sections.add(header); + } + + Properties props = new Properties(); + if(msgProps != null) + { + // props.setAbsoluteExpiryTime(); + if(msgProps.hasContentEncoding()) + { + props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding())); + } + + if(msgProps.hasCorrelationId()) + { + props.setCorrelationId(msgProps.getCorrelationId()); + } + // props.setCreationTime(); + // props.setGroupId(); + // props.setGroupSequence(); + if(msgProps.hasMessageId()) + { + props.setMessageId(msgProps.getMessageId()); + } + if(msgProps.hasReplyTo()) + { + props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey()); + } + if(msgProps.hasContentType()) + { + props.setContentType(Symbol.valueOf(msgProps.getContentType())); + + // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client + if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) { - if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) - { - transfer.setSettled(true); - } - else - { - UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); + props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + } + } + // props.setReplyToGroupId(); + props.setSubject(serverMessage.getRoutingKey()); + // props.setTo(); + if(msgProps.hasUserId()) + { + props.setUserId(new Binary(msgProps.getUserId())); + } - _link.addUnsettled(tag, action, queueEntry); - } + sections.add(props); - if(_transactionId != null) - { - TransactionalState state = new TransactionalState(); - state.setTxnId(_transactionId); - transfer.setState(state); - } - // TODO - need to deal with failure here - if(_acquires && _transactionId != null) - { - ServerTransaction txn = _link.getTransaction(_transactionId); - if(txn != null) - { - txn.addPostTransactionAction(new ServerTransaction.Action(){ + if(msgProps.getApplicationHeaders() != null) + { + sections.add(new ApplicationProperties(msgProps.getApplicationHeaders())); + } + } + return new MessageMetaData_1_0(sections, _sectionEncoder); + } - public void postCommit() - { - //To change body of implemented methods use File | Settings | File Templates. - } + private StoredMessage<MessageMetaData_1_0> convert08Message(final AMQMessage serverMessage) + { + final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); - public void onRollback() - { - if(queueEntry.isAcquiredBy(Subscription_1_0.this)) - { - queueEntry.release(); - _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); + return convertServerMessage(metaData, serverMessage); - } - } - }); - } + } - } + private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData, + final ServerMessage serverMessage) + { + final String mimeType = serverMessage.getMessageHeader().getMimeType(); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + + Section bodySection = convertMessageBody(mimeType, data); - getEndpoint().transfer(transfer); + final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection); + + return new StoredMessage<MessageMetaData_1_0>() + { + @Override + public MessageMetaData_1_0 getMetaData() + { + return metaData; + } + + @Override + public long getMessageNumber() + { + return serverMessage.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + int size; + if(dst.remaining()<buf.remaining()) + { + buf.limit(dst.remaining()); + size = dst.remaining(); } else { - queueEntry.release(); + size = buf.remaining(); + } + dst.put(buf); + return size; + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + if(size < buf.remaining()) + { + buf.limit(size); + } + return buf; + } + + @Override + public StoreFuture flushToStore() + { + throw new UnsupportedOperationException(); + } + + @Override + public void remove() + { + serverMessage.getStoredMessage().remove(); + } + }; + } + + private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection) + { + int headerSize = (int) metaData.getStorableSize(); + + _sectionEncoder.reset(); + _sectionEncoder.encodeObject(bodySection); + Binary dataEncoding = _sectionEncoder.getEncoding(); + + final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength()); + metaData.writeToBuffer(0,allData); + allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength()); + return allData; + } + + private static Section convertMessageBody(String mimeType, byte[] data) + { + if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) + { + String text = new String(data); + return new AmqpValue(text); + } + else if("jms/map-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + LinkedHashMap map = new LinkedHashMap(); + final int entries = reader.readIntImpl(); + for (int i = 0; i < entries; i++) + { + try + { + String propName = reader.readStringImpl(); + Object value = reader.readObject(); + map.put(propName, value); + } + catch (EOFException e) + { + throw new IllegalArgumentException(e); + } + catch (TypedBytesFormatException e) + { + throw new IllegalArgumentException(e); } + } + + return new AmqpValue(map); + + } + else if("amqp/map".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return new AmqpValue(decoder.readMap()); + + } + else if("amqp/list".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return new AmqpValue(decoder.readList()); } + else if("jms/stream-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + List list = new ArrayList(); + while (reader.remaining() != 0) + { + try + { + list.add(reader.readObject()); + } + catch (TypedBytesFormatException e) + { + throw new RuntimeException(e); // TODO - Implement + } + catch (EOFException e) + { + throw new RuntimeException(e); // TODO - Implement + } + } + return new AmqpValue(list); + } + else + { + return new Data(new Binary(data)); + + } + } + + private MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage) + { + + List<Section> sections = new ArrayList<Section>(3); + + Header header = new Header(); + + header.setDurable(serverMessage.isPersistent()); + + BasicContentHeaderProperties contentHeader = + (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties(); + + header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority())); + final long expiration = serverMessage.getExpiration(); + final long arrivalTime = serverMessage.getArrivalTime(); + + if(expiration > arrivalTime) + { + header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime)); + } + sections.add(header); + + + Properties props = new Properties(); + + props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString())); + + props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString())); + + // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client + if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) + { + props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + } + + final AMQShortString correlationId = contentHeader.getCorrelationId(); + if(correlationId != null) + { + props.setCorrelationId(new Binary(correlationId.getBytes())); + } + // props.setCreationTime(); + // props.setGroupId(); + // props.setGroupSequence(); + final AMQShortString messageId = contentHeader.getMessageId(); + if(messageId != null) + { + props.setMessageId(new Binary(messageId.getBytes())); + } + props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); + + // props.setReplyToGroupId(); + props.setSubject(serverMessage.getRoutingKey()); + // props.setTo(); + if(contentHeader.getUserId() != null) + { + props.setUserId(new Binary(contentHeader.getUserId().getBytes())); + } + sections.add(props); + + sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders()))); + return new MessageMetaData_1_0(sections, _sectionEncoder); } public void queueDeleted(final AMQQueue queue) |