diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 13:54:05 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 13:54:05 +0000 |
commit | 939bd95d763c7bb644a5a7f37a9c9721e1d8ed3b (patch) | |
tree | e420c668927e9dc77bc39879342fc66487b65b42 | |
parent | 37490eeb4c979c1831b06febf32f830b993e576d (diff) | |
download | qpid-python-939bd95d763c7bb644a5a7f37a9c9721e1d8ed3b.tar.gz |
moved from nclient
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563134 13f79535-47bb-0310-9956-ffa450edef68
54 files changed, 10413 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/Connection.java b/java/client/src/main/java/org/apache/qpidity/Connection.java new file mode 100644 index 0000000000..cb56ee954c --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/Connection.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity; + + +import java.net.URL; + +import org.apache.qpidity.QpidException; + +/** + * This represents a physical connection to a broker. + */ +public interface Connection +{ + /** + * Establish the connection with the broker identified by the provided URL. + * + * @param url The URL of the broker. + * @throws QpidException If the communication layer fails to connect with the broker. + */ + public void connect(URL url) throws QpidException; + + /** + * Close this connection. + * + * @throws QpidException if the communication layer fails to close the connection. + */ + public void close() throws QpidException; + + + /** + * Create a session for this connection. + * <p> The retuned session is suspended + * (i.e. this session is not attached with an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. + * @return A Newly created (suspended) session. + * @throws QpidException If the connection fails to create a session due to some internal error. + */ + public Session createSession(int expiryInSeconds) throws QpidException; + + /** + * Create a DtxSession for this connection. + * <p> A Dtx Session must be used when resources have to be manipulated as + * part of a global transaction. + * <p> The retuned DtxSession is suspended + * (i.e. this session is not attached with an underlying channel) + * + * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire. + * @return A Newly created (suspended) DtxSession. + * @throws QpidException If the connection fails to create a DtxSession due to some internal error. + */ + public DtxSession createDTXSession(int expiryInSeconds) throws QpidException; + + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + * + * @param exceptionListner The execptionListener + */ + public void setExceptionListener(ExceptionListener exceptionListner); +} diff --git a/java/client/src/main/java/org/apache/qpidity/DtxSession.java b/java/client/src/main/java/org/apache/qpidity/DtxSession.java new file mode 100644 index 0000000000..045f52541d --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/DtxSession.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity; + +import org.apache.qpidity.QpidException; + +/** + * This session�s resources are control under the scope of a distributed transaction. + */ +public interface DtxSession extends Session +{ + + /** + * Get the XA resource associated with this session. + * + * @return this session XA resource. + * @throws QpidException If the session fails to retrieve its associated XA resource + * due to some error. + */ + public javax.transaction.xa.XAResource getDTXResource() throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpidity/ExceptionListener.java b/java/client/src/main/java/org/apache/qpidity/ExceptionListener.java new file mode 100644 index 0000000000..e3ca8989ef --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/ExceptionListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity; + +import org.apache.qpidity.QpidException; + +/** + * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it + * informs the connection's ExceptionListener + */ +public interface ExceptionListener +{ + /** + * If the communication layer detects a serious problem with a connection, it + * informs the connection's ExceptionListener + * + * @param exception The exception comming from the communication layer. + * @see Connection + */ + public void onException(QpidException exception); +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpidity/FieldTable.java b/java/client/src/main/java/org/apache/qpidity/FieldTable.java new file mode 100644 index 0000000000..f752bb3373 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/FieldTable.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity; + +/** + * + */ +public interface FieldTable +{ +} diff --git a/java/client/src/main/java/org/apache/qpidity/MessageListener.java b/java/client/src/main/java/org/apache/qpidity/MessageListener.java new file mode 100644 index 0000000000..5a1a526f0b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/MessageListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity; + +import org.apache.qpidity.api.Message; + +/** + *A message listener + */ +public interface MessageListener +{ + /** + * Process an incoming message. + * + * @param message The incoming message. + */ + public void onMessage(Message message); +} diff --git a/java/client/src/main/java/org/apache/qpidity/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/MessagePartListener.java new file mode 100644 index 0000000000..b370efff17 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/MessagePartListener.java @@ -0,0 +1,55 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity; + +import org.apache.qpidity.Header; + +/** + * Assembles message parts. + * <p> The sequence of event for transferring a message is as follows: + * <ul> + * <li> messageHeaders + * <li> n calls to addData + * <li> messageReceived + * </ul> + * This is up to the implementation to assembled the message when the different parts + * are transferred. + */ +public interface MessagePartListener +{ + /** + * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} + * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. + * + * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> + */ + public void messageHeaders(Header... headers); + + /** + * Add the following byte array to the content of the message being received + * + * @param data Data to be added or streamed. + */ + public void addData(byte[] data); + + /** + * Indicates that the message has been fully received. + */ + public void messageReceived(); + +} diff --git a/java/client/src/main/java/org/apache/qpidity/Session.java b/java/client/src/main/java/org/apache/qpidity/Session.java new file mode 100644 index 0000000000..6f283cf203 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/Session.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity; + +import java.util.Map; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.Header; +import org.apache.qpidity.Range; +import org.apache.qpidity.api.Message; + +/** + * <p>A session is associated with a connection. + * When created a Session is not attached with an underlying channel. + * Session is single threaded </p> + */ +public interface Session +{ + public static final short ACQUIRE_MODE_NO_ACQUIRE = 0; + public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1; + public static final short CONFIRM_MODE_REQUIRED = 1; + public static final short CONFIRM_MODE_NOT_REQUIRED = 0; + public static final short MESSAGE_FLOW_MODE_CREDIT = 0; + public static final short MESSAGE_FLOW_MODE_WINDOW = 1; + public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; + public static final short MESSAGE_FLOW_UNIT_BYTE = 1; + + //------------------------------------------------------ + // Session housekeeping methods + //------------------------------------------------------ + /** + * Close this session and any associated resources. + * + * @throws QpidException If the communication layer fails to close this session or if an internal error happens + * when closing this session resources. . + */ + public void close() throws QpidException; + + /** + * Suspend this session resulting in interrupting the traffic with the broker. + * <p> The session timer will start to tick in suspend. + * <p> When a session is suspend any operation of this session and of the associated resources are unavailable. + * + * @throws QpidException If the communication layer fails to suspend this session + */ + public void suspend() throws QpidException; + + /** + * This will resume an existing session + * <p> Upon resume the session is attached with an underlying channel + * hence making operation on this session available. + * + * @throws QpidException If the communication layer fails to execute this properly + */ + public void resume() throws QpidException; + + //------------------------------------------------------ + // Messaging methods + // Producer + //------------------------------------------------------ + /** + * Transfer the given message to a specified exchange. + * + * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + * <p/> + * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + * </ul> + * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired + * <p/> + * <li> pre-acquire (1): the message is acquired when the transfer starts + * </ul> + * @param exchange The exchange the message is being sent. + * @param msg The Message to be sent + * @throws QpidException If the session fails to send the message due to some error + */ + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException; + + /** + * Declare the beginning of a message transfer operation. This operation must + * be followed by {@link Session#addMessageHeaders} then followed by any number of {@link Session#addData}. + * The transfer is ended by endData. + * <p> This way of transferring messages is useful when streaming large messages + * <p> In the interval [messageTransfer endData] any attempt to call a method other than + * {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close} + * will result in an exception being thrown. + * + * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + * <p/> + * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + * </ul> + * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired + * <p/> + * <li> pre-acquire (1): the message is acquired when the transfer starts + * </ul> + * @param exchange The exchange the message is being sent. + * @throws QpidException If the session fails to send the message due to some error. + */ + public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException; + + /** + * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} + * or to the message being sent. + * + * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> + * @throws QpidException If the session fails to execute the method due to some error + * @see org.apache.qpidity.DeliveryProperties + */ + public void addMessageHeaders(Header... headers) throws QpidException; + + /** + * Add the following byte array to the content of the message being sent. + * + * @param data Data to be added. + * @param off Offset from which to start reading data + * @param len Number of bytes to be read + * @throws QpidException If the session fails to execute the method due to some error + */ + public void addData(byte[] data, int off, int len) throws QpidException; + + /** + * Signals the end of data for the message. + * + * @throws QpidException If the session fails to execute the method due to some error + */ + public void endData() throws QpidException; + + //------------------------------------------------------ + // Messaging methods + // Consumer + //------------------------------------------------------ + + /** + * Associate a message listener with a destination. + * <p> The destination is bound to a queue and messages are filtered based + * on the provider filter map (message filtering is specific to the provider and may not be handled). + * <p/> + * <p> Following are the valid options + * <ul> + * <li> NO_LOCAL + * <li> EXCLUSIVE + * </ul> + * <p> In the absence of a particular option, defaul values are: + * <ul> + * <li> NO_LOCAL = false + * <li> EXCLUSIVE = false + * </ul> + * + * @param queue The queue this receiver is receiving messages from. + * @param destination The destination for the subscriber ,a.k.a the delivery tag. + * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + * <p/> + * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + * </ul> + * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired + * <p/> + * <li> pre-acquire (1): the message is acquired when the transfer starts + * </ul> + * @param listener The listener for this destination. When big message are transfered then + * it is recommended to use a {@link MessagePartListener}. + * @param options Set of Options. + * @param filter A set of filters for the subscription. The syntax and semantics of these filters depends + * on the providers implementation. + * @throws QpidException If the session fails to create the receiver due to some error. + */ + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, + MessagePartListener listener, Map<String, ?> filter, Option... options) + throws QpidException; + + /** + * This method cancels a consumer. This does not affect already delivered messages, but it does + * mean the server will not send any more messages for that destination. The client may receive an + * arbitrary number of messages in between sending the cancel method and receiving the + * notification of completion of the cancel command. + * + * @param destination The destination for the subscriber used at subscription + * @throws QpidException If cancelling the subscription fails due to some error. + */ + public void messageCancel(String destination) throws QpidException; + + /** + * Associate a message part listener with a destination. + * We currently allow one listerner per destination this means + * that the previous message listener is replaced. This is done gracefully i.e. the message + * listener is replaced once it return from the processing of a message. + * + * @param destination The destination the listener is associated with. + * @param listener The new listener for this destination. + */ + public void setMessageListener(String destination, MessagePartListener listener); + + /** + * Sets the mode of flow control used for a given destination. + * <p/> + * With credit based flow control, the broker continually maintains its current + * credit balance with the recipient. The credit balance consists of two values, a message + * count, and a byte count. Whenever message data is sent, both counts must be decremented. + * If either value reaches zero, the flow of message data must stop. Additional credit is + * received via the {@link Session#messageFlow} method. + * <p/> + * Window based flow control is identical to credit based flow control, however message + * acknowledgment implicitly grants a single unit of message credit, and the size of the + * message in byte credits for each acknowledged message. + * + * @param destination The destination to set the flow mode on. + * @param mode <ul> <li>credit (0): choose credit based flow control + * <li> window (1): choose window based flow control</ul> + * @throws QpidException If setting the flow mode fails due to some error. + */ + public void messageFlowMode(String destination, short mode) throws QpidException; + + + /** + * This method controls the flow of message data to a given destination. It is used by the + * recipient of messages to dynamically match the incoming rate of message flow to its + * processing or forwarding capacity. Upon receipt of this method, the sender must add "value" + * number of the specified unit to the available credit balance for the specified destination. + * A value of 0 indicates an infinite amount of credit. This disables any limit for + * the given unit until the credit balance is zeroed with {@link Session#messageStop} + * or {@link Session#messageFlush}. + * + * @param destination The destination to set the flow. + * @param unit Specifies the unit of credit balance. + * <p/> + * One of: <ul> + * <li> message (0) + * <li> byte (1) + * </ul> + * @param value Number of credits, a value of 0 indicates an infinite amount of credit. + * @throws QpidException If setting the flow fails due to some error. + */ + public void messageFlow(String destination, short unit, long value) throws QpidException; + + /** + * Forces the broker to exhaust its credit supply. + * <p> The broker's credit will always be zero when + * this method completes. This method does not complete until all the message transfers occur. + * <p> This method returns true if messages have been flushed + * (i.e. the queue was not empty and the credit greater then zero). + * It returns false if the queue was empty. + * + * @param destination The destination to call flush on. + * @return True is messages were flushed, false otherwise. + * @throws QpidException If flushing fails due to some error. + */ + public boolean messageFlush(String destination) throws QpidException; + + /** + * On receipt of this method, the brokers MUST set his credit to zero for the given + * destination. This obeys the generic semantics of command completion, i.e. when confirmation + * is issued credit MUST be zero and no further messages will be sent until such a time as + * further credit is received. + * + * @param destination The destination to stop. + * @throws QpidException If stopping fails due to some error. + */ + public void messageStop(String destination) throws QpidException; + + /** + * Acknowledge the receipt of ranges of messages. + * <p>Message must have been previously acquired either by receiving them in + * pre-acquire mode or by explicitly acquiring them. + * + * @param range Range of acknowledged messages. + * @throws QpidException If the acknowledgement of the messages fails due to some error. + */ + public void messageAcknowledge(Range<Long>... range) throws QpidException; + + /** + * Reject ranges of acquired messages. + * <p> A rejected message will not be delivered to any receiver + * and may be either discarded or moved to the broker dead letter queue. + * + * @param range Range of rejected messages. + * @throws QpidException If those messages cannot be rejected dus to some error + */ + public void messageReject(Range<Long>... range) throws QpidException; + + /** + * Try to acquire ranges of messages hence releasing them form the queue. + * This means that once acknowledged, a message will not be delivered to any other receiver. + * <p> As those messages may have been consumed by another receivers hence, + * message acquisition can fail. + * The outcome of the acquisition is returned as an array of ranges of qcquired messages. + * <p> This method should only be called on non-acquired messages. + * + * @param range Ranges of messages to be acquired. + * @return Ranges of explicitly acquired messages. + * @throws QpidException If this message cannot be acquired dus to some error + */ + public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException; + + /** + * Give up responsibility for processing ranges of messages. + * <p> Released messages are re-enqueued. + * + * @param range Ranges of messages to be released. + * @throws QpidException If this message cannot be released dus to some error. + */ + public void messageRelease(Range<Long>... range) throws QpidException; + + // ----------------------------------------------- + // Local transaction methods + // ---------------------------------------------- + /** + * Selects the session for local transaction support. + * + * @throws QpidException If selecting this session for local transaction support fails due to some error. + */ + public void txSelect() throws QpidException; + + /** + * Commit the receipt and the delivery of all messages exchanged by this session resources. + * + * @throws QpidException If the session fails to commit due to some error. + * @throws IllegalStateException If this session is not transacted. + */ + public void txCommit() throws QpidException, IllegalStateException; + + /** + * Rollback the receipt and the delivery of all messages exchanged by this session resources. + * + * @throws QpidException If the session fails to rollback due to some error. + * @throws IllegalStateException If this session is not transacted. + */ + public void txRollback() throws QpidException, IllegalStateException; + + //--------------------------------------------- + // Queue methods + //--------------------------------------------- + + /** + * Declare a queue with the given queueName + * <p> Following are the valid options for declareQueue + * <ul> + * <li> AUTO_DELETE + * <li> DURABLE + * <li> EXCLUSIVE + * <li> NO_WAIT + * <li> PASSIVE + * </ul> + * </p> + * <p/> + * <p>In the absence of a particular option, the defaul value is false for each option + * + * @param queueName The name of the delcared queue. + * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message + * may be rejected by a queue for the following reasons: + * <oL> <li> The queue is deleted when it is not empty; + * <<li> Immediate delivery of a message is requested, but there are no consumers connected to + * the queue. </ol> + * @param arguments Used for backward compatibility + * @param options Set of Options. + * @throws QpidException If the session fails to declare the queue due to some error. + * @see Option + */ + public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) + throws QpidException; + + /** + * Bind a queue with an exchange. + * + * @param queueName The queue to be bound. + * @param exchangeName The exchange name. + * @param routingKey The routing key. + * @param arguments Used for backward compatibility + * @throws QpidException If the session fails to bind the queue due to some error. + */ + public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException; + + /** + * Unbind a queue from an exchange. + * + * @param queueName The queue to be unbound. + * @param exchangeName The exchange name. + * @param routingKey The routing key. + * @param arguments Used for backward compatibility + * @throws QpidException If the session fails to unbind the queue due to some error. + */ + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException; + + /** + * Purge a queue. i.e. delete all enqueued messages + * + * @param queueName The queue to be purged + * @throws QpidException If the session fails to purge the queue due to some error. + */ + public void queuePurge(String queueName) throws QpidException; + + /** + * Delet a queue. + * <p> Following are the valid options for createReceive + * <ul> + * <li> IF_EMPTY + * <li> IF_UNUSE + * <li> NO_WAIT + * </ul> + * </p> + * <p/> + * <p>In the absence of a particular option, the defaul value is false for each option</p> + * + * @param queueName The name of the queue to be deleted + * @param options Set of options + * @throws QpidException If the session fails to delete the queue due to some error. + * @see Option + * <p/> + * Following are the valid options + */ + public void queueDelete(String queueName, Option... options) throws QpidException; + + // -------------------------------------- + // exhcange methods + // -------------------------------------- + + /** + * Declare an exchange. + * <p> Following are the valid options for createReceive + * <ul> + * <li> AUTO_DELETE + * <li> DURABLE + * <li> INTERNAL + * <li> NO_WAIT + * <li> PASSIVE + * </ul> + * </p> + * <p/> + * <p>In the absence of a particular option, the defaul value is false for each option</p> * + * + * @param exchangeName The exchange name. + * @param exchangeClass The fully qualified name of the exchange class. + * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which + * the message will be sent. + * @param options Set of options. + * @param arguments Used for backward compatibility + * @throws QpidException If the session fails to declare the exchange due to some error. + * @see Option + */ + public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, + Map<String, ?> arguments, Option... options) throws QpidException; + + /** + * Delete an exchange. + * <p> Following are the valid options for createReceive + * <ul> + * <li> IF_UNUSEDL + * <li> NO_WAIT + * </ul> + * </p> + * <p/> + * <p>In the absence of a particular option, the defaul value is false for each option + * Immediately deleted even if it is used by another resources.</p> + * + * @param exchangeName The name of exchange to be deleted. + * @param options Set of options. + * @throws QpidException If the session fails to delete the exchange due to some error. + * @see Option + */ + public void exchangeDelete(String exchangeName, Option... options) throws QpidException; +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java new file mode 100644 index 0000000000..1baf063ef3 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -0,0 +1,204 @@ +package org.apache.qpidity.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpidity.api.Message; +import org.apache.qpidity.MessagePartListener; +import org.apache.qpidity.*; + +/** + * Implements a Qpid Sesion. + */ +public class ClientSession implements org.apache.qpidity.Session +{ + + Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>(); + + + //------------------------------------------------------ + // Session housekeeping methods + //------------------------------------------------------ + public void close() throws QpidException + { + // TODO + + } + + public void suspend() throws QpidException + { + // TODO + + } + + public void resume() throws QpidException + { + // TODO + + }//------------------------------------------------------ + // Messaging methods + // Producer + //------------------------------------------------------ + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException + { + // TODO + + } + + public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException + { + // TODO + + } + + public void addMessageHeaders(Header... headers) throws QpidException + { + // TODO + + } + + public void addData(byte[] data, int off, int len) throws QpidException + { + // TODO + + } + + public void endData() throws QpidException + { + // TODO + + } + + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, + MessagePartListener listener, Map<String, ?> filter, Option... options) + throws QpidException + { + // TODO + + } + + public void messageCancel(String destination) throws QpidException + { + // TODO + + } + + public void setMessageListener(String destination, MessagePartListener listener) + { + // TODO + + } + + public void messageFlowMode(String destination, short mode) throws QpidException + { + // TODO + + } + + public void messageFlow(String destination, short unit, long value) throws QpidException + { + // TODO + + } + + public boolean messageFlush(String destination) throws QpidException + { + // TODO + return false; + } + + public void messageStop(String destination) throws QpidException + { + // TODO + + } + + public void messageAcknowledge(Range<Long>... range) throws QpidException + { + // TODO + + } + + public void messageReject(Range<Long>... range) throws QpidException + { + // TODO + + } + + public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException + { + // TODO + return null; + } + + public void messageRelease(Range<Long>... range) throws QpidException + { + // TODO + + }// ----------------------------------------------- + // Local transaction methods + // ---------------------------------------------- + public void txSelect() throws QpidException + { + // TODO + + } + + public void txCommit() throws QpidException, IllegalStateException + { + // TODO + + } + + public void txRollback() throws QpidException, IllegalStateException + { + // TODO + + } + + public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) + throws QpidException + { + // TODO + + } + + public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException + { + // TODO + + } + + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException + { + // TODO + + } + + public void queuePurge(String queueName) throws QpidException + { + // TODO + + } + + public void queueDelete(String queueName, Option... options) throws QpidException + { + // TODO + + } + + public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, + Map<String, ?> arguments, Option... options) throws QpidException + { + // TODO + + } + + public void exchangeDelete(String exchangeName, Option... options) throws QpidException + { + // TODO + + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java new file mode 100644 index 0000000000..00b4a65fee --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java @@ -0,0 +1,52 @@ +package org.apache.qpidity.impl; + +import org.apache.qpidity.CommonSessionDelegate; +import org.apache.qpidity.ExchangeQueryOk; +import org.apache.qpidity.Session; + + +public class ClientSessionDelegate extends CommonSessionDelegate +{ + + /*@Override public void messageTransfer(Session context, MessageTransfer struct) + { + MessagePartListener l = context.messagListeners.get(struct.getDestination()); + l.messageTransfer(struct.getDestination(),new Option[0]); + }*/ + + // --------------------------------------------------------------- + // Non generated methods - but would like if they are also generated. + // These methods should be called from Body and Header Handlers. + // If these methods are generated as part of the delegate then + // I can call these methods from the BodyHandler and HeaderHandler + // in a generic way + + // I have used destination to indicate my intent of receiving + // some form of correlation to know which consumer this data belongs to. + // It can be anything as long as I can make the right correlation + // ---------------------------------------------------------------- + /* public void data(Session context,String destination,byte[] src) throws QpidException + { + MessagePartListener l = context.messagListeners.get(destination); + l.data(src); + } + + public void endData(Session context,String destination) throws QpidException + { + MessagePartListener l = context.messagListeners.get(destination); + l.endData(); + } + + public void messageHeaders(Session context,String destination,Header... headers) throws QpidException + { + MessagePartListener l = context.messagListeners.get(destination); + l.endData(); + }*/ + + + // -------------------------------------------- + // Exchange related functionality + // -------------------------------------------- + public void exchangeQueryOk(Session session, ExchangeQueryOk struct) {} + +} diff --git a/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java new file mode 100644 index 0000000000..30d6710cfe --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/impl/MessagePartListenerAdapter.java @@ -0,0 +1,33 @@ +package org.apache.qpidity.impl; + +import org.apache.qpidity.MessagePartListener; +import org.apache.qpidity.MessageListener; +import org.apache.qpidity.Header; +import org.apache.qpidity.api.Message; + +public class MessagePartListenerAdapter implements MessagePartListener +{ + MessageListener _adaptee; + Message _currentMsg; + + public MessagePartListenerAdapter(MessageListener listener) + { + _adaptee = listener; + _currentMsg = null; + } + + public void addData(byte[] src) + { + _currentMsg.appendData(src); + } + + public void messageHeaders(Header... headers) + { + //_currentMsg add the headers + } + + public void messageReceived() + { + _adaptee.onMessage(_currentMsg); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java new file mode 100644 index 0000000000..25951bc0c1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -0,0 +1,458 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.QpidException; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.jms.Session; +import javax.jms.ExceptionListener; +import javax.jms.Connection; +import java.util.Vector; + + +/** + * Implements javax.jms.Connection, javax.jms.QueueConnection adn javax.jms.TopicConnection + */ +public class ConnectionImpl implements Connection, QueueConnection, TopicConnection +{ + /** + * This class's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(ConnectionImpl.class); + + /** + * Maps from session id (Integer) to SessionImpl instance + */ + private final Vector<SessionImpl> _sessions = new Vector<SessionImpl>(); + + /** + * This is the clientID + */ + private String _clientID; + + /** + * The user name to use for authentication + */ + private String _username; + + /** + * The password to use for authentication + */ + private String _password; + + /** + * The Exception listenr get informed when a serious problem is detected + */ + private ExceptionListener _exceptionListener; + + /** + * Whether this connection is started, i.e. whether messages are flowing to consumers. + * It has no meaning for message publication. + */ + private boolean _started; + + /** + * set to true if this Connection has been closed. + * <p/> + * A closed Connection cannot accept invocations to any of its methods with the exception + * of close(). All other methods should throw javax.jms.IllegalStateExceptions if the + * Connection has been closed. + * <p/> + * A Connection is open after creation, but not started. Once it has been closed, a Connection + * cannot be reused any more. + */ + private boolean _isClosed = false; + + + /** + * The QpidConeection instance that is mapped with thie JMS connection + */ + org.apache.qpidity.Connection _qpidConnection; + + /** + * This is the exception listener for this qpid connection. + * The jms exception listener is registered with this listener. + */ + QpidExceptionListenerImpl _qpidExceptionListener; + + //------ Constructors ---// + /** + * TODO define the parameters + */ + public ConnectionImpl() + { + } + + //---- Interface javax.jms.Connection ---// + + /** + * Creates a Session + * + * @param transacted Indicates whether the session is transacted. + * @param acknowledgeMode ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, + * <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>. + * @return A newly created session + * @throws JMSException If the Connection object fails to create a session due to some internal error. + */ + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException + { + checkNotClosed(); + SessionImpl session = new SessionImpl(this, transacted, acknowledgeMode); + // add this session with the list of session that are handled by this connection + _sessions.add(session); + return session; + } + + /** + * Gets the client identifier for this connection. + * <P>It is either preconfigured as a JNDI property or assigned dynamically by the application + * by calling the <code>setClientID</code> method. + * <p/> + * TODO: Make sure that the client identifier can be set on the <CODE>ConnectionFactory</CODE> + * + * @return The unique client identifier. + * @throws JMSException If this connection is closed. + */ + public String getClientID() throws JMSException + { + checkNotClosed(); + return _clientID; + } + + /** + * Sets the client identifier for this connection. + * <p/> + * <P>The preferred way to assign a JMS client's client identifier is for + * it to be configured in a client-specific <CODE>ConnectionFactory</CODE> + * object and transparently assigned to the <CODE>Connection</CODE> object + * it creates. + * <p> In AMQP it is not possible to change the client ID. If one is not specified + * upon connection construction, an id is generated automatically. Therefore + * we can always throw an exception. + * TODO: Make sure that the client identifier can be set on the <CODE>ConnectionFactory</CODE> + * + * @param clientID the unique client identifier + * @throws JMSException Always as clientID is always set at construction time. + */ + public void setClientID(String clientID) throws JMSException + { + checkNotClosed(); + throw new IllegalStateException("Client name cannot be changed after being set"); + } + + /** + * Gets the metadata for this connection. + * + * @return The connection metadata + * @throws JMSException If there ie a problem getting the connection metadata for this connection. + * @see javax.jms.ConnectionMetaData + */ + public ConnectionMetaData getMetaData() throws JMSException + { + checkNotClosed(); + return ConnectionMetaDataImpl.getInstance(); + } + + /** + * Gets the <CODE>ExceptionListener</CODE> object for this connection. + * + * @return the <CODE>ExceptionListener</CODE> for this connection + * @throws JMSException In case of unforeseen problem + */ + public ExceptionListener getExceptionListener() throws JMSException + { + checkNotClosed(); + return _exceptionListener; + } + + /** + * Sets an exception listener for this connection. + * <p/> + * <p> The JMS specification says: + * <P>If a JMS provider detects a serious problem with a connection, it + * informs the connection's <CODE>ExceptionListener</CODE>, if one has been + * registered. It does this by calling the listener's + * <CODE>onException</CODE> method, passing it a <CODE>JMSException</CODE> + * object describing the problem. + * <p/> + * <P>A connection serializes execution of its + * <CODE>ExceptionListener</CODE>. + * <p/> + * <P>A JMS provider should attempt to resolve connection problems + * itself before it notifies the client of them. + * + * @param exceptionListener The connection listener. + * @throws JMSException If the connection is closed. + */ + public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException + { + checkNotClosed(); + _exceptionListener = exceptionListener; + _qpidExceptionListener.setJMSExceptionListner(_exceptionListener); + } + + /** + * Starts (or restarts) a connection's delivery of incoming messages. + * A call to start on a connection that has already been + * started is ignored. + * + * @throws JMSException In case of a problem due to some internal error. + */ + public void start() throws JMSException + { + checkNotClosed(); + if (!_started) + { + // start all the sessions + for (SessionImpl session : _sessions) + { + try + { + session.start(); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + _started = true; + } + } + + /** + * Temporarily stops a connection's delivery of incoming messages. + * <p> The JMS specification says: + * <p> Delivery can be restarted using the connection's <CODE>start</CODE> + * method. When the connection is stopped, delivery to all the connection's message consumers is inhibited: + * synchronous receives block, and messages are not delivered to message listeners. + * <P>This call blocks until receives and/or message listeners in progress have completed. + * + * @throws JMSException In case of a problem due to some internal error. + */ + public void stop() throws JMSException + { + checkNotClosed(); + if (_started) + { + // stop all the sessions + for (SessionImpl session : _sessions) + { + try + { + session.stop(); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + _started = false; + } + } + + /** + * Closes the connection. + * <p/> + * <p> The JMS specification says: + * <P>Since a provider typically allocates significant resources outside + * the JVM on behalf of a connection, clients should close these resources + * when they are not needed. Relying on garbage collection to eventually + * reclaim these resources may not be timely enough. + * <P>There is no need to close the sessions, producers, and consumers of a closed connection. + * <P>Closing a connection causes all temporary destinations to be deleted. + * <P>When this method is invoked, it should not return until message + * processing has been shut down in an orderly fashion. + * + * @throws JMSException In case of a problem due to some internal error. + */ + public void close() throws JMSException + { + checkNotClosed(); + if (!_isClosed) + { + _isClosed = true; + _started = false; + // close all the sessions + for (SessionImpl session : _sessions) + { + session.close(); + } + // close the underlaying Qpid connection + try + { + _qpidConnection.close(); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + } + + /** + * Creates a connection consumer for this connection (optional operation). + * This is an expert facility for App server integration. + * + * @param destination The destination to access. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @param sessionPool The session pool to associate with this connection consumer. + * @param maxMessages The maximum number of messages that can be assigned to a server session at one time. + * @return Null for the moment. + * @throws JMSException In case of a problem due to some internal error. + */ + public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws + JMSException + { + checkNotClosed(); + return null; + } + + /** + * Create a durable connection consumer for this connection (optional operation). + * + * @param topic The topic to access. + * @param subscriptionName Durable subscription name. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @param sessionPool The server session pool to associate with this durable connection consumer. + * @param maxMessages The maximum number of messages that can be assigned to a server session at one time. + * @return Null for the moment. + * @throws JMSException In case of a problem due to some internal error. + */ + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, + String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException + { + checkNotClosed(); + return null; + } + + //-------------- QueueConnection API + + /** + * Create a QueueSession. + * + * @param transacted Indicates whether the session is transacted. + * @param acknowledgeMode Indicates whether the consumer or the + * client will acknowledge any messages it receives; ignored if the session + * is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, + * <code>Session.CLIENT_ACKNOWLEDGE</code> and <code>Session.DUPS_OK_ACKNOWLEDGE</code>. + * @return A queueSession object/ + * @throws JMSException If creating a QueueSession fails due to some internal error. + */ + public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException + { + checkNotClosed(); + QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode); + // add this session to the list of handled sessions. + _sessions.add(queueSession); + return queueSession; + } + + /** + * Creates a connection consumer for this connection (optional operation). + * This is an expert facility for App server integration. + * + * @param queue The queue to access. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @param sessionPool The session pool to associate with this connection consumer. + * @param maxMessages The maximum number of messages that can be assigned to a server session at one time. + * @return Null for the moment. + * @throws JMSException In case of a problem due to some internal error. + */ + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws + JMSException + { + return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages); + } + + //-------------- TopicConnection API + /** + * Create a TopicSession. + * + * @param transacted Indicates whether the session is transacted + * @param acknowledgeMode Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>, and + * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. + * @return a newly created topic session + * @throws JMSException If creating the session fails due to some internal error. + */ + public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException + { + checkNotClosed(); + TopicSessionImpl session = new TopicSessionImpl(this, transacted, acknowledgeMode); + // add the session with this Connection's sessions + // important for when the Connection is closed. + _sessions.add(session); + return session; + } + + /** + * Creates a connection consumer for this connection (optional operation). + * This is an expert facility for App server integration. + * + * @param topic The topic to access. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @param sessionPool The session pool to associate with this connection consumer. + * @param maxMessages The maximum number of messages that can be assigned to a server session at one time. + * @return Null for the moment. + * @throws JMSException In case of a problem due to some internal error. + */ + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws + JMSException + { + return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages); + } + + //-------------- protected and private methods + + /** + * Validate that the Connection is not closed. + * <p/> + * If the Connection has been closed, throw a IllegalStateException. This behaviour is + * required by the JMS specification. + * + * @throws IllegalStateException If the session is closed. + */ + protected synchronized void checkNotClosed() throws IllegalStateException + { + if (_isClosed) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Connection has been closed. Cannot invoke any further operations."); + } + throw new javax.jms.IllegalStateException("Connection has been closed. Cannot invoke any further operations."); + } + } + + /** + * Provide access to the underlying qpid Connection. + * + * @return This JMS connection underlying Qpid Connection. + */ + protected org.apache.qpidity.Connection getQpidConnection() + { + return _qpidConnection; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionMetaDataImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionMetaDataImpl.java new file mode 100644 index 0000000000..a4b15e9758 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionMetaDataImpl.java @@ -0,0 +1,165 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpid.common.QpidProperties; + +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; +import java.util.Enumeration; + +/** + * Implements javax.jms.ConnectionMetaData + * A ConnectionMetaDataImpl provides information describing the JMS <code>Connection</code>. + */ +public class ConnectionMetaDataImpl implements ConnectionMetaData +{ + + /** + * A singleton instance. + */ + static ConnectionMetaDataImpl _singleton = new ConnectionMetaDataImpl(); + + // ------------------------ The metadata + // JMS major version + private static final int JMS_MAJOR_VERSION = 1; + // JMS minor version + private static final int JMS_MINOR_VERSION = 1; + // JMS version + private static final String JMS_VERSION = "1.1"; + // Provider name + private static final String PROVIDER_NAME = "Apache " + QpidProperties.getProductName(); + // Provider major version + private static final int PROVIDER_MAJOR_VERSION = 0; + // Provider minor version + private static final int PROVIDER_MINOR_VERSION = 10; + // Provider version + private static final String PROVIDER_VERSION = QpidProperties.getProductName() + " (Client: [" + QpidProperties.getBuildVersion() + "] ; Protocol: [ 0.10 ] )"; + + /** + * Prevent instantiation. + */ + private ConnectionMetaDataImpl() + { + } + + /** + * Get the singleton instance of ConnectionMetaDataImpl. + * + * @return the singleton instance of ConnectionMetaDataImpl. + */ + public static ConnectionMetaDataImpl getInstance() + { + return _singleton; + } + + //-- Connection MetaData API + + /** + * Gets the JMS API version. + * + * @return the JMS API version + * @throws JMSException Never + */ + public String getJMSVersion() throws JMSException + { + return JMS_VERSION; + } + + + /** + * Gets the JMS major version number. + * + * @return the JMS API major version number + * @throws JMSException Never + */ + public int getJMSMajorVersion() throws JMSException + { + return JMS_MAJOR_VERSION; + } + + + /** + * Gets the JMS minor version number. + * + * @return the JMS API minor version number + * @throws JMSException Never + */ + public int getJMSMinorVersion() throws JMSException + { + return JMS_MINOR_VERSION; + } + + + /** + * Gets Qpid name. + * + * @return Qpid name + * @throws JMSException Never + */ + public String getJMSProviderName() throws JMSException + { + return PROVIDER_NAME; + } + + /** + * Gets Qpid version. + * + * @return Qpid version + * @throws JMSException Never + */ + public String getProviderVersion() throws JMSException + { + return PROVIDER_VERSION; + // TODO: We certainly can dynamically get the server version. + } + + /** + * Gets Qpid major version number. + * + * @return Qpid major version number + * @throws JMSException Never + */ + public int getProviderMajorVersion() throws JMSException + { + return PROVIDER_MAJOR_VERSION; + } + + /** + * Gets Qpid minor version number. + * + * @return Qpid minor version number + * @throws JMSException Never + */ + public int getProviderMinorVersion() throws JMSException + { + return PROVIDER_MINOR_VERSION; + } + + /** + * Gets an enumeration of the JMSX property names. + * + * @return an Enumeration of JMSX property names + * @throws JMSException if cannot retrieve metadata due to some internal error. + */ + public Enumeration getJMSXPropertyNames() throws JMSException + { + return CustomJMSXProperty.asEnumeration(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpidity/jms/CustomJMSXProperty.java new file mode 100644 index 0000000000..8e6d5ef038 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/CustomJMSXProperty.java @@ -0,0 +1,47 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import java.util.Enumeration; +import java.util.ArrayList; +import java.util.Collections; + +public enum CustomJMSXProperty +{ + JMS_AMQP_NULL, + JMS_QPID_DESTTYPE, + JMSXGroupID, + JMSXGroupSeq; + + private static Enumeration _names; + + public static synchronized Enumeration asEnumeration() + { + if (_names == null) + { + CustomJMSXProperty[] properties = values(); + ArrayList<String> nameList = new ArrayList<String>(properties.length); + for (CustomJMSXProperty property : properties) + { + nameList.add(property.toString()); + } + _names = Collections.enumeration(nameList); + } + return _names; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java new file mode 100644 index 0000000000..1964d4e525 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java @@ -0,0 +1,142 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.url.BindingURL; + +import javax.jms.Destination; + +/** + * Implementation of the JMS Destination interface + */ +public class DestinationImpl implements Destination +{ + /** + * The destination's name + */ + protected String _name = null; + + /** + * The session used to create this destination + */ + protected SessionImpl _session; + + /** + * The excahnge name + */ + protected String _exchangeName; + + /** + * The excahnge class + */ + protected String _exchangeClass; + + /** + * The queu name + */ + protected String _queueName; + + //--- Constructor + /** + * Create a new DestinationImpl with a given name. + * + * @param name The name of this destination. + * @param session The session used to create this destination. + * @throws QpidException If the destiantion name is not valid + */ + protected DestinationImpl(SessionImpl session, String name) throws QpidException + { + _session = session; + _name = name; + } + + /** + * Create a destiantion from a binding URL + * + * @param session The session used to create this queue. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected DestinationImpl(SessionImpl session, BindingURL binding) throws QpidException + { + _session = session; + _exchangeName = binding.getExchangeName(); + _exchangeClass = binding.getExchangeClass(); + _name = binding.getDestinationName(); + // _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); + boolean isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); + boolean isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); + _queueName = binding.getQueueName(); + // create this exchange + _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeClass, null, null, + isDurable ? Option.DURABLE : Option.NO_OPTION, + isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION); + } + + //---- Getters and Setters + + /** + * Gets the name of this destination. + * + * @return The destination name. + */ + public String getName() + { + return _name; + } + + /** + * set the destination name + * <p> This name is not verified until producing or consuming messages for that destination. + * + * @param name The destination name. + */ + public void setName(String name) + { + _name = name; + } + + /** + * Overrides Object.toString(); + * + * @return Stringified destination representation. + */ + public String toString() + { + return _name; + } + + // getter methods + public String getQpidQueueName() + { + return _queueName; + } + + public String getExchangeName() + { + return _exchangeName; + } + + public String getExchangeClass() + { + return _exchangeClass; + } +} + diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java new file mode 100644 index 0000000000..6174fa9da9 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java @@ -0,0 +1,50 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.JMSException; + +/** + * Helper class for handling exceptions + */ +public class ExceptionHelper +{ + static public JMSException convertQpidExceptionToJMSException(Exception exception) + { + JMSException jmsException = null; + if (!(exception instanceof JMSException)) + { + if (exception instanceof QpidException) + { + jmsException = new JMSException(exception.getMessage(), ((QpidException) exception).getErrorCode()); + } + else + { + jmsException = new JMSException(exception.getMessage()); + } + jmsException.setLinkedException(exception); + } + else + { + jmsException = (JMSException) exception; + } + return jmsException; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java new file mode 100644 index 0000000000..c28767d699 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageActor.java @@ -0,0 +1,182 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.QpidException; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; + +/** + * MessageActor is the superclass for MessageProducerImpl and MessageProducerImpl. + */ +public abstract class MessageActor +{ + /** + * Used for debugging. + */ + protected static final Logger _logger = LoggerFactory.getLogger(MessageActor.class); + + /** + * Indicates whether this MessageActor is closed. + */ + protected boolean _isClosed = false; + + /** + * This messageActor's session + */ + private SessionImpl _session; + + /** + * The JMS destination this actor is set for. + */ + DestinationImpl _destination; + + /** + * Indicates that this actor is stopped + */ + protected boolean _isStopped; + + /** + * The ID of this actor for the session. + */ + private String _messageActorID; + + //-- Constructor + + //TODO define the parameters + + protected MessageActor() + { + + } + + protected MessageActor(SessionImpl session, DestinationImpl destination) + { + _session = session; + _destination = destination; + } + + //--- public methods (part of the jms public API) + /** + * Closes the MessageActor and deregister it from its session. + * + * @throws JMSException if the MessaeActor cannot be closed due to some internal error. + */ + public void close() throws JMSException + { + if (!_isClosed) + { + closeMessageActor(); + // notify the session that this message actor is closing + _session.closeMessageActor(this); + } + } + + //-- protected methods + + /** + * Stop this message actor + * + * @throws Exception If the consumer cannot be stopped due to some internal error. + */ + protected void stop() throws Exception + { + _isStopped = true; + } + + /** + * Start this message Actor + * + * @throws Exception If the consumer cannot be started due to some internal error. + */ + protected void start() throws Exception + { + + _isStopped = false; + + } + + /** + * Check if this MessageActor is not closed. + * <p> If the MessageActor is closed, throw a javax.jms.IllegalStateException. + * <p> The method is not synchronized, since MessageProducers can only be used by a single thread. + * + * @throws IllegalStateException if the MessageActor is closed + */ + protected void checkNotClosed() throws IllegalStateException + { + if (_isClosed || _session == null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Actor " + this + " is already closed"); + } + throw new IllegalStateException("Actor " + this + " is already closed"); + } + _session.checkNotClosed(); + } + + /** + * Closes a MessageActor. + * <p> This method is invoked when the session is closing or when this + * messageActor is closing. + * + * @throws JMSException If the MessaeActor cannot be closed due to some internal error. + */ + protected void closeMessageActor() throws JMSException + { + if (!_isClosed) + { + try + { + // cancle this destination + getSession().getQpidSession().messageCancel(getMessageActorID()); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + _isClosed = true; + } + } + + /** + * Get the associated session object. + * + * @return This Actor's Session. + */ + protected SessionImpl getSession() + { + return _session; + } + + /** + * Get the ID of this actor within its session. + * + * @return This actor ID. + */ + protected String getMessageActorID() + { + return _messageActorID; + } + + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java new file mode 100644 index 0000000000..07f274381f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -0,0 +1,613 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.jms.message.QpidMessage; +import org.apache.qpidity.impl.MessagePartListenerAdapter; +import org.apache.qpidity.MessagePartListener; +import org.apache.qpidity.Range; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.filter.MessageFilter; +import org.apache.qpidity.filter.JMSSelectorFilter; +import org.apache.qpidity.exchange.ExchangeDefaults; + +import javax.jms.*; + +/** + * Implementation of JMS message consumer + */ +public class MessageConsumerImpl extends MessageActor implements MessageConsumer +{ + // we can receive up to 100 messages for an asynchronous listener + public static final int MAX_MESSAGE_TRANSFERRED = 100; + + /** + * This MessageConsumer's messageselector. + */ + private String _messageSelector = null; + + /** + * The message selector filter associated with this consumer message selector + */ + private MessageFilter _filter = null; + + /** + * NoLocal + * If true, and the destination is a topic then inhibits the delivery of messages published + * by its own connection. The behavior for NoLocal is not specified if the destination is a queue. + */ + protected boolean _noLocal; + + /** + * The subscription name + */ + protected String _subscriptionName; + + /** + * Indicates whether this consumer receives pre-acquired messages + */ + private boolean _preAcquire = true; + + /** + * A MessagePartListener set up for this consumer. + */ + private MessageListener _messageListener; + + /** + * The synchronous message just delivered + */ + private QpidMessage _incomingMessage; + + /** + * A lcok on the syncrhonous message + */ + private final Object _incomingMessageLock = new Object(); + + /** + * Indicates that this consumer is receiving a synch message + */ + private boolean _isReceiving = false; + + /** + * Indicates that a nowait is receiving a message. + */ + private boolean _isNoWaitIsReceiving = false; + + /** + * Number of mesages received asynchronously + * Nether exceed MAX_MESSAGE_TRANSFERRED + */ + private int _messageAsyncrhonouslyReceived = 0; + + //----- Constructors + /** + * Create a new MessageProducerImpl. + * + * @param session The session from which the MessageProducerImpl is instantiated + * @param destination The default destination for this MessageProducerImpl + * @param messageSelector The message selector for this QueueReceiverImpl. + * @param noLocal If true inhibits the delivery of messages published by its own connection. + * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber. + * If this value is null, a non-durable subscription is created. + * @throws Exception If the MessageProducerImpl cannot be created due to some internal error. + */ + protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector, + boolean noLocal, String subscriptionName) throws Exception + { + super(session, destination); + if (messageSelector != null) + { + _messageSelector = messageSelector; + _filter = new JMSSelectorFilter(messageSelector); + } + _noLocal = noLocal; + _subscriptionName = subscriptionName; + _isStopped = getSession().isStopped(); + // let's create a message part assembler + /** + * A Qpid message listener that pushes messages to this consumer session when this consumer is + * asynchronous or directly to this consumer when it is synchronously accessed. + */ + MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this)); + + if (destination instanceof Queue) + { + // this is a queue we expect that this queue exists + getSession().getQpidSession() + .messageSubscribe(destination.getName(), getMessageActorID(), + org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED, + // When the message selctor is set we do not acquire the messages + _messageSelector != null ? org.apache.qpidity.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.Session.ACQUIRE_MODE_PRE_ACQUIRE, + messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION); + if (_messageSelector != null) + { + _preAcquire = false; + } + } + else + { + // this is a topic we need to create a temporary queue for this consumer + // unless this is a durable subscriber + String queueName; + if (subscriptionName != null) + { + // this ia a durable subscriber + // create a persistent queue for this subscriber + queueName = "topic-" + subscriptionName; + getSession().getQpidSession() + .queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE); + } + else + { + // this is a non durable subscriber + // create a temporary queue + queueName = "topic-" + getMessageActorID(); + getSession().getQpidSession() + .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE); + } + // bind this queue with the topic exchange + getSession().getQpidSession() + .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getName(), null); + // subscribe to this topic + getSession().getQpidSession() + .messageSubscribe(queueName, getMessageActorID(), + org.apache.qpidity.Session.CONFIRM_MODE_NOT_REQUIRED, + // We always acquire the messages + org.apache.qpidity.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, + _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, + // Request exclusive subscription access, meaning only this subscription + // can access the queue. + Option.EXCLUSIVE); + + } + // set the flow mode + getSession().getQpidSession() + .messageFlowMode(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_MODE_CREDIT); + } + + //----- Message consumer API + /** + * Gets this MessageConsumer's message selector. + * + * @return This MessageConsumer's message selector, or null if no + * message selector exists for the message consumer (that is, if + * the message selector was not set or was set to null or the + * empty string) + * @throws JMSException if getting the message selector fails due to some internal error. + */ + public String getMessageSelector() throws JMSException + { + checkNotClosed(); + return _messageSelector; + } + + /** + * Gets this MessageConsumer's <CODE>MessagePartListener</CODE>. + * + * @return The listener for the MessageConsumer, or null if no listener is set + * @throws JMSException if getting the message listener fails due to some internal error. + */ + public MessageListener getMessageListener() throws JMSException + { + checkNotClosed(); + return _messageListener; + } + + /** + * Sets the MessageConsumer's <CODE>MessagePartListener</CODE>. + * <p> The JMS specification says: + * <P>Setting the message listener to null is the equivalent of + * unsetting the message listener for the message consumer. + * <P>The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> + * while messages are being consumed by an existing listener + * or the consumer is being used to consume messages synchronously + * is undefined. + * + * @param messageListener The listener to which the messages are to be delivered + * @throws JMSException If setting the message listener fails due to some internal error. + */ + public synchronized void setMessageListener(MessageListener messageListener) throws JMSException + { + // this method is synchronized as onMessage also access _messagelistener + // onMessage, getMessageListener and this method are the only synchronized methods + checkNotClosed(); + try + { + _messageListener = messageListener; + if (messageListener != null) + { + resetAsynchMessageReceived(); + } + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Contact the broker and ask for the delivery of MAX_MESSAGE_TRANSFERRED messages + * + * @throws QpidException If there is a communication error + */ + private void resetAsynchMessageReceived() throws QpidException + { + if (!_isStopped && _messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED) + { + getSession().getQpidSession().messageStop(getMessageActorID()); + } + _messageAsyncrhonouslyReceived = 0; + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, + MAX_MESSAGE_TRANSFERRED); + } + + /** + * Receive the next message produced for this message consumer. + * <P>This call blocks indefinitely until a message is produced or until this message consumer is closed. + * + * @return The next message produced for this message consumer, or + * null if this message consumer is concurrently closed + * @throws JMSException If receiving the next message fails due to some internal error. + */ + public Message receive() throws JMSException + { + return receive(0); + } + + /** + * Receive the next message that arrives within the specified timeout interval. + * <p> This call blocks until a message arrives, the timeout expires, or this message consumer + * is closed. + * <p> A timeout of zero never expires, and the call blocks indefinitely. + * <p> A timeout less than 0 throws a JMSException. + * + * @param timeout The timeout value (in milliseconds) + * @return The next message that arrives within the specified timeout interval. + * @throws JMSException If receiving the next message fails due to some internal error. + */ + public Message receive(long timeout) throws JMSException + { + if (timeout < 0) + { + throw new JMSException("Invalid timeout value: " + timeout); + } + Message result; + try + { + result = internalReceive(timeout); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; + } + + /** + * Receive the next message if one is immediately available. + * + * @return the next message or null if one is not available. + * @throws JMSException If receiving the next message fails due to some internal error. + */ + public Message receiveNoWait() throws JMSException + { + Message result; + try + { + result = internalReceive(-1); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; + } + + // not public methods + + /** + * Receive a synchronous message + * <p> This call blocks until a message arrives, the timeout expires, or this message consumer + * is closed. + * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer + * is closed) + * <p> A timeout less than 0 returns the next message or null if one is not available. + * + * @param timeout The timeout value (in milliseconds) + * @return the next message or null if one is not available. + * @throws Exception If receiving the next message fails due to some internal error. + */ + private Message internalReceive(long timeout) throws Exception + { + checkNotClosed(); + if (_messageListener != null) + { + throw new javax.jms.IllegalStateException("A listener has already been set."); + } + + Message result = null; + synchronized (_incomingMessageLock) + { + // This indicate to the delivery thread to deliver the message to this consumer + // as it can happens that a message is delivered after a receive operation as returned. + _isReceiving = true; + boolean received = false; + if (!_isStopped) + { + // if this consumer is stopped then this will be call when starting + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + received = getSession().getQpidSession().messageFlush(getMessageActorID()); + } + if (!received && timeout < 0) + { + // this is a nowait and we havent received a message then we must immediatly return + result = null; + } + else + { + // right we need to let onMessage know that a nowait is potentially waiting for a message + if (timeout < 0) + { + _isNoWaitIsReceiving = true; + } + while (_incomingMessage == null && !_isClosed) + { + try + { + _incomingMessageLock.wait(timeout); + } + catch (InterruptedException e) + { + // do nothing + } + } + if (_incomingMessage != null) + { + result = _incomingMessage.getJMSMessage(); + // tell the session that a message is inprocess + getSession().preProcessMessage(_incomingMessage); + // tell the session to acknowledge this message (if required) + getSession().acknowledgeMessage(_incomingMessage); + } + _incomingMessage = null; + } + // We now release any message received for this consumer + _isReceiving = false; + _isNoWaitIsReceiving = false; + } + return result; + } + + /** + * Stop the delivery of messages to this consumer. + * <p>For asynchronous receiver, this operation blocks until the message listener + * finishes processing the current message, + * + * @throws Exception If the consumer cannot be stopped due to some internal error. + */ + protected void stop() throws Exception + { + getSession().getQpidSession().messageStop(getMessageActorID()); + _isStopped = true; + } + + /** + * Start the delivery of messages to this consumer. + * + * @throws Exception If the consumer cannot be started due to some internal error. + */ + protected void start() throws Exception + { + synchronized (_incomingMessageLock) + { + _isStopped = false; + if (_isReceiving) + { + // there is a synch call waiting for a message to be delivered + // so tell the broker to deliver a message + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + getSession().getQpidSession().messageFlush(getMessageActorID()); + } + } + } + + /** + * Deliver a message to this consumer. + * + * @param message The message delivered to this consumer. + */ + protected synchronized void onMessage(QpidMessage message) + { + try + { + // if there is a message selector then we need to evaluate it. + boolean messageOk = true; + if (_messageSelector != null) + { + messageOk = _filter.matches(message.getJMSMessage()); + } + if (!messageOk && _preAcquire) + { + // this is the case for topics + // We need to ack this message + acknowledgeMessage(message); + } + // now we need to acquire this message if needed + // this is the case of queue with a message selector set + if (!_preAcquire && messageOk) + { + messageOk = acquireMessage(message); + } + + // if this consumer is synchronous then set the current message and + // notify the waiting thread + if (_messageListener == null) + { + synchronized (_incomingMessageLock) + { + if (messageOk) + { + // we have received a proper message that we can deliver + if (_isReceiving) + { + _incomingMessage = message; + _incomingMessageLock.notify(); + } + else + { + // this message has been received after a received as returned + // we need to release it + releaseMessage(message); + } + } + else + { + // oups the message did not match the selector or we did not manage to acquire it + // If the receiver is still waiting for a message + // then we need to request a new one from the server + if (_isReceiving) + { + getSession().getQpidSession() + .messageFlow(getMessageActorID(), + org.apache.qpidity.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + boolean received = getSession().getQpidSession().messageFlush(getMessageActorID()); + if (!received && _isNoWaitIsReceiving) + { + // Right a message nowait is waiting for a message + // but no one can be delivered it then need to return + _incomingMessageLock.notify(); + } + } + } + } + } + else + { + _messageAsyncrhonouslyReceived++; + if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED) + { + // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages + resetAsynchMessageReceived(); + } + // only deliver the message if it is valid + if (messageOk) + { + // This is an asynchronous message + // tell the session that a message is in process + getSession().preProcessMessage(message); + // If the session is transacted we need to ack the message first + // This is because a message is associated with its tx only when acked + if (getSession().getTransacted()) + { + getSession().acknowledgeMessage(message); + } + // The JMS specs says: + /* The result of a listener throwing a RuntimeException depends on the session?s + * acknowledgment mode. + ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message + * will be immediately redelivered. The number of times a JMS provider will + * redeliver the same message before giving up is provider-dependent. + ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. + * --- Transacted Session - the next message for the listener is delivered. + * + * The number of time we try redelivering the message is 0 + **/ + try + { + _messageListener.onMessage(message.getJMSMessage()); + } + catch (RuntimeException re) + { + // do nothing as this message will not be redelivered + } + // If the session has been recovered we then need to redelivered this message + if (getSession().isInRecovery()) + { + releaseMessage(message); + } + else if (!getSession().getTransacted()) + { + // Tell the jms Session to ack this message if required + getSession().acknowledgeMessage(message); + } + } + } + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage()); + } + } + + /** + * Release a message + * + * @param message The message to be released + * @throws QpidException If the message cannot be released due to some internal error. + */ + private void releaseMessage(QpidMessage message) throws QpidException + { + if (_preAcquire) + { + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + getSession().getQpidSession().messageRelease(range); + } + } + + /** + * Acquire a message + * + * @param message The message to be acquired + * @return true if the message has been acquired, false otherwise. + * @throws QpidException If the message cannot be acquired due to some internal error. + */ + private boolean acquireMessage(QpidMessage message) throws QpidException + { + boolean result = false; + if (!_preAcquire) + { + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + + Range<Long>[] rangeResult = getSession().getQpidSession().messageAcquire(range); + if (rangeResult.length > 0) + { + result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0; + } + } + return result; + } + + /** + * Acknowledge a message + * + * @param message The message to be acknowledged + * @throws QpidException If the message cannot be acquired due to some internal error. + */ + private void acknowledgeMessage(QpidMessage message) throws QpidException + { + if (!_preAcquire) + { + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + getSession().getQpidSession().messageAcknowledge(range); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java new file mode 100644 index 0000000000..29f867d6b8 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java @@ -0,0 +1,316 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.*; + +/** + * Implements MessageProducer + */ +public class MessageProducerImpl extends MessageActor implements MessageProducer +{ + /** + * If true, messages will not get a timestamp. + */ + private boolean _disableTimestamps = false; + + /** + * Priority of messages created by this producer. + */ + private int _messagePriority = Message.DEFAULT_PRIORITY; + + /** + * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + */ + private long _timeToLive; + + /** + * Delivery mode used for this producer. + */ + private int _deliveryMode = DeliveryMode.PERSISTENT; + + /** + * Speicify whether the messageID is disable + */ + private boolean _disableMessageId = false; + + //-- constructors + public MessageProducerImpl(SessionImpl session, DestinationImpl destination) + { + super(session, destination); + } + + //--- Interface javax.jms.MessageProducer + /** + * Sets whether message IDs are disabled. + * + * @param value Specify whether the MessageID must be disabled + * @throws JMSException If disabling messageID fails due to some internal error. + */ + public void setDisableMessageID(boolean value) throws JMSException + { + checkNotClosed(); + _disableMessageId = value; + } + + /** + * Gets an indication of whether message IDs are disabled. + * + * @return true is messageID is disabled, false otherwise + * @throws JMSException If getting whether messagID is disabled fails due to some internal error. + */ + public boolean getDisableMessageID() throws JMSException + { + checkNotClosed(); + return _disableMessageId; + } + + /** + * Sets whether message timestamps are disabled. + * <P> JMS spec says: + * <p> Since timestamps take some effort to create and increase a + * message's size, some JMS providers may be able to optimize message + * overhead if they are given a hint that the timestamp is not used by an + * application.... + * these messages must have the timestamp set to zero; if the provider + * ignores the hint, the timestamp must be set to its normal value. + * <p>Message timestamps are enabled by default. + * + * @param value Indicates if message timestamps are disabled + * @throws JMSException if disabling the timestamps fails due to some internal error. + */ + public void setDisableMessageTimestamp(boolean value) throws JMSException + { + checkNotClosed(); + _disableTimestamps = value; + } + + /** + * Gets an indication of whether message timestamps are disabled. + * + * @return an indication of whether message timestamps are disabled + * @throws JMSException if getting whether timestamps are disabled fails due to some internal error. + */ + public boolean getDisableMessageTimestamp() throws JMSException + { + checkNotClosed(); + return _disableTimestamps; + } + + /** + * Sets the producer's default delivery mode. + * <p> JMS specification says: + * <p>Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default. + * + * @param deliveryMode The message delivery mode for this message producer; legal + * values are {@link DeliveryMode#NON_PERSISTENT} + * and {@link DeliveryMode#PERSISTENT}. + * @throws JMSException if setting the delivery mode fails due to some internal error. + */ + public void setDeliveryMode(int deliveryMode) throws JMSException + { + checkNotClosed(); + if ((deliveryMode != DeliveryMode.NON_PERSISTENT) && (deliveryMode != DeliveryMode.PERSISTENT)) + { + throw new JMSException( + "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal"); + } + _deliveryMode = deliveryMode; + } + + /** + * Gets the producer's delivery mode. + * + * @return The message delivery mode for this message producer + * @throws JMSException If getting the delivery mode fails due to some internal error. + */ + public int getDeliveryMode() throws JMSException + { + checkNotClosed(); + return _deliveryMode; + } + + /** + * Sets the producer's message priority. + * <p> The jms spec says: + * <p> The JMS API defines ten levels of priority value, with 0 as the + * lowest priority and 9 as the highest. Clients should consider priorities + * 0-4 as gradations of normal priority and priorities 5-9 as gradations + * of expedited priority. + * <p> Priority is set to 4 by default. + * + * @param priority The message priority for this message producer; must be a value between 0 and 9 + * @throws JMSException if setting this producer priority fails due to some internal error. + */ + public void setPriority(int priority) throws JMSException + { + checkNotClosed(); + if ((priority < 0) || (priority > 9)) + { + throw new IllegalArgumentException( + "Priority of " + priority + " is illegal. Value must be in range 0 to 9"); + } + _messagePriority = priority; + } + + /** + * Gets the producer's message priority. + * + * @return The message priority for this message producer. + * @throws JMSException If getting this producer message priority fails due to some internal error. + */ + public int getPriority() throws JMSException + { + checkNotClosed(); + return _messagePriority; + } + + /** + * Sets the default length of time in milliseconds from its dispatch time + * that a produced message should be retained by the message system. + * <p> The JMS spec says that time to live must be set to zero by default. + * + * @param timeToLive The message time to live in milliseconds; zero is unlimited + * @throws JMSException If setting the default time to live fails due to some internal error. + */ + public void setTimeToLive(long timeToLive) throws JMSException + { + checkNotClosed(); + if (timeToLive < 0) + { + throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); + } + _timeToLive = timeToLive; + } + + /** + * Gets the default length of time in milliseconds from its dispatch time + * that a produced message should be retained by the message system. + * + * @return The default message time to live in milliseconds; zero is unlimited + * @throws JMSException if getting the default time to live fails due to some internal error. + * @see javax.jms.MessageProducer#setTimeToLive + */ + public long getTimeToLive() throws JMSException + { + checkNotClosed(); + return _timeToLive; + } + + /** + * Gets the destination associated with this producer. + * + * @return This producer's destination. + * @throws JMSException If getting the destination for this producer fails + * due to some internal error. + */ + public Destination getDestination() throws JMSException + { + checkNotClosed(); + return _destination; + } + + /** + * Sends a message using the producer's default delivery mode, priority, destination + * and time to live. + * + * @param message the message to be sent + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If this producer destination is invalid. + * @throws java.lang.UnsupportedOperationException + * If a client uses this method with a producer that did + * not specify a destination at creation time. + */ + public void send(Message message) throws JMSException + { + send(message, _deliveryMode, _messagePriority, _timeToLive); + } + + /** + * Sends a message to this producer default destination, specifying delivery mode, + * priority, and time to live. + * + * @param message The message to send + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If this producer's destination is invalid. + * @throws java.lang.UnsupportedOperationException + * If a client uses this method with a producer that did + * not specify a destination at creation time. + */ + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException + { + send(_destination, message, deliveryMode, priority, timeToLive); + } + + /** + * Sends a message to a specified destination using this producer's default + * delivery mode, priority and time to live. + * <p/> + * <P>Typically, a message producer is assigned a destination at creation + * time; however, the JMS API also supports unidentified message producers, + * which require that the destination be supplied every time a message is + * sent. + * + * @param destination The destination to send this message to + * @param message The message to send + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If an invalid destination is specified. + */ + public void send(Destination destination, Message message) throws JMSException + { + send(destination, message, _deliveryMode, _messagePriority, _timeToLive); + } + + /** + * Sends a message to a destination specifying delivery mode, priority and time to live. + * + * @param destination The destination to send this message to. + * @param message The message to be sent. + * @param deliveryMode The delivery mode to use. + * @param priority The priority for this message. + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If an invalid destination is specified. + */ + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkNotClosed(); + getSession().checkDestination(destination); + // Do not allow negative timeToLive values + if (timeToLive < 0) + { + throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); + } + // check that the message is not a foreign one + + // set the properties + + // + + // dispatch it + // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java new file mode 100644 index 0000000000..80eda8f7ce --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidExceptionListenerImpl.java @@ -0,0 +1,51 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.JMSException; + +/** + * An exception listner + */ +public class QpidExceptionListenerImpl //implements ExceptionListener +{ + private javax.jms.ExceptionListener _jmsExceptionListener; + + public QpidExceptionListenerImpl() + { + } + + void setJMSExceptionListner(javax.jms.ExceptionListener jmsExceptionListener) + { + _jmsExceptionListener = jmsExceptionListener; + } + //----- ExceptionListener API + + public void onException(QpidException exception) + { + // convert this exception in a JMS exception + JMSException jmsException = ExceptionHelper.convertQpidExceptionToJMSException(exception); + // propagate to the jms exception listener + if (_jmsExceptionListener != null) + { + _jmsExceptionListener.onException(jmsException); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java new file mode 100644 index 0000000000..35935d894c --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java @@ -0,0 +1,82 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.MessageListener; +import org.apache.qpidity.jms.message.QpidMessage; +import org.apache.qpidity.api.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> When asynchronous, upon receive of a message this listener delegate the dispatching to its session. + * This is for guarantying that asynch messages are sequentially processed within their session. + * <p> when used synchonously, messages are dispatched to the receiver itself. + */ +public class QpidMessageListener implements MessageListener +{ + /** + * Used for debugging. + */ + private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); + + /** + * This message listener consumer + */ + MessageConsumerImpl _consumer = null; + + //---- constructor + /** + * Create a message listener wrapper for a given consumer + * + * @param consumer The consumer of this listener + */ + public QpidMessageListener(MessageConsumerImpl consumer) + { + _consumer = consumer; + } + + //---- org.apache.qpidity.MessagePartListener API + /** + * Deliver a message to the listener. + * + * @param message The message delivered to the listner. + */ + public void onMessage(Message message) + { + try + { + //convert this message into a JMS one + QpidMessage jmsMessage = null; // todo + // if consumer is asynchronous then send this message to its session. + if( _consumer.getMessageListener() != null ) + { + _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); + } + else + { + // deliver this message to the consumer itself + _consumer.onMessage(jmsMessage); + } + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage()); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java new file mode 100644 index 0000000000..ad7a777c37 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java @@ -0,0 +1,86 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.QueueBrowser; +import javax.jms.JMSException; +import javax.jms.Queue; +import java.util.Enumeration; + +/** + * Implementation of the JMS QueueBrowser interface + */ +public class QueueBrowserImpl extends MessageActor implements QueueBrowser +{ + /** + * The browsers MessageSelector. + */ + private String _messageSelector = null; + + //--- constructor + + /** + * Create a QueueBrowser for a specific queue and a given message selector. + * + * @param session The session of this browser. + * @param queue The queue name for this browser + * @param messageSelector only messages with properties matching the message selector expression are delivered. + * @throws JMSException In case of internal problem when creating this browser. + */ + protected QueueBrowserImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException + { + super(session, (DestinationImpl) queue); + _messageSelector = messageSelector; + //-- TODO: Create the QPid browser + } + + //--- javax.jms.QueueBrowser API + /** + * Get an enumeration for browsing the current queue messages in the order they would be received. + * + * @return An enumeration for browsing the messages + * @throws JMSException If getting the enumeration for this browser fails due to some internal error. + */ + public Enumeration getEnumeration() throws JMSException + { + // TODO + return null; + } + + /** + * Get the queue associated with this queue browser. + * + * @return The queue associated with this queue browser. + * @throws JMSException If getting the queue associated with this browser failts due to some internal error. + */ + public Queue getQueue() throws JMSException + { + return (Queue) _destination; + } + + /** + * Get this queue browser's message selector expression. + * + * @return This queue browser's message selector, or null if no message selector exists. + * @throws JMSException if getting the message selector for this browser fails due to some internal error. + */ + public String getMessageSelector() throws JMSException + { + return _messageSelector; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java new file mode 100644 index 0000000000..b95790486a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java @@ -0,0 +1,75 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.url.BindingURL; +import org.apache.qpidity.exchange.ExchangeDefaults; + +import javax.jms.Queue; +import javax.jms.JMSException; + +/** + * Implementation of the JMS Queue interface + */ +public class QueueImpl extends DestinationImpl implements Queue +{ + + //--- Constructor + /** + * Create a new QueueImpl with a given name. + * + * @param name The name of this queue. + * @param session The session used to create this queue. + * @throws QpidException If the queue name is not valid + */ + protected QueueImpl(SessionImpl session, String name) throws QpidException + { + super(session, name); + _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + _queueName = name; + // check that this queue exist on the server + // As pasive is set the server will not create the queue. + session.getQpidSession().queueDeclare(name, null, null, Option.PASSIVE); + } + + /** + * Create a destiantion from a binding URL + * + * @param session The session used to create this queue. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException + { + super(session, binding); + } + + //---- Interface javax.jms.Queue + /** + * Gets the name of this queue. + * + * @return This queue's name. + */ + public String getQueueName() throws JMSException + { + return super.getName(); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java new file mode 100644 index 0000000000..7ef0f151ae --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueReceiverImpl.java @@ -0,0 +1,55 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.QueueReceiver; +import javax.jms.JMSException; +import javax.jms.Queue; + +/** + * Implements javax.jms.QueueReceiver + */ +public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver +{ + //--- Constructor + /** + * create a new QueueReceiverImpl. + * + * @param session The session from which the QueueReceiverImpl is instantiated. + * @param queue The default queue for this QueueReceiverImpl. + * @param messageSelector the message selector for this QueueReceiverImpl. + * @throws Exception If the QueueReceiverImpl cannot be created due to some internal error. + */ + protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws Exception + { + super(session, (DestinationImpl) queue, messageSelector, false, null); + } + + //--- Interface QueueReceiver + /** + * Get the Queue associated with this queue receiver. + * + * @return this receiver's Queue + * @throws JMSException If getting the queue for this queue receiver fails due to some internal error. + */ + public Queue getQueue() throws JMSException + { + checkNotClosed(); + return (QueueImpl) _destination; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java new file mode 100644 index 0000000000..5ba2df1220 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueSenderImpl.java @@ -0,0 +1,131 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.QueueSender; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Message; + +/** + * Implements javax.jms.QueueSender + */ +public class QueueSenderImpl extends MessageProducerImpl implements QueueSender +{ + //--- Constructor + /** + * Create a new QueueSenderImpl. + * + * @param session the session from which the QueueSenderImpl is instantiated + * @param queue the default queue for this QueueSenderImpl + * @throws JMSException If the QueueSenderImpl cannot be created due to some internal error. + */ + protected QueueSenderImpl(SessionImpl session, QueueImpl queue) throws JMSException + { + super(session, queue); + } + + //--- Interface javax.jms.QueueSender + /** + * Get the queue associated with this QueueSender. + * + * @return This QueueSender's queue + * @throws JMSException If getting the queue for this QueueSender fails due to some internal error. + */ + public Queue getQueue() throws JMSException + { + return (Queue) getDestination(); + } + + /** + * Sends a message to the queue. Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority, + * and time to live. + * + * @param message The message to send. + * @throws JMSException if sending the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If the queue is invalid. + * @throws java.lang.UnsupportedOperationException + * If invoked on QueueSender that did not specify a queue at creation time. + */ + public void send(Message message) throws JMSException + { + super.send(message); + } + + /** + * Send a message to the queue, specifying delivery mode, priority, and time to live. + * + * @param message The message to send + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * + * @throws JMSException if sending the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If the queue is invalid. + * @throws java.lang.UnsupportedOperationException + * If invoked on QueueSender that did not specify a queue at creation time. + */ + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException + { + super.send(message, deliveryMode, priority, timeToLive); + } + + /** + * Send a message to a queue for an unidentified message producer. + * Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority, + * and time to live. + * + * @param queue The queue to send this message to + * @param message The message to send + * @throws JMSException if sending the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If the queue is invalid. + */ + public void send(Queue queue, Message message) throws JMSException + { + super.send(queue, message); + } + + /** + * Sends a message to a queue for an unidentified message producer, + * specifying delivery mode, priority and time to live. + * + * @param queue The queue to send this message to + * @param message The message to send + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException if sending the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If the queue is invalid. + */ + public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException + { + super.send(queue, message, deliveryMode, priority, timeToLive); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java new file mode 100644 index 0000000000..8ba0f7409b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueSessionImpl.java @@ -0,0 +1,152 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.*; +import javax.jms.IllegalStateException; + +/** + * Implementation of javax.jms.QueueSession + */ +public class QueueSessionImpl extends SessionImpl implements QueueSession +{ + //--- constructor + /** + * Create a JMS Session + * + * @param connection The ConnectionImpl object from which the Session is created. + * @param transacted Indicates if the session transacted. + * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to + * {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code> + * parameter is true. + * @throws javax.jms.JMSSecurityException If the user could not be authenticated. + * @throws javax.jms.JMSException In case of internal error. + */ + protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + { + super(connection, transacted, acknowledgeMode); + } + + //-- Overwritten methods + /** + * Creates a durable subscriber to the specified topic, + * + * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to. + * @param name The name used to identify this subscription. + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + throw new IllegalStateException("Cannot invoke createDurableSubscriber from QueueSession"); + } + + /** + * Create a TemporaryTopic. + * + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public TemporaryTopic createTemporaryTopic() throws JMSException + { + throw new IllegalStateException("Cannot invoke createTemporaryTopic from QueueSession"); + } + + /** + * Creates a topic identity given a Topicname. + * + * @param topicName The name of this <CODE>Topic</CODE> + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public Topic createTopic(String topicName) throws JMSException + { + throw new IllegalStateException("Cannot invoke createTopic from QueueSession"); + } + + /** + * Unsubscribes a durable subscription that has been created by a client. + * + * @param name the name used to identify this subscription + * @throws IllegalStateException Always + */ + @Override + public void unsubscribe(String name) throws JMSException + { + throw new IllegalStateException("Cannot invoke unsubscribe from QueueSession"); + } + + //--- Interface javax.jms.QueueSession + /** + * Create a QueueReceiver to receive messages from the specified queue. + * + * @param queue the <CODE>Queue</CODE> to access + * @return A QueueReceiver + * @throws JMSException If creating a receiver fails due to some internal error. + * @throws InvalidDestinationException If an invalid queue is specified. + */ + public QueueReceiver createReceiver(Queue queue) throws JMSException + { + return createReceiver(queue, null); + } + + /** + * Create a QueueReceiver to receive messages from the specified queue for a given message selector. + * + * @param queue the Queue to access + * @param messageSelector A value of null or an empty string indicates that + * there is no message selector for the message consumer. + * @return A QueueReceiver + * @throws JMSException If creating a receiver fails due to some internal error. + * @throws InvalidDestinationException If an invalid queue is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException + { + checkNotClosed(); + checkDestination(queue); + QueueReceiver receiver; + try + { + receiver = new QueueReceiverImpl(this, queue, messageSelector); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return receiver; + } + + /** + * Create a QueueSender object to send messages to the specified queue. + * + * @param queue the Queue to access, or null if this is an unidentified producer + * @return A QueueSender + * @throws JMSException If creating the sender fails due to some internal error. + * @throws InvalidDestinationException If an invalid queue is specified. + */ + public QueueSender createSender(Queue queue) throws JMSException + { + checkNotClosed(); + // we do not check the destination since unidentified producers are allowed (no default destination). + return new QueueSenderImpl(this, (QueueImpl) queue); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java new file mode 100644 index 0000000000..3d9b9b9d17 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -0,0 +1,1228 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpidity.jms.message.*; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.jms.Session; +import javax.jms.Message; +import javax.jms.MessageListener; +import java.io.Serializable; +import java.util.LinkedList; +import java.util.HashMap; +import java.util.ArrayList; + +/** + * Implementation of the JMS Session interface + */ +public class SessionImpl implements Session +{ + /** + * this session's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); + + /** + * A queue for incoming asynch messages. + */ + private final LinkedList<IncomingMessage> _incomingAsynchronousMessages = new LinkedList<IncomingMessage>(); + + //--- MessageDispatcherThread and Session locking + /** + * indicates that the MessageDispatcherThread has stopped + */ + private boolean _hasStopped = false; + + /** + * lock for the MessageDispatcherThread to wait until the session is stopped + */ + private final Object _stoppingLock = new Object(); + + /** + * lock for the stopper thread to wait on when the MessageDispatcherThread is stopping + */ + private final Object _stoppingJoin = new Object(); + + /** + * thread to dispatch messages to async consumers + */ + private MessageDispatcherThread _messageDispatcherThread = null; + //----END + + /** + * The messageActors of this session. + */ + private final HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>(); + + /** + * All the not yet acknoledged messages + */ + private final ArrayList<QpidMessage> _unacknowledgedMessages = new ArrayList<QpidMessage>(); + + /** + * Indicates whether this session is closed. + */ + private boolean _isClosed = false; + + /** + * Indicates whether this session is closing. + */ + private boolean _isClosing = false; + + /** + * Indicates whether this session is stopped. + */ + private boolean _isStopped = false; + + /** + * Used to indicate whether or not this is a transactional session. + */ + private boolean _transacted; + + /** + * Holds the sessions acknowledgement mode. + */ + private int _acknowledgeMode; + + /** + * The underlying QpidSession + */ + private org.apache.qpidity.Session _qpidSession; + + /** + * Indicates whether this session is recovering + */ + private boolean _inRecovery = false; + + /** + * This session connection + */ + private ConnectionImpl _connection; + + //--- Constructor + /** + * Create a JMS Session + * + * @param connection The ConnectionImpl object from which the Session is created. + * @param transacted Indicates if the session transacted. + * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to + * {@link Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter is true. + * @throws JMSSecurityException If the user could not be authenticated. + * @throws JMSException In case of internal error. + */ + protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + { + _connection = connection; + _transacted = transacted; + // for transacted sessions we ignore the acknowledgeMode and use GenericAckMode.SESSION_TRANSACTED + if (_transacted) + { + acknowledgeMode = Session.SESSION_TRANSACTED; + } + _acknowledgeMode = acknowledgeMode; + try + { + // create the qpid session with an expiry <= 0 so that the session does not expire + _qpidSession = _connection.getQpidConnection().createSession(0); + // set transacted if required + if (_transacted) + { + //_qpidSession.setTransacted(); + } + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // init the message dispatcher. + initMessageDispatcherThread(); + } + + //--- javax.jms.Session API + /** + * Creates a <CODE>BytesMessage</CODE> object used to send a message + * containing a stream of uninterpreted bytes. + * + * @return A BytesMessage. + * @throws JMSException If Creating a BytesMessage object fails due to some internal error. + */ + public BytesMessage createBytesMessage() throws JMSException + { + checkNotClosed(); + return new JMSBytesMessage(); + } + + /** + * Creates a <CODE>MapMessage</CODE> object used to send a self-defining set + * of name-value pairs, where names are Strings and values are primitive values. + * + * @return A MapMessage. + * @throws JMSException If Creating a MapMessage object fails due to some internal error. + */ + public MapMessage createMapMessage() throws JMSException + { + checkNotClosed(); + return new JMSMapMessage(); + } + + /** + * Creates a <code>Message</code> object that holds all the standard message header information. + * It can be sent when a message containing only header information is sufficient. + * We simply return a ByteMessage + * + * @return A Message. + * @throws JMSException If Creating a Message object fails due to some internal error. + */ + public Message createMessage() throws JMSException + { + return createBytesMessage(); + } + + /** + * Creates an <code>ObjectMessage</code> used to send a message + * that contains a serializable Java object. + * + * @return An ObjectMessage. + * @throws JMSException If Creating an ObjectMessage object fails due to some internal error. + */ + public ObjectMessage createObjectMessage() throws JMSException + { + checkNotClosed(); + return new JMSObjectMessage(); + } + + /** + * Creates an initialized <code>ObjectMessage</code> used to send a message that contains + * a serializable Java object. + * + * @param serializable The object to use to initialize this message. + * @return An initialised ObjectMessage. + * @throws JMSException If Creating an initialised ObjectMessage object fails due to some internal error. + */ + public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException + { + ObjectMessage msg = createObjectMessage(); + msg.setObject(serializable); + return msg; + } + + /** + * Creates a <code>StreamMessage</code> used to send a + * self-defining stream of primitive values in the Java programming + * language. + * + * @return A StreamMessage + * @throws JMSException If Creating an StreamMessage object fails due to some internal error. + */ + public StreamMessage createStreamMessage() throws JMSException + { + checkNotClosed(); + return new JMSStreamMessage(); + } + + /** + * Creates a <code>TextMessage</code> object used to send a message containing a String. + * + * @return A TextMessage object + * @throws JMSException If Creating an TextMessage object fails due to some internal error. + */ + public TextMessage createTextMessage() throws JMSException + { + checkNotClosed(); + return new JMSTextMessage(); + } + + /** + * Creates an initialized <code>TextMessage</code> used to send + * a message containing a String. + * + * @param text The string used to initialize this message. + * @return An initialized TextMessage + * @throws JMSException If Creating an initialised TextMessage object fails due to some internal error. + */ + public TextMessage createTextMessage(String text) throws JMSException + { + TextMessage msg = createTextMessage(); + msg.setText(text); + return msg; + } + + /** + * Indicates whether the session is in transacted mode. + * + * @return true if the session is in transacted mode + * @throws JMSException If geting the transaction mode fails due to some internal error. + */ + public boolean getTransacted() throws JMSException + { + checkNotClosed(); + return _transacted; + } + + /** + * Returns the acknowledgement mode of this session. + * <p> The acknowledgement mode is set at the time that the session is created. + * If the session is transacted, the acknowledgement mode is ignored. + * + * @return If the session is not transacted, returns the current acknowledgement mode for the session. + * else returns SESSION_TRANSACTED. + * @throws JMSException if geting the acknowledgement mode fails due to some internal error. + */ + public int getAcknowledgeMode() throws JMSException + { + checkNotClosed(); + return _acknowledgeMode; + } + + /** + * Commits all messages done in this transaction. + * + * @throws JMSException If committing the transaction fails due to some internal error. + * @throws TransactionRolledBackException If the transaction is rolled back due to some internal error during commit. + * @throws javax.jms.IllegalStateException + * If the method is not called by a transacted session. + */ + public void commit() throws JMSException + { + checkNotClosed(); + //make sure the Session is a transacted one + if (!_transacted) + { + throw new IllegalStateException("Cannot commit non-transacted session", "Session is not transacted"); + } + // commit the underlying Qpid Session + try + { + // Note: this operation makes sure that asynch message processing has returned + _qpidSession.txCommit(); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Rolls back any messages done in this transaction. + * + * @throws JMSException If rolling back the session fails due to some internal error. + * @throws javax.jms.IllegalStateException + * If the method is not called by a transacted session. + */ + public void rollback() throws JMSException + { + checkNotClosed(); + //make sure the Session is a transacted one + if (!_transacted) + { + throw new IllegalStateException("Cannot rollback non-transacted session", "Session is not transacted"); + } + // rollback the underlying Qpid Session + try + { + // Note: this operation makes sure that asynch message processing has returned + _qpidSession.txRollback(); + } + catch (org.apache.qpidity.QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Closes this session. + * <p> The JMS specification says + * <P> This call will block until a <code>receive</code> call or message + * listener in progress has completed. A blocked message consumer + * <code>receive</code> call returns <code>null</code> when this session is closed. + * <P>Closing a transacted session must roll back the transaction in progress. + * <P>This method is the only <code>Session</code> method that can be called concurrently. + * <P>Invoking any other <code>Session</code> method on a closed session + * must throw a <code>javax.jms.IllegalStateException</code>. + * <p> Closing a closed session must <I>not</I> throw an exception. + * + * @throws JMSException If closing the session fails due to some internal error. + */ + public synchronized void close() throws JMSException + { + if (!_isClosed) + { + _messageDispatcherThread.interrupt(); + if (!_isClosing) + { + _isClosing = true; + // if the session is stopped then restart it before notifying on the lock + // that will stop the sessionThread + if (_isStopped) + { + startDispatchThread(); + } + //notify the sessionThread + synchronized (_incomingAsynchronousMessages) + { + _incomingAsynchronousMessages.notifyAll(); + } + + try + { + _messageDispatcherThread.join(); + _messageDispatcherThread = null; + } + catch (InterruptedException ie) + { + /* ignore */ + } + } + // from now all the session methods will throw a IllegalStateException + _isClosed = true; + // close all the actors + closeAllMessageActors(); + _messageActors.clear(); + // We may have a thread trying to add a message + synchronized (_incomingAsynchronousMessages) + { + _incomingAsynchronousMessages.clear(); + _incomingAsynchronousMessages.notifyAll(); + } + // close the underlaying QpidSession + try + { + _qpidSession.close(); + } + catch (org.apache.qpidity.QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + } + + /** + * Stops message delivery in this session, and restarts message delivery with + * the oldest unacknowledged message. + * <p>Recovering a session causes it to take the following actions: + * <ul> + * <li>Stop message delivery. + * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". + * <li>Restart the delivery sequence including all unacknowledged messages that had been + * previously delivered. + * Redelivered messages do not have to be delivered in exactly their original delivery order. + * </ul> + * + * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. + * Not that this does not necessarily mean that the recovery has failed, but simply that it is + * not possible to tell if it has or not. + */ + public void recover() throws JMSException + { + // Ensure that the session is open. + checkNotClosed(); + // we are recovering + _inRecovery = true; + // Ensure that the session is not transacted. + if (getTransacted()) + { + throw new IllegalStateException("Session is transacted"); + } + // release all unack messages + for (QpidMessage message : _unacknowledgedMessages) + { + // release this message + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + try + { + getQpidSession().messageRelease(range); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // TODO We can be a little bit cleverer and build a set of ranges + } + } + + /** + * Returns the session's distinguished message listener (optional). + * <p>This is an expert facility used only by Application Servers. + * <p> This is an optional operation that is not yet supported + * + * @return The message listener associated with this session. + * @throws JMSException If getting the message listener fails due to an internal error. + */ + public MessageListener getMessageListener() throws JMSException + { + checkNotClosed(); + throw new java.lang.UnsupportedOperationException(); + } + + /** + * Sets the session's distinguished message listener. + * <p>This is an expert facility used only by Application Servers. + * <p> This is an optional operation that is not yet supported + * + * @param messageListener The message listener to associate with this session + * @throws JMSException If setting the message listener fails due to an internal error. + */ + public void setMessageListener(MessageListener messageListener) throws JMSException + { + checkNotClosed(); + throw new java.lang.UnsupportedOperationException(); + } + + /** + * Optional operation, intended to be used only by Application Servers, + * not by ordinary JMS clients. + * <p> This is an optional operation that is not yet supported + */ + public void run() + { + throw new java.lang.UnsupportedOperationException(); + } + + /** + * Creates a MessageProducer to send messages to the specified destination. + * + * @param destination the Destination to send messages to, or null if this is a producer + * which does not have a specified destination. + * @return A new MessageProducer + * @throws JMSException If the session fails to create a MessageProducer + * due to some internal error. + * @throws InvalidDestinationException If an invalid destination is specified. + */ + public MessageProducer createProducer(Destination destination) throws JMSException + { + checkNotClosed(); + MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination); + // register this actor with the session + _messageActors.put(producer.getMessageActorID(), producer); + return producer; + } + + /** + * Creates a MessageConsumer for the specified destination. + * + * @param destination The <code>Destination</code> to access + * @return A new MessageConsumer for the specified destination. + * @throws JMSException If the session fails to create a MessageConsumer due to some internal error. + * @throws InvalidDestinationException If an invalid destination is specified. + */ + public MessageConsumer createConsumer(Destination destination) throws JMSException + { + return createConsumer(destination, null); + } + + /** + * Creates a MessageConsumer for the specified destination, using a message selector. + * + * @param destination The <code>Destination</code> to access + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @return A new MessageConsumer for the specified destination. + * @throws JMSException If the session fails to create a MessageConsumer due to some internal error. + * @throws InvalidDestinationException If an invalid destination is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException + { + return createConsumer(destination, messageSelector, false); + } + + /** + * Creates MessageConsumer for the specified destination, using a message selector. + * <p> This method can specify whether messages published by its own connection should + * be delivered to it, if the destination is a topic. + * <p/> + * <P>In some cases, a connection may both publish and subscribe to a topic. The consumer + * NoLocal attribute allows a consumer to inhibit the delivery of messages published by its + * own connection. The default value for this attribute is False. + * + * @param destination The <code>Destination</code> to access + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @param noLocal If true, and the destination is a topic, inhibits the delivery of messages published + * by its own connection. + * @return A new MessageConsumer for the specified destination. + * @throws JMSException If the session fails to create a MessageConsumer due to some internal error. + * @throws InvalidDestinationException If an invalid destination is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException + { + checkNotClosed(); + checkDestination(destination); + MessageConsumerImpl consumer; + try + { + consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // register this actor with the session + _messageActors.put(consumer.getMessageActorID(), consumer); + return consumer; + } + + /** + * Creates a queue identity by a given name. + * <P>This facility is provided for the rare cases where clients need to + * dynamically manipulate queue identity. It allows the creation of a + * queue identity with a provider-specific name. Clients that depend + * on this ability are not portable. + * <P>Note that this method is not for creating the physical queue. + * The physical creation of queues is an administrative task and is not + * to be initiated by the JMS API. The one exception is the + * creation of temporary queues, which is accomplished with the + * <code>createTemporaryQueue</code> method. + * + * @param queueName the name of this <code>Queue</code> + * @return a <code>Queue</code> with the given name + * @throws JMSException If the session fails to create a queue due to some internal error. + */ + public Queue createQueue(String queueName) throws JMSException + { + checkNotClosed(); + Queue result; + try + { + result = new QueueImpl(this, queueName); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; + } + + /** + * Creates a topic identity given a Topicname. + * <P>This facility is provided for the rare cases where clients need to + * dynamically manipulate queue identity. It allows the creation of a + * queue identity with a provider-specific name. Clients that depend + * on this ability are not portable. + * <P>Note that this method is not for creating the physical queue. + * The physical creation of queues is an administrative task and is not + * to be initiated by the JMS API. The one exception is the + * creation of temporary queues, which is accomplished with the + * <code>createTemporaryTopic</code> method. + * + * @param topicName The name of this <code>Topic</code> + * @return a <code>Topic</code> with the given name + * @throws JMSException If the session fails to create a topic due to some internal error. + */ + public Topic createTopic(String topicName) throws JMSException + { + checkNotClosed(); + Topic result; + try + { + result = new TopicImpl(this, topicName); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; + } + + /** + * Creates a durable subscriber to the specified topic, + * + * @param topic The non-temporary <code>Topic</code> to subscribe to. + * @param name The name used to identify this subscription. + * @return A durable subscriber to the specified topic, + * @throws JMSException If creating a subscriber fails due to some internal error. + * @throws InvalidDestinationException If an invalid topic is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + // by default, use a null messageselector and set noLocal to falsen + return createDurableSubscriber(topic, name, null, false); + } + + /** + * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages + * published by its + * own connection should be delivered to it. + * <p> A client can change an existing durable subscription by creating a durable <code>TopicSubscriber</code> with + * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to + * unsubscribing (deleting) the old one and creating a new one. + * + * @param topic The non-temporary <code>Topic</code> to subscribe to. + * @param name The name used to identify this subscription. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @param noLocal If set, inhibits the delivery of messages published by its own connection + * @return A durable subscriber to the specified topic, + * @throws JMSException If creating a subscriber fails due to some internal error. + * @throws InvalidDestinationException If an invalid topic is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws JMSException + { + checkNotClosed(); + checkDestination(topic); + TopicSubscriberImpl subscriber; + try + { + subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, + _connection.getClientID() + ":" + name); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + _messageActors.put(subscriber.getMessageActorID(), subscriber); + return subscriber; + } + + /** + * Create a QueueBrowser to peek at the messages on the specified queue. + * + * @param queue The <code>Queue</code> to browse. + * @return A QueueBrowser. + * @throws JMSException If creating a browser fails due to some internal error. + * @throws InvalidDestinationException If an invalid queue is specified. + */ + public QueueBrowser createBrowser(Queue queue) throws JMSException + { + return createBrowser(queue, null); + } + + /** + * Create a QueueBrowser to peek at the messages on the specified queue using a message selector. + * + * @param queue The <code>Queue</code> to browse. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @return A QueueBrowser. + * @throws JMSException If creating a browser fails due to some internal error. + * @throws InvalidDestinationException If an invalid queue is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException + { + checkNotClosed(); + checkDestination(queue); + QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector); + // register this actor with the session + _messageActors.put(browser.getMessageActorID(), browser); + return browser; + } + + /** + * Create a TemporaryQueue. Its lifetime will be the Connection unless it is deleted earlier. + * + * @return A temporary queue. + * @throws JMSException If creating the temporary queue fails due to some internal error. + */ + public TemporaryQueue createTemporaryQueue() throws JMSException + { + TemporaryQueue result; + try + { + result = new TemporaryQueueImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; + } + + /** + * Create a TemporaryTopic. Its lifetime will be the Connection unless it is deleted earlier. + * + * @return A temporary topic. + * @throws JMSException If creating the temporary topic fails due to some internal error. + */ + public TemporaryTopic createTemporaryTopic() throws JMSException + { + TemporaryTopic result; + try + { + result = new TemporaryTopicImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; + } + + /** + * Unsubscribes a durable subscription that has been created by a client. + * <p/> + * <P>This method deletes the state being maintained on behalf of the + * subscriber by its provider. + * <p/> + * <P>It is erroneous for a client to delete a durable subscription + * while there is an active <code>TopicSubscriber</code> for the + * subscription, or while a consumed message is part of a pending + * transaction or has not been acknowledged in the session. + * + * @param name the name used to identify this subscription + * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error. + * @throws InvalidDestinationException if an invalid subscription name + * is specified. + */ + public void unsubscribe(String name) throws JMSException + { + checkNotClosed(); + } + + //----- Protected methods + + /** + * Remove a message actor form this session + * <p> This method is called when an actor is independently closed. + * + * @param messageActor The closed actor. + */ + protected void closeMessageActor(MessageActor messageActor) + { + _messageActors.remove(messageActor.getMessageActorID()); + } + + /** + * Idincates whether this session is stopped. + * + * @return True is this session is stopped, false otherwise. + */ + protected boolean isStopped() + { + return _isStopped; + } + + /** + * Start the flow of message to this session. + * + * @throws Exception If starting the session fails due to some communication error. + */ + protected synchronized void start() throws Exception + { + if (_isStopped) + { + // start all the MessageActors + for (MessageActor messageActor : _messageActors.values()) + { + messageActor.start(); + } + startDispatchThread(); + } + } + + /** + * Restart delivery of asynch messages + */ + private void startDispatchThread() + { + synchronized (_stoppingLock) + { + _isStopped = false; + _stoppingLock.notify(); + } + synchronized (_stoppingJoin) + { + _hasStopped = false; + } + } + + /** + * Stop the flow of message to this session. + * + * @throws Exception If stopping the session fails due to some communication error. + */ + protected synchronized void stop() throws Exception + { + if (!_isClosing && !_isStopped) + { + // stop all the MessageActors + for (MessageActor messageActor : _messageActors.values()) + { + messageActor.stop(); + } + synchronized (_incomingAsynchronousMessages) + { + _isStopped = true; + // unlock the sessionThread that will then wait on _stoppingLock + _incomingAsynchronousMessages.notifyAll(); + } + // wait for the sessionThread to stop processing messages + synchronized (_stoppingJoin) + { + while (!_hasStopped) + { + try + { + _stoppingJoin.wait(); + } + catch (InterruptedException e) + { + /* ignore */ + } + } + } + } + } + + /** + * Notify this session that a message is processed + * + * @param message The processed message. + */ + protected void preProcessMessage(QpidMessage message) + { + _inRecovery = false; + } + + /** + * Dispatch this message to this session asynchronous consumers + * + * @param consumerID The consumer ID. + * @param message The message to be dispatched. + */ + public void dispatchMessage(String consumerID, QpidMessage message) + { + synchronized (_incomingAsynchronousMessages) + { + _incomingAsynchronousMessages.addLast(new IncomingMessage(consumerID, message)); + _incomingAsynchronousMessages.notifyAll(); + } + } + + /** + * Indicate whether this session is recovering . + * + * @return true if this session is recovering. + */ + protected boolean isInRecovery() + { + return _inRecovery; + } + + /** + * Validate that the Session is not closed. + * <p/> + * If the Session has been closed, throw a IllegalStateException. This behaviour is + * required by the JMS specification. + * + * @throws IllegalStateException If the session is closed. + */ + protected void checkNotClosed() throws IllegalStateException + { + if (_isClosed) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Session has been closed. Cannot invoke any further operations."); + } + throw new javax.jms.IllegalStateException("Session has been closed. Cannot invoke any further operations."); + } + } + + /** + * Validate that the destination is valid i.e. it is not null + * + * @param dest The destination to be checked + * @throws InvalidDestinationException If the destination not valid. + */ + protected void checkDestination(Destination dest) throws InvalidDestinationException + { + if (dest == null) + { + throw new javax.jms.InvalidDestinationException("Invalid destination specified: " + dest, + "Invalid destination"); + } + } + + /** + * A session keeps the list of unack messages only when the ack mode is + * set to client ack mode. Otherwise messages are always ack. + * <p> We can use an ack heuristic for dups ok mode where bunch of messages are ack. + * This has to be done. + * + * @param message The message to be acknowledged. + * @throws JMSException If the message cannot be acknowledged due to an internal error. + */ + protected void acknowledgeMessage(QpidMessage message) throws JMSException + { + if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + // messages will be acknowldeged by the client application. + // store this message for acknowledging it afterward + synchronized (_unacknowledgedMessages) + { + _unacknowledgedMessages.add(message); + } + } + else + { + // acknowledge this message + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + try + { + getQpidSession().messageAcknowledge(range); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + //tobedone: Implement DUPS OK heuristic + } + + /** + * This method is called when a message is acked. + * <p/> + * <P>Acknowledgment of a message automatically acknowledges all + * messages previously received by the session. Clients may + * individually acknowledge messages or they may choose to acknowledge + * messages in application defined groups (which is done by acknowledging + * the last received message in the group). + * + * @throws JMSException If this method is called on a closed session. + */ + protected void acknowledge() throws JMSException + { + checkNotClosed(); + if (getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + synchronized (_unacknowledgedMessages) + { + for (QpidMessage message : _unacknowledgedMessages) + { + // acknowledge this message + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + try + { + getQpidSession().messageAcknowledge(range); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // TODO We can be a little bit cleverer and build a set of ranges + } + //empty the list of unack messages + _unacknowledgedMessages.clear(); + } + } + //else there is no effect + } + + /** + * Access to the underlying Qpid Session + * + * @return The associated Qpid Session. + */ + protected org.apache.qpidity.Session getQpidSession() + { + return _qpidSession; + } + + /** + * Get this session's conneciton + * + * @return This session's connection + */ + protected ConnectionImpl getConnection() + { + return _connection; + } + + //------ Private Methods + /** + * Close the producer and the consumers of this session + * + * @throws JMSException If one of the MessaeActor cannot be closed due to some internal error. + */ + private void closeAllMessageActors() throws JMSException + { + for (MessageActor messageActor : _messageActors.values()) + { + messageActor.closeMessageActor(); + } + } + + /** + * create and start the MessageDispatcherThread. + */ + private synchronized void initMessageDispatcherThread() + { + // Create and start a MessageDispatcherThread + // This thread is dispatching messages to the async consumers + _messageDispatcherThread = new MessageDispatcherThread(); + _messageDispatcherThread.start(); + } + + //------ Inner classes + + /** + * Convenient class for storing incoming messages associated with a consumer ID. + * <p> Those messages are enqueued in _incomingAsynchronousMessages + */ + private class IncomingMessage + { + // The consumer ID + private String _consumerId; + // The message + private QpidMessage _message; + + //-- constructor + /** + * Creat a new incoming message + * + * @param consumerId The consumer ID + * @param message The message to be delivered + */ + IncomingMessage(String consumerId, QpidMessage message) + { + _consumerId = consumerId; + _message = message; + } + + // Getters + /** + * Get the consumer ID + * + * @return The consumer ID for this message + */ + public String getConsumerId() + { + return _consumerId; + } + + /** + * Get the message. + * + * @return The message. + */ + public QpidMessage getMessage() + { + return _message; + } + } + + /** + * A MessageDispatcherThread is attached to every SessionImpl. + * <p/> + * This thread is responsible for removing messages from m_incomingMessages and + * dispatching them to the appropriate MessageConsumer. + * <p> Messages have to be dispatched serially. + */ + private class MessageDispatcherThread extends Thread + { + //--- Constructor + /** + * Create a Deamon thread for dispatching messages to this session listeners. + */ + MessageDispatcherThread() + { + super("MessageDispatcher"); + // this thread is Deamon + setDaemon(true); + } + + /** + * Use to run this thread. + */ + public void run() + { + IncomingMessage message = null; + // deliver messages to asynchronous consumers until the stop flag is set. + do + { + // When this session is not closing and and stopped + // then this thread needs to wait until messages are delivered. + synchronized (_incomingAsynchronousMessages) + { + while (!_isClosing && !_isStopped && _incomingAsynchronousMessages.isEmpty()) + { + try + { + _incomingAsynchronousMessages.wait(); + } + catch (InterruptedException ie) + { + /* ignore */ + } + } + } + // If this session is stopped then we need to wait on the stoppingLock + synchronized (_stoppingLock) + { + try + { + while (_isStopped) + { + // if the session is stopped we have to notify the stopper thread + synchronized (_stoppingJoin) + { + _hasStopped = true; + _stoppingJoin.notify(); + } + _stoppingLock.wait(); + } + } + catch (Exception ie) + { + /* ignore */ + } + } + synchronized (_incomingAsynchronousMessages) + { + if (!_isClosing && !_incomingAsynchronousMessages.isEmpty()) + { + message = _incomingAsynchronousMessages.getFirst(); + } + } + + if (message != null) + { + MessageConsumerImpl mc; + synchronized (_messageActors) + { + mc = (MessageConsumerImpl) _messageActors.get(message.getConsumerId()); + } + if (mc != null) + { + try + { + mc.onMessage(message.getMessage()); + } + catch (RuntimeException t) + { + // the JMS specification tells us to flag that to the client! + _logger.error( + "Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t); + } + } + } + message = null; + } + while (!_isClosing); // repeat as long as this session is not closing + } + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryDestination.java new file mode 100644 index 0000000000..b5e59eeaa6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryDestination.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.JMSException; + +/** + * Interface to abstract functionalities of temporary destinations. + */ +public interface TemporaryDestination +{ + /** + * Delete this temporary destination. + * + * @throws javax.jms.JMSException If the temporary destination cannot be deleted due to some internal error. + */ + public void delete() throws JMSException; + + /** + * Indicate whether this temporary destination is deleted + * @return True is this temporary destination is deleted, false otherwise + */ + public boolean isdeleted(); + +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java new file mode 100644 index 0000000000..fe94531046 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java @@ -0,0 +1,92 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.exchange.ExchangeDefaults; + +import javax.jms.TemporaryQueue; +import javax.jms.JMSException; +import java.util.UUID; + +/** + * Implements TemporaryQueue + */ +public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueue, TemporaryDestination +{ + /** + * Indicates whether this temporary queue is deleted. + */ + private boolean _isDeleted = false; + + //--- constructor + + /** + * Create a new TemporaryQueueImpl with a given name. + * + * @param session The session used to create this TemporaryQueueImpl. + * @throws QpidException If creating the TemporaryQueueImpl fails due to some error. + */ + protected TemporaryQueueImpl(SessionImpl session) throws QpidException + { + // temporary destinations do not have names + super(session, "NAME_NOT_SET"); + _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + _queueName = "TempQueue-" + UUID.randomUUID(); + // check that this queue exist on the server + // As pasive is set the server will not create the queue. + session.getQpidSession().queueDeclare(_queueName, null, null, Option.AUTO_DELETE); + session.getQpidSession().queueBind(_queueName, _exchangeName, _queueName, null); + } + + //-- TemporaryDestination Interface + /** + * Specify whether this temporary destination is deleted. + * + * @return true is this temporary destination is deleted. + */ + public boolean isdeleted() + { + return _isDeleted; + } + + //-- TemporaryTopic Interface + /** + * Delete this temporary destinaiton + * + * @throws JMSException If deleting this temporary queue fails due to some error. + */ + public void delete() throws JMSException + { + // todo delete this temporary queue + _isDeleted = true; + } + + //---- Interface javax.jms.Queue + /** + * Gets the name of this queue. + * + * @return This queue's name. + */ + public String getQueueName() throws JMSException + { + return super.getName(); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java new file mode 100644 index 0000000000..e103c9dad1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java @@ -0,0 +1,61 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.TemporaryTopic; +import javax.jms.JMSException; + + +/** + * Implements TemporaryTopic + */ +public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, TemporaryDestination +{ + /** + * Indicates whether this temporary topic is deleted. + */ + private boolean _isDeleted = false; + + //--- constructor + /** + * Create a new TemporaryTopicImpl with a given name. + * + * @param session The session used to create this TemporaryTopicImpl. + * @throws QpidException If creating the TemporaryTopicImpl fails due to some error. + */ + protected TemporaryTopicImpl(SessionImpl session) throws QpidException + { + // temporary destinations do not have names. + super(session, "NAME_NOT_SET"); + } + + //-- TemporaryDestination Interface + public boolean isdeleted() + { + return _isDeleted; + } + + //-- TemporaryTopic Interface + public void delete() throws JMSException + { + // todo: delete this destinaiton + _isDeleted = true; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java new file mode 100644 index 0000000000..a439c514d4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java @@ -0,0 +1,69 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.exchange.ExchangeDefaults; +import org.apache.qpidity.url.BindingURL; + +import javax.jms.Topic; + +/** + * Implementation of the javax.jms.Topic interface. + */ +public class TopicImpl extends DestinationImpl implements Topic +{ + //--- Constructor + /** + * Create a new TopicImpl with a given name. + * + * @param name The name of this topic + * @param session The session used to create this queue. + * @throws QpidException If the topic name is not valid + */ + public TopicImpl(SessionImpl session, String name) throws QpidException + { + super(session, name); + _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + } + + /** + * Create a TopicImpl from a binding URL + * + * @param session The session used to create this Topic. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException + { + super(session, binding); + } + + //--- javax.jsm.Topic Interface + /** + * Gets the name of this topic. + * + * @return This topic's name. + */ + public String getTopicName() + { + return super.getName(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicPublisherImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicPublisherImpl.java new file mode 100644 index 0000000000..4bf39e35c4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicPublisherImpl.java @@ -0,0 +1,128 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.*; + +/** + * Implements TopicPublisher + */ +public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher +{ + //--- Constructor + /** + * Create a TopicPublisherImpl. + * + * @param session The session from which the TopicPublisherImpl is instantiated + * @param topic The default topic for this TopicPublisherImpl + * @throws JMSException If the TopicPublisherImpl cannot be created due to some internal error. + */ + protected TopicPublisherImpl(SessionImpl session, Topic topic) throws JMSException + { + super(session, (DestinationImpl) topic); + } + + //--- Interface javax.jms.TopicPublisher + /** + * Get the topic associated with this TopicPublisher. + * + * @return This publisher's topic + * @throws JMSException If getting the topic fails due to some internal error. + */ + public Topic getTopic() throws JMSException + { + return (Topic) getDestination(); + } + + + /** + * Publish a message to the topic using the default delivery mode, priority and time to live. + * + * @param message The message to publish + * @throws JMSException If publishing the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If an invalid topic is specified. + * @throws java.lang.UnsupportedOperationException + * If that publisher topic was not specified at creation time. + */ + public void publish(Message message) throws JMSException + { + super.send(message); + } + + /** + * Publish a message to the topic, specifying delivery mode, priority and time to live. + * + * @param message The message to publish + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If publishing the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If an invalid topic is specified. + * @throws java.lang.UnsupportedOperationException + * If that publisher topic was not specified at creation time. + */ + public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException + { + super.send(message, deliveryMode, priority, timeToLive); + } + + + /** + * Publish a message to a topic for an unidentified message producer. + * Uses this TopicPublisher's default delivery mode, priority and time to live. + * + * @param topic The topic to publish this message to + * @param message The message to publish + * @throws JMSException If publishing the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If an invalid topic is specified. + */ + public void publish(Topic topic, Message message) throws JMSException + { + super.send(topic, message); + } + + /** + * Publishes a message to a topic for an unidentified message + * producer, specifying delivery mode, priority and time to live. + * + * @param topic The topic to publish this message to + * @param message The message to publish + * @param deliveryMode The delivery mode + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If publishing the message fails due to some internal error. + * @throws javax.jms.MessageFormatException + * If an invalid message is specified. + * @throws javax.jms.InvalidDestinationException + * If an invalid topic is specified. + */ + public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) throws + JMSException + { + super.send(topic, message, deliveryMode, priority, timeToLive); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java new file mode 100644 index 0000000000..9aff4f1416 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSessionImpl.java @@ -0,0 +1,153 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.*; +import javax.jms.IllegalStateException; + +/** + * Implements TopicSession + */ +public class TopicSessionImpl extends SessionImpl implements TopicSession +{ + //-- constructor + /** + * Create a new TopicSessionImpl. + * + * @param connection The ConnectionImpl object from which the Session is created. + * @param transacted Specifiy whether this session is transacted? + * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to + * {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter + * is true. + * @throws javax.jms.JMSSecurityException If the user could not be authenticated. + * @throws javax.jms.JMSException In case of internal error. + */ + protected TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException + { + super(connection, transacted, acknowledgeMode); + } + + //-- Overwritten methods + /** + * Create a QueueBrowser. + * + * @param queue The <CODE>Queue</CODE> to browse. + * @param messageSelector Only messages with properties matching the message selector expression are delivered. + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException + { + throw new IllegalStateException("Cannot invoke createBrowser from TopicSession"); + } + + /** + * Create a QueueBrowser. + * + * @param queue The <CODE>Queue</CODE> to browse. + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public QueueBrowser createBrowser(Queue queue) throws JMSException + { + throw new IllegalStateException("Cannot invoke createBrowser from TopicSession"); + } + + /** + * Creates a temporary queue. + * + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public TemporaryQueue createTemporaryQueue() throws JMSException + { + throw new IllegalStateException("Cannot invoke createTemporaryQueue from TopicSession"); + } + + /** + * Creates a queue identity by a given name. + * + * @param queueName the name of this <CODE>Queue</CODE> + * @return Always throws an exception + * @throws IllegalStateException Always + */ + @Override + public Queue createQueue(String queueName) throws JMSException + { + throw new IllegalStateException("Cannot invoke createQueue from TopicSession"); + } + + //--- Interface TopicSession + /** + * Create a publisher for the specified topic. + * + * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified publisher. + * @throws JMSException If the creating a publisher fails due to some internal error. + * @throws InvalidDestinationException If an invalid topic is specified. + */ + public TopicPublisher createPublisher(Topic topic) throws JMSException + { + + checkNotClosed(); + // we do not check the destination topic here, since unidentified publishers are allowed. + return new TopicPublisherImpl(this, topic); + } + + /** + * Creates a nondurable subscriber to the specified topic. + * + * @param topic The Topic to subscribe to + * @throws JMSException If creating a subscriber fails due to some internal error. + * @throws InvalidDestinationException If an invalid topic is specified. + */ + public TopicSubscriber createSubscriber(Topic topic) throws JMSException + { + return createSubscriber(topic, null, false); + } + + /** + * Creates a nondurable subscriber to the specified topic, using a + * message selector or specifying whether messages published by its + * own connection should be delivered to it. + * + * @param topic The Topic to subscribe to + * @param messageSelector A value of null or an empty string indicates that there is no message selector. + * @param noLocal If true then inhibits the delivery of messages published by this subscriber's connection. + * @throws JMSException If creating a subscriber fails due to some internal error. + * @throws InvalidDestinationException If an invalid topic is specified. + * @throws InvalidSelectorException If the message selector is invalid. + */ + public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException + { + checkNotClosed(); + checkDestination(topic); + TopicSubscriber topicSubscriber; + try + { + topicSubscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return topicSubscriber; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java new file mode 100644 index 0000000000..9fe03d336d --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicSubscriberImpl.java @@ -0,0 +1,72 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import javax.jms.TopicSubscriber; +import javax.jms.Topic; +import javax.jms.JMSException; + +/** + * Implementation of the JMS TopicSubscriber interface. + */ +public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber +{ + //--- Constructor + /** + * Create a new TopicSubscriberImpl. + * + * @param session The session of this topic subscriber. + * @param topic The default topic for this TopicSubscriberImpl + * @param messageSelector The MessageSelector + * @param noLocal If true inhibits the delivery of messages published by its own connection. + * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber. + * If this value is null, a non-durable subscription is created. + * @throws Exception If the TopicSubscriberImpl cannot be created due to internal error. + */ + protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal, + String subscriptionName) throws Exception + { + super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName); + } + + //--- javax.jms.TopicSubscriber interface + /** + * Get the Topic associated with this subscriber. + * + * @return This subscriber's Topic + * @throws JMSException if getting the topic for this topicSubscriber fails due to some internal error. + */ + public Topic getTopic() throws JMSException + { + checkNotClosed(); + return (TopicImpl) _destination; + } + + + /** + * Get NoLocal for this subscriber. + * + * @return True if locally published messages are being inhibited, false otherwise + * @throws JMSException If getting NoLocal for this topic subscriber fails due to some internal error. + */ + public boolean getNoLocal() throws JMSException + { + checkNotClosed(); + return _noLocal; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesMessage.java new file mode 100644 index 0000000000..10cfadf903 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesMessage.java @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import java.io.IOException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesMessage extends AbstractJMSMessage +{ + + /** + * The default initial size of the buffer. The buffer expands automatically. + */ + private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; + + AbstractBytesMessage() + { + this(null); + } + + /** + * Construct a bytes message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesMessage(ByteBuffer data) + { + super(data); // this instanties a content header + getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); + + if (_data == null) + { + allocateInitialBuffer(); + } + } + + protected void allocateInitialBuffer() + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); + _data.setAutoExpand(true); + } + + AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); + getContentHeaderProperties().setContentType(getMimeTypeAsShortString()); + } + + public void clearBodyImpl() throws JMSException + { + allocateInitialBuffer(); + } + + public String toBodyString() throws JMSException + { + checkReadable(); + try + { + return getText(); + } + catch (IOException e) + { + JMSException jmse = new JMSException(e.toString()); + jmse.setLinkedException(e); + throw jmse; + } + } + + /** + * We reset the stream before and after reading the data. This means that toString() will always output + * the entire message and also that the caller can then immediately start reading as if toString() had + * never been called. + * + * @return + * @throws IOException + */ + private String getText() throws IOException + { + // this will use the default platform encoding + if (_data == null) + { + return null; + } + + int pos = _data.position(); + _data.rewind(); + // one byte left is for the end of frame marker + if (_data.remaining() == 0) + { + // this is really redundant since pos must be zero + _data.position(pos); + + return null; + } + else + { + String data = _data.getString(Charset.forName("UTF8").newDecoder()); + _data.position(pos); + + return data; + } + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + protected void checkAvailable(int len) throws MessageEOFException + { + if (_data.remaining() < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesTypedMessage.java new file mode 100644 index 0000000000..9e205eb679 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractBytesTypedMessage.java @@ -0,0 +1,801 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpidity.jms.message; + +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage +{ + + protected static final byte BOOLEAN_TYPE = (byte) 1; + + protected static final byte BYTE_TYPE = (byte) 2; + + protected static final byte BYTEARRAY_TYPE = (byte) 3; + + protected static final byte SHORT_TYPE = (byte) 4; + + protected static final byte CHAR_TYPE = (byte) 5; + + protected static final byte INT_TYPE = (byte) 6; + + protected static final byte LONG_TYPE = (byte) 7; + + protected static final byte FLOAT_TYPE = (byte) 8; + + protected static final byte DOUBLE_TYPE = (byte) 9; + + protected static final byte STRING_TYPE = (byte) 10; + + protected static final byte NULL_STRING_TYPE = (byte) 11; + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + AbstractBytesTypedMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesTypedMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + } + + + protected byte readWireType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException + { + checkReadable(); + checkAvailable(1); + return _data.get(); + } + + protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException + { + checkWritable(); + _data.put(type); + _changedData = true; + } + + protected boolean readBoolean() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private boolean readBooleanImpl() + { + return _data.get() != 0; + } + + protected byte readByte() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private byte readByteImpl() + { + return _data.get(); + } + + protected short readShort() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + protected char readChar() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if(wireType == NULL_STRING_TYPE){ + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private char readCharImpl() + { + return _data.getChar(); + } + + protected int readInt() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected int readIntImpl() + { + return _data.getInt(); + } + + protected long readLong() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private long readLongImpl() + { + return _data.getLong(); + } + + protected float readFloat() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private float readFloatImpl() + { + return _data.getFloat(); + } + + protected double readDouble() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private double readDoubleImpl() + { + return _data.getDouble(); + } + + protected String readString() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected String readStringImpl() throws JMSException + { + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + protected int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + + _data.remaining() + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + protected Object readObject() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected void writeBoolean(boolean b) throws JMSException + { + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + protected void writeByte(byte b) throws JMSException + { + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + protected void writeShort(short i) throws JMSException + { + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + protected void writeChar(char c) throws JMSException + { + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + protected void writeInt(int i) throws JMSException + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + protected void writeIntImpl(int i) + { + _data.putInt(i); + } + + protected void writeLong(long l) throws JMSException + { + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + + protected void writeFloat(float v) throws JMSException + { + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); + } + + protected void writeDouble(double v) throws JMSException + { + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } + + protected void writeString(String string) throws JMSException + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + try + { + writeStringImpl(string); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + } + + protected void writeStringImpl(String string) + throws CharacterCodingException + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte) 0); + } + + protected void writeBytes(byte[] bytes) throws JMSException + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + if (bytes == null) + { + _data.putInt(-1); + } + else + { + _data.putInt(length); + _data.put(bytes, offset, length); + } + } + + protected void writeObject(Object object) throws JMSException + { + checkWritable(); + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessage.java new file mode 100644 index 0000000000..795556295e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessage.java @@ -0,0 +1,669 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import org.apache.commons.collections.map.ReferenceMap; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.*; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.*; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; +import java.util.UUID; + +public abstract class AbstractJMSMessage extends QpidMessage implements Message +{ + private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); + + protected boolean _redelivered; + + protected ByteBuffer _data; + private boolean _readableProperties = false; + protected boolean _readableMessage = false; + protected boolean _changedData; + private Destination _destination; + private JMSHeaderAdapter _headerAdapter; + private BasicMessageConsumer _consumer; + private boolean _strictAMQP; + + protected AbstractJMSMessage(ByteBuffer data) + { + //super(new BasicContentHeaderProperties()); + _data = data; + if (_data != null) + { + _data.acquire(); + } + + _readableProperties = false; + _readableMessage = (data != null); + _changedData = (data == null); + // _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + + _strictAMQP = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); + } + + protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + this(contentHeader, deliveryTag); + + Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); + + AMQDestination dest; + + if (AMQDestination.QUEUE_TYPE.equals(type)) + { + dest = new AMQQueue(exchange, routingKey, routingKey); + } + else if (AMQDestination.TOPIC_TYPE.equals(type)) + { + dest = new AMQTopic(exchange, routingKey, null); + } + else + { + dest = new AMQUndefinedDestination(exchange, routingKey, null); + } + // Destination dest = AMQDestination.createDestination(url); + setJMSDestination(dest); + + _data = data; + if (_data != null) + { + _data.acquire(); + } + + _readableMessage = data != null; + + } + + protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) + { + // super(contentHeader, deliveryTag); + // _readableProperties = (_contentHeaderProperties != null); + // _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + } + + public String getJMSMessageID() throws JMSException + { + if (getContentHeaderProperties().getMessageIdAsString() == null) + { + getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID()); + } + + return getContentHeaderProperties().getMessageIdAsString(); + } + + public void setJMSMessageID(String messageId) throws JMSException + { + getContentHeaderProperties().setMessageId(messageId); + } + + public long getJMSTimestamp() throws JMSException + { + return getContentHeaderProperties().getTimestamp(); + } + + public void setJMSTimestamp(long timestamp) throws JMSException + { + getContentHeaderProperties().setTimestamp(timestamp); + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + return getContentHeaderProperties().getCorrelationIdAsString().getBytes(); + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException + { + getContentHeaderProperties().setCorrelationId(new String(bytes)); + } + + public void setJMSCorrelationID(String correlationId) throws JMSException + { + getContentHeaderProperties().setCorrelationId(correlationId); + } + + public String getJMSCorrelationID() throws JMSException + { + return getContentHeaderProperties().getCorrelationIdAsString(); + } + + public Destination getJMSReplyTo() throws JMSException + { + String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); + if (replyToEncoding == null) + { + return null; + } + else + { + Destination dest = (Destination) _destinationCache.get(replyToEncoding); + if (dest == null) + { + try + { + BindingURL binding = new AMQBindingURL(replyToEncoding); + dest = AMQDestination.createDestination(binding); + } + catch (URLSyntaxException e) + { + throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); + } + + _destinationCache.put(replyToEncoding, dest); + } + + return dest; + } + } + + public void setJMSReplyTo(Destination destination) throws JMSException + { + if (destination == null) + { + throw new IllegalArgumentException("Null destination not allowed"); + } + + if (!(destination instanceof AMQDestination)) + { + throw new IllegalArgumentException( + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + } + + final AMQDestination amqd = (AMQDestination) destination; + + final AMQShortString encodedDestination = amqd.getEncodedName(); + _destinationCache.put(encodedDestination, destination); + getContentHeaderProperties().setReplyTo(encodedDestination); + } + + public Destination getJMSDestination() throws JMSException + { + return _destination; + } + + public void setJMSDestination(Destination destination) + { + _destination = destination; + } + + public int getJMSDeliveryMode() throws JMSException + { + return getContentHeaderProperties().getDeliveryMode(); + } + + public void setJMSDeliveryMode(int i) throws JMSException + { + getContentHeaderProperties().setDeliveryMode((byte) i); + } + + public BasicContentHeaderProperties getContentHeaderProperties() + { + return null; + } + + public boolean getJMSRedelivered() throws JMSException + { + return _redelivered; + } + + public void setJMSRedelivered(boolean b) throws JMSException + { + _redelivered = b; + } + + public String getJMSType() throws JMSException + { + return getContentHeaderProperties().getTypeAsString(); + } + + public void setJMSType(String string) throws JMSException + { + getContentHeaderProperties().setType(string); + } + + public long getJMSExpiration() throws JMSException + { + return getContentHeaderProperties().getExpiration(); + } + + public void setJMSExpiration(long l) throws JMSException + { + getContentHeaderProperties().setExpiration(l); + } + + public int getJMSPriority() throws JMSException + { + return getContentHeaderProperties().getPriority(); + } + + public void setJMSPriority(int i) throws JMSException + { + getContentHeaderProperties().setPriority((byte) i); + } + + public void clearProperties() throws JMSException + { + getJmsHeaders().clear(); + + _readableProperties = false; + } + + public void clearBody() throws JMSException + { + clearBodyImpl(); + _readableMessage = false; + } + + public boolean propertyExists(AMQShortString propertyName) throws JMSException + { + return getJmsHeaders().propertyExists(propertyName); + } + + public boolean propertyExists(String propertyName) throws JMSException + { + return getJmsHeaders().propertyExists(propertyName); + } + + public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getBoolean(propertyName); + } + + public boolean getBooleanProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getBoolean(propertyName); + } + + public byte getByteProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getByte(propertyName); + } + + public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getBytes(propertyName); + } + + public short getShortProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getShort(propertyName); + } + + public int getIntProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getInteger(propertyName); + } + + public long getLongProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getLong(propertyName); + } + + public float getFloatProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getFloat(propertyName); + } + + public double getDoubleProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getDouble(propertyName); + } + + public String getStringProperty(String propertyName) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + return getJmsHeaders().getString(propertyName); + } + + public Object getObjectProperty(String propertyName) throws JMSException + { + return getJmsHeaders().getObject(propertyName); + } + + public Enumeration getPropertyNames() throws JMSException + { + return getJmsHeaders().getPropertyNames(); + } + + public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setBoolean(propertyName, b); + } + + public void setBooleanProperty(String propertyName, boolean b) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setBoolean(propertyName, b); + } + + public void setByteProperty(String propertyName, byte b) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setByte(propertyName, new Byte(b)); + } + + public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setBytes(propertyName, bytes); + } + + public void setShortProperty(String propertyName, short i) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setShort(propertyName, new Short(i)); + } + + public void setIntProperty(String propertyName, int i) throws JMSException + { + checkWritableProperties(); + JMSHeaderAdapter.checkPropertyName(propertyName); + // super.setIntProperty(new AMQShortString(propertyName), new Integer(i)); + } + + public void setLongProperty(String propertyName, long l) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setLong(propertyName, new Long(l)); + } + + public void setFloatProperty(String propertyName, float f) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setFloat(propertyName, new Float(f)); + } + + public void setDoubleProperty(String propertyName, double v) throws JMSException + { + if (_strictAMQP) + { + throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + } + + checkWritableProperties(); + getJmsHeaders().setDouble(propertyName, new Double(v)); + } + + public void setStringProperty(String propertyName, String value) throws JMSException + { + checkWritableProperties(); + JMSHeaderAdapter.checkPropertyName(propertyName); + // super.setLongStringProperty(new AMQShortString(propertyName), value); + } + + public void setObjectProperty(String propertyName, Object object) throws JMSException + { + checkWritableProperties(); + getJmsHeaders().setObject(propertyName, object); + } + + protected void removeProperty(AMQShortString propertyName) throws JMSException + { + getJmsHeaders().remove(propertyName); + } + + protected void removeProperty(String propertyName) throws JMSException + { + getJmsHeaders().remove(propertyName); + } + + public void acknowledgeThis() throws JMSException + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + + } + + public void acknowledge() throws JMSException + { + + } + + /** + * This forces concrete classes to implement clearBody() + * + * @throws JMSException + */ + public abstract void clearBodyImpl() throws JMSException; + + /** + * Get a String representation of the body of the message. Used in the toString() method which outputs this before + * message properties. + */ + public abstract String toBodyString() throws JMSException; + + public String getMimeType() + { + return getMimeTypeAsShortString().toString(); + } + + public abstract AMQShortString getMimeTypeAsShortString(); + + public String toString() + { + try + { + StringBuffer buf = new StringBuffer("Body:\n"); + buf.append(toBodyString()); + buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID()); + buf.append("\nJMS timestamp: ").append(getJMSTimestamp()); + buf.append("\nJMS expiration: ").append(getJMSExpiration()); + buf.append("\nJMS priority: ").append(getJMSPriority()); + buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); + buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); + buf.append("\nJMS Redelivered: ").append(_redelivered); + buf.append("\nJMS Destination: ").append(getJMSDestination()); + buf.append("\nJMS Type: ").append(getJMSType()); + buf.append("\nJMS MessageID: ").append(getJMSMessageID()); + // buf.append("\nAMQ message number: ").append(_deliveryTag); + + buf.append("\nProperties:"); + if (getJmsHeaders().isEmpty()) + { + buf.append("<NONE>"); + } + else + { + buf.append('\n').append(getJmsHeaders().getHeaders()); + } + + return buf.toString(); + } + catch (JMSException e) + { + return e.toString(); + } + } + + public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties) + { + getContentHeaderProperties().setHeaders(messageProperties); + } + + public JMSHeaderAdapter getJmsHeaders() + { + return _headerAdapter; + } + + public ByteBuffer getData() + { + // make sure we rewind the data just in case any method has moved the + // position beyond the start + if (_data != null) + { + reset(); + } + + return _data; + } + + protected void checkReadable() throws MessageNotReadableException + { + if (!_readableMessage) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + protected void checkWritable() throws MessageNotWriteableException + { + if (_readableMessage) + { + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + } + } + + protected void checkWritableProperties() throws MessageNotWriteableException + { + if (_readableProperties) + { + throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); + } + } + + public boolean isReadable() + { + return _readableMessage; + } + + public boolean isWritable() + { + return !_readableMessage; + } + + public void reset() + { + if (!_changedData) + { + _data.rewind(); + } + else + { + _data.flip(); + _changedData = false; + } + } + + public void setConsumer(BasicMessageConsumer basicMessageConsumer) + { + _consumer = basicMessageConsumer; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessageFactory.java new file mode 100644 index 0000000000..020fb299fd --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/AbstractJMSMessageFactory.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; + +import java.util.Iterator; +import java.util.List; + +public abstract class AbstractJMSMessageFactory implements MessageFactory +{ + private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); + + protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange, + AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException; + + protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException + { + ByteBuffer data; + final boolean debug = _logger.isDebugEnabled(); + + // we optimise the non-fragmented case to avoid copying + if ((bodies != null) && (bodies.size() == 1)) + { + if (debug) + { + _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")"); + } + + data = ((ContentBody) bodies.get(0)).payload; + } + else if (bodies != null) + { + if (debug) + { + _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + + ")"); + } + + data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem? + final Iterator it = bodies.iterator(); + while (it.hasNext()) + { + ContentBody cb = (ContentBody) it.next(); + data.put(cb.payload); + cb.payload.release(); + } + + data.flip(); + } + else // bodies == null + { + data = ByteBuffer.allocate(0); + } + + if (debug) + { + _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + + data.remaining()); + } + + return createMessage(messageNbr, data, exchange, routingKey, contentHeader); + } + + public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException + { + final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); + msg.setJMSRedelivered(redelivered); + + return msg; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessage.java new file mode 100644 index 0000000000..06d8e4dd4d --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessage.java @@ -0,0 +1,388 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage +{ + public static final String MIME_TYPE = "application/octet-stream"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + + public JMSBytesMessage() + { + this(null); + } + + /** + * Construct a bytes message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + JMSBytesMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + } + + public void reset() + { + super.reset(); + _readableMessage = true; + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + public long getBodyLength() throws JMSException + { + checkReadable(); + return _data.limit(); + } + + public boolean readBoolean() throws JMSException + { + checkReadable(); + checkAvailable(1); + return _data.get() != 0; + } + + public byte readByte() throws JMSException + { + checkReadable(); + checkAvailable(1); + return _data.get(); + } + + public int readUnsignedByte() throws JMSException + { + checkReadable(); + checkAvailable(1); + return _data.getUnsigned(); + } + + public short readShort() throws JMSException + { + checkReadable(); + checkAvailable(2); + return _data.getShort(); + } + + public int readUnsignedShort() throws JMSException + { + checkReadable(); + checkAvailable(2); + return _data.getUnsignedShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws JMSException + */ + public char readChar() throws JMSException + { + checkReadable(); + checkAvailable(2); + return _data.getChar(); + } + + public int readInt() throws JMSException + { + checkReadable(); + checkAvailable(4); + return _data.getInt(); + } + + public long readLong() throws JMSException + { + checkReadable(); + checkAvailable(8); + return _data.getLong(); + } + + public float readFloat() throws JMSException + { + checkReadable(); + checkAvailable(4); + return _data.getFloat(); + } + + public double readDouble() throws JMSException + { + checkReadable(); + checkAvailable(8); + return _data.getDouble(); + } + + public String readUTF() throws JMSException + { + checkReadable(); + // we check only for one byte since theoretically the string could be only a + // single byte when using UTF-8 encoding + + try + { + short length = readShort(); + if(length == 0) + { + return ""; + } + else + { + CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); + ByteBuffer encodedString = _data.slice(); + encodedString.limit(length); + _data.position(_data.position()+length); + CharBuffer string = decoder.decode(encodedString.buf()); + + return string.toString(); + } + + + + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + public int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining()); + if (count == 0) + { + return -1; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public int readBytes(byte[] bytes, int maxLength) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + if (maxLength > bytes.length) + { + throw new IllegalArgumentException("maxLength must be <= bytes.length"); + } + checkReadable(); + int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining()); + if (count == 0) + { + return -1; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public void writeBoolean(boolean b) throws JMSException + { + checkWritable(); + _changedData = true; + _data.put(b ? (byte) 1 : (byte) 0); + } + + public void writeByte(byte b) throws JMSException + { + checkWritable(); + _changedData = true; + _data.put(b); + } + + public void writeShort(short i) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putShort(i); + } + + public void writeChar(char c) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putChar(c); + } + + public void writeInt(int i) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putInt(i); + } + + public void writeLong(long l) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putLong(l); + } + + public void writeFloat(float v) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putFloat(v); + } + + public void writeDouble(double v) throws JMSException + { + checkWritable(); + _changedData = true; + _data.putDouble(v); + } + + public void writeUTF(String string) throws JMSException + { + checkWritable(); + try + { + CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder(); + java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string)); + + _data.putShort((short)encodedString.limit()); + _data.put(encodedString); + _changedData = true; + //_data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must add the null terminator manually + //_data.put((byte)0); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + + public void writeBytes(byte[] bytes) throws JMSException + { + checkWritable(); + _data.put(bytes); + _changedData = true; + } + + public void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + checkWritable(); + _data.put(bytes, offset, length); + _changedData = true; + } + + public void writeObject(Object object) throws JMSException + { + checkWritable(); + if (object == null) + { + throw new NullPointerException("Argument must not be null"); + } + Class clazz = object.getClass(); + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeUTF((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } + + public String toString() + { + return String.valueOf(System.identityHashCode(this)); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessageFactory.java new file mode 100644 index 0000000000..006ebb2c83 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSBytesMessageFactory.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSBytesMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSBytesMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSHeaderAdapter.java new file mode 100644 index 0000000000..e688b5e2be --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSHeaderAdapter.java @@ -0,0 +1,552 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import java.util.Enumeration; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + + +public final class JMSHeaderAdapter +{ + private final FieldTable _headers; + + public JMSHeaderAdapter(FieldTable headers) + { + _headers = headers; + } + + + public FieldTable getHeaders() + { + return _headers; + } + + public boolean getBoolean(String string) throws JMSException + { + checkPropertyName(string); + Boolean b = getHeaders().getBoolean(string); + + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public boolean getBoolean(AMQShortString string) throws JMSException + { + checkPropertyName(string); + Boolean b = getHeaders().getBoolean(string); + + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public char getCharacter(String string) throws JMSException + { + checkPropertyName(string); + Character c = getHeaders().getCharacter(string); + + if (c == null) + { + if (getHeaders().isNullStringValue(string)) + { + throw new NullPointerException("Cannot convert null char"); + } + else + { + throw new MessageFormatException("getChar can't use " + string + " item."); + } + } + else + { + return (char) c; + } + } + + public byte[] getBytes(String string) throws JMSException + { + return getBytes(new AMQShortString(string)); + } + + public byte[] getBytes(AMQShortString string) throws JMSException + { + checkPropertyName(string); + + byte[] bs = getHeaders().getBytes(string); + + if (bs == null) + { + throw new MessageFormatException("getBytes can't use " + string + " item."); + } + else + { + return bs; + } + } + + public byte getByte(String string) throws JMSException + { + checkPropertyName(string); + Byte b = getHeaders().getByte(string); + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getByte can't use " + string + " item."); + } + else + { + return Byte.valueOf((String) str); + } + } + else + { + b = Byte.valueOf(null); + } + } + + return b; + } + + public short getShort(String string) throws JMSException + { + checkPropertyName(string); + Short s = getHeaders().getShort(string); + + if (s == null) + { + s = Short.valueOf(getByte(string)); + } + + return s; + } + + public int getInteger(String string) throws JMSException + { + checkPropertyName(string); + Integer i = getHeaders().getInteger(string); + + if (i == null) + { + i = Integer.valueOf(getShort(string)); + } + + return i; + } + + public long getLong(String string) throws JMSException + { + checkPropertyName(string); + Long l = getHeaders().getLong(string); + + if (l == null) + { + l = Long.valueOf(getInteger(string)); + } + + return l; + } + + public float getFloat(String string) throws JMSException + { + checkPropertyName(string); + Float f = getHeaders().getFloat(string); + + if (f == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getFloat can't use " + string + " item."); + } + else + { + return Float.valueOf((String) str); + } + } + else + { + f = Float.valueOf(null); + } + + } + + return f; + } + + public double getDouble(String string) throws JMSException + { + checkPropertyName(string); + Double d = getHeaders().getDouble(string); + + if (d == null) + { + d = Double.valueOf(getFloat(string)); + } + + return d; + } + + public String getString(String string) throws JMSException + { + checkPropertyName(string); + String s = getHeaders().getString(string); + + if (s == null) + { + if (getHeaders().containsKey(string)) + { + Object o = getHeaders().getObject(string); + if (o instanceof byte[]) + { + throw new MessageFormatException("getObject couldn't find " + string + " item."); + } + else + { + if (o == null) + { + return null; + } + else + { + s = String.valueOf(o); + } + } + }//else return s // null; + } + + return s; + } + + public Object getObject(String string) throws JMSException + { + checkPropertyName(string); + return getHeaders().getObject(string); + } + + public void setBoolean(AMQShortString string, boolean b) throws JMSException + { + checkPropertyName(string); + getHeaders().setBoolean(string, b); + } + + public void setBoolean(String string, boolean b) throws JMSException + { + checkPropertyName(string); + getHeaders().setBoolean(string, b); + } + + public void setChar(String string, char c) throws JMSException + { + checkPropertyName(string); + getHeaders().setChar(string, c); + } + + public Object setBytes(AMQShortString string, byte[] bytes) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes); + } + + public Object setBytes(String string, byte[] bytes) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes); + } + + public Object setBytes(String string, byte[] bytes, int start, int length) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes, start, length); + } + + public void setByte(String string, byte b) throws JMSException + { + checkPropertyName(string); + getHeaders().setByte(string, b); + } + + public void setByte(AMQShortString string, byte b) throws JMSException + { + checkPropertyName(string); + getHeaders().setByte(string, b); + } + + + public void setShort(String string, short i) throws JMSException + { + checkPropertyName(string); + getHeaders().setShort(string, i); + } + + public void setInteger(String string, int i) throws JMSException + { + checkPropertyName(string); + getHeaders().setInteger(string, i); + } + + public void setInteger(AMQShortString string, int i) throws JMSException + { + checkPropertyName(string); + getHeaders().setInteger(string, i); + } + + public void setLong(String string, long l) throws JMSException + { + checkPropertyName(string); + getHeaders().setLong(string, l); + } + + public void setFloat(String string, float v) throws JMSException + { + checkPropertyName(string); + getHeaders().setFloat(string, v); + } + + public void setDouble(String string, double v) throws JMSException + { + checkPropertyName(string); + getHeaders().setDouble(string, v); + } + + public void setString(String string, String string1) throws JMSException + { + checkPropertyName(string); + getHeaders().setString(string, string1); + } + + public void setString(AMQShortString string, String string1) throws JMSException + { + checkPropertyName(string); + getHeaders().setString(string, string1); + } + + public void setObject(String string, Object object) throws JMSException + { + checkPropertyName(string); + try + { + getHeaders().setObject(string, object); + } + catch (AMQPInvalidClassException aice) + { + MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); + mfe.setLinkedException(aice); + throw mfe; + } + } + + public boolean itemExists(String string) throws JMSException + { + checkPropertyName(string); + return getHeaders().containsKey(string); + } + + public Enumeration getPropertyNames() + { + return getHeaders().getPropertyNames(); + } + + public void clear() + { + getHeaders().clear(); + } + + public boolean propertyExists(AMQShortString propertyName) + { + checkPropertyName(propertyName); + return getHeaders().propertyExists(propertyName); + } + + public boolean propertyExists(String propertyName) + { + checkPropertyName(propertyName); + return getHeaders().propertyExists(propertyName); + } + + public Object put(Object key, Object value) + { + checkPropertyName(key.toString()); + return getHeaders().setObject(key.toString(), value); + } + + public Object remove(AMQShortString propertyName) + { + checkPropertyName(propertyName); + return getHeaders().remove(propertyName); + } + + public Object remove(String propertyName) + { + checkPropertyName(propertyName); + return getHeaders().remove(propertyName); + } + + public boolean isEmpty() + { + return getHeaders().isEmpty(); + } + + public void writeToBuffer(ByteBuffer data) + { + getHeaders().writeToBuffer(data); + } + + public Enumeration getMapNames() + { + return getPropertyNames(); + } + + protected static void checkPropertyName(CharSequence propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if (propertyName.length() == 0) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + protected static void checkIdentiferFormat(CharSequence propertyName) + { +// JMS requirements 3.5.1 Property Names +// Identifiers: +// - An identifier is an unlimited-length character sequence that must begin +// with a Java identifier start character; all following characters must be Java +// identifier part characters. An identifier start character is any character for +// which the method Character.isJavaIdentifierStart returns true. This includes +// '_' and '$'. An identifier part character is any character for which the +// method Character.isJavaIdentifierPart returns true. +// - Identifiers cannot be the names NULL, TRUE, or FALSE. +// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or +// ESCAPE. +// � Identifiers are either header field references or property references. The +// type of a property value in a message selector corresponds to the type +// used to set the property. If a property that does not exist in a message is +// referenced, its value is NULL. The semantics of evaluating NULL values +// in a selector are described in Section 3.8.1.2, �Null Values.� +// � The conversions that apply to the get methods for properties do not +// apply when a property is used in a message selector expression. For +// example, suppose you set a property as a string value, as in the +// following: +// myMessage.setStringProperty("NumberOfOrders", "2"); +// The following expression in a message selector would evaluate to false, +// because a string cannot be used in an arithmetic expression: +// "NumberOfOrders > 1" +// � Identifiers are case sensitive. +// � Message header field references are restricted to JMSDeliveryMode, +// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and +// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be +// null and if so are treated as a NULL value. + + if (Boolean.getBoolean("strict-jms")) + { + // JMS start character + if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); + } + + // JMS part character + int length = propertyName.length(); + for (int c = 1; c < length; c++) + { + if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); + } + } + + // JMS invalid names + if ((propertyName.equals("NULL") + || propertyName.equals("TRUE") + || propertyName.equals("FALSE") + || propertyName.equals("NOT") + || propertyName.equals("AND") + || propertyName.equals("OR") + || propertyName.equals("BETWEEN") + || propertyName.equals("LIKE") + || propertyName.equals("IN") + || propertyName.equals("IS") + || propertyName.equals("ESCAPE"))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); + } + } + + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessage.java new file mode 100644 index 0000000000..67c79b096f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessage.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpidity.jms.message; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import java.nio.charset.CharacterCodingException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage +{ + private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class); + + public static final String MIME_TYPE = "jms/map-message"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + private Map<String, Object> _map = new HashMap<String, Object>(); + + public JMSMapMessage() throws JMSException + { + this(null); + } + + JMSMapMessage(ByteBuffer data) throws JMSException + { + super(data); // this instantiates a content header + populateMapFromData(); + } + + JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + try + { + populateMapFromData(); + } + catch (JMSException je) + { + throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je); + + } + + } + + public String toBodyString() throws JMSException + { + return _map.toString(); + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + public ByteBuffer getData() + { + // What if _data is null? + writeMapToData(); + + return super.getData(); + } + + @Override + public void clearBodyImpl() throws JMSException + { + super.clearBodyImpl(); + _map.clear(); + } + + public boolean getBoolean(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Boolean) + { + return ((Boolean) value).booleanValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Boolean.valueOf((String) value); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to boolean."); + } + + } + + public byte getByte(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Byte) + { + return ((Byte) value).byteValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Byte.valueOf((String) value).byteValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to byte."); + } + } + + public short getShort(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Short) + { + return ((Short) value).shortValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).shortValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Short.valueOf((String) value).shortValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to short."); + } + + } + + public int getInt(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Integer) + { + return ((Integer) value).intValue(); + } + else if (value instanceof Short) + { + return ((Short) value).intValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).intValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Integer.valueOf((String) value).intValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to int."); + } + + } + + public long getLong(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Long) + { + return ((Long) value).longValue(); + } + else if (value instanceof Integer) + { + return ((Integer) value).longValue(); + } + + if (value instanceof Short) + { + return ((Short) value).longValue(); + } + + if (value instanceof Byte) + { + return ((Byte) value).longValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Long.valueOf((String) value).longValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to long."); + } + + } + + public char getChar(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (!_map.containsKey(propName)) + { + throw new MessageFormatException("Property " + propName + " not present"); + } + else if (value instanceof Character) + { + return ((Character) value).charValue(); + } + else if (value == null) + { + throw new NullPointerException("Property " + propName + " has null value and therefore cannot " + + "be converted to char."); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to boolan."); + } + + } + + public float getFloat(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Float) + { + return ((Float) value).floatValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Float.valueOf((String) value).floatValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to float."); + } + } + + public double getDouble(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (value instanceof Double) + { + return ((Double) value).doubleValue(); + } + else if (value instanceof Float) + { + return ((Float) value).doubleValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Double.valueOf((String) value).doubleValue(); + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to double."); + } + } + + public String getString(String propName) throws JMSException + { + Object value = _map.get(propName); + + if ((value instanceof String) || (value == null)) + { + return (String) value; + } + else if (value instanceof byte[]) + { + throw new MessageFormatException("Property " + propName + " of type byte[] " + "cannot be converted to String."); + } + else + { + return value.toString(); + } + + } + + public byte[] getBytes(String propName) throws JMSException + { + Object value = _map.get(propName); + + if (!_map.containsKey(propName)) + { + throw new MessageFormatException("Property " + propName + " not present"); + } + else if ((value instanceof byte[]) || (value == null)) + { + return (byte[]) value; + } + else + { + throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName() + + " cannot be converted to byte[]."); + } + } + + public Object getObject(String propName) throws JMSException + { + return _map.get(propName); + } + + public Enumeration getMapNames() throws JMSException + { + return Collections.enumeration(_map.keySet()); + } + + public void setBoolean(String propName, boolean b) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, b); + } + + public void setByte(String propName, byte b) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, b); + } + + public void setShort(String propName, short i) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, i); + } + + public void setChar(String propName, char c) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, c); + } + + public void setInt(String propName, int i) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, i); + } + + public void setLong(String propName, long l) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, l); + } + + public void setFloat(String propName, float v) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, v); + } + + public void setDouble(String propName, double v) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, v); + } + + public void setString(String propName, String string1) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, string1); + } + + public void setBytes(String propName, byte[] bytes) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + _map.put(propName, bytes); + } + + public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException + { + if ((offset == 0) && (length == bytes.length)) + { + setBytes(propName, bytes); + } + else + { + byte[] newBytes = new byte[length]; + System.arraycopy(bytes, offset, newBytes, 0, length); + setBytes(propName, newBytes); + } + } + + public void setObject(String propName, Object value) throws JMSException + { + checkWritable(); + checkPropertyName(propName); + if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer) + || (value instanceof Long) || (value instanceof Character) || (value instanceof Float) + || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null)) + { + _map.put(propName, value); + } + else + { + throw new MessageFormatException("Cannot set property " + propName + " to value " + value + "of type " + + value.getClass().getName() + "."); + } + } + + private void checkPropertyName(String propName) + { + if ((propName == null) || propName.equals("")) + { + throw new IllegalArgumentException("Property name cannot be null, or the empty String."); + } + } + + public boolean itemExists(String propName) throws JMSException + { + return _map.containsKey(propName); + } + + private void populateMapFromData() throws JMSException + { + if (_data != null) + { + _data.rewind(); + + final int entries = readIntImpl(); + for (int i = 0; i < entries; i++) + { + String propName = readStringImpl(); + Object value = readObject(); + _map.put(propName, value); + } + } + else + { + _map.clear(); + } + } + + private void writeMapToData() + { + allocateInitialBuffer(); + final int size = _map.size(); + writeIntImpl(size); + for (Map.Entry<String, Object> entry : _map.entrySet()) + { + try + { + writeStringImpl(entry.getKey()); + } + catch (CharacterCodingException e) + { + throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e); + + } + + try + { + writeObject(entry.getValue()); + } + catch (JMSException e) + { + Object value = entry.getValue(); + throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value + + " (type: " + value.getClass().getName() + ").", e); + } + } + + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessageFactory.java new file mode 100644 index 0000000000..12addc3279 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSMapMessageFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpidity.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSMapMessageFactory extends AbstractJMSMessageFactory +{ + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSMapMessage(); + } + + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessage.java new file mode 100644 index 0000000000..5798ab42b5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessage.java @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.ObjectMessage; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage +{ + public static final String MIME_TYPE = "application/java-object-stream"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + private static final int DEFAULT_BUFFER_SIZE = 1024; + + /** + * Creates empty, writable message for use by producers + */ + public JMSObjectMessage() + { + this(null); + } + + private JMSObjectMessage(ByteBuffer data) + { + super(data); + if (data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + _data.setAutoExpand(true); + } + + getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); + } + + /** + * Creates read only message for delivery to consumers + */ + JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, + ByteBuffer data) throws AMQException + { + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data); + } + + public void clearBodyImpl() throws JMSException + { + if (_data != null) + { + _data.release(); + } + + _data = null; + + } + + public String toBodyString() throws JMSException + { + return toString(_data); + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + public void setObject(Serializable serializable) throws JMSException + { + checkWritable(); + + if (_data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + _data.setAutoExpand(true); + } + else + { + _data.rewind(); + } + + try + { + ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream()); + out.writeObject(serializable); + out.flush(); + out.close(); + } + catch (IOException e) + { + MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); + mfe.setLinkedException(e); + throw mfe; + } + + } + + public Serializable getObject() throws JMSException + { + ObjectInputStream in = null; + if (_data == null) + { + return null; + } + + try + { + _data.rewind(); + in = new ObjectInputStream(_data.asInputStream()); + + return (Serializable) in.readObject(); + } + catch (IOException e) + { + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; + } + catch (ClassNotFoundException e) + { + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + throw mfe; + } + finally + { + _data.rewind(); + close(in); + } + } + + private static void close(InputStream in) + { + try + { + if (in != null) + { + in.close(); + } + } + catch (IOException ignore) + { } + } + + private static String toString(ByteBuffer data) + { + if (data == null) + { + return null; + } + + int pos = data.position(); + try + { + return data.getString(Charset.forName("UTF8").newDecoder()); + } + catch (CharacterCodingException e) + { + return null; + } + finally + { + data.position(pos); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessageFactory.java new file mode 100644 index 0000000000..75431c2312 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSObjectMessageFactory.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSObjectMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSObjectMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessage.java new file mode 100644 index 0000000000..d3b53a28cf --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessage.java @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import javax.jms.JMSException; +import javax.jms.StreamMessage; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * @author Apache Software Foundation + */ +public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage +{ + public static final String MIME_TYPE="jms/stream-message"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + public JMSStreamMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + JMSStreamMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + } + + public void reset() + { + super.reset(); + _readableMessage = true; + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + + + public boolean readBoolean() throws JMSException + { + return super.readBoolean(); + } + + + public byte readByte() throws JMSException + { + return super.readByte(); + } + + public short readShort() throws JMSException + { + return super.readShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws JMSException + */ + public char readChar() throws JMSException + { + return super.readChar(); + } + + public int readInt() throws JMSException + { + return super.readInt(); + } + + public long readLong() throws JMSException + { + return super.readLong(); + } + + public float readFloat() throws JMSException + { + return super.readFloat(); + } + + public double readDouble() throws JMSException + { + return super.readDouble(); + } + + public String readString() throws JMSException + { + return super.readString(); + } + + public int readBytes(byte[] bytes) throws JMSException + { + return super.readBytes(bytes); + } + + + public Object readObject() throws JMSException + { + return super.readObject(); + } + + public void writeBoolean(boolean b) throws JMSException + { + super.writeBoolean(b); + } + + public void writeByte(byte b) throws JMSException + { + super.writeByte(b); + } + + public void writeShort(short i) throws JMSException + { + super.writeShort(i); + } + + public void writeChar(char c) throws JMSException + { + super.writeChar(c); + } + + public void writeInt(int i) throws JMSException + { + super.writeInt(i); + } + + public void writeLong(long l) throws JMSException + { + super.writeLong(l); + } + + public void writeFloat(float v) throws JMSException + { + super.writeFloat(v); + } + + public void writeDouble(double v) throws JMSException + { + super.writeDouble(v); + } + + public void writeString(String string) throws JMSException + { + super.writeString(string); + } + + public void writeBytes(byte[] bytes) throws JMSException + { + super.writeBytes(bytes); + } + + public void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + super.writeBytes(bytes,offset,length); + } + + public void writeObject(Object object) throws JMSException + { + super.writeObject(object); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessageFactory.java new file mode 100644 index 0000000000..769e47482b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSStreamMessageFactory.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSStreamMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSStreamMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessage.java new file mode 100644 index 0000000000..aac7e583ae --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessage.java @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; + +public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage +{ + private static final String MIME_TYPE = "text/plain"; + private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE); + + + private String _decodedValue; + + /** + * This constant represents the name of a property that is set when the message payload is null. + */ + private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName(); + private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); + + public JMSTextMessage() throws JMSException + { + this(null, null); + } + + JMSTextMessage(ByteBuffer data, String encoding) throws JMSException + { + super(data); // this instantiates a content header + getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING); + getContentHeaderProperties().setEncoding(encoding); + } + + JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) + throws AMQException + { + super(deliveryTag, contentHeader, exchange, routingKey, data); + contentHeader.setContentType(MIME_TYPE_SHORT_STRING); + _data = data; + } + + JMSTextMessage(ByteBuffer data) throws JMSException + { + this(data, null); + } + + JMSTextMessage(String text) throws JMSException + { + super((ByteBuffer) null); + setText(text); + } + + public void clearBodyImpl() throws JMSException + { + if (_data != null) + { + _data.release(); + } + _data = null; + _decodedValue = null; + } + + public String toBodyString() throws JMSException + { + return getText(); + } + + public void setData(ByteBuffer data) + { + _data = data; + } + + public AMQShortString getMimeTypeAsShortString() + { + return MIME_TYPE_SHORT_STRING; + } + + public void setText(String text) throws JMSException + { + checkWritable(); + + clearBody(); + try + { + if (text != null) + { + _data = ByteBuffer.allocate(text.length()); + _data.limit(text.length()) ; + //_data.sweep(); + _data.setAutoExpand(true); + final String encoding = getContentHeaderProperties().getEncodingAsString(); + if (encoding == null) + { + _data.put(text.getBytes(DEFAULT_CHARSET.name())); + } + else + { + _data.put(text.getBytes(encoding)); + } + _changedData=true; + } + _decodedValue = text; + } + catch (UnsupportedEncodingException e) + { + // should never occur + JMSException jmse = new JMSException("Unable to decode text data"); + jmse.setLinkedException(e); + } + } + + public String getText() throws JMSException + { + if (_data == null && _decodedValue == null) + { + return null; + } + else if (_decodedValue != null) + { + return _decodedValue; + } + else + { + _data.rewind(); + + if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY)) + { + return null; + } + if (getContentHeaderProperties().getEncodingAsString() != null) + { + try + { + _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Could not decode string data: " + e); + je.setLinkedException(e); + throw je; + } + } + else + { + try + { + _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Could not decode string data: " + e); + je.setLinkedException(e); + throw je; + } + } + return _decodedValue; + } + } + + // @Override + public void prepareForSending() throws JMSException + { + // super.prepareForSending(); + if (_data == null) + { + setBooleanProperty(PAYLOAD_NULL_PROPERTY, true); + } + else + { + removeProperty(PAYLOAD_NULL_PROPERTY); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessageFactory.java new file mode 100644 index 0000000000..4a078eb141 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/JMSTextMessageFactory.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import javax.jms.JMSException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; + +public class JMSTextMessageFactory extends AbstractJMSMessageFactory +{ + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSTextMessage(); + } + + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, + AMQShortString exchange, AMQShortString routingKey, + ContentHeaderBody contentHeader) throws AMQException + { + return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, + exchange, routingKey, data); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageConverter.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageConverter.java new file mode 100644 index 0000000000..5554edb107 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageConverter.java @@ -0,0 +1,202 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageEOFException; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import java.util.Enumeration; + +public class MessageConverter +{ + + /** + * Log4J logger + */ + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + + /** + * AbstractJMSMessage which will hold the converted message + */ + private AbstractJMSMessage _newMessage; + + public MessageConverter(AbstractJMSMessage message) throws JMSException + { + _newMessage = message; + } + + public MessageConverter(BytesMessage message) throws JMSException + { + BytesMessage bytesMessage = (BytesMessage) message; + bytesMessage.reset(); + + JMSBytesMessage nativeMsg = new JMSBytesMessage(); + + byte[] buf = new byte[1024]; + + int len; + + while ((len = bytesMessage.readBytes(buf)) != -1) + { + nativeMsg.writeBytes(buf, 0, len); + } + + _newMessage = nativeMsg; + setMessageProperties(message); + } + + public MessageConverter(MapMessage message) throws JMSException + { + MapMessage nativeMessage = new JMSMapMessage(); + + Enumeration mapNames = message.getMapNames(); + while (mapNames.hasMoreElements()) + { + String name = (String) mapNames.nextElement(); + nativeMessage.setObject(name, message.getObject(name)); + } + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public MessageConverter(ObjectMessage message) throws JMSException + { + ObjectMessage origMessage = (ObjectMessage) message; + ObjectMessage nativeMessage = new JMSObjectMessage(); + + nativeMessage.setObject(origMessage.getObject()); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + + } + + public MessageConverter(TextMessage message) throws JMSException + { + TextMessage nativeMessage = new JMSTextMessage(); + + nativeMessage.setText(message.getText()); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public MessageConverter(StreamMessage message) throws JMSException + { + StreamMessage nativeMessage = new JMSStreamMessage(); + + try + { + message.reset(); + while (true) + { + nativeMessage.writeObject(message.readObject()); + } + } + catch (MessageEOFException e) + { + // we're at the end so don't mind the exception + } + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public MessageConverter(Message message) throws JMSException + { + // Send a message with just properties. + // Throwing away content + BytesMessage nativeMessage = new JMSBytesMessage(); + + _newMessage = (AbstractJMSMessage) nativeMessage; + setMessageProperties(message); + } + + public AbstractJMSMessage getConvertedMessage() + { + return _newMessage; + } + + /** + * Sets all message properties + */ + protected void setMessageProperties(Message message) throws JMSException + { + setNonJMSProperties(message); + setJMSProperties(message); + } + + /** + * Sets all non-JMS defined properties on converted message + */ + protected void setNonJMSProperties(Message message) throws JMSException + { + Enumeration propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) + { + String propertyName = String.valueOf(propertyNames.nextElement()); + // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them + if (!propertyName.startsWith("JMSX_")) + { + Object value = message.getObjectProperty(propertyName); + _newMessage.setObjectProperty(propertyName, value); + } + } + } + + /** + * Exposed JMS defined properties on converted message: + * JMSDestination - we don't set here + * JMSDeliveryMode - set + * JMSExpiration - we don't set here + * JMSPriority - we don't set here + * JMSMessageID - we don't set here + * JMSTimestamp - we don't set here + * JMSCorrelationID - set + * JMSReplyTo - set + * JMSType - set + * JMSRedlivered - we don't set here + */ + protected void setJMSProperties(Message message) throws JMSException + { + _newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode()); + + if (message.getJMSReplyTo() != null) + { + _newMessage.setJMSReplyTo(message.getJMSReplyTo()); + } + + _newMessage.setJMSType(message.getJMSType()); + + _newMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java new file mode 100644 index 0000000000..33de600ab9 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import java.util.List; + +import javax.jms.JMSException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; + + +public interface MessageFactory +{ + AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, + ContentHeaderBody contentHeader, + AMQShortString exchange, AMQShortString routingKey, + List bodies) + throws JMSException, AMQException; + + AbstractJMSMessage createMessage() throws JMSException; +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactoryRegistry.java new file mode 100644 index 0000000000..7d84edf29b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageFactoryRegistry.java @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; + +public class MessageFactoryRegistry +{ + private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>(); + private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = + new HashMap<AMQShortString, MessageFactory>(); + + /** + * Construct a new registry with the default message factories registered + * @return a message factory registry + */ + public static MessageFactoryRegistry newDefaultRegistry() + { + MessageFactoryRegistry mf = new MessageFactoryRegistry(); + mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory()); + mf.registerFactory("text/plain", new JMSTextMessageFactory()); + mf.registerFactory("text/xml", new JMSTextMessageFactory()); + mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory()); + mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); + mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); + mf.registerFactory(null, new JMSBytesMessageFactory()); + + return mf; + } + + public void registerFactory(String mimeType, MessageFactory mf) + { + if (mf == null) + { + throw new IllegalArgumentException("Message factory must not be null"); + } + + _mimeStringToFactoryMap.put(mimeType, mf); + _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf); + } + + public MessageFactory deregisterFactory(String mimeType) + { + _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType)); + + return _mimeStringToFactoryMap.remove(mimeType); + } + + /** + * Create a message. This looks up the MIME type from the content header and instantiates the appropriate + * concrete message type. + * @param deliveryTag the AMQ message id + * @param redelivered true if redelivered + * @param contentHeader the content header that was received + * @param bodies a list of ContentBody instances + * @return the message. + * @throws AMQException + * @throws JMSException + */ + public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, + AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies) + throws AMQException, JMSException + { + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties; + + // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over + // AMQP. When the type is null, it can only be assumed that the message is a byte message. + AMQShortString contentTypeShortString = properties.getContentType(); + contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) + : contentTypeShortString; + + MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); + if (mf == null) + { + throw new AMQException(null, "Unsupport MIME type of " + properties.getContentTypeAsString(), null); + } + else + { + return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies); + } + } + + public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException + { + if (mimeType == null) + { + throw new IllegalArgumentException("Mime type must not be null"); + } + + MessageFactory mf = _mimeStringToFactoryMap.get(mimeType); + if (mf == null) + { + throw new AMQException(null, "Unsupport MIME type of " + mimeType, null); + } + else + { + return mf.createMessage(); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java new file mode 100644 index 0000000000..2e99c0eb6f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java @@ -0,0 +1,356 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms.message; + +import javax.jms.Message; +import javax.jms.JMSException; +import javax.jms.Destination; +import java.util.Enumeration; + +/** + * Implementation of javax.jms.Message + */ +public class MessageImpl extends QpidMessage implements Message +{ + private String _messageID; + + //---- javax.jms.Message interface + /** + * Get the message ID. + * <p> The JMS sprec says: + * <p>The messageID header field contains a value that uniquely + * identifies each message sent by a provider. + * <p>When a message is sent, messageID can be ignored. When + * the send method returns it contains a provider-assigned value. + * <P>All JMSMessageID values must start with the prefix `ID:'. + * Uniqueness of message ID values across different providers is + * not required. + * + * @return The message ID + * @throws JMSException If getting the message Id fails due to internal JMS error. + */ + public String getJMSMessageID() throws JMSException + { + // check if the message ID has been set + if (_messageID == null) + { + _messageID = super.getJMSMessageID(); + } + return _messageID; + } + + /** + * Set the message ID. + * <p> The JMS spec says: + * <P>Providers set this field when a message is sent. This operation + * can be used to change the value of a message that's been received. + * + * @param messageID The ID of the message + * @throws JMSException If setting the message Id fails due to internal JMS error. + */ + public void setJMSMessageID(String messageID) throws JMSException + { + _messageID = messageID; + } + + /** + * Get the message timestamp. + * <p> The JMS sepc says: + * <P>The JMSTimestamp header field contains the time a message was + * handed off to a provider to be sent. It is not the time the + * message was actually transmitted because the actual send may occur + * later due to transactions or other client side queueing of messages. + * <p/> + * <P>When a message is sent, JMSTimestamp is ignored. When the send + * method returns it contains a a time value somewhere in the interval + * between the call and the return. It is in the format of a normal + * Java millis time value. + * <p/> + * <P>Since timestamps take some effort to create and increase a + * message's size, some JMS providers may be able to optimize message + * overhead if they are given a hint that timestamp is not used by an + * application. JMS message Producers provide a hint to disable + * timestamps. When a client sets a producer to disable timestamps + * they are saying that they do not depend on the value of timestamp + * for the messages it produces. These messages must either have + * timestamp set to null or, if the hint is ignored, timestamp must + * be set to its normal value. + * + * @return the message timestamp + * @throws JMSException if JMS fails to get the Timestamp + * due to internal JMS error. + * @see Message#setJMSTimestamp(long) + */ + public long getJMSTimestamp() throws JMSException + { + // TODO + return 0; + } + + public void setJMSTimestamp(long l) throws JMSException + { + // TODO + + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException + { + // TODO + return new byte[0]; + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException + { + // TODO + + } + + public void setJMSCorrelationID(String string) throws JMSException + { + // TODO + + } + + public String getJMSCorrelationID() throws JMSException + { + // TODO + return null; + } + + public Destination getJMSReplyTo() throws JMSException + { + // TODO + return null; + } + + public void setJMSReplyTo(Destination destination) throws JMSException + { + // TODO + + } + + public Destination getJMSDestination() throws JMSException + { + // TODO + return null; + } + + public void setJMSDestination(Destination destination) throws JMSException + { + // TODO + + } + + public int getJMSDeliveryMode() throws JMSException + { + // TODO + return 0; + } + + public void setJMSDeliveryMode(int i) throws JMSException + { + // TODO + + } + + public boolean getJMSRedelivered() throws JMSException + { + // TODO + return false; + } + + public void setJMSRedelivered(boolean b) throws JMSException + { + // TODO + + } + + public String getJMSType() throws JMSException + { + // TODO + return null; + } + + public void setJMSType(String string) throws JMSException + { + // TODO + + } + + public long getJMSExpiration() throws JMSException + { + // TODO + return 0; + } + + public void setJMSExpiration(long l) throws JMSException + { + // TODO + + } + + public int getJMSPriority() throws JMSException + { + // TODO + return 0; + } + + public void setJMSPriority(int i) throws JMSException + { + // TODO + + } + + public void clearProperties() throws JMSException + { + // TODO + + } + + public boolean propertyExists(String string) throws JMSException + { + // TODO + return false; + } + + public boolean getBooleanProperty(String string) throws JMSException + { + // TODO + return false; + } + + public byte getByteProperty(String string) throws JMSException + { + // TODO + return 0; + } + + public short getShortProperty(String string) throws JMSException + { + // TODO + return 0; + } + + public int getIntProperty(String string) throws JMSException + { + // TODO + return 0; + } + + public long getLongProperty(String string) throws JMSException + { + // TODO + return 0; + } + + public float getFloatProperty(String string) throws JMSException + { + // TODO + return 0; + } + + public double getDoubleProperty(String string) throws JMSException + { + // TODO + return 0; + } + + public String getStringProperty(String string) throws JMSException + { + // TODO + return null; + } + + public Object getObjectProperty(String string) throws JMSException + { + // TODO + return null; + } + + public Enumeration getPropertyNames() throws JMSException + { + // TODO + return null; + } + + public void setBooleanProperty(String string, boolean b) throws JMSException + { + // TODO + + } + + public void setByteProperty(String string, byte b) throws JMSException + { + // TODO + + } + + public void setShortProperty(String string, short i) throws JMSException + { + // TODO + + } + + public void setIntProperty(String string, int i) throws JMSException + { + // TODO + + } + + public void setLongProperty(String string, long l) throws JMSException + { + // TODO + + } + + public void setFloatProperty(String string, float v) throws JMSException + { + // TODO + + } + + public void setDoubleProperty(String string, double v) throws JMSException + { + // TODO + + } + + public void setStringProperty(String string, String string1) throws JMSException + { + // TODO + + } + + public void setObjectProperty(String string, Object object) throws JMSException + { + // TODO + + } + + public void acknowledge() throws JMSException + { + // TODO + + } + + public void clearBody() throws JMSException + { + // TODO + + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java new file mode 100644 index 0000000000..3c9b8ba3d4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import javax.jms.Message; +import javax.jms.JMSException; + + +public class QpidMessage +{ + /** + * The underlying qpidity message + */ + private org.apache.qpidity.api.Message _qpidityMessage; + + + //--- javax.jsm.Message API + /** + * Get the message ID. + * <p> The JMS sprec says: + * <p>The messageID header field contains a value that uniquely + * identifies each message sent by a provider. + * <p>When a message is sent, messageID can be ignored. When + * the send method returns it contains a provider-assigned value. + * <P>All JMSMessageID values must start with the prefix `ID:'. + * Uniqueness of message ID values across different providers is + * not required. + * + * @return The message ID + * @throws JMSException If getting the message Id fails due to internal JMS error. + */ + public String getJMSMessageID() throws JMSException + { + return "ID:" + _qpidityMessage.getMessageProperties().getMessageId(); + } + + + + public Message getJMSMessage() + { + // todo + return null; + } + + public Long getMessageID() + { + //todo + return new Long(1); + } + +} + + diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/UnprocessedMessage.java new file mode 100644 index 0000000000..605760bc96 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/UnprocessedMessage.java @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.jms.message; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.BasicReturnBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; + +/** + * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and + * the content body/ies. + * + * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher + * thread in order to minimise the amount of work done in the MINA dispatcher thread. + */ +public class UnprocessedMessage +{ + private long _bytesReceived = 0; + + private final BasicDeliverBody _deliverBody; + private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) + private final int _channelId; + private ContentHeaderBody _contentHeader; + + /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ + private List<ContentBody> _bodies; + + public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) + { + _deliverBody = deliverBody; + _channelId = channelId; + _bounceBody = null; + } + + + public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) + { + _deliverBody = null; + _channelId = channelId; + _bounceBody = bounceBody; + } + + public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException + { + + if (body.payload != null) + { + final long payloadSize = body.payload.remaining(); + + if (_bodies == null) + { + if (payloadSize == getContentHeader().bodySize) + { + _bodies = Collections.singletonList(body); + } + else + { + _bodies = new ArrayList<ContentBody>(); + _bodies.add(body); + } + + } + else + { + _bodies.add(body); + } + _bytesReceived += payloadSize; + } + } + + public boolean isAllBodyDataReceived() + { + return _bytesReceived == getContentHeader().bodySize; + } + + public BasicDeliverBody getDeliverBody() + { + return _deliverBody; + } + + public BasicReturnBody getBounceBody() + { + return _bounceBody; + } + + + public int getChannelId() + { + return _channelId; + } + + + public ContentHeaderBody getContentHeader() + { + return _contentHeader; + } + + public void setContentHeader(ContentHeaderBody contentHeader) + { + this._contentHeader = contentHeader; + } + + public List<ContentBody> getBodies() + { + return _bodies; + } + +} |