diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-03-17 16:44:47 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-03-17 16:44:47 +0000 |
commit | ba09630a4258cded77842e1bd5d746b8fbda0cfe (patch) | |
tree | da5fd4e29ce839185c6759a1a141fb2d65f0250c | |
parent | 1533a95469482a820d3f883c44e7e92fa02c5eb3 (diff) | |
download | qpid-python-ba09630a4258cded77842e1bd5d746b8fbda0cfe.tar.gz |
QPID-4000 : [Java Broker] Add conversion of 0-x messages to 1-0 subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1457482 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 776 insertions, 216 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index 6a0e4d216e..439d7aa928 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -65,7 +65,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> { this(handle, null); } - + public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef) { super(handle); @@ -93,7 +93,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> return getStoredMessage().getMetaData(); } - public ContentHeaderBody getContentHeaderBody() throws AMQException + public ContentHeaderBody getContentHeaderBody() { return getMessageMetaData().getContentHeaderBody(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java index e01f20d54f..1d8b239733 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.Symbol; @@ -59,6 +60,22 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData private MessageHeader_1_0 _messageHeader; + public MessageMetaData_1_0(List<Section> sections, SectionEncoder encoder) + { + this(sections, encodeSections(sections, encoder)); + } + + private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder) + { + ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size()); + for(Section section : sections) + { + encoder.encodeObject(section); + encodedSections.add(encoder.getEncoding().asByteBuffer()); + encoder.reset(); + } + return encodedSections; + } public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 8a3d3716c7..8bde913149 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.io.EOFException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,25 +38,46 @@ import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.DeliveryState; import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.Modified; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesFormatException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData_1_0; +import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.codec.BBDecoder; class Subscription_1_0 implements Subscription { @@ -201,150 +225,489 @@ class Subscription_1_0 implements Subscription public void send(final QueueEntry queueEntry) throws AMQException { - //TODO ServerMessage serverMessage = queueEntry.getMessage(); + Message_1_0 message; if(serverMessage instanceof Message_1_0) { - Message_1_0 message = (Message_1_0) serverMessage; - Transfer transfer = new Transfer(); - //TODO + message = (Message_1_0) serverMessage; + } + else + { + if(serverMessage instanceof AMQMessage) + { + message = new Message_1_0(convert08Message((AMQMessage)serverMessage)); + } + else if(serverMessage instanceof MessageTransferMessage) + { + message = new Message_1_0(convert010Message((MessageTransferMessage)serverMessage)); + } + else + { + return; + } + } + + Transfer transfer = new Transfer(); + //TODO - List<ByteBuffer> fragments = message.getFragments(); - ByteBuffer payload; - if(fragments.size() == 1) + List<ByteBuffer> fragments = message.getFragments(); + ByteBuffer payload; + if(fragments.size() == 1) + { + payload = fragments.get(0); + } + else + { + int size = 0; + for(ByteBuffer fragment : fragments) { - payload = fragments.get(0); + size += fragment.remaining(); } - else + + payload = ByteBuffer.allocate(size); + + for(ByteBuffer fragment : fragments) { - int size = 0; - for(ByteBuffer fragment : fragments) - { - size += fragment.remaining(); - } + payload.put(fragment.duplicate()); + } + + payload.flip(); + } - payload = ByteBuffer.allocate(size); + if(queueEntry.getDeliveryCount() != 0) + { + payload = payload.duplicate(); + ValueHandler valueHandler = new ValueHandler(_typeRegistry); - for(ByteBuffer fragment : fragments) + Header oldHeader = null; + try + { + ByteBuffer encodedBuf = payload.duplicate(); + Object value = valueHandler.parse(payload); + if(value instanceof Header) { - payload.put(fragment.duplicate()); + oldHeader = (Header) value; + } + else + { + payload.position(0); } - - payload.flip(); + } + catch (AmqpErrorException e) + { + //TODO + throw new RuntimeException(e); } - if(queueEntry.getDeliveryCount() != 0) + Header header = new Header(); + if(oldHeader != null) { - payload = payload.duplicate(); - ValueHandler valueHandler = new ValueHandler(_typeRegistry); + header.setDurable(oldHeader.getDurable()); + header.setPriority(oldHeader.getPriority()); + header.setTtl(oldHeader.getTtl()); + } + header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); + _sectionEncoder.reset(); + _sectionEncoder.encodeObject(header); + Binary encodedHeader = _sectionEncoder.getEncoding(); + + ByteBuffer oldPayload = payload; + payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength()); + payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); + payload.put(oldPayload); + payload.flip(); + } - Header oldHeader = null; - try + transfer.setPayload(payload); + byte[] data = new byte[8]; + ByteBuffer.wrap(data).putLong(_deliveryTag++); + final Binary tag = new Binary(data); + + transfer.setDeliveryTag(tag); + + synchronized(_link.getLock()) + { + if(_link.isAttached()) + { + if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) { - ByteBuffer encodedBuf = payload.duplicate(); - Object value = valueHandler.parse(payload); - if(value instanceof Header) - { - oldHeader = (Header) value; - } - else - { - payload.position(0); - } + transfer.setSettled(true); } - catch (AmqpErrorException e) + else { - //TODO - throw new RuntimeException(e); + UnsettledAction action = _acquires + ? new DispositionAction(tag, queueEntry) + : new DoNothingAction(tag, queueEntry); + + _link.addUnsettled(tag, action, queueEntry); } - Header header = new Header(); - if(oldHeader != null) + if(_transactionId != null) { - header.setDurable(oldHeader.getDurable()); - header.setPriority(oldHeader.getPriority()); - header.setTtl(oldHeader.getTtl()); + TransactionalState state = new TransactionalState(); + state.setTxnId(_transactionId); + transfer.setState(state); } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); - _sectionEncoder.reset(); - _sectionEncoder.encodeObject(header); - Binary encodedHeader = _sectionEncoder.getEncoding(); + // TODO - need to deal with failure here + if(_acquires && _transactionId != null) + { + ServerTransaction txn = _link.getTransaction(_transactionId); + if(txn != null) + { + txn.addPostTransactionAction(new ServerTransaction.Action(){ + + public void postCommit() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void onRollback() + { + if(queueEntry.isAcquiredBy(Subscription_1_0.this)) + { + queueEntry.release(); + _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); + + + } + } + }); + } - ByteBuffer oldPayload = payload; - payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength()); - payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); - payload.put(oldPayload); - payload.flip(); + } + getSession().getConnectionModel().registerMessageDelivered(message.getSize()); + getEndpoint().transfer(transfer); } + else + { + queueEntry.release(); + } + } - transfer.setPayload(payload); - byte[] data = new byte[8]; - ByteBuffer.wrap(data).putLong(_deliveryTag++); - final Binary tag = new Binary(data); + } - transfer.setDeliveryTag(tag); + private StoredMessage<MessageMetaData_1_0> convert010Message(final MessageTransferMessage serverMessage) + { + final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); + + return convertServerMessage(metaData, serverMessage); + + } - synchronized(_link.getLock()) + private MessageMetaData_1_0 convertMetaData(final MessageTransferMessage serverMessage) + { + List<Section> sections = new ArrayList<Section>(3); + final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties(); + final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties(); + + Header header = new Header(); + if(deliveryProps != null) + { + header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); + if(deliveryProps.hasPriority()) { - if(_link.isAttached()) + header.setPriority(UnsignedByte.valueOf((byte)deliveryProps.getPriority().getValue())); + } + if(deliveryProps.hasTtl()) + { + header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl())); + } + sections.add(header); + } + + Properties props = new Properties(); + if(msgProps != null) + { + // props.setAbsoluteExpiryTime(); + if(msgProps.hasContentEncoding()) + { + props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding())); + } + + if(msgProps.hasCorrelationId()) + { + props.setCorrelationId(msgProps.getCorrelationId()); + } + // props.setCreationTime(); + // props.setGroupId(); + // props.setGroupSequence(); + if(msgProps.hasMessageId()) + { + props.setMessageId(msgProps.getMessageId()); + } + if(msgProps.hasReplyTo()) + { + props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey()); + } + if(msgProps.hasContentType()) + { + props.setContentType(Symbol.valueOf(msgProps.getContentType())); + + // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client + if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) { - if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) - { - transfer.setSettled(true); - } - else - { - UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); + props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + } + } + // props.setReplyToGroupId(); + props.setSubject(serverMessage.getRoutingKey()); + // props.setTo(); + if(msgProps.hasUserId()) + { + props.setUserId(new Binary(msgProps.getUserId())); + } - _link.addUnsettled(tag, action, queueEntry); - } + sections.add(props); - if(_transactionId != null) - { - TransactionalState state = new TransactionalState(); - state.setTxnId(_transactionId); - transfer.setState(state); - } - // TODO - need to deal with failure here - if(_acquires && _transactionId != null) - { - ServerTransaction txn = _link.getTransaction(_transactionId); - if(txn != null) - { - txn.addPostTransactionAction(new ServerTransaction.Action(){ + if(msgProps.getApplicationHeaders() != null) + { + sections.add(new ApplicationProperties(msgProps.getApplicationHeaders())); + } + } + return new MessageMetaData_1_0(sections, _sectionEncoder); + } - public void postCommit() - { - //To change body of implemented methods use File | Settings | File Templates. - } + private StoredMessage<MessageMetaData_1_0> convert08Message(final AMQMessage serverMessage) + { + final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); - public void onRollback() - { - if(queueEntry.isAcquiredBy(Subscription_1_0.this)) - { - queueEntry.release(); - _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); + return convertServerMessage(metaData, serverMessage); - } - } - }); - } + } - } + private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData, + final ServerMessage serverMessage) + { + final String mimeType = serverMessage.getMessageHeader().getMimeType(); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + + Section bodySection = convertMessageBody(mimeType, data); - getEndpoint().transfer(transfer); + final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection); + + return new StoredMessage<MessageMetaData_1_0>() + { + @Override + public MessageMetaData_1_0 getMetaData() + { + return metaData; + } + + @Override + public long getMessageNumber() + { + return serverMessage.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + int size; + if(dst.remaining()<buf.remaining()) + { + buf.limit(dst.remaining()); + size = dst.remaining(); } else { - queueEntry.release(); + size = buf.remaining(); + } + dst.put(buf); + return size; + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + if(size < buf.remaining()) + { + buf.limit(size); + } + return buf; + } + + @Override + public StoreFuture flushToStore() + { + throw new UnsupportedOperationException(); + } + + @Override + public void remove() + { + serverMessage.getStoredMessage().remove(); + } + }; + } + + private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection) + { + int headerSize = (int) metaData.getStorableSize(); + + _sectionEncoder.reset(); + _sectionEncoder.encodeObject(bodySection); + Binary dataEncoding = _sectionEncoder.getEncoding(); + + final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength()); + metaData.writeToBuffer(0,allData); + allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength()); + return allData; + } + + private static Section convertMessageBody(String mimeType, byte[] data) + { + if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) + { + String text = new String(data); + return new AmqpValue(text); + } + else if("jms/map-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + LinkedHashMap map = new LinkedHashMap(); + final int entries = reader.readIntImpl(); + for (int i = 0; i < entries; i++) + { + try + { + String propName = reader.readStringImpl(); + Object value = reader.readObject(); + map.put(propName, value); + } + catch (EOFException e) + { + throw new IllegalArgumentException(e); + } + catch (TypedBytesFormatException e) + { + throw new IllegalArgumentException(e); } + } + + return new AmqpValue(map); + + } + else if("amqp/map".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return new AmqpValue(decoder.readMap()); + + } + else if("amqp/list".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return new AmqpValue(decoder.readList()); } + else if("jms/stream-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + List list = new ArrayList(); + while (reader.remaining() != 0) + { + try + { + list.add(reader.readObject()); + } + catch (TypedBytesFormatException e) + { + throw new RuntimeException(e); // TODO - Implement + } + catch (EOFException e) + { + throw new RuntimeException(e); // TODO - Implement + } + } + return new AmqpValue(list); + } + else + { + return new Data(new Binary(data)); + + } + } + + private MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage) + { + + List<Section> sections = new ArrayList<Section>(3); + + Header header = new Header(); + + header.setDurable(serverMessage.isPersistent()); + + BasicContentHeaderProperties contentHeader = + (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties(); + + header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority())); + final long expiration = serverMessage.getExpiration(); + final long arrivalTime = serverMessage.getArrivalTime(); + + if(expiration > arrivalTime) + { + header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime)); + } + sections.add(header); + + + Properties props = new Properties(); + + props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString())); + + props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString())); + + // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client + if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) + { + props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + } + + final AMQShortString correlationId = contentHeader.getCorrelationId(); + if(correlationId != null) + { + props.setCorrelationId(new Binary(correlationId.getBytes())); + } + // props.setCreationTime(); + // props.setGroupId(); + // props.setGroupSequence(); + final AMQShortString messageId = contentHeader.getMessageId(); + if(messageId != null) + { + props.setMessageId(new Binary(messageId.getBytes())); + } + props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); + + // props.setReplyToGroupId(); + props.setSubject(serverMessage.getRoutingKey()); + // props.setTo(); + if(contentHeader.getUserId() != null) + { + props.setUserId(new Binary(contentHeader.getUserId().getBytes())); + } + sections.add(props); + + sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders()))); + return new MessageMetaData_1_0(sections, _sectionEncoder); } public void queueDeleted(final AMQQueue queue) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index b0320d0f4e..6ffa051ff8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -20,13 +20,16 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQException; - +import java.io.EOFException; +import java.nio.ByteBuffer; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; -import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage { @@ -100,7 +103,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM private void checkAvailable(final int i) throws MessageEOFException { - _typedBytesContentReader.checkAvailable(1); + try + { + _typedBytesContentReader.checkAvailable(1); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } } public byte readByte() throws JMSException @@ -178,7 +188,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM // we check only for one byte since theoretically the string could be only a // single byte when using UTF-8 encoding - return _typedBytesContentReader.readLengthPrefixedUTF(); + try + { + return _typedBytesContentReader.readLengthPrefixedUTF(); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readBytes(byte[] bytes) throws JMSException @@ -275,7 +292,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM public void writeUTF(String string) throws JMSException { checkWritable(); - _typedBytesContentWriter.writeLengthPrefixedUTF(string); + try + { + _typedBytesContentWriter.writeLengthPrefixedUTF(string); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public void writeBytes(byte[] bytes) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index e18ed80f6d..0b05179215 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -20,18 +20,21 @@ */ package org.apache.qpid.client.message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.AMQException; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; +import java.io.EOFException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage { @@ -455,9 +458,22 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe final int entries = reader.readIntImpl(); for (int i = 0; i < entries; i++) { - String propName = reader.readStringImpl(); - Object value = reader.readObject(); - _map.put(propName, value); + String propName = null; + try + { + propName = reader.readStringImpl(); + Object value = reader.readObject(); + _map.put(propName, value); + + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } } } else @@ -477,7 +493,14 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe { writer.writeNullTerminatedStringImpl(entry.getKey()); - writer.writeObject(entry.getValue()); + try + { + writer.writeObject(entry.getValue()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } return writer.getData(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index b1af262580..223facbb59 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -20,11 +20,16 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQException; - +import java.io.EOFException; +import java.nio.ByteBuffer; import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; import javax.jms.StreamMessage; -import java.nio.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; /** * @author Apache Software Foundation @@ -95,20 +100,53 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public boolean readBoolean() throws JMSException { checkReadable(); - return _typedBytesContentReader.readBoolean(); + try + { + return _typedBytesContentReader.readBoolean(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public byte readByte() throws JMSException { checkReadable(); - return _typedBytesContentReader.readByte(); + try + { + return _typedBytesContentReader.readByte(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public short readShort() throws JMSException { checkReadable(); - return _typedBytesContentReader.readShort(); + try + { + return _typedBytesContentReader.readShort(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } /** @@ -120,37 +158,103 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public char readChar() throws JMSException { checkReadable(); - return _typedBytesContentReader.readChar(); + try + { + return _typedBytesContentReader.readChar(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readInt() throws JMSException { checkReadable(); - return _typedBytesContentReader.readInt(); + try + { + return _typedBytesContentReader.readInt(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public long readLong() throws JMSException { checkReadable(); - return _typedBytesContentReader.readLong(); + try + { + return _typedBytesContentReader.readLong(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public float readFloat() throws JMSException { checkReadable(); - return _typedBytesContentReader.readFloat(); + try + { + return _typedBytesContentReader.readFloat(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public double readDouble() throws JMSException { checkReadable(); - return _typedBytesContentReader.readDouble(); + try + { + return _typedBytesContentReader.readDouble(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public String readString() throws JMSException { checkReadable(); - return _typedBytesContentReader.readString(); + try + { + return _typedBytesContentReader.readString(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public int readBytes(byte[] bytes) throws JMSException @@ -161,14 +265,36 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } checkReadable(); - return _typedBytesContentReader.readBytes(bytes); + try + { + return _typedBytesContentReader.readBytes(bytes); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public Object readObject() throws JMSException { checkReadable(); - return _typedBytesContentReader.readObject(); + try + { + return _typedBytesContentReader.readObject(); + } + catch (EOFException e) + { + throw new MessageEOFException(e.getMessage()); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } public void writeBoolean(boolean b) throws JMSException @@ -240,6 +366,13 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public void writeObject(Object object) throws JMSException { checkWritable(); - _typedBytesContentWriter.writeObject(object); + try + { + _typedBytesContentWriter.writeObject(object); + } + catch (TypedBytesFormatException e) + { + throw new MessageFormatException(e.getMessage()); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java index 26a0b41cdc..0e12ac65d8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.client.message; +package org.apache.qpid.typedmessage; public interface TypedBytesCodes { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java index b00ac7e34b..0ba865f1e6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java @@ -18,19 +18,17 @@ * under the License. * */ -package org.apache.qpid.client.message; +package org.apache.qpid.typedmessage; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; + +import java.io.EOFException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; -class TypedBytesContentReader implements TypedBytesCodes +public class TypedBytesContentReader implements TypedBytesCodes { private final ByteBuffer _data; @@ -58,22 +56,21 @@ class TypedBytesContentReader implements TypedBytesCodes * @param len the number of bytes * @throws javax.jms.MessageEOFException if there are less than len bytes available to read */ - protected void checkAvailable(int len) throws MessageEOFException + public void checkAvailable(int len) throws EOFException { if (_data.remaining() < len) { - throw new MessageEOFException("Unable to read " + len + " bytes"); + throw new EOFException("Unable to read " + len + " bytes"); } } - protected byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException + public byte readWireType() throws TypedBytesFormatException, EOFException { checkAvailable(1); return _data.get(); } - protected boolean readBoolean() throws JMSException + public boolean readBoolean() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -92,7 +89,7 @@ class TypedBytesContentReader implements TypedBytesCodes break; default: _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a boolean"); } return result; } @@ -103,12 +100,12 @@ class TypedBytesContentReader implements TypedBytesCodes } } - boolean readBooleanImpl() + public boolean readBooleanImpl() { return _data.get() != 0; } - protected byte readByte() throws JMSException + public byte readByte() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -127,7 +124,7 @@ class TypedBytesContentReader implements TypedBytesCodes break; default: _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a byte"); } } catch (RuntimeException e) @@ -138,12 +135,12 @@ class TypedBytesContentReader implements TypedBytesCodes return result; } - byte readByteImpl() + public byte readByteImpl() { return _data.get(); } - protected short readShort() throws JMSException + public short readShort() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -166,7 +163,7 @@ class TypedBytesContentReader implements TypedBytesCodes break; default: _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a short"); } } catch (RuntimeException e) @@ -177,7 +174,7 @@ class TypedBytesContentReader implements TypedBytesCodes return result; } - short readShortImpl() + public short readShortImpl() { return _data.getShort(); } @@ -188,7 +185,7 @@ class TypedBytesContentReader implements TypedBytesCodes * @return the character read from the stream * @throws javax.jms.JMSException */ - protected char readChar() throws JMSException + public char readChar() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -202,7 +199,7 @@ class TypedBytesContentReader implements TypedBytesCodes if (wireType != CHAR_TYPE) { _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a char"); } else { @@ -217,12 +214,12 @@ class TypedBytesContentReader implements TypedBytesCodes } } - char readCharImpl() + public char readCharImpl() { return _data.getChar(); } - protected int readInt() throws JMSException + public int readInt() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -249,7 +246,7 @@ class TypedBytesContentReader implements TypedBytesCodes break; default: _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to an int"); } return result; } @@ -260,12 +257,12 @@ class TypedBytesContentReader implements TypedBytesCodes } } - protected int readIntImpl() + public int readIntImpl() { return _data.getInt(); } - protected long readLong() throws JMSException + public long readLong() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -296,7 +293,7 @@ class TypedBytesContentReader implements TypedBytesCodes break; default: _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a long"); } return result; } @@ -307,12 +304,12 @@ class TypedBytesContentReader implements TypedBytesCodes } } - long readLongImpl() + public long readLongImpl() { return _data.getLong(); } - protected float readFloat() throws JMSException + public float readFloat() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -331,7 +328,7 @@ class TypedBytesContentReader implements TypedBytesCodes break; default: _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a float"); } return result; } @@ -342,12 +339,12 @@ class TypedBytesContentReader implements TypedBytesCodes } } - float readFloatImpl() + public float readFloatImpl() { return _data.getFloat(); } - protected double readDouble() throws JMSException + public double readDouble() throws TypedBytesFormatException, EOFException { int position = _data.position(); byte wireType = readWireType(); @@ -370,7 +367,7 @@ class TypedBytesContentReader implements TypedBytesCodes break; default: _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a double"); } return result; } @@ -381,12 +378,12 @@ class TypedBytesContentReader implements TypedBytesCodes } } - double readDoubleImpl() + public double readDoubleImpl() { return _data.getDouble(); } - protected String readString() throws JMSException + public String readString() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -436,7 +433,7 @@ class TypedBytesContentReader implements TypedBytesCodes break; default: _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a String"); } return result; } @@ -447,7 +444,7 @@ class TypedBytesContentReader implements TypedBytesCodes } } - protected String readStringImpl() throws JMSException + public String readStringImpl() throws TypedBytesFormatException { try { @@ -462,14 +459,13 @@ class TypedBytesContentReader implements TypedBytesCodes } catch (CharacterCodingException e) { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - jmse.setLinkedException(e); + TypedBytesFormatException jmse = new TypedBytesFormatException("Error decoding byte stream as a UTF8 string: " + e); jmse.initCause(e); throw jmse; } } - protected int readBytes(byte[] bytes) throws JMSException + public int readBytes(byte[] bytes) throws EOFException, TypedBytesFormatException { if (bytes == null) { @@ -484,7 +480,7 @@ class TypedBytesContentReader implements TypedBytesCodes byte wireType = readWireType(); if (wireType != BYTEARRAY_TYPE) { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + throw new TypedBytesFormatException("Unable to convert " + wireType + " to a byte array"); } checkAvailable(4); int size = _data.getInt(); @@ -497,7 +493,7 @@ class TypedBytesContentReader implements TypedBytesCodes { if (size > _data.remaining()) { - throw new MessageEOFException("Byte array has stated length " + throw new EOFException("Byte array has stated length " + size + " but message only contains " + @@ -540,7 +536,7 @@ class TypedBytesContentReader implements TypedBytesCodes } } - protected Object readObject() throws JMSException + public Object readObject() throws EOFException, TypedBytesFormatException { int position = _data.position(); byte wireType = readWireType(); @@ -643,7 +639,7 @@ class TypedBytesContentReader implements TypedBytesCodes _data.get(bytes, offset, count); } - public String readLengthPrefixedUTF() throws JMSException + public String readLengthPrefixedUTF() throws TypedBytesFormatException { try { @@ -665,8 +661,7 @@ class TypedBytesContentReader implements TypedBytesCodes } catch(CharacterCodingException e) { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - jmse.setLinkedException(e); + TypedBytesFormatException jmse = new TypedBytesFormatException("Error decoding byte stream as a UTF8 string: " + e); jmse.initCause(e); throw jmse; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java index 7c91db3a32..c7ca2d7df7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java @@ -18,10 +18,8 @@ * under the License. * */ -package org.apache.qpid.client.message; +package org.apache.qpid.typedmessage; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -31,13 +29,13 @@ import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; -class TypedBytesContentWriter implements TypedBytesCodes +public class TypedBytesContentWriter implements TypedBytesCodes { private final ByteArrayOutputStream _baos = new ByteArrayOutputStream(); private final DataOutputStream _data = new DataOutputStream(_baos); private static final Charset UTF8 = Charset.forName("UTF-8"); - protected void writeTypeDiscriminator(byte type) throws JMSException + protected void writeTypeDiscriminator(byte type) { try { @@ -49,21 +47,20 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - private JMSException handle(final IOException e) + private RuntimeException handle(final IOException e) { - JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage()); - jmsEx.setLinkedException(e); + RuntimeException jmsEx = new RuntimeException("Unable to write value: " + e.getMessage()); return jmsEx; } - protected void writeBoolean(boolean b) throws JMSException + public void writeBoolean(boolean b) { writeTypeDiscriminator(BOOLEAN_TYPE); writeBooleanImpl(b); } - public void writeBooleanImpl(final boolean b) throws JMSException + public void writeBooleanImpl(final boolean b) { try { @@ -75,13 +72,13 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeByte(byte b) throws JMSException + public void writeByte(byte b) { writeTypeDiscriminator(BYTE_TYPE); writeByteImpl(b); } - public void writeByteImpl(final byte b) throws JMSException + public void writeByteImpl(final byte b) { try { @@ -93,13 +90,13 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeShort(short i) throws JMSException + public void writeShort(short i) { writeTypeDiscriminator(SHORT_TYPE); writeShortImpl(i); } - public void writeShortImpl(final short i) throws JMSException + public void writeShortImpl(final short i) { try { @@ -111,13 +108,13 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeChar(char c) throws JMSException + public void writeChar(char c) { writeTypeDiscriminator(CHAR_TYPE); writeCharImpl(c); } - public void writeCharImpl(final char c) throws JMSException + public void writeCharImpl(final char c) { try { @@ -129,13 +126,13 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeInt(int i) throws JMSException + public void writeInt(int i) { writeTypeDiscriminator(INT_TYPE); writeIntImpl(i); } - protected void writeIntImpl(int i) throws JMSException + public void writeIntImpl(int i) { try { @@ -147,13 +144,13 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeLong(long l) throws JMSException + public void writeLong(long l) { writeTypeDiscriminator(LONG_TYPE); writeLongImpl(l); } - public void writeLongImpl(final long l) throws JMSException + public void writeLongImpl(final long l) { try { @@ -165,13 +162,13 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeFloat(float v) throws JMSException + public void writeFloat(float v) { writeTypeDiscriminator(FLOAT_TYPE); writeFloatImpl(v); } - public void writeFloatImpl(final float v) throws JMSException + public void writeFloatImpl(final float v) { try { @@ -183,13 +180,13 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeDouble(double v) throws JMSException + public void writeDouble(double v) { writeTypeDiscriminator(DOUBLE_TYPE); writeDoubleImpl(v); } - public void writeDoubleImpl(final double v) throws JMSException + public void writeDoubleImpl(final double v) { try { @@ -201,7 +198,7 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeString(String string) throws JMSException + public void writeString(String string) { if (string == null) { @@ -214,8 +211,8 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - protected void writeNullTerminatedStringImpl(String string) - throws JMSException + public void writeNullTerminatedStringImpl(String string) + { try { @@ -229,18 +226,18 @@ class TypedBytesContentWriter implements TypedBytesCodes } - protected void writeBytes(byte[] bytes) throws JMSException + public void writeBytes(byte[] bytes) { writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); } - protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + public void writeBytes(byte[] bytes, int offset, int length) { writeTypeDiscriminator(BYTEARRAY_TYPE); writeBytesImpl(bytes, offset, length); } - public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException + public void writeBytesImpl(final byte[] bytes, final int offset, final int length) { try { @@ -260,7 +257,7 @@ class TypedBytesContentWriter implements TypedBytesCodes } } - public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException + public void writeBytesRaw(final byte[] bytes, final int offset, final int length) { try { @@ -276,7 +273,7 @@ class TypedBytesContentWriter implements TypedBytesCodes } - protected void writeObject(Object object) throws JMSException + public void writeObject(Object object) throws TypedBytesFormatException { Class clazz; @@ -332,7 +329,7 @@ class TypedBytesContentWriter implements TypedBytesCodes } else { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + throw new TypedBytesFormatException("Only primitives plus byte arrays and String are valid types"); } } @@ -341,7 +338,7 @@ class TypedBytesContentWriter implements TypedBytesCodes return ByteBuffer.wrap(_baos.toByteArray()); } - public void writeLengthPrefixedUTF(final String string) throws JMSException + public void writeLengthPrefixedUTF(final String string) throws TypedBytesFormatException { try { @@ -356,8 +353,7 @@ class TypedBytesContentWriter implements TypedBytesCodes } catch (CharacterCodingException e) { - JMSException jmse = new JMSException("Unable to encode string: " + e); - jmse.setLinkedException(e); + TypedBytesFormatException jmse = new TypedBytesFormatException("Unable to encode string: " + e); jmse.initCause(e); throw jmse; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java new file mode 100644 index 0000000000..95e7ea0acc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java @@ -0,0 +1,9 @@ +package org.apache.qpid.typedmessage; + +public class TypedBytesFormatException extends Exception +{ + public TypedBytesFormatException(String s) + { + super(s); + } +} |