Skip to content

Integration-Automation/LoadDensity

LoadDensity

Multi-protocol load and stress automation: Locust + WebSocket + gRPC + MQTT + raw sockets, plus a JSON-driven action executor with batteries included.

PyPI Version Python Version License Documentation Status

繁體中文 | 简体中文


LoadDensity (je_load_density) started as a Locust wrapper and grew into a full multi-protocol load framework: HTTP, FastHttp, WebSocket, gRPC, MQTT, and raw TCP/UDP user templates behind one JSON-driven action executor, plus modules for parameterised data, scenario flow, reports, observability, distributed runners, recording, persistent storage, and an MCP control surface so Claude can drive load tests end-to-end. Every executor command has a deterministic name (LD_*) and a single dispatch point, so an action JSON can mix protocols, exporters, and reports in the same script.

Optional dependencies, opt-in install — every protocol driver and exporter ships behind a pip install je_load_density[<extra>] extra. The base install footprint is unchanged for users who only need HTTP load testing.

Table of Contents

Highlights

  • One executor, six protocols — HTTP, FastHttp, WebSocket, gRPC, MQTT, raw TCP/UDP — all dispatched from the same LD_start_test command via a user key.
  • JSON-driven — Every test is an action JSON list; the same script can be hand-authored, generated by HAR import, scheduled by an MCP tool, or sent over the control socket.
  • Parameter resolver${var.x}, ${env.X}, ${csv.source.col}, ${faker.method}, plus built-in ${uuid()}, ${now()}, ${randint(min,max)} helpers; values can also be extracted from responses and reused downstream.
  • Scenario flow — Declare tasks as sequence (default), weighted, or conditional (run_if / skip_if predicates) without touching Python.
  • Six report formats — HTML, JSON, XML, CSV, JUnit XML, and a percentile-summary JSON. The summary covers totals, failure rate, and per-name p50 / p90 / p95 / p99 latencies for trend tracking.
  • Three exporters — Prometheus HTTP endpoint, InfluxDB line-protocol UDP/HTTP sink, and OpenTelemetry OTLP gRPC exporter.
  • Distributed runnersrunner_mode="master" / "worker" for cross-machine load with the same start_test API.
  • HAR record / replay — Convert real browser traffic into a runnable action JSON with regex include/exclude filters.
  • Persistent records — Optional SQLite sink with run / record / metadata schema for cross-run regression checks.
  • MCP serverpython -m je_load_density.mcp_server exposes 11 tools so Claude (Desktop, Code, any MCP client) can drive LoadDensity end-to-end.
  • Hardened control socket — Length-prefixed framing, optional TLS, shared-secret token (env or arg), with backwards-compatible legacy mode for existing IDE integrations such as PyBreeze.
  • Live GUI — Optional PySide6 GUI with a live stats panel (RPS / avg / p95 / failures), translated to English, Traditional Chinese, Japanese, and Korean.
  • CLI subcommandsrun / run-dir / run-str / init / serve. Legacy -e/-d/-c/--execute_str flags remain for downstream tools.

Installation

pip install je_load_density

Pulls in Locust and defusedxml — nothing else.

Optional extras

Extra Adds
gui PySide6 + qt-material (graphical front-end)
websocket websocket-client (WebSocket user template)
grpc grpcio + protobuf (gRPC user template)
mqtt paho-mqtt (MQTT user template)
prometheus prometheus-client (Prometheus exporter)
opentelemetry OpenTelemetry SDK + OTLP gRPC exporter
metrics prometheus + opentelemetry bundle
faker Faker (powers ${faker.method} placeholders)
mcp mcp SDK (drives the MCP server)
all Everything above
pip install "je_load_density[gui]"
pip install "je_load_density[mqtt,grpc,websocket]"
pip install "je_load_density[metrics]"
pip install "je_load_density[mcp]"
pip install "je_load_density[all]"

Development install

git clone https://github.com/Integration-Automation/LoadDensity.git
cd LoadDensity
pip install -e ".[all]"
pip install -r requirements.txt

Architecture

flowchart TD
    subgraph Entry["Entry Surfaces"]
        CLI[CLI]
        MCP[MCP Server]
        GUI[GUI]
        SOCK[Control Socket]
    end

    Entry -- "action JSON" --> EXEC["Action Executor<br/>(LD_* dispatch + safe builtins)"]
    EXEC -- "start_test" --> WRAPPER["locust_wrapper_proxy<br/>(per-protocol task store)"]

    WRAPPER --> HTTP["HTTP / FastHttp"]
    WRAPPER --> WS["WebSocket"]
    WRAPPER --> GRPC["gRPC"]
    WRAPPER --> MQTT["MQTT"]
    WRAPPER --> RAW["Raw TCP / UDP"]

    HTTP -- "Locust events" --> BUS([Locust Event Bus])
    WS --> BUS
    GRPC --> BUS
    MQTT --> BUS
    RAW --> BUS

    BUS --> REC["test_record_instance"]
    BUS --> METRICS["Prometheus / InfluxDB / OTel"]

    REC --> REPORTS["HTML / JSON / XML / CSV / JUnit / Summary reports"]
    REC --> SQLITE[("SQLite persistence<br/>(cross-run comparison)")]
Loading

The dependency direction always points from the action layer down to Locust, never the other way around.

Quick Start

HTTP load test in Python

from je_load_density import start_test

start_test(
    user_detail_dict={"user": "fast_http_user"},
    user_count=50,
    spawn_rate=10,
    test_time=30,
    variables={"base": "https://httpbin.org"},
    tasks=[
        {"method": "get",  "request_url": "${var.base}/get"},
        {"method": "post", "request_url": "${var.base}/post",
         "json": {"hello": "world"},
         "assertions": [{"type": "status_code", "value": 200}]},
    ],
)

Action JSON

{"load_density": [
  ["LD_register_variables", {"variables": {"base": "https://httpbin.org"}}],
  ["LD_start_test", {
    "user_detail_dict": {"user": "fast_http_user"},
    "user_count": 20, "spawn_rate": 10, "test_time": 30,
    "tasks": [
      {"method": "get",  "request_url": "${var.base}/get"},
      {"method": "post", "request_url": "${var.base}/post",
       "json": {"hello": "world"}}
    ]
  }],
  ["LD_generate_summary_report", {"report_name": "smoke"}]
]}

Run via the CLI:

python -m je_load_density run smoke.json

Core API

from je_load_density import (
    start_test, prepare_env, create_env,
    execute_action, execute_files, executor, add_command_to_executor,
    test_record_instance, locust_wrapper_proxy,
    register_variable, register_variables,
    register_csv_source, register_csv_sources,
    parameter_resolver, resolve,
    har_to_action_json, har_to_tasks, load_har,
    persist_records, list_runs, fetch_run_records,
    start_prometheus_exporter, stop_prometheus_exporter,
    start_influxdb_sink, stop_influxdb_sink,
    start_opentelemetry_exporter, stop_opentelemetry_exporter,
    start_load_density_socket_server,
    generate_html_report, generate_json_report, generate_xml_report,
    generate_csv_report, generate_junit_report, generate_summary_report,
    build_summary,
    create_project_dir, callback_executor, read_action_json,
)

__all__ documents the full public surface in je_load_density/__init__.py.

Action Executor

The action executor maps command strings to callable functions. Every action is a list:

["command_name"]                        # No parameters
["command_name", {"key": "value"}]      # Keyword arguments
["command_name", [arg1, arg2]]          # Positional arguments

The top-level document is either a bare list or {"load_density": [...]}.

Built-in LD_* commands

Group Commands
Core LD_start_test, LD_execute_action, LD_execute_files, LD_add_package_to_executor, LD_start_socket_server
Reports LD_generate_html(_report), LD_generate_json(_report), LD_generate_xml(_report), LD_generate_csv_report, LD_generate_junit_report, LD_generate_summary_report, LD_summary
Persistence LD_persist_records, LD_list_runs, LD_fetch_run_records, LD_clear_records
Parameters LD_register_variable(s), LD_register_csv_source(s), LD_clear_resolver
Recording LD_load_har, LD_har_to_tasks, LD_har_to_action_json
Metrics LD_start/stop_prometheus_exporter, LD_start/stop_influxdb_sink, LD_start/stop_opentelemetry_exporter

Safe Python built-ins (print, len, range, …) are also accepted; eval, exec, compile, __import__, breakpoint, open, and input are explicitly blocked.

Custom commands

from je_load_density import add_command_to_executor

def slack_notify(message: str) -> None:
    ...

add_command_to_executor({"LD_slack_notify": slack_notify})

User Templates

Every template registers under start_test via user_detail_dict={"user": "<key>"}. Tasks share the same shape across HTTP, WebSocket, gRPC, MQTT, and raw socket users; only the protocol-specific fields differ.

HTTP / FastHttp

start_test(
    user_detail_dict={"user": "fast_http_user"},
    user_count=50, spawn_rate=10, test_time=60,
    variables={"base": "https://api.example.com"},
    tasks=[
        {"method": "post", "request_url": "${var.base}/login",
         "json": {"email": "u@example.com", "password": "secret"},
         "extract": [{"var": "auth", "from": "json_path", "path": "data.token"}]},
        {"method": "get", "request_url": "${var.base}/profile",
         "headers": {"Authorization": "Bearer ${var.auth}"},
         "assertions": [{"type": "status_code", "value": 200}]},
    ],
)

WebSocket

pip install je_load_density[websocket]

start_test(
    user_detail_dict={"user": "websocket_user"},
    user_count=10, spawn_rate=5, test_time=60,
    tasks=[
        {"method": "connect", "request_url": "wss://echo.example.com/socket"},
        {"method": "sendrecv", "payload": '{"ping": 1}', "expect": "pong"},
        {"method": "close"},
    ],
)

gRPC

pip install je_load_density[grpc]

start_test(
    user_detail_dict={"user": "grpc_user"},
    user_count=20, spawn_rate=5, test_time=60,
    tasks=[{
        "name": "say_hello",
        "target": "localhost:50051",
        "stub_path": "pkg.greeter_pb2_grpc.GreeterStub",
        "request_path": "pkg.greeter_pb2.HelloRequest",
        "method": "SayHello",
        "payload": {"name": "world"},
        "metadata": [["x-token", "abc"]],
        "timeout": 5,
    }],
)

stub_path and request_path are validated against a strict identifier regex before importlib.import_module, so traversal-style attacks are rejected.

MQTT

pip install je_load_density[mqtt]

start_test(
    user_detail_dict={"user": "mqtt_user"},
    user_count=10, spawn_rate=5, test_time=60,
    tasks=[
        {"method": "connect",   "broker": "127.0.0.1:1883"},
        {"method": "subscribe", "topic":  "telemetry/in", "qos": 1},
        {"method": "publish",   "topic":  "telemetry/out", "payload": "ping", "qos": 1},
        {"method": "disconnect"},
    ],
)

Raw TCP / UDP

Stdlib only; nothing to install.

start_test(
    user_detail_dict={"user": "socket_user"},
    user_count=20, spawn_rate=5, test_time=60,
    tasks=[
        {"protocol": "tcp", "target": "127.0.0.1:9000",
         "payload": "PING\n", "expect_bytes": 64,
         "expect_substring": "PONG"},
        {"protocol": "udp", "target": "127.0.0.1:9000",
         "payload": "hex:DEADBEEF", "expect_bytes": 4},
    ],
)

Parameter Resolver

Placeholders are expanded automatically on every task:

Placeholder Resolves to
${var.NAME} Value passed to register_variable(s)
${env.NAME} Environment variable NAME
${csv.SOURCE.COL} Next row from CSV source SOURCE (cycles by default)
${faker.METHOD} Faker().METHOD() (lazy import)
${uuid()} New UUID 4 string
${now()} Local ISO-8601 timestamp (seconds)
${randint(min, max)} Cryptographically-strong random int
from je_load_density import register_variable, register_csv_source

register_variable("base", "https://api.example.com")
register_csv_source("users", "users.csv")

Or from action JSON:

["LD_register_variables", {"variables": {"base": "https://api.example.com"}}]
["LD_register_csv_sources", {"sources": [{"name": "users", "file_path": "users.csv"}]}]

Unknown placeholders are left in place so missing data is visible during a dry run.

Scenario Modes

{
  "mode": "weighted",
  "tasks": [
    {"method": "get", "request_url": "/products", "weight": 3},
    {"method": "get", "request_url": "/expensive", "weight": 1}
  ]
}
Mode Behaviour
sequence Run every task in order each tick (default)
weighted Pick one task per tick by weight
conditional Use run_if / skip_if predicates evaluated against the parameter resolver

Predicates: bool, "${var.x}", {"equals": [a,b]}, {"not_equals": [a,b]}, {"in": [needle, haystack]}, {"truthy": value}.

Assertions & Extractors

Both run under Locust's catch_response; failed assertions surface in every report.

{
  "method": "post",
  "request_url": "${var.base}/login",
  "json": {"email": "u@example.com", "password": "secret"},
  "assertions": [
    {"type": "status_code", "value": 200},
    {"type": "json_path", "path": "data.role", "value": "admin"}
  ],
  "extract": [
    {"var": "auth_token", "from": "json_path", "path": "data.token"},
    {"var": "request_id", "from": "header",    "name": "X-Request-Id"}
  ]
}

Assertion types: status_code, contains, not_contains, json_path, header. Extractor sources: json_path, header, status_code.

Reports

Six formats consumed from test_record_instance:

from je_load_density import (
    generate_html_report, generate_json_report, generate_xml_report,
    generate_csv_report, generate_junit_report, generate_summary_report,
)

generate_html_report("report")           # report.html
generate_json_report("report")           # report_success.json + report_failure.json
generate_xml_report("report")            # report_success.xml  + report_failure.xml
generate_csv_report("report")            # report.csv
generate_junit_report("report-junit")    # report-junit.xml (CI)
generate_summary_report("report-sum")    # totals + per-name p50/p90/p95/p99

Observability

from je_load_density import (
    start_prometheus_exporter, start_influxdb_sink, start_opentelemetry_exporter,
)

start_prometheus_exporter(port=9646, addr="127.0.0.1")
start_influxdb_sink(transport="udp", host="influxdb", port=8089)
start_opentelemetry_exporter(endpoint="http://otel-collector:4317",
                             service_name="loaddensity")
Sink Metrics
Prometheus loaddensity_requests_total, loaddensity_request_latency_ms, loaddensity_response_bytes
InfluxDB loaddensity_request line-protocol points (UDP or HTTP)
OTel loaddensity.requests, loaddensity.request.latency, loaddensity.response.size

All three are loaded lazily and gated by the matching install extra.

Distributed Master / Worker

# master
start_test(
    user_detail_dict={"user": "fast_http_user"},
    runner_mode="master",
    master_bind_host="0.0.0.0", master_bind_port=5557,
    expected_workers=4,
    web_ui_dict={"host": "0.0.0.0", "port": 8089},
    user_count=400, spawn_rate=40, test_time=600,
    tasks=[...],
)

# worker
start_test(
    user_detail_dict={"user": "fast_http_user"},
    runner_mode="worker",
    master_host="10.0.0.10", master_port=5557,
    tasks=[...],
)

The master waits up to 60 s for expected_workers workers to register before starting the load ramp.

HAR Record / Replay

from je_load_density import load_har, har_to_action_json

har = load_har("recording.har")
action_json = har_to_action_json(
    har,
    user="fast_http_user",
    user_count=20, spawn_rate=10, test_time=120,
    include=[r"api\.example\.com"],
    exclude=[r"\.svg$"],
)

Captures from Chrome / Firefox DevTools, mitmproxy, Charles, etc. all work. Status codes flow through as status_code assertions on every generated task.

Persistent Records (SQLite)

from je_load_density import persist_records, list_runs, fetch_run_records

run_id = persist_records(
    "loadtests.db",
    label="checkout-2026-04-28",
    metadata={"branch": "dev", "commit": "abc1234"},
)
for row in list_runs("loadtests.db", limit=10):
    print(row)

Schema is created lazily; an empty file is fine. Indexes on run_id and name keep cross-run queries fast.

MCP Server (for Claude)

pip install "je_load_density[mcp]"
python -m je_load_density.mcp_server

Wire it into Claude Desktop / Code:

{
  "mcpServers": {
    "loaddensity": {
      "command": "python",
      "args": ["-m", "je_load_density.mcp_server"]
    }
  }
}

Eleven tools are exposed: run_test, run_action_json, create_project, list_executor_commands, import_har, generate_reports, summary, persist_records, list_runs, fetch_run, clear_records.

Hardened Control Socket

python -m je_load_density serve \
    --host 0.0.0.0 --port 9940 --framed \
    --token "$LOAD_DENSITY_SOCKET_TOKEN" \
    --tls-cert /etc/loaddensity/server.crt \
    --tls-key /etc/loaddensity/server.key
  • 4-byte big-endian length-prefixed frames (1 MiB cap)
  • Optional TLS (cert/key on disk; ssl.create_default_context, TLS 1.2+ minimum)
  • Shared-secret token compared with hmac.compare_digest; once configured, all payloads must use {"token": "...", "command": [...]} and may set "op": "quit" to stop the server
  • Token also reads from LOAD_DENSITY_SOCKET_TOKEN env var
  • Legacy unauthenticated mode preserved for backwards compatibility

GUI

pip install "je_load_density[gui]"
import sys
from PySide6.QtWidgets import QApplication
from je_load_density.gui.main_window import LoadDensityUI

app = QApplication(sys.argv)
window = LoadDensityUI()
window.show()
sys.exit(app.exec())

The GUI ships English, Traditional Chinese, Japanese, and Korean translations and a live stats panel that polls test_record_instance once a second (RPS, average / p95 latency, failure count).

CLI Usage

python -m je_load_density run FILE              # execute one action JSON file
python -m je_load_density run-dir DIR           # execute every .json in DIR
python -m je_load_density run-str JSON          # execute an inline JSON string
python -m je_load_density init PATH             # scaffold a project skeleton
python -m je_load_density serve [--host ...]    # start the control socket

Legacy single-flag form (-e/-d/-c/--execute_str) is still accepted for backwards compatibility with downstream tools.

Test Record

test_record_instance.test_record_list and error_record_list collect every request with Method, test_url, name, status_code, response_time_ms, response_length, and (for failures) error. Reports and the SQLite sink read directly from these lists.

Exception Handling

LoadDensityTestException
├── LoadDensityTestJsonException
├── LoadDensityGenerateJsonReportException
├── LoadDensityTestExecuteException
├── LoadDensityAssertException
├── LoadDensityHTMLException
├── LoadDensityAddCommandException
├── XMLException → XMLTypeException
└── CallbackExecutorException

All custom exceptions inherit from LoadDensityTestException; catching that one class covers the public surface.

Logging

LoadDensity exposes a single configured logger (load_density_logger) under je_load_density.utils.logging.loggin_instance. Hook it into your existing log infrastructure with the standard logging module APIs.

Supported Platforms

Platform Status
Windows 10 / 11 Fully supported
macOS Fully supported
Ubuntu / Linux Fully supported
Raspberry Pi Tested on 3B+ and later

Python 3.10+ required.

License

MIT — see LICENSE.