Global scheduler - per-task transfer-aware policy (#145)

* global scheduler with object transfer cost awareness -- upstream rebase

* debugging global scheduler: multiple subscriptions

* global scheduler: utarray push bug fix; tasks change state to SCHEDULED

* change global scheduler test to be an integration test

* unit and integration tests are passing for global scheduler

* improve global scheduler test: break it up into several tests

* global scheduler checkpoint: fix photon object id bug in test

* test with timesync between object and task notifications; TODO: handle out-of-order (OoO) object+task notifications in GS

* fallback to base policy if no object dependencies are cached (may happen due to OoO object+task notification arrivals)

* clean up printfs; handle a missing local scheduler (LS) in the LS cache

* Minor changes to Python test and factor out some common code.

* refactoring handle_task_waiting

* addressing comments

* log_info -> log_debug

* Change object ID printing.

* PRId64 merge

* Python 3 fix.

* PRId64.

* Python 3 fix.

* resurrect differentiation between no args and missing object info; spacing

* Valgrind fix.

* Run all global scheduler tests in valgrind.

* clang format

* Comments and documentation changes.

* Minor cleanups.

* fix whitespace

* Fix.

* Documentation fix.
Alexey Tumanov 2016-12-22 03:11:46 -08:00 committed by Robert Nishihara
parent 6cd02d71f8
commit 46a887039e
17 changed files with 498 additions and 120 deletions


@@ -58,6 +58,7 @@ popd
cp "$PLASMA_DIR/build/plasma_store" "$PYTHON_PLASMA_DIR/"
cp "$PLASMA_DIR/build/plasma_manager" "$PYTHON_PLASMA_DIR/"
cp "$PLASMA_DIR/plasma/plasma.py" "$PYTHON_PLASMA_DIR/"
cp "$PLASMA_DIR/plasma/utils.py" "$PYTHON_PLASMA_DIR/"
cp "$PLASMA_DIR/plasma/libplasma.so" "$PYTHON_PLASMA_DIR/"
pushd "$PHOTON_DIR"


@@ -41,16 +41,17 @@ bool db_client_ids_equal(db_client_id first_id, db_client_id second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}
char *sha1_to_hex(const unsigned char *sha1, char *buffer) {
char *object_id_to_string(object_id obj_id, char *id_string, int id_length) {
CHECK(id_length >= ID_STRING_SIZE);
static const char hex[] = "0123456789abcdef";
char *buf = buffer;
char *buf = id_string;
for (int i = 0; i < UNIQUE_ID_SIZE; i++) {
unsigned int val = *sha1++;
unsigned int val = obj_id.id[i];
*buf++ = hex[val >> 4];
*buf++ = hex[val & 0xf];
}
*buf = '\0';
return buffer;
return id_string;
}
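
The new object_id_to_string helper takes the ID by value and length-checks the output buffer instead of trusting the caller. A minimal Python sketch of the same transformation (the helper name here is illustrative; UNIQUE_ID_SIZE is 20 bytes, as in common.h):

UNIQUE_ID_SIZE = 20                       # bytes per ID
ID_STRING_SIZE = 2 * UNIQUE_ID_SIZE + 1   # two hex chars per byte, plus '\0' in C

def object_id_to_hex(obj_id: bytes) -> str:
    # Mirrors the C loop: high nibble, then low nibble of each byte.
    assert len(obj_id) == UNIQUE_ID_SIZE
    return obj_id.hex()

print(object_id_to_hex(b"\x00" * UNIQUE_ID_SIZE))  # forty '0' characters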


@@ -30,6 +30,8 @@
#define RAY_COMMON_LOG_LEVEL RAY_COMMON_INFO
#endif
#define UNUSED(x) ((void) (x))
/**
* Macros to enable each level of Ray logging statements depending on the
* current logging level. */
@@ -123,15 +125,25 @@ extern const unique_id NIL_ID;
/* Generate a globally unique ID. */
unique_id globally_unique_id(void);
/* Convert a 20 byte sha1 hash to a hexdecimal string. This function assumes
* that buffer points to an already allocated char array of size 2 *
* UNIQUE_ID_SIZE + 1 */
char *sha1_to_hex(const unsigned char *sha1, char *buffer);
#define NIL_OBJECT_ID NIL_ID
typedef unique_id object_id;
#define ID_STRING_SIZE (2 * UNIQUE_ID_SIZE + 1)
/**
* Convert an object ID to a hexadecimal string. This function assumes that
* id_string points to an already allocated char array of size ID_STRING_SIZE,
* and it writes a null-terminated hex-formatted string into it.
*
* @param obj_id The object ID to convert to a string.
* @param id_string A buffer to write the string to. It is assumed that this is
* managed by the caller and is sufficiently long to store the object ID
* string.
* @param id_length The length of the id_string buffer.
*/
char *object_id_to_string(object_id obj_id, char *id_string, int id_length);
/**
* Compare two object IDs.
*


@@ -122,9 +122,8 @@ static long PyObjectID_hash(PyObjectID *self) {
}
static PyObject *PyObjectID_repr(PyObjectID *self) {
enum { hex_length = 2 * UNIQUE_ID_SIZE + 1 };
char hex_id[hex_length];
sha1_to_hex(self->object_id.id, hex_id);
char hex_id[ID_STRING_SIZE];
object_id_to_string(self->object_id, hex_id, ID_STRING_SIZE);
UT_string *repr;
utstring_new(repr);
utstring_printf(repr, "ObjectID(%s)", hex_id);


@@ -626,9 +626,13 @@ object_id parse_subscribe_to_notifications_payload(
int num_managers = (length - sizeof(object_id) - 1 - sizeof(data_size_value) -
1 - strlen("MANAGERS")) /
(1 + sizeof(db_client_id));
CHECK(length ==
sizeof(object_id) + 1 + sizeof(data_size_value) + 1 +
strlen("MANAGERS") + num_managers * (1 + sizeof(db_client_id)));
int64_t rval = sizeof(object_id) + 1 + sizeof(data_size_value) + 1 +
strlen("MANAGERS") + num_managers * (1 + sizeof(db_client_id));
CHECKM(length == rval,
"length mismatch: num_managers = %d, length = %d, rval = %" PRId64,
num_managers, length, rval);
CHECK(num_managers > 0);
object_id obj_id;
/* Track our current offset in the payload. */
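
The CHECKM above reconstructs the expected payload length from num_managers and fails loudly on a mismatch. A Python sketch of the arithmetic, assuming 20-byte IDs and an int64 data_size field (both taken from the surrounding code, not guaranteed by this diff):

UNIQUE_ID_SIZE = 20   # assumed sizeof(object_id) == sizeof(db_client_id)
DATA_SIZE_BYTES = 8   # assumed sizeof(data_size_value), an int64

def num_managers_from_length(length):
    # Payload layout: object_id, separator, data_size, separator,
    # "MANAGERS", then one (separator + db_client_id) per manager.
    fixed = UNIQUE_ID_SIZE + 1 + DATA_SIZE_BYTES + 1 + len("MANAGERS")
    num_managers, remainder = divmod(length - fixed, 1 + UNIQUE_ID_SIZE)
    assert remainder == 0 and num_managers > 0, "length mismatch"
    return num_managers

print(num_managers_from_length(38 + 2 * 21))  # two managers -> 2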


@@ -276,19 +276,20 @@ void free_task_spec(task_spec *spec) {
void print_task(task_spec *spec, UT_string *output) {
/* For converting an id to hex, which has double the number
* of bytes compared to the id (+ 1 byte for '\0'). */
static char hex[2 * UNIQUE_ID_SIZE + 1];
static char hex[ID_STRING_SIZE];
/* Print function id. */
sha1_to_hex(&task_function(spec).id[0], &hex[0]);
object_id_to_string((object_id) task_function(spec), &hex[0], ID_STRING_SIZE);
utstring_printf(output, "fun %s ", &hex[0]);
/* Print arguments. */
for (int i = 0; i < task_num_args(spec); ++i) {
sha1_to_hex(&task_arg_id(spec, i).id[0], &hex[0]);
object_id_to_string((object_id) task_arg_id(spec, i), &hex[0],
ID_STRING_SIZE);
utstring_printf(output, " id:%d %s", i, &hex[0]);
}
/* Print return ids. */
for (int i = 0; i < task_num_returns(spec); ++i) {
object_id object_id = task_return(spec, i);
sha1_to_hex(&object_id.id[0], &hex[0]);
object_id obj_id = task_return(spec, i);
object_id_to_string(obj_id, &hex[0], ID_STRING_SIZE);
utstring_printf(output, " ret:%d %s", i, &hex[0]);
}
}


@@ -5,9 +5,9 @@
SUITE(common_tests);
TEST sha1_test(void) {
static char hex[2 * UNIQUE_ID_SIZE + 1];
static char hex[ID_STRING_SIZE];
unique_id uid = globally_unique_id();
sha1_to_hex(&uid.id[0], &hex[0]);
object_id_to_string((object_id) uid, &hex[0], ID_STRING_SIZE);
PASS();
}


@@ -7,7 +7,9 @@
#include "global_scheduler.h"
#include "global_scheduler_algorithm.h"
#include "net.h"
#include "object_info.h"
#include "state/db_client_table.h"
#include "state/object_table.h"
#include "state/table.h"
#include "state/task_table.h"
@@ -18,11 +20,17 @@ UT_icd local_scheduler_icd = {sizeof(local_scheduler), NULL, NULL, NULL};
void assign_task_to_local_scheduler(global_scheduler_state *state,
task *task,
node_id node_id) {
char id_string[ID_STRING_SIZE];
LOG_DEBUG("assigning task to node_id = %s",
object_id_to_string(node_id, id_string, ID_STRING_SIZE));
task_set_state(task, TASK_STATUS_SCHEDULED);
task_set_node(task, node_id);
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
LOG_DEBUG("Issuing a task table update for task = %s",
object_id_to_string(task_task_id(task), id_string, ID_STRING_SIZE));
UNUSED(id_string);
task_table_update(state->db, copy_task(task), &retry, NULL, NULL);
}
@@ -46,13 +54,21 @@ void free_global_scheduler(global_scheduler_state *state) {
db_disconnect(state->db);
utarray_free(state->local_schedulers);
destroy_global_scheduler_policy(state->policy_state);
/* delete the plasma 2 photon association map */
/* Delete the plasma 2 photon association map. */
HASH_ITER(hh, state->plasma_photon_map, entry, tmp) {
HASH_DELETE(hh, state->plasma_photon_map, entry);
/* Now deallocate hash table entry. */
free(entry->aux_address);
free(entry);
}
/* Free the scheduler object info table. */
scheduler_object_info *object_entry, *tmp_entry;
HASH_ITER(hh, state->scheduler_object_info_table, object_entry, tmp_entry) {
HASH_DELETE(hh, state->scheduler_object_info_table, object_entry);
utarray_free(object_entry->object_locations);
free(object_entry);
}
/* Free the global scheduler state. */
free(state);
}
@@ -71,6 +87,7 @@ void signal_handler(int signal) {
void process_task_waiting(task *task, void *user_context) {
global_scheduler_state *state = (global_scheduler_state *) user_context;
LOG_DEBUG("Task waiting callback is called.");
handle_task_waiting(state, state->policy_state, task);
}
@@ -84,17 +101,97 @@ void process_new_db_client(db_client_id db_client_id,
const char *aux_address,
void *user_context) {
global_scheduler_state *state = (global_scheduler_state *) user_context;
char id_string[ID_STRING_SIZE];
LOG_DEBUG("db client table callback for db client = %s",
object_id_to_string(db_client_id, id_string, ID_STRING_SIZE));
UNUSED(id_string);
if (strncmp(client_type, "photon", strlen("photon")) == 0) {
/* Add plasma_manager ip:port -> photon_db_client_id association to state.
*/
aux_address_entry *plasma_photon_entry = malloc(sizeof(aux_address_entry));
aux_address_entry *plasma_photon_entry =
calloc(1, sizeof(aux_address_entry));
plasma_photon_entry->aux_address = strdup(aux_address);
plasma_photon_entry->photon_db_client_id = db_client_id;
HASH_ADD_STR(state->plasma_photon_map, aux_address, plasma_photon_entry);
HASH_ADD_KEYPTR(
hh, state->plasma_photon_map, plasma_photon_entry->aux_address,
strlen(plasma_photon_entry->aux_address), plasma_photon_entry);
{
/* Print the photon to plasma association map so far. */
aux_address_entry *entry, *tmp;
LOG_DEBUG("Photon to Plasma hash map so far:");
HASH_ITER(hh, state->plasma_photon_map, entry, tmp) {
LOG_DEBUG("%s -> %s", entry->aux_address,
object_id_to_string(entry->photon_db_client_id, id_string,
ID_STRING_SIZE));
}
}
/* Add new local scheduler to the state. */
handle_new_local_scheduler(state, state->policy_state, db_client_id);
}
}
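
The switch from HASH_ADD_STR to HASH_ADD_KEYPTR matters because aux_address is a heap pointer from strdup rather than a char array embedded in the struct, which is the case HASH_ADD_STR is designed for. The association itself is a plain string-keyed map; a dictionary sketch (names mirror the C, the body is illustrative):

# Plasma manager "ip:port" -> photon db_client_id of the co-located scheduler.
plasma_photon_map = {}

def register_db_client(client_type, aux_address, db_client_id):
    if client_type.startswith("photon"):   # mirrors the strncmp check
        plasma_photon_map[aux_address] = db_client_id

register_db_client("photon", "127.0.0.1:23894", b"\x01" * 20)
print(plasma_photon_map)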
/**
* Process a notification about new object information.
*
* @param object_id ID of the object that the notification is about.
* @param data_size The object size.
* @param manager_count The number of locations for this object.
* @param manager_vector The vector of Plasma Manager locations.
* @param user_context The user context.
* @return Void.
*/
void object_table_subscribe_callback(object_id object_id,
int64_t data_size,
int manager_count,
const char *manager_vector[],
void *user_context) {
/* Extract global scheduler state from the callback context. */
global_scheduler_state *state = (global_scheduler_state *) user_context;
char id_string[ID_STRING_SIZE];
LOG_DEBUG("object table subscribe callback for OBJECT = %s",
object_id_to_string(object_id, id_string, ID_STRING_SIZE));
UNUSED(id_string);
LOG_DEBUG("\tManagers<%d>:", manager_count);
for (int i = 0; i < manager_count; i++) {
LOG_DEBUG("\t\t%s", manager_vector[i]);
}
scheduler_object_info *obj_info_entry = NULL;
HASH_FIND(hh, state->scheduler_object_info_table, &object_id,
sizeof(object_id), obj_info_entry);
if (obj_info_entry == NULL) {
/* Construct a new object info hash table entry. */
obj_info_entry = malloc(sizeof(scheduler_object_info));
memset(obj_info_entry, 0, sizeof(scheduler_object_info));
obj_info_entry->object_id = object_id;
obj_info_entry->data_size = data_size;
HASH_ADD(hh, state->scheduler_object_info_table, object_id,
sizeof(obj_info_entry->object_id), obj_info_entry);
LOG_DEBUG("New object added to object_info_table with id = %s",
object_id_to_string(object_id, id_string, ID_STRING_SIZE));
LOG_DEBUG("\tmanager locations:");
for (int i = 0; i < manager_count; i++) {
LOG_DEBUG("\t\t%s", manager_vector[i]);
}
}
/* In all cases, replace the object location vector on each callback. */
if (obj_info_entry->object_locations != NULL) {
utarray_free(obj_info_entry->object_locations);
obj_info_entry->object_locations = NULL;
}
utarray_new(obj_info_entry->object_locations, &ut_str_icd);
for (int i = 0; i < manager_count; i++) {
utarray_push_back(obj_info_entry->object_locations, &manager_vector[i]);
}
}
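
Because object notifications may arrive more than once per object, the callback treats the manager vector as a snapshot: the entry is created on first sight and its location list is rebuilt on every call. A dictionary sketch of the same update (illustrative names):

scheduler_object_info_table = {}

def on_object_notification(object_id, data_size, manager_vector):
    entry = scheduler_object_info_table.setdefault(
        object_id, {"data_size": data_size, "locations": []})
    # In all cases, replace the object location vector on each callback.
    entry["locations"] = list(manager_vector)

on_object_notification(b"\x02" * 20, 1024, ["127.0.0.1:23894"])
on_object_notification(b"\x02" * 20, 1024,
                       ["127.0.0.1:23894", "127.0.0.1:23895"])
print(scheduler_object_info_table[b"\x02" * 20]["locations"])  # both managers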
void start_server(const char *redis_addr, int redis_port) {
event_loop *loop = event_loop_create();
g_state = init_global_scheduler(loop, redis_addr, redis_port);
@@ -113,6 +210,10 @@ void start_server(const char *redis_addr, int redis_port) {
task_table_subscribe(g_state->db, NIL_ID, TASK_STATUS_WAITING,
process_task_waiting, (void *) g_state, &retry, NULL,
NULL);
object_table_subscribe_to_notifications(g_state->db, true,
object_table_subscribe_callback,
g_state, &retry, NULL, NULL);
/* Start the event loop. */
event_loop_run(loop);
}


@@ -11,10 +11,27 @@
typedef struct {
/** The ID of the local scheduler in Redis. */
db_client_id id;
/** The number of tasks sent from the global scheduler to this local
* scheduler. */
int64_t num_tasks_sent;
} local_scheduler;
typedef struct global_scheduler_policy_state global_scheduler_policy_state;
/**
* This defines a hash table used to cache information about different objects.
*/
typedef struct {
/** The object ID in question. */
object_id object_id;
/** The size in bytes of the object. */
int64_t data_size;
/** An array of object locations for this object. */
UT_array *object_locations;
/** Handle for the uthash table. */
UT_hash_handle hh;
} scheduler_object_info;
typedef struct {
char *aux_address; /* Key */
db_client_id photon_db_client_id;
@@ -31,6 +48,8 @@ typedef struct {
/** The state managed by the scheduling policy. */
global_scheduler_policy_state *policy_state;
aux_address_entry *plasma_photon_map;
/** Objects cached by this global scheduler instance. */
scheduler_object_info *scheduler_object_info_table;
} global_scheduler_state;
void assign_task_to_local_scheduler(global_scheduler_state *state,


@@ -1,3 +1,4 @@
#include "object_info.h"
#include "task.h"
#include "task_table.h"
@@ -15,20 +16,183 @@ void destroy_global_scheduler_policy(
free(policy_state);
}
void handle_task_waiting(global_scheduler_state *state,
global_scheduler_policy_state *policy_state,
task *task) {
void handle_task_round_robin(global_scheduler_state *state,
global_scheduler_policy_state *policy_state,
task *task) {
if (utarray_len(state->local_schedulers) > 0) {
local_scheduler *scheduler = (local_scheduler *) utarray_eltptr(
state->local_schedulers, policy_state->round_robin_index);
scheduler->num_tasks_sent++;
policy_state->round_robin_index += 1;
policy_state->round_robin_index %= utarray_len(state->local_schedulers);
assign_task_to_local_scheduler(state, task, scheduler->id);
} else {
CHECKM(0, "We currently don't handle this case.");
CHECKM(0, "No local schedulers. We currently don't handle this case.");
}
}
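
handle_task_round_robin is the base policy that the transfer-aware path falls back to: a wrapping index over the registered local schedulers. A compact sketch (assumes at least one scheduler is registered, matching the CHECKM):

def round_robin_pick(local_schedulers, policy_state):
    assert local_schedulers, "No local schedulers."
    i = policy_state["round_robin_index"]
    policy_state["round_robin_index"] = (i + 1) % len(local_schedulers)
    return local_schedulers[i]

policy_state = {"round_robin_index": 0}
print([round_robin_pick(["ls0", "ls1"], policy_state) for _ in range(3)])
# -> ['ls0', 'ls1', 'ls0']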
object_size_entry *create_object_size_hashmap(global_scheduler_state *state,
task_spec *task_spec,
bool *has_args_by_ref) {
object_size_entry *s = NULL, *object_size_table = NULL;
for (int i = 0; i < task_num_args(task_spec); i++) {
/* Object IDs are only available for arguments passed by reference;
* args passed by value are serialized into the task_spec itself. Only
* args by ref are relevant for the data size calculation. */
if (task_arg_type(task_spec, i) != ARG_BY_REF) {
continue;
}
*has_args_by_ref = true;
object_id obj_id = task_arg_id(task_spec, i);
/* Look up this object ID in the global scheduler object cache. */
scheduler_object_info *obj_info_entry = NULL;
HASH_FIND(hh, state->scheduler_object_info_table, &obj_id, sizeof(obj_id),
obj_info_entry);
if (obj_info_entry == NULL) {
/* Global scheduler doesn't know anything about this object ID, so log a
* warning and skip it. */
LOG_WARN("Processing task with object ID not known to global scheduler");
continue;
}
LOG_DEBUG("[GS] found object id, data_size = %" PRId64,
obj_info_entry->data_size);
/* Object is known to the scheduler. For each of its locations, add size. */
int64_t object_size = obj_info_entry->data_size;
char **p = NULL;
char id_string[ID_STRING_SIZE];
LOG_DEBUG("locations for an arg_by_ref obj_id = %s",
object_id_to_string(obj_id, id_string, ID_STRING_SIZE));
UNUSED(id_string);
for (p = (char **) utarray_front(obj_info_entry->object_locations);
p != NULL;
p = (char **) utarray_next(obj_info_entry->object_locations, p)) {
const char *object_location = *p;
LOG_DEBUG("\tobject location: %s", object_location);
/* Look up this location in the local object size hash table. */
HASH_FIND_STR(object_size_table, object_location, s);
if (NULL == s) {
/* This location not yet known, so add this object location. */
s = calloc(1, sizeof(object_size_entry));
s->object_location = object_location;
HASH_ADD_KEYPTR(hh, object_size_table, s->object_location,
strlen(s->object_location), s);
}
/* At this point the object location exists in our hash table. */
s->total_object_size += object_size;
} /* End for each object's location. */
} /* End for each task's object. */
return object_size_table;
}
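
In effect, create_object_size_hashmap is a by-location sum over the task's by-reference arguments. A sketch with the argument IDs pre-filtered to the ARG_BY_REF case (a simplification of the C loop):

def aggregate_sizes_by_location(arg_by_ref_ids, object_info_table):
    sizes = {}   # plasma "ip:port" -> total argument bytes present there
    for obj_id in arg_by_ref_ids:
        info = object_info_table.get(obj_id)
        if info is None:
            continue   # unknown to the global scheduler; warn and skip
        for location in info["locations"]:
            sizes[location] = sizes.get(location, 0) + info["data_size"]
    return sizes

table = {b"a": {"data_size": 100, "locations": ["n1", "n2"]},
         b"b": {"data_size": 50, "locations": ["n2"]}}
print(aggregate_sizes_by_location([b"a", b"b"], table))  # {'n1': 100, 'n2': 150}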
void free_object_size_hashmap(object_size_entry *object_size_table) {
/* Destroy local state. */
object_size_entry *tmp, *s = NULL;
HASH_ITER(hh, object_size_table, s, tmp) {
HASH_DEL(object_size_table, s);
/* NOTE: Do not free externally stored s->object_location. */
free(s);
}
}
db_client_id get_photon_id(global_scheduler_state *state,
const char *plasma_location) {
aux_address_entry *aux_entry = NULL;
db_client_id photon_id = NIL_ID;
if (plasma_location != NULL) {
LOG_DEBUG("max object size location found : %s", plasma_location);
/* Lookup association of plasma location to photon. */
HASH_FIND_STR(state->plasma_photon_map, plasma_location, aux_entry);
if (aux_entry) {
LOG_DEBUG("found photon db client association for plasma ip:port = %s",
aux_entry->aux_address);
/* Plasma to photon db client ID association found, get photon ID. */
photon_id = aux_entry->photon_db_client_id;
} else {
LOG_ERROR("photon db client association not found for plasma ip:port=%s",
plasma_location);
}
}
char id_string[ID_STRING_SIZE];
LOG_DEBUG("photon ID found = %s",
object_id_to_string(photon_id, id_string, ID_STRING_SIZE));
UNUSED(id_string);
if (IS_NIL_ID(photon_id)) {
return photon_id;
}
/* Check to make sure this photon_db_client_id matches one of the
* schedulers. */
int i;
for (i = 0; i < utarray_len(state->local_schedulers); ++i) {
local_scheduler *local_scheduler_ptr =
(local_scheduler *) utarray_eltptr(state->local_schedulers, i);
if (memcmp(&local_scheduler_ptr->id, &photon_id, sizeof(photon_id)) == 0) {
LOG_DEBUG("photon_id matched cached local scheduler entry.");
break;
}
}
if (i == utarray_len(state->local_schedulers)) {
LOG_WARN("photon_id didn't match any cached local scheduler entries");
}
return photon_id;
}
void handle_task_waiting(global_scheduler_state *state,
global_scheduler_policy_state *policy_state,
task *task) {
task_spec *task_spec = task_task_spec(task);
CHECKM(task_spec != NULL,
"task wait handler encounted a task with NULL spec");
/* Local hash table to keep track of aggregate object sizes per local
* scheduler. */
object_size_entry *tmp, *s = NULL, *object_size_table = NULL;
bool has_args_by_ref = false;
object_size_table =
create_object_size_hashmap(state, task_spec, &has_args_by_ref);
if (!object_size_table) {
char id_string[ID_STRING_SIZE];
if (has_args_by_ref) {
LOG_DEBUG(
"Using simple policy. Didn't find objects in GS cache for task = %s",
object_id_to_string(task_task_id(task), id_string, ID_STRING_SIZE));
/* TODO(future): wait for object notification and try again. */
} else {
LOG_DEBUG("Using simple policy. No arguments passed by reference.");
}
UNUSED(id_string);
handle_task_round_robin(state, policy_state, task);
return;
}
LOG_DEBUG("Using transfer-aware policy");
/* Pick maximum object_size and assign task to that scheduler. */
int64_t max_object_size = 0;
const char *max_object_location = NULL;
HASH_ITER(hh, object_size_table, s, tmp) {
if (s->total_object_size > max_object_size) {
max_object_size = s->total_object_size;
max_object_location = s->object_location;
}
}
db_client_id photon_id = get_photon_id(state, max_object_location);
CHECKM(!IS_NIL_ID(photon_id), "Failed to find an LS: num_args = %" PRId64
" num_returns = %" PRId64 "\n",
task_num_args(task_spec), task_num_returns(task_spec));
assign_task_to_local_scheduler(state, task, photon_id);
free_object_size_hashmap(object_size_table);
}
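
Putting the pieces together, handle_task_waiting reduces to: aggregate argument bytes per location, take the arg-max location, resolve it to a photon ID, and fall back to round robin when the size table is empty (no args by reference, or nothing cached yet due to out-of-order notifications). A sketch reusing the two helpers above:

def place_task(arg_by_ref_ids, object_info_table, plasma_photon_map,
               local_schedulers, policy_state):
    sizes = aggregate_sizes_by_location(arg_by_ref_ids, object_info_table)
    if not sizes:
        # Base policy fallback, as in the LOG_DEBUG branches above.
        return round_robin_pick(local_schedulers, policy_state)
    best_location = max(sizes, key=sizes.get)
    # Resolve the plasma manager address to its co-located local scheduler;
    # a missing entry here corresponds to the CHECKM failure in the C code.
    return plasma_photon_map[best_location]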
void handle_object_available(global_scheduler_state *state,
global_scheduler_policy_state *policy_state,
object_id object_id) {
@@ -47,5 +211,6 @@ void handle_new_local_scheduler(global_scheduler_state *state,
local_scheduler local_scheduler;
memset(&local_scheduler, 0, sizeof(local_scheduler));
local_scheduler.id = db_client_id;
local_scheduler.num_tasks_sent = 0;
utarray_push_back(state->local_schedulers, &local_scheduler);
}


@@ -13,12 +13,24 @@
*
*/
typedef enum {
SCHED_ALGORITHM_ROUND_ROBIN = 1,
SCHED_ALGORITHM_TRANSFER_AWARE = 2,
SCHED_ALGORITHM_MAX
} global_scheduler_algorithm;
/** The state managed by the global scheduling policy. */
struct global_scheduler_policy_state {
/** The index of the next local scheduler to assign a task to. */
int64_t round_robin_index;
};
typedef struct {
const char *object_location;
int64_t total_object_size;
UT_hash_handle hh;
} object_size_entry;
/**
* Create the state of the global scheduler policy. This state must be freed by
* the caller.


@@ -16,19 +16,21 @@ import unittest
import global_scheduler
import photon
import plasma
from plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object
USE_VALGRIND = False
PLASMA_STORE_MEMORY = 1000000000
ID_SIZE = 20
# These constants must match the schedulign state enum in task.h.
# These constants must match the scheduling state enum in task.h.
TASK_STATUS_WAITING = 1
TASK_STATUS_SCHEDULED = 2
TASK_STATUS_RUNNING = 4
TASK_STATUS_DONE = 8
def random_object_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
# DB_CLIENT_PREFIX is an implementation detail of ray_redis_module.c, so this
# must be kept in sync with that file.
DB_CLIENT_PREFIX = "CL:"
def random_task_id():
return photon.ObjectID(np.random.bytes(ID_SIZE))
@@ -60,9 +62,14 @@ class TestGlobalScheduler(unittest.TestCase):
plasma_store_name, self.p2 = plasma.start_plasma_store()
# Start the Plasma manager.
plasma_manager_name, self.p3, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address)
plasma_address = "{}:{}".format(node_ip_address, plasma_manager_port)
self.plasma_address = "{}:{}".format(node_ip_address, plasma_manager_port)
self.plasma_client = plasma.PlasmaClient(plasma_store_name, plasma_manager_name)
# Start the local scheduler.
local_scheduler_name, self.p4 = photon.start_local_scheduler(plasma_store_name, plasma_manager_name=plasma_manager_name, plasma_address=plasma_address, redis_address=redis_address)
local_scheduler_name, self.p4 = photon.start_local_scheduler(
plasma_store_name,
plasma_manager_name=plasma_manager_name,
plasma_address=self.plasma_address,
redis_address=redis_address)
# Connect to the scheduler.
self.photon_client = photon.PhotonClient(local_scheduler_name)
@@ -78,7 +85,8 @@ class TestGlobalScheduler(unittest.TestCase):
if USE_VALGRIND:
self.p1.send_signal(signal.SIGTERM)
self.p1.wait()
os._exit(self.p1.returncode)
if self.p1.returncode != 0:
os._exit(-1)
else:
self.p1.kill()
self.p2.kill()
@@ -88,22 +96,65 @@ class TestGlobalScheduler(unittest.TestCase):
# after we kill the global scheduler.
self.redis_process.kill()
def test_redis_contents(self):
# DB_CLIENT_PREFIX is an implementation detail of ray_redis_module.c, so
# this must be kept in sync with that file.
DB_CLIENT_PREFIX = "CL:"
def get_plasma_manager_id(self):
"""Get the db_client_id with client_type equal to plasma_manager.
Iterates over all the client table keys, gets the db_client_id for the
client with client_type matching plasma_manager. Strips the client table
prefix. TODO(atumanov): write a separate function to get all plasma manager
client IDs.
Returns:
The db_client_id if one is found and otherwise None.
"""
db_client_id = None
client_list = self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))
for client_id in client_list:
response = self.redis_client.hget(client_id, b"client_type")
if response == b"plasma_manager":
db_client_id = client_id
break
return db_client_id
def test_redis_only_single_task(self):
"""
Tests global scheduler functionality by interacting with Redis and checking
task state transitions in Redis only. TODO(atumanov): implement.
"""
# Check precondition for this test:
# There should be three db clients, the global scheduler, the local
# scheduler, and the plasma manager.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)
db_client_id = self.get_plasma_manager_id()
assert(db_client_id != None)
assert(db_client_id.startswith(b"CL:"))
db_client_id = db_client_id[len(b"CL:"):] # Remove the CL: prefix.
def test_integration_single_task(self):
# There should be three db clients, the global scheduler, the local
# scheduler, and the plasma manager.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)
num_return_vals = [0, 1, 2, 3, 5, 10]
# There should not be anything else in Redis yet.
self.assertEqual(len(self.redis_client.keys("*")), 3)
# Insert the object into Redis.
data_size = 0xf1f0
metadata_size = 0x40
object_dep, memory_buffer, metadata = create_object(self.plasma_client, data_size, metadata_size, seal=True)
# Sleep before submitting task to photon.
time.sleep(0.1)
# Submit a task to Redis.
task = photon.Task(random_function_id(), [], 0, random_task_id(), 0)
task = photon.Task(random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0)
self.photon_client.submit(task)
time.sleep(0.1)
# There should now be a task in Redis, and it should get assigned to the
# local scheduler
while True:
num_retries = 10
while num_retries > 0:
task_entries = self.redis_client.keys("task*")
self.assertLessEqual(len(task_entries), 1)
if len(task_entries) == 1:
@@ -112,29 +163,73 @@ class TestGlobalScheduler(unittest.TestCase):
self.assertTrue(task_status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED])
if task_status == TASK_STATUS_SCHEDULED:
break
else:
print(task_status)
print("The task has not been scheduled yet, trying again.")
num_retries -= 1
time.sleep(1)
if num_retries <= 0 and task_status != TASK_STATUS_SCHEDULED:
# Failed to submit and schedule a single task -- bail.
self.tearDown()
sys.exit(1)
def integration_many_tasks_helper(self, timesync=True):
# There should be three db clients, the global scheduler, the local
# scheduler, and the plasma manager.
self.assertEqual(len(self.redis_client.keys("{}*".format(DB_CLIENT_PREFIX))), 3)
num_return_vals = [0, 1, 2, 3, 5, 10]
# Submit a bunch of tasks to Redis.
num_tasks = 1000
for _ in range(num_tasks):
task = photon.Task(random_function_id(), [], 0, random_task_id(), 0)
# Create a new object for each task.
data_size = np.random.randint(1 << 20)
metadata_size = np.random.randint(1 << 10)
object_dep, memory_buffer, metadata = create_object(self.plasma_client, data_size, metadata_size, seal=True)
if timesync:
# Give 10ms for object info handler to fire (long enough to yield CPU).
time.sleep(0.010)
task = photon.Task(random_function_id(), [photon.ObjectID(object_dep)], num_return_vals[0], random_task_id(), 0)
self.photon_client.submit(task)
# Check that there are the correct number of tasks in Redis and that they
# all get assigned to the local scheduler.
while True:
num_retries = 10
num_tasks_done = 0
while num_retries > 0:
task_entries = self.redis_client.keys("task*")
self.assertLessEqual(len(task_entries), num_tasks + 1)
if len(task_entries) == num_tasks + 1:
self.assertLessEqual(len(task_entries), num_tasks)
# First, check if all tasks made it to Redis.
if len(task_entries) == num_tasks:
task_contents = [self.redis_client.hgetall(task_entries[i]) for i in range(len(task_entries))]
task_statuses = [int(contents[b"state"]) for contents in task_contents]
self.assertTrue(all([status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED] for status in task_statuses]))
num_tasks_done = task_statuses.count(TASK_STATUS_SCHEDULED)
num_tasks_waiting = task_statuses.count(TASK_STATUS_WAITING)
print("tasks in Redis = {}, tasks waiting = {}, tasks scheduled = {}, retries left = {}"
.format(len(task_entries), num_tasks_waiting, num_tasks_done, num_retries))
if all([status == TASK_STATUS_SCHEDULED for status in task_statuses]):
# We're done, so pass.
break
print("The tasks have not been scheduled yet, trying again.")
num_retries -= 1
time.sleep(0.1)
if num_tasks_done != num_tasks:
# At least one of the tasks failed to schedule.
self.tearDown()
sys.exit(2)
def test_integration_many_tasks_handler_sync(self):
self.integration_many_tasks_helper(timesync=True)
def test_integration_many_tasks(self):
# More realistic case: should handle out of order object and task
# notifications.
self.integration_many_tasks_helper(timesync=False)
if __name__ == "__main__":
if len(sys.argv) > 1:
# pop the argument so we don't mess with unittest's own argument parser
# Pop the argument so we don't mess with unittest's own argument parser.
arg = sys.argv.pop()
if arg == "valgrind":
USE_VALGRIND = True


@@ -49,7 +49,8 @@ class TestPhotonClient(unittest.TestCase):
if USE_VALGRIND:
self.p2.send_signal(signal.SIGTERM)
self.p2.wait()
os._exit(self.p2.returncode)
if self.p2.returncode != 0:
os._exit(-1)
else:
self.p2.kill()


@@ -0,0 +1,38 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import random
def random_object_id():
return np.random.bytes(20)
def generate_metadata(length):
metadata_buffer = bytearray(length)
if length > 0:
metadata_buffer[0] = random.randint(0, 255)
metadata_buffer[-1] = random.randint(0, 255)
for _ in range(100):
metadata_buffer[random.randint(0, length - 1)] = random.randint(0, 255)
return metadata_buffer
def write_to_data_buffer(buff, length):
if length > 0:
buff[0] = chr(random.randint(0, 255))
buff[-1] = chr(random.randint(0, 255))
for _ in range(100):
buff[random.randint(0, length - 1)] = chr(random.randint(0, 255))
def create_object_with_id(client, object_id, data_size, metadata_size, seal=True):
metadata = generate_metadata(metadata_size)
memory_buffer = client.create(object_id, data_size, metadata)
write_to_data_buffer(memory_buffer, data_size)
if seal:
client.seal(object_id)
return memory_buffer, metadata
def create_object(client, data_size, metadata_size, seal=True):
object_id = random_object_id()
memory_buffer, metadata = create_object_with_id(client, object_id, data_size, metadata_size, seal=seal)
return object_id, memory_buffer, metadata
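
Of the helpers collected into plasma/utils.py, random_object_id and generate_metadata need no running plasma store, so they can be exercised directly (assuming the plasma package is importable):

from plasma.utils import generate_metadata, random_object_id

assert len(random_object_id()) == 20               # 20-byte object IDs
meta = generate_metadata(16)
assert isinstance(meta, bytearray) and len(meta) == 16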


@@ -882,24 +882,3 @@ void object_requests_set_status_all(int num_object_requests,
object_requests[i].status = status;
}
}
void object_id_print(object_id obj_id) {
for (int i = 0; i < sizeof(object_id); ++i) {
printf("%u.", obj_id.id[i]);
if (i < sizeof(object_id) - 1) {
printf(".");
}
}
}
void object_requests_print(int num_object_requests,
object_request object_requests[]) {
for (int i = 0; i < num_object_requests; ++i) {
printf("[");
for (int j = 0; j < sizeof(object_id); ++j) {
object_id_print(object_requests[i].object_id);
}
printf(" | %d | %d], ", object_requests[i].type, object_requests[i].status);
}
printf("\n");
}


@@ -446,23 +446,4 @@ void object_requests_set_status_all(int num_object_requests,
object_request object_requests[],
int status);
/**
* Print an object ID with bytes separated by ".".
*
* @param object_id Object ID to be printed.
* @return Void.
*/
void object_id_print(object_id object_id);
/**
* Print all object requests in an array (for debugging purposes).
*
* @param num_object_requests Number of elements in the array of object
* requests.
* @param object_requests Array of object requests.
* @return Void.
*/
void object_requests_print(int num_object_requests,
object_request object_requests[]);
#endif /* PLASMA_CLIENT_H */


@@ -16,42 +16,11 @@ import time
import unittest
import plasma
from plasma.utils import random_object_id, generate_metadata, write_to_data_buffer, create_object_with_id, create_object
USE_VALGRIND = False
PLASMA_STORE_MEMORY = 1000000000
def random_object_id():
return np.random.bytes(20)
def generate_metadata(length):
metadata_buffer = bytearray(length)
if length > 0:
metadata_buffer[0] = random.randint(0, 255)
metadata_buffer[-1] = random.randint(0, 255)
for _ in range(100):
metadata_buffer[random.randint(0, length - 1)] = random.randint(0, 255)
return metadata_buffer
def write_to_data_buffer(buff, length):
if length > 0:
buff[0] = chr(random.randint(0, 255))
buff[-1] = chr(random.randint(0, 255))
for _ in range(100):
buff[random.randint(0, length - 1)] = chr(random.randint(0, 255))
def create_object_with_id(client, object_id, data_size, metadata_size, seal=True):
metadata = generate_metadata(metadata_size)
memory_buffer = client.create(object_id, data_size, metadata)
write_to_data_buffer(memory_buffer, data_size)
if seal:
client.seal(object_id)
return memory_buffer, metadata
def create_object(client, data_size, metadata_size, seal=True):
object_id = random_object_id()
memory_buffer, metadata = create_object_with_id(client, object_id, data_size, metadata_size, seal=seal)
return object_id, memory_buffer, metadata
def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffer=None, metadata=None):
if memory_buffer is not None:
unit_test.assertEqual(memory_buffer[:], client2.get(object_id)[:])