summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java964
1 files changed, 0 insertions, 964 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
deleted file mode 100644
index 625836bcf2..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
+++ /dev/null
@@ -1,964 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import java.security.PrivilegedAction;
-
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-
-public class ServerMethodProcessor implements MethodProcessor
-{
- private static final Logger LOGGER = Logger.getLogger(ServerMethodProcessor.class);
- private int _classId;
- private int _methodId;
-
-
- private static interface ChannelAction
- {
- void onChannel(ChannelMethodProcessor channel);
- }
-
- private ProtocolVersion _protocolVersion;
- private ServerMethodDispatcherImpl _dispatcher;
- private AMQProtocolEngine _connection;
-
- public ServerMethodProcessor(final ProtocolVersion protocolVersion)
- {
- _protocolVersion = protocolVersion;
- }
-
-
- private void processChannelMethod(int channelId, final ChannelAction action)
- {
- final AMQChannel channel = _connection.getChannel(channelId);
- if (channel == null)
- {
- // TODO throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
- }
- else
- {
- Subject.doAs(channel.getSubject(), new PrivilegedAction<Void>()
- {
- @Override
- public Void run()
- {
- action.onChannel(channel);
- return null;
- }
- });
- }
-
- }
-
- @Override
- public void receiveConnectionStart(final short versionMajor,
- final short versionMinor,
- final FieldTable serverProperties,
- final byte[] mechanisms,
- final byte[] locales)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0,
- new ConnectionStartBody(versionMajor,
- versionMinor,
- serverProperties,
- mechanisms,
- locales));
- }
- _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unexpected method received: ConnectionStart", 0
- );
-
- }
-
- @Override
- public void receiveConnectionStartOk(final FieldTable clientProperties,
- final AMQShortString mechanism,
- final byte[] response,
- final AMQShortString locale)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale));
- }
-
- Broker<?> broker = _connection.getBroker();
-
- SubjectCreator subjectCreator = _connection.getSubjectCreator();
- SaslServer ss = null;
- try
- {
- ss = subjectCreator.createSaslServer(String.valueOf(mechanism),
- _connection.getLocalFQDN(),
- _connection.getPeerPrincipal());
-
- if (ss == null)
- {
- _connection.closeConnection(AMQConstant.RESOURCE_ERROR,
- "Unable to create SASL Server:" + mechanism, 0
- );
- }
- else
- {
- _connection.setSaslServer(ss);
-
- final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
- //save clientProperties
- _connection.setClientProperties(clientProperties);
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
- switch (authResult.getStatus())
- {
- case ERROR:
- Exception cause = authResult.getCause();
-
- LOGGER.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
- _connection.closeConnection(AMQConstant.NOT_ALLOWED,
- AMQConstant.NOT_ALLOWED.getName().toString(), 0
- );
-
- disposeSaslServer();
- break;
-
- case SUCCESS:
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Connected as: " + authResult.getSubject());
- }
- _connection.setAuthorizedSubject(authResult.getSubject());
-
- int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
- if (frameMax <= 0)
- {
- frameMax = Integer.MAX_VALUE;
- }
-
- ConnectionTuneBody
- tuneBody =
- methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- frameMax,
- broker.getConnection_heartBeatDelay());
- _connection.writeFrame(tuneBody.generateFrame(0));
- break;
- case CONTINUE:
- ConnectionSecureBody
- secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- _connection.writeFrame(secureBody.generateFrame(0));
- }
- }
- }
- catch (SaslException e)
- {
- disposeSaslServer();
-
- _connection.closeConnection(AMQConstant.RESOURCE_ERROR, "SASL error: " + e.getMessage(), 0
- );
- }
-
- }
-
- @Override
- public void receiveTxSelect(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxSelectBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxSelectOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxSelectOkBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxCommit(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxCommitBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxCommitOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxCommitOkBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxRollback(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxRollbackBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveTxRollbackOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, TxRollbackOkBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveConnectionSecure(final byte[] challenge)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionSecureBody(challenge));
- }
-
- }
-
- @Override
- public void receiveConnectionSecureOk(final byte[] response)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionSecureOkBody(response));
- }
-
- }
-
- @Override
- public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat));
- }
-
- }
-
- @Override
- public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat));
- }
-
- }
-
- @Override
- public void receiveConnectionOpen(final AMQShortString virtualHost,
- final AMQShortString capabilities,
- final boolean insist)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist));
- }
-
- }
-
- @Override
- public void receiveConnectionOpenOk(final AMQShortString knownHosts)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionOpenOkBody(knownHosts));
- }
-
- }
-
- @Override
- public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts));
- }
-
- }
-
- @Override
- public void receiveConnectionClose(final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0,
- new ConnectionCloseBody(getProtocolVersion(),
- replyCode,
- replyText,
- classId,
- methodId));
- }
-
- }
-
- @Override
- public void receiveConnectionCloseOk()
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
- ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8
- : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9);
- }
- }
-
- @Override
- public void receiveChannelOpen(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelOpenBody());
- }
-
- }
-
- @Override
- public void receiveChannelOpenOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
- ? ChannelOpenOkBody.INSTANCE_0_8
- : ChannelOpenOkBody.INSTANCE_0_9);
- }
- }
-
- @Override
- public void receiveChannelFlow(final int channelId, final boolean active)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelFlowBody(active));
- }
-
- }
-
- @Override
- public void receiveChannelFlowOk(final int channelId, final boolean active)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelFlowOkBody(active));
- }
-
- }
-
- @Override
- public void receiveChannelAlert(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final FieldTable details)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details));
- }
-
- }
-
- @Override
- public void receiveChannelClose(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId));
- }
-
- }
-
- @Override
- public void receiveChannelCloseOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE);
- }
-
- }
-
- @Override
- public void receiveAccessRequest(final int channelId,
- final AMQShortString realm,
- final boolean exclusive,
- final boolean passive,
- final boolean active,
- final boolean write,
- final boolean read)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame =
- new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read));
- }
-
- }
-
- @Override
- public void receiveAccessRequestOk(final int channelId, final int ticket)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new AccessRequestOkBody(ticket));
- }
-
- }
-
- @Override
- public void receiveExchangeDeclare(final int channelId,
- final AMQShortString exchange,
- final AMQShortString type,
- final boolean passive,
- final boolean durable,
- final boolean autoDelete,
- final boolean internal,
- final boolean nowait, final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new ExchangeDeclareBody(0,
- exchange,
- type,
- passive,
- durable,
- autoDelete,
- internal,
- nowait,
- arguments));
- }
-
- }
-
- @Override
- public void receiveExchangeDeclareOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeDeclareOkBody());
- }
-
- }
-
- @Override
- public void receiveExchangeDelete(final int channelId,
- final AMQShortString exchange,
- final boolean ifUnused,
- final boolean nowait)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait));
- }
-
- }
-
- @Override
- public void receiveExchangeDeleteOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeDeleteOkBody());
- }
-
- }
-
- @Override
- public void receiveExchangeBound(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final AMQShortString queue)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue));
- }
-
- }
-
- @Override
- public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText));
- }
-
- }
-
- @Override
- public void receiveQueueBindOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueBindOkBody());
- }
-
- }
-
- @Override
- public void receiveQueueUnbindOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueUnbindOkBody());
- }
-
- }
-
- @Override
- public void receiveQueueDeclare(final int channelId,
- final AMQShortString queue,
- final boolean passive,
- final boolean durable,
- final boolean exclusive,
- final boolean autoDelete,
- final boolean nowait,
- final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new QueueDeclareBody(0,
- queue,
- passive,
- durable,
- exclusive,
- autoDelete,
- nowait,
- arguments));
- }
-
- }
-
- @Override
- public void receiveQueueDeclareOk(final int channelId,
- final AMQShortString queue,
- final long messageCount,
- final long consumerCount)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount));
- }
-
- }
-
- @Override
- public void receiveQueueBind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final boolean nowait,
- final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame =
- new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments));
- }
-
- }
-
- @Override
- public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait));
- }
-
- }
-
- @Override
- public void receiveQueuePurgeOk(final int channelId, final long messageCount)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueuePurgeOkBody(messageCount));
- }
-
- }
-
- @Override
- public void receiveQueueDelete(final int channelId,
- final AMQShortString queue,
- final boolean ifUnused,
- final boolean ifEmpty,
- final boolean nowait)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait));
- }
-
- }
-
- @Override
- public void receiveQueueDeleteOk(final int channelId, final long messageCount)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueDeleteOkBody(messageCount));
- }
-
- }
-
- @Override
- public void receiveQueueUnbind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments));
- }
-
- }
-
- @Override
- public void receiveBasicRecoverSyncOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()));
- }
-
- }
-
- @Override
- public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync)
- {
- if (ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicRecoverBody(requeue));
- }
-
- }
- else
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue));
- }
-
- }
- }
-
- @Override
- public void receiveBasicQos(final int channelId,
- final long prefetchSize,
- final int prefetchCount,
- final boolean global)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global));
- }
-
- }
-
- @Override
- public void receiveBasicQosOk(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicQosOkBody());
- }
-
- }
-
- @Override
- public void receiveBasicConsume(final int channelId,
- final AMQShortString queue,
- final AMQShortString consumerTag,
- final boolean noLocal,
- final boolean noAck,
- final boolean exclusive,
- final boolean nowait,
- final FieldTable arguments)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new BasicConsumeBody(0,
- queue,
- consumerTag,
- noLocal,
- noAck,
- exclusive,
- nowait,
- arguments));
- }
-
- }
-
- @Override
- public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag));
- }
-
- }
-
- @Override
- public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait));
- }
-
- }
-
- @Override
- public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicCancelOkBody(consumerTag));
- }
-
- }
-
- @Override
- public void receiveBasicPublish(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final boolean mandatory,
- final boolean immediate)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame =
- new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate));
- }
-
- }
-
- @Override
- public void receiveBasicReturn(final int channelId, final int replyCode,
- final AMQShortString replyText,
- final AMQShortString exchange,
- final AMQShortString routingKey)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey));
- }
-
- }
-
- @Override
- public void receiveBasicDeliver(final int channelId,
- final AMQShortString consumerTag,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new BasicDeliverBody(consumerTag,
- deliveryTag,
- redelivered,
- exchange,
- routingKey));
- }
-
- }
-
- @Override
- public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicGetBody(0, queue, noAck));
- }
-
- }
-
- @Override
- public void receiveBasicGetOk(final int channelId,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final long messageCount)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId,
- new BasicGetOkBody(deliveryTag,
- redelivered,
- exchange,
- routingKey,
- messageCount));
- }
-
- }
-
- @Override
- public void receiveBasicGetEmpty(final int channelId)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString) null));
- }
-
- }
-
- @Override
- public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple));
- }
-
- }
-
- @Override
- public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue));
- }
-
- }
-
- @Override
- public void receiveHeartbeat()
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(0, new HeartbeatBody());
- }
-
- }
-
- @Override
- public ProtocolVersion getProtocolVersion()
- {
- return _protocolVersion;
- }
-
- public void setProtocolVersion(final ProtocolVersion protocolVersion)
- {
- _protocolVersion = protocolVersion;
- }
-
- @Override
- public void receiveMessageContent(final int channelId, final byte[] data)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ContentBody(data));
- }
-
- }
-
- @Override
- public void receiveMessageHeader(final int channelId,
- final BasicContentHeaderProperties properties,
- final long bodySize)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQFrame frame = new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize));
- }
-
- }
-
- @Override
- public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
- {
- if (LOGGER.isDebugEnabled())
- {
- AMQDataBlock frame = protocolInitiation;
- }
-
- }
-
- @Override
- public void setCurrentMethod(final int classId, final int methodId)
- {
- _classId = classId;
- _methodId = methodId;
- }
-
- private void disposeSaslServer()
- {
- SaslServer ss = _connection.getSaslServer();
- if (ss != null)
- {
- _connection.setSaslServer(null);
- try
- {
- ss.dispose();
- }
- catch (SaslException e)
- {
- LOGGER.error("Error disposing of Sasl server: " + e);
- }
- }
- }
-}