Run core worker tests in thread sanitizer and fix thread safety issues (#6701)

Author: Philipp Moritz, 2020-01-05 16:18:21 -08:00 (committed by GitHub)
parent cc110ff1e3
commit e15bd8ff1a
6 changed files with 81 additions and 23 deletions

.bazelrc

@@ -29,3 +29,11 @@ build --http_timeout_scaling=5.0
 # This workaround is due to an incompatibility of
 # bazel_common/tools/maven/pom_file.bzl with Bazel 1.0
 build --incompatible_depset_is_not_iterable=false
+
+# Thread sanitizer configuration:
+build:tsan --strip=never
+build:tsan --copt -fsanitize=thread
+build:tsan --copt -DTHREAD_SANITIZER
+build:tsan --copt -g
+build:tsan --copt -fno-omit-frame-pointer
+build:tsan --linkopt -fsanitize=thread
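
For context, a minimal C++ sketch (not part of this commit; file name hypothetical) of the kind of bug these flags target. Built with the new `--config=tsan` (or plain `clang++ -fsanitize=thread -g`), ThreadSanitizer reports the unsynchronized read and write of `counter`:

    // race_example.cc (hypothetical): a textbook data race for ThreadSanitizer.
    #include <iostream>
    #include <thread>

    int counter = 0;  // Shared state with no mutex or atomic protection.

    int main() {
      std::thread writer([] { counter = 1; });  // Unsynchronized write...
      std::cout << counter << std::endl;        // ...races with this read.
      writer.join();
      return 0;
    }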

.travis.yml

@@ -80,6 +80,22 @@ matrix:
       - go get github.com/bazelbuild/buildtools/buildifier
       - ./ci/travis/bazel-format.sh
+
+  - os: linux
+    env: SANITIZER=1 CC=clang PYTHON=3.5 PYTHONWARNINGS=ignore
+    install:
+      - eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py`
+      - if [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi
+
+      - ./ci/suppress_output ./ci/travis/install-bazel.sh
+      - ./ci/suppress_output ./ci/travis/install-dependencies.sh
+      - export PATH="$HOME/miniconda/bin:$PATH"
+      - ./ci/suppress_output ./ci/travis/install-ray.sh
+
+    script:
+      # Run core worker tests with thread sanitizer
+      - RAY_BAZEL_CONFIG="--config=tsan" TSAN_OPTIONS="report_atomic_races=0" ./ci/suppress_output bash src/ray/test/run_core_worker_tests.sh
+
   # Build Linux wheels.
   - os: linux
     env: LINUX_WHEELS=1 PYTHONWARNINGS=ignore

src/ray/core_worker/actor_handle.cc

@@ -44,7 +44,7 @@ ActorHandle::ActorHandle(const std::string &serialized)
 void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder,
                                    const TaskTransportType transport_type,
                                    const ObjectID new_cursor) {
-  std::unique_lock<std::mutex> guard(mutex_);
+  absl::MutexLock guard(&mutex_);
   // Build actor task spec.
   const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(GetActorID());
   const ObjectID actor_creation_dummy_object_id =
@@ -59,7 +59,7 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder,
 void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); }

 void ActorHandle::Reset() {
-  std::unique_lock<std::mutex> guard(mutex_);
+  absl::MutexLock guard(&mutex_);
   task_counter_ = 0;
   actor_cursor_ = ObjectID::FromBinary(inner_.actor_cursor());
 }
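
The change above swaps `std::unique_lock<std::mutex>` for `absl::MutexLock`, pairing with the `absl::Mutex` field introduced in the header below; both are RAII guards that lock in the constructor and unlock when the scope exits. A minimal standalone sketch of the pattern (class and names are hypothetical, not from the Ray codebase):

    // counter_example.cc (hypothetical): the RAII locking pattern used above.
    #include <cstdint>

    #include "absl/synchronization/mutex.h"

    class TaskCounter {
     public:
      void Reset() {
        absl::MutexLock guard(&mutex_);  // Locks mutex_; unlocks at end of scope.
        count_ = 0;
      }

      uint64_t Next() {
        absl::MutexLock guard(&mutex_);
        return ++count_;
      }

     private:
      absl::Mutex mutex_;
      uint64_t count_ = 0;
    };

    int main() {
      TaskCounter counter;
      counter.Reset();
      return counter.Next() == 1 ? 0 : 1;  // Exits 0: Next() returns 1 after Reset().
    }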

src/ray/core_worker/actor_handle.h

@@ -54,10 +54,16 @@ class ActorHandle {
   void Reset();

   // Mark the actor handle as dead.
-  void MarkDead() { state_ = rpc::ActorTableData::DEAD; }
+  void MarkDead() {
+    absl::MutexLock lock(&mutex_);
+    state_ = rpc::ActorTableData::DEAD;
+  }

   // Returns whether the actor is known to be dead.
-  bool IsDead() const { return state_ == rpc::ActorTableData::DEAD; }
+  bool IsDead() const {
+    absl::MutexLock lock(&mutex_);
+    return state_ == rpc::ActorTableData::DEAD;
+  }

  private:
   // Protobuf-defined persistent state of the actor handle.
@@ -65,18 +71,18 @@ class ActorHandle {
   /// The actor's state (alive or dead). This defaults to ALIVE. Once marked
   /// DEAD, the actor handle can never go back to being ALIVE.
-  rpc::ActorTableData::ActorState state_ = rpc::ActorTableData::ALIVE;
+  rpc::ActorTableData::ActorState state_ GUARDED_BY(mutex_) = rpc::ActorTableData::ALIVE;

   /// The unique id of the dummy object returned by the previous task.
   /// TODO: This can be removed once we schedule actor tasks by task counter
   /// only.
   // TODO: Save this state in the core worker.
-  ObjectID actor_cursor_;
+  ObjectID actor_cursor_ GUARDED_BY(mutex_);

   // Number of tasks that have been submitted on this handle.
-  uint64_t task_counter_ = 0;
+  uint64_t task_counter_ GUARDED_BY(mutex_) = 0;

-  /// Guards actor_cursor_ and task_counter_.
-  std::mutex mutex_;
+  /// Mutex to protect fields in the actor handle.
+  mutable absl::Mutex mutex_;

   FRIEND_TEST(ZeroNodeTest, TestActorHandle);
 };
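
`GUARDED_BY` (from Abseil's thread-safety annotations) lets Clang's `-Wthread-safety` analysis check at compile time that the annotated fields are only touched with `mutex_` held, and `mutable` lets the const `IsDead()` still acquire the lock. A hypothetical sketch of the annotation catching an unguarded read:

    // flag_example.cc (hypothetical): compile with `clang++ -Wthread-safety -c`
    // to see the analysis fire. Older Abseil spells the macro GUARDED_BY, as in
    // the diff above; newer releases use ABSL_GUARDED_BY.
    #include "absl/base/thread_annotations.h"
    #include "absl/synchronization/mutex.h"

    class DeadFlag {
     public:
      void MarkDead() {
        absl::MutexLock lock(&mutex_);
        dead_ = true;  // OK: mutex_ is held.
      }

      bool IsDead() const {
        return dead_;  // warning: reading variable 'dead_' requires holding mutex 'mutex_'
      }

     private:
      mutable absl::Mutex mutex_;  // mutable so const methods can lock it.
      bool dead_ GUARDED_BY(mutex_) = false;
    };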

src/ray/rpc/client_call.h

@@ -3,6 +3,7 @@
 #include <grpcpp/grpcpp.h>
 #include <boost/asio.hpp>

+#include "absl/synchronization/mutex.h"
 #include "ray/common/grpc_util.h"
 #include "ray/common/status.h"
@@ -22,6 +23,8 @@ class ClientCall {
   virtual void OnReplyReceived() = 0;
   /// Return status.
   virtual ray::Status GetStatus() = 0;
+  /// Set return status.
+  virtual void SetReturnStatus() = 0;

   virtual ~ClientCall() = default;
 };
@@ -46,11 +49,24 @@ class ClientCallImpl : public ClientCall {
   /// \param[in] callback The callback function to handle the reply.
   explicit ClientCallImpl(const ClientCallback<Reply> &callback) : callback_(callback) {}

-  Status GetStatus() override { return GrpcStatusToRayStatus(status_); }
+  Status GetStatus() override {
+    absl::MutexLock lock(&mutex_);
+    return return_status_;
+  }
+
+  void SetReturnStatus() override {
+    absl::MutexLock lock(&mutex_);
+    return_status_ = GrpcStatusToRayStatus(status_);
+  }

   void OnReplyReceived() override {
+    ray::Status status;
+    {
+      absl::MutexLock lock(&mutex_);
+      status = return_status_;
+    }
     if (callback_ != nullptr) {
-      callback_(GrpcStatusToRayStatus(status_), reply_);
+      callback_(status, reply_);
     }
   }
@@ -67,6 +83,16 @@ class ClientCallImpl : public ClientCall {
   /// gRPC status of this request.
   grpc::Status status_;

+  /// Mutex to protect the return_status_ field.
+  absl::Mutex mutex_;
+
+  /// The status to be returned from GetStatus(). It is safe for other
+  /// threads to read it while holding mutex_. It mirrors
+  /// return_status_ = GrpcStatusToRayStatus(status_), but a separate
+  /// variable is needed because status_ is set internally by gRPC and
+  /// that write cannot be made to happen under the lock.
+  ray::Status return_status_ GUARDED_BY(mutex_);
+
   /// Context for the client. It could be used to convey extra information to
   /// the server and/or tweak certain RPC behaviors.
   grpc::ClientContext context_;
@@ -205,6 +231,7 @@ class ClientCallManager {
         break;
       } else if (status != grpc::CompletionQueue::TIMEOUT) {
         auto tag = reinterpret_cast<ClientCallTag *>(got_tag);
+        tag->GetCall()->SetReturnStatus();
         if (ok && !main_service_.stopped() && !shutdown_) {
           // Post the callback to the main event loop.
           main_service_.post([tag]() {
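
The underlying race: `status_` is written by gRPC when the request completes on the completion-queue thread, while the callback that used to read it runs later on the main event loop. The fix snapshots it into `return_status_` under `mutex_` on the polling thread (`SetReturnStatus()` above), and every reader copies the snapshot out under the same lock before using it. A hypothetical, self-contained sketch of that publish/copy-out pattern:

    // snapshot_example.cc (hypothetical): publish a result under a mutex on one
    // thread, copy it out under the same mutex on another, use it unlocked.
    #include <iostream>
    #include <string>
    #include <thread>

    #include "absl/synchronization/mutex.h"

    class Call {
     public:
      // Polling thread: called once the transport has filled in raw_status_.
      void SetReturnStatus() {
        absl::MutexLock lock(&mutex_);
        return_status_ = raw_status_;  // Publish a stable copy.
      }

      // Any thread: copy the snapshot out, then use it without the lock.
      std::string GetStatus() const {
        absl::MutexLock lock(&mutex_);
        return return_status_;
      }

      std::string raw_status_;  // Written only by the transport, pre-publish.

     private:
      mutable absl::Mutex mutex_;
      std::string return_status_;
    };

    int main() {
      Call call;
      std::thread poller([&call] {
        call.raw_status_ = "OK";
        call.SetReturnStatus();
      });
      poller.join();
      std::cout << call.GetStatus() << std::endl;  // Prints "OK".
      return 0;
    }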

src/ray/test/run_core_worker_tests.sh

@@ -22,7 +22,7 @@ fi
 set -e
 set -x

-bazel build "//:core_worker_test" "//:mock_worker" "//:raylet" "//:raylet_monitor" "//:libray_redis_module.so" "@plasma//:plasma_store_server"
+bazel build -c dbg $RAY_BAZEL_CONFIG "//:core_worker_test" "//:mock_worker" "//:raylet" "//:raylet_monitor" "//:libray_redis_module.so" "@plasma//:plasma_store_server"

 # Get the directory in which this script is executing.
 SCRIPT_DIR="`dirname \"$0\"`"
@@ -39,24 +39,25 @@ if [ ! -d "$RAY_ROOT/python" ]; then
 fi

 REDIS_MODULE="./bazel-bin/libray_redis_module.so"
+BAZEL_BIN_PREFIX="$(bazel info -c dbg $RAY_BAZEL_CONFIG bazel-bin)"
 LOAD_MODULE_ARGS="--loadmodule ${REDIS_MODULE}"
-STORE_EXEC="./bazel-bin/external/plasma/plasma_store_server"
-RAYLET_EXEC="./bazel-bin/raylet"
-RAYLET_MONITOR_EXEC="./bazel-bin/raylet_monitor"
-MOCK_WORKER_EXEC="./bazel-bin/mock_worker"
+STORE_EXEC="$BAZEL_BIN_PREFIX/external/plasma/plasma_store_server"
+RAYLET_EXEC="$BAZEL_BIN_PREFIX/raylet"
+RAYLET_MONITOR_EXEC="$BAZEL_BIN_PREFIX/raylet_monitor"
+MOCK_WORKER_EXEC="$BAZEL_BIN_PREFIX/mock_worker"

 # Allow cleanup commands to fail.
-bazel run //:redis-cli -- -p 6379 shutdown || true
+bazel run "//:redis-cli" -- -p 6379 shutdown || true
 sleep 1s
-bazel run //:redis-cli -- -p 6380 shutdown || true
+bazel run "//:redis-cli" -- -p 6380 shutdown || true
 sleep 1s
-bazel run //:redis-server -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 &
+bazel run "//:redis-server" -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 &
 sleep 2s
-bazel run //:redis-server -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6380 &
+bazel run "//:redis-server" -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6380 &
 sleep 2s

 # Run tests.
-./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC $RAYLET_PORT $RAYLET_MONITOR_EXEC $MOCK_WORKER_EXEC
+bazel run -c dbg $RAY_BAZEL_CONFIG "//:core_worker_test" $STORE_EXEC $RAYLET_EXEC $RAYLET_PORT $RAYLET_MONITOR_EXEC $MOCK_WORKER_EXEC
 sleep 1s
-bazel run //:redis-cli -- -p 6379 shutdown
-bazel run //:redis-cli -- -p 6380 shutdown
+bazel run "//:redis-cli" -- -p 6379 shutdown
+bazel run "//:redis-cli" -- -p 6380 shutdown
 sleep 1s