From 3f044de86e491ad7c64d3714e06ebfacc36e1a5d Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 14 Dec 2006 23:08:13 +0000 Subject: git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487383 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/client/BasicMessageProducer.java | 177 +++++++++++++++++++-- .../qpid/client/message/JMSBytesMessage.java | 34 +++- 2 files changed, 195 insertions(+), 16 deletions(-) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 8c53d93de6..705501363c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -24,11 +24,13 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; import javax.jms.*; import java.io.UnsupportedEncodingException; +import java.util.Enumeration; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -229,9 +231,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { checkPreConditions(); checkInitialDestination(); + + synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, + sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } @@ -240,9 +244,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j { checkPreConditions(); checkInitialDestination(); + synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } @@ -253,7 +258,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, + sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate); } } @@ -265,7 +270,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { - sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory, + sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); } } @@ -277,7 +282,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, + sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); } } @@ -291,7 +296,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); } } @@ -305,7 +310,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate); } } @@ -319,7 +324,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate); } } @@ -334,11 +339,146 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive, + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, waitUntilSent); } } + + private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException + { + if(message instanceof AbstractJMSMessage) + { + return (AbstractJMSMessage) message; + } + else + { + AbstractJMSMessage newMessage; + + if(message instanceof BytesMessage) + { + BytesMessage bytesMessage = (BytesMessage) message; + bytesMessage.reset(); + + JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage(); + + + + byte[] buf = new byte[1024]; + + int len; + + while((len = bytesMessage.readBytes(buf)) != -1) + { + nativeMsg.writeBytes(buf,0,len); + } + + newMessage = nativeMsg; + } + else if(message instanceof MapMessage) + { + MapMessage origMessage = (MapMessage) message; + MapMessage nativeMessage = _session.createMapMessage(); + + Enumeration mapNames = origMessage.getMapNames(); + while(mapNames.hasMoreElements()) + { + String name = (String) mapNames.nextElement(); + nativeMessage.setObject(name, origMessage.getObject(name)); + } + newMessage = (AbstractJMSMessage) nativeMessage; + } + else if(message instanceof ObjectMessage) + { + ObjectMessage origMessage = (ObjectMessage) message; + ObjectMessage nativeMessage = _session.createObjectMessage(); + + nativeMessage.setObject(origMessage.getObject()); + + newMessage = (AbstractJMSMessage) nativeMessage; + } + else if(message instanceof TextMessage) + { + TextMessage origMessage = (TextMessage) message; + TextMessage nativeMessage = _session.createTextMessage(); + + nativeMessage.setText(origMessage.getText()); + + newMessage = (AbstractJMSMessage) nativeMessage; + } + else if(message instanceof StreamMessage) + { + StreamMessage origMessage = (StreamMessage) message; + StreamMessage nativeMessage = _session.createStreamMessage(); + + + try + { + origMessage.reset(); + while(true) + { + nativeMessage.writeObject(origMessage.readObject()); + } + } + catch (MessageEOFException e) + { + ;// + } + newMessage = (AbstractJMSMessage) nativeMessage; + } + else + { + newMessage = (AbstractJMSMessage) _session.createMessage(); + + } + + Enumeration propertyNames = message.getPropertyNames(); + while(propertyNames.hasMoreElements()) + { + String propertyName = String.valueOf(propertyNames.nextElement()); + if(!propertyName.startsWith("JMSX_")) + { + Object value = message.getObjectProperty(propertyName); + newMessage.setObjectProperty(propertyName, value); + } + } + + newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode()); + + + int priority = message.getJMSPriority(); + if(priority < 0) + { + priority = 0; + } + else if(priority > 9) + { + priority = 9; + } + + newMessage.setJMSPriority(priority); + if(message.getJMSReplyTo() != null) + { + newMessage.setJMSReplyTo(message.getJMSReplyTo()); + } + newMessage.setJMSType(message.getJMSType()); + + + + + if(newMessage != null) + { + return newMessage; + } + else + { + throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName()); + } + } + } + + + private void validateDestination(Destination destination) throws JMSException { if (!(destination instanceof AMQDestination)) @@ -349,7 +489,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j declareDestination((AMQDestination)destination); } - protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, + protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); @@ -358,7 +498,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j /** * The caller of this method must hold the failover mutex. * @param destination - * @param message + * @param origMessage * @param deliveryMode * @param priority * @param timeToLive @@ -366,9 +506,11 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @param immediate * @throws JMSException */ - protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, + protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { + + AbstractJMSMessage message = convertToNativeMessage(origMessage); AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); @@ -424,6 +566,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); _protocolHandler.writeFrame(compositeFrame, wait); + + + if(message != origMessage) + { + _logger.warn("Updating original message"); + origMessage.setJMSPriority(message.getJMSPriority()); + origMessage.setJMSTimestamp(message.getJMSTimestamp()); + _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration()); + origMessage.setJMSExpiration(message.getJMSExpiration()); + origMessage.setJMSMessageID(message.getJMSMessageID()); + } } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 456d4d520c..debabfd559 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -30,6 +30,9 @@ import javax.jms.MessageFormatException; import javax.jms.MessageEOFException; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CharsetDecoder; +import java.nio.CharBuffer; public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { @@ -149,10 +152,27 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag checkReadable(); // we check only for one byte since theoretically the string could be only a // single byte when using UTF-8 encoding - checkAvailable(1); + try { - return _data.getString(Charset.forName("UTF-8").newDecoder()); + short length = readShort(); + if(length == 0) + { + return ""; + } + else + { + CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); + ByteBuffer encodedString = _data.slice(); + encodedString.limit(length); + _data.position(_data.position()+length); + CharBuffer string = decoder.decode(encodedString.buf()); + + return string.toString(); + } + + + } catch (CharacterCodingException e) { @@ -257,9 +277,15 @@ public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessag checkWritable(); try { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); + CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder(); + java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); + + _data.putShort((short)encodedString.limit()); + _data.put(encodedString); + + //_data.putString(string, Charset.forName("UTF-8").newEncoder()); // we must add the null terminator manually - _data.put((byte)0); + //_data.put((byte)0); } catch (CharacterCodingException e) { -- cgit v1.2.1