author     bst-marge-bot <marge-bot@buildstream.build>  2019-11-22 18:40:26 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2019-11-22 18:40:26 +0000
commit     9aa48dd2ceb91d74c92154384a2c10bef6335c1f (patch)
tree       2e20c8d5a63bc216111ec587acde8e1be885487d
parent     cdf15d0658cd57c2666963119b4e05c2ff815ff7 (diff)
parent     315a3df6dfbadff97928ae0896512d069cbf9e65 (diff)
download   buildstream-9aa48dd2ceb91d74c92154384a2c10bef6335c1f.tar.gz
Merge branch 'aevri/casdprocessmanager' into 'master'

cascache: refactor, extract CASDProcessManager and CASDConnection

See merge request BuildStream/buildstream!1638
-rw-r--r--  src/buildstream/_cas/cascache.py            233
-rw-r--r--  src/buildstream/_cas/casdprocessmanager.py  233
-rw-r--r--  src/buildstream/_cas/casremote.py             6
-rw-r--r--  src/buildstream/_scheduler/scheduler.py      12
-rw-r--r--  src/buildstream/_stream.py                    2

5 files changed, 289 insertions, 197 deletions
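
For orientation: the patch below moves all buildbox-casd subprocess handling out of CASCache into a new CASDProcessManager, and wraps the lazily-established gRPC connection in a CASDChannel. The following is a minimal sketch of the resulting lifecycle, based only on the code added in this patch; the standalone usage and the paths are illustrative placeholders (BuildStream itself constructs the manager inside CASCache):

# Sketch only: stand-alone use of the classes introduced in this patch.
# "/srv/cas" and CASLogLevel.WARNING are placeholder choices, not values from the patch.
from buildstream._cas.cascache import CASLogLevel
from buildstream._cas.casdprocessmanager import CASDProcessManager

manager = CASDProcessManager(
    path="/srv/cas",                    # root directory of the CAS repository
    log_dir="/srv/cas/logs",            # where casd log files are rotated
    log_level=CASLogLevel.WARNING,
    cache_quota=None,                   # no --quota-high/--quota-low flags passed
    protect_session_blobs=True,
)

channel = manager.create_channel()      # does not connect yet
local_cas = channel.get_local_cas()     # first stub access dials the unix socket

# ... use the stubs ...

channel.close()
manager.release_resources()             # terminates buildbox-casd, removes the socket dir
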
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index c1f2b30b0..98581d351 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -25,23 +25,20 @@ import errno
import contextlib
import ctypes
import multiprocessing
-import shutil
import signal
-import subprocess
-import tempfile
import time
import grpc
from .._protos.google.rpc import code_pb2
-from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
-from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from .._protos.build.buildgrid import local_cas_pb2
from .. import _signals, utils
from ..types import FastEnum
from .._exceptions import CASCacheError
-from .._message import Message, MessageType
+from .casdprocessmanager import CASDProcessManager
from .casremote import _CASBatchRead, _CASBatchUpdate
_BUFFER_SIZE = 65536
@@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536
# Refresh interval for disk usage of local cache in seconds
_CACHE_USAGE_REFRESH = 5
-_CASD_MAX_LOGFILES = 10
-
class CASLogLevel(FastEnum):
WARNING = "warning"
@@ -78,53 +73,35 @@ class CASCache:
os.makedirs(os.path.join(self.casdir, "objects"), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
- self._casd_channel = None
- self._casd_cas = None
- self._local_cas = None
self._cache_usage_monitor = None
self._cache_usage_monitor_forbidden = False
+ self._casd_process_manager = None
+ self._casd_channel = None
if casd:
- # Place socket in global/user temporary directory to avoid hitting
- # the socket path length limit.
- self._casd_socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
- self._casd_socket_path = os.path.join(self._casd_socket_tempdir, "casd.sock")
-
- casd_args = [utils.get_host_tool("buildbox-casd")]
- casd_args.append("--bind=unix:" + self._casd_socket_path)
- casd_args.append("--log-level=" + log_level.value)
-
- if cache_quota is not None:
- casd_args.append("--quota-high={}".format(int(cache_quota)))
- casd_args.append("--quota-low={}".format(int(cache_quota / 2)))
-
- if protect_session_blobs:
- casd_args.append("--protect-session-blobs")
-
- casd_args.append(path)
-
- self._casd_start_time = time.time()
- self.casd_logfile = self._rotate_and_get_next_logfile()
-
- with open(self.casd_logfile, "w") as logfile_fp:
- # Block SIGINT on buildbox-casd, we don't need to stop it
- # The frontend will take care of it if needed
- with _signals.blocked([signal.SIGINT], ignore=False):
- self._casd_process = subprocess.Popen(
- casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT
- )
+ log_dir = os.path.join(self.casdir, "logs")
+ self._casd_process_manager = CASDProcessManager(
+ path, log_dir, log_level, cache_quota, protect_session_blobs
+ )
- self._cache_usage_monitor = _CASCacheUsageMonitor(self)
- else:
- self._casd_process = None
+ self._casd_channel = self._casd_process_manager.create_channel()
+ self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
def __getstate__(self):
+ # Note that we can't use jobpickler's
+ # 'get_state_for_child_job_pickling' protocol here, since CASCache's
+ # are passed to subprocesses other than child jobs. e.g.
+ # test.utils.ArtifactShare.
+
state = self.__dict__.copy()
- # Popen objects are not pickle-able, however, child processes only
- # need the information whether a casd subprocess was started or not.
- assert "_casd_process" in state
- state["_casd_process"] = bool(self._casd_process)
+ # Child jobs do not need to manage the CASD process, they only need a
+ # connection to CASD.
+ if state["_casd_process_manager"] is not None:
+ state["_casd_process_manager"] = None
+ # In order to be pickle-able, the connection must be in the initial
+ # 'closed' state.
+ state["_casd_channel"] = self._casd_process_manager.create_channel()
# The usage monitor is not pickle-able, but we also don't need it in
# child processes currently. Make sure that if this changes, we get a
@@ -136,43 +113,21 @@ class CASCache:
return state
- def _init_casd(self):
- assert self._casd_process, "CASCache was instantiated without buildbox-casd"
-
- if not self._casd_channel:
- while not os.path.exists(self._casd_socket_path):
- # casd is not ready yet, try again after a 10ms delay,
- # but don't wait for more than 15s
- if time.time() > self._casd_start_time + 15:
- raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
-
- time.sleep(0.01)
-
- self._casd_channel = grpc.insecure_channel("unix:" + self._casd_socket_path)
- self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
- self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
-
- # Call GetCapabilities() to establish connection to casd
- capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
- capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
-
- # _get_cas():
+ # get_cas():
#
# Return ContentAddressableStorage stub for buildbox-casd channel.
#
- def _get_cas(self):
- if not self._casd_cas:
- self._init_casd()
- return self._casd_cas
+ def get_cas(self):
+ assert self._casd_channel, "CASCache was created without a channel"
+ return self._casd_channel.get_cas()
- # _get_local_cas():
+ # get_local_cas():
#
# Return LocalCAS stub for buildbox-casd channel.
#
- def _get_local_cas(self):
- if not self._local_cas:
- self._init_casd()
- return self._local_cas
+ def get_local_cas(self):
+ assert self._casd_channel, "CASCache was created without a channel"
+ return self._casd_channel.get_local_cas()
# preflight():
#
@@ -184,24 +139,13 @@ class CASCache:
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))
- # has_open_grpc_channels():
- #
- # Return whether there are gRPC channel instances. This is used to safeguard
- # against fork() with open gRPC channels.
- #
- def has_open_grpc_channels(self):
- return bool(self._casd_channel)
-
# close_grpc_channels():
#
# Close the casd channel if it exists
#
def close_grpc_channels(self):
if self._casd_channel:
- self._local_cas = None
- self._casd_cas = None
self._casd_channel.close()
- self._casd_channel = None
# release_resources():
#
@@ -211,10 +155,10 @@ class CASCache:
if self._cache_usage_monitor:
self._cache_usage_monitor.release_resources()
- if self._casd_process:
+ if self._casd_process_manager:
self.close_grpc_channels()
- self._terminate_casd_process(messenger)
- shutil.rmtree(self._casd_socket_tempdir)
+ self._casd_process_manager.release_resources(messenger)
+ self._casd_process_manager = None
# contains():
#
@@ -255,7 +199,7 @@ class CASCache:
# Returns: True if the directory is available in the local cache
#
def contains_directory(self, digest, *, with_files):
- local_cas = self._get_local_cas()
+ local_cas = self.get_local_cas()
request = local_cas_pb2.FetchTreeRequest()
request.root_digest.CopyFrom(digest)
@@ -414,7 +358,7 @@ class CASCache:
request.path.append(path)
- local_cas = self._get_local_cas()
+ local_cas = self.get_local_cas()
response = local_cas.CaptureFiles(request)
@@ -441,7 +385,7 @@ class CASCache:
# (Digest): The digest of the imported directory
#
def import_directory(self, path):
- local_cas = self._get_local_cas()
+ local_cas = self.get_local_cas()
request = local_cas_pb2.CaptureTreeRequest()
request.path.append(path)
@@ -561,7 +505,7 @@ class CASCache:
# Returns: List of missing Digest objects
#
def remote_missing_blobs(self, remote, blobs):
- cas = self._get_cas()
+ cas = self.get_cas()
instance_name = remote.local_cas_instance_name
missing_blobs = dict()
@@ -701,30 +645,6 @@ class CASCache:
# Local Private Methods #
################################################
- # _rotate_and_get_next_logfile()
- #
- # Get the logfile to use for casd
- #
- # This will ensure that we don't create too many casd log files by
- # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
- #
- # Returns:
- # (str): the path to the log file to use
- #
- def _rotate_and_get_next_logfile(self):
- log_dir = os.path.join(self.casdir, "logs")
-
- try:
- existing_logs = sorted(os.listdir(log_dir))
- except FileNotFoundError:
- os.makedirs(log_dir)
- else:
- while len(existing_logs) >= _CASD_MAX_LOGFILES:
- logfile_to_delete = existing_logs.pop(0)
- os.remove(os.path.join(log_dir, logfile_to_delete))
-
- return os.path.join(log_dir, str(self._casd_start_time) + ".log")
-
def _refpath(self, ref):
return os.path.join(self.casdir, "refs", "heads", ref)
@@ -993,67 +913,6 @@ class CASCache:
# Upload any blobs missing on the server
self.send_blobs(remote, missing_blobs)
- # _terminate_casd_process()
- #
- # Terminate the buildbox casd process
- #
- # Args:
- # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
- #
- def _terminate_casd_process(self, messenger=None):
- return_code = self._casd_process.poll()
-
- if return_code is not None:
- # buildbox-casd is already dead
- self._casd_process = None
-
- if messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
- return_code, self.casd_logfile
- ),
- )
- )
- return
-
- self._casd_process.terminate()
-
- try:
- # Don't print anything if buildbox-casd terminates quickly
- return_code = self._casd_process.wait(timeout=0.5)
- except subprocess.TimeoutExpired:
- if messenger:
- cm = messenger.timed_activity("Terminating buildbox-casd")
- else:
- cm = contextlib.suppress()
- with cm:
- try:
- return_code = self._casd_process.wait(timeout=15)
- except subprocess.TimeoutExpired:
- self._casd_process.kill()
- self._casd_process.wait(timeout=15)
-
- if messenger:
- messenger.message(
- Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
- )
- self._casd_process = None
- return
-
- if return_code != 0 and messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
- return_code, self.casd_logfile
- ),
- )
- )
-
- self._casd_process = None
-
# get_cache_usage():
#
# Fetches the current usage of the CAS local cache.
@@ -1065,16 +924,16 @@ class CASCache:
assert not self._cache_usage_monitor_forbidden
return self._cache_usage_monitor.get_cache_usage()
- # get_casd_process()
+ # get_casd_process_manager()
#
# Get the underlying buildbox-casd process
#
# Returns:
# (subprocess.Process): The casd process that is used for the current cascache
#
- def get_casd_process(self):
- assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
- return self._casd_process
+ def get_casd_process_manager(self):
+ assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process"
+ return self._casd_process_manager
# _CASCacheUsage
@@ -1115,8 +974,8 @@ class _CASCacheUsage:
# buildbox-casd.
#
class _CASCacheUsageMonitor:
- def __init__(self, cas):
- self.cas = cas
+ def __init__(self, connection):
+ self._connection = connection
# Shared memory (64-bit signed integer) for current disk usage and quota
self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
@@ -1124,7 +983,7 @@ class _CASCacheUsageMonitor:
# multiprocessing.Process will fork without exec on Unix.
# This can't be allowed with background threads or open gRPC channels.
- assert utils._is_single_threaded() and not cas.has_open_grpc_channels()
+ assert utils._is_single_threaded() and connection.is_closed()
# Block SIGINT, we don't want to kill the process when we interrupt the frontend
# and this process is very lightweight.
@@ -1156,7 +1015,7 @@ class _CASCacheUsageMonitor:
disk_usage = self._disk_usage
disk_quota = self._disk_quota
- local_cas = self.cas._get_local_cas()
+ local_cas = self._connection.get_local_cas()
while True:
try:
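
The __getstate__ change above encodes a contract for subprocesses: a pickled CASCache carries no process manager and only a fresh, still-closed channel, which connects lazily on first use in the child. A rough illustration of that invariant follows; the helper is ours, not part of the patch, and assumes a CASCache created with casd enabled:

def check_child_state(cache):
    # Illustration only: the pickled state of a casd-enabled CASCache as
    # produced by __getstate__() above.
    state = cache.__getstate__()
    # Child processes never manage the buildbox-casd subprocess themselves ...
    assert state["_casd_process_manager"] is None
    # ... and the channel they receive starts out closed, so it is pickle-able
    # and only connects on the first get_cas()/get_local_cas() call.
    assert state["_casd_channel"].is_closed()
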
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
new file mode 100644
index 000000000..e4a58d7d5
--- /dev/null
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -0,0 +1,233 @@
+#
+# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2018-2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import contextlib
+import os
+import shutil
+import signal
+import subprocess
+import tempfile
+import time
+
+import grpc
+
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
+from .._protos.build.buildgrid import local_cas_pb2_grpc
+
+from .. import _signals, utils
+from .._exceptions import CASCacheError
+from .._message import Message, MessageType
+
+_CASD_MAX_LOGFILES = 10
+
+
+# CASDProcessManager
+#
+# This manages the subprocess that runs buildbox-casd.
+#
+# Args:
+# path (str): The root directory for the CAS repository
+# log_dir (str): The directory for the logs
+# log_level (LogLevel): Log level to give to buildbox-casd for logging
+# cache_quota (int): User configured cache quota
+# protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#
+class CASDProcessManager:
+ def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+ self._log_dir = log_dir
+
+ # Place socket in global/user temporary directory to avoid hitting
+ # the socket path length limit.
+ self._socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
+ self._socket_path = os.path.join(self._socket_tempdir, "casd.sock")
+ self._connection_string = "unix:" + self._socket_path
+
+ casd_args = [utils.get_host_tool("buildbox-casd")]
+ casd_args.append("--bind=" + self._connection_string)
+ casd_args.append("--log-level=" + log_level.value)
+
+ if cache_quota is not None:
+ casd_args.append("--quota-high={}".format(int(cache_quota)))
+ casd_args.append("--quota-low={}".format(int(cache_quota / 2)))
+
+ if protect_session_blobs:
+ casd_args.append("--protect-session-blobs")
+
+ casd_args.append(path)
+
+ self._start_time = time.time()
+ self._logfile = self._rotate_and_get_next_logfile()
+
+ with open(self._logfile, "w") as logfile_fp:
+ # Block SIGINT on buildbox-casd, we don't need to stop it
+ # The frontend will take care of it if needed
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ self.process = subprocess.Popen(casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+
+ # _rotate_and_get_next_logfile()
+ #
+ # Get the logfile to use for casd
+ #
+ # This will ensure that we don't create too many casd log files by
+ # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
+ #
+ # Returns:
+ # (str): the path to the log file to use
+ #
+ def _rotate_and_get_next_logfile(self):
+ try:
+ existing_logs = sorted(os.listdir(self._log_dir))
+ except FileNotFoundError:
+ os.makedirs(self._log_dir)
+ else:
+ while len(existing_logs) >= _CASD_MAX_LOGFILES:
+ logfile_to_delete = existing_logs.pop(0)
+ os.remove(os.path.join(self._log_dir, logfile_to_delete))
+
+ return os.path.join(self._log_dir, str(self._start_time) + ".log")
+
+ # release_resources()
+ #
+ # Terminate the process and release related resources.
+ #
+ def release_resources(self, messenger=None):
+ self._terminate(messenger)
+ self.process = None
+ shutil.rmtree(self._socket_tempdir)
+
+ # _terminate()
+ #
+ # Terminate the buildbox casd process.
+ #
+ def _terminate(self, messenger=None):
+ return_code = self.process.poll()
+
+ if return_code is not None:
+ # buildbox-casd is already dead
+
+ if messenger:
+ messenger.message(
+ Message(
+ MessageType.BUG,
+ "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
+ return_code, self._logfile
+ ),
+ )
+ )
+ return
+
+ self.process.terminate()
+
+ try:
+ # Don't print anything if buildbox-casd terminates quickly
+ return_code = self.process.wait(timeout=0.5)
+ except subprocess.TimeoutExpired:
+ if messenger:
+ cm = messenger.timed_activity("Terminating buildbox-casd")
+ else:
+ cm = contextlib.suppress()
+ with cm:
+ try:
+ return_code = self.process.wait(timeout=15)
+ except subprocess.TimeoutExpired:
+ self.process.kill()
+ self.process.wait(timeout=15)
+
+ if messenger:
+ messenger.message(
+ Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
+ )
+ return
+
+ if return_code != 0 and messenger:
+ messenger.message(
+ Message(
+ MessageType.BUG,
+ "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile),
+ )
+ )
+
+ # create_channel():
+ #
+ # Return a CASDChannel, note that the actual connection is not necessarily
+ # established until it is needed.
+ #
+ def create_channel(self):
+ return CASDChannel(self._socket_path, self._connection_string, self._start_time)
+
+
+class CASDChannel:
+ def __init__(self, socket_path, connection_string, start_time):
+ self._socket_path = socket_path
+ self._connection_string = connection_string
+ self._start_time = start_time
+ self._casd_channel = None
+ self._casd_cas = None
+ self._local_cas = None
+
+ def _establish_connection(self):
+ assert self._casd_channel is None
+
+ while not os.path.exists(self._socket_path):
+ # casd is not ready yet, try again after a 10ms delay,
+ # but don't wait for more than 15s
+ if time.time() > self._start_time + 15:
+ raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+
+ time.sleep(0.01)
+
+ self._casd_channel = grpc.insecure_channel(self._connection_string)
+ self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+ self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+
+ # get_cas():
+ #
+ # Return ContentAddressableStorage stub for buildbox-casd channel.
+ #
+ def get_cas(self):
+ if self._casd_channel is None:
+ self._establish_connection()
+ return self._casd_cas
+
+ # get_local_cas():
+ #
+ # Return LocalCAS stub for buildbox-casd channel.
+ #
+ def get_local_cas(self):
+ if self._casd_channel is None:
+ self._establish_connection()
+ return self._local_cas
+
+ # is_closed():
+ #
+ # Return whether this connection is closed or not.
+ #
+ def is_closed(self):
+ return self._casd_channel is None
+
+ # close():
+ #
+ # Close the casd channel.
+ #
+ def close(self):
+ if self.is_closed():
+ return
+ self._local_cas = None
+ self._casd_cas = None
+ self._casd_channel.close()
+ self._casd_channel = None
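
CASDChannel defers opening the gRPC channel until a stub is first requested, and the 15-second readiness timeout is measured from the time buildbox-casd was launched rather than from the first call. A small sketch of the channel lifecycle, continuing the hypothetical `manager` from the sketch near the top:

channel = manager.create_channel()
assert channel.is_closed()              # no gRPC channel has been opened yet

# get_cas()/get_local_cas() raise CASCacheError if buildbox-casd has not
# created its socket within 15 seconds of being launched.
cas = channel.get_cas()                 # waits for casd.sock, then connects
local_cas = channel.get_local_cas()     # reuses the same underlying channel

channel.close()                         # idempotent: closing twice is a no-op
assert channel.is_closed()
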
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index ee6f4679c..93f4e500c 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -53,7 +53,7 @@ class CASRemote(BaseRemote):
# be called outside of init().
#
def _configure_protocols(self):
- local_cas = self.cascache._get_local_cas()
+ local_cas = self.cascache.get_local_cas()
request = local_cas_pb2.GetInstanceNameForRemoteRequest()
request.url = self.spec.url
if self.spec.instance_name:
@@ -113,7 +113,7 @@ class _CASBatchRead:
if not self._requests:
return
- local_cas = self._remote.cascache._get_local_cas()
+ local_cas = self._remote.cascache.get_local_cas()
for request in self._requests:
batch_response = local_cas.FetchMissingBlobs(request)
@@ -167,7 +167,7 @@ class _CASBatchUpdate:
if not self._requests:
return
- local_cas = self._remote.cascache._get_local_cas()
+ local_cas = self._remote.cascache.get_local_cas()
for request in self._requests:
batch_response = local_cas.UploadMissingBlobs(request)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 171281bd9..80bd9fb50 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -152,8 +152,8 @@ class Scheduler:
#
# Args:
# queues (list): A list of Queue objects
- # casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
- # of failures.
+ # casd_process_manager (cascache.CASDProcessManager): The subprocess which runs casd, in order to be notified
+ # of failures.
#
# Returns:
# (SchedStatus): How the scheduling terminated
@@ -163,7 +163,7 @@ class Scheduler:
# elements have been processed by each queue or when
# an error arises
#
- def run(self, queues, casd_process):
+ def run(self, queues, casd_process_manager):
# Hold on to the queues to process
self.queues = queues
@@ -183,9 +183,9 @@ class Scheduler:
self._connect_signals()
# Watch casd while running to ensure it doesn't die
- self._casd_process = casd_process
+ self._casd_process = casd_process_manager.process
_watcher = asyncio.get_child_watcher()
- _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+ _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -195,7 +195,7 @@ class Scheduler:
self.loop.close()
# Stop watching casd
- _watcher.remove_child_handler(casd_process.pid)
+ _watcher.remove_child_handler(self._casd_process.pid)
self._casd_process = None
# Stop handling unix signals
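
With this rename, the scheduler pulls the raw subprocess out of the manager and watches its pid so that a dying buildbox-casd aborts the run. A minimal sketch of that watch pattern using asyncio's child watcher; the callback is a placeholder, not the scheduler's real _abort_on_casd_failure:

import asyncio

def on_casd_exit(pid, returncode):
    # Placeholder for Scheduler._abort_on_casd_failure(); a real handler
    # would stop the queues and terminate running jobs.
    print("buildbox-casd (pid {}) exited unexpectedly with {}".format(pid, returncode))

casd_pid = casd_process_manager.process.pid     # manager as passed to Scheduler.run()
watcher = asyncio.get_child_watcher()
watcher.add_child_handler(casd_pid, on_casd_exit)

# ... run the event loop and the scheduled queues ...

watcher.remove_child_handler(casd_pid)
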
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0a3495ce5..bab5cb100 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1373,7 +1373,7 @@ class Stream:
if self._session_start_callback is not None:
self._session_start_callback()
- status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
+ status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
if status == SchedStatus.ERROR:
raise StreamError()