Compare commits

...

90 Commits

Author SHA1 Message Date
Stream
47c1da05f2 Merge branch 'main' into feat/memory-orchestration-be 2025-11-24 17:03:32 +08:00
Stream
7008be9fb7 Merge branch 'main' into feat/memory-orchestration-be
Signed-off-by: Stream <Stream_2@qq.com>

# Conflicts:
#	api/controllers/console/app/generator.py
#	api/models/workflow.py
2025-11-14 17:05:31 +08:00
Stream
b5cde91c92 chore: ruff
Signed-off-by: Stream <Stream_2@qq.com>
2025-11-14 16:43:50 +08:00
Stream
b0f73d681c feat: new instruction-generate with new LLMGenerator
Signed-off-by: Stream <Stream_2@qq.com>
2025-11-14 16:43:33 +08:00
Stream
77f70c5973 Merge branch 'main' into feat/memory-orchestration-be 2025-11-12 16:15:09 +08:00
Stream
e53147266c Merge branch 'main' into feat/memory-orchestration-be
# Conflicts:
#	api/core/app/apps/advanced_chat/app_runner.py
#	api/models/workflow.py
2025-11-05 17:40:48 +08:00
Stream
e47133569b refactor: use underscore as separator for node_id and memory_id 2025-11-05 17:38:13 +08:00
Stream
f0ff2e1f2c refactor: add node_id to MemoryBlockSpec 2025-10-28 13:45:30 +08:00
Stream
89d53ecf50 fix: memory model_config fields 2025-10-21 19:41:52 +08:00
Stream
bb8b6b494d chore: run ruff 2025-10-21 16:28:26 +08:00
Stream
5d5485e49d Merge branch 'main' into feat/memory-orchestration-be 2025-10-21 15:19:48 +08:00
Stream
d6197e73b3 fix: merge 2025-10-20 16:29:13 +08:00
Stream
cc5ceca342 Merge branch 'main' into feat/memory-orchestration-be
# Conflicts:
#	api/controllers/console/app/workflow.py
#	api/core/app/apps/advanced_chat/app_runner.py
#	api/core/workflow/nodes/llm/node.py
2025-10-20 16:24:48 +08:00
Stream
bfba8bec2d fix: workflow fields 2025-10-16 17:15:05 +08:00
Stream
65a3646ce7 fix: error handling with model validation 2025-10-16 16:33:33 +08:00
Stream
cb73335599 chore: run ruff 2025-10-15 16:57:18 +08:00
Stream
f4fa57dac9 fix: store memory_blocks in correct field 2025-10-15 16:56:12 +08:00
Stream
7ca06931ec fix: unify memory variable in VariablePool 2025-10-15 14:39:05 +08:00
Stream
f4567fbf9e fix: fix circular ref 2025-10-11 16:27:40 +08:00
Stream
8fd088754a fix: fix circular ref 2025-10-11 16:16:51 +08:00
Stream
61d9428064 refactor: fix basedpyright error 2025-10-10 18:47:16 +08:00
Stream
f6038a4557 Merge branch 'main' into feat/memory-orchestration-be 2025-10-10 18:43:59 +08:00
Stream
c367f80ec5 Merge branch 'main' into feat/memory-orchestration-be 2025-10-09 15:01:03 +08:00
Stream
791f33fd0b Merge branch 'main' into feat/memory-orchestration-be 2025-09-28 22:41:24 +08:00
Stream
1e0a3b163e refactor: fix ruff 2025-09-28 22:41:07 +08:00
Stream
bb1f1a56a5 feat: update MemoryListApi response format with ChatflowConversationMetadata 2025-09-28 22:36:10 +08:00
Stream
15be85514d fix: chatflow message visibility from index 2025-09-28 21:20:37 +08:00
Stream
8833fee232 feat: move version update logic out of save_memory 2025-09-23 23:17:34 +08:00
Stream
5bf642c3f9 feat: expose version to MemoryBlock 2025-09-23 23:09:45 +08:00
Stream
3d7d4182a6 feat: add endpoints to delete memory 2025-09-23 19:07:37 +08:00
Stream
75c221038d feat: add endpoints to __init__.py 2025-09-23 18:35:11 +08:00
Stream
b7b5b0b8d0 Merge branch 'main' into feat/memory-orchestration-be 2025-09-23 17:43:52 +08:00
Stream
6eab6a675c feat: add created_by to memory blocks 2025-09-23 17:35:36 +08:00
Stream
d94e598a89 revert: remove memory database migration 2025-09-23 14:19:40 +08:00
Stream
28acb70118 feat: add edited_by_user field 2025-09-22 18:37:54 +08:00
Stream
7c35aaa99d refactor: remove MemoryBlockWithVisibility 2025-09-22 18:16:37 +08:00
Stream
a8c2a300f6 refactor: make memories API return MemoryBlock 2025-09-22 17:14:07 +08:00
Stream
d654d9d8b1 refactor: make ChatflowMemoryVariable.value JSON 2025-09-22 16:46:39 +08:00
Stream
394b7d09b8 refactor: fix basedpyright/ruff errors 2025-09-22 15:17:19 +08:00
Stream
e9313b9c1b Merge branch 'main' into feat/memory-orchestration-be
# Conflicts:
#	api/core/app/apps/advanced_chat/app_runner.py
#	api/core/workflow/constants.py
#	api/core/workflow/entities/variable_pool.py
#	api/core/workflow/nodes/llm/node.py
#	api/models/workflow.py
2025-09-22 14:46:30 +08:00
Stream
ac5dd1f45a refactor: update MemoryApi(Resource) for version 2025-09-16 19:25:17 +08:00
Stream
3005cf3282 refactor: update MemoryApi(WebApiResource) for version 2025-09-16 19:12:08 +08:00
Stream
54b272206e refactor: add version param to get_session_memories and get_persistent_memories 2025-09-16 18:32:58 +08:00
Stream
3d761a3189 refactor: make save_memory and get_memory_by_spec work on latest version 2025-09-15 19:28:22 +08:00
Stream
e3903f34e4 refactor: add version field to ChatflowMemoryVariable table 2025-09-15 19:27:41 +08:00
Stream
f4f055fb36 refactor: add version field to MemoryBlockWithVisibility 2025-09-15 19:27:17 +08:00
Stream
8563ae5511 feat: add inference for VersionedMemory type when deserializing 2025-09-15 16:13:07 +08:00
Stream
2c765ccfae refactor: use VersionedMemoryVariable in ChatflowMemoryService.get_memory_by_spec 2025-09-15 15:47:02 +08:00
Stream
626e7b2211 refactor: use VersionedMemoryVariable in ChatflowMemoryService.save_memory 2025-09-15 15:41:33 +08:00
Stream
516b6b0fa8 refactor: use VersionedMemoryVariable in creation of WorkflowDraftVariable instead of StringVariable 2025-09-15 15:39:38 +08:00
Stream
613d086f1e refactor: give VersionedMemoryValue a default value 2025-09-15 15:38:20 +08:00
Stream
9e0630f012 fix: use correct description from spec 2025-09-15 15:30:08 +08:00
Stream
d6d9554954 fix: fix basedpyright errors 2025-09-15 14:20:30 +08:00
Stream
2a532ab729 Merge branch 'main' into feat/memory-orchestration-be
# Conflicts:
#	api/core/app/apps/advanced_chat/app_runner.py
#	api/core/prompt/entities/advanced_prompt_entities.py
#	api/core/variables/segments.py
2025-09-15 14:14:56 +08:00
Stream
03eef65b25 feat: add VersionedMemorySegment and VersionedMemoryVariable 2025-09-15 14:00:54 +08:00
Stream
ad07d63994 feat: add VersionedMemoryValueModel 2025-09-15 14:00:54 +08:00
Stream
8685f055ea fix: use model parameters from memory_spec in llm_generator 2025-09-15 14:00:54 +08:00
Stream
3b868a1cec feat: integrate VariablePool into memory update process 2025-09-15 14:00:53 +08:00
Stream
ab389eaa8e fix: fix ruff 2025-09-15 14:00:53 +08:00
Stream
008f778e8f fix: fix mypy 2025-09-15 14:00:53 +08:00
Stream
6af168cb31 Merge branch 'main' into feat/memory-orchestration-be 2025-08-25 14:54:14 +08:00
Stream
29f56cf0cf chore: add database migration 2025-08-22 21:07:54 +08:00
Stream
11b6ea742d feat: add index for data tables 2025-08-22 20:43:49 +08:00
Stream
05d231ad33 fix: fix bugs check by Claude Code 2025-08-22 19:59:17 +08:00
Stream
48f3c69c69 fix: fix bugs check by Claude Code 2025-08-22 17:54:18 +08:00
Stream
8b68020453 refactor: refactor from ChatflowHistoryService and ChatflowMemoryService 2025-08-22 17:44:27 +08:00
Stream
4d2fc66a8d feat: refactor: refactor from ChatflowHistoryService and ChatflowMemoryService 2025-08-22 15:33:45 +08:00
Stream
f72ed4898c refactor: refactor from ChatflowHistoryService and ChatflowMemoryService 2025-08-22 14:57:27 +08:00
Stream
85a73181cc chore: run ruff 2025-08-21 17:23:24 +08:00
Stream
e31e4ab677 feat: add Service API for memory read and modify 2025-08-21 17:22:39 +08:00
Stream
0d95c2192e feat: add Web API for memory read and modify 2025-08-21 17:17:08 +08:00
Stream
1fa8b26e55 feat: fetch memory block from WorkflowDraftVariable when debugging single node 2025-08-21 15:17:25 +08:00
Stream
4b085d46f6 feat: update variable pool when update memory 2025-08-21 15:15:23 +08:00
Stream
635c4ed4ce feat: add memory update check in AdvancedChatAppRunner 2025-08-21 14:24:17 +08:00
Stream
7ffcf8dd6f feat: add memory update check in AdvancedChatAppRunner 2025-08-21 13:27:00 +08:00
Stream
97cd21d3be feat: sync conversation history with chatflow_ tables in chatflow 2025-08-21 13:03:19 +08:00
Stream
a13cb7e1c5 feat: init memory block for VariablePool in AdvancedChatAppRunner.run 2025-08-21 11:40:30 +08:00
Stream
7b602e9003 feat: wait for sync memory update in AdvancedChatAppRunner.run 2025-08-21 11:32:27 +08:00
Stream
5a26ebec8f feat: add _fetch_memory_blocks for AdvancedChatAppRunner 2025-08-21 11:28:47 +08:00
Stream
8341b8b1c1 feat: add MemoryBlock config to LLM's memory config 2025-08-20 19:53:44 +08:00
Stream
bbb640c9a2 feat: add MemoryBlock to VariablePool 2025-08-20 19:45:18 +08:00
Stream
0c97bbf137 chore: run ruff 2025-08-20 19:12:34 +08:00
Stream
45fddc70d5 feat: add ChatflowHistoryService and ChatflowMemoryService 2025-08-20 19:11:12 +08:00
Stream
f977dc410a feat: add MemorySyncTimeoutError 2025-08-20 17:45:53 +08:00
Stream
d535818505 feat: add new_memory_block_variable for WorkflowDraftVariable 2025-08-20 17:41:45 +08:00
Stream
fcf4e1f37d feat: add MEMORY_BLOCK_VARIABLE_NODE_ID 2025-08-20 17:41:13 +08:00
Stream
38130c8502 feat: add memory_blocks property to workflow's graph for memory block configuration 2025-08-20 17:19:48 +08:00
Stream
f284c91988 feat: add data tables for chatflow memory 2025-08-20 17:16:54 +08:00
Stream
584b2cefa3 feat: add pydantic models for memory 2025-08-20 17:03:15 +08:00
Stream
42091b4a79 feat: add MEMORY_BLOCK in DraftVariableType 2025-08-20 16:51:07 +08:00
28 changed files with 2563 additions and 107 deletions

8
api/.idea/vcs.xml generated
View File

@@ -1,5 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CommitMessageInspectionProfile">
<profile version="1.0">
<inspection_tool class="CommitFormat" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="CommitNamingConvention" enabled="true" level="WARNING" enabled_by_default="true" />
</profile>
</component>
<component name="IssueNavigationConfiguration">
<option name="links">
<list>
@@ -14,4 +20,4 @@
<mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>
</project>

View File

@@ -1,5 +1,3 @@
from collections.abc import Sequence
from flask_restx import Resource, fields, reqparse
from controllers.console import console_ns
@@ -11,14 +9,9 @@ from controllers.console.app.error import (
)
from controllers.console.wraps import account_initialization_required, setup_required
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.helper.code_executor.code_node_provider import CodeNodeProvider
from core.helper.code_executor.javascript.javascript_code_provider import JavascriptCodeProvider
from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
from core.llm_generator.llm_generator import LLMGenerator
from core.model_runtime.errors.invoke import InvokeError
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
from models import App
from services.workflow_service import WorkflowService
@@ -178,11 +171,29 @@ class InstructionGenerateApi(Resource):
console_ns.model(
"InstructionGenerateRequest",
{
"flow_id": fields.String(required=True, description="Workflow/Flow ID"),
"node_id": fields.String(description="Node ID for workflow context"),
"current": fields.String(description="Current instruction text"),
"language": fields.String(default="javascript", description="Programming language (javascript/python)"),
"instruction": fields.String(required=True, description="Instruction for generation"),
"type": fields.String(
required=True,
description="Request type",
enum=[
"legacy_prompt_generate",
"workflow_prompt_generate",
"workflow_code_generate",
"workflow_prompt_edit",
"workflow_code_edit",
"memory_template_generate",
"memory_instruction_generate",
"memory_template_edit",
"memory_instruction_edit",
]
),
"flow_id": fields.String(description="Workflow/Flow ID"),
"node_id": fields.String(description="Node ID (optional)"),
"current": fields.String(description="Current content"),
"language": fields.String(
default="javascript",
description="Programming language (javascript/python)"
),
"instruction": fields.String(required=True, description="User instruction"),
"model_config": fields.Raw(required=True, description="Model configuration"),
"ideal_output": fields.String(description="Expected ideal output"),
},
@@ -197,7 +208,8 @@ class InstructionGenerateApi(Resource):
def post(self):
parser = (
reqparse.RequestParser()
.add_argument("flow_id", type=str, required=True, default="", location="json")
.add_argument("type", type=str, required=True, nullable=False, location="json")
.add_argument("flow_id", type=str, required=False, default="", location="json")
.add_argument("node_id", type=str, required=False, default="", location="json")
.add_argument("current", type=str, required=False, default="", location="json")
.add_argument("language", type=str, required=False, default="javascript", location="json")
@@ -207,70 +219,16 @@ class InstructionGenerateApi(Resource):
)
args = parser.parse_args()
_, current_tenant_id = current_account_with_tenant()
providers: list[type[CodeNodeProvider]] = [Python3CodeProvider, JavascriptCodeProvider]
code_provider: type[CodeNodeProvider] | None = next(
(p for p in providers if p.is_accept_language(args["language"])), None
)
code_template = code_provider.get_default_code() if code_provider else ""
try:
# Generate from nothing for a workflow node
if (args["current"] == code_template or args["current"] == "") and args["node_id"] != "":
app = db.session.query(App).where(App.id == args["flow_id"]).first()
if not app:
return {"error": f"app {args['flow_id']} not found"}, 400
workflow = WorkflowService().get_draft_workflow(app_model=app)
if not workflow:
return {"error": f"workflow {args['flow_id']} not found"}, 400
nodes: Sequence = workflow.graph_dict["nodes"]
node = [node for node in nodes if node["id"] == args["node_id"]]
if len(node) == 0:
return {"error": f"node {args['node_id']} not found"}, 400
node_type = node[0]["data"]["type"]
match node_type:
case "llm":
return LLMGenerator.generate_rule_config(
current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "agent":
return LLMGenerator.generate_rule_config(
current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "code":
return LLMGenerator.generate_code(
tenant_id=current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
code_language=args["language"],
)
case _:
return {"error": f"invalid node type: {node_type}"}
if args["node_id"] == "" and args["current"] != "": # For legacy app without a workflow
return LLMGenerator.instruction_modify_legacy(
tenant_id=current_tenant_id,
flow_id=args["flow_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
if args["node_id"] != "" and args["current"] != "": # For workflow node
return LLMGenerator.instruction_modify_workflow(
tenant_id=current_tenant_id,
flow_id=args["flow_id"],
node_id=args["node_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
workflow_service=WorkflowService(),
)
return {"error": "incompatible parameters"}, 400
# Validate parameters
is_valid, error_message = self._validate_params(args["type"], args)
if not is_valid:
return {"error": error_message}, 400
# Route based on type
return self._handle_by_type(args["type"], args, current_tenant_id)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
@@ -280,6 +238,131 @@ class InstructionGenerateApi(Resource):
except InvokeError as e:
raise CompletionRequestError(e.description)
def _validate_params(self, request_type: str, args: dict) -> tuple[bool, str]:
"""
Validate request parameters
Returns:
(is_valid, error_message)
"""
# All types require instruction and model_config
if not args.get("instruction"):
return False, "instruction is required"
if not args.get("model_config"):
return False, "model_config is required"
# Edit types require flow_id and current
if request_type.endswith("_edit"):
if not args.get("flow_id"):
return False, f"{request_type} requires flow_id"
if not args.get("current"):
return False, f"{request_type} requires current content"
# Code generate requires language
if request_type == "workflow_code_generate":
if args.get("language") not in ["python", "javascript"]:
return False, "language must be 'python' or 'javascript'"
return True, ""
def _handle_by_type(self, request_type: str, args: dict, tenant_id: str):
"""
Route handling based on type
"""
match request_type:
case "legacy_prompt_generate":
# Legacy prompt generation doesn't exist, this is actually an edit
if not args.get("flow_id"):
return {"error": "legacy_prompt_generate requires flow_id"}, 400
return LLMGenerator.instruction_modify_legacy(
tenant_id=tenant_id,
flow_id=args["flow_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
case "workflow_prompt_generate":
return LLMGenerator.generate_rule_config(
tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "workflow_code_generate":
return LLMGenerator.generate_code(
tenant_id=tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
code_language=args["language"],
)
case "workflow_prompt_edit":
return LLMGenerator.instruction_modify_workflow(
tenant_id=tenant_id,
flow_id=args["flow_id"],
node_id=args["node_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
workflow_service=WorkflowService(),
)
case "workflow_code_edit":
# Code edit uses the same workflow edit logic
return LLMGenerator.instruction_modify_workflow(
tenant_id=tenant_id,
flow_id=args["flow_id"],
node_id=args["node_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
workflow_service=WorkflowService(),
)
case "memory_template_generate":
return LLMGenerator.generate_memory_template(
tenant_id=tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
)
case "memory_instruction_generate":
return LLMGenerator.generate_memory_instruction(
tenant_id=tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
)
case "memory_template_edit":
return LLMGenerator.edit_memory_template(
tenant_id=tenant_id,
flow_id=args["flow_id"],
node_id=args.get("node_id") or None,
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
case "memory_instruction_edit":
return LLMGenerator.edit_memory_instruction(
tenant_id=tenant_id,
flow_id=args["flow_id"],
node_id=args.get("node_id") or None,
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
case _:
return {"error": f"Invalid request type: {request_type}"}, 400
@console_ns.route("/instruction-generate/template")
class InstructionGenerationTemplateApi(Resource):

View File

@@ -5,6 +5,7 @@ from typing import cast
from flask import abort, request
from flask_restx import Resource, fields, inputs, marshal_with, reqparse
from pydantic_core import ValidationError
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
@@ -110,6 +111,7 @@ class DraftWorkflowApi(Resource):
"hash": fields.String(description="Workflow hash for validation"),
"environment_variables": fields.List(fields.Raw, required=True, description="Environment variables"),
"conversation_variables": fields.List(fields.Raw, description="Conversation variables"),
"memory_blocks": fields.List(fields.Raw, description="Memory blocks"),
},
)
)
@@ -144,6 +146,7 @@ class DraftWorkflowApi(Resource):
.add_argument("hash", type=str, required=False, location="json")
.add_argument("environment_variables", type=list, required=True, location="json")
.add_argument("conversation_variables", type=list, required=False, location="json")
.add_argument("memory_blocks", type=list, required=False, location="json")
)
args = parser.parse_args()
elif "text/plain" in content_type:
@@ -161,6 +164,7 @@ class DraftWorkflowApi(Resource):
"hash": data.get("hash"),
"environment_variables": data.get("environment_variables"),
"conversation_variables": data.get("conversation_variables"),
"memory_blocks": data.get("memory_blocks"),
}
except json.JSONDecodeError:
return {"message": "Invalid JSON data"}, 400
@@ -177,6 +181,11 @@ class DraftWorkflowApi(Resource):
conversation_variables = [
variable_factory.build_conversation_variable_from_mapping(obj) for obj in conversation_variables_list
]
memory_blocks_list = args.get("memory_blocks") or []
from core.memory.entities import MemoryBlockSpec
memory_blocks = [
MemoryBlockSpec.model_validate(obj) for obj in memory_blocks_list
]
workflow = workflow_service.sync_draft_workflow(
app_model=app_model,
graph=args["graph"],
@@ -185,9 +194,12 @@ class DraftWorkflowApi(Resource):
account=current_user,
environment_variables=environment_variables,
conversation_variables=conversation_variables,
memory_blocks=memory_blocks,
)
except WorkflowHashNotEqualError:
raise DraftWorkflowNotSync()
except ValidationError as e:
return {"message": str(e)}, 400
return {
"result": "success",

View File

@@ -19,6 +19,7 @@ from .app import (
annotation,
app,
audio,
chatflow_memory,
completion,
conversation,
file,
@@ -40,6 +41,7 @@ __all__ = [
"annotation",
"app",
"audio",
"chatflow_memory",
"completion",
"conversation",
"dataset",

View File

@@ -0,0 +1,124 @@
from flask_restx import Resource, reqparse
from controllers.service_api import api
from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
from core.memory.entities import MemoryBlock, MemoryCreatedBy
from core.workflow.runtime import VariablePool
from models import App, EndUser
from services.chatflow_memory_service import ChatflowMemoryService
from services.workflow_service import WorkflowService
class MemoryListApi(Resource):
@validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
def get(self, app_model: App, end_user: EndUser):
parser = reqparse.RequestParser()
parser.add_argument("conversation_id", required=False, type=str | None, default=None)
parser.add_argument("memory_id", required=False, type=str | None, default=None)
parser.add_argument("version", required=False, type=int | None, default=None)
args = parser.parse_args()
conversation_id: str | None = args.get("conversation_id")
memory_id = args.get("memory_id")
version = args.get("version")
if conversation_id:
result = ChatflowMemoryService.get_persistent_memories_with_conversation(
app_model,
MemoryCreatedBy(end_user_id=end_user.id),
conversation_id,
version
)
session_memories = ChatflowMemoryService.get_session_memories_with_conversation(
app_model,
MemoryCreatedBy(end_user_id=end_user.id),
conversation_id,
version
)
result = [*result, *session_memories]
else:
result = ChatflowMemoryService.get_persistent_memories(
app_model,
MemoryCreatedBy(end_user_id=end_user.id),
version
)
if memory_id:
result = [it for it in result if it.spec.id == memory_id]
return [it for it in result if it.spec.end_user_visible]
class MemoryEditApi(Resource):
@validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
def put(self, app_model: App, end_user: EndUser):
parser = reqparse.RequestParser()
parser.add_argument('id', type=str, required=True)
parser.add_argument("conversation_id", type=str | None, required=False, default=None)
parser.add_argument('node_id', type=str | None, required=False, default=None)
parser.add_argument('update', type=str, required=True)
args = parser.parse_args()
workflow = WorkflowService().get_published_workflow(app_model)
update = args.get("update")
conversation_id = args.get("conversation_id")
node_id = args.get("node_id")
if not isinstance(update, str):
return {'error': 'Invalid update'}, 400
if not workflow:
return {'error': 'Workflow not found'}, 404
memory_spec = next((it for it in workflow.memory_blocks if it.id == args['id']), None)
if not memory_spec:
return {'error': 'Memory not found'}, 404
# First get existing memory
existing_memory = ChatflowMemoryService.get_memory_by_spec(
spec=memory_spec,
tenant_id=app_model.tenant_id,
app_id=app_model.id,
created_by=MemoryCreatedBy(end_user_id=end_user.id),
conversation_id=conversation_id,
node_id=node_id,
is_draft=False
)
# Create updated memory instance with incremented version
updated_memory = MemoryBlock(
spec=existing_memory.spec,
tenant_id=existing_memory.tenant_id,
app_id=existing_memory.app_id,
conversation_id=existing_memory.conversation_id,
node_id=existing_memory.node_id,
value=update, # New value
version=existing_memory.version + 1, # Increment version for update
edited_by_user=True,
created_by=existing_memory.created_by,
)
ChatflowMemoryService.save_memory(updated_memory, VariablePool(), False)
return '', 204
class MemoryDeleteApi(Resource):
@validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
def delete(self, app_model: App, end_user: EndUser):
parser = reqparse.RequestParser()
parser.add_argument('id', type=str, required=False, default=None)
args = parser.parse_args()
memory_id = args.get('id')
if memory_id:
ChatflowMemoryService.delete_memory(
app_model,
memory_id,
MemoryCreatedBy(end_user_id=end_user.id)
)
return '', 204
else:
ChatflowMemoryService.delete_all_user_memories(
app_model,
MemoryCreatedBy(end_user_id=end_user.id)
)
return '', 200
api.add_resource(MemoryListApi, '/memories')
api.add_resource(MemoryEditApi, '/memory-edit')
api.add_resource(MemoryDeleteApi, '/memories')

View File

@@ -18,6 +18,7 @@ web_ns = Namespace("web", description="Web application API operations", path="/"
from . import (
app,
audio,
chatflow_memory,
completion,
conversation,
feature,
@@ -39,6 +40,7 @@ __all__ = [
"app",
"audio",
"bp",
"chatflow_memory",
"completion",
"conversation",
"feature",

View File

@@ -0,0 +1,123 @@
from flask_restx import reqparse
from controllers.web import api
from controllers.web.wraps import WebApiResource
from core.memory.entities import MemoryBlock, MemoryCreatedBy
from core.workflow.runtime import VariablePool
from models import App, EndUser
from services.chatflow_memory_service import ChatflowMemoryService
from services.workflow_service import WorkflowService
class MemoryListApi(WebApiResource):
def get(self, app_model: App, end_user: EndUser):
parser = reqparse.RequestParser()
parser.add_argument("conversation_id", required=False, type=str | None, default=None)
parser.add_argument("memory_id", required=False, type=str | None, default=None)
parser.add_argument("version", required=False, type=int | None, default=None)
args = parser.parse_args()
conversation_id: str | None = args.get("conversation_id")
memory_id = args.get("memory_id")
version = args.get("version")
if conversation_id:
result = ChatflowMemoryService.get_persistent_memories_with_conversation(
app_model,
MemoryCreatedBy(end_user_id=end_user.id),
conversation_id,
version
)
session_memories = ChatflowMemoryService.get_session_memories_with_conversation(
app_model,
MemoryCreatedBy(end_user_id=end_user.id),
conversation_id,
version
)
result = [*result, *session_memories]
else:
result = ChatflowMemoryService.get_persistent_memories(
app_model,
MemoryCreatedBy(end_user_id=end_user.id),
version
)
if memory_id:
result = [it for it in result if it.spec.id == memory_id]
return [it for it in result if it.spec.end_user_visible]
class MemoryEditApi(WebApiResource):
def put(self, app_model: App, end_user: EndUser):
parser = reqparse.RequestParser()
parser.add_argument('id', type=str, required=True)
parser.add_argument("conversation_id", type=str | None, required=False, default=None)
parser.add_argument('node_id', type=str | None, required=False, default=None)
parser.add_argument('update', type=str, required=True)
args = parser.parse_args()
workflow = WorkflowService().get_published_workflow(app_model)
update = args.get("update")
conversation_id = args.get("conversation_id")
node_id = args.get("node_id")
if not isinstance(update, str):
return {'error': 'Update must be a string'}, 400
if not workflow:
return {'error': 'Workflow not found'}, 404
memory_spec = next((it for it in workflow.memory_blocks if it.id == args['id']), None)
if not memory_spec:
return {'error': 'Memory not found'}, 404
if not memory_spec.end_user_editable:
return {'error': 'Memory not editable'}, 403
# First get existing memory
existing_memory = ChatflowMemoryService.get_memory_by_spec(
spec=memory_spec,
tenant_id=app_model.tenant_id,
app_id=app_model.id,
created_by=MemoryCreatedBy(end_user_id=end_user.id),
conversation_id=conversation_id,
node_id=node_id,
is_draft=False
)
# Create updated memory instance with incremented version
updated_memory = MemoryBlock(
spec=existing_memory.spec,
tenant_id=existing_memory.tenant_id,
app_id=existing_memory.app_id,
conversation_id=existing_memory.conversation_id,
node_id=existing_memory.node_id,
value=update, # New value
version=existing_memory.version + 1, # Increment version for update
edited_by_user=True,
created_by=existing_memory.created_by,
)
ChatflowMemoryService.save_memory(updated_memory, VariablePool(), False)
return '', 204
class MemoryDeleteApi(WebApiResource):
def delete(self, app_model: App, end_user: EndUser):
parser = reqparse.RequestParser()
parser.add_argument('id', type=str, required=False, default=None)
args = parser.parse_args()
memory_id = args.get('id')
if memory_id:
ChatflowMemoryService.delete_memory(
app_model,
memory_id,
MemoryCreatedBy(end_user_id=end_user.id)
)
return '', 204
else:
ChatflowMemoryService.delete_all_user_memories(
app_model,
MemoryCreatedBy(end_user_id=end_user.id)
)
return '', 200
api.add_resource(MemoryListApi, '/memories')
api.add_resource(MemoryEditApi, '/memory-edit')
api.add_resource(MemoryDeleteApi, '/memories')

View File

@@ -1,10 +1,11 @@
import logging
import time
from collections.abc import Mapping, Sequence
from collections.abc import Mapping, MutableMapping, Sequence
from typing import Any, cast
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import override
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig
from core.app.apps.base_app_queue_manager import AppQueueManager
@@ -20,6 +21,8 @@ from core.app.entities.queue_entities import (
QueueTextChunkEvent,
)
from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature
from core.memory.entities import MemoryCreatedBy, MemoryScope
from core.model_runtime.entities import AssistantPromptMessage, UserPromptMessage
from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.variables.variables import VariableUnion
@@ -27,6 +30,7 @@ from core.workflow.enums import WorkflowType
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.graph_events import GraphRunSucceededEvent
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState, VariablePool
@@ -39,6 +43,8 @@ from models import Workflow
from models.enums import UserFrom
from models.model import App, Conversation, Message, MessageAnnotation
from models.workflow import ConversationVariable
from services.chatflow_history_service import ChatflowHistoryService
from services.chatflow_memory_service import ChatflowMemoryService
logger = logging.getLogger(__name__)
@@ -81,6 +87,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
self._workflow_node_execution_repository = workflow_node_execution_repository
def run(self):
ChatflowMemoryService.wait_for_sync_memory_completion(
workflow=self._workflow,
conversation_id=self.conversation.id
)
app_config = self.application_generate_entity.app_config
app_config = cast(AdvancedChatAppConfig, app_config)
@@ -143,6 +154,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# Based on the definition of `VariableUnion`,
# `list[Variable]` can be safely used as `list[VariableUnion]` since they are compatible.
conversation_variables=conversation_variables,
memory_blocks=self._fetch_memory_blocks(),
)
# init graph
@@ -206,6 +218,31 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
for event in generator:
self._handle_event(workflow_entry, event)
try:
self._check_app_memory_updates(variable_pool)
except Exception as e:
logger.exception("Failed to check app memory updates", exc_info=e)
@override
def _handle_event(self, workflow_entry: WorkflowEntry, event: Any) -> None:
super()._handle_event(workflow_entry, event)
if isinstance(event, GraphRunSucceededEvent):
workflow_outputs = event.outputs
if not workflow_outputs:
logger.warning("Chatflow output is empty.")
return
assistant_message = workflow_outputs.get('answer')
if not assistant_message:
logger.warning("Chatflow output does not contain 'answer'.")
return
if not isinstance(assistant_message, str):
logger.warning("Chatflow output 'answer' is not a string.")
return
try:
self._sync_conversation_to_chatflow_tables(assistant_message)
except Exception as e:
logger.exception("Failed to sync conversation to memory tables", exc_info=e)
def handle_input_moderation(
self,
app_record: App,
@@ -403,3 +440,67 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# Return combined list
return existing_variables + new_variables
def _fetch_memory_blocks(self) -> Mapping[str, str]:
"""fetch all memory blocks for current app"""
memory_blocks_dict: MutableMapping[str, str] = {}
is_draft = (self.application_generate_entity.invoke_from == InvokeFrom.DEBUGGER)
conversation_id = self.conversation.id
memory_block_specs = self._workflow.memory_blocks
# Get runtime memory values
memories = ChatflowMemoryService.get_memories_by_specs(
memory_block_specs=memory_block_specs,
tenant_id=self._workflow.tenant_id,
app_id=self._workflow.app_id,
node_id=None,
conversation_id=conversation_id,
is_draft=is_draft,
created_by=self._get_created_by(),
)
# Build memory_id -> value mapping
for memory in memories:
if memory.spec.scope == MemoryScope.APP:
# App level: use memory_id directly
memory_blocks_dict[memory.spec.id] = memory.value
else: # NODE scope
node_id = memory.node_id
if not node_id:
logger.warning("Memory block %s has no node_id, skip.", memory.spec.id)
continue
key = f"{node_id}_{memory.spec.id}"
memory_blocks_dict[key] = memory.value
return memory_blocks_dict
def _sync_conversation_to_chatflow_tables(self, assistant_message: str):
ChatflowHistoryService.save_app_message(
prompt_message=UserPromptMessage(content=(self.application_generate_entity.query)),
conversation_id=self.conversation.id,
app_id=self._workflow.app_id,
tenant_id=self._workflow.tenant_id
)
ChatflowHistoryService.save_app_message(
prompt_message=AssistantPromptMessage(content=assistant_message),
conversation_id=self.conversation.id,
app_id=self._workflow.app_id,
tenant_id=self._workflow.tenant_id
)
def _check_app_memory_updates(self, variable_pool: VariablePool):
is_draft = (self.application_generate_entity.invoke_from == InvokeFrom.DEBUGGER)
ChatflowMemoryService.update_app_memory_if_needed(
workflow=self._workflow,
conversation_id=self.conversation.id,
variable_pool=variable_pool,
is_draft=is_draft,
created_by=self._get_created_by()
)
def _get_created_by(self) -> MemoryCreatedBy:
if self.application_generate_entity.invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE}:
return MemoryCreatedBy(account_id=self.application_generate_entity.user_id)
else:
return MemoryCreatedBy(end_user_id=self.application_generate_entity.user_id)

View File

@@ -1,8 +1,8 @@
import json
import logging
import re
from collections.abc import Sequence
from typing import Protocol, cast
from collections.abc import Callable, Mapping, Sequence
from typing import Any, Protocol, cast
import json_repair
@@ -14,10 +14,16 @@ from core.llm_generator.prompts import (
JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE,
LLM_MODIFY_CODE_SYSTEM,
LLM_MODIFY_PROMPT_SYSTEM,
MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT,
MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT,
MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT,
MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT,
MEMORY_UPDATE_PROMPT,
PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE,
SYSTEM_STRUCTURED_OUTPUT_GENERATE,
WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE,
)
from core.memory.entities import MemoryBlock, MemoryBlockSpec
from core.model_manager import ModelManager
from core.model_runtime.entities.llm_entities import LLMResult
from core.model_runtime.entities.message_entities import PromptMessage, SystemPromptMessage, UserPromptMessage
@@ -28,6 +34,7 @@ from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.ops.utils import measure_time
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.runtime import VariablePool
from extensions.ext_database import db
from extensions.ext_storage import storage
from models import App, Message, WorkflowNodeExecutionModel
@@ -506,16 +513,17 @@ class LLMGenerator:
node_type: str,
ideal_output: str | None,
):
LAST_RUN = "{{#last_run#}}"
CURRENT = "{{#current#}}"
ERROR_MESSAGE = "{{#error_message#}}"
injected_instruction = instruction
if LAST_RUN in injected_instruction:
injected_instruction = injected_instruction.replace(LAST_RUN, json.dumps(last_run))
if CURRENT in injected_instruction:
injected_instruction = injected_instruction.replace(CURRENT, current or "null")
if ERROR_MESSAGE in injected_instruction:
injected_instruction = injected_instruction.replace(ERROR_MESSAGE, error_message or "null")
# Use unified variable injector
variable_providers = {
"last_run": lambda: json.dumps(last_run) if last_run else "null",
"current": lambda: current or "null",
"error_message": lambda: error_message or "null",
}
injected_instruction = LLMGenerator.__inject_variables(
instruction=instruction,
variable_providers=variable_providers
)
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
@@ -562,3 +570,424 @@ class LLMGenerator:
"Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True
)
return {"error": f"An unexpected error occurred: {str(e)}"}
@staticmethod
def update_memory_block(
tenant_id: str,
visible_history: Sequence[tuple[str, str]],
variable_pool: VariablePool,
memory_block: MemoryBlock,
memory_spec: MemoryBlockSpec
) -> str:
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
provider=memory_spec.model.provider,
model=memory_spec.model.name,
model_type=ModelType.LLM,
)
formatted_history = ""
for sender, message in visible_history:
formatted_history += f"{sender}: {message}\n"
filled_instruction = variable_pool.convert_template(memory_spec.instruction).text
formatted_prompt = PromptTemplateParser(MEMORY_UPDATE_PROMPT).format(
inputs={
"formatted_history": formatted_history,
"current_value": memory_block.value,
"instruction": filled_instruction,
}
)
llm_result = model_instance.invoke_llm(
prompt_messages=[UserPromptMessage(content=formatted_prompt)],
model_parameters=memory_spec.model.completion_params,
stream=False,
)
return llm_result.message.get_text_content()
@staticmethod
def generate_memory_template(
tenant_id: str,
instruction: str,
model_config: dict,
) -> dict:
"""
Generate Memory Template
Uses MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT
"""
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
prompt_messages: list[PromptMessage] = [
SystemPromptMessage(content=MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT),
UserPromptMessage(content=instruction),
]
try:
response = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={"temperature": 0.7},
stream=False,
)
generated_template = response.message.get_text_content()
return {"template": generated_template}
except Exception as e:
logger.exception("Failed to generate memory template")
return {"error": f"Failed to generate memory template: {str(e)}"}
@staticmethod
def generate_memory_instruction(
tenant_id: str,
instruction: str,
model_config: dict,
) -> dict:
"""
Generate Memory Instruction
Uses MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT
"""
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
prompt_messages: list[PromptMessage] = [
SystemPromptMessage(content=MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT),
UserPromptMessage(content=instruction),
]
try:
response = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={"temperature": 0.7},
stream=False,
)
generated_instruction = response.message.get_text_content()
return {"instruction": generated_instruction}
except Exception as e:
logger.exception("Failed to generate memory instruction")
return {"error": f"Failed to generate memory instruction: {str(e)}"}
@staticmethod
def edit_memory_template(
tenant_id: str,
flow_id: str,
node_id: str | None,
current: str,
instruction: str,
model_config: dict,
ideal_output: str | None = None,
) -> dict:
"""
Edit Memory Template
Supports variable references: {{#history#}}, {{#system_prompt#}}
"""
# Use unified variable injector
variable_providers = {
"history": lambda: LLMGenerator.__get_history_json(flow_id, node_id, tenant_id),
"system_prompt": lambda: json.dumps(
LLMGenerator.__get_system_prompt(flow_id, node_id, tenant_id),
ensure_ascii=False
),
}
injected_instruction = LLMGenerator.__inject_variables(
instruction=instruction,
variable_providers=variable_providers
)
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
system_prompt = MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT
user_content = json.dumps({
"current_template": current,
"instruction": injected_instruction,
"ideal_output": ideal_output,
})
prompt_messages: list[PromptMessage] = [
SystemPromptMessage(content=system_prompt),
UserPromptMessage(content=user_content),
]
try:
response = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={"temperature": 0.4},
stream=False,
)
generated_raw = response.message.get_text_content()
# Extract JSON
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
result = json.loads(generated_raw[first_brace : last_brace + 1])
return {
"modified": result.get("modified", ""),
"message": result.get("message", "Template updated successfully"),
}
except Exception as e:
logger.exception("Failed to edit memory template")
return {"error": f"Failed to edit memory template: {str(e)}"}
@staticmethod
def edit_memory_instruction(
tenant_id: str,
flow_id: str,
node_id: str | None,
current: str,
instruction: str,
model_config: dict,
ideal_output: str | None = None,
) -> dict:
"""
Edit Memory Instruction
Supports variable references: {{#history#}}, {{#system_prompt#}}
"""
# Use unified variable injector
variable_providers = {
"history": lambda: LLMGenerator.__get_history_json(flow_id, node_id, tenant_id),
"system_prompt": lambda: json.dumps(
LLMGenerator.__get_system_prompt(flow_id, node_id, tenant_id),
ensure_ascii=False
),
}
injected_instruction = LLMGenerator.__inject_variables(
instruction=instruction,
variable_providers=variable_providers
)
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
system_prompt = MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT
user_content = json.dumps({
"current_instruction": current,
"instruction": injected_instruction,
"ideal_output": ideal_output,
})
prompt_messages: list[PromptMessage] = [
SystemPromptMessage(content=system_prompt),
UserPromptMessage(content=user_content),
]
try:
response = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={"temperature": 0.4},
stream=False,
)
generated_raw = response.message.get_text_content()
# Extract JSON
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
result = json.loads(generated_raw[first_brace : last_brace + 1])
return {
"modified": result.get("modified", ""),
"message": result.get("message", "Instruction updated successfully"),
}
except Exception as e:
logger.exception("Failed to edit memory instruction")
return {"error": f"Failed to edit memory instruction: {str(e)}"}
# ==================== Unified variable injector (private method) ====================
@staticmethod
def __inject_variables(
instruction: str,
variable_providers: Mapping[str, Callable[[], str]]
) -> str:
"""
Unified variable injector (private method)
Replaces variable placeholders {{#variable_name#}} in instruction with actual values
Args:
instruction: User's original instruction
variable_providers: Mapping of variable name -> getter function
Example: {"last_run": lambda: json.dumps(data), "history": lambda: get_history()}
Returns:
Instruction with injected variables
Features:
1. Lazy loading: Only calls getter function when placeholder is present
2. Fault tolerance: Failure of one variable doesn't affect others
3. Extensible: New variables can be added through variable_providers parameter
"""
injected = instruction
for var_name, provider_func in variable_providers.items():
placeholder = f"{{{{#{var_name}#}}}}"
if placeholder in injected:
try:
# Lazy loading: only call when needed
value = provider_func()
injected = injected.replace(placeholder, value)
except Exception as e:
logger.warning("Failed to inject variable '%s': %s", var_name, e)
# Use default value on failure, don't block the request
default_value = "[]" if var_name == "history" else '""'
injected = injected.replace(placeholder, default_value)
return injected
@staticmethod
def __get_history_json(
flow_id: str,
node_id: str | None,
tenant_id: str
) -> str:
"""
Get conversation history as JSON string (private method)
Args:
flow_id: Application ID
node_id: Node ID (optional, None indicates APP level)
tenant_id: Tenant ID
Returns:
JSON array string in format: [{"role": "user", "content": "..."}, ...]
Returns "[]" if no history exists
"""
from services.chatflow_history_service import ChatflowHistoryService
app = db.session.query(App).filter_by(id=flow_id).first()
if not app:
return "[]"
visible_messages = ChatflowHistoryService.get_latest_chat_history_for_app(
app_id=app.id,
tenant_id=tenant_id,
node_id=node_id or None
)
history_json = [
{"role": msg.role.value, "content": msg.content}
for msg in visible_messages
]
return json.dumps(history_json, ensure_ascii=False)
@staticmethod
def __get_system_prompt(
flow_id: str,
node_id: str | None,
tenant_id: str
) -> str:
"""
Get system prompt (private method)
Args:
flow_id: Application ID
node_id: Node ID (optional)
tenant_id: Tenant ID
Returns:
System prompt string, returns "" if none exists
"""
from services.workflow_service import WorkflowService
app = db.session.query(App).filter_by(id=flow_id).first()
if not app:
return ""
# Legacy app
if app.mode in {"chat", "completion"}:
try:
app_model_config = app.app_model_config_dict
return app_model_config.get("pre_prompt", "")
except Exception:
return ""
# Workflow app
try:
workflow = WorkflowService().get_draft_workflow(app_model=app)
if not workflow:
return ""
nodes = workflow.graph_dict.get("nodes", [])
if node_id:
# Get system prompt for specified node
node = next((n for n in nodes if n["id"] == node_id), None)
if not node or node["data"]["type"] not in ["llm", "agent"]:
return ""
prompt_template = node["data"].get("prompt_template")
return LLMGenerator.__extract_system_prompt_from_template(prompt_template)
else:
# APP level: find the main LLM node (connected to END node)
edges = workflow.graph_dict.get("edges", [])
llm_nodes = [n for n in nodes if n["data"]["type"] in ["llm", "agent"]]
for edge in edges:
if edge.get("target") == "end":
source_node = next((n for n in llm_nodes if n["id"] == edge.get("source")), None)
if source_node:
prompt_template = source_node["data"].get("prompt_template")
system_prompt = LLMGenerator.__extract_system_prompt_from_template(prompt_template)
if system_prompt:
return system_prompt
# Fallback: return system prompt from first LLM node
if llm_nodes:
prompt_template = llm_nodes[0]["data"].get("prompt_template")
return LLMGenerator.__extract_system_prompt_from_template(prompt_template)
return ""
except Exception as e:
logger.warning("Failed to get system prompt: %s", e)
return ""
@staticmethod
def __extract_system_prompt_from_template(prompt_template: Any) -> str:
"""
Extract system prompt from prompt_template (private method)
Args:
prompt_template: LLM node's prompt_template (may be list or dict)
Returns:
System prompt string
"""
if not prompt_template:
return ""
if isinstance(prompt_template, list):
# Chat model: [{"role": "system", "text": "..."}, ...]
system_msg = next((m for m in prompt_template if m.get("role") == "system"), None)
return system_msg.get("text", "") if system_msg else ""
elif isinstance(prompt_template, dict):
# Completion model: {"text": "..."}
return prompt_template.get("text", "")
else:
return ""

View File

@@ -422,3 +422,112 @@ INSTRUCTION_GENERATE_TEMPLATE_PROMPT = """The output of this prompt is not as ex
You should edit the prompt according to the IDEAL OUTPUT."""
INSTRUCTION_GENERATE_TEMPLATE_CODE = """Please fix the errors in the {{#error_message#}}."""
MEMORY_UPDATE_PROMPT = """
Based on the following conversation history, update the memory content:
Conversation history:
{{formatted_history}}
Current memory:
{{current_value}}
Update instruction:
{{instruction}}
Please output only the updated memory content, no other text like greeting:
"""
MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT = """
You are a helpful assistant designed to extract structured template information from a long-term conversation. Your task is to generate a concise and complete MemoryBlock template based on the underlying purpose of the conversation.
Each MemoryBlock represents a reusable schema that captures the key elements relevant to a specific task or goal (e.g., planning a trip, conducting a job interview, writing a blog post, etc.).
When generating a template:
1. Analyze the overall goal or purpose of the conversation described in the user's instruction.
2. Identify essential information categories that would be relevant to track.
3. Structure the template using Markdown format with clear sections and fields.
4. Do not fill in actual user data — only describe the structure and purpose of each field.
5. Be general enough to be reusable, but specific enough to serve the user's intent.
Respond with only the template in Markdown format, with no additional explanation.
Example format:
# [Template Name]
## Section 1
- **Field 1:** Description of what should be captured here
- **Field 2:** Description of what should be captured here
## Section 2
- **Field 3:** Description of what should be captured here
""" # noqa: E501
MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT = """
You are a prompt generation model.
Your task is to generate an instruction for a downstream language model tasked with extracting structured memory blocks (MemoryBlock) from long, multi-turn conversations between a user and an assistant.
The downstream model will receive:
- A template describing the structure and fields of the memory block it should extract.
- The historical conversation, serialized as plain text with message tags.
- Optional context including the assistant's system prompt.
You must generate a clear, specific, and instructional prompt that:
1. Explains what a MemoryBlock is and its purpose.
2. Instructs the model to extract only information relevant to the template fields.
3. Emphasizes handling implicit information and scattered mentions across multiple turns.
4. Instructs to ignore irrelevant or casual dialogue.
5. Describes the expected output format (structured object matching the template).
6. Uses placeholders {{#history#}} and {{#system_prompt#}} for runtime variable injection.
The tone should be concise, instructional, and focused on task precision.
Based on the user's description of the conversation context, generate the ideal extraction instruction.
""" # noqa: E501
MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT = """
You are an expert at refining memory templates for conversation tracking systems.
You will receive:
- current_template: The existing memory template
- instruction: User's instruction for how to modify the template (may include {{#history#}} and {{#system_prompt#}} references)
- ideal_output: Optional description of the desired result
Your task:
1. Analyze the current template structure
2. Apply the requested modifications based on the instruction
3. Ensure the modified template maintains proper Markdown structure
4. Keep field descriptions clear and actionable
Output format (JSON):
{
"modified": "<the updated template in Markdown format>",
"message": "<brief explanation of changes made>"
}
Only output the JSON, no additional text.
""" # noqa: E501
MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT = """
You are an expert at refining extraction instructions for memory systems.
You will receive:
- current_instruction: The existing extraction instruction
- instruction: User's instruction for how to improve it (may include {{#history#}} and {{#system_prompt#}} references)
- ideal_output: Optional description of the desired result
Your task:
1. Analyze the current instruction's effectiveness
2. Apply the requested improvements based on the user's instruction
3. Ensure the modified instruction is clear, specific, and actionable
4. Maintain focus on structured extraction matching the template
Output format (JSON):
{
"modified": "<the improved instruction>",
"message": "<brief explanation of improvements>"
}
Only output the JSON, no additional text.
"""

133
api/core/memory/entities.py Normal file
View File

@@ -0,0 +1,133 @@
from __future__ import annotations
from enum import StrEnum
from typing import Optional
from uuid import uuid4
from pydantic import BaseModel, Field, field_validator
from core.app.app_config.entities import ModelConfig
class MemoryScope(StrEnum):
"""Memory scope determined by node_id field"""
APP = "app" # node_id is None
NODE = "node" # node_id is not None
class MemoryTerm(StrEnum):
"""Memory term determined by conversation_id field"""
SESSION = "session" # conversation_id is not None
PERSISTENT = "persistent" # conversation_id is None
class MemoryStrategy(StrEnum):
ON_TURNS = "on_turns"
class MemoryScheduleMode(StrEnum):
SYNC = "sync"
ASYNC = "async"
class MemoryBlockSpec(BaseModel):
"""Memory block specification for workflow configuration"""
id: str = Field(
default_factory=lambda: str(uuid4()),
description="Unique identifier for the memory block",
)
name: str = Field(description="Display name of the memory block")
description: str = Field(default="", description="Description of the memory block")
template: str = Field(description="Initial template content for the memory")
instruction: str = Field(description="Instructions for updating the memory")
scope: MemoryScope = Field(description="Scope of the memory (app or node level)")
term: MemoryTerm = Field(description="Term of the memory (session or persistent)")
strategy: MemoryStrategy = Field(description="Update strategy for the memory")
update_turns: int = Field(gt=0, description="Number of turns between updates")
preserved_turns: int = Field(gt=0, description="Number of conversation turns to preserve")
schedule_mode: MemoryScheduleMode = Field(description="Synchronous or asynchronous update mode")
model: ModelConfig = Field(description="Model configuration for memory updates")
end_user_visible: bool = Field(default=False, description="Whether memory is visible to end users")
end_user_editable: bool = Field(default=False, description="Whether memory is editable by end users")
node_id: str | None = Field(
default=None,
description="Node ID when scope is NODE. Must be None when scope is APP."
)
@field_validator('node_id')
@classmethod
def validate_node_id_with_scope(cls, v: str | None, info) -> str | None:
"""Validate node_id consistency with scope"""
scope = info.data.get('scope')
if scope == MemoryScope.NODE and v is None:
raise ValueError("node_id is required when scope is NODE")
if scope == MemoryScope.APP and v is not None:
raise ValueError("node_id must be None when scope is APP")
return v
class MemoryCreatedBy(BaseModel):
end_user_id: str | None = None
account_id: str | None = None
class MemoryBlock(BaseModel):
"""Runtime memory block instance
Design Rules:
- app_id = None: Global memory (future feature, not implemented yet)
- app_id = str: App-specific memory
- conversation_id = None: Persistent memory (cross-conversation)
- conversation_id = str: Session memory (conversation-specific)
- node_id = None: App-level scope
- node_id = str: Node-level scope
These rules implicitly determine scope and term without redundant storage.
"""
spec: MemoryBlockSpec
tenant_id: str
value: str
app_id: str
conversation_id: Optional[str] = None
node_id: Optional[str] = None
edited_by_user: bool = False
created_by: MemoryCreatedBy
version: int = Field(description="Memory block version number")
class MemoryValueData(BaseModel):
value: str
edited_by_user: bool = False
class ChatflowConversationMetadata(BaseModel):
"""Metadata for chatflow conversation with visible message count"""
type: str = "mutable_visible_window"
visible_count: int = Field(gt=0, description="Number of visible messages to keep")
class MemoryBlockWithConversation(MemoryBlock):
"""MemoryBlock with optional conversation metadata for session memories"""
conversation_metadata: ChatflowConversationMetadata = Field(
description="Conversation metadata, only present for session memories"
)
@classmethod
def from_memory_block(
cls,
memory_block: MemoryBlock,
conversation_metadata: ChatflowConversationMetadata
) -> MemoryBlockWithConversation:
"""Create MemoryBlockWithConversation from MemoryBlock"""
return cls(
spec=memory_block.spec,
tenant_id=memory_block.tenant_id,
value=memory_block.value,
app_id=memory_block.app_id,
conversation_id=memory_block.conversation_id,
node_id=memory_block.node_id,
edited_by_user=memory_block.edited_by_user,
created_by=memory_block.created_by,
version=memory_block.version,
conversation_metadata=conversation_metadata
)

View File

@@ -0,0 +1,6 @@
class MemorySyncTimeoutError(Exception):
def __init__(self, app_id: str, conversation_id: str):
self.app_id = app_id
self.conversation_id = conversation_id
self.message = "Memory synchronization timeout after 50 seconds"
super().__init__(self.message)

View File

@@ -44,7 +44,7 @@ class MemoryConfig(BaseModel):
enabled: bool
size: int | None = None
mode: Literal["linear", "block"] | None = "linear"
role_prefix: RolePrefix | None = None
window: WindowConfig
query_prompt_template: str | None = None

View File

@@ -203,6 +203,48 @@ class ArrayFileSegment(ArraySegment):
return ""
class VersionedMemoryValue(BaseModel):
current_value: str = None # type: ignore
versions: Mapping[str, str] = {}
model_config = ConfigDict(frozen=True)
def add_version(
self,
new_value: str,
version_name: str | None = None
) -> "VersionedMemoryValue":
if version_name is None:
version_name = str(len(self.versions) + 1)
if version_name in self.versions:
raise ValueError(f"Version '{version_name}' already exists.")
self.current_value = new_value
return VersionedMemoryValue(
current_value=new_value,
versions={
version_name: new_value,
**self.versions,
}
)
class VersionedMemorySegment(Segment):
value_type: SegmentType = SegmentType.VERSIONED_MEMORY
value: VersionedMemoryValue = None # type: ignore
@property
def text(self) -> str:
return self.value.current_value
@property
def log(self) -> str:
return self.value.current_value
@property
def markdown(self) -> str:
return self.value.current_value
class ArrayBooleanSegment(ArraySegment):
value_type: SegmentType = SegmentType.ARRAY_BOOLEAN
value: Sequence[bool]
@@ -248,6 +290,7 @@ SegmentUnion: TypeAlias = Annotated[
| Annotated[ArrayObjectSegment, Tag(SegmentType.ARRAY_OBJECT)]
| Annotated[ArrayFileSegment, Tag(SegmentType.ARRAY_FILE)]
| Annotated[ArrayBooleanSegment, Tag(SegmentType.ARRAY_BOOLEAN)]
| Annotated[VersionedMemorySegment, Tag(SegmentType.VERSIONED_MEMORY)]
),
Discriminator(get_segment_discriminator),
]

View File

@@ -44,6 +44,8 @@ class SegmentType(StrEnum):
ARRAY_FILE = "array[file]"
ARRAY_BOOLEAN = "array[boolean]"
VERSIONED_MEMORY = "versioned_memory"
NONE = "none"
GROUP = "group"

View File

@@ -22,6 +22,7 @@ from .segments import (
ObjectSegment,
Segment,
StringSegment,
VersionedMemorySegment,
get_segment_discriminator,
)
from .types import SegmentType
@@ -106,6 +107,10 @@ class ArrayFileVariable(ArrayFileSegment, ArrayVariable):
pass
class VersionedMemoryVariable(VersionedMemorySegment, Variable):
pass
class ArrayBooleanVariable(ArrayBooleanSegment, ArrayVariable):
pass
@@ -161,6 +166,7 @@ VariableUnion: TypeAlias = Annotated[
| Annotated[ArrayFileVariable, Tag(SegmentType.ARRAY_FILE)]
| Annotated[ArrayBooleanVariable, Tag(SegmentType.ARRAY_BOOLEAN)]
| Annotated[SecretVariable, Tag(SegmentType.SECRET)]
| Annotated[VersionedMemoryVariable, Tag(SegmentType.VERSIONED_MEMORY)]
),
Discriminator(get_segment_discriminator),
]

View File

@@ -1,4 +1,5 @@
SYSTEM_VARIABLE_NODE_ID = "sys"
ENVIRONMENT_VARIABLE_NODE_ID = "env"
CONVERSATION_VARIABLE_NODE_ID = "conversation"
MEMORY_BLOCK_VARIABLE_NODE_ID = "memory_block"
RAG_PIPELINE_VARIABLE_NODE_ID = "rag"

View File

@@ -7,11 +7,15 @@ import time
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import InvokeFrom, ModelConfigWithCredentialsEntity
from core.file import FileType, file_manager
from core.helper.code_executor import CodeExecutor, CodeLanguage
from core.llm_generator.output_parser.errors import OutputParserError
from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output
from core.memory.entities import MemoryCreatedBy, MemoryScope
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities import (
@@ -73,6 +77,8 @@ from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig, Variabl
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser
from core.workflow.runtime import VariablePool
from models import UserFrom, Workflow
from models.engine import db
from . import llm_utils
from .entities import (
@@ -317,6 +323,11 @@ class LLMNode(Node):
if self._file_outputs:
outputs["files"] = ArrayFileSegment(value=self._file_outputs)
try:
self._handle_chatflow_memory(result_text, variable_pool)
except Exception as e:
logger.warning("Memory orchestration failed for node %s: %s", self.node_id, str(e))
# Send final chunk event to indicate streaming is complete
yield StreamChunkEvent(
selector=[self._node_id, "text"],
@@ -1228,6 +1239,79 @@ class LLMNode(Node):
def retry(self) -> bool:
return self._node_data.retry_config.retry_enabled
def _handle_chatflow_memory(self, llm_output: str, variable_pool: VariablePool):
if not self._node_data.memory or self._node_data.memory.mode != "block":
return
conversation_id_segment = variable_pool.get((SYSTEM_VARIABLE_NODE_ID, SystemVariableKey.CONVERSATION_ID))
if not conversation_id_segment:
raise ValueError("Conversation ID not found in variable pool.")
conversation_id = conversation_id_segment.text
user_query_segment = variable_pool.get((SYSTEM_VARIABLE_NODE_ID, SystemVariableKey.QUERY))
if not user_query_segment:
raise ValueError("User query not found in variable pool.")
user_query = user_query_segment.text
from core.model_runtime.entities.message_entities import AssistantPromptMessage, UserPromptMessage
from services.chatflow_history_service import ChatflowHistoryService
ChatflowHistoryService.save_node_message(
prompt_message=(UserPromptMessage(content=user_query)),
node_id=self.node_id,
conversation_id=conversation_id,
app_id=self.app_id,
tenant_id=self.tenant_id
)
ChatflowHistoryService.save_node_message(
prompt_message=(AssistantPromptMessage(content=llm_output)),
node_id=self.node_id,
conversation_id=conversation_id,
app_id=self.app_id,
tenant_id=self.tenant_id
)
# FIXME: This is dirty workaround and may cause incorrect resolution for workflow version
with Session(db.engine) as session:
stmt = select(Workflow).where(
Workflow.tenant_id == self.tenant_id,
Workflow.app_id == self.app_id
)
workflow = session.scalars(stmt).first()
if not workflow:
raise ValueError("Workflow not found.")
# Filter memory blocks that belong to this node
node_memory_blocks = [
block for block in workflow.memory_blocks
if block.scope == MemoryScope.NODE and block.node_id == self.id
]
if not node_memory_blocks:
return
# Update each memory block that belongs to this node
is_draft = (self.invoke_from == InvokeFrom.DEBUGGER)
from services.chatflow_memory_service import ChatflowMemoryService
for memory_block_spec in node_memory_blocks:
ChatflowMemoryService.update_node_memory_if_needed(
tenant_id=self.tenant_id,
app_id=self.app_id,
node_id=self.id,
conversation_id=conversation_id,
memory_block_spec=memory_block_spec,
variable_pool=variable_pool,
is_draft=is_draft,
created_by=self._get_user_from_context()
)
def _get_user_from_context(self) -> MemoryCreatedBy:
if self.user_from == UserFrom.ACCOUNT:
return MemoryCreatedBy(account_id=self.user_id)
else:
return MemoryCreatedBy(end_user_id=self.user_id)
def _combine_message_content_with_role(
*, contents: str | list[PromptMessageContentUnionTypes] | None = None, role: PromptMessageRole

View File

@@ -9,11 +9,12 @@ from pydantic import BaseModel, Field
from core.file import File, FileAttribute, file_manager
from core.variables import Segment, SegmentGroup, Variable
from core.variables.consts import SELECTORS_LENGTH
from core.variables.segments import FileSegment, ObjectSegment
from core.variables.variables import RAGPipelineVariableInput, VariableUnion
from core.variables.segments import FileSegment, ObjectSegment, VersionedMemoryValue
from core.variables.variables import RAGPipelineVariableInput, VariableUnion, VersionedMemoryVariable
from core.workflow.constants import (
CONVERSATION_VARIABLE_NODE_ID,
ENVIRONMENT_VARIABLE_NODE_ID,
MEMORY_BLOCK_VARIABLE_NODE_ID,
RAG_PIPELINE_VARIABLE_NODE_ID,
SYSTEM_VARIABLE_NODE_ID,
)
@@ -56,6 +57,10 @@ class VariablePool(BaseModel):
description="RAG pipeline variables.",
default_factory=list,
)
memory_blocks: Mapping[str, str] = Field(
description="Memory blocks.",
default_factory=dict,
)
def model_post_init(self, context: Any, /):
# Create a mapping from field names to SystemVariableKey enum values
@@ -76,6 +81,18 @@ class VariablePool(BaseModel):
rag_pipeline_variables_map[node_id][key] = value
for key, value in rag_pipeline_variables_map.items():
self.add((RAG_PIPELINE_VARIABLE_NODE_ID, key), value)
# Add memory blocks to the variable pool
for memory_id, memory_value in self.memory_blocks.items():
self.add(
[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_id],
VersionedMemoryVariable(
value=VersionedMemoryValue(
current_value=memory_value,
versions={"1": memory_value},
),
name=memory_id,
)
)
def add(self, selector: Sequence[str], value: Any, /):
"""

View File

@@ -21,6 +21,8 @@ from core.variables.segments import (
ObjectSegment,
Segment,
StringSegment,
VersionedMemorySegment,
VersionedMemoryValue,
)
from core.variables.types import SegmentType
from core.variables.variables import (
@@ -39,6 +41,7 @@ from core.variables.variables import (
SecretVariable,
StringVariable,
Variable,
VersionedMemoryVariable,
)
from core.workflow.constants import (
CONVERSATION_VARIABLE_NODE_ID,
@@ -69,6 +72,7 @@ SEGMENT_TO_VARIABLE_MAP = {
NoneSegment: NoneVariable,
ObjectSegment: ObjectVariable,
StringSegment: StringVariable,
VersionedMemorySegment: VersionedMemoryVariable
}
@@ -193,6 +197,7 @@ _segment_factory: Mapping[SegmentType, type[Segment]] = {
SegmentType.FILE: FileSegment,
SegmentType.BOOLEAN: BooleanSegment,
SegmentType.OBJECT: ObjectSegment,
SegmentType.VERSIONED_MEMORY: VersionedMemorySegment,
# Array types
SegmentType.ARRAY_ANY: ArrayAnySegment,
SegmentType.ARRAY_STRING: ArrayStringSegment,
@@ -259,6 +264,12 @@ def build_segment_with_type(segment_type: SegmentType, value: Any) -> Segment:
else:
raise TypeMismatchError(f"Type mismatch: expected {segment_type}, but got empty list")
if segment_type == SegmentType.VERSIONED_MEMORY:
return VersionedMemorySegment(
value_type=segment_type,
value=VersionedMemoryValue.model_validate(value)
)
inferred_type = SegmentType.infer_segment_type(value)
# Type compatibility checking
if inferred_type is None:

View File

@@ -49,6 +49,31 @@ conversation_variable_fields = {
"description": fields.String,
}
model_config_fields = {
"provider": fields.String,
"name": fields.String,
"mode": fields.String,
"completion_params": fields.Raw,
}
memory_block_fields = {
"id": fields.String,
"name": fields.String,
"description": fields.String,
"template": fields.String,
"instruction": fields.String,
"scope": fields.String,
"term": fields.String,
"strategy": fields.String,
"update_turns": fields.Integer,
"preserved_turns": fields.Integer,
"schedule_mode": fields.String,
"model": fields.Nested(model_config_fields),
"end_user_visible": fields.Boolean,
"end_user_editable": fields.Boolean,
"node_id": fields.String,
}
pipeline_variable_fields = {
"label": fields.String,
"variable": fields.String,
@@ -81,6 +106,7 @@ workflow_fields = {
"tool_published": fields.Boolean,
"environment_variables": fields.List(EnvironmentVariableField()),
"conversation_variables": fields.List(fields.Nested(conversation_variable_fields)),
"memory_blocks": fields.Nested(memory_block_fields),
"rag_pipeline_variables": fields.List(fields.Nested(pipeline_variable_fields)),
}

View File

@@ -9,6 +9,7 @@ from .account import (
TenantStatus,
)
from .api_based_extension import APIBasedExtension, APIBasedExtensionPoint
from .chatflow_memory import ChatflowConversation, ChatflowMemoryVariable, ChatflowMessage
from .dataset import (
AppDatasetJoin,
Dataset,
@@ -129,6 +130,9 @@ __all__ = [
"BuiltinToolProvider",
"CeleryTask",
"CeleryTaskSet",
"ChatflowConversation",
"ChatflowMemoryVariable",
"ChatflowMessage",
"Conversation",
"ConversationVariable",
"CreatorUserRole",

View File

@@ -0,0 +1,76 @@
from datetime import datetime
import sqlalchemy as sa
from sqlalchemy import DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from .base import Base
from .types import StringUUID
class ChatflowMemoryVariable(Base):
__tablename__ = "chatflow_memory_variables"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="chatflow_memory_variables_pkey"),
sa.Index("chatflow_memory_variables_memory_id_idx", "tenant_id", "app_id", "node_id", "memory_id"),
)
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
conversation_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
node_id: Mapped[str | None] = mapped_column(sa.Text, nullable=True)
memory_id: Mapped[str] = mapped_column(sa.Text, nullable=False)
value: Mapped[str] = mapped_column(sa.Text, nullable=False)
name: Mapped[str] = mapped_column(sa.Text, nullable=False)
scope: Mapped[str] = mapped_column(sa.String(10), nullable=False) # 'app' or 'node'
term: Mapped[str] = mapped_column(sa.String(20), nullable=False) # 'session' or 'persistent'
version: Mapped[int] = mapped_column(sa.Integer, nullable=False, default=1)
created_by_role: Mapped[str] = mapped_column(sa.String(20)) # 'end_user' or 'account`
created_by: Mapped[str] = mapped_column(StringUUID)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
class ChatflowConversation(Base):
__tablename__ = "chatflow_conversations"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="chatflow_conversations_pkey"),
sa.Index(
"chatflow_conversations_original_conversation_id_idx",
"tenant_id", "app_id", "node_id", "original_conversation_id"
),
)
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str | None] = mapped_column(sa.Text, nullable=True)
original_conversation_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
conversation_metadata: Mapped[str] = mapped_column(sa.Text, nullable=False) # JSON
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
class ChatflowMessage(Base):
__tablename__ = "chatflow_messages"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="chatflow_messages_pkey"),
sa.Index("chatflow_messages_version_idx", "conversation_id", "index", "version"),
)
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
conversation_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
index: Mapped[int] = mapped_column(sa.Integer, nullable=False) # This index starts from 0
version: Mapped[int] = mapped_column(sa.Integer, nullable=False)
data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Serialized PromptMessage JSON
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)

View File

@@ -28,6 +28,7 @@ class DraftVariableType(StrEnum):
NODE = "node"
SYS = "sys"
CONVERSATION = "conversation"
MEMORY_BLOCK = "memory_block"
class MessageStatus(StrEnum):

View File

@@ -1,5 +1,6 @@
import json
import logging
import uuid
from collections.abc import Generator, Mapping, Sequence
from datetime import datetime
from enum import StrEnum
@@ -23,10 +24,13 @@ from sqlalchemy.orm import Mapped, declared_attr, mapped_column
from core.file.constants import maybe_file_object
from core.file.models import File
from core.memory.entities import MemoryBlockSpec
from core.variables import utils as variable_utils
from core.variables.segments import VersionedMemoryValue
from core.variables.variables import FloatVariable, IntegerVariable, StringVariable
from core.workflow.constants import (
CONVERSATION_VARIABLE_NODE_ID,
MEMORY_BLOCK_VARIABLE_NODE_ID,
SYSTEM_VARIABLE_NODE_ID,
)
from core.workflow.enums import NodeType
@@ -161,6 +165,9 @@ class Workflow(Base):
_rag_pipeline_variables: Mapped[str] = mapped_column(
"rag_pipeline_variables", LongText, nullable=False, default="{}"
)
_memory_blocks: Mapped[str] = mapped_column(
"memory_blocks", sa.Text, nullable=False, server_default="[]"
)
VERSION_DRAFT = "draft"
@@ -178,6 +185,7 @@ class Workflow(Base):
environment_variables: Sequence[Variable],
conversation_variables: Sequence[Variable],
rag_pipeline_variables: list[dict],
memory_blocks: Sequence[MemoryBlockSpec] | None = None,
marked_name: str = "",
marked_comment: str = "",
) -> "Workflow":
@@ -193,6 +201,7 @@ class Workflow(Base):
workflow.environment_variables = environment_variables or []
workflow.conversation_variables = conversation_variables or []
workflow.rag_pipeline_variables = rag_pipeline_variables or []
workflow.memory_blocks = memory_blocks or []
workflow.marked_name = marked_name
workflow.marked_comment = marked_comment
workflow.created_at = naive_utc_now()
@@ -509,7 +518,7 @@ class Workflow(Base):
"features": self.features_dict,
"environment_variables": [var.model_dump(mode="json") for var in environment_variables],
"conversation_variables": [var.model_dump(mode="json") for var in self.conversation_variables],
"rag_pipeline_variables": self.rag_pipeline_variables,
"memory_blocks": [block.model_dump(mode="json") for block in self.memory_blocks],
}
return result
@@ -547,6 +556,27 @@ class Workflow(Base):
ensure_ascii=False,
)
@property
def memory_blocks(self) -> Sequence[MemoryBlockSpec]:
"""Memory blocks configuration stored in database"""
if self._memory_blocks is None or self._memory_blocks == "":
self._memory_blocks = "[]"
memory_blocks_list: list[dict[str, Any]] = json.loads(self._memory_blocks)
results = [MemoryBlockSpec.model_validate(config) for config in memory_blocks_list]
return results
@memory_blocks.setter
def memory_blocks(self, value: Sequence[MemoryBlockSpec]):
if not value:
self._memory_blocks = "[]"
return
self._memory_blocks = json.dumps(
[block.model_dump() for block in value],
ensure_ascii=False,
)
@staticmethod
def version_from_datetime(d: datetime) -> str:
return str(d)
@@ -1570,6 +1600,31 @@ class WorkflowDraftVariable(Base):
variable.editable = editable
return variable
@staticmethod
def new_memory_block_variable(
*,
app_id: str,
node_id: str | None = None,
memory_id: str,
name: str,
value: VersionedMemoryValue,
description: str = "",
) -> "WorkflowDraftVariable":
"""Create a new memory block draft variable."""
return WorkflowDraftVariable(
id=str(uuid.uuid4()),
app_id=app_id,
node_id=MEMORY_BLOCK_VARIABLE_NODE_ID,
name=name,
value=value.model_dump_json(),
description=description,
selector=[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_id] if node_id is None else
[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_id, node_id],
value_type=SegmentType.VERSIONED_MEMORY,
visible=True,
editable=True,
)
@property
def edited(self):
return self.last_edited_at is not None

View File

@@ -0,0 +1,289 @@
import json
from collections.abc import MutableMapping, Sequence
from typing import Literal, Optional, overload
from sqlalchemy import Row, Select, and_, desc, func, select
from sqlalchemy.orm import Session
from core.memory.entities import ChatflowConversationMetadata
from core.model_runtime.entities.message_entities import (
PromptMessage,
)
from extensions.ext_database import db
from models.chatflow_memory import ChatflowConversation, ChatflowMessage
class ChatflowHistoryService:
@staticmethod
def get_visible_chat_history(
conversation_id: str,
app_id: str,
tenant_id: str,
node_id: Optional[str] = None,
max_visible_count: Optional[int] = None
) -> Sequence[PromptMessage]:
with Session(db.engine) as session:
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=False
)
if not chatflow_conv:
return []
metadata = ChatflowConversationMetadata.model_validate_json(chatflow_conv.conversation_metadata)
visible_count: int = max_visible_count or metadata.visible_count
stmt = select(ChatflowMessage).where(
ChatflowMessage.conversation_id == chatflow_conv.id
).order_by(ChatflowMessage.index.asc(), ChatflowMessage.version.desc())
raw_messages: Sequence[Row[tuple[ChatflowMessage]]] = session.execute(stmt).all()
sorted_messages = ChatflowHistoryService._filter_latest_messages(
[it[0] for it in raw_messages]
)
visible_count = min(visible_count, len(sorted_messages))
visible_messages = sorted_messages[-visible_count:]
return [PromptMessage.model_validate_json(it.data) for it in visible_messages]
@staticmethod
def get_latest_chat_history_for_app(
app_id: str,
tenant_id: str,
node_id: Optional[str] = None,
max_visible_count: Optional[int] = None
) -> Sequence[PromptMessage]:
"""
Get the latest chat history for an app
Args:
app_id: Application ID
tenant_id: Tenant ID
node_id: Node ID (None for APP level)
max_visible_count: Maximum number of visible messages (optional)
Returns:
PromptMessage sequence, empty list if no history exists
"""
with Session(db.engine) as session:
# Query the most recently updated chatflow conversation
stmt = select(ChatflowConversation).where(
ChatflowConversation.tenant_id == tenant_id,
ChatflowConversation.app_id == app_id,
ChatflowConversation.node_id == (node_id or None)
).order_by(desc(ChatflowConversation.updated_at)).limit(1)
chatflow_conv_row = session.execute(stmt).first()
if not chatflow_conv_row:
return []
chatflow_conv = chatflow_conv_row[0]
# Get visible messages for this conversation
metadata = ChatflowConversationMetadata.model_validate_json(
chatflow_conv.conversation_metadata
)
visible_count: int = max_visible_count or metadata.visible_count
stmt = select(ChatflowMessage).where(
ChatflowMessage.conversation_id == chatflow_conv.id
).order_by(ChatflowMessage.index.asc(), ChatflowMessage.version.desc())
raw_messages: Sequence[Row[tuple[ChatflowMessage]]] = session.execute(stmt).all()
sorted_messages = ChatflowHistoryService._filter_latest_messages(
[it[0] for it in raw_messages]
)
visible_count = min(visible_count, len(sorted_messages))
visible_messages = sorted_messages[-visible_count:]
return [PromptMessage.model_validate_json(it.data) for it in visible_messages]
@staticmethod
def save_message(
prompt_message: PromptMessage,
conversation_id: str,
app_id: str,
tenant_id: str,
node_id: Optional[str] = None
) -> None:
with Session(db.engine) as session:
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
)
# Get next index
max_index = session.execute(
select(func.max(ChatflowMessage.index)).where(
ChatflowMessage.conversation_id == chatflow_conv.id
)
).scalar() or -1
next_index = max_index + 1
# Save new message to append-only table
new_message = ChatflowMessage(
conversation_id=chatflow_conv.id,
index=next_index,
version=1,
data=json.dumps(prompt_message)
)
session.add(new_message)
session.commit()
# Increment visible_count after each message save
current_metadata = ChatflowConversationMetadata.model_validate_json(chatflow_conv.conversation_metadata)
new_visible_count = current_metadata.visible_count + 1
new_metadata = ChatflowConversationMetadata(visible_count=new_visible_count)
chatflow_conv.conversation_metadata = new_metadata.model_dump_json()
@staticmethod
def save_app_message(
prompt_message: PromptMessage,
conversation_id: str,
app_id: str,
tenant_id: str
) -> None:
"""Save PromptMessage to app-level chatflow conversation."""
ChatflowHistoryService.save_message(
prompt_message=prompt_message,
conversation_id=conversation_id,
app_id=app_id,
tenant_id=tenant_id,
node_id=None
)
@staticmethod
def save_node_message(
prompt_message: PromptMessage,
node_id: str,
conversation_id: str,
app_id: str,
tenant_id: str
) -> None:
ChatflowHistoryService.save_message(
prompt_message=prompt_message,
conversation_id=conversation_id,
app_id=app_id,
tenant_id=tenant_id,
node_id=node_id
)
@staticmethod
def update_visible_count(
conversation_id: str,
node_id: Optional[str],
new_visible_count: int,
app_id: str,
tenant_id: str
) -> None:
with Session(db.engine) as session:
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
)
# Only update visible_count in metadata, do not delete any data
new_metadata = ChatflowConversationMetadata(visible_count=new_visible_count)
chatflow_conv.conversation_metadata = new_metadata.model_dump_json()
session.commit()
@staticmethod
def get_conversation_metadata(
tenant_id: str,
app_id: str,
conversation_id: str,
node_id: Optional[str]
) -> ChatflowConversationMetadata:
with Session(db.engine) as session:
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=False
)
if not chatflow_conv:
raise ValueError(f"Conversation not found: {conversation_id}")
return ChatflowConversationMetadata.model_validate_json(chatflow_conv.conversation_metadata)
@staticmethod
def _filter_latest_messages(raw_messages: Sequence[ChatflowMessage]) -> Sequence[ChatflowMessage]:
index_to_message: MutableMapping[int, ChatflowMessage] = {}
for msg in raw_messages:
index = msg.index
if index not in index_to_message or msg.version > index_to_message[index].version:
index_to_message[index] = msg
sorted_messages = sorted(index_to_message.values(), key=lambda m: m.index)
return sorted_messages
@overload
@staticmethod
def _get_or_create_chatflow_conversation(
session: Session,
conversation_id: str,
app_id: str,
tenant_id: str,
node_id: Optional[str] = None,
create_if_missing: Literal[True] = True
) -> ChatflowConversation: ...
@overload
@staticmethod
def _get_or_create_chatflow_conversation(
session: Session,
conversation_id: str,
app_id: str,
tenant_id: str,
node_id: Optional[str] = None,
create_if_missing: Literal[False] = False
) -> Optional[ChatflowConversation]: ...
@overload
@staticmethod
def _get_or_create_chatflow_conversation(
session: Session,
conversation_id: str,
app_id: str,
tenant_id: str,
node_id: Optional[str] = None,
create_if_missing: bool = False
) -> Optional[ChatflowConversation]: ...
@staticmethod
def _get_or_create_chatflow_conversation(
session: Session,
conversation_id: str,
app_id: str,
tenant_id: str,
node_id: Optional[str] = None,
create_if_missing: bool = False
) -> Optional[ChatflowConversation]:
"""Get existing chatflow conversation or optionally create new one"""
stmt: Select[tuple[ChatflowConversation]] = select(ChatflowConversation).where(
and_(
ChatflowConversation.original_conversation_id == conversation_id,
ChatflowConversation.tenant_id == tenant_id,
ChatflowConversation.app_id == app_id
)
)
if node_id:
stmt = stmt.where(ChatflowConversation.node_id == node_id)
else:
stmt = stmt.where(ChatflowConversation.node_id.is_(None))
chatflow_conv: Row[tuple[ChatflowConversation]] | None = session.execute(stmt).first()
if chatflow_conv:
result: ChatflowConversation = chatflow_conv[0] # Extract the ChatflowConversation object
return result
else:
if create_if_missing:
# Create a new chatflow conversation
default_metadata = ChatflowConversationMetadata(visible_count=0)
new_chatflow_conv = ChatflowConversation(
tenant_id=tenant_id,
app_id=app_id,
node_id=node_id,
original_conversation_id=conversation_id,
conversation_metadata=default_metadata.model_dump_json(),
)
session.add(new_chatflow_conv)
session.flush() # Obtain ID
return new_chatflow_conv
return None

View File

@@ -0,0 +1,680 @@
import logging
import threading
import time
from collections.abc import Sequence
from typing import Optional
from sqlalchemy import and_, delete, select
from sqlalchemy.orm import Session
from core.llm_generator.llm_generator import LLMGenerator
from core.memory.entities import (
MemoryBlock,
MemoryBlockSpec,
MemoryBlockWithConversation,
MemoryCreatedBy,
MemoryScheduleMode,
MemoryScope,
MemoryTerm,
MemoryValueData,
)
from core.memory.errors import MemorySyncTimeoutError
from core.model_runtime.entities.message_entities import PromptMessage
from core.variables.segments import VersionedMemoryValue
from core.workflow.constants import MEMORY_BLOCK_VARIABLE_NODE_ID
from core.workflow.runtime import VariablePool
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models import App, CreatorUserRole
from models.chatflow_memory import ChatflowMemoryVariable
from models.workflow import Workflow, WorkflowDraftVariable
from services.chatflow_history_service import ChatflowHistoryService
from services.workflow_draft_variable_service import WorkflowDraftVariableService
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
class ChatflowMemoryService:
@staticmethod
def get_persistent_memories(
app: App,
created_by: MemoryCreatedBy,
version: int | None = None
) -> Sequence[MemoryBlock]:
if created_by.account_id:
created_by_role = CreatorUserRole.ACCOUNT
created_by_id = created_by.account_id
else:
created_by_role = CreatorUserRole.END_USER
created_by_id = created_by.id
if version is None:
# If version not specified, get the latest version
stmt = select(ChatflowMemoryVariable).distinct(ChatflowMemoryVariable.memory_id).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == None,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id,
)
).order_by(ChatflowMemoryVariable.version.desc())
else:
stmt = select(ChatflowMemoryVariable).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == None,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id,
ChatflowMemoryVariable.version == version
)
)
with Session(db.engine) as session:
db_results = session.execute(stmt).all()
return ChatflowMemoryService._convert_to_memory_blocks(app, created_by, [result[0] for result in db_results])
@staticmethod
def get_session_memories(
app: App,
created_by: MemoryCreatedBy,
conversation_id: str,
version: int | None = None
) -> Sequence[MemoryBlock]:
if version is None:
# If version not specified, get the latest version
stmt = select(ChatflowMemoryVariable).distinct(ChatflowMemoryVariable.memory_id).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == conversation_id
)
).order_by(ChatflowMemoryVariable.version.desc())
else:
stmt = select(ChatflowMemoryVariable).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.conversation_id == conversation_id,
ChatflowMemoryVariable.version == version
)
)
with Session(db.engine) as session:
db_results = session.execute(stmt).all()
return ChatflowMemoryService._convert_to_memory_blocks(app, created_by, [result[0] for result in db_results])
@staticmethod
def save_memory(memory: MemoryBlock, variable_pool: VariablePool, is_draft: bool) -> None:
key = f"{memory.node_id}_{memory.spec.id}" if memory.node_id else memory.spec.id
variable_pool.add([MEMORY_BLOCK_VARIABLE_NODE_ID, key], memory.value)
if memory.created_by.account_id:
created_by_role = CreatorUserRole.ACCOUNT
created_by = memory.created_by.account_id
else:
created_by_role = CreatorUserRole.END_USER
created_by = memory.created_by.id
with Session(db.engine) as session:
session.add(
ChatflowMemoryVariable(
memory_id=memory.spec.id,
tenant_id=memory.tenant_id,
app_id=memory.app_id,
node_id=memory.node_id,
conversation_id=memory.conversation_id,
name=memory.spec.name,
value=MemoryValueData(
value=memory.value,
edited_by_user=memory.edited_by_user
).model_dump_json(),
term=memory.spec.term,
scope=memory.spec.scope,
version=memory.version, # Use version from MemoryBlock directly
created_by_role=created_by_role,
created_by=created_by,
)
)
session.commit()
if is_draft:
with Session(bind=db.engine) as session:
draft_var_service = WorkflowDraftVariableService(session)
memory_selector = memory.spec.id if not memory.node_id else f"{memory.node_id}_{memory.spec.id}"
existing_vars = draft_var_service.get_draft_variables_by_selectors(
app_id=memory.app_id,
selectors=[[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_selector]]
)
if existing_vars:
draft_var = existing_vars[0]
draft_var.value = VersionedMemoryValue.model_validate_json(draft_var.value)\
.add_version(memory.value)\
.model_dump_json()
else:
draft_var = WorkflowDraftVariable.new_memory_block_variable(
app_id=memory.app_id,
memory_id=memory.spec.id,
name=memory.spec.name,
value=VersionedMemoryValue().add_version(memory.value),
description=memory.spec.description
)
session.add(draft_var)
session.commit()
@staticmethod
def get_memories_by_specs(
memory_block_specs: Sequence[MemoryBlockSpec],
tenant_id: str, app_id: str,
created_by: MemoryCreatedBy,
conversation_id: Optional[str],
node_id: Optional[str],
is_draft: bool
) -> Sequence[MemoryBlock]:
return [ChatflowMemoryService.get_memory_by_spec(
spec, tenant_id, app_id, created_by, conversation_id, node_id, is_draft
) for spec in memory_block_specs]
@staticmethod
def get_memory_by_spec(
spec: MemoryBlockSpec,
tenant_id: str,
app_id: str,
created_by: MemoryCreatedBy,
conversation_id: Optional[str],
node_id: Optional[str],
is_draft: bool
) -> MemoryBlock:
with Session(db.engine) as session:
if is_draft:
draft_var_service = WorkflowDraftVariableService(session)
selector = [MEMORY_BLOCK_VARIABLE_NODE_ID, f"{spec.id}.{node_id}"] \
if node_id else [MEMORY_BLOCK_VARIABLE_NODE_ID, spec.id]
draft_vars = draft_var_service.get_draft_variables_by_selectors(
app_id=app_id,
selectors=[selector]
)
if draft_vars:
draft_var = draft_vars[0]
return MemoryBlock(
value=draft_var.get_value().text,
tenant_id=tenant_id,
app_id=app_id,
conversation_id=conversation_id,
node_id=node_id,
spec=spec,
created_by=created_by,
version=1,
)
stmt = select(ChatflowMemoryVariable).where(
and_(
ChatflowMemoryVariable.memory_id == spec.id,
ChatflowMemoryVariable.tenant_id == tenant_id,
ChatflowMemoryVariable.app_id == app_id,
ChatflowMemoryVariable.node_id ==
(node_id if spec.scope == MemoryScope.NODE else None),
ChatflowMemoryVariable.conversation_id ==
(conversation_id if spec.term == MemoryTerm.SESSION else None),
)
).order_by(ChatflowMemoryVariable.version.desc()).limit(1)
result = session.execute(stmt).scalar()
if result:
memory_value_data = MemoryValueData.model_validate_json(result.value)
return MemoryBlock(
value=memory_value_data.value,
tenant_id=tenant_id,
app_id=app_id,
conversation_id=conversation_id,
node_id=node_id,
spec=spec,
edited_by_user=memory_value_data.edited_by_user,
created_by=created_by,
version=result.version,
)
return MemoryBlock(
tenant_id=tenant_id,
value=spec.template,
app_id=app_id,
conversation_id=conversation_id,
node_id=node_id,
spec=spec,
created_by=created_by,
version=1,
)
@staticmethod
def update_app_memory_if_needed(
workflow: Workflow,
conversation_id: str,
variable_pool: VariablePool,
created_by: MemoryCreatedBy,
is_draft: bool
):
visible_messages = ChatflowHistoryService.get_visible_chat_history(
conversation_id=conversation_id,
app_id=workflow.app_id,
tenant_id=workflow.tenant_id,
node_id=None,
)
sync_blocks: list[MemoryBlock] = []
async_blocks: list[MemoryBlock] = []
for memory_spec in workflow.memory_blocks:
if memory_spec.scope == MemoryScope.APP:
memory = ChatflowMemoryService.get_memory_by_spec(
spec=memory_spec,
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
conversation_id=conversation_id,
node_id=None,
is_draft=is_draft,
created_by=created_by,
)
if ChatflowMemoryService._should_update_memory(memory, visible_messages):
if memory.spec.schedule_mode == MemoryScheduleMode.SYNC:
sync_blocks.append(memory)
else:
async_blocks.append(memory)
if not sync_blocks and not async_blocks:
return
# async mode: submit individual async tasks directly
for memory_block in async_blocks:
ChatflowMemoryService._app_submit_async_memory_update(
block=memory_block,
is_draft=is_draft,
variable_pool=variable_pool,
visible_messages=visible_messages,
conversation_id=conversation_id,
)
# sync mode: submit a batch update task
if sync_blocks:
ChatflowMemoryService._app_submit_sync_memory_batch_update(
sync_blocks=sync_blocks,
is_draft=is_draft,
conversation_id=conversation_id,
app_id=workflow.app_id,
visible_messages=visible_messages,
variable_pool=variable_pool
)
@staticmethod
def update_node_memory_if_needed(
tenant_id: str,
app_id: str,
node_id: str,
created_by: MemoryCreatedBy,
conversation_id: str,
memory_block_spec: MemoryBlockSpec,
variable_pool: VariablePool,
is_draft: bool
) -> bool:
visible_messages = ChatflowHistoryService.get_visible_chat_history(
conversation_id=conversation_id,
app_id=app_id,
tenant_id=tenant_id,
node_id=node_id,
)
memory_block = ChatflowMemoryService.get_memory_by_spec(
spec=memory_block_spec,
tenant_id=tenant_id,
app_id=app_id,
conversation_id=conversation_id,
node_id=node_id,
is_draft=is_draft,
created_by=created_by,
)
if not ChatflowMemoryService._should_update_memory(
memory_block=memory_block,
visible_history=visible_messages
):
return False
if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
# Node-level sync: blocking execution
ChatflowMemoryService._update_node_memory_sync(
visible_messages=visible_messages,
memory_block=memory_block,
variable_pool=variable_pool,
is_draft=is_draft,
conversation_id=conversation_id
)
else:
# Node-level async: execute asynchronously
ChatflowMemoryService._update_node_memory_async(
memory_block=memory_block,
visible_messages=visible_messages,
variable_pool=variable_pool,
is_draft=is_draft,
conversation_id=conversation_id
)
return True
@staticmethod
def wait_for_sync_memory_completion(workflow: Workflow, conversation_id: str):
"""Wait for sync memory update to complete, maximum 50 seconds"""
memory_blocks = workflow.memory_blocks
sync_memory_blocks = [
block for block in memory_blocks
if block.scope == MemoryScope.APP and block.schedule_mode == MemoryScheduleMode.SYNC
]
if not sync_memory_blocks:
return
lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id)
# Retry up to 10 times, wait 5 seconds each time, total 50 seconds
max_retries = 10
retry_interval = 5
for i in range(max_retries):
if not redis_client.exists(lock_key):
# Lock doesn't exist, can continue
return
if i < max_retries - 1:
# Still have retry attempts, wait
time.sleep(retry_interval)
else:
# Maximum retry attempts reached, raise exception
raise MemorySyncTimeoutError(
app_id=workflow.app_id,
conversation_id=conversation_id
)
@staticmethod
def _convert_to_memory_blocks(
app: App,
created_by: MemoryCreatedBy,
raw_results: Sequence[ChatflowMemoryVariable]
) -> Sequence[MemoryBlock]:
workflow = WorkflowService().get_published_workflow(app)
if not workflow:
return []
results = []
for chatflow_memory_variable in raw_results:
spec = next(
(spec for spec in workflow.memory_blocks if spec.id == chatflow_memory_variable.memory_id),
None
)
if spec and chatflow_memory_variable.app_id:
memory_value_data = MemoryValueData.model_validate_json(chatflow_memory_variable.value)
results.append(
MemoryBlock(
spec=spec,
tenant_id=chatflow_memory_variable.tenant_id,
value=memory_value_data.value,
app_id=chatflow_memory_variable.app_id,
conversation_id=chatflow_memory_variable.conversation_id,
node_id=chatflow_memory_variable.node_id,
edited_by_user=memory_value_data.edited_by_user,
created_by=created_by,
version=chatflow_memory_variable.version,
)
)
return results
@staticmethod
def _should_update_memory(
memory_block: MemoryBlock,
visible_history: Sequence[PromptMessage]
) -> bool:
return len(visible_history) >= memory_block.spec.update_turns
@staticmethod
def _app_submit_async_memory_update(
block: MemoryBlock,
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
conversation_id: str,
is_draft: bool
):
thread = threading.Thread(
target=ChatflowMemoryService._perform_memory_update,
kwargs={
'memory_block': block,
'visible_messages': visible_messages,
'variable_pool': variable_pool,
'is_draft': is_draft,
'conversation_id': conversation_id
},
)
thread.start()
@staticmethod
def _app_submit_sync_memory_batch_update(
sync_blocks: Sequence[MemoryBlock],
app_id: str,
conversation_id: str,
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
is_draft: bool
):
"""Submit sync memory batch update task"""
thread = threading.Thread(
target=ChatflowMemoryService._batch_update_sync_memory,
kwargs={
'sync_blocks': sync_blocks,
'app_id': app_id,
'conversation_id': conversation_id,
'visible_messages': visible_messages,
'variable_pool': variable_pool,
'is_draft': is_draft
},
)
thread.start()
@staticmethod
def _batch_update_sync_memory(
sync_blocks: Sequence[MemoryBlock],
app_id: str,
conversation_id: str,
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
is_draft: bool
):
try:
lock_key = _get_memory_sync_lock_key(app_id, conversation_id)
with redis_client.lock(lock_key, timeout=120):
threads = []
for block in sync_blocks:
thread = threading.Thread(
target=ChatflowMemoryService._perform_memory_update,
kwargs={
'memory_block': block,
'visible_messages': visible_messages,
'variable_pool': variable_pool,
'is_draft': is_draft,
'conversation_id': conversation_id,
},
)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
except Exception as e:
logger.exception("Error batch updating memory", exc_info=e)
@staticmethod
def _update_node_memory_sync(
memory_block: MemoryBlock,
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
conversation_id: str,
is_draft: bool
):
ChatflowMemoryService._perform_memory_update(
memory_block=memory_block,
visible_messages=visible_messages,
variable_pool=variable_pool,
is_draft=is_draft,
conversation_id=conversation_id
)
@staticmethod
def _update_node_memory_async(
memory_block: MemoryBlock,
visible_messages: Sequence[PromptMessage],
variable_pool: VariablePool,
conversation_id: str,
is_draft: bool = False
):
thread = threading.Thread(
target=ChatflowMemoryService._perform_memory_update,
kwargs={
'memory_block': memory_block,
'visible_messages': visible_messages,
'variable_pool': variable_pool,
'is_draft': is_draft,
'conversation_id': conversation_id,
},
daemon=True
)
thread.start()
@staticmethod
def _perform_memory_update(
memory_block: MemoryBlock,
variable_pool: VariablePool,
conversation_id: str,
visible_messages: Sequence[PromptMessage],
is_draft: bool
):
updated_value = LLMGenerator.update_memory_block(
tenant_id=memory_block.tenant_id,
visible_history=ChatflowMemoryService._format_chat_history(visible_messages),
variable_pool=variable_pool,
memory_block=memory_block,
memory_spec=memory_block.spec,
)
updated_memory = MemoryBlock(
tenant_id=memory_block.tenant_id,
value=updated_value,
spec=memory_block.spec,
app_id=memory_block.app_id,
conversation_id=conversation_id,
node_id=memory_block.node_id,
edited_by_user=False,
created_by=memory_block.created_by,
version=memory_block.version + 1, # Increment version for business logic update
)
ChatflowMemoryService.save_memory(updated_memory, variable_pool, is_draft)
ChatflowHistoryService.update_visible_count(
conversation_id=conversation_id,
node_id=memory_block.node_id,
new_visible_count=memory_block.spec.preserved_turns,
app_id=memory_block.app_id,
tenant_id=memory_block.tenant_id
)
@staticmethod
def delete_memory(app: App, memory_id: str, created_by: MemoryCreatedBy):
workflow = WorkflowService().get_published_workflow(app)
if not workflow:
raise ValueError("Workflow not found")
memory_spec = next((it for it in workflow.memory_blocks if it.id == memory_id), None)
if not memory_spec or not memory_spec.end_user_editable:
raise ValueError("Memory not found or not deletable")
if created_by.account_id:
created_by_role = CreatorUserRole.ACCOUNT
created_by_id = created_by.account_id
else:
created_by_role = CreatorUserRole.END_USER
created_by_id = created_by.id
with Session(db.engine) as session:
stmt = delete(ChatflowMemoryVariable).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.memory_id == memory_id,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id
)
)
session.execute(stmt)
session.commit()
@staticmethod
def delete_all_user_memories(app: App, created_by: MemoryCreatedBy):
if created_by.account_id:
created_by_role = CreatorUserRole.ACCOUNT
created_by_id = created_by.account_id
else:
created_by_role = CreatorUserRole.END_USER
created_by_id = created_by.id
with Session(db.engine) as session:
stmt = delete(ChatflowMemoryVariable).where(
and_(
ChatflowMemoryVariable.tenant_id == app.tenant_id,
ChatflowMemoryVariable.app_id == app.id,
ChatflowMemoryVariable.created_by_role == created_by_role,
ChatflowMemoryVariable.created_by == created_by_id
)
)
session.execute(stmt)
session.commit()
@staticmethod
def get_persistent_memories_with_conversation(
app: App,
created_by: MemoryCreatedBy,
conversation_id: str,
version: int | None = None
) -> Sequence[MemoryBlockWithConversation]:
"""Get persistent memories with conversation metadata (always None for persistent)"""
memory_blocks = ChatflowMemoryService.get_persistent_memories(app, created_by, version)
return [
MemoryBlockWithConversation.from_memory_block(
block,
ChatflowHistoryService.get_conversation_metadata(
app.tenant_id, app.id, conversation_id, block.node_id
)
)
for block in memory_blocks
]
@staticmethod
def get_session_memories_with_conversation(
app: App,
created_by: MemoryCreatedBy,
conversation_id: str,
version: int | None = None
) -> Sequence[MemoryBlockWithConversation]:
"""Get session memories with conversation metadata"""
memory_blocks = ChatflowMemoryService.get_session_memories(app, created_by, conversation_id, version)
return [
MemoryBlockWithConversation.from_memory_block(
block,
ChatflowHistoryService.get_conversation_metadata(
app.tenant_id, app.id, conversation_id, block.node_id
)
)
for block in memory_blocks
]
@staticmethod
def _format_chat_history(messages: Sequence[PromptMessage]) -> Sequence[tuple[str, str]]:
result = []
for message in messages:
result.append((str(message.role.value), message.get_text_content()))
return result
def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str:
"""Generate Redis lock key for memory sync updates
Args:
app_id: Application ID
conversation_id: Conversation ID
Returns:
Formatted lock key
"""
return f"memory_sync_update:{app_id}:{conversation_id}"

View File

@@ -12,6 +12,7 @@ from core.app.app_config.entities import VariableEntityType
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.file import File
from core.memory.entities import MemoryBlockSpec, MemoryCreatedBy, MemoryScope
from core.repositories import DifyCoreRepositoryFactory
from core.variables import Variable
from core.variables.variables import VariableUnion
@@ -199,6 +200,7 @@ class WorkflowService:
account: Account,
environment_variables: Sequence[Variable],
conversation_variables: Sequence[Variable],
memory_blocks: Sequence[MemoryBlockSpec] | None = None,
) -> Workflow:
"""
Sync draft workflow
@@ -229,6 +231,7 @@ class WorkflowService:
environment_variables=environment_variables,
conversation_variables=conversation_variables,
)
workflow.memory_blocks = memory_blocks or []
db.session.add(workflow)
# update draft workflow if found
else:
@@ -238,6 +241,7 @@ class WorkflowService:
workflow.updated_at = naive_utc_now()
workflow.environment_variables = environment_variables
workflow.conversation_variables = conversation_variables
workflow.memory_blocks = memory_blocks or []
# commit db session changes
db.session.commit()
@@ -303,6 +307,7 @@ class WorkflowService:
marked_name=marked_name,
marked_comment=marked_comment,
rag_pipeline_variables=draft_workflow.rag_pipeline_variables,
memory_blocks=draft_workflow.memory_blocks,
features=draft_workflow.features,
)
@@ -660,17 +665,10 @@ class WorkflowService:
tenant_id=draft_workflow.tenant_id, start_node_data=start_data, user_inputs=user_inputs
)
# init variable pool
variable_pool = _setup_variable_pool(
query=query,
files=files or [],
user_id=account.id,
user_inputs=user_inputs,
workflow=draft_workflow,
# NOTE(QuantumGhost): We rely on `DraftVarLoader` to load conversation variables.
conversation_variables=[],
node_type=node_type,
conversation_id=conversation_id,
)
variable_pool = _setup_variable_pool(query=query, files=files or [], user_id=account.id,
user_inputs=user_inputs, workflow=draft_workflow,
node_type=node_type, conversation_id=conversation_id,
conversation_variables=[], is_draft=True)
else:
variable_pool = VariablePool(
@@ -1044,6 +1042,7 @@ def _setup_variable_pool(
node_type: NodeType,
conversation_id: str,
conversation_variables: list[Variable],
is_draft: bool
):
# Only inject system variables for START node type.
if node_type == NodeType.START or node_type.is_trigger_node:
@@ -1063,7 +1062,6 @@ def _setup_variable_pool(
system_variable.dialogue_count = 1
else:
system_variable = SystemVariable.empty()
# init variable pool
variable_pool = VariablePool(
system_variables=system_variable,
@@ -1072,6 +1070,12 @@ def _setup_variable_pool(
# Based on the definition of `VariableUnion`,
# `list[Variable]` can be safely used as `list[VariableUnion]` since they are compatible.
conversation_variables=cast(list[VariableUnion], conversation_variables), #
memory_blocks=_fetch_memory_blocks(
workflow,
MemoryCreatedBy(account_id=user_id),
conversation_id,
is_draft=is_draft
),
)
return variable_pool
@@ -1108,3 +1112,30 @@ def _rebuild_single_file(tenant_id: str, value: Any, variable_entity_type: Varia
return build_from_mappings(mappings=value, tenant_id=tenant_id)
else:
raise Exception("unreachable")
def _fetch_memory_blocks(
workflow: Workflow,
created_by: MemoryCreatedBy,
conversation_id: str,
is_draft: bool
) -> Mapping[str, str]:
memory_blocks = {}
memory_block_specs = workflow.memory_blocks
from services.chatflow_memory_service import ChatflowMemoryService
memories = ChatflowMemoryService.get_memories_by_specs(
memory_block_specs=memory_block_specs,
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
node_id=None,
conversation_id=conversation_id,
is_draft=is_draft,
created_by=created_by,
)
for memory in memories:
if memory.spec.scope == MemoryScope.APP:
memory_blocks[memory.spec.id] = memory.value
else: # NODE scope
memory_blocks[f"{memory.node_id}_{memory.spec.id}"] = memory.value
return memory_blocks