summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-14 23:08:13 +0000
committerRobert Greig <rgreig@apache.org>2006-12-14 23:08:13 +0000
commit3f044de86e491ad7c64d3714e06ebfacc36e1a5d (patch)
tree11df085c693af20dd352c12be53b6bb7f6d941df
parentbda1d29e19486a9dd29291c5fdb7d9df4ed4f647 (diff)
downloadqpid-python-3f044de86e491ad7c64d3714e06ebfacc36e1a5d.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487383 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java177
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java34
2 files changed, 195 insertions, 16 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 8c53d93de6..705501363c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -24,11 +24,13 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import javax.jms.*;
import java.io.UnsupportedEncodingException;
+import java.util.Enumeration;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -229,9 +231,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
checkPreConditions();
checkInitialDestination();
+
+
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
@@ -240,9 +244,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
{
checkPreConditions();
checkInitialDestination();
+
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
@@ -253,7 +258,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
_mandatory, immediate);
}
}
@@ -265,7 +270,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
+ sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
_immediate);
}
}
@@ -277,7 +282,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+ sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory, _immediate);
}
}
@@ -291,7 +296,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
_mandatory, _immediate);
}
}
@@ -305,7 +310,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, _immediate);
}
}
@@ -319,7 +324,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, immediate);
}
}
@@ -334,11 +339,146 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
mandatory, immediate, waitUntilSent);
}
}
+
+ private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
+ {
+ if(message instanceof AbstractJMSMessage)
+ {
+ return (AbstractJMSMessage) message;
+ }
+ else
+ {
+ AbstractJMSMessage newMessage;
+
+ if(message instanceof BytesMessage)
+ {
+ BytesMessage bytesMessage = (BytesMessage) message;
+ bytesMessage.reset();
+
+ JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage();
+
+
+
+ byte[] buf = new byte[1024];
+
+ int len;
+
+ while((len = bytesMessage.readBytes(buf)) != -1)
+ {
+ nativeMsg.writeBytes(buf,0,len);
+ }
+
+ newMessage = nativeMsg;
+ }
+ else if(message instanceof MapMessage)
+ {
+ MapMessage origMessage = (MapMessage) message;
+ MapMessage nativeMessage = _session.createMapMessage();
+
+ Enumeration mapNames = origMessage.getMapNames();
+ while(mapNames.hasMoreElements())
+ {
+ String name = (String) mapNames.nextElement();
+ nativeMessage.setObject(name, origMessage.getObject(name));
+ }
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if(message instanceof ObjectMessage)
+ {
+ ObjectMessage origMessage = (ObjectMessage) message;
+ ObjectMessage nativeMessage = _session.createObjectMessage();
+
+ nativeMessage.setObject(origMessage.getObject());
+
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if(message instanceof TextMessage)
+ {
+ TextMessage origMessage = (TextMessage) message;
+ TextMessage nativeMessage = _session.createTextMessage();
+
+ nativeMessage.setText(origMessage.getText());
+
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else if(message instanceof StreamMessage)
+ {
+ StreamMessage origMessage = (StreamMessage) message;
+ StreamMessage nativeMessage = _session.createStreamMessage();
+
+
+ try
+ {
+ origMessage.reset();
+ while(true)
+ {
+ nativeMessage.writeObject(origMessage.readObject());
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ ;//
+ }
+ newMessage = (AbstractJMSMessage) nativeMessage;
+ }
+ else
+ {
+ newMessage = (AbstractJMSMessage) _session.createMessage();
+
+ }
+
+ Enumeration propertyNames = message.getPropertyNames();
+ while(propertyNames.hasMoreElements())
+ {
+ String propertyName = String.valueOf(propertyNames.nextElement());
+ if(!propertyName.startsWith("JMSX_"))
+ {
+ Object value = message.getObjectProperty(propertyName);
+ newMessage.setObjectProperty(propertyName, value);
+ }
+ }
+
+ newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+
+ int priority = message.getJMSPriority();
+ if(priority < 0)
+ {
+ priority = 0;
+ }
+ else if(priority > 9)
+ {
+ priority = 9;
+ }
+
+ newMessage.setJMSPriority(priority);
+ if(message.getJMSReplyTo() != null)
+ {
+ newMessage.setJMSReplyTo(message.getJMSReplyTo());
+ }
+ newMessage.setJMSType(message.getJMSType());
+
+
+
+
+ if(newMessage != null)
+ {
+ return newMessage;
+ }
+ else
+ {
+ throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName());
+ }
+ }
+ }
+
+
+
private void validateDestination(Destination destination) throws JMSException
{
if (!(destination instanceof AMQDestination))
@@ -349,7 +489,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
declareDestination((AMQDestination)destination);
}
- protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+ protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
long timeToLive, boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
@@ -358,7 +498,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
/**
* The caller of this method must hold the failover mutex.
* @param destination
- * @param message
+ * @param origMessage
* @param deliveryMode
* @param priority
* @param timeToLive
@@ -366,9 +506,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @param immediate
* @throws JMSException
*/
- protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+ protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
+
+ AbstractJMSMessage message = convertToNativeMessage(origMessage);
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
destination.getRoutingKey(), mandatory, immediate);
@@ -424,6 +566,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
_protocolHandler.writeFrame(compositeFrame, wait);
+
+
+ if(message != origMessage)
+ {
+ _logger.warn("Updating original message");
+ origMessage.setJMSPriority(message.getJMSPriority());
+ origMessage.setJMSTimestamp(message.getJMSTimestamp());
+ _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration());
+ origMessage.setJMSExpiration(message.getJMSExpiration());
+ origMessage.setJMSMessageID(message.getJMSMessageID());
+ }
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 456d4d520c..debabfd559 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -30,6 +30,9 @@ import javax.jms.MessageFormatException;
import javax.jms.MessageEOFException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CharsetDecoder;
+import java.nio.CharBuffer;
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
@@ -149,10 +152,27 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
checkReadable();
// we check only for one byte since theoretically the string could be only a
// single byte when using UTF-8 encoding
- checkAvailable(1);
+
try
{
- return _data.getString(Charset.forName("UTF-8").newDecoder());
+ short length = readShort();
+ if(length == 0)
+ {
+ return "";
+ }
+ else
+ {
+ CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+ ByteBuffer encodedString = _data.slice();
+ encodedString.limit(length);
+ _data.position(_data.position()+length);
+ CharBuffer string = decoder.decode(encodedString.buf());
+
+ return string.toString();
+ }
+
+
+
}
catch (CharacterCodingException e)
{
@@ -257,9 +277,15 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag
checkWritable();
try
{
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
+ java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+
+ _data.putShort((short)encodedString.limit());
+ _data.put(encodedString);
+
+ //_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must add the null terminator manually
- _data.put((byte)0);
+ //_data.put((byte)0);
}
catch (CharacterCodingException e)
{