
[Connector] AWS EMR Serverless async connector plugin (flytekitplugins-awsemrserverless) #7286

@rohitrsh


Hi Flyte maintainers,

I'd like to propose a new first-class async connector for AWS EMR Serverless, distributed as flytekitplugins-awsemrserverless and deployed through the existing flyteconnector Helm chart. It registers the task type emr_serverless.

We've been running it in production for several months and would like to contribute it back so the broader Flyte community can benefit. A draft PR with the full implementation will follow this issue shortly. This write-up is mostly meant to give reviewers context up front and to surface a few small naming and layout questions where your guidance would help us land it cleanly.

Summary

The plugin supports two execution modes:

  • Script mode: user provides an s3://…/script.py plus Spark args. The connector calls StartJobRun and surfaces phase and logs back to FlytePropeller.
  • Pythonic mode: user writes a regular Python @task(task_config=EMRServerless(...)), similar to the Databricks plugin. The connector materializes a fast-registration bundle and runs pyflyte-fast-execute on EMR Serverless workers via a small bootstrap script the connector uploads to S3 (content-addressed, one PUT per plugin release).
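Both modes ultimately reduce to a single StartJobRun request. The sketch below builds that request as a plain dict so it runs without AWS access. The jobDriver.sparkSubmit fields (entryPoint, entryPointArguments, sparkSubmitParameters) and configurationOverrides.monitoringConfiguration.s3MonitoringConfiguration.logUri come from the boto3 emr-serverless start_job_run API; the function name, the s3:// check, and all example values are hypothetical and not taken from the plugin. In Pythonic mode, entry_point would be the connector-uploaded bootstrap script rather than a user script.

```python
from typing import Any, Dict, List, Optional


def build_start_job_run_request(
    application_id: str,
    execution_role_arn: str,
    entry_point: str,
    entry_point_arguments: Optional[List[str]] = None,
    spark_submit_parameters: Optional[str] = None,
    log_uri: Optional[str] = None,
    name: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: kwargs for boto3 emr-serverless start_job_run."""
    # Assumption of this sketch: entrypoints are always S3 objects.
    if not entry_point.startswith("s3://"):
        raise ValueError(f"entry_point must be an s3:// URI, got {entry_point!r}")

    spark_submit: Dict[str, Any] = {"entryPoint": entry_point}
    if entry_point_arguments:
        spark_submit["entryPointArguments"] = list(entry_point_arguments)
    if spark_submit_parameters:
        spark_submit["sparkSubmitParameters"] = spark_submit_parameters

    request: Dict[str, Any] = {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "jobDriver": {"sparkSubmit": spark_submit},
    }
    if log_uri:
        # Driver/executor logs land under this prefix in S3.
        request["configurationOverrides"] = {
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {"logUri": log_uri}
            }
        }
    if name:
        request["name"] = name
    return request


# Script mode with placeholder values.
script_req = build_start_job_run_request(
    application_id="00example",
    execution_role_arn="arn:aws:iam::123456789012:role/EMRJobRole",
    entry_point="s3://my-bucket/jobs/script.py",
    entry_point_arguments=["--date", "2024-01-01"],
    spark_submit_parameters="--conf spark.executor.cores=4",
)
# With boto3 installed: boto3.client("emr-serverless").start_job_run(**script_req)
```

Keeping request construction pure like this makes it straightforward to unit-test both modes without mocking AWS at all.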

Motivation

EMR Serverless is widely used for Spark and SQL workloads and currently has no first-class Flyte integration. Today users have two options, both with trade-offs:

  • Wrap boto3.start_job_run inside a plain @task. This loses type safety, retries, structured logs, and phase reporting.
  • Use the existing flytekitplugins-spark with EMR-on-EC2. This brings a heavier ops surface, longer cold-starts, and higher cost for bursty workloads.

A native async connector closes that gap and gives EMR Serverless feature parity with how Databricks Serverless is integrated today.

Design

Comparable plugins reviewed

We studied how other connectors handle remote script execution before settling on the design:

| Plugin | Entrypoint strategy | Why ours differs |
|---|---|---|
| flytekitplugins-spark (Databricks) | git_source clones flyteorg/flytetools pinned by SHA | The EMR Serverless StartJobRun API doesn't support git pulls; it takes an entryPoint S3 URI |
| flytekitplugins-awssagemaker | Container-as-entrypoint (pyflyte-execute) | EMR Serverless requires entryPoint to be a .py file path, not a container command |
| flytekitplugins-awsbatch | Container-as-entrypoint | Same as above |
| flytekitplugins-perian | Container-as-entrypoint | Same as above |

The EMR Serverless API requires a file-reference entrypoint, as Databricks does. Where the Databricks integration uses a git URL, we use an s3:// URL whose key is derived from a hash of the entrypoint's content, so uploads are idempotent and easy to audit.

Connector architecture

  • AsyncConnectorBase subclass: a thin async wrapper over boto3, with every AWS call routed through a single _call method so tests can mock one boundary.
  • EMR Serverless job-run states are mapped to Flyte phases via convert_to_flyte_phase.
  • Logs are surfaced as TaskLog entries pointing at the EMR Serverless console and at the S3 location configured in s3MonitoringConfiguration.logUri.
  • Application lifecycle: existing applications are referenced by ID; dynamic creation by name is optional, gated behind a flag, and off by default.
  • IAM: credentials come from IRSA on the connector pod's service account, and iam:PassRole is granted only for the configured execution role.
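To illustrate the phase and log bullets, here is a minimal sketch of the two pure helpers they imply. It uses an explicit lookup table in place of flytekit's convert_to_flyte_phase, since we are not certain that helper recognizes every EMR Serverless state (for example SUBMITTED or SCHEDULED) across flytekit versions. FlytePhase is a hypothetical stand-in for flyteidl's TaskExecution.Phase, and the SPARK_DRIVER/stdout.gz path follows the log layout described in the EMR Serverless documentation; verify both against your environment.

```python
from enum import Enum


class FlytePhase(Enum):
    """Hypothetical stand-in for the flyteidl TaskExecution.Phase values used here."""
    QUEUED = "QUEUED"
    INITIALIZING = "INITIALIZING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


# EMR Serverless JobRunState values, grouped by the Flyte phase reported for them.
_EMR_TO_FLYTE = {
    "SUBMITTED": FlytePhase.QUEUED,
    "QUEUED": FlytePhase.QUEUED,
    "PENDING": FlytePhase.INITIALIZING,
    "SCHEDULED": FlytePhase.INITIALIZING,
    "RUNNING": FlytePhase.RUNNING,
    "CANCELLING": FlytePhase.RUNNING,  # not terminal yet; wait for CANCELLED
    "SUCCESS": FlytePhase.SUCCEEDED,
    "FAILED": FlytePhase.FAILED,
    "CANCELLED": FlytePhase.FAILED,
}


def to_flyte_phase(emr_state: str) -> FlytePhase:
    """Map an EMR Serverless job-run state to a Flyte phase; reject unknown states."""
    try:
        return _EMR_TO_FLYTE[emr_state.upper()]
    except KeyError:
        raise ValueError(f"Unrecognized EMR Serverless job state: {emr_state!r}") from None


def driver_log_uri(log_uri: str, application_id: str, job_run_id: str) -> str:
    """S3 location of the Spark driver's stdout under the configured logUri."""
    base = log_uri.rstrip("/")
    return f"{base}/applications/{application_id}/jobs/{job_run_id}/SPARK_DRIVER/stdout.gz"
```

Failing loudly on an unknown state, rather than defaulting to RUNNING, keeps a new AWS state from silently leaving a task stuck.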

Why a content-addressed S3 entrypoint upload (vs. git or container-as-entrypoint)

  • Git: the EMR Serverless API has no git_source equivalent, so fetching from git would require a sidecar.
  • Container-as-entrypoint: the API requires entryPoint to be a .py file, so it can't take a container command.
  • S3 upload (chosen): at startup, the connector reads its own _entrypoint.py, hashes it, and issues a HEAD, followed by a PUT only if the object is missing. This means one PUT per plugin release, and the uploaded file is easy to audit (copy it down with aws s3 cp and diff it).
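A minimal sketch of the content-addressed HEAD-then-PUT flow follows. To keep it runnable without AWS, storage sits behind a small hypothetical ObjectStore interface with an in-memory test double; a boto3-backed implementation would wrap head_object and put_object. The key prefix flyte-emr/entrypoints is an illustrative assumption, not the plugin's actual layout.

```python
import hashlib
from typing import Dict, Protocol


class ObjectStore(Protocol):
    """Hypothetical storage interface (a boto3 version would wrap head_object/put_object)."""

    def exists(self, bucket: str, key: str) -> bool: ...

    def put(self, bucket: str, key: str, body: bytes) -> None: ...


class InMemoryStore:
    """Test double that records every PUT."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}
        self.put_count = 0

    def exists(self, bucket: str, key: str) -> bool:
        return f"{bucket}/{key}" in self.objects

    def put(self, bucket: str, key: str, body: bytes) -> None:
        self.objects[f"{bucket}/{key}"] = body
        self.put_count += 1


def ensure_entrypoint(store: ObjectStore, bucket: str, content: bytes,
                      prefix: str = "flyte-emr/entrypoints") -> str:
    """Upload content under a key derived from its SHA-256 and return the s3:// URI."""
    digest = hashlib.sha256(content).hexdigest()
    key = f"{prefix}/{digest}/_entrypoint.py"  # hypothetical key layout
    if not store.exists(bucket, key):  # HEAD first; PUT only when missing
        store.put(bucket, key, content)
    return f"s3://{bucket}/{key}"


store = InMemoryStore()
script = b"print('bootstrap')\n"
uri1 = ensure_entrypoint(store, "my-bucket", script)
uri2 = ensure_entrypoint(store, "my-bucket", script)  # same content, same key, no second PUT
```

Because the key changes only when the file's bytes change, repeated connector restarts on the same release are no-ops, and an auditor can recompute the hash locally to confirm what is deployed.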

We're open to feedback if a different approach would fit the project's direction better.

Deployment

No changes to the upstream flyteconnector chart are needed. Every plugin-specific knob fits cleanly into podEnv:

image:
  repository: cr.flyte.org/flyteorg/flyteagent  # or a custom image with the plugin pre-installed
  tag: "1.15.3"
serviceAccount:
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<ACCOUNT>:role/FlyteEMRConnectorRole
podEnv:
  AWS_REGION: us-east-1
  FLYTE_EMR_ENTRYPOINT_S3_BUCKET: my-bucket
  FLYTE_EMR_ALLOW_CREATE_APPLICATION: "false"

This follows the same pattern the Databricks, BigQuery, and Airflow connectors use today.
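On the connector side, those podEnv values could be read and applied roughly as below. This is a sketch under assumptions: the environment variable names come from the values file above, but the ConnectorSettings class, the accepted truthy spellings, and the lookup-by-name step before creation are hypothetical. find_by_name and create are injected callables; in practice they might wrap boto3's emr-serverless list_applications and create_application.

```python
import os
from dataclasses import dataclass
from typing import Callable, Mapping, Optional

_TRUTHY = {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class ConnectorSettings:
    """Hypothetical settings object populated from podEnv."""
    region: str
    entrypoint_bucket: str
    allow_create_application: bool

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "ConnectorSettings":
        return cls(
            region=env["AWS_REGION"],
            entrypoint_bucket=env["FLYTE_EMR_ENTRYPOINT_S3_BUCKET"],
            # Creation stays off unless explicitly enabled.
            allow_create_application=env.get(
                "FLYTE_EMR_ALLOW_CREATE_APPLICATION", "false"
            ).strip().lower() in _TRUTHY,
        )


def resolve_application_id(
    settings: ConnectorSettings,
    application_id: Optional[str],
    application_name: Optional[str],
    find_by_name: Callable[[str], Optional[str]],
    create: Callable[[str], str],
) -> str:
    """Prefer an explicit ID, then an existing app by name; create only if allowed."""
    if application_id:
        return application_id
    if not application_name:
        raise ValueError("Either an application ID or an application name is required")
    existing = find_by_name(application_name)
    if existing:
        return existing
    if not settings.allow_create_application:
        raise PermissionError(
            f"Application {application_name!r} not found and "
            "FLYTE_EMR_ALLOW_CREATE_APPLICATION is disabled"
        )
    return create(application_name)


settings = ConnectorSettings.from_env({
    "AWS_REGION": "us-east-1",
    "FLYTE_EMR_ENTRYPOINT_S3_BUCKET": "my-bucket",
    "FLYTE_EMR_ALLOW_CREATE_APPLICATION": "false",
})
```

Injecting the lookup and create functions keeps the gating logic testable without AWS credentials.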

Areas where your guidance would help

A few small questions where we'd appreciate your steer before or during PR review:

  1. Plugin location. We're planning to land it as flytekit/plugins/flytekit-awsemrserverless/, parallel to flytekit-spark and flytekit-aws-sagemaker. Happy to relocate if you prefer somewhere else.
  2. Default agent image. Once the package is on PyPI, would you be open to a small follow-up Dockerfile change to flyteorg/flyte to bundle flytekitplugins-awsemrserverless into the canonical flyteagent image (parallel to how flytekitplugins-spark is bundled today)? We'll skip that PR if the preference is to keep the agent image lean.
  3. Naming. We're leaning toward flytekitplugins-awsemrserverless to mirror flytekitplugins-awsbatch and flytekitplugins-awssagemaker. Glad to switch to flytekit-aws-emr-serverless or another spelling if that's the preferred convention.
  4. Worker image. For the EMR worker side we also ship a Dockerfile that adds flytekit and the plugin onto the official EMR Spark base. We were planning to put it under plugins/flytekit-awsemrserverless/docker/. SageMaker and Batch handle this differently, so we're happy to follow whichever pattern you prefer.

These are details to align on rather than blockers. We'll proceed with the PR using the choices above and update it based on whatever you call out in review.

Testing posture (in our staging repo today)

  • 130+ unit tests, boto3 mocked at the _call boundary (no moto dependency).
  • Subprocess tests covering the bootstrap entrypoint.
  • End-to-end tested against real EMR Serverless applications in IRSA-enabled EKS.
  • helm template and helm lint pass cleanly against the upstream flyteconnector chart with podEnv-driven values.
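Mocking boto3 "at the _call boundary" can look roughly like the sketch below. The class name is hypothetical, and unlike the real connector it does not subclass AsyncConnectorBase; it only shows the chokepoint idea. get_job_run and its jobRun.state response field are part of the boto3 emr-serverless API, and asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class EMRServerlessClientWrapper:
    """Hypothetical async wrapper; every AWS request goes through _call."""

    def __init__(self, client: Any) -> None:
        self._client = client  # e.g. boto3.client("emr-serverless")

    async def _call(self, method: str, **kwargs: Any) -> Dict[str, Any]:
        # boto3 is synchronous, so run the call on a worker thread.
        return await asyncio.to_thread(getattr(self._client, method), **kwargs)

    async def get_state(self, application_id: str, job_run_id: str) -> str:
        resp = await self._call("get_job_run", applicationId=application_id, jobRunId=job_run_id)
        return resp["jobRun"]["state"]


class FakeWrapper(EMRServerlessClientWrapper):
    """Test double: overrides only _call, so no boto3 or moto is needed."""

    def __init__(self, responses: Dict[str, Dict[str, Any]]) -> None:
        super().__init__(client=None)
        self.calls: List[Tuple[str, Dict[str, Any]]] = []
        self._responses = responses

    async def _call(self, method: str, **kwargs: Any) -> Dict[str, Any]:
        self.calls.append((method, kwargs))
        return self._responses[method]


fake = FakeWrapper({"get_job_run": {"jobRun": {"state": "RUNNING"}}})
state = asyncio.run(fake.get_state("app-1", "job-1"))
```

Since only _call touches AWS, a test can assert both the returned value and the exact request parameters.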

Next steps

A draft PR with the full plugin (code, tests, README, worker Dockerfile) will be opened against flyteorg/flytekit shortly after this issue, so review can happen on the concrete implementation rather than just this design summary. We'll link the PR back here once it's up.

Thanks for taking a look, and for the work you've put into Flyte. Looking forward to your feedback.

/cc @machichima @kumare3 @pingsutw
