From bf5c391e47fae1def10104dab4a1dc95d1c2492c Mon Sep 17 00:00:00 2001 From: JoshuaL3000 <112940391+JoshuaL3000@users.noreply.github.com> Date: Fri, 1 Nov 2024 09:50:20 +0800 Subject: [PATCH] Add Workflow Executor Example (#892) Signed-off-by: JoshuaL3000 Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- WorkflowExecAgent/README.md | 131 ++++++++++++++++++ .../intel/cpu/xeon/compose_vllm.yaml | 31 +++++ .../docker_image_build/build.yaml | 13 ++ WorkflowExecAgent/tests/1_build_images.sh | 29 ++++ .../tests/2_start_vllm_service.sh | 67 +++++++++ .../tests/3_launch_and_validate_agent.sh | 66 +++++++++ WorkflowExecAgent/tests/README.md | 38 +++++ .../tests/test_compose_vllm_on_xeon.sh | 33 +++++ .../tool_chat_template_mistral_custom.jinja | 83 +++++++++++ .../tools/components/component.py | 10 ++ .../tools/components/workflow.py | 70 ++++++++++ WorkflowExecAgent/tools/custom_prompt.py | 9 ++ WorkflowExecAgent/tools/sdk.py | 19 +++ WorkflowExecAgent/tools/tools.py | 42 ++++++ WorkflowExecAgent/tools/tools.yaml | 14 ++ .../tools/utils/handle_requests.py | 76 ++++++++++ 16 files changed, 731 insertions(+) create mode 100644 WorkflowExecAgent/README.md create mode 100644 WorkflowExecAgent/docker_compose/intel/cpu/xeon/compose_vllm.yaml create mode 100644 WorkflowExecAgent/docker_image_build/build.yaml create mode 100644 WorkflowExecAgent/tests/1_build_images.sh create mode 100644 WorkflowExecAgent/tests/2_start_vllm_service.sh create mode 100644 WorkflowExecAgent/tests/3_launch_and_validate_agent.sh create mode 100644 WorkflowExecAgent/tests/README.md create mode 100644 WorkflowExecAgent/tests/test_compose_vllm_on_xeon.sh create mode 100644 WorkflowExecAgent/tests/tool_chat_template_mistral_custom.jinja create mode 100644 WorkflowExecAgent/tools/components/component.py create mode 100644 WorkflowExecAgent/tools/components/workflow.py create mode 100644 WorkflowExecAgent/tools/custom_prompt.py create mode 100644 WorkflowExecAgent/tools/sdk.py create mode 100644 WorkflowExecAgent/tools/tools.py create mode 100644 WorkflowExecAgent/tools/tools.yaml create mode 100644 WorkflowExecAgent/tools/utils/handle_requests.py diff --git a/WorkflowExecAgent/README.md b/WorkflowExecAgent/README.md new file mode 100644 index 000000000..0a4b7f333 --- /dev/null +++ b/WorkflowExecAgent/README.md @@ -0,0 +1,131 @@ +# Workflow Executor Agent + +## Overview + +GenAI Workflow Executor Example showcases the capability to handle data/AI workflow operations via LangChain agents to execute custom-defined workflow-based tools. These workflow tools can be interfaced from any 3rd-party tools in the market (no-code/low-code/IDE) such as Alteryx, RapidMiner, Power BI, Intel Data Insight Automation which allows users to create complex data/AI workflow operations for different use-cases. + +### Workflow Executor + +This example demonstrates a single React-LangGraph with a `Workflow Executor` tool to ingest a user prompt to execute workflows and return an agent reasoning response based on the workflow output data. + +First the LLM extracts the relevant information from the user query based on the schema of the tool in `tools/tools.yaml`. Then the agent sends this `AgentState` to the `Workflow Executor` tool. + +`Workflow Executor` tool uses `EasyDataSDK` class as seen under `tools/sdk.py` to interface with several high-level API's. There are 3 steps to this tool implementation: + +1. Starts the workflow with workflow parameters and workflow id extracted from the user query. + +2. Periodically checks the workflow status for completion or failure. This may be through a database which stores the current status of the workflow + +3. Retrieves the output data from the workflow through a storage service. + +The `AgentState` is sent back to the LLM for reasoning. Based on the output data, the LLM generates a response to answer the user's input prompt. + +Below shows an illustration of this flow: + +![image](https://github.com/user-attachments/assets/cb135042-1505-4aef-8822-c78c2f72aa2a) + +### Workflow Serving for Agent + +As an example, here we have a Churn Prediction use-case workflow as the serving workflow for the agent execution. It is created through Intel Data Insight Automation platform. The image below shows a snapshot of the Churn Prediction workflow. + +![image](https://github.com/user-attachments/assets/c067f8b3-86cf-4abc-a8bd-51a98de8172d) + +The workflow contains 2 paths which can be seen in the workflow illustrated, the top path and bottom path. + +1. Top path - The training path which ends at the random forest classifier node is the training path. The data is cleaned through a series of nodes and used to train a random forest model for prediction. + +2. Bottom path - The inference path where trained random forest model is used for inferencing based on input parameter. + +For this agent workflow execution, the inferencing path is executed to yield the final output result of the `Model Predictor` node. The same output is returned to the `Workflow Executor` tool through the `Langchain API Serving` node. + +There are `Serving Parameters` in the workflow, which are the tool input variables used to start a workflow instance obtained from `params` the LLM extracts from the user query. Below shows the parameter configuration option for the Intel Data Insight Automation workflow UI. + +![image](https://github.com/user-attachments/assets/ce8ef01a-56ff-4278-b84d-b6e4592b28c6) + +Manually running the workflow yields the tabular data output as shown below: + +![image](https://github.com/user-attachments/assets/241c1aba-2a24-48da-8005-ec7bfe657179) + +In the workflow serving for agent, this output will be returned to the `Workflow Executor` tool. The LLM can then answer the user's original question based on this output. + +To start prompting the agent microservice, we will use the following command for this use case: + +```sh +curl http://${ip_address}:9090/v1/chat/completions -X POST -H "Content-Type: application/json" -d '{ + "query": "I have a data with gender Female, tenure 55, MonthlyAvgCharges 103.7. Predict if this entry will churn. My workflow id is '${workflow_id}'." + }' +``` + +The user has to provide a `workflow_id` and workflow `params` in the query. `workflow_id` a unique id used for serving the workflow to the microservice. Notice that the `query` string includes all the workflow `params` which the user has defined in the workflow. The LLM will extract these parameters into a dictionary format for the workflow `Serving Parameters` as shown below: + +```python +params = {"gender": "Female", "tenure": 55, "MonthlyAvgCharges": 103.7} +``` + +These parameters will be passed into the `Workflow Executor` tool to start the workflow execution of specified `workflow_id`. Thus, everything will be handled via the microservice. + +And finally here are the results from the microservice logs: + +![image](https://github.com/user-attachments/assets/969fefb7-543d-427f-a56c-dc70e474ae60) + +## Microservice Setup + +### Start Agent Microservice + +Workflow Executor will have a single docker image. First, build the agent docker image. + +```sh +git clone https://github.com/opea-project/GenAIExamples.git +cd GenAIExamples//WorkflowExecAgent/docker_image_build/ +docker compose -f build.yaml build --no-cache +``` + +Configure `GenAIExamples/WorkflowExecAgent/docker_compose/.env` file with the following. Replace the variables according to your usecase. + +```sh +export SDK_BASE_URL=${SDK_BASE_URL} +export SERVING_TOKEN=${SERVING_TOKEN} +export HUGGINGFACEHUB_API_TOKEN=${HF_TOKEN} +export llm_engine=${llm_engine} +export llm_endpoint_url=${llm_endpoint_url} +export ip_address=$(hostname -I | awk '{print $1}') +export model="mistralai/Mistral-7B-Instruct-v0.3" +export recursion_limit=${recursion_limit} +export temperature=0 +export max_new_tokens=1000 +export WORKDIR=${WORKDIR} +export TOOLSET_PATH=$WORKDIR/GenAIExamples/WorkflowExecAgent/tools/ +export http_proxy=${http_proxy} +export https_proxy=${https_proxy} +``` + +Launch service by running the docker compose command. + +```sh +cd $WORKDIR/GenAIExamples/WorkflowExecAgent/docker_compose +docker compose -f compose.yaml up -d +``` + +### Validate service + +The microservice logs can be viewed using: + +```sh +docker logs workflowexec-agent-endpoint +``` + +You should be able to see "HTTP server setup successful" upon successful startup. + +You can validate the service using the following command: + +```sh +curl http://${ip_address}:9090/v1/chat/completions -X POST -H "Content-Type: application/json" -d '{ + "query": "I have a data with gender Female, tenure 55, MonthlyAvgCharges 103.7. Predict if this entry will churn. My workflow id is '${workflow_id}'." + }' +``` + +Update the `query` with the workflow parameters, workflow id, etc based on the workflow context. + +## Roadmap + +Phase II: Agent memory integration to enable capability to store tool intermediate results, such as workflow instance key. diff --git a/WorkflowExecAgent/docker_compose/intel/cpu/xeon/compose_vllm.yaml b/WorkflowExecAgent/docker_compose/intel/cpu/xeon/compose_vllm.yaml new file mode 100644 index 000000000..6fede271a --- /dev/null +++ b/WorkflowExecAgent/docker_compose/intel/cpu/xeon/compose_vllm.yaml @@ -0,0 +1,31 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +services: + worflowexec-agent: + image: opea/agent-langchain:latest + container_name: workflowexec-agent-endpoint + volumes: + - ${WORKDIR}/GenAIComps/comps/agent/langchain/:/home/user/comps/agent/langchain/ + - ${TOOLSET_PATH}:/home/user/tools/ + ports: + - "9090:9090" + ipc: host + environment: + ip_address: ${ip_address} + strategy: react_langgraph + recursion_limit: ${recursion_limit} + llm_engine: ${llm_engine} + llm_endpoint_url: ${llm_endpoint_url} + model: ${model} + temperature: ${temperature} + max_new_tokens: ${max_new_tokens} + streaming: false + tools: /home/user/tools/tools.yaml + no_proxy: ${no_proxy} + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + port: 9090 + SDK_BASE_URL: ${SDK_BASE_URL} + SERVING_TOKEN: ${SERVING_TOKEN} + custom_prompt: /home/user/tools/custom_prompt.py diff --git a/WorkflowExecAgent/docker_image_build/build.yaml b/WorkflowExecAgent/docker_image_build/build.yaml new file mode 100644 index 000000000..e2a778b9a --- /dev/null +++ b/WorkflowExecAgent/docker_image_build/build.yaml @@ -0,0 +1,13 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +services: + agent-langchain: + build: + context: GenAIComps + dockerfile: comps/agent/langchain/Dockerfile + args: + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + no_proxy: ${no_proxy} + image: ${REGISTRY:-opea}/agent-langchain:${TAG:-latest} diff --git a/WorkflowExecAgent/tests/1_build_images.sh b/WorkflowExecAgent/tests/1_build_images.sh new file mode 100644 index 000000000..ebb4883f4 --- /dev/null +++ b/WorkflowExecAgent/tests/1_build_images.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -e +WORKPATH=$(dirname "$PWD") +export WORKDIR=$WORKPATH/../../ +echo "WORKDIR=${WORKDIR}" + +function get_genai_comps() { + if [ ! -d "GenAIComps" ] ; then + git clone https://github.com/opea-project/GenAIComps.git && cd GenAIComps && git checkout "${opea_branch:-"main"}" && cd ../ + fi +} + +function build_agent_docker_image() { + cd $WORKDIR/GenAIExamples/WorkflowExecAgent/docker_image_build/ + get_genai_comps + echo "Build agent image with --no-cache..." + docker compose -f build.yaml build --no-cache +} + +function main() { + echo "==================== Build agent docker image ====================" + build_agent_docker_image + echo "==================== Build agent docker image completed ====================" +} + +main diff --git a/WorkflowExecAgent/tests/2_start_vllm_service.sh b/WorkflowExecAgent/tests/2_start_vllm_service.sh new file mode 100644 index 000000000..2c3425328 --- /dev/null +++ b/WorkflowExecAgent/tests/2_start_vllm_service.sh @@ -0,0 +1,67 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -e + +WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" +vllm_port=${vllm_port} +[[ -z "$vllm_port" ]] && vllm_port=8084 +model=mistralai/Mistral-7B-Instruct-v0.3 +export WORKDIR=$WORKPATH/../../ +export HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} + +function build_vllm_docker_image() { + echo "Building the vllm docker images" + cd $WORKPATH + echo $WORKPATH + if [ ! -d "./vllm" ]; then + git clone https://github.com/vllm-project/vllm.git + cd ./vllm; git checkout tags/v0.6.0 + else + cd ./vllm + fi + docker build -f Dockerfile.cpu -t vllm-cpu-env --shm-size=100g . + if [ $? -ne 0 ]; then + echo "opea/vllm:cpu failed" + exit 1 + else + echo "opea/vllm:cpu successful" + fi +} + +function start_vllm_service() { + echo "start vllm service" + docker run -d -p ${vllm_port}:${vllm_port} --rm --network=host --name test-comps-vllm-service -v ~/.cache/huggingface:/root/.cache/huggingface -v ${WORKPATH}/tests/tool_chat_template_mistral_custom.jinja:/root/tool_chat_template_mistral_custom.jinja -e HF_TOKEN=$HF_TOKEN -e http_proxy=$http_proxy -e https_proxy=$https_proxy -it vllm-cpu-env --model ${model} --port ${vllm_port} --chat-template /root/tool_chat_template_mistral_custom.jinja --enable-auto-tool-choice --tool-call-parser mistral + echo ${LOG_PATH}/vllm-service.log + sleep 5s + echo "Waiting vllm ready" + n=0 + until [[ "$n" -ge 100 ]] || [[ $ready == true ]]; do + docker logs test-comps-vllm-service &> ${LOG_PATH}/vllm-service.log + n=$((n+1)) + if grep -q "Uvicorn running on" ${LOG_PATH}/vllm-service.log; then + break + fi + if grep -q "No such container" ${LOG_PATH}/vllm-service.log; then + echo "container test-comps-vllm-service not found" + exit 1 + fi + sleep 5s + done + sleep 5s + echo "Service started successfully" +} + +function main() { + echo "==================== Build vllm docker image ====================" + build_vllm_docker_image + echo "==================== Build vllm docker image completed ====================" + + echo "==================== Start vllm docker service ====================" + start_vllm_service + echo "==================== Start vllm docker service completed ====================" +} + +main diff --git a/WorkflowExecAgent/tests/3_launch_and_validate_agent.sh b/WorkflowExecAgent/tests/3_launch_and_validate_agent.sh new file mode 100644 index 000000000..5c9e6da58 --- /dev/null +++ b/WorkflowExecAgent/tests/3_launch_and_validate_agent.sh @@ -0,0 +1,66 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -e + +WORKPATH=$(dirname "$PWD") +workflow_id=9809 +vllm_port=${vllm_port} +[[ -z "$vllm_port" ]] && vllm_port=8084 +export WORKDIR=$WORKPATH/../../ +echo "WORKDIR=${WORKDIR}" +export SDK_BASE_URL=${SDK_BASE_URL} +export SERVING_TOKEN=${SERVING_TOKEN} +export HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} +export llm_engine=vllm +export ip_address=$(hostname -I | awk '{print $1}') +export llm_endpoint_url=http://${ip_address}:${vllm_port} +export model=mistralai/Mistral-7B-Instruct-v0.3 +export recursion_limit=25 +export temperature=0 +export max_new_tokens=1000 +export TOOLSET_PATH=$WORKDIR/GenAIExamples/WorkflowExecAgent/tools/ + +function start_agent_and_api_server() { + echo "Starting Agent services" + cd $WORKDIR/GenAIExamples/WorkflowExecAgent/docker_compose/intel/cpu/xeon + WORKDIR=$WORKPATH/docker_image_build/ docker compose -f compose_vllm.yaml up -d + echo "Waiting agent service ready" + sleep 5s +} + +function validate() { + local CONTENT="$1" + local EXPECTED_RESULT="$2" + local SERVICE_NAME="$3" + + if echo "$CONTENT" | grep -q "$EXPECTED_RESULT"; then + echo "[ $SERVICE_NAME ] Content is as expected: $CONTENT" + echo "[TEST INFO]: Workflow Executor agent service PASSED" + else + echo "[ $SERVICE_NAME ] Content does not match the expected result: $CONTENT" + echo "[TEST INFO]: Workflow Executor agent service FAILED" + fi +} + +function validate_agent_service() { + echo "----------------Test agent ----------------" + local CONTENT=$(curl http://${ip_address}:9090/v1/chat/completions -X POST -H "Content-Type: application/json" -d '{ + "query": "I have a data with gender Female, tenure 55, MonthlyAvgCharges 103.7. Predict if this entry will churn. My workflow id is '${workflow_id}'." + }') + validate "$CONTENT" "The entry is not likely to churn" "workflowexec-agent-endpoint" + docker logs workflowexec-agent-endpoint +} + +function main() { + echo "==================== Start agent ====================" + start_agent_and_api_server + echo "==================== Agent started ====================" + + echo "==================== Validate agent service ====================" + validate_agent_service + echo "==================== Agent service validated ====================" +} + +main diff --git a/WorkflowExecAgent/tests/README.md b/WorkflowExecAgent/tests/README.md new file mode 100644 index 000000000..1dbaab6e9 --- /dev/null +++ b/WorkflowExecAgent/tests/README.md @@ -0,0 +1,38 @@ +# Validate Workflow Agent Microservice + +Microservice validation for Intel Data Insight Automation platform workflow serving. + +## Usage + +Configure necessary variables as listed below. Replace the variables according to your usecase. + +```sh +export SDK_BASE_URL=${SDK_BASE_URL} +export SERVING_TOKEN=${SERVING_TOKEN} +export HUGGINGFACEHUB_API_TOKEN=${HF_TOKEN} +export workflow_id=${workflow_id} # workflow_id of the serving workflow +export vllm_port=${vllm_port} # vllm serving port +export ip_address=$(hostname -I | awk '{print $1}') +export VLLM_CPU_OMP_THREADS_BIND=${VLLM_CPU_OMP_THREADS_BIND} +export http_proxy=${http_proxy} +export https_proxy=${https_proxy} +``` + +Note: `SDK_BASE_URL` and `SERVING_TOKEN` can be obtained from Intel Data Insight Automation platform. + +Launch validation by running the following command. + +```sh +cd GenAIExamples/WorkflowExecAgent/tests +. /test_compose_on_xeon.sh +``` + +`test_compose_on_xeon.sh` will run the other `.sh` files under `tests/`. The validation script launches 1 docker container for the agent microservice, and another for the vllm model serving on CPU. When validation is completed, all containers will be stopped. + +The validation is tested by checking if the model reasoning output response matches a partial substring. The expected output is shown below: + +![image](https://github.com/user-attachments/assets/88081bc8-7b73-470d-970e-92e0fe5f96ec) + +## Note + +- Currently the validation test is only designed with vllm model serving (CPU only). diff --git a/WorkflowExecAgent/tests/test_compose_vllm_on_xeon.sh b/WorkflowExecAgent/tests/test_compose_vllm_on_xeon.sh new file mode 100644 index 000000000..d1faa05a8 --- /dev/null +++ b/WorkflowExecAgent/tests/test_compose_vllm_on_xeon.sh @@ -0,0 +1,33 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +function stop_agent_and_api_server() { + echo "Stopping Agent services" + docker rm --force $(docker ps -a -q --filter="name=workflowexec-agent-endpoint") +} + +function stop_vllm_docker() { + cid=$(docker ps -aq --filter "name=test-comps-vllm-service") + echo "Stopping the docker containers "${cid} + if [[ ! -z "$cid" ]]; then docker rm $cid -f && sleep 1s; fi + echo "Docker containers stopped successfully" +} + +echo "=================== #1 Building docker images ====================" +bash 1_build_images.sh +echo "=================== #1 Building docker images completed ====================" + +echo "=================== #2 Start vllm service ====================" +bash 2_start_vllm_service.sh +echo "=================== #2 Start vllm service completed ====================" + +echo "=================== #3 Start agent and API server ====================" +bash 3_launch_and_validate_agent.sh +echo "=================== #3 Agent test completed ====================" + +echo "=================== #4 Stop agent and API server ====================" +stop_agent_and_api_server +stop_vllm_docker +echo "=================== #4 Agent and API server stopped ====================" + +echo "ALL DONE!" diff --git a/WorkflowExecAgent/tests/tool_chat_template_mistral_custom.jinja b/WorkflowExecAgent/tests/tool_chat_template_mistral_custom.jinja new file mode 100644 index 000000000..05905ea35 --- /dev/null +++ b/WorkflowExecAgent/tests/tool_chat_template_mistral_custom.jinja @@ -0,0 +1,83 @@ +{%- if messages[0]["role"] == "system" %} + {%- set system_message = messages[0]["content"] %} + {%- set loop_messages = messages[1:] %} +{%- else %} + {%- set loop_messages = messages %} +{%- endif %} +{%- if not tools is defined %} + {%- set tools = none %} +{%- endif %} +{%- set user_messages = loop_messages | selectattr("role", "equalto", "user") | list %} + +{%- for message in loop_messages | rejectattr("role", "equalto", "tool") | rejectattr("role", "equalto", "tool_results") | selectattr("tool_calls", "undefined") %} +{%- endfor %} + +{{- bos_token }} +{%- for message in loop_messages %} + {%- if message["role"] == "user" %} + {%- if tools is not none and (message == user_messages[-1]) %} + {{- "[AVAILABLE_TOOLS] [" }} + {%- for tool in tools %} + {%- set tool = tool.function %} + {{- '{"type": "function", "function": {' }} + {%- for key, val in tool.items() if key != "return" %} + {%- if val is string %} + {{- '"' + key + '": "' + val + '"' }} + {%- else %} + {{- '"' + key + '": ' + val|tojson }} + {%- endif %} + {%- if not loop.last %} + {{- ", " }} + {%- endif %} + {%- endfor %} + {{- "}}" }} + {%- if not loop.last %} + {{- ", " }} + {%- else %} + {{- "]" }} + {%- endif %} + {%- endfor %} + {{- "[/AVAILABLE_TOOLS]" }} + {%- endif %} + {%- if loop.last and system_message is defined %} + {{- "[INST] " + system_message + "\n\n" + message["content"] + "[/INST]" }} + {%- else %} + {{- "[INST] " + message["content"] + "[/INST]" }} + {%- endif %} + {%- elif message["role"] == "tool_calls" or message.tool_calls is defined %} + {%- if message.tool_calls is defined %} + {%- set tool_calls = message.tool_calls %} + {%- else %} + {%- set tool_calls = message.content %} + {%- endif %} + {{- "[TOOL_CALLS] [" }} + {%- for tool_call in tool_calls %} + {%- set out = tool_call.function|tojson %} + {{- out[:-1] }} + {%- if not tool_call.id is defined or tool_call.id|length < 9 %} + {{- raise_exception("Tool call IDs should be alphanumeric strings with length >= 9! (1)" + tool_call.id) }} + {%- endif %} + {{- ', "id": "' + tool_call.id[-9:] + '"}' }} + {%- if not loop.last %} + {{- ", " }} + {%- else %} + {{- "]" + eos_token }} + {%- endif %} + {%- endfor %} + {%- elif message["role"] == "assistant" %} + {{- " " + message["content"] + eos_token }} + {%- elif message["role"] == "tool_results" or message["role"] == "tool" %} + {%- if message.content is defined and message.content.content is defined %} + {%- set content = message.content.content %} + {%- else %} + {%- set content = message.content %} + {%- endif %} + {{- '[TOOL_RESULTS] {"content": ' + content|string + ", " }} + {%- if not message.tool_call_id is defined or message.tool_call_id|length < 9 %} + {{- raise_exception("Tool call IDs should be alphanumeric strings with length >= 9! (2)" + message.tool_call_id) }} + {%- endif %} + {{- '"call_id": "' + message.tool_call_id[-9:] + '"}[/TOOL_RESULTS]' }} + {%- else %} + {{- raise_exception("Only user and assistant roles are supported, with the exception of an initial optional system message!") }} + {%- endif %} +{%- endfor %} diff --git a/WorkflowExecAgent/tools/components/component.py b/WorkflowExecAgent/tools/components/component.py new file mode 100644 index 000000000..e0491aaa1 --- /dev/null +++ b/WorkflowExecAgent/tools/components/component.py @@ -0,0 +1,10 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + + +class Component: + def __init__(self, request_handler): + self.request_handler = request_handler + + def _make_request(self, *args, **kwargs): + return self.request_handler._make_request(*args, **kwargs) diff --git a/WorkflowExecAgent/tools/components/workflow.py b/WorkflowExecAgent/tools/components/workflow.py new file mode 100644 index 000000000..7ac1863c2 --- /dev/null +++ b/WorkflowExecAgent/tools/components/workflow.py @@ -0,0 +1,70 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import json +from typing import Dict + +from tools.components.component import Component + + +class Workflow(Component): + """Class for handling EasyData workflow operations. + + Attributes: + workflow_id: workflow id + wf_key: workflow key. Generated and stored when starting a servable workflow. + """ + + def __init__(self, request_handler, workflow_id=None, workflow_key=None): + super().__init__(request_handler) + self.workflow_id = workflow_id + self.wf_key = workflow_key + + def start(self, params: Dict[str, str]) -> Dict[str, str]: + """ + ``POST https://SDK_BASE_URL/serving/servable_workflows/{workflow_id}/start`` + + Starts a workflow instance with the workflow_id and parameters provided. + Returns a workflow key used to track the workflow instance. + + :param dict params: Workflow parameters used to start workflow. + + :returns: WorkflowKey + + :rtype: string + """ + data = json.dumps({"params": params}) + endpoint = f"serving/servable_workflows/{self.workflow_id}/start" + self.wf_key = self._make_request(endpoint, "POST", data)["wf_key"] + if self.wf_key: + return f"Workflow successfully started. The workflow key is {self.wf_key}." + else: + return "Workflow failed to start" + + def get_status(self) -> Dict[str, str]: + """ + ``GET https://SDK_BASE_URL/serving/serving_workflows/{workflow_key}/status`` + + Gets the workflow status. + + :returns: WorkflowStatus + + :rtype: string + """ + + endpoint = f"serving/serving_workflows/{self.wf_key}/status" + return self._make_request(endpoint, "GET") + + def result(self) -> list[Dict[str, str]]: + """ + ``GET https://SDK_BASE_URL/serving/serving_workflows/{workflow_key}/results`` + + Gets the workflow output result. + + :returns: WorkflowOutputData + + :rtype: string + """ + + endpoint = f"serving/serving_workflows/{self.wf_key}/results" + return self._make_request(endpoint, "GET") diff --git a/WorkflowExecAgent/tools/custom_prompt.py b/WorkflowExecAgent/tools/custom_prompt.py new file mode 100644 index 000000000..bdad5d6b9 --- /dev/null +++ b/WorkflowExecAgent/tools/custom_prompt.py @@ -0,0 +1,9 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +REACT_SYS_MESSAGE = """\ +You are a helpful assistant. You are to start the workflow using the tool provided. +After the workflow is completed, you will use the output data to answer the user's original question in a one short sentence. + +Now begin! +""" diff --git a/WorkflowExecAgent/tools/sdk.py b/WorkflowExecAgent/tools/sdk.py new file mode 100644 index 000000000..30838887f --- /dev/null +++ b/WorkflowExecAgent/tools/sdk.py @@ -0,0 +1,19 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os + +from tools.components.workflow import Workflow +from tools.utils.handle_requests import RequestHandler + + +class EasyDataSDK: + def __init__(self): + self.request_handler = RequestHandler(os.environ["SDK_BASE_URL"], os.environ["SERVING_TOKEN"]) + + def create_workflow(self, workflow_id=None, workflow_key=None): + return Workflow( + self.request_handler, + workflow_id=workflow_id, + workflow_key=workflow_key, + ) diff --git a/WorkflowExecAgent/tools/tools.py b/WorkflowExecAgent/tools/tools.py new file mode 100644 index 000000000..4a9c98909 --- /dev/null +++ b/WorkflowExecAgent/tools/tools.py @@ -0,0 +1,42 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import time + +from tools.sdk import EasyDataSDK + + +def workflow_executor(params, workflow_id: int) -> dict: + sdk = EasyDataSDK() + workflow = sdk.create_workflow(workflow_id) + + params = {key: str(val) for key, val in params.items()} + start_workflow = workflow.start(params) + print(start_workflow) + + def check_workflow(): + workflow_status = workflow.get_status()["workflow_status"] + if workflow_status == "finished": + message = "Workflow finished." + elif workflow_status == "initializing" or workflow_status == "running": + message = "Workflow execution is still in progress." + else: + message = "Workflow has failed." + + return workflow_status, message + + MAX_RETRY = 50 + num_retry = 0 + while num_retry < MAX_RETRY: + workflow_status, message = check_workflow() + print(message) + if workflow_status == "failed" or workflow_status == "finished": + break + else: + time.sleep(100) # interval between each status checking retry + num_retry += 1 + + if workflow_status == "finished": + return workflow.result() + else: + return message diff --git a/WorkflowExecAgent/tools/tools.yaml b/WorkflowExecAgent/tools/tools.yaml new file mode 100644 index 000000000..c326d5506 --- /dev/null +++ b/WorkflowExecAgent/tools/tools.yaml @@ -0,0 +1,14 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +workflow_executor: + description: "Starts a workflow with the given workflow id and params. Gets the output result of the workflow." + callable_api: tools.py:workflow_executor + args_schema: + workflow_id: + type: int + description: Workflow id + params: + type: dict + description: Workflow parameters. + return_output: workflow_data diff --git a/WorkflowExecAgent/tools/utils/handle_requests.py b/WorkflowExecAgent/tools/utils/handle_requests.py new file mode 100644 index 000000000..e9806d09d --- /dev/null +++ b/WorkflowExecAgent/tools/utils/handle_requests.py @@ -0,0 +1,76 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from functools import wraps + +import requests + + +class RequestHandler: + """Class for handling requests. + + Attributes: + base_url (string): The url of the API. + api_key (string): Secret token. + """ + + def __init__(self, base_url: str, api_key: str): + self.base_url = base_url + self.api_key = api_key + + def _make_request(self, endpoint, method="GET", data=None, stream=False): + url = f"{self.base_url}{endpoint}" + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"} + + error = "" + + if method == "GET": + response = requests.get(url, headers=headers) + elif method == "POST": + response = requests.post(url, data, headers=headers, stream=stream) + elif method == "PUT": + response = requests.put(url, data, headers=headers) + elif method == "DELETE": + response = requests.delete(url, headers=headers) + else: + raise ValueError(f"error: Invalid HTTP method {method}") + + @self._handle_request + def check_status(response): + response.raise_for_status() + + error = check_status(response) + + if error: + return error + + else: + try: + response.json() + return response.json() + except: + return response + + def _handle_request(self, func): + @wraps(func) + def decorated(response=None, *args, **kwargs): + if response is not None: + try: + return func(response, *args, **kwargs) + + except requests.exceptions.HTTPError as errh: + error = {"error": f"{response.status_code} {response.reason} HTTP Error {errh}"} + except requests.exceptions.ConnectionError as errc: + error = {"error": f"{response.status_code} {response.reason} Connection Error {errc}"} + except requests.exceptions.Timeout as errt: + error = {"error": f"{response.status_code} {response.reason} Timeout Error {errt}"} + except requests.exceptions.ChunkedEncodingError as errck: + error = {"error": f"Invalid chunk encoding: {str(errck)}"} + except requests.exceptions.RequestException as err: + error = {"error": f"{response.status_code} {response.reason} {err}"} + except Exception as err: + error = err + + return error + + return decorated