mirror of
https://github.com/langgenius/dify.git
synced 2026-01-08 07:14:14 +00:00
Compare commits
4 Commits
fix/redis-
...
alert-auto
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dddd72ec83 | ||
|
|
181572e330 | ||
|
|
91e5882814 | ||
|
|
fc63841169 |
1
.github/workflows/build-push.yml
vendored
1
.github/workflows/build-push.yml
vendored
@@ -5,7 +5,6 @@ on:
|
||||
branches:
|
||||
- "main"
|
||||
- "deploy/dev"
|
||||
- "fix/redis-slow-in-gevent"
|
||||
release:
|
||||
types: [published]
|
||||
|
||||
|
||||
5
api/configs/middleware/cache/redis_config.py
vendored
5
api/configs/middleware/cache/redis_config.py
vendored
@@ -34,11 +34,6 @@ class RedisConfig(BaseSettings):
|
||||
default=0,
|
||||
)
|
||||
|
||||
REDIS_MAX_CONNECTIONS: PositiveInt = Field(
|
||||
description="Maximum number of connections to Redis",
|
||||
default=200,
|
||||
)
|
||||
|
||||
REDIS_USE_SSL: bool = Field(
|
||||
description="Enable SSL/TLS for the Redis connection",
|
||||
default=False,
|
||||
|
||||
@@ -226,11 +226,13 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
|
||||
is_first_conversation = True
|
||||
|
||||
# init generate records
|
||||
(conversation, message) = self._init_generate_records(
|
||||
application_generate_entity=application_generate_entity,
|
||||
conversation=conversation,
|
||||
override_model_configs=workflow.features_dict if is_first_conversation else None,
|
||||
)
|
||||
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
|
||||
|
||||
if is_first_conversation:
|
||||
# update conversation features
|
||||
conversation.override_model_configs = workflow.features
|
||||
db.session.commit()
|
||||
db.session.refresh(conversation)
|
||||
|
||||
# init queue manager
|
||||
queue_manager = MessageBasedAppQueueManager(
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, cast
|
||||
|
||||
@@ -102,9 +101,6 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
):
|
||||
return
|
||||
|
||||
# trace start time
|
||||
start_time = time.perf_counter()
|
||||
|
||||
# Init conversation variables
|
||||
stmt = select(ConversationVariable).where(
|
||||
ConversationVariable.app_id == self.conversation.app_id,
|
||||
@@ -132,13 +128,6 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
conversation_dialogue_count = self.conversation.dialogue_count
|
||||
db.session.commit()
|
||||
|
||||
# trace end time
|
||||
end_time = time.perf_counter()
|
||||
print(f"conversation_dialogue_count time: {end_time - start_time}")
|
||||
|
||||
# trace start time
|
||||
start_time = time.perf_counter()
|
||||
|
||||
# Create a variable pool.
|
||||
system_inputs = {
|
||||
SystemVariableKey.QUERY: query,
|
||||
@@ -162,10 +151,6 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
||||
# init graph
|
||||
graph = self._init_graph(graph_config=workflow.graph_dict)
|
||||
|
||||
# trace end time
|
||||
end_time = time.perf_counter()
|
||||
print(f"init graph time: {end_time - start_time}")
|
||||
|
||||
db.session.close()
|
||||
|
||||
# RUN WORKFLOW
|
||||
|
||||
@@ -15,6 +15,7 @@ from core.app.entities.queue_entities import (
|
||||
QueuePingEvent,
|
||||
QueueStopEvent,
|
||||
)
|
||||
from extensions.ext_redis import redis_client
|
||||
|
||||
|
||||
class PublishFrom(Enum):
|
||||
@@ -31,10 +32,10 @@ class AppQueueManager:
|
||||
self._user_id = user_id
|
||||
self._invoke_from = invoke_from
|
||||
|
||||
# user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
|
||||
# redis_client.setex(
|
||||
# AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
|
||||
# )
|
||||
user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
|
||||
redis_client.setex(
|
||||
AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
|
||||
)
|
||||
|
||||
q = queue.Queue()
|
||||
|
||||
@@ -113,27 +114,26 @@ class AppQueueManager:
|
||||
Set task stop flag
|
||||
:return:
|
||||
"""
|
||||
return
|
||||
# result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
|
||||
# if result is None:
|
||||
# return
|
||||
result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
|
||||
if result is None:
|
||||
return
|
||||
|
||||
# user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
|
||||
# if result.decode("utf-8") != f"{user_prefix}-{user_id}":
|
||||
# return
|
||||
user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
|
||||
if result.decode("utf-8") != f"{user_prefix}-{user_id}":
|
||||
return
|
||||
|
||||
# stopped_cache_key = cls._generate_stopped_cache_key(task_id)
|
||||
# redis_client.setex(stopped_cache_key, 600, 1)
|
||||
stopped_cache_key = cls._generate_stopped_cache_key(task_id)
|
||||
redis_client.setex(stopped_cache_key, 600, 1)
|
||||
|
||||
def _is_stopped(self) -> bool:
|
||||
"""
|
||||
Check if task is stopped
|
||||
:return:
|
||||
"""
|
||||
# stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
|
||||
# result = redis_client.get(stopped_cache_key)
|
||||
# if result is not None:
|
||||
# return True
|
||||
stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
|
||||
result = redis_client.get(stopped_cache_key)
|
||||
if result is not None:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from collections.abc import Generator, Mapping
|
||||
from collections.abc import Generator
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional, Union
|
||||
from typing import Optional, Union
|
||||
|
||||
from sqlalchemy import and_
|
||||
|
||||
@@ -138,7 +137,6 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
AdvancedChatAppGenerateEntity,
|
||||
],
|
||||
conversation: Optional[Conversation] = None,
|
||||
override_model_configs: Optional[Mapping[str, Any]] = None,
|
||||
) -> tuple[Conversation, Message]:
|
||||
"""
|
||||
Initialize generate records
|
||||
@@ -160,12 +158,14 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
|
||||
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
|
||||
app_model_config_id = None
|
||||
override_model_configs = None
|
||||
model_provider = None
|
||||
model_id = None
|
||||
else:
|
||||
app_model_config_id = app_config.app_model_config_id
|
||||
model_provider = application_generate_entity.model_conf.provider
|
||||
model_id = application_generate_entity.model_conf.model
|
||||
override_model_configs = None
|
||||
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS and app_config.app_mode in {
|
||||
AppMode.AGENT_CHAT,
|
||||
AppMode.CHAT,
|
||||
@@ -177,63 +177,61 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
introduction = self._get_conversation_introduction(application_generate_entity)
|
||||
|
||||
if not conversation:
|
||||
with db.Session(bind=db.engine, expire_on_commit=False) as session:
|
||||
conversation = Conversation()
|
||||
conversation.id = str(uuid.uuid4())
|
||||
conversation.app_id = app_config.app_id
|
||||
conversation.app_model_config_id = app_model_config_id
|
||||
conversation.model_provider = model_provider
|
||||
conversation.model_id = model_id
|
||||
conversation.override_model_configs = (
|
||||
json.dumps(override_model_configs) if override_model_configs else None
|
||||
)
|
||||
conversation.mode = app_config.app_mode.value
|
||||
conversation.name = "New conversation"
|
||||
conversation.inputs = application_generate_entity.inputs
|
||||
conversation.introduction = introduction
|
||||
conversation.system_instruction = ""
|
||||
conversation.system_instruction_tokens = 0
|
||||
conversation.status = "normal"
|
||||
conversation.invoke_from = application_generate_entity.invoke_from.value
|
||||
conversation.from_source = from_source
|
||||
conversation.from_end_user_id = end_user_id
|
||||
conversation.from_account_id = account_id
|
||||
conversation = Conversation(
|
||||
app_id=app_config.app_id,
|
||||
app_model_config_id=app_model_config_id,
|
||||
model_provider=model_provider,
|
||||
model_id=model_id,
|
||||
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
|
||||
mode=app_config.app_mode.value,
|
||||
name="New conversation",
|
||||
inputs=application_generate_entity.inputs,
|
||||
introduction=introduction,
|
||||
system_instruction="",
|
||||
system_instruction_tokens=0,
|
||||
status="normal",
|
||||
invoke_from=application_generate_entity.invoke_from.value,
|
||||
from_source=from_source,
|
||||
from_end_user_id=end_user_id,
|
||||
from_account_id=account_id,
|
||||
)
|
||||
|
||||
session.add(conversation)
|
||||
session.commit()
|
||||
session.refresh(conversation)
|
||||
db.session.add(conversation)
|
||||
db.session.commit()
|
||||
db.session.refresh(conversation)
|
||||
else:
|
||||
conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
|
||||
db.session.commit()
|
||||
|
||||
with db.Session(bind=db.engine, expire_on_commit=False) as session:
|
||||
message = Message()
|
||||
message.app_id = app_config.app_id
|
||||
message.model_provider = model_provider
|
||||
message.model_id = model_id
|
||||
message.override_model_configs = json.dumps(override_model_configs) if override_model_configs else None
|
||||
message.conversation_id = conversation.id
|
||||
message.inputs = application_generate_entity.inputs
|
||||
message.query = application_generate_entity.query or ""
|
||||
message.message = ""
|
||||
message.message_tokens = 0
|
||||
message.message_unit_price = 0
|
||||
message.answer = ""
|
||||
message.answer_tokens = 0
|
||||
message.answer_unit_price = 0
|
||||
message.answer_price_unit = 0
|
||||
message.parent_message_id = getattr(application_generate_entity, "parent_message_id", None)
|
||||
message.provider_response_latency = 0
|
||||
message.total_price = 0
|
||||
message.currency = "USD"
|
||||
message.invoke_from = application_generate_entity.invoke_from.value
|
||||
message.from_source = from_source
|
||||
message.from_end_user_id = end_user_id
|
||||
message.from_account_id = account_id
|
||||
message = Message(
|
||||
app_id=app_config.app_id,
|
||||
model_provider=model_provider,
|
||||
model_id=model_id,
|
||||
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
|
||||
conversation_id=conversation.id,
|
||||
inputs=application_generate_entity.inputs,
|
||||
query=application_generate_entity.query or "",
|
||||
message="",
|
||||
message_tokens=0,
|
||||
message_unit_price=0,
|
||||
message_price_unit=0,
|
||||
answer="",
|
||||
answer_tokens=0,
|
||||
answer_unit_price=0,
|
||||
answer_price_unit=0,
|
||||
parent_message_id=getattr(application_generate_entity, "parent_message_id", None),
|
||||
provider_response_latency=0,
|
||||
total_price=0,
|
||||
currency="USD",
|
||||
invoke_from=application_generate_entity.invoke_from.value,
|
||||
from_source=from_source,
|
||||
from_end_user_id=end_user_id,
|
||||
from_account_id=account_id,
|
||||
)
|
||||
|
||||
session.add(message)
|
||||
session.commit()
|
||||
session.refresh(message)
|
||||
db.session.add(message)
|
||||
db.session.commit()
|
||||
db.session.refresh(message)
|
||||
|
||||
for file in application_generate_entity.files:
|
||||
message_file = MessageFile(
|
||||
|
||||
@@ -16,7 +16,6 @@ import timezone from 'dayjs/plugin/timezone'
|
||||
import { createContext, useContext } from 'use-context-selector'
|
||||
import { useShallow } from 'zustand/react/shallow'
|
||||
import { useTranslation } from 'react-i18next'
|
||||
import { UUID_NIL } from '../../base/chat/constants'
|
||||
import type { ChatItemInTree } from '../../base/chat/types'
|
||||
import VarPanel from './var-panel'
|
||||
import cn from '@/utils/classnames'
|
||||
@@ -84,95 +83,76 @@ const PARAM_MAP = {
|
||||
frequency_penalty: 'Frequency Penalty',
|
||||
}
|
||||
|
||||
function appendQAToChatList(newChatList: IChatItem[], item: any, conversationId: string, timezone: string, format: string) {
|
||||
const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || []
|
||||
newChatList.push({
|
||||
id: item.id,
|
||||
content: item.answer,
|
||||
agent_thoughts: addFileInfos(item.agent_thoughts ? sortAgentSorts(item.agent_thoughts) : item.agent_thoughts, item.message_files),
|
||||
feedback: item.feedbacks.find((item: any) => item.from_source === 'user'), // user feedback
|
||||
adminFeedback: item.feedbacks.find((item: any) => item.from_source === 'admin'), // admin feedback
|
||||
feedbackDisabled: false,
|
||||
isAnswer: true,
|
||||
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))),
|
||||
log: [
|
||||
...item.message,
|
||||
...(item.message[item.message.length - 1]?.role !== 'assistant'
|
||||
? [
|
||||
{
|
||||
role: 'assistant',
|
||||
text: item.answer,
|
||||
files: item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || [],
|
||||
},
|
||||
]
|
||||
: []),
|
||||
],
|
||||
workflow_run_id: item.workflow_run_id,
|
||||
conversationId,
|
||||
input: {
|
||||
inputs: item.inputs,
|
||||
query: item.query,
|
||||
},
|
||||
more: {
|
||||
time: dayjs.unix(item.created_at).tz(timezone).format(format),
|
||||
tokens: item.answer_tokens + item.message_tokens,
|
||||
latency: item.provider_response_latency.toFixed(2),
|
||||
},
|
||||
citation: item.metadata?.retriever_resources,
|
||||
annotation: (() => {
|
||||
if (item.annotation_hit_history) {
|
||||
return {
|
||||
id: item.annotation_hit_history.annotation_id,
|
||||
authorName: item.annotation_hit_history.annotation_create_account?.name || 'N/A',
|
||||
created_at: item.annotation_hit_history.created_at,
|
||||
}
|
||||
}
|
||||
|
||||
if (item.annotation) {
|
||||
return {
|
||||
id: item.annotation.id,
|
||||
authorName: item.annotation.account.name,
|
||||
logAnnotation: item.annotation,
|
||||
created_at: 0,
|
||||
}
|
||||
}
|
||||
|
||||
return undefined
|
||||
})(),
|
||||
parentMessageId: `question-${item.id}`,
|
||||
})
|
||||
|
||||
const questionFiles = item.message_files?.filter((file: any) => file.belongs_to === 'user') || []
|
||||
newChatList.push({
|
||||
id: `question-${item.id}`,
|
||||
content: item.inputs.query || item.inputs.default_input || item.query, // text generation: item.inputs.query; chat: item.query
|
||||
isAnswer: false,
|
||||
message_files: getProcessedFilesFromResponse(questionFiles.map((item: any) => ({ ...item, related_id: item.id }))),
|
||||
parentMessageId: item.parent_message_id || undefined,
|
||||
})
|
||||
}
|
||||
|
||||
const getFormattedChatList = (messages: ChatMessage[], conversationId: string, timezone: string, format: string) => {
|
||||
const newChatList: IChatItem[] = []
|
||||
let nextMessageId = null
|
||||
for (const item of messages) {
|
||||
if (!item.parent_message_id) {
|
||||
appendQAToChatList(newChatList, item, conversationId, timezone, format)
|
||||
break
|
||||
}
|
||||
messages.forEach((item: ChatMessage) => {
|
||||
const questionFiles = item.message_files?.filter((file: any) => file.belongs_to === 'user') || []
|
||||
newChatList.push({
|
||||
id: `question-${item.id}`,
|
||||
content: item.inputs.query || item.inputs.default_input || item.query, // text generation: item.inputs.query; chat: item.query
|
||||
isAnswer: false,
|
||||
message_files: getProcessedFilesFromResponse(questionFiles.map((item: any) => ({ ...item, related_id: item.id }))),
|
||||
parentMessageId: item.parent_message_id || undefined,
|
||||
})
|
||||
|
||||
if (!nextMessageId) {
|
||||
appendQAToChatList(newChatList, item, conversationId, timezone, format)
|
||||
nextMessageId = item.parent_message_id
|
||||
}
|
||||
else {
|
||||
if (item.id === nextMessageId || nextMessageId === UUID_NIL) {
|
||||
appendQAToChatList(newChatList, item, conversationId, timezone, format)
|
||||
nextMessageId = item.parent_message_id
|
||||
}
|
||||
}
|
||||
}
|
||||
return newChatList.reverse()
|
||||
const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || []
|
||||
newChatList.push({
|
||||
id: item.id,
|
||||
content: item.answer,
|
||||
agent_thoughts: addFileInfos(item.agent_thoughts ? sortAgentSorts(item.agent_thoughts) : item.agent_thoughts, item.message_files),
|
||||
feedback: item.feedbacks.find(item => item.from_source === 'user'), // user feedback
|
||||
adminFeedback: item.feedbacks.find(item => item.from_source === 'admin'), // admin feedback
|
||||
feedbackDisabled: false,
|
||||
isAnswer: true,
|
||||
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))),
|
||||
log: [
|
||||
...item.message,
|
||||
...(item.message[item.message.length - 1]?.role !== 'assistant'
|
||||
? [
|
||||
{
|
||||
role: 'assistant',
|
||||
text: item.answer,
|
||||
files: item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || [],
|
||||
},
|
||||
]
|
||||
: []),
|
||||
] as IChatItem['log'],
|
||||
workflow_run_id: item.workflow_run_id,
|
||||
conversationId,
|
||||
input: {
|
||||
inputs: item.inputs,
|
||||
query: item.query,
|
||||
},
|
||||
more: {
|
||||
time: dayjs.unix(item.created_at).tz(timezone).format(format),
|
||||
tokens: item.answer_tokens + item.message_tokens,
|
||||
latency: item.provider_response_latency.toFixed(2),
|
||||
},
|
||||
citation: item.metadata?.retriever_resources,
|
||||
annotation: (() => {
|
||||
if (item.annotation_hit_history) {
|
||||
return {
|
||||
id: item.annotation_hit_history.annotation_id,
|
||||
authorName: item.annotation_hit_history.annotation_create_account?.name || 'N/A',
|
||||
created_at: item.annotation_hit_history.created_at,
|
||||
}
|
||||
}
|
||||
|
||||
if (item.annotation) {
|
||||
return {
|
||||
id: item.annotation.id,
|
||||
authorName: item.annotation.account.name,
|
||||
logAnnotation: item.annotation,
|
||||
created_at: 0,
|
||||
}
|
||||
}
|
||||
|
||||
return undefined
|
||||
})(),
|
||||
parentMessageId: `question-${item.id}`,
|
||||
})
|
||||
})
|
||||
return newChatList
|
||||
}
|
||||
|
||||
// const displayedParams = CompletionParams.slice(0, -2)
|
||||
|
||||
@@ -10,6 +10,8 @@ import { ImagePlus } from '../icons/src/vender/line/images'
|
||||
import { useDraggableUploader } from './hooks'
|
||||
import { ALLOW_FILE_EXTENSIONS } from '@/types/app'
|
||||
|
||||
const MAX_FILE_SIZE = 5 * 1024 * 1024
|
||||
|
||||
type UploaderProps = {
|
||||
className?: string
|
||||
onImageCropped?: (tempUrl: string, croppedAreaPixels: Area, fileName: string) => void
|
||||
@@ -38,7 +40,7 @@ const Uploader: FC<UploaderProps> = ({
|
||||
|
||||
const handleLocalFileInput = (e: ChangeEvent<HTMLInputElement>) => {
|
||||
const file = e.target.files?.[0]
|
||||
if (file)
|
||||
if (file && ALLOW_FILE_EXTENSIONS.includes(file.type.split('/').pop()?.toLowerCase() || '') && file.size <= MAX_FILE_SIZE)
|
||||
setInputImage({ file, url: URL.createObjectURL(file) })
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
export const createImage = (url: string) =>
|
||||
const createImage = (url: string) =>
|
||||
new Promise<HTMLImageElement>((resolve, reject) => {
|
||||
const image = new Image()
|
||||
image.addEventListener('load', () => resolve(image))
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { ThoughtItem } from '@/app/components/base/chat/chat/type'
|
||||
import type { FileEntity } from '@/app/components/base/file-uploader/types'
|
||||
import type { VisionFile } from '@/types/app'
|
||||
|
||||
export const sortAgentSorts = (list: ThoughtItem[]) => {
|
||||
if (!list)
|
||||
@@ -11,7 +12,7 @@ export const sortAgentSorts = (list: ThoughtItem[]) => {
|
||||
return temp
|
||||
}
|
||||
|
||||
export const addFileInfos = (list: ThoughtItem[], messageFiles: FileEntity[]) => {
|
||||
export const addFileInfos = (list: ThoughtItem[], messageFiles: (FileEntity | VisionFile)[]) => {
|
||||
if (!list || !messageFiles)
|
||||
return list
|
||||
return list.map((item) => {
|
||||
|
||||
Reference in New Issue
Block a user