mirror of
https://github.com/langgenius/dify.git
synced 2025-12-21 15:02:26 +00:00
Compare commits
90 Commits
fix/codeow
...
feat/memor
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47c1da05f2 | ||
|
|
7008be9fb7 | ||
|
|
b5cde91c92 | ||
|
|
b0f73d681c | ||
|
|
77f70c5973 | ||
|
|
e53147266c | ||
|
|
e47133569b | ||
|
|
f0ff2e1f2c | ||
|
|
89d53ecf50 | ||
|
|
bb8b6b494d | ||
|
|
5d5485e49d | ||
|
|
d6197e73b3 | ||
|
|
cc5ceca342 | ||
|
|
bfba8bec2d | ||
|
|
65a3646ce7 | ||
|
|
cb73335599 | ||
|
|
f4fa57dac9 | ||
|
|
7ca06931ec | ||
|
|
f4567fbf9e | ||
|
|
8fd088754a | ||
|
|
61d9428064 | ||
|
|
f6038a4557 | ||
|
|
c367f80ec5 | ||
|
|
791f33fd0b | ||
|
|
1e0a3b163e | ||
|
|
bb1f1a56a5 | ||
|
|
15be85514d | ||
|
|
8833fee232 | ||
|
|
5bf642c3f9 | ||
|
|
3d7d4182a6 | ||
|
|
75c221038d | ||
|
|
b7b5b0b8d0 | ||
|
|
6eab6a675c | ||
|
|
d94e598a89 | ||
|
|
28acb70118 | ||
|
|
7c35aaa99d | ||
|
|
a8c2a300f6 | ||
|
|
d654d9d8b1 | ||
|
|
394b7d09b8 | ||
|
|
e9313b9c1b | ||
|
|
ac5dd1f45a | ||
|
|
3005cf3282 | ||
|
|
54b272206e | ||
|
|
3d761a3189 | ||
|
|
e3903f34e4 | ||
|
|
f4f055fb36 | ||
|
|
8563ae5511 | ||
|
|
2c765ccfae | ||
|
|
626e7b2211 | ||
|
|
516b6b0fa8 | ||
|
|
613d086f1e | ||
|
|
9e0630f012 | ||
|
|
d6d9554954 | ||
|
|
2a532ab729 | ||
|
|
03eef65b25 | ||
|
|
ad07d63994 | ||
|
|
8685f055ea | ||
|
|
3b868a1cec | ||
|
|
ab389eaa8e | ||
|
|
008f778e8f | ||
|
|
6af168cb31 | ||
|
|
29f56cf0cf | ||
|
|
11b6ea742d | ||
|
|
05d231ad33 | ||
|
|
48f3c69c69 | ||
|
|
8b68020453 | ||
|
|
4d2fc66a8d | ||
|
|
f72ed4898c | ||
|
|
85a73181cc | ||
|
|
e31e4ab677 | ||
|
|
0d95c2192e | ||
|
|
1fa8b26e55 | ||
|
|
4b085d46f6 | ||
|
|
635c4ed4ce | ||
|
|
7ffcf8dd6f | ||
|
|
97cd21d3be | ||
|
|
a13cb7e1c5 | ||
|
|
7b602e9003 | ||
|
|
5a26ebec8f | ||
|
|
8341b8b1c1 | ||
|
|
bbb640c9a2 | ||
|
|
0c97bbf137 | ||
|
|
45fddc70d5 | ||
|
|
f977dc410a | ||
|
|
d535818505 | ||
|
|
fcf4e1f37d | ||
|
|
38130c8502 | ||
|
|
f284c91988 | ||
|
|
584b2cefa3 | ||
|
|
42091b4a79 |
8
api/.idea/vcs.xml
generated
8
api/.idea/vcs.xml
generated
@@ -1,5 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="CommitMessageInspectionProfile">
|
||||
<profile version="1.0">
|
||||
<inspection_tool class="CommitFormat" enabled="true" level="WARNING" enabled_by_default="true" />
|
||||
<inspection_tool class="CommitNamingConvention" enabled="true" level="WARNING" enabled_by_default="true" />
|
||||
</profile>
|
||||
</component>
|
||||
<component name="IssueNavigationConfiguration">
|
||||
<option name="links">
|
||||
<list>
|
||||
@@ -14,4 +20,4 @@
|
||||
<mapping directory="" vcs="Git" />
|
||||
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
|
||||
</component>
|
||||
</project>
|
||||
</project>
|
||||
@@ -1,5 +1,3 @@
|
||||
from collections.abc import Sequence
|
||||
|
||||
from flask_restx import Resource, fields, reqparse
|
||||
|
||||
from controllers.console import console_ns
|
||||
@@ -11,14 +9,9 @@ from controllers.console.app.error import (
|
||||
)
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
|
||||
from core.helper.code_executor.code_node_provider import CodeNodeProvider
|
||||
from core.helper.code_executor.javascript.javascript_code_provider import JavascriptCodeProvider
|
||||
from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
|
||||
from core.llm_generator.llm_generator import LLMGenerator
|
||||
from core.model_runtime.errors.invoke import InvokeError
|
||||
from extensions.ext_database import db
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
from models import App
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
|
||||
@@ -178,11 +171,29 @@ class InstructionGenerateApi(Resource):
|
||||
console_ns.model(
|
||||
"InstructionGenerateRequest",
|
||||
{
|
||||
"flow_id": fields.String(required=True, description="Workflow/Flow ID"),
|
||||
"node_id": fields.String(description="Node ID for workflow context"),
|
||||
"current": fields.String(description="Current instruction text"),
|
||||
"language": fields.String(default="javascript", description="Programming language (javascript/python)"),
|
||||
"instruction": fields.String(required=True, description="Instruction for generation"),
|
||||
"type": fields.String(
|
||||
required=True,
|
||||
description="Request type",
|
||||
enum=[
|
||||
"legacy_prompt_generate",
|
||||
"workflow_prompt_generate",
|
||||
"workflow_code_generate",
|
||||
"workflow_prompt_edit",
|
||||
"workflow_code_edit",
|
||||
"memory_template_generate",
|
||||
"memory_instruction_generate",
|
||||
"memory_template_edit",
|
||||
"memory_instruction_edit",
|
||||
]
|
||||
),
|
||||
"flow_id": fields.String(description="Workflow/Flow ID"),
|
||||
"node_id": fields.String(description="Node ID (optional)"),
|
||||
"current": fields.String(description="Current content"),
|
||||
"language": fields.String(
|
||||
default="javascript",
|
||||
description="Programming language (javascript/python)"
|
||||
),
|
||||
"instruction": fields.String(required=True, description="User instruction"),
|
||||
"model_config": fields.Raw(required=True, description="Model configuration"),
|
||||
"ideal_output": fields.String(description="Expected ideal output"),
|
||||
},
|
||||
@@ -197,7 +208,8 @@ class InstructionGenerateApi(Resource):
|
||||
def post(self):
|
||||
parser = (
|
||||
reqparse.RequestParser()
|
||||
.add_argument("flow_id", type=str, required=True, default="", location="json")
|
||||
.add_argument("type", type=str, required=True, nullable=False, location="json")
|
||||
.add_argument("flow_id", type=str, required=False, default="", location="json")
|
||||
.add_argument("node_id", type=str, required=False, default="", location="json")
|
||||
.add_argument("current", type=str, required=False, default="", location="json")
|
||||
.add_argument("language", type=str, required=False, default="javascript", location="json")
|
||||
@@ -207,70 +219,16 @@ class InstructionGenerateApi(Resource):
|
||||
)
|
||||
args = parser.parse_args()
|
||||
_, current_tenant_id = current_account_with_tenant()
|
||||
providers: list[type[CodeNodeProvider]] = [Python3CodeProvider, JavascriptCodeProvider]
|
||||
code_provider: type[CodeNodeProvider] | None = next(
|
||||
(p for p in providers if p.is_accept_language(args["language"])), None
|
||||
)
|
||||
code_template = code_provider.get_default_code() if code_provider else ""
|
||||
|
||||
try:
|
||||
# Generate from nothing for a workflow node
|
||||
if (args["current"] == code_template or args["current"] == "") and args["node_id"] != "":
|
||||
app = db.session.query(App).where(App.id == args["flow_id"]).first()
|
||||
if not app:
|
||||
return {"error": f"app {args['flow_id']} not found"}, 400
|
||||
workflow = WorkflowService().get_draft_workflow(app_model=app)
|
||||
if not workflow:
|
||||
return {"error": f"workflow {args['flow_id']} not found"}, 400
|
||||
nodes: Sequence = workflow.graph_dict["nodes"]
|
||||
node = [node for node in nodes if node["id"] == args["node_id"]]
|
||||
if len(node) == 0:
|
||||
return {"error": f"node {args['node_id']} not found"}, 400
|
||||
node_type = node[0]["data"]["type"]
|
||||
match node_type:
|
||||
case "llm":
|
||||
return LLMGenerator.generate_rule_config(
|
||||
current_tenant_id,
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
no_variable=True,
|
||||
)
|
||||
case "agent":
|
||||
return LLMGenerator.generate_rule_config(
|
||||
current_tenant_id,
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
no_variable=True,
|
||||
)
|
||||
case "code":
|
||||
return LLMGenerator.generate_code(
|
||||
tenant_id=current_tenant_id,
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
code_language=args["language"],
|
||||
)
|
||||
case _:
|
||||
return {"error": f"invalid node type: {node_type}"}
|
||||
if args["node_id"] == "" and args["current"] != "": # For legacy app without a workflow
|
||||
return LLMGenerator.instruction_modify_legacy(
|
||||
tenant_id=current_tenant_id,
|
||||
flow_id=args["flow_id"],
|
||||
current=args["current"],
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
ideal_output=args["ideal_output"],
|
||||
)
|
||||
if args["node_id"] != "" and args["current"] != "": # For workflow node
|
||||
return LLMGenerator.instruction_modify_workflow(
|
||||
tenant_id=current_tenant_id,
|
||||
flow_id=args["flow_id"],
|
||||
node_id=args["node_id"],
|
||||
current=args["current"],
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
ideal_output=args["ideal_output"],
|
||||
workflow_service=WorkflowService(),
|
||||
)
|
||||
return {"error": "incompatible parameters"}, 400
|
||||
# Validate parameters
|
||||
is_valid, error_message = self._validate_params(args["type"], args)
|
||||
if not is_valid:
|
||||
return {"error": error_message}, 400
|
||||
|
||||
# Route based on type
|
||||
return self._handle_by_type(args["type"], args, current_tenant_id)
|
||||
|
||||
except ProviderTokenNotInitError as ex:
|
||||
raise ProviderNotInitializeError(ex.description)
|
||||
except QuotaExceededError:
|
||||
@@ -280,6 +238,131 @@ class InstructionGenerateApi(Resource):
|
||||
except InvokeError as e:
|
||||
raise CompletionRequestError(e.description)
|
||||
|
||||
def _validate_params(self, request_type: str, args: dict) -> tuple[bool, str]:
|
||||
"""
|
||||
Validate request parameters
|
||||
|
||||
Returns:
|
||||
(is_valid, error_message)
|
||||
"""
|
||||
# All types require instruction and model_config
|
||||
if not args.get("instruction"):
|
||||
return False, "instruction is required"
|
||||
if not args.get("model_config"):
|
||||
return False, "model_config is required"
|
||||
|
||||
# Edit types require flow_id and current
|
||||
if request_type.endswith("_edit"):
|
||||
if not args.get("flow_id"):
|
||||
return False, f"{request_type} requires flow_id"
|
||||
if not args.get("current"):
|
||||
return False, f"{request_type} requires current content"
|
||||
|
||||
# Code generate requires language
|
||||
if request_type == "workflow_code_generate":
|
||||
if args.get("language") not in ["python", "javascript"]:
|
||||
return False, "language must be 'python' or 'javascript'"
|
||||
|
||||
return True, ""
|
||||
|
||||
def _handle_by_type(self, request_type: str, args: dict, tenant_id: str):
|
||||
"""
|
||||
Route handling based on type
|
||||
"""
|
||||
match request_type:
|
||||
case "legacy_prompt_generate":
|
||||
# Legacy prompt generation doesn't exist, this is actually an edit
|
||||
if not args.get("flow_id"):
|
||||
return {"error": "legacy_prompt_generate requires flow_id"}, 400
|
||||
return LLMGenerator.instruction_modify_legacy(
|
||||
tenant_id=tenant_id,
|
||||
flow_id=args["flow_id"],
|
||||
current=args["current"],
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
ideal_output=args["ideal_output"],
|
||||
)
|
||||
|
||||
case "workflow_prompt_generate":
|
||||
return LLMGenerator.generate_rule_config(
|
||||
tenant_id,
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
no_variable=True,
|
||||
)
|
||||
|
||||
case "workflow_code_generate":
|
||||
return LLMGenerator.generate_code(
|
||||
tenant_id=tenant_id,
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
code_language=args["language"],
|
||||
)
|
||||
|
||||
case "workflow_prompt_edit":
|
||||
return LLMGenerator.instruction_modify_workflow(
|
||||
tenant_id=tenant_id,
|
||||
flow_id=args["flow_id"],
|
||||
node_id=args["node_id"],
|
||||
current=args["current"],
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
ideal_output=args["ideal_output"],
|
||||
workflow_service=WorkflowService(),
|
||||
)
|
||||
|
||||
case "workflow_code_edit":
|
||||
# Code edit uses the same workflow edit logic
|
||||
return LLMGenerator.instruction_modify_workflow(
|
||||
tenant_id=tenant_id,
|
||||
flow_id=args["flow_id"],
|
||||
node_id=args["node_id"],
|
||||
current=args["current"],
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
ideal_output=args["ideal_output"],
|
||||
workflow_service=WorkflowService(),
|
||||
)
|
||||
|
||||
case "memory_template_generate":
|
||||
return LLMGenerator.generate_memory_template(
|
||||
tenant_id=tenant_id,
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
)
|
||||
|
||||
case "memory_instruction_generate":
|
||||
return LLMGenerator.generate_memory_instruction(
|
||||
tenant_id=tenant_id,
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
)
|
||||
|
||||
case "memory_template_edit":
|
||||
return LLMGenerator.edit_memory_template(
|
||||
tenant_id=tenant_id,
|
||||
flow_id=args["flow_id"],
|
||||
node_id=args.get("node_id") or None,
|
||||
current=args["current"],
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
ideal_output=args["ideal_output"],
|
||||
)
|
||||
|
||||
case "memory_instruction_edit":
|
||||
return LLMGenerator.edit_memory_instruction(
|
||||
tenant_id=tenant_id,
|
||||
flow_id=args["flow_id"],
|
||||
node_id=args.get("node_id") or None,
|
||||
current=args["current"],
|
||||
instruction=args["instruction"],
|
||||
model_config=args["model_config"],
|
||||
ideal_output=args["ideal_output"],
|
||||
)
|
||||
|
||||
case _:
|
||||
return {"error": f"Invalid request type: {request_type}"}, 400
|
||||
|
||||
|
||||
@console_ns.route("/instruction-generate/template")
|
||||
class InstructionGenerationTemplateApi(Resource):
|
||||
|
||||
@@ -5,6 +5,7 @@ from typing import cast
|
||||
|
||||
from flask import abort, request
|
||||
from flask_restx import Resource, fields, inputs, marshal_with, reqparse
|
||||
from pydantic_core import ValidationError
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
|
||||
|
||||
@@ -110,6 +111,7 @@ class DraftWorkflowApi(Resource):
|
||||
"hash": fields.String(description="Workflow hash for validation"),
|
||||
"environment_variables": fields.List(fields.Raw, required=True, description="Environment variables"),
|
||||
"conversation_variables": fields.List(fields.Raw, description="Conversation variables"),
|
||||
"memory_blocks": fields.List(fields.Raw, description="Memory blocks"),
|
||||
},
|
||||
)
|
||||
)
|
||||
@@ -144,6 +146,7 @@ class DraftWorkflowApi(Resource):
|
||||
.add_argument("hash", type=str, required=False, location="json")
|
||||
.add_argument("environment_variables", type=list, required=True, location="json")
|
||||
.add_argument("conversation_variables", type=list, required=False, location="json")
|
||||
.add_argument("memory_blocks", type=list, required=False, location="json")
|
||||
)
|
||||
args = parser.parse_args()
|
||||
elif "text/plain" in content_type:
|
||||
@@ -161,6 +164,7 @@ class DraftWorkflowApi(Resource):
|
||||
"hash": data.get("hash"),
|
||||
"environment_variables": data.get("environment_variables"),
|
||||
"conversation_variables": data.get("conversation_variables"),
|
||||
"memory_blocks": data.get("memory_blocks"),
|
||||
}
|
||||
except json.JSONDecodeError:
|
||||
return {"message": "Invalid JSON data"}, 400
|
||||
@@ -177,6 +181,11 @@ class DraftWorkflowApi(Resource):
|
||||
conversation_variables = [
|
||||
variable_factory.build_conversation_variable_from_mapping(obj) for obj in conversation_variables_list
|
||||
]
|
||||
memory_blocks_list = args.get("memory_blocks") or []
|
||||
from core.memory.entities import MemoryBlockSpec
|
||||
memory_blocks = [
|
||||
MemoryBlockSpec.model_validate(obj) for obj in memory_blocks_list
|
||||
]
|
||||
workflow = workflow_service.sync_draft_workflow(
|
||||
app_model=app_model,
|
||||
graph=args["graph"],
|
||||
@@ -185,9 +194,12 @@ class DraftWorkflowApi(Resource):
|
||||
account=current_user,
|
||||
environment_variables=environment_variables,
|
||||
conversation_variables=conversation_variables,
|
||||
memory_blocks=memory_blocks,
|
||||
)
|
||||
except WorkflowHashNotEqualError:
|
||||
raise DraftWorkflowNotSync()
|
||||
except ValidationError as e:
|
||||
return {"message": str(e)}, 400
|
||||
|
||||
return {
|
||||
"result": "success",
|
||||
|
||||
@@ -19,6 +19,7 @@ from .app import (
|
||||
annotation,
|
||||
app,
|
||||
audio,
|
||||
chatflow_memory,
|
||||
completion,
|
||||
conversation,
|
||||
file,
|
||||
@@ -40,6 +41,7 @@ __all__ = [
|
||||
"annotation",
|
||||
"app",
|
||||
"audio",
|
||||
"chatflow_memory",
|
||||
"completion",
|
||||
"conversation",
|
||||
"dataset",
|
||||
|
||||
124
api/controllers/service_api/app/chatflow_memory.py
Normal file
124
api/controllers/service_api/app/chatflow_memory.py
Normal file
@@ -0,0 +1,124 @@
|
||||
from flask_restx import Resource, reqparse
|
||||
|
||||
from controllers.service_api import api
|
||||
from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
|
||||
from core.memory.entities import MemoryBlock, MemoryCreatedBy
|
||||
from core.workflow.runtime import VariablePool
|
||||
from models import App, EndUser
|
||||
from services.chatflow_memory_service import ChatflowMemoryService
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
|
||||
class MemoryListApi(Resource):
    """Service API: list the memory blocks visible to the calling end user."""

    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
    def get(self, app_model: App, end_user: EndUser):
        """Return end-user-visible memories, optionally filtered by
        conversation, memory id, and version.

        NOTE(review): the user identity is fetched from the JSON body of a GET
        request (WhereisUserArg.JSON) — confirm clients actually send a body.
        """
        parser = reqparse.RequestParser()
        # BUG FIX: reqparse's `type` must be a callable. `str | None` is a
        # types.UnionType, which is not callable, so any request that actually
        # supplied these arguments failed with a 400. Plain `str`/`int` with
        # default=None keeps the optional semantics.
        parser.add_argument("conversation_id", required=False, type=str, default=None)
        parser.add_argument("memory_id", required=False, type=str, default=None)
        parser.add_argument("version", required=False, type=int, default=None)
        args = parser.parse_args()
        conversation_id: str | None = args.get("conversation_id")
        memory_id = args.get("memory_id")
        version = args.get("version")

        if conversation_id:
            # Conversation given: persistent memories plus session-scoped ones.
            result = ChatflowMemoryService.get_persistent_memories_with_conversation(
                app_model,
                MemoryCreatedBy(end_user_id=end_user.id),
                conversation_id,
                version,
            )
            session_memories = ChatflowMemoryService.get_session_memories_with_conversation(
                app_model,
                MemoryCreatedBy(end_user_id=end_user.id),
                conversation_id,
                version,
            )
            result = [*result, *session_memories]
        else:
            result = ChatflowMemoryService.get_persistent_memories(
                app_model,
                MemoryCreatedBy(end_user_id=end_user.id),
                version,
            )

        if memory_id:
            result = [it for it in result if it.spec.id == memory_id]
        # Only expose blocks whose spec marks them visible to end users.
        return [it for it in result if it.spec.end_user_visible]
|
||||
|
||||
|
||||
class MemoryEditApi(Resource):
    """Service API: update the value of a memory block, bumping its version."""

    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
    def put(self, app_model: App, end_user: EndUser):
        """Replace a memory block's value.

        Returns 204 on success; 400 for an invalid update payload; 404 when
        the published workflow or the memory spec cannot be found.
        """
        parser = reqparse.RequestParser()
        parser.add_argument('id', type=str, required=True)
        # BUG FIX: reqparse's `type` must be a callable; `str | None` is a
        # types.UnionType and calling it raises TypeError for any supplied
        # value. Plain `str` with default=None keeps the optional semantics.
        parser.add_argument("conversation_id", type=str, required=False, default=None)
        parser.add_argument('node_id', type=str, required=False, default=None)
        parser.add_argument('update', type=str, required=True)
        args = parser.parse_args()
        workflow = WorkflowService().get_published_workflow(app_model)
        update = args.get("update")
        conversation_id = args.get("conversation_id")
        node_id = args.get("node_id")
        if not isinstance(update, str):
            return {'error': 'Invalid update'}, 400
        if not workflow:
            return {'error': 'Workflow not found'}, 404
        memory_spec = next((it for it in workflow.memory_blocks if it.id == args['id']), None)
        if not memory_spec:
            return {'error': 'Memory not found'}, 404
        # NOTE(review): unlike the web counterpart, this endpoint does not
        # check memory_spec.end_user_editable — confirm service-API callers
        # are meant to bypass that restriction.

        # First get existing memory
        existing_memory = ChatflowMemoryService.get_memory_by_spec(
            spec=memory_spec,
            tenant_id=app_model.tenant_id,
            app_id=app_model.id,
            created_by=MemoryCreatedBy(end_user_id=end_user.id),
            conversation_id=conversation_id,
            node_id=node_id,
            is_draft=False
        )

        # Create updated memory instance with incremented version
        updated_memory = MemoryBlock(
            spec=existing_memory.spec,
            tenant_id=existing_memory.tenant_id,
            app_id=existing_memory.app_id,
            conversation_id=existing_memory.conversation_id,
            node_id=existing_memory.node_id,
            value=update,  # New value
            version=existing_memory.version + 1,  # Increment version for update
            edited_by_user=True,
            created_by=existing_memory.created_by,
        )

        ChatflowMemoryService.save_memory(updated_memory, VariablePool(), False)
        return '', 204
|
||||
|
||||
|
||||
class MemoryDeleteApi(Resource):
    """Service API: delete one memory block, or all of a user's memories."""

    @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
    def delete(self, app_model: App, end_user: EndUser):
        """Delete the memory with the given id, or every memory of the
        calling user when no id is supplied."""
        parser = reqparse.RequestParser()
        parser.add_argument('id', type=str, required=False, default=None)
        memory_id = parser.parse_args().get('id')
        created_by = MemoryCreatedBy(end_user_id=end_user.id)

        if not memory_id:
            # No id: wipe everything this user created for the app.
            ChatflowMemoryService.delete_all_user_memories(app_model, created_by)
            return '', 200

        ChatflowMemoryService.delete_memory(app_model, memory_id, created_by)
        return '', 204
|
||||
|
||||
|
||||
# NOTE(review): MemoryListApi and MemoryDeleteApi are both registered at
# '/memories'. With two URL rules on the same path, the first registered rule
# wins the match, so DELETE /memories may never reach MemoryDeleteApi —
# confirm routing, or merge the two resources / give delete its own path.
api.add_resource(MemoryListApi, '/memories')
api.add_resource(MemoryEditApi, '/memory-edit')
api.add_resource(MemoryDeleteApi, '/memories')
|
||||
@@ -18,6 +18,7 @@ web_ns = Namespace("web", description="Web application API operations", path="/"
|
||||
from . import (
|
||||
app,
|
||||
audio,
|
||||
chatflow_memory,
|
||||
completion,
|
||||
conversation,
|
||||
feature,
|
||||
@@ -39,6 +40,7 @@ __all__ = [
|
||||
"app",
|
||||
"audio",
|
||||
"bp",
|
||||
"chatflow_memory",
|
||||
"completion",
|
||||
"conversation",
|
||||
"feature",
|
||||
|
||||
123
api/controllers/web/chatflow_memory.py
Normal file
123
api/controllers/web/chatflow_memory.py
Normal file
@@ -0,0 +1,123 @@
|
||||
from flask_restx import reqparse
|
||||
|
||||
from controllers.web import api
|
||||
from controllers.web.wraps import WebApiResource
|
||||
from core.memory.entities import MemoryBlock, MemoryCreatedBy
|
||||
from core.workflow.runtime import VariablePool
|
||||
from models import App, EndUser
|
||||
from services.chatflow_memory_service import ChatflowMemoryService
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
|
||||
class MemoryListApi(WebApiResource):
    """Web API: list the memory blocks visible to the current end user."""

    def get(self, app_model: App, end_user: EndUser):
        """Return end-user-visible memories, optionally filtered by
        conversation, memory id, and version."""
        parser = reqparse.RequestParser()
        # BUG FIX: reqparse's `type` must be a callable. `str | None` is a
        # types.UnionType, which is not callable, so any request that actually
        # supplied these arguments failed with a 400. Plain `str`/`int` with
        # default=None keeps the optional semantics.
        parser.add_argument("conversation_id", required=False, type=str, default=None)
        parser.add_argument("memory_id", required=False, type=str, default=None)
        parser.add_argument("version", required=False, type=int, default=None)
        args = parser.parse_args()
        conversation_id: str | None = args.get("conversation_id")
        memory_id = args.get("memory_id")
        version = args.get("version")

        if conversation_id:
            # Conversation given: persistent memories plus session-scoped ones.
            result = ChatflowMemoryService.get_persistent_memories_with_conversation(
                app_model,
                MemoryCreatedBy(end_user_id=end_user.id),
                conversation_id,
                version,
            )
            session_memories = ChatflowMemoryService.get_session_memories_with_conversation(
                app_model,
                MemoryCreatedBy(end_user_id=end_user.id),
                conversation_id,
                version,
            )
            result = [*result, *session_memories]
        else:
            result = ChatflowMemoryService.get_persistent_memories(
                app_model,
                MemoryCreatedBy(end_user_id=end_user.id),
                version,
            )

        if memory_id:
            result = [it for it in result if it.spec.id == memory_id]
        # Only expose blocks whose spec marks them visible to end users.
        return [it for it in result if it.spec.end_user_visible]
|
||||
|
||||
|
||||
class MemoryEditApi(WebApiResource):
    """Web API: let an end user update the value of an editable memory block."""

    def put(self, app_model: App, end_user: EndUser):
        """Replace a memory block's value, bumping its version.

        Returns 204 on success; 400 for an invalid update payload; 403 when
        the block is not end-user editable; 404 when the published workflow
        or the memory spec cannot be found.
        """
        parser = reqparse.RequestParser()
        parser.add_argument('id', type=str, required=True)
        # BUG FIX: reqparse's `type` must be a callable; `str | None` is a
        # types.UnionType and calling it raises TypeError for any supplied
        # value. Plain `str` with default=None keeps the optional semantics.
        parser.add_argument("conversation_id", type=str, required=False, default=None)
        parser.add_argument('node_id', type=str, required=False, default=None)
        parser.add_argument('update', type=str, required=True)
        args = parser.parse_args()
        workflow = WorkflowService().get_published_workflow(app_model)
        update = args.get("update")
        conversation_id = args.get("conversation_id")
        node_id = args.get("node_id")
        if not isinstance(update, str):
            return {'error': 'Update must be a string'}, 400
        if not workflow:
            return {'error': 'Workflow not found'}, 404
        memory_spec = next((it for it in workflow.memory_blocks if it.id == args['id']), None)
        if not memory_spec:
            return {'error': 'Memory not found'}, 404
        if not memory_spec.end_user_editable:
            return {'error': 'Memory not editable'}, 403

        # First get existing memory
        existing_memory = ChatflowMemoryService.get_memory_by_spec(
            spec=memory_spec,
            tenant_id=app_model.tenant_id,
            app_id=app_model.id,
            created_by=MemoryCreatedBy(end_user_id=end_user.id),
            conversation_id=conversation_id,
            node_id=node_id,
            is_draft=False
        )

        # Create updated memory instance with incremented version
        updated_memory = MemoryBlock(
            spec=existing_memory.spec,
            tenant_id=existing_memory.tenant_id,
            app_id=existing_memory.app_id,
            conversation_id=existing_memory.conversation_id,
            node_id=existing_memory.node_id,
            value=update,  # New value
            version=existing_memory.version + 1,  # Increment version for update
            edited_by_user=True,
            created_by=existing_memory.created_by,
        )

        ChatflowMemoryService.save_memory(updated_memory, VariablePool(), False)
        return '', 204
|
||||
|
||||
|
||||
class MemoryDeleteApi(WebApiResource):
    """Web API: delete one memory block, or all of a user's memories."""

    def delete(self, app_model: App, end_user: EndUser):
        """Delete the memory with the given id, or every memory of the
        calling user when no id is supplied."""
        parser = reqparse.RequestParser()
        parser.add_argument('id', type=str, required=False, default=None)
        memory_id = parser.parse_args().get('id')
        created_by = MemoryCreatedBy(end_user_id=end_user.id)

        if not memory_id:
            # No id: wipe everything this user created for the app.
            ChatflowMemoryService.delete_all_user_memories(app_model, created_by)
            return '', 200

        ChatflowMemoryService.delete_memory(app_model, memory_id, created_by)
        return '', 204
|
||||
|
||||
|
||||
# NOTE(review): MemoryListApi and MemoryDeleteApi are both registered at
# '/memories'. With two URL rules on the same path, the first registered rule
# wins the match, so DELETE /memories may never reach MemoryDeleteApi —
# confirm routing, or merge the two resources / give delete its own path.
api.add_resource(MemoryListApi, '/memories')
api.add_resource(MemoryEditApi, '/memory-edit')
api.add_resource(MemoryDeleteApi, '/memories')
|
||||
@@ -1,10 +1,11 @@
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Mapping, Sequence
|
||||
from collections.abc import Mapping, MutableMapping, Sequence
|
||||
from typing import Any, cast
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
from typing_extensions import override
|
||||
|
||||
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig
|
||||
from core.app.apps.base_app_queue_manager import AppQueueManager
|
||||
@@ -20,6 +21,8 @@ from core.app.entities.queue_entities import (
|
||||
QueueTextChunkEvent,
|
||||
)
|
||||
from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature
|
||||
from core.memory.entities import MemoryCreatedBy, MemoryScope
|
||||
from core.model_runtime.entities import AssistantPromptMessage, UserPromptMessage
|
||||
from core.moderation.base import ModerationError
|
||||
from core.moderation.input_moderation import InputModeration
|
||||
from core.variables.variables import VariableUnion
|
||||
@@ -27,6 +30,7 @@ from core.workflow.enums import WorkflowType
|
||||
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
|
||||
from core.workflow.graph_events import GraphRunSucceededEvent
|
||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
||||
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
||||
@@ -39,6 +43,8 @@ from models import Workflow
|
||||
from models.enums import UserFrom
|
||||
from models.model import App, Conversation, Message, MessageAnnotation
|
||||
from models.workflow import ConversationVariable
|
||||
from services.chatflow_history_service import ChatflowHistoryService
|
||||
from services.chatflow_memory_service import ChatflowMemoryService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -81,6 +87,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
self._workflow_node_execution_repository = workflow_node_execution_repository
|
||||
|
||||
def run(self):
|
||||
ChatflowMemoryService.wait_for_sync_memory_completion(
|
||||
workflow=self._workflow,
|
||||
conversation_id=self.conversation.id
|
||||
)
|
||||
|
||||
app_config = self.application_generate_entity.app_config
|
||||
app_config = cast(AdvancedChatAppConfig, app_config)
|
||||
|
||||
@@ -143,6 +154,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
# Based on the definition of `VariableUnion`,
|
||||
# `list[Variable]` can be safely used as `list[VariableUnion]` since they are compatible.
|
||||
conversation_variables=conversation_variables,
|
||||
memory_blocks=self._fetch_memory_blocks(),
|
||||
)
|
||||
|
||||
# init graph
|
||||
@@ -206,6 +218,31 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
for event in generator:
|
||||
self._handle_event(workflow_entry, event)
|
||||
|
||||
try:
|
||||
self._check_app_memory_updates(variable_pool)
|
||||
except Exception as e:
|
||||
logger.exception("Failed to check app memory updates", exc_info=e)
|
||||
|
||||
@override
def _handle_event(self, workflow_entry: WorkflowEntry, event: Any) -> None:
    """Forward the event to the base handler, then, on a successful graph
    run, mirror the final 'answer' output into the chatflow history tables.

    Sync failures are logged, never raised — history mirroring is best-effort.
    """
    super()._handle_event(workflow_entry, event)
    if not isinstance(event, GraphRunSucceededEvent):
        return

    outputs = event.outputs
    if not outputs:
        logger.warning("Chatflow output is empty.")
        return
    answer = outputs.get('answer')
    if not answer:
        logger.warning("Chatflow output does not contain 'answer'.")
        return
    if not isinstance(answer, str):
        logger.warning("Chatflow output 'answer' is not a string.")
        return

    try:
        self._sync_conversation_to_chatflow_tables(answer)
    except Exception as e:
        logger.exception("Failed to sync conversation to memory tables", exc_info=e)
|
||||
|
||||
def handle_input_moderation(
|
||||
self,
|
||||
app_record: App,
|
||||
@@ -403,3 +440,67 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
|
||||
# Return combined list
|
||||
return existing_variables + new_variables
|
||||
|
||||
def _fetch_memory_blocks(self) -> Mapping[str, str]:
    """fetch all memory blocks for current app

    Returns a mapping keyed by memory id for APP-scope blocks, and by
    "<node_id>_<memory_id>" for NODE-scope blocks.
    """
    result: MutableMapping[str, str] = {}

    memories = ChatflowMemoryService.get_memories_by_specs(
        memory_block_specs=self._workflow.memory_blocks,
        tenant_id=self._workflow.tenant_id,
        app_id=self._workflow.app_id,
        node_id=None,
        conversation_id=self.conversation.id,
        is_draft=self.application_generate_entity.invoke_from == InvokeFrom.DEBUGGER,
        created_by=self._get_created_by(),
    )

    for memory in memories:
        if memory.spec.scope == MemoryScope.APP:
            # App level: keyed by memory id alone.
            result[memory.spec.id] = memory.value
            continue
        # NODE scope: a node-less record cannot be keyed, so skip it.
        if not memory.node_id:
            logger.warning("Memory block %s has no node_id, skip.", memory.spec.id)
            continue
        result[f"{memory.node_id}_{memory.spec.id}"] = memory.value

    return result
|
||||
|
||||
def _sync_conversation_to_chatflow_tables(self, assistant_message: str):
    """Persist the latest user query and assistant answer to the chatflow history tables."""
    # Save the user turn first, then the assistant turn, mirroring conversation order.
    for prompt_message in (
        UserPromptMessage(content=(self.application_generate_entity.query)),
        AssistantPromptMessage(content=assistant_message),
    ):
        ChatflowHistoryService.save_app_message(
            prompt_message=prompt_message,
            conversation_id=self.conversation.id,
            app_id=self._workflow.app_id,
            tenant_id=self._workflow.tenant_id
        )
|
||||
|
||||
def _check_app_memory_updates(self, variable_pool: VariablePool):
    """Ask the memory service to refresh app-level memories when their update strategy requires it."""
    invoke_from = self.application_generate_entity.invoke_from
    ChatflowMemoryService.update_app_memory_if_needed(
        workflow=self._workflow,
        conversation_id=self.conversation.id,
        variable_pool=variable_pool,
        is_draft=invoke_from == InvokeFrom.DEBUGGER,
        created_by=self._get_created_by()
    )
|
||||
|
||||
def _get_created_by(self) -> MemoryCreatedBy:
    """Attribute memory creation to an account for console invocations, otherwise to an end user."""
    user_id = self.application_generate_entity.user_id
    if self.application_generate_entity.invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE}:
        return MemoryCreatedBy(account_id=user_id)
    return MemoryCreatedBy(end_user_id=user_id)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from collections.abc import Sequence
|
||||
from typing import Protocol, cast
|
||||
from collections.abc import Callable, Mapping, Sequence
|
||||
from typing import Any, Protocol, cast
|
||||
|
||||
import json_repair
|
||||
|
||||
@@ -14,10 +14,16 @@ from core.llm_generator.prompts import (
|
||||
JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE,
|
||||
LLM_MODIFY_CODE_SYSTEM,
|
||||
LLM_MODIFY_PROMPT_SYSTEM,
|
||||
MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT,
|
||||
MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT,
|
||||
MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT,
|
||||
MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT,
|
||||
MEMORY_UPDATE_PROMPT,
|
||||
PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE,
|
||||
SYSTEM_STRUCTURED_OUTPUT_GENERATE,
|
||||
WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE,
|
||||
)
|
||||
from core.memory.entities import MemoryBlock, MemoryBlockSpec
|
||||
from core.model_manager import ModelManager
|
||||
from core.model_runtime.entities.llm_entities import LLMResult
|
||||
from core.model_runtime.entities.message_entities import PromptMessage, SystemPromptMessage, UserPromptMessage
|
||||
@@ -28,6 +34,7 @@ from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
|
||||
from core.ops.utils import measure_time
|
||||
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
|
||||
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
|
||||
from core.workflow.runtime import VariablePool
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_storage import storage
|
||||
from models import App, Message, WorkflowNodeExecutionModel
|
||||
@@ -506,16 +513,17 @@ class LLMGenerator:
|
||||
node_type: str,
|
||||
ideal_output: str | None,
|
||||
):
|
||||
LAST_RUN = "{{#last_run#}}"
|
||||
CURRENT = "{{#current#}}"
|
||||
ERROR_MESSAGE = "{{#error_message#}}"
|
||||
injected_instruction = instruction
|
||||
if LAST_RUN in injected_instruction:
|
||||
injected_instruction = injected_instruction.replace(LAST_RUN, json.dumps(last_run))
|
||||
if CURRENT in injected_instruction:
|
||||
injected_instruction = injected_instruction.replace(CURRENT, current or "null")
|
||||
if ERROR_MESSAGE in injected_instruction:
|
||||
injected_instruction = injected_instruction.replace(ERROR_MESSAGE, error_message or "null")
|
||||
# Use unified variable injector
|
||||
variable_providers = {
|
||||
"last_run": lambda: json.dumps(last_run) if last_run else "null",
|
||||
"current": lambda: current or "null",
|
||||
"error_message": lambda: error_message or "null",
|
||||
}
|
||||
|
||||
injected_instruction = LLMGenerator.__inject_variables(
|
||||
instruction=instruction,
|
||||
variable_providers=variable_providers
|
||||
)
|
||||
model_instance = ModelManager().get_model_instance(
|
||||
tenant_id=tenant_id,
|
||||
model_type=ModelType.LLM,
|
||||
@@ -562,3 +570,424 @@ class LLMGenerator:
|
||||
"Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True
|
||||
)
|
||||
return {"error": f"An unexpected error occurred: {str(e)}"}
|
||||
|
||||
@staticmethod
def update_memory_block(
    tenant_id: str,
    visible_history: Sequence[tuple[str, str]],
    variable_pool: VariablePool,
    memory_block: MemoryBlock,
    memory_spec: MemoryBlockSpec
) -> str:
    """Run the memory-update LLM for one memory block and return the new memory text.

    Args:
        tenant_id: Tenant owning the model credentials.
        visible_history: (sender, message) pairs forming the visible conversation window.
        variable_pool: Used to resolve template variables inside the spec's instruction.
        memory_block: Current runtime memory block (its ``value`` is the current memory).
        memory_spec: Spec providing the model config and the update instruction.

    Returns:
        The text content produced by the model (the updated memory value).
    """
    model_instance = ModelManager().get_model_instance(
        tenant_id=tenant_id,
        provider=memory_spec.model.provider,
        model=memory_spec.model.name,
        model_type=ModelType.LLM,
    )
    # Join once instead of repeated `+=` concatenation (linear instead of quadratic).
    formatted_history = "".join(f"{sender}: {message}\n" for sender, message in visible_history)
    filled_instruction = variable_pool.convert_template(memory_spec.instruction).text
    formatted_prompt = PromptTemplateParser(MEMORY_UPDATE_PROMPT).format(
        inputs={
            "formatted_history": formatted_history,
            "current_value": memory_block.value,
            "instruction": filled_instruction,
        }
    )
    llm_result = model_instance.invoke_llm(
        prompt_messages=[UserPromptMessage(content=formatted_prompt)],
        model_parameters=memory_spec.model.completion_params,
        stream=False,
    )
    return llm_result.message.get_text_content()
|
||||
|
||||
@staticmethod
def generate_memory_template(
    tenant_id: str,
    instruction: str,
    model_config: dict,
) -> dict:
    """
    Generate Memory Template

    Uses MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT

    Returns {"template": <text>} on success or {"error": <message>} on failure.
    """
    model_instance = ModelManager().get_model_instance(
        tenant_id=tenant_id,
        model_type=ModelType.LLM,
        provider=model_config.get("provider", ""),
        model=model_config.get("name", ""),
    )

    messages: list[PromptMessage] = [
        SystemPromptMessage(content=MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT),
        UserPromptMessage(content=instruction),
    ]

    try:
        response = model_instance.invoke_llm(
            prompt_messages=messages,
            model_parameters={"temperature": 0.7},
            stream=False,
        )
        return {"template": response.message.get_text_content()}
    except Exception as e:
        logger.exception("Failed to generate memory template")
        return {"error": f"Failed to generate memory template: {str(e)}"}
|
||||
|
||||
@staticmethod
def generate_memory_instruction(
    tenant_id: str,
    instruction: str,
    model_config: dict,
) -> dict:
    """
    Generate Memory Instruction

    Uses MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT

    Returns {"instruction": <text>} on success or {"error": <message>} on failure.
    """
    model_instance = ModelManager().get_model_instance(
        tenant_id=tenant_id,
        model_type=ModelType.LLM,
        provider=model_config.get("provider", ""),
        model=model_config.get("name", ""),
    )

    messages: list[PromptMessage] = [
        SystemPromptMessage(content=MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT),
        UserPromptMessage(content=instruction),
    ]

    try:
        response = model_instance.invoke_llm(
            prompt_messages=messages,
            model_parameters={"temperature": 0.7},
            stream=False,
        )
        return {"instruction": response.message.get_text_content()}
    except Exception as e:
        logger.exception("Failed to generate memory instruction")
        return {"error": f"Failed to generate memory instruction: {str(e)}"}
|
||||
|
||||
@staticmethod
def edit_memory_template(
    tenant_id: str,
    flow_id: str,
    node_id: str | None,
    current: str,
    instruction: str,
    model_config: dict,
    ideal_output: str | None = None,
) -> dict:
    """
    Edit Memory Template

    Supports variable references: {{#history#}}, {{#system_prompt#}}

    Returns {"modified": ..., "message": ...} on success or {"error": ...} on failure.
    """
    # Use unified variable injector; providers are only evaluated when referenced.
    variable_providers = {
        "history": lambda: LLMGenerator.__get_history_json(flow_id, node_id, tenant_id),
        "system_prompt": lambda: json.dumps(
            LLMGenerator.__get_system_prompt(flow_id, node_id, tenant_id),
            ensure_ascii=False
        ),
    }

    injected_instruction = LLMGenerator.__inject_variables(
        instruction=instruction,
        variable_providers=variable_providers
    )

    model_instance = ModelManager().get_model_instance(
        tenant_id=tenant_id,
        model_type=ModelType.LLM,
        provider=model_config.get("provider", ""),
        model=model_config.get("name", ""),
    )

    system_prompt = MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT
    user_content = json.dumps({
        "current_template": current,
        "instruction": injected_instruction,
        "ideal_output": ideal_output,
    })

    prompt_messages: list[PromptMessage] = [
        SystemPromptMessage(content=system_prompt),
        UserPromptMessage(content=user_content),
    ]

    try:
        response = model_instance.invoke_llm(
            prompt_messages=prompt_messages,
            model_parameters={"temperature": 0.4},
            stream=False,
        )

        generated_raw = response.message.get_text_content()
        # Extract the JSON object from the raw response. Previously a missing
        # brace made find()/rfind() return -1, producing a garbage slice;
        # fail explicitly instead so the error message is meaningful.
        first_brace = generated_raw.find("{")
        last_brace = generated_raw.rfind("}")
        if first_brace == -1 or last_brace < first_brace:
            raise ValueError("LLM response does not contain a JSON object")
        result = json.loads(generated_raw[first_brace : last_brace + 1])

        return {
            "modified": result.get("modified", ""),
            "message": result.get("message", "Template updated successfully"),
        }

    except Exception as e:
        logger.exception("Failed to edit memory template")
        return {"error": f"Failed to edit memory template: {str(e)}"}
|
||||
|
||||
@staticmethod
def edit_memory_instruction(
    tenant_id: str,
    flow_id: str,
    node_id: str | None,
    current: str,
    instruction: str,
    model_config: dict,
    ideal_output: str | None = None,
) -> dict:
    """
    Edit Memory Instruction

    Supports variable references: {{#history#}}, {{#system_prompt#}}

    Returns {"modified": ..., "message": ...} on success or {"error": ...} on failure.
    """
    # Use unified variable injector; providers are only evaluated when referenced.
    variable_providers = {
        "history": lambda: LLMGenerator.__get_history_json(flow_id, node_id, tenant_id),
        "system_prompt": lambda: json.dumps(
            LLMGenerator.__get_system_prompt(flow_id, node_id, tenant_id),
            ensure_ascii=False
        ),
    }

    injected_instruction = LLMGenerator.__inject_variables(
        instruction=instruction,
        variable_providers=variable_providers
    )

    model_instance = ModelManager().get_model_instance(
        tenant_id=tenant_id,
        model_type=ModelType.LLM,
        provider=model_config.get("provider", ""),
        model=model_config.get("name", ""),
    )

    system_prompt = MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT
    user_content = json.dumps({
        "current_instruction": current,
        "instruction": injected_instruction,
        "ideal_output": ideal_output,
    })

    prompt_messages: list[PromptMessage] = [
        SystemPromptMessage(content=system_prompt),
        UserPromptMessage(content=user_content),
    ]

    try:
        response = model_instance.invoke_llm(
            prompt_messages=prompt_messages,
            model_parameters={"temperature": 0.4},
            stream=False,
        )

        generated_raw = response.message.get_text_content()
        # Extract the JSON object from the raw response. Previously a missing
        # brace made find()/rfind() return -1, producing a garbage slice;
        # fail explicitly instead so the error message is meaningful.
        first_brace = generated_raw.find("{")
        last_brace = generated_raw.rfind("}")
        if first_brace == -1 or last_brace < first_brace:
            raise ValueError("LLM response does not contain a JSON object")
        result = json.loads(generated_raw[first_brace : last_brace + 1])

        return {
            "modified": result.get("modified", ""),
            "message": result.get("message", "Instruction updated successfully"),
        }

    except Exception as e:
        logger.exception("Failed to edit memory instruction")
        return {"error": f"Failed to edit memory instruction: {str(e)}"}
|
||||
|
||||
# ==================== Unified variable injector (private method) ====================
|
||||
|
||||
@staticmethod
def __inject_variables(
    instruction: str,
    variable_providers: Mapping[str, Callable[[], str]]
) -> str:
    """
    Unified variable injector (private method)

    Replaces variable placeholders {{#variable_name#}} in instruction with actual values.

    Args:
        instruction: User's original instruction
        variable_providers: Mapping of variable name -> getter function

    Returns:
        Instruction with injected variables

    Features:
        1. Lazy loading: a getter is only called when its placeholder is present
        2. Fault tolerance: one failing variable does not affect the others
        3. Extensible: new variables arrive through variable_providers
    """
    result = instruction

    for name, provider in variable_providers.items():
        placeholder = f"{{{{#{name}#}}}}"
        if placeholder not in result:
            continue
        try:
            # Lazy loading: only call when the placeholder is referenced.
            replacement = provider()
        except Exception as e:
            logger.warning("Failed to inject variable '%s': %s", name, e)
            # Fall back to a type-appropriate default instead of failing the request.
            replacement = "[]" if name == "history" else '""'
        result = result.replace(placeholder, replacement)

    return result
|
||||
|
||||
@staticmethod
def __get_history_json(
    flow_id: str,
    node_id: str | None,
    tenant_id: str
) -> str:
    """
    Get conversation history as JSON string (private method)

    Args:
        flow_id: Application ID
        node_id: Node ID (optional, None indicates APP level)
        tenant_id: Tenant ID

    Returns:
        JSON array string in format: [{"role": "user", "content": "..."}, ...]
        Returns "[]" if the app does not exist or there is no history
    """
    from services.chatflow_history_service import ChatflowHistoryService

    app = db.session.query(App).filter_by(id=flow_id).first()
    if app is None:
        return "[]"

    messages = ChatflowHistoryService.get_latest_chat_history_for_app(
        app_id=app.id,
        tenant_id=tenant_id,
        node_id=node_id or None
    )

    return json.dumps(
        [{"role": msg.role.value, "content": msg.content} for msg in messages],
        ensure_ascii=False,
    )
|
||||
|
||||
@staticmethod
def __get_system_prompt(
    flow_id: str,
    node_id: str | None,
    tenant_id: str
) -> str:
    """
    Get system prompt (private method)

    Resolution order:
        1. Legacy chat/completion apps: ``pre_prompt`` from the model config.
        2. Workflow apps, node_id given: the system prompt of that llm/agent node.
        3. Workflow apps, no node_id: the llm/agent node wired to the "end" node,
           falling back to the first llm/agent node in the graph.

    Args:
        flow_id: Application ID
        node_id: Node ID (optional)
        tenant_id: Tenant ID

    Returns:
        System prompt string, returns "" if none exists
    """
    from services.workflow_service import WorkflowService

    app = db.session.query(App).filter_by(id=flow_id).first()
    if not app:
        return ""

    # Legacy app: prompt lives directly in the app model config.
    if app.mode in {"chat", "completion"}:
        try:
            app_model_config = app.app_model_config_dict
            return app_model_config.get("pre_prompt", "")
        except Exception:
            return ""

    # Workflow app: inspect the draft workflow graph.
    try:
        workflow = WorkflowService().get_draft_workflow(app_model=app)
        if not workflow:
            return ""

        nodes = workflow.graph_dict.get("nodes", [])

        if node_id:
            # Get system prompt for specified node (must be an llm/agent node).
            node = next((n for n in nodes if n["id"] == node_id), None)
            if not node or node["data"]["type"] not in ["llm", "agent"]:
                return ""

            prompt_template = node["data"].get("prompt_template")
            return LLMGenerator.__extract_system_prompt_from_template(prompt_template)
        else:
            # APP level: find the main LLM node (connected to END node).
            edges = workflow.graph_dict.get("edges", [])
            llm_nodes = [n for n in nodes if n["data"]["type"] in ["llm", "agent"]]

            for edge in edges:
                if edge.get("target") == "end":
                    source_node = next((n for n in llm_nodes if n["id"] == edge.get("source")), None)
                    if source_node:
                        prompt_template = source_node["data"].get("prompt_template")
                        system_prompt = LLMGenerator.__extract_system_prompt_from_template(prompt_template)
                        if system_prompt:
                            return system_prompt

            # Fallback: return system prompt from first LLM node.
            if llm_nodes:
                prompt_template = llm_nodes[0]["data"].get("prompt_template")
                return LLMGenerator.__extract_system_prompt_from_template(prompt_template)

            return ""
    except Exception as e:
        # Best-effort helper: any graph/DB error degrades to "no system prompt".
        logger.warning("Failed to get system prompt: %s", e)
        return ""
|
||||
|
||||
@staticmethod
def __extract_system_prompt_from_template(prompt_template: Any) -> str:
    """
    Extract system prompt from prompt_template (private method)

    Args:
        prompt_template: LLM node's prompt_template (may be list or dict)

    Returns:
        System prompt string ("" when absent or of an unexpected shape)
    """
    if not prompt_template:
        return ""

    if isinstance(prompt_template, dict):
        # Completion model: {"text": "..."}
        return prompt_template.get("text", "")

    if isinstance(prompt_template, list):
        # Chat model: [{"role": "system", "text": "..."}, ...]
        for message in prompt_template:
            if message.get("role") == "system":
                return message.get("text", "")
        return ""

    return ""
|
||||
|
||||
@@ -422,3 +422,112 @@ INSTRUCTION_GENERATE_TEMPLATE_PROMPT = """The output of this prompt is not as ex
|
||||
You should edit the prompt according to the IDEAL OUTPUT."""
|
||||
|
||||
INSTRUCTION_GENERATE_TEMPLATE_CODE = """Please fix the errors in the {{#error_message#}}."""
|
||||
|
||||
MEMORY_UPDATE_PROMPT = """
|
||||
Based on the following conversation history, update the memory content:
|
||||
|
||||
Conversation history:
|
||||
{{formatted_history}}
|
||||
|
||||
Current memory:
|
||||
{{current_value}}
|
||||
|
||||
Update instruction:
|
||||
{{instruction}}
|
||||
|
||||
Please output only the updated memory content, no other text like greeting:
|
||||
"""
|
||||
|
||||
MEMORY_TEMPLATE_GENERATION_SYSTEM_PROMPT = """
|
||||
You are a helpful assistant designed to extract structured template information from a long-term conversation. Your task is to generate a concise and complete MemoryBlock template based on the underlying purpose of the conversation.
|
||||
|
||||
Each MemoryBlock represents a reusable schema that captures the key elements relevant to a specific task or goal (e.g., planning a trip, conducting a job interview, writing a blog post, etc.).
|
||||
|
||||
When generating a template:
|
||||
1. Analyze the overall goal or purpose of the conversation described in the user's instruction.
|
||||
2. Identify essential information categories that would be relevant to track.
|
||||
3. Structure the template using Markdown format with clear sections and fields.
|
||||
4. Do not fill in actual user data — only describe the structure and purpose of each field.
|
||||
5. Be general enough to be reusable, but specific enough to serve the user's intent.
|
||||
|
||||
Respond with only the template in Markdown format, with no additional explanation.
|
||||
|
||||
Example format:
|
||||
# [Template Name]
|
||||
|
||||
## Section 1
|
||||
- **Field 1:** Description of what should be captured here
|
||||
- **Field 2:** Description of what should be captured here
|
||||
|
||||
## Section 2
|
||||
- **Field 3:** Description of what should be captured here
|
||||
""" # noqa: E501
|
||||
|
||||
MEMORY_INSTRUCTION_GENERATION_SYSTEM_PROMPT = """
|
||||
You are a prompt generation model.
|
||||
|
||||
Your task is to generate an instruction for a downstream language model tasked with extracting structured memory blocks (MemoryBlock) from long, multi-turn conversations between a user and an assistant.
|
||||
|
||||
The downstream model will receive:
|
||||
- A template describing the structure and fields of the memory block it should extract.
|
||||
- The historical conversation, serialized as plain text with message tags.
|
||||
- Optional context including the assistant's system prompt.
|
||||
|
||||
You must generate a clear, specific, and instructional prompt that:
|
||||
1. Explains what a MemoryBlock is and its purpose.
|
||||
2. Instructs the model to extract only information relevant to the template fields.
|
||||
3. Emphasizes handling implicit information and scattered mentions across multiple turns.
|
||||
4. Instructs to ignore irrelevant or casual dialogue.
|
||||
5. Describes the expected output format (structured object matching the template).
|
||||
6. Uses placeholders {{#history#}} and {{#system_prompt#}} for runtime variable injection.
|
||||
|
||||
The tone should be concise, instructional, and focused on task precision.
|
||||
|
||||
Based on the user's description of the conversation context, generate the ideal extraction instruction.
|
||||
""" # noqa: E501
|
||||
|
||||
MEMORY_TEMPLATE_EDIT_SYSTEM_PROMPT = """
|
||||
You are an expert at refining memory templates for conversation tracking systems.
|
||||
|
||||
You will receive:
|
||||
- current_template: The existing memory template
|
||||
- instruction: User's instruction for how to modify the template (may include {{#history#}} and {{#system_prompt#}} references)
|
||||
- ideal_output: Optional description of the desired result
|
||||
|
||||
Your task:
|
||||
1. Analyze the current template structure
|
||||
2. Apply the requested modifications based on the instruction
|
||||
3. Ensure the modified template maintains proper Markdown structure
|
||||
4. Keep field descriptions clear and actionable
|
||||
|
||||
Output format (JSON):
|
||||
{
|
||||
"modified": "<the updated template in Markdown format>",
|
||||
"message": "<brief explanation of changes made>"
|
||||
}
|
||||
|
||||
Only output the JSON, no additional text.
|
||||
""" # noqa: E501
|
||||
|
||||
MEMORY_INSTRUCTION_EDIT_SYSTEM_PROMPT = """
|
||||
You are an expert at refining extraction instructions for memory systems.
|
||||
|
||||
You will receive:
|
||||
- current_instruction: The existing extraction instruction
|
||||
- instruction: User's instruction for how to improve it (may include {{#history#}} and {{#system_prompt#}} references)
|
||||
- ideal_output: Optional description of the desired result
|
||||
|
||||
Your task:
|
||||
1. Analyze the current instruction's effectiveness
|
||||
2. Apply the requested improvements based on the user's instruction
|
||||
3. Ensure the modified instruction is clear, specific, and actionable
|
||||
4. Maintain focus on structured extraction matching the template
|
||||
|
||||
Output format (JSON):
|
||||
{
|
||||
"modified": "<the improved instruction>",
|
||||
"message": "<brief explanation of improvements>"
|
||||
}
|
||||
|
||||
Only output the JSON, no additional text.
|
||||
"""
|
||||
|
||||
133
api/core/memory/entities.py
Normal file
133
api/core/memory/entities.py
Normal file
@@ -0,0 +1,133 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import StrEnum
|
||||
from typing import Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from core.app.app_config.entities import ModelConfig
|
||||
|
||||
|
||||
class MemoryScope(StrEnum):
    """Memory scope determined by node_id field"""
    APP = "app"  # node_id is None
    NODE = "node"  # node_id is not None


class MemoryTerm(StrEnum):
    """Memory term determined by conversation_id field"""
    SESSION = "session"  # conversation_id is not None
    PERSISTENT = "persistent"  # conversation_id is None


class MemoryStrategy(StrEnum):
    """Update strategy for a memory block; currently only turn-count based."""
    ON_TURNS = "on_turns"


class MemoryScheduleMode(StrEnum):
    """Whether memory updates run inline with the request (sync) or deferred (async)."""
    SYNC = "sync"
    ASYNC = "async"
|
||||
|
||||
class MemoryBlockSpec(BaseModel):
    """Memory block specification for workflow configuration.

    Declares how a memory block is initialized (template), how and when it is
    updated (instruction, strategy, update_turns, schedule_mode, model), and
    who may see or edit it.
    """
    id: str = Field(
        default_factory=lambda: str(uuid4()),
        description="Unique identifier for the memory block",
    )
    name: str = Field(description="Display name of the memory block")
    description: str = Field(default="", description="Description of the memory block")
    template: str = Field(description="Initial template content for the memory")
    instruction: str = Field(description="Instructions for updating the memory")
    scope: MemoryScope = Field(description="Scope of the memory (app or node level)")
    term: MemoryTerm = Field(description="Term of the memory (session or persistent)")
    strategy: MemoryStrategy = Field(description="Update strategy for the memory")
    update_turns: int = Field(gt=0, description="Number of turns between updates")
    preserved_turns: int = Field(gt=0, description="Number of conversation turns to preserve")
    schedule_mode: MemoryScheduleMode = Field(description="Synchronous or asynchronous update mode")
    model: ModelConfig = Field(description="Model configuration for memory updates")
    end_user_visible: bool = Field(default=False, description="Whether memory is visible to end users")
    end_user_editable: bool = Field(default=False, description="Whether memory is editable by end users")
    node_id: str | None = Field(
        default=None,
        description="Node ID when scope is NODE. Must be None when scope is APP."
    )

    @field_validator('node_id')
    @classmethod
    def validate_node_id_with_scope(cls, v: str | None, info) -> str | None:
        """Validate node_id consistency with scope.

        NOTE: relies on `scope` being declared before `node_id` so it is
        already present in `info.data` when this validator runs.
        """
        scope = info.data.get('scope')
        if scope == MemoryScope.NODE and v is None:
            raise ValueError("node_id is required when scope is NODE")
        if scope == MemoryScope.APP and v is not None:
            raise ValueError("node_id must be None when scope is APP")
        return v
|
||||
|
||||
|
||||
class MemoryCreatedBy(BaseModel):
    """Creator identity for a memory block; exactly one of the two ids is expected to be set."""
    end_user_id: str | None = None  # set for end-user (web app / API) invocations
    account_id: str | None = None  # set for console (debugger/explore) invocations


class MemoryBlock(BaseModel):
    """Runtime memory block instance

    Design Rules:
    - app_id = None: Global memory (future feature, not implemented yet)
    - app_id = str: App-specific memory
    - conversation_id = None: Persistent memory (cross-conversation)
    - conversation_id = str: Session memory (conversation-specific)
    - node_id = None: App-level scope
    - node_id = str: Node-level scope

    These rules implicitly determine scope and term without redundant storage.
    """
    spec: MemoryBlockSpec  # static configuration this instance was created from
    tenant_id: str
    value: str  # current memory content
    app_id: str
    conversation_id: Optional[str] = None
    node_id: Optional[str] = None
    edited_by_user: bool = False  # True once an end user manually edited the value
    created_by: MemoryCreatedBy
    version: int = Field(description="Memory block version number")


class MemoryValueData(BaseModel):
    """Payload used when reading/writing just a memory's value."""
    value: str
    edited_by_user: bool = False


class ChatflowConversationMetadata(BaseModel):
    """Metadata for chatflow conversation with visible message count"""
    type: str = "mutable_visible_window"
    visible_count: int = Field(gt=0, description="Number of visible messages to keep")
|
||||
class MemoryBlockWithConversation(MemoryBlock):
    """MemoryBlock with optional conversation metadata for session memories"""
    conversation_metadata: ChatflowConversationMetadata = Field(
        description="Conversation metadata, only present for session memories"
    )

    @classmethod
    def from_memory_block(
        cls,
        memory_block: MemoryBlock,
        conversation_metadata: ChatflowConversationMetadata
    ) -> MemoryBlockWithConversation:
        """Create MemoryBlockWithConversation from MemoryBlock.

        Copies every MemoryBlock field explicitly and attaches the given
        conversation metadata.
        """
        return cls(
            spec=memory_block.spec,
            tenant_id=memory_block.tenant_id,
            value=memory_block.value,
            app_id=memory_block.app_id,
            conversation_id=memory_block.conversation_id,
            node_id=memory_block.node_id,
            edited_by_user=memory_block.edited_by_user,
            created_by=memory_block.created_by,
            version=memory_block.version,
            conversation_metadata=conversation_metadata
        )
|
||||
6
api/core/memory/errors.py
Normal file
6
api/core/memory/errors.py
Normal file
@@ -0,0 +1,6 @@
|
||||
class MemorySyncTimeoutError(Exception):
    """Raised when synchronous memory update does not complete within the allowed time.

    Args:
        app_id: Application whose memory sync timed out.
        conversation_id: Conversation being synchronized.
        timeout_seconds: Timeout that was exceeded; defaults to the historical
            hard-coded 50 seconds so existing callers are unaffected.
    """

    def __init__(self, app_id: str, conversation_id: str, timeout_seconds: int = 50):
        self.app_id = app_id
        self.conversation_id = conversation_id
        self.timeout_seconds = timeout_seconds
        # Keep the message format of the original implementation.
        self.message = f"Memory synchronization timeout after {timeout_seconds} seconds"
        super().__init__(self.message)
|
||||
@@ -44,7 +44,7 @@ class MemoryConfig(BaseModel):
|
||||
|
||||
enabled: bool
|
||||
size: int | None = None
|
||||
|
||||
mode: Literal["linear", "block"] | None = "linear"
|
||||
role_prefix: RolePrefix | None = None
|
||||
window: WindowConfig
|
||||
query_prompt_template: str | None = None
|
||||
|
||||
@@ -203,6 +203,48 @@ class ArrayFileSegment(ArraySegment):
|
||||
return ""
|
||||
|
||||
|
||||
class VersionedMemoryValue(BaseModel):
    """Immutable value object holding the current memory text plus named historical versions."""

    # NOTE(review): None default on a str field relies on callers always supplying
    # a value; kept as-is for interface compatibility.
    current_value: str = None  # type: ignore
    versions: Mapping[str, str] = {}

    model_config = ConfigDict(frozen=True)

    def add_version(
        self,
        new_value: str,
        version_name: str | None = None
    ) -> "VersionedMemoryValue":
        """Return a new VersionedMemoryValue with `new_value` as the current value.

        Args:
            new_value: The new current memory content.
            version_name: Name for the new version; defaults to the next
                sequential number as a string.

        Raises:
            ValueError: If `version_name` already exists.

        Bug fix: the previous implementation assigned `self.current_value`
        before returning, which raises on a `frozen=True` pydantic model.
        The new state is carried solely by the returned instance.
        """
        if version_name is None:
            version_name = str(len(self.versions) + 1)
        if version_name in self.versions:
            raise ValueError(f"Version '{version_name}' already exists.")
        return VersionedMemoryValue(
            current_value=new_value,
            versions={
                version_name: new_value,
                **self.versions,
            }
        )
|
||||
|
||||
|
||||
class VersionedMemorySegment(Segment):
|
||||
value_type: SegmentType = SegmentType.VERSIONED_MEMORY
|
||||
value: VersionedMemoryValue = None # type: ignore
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
return self.value.current_value
|
||||
|
||||
@property
|
||||
def log(self) -> str:
|
||||
return self.value.current_value
|
||||
|
||||
@property
|
||||
def markdown(self) -> str:
|
||||
return self.value.current_value
|
||||
|
||||
|
||||
class ArrayBooleanSegment(ArraySegment):
|
||||
value_type: SegmentType = SegmentType.ARRAY_BOOLEAN
|
||||
value: Sequence[bool]
|
||||
@@ -248,6 +290,7 @@ SegmentUnion: TypeAlias = Annotated[
|
||||
| Annotated[ArrayObjectSegment, Tag(SegmentType.ARRAY_OBJECT)]
|
||||
| Annotated[ArrayFileSegment, Tag(SegmentType.ARRAY_FILE)]
|
||||
| Annotated[ArrayBooleanSegment, Tag(SegmentType.ARRAY_BOOLEAN)]
|
||||
| Annotated[VersionedMemorySegment, Tag(SegmentType.VERSIONED_MEMORY)]
|
||||
),
|
||||
Discriminator(get_segment_discriminator),
|
||||
]
|
||||
|
||||
@@ -44,6 +44,8 @@ class SegmentType(StrEnum):
|
||||
ARRAY_FILE = "array[file]"
|
||||
ARRAY_BOOLEAN = "array[boolean]"
|
||||
|
||||
VERSIONED_MEMORY = "versioned_memory"
|
||||
|
||||
NONE = "none"
|
||||
|
||||
GROUP = "group"
|
||||
|
||||
@@ -22,6 +22,7 @@ from .segments import (
|
||||
ObjectSegment,
|
||||
Segment,
|
||||
StringSegment,
|
||||
VersionedMemorySegment,
|
||||
get_segment_discriminator,
|
||||
)
|
||||
from .types import SegmentType
|
||||
@@ -106,6 +107,10 @@ class ArrayFileVariable(ArrayFileSegment, ArrayVariable):
|
||||
pass
|
||||
|
||||
|
||||
class VersionedMemoryVariable(VersionedMemorySegment, Variable):
|
||||
pass
|
||||
|
||||
|
||||
class ArrayBooleanVariable(ArrayBooleanSegment, ArrayVariable):
|
||||
pass
|
||||
|
||||
@@ -161,6 +166,7 @@ VariableUnion: TypeAlias = Annotated[
|
||||
| Annotated[ArrayFileVariable, Tag(SegmentType.ARRAY_FILE)]
|
||||
| Annotated[ArrayBooleanVariable, Tag(SegmentType.ARRAY_BOOLEAN)]
|
||||
| Annotated[SecretVariable, Tag(SegmentType.SECRET)]
|
||||
| Annotated[VersionedMemoryVariable, Tag(SegmentType.VERSIONED_MEMORY)]
|
||||
),
|
||||
Discriminator(get_segment_discriminator),
|
||||
]
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
SYSTEM_VARIABLE_NODE_ID = "sys"
|
||||
ENVIRONMENT_VARIABLE_NODE_ID = "env"
|
||||
CONVERSATION_VARIABLE_NODE_ID = "conversation"
|
||||
MEMORY_BLOCK_VARIABLE_NODE_ID = "memory_block"
|
||||
RAG_PIPELINE_VARIABLE_NODE_ID = "rag"
|
||||
|
||||
@@ -7,11 +7,15 @@ import time
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
|
||||
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom, ModelConfigWithCredentialsEntity
|
||||
from core.file import FileType, file_manager
|
||||
from core.helper.code_executor import CodeExecutor, CodeLanguage
|
||||
from core.llm_generator.output_parser.errors import OutputParserError
|
||||
from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output
|
||||
from core.memory.entities import MemoryCreatedBy, MemoryScope
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_manager import ModelInstance, ModelManager
|
||||
from core.model_runtime.entities import (
|
||||
@@ -73,6 +77,8 @@ from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig, Variabl
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser
|
||||
from core.workflow.runtime import VariablePool
|
||||
from models import UserFrom, Workflow
|
||||
from models.engine import db
|
||||
|
||||
from . import llm_utils
|
||||
from .entities import (
|
||||
@@ -317,6 +323,11 @@ class LLMNode(Node):
|
||||
if self._file_outputs:
|
||||
outputs["files"] = ArrayFileSegment(value=self._file_outputs)
|
||||
|
||||
try:
|
||||
self._handle_chatflow_memory(result_text, variable_pool)
|
||||
except Exception as e:
|
||||
logger.warning("Memory orchestration failed for node %s: %s", self.node_id, str(e))
|
||||
|
||||
# Send final chunk event to indicate streaming is complete
|
||||
yield StreamChunkEvent(
|
||||
selector=[self._node_id, "text"],
|
||||
@@ -1228,6 +1239,79 @@ class LLMNode(Node):
|
||||
def retry(self) -> bool:
|
||||
return self._node_data.retry_config.retry_enabled
|
||||
|
||||
def _handle_chatflow_memory(self, llm_output: str, variable_pool: VariablePool):
|
||||
if not self._node_data.memory or self._node_data.memory.mode != "block":
|
||||
return
|
||||
|
||||
conversation_id_segment = variable_pool.get((SYSTEM_VARIABLE_NODE_ID, SystemVariableKey.CONVERSATION_ID))
|
||||
if not conversation_id_segment:
|
||||
raise ValueError("Conversation ID not found in variable pool.")
|
||||
conversation_id = conversation_id_segment.text
|
||||
|
||||
user_query_segment = variable_pool.get((SYSTEM_VARIABLE_NODE_ID, SystemVariableKey.QUERY))
|
||||
if not user_query_segment:
|
||||
raise ValueError("User query not found in variable pool.")
|
||||
user_query = user_query_segment.text
|
||||
|
||||
from core.model_runtime.entities.message_entities import AssistantPromptMessage, UserPromptMessage
|
||||
from services.chatflow_history_service import ChatflowHistoryService
|
||||
|
||||
ChatflowHistoryService.save_node_message(
|
||||
prompt_message=(UserPromptMessage(content=user_query)),
|
||||
node_id=self.node_id,
|
||||
conversation_id=conversation_id,
|
||||
app_id=self.app_id,
|
||||
tenant_id=self.tenant_id
|
||||
)
|
||||
ChatflowHistoryService.save_node_message(
|
||||
prompt_message=(AssistantPromptMessage(content=llm_output)),
|
||||
node_id=self.node_id,
|
||||
conversation_id=conversation_id,
|
||||
app_id=self.app_id,
|
||||
tenant_id=self.tenant_id
|
||||
)
|
||||
|
||||
# FIXME: This is dirty workaround and may cause incorrect resolution for workflow version
|
||||
with Session(db.engine) as session:
|
||||
stmt = select(Workflow).where(
|
||||
Workflow.tenant_id == self.tenant_id,
|
||||
Workflow.app_id == self.app_id
|
||||
)
|
||||
workflow = session.scalars(stmt).first()
|
||||
if not workflow:
|
||||
raise ValueError("Workflow not found.")
|
||||
|
||||
# Filter memory blocks that belong to this node
|
||||
node_memory_blocks = [
|
||||
block for block in workflow.memory_blocks
|
||||
if block.scope == MemoryScope.NODE and block.node_id == self.id
|
||||
]
|
||||
|
||||
if not node_memory_blocks:
|
||||
return
|
||||
|
||||
# Update each memory block that belongs to this node
|
||||
is_draft = (self.invoke_from == InvokeFrom.DEBUGGER)
|
||||
from services.chatflow_memory_service import ChatflowMemoryService
|
||||
|
||||
for memory_block_spec in node_memory_blocks:
|
||||
ChatflowMemoryService.update_node_memory_if_needed(
|
||||
tenant_id=self.tenant_id,
|
||||
app_id=self.app_id,
|
||||
node_id=self.id,
|
||||
conversation_id=conversation_id,
|
||||
memory_block_spec=memory_block_spec,
|
||||
variable_pool=variable_pool,
|
||||
is_draft=is_draft,
|
||||
created_by=self._get_user_from_context()
|
||||
)
|
||||
|
||||
def _get_user_from_context(self) -> MemoryCreatedBy:
|
||||
if self.user_from == UserFrom.ACCOUNT:
|
||||
return MemoryCreatedBy(account_id=self.user_id)
|
||||
else:
|
||||
return MemoryCreatedBy(end_user_id=self.user_id)
|
||||
|
||||
|
||||
def _combine_message_content_with_role(
|
||||
*, contents: str | list[PromptMessageContentUnionTypes] | None = None, role: PromptMessageRole
|
||||
|
||||
@@ -9,11 +9,12 @@ from pydantic import BaseModel, Field
|
||||
from core.file import File, FileAttribute, file_manager
|
||||
from core.variables import Segment, SegmentGroup, Variable
|
||||
from core.variables.consts import SELECTORS_LENGTH
|
||||
from core.variables.segments import FileSegment, ObjectSegment
|
||||
from core.variables.variables import RAGPipelineVariableInput, VariableUnion
|
||||
from core.variables.segments import FileSegment, ObjectSegment, VersionedMemoryValue
|
||||
from core.variables.variables import RAGPipelineVariableInput, VariableUnion, VersionedMemoryVariable
|
||||
from core.workflow.constants import (
|
||||
CONVERSATION_VARIABLE_NODE_ID,
|
||||
ENVIRONMENT_VARIABLE_NODE_ID,
|
||||
MEMORY_BLOCK_VARIABLE_NODE_ID,
|
||||
RAG_PIPELINE_VARIABLE_NODE_ID,
|
||||
SYSTEM_VARIABLE_NODE_ID,
|
||||
)
|
||||
@@ -56,6 +57,10 @@ class VariablePool(BaseModel):
|
||||
description="RAG pipeline variables.",
|
||||
default_factory=list,
|
||||
)
|
||||
memory_blocks: Mapping[str, str] = Field(
|
||||
description="Memory blocks.",
|
||||
default_factory=dict,
|
||||
)
|
||||
|
||||
def model_post_init(self, context: Any, /):
|
||||
# Create a mapping from field names to SystemVariableKey enum values
|
||||
@@ -76,6 +81,18 @@ class VariablePool(BaseModel):
|
||||
rag_pipeline_variables_map[node_id][key] = value
|
||||
for key, value in rag_pipeline_variables_map.items():
|
||||
self.add((RAG_PIPELINE_VARIABLE_NODE_ID, key), value)
|
||||
# Add memory blocks to the variable pool
|
||||
for memory_id, memory_value in self.memory_blocks.items():
|
||||
self.add(
|
||||
[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_id],
|
||||
VersionedMemoryVariable(
|
||||
value=VersionedMemoryValue(
|
||||
current_value=memory_value,
|
||||
versions={"1": memory_value},
|
||||
),
|
||||
name=memory_id,
|
||||
)
|
||||
)
|
||||
|
||||
def add(self, selector: Sequence[str], value: Any, /):
|
||||
"""
|
||||
|
||||
@@ -21,6 +21,8 @@ from core.variables.segments import (
|
||||
ObjectSegment,
|
||||
Segment,
|
||||
StringSegment,
|
||||
VersionedMemorySegment,
|
||||
VersionedMemoryValue,
|
||||
)
|
||||
from core.variables.types import SegmentType
|
||||
from core.variables.variables import (
|
||||
@@ -39,6 +41,7 @@ from core.variables.variables import (
|
||||
SecretVariable,
|
||||
StringVariable,
|
||||
Variable,
|
||||
VersionedMemoryVariable,
|
||||
)
|
||||
from core.workflow.constants import (
|
||||
CONVERSATION_VARIABLE_NODE_ID,
|
||||
@@ -69,6 +72,7 @@ SEGMENT_TO_VARIABLE_MAP = {
|
||||
NoneSegment: NoneVariable,
|
||||
ObjectSegment: ObjectVariable,
|
||||
StringSegment: StringVariable,
|
||||
VersionedMemorySegment: VersionedMemoryVariable
|
||||
}
|
||||
|
||||
|
||||
@@ -193,6 +197,7 @@ _segment_factory: Mapping[SegmentType, type[Segment]] = {
|
||||
SegmentType.FILE: FileSegment,
|
||||
SegmentType.BOOLEAN: BooleanSegment,
|
||||
SegmentType.OBJECT: ObjectSegment,
|
||||
SegmentType.VERSIONED_MEMORY: VersionedMemorySegment,
|
||||
# Array types
|
||||
SegmentType.ARRAY_ANY: ArrayAnySegment,
|
||||
SegmentType.ARRAY_STRING: ArrayStringSegment,
|
||||
@@ -259,6 +264,12 @@ def build_segment_with_type(segment_type: SegmentType, value: Any) -> Segment:
|
||||
else:
|
||||
raise TypeMismatchError(f"Type mismatch: expected {segment_type}, but got empty list")
|
||||
|
||||
if segment_type == SegmentType.VERSIONED_MEMORY:
|
||||
return VersionedMemorySegment(
|
||||
value_type=segment_type,
|
||||
value=VersionedMemoryValue.model_validate(value)
|
||||
)
|
||||
|
||||
inferred_type = SegmentType.infer_segment_type(value)
|
||||
# Type compatibility checking
|
||||
if inferred_type is None:
|
||||
|
||||
@@ -49,6 +49,31 @@ conversation_variable_fields = {
|
||||
"description": fields.String,
|
||||
}
|
||||
|
||||
model_config_fields = {
|
||||
"provider": fields.String,
|
||||
"name": fields.String,
|
||||
"mode": fields.String,
|
||||
"completion_params": fields.Raw,
|
||||
}
|
||||
|
||||
memory_block_fields = {
|
||||
"id": fields.String,
|
||||
"name": fields.String,
|
||||
"description": fields.String,
|
||||
"template": fields.String,
|
||||
"instruction": fields.String,
|
||||
"scope": fields.String,
|
||||
"term": fields.String,
|
||||
"strategy": fields.String,
|
||||
"update_turns": fields.Integer,
|
||||
"preserved_turns": fields.Integer,
|
||||
"schedule_mode": fields.String,
|
||||
"model": fields.Nested(model_config_fields),
|
||||
"end_user_visible": fields.Boolean,
|
||||
"end_user_editable": fields.Boolean,
|
||||
"node_id": fields.String,
|
||||
}
|
||||
|
||||
pipeline_variable_fields = {
|
||||
"label": fields.String,
|
||||
"variable": fields.String,
|
||||
@@ -81,6 +106,7 @@ workflow_fields = {
|
||||
"tool_published": fields.Boolean,
|
||||
"environment_variables": fields.List(EnvironmentVariableField()),
|
||||
"conversation_variables": fields.List(fields.Nested(conversation_variable_fields)),
|
||||
"memory_blocks": fields.Nested(memory_block_fields),
|
||||
"rag_pipeline_variables": fields.List(fields.Nested(pipeline_variable_fields)),
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from .account import (
|
||||
TenantStatus,
|
||||
)
|
||||
from .api_based_extension import APIBasedExtension, APIBasedExtensionPoint
|
||||
from .chatflow_memory import ChatflowConversation, ChatflowMemoryVariable, ChatflowMessage
|
||||
from .dataset import (
|
||||
AppDatasetJoin,
|
||||
Dataset,
|
||||
@@ -129,6 +130,9 @@ __all__ = [
|
||||
"BuiltinToolProvider",
|
||||
"CeleryTask",
|
||||
"CeleryTaskSet",
|
||||
"ChatflowConversation",
|
||||
"ChatflowMemoryVariable",
|
||||
"ChatflowMessage",
|
||||
"Conversation",
|
||||
"ConversationVariable",
|
||||
"CreatorUserRole",
|
||||
|
||||
76
api/models/chatflow_memory.py
Normal file
76
api/models/chatflow_memory.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from datetime import datetime
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import DateTime, func
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from .base import Base
|
||||
from .types import StringUUID
|
||||
|
||||
|
||||
class ChatflowMemoryVariable(Base):
|
||||
__tablename__ = "chatflow_memory_variables"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="chatflow_memory_variables_pkey"),
|
||||
sa.Index("chatflow_memory_variables_memory_id_idx", "tenant_id", "app_id", "node_id", "memory_id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
app_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
|
||||
conversation_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
|
||||
node_id: Mapped[str | None] = mapped_column(sa.Text, nullable=True)
|
||||
memory_id: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
value: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
name: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
scope: Mapped[str] = mapped_column(sa.String(10), nullable=False) # 'app' or 'node'
|
||||
term: Mapped[str] = mapped_column(sa.String(20), nullable=False) # 'session' or 'persistent'
|
||||
version: Mapped[int] = mapped_column(sa.Integer, nullable=False, default=1)
|
||||
created_by_role: Mapped[str] = mapped_column(sa.String(20)) # 'end_user' or 'account`
|
||||
created_by: Mapped[str] = mapped_column(StringUUID)
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
|
||||
)
|
||||
|
||||
|
||||
class ChatflowConversation(Base):
|
||||
__tablename__ = "chatflow_conversations"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="chatflow_conversations_pkey"),
|
||||
sa.Index(
|
||||
"chatflow_conversations_original_conversation_id_idx",
|
||||
"tenant_id", "app_id", "node_id", "original_conversation_id"
|
||||
),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
node_id: Mapped[str | None] = mapped_column(sa.Text, nullable=True)
|
||||
original_conversation_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
|
||||
conversation_metadata: Mapped[str] = mapped_column(sa.Text, nullable=False) # JSON
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
|
||||
)
|
||||
|
||||
|
||||
class ChatflowMessage(Base):
|
||||
__tablename__ = "chatflow_messages"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="chatflow_messages_pkey"),
|
||||
sa.Index("chatflow_messages_version_idx", "conversation_id", "index", "version"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
|
||||
conversation_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
index: Mapped[int] = mapped_column(sa.Integer, nullable=False) # This index starts from 0
|
||||
version: Mapped[int] = mapped_column(sa.Integer, nullable=False)
|
||||
data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Serialized PromptMessage JSON
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
|
||||
)
|
||||
@@ -28,6 +28,7 @@ class DraftVariableType(StrEnum):
|
||||
NODE = "node"
|
||||
SYS = "sys"
|
||||
CONVERSATION = "conversation"
|
||||
MEMORY_BLOCK = "memory_block"
|
||||
|
||||
|
||||
class MessageStatus(StrEnum):
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
@@ -23,10 +24,13 @@ from sqlalchemy.orm import Mapped, declared_attr, mapped_column
|
||||
|
||||
from core.file.constants import maybe_file_object
|
||||
from core.file.models import File
|
||||
from core.memory.entities import MemoryBlockSpec
|
||||
from core.variables import utils as variable_utils
|
||||
from core.variables.segments import VersionedMemoryValue
|
||||
from core.variables.variables import FloatVariable, IntegerVariable, StringVariable
|
||||
from core.workflow.constants import (
|
||||
CONVERSATION_VARIABLE_NODE_ID,
|
||||
MEMORY_BLOCK_VARIABLE_NODE_ID,
|
||||
SYSTEM_VARIABLE_NODE_ID,
|
||||
)
|
||||
from core.workflow.enums import NodeType
|
||||
@@ -161,6 +165,9 @@ class Workflow(Base):
|
||||
_rag_pipeline_variables: Mapped[str] = mapped_column(
|
||||
"rag_pipeline_variables", LongText, nullable=False, default="{}"
|
||||
)
|
||||
_memory_blocks: Mapped[str] = mapped_column(
|
||||
"memory_blocks", sa.Text, nullable=False, server_default="[]"
|
||||
)
|
||||
|
||||
VERSION_DRAFT = "draft"
|
||||
|
||||
@@ -178,6 +185,7 @@ class Workflow(Base):
|
||||
environment_variables: Sequence[Variable],
|
||||
conversation_variables: Sequence[Variable],
|
||||
rag_pipeline_variables: list[dict],
|
||||
memory_blocks: Sequence[MemoryBlockSpec] | None = None,
|
||||
marked_name: str = "",
|
||||
marked_comment: str = "",
|
||||
) -> "Workflow":
|
||||
@@ -193,6 +201,7 @@ class Workflow(Base):
|
||||
workflow.environment_variables = environment_variables or []
|
||||
workflow.conversation_variables = conversation_variables or []
|
||||
workflow.rag_pipeline_variables = rag_pipeline_variables or []
|
||||
workflow.memory_blocks = memory_blocks or []
|
||||
workflow.marked_name = marked_name
|
||||
workflow.marked_comment = marked_comment
|
||||
workflow.created_at = naive_utc_now()
|
||||
@@ -509,7 +518,7 @@ class Workflow(Base):
|
||||
"features": self.features_dict,
|
||||
"environment_variables": [var.model_dump(mode="json") for var in environment_variables],
|
||||
"conversation_variables": [var.model_dump(mode="json") for var in self.conversation_variables],
|
||||
"rag_pipeline_variables": self.rag_pipeline_variables,
|
||||
"memory_blocks": [block.model_dump(mode="json") for block in self.memory_blocks],
|
||||
}
|
||||
return result
|
||||
|
||||
@@ -547,6 +556,27 @@ class Workflow(Base):
|
||||
ensure_ascii=False,
|
||||
)
|
||||
|
||||
@property
|
||||
def memory_blocks(self) -> Sequence[MemoryBlockSpec]:
|
||||
"""Memory blocks configuration stored in database"""
|
||||
if self._memory_blocks is None or self._memory_blocks == "":
|
||||
self._memory_blocks = "[]"
|
||||
|
||||
memory_blocks_list: list[dict[str, Any]] = json.loads(self._memory_blocks)
|
||||
results = [MemoryBlockSpec.model_validate(config) for config in memory_blocks_list]
|
||||
return results
|
||||
|
||||
@memory_blocks.setter
|
||||
def memory_blocks(self, value: Sequence[MemoryBlockSpec]):
|
||||
if not value:
|
||||
self._memory_blocks = "[]"
|
||||
return
|
||||
|
||||
self._memory_blocks = json.dumps(
|
||||
[block.model_dump() for block in value],
|
||||
ensure_ascii=False,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def version_from_datetime(d: datetime) -> str:
|
||||
return str(d)
|
||||
@@ -1570,6 +1600,31 @@ class WorkflowDraftVariable(Base):
|
||||
variable.editable = editable
|
||||
return variable
|
||||
|
||||
@staticmethod
|
||||
def new_memory_block_variable(
|
||||
*,
|
||||
app_id: str,
|
||||
node_id: str | None = None,
|
||||
memory_id: str,
|
||||
name: str,
|
||||
value: VersionedMemoryValue,
|
||||
description: str = "",
|
||||
) -> "WorkflowDraftVariable":
|
||||
"""Create a new memory block draft variable."""
|
||||
return WorkflowDraftVariable(
|
||||
id=str(uuid.uuid4()),
|
||||
app_id=app_id,
|
||||
node_id=MEMORY_BLOCK_VARIABLE_NODE_ID,
|
||||
name=name,
|
||||
value=value.model_dump_json(),
|
||||
description=description,
|
||||
selector=[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_id] if node_id is None else
|
||||
[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_id, node_id],
|
||||
value_type=SegmentType.VERSIONED_MEMORY,
|
||||
visible=True,
|
||||
editable=True,
|
||||
)
|
||||
|
||||
@property
|
||||
def edited(self):
|
||||
return self.last_edited_at is not None
|
||||
|
||||
289
api/services/chatflow_history_service.py
Normal file
289
api/services/chatflow_history_service.py
Normal file
@@ -0,0 +1,289 @@
|
||||
import json
|
||||
from collections.abc import MutableMapping, Sequence
|
||||
from typing import Literal, Optional, overload
|
||||
|
||||
from sqlalchemy import Row, Select, and_, desc, func, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.memory.entities import ChatflowConversationMetadata
|
||||
from core.model_runtime.entities.message_entities import (
|
||||
PromptMessage,
|
||||
)
|
||||
from extensions.ext_database import db
|
||||
from models.chatflow_memory import ChatflowConversation, ChatflowMessage
|
||||
|
||||
|
||||
class ChatflowHistoryService:
|
||||
|
||||
@staticmethod
|
||||
def get_visible_chat_history(
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None,
|
||||
max_visible_count: Optional[int] = None
|
||||
) -> Sequence[PromptMessage]:
|
||||
with Session(db.engine) as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=False
|
||||
)
|
||||
|
||||
if not chatflow_conv:
|
||||
return []
|
||||
|
||||
metadata = ChatflowConversationMetadata.model_validate_json(chatflow_conv.conversation_metadata)
|
||||
visible_count: int = max_visible_count or metadata.visible_count
|
||||
|
||||
stmt = select(ChatflowMessage).where(
|
||||
ChatflowMessage.conversation_id == chatflow_conv.id
|
||||
).order_by(ChatflowMessage.index.asc(), ChatflowMessage.version.desc())
|
||||
raw_messages: Sequence[Row[tuple[ChatflowMessage]]] = session.execute(stmt).all()
|
||||
sorted_messages = ChatflowHistoryService._filter_latest_messages(
|
||||
[it[0] for it in raw_messages]
|
||||
)
|
||||
visible_count = min(visible_count, len(sorted_messages))
|
||||
visible_messages = sorted_messages[-visible_count:]
|
||||
return [PromptMessage.model_validate_json(it.data) for it in visible_messages]
|
||||
|
||||
@staticmethod
|
||||
def get_latest_chat_history_for_app(
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None,
|
||||
max_visible_count: Optional[int] = None
|
||||
) -> Sequence[PromptMessage]:
|
||||
"""
|
||||
Get the latest chat history for an app
|
||||
|
||||
Args:
|
||||
app_id: Application ID
|
||||
tenant_id: Tenant ID
|
||||
node_id: Node ID (None for APP level)
|
||||
max_visible_count: Maximum number of visible messages (optional)
|
||||
|
||||
Returns:
|
||||
PromptMessage sequence, empty list if no history exists
|
||||
"""
|
||||
with Session(db.engine) as session:
|
||||
# Query the most recently updated chatflow conversation
|
||||
stmt = select(ChatflowConversation).where(
|
||||
ChatflowConversation.tenant_id == tenant_id,
|
||||
ChatflowConversation.app_id == app_id,
|
||||
ChatflowConversation.node_id == (node_id or None)
|
||||
).order_by(desc(ChatflowConversation.updated_at)).limit(1)
|
||||
|
||||
chatflow_conv_row = session.execute(stmt).first()
|
||||
if not chatflow_conv_row:
|
||||
return []
|
||||
|
||||
chatflow_conv = chatflow_conv_row[0]
|
||||
|
||||
# Get visible messages for this conversation
|
||||
metadata = ChatflowConversationMetadata.model_validate_json(
|
||||
chatflow_conv.conversation_metadata
|
||||
)
|
||||
visible_count: int = max_visible_count or metadata.visible_count
|
||||
|
||||
stmt = select(ChatflowMessage).where(
|
||||
ChatflowMessage.conversation_id == chatflow_conv.id
|
||||
).order_by(ChatflowMessage.index.asc(), ChatflowMessage.version.desc())
|
||||
|
||||
raw_messages: Sequence[Row[tuple[ChatflowMessage]]] = session.execute(stmt).all()
|
||||
sorted_messages = ChatflowHistoryService._filter_latest_messages(
|
||||
[it[0] for it in raw_messages]
|
||||
)
|
||||
|
||||
visible_count = min(visible_count, len(sorted_messages))
|
||||
visible_messages = sorted_messages[-visible_count:]
|
||||
return [PromptMessage.model_validate_json(it.data) for it in visible_messages]
|
||||
|
||||
@staticmethod
|
||||
def save_message(
|
||||
prompt_message: PromptMessage,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
node_id: Optional[str] = None
|
||||
) -> None:
|
||||
with Session(db.engine) as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
|
||||
)
|
||||
|
||||
# Get next index
|
||||
max_index = session.execute(
|
||||
select(func.max(ChatflowMessage.index)).where(
|
||||
ChatflowMessage.conversation_id == chatflow_conv.id
|
||||
)
|
||||
).scalar() or -1
|
||||
next_index = max_index + 1
|
||||
|
||||
# Save new message to append-only table
|
||||
new_message = ChatflowMessage(
|
||||
conversation_id=chatflow_conv.id,
|
||||
index=next_index,
|
||||
version=1,
|
||||
data=json.dumps(prompt_message)
|
||||
)
|
||||
session.add(new_message)
|
||||
session.commit()
|
||||
|
||||
# Increment visible_count after each message save
|
||||
current_metadata = ChatflowConversationMetadata.model_validate_json(chatflow_conv.conversation_metadata)
|
||||
new_visible_count = current_metadata.visible_count + 1
|
||||
new_metadata = ChatflowConversationMetadata(visible_count=new_visible_count)
|
||||
chatflow_conv.conversation_metadata = new_metadata.model_dump_json()
|
||||
|
||||
@staticmethod
|
||||
def save_app_message(
|
||||
prompt_message: PromptMessage,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str
|
||||
) -> None:
|
||||
"""Save PromptMessage to app-level chatflow conversation."""
|
||||
ChatflowHistoryService.save_message(
|
||||
prompt_message=prompt_message,
|
||||
conversation_id=conversation_id,
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id,
|
||||
node_id=None
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def save_node_message(
|
||||
prompt_message: PromptMessage,
|
||||
node_id: str,
|
||||
conversation_id: str,
|
||||
app_id: str,
|
||||
tenant_id: str
|
||||
) -> None:
|
||||
ChatflowHistoryService.save_message(
|
||||
prompt_message=prompt_message,
|
||||
conversation_id=conversation_id,
|
||||
app_id=app_id,
|
||||
tenant_id=tenant_id,
|
||||
node_id=node_id
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def update_visible_count(
|
||||
conversation_id: str,
|
||||
node_id: Optional[str],
|
||||
new_visible_count: int,
|
||||
app_id: str,
|
||||
tenant_id: str
|
||||
) -> None:
|
||||
with Session(db.engine) as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=True
|
||||
)
|
||||
|
||||
# Only update visible_count in metadata, do not delete any data
|
||||
new_metadata = ChatflowConversationMetadata(visible_count=new_visible_count)
|
||||
chatflow_conv.conversation_metadata = new_metadata.model_dump_json()
|
||||
|
||||
session.commit()
|
||||
|
||||
@staticmethod
|
||||
def get_conversation_metadata(
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
conversation_id: str,
|
||||
node_id: Optional[str]
|
||||
) -> ChatflowConversationMetadata:
|
||||
with Session(db.engine) as session:
|
||||
chatflow_conv = ChatflowHistoryService._get_or_create_chatflow_conversation(
|
||||
session, conversation_id, app_id, tenant_id, node_id, create_if_missing=False
|
||||
)
|
||||
if not chatflow_conv:
|
||||
raise ValueError(f"Conversation not found: {conversation_id}")
|
||||
return ChatflowConversationMetadata.model_validate_json(chatflow_conv.conversation_metadata)
|
||||
|
||||
@staticmethod
|
||||
def _filter_latest_messages(raw_messages: Sequence[ChatflowMessage]) -> Sequence[ChatflowMessage]:
|
||||
index_to_message: MutableMapping[int, ChatflowMessage] = {}
|
||||
for msg in raw_messages:
|
||||
index = msg.index
|
||||
if index not in index_to_message or msg.version > index_to_message[index].version:
|
||||
index_to_message[index] = msg
|
||||
|
||||
sorted_messages = sorted(index_to_message.values(), key=lambda m: m.index)
|
||||
return sorted_messages
|
||||
|
||||
# Typing overloads: when ``create_if_missing`` is literally True the helper is
# guaranteed to return a ChatflowConversation; otherwise the result may be None.
@overload
@staticmethod
def _get_or_create_chatflow_conversation(
    session: Session,
    conversation_id: str,
    app_id: str,
    tenant_id: str,
    node_id: Optional[str] = None,
    create_if_missing: Literal[True] = True
) -> ChatflowConversation: ...

@overload
@staticmethod
def _get_or_create_chatflow_conversation(
    session: Session,
    conversation_id: str,
    app_id: str,
    tenant_id: str,
    node_id: Optional[str] = None,
    create_if_missing: Literal[False] = False
) -> Optional[ChatflowConversation]: ...

# Fallback overload for a non-literal bool flag.
@overload
@staticmethod
def _get_or_create_chatflow_conversation(
    session: Session,
    conversation_id: str,
    app_id: str,
    tenant_id: str,
    node_id: Optional[str] = None,
    create_if_missing: bool = False
) -> Optional[ChatflowConversation]: ...

@staticmethod
def _get_or_create_chatflow_conversation(
    session: Session,
    conversation_id: str,
    app_id: str,
    tenant_id: str,
    node_id: Optional[str] = None,
    create_if_missing: bool = False
) -> Optional[ChatflowConversation]:
    """Get existing chatflow conversation or optionally create new one.

    Looks up the ChatflowConversation mirroring ``conversation_id`` within the
    given tenant/app. ``node_id`` selects a node-scoped record; when falsy the
    app-level record (node_id IS NULL) is matched instead.

    Args:
        session: Open SQLAlchemy session; a new row is flushed, not committed.
        conversation_id: Original conversation id the chatflow record mirrors.
        app_id: Owning app id.
        tenant_id: Owning tenant id.
        node_id: Optional node scope; None targets the app-level conversation.
        create_if_missing: When True, create (and flush) a new record if absent.

    Returns:
        The matching or newly created ChatflowConversation, or None when absent
        and ``create_if_missing`` is False.
    """
    stmt: Select[tuple[ChatflowConversation]] = select(ChatflowConversation).where(
        and_(
            ChatflowConversation.original_conversation_id == conversation_id,
            ChatflowConversation.tenant_id == tenant_id,
            ChatflowConversation.app_id == app_id
        )
    )

    if node_id:
        stmt = stmt.where(ChatflowConversation.node_id == node_id)
    else:
        # App-level conversations are stored with an explicit NULL node_id.
        stmt = stmt.where(ChatflowConversation.node_id.is_(None))

    chatflow_conv: Row[tuple[ChatflowConversation]] | None = session.execute(stmt).first()

    if chatflow_conv:
        result: ChatflowConversation = chatflow_conv[0]  # Extract the ChatflowConversation object
        return result
    else:
        if create_if_missing:
            # Create a new chatflow conversation
            default_metadata = ChatflowConversationMetadata(visible_count=0)
            new_chatflow_conv = ChatflowConversation(
                tenant_id=tenant_id,
                app_id=app_id,
                node_id=node_id,
                original_conversation_id=conversation_id,
                conversation_metadata=default_metadata.model_dump_json(),
            )
            session.add(new_chatflow_conv)
            session.flush()  # Obtain ID
            return new_chatflow_conv
        return None
|
||||
680
api/services/chatflow_memory_service.py
Normal file
680
api/services/chatflow_memory_service.py
Normal file
@@ -0,0 +1,680 @@
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import and_, delete, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.llm_generator.llm_generator import LLMGenerator
|
||||
from core.memory.entities import (
|
||||
MemoryBlock,
|
||||
MemoryBlockSpec,
|
||||
MemoryBlockWithConversation,
|
||||
MemoryCreatedBy,
|
||||
MemoryScheduleMode,
|
||||
MemoryScope,
|
||||
MemoryTerm,
|
||||
MemoryValueData,
|
||||
)
|
||||
from core.memory.errors import MemorySyncTimeoutError
|
||||
from core.model_runtime.entities.message_entities import PromptMessage
|
||||
from core.variables.segments import VersionedMemoryValue
|
||||
from core.workflow.constants import MEMORY_BLOCK_VARIABLE_NODE_ID
|
||||
from core.workflow.runtime import VariablePool
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from models import App, CreatorUserRole
|
||||
from models.chatflow_memory import ChatflowMemoryVariable
|
||||
from models.workflow import Workflow, WorkflowDraftVariable
|
||||
from services.chatflow_history_service import ChatflowHistoryService
|
||||
from services.workflow_draft_variable_service import WorkflowDraftVariableService
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChatflowMemoryService:
|
||||
@staticmethod
def get_persistent_memories(
    app: App,
    created_by: MemoryCreatedBy,
    version: int | None = None
) -> Sequence[MemoryBlock]:
    """Fetch app-level (conversation-less) memory variables for one user.

    Args:
        app: App whose memories are queried.
        created_by: Identifies the user; an account id takes precedence over an
            end-user id when resolving the creator role.
        version: Exact version to load, or None for the newest version of each
            memory id.

    Returns:
        Memory blocks matched against the published workflow's specs (rows with
        no matching spec are dropped by ``_convert_to_memory_blocks``).
    """
    if created_by.account_id:
        created_by_role = CreatorUserRole.ACCOUNT
        created_by_id = created_by.account_id
    else:
        created_by_role = CreatorUserRole.END_USER
        created_by_id = created_by.id

    base_filters = and_(
        ChatflowMemoryVariable.tenant_id == app.tenant_id,
        ChatflowMemoryVariable.app_id == app.id,
        # Persistent memories carry no conversation id.
        ChatflowMemoryVariable.conversation_id.is_(None),
        ChatflowMemoryVariable.created_by_role == created_by_role,
        ChatflowMemoryVariable.created_by == created_by_id,
    )
    if version is None:
        # Latest version per memory_id. FIX: PostgreSQL's DISTINCT ON requires
        # the ORDER BY list to start with the DISTINCT ON expression; ordering
        # by version alone makes the statement error out. Order by memory_id
        # first, then version desc so the newest row wins per memory.
        stmt = (
            select(ChatflowMemoryVariable)
            .distinct(ChatflowMemoryVariable.memory_id)
            .where(base_filters)
            .order_by(
                ChatflowMemoryVariable.memory_id,
                ChatflowMemoryVariable.version.desc(),
            )
        )
    else:
        stmt = select(ChatflowMemoryVariable).where(
            and_(base_filters, ChatflowMemoryVariable.version == version)
        )
    with Session(db.engine) as session:
        rows = session.scalars(stmt).all()
    return ChatflowMemoryService._convert_to_memory_blocks(app, created_by, rows)
|
||||
|
||||
@staticmethod
def get_session_memories(
    app: App,
    created_by: MemoryCreatedBy,
    conversation_id: str,
    version: int | None = None
) -> Sequence[MemoryBlock]:
    """Fetch session-scoped memory variables for one conversation.

    Args:
        app: App whose memories are queried.
        created_by: Caller identity, forwarded into the resulting blocks.
        conversation_id: Conversation the memories belong to.
        version: Exact version to load, or None for the newest version of each
            memory id.

    Returns:
        Memory blocks matched against the published workflow's specs.
    """
    base_filters = and_(
        ChatflowMemoryVariable.tenant_id == app.tenant_id,
        ChatflowMemoryVariable.app_id == app.id,
        ChatflowMemoryVariable.conversation_id == conversation_id
    )
    if version is None:
        # Latest version per memory_id. FIX: PostgreSQL's DISTINCT ON requires
        # ORDER BY to lead with the DISTINCT ON column; follow with version
        # desc so the newest row is the one kept for each memory.
        stmt = (
            select(ChatflowMemoryVariable)
            .distinct(ChatflowMemoryVariable.memory_id)
            .where(base_filters)
            .order_by(
                ChatflowMemoryVariable.memory_id,
                ChatflowMemoryVariable.version.desc(),
            )
        )
    else:
        stmt = select(ChatflowMemoryVariable).where(
            and_(base_filters, ChatflowMemoryVariable.version == version)
        )
    with Session(db.engine) as session:
        rows = session.scalars(stmt).all()
    return ChatflowMemoryService._convert_to_memory_blocks(app, created_by, rows)
|
||||
|
||||
@staticmethod
def save_memory(memory: MemoryBlock, variable_pool: VariablePool, is_draft: bool) -> None:
    """Persist a memory block and mirror it into the runtime variable pool.

    Inserts a new ChatflowMemoryVariable row carrying the block's current
    ``version`` (storage is append-only: older versions are kept). In draft
    mode the value is additionally recorded as a workflow draft variable so
    it can be inspected and edited in the debugger.
    """
    # Node-scoped memories are namespaced "<node_id>_<memory_id>" in the pool.
    key = f"{memory.node_id}_{memory.spec.id}" if memory.node_id else memory.spec.id
    variable_pool.add([MEMORY_BLOCK_VARIABLE_NODE_ID, key], memory.value)
    if memory.created_by.account_id:
        created_by_role = CreatorUserRole.ACCOUNT
        created_by = memory.created_by.account_id
    else:
        created_by_role = CreatorUserRole.END_USER
        created_by = memory.created_by.id

    with Session(db.engine) as session:
        session.add(
            ChatflowMemoryVariable(
                memory_id=memory.spec.id,
                tenant_id=memory.tenant_id,
                app_id=memory.app_id,
                node_id=memory.node_id,
                conversation_id=memory.conversation_id,
                name=memory.spec.name,
                # Value and the user-edited flag are serialized together.
                value=MemoryValueData(
                    value=memory.value,
                    edited_by_user=memory.edited_by_user
                ).model_dump_json(),
                term=memory.spec.term,
                scope=memory.spec.scope,
                version=memory.version,  # Use version from MemoryBlock directly
                created_by_role=created_by_role,
                created_by=created_by,
            )
        )
        session.commit()

    if is_draft:
        with Session(bind=db.engine) as session:
            draft_var_service = WorkflowDraftVariableService(session)
            # Draft variables use the same "<node_id>_<memory_id>" namespacing.
            memory_selector = memory.spec.id if not memory.node_id else f"{memory.node_id}_{memory.spec.id}"
            existing_vars = draft_var_service.get_draft_variables_by_selectors(
                app_id=memory.app_id,
                selectors=[[MEMORY_BLOCK_VARIABLE_NODE_ID, memory_selector]]
            )
            if existing_vars:
                # Append the new value to the existing draft variable's
                # version history rather than overwriting it.
                draft_var = existing_vars[0]
                draft_var.value = VersionedMemoryValue.model_validate_json(draft_var.value)\
                    .add_version(memory.value)\
                    .model_dump_json()
            else:
                draft_var = WorkflowDraftVariable.new_memory_block_variable(
                    app_id=memory.app_id,
                    memory_id=memory.spec.id,
                    name=memory.spec.name,
                    value=VersionedMemoryValue().add_version(memory.value),
                    description=memory.spec.description
                )
                session.add(draft_var)
            session.commit()
|
||||
|
||||
@staticmethod
def get_memories_by_specs(
    memory_block_specs: Sequence[MemoryBlockSpec],
    tenant_id: str, app_id: str,
    created_by: MemoryCreatedBy,
    conversation_id: Optional[str],
    node_id: Optional[str],
    is_draft: bool
) -> Sequence[MemoryBlock]:
    """Resolve every spec to its current MemoryBlock, one lookup per spec."""
    resolved: list[MemoryBlock] = []
    for block_spec in memory_block_specs:
        resolved.append(
            ChatflowMemoryService.get_memory_by_spec(
                block_spec, tenant_id, app_id, created_by, conversation_id, node_id, is_draft
            )
        )
    return resolved
|
||||
|
||||
@staticmethod
def get_memory_by_spec(
    spec: MemoryBlockSpec,
    tenant_id: str,
    app_id: str,
    created_by: MemoryCreatedBy,
    conversation_id: Optional[str],
    node_id: Optional[str],
    is_draft: bool
) -> MemoryBlock:
    """Resolve one memory spec to its current value.

    Resolution order: draft variable (draft runs only) -> latest persisted
    ChatflowMemoryVariable row -> a fresh block seeded from the spec template.

    Returns:
        A MemoryBlock; never None (falls back to the spec's template value).
    """
    with Session(db.engine) as session:
        if is_draft:
            draft_var_service = WorkflowDraftVariableService(session)
            # FIX: use the same "<node_id>_<memory_id>" selector that
            # save_memory() writes; the previous "<memory_id>.<node_id>" form
            # never matched, so node-scoped draft memories were never found.
            selector = [MEMORY_BLOCK_VARIABLE_NODE_ID, f"{node_id}_{spec.id}"] \
                if node_id else [MEMORY_BLOCK_VARIABLE_NODE_ID, spec.id]
            draft_vars = draft_var_service.get_draft_variables_by_selectors(
                app_id=app_id,
                selectors=[selector]
            )
            if draft_vars:
                draft_var = draft_vars[0]
                return MemoryBlock(
                    value=draft_var.get_value().text,
                    tenant_id=tenant_id,
                    app_id=app_id,
                    conversation_id=conversation_id,
                    node_id=node_id,
                    spec=spec,
                    created_by=created_by,
                    version=1,
                )
        # Latest persisted version; node/conversation filters depend on the
        # spec's scope and term (app-scoped rows store node_id NULL, persistent
        # rows store conversation_id NULL).
        stmt = select(ChatflowMemoryVariable).where(
            and_(
                ChatflowMemoryVariable.memory_id == spec.id,
                ChatflowMemoryVariable.tenant_id == tenant_id,
                ChatflowMemoryVariable.app_id == app_id,
                ChatflowMemoryVariable.node_id ==
                (node_id if spec.scope == MemoryScope.NODE else None),
                ChatflowMemoryVariable.conversation_id ==
                (conversation_id if spec.term == MemoryTerm.SESSION else None),
            )
        ).order_by(ChatflowMemoryVariable.version.desc()).limit(1)
        result = session.execute(stmt).scalar()
        if result:
            memory_value_data = MemoryValueData.model_validate_json(result.value)
            return MemoryBlock(
                value=memory_value_data.value,
                tenant_id=tenant_id,
                app_id=app_id,
                conversation_id=conversation_id,
                node_id=node_id,
                spec=spec,
                edited_by_user=memory_value_data.edited_by_user,
                created_by=created_by,
                version=result.version,
            )
        # Nothing stored yet: start from the spec's template text.
        return MemoryBlock(
            tenant_id=tenant_id,
            value=spec.template,
            app_id=app_id,
            conversation_id=conversation_id,
            node_id=node_id,
            spec=spec,
            created_by=created_by,
            version=1,
        )
|
||||
|
||||
@staticmethod
def update_app_memory_if_needed(
    workflow: Workflow,
    conversation_id: str,
    variable_pool: VariablePool,
    created_by: MemoryCreatedBy,
    is_draft: bool
):
    """Trigger updates for app-scoped memory blocks whose turn threshold is met.

    Sync-mode blocks are batched into one background task guarded by a Redis
    lock (so callers can wait on completion); async-mode blocks each get a
    fire-and-forget thread.
    """
    visible_messages = ChatflowHistoryService.get_visible_chat_history(
        conversation_id=conversation_id,
        app_id=workflow.app_id,
        tenant_id=workflow.tenant_id,
        node_id=None,
    )
    sync_blocks: list[MemoryBlock] = []
    async_blocks: list[MemoryBlock] = []
    for memory_spec in workflow.memory_blocks:
        # Only app-scoped blocks are handled here; node-scoped ones are
        # updated by update_node_memory_if_needed.
        if memory_spec.scope == MemoryScope.APP:
            memory = ChatflowMemoryService.get_memory_by_spec(
                spec=memory_spec,
                tenant_id=workflow.tenant_id,
                app_id=workflow.app_id,
                conversation_id=conversation_id,
                node_id=None,
                is_draft=is_draft,
                created_by=created_by,
            )
            if ChatflowMemoryService._should_update_memory(memory, visible_messages):
                if memory.spec.schedule_mode == MemoryScheduleMode.SYNC:
                    sync_blocks.append(memory)
                else:
                    async_blocks.append(memory)

    if not sync_blocks and not async_blocks:
        return

    # async mode: submit individual async tasks directly
    for memory_block in async_blocks:
        ChatflowMemoryService._app_submit_async_memory_update(
            block=memory_block,
            is_draft=is_draft,
            variable_pool=variable_pool,
            visible_messages=visible_messages,
            conversation_id=conversation_id,
        )

    # sync mode: submit a batch update task
    if sync_blocks:
        ChatflowMemoryService._app_submit_sync_memory_batch_update(
            sync_blocks=sync_blocks,
            is_draft=is_draft,
            conversation_id=conversation_id,
            app_id=workflow.app_id,
            visible_messages=visible_messages,
            variable_pool=variable_pool
        )
|
||||
|
||||
@staticmethod
def update_node_memory_if_needed(
    tenant_id: str,
    app_id: str,
    node_id: str,
    created_by: MemoryCreatedBy,
    conversation_id: str,
    memory_block_spec: MemoryBlockSpec,
    variable_pool: VariablePool,
    is_draft: bool
) -> bool:
    """Update a single node-scoped memory block when its threshold is reached.

    Sync mode runs the update inline (blocking the caller); async mode runs
    it on a background thread.

    Returns:
        True when an update was triggered, False when the visible history has
        not yet reached the spec's update threshold.
    """
    visible_messages = ChatflowHistoryService.get_visible_chat_history(
        conversation_id=conversation_id,
        app_id=app_id,
        tenant_id=tenant_id,
        node_id=node_id,
    )
    memory_block = ChatflowMemoryService.get_memory_by_spec(
        spec=memory_block_spec,
        tenant_id=tenant_id,
        app_id=app_id,
        conversation_id=conversation_id,
        node_id=node_id,
        is_draft=is_draft,
        created_by=created_by,
    )
    if not ChatflowMemoryService._should_update_memory(
        memory_block=memory_block,
        visible_history=visible_messages
    ):
        return False

    if memory_block_spec.schedule_mode == MemoryScheduleMode.SYNC:
        # Node-level sync: blocking execution
        ChatflowMemoryService._update_node_memory_sync(
            visible_messages=visible_messages,
            memory_block=memory_block,
            variable_pool=variable_pool,
            is_draft=is_draft,
            conversation_id=conversation_id
        )
    else:
        # Node-level async: execute asynchronously
        ChatflowMemoryService._update_node_memory_async(
            memory_block=memory_block,
            visible_messages=visible_messages,
            variable_pool=variable_pool,
            is_draft=is_draft,
            conversation_id=conversation_id
        )
    return True
|
||||
|
||||
@staticmethod
def wait_for_sync_memory_completion(workflow: Workflow, conversation_id: str):
    """Wait for sync memory update to complete, maximum 50 seconds.

    Polls the Redis lock that _batch_update_sync_memory holds; returns as soon
    as the lock is gone, raises MemorySyncTimeoutError if it never clears.
    """
    has_sync_app_blocks = any(
        spec.scope == MemoryScope.APP and spec.schedule_mode == MemoryScheduleMode.SYNC
        for spec in workflow.memory_blocks
    )
    if not has_sync_app_blocks:
        # No sync app-level blocks means nothing can be holding the lock.
        return

    lock_key = _get_memory_sync_lock_key(workflow.app_id, conversation_id)

    # Poll up to 10 times, 5 seconds apart.
    max_retries = 10
    retry_interval = 5

    for attempt in range(max_retries):
        if not redis_client.exists(lock_key):
            # Lock released: the batch update has finished.
            return
        if attempt == max_retries - 1:
            # Out of attempts: give up and surface a timeout.
            raise MemorySyncTimeoutError(
                app_id=workflow.app_id,
                conversation_id=conversation_id
            )
        time.sleep(retry_interval)
|
||||
|
||||
@staticmethod
def _convert_to_memory_blocks(
    app: App,
    created_by: MemoryCreatedBy,
    raw_results: Sequence[ChatflowMemoryVariable]
) -> Sequence[MemoryBlock]:
    """Map DB rows onto MemoryBlock objects via the published workflow's specs.

    Rows whose memory_id has no spec in the published workflow, or that lack
    an app_id, are skipped. Returns [] when no workflow is published.
    """
    workflow = WorkflowService().get_published_workflow(app)
    if not workflow:
        return []
    # First spec wins for a given id, matching the original next() lookup.
    specs_by_id: dict = {}
    for candidate_spec in workflow.memory_blocks:
        specs_by_id.setdefault(candidate_spec.id, candidate_spec)

    blocks: list[MemoryBlock] = []
    for row in raw_results:
        spec = specs_by_id.get(row.memory_id)
        if not spec or not row.app_id:
            continue
        value_data = MemoryValueData.model_validate_json(row.value)
        blocks.append(
            MemoryBlock(
                spec=spec,
                tenant_id=row.tenant_id,
                value=value_data.value,
                app_id=row.app_id,
                conversation_id=row.conversation_id,
                node_id=row.node_id,
                edited_by_user=value_data.edited_by_user,
                created_by=created_by,
                version=row.version,
            )
        )
    return blocks
|
||||
|
||||
@staticmethod
def _should_update_memory(
    memory_block: MemoryBlock,
    visible_history: Sequence[PromptMessage]
) -> bool:
    """True once the visible history reaches the spec's update-turn threshold."""
    threshold = memory_block.spec.update_turns
    return len(visible_history) >= threshold
|
||||
|
||||
@staticmethod
def _app_submit_async_memory_update(
    block: MemoryBlock,
    visible_messages: Sequence[PromptMessage],
    variable_pool: VariablePool,
    conversation_id: str,
    is_draft: bool
):
    """Fire-and-forget update of a single app-scoped async memory block."""
    task_kwargs = {
        'memory_block': block,
        'visible_messages': visible_messages,
        'variable_pool': variable_pool,
        'is_draft': is_draft,
        'conversation_id': conversation_id
    }
    worker = threading.Thread(
        target=ChatflowMemoryService._perform_memory_update,
        kwargs=task_kwargs,
    )
    worker.start()
|
||||
|
||||
@staticmethod
def _app_submit_sync_memory_batch_update(
    sync_blocks: Sequence[MemoryBlock],
    app_id: str,
    conversation_id: str,
    visible_messages: Sequence[PromptMessage],
    variable_pool: VariablePool,
    is_draft: bool
):
    """Submit sync memory batch update task"""
    task_kwargs = {
        'sync_blocks': sync_blocks,
        'app_id': app_id,
        'conversation_id': conversation_id,
        'visible_messages': visible_messages,
        'variable_pool': variable_pool,
        'is_draft': is_draft
    }
    worker = threading.Thread(
        target=ChatflowMemoryService._batch_update_sync_memory,
        kwargs=task_kwargs,
    )
    worker.start()
|
||||
|
||||
@staticmethod
def _batch_update_sync_memory(
    sync_blocks: Sequence[MemoryBlock],
    app_id: str,
    conversation_id: str,
    visible_messages: Sequence[PromptMessage],
    variable_pool: VariablePool,
    is_draft: bool
):
    """Update all sync blocks in parallel while holding the conversation lock.

    The Redis lock lets wait_for_sync_memory_completion() detect when this
    batch is still running. Any failure is logged, never propagated.
    """
    try:
        lock_key = _get_memory_sync_lock_key(app_id, conversation_id)
        with redis_client.lock(lock_key, timeout=120):
            # Build one worker per block, then start them all before joining,
            # so the blocks update concurrently under the single lock.
            workers = [
                threading.Thread(
                    target=ChatflowMemoryService._perform_memory_update,
                    kwargs={
                        'memory_block': block,
                        'visible_messages': visible_messages,
                        'variable_pool': variable_pool,
                        'is_draft': is_draft,
                        'conversation_id': conversation_id,
                    },
                )
                for block in sync_blocks
            ]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
    except Exception as e:
        logger.exception("Error batch updating memory", exc_info=e)
|
||||
|
||||
@staticmethod
def _update_node_memory_sync(
    memory_block: MemoryBlock,
    visible_messages: Sequence[PromptMessage],
    variable_pool: VariablePool,
    conversation_id: str,
    is_draft: bool
):
    """Run a node-level memory update inline, blocking the caller until done."""
    ChatflowMemoryService._perform_memory_update(
        memory_block=memory_block,
        visible_messages=visible_messages,
        variable_pool=variable_pool,
        conversation_id=conversation_id,
        is_draft=is_draft,
    )
|
||||
|
||||
@staticmethod
def _update_node_memory_async(
    memory_block: MemoryBlock,
    visible_messages: Sequence[PromptMessage],
    variable_pool: VariablePool,
    conversation_id: str,
    is_draft: bool = False
):
    """Run a node-level memory update on a daemon thread without waiting."""
    task_kwargs = {
        'memory_block': memory_block,
        'visible_messages': visible_messages,
        'variable_pool': variable_pool,
        'is_draft': is_draft,
        'conversation_id': conversation_id,
    }
    worker = threading.Thread(
        target=ChatflowMemoryService._perform_memory_update,
        kwargs=task_kwargs,
        daemon=True
    )
    worker.start()
|
||||
|
||||
@staticmethod
def _perform_memory_update(
    memory_block: MemoryBlock,
    variable_pool: VariablePool,
    conversation_id: str,
    visible_messages: Sequence[PromptMessage],
    is_draft: bool
):
    """Regenerate one memory block via the LLM and persist the new version.

    Runs LLM summarization over the visible history, saves the result as
    version+1 of the block (clearing the user-edit flag), then shrinks the
    conversation's visible window to the spec's preserved_turns.
    """
    updated_value = LLMGenerator.update_memory_block(
        tenant_id=memory_block.tenant_id,
        visible_history=ChatflowMemoryService._format_chat_history(visible_messages),
        variable_pool=variable_pool,
        memory_block=memory_block,
        memory_spec=memory_block.spec,
    )
    updated_memory = MemoryBlock(
        tenant_id=memory_block.tenant_id,
        value=updated_value,
        spec=memory_block.spec,
        app_id=memory_block.app_id,
        conversation_id=conversation_id,
        node_id=memory_block.node_id,
        edited_by_user=False,  # an LLM rewrite supersedes manual edits
        created_by=memory_block.created_by,
        version=memory_block.version + 1,  # Increment version for business logic update
    )
    ChatflowMemoryService.save_memory(updated_memory, variable_pool, is_draft)

    # After summarization only the preserved tail of the history should count
    # toward the next update threshold.
    ChatflowHistoryService.update_visible_count(
        conversation_id=conversation_id,
        node_id=memory_block.node_id,
        new_visible_count=memory_block.spec.preserved_turns,
        app_id=memory_block.app_id,
        tenant_id=memory_block.tenant_id
    )
|
||||
|
||||
@staticmethod
def delete_memory(app: App, memory_id: str, created_by: MemoryCreatedBy):
    """Delete every stored version of one end-user-editable memory block.

    Raises:
        ValueError: when no workflow is published, or the memory id is unknown
            or not marked end_user_editable.
    """
    workflow = WorkflowService().get_published_workflow(app)
    if not workflow:
        raise ValueError("Workflow not found")

    memory_spec = next((it for it in workflow.memory_blocks if it.id == memory_id), None)
    if not memory_spec or not memory_spec.end_user_editable:
        raise ValueError("Memory not found or not deletable")

    if created_by.account_id:
        role, actor_id = CreatorUserRole.ACCOUNT, created_by.account_id
    else:
        role, actor_id = CreatorUserRole.END_USER, created_by.id

    wipe_stmt = delete(ChatflowMemoryVariable).where(
        and_(
            ChatflowMemoryVariable.tenant_id == app.tenant_id,
            ChatflowMemoryVariable.app_id == app.id,
            ChatflowMemoryVariable.memory_id == memory_id,
            ChatflowMemoryVariable.created_by_role == role,
            ChatflowMemoryVariable.created_by == actor_id
        )
    )
    with Session(db.engine) as session:
        session.execute(wipe_stmt)
        session.commit()
|
||||
|
||||
@staticmethod
def delete_all_user_memories(app: App, created_by: MemoryCreatedBy):
    """Remove every memory variable this user has stored for the app."""
    if created_by.account_id:
        role, actor_id = CreatorUserRole.ACCOUNT, created_by.account_id
    else:
        role, actor_id = CreatorUserRole.END_USER, created_by.id

    wipe_stmt = delete(ChatflowMemoryVariable).where(
        and_(
            ChatflowMemoryVariable.tenant_id == app.tenant_id,
            ChatflowMemoryVariable.app_id == app.id,
            ChatflowMemoryVariable.created_by_role == role,
            ChatflowMemoryVariable.created_by == actor_id
        )
    )
    with Session(db.engine) as session:
        session.execute(wipe_stmt)
        session.commit()
|
||||
|
||||
@staticmethod
def get_persistent_memories_with_conversation(
    app: App,
    created_by: MemoryCreatedBy,
    conversation_id: str,
    version: int | None = None
) -> Sequence[MemoryBlockWithConversation]:
    """Get persistent memories with conversation metadata (always None for persistent)"""
    blocks = ChatflowMemoryService.get_persistent_memories(app, created_by, version)
    enriched: list[MemoryBlockWithConversation] = []
    for block in blocks:
        metadata = ChatflowHistoryService.get_conversation_metadata(
            app.tenant_id, app.id, conversation_id, block.node_id
        )
        enriched.append(MemoryBlockWithConversation.from_memory_block(block, metadata))
    return enriched
|
||||
|
||||
@staticmethod
def get_session_memories_with_conversation(
    app: App,
    created_by: MemoryCreatedBy,
    conversation_id: str,
    version: int | None = None
) -> Sequence[MemoryBlockWithConversation]:
    """Get session memories with conversation metadata"""
    blocks = ChatflowMemoryService.get_session_memories(app, created_by, conversation_id, version)
    enriched: list[MemoryBlockWithConversation] = []
    for block in blocks:
        metadata = ChatflowHistoryService.get_conversation_metadata(
            app.tenant_id, app.id, conversation_id, block.node_id
        )
        enriched.append(MemoryBlockWithConversation.from_memory_block(block, metadata))
    return enriched
|
||||
|
||||
@staticmethod
def _format_chat_history(messages: Sequence[PromptMessage]) -> Sequence[tuple[str, str]]:
    """Flatten prompt messages into (role, text) pairs for the update prompt."""
    return [(str(message.role.value), message.get_text_content()) for message in messages]
|
||||
|
||||
|
||||
def _get_memory_sync_lock_key(app_id: str, conversation_id: str) -> str:
|
||||
"""Generate Redis lock key for memory sync updates
|
||||
|
||||
Args:
|
||||
app_id: Application ID
|
||||
conversation_id: Conversation ID
|
||||
|
||||
Returns:
|
||||
Formatted lock key
|
||||
"""
|
||||
return f"memory_sync_update:{app_id}:{conversation_id}"
|
||||
@@ -12,6 +12,7 @@ from core.app.app_config.entities import VariableEntityType
|
||||
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
|
||||
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
|
||||
from core.file import File
|
||||
from core.memory.entities import MemoryBlockSpec, MemoryCreatedBy, MemoryScope
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from core.variables import Variable
|
||||
from core.variables.variables import VariableUnion
|
||||
@@ -199,6 +200,7 @@ class WorkflowService:
|
||||
account: Account,
|
||||
environment_variables: Sequence[Variable],
|
||||
conversation_variables: Sequence[Variable],
|
||||
memory_blocks: Sequence[MemoryBlockSpec] | None = None,
|
||||
) -> Workflow:
|
||||
"""
|
||||
Sync draft workflow
|
||||
@@ -229,6 +231,7 @@ class WorkflowService:
|
||||
environment_variables=environment_variables,
|
||||
conversation_variables=conversation_variables,
|
||||
)
|
||||
workflow.memory_blocks = memory_blocks or []
|
||||
db.session.add(workflow)
|
||||
# update draft workflow if found
|
||||
else:
|
||||
@@ -238,6 +241,7 @@ class WorkflowService:
|
||||
workflow.updated_at = naive_utc_now()
|
||||
workflow.environment_variables = environment_variables
|
||||
workflow.conversation_variables = conversation_variables
|
||||
workflow.memory_blocks = memory_blocks or []
|
||||
|
||||
# commit db session changes
|
||||
db.session.commit()
|
||||
@@ -303,6 +307,7 @@ class WorkflowService:
|
||||
marked_name=marked_name,
|
||||
marked_comment=marked_comment,
|
||||
rag_pipeline_variables=draft_workflow.rag_pipeline_variables,
|
||||
memory_blocks=draft_workflow.memory_blocks,
|
||||
features=draft_workflow.features,
|
||||
)
|
||||
|
||||
@@ -660,17 +665,10 @@ class WorkflowService:
|
||||
tenant_id=draft_workflow.tenant_id, start_node_data=start_data, user_inputs=user_inputs
|
||||
)
|
||||
# init variable pool
|
||||
variable_pool = _setup_variable_pool(
|
||||
query=query,
|
||||
files=files or [],
|
||||
user_id=account.id,
|
||||
user_inputs=user_inputs,
|
||||
workflow=draft_workflow,
|
||||
# NOTE(QuantumGhost): We rely on `DraftVarLoader` to load conversation variables.
|
||||
conversation_variables=[],
|
||||
node_type=node_type,
|
||||
conversation_id=conversation_id,
|
||||
)
|
||||
variable_pool = _setup_variable_pool(query=query, files=files or [], user_id=account.id,
|
||||
user_inputs=user_inputs, workflow=draft_workflow,
|
||||
node_type=node_type, conversation_id=conversation_id,
|
||||
conversation_variables=[], is_draft=True)
|
||||
|
||||
else:
|
||||
variable_pool = VariablePool(
|
||||
@@ -1044,6 +1042,7 @@ def _setup_variable_pool(
|
||||
node_type: NodeType,
|
||||
conversation_id: str,
|
||||
conversation_variables: list[Variable],
|
||||
is_draft: bool
|
||||
):
|
||||
# Only inject system variables for START node type.
|
||||
if node_type == NodeType.START or node_type.is_trigger_node:
|
||||
@@ -1063,7 +1062,6 @@ def _setup_variable_pool(
|
||||
system_variable.dialogue_count = 1
|
||||
else:
|
||||
system_variable = SystemVariable.empty()
|
||||
|
||||
# init variable pool
|
||||
variable_pool = VariablePool(
|
||||
system_variables=system_variable,
|
||||
@@ -1072,6 +1070,12 @@ def _setup_variable_pool(
|
||||
# Based on the definition of `VariableUnion`,
|
||||
# `list[Variable]` can be safely used as `list[VariableUnion]` since they are compatible.
|
||||
conversation_variables=cast(list[VariableUnion], conversation_variables), #
|
||||
memory_blocks=_fetch_memory_blocks(
|
||||
workflow,
|
||||
MemoryCreatedBy(account_id=user_id),
|
||||
conversation_id,
|
||||
is_draft=is_draft
|
||||
),
|
||||
)
|
||||
|
||||
return variable_pool
|
||||
@@ -1108,3 +1112,30 @@ def _rebuild_single_file(tenant_id: str, value: Any, variable_entity_type: Varia
|
||||
return build_from_mappings(mappings=value, tenant_id=tenant_id)
|
||||
else:
|
||||
raise Exception("unreachable")
|
||||
|
||||
|
||||
def _fetch_memory_blocks(
    workflow: Workflow,
    created_by: MemoryCreatedBy,
    conversation_id: str,
    is_draft: bool
) -> Mapping[str, str]:
    """Load all memory block values for the workflow, keyed for the variable pool.

    App-scoped blocks are keyed by memory id; node-scoped blocks by
    "<node_id>_<memory_id>".
    """
    # Imported lazily to avoid a circular import with the service layer.
    from services.chatflow_memory_service import ChatflowMemoryService

    memory_block_specs = workflow.memory_blocks
    memories = ChatflowMemoryService.get_memories_by_specs(
        memory_block_specs=memory_block_specs,
        tenant_id=workflow.tenant_id,
        app_id=workflow.app_id,
        node_id=None,
        conversation_id=conversation_id,
        is_draft=is_draft,
        created_by=created_by,
    )
    return {
        (memory.spec.id if memory.spec.scope == MemoryScope.APP
         else f"{memory.node_id}_{memory.spec.id}"): memory.value
        for memory in memories
    }
|
||||
|
||||
Reference in New Issue
Block a user