mirror of
https://github.com/langgenius/dify.git
synced 2026-01-07 23:04:12 +00:00
add AdvancedChatAppGenerateTaskPipeline
This commit is contained in:
@@ -8,19 +8,24 @@ from sqlalchemy.orm import DeclarativeMeta
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.app.entities.queue_entities import (
|
||||
AnnotationReplyEvent,
|
||||
AppQueueEvent,
|
||||
QueueAgentMessageEvent,
|
||||
QueueAgentThoughtEvent,
|
||||
QueueAnnotationReplyEvent,
|
||||
QueueErrorEvent,
|
||||
QueueLLMChunkEvent,
|
||||
QueueMessage,
|
||||
QueueMessageEndEvent,
|
||||
QueueMessageEvent,
|
||||
QueueMessageFileEvent,
|
||||
QueueMessageReplaceEvent,
|
||||
QueueNodeFinishedEvent,
|
||||
QueueNodeStartedEvent,
|
||||
QueuePingEvent,
|
||||
QueueRetrieverResourcesEvent,
|
||||
QueueStopEvent,
|
||||
QueueTextChunkEvent,
|
||||
QueueWorkflowFinishedEvent,
|
||||
QueueWorkflowStartedEvent,
|
||||
)
|
||||
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
|
||||
from extensions.ext_redis import redis_client
|
||||
@@ -97,18 +102,30 @@ class AppQueueManager:
|
||||
"""
|
||||
self._q.put(None)
|
||||
|
||||
def publish_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
|
||||
def publish_llm_chunk(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish chunk message to channel
|
||||
Publish llm chunk to channel
|
||||
|
||||
:param chunk: chunk
|
||||
:param chunk: llm chunk
|
||||
:param pub_from: publish from
|
||||
:return:
|
||||
"""
|
||||
self.publish(QueueMessageEvent(
|
||||
self.publish(QueueLLMChunkEvent(
|
||||
chunk=chunk
|
||||
), pub_from)
|
||||
|
||||
def publish_text_chunk(self, text: str, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish text chunk to channel
|
||||
|
||||
:param text: text
|
||||
:param pub_from: publish from
|
||||
:return:
|
||||
"""
|
||||
self.publish(QueueTextChunkEvent(
|
||||
text=text
|
||||
), pub_from)
|
||||
|
||||
def publish_agent_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish agent chunk message to channel
|
||||
@@ -146,7 +163,7 @@ class AppQueueManager:
|
||||
:param pub_from: publish from
|
||||
:return:
|
||||
"""
|
||||
self.publish(AnnotationReplyEvent(message_annotation_id=message_annotation_id), pub_from)
|
||||
self.publish(QueueAnnotationReplyEvent(message_annotation_id=message_annotation_id), pub_from)
|
||||
|
||||
def publish_message_end(self, llm_result: LLMResult, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
@@ -158,6 +175,42 @@ class AppQueueManager:
|
||||
self.publish(QueueMessageEndEvent(llm_result=llm_result), pub_from)
|
||||
self.stop_listen()
|
||||
|
||||
def publish_workflow_started(self, workflow_run_id: str, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish workflow started
|
||||
:param workflow_run_id: workflow run id
|
||||
:param pub_from: publish from
|
||||
:return:
|
||||
"""
|
||||
self.publish(QueueWorkflowStartedEvent(workflow_run_id=workflow_run_id), pub_from)
|
||||
|
||||
def publish_workflow_finished(self, workflow_run_id: str, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish workflow finished
|
||||
:param workflow_run_id: workflow run id
|
||||
:param pub_from: publish from
|
||||
:return:
|
||||
"""
|
||||
self.publish(QueueWorkflowFinishedEvent(workflow_run_id=workflow_run_id), pub_from)
|
||||
|
||||
def publish_node_started(self, workflow_node_execution_id: str, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish node started
|
||||
:param workflow_node_execution_id: workflow node execution id
|
||||
:param pub_from: publish from
|
||||
:return:
|
||||
"""
|
||||
self.publish(QueueNodeStartedEvent(workflow_node_execution_id=workflow_node_execution_id), pub_from)
|
||||
|
||||
def publish_node_finished(self, workflow_node_execution_id: str, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish node finished
|
||||
:param workflow_node_execution_id: workflow node execution id
|
||||
:param pub_from: publish from
|
||||
:return:
|
||||
"""
|
||||
self.publish(QueueNodeFinishedEvent(workflow_node_execution_id=workflow_node_execution_id), pub_from)
|
||||
|
||||
def publish_agent_thought(self, message_agent_thought: MessageAgentThought, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish agent thought
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from typing import Optional
|
||||
|
||||
from core.app.app_config.base_app_config_manager import BaseAppConfigManager
|
||||
from core.app.app_config.common.sensitive_word_avoidance.manager import SensitiveWordAvoidanceConfigManager
|
||||
@@ -12,7 +11,7 @@ from core.app.app_config.features.suggested_questions_after_answer.manager impor
|
||||
)
|
||||
from core.app.app_config.features.text_to_speech.manager import TextToSpeechConfigManager
|
||||
from core.app.app_config.workflow_ui_based_app.variables.manager import WorkflowVariablesConfigManager
|
||||
from models.model import App, AppMode, Conversation
|
||||
from models.model import App, AppMode
|
||||
from models.workflow import Workflow
|
||||
|
||||
|
||||
@@ -26,8 +25,7 @@ class AdvancedChatAppConfig(WorkflowUIBasedAppConfig):
|
||||
class AdvancedChatAppConfigManager(BaseAppConfigManager):
|
||||
@classmethod
|
||||
def get_app_config(cls, app_model: App,
|
||||
workflow: Workflow,
|
||||
conversation: Optional[Conversation] = None) -> AdvancedChatAppConfig:
|
||||
workflow: Workflow) -> AdvancedChatAppConfig:
|
||||
features_dict = workflow.features_dict
|
||||
|
||||
app_config = AdvancedChatAppConfig(
|
||||
|
||||
218
api/core/app/apps/advanced_chat/app_generator.py
Normal file
218
api/core/app/apps/advanced_chat/app_generator.py
Normal file
@@ -0,0 +1,218 @@
|
||||
import logging
|
||||
import threading
|
||||
import uuid
|
||||
from collections.abc import Generator
|
||||
from typing import Any, Union
|
||||
|
||||
from flask import Flask, current_app
|
||||
from pydantic import ValidationError
|
||||
|
||||
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
|
||||
from core.app.app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom
|
||||
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
|
||||
from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner
|
||||
from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline
|
||||
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
|
||||
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
|
||||
from core.file.message_file_parser import MessageFileParser
|
||||
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
|
||||
from core.workflow.workflow_engine_manager import WorkflowEngineManager
|
||||
from extensions.ext_database import db
|
||||
from models.account import Account
|
||||
from models.model import App, Conversation, EndUser, Message
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AdvancedChatAppGenerator(MessageBasedAppGenerator):
|
||||
def generate(self, app_model: App,
|
||||
user: Union[Account, EndUser],
|
||||
args: Any,
|
||||
invoke_from: InvokeFrom,
|
||||
stream: bool = True) \
|
||||
-> Union[dict, Generator]:
|
||||
"""
|
||||
Generate App response.
|
||||
|
||||
:param app_model: App
|
||||
:param user: account or end user
|
||||
:param args: request args
|
||||
:param invoke_from: invoke from source
|
||||
:param stream: is stream
|
||||
"""
|
||||
if not args.get('query'):
|
||||
raise ValueError('query is required')
|
||||
|
||||
query = args['query']
|
||||
if not isinstance(query, str):
|
||||
raise ValueError('query must be a string')
|
||||
|
||||
query = query.replace('\x00', '')
|
||||
inputs = args['inputs']
|
||||
|
||||
extras = {
|
||||
"auto_generate_conversation_name": args['auto_generate_name'] if 'auto_generate_name' in args else True
|
||||
}
|
||||
|
||||
# get conversation
|
||||
conversation = None
|
||||
if args.get('conversation_id'):
|
||||
conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user)
|
||||
|
||||
# get workflow
|
||||
workflow_engine_manager = WorkflowEngineManager()
|
||||
if invoke_from == InvokeFrom.DEBUGGER:
|
||||
workflow = workflow_engine_manager.get_draft_workflow(app_model=app_model)
|
||||
else:
|
||||
workflow = workflow_engine_manager.get_published_workflow(app_model=app_model)
|
||||
|
||||
if not workflow:
|
||||
raise ValueError('Workflow not initialized')
|
||||
|
||||
# parse files
|
||||
files = args['files'] if 'files' in args and args['files'] else []
|
||||
message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id)
|
||||
file_upload_entity = FileUploadConfigManager.convert(workflow.features_dict)
|
||||
if file_upload_entity:
|
||||
file_objs = message_file_parser.validate_and_transform_files_arg(
|
||||
files,
|
||||
file_upload_entity,
|
||||
user
|
||||
)
|
||||
else:
|
||||
file_objs = []
|
||||
|
||||
# convert to app config
|
||||
app_config = AdvancedChatAppConfigManager.get_app_config(
|
||||
app_model=app_model,
|
||||
workflow=workflow
|
||||
)
|
||||
|
||||
# init application generate entity
|
||||
application_generate_entity = AdvancedChatAppGenerateEntity(
|
||||
task_id=str(uuid.uuid4()),
|
||||
app_config=app_config,
|
||||
conversation_id=conversation.id if conversation else None,
|
||||
inputs=conversation.inputs if conversation else self._get_cleaned_inputs(inputs, app_config),
|
||||
query=query,
|
||||
files=file_objs,
|
||||
user_id=user.id,
|
||||
stream=stream,
|
||||
invoke_from=invoke_from,
|
||||
extras=extras
|
||||
)
|
||||
|
||||
# init generate records
|
||||
(
|
||||
conversation,
|
||||
message
|
||||
) = self._init_generate_records(application_generate_entity, conversation)
|
||||
|
||||
# init queue manager
|
||||
queue_manager = AppQueueManager(
|
||||
task_id=application_generate_entity.task_id,
|
||||
user_id=application_generate_entity.user_id,
|
||||
invoke_from=application_generate_entity.invoke_from,
|
||||
conversation_id=conversation.id,
|
||||
app_mode=conversation.mode,
|
||||
message_id=message.id
|
||||
)
|
||||
|
||||
# new thread
|
||||
worker_thread = threading.Thread(target=self._generate_worker, kwargs={
|
||||
'flask_app': current_app._get_current_object(),
|
||||
'application_generate_entity': application_generate_entity,
|
||||
'queue_manager': queue_manager,
|
||||
'conversation_id': conversation.id,
|
||||
'message_id': message.id,
|
||||
})
|
||||
|
||||
worker_thread.start()
|
||||
|
||||
# return response or stream generator
|
||||
return self._handle_response(
|
||||
application_generate_entity=application_generate_entity,
|
||||
queue_manager=queue_manager,
|
||||
conversation=conversation,
|
||||
message=message,
|
||||
stream=stream
|
||||
)
|
||||
|
||||
def _generate_worker(self, flask_app: Flask,
|
||||
application_generate_entity: AdvancedChatAppGenerateEntity,
|
||||
queue_manager: AppQueueManager,
|
||||
conversation_id: str,
|
||||
message_id: str) -> None:
|
||||
"""
|
||||
Generate worker in a new thread.
|
||||
:param flask_app: Flask app
|
||||
:param application_generate_entity: application generate entity
|
||||
:param queue_manager: queue manager
|
||||
:param conversation_id: conversation ID
|
||||
:param message_id: message ID
|
||||
:return:
|
||||
"""
|
||||
with flask_app.app_context():
|
||||
try:
|
||||
# get conversation and message
|
||||
conversation = self._get_conversation(conversation_id)
|
||||
message = self._get_message(message_id)
|
||||
|
||||
# chatbot app
|
||||
runner = AdvancedChatAppRunner()
|
||||
runner.run(
|
||||
application_generate_entity=application_generate_entity,
|
||||
queue_manager=queue_manager,
|
||||
conversation=conversation,
|
||||
message=message
|
||||
)
|
||||
except ConversationTaskStoppedException:
|
||||
pass
|
||||
except InvokeAuthorizationError:
|
||||
queue_manager.publish_error(
|
||||
InvokeAuthorizationError('Incorrect API key provided'),
|
||||
PublishFrom.APPLICATION_MANAGER
|
||||
)
|
||||
except ValidationError as e:
|
||||
logger.exception("Validation Error when generating")
|
||||
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
|
||||
except (ValueError, InvokeError) as e:
|
||||
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
|
||||
except Exception as e:
|
||||
logger.exception("Unknown Error when generating")
|
||||
queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER)
|
||||
finally:
|
||||
db.session.remove()
|
||||
|
||||
def _handle_response(self, application_generate_entity: AdvancedChatAppGenerateEntity,
|
||||
queue_manager: AppQueueManager,
|
||||
conversation: Conversation,
|
||||
message: Message,
|
||||
stream: bool = False) -> Union[dict, Generator]:
|
||||
"""
|
||||
Handle response.
|
||||
:param application_generate_entity: application generate entity
|
||||
:param queue_manager: queue manager
|
||||
:param conversation: conversation
|
||||
:param message: message
|
||||
:param stream: is stream
|
||||
:return:
|
||||
"""
|
||||
# init generate task pipeline
|
||||
generate_task_pipeline = AdvancedChatAppGenerateTaskPipeline(
|
||||
application_generate_entity=application_generate_entity,
|
||||
queue_manager=queue_manager,
|
||||
conversation=conversation,
|
||||
message=message
|
||||
)
|
||||
|
||||
try:
|
||||
return generate_task_pipeline.process(stream=stream)
|
||||
except ValueError as e:
|
||||
if e.args[0] == "I/O operation on closed file.": # ignore this error
|
||||
raise ConversationTaskStoppedException()
|
||||
else:
|
||||
logger.exception(e)
|
||||
raise e
|
||||
finally:
|
||||
db.session.remove()
|
||||
103
api/core/app/apps/advanced_chat/app_runner.py
Normal file
103
api/core/app/apps/advanced_chat/app_runner.py
Normal file
@@ -0,0 +1,103 @@
|
||||
import logging
|
||||
from typing import cast
|
||||
|
||||
from core.app.app_queue_manager import AppQueueManager, PublishFrom
|
||||
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig
|
||||
from core.app.apps.base_app_runner import AppRunner
|
||||
from core.app.entities.app_invoke_entities import (
|
||||
AdvancedChatAppGenerateEntity,
|
||||
)
|
||||
from core.moderation.base import ModerationException
|
||||
from extensions.ext_database import db
|
||||
from models.model import App, Conversation, Message
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AdvancedChatAppRunner(AppRunner):
|
||||
"""
|
||||
AdvancedChat Application Runner
|
||||
"""
|
||||
|
||||
def run(self, application_generate_entity: AdvancedChatAppGenerateEntity,
|
||||
queue_manager: AppQueueManager,
|
||||
conversation: Conversation,
|
||||
message: Message) -> None:
|
||||
"""
|
||||
Run application
|
||||
:param application_generate_entity: application generate entity
|
||||
:param queue_manager: application queue manager
|
||||
:param conversation: conversation
|
||||
:param message: message
|
||||
:return:
|
||||
"""
|
||||
app_config = application_generate_entity.app_config
|
||||
app_config = cast(AdvancedChatAppConfig, app_config)
|
||||
|
||||
app_record = db.session.query(App).filter(App.id == app_config.app_id).first()
|
||||
if not app_record:
|
||||
raise ValueError("App not found")
|
||||
|
||||
inputs = application_generate_entity.inputs
|
||||
query = application_generate_entity.query
|
||||
files = application_generate_entity.files
|
||||
|
||||
# moderation
|
||||
try:
|
||||
# process sensitive_word_avoidance
|
||||
_, inputs, query = self.moderation_for_inputs(
|
||||
app_id=app_record.id,
|
||||
tenant_id=app_config.tenant_id,
|
||||
app_generate_entity=application_generate_entity,
|
||||
inputs=inputs,
|
||||
query=query,
|
||||
)
|
||||
except ModerationException as e:
|
||||
# TODO
|
||||
self.direct_output(
|
||||
queue_manager=queue_manager,
|
||||
app_generate_entity=application_generate_entity,
|
||||
prompt_messages=prompt_messages,
|
||||
text=str(e),
|
||||
stream=application_generate_entity.stream
|
||||
)
|
||||
return
|
||||
|
||||
if query:
|
||||
# annotation reply
|
||||
annotation_reply = self.query_app_annotations_to_reply(
|
||||
app_record=app_record,
|
||||
message=message,
|
||||
query=query,
|
||||
user_id=application_generate_entity.user_id,
|
||||
invoke_from=application_generate_entity.invoke_from
|
||||
)
|
||||
|
||||
if annotation_reply:
|
||||
queue_manager.publish_annotation_reply(
|
||||
message_annotation_id=annotation_reply.id,
|
||||
pub_from=PublishFrom.APPLICATION_MANAGER
|
||||
)
|
||||
|
||||
# TODO
|
||||
self.direct_output(
|
||||
queue_manager=queue_manager,
|
||||
app_generate_entity=application_generate_entity,
|
||||
prompt_messages=prompt_messages,
|
||||
text=annotation_reply.content,
|
||||
stream=application_generate_entity.stream
|
||||
)
|
||||
return
|
||||
|
||||
# check hosting moderation
|
||||
# TODO
|
||||
hosting_moderation_result = self.check_hosting_moderation(
|
||||
application_generate_entity=application_generate_entity,
|
||||
queue_manager=queue_manager,
|
||||
prompt_messages=prompt_messages
|
||||
)
|
||||
|
||||
if hosting_moderation_result:
|
||||
return
|
||||
|
||||
# todo RUN WORKFLOW
|
||||
@@ -187,7 +187,7 @@ class AppRunner:
|
||||
if stream:
|
||||
index = 0
|
||||
for token in text:
|
||||
queue_manager.publish_chunk_message(LLMResultChunk(
|
||||
queue_manager.publish_llm_chunk(LLMResultChunk(
|
||||
model=app_generate_entity.model_config.model,
|
||||
prompt_messages=prompt_messages,
|
||||
delta=LLMResultChunkDelta(
|
||||
@@ -261,7 +261,7 @@ class AppRunner:
|
||||
usage = None
|
||||
for result in invoke_result:
|
||||
if not agent:
|
||||
queue_manager.publish_chunk_message(result, PublishFrom.APPLICATION_MANAGER)
|
||||
queue_manager.publish_llm_chunk(result, PublishFrom.APPLICATION_MANAGER)
|
||||
else:
|
||||
queue_manager.publish_agent_chunk_message(result, PublishFrom.APPLICATION_MANAGER)
|
||||
|
||||
|
||||
@@ -8,14 +8,15 @@ from sqlalchemy import and_
|
||||
from core.app.app_config.entities import EasyUIBasedAppModelConfigFrom
|
||||
from core.app.app_queue_manager import AppQueueManager, ConversationTaskStoppedException
|
||||
from core.app.apps.base_app_generator import BaseAppGenerator
|
||||
from core.app.apps.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline
|
||||
from core.app.entities.app_invoke_entities import (
|
||||
AdvancedChatAppGenerateEntity,
|
||||
AgentChatAppGenerateEntity,
|
||||
AppGenerateEntity,
|
||||
ChatAppGenerateEntity,
|
||||
CompletionAppGenerateEntity,
|
||||
InvokeFrom,
|
||||
)
|
||||
from core.app.generate_task_pipeline import GenerateTaskPipeline
|
||||
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
|
||||
from extensions.ext_database import db
|
||||
from models.account import Account
|
||||
@@ -31,7 +32,8 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
def _handle_response(self, application_generate_entity: Union[
|
||||
ChatAppGenerateEntity,
|
||||
CompletionAppGenerateEntity,
|
||||
AgentChatAppGenerateEntity
|
||||
AgentChatAppGenerateEntity,
|
||||
AdvancedChatAppGenerateEntity
|
||||
],
|
||||
queue_manager: AppQueueManager,
|
||||
conversation: Conversation,
|
||||
@@ -47,7 +49,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
:return:
|
||||
"""
|
||||
# init generate task pipeline
|
||||
generate_task_pipeline = GenerateTaskPipeline(
|
||||
generate_task_pipeline = EasyUIBasedGenerateTaskPipeline(
|
||||
application_generate_entity=application_generate_entity,
|
||||
queue_manager=queue_manager,
|
||||
conversation=conversation,
|
||||
@@ -114,7 +116,8 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
application_generate_entity: Union[
|
||||
ChatAppGenerateEntity,
|
||||
CompletionAppGenerateEntity,
|
||||
AgentChatAppGenerateEntity
|
||||
AgentChatAppGenerateEntity,
|
||||
AdvancedChatAppGenerateEntity
|
||||
],
|
||||
conversation: Optional[Conversation] = None) \
|
||||
-> tuple[Conversation, Message]:
|
||||
@@ -135,10 +138,19 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
from_source = 'console'
|
||||
account_id = application_generate_entity.user_id
|
||||
|
||||
override_model_configs = None
|
||||
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS \
|
||||
and app_config.app_mode in [AppMode.AGENT_CHAT, AppMode.CHAT, AppMode.COMPLETION]:
|
||||
override_model_configs = app_config.app_model_config_dict
|
||||
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
|
||||
app_model_config_id = None
|
||||
override_model_configs = None
|
||||
model_provider = None
|
||||
model_id = None
|
||||
else:
|
||||
app_model_config_id = app_config.app_model_config_id
|
||||
model_provider = application_generate_entity.model_config.provider
|
||||
model_id = application_generate_entity.model_config.model
|
||||
override_model_configs = None
|
||||
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS \
|
||||
and app_config.app_mode in [AppMode.AGENT_CHAT, AppMode.CHAT, AppMode.COMPLETION]:
|
||||
override_model_configs = app_config.app_model_config_dict
|
||||
|
||||
# get conversation introduction
|
||||
introduction = self._get_conversation_introduction(application_generate_entity)
|
||||
@@ -146,9 +158,9 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
if not conversation:
|
||||
conversation = Conversation(
|
||||
app_id=app_config.app_id,
|
||||
app_model_config_id=app_config.app_model_config_id,
|
||||
model_provider=application_generate_entity.model_config.provider,
|
||||
model_id=application_generate_entity.model_config.model,
|
||||
app_model_config_id=app_model_config_id,
|
||||
model_provider=model_provider,
|
||||
model_id=model_id,
|
||||
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
|
||||
mode=app_config.app_mode.value,
|
||||
name='New conversation',
|
||||
@@ -167,8 +179,8 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
|
||||
message = Message(
|
||||
app_id=app_config.app_id,
|
||||
model_provider=application_generate_entity.model_config.provider,
|
||||
model_id=application_generate_entity.model_config.model,
|
||||
model_provider=model_provider,
|
||||
model_id=model_id,
|
||||
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
|
||||
conversation_id=conversation.id,
|
||||
inputs=application_generate_entity.inputs,
|
||||
|
||||
@@ -10,14 +10,19 @@ class QueueEvent(Enum):
|
||||
"""
|
||||
QueueEvent enum
|
||||
"""
|
||||
MESSAGE = "message"
|
||||
LLM_CHUNK = "llm_chunk"
|
||||
TEXT_CHUNK = "text_chunk"
|
||||
AGENT_MESSAGE = "agent_message"
|
||||
MESSAGE_REPLACE = "message-replace"
|
||||
MESSAGE_END = "message-end"
|
||||
RETRIEVER_RESOURCES = "retriever-resources"
|
||||
ANNOTATION_REPLY = "annotation-reply"
|
||||
AGENT_THOUGHT = "agent-thought"
|
||||
MESSAGE_FILE = "message-file"
|
||||
MESSAGE_REPLACE = "message_replace"
|
||||
MESSAGE_END = "message_end"
|
||||
WORKFLOW_STARTED = "workflow_started"
|
||||
WORKFLOW_FINISHED = "workflow_finished"
|
||||
NODE_STARTED = "node_started"
|
||||
NODE_FINISHED = "node_finished"
|
||||
RETRIEVER_RESOURCES = "retriever_resources"
|
||||
ANNOTATION_REPLY = "annotation_reply"
|
||||
AGENT_THOUGHT = "agent_thought"
|
||||
MESSAGE_FILE = "message_file"
|
||||
ERROR = "error"
|
||||
PING = "ping"
|
||||
STOP = "stop"
|
||||
@@ -30,13 +35,22 @@ class AppQueueEvent(BaseModel):
|
||||
event: QueueEvent
|
||||
|
||||
|
||||
class QueueMessageEvent(AppQueueEvent):
|
||||
class QueueLLMChunkEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueMessageEvent entity
|
||||
QueueLLMChunkEvent entity
|
||||
"""
|
||||
event = QueueEvent.MESSAGE
|
||||
event = QueueEvent.LLM_CHUNK
|
||||
chunk: LLMResultChunk
|
||||
|
||||
|
||||
class QueueTextChunkEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueTextChunkEvent entity
|
||||
"""
|
||||
event = QueueEvent.TEXT_CHUNK
|
||||
chunk_text: str
|
||||
|
||||
|
||||
class QueueAgentMessageEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueMessageEvent entity
|
||||
@@ -61,9 +75,9 @@ class QueueRetrieverResourcesEvent(AppQueueEvent):
|
||||
retriever_resources: list[dict]
|
||||
|
||||
|
||||
class AnnotationReplyEvent(AppQueueEvent):
|
||||
class QueueAnnotationReplyEvent(AppQueueEvent):
|
||||
"""
|
||||
AnnotationReplyEvent entity
|
||||
QueueAnnotationReplyEvent entity
|
||||
"""
|
||||
event = QueueEvent.ANNOTATION_REPLY
|
||||
message_annotation_id: str
|
||||
@@ -76,6 +90,38 @@ class QueueMessageEndEvent(AppQueueEvent):
|
||||
event = QueueEvent.MESSAGE_END
|
||||
llm_result: LLMResult
|
||||
|
||||
|
||||
class QueueWorkflowStartedEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueWorkflowStartedEvent entity
|
||||
"""
|
||||
event = QueueEvent.WORKFLOW_STARTED
|
||||
workflow_run_id: str
|
||||
|
||||
|
||||
class QueueWorkflowFinishedEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueWorkflowFinishedEvent entity
|
||||
"""
|
||||
event = QueueEvent.WORKFLOW_FINISHED
|
||||
workflow_run_id: str
|
||||
|
||||
|
||||
class QueueNodeStartedEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueNodeStartedEvent entity
|
||||
"""
|
||||
event = QueueEvent.NODE_STARTED
|
||||
workflow_node_execution_id: str
|
||||
|
||||
|
||||
class QueueNodeFinishedEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueNodeFinishedEvent entity
|
||||
"""
|
||||
event = QueueEvent.NODE_FINISHED
|
||||
workflow_node_execution_id: str
|
||||
|
||||
|
||||
class QueueAgentThoughtEvent(AppQueueEvent):
|
||||
"""
|
||||
@@ -84,13 +130,15 @@ class QueueAgentThoughtEvent(AppQueueEvent):
|
||||
event = QueueEvent.AGENT_THOUGHT
|
||||
agent_thought_id: str
|
||||
|
||||
|
||||
class QueueMessageFileEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueAgentThoughtEvent entity
|
||||
"""
|
||||
event = QueueEvent.MESSAGE_FILE
|
||||
message_file_id: str
|
||||
|
||||
|
||||
|
||||
class QueueErrorEvent(AppQueueEvent):
|
||||
"""
|
||||
QueueErrorEvent entity
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
from typing import Optional
|
||||
|
||||
from extensions.ext_database import db
|
||||
from models.model import App
|
||||
from models.workflow import Workflow
|
||||
|
||||
|
||||
class WorkflowEngineManager:
|
||||
def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
|
||||
"""
|
||||
Get draft workflow
|
||||
"""
|
||||
# fetch draft workflow by app_model
|
||||
workflow = db.session.query(Workflow).filter(
|
||||
Workflow.tenant_id == app_model.tenant_id,
|
||||
Workflow.app_id == app_model.id,
|
||||
Workflow.version == 'draft'
|
||||
).first()
|
||||
|
||||
# return draft workflow
|
||||
return workflow
|
||||
|
||||
def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
|
||||
"""
|
||||
Get published workflow
|
||||
"""
|
||||
if not app_model.workflow_id:
|
||||
return None
|
||||
|
||||
# fetch published workflow by workflow_id
|
||||
workflow = db.session.query(Workflow).filter(
|
||||
Workflow.tenant_id == app_model.tenant_id,
|
||||
Workflow.app_id == app_model.id,
|
||||
Workflow.id == app_model.workflow_id
|
||||
).first()
|
||||
|
||||
# return published workflow
|
||||
return workflow
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from core.app.entities.app_invoke_entities import ChatAppGenerateEntity
|
||||
from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, ChatAppGenerateEntity
|
||||
from core.entities.provider_entities import QuotaUnit
|
||||
from events.message_event import message_was_created
|
||||
from extensions.ext_database import db
|
||||
@@ -8,7 +8,10 @@ from models.provider import Provider, ProviderType
|
||||
@message_was_created.connect
|
||||
def handle(sender, **kwargs):
|
||||
message = sender
|
||||
application_generate_entity: ChatAppGenerateEntity = kwargs.get('application_generate_entity')
|
||||
application_generate_entity = kwargs.get('application_generate_entity')
|
||||
|
||||
if not isinstance(application_generate_entity, ChatAppGenerateEntity | AgentChatAppGenerateEntity):
|
||||
return
|
||||
|
||||
model_config = application_generate_entity.model_config
|
||||
provider_model_bundle = model_config.provider_model_bundle
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from core.llm_generator.llm_generator import LLMGenerator
|
||||
from events.message_event import message_was_created
|
||||
from extensions.ext_database import db
|
||||
from models.model import AppMode
|
||||
|
||||
|
||||
@message_was_created.connect
|
||||
@@ -15,7 +16,7 @@ def handle(sender, **kwargs):
|
||||
auto_generate_conversation_name = extras.get('auto_generate_conversation_name', True)
|
||||
|
||||
if auto_generate_conversation_name and is_first_message:
|
||||
if conversation.mode == 'chat':
|
||||
if conversation.mode != AppMode.COMPLETION.value:
|
||||
app_model = conversation.app
|
||||
if not app_model:
|
||||
return
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from datetime import datetime
|
||||
|
||||
from core.app.entities.app_invoke_entities import ChatAppGenerateEntity
|
||||
from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, ChatAppGenerateEntity
|
||||
from events.message_event import message_was_created
|
||||
from extensions.ext_database import db
|
||||
from models.provider import Provider
|
||||
@@ -9,7 +9,10 @@ from models.provider import Provider
|
||||
@message_was_created.connect
|
||||
def handle(sender, **kwargs):
|
||||
message = sender
|
||||
application_generate_entity: ChatAppGenerateEntity = kwargs.get('application_generate_entity')
|
||||
application_generate_entity = kwargs.get('application_generate_entity')
|
||||
|
||||
if not isinstance(application_generate_entity, ChatAppGenerateEntity | AgentChatAppGenerateEntity):
|
||||
return
|
||||
|
||||
db.session.query(Provider).filter(
|
||||
Provider.tenant_id == application_generate_entity.app_config.tenant_id,
|
||||
|
||||
@@ -451,10 +451,10 @@ class Conversation(db.Model):
|
||||
|
||||
id = db.Column(UUID, server_default=db.text('uuid_generate_v4()'))
|
||||
app_id = db.Column(UUID, nullable=False)
|
||||
app_model_config_id = db.Column(UUID, nullable=False)
|
||||
model_provider = db.Column(db.String(255), nullable=False)
|
||||
app_model_config_id = db.Column(UUID, nullable=True)
|
||||
model_provider = db.Column(db.String(255), nullable=True)
|
||||
override_model_configs = db.Column(db.Text)
|
||||
model_id = db.Column(db.String(255), nullable=False)
|
||||
model_id = db.Column(db.String(255), nullable=True)
|
||||
mode = db.Column(db.String(255), nullable=False)
|
||||
name = db.Column(db.String(255), nullable=False)
|
||||
summary = db.Column(db.Text)
|
||||
|
||||
@@ -272,6 +272,10 @@ class WorkflowRun(db.Model):
|
||||
return EndUser.query.get(self.created_by) \
|
||||
if created_by_role == CreatedByRole.END_USER else None
|
||||
|
||||
@property
|
||||
def outputs_dict(self):
|
||||
return self.outputs if not self.outputs else json.loads(self.outputs)
|
||||
|
||||
|
||||
class WorkflowNodeExecutionTriggeredFrom(Enum):
|
||||
"""
|
||||
@@ -294,6 +298,28 @@ class WorkflowNodeExecutionTriggeredFrom(Enum):
|
||||
raise ValueError(f'invalid workflow node execution triggered from value {value}')
|
||||
|
||||
|
||||
class WorkflowNodeExecutionStatus(Enum):
|
||||
"""
|
||||
Workflow Node Execution Status Enum
|
||||
"""
|
||||
RUNNING = 'running'
|
||||
SUCCEEDED = 'succeeded'
|
||||
FAILED = 'failed'
|
||||
|
||||
@classmethod
|
||||
def value_of(cls, value: str) -> 'WorkflowNodeExecutionStatus':
|
||||
"""
|
||||
Get value of given mode.
|
||||
|
||||
:param value: mode value
|
||||
:return: mode
|
||||
"""
|
||||
for mode in cls:
|
||||
if mode.value == value:
|
||||
return mode
|
||||
raise ValueError(f'invalid workflow node execution status value {value}')
|
||||
|
||||
|
||||
class WorkflowNodeExecution(db.Model):
|
||||
"""
|
||||
Workflow Node Execution
|
||||
@@ -387,6 +413,21 @@ class WorkflowNodeExecution(db.Model):
|
||||
return EndUser.query.get(self.created_by) \
|
||||
if created_by_role == CreatedByRole.END_USER else None
|
||||
|
||||
@property
|
||||
def inputs_dict(self):
|
||||
return self.inputs if not self.inputs else json.loads(self.inputs)
|
||||
|
||||
@property
|
||||
def outputs_dict(self):
|
||||
return self.outputs if not self.outputs else json.loads(self.outputs)
|
||||
|
||||
@property
|
||||
def process_data_dict(self):
|
||||
return self.process_data if not self.process_data else json.loads(self.process_data)
|
||||
|
||||
@property
|
||||
def execution_metadata_dict(self):
|
||||
return self.execution_metadata if not self.execution_metadata else json.loads(self.execution_metadata)
|
||||
|
||||
class WorkflowAppLog(db.Model):
|
||||
"""
|
||||
|
||||
@@ -4,6 +4,7 @@ from typing import Optional
|
||||
|
||||
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
|
||||
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
|
||||
from core.workflow.workflow_engine_manager import WorkflowEngineManager
|
||||
from extensions.ext_database import db
|
||||
from models.account import Account
|
||||
from models.model import App, AppMode
|
||||
@@ -21,15 +22,10 @@ class WorkflowService:
|
||||
"""
|
||||
Get draft workflow
|
||||
"""
|
||||
# fetch draft workflow by app_model
|
||||
workflow = db.session.query(Workflow).filter(
|
||||
Workflow.tenant_id == app_model.tenant_id,
|
||||
Workflow.app_id == app_model.id,
|
||||
Workflow.version == 'draft'
|
||||
).first()
|
||||
workflow_engine_manager = WorkflowEngineManager()
|
||||
|
||||
# return draft workflow
|
||||
return workflow
|
||||
return workflow_engine_manager.get_draft_workflow(app_model=app_model)
|
||||
|
||||
def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
|
||||
"""
|
||||
@@ -38,15 +34,10 @@ class WorkflowService:
|
||||
if not app_model.workflow_id:
|
||||
return None
|
||||
|
||||
# fetch published workflow by workflow_id
|
||||
workflow = db.session.query(Workflow).filter(
|
||||
Workflow.tenant_id == app_model.tenant_id,
|
||||
Workflow.app_id == app_model.id,
|
||||
Workflow.id == app_model.workflow_id
|
||||
).first()
|
||||
workflow_engine_manager = WorkflowEngineManager()
|
||||
|
||||
# return published workflow
|
||||
return workflow
|
||||
return workflow_engine_manager.get_published_workflow(app_model=app_model)
|
||||
|
||||
def sync_draft_workflow(self, app_model: App,
|
||||
graph: dict,
|
||||
|
||||
Reference in New Issue
Block a user