Increase timeout for object manager valgrind tests (#4027)

* Avoid second copy of data for inlined objects

* Increase Wait timeout for valgrind tests

* Run object manager tests with and without inlined objects

* Fix test
This commit is contained in:
Stephanie Wang 2019-02-13 18:29:03 -08:00 committed by Philipp Moritz
parent 1fb56a4316
commit fd5b58a827
7 changed files with 56 additions and 35 deletions

View file

@ -115,8 +115,7 @@ void ObjectDirectory::RegisterBackend() {
ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) {
const plasma::ObjectBuffer &plasma_buffer) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
<< inline_object_flag;
// Append the addition entry to the object table.
@ -127,9 +126,12 @@ ray::Status ObjectDirectory::ReportObjectAdded(
data->inline_object_flag = inline_object_flag;
if (inline_object_flag) {
// Add object's data to its GCS entry.
data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end());
data->inline_object_metadata.assign(inline_object_metadata.begin(),
inline_object_metadata.end());
data->inline_object_data.assign(
plasma_buffer.data->data(),
plasma_buffer.data->data() + plasma_buffer.data->size());
data->inline_object_metadata.assign(
plasma_buffer.metadata->data(),
plasma_buffer.metadata->data() + plasma_buffer.metadata->size());
}
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);

View file

@ -7,6 +7,8 @@
#include <unordered_set>
#include <vector>
#include "plasma/client.h"
#include "ray/gcs/client.h"
#include "ray/id.h"
#include "ray/object_manager/format/object_manager_generated.h"
@ -100,14 +102,13 @@ class ObjectDirectoryInterface {
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
/// \param inline_object_flag Flag specifying whether object is inlined.
/// \param inline_object_data Object data. Only for inlined objects.
/// \param inline_object_metadata Object metadata. Only for inlined objects.
/// \param plasma_buffer Object data and metadata from plasma. This data is
/// only valid for inlined objects (i.e., when inline_object_flag=true).
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) = 0;
const plasma::ObjectBuffer &plasma_buffer) = 0;
/// Report objects removed from this client's store to the object directory.
///
@ -162,8 +163,7 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) override;
const plasma::ObjectBuffer &plasma_buffer) override;
ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;

View file

@ -75,29 +75,23 @@ void ObjectManager::HandleObjectAdded(
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool inline_object_flag = false;
plasma::ObjectBuffer object_buffer;
if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) {
// Inline object. Try to get the data from the object store.
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data != nullptr) {
// The object exists. Store the object data in the GCS entry.
// The object exists. Set inline_object_flag so that the object data
// will be stored in the GCS entry.
inline_object_flag = true;
inline_object_data.assign(
object_buffer.data->data(),
object_buffer.data->data() + object_buffer.data->size());
inline_object_metadata.assign(
object_buffer.metadata->data(),
object_buffer.metadata->data() + object_buffer.metadata->size());
// Mark this object as inlined, so that if this object is later
// evicted, we do not report it to the GCS.
local_inlined_objects_.insert(object_id);
}
}
RAY_CHECK_OK(object_directory_->ReportObjectAdded(
object_id, client_id_, object_info, inline_object_flag, inline_object_data,
inline_object_metadata));
RAY_CHECK_OK(object_directory_->ReportObjectAdded(object_id, client_id_, object_info,
inline_object_flag, object_buffer));
}
// Handle the unfulfilled_push_requests_ which contains the push request that is not
// completed due to unsatisfied local objects.

View file

@ -7,6 +7,12 @@
#include "ray/object_manager/object_manager.h"
namespace {
std::string store_executable;
int64_t wait_timeout_ms;
bool test_inline_objects = false;
}
namespace ray {
static inline void flushall_redis(void) {
@ -15,8 +21,6 @@ static inline void flushall_redis(void) {
redisFree(context);
}
std::string store_executable;
class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
@ -337,32 +341,40 @@ class TestObjectManager : public TestObjectManagerBase {
}
void NextWaitTest() {
int data_size;
// Set the data size under or over the inline objects limit depending on
// the test configuration.
if (test_inline_objects) {
data_size = RayConfig::instance().inline_object_max_size_bytes() / 2;
} else {
data_size = RayConfig::instance().inline_object_max_size_bytes() * 2;
}
current_wait_test += 1;
switch (current_wait_test) {
case 0: {
// Ensure timeout_ms = 0 is handled correctly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, /*timeout_ms=*/0, false, false);
TestWait(data_size, 5, 3, /*timeout_ms=*/0, false, false);
} break;
case 1: {
// Ensure timeout_ms = 1000 is handled correctly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, /*timeout_ms=*/1000, false, false);
TestWait(data_size, 5, 3, wait_timeout_ms, false, false);
} break;
case 2: {
// Generate objects locally to ensure local object code-path works properly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, 1000, false, /*test_local=*/true);
TestWait(data_size, 5, 3, wait_timeout_ms, false, /*test_local=*/true);
} break;
case 3: {
// Wait on an object that's never registered with GCS to ensure timeout works
// properly.
TestWait(600, /*num_objects=*/5, /*required_objects=*/6, 1000,
TestWait(data_size, /*num_objects=*/5, /*required_objects=*/6, wait_timeout_ms,
/*include_nonexistent=*/true, false);
} break;
case 4: {
// Ensure infinite time code-path works properly.
TestWait(600, 5, 5, /*timeout_ms=*/-1, false, false);
TestWait(data_size, 5, 5, /*timeout_ms=*/-1, false, false);
} break;
}
}
@ -475,6 +487,7 @@ class TestObjectManager : public TestObjectManagerBase {
};
TEST_F(TestObjectManager, StartTestObjectManager) {
// TODO: Break this test suite into unit tests.
auto AsyncStartTests = main_service.wrap([this]() { WaitConnections(); });
AsyncStartTests();
main_service.run();
@ -484,6 +497,11 @@ TEST_F(TestObjectManager, StartTestObjectManager) {
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
ray::store_executable = std::string(argv[1]);
store_executable = std::string(argv[1]);
wait_timeout_ms = std::stoi(std::string(argv[2]));
// If a third argument is provided, then test with inline objects.
if (argc > 3) {
test_inline_objects = true;
}
return RUN_ALL_TESTS();
}

View file

@ -60,10 +60,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const OnLocationsFound &));
MOCK_METHOD2(UnsubscribeObjectLocations,
ray::Status(const ray::UniqueID &, const ObjectID &));
MOCK_METHOD6(ReportObjectAdded,
MOCK_METHOD5(ReportObjectAdded,
ray::Status(const ObjectID &, const ClientID &,
const object_manager::protocol::ObjectInfoT &, bool,
const std::vector<uint8_t> &, const std::string &));
const plasma::ObjectBuffer &));
MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &));

View file

@ -45,7 +45,10 @@ sleep 1s
# Run tests.
$CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC
sleep 1s
$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC
# Use timeout=1000ms for the Wait tests.
$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 1000
# Run tests again with inlined objects.
$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 1000 true
$REDIS_DIR/redis-cli -p 6379 shutdown
sleep 1s

View file

@ -48,8 +48,12 @@ sleep 1s
${REDIS_SERVER} --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 &
sleep 1s
# Run tests.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC
# Run tests. Use timeout=10000ms for the Wait tests since tests run slower
# in valgrind.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000
sleep 1s
# Run tests again with inlined objects.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000 true
sleep 1s
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC
$REDIS_DIR/redis-cli -p 6379 shutdown