Skip to content

Conversation

@coolbeevip
Copy link
Collaborator

Description

Fixes #3352

Checklist

Go over all the following points, and put an x in all the boxes that apply.

  • I have read the CONTRIBUTION guide (required)
  • I have linked this PR to an issue using the Development section on the right sidebar or by adding Fixes #issue-number in the PR description (required)
  • I have checked if any dependencies need to be added or updated in pyproject.toml and uv lock
  • I have updated the tests accordingly (required for a bug fix or a new feature)
  • I have updated the documentation if needed:
  • I have added examples if this is a new feature

If you are unsure about any of these, don't hesitate to ask. We are here to help!

@github-actions github-actions bot added the Review Required PR need to be reviewed label Nov 3, 2025
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 3, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch issue-3352

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coolbeevip coolbeevip marked this pull request as ready for review November 4, 2025 00:50
@coolbeevip coolbeevip changed the title WIP: refactor(workforce): Refactor WorkforceCallback and all related callbacks to async interface refactor(workforce): Refactor WorkforceCallback and all related callbacks to async interface Nov 4, 2025
@Wendong-Fan Wendong-Fan added this to the Sprint 41 milestone Nov 5, 2025
Copy link
Collaborator

@fengju0213 fengju0213 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything else looks great—my only concern is that we should keep backward compatibility (or
provide sync wrappers) so existing code doesn’t break when upgrading.
ask for some comments cc @Wendong-Fan

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the excellent contribution—the async-first design should definitely help with
throughput and allow callbacks to run I/O without blocking. One question: since the long-
standing public Workforce APIs (e.g., add_single_agent_worker, reset, clone) are now
asynchronous, do we plan to retain the former synchronous entry points (even if just as thin
wrappers) to preserve backward compatibility for existing scripts and examples?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out — sync wrappers can introduce extra maintenance overhead and complexity. I’ll defer to the @Wendong-Fan , any thoughts or preferences on keeping them vs. deprecating?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @coolbeevip,

Would you mind re-requesting a review after a decision is made on whether to include backwards compatibility.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@waleedalzarooni @fengju0213 Thanks for the thorough review and pointing out the backward compatibility concern!

I agree this is important to avoid breaking existing code. Since I'm not strongly opinionated here, I'd like to defer to the community's preference:

Options:

  1. Add sync wrappers (e.g., add_single_agent_worker_sync() that calls the async version under the hood):

    • Pros: Minimal disruption for users.
    • Cons: Extra maintenance overhead (two APIs to keep in sync).
  2. Deprecate old sync APIs (with warnings) and guide users to migrate to async:

    • Pros: Clean async-first design long-term.
    • Cons: Requires users to update code (but we can provide migration guide).
  3. Force async-only (no wrappers):

    • Pros: Simplest codebase.
    • Cons: Breaks all existing sync usage immediately.

What do you think? Any preference? Once we decide, I'll implement and re-request reviews.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @coolbeevip , @fengju0213 and @waleedalzarooni , i would prefer implement Option 2, keeping method() as deprecated sync wrappers and adding method_async() as the primary async API

i added another enhance PR #3464, feel free to review and leave your comments

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Wendong-Fan. I've merged this PR and updated the related code in examples.

input_message, response_format=QueryResponse
)
elif isinstance(pipeline_template, Workforce):
pipeline = pipeline_template.clone() # type: ignore[assignment]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still calls Workforce.clone() synchronously

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder. This line uses '# type: ignore[...]' to suppress mypy checks. I will fix this issue.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Collaborator

@waleedalzarooni waleedalzarooni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great integration @coolbeevip!,

just a couple comments on my side, also additionally there is an example file that still uses the old synchronous syntax, here is some guidance on what to change:

The file examples/workforce/simple_workforce_mcp.py was NOT updated in this PR but calls the now-async method:

# Line 47 in examples/workforce/simple_workforce_mcp.py
def main():  #  Synchronous function
    workforce = Workforce(description="Simple Analysis Team")
    analyst = ChatAgent(system_message=analyst_msg)
    workforce.add_single_agent_worker(  #  Calling async method without await
        description="General purpose analyst", worker=analyst

self._child_listening_tasks.append(
asyncio.create_task(new_node.start())
)
self._child_listening_tasks.append(await new_node.start())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self._child_listening_tasks.append(await new_node.start())
self._child_listening_tasks.append(asyncio.create_task(new_node.start()))

The start() method is an async coroutine that runs indefinitely - it's a long-running background listener that processes messages from a channel. When you write await new_node.start():

  1. The execution blocks at this line
  2. It waits for start() to complete
  3. But start() never completes - it runs forever!
  4. The function never returns, so the worker is never added to the list

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


for child in self._children:
self._notify_worker_created(child)
asyncio.run(self._notify_worker_created(child))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
asyncio.run(self._notify_worker_created(child))
self._notify_worker_created(child)

asyncio.run() cannot be called from a running event loop

illustration of issue:

async def main():
    # This creates an event loop and runs main()
    workforce = Workforce(...)  #PROBLEM HAPPENS HERE

asyncio.run(main())  # Event loop is already running here

When Workforce.__init__() is called:
def __init__(self):
    # ... initialization code ...
    for child in self._children:
        asyncio.run(self._notify_worker_created(child))
        RuntimeError: asyncio.run() cannot be called from a running event loop

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for that.

The __init__ function only uses asyncio.run for callbacks when a child object is provided. I'll first add test cases for this part.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @waleedalzarooni, could you provide a complete test case that demonstrates how to pass the children parameter when constructing Workforce? I couldn’t find any usage of the children parameter in the codebase. A minimal, reproducible example would be helpful. Thanks!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem

async def test_workforce_with_children():
      worker = SingleAgentWorker(
          description="Test worker",
          worker=ChatAgent("You are a test agent."),
      )

      workforce = Workforce(
          description="Test workforce",
          children=[worker],
      )

      assert len(workforce._children) == 1
      assert isinstance(workforce._children[0], SingleAgentWorker)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@coolbeevip
Copy link
Collaborator Author

Great integration @coolbeevip!,

just a couple comments on my side, also additionally there is an example file that still uses the old synchronous syntax, here is some guidance on what to change:

The file examples/workforce/simple_workforce_mcp.py was NOT updated in this PR but calls the now-async method:

# Line 47 in examples/workforce/simple_workforce_mcp.py
def main():  #  Synchronous function
    workforce = Workforce(description="Simple Analysis Team")
    analyst = ChatAgent(system_message=analyst_msg)
    workforce.add_single_agent_worker(  #  Calling async method without await
        description="General purpose analyst", worker=analyst

Thanks for the reminder. done

@coolbeevip coolbeevip force-pushed the issue-3352 branch 3 times, most recently from 29e6b91 to c55c18c Compare November 26, 2025 15:42
Copy link
Collaborator

@waleedalzarooni waleedalzarooni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Review Required PR need to be reviewed

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

[Feature Request] Refactor WorkforceCallback and all related callbacks to async interface

6 participants