remove C struct duplication and python plasma manager

This commit is contained in:
Philipp Moritz 2016-09-05 15:34:11 -07:00
parent a0490846a2
commit ad1a8454d5
7 changed files with 157 additions and 151 deletions

View file

@ -9,77 +9,38 @@ ID = ctypes.c_ubyte * 20
class PlasmaID(ctypes.Structure): class PlasmaID(ctypes.Structure):
_fields_ = [("plasma_id", ID)] _fields_ = [("plasma_id", ID)]
# these must be in sync with plasma_request_type in plasma.h (can we have a test for that?)
PLASMA_CREATE = 0
PLASMA_GET = 1
PLASMA_SEAL = 2
PLASMA_TRANSFER = 3
PLASMA_DATA = 4
PLASMA_REGISTER = 5
class PlasmaRequest(ctypes.Structure):
_fields_ = [("type", ctypes.c_int),
("object_id", PlasmaID),
("size", ctypes.c_int64),
("addr", Addr),
("port", ctypes.c_int)]
class PlasmaBuffer(ctypes.Structure):
_fields_ = [("plasma_id", PlasmaID),
("data", ctypes.c_void_p),
("size", ctypes.c_int64),
("writable", ctypes.c_int)]
def make_plasma_id(string): def make_plasma_id(string):
if len(string) != 20: if len(string) != 20:
raise Exception("PlasmaIDs must be 20 characters long") raise Exception("PlasmaIDs must be 20 characters long")
object_id = map(ord, string) object_id = map(ord, string)
return PlasmaID(plasma_id=ID(*object_id)) return PlasmaID(plasma_id=ID(*object_id))
class PlasmaManager(object):
"""The PlasmaManager is used to manage a PlasmaStore.
There should be one PlasmaManager per PlasmaStore. The PlasmaManager is
responsible for interfacing with other PlasmaManagers in order to transfer
objects between PlasmaStores. This class sends commands to the C
implementation of the PlasmaManager using sockets.
Attributes:
sock: The socket used to communicate with the C implementation of the
PlasmaManager.
"""
def __init__(self, addr, port):
"""Initialize the PlasmaManager."""
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((addr, port))
def transfer(self, addr, port, object_id):
"""Transfer local object with id object_id to manager with id manager_id."""
req = PlasmaRequest(type=PLASMA_TRANSFER, object_id=make_plasma_id(object_id),
addr=Addr(*map(int, addr.split("."))), port=port)
print "sending port", port
self.sock.send(buffer(req)[:])
class PlasmaClient(object): class PlasmaClient(object):
"""The PlasmaClient is used to interface with a PlasmaStore. """The PlasmaClient is used to interface with a plasma store and a plasma manager.
The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a
buffer, and get a buffer. Buffers are referred to by object IDs, which are buffer, and get a buffer. Buffers are referred to by object IDs, which are
strings. strings.
""" """
def __init__(self, socket_name): def __init__(self, socket_name, addr=None, port=None):
"""Initialize the PlasmaClient.
Args:
socket_name (str): Name of the socket the plasma store is listening at.
addr (str): IPv4 address of plasma manager attached to the plasma store.
port (int): Port number of the plasma manager attached to the plasma store.
"""
plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so") plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so")
self.client = ctypes.cdll.LoadLibrary(plasma_client_library) self.client = ctypes.cdll.LoadLibrary(plasma_client_library)
self.client.plasma_store_connect.restype = ctypes.c_int self.client.plasma_store_connect.restype = ctypes.c_int
self.client.plasma_create.argtypes = [ctypes.c_int, PlasmaID, ctypes.c_int64] self.client.plasma_create.argtypes = [ctypes.c_int, PlasmaID, ctypes.c_int64, ctypes.POINTER(ctypes.c_void_p)]
self.client.plasma_create.restype = PlasmaBuffer self.client.plasma_create.restype = None
self.client.plasma_get.argtypes = [ctypes.c_int, PlasmaID] self.client.plasma_get.argtypes = [ctypes.c_int, PlasmaID, ctypes.POINTER(ctypes.c_int64), ctypes.POINTER(ctypes.c_void_p)]
self.client.plasma_get.restype = PlasmaBuffer self.client.plasma_get.restype = None
self.client.plasma_seal.argtypes = [ctypes.c_int, PlasmaID] self.client.plasma_seal.argtypes = [ctypes.c_int, PlasmaID]
self.client.plasma_seal.restype = None self.client.plasma_seal.restype = None
@ -94,6 +55,11 @@ class PlasmaClient(object):
self.sock = self.client.plasma_store_connect(socket_name) self.sock = self.client.plasma_store_connect(socket_name)
if addr is not None and port is not None:
self.manager_conn = self.client.plasma_manager_connect(addr, port)
else:
self.manager_conn = -1 # not connected
def create(self, object_id, size): def create(self, object_id, size):
"""Create a new buffer in the PlasmaStore for a particular object ID. """Create a new buffer in the PlasmaStore for a particular object ID.
@ -103,8 +69,9 @@ class PlasmaClient(object):
object_id (str): A string used to identify an object. object_id (str): A string used to identify an object.
size (int): The size in bytes of the created buffer. size (int): The size in bytes of the created buffer.
""" """
buf = self.client.plasma_create(self.sock, make_plasma_id(object_id), size) data = ctypes.c_void_p()
return self.buffer_from_read_write_memory(buf.data, buf.size) self.client.plasma_create(self.sock, make_plasma_id(object_id), size, ctypes.byref(data))
return self.buffer_from_read_write_memory(data, size)
def get(self, object_id): def get(self, object_id):
"""Create a buffer from the PlasmaStore based on object ID. """Create a buffer from the PlasmaStore based on object ID.
@ -115,8 +82,10 @@ class PlasmaClient(object):
Args: Args:
object_id (str): A string used to identify an object. object_id (str): A string used to identify an object.
""" """
buf = self.client.plasma_get(self.sock, make_plasma_id(object_id)) size = ctypes.c_int64()
return self.buffer_from_memory(buf.data, buf.size) data = ctypes.c_void_p()
buf = self.client.plasma_get(self.sock, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data))
return self.buffer_from_memory(data, size)
def seal(self, object_id): def seal(self, object_id):
"""Seal the buffer in the PlasmaStore for a particular object ID. """Seal the buffer in the PlasmaStore for a particular object ID.
@ -128,3 +97,16 @@ class PlasmaClient(object):
object_id (str): A string used to identify an object. object_id (str): A string used to identify an object.
""" """
self.client.plasma_seal(self.sock, make_plasma_id(object_id)) self.client.plasma_seal(self.sock, make_plasma_id(object_id))
def transfer(self, addr, port, object_id):
"""Transfer local object with id object_id to another plasma instance
Args:
addr (str): IPv4 address of the plasma instance the object is sent to.
port (int): Port number of the plasma instance the object is sent to.
object_id (str): A string used to identify an object.
"""
if self.manager_conn == -1:
raise Exception("Not connected to the plasma manager socket")
self.client.plasma_transfer(self.manager_conn, addr, port, make_plasma_id(object_id))

View file

@ -16,6 +16,8 @@
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
int conn = -1; int conn = -1;
int64_t size;
void *data;
int c; int c;
plasma_id id = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, plasma_id id = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255}}; 255, 255, 255, 255, 255, 255, 255, 255}};
@ -26,14 +28,14 @@ int main(int argc, char *argv[]) {
break; break;
case 'c': case 'c':
assert(conn != -1); assert(conn != -1);
plasma_create(conn, id, 100); plasma_create(conn, id, 100, &data);
break; break;
case 'f': case 'f':
assert(conn != -1); assert(conn != -1);
plasma_seal(conn, id); plasma_seal(conn, id);
break; break;
case 'g': case 'g':
plasma_get(conn, id); plasma_get(conn, id, &size, &data);
break; break;
default: default:
abort(); abort();
@ -42,4 +44,3 @@ int main(int argc, char *argv[]) {
assert(conn != -1); assert(conn != -1);
close(conn); close(conn);
} }

View file

@ -65,11 +65,16 @@ typedef struct {
int writable; int writable;
} plasma_buffer; } plasma_buffer;
// Connect to the local plasma store UNIX domain socket
int plasma_store_connect(const char* socket_name); int plasma_store_connect(const char* socket_name);
plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size);
plasma_buffer plasma_get(int conn, plasma_id object_id);
void plasma_seal(int fd, plasma_id object_id);
void plasma_send(int fd, plasma_request *req); // Connect to a possibly remote plasma manager
int plasma_manager_connect(const char* addr, int port);
void plasma_create(int store, plasma_id object_id, int64_t size, void **data);
void plasma_get(int store, plasma_id object_id, int64_t *size, void **data);
void plasma_seal(int store, plasma_id object_id);
void plasma_send(int conn, plasma_request *req);
#endif #endif

View file

@ -23,7 +23,7 @@ void plasma_send(int fd, plasma_request *req) {
} }
} }
plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) { void plasma_create(int conn, plasma_id object_id, int64_t size, void **data) {
LOG_INFO("called plasma_create on conn %d with size %" PRId64, conn, size); LOG_INFO("called plasma_create on conn %d with size %" PRId64, conn, size);
plasma_request req = { .type = PLASMA_CREATE, .object_id = object_id, .size = size }; plasma_request req = { .type = PLASMA_CREATE, .object_id = object_id, .size = size };
plasma_send(conn, &req); plasma_send(conn, &req);
@ -31,16 +31,14 @@ plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) {
int fd = recv_fd(conn, (char*)&reply, sizeof(plasma_reply)); int fd = recv_fd(conn, (char*)&reply, sizeof(plasma_reply));
assert(reply.type == PLASMA_OBJECT); assert(reply.type == PLASMA_OBJECT);
assert(reply.size == size); assert(reply.size == size);
void *data = mmap(NULL, reply.size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); *data = mmap(NULL, reply.size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (data == MAP_FAILED) { if (*data == MAP_FAILED) {
LOG_ERR("mmap failed"); LOG_ERR("mmap failed");
exit(-1); exit(-1);
} }
plasma_buffer buffer = { object_id, data, size, 1 };
return buffer;
} }
plasma_buffer plasma_get(int conn, plasma_id object_id) { void plasma_get(int conn, plasma_id object_id, int64_t *size, void **data) {
plasma_request req = { .type = PLASMA_GET, .object_id = object_id }; plasma_request req = { .type = PLASMA_GET, .object_id = object_id };
plasma_send(conn, &req); plasma_send(conn, &req);
plasma_reply reply; plasma_reply reply;
@ -52,13 +50,12 @@ plasma_buffer plasma_get(int conn, plasma_id object_id) {
fd = new_fd; fd = new_fd;
} }
assert(reply.type == PLASMA_OBJECT); assert(reply.type == PLASMA_OBJECT);
void *data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0); *data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0);
if (data == MAP_FAILED) { if (*data == MAP_FAILED) {
LOG_ERR("mmap failed"); LOG_ERR("mmap failed");
exit(-1); exit(-1);
} }
plasma_buffer buffer = { object_id, data, reply.size, 0 }; *size = reply.size;
return buffer;
} }
void plasma_seal(int fd, plasma_id object_id) { void plasma_seal(int fd, plasma_id object_id) {
@ -94,3 +91,41 @@ int plasma_store_connect(const char* socket_name) {
} }
return fd; return fd;
} }
#define h_addr h_addr_list[0]
int plasma_manager_connect(const char* ip_addr, int port) {
int fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd < 0) {
LOG_ERR("could not create socket");
exit(-1);
}
struct hostent *manager = gethostbyname(ip_addr); // TODO(pcm): cache this
if (!manager) {
LOG_ERR("plasma manager %s not found", ip_addr);
exit(-1);
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
bcopy(manager->h_addr, &addr.sin_addr.s_addr, manager->h_length);
addr.sin_port = htons(port);
int r = connect(fd, (struct sockaddr*) &addr, sizeof(addr));
if (r < 0) {
LOG_ERR("could not establish connection to manager with id %s:%d", &ip_addr[0], port);
exit(-1);
}
return fd;
}
void plasma_transfer(int manager, const char* addr, int port, plasma_id object_id) {
plasma_request req = {.type = PLASMA_TRANSFER, .object_id = object_id, .port = port};
char* end = NULL;
for (int i = 0; i < 4; ++i) {
req.addr[i] = strtol(end ? end : addr, &end, 10);
end += 1; // skip the '.'
}
plasma_send(manager, &req);
}

View file

@ -22,43 +22,7 @@
#include <netdb.h> #include <netdb.h>
#include "plasma.h" #include "plasma.h"
#include "plasma_manager.h"
#define MAX_CONNECTIONS 2048
#define MAX_NUM_MANAGERS 1024
enum conn_type {
// Connection to send commands to the manager.
CONN_CONTROL,
// Connection to send data to another manager.
CONN_WRITE_DATA,
// Connection to receive data from another manager.
CONN_READ_DATA
};
typedef struct {
// Unique identifier for the connection.
int id;
// Of type conn_type.
int type;
// Socket of the plasma store that is accessed for reading or writing data for
// this connection.
int store_conn;
// Buffer this connection is reading from or writing to.
plasma_buffer buf;
// Current position in the buffer.
int64_t cursor;
} conn_state;
typedef struct {
// Name of the socket connecting to local plasma store.
const char* store_socket_name;
// Number of connections.
int num_conn;
// For the "poll" system call.
struct pollfd waiting[MAX_CONNECTIONS];
// Status of connections (both control and data).
conn_state conn[MAX_CONNECTIONS];
} plasma_manager_state;
void init_manager_state(plasma_manager_state *s, const char* store_socket_name) { void init_manager_state(plasma_manager_state *s, const char* store_socket_name) {
memset(&s->waiting, 0, sizeof(s->waiting)); memset(&s->waiting, 0, sizeof(s->waiting));
@ -67,22 +31,17 @@ void init_manager_state(plasma_manager_state *s, const char* store_socket_name)
s->store_socket_name = store_socket_name; s->store_socket_name = store_socket_name;
} }
#define h_addr h_addr_list[0]
// Add connection for sending commands or data to another plasma manager // Add connection for sending commands or data to another plasma manager
// (returns the connection id). // (returns the connection index).
int add_conn(plasma_manager_state* s, int type, int fd, int events, plasma_buffer* buf) { int add_conn(plasma_manager_state* s, int type, int fd, int events, plasma_buffer* buf) {
static int conn_id = 0;
s->waiting[s->num_conn].fd = fd; s->waiting[s->num_conn].fd = fd;
s->waiting[s->num_conn].events = events; s->waiting[s->num_conn].events = events;
s->conn[s->num_conn].id = conn_id;
s->conn[s->num_conn].type = type; s->conn[s->num_conn].type = type;
if (buf) { if (buf) {
s->conn[s->num_conn].buf = *buf; s->conn[s->num_conn].buf = *buf;
} }
s->conn[s->num_conn].cursor = 0; s->conn[s->num_conn].cursor = 0;
s->num_conn += 1; return s->num_conn++;
return conn_id++;
} }
// Remove connection with index i by swapping it with the last element. // Remove connection with index i by swapping it with the last element.
@ -100,33 +59,15 @@ void remove_conn(plasma_manager_state* s, int i) {
// the data header to the other object manager. // the data header to the other object manager.
void initiate_transfer(plasma_manager_state* state, plasma_request* req) { void initiate_transfer(plasma_manager_state* state, plasma_request* req) {
int c = plasma_store_connect(state->store_socket_name); int c = plasma_store_connect(state->store_socket_name);
plasma_buffer buf = plasma_get(c, req->object_id); plasma_buffer buf = { .object_id = req->object_id, .writable = 0 };
plasma_get(c, req->object_id, &buf.size, &buf.data);
int fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd < 0) {
LOG_ERR("could not create socket");
exit(-1);
}
char ip_addr[16]; char ip_addr[16];
snprintf(ip_addr, 32, "%d.%d.%d.%d", snprintf(ip_addr, 32, "%d.%d.%d.%d",
req->addr[0], req->addr[1], req->addr[0], req->addr[1],
req->addr[2], req->addr[3]); req->addr[2], req->addr[3]);
struct hostent *manager = gethostbyname(ip_addr); // TODO(pcm): cache this
if (!manager) { int fd = plasma_manager_connect(&ip_addr[0], req->port);
LOG_ERR("plasma manager %s not found", ip_addr);
exit(-1);
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
bcopy(manager->h_addr, &addr.sin_addr.s_addr, manager->h_length);
addr.sin_port = htons(req->port);
int r = connect(fd, (struct sockaddr*) &addr, sizeof(addr));
if (r < 0) {
LOG_ERR("could not establish connection to manager with id %s:%d", &ip_addr[0], req->port);
exit(-1);
}
add_conn(state, CONN_WRITE_DATA, fd, POLLOUT, &buf); add_conn(state, CONN_WRITE_DATA, fd, POLLOUT, &buf);
@ -139,7 +80,10 @@ void setup_data_connection(int conn_idx, plasma_manager_state* state, plasma_req
int store_conn = plasma_store_connect(state->store_socket_name); int store_conn = plasma_store_connect(state->store_socket_name);
state->conn[conn_idx].type = CONN_READ_DATA; state->conn[conn_idx].type = CONN_READ_DATA;
state->conn[conn_idx].store_conn = store_conn; state->conn[conn_idx].store_conn = store_conn;
state->conn[conn_idx].buf = plasma_create(store_conn, req->object_id, req->size); state->conn[conn_idx].buf.object_id = req->object_id;
state->conn[conn_idx].buf.size = req->size;
state->conn[conn_idx].buf.writable = 1;
plasma_create(store_conn, req->object_id, req->size, &state->conn[conn_idx].buf.data);
state->conn[conn_idx].cursor = 0; state->conn[conn_idx].cursor = 0;
} }
@ -170,7 +114,7 @@ void read_from_socket(plasma_manager_state* state, int i, plasma_request* req) {
if (r == 1) { if (r == 1) {
LOG_ERR("read error"); LOG_ERR("read error");
} else if (r == 0) { } else if (r == 0) {
LOG_INFO("connection with id %d disconnected", state->conn[i].id); LOG_INFO("connection with index %d disconnected", i);
remove_conn(state, i); remove_conn(state, i);
} else { } else {
process_command(i, state, req); process_command(i, state, req);

42
src/plasma_manager.h Normal file
View file

@ -0,0 +1,42 @@
#ifndef PLASMA_MANAGER_H
#define PLASMA_MANAGER_H
#include <poll.h>
#define MAX_CONNECTIONS 2048
enum conn_type {
// Connection to send commands to the manager.
CONN_CONTROL,
// Connection to send data to another manager.
CONN_WRITE_DATA,
// Connection to receive data from another manager.
CONN_READ_DATA
};
typedef struct {
// Of type conn_type.
int type;
// Socket of the plasma store that is accessed for reading or writing data for
// this connection.
int store_conn;
// Buffer this connection is reading from or writing to.
plasma_buffer buf;
// Current position in the buffer.
int64_t cursor;
} conn_state;
typedef struct {
// ID of this manager
int64_t manager_id;
// Name of the socket connecting to local plasma store.
const char* store_socket_name;
// Number of connections.
int num_conn;
// For the "poll" system call.
struct pollfd waiting[MAX_CONNECTIONS];
// Status of connections (both control and data).
conn_state conn[MAX_CONNECTIONS];
} plasma_manager_state;
#endif

View file

@ -69,9 +69,6 @@ class TestPlasmaManager(unittest.TestCase):
plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store") plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store")
self.p2 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store1"]) self.p2 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store1"])
self.p3 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store2"]) self.p3 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store2"])
# Connect two PlasmaClients.
self.client1 = plasma.PlasmaClient("/tmp/store1")
self.client2 = plasma.PlasmaClient("/tmp/store2")
# Start two PlasmaManagers. # Start two PlasmaManagers.
self.port1 = random.randint(10000, 50000) self.port1 = random.randint(10000, 50000)
self.port2 = random.randint(10000, 50000) self.port2 = random.randint(10000, 50000)
@ -79,13 +76,13 @@ class TestPlasmaManager(unittest.TestCase):
self.p4 = subprocess.Popen([plasma_manager_executable, "-s", "/tmp/store1", "-m", "127.0.0.1", "-p", str(self.port1)]) self.p4 = subprocess.Popen([plasma_manager_executable, "-s", "/tmp/store1", "-m", "127.0.0.1", "-p", str(self.port1)])
self.p5 = subprocess.Popen([plasma_manager_executable, "-s", "/tmp/store2", "-m", "127.0.0.1", "-p", str(self.port2)]) self.p5 = subprocess.Popen([plasma_manager_executable, "-s", "/tmp/store2", "-m", "127.0.0.1", "-p", str(self.port2)])
time.sleep(0.1) time.sleep(0.1)
# Connect two Python PlasmaManagers. # Connect two PlasmaClients.
self.manager1 = plasma.PlasmaManager("127.0.0.1", self.port1) self.client1 = plasma.PlasmaClient("/tmp/store1", "127.0.0.1", self.port1)
self.manager2 = plasma.PlasmaManager("127.0.0.1", self.port2) self.client2 = plasma.PlasmaClient("/tmp/store2", "127.0.0.1", self.port2)
time.sleep(0.5) time.sleep(0.5)
def tearDown(self): def tearDown(self):
# Kill the nameserver, PlasmaStore and PlasmaManager processes. # Kill the PlasmaStore and PlasmaManager processes.
self.p2.kill() self.p2.kill()
self.p3.kill() self.p3.kill()
self.p4.kill() self.p4.kill()
@ -101,11 +98,11 @@ class TestPlasmaManager(unittest.TestCase):
# Seal the buffer. # Seal the buffer.
self.client1.seal(object_id1) self.client1.seal(object_id1)
# Transfer the buffer to the the other PlasmaStore. # Transfer the buffer to the the other PlasmaStore.
self.manager1.transfer("127.0.0.1", self.port2, object_id1) self.client1.transfer("127.0.0.1", self.port2, object_id1)
# Compare the two buffers. # Compare the two buffers.
self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:]) self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:])
# Transfer the buffer again. # Transfer the buffer again.
self.manager1.transfer("127.0.0.1", self.port2, object_id1) self.client1.transfer("127.0.0.1", self.port2, object_id1)
# Compare the two buffers. # Compare the two buffers.
self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:]) self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:])
# Create a new object id string. # Create a new object id string.
@ -117,7 +114,7 @@ class TestPlasmaManager(unittest.TestCase):
# Seal the buffer. # Seal the buffer.
self.client2.seal(object_id2) self.client2.seal(object_id2)
# Transfer the buffer to the the other PlasmaStore. # Transfer the buffer to the the other PlasmaStore.
self.manager2.transfer("127.0.0.1", self.port1, object_id2) self.client2.transfer("127.0.0.1", self.port1, object_id2)
# Compare the two buffers. # Compare the two buffers.
self.assertEqual(self.client1.get(object_id2)[:], self.client2.get(object_id2)[:]) self.assertEqual(self.client1.get(object_id2)[:], self.client2.get(object_id2)[:])