
feat(batch): implement async planner for batch orchestration #2197

Merged
Jeffwan merged 6 commits into vllm-project:main from nwangfw:feat/planner-queue-impl-clean on May 16, 2026
Conversation

@nwangfw
Collaborator

@nwangfw nwangfw commented May 13, 2026

Pull Request Description

Replaces the synchronous Passthrough planner with an asynchronous queue-backed implementation to support cases where provisioning takes time. The gRPC caller now returns immediately with a synthetic "validating" batch while a worker pool runs Provision + CreateBatch in the background. The plannerapi.Planner interface is unchanged; the swap is a single line in server.go.
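
For context, the enqueue path described above, sketched in Go (this leans on the PR's own types as quoted later in this thread; details such as the queue-full handling are illustrative, not verbatim):

// Enqueue records the job, hands its ID to the worker pool via a buffered
// channel, and returns a synthetic "validating" batch right away.
func (q *AsyncPlanner) Enqueue(ctx context.Context, req *plannerapi.EnqueueRequest) (*plannerapi.Job, error) {
	q.mu.Lock()
	q.jobs[req.JobID] = &queuedJob{req: req, state: jobStatePending, enqueuedAt: time.Now()}
	q.mu.Unlock()

	select {
	case q.submit <- req.JobID: // picked up later by a background worker
	default:
		q.rollbackEnqueue(req.JobID) // queue full: undo the bookkeeping insert
		return nil, fmt.Errorf("planner queue is full")
	}

	return &plannerapi.Job{
		JobID: req.JobID,
		Batch: placeholderBatch(req, openai.BatchStatusValidating, time.Now(), time.Time{}),
	}, nil
}

// Each worker drains the channel and runs Provision + CreateBatch per job.
func (q *AsyncPlanner) worker() {
	for jobID := range q.submit {
		q.process(jobID)
	}
}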

nwangfw added 2 commits May 12, 2026 18:57
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces an asynchronous Scheduler implementation for the planner, replacing the previous passthrough logic. It utilizes a background worker pool to handle resource provisioning and batch submission, and includes corresponding UI updates to support new 'pending' and 'provisioning' job statuses. The review identifies critical race conditions during job cancellation and submission, potential memory leaks in the in-memory job tracking maps, and opportunities to improve maintainability through named types and stable terminal timestamps.

klog.Infof("[planner.scheduler] submit job_id=%q provision_id=%q model_template=%v",
	req.JobID, provResult.ProvisionID, req.ModelTemplate)

batch, err := q.bc.CreateBatch(q.baseCtx, req.BatchParams, aibrix)
Contributor

high

There is a race condition where a job can be submitted to MDS even if it was canceled while Provision was running. The process worker should re-check the job state under a lock before calling CreateBatch. If the state is no longer Provisioning (e.g., it became Canceled), the worker should release the provisioned resource and abort the submission.

Collaborator Author

The race is handled at the post-CreateBatch checkpoint. If the job was cancelled, the provisioned resource is released and the submission is aborted.
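
A rough sketch of that post-CreateBatch checkpoint (the helper name and state constants are approximations of the code quoted elsewhere in this review):

// Called at the end of process(), after CreateBatch returns. The state is
// re-read under the lock so a cancel that landed mid-Provision or
// mid-CreateBatch is still honored.
func (q *AsyncPlanner) finalizeSubmission(jobID string, batch *openai.Batch, provisionID string) {
	q.mu.Lock()
	job := q.jobs[jobID]
	canceled := job.state == jobStateCanceled
	if !canceled {
		job.state = jobStateSubmitted
		job.batchID = batch.ID
		q.jobByBatch[batch.ID] = jobID
	}
	q.mu.Unlock()

	if canceled {
		// Honor the late cancel: forward it to MDS and release the resource.
		if _, err := q.bc.CancelBatch(q.baseCtx, batch.ID); err != nil {
			klog.Errorf("[planner] cancel forward failed job_id=%q: %v", jobID, err)
		}
		// release provisionID through the provisioner here
	}
}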

Comment thread apps/console/api/planner/impl/async_planner.go Outdated
Comment on lines +50 to +51
jobs map[string]*queuedJob // JobID -> state
jobByBatch map[string]string // batch.ID -> JobID (for ListJobs tagging)
Contributor

medium

The jobs and jobByBatch maps grow indefinitely as new jobs are enqueued. Since this is an in-memory implementation, it will eventually lead to a memory leak in long-running processes. Consider implementing a cleanup mechanism (e.g., a TTL-based cache or a background janitor) to remove terminal jobs (Failed, Canceled, or old Completed jobs) after a certain period.

Collaborator Author

The concern is valid. I'm planning to address this in a separate PR in the near future.
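
For reference, such a janitor could be as small as the following sketch (not part of this PR; the retention policy and field names are assumptions):

// janitor periodically drops terminal jobs older than a retention window so
// the in-memory maps stay bounded.
func (q *AsyncPlanner) janitor(retention, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-q.baseCtx.Done():
			return
		case <-ticker.C:
			cutoff := time.Now().Add(-retention)
			q.mu.Lock()
			for id, job := range q.jobs {
				terminal := job.state == jobStateFailed || job.state == jobStateCanceled
				if terminal && job.enqueuedAt.Before(cutoff) {
					delete(q.jobs, id)
					if job.batchID != "" {
						delete(q.jobByBatch, job.batchID)
					}
				}
			}
			q.mu.Unlock()
		}
	}
}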

Comment on lines +74 to +80
type queuedJob struct {
	req        *plannerapi.EnqueueRequest
	state      jobState
	batchID    string // populated when state == jobStateSubmitted
	err        error  // populated when state == jobStateFailed
	enqueuedAt time.Time
}
Contributor

medium

To ensure that terminal timestamps (FailedAt, CancelledAt) remain stable and are not recalculated on every read, consider adding failedAt and canceledAt fields to the queuedJob struct. These should be populated in markFailed and Cancel respectively.

Suggested change
type queuedJob struct {
	req        *plannerapi.EnqueueRequest
	state      jobState
	batchID    string // populated when state == jobStateSubmitted
	err        error  // populated when state == jobStateFailed
	enqueuedAt time.Time
}
type queuedJob struct {
	req        *plannerapi.EnqueueRequest
	state      jobState
	batchID    string // populated when state == jobStateSubmitted
	err        error  // populated when state == jobStateFailed
	enqueuedAt time.Time
	failedAt   time.Time
	canceledAt time.Time
}

Collaborator Author

Addressed

Comment thread apps/console/api/planner/impl/async_planner.go Outdated
Comment thread apps/console/api/planner/impl/async_planner.go Outdated
@nwangfw
Collaborator Author

nwangfw commented May 13, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces an asynchronous 'Scheduler' planner to replace the synchronous 'Passthrough' implementation. The new scheduler manages job lifecycles by introducing 'pending' and 'provisioning' states, utilizing a worker pool to handle resource provisioning and batch submission. Corresponding updates were made to the API proto definitions, frontend components to support the new statuses, and the server initialization logic. My feedback suggests extracting the complex anonymous struct used for the planner decision into a named type to improve code readability and maintainability.

Comment thread apps/console/api/planner/impl/async_planner.go Outdated
@nwangfw nwangfw force-pushed the feat/planner-queue-impl-clean branch from f44f9dd to 3d0c69e Compare May 13, 2026 21:59
@nwangfw nwangfw changed the title [WIP] feat(planner): implement async planner feat(planner): implement async planner May 13, 2026
@nwangfw nwangfw assigned nwangfw and unassigned nwangfw May 13, 2026
@nwangfw nwangfw requested review from DwyaneShi, Jeffwan and zhangjyr May 13, 2026 22:58
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
@nwangfw nwangfw force-pushed the feat/planner-queue-impl-clean branch from 3d0c69e to b5f75c1 Compare May 14, 2026 04:27
Comment thread apps/console/api/server/server.go Outdated
}
planner := plannerimpl.NewPassthrough(batchClient, rm.Provisioner)
planner := plannerimpl.NewScheduler(batchClient, rm.Provisioner, plannerimpl.DefaultWorkerCount)
s.planner = planner
Collaborator

122 and 123 could be a single line?

Collaborator Author

Modified.

// success/error/latency; the struct also records every call for later
// assertion and tracks peak concurrent in-flight Provisions so worker-pool
// parallelism can be measured directly.
type fakeProvisioner struct {
Collaborator

This should be a common provisioner interface. Can it be reused or not?

Why not put them under async_planner_test.go?

Collaborator Author

Right now, the test file is only used by the planner. Maybe we can keep it here until it is reused by another component.
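
For illustration, the peak-concurrency tracking described in that doc comment can be done with a mutex-guarded counter; a simplified version (the real Provisioner signature is richer than what is shown here):

type fakeProvisioner struct {
	mu       sync.Mutex
	inFlight int
	peak     int // highest number of concurrent Provision calls observed
}

func (f *fakeProvisioner) Provision(ctx context.Context) (string, error) {
	f.mu.Lock()
	f.inFlight++
	if f.inFlight > f.peak {
		f.peak = f.inFlight
	}
	f.mu.Unlock()

	time.Sleep(10 * time.Millisecond) // simulate provisioning latency

	f.mu.Lock()
	f.inFlight--
	f.mu.Unlock()
	return "prov-fake-1", nil
}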

provPollInterval time.Duration
}

// jobState is the planner-side lifecycle. Before submission, statusFor maps
Collaborator

Shouldn't this live outside this file? Are there existing planner types that could be used?

Collaborator Author

No existing planner type fits. jobState describes the AsyncPlanner-internal pre-submission lifecycle (Pending → Provisioning, plus terminal Failed/Canceled before MDS takes over) and isn't referenced outside this file.
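
Spelled out, that pre-submission lifecycle looks roughly like this (constant names follow the snippets quoted in this review; the exact set may differ):

type jobState int

const (
	jobStatePending      jobState = iota // enqueued, waiting for a worker
	jobStateProvisioning                 // a worker is running Provision
	jobStateSubmitted                    // CreateBatch succeeded; MDS owns the job from here on
	jobStateFailed                       // terminal: Provision or CreateBatch failed
	jobStateCanceled                     // terminal: canceled before MDS took over
)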

Collaborator

Can the job state not be used by other planner implementations? I think this should be common. I didn't get this part: if you add a new planner implementation, will it have its own jobState definition?

Collaborator Author

We only have one planner now. If we end up with multiple planners, I agree we should move the job state into a common package.

Comment thread apps/console/api/server/server.go Outdated
}
}
if c, ok := s.planner.(io.Closer); ok {
if err := c.Close(); err != nil {
Collaborator

What happens to the workers that are mid-provisioning?

Collaborator Author

There are two cases. If the planner has already received the ProvisionID, it will release the resource accordingly. If the RM has allocated internally but hasn't returned the ProvisionID yet, there is a caveat, bounded by the RPC duration. In practice, this window should be very small.


// ListJobs merges MDS batches with local not-yet-submitted jobs. Local jobs
// are shown only on the first page so the MDS cursor remains valid.
func (q *AsyncPlanner) ListJobs(ctx context.Context, req *plannerapi.ListJobsRequest) (*plannerapi.ListJobsResponse, error) {
Collaborator

Is paging still working? It seems not.

Collaborator Author

Actually, this is something I want to discuss and get feedback on: how to merge job status for jobs still in the queue with jobs already in MDS.

Paging works, but not perfectly. Right now, the implementation displays all local jobs (jobs not yet submitted to MDS) first; the jobs in MDS are displayed after that. The reason we can't paginate strictly across both: the MDS After cursor only encodes positions in MDS's batch ID space, and local jobs have no stable IDs (they're in-memory and lost on restart), so we can't build a unified cursor across both sources.

The fix for this can be deferred to a future PR.

Collaborator

@Jeffwan Jeffwan May 15, 2026

The jobs in MDS will be displayed after that.

I didn't get this part. There's a logical job shown on the UI. Since you manage the job state, can you do the merge within the planner?

Collaborator Author

The planner does the merge and currently displays the sorted local jobs first, followed by the sorted MDS jobs. Ideally we wouldn't need to distinguish them and could sort everything uniformly. I can address this in a future PR.
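
To make the current merge behavior concrete, a sketch of the idea (all names here, such as ListBatches, NextAfter, and localJobsSorted, are placeholders rather than the actual API):

func (q *AsyncPlanner) ListJobs(ctx context.Context, req *plannerapi.ListJobsRequest) (*plannerapi.ListJobsResponse, error) {
	mdsResp, err := q.bc.ListBatches(ctx, req) // MDS paging via its After cursor
	if err != nil {
		return nil, err
	}
	out := &plannerapi.ListJobsResponse{NextAfter: mdsResp.NextAfter}

	// Local, not-yet-submitted jobs appear only on the first page so the MDS
	// cursor stays valid; they have no stable IDs to fold into a unified cursor.
	if req.After == "" {
		out.Jobs = append(out.Jobs, q.localJobsSorted()...)
	}
	out.Jobs = append(out.Jobs, mdsResp.Jobs...)
	return out, nil
}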

// provReadyTimeout caps how long a single worker will wait for a
// provision to reach Running. Beyond this, the job is marked Failed and
// the resource is released.
const provReadyTimeout = 2 * time.Minute
Collaborator

I do not quite understand why it's 2 minutes. Any analysis behind it?

Collaborator Author

It depends on the provisioning speed of the resource provider. Two minutes is more than enough for the current k8s provisioning process, but we may need to adjust this value for other resource providers.

Collaborator

How would we measure it, and what if we switch to other cloud providers? I'd just like to know whether the 2 minutes come from your tests or are a magic number you defined.

Collaborator Author

The 2 minutes come from my local provisioning tests. I think we need to test and configure different values for different cloud providers.
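
For reference, the timeout under discussion bounds a poll loop along these lines (a sketch; provisionRunning is a stand-in for the actual status check, and the timeout could be made a per-provider setting instead of a constant):

// waitForRunning polls until the provision reaches Running, the timeout
// elapses, or the planner shuts down.
func (q *AsyncPlanner) waitForRunning(ctx context.Context, provisionID string) error {
	deadline := time.Now().Add(provReadyTimeout)
	ticker := time.NewTicker(q.provPollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			running, err := q.provisionRunning(ctx, provisionID)
			if err != nil {
				return err
			}
			if running {
				return nil
			}
			if time.Now().After(deadline) {
				return fmt.Errorf("provision %s not Running within %v", provisionID, provReadyTimeout)
			}
		}
	}
}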

// MDS for a submitted job. A cancel that lands mid-Provision or
// mid-CreateBatch is honored at the worker's post-CreateBatch checkpoint
// (which forwards CancelBatch and releases the resource).
func (q *AsyncPlanner) Cancel(ctx context.Context, jobID string) (*plannerapi.Job, error) {
Collaborator

Does it work well with the checkpoint?

Collaborator Author

Yes, we re-read state at the end of process() after CreateBatch returns to check if a cancel landed; covered by TestCancelDuringProvisioning* and TestCancelDuringCreateBatch*. Let me know if you see any potential issues.


aibrix := plannerclient.AIBrixExtraBody{
	JobID: req.JobID,
	PlannerDecision: &struct {
Collaborator

why do you replicate a struct here?

Collaborator Author

Created a named type for reuse.

Collaborator

I mean, was this not defined in an earlier PR?

Collaborator Author

No

return out
}

func (q *AsyncPlanner) deleteJob(jobID string) {
Collaborator

How do you manage jobs in the queue? It seems delete is not invoked when the queue is full or a job expires.

Collaborator Author

Jobs are dequeued and removed from the Go channel implicitly by the worker's <-q.submit receive — no explicit delete is needed for the queue itself. The previous deleteJob has been renamed to rollbackEnqueue for clarity and is only used to undo the bookkeeping insert in two edge cases where Enqueue itself fails. Added some comments in the code as well.
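
A minimal sketch of that rollback helper (illustrative only):

// rollbackEnqueue undoes the bookkeeping insert when Enqueue itself fails,
// e.g. when the submit channel is full and no worker will ever see the job.
func (q *AsyncPlanner) rollbackEnqueue(jobID string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.jobs, jobID)
}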

Collaborator

Could you point it out? I didn't find the logic.


nwangfw added 2 commits May 14, 2026 15:50
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
// details (currently always nil — RM doesn't return them yet).
type PlannerDecision struct {
	ProvisionID               string `json:"provision_id,omitempty"`
	ProvisionResourceDeadline int64  `json:"provision_resource_deadline,omitempty"`
Collaborator

Need a comment here to explain whether the deadline is a Unix timestamp in seconds, milliseconds, etc.

Collaborator Author

Added

type PlannerDecision struct {
	ProvisionID               string `json:"provision_id,omitempty"`
	ProvisionResourceDeadline int64  `json:"provision_resource_deadline,omitempty"`
	ResourceDetails           []struct {
Collaborator

1. Let's have enough comments to explain these fields.
2. We need a comment to explain that the actual GPU count of each deployment is defined in the template; otherwise other contributors will get confused.

Collaborator Author

Sure. Comments are added for every field here.

bc plannerclient.BatchClient
prov provisioner.Provisioner

submit chan string // buffered FIFO of pending JobIDs
Collaborator

It seems like the planner's and queue's implementations are mixed together; let's split them for better flexibility.

Collaborator Author

Great point; I have split the queue implementation into a separate file.

}
}

func (q *Planner) markFailed(jobID string, err error) {
Collaborator

Once a job is marked as failed, we need to release its provision (if valid) as well.

Collaborator Author

Yes, the current code releases the provision if a job has been marked as failed within process().

return
}
if job.state != jobStatePending {
// Cancel raced ahead; drop without provisioning.
Collaborator

I didn't get what the comment means; let's put a simple example to explain it.

Collaborator

Got this part after reviewing the cancel function. Let's explicitly check whether the state is cancelled and write an error log when encountering other invalid states.

Collaborator Author

Added an example here explaining what this statement means. The state info is logged so that we can figure out the exact reason.
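
The guard being discussed amounts to something like this (a sketch; the log wording is not verbatim):

if job.state != jobStatePending {
	// Example: Cancel(jobID) ran between Enqueue and this worker picking the
	// job up, so the state is already jobStateCanceled; skip provisioning.
	klog.Infof("[planner] skipping job_id=%q state=%v (expected pending)", jobID, job.state)
	return
}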

return &plannerapi.Job{JobID: jobID, Batch: placeholderBatch(req, openai.BatchStatusCancelled, enqueuedAt, terminalAt)}, nil
case jobStateSubmitted:
klog.Infof("[planner] cancel submitted job_id=%q batch_id=%q", jobID, batchID)
batch, err := q.bc.CancelBatch(ctx, batchID)
Collaborator

need to release provision

Collaborator Author

Great catch. Release provision added.

@nwangfw nwangfw force-pushed the feat/planner-queue-impl-clean branch from 357ddbe to 9beeae4 Compare May 15, 2026 03:27
Comment thread apps/console/api/planner/impl/queue.go Outdated
const queueCapacity = 256

// jobQueue owns the buffered FIFO of pending JobIDs.
type jobQueue struct {
Collaborator

Let's have a queue interface first, with this implementation as a FIFO queue. Don't use the name jobQueue; a name describing its functionality is preferred, since we will have multiple queue implementations in the future.

Collaborator Author

Makes sense. How about this version?
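
For readers, the split being suggested could look roughly like this: a small Queue interface plus a channel-backed FIFO implementation (names are illustrative, not necessarily what was merged):

// Queue abstracts how pending job IDs reach the workers, so other policies
// (e.g. a priority queue) can be swapped in later.
type Queue interface {
	Push(jobID string) bool // false when the queue is full
	Chan() <-chan string    // receive side consumed by workers
	Close()
}

// fifoQueue is a buffered, channel-backed FIFO implementation of Queue.
type fifoQueue struct {
	ch chan string
}

func newFIFOQueue(capacity int) *fifoQueue {
	return &fifoQueue{ch: make(chan string, capacity)}
}

func (f *fifoQueue) Push(jobID string) bool {
	select {
	case f.ch <- jobID:
		return true
	default:
		return false
	}
}

func (f *fifoQueue) Chan() <-chan string { return f.ch }

func (f *fifoQueue) Close() { close(f.ch) }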

Signed-off-by: Ning Wang <n.wang.chn@hotmail.com>
@nwangfw nwangfw force-pushed the feat/planner-queue-impl-clean branch from 9beeae4 to 201ef7f Compare May 15, 2026 04:23
@Jeffwan Jeffwan changed the title feat(planner): implement async planner feat(batch): implement async planner for batch orchestration May 16, 2026
@Jeffwan Jeffwan merged commit 05551ab into vllm-project:main May 16, 2026
3 checks passed