Hi Flyte maintainers,
Sharing a proposal for a new first-class async connector for AWS EMR Serverless, distributed as `flytekitplugins-awsemrserverless` and deployed through the existing `flyteconnector` Helm chart. Task type: `emr_serverless`.
We've been running it in production for several months and would like to contribute it back so the broader Flyte community can benefit. A draft PR with the full implementation will follow this issue shortly. This write-up is mostly to give reviewers context up front and surface a few small naming and layout questions where your guidance would help us land cleanly.
Summary
The plugin supports two execution modes:
- Script mode: the user provides an `s3://…/script.py` plus Spark args. The connector calls `StartJobRun` and surfaces phase and logs back to FlytePropeller.
- Pythonic mode: the user writes a regular Python `@task(task_config=EMRServerless(...))`, similar to the Databricks plugin. The connector materializes a fast-registration bundle and runs `pyflyte-fast-execute` on EMR Serverless workers via a small bootstrap script that the connector uploads to S3 (content-addressed, one PUT per plugin release).
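In script mode the work handed to EMR Serverless is essentially one `StartJobRun` request. The sketch below shows how such a request could be assembled for boto3's `emr-serverless` client. The field names (`applicationId`, `executionRoleArn`, `jobDriver.sparkSubmit`, `configurationOverrides.monitoringConfiguration.s3MonitoringConfiguration.logUri`) are from the EMR Serverless API. The helper name `build_start_job_run_request` is hypothetical, and the way the plugin maps its task config onto these fields may differ.

```python
from typing import Optional


def build_start_job_run_request(
    application_id: str,
    execution_role_arn: str,
    entry_point: str,
    entry_point_arguments: list[str],
    spark_submit_parameters: Optional[str] = None,
    log_uri: Optional[str] = None,
    name: Optional[str] = None,
) -> dict:
    """Build keyword arguments for emr-serverless start_job_run (hypothetical helper)."""
    if not entry_point.startswith("s3://"):
        # EMR Serverless reads the entry point from a file reference, not a local path.
        raise ValueError(f"entryPoint must be an s3:// URI, got {entry_point!r}")

    spark_submit: dict = {
        "entryPoint": entry_point,
        "entryPointArguments": list(entry_point_arguments),
    }
    if spark_submit_parameters:
        # e.g. "--conf spark.executor.cores=2"
        spark_submit["sparkSubmitParameters"] = spark_submit_parameters

    request: dict = {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "jobDriver": {"sparkSubmit": spark_submit},
    }
    if name:
        request["name"] = name
    if log_uri:
        # Driver/executor logs are written under this prefix; the connector
        # can point a TaskLog at it.
        request["configurationOverrides"] = {
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {"logUri": log_uri}
            }
        }
    return request
```

The resulting dict would be passed as `boto3.client("emr-serverless").start_job_run(**request)`, and the returned `jobRunId` becomes the resource metadata the connector polls.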
Motivation
EMR Serverless is widely used for Spark and SQL workloads and currently has no first-class Flyte integration. Today users have two options, both with trade-offs:
- Wrap `boto3.start_job_run` inside a plain `@task`. This loses type safety, retries, structured logs, and phase reporting.
- Use the existing `flytekitplugins-spark` with EMR on EC2. This brings a heavier ops surface, longer cold starts, and higher cost for bursty workloads.
A native async connector closes that gap and gives EMR Serverless feature parity with how Databricks Serverless is integrated today.
Design
Comparable plugins reviewed
We studied how other connectors handle remote script execution before settling on the design:
| Plugin | Entrypoint strategy | Why ours differs |
|---|---|---|
| `flytekitplugins-spark` (Databricks) | `git_source` clones `flyteorg/flytetools` pinned by SHA | The EMR Serverless `StartJobRun` API doesn't support git pulls; it takes an `entryPoint` S3 URI |
| `flytekitplugins-awssagemaker` | Container-as-entrypoint (`pyflyte-execute`) | EMR Serverless requires `entryPoint` to be a `.py` file path, not a container command |
| `flytekitplugins-awsbatch` | Container-as-entrypoint | Same as above |
| `flytekitplugins-perian` | Container-as-entrypoint | Same as above |
EMR Serverless's API forces a file-reference entrypoint, similar to Databricks. Where Databricks uses a git URL, we use an `s3://` URL with the entrypoint content-hashed so it's idempotent and easy to audit.
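Because the entry point has to be a file, something on the worker must turn it back into a `pyflyte-*` invocation. A minimal sketch of such a bootstrap, assuming the connector forwards the task's `pyflyte-fast-execute ...` command line as `entryPointArguments`; the allowlist and function names are hypothetical and not taken from the draft PR.

```python
import subprocess

# Hypothetical allowlist: the only commands this bootstrap will delegate to.
ALLOWED_COMMANDS = frozenset({"pyflyte-fast-execute", "pyflyte-execute"})


def build_command(argv: list[str]) -> list[str]:
    """Validate the forwarded entryPointArguments and return the command to run."""
    if not argv:
        raise ValueError("no command was forwarded via entryPointArguments")
    if argv[0] not in ALLOWED_COMMANDS:
        raise ValueError(f"refusing to run unexpected command {argv[0]!r}")
    return list(argv)


def main(argv: list[str]) -> int:
    """Run the forwarded pyflyte command and return its exit code."""
    command = build_command(argv)
    # check=False: hand the child's exit code back to the caller instead of raising.
    return subprocess.run(command, check=False).returncode


# As the EMR entryPoint, the file would end with:
#     sys.exit(main(sys.argv[1:]))
```

Keeping the bootstrap this small is what makes a content hash a meaningful audit handle: the file changes only when the plugin release changes it.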
Connector architecture
- `AsyncConnectorBase` subclass: a thin async wrapper over boto3 with a single `_call` chokepoint for testability.
- Phases mapped via `convert_to_flyte_phase`.
- Logs surfaced as `TaskLog` entries pointing at the EMR Serverless console plus the S3 `s3MonitoringConfiguration.logUri`.
- Application lifecycle: prefer existing apps by ID; optional gated dynamic creation by name (off by default).
- IAM via IRSA on the connector pod's service account; `iam:PassRole` only for the configured execution role.
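The `_call` chokepoint and the phase mapping can be sketched together. This is a standalone illustration, not the plugin's code: `EMRServerlessAPI` and `to_flyte_phase` are hypothetical names, Flyte phases are written as plain strings instead of flytekit's `TaskExecution.Phase`, and the state table is spelled out explicitly to show which EMR Serverless `JobRunState` values any mapping has to cover. The boto3 method names (`get_job_run`, `cancel_job_run`) and the `jobRun.state` response field are from the EMR Serverless API.

```python
import asyncio
from typing import Any

# Keys: EMR Serverless JobRunState values. Values: Flyte phase names (illustrative strings).
EMR_STATE_TO_FLYTE_PHASE = {
    "SUBMITTED": "QUEUED",
    "PENDING": "QUEUED",
    "SCHEDULED": "QUEUED",
    "QUEUED": "QUEUED",  # may be reported when job-run queuing is in effect
    "RUNNING": "RUNNING",
    "CANCELLING": "RUNNING",  # still shutting down, not terminal yet
    "SUCCESS": "SUCCEEDED",
    "FAILED": "FAILED",
    "CANCELLED": "FAILED",  # treated as a failure in this sketch
}


def to_flyte_phase(emr_state: str) -> str:
    try:
        return EMR_STATE_TO_FLYTE_PHASE[emr_state.upper()]
    except KeyError:
        # Fail loudly on states this table does not know about yet.
        raise ValueError(f"unrecognized EMR Serverless job state: {emr_state!r}") from None


class EMRServerlessAPI:
    """Thin async wrapper: every boto3 call funnels through _call."""

    def __init__(self, boto3_client: Any) -> None:
        self._client = boto3_client

    async def _call(self, method: str, **kwargs: Any) -> dict:
        # boto3 is synchronous; run it in a worker thread so the connector's
        # event loop stays responsive. Tests only need to replace this method.
        return await asyncio.to_thread(getattr(self._client, method), **kwargs)

    async def get_phase(self, application_id: str, job_run_id: str) -> str:
        resp = await self._call("get_job_run", applicationId=application_id, jobRunId=job_run_id)
        return to_flyte_phase(resp["jobRun"]["state"])

    async def cancel(self, application_id: str, job_run_id: str) -> None:
        await self._call("cancel_job_run", applicationId=application_id, jobRunId=job_run_id)
```

Because nothing else touches boto3 directly, a test can pass any object with the right method names, or override `_call` to return canned responses, without needing `moto`.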
Why a content-addressed S3 entrypoint upload (vs. git or container-as-entrypoint)
- Git: the EMR Serverless API has no `git_source` equivalent, so this would need a sidecar to fetch the code.
- Container-as-entrypoint: the API requires `entryPoint: <.py file>`, so it can't take a container command.
- S3 upload (chosen): the connector reads its own `_entrypoint.py` at startup, hashes it, and idempotently HEAD/PUTs it to S3. One PUT per plugin release. Easy to audit (`aws s3 cp` it down and diff).
We're open to feedback if a different approach would fit the project's direction better.
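The HEAD-then-PUT step can be sketched as follows, assuming boto3's S3 client (`head_object` / `put_object`) and a hypothetical key layout of `<prefix>/<sha256>/_entrypoint.py`. The prefix and function names are illustrative only.

```python
import hashlib
from typing import Any


def _is_not_found(exc: Exception) -> bool:
    """True if exc looks like botocore's ClientError for a missing object."""
    # botocore's ClientError carries the parsed error in exc.response["Error"]["Code"];
    # head_object on a missing key reports the code "404".
    response = getattr(exc, "response", None) or {}
    return response.get("Error", {}).get("Code") in {"404", "NoSuchKey", "NotFound"}


def ensure_entrypoint_uploaded(
    s3_client: Any,
    bucket: str,
    body: bytes,
    filename: str = "_entrypoint.py",
    prefix: str = "flyte-emr/entrypoints",  # hypothetical key prefix
) -> str:
    """Upload body under a content-addressed key if absent; return its s3:// URI."""
    digest = hashlib.sha256(body).hexdigest()
    key = f"{prefix}/{digest}/{filename}"
    try:
        s3_client.head_object(Bucket=bucket, Key=key)  # already present: nothing to do
    except Exception as exc:
        if not _is_not_found(exc):
            raise  # permission or network errors should not be masked
        s3_client.put_object(Bucket=bucket, Key=key, Body=body)
    return f"s3://{bucket}/{key}"
```

At startup this would be called with the bytes of the connector's own `_entrypoint.py`. Because the key is derived from the content, concurrent PUTs from several connector replicas write identical bytes, so that race is harmless. One caveat: without `s3:ListBucket` on the bucket, S3 answers a HEAD on a missing key with 403 rather than 404, so the connector role needs that permission for the existence check to work as intended.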
Deployment
No changes to the upstream `flyteconnector` chart are needed. Every plugin-specific knob fits cleanly into `podEnv`:

```yaml
image:
  repository: cr.flyte.org/flyteorg/flyteagent  # or a custom image with the plugin pre-installed
  tag: "1.15.3"
serviceAccount:
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<ACCOUNT>:role/FlyteEMRConnectorRole
podEnv:
  AWS_REGION: us-east-1
  FLYTE_EMR_ENTRYPOINT_S3_BUCKET: my-bucket
  FLYTE_EMR_ALLOW_CREATE_APPLICATION: "false"
```
This follows the same pattern as Databricks, BigQuery, and Airflow today.
Areas where your guidance would help
A few small questions where we'd appreciate your steer before or during PR review:
- Plugin location. We're planning to land it as `flytekit/plugins/flytekit-awsemrserverless/`, parallel to `flytekit-spark` and `flytekit-aws-sagemaker`. Happy to relocate if you prefer somewhere else.
- Default agent image. Once the package is on PyPI, would you be open to a small follow-up Dockerfile change in `flyteorg/flyte` to bundle `flytekitplugins-awsemrserverless` into the canonical `flyteagent` image (parallel to how `flytekitplugins-spark` is bundled today)? We'll skip that PR if the preference is to keep the agent image lean.
- Naming. We're leaning toward `flytekitplugins-awsemrserverless` to mirror `flytekitplugins-awsbatch` and `flytekitplugins-awssagemaker`. Glad to switch to `flytekit-aws-emr-serverless` or another spelling if that's the preferred convention.
- Worker image. For the EMR worker side we also ship a `Dockerfile` that adds `flytekit` and the plugin onto the official EMR Serverless Spark base image. We were planning to put it under `plugins/flytekit-awsemrserverless/docker/`. SageMaker and Batch handle this differently, so we're happy to follow whatever pattern you prefer.
These are alignment-on-details rather than blockers. We'll proceed with the PR using the choices above and update based on whatever you call out in review.
Testing posture (in our staging repo today)
- 130+ unit tests, with boto3 mocked at the `_call` boundary (no `moto` dependency).
- Subprocess tests covering the bootstrap entrypoint.
- End-to-end tested against real EMR Serverless applications on IRSA-enabled EKS.
- `helm template` and `helm lint` run clean against the upstream `flyteconnector` chart with `podEnv`-driven values.
Next steps
A draft PR with the full plugin (code, tests, README, worker Dockerfile) will be opened against `flyteorg/flytekit` shortly after this issue, so review can happen on the concrete implementation rather than just this design summary. We'll link the PR back here once it's up.
Thanks for taking a look, and for the work you've put into Flyte. Looking forward to your feedback.
/cc @machichima @kumare3 @pingsutw