Human-in-the-Loop Implementation #2021
This commit adds the foundational components for human-in-the-loop
functionality in the Python OpenAI Agents SDK, matching the TypeScript
implementation.
**Completed Components:**
1. **Tool Approval Field** (tool.py)
   - Added `needs_approval` field to FunctionTool
   - Supports boolean or callable (dynamic approval)
   - Updated function_tool() decorator
2. **ToolApprovalItem Class** (items.py)
   - New item type for tool calls requiring approval
   - Added to RunItem union type
3. **Approval Tracking** (run_context.py)
   - Created ApprovalRecord class
   - Added approval infrastructure to RunContextWrapper
   - Methods: is_tool_approved(), approve_tool(), reject_tool()
   - Supports individual and permanent approvals/rejections
4. **RunState Class** (run_state.py) - NEW FILE
   - Complete serialization/deserialization support
   - approve() and reject() methods
   - get_interruptions() method
   - Agent map building for name resolution
   - 567 lines of serialization logic
5. **Interruptions Support** (result.py)
   - Added interruptions field to RunResultBase
   - Will contain ToolApprovalItem instances when paused
6. **NextStepInterruption** (run_state.py)
   - New step type for representing interruptions
**Remaining Work:**
1. Add NextStepInterruption to NextStep union in _run_impl.py
2. Implement tool approval checking in run execution
3. Update run methods to accept RunState
4. Add comprehensive tests
5. Update documentation
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
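The approval tracking described in this commit can be pictured with a small stand-alone sketch. It is not the code from run_context.py: the `ApprovalTracker` class, the `always` flag, and the field layout of `ApprovalRecord` are assumptions made for illustration. Only the method names `is_tool_approved()`, `approve_tool()`, and `reject_tool()` and the idea of individual versus permanent decisions come from the commit message.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Union


@dataclass
class ApprovalRecord:
    """Hypothetical per-tool record: True means "always", a set holds call IDs."""

    approved: Union[bool, Set[str]] = field(default_factory=set)
    rejected: Union[bool, Set[str]] = field(default_factory=set)


class ApprovalTracker:
    """Illustrative stand-in for the approval methods added to RunContextWrapper."""

    def __init__(self) -> None:
        self._records: Dict[str, ApprovalRecord] = {}

    def is_tool_approved(self, tool_name: str, call_id: str) -> Optional[bool]:
        """Return True (approved), False (rejected), or None (no decision yet)."""
        record = self._records.get(tool_name)
        if record is None:
            return None
        if record.approved is True:
            return True
        if record.rejected is True:
            return False
        if isinstance(record.approved, set) and call_id in record.approved:
            return True
        if isinstance(record.rejected, set) and call_id in record.rejected:
            return False
        return None

    def approve_tool(self, tool_name: str, call_id: str, always: bool = False) -> None:
        record = self._records.setdefault(tool_name, ApprovalRecord())
        if always:
            # A permanent approval replaces any earlier rejections for this tool.
            record.approved, record.rejected = True, set()
        elif isinstance(record.approved, set):
            record.approved.add(call_id)

    def reject_tool(self, tool_name: str, call_id: str, always: bool = False) -> None:
        record = self._records.setdefault(tool_name, ApprovalRecord())
        if always:
            # A permanent rejection replaces any earlier approvals for this tool.
            record.rejected, record.approved = True, set()
        elif isinstance(record.rejected, set):
            record.rejected.add(call_id)
```

The three-valued return is the important part: `None` is what lets the runner distinguish "ask the human" from "the human said no".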
This commit integrates the human-in-the-loop infrastructure into the
actual run execution flow, making tool approval functional.
**Changes:**
1. **NextStepInterruption Type** (_run_impl.py:205-210)
   - Added NextStepInterruption dataclass
   - Includes interruptions list (ToolApprovalItems)
   - Added to NextStep union type
2. **ProcessedResponse Enhancement** (_run_impl.py:167-192)
   - Added interruptions field
   - Added has_interruptions() method
3. **Tool Approval Checking** (_run_impl.py:773-848)
   - Check needs_approval before tool execution
   - Support dynamic approval functions
   - If approval needed:
     * Check approval status via context
     * If None: Create ToolApprovalItem, return for interruption
     * If False: Return rejection message
     * If True: Continue with execution
4. **Interruption Handling** (_run_impl.py:311-333)
   - After tool execution, check for ToolApprovalItems
   - If found, create NextStepInterruption and return immediately
   - Prevents execution of remaining tools when approval pending
**Flow:**
Tool Call → Check needs_approval → Check approval status →
  If None: Create interruption, pause run →
  User approves/rejects → Resume run →
  If approved: Execute tool
  If rejected: Return rejection message
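The flow above can be sketched as a single function. This is a simplified model, not the code in _run_impl.py: `ToolCall`, the `decisions` dictionary, `REJECTION_MESSAGE`, and `run_tool_call` are hypothetical names, and `ToolApprovalItem` here is a stand-in dataclass rather than the SDK class.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Union


@dataclass
class ToolCall:
    name: str
    call_id: str
    arguments: str


@dataclass
class ToolApprovalItem:
    """Stand-in for the item returned when a call is waiting for a human."""

    tool_call: ToolCall


# needs_approval is either a fixed boolean or an async predicate (dynamic approval).
NeedsApproval = Union[bool, Callable[[ToolCall], Awaitable[bool]]]

REJECTION_MESSAGE = "Tool execution was not approved."  # hypothetical wording


async def run_tool_call(
    call: ToolCall,
    func: Callable[[str], str],
    needs_approval: NeedsApproval,
    decisions: Dict[str, bool],
) -> Union[str, ToolApprovalItem]:
    """Apply the None / False / True branches before executing the tool."""
    if callable(needs_approval):
        required = await needs_approval(call)
    else:
        required = needs_approval
    if required:
        status = decisions.get(call.call_id)  # None when nobody has decided yet
        if status is None:
            return ToolApprovalItem(call)  # the run pauses on this item
        if status is False:
            return REJECTION_MESSAGE  # sent back to the model instead of a result
    return func(call.arguments)
```

Returning the approval item instead of raising keeps the pause on the normal return path, which is how the commit describes the runner detecting ToolApprovalItems after tool execution.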
**Remaining Work:**
- Update Runner.run() to accept RunState
- Handle interruptions in result creation
- Add tests
- Add documentation/examples
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit integrates RunState into the Runner API, allowing runs to be
resumed from a saved state. This is the final piece needed to make
human-in-the-loop (HITL) tool approval fully functional.
**Changes:**
1. **Import NextStepInterruption** (run.py:21-32)
   - Added NextStepInterruption to imports from _run_impl
   - Added RunState import
2. **Updated Method Signatures** (run.py:285-444)
   - Runner.run(): Added `RunState[TContext]` to input union type
   - Runner.run_sync(): Added `RunState[TContext]` to input union type
   - Runner.run_streamed(): Added `RunState[TContext]` to input union type
   - AgentRunner.run(): Added `RunState[TContext]` to input union type
   - AgentRunner.run_sync(): Added `RunState[TContext]` to input union type
   - AgentRunner.run_streamed(): Added `RunState[TContext]` to input union type
3. **RunState Resumption Logic** (run.py:524-584)
   - Check if input is RunState instance
   - Extract state fields when resuming: current_turn, original_input,
     generated_items, model_responses, context_wrapper
   - Prime server conversation tracker from model_responses if resuming
   - Cast context_wrapper to correct type after extraction
4. **Interruption Handling** (run.py:689-726)
   - Added `interruptions=[]` to successful RunResult creation
   - Added elif branch for NextStepInterruption
   - Return RunResult with interruptions when tool approval needed
   - Set final_output to None for interrupted runs
5. **RunResultStreaming Support** (run.py:879-918)
   - Handle RunState input for streaming runs
   - Added `interruptions=[]` field to RunResultStreaming creation
   - Extract original_input from RunState for result
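As a rough illustration of the resumption logic in item 3, the sketch below shows one way a run loop could pick its starting values depending on whether it receives fresh input or a saved state. The `RunState` dataclass here is a reduced stand-in with only some of the fields named above, and `LoopStart` and `resolve_start` are hypothetical helpers, not part of run.py.

```python
from dataclasses import dataclass, field
from typing import Any, List, Union


@dataclass
class RunState:
    """Reduced stand-in carrying a subset of the fields the commit extracts."""

    current_turn: int
    original_input: Union[str, List[Any]]
    generated_items: List[Any] = field(default_factory=list)
    model_responses: List[Any] = field(default_factory=list)


@dataclass
class LoopStart:
    """Hypothetical bundle of values the run loop begins with."""

    current_turn: int
    original_input: Union[str, List[Any]]
    generated_items: List[Any]
    model_responses: List[Any]
    is_resumed: bool


def resolve_start(run_input: Union[str, List[Any], RunState]) -> LoopStart:
    """Start from saved values when given a RunState, otherwise from scratch."""
    if isinstance(run_input, RunState):
        return LoopStart(
            current_turn=run_input.current_turn,
            original_input=run_input.original_input,
            # Copy the lists so the loop does not mutate the saved state.
            generated_items=list(run_input.generated_items),
            model_responses=list(run_input.model_responses),
            is_resumed=True,
        )
    return LoopStart(
        current_turn=0,
        original_input=run_input,
        generated_items=[],
        model_responses=[],
        is_resumed=False,
    )
```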
**How It Works:**
When resuming from RunState:
```python
# User approves/rejects tool calls on the state
run_state.approve(approval_item)
# Resume the run from where it left off
result = await Runner.run(agent, run_state)
```
When a tool needs approval:
1. Run pauses at tool execution
2. Returns RunResult with interruptions=[ToolApprovalItem(...)]
3. User can inspect interruptions and approve/reject
4. User resumes by passing the RunState back to Runner.run()
**Remaining Work:**
- Add `state` property to RunResult for creating RunState from results
- Add comprehensive tests
- Add documentation/examples
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit adds a method to convert a RunResult back into a RunState,
enabling the resume workflow for interrupted runs.
**Changes:**
1. **to_state() Method** (result.py:125-165)
   - Added method to RunResult class
   - Creates a new RunState from the result's data
   - Populates generated_items, model_responses, and guardrail results
   - Includes comprehensive docstring with usage example
**How to Use:**
```python
# Run agent until it needs approval
result = await Runner.run(agent, "Use the delete_file tool")
if result.interruptions:
    # Convert result to state
    state = result.to_state()
    # Approve the tool call
    state.approve(result.interruptions[0])
    # Resume the run
    result = await Runner.run(agent, state)
```
**Complete HITL Flow:**
1. Run agent with tool that needs_approval=True
2. Run pauses, returns RunResult with interruptions
3. User calls result.to_state() to get RunState
4. User calls state.approve() or state.reject()
5. User passes state back to Runner.run() to resume
6. Run continues from where it left off
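The six steps above can be exercised end to end with a toy model that needs no SDK installed. Everything here (`ToyState`, `ToyResult`, `toy_run`, `run_with_approvals`) is hypothetical and only mirrors the shape of the flow: a run pauses with interruptions, the caller records decisions on the state, and the run is called again until nothing is pending. Unlike the PR, the toy collects every pending call in one pass instead of stopping at the first one.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class ToyState:
    pending: List[str]  # IDs of tool calls the model has requested
    decisions: Dict[str, bool] = field(default_factory=dict)
    outputs: List[str] = field(default_factory=list)

    def approve(self, call_id: str) -> None:
        self.decisions[call_id] = True

    def reject(self, call_id: str) -> None:
        self.decisions[call_id] = False


@dataclass
class ToyResult:
    state: ToyState
    interruptions: List[str]
    final_output: Optional[str]  # None while the run is paused

    def to_state(self) -> ToyState:
        return self.state


def toy_run(state: ToyState) -> ToyResult:
    """Execute decided calls; pause if any call still lacks a decision."""
    waiting: List[str] = []
    for call_id in state.pending:
        decision = state.decisions.get(call_id)
        if decision is None:
            waiting.append(call_id)
        elif decision:
            state.outputs.append(f"{call_id}: executed")
        else:
            state.outputs.append(f"{call_id}: rejected")
    state.pending = waiting
    if waiting:
        return ToyResult(state, waiting, None)
    return ToyResult(state, [], "; ".join(state.outputs))


def run_with_approvals(state: ToyState, decide: Callable[[str], bool]) -> ToyResult:
    """Loop until the run finishes, asking `decide` about each interruption."""
    result = toy_run(state)
    while result.interruptions:
        state = result.to_state()
        for call_id in result.interruptions:
            if decide(call_id):
                state.approve(call_id)
            else:
                state.reject(call_id)
        result = toy_run(state)
    return result
```

In a real application `decide` would prompt a person, and the state could be persisted between the pause and the resume.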
**Remaining Work:**
- Add comprehensive tests
- Create example demonstrating HITL
- Add documentation
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
…mentation
This commit completes the human-in-the-loop (HITL) implementation by adding
full streaming support, matching the TypeScript SDK functionality.
**Streaming HITL Support:**
1. **ToolApprovalItem Handling** (_run_impl.py:67, 1282-1284)
   - Added ToolApprovalItem to imports
   - Handle ToolApprovalItem in stream_step_items_to_queue
   - Prevents "Unexpected item type" errors during streaming
2. **NextStepInterruption in Streaming** (run.py:1222-1226)
   - Added NextStepInterruption case in streaming turn loop
   - Sets interruptions and completes stream when approval needed
   - Matches non-streaming interruption handling
3. **RunState Support in run_streamed** (run.py:890-905)
   - Added full RunState input handling
   - Restores context wrapper from RunState
   - Enables streaming resumption after approval
4. **Streaming Tool Execution** (run.py:1044-1101)
   - Added run_state parameter to _start_streaming
   - Execute approved tools when resuming from interruption
   - Created _execute_approved_tools instance method
   - Created _execute_approved_tools_static classmethod for streaming
5. **RunResultStreaming.to_state()** (result.py:401-451)
   - Added to_state() method to RunResultStreaming
   - Enables state serialization from streaming results
   - Includes current_turn for proper state restoration
   - Complete parity with non-streaming RunResult.to_state()
**RunState Enhancements:**
6. **Runtime Imports** (run_state.py:108, 238, 369, 461)
   - Added runtime imports for NextStepInterruption
   - Fixes NameError when serializing/deserializing interruptions
   - Keeps TYPE_CHECKING imports for type hints
7. **from_json() Method** (run_state.py:385-475)
   - Added from_json() static method for dict deserialization
   - Complements existing from_string() method
   - Matches TypeScript API: to_json() / from_json()
**Examples:**
8. **human_in_the_loop.py** (examples/agent_patterns/)
   - Complete non-streaming HITL example
   - Demonstrates state serialization to JSON file
   - Shows approve/reject workflow with while loop
   - Matches TypeScript non-streaming example behavior
9. **human_in_the_loop_stream.py** (examples/agent_patterns/)
   - Complete streaming HITL example
   - Uses Runner.run_streamed() for streaming output
   - Shows streaming with interruption handling
   - Updated docstring to reflect streaming support
   - Includes while loop for rejection handling
   - Matches TypeScript streaming example behavior
**Key Design Decisions:**
- Kept _start_streaming as @classmethod (existing pattern)
- Separate instance/classmethod for tool execution (additive only)
- No breaking changes to existing functionality
- Complete API parity with TypeScript SDK
- Rejection returns error message to LLM for retry
- While loops in examples handle rejection/retry flow
**Testing:**
- ✅ Streaming HITL: interruption, approval, resumption
- ✅ Non-streaming HITL: interruption, approval, resumption
- ✅ State serialization: to_json() / from_json()
- ✅ Tool rejection: message returned, retry possible
- ✅ Examples: both streaming and non-streaming work
- ✅ Code quality: ruff format, ruff check, mypy pass
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
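The serialization round trip mentioned in item 7 might look roughly like the sketch below. `SavedState` and its fields are invented for illustration and are far smaller than the real RunState. The method names `to_json()`, `from_json()`, and `from_string()` follow the commit message, while `to_string()` is assumed here as the natural counterpart.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class SavedState:
    """Toy state with just enough fields to show a JSON round trip."""

    current_turn: int
    original_input: str
    pending_call_ids: List[str] = field(default_factory=list)
    approvals: Dict[str, bool] = field(default_factory=dict)

    def to_json(self) -> Dict[str, Any]:
        """Return a plain dict that json.dumps can handle."""
        return asdict(self)

    @staticmethod
    def from_json(data: Dict[str, Any]) -> "SavedState":
        """Rebuild the state from a dict produced by to_json()."""
        return SavedState(
            current_turn=int(data["current_turn"]),
            original_input=str(data["original_input"]),
            pending_call_ids=list(data.get("pending_call_ids", [])),
            approvals=dict(data.get("approvals", {})),
        )

    def to_string(self) -> str:
        # Assumed counterpart of from_string(); not named in the commit message.
        return json.dumps(self.to_json())

    @staticmethod
    def from_string(text: str) -> "SavedState":
        return SavedState.from_json(json.loads(text))
```

Keeping the dict form separate from the string form lets a caller embed the state in a larger JSON document or write it to a file, as the non-streaming example is described as doing.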
💡 Codex Review
Here are some automated review suggestions for this pull request.
    previous_response_id: str | None,
    conversation_id: str | None,
    session: Session | None,
    run_state: RunState[TContext] | None = None,
):
    if streamed_result.trace:
        streamed_result.trace.start(mark_as_current=True)
  Prime server tracker when resuming streaming runs
When Runner.run_streamed resumes from a RunState with conversation_id or previous_response_id, _start_streaming constructs a _ServerConversationTracker but never seeds it with the prior model_responses that were already sent. Unlike the synchronous path, no call to track_server_items is made, so prepare_input treats every previously generated item as unsent and resubmits them to the server. This duplicates earlier messages and breaks server-side conversation threading when a run is resumed.
Fixed in a56ce0a. Added server conversation tracker priming at lines 1076-1079 to match the non-streaming implementation and prevent message duplication when resuming from RunState.
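The duplication problem and its fix can be reproduced with a toy tracker. The class below is a stand-in, not the SDK's _ServerConversationTracker: it borrows the method names `track_server_items` and `prepare_input` from the review comment, but the ID-based bookkeeping and the `ModelResponse` shape are assumptions.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class ModelResponse:
    """Toy response: only the IDs of items the server has already seen."""

    output_ids: List[str]


@dataclass
class ServerConversationTracker:
    """Stand-in tracker that remembers which item IDs were already sent."""

    sent_ids: Set[str] = field(default_factory=set)

    def track_server_items(self, response: ModelResponse) -> None:
        # Mark everything in an earlier response as known to the server.
        self.sent_ids.update(response.output_ids)

    def prepare_input(self, generated_ids: List[str]) -> List[str]:
        # Only items the server has not seen yet are submitted.
        unsent = [item_id for item_id in generated_ids if item_id not in self.sent_ids]
        self.sent_ids.update(unsent)
        return unsent


def resume_tracker(model_responses: List[ModelResponse]) -> ServerConversationTracker:
    """Prime a new tracker from saved responses, as the fix does on resume."""
    tracker = ServerConversationTracker()
    for response in model_responses:
        tracker.track_server_items(response)
    return tracker
```

With an unprimed tracker every previously generated item looks unsent, which is the resubmission the review describes. After priming, only the items produced since the interruption are sent.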
    previous_response_id: str | None,
    conversation_id: str | None,
    session: Session | None,
    run_state: RunState[TContext] | None = None,
):
    if streamed_result.trace:
        streamed_result.trace.start(mark_as_current=True)
  Streaming resume ignores existing turn count
The streaming execution path always initializes current_turn = 0 when _start_streaming is called, even if a RunState with an existing _current_turn is supplied. The loop then increments from zero, so any turns completed before the interruption are ignored and the max_turns guard is reset. After each interruption, a resumed streaming run can exceed the user’s turn limit and misreport the current turn number.
This was already fixed in 74c50fd at line 914: current_turn=run_state._current_turn if run_state else 0. The turn counter is properly restored from the RunState.
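To make the turn-count concern concrete, here is a minimal model of a turn loop with a `max_turns` guard. `run_turns`, its parameters, and the local `MaxTurnsExceeded` exception are hypothetical. Only the idea of seeding the counter from the saved state, falling back to 0 for a fresh run, comes from the reply above.

```python
from typing import Optional


class MaxTurnsExceeded(Exception):
    """Local stand-in for the error raised when the turn limit is hit."""


def run_turns(turns_needed: int, max_turns: int, resumed_turn: Optional[int] = None) -> int:
    """Advance the turn counter, starting from the saved value when resuming."""
    # Restore the counter from the saved state; a fresh run starts at zero.
    current_turn = resumed_turn if resumed_turn is not None else 0
    for _ in range(turns_needed):
        current_turn += 1
        if current_turn > max_turns:
            raise MaxTurnsExceeded(f"Max turns ({max_turns}) exceeded")
    return current_turn
```

If the counter were reset to 0 on every resume, a run interrupted at turn 2 with `max_turns=3` could take three more turns. With the restored counter it can take only one.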
…essage duplication
When resuming a streaming run from RunState, the server conversation tracker
was not being primed with previously sent model responses. This caused
`prepare_input` to treat all previously generated items as unsent and
resubmit them to the server, breaking conversation threading.
**Issue**: Missing `track_server_items` call in streaming resumption path
**Fix**: Added server conversation tracker priming logic in `_start_streaming`
method (lines 1076-1079) to match the non-streaming path implementation
(lines 553-556). The fix iterates through `run_state._model_responses` and
calls `track_server_items(response)` to mark them as already sent to the
server.
**Impact**: Resolves message duplication when resuming interrupted streaming
runs, ensuring proper conversation threading with server-side sessions.
Fixes code review feedback from PR openai#2021
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Thanks for sending this patch! I currently don't have the bandwidth to check this in depth, but one thing I wanted to mention is that, while implementing the sessions feature in the openai-agents-js project, I found that the runner's internals need to take various HITL patterns into consideration. It may not be necessary to make the same changes in this Python SDK, but thorough testing of the sessions scenarios is worth doing.
    
Resolves #636.
See #636 (comment).