Merge pull request #130 from amplab/memoryfix

Fix memory leaks for Python dicts in C extension
This commit is contained in:
Robert Nishihara 2016-06-20 12:06:19 -07:00 committed by GitHub
commit 0f9ce8a856
2 changed files with 40 additions and 7 deletions

View file

@ -183,6 +183,20 @@ void TaskCapsule_Destructor(PyObject* capsule) {
delete obj;
}
// Helper methods
// Pass ownership of both the key and the value to the PyDict.
// This is only required for PyDicts, not for PyLists or PyTuples, compare
// https://docs.python.org/2/c-api/dict.html
// https://docs.python.org/2/c-api/list.html
// https://docs.python.org/2/c-api/tuple.html
void set_dict_item_and_transfer_ownership(PyObject* dict, PyObject* key, PyObject* val) {
PyDict_SetItem(dict, key, val);
Py_XDECREF(key);
Py_XDECREF(val);
}
// Serialization
// serialize will serialize the python object val into the protocol buffer
@ -364,7 +378,9 @@ PyObject* deserialize(PyObject* worker_capsule, const Obj& obj, std::vector<ObjR
PyObject* dict = PyDict_New();
size_t size = data.elem_size();
for (size_t i = 0; i < size; ++i) {
PyDict_SetItem(dict, deserialize(worker_capsule, data.elem(i).key(), objrefs), deserialize(worker_capsule, data.elem(i).value(), objrefs));
PyObject* pykey = deserialize(worker_capsule, data.elem(i).key(), objrefs);
PyObject* pyval = deserialize(worker_capsule, data.elem(i).value(), objrefs);
set_dict_item_and_transfer_ownership(dict, pykey, pyval);
}
return dict;
} else if (obj.has_string_data()) {
@ -785,8 +801,8 @@ PyObject* scheduler_info(PyObject* self, PyObject* args) {
}
PyObject* dict = PyDict_New();
PyDict_SetItem(dict, PyString_FromString("target_objrefs"), target_objref_list);
PyDict_SetItem(dict, PyString_FromString("reference_counts"), reference_count_list);
set_dict_item_and_transfer_ownership(dict, PyString_FromString("target_objrefs"), target_objref_list);
set_dict_item_and_transfer_ownership(dict, PyString_FromString("reference_counts"), reference_count_list);
return dict;
}
@ -804,14 +820,14 @@ PyObject* task_info(PyObject* self, PyObject* args) {
for (size_t i = 0; i < reply.failed_task_size(); ++i) {
const TaskStatus& info = reply.failed_task(i);
PyObject* info_dict = PyDict_New();
PyDict_SetItem(info_dict, PyString_FromString("worker_address"), PyString_FromStringAndSize(info.worker_address().data(), info.worker_address().size()));
PyDict_SetItem(info_dict, PyString_FromString("operationid"), PyInt_FromLong(info.operationid()));
PyDict_SetItem(info_dict, PyString_FromString("error_message"), PyString_FromStringAndSize(info.error_message().data(), info.error_message().size()));
set_dict_item_and_transfer_ownership(info_dict, PyString_FromString("worker_address"), PyString_FromStringAndSize(info.worker_address().data(), info.worker_address().size()));
set_dict_item_and_transfer_ownership(info_dict, PyString_FromString("operationid"), PyInt_FromLong(info.operationid()));
set_dict_item_and_transfer_ownership(info_dict, PyString_FromString("error_message"), PyString_FromStringAndSize(info.error_message().data(), info.error_message().size()));
PyList_SetItem(failed_tasks_list, i, info_dict);
}
PyObject* dict = PyDict_New();
PyDict_SetItem(dict, PyString_FromString("failed_tasks"), failed_tasks_list);
set_dict_item_and_transfer_ownership(dict, PyString_FromString("failed_tasks"), failed_tasks_list);
return dict;
}

View file

@ -0,0 +1,17 @@
# This code reproduces a memory leak we had in the past
import os
import numpy as np
import ray
import ray.worker
import ray.services as services
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=worker_path)
d = {"w": np.zeros(1000000)}
obj_capsule, contained_objrefs = ray.lib.serialize_object(ray.worker.global_worker.handle, d)
while True:
ray.lib.deserialize_object(ray.worker.global_worker.handle, obj_capsule)