feat(batch): implement async planner for batch orchestration #2197
Conversation
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
Code Review
This pull request introduces an asynchronous Scheduler implementation for the planner, replacing the previous passthrough logic. It utilizes a background worker pool to handle resource provisioning and batch submission, and includes corresponding UI updates to support new 'pending' and 'provisioning' job statuses. The review identifies critical race conditions during job cancellation and submission, potential memory leaks in the in-memory job tracking maps, and opportunities to improve maintainability through named types and stable terminal timestamps.
klog.Infof("[planner.scheduler] submit job_id=%q provision_id=%q model_template=%v",
	req.JobID, provResult.ProvisionID, req.ModelTemplate)

batch, err := q.bc.CreateBatch(q.baseCtx, req.BatchParams, aibrix)
There is a race condition where a job can be submitted to MDS even if it was canceled while Provision was running. The process worker should re-check the job state under a lock before calling CreateBatch. If the state is no longer Provisioning (e.g., it became Canceled), the worker should release the provisioned resource and abort the submission.
The race is handled at the post-CreateBatch checkpoint: if the job was cancelled in the meantime, the provisioned resource is released and the submission is aborted.
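Concretely, the checkpoint looks roughly like this (a condensed sketch; names such as postSubmitCheckpoint, mu, jobs, and prov.Release are illustrative rather than the exact identifiers in the PR):

func (q *AsyncPlanner) postSubmitCheckpoint(jobID, provisionID, batchID string) {
	// Re-read the job state under the lock; a Cancel may have landed while
	// Provision or CreateBatch was in flight.
	q.mu.Lock()
	job, ok := q.jobs[jobID]
	canceled := ok && job.state == jobStateCanceled
	q.mu.Unlock()
	if !canceled {
		return
	}
	// Undo the submission: forward the cancel to MDS and return the resource.
	if _, err := q.bc.CancelBatch(q.baseCtx, batchID); err != nil {
		klog.Errorf("[planner.scheduler] cancel-after-submit failed job_id=%q: %v", jobID, err)
	}
	if err := q.prov.Release(q.baseCtx, provisionID); err != nil {
		klog.Errorf("[planner.scheduler] release-after-cancel failed job_id=%q: %v", jobID, err)
	}
}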
jobs       map[string]*queuedJob // JobID -> state
jobByBatch map[string]string     // batch.ID -> JobID (for ListJobs tagging)
The jobs and jobByBatch maps grow indefinitely as new jobs are enqueued. Since this is an in-memory implementation, it will eventually lead to a memory leak in long-running processes. Consider implementing a cleanup mechanism (e.g., a TTL-based cache or a background janitor) to remove terminal jobs (Failed, Canceled, or old Completed jobs) after a certain period.
The concern is valid. I'm thinking of addressing this in a separate PR in the near future.
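For the record, the kind of janitor the review suggests could look like this (a sketch; the retention/interval knobs and the use of enqueuedAt as the age proxy are assumptions):

func (q *AsyncPlanner) startJanitor(retention, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-q.baseCtx.Done():
				return
			case <-ticker.C:
				cutoff := time.Now().Add(-retention)
				q.mu.Lock()
				for id, job := range q.jobs {
					// Prune only jobs the planner no longer drives: failed,
					// canceled, or long-submitted (MDS owns those). Deleting
					// while ranging over a Go map is safe.
					done := job.state == jobStateFailed ||
						job.state == jobStateCanceled ||
						job.state == jobStateSubmitted
					if done && job.enqueuedAt.Before(cutoff) {
						if job.batchID != "" {
							delete(q.jobByBatch, job.batchID)
						}
						delete(q.jobs, id)
					}
				}
				q.mu.Unlock()
			}
		}
	}()
}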
type queuedJob struct {
	req        *plannerapi.EnqueueRequest
	state      jobState
	batchID    string // populated when state == jobStateSubmitted
	err        error  // populated when state == jobStateFailed
	enqueuedAt time.Time
}
To ensure that terminal timestamps (FailedAt, CancelledAt) remain stable and are not recalculated on every read, consider adding failedAt and canceledAt fields to the queuedJob struct. These should be populated in markFailed and Cancel respectively.
Suggested change:
type queuedJob struct {
	req        *plannerapi.EnqueueRequest
	state      jobState
	batchID    string // populated when state == jobStateSubmitted
	err        error  // populated when state == jobStateFailed
	enqueuedAt time.Time
	failedAt   time.Time
	canceledAt time.Time
}
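With those fields in place, the terminal writers would stamp the timestamp once, e.g. (a minimal sketch; the receiver and lock names are assumed, and Cancel would set canceledAt analogously):

func (q *AsyncPlanner) markFailed(jobID string, err error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if job, ok := q.jobs[jobID]; ok {
		job.state = jobStateFailed
		job.err = err
		job.failedAt = time.Now() // stamped once here, never recomputed on reads
	}
}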
/gemini review |
Code Review
This pull request introduces an asynchronous 'Scheduler' planner to replace the synchronous 'Passthrough' implementation. The new scheduler manages job lifecycles by introducing 'pending' and 'provisioning' states, utilizing a worker pool to handle resource provisioning and batch submission. Corresponding updates were made to the API proto definitions, frontend components to support the new statuses, and the server initialization logic. My feedback suggests extracting the complex anonymous struct used for the planner decision into a named type to improve code readability and maintainability.
Force-pushed from f44f9dd to 3d0c69e
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
Force-pushed from 3d0c69e to b5f75c1
	}
-	planner := plannerimpl.NewPassthrough(batchClient, rm.Provisioner)
+	planner := plannerimpl.NewScheduler(batchClient, rm.Provisioner, plannerimpl.DefaultWorkerCount)
	s.planner = planner
Lines 122 and 123 could be a single line?
// success/error/latency; the struct also records every call for later
// assertion and tracks peak concurrent in-flight Provisions so worker-pool
// parallelism can be measured directly.
type fakeProvisioner struct {
This should be a common provisioner interface. Can it be reused or not?
Why not put these under async_planner_test.go?
Right now, the test file is only used by the planner. Maybe we can keep it here until it is reused by another component later.
	provPollInterval time.Duration
}

// jobState is the planner-side lifecycle. Before submission, statusFor maps
Should this not be part of this file? Are there existing planner types that could be used?
No existing planner type fits. jobState describes the AsyncPlanner-internal pre-submission lifecycle (Pending → Provisioning, plus terminal Failed/Canceled before MDS takes over) and isn't referenced outside this file.
Can the job state not be used by other planner implementations? I think this should be common. I didn't get this part: if you add a new planner implementation, will you then have another jobState definition?
We only have one planner now. If we add more planners later, I agree that we should move the job state into a common package.
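For reference, the lifecycle I mean is roughly the following (a sketch; the exact constant names in the file may differ):

// Pre-submission lifecycle owned by the async planner; once a job reaches
// Submitted, MDS/batch status takes over.
type jobState int

const (
	jobStatePending      jobState = iota // enqueued, waiting for a worker
	jobStateProvisioning                 // a worker is running Provision
	jobStateSubmitted                    // CreateBatch succeeded; MDS owns the job
	jobStateFailed                       // terminal: provision or submission failed
	jobStateCanceled                     // terminal: canceled before MDS took over
)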
	}
}
if c, ok := s.planner.(io.Closer); ok {
	if err := c.Close(); err != nil {
What happens to workers that are mid-provisioning?
There are two cases. If the planner has already received the ProvisionID, it will release the resource accordingly. If RM has allocated internally but hasn't returned the ProvisionID yet, there is a caveat, bounded by the RPC duration. In practice, this window should be very small.
// ListJobs merges MDS batches with local not-yet-submitted jobs. Local jobs
// are shown only on the first page so the MDS cursor remains valid.
func (q *AsyncPlanner) ListJobs(ctx context.Context, req *plannerapi.ListJobsRequest) (*plannerapi.ListJobsResponse, error) {
Is paging still working? It seems not.
Actually, this is something I want to discuss and get feedback on: how to merge job status for jobs still in the queue with jobs already in MDS.
Paging works, but not perfectly. Right now, the idea is to display all local jobs (jobs not yet submitted to MDS) first, followed by the jobs in MDS. The reason we can't paginate strictly across both: the MDS After cursor only encodes positions in MDS's batch ID space, while local jobs have no stable IDs (they're in-memory and lost on restart), so we can't build a unified cursor across both sources.
The fix for this can be deferred to a future PR.
Regarding "The jobs in MDS will be displayed after that": I didn't get this part. There is one logical job list shown on the UI; since you manage the job state, can you do the merge within the planner?
The planner does the merge; currently it displays the sorted local jobs first, followed by the sorted MDS jobs. Ideally, we wouldn't need to distinguish them and would sort them uniformly. I can address this in a future PR.
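A condensed sketch of the current merge (the BatchClient method and response field names here, ListBatches / Jobs / After, are placeholders rather than the real API):

// Condensed sketch of the current merge policy; identifiers below are
// placeholders, not necessarily the real BatchClient/response API.
func (q *AsyncPlanner) listJobsSketch(ctx context.Context, req *plannerapi.ListJobsRequest) (*plannerapi.ListJobsResponse, error) {
	mdsPage, err := q.bc.ListBatches(ctx, req) // paged by the MDS After cursor
	if err != nil {
		return nil, err
	}
	out := &plannerapi.ListJobsResponse{After: mdsPage.After}
	if req.After == "" {
		// First page only: prepend local, not-yet-submitted jobs so the MDS
		// cursor stays meaningful on later pages.
		out.Jobs = append(out.Jobs, q.localPlaceholderJobs()...)
	}
	out.Jobs = append(out.Jobs, mdsPage.Jobs...)
	return out, nil
}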
// provReadyTimeout caps how long a single worker will wait for a
// provision to reach Running. Beyond this, the job is marked Failed and
// the resource is released.
const provReadyTimeout = 2 * time.Minute
I do not quite understand why it's 2 minutes. Is there any analysis behind it?
It depends on the provisioning speed of the resource provider. Two minutes is more than enough for the current k8s provisioning process, but we may need to adjust this value for other resource providers.
How do you measure it, and what if we switch to other cloud providers? I'd just like to know whether 2 minutes comes from your tests or is a magic number you defined.
The 2 minutes come from my local provisioning tests. I think we need to test and configure different values for different cloud providers.
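If it helps, the constant could later become a per-provider option rather than a hard-coded value, e.g. (a sketch; the option names are hypothetical):

// Hypothetical option replacing the package-level constant.
const defaultProvReadyTimeout = 2 * time.Minute

type SchedulerOptions struct {
	// ProvReadyTimeout caps how long a worker waits for a provision to reach
	// Running; zero means "use the default". Different resource providers
	// (k8s, cloud VMs, ...) can be configured with different values.
	ProvReadyTimeout time.Duration
}

func (o SchedulerOptions) provReadyTimeout() time.Duration {
	if o.ProvReadyTimeout > 0 {
		return o.ProvReadyTimeout
	}
	return defaultProvReadyTimeout
}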
// MDS for a submitted job. A cancel that lands mid-Provision or
// mid-CreateBatch is honored at the worker's post-CreateBatch checkpoint
// (which forwards CancelBatch and releases the resource).
func (q *AsyncPlanner) Cancel(ctx context.Context, jobID string) (*plannerapi.Job, error) {
Does the checkpoint approach work reliably?
Yes, we re-read the state at the end of process(), after CreateBatch returns, to check whether a cancel landed; this is covered by TestCancelDuringProvisioning* and TestCancelDuringCreateBatch*. Let me know if you see any potential issues.
aibrix := plannerclient.AIBrixExtraBody{
	JobID: req.JobID,
	PlannerDecision: &struct {
why do you replicate a struct here?
Created a named type for reuse.
I mean, this is not defined in an earlier PR?
	return out
}

func (q *AsyncPlanner) deleteJob(jobID string) {
How do you manage jobs in the queue? It seems delete is not invoked if the queue is full or a job has expired?
Jobs are dequeued and removed from the Go channel implicitly by the worker's <-q.submit receive — no explicit delete is needed for the queue itself. The previous deleteJob has been renamed to rollbackEnqueue for clarity and is only used to undo the bookkeeping insert in two edge cases where Enqueue itself fails. Added some comments in the code as well.
Could you point it out? I didn't find the logic.
Is this the part of the code you are looking for?
https://github.com/nwangfw/aibrix/blob/201ef7fc838728db85160e63ca2947e85c8920b0/apps/console/api/planner/impl/planner.go#L155-L159
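Condensed, the flow in those lines is roughly the following (identifiers simplified; the placeholder-batch construction is elided):

func (q *AsyncPlanner) Enqueue(ctx context.Context, req *plannerapi.EnqueueRequest) (*plannerapi.Job, error) {
	// Bookkeeping insert first, so Cancel/ListJobs can see the job immediately.
	q.mu.Lock()
	q.jobs[req.JobID] = &queuedJob{req: req, state: jobStatePending, enqueuedAt: time.Now()}
	q.mu.Unlock()

	select {
	case q.submit <- req.JobID:
		// A worker dequeues via <-q.submit; the channel receive is the
		// implicit removal from the queue, so no explicit delete is needed.
	default:
		// Queue full: undo the bookkeeping insert (one rollbackEnqueue edge case).
		q.rollbackEnqueue(req.JobID)
		return nil, fmt.Errorf("planner queue is full")
	}
	// ... return the synthetic "validating" placeholder batch to the caller ...
	return &plannerapi.Job{JobID: req.JobID}, nil
}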
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
// details (currently always nil — RM doesn't return them yet).
type PlannerDecision struct {
	ProvisionID               string `json:"provision_id,omitempty"`
	ProvisionResourceDeadline int64  `json:"provision_resource_deadline,omitempty"`
Need to put a comment here to explain whether the deadline is a Unix timestamp in seconds, milliseconds, etc.
type PlannerDecision struct {
	ProvisionID               string `json:"provision_id,omitempty"`
	ProvisionResourceDeadline int64  `json:"provision_resource_deadline,omitempty"`
	ResourceDetails           []struct {
- let's have enough comments to explain these fields
- need a comment to explain that the actual GPU number of each deployment is defined in the template, otherwise other contributors will get confused
Sure. Comments have been added for every field here.
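For example, something along these lines (the seconds unit below is an assumption for illustration and should match whatever RM actually returns; ResourceDetail is a hypothetical named element type):

// The seconds unit below is an assumption for illustration; it should match
// whatever RM actually returns. ResourceDetail is a hypothetical named type
// standing in for the anonymous struct element.
type PlannerDecision struct {
	// ProvisionID identifies the resource allocation returned by RM for this job.
	ProvisionID string `json:"provision_id,omitempty"`
	// ProvisionResourceDeadline is a Unix timestamp (seconds, assumed) after
	// which the provisioned resource may be reclaimed.
	ProvisionResourceDeadline int64 `json:"provision_resource_deadline,omitempty"`
	// ResourceDetails lists per-deployment placement details. Note that the
	// actual GPU count of each deployment is defined in the model template,
	// not here. Currently always nil: RM doesn't return details yet.
	ResourceDetails []ResourceDetail `json:"resource_details,omitempty"`
}

// ResourceDetail stands in for the anonymous element type; its fields are
// not shown in this thread.
type ResourceDetail struct{}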
bc   plannerclient.BatchClient
prov provisioner.Provisioner

submit chan string // buffered FIFO of pending JobIDs
It seems like the planner's and the queue's implementations are mixed together; let's split them for better flexibility.
Great point; I have split the queue implementation into a separate file.
	}
}

func (q *Planner) markFailed(jobID string, err error) {
Once a job is marked as failed, we need to release its provision (if valid) as well.
Yes, the current code releases the provision if a job has been marked as failed within process().
	return
}
if job.state != jobStatePending {
	// Cancel raced ahead; drop without provisioning.
I didn't get what the comment means; let's put a simple example to explain it.
Got this part after reviewing the Cancel function. Let's explicitly check whether the state is cancelled and write an error log if we encounter other invalid states.
Added an example here explaining what this statement means. The state info is logged so that we can figure out the exact reason.
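For example, the check could be made explicit like this (a sketch, not the exact code):

if job.state != jobStatePending {
	// Example: Enqueue put the JobID on the channel, then Cancel flipped the
	// state to Canceled before a worker picked it up; skip provisioning.
	if job.state != jobStateCanceled {
		// Anything other than Canceled here is unexpected; log it loudly.
		klog.Errorf("[planner] unexpected state %v for job_id=%q before provisioning", job.state, jobID)
	}
	return
}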
return &plannerapi.Job{JobID: jobID, Batch: placeholderBatch(req, openai.BatchStatusCancelled, enqueuedAt, terminalAt)}, nil
case jobStateSubmitted:
	klog.Infof("[planner] cancel submitted job_id=%q batch_id=%q", jobID, batchID)
	batch, err := q.bc.CancelBatch(ctx, batchID)
need to release provision
Great catch. Release provision added.
Force-pushed from 357ddbe to 9beeae4
const queueCapacity = 256

// jobQueue owns the buffered FIFO of pending JobIDs.
type jobQueue struct {
Let's have a queue interface first, with this implementation as a FIFO queue. Don't use the name jobQueue; a name describing its functionality is preferred, since we will have multiple queue implementations in the future.
Makes sense. How about this version?
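Roughly along these lines (a sketch of the split; the names are suggestions, not necessarily what the PR ended up with):

// Queue abstracts how pending JobIDs are handed to workers, so other
// policies (priority, fair-share, ...) can be swapped in later.
type Queue interface {
	// Push enqueues a JobID; it returns false if the queue is full.
	Push(jobID string) bool
	// Chan exposes the receive side that workers block on.
	Chan() <-chan string
	// Close stops accepting new work and lets workers drain.
	Close()
}

// channelFIFOQueue is the current implementation: a bounded, in-memory FIFO
// backed by a buffered channel.
type channelFIFOQueue struct {
	ch chan string
}

func newChannelFIFOQueue(capacity int) *channelFIFOQueue {
	return &channelFIFOQueue{ch: make(chan string, capacity)}
}

func (q *channelFIFOQueue) Push(jobID string) bool {
	select {
	case q.ch <- jobID:
		return true
	default:
		return false // full; the caller rolls back its bookkeeping
	}
}

func (q *channelFIFOQueue) Chan() <-chan string { return q.ch }

func (q *channelFIFOQueue) Close() { close(q.ch) }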
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
Force-pushed from 9beeae4 to 201ef7f
Pull Request Description
Replaces the synchronous Passthrough planner with an asynchronous, queue-backed implementation to support the case where provisioning takes time. The gRPC caller now returns immediately with a synthetic "validating" batch while a worker pool runs Provision + CreateBatch in the background. The plannerapi.Planner interface is unchanged; the swap is one line in server.go.