From fd5b58a827a9e9fb043a421263b46beafa9b4210 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 13 Feb 2019 18:29:03 -0800 Subject: [PATCH] Increase timeout for object manager valgrind tests (#4027) * Avoid second copy of data for inlined objects * Increase Wait timeout for valgrind tests * Run object manager tests with and without inlined objects * Fix test --- src/ray/object_manager/object_directory.cc | 12 ++++--- src/ray/object_manager/object_directory.h | 12 +++---- src/ray/object_manager/object_manager.cc | 16 +++------ .../test/object_manager_test.cc | 34 ++++++++++++++----- src/ray/raylet/reconstruction_policy_test.cc | 4 +-- src/ray/test/run_object_manager_tests.sh | 5 ++- src/ray/test/run_object_manager_valgrind.sh | 8 +++-- 7 files changed, 56 insertions(+), 35 deletions(-) diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 5aa39a03b..6a2535025 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -115,8 +115,7 @@ void ObjectDirectory::RegisterBackend() { ray::Status ObjectDirectory::ReportObjectAdded( const ObjectID &object_id, const ClientID &client_id, const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag, - const std::vector &inline_object_data, - const std::string &inline_object_metadata) { + const plasma::ObjectBuffer &plasma_buffer) { RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? " << inline_object_flag; // Append the addition entry to the object table. @@ -127,9 +126,12 @@ ray::Status ObjectDirectory::ReportObjectAdded( data->inline_object_flag = inline_object_flag; if (inline_object_flag) { // Add object's data to its GCS entry. 
- data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end()); - data->inline_object_metadata.assign(inline_object_metadata.begin(), - inline_object_metadata.end()); + data->inline_object_data.assign( + plasma_buffer.data->data(), + plasma_buffer.data->data() + plasma_buffer.data->size()); + data->inline_object_metadata.assign( + plasma_buffer.metadata->data(), + plasma_buffer.metadata->data() + plasma_buffer.metadata->size()); } ray::Status status = gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr); diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index 3506f30bc..f1c04de0d 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -7,6 +7,8 @@ #include #include +#include "plasma/client.h" + #include "ray/gcs/client.h" #include "ray/id.h" #include "ray/object_manager/format/object_manager_generated.h" @@ -100,14 +102,13 @@ class ObjectDirectoryInterface { /// \param client_id The client id corresponding to this node. /// \param object_info Additional information about the object. /// \param inline_object_flag Flag specifying whether object is inlined. - /// \param inline_object_data Object data. Only for inlined objects. - /// \param inline_object_metadata Object metadata. Only for inlined objects. + /// \param plasma_buffer Object data and metadata from plasma. This data is + /// only valid for inlined objects (i.e., when inline_object_flag=true). /// \return Status of whether this method succeeded. virtual ray::Status ReportObjectAdded( const ObjectID &object_id, const ClientID &client_id, const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag, - const std::vector &inline_object_data, - const std::string &inline_object_metadata) = 0; + const plasma::ObjectBuffer &plasma_buffer) = 0; /// Report objects removed from this client's store to the object directory. 
/// @@ -162,8 +163,7 @@ class ObjectDirectory : public ObjectDirectoryInterface { ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id, const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag, - const std::vector<uint8_t> &inline_object_data, - const std::string &inline_object_metadata) override; + const plasma::ObjectBuffer &plasma_buffer) override; ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) override; diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 241d1bf86..3f82f3baf 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -75,29 +75,23 @@ void ObjectManager::HandleObjectAdded( std::vector<uint8_t> inline_object_data; std::string inline_object_metadata; bool inline_object_flag = false; + plasma::ObjectBuffer object_buffer; if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) { // Inline object. Try to get the data from the object store. - plasma::ObjectBuffer object_buffer; plasma::ObjectID plasma_id = object_id.to_plasma_id(); RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer)); if (object_buffer.data != nullptr) { - // The object exists. Store the object data in the GCS entry. + // The object exists. Set inline_object_flag so that the object data + // will be stored in the GCS entry. inline_object_flag = true; - inline_object_data.assign( - object_buffer.data->data(), - object_buffer.data->data() + object_buffer.data->size()); - inline_object_metadata.assign( - object_buffer.metadata->data(), - object_buffer.metadata->data() + object_buffer.metadata->size()); // Mark this object as inlined, so that if this object is later // evicted, we do not report it to the GCS. 
local_inlined_objects_.insert(object_id); } } - RAY_CHECK_OK(object_directory_->ReportObjectAdded( - object_id, client_id_, object_info, inline_object_flag, inline_object_data, - inline_object_metadata)); + RAY_CHECK_OK(object_directory_->ReportObjectAdded(object_id, client_id_, object_info, + inline_object_flag, object_buffer)); } // Handle the unfulfilled_push_requests_ which contains the push request that is not // completed due to unsatisfied local objects. diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index a6d79d608..3da9c6408 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -7,6 +7,12 @@ #include "ray/object_manager/object_manager.h" +namespace { +std::string store_executable; +int64_t wait_timeout_ms; +bool test_inline_objects = false; +} + namespace ray { static inline void flushall_redis(void) { @@ -15,8 +21,6 @@ static inline void flushall_redis(void) { redisFree(context); } -std::string store_executable; - class MockServer { public: MockServer(boost::asio::io_service &main_service, @@ -337,32 +341,40 @@ class TestObjectManager : public TestObjectManagerBase { } void NextWaitTest() { + int data_size; + // Set the data size under or over the inline objects limit depending on + // the test configuration. + if (test_inline_objects) { + data_size = RayConfig::instance().inline_object_max_size_bytes() / 2; + } else { + data_size = RayConfig::instance().inline_object_max_size_bytes() * 2; + } current_wait_test += 1; switch (current_wait_test) { case 0: { // Ensure timeout_ms = 0 is handled correctly. // Out of 5 objects, we expect 3 ready objects and 2 remaining objects. - TestWait(600, 5, 3, /*timeout_ms=*/0, false, false); + TestWait(data_size, 5, 3, /*timeout_ms=*/0, false, false); } break; case 1: { // Ensure timeout_ms = 1000 is handled correctly. 
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects. - TestWait(600, 5, 3, /*timeout_ms=*/1000, false, false); + TestWait(data_size, 5, 3, wait_timeout_ms, false, false); } break; case 2: { // Generate objects locally to ensure local object code-path works properly. // Out of 5 objects, we expect 3 ready objects and 2 remaining objects. - TestWait(600, 5, 3, 1000, false, /*test_local=*/true); + TestWait(data_size, 5, 3, wait_timeout_ms, false, /*test_local=*/true); } break; case 3: { // Wait on an object that's never registered with GCS to ensure timeout works // properly. - TestWait(600, /*num_objects=*/5, /*required_objects=*/6, 1000, + TestWait(data_size, /*num_objects=*/5, /*required_objects=*/6, wait_timeout_ms, /*include_nonexistent=*/true, false); } break; case 4: { // Ensure infinite time code-path works properly. - TestWait(600, 5, 5, /*timeout_ms=*/-1, false, false); + TestWait(data_size, 5, 5, /*timeout_ms=*/-1, false, false); } break; } } @@ -475,6 +487,7 @@ class TestObjectManager : public TestObjectManagerBase { }; TEST_F(TestObjectManager, StartTestObjectManager) { + // TODO: Break this test suite into unit tests. auto AsyncStartTests = main_service.wrap([this]() { WaitConnections(); }); AsyncStartTests(); main_service.run(); @@ -484,6 +497,11 @@ TEST_F(TestObjectManager, StartTestObjectManager) { int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); - ray::store_executable = std::string(argv[1]); + store_executable = std::string(argv[1]); + wait_timeout_ms = std::stoi(std::string(argv[2])); + // If a third argument is provided, then test with inline objects. 
+ if (argc > 3) { + test_inline_objects = true; + } return RUN_ALL_TESTS(); } diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index 9adbc1e89..5d4756389 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -60,10 +60,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface { const OnLocationsFound &)); MOCK_METHOD2(UnsubscribeObjectLocations, ray::Status(const ray::UniqueID &, const ObjectID &)); - MOCK_METHOD6(ReportObjectAdded, + MOCK_METHOD5(ReportObjectAdded, ray::Status(const ObjectID &, const ClientID &, const object_manager::protocol::ObjectInfoT &, bool, - const std::vector<uint8_t> &, const std::string &)); + const plasma::ObjectBuffer &)); MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &)); diff --git a/src/ray/test/run_object_manager_tests.sh b/src/ray/test/run_object_manager_tests.sh index d0363cadf..09f9e2cc8 100644 --- a/src/ray/test/run_object_manager_tests.sh +++ b/src/ray/test/run_object_manager_tests.sh @@ -45,7 +45,10 @@ sleep 1s # Run tests. $CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC sleep 1s -$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC +# Use timeout=1000ms for the Wait tests. +$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 1000 +# Run tests again with inlined objects. +$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 1000 true $REDIS_DIR/redis-cli -p 6379 shutdown sleep 1s diff --git a/src/ray/test/run_object_manager_valgrind.sh b/src/ray/test/run_object_manager_valgrind.sh index 943e629a3..2f73abf9b 100644 --- a/src/ray/test/run_object_manager_valgrind.sh +++ b/src/ray/test/run_object_manager_valgrind.sh @@ -48,8 +48,12 @@ sleep 1s ${REDIS_SERVER} --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 & sleep 1s -# Run tests. -$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC +# Run tests. 
Use timeout=10000ms for the Wait tests since tests run slower +# in valgrind. +$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000 +sleep 1s +# Run tests again with inlined objects. +$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000 true sleep 1s $VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC $REDIS_DIR/redis-cli -p 6379 shutdown