blob: 620cf14c3307ff65ee4203877b79593e4ec89166 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package org.apache.qpid.nclient.impl;
import java.nio.ByteBuffer;
import org.apache.qpid.ErrorCode;
import org.apache.qpid.nclient.MessagePartListener;
import org.apache.qpid.QpidException;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageReject;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDetached;
import org.apache.qpid.transport.SessionDelegate;
/**
 * Session delegate used by the client: routes incoming message transfers to the
 * listener registered for their destination and records message rejections on
 * the owning {@link ClientSession}.
 *
 * <p>Both callbacks cast the supplied {@link Session} to {@link ClientSession},
 * so this delegate must only be installed on client sessions.
 */
public class ClientSessionDelegate extends SessionDelegate
{
    // --------------------------------------------
    //   Message methods
    // --------------------------------------------

    /**
     * Hands an incoming transfer to the {@link MessagePartListener} that the
     * session has registered under the transfer's destination.
     *
     * @param session the session the transfer arrived on; must be a {@link ClientSession}
     * @param xfr     the incoming message transfer
     * @throws IllegalStateException if no listener is registered for the
     *                               transfer's destination
     */
    @Override public void messageTransfer(Session session, MessageTransfer xfr)
    {
        ClientSession clientSession = (ClientSession) session;
        MessagePartListener listener = clientSession.getMessageListeners()
            .get(xfr.getDestination());
        if (listener == null)
        {
            // Fail with a message that names the destination instead of an
            // anonymous NullPointerException from the dereference below.
            throw new IllegalStateException(
                "No MessagePartListener registered for destination: " + xfr.getDestination());
        }
        listener.messageTransfer(xfr);
    }

    /**
     * Handles a rejection: prints every rejected command, stores the rejected
     * ranges on the session, notifies the session of a
     * {@link ErrorCode#MESSAGE_REJECTED} exception and finally marks the
     * reject command as processed.
     *
     * @param session the session the rejection arrived on; must be a {@link ClientSession}
     * @param struct  the reject command carrying the ranges of rejected transfers
     */
    @Override public void messageReject(Session session, MessageReject struct)
    {
        ClientSession clientSession = (ClientSession) session;

        // Report each command id covered by the rejected ranges (bounds inclusive).
        for (Range range : struct.getTransfers())
        {
            for (long l = range.getLower(); l <= range.getUpper(); l++)
            {
                System.out.println("message rejected: " +
                                   session.getCommand((int) l));
            }
        }

        // Record the rejected ranges before notifying, so they are already set
        // on the session when the exception notification is delivered.
        clientSession.setRejectedMessages(struct.getTransfers());
        clientSession.notifyException(
            new QpidException("Message Rejected", ErrorCode.MESSAGE_REJECTED, null));
        session.processed(struct);
    }
}
|