Skip to content

first cut: offload trigger inputs#7396

Draft
katrogan wants to merge 1 commit into
mainfrom
katrina/eng26-532-trigger-service-create-execute-trigger-store-pass-offloaded
Draft

first cut: offload trigger inputs#7396
katrogan wants to merge 1 commit into
mainfrom
katrina/eng26-532-trigger-service-create-execute-trigger-store-pass-offloaded

Conversation

@katrogan
Copy link
Copy Markdown
Contributor

Tracking issue

Why are the changes needed?

What changes were proposed in this pull request?

How was this patch tested?

Labels

Please add one or more of the following labels to categorize your PR:

  • added: For new features.
  • changed: For changes in existing functionality.
  • deprecated: For soon-to-be-removed features.
  • removed: For features being removed.
  • fixed: For any bug fixed.
  • security: In case of vulnerabilities

This is important to improve the readability of release notes.

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Stack

If you do use git town to manage PR Stacks, the stack relevant to this PR
will show below. Otherwise, you can ignore this section.

Docs link

Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
Copilot AI review requested due to automatic review settings May 20, 2026 18:42
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces an “offloaded trigger inputs” path by extending the Trigger and RunSpec APIs to reference offloaded input blobs, while still supporting scheduled-trigger fires that need a per-fire kickoff timestamp. It also propagates kickoff-time metadata into the executor/runtime so the kickoff timestamp can be injected at input-read time and used to avoid cache-key collisions.

Changes:

  • Add offloaded_inputs to TriggerSpec and enforce mutual exclusivity with inline inputs at deploy time.
  • Add KickoffTimeArg to RunSpec and inject kickoff timestamp into runtime inputs + fold kickoff into cache-key hashing for offloaded-input runs.
  • Propagate kickoff metadata through TaskActionSpec into plugin machinery templating and executor input reading.

Reviewed changes

Copilot reviewed 18 out of 27 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
runs/service/utils.go Adds kickoff-folding helper for offloaded-input hashing.
runs/service/trigger_service.go Validates trigger spec does not set both inline and offloaded inputs.
runs/service/run_service.go Folds kickoff arg into effective inputs hash for cache-key generation.
gen/ts/flyteidl2/trigger/trigger_definition_pb.ts TS bindings updated for offloaded_inputs on triggers.
gen/ts/flyteidl2/task/run_pb.ts TS bindings updated for KickoffTimeArg and kickoff_time_arg.
gen/python/flyteidl2/trigger/trigger_definition_pb2.pyi Python typings updated for trigger offloaded inputs.
gen/python/flyteidl2/trigger/trigger_definition_pb2.py Python codegen updated for trigger offloaded inputs.
gen/python/flyteidl2/task/run_pb2.pyi Python typings updated for KickoffTimeArg.
gen/python/flyteidl2/task/run_pb2.py Python codegen updated for KickoffTimeArg.
gen/go/flyteidl2/trigger/trigger_definition.pb.validate.go Go validation codegen updated for trigger spec fields.
gen/go/flyteidl2/trigger/trigger_definition.pb.go Go proto updated for trigger offloaded_inputs.
gen/go/flyteidl2/task/run.pb.validate.go Go validation codegen updated for KickoffTimeArg.
gen/go/flyteidl2/task/run.pb.go Go proto updated for KickoffTimeArg and kickoff_time_arg.
flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go Updates mocks for new TaskExecutionMetadata methods.
flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go Updates mocks for new TaskExecutionMetadata methods.
flyteplugins/go/tasks/pluginmachinery/core/template/template.go Adds template substitutions for kickoff arg name/value.
flyteplugins/go/tasks/pluginmachinery/core/template/template_test.go Updates tests/mocks for kickoff metadata methods.
flyteplugins/go/tasks/pluginmachinery/core/mocks/mocks.go Adds mock methods for kickoff metadata getters.
flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go Extends TaskExecutionMetadata interface with kickoff getters.
flyteidl2/trigger/trigger_definition.proto Adds offloaded_inputs to TriggerSpec.
flyteidl2/task/run.proto Adds KickoffTimeArg and kickoff_time_arg to RunSpec.
executor/pkg/plugin/task_exec_metadata.go Populates kickoff metadata from TaskAction CRD into plugin metadata.
executor/pkg/plugin/task_exec_context.go Injects kickoff timestamp into inputs at read time when configured.
executor/api/v1/taskaction_types.go Extends TaskActionSpec with kickoff arg name/time fields.
actions/k8s/client.go Projects RunSpec kickoff fields into TaskAction CRD spec.
Files not reviewed (7)
  • flyteplugins/go/tasks/pluginmachinery/core/mocks/mocks.go: Language not supported
  • gen/go/flyteidl2/task/run.pb.go: Language not supported
  • gen/go/flyteidl2/task/run.pb.validate.go: Language not supported
  • gen/go/flyteidl2/trigger/trigger_definition.pb.go: Language not supported
  • gen/go/flyteidl2/trigger/trigger_definition.pb.validate.go: Language not supported
  • gen/python/flyteidl2/task/run_pb2.py: Language not supported
  • gen/python/flyteidl2/trigger/trigger_definition_pb2.py: Language not supported

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread runs/service/utils.go
Comment on lines +59 to +63
h := sha256.New()
h.Write([]byte(blobHash))
h.Write([]byte(kickoffArgName))
h.Write([]byte(strconv.FormatInt(kickoffTime.UnixNano(), 10)))
return base64.StdEncoding.EncodeToString(h.Sum(nil))
Comment thread runs/service/utils.go
Comment on lines +47 to +56
// FoldKickoffIntoInputsHash composes a deterministic effective inputs hash from an offloaded
// inputs hash plus a kickoff timestamp argument. Used by the offloaded trigger path where the
// uploaded blob excludes the kickoff arg by design — without folding, every fire of the same
// trigger would produce the same cache key.
//
// Honors cacheIgnoreInputVars: if the kickoff arg is cache-ignored, the blob hash is returned
// unchanged so fires share a cache key. Otherwise returns
// base64(sha256(blobHash || argName || unixNano(kickoffTime))).
func FoldKickoffIntoInputsHash(blobHash, kickoffArgName string, kickoffTime time.Time, cacheIgnoreInputVars []string) string {
if kickoffArgName == "" || slices.Contains(cacheIgnoreInputVars, kickoffArgName) {
Comment on lines +212 to +216
func validateTriggerInputs(spec *triggerpb.TriggerSpec) error {
if spec.GetInputs() != nil && spec.GetOffloadedInputs() != nil {
return connect.NewError(connect.CodeInvalidArgument,
fmt.Errorf("trigger spec sets both inputs and offloaded_inputs; zero-trust environments do not support passing inputs directly to the control plane — call UploadInputs first and set offloaded_inputs"))
}
Comment on lines +245 to +248
effectiveHash = FoldKickoffIntoInputsHash(
effectiveHash,
kt.GetInputArgName(),
kt.GetKickoffTime().AsTime(),
Comment on lines 239 to +252
if taskSpec.GetTaskTemplate().GetMetadata().GetDiscoverable() {
cacheKey, err = generateCacheKeyForTask(taskSpec.GetTaskTemplate(), iw.OffloadedInputData.GetInputsHash())
// When a kickoff arg is set (scheduled-trigger fire), fold its name + time into the
// effective inputs hash so different fires don't collide on the cache key. The blob
// is not re-read — the fold is deterministic over (blob_hash, arg_name, time).
effectiveHash := iw.OffloadedInputData.GetInputsHash()
if kt := runSpec.GetKickoffTimeArg(); kt != nil {
effectiveHash = FoldKickoffIntoInputsHash(
effectiveHash,
kt.GetInputArgName(),
kt.GetKickoffTime().AsTime(),
taskSpec.GetTaskTemplate().GetMetadata().GetCacheIgnoreInputVars(),
)
}
cacheKey, err = generateCacheKeyForTask(taskSpec.GetTaskTemplate(), effectiveHash)
Comment thread actions/k8s/client.go
Comment on lines +794 to +800
if kt := runSpec.GetKickoffTimeArg(); kt != nil {
taskAction.Spec.KickoffTimeArgName = kt.GetInputArgName()
ts := metav1.NewTime(kt.GetKickoffTime().AsTime())
taskAction.Spec.KickoffTime = &ts
} else {
taskAction.Spec.KickoffTimeArgName = ""
taskAction.Spec.KickoffTime = nil
Comment thread actions/k8s/client.go
Comment on lines +794 to +801
if kt := runSpec.GetKickoffTimeArg(); kt != nil {
taskAction.Spec.KickoffTimeArgName = kt.GetInputArgName()
ts := metav1.NewTime(kt.GetKickoffTime().AsTime())
taskAction.Spec.KickoffTime = &ts
} else {
taskAction.Spec.KickoffTimeArgName = ""
taskAction.Spec.KickoffTime = nil
}
Comment on lines +157 to +158
val = kickoffTimeArgNameRegex.ReplaceAllString(val, params.TaskExecMetadata.GetKickoffTimeArgName())
val = kickoffTimeValueRegex.ReplaceAllString(val, formatKickoffTimeValue(params.TaskExecMetadata.GetKickoffTimeValue()))
Comment on lines +108 to +112
var kickoffTimeValue time.Time
if ta.Spec.KickoffTime != nil {
kickoffTimeValue = ta.Spec.KickoffTime.Time
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants