-
Notifications
You must be signed in to change notification settings - Fork 1.7k
refactor(workforce): Refactor WorkforceCallback and all related callbacks to async interface #3363
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
fengju0213
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Everything else looks great—my only concern is that we should keep backward compatibility (or
provide sync wrappers) so existing code doesn’t break when upgrading.
ask for some comments cc @Wendong-Fan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the excellent contribution—the async-first design should definitely help with
throughput and allow callbacks to run I/O without blocking. One question: since the long-
standing public Workforce APIs (e.g., add_single_agent_worker, reset, clone) are now
asynchronous, do we plan to retain the former synchronous entry points (even if just as thin
wrappers) to preserve backward compatibility for existing scripts and examples?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this out — sync wrappers can introduce extra maintenance overhead and complexity. I’ll defer to the @Wendong-Fan , any thoughts or preferences on keeping them vs. deprecating?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @coolbeevip,
Would you mind re-requesting a review after a decision is made on whether to include backwards compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waleedalzarooni @fengju0213 Thanks for the thorough review and pointing out the backward compatibility concern!
I agree this is important to avoid breaking existing code. Since I'm not strongly opinionated here, I'd like to defer to the community's preference:
Options:
-
Add sync wrappers (e.g.,
add_single_agent_worker_sync()that calls the async version under the hood):- Pros: Minimal disruption for users.
- Cons: Extra maintenance overhead (two APIs to keep in sync).
-
Deprecate old sync APIs (with warnings) and guide users to migrate to async:
- Pros: Clean async-first design long-term.
- Cons: Requires users to update code (but we can provide migration guide).
-
Force async-only (no wrappers):
- Pros: Simplest codebase.
- Cons: Breaks all existing sync usage immediately.
What do you think? Any preference? Once we decide, I'll implement and re-request reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @coolbeevip , @fengju0213 and @waleedalzarooni , i would prefer implement Option 2, keeping method() as deprecated sync wrappers and adding method_async() as the primary async API
i added another enhance PR #3464, feel free to review and leave your comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Wendong-Fan. I've merged this PR and updated the related code in examples.
camel/benchmarks/browsecomp.py
Outdated
| input_message, response_format=QueryResponse | ||
| ) | ||
| elif isinstance(pipeline_template, Workforce): | ||
| pipeline = pipeline_template.clone() # type: ignore[assignment] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still calls Workforce.clone() synchronously
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the reminder. This line uses '# type: ignore[...]' to suppress mypy checks. I will fix this issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
waleedalzarooni
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great integration @coolbeevip!,
just a couple comments on my side, also additionally there is an example file that still uses the old synchronous syntax, here is some guidance on what to change:
The file examples/workforce/simple_workforce_mcp.py was NOT updated in this PR but calls the now-async method:
# Line 47 in examples/workforce/simple_workforce_mcp.py
def main(): # Synchronous function
workforce = Workforce(description="Simple Analysis Team")
analyst = ChatAgent(system_message=analyst_msg)
workforce.add_single_agent_worker( # Calling async method without await
description="General purpose analyst", worker=analyst
| self._child_listening_tasks.append( | ||
| asyncio.create_task(new_node.start()) | ||
| ) | ||
| self._child_listening_tasks.append(await new_node.start()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| self._child_listening_tasks.append(await new_node.start()) | |
| self._child_listening_tasks.append(asyncio.create_task(new_node.start())) |
The start() method is an async coroutine that runs indefinitely - it's a long-running background listener that processes messages from a channel. When you write await new_node.start():
- The execution blocks at this line
- It waits for start() to complete
- But start() never completes - it runs forever!
- The function never returns, so the worker is never added to the list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
|
||
| for child in self._children: | ||
| self._notify_worker_created(child) | ||
| asyncio.run(self._notify_worker_created(child)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| asyncio.run(self._notify_worker_created(child)) | |
| self._notify_worker_created(child) |
asyncio.run() cannot be called from a running event loop
illustration of issue:
async def main():
# This creates an event loop and runs main()
workforce = Workforce(...) #PROBLEM HAPPENS HERE
asyncio.run(main()) # Event loop is already running here
When Workforce.__init__() is called:
def __init__(self):
# ... initialization code ...
for child in self._children:
asyncio.run(self._notify_worker_created(child))
RuntimeError: asyncio.run() cannot be called from a running event loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for that.
The __init__ function only uses asyncio.run for callbacks when a child object is provided. I'll first add test cases for this part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @waleedalzarooni, could you provide a complete test case that demonstrates how to pass the children parameter when constructing Workforce? I couldn’t find any usage of the children parameter in the codebase. A minimal, reproducible example would be helpful. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem
async def test_workforce_with_children():
worker = SingleAgentWorker(
description="Test worker",
worker=ChatAgent("You are a test agent."),
)
workforce = Workforce(
description="Test workforce",
children=[worker],
)
assert len(workforce._children) == 1
assert isinstance(workforce._children[0], SingleAgentWorker)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
151326a to
a88f9ce
Compare
Thanks for the reminder. done |
29e6b91 to
c55c18c
Compare
a96e1d5 to
f45163f
Compare
waleedalzarooni
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Description
Fixes #3352
Checklist
Go over all the following points, and put an
xin all the boxes that apply.Fixes #issue-numberin the PR description (required)pyproject.tomlanduv lockIf you are unsure about any of these, don't hesitate to ask. We are here to help!