summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-03-17 16:44:47 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-03-17 16:44:47 +0000
commitba09630a4258cded77842e1bd5d746b8fbda0cfe (patch)
treeda5fd4e29ce839185c6759a1a141fb2d65f0250c
parent1533a95469482a820d3f883c44e7e92fa02c5eb3 (diff)
downloadqpid-python-ba09630a4258cded77842e1bd5d746b8fbda0cfe.tar.gz
QPID-4000 : [Java Broker] Add conversion of 0-x messages to 1-0 subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1457482 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java4
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java561
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java45
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java163
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java (renamed from qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java)2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java (renamed from qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java)87
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java (renamed from qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java)68
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java9
10 files changed, 776 insertions, 216 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index 6a0e4d216e..439d7aa928 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -65,7 +65,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
{
this(handle, null);
}
-
+
public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
{
super(handle);
@@ -93,7 +93,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
return getStoredMessage().getMetaData();
}
- public ContentHeaderBody getContentHeaderBody() throws AMQException
+ public ContentHeaderBody getContentHeaderBody()
{
return getMessageMetaData().getContentHeaderBody();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
index e01f20d54f..1d8b239733 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.Symbol;
@@ -59,6 +60,22 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
private MessageHeader_1_0 _messageHeader;
+ public MessageMetaData_1_0(List<Section> sections, SectionEncoder encoder)
+ {
+ this(sections, encodeSections(sections, encoder));
+ }
+
+ private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
+ {
+ ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size());
+ for(Section section : sections)
+ {
+ encoder.encodeObject(section);
+ encodedSections.add(encoder.getEncoding().asByteBuffer());
+ encoder.reset();
+ }
+ return encodedSections;
+ }
public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index 8a3d3716c7..8bde913149 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.io.EOFException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,25 +38,46 @@ import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.DeliveryState;
import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.codec.BBDecoder;
class Subscription_1_0 implements Subscription
{
@@ -201,150 +225,489 @@ class Subscription_1_0 implements Subscription
public void send(final QueueEntry queueEntry) throws AMQException
{
- //TODO
ServerMessage serverMessage = queueEntry.getMessage();
+ Message_1_0 message;
if(serverMessage instanceof Message_1_0)
{
- Message_1_0 message = (Message_1_0) serverMessage;
- Transfer transfer = new Transfer();
- //TODO
+ message = (Message_1_0) serverMessage;
+ }
+ else
+ {
+ if(serverMessage instanceof AMQMessage)
+ {
+ message = new Message_1_0(convert08Message((AMQMessage)serverMessage));
+ }
+ else if(serverMessage instanceof MessageTransferMessage)
+ {
+ message = new Message_1_0(convert010Message((MessageTransferMessage)serverMessage));
+ }
+ else
+ {
+ return;
+ }
+ }
+
+ Transfer transfer = new Transfer();
+ //TODO
- List<ByteBuffer> fragments = message.getFragments();
- ByteBuffer payload;
- if(fragments.size() == 1)
+ List<ByteBuffer> fragments = message.getFragments();
+ ByteBuffer payload;
+ if(fragments.size() == 1)
+ {
+ payload = fragments.get(0);
+ }
+ else
+ {
+ int size = 0;
+ for(ByteBuffer fragment : fragments)
{
- payload = fragments.get(0);
+ size += fragment.remaining();
}
- else
+
+ payload = ByteBuffer.allocate(size);
+
+ for(ByteBuffer fragment : fragments)
{
- int size = 0;
- for(ByteBuffer fragment : fragments)
- {
- size += fragment.remaining();
- }
+ payload.put(fragment.duplicate());
+ }
+
+ payload.flip();
+ }
- payload = ByteBuffer.allocate(size);
+ if(queueEntry.getDeliveryCount() != 0)
+ {
+ payload = payload.duplicate();
+ ValueHandler valueHandler = new ValueHandler(_typeRegistry);
- for(ByteBuffer fragment : fragments)
+ Header oldHeader = null;
+ try
+ {
+ ByteBuffer encodedBuf = payload.duplicate();
+ Object value = valueHandler.parse(payload);
+ if(value instanceof Header)
{
- payload.put(fragment.duplicate());
+ oldHeader = (Header) value;
+ }
+ else
+ {
+ payload.position(0);
}
-
- payload.flip();
+ }
+ catch (AmqpErrorException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
}
- if(queueEntry.getDeliveryCount() != 0)
+ Header header = new Header();
+ if(oldHeader != null)
{
- payload = payload.duplicate();
- ValueHandler valueHandler = new ValueHandler(_typeRegistry);
+ header.setDurable(oldHeader.getDurable());
+ header.setPriority(oldHeader.getPriority());
+ header.setTtl(oldHeader.getTtl());
+ }
+ header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
+ _sectionEncoder.reset();
+ _sectionEncoder.encodeObject(header);
+ Binary encodedHeader = _sectionEncoder.getEncoding();
+
+ ByteBuffer oldPayload = payload;
+ payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength());
+ payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
+ payload.put(oldPayload);
+ payload.flip();
+ }
- Header oldHeader = null;
- try
+ transfer.setPayload(payload);
+ byte[] data = new byte[8];
+ ByteBuffer.wrap(data).putLong(_deliveryTag++);
+ final Binary tag = new Binary(data);
+
+ transfer.setDeliveryTag(tag);
+
+ synchronized(_link.getLock())
+ {
+ if(_link.isAttached())
+ {
+ if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
{
- ByteBuffer encodedBuf = payload.duplicate();
- Object value = valueHandler.parse(payload);
- if(value instanceof Header)
- {
- oldHeader = (Header) value;
- }
- else
- {
- payload.position(0);
- }
+ transfer.setSettled(true);
}
- catch (AmqpErrorException e)
+ else
{
- //TODO
- throw new RuntimeException(e);
+ UnsettledAction action = _acquires
+ ? new DispositionAction(tag, queueEntry)
+ : new DoNothingAction(tag, queueEntry);
+
+ _link.addUnsettled(tag, action, queueEntry);
}
- Header header = new Header();
- if(oldHeader != null)
+ if(_transactionId != null)
{
- header.setDurable(oldHeader.getDurable());
- header.setPriority(oldHeader.getPriority());
- header.setTtl(oldHeader.getTtl());
+ TransactionalState state = new TransactionalState();
+ state.setTxnId(_transactionId);
+ transfer.setState(state);
}
- header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
- _sectionEncoder.reset();
- _sectionEncoder.encodeObject(header);
- Binary encodedHeader = _sectionEncoder.getEncoding();
+ // TODO - need to deal with failure here
+ if(_acquires && _transactionId != null)
+ {
+ ServerTransaction txn = _link.getTransaction(_transactionId);
+ if(txn != null)
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action(){
+
+ public void postCommit()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void onRollback()
+ {
+ if(queueEntry.isAcquiredBy(Subscription_1_0.this))
+ {
+ queueEntry.release();
+ _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+
+
+ }
+ }
+ });
+ }
- ByteBuffer oldPayload = payload;
- payload = ByteBuffer.allocate(oldPayload.remaining() + encodedHeader.getLength());
- payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
- payload.put(oldPayload);
- payload.flip();
+ }
+ getSession().getConnectionModel().registerMessageDelivered(message.getSize());
+ getEndpoint().transfer(transfer);
}
+ else
+ {
+ queueEntry.release();
+ }
+ }
- transfer.setPayload(payload);
- byte[] data = new byte[8];
- ByteBuffer.wrap(data).putLong(_deliveryTag++);
- final Binary tag = new Binary(data);
+ }
- transfer.setDeliveryTag(tag);
+ private StoredMessage<MessageMetaData_1_0> convert010Message(final MessageTransferMessage serverMessage)
+ {
+ final MessageMetaData_1_0 metaData = convertMetaData(serverMessage);
+
+ return convertServerMessage(metaData, serverMessage);
+
+ }
- synchronized(_link.getLock())
+ private MessageMetaData_1_0 convertMetaData(final MessageTransferMessage serverMessage)
+ {
+ List<Section> sections = new ArrayList<Section>(3);
+ final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties();
+ final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties();
+
+ Header header = new Header();
+ if(deliveryProps != null)
+ {
+ header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
+ if(deliveryProps.hasPriority())
{
- if(_link.isAttached())
+ header.setPriority(UnsignedByte.valueOf((byte)deliveryProps.getPriority().getValue()));
+ }
+ if(deliveryProps.hasTtl())
+ {
+ header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl()));
+ }
+ sections.add(header);
+ }
+
+ Properties props = new Properties();
+ if(msgProps != null)
+ {
+ // props.setAbsoluteExpiryTime();
+ if(msgProps.hasContentEncoding())
+ {
+ props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding()));
+ }
+
+ if(msgProps.hasCorrelationId())
+ {
+ props.setCorrelationId(msgProps.getCorrelationId());
+ }
+ // props.setCreationTime();
+ // props.setGroupId();
+ // props.setGroupSequence();
+ if(msgProps.hasMessageId())
+ {
+ props.setMessageId(msgProps.getMessageId());
+ }
+ if(msgProps.hasReplyTo())
+ {
+ props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey());
+ }
+ if(msgProps.hasContentType())
+ {
+ props.setContentType(Symbol.valueOf(msgProps.getContentType()));
+
+ // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
+ if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
{
- if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
- {
- transfer.setSettled(true);
- }
- else
- {
- UnsettledAction action = _acquires
- ? new DispositionAction(tag, queueEntry)
- : new DoNothingAction(tag, queueEntry);
+ props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+ }
+ }
+ // props.setReplyToGroupId();
+ props.setSubject(serverMessage.getRoutingKey());
+ // props.setTo();
+ if(msgProps.hasUserId())
+ {
+ props.setUserId(new Binary(msgProps.getUserId()));
+ }
- _link.addUnsettled(tag, action, queueEntry);
- }
+ sections.add(props);
- if(_transactionId != null)
- {
- TransactionalState state = new TransactionalState();
- state.setTxnId(_transactionId);
- transfer.setState(state);
- }
- // TODO - need to deal with failure here
- if(_acquires && _transactionId != null)
- {
- ServerTransaction txn = _link.getTransaction(_transactionId);
- if(txn != null)
- {
- txn.addPostTransactionAction(new ServerTransaction.Action(){
+ if(msgProps.getApplicationHeaders() != null)
+ {
+ sections.add(new ApplicationProperties(msgProps.getApplicationHeaders()));
+ }
+ }
+ return new MessageMetaData_1_0(sections, _sectionEncoder);
+ }
- public void postCommit()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ private StoredMessage<MessageMetaData_1_0> convert08Message(final AMQMessage serverMessage)
+ {
+ final MessageMetaData_1_0 metaData = convertMetaData(serverMessage);
- public void onRollback()
- {
- if(queueEntry.isAcquiredBy(Subscription_1_0.this))
- {
- queueEntry.release();
- _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+ return convertServerMessage(metaData, serverMessage);
- }
- }
- });
- }
+ }
- }
+ private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
+ final ServerMessage serverMessage)
+ {
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+ Section bodySection = convertMessageBody(mimeType, data);
- getEndpoint().transfer(transfer);
+ final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection);
+
+ return new StoredMessage<MessageMetaData_1_0>()
+ {
+ @Override
+ public MessageMetaData_1_0 getMetaData()
+ {
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMessage.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ int size;
+ if(dst.remaining()<buf.remaining())
+ {
+ buf.limit(dst.remaining());
+ size = dst.remaining();
}
else
{
- queueEntry.release();
+ size = buf.remaining();
+ }
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ if(size < buf.remaining())
+ {
+ buf.limit(size);
+ }
+ return buf;
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remove()
+ {
+ serverMessage.getStoredMessage().remove();
+ }
+ };
+ }
+
+ private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection)
+ {
+ int headerSize = (int) metaData.getStorableSize();
+
+ _sectionEncoder.reset();
+ _sectionEncoder.encodeObject(bodySection);
+ Binary dataEncoding = _sectionEncoder.getEncoding();
+
+ final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
+ metaData.writeToBuffer(0,allData);
+ allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
+ return allData;
+ }
+
+ private static Section convertMessageBody(String mimeType, byte[] data)
+ {
+ if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+ {
+ String text = new String(data);
+ return new AmqpValue(text);
+ }
+ else if("jms/map-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ LinkedHashMap map = new LinkedHashMap();
+ final int entries = reader.readIntImpl();
+ for (int i = 0; i < entries; i++)
+ {
+ try
+ {
+ String propName = reader.readStringImpl();
+ Object value = reader.readObject();
+ map.put(propName, value);
+ }
+ catch (EOFException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new IllegalArgumentException(e);
}
+
}
+
+ return new AmqpValue(map);
+
+ }
+ else if("amqp/map".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return new AmqpValue(decoder.readMap());
+
+ }
+ else if("amqp/list".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return new AmqpValue(decoder.readList());
}
+ else if("jms/stream-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ List list = new ArrayList();
+ while (reader.remaining() != 0)
+ {
+ try
+ {
+ list.add(reader.readObject());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ catch (EOFException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ }
+ return new AmqpValue(list);
+ }
+ else
+ {
+ return new Data(new Binary(data));
+
+ }
+ }
+
+ private MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage)
+ {
+
+ List<Section> sections = new ArrayList<Section>(3);
+
+ Header header = new Header();
+
+ header.setDurable(serverMessage.isPersistent());
+
+ BasicContentHeaderProperties contentHeader =
+ (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+
+ header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
+ final long expiration = serverMessage.getExpiration();
+ final long arrivalTime = serverMessage.getArrivalTime();
+
+ if(expiration > arrivalTime)
+ {
+ header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
+ }
+ sections.add(header);
+
+
+ Properties props = new Properties();
+
+ props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
+
+ props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString()));
+
+ // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
+ if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
+ {
+ props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+ }
+
+ final AMQShortString correlationId = contentHeader.getCorrelationId();
+ if(correlationId != null)
+ {
+ props.setCorrelationId(new Binary(correlationId.getBytes()));
+ }
+ // props.setCreationTime();
+ // props.setGroupId();
+ // props.setGroupSequence();
+ final AMQShortString messageId = contentHeader.getMessageId();
+ if(messageId != null)
+ {
+ props.setMessageId(new Binary(messageId.getBytes()));
+ }
+ props.setReplyTo(String.valueOf(contentHeader.getReplyTo()));
+
+ // props.setReplyToGroupId();
+ props.setSubject(serverMessage.getRoutingKey());
+ // props.setTo();
+ if(contentHeader.getUserId() != null)
+ {
+ props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
+ }
+ sections.add(props);
+
+ sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders())));
+ return new MessageMetaData_1_0(sections, _sectionEncoder);
}
public void queueDeleted(final AMQQueue queue)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index b0320d0f4e..6ffa051ff8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -20,13 +20,16 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.AMQException;
-
+import java.io.EOFException;
+import java.nio.ByteBuffer;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
-import java.nio.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage
{
@@ -100,7 +103,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM
private void checkAvailable(final int i) throws MessageEOFException
{
- _typedBytesContentReader.checkAvailable(1);
+ try
+ {
+ _typedBytesContentReader.checkAvailable(1);
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
}
public byte readByte() throws JMSException
@@ -178,7 +188,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM
// we check only for one byte since theoretically the string could be only a
// single byte when using UTF-8 encoding
- return _typedBytesContentReader.readLengthPrefixedUTF();
+ try
+ {
+ return _typedBytesContentReader.readLengthPrefixedUTF();
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public int readBytes(byte[] bytes) throws JMSException
@@ -275,7 +292,14 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM
public void writeUTF(String string) throws JMSException
{
checkWritable();
- _typedBytesContentWriter.writeLengthPrefixedUTF(string);
+ try
+ {
+ _typedBytesContentWriter.writeLengthPrefixedUTF(string);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public void writeBytes(byte[] bytes) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index e18ed80f6d..0b05179215 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -20,18 +20,21 @@
*/
package org.apache.qpid.client.message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQException;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
+import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage
{
@@ -455,9 +458,22 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe
final int entries = reader.readIntImpl();
for (int i = 0; i < entries; i++)
{
- String propName = reader.readStringImpl();
- Object value = reader.readObject();
- _map.put(propName, value);
+ String propName = null;
+ try
+ {
+ propName = reader.readStringImpl();
+ Object value = reader.readObject();
+ _map.put(propName, value);
+
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
}
}
else
@@ -477,7 +493,14 @@ public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMe
{
writer.writeNullTerminatedStringImpl(entry.getKey());
- writer.writeObject(entry.getValue());
+ try
+ {
+ writer.writeObject(entry.getValue());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
return writer.getData();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index b1af262580..223facbb59 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -20,11 +20,16 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.AMQException;
-
+import java.io.EOFException;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
import javax.jms.StreamMessage;
-import java.nio.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
/**
* @author Apache Software Foundation
@@ -95,20 +100,53 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
public boolean readBoolean() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readBoolean();
+ try
+ {
+ return _typedBytesContentReader.readBoolean();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public byte readByte() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readByte();
+ try
+ {
+ return _typedBytesContentReader.readByte();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public short readShort() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readShort();
+ try
+ {
+ return _typedBytesContentReader.readShort();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
/**
@@ -120,37 +158,103 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
public char readChar() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readChar();
+ try
+ {
+ return _typedBytesContentReader.readChar();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public int readInt() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readInt();
+ try
+ {
+ return _typedBytesContentReader.readInt();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public long readLong() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readLong();
+ try
+ {
+ return _typedBytesContentReader.readLong();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public float readFloat() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readFloat();
+ try
+ {
+ return _typedBytesContentReader.readFloat();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public double readDouble() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readDouble();
+ try
+ {
+ return _typedBytesContentReader.readDouble();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public String readString() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readString();
+ try
+ {
+ return _typedBytesContentReader.readString();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public int readBytes(byte[] bytes) throws JMSException
@@ -161,14 +265,36 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
}
checkReadable();
- return _typedBytesContentReader.readBytes(bytes);
+ try
+ {
+ return _typedBytesContentReader.readBytes(bytes);
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public Object readObject() throws JMSException
{
checkReadable();
- return _typedBytesContentReader.readObject();
+ try
+ {
+ return _typedBytesContentReader.readObject();
+ }
+ catch (EOFException e)
+ {
+ throw new MessageEOFException(e.getMessage());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
public void writeBoolean(boolean b) throws JMSException
@@ -240,6 +366,13 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
public void writeObject(Object object) throws JMSException
{
checkWritable();
- _typedBytesContentWriter.writeObject(object);
+ try
+ {
+ _typedBytesContentWriter.writeObject(object);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new MessageFormatException(e.getMessage());
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java
index 26a0b41cdc..0e12ac65d8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesCodes.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesCodes.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.message;
+package org.apache.qpid.typedmessage;
public interface TypedBytesCodes
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java
index b00ac7e34b..0ba865f1e6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentReader.java
@@ -18,19 +18,17 @@
* under the License.
*
*/
-package org.apache.qpid.client.message;
+package org.apache.qpid.typedmessage;
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotReadableException;
+
+import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
-class TypedBytesContentReader implements TypedBytesCodes
+public class TypedBytesContentReader implements TypedBytesCodes
{
private final ByteBuffer _data;
@@ -58,22 +56,21 @@ class TypedBytesContentReader implements TypedBytesCodes
* @param len the number of bytes
* @throws javax.jms.MessageEOFException if there are less than len bytes available to read
*/
- protected void checkAvailable(int len) throws MessageEOFException
+ public void checkAvailable(int len) throws EOFException
{
if (_data.remaining() < len)
{
- throw new MessageEOFException("Unable to read " + len + " bytes");
+ throw new EOFException("Unable to read " + len + " bytes");
}
}
- protected byte readWireType() throws MessageFormatException, MessageEOFException,
- MessageNotReadableException
+ public byte readWireType() throws TypedBytesFormatException, EOFException
{
checkAvailable(1);
return _data.get();
}
- protected boolean readBoolean() throws JMSException
+ public boolean readBoolean() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -92,7 +89,7 @@ class TypedBytesContentReader implements TypedBytesCodes
break;
default:
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a boolean");
}
return result;
}
@@ -103,12 +100,12 @@ class TypedBytesContentReader implements TypedBytesCodes
}
}
- boolean readBooleanImpl()
+ public boolean readBooleanImpl()
{
return _data.get() != 0;
}
- protected byte readByte() throws JMSException
+ public byte readByte() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -127,7 +124,7 @@ class TypedBytesContentReader implements TypedBytesCodes
break;
default:
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a byte");
}
}
catch (RuntimeException e)
@@ -138,12 +135,12 @@ class TypedBytesContentReader implements TypedBytesCodes
return result;
}
- byte readByteImpl()
+ public byte readByteImpl()
{
return _data.get();
}
- protected short readShort() throws JMSException
+ public short readShort() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -166,7 +163,7 @@ class TypedBytesContentReader implements TypedBytesCodes
break;
default:
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a short");
}
}
catch (RuntimeException e)
@@ -177,7 +174,7 @@ class TypedBytesContentReader implements TypedBytesCodes
return result;
}
- short readShortImpl()
+ public short readShortImpl()
{
return _data.getShort();
}
@@ -188,7 +185,7 @@ class TypedBytesContentReader implements TypedBytesCodes
* @return the character read from the stream
* @throws javax.jms.JMSException
*/
- protected char readChar() throws JMSException
+ public char readChar() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -202,7 +199,7 @@ class TypedBytesContentReader implements TypedBytesCodes
if (wireType != CHAR_TYPE)
{
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a char");
}
else
{
@@ -217,12 +214,12 @@ class TypedBytesContentReader implements TypedBytesCodes
}
}
- char readCharImpl()
+ public char readCharImpl()
{
return _data.getChar();
}
- protected int readInt() throws JMSException
+ public int readInt() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -249,7 +246,7 @@ class TypedBytesContentReader implements TypedBytesCodes
break;
default:
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to an int");
}
return result;
}
@@ -260,12 +257,12 @@ class TypedBytesContentReader implements TypedBytesCodes
}
}
- protected int readIntImpl()
+ public int readIntImpl()
{
return _data.getInt();
}
- protected long readLong() throws JMSException
+ public long readLong() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -296,7 +293,7 @@ class TypedBytesContentReader implements TypedBytesCodes
break;
default:
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a long");
}
return result;
}
@@ -307,12 +304,12 @@ class TypedBytesContentReader implements TypedBytesCodes
}
}
- long readLongImpl()
+ public long readLongImpl()
{
return _data.getLong();
}
- protected float readFloat() throws JMSException
+ public float readFloat() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -331,7 +328,7 @@ class TypedBytesContentReader implements TypedBytesCodes
break;
default:
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a float");
}
return result;
}
@@ -342,12 +339,12 @@ class TypedBytesContentReader implements TypedBytesCodes
}
}
- float readFloatImpl()
+ public float readFloatImpl()
{
return _data.getFloat();
}
- protected double readDouble() throws JMSException
+ public double readDouble() throws TypedBytesFormatException, EOFException
{
int position = _data.position();
byte wireType = readWireType();
@@ -370,7 +367,7 @@ class TypedBytesContentReader implements TypedBytesCodes
break;
default:
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a double");
}
return result;
}
@@ -381,12 +378,12 @@ class TypedBytesContentReader implements TypedBytesCodes
}
}
- double readDoubleImpl()
+ public double readDoubleImpl()
{
return _data.getDouble();
}
- protected String readString() throws JMSException
+ public String readString() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -436,7 +433,7 @@ class TypedBytesContentReader implements TypedBytesCodes
break;
default:
_data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a String");
}
return result;
}
@@ -447,7 +444,7 @@ class TypedBytesContentReader implements TypedBytesCodes
}
}
- protected String readStringImpl() throws JMSException
+ public String readStringImpl() throws TypedBytesFormatException
{
try
{
@@ -462,14 +459,13 @@ class TypedBytesContentReader implements TypedBytesCodes
}
catch (CharacterCodingException e)
{
- JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- jmse.setLinkedException(e);
+ TypedBytesFormatException jmse = new TypedBytesFormatException("Error decoding byte stream as a UTF8 string: " + e);
jmse.initCause(e);
throw jmse;
}
}
- protected int readBytes(byte[] bytes) throws JMSException
+ public int readBytes(byte[] bytes) throws EOFException, TypedBytesFormatException
{
if (bytes == null)
{
@@ -484,7 +480,7 @@ class TypedBytesContentReader implements TypedBytesCodes
byte wireType = readWireType();
if (wireType != BYTEARRAY_TYPE)
{
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+ throw new TypedBytesFormatException("Unable to convert " + wireType + " to a byte array");
}
checkAvailable(4);
int size = _data.getInt();
@@ -497,7 +493,7 @@ class TypedBytesContentReader implements TypedBytesCodes
{
if (size > _data.remaining())
{
- throw new MessageEOFException("Byte array has stated length "
+ throw new EOFException("Byte array has stated length "
+ size
+ " but message only contains "
+
@@ -540,7 +536,7 @@ class TypedBytesContentReader implements TypedBytesCodes
}
}
- protected Object readObject() throws JMSException
+ public Object readObject() throws EOFException, TypedBytesFormatException
{
int position = _data.position();
byte wireType = readWireType();
@@ -643,7 +639,7 @@ class TypedBytesContentReader implements TypedBytesCodes
_data.get(bytes, offset, count);
}
- public String readLengthPrefixedUTF() throws JMSException
+ public String readLengthPrefixedUTF() throws TypedBytesFormatException
{
try
{
@@ -665,8 +661,7 @@ class TypedBytesContentReader implements TypedBytesCodes
}
catch(CharacterCodingException e)
{
- JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- jmse.setLinkedException(e);
+ TypedBytesFormatException jmse = new TypedBytesFormatException("Error decoding byte stream as a UTF8 string: " + e);
jmse.initCause(e);
throw jmse;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java
index 7c91db3a32..c7ca2d7df7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesContentWriter.java
@@ -18,10 +18,8 @@
* under the License.
*
*/
-package org.apache.qpid.client.message;
+package org.apache.qpid.typedmessage;
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,13 +29,13 @@ import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
-class TypedBytesContentWriter implements TypedBytesCodes
+public class TypedBytesContentWriter implements TypedBytesCodes
{
private final ByteArrayOutputStream _baos = new ByteArrayOutputStream();
private final DataOutputStream _data = new DataOutputStream(_baos);
private static final Charset UTF8 = Charset.forName("UTF-8");
- protected void writeTypeDiscriminator(byte type) throws JMSException
+ protected void writeTypeDiscriminator(byte type)
{
try
{
@@ -49,21 +47,20 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- private JMSException handle(final IOException e)
+ private RuntimeException handle(final IOException e)
{
- JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage());
- jmsEx.setLinkedException(e);
+ RuntimeException jmsEx = new RuntimeException("Unable to write value: " + e.getMessage());
return jmsEx;
}
- protected void writeBoolean(boolean b) throws JMSException
+ public void writeBoolean(boolean b)
{
writeTypeDiscriminator(BOOLEAN_TYPE);
writeBooleanImpl(b);
}
- public void writeBooleanImpl(final boolean b) throws JMSException
+ public void writeBooleanImpl(final boolean b)
{
try
{
@@ -75,13 +72,13 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeByte(byte b) throws JMSException
+ public void writeByte(byte b)
{
writeTypeDiscriminator(BYTE_TYPE);
writeByteImpl(b);
}
- public void writeByteImpl(final byte b) throws JMSException
+ public void writeByteImpl(final byte b)
{
try
{
@@ -93,13 +90,13 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeShort(short i) throws JMSException
+ public void writeShort(short i)
{
writeTypeDiscriminator(SHORT_TYPE);
writeShortImpl(i);
}
- public void writeShortImpl(final short i) throws JMSException
+ public void writeShortImpl(final short i)
{
try
{
@@ -111,13 +108,13 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeChar(char c) throws JMSException
+ public void writeChar(char c)
{
writeTypeDiscriminator(CHAR_TYPE);
writeCharImpl(c);
}
- public void writeCharImpl(final char c) throws JMSException
+ public void writeCharImpl(final char c)
{
try
{
@@ -129,13 +126,13 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeInt(int i) throws JMSException
+ public void writeInt(int i)
{
writeTypeDiscriminator(INT_TYPE);
writeIntImpl(i);
}
- protected void writeIntImpl(int i) throws JMSException
+ public void writeIntImpl(int i)
{
try
{
@@ -147,13 +144,13 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeLong(long l) throws JMSException
+ public void writeLong(long l)
{
writeTypeDiscriminator(LONG_TYPE);
writeLongImpl(l);
}
- public void writeLongImpl(final long l) throws JMSException
+ public void writeLongImpl(final long l)
{
try
{
@@ -165,13 +162,13 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeFloat(float v) throws JMSException
+ public void writeFloat(float v)
{
writeTypeDiscriminator(FLOAT_TYPE);
writeFloatImpl(v);
}
- public void writeFloatImpl(final float v) throws JMSException
+ public void writeFloatImpl(final float v)
{
try
{
@@ -183,13 +180,13 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeDouble(double v) throws JMSException
+ public void writeDouble(double v)
{
writeTypeDiscriminator(DOUBLE_TYPE);
writeDoubleImpl(v);
}
- public void writeDoubleImpl(final double v) throws JMSException
+ public void writeDoubleImpl(final double v)
{
try
{
@@ -201,7 +198,7 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeString(String string) throws JMSException
+ public void writeString(String string)
{
if (string == null)
{
@@ -214,8 +211,8 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- protected void writeNullTerminatedStringImpl(String string)
- throws JMSException
+ public void writeNullTerminatedStringImpl(String string)
+
{
try
{
@@ -229,18 +226,18 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
- protected void writeBytes(byte[] bytes) throws JMSException
+ public void writeBytes(byte[] bytes)
{
writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
}
- protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ public void writeBytes(byte[] bytes, int offset, int length)
{
writeTypeDiscriminator(BYTEARRAY_TYPE);
writeBytesImpl(bytes, offset, length);
}
- public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException
+ public void writeBytesImpl(final byte[] bytes, final int offset, final int length)
{
try
{
@@ -260,7 +257,7 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
}
- public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException
+ public void writeBytesRaw(final byte[] bytes, final int offset, final int length)
{
try
{
@@ -276,7 +273,7 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
- protected void writeObject(Object object) throws JMSException
+ public void writeObject(Object object) throws TypedBytesFormatException
{
Class clazz;
@@ -332,7 +329,7 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
else
{
- throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ throw new TypedBytesFormatException("Only primitives plus byte arrays and String are valid types");
}
}
@@ -341,7 +338,7 @@ class TypedBytesContentWriter implements TypedBytesCodes
return ByteBuffer.wrap(_baos.toByteArray());
}
- public void writeLengthPrefixedUTF(final String string) throws JMSException
+ public void writeLengthPrefixedUTF(final String string) throws TypedBytesFormatException
{
try
{
@@ -356,8 +353,7 @@ class TypedBytesContentWriter implements TypedBytesCodes
}
catch (CharacterCodingException e)
{
- JMSException jmse = new JMSException("Unable to encode string: " + e);
- jmse.setLinkedException(e);
+ TypedBytesFormatException jmse = new TypedBytesFormatException("Unable to encode string: " + e);
jmse.initCause(e);
throw jmse;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java
new file mode 100644
index 0000000000..95e7ea0acc
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/typedmessage/TypedBytesFormatException.java
@@ -0,0 +1,9 @@
+package org.apache.qpid.typedmessage;
+
+public class TypedBytesFormatException extends Exception
+{
+ public TypedBytesFormatException(String s)
+ {
+ super(s);
+ }
+}