Fix Experimental Async API (#7391)

ijrsvt 2020-03-02 20:24:20 -08:00 committed by GitHub
parent 580b017b43
commit 584645cc7d
3 changed files with 27 additions and 28 deletions


@@ -332,7 +332,7 @@ script:
   - cd ../..
   # random python tests TODO(ekl): these should be moved to bazel
-  # - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi
+  - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi
   # bazel python tests. This should be run last to keep its logs at the end of travis logs.
   - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors --test_tag_filters=-jenkins_only python/ray/tests/...; fi
@@ -341,7 +341,7 @@ script:
 deploy:
   - provider: s3
     edge: true # This supposedly opts in to deploy v2.
     access_key_id: AKIAU6DMUCJUFL3EX3SM
     secret_access_key:
       secure: J1sX71fKFPQhgWzColllxfzcF877ScBZ1cIl71krZ6SO0LKnwsCScpQck5eZOyQo/Iverwye0iKtE87qNsiRi3+V2D9iulSr18T09j7+FjPKfxAmXmjfrNafoMXTDQroSJblCri5vl+DysISPqImJkWTNaYhGJ9QakoSd5djnAopLNWj6PCR3S50baS49+nB5nSIY3jMhtUzlaBdniFPFC81Cxyuafr4pv6McGRfR/dK+ZnPhdGtMnVeIJXB+ooZKQ26mDJKBPka4jm3u1Oa72b/Atu2RO3MwxTg79LTrMxXKh2OcCqhtD2Z3lz1OltvNSunCuwY8AejCJsfSLbM9mGDoz+xhNUWmYNy48YFf+61OY8PXi8S/9Q817yb3GpLbb2l/P+KMgq9eSEiELIOwuYsDxPX5TuAg6dx0wCNgDEBJoThSQjYl6MgJrLrs7p+JBxp3giedHiy0TLa5hCVKTj3euONAXDArYnnT+DvUIOkaeTk5DClRZbZ0sUXhLy//HuT5WJvjFBJJZ0u0f4RLVb5D7DI4uMZr7+yJPDR2AXCyW9YMaBEbmEYbPaKi283jlEyn7R33+AZlnXv0THHwZ4xvjKKG3/fBSXsOUmv5wmUveEqVGDj1mKPGj9NF8iA5qMm2AaZuJpEEBVBZtSlTZt6ZG7rzAJZGNL52t7xuMo=
@@ -357,7 +357,7 @@ deploy:
       condition: $LINUX_WHEELS = 1 || $MAC_WHEELS = 1
   - provider: s3
     edge: true # This supposedly opts in to deploy v2.
     access_key_id: AKIAU6DMUCJUFL3EX3SM
     secret_access_key:
       secure: J1sX71fKFPQhgWzColllxfzcF877ScBZ1cIl71krZ6SO0LKnwsCScpQck5eZOyQo/Iverwye0iKtE87qNsiRi3+V2D9iulSr18T09j7+FjPKfxAmXmjfrNafoMXTDQroSJblCri5vl+DysISPqImJkWTNaYhGJ9QakoSd5djnAopLNWj6PCR3S50baS49+nB5nSIY3jMhtUzlaBdniFPFC81Cxyuafr4pv6McGRfR/dK+ZnPhdGtMnVeIJXB+ooZKQ26mDJKBPka4jm3u1Oa72b/Atu2RO3MwxTg79LTrMxXKh2OcCqhtD2Z3lz1OltvNSunCuwY8AejCJsfSLbM9mGDoz+xhNUWmYNy48YFf+61OY8PXi8S/9Q817yb3GpLbb2l/P+KMgq9eSEiELIOwuYsDxPX5TuAg6dx0wCNgDEBJoThSQjYl6MgJrLrs7p+JBxp3giedHiy0TLa5hCVKTj3euONAXDArYnnT+DvUIOkaeTk5DClRZbZ0sUXhLy//HuT5WJvjFBJJZ0u0f4RLVb5D7DI4uMZr7+yJPDR2AXCyW9YMaBEbmEYbPaKi283jlEyn7R33+AZlnXv0THHwZ4xvjKKG3/fBSXsOUmv5wmUveEqVGDj1mKPGj9NF8iA5qMm2AaZuJpEEBVBZtSlTZt6ZG7rzAJZGNL52t7xuMo=
@@ -373,7 +373,7 @@ deploy:
       condition: $LINUX_WHEELS = 1 || $MAC_WHEELS = 1
   - provider: script
     edge: true # This supposedly opts in to deploy v2.
     script: bash $TRAVIS_BUILD_DIR/ci/travis/build-autoscaler-images.sh || true
     skip_cleanup: true
     on:


@@ -10,6 +10,23 @@ class PlasmaObjectFuture(asyncio.Future):
     pass


+def _complete_future(event_handler, ray_object_id):
+    # TODO(ilr): Consider race condition between popping from the
+    # waiting_dict and as_future appending to the waiting_dict's list.
+    logger.debug(
+        "Completing plasma futures for object id {}".format(ray_object_id))
+    obj = event_handler._worker.get_objects([ray_object_id], timeout=0)[0]
+    futures = event_handler._waiting_dict.pop(ray_object_id)
+    for fut in futures:
+        try:
+            fut.set_result(obj)
+        except asyncio.InvalidStateError:
+            # Avoid issues where process_notifications
+            # and check_immediately both get executed
+            logger.debug("Failed to set result for future {}."
+                         "Most likely already set.".format(fut))
+
+
 class PlasmaEventHandler:
     """This class is an event handler for Plasma."""
@@ -23,7 +40,10 @@ class PlasmaEventHandler:
         """Process notifications."""
         for object_id, object_size, metadata_size in messages:
             if object_size > 0 and object_id in self._waiting_dict:
-                self._complete_future(object_id)
+                # This must be asynchronous to allow objects to be locally
+                # received
+                self._loop.call_soon_threadsafe(_complete_future, self,
+                                                object_id)

     def close(self):
         """Clean up this handler."""
@@ -31,32 +51,10 @@ class PlasmaEventHandler:
         for fut in futures:
             fut.cancel()

-    def _complete_future(self, ray_object_id):
-        # TODO(ilr): Consider race condition between popping from the
-        # waiting_dict and as_future appending to the waiting_dict's list.
-        logger.debug(
-            "Completing plasma futures for object id {}".format(ray_object_id))
-        obj = self._worker.get_objects([ray_object_id])[0]
-        futures = self._waiting_dict.pop(ray_object_id)
-        for fut in futures:
-            loop = fut._loop
-
-            def complete_closure():
-                try:
-                    fut.set_result(obj)
-                except asyncio.InvalidStateError:
-                    # Avoid issues where process_notifications
-                    # and check_ready both get executed
-                    logger.debug("Failed to set result for future {}."
-                                 "Most likely already set.".format(fut))
-
-            loop.call_soon_threadsafe(complete_closure)
-
     def check_immediately(self, object_id):
         ready, _ = ray.wait([object_id], timeout=0)
         if ready:
-            self._complete_future(object_id)
+            _complete_future(self, object_id)

     def as_future(self, object_id, check_ready=True):
         """Turn an object_id into a Future object.


@@ -1399,6 +1399,7 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c
 void CoreWorker::SubscribeToAsyncPlasma(PlasmaSubscriptionCallback subscribe_callback) {
   plasma_notifier_->SubscribeObjAdded(
       [subscribe_callback](const object_manager::protocol::ObjectInfoT &info) {
+        // This callback must be asynchronous to allow plasma to receive objects
        subscribe_callback(ObjectID::FromPlasmaIdBinary(info.object_id), info.data_size,
                           info.metadata_size);
      });
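Both completion paths in the Python handler can fire for the same object: check_immediately may find the object already in plasma while a notification for it is also delivered through the subscription callback above. An asyncio Future can only be completed once, which is why _complete_future swallows InvalidStateError. A small standalone illustration of that behaviour (plain asyncio, not Ray code; the string values are placeholders):

import asyncio

loop = asyncio.new_event_loop()
fut = loop.create_future()

# First completion path (like check_immediately) sets the result.
fut.set_result("from-immediate-check")

# A second path (like a later plasma notification) tries again; a Future
# can only be completed once, hence the guard in _complete_future.
try:
    fut.set_result("from-notification")
except asyncio.InvalidStateError:
    print("result already set; second completion ignored")

print(fut.result())
loop.close()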