first cut: offload trigger inputs#7396
Draft
katrogan wants to merge 1 commit into
Draft
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces an “offloaded trigger inputs” path by extending the Trigger and RunSpec APIs to reference offloaded input blobs, while still supporting scheduled-trigger fires that need a per-fire kickoff timestamp. It also propagates kickoff-time metadata into the executor/runtime so the kickoff timestamp can be injected at input-read time and used to avoid cache-key collisions.
Changes:
- Add
offloaded_inputstoTriggerSpecand enforce mutual exclusivity with inlineinputsat deploy time. - Add
KickoffTimeArgtoRunSpecand inject kickoff timestamp into runtime inputs + fold kickoff into cache-key hashing for offloaded-input runs. - Propagate kickoff metadata through TaskActionSpec into plugin machinery templating and executor input reading.
Reviewed changes
Copilot reviewed 18 out of 27 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| runs/service/utils.go | Adds kickoff-folding helper for offloaded-input hashing. |
| runs/service/trigger_service.go | Validates trigger spec does not set both inline and offloaded inputs. |
| runs/service/run_service.go | Folds kickoff arg into effective inputs hash for cache-key generation. |
| gen/ts/flyteidl2/trigger/trigger_definition_pb.ts | TS bindings updated for offloaded_inputs on triggers. |
| gen/ts/flyteidl2/task/run_pb.ts | TS bindings updated for KickoffTimeArg and kickoff_time_arg. |
| gen/python/flyteidl2/trigger/trigger_definition_pb2.pyi | Python typings updated for trigger offloaded inputs. |
| gen/python/flyteidl2/trigger/trigger_definition_pb2.py | Python codegen updated for trigger offloaded inputs. |
| gen/python/flyteidl2/task/run_pb2.pyi | Python typings updated for KickoffTimeArg. |
| gen/python/flyteidl2/task/run_pb2.py | Python codegen updated for KickoffTimeArg. |
| gen/go/flyteidl2/trigger/trigger_definition.pb.validate.go | Go validation codegen updated for trigger spec fields. |
| gen/go/flyteidl2/trigger/trigger_definition.pb.go | Go proto updated for trigger offloaded_inputs. |
| gen/go/flyteidl2/task/run.pb.validate.go | Go validation codegen updated for KickoffTimeArg. |
| gen/go/flyteidl2/task/run.pb.go | Go proto updated for KickoffTimeArg and kickoff_time_arg. |
| flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go | Updates mocks for new TaskExecutionMetadata methods. |
| flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go | Updates mocks for new TaskExecutionMetadata methods. |
| flyteplugins/go/tasks/pluginmachinery/core/template/template.go | Adds template substitutions for kickoff arg name/value. |
| flyteplugins/go/tasks/pluginmachinery/core/template/template_test.go | Updates tests/mocks for kickoff metadata methods. |
| flyteplugins/go/tasks/pluginmachinery/core/mocks/mocks.go | Adds mock methods for kickoff metadata getters. |
| flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go | Extends TaskExecutionMetadata interface with kickoff getters. |
| flyteidl2/trigger/trigger_definition.proto | Adds offloaded_inputs to TriggerSpec. |
| flyteidl2/task/run.proto | Adds KickoffTimeArg and kickoff_time_arg to RunSpec. |
| executor/pkg/plugin/task_exec_metadata.go | Populates kickoff metadata from TaskAction CRD into plugin metadata. |
| executor/pkg/plugin/task_exec_context.go | Injects kickoff timestamp into inputs at read time when configured. |
| executor/api/v1/taskaction_types.go | Extends TaskActionSpec with kickoff arg name/time fields. |
| actions/k8s/client.go | Projects RunSpec kickoff fields into TaskAction CRD spec. |
Files not reviewed (7)
- flyteplugins/go/tasks/pluginmachinery/core/mocks/mocks.go: Language not supported
- gen/go/flyteidl2/task/run.pb.go: Language not supported
- gen/go/flyteidl2/task/run.pb.validate.go: Language not supported
- gen/go/flyteidl2/trigger/trigger_definition.pb.go: Language not supported
- gen/go/flyteidl2/trigger/trigger_definition.pb.validate.go: Language not supported
- gen/python/flyteidl2/task/run_pb2.py: Language not supported
- gen/python/flyteidl2/trigger/trigger_definition_pb2.py: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+59
to
+63
| h := sha256.New() | ||
| h.Write([]byte(blobHash)) | ||
| h.Write([]byte(kickoffArgName)) | ||
| h.Write([]byte(strconv.FormatInt(kickoffTime.UnixNano(), 10))) | ||
| return base64.StdEncoding.EncodeToString(h.Sum(nil)) |
Comment on lines
+47
to
+56
| // FoldKickoffIntoInputsHash composes a deterministic effective inputs hash from an offloaded | ||
| // inputs hash plus a kickoff timestamp argument. Used by the offloaded trigger path where the | ||
| // uploaded blob excludes the kickoff arg by design — without folding, every fire of the same | ||
| // trigger would produce the same cache key. | ||
| // | ||
| // Honors cacheIgnoreInputVars: if the kickoff arg is cache-ignored, the blob hash is returned | ||
| // unchanged so fires share a cache key. Otherwise returns | ||
| // base64(sha256(blobHash || argName || unixNano(kickoffTime))). | ||
| func FoldKickoffIntoInputsHash(blobHash, kickoffArgName string, kickoffTime time.Time, cacheIgnoreInputVars []string) string { | ||
| if kickoffArgName == "" || slices.Contains(cacheIgnoreInputVars, kickoffArgName) { |
Comment on lines
+212
to
+216
| func validateTriggerInputs(spec *triggerpb.TriggerSpec) error { | ||
| if spec.GetInputs() != nil && spec.GetOffloadedInputs() != nil { | ||
| return connect.NewError(connect.CodeInvalidArgument, | ||
| fmt.Errorf("trigger spec sets both inputs and offloaded_inputs; zero-trust environments do not support passing inputs directly to the control plane — call UploadInputs first and set offloaded_inputs")) | ||
| } |
Comment on lines
+245
to
+248
| effectiveHash = FoldKickoffIntoInputsHash( | ||
| effectiveHash, | ||
| kt.GetInputArgName(), | ||
| kt.GetKickoffTime().AsTime(), |
Comment on lines
239
to
+252
| if taskSpec.GetTaskTemplate().GetMetadata().GetDiscoverable() { | ||
| cacheKey, err = generateCacheKeyForTask(taskSpec.GetTaskTemplate(), iw.OffloadedInputData.GetInputsHash()) | ||
| // When a kickoff arg is set (scheduled-trigger fire), fold its name + time into the | ||
| // effective inputs hash so different fires don't collide on the cache key. The blob | ||
| // is not re-read — the fold is deterministic over (blob_hash, arg_name, time). | ||
| effectiveHash := iw.OffloadedInputData.GetInputsHash() | ||
| if kt := runSpec.GetKickoffTimeArg(); kt != nil { | ||
| effectiveHash = FoldKickoffIntoInputsHash( | ||
| effectiveHash, | ||
| kt.GetInputArgName(), | ||
| kt.GetKickoffTime().AsTime(), | ||
| taskSpec.GetTaskTemplate().GetMetadata().GetCacheIgnoreInputVars(), | ||
| ) | ||
| } | ||
| cacheKey, err = generateCacheKeyForTask(taskSpec.GetTaskTemplate(), effectiveHash) |
Comment on lines
+794
to
+800
| if kt := runSpec.GetKickoffTimeArg(); kt != nil { | ||
| taskAction.Spec.KickoffTimeArgName = kt.GetInputArgName() | ||
| ts := metav1.NewTime(kt.GetKickoffTime().AsTime()) | ||
| taskAction.Spec.KickoffTime = &ts | ||
| } else { | ||
| taskAction.Spec.KickoffTimeArgName = "" | ||
| taskAction.Spec.KickoffTime = nil |
Comment on lines
+794
to
+801
| if kt := runSpec.GetKickoffTimeArg(); kt != nil { | ||
| taskAction.Spec.KickoffTimeArgName = kt.GetInputArgName() | ||
| ts := metav1.NewTime(kt.GetKickoffTime().AsTime()) | ||
| taskAction.Spec.KickoffTime = &ts | ||
| } else { | ||
| taskAction.Spec.KickoffTimeArgName = "" | ||
| taskAction.Spec.KickoffTime = nil | ||
| } |
Comment on lines
+157
to
+158
| val = kickoffTimeArgNameRegex.ReplaceAllString(val, params.TaskExecMetadata.GetKickoffTimeArgName()) | ||
| val = kickoffTimeValueRegex.ReplaceAllString(val, formatKickoffTimeValue(params.TaskExecMetadata.GetKickoffTimeValue())) |
Comment on lines
+108
to
+112
| var kickoffTimeValue time.Time | ||
| if ta.Spec.KickoffTime != nil { | ||
| kickoffTimeValue = ta.Spec.KickoffTime.Time | ||
| } | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Tracking issue
Why are the changes needed?
What changes were proposed in this pull request?
How was this patch tested?
Labels
Please add one or more of the following labels to categorize your PR:
This is important to improve the readability of release notes.
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Stack
If you do use
git townto manage PR Stacks, the stack relevant to this PRwill show below. Otherwise, you can ignore this section.
Docs link