Commit graph

2658 commits

jon-chuang
3bc0858a4f
[Core/GCS] remove default 100 concurrent rate limit for heartbeat (#22613)
better scalability

Closes https://github.com/ray-project/ray/issues/20773
2022-02-28 15:26:05 -08:00
SangBin Cho
08374e8af4
Revert "[core] Fix bug in fusion for spilled objects (#22571)" (#22694)
Makes 2 tests flaky
2022-02-28 10:11:14 -08:00
Qing Wang
9572bb717f
[RuntimeEnv] Support setting actor level env vars for Java worker (#22240)
This PR supports setting actor-level env vars for Java workers in the runtime env.
The general API looks like:
```java
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder()
    .addEnvVar("KEY1", "A")
    .addEnvVar("KEY2", "B")
    .addEnvVar("KEY1", "C")  // This overwrites "KEY1" to "C"
    .build();

ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
```

If `num-java-workers-per-process` > 1, a worker process will only be reused by actors that have the same runtime env.

Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
2022-02-28 10:58:37 +08:00
Lingxuan Zuo
94caac8722
Remove exporting symbols (#22623)
To hide the symbols of third-party libraries, this pull request reuses the internal namespace, which can be imported by any external native project without side effects.

In addition, we suggest that contributors use third-party libraries within Ray scopes/namespaces and export only `ray::internal`.

More details in https://github.com/ray-project/ray/pull/22526

Mobius has applied this change in https://github.com/ray-project/mobius/pull/28.

Co-authored-by: 林濯 <lingxuan.zlx@antgroup.com>
2022-02-28 09:41:10 +08:00
Stephanie Wang
0da541bb71
[core] Fix bug in fusion for spilled objects (#22571)
Whenever we spill, we try to spill all spillable objects. We also try to fuse small objects together to reduce total IOPS. If there aren't enough objects in the object store to meet the fusion threshold, we spill the objects anyway to avoid liveness issues.

However, the current logic always spills once we reach the end of the spillable objects or once we've reached the fusion threshold. This can produce lots of unfused objects if they are created concurrently with the spill.

This PR changes the spill logic: once we reach the end of the spillable objects, if the last batch of spilled objects is under the fusion threshold, we'll only spill it if we don't have other spills pending too. This gives the pending spills time to finish, and then we can re-evaluate whether it's necessary to spill the remaining objects. Liveness is also preserved.
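A minimal sketch of the revised decision, with hypothetical names standing in for the C++ internals:
```python
from typing import Callable, List

def spill_pending_objects(
    sizes: List[int],           # sizes (bytes) of the spillable objects
    fusion_threshold: int,      # minimum bytes to fuse into one spill batch
    num_pending_spills: int,    # spill operations already in flight
    spill: Callable[[List[int]], None],
) -> None:
    batch: List[int] = []
    for size in sizes:
        batch.append(size)
        if sum(batch) >= fusion_threshold:
            spill(batch)  # the batch met the fusion threshold: spill it
            batch = []
    # Trailing under-threshold batch: spill it only if no other spill is
    # pending, giving in-flight spills time to finish before re-evaluating.
    # If nothing is pending, spill anyway to preserve liveness.
    if batch and num_pending_spills == 0:
        spill(batch)
```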
2022-02-25 13:24:05 -08:00
Stephanie Wang
634ca9afdb
[core] Cleanup handling for nondeterministic object size during transfer (#22639)
Currently object transfers assume that the object size is fixed. This is a bad assumption during failures, especially with lineage reconstruction enabled and tasks with nondeterministic outputs.

This PR cleans up the handling and hopefully guards against two cases where the object size may change during a transfer:
1. The object manager's size information does not match the object in the local plasma store (due to async notifications). --> the object manager overwrites its own information if it finds that the physical object has a different size.
2. The receiver's created buffer size does not match the sender's object size. --> the receiver destroys the previous buffer and creates a new buffer with the correct size. This might cause some transient errors but eventually object transfer should succeed.

Unfortunately I couldn't trigger this from Python because it depends on some pretty specific timing conditions. However, I did add some unit tests for case 2 (this is the majority of the PR).
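A rough sketch of the case-2 handling, with hypothetical names (the real logic lives in the C++ object manager):
```python
from typing import Callable, Dict

def get_receive_buffer(
    buffers: Dict[str, bytearray],   # object_id -> buffer created so far
    object_id: str,
    sender_size: int,                # object size reported by the sender
    allocate: Callable[[int], bytearray] = bytearray,
) -> bytearray:
    buf = buffers.get(object_id)
    if buf is not None and len(buf) != sender_size:
        # The sender's size no longer matches the created buffer: destroy it.
        # The in-flight transfer fails transiently and is retried.
        del buffers[object_id]
        buf = None
    if buf is None:
        buf = allocate(sender_size)
        buffers[object_id] = buf
    return buf
```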
2022-02-25 09:39:14 -08:00
xwjiang2010
d4a1bc7bc7
Revert "[runtime env] runtime env inheritance refactor (#22244)" (#22626)
Breaks train_torch_linear_test.py.
2022-02-25 08:42:30 -06:00
Chen Shen
89aaa79ee9
[resource scheduler] unify the GetBestSchedulableNode into one public method. (#22560)
* clean up cluster resource scheduler

* address comments

* always prioritize local node when spill back waiting tasks

* address comments
2022-02-25 01:09:21 -08:00
ZhuSenlin
5efeb6534b
[Core] Bug fix about FixedPoint (#22584)
* Fix FixedPoint::operator-(double const d)

* add unit test

* remove FixedPoint(uint32_t i)

Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
2022-02-24 15:44:21 -08:00
Chen Shen
03f3bc302c
[Scheduler] Fix string id map bug (#22586)
* preserve

* fix bug
2022-02-24 09:55:21 -08:00
Tao Wang
8906305ab8
[Tiny][Core]save memory copy for getting data in gcs storage (#22582)
When fetching a batch of data from Redis, we previously initialized local variables and then put them into a vector, which caused many copies from the stack to the heap (or from the locals into the vector).
This tiny change saves those copies.
2022-02-24 14:15:27 +08:00
Qing Wang
eb9960785b
[Core][Remove JVM FullGC 1/N] Add allocator to in-memory store. (#21250)
According to the description of #21218, this PR supports specifying a frontend-defined in-memory object allocator, so that we can allocate buffers from the JVM heap. This is the basic functionality for the follow-up PR #21441, which makes the JVM aware of the memory pressure of in-memory store objects.

Note that using a frontend-defined allocator may break zero-copy: JVM buffers live on the heap, so we must copy them to native memory when needed.

Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
2022-02-24 10:53:59 +08:00
Chris K. W
3371e78d2e
[client] Chunk PutRequests (#22327)
Why are these changes needed?
Data from PutRequests is chunked into 64MiB messages over the datastream, to avoid the 2GiB message size limit from gRPC. This will allow users to transfer objects larger than 2GiB over the network.

Proto changes
Put requests now have fields for chunk_id to identify which chunk data belongs to, total_chunks to identify the total number of chunks in the object, and total_size for total size in bytes of the object (useful for raising warnings).

PutObject is still unary-unary. The dataservicer handles reassembling the chunks before passing the result to the underlying RayletServicer.

Dataclient changes
If a put request is inserted into the request queue, self._requests will chunk it lazily. Doing this lazily is important, since inserting all of the chunks onto the request queue immediately would double the amount of memory needed to handle a large request. It also guarantees that the chunks of a given PutRequest are contiguous.

Dataservicer changes
The dataservicer now maintains some state to track received chunks. Once all chunks for a PutRequest are received, the combined chunks are passed to the RayletServicer.
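A rough sketch of the lazy chunking, with plain dicts standing in for the proto messages:
```python
from typing import Dict, Iterator, Union

CHUNK_SIZE = 64 * 1024 * 1024  # 64 MiB, well under gRPC's 2 GiB limit

def chunk_put_request(data: bytes) -> Iterator[Dict[str, Union[int, bytes]]]:
    total_chunks = max(1, -(-len(data) // CHUNK_SIZE))  # ceiling division
    for chunk_id in range(total_chunks):
        start = chunk_id * CHUNK_SIZE
        # Yielding keeps chunking lazy: the request queue never holds the
        # whole payload twice.
        yield {
            "chunk_id": chunk_id,
            "total_chunks": total_chunks,
            "total_size": len(data),
            "data": data[start:start + CHUNK_SIZE],
        }
```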
2022-02-23 18:21:25 +02:00
Qing Wang
bf5693e0b1
[Java] Remove GetGcsClient (#22542)
This PR removes GetGcsClient from core worker and gets necessary data in Java worker.
2022-02-23 03:41:32 -08:00
Lingxuan Zuo
46cb246d75
[Symbols]Exporting openceus for streaming outside (#22526)
OpenCensus symbols were exported in the Linux version of libcore_worker_library_java.so but deleted from ray_exported_symbols.lds, which makes the streaming macOS test case fail.
This PR adds the export record back and renames *ray*streaming* entries to *ray*internal*, a unified entry point to Ray C++.

Co-authored-by: 林濯 <lingxuan.zlx@antgroup.com>
2022-02-23 16:24:16 +08:00
Stephanie Wang
abf2a70a29
[core] Add task and object reconstruction status to ray memory (#22317)
Improve observability for general objects and lineage reconstruction by adding a "Status" field to `ray memory`. The value of the field can be:
```
  // The task is waiting for its dependencies to be created.
  WAITING_FOR_DEPENDENCIES = 1;
  // All dependencies have been created and the task is scheduled to execute.
  SCHEDULED = 2;
  // The task finished successfully.
  FINISHED = 3;
```

In addition, tasks that failed or that needed to be re-executed due to lineage reconstruction will have a field listing the attempt number. Example output:
```
IP Address    | PID      | Type    | Call Site | Status    | Size     | Reference Type | Object Ref
192.168.4.22  | 279475   | Driver  | (task call) ... | Attempt #2: FINISHED | 10000254.0 B | LOCAL_REFERENCE | c2668a65bda616c1ffffffffffffffffffffffff0100000001000000


```
2022-02-22 21:26:21 -08:00
Eric Liang
9261428004
Drop level of spammy log message (#22576) 2022-02-22 21:23:34 -08:00
Guyang Song
902243fb03
[runtime env] support raylet sharing fate with agent (#22382)
- Remove the agent restart feature. 
- Raylet shares fate with agent to make the failover logic easier.
Refer to issue https://github.com/ray-project/ray/issues/21695#issuecomment-1032161528
2022-02-21 18:16:21 +08:00
Guyang Song
5783cdb254
[runtime env] runtime env inheritance refactor (#22244)
Runtime Environments is already GA in Ray 1.6.0. The latest doc is [here](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#runtime-environments). And now, we already supported a [inheritance](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#inheritance) behavior as follows (copied from the doc):
- The runtime_env["env_vars"] field will be merged with the runtime_env["env_vars"] field of the parent. This allows for environment variables set in the parent’s runtime environment to be automatically propagated to the child, even if new environment variables are set in the child’s runtime environment.
- Every other field in the runtime_env will be overridden by the child, not merged. For example, if runtime_env["py_modules"] is specified, it will replace the runtime_env["py_modules"] field of the parent.

We think this runtime env merging logic is too complex and confusing, because users can't know the final runtime env before the job runs.

Current PR tries to do a refactor and change the behavior of Runtime Environments inheritance. Here is the new behavior:
- **If there is no runtime env option when we create actor, inherit the parent runtime env.**
- **Otherwise, use the optional runtime env directly and don't do the merging.**

This PR also adds a new API, `ray.runtime_env.get_current_runtime_env()`, to get the parent runtime env so you can modify the dict yourself. Note that `dict.update()` returns `None`, so build the updated dict before passing it:
```python
runtime_env = ray.runtime_env.get_current_runtime_env()
Actor.options(runtime_env={**runtime_env, "X": "Y"})
```
This new API can also be used in the Ray client.
2022-02-21 18:13:22 +08:00
Jiajun Yao
baa14d695a
Round robin during spread scheduling (#21303)
- Separate spread scheduling and default hybrid scheduling (i.e. SpreadScheduling != HybridScheduling(threshold=0)): they are already separated at the API layer and have different end goals, so it makes sense to separate their implementations and evolve them independently.
- Simple round robin for spread scheduling (see the sketch below): this is just a starting implementation and can be optimized later.
- Prefer not to spill back tasks that are waiting for args, since the pull is already in progress.
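A toy sketch of the round-robin pick, with hypothetical names:
```python
import itertools
from typing import Callable, List, Optional

def make_round_robin_picker(node_ids: List[str]):
    cycle = itertools.cycle(node_ids)
    def pick(is_feasible: Callable[[str], bool]) -> Optional[str]:
        # Resume from where the previous pick stopped and try each node once.
        for _ in range(len(node_ids)):
            node_id = next(cycle)
            if is_feasible(node_id):
                return node_id
        return None  # no feasible node: the caller keeps the task queued
    return pick
```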
2022-02-18 15:05:35 -08:00
Yi Cheng
95256181dd
[1][resource reporting] Remove redis based resource broadcasting. (#22463)
This flag has been turned on by default for almost 4 months. Delete the old code so that when refactoring, we don't need to take care of the legacy code path.
2022-02-18 14:09:37 -08:00
ZhuSenlin
3341fae573
[Core] remove unused method GcsResourceManager::UpdateResourceCapacity (#22462)
In the implementation of `GcsResourceManager::UpdateResourceCapacity`, `cluster_scheduling_resources_` is modified, but the method is only used in C++ unit tests, which easily causes confusion when reading the code. Since it can be completely replaced by `GcsResourceManager::OnNodeAdd`, just remove it.

Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
2022-02-18 13:35:47 +08:00
ZhuSenlin
15cccd0286
[Core] Fix null pointer crash when GcsResourceManager::SetAvailableResources (#22459)
* fix null pointer crash when GcsResourceManager::SetAvailableResources

* add warning log when node does not exist

* add unit test

Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
2022-02-17 17:18:30 -08:00
Chen Shen
ab53848dfc
[refactor cluster-task-manage 4/n] refactor cluster_task_manager into distributed and local part (#21660)
This is a work-in-progress PR that splits cluster_task_manager into local and distributed parts.

For the distributed scheduler (cluster_task_manager_):
/// Schedules a task onto one node of the cluster. The logic is as follows:
/// 1. Queue tasks for scheduling.
/// 2. Pick a node on the cluster which has the available resources to run a
/// task.
/// * Step 2 should occur any time the state of the cluster is
/// changed, or a new task is queued.
/// 3. For tasks that are infeasible, put them into the infeasible queue and
/// report them to the GCS, where the autoscaler will be notified and start
/// new nodes to accommodate the requirement.

For the local task manager:

/// It manages the lifetime of a task on the local node. It receives requests
/// from the cluster_task_manager (the distributed scheduler) and performs the
/// following steps:
/// 1. Pull the task's dependencies and add the task to the to_dispatch queue.
/// 2. Once the task's dependencies are all pulled locally, the task becomes
/// ready to dispatch.
/// 3. For all tasks that are dispatch-ready, schedule them by acquiring
/// local resources (including pinning the objects in memory and deducting
/// CPU/GPU and other resources from the local resource manager), and start
/// a worker.
/// 4. If a task fails to acquire resources in step 3, try to
/// spill it to a different remote node.
/// 5. When a worker finishes executing its task(s), the requester returns
/// it and we release the resources in our view of the node's state.
/// 6. If a task has been waiting for its arguments for too long, it will also
/// be spilled back to a different node.
///
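A toy sketch of the resulting split, with all names hypothetical:
```python
def schedule(task, cluster, local_node_id):
    """Distributed part picks a node; the local part dispatches."""
    node_id = cluster.pick_feasible_node(task)       # step 2 (distributed)
    if node_id is None:
        cluster.infeasible_queue.append(task)        # step 3: GCS/autoscaler
        cluster.report_infeasible_to_gcs(task)
        return
    if node_id == local_node_id:
        cluster.local_task_manager.wait_and_dispatch(task)  # local steps 1-3
    else:
        cluster.spill_to(node_id, task)              # run on a remote node
```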
2022-02-17 01:14:33 -08:00
Yi Cheng
83257a4193
Revert "[Client] chunked get requests" (#22455)
Reverts ray-project/ray#22100

linux://python/ray/tests:test_runtime_env_working_dir_remote_uri becomes very flaky after this PR.
2022-02-16 16:43:43 -08:00
mwtian
839bc5019f
Fix building Windows wheels (#22388) (#22391)
This fixes the Windows wheel build issue on master and the releases/1.11.0 branch. If the issue happens more often, we can try to run iwyu.
2022-02-15 15:24:10 -08:00
mwtian
32035eb125
[Pubsub] increase subscriber timeout (#22394)
As mentioned in https://github.com/ray-project/ray/issues/22161#issuecomment-1039661368, increase subscriber timeout to avoid subscriber state being deleted too soon.
2022-02-15 14:48:19 -08:00
Chris K. W
9a7979d9a2
[Client] chunked get requests (#22100)
Why are these changes needed?
Switches GetObject from unary-unary to unary-streaming so that large objects can be streamed across multiple messages (currently hardcoded to 64MiB chunks). This will allow users to retrieve objects larger than 2GiB from a remote cluster. If the transfer is interrupted by a recoverable gRPC error (i.e. temporary disconnect), then the request will be retried starting from the first chunk that hasn't been received yet.

Proto changes
GetRequests now have the field start_chunk_id to indicate which chunk to start from (useful if we have to retry a request after already receiving some chunks). GetResponses now have a chunk_id (0-indexed chunk of the serialized object), total_chunks (total number of chunks, used in async transfers to determine when all chunks have been received), and total_size (the total size of the object in bytes, used to raise user warnings if the object being retrieved is very large).

Server changes
Mainly just updating the GetObject logic to yield chunks instead of returning the whole object in a single message.

Client changes
At the moment, objects can be retrieved directly from the raylet servicer (ray.get) or asynchronously over the datapath (await some_remote_func.remote()). In both cases, the request will error if the chunk isn't valid (server side error) or if a chunk is received out of order (shouldn't happen in practice, since gRPC guarantees that messages in a stream either arrive in order or not at all).

ray.get is fairly straightforward, and changes are mainly to accommodate yielding from the stub instead of taking the value directly.

await some_remote_func.remote() is similar, but to keep things consistent with other async handling, collecting the chunks is handled by a ChunkCollector, which wraps around the original callback.
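A simplified sketch of the chunk reassembly on the async path; the names are illustrative (the real ChunkCollector wraps the original callback):
```python
from typing import Callable, List

class ChunkCollector:
    def __init__(self, callback: Callable[[bytes], None]):
        self._callback = callback   # invoked once the object is complete
        self._chunks: List[bytes] = []

    def on_chunk(self, chunk_id: int, total_chunks: int, data: bytes) -> None:
        # gRPC stream messages arrive in order or not at all, so an
        # out-of-order chunk indicates a server-side error.
        if chunk_id != len(self._chunks):
            raise RuntimeError(
                f"expected chunk {len(self._chunks)}, got {chunk_id}")
        self._chunks.append(data)
        if len(self._chunks) == total_chunks:
            self._callback(b"".join(self._chunks))
```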
2022-02-16 00:07:16 +02:00
Chen Shen
4ad1fba100
[refactor cluster-task-manage 3/n] Separate stats reporting into its own file (#22359)
* wip

* refactor
2022-02-15 10:48:00 -08:00
ZhuSenlin
37ef372a10
Use shared_ptr to instead of object in cluster_scheduling_resources_ to reduce rehash cost. (#22376)
1. For scheduling optimization, we want to encapsulate `SchedulingResources`, `GcsNodeInfo`, and other node-related information into a `NodeContext`, which requires that `SchedulingResources` be shareable. This PR does not include the `NodeContext` transformation logic; it only makes `SchedulingResources` shareable.
2. `cluster_scheduling_resources_` holds `SchedulingResources` by value, which brings some overhead on rehash (even though std::move is used during rehash).

Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
2022-02-15 23:43:59 +08:00
mwtian
59d9e20a4c
Revert "Revert "[Release 1.11.0][Core] avoid unnecessary work during event st… (#22144)" (#22284)
This reverts commit 6235b6d7e9.

Looks like windows://python/ray/tests:test_dataclient_disconnect has a similar level of flakiness as before the revert. This seems unrelated, and the test needs to be fixed in another way.
2022-02-15 00:20:28 -08:00
Yi Cheng
8a3bd6c275
[gcs/ha] Enable HA flags by default (#21608)
PR to enable all three flags for GCS HA:
- RAY_bootstrap_with_gcs=1 
- RAY_gcs_grpc_based_pubsub=1 
- RAY_gcs_storage=memory
2022-02-14 11:13:17 -08:00
Chen Shen
db5de9c35c
[scheduler-refactor 2/n] move actor reporting into helper class too (#22333)
* move this

* address comments
2022-02-14 02:13:14 -08:00
Yi Cheng
531e215921
[gcs] Fix in_memory_store not handling nullptr callback issue (#22321)
The in-memory store does not handle nullptr callbacks well, which leads to GCS crashes in node failure tests. This PR fixes it.
2022-02-11 18:35:40 -08:00
Edward Oakes
49b3e6c53c
[serve] Support user-provided health check via def check_health(self) method (#22178) 2022-02-11 12:53:37 -06:00
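A sketch of the user-provided hook named in the title; the guarded resource is a hypothetical example:
```python
from ray import serve

@serve.deployment
class MyService:
    def __init__(self):
        self._conn = object()  # hypothetical stand-in for a real connection

    def check_health(self):
        # Serve calls this periodically; raising marks the replica unhealthy.
        if self._conn is None:
            raise RuntimeError("lost connection")

    def __call__(self, request):
        return "ok"
```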
ZhuSenlin
358771c636
Optimize MultiItemCallback and MapCallback to reduce data copy when GCS load data after restart (#22307)
After GCS restarts, metadata is loaded from Redis. The Redis callbacks previously returned const&, which required a copy of the loaded data. After changing them to && and using std::move, the data copies are reduced.
2022-02-11 16:57:16 +08:00
Jiajun Yao
07a1ba8e34
Update local object store usage (#22157)
* Update local object store usage

* fix

* test
2022-02-09 22:08:25 -08:00
Stephanie Wang
495eb14179
[core] Recover spilled objects that are lost during node failure (#21485)
* Failing test

* trigger recovery from ref counter

* x

* update

* lint

* stress test

* update

* format

* x
2022-02-09 18:22:16 -08:00
Jiajun Yao
673ecd1241
Isolate ray configs for each job (#22206)
If we run multiple jobs in the same process (this is basically the behavior of Python tests), they should be isolated in the sense that the system config for job 1 shouldn't affect the config for job 2.
```
ray.init(_system_config={})
# job 1
ray.shutdown()

ray.init(_system_config={})
# job 2
ray.shutdown()
```

Currently that's not the case, since RayConfig is a static variable shared across drivers in the same process. This PR resets the configs to their default values before applying the job-specific _system_config.

Note: this is a backward-incompatible change if a user depends on the current behavior, but I'm not aware of such a case.
2022-02-09 10:18:46 -08:00
Chen Shen
1abe69e9b7
[refactor cluster-task-manage 1/n] separate resource reporting logic into helper class (#22215)
Separate Scheduler Resource Reporting logic into a separate class for better readability and maintainability.
2022-02-08 17:22:05 -08:00
Stephanie Wang
dcd96ca348
[core] Increment ref count when creating an ObjectRef to prevent object from going out of scope (#22120)
When a Ray program first creates an ObjectRef (via ray.put or task call), we add it with a ref count of 0 in the C++ backend because the language frontend will increment the initial local ref once we return the allocated ObjectID, then delete the local ref once the ObjectRef goes out of scope. Thus, there is a brief window where the object ref will appear to be out of scope.

This can cause problems with async protocols that check whether the object is in scope or not, such as the previous bug fixed in #19910. Now that we plan to enable lineage reconstruction to automatically recover lost objects, this race condition can also be problematic because we use the ref count to decide whether an object needs to be recovered or not.

This PR avoids these race conditions by incrementing the local ref count in the C++ backend when executing ray.put() and task calls. The frontend is then responsible for skipping the initial local ref increment when creating the ObjectRef. This is the same fix used in #19910, but generalized to all initial ObjectRefs.

This is a re-merge for #21719 with a fix for removing the owned object ref if creation fails.
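A toy sketch of the change; names are illustrative, not the C++ API:
```python
class ToyRefCountedStore:
    def __init__(self):
        self.objects = {}
        self.ref_counts = {}

    def put(self, object_id, value):
        self.objects[object_id] = value
        # Before: start at 0 and rely on the frontend to increment after the
        # ID is returned, leaving a window where the object looks out of
        # scope. After: the backend takes the initial local ref itself, and
        # the frontend skips its own initial increment.
        self.ref_counts[object_id] = 1
        return object_id

    def remove_local_ref(self, object_id):
        self.ref_counts[object_id] -= 1
        if self.ref_counts[object_id] == 0:  # out of scope: free the object
            del self.objects[object_id], self.ref_counts[object_id]
```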
2022-02-08 14:50:50 -08:00
Eric Liang
8f7db1c6ab
Properly release resources of workers exiting due to max_calls (#22146)
Previously, the code incorrectly assumed that an exiting worker would disconnect from the raylet promptly to release its resources. This isn't the case if the worker still owns references. This PR plumbs through the right release-resources call even in this scenario.
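For reference, a task opts into this per-call worker exit via max_calls:
```python
import ray

@ray.remote(max_calls=1)
def leaky_task():
    # The worker process exits after each call; with this fix its resources
    # are released correctly even if it still owns object references.
    return 42
```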

Closes https://github.com/ray-project/ray/issues/10960

Co-authored-by: SangBin Cho <rkooo567@gmail.com>
2022-02-07 21:57:11 -08:00
Archit Kulkarni
de2c950d55
[runtime env] Unify checks for empty runtime env using helper function (#22129)
Followup from https://github.com/ray-project/ray/pull/21788.  Previously we had a lot of `serialized_runtime_env == "{}" || serialized_runtime_env == ""` scattered around the C++ code; this PR puts this in a helper function.
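A Python stand-in for the centralized check (the real helper is in C++; the name is hypothetical):
```python
def is_runtime_env_empty(serialized_runtime_env: str) -> bool:
    # Replaces the scattered `== "{}" || == ""` comparisons with one check.
    return serialized_runtime_env in ("", "{}")
```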
2022-02-07 17:18:51 -06:00
Chen Shen
cc577c10ed
[refactor cluster-task-manage 0/n] move internal state into a separate header #22160
This is the first PR that refactors the cluster task manager. Specifically, we move the internal state into a separate header file.
2022-02-06 22:17:33 -08:00
Jiajun Yao
ff8af2edba
Remove TaskExecutionSpec (#22155) 2022-02-06 21:59:23 -08:00
SangBin Cho
6235b6d7e9
Revert "[Release 1.11.0][Core] avoid unnecessary work during event st… (#22144)
This reverts commit 9ac3f6879d.

It seems this makes the test flaky, so I will revert it for now.
2022-02-06 18:19:44 -08:00
Jiajun Yao
88d2e21585
Disable scheduler_report_pinned_bytes_only (#22132) 2022-02-05 11:06:59 -08:00
SangBin Cho
ea4079465d
[Runtime Env] Support runtime env error message for actors (#22109) 2022-02-04 15:32:02 -06:00
SangBin Cho
6dda196f47
Revert "[core] Increment ref count when creating an ObjectRef to prev… (#22106)
This reverts commit e3af828220.
2022-02-04 00:55:45 -08:00
Stephanie Wang
e3af828220
[core] Increment ref count when creating an ObjectRef to prevent object from going out of scope (#21719)
When a Ray program first creates an ObjectRef (via ray.put or task call), we add it with a ref count of 0 in the C++ backend because the language frontend will increment the initial local ref once we return the allocated ObjectID, then delete the local ref once the ObjectRef goes out of scope. Thus, there is a brief window where the object ref will appear to be out of scope.

This can cause problems with async protocols that check whether the object is in scope or not, such as the previous bug fixed in #19910. Now that we plan to enable lineage reconstruction to automatically recover lost objects, this race condition can also be problematic because we use the ref count to decide whether an object needs to be recovered or not.

This PR avoids these race conditions by incrementing the local ref count in the C++ backend when executing ray.put() and task calls. The frontend is then responsible for skipping the initial local ref increment when creating the ObjectRef. This is the same fix used in #19910, but generalized to all initial ObjectRefs.
2022-02-03 17:31:27 -08:00