Why are these changes needed?
The current logic can cause resource leak if AllocateTaskResourceInstances is requested with the custom resources that don't exist in the local node. The original assumption was the caller will free resources when it returns false, but it is an error prone API, and it actually turns out that we don't do this anywhere.
Related issue number
Closes#17044
* Fix DDPG, since it is based on GenericOffPolicyTrainer.
* Fix QMix, SAC, and MADDPA too.
* Undo QMix change.
* Fix DQN input batch type. Always use SampleBatch.
* apex ddpg should not use replay_buffer_config yet.
* Make eager tf policy to use SampleBatch.
* lint
* LINT.
* Re-enable RLlib broken tests to make sure things work ok now.
* fixes.
Co-authored-by: sven1977 <svenmika1977@gmail.com>
* Revert "[CI] Remove config that disables Bazel test result cache (#18701)"
This reverts commit 098ff36faa.
* Remove all RLlib tests from BUILD that currently fail.
Co-authored-by: sven1977 <svenmika1977@gmail.com>
## Why are these changes needed?
We have concern that grpc based broadcasting might have negative impact on pg related workload. This test is to ensure it's running well before merging.
## Related issue number
#19438
This PR adds more infrastructure for subscribing to GCS via ray::pubsub instead of Redis.
Most important logic added are
GCS subscriber RPC interface in src/ray/protobuf/gcs_service.proto
GCS subscriber handler in src/ray/gcs/gcs_server/pubsub_handler.{h,cc}
GCS wrapper for ray::pubsub subscriber in src/ray/gcs/pubsub/gcs_pub_sub.{h,cc}
Other files are modified for adding boilerplates, plumbing, removing dead code and cleanups.
This PR can also be reviewed commit by commit. 418f065, 3279430 are cleanups. 028939c is a pure-refactoring of how GCS clients subscribe to GCS updates that should not change behavior yet, similar to [Pubsub] Wrap Redis-based publisher in GCS to allow incrementally switching to the GCS-based publisher #19600. 286161f parameterized gcs_server_test to test GCS pubsub. The rest of commits have new logic added.
All new logic are behind the gcs_grpc_based_pubsub flag, so this PR should not affect Ray's default behavior.
The added subscriber logic was tested by enabling gcs_grpc_based_pubsub in service_based_gcs_client_test.cc and adding basic handling logic for TaskLease. Since TaskLease pubsub will be removed, the change will not be checked in.
Next step is to support SubscribeAll entities for a channel in ray::pubsub, and test migrating more channels.
## Why are these changes needed?
Previously, we don't send requests if there is an in-flight request. But this is actually bad, because it prevent raylet get the latest information. For example, if the request needs 200ms to arrive at the raylet, the raylet will lose one update. In this case, the next request will arrive after 200 + 100 + (in flight time) ms. So we still should send the request.
TODO:
- Push the snapshot to raylet if the message is lost.
- Handle message loss in raylet better.
## Related issue number
#19438
* [RLlib] Unify the way we create and use LocalReplayBuffer for all the agents.
This change
1. Get rid of the try...except clause when we call execution_plan(),
and get rid of the Deprecation warning as a result.
2. Fix the execution_plan() call in Trainer._try_recover() too.
3. Most importantly, makes it much easier to create and use different types
of local replay buffers for all our agents.
E.g., allow us to easily create a reservoir sampling replay buffer for
APPO agent for Riot in the near future.
* Introduce explicit configuration for replay buffer types.
* Fix is_training key error.
* actually deprecate buffer_size field.
This RPC is from legacy code and not needed anymore (the task spec is already in the actor table), but it adds quite amount of keys to Redis.
The below is the sum of bytes size(? I am not sure if it is bytes size, but I grabbed the length of the value when I queried Redis) of each prefix when running many_ppo. As you can see Task& and Task takes a lot of part although they are not really used.
�[0m ��[12A�[9C�[?7h�[0m�[?12l�[?25h�[?25l�[?7l�[0mb�[?7h�[0m�[?12l�[?25h�[?25l�[?7l�[10D�[0m�[J�[0;38;5;28mIn [�[0;92;1m82�[0;38;5;28m]: �[0mb�[10D�[0m
�[J�[?7h�[0m�[?12l�[?25h�[?2004l�[0m�[?7h�[0;38;5;88mOut[�[0;91;1m82�[0;38;5;88m]: �[0m�[0m
defaultdict(int,
{b'WORKE': 1080864,
b'ACTOR': 1470931,
b'TASK&': 1020646,
b'TASK:': 870551,
b'PROFI': 360000,
b'PLACE': 10107,
b'JOB:\x01': 8,
b'JOB:\x04': 8,
b'NODE:': 99,
b'NODE_': 126,
b'INTER': 44,
b'JOB:\x03': 8,
b'redis': 16,
b'JOB:\x02': 8,
b'JOB:\x05': 8})