mirror of
https://github.com/vale981/ray
synced 2025-03-05 18:11:42 -05:00
Convert UT datastructures in tests (#1203)
* bind_ipc_sock_retry returns std::string * snprintf -> std::snprintf * Fix formatting * Use stringstream instead of snprintf * Fix typo
This commit is contained in:
parent
e798a652bc
commit
9a6a056609
3 changed files with 39 additions and 36 deletions
|
@ -2,12 +2,14 @@
|
|||
#define TEST_COMMON_H
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include <cstdio>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "common.h"
|
||||
#include "io.h"
|
||||
#include "hiredis/hiredis.h"
|
||||
#include "utstring.h"
|
||||
#include "state/redis.h"
|
||||
|
||||
#ifndef _WIN32
|
||||
|
@ -16,14 +18,17 @@ extern int usleep(useconds_t usec);
|
|||
#endif
|
||||
|
||||
/* I/O helper methods to retry binding to sockets. */
|
||||
static inline UT_string *bind_ipc_sock_retry(const char *socket_name_format,
|
||||
int *fd) {
|
||||
UT_string *socket_name = NULL;
|
||||
static inline std::string bind_ipc_sock_retry(const char *socket_name_format,
|
||||
int *fd) {
|
||||
std::string socket_name;
|
||||
for (int num_retries = 0; num_retries < 5; ++num_retries) {
|
||||
LOG_INFO("trying to find plasma socket (attempt %d)", num_retries);
|
||||
utstring_renew(socket_name);
|
||||
utstring_printf(socket_name, socket_name_format, rand());
|
||||
*fd = bind_ipc_sock(utstring_body(socket_name), true);
|
||||
size_t size = std::snprintf(nullptr, 0, socket_name_format, rand()) + 1;
|
||||
char socket_name_c_str[size];
|
||||
std::snprintf(socket_name_c_str, size, socket_name_format, rand());
|
||||
socket_name = std::string(socket_name_c_str);
|
||||
|
||||
*fd = bind_ipc_sock(socket_name.c_str(), true);
|
||||
if (*fd < 0) {
|
||||
/* Sleep for 100ms. */
|
||||
usleep(100000);
|
||||
|
|
|
@ -7,6 +7,9 @@
|
|||
#include <sys/socket.h>
|
||||
#include <sys/wait.h>
|
||||
|
||||
#include <cstdio>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include "common.h"
|
||||
|
@ -14,7 +17,6 @@
|
|||
#include "test/example_task.h"
|
||||
#include "event_loop.h"
|
||||
#include "io.h"
|
||||
#include "utstring.h"
|
||||
#include "task.h"
|
||||
#include "state/object_table.h"
|
||||
#include "state/task_table.h"
|
||||
|
@ -83,31 +85,30 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers,
|
|||
memset(mock, 0, sizeof(LocalSchedulerMock));
|
||||
mock->loop = event_loop_create();
|
||||
/* Bind to the local scheduler port and initialize the local scheduler. */
|
||||
UT_string *plasma_manager_socket_name = bind_ipc_sock_retry(
|
||||
std::string plasma_manager_socket_name = bind_ipc_sock_retry(
|
||||
plasma_manager_socket_name_format, &mock->plasma_manager_fd);
|
||||
mock->plasma_store_fd =
|
||||
connect_ipc_sock_retry(plasma_store_socket_name, 5, 100);
|
||||
UT_string *local_scheduler_socket_name = bind_ipc_sock_retry(
|
||||
std::string local_scheduler_socket_name = bind_ipc_sock_retry(
|
||||
local_scheduler_socket_name_format, &mock->local_scheduler_fd);
|
||||
CHECK(mock->plasma_store_fd >= 0 && mock->local_scheduler_fd >= 0);
|
||||
|
||||
UT_string *worker_command;
|
||||
utstring_new(worker_command);
|
||||
utstring_printf(worker_command,
|
||||
"python ../../../python/ray/workers/default_worker.py "
|
||||
"--node-ip-address=%s --object-store-name=%s "
|
||||
"--object-store-manager-name=%s --local-scheduler-name=%s "
|
||||
"--redis-address=%s:%d",
|
||||
node_ip_address, plasma_store_socket_name,
|
||||
utstring_body(plasma_manager_socket_name),
|
||||
utstring_body(local_scheduler_socket_name), redis_addr,
|
||||
redis_port);
|
||||
/* Construct worker command */
|
||||
std::stringstream worker_command_ss;
|
||||
worker_command_ss << "python ../../../python/ray/workers/default_worker.py"
|
||||
<< " --node-ip-address=" << node_ip_address
|
||||
<< " --object-store-name=" << plasma_store_socket_name
|
||||
<< " --object-store-manager-name="
|
||||
<< plasma_manager_socket_name
|
||||
<< " --local-scheduler-name=" << local_scheduler_socket_name
|
||||
<< " --redis-address=" << redis_addr << ":" << redis_port;
|
||||
std::string worker_command = worker_command_ss.str();
|
||||
|
||||
mock->local_scheduler_state = LocalSchedulerState_init(
|
||||
"127.0.0.1", mock->loop, redis_addr, redis_port,
|
||||
utstring_body(local_scheduler_socket_name), plasma_store_socket_name,
|
||||
utstring_body(plasma_manager_socket_name), NULL, false,
|
||||
static_resource_conf, utstring_body(worker_command), num_workers);
|
||||
local_scheduler_socket_name.c_str(), plasma_store_socket_name,
|
||||
plasma_manager_socket_name.c_str(), NULL, false, static_resource_conf,
|
||||
worker_command.c_str(), num_workers);
|
||||
|
||||
/* Accept the workers as clients to the plasma manager. */
|
||||
for (int i = 0; i < num_workers; ++i) {
|
||||
|
@ -123,16 +124,13 @@ LocalSchedulerMock *LocalSchedulerMock_init(int num_workers,
|
|||
std::thread(register_clients, num_mock_workers, mock);
|
||||
|
||||
for (int i = 0; i < num_mock_workers; ++i) {
|
||||
mock->conns[i] = LocalSchedulerConnection_init(
|
||||
utstring_body(local_scheduler_socket_name), NIL_WORKER_ID, NIL_ACTOR_ID,
|
||||
true, 0);
|
||||
mock->conns[i] =
|
||||
LocalSchedulerConnection_init(local_scheduler_socket_name.c_str(),
|
||||
NIL_WORKER_ID, NIL_ACTOR_ID, true, 0);
|
||||
}
|
||||
|
||||
background_thread.join();
|
||||
|
||||
utstring_free(worker_command);
|
||||
utstring_free(plasma_manager_socket_name);
|
||||
utstring_free(local_scheduler_socket_name);
|
||||
return mock;
|
||||
}
|
||||
|
||||
|
|
|
@ -7,11 +7,12 @@
|
|||
#include <sys/socket.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "common.h"
|
||||
#include "test/test_common.h"
|
||||
#include "event_loop.h"
|
||||
#include "io.h"
|
||||
#include "utstring.h"
|
||||
|
||||
#include "plasma/plasma.h"
|
||||
#include "plasma/client.h"
|
||||
|
@ -62,13 +63,13 @@ plasma_mock *init_plasma_mock(plasma_mock *remote_mock) {
|
|||
/* Start listening on all the ports and initiate the local plasma manager. */
|
||||
mock->port = bind_inet_sock_retry(&mock->manager_remote_fd);
|
||||
mock->local_store = connect_ipc_sock_retry(plasma_store_socket_name, 5, 100);
|
||||
UT_string *manager_socket_name = bind_ipc_sock_retry(
|
||||
std::string manager_socket_name = bind_ipc_sock_retry(
|
||||
plasma_manager_socket_name_format, &mock->manager_local_fd);
|
||||
|
||||
CHECK(mock->manager_local_fd >= 0 && mock->local_store >= 0);
|
||||
|
||||
mock->state = PlasmaManagerState_init(plasma_store_socket_name,
|
||||
utstring_body(manager_socket_name),
|
||||
manager_socket_name.c_str(),
|
||||
manager_addr, mock->port, NULL, 0);
|
||||
mock->loop = get_event_loop(mock->state);
|
||||
/* Accept a connection from the local manager on the remote manager. */
|
||||
|
@ -86,12 +87,11 @@ plasma_mock *init_plasma_mock(plasma_mock *remote_mock) {
|
|||
/* Connect a new client to the local plasma manager and mock a request to an
|
||||
* object. */
|
||||
mock->plasma_client = new plasma::PlasmaClient();
|
||||
ARROW_CHECK_OK(mock->plasma_client->Connect(
|
||||
plasma_store_socket_name, utstring_body(manager_socket_name), 0));
|
||||
ARROW_CHECK_OK(mock->plasma_client->Connect(plasma_store_socket_name,
|
||||
manager_socket_name.c_str(), 0));
|
||||
wait_for_pollin(mock->manager_local_fd);
|
||||
mock->client_conn = ClientConnection_listen(
|
||||
mock->loop, mock->manager_local_fd, mock->state, 0);
|
||||
utstring_free(manager_socket_name);
|
||||
return mock;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue