summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-06-16 23:28:22 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-06-16 23:28:22 +0000
commit1bb980e0918447a629ce71ce834266e71e8d3759 (patch)
tree7200b5d7f6d0798d59b726c8bbf61f37707abe91
parentc6e1cac6277f818f096eff2b152055b82a6182ca (diff)
downloadqpid-python-1bb980e0918447a629ce71ce834266e71e8d3759.tar.gz
QPId-4027 Added an example that demonstrates sending/receiving a
String, Map and List message. Fixed several bugs. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1351025 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppMessageFactory.java7
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java4
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java51
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractMessageFactory.java25
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java5
5 files changed, 67 insertions, 25 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppMessageFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppMessageFactory.java
index 9bd8724014..b0ecb98c1a 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppMessageFactory.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppMessageFactory.java
@@ -215,11 +215,6 @@ public class CppMessageFactory extends MessageFactory_AMQP_0_10
_cppMessage.setProperty(key, value);
}
- protected NativeMessage getCppMessage()
- {
- return _cppMessage;
- }
-
@Override
public String toString()
{
@@ -232,7 +227,7 @@ public class CppMessageFactory extends MessageFactory_AMQP_0_10
return null; // The delegate is only for the headers
}
- public NativeMessage getNativeMessage()
+ NativeMessage getNativeMessage()
{
return _cppMessage;
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
index 9054b92e07..1b2604a3c0 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
@@ -21,6 +21,7 @@ import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.cpp.CppMessageFactory.CppMessageDelegate;
import org.apache.qpid.messaging.cpp.jni.NativeMessage;
import org.apache.qpid.messaging.cpp.jni.NativeSender;
import org.apache.qpid.messaging.ext.MessageInternal;
@@ -51,7 +52,8 @@ public class CppSender implements Sender
(_msgFactory.getClass() == ((MessageInternal)m).getMessageFactoryClass())
)
{
- NativeMessage msg = (NativeMessage)((MessageInternal)m).getFactorySpecificMessageDelegate();
+ CppMessageDelegate delegate = (CppMessageDelegate)((MessageInternal)m).getFactorySpecificMessageDelegate();
+ NativeMessage msg = delegate.getNativeMessage();
msg.setContentAsByteBuffer(m.getContent());
return msg;
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
index ff9f35638c..7aeb9ef400 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
@@ -17,40 +17,43 @@
*/
package org.apache.qpid.messaging.cpp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.qpid.messaging.Connection;
import org.apache.qpid.messaging.ConnectionFactory;
+import org.apache.qpid.messaging.ListMessage;
+import org.apache.qpid.messaging.MapMessage;
import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Session;
import org.apache.qpid.messaging.Sender;
+import org.apache.qpid.messaging.StringMessage;
public class CppTest
{
public static void main(String[] args) throws Exception
{
- /*Connection con = ConnectionFactory.get().createConnection("localhost:5672");
+ Connection con = ConnectionFactory.get().createConnection("localhost:5672");
con.open();
- Session ssn = con.createSession("hello");
- System.out.println("Got a session object " + ssn);
-
+ Session ssn = con.createSession(null);
Sender sender = ssn.createSender("amq.topic/test");
- System.out.println("Got a Sender object " + sender);
-
Receiver receiver = ssn.createReceiver("amq.topic/test");
- System.out.println("Got a Receiver object " + receiver);
- Message msg = new TextMessage("Hello World");
+ System.out.println("======= Text Message with Message Properties ========");
+
+ Message msg = con.getMessageFactory().createMessage("Hello World");
msg.setProperty("color", "blue");
msg.setProperty("price", 5);
msg.setProperty("boolean", true);
sender.send(msg, false);
- TextMessage m = (TextMessage) receiver.fetch(0);
- System.out.println("Received message " + m + " with content type : " + m.getContentType() + " and content : " + m.getContent());
+ StringMessage stringMsg = (StringMessage) receiver.fetch(0);
+ System.out.println("Received message " + stringMsg + " with content type : " + stringMsg.getContentType() + " and content : " + stringMsg.getString());
- Map<String,Object> props = m.getProperties();
+ Map<String,Object> props = stringMsg.getProperties();
System.out.println("Props size : " + props.size());
System.out.println("Props empty : " + props.isEmpty());
System.out.println("Contains key 'color' : " + props.containsKey("color"));
@@ -58,13 +61,33 @@ public class CppTest
{
System.out.println("Key=" + key + ", value=" + props.get(key));
}
-
System.out.println("Unspecified property : " + props.get("Unspecified-Prop"));
- System.out.println("Msg toString() : " + m);
+ System.out.println("================= Map Message =================");
+ Map<String,Object> myMap = new HashMap<String,Object>();
+ myMap.put("k1", 1);
+ myMap.put("k2", 2);
+
+ msg = con.getMessageFactory().createMessage(myMap);
+ sender.send(msg, false);
+ MapMessage mapMsg = (MapMessage) receiver.fetch(0);
+ System.out.println("Received message " + mapMsg + " with content type : " + mapMsg.getContentType() + " and content : " + mapMsg.getMap());
+
+ System.out.println("================= List Message =================");
+ List<Object> myList = new ArrayList<Object>();
+ myList.add("Red");
+ myList.add("Green");
+ myList.add("Blue");
+
+ msg = con.getMessageFactory().createMessage(myList);
+ sender.send(msg, false);
+ ListMessage listMsg = (ListMessage) receiver.fetch(0);
+ System.out.println("Received message " + listMsg + " with content type : " + listMsg.getContentType() + " and content : " + listMsg.getList());
+ sender.close();
+ receiver.close();
ssn.close();
- con.close();*/
+ con.close();
}
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractMessageFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractMessageFactory.java
index 91009dc8b6..2a668e312d 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractMessageFactory.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractMessageFactory.java
@@ -291,6 +291,12 @@ public abstract class AbstractMessageFactory implements MessageFactory
{
return super.getDelegate();
}
+
+ @Override
+ public String toString()
+ {
+ return super.getDelegate().toString();
+ }
}
protected class StringMessageImpl extends DefaultMessageImpl implements StringMessage
@@ -321,6 +327,7 @@ public abstract class AbstractMessageFactory implements MessageFactory
if(str != null && !str.isEmpty())
{
_rawData = encodeString(str);
+ setContentTypeIfNotSet(delegate,"text/plain");
}
}
@@ -366,6 +373,7 @@ public abstract class AbstractMessageFactory implements MessageFactory
if(map != null && !map.isEmpty())
{
_rawData = encodeMap(map);
+ setContentTypeIfNotSet(delegate,"amqp/map");
}
}
@@ -412,6 +420,7 @@ public abstract class AbstractMessageFactory implements MessageFactory
if(list != null && !list.isEmpty())
{
_rawData = encodeList(list);
+ setContentTypeIfNotSet(delegate,"amqp/list");
}
}
@@ -451,7 +460,6 @@ public abstract class AbstractMessageFactory implements MessageFactory
try
{
b = encoder.encode(CharBuffer.wrap(str));
- b.flip();
}
catch (CharacterCodingException e)
{
@@ -471,4 +479,19 @@ public abstract class AbstractMessageFactory implements MessageFactory
return b;
}
}
+
+ protected void setContentTypeIfNotSet(Message m, String contentType)
+ {
+ try
+ {
+ if (m.getContentType() == null || m.getContentType().isEmpty())
+ {
+ m.setContentType(contentType);
+ }
+ }
+ catch (MessagingException e)
+ {
+ //ignore.
+ }
+ }
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
index cf524bdddb..4f84d865df 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
@@ -79,11 +79,11 @@ public class SessionManagementDecorator implements SessionExt
{
private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class);
- public enum SessionState { UNDEFINED, OPENED, CLOSED, ERROR}
+ public enum SessionState {OPENED, CLOSED, ERROR}
private ConnectionExt _conn;
private Session _delegate;
- SessionState _state = SessionState.UNDEFINED;
+ SessionState _state = SessionState.OPENED;
private List<ReceiverExt> _receivers = new ArrayList<ReceiverExt>();
private List<SenderExt> _senders = new ArrayList<SenderExt>();
private final Object _connectionLock; // global per connection lock
@@ -452,7 +452,6 @@ public class SessionManagementDecorator implements SessionExt
switch (_state)
{
case ERROR:
- case UNDEFINED:
throw new SessionException("Session is in a temporary error state. The session may or may not recover from this");
case CLOSED:
throw new SessionException(closedMessage);