author    Dana Powers <dana.powers@gmail.com>    2016-04-04 16:09:10 -0700
committer Dana Powers <dana.powers@gmail.com>    2016-04-04 16:09:10 -0700
commit    ff10ccf3cef3f5602f717d6b062c78fe8e47a4d2 (patch)
tree      a242dff881bfb482cb8864321aa9e5580198ba80 /kafka
parent    51fc3e428b7782d3533512c39264552a2ec87f0f (diff)
parent    e83443126a7513404f4f67c24cb490f85bb02c69 (diff)
Merge pull request #589 from dpkp/node_not_ready
Refactor NodeNotReadyError handling
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/client_async.py     | 45
-rw-r--r--  kafka/conn.py             | 50
-rw-r--r--  kafka/consumer/fetcher.py |  3
-rw-r--r--  kafka/coordinator/base.py | 22
4 files changed, 59 insertions(+), 61 deletions(-)
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5a1d624..d70e4f2 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -152,8 +152,8 @@ class KafkaClient(object):
conn = self._conns[node_id]
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
- def _initiate_connect(self, node_id):
- """Initiate a connection to the given node (must be in metadata)"""
+ def _maybe_connect(self, node_id):
+ """Idempotent non-blocking connection attempt to the given node id."""
if node_id not in self._conns:
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % node_id
@@ -164,22 +164,21 @@ class KafkaClient(object):
host, port, afi = get_ip_port_afi(broker.host)
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
**self.config)
- return self._finish_connect(node_id)
-
- def _finish_connect(self, node_id):
- assert node_id in self._conns, '%s is not in current conns' % node_id
state = self._conns[node_id].connect()
if state is ConnectionStates.CONNECTING:
self._connecting.add(node_id)
+
+ # Whether CONNECTED or DISCONNECTED, we need to remove from connecting
elif node_id in self._connecting:
log.debug("Node %s connection state is %s", node_id, state)
self._connecting.remove(node_id)
+ # Connection failures imply that our metadata is stale, so let's refresh
if state is ConnectionStates.DISCONNECTED:
log.warning("Node %s connect failed -- refreshing metadata", node_id)
self.cluster.request_update()
- return state
+ return self._conns[node_id].connected()
def ready(self, node_id):
"""Check whether a node is connected and ok to send more requests.
@@ -190,19 +189,15 @@ class KafkaClient(object):
Returns:
bool: True if we are ready to send to the given node
"""
- if self.is_ready(node_id):
- return True
-
- if self._can_connect(node_id):
- # if we are interested in sending to a node
- # and we don't have a connection to it, initiate one
- self._initiate_connect(node_id)
-
- if node_id in self._connecting:
- self._finish_connect(node_id)
-
+ self._maybe_connect(node_id)
return self.is_ready(node_id)
+ def connected(self, node_id):
+ """Return True iff the node_id is connected."""
+ if node_id not in self._conns:
+ return False
+ return self._conns[node_id].connected()
+
def close(self, node_id=None):
"""Closes the connection to a particular node (if there is one).
@@ -295,15 +290,13 @@ class KafkaClient(object):
request (Struct): request object (not-encoded)
Raises:
- NodeNotReadyError: if node_id is not ready
+ AssertionError: if node_id is not in current cluster metadata
Returns:
- Future: resolves to Response struct
+ Future: resolves to Response struct or Error
"""
- if not self._can_send_request(node_id):
- raise Errors.NodeNotReadyError("Attempt to send a request to node"
- " which is not ready (node id %s)."
- % node_id)
+ if not self._maybe_connect(node_id):
+ return Future().failure(Errors.NodeNotReadyError(node_id))
# Every request gets a response, except one special case:
expect_response = True
@@ -341,7 +334,7 @@ class KafkaClient(object):
# Attempt to complete pending connections
for node_id in list(self._connecting):
- self._finish_connect(node_id)
+ self._maybe_connect(node_id)
# Send a metadata request if needed
metadata_timeout_ms = self._maybe_refresh_metadata()
@@ -557,7 +550,7 @@ class KafkaClient(object):
elif self._can_connect(node_id):
log.debug("Initializing connection to node %s for metadata request", node_id)
- self._initiate_connect(node_id)
+ self._maybe_connect(node_id)
return 0
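
The net effect of the client_async.py changes: callers no longer pre-check readiness before sending; send() attempts the connection itself and resolves to a failed Future carrying NodeNotReadyError instead of raising it. A minimal sketch of the resulting caller pattern, assuming an already-constructed KafkaClient, a known node_id, a request Struct, and a logger (the names are illustrative, not part of the patch):

    # Drive the non-blocking connect by polling until the node is ready,
    # then send; failures arrive through the Future rather than an exception.
    while not client.ready(node_id):
        client.poll(timeout_ms=100)

    future = client.send(node_id, request)
    future.add_errback(lambda e: log.warning('request to node %s failed: %s',
                                             node_id, e))
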
diff --git a/kafka/conn.py b/kafka/conn.py
index 0ce469d..2b82b6d 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -106,24 +106,22 @@ class BrokerConnection(object):
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
request_timeout = self.config['request_timeout_ms'] / 1000.0
- if time.time() > request_timeout + self.last_attempt:
+ try:
+ ret = self._sock.connect_ex((self.host, self.port))
+ except socket.error as ret:
+ pass
+ if not ret or ret == errno.EISCONN:
+ self.state = ConnectionStates.CONNECTED
+ elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
+ log.error('Connect attempt to %s returned error %s.'
+ ' Disconnecting.', self, ret)
+ self.close()
+ self.last_failure = time.time()
+ elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
self.close() # error=TimeoutError ?
self.last_failure = time.time()
- else:
- try:
- ret = self._sock.connect_ex((self.host, self.port))
- except socket.error as ret:
- pass
- if not ret or ret == errno.EISCONN:
- self.state = ConnectionStates.CONNECTED
- # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
- elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
- log.error('Connect attempt to %s returned error %s.'
- ' Disconnecting.', self, ret)
- self.close()
- self.last_failure = time.time()
return self.state
def blacked_out(self):
@@ -141,6 +139,10 @@ class BrokerConnection(object):
"""Return True iff socket is connected."""
return self.state is ConnectionStates.CONNECTED
+ def connecting(self):
+ """Return True iff socket is in intermediate connecting state."""
+ return self.state is ConnectionStates.CONNECTING
+
def close(self, error=None):
"""Close socket and fail all in-flight-requests.
@@ -158,7 +160,7 @@ class BrokerConnection(object):
self._rbuffer.seek(0)
self._rbuffer.truncate()
if error is None:
- error = Errors.ConnectionError()
+ error = Errors.ConnectionError(str(self))
while self.in_flight_requests:
ifr = self.in_flight_requests.popleft()
ifr.future.failure(error)
@@ -169,10 +171,12 @@ class BrokerConnection(object):
Can block on network if request is larger than send_buffer_bytes
"""
future = Future()
- if not self.connected():
- return future.failure(Errors.ConnectionError())
- if not self.can_send_more():
- return future.failure(Errors.TooManyInFlightRequests())
+ if self.connecting():
+ return future.failure(Errors.NodeNotReadyError(str(self)))
+ elif not self.connected():
+ return future.failure(Errors.ConnectionError(str(self)))
+ elif not self.can_send_more():
+ return future.failure(Errors.TooManyInFlightRequests(str(self)))
correlation_id = self._next_correlation_id()
header = RequestHeader(request,
correlation_id=correlation_id,
@@ -191,7 +195,7 @@ class BrokerConnection(object):
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
- error = Errors.ConnectionError(e)
+ error = Errors.ConnectionError("%s: %s" % (str(self), e))
self.close(error=error)
return future.failure(error)
log.debug('%s Request %d: %s', self, correlation_id, request)
@@ -324,11 +328,9 @@ class BrokerConnection(object):
' initialized on the broker')
elif ifr.correlation_id != recv_correlation_id:
-
-
error = Errors.CorrelationIdError(
- 'Correlation ids do not match: sent %d, recv %d'
- % (ifr.correlation_id, recv_correlation_id))
+ '%s: Correlation ids do not match: sent %d, recv %d'
+ % (str(self), ifr.correlation_id, recv_correlation_id))
ifr.future.failure(error)
self.close()
self._processing = False
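
The conn.py hunk above reorders the non-blocking connect so that connect_ex() is consulted on every call, and the request timeout only applies while the attempt is still in progress. A standalone sketch of that return-code handling, independent of BrokerConnection (host, port, and variable names are illustrative; 10022 is WSAEINVAL, which the errno module does not expose on non-Windows systems):

    import errno
    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)

    ret = sock.connect_ex(('localhost', 9092))
    if not ret or ret == errno.EISCONN:
        state = 'CONNECTED'     # 0 on connect, EISCONN on repeated calls
    elif ret in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
        state = 'CONNECTING'    # still in progress; call connect_ex again later
    else:
        state = 'DISCONNECTED'  # hard error; close the socket and back off
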
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index f406a30..7112c7e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -479,9 +479,6 @@ class Fetcher(six.Iterator):
# so create a separate future and attach a callback to update it
# based on response error codes
future = Future()
- if not self._client.ready(node_id):
- return future.failure(Errors.NodeNotReadyError(node_id))
-
_f = self._client.send(node_id, request)
_f.add_callback(self._handle_offset_response, partition, future)
_f.add_errback(lambda e: future.failure(e))
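
With the readiness pre-check removed from the fetcher, a NodeNotReadyError now reaches the caller through the same errback chain as any other send failure. A minimal sketch of that chaining pattern (client, node_id, and request are assumed to exist; the Future import matches the class used throughout this patch):

    from kafka.future import Future

    # Bridge the client's internal send future to a caller-facing future:
    # the callback forwards the response, the errback forwards any error,
    # including a NodeNotReadyError raised by the client itself.
    outer = Future()
    _f = client.send(node_id, request)
    _f.add_callback(lambda response: outer.success(response))
    _f.add_errback(lambda e: outer.failure(e))
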
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index dca809e..b0a0981 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -186,7 +186,7 @@ class BaseCoordinator(object):
self.coordinator_dead()
return True
- return not self._client.ready(self.coordinator_id)
+ return False
def ensure_coordinator_known(self):
"""Block until the coordinator for this group is known
@@ -288,9 +288,13 @@ class BaseCoordinator(object):
return future
def _failed_request(self, node_id, request, future, error):
- log.error('Error sending %s to node %s [%s] -- marking coordinator dead',
+ log.error('Error sending %s to node %s [%s]',
request.__class__.__name__, node_id, error)
- self.coordinator_dead()
+ # Marking coordinator dead
+ # unless the error is caused by internal client pipelining
+ if not isinstance(error, (Errors.NodeNotReadyError,
+ Errors.TooManyInFlightRequests)):
+ self.coordinator_dead()
future.failure(error)
def _handle_join_group_response(self, future, response):
@@ -388,7 +392,8 @@ class BaseCoordinator(object):
def _send_sync_group_request(self, request):
if self.coordinator_unknown():
- return Future().failure(Errors.GroupCoordinatorNotAvailableError())
+ e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
+ return Future().failure(e)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_sync_group_response, future)
@@ -439,7 +444,7 @@ class BaseCoordinator(object):
Future: resolves to the node id of the coordinator
"""
node_id = self._client.least_loaded_node()
- if node_id is None or not self._client.ready(node_id):
+ if node_id is None:
return Future().failure(Errors.NoBrokersAvailable())
log.debug("Issuing group metadata request to broker %s", node_id)
@@ -490,8 +495,8 @@ class BaseCoordinator(object):
def coordinator_dead(self, error=None):
"""Mark the current coordinator as dead."""
if self.coordinator_id is not None:
- log.info("Marking the coordinator dead (node %s): %s.",
- self.coordinator_id, error)
+ log.warning("Marking the coordinator dead (node %s): %s.",
+ self.coordinator_id, error)
self.coordinator_id = None
def close(self):
@@ -501,6 +506,7 @@ class BaseCoordinator(object):
self._client.unschedule(self.heartbeat_task)
except KeyError:
pass
+
if not self.coordinator_unknown() and self.generation > 0:
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
@@ -634,7 +640,7 @@ class HeartbeatTask(object):
self._client.schedule(self, time.time() + ttl)
def _handle_heartbeat_failure(self, e):
- log.warning("Heartbeat failed; retrying")
+ log.warning("Heartbeat failed (%s); retrying", e)
self._request_in_flight = False
etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, etd)
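
The coordinator now distinguishes errors raised by the client's own request pipelining from real coordinator failures. A compact sketch of that triage, with an illustrative helper name; the Errors alias is assumed to point at the module defining these exception classes (kafka.errors in current kafka-python):

    import kafka.errors as Errors  # assumption: current kafka-python module layout

    # Errors the client raises before anything reaches the wire; a plain retry
    # is enough, so the coordinator is not marked dead for these.
    RETRIABLE_CLIENT_ERRORS = (Errors.NodeNotReadyError,
                               Errors.TooManyInFlightRequests)

    def should_mark_coordinator_dead(error):
        return not isinstance(error, RETRIABLE_CLIENT_ERRORS)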