diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/nclient/Client.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/Client.java | 295 |
1 files changed, 0 insertions, 295 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java deleted file mode 100644 index af0e724e42..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Client.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.nclient; - -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.qpid.client.url.URLParser_0_10; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.url.QpidURL; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; -import org.apache.qpid.nclient.impl.ClientSession; -import org.apache.qpid.nclient.impl.ClientSessionDelegate; -import org.apache.qpid.transport.Channel; -import org.apache.qpid.transport.ClientDelegate; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionCloseOk; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.ProtocolVersionException; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.network.io.IoTransport; -import org.apache.qpid.transport.network.mina.MinaHandler; -import org.apache.qpid.transport.network.nio.NioHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class Client implements org.apache.qpid.nclient.Connection -{ - private Connection _conn; - private ClosedListener _closedListner; - private final Lock _lock = new ReentrantLock(); - private static Logger _logger = LoggerFactory.getLogger(Client.class); - private Condition closeOk; - private boolean closed = false; - private long timeout = 60000; - - private ProtocolHeader header = null; - - /** - * - * @return returns a new connection to the broker. - */ - public static org.apache.qpid.nclient.Connection createConnection() - { - return new Client(); - } - - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException - { - - final Condition negotiationComplete = _lock.newCondition(); - closeOk = _lock.newCondition(); - _lock.lock(); - - ClientDelegate connectionDelegate = new ClientDelegate() - { - private boolean receivedClose = false; - public SessionDelegate getSessionDelegate() - { - return new ClientSessionDelegate(); - } - - public void exception(Throwable t) - { - if (_closedListner != null) - { - _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t); - } - else - { - throw new RuntimeException("connection closed",t); - } - } - - public void closed() - { - if (_closedListner != null && !this.receivedClose) - { - _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null); - } - } - - @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct) - { - _lock.lock(); - try - { - closed = true; - this.receivedClose = true; - closeOk.signalAll(); - } - finally - { - _lock.unlock(); - } - } - - @Override public void connectionClose(Channel context, ConnectionClose connectionClose) - { - super.connectionClose(context, connectionClose); - ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue()); - if (_closedListner == null && errorCode != ErrorCode.NO_ERROR) - { - throw new RuntimeException - (new QpidException("Server closed the connection: Reason " + - connectionClose.getReplyText(), - errorCode, - null)); - } - else - { - _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null); - } - - this.receivedClose = true; - } - @Override public void init(Channel ch, ProtocolHeader hdr) - { - // TODO: once the merge is done we'll need to update this code - // for handling 0.8 protocol version type i.e. major=8 and mino - if (hdr.getMajor() != 0 || hdr.getMinor() != 10) - { - Client.this.header = hdr; - _lock.lock(); - negotiationComplete.signalAll(); - _lock.unlock(); - } - } - }; - - connectionDelegate.setCondition(_lock,negotiationComplete); - connectionDelegate.setUsername(username); - connectionDelegate.setPassword(password); - connectionDelegate.setVirtualHost(virtualHost); - - String transport = System.getProperty("transport","io"); - if (transport.equalsIgnoreCase("nio")) - { - _logger.info("using NIO Transport"); - _conn = NioHandler.connect(host, port,connectionDelegate); - } - else if (transport.equalsIgnoreCase("io")) - { - _logger.info("using Plain IO Transport"); - _conn = IoTransport.connect(host, port,connectionDelegate); - } - else - { - _logger.info("using MINA Transport"); - _conn = MinaHandler.connect(host, port,connectionDelegate); - // _conn = NativeHandler.connect(host, port,connectionDelegate); - } - - // XXX: hardcoded version numbers - _conn.send(new ProtocolHeader(1, 0, 10)); - - try - { - negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); - if (header != null) - { - _conn.close(); - throw new ProtocolVersionException(header.getMajor(), header.getMinor()); - } - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - finally - { - _lock.unlock(); - } - } - - public void connect(String url)throws QpidException - { - URLParser_0_10 parser = null; - try - { - parser = new URLParser_0_10(url); - } - catch(Exception e) - { - throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e); - } - List<BrokerDetails> brokers = parser.getAllBrokerDetails(); - BrokerDetails brokerDetail = brokers.get(0); - connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"), - brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"), - brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password")); - } - - /* - * Until the dust settles with the URL disucssion - * I am not going to implement this. - */ - public void connect(QpidURL url) throws QpidException - { - throw new UnsupportedOperationException("Not implemented"); - } - - /* { - // temp impl to tests - BrokerDetails details = url.getAllBrokerDetails().get(0); - connect(details.getHost(), - details.getPort(), - details.getVirtualHost(), - details.getUserName(), - details.getPassword()); - } -*/ - - public void close() throws QpidException - { - Channel ch = _conn.getChannel(0); - ch.connectionClose(ConnectionCloseCode.NORMAL, "client is closing"); - _lock.lock(); - try - { - try - { - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed && elapsed < timeout) - { - closeOk.await(timeout - elapsed, TimeUnit.MILLISECONDS); - elapsed = System.currentTimeMillis() - start; - } - if(!closed) - { - throw new QpidException("Timed out when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - catch (InterruptedException e) - { - throw new QpidException("Interrupted when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - finally - { - _lock.unlock(); - } - _conn.close(); - } - - public Session createSession(long expiryInSeconds) - { - Channel ch = _conn.getChannel(); - ClientSession ssn = new ClientSession(UUID.randomUUID().toString().getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); - ssn.sessionRequestTimeout(expiryInSeconds); - return ssn; - } - - public DtxSession createDTXSession(int expiryInSeconds) - { - ClientSession clientSession = (ClientSession) createSession(expiryInSeconds); - clientSession.dtxSelect(); - return (DtxSession) clientSession; - } - - public void setClosedListener(ClosedListener closedListner) - { - - _closedListner = closedListner; - } - -} |