[tune] Remove hack to serve pin requests off thread (#2680)

* nopin

* fix
This commit is contained in:
Eric Liang 2018-08-18 13:19:52 -07:00 committed by GitHub
parent aaf5456b3d
commit e56eb354eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 0 additions and 30 deletions

View file

@ -9,7 +9,6 @@ import traceback
from ray.tune import TuneError
from ray.tune.trainable import Trainable
from ray.tune.result import TIMESTEPS_TOTAL
from ray.tune.util import _serve_get_pin_requests
class StatusReporter(object):
@ -108,7 +107,6 @@ class FunctionRunner(Trainable):
self._default_config["script_min_iter_time_s"]))
result = self._status_reporter._get_and_clear_status()
while result is None:
_serve_get_pin_requests()
time.sleep(1)
result = self._status_reporter._get_and_clear_status()

View file

@ -2,15 +2,12 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from six.moves import queue
import base64
import numpy as np
import threading
import ray
_pinned_objects = []
_fetch_requests = queue.Queue()
PINNED_OBJECT_PREFIX = "ray.tune.PinnedObject:"
@ -33,36 +30,11 @@ def get_pinned_object(pinned_id):
from ray.local_scheduler import ObjectID
if threading.current_thread().getName() != "MainThread":
placeholder = queue.Queue()
_fetch_requests.put((placeholder, pinned_id))
print("Requesting main thread to fetch pinned object", pinned_id)
return placeholder.get()
return _from_pinnable(
ray.get(
ObjectID(base64.b64decode(pinned_id[len(PINNED_OBJECT_PREFIX):]))))
def _serve_get_pin_requests():
"""This is hack to avoid ray.get() on the function runner thread.
The issue is that we run trainable functions on a separate thread,
which cannot access Ray API methods. So instead, that thread puts the
fetch in a queue that is periodically checked from the main thread.
"""
assert threading.current_thread().getName() == "MainThread"
try:
while not _fetch_requests.empty():
(placeholder, pinned_id) = _fetch_requests.get_nowait()
print("Fetching pinned object from main thread", pinned_id)
placeholder.put(get_pinned_object(pinned_id))
except queue.Empty:
pass
def _to_pinnable(obj):
"""Converts obj to a form that can be pinned in object store memory.