Inline objects (#3756)

* added store_client_ to object_manager and node_manager

* half through...

* all code in, and compiling! Nothing tested though...

* something is working ;-)

* added a few more comments

* now, add only one entry to the in GCS for inlined objects

* more comments

* remove a spurious todo

* some comment updates

* add test

* added support for meta data for inline objects

* avoid some copies

* Initialize plasma client in tests

* Better comments. Enable configuring nline_object_max_size_bytes.

* Update src/ray/object_manager/object_manager.cc

Co-Authored-By: istoica <istoica@cs.berkeley.edu>

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: istoica <istoica@cs.berkeley.edu>

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: istoica <istoica@cs.berkeley.edu>

* fiexed comments

* fixed various typos in comments

* updated comments in object_manager.h and object_manager.cc

* addressed all comments...hopefully ;-)

* Only add eviction entries for objects that are not inlined

* fixed a bunch of comments

* Fix test

* Fix object transfer dump test

* lint

* Comments

* Fix test?

* Fix test?

* lint

* fix build

* Fix build

* lint

* Use const ref

* Fixes, don't let object manager hang

* Increase object transfer retry time for travis?

* Fix test

* Fix test?

* Add internal config to java, fix PlasmaFreeTest
This commit is contained in:
Ion 2019-02-07 20:32:39 +02:00 committed by Stephanie Wang
parent 5db1afef07
commit f987572795
20 changed files with 369 additions and 70 deletions

View file

@ -6,8 +6,10 @@ import com.google.common.collect.ImmutableList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.id.UniqueId;
@ -50,6 +52,7 @@ public class RayConfig {
public final Long objectStoreSize;
public final String rayletSocketName;
public final List<String> rayletConfigParameters;
public final String redisServerExecutablePath;
public final String redisModulePath;
@ -162,6 +165,14 @@ public class RayConfig {
// raylet socket name
rayletSocketName = config.getString("ray.raylet.socket-name");
// raylet parameters
rayletConfigParameters = new ArrayList<String>();
Config rayletConfig = config.getConfig("ray.raylet.config");
for (java.util.Map.Entry<java.lang.String,ConfigValue> entry : rayletConfig.entrySet()) {
String parameter = entry.getKey() + "," + String.valueOf(entry.getValue().unwrapped());
rayletConfigParameters.add(parameter);
}
// library path
this.libraryPath = new ImmutableList.Builder<String>().add(
rayHome + "/build/src/plasma",

View file

@ -205,7 +205,7 @@ public class RunManager {
"0", // number of initial workers
String.valueOf(maximumStartupConcurrency),
ResourceUtil.getResourcesStringFromMap(rayConfig.resources),
"", // The internal config list.
String.join(",", rayConfig.rayletConfigParameters), // The internal config list.
buildPythonWorkerCommand(), // python worker command
buildWorkerCommandRaylet() // java worker command
);

View file

@ -85,6 +85,10 @@ ray {
raylet {
// RPC socket name of Raylet
socket-name: /tmp/ray/sockets/raylet
// See src/ray/ray_config_def.h for options.
config {
}
}
}

View file

@ -13,6 +13,7 @@ public class BaseTest {
public void setUp() {
System.setProperty("ray.home", "../..");
System.setProperty("ray.resources", "CPU:4,RES-A:4");
System.setProperty("ray.raylet.config.inline_object_max_size_bytes", "0");
Ray.init();
}
@ -29,6 +30,7 @@ public class BaseTest {
// unset system properties
System.clearProperty("ray.home");
System.clearProperty("ray.resources");
System.clearProperty("ray.raylet.config.inline_object_max_size_bytes");
}
}

View file

@ -122,12 +122,20 @@ table FunctionTableData {
table ObjectTableData {
// The size of the object.
object_size: long;
// Is object in-lined? Inline objects are objects whose data and metadata are
// inlined in the GCS object table entry, which normally only specifies
// the object location.
inline_object_flag: bool;
// The node manager ID that this object appeared on or was evicted by.
manager: string;
// Whether this entry is an addition or a deletion.
is_eviction: bool;
// The number of times this object has been evicted from this node so far.
num_evictions: int;
// In-line object data.
inline_object_data: [ubyte];
// In-line object metadata.
inline_object_metadata: [ubyte];
}
table TaskReconstructionData {

View file

@ -8,15 +8,21 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,
namespace {
/// Process a suffix of the object table log and store the result in
/// client_ids. This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix. This also stores a
/// bool in has_been_created indicating whether the object has ever been
/// created before.
/// Process a suffix of the object table log.
/// If object is inlined (inline_object_flag = TRUE), its data and metadata are
/// stored with the object's entry so we read them into inline_object_data, and
/// inline_object_metadata, respectively.
/// If object is not inlined, store the result in client_ids.
/// This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix.
/// This function also stores a bool in has_been_created indicating whether the
/// object has ever been created before.
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table,
std::unordered_set<ClientID> *client_ids,
bool *has_been_created) {
bool *inline_object_flag,
std::vector<uint8_t> *inline_object_data,
std::string *inline_object_metadata, bool *has_been_created) {
// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
@ -24,6 +30,9 @@ void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history
// client2.is_eviction = false
// In such a scenario, we want to indicate client2 is the only client that contains
// the object, which the following code achieves.
//
// If object is inlined each entry contains both the object's data and metadata,
// so we don't care about its location.
if (!location_history.empty()) {
// If there are entries, then the object has been created. Once this flag
// is set to true, it should never go back to false.
@ -31,18 +40,35 @@ void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history
}
for (const auto &object_table_data : location_history) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
if (object_table_data.inline_object_flag) {
if (!*inline_object_flag) {
// This is the first time we're receiving the inline object data. Read
// object's data from the GCS entry.
*inline_object_flag = object_table_data.inline_object_flag;
inline_object_data->assign(object_table_data.inline_object_data.begin(),
object_table_data.inline_object_data.end());
inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(),
object_table_data.inline_object_metadata.end());
}
// We got the data and metadata of the object so exit the loop.
break;
}
if (!object_table_data.is_eviction) {
client_ids->insert(client_id);
} else {
client_ids->erase(client_id);
}
}
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;
if (!*inline_object_flag) {
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;
}
}
}
}
@ -62,6 +88,8 @@ void ObjectDirectory::RegisterBackend() {
// Update entries for this object.
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&it->second.current_object_locations,
&it->second.inline_object_flag, &it->second.inline_object_data,
&it->second.inline_object_metadata,
&it->second.has_been_created);
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
@ -74,6 +102,8 @@ void ObjectDirectory::RegisterBackend() {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations,
it->second.inline_object_flag, it->second.inline_object_data,
it->second.inline_object_metadata,
it->second.has_been_created);
}
};
@ -84,13 +114,24 @@ void ObjectDirectory::RegisterBackend() {
ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) {
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
<< inline_object_flag;
// Append the addition entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = false;
data->num_evictions = object_evictions_[object_id];
data->object_size = object_info.data_size;
data->inline_object_flag = inline_object_flag;
if (inline_object_flag) {
// Add object's data to its GCS entry.
data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end());
data->inline_object_metadata.assign(inline_object_metadata.begin(),
inline_object_metadata.end());
}
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);
return status;
@ -98,6 +139,7 @@ ray::Status ObjectDirectory::ReportObjectAdded(
ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) {
RAY_LOG(DEBUG) << "Reporting object removed to GCS " << object_id;
// Append the eviction entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
@ -147,16 +189,19 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) {
if (listener.second.current_object_locations.count(client_id) > 0) {
// If the subscribed object has the removed client as a location, update
// its locations with an empty log so that the location will be removed.
UpdateObjectLocations({}, gcs_client_->client_table(),
&listener.second.current_object_locations,
&listener.second.has_been_created);
UpdateObjectLocations(
{}, gcs_client_->client_table(), &listener.second.current_object_locations,
&listener.second.inline_object_flag, &listener.second.inline_object_data,
&listener.second.inline_object_metadata, &listener.second.has_been_created);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, listener.second.current_object_locations,
listener.second.has_been_created);
callback_pair.second(
object_id, listener.second.current_object_locations,
listener.second.inline_object_flag, listener.second.inline_object_data,
listener.second.inline_object_metadata, listener.second.has_been_created);
}
}
}
@ -182,8 +227,14 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// immediately notify the caller of the current known locations.
if (listener_state.has_been_created) {
auto &locations = listener_state.current_object_locations;
io_service_.post([callback, locations, object_id]() {
callback(object_id, locations, /*has_been_created=*/true);
auto inline_object_flag = listener_state.inline_object_flag;
const auto &inline_object_data = listener_state.inline_object_data;
const auto &inline_object_metadata = listener_state.inline_object_metadata;
io_service_.post([callback, locations, inline_object_flag, inline_object_data,
inline_object_metadata, object_id]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata,
/*has_been_created=*/true);
});
}
return status;
@ -216,20 +267,31 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
bool inline_object_flag = false;
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool has_been_created = false;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&client_ids, &has_been_created);
&client_ids, &inline_object_flag, &inline_object_data,
&inline_object_metadata, &has_been_created);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, client_ids, has_been_created);
callback(object_id, client_ids, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
});
} else {
// If we have locations cached due to a concurrent SubscribeObjectLocations
// call, call the callback immediately with the cached locations.
// If object inlined, we already have the object's data.
auto &locations = it->second.current_object_locations;
bool has_been_created = it->second.has_been_created;
io_service_.post([callback, object_id, locations, has_been_created]() {
callback(object_id, locations, has_been_created);
bool inline_object_flag = it->second.inline_object_flag;
const auto &inline_object_data = it->second.inline_object_data;
const auto &inline_object_metadata = it->second.inline_object_metadata;
io_service_.post([callback, object_id, locations, inline_object_flag,
inline_object_data, inline_object_metadata, has_been_created]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
});
}
return status;

View file

@ -48,9 +48,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;
/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &,
bool has_been_created)>;
using OnLocationsFound = std::function<void(
const ray::ObjectID &object_id, const std::unordered_set<ray::ClientID> &, bool,
const std::vector<uint8_t> &, const std::string &, bool has_been_created)>;
/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
@ -99,10 +99,15 @@ class ObjectDirectoryInterface {
/// \param object_id The object id that was put into the store.
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
/// \param inline_object_flag Flag specifying whether object is inlined.
/// \param inline_object_data Object data. Only for inlined objects.
/// \param inline_object_metadata Object metadata. Only for inlined objects.
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) = 0;
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) = 0;
/// Report objects removed from this client's store to the object directory.
///
@ -154,9 +159,12 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id,
const ObjectID &object_id) override;
ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) override;
ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) override;
ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;
@ -174,6 +182,15 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<ClientID> current_object_locations;
/// Specify whether the object is inlined. The data and the metadata of
/// an inlined object are stored in the object's GCS entry. In this flag
/// (i.e., the object is inlined) the content of current_object_locations
/// can be ignored.
bool inline_object_flag;
/// Inlined object data, if inline_object_flag == true.
std::vector<uint8_t> inline_object_data;
/// Inlined object metadata, if inline_object_flag == true.
std::string inline_object_metadata;
/// This flag will get set to true if the object has ever been created. It
/// should never go back to false once set to true. If this is true, and
/// the current_object_locations is empty, then this means that the object

View file

@ -10,13 +10,15 @@ namespace ray {
ObjectManager::ObjectManager(asio::io_service &main_service,
const ObjectManagerConfig &config,
std::shared_ptr<ObjectDirectoryInterface> object_directory)
std::shared_ptr<ObjectDirectoryInterface> object_directory,
plasma::PlasmaClient &store_client)
: config_(config),
object_directory_(std::move(object_directory)),
store_notification_(main_service, config_.store_socket_name),
buffer_pool_(config_.store_socket_name, config_.object_chunk_size),
send_work_(send_service_),
receive_work_(receive_service_),
store_client_(store_client),
connection_pool_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
RAY_CHECK(config_.max_sends > 0);
@ -64,11 +66,39 @@ void ObjectManager::HandleObjectAdded(
const object_manager::protocol::ObjectInfoT &object_info) {
// Notify the object directory that the object has been added to this node.
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
RAY_LOG(DEBUG) << "Object added " << object_id;
RAY_CHECK(local_objects_.count(object_id) == 0);
local_objects_[object_id].object_info = object_info;
ray::Status status =
object_directory_->ReportObjectAdded(object_id, client_id_, object_info);
// If this object was created from inlined data, this means it is already in GCS,
// so no need to write it again.
if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) {
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool inline_object_flag = false;
if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) {
// Inline object. Try to get the data from the object store.
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data != nullptr) {
// The object exists. Store the object data in the GCS entry.
inline_object_flag = true;
inline_object_data.assign(
object_buffer.data->data(),
object_buffer.data->data() + object_buffer.data->size());
inline_object_metadata.assign(
object_buffer.metadata->data(),
object_buffer.metadata->data() + object_buffer.metadata->size());
// Mark this object as inlined, so that if this object is later
// evicted, we do not report it to the GCS.
local_inlined_objects_.insert(object_id);
}
}
RAY_CHECK_OK(object_directory_->ReportObjectAdded(
object_id, client_id_, object_info, inline_object_flag, inline_object_data,
inline_object_metadata));
}
// Handle the unfulfilled_push_requests_ which contains the push request that is not
// completed due to unsatisfied local objects.
auto iter = unfulfilled_push_requests_.find(object_id);
@ -90,10 +120,16 @@ void ObjectManager::HandleObjectAdded(
}
void ObjectManager::NotifyDirectoryObjectDeleted(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Object removed " << object_id;
auto it = local_objects_.find(object_id);
RAY_CHECK(it != local_objects_.end());
if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) {
// Inline object data can be retrieved by any node by contacting the GCS,
// so only report that the object was evicted if it wasn't inlined.
RAY_CHECK_OK(object_directory_->ReportObjectRemoved(object_id, client_id_));
}
local_objects_.erase(it);
ray::Status status = object_directory_->ReportObjectRemoved(object_id, client_id_);
local_inlined_objects_.erase(object_id);
}
ray::Status ObjectManager::SubscribeObjAdded(
@ -108,6 +144,26 @@ ray::Status ObjectManager::SubscribeObjDeleted(
return ray::Status::OK();
}
void ObjectManager::PutInlineObject(const ObjectID &object_id,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) {
if (local_objects_.find(object_id) == local_objects_.end()) {
// Inline object is not in the local object store. Create it from
// inline_object_data, and inline_object_metadata, respectively.
//
// Since this function is called on notification or when reading the
// object's entry from GCS, we know this object's entry is already in GCS.
// Remember this by adding the object to local_inlined_objects_. This way
// we avoid writing another copy of this object to GCS in HandleObjectAdded().
local_inlined_objects_.insert(object_id);
auto status = store_client_.CreateAndSeal(
object_id.to_plasma_id(),
std::string(inline_object_data.begin(), inline_object_data.end()),
inline_object_metadata);
RAY_CHECK(status.IsPlasmaObjectExists() || status.ok()) << status.message();
}
}
ray::Status ObjectManager::Pull(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Pull on " << client_id_ << " of object " << object_id;
// Check if object is already local.
@ -127,7 +183,13 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
return object_directory_->SubscribeObjectLocations(
object_directory_pull_callback_id_, object_id,
[this](const ObjectID &object_id, const std::unordered_set<ClientID> &client_ids,
bool created) {
bool inline_object_flag, const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool created) {
if (inline_object_flag) {
// This is an inlined object. Store it in the Plasma store and return.
PutInlineObject(object_id, inline_object_data, inline_object_metadata);
return;
}
// Exit if the Pull request has already been fulfilled or canceled.
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
@ -169,7 +231,14 @@ void ObjectManager::TryPull(const ObjectID &object_id) {
RAY_CHECK(local_objects_.count(object_id) == 0);
// Make sure that there is at least one client which is not the local client.
// TODO(rkn): It may actually be possible for this check to fail.
RAY_CHECK(client_vector.size() != 1 || client_vector[0] != client_id_);
if (client_vector.size() == 1 && client_vector[0] == client_id_) {
RAY_LOG(ERROR) << "The object manager with client ID " << client_id_
<< " is trying to pull object " << object_id
<< " but the object table suggests that this object manager "
<< "already has the object. The object may have been evicted.";
it->second.timer_set = false;
return;
}
// Choose a random client to pull the object from.
// Generate a random index.
@ -572,11 +641,19 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
RAY_RETURN_NOT_OK(object_directory_->LookupLocations(
object_id,
[this, wait_id](const ObjectID &lookup_object_id,
const std::unordered_set<ClientID> &client_ids, bool created) {
const std::unordered_set<ClientID> &client_ids,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool created) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
if (!client_ids.empty()) {
if (!client_ids.empty() || inline_object_flag) {
wait_state.remaining.erase(lookup_object_id);
wait_state.found.insert(lookup_object_id);
if (inline_object_flag) {
// This is an inlined object. Store it in the Plasma store and return.
PutInlineObject(lookup_object_id, inline_object_data,
inline_object_metadata);
}
}
RAY_LOG(DEBUG) << "Wait request " << wait_id << ": " << client_ids.size()
<< " locations found for object " << lookup_object_id;
@ -610,8 +687,11 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
wait_id, object_id,
[this, wait_id](const ObjectID &subscribe_object_id,
const std::unordered_set<ClientID> &client_ids, bool created) {
if (!client_ids.empty()) {
const std::unordered_set<ClientID> &client_ids,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool created) {
if (!client_ids.empty() || inline_object_flag) {
RAY_LOG(DEBUG) << "Wait request " << wait_id
<< ": subscription notification received for object "
<< subscribe_object_id;
@ -623,6 +703,11 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
// notification.
return;
}
if (inline_object_flag) {
// This is an inlined object. Store it in the Plasma store.
PutInlineObject(subscribe_object_id, inline_object_data,
inline_object_metadata);
}
auto &wait_state = object_id_wait_state->second;
wait_state.remaining.erase(subscribe_object_id);
wait_state.found.insert(subscribe_object_id);

View file

@ -76,9 +76,12 @@ class ObjectManager : public ObjectManagerInterface {
/// \param main_service The main asio io_service.
/// \param config ObjectManager configuration.
/// \param object_directory An object implementing the object directory interface.
/// \param store_client Reference to Plasma store. This is used to get and put
/// inlined objects in the local object store.
explicit ObjectManager(boost::asio::io_service &main_service,
const ObjectManagerConfig &config,
std::shared_ptr<ObjectDirectoryInterface> object_directory);
std::shared_ptr<ObjectDirectoryInterface> object_directory,
plasma::PlasmaClient &store_client);
~ObjectManager();
@ -351,6 +354,12 @@ class ObjectManager : public ObjectManagerInterface {
/// Handle Push task timeout.
void HandlePushTaskTimeout(const ObjectID &object_id, const ClientID &client_id);
/// Add inline object to object store. Called when reading the object entry
/// from GCS or upon receiving a notification about an inline object.
void PutInlineObject(const ObjectID &object_id,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata);
ClientID client_id_;
const ObjectManagerConfig config_;
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
@ -380,6 +389,10 @@ class ObjectManager : public ObjectManagerInterface {
/// all incoming object transfers.
std::vector<std::thread> receive_threads_;
/// Reference to Plasma Store. This is used to get and put inlined objects in
/// the local object store.
plasma::PlasmaClient &store_client_;
/// Connection pool for reusing outgoing connections to remote object managers.
ConnectionPool connection_pool_;
@ -387,6 +400,12 @@ class ObjectManager : public ObjectManagerInterface {
/// including when the object was last pushed to other object managers.
std::unordered_map<ObjectID, LocalObjectInfo> local_objects_;
/// Set of objects created from inlined data whose locations and/or evictions
/// should not be reported to the GCS. This includes objects that were
/// created from data retrieved from the GCS, since a GCS entry with the
/// inlined data already exists.
std::unordered_set<ObjectID> local_inlined_objects_;
/// This is used as the callback identifier in Pull for
/// SubscribeObjectLocations. We only need one identifier because we never need to
/// subscribe multiple times to the same object during Pull.

View file

@ -30,13 +30,16 @@ class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
const std::string &store_name)
: object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
std::make_shared<ObjectDirectory>(main_service, gcs_client_),
store_client_) {
RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str()));
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
@ -88,6 +91,7 @@ class MockServer {
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
plasma::PlasmaClient store_client_;
ObjectManager object_manager_;
};
@ -142,7 +146,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_1.max_receives = max_receives_a;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1));
// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(
@ -154,7 +158,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_2.max_receives = max_receives_b;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2));
// connect to stores.
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));

View file

@ -21,13 +21,16 @@ class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
const std::string &store_name)
: object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
std::make_shared<ObjectDirectory>(main_service, gcs_client_),
store_client_) {
RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str()));
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
@ -79,6 +82,7 @@ class MockServer {
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
plasma::PlasmaClient store_client_;
ObjectManager object_manager_;
};
@ -127,7 +131,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1));
// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(
@ -139,7 +143,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2));
// connect to stores.
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
@ -291,8 +295,10 @@ class TestObjectManager : public TestObjectManagerBase {
sub_id, object_1,
[this, sub_id, object_1, object_2](
const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &clients, bool created) {
if (!clients.empty()) {
const std::unordered_set<ray::ClientID> &clients, bool inline_object_flag,
const std::vector<uint8_t> inline_object_data,
const std::string inline_object_metadata, bool created) {
if (!clients.empty() || inline_object_flag) {
TestWaitWhileSubscribed(sub_id, object_1, object_2);
}
}));

View file

@ -130,6 +130,11 @@ RAY_CONFIG(int, object_manager_repeated_push_delay_ms, 60000);
/// chunks exceeds the number of available sending threads.
RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 1000000);
/// Maximum size of an inline object (bytes).
/// Inline objects are objects whose data and metadata are inlined in the
/// GCS object table entry, which normally only specifies the object locations.
RAY_CONFIG(int64_t, inline_object_max_size_bytes, 512);
/// Number of workers per process
RAY_CONFIG(int, num_workers_per_process, 1);

View file

@ -42,9 +42,11 @@ namespace raylet {
NodeManager::NodeManager(boost::asio::io_service &io_service,
const NodeManagerConfig &config, ObjectManager &object_manager,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
std::shared_ptr<ObjectDirectoryInterface> object_directory)
std::shared_ptr<ObjectDirectoryInterface> object_directory,
plasma::PlasmaClient &store_client)
: io_service_(io_service),
object_manager_(object_manager),
store_client_(store_client),
gcs_client_(std::move(gcs_client)),
object_directory_(std::move(object_directory)),
heartbeat_timer_(io_service),
@ -89,8 +91,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
}));
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));
RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
}
ray::Status NodeManager::RegisterGcs() {
@ -1190,10 +1190,16 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
object_id,
[this, task_marked_as_failed, task](
const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &clients, bool has_been_created) {
const std::unordered_set<ray::ClientID> &clients, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool has_been_created) {
if (!*task_marked_as_failed) {
// Only process the object locations if we haven't already marked the
// task as failed.
if (inline_object_flag) {
// If object is inlined, we already have its data and metadata, so return.
return;
}
if (clients.empty() && has_been_created) {
// The object does not exist on any nodes but has been created
// before, so the object has been lost. Mark the task as failed to
@ -1800,7 +1806,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
// Notify the task dependency manager that this object is local.
const auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(object_id);
RAY_LOG(DEBUG) << "Object local " << object_id << ", "
<< " on " << gcs_client_->client_table().GetLocalClientId()
<< "on " << gcs_client_->client_table().GetLocalClientId()
<< ready_task_ids.size() << " tasks ready";
// Transition the tasks whose dependencies are now fulfilled to the ready state.
if (ready_task_ids.size() > 0) {

View file

@ -56,10 +56,12 @@ class NodeManager {
///
/// \param resource_config The initial set of node resources.
/// \param object_manager A reference to the local object manager.
/// \param reference to the local object store.
NodeManager(boost::asio::io_service &io_service, const NodeManagerConfig &config,
ObjectManager &object_manager,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
std::shared_ptr<ObjectDirectoryInterface> object_directory_);
std::shared_ptr<ObjectDirectoryInterface> object_directory_,
plasma::PlasmaClient &store_client);
/// Process a new client connection.
///
@ -400,7 +402,7 @@ class NodeManager {
/// A Plasma object store client. This is used exclusively for creating new
/// objects in the object store (e.g., for actor tasks that can't be run
/// because the actor died).
plasma::PlasmaClient store_client_;
plasma::PlasmaClient &store_client_;
/// A client connection to the GCS.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// The object table. This is shared with the object manager.

View file

@ -41,9 +41,10 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
: gcs_client_(gcs_client),
object_directory_(std::make_shared<ObjectDirectory>(main_service, gcs_client_)),
object_manager_(main_service, object_manager_config, object_directory_),
object_manager_(main_service, object_manager_config, object_directory_,
store_client_),
node_manager_(main_service, node_manager_config, object_manager_, gcs_client_,
object_directory_),
object_directory_, store_client_),
socket_name_(socket_name),
acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)),
socket_(main_service),
@ -56,6 +57,8 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
boost::asio::ip::tcp::v4(),
node_manager_config.node_manager_port)),
node_manager_socket_(main_service) {
RAY_ARROW_CHECK_OK(
store_client_.Connect(node_manager_config.store_socket_name.c_str()));
// Start listening for clients.
DoAccept();
DoAcceptObjectManager();

View file

@ -73,6 +73,10 @@ class Raylet {
/// The object table. This is shared between the object manager and node
/// manager.
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
/// Reference to Plasma Store.
/// A connection to the Plasma Store. This is shared between the node manager
/// and the main thread of the object manager.
plasma::PlasmaClient store_client_;
/// Manages client requests for object transfers and availability.
ObjectManager object_manager_;
/// Manages client requests for task submission and execution.

View file

@ -145,8 +145,10 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) {
created_object_id,
[this, task_id, reconstruction_attempt](
const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &clients, bool created) {
if (clients.empty()) {
const std::unordered_set<ray::ClientID> &clients, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool created) {
if (clients.empty() && !inline_object_flag) {
// The required object no longer exists on any live nodes. Attempt
// reconstruction.
AttemptReconstruction(task_id, object_id, reconstruction_attempt, created);

View file

@ -29,10 +29,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const ObjectID object_id = callback.first;
auto it = locations_.find(object_id);
if (it == locations_.end()) {
callback.second(object_id, std::unordered_set<ray::ClientID>(),
callback.second(object_id, std::unordered_set<ray::ClientID>(), false, {}, "",
/*created=*/false);
} else {
callback.second(object_id, it->second, /*created=*/true);
callback.second(object_id, it->second, false, {}, "", /*created=*/true);
}
}
callbacks_.clear();
@ -60,9 +60,11 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const OnLocationsFound &));
MOCK_METHOD2(UnsubscribeObjectLocations,
ray::Status(const ray::UniqueID &, const ObjectID &));
MOCK_METHOD3(ReportObjectAdded,
MOCK_METHOD6(ReportObjectAdded,
ray::Status(const ObjectID &, const ClientID &,
const object_manager::protocol::ObjectInfoT &));
const object_manager::protocol::ObjectInfoT &, bool,
const std::vector<uint8_t> &, const std::string &));
MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &));
private:

View file

@ -210,7 +210,7 @@ def test_actor_broadcast(ray_start_cluster):
def test_object_transfer_retry(ray_start_empty_cluster):
cluster = ray_start_empty_cluster
repeated_push_delay = 4
repeated_push_delay = 10
# Force the sending object manager to allow duplicate pushes again sooner.
# Also, force the receiving object manager to retry the Pull sooner.
@ -246,6 +246,7 @@ def test_object_transfer_retry(ray_start_empty_cluster):
ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
end_time = time.time()
print(end_time - start_time)
# Make sure that the first time the objects get transferred, it happens
# quickly.
assert end_time - start_time < repeated_push_delay

View file

@ -1059,8 +1059,14 @@ def test_object_transfer_dump(ray_start_cluster):
cluster = ray_start_cluster
num_nodes = 3
# Set the inline object size to 0 to force all objects to be written to
# plasma.
config = json.dumps({"inline_object_max_size_bytes": 0})
for i in range(num_nodes):
cluster.add_node(resources={str(i): 1}, object_store_memory=10**9)
cluster.add_node(
resources={str(i): 1},
object_store_memory=10**9,
_internal_config=config)
ray.init(redis_address=cluster.redis_address)
@ray.remote
@ -2518,6 +2524,56 @@ def test_wait_reconstruction(shutdown_only):
assert len(ready_ids) == 1
def test_inline_objects(shutdown_only):
config = json.dumps({"initial_reconstruction_timeout_milliseconds": 200})
ray.init(num_cpus=1, object_store_memory=10**7, _internal_config=config)
@ray.remote
class Actor(object):
def create_inline_object(self):
return "inline"
def create_non_inline_object(self):
return 10000 * [1]
def get(self):
return
a = Actor.remote()
# Count the number of objects that were successfully inlined.
inlined = 0
for _ in range(100):
inline_object = a.create_inline_object.remote()
ray.get(inline_object)
plasma_id = ray.pyarrow.plasma.ObjectID(inline_object.binary())
ray.worker.global_worker.plasma_client.delete([plasma_id])
# Make sure we can still get an inlined object created by an actor even
# after it has been evicted.
try:
value = ray.get(inline_object)
assert value == "inline"
inlined += 1
except ray.worker.RayTaskError:
pass
# Make sure some objects were inlined. Some of them may not get inlined
# because we evict the object soon after creating it.
assert inlined > 0
# Non-inlined objects are not able to be recreated after eviction.
for _ in range(10):
non_inline_object = a.create_non_inline_object.remote()
ray.get(non_inline_object)
plasma_id = ray.pyarrow.plasma.ObjectID(non_inline_object.binary())
# This while loop is necessary because sometimes the object is still
# there immediately after plasma_client.delete.
while ray.worker.global_worker.plasma_client.contains(plasma_id):
ray.worker.global_worker.plasma_client.delete([plasma_id])
# Objects created by an actor that were evicted and larger than the
# maximum inline object size cannot be retrieved or reconstructed.
with pytest.raises(ray.worker.RayTaskError):
ray.get(non_inline_object) == 10000 * [1]
def test_ray_setproctitle(shutdown_only):
ray.init(num_cpus=2)