diff options
-rw-r--r-- | gear/__init__.py | 6 | ||||
-rw-r--r-- | gear/tests/test_functional.py | 30 |
2 files changed, 36 insertions, 0 deletions
diff --git a/gear/__init__.py b/gear/__init__.py index b085c29..4ed674e 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -1897,6 +1897,12 @@ class Worker(BaseClient): else: self._sendCanDo(name) + connections = self.active_connections[:] + for connection in connections: + if connection.state == "SLEEP": + connection.changeState("IDLE") + self._updateStateMachines() + def unRegisterFunction(self, name): """Remove a function from Gearman's registry. diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py index 09c7d2f..3bca907 100644 --- a/gear/tests/test_functional.py +++ b/gear/tests/test_functional.py @@ -238,6 +238,36 @@ class TestFunctionalText(tests.BaseTestCase): self.assertTrue(job.complete) self.assertEqual(job.exception, 'work failed') + def test_grab_job_after_register(self): + jobunique = uuid.uuid4().hex + job = gear.TextJob('test', 'testdata', unique=jobunique) + self.client.submitJob(job) + self.assertNotEqual(job.handle, None) + + def getJob(): + workerjob = self.worker.getJob() + workerjob.sendWorkComplete() + + jobthread = threading.Thread(target=getJob) + jobthread.daemon = True + jobthread.start() + + for count in iterate_timeout(30, "worker sleeping"): + if self.worker.active_connections[0].state == 'SLEEP': + break + self.assertEqual(1, len(self.server.normal_queue)) + self.assertFalse(job.complete) + + # When we register the function, the worker should send a + # grab_job packet and pick up the job and it should complete. + self.worker.registerFunction('test') + + for count in iterate_timeout(30, "job completion"): + if job.complete: + break + + self.assertEqual(0, len(self.server.normal_queue)) + def load_tests(loader, in_tests, pattern): return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern) |