Remove num_threads as a parameter. (#1891)

* remove num_threads as a parameter.

* linting.

* add additional checks.

* Invoke TransferCompleted on failures.

* Fix issue with failed Gets on store.

* Use RAY_CHECK_OK on the status of writing object headers.

* fix mac issues.
This commit is contained in:
Melih Elibol 2018-04-14 15:22:59 -07:00 committed by Philipp Moritz
parent 6be73350c6
commit 6a84b1f26e
4 changed files with 26 additions and 27 deletions

View file

@ -20,7 +20,10 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
connection_pool_(),
transfer_queue_(),
num_transfers_send_(0),
num_transfers_receive_(0) {
num_transfers_receive_(0),
num_threads_(config_.max_sends + config_.max_receives) {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
main_service_ = &main_service;
config_ = config;
store_notification_.SubscribeObjAdded(
@ -42,7 +45,10 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
connection_pool_(),
transfer_queue_(),
num_transfers_send_(0),
num_transfers_receive_(0) {
num_transfers_receive_(0),
num_threads_(config_.max_sends + config_.max_receives) {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
// TODO(hme) Client ID is never set with this constructor.
main_service_ = &main_service;
config_ = config;
@ -55,13 +61,13 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
ObjectManager::~ObjectManager() {
object_manager_service_->stop();
for (int i = 0; i < config_.num_threads; ++i) {
for (int i = 0; i < num_threads_; ++i) {
io_threads_[i].join();
}
}
void ObjectManager::StartIOService() {
for (int i = 0; i < config_.num_threads; ++i) {
for (int i = 0; i < num_threads_; ++i) {
io_threads_.emplace_back(std::thread(&ObjectManager::IOServiceLoop, this));
}
}
@ -212,7 +218,8 @@ ray::Status ObjectManager::DequeueTransfers() {
ray::Status status = ray::Status::OK();
// Dequeue sends.
while (true) {
if (std::atomic_fetch_add(&num_transfers_send_, 1) <= config_.max_sends) {
int num_transfers_send = std::atomic_fetch_add(&num_transfers_send_, 1);
if (num_transfers_send < config_.max_sends) {
TransferQueue::SendRequest req;
bool exists = transfer_queue_.DequeueSendIfPresent(&req);
if (exists) {
@ -233,7 +240,8 @@ ray::Status ObjectManager::DequeueTransfers() {
}
// Dequeue receives.
while (true) {
if (std::atomic_fetch_add(&num_transfers_receive_, 1) <= config_.max_receives) {
int num_transfers_receive = std::atomic_fetch_add(&num_transfers_receive_, 1);
if (num_transfers_receive < config_.max_receives) {
TransferQueue::ReceiveRequest req;
bool exists = transfer_queue_.DequeueReceiveIfPresent(&req);
if (exists) {
@ -307,6 +315,8 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id_const,
// transfer again.
RAY_CHECK_OK(
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::TRANSFER, conn));
store_pool_.ReleaseObjectStore(store_client);
RAY_CHECK_OK(TransferCompleted(TransferQueue::TransferType::SEND));
return ray::Status::IOError(
"Unable to transfer object to requesting plasma manager, object not local.");
}
@ -329,14 +339,7 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id_const,
ray::Status status =
conn->WriteMessage(object_manager_protocol::MessageType_PushRequest, fbb.GetSize(),
fbb.GetBufferPointer());
if (!status.ok()) {
// push failed.
// TODO(hme): Trash sender.
RAY_CHECK_OK(
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::TRANSFER, conn));
return status;
}
RAY_CHECK_OK(status);
// TODO(hme): Make this async.
return SendObjectData(conn, context_id, store_client);
}

View file

@ -33,13 +33,11 @@ namespace ray {
struct ObjectManagerConfig {
/// The time in milliseconds to wait before retrying a pull
/// that failed due to client id lookup.
int pull_timeout_ms = 100;
/// Size of thread pool.
int num_threads = 2;
uint pull_timeout_ms = 100;
/// Maximum number of sends allowed.
int max_sends = 20;
int max_sends = 2;
/// Maximum number of receives allowed.
int max_receives = 20;
int max_receives = 2;
// TODO(hme): Implement num retries (to avoid infinite retries).
std::string store_socket_name;
};
@ -190,6 +188,10 @@ class ObjectManager {
std::atomic<int> num_transfers_send_;
std::atomic<int> num_transfers_receive_;
/// Size of the thread pool. This is the sum of
/// config_.max_sends and config_.max_receives.
const int num_threads_;
/// Cache of locally available objects.
std::unordered_map<ObjectID, ObjectInfoT, UniqueIDHasher> local_objects_;

View file

@ -41,9 +41,7 @@ class MockServer {
DoAcceptObjectManager();
}
~MockServer() {
RAY_CHECK_OK(gcs_client_->client_table().Disconnect());
}
~MockServer() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); }
private:
ray::Status RegisterGcs(boost::asio::io_service &io_service) {
@ -120,7 +118,6 @@ class TestObjectManagerBase : public ::testing::Test {
gcs_client_1 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_sock_1;
om_config_1.num_threads = 4;
om_config_1.max_sends = 2;
om_config_1.max_receives = 2;
server1.reset(new MockServer(main_service, std::move(object_manager_service_1),
@ -130,7 +127,6 @@ class TestObjectManagerBase : public ::testing::Test {
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_sock_2;
om_config_2.num_threads = 4;
om_config_2.max_sends = 2;
om_config_2.max_receives = 2;
server2.reset(new MockServer(main_service, std::move(object_manager_service_2),

View file

@ -32,9 +32,7 @@ class MockServer {
DoAcceptObjectManager();
}
~MockServer() {
RAY_CHECK_OK(gcs_client_->client_table().Disconnect());
}
~MockServer() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); }
private:
ray::Status RegisterGcs(boost::asio::io_service &io_service) {