Compare commits

...

3 Commits

4 changed files with 39 additions and 14 deletions

View File

@@ -8,6 +8,7 @@ on:
- "deploy/enterprise"
- "build/**"
- "release/e-*"
- "hotfix/**"
tags:
- "*"

View File

@@ -1,9 +1,11 @@
import contextvars
import logging
from collections.abc import Generator, Mapping, Sequence
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, NewType, cast
from flask import Flask, current_app
from typing_extensions import TypeIs
from core.variables import IntegerVariable, NoneSegment
@@ -35,6 +37,7 @@ from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.iteration.entities import ErrorHandleMode, IterationNodeData
from libs.datetime_utils import naive_utc_now
from libs.flask_utils import preserve_flask_contexts
from .exc import (
InvalidIteratorValueError,
@@ -239,6 +242,8 @@ class IterationNode(Node):
self._execute_single_iteration_parallel,
index=index,
item=item,
flask_app=current_app._get_current_object(), # type: ignore
context_vars=contextvars.copy_context(),
)
future_to_index[future] = index
@@ -281,8 +286,11 @@ class IterationNode(Node):
self,
index: int,
item: object,
flask_app: Flask,
context_vars: contextvars.Context,
) -> tuple[datetime, list[GraphNodeEventBase], object | None, int]:
"""Execute a single iteration in parallel mode and return results."""
with preserve_flask_contexts(flask_app=flask_app, context_vars=context_vars):
iter_start_at = datetime.now(UTC).replace(tzinfo=None)
events: list[GraphNodeEventBase] = []
outputs_temp: list[object] = []

View File

@@ -262,6 +262,14 @@ class VariableTruncator:
target_length = self._array_element_limit
for i, item in enumerate(value):
# Dirty fix:
# The output of `Start` node may contain list of `File` elements,
# causing `AssertionError` while invoking `_truncate_json_primitives`.
#
# This check ensures that `list[File]` are handled separately
if isinstance(item, File):
truncated_value.append(item)
continue
if i >= target_length:
return _PartResult(truncated_value, used_size, True)
if i > 0:

View File

@@ -588,3 +588,11 @@ class TestIntegrationScenarios:
if isinstance(result.result, ObjectSegment):
result_size = truncator.calculate_json_size(result.result.value)
assert result_size <= original_size
def test_file_and_array_file_variable_mapping(self, file):
truncator = VariableTruncator(string_length_limit=30, array_element_limit=3, max_size_bytes=300)
mapping = {"array_file": [file]}
truncated_mapping, truncated = truncator.truncate_variable_mapping(mapping)
assert truncated is False
assert truncated_mapping == mapping