Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions flyteidl2/workflow/run_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package flyteidl2.workflow;
import "buf/validate/validate.proto";
import "flyteidl2/common/identifier.proto";
import "flyteidl2/common/list.proto";
import "flyteidl2/common/phase.proto";
import "flyteidl2/common/run.proto";
import "flyteidl2/core/execution.proto";
import "flyteidl2/task/common.proto";
import "flyteidl2/task/run.proto";
import "flyteidl2/task/task_definition.proto";
import "flyteidl2/workflow/run_definition.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

// Go import path of the package generated from this file.
option go_package = "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow";
Expand Down Expand Up @@ -63,6 +65,12 @@ service RunService {
// to existing ones from the point of invocation.
rpc WatchActions(WatchActionsRequest) returns (stream WatchActionsResponse) {}

// WatchWindowedActions streams a windowed slice of a run's action list over
// a bidirectional stream.
//
// Client side: the first message must be a Subscribe; every later message is
// an UpdateWindow, sent as the user scrolls, expands/collapses, or changes
// filters.
//
// Server side: each response carries only the window around the selected
// item (selected ± overscan), the ancestor path of the selected item, and
// group nodes with their aggregates -- never the full action list.
rpc WatchWindowedActions(stream WatchWindowedActionsRequest) returns (stream WatchWindowedActionsResponse) {}

// WatchClusterEvents streams Kubernetes cluster events in human-readable
// form. Server-streaming: one request, then a stream of responses.
rpc WatchClusterEvents(WatchClusterEventsRequest) returns (stream WatchClusterEventsResponse) {}

Expand Down Expand Up @@ -339,6 +347,165 @@ message WatchActionsResponse {
repeated EnrichedAction enriched_actions = 1;
}

// WatchWindowedActionsRequest is the client → server message on the
// WatchWindowedActions bidi stream.
//
// Protocol: the first message on the stream must carry `subscribe`; every
// subsequent message must carry `update_window`. Exactly one of the two must
// be set on every message.
message WatchWindowedActionsRequest {
  oneof msg {
    // A request with neither variant set carries no instruction for the
    // server, so it is rejected at validation time instead of being left to
    // handler code.
    option (buf.validate.oneof).required = true;

    // Opens the subscription. Must be the first message on the stream.
    Subscribe subscribe = 1;
    // Adjusts the window of an already-open subscription.
    UpdateWindow update_window = 2;
  }

  // Subscribe opens the subscription. Sent exactly once, as the first
  // message on the stream.
  message Subscribe {
    // Run whose action list is being watched.
    common.RunIdentifier run_id = 1 [(buf.validate.field).required = true];

    // Action name or group key currently focused in the UI; the server
    // centers the window on this item.
    string selected_item_id = 2;

    // Extra items to materialize above the selected item. The server clamps
    // the value; 0 means "use server default".
    int32 overscan_before = 3;
    // Extra items to materialize below the selected item. Same clamping and
    // default rules as overscan_before.
    int32 overscan_after = 4;

    // Items the client wants expanded. Key = action name OR group key.
    map<string, NodeExpansionParams> expanded_nodes = 5;

    // Optional phase filter; same semantics as the legacy WatchActions
    // filters.
    repeated common.ActionPhase phase_filter = 6;
    // Optional name filter; same semantics as the legacy WatchActions
    // filters.
    string name_filter = 7;
  }

  // UpdateWindow updates one or more window parameters. Sent as the user
  // scrolls, expands/collapses, or changes filters.
  //
  // The fields mirror Subscribe (minus run_id) by name and type.
  // NOTE(review): assuming proto3 implicit presence, an unset field cannot
  // be told apart from one explicitly set to ""/0/empty. Confirm whether the
  // server treats each UpdateWindow as a full replacement of the window
  // state or merges it, and how a client clears a filter.
  message UpdateWindow {
    // Item (action name or group key) to center the window on.
    // NOTE(review): presumably same meaning as Subscribe.selected_item_id.
    string selected_item_id = 1;
    // Extra items to materialize above the selected item.
    int32 overscan_before = 2;
    // Extra items to materialize below the selected item.
    int32 overscan_after = 3;
    // Items the client wants expanded, keyed like Subscribe.expanded_nodes
    // (NOTE(review): confirm the key space is identical).
    map<string, NodeExpansionParams> expanded_nodes = 4;
    // Phase filter to apply to the action list.
    repeated common.ActionPhase phase_filter = 5;
    // Name filter to apply to the action list.
    string name_filter = 6;
  }
}

// NodeExpansionParams caps the number of children the server materializes
// beneath a single expanded node or group.
message NodeExpansionParams {
  // NOTE(review): not documented in the schema -- presumably the index of
  // the first child to materialize; confirm against the server.
  int32 offset = 1;

  // Maximum number of children to materialize. Values <= 0 select the server
  // default; the server additionally clamps to a hard cap.
  int32 limit = 2;
}

// WatchWindowedActionsResponse is a single server → client tick on the
// WatchWindowedActions stream.
message WatchWindowedActionsResponse {
  // The window slice, ordered by flat index. Holds only what the client can
  // render right now: the selected item ± overscan.
  repeated WindowedItem window_items = 1;

  // Path from the root down to the selected item's parent, used for
  // breadcrumb rendering. Included in every response; its length is bounded
  // by the tree depth.
  repeated WindowedItem ancestors = 2;

  // Size of the entire expanded flat list, not merely of the window.
  int64 total_flat_count = 3;

  // Index of selected_item_id inside the flat list.
  // NOTE(review): assuming proto3 implicit presence, index 0 and "selected
  // item not in the list" look the same on the wire -- confirm how that case
  // is signalled.
  int64 selected_flat_index = 4;

  // Set on the response that finishes the initial hydration. Clients should
  // take it as the "first paint done" signal.
  bool initial_snapshot_complete = 5;

  // Sticky truncation notices: repeated on every response until the server
  // confirms that the client received them.
  repeated TruncationNotice truncations = 6;

  // Set when backpressure forced the server to drop a tick. The client
  // should re-send its UpdateWindow to obtain a fresh snapshot.
  bool resync_hint = 7;
}

// WindowedItem is a single slot of the windowed flat list. A slot holds
// either a regular action or a synthetic group folder.
message WindowedItem {
  oneof item {
    // The slot is a regular action.
    EnrichedAction action = 1;
    // The slot is a synthetic group folder.
    GroupNode group = 2;
  }

  // NOTE(review): undocumented in the schema -- presumably the nesting level
  // of the item in the action tree; confirm whether the root is 0.
  int32 depth = 3;

  // NOTE(review): presumably true when the item's children are currently
  // expanded in the flat list; confirm.
  bool is_expanded = 4;
}

// ActionLeaf is a single entry of one of the top-3 lists carried by
// GroupAggregations.
message ActionLeaf {
  // Name of the member action. Resolves to a node under the group's parent.
  string action_id = 1;

  // Interned short name taken from the member's task spec.
  string short_name = 2;

  // The metric this entry is ranked by. Which metric depends on the list the
  // leaf sits in: wall-clock duration, running duration, or setup time.
  google.protobuf.Duration duration = 3;
}

// GroupAggregations bundles the four top-3 outlier lists of a group. Every
// list contains the three members with the largest value of its metric, in
// descending order.
message GroupAggregations {
  // Members in the terminal FAILED phase, ranked by total duration.
  repeated ActionLeaf failed = 1;

  // Ranked by total wall-clock duration (created → ended).
  repeated ActionLeaf longest_duration = 2;

  // Ranked by time spent in the RUNNING phase.
  repeated ActionLeaf longest_running = 3;

  // Ranked by setup time, i.e. total duration − running duration.
  repeated ActionLeaf longest_setup = 4;
}

// GroupNode is the synthetic (parent, action_group) folder. The server emits
// one in place of a contiguous block of sibling actions that share an
// action_group.
message GroupNode {
  // Synthetic key of the form "{parent_action_id}::group::{group_name}".
  string id = 1;
  // The group_name component of the synthetic key.
  string group_name = 2;
  // NOTE(review): presumably the parent_action_id component of the synthetic
  // key; confirm.
  string parent_id = 3;

  // Number of members per phase. The key is the numeric ActionPhase enum
  // value.
  map<int32, int32> child_phase_counts = 4;

  // Earliest StartedAt over all members. Unset until some member starts.
  google.protobuf.Timestamp earliest_start_time = 5;
  // Latest EndedAt over all members. Unset while any member is still
  // running.
  google.protobuf.Timestamp latest_end_time = 6;

  // NOTE(review): undocumented in the schema -- presumably the number of
  // member actions in the group; confirm.
  int32 total_children = 7;
  // NOTE(review): undocumented in the schema -- presumably whether the group
  // satisfies the active phase/name filters; confirm.
  bool meets_filter = 8;

  // Top-3 outlier lists computed over the group's members.
  GroupAggregations aggregations = 9;
}

// TruncationNotice informs the client that a hard cap made the response
// incomplete. Notices are sticky: the server re-attaches one to each
// response until a later client message confirms receipt.
message TruncationNotice {
  // Why the data was truncated.
  enum Reason {
    // Default value; no reason given.
    REASON_UNSPECIFIED = 0;
    // MaxActionsPerRun was hit: the run has more actions than the store
    // tracks.
    REASON_RUN_NODE_LIMIT = 1;
    // MaxChildrenPerParent was hit: at least one parent has more children
    // than are tracked.
    REASON_PARENT_CHILD_LIMIT = 2;
    // Transient: the store is still hydrating from the DB.
    REASON_HYDRATING = 3;
  }

  // Cause of the truncation.
  Reason reason = 1;
  // NOTE(review): undocumented in the schema -- presumably the number of
  // actions the store currently tracks for the run; confirm.
  int64 tracked_action_count = 2;
  // NOTE(review): presumably the run's real action total as far as the
  // server knows; confirm.
  int64 known_total_action_count = 3;
  // NOTE(review): presumably a human-readable explanation; confirm.
  string message = 4;
}

message WatchClusterEventsRequest {
common.ActionIdentifier id = 1 [(buf.validate.field).required = true];

Expand Down
Loading
Loading