Compare commits

...

1 Commits

Author SHA1 Message Date
qiuqiua
559547e9bf feat: cherry pick implementation (#31960) 2026-02-05 10:00:47 +08:00
37 changed files with 9729 additions and 1 deletions

View File

@@ -1,4 +1,5 @@
from collections.abc import Sequence
from typing import Any
from flask_restx import Resource
from pydantic import BaseModel, Field
@@ -19,6 +20,7 @@ from core.helper.code_executor.python3.python3_code_provider import Python3CodeP
from core.llm_generator.entities import RuleCodeGeneratePayload, RuleGeneratePayload, RuleStructuredOutputPayload
from core.llm_generator.llm_generator import LLMGenerator
from core.model_runtime.errors.invoke import InvokeError
from core.workflow.generator import WorkflowGenerator
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
from models import App
@@ -41,6 +43,30 @@ class InstructionTemplatePayload(BaseModel):
type: str = Field(..., description="Instruction template type")
class PreviousWorkflow(BaseModel):
"""Previous workflow attempt for regeneration context."""
nodes: list[dict[str, Any]] = Field(default_factory=list, description="Previously generated nodes")
edges: list[dict[str, Any]] = Field(default_factory=list, description="Previously generated edges")
warnings: list[str] = Field(default_factory=list, description="Warnings from previous generation")
class FlowchartGeneratePayload(BaseModel):
instruction: str = Field(..., description="Workflow flowchart generation instruction")
model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
available_nodes: list[dict[str, Any]] = Field(default_factory=list, description="Available node types")
existing_nodes: list[dict[str, Any]] = Field(default_factory=list, description="Existing workflow nodes")
existing_edges: list[dict[str, Any]] = Field(default_factory=list, description="Existing workflow edges")
available_tools: list[dict[str, Any]] = Field(default_factory=list, description="Available tools")
selected_node_ids: list[str] = Field(default_factory=list, description="IDs of selected nodes for context")
previous_workflow: PreviousWorkflow | None = Field(default=None, description="Previous workflow for regeneration")
regenerate_mode: bool = Field(default=False, description="Whether this is a regeneration request")
# Language preference for generated content (node titles, descriptions)
language: str | None = Field(default=None, description="Preferred language for generated content")
# Available models that user has configured (for LLM/question-classifier nodes)
available_models: list[dict[str, Any]] = Field(default_factory=list, description="User's configured models")
def reg(cls: type[BaseModel]):
console_ns.schema_model(cls.__name__, cls.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0))
@@ -50,6 +76,7 @@ reg(RuleCodeGeneratePayload)
reg(RuleStructuredOutputPayload)
reg(InstructionGeneratePayload)
reg(InstructionTemplatePayload)
reg(FlowchartGeneratePayload)
reg(ModelConfig)
@@ -240,6 +267,52 @@ class InstructionGenerateApi(Resource):
raise CompletionRequestError(e.description)
@console_ns.route("/flowchart-generate")
class FlowchartGenerateApi(Resource):
@console_ns.doc("generate_workflow_flowchart")
@console_ns.doc(description="Generate workflow flowchart using LLM with intent classification")
@console_ns.expect(console_ns.models[FlowchartGeneratePayload.__name__])
@console_ns.response(200, "Flowchart generated successfully")
@console_ns.response(400, "Invalid request parameters")
@console_ns.response(402, "Provider quota exceeded")
@setup_required
@login_required
@account_initialization_required
def post(self):
args = FlowchartGeneratePayload.model_validate(console_ns.payload)
_, current_tenant_id = current_account_with_tenant()
try:
# Convert PreviousWorkflow to dict if present
previous_workflow_dict = args.previous_workflow.model_dump() if args.previous_workflow else None
result = WorkflowGenerator.generate_workflow_flowchart(
tenant_id=current_tenant_id,
instruction=args.instruction,
model_config=args.model_config_data,
available_nodes=args.available_nodes,
existing_nodes=args.existing_nodes,
existing_edges=args.existing_edges,
available_tools=args.available_tools,
selected_node_ids=args.selected_node_ids,
previous_workflow=previous_workflow_dict,
regenerate_mode=args.regenerate_mode,
preferred_language=args.language,
available_models=args.available_models,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
raise ProviderQuotaExceededError()
except ModelCurrentlyNotSupportError:
raise ProviderModelCurrentlyNotSupportError()
except InvokeError as e:
raise CompletionRequestError(e.description)
return result
@console_ns.route("/instruction-generate/template")
class InstructionGenerationTemplateApi(Resource):
@console_ns.doc("get_instruction_template")

View File

@@ -0,0 +1 @@
from .runner import WorkflowGenerator

View File

@@ -0,0 +1,29 @@
"""
Vibe Workflow Generator Configuration Module.
This module centralizes configuration for the Vibe workflow generation feature,
including node schemas, fallback rules, and response templates.
"""
from core.workflow.generator.config.node_schemas import (
BUILTIN_NODE_SCHEMAS,
FALLBACK_RULES,
FIELD_NAME_CORRECTIONS,
NODE_TYPE_ALIASES,
get_builtin_node_schemas,
get_corrected_field_name,
validate_node_schemas,
)
from core.workflow.generator.config.responses import DEFAULT_SUGGESTIONS, OFF_TOPIC_RESPONSES
__all__ = [
"BUILTIN_NODE_SCHEMAS",
"DEFAULT_SUGGESTIONS",
"FALLBACK_RULES",
"FIELD_NAME_CORRECTIONS",
"NODE_TYPE_ALIASES",
"OFF_TOPIC_RESPONSES",
"get_builtin_node_schemas",
"get_corrected_field_name",
"validate_node_schemas",
]

View File

@@ -0,0 +1,501 @@
"""
Unified Node Configuration for Vibe Workflow Generation.
This module centralizes all node-related configuration:
- Node schemas (parameter definitions)
- Fallback rules (keyword-based node type inference)
- Node type aliases (natural language to canonical type mapping)
- Field name corrections (LLM output normalization)
- Validation utilities
Note: These definitions are the single source of truth.
Frontend has a mirrored copy at web/app/components/workflow/hooks/use-workflow-vibe-config.ts
"""
from typing import Any
# =============================================================================
# NODE SCHEMAS
# =============================================================================
# Built-in node schemas with parameter definitions
# These help the model understand what config each node type requires
_HARDCODED_SCHEMAS: dict[str, dict[str, Any]] = {
"http-request": {
"description": "Send HTTP requests to external APIs or fetch web content",
"required": ["url", "method"],
"parameters": {
"url": {
"type": "string",
"description": "Full URL including protocol (https://...)",
"example": "{{#start.url#}} or https://api.example.com/data",
},
"method": {
"type": "enum",
"options": ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"],
"description": "HTTP method",
},
"headers": {
"type": "string",
"description": "HTTP headers as newline-separated 'Key: Value' pairs",
"example": "Content-Type: application/json\nAuthorization: Bearer {{#start.api_key#}}",
},
"params": {
"type": "string",
"description": "URL query parameters as newline-separated 'key: value' pairs",
},
"body": {
"type": "object",
"description": "Request body with type field required",
"example": {"type": "none", "data": []},
},
"authorization": {
"type": "object",
"description": "Authorization config",
"example": {"type": "no-auth"},
},
"timeout": {
"type": "number",
"description": "Request timeout in seconds",
"default": 60,
},
},
"outputs": ["body (response content)", "status_code", "headers"],
},
"code": {
"description": "Execute Python or JavaScript code for custom logic",
"required": ["code", "language"],
"parameters": {
"code": {
"type": "string",
"description": "Code to execute. Must define a main() function that returns a dict.",
},
"language": {
"type": "enum",
"options": ["python3", "javascript"],
},
"variables": {
"type": "array",
"description": "Input variables passed to the code",
"item_schema": {"variable": "string", "value_selector": "array"},
},
"outputs": {
"type": "object",
"description": "Output variable definitions",
},
},
"outputs": ["Variables defined in outputs schema"],
},
"llm": {
"description": "Call a large language model for text generation/processing",
"required": ["prompt_template"],
"parameters": {
"model": {
"type": "object",
"description": "Model configuration (provider, name, mode)",
},
"prompt_template": {
"type": "array",
"description": "Messages for the LLM",
"item_schema": {
"role": "enum: system, user, assistant",
"text": "string - message content, can include {{#node_id.field#}} references",
},
},
"context": {
"type": "object",
"description": "Optional context settings",
},
"memory": {
"type": "object",
"description": "Optional memory/conversation settings",
},
},
"outputs": ["text (generated response)"],
},
"if-else": {
"description": "Conditional branching based on conditions",
"required": ["cases"],
"parameters": {
"cases": {
"type": "array",
"description": "List of condition cases. Each case defines when 'true' branch is taken.",
"item_schema": {
"case_id": "string - unique case identifier (e.g., 'case_1')",
"logical_operator": "enum: and, or - how multiple conditions combine",
"conditions": {
"type": "array",
"item_schema": {
"variable_selector": "array of strings - path to variable, e.g. ['node_id', 'field']",
"comparison_operator": (
"enum: =, ≠, >, <, ≥, ≤, contains, not contains, is, is not, empty, not empty"
),
"value": "string or number - value to compare against",
},
},
},
},
},
"outputs": ["Branches: true (first case conditions met), false (else/no case matched)"],
},
"knowledge-retrieval": {
"description": "Query knowledge base for relevant content",
"required": ["query_variable_selector", "dataset_ids"],
"parameters": {
"query_variable_selector": {
"type": "array",
"description": "Path to query variable, e.g. ['start', 'query']",
},
"dataset_ids": {
"type": "array",
"description": "List of knowledge base IDs to search",
},
"retrieval_mode": {
"type": "enum",
"options": ["single", "multiple"],
},
},
"outputs": ["result (retrieved documents)"],
},
"template-transform": {
"description": "Transform data using Jinja2 templates",
"required": ["template", "variables"],
"parameters": {
"template": {
"type": "string",
"description": "Jinja2 template string. Use {{ variable_name }} to reference variables.",
},
"variables": {
"type": "array",
"description": "Input variables defined for the template",
"item_schema": {
"variable": "string - variable name to use in template",
"value_selector": "array - path to source value, e.g. ['start', 'user_input']",
},
},
},
"outputs": ["output (transformed string)"],
},
"variable-aggregator": {
"description": "Aggregate variables from multiple branches",
"required": ["variables"],
"parameters": {
"variables": {
"type": "array",
"description": "List of variable selectors to aggregate",
"item_schema": "array of strings - path to source variable, e.g. ['node_id', 'field']",
},
},
"outputs": ["output (aggregated value)"],
},
"iteration": {
"description": "Loop over array items",
"required": ["iterator_selector"],
"parameters": {
"iterator_selector": {
"type": "array",
"description": "Path to array variable to iterate",
},
},
"outputs": ["item (current iteration item)", "index (current index)"],
},
"parameter-extractor": {
"description": "Extract structured parameters from user input using LLM",
"required": ["query", "parameters"],
"parameters": {
"model": {
"type": "object",
"description": "Model configuration (provider, name, mode)",
},
"query": {
"type": "array",
"description": "Path to input text to extract parameters from, e.g. ['start', 'user_input']",
},
"parameters": {
"type": "array",
"description": "Parameters to extract from the input",
"item_schema": {
"name": "string - parameter name (required)",
"type": (
"enum: string, number, boolean, array[string], array[number], array[object], array[boolean]"
),
"description": "string - description of what to extract (required)",
"required": "boolean - whether this parameter is required (MUST be specified)",
"options": "array of strings (optional) - for enum-like selection",
},
},
"instruction": {
"type": "string",
"description": "Additional instructions for extraction",
},
"reasoning_mode": {
"type": "enum",
"options": ["function_call", "prompt"],
"description": "How to perform extraction (defaults to function_call)",
},
},
"outputs": ["Extracted parameters as defined in parameters array", "__is_success", "__reason"],
},
"question-classifier": {
"description": "Classify user input into predefined categories using LLM",
"required": ["query", "classes"],
"parameters": {
"model": {
"type": "object",
"description": "Model configuration (provider, name, mode)",
},
"query": {
"type": "array",
"description": "Path to input text to classify, e.g. ['start', 'user_input']",
},
"classes": {
"type": "array",
"description": "Classification categories",
"item_schema": {
"id": "string - unique class identifier",
"name": "string - class name/label",
},
},
"instruction": {
"type": "string",
"description": "Additional instructions for classification",
},
},
"outputs": ["class_name (selected class)"],
},
}
def _get_dynamic_schemas() -> dict[str, dict[str, Any]]:
"""
Dynamically load schemas from node classes.
Uses lazy import to avoid circular dependency.
"""
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
schemas = {}
for node_type, version_map in NODE_TYPE_CLASSES_MAPPING.items():
# Get the latest version class
node_cls = version_map.get(LATEST_VERSION)
if not node_cls:
continue
# Get schema from the class
schema = node_cls.get_default_config_schema()
if schema:
schemas[node_type.value] = schema
return schemas
# Cache for built-in schemas (populated on first access)
_builtin_schemas_cache: dict[str, dict[str, Any]] | None = None
def get_builtin_node_schemas() -> dict[str, dict[str, Any]]:
"""
Get the complete set of built-in node schemas.
Combines hardcoded schemas with dynamically loaded ones.
Results are cached after first call.
"""
global _builtin_schemas_cache
if _builtin_schemas_cache is None:
_builtin_schemas_cache = {**_HARDCODED_SCHEMAS, **_get_dynamic_schemas()}
return _builtin_schemas_cache
# For backward compatibility - but use get_builtin_node_schemas() for lazy loading
BUILTIN_NODE_SCHEMAS: dict[str, dict[str, Any]] = _HARDCODED_SCHEMAS.copy()
# =============================================================================
# FALLBACK RULES
# =============================================================================
# Keyword rules for smart fallback detection
# Maps node type to keywords that suggest using that node type as a fallback
FALLBACK_RULES: dict[str, list[str]] = {
"http-request": [
"http",
"url",
"web",
"scrape",
"scraper",
"fetch",
"api",
"request",
"download",
"upload",
"webhook",
"endpoint",
"rest",
"get",
"post",
],
"code": [
"code",
"script",
"calculate",
"compute",
"process",
"transform",
"parse",
"convert",
"format",
"filter",
"sort",
"math",
"logic",
],
"llm": [
"analyze",
"summarize",
"summary",
"extract",
"classify",
"translate",
"generate",
"write",
"rewrite",
"explain",
"answer",
"chat",
],
}
# =============================================================================
# NODE TYPE ALIASES
# =============================================================================
# Node type aliases for inference from natural language
# Maps common terms to canonical node type names
NODE_TYPE_ALIASES: dict[str, str] = {
# Start node aliases
"start": "start",
"begin": "start",
"input": "start",
# End node aliases
"end": "end",
"finish": "end",
"output": "end",
# LLM node aliases
"llm": "llm",
"ai": "llm",
"gpt": "llm",
"model": "llm",
"chat": "llm",
# Code node aliases
"code": "code",
"script": "code",
"python": "code",
"javascript": "code",
# HTTP request node aliases
"http-request": "http-request",
"http": "http-request",
"request": "http-request",
"api": "http-request",
"fetch": "http-request",
"webhook": "http-request",
# Conditional node aliases
"if-else": "if-else",
"condition": "if-else",
"branch": "if-else",
"switch": "if-else",
# Loop node aliases
"iteration": "iteration",
"loop": "loop",
"foreach": "iteration",
# Tool node alias
"tool": "tool",
}
# =============================================================================
# FIELD NAME CORRECTIONS
# =============================================================================
# Field name corrections for LLM-generated node configs
# Maps incorrect field names to correct ones for specific node types
FIELD_NAME_CORRECTIONS: dict[str, dict[str, str]] = {
"http-request": {
"text": "body", # LLM might use "text" instead of "body"
"content": "body",
"response": "body",
},
"code": {
"text": "result", # LLM might use "text" instead of "result"
"output": "result",
},
"llm": {
"response": "text",
"answer": "text",
},
}
def get_corrected_field_name(node_type: str, field: str) -> str:
"""
Get the corrected field name for a node type.
Args:
node_type: The type of the node (e.g., "http-request", "code")
field: The field name to correct
Returns:
The corrected field name, or the original if no correction needed
"""
corrections = FIELD_NAME_CORRECTIONS.get(node_type, {})
return corrections.get(field, field)
# =============================================================================
# VALIDATION UTILITIES
# =============================================================================
# Node types that are internal and don't need schemas for LLM generation
_INTERNAL_NODE_TYPES: set[str] = {
# Internal workflow nodes
"answer", # Internal to chatflow
"loop", # Uses iteration internally
"assigner", # Variable assignment utility
"variable-assigner", # Variable assignment utility
"agent", # Agent node (complex, handled separately)
"document-extractor", # Internal document processing
"list-operator", # Internal list operations
# Iteration internal nodes
"iteration-start", # Internal to iteration loop
"loop-start", # Internal to loop
"loop-end", # Internal to loop
# Trigger nodes (not user-creatable via LLM)
"trigger-plugin", # Plugin trigger
"trigger-schedule", # Scheduled trigger
"trigger-webhook", # Webhook trigger
# Other internal nodes
"datasource", # Data source configuration
"human-input", # Human-in-the-loop node
"knowledge-index", # Knowledge indexing node
}
def validate_node_schemas() -> list[str]:
"""
Validate that all registered node types have corresponding schemas.
This function checks if BUILTIN_NODE_SCHEMAS covers all node types
registered in NODE_TYPE_CLASSES_MAPPING, excluding internal node types.
Returns:
List of warning messages for missing schemas (empty if all valid)
"""
from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
schemas = get_builtin_node_schemas()
warnings = []
for node_type in NODE_TYPE_CLASSES_MAPPING:
type_value = node_type.value
if type_value in _INTERNAL_NODE_TYPES:
continue
if type_value not in schemas:
warnings.append(f"Missing schema for node type: {type_value}")
return warnings

View File

@@ -0,0 +1,72 @@
"""
Response Templates for Vibe Workflow Generation.
This module defines templates for off-topic responses and default suggestions
to guide users back to workflow-related requests.
"""
# Off-topic response templates for different categories
# Each category has messages in multiple languages
OFF_TOPIC_RESPONSES: dict[str, dict[str, str]] = {
"weather": {
"en": (
"I'm the workflow design assistant - I can't check the weather, "
"but I can help you build AI workflows! For example, I could help you "
"create a workflow that fetches weather data from an API."
),
"zh": "我是工作流设计助手无法查询天气。但我可以帮你创建一个从API获取天气数据的工作流",
},
"math": {
"en": (
"I focus on workflow design rather than calculations. However, "
"if you need calculations in a workflow, I can help you add a Code node "
"that handles math operations!"
),
"zh": "我专注于工作流设计而非计算。但如果您需要在工作流中进行计算,我可以帮您添加一个处理数学运算的代码节点!",
},
"joke": {
"en": (
"While I'd love to share a laugh, I'm specialized in workflow design. "
"How about we create something fun instead - like a workflow that generates jokes using AI?"
),
"zh": "虽然我很想讲笑话但我专门从事工作流设计。不如我们创建一个有趣的东西——比如使用AI生成笑话的工作流",
},
"translation": {
"en": (
"I can't translate directly, but I can help you build a translation workflow! "
"Would you like to create one using an LLM node?"
),
"zh": "我不能直接翻译但我可以帮你构建一个翻译工作流要创建一个使用LLM节点的翻译流程吗",
},
"general_coding": {
"en": (
"I'm specialized in Dify workflow design rather than general coding help. "
"But if you want to add code logic to your workflow, I can help you configure a Code node!"
),
"zh": (
"我专注于Dify工作流设计而非通用编程帮助。但如果您想在工作流中添加代码逻辑我可以帮您配置一个代码节点"
),
},
"default": {
"en": (
"I'm the Dify workflow design assistant. I help create AI automation workflows, "
"but I can't help with general questions. Would you like to create a workflow instead?"
),
"zh": "我是Dify工作流设计助手。我帮助创建AI自动化工作流但无法回答一般性问题。您想创建一个工作流吗",
},
}
# Default suggestions for off-topic requests
# These help guide users towards valid workflow requests
DEFAULT_SUGGESTIONS: dict[str, list[str]] = {
"en": [
"Create a chatbot workflow",
"Build a document summarization pipeline",
"Add email notification to workflow",
],
"zh": [
"创建一个聊天机器人工作流",
"构建文档摘要处理流程",
"添加邮件通知到工作流",
],
}

View File

@@ -0,0 +1,733 @@
# =============================================================================
# NEW FORMAT: depends_on based prompt (for use with GraphBuilder)
# =============================================================================
BUILDER_SYSTEM_PROMPT_V2 = """<role>
You are a Workflow Configuration Engineer.
Your goal is to generate workflow node configurations with dependency declarations.
The graph structure (edges, start/end nodes) will be automatically built from your output.
</role>
<language_rules>
- Detect the language of the user's request automatically (e.g., English, Chinese, Japanese, etc.).
- Generate ALL node titles, descriptions, and user-facing text in the SAME language as the user's input.
- If the input language is ambiguous or cannot be determined (e.g. code-only input),
use {preferred_language} as the target language.
</language_rules>
<inputs>
<plan>
{plan_context}
</plan>
<tool_schemas>
{tool_schemas}
</tool_schemas>
<node_specs>
{builtin_node_specs}
</node_specs>
<available_models>
{available_models}
</available_models>
<workflow_context>
<existing_nodes>
{existing_nodes_context}
</existing_nodes>
<selected_nodes>
{selected_nodes_context}
</selected_nodes>
</workflow_context>
</inputs>
<critical_rules>
1. **DO NOT generate start or end nodes** - they are automatically added
2. **DO NOT generate edges** - they are automatically built from depends_on
3. **Use depends_on array** to declare which nodes must run before this one
4. **Leave depends_on empty []** for nodes that should start immediately (connect to start)
</critical_rules>
<rules>
1. **Configuration**:
- You MUST fill ALL required parameters for every node.
- Use `{{{{#node_id.field#}}}}` syntax to reference outputs from previous nodes in text fields.
2. **Dependency Declaration**:
- Each node has a `depends_on` array listing node IDs that must complete before it runs
- Empty depends_on `[]` means the node runs immediately after start
- Example: `"depends_on": ["fetch_data"]` means this node waits for fetch_data to complete
3. **Variable References**:
- For text fields (like prompts, queries): use string format `{{{{#node_id.field#}}}}`
- Dependencies will be auto-inferred from variable references if not explicitly declared
4. **Tools**:
- ONLY use the tools listed in `<tool_schemas>`.
- If a planned tool is missing from schemas, fallback to `http-request` or `code`.
5. **Model Selection** (CRITICAL):
- For LLM, question-classifier, and parameter-extractor nodes, you MUST include a "model" config.
- You MUST use ONLY models from the `<available_models>` section above.
- Copy the EXACT provider and name values from available_models.
- NEVER use openai/gpt-4o, gpt-3.5-turbo, gpt-4, or any other models unless they appear in available_models.
- If available_models is empty or shows "No models configured", omit the model config entirely.
6. **if-else Branching**:
- Add `true_branch` and `false_branch` in config to specify target node IDs
- Example: `"config": {{"cases": [...], "true_branch": "success_node", "false_branch": "fallback_node"}}`
7. **question-classifier Branching**:
- Add `target` field to each class in the classes array
- Example: `"classes": [{{"id": "tech", "name": "Tech", "target": "tech_handler"}}, ...]`
8. **Node Specifics**:
- For `if-else` comparison_operator, use literal symbols: `≥`, `≤`, `=`, `≠` (NOT `>=` or `==`).
</rules>
<output_format>
Return ONLY a JSON object with a `nodes` array. Each node has:
- id: unique identifier
- type: node type
- title: display name
- config: node configuration
- depends_on: array of node IDs this depends on
```json
{{{{
"nodes": [
{{{{
"id": "fetch_data",
"type": "http-request",
"title": "Fetch Data",
"config": {{"url": "{{{{#start.url#}}}}", "method": "GET"}},
"depends_on": []
}}}},
{{{{
"id": "analyze",
"type": "llm",
"title": "Analyze",
"config": {{"prompt_template": [{{"role": "user", "text": "Analyze: {{{{#fetch_data.body#}}}}"}}]}},
"depends_on": ["fetch_data"]
}}}}
]
}}}}
```
</output_format>
<examples>
<example name="simple_linear">
```json
{{{{
"nodes": [
{{{{
"id": "llm",
"type": "llm",
"title": "Generate Response",
"config": {{{{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Answer: {{{{#start.query#}}}}"}}]
}}}},
"depends_on": []
}}}}
]
}}}}
```
</example>
<example name="parallel_then_merge">
```json
{{{{
"nodes": [
{{{{
"id": "api1",
"type": "http-request",
"title": "Fetch API 1",
"config": {{"url": "https://api1.example.com", "method": "GET"}},
"depends_on": []
}}}},
{{{{
"id": "api2",
"type": "http-request",
"title": "Fetch API 2",
"config": {{"url": "https://api2.example.com", "method": "GET"}},
"depends_on": []
}}}},
{{{{
"id": "merge",
"type": "llm",
"title": "Merge Results",
"config": {{{{
"prompt_template": [{{"role": "user", "text": "Combine: {{{{#api1.body#}}}} and {{{{#api2.body#}}}}"}}]
}}}},
"depends_on": ["api1", "api2"]
}}}}
]
}}}}
```
</example>
<example name="if_else_branching">
```json
{{{{
"nodes": [
{{{{
"id": "check",
"type": "if-else",
"title": "Check Condition",
"config": {{{{
"cases": [{{{{
"case_id": "case_1",
"logical_operator": "and",
"conditions": [{{{{
"variable_selector": ["start", "score"],
"comparison_operator": "",
"value": "60"
}}}}]
}}}}],
"true_branch": "pass_handler",
"false_branch": "fail_handler"
}}}},
"depends_on": []
}}}},
{{{{
"id": "pass_handler",
"type": "llm",
"title": "Pass Response",
"config": {{"prompt_template": [{{"role": "user", "text": "Congratulations!"}}]}},
"depends_on": []
}}}},
{{{{
"id": "fail_handler",
"type": "llm",
"title": "Fail Response",
"config": {{"prompt_template": [{{"role": "user", "text": "Try again."}}]}},
"depends_on": []
}}}}
]
}}}}
```
Note: pass_handler and fail_handler have empty depends_on because their connections come from if-else branches.
</example>
<example name="question_classifier">
```json
{{{{
"nodes": [
{{{{
"id": "classifier",
"type": "question-classifier",
"title": "Classify Intent",
"config": {{{{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"query_variable_selector": ["start", "user_input"],
"classes": [
{{"id": "tech", "name": "Technical", "target": "tech_handler"}},
{{"id": "billing", "name": "Billing", "target": "billing_handler"}},
{{"id": "other", "name": "Other", "target": "other_handler"}}
]
}}}},
"depends_on": []
}}}},
{{{{
"id": "tech_handler",
"type": "llm",
"title": "Tech Support",
"config": {{"prompt_template": [{{"role": "user", "text": "Help with tech: {{{{#start.user_input#}}}}"}}]}},
"depends_on": []
}}}},
{{{{
"id": "billing_handler",
"type": "llm",
"title": "Billing Support",
"config": {{"prompt_template": [{{"role": "user", "text": "Help with billing: {{{{#start.user_input#}}}}"}}]}},
"depends_on": []
}}}},
{{{{
"id": "other_handler",
"type": "llm",
"title": "General Support",
"config": {{"prompt_template": [{{"role": "user", "text": "General help: {{{{#start.user_input#}}}}"}}]}},
"depends_on": []
}}}}
]
}}}}
```
Note: Handler nodes have empty depends_on because their connections come from classifier branches.
</example>
</examples>
"""
BUILDER_USER_PROMPT_V2 = """<instruction>
{instruction}
</instruction>
Generate the workflow nodes configuration. Remember:
1. Do NOT generate start or end nodes
2. Do NOT generate edges - use depends_on instead
3. For if-else: add true_branch/false_branch in config
4. For question-classifier: add target to each class
"""
# =============================================================================
# LEGACY FORMAT: edges-based prompt (backward compatible)
# =============================================================================
BUILDER_SYSTEM_PROMPT = """<role>
You are a Workflow Configuration Engineer.
Your goal is to implement the Architect's plan by generating a precise, runnable Dify Workflow JSON configuration.
</role>
<language_rules>
- Detect the language of the user's request automatically (e.g., English, Chinese, Japanese, etc.).
- Generate ALL node titles, descriptions, and user-facing text in the SAME language as the user's input.
- If the input language is ambiguous or cannot be determined (e.g. code-only input),
use {preferred_language} as the target language.
</language_rules>
<inputs>
<plan>
{plan_context}
</plan>
<tool_schemas>
{tool_schemas}
</tool_schemas>
<node_specs>
{builtin_node_specs}
</node_specs>
<available_models>
{available_models}
</available_models>
<workflow_context>
<existing_nodes>
{existing_nodes_context}
</existing_nodes>
<existing_edges>
{existing_edges_context}
</existing_edges>
<selected_nodes>
{selected_nodes_context}
</selected_nodes>
</workflow_context>
</inputs>
<rules>
1. **Configuration**:
- You MUST fill ALL required parameters for every node.
- Use `{{{{#node_id.field#}}}}` syntax to reference outputs from previous nodes in text fields.
- For 'start' node, define all necessary user inputs.
2. **Variable References**:
- For text fields (like prompts, queries): use string format `{{{{#node_id.field#}}}}`
- For 'end' node outputs: use `value_selector` array format `["node_id", "field"]`
- Example: to reference 'llm' node's 'text' output in end node, use `["llm", "text"]`
3. **Tools**:
- ONLY use the tools listed in `<tool_schemas>`.
- If a planned tool is missing from schemas, fallback to `http-request` or `code`.
4. **Model Selection** (CRITICAL):
- For LLM, question-classifier, and parameter-extractor nodes, you MUST include a "model" config.
- You MUST use ONLY models from the `<available_models>` section above.
- Copy the EXACT provider and name values from available_models.
- NEVER use openai/gpt-4o, gpt-3.5-turbo, gpt-4, or any other models unless they appear in available_models.
- If available_models is empty or shows "No models configured", omit the model config entirely.
5. **Node Specifics**:
- For `if-else` comparison_operator, use literal symbols: `≥`, `≤`, `=`, `≠` (NOT `>=` or `==`).
6. **Modification Mode**:
- If `<existing_nodes>` contains nodes, you are MODIFYING an existing workflow.
- Keep nodes that are NOT mentioned in the user's instruction UNCHANGED.
- Only modify/add/remove nodes that the user explicitly requested.
- Preserve node IDs for unchanged nodes to maintain connections.
- If user says "add X", append new nodes to existing workflow.
- If user says "change Y to Z", only modify that specific node.
- If user says "remove X", exclude that node from output.
**Edge Modification**:
- Use `<existing_edges>` to understand current node connections.
- If user mentions "fix edge", "connect", "link", or "add connection",
review existing_edges and correct missing/wrong connections.
- For multi-branch nodes (if-else, question-classifier),
ensure EACH branch has proper sourceHandle (e.g., "true"/"false") and target.
- Common edge issues to fix:
* Missing edge: Two nodes should connect but don't - add the edge
* Wrong target: Edge points to wrong node - update the target
* Missing sourceHandle: if-else/classifier branches lack sourceHandle - add "true"/"false"
* Disconnected nodes: Node has no incoming or outgoing edges - connect it properly
- When modifying edges, ensure logical flow makes sense (start → middle → end).
- ALWAYS output complete edges array, even if only modifying one edge.
**Validation Feedback** (Automatic Retry):
- If `<validation_feedback>` is present, you are RETRYING after validation errors.
- Focus ONLY on fixing the specific validation issues mentioned.
- Keep everything else from the previous attempt UNCHANGED (preserve node IDs, edges, etc).
- Common validation issues and fixes:
* "Missing required connection" → Add the missing edge
* "Invalid node configuration" → Fix the specific node's config section
* "Type mismatch in variable reference" → Correct the variable selector path
* "Unknown variable" → Update variable reference to existing output
- When fixing, make MINIMAL changes to address each specific error.
7. **Output**:
- Return ONLY the JSON object with `nodes` and `edges`.
- Do NOT generate Mermaid diagrams.
- Do NOT generate explanations.
</rules>
<edge_rules priority="critical">
**EDGES ARE CRITICAL** - Every node except 'end' MUST have at least one outgoing edge.
1. **Linear Flow**: Simple source -> target connection
```
{{"source": "node_a", "target": "node_b"}}
```
2. **question-classifier Branching**: Each class MUST have a separate edge with `sourceHandle` = class `id`
- If you define classes: [{{"id": "cls_refund", "name": "Refund"}}, {{"id": "cls_inquiry", "name": "Inquiry"}}]
- You MUST create edges:
- {{"source": "classifier", "sourceHandle": "cls_refund", "target": "refund_handler"}}
- {{"source": "classifier", "sourceHandle": "cls_inquiry", "target": "inquiry_handler"}}
3. **if-else Branching**: MUST have exactly TWO edges with sourceHandle "true" and "false"
- {{"source": "condition", "sourceHandle": "true", "target": "true_branch"}}
- {{"source": "condition", "sourceHandle": "false", "target": "false_branch"}}
4. **Branch Convergence**: Multiple branches can connect to same downstream node
- Both true_branch and false_branch can connect to the same 'end' node
5. **NEVER leave orphan nodes**: Every node must be connected in the graph
</edge_rules>
<examples>
<example name="simple_linear">
```json
{{
"nodes": [
{{
"id": "start",
"type": "start",
"title": "Start",
"config": {{
"variables": [{{"variable": "query", "label": "Query", "type": "text-input"}}]
}}
}},
{{
"id": "llm",
"type": "llm",
"title": "Generate Response",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Answer: {{{{#start.query#}}}}"}}]
}}
}},
{{
"id": "end",
"type": "end",
"title": "End",
"config": {{
"outputs": [
{{"variable": "result", "value_selector": ["llm", "text"]}}
]
}}
}}
],
"edges": [
{{"source": "start", "target": "llm"}},
{{"source": "llm", "target": "end"}}
]
}}
```
</example>
<example name="question_classifier_branching" description="Customer service with intent classification">
```json
{{
"nodes": [
{{
"id": "start",
"type": "start",
"title": "Start",
"config": {{
"variables": [{{"variable": "user_input", "label": "User Message", "type": "text-input", "required": true}}]
}}
}},
{{
"id": "classifier",
"type": "question-classifier",
"title": "Classify Intent",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"query_variable_selector": ["start", "user_input"],
"classes": [
{{"id": "cls_refund", "name": "Refund Request"}},
{{"id": "cls_inquiry", "name": "Product Inquiry"}},
{{"id": "cls_complaint", "name": "Complaint"}},
{{"id": "cls_other", "name": "Other"}}
],
"instruction": "Classify the user's intent"
}}
}},
{{
"id": "handle_refund",
"type": "llm",
"title": "Handle Refund",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Extract order number and respond: {{{{#start.user_input#}}}}"}}]
}}
}},
{{
"id": "handle_inquiry",
"type": "llm",
"title": "Handle Inquiry",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Answer product question: {{{{#start.user_input#}}}}"}}]
}}
}},
{{
"id": "handle_complaint",
"type": "llm",
"title": "Handle Complaint",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Respond with empathy: {{{{#start.user_input#}}}}"}}]
}}
}},
{{
"id": "handle_other",
"type": "llm",
"title": "Handle Other",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Provide general response: {{{{#start.user_input#}}}}"}}]
}}
}},
{{
"id": "end",
"type": "end",
"title": "End",
"config": {{
"outputs": [{{"variable": "response", "value_selector": ["handle_refund", "text"]}}]
}}
}}
],
"edges": [
{{"source": "start", "target": "classifier"}},
{{"source": "classifier", "sourceHandle": "cls_refund", "target": "handle_refund"}},
{{"source": "classifier", "sourceHandle": "cls_inquiry", "target": "handle_inquiry"}},
{{"source": "classifier", "sourceHandle": "cls_complaint", "target": "handle_complaint"}},
{{"source": "classifier", "sourceHandle": "cls_other", "target": "handle_other"}},
{{"source": "handle_refund", "target": "end"}},
{{"source": "handle_inquiry", "target": "end"}},
{{"source": "handle_complaint", "target": "end"}},
{{"source": "handle_other", "target": "end"}}
]
}}
```
CRITICAL: Notice that each class id (cls_refund, cls_inquiry, etc.) becomes a sourceHandle in the edges!
</example>
<example name="if_else_branching" description="Conditional logic with if-else">
```json
{{
"nodes": [
{{
"id": "start",
"type": "start",
"title": "Start",
"config": {{
"variables": [{{"variable": "years", "label": "Years of Experience", "type": "number", "required": true}}]
}}
}},
{{
"id": "check_experience",
"type": "if-else",
"title": "Check Experience",
"config": {{
"cases": [
{{
"case_id": "case_1",
"logical_operator": "and",
"conditions": [
{{
"variable_selector": ["start", "years"],
"comparison_operator": "",
"value": "3"
}}
]
}}
]
}}
}},
{{
"id": "qualified",
"type": "llm",
"title": "Qualified Response",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Generate qualified candidate response"}}]
}}
}},
{{
"id": "not_qualified",
"type": "llm",
"title": "Not Qualified Response",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Generate rejection response"}}]
}}
}},
{{
"id": "end",
"type": "end",
"title": "End",
"config": {{
"outputs": [{{"variable": "result", "value_selector": ["qualified", "text"]}}]
}}
}}
],
"edges": [
{{"source": "start", "target": "check_experience"}},
{{"source": "check_experience", "sourceHandle": "true", "target": "qualified"}},
{{"source": "check_experience", "sourceHandle": "false", "target": "not_qualified"}},
{{"source": "qualified", "target": "end"}},
{{"source": "not_qualified", "target": "end"}}
]
}}
```
CRITICAL: if-else MUST have exactly two edges with sourceHandle "true" and "false"!
</example>
<example name="parameter_extractor" description="Extract structured data from text">
```json
{{
"nodes": [
{{
"id": "start",
"type": "start",
"title": "Start",
"config": {{
"variables": [{{"variable": "resume", "label": "Resume Text", "type": "paragraph", "required": true}}]
}}
}},
{{
"id": "extract",
"type": "parameter-extractor",
"title": "Extract Info",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"query": ["start", "resume"],
"parameters": [
{{"name": "name", "type": "string", "description": "Candidate name", "required": true}},
{{"name": "years", "type": "number", "description": "Years of experience", "required": true}},
{{"name": "skills", "type": "array[string]", "description": "List of skills", "required": true}}
],
"instruction": "Extract candidate information from resume"
}}
}},
{{
"id": "process",
"type": "llm",
"title": "Process Data",
"config": {{
"model": {{"provider": "openai", "name": "gpt-4o", "mode": "chat"}},
"prompt_template": [{{"role": "user", "text": "Name: {{{{#extract.name#}}}}, Years: {{{{#extract.years#}}}}"}}]
}}
}},
{{
"id": "end",
"type": "end",
"title": "End",
"config": {{
"outputs": [{{"variable": "result", "value_selector": ["process", "text"]}}]
}}
}}
],
"edges": [
{{"source": "start", "target": "extract"}},
{{"source": "extract", "target": "process"}},
{{"source": "process", "target": "end"}}
]
}}
```
</example>
</examples>
<edge_checklist>
Before finalizing, verify:
1. [ ] Every node (except 'end') has at least one outgoing edge
2. [ ] 'start' node has exactly one outgoing edge
3. [ ] 'question-classifier' has one edge per class, each with sourceHandle = class id
4. [ ] 'if-else' has exactly two edges: sourceHandle "true" and sourceHandle "false"
5. [ ] All branches eventually connect to 'end' (directly or through other nodes)
6. [ ] No orphan nodes exist (every node is reachable from 'start')
</edge_checklist>
"""
BUILDER_USER_PROMPT = """<instruction>
{instruction}
</instruction>
Generate the full workflow configuration now. Pay special attention to:
1. Creating edges for ALL branches of question-classifier and if-else nodes
2. Using correct sourceHandle values for branching nodes
3. Ensuring every node is connected in the graph
"""
def format_existing_nodes(nodes: list[dict] | None) -> str:
"""Format existing workflow nodes for context."""
if not nodes:
return "No existing nodes in workflow (creating from scratch)."
lines = []
for node in nodes:
node_id = node.get("id", "unknown")
node_type = node.get("type", "unknown")
title = node.get("title", "Untitled")
lines.append(f"- [{node_id}] {title} ({node_type})")
return "\n".join(lines)
def format_selected_nodes(
selected_ids: list[str] | None,
existing_nodes: list[dict] | None,
) -> str:
"""Format selected nodes for modification context."""
if not selected_ids:
return "No nodes selected (generating new workflow)."
node_map = {n.get("id"): n for n in (existing_nodes or [])}
lines = []
for node_id in selected_ids:
if node_id in node_map:
node = node_map[node_id]
lines.append(f"- [{node_id}] {node.get('title', 'Untitled')} ({node.get('type', 'unknown')})")
else:
lines.append(f"- [{node_id}] (not found in current workflow)")
return "\n".join(lines)
def format_existing_edges(edges: list[dict] | None) -> str:
"""Format existing workflow edges to show connections."""
if not edges:
return "No existing edges (creating new workflow)."
lines = []
for edge in edges:
source = edge.get("source", "unknown")
target = edge.get("target", "unknown")
source_handle = edge.get("sourceHandle", "")
if source_handle:
lines.append(f"- {source} ({source_handle}) -> {target}")
else:
lines.append(f"- {source} -> {target}")
return "\n".join(lines)

View File

@@ -0,0 +1,75 @@
PLANNER_SYSTEM_PROMPT = """<role>
You are an expert Workflow Architect.
Your job is to analyze user requests and plan a high-level automation workflow.
</role>
<task>
1. **Classify Intent**:
- Is the user asking to create an automation/workflow? -> Intent: "generate"
- Is it general chat/weather/jokes? -> Intent: "off_topic"
2. **Plan Steps** (if intent is "generate"):
- Break down the user's goal into logical steps.
- For each step, identify if a specific capability/tool is needed.
- Select the MOST RELEVANT tools from the available_tools list.
- DO NOT configure parameters yet. Just identify the tool.
3. **Output Format**:
Return a JSON object.
</task>
<available_tools>
{tools_summary}
</available_tools>
<response_format>
If intent is "generate":
```json
{{
"intent": "generate",
"plan_thought": "Brief explanation of the plan...",
"steps": [
{{ "step": 1, "description": "Fetch data from URL", "tool": "http-request" }},
{{ "step": 2, "description": "Summarize content", "tool": "llm" }},
{{ "step": 3, "description": "Search for info", "tool": "google_search" }}
],
"required_tool_keys": ["google_search"]
}}
```
(Note: 'http-request', 'llm', 'code' are built-in, you don't need to list them in required_tool_keys,
only external tools)
If intent is "off_topic":
```json
{{
"intent": "off_topic",
"message": "I can only help you build workflows. Try asking me to 'Create a workflow that...'",
"suggestions": ["Scrape a website", "Summarize a PDF"]
}}
```
</response_format>
"""
PLANNER_USER_PROMPT = """<user_request>
{instruction}
</user_request>
"""
def format_tools_for_planner(tools: list[dict]) -> str:
"""Format tools list for planner (Lightweight: Name + Description only)."""
if not tools:
return "No external tools available."
lines = []
for t in tools:
key = t.get("tool_key") or t.get("tool_name")
provider = t.get("provider_id") or t.get("provider", "")
desc = t.get("tool_description") or t.get("description", "")
label = t.get("tool_label") or key
# Format: - [provider/key] Label: Description
full_key = f"{provider}/{key}" if provider else key
lines.append(f"- [{full_key}] {label}: {desc}")
return "\n".join(lines)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,349 @@
import json
import logging
import re
from collections.abc import Sequence
import json_repair
from core.model_manager import ModelManager
from core.model_runtime.entities.message_entities import SystemPromptMessage, UserPromptMessage
from core.model_runtime.entities.model_entities import ModelType
from core.workflow.generator.prompts.builder_prompts import (
BUILDER_SYSTEM_PROMPT,
BUILDER_SYSTEM_PROMPT_V2,
BUILDER_USER_PROMPT,
BUILDER_USER_PROMPT_V2,
format_existing_edges,
format_existing_nodes,
format_selected_nodes,
)
from core.workflow.generator.prompts.planner_prompts import (
PLANNER_SYSTEM_PROMPT,
PLANNER_USER_PROMPT,
format_tools_for_planner,
)
from core.workflow.generator.prompts.vibe_prompts import (
format_available_models,
format_available_nodes,
format_available_tools,
parse_vibe_response,
)
from core.workflow.generator.utils.graph_builder import CyclicDependencyError, GraphBuilder
from core.workflow.generator.utils.mermaid_generator import generate_mermaid
from core.workflow.generator.utils.workflow_validator import ValidationHint, WorkflowValidator
logger = logging.getLogger(__name__)
class WorkflowGenerator:
"""
Refactored Vibe Workflow Generator (Planner-Builder Architecture).
Extracts Vibe logic from the monolithic LLMGenerator.
"""
@classmethod
def generate_workflow_flowchart(
cls,
tenant_id: str,
instruction: str,
model_config: dict,
available_nodes: Sequence[dict[str, object]] | None = None,
existing_nodes: Sequence[dict[str, object]] | None = None,
existing_edges: Sequence[dict[str, object]] | None = None,
available_tools: Sequence[dict[str, object]] | None = None,
selected_node_ids: Sequence[str] | None = None,
previous_workflow: dict[str, object] | None = None,
regenerate_mode: bool = False,
preferred_language: str | None = None,
available_models: Sequence[dict[str, object]] | None = None,
use_graph_builder: bool = False,
):
"""
Generates a Dify Workflow Flowchart from natural language instruction.
Pipeline:
1. Planner: Analyze intent & select tools.
2. Context Filter: Filter relevant tools (reduce tokens).
3. Builder: Generate node configurations.
4. Repair: Fix common node/edge issues (NodeRepair, EdgeRepair).
5. Validator: Check for errors & generate friendly hints.
6. Renderer: Deterministic Mermaid generation.
"""
model_manager = ModelManager()
model_instance = model_manager.get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
model_parameters = model_config.get("completion_params", {})
available_tools_list = list(available_tools) if available_tools else []
# Check if this is modification mode (user is refining existing workflow)
has_existing_nodes = existing_nodes and len(list(existing_nodes)) > 0
# --- STEP 1: PLANNER (Skip in modification mode) ---
if has_existing_nodes:
# In modification mode, skip Planner:
# - User intent is clear: modify the existing workflow
# - Tools are already in use (from existing nodes)
# - No need for intent classification or tool selection
plan_data = {"intent": "generate", "steps": [], "required_tool_keys": []}
filtered_tools = available_tools_list # Use all available tools
else:
# In creation mode, run Planner to validate intent and select tools
planner_tools_context = format_tools_for_planner(available_tools_list)
planner_system = PLANNER_SYSTEM_PROMPT.format(tools_summary=planner_tools_context)
planner_user = PLANNER_USER_PROMPT.format(instruction=instruction)
try:
response = model_instance.invoke_llm(
prompt_messages=[
SystemPromptMessage(content=planner_system),
UserPromptMessage(content=planner_user),
],
model_parameters=model_parameters,
stream=False,
)
plan_content = response.message.content
# Reuse parse_vibe_response logic or simple load
plan_data = parse_vibe_response(plan_content)
except Exception as e:
logger.exception("Planner failed")
return {"intent": "error", "error": f"Planning failed: {str(e)}"}
if plan_data.get("intent") == "off_topic":
return {
"intent": "off_topic",
"message": plan_data.get("message", "I can only help with workflow creation."),
"suggestions": plan_data.get("suggestions", []),
}
# --- STEP 2: CONTEXT FILTERING ---
required_tools = plan_data.get("required_tool_keys", [])
filtered_tools = []
if required_tools:
# Simple linear search (optimized version would use a map)
for tool in available_tools_list:
t_key = tool.get("tool_key") or tool.get("tool_name")
provider = tool.get("provider_id") or tool.get("provider")
full_key = f"{provider}/{t_key}" if provider else t_key
# Check if this tool is in required list (match either full key or short name)
if t_key in required_tools or full_key in required_tools:
filtered_tools.append(tool)
else:
# If logic only, no tools needed
filtered_tools = []
# --- STEP 3: BUILDER (with retry loop) ---
MAX_GLOBAL_RETRIES = 2 # Total attempts: 1 initial + 1 retry
workflow_data = None
mermaid_code = None
all_warnings = []
all_fixes = []
retry_count = 0
validation_hints = []
for attempt in range(MAX_GLOBAL_RETRIES):
retry_count = attempt
logger.info("Generation attempt %s/%s", attempt + 1, MAX_GLOBAL_RETRIES)
# Prepare context
tool_schemas = format_available_tools(filtered_tools)
node_specs = format_available_nodes(list(available_nodes) if available_nodes else [])
existing_nodes_context = format_existing_nodes(list(existing_nodes) if existing_nodes else None)
existing_edges_context = format_existing_edges(list(existing_edges) if existing_edges else None)
selected_nodes_context = format_selected_nodes(
list(selected_node_ids) if selected_node_ids else None, list(existing_nodes) if existing_nodes else None
)
# Build retry context
retry_context = ""
# NOTE: Manual regeneration/refinement mode removed
# Only handle automatic retry (validation errors)
# For automatic retry (validation errors)
if attempt > 0 and validation_hints:
severe_issues = [h for h in validation_hints if h.severity == "error"]
if severe_issues:
retry_context = "\n<validation_feedback>\n"
retry_context += "The previous generation had validation errors:\n"
for idx, hint in enumerate(severe_issues[:5], 1):
retry_context += f"{idx}. {hint.message}\n"
retry_context += "\nPlease fix these specific issues while keeping everything else UNCHANGED.\n"
retry_context += "</validation_feedback>\n"
# Select prompt version based on use_graph_builder flag
if use_graph_builder:
builder_system = BUILDER_SYSTEM_PROMPT_V2.format(
plan_context=json.dumps(plan_data.get("steps", []), indent=2),
tool_schemas=tool_schemas,
builtin_node_specs=node_specs,
available_models=format_available_models(list(available_models or [])),
preferred_language=preferred_language or "English",
existing_nodes_context=existing_nodes_context,
selected_nodes_context=selected_nodes_context,
)
builder_user = BUILDER_USER_PROMPT_V2.format(instruction=instruction) + retry_context
else:
builder_system = BUILDER_SYSTEM_PROMPT.format(
plan_context=json.dumps(plan_data.get("steps", []), indent=2),
tool_schemas=tool_schemas,
builtin_node_specs=node_specs,
available_models=format_available_models(list(available_models or [])),
preferred_language=preferred_language or "English",
existing_nodes_context=existing_nodes_context,
existing_edges_context=existing_edges_context,
selected_nodes_context=selected_nodes_context,
)
builder_user = BUILDER_USER_PROMPT.format(instruction=instruction) + retry_context
try:
build_res = model_instance.invoke_llm(
prompt_messages=[
SystemPromptMessage(content=builder_system),
UserPromptMessage(content=builder_user),
],
model_parameters=model_parameters,
stream=False,
)
# Builder output is raw JSON nodes/edges
build_content = build_res.message.content
match = re.search(r"```(?:json)?\s*([\s\S]+?)```", build_content)
if match:
build_content = match.group(1)
workflow_data = json_repair.loads(build_content)
if "nodes" not in workflow_data:
workflow_data["nodes"] = []
# --- GraphBuilder Mode: Build graph from depends_on ---
if use_graph_builder:
try:
# Extract nodes from LLM output (without start/end)
llm_nodes = workflow_data.get("nodes", [])
# Build complete graph with start/end and edges
complete_nodes, edges = GraphBuilder.build_graph(llm_nodes)
workflow_data["nodes"] = complete_nodes
workflow_data["edges"] = edges
logger.info(
"GraphBuilder: built %d nodes, %d edges from %d LLM nodes",
len(complete_nodes),
len(edges),
len(llm_nodes),
)
except CyclicDependencyError as e:
logger.warning("GraphBuilder: cyclic dependency detected: %s", e)
# Add to validation hints for retry
validation_hints.append(
ValidationHint(
node_id="",
field="depends_on",
message=f"Cyclic dependency detected: {e}. Please fix the dependency chain.",
severity="error",
)
)
if attempt == MAX_GLOBAL_RETRIES - 1:
return {
"intent": "error",
"error": "Failed to build workflow: cyclic dependency detected.",
}
continue # Retry with error feedback
except Exception as e:
logger.exception("GraphBuilder failed on attempt %d", attempt + 1)
if attempt == MAX_GLOBAL_RETRIES - 1:
return {"intent": "error", "error": f"Graph building failed: {str(e)}"}
continue
else:
# Legacy mode: edges from LLM output
if "edges" not in workflow_data:
workflow_data["edges"] = []
except Exception as e:
logger.exception("Builder failed on attempt %d", attempt + 1)
if attempt == MAX_GLOBAL_RETRIES - 1:
return {"intent": "error", "error": f"Building failed: {str(e)}"}
continue # Try again
# NOTE: NodeRepair and EdgeRepair have been removed.
# Validation will detect structural issues, and LLM will fix them on retry.
# This is more accurate because LLM understands the workflow context.
# --- STEP 4: RENDERER (Generate Mermaid early for validation) ---
mermaid_code = generate_mermaid(workflow_data)
# --- STEP 5: VALIDATOR ---
is_valid, validation_hints = WorkflowValidator.validate(workflow_data, available_tools_list)
# --- STEP 6: GRAPH VALIDATION (structural checks using graph algorithms) ---
if attempt < MAX_GLOBAL_RETRIES - 1:
try:
from core.workflow.generator.utils.graph_validator import GraphValidator
graph_result = GraphValidator.validate(workflow_data)
if not graph_result.success:
# Convert graph errors to validation hints
for graph_error in graph_result.errors:
validation_hints.append(
ValidationHint(
node_id=graph_error.node_id,
field="edges",
message=f"[Graph] {graph_error.message}",
severity="error",
)
)
# Also add warnings (dead ends) as hints
for graph_warning in graph_result.warnings:
validation_hints.append(
ValidationHint(
node_id=graph_warning.node_id,
field="edges",
message=f"[Graph] {graph_warning.message}",
severity="warning",
)
)
except Exception as e:
logger.warning("Graph validation error: %s", e)
# Collect all validation warnings
all_warnings = [h.message for h in validation_hints]
# Check if we should retry
severe_issues = [h for h in validation_hints if h.severity == "error"]
if not severe_issues or attempt == MAX_GLOBAL_RETRIES - 1:
break
# Has severe errors and retries remaining - continue to next attempt
# Collect all validation warnings
all_warnings = [h.message for h in validation_hints]
# Add stability warning (as requested by user)
stability_warning = "The generated workflow may require debugging."
if preferred_language and preferred_language.startswith("zh"):
stability_warning = "生成的 Workflow 可能需要调试。"
all_warnings.append(stability_warning)
return {
"intent": "generate",
"flowchart": mermaid_code,
"nodes": workflow_data["nodes"],
"edges": workflow_data["edges"],
"message": plan_data.get("plan_thought", "Generated workflow based on your request."),
"warnings": all_warnings,
"tool_recommendations": [], # Legacy field
"error": "",
"fixed_issues": all_fixes, # Track what was auto-fixed
"retry_count": retry_count, # Track how many retries were needed
}

View File

@@ -0,0 +1,217 @@
"""
Type definitions for Vibe Workflow Generator.
This module provides:
- TypedDict classes for lightweight type hints (no runtime overhead)
- Pydantic models for runtime validation where needed
Usage:
# For type hints only (no runtime validation):
from core.workflow.generator.types import WorkflowNodeDict, WorkflowEdgeDict
# For runtime validation:
from core.workflow.generator.types import WorkflowNode, WorkflowEdge
"""
from typing import Any, TypedDict
from pydantic import BaseModel, Field
# ============================================================
# TypedDict definitions (lightweight, for type hints only)
# ============================================================
class WorkflowNodeDict(TypedDict, total=False):
"""
Workflow node structure (TypedDict for hints).
Attributes:
id: Unique node identifier
type: Node type (e.g., "start", "end", "llm", "if-else", "http-request")
title: Human-readable node title
config: Node-specific configuration
data: Additional node data
"""
id: str
type: str
title: str
config: dict[str, Any]
data: dict[str, Any]
class WorkflowEdgeDict(TypedDict, total=False):
"""
Workflow edge structure (TypedDict for hints).
Attributes:
source: Source node ID
target: Target node ID
sourceHandle: Branch handle for if-else/question-classifier nodes
"""
source: str
target: str
sourceHandle: str
class AvailableModelDict(TypedDict):
"""
Available model structure.
Attributes:
provider: Model provider (e.g., "openai", "anthropic")
model: Model name (e.g., "gpt-4", "claude-3")
"""
provider: str
model: str
class ToolParameterDict(TypedDict, total=False):
"""
Tool parameter structure.
Attributes:
name: Parameter name
type: Parameter type (e.g., "string", "number", "boolean")
required: Whether parameter is required
human_description: Human-readable description
llm_description: LLM-oriented description
options: Available options for enum-type parameters
"""
name: str
type: str
required: bool
human_description: str | dict[str, str]
llm_description: str
options: list[Any]
class AvailableToolDict(TypedDict, total=False):
"""
Available tool structure.
Attributes:
provider_id: Tool provider ID
provider: Tool provider name (alternative to provider_id)
tool_key: Unique tool key
tool_name: Tool name (alternative to tool_key)
tool_description: Tool description
description: Alternative description field
is_team_authorization: Whether tool is configured/authorized
parameters: List of tool parameters
"""
provider_id: str
provider: str
tool_key: str
tool_name: str
tool_description: str
description: str
is_team_authorization: bool
parameters: list[ToolParameterDict]
class WorkflowDataDict(TypedDict, total=False):
"""
Complete workflow data structure.
Attributes:
nodes: List of workflow nodes
edges: List of workflow edges
warnings: List of warning messages
"""
nodes: list[WorkflowNodeDict]
edges: list[WorkflowEdgeDict]
warnings: list[str]
# ============================================================
# Pydantic models (for runtime validation)
# ============================================================
class WorkflowNode(BaseModel):
"""
Workflow node with runtime validation.
Use this model when you need to validate node data at runtime.
For lightweight type hints without validation, use WorkflowNodeDict.
"""
id: str
type: str
title: str = ""
config: dict[str, Any] = Field(default_factory=dict)
data: dict[str, Any] = Field(default_factory=dict)
class WorkflowEdge(BaseModel):
"""
Workflow edge with runtime validation.
Use this model when you need to validate edge data at runtime.
For lightweight type hints without validation, use WorkflowEdgeDict.
"""
source: str
target: str
sourceHandle: str | None = None
class AvailableModel(BaseModel):
"""
Available model with runtime validation.
Use this model when you need to validate model data at runtime.
For lightweight type hints without validation, use AvailableModelDict.
"""
provider: str
model: str
class ToolParameter(BaseModel):
"""Tool parameter with runtime validation."""
name: str = ""
type: str = "string"
required: bool = False
human_description: str | dict[str, str] = ""
llm_description: str = ""
options: list[Any] = Field(default_factory=list)
class AvailableTool(BaseModel):
"""
Available tool with runtime validation.
Use this model when you need to validate tool data at runtime.
For lightweight type hints without validation, use AvailableToolDict.
"""
provider_id: str = ""
provider: str = ""
tool_key: str = ""
tool_name: str = ""
tool_description: str = ""
description: str = ""
is_team_authorization: bool = False
parameters: list[ToolParameter] = Field(default_factory=list)
class WorkflowData(BaseModel):
"""
Complete workflow data with runtime validation.
Use this model when you need to validate workflow data at runtime.
For lightweight type hints without validation, use WorkflowDataDict.
"""
nodes: list[WorkflowNode] = Field(default_factory=list)
edges: list[WorkflowEdge] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list)

View File

@@ -0,0 +1,384 @@
"""
Edge Repair Utility for Vibe Workflow Generation.
This module provides intelligent edge repair capabilities for generated workflows.
It can detect and fix common edge issues:
- Missing edges between sequential nodes
- Incomplete branches for question-classifier and if-else nodes
- Orphaned nodes without connections
The repair logic is deterministic and doesn't require LLM calls.
"""
import logging
from dataclasses import dataclass, field
from core.workflow.generator.types import WorkflowDataDict, WorkflowEdgeDict, WorkflowNodeDict
logger = logging.getLogger(__name__)
@dataclass
class RepairResult:
"""Result of edge repair operation."""
nodes: list[WorkflowNodeDict]
edges: list[WorkflowEdgeDict]
repairs_made: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
@property
def was_repaired(self) -> bool:
"""Check if any repairs were made."""
return len(self.repairs_made) > 0
class EdgeRepair:
"""
Intelligent edge repair for workflow graphs.
Repairs are applied in order:
1. Infer linear connections from node order (if no edges exist)
2. Add missing branch edges for question-classifier
3. Add missing branch edges for if-else
4. Connect orphaned nodes
"""
@classmethod
def repair(cls, workflow_data: WorkflowDataDict) -> RepairResult:
"""
Repair edges in the workflow data.
Args:
workflow_data: Dict containing 'nodes' and 'edges'
Returns:
RepairResult with repaired nodes, edges, and repair logs
"""
nodes = list(workflow_data.get("nodes", []))
edges = list(workflow_data.get("edges", []))
repairs: list[str] = []
warnings: list[str] = []
logger.info("[EDGE REPAIR] Starting repair process for %s nodes, %s edges", len(nodes), len(edges))
# Build node lookup
# Build node lookup
node_map = {n.get("id"): n for n in nodes if n.get("id")}
node_ids = set(node_map.keys())
# 1. If no edges at all, infer linear chain
if not edges and len(nodes) > 1:
edges, inferred_repairs = cls._infer_linear_chain(nodes)
repairs.extend(inferred_repairs)
# 2. Build edge index for analysis
outgoing_edges: dict[str, list[WorkflowEdgeDict]] = {}
incoming_edges: dict[str, list[WorkflowEdgeDict]] = {}
for edge in edges:
src = edge.get("source")
tgt = edge.get("target")
if src:
outgoing_edges.setdefault(src, []).append(edge)
if tgt:
incoming_edges.setdefault(tgt, []).append(edge)
# 3. Repair question-classifier branches
for node in nodes:
if node.get("type") == "question-classifier":
new_edges, branch_repairs, branch_warnings = cls._repair_classifier_branches(
node, edges, outgoing_edges, node_ids
)
edges.extend(new_edges)
repairs.extend(branch_repairs)
warnings.extend(branch_warnings)
# Update outgoing index
for edge in new_edges:
outgoing_edges.setdefault(edge.get("source"), []).append(edge)
# 4. Repair if-else branches
for node in nodes:
if node.get("type") == "if-else":
new_edges, branch_repairs, branch_warnings = cls._repair_if_else_branches(
node, edges, outgoing_edges, node_ids
)
edges.extend(new_edges)
repairs.extend(branch_repairs)
warnings.extend(branch_warnings)
# Update outgoing index
for edge in new_edges:
outgoing_edges.setdefault(edge.get("source"), []).append(edge)
# 5. Connect orphaned nodes (nodes with no incoming edge, except start)
new_edges, orphan_repairs = cls._connect_orphaned_nodes(nodes, edges, outgoing_edges, incoming_edges)
edges.extend(new_edges)
repairs.extend(orphan_repairs)
# 6. Connect nodes with no outgoing edge to 'end' (except end nodes)
new_edges, terminal_repairs = cls._connect_terminal_nodes(nodes, edges, outgoing_edges)
edges.extend(new_edges)
repairs.extend(terminal_repairs)
if repairs:
logger.info("[EDGE REPAIR] Completed with %s repairs:", len(repairs))
for i, repair in enumerate(repairs, 1):
logger.info("[EDGE REPAIR] %s. %s", i, repair)
else:
logger.info("[EDGE REPAIR] Completed - no repairs needed")
return RepairResult(
nodes=nodes,
edges=edges,
repairs_made=repairs,
warnings=warnings,
)
@classmethod
def _infer_linear_chain(cls, nodes: list[WorkflowNodeDict]) -> tuple[list[WorkflowEdgeDict], list[str]]:
"""
Infer a linear chain of edges from node order.
This is used when no edges are provided at all.
"""
edges: list[WorkflowEdgeDict] = []
repairs: list[str] = []
# Filter to get ordered node IDs
node_ids = [n.get("id") for n in nodes if n.get("id")]
if len(node_ids) < 2:
return edges, repairs
# Create edges between consecutive nodes
for i in range(len(node_ids) - 1):
src = node_ids[i]
tgt = node_ids[i + 1]
edges.append({"source": src, "target": tgt})
repairs.append(f"Inferred edge: {src} -> {tgt}")
return edges, repairs
@classmethod
def _repair_classifier_branches(
cls,
node: WorkflowNodeDict,
edges: list[WorkflowEdgeDict],
outgoing_edges: dict[str, list[WorkflowEdgeDict]],
valid_node_ids: set[str],
) -> tuple[list[WorkflowEdgeDict], list[str], list[str]]:
"""
Repair missing branches for question-classifier nodes.
For each class that doesn't have an edge, create one pointing to 'end'.
"""
new_edges: list[WorkflowEdgeDict] = []
repairs: list[str] = []
warnings: list[str] = []
node_id = node.get("id")
if not node_id:
return new_edges, repairs, warnings
config = node.get("config", {})
classes = config.get("classes", [])
if not classes:
return new_edges, repairs, warnings
# Get existing sourceHandles for this node
existing_handles = set()
for edge in outgoing_edges.get(node_id, []):
handle = edge.get("sourceHandle")
if handle:
existing_handles.add(handle)
# Find 'end' node as default target
end_node_id = "end"
if "end" not in valid_node_ids:
# Try to find an end node
for nid in valid_node_ids:
if "end" in nid.lower():
end_node_id = nid
break
# Add missing branches
for cls_def in classes:
if not isinstance(cls_def, dict):
continue
cls_id = cls_def.get("id")
cls_name = cls_def.get("name", cls_id)
if cls_id and cls_id not in existing_handles:
new_edge = {
"source": node_id,
"sourceHandle": cls_id,
"target": end_node_id,
}
new_edges.append(new_edge)
repairs.append(f"Added missing branch edge for class '{cls_name}' -> {end_node_id}")
warnings.append(
f"Auto-connected question-classifier branch '{cls_name}' to '{end_node_id}'. "
"You may want to redirect this to a specific handler node."
)
return new_edges, repairs, warnings
@classmethod
def _repair_if_else_branches(
cls,
node: WorkflowNodeDict,
edges: list[WorkflowEdgeDict],
outgoing_edges: dict[str, list[WorkflowEdgeDict]],
valid_node_ids: set[str],
) -> tuple[list[WorkflowEdgeDict], list[str], list[str]]:
"""
Repair missing branches for if-else nodes.
If-else in Dify uses case_id as sourceHandle for each condition,
plus 'false' for the else branch.
"""
new_edges: list[WorkflowEdgeDict] = []
repairs: list[str] = []
warnings: list[str] = []
node_id = node.get("id")
if not node_id:
return new_edges, repairs, warnings
# Get existing sourceHandles
existing_handles = set()
for edge in outgoing_edges.get(node_id, []):
handle = edge.get("sourceHandle")
if handle:
existing_handles.add(handle)
# Find 'end' node as default target
end_node_id = "end"
if "end" not in valid_node_ids:
for nid in valid_node_ids:
if "end" in nid.lower():
end_node_id = nid
break
# Get required branches from config
config = node.get("config", {})
cases = config.get("cases", [])
# Build required handles: each case_id + 'false' for else
required_branches = set()
for case in cases:
case_id = case.get("case_id")
if case_id:
required_branches.add(case_id)
required_branches.add("false") # else branch
# Add missing branches
for branch in required_branches:
if branch not in existing_handles:
new_edge = {
"source": node_id,
"sourceHandle": branch,
"target": end_node_id,
}
new_edges.append(new_edge)
repairs.append(f"Added missing if-else branch '{branch}' -> {end_node_id}")
warnings.append(
f"Auto-connected if-else branch '{branch}' to '{end_node_id}'. "
"You may want to redirect this to a specific handler node."
)
return new_edges, repairs, warnings
@classmethod
def _connect_orphaned_nodes(
cls,
nodes: list[WorkflowNodeDict],
edges: list[WorkflowEdgeDict],
outgoing_edges: dict[str, list[WorkflowEdgeDict]],
incoming_edges: dict[str, list[WorkflowEdgeDict]],
) -> tuple[list[WorkflowEdgeDict], list[str]]:
"""
Connect orphaned nodes to the previous node in sequence.
An orphaned node has no incoming edges and is not a 'start' node.
"""
new_edges: list[WorkflowEdgeDict] = []
repairs: list[str] = []
node_ids = [n.get("id") for n in nodes if n.get("id")]
node_types = {n.get("id"): n.get("type") for n in nodes}
for i, node_id in enumerate(node_ids):
node_type = node_types.get(node_id)
# Skip start nodes - they don't need incoming edges
if node_type == "start":
continue
# Check if node has incoming edges
if node_id not in incoming_edges or not incoming_edges[node_id]:
# Find previous node to connect from
if i > 0:
prev_node_id = node_ids[i - 1]
new_edge = {"source": prev_node_id, "target": node_id}
new_edges.append(new_edge)
repairs.append(f"Connected orphaned node: {prev_node_id} -> {node_id}")
# Update incoming_edges for subsequent checks
incoming_edges.setdefault(node_id, []).append(new_edge)
return new_edges, repairs
@classmethod
def _connect_terminal_nodes(
cls,
nodes: list[WorkflowNodeDict],
edges: list[WorkflowEdgeDict],
outgoing_edges: dict[str, list[WorkflowEdgeDict]],
) -> tuple[list[WorkflowEdgeDict], list[str]]:
"""
Connect terminal nodes (no outgoing edges) to 'end'.
A terminal node has no outgoing edges and is not an 'end' node.
This ensures all branches eventually reach 'end'.
"""
new_edges: list[WorkflowEdgeDict] = []
repairs: list[str] = []
# Find end node
end_node_id = None
node_ids = set()
for n in nodes:
nid = n.get("id")
ntype = n.get("type")
if nid:
node_ids.add(nid)
if ntype == "end":
end_node_id = nid
if not end_node_id:
# No end node found, can't connect
return new_edges, repairs
for node in nodes:
node_id = node.get("id")
node_type = node.get("type")
# Skip end nodes
if node_type == "end":
continue
# Skip nodes that already have outgoing edges
if outgoing_edges.get(node_id):
continue
# Connect to end
new_edge = {"source": node_id, "target": end_node_id}
new_edges.append(new_edge)
repairs.append(f"Connected terminal node to end: {node_id} -> {end_node_id}")
# Update for subsequent checks
outgoing_edges.setdefault(node_id, []).append(new_edge)
return new_edges, repairs

View File

@@ -0,0 +1,621 @@
"""
GraphBuilder: Automatic workflow graph construction from node list.
This module implements the core logic for building complete workflow graphs
from LLM-generated node lists with dependency declarations.
Key features:
- Automatic start/end node generation
- Dependency inference from variable references
- Topological sorting with cycle detection
- Special handling for branching nodes (if-else, question-classifier)
- Silent error recovery where possible
"""
import json
import logging
import re
import uuid
from collections import defaultdict
from typing import Any
logger = logging.getLogger(__name__)
# Pattern to match variable references like {{#node_id.field#}}
VAR_PATTERN = re.compile(r"\{\{#([^.#]+)\.[^#]+#\}\}")
# System variable prefixes to exclude from dependency inference
SYSTEM_VAR_PREFIXES = {"sys", "start", "env"}
# Node types that have special branching behavior
BRANCHING_NODE_TYPES = {"if-else", "question-classifier"}
# Container node types (iteration, loop) - these have internal subgraphs
# but behave as single-input-single-output nodes in the external graph
CONTAINER_NODE_TYPES = {"iteration", "loop"}
class GraphBuildError(Exception):
"""Raised when graph cannot be built due to unrecoverable errors."""
pass
class CyclicDependencyError(GraphBuildError):
"""Raised when cyclic dependencies are detected."""
pass
class GraphBuilder:
"""
Builds complete workflow graphs from LLM-generated node lists.
This class handles the conversion from a simplified node list format
(with depends_on declarations) to a full workflow graph with nodes and edges.
The LLM only needs to generate:
- Node configurations with depends_on arrays
- Branch targets in config for branching nodes
The GraphBuilder automatically:
- Adds start and end nodes
- Generates all edges from dependencies
- Infers implicit dependencies from variable references
- Handles branching nodes (if-else, question-classifier)
- Validates graph structure (no cycles, proper connectivity)
"""
@classmethod
def build_graph(
cls,
nodes: list[dict[str, Any]],
start_config: dict[str, Any] | None = None,
end_config: dict[str, Any] | None = None,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""
Build a complete workflow graph from a node list.
Args:
nodes: LLM-generated nodes (without start/end)
start_config: Optional configuration for start node
end_config: Optional configuration for end node
Returns:
Tuple of (complete_nodes, edges) where:
- complete_nodes includes start, user nodes, and end
- edges contains all connections
Raises:
CyclicDependencyError: If cyclic dependencies are detected
GraphBuildError: If graph cannot be built
"""
if not nodes:
# Empty node list - create minimal workflow
start_node = cls._create_start_node([], start_config)
end_node = cls._create_end_node([], end_config)
edge = cls._create_edge("start", "end")
return [start_node, end_node], [edge]
# Build node index for quick lookup
node_map = {node["id"]: node for node in nodes}
# Step 1: Extract explicit dependencies from depends_on
dependencies = cls._extract_explicit_dependencies(nodes)
# Step 2: Infer implicit dependencies from variable references
dependencies = cls._infer_dependencies_from_variables(nodes, dependencies, node_map)
# Step 3: Validate and fix dependencies (remove invalid references)
dependencies = cls._validate_dependencies(dependencies, node_map)
# Step 4: Topological sort (detects cycles)
sorted_node_ids = cls._topological_sort(nodes, dependencies)
# Step 5: Generate start node
start_node = cls._create_start_node(nodes, start_config)
# Step 6: Generate edges
edges = cls._generate_edges(nodes, sorted_node_ids, dependencies, node_map)
# Step 7: Find terminal nodes and generate end node
terminal_nodes = cls._find_terminal_nodes(nodes, dependencies, node_map)
end_node = cls._create_end_node(terminal_nodes, end_config)
# Step 8: Add edges from terminal nodes to end
for terminal_id in terminal_nodes:
edges.append(cls._create_edge(terminal_id, "end"))
# Step 9: Assemble complete node list
all_nodes = [start_node, *nodes, end_node]
return all_nodes, edges
@classmethod
def _extract_explicit_dependencies(
cls,
nodes: list[dict[str, Any]],
) -> dict[str, list[str]]:
"""
Extract explicit dependencies from depends_on field.
Args:
nodes: List of nodes with optional depends_on field
Returns:
Dictionary mapping node_id -> list of dependency node_ids
"""
dependencies: dict[str, list[str]] = {}
for node in nodes:
node_id = node.get("id", "")
depends_on = node.get("depends_on", [])
# Ensure depends_on is a list
if isinstance(depends_on, str):
depends_on = [depends_on] if depends_on else []
elif not isinstance(depends_on, list):
depends_on = []
dependencies[node_id] = list(depends_on)
return dependencies
@classmethod
def _infer_dependencies_from_variables(
cls,
nodes: list[dict[str, Any]],
explicit_deps: dict[str, list[str]],
node_map: dict[str, dict[str, Any]],
) -> dict[str, list[str]]:
"""
Infer implicit dependencies from variable references in config.
Scans node configurations for patterns like {{#node_id.field#}}
and adds those as dependencies if not already declared.
Args:
nodes: List of nodes
explicit_deps: Already extracted explicit dependencies
node_map: Map of node_id -> node for validation
Returns:
Updated dependencies dictionary
"""
for node in nodes:
node_id = node.get("id", "")
config = node.get("config", {})
# Serialize config to search for variable references
try:
config_str = json.dumps(config, ensure_ascii=False)
except (TypeError, ValueError):
continue
# Find all variable references
referenced_nodes = set(VAR_PATTERN.findall(config_str))
# Filter out system variables
referenced_nodes -= SYSTEM_VAR_PREFIXES
# Ensure node_id exists in dependencies
if node_id not in explicit_deps:
explicit_deps[node_id] = []
# Add inferred dependencies
for ref in referenced_nodes:
# Skip self-references (e.g., loop nodes referencing their own outputs)
if ref == node_id:
logger.debug(
"Skipping self-reference: %s -> %s",
node_id,
ref,
)
continue
if ref in node_map and ref not in explicit_deps[node_id]:
explicit_deps[node_id].append(ref)
logger.debug(
"Inferred dependency: %s -> %s (from variable reference)",
node_id,
ref,
)
return explicit_deps
@classmethod
def _validate_dependencies(
cls,
dependencies: dict[str, list[str]],
node_map: dict[str, dict[str, Any]],
) -> dict[str, list[str]]:
"""
Validate dependencies and remove invalid references.
Silent fix: References to non-existent nodes are removed.
Args:
dependencies: Dependencies to validate
node_map: Map of valid node IDs
Returns:
Validated dependencies
"""
valid_deps: dict[str, list[str]] = {}
for node_id, deps in dependencies.items():
valid_deps[node_id] = []
for dep in deps:
if dep in node_map:
valid_deps[node_id].append(dep)
else:
logger.warning(
"Removed invalid dependency: %s -> %s (node does not exist)",
node_id,
dep,
)
return valid_deps
@classmethod
def _topological_sort(
cls,
nodes: list[dict[str, Any]],
dependencies: dict[str, list[str]],
) -> list[str]:
"""
Perform topological sort on nodes based on dependencies.
Uses Kahn's algorithm for cycle detection.
Args:
nodes: List of nodes
dependencies: Dependency graph
Returns:
List of node IDs in topological order
Raises:
CyclicDependencyError: If cyclic dependencies are detected
"""
# Build in-degree map
in_degree: dict[str, int] = defaultdict(int)
reverse_deps: dict[str, list[str]] = defaultdict(list)
node_ids = {node["id"] for node in nodes}
for node_id in node_ids:
in_degree[node_id] = 0
for node_id, deps in dependencies.items():
for dep in deps:
if dep in node_ids:
in_degree[node_id] += 1
reverse_deps[dep].append(node_id)
# Start with nodes that have no dependencies
queue = [nid for nid in node_ids if in_degree[nid] == 0]
sorted_ids: list[str] = []
while queue:
current = queue.pop(0)
sorted_ids.append(current)
for dependent in reverse_deps[current]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
# Check for cycles
if len(sorted_ids) != len(node_ids):
remaining = node_ids - set(sorted_ids)
raise CyclicDependencyError(
f"Cyclic dependency detected involving nodes: {remaining}"
)
return sorted_ids
@classmethod
def _generate_edges(
cls,
nodes: list[dict[str, Any]],
sorted_node_ids: list[str],
dependencies: dict[str, list[str]],
node_map: dict[str, dict[str, Any]],
) -> list[dict[str, Any]]:
"""
Generate all edges based on dependencies and special node handling.
Args:
nodes: List of nodes
sorted_node_ids: Topologically sorted node IDs
dependencies: Dependency graph
node_map: Map of node_id -> node
Returns:
List of edge dictionaries
"""
edges: list[dict[str, Any]] = []
nodes_with_incoming: set[str] = set()
# Track which nodes have outgoing edges from branching
branching_sources: set[str] = set()
# First pass: Handle branching nodes
for node in nodes:
node_id = node.get("id", "")
node_type = node.get("type", "")
if node_type == "if-else":
branch_edges = cls._handle_if_else_node(node)
edges.extend(branch_edges)
branching_sources.add(node_id)
nodes_with_incoming.update(edge["target"] for edge in branch_edges)
elif node_type == "question-classifier":
branch_edges = cls._handle_question_classifier_node(node)
edges.extend(branch_edges)
branching_sources.add(node_id)
nodes_with_incoming.update(edge["target"] for edge in branch_edges)
# Second pass: Generate edges from dependencies
for node_id in sorted_node_ids:
deps = dependencies.get(node_id, [])
if deps:
# Connect from each dependency
for dep_id in deps:
dep_node = node_map.get(dep_id, {})
dep_type = dep_node.get("type", "")
# Skip if dependency is a branching node (edges handled above)
if dep_type in BRANCHING_NODE_TYPES:
continue
edges.append(cls._create_edge(dep_id, node_id))
nodes_with_incoming.add(node_id)
else:
# No dependencies - connect from start
# But skip if this node receives edges from branching nodes
if node_id not in nodes_with_incoming:
edges.append(cls._create_edge("start", node_id))
nodes_with_incoming.add(node_id)
return edges
@classmethod
def _handle_if_else_node(
cls,
node: dict[str, Any],
) -> list[dict[str, Any]]:
"""
Handle if-else node branching.
Expects config to contain true_branch and/or false_branch.
Args:
node: If-else node
Returns:
List of branch edges
"""
edges: list[dict[str, Any]] = []
node_id = node.get("id", "")
config = node.get("config", {})
true_branch = config.get("true_branch")
false_branch = config.get("false_branch")
if true_branch:
edges.append(cls._create_edge(node_id, true_branch, source_handle="true"))
if false_branch:
edges.append(cls._create_edge(node_id, false_branch, source_handle="false"))
# If no branches specified, log warning
if not true_branch and not false_branch:
logger.warning(
"if-else node %s has no branch targets specified",
node_id,
)
return edges
@classmethod
def _handle_question_classifier_node(
cls,
node: dict[str, Any],
) -> list[dict[str, Any]]:
"""
Handle question-classifier node branching.
Expects config.classes to contain class definitions with target fields.
Args:
node: Question-classifier node
Returns:
List of branch edges
"""
edges: list[dict[str, Any]] = []
node_id = node.get("id", "")
config = node.get("config", {})
classes = config.get("classes", [])
if not classes:
logger.warning(
"question-classifier node %s has no classes defined",
node_id,
)
return edges
for cls_def in classes:
class_id = cls_def.get("id", "")
target = cls_def.get("target")
if target:
edges.append(cls._create_edge(node_id, target, source_handle=class_id))
else:
# Silent fix: Connect to end if no target specified
edges.append(cls._create_edge(node_id, "end", source_handle=class_id))
logger.debug(
"question-classifier class %s has no target, connecting to end",
class_id,
)
return edges
@classmethod
def _find_terminal_nodes(
cls,
nodes: list[dict[str, Any]],
dependencies: dict[str, list[str]],
node_map: dict[str, dict[str, Any]],
) -> list[str]:
"""
Find nodes that should connect to the end node.
Terminal nodes are those that:
- Are not dependencies of any other node
- Are not branching nodes (those connect to their branches)
Args:
nodes: List of nodes
dependencies: Dependency graph
node_map: Map of node_id -> node
Returns:
List of terminal node IDs
"""
# Build set of all nodes that are depended upon
depended_upon: set[str] = set()
for deps in dependencies.values():
depended_upon.update(deps)
# Also track nodes that are branch targets
branch_targets: set[str] = set()
branching_nodes: set[str] = set()
for node in nodes:
node_id = node.get("id", "")
node_type = node.get("type", "")
config = node.get("config", {})
if node_type == "if-else":
branching_nodes.add(node_id)
if config.get("true_branch"):
branch_targets.add(config["true_branch"])
if config.get("false_branch"):
branch_targets.add(config["false_branch"])
elif node_type == "question-classifier":
branching_nodes.add(node_id)
for cls_def in config.get("classes", []):
if cls_def.get("target"):
branch_targets.add(cls_def["target"])
# Find terminal nodes
terminal_nodes: list[str] = []
for node in nodes:
node_id = node.get("id", "")
node_type = node.get("type", "")
# Skip branching nodes - they don't connect to end directly
if node_type in BRANCHING_NODE_TYPES:
continue
# Terminal if not depended upon and not a branch target that leads elsewhere
if node_id not in depended_upon:
terminal_nodes.append(node_id)
# If no terminal nodes found (shouldn't happen), use all non-branching nodes
if not terminal_nodes:
terminal_nodes = [
node["id"]
for node in nodes
if node.get("type") not in BRANCHING_NODE_TYPES
]
logger.warning("No terminal nodes found, using all non-branching nodes")
return terminal_nodes
@classmethod
def _create_start_node(
cls,
nodes: list[dict[str, Any]],
config: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Create a start node.
Args:
nodes: User nodes (for potential config inference)
config: Optional start node configuration
Returns:
Start node dictionary
"""
return {
"id": "start",
"type": "start",
"title": "Start",
"config": config or {},
"data": {},
}
@classmethod
def _create_end_node(
cls,
terminal_nodes: list[str],
config: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Create an end node.
Args:
terminal_nodes: Nodes that will connect to end
config: Optional end node configuration
Returns:
End node dictionary
"""
return {
"id": "end",
"type": "end",
"title": "End",
"config": config or {},
"data": {},
}
@classmethod
def _create_edge(
cls,
source: str,
target: str,
source_handle: str | None = None,
) -> dict[str, Any]:
"""
Create an edge dictionary.
Args:
source: Source node ID
target: Target node ID
source_handle: Optional handle for branching (e.g., "true", "false", class_id)
Returns:
Edge dictionary
"""
edge: dict[str, Any] = {
"id": f"{source}-{target}-{uuid.uuid4().hex[:8]}",
"source": source,
"target": target,
}
if source_handle:
edge["sourceHandle"] = source_handle
else:
edge["sourceHandle"] = "source"
edge["targetHandle"] = "target"
return edge

View File

@@ -0,0 +1,280 @@
"""
Graph Validator for Workflow Generation
Validates workflow graph structure using graph algorithms:
- Reachability from start node (BFS)
- Reachability to end node (reverse BFS)
- Branch edge validation for if-else and classifier nodes
"""
import time
from collections import deque
from dataclasses import dataclass, field
@dataclass
class GraphError:
"""Represents a structural error in the workflow graph."""
node_id: str
node_type: str
error_type: str # "unreachable", "dead_end", "cycle", "missing_start", "missing_end"
message: str
@dataclass
class GraphValidationResult:
"""Result of graph validation."""
success: bool
errors: list[GraphError] = field(default_factory=list)
warnings: list[GraphError] = field(default_factory=list)
execution_time: float = 0.0
stats: dict = field(default_factory=dict)
class GraphValidator:
"""
Validates workflow graph structure using proper graph algorithms.
Performs:
1. Forward reachability analysis (BFS from start)
2. Backward reachability analysis (reverse BFS from end)
3. Branch edge validation for if-else and classifier nodes
"""
@staticmethod
def _build_adjacency(
nodes: dict[str, dict], edges: list[dict]
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
"""Build forward and reverse adjacency lists from edges."""
outgoing: dict[str, list[str]] = {node_id: [] for node_id in nodes}
incoming: dict[str, list[str]] = {node_id: [] for node_id in nodes}
for edge in edges:
source = edge.get("source")
target = edge.get("target")
if source in outgoing and target in incoming:
outgoing[source].append(target)
incoming[target].append(source)
return outgoing, incoming
@staticmethod
def _bfs_reachable(start: str, adjacency: dict[str, list[str]]) -> set[str]:
"""BFS to find all nodes reachable from start node."""
if start not in adjacency:
return set()
visited = set()
queue = deque([start])
visited.add(start)
while queue:
current = queue.popleft()
for neighbor in adjacency.get(current, []):
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)
return visited
@staticmethod
def validate(workflow_data: dict) -> GraphValidationResult:
"""Validate workflow graph structure."""
start_time = time.time()
errors: list[GraphError] = []
warnings: list[GraphError] = []
nodes_list = workflow_data.get("nodes", [])
edges_list = workflow_data.get("edges", [])
nodes = {n["id"]: n for n in nodes_list if n.get("id")}
# Find start and end nodes
start_node_id = None
end_node_ids = []
for node_id, node in nodes.items():
node_type = node.get("type")
if node_type == "start":
start_node_id = node_id
elif node_type == "end":
end_node_ids.append(node_id)
# Check start node exists
if not start_node_id:
errors.append(
GraphError(
node_id="workflow",
node_type="workflow",
error_type="missing_start",
message="Workflow has no start node",
)
)
# Check end node exists
if not end_node_ids:
errors.append(
GraphError(
node_id="workflow",
node_type="workflow",
error_type="missing_end",
message="Workflow has no end node",
)
)
# If missing start or end, can't do reachability analysis
if not start_node_id or not end_node_ids:
execution_time = time.time() - start_time
return GraphValidationResult(
success=False,
errors=errors,
warnings=warnings,
execution_time=execution_time,
stats={"nodes": len(nodes), "edges": len(edges_list)},
)
# Build adjacency lists
outgoing, incoming = GraphValidator._build_adjacency(nodes, edges_list)
# --- FORWARD REACHABILITY: BFS from start ---
reachable_from_start = GraphValidator._bfs_reachable(start_node_id, outgoing)
# Find unreachable nodes
unreachable_nodes = set(nodes.keys()) - reachable_from_start
for node_id in unreachable_nodes:
node = nodes[node_id]
errors.append(
GraphError(
node_id=node_id,
node_type=node.get("type", "unknown"),
error_type="unreachable",
message=f"Node '{node_id}' is not reachable from start node",
)
)
# --- BACKWARD REACHABILITY: Reverse BFS from end nodes ---
can_reach_end: set[str] = set()
for end_id in end_node_ids:
can_reach_end.update(GraphValidator._bfs_reachable(end_id, incoming))
# Find dead-end nodes (can't reach any end node)
dead_end_nodes = set(nodes.keys()) - can_reach_end
for node_id in dead_end_nodes:
if node_id in unreachable_nodes:
continue
node = nodes[node_id]
warnings.append(
GraphError(
node_id=node_id,
node_type=node.get("type", "unknown"),
error_type="dead_end",
message=f"Node '{node_id}' cannot reach any end node (dead end)",
)
)
# --- Start node has outgoing edges? ---
if not outgoing.get(start_node_id):
errors.append(
GraphError(
node_id=start_node_id,
node_type="start",
error_type="disconnected",
message="Start node has no outgoing connections",
)
)
# --- End nodes have incoming edges? ---
for end_id in end_node_ids:
if not incoming.get(end_id):
errors.append(
GraphError(
node_id=end_id,
node_type="end",
error_type="disconnected",
message="End node has no incoming connections",
)
)
# --- BRANCH EDGE VALIDATION ---
edge_handles: dict[str, set[str]] = {}
for edge in edges_list:
source = edge.get("source")
handle = edge.get("sourceHandle", "")
if source:
if source not in edge_handles:
edge_handles[source] = set()
edge_handles[source].add(handle)
# Check if-else and question-classifier nodes
for node_id, node in nodes.items():
node_type = node.get("type")
if node_type == "if-else":
handles = edge_handles.get(node_id, set())
config = node.get("config", {})
cases = config.get("cases", [])
required_handles = set()
for case in cases:
case_id = case.get("case_id")
if case_id:
required_handles.add(case_id)
required_handles.add("false")
missing = required_handles - handles
for handle in missing:
errors.append(
GraphError(
node_id=node_id,
node_type=node_type,
error_type="missing_branch",
message=f"If-else node '{node_id}' missing edge for branch '{handle}'",
)
)
elif node_type == "question-classifier":
handles = edge_handles.get(node_id, set())
config = node.get("config", {})
classes = config.get("classes", [])
required_handles = set()
for cls in classes:
if isinstance(cls, dict):
cls_id = cls.get("id")
if cls_id:
required_handles.add(cls_id)
missing = required_handles - handles
for handle in missing:
cls_name = handle
for cls in classes:
if isinstance(cls, dict) and cls.get("id") == handle:
cls_name = cls.get("name", handle)
break
errors.append(
GraphError(
node_id=node_id,
node_type=node_type,
error_type="missing_branch",
message=f"Classifier '{node_id}' missing edge for class '{cls_name}'",
)
)
execution_time = time.time() - start_time
success = len(errors) == 0
return GraphValidationResult(
success=success,
errors=errors,
warnings=warnings,
execution_time=execution_time,
stats={
"nodes": len(nodes),
"edges": len(edges_list),
"reachable_from_start": len(reachable_from_start),
"can_reach_end": len(can_reach_end),
"unreachable": len(unreachable_nodes),
"dead_ends": len(dead_end_nodes - unreachable_nodes),
},
)

View File

@@ -0,0 +1,113 @@
import logging
from core.workflow.generator.types import WorkflowDataDict
logger = logging.getLogger(__name__)
def generate_mermaid(workflow_data: WorkflowDataDict) -> str:
"""
Generate a Mermaid flowchart from workflow data consisting of nodes and edges.
Args:
workflow_data: Dict containing 'nodes' (list) and 'edges' (list)
Returns:
String containing the Mermaid flowchart syntax
"""
nodes = workflow_data.get("nodes", [])
edges = workflow_data.get("edges", [])
lines = ["flowchart TD"]
# 1. Define Nodes
# Format: node_id["title<br/>type"] or similar
# We will use the Vibe Workflow standard format: id["type=TYPE|title=TITLE"]
# Or specifically for tool nodes: id["type=tool|title=TITLE|tool=TOOL_KEY"]
# Map of original IDs to safe Mermaid IDs
id_map = {}
def get_safe_id(original_id: str) -> str:
if original_id == "end":
return "end_node"
if original_id == "subgraph":
return "subgraph_node"
# Mermaid IDs should be alphanumeric.
# If the ID has special chars, we might need to escape or hash, but Vibe usually generates simple IDs.
# We'll trust standard IDs but handle the reserved keyword 'end'.
return original_id
for node in nodes:
node_id = node.get("id")
if not node_id:
continue
safe_id = get_safe_id(node_id)
id_map[node_id] = safe_id
node_type = node.get("type", "unknown")
title = node.get("title", "Untitled")
# Escape quotes in title
safe_title = title.replace('"', "'")
if node_type == "tool":
config = node.get("config", {})
# Try multiple fields for tool reference
tool_ref = (
config.get("tool_key")
or config.get("tool")
or config.get("tool_name")
or node.get("tool_name")
or "unknown"
)
node_def = f'{safe_id}["type={node_type}|title={safe_title}|tool={tool_ref}"]'
else:
node_def = f'{safe_id}["type={node_type}|title={safe_title}"]'
lines.append(f" {node_def}")
# 2. Define Edges
# Format: source --> target
# Track defined nodes to avoid edge errors
defined_node_ids = {n.get("id") for n in nodes if n.get("id")}
for edge in edges:
source = edge.get("source")
target = edge.get("target")
# Skip invalid edges
if not source or not target:
continue
if source not in defined_node_ids or target not in defined_node_ids:
continue
safe_source = id_map.get(source, source)
safe_target = id_map.get(target, target)
# Handle conditional branches (true/false) if present
# In Dify workflow, sourceHandle is often used for this
source_handle = edge.get("sourceHandle")
label = ""
if source_handle == "true":
label = "|true|"
elif source_handle == "false":
label = "|false|"
elif source_handle and source_handle != "source":
# For question-classifier or other multi-path nodes
# Clean up handle for display if needed
safe_handle = str(source_handle).replace('"', "'")
label = f"|{safe_handle}|"
edge_line = f" {safe_source} -->{label} {safe_target}"
lines.append(edge_line)
# Start/End nodes are implicitly handled if they are in the 'nodes' list
# If not, we might need to add them, but usually the Builder should produce them.
result = "\n".join(lines)
return result

View File

@@ -0,0 +1,304 @@
"""
Node Repair Utility for Vibe Workflow Generation.
This module provides intelligent node configuration repair capabilities.
It can detect and fix common node configuration issues:
- Invalid comparison operators in if-else nodes (e.g. '>=' -> '')
"""
import copy
import logging
import uuid
from dataclasses import dataclass, field
from core.workflow.generator.types import WorkflowNodeDict
logger = logging.getLogger(__name__)
@dataclass
class NodeRepairResult:
"""Result of node repair operation."""
nodes: list[WorkflowNodeDict]
repairs_made: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
@property
def was_repaired(self) -> bool:
"""Check if any repairs were made."""
return len(self.repairs_made) > 0
class NodeRepair:
"""
Intelligent node configuration repair.
"""
OPERATOR_MAP = {
">=": "",
"<=": "",
"!=": "",
"==": "=",
}
TYPE_MAPPING = {
"json": "object",
"dict": "object",
"dictionary": "object",
"float": "number",
"int": "number",
"integer": "number",
"double": "number",
"str": "string",
"text": "string",
"bool": "boolean",
"list": "array[object]",
"array": "array[object]",
}
_REPAIR_HANDLERS = {
"if-else": "_repair_if_else_operators",
"variable-aggregator": "_repair_variable_aggregator_variables",
"code": "_repair_code_node_config",
}
@classmethod
def repair(
cls,
nodes: list[WorkflowNodeDict],
llm_callback=None,
) -> NodeRepairResult:
"""
Repair node configurations.
Args:
nodes: List of node dictionaries
llm_callback: Optional callback(node, issue_desc) -> fixed_config_part
Returns:
NodeRepairResult with repaired nodes and logs
"""
# Deep copy to avoid mutating original
nodes = copy.deepcopy(nodes)
repairs: list[str] = []
warnings: list[str] = []
logger.info("[NODE REPAIR] Starting repair process for %s nodes", len(nodes))
for node in nodes:
node_type = node.get("type")
# 1. Rule-based repairs
handler_name = cls._REPAIR_HANDLERS.get(node_type)
if handler_name:
handler = getattr(cls, handler_name)
# Check if handler accepts llm_callback (inspect signature or just pass generic kwargs?)
# Simplest for now: handlers signature: (node, repairs, llm_callback=None)
try:
handler(node, repairs, llm_callback=llm_callback)
except TypeError:
# Fallback for handlers that don't accept llm_callback yet
handler(node, repairs)
# Add other node type repairs here as needed
if repairs:
logger.info("[NODE REPAIR] Completed with %s repairs:", len(repairs))
for i, repair in enumerate(repairs, 1):
logger.info("[NODE REPAIR] %s. %s", i, repair)
else:
logger.info("[NODE REPAIR] Completed - no repairs needed")
return NodeRepairResult(
nodes=nodes,
repairs_made=repairs,
warnings=warnings,
)
@classmethod
def _repair_if_else_operators(cls, node: WorkflowNodeDict, repairs: list[str], **kwargs):
"""
Normalize comparison operators in if-else nodes.
And ensure 'id' field exists for cases and conditions (frontend requirement).
"""
node_id = node.get("id", "unknown")
config = node.get("config", {})
cases = config.get("cases", [])
for case in cases:
# Ensure case_id
if "case_id" not in case:
case["case_id"] = str(uuid.uuid4())
repairs.append(f"Generated missing case_id for case in node '{node_id}'")
conditions = case.get("conditions", [])
for condition in conditions:
# Ensure condition id
if "id" not in condition:
condition["id"] = str(uuid.uuid4())
# Not logging this repair to avoid clutter, as it's a structural fix
# Ensure value type (LLM might return int/float, but we need str/bool/list)
val = condition.get("value")
if isinstance(val, (int, float)) and not isinstance(val, bool):
condition["value"] = str(val)
repairs.append(f"Coerced numeric value to string in node '{node_id}'")
op = condition.get("comparison_operator")
if op in cls.OPERATOR_MAP:
new_op = cls.OPERATOR_MAP[op]
condition["comparison_operator"] = new_op
repairs.append(f"Normalized operator '{op}' to '{new_op}' in node '{node_id}'")
@classmethod
def _repair_variable_aggregator_variables(cls, node: WorkflowNodeDict, repairs: list[str]):
"""
Repair variable-aggregator variables format.
Converts dict format to list[list[str]] format.
Expected: [["node_id", "field"], ["node_id2", "field2"]]
May receive: [{"name": "...", "value_selector": ["node_id", "field"]}, ...]
"""
node_id = node.get("id", "unknown")
config = node.get("config", {})
variables = config.get("variables", [])
if not variables:
return
repaired = False
repaired_variables = []
for var in variables:
if isinstance(var, dict):
# Convert dict format to array format
value_selector = var.get("value_selector") or var.get("selector") or var.get("path")
if isinstance(value_selector, list) and len(value_selector) > 0:
repaired_variables.append(value_selector)
repaired = True
else:
# Try to extract from name field - LLM may generate {"name": "node_id.field"}
name = var.get("name")
if isinstance(name, str) and "." in name:
# Try to parse "node_id.field" format
parts = name.split(".", 1)
if len(parts) == 2:
repaired_variables.append([parts[0], parts[1]])
repaired = True
else:
logger.warning(
"Variable aggregator node '%s' has invalid variable format: %s",
node_id,
var,
)
repaired_variables.append([]) # Empty array as fallback
else:
# If no valid selector or name, skip this variable
logger.warning(
"Variable aggregator node '%s' has invalid variable format: %s",
node_id,
var,
)
# Don't add empty array - skip invalid variables
elif isinstance(var, list):
# Already in correct format
repaired_variables.append(var)
else:
# Unknown format, skip
logger.warning("Variable aggregator node '%s' has unknown variable format: %s", node_id, var)
# Don't add empty array - skip invalid variables
if repaired:
config["variables"] = repaired_variables
repairs.append(f"Repaired variable-aggregator variables format in node '{node_id}'")
@classmethod
def _repair_code_node_config(cls, node: WorkflowNodeDict, repairs: list[str], llm_callback=None):
"""
Repair code node configuration (outputs and variables).
1. Outputs: Converts list format to dict format AND normalizes types.
2. Variables: Ensures value_selector exists.
"""
node_id = node.get("id", "unknown")
config = node.get("config", {})
if "variables" not in config:
config["variables"] = []
# --- Repair Variables ---
variables = config.get("variables")
if isinstance(variables, list):
for var in variables:
if isinstance(var, dict):
# Ensure value_selector exists (frontend crashes if missing)
if "value_selector" not in var:
var["value_selector"] = []
# Not logging trivial repairs
# --- Repair Outputs ---
outputs = config.get("outputs")
if not outputs:
return
# Helper to normalize type
def normalize_type(t: str) -> str:
t_lower = str(t).lower()
return cls.TYPE_MAPPING.get(t_lower, t)
# 1. Handle Dict format (Standard) - Check for invalid types
if isinstance(outputs, dict):
changed = False
for var_name, var_config in outputs.items():
if isinstance(var_config, dict):
original_type = var_config.get("type")
if original_type:
new_type = normalize_type(original_type)
if new_type != original_type:
var_config["type"] = new_type
changed = True
repairs.append(
f"Normalized type '{original_type}' to '{new_type}' "
f"for var '{var_name}' in node '{node_id}'"
)
return
# 2. Handle List format (Repair needed)
if isinstance(outputs, list):
new_outputs = {}
for item in outputs:
if isinstance(item, dict):
var_name = item.get("variable") or item.get("name")
var_type = item.get("type")
if var_name and var_type:
norm_type = normalize_type(var_type)
new_outputs[var_name] = {"type": norm_type}
if norm_type != var_type:
repairs.append(
f"Normalized type '{var_type}' to '{norm_type}' "
f"during list conversion in node '{node_id}'"
)
if new_outputs:
config["outputs"] = new_outputs
repairs.append(f"Repaired code node outputs format in node '{node_id}'")
else:
# Fallback: Try LLM if available
if llm_callback:
try:
# Attempt to fix using LLM
fixed_outputs = llm_callback(
node,
"outputs must be a dictionary like {'var_name': {'type': 'string'}}, "
"but got a list or valid conversion failed.",
)
if isinstance(fixed_outputs, dict) and fixed_outputs:
config["outputs"] = fixed_outputs
repairs.append(f"Repaired code node outputs format using LLM in node '{node_id}'")
return
except Exception as e:
logger.warning("LLM fallback repair failed for node '%s': %s", node_id, e)
# If conversion/LLM failed, set to empty dict
config["outputs"] = {}
repairs.append(f"Reset invalid code node outputs to empty dict in node '{node_id}'")

View File

@@ -0,0 +1,101 @@
from dataclasses import dataclass
from core.workflow.generator.types import AvailableModelDict, AvailableToolDict, WorkflowDataDict
from core.workflow.generator.validation.context import ValidationContext
from core.workflow.generator.validation.engine import ValidationEngine
from core.workflow.generator.validation.rules import Severity
@dataclass
class ValidationHint:
"""Legacy compatibility class for validation hints."""
node_id: str
field: str
message: str
severity: str # 'error', 'warning'
suggestion: str = None
node_type: str = None # Added for test compatibility
# Alias for potential old code using 'type' instead of 'severity'
@property
def type(self) -> str:
return self.severity
@property
def element_id(self) -> str:
return self.node_id
FriendlyHint = ValidationHint # Alias for backward compatibility
class WorkflowValidator:
"""
Validates the generated workflow configuration (nodes and edges).
Wraps the new ValidationEngine for backward compatibility.
"""
@classmethod
def validate(
cls,
workflow_data: WorkflowDataDict,
available_tools: list[AvailableToolDict],
available_models: list[AvailableModelDict] | None = None,
) -> tuple[bool, list[ValidationHint]]:
"""
Validate workflow data and return validity status and hints.
Args:
workflow_data: Dict containing 'nodes' and 'edges'
available_tools: List of available tool configurations
available_models: List of available models (added for Vibe compat)
Returns:
Tuple(max_severity_is_not_error, list_of_hints)
"""
nodes = workflow_data.get("nodes", [])
edges = workflow_data.get("edges", [])
# Create context
context = ValidationContext(
nodes=nodes,
edges=edges,
available_models=available_models or [],
available_tools=available_tools or [],
)
# Run validation engine
engine = ValidationEngine()
result = engine.validate(context)
# Convert engine errors to legacy hints
hints: list[ValidationHint] = []
error_count = 0
warning_count = 0
for error in result.all_errors:
# Map severity
severity = "error" if error.severity == Severity.ERROR else "warning"
if severity == "error":
error_count += 1
else:
warning_count += 1
# Map field from message or details if possible (heuristic)
field_name = error.details.get("field", "unknown")
hints.append(
ValidationHint(
node_id=error.node_id,
field=field_name,
message=error.message,
severity=severity,
suggestion=error.fix_hint,
node_type=error.node_type,
)
)
return result.is_valid, hints

View File

@@ -0,0 +1,42 @@
"""
Validation Rule Engine for Vibe Workflow Generation.
This module provides a declarative, schema-based validation system for
generated workflow nodes. It classifies errors into fixable (LLM can auto-fix)
and user-required (needs manual intervention) categories.
Usage:
from core.workflow.generator.validation import ValidationEngine, ValidationContext
context = ValidationContext(
available_models=[...],
available_tools=[...],
nodes=[...],
edges=[...],
)
engine = ValidationEngine()
result = engine.validate(context)
# Access classified errors
fixable_errors = result.fixable_errors
user_required_errors = result.user_required_errors
"""
from core.workflow.generator.validation.context import ValidationContext
from core.workflow.generator.validation.engine import ValidationEngine, ValidationResult
from core.workflow.generator.validation.rules import (
RuleCategory,
Severity,
ValidationError,
ValidationRule,
)
__all__ = [
"RuleCategory",
"Severity",
"ValidationContext",
"ValidationEngine",
"ValidationError",
"ValidationResult",
"ValidationRule",
]

View File

@@ -0,0 +1,115 @@
"""
Validation Context for the Rule Engine.
The ValidationContext holds all the data needed for validation:
- Generated nodes and edges
- Available models, tools, and datasets
- Node output schemas for variable reference validation
"""
from dataclasses import dataclass, field
from core.workflow.generator.types import (
AvailableModelDict,
AvailableToolDict,
WorkflowEdgeDict,
WorkflowNodeDict,
)
@dataclass
class ValidationContext:
"""
Context object containing all data needed for validation.
This is passed to each validation rule, providing access to:
- The nodes being validated
- Edge connections between nodes
- Available external resources (models, tools)
"""
# Generated workflow data
nodes: list[WorkflowNodeDict] = field(default_factory=list)
edges: list[WorkflowEdgeDict] = field(default_factory=list)
# Available external resources
available_models: list[AvailableModelDict] = field(default_factory=list)
available_tools: list[AvailableToolDict] = field(default_factory=list)
# Cached lookups (populated lazily)
_node_map: dict[str, WorkflowNodeDict] | None = field(default=None, repr=False)
_model_set: set[tuple[str, str]] | None = field(default=None, repr=False)
_tool_set: set[str] | None = field(default=None, repr=False)
_configured_tool_set: set[str] | None = field(default=None, repr=False)
@property
def node_map(self) -> dict[str, WorkflowNodeDict]:
"""Get a map of node_id -> node for quick lookup."""
if self._node_map is None:
self._node_map = {node.get("id", ""): node for node in self.nodes}
return self._node_map
@property
def model_set(self) -> set[tuple[str, str]]:
"""Get a set of (provider, model_name) tuples for quick lookup."""
if self._model_set is None:
self._model_set = {(m.get("provider", ""), m.get("model", "")) for m in self.available_models}
return self._model_set
@property
def tool_set(self) -> set[str]:
"""Get a set of all tool keys (both configured and unconfigured)."""
if self._tool_set is None:
self._tool_set = set()
for tool in self.available_tools:
provider = tool.get("provider_id") or tool.get("provider", "")
tool_key = tool.get("tool_key") or tool.get("tool_name", "")
if provider and tool_key:
self._tool_set.add(f"{provider}/{tool_key}")
if tool_key:
self._tool_set.add(tool_key)
return self._tool_set
@property
def configured_tool_set(self) -> set[str]:
"""Get a set of configured (authorized) tool keys."""
if self._configured_tool_set is None:
self._configured_tool_set = set()
for tool in self.available_tools:
if not tool.get("is_team_authorization", False):
continue
provider = tool.get("provider_id") or tool.get("provider", "")
tool_key = tool.get("tool_key") or tool.get("tool_name", "")
if provider and tool_key:
self._configured_tool_set.add(f"{provider}/{tool_key}")
if tool_key:
self._configured_tool_set.add(tool_key)
return self._configured_tool_set
def has_model(self, provider: str, model_name: str) -> bool:
"""Check if a model is available."""
return (provider, model_name) in self.model_set
def has_tool(self, tool_key: str) -> bool:
"""Check if a tool exists (configured or not)."""
return tool_key in self.tool_set
def is_tool_configured(self, tool_key: str) -> bool:
"""Check if a tool is configured and ready to use."""
return tool_key in self.configured_tool_set
def get_node(self, node_id: str) -> WorkflowNodeDict | None:
"""Get a node by its ID."""
return self.node_map.get(node_id)
def get_node_ids(self) -> set[str]:
"""Get all node IDs in the workflow."""
return set(self.node_map.keys())
def get_upstream_nodes(self, node_id: str) -> list[str]:
"""Get IDs of nodes that connect to this node (upstream)."""
return [edge.get("source", "") for edge in self.edges if edge.get("target") == node_id]
def get_downstream_nodes(self, node_id: str) -> list[str]:
"""Get IDs of nodes that this node connects to (downstream)."""
return [edge.get("target", "") for edge in self.edges if edge.get("source") == node_id]

View File

@@ -0,0 +1,260 @@
"""
Validation Engine - Core validation logic.
The ValidationEngine orchestrates rule execution and aggregates results.
It provides a clean interface for validating workflow nodes.
"""
import logging
from dataclasses import dataclass, field
from typing import Any
from core.workflow.generator.types import (
AvailableModelDict,
AvailableToolDict,
WorkflowEdgeDict,
WorkflowNodeDict,
)
from core.workflow.generator.validation.context import ValidationContext
from core.workflow.generator.validation.rules import (
RuleCategory,
Severity,
ValidationError,
get_registry,
)
logger = logging.getLogger(__name__)
@dataclass
class ValidationResult:
"""
Result of validation containing all errors classified by fixability.
Attributes:
all_errors: All validation errors found
fixable_errors: Errors that LLM can automatically fix
user_required_errors: Errors that require user intervention
warnings: Non-blocking warnings
stats: Validation statistics
"""
all_errors: list[ValidationError] = field(default_factory=list)
fixable_errors: list[ValidationError] = field(default_factory=list)
user_required_errors: list[ValidationError] = field(default_factory=list)
warnings: list[ValidationError] = field(default_factory=list)
stats: dict[str, int] = field(default_factory=dict)
@property
def has_errors(self) -> bool:
"""Check if there are any errors (excluding warnings)."""
return len(self.fixable_errors) > 0 or len(self.user_required_errors) > 0
@property
def has_fixable_errors(self) -> bool:
"""Check if there are fixable errors."""
return len(self.fixable_errors) > 0
@property
def is_valid(self) -> bool:
"""Check if validation passed (no errors, warnings are OK)."""
return not self.has_errors
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for API response."""
return {
"fixable": [e.to_dict() for e in self.fixable_errors],
"user_required": [e.to_dict() for e in self.user_required_errors],
"warnings": [e.to_dict() for e in self.warnings],
"all_warnings": [e.message for e in self.all_errors],
"stats": self.stats,
}
def get_error_messages(self) -> list[str]:
"""Get all error messages as strings."""
return [e.message for e in self.all_errors]
def get_fixable_by_node(self) -> dict[str, list[ValidationError]]:
"""Group fixable errors by node ID."""
result: dict[str, list[ValidationError]] = {}
for error in self.fixable_errors:
if error.node_id not in result:
result[error.node_id] = []
result[error.node_id].append(error)
return result
class ValidationEngine:
"""
The main validation engine.
Usage:
engine = ValidationEngine()
context = ValidationContext(nodes=[...], available_models=[...])
result = engine.validate(context)
"""
def __init__(self):
self._registry = get_registry()
def validate(self, context: ValidationContext) -> ValidationResult:
"""
Validate all nodes in the context.
Args:
context: ValidationContext with nodes, edges, and available resources
Returns:
ValidationResult with classified errors
"""
result = ValidationResult()
stats = {
"total_nodes": len(context.nodes),
"total_rules_checked": 0,
"total_errors": 0,
"fixable_count": 0,
"user_required_count": 0,
"warning_count": 0,
}
# Validate each node
for node in context.nodes:
node_type = node.get("type", "unknown")
node_id = node.get("id", "unknown")
# Get applicable rules for this node type
rules = self._registry.get_rules_for_node(node_type)
for rule in rules:
stats["total_rules_checked"] += 1
try:
errors = rule.check(node, context)
for error in errors:
result.all_errors.append(error)
stats["total_errors"] += 1
# Classify by severity and fixability
if error.severity == Severity.WARNING:
result.warnings.append(error)
stats["warning_count"] += 1
elif error.is_fixable:
result.fixable_errors.append(error)
stats["fixable_count"] += 1
else:
result.user_required_errors.append(error)
stats["user_required_count"] += 1
except Exception:
logger.exception(
"Rule '%s' failed for node '%s'",
rule.id,
node_id,
)
# Don't let a rule failure break the entire validation
continue
# Validate edges separately
edge_errors = self._validate_edges(context)
for error in edge_errors:
result.all_errors.append(error)
stats["total_errors"] += 1
if error.is_fixable:
result.fixable_errors.append(error)
stats["fixable_count"] += 1
else:
result.user_required_errors.append(error)
stats["user_required_count"] += 1
result.stats = stats
return result
def _validate_edges(self, context: ValidationContext) -> list[ValidationError]:
"""Validate edge connections."""
errors: list[ValidationError] = []
valid_node_ids = context.get_node_ids()
for edge in context.edges:
source = edge.get("source", "")
target = edge.get("target", "")
if source and source not in valid_node_ids:
errors.append(
ValidationError(
rule_id="edge.source.invalid",
node_id=source,
node_type="edge",
category=RuleCategory.SEMANTIC,
severity=Severity.ERROR,
is_fixable=True,
message=f"Edge source '{source}' does not exist",
fix_hint="Update edge to reference existing node",
)
)
if target and target not in valid_node_ids:
errors.append(
ValidationError(
rule_id="edge.target.invalid",
node_id=target,
node_type="edge",
category=RuleCategory.SEMANTIC,
severity=Severity.ERROR,
is_fixable=True,
message=f"Edge target '{target}' does not exist",
fix_hint="Update edge to reference existing node",
)
)
return errors
def validate_single_node(
self,
node: WorkflowNodeDict,
context: ValidationContext,
) -> list[ValidationError]:
"""
Validate a single node.
Useful for incremental validation when a node is added/modified.
"""
node_type = node.get("type", "unknown")
rules = self._registry.get_rules_for_node(node_type)
errors: list[ValidationError] = []
for rule in rules:
try:
errors.extend(rule.check(node, context))
except Exception:
logger.exception("Rule '%s' failed", rule.id)
return errors
def validate_nodes(
nodes: list[WorkflowNodeDict],
edges: list[WorkflowEdgeDict] | None = None,
available_models: list[AvailableModelDict] | None = None,
available_tools: list[AvailableToolDict] | None = None,
) -> ValidationResult:
"""
Convenience function to validate nodes without creating engine/context manually.
Args:
nodes: List of workflow nodes to validate
edges: Optional list of edges
available_models: Optional list of available models
available_tools: Optional list of available tools
Returns:
ValidationResult with classified errors
"""
context = ValidationContext(
nodes=nodes,
edges=edges or [],
available_models=available_models or [],
available_tools=available_tools or [],
)
engine = ValidationEngine()
return engine.validate(context)

View File

@@ -0,0 +1,947 @@
"""
Validation Rules Definition and Registry.
This module defines:
- ValidationRule: The rule structure
- RuleCategory: Categories of validation rules
- Severity: Error severity levels
- ValidationError: Error output structure
- All built-in validation rules
"""
import re
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any
from core.workflow.generator.types import WorkflowNodeDict
if TYPE_CHECKING:
from core.workflow.generator.validation.context import ValidationContext
class RuleCategory(Enum):
"""Categories of validation rules."""
STRUCTURE = "structure" # Field existence, types, formats
SEMANTIC = "semantic" # Variable references, edge connections
REFERENCE = "reference" # External resources (models, tools, datasets)
class Severity(Enum):
"""Severity levels for validation errors."""
ERROR = "error" # Must be fixed
WARNING = "warning" # Should be fixed but not blocking
@dataclass
class ValidationError:
"""
Represents a validation error found during rule execution.
Attributes:
rule_id: The ID of the rule that generated this error
node_id: The ID of the node with the error
node_type: The type of the node
category: The rule category
severity: Error severity
is_fixable: Whether LLM can auto-fix this error
message: Human-readable error message
fix_hint: Hint for LLM to fix the error
details: Additional error details
"""
rule_id: str
node_id: str
node_type: str
category: RuleCategory
severity: Severity
is_fixable: bool
message: str
fix_hint: str = ""
details: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for API response."""
return {
"rule_id": self.rule_id,
"node_id": self.node_id,
"node_type": self.node_type,
"category": self.category.value,
"severity": self.severity.value,
"is_fixable": self.is_fixable,
"message": self.message,
"fix_hint": self.fix_hint,
"details": self.details,
}
# Type alias for rule check functions
RuleCheckFn = Callable[
[WorkflowNodeDict, "ValidationContext"],
list[ValidationError],
]
@dataclass
class ValidationRule:
"""
A validation rule definition.
Attributes:
id: Unique rule identifier (e.g., "llm.model.required")
node_types: List of node types this rule applies to, or ["*"] for all
category: The rule category
severity: Default severity for errors from this rule
is_fixable: Whether errors from this rule can be auto-fixed by LLM
check: The validation function
description: Human-readable description of what this rule checks
fix_hint: Default hint for fixing errors from this rule
"""
id: str
node_types: list[str]
category: RuleCategory
severity: Severity
is_fixable: bool
check: RuleCheckFn
description: str = ""
fix_hint: str = ""
def applies_to(self, node_type: str) -> bool:
"""Check if this rule applies to a given node type."""
return "*" in self.node_types or node_type in self.node_types
# =============================================================================
# Rule Registry
# =============================================================================
class RuleRegistry:
"""
Registry for validation rules.
Rules are registered here and can be retrieved by category or node type.
"""
def __init__(self):
self._rules: list[ValidationRule] = []
def register(self, rule: ValidationRule) -> None:
"""Register a validation rule."""
self._rules.append(rule)
def get_rules_for_node(self, node_type: str) -> list[ValidationRule]:
"""Get all rules that apply to a given node type."""
return [r for r in self._rules if r.applies_to(node_type)]
def get_rules_by_category(self, category: RuleCategory) -> list[ValidationRule]:
"""Get all rules in a given category."""
return [r for r in self._rules if r.category == category]
def get_all_rules(self) -> list[ValidationRule]:
"""Get all registered rules."""
return list(self._rules)
# Global rule registry instance
_registry = RuleRegistry()
def register_rule(rule: ValidationRule) -> ValidationRule:
"""Decorator/function to register a rule with the global registry."""
_registry.register(rule)
return rule
def get_registry() -> RuleRegistry:
"""Get the global rule registry."""
return _registry
# =============================================================================
# Helper Functions for Rule Implementations
# =============================================================================
# Explicit placeholder value defined in prompt contract
# See: api/core/workflow/generator/prompts/vibe_prompts.py
PLACEHOLDER_VALUE = "__PLACEHOLDER__"
# Variable reference pattern: {{#node_id.field#}}
VARIABLE_REF_PATTERN = re.compile(r"\{\{#([^.#]+)\.([^#]+)#\}\}")
def is_placeholder(value: Any) -> bool:
"""Check if a value appears to be a placeholder."""
if not isinstance(value, str):
return False
return value == PLACEHOLDER_VALUE or PLACEHOLDER_VALUE in value
def extract_variable_refs(text: str) -> list[tuple[str, str]]:
"""
Extract variable references from text.
Returns list of (node_id, field_name) tuples.
"""
return VARIABLE_REF_PATTERN.findall(text)
def check_required_field(
config: dict[str, Any],
field_name: str,
node_id: str,
node_type: str,
rule_id: str,
fix_hint: str = "",
) -> ValidationError | None:
"""Helper to check if a required field exists and is non-empty."""
value = config.get(field_name)
if value is None or value == "" or (isinstance(value, list) and len(value) == 0):
return ValidationError(
rule_id=rule_id,
node_id=node_id,
node_type=node_type,
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': missing required field '{field_name}'",
fix_hint=fix_hint or f"Add '{field_name}' to the node config",
)
return None
# =============================================================================
# Structure Rules - Field existence, types, formats
# =============================================================================
def _check_llm_prompt_template(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that LLM node has prompt_template."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
config = node.get("config", {})
err = check_required_field(
config,
"prompt_template",
node_id,
"llm",
"llm.prompt_template.required",
"Add prompt_template with system and user messages",
)
if err:
errors.append(err)
return errors
def _check_http_request_url(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that http-request node has url and method."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
config = node.get("config", {})
# Check url
url = config.get("url", "")
if not url:
errors.append(
ValidationError(
rule_id="http.url.required",
node_id=node_id,
node_type="http-request",
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': http-request missing required 'url'",
fix_hint="Add url - use {{#start.url#}} or a concrete URL",
)
)
elif is_placeholder(url):
errors.append(
ValidationError(
rule_id="http.url.placeholder",
node_id=node_id,
node_type="http-request",
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': url contains placeholder value",
fix_hint="Replace placeholder with actual URL or variable reference",
)
)
# Check method
method = config.get("method", "")
if not method:
errors.append(
ValidationError(
rule_id="http.method.required",
node_id=node_id,
node_type="http-request",
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': http-request missing 'method'",
fix_hint="Add method: GET, POST, PUT, DELETE, or PATCH",
)
)
return errors
def _check_code_node(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that code node has code and language."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
config = node.get("config", {})
err = check_required_field(
config,
"code",
node_id,
"code",
"code.code.required",
"Add code with a main() function that returns a dict",
)
if err:
errors.append(err)
err = check_required_field(
config,
"language",
node_id,
"code",
"code.language.required",
"Add language: python3 or javascript",
)
if err:
errors.append(err)
return errors
def _check_question_classifier(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that question-classifier has classes."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
config = node.get("config", {})
err = check_required_field(
config,
"classes",
node_id,
"question-classifier",
"classifier.classes.required",
"Add classes array with id and name for each classification",
)
if err:
errors.append(err)
return errors
def _check_parameter_extractor(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that parameter-extractor has parameters and instruction."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
config = node.get("config", {})
err = check_required_field(
config,
"parameters",
node_id,
"parameter-extractor",
"extractor.parameters.required",
"Add parameters array with name, type, description fields",
)
if err:
errors.append(err)
else:
# Check individual parameters for required fields
parameters = config.get("parameters", [])
if isinstance(parameters, list):
for i, param in enumerate(parameters):
if isinstance(param, dict):
# Check for 'required' field (boolean)
if "required" not in param:
errors.append(
ValidationError(
rule_id="extractor.param.required_field.missing",
node_id=node_id,
node_type="parameter-extractor",
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': parameter[{i}] missing 'required' field",
fix_hint=f"Add 'required': True to parameter '{param.get('name', 'unknown')}'",
details={"param_index": i, "param_name": param.get("name")},
)
)
# instruction is recommended but not strictly required
if not config.get("instruction"):
errors.append(
ValidationError(
rule_id="extractor.instruction.recommended",
node_id=node_id,
node_type="parameter-extractor",
category=RuleCategory.STRUCTURE,
severity=Severity.WARNING,
is_fixable=True,
message=f"Node '{node_id}': parameter-extractor should have 'instruction'",
fix_hint="Add instruction describing what to extract",
)
)
return errors
def _check_knowledge_retrieval(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that knowledge-retrieval has dataset_ids."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
config = node.get("config", {})
dataset_ids = config.get("dataset_ids", [])
if not dataset_ids:
errors.append(
ValidationError(
rule_id="knowledge.dataset.required",
node_id=node_id,
node_type="knowledge-retrieval",
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=False, # User must select knowledge base
message=f"Node '{node_id}': knowledge-retrieval missing 'dataset_ids'",
fix_hint="User must select knowledge bases in the UI",
)
)
else:
# Check for placeholder values
for ds_id in dataset_ids:
if is_placeholder(ds_id):
errors.append(
ValidationError(
rule_id="knowledge.dataset.placeholder",
node_id=node_id,
node_type="knowledge-retrieval",
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=False,
message=f"Node '{node_id}': dataset_ids contains placeholder",
fix_hint="User must replace placeholder with actual knowledge base ID",
details={"placeholder_value": ds_id},
)
)
break
return errors
def _check_end_node(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that end node has outputs defined."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
config = node.get("config", {})
outputs = config.get("outputs", [])
if not outputs:
errors.append(
ValidationError(
rule_id="end.outputs.recommended",
node_id=node_id,
node_type="end",
category=RuleCategory.STRUCTURE,
severity=Severity.WARNING,
is_fixable=True,
message="End node should define output variables",
fix_hint="Add outputs array with variable and value_selector",
)
)
return errors
# =============================================================================
# Semantic Rules - Variable references, edge connections
# =============================================================================
def _check_variable_references(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that variable references point to valid nodes."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
node_type = node.get("type", "unknown")
config = node.get("config", {})
# Get all valid node IDs (including 'start' which is always valid)
valid_node_ids = ctx.get_node_ids()
valid_node_ids.add("start")
valid_node_ids.add("sys") # System variables
def check_text_for_refs(text: str, field_path: str) -> None:
if not isinstance(text, str):
return
refs = extract_variable_refs(text)
for ref_node_id, ref_field in refs:
if ref_node_id not in valid_node_ids:
errors.append(
ValidationError(
rule_id="variable.ref.invalid_node",
node_id=node_id,
node_type=node_type,
category=RuleCategory.SEMANTIC,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': references non-existent node '{ref_node_id}'",
fix_hint=f"Change {{{{#{ref_node_id}.{ref_field}#}}}} to reference a valid node",
details={"field_path": field_path, "invalid_ref": ref_node_id},
)
)
# Check prompt_template for LLM nodes
prompt_template = config.get("prompt_template", [])
if isinstance(prompt_template, list):
for i, msg in enumerate(prompt_template):
if isinstance(msg, dict):
text = msg.get("text", "")
check_text_for_refs(text, f"prompt_template[{i}].text")
# Check instruction field
instruction = config.get("instruction", "")
check_text_for_refs(instruction, "instruction")
# Check url for http-request
url = config.get("url", "")
check_text_for_refs(url, "url")
return errors
# NOTE: _check_node_has_outgoing_edge removed - handled by GraphValidator
# NOTE: _check_node_has_incoming_edge removed - handled by GraphValidator
# NOTE: _check_question_classifier_branches removed - handled by EdgeRepair
# NOTE: _check_if_else_branches removed - handled by EdgeRepair
def _check_if_else_operators(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that if-else comparison operators are valid."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
node_type = node.get("type", "unknown")
if node_type != "if-else":
return errors
valid_operators = {
"contains",
"not contains",
"start with",
"end with",
"is",
"is not",
"empty",
"not empty",
"in",
"not in",
"all of",
"=",
"",
">",
"<",
"",
"",
"null",
"not null",
"exists",
"not exists",
}
config = node.get("config", {})
cases = config.get("cases", [])
for case in cases:
conditions = case.get("conditions", [])
for condition in conditions:
op = condition.get("comparison_operator")
if op and op not in valid_operators:
errors.append(
ValidationError(
rule_id="ifelse.operator.invalid",
node_id=node_id,
node_type=node_type,
category=RuleCategory.SEMANTIC,
severity=Severity.ERROR,
is_fixable=True,
message=f"Invalid operator '{op}' in if-else node",
fix_hint=f"Use one of: {', '.join(sorted(valid_operators))}",
details={"invalid_operator": op, "field": "config.cases.conditions.comparison_operator"},
)
)
return errors
def _check_edge_targets_exist(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that edge targets reference existing nodes."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
node_type = node.get("type", "unknown")
valid_node_ids = ctx.get_node_ids()
# Check all outgoing edges from this node
for edge in ctx.edges:
if edge.get("source") == node_id:
target = edge.get("target")
if target and target not in valid_node_ids:
errors.append(
ValidationError(
rule_id="edge.target.invalid",
node_id=node_id,
node_type=node_type,
category=RuleCategory.SEMANTIC,
severity=Severity.ERROR,
is_fixable=True,
message=f"Edge from '{node_id}' targets non-existent node '{target}'",
fix_hint=f"Change edge target from '{target}' to an existing node",
details={"invalid_target": target, "field": "edges"},
)
)
return errors
# =============================================================================
# Reference Rules - External resources (models, tools, datasets)
# =============================================================================
# Node types that require model configuration
MODEL_REQUIRED_NODE_TYPES = {"llm", "question-classifier", "parameter-extractor"}
def _check_model_config(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that model configuration is valid."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
node_type = node.get("type", "unknown")
config = node.get("config", {})
if node_type not in MODEL_REQUIRED_NODE_TYPES:
return errors
model = config.get("model")
# Check if model config exists
if not model:
if ctx.available_models:
errors.append(
ValidationError(
rule_id="model.required",
node_id=node_id,
node_type=node_type,
category=RuleCategory.REFERENCE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}' ({node_type}): missing required 'model' configuration",
fix_hint="Add model config using one of the available models",
)
)
else:
errors.append(
ValidationError(
rule_id="model.no_available",
node_id=node_id,
node_type=node_type,
category=RuleCategory.REFERENCE,
severity=Severity.ERROR,
is_fixable=False,
message=f"Node '{node_id}' ({node_type}): needs model but no models available",
fix_hint="User must configure a model provider first",
)
)
return errors
# Check if model config is valid
if isinstance(model, dict):
provider = model.get("provider", "")
name = model.get("name", "")
# Check for placeholder values
if is_placeholder(provider) or is_placeholder(name):
if ctx.available_models:
errors.append(
ValidationError(
rule_id="model.placeholder",
node_id=node_id,
node_type=node_type,
category=RuleCategory.REFERENCE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': model config contains placeholder",
fix_hint="Replace placeholder with actual model from available_models",
)
)
return errors
# Check if model exists in available_models
if ctx.available_models and provider and name:
if not ctx.has_model(provider, name):
errors.append(
ValidationError(
rule_id="model.not_found",
node_id=node_id,
node_type=node_type,
category=RuleCategory.REFERENCE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': model '{provider}/{name}' not in available models",
fix_hint="Replace with a model from available_models",
details={"provider": provider, "model": name},
)
)
return errors
def _check_tool_reference(node: WorkflowNodeDict, ctx: "ValidationContext") -> list[ValidationError]:
"""Check that tool references are valid and configured."""
errors: list[ValidationError] = []
node_id = node.get("id", "unknown")
node_type = node.get("type", "unknown")
if node_type != "tool":
return errors
config = node.get("config", {})
tool_ref = (
config.get("tool_key")
or config.get("tool_name")
or config.get("provider_id", "") + "/" + config.get("tool_name", "")
)
if not tool_ref:
errors.append(
ValidationError(
rule_id="tool.key.required",
node_id=node_id,
node_type=node_type,
category=RuleCategory.REFERENCE,
severity=Severity.ERROR,
is_fixable=True,
message=f"Node '{node_id}': tool node missing tool_key",
fix_hint="Add tool_key from available_tools",
)
)
return errors
# Check if tool exists
if not ctx.has_tool(tool_ref):
errors.append(
ValidationError(
rule_id="tool.not_found",
node_id=node_id,
node_type=node_type,
category=RuleCategory.REFERENCE,
severity=Severity.ERROR,
is_fixable=True, # Can be replaced with http-request fallback
message=f"Node '{node_id}': tool '{tool_ref}' not found",
fix_hint="Use http-request or code node as fallback",
details={"tool_ref": tool_ref},
)
)
elif not ctx.is_tool_configured(tool_ref):
errors.append(
ValidationError(
rule_id="tool.not_configured",
node_id=node_id,
node_type=node_type,
category=RuleCategory.REFERENCE,
severity=Severity.WARNING,
is_fixable=False, # User needs to configure
message=f"Node '{node_id}': tool '{tool_ref}' requires configuration",
fix_hint="Configure the tool in Tools settings",
details={"tool_ref": tool_ref},
)
)
return errors
# =============================================================================
# Register All Rules
# =============================================================================
# Structure Rules
register_rule(
ValidationRule(
id="llm.prompt_template.required",
node_types=["llm"],
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
check=_check_llm_prompt_template,
description="LLM node must have prompt_template",
fix_hint="Add prompt_template with system and user messages",
)
)
register_rule(
ValidationRule(
id="http.config.required",
node_types=["http-request"],
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
check=_check_http_request_url,
description="HTTP request node must have url and method",
fix_hint="Add url and method to config",
)
)
register_rule(
ValidationRule(
id="code.config.required",
node_types=["code"],
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
check=_check_code_node,
description="Code node must have code and language",
fix_hint="Add code with main() function and language",
)
)
register_rule(
ValidationRule(
id="classifier.classes.required",
node_types=["question-classifier"],
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
check=_check_question_classifier,
description="Question classifier must have classes",
fix_hint="Add classes array with classification options",
)
)
register_rule(
ValidationRule(
id="extractor.config.required",
node_types=["parameter-extractor"],
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=True,
check=_check_parameter_extractor,
description="Parameter extractor must have parameters",
fix_hint="Add parameters array",
)
)
register_rule(
ValidationRule(
id="knowledge.config.required",
node_types=["knowledge-retrieval"],
category=RuleCategory.STRUCTURE,
severity=Severity.ERROR,
is_fixable=False,
check=_check_knowledge_retrieval,
description="Knowledge retrieval must have dataset_ids",
fix_hint="User must select knowledge base",
)
)
register_rule(
ValidationRule(
id="end.outputs.check",
node_types=["end"],
category=RuleCategory.STRUCTURE,
severity=Severity.WARNING,
is_fixable=True,
check=_check_end_node,
description="End node should have outputs",
fix_hint="Add outputs array",
)
)
# Semantic Rules
register_rule(
ValidationRule(
id="variable.references.valid",
node_types=["*"],
category=RuleCategory.SEMANTIC,
severity=Severity.ERROR,
is_fixable=True,
check=_check_variable_references,
description="Variable references must point to valid nodes",
fix_hint="Fix variable reference to use valid node ID",
)
)
# Edge Validation Rules
# NOTE: Edge connectivity and branch completeness are now handled by:
# - GraphValidator (BFS-based reachability analysis)
# - EdgeRepair (automatic branch edge repair)
register_rule(
ValidationRule(
id="edge.targets.valid",
node_types=["*"],
category=RuleCategory.SEMANTIC,
severity=Severity.ERROR,
is_fixable=True,
check=_check_edge_targets_exist,
description="Edge targets must reference existing nodes",
fix_hint="Change edge target to an existing node ID",
)
)
# Reference Rules
register_rule(
ValidationRule(
id="model.config.valid",
node_types=["llm", "question-classifier", "parameter-extractor"],
category=RuleCategory.REFERENCE,
severity=Severity.ERROR,
is_fixable=True,
check=_check_model_config,
description="Model configuration must be valid",
fix_hint="Add valid model from available_models",
)
)
register_rule(
ValidationRule(
id="tool.reference.valid",
node_types=["tool"],
category=RuleCategory.REFERENCE,
severity=Severity.ERROR,
is_fixable=True,
check=_check_tool_reference,
description="Tool reference must be valid and configured",
fix_hint="Use valid tool or fallback node",
)
)
register_rule(
ValidationRule(
id="ifelse.operator.valid",
node_types=["if-else"],
category=RuleCategory.SEMANTIC,
severity=Severity.ERROR,
is_fixable=True,
check=_check_if_else_operators,
description="If-else operators must be valid",
fix_hint="Use standard operators like ≥, ≤, =, ≠",
)
)

View File

@@ -0,0 +1,434 @@
"""
Unit tests for the Vibe Workflow Validator.
Tests cover:
- Basic validation function
- User-friendly validation hints
- Edge cases and error handling
"""
from core.workflow.generator.utils.workflow_validator import ValidationHint, WorkflowValidator
class TestValidationHint:
"""Tests for ValidationHint dataclass."""
def test_hint_creation(self):
"""Test creating a validation hint."""
hint = ValidationHint(
node_id="llm_1",
field="model",
message="Model is not configured",
severity="error",
)
assert hint.node_id == "llm_1"
assert hint.field == "model"
assert hint.message == "Model is not configured"
assert hint.severity == "error"
def test_hint_with_suggestion(self):
"""Test hint with suggestion."""
hint = ValidationHint(
node_id="http_1",
field="url",
message="URL is required",
severity="error",
suggestion="Add a valid URL like https://api.example.com",
)
assert hint.suggestion is not None
class TestWorkflowValidatorBasic:
"""Tests for basic validation scenarios."""
def test_empty_workflow_is_valid(self):
"""Test empty workflow passes validation."""
workflow_data = {"nodes": [], "edges": []}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
# Empty but valid structure
assert is_valid is True
assert len(hints) == 0
def test_minimal_valid_workflow(self):
"""Test minimal Start → End workflow."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{"id": "end", "type": "end", "config": {}},
],
"edges": [{"source": "start", "target": "end"}],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
assert is_valid is True
def test_complete_workflow_with_llm(self):
"""Test complete workflow with LLM node."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {"variables": []}},
{
"id": "llm",
"type": "llm",
"config": {
"model": {"provider": "openai", "name": "gpt-4"},
"prompt_template": [{"role": "user", "text": "Hello"}],
},
},
{"id": "end", "type": "end", "config": {"outputs": []}},
],
"edges": [
{"source": "start", "target": "llm"},
{"source": "llm", "target": "end"},
],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
# Should pass with no critical errors
errors = [h for h in hints if h.severity == "error"]
assert len(errors) == 0
class TestVariableReferenceValidation:
"""Tests for variable reference validation."""
def test_valid_variable_reference(self):
"""Test valid variable reference passes."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{
"id": "llm",
"type": "llm",
"config": {"prompt_template": [{"role": "user", "text": "Query: {{#start.query#}}"}]},
},
],
"edges": [{"source": "start", "target": "llm"}],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
ref_errors = [h for h in hints if "reference" in h.message.lower()]
assert len(ref_errors) == 0
def test_invalid_variable_reference(self):
"""Test invalid variable reference generates hint."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{
"id": "llm",
"type": "llm",
"config": {"prompt_template": [{"role": "user", "text": "{{#nonexistent.field#}}"}]},
},
],
"edges": [{"source": "start", "target": "llm"}],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
# Should have a hint about invalid reference
ref_hints = [h for h in hints if "nonexistent" in h.message or "reference" in h.message.lower()]
assert len(ref_hints) >= 1
class TestEdgeValidation:
"""Tests for edge validation."""
def test_edge_with_invalid_source(self):
"""Test edge with non-existent source generates hint."""
workflow_data = {
"nodes": [{"id": "end", "type": "end", "config": {}}],
"edges": [{"source": "nonexistent", "target": "end"}],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
# Should have hint about invalid edge
edge_hints = [h for h in hints if "edge" in h.message.lower() or "source" in h.message.lower()]
assert len(edge_hints) >= 1
def test_edge_with_invalid_target(self):
"""Test edge with non-existent target generates hint."""
workflow_data = {
"nodes": [{"id": "start", "type": "start", "config": {}}],
"edges": [{"source": "start", "target": "nonexistent"}],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
edge_hints = [h for h in hints if "edge" in h.message.lower() or "target" in h.message.lower()]
assert len(edge_hints) >= 1
class TestToolValidation:
"""Tests for tool node validation."""
def test_tool_node_found_in_available(self):
"""Test tool node that exists in available tools."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{
"id": "tool1",
"type": "tool",
"config": {"tool_key": "google/search"},
},
{"id": "end", "type": "end", "config": {}},
],
"edges": [{"source": "start", "target": "tool1"}, {"source": "tool1", "target": "end"}],
}
available_tools = [{"provider_id": "google", "tool_key": "search", "is_team_authorization": True}]
is_valid, hints = WorkflowValidator.validate(workflow_data, available_tools)
tool_errors = [h for h in hints if h.severity == "error" and "tool" in h.message.lower()]
assert len(tool_errors) == 0
def test_tool_node_not_found(self):
"""Test tool node not in available tools generates hint."""
workflow_data = {
"nodes": [
{
"id": "tool1",
"type": "tool",
"config": {"tool_key": "unknown/tool"},
}
],
"edges": [],
}
available_tools = []
is_valid, hints = WorkflowValidator.validate(workflow_data, available_tools)
tool_hints = [h for h in hints if "tool" in h.message.lower()]
assert len(tool_hints) >= 1
class TestQuestionClassifierValidation:
"""Tests for question-classifier node validation."""
def test_question_classifier_with_classes(self):
"""Test question-classifier with valid classes."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{
"id": "classifier",
"type": "question-classifier",
"config": {
"classes": [
{"id": "class1", "name": "Class 1"},
{"id": "class2", "name": "Class 2"},
],
"model": {"provider": "openai", "name": "gpt-4", "mode": "chat"},
},
},
{"id": "h1", "type": "llm", "config": {}},
{"id": "h2", "type": "llm", "config": {}},
{"id": "end", "type": "end", "config": {}},
],
"edges": [
{"source": "start", "target": "classifier"},
{"source": "classifier", "sourceHandle": "class1", "target": "h1"},
{"source": "classifier", "sourceHandle": "class2", "target": "h2"},
{"source": "h1", "target": "end"},
{"source": "h2", "target": "end"},
],
}
available_models = [{"provider": "openai", "model": "gpt-4", "mode": "chat"}]
is_valid, hints = WorkflowValidator.validate(workflow_data, [], available_models=available_models)
class_errors = [h for h in hints if "class" in h.message.lower() and h.severity == "error"]
assert len(class_errors) == 0
def test_question_classifier_missing_classes(self):
"""Test question-classifier without classes generates hint."""
workflow_data = {
"nodes": [
{
"id": "classifier",
"type": "question-classifier",
"config": {"model": {"provider": "openai", "name": "gpt-4", "mode": "chat"}},
}
],
"edges": [],
}
available_models = [{"provider": "openai", "model": "gpt-4", "mode": "chat"}]
is_valid, hints = WorkflowValidator.validate(workflow_data, [], available_models=available_models)
# Should have hint about missing classes
class_hints = [h for h in hints if "class" in h.message.lower()]
assert len(class_hints) >= 1
class TestHttpRequestValidation:
"""Tests for HTTP request node validation."""
def test_http_request_with_url(self):
"""Test HTTP request with valid URL."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{
"id": "http",
"type": "http-request",
"config": {"url": "https://api.example.com", "method": "GET"},
},
{"id": "end", "type": "end", "config": {}},
],
"edges": [{"source": "start", "target": "http"}, {"source": "http", "target": "end"}],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
url_errors = [h for h in hints if "url" in h.message.lower() and h.severity == "error"]
assert len(url_errors) == 0
def test_http_request_missing_url(self):
"""Test HTTP request without URL generates hint."""
workflow_data = {
"nodes": [
{
"id": "http",
"type": "http-request",
"config": {"method": "GET"},
}
],
"edges": [],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
url_hints = [h for h in hints if "url" in h.message.lower()]
assert len(url_hints) >= 1
class TestParameterExtractorValidation:
"""Tests for parameter-extractor node validation."""
def test_parameter_extractor_valid_params(self):
"""Test parameter-extractor with valid parameters."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{
"id": "extractor",
"type": "parameter-extractor",
"config": {
"instruction": "Extract info",
"parameters": [
{
"name": "name",
"type": "string",
"description": "Name",
"required": True,
}
],
"model": {"provider": "openai", "name": "gpt-4", "mode": "chat"},
},
},
{"id": "end", "type": "end", "config": {}},
],
"edges": [{"source": "start", "target": "extractor"}, {"source": "extractor", "target": "end"}],
}
available_models = [{"provider": "openai", "model": "gpt-4", "mode": "chat"}]
is_valid, hints = WorkflowValidator.validate(workflow_data, [], available_models=available_models)
errors = [h for h in hints if h.severity == "error"]
assert len(errors) == 0
def test_parameter_extractor_missing_required_field(self):
"""Test parameter-extractor missing 'required' field in parameter item."""
workflow_data = {
"nodes": [
{
"id": "extractor",
"type": "parameter-extractor",
"config": {
"instruction": "Extract info",
"parameters": [
{
"name": "name",
"type": "string",
"description": "Name",
# Missing 'required'
}
],
"model": {"provider": "openai", "name": "gpt-4", "mode": "chat"},
},
}
],
"edges": [],
}
available_models = [{"provider": "openai", "model": "gpt-4", "mode": "chat"}]
is_valid, hints = WorkflowValidator.validate(workflow_data, [], available_models=available_models)
errors = [h for h in hints if "required" in h.message and h.severity == "error"]
assert len(errors) >= 1
assert "parameter-extractor" in errors[0].node_type
class TestIfElseValidation:
"""Tests for if-else node validation."""
def test_if_else_valid_operators(self):
"""Test if-else with valid operators."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{
"id": "ifelse",
"type": "if-else",
"config": {
"cases": [{"case_id": "c1", "conditions": [{"comparison_operator": "", "value": "1"}]}]
},
},
{"id": "t", "type": "llm", "config": {}},
{"id": "f", "type": "llm", "config": {}},
{"id": "end", "type": "end", "config": {}},
],
"edges": [
{"source": "start", "target": "ifelse"},
{"source": "ifelse", "sourceHandle": "true", "target": "t"},
{"source": "ifelse", "sourceHandle": "false", "target": "f"},
{"source": "t", "target": "end"},
{"source": "f", "target": "end"},
],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
errors = [h for h in hints if h.severity == "error"]
# Filter out LLM model errors if any (available tools/models check might trigger)
# (actually available_models empty list might trigger model error?
# No, model config validation skips if model field not present? No, LLM has model config.
# But logic skips check if key missing? Let's check logic.
# _check_model_config checks if provider/name match available. If available is empty, it fails.
# But wait, validate default available_models is None?
# I should provide mock available_models or ignore model errors.
# Actually LLM node "config": {} implies missing model config. Rules check if config structure is valid?
# Let's filter specifically for operator errors.
operator_errors = [h for h in errors if "operator" in h.message]
assert len(operator_errors) == 0
def test_if_else_invalid_operators(self):
"""Test if-else with invalid operators."""
workflow_data = {
"nodes": [
{"id": "start", "type": "start", "config": {}},
{
"id": "ifelse",
"type": "if-else",
"config": {
"cases": [{"case_id": "c1", "conditions": [{"comparison_operator": ">=", "value": "1"}]}]
},
},
{"id": "t", "type": "llm", "config": {}},
{"id": "f", "type": "llm", "config": {}},
{"id": "end", "type": "end", "config": {}},
],
"edges": [
{"source": "start", "target": "ifelse"},
{"source": "ifelse", "sourceHandle": "true", "target": "t"},
{"source": "ifelse", "sourceHandle": "false", "target": "f"},
{"source": "t", "target": "end"},
{"source": "f", "target": "end"},
],
}
is_valid, hints = WorkflowValidator.validate(workflow_data, [])
operator_errors = [h for h in hints if "operator" in h.message and h.severity == "error"]
assert len(operator_errors) > 0
assert "" in operator_errors[0].suggestion

View File

@@ -0,0 +1,189 @@
import { isInWorkflowPage, VIBE_COMMAND_EVENT } from '@/app/components/workflow/constants'
import i18n from '@/i18n-config/i18next-config'
import { bananaCommand } from './banana'
import { registerCommands, unregisterCommands } from './command-bus'
vi.mock('@/i18n-config/i18next-config', () => ({
default: {
t: vi.fn((key: string, options?: Record<string, unknown>) => {
if (!options)
return key
return `${key}:${JSON.stringify(options)}`
}),
},
}))
vi.mock('@/app/components/workflow/constants', async () => {
const actual = await vi.importActual<typeof import('@/app/components/workflow/constants')>(
'@/app/components/workflow/constants',
)
return {
...actual,
isInWorkflowPage: vi.fn(),
}
})
vi.mock('./command-bus', () => ({
registerCommands: vi.fn(),
unregisterCommands: vi.fn(),
}))
const mockedIsInWorkflowPage = vi.mocked(isInWorkflowPage)
const mockedRegisterCommands = vi.mocked(registerCommands)
const mockedUnregisterCommands = vi.mocked(unregisterCommands)
const mockedT = vi.mocked(i18n.t)
type CommandArgs = { dsl?: string }
type CommandMap = Record<string, (args?: CommandArgs) => void | Promise<void>>
beforeEach(() => {
vi.clearAllMocks()
})
// Command availability, search, and registration behavior for banana command.
describe('bananaCommand', () => {
// Command metadata mirrors the static definition.
describe('metadata', () => {
it('should expose name, mode, and description', () => {
// Assert
expect(bananaCommand.name).toBe('banana')
expect(bananaCommand.mode).toBe('submenu')
expect(bananaCommand.description).toContain('app.gotoAnything.actions.vibeDesc')
})
})
// Availability mirrors workflow page detection.
describe('availability', () => {
it('should return true when on workflow page', () => {
// Arrange
mockedIsInWorkflowPage.mockReturnValue(true)
// Act
const available = bananaCommand.isAvailable?.()
// Assert
expect(available).toBe(true)
expect(mockedIsInWorkflowPage).toHaveBeenCalledTimes(1)
})
it('should return false when not on workflow page', () => {
// Arrange
mockedIsInWorkflowPage.mockReturnValue(false)
// Act
const available = bananaCommand.isAvailable?.()
// Assert
expect(available).toBe(false)
expect(mockedIsInWorkflowPage).toHaveBeenCalledTimes(1)
})
})
// Search results depend on provided arguments.
describe('search', () => {
it('should return hint description when args are empty', async () => {
// Arrange
mockedIsInWorkflowPage.mockReturnValue(true)
// Act
const result = await bananaCommand.search(' ')
// Assert
expect(result).toHaveLength(1)
const [item] = result
expect(item.description).toContain('app.gotoAnything.actions.vibeHint')
expect(item.data?.args?.dsl).toBe('')
expect(item.data?.command).toBe('workflow.vibe')
expect(mockedT).toHaveBeenCalledWith(
'app.gotoAnything.actions.vibeTitle',
expect.objectContaining({ lng: 'en' }),
)
expect(mockedT).toHaveBeenCalledWith(
'app.gotoAnything.actions.vibeHint',
expect.objectContaining({ prompt: expect.any(String), lng: 'en' }),
)
})
it('should return default description when args are provided', async () => {
// Arrange
mockedIsInWorkflowPage.mockReturnValue(true)
// Act
const result = await bananaCommand.search(' make a flow ', 'fr')
// Assert
expect(result).toHaveLength(1)
const [item] = result
expect(item.description).toContain('app.gotoAnything.actions.vibeDesc')
expect(item.data?.args?.dsl).toBe('make a flow')
expect(item.data?.command).toBe('workflow.vibe')
expect(mockedT).toHaveBeenCalledWith(
'app.gotoAnything.actions.vibeTitle',
expect.objectContaining({ lng: 'fr' }),
)
expect(mockedT).toHaveBeenCalledWith(
'app.gotoAnything.actions.vibeDesc',
expect.objectContaining({ lng: 'fr' }),
)
})
it('should fall back to Banana when title translation is empty', async () => {
// Arrange
mockedIsInWorkflowPage.mockReturnValue(true)
mockedT.mockImplementationOnce(() => '')
// Act
const result = await bananaCommand.search('make a plan')
// Assert
expect(result).toHaveLength(1)
expect(result[0]?.title).toBe('Banana')
})
})
// Command registration and event dispatching.
describe('registration', () => {
it('should register the workflow vibe command', () => {
// Act
expect(bananaCommand.register).toBeDefined()
bananaCommand.register?.({})
// Assert
expect(mockedRegisterCommands).toHaveBeenCalledTimes(1)
const commands = mockedRegisterCommands.mock.calls[0]?.[0] as CommandMap
expect(commands['workflow.vibe']).toEqual(expect.any(Function))
})
it('should dispatch vibe event when command handler runs', async () => {
// Arrange
const dispatchSpy = vi.spyOn(document, 'dispatchEvent')
expect(bananaCommand.register).toBeDefined()
bananaCommand.register?.({})
expect(mockedRegisterCommands).toHaveBeenCalledTimes(1)
const commands = mockedRegisterCommands.mock.calls[0]?.[0] as CommandMap
try {
// Act
await commands['workflow.vibe']?.({ dsl: 'hello' })
// Assert
expect(dispatchSpy).toHaveBeenCalledTimes(1)
const event = dispatchSpy.mock.calls[0][0] as CustomEvent
expect(event.type).toBe(VIBE_COMMAND_EVENT)
expect(event.detail).toEqual({ dsl: 'hello' })
}
finally {
dispatchSpy.mockRestore()
}
})
it('should unregister workflow vibe command', () => {
// Act
expect(bananaCommand.unregister).toBeDefined()
bananaCommand.unregister?.()
// Assert
expect(mockedUnregisterCommands).toHaveBeenCalledWith(['workflow.vibe'])
})
})
})

View File

@@ -0,0 +1,59 @@
import type { SlashCommandHandler } from './types'
import { RiSparklingFill } from '@remixicon/react'
import * as React from 'react'
import { getI18n } from 'react-i18next'
import { isInWorkflowPage, VIBE_COMMAND_EVENT } from '@/app/components/workflow/constants'
import { registerCommands, unregisterCommands } from './command-bus'
type BananaDeps = Record<string, never>
const BANANA_PROMPT_EXAMPLE = 'Summarize a document, classify sentiment, then notify Slack'
const dispatchVibeCommand = (input?: string) => {
if (typeof document === 'undefined')
return
document.dispatchEvent(new CustomEvent(VIBE_COMMAND_EVENT, { detail: { dsl: input } }))
}
export const bananaCommand: SlashCommandHandler<BananaDeps> = {
name: 'banana',
description: getI18n().t('gotoAnything.actions.vibeDesc', { ns: 'app' }),
mode: 'submenu',
isAvailable: () => isInWorkflowPage(),
async search(args: string, locale: string = 'en') {
const trimmed = args.trim()
const hasInput = !!trimmed
return [{
id: 'banana-vibe',
title: getI18n().t('gotoAnything.actions.vibeTitle', { ns: 'app', lng: locale }) || 'Banana',
description: hasInput
? getI18n().t('gotoAnything.actions.vibeDesc', { ns: 'app', lng: locale })
: getI18n().t('gotoAnything.actions.vibeHint', { ns: 'app', lng: locale, prompt: BANANA_PROMPT_EXAMPLE }),
type: 'command' as const,
icon: (
<div className="flex h-6 w-6 items-center justify-center rounded-md border-[0.5px] border-divider-regular bg-components-panel-bg">
<RiSparklingFill className="h-4 w-4 text-text-tertiary" />
</div>
),
data: {
command: 'workflow.vibe',
args: { dsl: trimmed },
},
}]
},
register(_deps: BananaDeps) {
registerCommands({
'workflow.vibe': async (args) => {
dispatchVibeCommand(args?.dsl)
},
})
},
unregister() {
unregisterCommands(['workflow.vibe'])
},
}

View File

@@ -5,6 +5,7 @@ import { useEffect } from 'react'
import { getI18n } from 'react-i18next'
import { setLocaleOnClient } from '@/i18n-config'
import { accountCommand } from './account'
import { bananaCommand } from './banana'
import { executeCommand } from './command-bus'
import { communityCommand } from './community'
import { docsCommand } from './docs'
@@ -43,6 +44,7 @@ export const registerSlashCommands = (deps: Record<string, any>) => {
slashCommandRegistry.register(communityCommand, {})
slashCommandRegistry.register(accountCommand, {})
slashCommandRegistry.register(zenCommand, {})
slashCommandRegistry.register(bananaCommand, {})
}
export const unregisterSlashCommands = () => {
@@ -54,6 +56,7 @@ export const unregisterSlashCommands = () => {
slashCommandRegistry.unregister('community')
slashCommandRegistry.unregister('account')
slashCommandRegistry.unregister('zen')
slashCommandRegistry.unregister('banana')
}
export const SlashCommandProvider = () => {

View File

@@ -0,0 +1,59 @@
import type { SlashCommandHandler } from './types'
import { RiSparklingFill } from '@remixicon/react'
import * as React from 'react'
import { getI18n } from 'react-i18next'
import { isInWorkflowPage, VIBE_COMMAND_EVENT } from '@/app/components/workflow/constants'
import { registerCommands, unregisterCommands } from './command-bus'
type VibeDeps = Record<string, never>
const VIBE_PROMPT_EXAMPLE = 'Summarize a document, classify sentiment, then notify Slack'
const dispatchVibeCommand = (input?: string) => {
if (typeof document === 'undefined')
return
document.dispatchEvent(new CustomEvent(VIBE_COMMAND_EVENT, { detail: { dsl: input } }))
}
export const vibeCommand: SlashCommandHandler<VibeDeps> = {
name: 'vibe',
description: getI18n().t('gotoAnything.actions.vibeDesc', { ns: 'app' }),
mode: 'submenu',
isAvailable: () => isInWorkflowPage(),
async search(args: string, locale: string = 'en') {
const trimmed = args.trim()
const hasInput = !!trimmed
return [{
id: 'vibe',
title: getI18n().t('gotoAnything.actions.vibeTitle', { ns: 'app', lng: locale }) || 'Vibe',
description: hasInput
? getI18n().t('gotoAnything.actions.vibeDesc', { ns: 'app', lng: locale })
: getI18n().t('gotoAnything.actions.vibeHint', { ns: 'app', lng: locale, prompt: VIBE_PROMPT_EXAMPLE }),
type: 'command' as const,
icon: (
<div className="flex h-6 w-6 items-center justify-center rounded-md border-[0.5px] border-divider-regular bg-components-panel-bg">
<RiSparklingFill className="h-4 w-4 text-text-tertiary" />
</div>
),
data: {
command: 'workflow.vibe',
args: { dsl: trimmed },
},
}]
},
register(_deps: VibeDeps) {
registerCommands({
'workflow.vibe': async (args) => {
dispatchVibeCommand(args?.dsl)
},
})
},
unregister() {
unregisterCommands(['workflow.vibe'])
},
}

View File

@@ -160,7 +160,7 @@
* - `@knowledge` / `@kb` - Search knowledge bases
* - `@plugin` - Search plugins
* - `@node` - Search workflow nodes (workflow pages only)
* - `/` - Execute slash commands (theme, language, etc.)
* - `/` - Execute slash commands (theme, language, banana, etc.)
*/
import type { ActionItem, SearchResult } from './types'

View File

@@ -116,6 +116,7 @@ const CommandSelector: FC<Props> = ({ actions, onCommandSelect, searchFilter, co
'/docs': 'gotoAnything.actions.docDesc',
'/community': 'gotoAnything.actions.communityDesc',
'/zen': 'gotoAnything.actions.zenDesc',
'/banana': 'gotoAnything.actions.vibeDesc',
} as const
return t(slashKeyMap[item.key as keyof typeof slashKeyMap] || item.description, { ns: 'app' })
})()

View File

@@ -9,6 +9,9 @@ export const NODE_WIDTH = 240
export const X_OFFSET = 60
export const NODE_WIDTH_X_OFFSET = NODE_WIDTH + X_OFFSET
export const Y_OFFSET = 39
export const VIBE_COMMAND_EVENT = 'workflow-vibe-command'
export const VIBE_REGENERATE_EVENT = 'workflow-vibe-regenerate'
export const VIBE_ACCEPT_EVENT = 'workflow-vibe-accept'
export const START_INITIAL_POSITION = { x: 80, y: 282 }
export const AUTO_LAYOUT_OFFSET = {
x: -42,

View File

@@ -0,0 +1,80 @@
import { describe, expect, it } from 'vitest'
import { replaceVariableReferences } from '../use-workflow-vibe'
// Mock types needed for the test
type NodeData = {
title: string
[key: string]: any
}
describe('use-workflow-vibe', () => {
describe('replaceVariableReferences', () => {
it('should replace variable references in strings', () => {
const data = {
title: 'Test Node',
prompt: 'Hello {{#old_id.query#}}',
}
const nodeIdMap = new Map<string, any>()
nodeIdMap.set('old_id', { id: 'new_uuid', data: { type: 'llm' } })
const result = replaceVariableReferences(data, nodeIdMap) as NodeData
expect(result.prompt).toBe('Hello {{#new_uuid.query#}}')
})
it('should handle multiple references in one string', () => {
const data = {
title: 'Test Node',
text: '{{#node1.out#}} and {{#node2.out#}}',
}
const nodeIdMap = new Map<string, any>()
nodeIdMap.set('node1', { id: 'uuid1', data: { type: 'llm' } })
nodeIdMap.set('node2', { id: 'uuid2', data: { type: 'llm' } })
const result = replaceVariableReferences(data, nodeIdMap) as NodeData
expect(result.text).toBe('{{#uuid1.out#}} and {{#uuid2.out#}}')
})
it('should replace variable references in value_selector arrays', () => {
const data = {
title: 'End Node',
outputs: [
{
variable: 'result',
value_selector: ['old_id', 'text'],
},
],
}
const nodeIdMap = new Map<string, any>()
nodeIdMap.set('old_id', { id: 'new_uuid', data: { type: 'llm' } })
const result = replaceVariableReferences(data, nodeIdMap) as NodeData
expect(result.outputs[0].value_selector).toEqual(['new_uuid', 'text'])
})
it('should handle nested objects recursively', () => {
const data = {
config: {
model: {
prompt: '{{#old_id.text#}}',
},
},
}
const nodeIdMap = new Map<string, any>()
nodeIdMap.set('old_id', { id: 'new_uuid', data: { type: 'llm' } })
const result = replaceVariableReferences(data, nodeIdMap) as any
expect(result.config.model.prompt).toBe('{{#new_uuid.text#}}')
})
it('should ignoring missing node mappings', () => {
const data = {
text: '{{#missing_id.text#}}',
}
const nodeIdMap = new Map<string, any>()
// missing_id is not in map
const result = replaceVariableReferences(data, nodeIdMap) as NodeData
expect(result.text).toBe('{{#missing_id.text#}}')
})
})
})

View File

@@ -24,3 +24,5 @@ export * from './use-workflow-run'
export * from './use-workflow-search'
export * from './use-workflow-start-run'
export * from './use-workflow-variables'
export * from './use-workflow-vibe'
export * from './use-workflow-vibe-config'

View File

@@ -0,0 +1,99 @@
/**
* Vibe Workflow Generator Configuration
*
* This module centralizes configuration for the Vibe workflow generation feature,
* including node type aliases and field name corrections.
*
* Note: These definitions are mirrored in the backend at:
* api/core/workflow/generator/config/node_schemas.py
* When updating these values, also update the backend file.
*/
/**
* Node type aliases for inference from natural language.
* Maps common terms to canonical node type names.
*/
export const NODE_TYPE_ALIASES: Record<string, string> = {
// Start node aliases
'start': 'start',
'begin': 'start',
'input': 'start',
// End node aliases
'end': 'end',
'finish': 'end',
'output': 'end',
// LLM node aliases
'llm': 'llm',
'ai': 'llm',
'gpt': 'llm',
'model': 'llm',
'chat': 'llm',
// Code node aliases
'code': 'code',
'script': 'code',
'python': 'code',
'javascript': 'code',
// HTTP request node aliases
'http-request': 'http-request',
'http': 'http-request',
'request': 'http-request',
'api': 'http-request',
'fetch': 'http-request',
'webhook': 'http-request',
// Conditional node aliases
'if-else': 'if-else',
'condition': 'if-else',
'branch': 'if-else',
'switch': 'if-else',
// Loop node aliases
'iteration': 'iteration',
'loop': 'loop',
'foreach': 'iteration',
// Tool node alias
'tool': 'tool',
}
/**
* Field name corrections for LLM-generated node configs.
* Maps incorrect field names to correct ones for specific node types.
*/
export const FIELD_NAME_CORRECTIONS: Record<string, Record<string, string>> = {
'http-request': {
text: 'body', // LLM might use "text" instead of "body"
content: 'body',
response: 'body',
},
'code': {
text: 'result', // LLM might use "text" instead of "result"
output: 'result',
},
'llm': {
response: 'text',
answer: 'text',
},
}
/**
* Correct field names based on node type.
* LLM sometimes generates wrong field names (e.g., "text" instead of "body" for HTTP nodes).
*
* @param field - The field name to correct
* @param nodeType - The type of the node
* @returns The corrected field name, or the original if no correction needed
*/
export const correctFieldName = (field: string, nodeType: string): string => {
const corrections = FIELD_NAME_CORRECTIONS[nodeType]
if (corrections && corrections[field])
return corrections[field]
return field
}
/**
* Get the canonical node type from an alias.
*
* @param alias - The alias to look up
* @returns The canonical node type, or undefined if not found
*/
export const getCanonicalNodeType = (alias: string): string | undefined => {
return NODE_TYPE_ALIASES[alias.toLowerCase()]
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,333 @@
/**
* VibePanel Component Tests
*
* Covers rendering states, user interactions, and edge cases for the vibe panel.
*/
import type { Shape as WorkflowState } from '@/app/components/workflow/store/workflow'
import type { Edge, Node } from '@/app/components/workflow/types'
import { fireEvent, render, screen, waitFor } from '@testing-library/react'
import userEvent from '@testing-library/user-event'
import Toast from '@/app/components/base/toast'
import { WorkflowContext } from '@/app/components/workflow/context'
import { HooksStoreContext } from '@/app/components/workflow/hooks-store/provider'
import { createHooksStore } from '@/app/components/workflow/hooks-store/store'
import { createWorkflowStore } from '@/app/components/workflow/store/workflow'
import { BlockEnum } from '@/app/components/workflow/types'
import { VIBE_APPLY_EVENT, VIBE_COMMAND_EVENT } from '../../constants'
import VibePanel from './index'
// ============================================================================
// Mocks
// ============================================================================
const mockCopy = vi.hoisted(() => vi.fn())
const mockUseVibeFlowData = vi.hoisted(() => vi.fn())
vi.mock('copy-to-clipboard', () => ({
default: mockCopy,
}))
vi.mock('@/app/components/header/account-setting/model-provider-page/hooks', () => ({
useModelListAndDefaultModelAndCurrentProviderAndModel: () => ({ defaultModel: null }),
}))
vi.mock('@/app/components/header/account-setting/model-provider-page/model-parameter-modal', () => ({
__esModule: true,
default: ({ modelId, provider }: { modelId: string, provider: string }) => (
<div data-testid="model-parameter-modal" data-model-id={modelId} data-provider={provider} />
),
}))
vi.mock('@/app/components/workflow/hooks/use-workflow-vibe', () => ({
useVibeFlowData: () => mockUseVibeFlowData(),
}))
vi.mock('@/app/components/workflow/workflow-preview', () => ({
__esModule: true,
default: ({ nodes, edges }: { nodes: Node[], edges: Edge[] }) => (
<div data-testid="workflow-preview" data-nodes-count={nodes.length} data-edges-count={edges.length} />
),
}))
// ============================================================================
// Test Utilities
// ============================================================================
type FlowGraph = {
nodes: Node[]
edges: Edge[]
}
type VibeFlowData = {
versions: FlowGraph[]
currentVersionIndex: number
setCurrentVersionIndex: (index: number) => void
current?: FlowGraph
}
const createMockNode = (overrides: Partial<Node> = {}): Node => ({
id: 'node-1',
type: 'custom',
position: { x: 0, y: 0 },
data: {
title: 'Start',
desc: '',
type: BlockEnum.Start,
},
...overrides,
})
const createMockEdge = (overrides: Partial<Edge> = {}): Edge => ({
id: 'edge-1',
source: 'node-1',
target: 'node-2',
data: {
sourceType: BlockEnum.Start,
targetType: BlockEnum.End,
},
...overrides,
})
const createFlowGraph = (overrides: Partial<FlowGraph> = {}): FlowGraph => ({
nodes: [],
edges: [],
...overrides,
})
const createVibeFlowData = (overrides: Partial<VibeFlowData> = {}): VibeFlowData => ({
versions: [],
currentVersionIndex: 0,
setCurrentVersionIndex: vi.fn(),
current: undefined,
...overrides,
})
const renderVibePanel = ({
workflowState,
vibeFlowData,
}: {
workflowState?: Partial<WorkflowState>
vibeFlowData?: VibeFlowData
} = {}) => {
if (vibeFlowData)
mockUseVibeFlowData.mockReturnValue(vibeFlowData)
const workflowStore = createWorkflowStore({})
const vibeFlowState = vibeFlowData
? {
vibeFlowVersions: vibeFlowData.versions,
vibeFlowCurrentIndex: vibeFlowData.currentVersionIndex,
currentVibeFlow: vibeFlowData.current,
}
: {}
workflowStore.setState({
showVibePanel: true,
isVibeGenerating: false,
vibePanelInstruction: '',
vibePanelMermaidCode: '',
...vibeFlowState,
...workflowState,
})
const hooksStore = createHooksStore({})
return {
workflowStore,
...render(
<WorkflowContext.Provider value={workflowStore}>
<HooksStoreContext.Provider value={hooksStore}>
<VibePanel />
</HooksStoreContext.Provider>
</WorkflowContext.Provider>,
),
}
}
const getCopyButton = () => {
const buttons = screen.getAllByRole('button')
const copyButton = buttons.find(button => button.textContent?.trim() === '' && button.querySelector('svg'))
if (!copyButton)
throw new Error('Copy button not found')
return copyButton
}
// ============================================================================
// Tests
// ============================================================================
describe('VibePanel', () => {
let toastNotifySpy: ReturnType<typeof vi.spyOn>
beforeEach(() => {
vi.clearAllMocks()
mockUseVibeFlowData.mockReturnValue(createVibeFlowData())
toastNotifySpy = vi.spyOn(Toast, 'notify').mockImplementation(() => ({ clear: vi.fn() }))
})
afterEach(() => {
toastNotifySpy.mockRestore()
})
// --------------------------------------------------------------------------
// Rendering: default visibility and primary view states.
// --------------------------------------------------------------------------
describe('Rendering', () => {
it('should render nothing when panel is hidden', () => {
renderVibePanel({ workflowState: { showVibePanel: false } })
expect(screen.queryByText(/app\.gotoAnything\.actions\.vibeTitle/i)).not.toBeInTheDocument()
})
it('should render placeholder when no preview data and not generating', () => {
renderVibePanel({
workflowState: { showVibePanel: true, isVibeGenerating: false },
vibeFlowData: createVibeFlowData({ current: undefined }),
})
expect(screen.getByText(/appDebug\.generate\.newNoDataLine1/i)).toBeInTheDocument()
})
it('should render loading state when generating', () => {
renderVibePanel({
workflowState: { showVibePanel: true, isVibeGenerating: true },
})
expect(screen.getByText(/workflow\.vibe\.generatingFlowchart/i)).toBeInTheDocument()
expect(screen.getByRole('button', { name: 'appDebug.generate.generate' })).toBeDisabled()
})
it('should render preview panel when nodes exist', () => {
const flowGraph = createFlowGraph({
nodes: [createMockNode()],
edges: [createMockEdge()],
})
renderVibePanel({
vibeFlowData: createVibeFlowData({
current: flowGraph,
versions: [flowGraph],
}),
})
expect(screen.getByTestId('workflow-preview')).toBeInTheDocument()
expect(screen.getByRole('button', { name: 'workflow.vibe.apply' })).toBeInTheDocument()
expect(screen.getByText(/appDebug\.generate\.version/i)).toBeInTheDocument()
})
})
// --------------------------------------------------------------------------
// Props: store-driven inputs that toggle behavior.
// --------------------------------------------------------------------------
describe('Props', () => {
it('should render modal content when showVibePanel is true', () => {
renderVibePanel({ workflowState: { showVibePanel: true } })
expect(screen.getByText(/app\.gotoAnything\.actions\.vibeTitle/i)).toBeInTheDocument()
})
})
// --------------------------------------------------------------------------
// User Interactions: input edits and action triggers.
// --------------------------------------------------------------------------
describe('User Interactions', () => {
it('should update instruction in store when typing', async () => {
const { workflowStore } = renderVibePanel()
const textarea = screen.getByPlaceholderText('workflow.vibe.missingInstruction')
fireEvent.change(textarea, { target: { value: 'Build a vibe flow' } })
await waitFor(() => {
expect(workflowStore.getState().vibePanelInstruction).toBe('Build a vibe flow')
})
})
it('should dispatch command event with instruction when generate clicked', async () => {
const user = userEvent.setup()
const { workflowStore } = renderVibePanel({
workflowState: { vibePanelInstruction: 'Generate a workflow' },
})
const handler = vi.fn()
document.addEventListener(VIBE_COMMAND_EVENT, handler)
await user.click(screen.getByRole('button', { name: 'appDebug.generate.generate' }))
expect(handler).toHaveBeenCalledTimes(1)
const event = handler.mock.calls[0][0] as CustomEvent<{ dsl?: string }>
expect(event.detail).toEqual({ dsl: workflowStore.getState().vibePanelInstruction })
document.removeEventListener(VIBE_COMMAND_EVENT, handler)
})
it('should close panel when dismiss clicked', async () => {
const user = userEvent.setup()
const { workflowStore } = renderVibePanel({
workflowState: {
vibePanelMermaidCode: 'graph TD',
isVibeGenerating: true,
},
})
await user.click(screen.getByRole('button', { name: 'appDebug.generate.dismiss' }))
const state = workflowStore.getState()
expect(state.showVibePanel).toBe(false)
expect(state.vibePanelMermaidCode).toBe('')
expect(state.isVibeGenerating).toBe(false)
})
it('should dispatch apply event and close panel when apply clicked', async () => {
const user = userEvent.setup()
const flowGraph = createFlowGraph({
nodes: [createMockNode()],
edges: [createMockEdge()],
})
const { workflowStore } = renderVibePanel({
workflowState: { vibePanelMermaidCode: 'graph TD' },
vibeFlowData: createVibeFlowData({
current: flowGraph,
versions: [flowGraph],
}),
})
const handler = vi.fn()
document.addEventListener(VIBE_APPLY_EVENT, handler)
await user.click(screen.getByRole('button', { name: 'workflow.vibe.apply' }))
expect(handler).toHaveBeenCalledTimes(1)
const state = workflowStore.getState()
expect(state.showVibePanel).toBe(false)
expect(state.vibePanelMermaidCode).toBe('')
expect(state.isVibeGenerating).toBe(false)
document.removeEventListener(VIBE_APPLY_EVENT, handler)
})
it('should copy mermaid and notify when copy clicked', async () => {
const user = userEvent.setup()
const flowGraph = createFlowGraph({
nodes: [createMockNode()],
edges: [createMockEdge()],
})
renderVibePanel({
workflowState: { vibePanelMermaidCode: 'graph TD' },
vibeFlowData: createVibeFlowData({
current: flowGraph,
versions: [flowGraph],
}),
})
await user.click(getCopyButton())
expect(mockCopy).toHaveBeenCalledWith('graph TD')
expect(toastNotifySpy).toHaveBeenCalledWith(expect.objectContaining({
type: 'success',
message: 'common.actionMsg.copySuccessfully',
}))
})
})
})

View File

@@ -0,0 +1,332 @@
'use client'
import type { FC } from 'react'
import type { FormValue } from '@/app/components/header/account-setting/model-provider-page/declarations'
import type { CompletionParams, Model } from '@/types/app'
import { RiClipboardLine } from '@remixicon/react'
import copy from 'copy-to-clipboard'
import { useCallback, useMemo, useState } from 'react'
import { useTranslation } from 'react-i18next'
import { z } from 'zod'
import ResPlaceholder from '@/app/components/app/configuration/config/automatic/res-placeholder'
import VersionSelector from '@/app/components/app/configuration/config/automatic/version-selector'
import Button from '@/app/components/base/button'
import { Generator } from '@/app/components/base/icons/src/vender/other'
import Loading from '@/app/components/base/loading'
import Modal from '@/app/components/base/modal'
import Textarea from '@/app/components/base/textarea'
import Toast from '@/app/components/base/toast'
import { ModelTypeEnum } from '@/app/components/header/account-setting/model-provider-page/declarations'
import { useModelListAndDefaultModelAndCurrentProviderAndModel } from '@/app/components/header/account-setting/model-provider-page/hooks'
import ModelParameterModal from '@/app/components/header/account-setting/model-provider-page/model-parameter-modal'
import { ModelModeType } from '@/types/app'
import { VIBE_APPLY_EVENT, VIBE_COMMAND_EVENT } from '../../constants'
import { useStore, useWorkflowStore } from '../../store'
import WorkflowPreview from '../../workflow-preview'
const CompletionParamsSchema = z.object({
max_tokens: z.number(),
temperature: z.number(),
top_p: z.number(),
echo: z.boolean(),
stop: z.array(z.string()),
presence_penalty: z.number(),
frequency_penalty: z.number(),
})
const ModelSchema = z.object({
provider: z.string(),
name: z.string(),
mode: z.nativeEnum(ModelModeType),
completion_params: CompletionParamsSchema,
})
const VibePanel: FC = () => {
const { t } = useTranslation()
const workflowStore = useWorkflowStore()
const showVibePanel = useStore(s => s.showVibePanel)
const setShowVibePanel = useStore(s => s.setShowVibePanel)
const isVibeGenerating = useStore(s => s.isVibeGenerating)
const setIsVibeGenerating = useStore(s => s.setIsVibeGenerating)
const vibePanelInstruction = useStore(s => s.vibePanelInstruction)
const vibePanelMermaidCode = useStore(s => s.vibePanelMermaidCode)
const setVibePanelMermaidCode = useStore(s => s.setVibePanelMermaidCode)
const currentFlowGraph = useStore(s => s.currentVibeFlow)
const versions = useStore(s => s.vibeFlowVersions)
const currentVersionIndex = useStore(s => s.vibeFlowCurrentIndex)
const vibePanelPreviewNodes = currentFlowGraph?.nodes || []
const vibePanelPreviewEdges = currentFlowGraph?.edges || []
const setVibePanelInstruction = useStore(s => s.setVibePanelInstruction)
const vibePanelIntent = useStore(s => s.vibePanelIntent)
const setVibePanelIntent = useStore(s => s.setVibePanelIntent)
const vibePanelMessage = useStore(s => s.vibePanelMessage)
const setVibePanelMessage = useStore(s => s.setVibePanelMessage)
const vibePanelSuggestions = useStore(s => s.vibePanelSuggestions)
const setVibePanelSuggestions = useStore(s => s.setVibePanelSuggestions)
const { defaultModel } = useModelListAndDefaultModelAndCurrentProviderAndModel(ModelTypeEnum.textGeneration)
// Track user's explicit model selection (from localStorage)
const [userModel, setUserModel] = useState<Model | null>(() => {
try {
const stored = localStorage.getItem('auto-gen-model')
if (stored) {
const parsed = JSON.parse(stored)
const result = ModelSchema.safeParse(parsed)
if (result.success)
return result.data
// If validation fails, clear the invalid data
localStorage.removeItem('auto-gen-model')
}
}
catch {
// ignore parse errors
}
return null
})
// Derive the actual model from user selection or default
const model: Model = useMemo(() => {
if (userModel)
return userModel
if (defaultModel) {
return {
name: defaultModel.model,
provider: defaultModel.provider.provider,
mode: ModelModeType.chat,
completion_params: {} as CompletionParams,
}
}
return {
name: '',
provider: '',
mode: ModelModeType.chat,
completion_params: {} as CompletionParams,
}
}, [userModel, defaultModel])
const setModel = useCallback((newModel: Model) => {
setUserModel(newModel)
localStorage.setItem('auto-gen-model', JSON.stringify(newModel))
}, [])
const handleModelChange = useCallback((newValue: { modelId: string, provider: string, mode?: string, features?: string[] }) => {
setModel({
...model,
provider: newValue.provider,
name: newValue.modelId,
mode: newValue.mode as ModelModeType,
})
}, [model, setModel])
const handleCompletionParamsChange = useCallback((newParams: FormValue) => {
setModel({
...model,
completion_params: newParams as CompletionParams,
})
}, [model, setModel])
const handleInstructionChange = useCallback((e: React.ChangeEvent<HTMLTextAreaElement>) => {
workflowStore.setState(state => ({
...state,
vibePanelInstruction: e.target.value,
}))
}, [workflowStore])
const handleClose = useCallback(() => {
setShowVibePanel(false)
setVibePanelMermaidCode('')
setIsVibeGenerating(false)
setVibePanelIntent('')
setVibePanelMessage('')
setVibePanelSuggestions([])
}, [setShowVibePanel, setVibePanelMermaidCode, setIsVibeGenerating, setVibePanelIntent, setVibePanelMessage, setVibePanelSuggestions])
const handleGenerate = useCallback(() => {
const event = new CustomEvent(VIBE_COMMAND_EVENT, {
detail: { dsl: vibePanelInstruction },
})
document.dispatchEvent(event)
}, [vibePanelInstruction])
const handleAccept = useCallback(() => {
const event = new CustomEvent(VIBE_APPLY_EVENT)
document.dispatchEvent(event)
handleClose()
}, [handleClose])
const handleCopyMermaid = useCallback(() => {
copy(vibePanelMermaidCode)
Toast.notify({ type: 'success', message: t('actionMsg.copySuccessfully', { ns: 'common' }) })
}, [vibePanelMermaidCode, t])
const handleSuggestionClick = useCallback((suggestion: string) => {
setVibePanelInstruction(suggestion)
// Trigger generation with the suggestion
const event = new CustomEvent(VIBE_COMMAND_EVENT, {
detail: { dsl: suggestion },
})
document.dispatchEvent(event)
}, [setVibePanelInstruction])
const handleVersionChange = useCallback((index: number) => {
const { setVibeFlowCurrentIndex } = workflowStore.getState()
setVibeFlowCurrentIndex(index)
}, [workflowStore])
// Button label - always use "Generate" (refinement mode removed)
const generateButtonLabel = useMemo(() => {
return t('generate.generate', { ns: 'appDebug' })
}, [t])
if (!showVibePanel)
return null
const renderLoading = (
<div className="flex h-full w-full grow flex-col items-center justify-center space-y-3">
<Loading />
<div className="text-[13px] text-text-tertiary">{t('vibe.generatingFlowchart', { ns: 'workflow' })}</div>
</div>
)
const renderOffTopic = (
<div className="flex h-full w-0 grow flex-col items-center justify-center p-6">
<div className="flex max-w-[400px] flex-col items-center text-center">
<div className="text-sm font-medium text-text-secondary">
{t('vibe.offTopicTitle', { ns: 'workflow' })}
</div>
<div className="mt-1 text-xs text-text-tertiary">
{vibePanelMessage || t('vibe.offTopicDefault', { ns: 'workflow' })}
</div>
{vibePanelSuggestions.length > 0 && (
<div className="mt-6 w-full">
<div className="mb-2 text-xs text-text-quaternary">
{t('vibe.trySuggestion', { ns: 'workflow' })}
</div>
<div className="flex flex-col gap-2">
{vibePanelSuggestions.map(suggestion => (
<button
key={suggestion}
onClick={() => handleSuggestionClick(suggestion)}
className="w-full cursor-pointer rounded-lg border border-divider-subtle bg-components-panel-bg px-3 py-2.5 text-left text-sm text-text-secondary transition-colors hover:border-divider-regular hover:bg-state-base-hover"
>
{suggestion}
</button>
))}
</div>
</div>
)}
</div>
</div>
)
return (
<Modal
isShow={showVibePanel}
onClose={handleClose}
className="min-w-[1140px] !p-0"
clickOutsideNotClose
>
<div className="flex h-[680px] flex-wrap">
<div className="h-full w-[300px] shrink-0 overflow-y-auto border-r border-divider-regular p-6">
<div className="mb-5">
<div className="text-lg font-bold leading-[28px] text-text-primary">{t('gotoAnything.actions.vibeTitle', { ns: 'app' })}</div>
<div className="mt-1 text-[13px] font-normal text-text-tertiary">{t('gotoAnything.actions.vibeDesc', { ns: 'app' })}</div>
</div>
<div>
<ModelParameterModal
popupClassName="!w-[520px]"
portalToFollowElemContentClassName="z-[1000]"
isAdvancedMode={true}
provider={model.provider}
completionParams={model.completion_params}
modelId={model.name}
setModel={handleModelChange}
onCompletionParamsChange={handleCompletionParamsChange}
hideDebugWithMultipleModel
/>
</div>
<div className="mt-4">
<div className="system-sm-semibold-uppercase mb-1.5 text-text-secondary">{t('generate.instruction', { ns: 'appDebug' })}</div>
<Textarea
className="min-h-[240px] resize-none rounded-[10px] px-4 pt-3"
placeholder={t('vibe.missingInstruction', { ns: 'workflow' })}
value={vibePanelInstruction}
onChange={handleInstructionChange}
/>
</div>
<div className="mt-7 flex justify-end space-x-2">
<Button onClick={handleClose}>{t('generate.dismiss', { ns: 'appDebug' })}</Button>
<Button
className="flex space-x-1"
variant="primary"
onClick={handleGenerate}
disabled={isVibeGenerating}
>
<Generator className="h-4 w-4" />
<span className="system-xs-semibold">{generateButtonLabel}</span>
</Button>
</div>
</div>
{isVibeGenerating && (
<div className="h-full w-0 grow bg-background-default-subtle">
{renderLoading}
</div>
)}
{!isVibeGenerating && vibePanelIntent === 'off_topic' && renderOffTopic}
{!isVibeGenerating && vibePanelIntent !== 'off_topic' && (vibePanelPreviewNodes.length > 0 || vibePanelMermaidCode) && (
<div className="relative h-full w-0 grow bg-background-default-subtle p-6 pb-0">
<div className="flex h-full flex-col">
<div className="mb-3 flex shrink-0 items-center justify-between">
<div className="flex shrink-0 flex-col">
<div className="system-xl-semibold text-text-secondary">{t('vibe.panelTitle', { ns: 'workflow' })}</div>
<VersionSelector
versionLen={versions.length}
value={currentVersionIndex}
onChange={handleVersionChange}
contentClassName="z-[1200]"
/>
</div>
<div className="flex items-center space-x-2">
<Button
variant="secondary"
size="medium"
onClick={handleCopyMermaid}
className="px-2"
>
<RiClipboardLine className="h-4 w-4" />
</Button>
<Button
variant="primary"
size="medium"
onClick={handleAccept}
>
{t('vibe.apply', { ns: 'workflow' })}
</Button>
</div>
</div>
<div className="flex grow flex-col overflow-hidden pb-6">
<WorkflowPreview
key={currentVersionIndex}
fitView
fitViewOptions={{ padding: 0.2 }}
nodes={vibePanelPreviewNodes}
edges={vibePanelPreviewEdges}
className="rounded-lg border border-divider-subtle"
/>
</div>
</div>
</div>
)}
{!isVibeGenerating && vibePanelIntent !== 'off_topic' && vibePanelPreviewNodes.length === 0 && !vibePanelMermaidCode && <ResPlaceholder />}
</div>
</Modal>
)
}
export default VibePanel

View File

@@ -11,6 +11,7 @@ import type { LayoutSliceShape } from './layout-slice'
import type { NodeSliceShape } from './node-slice'
import type { PanelSliceShape } from './panel-slice'
import type { ToolSliceShape } from './tool-slice'
import type { VibeWorkflowSliceShape } from './vibe-workflow-slice'
import type { VersionSliceShape } from './version-slice'
import type { WorkflowDraftSliceShape } from './workflow-draft-slice'
import type { WorkflowSliceShape } from './workflow-slice'
@@ -33,6 +34,7 @@ import { createNodeSlice } from './node-slice'
import { createPanelSlice } from './panel-slice'
import { createToolSlice } from './tool-slice'
import { createVibeWorkflowSlice } from './vibe-workflow-slice'
import { createVersionSlice } from './version-slice'
import { createWorkflowDraftSlice } from './workflow-draft-slice'
import { createWorkflowSlice } from './workflow-slice'
@@ -55,6 +57,7 @@ export type Shape
& WorkflowSliceShape
& InspectVarsSliceShape
& LayoutSliceShape
& VibeWorkflowSliceShape
& SliceFromInjection
export type InjectWorkflowStoreSliceFn = StateCreator<SliceFromInjection>
@@ -80,6 +83,7 @@ export const createWorkflowStore = (params: CreateWorkflowStoreParams) => {
...createWorkflowSlice(...args),
...createInspectVarsSlice(...args),
...createLayoutSlice(...args),
...createVibeWorkflowSlice(...args),
...(injectWorkflowStoreSliceFn?.(...args) || {} as SliceFromInjection),
}))
}

View File

@@ -0,0 +1,78 @@
import type { StateCreator } from 'zustand'
import type { Edge, Node } from '../../types'
export type FlowGraph = {
nodes: Node[]
edges: Edge[]
}
export type VibeWorkflowSliceShape = {
vibePanelMermaidCode: string
setVibePanelMermaidCode: (vibePanelMermaidCode: string) => void
isVibeGenerating: boolean
setIsVibeGenerating: (isVibeGenerating: boolean) => void
vibePanelInstruction: string
setVibePanelInstruction: (vibePanelInstruction: string) => void
vibeFlowVersions: FlowGraph[]
setVibeFlowVersions: (versions: FlowGraph[]) => void
vibeFlowCurrentIndex: number
setVibeFlowCurrentIndex: (index: number) => void
addVibeFlowVersion: (version: FlowGraph) => void
currentVibeFlow: FlowGraph | undefined
}
const getCurrentVibeFlow = (versions: FlowGraph[], currentIndex: number): FlowGraph | undefined => {
if (!versions || versions.length === 0)
return undefined
const index = currentIndex ?? 0
if (index < 0)
return undefined
return versions[index] || versions[versions.length - 1]
}
export const createVibeWorkflowSlice: StateCreator<VibeWorkflowSliceShape> = (set, get) => ({
vibePanelMermaidCode: '',
setVibePanelMermaidCode: vibePanelMermaidCode => set(() => ({ vibePanelMermaidCode })),
isVibeGenerating: false,
setIsVibeGenerating: isVibeGenerating => set(() => ({ isVibeGenerating })),
vibePanelInstruction: '',
setVibePanelInstruction: vibePanelInstruction => set(() => ({ vibePanelInstruction })),
vibeFlowVersions: [],
setVibeFlowVersions: versions => set((state) => {
const currentVibeFlow = getCurrentVibeFlow(versions, state.vibeFlowCurrentIndex)
return { vibeFlowVersions: versions, currentVibeFlow }
}),
vibeFlowCurrentIndex: 0,
setVibeFlowCurrentIndex: (index) => {
const state = get()
const versions = state.vibeFlowVersions || []
if (!versions || versions.length === 0) {
set({ vibeFlowCurrentIndex: 0, currentVibeFlow: undefined })
return
}
const normalizedIndex = Math.min(Math.max(index, 0), versions.length - 1)
const currentVibeFlow = getCurrentVibeFlow(versions, normalizedIndex)
set({ vibeFlowCurrentIndex: normalizedIndex, currentVibeFlow })
},
addVibeFlowVersion: (version) => {
// Prevent adding empty graphs
if (!version || !version.nodes || version.nodes.length === 0) {
set({ vibeFlowCurrentIndex: -1, currentVibeFlow: undefined })
return
}
set((state) => {
const newVersions = [...(state.vibeFlowVersions || []), version]
const newIndex = newVersions.length - 1
const currentVibeFlow = getCurrentVibeFlow(newVersions, newIndex)
return {
vibeFlowVersions: newVersions,
vibeFlowCurrentIndex: newIndex,
currentVibeFlow,
}
})
},
currentVibeFlow: undefined,
})