Refactor: Remove humanInput parameter from determineNodesToIgnore function

- Updated the determineNodesToIgnore function to eliminate the humanInput parameter, simplifying the logic for decision nodes.
- Adjusted related function calls to ensure compatibility with the new signature.
- Enhanced handling of humanInput within the executeNode function, ensuring it is cleared after consumption to prevent unintended behavior in subsequent nodes.
This commit is contained in:
Henry 2025-06-24 16:59:12 +01:00
parent 4038eb13fc
commit b9d896ed2e
1 changed files with 16 additions and 6 deletions

View File

@ -558,7 +558,6 @@ function hasReceivedRequiredInputs(waitingNode: IWaitingNode): boolean {
async function determineNodesToIgnore( async function determineNodesToIgnore(
currentNode: IReactFlowNode, currentNode: IReactFlowNode,
result: any, result: any,
humanInput: IHumanInput | undefined,
edges: IReactFlowEdge[], edges: IReactFlowEdge[],
nodeId: string nodeId: string
): Promise<string[]> { ): Promise<string[]> {
@ -568,7 +567,7 @@ async function determineNodesToIgnore(
const isDecisionNode = const isDecisionNode =
currentNode.data.name === 'conditionAgentflow' || currentNode.data.name === 'conditionAgentflow' ||
currentNode.data.name === 'conditionAgentAgentflow' || currentNode.data.name === 'conditionAgentAgentflow' ||
(currentNode.data.name === 'humanInputAgentflow' && humanInput) currentNode.data.name === 'humanInputAgentflow'
if (isDecisionNode && result.output?.conditions) { if (isDecisionNode && result.output?.conditions) {
const outputConditions: ICondition[] = result.output.conditions const outputConditions: ICondition[] = result.output.conditions
@ -619,7 +618,7 @@ async function processNodeOutputs({
if (!currentNode) return { humanInput: updatedHumanInput } if (!currentNode) return { humanInput: updatedHumanInput }
// Get nodes to ignore based on conditions // Get nodes to ignore based on conditions
const ignoreNodeIds = await determineNodesToIgnore(currentNode, result, humanInput, edges, nodeId) const ignoreNodeIds = await determineNodesToIgnore(currentNode, result, edges, nodeId)
if (ignoreNodeIds.length) { if (ignoreNodeIds.length) {
logger.debug(` ⏭️ Skipping nodes: [${ignoreNodeIds.join(', ')}]`) logger.debug(` ⏭️ Skipping nodes: [${ignoreNodeIds.join(', ')}]`)
} }
@ -838,6 +837,7 @@ const executeNode = async ({
result: any result: any
shouldStop?: boolean shouldStop?: boolean
agentFlowExecutedData?: IAgentflowExecutedData[] agentFlowExecutedData?: IAgentflowExecutedData[]
humanInput?: IHumanInput
}> => { }> => {
try { try {
if (abortController?.signal?.aborted) { if (abortController?.signal?.aborted) {
@ -910,6 +910,7 @@ const executeNode = async ({
// Handle human input if present // Handle human input if present
let humanInputAction: Record<string, any> | undefined let humanInputAction: Record<string, any> | undefined
let updatedHumanInput = humanInput
if (agentFlowExecutedData.length) { if (agentFlowExecutedData.length) {
const lastNodeOutput = agentFlowExecutedData[agentFlowExecutedData.length - 1]?.data?.output as ICommonObject | undefined const lastNodeOutput = agentFlowExecutedData[agentFlowExecutedData.length - 1]?.data?.output as ICommonObject | undefined
@ -921,6 +922,10 @@ const executeNode = async ({
reactFlowNodeData.inputs = { ...reactFlowNodeData.inputs, humanInput } reactFlowNodeData.inputs = { ...reactFlowNodeData.inputs, humanInput }
// Remove the stopped humanInput from execution data // Remove the stopped humanInput from execution data
agentFlowExecutedData = agentFlowExecutedData.filter((execData) => execData.nodeId !== nodeId) agentFlowExecutedData = agentFlowExecutedData.filter((execData) => execData.nodeId !== nodeId)
// Clear humanInput after it's been consumed to prevent subsequent humanInputAgentflow nodes from proceeding
logger.debug(`🧹 Clearing humanInput after consumption by node: ${nodeId}`)
updatedHumanInput = undefined
} }
// Check if this is the last node for streaming purpose // Check if this is the last node for streaming purpose
@ -1165,7 +1170,7 @@ const executeNode = async ({
sseStreamer?.streamActionEvent(chatId, humanInputAction) sseStreamer?.streamActionEvent(chatId, humanInputAction)
return { result: results, shouldStop: true, agentFlowExecutedData } return { result: results, shouldStop: true, agentFlowExecutedData, humanInput: updatedHumanInput }
} }
// Stop going through the current route if the node is a agent node waiting for human input before using the tool // Stop going through the current route if the node is a agent node waiting for human input before using the tool
@ -1212,10 +1217,10 @@ const executeNode = async ({
sseStreamer?.streamActionEvent(chatId, humanInputAction) sseStreamer?.streamActionEvent(chatId, humanInputAction)
return { result: results, shouldStop: true, agentFlowExecutedData } return { result: results, shouldStop: true, agentFlowExecutedData, humanInput: updatedHumanInput }
} }
return { result: results, agentFlowExecutedData } return { result: results, agentFlowExecutedData, humanInput: updatedHumanInput }
} catch (error) { } catch (error) {
logger.error(`[server]: Error executing node ${nodeId}: ${getErrorMessage(error)}`) logger.error(`[server]: Error executing node ${nodeId}: ${getErrorMessage(error)}`)
throw error throw error
@ -1695,6 +1700,11 @@ export const executeAgentFlow = async ({
agentFlowExecutedData = executionResult.agentFlowExecutedData agentFlowExecutedData = executionResult.agentFlowExecutedData
} }
// Update humanInput if it was cleared by the executed node
if (executionResult.humanInput !== currentHumanInput) {
currentHumanInput = executionResult.humanInput
}
if (executionResult.shouldStop) { if (executionResult.shouldStop) {
status = 'STOPPED' status = 'STOPPED'
break break