Commit 0ac2957
[ENH]: Integrate task operators into compaction (#5687)
## Description of changes
This change implements the TaskRunner that pulls scheduled Tasks from the heap and runs them. This reuses a large part of the CompactionManager's code with the following additions:
- The CompactionManager takes in a strongly typed binary enum to specify whether it is intended to service Tasks or usual collection Compactions. All the changes are meant to take effect in Tasks mode.
- The scheduler regularly pulls tasks from the S3 heap and adds them to its internal queue that the overlying CompactionManager polls.
- CompactionOrchestrator has been refactored to take both an input_collection and an output_collection, rather than a single input_collection to compact. Its preexisting operators have been refactored accordingly.
- For each scheduled task the Task-related CompactionManager finds, it kicks off a CompactionOrchestrator with PrepareTask as the initial operator. PrepareTask does the following:
  - It takes in a task uuid and task nonce to resolve a SysDB Task object. It creates an output collection if needed.
  - It marks the beginning of this nonce's execution by generating a new value for `next_nonce` in the `Tasks` table in the SysDB.
  - It detects if this step had already been done before by seeing whether `Tasks.lowest_live_nonce != Tasks.next_nonce`.
    - If the two are not equal, the previously described increment to `next_nonce` does not occur. This case implies that there was an incomplete execution of `lowest_live_nonce`. PrepareTask compares the `scout_logs` result of the given input collection with `Tasks.completion_offset` to determine whether this Task needs to be re-executed or whether it can skip to the FinishTask operator.
- The Register operator has been edited to update the current Task's completion offset in the same transaction where the output collection's records are flushed to SysDB.
- A FinishTask operator has been added to run after the Register operator in Task-related CompactionOrchestrators. This operator rechecks scout_logs for the given input collection. If the resulting offset is too far ahead of the `completion_offset` that we sent to SysDB in Register, we will schedule a new item in the heap to do this task again at a later time. We then unconditionally set `lowest_live_nonce` to `next_nonce` in the SysDB, marking `lowest_live_nonce` as completed and allowing all heap entries below the new value of `lowest_live_nonce` to be pruned.
- Improvements & Bug fixes
- ^^^^
- New functionality
  - The TaskRunner's operation as a separate system component in the Compactor node is disabled by default. It can be enabled with the following configuration in the compaction_service config:
```
task_runner:
  enabled: true
```
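The nonce handling described for PrepareTask, Register, and FinishTask can be sketched as a small state machine. This is only an illustration under stated assumptions, not the code in this commit: `TaskRow`, `PrepareOutcome`, `prepare_task`, `register`, `finish_task`, and `max_lag` are hypothetical names, nonces are modelled as `u64` counters rather than whatever type the SysDB actually stores, and the rule "re-execute when the scouted offset is ahead of `completion_offset`" is one plausible reading of the comparison described above.

```rust
/// Hypothetical stand-in for the nonce-related columns of a SysDB `Tasks` row.
#[derive(Debug, Clone, PartialEq)]
pub struct TaskRow {
    pub lowest_live_nonce: u64,
    pub next_nonce: u64,
    pub completion_offset: u64,
}

/// What PrepareTask decides to do next (names are assumptions).
#[derive(Debug, PartialEq)]
pub enum PrepareOutcome {
    /// Run the task body, then Register and FinishTask.
    Execute,
    /// The output was already registered; go straight to FinishTask.
    SkipToFinish,
}

/// PrepareTask: start a new nonce, or resume an incomplete one.
pub fn prepare_task(row: &mut TaskRow, scouted_offset: u64) -> PrepareOutcome {
    if row.lowest_live_nonce == row.next_nonce {
        // Fresh run: mark the start by generating a new `next_nonce`.
        // (Assumption: "generating" is modelled here as an increment.)
        row.next_nonce += 1;
        return PrepareOutcome::Execute;
    }
    // The nonces differ, so a previous run of `lowest_live_nonce` did not
    // finish. Do not advance `next_nonce` again; decide from the offsets.
    if scouted_offset > row.completion_offset {
        PrepareOutcome::Execute
    } else {
        PrepareOutcome::SkipToFinish
    }
}

/// Register: record how far the task has processed the input collection.
/// In the PR this happens in the same transaction as the output flush.
pub fn register(row: &mut TaskRow, processed_up_to: u64) {
    row.completion_offset = processed_up_to;
}

/// FinishTask: returns true if a follow-up heap item should be scheduled,
/// then unconditionally marks the live nonce as completed.
pub fn finish_task(row: &mut TaskRow, scouted_offset: u64, max_lag: u64) -> bool {
    let reschedule = scouted_offset.saturating_sub(row.completion_offset) > max_lag;
    row.lowest_live_nonce = row.next_nonce;
    reschedule
}
```

Under these assumptions, a run that crashes after PrepareTask leaves `lowest_live_nonce != next_nonce`, so the retry keeps the same `next_nonce` and either re-executes or skips to FinishTask depending on whether Register had already advanced `completion_offset`.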
Relevant TODOs after this:
- Adjust scheduler to be aware of `lowest_live_nonce`
- Send a new heap item in `FinishTask` if the `scout_logs` check deems it necessary.
- Adjust `create_task` to do a 2-phase commit and adjust the TaskRunner logic to handle partially created tasks. `create_task` must also send over an initial backfill item to the heap if its input collection existed before this Task.
- Validate operator parameters and pull in operator data during TaskRunner execution.
- TaskRunner only pulls data from compacted segments of the input collection. It is possible to also pull log data in this process.
- Support for incremental tasks.
## Test plan
_How are these changes tested?_
An integration test has been added in `compact.rs`.
- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust
## Migration plan
_Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_
## Observability plan
_What is the plan to instrument and monitor this change?_
## Documentation Changes
_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_

1 parent eeecb57 commit 0ac2957
File tree
19 files changed, +1473 -193 lines changed
- examples
- rust
- log-service/src
- s3heap-service
- types/src
- worker
- src
- compactor
- execution
- operators
- orchestration