mirror of
https://github.com/vale981/ray
synced 2025-04-23 06:25:52 -04:00
Replace actor dummy objects with mock calls to the local scheduler (#1467)
* Replace putting the dummy object with a call to the local scheduler * Mark dummy objects as locally available
This commit is contained in:
parent
782b4aeb0f
commit
668737f383
3 changed files with 14 additions and 44 deletions
|
@ -6,7 +6,6 @@ import copy
|
|||
import hashlib
|
||||
import inspect
|
||||
import json
|
||||
import numpy as np
|
||||
import traceback
|
||||
|
||||
import pyarrow.plasma as plasma
|
||||
|
@ -110,37 +109,6 @@ def get_actor_checkpoint(worker, actor_id):
|
|||
return checkpoint_index, checkpoint
|
||||
|
||||
|
||||
def put_dummy_object(worker, dummy_object_id):
|
||||
"""Put a dummy actor object into the local object store.
|
||||
|
||||
This registers a dummy object ID in the local store with an empty numpy
|
||||
array as the value. The resulting object is pinned to the store by storing
|
||||
it to the worker's state.
|
||||
|
||||
For actors, dummy objects are used to store the stateful dependencies
|
||||
between consecutive method calls. This function should be called for every
|
||||
actor method execution that updates the actor's internal state.
|
||||
|
||||
Args:
|
||||
worker: The worker to use to perform the put.
|
||||
dummy_object_id: The object ID of the dummy object.
|
||||
"""
|
||||
# Add the dummy output for actor tasks. TODO(swang): We use
|
||||
# a numpy array as a hack to pin the object in the object
|
||||
# store. Once we allow object pinning in the store, we may
|
||||
# use `None`.
|
||||
dummy_object = np.zeros(1)
|
||||
worker.put_object(dummy_object_id, dummy_object)
|
||||
# Keep the dummy output in scope for the lifetime of the
|
||||
# actor, to prevent eviction from the object store.
|
||||
dummy_object = worker.get_object([dummy_object_id])
|
||||
dummy_object = dummy_object[0]
|
||||
worker.actor_pinned_objects.append(dummy_object)
|
||||
if (len(worker.actor_pinned_objects) >
|
||||
ray._config.actor_max_dummy_objects()):
|
||||
worker.actor_pinned_objects.pop(0)
|
||||
|
||||
|
||||
def make_actor_method_executor(worker, method_name, method):
|
||||
"""Make an executor that wraps a user-defined actor method.
|
||||
|
||||
|
@ -168,11 +136,10 @@ def make_actor_method_executor(worker, method_name, method):
|
|||
if method_name == "__ray_checkpoint__":
|
||||
# Execute the checkpoint task.
|
||||
actor_checkpoint_failed, error = method(actor, *args)
|
||||
# If the checkpoint was successfully loaded, put the dummy object
|
||||
# and update the actor's task counter, so that the task following
|
||||
# the checkpoint can run.
|
||||
# If the checkpoint was successfully loaded, update the actor's
|
||||
# task counter and set a flag to notify the local scheduler, so
|
||||
# that the task following the checkpoint can run.
|
||||
if not actor_checkpoint_failed:
|
||||
put_dummy_object(worker, dummy_return_id)
|
||||
worker.actor_task_counter = task_counter + 1
|
||||
# Once the actor has resumed from a checkpoint, it counts as
|
||||
# loaded.
|
||||
|
@ -188,7 +155,6 @@ def make_actor_method_executor(worker, method_name, method):
|
|||
else:
|
||||
# Update the worker's internal state before executing the method in
|
||||
# case the method throws an exception.
|
||||
put_dummy_object(worker, dummy_return_id)
|
||||
worker.actor_task_counter = task_counter + 1
|
||||
# Once the actor executes a task, it counts as loaded.
|
||||
worker.actor_loaded = True
|
||||
|
|
|
@ -230,10 +230,6 @@ class Worker(object):
|
|||
# task assigned. Workers are not assigned a task on startup, so we
|
||||
# initialize to False.
|
||||
self.actor_checkpoint_failed = False
|
||||
# TODO(swang): This is a hack to prevent the object store from evicting
|
||||
# dummy objects. Once we allow object pinning in the store, we may
|
||||
# remove this variable.
|
||||
self.actor_pinned_objects = None
|
||||
# The number of threads Plasma should use when putting an object in the
|
||||
# object store.
|
||||
self.memcopy_threads = 12
|
||||
|
@ -1920,9 +1916,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
|
|||
actor_key = b"Actor:" + worker.actor_id
|
||||
class_id = worker.redis_client.hget(actor_key, "class_id")
|
||||
worker.class_id = class_id
|
||||
# Store a list of the dummy outputs produced by actor tasks, to pin the
|
||||
# dummy outputs in the object store.
|
||||
worker.actor_pinned_objects = []
|
||||
|
||||
# Initialize the serialization library. This registers some classes, and so
|
||||
# it must be run before we export all of the cached remote functions.
|
||||
|
|
|
@ -599,6 +599,17 @@ void finish_task(LocalSchedulerState *state,
|
|||
worker->resources_in_use;
|
||||
release_resources(state, worker, cpu_resources);
|
||||
}
|
||||
/* For successful actor tasks, mark returned dummy objects as locally
|
||||
* available. This is not added to the object table, so the update will be
|
||||
* invisible to other nodes. */
|
||||
/* NOTE(swang): These objects are never cleaned up. We should consider
|
||||
* removing the objects, e.g., when an actor is terminated. */
|
||||
if (TaskSpec_is_actor_task(spec)) {
|
||||
if (!actor_checkpoint_failed) {
|
||||
handle_object_available(state, state->algorithm_state,
|
||||
TaskSpec_actor_dummy_object(spec));
|
||||
}
|
||||
}
|
||||
/* If we're connected to Redis, update tables. */
|
||||
if (state->db != NULL) {
|
||||
/* Update control state tables. If there was an error while executing a *
|
||||
|
|
Loading…
Add table
Reference in a new issue