From a430284aa21e3ae1f0d5654e55b2ad2852519cc2 Mon Sep 17 00:00:00 2001 From: wwf <yearningwang@iqtogether.com> Date: 星期三, 04 六月 2025 15:17:49 +0800 Subject: [PATCH] 初始化 --- app/components/workflow/hooks/use-workflow-run.ts | 784 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 778 insertions(+), 6 deletions(-) diff --git a/app/components/workflow/hooks/use-workflow-run.ts b/app/components/workflow/hooks/use-workflow-run.ts index 05a60eb..53a6b58 100644 --- a/app/components/workflow/hooks/use-workflow-run.ts +++ b/app/components/workflow/hooks/use-workflow-run.ts @@ -1,11 +1,783 @@ -import { useHooksStore } from '@/app/components/workflow/hooks-store' +import { useCallback } from 'react' +import { + useReactFlow, + useStoreApi, +} from 'reactflow' +import produce from 'immer' +import { v4 as uuidV4 } from 'uuid' +import { usePathname } from 'next/navigation' +import { useWorkflowStore } from '../store' +import { useNodesSyncDraft } from '../hooks' +import { + BlockEnum, + NodeRunningStatus, + WorkflowRunningStatus, +} from '../types' +import { DEFAULT_ITER_TIMES } from '../constants' +import { useWorkflowUpdate } from './use-workflow-interactions' +import { useStore as useAppStore } from '@/app/components/app/store' +import type { IOtherOptions } from '@/service/base' +import { ssePost } from '@/service/base' +import { stopWorkflowRun } from '@/service/workflow' +import { useFeaturesStore } from '@/app/components/base/features/hooks' +import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager' +import { + getFilesInLogs, +} from '@/app/components/base/file-uploader/utils' +import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types' +import type { NodeTracing, VersionHistory } from '@/types/workflow' export const useWorkflowRun = () => { - const handleBackupDraft = useHooksStore(s => s.handleBackupDraft) - const handleLoadBackupDraft = useHooksStore(s => s.handleLoadBackupDraft) - const handleRestoreFromPublishedWorkflow = useHooksStore(s => s.handleRestoreFromPublishedWorkflow) - const handleRun = useHooksStore(s => s.handleRun) - const handleStopRun = useHooksStore(s => s.handleStopRun) + const store = useStoreApi() + const workflowStore = useWorkflowStore() + const reactflow = useReactFlow() + const featuresStore = useFeaturesStore() + const { doSyncWorkflowDraft } = useNodesSyncDraft() + const { handleUpdateWorkflowCanvas } = useWorkflowUpdate() + const pathname = usePathname() + + const handleBackupDraft = useCallback(() => { + const { + getNodes, + edges, + } = store.getState() + const { getViewport } = reactflow + const { + backupDraft, + setBackupDraft, + environmentVariables, + } = workflowStore.getState() + const { features } = featuresStore!.getState() + + if (!backupDraft) { + setBackupDraft({ + nodes: getNodes(), + edges, + viewport: getViewport(), + features, + environmentVariables, + }) + doSyncWorkflowDraft() + } + }, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft]) + + const handleLoadBackupDraft = useCallback(() => { + const { + backupDraft, + setBackupDraft, + setEnvironmentVariables, + } = workflowStore.getState() + + if (backupDraft) { + const { + nodes, + edges, + viewport, + features, + environmentVariables, + } = backupDraft + handleUpdateWorkflowCanvas({ + nodes, + edges, + viewport, + }) + setEnvironmentVariables(environmentVariables) + featuresStore!.setState({ features }) + setBackupDraft(undefined) + } + }, [handleUpdateWorkflowCanvas, workflowStore, featuresStore]) + + const handleRun = useCallback(async ( + params: any, + callback?: IOtherOptions, + ) => { + const { + getNodes, + setNodes, + } = store.getState() + const newNodes = produce(getNodes(), (draft) => { + draft.forEach((node) => { + node.data.selected = false + node.data._runningStatus = undefined + }) + }) + setNodes(newNodes) + await doSyncWorkflowDraft() + + const { + onWorkflowStarted, + onWorkflowFinished, + onNodeStarted, + onNodeFinished, + onIterationStart, + onIterationNext, + onIterationFinish, + onNodeRetry, + onError, + ...restCallback + } = callback || {} + workflowStore.setState({ historyWorkflowData: undefined }) + const appDetail = useAppStore.getState().appDetail + const workflowContainer = document.getElementById('workflow-container') + + const { + clientWidth, + clientHeight, + } = workflowContainer! + + let url = '' + if (appDetail?.mode === 'advanced-chat') + url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run` + + if (appDetail?.mode === 'workflow') + url = `/apps/${appDetail.id}/workflows/draft/run` + + let prevNodeId = '' + + const { + setWorkflowRunningData, + } = workflowStore.getState() + setWorkflowRunningData({ + result: { + status: WorkflowRunningStatus.Running, + }, + tracing: [], + resultText: '', + }) + + let ttsUrl = '' + let ttsIsPublic = false + if (params.token) { + ttsUrl = '/text-to-audio' + ttsIsPublic = true + } + else if (params.appId) { + if (pathname.search('explore/installed') > -1) + ttsUrl = `/installed-apps/${params.appId}/text-to-audio` + else + ttsUrl = `/apps/${params.appId}/text-to-audio` + } + const player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', (_: any): any => {}) + + ssePost( + url, + { + body: params, + }, + { + onWorkflowStarted: (params) => { + const { task_id, data } = params + const { + workflowRunningData, + setWorkflowRunningData, + setIterParallelLogMap, + } = workflowStore.getState() + const { + getNodes, + setNodes, + edges, + setEdges, + } = store.getState() + setIterParallelLogMap(new Map()) + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.task_id = task_id + draft.result = { + ...draft?.result, + ...data, + status: WorkflowRunningStatus.Running, + } + })) + const nodes = getNodes() + const newNodes = produce(nodes, (draft) => { + draft.forEach((node) => { + node.data._waitingRun = true + node.data._runningBranchId = undefined + }) + }) + setNodes(newNodes) + const newEdges = produce(edges, (draft) => { + draft.forEach((edge) => { + edge.data = { + ...edge.data, + _sourceRunningStatus: undefined, + _targetRunningStatus: undefined, + _waitingRun: true, + } + }) + }) + setEdges(newEdges) + + if (onWorkflowStarted) + onWorkflowStarted(params) + }, + onWorkflowFinished: (params) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + + const isStringOutput = data.outputs && Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string' + + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.result = { + ...draft.result, + ...data, + files: getFilesInLogs(data.outputs), + } as any + if (isStringOutput) { + draft.resultTabActive = true + draft.resultText = data.outputs[Object.keys(data.outputs)[0]] + } + })) + + prevNodeId = '' + + if (onWorkflowFinished) + onWorkflowFinished(params) + }, + onError: (params) => { + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.result = { + ...draft.result, + status: WorkflowRunningStatus.Failed, + } + })) + + if (onError) + onError(params) + }, + onNodeStarted: (params) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + iterParallelLogMap, + setIterParallelLogMap, + } = workflowStore.getState() + const { + getNodes, + setNodes, + edges, + setEdges, + transform, + } = store.getState() + const nodes = getNodes() + const node = nodes.find(node => node.id === data.node_id) + if (node?.parentId) { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iterations = tracing.find(trace => trace.node_id === node?.parentId) + const currIteration = iterations?.details![node.data.iteration_index] || iterations?.details![iterations.details!.length - 1] + if (!data.parallel_run_id) { + currIteration?.push({ + ...data, + status: NodeRunningStatus.Running, + } as any) + } + else { + const nodeId = iterations?.node_id as string + if (!iterParallelLogMap.has(nodeId as string)) + iterParallelLogMap.set(iterations?.node_id as string, new Map()) + + const currentIterLogMap = iterParallelLogMap.get(nodeId)! + if (!currentIterLogMap.has(data.parallel_run_id)) + currentIterLogMap.set(data.parallel_run_id, [{ ...data, status: NodeRunningStatus.Running } as any]) + else + currentIterLogMap.get(data.parallel_run_id)!.push({ ...data, status: NodeRunningStatus.Running } as any) + setIterParallelLogMap(iterParallelLogMap) + if (iterations) + iterations.details = Array.from(currentIterLogMap.values()) + } + })) + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.tracing!.push({ + ...data, + status: NodeRunningStatus.Running, + } as any) + })) + + const { + setViewport, + } = reactflow + const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id) + const currentNode = nodes[currentNodeIndex] + const position = currentNode.position + const zoom = transform[2] + + if (!currentNode.parentId) { + setViewport({ + x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom, + y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom, + zoom: transform[2], + }) + } + const newNodes = produce(nodes, (draft) => { + draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running + draft[currentNodeIndex].data._waitingRun = false + }) + setNodes(newNodes) + const newEdges = produce(edges, (draft) => { + const incomeEdges = draft.filter((edge) => { + return edge.target === data.node_id + }) + + incomeEdges.forEach((edge) => { + const incomeNode = nodes.find(node => node.id === edge.source)! + if ( + (!incomeNode.data._runningBranchId && edge.sourceHandle === 'source') + || (incomeNode.data._runningBranchId && edge.sourceHandle === incomeNode.data._runningBranchId) + ) { + edge.data = { + ...edge.data, + _sourceRunningStatus: incomeNode.data._runningStatus, + _targetRunningStatus: NodeRunningStatus.Running, + _waitingRun: false, + } + } + }) + }) + setEdges(newEdges) + } + if (onNodeStarted) + onNodeStarted(params) + }, + onNodeFinished: (params) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + iterParallelLogMap, + setIterParallelLogMap, + } = workflowStore.getState() + const { + getNodes, + setNodes, + edges, + setEdges, + } = store.getState() + const nodes = getNodes() + const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId + if (nodeParentId) { + if (!data.execution_metadata.parallel_mode_run_id) { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node + + if (iterations && iterations.details) { + const iterationIndex = data.execution_metadata?.iteration_index || 0 + if (!iterations.details[iterationIndex]) + iterations.details[iterationIndex] = [] + + const currIteration = iterations.details[iterationIndex] + const nodeIndex = currIteration.findIndex(node => + node.node_id === data.node_id && ( + node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id), + ) + if (nodeIndex !== -1) { + currIteration[nodeIndex] = { + ...currIteration[nodeIndex], + ...(currIteration[nodeIndex].retryDetail + ? { retryDetail: currIteration[nodeIndex].retryDetail } + : {}), + ...data, + } as any + } + else { + currIteration.push({ + ...data, + } as any) + } + } + })) + } + else { + // open parallel mode + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node + + if (iterations && iterations.details) { + const iterRunID = data.execution_metadata?.parallel_mode_run_id + + const currIteration = iterParallelLogMap.get(iterations.node_id)?.get(iterRunID) + const nodeIndex = currIteration?.findIndex(node => + node.node_id === data.node_id && ( + node?.parallel_run_id === data.execution_metadata?.parallel_mode_run_id), + ) + if (currIteration) { + if (nodeIndex !== undefined && nodeIndex !== -1) { + currIteration[nodeIndex] = { + ...currIteration[nodeIndex], + ...data, + } as any + } + else { + currIteration.push({ + ...data, + } as any) + } + } + setIterParallelLogMap(iterParallelLogMap) + const iterLogMap = iterParallelLogMap.get(iterations.node_id) + if (iterLogMap) + iterations.details = Array.from(iterLogMap.values()) + } + })) + } + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const currentIndex = draft.tracing!.findIndex((trace) => { + if (!trace.execution_metadata?.parallel_id) + return trace.node_id === data.node_id + return trace.node_id === data.node_id && trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id + }) + if (currentIndex > -1 && draft.tracing) { + draft.tracing[currentIndex] = { + ...data, + ...(draft.tracing[currentIndex].extras + ? { extras: draft.tracing[currentIndex].extras } + : {}), + ...(draft.tracing[currentIndex].retryDetail + ? { retryDetail: draft.tracing[currentIndex].retryDetail } + : {}), + } as any + } + })) + const newNodes = produce(nodes, (draft) => { + const currentNode = draft.find(node => node.id === data.node_id)! + currentNode.data._runningStatus = data.status as any + if (data.status === NodeRunningStatus.Exception) { + if (data.execution_metadata.error_strategy === ErrorHandleTypeEnum.failBranch) + currentNode.data._runningBranchId = ErrorHandleTypeEnum.failBranch + } + else { + if (data.node_type === BlockEnum.IfElse) + currentNode.data._runningBranchId = data?.outputs?.selected_case_id + + if (data.node_type === BlockEnum.QuestionClassifier) + currentNode.data._runningBranchId = data?.outputs?.class_id + } + }) + setNodes(newNodes) + const newEdges = produce(edges, (draft) => { + const incomeEdges = draft.filter((edge) => { + return edge.target === data.node_id + }) + incomeEdges.forEach((edge) => { + edge.data = { + ...edge.data, + _targetRunningStatus: data.status as any, + } + }) + }) + setEdges(newEdges) + prevNodeId = data.node_id + } + + if (onNodeFinished) + onNodeFinished(params) + }, + onIterationStart: (params) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + setIterTimes, + } = workflowStore.getState() + const { + getNodes, + setNodes, + edges, + setEdges, + transform, + } = store.getState() + const nodes = getNodes() + setIterTimes(DEFAULT_ITER_TIMES) + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.tracing!.push({ + ...data, + status: NodeRunningStatus.Running, + details: [], + iterDurationMap: {}, + } as any) + })) + + const { + setViewport, + } = reactflow + const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id) + const currentNode = nodes[currentNodeIndex] + const position = currentNode.position + const zoom = transform[2] + + if (!currentNode.parentId) { + setViewport({ + x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom, + y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom, + zoom: transform[2], + }) + } + const newNodes = produce(nodes, (draft) => { + draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running + draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length + draft[currentNodeIndex].data._waitingRun = false + }) + setNodes(newNodes) + const newEdges = produce(edges, (draft) => { + const incomeEdges = draft.filter(edge => edge.target === data.node_id) + + incomeEdges.forEach((edge) => { + edge.data = { + ...edge.data, + _sourceRunningStatus: nodes.find(node => node.id === edge.source)!.data._runningStatus, + _targetRunningStatus: NodeRunningStatus.Running, + _waitingRun: false, + } + }) + }) + setEdges(newEdges) + + if (onIterationStart) + onIterationStart(params) + }, + onIterationNext: (params) => { + const { + workflowRunningData, + setWorkflowRunningData, + iterTimes, + setIterTimes, + } = workflowStore.getState() + + const { data } = params + const { + getNodes, + setNodes, + } = store.getState() + + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id) + if (iteration) { + if (iteration.iterDurationMap && data.duration) + iteration.iterDurationMap[data.parallel_mode_run_id ?? `${data.index - 1}`] = data.duration + if (iteration.details!.length >= iteration.metadata.iterator_length!) + return + } + if (!data.parallel_mode_run_id) + iteration?.details!.push([]) + })) + const nodes = getNodes() + const newNodes = produce(nodes, (draft) => { + const currentNode = draft.find(node => node.id === data.node_id)! + currentNode.data._iterationIndex = iterTimes + setIterTimes(iterTimes + 1) + }) + setNodes(newNodes) + + if (onIterationNext) + onIterationNext(params) + }, + onIterationFinish: (params) => { + const { data } = params + + const { + workflowRunningData, + setWorkflowRunningData, + setIterTimes, + } = workflowStore.getState() + const { + getNodes, + setNodes, + } = store.getState() + const nodes = getNodes() + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const currIterationNode = tracing.find(trace => trace.node_id === data.node_id) + if (currIterationNode) { + Object.assign(currIterationNode, { + ...data, + status: NodeRunningStatus.Succeeded, + }) + } + })) + setIterTimes(DEFAULT_ITER_TIMES) + const newNodes = produce(nodes, (draft) => { + const currentNode = draft.find(node => node.id === data.node_id)! + + currentNode.data._runningStatus = data.status + }) + setNodes(newNodes) + + prevNodeId = data.node_id + + if (onIterationFinish) + onIterationFinish(params) + }, + onNodeRetry: (params) => { + const { data } = params + const { + workflowRunningData, + setWorkflowRunningData, + iterParallelLogMap, + setIterParallelLogMap, + } = workflowStore.getState() + const { + getNodes, + setNodes, + } = store.getState() + + const nodes = getNodes() + const currentNode = nodes.find(node => node.id === data.node_id)! + const nodeParent = nodes.find(node => node.id === currentNode.parentId) + if (nodeParent) { + if (!data.execution_metadata.parallel_mode_run_id) { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iteration = tracing.find(trace => trace.node_id === nodeParent.id) + + if (iteration && iteration.details?.length) { + const currentNodeRetry = iteration.details[nodeParent.data._iterationIndex - 1]?.find(item => item.node_id === data.node_id) + + if (currentNodeRetry) { + if (currentNodeRetry?.retryDetail) + currentNodeRetry?.retryDetail.push(data as NodeTracing) + else + currentNodeRetry.retryDetail = [data as NodeTracing] + } + } + })) + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iteration = tracing.find(trace => trace.node_id === nodeParent.id) + + if (iteration && iteration.details?.length) { + const iterRunID = data.execution_metadata?.parallel_mode_run_id + + const currIteration = iterParallelLogMap.get(iteration.node_id)?.get(iterRunID) + const currentNodeRetry = currIteration?.find(item => item.node_id === data.node_id) + + if (currentNodeRetry) { + if (currentNodeRetry?.retryDetail) + currentNodeRetry?.retryDetail.push(data as NodeTracing) + else + currentNodeRetry.retryDetail = [data as NodeTracing] + } + setIterParallelLogMap(iterParallelLogMap) + const iterLogMap = iterParallelLogMap.get(iteration.node_id) + if (iterLogMap) + iteration.details = Array.from(iterLogMap.values()) + } + })) + } + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const currentRetryNodeIndex = tracing.findIndex(trace => trace.node_id === data.node_id) + + if (currentRetryNodeIndex > -1) { + const currentRetryNode = tracing[currentRetryNodeIndex] + if (currentRetryNode.retryDetail) + draft.tracing![currentRetryNodeIndex].retryDetail!.push(data as NodeTracing) + else + draft.tracing![currentRetryNodeIndex].retryDetail = [data as NodeTracing] + } + })) + } + const newNodes = produce(nodes, (draft) => { + const currentNode = draft.find(node => node.id === data.node_id)! + + currentNode.data._retryIndex = data.retry_index + }) + setNodes(newNodes) + + if (onNodeRetry) + onNodeRetry(params) + }, + onParallelBranchStarted: (params) => { + // console.log(params, 'parallel start') + }, + onParallelBranchFinished: (params) => { + // console.log(params, 'finished') + }, + onTextChunk: (params) => { + const { data: { text } } = params + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.resultTabActive = true + draft.resultText += text + })) + }, + onTextReplace: (params) => { + const { data: { text } } = params + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.resultText = text + })) + }, + onTTSChunk: (messageId: string, audio: string, audioType?: string) => { + if (!audio || audio === '') + return + player.playAudioWithAudio(audio, true) + AudioPlayerManager.getInstance().resetMsgId(messageId) + }, + onTTSEnd: (messageId: string, audio: string, audioType?: string) => { + player.playAudioWithAudio(audio, false) + }, + ...restCallback, + }, + ) + }, [store, reactflow, workflowStore, doSyncWorkflowDraft]) + + const handleStopRun = useCallback((taskId: string) => { + const appId = useAppStore.getState().appDetail?.id + + stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`) + }, []) + + const handleRestoreFromPublishedWorkflow = useCallback((publishedWorkflow: VersionHistory) => { + const nodes = publishedWorkflow.graph.nodes.map(node => ({ ...node, selected: false, data: { ...node.data, selected: false } })) + const edges = publishedWorkflow.graph.edges + const viewport = publishedWorkflow.graph.viewport! + handleUpdateWorkflowCanvas({ + nodes, + edges, + viewport, + }) + const mappedFeatures = { + opening: { + enabled: !!publishedWorkflow.features.opening_statement || !!publishedWorkflow.features.suggested_questions.length, + opening_statement: publishedWorkflow.features.opening_statement, + suggested_questions: publishedWorkflow.features.suggested_questions, + }, + suggested: publishedWorkflow.features.suggested_questions_after_answer, + text2speech: publishedWorkflow.features.text_to_speech, + speech2text: publishedWorkflow.features.speech_to_text, + citation: publishedWorkflow.features.retriever_resource, + moderation: publishedWorkflow.features.sensitive_word_avoidance, + file: publishedWorkflow.features.file_upload, + } + + featuresStore?.setState({ features: mappedFeatures }) + workflowStore.getState().setPublishedAt(publishedWorkflow.created_at) + workflowStore.getState().setEnvironmentVariables(publishedWorkflow.environment_variables || []) + }, [featuresStore, handleUpdateWorkflowCanvas, workflowStore]) return { handleBackupDraft, -- Gitblit v1.8.0