diff --git a/api/core/app/app_queue_manager.py b/api/core/app/app_queue_manager.py index 4bd491269c..5655c8d979 100644 --- a/api/core/app/app_queue_manager.py +++ b/api/core/app/app_queue_manager.py @@ -8,19 +8,24 @@ from sqlalchemy.orm import DeclarativeMeta from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.queue_entities import ( - AnnotationReplyEvent, AppQueueEvent, QueueAgentMessageEvent, QueueAgentThoughtEvent, + QueueAnnotationReplyEvent, QueueErrorEvent, + QueueLLMChunkEvent, QueueMessage, QueueMessageEndEvent, - QueueMessageEvent, QueueMessageFileEvent, QueueMessageReplaceEvent, + QueueNodeFinishedEvent, + QueueNodeStartedEvent, QueuePingEvent, QueueRetrieverResourcesEvent, QueueStopEvent, + QueueTextChunkEvent, + QueueWorkflowFinishedEvent, + QueueWorkflowStartedEvent, ) from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk from extensions.ext_redis import redis_client @@ -97,18 +102,30 @@ class AppQueueManager: """ self._q.put(None) - def publish_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None: + def publish_llm_chunk(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None: """ - Publish chunk message to channel + Publish llm chunk to channel - :param chunk: chunk + :param chunk: llm chunk :param pub_from: publish from :return: """ - self.publish(QueueMessageEvent( + self.publish(QueueLLMChunkEvent( chunk=chunk ), pub_from) + def publish_text_chunk(self, text: str, pub_from: PublishFrom) -> None: + """ + Publish text chunk to channel + + :param text: text + :param pub_from: publish from + :return: + """ + self.publish(QueueTextChunkEvent( + text=text + ), pub_from) + def publish_agent_chunk_message(self, chunk: LLMResultChunk, pub_from: PublishFrom) -> None: """ Publish agent chunk message to channel @@ -146,7 +163,7 @@ class AppQueueManager: :param pub_from: publish from :return: """ - self.publish(AnnotationReplyEvent(message_annotation_id=message_annotation_id), pub_from) + self.publish(QueueAnnotationReplyEvent(message_annotation_id=message_annotation_id), pub_from) def publish_message_end(self, llm_result: LLMResult, pub_from: PublishFrom) -> None: """ @@ -158,6 +175,42 @@ class AppQueueManager: self.publish(QueueMessageEndEvent(llm_result=llm_result), pub_from) self.stop_listen() + def publish_workflow_started(self, workflow_run_id: str, pub_from: PublishFrom) -> None: + """ + Publish workflow started + :param workflow_run_id: workflow run id + :param pub_from: publish from + :return: + """ + self.publish(QueueWorkflowStartedEvent(workflow_run_id=workflow_run_id), pub_from) + + def publish_workflow_finished(self, workflow_run_id: str, pub_from: PublishFrom) -> None: + """ + Publish workflow finished + :param workflow_run_id: workflow run id + :param pub_from: publish from + :return: + """ + self.publish(QueueWorkflowFinishedEvent(workflow_run_id=workflow_run_id), pub_from) + + def publish_node_started(self, workflow_node_execution_id: str, pub_from: PublishFrom) -> None: + """ + Publish node started + :param workflow_node_execution_id: workflow node execution id + :param pub_from: publish from + :return: + """ + self.publish(QueueNodeStartedEvent(workflow_node_execution_id=workflow_node_execution_id), pub_from) + + def publish_node_finished(self, workflow_node_execution_id: str, pub_from: PublishFrom) -> None: + """ + Publish node finished + :param workflow_node_execution_id: workflow node execution id + :param pub_from: publish from + :return: + """ + self.publish(QueueNodeFinishedEvent(workflow_node_execution_id=workflow_node_execution_id), pub_from) + def publish_agent_thought(self, message_agent_thought: MessageAgentThought, pub_from: PublishFrom) -> None: """ Publish agent thought diff --git a/api/core/app/apps/advanced_chat/app_config_manager.py b/api/core/app/apps/advanced_chat/app_config_manager.py index 72ba4c33d4..3ac26ebe80 100644 --- a/api/core/app/apps/advanced_chat/app_config_manager.py +++ b/api/core/app/apps/advanced_chat/app_config_manager.py @@ -1,4 +1,3 @@ -from typing import Optional from core.app.app_config.base_app_config_manager import BaseAppConfigManager from core.app.app_config.common.sensitive_word_avoidance.manager import SensitiveWordAvoidanceConfigManager @@ -12,7 +11,7 @@ from core.app.app_config.features.suggested_questions_after_answer.manager impor ) from core.app.app_config.features.text_to_speech.manager import TextToSpeechConfigManager from core.app.app_config.workflow_ui_based_app.variables.manager import WorkflowVariablesConfigManager -from models.model import App, AppMode, Conversation +from models.model import App, AppMode from models.workflow import Workflow @@ -26,8 +25,7 @@ class AdvancedChatAppConfig(WorkflowUIBasedAppConfig): class AdvancedChatAppConfigManager(BaseAppConfigManager): @classmethod def get_app_config(cls, app_model: App, - workflow: Workflow, - conversation: Optional[Conversation] = None) -> AdvancedChatAppConfig: + workflow: Workflow) -> AdvancedChatAppConfig: features_dict = workflow.features_dict app_config = AdvancedChatAppConfig( diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py new file mode 100644 index 0000000000..ca2f400547 --- /dev/null +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -0,0 +1,218 @@ +import logging +import threading +import uuid +from collections.abc import Generator +from typing import Any, Union + +from flask import Flask, current_app +from pydantic import ValidationError + +from core.app.app_config.features.file_upload.manager import FileUploadConfigManager +from core.app.app_queue_manager import AppQueueManager, ConversationTaskStoppedException, PublishFrom +from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager +from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner +from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline +from core.app.apps.message_based_app_generator import MessageBasedAppGenerator +from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom +from core.file.message_file_parser import MessageFileParser +from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError +from core.workflow.workflow_engine_manager import WorkflowEngineManager +from extensions.ext_database import db +from models.account import Account +from models.model import App, Conversation, EndUser, Message + +logger = logging.getLogger(__name__) + + +class AdvancedChatAppGenerator(MessageBasedAppGenerator): + def generate(self, app_model: App, + user: Union[Account, EndUser], + args: Any, + invoke_from: InvokeFrom, + stream: bool = True) \ + -> Union[dict, Generator]: + """ + Generate App response. + + :param app_model: App + :param user: account or end user + :param args: request args + :param invoke_from: invoke from source + :param stream: is stream + """ + if not args.get('query'): + raise ValueError('query is required') + + query = args['query'] + if not isinstance(query, str): + raise ValueError('query must be a string') + + query = query.replace('\x00', '') + inputs = args['inputs'] + + extras = { + "auto_generate_conversation_name": args['auto_generate_name'] if 'auto_generate_name' in args else True + } + + # get conversation + conversation = None + if args.get('conversation_id'): + conversation = self._get_conversation_by_user(app_model, args.get('conversation_id'), user) + + # get workflow + workflow_engine_manager = WorkflowEngineManager() + if invoke_from == InvokeFrom.DEBUGGER: + workflow = workflow_engine_manager.get_draft_workflow(app_model=app_model) + else: + workflow = workflow_engine_manager.get_published_workflow(app_model=app_model) + + if not workflow: + raise ValueError('Workflow not initialized') + + # parse files + files = args['files'] if 'files' in args and args['files'] else [] + message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id) + file_upload_entity = FileUploadConfigManager.convert(workflow.features_dict) + if file_upload_entity: + file_objs = message_file_parser.validate_and_transform_files_arg( + files, + file_upload_entity, + user + ) + else: + file_objs = [] + + # convert to app config + app_config = AdvancedChatAppConfigManager.get_app_config( + app_model=app_model, + workflow=workflow + ) + + # init application generate entity + application_generate_entity = AdvancedChatAppGenerateEntity( + task_id=str(uuid.uuid4()), + app_config=app_config, + conversation_id=conversation.id if conversation else None, + inputs=conversation.inputs if conversation else self._get_cleaned_inputs(inputs, app_config), + query=query, + files=file_objs, + user_id=user.id, + stream=stream, + invoke_from=invoke_from, + extras=extras + ) + + # init generate records + ( + conversation, + message + ) = self._init_generate_records(application_generate_entity, conversation) + + # init queue manager + queue_manager = AppQueueManager( + task_id=application_generate_entity.task_id, + user_id=application_generate_entity.user_id, + invoke_from=application_generate_entity.invoke_from, + conversation_id=conversation.id, + app_mode=conversation.mode, + message_id=message.id + ) + + # new thread + worker_thread = threading.Thread(target=self._generate_worker, kwargs={ + 'flask_app': current_app._get_current_object(), + 'application_generate_entity': application_generate_entity, + 'queue_manager': queue_manager, + 'conversation_id': conversation.id, + 'message_id': message.id, + }) + + worker_thread.start() + + # return response or stream generator + return self._handle_response( + application_generate_entity=application_generate_entity, + queue_manager=queue_manager, + conversation=conversation, + message=message, + stream=stream + ) + + def _generate_worker(self, flask_app: Flask, + application_generate_entity: AdvancedChatAppGenerateEntity, + queue_manager: AppQueueManager, + conversation_id: str, + message_id: str) -> None: + """ + Generate worker in a new thread. + :param flask_app: Flask app + :param application_generate_entity: application generate entity + :param queue_manager: queue manager + :param conversation_id: conversation ID + :param message_id: message ID + :return: + """ + with flask_app.app_context(): + try: + # get conversation and message + conversation = self._get_conversation(conversation_id) + message = self._get_message(message_id) + + # chatbot app + runner = AdvancedChatAppRunner() + runner.run( + application_generate_entity=application_generate_entity, + queue_manager=queue_manager, + conversation=conversation, + message=message + ) + except ConversationTaskStoppedException: + pass + except InvokeAuthorizationError: + queue_manager.publish_error( + InvokeAuthorizationError('Incorrect API key provided'), + PublishFrom.APPLICATION_MANAGER + ) + except ValidationError as e: + logger.exception("Validation Error when generating") + queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER) + except (ValueError, InvokeError) as e: + queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER) + except Exception as e: + logger.exception("Unknown Error when generating") + queue_manager.publish_error(e, PublishFrom.APPLICATION_MANAGER) + finally: + db.session.remove() + + def _handle_response(self, application_generate_entity: AdvancedChatAppGenerateEntity, + queue_manager: AppQueueManager, + conversation: Conversation, + message: Message, + stream: bool = False) -> Union[dict, Generator]: + """ + Handle response. + :param application_generate_entity: application generate entity + :param queue_manager: queue manager + :param conversation: conversation + :param message: message + :param stream: is stream + :return: + """ + # init generate task pipeline + generate_task_pipeline = AdvancedChatAppGenerateTaskPipeline( + application_generate_entity=application_generate_entity, + queue_manager=queue_manager, + conversation=conversation, + message=message + ) + + try: + return generate_task_pipeline.process(stream=stream) + except ValueError as e: + if e.args[0] == "I/O operation on closed file.": # ignore this error + raise ConversationTaskStoppedException() + else: + logger.exception(e) + raise e + finally: + db.session.remove() diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py new file mode 100644 index 0000000000..0d701ae224 --- /dev/null +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -0,0 +1,103 @@ +import logging +from typing import cast + +from core.app.app_queue_manager import AppQueueManager, PublishFrom +from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig +from core.app.apps.base_app_runner import AppRunner +from core.app.entities.app_invoke_entities import ( + AdvancedChatAppGenerateEntity, +) +from core.moderation.base import ModerationException +from extensions.ext_database import db +from models.model import App, Conversation, Message + +logger = logging.getLogger(__name__) + + +class AdvancedChatAppRunner(AppRunner): + """ + AdvancedChat Application Runner + """ + + def run(self, application_generate_entity: AdvancedChatAppGenerateEntity, + queue_manager: AppQueueManager, + conversation: Conversation, + message: Message) -> None: + """ + Run application + :param application_generate_entity: application generate entity + :param queue_manager: application queue manager + :param conversation: conversation + :param message: message + :return: + """ + app_config = application_generate_entity.app_config + app_config = cast(AdvancedChatAppConfig, app_config) + + app_record = db.session.query(App).filter(App.id == app_config.app_id).first() + if not app_record: + raise ValueError("App not found") + + inputs = application_generate_entity.inputs + query = application_generate_entity.query + files = application_generate_entity.files + + # moderation + try: + # process sensitive_word_avoidance + _, inputs, query = self.moderation_for_inputs( + app_id=app_record.id, + tenant_id=app_config.tenant_id, + app_generate_entity=application_generate_entity, + inputs=inputs, + query=query, + ) + except ModerationException as e: + # TODO + self.direct_output( + queue_manager=queue_manager, + app_generate_entity=application_generate_entity, + prompt_messages=prompt_messages, + text=str(e), + stream=application_generate_entity.stream + ) + return + + if query: + # annotation reply + annotation_reply = self.query_app_annotations_to_reply( + app_record=app_record, + message=message, + query=query, + user_id=application_generate_entity.user_id, + invoke_from=application_generate_entity.invoke_from + ) + + if annotation_reply: + queue_manager.publish_annotation_reply( + message_annotation_id=annotation_reply.id, + pub_from=PublishFrom.APPLICATION_MANAGER + ) + + # TODO + self.direct_output( + queue_manager=queue_manager, + app_generate_entity=application_generate_entity, + prompt_messages=prompt_messages, + text=annotation_reply.content, + stream=application_generate_entity.stream + ) + return + + # check hosting moderation + # TODO + hosting_moderation_result = self.check_hosting_moderation( + application_generate_entity=application_generate_entity, + queue_manager=queue_manager, + prompt_messages=prompt_messages + ) + + if hosting_moderation_result: + return + + # todo RUN WORKFLOW \ No newline at end of file diff --git a/api/core/app/apps/base_app_runner.py b/api/core/app/apps/base_app_runner.py index 8de71d4bfb..4e099c9ae1 100644 --- a/api/core/app/apps/base_app_runner.py +++ b/api/core/app/apps/base_app_runner.py @@ -187,7 +187,7 @@ class AppRunner: if stream: index = 0 for token in text: - queue_manager.publish_chunk_message(LLMResultChunk( + queue_manager.publish_llm_chunk(LLMResultChunk( model=app_generate_entity.model_config.model, prompt_messages=prompt_messages, delta=LLMResultChunkDelta( @@ -261,7 +261,7 @@ class AppRunner: usage = None for result in invoke_result: if not agent: - queue_manager.publish_chunk_message(result, PublishFrom.APPLICATION_MANAGER) + queue_manager.publish_llm_chunk(result, PublishFrom.APPLICATION_MANAGER) else: queue_manager.publish_agent_chunk_message(result, PublishFrom.APPLICATION_MANAGER) diff --git a/api/core/app/apps/message_based_app_generator.py b/api/core/app/apps/message_based_app_generator.py index 2fb609e615..dab72bd6d6 100644 --- a/api/core/app/apps/message_based_app_generator.py +++ b/api/core/app/apps/message_based_app_generator.py @@ -8,14 +8,15 @@ from sqlalchemy import and_ from core.app.app_config.entities import EasyUIBasedAppModelConfigFrom from core.app.app_queue_manager import AppQueueManager, ConversationTaskStoppedException from core.app.apps.base_app_generator import BaseAppGenerator +from core.app.apps.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline from core.app.entities.app_invoke_entities import ( + AdvancedChatAppGenerateEntity, AgentChatAppGenerateEntity, AppGenerateEntity, ChatAppGenerateEntity, CompletionAppGenerateEntity, InvokeFrom, ) -from core.app.generate_task_pipeline import GenerateTaskPipeline from core.prompt.utils.prompt_template_parser import PromptTemplateParser from extensions.ext_database import db from models.account import Account @@ -31,7 +32,8 @@ class MessageBasedAppGenerator(BaseAppGenerator): def _handle_response(self, application_generate_entity: Union[ ChatAppGenerateEntity, CompletionAppGenerateEntity, - AgentChatAppGenerateEntity + AgentChatAppGenerateEntity, + AdvancedChatAppGenerateEntity ], queue_manager: AppQueueManager, conversation: Conversation, @@ -47,7 +49,7 @@ class MessageBasedAppGenerator(BaseAppGenerator): :return: """ # init generate task pipeline - generate_task_pipeline = GenerateTaskPipeline( + generate_task_pipeline = EasyUIBasedGenerateTaskPipeline( application_generate_entity=application_generate_entity, queue_manager=queue_manager, conversation=conversation, @@ -114,7 +116,8 @@ class MessageBasedAppGenerator(BaseAppGenerator): application_generate_entity: Union[ ChatAppGenerateEntity, CompletionAppGenerateEntity, - AgentChatAppGenerateEntity + AgentChatAppGenerateEntity, + AdvancedChatAppGenerateEntity ], conversation: Optional[Conversation] = None) \ -> tuple[Conversation, Message]: @@ -135,10 +138,19 @@ class MessageBasedAppGenerator(BaseAppGenerator): from_source = 'console' account_id = application_generate_entity.user_id - override_model_configs = None - if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS \ - and app_config.app_mode in [AppMode.AGENT_CHAT, AppMode.CHAT, AppMode.COMPLETION]: - override_model_configs = app_config.app_model_config_dict + if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity): + app_model_config_id = None + override_model_configs = None + model_provider = None + model_id = None + else: + app_model_config_id = app_config.app_model_config_id + model_provider = application_generate_entity.model_config.provider + model_id = application_generate_entity.model_config.model + override_model_configs = None + if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS \ + and app_config.app_mode in [AppMode.AGENT_CHAT, AppMode.CHAT, AppMode.COMPLETION]: + override_model_configs = app_config.app_model_config_dict # get conversation introduction introduction = self._get_conversation_introduction(application_generate_entity) @@ -146,9 +158,9 @@ class MessageBasedAppGenerator(BaseAppGenerator): if not conversation: conversation = Conversation( app_id=app_config.app_id, - app_model_config_id=app_config.app_model_config_id, - model_provider=application_generate_entity.model_config.provider, - model_id=application_generate_entity.model_config.model, + app_model_config_id=app_model_config_id, + model_provider=model_provider, + model_id=model_id, override_model_configs=json.dumps(override_model_configs) if override_model_configs else None, mode=app_config.app_mode.value, name='New conversation', @@ -167,8 +179,8 @@ class MessageBasedAppGenerator(BaseAppGenerator): message = Message( app_id=app_config.app_id, - model_provider=application_generate_entity.model_config.provider, - model_id=application_generate_entity.model_config.model, + model_provider=model_provider, + model_id=model_id, override_model_configs=json.dumps(override_model_configs) if override_model_configs else None, conversation_id=conversation.id, inputs=application_generate_entity.inputs, diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index c1f8fb7e89..25bdd7d9e3 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -10,14 +10,19 @@ class QueueEvent(Enum): """ QueueEvent enum """ - MESSAGE = "message" + LLM_CHUNK = "llm_chunk" + TEXT_CHUNK = "text_chunk" AGENT_MESSAGE = "agent_message" - MESSAGE_REPLACE = "message-replace" - MESSAGE_END = "message-end" - RETRIEVER_RESOURCES = "retriever-resources" - ANNOTATION_REPLY = "annotation-reply" - AGENT_THOUGHT = "agent-thought" - MESSAGE_FILE = "message-file" + MESSAGE_REPLACE = "message_replace" + MESSAGE_END = "message_end" + WORKFLOW_STARTED = "workflow_started" + WORKFLOW_FINISHED = "workflow_finished" + NODE_STARTED = "node_started" + NODE_FINISHED = "node_finished" + RETRIEVER_RESOURCES = "retriever_resources" + ANNOTATION_REPLY = "annotation_reply" + AGENT_THOUGHT = "agent_thought" + MESSAGE_FILE = "message_file" ERROR = "error" PING = "ping" STOP = "stop" @@ -30,13 +35,22 @@ class AppQueueEvent(BaseModel): event: QueueEvent -class QueueMessageEvent(AppQueueEvent): +class QueueLLMChunkEvent(AppQueueEvent): """ - QueueMessageEvent entity + QueueLLMChunkEvent entity """ - event = QueueEvent.MESSAGE + event = QueueEvent.LLM_CHUNK chunk: LLMResultChunk + +class QueueTextChunkEvent(AppQueueEvent): + """ + QueueTextChunkEvent entity + """ + event = QueueEvent.TEXT_CHUNK + chunk_text: str + + class QueueAgentMessageEvent(AppQueueEvent): """ QueueMessageEvent entity @@ -61,9 +75,9 @@ class QueueRetrieverResourcesEvent(AppQueueEvent): retriever_resources: list[dict] -class AnnotationReplyEvent(AppQueueEvent): +class QueueAnnotationReplyEvent(AppQueueEvent): """ - AnnotationReplyEvent entity + QueueAnnotationReplyEvent entity """ event = QueueEvent.ANNOTATION_REPLY message_annotation_id: str @@ -76,6 +90,38 @@ class QueueMessageEndEvent(AppQueueEvent): event = QueueEvent.MESSAGE_END llm_result: LLMResult + +class QueueWorkflowStartedEvent(AppQueueEvent): + """ + QueueWorkflowStartedEvent entity + """ + event = QueueEvent.WORKFLOW_STARTED + workflow_run_id: str + + +class QueueWorkflowFinishedEvent(AppQueueEvent): + """ + QueueWorkflowFinishedEvent entity + """ + event = QueueEvent.WORKFLOW_FINISHED + workflow_run_id: str + + +class QueueNodeStartedEvent(AppQueueEvent): + """ + QueueNodeStartedEvent entity + """ + event = QueueEvent.NODE_STARTED + workflow_node_execution_id: str + + +class QueueNodeFinishedEvent(AppQueueEvent): + """ + QueueNodeFinishedEvent entity + """ + event = QueueEvent.NODE_FINISHED + workflow_node_execution_id: str + class QueueAgentThoughtEvent(AppQueueEvent): """ @@ -84,13 +130,15 @@ class QueueAgentThoughtEvent(AppQueueEvent): event = QueueEvent.AGENT_THOUGHT agent_thought_id: str + class QueueMessageFileEvent(AppQueueEvent): """ QueueAgentThoughtEvent entity """ event = QueueEvent.MESSAGE_FILE message_file_id: str - + + class QueueErrorEvent(AppQueueEvent): """ QueueErrorEvent entity diff --git a/api/core/workflow/workflow_engine_manager.py b/api/core/workflow/workflow_engine_manager.py index e69de29bb2..f7955a87e8 100644 --- a/api/core/workflow/workflow_engine_manager.py +++ b/api/core/workflow/workflow_engine_manager.py @@ -0,0 +1,38 @@ +from typing import Optional + +from extensions.ext_database import db +from models.model import App +from models.workflow import Workflow + + +class WorkflowEngineManager: + def get_draft_workflow(self, app_model: App) -> Optional[Workflow]: + """ + Get draft workflow + """ + # fetch draft workflow by app_model + workflow = db.session.query(Workflow).filter( + Workflow.tenant_id == app_model.tenant_id, + Workflow.app_id == app_model.id, + Workflow.version == 'draft' + ).first() + + # return draft workflow + return workflow + + def get_published_workflow(self, app_model: App) -> Optional[Workflow]: + """ + Get published workflow + """ + if not app_model.workflow_id: + return None + + # fetch published workflow by workflow_id + workflow = db.session.query(Workflow).filter( + Workflow.tenant_id == app_model.tenant_id, + Workflow.app_id == app_model.id, + Workflow.id == app_model.workflow_id + ).first() + + # return published workflow + return workflow diff --git a/api/events/event_handlers/deduct_quota_when_messaeg_created.py b/api/events/event_handlers/deduct_quota_when_messaeg_created.py index 77d1ab0822..53cbb2ecdc 100644 --- a/api/events/event_handlers/deduct_quota_when_messaeg_created.py +++ b/api/events/event_handlers/deduct_quota_when_messaeg_created.py @@ -1,4 +1,4 @@ -from core.app.entities.app_invoke_entities import ChatAppGenerateEntity +from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, ChatAppGenerateEntity from core.entities.provider_entities import QuotaUnit from events.message_event import message_was_created from extensions.ext_database import db @@ -8,7 +8,10 @@ from models.provider import Provider, ProviderType @message_was_created.connect def handle(sender, **kwargs): message = sender - application_generate_entity: ChatAppGenerateEntity = kwargs.get('application_generate_entity') + application_generate_entity = kwargs.get('application_generate_entity') + + if not isinstance(application_generate_entity, ChatAppGenerateEntity | AgentChatAppGenerateEntity): + return model_config = application_generate_entity.model_config provider_model_bundle = model_config.provider_model_bundle diff --git a/api/events/event_handlers/generate_conversation_name_when_first_message_created.py b/api/events/event_handlers/generate_conversation_name_when_first_message_created.py index ebeb3a26dd..1b9cfe41e9 100644 --- a/api/events/event_handlers/generate_conversation_name_when_first_message_created.py +++ b/api/events/event_handlers/generate_conversation_name_when_first_message_created.py @@ -1,6 +1,7 @@ from core.llm_generator.llm_generator import LLMGenerator from events.message_event import message_was_created from extensions.ext_database import db +from models.model import AppMode @message_was_created.connect @@ -15,7 +16,7 @@ def handle(sender, **kwargs): auto_generate_conversation_name = extras.get('auto_generate_conversation_name', True) if auto_generate_conversation_name and is_first_message: - if conversation.mode == 'chat': + if conversation.mode != AppMode.COMPLETION.value: app_model = conversation.app if not app_model: return diff --git a/api/events/event_handlers/update_provider_last_used_at_when_messaeg_created.py b/api/events/event_handlers/update_provider_last_used_at_when_messaeg_created.py index eca773f3b3..ae983cc5d1 100644 --- a/api/events/event_handlers/update_provider_last_used_at_when_messaeg_created.py +++ b/api/events/event_handlers/update_provider_last_used_at_when_messaeg_created.py @@ -1,6 +1,6 @@ from datetime import datetime -from core.app.entities.app_invoke_entities import ChatAppGenerateEntity +from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity, ChatAppGenerateEntity from events.message_event import message_was_created from extensions.ext_database import db from models.provider import Provider @@ -9,7 +9,10 @@ from models.provider import Provider @message_was_created.connect def handle(sender, **kwargs): message = sender - application_generate_entity: ChatAppGenerateEntity = kwargs.get('application_generate_entity') + application_generate_entity = kwargs.get('application_generate_entity') + + if not isinstance(application_generate_entity, ChatAppGenerateEntity | AgentChatAppGenerateEntity): + return db.session.query(Provider).filter( Provider.tenant_id == application_generate_entity.app_config.tenant_id, diff --git a/api/models/model.py b/api/models/model.py index a8ae474c02..05b6abacc0 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -451,10 +451,10 @@ class Conversation(db.Model): id = db.Column(UUID, server_default=db.text('uuid_generate_v4()')) app_id = db.Column(UUID, nullable=False) - app_model_config_id = db.Column(UUID, nullable=False) - model_provider = db.Column(db.String(255), nullable=False) + app_model_config_id = db.Column(UUID, nullable=True) + model_provider = db.Column(db.String(255), nullable=True) override_model_configs = db.Column(db.Text) - model_id = db.Column(db.String(255), nullable=False) + model_id = db.Column(db.String(255), nullable=True) mode = db.Column(db.String(255), nullable=False) name = db.Column(db.String(255), nullable=False) summary = db.Column(db.Text) diff --git a/api/models/workflow.py b/api/models/workflow.py index f9c906b85c..2540d33402 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -272,6 +272,10 @@ class WorkflowRun(db.Model): return EndUser.query.get(self.created_by) \ if created_by_role == CreatedByRole.END_USER else None + @property + def outputs_dict(self): + return self.outputs if not self.outputs else json.loads(self.outputs) + class WorkflowNodeExecutionTriggeredFrom(Enum): """ @@ -294,6 +298,28 @@ class WorkflowNodeExecutionTriggeredFrom(Enum): raise ValueError(f'invalid workflow node execution triggered from value {value}') +class WorkflowNodeExecutionStatus(Enum): + """ + Workflow Node Execution Status Enum + """ + RUNNING = 'running' + SUCCEEDED = 'succeeded' + FAILED = 'failed' + + @classmethod + def value_of(cls, value: str) -> 'WorkflowNodeExecutionStatus': + """ + Get value of given mode. + + :param value: mode value + :return: mode + """ + for mode in cls: + if mode.value == value: + return mode + raise ValueError(f'invalid workflow node execution status value {value}') + + class WorkflowNodeExecution(db.Model): """ Workflow Node Execution @@ -387,6 +413,21 @@ class WorkflowNodeExecution(db.Model): return EndUser.query.get(self.created_by) \ if created_by_role == CreatedByRole.END_USER else None + @property + def inputs_dict(self): + return self.inputs if not self.inputs else json.loads(self.inputs) + + @property + def outputs_dict(self): + return self.outputs if not self.outputs else json.loads(self.outputs) + + @property + def process_data_dict(self): + return self.process_data if not self.process_data else json.loads(self.process_data) + + @property + def execution_metadata_dict(self): + return self.execution_metadata if not self.execution_metadata else json.loads(self.execution_metadata) class WorkflowAppLog(db.Model): """ diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index c9efd056ff..13ea67d343 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -4,6 +4,7 @@ from typing import Optional from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager +from core.workflow.workflow_engine_manager import WorkflowEngineManager from extensions.ext_database import db from models.account import Account from models.model import App, AppMode @@ -21,15 +22,10 @@ class WorkflowService: """ Get draft workflow """ - # fetch draft workflow by app_model - workflow = db.session.query(Workflow).filter( - Workflow.tenant_id == app_model.tenant_id, - Workflow.app_id == app_model.id, - Workflow.version == 'draft' - ).first() + workflow_engine_manager = WorkflowEngineManager() # return draft workflow - return workflow + return workflow_engine_manager.get_draft_workflow(app_model=app_model) def get_published_workflow(self, app_model: App) -> Optional[Workflow]: """ @@ -38,15 +34,10 @@ class WorkflowService: if not app_model.workflow_id: return None - # fetch published workflow by workflow_id - workflow = db.session.query(Workflow).filter( - Workflow.tenant_id == app_model.tenant_id, - Workflow.app_id == app_model.id, - Workflow.id == app_model.workflow_id - ).first() + workflow_engine_manager = WorkflowEngineManager() # return published workflow - return workflow + return workflow_engine_manager.get_published_workflow(app_model=app_model) def sync_draft_workflow(self, app_model: App, graph: dict,