2016-08-16 16:52:16 -07:00
|
|
|
import os
|
2016-08-16 15:38:45 -07:00
|
|
|
import socket
|
|
|
|
import ctypes
|
2016-10-14 19:27:17 -07:00
|
|
|
import time
|
2016-08-16 15:38:45 -07:00
|
|
|
|
|
|
|
Addr = ctypes.c_ubyte * 4
|
|
|
|
|
2016-10-14 19:27:17 -07:00
|
|
|
PLASMA_ID_SIZE = 20
|
|
|
|
ID = ctypes.c_ubyte * PLASMA_ID_SIZE
|
2016-08-16 15:38:45 -07:00
|
|
|
|
|
|
|
class PlasmaID(ctypes.Structure):
|
|
|
|
_fields_ = [("plasma_id", ID)]
|
|
|
|
|
|
|
|
def make_plasma_id(string):
|
2016-10-14 19:27:17 -07:00
|
|
|
if len(string) != PLASMA_ID_SIZE:
|
|
|
|
raise Exception("PlasmaIDs must be {} characters long".format(PLASMA_ID_SIZE))
|
2016-08-16 15:38:45 -07:00
|
|
|
object_id = map(ord, string)
|
|
|
|
return PlasmaID(plasma_id=ID(*object_id))
|
|
|
|
|
2016-10-21 00:47:34 -07:00
|
|
|
class PlasmaBuffer(object):
|
|
|
|
"""This is the type of objects returned by calls to get with a PlasmaClient.
|
|
|
|
|
|
|
|
We define our own class instead of directly returning a buffer object so that
|
|
|
|
we can add a custom destructor which notifies Plasma that the object is no
|
|
|
|
longer being used, so the memory in the Plasma store backing the object can
|
|
|
|
potentially be freed.
|
|
|
|
|
|
|
|
Attributes:
|
|
|
|
buffer (buffer): A buffer containing an object in the Plasma store.
|
|
|
|
plasma_id (PlasmaID): The ID of the object in the buffer.
|
|
|
|
plasma_client (PlasmaClient): The PlasmaClient that we use to communicate
|
|
|
|
with the store and manager.
|
|
|
|
"""
|
|
|
|
def __init__(self, buff, plasma_id, plasma_client):
|
|
|
|
"""Initialize a PlasmaBuffer."""
|
|
|
|
self.buffer = buff
|
|
|
|
self.plasma_id = plasma_id
|
|
|
|
self.plasma_client = plasma_client
|
|
|
|
|
|
|
|
def __del__(self):
|
|
|
|
"""Notify Plasma that the object is no longer needed."""
|
|
|
|
self.plasma_client.client.plasma_release(self.plasma_client.plasma_conn, self.plasma_id)
|
|
|
|
|
|
|
|
def __getitem__(self, index):
|
|
|
|
"""Read from the PlasmaBuffer as if it were just a regular buffer."""
|
|
|
|
return self.buffer[index]
|
|
|
|
|
|
|
|
def __setitem__(self, index, value):
|
|
|
|
"""Write to the PlasmaBuffer as if it were just a regular buffer.
|
|
|
|
|
|
|
|
This should fail because the buffer should be read only.
|
|
|
|
"""
|
|
|
|
self.buffer[index] = value
|
|
|
|
|
|
|
|
def __len__(self):
|
|
|
|
"""Return the length of the buffer."""
|
|
|
|
return len(self.buffer)
|
|
|
|
|
2016-08-16 15:38:45 -07:00
|
|
|
class PlasmaClient(object):
|
2016-09-05 15:34:11 -07:00
|
|
|
"""The PlasmaClient is used to interface with a plasma store and a plasma manager.
|
2016-08-18 09:56:20 -07:00
|
|
|
|
|
|
|
The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a
|
|
|
|
buffer, and get a buffer. Buffers are referred to by object IDs, which are
|
|
|
|
strings.
|
|
|
|
"""
|
|
|
|
|
2016-10-28 21:26:54 -07:00
|
|
|
def __init__(self, store_socket_name, manager_socket_name=None):
|
2016-09-05 15:34:11 -07:00
|
|
|
"""Initialize the PlasmaClient.
|
2016-09-14 14:20:34 -07:00
|
|
|
|
2016-09-05 15:34:11 -07:00
|
|
|
Args:
|
2016-10-28 21:26:54 -07:00
|
|
|
store_socket_name (str): Name of the socket the plasma store is listening at.
|
|
|
|
manager_socket_name (str): Name of the socket the plasma manager is listening at.
|
2016-09-05 15:34:11 -07:00
|
|
|
"""
|
2016-09-14 14:21:24 -07:00
|
|
|
|
2016-08-16 16:52:16 -07:00
|
|
|
plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so")
|
|
|
|
self.client = ctypes.cdll.LoadLibrary(plasma_client_library)
|
2016-08-16 15:38:45 -07:00
|
|
|
|
2016-10-18 18:20:59 -07:00
|
|
|
self.client.plasma_connect.restype = ctypes.c_void_p
|
2016-09-05 15:34:11 -07:00
|
|
|
self.client.plasma_create.restype = None
|
|
|
|
self.client.plasma_get.restype = None
|
2016-10-21 00:47:34 -07:00
|
|
|
self.client.plasma_release.restype = None
|
2016-09-23 15:07:50 -07:00
|
|
|
self.client.plasma_contains.restype = None
|
2016-08-16 15:38:45 -07:00
|
|
|
self.client.plasma_seal.restype = None
|
2016-09-23 15:07:50 -07:00
|
|
|
self.client.plasma_delete.restype = None
|
2016-10-14 19:27:17 -07:00
|
|
|
self.client.plasma_subscribe.restype = ctypes.c_int
|
2016-08-16 15:38:45 -07:00
|
|
|
|
|
|
|
self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory
|
|
|
|
self.buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64]
|
|
|
|
self.buffer_from_memory.restype = ctypes.py_object
|
|
|
|
|
|
|
|
self.buffer_from_read_write_memory = ctypes.pythonapi.PyBuffer_FromReadWriteMemory
|
|
|
|
self.buffer_from_read_write_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64]
|
|
|
|
self.buffer_from_read_write_memory.restype = ctypes.py_object
|
|
|
|
|
2016-10-28 21:26:54 -07:00
|
|
|
if manager_socket_name is not None:
|
2016-10-18 18:20:59 -07:00
|
|
|
self.has_manager_conn = True
|
2016-10-28 21:26:54 -07:00
|
|
|
self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, manager_socket_name))
|
2016-09-05 15:34:11 -07:00
|
|
|
else:
|
2016-10-18 18:20:59 -07:00
|
|
|
self.has_manager_conn = False
|
2016-10-28 21:26:54 -07:00
|
|
|
self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, None))
|
2016-09-05 15:34:11 -07:00
|
|
|
|
2016-09-14 14:20:34 -07:00
|
|
|
def create(self, object_id, size, metadata=None):
|
2016-08-18 09:56:20 -07:00
|
|
|
"""Create a new buffer in the PlasmaStore for a particular object ID.
|
|
|
|
|
|
|
|
The returned buffer is mutable until seal is called.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
object_id (str): A string used to identify an object.
|
|
|
|
size (int): The size in bytes of the created buffer.
|
2016-09-14 14:20:34 -07:00
|
|
|
metadata (buffer): An optional buffer encoding whatever metadata the user
|
|
|
|
wishes to encode.
|
2016-08-18 09:56:20 -07:00
|
|
|
"""
|
2016-09-14 14:20:34 -07:00
|
|
|
# This is used to hold the address of the buffer.
|
2016-09-05 15:34:11 -07:00
|
|
|
data = ctypes.c_void_p()
|
2016-09-14 14:20:34 -07:00
|
|
|
# Turn the metadata into the right type.
|
|
|
|
metadata = buffer("") if metadata is None else metadata
|
|
|
|
metadata = (ctypes.c_ubyte * len(metadata)).from_buffer_copy(metadata)
|
2016-10-18 18:20:59 -07:00
|
|
|
self.client.plasma_create(self.plasma_conn, make_plasma_id(object_id), size, ctypes.cast(metadata, ctypes.POINTER(ctypes.c_ubyte * len(metadata))), len(metadata), ctypes.byref(data))
|
2016-10-21 00:47:34 -07:00
|
|
|
return PlasmaBuffer(self.buffer_from_read_write_memory(data, size), make_plasma_id(object_id), self)
|
2016-08-16 15:38:45 -07:00
|
|
|
|
|
|
|
def get(self, object_id):
|
2016-08-18 09:56:20 -07:00
|
|
|
"""Create a buffer from the PlasmaStore based on object ID.
|
|
|
|
|
2016-09-14 14:20:34 -07:00
|
|
|
If the object has not been sealed yet, this call will block. The retrieved
|
|
|
|
buffer is immutable.
|
2016-08-18 09:56:20 -07:00
|
|
|
|
|
|
|
Args:
|
|
|
|
object_id (str): A string used to identify an object.
|
|
|
|
"""
|
2016-09-05 15:34:11 -07:00
|
|
|
size = ctypes.c_int64()
|
|
|
|
data = ctypes.c_void_p()
|
2016-09-14 14:20:34 -07:00
|
|
|
metadata_size = ctypes.c_int64()
|
|
|
|
metadata = ctypes.c_void_p()
|
2016-10-21 00:47:34 -07:00
|
|
|
self.client.plasma_get(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata))
|
|
|
|
return PlasmaBuffer(self.buffer_from_memory(data, size), make_plasma_id(object_id), self)
|
2016-08-16 16:52:16 -07:00
|
|
|
|
2016-09-14 14:20:34 -07:00
|
|
|
def get_metadata(self, object_id):
|
|
|
|
"""Create a buffer from the PlasmaStore based on object ID.
|
|
|
|
|
|
|
|
If the object has not been sealed yet, this call will block until the object
|
|
|
|
has been sealed. The retrieved buffer is immutable.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
object_id (str): A string used to identify an object.
|
|
|
|
"""
|
|
|
|
size = ctypes.c_int64()
|
|
|
|
data = ctypes.c_void_p()
|
|
|
|
metadata_size = ctypes.c_int64()
|
|
|
|
metadata = ctypes.c_void_p()
|
2016-10-21 00:47:34 -07:00
|
|
|
self.client.plasma_get(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata))
|
|
|
|
return PlasmaBuffer(self.buffer_from_memory(metadata, metadata_size), make_plasma_id(object_id), self)
|
2016-09-14 14:20:34 -07:00
|
|
|
|
2016-09-23 15:07:50 -07:00
|
|
|
def contains(self, object_id):
|
|
|
|
"""Check if the object is present and has been sealed in the PlasmaStore.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
object_id (str): A string used to identify an object.
|
|
|
|
"""
|
|
|
|
has_object = ctypes.c_int()
|
2016-10-18 18:20:59 -07:00
|
|
|
self.client.plasma_contains(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(has_object))
|
2016-09-23 15:07:50 -07:00
|
|
|
has_object = has_object.value
|
|
|
|
if has_object == 1:
|
|
|
|
return True
|
|
|
|
elif has_object == 0:
|
|
|
|
return False
|
|
|
|
else:
|
|
|
|
raise Exception("This code should be unreachable.")
|
|
|
|
|
2016-08-16 15:38:45 -07:00
|
|
|
def seal(self, object_id):
|
2016-08-18 09:56:20 -07:00
|
|
|
"""Seal the buffer in the PlasmaStore for a particular object ID.
|
|
|
|
|
|
|
|
Once a buffer has been sealed, the buffer is immutable and can only be
|
|
|
|
accessed through get.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
object_id (str): A string used to identify an object.
|
|
|
|
"""
|
2016-10-18 18:20:59 -07:00
|
|
|
self.client.plasma_seal(self.plasma_conn, make_plasma_id(object_id))
|
2016-09-05 15:34:11 -07:00
|
|
|
|
2016-09-23 15:07:50 -07:00
|
|
|
def delete(self, object_id):
|
|
|
|
"""Delete the buffer in the PlasmaStore for a particular object ID.
|
|
|
|
|
|
|
|
Once a buffer has been deleted, the buffer is no longer accessible.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
object_id (str): A string used to identify an object.
|
|
|
|
"""
|
2016-10-18 18:20:59 -07:00
|
|
|
self.client.plasma_delete(self.plasma_conn, make_plasma_id(object_id))
|
2016-09-23 15:07:50 -07:00
|
|
|
|
2016-09-05 15:34:11 -07:00
|
|
|
def transfer(self, addr, port, object_id):
|
|
|
|
"""Transfer local object with id object_id to another plasma instance
|
2016-09-14 14:20:34 -07:00
|
|
|
|
2016-09-05 15:34:11 -07:00
|
|
|
Args:
|
|
|
|
addr (str): IPv4 address of the plasma instance the object is sent to.
|
|
|
|
port (int): Port number of the plasma instance the object is sent to.
|
|
|
|
object_id (str): A string used to identify an object.
|
|
|
|
"""
|
2016-10-18 18:20:59 -07:00
|
|
|
if not self.has_manager_conn:
|
|
|
|
raise Exception("Not connected to the plasma manager socket")
|
|
|
|
self.client.plasma_transfer(self.plasma_conn, addr, port, make_plasma_id(object_id))
|
|
|
|
|
|
|
|
def fetch(self, object_ids):
|
|
|
|
"""Fetch the object with id object_id from another plasma manager instance.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
object_id (str): A string used to identify an object.
|
|
|
|
"""
|
|
|
|
object_id_array = (len(object_ids) * PlasmaID)()
|
|
|
|
for i, object_id in enumerate(object_ids):
|
|
|
|
object_id_array[i] = make_plasma_id(object_id)
|
|
|
|
success_array = (len(object_ids) * ctypes.c_int)()
|
|
|
|
if not self.has_manager_conn:
|
2016-09-05 15:34:11 -07:00
|
|
|
raise Exception("Not connected to the plasma manager socket")
|
2016-10-18 18:20:59 -07:00
|
|
|
self.client.plasma_fetch(self.plasma_conn,
|
|
|
|
object_id_array._length_,
|
|
|
|
object_id_array,
|
|
|
|
success_array);
|
|
|
|
return [bool(success) for success in success_array]
|
2016-10-14 19:27:17 -07:00
|
|
|
|
|
|
|
def subscribe(self):
|
|
|
|
"""Subscribe to notifications about sealed objects."""
|
2016-10-18 18:20:59 -07:00
|
|
|
fd = self.client.plasma_subscribe(self.plasma_conn)
|
2016-10-14 19:27:17 -07:00
|
|
|
self.notification_sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
# Make the socket non-blocking.
|
|
|
|
self.notification_sock.setblocking(0)
|
|
|
|
|
|
|
|
def get_next_notification(self):
|
|
|
|
"""Get the next notification from the notification socket."""
|
|
|
|
if not self.notification_sock:
|
|
|
|
raise Exception("To get notifications, first call subscribe.")
|
|
|
|
# Loop until we've read PLASMA_ID_SIZE bytes from the socket.
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
message_data = self.notification_sock.recv(PLASMA_ID_SIZE)
|
|
|
|
except socket.error:
|
|
|
|
time.sleep(0.001)
|
|
|
|
else:
|
|
|
|
assert len(message_data) == PLASMA_ID_SIZE
|
|
|
|
break
|
|
|
|
return message_data
|