add leader but not review

This commit is contained in:
hjlarry
2025-08-08 14:54:18 +08:00
parent d1a5db3310
commit 2d1621c43d
4 changed files with 98 additions and 8 deletions

View File

@@ -67,7 +67,24 @@ def handle_user_connect(sid, data):
"sid": sid,
}
redis_client.hset(f"workflow_online_users:{workflow_id}", user_id, json.dumps(user_info))
# --- Leader Election Logic ---
workflow_users_key = f"workflow_online_users:{workflow_id}"
workflow_order_key = f"workflow_user_order:{workflow_id}"
# Remove user from list in case of reconnection, to add them to the end
redis_client.lrem(workflow_order_key, 0, user_id)
# Add user to the end of the list
redis_client.rpush(workflow_order_key, user_id)
# The first user in the list is the leader
leader_user_id_bytes = redis_client.lindex(workflow_order_key, 0)
is_leader = leader_user_id_bytes and leader_user_id_bytes.decode("utf-8") == user_id
# Notify the connecting client of their leader status
sio.emit("status", {"isLeader": is_leader}, room=sid)
# --- End of Leader Election Logic ---
redis_client.hset(workflow_users_key, user_id, json.dumps(user_info))
redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": user_id}))
sio.enter_room(sid, workflow_id)
@@ -86,8 +103,35 @@ def handle_disconnect(sid):
data = json.loads(mapping)
workflow_id = data["workflow_id"]
user_id = data["user_id"]
redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id)
workflow_users_key = f"workflow_online_users:{workflow_id}"
workflow_order_key = f"workflow_user_order:{workflow_id}"
# Get leader before any modification
leader_user_id_bytes = redis_client.lindex(workflow_order_key, 0)
was_leader = leader_user_id_bytes and leader_user_id_bytes.decode("utf-8") == user_id
# Remove user
redis_client.hdel(workflow_users_key, user_id)
redis_client.delete(f"ws_sid_map:{sid}")
redis_client.lrem(workflow_order_key, 0, user_id)
# Check if leader disconnected and a new one needs to be elected
if was_leader:
new_leader_user_id_bytes = redis_client.lindex(workflow_order_key, 0)
if new_leader_user_id_bytes:
new_leader_user_id = new_leader_user_id_bytes.decode("utf-8")
# get new leader's info to find their sid
new_leader_info_json = redis_client.hget(workflow_users_key, new_leader_user_id)
if new_leader_info_json:
new_leader_info = json.loads(new_leader_info_json)
new_leader_sid = new_leader_info.get("sid")
if new_leader_sid:
sio.emit("status", {"isLeader": True}, room=new_leader_sid)
# If the room is empty, clean up the redis key
if redis_client.llen(workflow_order_key) == 0:
redis_client.delete(workflow_order_key)
broadcast_online_users(workflow_id)
@@ -96,14 +140,25 @@ def broadcast_online_users(workflow_id):
"""
broadcast online users to the workflow room
"""
users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
workflow_users_key = f"workflow_online_users:{workflow_id}"
workflow_order_key = f"workflow_user_order:{workflow_id}"
# The first user in the list is the leader
leader_user_id_bytes = redis_client.lindex(workflow_order_key, 0)
leader_user_id = leader_user_id_bytes.decode("utf-8") if leader_user_id_bytes else None
users_json = redis_client.hgetall(workflow_users_key)
users = []
for _, user_info_json in users_json.items():
try:
users.append(json.loads(user_info_json))
except Exception:
continue
sio.emit("online_users", {"workflow_id": workflow_id, "users": users}, room=workflow_id)
sio.emit(
"online_users",
{"workflow_id": workflow_id, "users": users, "leader": leader_user_id},
room=workflow_id,
)
@sio.on("collaboration_event")

View File

@@ -13,6 +13,7 @@ import { syncWorkflowDraft } from '@/service/workflow'
import { useFeaturesStore } from '@/app/components/base/features/hooks'
import { API_PREFIX } from '@/config'
import { useWorkflowRefreshDraft } from '.'
import { useCollaboration } from '@/app/components/workflow/collaboration/hooks/use-collaboration'
export const useNodesSyncDraft = () => {
const store = useStoreApi()
@@ -21,6 +22,7 @@ export const useNodesSyncDraft = () => {
const { getNodesReadOnly } = useNodesReadOnly()
const { handleRefreshWorkflowDraft } = useWorkflowRefreshDraft()
const params = useParams()
const { isLeader } = useCollaboration(params.appId as string)
const getPostParams = useCallback(() => {
const {
@@ -85,13 +87,14 @@ export const useNodesSyncDraft = () => {
environment_variables: environmentVariables,
conversation_variables: conversationVariables,
hash: syncWorkflowDraftHash,
_is_collaborative: true,
},
}
}
}, [store, featuresStore, workflowStore])
const syncWorkflowDraftWhenPageClose = useCallback(() => {
if (getNodesReadOnly())
if (getNodesReadOnly() || !isLeader)
return
const postParams = getPostParams()
@@ -111,8 +114,10 @@ export const useNodesSyncDraft = () => {
onSettled?: () => void
},
) => {
if (getNodesReadOnly())
if (getNodesReadOnly() || !isLeader)
return
console.log('I am the leader, saving draft...')
const postParams = getPostParams()
if (postParams) {

View File

@@ -14,6 +14,8 @@ export class CollaborationManager {
private eventEmitter = new EventEmitter()
private currentAppId: string | null = null
private reactFlowStore: any = null
private isLeader = false
private leaderId: string | null = null
private cursors: Record<string, CursorPosition> = {}
init = (appId: string, reactFlowStore: any): void => {
@@ -115,6 +117,10 @@ export class CollaborationManager {
return this.eventEmitter.on('varsAndFeaturesUpdate', callback)
}
onLeaderChange(callback: (isLeader: boolean) => void): () => void {
return this.eventEmitter.on('leaderChange', callback)
}
private syncNodes(oldNodes: Node[], newNodes: Node[]): void {
if (!this.nodesMap) return
@@ -223,7 +229,7 @@ export class CollaborationManager {
}
})
socket.on('online_users', (data: { users: OnlineUser[] }) => {
socket.on('online_users', (data: { users: OnlineUser[]; leader: string }) => {
const onlineUserIds = new Set(data.users.map(user => user.user_id))
// Remove cursors for offline users
@@ -233,10 +239,27 @@ export class CollaborationManager {
})
console.log('Updated online users and cleaned offline cursors:', data.users)
this.leaderId = data.leader
this.eventEmitter.emit('onlineUsers', data.users)
this.eventEmitter.emit('cursors', { ...this.cursors })
})
socket.on('status', (data: { isLeader: boolean }) => {
if (this.isLeader !== data.isLeader) {
this.isLeader = data.isLeader
console.log(`Collaboration: I am now the ${this.isLeader ? 'Leader' : 'Follower'}.`)
this.eventEmitter.emit('leaderChange', this.isLeader)
}
})
socket.on('status', (data: { isLeader: boolean }) => {
if (this.isLeader !== data.isLeader) {
this.isLeader = data.isLeader
console.log(`Collaboration: I am now the ${this.isLeader ? 'Leader' : 'Follower'}.`)
this.eventEmitter.emit('leaderChange', this.isLeader)
}
})
socket.on('connect', () => {
this.eventEmitter.emit('stateChange', { isConnected: true })
})

View File

@@ -4,10 +4,11 @@ import { CursorService } from '../services/cursor-service'
import type { CollaborationState } from '../types/collaboration'
export function useCollaboration(appId: string, reactFlowStore?: any) {
const [state, setState] = useState<Partial<CollaborationState>>({
const [state, setState] = useState<Partial<CollaborationState & { isLeader: boolean }>>({
isConnected: false,
onlineUsers: [],
cursors: {},
isLeader: false,
})
const cursorServiceRef = useRef<CursorService | null>(null)
@@ -44,10 +45,15 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
setState((prev: any) => ({ ...prev, onlineUsers: users }))
})
const unsubscribeLeaderChange = collaborationManager.onLeaderChange((isLeader: boolean) => {
setState((prev: any) => ({ ...prev, isLeader }))
})
return () => {
unsubscribeStateChange()
unsubscribeCursors()
unsubscribeUsers()
unsubscribeLeaderChange()
cursorServiceRef.current?.stopTracking()
collaborationManager.disconnect()
}
@@ -69,6 +75,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
isConnected: state.isConnected || false,
onlineUsers: state.onlineUsers || [],
cursors: state.cursors || {},
isLeader: state.isLeader || false,
startCursorTracking,
stopCursorTracking,
}