summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-19 15:14:02 +0000
committerTed Ross <tross@apache.org>2010-03-19 15:14:02 +0000
commit3d4e75c60e892d2a0adf59b1f6d944eef58b343d (patch)
tree0f049a6caf74abbdebe7ff290ee6e09bc3f433bf
parent0b7c046413aacb145573725be13b15dfc4dd9727 (diff)
downloadqpid-python-3d4e75c60e892d2a0adf59b1f6d944eef58b343d.tar.gz
Added functioning agent discovery, heartbeat, and age-out to C++ agent and Python console.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@925260 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp103
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h16
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py185
3 files changed, 236 insertions, 68 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 547d71e165..b21e7105a3 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -242,7 +242,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
headers["qmf.agent"] = name_address;
content.encode();
- connThreadBody.sendBuffer(msg.getContent(), 0,
+ connThreadBody.sendBuffer(msg.getContent(), "",
headers,
"qmf.default.topic", key.str());
}
@@ -264,7 +264,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
methodQueue.pop_front();
{
Mutex::ScopedUnlock unlock(agentLock);
- invokeMethodRequest(item->body, item->sequence, item->replyTo);
+ invokeMethodRequest(item->body, item->cid, item->replyTo);
delete item;
}
}
@@ -353,8 +353,10 @@ void ManagementAgentImpl::sendHeartbeat()
headers["qmf.agent"] = name_address;
map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
content.encode();
- connThreadBody.sendBuffer(msg.getContent(), 0, headers, addr_exchange, addr_key);
+ connThreadBody.sendBuffer(msg.getContent(), "", headers, addr_exchange, addr_key);
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
@@ -470,7 +472,7 @@ void ManagementAgentImpl::handleConsoleAddedIndication()
QPID_LOG(trace, "RCVD ConsoleAddedInd");
}
-void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo)
{
string methodName;
qpid::messaging::Message inMsg(body);
@@ -521,10 +523,10 @@ void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t
headers["qmf.agent"] = name_address;
outMap.encode();
- connThreadBody.sendBuffer(outMsg.getContent(), sequence, headers, "qmf.default.direct", replyTo);
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
}
-void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, const string& cid, const string& replyTo)
{
FieldTable ft;
FieldTable::ValuePtr value;
@@ -565,11 +567,11 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
headers["qmf.agent"] = name_address;
content.encode();
- connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo);
+ connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo);
QPID_LOG(trace, "SENT ObjectInd");
}
- sendCommandComplete(replyTo, sequence);
+ //sendCommandComplete(replyTo, sequence);
return;
}
@@ -600,21 +602,44 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
headers["qmf.agent"] = name_address;
content.encode();
- connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo);
+ connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo);
QPID_LOG(trace, "SENT ObjectInd");
}
}
- sendCommandComplete(replyTo, sequence);
+ //sendCommandComplete(replyTo, sequence);
}
-void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+{
+ QPID_LOG(trace, "RCVD AgentLocateRequest");
+ static const string addr_exchange("qmf.default.direct");
+
+ messaging::Message msg;
+ messaging::MapContent content(msg);
+ messaging::Variant::Map& map(content.asMap());
+ messaging::Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_locate_response";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+ content.encode();
+ connThreadBody.sendBuffer(msg.getContent(), cid, headers, addr_exchange, replyTo);
+
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+}
+
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo)
{
if (extThread) {
Mutex::ScopedLock lock(agentLock);
- methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+ methodQueue.push_back(new QueuedMethod(cid, replyTo, body));
if (pipeHandle != 0) {
pipeHandle->write("X", 1);
} else if (notifyable != 0) {
@@ -633,7 +658,7 @@ void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t
inCallback = false;
}
} else {
- invokeMethodRequest(body, sequence, replyTo);
+ invokeMethodRequest(body, cid, replyTo);
}
QPID_LOG(trace, "RCVD MethodRequest");
@@ -642,32 +667,22 @@ void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t
void ManagementAgentImpl::received(Message& msg)
{
string replyToKey;
- framing::MessageProperties p = msg.getMessageProperties();
- if (p.hasReplyTo()) {
- const framing::ReplyTo& rt = p.getReplyTo();
+ framing::MessageProperties mp = msg.getMessageProperties();
+ if (mp.hasReplyTo()) {
+ const framing::ReplyTo& rt = mp.getReplyTo();
replyToKey = rt.getRoutingKey();
}
- if (msg.getHeaders().getAsString("app_id") == "qmf2")
+ if (mp.hasAppId() && mp.getAppId() == "qmf2")
{
- uint32_t sequence = 0;
- std::string opcode = msg.getHeaders().getAsString("qmf.opcode");
- std::string cid = msg.getMessageProperties().getCorrelationId();
- if (!cid.empty()) {
- try {
- sequence = boost::lexical_cast<uint32_t>(cid);
- } catch(const boost::bad_lexical_cast&) {
- QPID_LOG(warning, "Bad correlation Id for received QMF request.");
- return;
- }
- }
+ string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
+ string cid = msg.getMessageProperties().getCorrelationId();
- if (opcode == "_method_request") {
- handleMethodRequest(msg.getData(), sequence, replyToKey);
- return;
+ if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
+ else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey);
+ else {
+ QPID_LOG(trace, "Support for QMF Opcode [" << opcode << "] TBD!!!");
}
-
- QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
return;
}
@@ -684,7 +699,6 @@ void ManagementAgentImpl::received(Message& msg)
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
else if (opcode == 'x') handleConsoleAddedIndication();
- else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
else if (opcode == 'M')
QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
}
@@ -886,17 +900,14 @@ void ManagementAgentImpl::periodicProcessing()
content.encode();
const std::string &str = m.getContent();
if (str.length()) {
- stringstream key;
::qpid::messaging::Variant::Map headers;
- key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
- baseObject->getPackageName() << "." << baseObject->getClassName();
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- connThreadBody.sendBuffer(str, 0, headers, "qmf.default.topic", key.str(), "amqp/list");
- QPID_LOG(trace, "SENT DataIndication key=" << key.str());
+ connThreadBody.sendBuffer(str, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list");
+ QPID_LOG(trace, "SENT DataIndication");
}
}
@@ -936,6 +947,10 @@ void ManagementAgentImpl::ConnectionThread::run()
arg::exclusive=true);
session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
arg::bindingKey=queueName.str());
+ session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(),
+ arg::bindingKey=agent.name_address);
+ session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(),
+ arg::bindingKey="console.#");
subscriptions->subscribe(agent, queueName.str(), dest);
QPID_LOG(info, "Connection established with broker");
@@ -1009,7 +1024,7 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
- uint32_t sequence,
+ const string& cid,
const qpid::messaging::VariantMap headers,
const string& exchange,
const string& routingKey,
@@ -1018,11 +1033,9 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
Message msg;
qpid::messaging::VariantMap::const_iterator i;
- if (sequence) {
- std::stringstream seqstr;
- seqstr << sequence;
- msg.getMessageProperties().setCorrelationId(seqstr.str());
- }
+ if (!cid.empty())
+ msg.getMessageProperties().setCorrelationId(cid);
+
if (!contentType.empty())
msg.getMessageProperties().setContentType(contentType);
for (i = headers.begin(); i != headers.end(); ++i) {
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index f753395415..0c2d06a6c8 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -123,10 +123,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
};
struct QueuedMethod {
- QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
- sequence(_seq), replyTo(_reply), body(_body) {}
+ QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) :
+ cid(_cid), replyTo(_reply), body(_body) {}
- uint32_t sequence;
+ std::string cid;
std::string replyTo;
std::string body;
};
@@ -205,7 +205,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
const std::string& exchange,
const std::string& routingKey);
void sendBuffer(const std::string& data,
- const uint32_t sequence,
+ const std::string& cid,
const qpid::messaging::VariantMap headers,
const std::string& exchange,
const std::string& routingKey,
@@ -263,9 +263,11 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
- void invokeMethodRequest (const std::string& body, uint32_t sequence, std::string replyTo);
- void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleMethodRequest (const std::string& body, uint32_t sequence, std::string replyTo);
+ void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo);
+
+ void handleGetQuery (qpid::framing::Buffer& inBuffer, const std::string& cid, const std::string& replyTo);
+ void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
+ void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleConsoleAddedIndication();
};
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index b663116f40..226d586f8b 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -352,7 +352,6 @@ class Object(object):
raise Exception("Invalid Method (software defect) [%s]" % name)
def _encodeUnmanaged(self, codec):
-
codec.write_uint8(20)
codec.write_str8(self._schema.getKey().getPackageName())
codec.write_str8(self._schema.getKey().getClassName())
@@ -482,7 +481,7 @@ class Session:
self.brokers.append(broker)
if not self.manageConnections:
- self.getObjects(broker=broker, _class="agent")
+ self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0))
return broker
def delBroker(self, broker):
@@ -873,6 +872,7 @@ class Session:
timestamp = codec.read_uint64()
if self.console != None and agent != None:
self.console.heartbeat(agent, timestamp)
+ broker._ageAgents()
def _handleEventInd(self, broker, codec, seq):
if self.console != None:
@@ -927,6 +927,39 @@ class Session:
if stat:
self.console.objectStats(broker, object)
+ def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
+ brokerBank = 1
+ agentName = ah["qmf.agent"]
+ values = content["_values"]
+ timestamp = values["timestamp"]
+ interval = values["heartbeat_interval"]
+ if agentName == None:
+ return
+ agent = broker.getAgent(brokerBank, agentName)
+ if agent == None:
+ agent = Agent(broker, agentName, "QMFv2 Agent", True, interval)
+ broker._addAgent(agentName, agent)
+ else:
+ agent.touch()
+ if self.console and agent:
+ self.console.heartbeat(agent, timestamp)
+ broker._ageAgents()
+
+ def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
+ self._v2HandleHeartbeatInd(broker, mp, ah, content)
+
+ def _v2HandleDataInd(self, broker, mp, ah, content):
+ pass
+
+ def _v2HandleQueryRsp(self, broker, mp, ah, content):
+ pass
+
+ def _v2HandleMethodRsp(self, broker, mp, ah, content):
+ pass
+
+ def _v2HandleException(self, broker, mp, ah, content):
+ pass
+
def _handleError(self, error):
try:
self.cv.acquire()
@@ -1557,6 +1590,7 @@ class Broker:
self.authUser = authUser
self.authPass = authPass
self.cv = Condition()
+ self.agentLock = Lock()
self.error = None
self.brokerId = None
self.connected = False
@@ -1590,8 +1624,12 @@ class Broker:
def getAgent(self, brokerBank, agentBank):
""" Return the agent object associated with a particular broker and agent bank value."""
bankKey = (brokerBank, agentBank)
- if bankKey in self.agents:
- return self.agents[bankKey]
+ try:
+ self.agentLock.acquire()
+ if bankKey in self.agents:
+ return self.agents[bankKey]
+ finally:
+ self.agentLock.release()
return None
def getSessionId(self):
@@ -1600,7 +1638,11 @@ class Broker:
def getAgents(self):
""" Get the list of agents reachable via this broker """
- return self.agents.values()
+ try:
+ self.agentLock.acquire()
+ return self.agents.values()
+ finally:
+ self.agentLock.release()
def getAmqpSession(self):
""" Get the AMQP session object for this connected broker. """
@@ -1629,8 +1671,13 @@ class Broker:
def _tryToConnect(self):
try:
- self.agents = {}
- self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+ try:
+ self.agentLock.acquire()
+ self.agents = {}
+ self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+ finally:
+ self.agentLock.release()
+
self.topicBound = False
self.syncInFlight = False
self.syncRequest = 0
@@ -1679,6 +1726,24 @@ class Broker:
self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
+ ##
+ ## Set up connectivity for QMFv2
+ ##
+ self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True)
+ self.amqpSession.exchange_bind(exchange="qmf.default.direct",
+ queue=self.v2_queue_name, binding_key=self.v2_queue_name)
+ self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+ queue=self.v2_queue_name, binding_key="agent.#")
+ ## Other bindings here...
+ self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+ self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1)
+ self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL)
+
self.connected = True
self.session._handleBrokerConnect(self)
@@ -1686,6 +1751,7 @@ class Broker:
self._setHeader(codec, 'B')
msg = self._message(codec.encoded)
self._send(msg)
+ self._v2SendAgentLocate()
except socket.error, e:
self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
@@ -1699,17 +1765,65 @@ class Broker:
def _updateAgent(self, obj):
bankKey = (obj.brokerBank, obj.agentBank)
+ agent = None
if obj._deleteTime == 0:
- if bankKey not in self.agents:
- agent = Agent(self, obj.agentBank, obj.label)
- self.agents[bankKey] = agent
- if self.session.console != None:
- self.session.console.newAgent(agent)
+ try:
+ self.agentLock.acquire()
+ if bankKey not in self.agents:
+ agent = Agent(self, obj.agentBank, obj.label)
+ self.agents[bankKey] = agent
+ finally:
+ self.agentLock.release()
+ if agent and self.session.console:
+ self.session.console.newAgent(agent)
else:
- agent = self.agents.pop(bankKey, None)
- if agent != None and self.session.console != None:
+ try:
+ self.agentLock.acquire()
+ agent = self.agents.pop(bankKey, None)
+ finally:
+ self.agentLock.release()
+ if agent and self.session.console:
self.session.console.delAgent(agent)
+ def _addAgent(self, name, agent):
+ try:
+ self.agentLock.acquire()
+ self.agents[(1, name)] = agent
+ finally:
+ self.agentLock.release()
+ if self.session.console:
+ self.session.console.newAgent(agent)
+
+ def _ageAgents(self):
+ try:
+ self.agentLock.acquire()
+ to_delete = []
+ to_notify = []
+ for key in self.agents:
+ if self.agents[key].isOld():
+ to_delete.append(key)
+ for key in to_delete:
+ to_notify.append(self.agents.pop(key, None))
+ finally:
+ self.agentLock.release()
+ if self.session.console:
+ for agent in to_notify:
+ self.session.console.delAgent(agent)
+
+ def _v2SendAgentLocate(self, predicate={}):
+ dp = self.amqpSession.delivery_properties()
+ dp.routing_key = "console.request.agent_locate"
+ mp = self.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self.authUser
+ mp.app_id = "qmf2"
+ mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
+ sendCodec = Codec()
+ sendCodec.write_map(predicate)
+ msg = Message(dp, mp, sendCodec.encoded)
+ self._send(msg, "qmf.default.topic")
+
def _setHeader(self, codec, opcode, seq=0):
""" Compose the header of a management message. """
codec.write_uint8(ord('A'))
@@ -1819,6 +1933,28 @@ class Broker:
self.session.receiver._completed.add(msg.id)
self.session.channel.session_completed(self.session.receiver._completed)
+ def _v2Cb(self, msg):
+ dp = msg.get("delivery_properties")
+ mp = msg.get("message_properties")
+ ah = mp["application_headers"]
+ opcode = ah["qmf.opcode"]
+ codec = Codec(msg.body)
+
+ if mp.content_type == "amqp/list":
+ content = codec.read_list()
+ elif mp.content_type == "amqp/map":
+ content = codec.read_map()
+ else:
+ return
+
+ if opcode == None: return
+ elif opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
+ elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+ elif opcode == '_data_indication': self.session._v2HandleDataInd(self, mp, ah, content)
+ elif opcode == '_query_response': self.session._v2HandleQueryRsp(self, mp, ah, content)
+ elif opcode == '_method_response': self.session._v2HandleMethodRsp(self, mp, ah, content)
+ elif opcode == '_exception': self.session._v2HandleException(self, mp, ah, content)
+
def _exceptionCb(self, data):
self.connected = False
self.error = data
@@ -1835,14 +1971,31 @@ class Broker:
class Agent:
""" """
- def __init__(self, broker, agentBank, label):
+ def __init__(self, broker, agentBank, label, isV2=False, interval=0):
self.broker = broker
self.brokerBank = broker.getBrokerBank()
self.agentBank = agentBank
self.label = label
+ self.isV2 = isV2
+ self.heartbeatInterval = interval
+ self.lastSeenTime = time()
+
+ def touch(self):
+ self.lastSeenTime = time()
+
+ def isOld(self):
+ if self.heartbeatInterval == 0:
+ return None
+ if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval):
+ return True
+ return None
def __repr__(self):
- return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label)
+ if self.isV2:
+ ver = "v2"
+ else:
+ ver = "v1"
+ return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label)
def getBroker(self):
return self.broker