Compare commits

...

1 Commits

Author SHA1 Message Date
hj24
74daee3a1d fix: kpa issue 2026-03-17 21:21:23 +08:00
5 changed files with 44 additions and 6 deletions

View File

@@ -4,6 +4,8 @@ from core.helper.code_executor.template_transformer import TemplateTransformer
class NodeJsTemplateTransformer(TemplateTransformer):
_comment_prefix: str = "//"
@classmethod
def get_runner_script(cls) -> str:
runner_script = dedent(f""" {cls._code_placeholder}

View File

@@ -31,7 +31,7 @@ class Jinja2TemplateTransformer(TemplateTransformer):
script = script.replace(cls._template_b64_placeholder, code_b64)
inputs_str = cls.serialize_inputs(inputs)
script = script.replace(cls._inputs_placeholder, inputs_str)
return script
return cls._generate_anti_kpa_padding() + script
@classmethod
def get_runner_script(cls) -> str:

View File

@@ -1,5 +1,6 @@
import json
import re
import secrets
from abc import ABC, abstractmethod
from base64 import b64encode
from collections.abc import Mapping
@@ -7,11 +8,16 @@ from typing import Any
from dify_graph.variables.utils import dumps_with_segments
# Minimum random padding bytes to exceed the sandbox XOR key length (64 bytes).
_ANTI_KPA_MIN_PADDING = 512
_ANTI_KPA_JITTER = 256
class TemplateTransformer(ABC):
_code_placeholder: str = "{{code}}"
_inputs_placeholder: str = "{{inputs}}"
_result_tag: str = "<<RESULT>>"
_comment_prefix: str = "#"
@classmethod
def serialize_code(cls, code: str) -> str:
@@ -106,6 +112,12 @@ class TemplateTransformer(ABC):
input_base64_encoded = b64encode(inputs_json_str).decode("utf-8")
return input_base64_encoded
@classmethod
def _generate_anti_kpa_padding(cls) -> str:
padding_size = secrets.randbelow(_ANTI_KPA_JITTER) + _ANTI_KPA_MIN_PADDING
nonce = secrets.token_hex(padding_size)
return f"{cls._comment_prefix} {nonce}\n"
@classmethod
def assemble_runner_script(cls, code: str, inputs: Mapping[str, Any]) -> str:
# assemble runner script
@@ -113,7 +125,7 @@ class TemplateTransformer(ABC):
script = script.replace(cls._code_placeholder, code)
inputs_str = cls.serialize_inputs(inputs)
script = script.replace(cls._inputs_placeholder, inputs_str)
return script
return cls._generate_anti_kpa_padding() + script
@classmethod
def get_preload_script(cls) -> str:

View File

@@ -8,5 +8,17 @@ def test_get_runner_script():
script = NodeJsTemplateTransformer.assemble_runner_script(code, inputs)
script_lines = script.splitlines()
code_lines = code.splitlines()
# Check that the first lines of script are exactly the same as code
assert script_lines[: len(code_lines)] == code_lines
# First line is a random anti-KPA padding comment using JS syntax
assert script_lines[0].startswith("// ")
# User code follows immediately after the padding line
assert script_lines[1 : 1 + len(code_lines)] == code_lines
def test_anti_kpa_padding_is_unique():
code = JavascriptCodeProvider.get_default_code()
inputs = {"arg1": "a", "arg2": "b"}
script_a = NodeJsTemplateTransformer.assemble_runner_script(code, inputs)
script_b = NodeJsTemplateTransformer.assemble_runner_script(code, inputs)
padding_a = script_a.splitlines()[0]
padding_b = script_b.splitlines()[0]
assert padding_a != padding_b, "Each assembled script must have unique random padding"

View File

@@ -8,5 +8,17 @@ def test_get_runner_script():
script = Python3TemplateTransformer.assemble_runner_script(code, inputs)
script_lines = script.splitlines()
code_lines = code.splitlines()
# Check that the first lines of script are exactly the same as code
assert script_lines[: len(code_lines)] == code_lines
# First line is a random anti-KPA padding comment
assert script_lines[0].startswith("# ")
# User code follows immediately after the padding line
assert script_lines[1 : 1 + len(code_lines)] == code_lines
def test_anti_kpa_padding_is_unique():
code = Python3CodeProvider.get_default_code()
inputs = {"arg1": "a", "arg2": "b"}
script_a = Python3TemplateTransformer.assemble_runner_script(code, inputs)
script_b = Python3TemplateTransformer.assemble_runner_script(code, inputs)
padding_a = script_a.splitlines()[0]
padding_b = script_b.splitlines()[0]
assert padding_a != padding_b, "Each assembled script must have unique random padding"