mirror of
https://github.com/langgenius/dify.git
synced 2026-03-18 22:07:02 +00:00
Compare commits
1 Commits
3-18-dev-w
...
fix-kpa-is
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
74daee3a1d |
@@ -4,6 +4,8 @@ from core.helper.code_executor.template_transformer import TemplateTransformer
|
||||
|
||||
|
||||
class NodeJsTemplateTransformer(TemplateTransformer):
|
||||
_comment_prefix: str = "//"
|
||||
|
||||
@classmethod
|
||||
def get_runner_script(cls) -> str:
|
||||
runner_script = dedent(f""" {cls._code_placeholder}
|
||||
|
||||
@@ -31,7 +31,7 @@ class Jinja2TemplateTransformer(TemplateTransformer):
|
||||
script = script.replace(cls._template_b64_placeholder, code_b64)
|
||||
inputs_str = cls.serialize_inputs(inputs)
|
||||
script = script.replace(cls._inputs_placeholder, inputs_str)
|
||||
return script
|
||||
return cls._generate_anti_kpa_padding() + script
|
||||
|
||||
@classmethod
|
||||
def get_runner_script(cls) -> str:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import json
|
||||
import re
|
||||
import secrets
|
||||
from abc import ABC, abstractmethod
|
||||
from base64 import b64encode
|
||||
from collections.abc import Mapping
|
||||
@@ -7,11 +8,16 @@ from typing import Any
|
||||
|
||||
from dify_graph.variables.utils import dumps_with_segments
|
||||
|
||||
# Minimum random padding bytes to exceed the sandbox XOR key length (64 bytes).
|
||||
_ANTI_KPA_MIN_PADDING = 512
|
||||
_ANTI_KPA_JITTER = 256
|
||||
|
||||
|
||||
class TemplateTransformer(ABC):
|
||||
_code_placeholder: str = "{{code}}"
|
||||
_inputs_placeholder: str = "{{inputs}}"
|
||||
_result_tag: str = "<<RESULT>>"
|
||||
_comment_prefix: str = "#"
|
||||
|
||||
@classmethod
|
||||
def serialize_code(cls, code: str) -> str:
|
||||
@@ -106,6 +112,12 @@ class TemplateTransformer(ABC):
|
||||
input_base64_encoded = b64encode(inputs_json_str).decode("utf-8")
|
||||
return input_base64_encoded
|
||||
|
||||
@classmethod
|
||||
def _generate_anti_kpa_padding(cls) -> str:
|
||||
padding_size = secrets.randbelow(_ANTI_KPA_JITTER) + _ANTI_KPA_MIN_PADDING
|
||||
nonce = secrets.token_hex(padding_size)
|
||||
return f"{cls._comment_prefix} {nonce}\n"
|
||||
|
||||
@classmethod
|
||||
def assemble_runner_script(cls, code: str, inputs: Mapping[str, Any]) -> str:
|
||||
# assemble runner script
|
||||
@@ -113,7 +125,7 @@ class TemplateTransformer(ABC):
|
||||
script = script.replace(cls._code_placeholder, code)
|
||||
inputs_str = cls.serialize_inputs(inputs)
|
||||
script = script.replace(cls._inputs_placeholder, inputs_str)
|
||||
return script
|
||||
return cls._generate_anti_kpa_padding() + script
|
||||
|
||||
@classmethod
|
||||
def get_preload_script(cls) -> str:
|
||||
|
||||
@@ -8,5 +8,17 @@ def test_get_runner_script():
|
||||
script = NodeJsTemplateTransformer.assemble_runner_script(code, inputs)
|
||||
script_lines = script.splitlines()
|
||||
code_lines = code.splitlines()
|
||||
# Check that the first lines of script are exactly the same as code
|
||||
assert script_lines[: len(code_lines)] == code_lines
|
||||
# First line is a random anti-KPA padding comment using JS syntax
|
||||
assert script_lines[0].startswith("// ")
|
||||
# User code follows immediately after the padding line
|
||||
assert script_lines[1 : 1 + len(code_lines)] == code_lines
|
||||
|
||||
|
||||
def test_anti_kpa_padding_is_unique():
|
||||
code = JavascriptCodeProvider.get_default_code()
|
||||
inputs = {"arg1": "a", "arg2": "b"}
|
||||
script_a = NodeJsTemplateTransformer.assemble_runner_script(code, inputs)
|
||||
script_b = NodeJsTemplateTransformer.assemble_runner_script(code, inputs)
|
||||
padding_a = script_a.splitlines()[0]
|
||||
padding_b = script_b.splitlines()[0]
|
||||
assert padding_a != padding_b, "Each assembled script must have unique random padding"
|
||||
|
||||
@@ -8,5 +8,17 @@ def test_get_runner_script():
|
||||
script = Python3TemplateTransformer.assemble_runner_script(code, inputs)
|
||||
script_lines = script.splitlines()
|
||||
code_lines = code.splitlines()
|
||||
# Check that the first lines of script are exactly the same as code
|
||||
assert script_lines[: len(code_lines)] == code_lines
|
||||
# First line is a random anti-KPA padding comment
|
||||
assert script_lines[0].startswith("# ")
|
||||
# User code follows immediately after the padding line
|
||||
assert script_lines[1 : 1 + len(code_lines)] == code_lines
|
||||
|
||||
|
||||
def test_anti_kpa_padding_is_unique():
|
||||
code = Python3CodeProvider.get_default_code()
|
||||
inputs = {"arg1": "a", "arg2": "b"}
|
||||
script_a = Python3TemplateTransformer.assemble_runner_script(code, inputs)
|
||||
script_b = Python3TemplateTransformer.assemble_runner_script(code, inputs)
|
||||
padding_a = script_a.splitlines()[0]
|
||||
padding_b = script_b.splitlines()[0]
|
||||
assert padding_a != padding_b, "Each assembled script must have unique random padding"
|
||||
|
||||
Reference in New Issue
Block a user