summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java1867
1 files changed, 0 insertions, 1867 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
deleted file mode 100644
index c2d17e51fb..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
+++ /dev/null
@@ -1,1867 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.messageStore;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exception.InternalErrorException;
-import org.apache.qpid.server.exception.InvalidXidException;
-import org.apache.qpid.server.exception.MessageAlreadyStagedException;
-import org.apache.qpid.server.exception.MessageDoesntExistException;
-import org.apache.qpid.server.exception.QueueAlreadyExistsException;
-import org.apache.qpid.server.exception.QueueDoesntExistException;
-import org.apache.qpid.server.exception.UnknownXidException;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.JDBCAbstractRecord;
-import org.apache.qpid.server.txn.JDBCDequeueRecord;
-import org.apache.qpid.server.txn.JDBCEnqueueRecord;
-import org.apache.qpid.server.txn.JDBCTransaction;
-import org.apache.qpid.server.txn.JDBCTransactionManager;
-import org.apache.qpid.server.txn.Transaction;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.txn.TransactionRecord;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.XidImpl;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import javax.transaction.xa.Xid;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Created by Arnaud Simon
- * Date: 15-May-2007
- * Time: 09:59:12
- */
-public class JDBCStore implements MessageStore
-{
- //========================================================================
- // Static Constants
- //========================================================================
- // The logger for this class
- private static final Logger _log = Logger.getLogger(JDBCStore.class);
- // the database connection pool
- public static ConnectionPool _connectionPool = null;
- // the prepared statements
- //==== IMPORTANT: remember to update if we add more prepared statements!
- private static final int CREATE_EXCHANGE = 0;
- private static final int DELETE_EXCHANGE = 1;
- private static final int BIND_QUEUE = 2;
- private static final int UNBIND_QUEUE = 3;
- private static final int CREATE_QUEUE = 4;
- private static final int DELETE_QUEUE = 5;
- private static final int STAGE_MESSAGE = 6;
- private static final int UPDATE_MESSAGE_PAYLOAD = 7;
- private static final int SELECT_MESSAGE_PAYLOAD = 8;
- private static final int DELETE_MESSAGE = 9;
- private static final int ENQUEUE = 10;
- private static final int DEQUEUE = 11;
- private static final int GET_ALL_QUEUES = 12;
- private static final int GET_ALL_MESSAGES = 13;
- private static final int SAVE_RECORD = 14;
- private static final int SAVE_XID = 15;
- private static final int DELETE_RECORD = 16;
- private static final int DELETE_XID = 17;
- private static final int UPDATE_QMR = 18;
- private static final int GET_CONTENT_HEADER = 19;
- private static final int GET_MESSAGE_INFO = 20;
- //==== size:
- private static final int STATEMENT_SIZE = 21;
- //========================================================================
- // field properties
- //========================================================================
- //The default URL
- protected String _connectionURL = "jdbc:derby:derbyDB;create=true";
- // The default driver
- private String _driver = "org.apache.derby.jdbc.EmbeddedDriver";
- // The pool max size
- private int _maxSize = 40;
- // The tables
- // the table containing the messages
- private String _tableNameMessage = "MessageTable";
- private String _tableNameQueue = "QueueTable";
- private String _tableNameQueueMessageRelation = "QeueMessageRelation";
- private String _tableNameExchange = "Exchange";
- private String _tableNameExchangeQueueRelation = "ExchangeQueueRelation";
- private String _tableNameTransaction = "TransactionTable";
- private String _tableNameRecord = "RecordTable";
-
- // The transaction maanger
- private JDBCTransactionManager _tm;
- // the message ID
- private long _messageID = 0;
- // the virtual host
- private VirtualHost _virtualHost;
- // indicate whether this store is recovering
- private boolean _recovering = false;
- // the recovered queues
- private HashMap<Integer, AMQQueue> _queueMap;
-
- //========================================================================
- // Interface MessageStore
- //========================================================================
- public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
- throws
- InternalErrorException,
- IllegalArgumentException
- {
- _log.info("Configuring Derby message store");
- // the virtual host
- _virtualHost = virtualHost;
- // Specify that the tables must be dropped.
- // If true then this means that recovery is not possible.
- boolean dropTables = true;
- if (config != null)
- {
- dropTables = config.getBoolean(base + "dropTables", false);
- _driver = config.getString(base + "driver", _driver);
- _connectionURL = config.getString(base + "connectionURL", _connectionURL);
- _maxSize = config.getInt(base + "connectionPoolSize", 20);
- }
- if (dropTables)
- {
- _log.info("Dropping table of Derby message store");
- }
- if (!setupStore(dropTables))
- {
- _log.error("Error configuration of Derby store failed");
- throw new InternalErrorException("Error configuration of Derby store failed");
- }
- // recovery
- _recovering = true;
- _queueMap = recover(); //==> recover the queues and the messages
- // recreate the excahnges and bind the queues
- recoverExchanges(_queueMap);
- _recovering = false;
- _tm = (JDBCTransactionManager) tm;
- _tm.configure(this, "txn", config);
- _queueMap.clear();
- _queueMap = null;
- }
-
- public void close()
- throws
- InternalErrorException
- {
- // nothing has to be done
- }
-
- public void createExchange(Exchange exchange)
- throws
- InternalErrorException
- {
- if (!_recovering)
- {
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[CREATE_EXCHANGE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchange +
- " (Name,Type) VALUES (?,?)");
- connection.getStatements()[CREATE_EXCHANGE] = pstmt;
- }
- pstmt.setString(1, exchange.getName().asString());
- pstmt.setString(2, exchange.getType().asString());
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
- }
- }
- }
- }
- }
-
- public void removeExchange(Exchange exchange)
- throws
- InternalErrorException
- {
- if (!_recovering)
- {
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[DELETE_EXCHANGE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchange +
- " WHERE Name = ?");
- connection.getStatements()[DELETE_EXCHANGE] = pstmt;
- }
- pstmt.setString(1, exchange.getName().asString());
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
- }
- }
- }
- }
- }
-
- public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
- throws
- InternalErrorException
- {
- if (!_recovering)
- {
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[BIND_QUEUE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameExchangeQueueRelation +
- " (QueueID,Name,RoutingKey,fieldTable) VALUES (?,?,?,?)");
- connection.getStatements()[BIND_QUEUE] = pstmt;
- }
- pstmt.setInt(1, queue.getQueueID());
- pstmt.setString(2, exchange.getName().asString());
- pstmt.setString(3, routingKey.asString());
- if (args != null)
- {
- pstmt.setBytes(4, args.getDataAsBytes());
- }
- else
- {
- pstmt.setBytes(4, null);
- }
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot create Exchange: " + exchange, e);
- }
- }
- }
- }
- }
-
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
- throws
- InternalErrorException
- {
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[UNBIND_QUEUE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameExchangeQueueRelation +
- " WHERE QueueID = ? AND NAME = ? AND RoutingKey = ?");
- connection.getStatements()[UNBIND_QUEUE] = pstmt;
- }
- pstmt.setInt(1, queue.getQueueID());
- pstmt.setString(2, exchange.getName().asString());
- pstmt.setString(3, routingKey.asString());
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot remove Exchange: " + exchange, e);
- }
- }
- }
- }
-
- public void createQueue(StorableQueue queue)
- throws
- InternalErrorException,
- QueueAlreadyExistsException
- {
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[CREATE_QUEUE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueue +
- " (QueueID,Name,Owner) VALUES (?,?,?)");
- connection.getStatements()[CREATE_QUEUE] = pstmt;
- }
- pstmt.setInt(1, queue.getQueueID());
- pstmt.setString(2, queue.getName().asString());
- if (queue.getOwner() != null)
- {
- pstmt.setString(3, queue.getOwner().asString());
- }
- else
- {
- pstmt.setString(3, null);
- }
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot create Queue: " + queue, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot create Queue: " + queue, e);
- }
- }
- }
- }
-
- public void destroyQueue(StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException
- {
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[DELETE_QUEUE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueue +
- " WHERE QueueID = ?");
- connection.getStatements()[DELETE_QUEUE] = pstmt;
- }
- pstmt.setInt(1, queue.getQueueID());
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot remove Queue: " + queue, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot remove Queue: " + queue, e);
- }
- }
- }
- }
-
- public void stage(StorableMessage m)
- throws
- InternalErrorException,
- MessageAlreadyStagedException
- {
- if (m.isStaged() || m.isEnqueued())
- {
- _log.error("Message with Id " + m.getMessageId() + " is already staged");
- throw new MessageAlreadyStagedException("Message eith Id " + m.getMessageId() + " is already staged");
- }
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- stage(connection, m);
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot stage Message: " + m, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot stage Message: " + m, e);
- }
- }
- }
- }
-
- public void appendContent(StorableMessage m, byte[] data, int offset, int size)
- throws
- InternalErrorException,
- MessageDoesntExistException
- {
- // The message must have been staged
- if (!m.isStaged())
- {
- _log.error("Cannot append content of message Id "
- + m.getMessageId() + " as it has not been staged");
- throw new MessageDoesntExistException("Cannot append content of message Id "
- + m.getMessageId() + " as it has not been staged");
- }
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- appendContent(connection, m, data, offset, size);
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot stage Message: " + m, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot stage Message: " + m, e);
- }
- }
- }
- }
-
- public byte[] loadContent(StorableMessage m, int offset, int size)
- throws
- InternalErrorException,
- MessageDoesntExistException
- {
- MyConnection connection = null;
- try
- {
- byte[] result;
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[SELECT_MESSAGE_PAYLOAD];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
- " WHERE MessageID = ? ");
- connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
- }
- pstmt.setLong(1, m.getMessageId());
- ResultSet rs = pstmt.executeQuery();
- if (!rs.next())
- {
- throw new MessageDoesntExistException("Cannot load content of message Id "
- + m.getMessageId() + " as it has not been found");
- }
- Blob myBlob = rs.getBlob(1);
-
- if (myBlob.length() > 0)
- {
- if (size == 0)
- {
- result = myBlob.getBytes(offset, (int) myBlob.length());
- }
- else
- {
- result = myBlob.getBytes(offset, size);
- }
- }
- else
- {
- throw new MessageDoesntExistException("Cannot load content of message Id "
- + m.getMessageId() + " as it has not been found");
- }
- rs.close();
- return result;
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot load Message: " + m, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot load Message: " + m, e);
- }
- }
- }
- }
-
- public void destroy(StorableMessage m)
- throws
- InternalErrorException,
- MessageDoesntExistException
- {
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- destroy(connection, m);
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot destroy message: " + m, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot destroy message: " + m, e);
- }
- }
- }
- }
-
- public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException,
- InvalidXidException,
- UnknownXidException,
- MessageDoesntExistException
- {
- MyConnection connection = null;
- // Get the current tx
- JDBCTransaction tx = getTx(xid);
- // If this operation is transacted then we need to add a record
- if (tx != null && !tx.isPrepared())
- {
- // add an enqueue record
- tx.addRecord(new JDBCEnqueueRecord(m, queue));
- }
- else
- {
- try
- {
- if (tx != null)
- {
- connection = tx.getConnection();
- }
- else
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- }
- if (!m.isStaged() && !m.isEnqueued())
- {
- //This is the first time this message is enqueued and it has not been staged.
- stage(connection, m);
- appendContent(connection, m, m.getData(), 0, m.getData().length);
- }
- PreparedStatement pstmt = connection.getStatements()[ENQUEUE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameQueueMessageRelation +
- " (QueueID,MessageID,Prepared) VALUES (?,?,0)");
- connection.getStatements()[ENQUEUE] = pstmt;
- }
- pstmt.setInt(1, queue.getQueueID());
- pstmt.setLong(2, m.getMessageId());
- pstmt.executeUpdate();
- m.enqueue(queue);
- queue.enqueue(m);
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
- }
- finally
- {
- if (tx == null && connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
- }
- }
- }
- }
- }
-
- public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException,
- InvalidXidException,
- UnknownXidException
- {
- MyConnection connection = null;
- // Get the current tx
- JDBCTransaction tx = getTx(xid);
- // If this operation is transacted then we need to add a record
- if (tx != null && !tx.isPrepared())
- {
- // add an dequeue record
- tx.addRecord(new JDBCDequeueRecord(m, queue));
- }
- else
- {
- try
- {
- if (tx != null)
- {
- connection = tx.getConnection();
- }
- else
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- }
- PreparedStatement pstmt = connection.getStatements()[DEQUEUE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameQueueMessageRelation +
- " WHERE QueueID = ? AND MessageID = ?");
- connection.getStatements()[DEQUEUE] = pstmt;
- }
- pstmt.setInt(1, queue.getQueueID());
- pstmt.setLong(2, m.getMessageId());
- pstmt.executeUpdate();
- m.dequeue(queue);
- if (!m.isEnqueued())
- {
- // delete this message from persistence store
- destroy(connection, m);
- }
- queue.dequeue(m);
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
- }
- finally
- {
- if (tx == null && connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot enqueue message : " + m + " in queue: " + queue, e);
- }
- }
- }
- }
- }
-
- public Collection<StorableQueue> getAllQueues()
- throws
- InternalErrorException
- {
- MyConnection connection = null;
- List<StorableQueue> result = new ArrayList<StorableQueue>();
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[GET_ALL_QUEUES];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("SELECT * FROM " + _tableNameQueue);
- connection.getStatements()[GET_ALL_QUEUES] = pstmt;
- }
- ResultSet rs = pstmt.executeQuery();
- while (rs.next())
- {
- //the queue owner may be null
- AMQShortString queueOwner = null;
- if (rs.getString(3) != null)
- {
- queueOwner = new AMQShortString(rs.getString(3));
- }
- result.add(new AMQQueue(new AMQShortString(rs.getString(2)), true, queueOwner,
- false, _virtualHost));
- }
- rs.close();
- return result;
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot get all queues", e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot get all queues", e);
- }
- }
- }
- }
-
- public Collection<StorableMessage> getAllMessages(StorableQueue queue)
- throws
- InternalErrorException
- {
- MyConnection connection = null;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- return getAllMessages(connection, queue);
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot get all queues", e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot get all queues", e);
- }
- }
- }
- }
-
- public HashMap<Xid, Transaction> getAllInddoubt()
- throws
- InternalErrorException
- {
- MyConnection connection = null;
- HashMap<Xid, Transaction> result = new HashMap<Xid, Transaction>();
- try
- {
- //TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
- // re-create all the tx
- connection = (MyConnection) _connectionPool.acquireInstance();
- Statement stmt = connection.getConnection().createStatement();
- ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameTransaction);
- JDBCTransaction foundTx;
- Xid foundXid;
- long foundXIDID;
- while (rs.next())
- {
- // set the XID_ID
- foundXIDID = rs.getLong(1);
- if (foundXIDID > JDBCTransaction._xidId)
- {
- JDBCTransaction._xidId = foundXIDID;
- }
- foundTx = new JDBCTransaction();
- foundXid = new XidImpl(rs.getBlob(3).getBytes(1, (int) rs.getBlob(3).length()),
- rs.getInt(2), rs.getBlob(4).getBytes(1, (int) rs.getBlob(4).length()));
- // get all the records
- Statement stmtr = connection.getConnection().createStatement();
- ResultSet rsr = stmtr.executeQuery("SELECT * FROM " + _tableNameRecord +
- " WHERE XID_ID = " + rs.getLong(1));
- int foundType;
- AMQQueue foundQueue;
- StorableMessage foundMessage;
- TransactionRecord foundRecord;
- while (rsr.next())
- {
- // those messages were not recovered before so they need to be recreated
- foundType = rsr.getInt(2);
- foundQueue = _queueMap.get(new Integer(rsr.getInt(4)));
-
- //DTX MessageStore - this -> null , txContext -> null
- foundMessage = new AMQMessage(rs.getLong(3), null, messageHandleFactory, null);
- if (foundType == JDBCAbstractRecord.TYPE_DEQUEUE)
- {
- foundRecord = new JDBCDequeueRecord(foundMessage, foundQueue);
- }
- else
- {
- foundRecord = new JDBCEnqueueRecord(foundMessage, foundQueue);
- }
- foundTx.addRecord(foundRecord);
- }
- rsr.close();
- // add this tx to the map
- result.put(foundXid, foundTx);
- }
- rs.close();
- return result;
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot recover: ", e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot recover: ", e);
- }
- }
- }
- }
-
-
- public long getNewMessageId()
- {
- return _messageID++;
- }
-
- //========================================================================
- // Public methods
- //========================================================================
-
- public MyConnection getConnection()
- throws
- Exception
- {
- return (MyConnection) _connectionPool.acquireInstance();
- }
-
- public void commitConnection(MyConnection connection)
- throws
- InternalErrorException
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot commit connection =", e);
- }
- }
-
- public void rollbackConnection(MyConnection connection)
- throws
- InternalErrorException
- {
- try
- {
- connection.getConnection().rollback();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to rollback this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot rollback connection", e);
- }
- }
-
- public void appendContent(MyConnection connection, StorableMessage m, byte[] data, int offset, int size)
- throws
- SQLException,
- MessageDoesntExistException
- {
- PreparedStatement pstmt = connection.getStatements()[SELECT_MESSAGE_PAYLOAD];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("SELECT Payload FROM " + _tableNameMessage +
- " WHERE MessageID = ? ");
- connection.getStatements()[SELECT_MESSAGE_PAYLOAD] = pstmt;
- }
- pstmt.setLong(1, m.getMessageId());
- ResultSet rs = pstmt.executeQuery();
- if (!rs.next())
- {
- throw new MessageDoesntExistException("Cannot append content of message Id "
- + m.getMessageId() + " as it has not been found");
- }
- Blob myBlob = rs.getBlob(1);
- byte[] oldPayload;
- if (myBlob != null && myBlob.length() > 0)
- {
- oldPayload = myBlob.getBytes(1, (int) myBlob.length());
- }
- else
- {
- oldPayload = new byte[0];
- }
- rs.close();
- byte[] newPayload = new byte[oldPayload.length + size];
- ByteBuffer buffer = ByteBuffer.wrap(newPayload);
- buffer.put(oldPayload);
- buffer.put(data, offset, size);
- PreparedStatement pstmtUpdate = connection.getStatements()[UPDATE_MESSAGE_PAYLOAD];
- if (pstmtUpdate == null)
- {
- pstmtUpdate = connection.getConnection().prepareStatement("UPDATE " + _tableNameMessage +
- " SET Payload = ? WHERE MessageID = ?");
- connection.getStatements()[UPDATE_MESSAGE_PAYLOAD] = pstmtUpdate;
- }
- pstmtUpdate.setBytes(1, newPayload);
- pstmtUpdate.setLong(2, m.getMessageId());
- pstmtUpdate.executeUpdate();
- }
-
- public void stage(MyConnection connection, StorableMessage m)
- throws
- Exception
- {
- PreparedStatement pstmt = connection.getStatements()[STAGE_MESSAGE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameMessage +
- " (MessageID,Header,ExchangeName,RoutingKey,Mandatory,Is_Immediate) VALUES (?,?,?,?,?,?)");
- connection.getStatements()[STAGE_MESSAGE] = pstmt;
- }
- pstmt.setLong(1, m.getMessageId());
- pstmt.setBytes(2, m.getHeaderBody());
- pstmt.setString(3, ((AMQMessage) m).getMessagePublishInfo().getExchange().asString());
- pstmt.setString(4, ((AMQMessage) m).getMessagePublishInfo().getRoutingKey().asString());
- pstmt.setBoolean(5, ((AMQMessage) m).getMessagePublishInfo().isMandatory());
- pstmt.setBoolean(6, ((AMQMessage) m).getMessagePublishInfo().isImmediate());
- pstmt.executeUpdate();
- m.staged();
- }
-
- public void saveRecord(MyConnection connection, JDBCTransaction tx, JDBCAbstractRecord record)
- throws
- InternalErrorException
- {
- try
- {
- PreparedStatement pstmt = connection.getStatements()[SAVE_RECORD];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameRecord +
- " (XID_ID,Type,MessageID,QueueID) VALUES (?,?,?,?)");
- connection.getStatements()[SAVE_RECORD] = pstmt;
- }
- pstmt.setLong(1, tx.getXidID());
- pstmt.setInt(2, record.getType());
- pstmt.setLong(3, record.getMessageID());
- pstmt.setLong(4, record.getQueueID());
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot save record: " + record, e);
- }
- }
-
- public void saveXID(MyConnection connection, JDBCTransaction tx, Xid xid)
- throws
- InternalErrorException
- {
- try
- {
- PreparedStatement pstmt = connection.getStatements()[SAVE_XID];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("INSERT INTO " + _tableNameTransaction +
- " (XID_ID,FormatId, BranchQualifier,GlobalTransactionId) VALUES (?,?,?,?)");
- connection.getStatements()[SAVE_XID] = pstmt;
- }
- pstmt.setLong(1, tx.getXidID());
- pstmt.setInt(2, xid.getFormatId());
- pstmt.setBytes(3, xid.getBranchQualifier());
- pstmt.setBytes(4, xid.getGlobalTransactionId());
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot save xid: " + xid, e);
- }
- }
-
- public void deleteRecords(MyConnection connection, JDBCTransaction tx)
- throws
- InternalErrorException
- {
- try
- {
- PreparedStatement pstmt = connection.getStatements()[DELETE_RECORD];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameRecord +
- " WHERE XID_ID = ?");
- connection.getStatements()[DELETE_RECORD] = pstmt;
- }
- pstmt.setLong(1, tx.getXidID());
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot delete record: " + tx.getXidID(), e);
- }
- }
-
- public void deleteXID(MyConnection connection, JDBCTransaction tx)
- throws
- InternalErrorException
- {
- try
- {
- PreparedStatement pstmt = connection.getStatements()[DELETE_XID];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameTransaction +
- " WHERE XID_ID = ?");
- connection.getStatements()[DELETE_XID] = pstmt;
- }
- pstmt.setLong(1, tx.getXidID());
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot delete xid: " + tx.getXidID(), e);
- }
- }
-
- public void prepareDequeu(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- UnknownXidException,
- InternalErrorException
- {
- JDBCTransaction tx = getTx(xid);
- if (tx == null)
- {
- throw new UnknownXidException(xid, null);
- }
- updateQueueMessageRelation(tx.getConnection(), queue.getQueueID(), m.getMessageId(), 1);
-
- }
-
- public void rollbackDequeu(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- UnknownXidException,
- InternalErrorException
- {
- JDBCTransaction tx = getTx(xid);
- if (tx == null)
- {
- throw new UnknownXidException(xid, null);
- }
- updateQueueMessageRelation(tx.getConnection(), queue.getQueueID(), m.getMessageId(), 0);
- }
-
- //========================================================================
- // Private methods
- //========================================================================
-
-
- private void updateQueueMessageRelation(MyConnection connection,
- int queueID, long messageId, int prepared)
- throws
- InternalErrorException
- {
- try
- {
- PreparedStatement pstmt = connection.getStatements()[UPDATE_QMR];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("UPDATE " + _tableNameQueueMessageRelation +
- " SET Prepared = ? WHERE MessageID = ? AND QueueID = ?");
- connection.getStatements()[UPDATE_QMR] = pstmt;
- }
- pstmt.setInt(1, prepared);
- pstmt.setLong(2, messageId);
- pstmt.setInt(3, queueID);
- pstmt.executeUpdate();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot update QMR", e);
- }
-
- }
-
- public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
- throws
- InternalErrorException
- {
- MyConnection connection = null;
- MessagePublishInfo result;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[GET_MESSAGE_INFO];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("SELECT ExchangeName, RoutingKey," +
- " Mandatory, Is_Immediate from " + _tableNameMessage +
- " WHERE MessageID = ?");
- connection.getStatements()[GET_MESSAGE_INFO] = pstmt;
- }
- pstmt.setLong(1, m.getMessageId());
- final ResultSet rs = pstmt.executeQuery();
- if (rs.next())
- {
-
- result = new MessagePublishInfo()
- {
- AMQShortString exchange = new AMQShortString(rs.getString(1));
- final AMQShortString routingKey = new AMQShortString(rs.getString(2));
- final boolean mandatory = rs.getBoolean(3);
- final boolean immediate = rs.getBoolean(4);
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
-
- public void setExchange(AMQShortString ex)
- {
- exchange = ex;
- }
- };
- }
- else
- {
- throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m);
- }
- rs.close();
- return result;
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot get MessagePublishInfo of message: " + m, e);
- }
- }
- }
- }
-
- public ContentHeaderBody getContentHeaderBody(StorableMessage m)
- throws
- InternalErrorException
- {
- MyConnection connection = null;
- ContentHeaderBody result;
- try
- {
- connection = (MyConnection) _connectionPool.acquireInstance();
- PreparedStatement pstmt = connection.getStatements()[GET_CONTENT_HEADER];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("SELECT Header from " + _tableNameMessage +
- " WHERE MessageID = ?");
- connection.getStatements()[GET_CONTENT_HEADER] = pstmt;
- }
- pstmt.setLong(1, m.getMessageId());
- ResultSet rs = pstmt.executeQuery();
- if (rs.next())
- {
- result = new ContentHeaderBody(ByteBuffer.wrap(rs.getBlob(1).getBytes(1, (int) rs.getBlob(1).length())), 0);
- }
- else
- {
- throw new InternalErrorException("Cannot get Content Header of message: " + m);
- }
- rs.close();
- return result;
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot get Content Header of message: " + m, e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot get Content Header of message: " + m, e);
- }
- }
- }
- }
-
- private List<StorableMessage> getAllMessages(MyConnection connection, StorableQueue queue)
- throws
- SQLException,
- AMQException
- {
- List<StorableMessage> result = new ArrayList<StorableMessage>();
-// TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
- PreparedStatement pstmt = connection.getStatements()[GET_ALL_MESSAGES];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("SELECT " + _tableNameMessage + ".MessageID, Header FROM " +
- _tableNameMessage +
- " INNER JOIN " +
- _tableNameQueueMessageRelation +
- " ON " +
- _tableNameMessage + ".MessageID = " + _tableNameQueueMessageRelation + ".MessageID" +
- " WHERE " +
- _tableNameQueueMessageRelation + ".QueueID = ?" +
- " AND " +
- _tableNameQueueMessageRelation + ".Prepared = 0");
- connection.getStatements()[GET_ALL_MESSAGES] = pstmt;
- }
- pstmt.setInt(1, queue.getQueueID());
- ResultSet rs = pstmt.executeQuery();
- AMQMessage foundMessage;
- // ContentHeaderBody hb;
- while (rs.next())
- {
-
- //DTX MessageStore - this -> null , txContext -> null
- foundMessage = new AMQMessage(rs.getLong(1), null, messageHandleFactory, null);
-
- result.add(foundMessage);
- }
- rs.close();
- return result;
- }
-
- private HashMap<Integer, AMQQueue> recover()
- throws
- InternalErrorException
- {
- MyConnection connection = null;
- HashMap<Integer, AMQQueue> result = new HashMap<Integer, AMQQueue>();
- try
- {
- // re-create all the queues
- connection = (MyConnection) _connectionPool.acquireInstance();
- Statement stmt = connection.getConnection().createStatement();
- ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameQueue);
- AMQQueue foundQueue;
- List<StorableMessage> foundMessages;
- StoreContext context = new StoreContext();
- while (rs.next())
- {
- AMQShortString owner = null;
- if (rs.getString(3) != null)
- {
- owner = new AMQShortString(rs.getString(3));
- }
- foundQueue = new AMQQueue(new AMQShortString(rs.getString(2)),
- true, owner, false, _virtualHost);
- // get all the Messages of that queue
- foundMessages = getAllMessages(connection, foundQueue);
- // enqueue those messages
- if (_log.isDebugEnabled())
- {
- _log.debug("Recovering " + foundMessages.size() + " messages for queue " + foundQueue.getName());
- }
- for (StorableMessage foundMessage : foundMessages)
- {
- foundMessage.staged();
- foundMessage.enqueue(foundQueue);
- foundQueue.enqueue(foundMessage);
- // FIXME: TGM AS foundQueue.process(context, (AMQMessage) foundMessage, false);
- }
- // add the queue in the result map
- result.put(foundQueue.getQueueID(), foundQueue);
- // add it in the registry
- _virtualHost.getQueueRegistry().registerQueue(foundQueue);
- }
- rs.close();
- return result;
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot recover: ", e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot recover: ", e);
- }
- }
- }
- }
-
- private void recoverExchanges(HashMap<Integer, AMQQueue> queueMap)
- throws
- InternalErrorException
- {
- MyConnection connection = null;
- try
- {
- // re-create all the exchanges
- connection = (MyConnection) _connectionPool.acquireInstance();
- Statement stmt = connection.getConnection().createStatement();
- ResultSet rs = stmt.executeQuery("SELECT * FROM " + _tableNameExchange);
- Exchange foundExchange;
- AMQQueue foundQueue;
- while (rs.next())
- {
- foundExchange = _virtualHost.getExchangeFactory().createExchange(
- new AMQShortString(rs.getString(1)), new AMQShortString(rs.getString(2)), true, false);
- // get all the bindings
- Statement stmtb = connection.getConnection().createStatement();
- ResultSet rsb = stmtb.executeQuery("SELECT * FROM " + _tableNameExchangeQueueRelation +
- " WHERE Name = '" + rs.getString(1) + "'");
- while (rsb.next())
- {
- foundQueue = queueMap.get(new Integer(rsb.getInt(1)));
- if (foundQueue != null)
- {
- // the field table
- FieldTable ft = null;
- if (rsb.getBlob(4) != null)
- {
- long length = rsb.getBlob(4).length();
- ByteBuffer buffer = ByteBuffer.wrap(rsb.getBlob(4).getBytes(1, (int) length));
- ft = new FieldTable(buffer, length);
- }
- foundQueue.bind(new AMQShortString(rsb.getString(3)), ft, foundExchange);
- }
- }
- rsb.close();
- // register this exchange
- _virtualHost.getExchangeRegistry().registerExchange(foundExchange);
- }
- rs.close();
- }
- catch (Exception e)
- {
- throw new InternalErrorException("Cannot recover: ", e);
- }
- finally
- {
- if (connection != null)
- {
- try
- {
- connection.getConnection().commit();
- _connectionPool.releaseInstance(connection);
- }
- catch (SQLException e)
- {
- // we did not manage to commit this connection
- // it is better to release it
- _connectionPool.releaseDeadInstance();
- throw new InternalErrorException("Cannot recover: ", e);
- }
- }
- }
- }
-
- private void destroy(MyConnection connection, StorableMessage m)
- throws
- SQLException
- {
- PreparedStatement pstmt = connection.getStatements()[DELETE_MESSAGE];
- if (pstmt == null)
- {
- pstmt = connection.getConnection().prepareStatement("DELETE FROM " + _tableNameMessage +
- " WHERE MessageID = ?");
- connection.getStatements()[DELETE_MESSAGE] = pstmt;
- }
- pstmt.setLong(1, m.getMessageId());
- pstmt.executeUpdate();
- }
-
- private JDBCTransaction getTx(Xid xid)
- throws
- UnknownXidException
- {
- JDBCTransaction tx = null;
- if (xid != null)
- {
- tx = _tm.getTransaction(xid);
- }
- return tx;
- }
-
- /**
- * setupConnections - Initialize the connections
- *
- * @return true if ok
- */
- private synchronized boolean setupConnections()
- {
- try
- {
- if (_connectionPool == null)
- {
- // In an embedded environment, loading the driver also starts Derby.
- Class.forName(_driver).newInstance();
- _connectionPool = new ConnectionPool(_maxSize);
- }
- }
- catch (Exception e)
- {
- _log.warn("Setup connections trouble", e);
- return false;
- }
- return true;
- }
-
- /**
- * Try to create the connection and table.
- * If this fails, then we will exit.
- */
- protected synchronized boolean setupStore(boolean dropTables)
- {
- if (!setupConnections())
- {
- return false;
- }
- MyConnection myconnection = null;
- try
- {
- myconnection = (MyConnection) _connectionPool.acquireInstance();
- Statement stmt = myconnection._connection.createStatement();
- /*
- * TODO Need some management interface to delete the table!
- */
- if (dropTables)
- {
- try
- {
- stmt.executeUpdate("DROP TABLE " + _tableNameMessage);
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // don't want to print error - chances are it
- // just reports that the table does not exist
- // ex.printStackTrace();
- }
- try
- {
- stmt.executeUpdate("DROP TABLE " + _tableNameQueue);
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // ex.printStackTrace();
- }
- try
- {
- stmt.executeUpdate("DROP TABLE " + _tableNameQueueMessageRelation);
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // ex.printStackTrace();
- }
- try
- {
- stmt.executeUpdate("DROP TABLE " + _tableNameExchange);
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // ex.printStackTrace();
- }
- try
- {
- stmt.executeUpdate("DROP TABLE " + _tableNameExchangeQueueRelation);
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // ex.printStackTrace();
- }
- try
- {
- stmt.executeUpdate("DROP TABLE " + _tableNameRecord);
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // ex.printStackTrace();
- }
- try
- {
- stmt.executeUpdate("DROP TABLE " + _tableNameTransaction);
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // ex.printStackTrace();
- }
- }
- // create the table for messages
- try
- {
- stmt.executeUpdate("CREATE TABLE " + _tableNameMessage + " (MessageID FLOAT NOT NULL, Header BLOB," +
- " Payload BLOB, ExchangeName VARCHAR(1024), RoutingKey VARCHAR(1024)," +
- " Mandatory INTEGER, Is_Immediate INTEGER, PRIMARY KEY(MessageID))");
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // ex.printStackTrace();
- // assume this is reporting that the table already exists:
- }
- // create the table for queues
- try
- {
- stmt.executeUpdate("CREATE TABLE " + _tableNameQueue + " (QueueID INTEGER NOT NULL, " +
- "Name VARCHAR(1024) NOT NULL, Owner VARCHAR(1024), PRIMARY KEY(QueueID))");
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- //ex.printStackTrace();
- // assume this is reporting that the table already exists:
- }
- // create the table for queue to message mapping
- try
- {
- stmt.executeUpdate("CREATE TABLE " + _tableNameQueueMessageRelation + " (QueueID INTEGER NOT NULL, " +
- "MessageID FLOAT NOT NULL, Prepared INTEGER)");
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- //ex.printStackTrace();
- // assume this is reporting that the table already exists:
- }
- try
- {
- stmt.executeUpdate("CREATE TABLE " + _tableNameExchange + " (Name VARCHAR(1024) NOT NULL, " +
- "Type VARCHAR(1024) NOT NULL, PRIMARY KEY(Name))");
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- //ex.printStackTrace();
- // assume this is reporting that the table already exists:
- }
- try
- {
- stmt.executeUpdate("CREATE TABLE " + _tableNameExchangeQueueRelation + " (QueueID INTEGER NOT NULL, " +
- "Name VARCHAR(1024) NOT NULL, RoutingKey VARCHAR(1024), FieldTable BLOB )");
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- //ex.printStackTrace();
- // assume this is reporting that the table already exists:
- }
- try
- {
- stmt.executeUpdate("CREATE TABLE " + _tableNameRecord + " (XID_ID FLOAT, Type INTEGER, MessageID FLOAT, " +
- "QueueID INTEGER, PRIMARY KEY(Type, MessageID, QueueID))");
- // we could alter the table with QueueID as foreign key
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- //ex.printStackTrace();
- // assume this is reporting that the table already exists:
- }
- try
- {
- stmt.executeUpdate("CREATE TABLE " + _tableNameTransaction + " (XID_ID FLOAT, FormatId INTEGER, " +
- "BranchQualifier BLOB, GlobalTransactionId BLOB, PRIMARY KEY(XID_ID))");
- myconnection._connection.commit();
- }
- catch (SQLException ex)
- {
- // ex.printStackTrace();
- // assume this is reporting that the table already exists:
- }
- }
- catch (Throwable e)
- {
- _log.warn("Setup Store trouble: ", e);
- return false;
- }
- finally
- {
- if (myconnection != null)
- {
- _connectionPool.releaseInstance(myconnection);
- }
- }
- return true;
- }
- //========================================================================================
- //============== the connection pool =====================================================
- //========================================================================================
-
- private class ConnectionPool extends Pool
- {
-
- /**
- * Create a pool of specified size. Negative or null pool sizes are
- * disallowed.
- *
- * @param poolSize The size of the pool to create. Should be 1 or
- * greater.
- * @throws Exception If the pool size is less than 1.
- */
- public ConnectionPool(int poolSize)
- throws
- Exception
- {
- super(poolSize);
- }
-
- /**
- * @return An instance of the pooled object.
- * @throws Exception In case of internal error.
- */
- protected MyConnection createInstance()
- throws
- Exception
- {
- try
- {
- // standard way to obtain a Connection object is to call the method DriverManager.getConnection,
- // which takes a String containing a connection URL (uniform resource locator).
- Connection conn = DriverManager.getConnection(_connectionURL);
- //conn.setAutoCommit(true);
- PreparedStatement[] st = new PreparedStatement[STATEMENT_SIZE];
- for (int j = 0; j < STATEMENT_SIZE; j++)
- {
- st[j] = null;
- }
- return new MyConnection(conn, st);
- }
- catch (SQLException e)
- {
- throw new Exception("sqlException when creating connection to " + _connectionURL, e);
- }
- }
- }
-
- public class MyConnection
- {
- // the connection
- private Connection _connection = null;
- // its associated prepared statements
- private PreparedStatement[] _preparedStatements = null;
-
- MyConnection(Connection con, PreparedStatement[] st)
- {
- _connection = con;
- _preparedStatements = st;
- }
-
- public Connection getConnection()
- {
- return _connection;
- }
-
- public PreparedStatement[] getStatements()
- {
- return _preparedStatements;
- }
-
- }
-}