diff --git a/src/common/test/test_common.h b/src/common/test/test_common.h index 9d482e91d..6f6d0da53 100644 --- a/src/common/test/test_common.h +++ b/src/common/test/test_common.h @@ -2,12 +2,14 @@ #define TEST_COMMON_H #include + +#include +#include #include #include "common.h" #include "io.h" #include "hiredis/hiredis.h" -#include "utstring.h" #include "state/redis.h" #ifndef _WIN32 @@ -16,14 +18,17 @@ extern int usleep(useconds_t usec); #endif /* I/O helper methods to retry binding to sockets. */ -static inline UT_string *bind_ipc_sock_retry(const char *socket_name_format, - int *fd) { - UT_string *socket_name = NULL; +static inline std::string bind_ipc_sock_retry(const char *socket_name_format, + int *fd) { + std::string socket_name; for (int num_retries = 0; num_retries < 5; ++num_retries) { LOG_INFO("trying to find plasma socket (attempt %d)", num_retries); - utstring_renew(socket_name); - utstring_printf(socket_name, socket_name_format, rand()); - *fd = bind_ipc_sock(utstring_body(socket_name), true); + size_t size = std::snprintf(nullptr, 0, socket_name_format, rand()) + 1; + char socket_name_c_str[size]; + std::snprintf(socket_name_c_str, size, socket_name_format, rand()); + socket_name = std::string(socket_name_c_str); + + *fd = bind_ipc_sock(socket_name.c_str(), true); if (*fd < 0) { /* Sleep for 100ms. */ usleep(100000); diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index 23d2dd4c4..1919deefb 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -7,6 +7,9 @@ #include #include +#include +#include +#include #include #include "common.h" @@ -14,7 +17,6 @@ #include "test/example_task.h" #include "event_loop.h" #include "io.h" -#include "utstring.h" #include "task.h" #include "state/object_table.h" #include "state/task_table.h" @@ -83,31 +85,30 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, memset(mock, 0, sizeof(LocalSchedulerMock)); mock->loop = event_loop_create(); /* Bind to the local scheduler port and initialize the local scheduler. */ - UT_string *plasma_manager_socket_name = bind_ipc_sock_retry( + std::string plasma_manager_socket_name = bind_ipc_sock_retry( plasma_manager_socket_name_format, &mock->plasma_manager_fd); mock->plasma_store_fd = connect_ipc_sock_retry(plasma_store_socket_name, 5, 100); - UT_string *local_scheduler_socket_name = bind_ipc_sock_retry( + std::string local_scheduler_socket_name = bind_ipc_sock_retry( local_scheduler_socket_name_format, &mock->local_scheduler_fd); CHECK(mock->plasma_store_fd >= 0 && mock->local_scheduler_fd >= 0); - UT_string *worker_command; - utstring_new(worker_command); - utstring_printf(worker_command, - "python ../../../python/ray/workers/default_worker.py " - "--node-ip-address=%s --object-store-name=%s " - "--object-store-manager-name=%s --local-scheduler-name=%s " - "--redis-address=%s:%d", - node_ip_address, plasma_store_socket_name, - utstring_body(plasma_manager_socket_name), - utstring_body(local_scheduler_socket_name), redis_addr, - redis_port); + /* Construct worker command */ + std::stringstream worker_command_ss; + worker_command_ss << "python ../../../python/ray/workers/default_worker.py" + << " --node-ip-address=" << node_ip_address + << " --object-store-name=" << plasma_store_socket_name + << " --object-store-manager-name=" + << plasma_manager_socket_name + << " --local-scheduler-name=" << local_scheduler_socket_name + << " --redis-address=" << redis_addr << ":" << redis_port; + std::string worker_command = worker_command_ss.str(); mock->local_scheduler_state = LocalSchedulerState_init( "127.0.0.1", mock->loop, redis_addr, redis_port, - utstring_body(local_scheduler_socket_name), plasma_store_socket_name, - utstring_body(plasma_manager_socket_name), NULL, false, - static_resource_conf, utstring_body(worker_command), num_workers); + local_scheduler_socket_name.c_str(), plasma_store_socket_name, + plasma_manager_socket_name.c_str(), NULL, false, static_resource_conf, + worker_command.c_str(), num_workers); /* Accept the workers as clients to the plasma manager. */ for (int i = 0; i < num_workers; ++i) { @@ -123,16 +124,13 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers, std::thread(register_clients, num_mock_workers, mock); for (int i = 0; i < num_mock_workers; ++i) { - mock->conns[i] = LocalSchedulerConnection_init( - utstring_body(local_scheduler_socket_name), NIL_WORKER_ID, NIL_ACTOR_ID, - true, 0); + mock->conns[i] = + LocalSchedulerConnection_init(local_scheduler_socket_name.c_str(), + NIL_WORKER_ID, NIL_ACTOR_ID, true, 0); } background_thread.join(); - utstring_free(worker_command); - utstring_free(plasma_manager_socket_name); - utstring_free(local_scheduler_socket_name); return mock; } diff --git a/src/plasma/test/manager_tests.cc b/src/plasma/test/manager_tests.cc index 428c45143..5cf8c06c7 100644 --- a/src/plasma/test/manager_tests.cc +++ b/src/plasma/test/manager_tests.cc @@ -7,11 +7,12 @@ #include #include +#include + #include "common.h" #include "test/test_common.h" #include "event_loop.h" #include "io.h" -#include "utstring.h" #include "plasma/plasma.h" #include "plasma/client.h" @@ -62,13 +63,13 @@ plasma_mock *init_plasma_mock(plasma_mock *remote_mock) { /* Start listening on all the ports and initiate the local plasma manager. */ mock->port = bind_inet_sock_retry(&mock->manager_remote_fd); mock->local_store = connect_ipc_sock_retry(plasma_store_socket_name, 5, 100); - UT_string *manager_socket_name = bind_ipc_sock_retry( + std::string manager_socket_name = bind_ipc_sock_retry( plasma_manager_socket_name_format, &mock->manager_local_fd); CHECK(mock->manager_local_fd >= 0 && mock->local_store >= 0); mock->state = PlasmaManagerState_init(plasma_store_socket_name, - utstring_body(manager_socket_name), + manager_socket_name.c_str(), manager_addr, mock->port, NULL, 0); mock->loop = get_event_loop(mock->state); /* Accept a connection from the local manager on the remote manager. */ @@ -86,12 +87,11 @@ plasma_mock *init_plasma_mock(plasma_mock *remote_mock) { /* Connect a new client to the local plasma manager and mock a request to an * object. */ mock->plasma_client = new plasma::PlasmaClient(); - ARROW_CHECK_OK(mock->plasma_client->Connect( - plasma_store_socket_name, utstring_body(manager_socket_name), 0)); + ARROW_CHECK_OK(mock->plasma_client->Connect(plasma_store_socket_name, + manager_socket_name.c_str(), 0)); wait_for_pollin(mock->manager_local_fd); mock->client_conn = ClientConnection_listen( mock->loop, mock->manager_local_fd, mock->state, 0); - utstring_free(manager_socket_name); return mock; }