# LoadDensity

Multi-protocol load and stress automation: Locust + WebSocket + gRPC + MQTT + raw sockets, plus a JSON-driven action executor with batteries included.

LoadDensity (`je_load_density`) started as a Locust wrapper and grew into a full multi-protocol load framework: HTTP, FastHttp, WebSocket, gRPC, MQTT, and raw TCP/UDP user templates behind one JSON-driven action executor, plus modules for parameterised data, scenario flow, reports, observability, distributed runners, recording, persistent storage, and an MCP control surface so Claude can drive load tests end-to-end. Every executor command has a deterministic name (`LD_*`) and a single dispatch point, so an action JSON can mix protocols, exporters, and reports in the same script.
Optional dependencies, opt-in install — every protocol driver and exporter ships behind a `pip install je_load_density[<extra>]` extra. The base install footprint is unchanged for users who only need HTTP load testing.
- Highlights
- Installation
- Architecture
- Quick Start
- Core API
- Action Executor
- User Templates
- Parameter Resolver
- Scenario Modes
- Assertions & Extractors
- Reports
- Observability
- Distributed Master / Worker
- HAR Record / Replay
- Persistent Records (SQLite)
- MCP Server (for Claude)
- Hardened Control Socket
- GUI
- CLI Usage
- Test Record
- Exception Handling
- Logging
- Supported Platforms
- License
## Highlights

- One executor, six protocols — HTTP, FastHttp, WebSocket, gRPC, MQTT, raw TCP/UDP — all dispatched from the same `LD_start_test` command via a `user` key.
- JSON-driven — every test is an action JSON list; the same script can be hand-authored, generated by HAR import, scheduled by an MCP tool, or sent over the control socket.
- Parameter resolver — `${var.x}`, `${env.X}`, `${csv.source.col}`, `${faker.method}`, plus built-in `${uuid()}`, `${now()}`, `${randint(min,max)}` helpers; values can also be extracted from responses and reused downstream.
- Scenario flow — declare tasks as `sequence` (default), `weighted`, or `conditional` (`run_if`/`skip_if` predicates) without touching Python.
- Six report formats — HTML, JSON, XML, CSV, JUnit XML, and a percentile-summary JSON. The summary covers totals, failure rate, and per-name p50 / p90 / p95 / p99 latencies for trend tracking.
- Three exporters — Prometheus HTTP endpoint, InfluxDB line-protocol UDP/HTTP sink, and OpenTelemetry OTLP gRPC exporter.
- Distributed runners — `runner_mode="master"` / `"worker"` for cross-machine load with the same `start_test` API.
- HAR record / replay — convert real browser traffic into a runnable action JSON with regex include/exclude filters.
- Persistent records — optional SQLite sink with run / record / metadata schema for cross-run regression checks.
- MCP server — `python -m je_load_density.mcp_server` exposes 11 tools so Claude (Desktop, Code, any MCP client) can drive LoadDensity end-to-end.
- Hardened control socket — length-prefixed framing, optional TLS, shared-secret token (env or arg), with a backwards-compatible legacy mode for existing IDE integrations such as PyBreeze.
- Live GUI — optional PySide6 GUI with a live stats panel (RPS / avg / p95 / failures), translated to English, Traditional Chinese, Japanese, and Korean.
- CLI subcommands — `run` / `run-dir` / `run-str` / `init` / `serve`. Legacy `-e` / `-d` / `-c` / `--execute_str` flags remain for downstream tools.
## Installation

```shell
pip install je_load_density
```

Pulls in Locust and defusedxml — nothing else.
| Extra | Adds |
|---|---|
| `gui` | PySide6 + qt-material (graphical front-end) |
| `websocket` | websocket-client (WebSocket user template) |
| `grpc` | grpcio + protobuf (gRPC user template) |
| `mqtt` | paho-mqtt (MQTT user template) |
| `prometheus` | prometheus-client (Prometheus exporter) |
| `opentelemetry` | OpenTelemetry SDK + OTLP gRPC exporter |
| `metrics` | `prometheus` + `opentelemetry` bundle |
| `faker` | Faker (powers `${faker.method}` placeholders) |
| `mcp` | mcp SDK (drives the MCP server) |
| `all` | Everything above |
```shell
pip install "je_load_density[gui]"
pip install "je_load_density[mqtt,grpc,websocket]"
pip install "je_load_density[metrics]"
pip install "je_load_density[mcp]"
pip install "je_load_density[all]"
```

Or install from source:

```shell
git clone https://github.com/Integration-Automation/LoadDensity.git
cd LoadDensity
pip install -e ".[all]"
pip install -r requirements.txt
```

## Architecture

```mermaid
flowchart TD
    subgraph Entry["Entry Surfaces"]
        CLI[CLI]
        MCP[MCP Server]
        GUI[GUI]
        SOCK[Control Socket]
    end
    Entry -- "action JSON" --> EXEC["Action Executor<br/>(LD_* dispatch + safe builtins)"]
    EXEC -- "start_test" --> WRAPPER["locust_wrapper_proxy<br/>(per-protocol task store)"]
    WRAPPER --> HTTP["HTTP / FastHttp"]
    WRAPPER --> WS["WebSocket"]
    WRAPPER --> GRPC["gRPC"]
    WRAPPER --> MQTT["MQTT"]
    WRAPPER --> RAW["Raw TCP / UDP"]
    HTTP -- "Locust events" --> BUS([Locust Event Bus])
    WS --> BUS
    GRPC --> BUS
    MQTT --> BUS
    RAW --> BUS
    BUS --> REC["test_record_instance"]
    BUS --> METRICS["Prometheus / InfluxDB / OTel"]
    REC --> REPORTS["HTML / JSON / XML / CSV / JUnit / Summary reports"]
    REC --> SQLITE[("SQLite persistence<br/>(cross-run comparison)")]
```

The dependency direction always points from the action layer down to Locust, never the other way around.
## Quick Start

```python
from je_load_density import start_test

start_test(
    user_detail_dict={"user": "fast_http_user"},
    user_count=50,
    spawn_rate=10,
    test_time=30,
    variables={"base": "https://httpbin.org"},
    tasks=[
        {"method": "get", "request_url": "${var.base}/get"},
        {"method": "post", "request_url": "${var.base}/post",
         "json": {"hello": "world"},
         "assertions": [{"type": "status_code", "value": 200}]},
    ],
)
```

The same test as an action JSON:

```json
{"load_density": [
    ["LD_register_variables", {"variables": {"base": "https://httpbin.org"}}],
    ["LD_start_test", {
        "user_detail_dict": {"user": "fast_http_user"},
        "user_count": 20, "spawn_rate": 10, "test_time": 30,
        "tasks": [
            {"method": "get", "request_url": "${var.base}/get"},
            {"method": "post", "request_url": "${var.base}/post",
             "json": {"hello": "world"}}
        ]
    }],
    ["LD_generate_summary_report", {"report_name": "smoke"}]
]}
```

Run via the CLI:
```shell
python -m je_load_density run smoke.json
```

## Core API

```python
from je_load_density import (
    start_test, prepare_env, create_env,
    execute_action, execute_files, executor, add_command_to_executor,
    test_record_instance, locust_wrapper_proxy,
    register_variable, register_variables,
    register_csv_source, register_csv_sources,
    parameter_resolver, resolve,
    har_to_action_json, har_to_tasks, load_har,
    persist_records, list_runs, fetch_run_records,
    start_prometheus_exporter, stop_prometheus_exporter,
    start_influxdb_sink, stop_influxdb_sink,
    start_opentelemetry_exporter, stop_opentelemetry_exporter,
    start_load_density_socket_server,
    generate_html_report, generate_json_report, generate_xml_report,
    generate_csv_report, generate_junit_report, generate_summary_report,
    build_summary,
    create_project_dir, callback_executor, read_action_json,
)
```

`__all__` documents the full public surface in `je_load_density/__init__.py`.
## Action Executor

The action executor maps command strings to callable functions. Every action is a list:

```python
["command_name"]                    # No parameters
["command_name", {"key": "value"}]  # Keyword arguments
["command_name", [arg1, arg2]]      # Positional arguments
```

The top-level document is either a bare list or `{"load_density": [...]}`.
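The dispatch pattern above can be sketched in a few lines. This is an illustrative reimplementation, not the executor's real code (which also enforces the blocked-builtins list):

```python
# Minimal sketch of command-string dispatch over the three action shapes.
def run_action(action, commands: dict):
    name, *rest = action
    func = commands[name]          # KeyError surfaces unknown command names
    if not rest:
        return func()              # ["command_name"]
    args = rest[0]
    if isinstance(args, dict):
        return func(**args)        # ["command_name", {"key": "value"}]
    return func(*args)             # ["command_name", [arg1, arg2]]

commands = {"echo": lambda *a, **kw: (a, kw)}
print(run_action(["echo", {"key": "value"}], commands))  # ((), {'key': 'value'})
print(run_action(["echo", [1, 2]], commands))            # ((1, 2), {})
```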
| Group | Commands |
|---|---|
| Core | `LD_start_test`, `LD_execute_action`, `LD_execute_files`, `LD_add_package_to_executor`, `LD_start_socket_server` |
| Reports | `LD_generate_html(_report)`, `LD_generate_json(_report)`, `LD_generate_xml(_report)`, `LD_generate_csv_report`, `LD_generate_junit_report`, `LD_generate_summary_report`, `LD_summary` |
| Persistence | `LD_persist_records`, `LD_list_runs`, `LD_fetch_run_records`, `LD_clear_records` |
| Parameters | `LD_register_variable(s)`, `LD_register_csv_source(s)`, `LD_clear_resolver` |
| Recording | `LD_load_har`, `LD_har_to_tasks`, `LD_har_to_action_json` |
| Metrics | `LD_start/stop_prometheus_exporter`, `LD_start/stop_influxdb_sink`, `LD_start/stop_opentelemetry_exporter` |
Safe Python built-ins (`print`, `len`, `range`, …) are also accepted; `eval`, `exec`, `compile`, `__import__`, `breakpoint`, `open`, and `input` are explicitly blocked.
Register custom commands:

```python
from je_load_density import add_command_to_executor

def slack_notify(message: str) -> None:
    ...

add_command_to_executor({"LD_slack_notify": slack_notify})
```

## User Templates

Every template registers under `start_test` via `user_detail_dict={"user": "<key>"}`. Tasks share the same shape across HTTP, WebSocket, gRPC, MQTT, and raw socket users; only the protocol-specific fields differ.
### HTTP / FastHttp

```python
start_test(
    user_detail_dict={"user": "fast_http_user"},
    user_count=50, spawn_rate=10, test_time=60,
    variables={"base": "https://api.example.com"},
    tasks=[
        {"method": "post", "request_url": "${var.base}/login",
         "json": {"email": "u@example.com", "password": "secret"},
         "extract": [{"var": "auth", "from": "json_path", "path": "data.token"}]},
        {"method": "get", "request_url": "${var.base}/profile",
         "headers": {"Authorization": "Bearer ${var.auth}"},
         "assertions": [{"type": "status_code", "value": 200}]},
    ],
)
```

### WebSocket

```shell
pip install "je_load_density[websocket]"
```
```python
start_test(
    user_detail_dict={"user": "websocket_user"},
    user_count=10, spawn_rate=5, test_time=60,
    tasks=[
        {"method": "connect", "request_url": "wss://echo.example.com/socket"},
        {"method": "sendrecv", "payload": '{"ping": 1}', "expect": "pong"},
        {"method": "close"},
    ],
)
```

### gRPC

```shell
pip install "je_load_density[grpc]"
```
```python
start_test(
    user_detail_dict={"user": "grpc_user"},
    user_count=20, spawn_rate=5, test_time=60,
    tasks=[{
        "name": "say_hello",
        "target": "localhost:50051",
        "stub_path": "pkg.greeter_pb2_grpc.GreeterStub",
        "request_path": "pkg.greeter_pb2.HelloRequest",
        "method": "SayHello",
        "payload": {"name": "world"},
        "metadata": [["x-token", "abc"]],
        "timeout": 5,
    }],
)
```

`stub_path` and `request_path` are validated against a strict identifier regex before `importlib.import_module`, so traversal-style attacks are rejected.

### MQTT

```shell
pip install "je_load_density[mqtt]"
```
```python
start_test(
    user_detail_dict={"user": "mqtt_user"},
    user_count=10, spawn_rate=5, test_time=60,
    tasks=[
        {"method": "connect", "broker": "127.0.0.1:1883"},
        {"method": "subscribe", "topic": "telemetry/in", "qos": 1},
        {"method": "publish", "topic": "telemetry/out", "payload": "ping", "qos": 1},
        {"method": "disconnect"},
    ],
)
```

### Raw TCP / UDP

Stdlib only; nothing to install.
```python
start_test(
    user_detail_dict={"user": "socket_user"},
    user_count=20, spawn_rate=5, test_time=60,
    tasks=[
        {"protocol": "tcp", "target": "127.0.0.1:9000",
         "payload": "PING\n", "expect_bytes": 64,
         "expect_substring": "PONG"},
        {"protocol": "udp", "target": "127.0.0.1:9000",
         "payload": "hex:DEADBEEF", "expect_bytes": 4},
    ],
)
```

## Parameter Resolver

Placeholders are expanded automatically on every task:
| Placeholder | Resolves to |
|---|---|
| `${var.NAME}` | Value passed to `register_variable(s)` |
| `${env.NAME}` | Environment variable `NAME` |
| `${csv.SOURCE.COL}` | Next row from CSV source `SOURCE` (cycles by default) |
| `${faker.METHOD}` | `Faker().METHOD()` (lazy import) |
| `${uuid()}` | New UUID4 string |
| `${now()}` | Local ISO-8601 timestamp (seconds) |
| `${randint(min, max)}` | Cryptographically strong random int |
```python
from je_load_density import register_variable, register_csv_source

register_variable("base", "https://api.example.com")
register_csv_source("users", "users.csv")
```

Or from action JSON:

```json
["LD_register_variables", {"variables": {"base": "https://api.example.com"}}]
["LD_register_csv_sources", {"sources": [{"name": "users", "file_path": "users.csv"}]}]
```

Unknown placeholders are left in place so missing data is visible during a dry run.
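To make the substitution behaviour concrete, here is a minimal sketch handling only `${var.*}` and `${uuid()}` — illustrative only, not the library's actual resolver:

```python
import re
import uuid

def expand(text: str, variables: dict) -> str:
    """Expand ${...} placeholders; unknown ones are left in place."""
    def substitute(match: re.Match) -> str:
        expr = match.group(1)
        if expr.startswith("var."):
            return str(variables.get(expr[4:], match.group(0)))
        if expr == "uuid()":
            return str(uuid.uuid4())
        return match.group(0)  # unknown placeholder stays visible (dry-run aid)
    return re.sub(r"\$\{([^}]+)\}", substitute, text)

print(expand("${var.base}/get?tag=${missing}", {"base": "https://httpbin.org"}))
# https://httpbin.org/get?tag=${missing}
```

Leaving unknown placeholders untouched, as the real resolver does, makes missing CSV columns or unregistered variables jump out in request logs.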
## Scenario Modes

```json
{
    "mode": "weighted",
    "tasks": [
        {"method": "get", "request_url": "/products", "weight": 3},
        {"method": "get", "request_url": "/expensive", "weight": 1}
    ]
}
```

| Mode | Behaviour |
|---|---|
| `sequence` | Run every task in order each tick (default) |
| `weighted` | Pick one task per tick by weight |
| `conditional` | Use `run_if` / `skip_if` predicates evaluated against the parameter resolver |
Predicates: `bool`, `"${var.x}"`, `{"equals": [a,b]}`, `{"not_equals": [a,b]}`, `{"in": [needle, haystack]}`, `{"truthy": value}`.
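A hypothetical evaluator for these predicate shapes might look like this — the forms mirror the list above, but the function itself is a sketch, not LoadDensity's implementation:

```python
def evaluate(predicate, resolve=lambda value: value):
    """Evaluate a run_if / skip_if predicate; resolve stands in for the
    parameter resolver (identity here for illustration)."""
    if isinstance(predicate, bool):
        return predicate
    if isinstance(predicate, str):              # e.g. "${var.x}" post-resolution
        return bool(resolve(predicate))
    if isinstance(predicate, dict):
        op, args = next(iter(predicate.items()))
        if op == "equals":
            return args[0] == args[1]
        if op == "not_equals":
            return args[0] != args[1]
        if op == "in":
            return args[0] in args[1]
        if op == "truthy":
            return bool(args)
    raise ValueError(f"unsupported predicate: {predicate!r}")

print(evaluate({"equals": ["admin", "admin"]}))  # True
print(evaluate({"in": ["x", ["x", "y"]]}))       # True
```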
## Assertions & Extractors

Both run under Locust's `catch_response`; failed assertions surface in every report.
```json
{
    "method": "post",
    "request_url": "${var.base}/login",
    "json": {"email": "u@example.com", "password": "secret"},
    "assertions": [
        {"type": "status_code", "value": 200},
        {"type": "json_path", "path": "data.role", "value": "admin"}
    ],
    "extract": [
        {"var": "auth_token", "from": "json_path", "path": "data.token"},
        {"var": "request_id", "from": "header", "name": "X-Request-Id"}
    ]
}
```

Assertion types: `status_code`, `contains`, `not_contains`, `json_path`, `header`. Extractor sources: `json_path`, `header`, `status_code`.
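The `json_path` paths above use dot-separated keys into the parsed JSON body (an assumed reading, consistent with paths like `data.token`). A minimal lookup sketch:

```python
def json_path(document, path: str):
    """Walk a dot-separated key path through a parsed JSON body."""
    node = document
    for key in path.split("."):
        node = node[key]  # KeyError marks the assertion/extraction as failed
    return node

body = {"data": {"token": "abc123", "role": "admin"}}
print(json_path(body, "data.token"))  # abc123
print(json_path(body, "data.role"))   # admin
```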
## Reports

Six formats, all consumed from `test_record_instance`:

```python
from je_load_density import (
    generate_html_report, generate_json_report, generate_xml_report,
    generate_csv_report, generate_junit_report, generate_summary_report,
)

generate_html_report("report")          # report.html
generate_json_report("report")          # report_success.json + report_failure.json
generate_xml_report("report")           # report_success.xml + report_failure.xml
generate_csv_report("report")           # report.csv
generate_junit_report("report-junit")   # report-junit.xml (CI)
generate_summary_report("report-sum")   # totals + per-name p50/p90/p95/p99
```

## Observability

```python
from je_load_density import (
    start_prometheus_exporter, start_influxdb_sink, start_opentelemetry_exporter,
)

start_prometheus_exporter(port=9646, addr="127.0.0.1")
start_influxdb_sink(transport="udp", host="influxdb", port=8089)
start_opentelemetry_exporter(endpoint="http://otel-collector:4317",
                             service_name="loaddensity")
```

| Sink | Metrics |
|---|---|
| Prometheus | `loaddensity_requests_total`, `loaddensity_request_latency_ms`, `loaddensity_response_bytes` |
| InfluxDB | `loaddensity_request` line-protocol points (UDP or HTTP) |
| OTel | `loaddensity.requests`, `loaddensity.request.latency`, `loaddensity.response.size` |
All three are loaded lazily and gated by the matching install extra.
## Distributed Master / Worker

```python
# master
start_test(
    user_detail_dict={"user": "fast_http_user"},
    runner_mode="master",
    master_bind_host="0.0.0.0", master_bind_port=5557,
    expected_workers=4,
    web_ui_dict={"host": "0.0.0.0", "port": 8089},
    user_count=400, spawn_rate=40, test_time=600,
    tasks=[...],
)

# worker
start_test(
    user_detail_dict={"user": "fast_http_user"},
    runner_mode="worker",
    master_host="10.0.0.10", master_port=5557,
    tasks=[...],
)
```

The master waits up to 60 s for `expected_workers` workers to register before starting the load ramp.
## HAR Record / Replay

```python
from je_load_density import load_har, har_to_action_json

har = load_har("recording.har")
action_json = har_to_action_json(
    har,
    user="fast_http_user",
    user_count=20, spawn_rate=10, test_time=120,
    include=[r"api\.example\.com"],
    exclude=[r"\.svg$"],
)
```

Captures from Chrome / Firefox DevTools, mitmproxy, Charles, etc. all work. Status codes flow through as `status_code` assertions on every generated task.
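Conceptually, each HAR entry maps to one task dict. The sketch below shows that per-entry mapping under the documented status-code-assertion behaviour; `har_to_tasks` in the library handles the full capture plus include/exclude filtering:

```python
def entry_to_task(entry: dict) -> dict:
    """Map one HAR entry (request + response) to a task dict - illustrative."""
    request = entry["request"]
    return {
        "method": request["method"].lower(),
        "request_url": request["url"],
        # Replay asserts the recorded status code, as described above.
        "assertions": [{"type": "status_code",
                        "value": entry["response"]["status"]}],
    }

entry = {"request": {"method": "GET", "url": "https://api.example.com/get"},
         "response": {"status": 200}}
print(entry_to_task(entry))
```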
## Persistent Records (SQLite)

```python
from je_load_density import persist_records, list_runs, fetch_run_records

run_id = persist_records(
    "loadtests.db",
    label="checkout-2026-04-28",
    metadata={"branch": "dev", "commit": "abc1234"},
)

for row in list_runs("loadtests.db", limit=10):
    print(row)
```

Schema is created lazily; an empty file is fine. Indexes on `run_id` and `name` keep cross-run queries fast.
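For readers curious what a run / record layout with those indexes could look like, here is an illustrative schema in the same spirit; the actual table and column names inside LoadDensity may differ:

```python
import sqlite3

# Hypothetical run / record schema with the documented run_id and name indexes.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE run (run_id INTEGER PRIMARY KEY, label TEXT, metadata TEXT);
CREATE TABLE record (
    id INTEGER PRIMARY KEY,
    run_id INTEGER REFERENCES run(run_id),
    name TEXT, status_code INTEGER, response_time_ms REAL
);
CREATE INDEX idx_record_run_id ON record(run_id);
CREATE INDEX idx_record_name ON record(name);
""")
conn.execute("INSERT INTO run (label, metadata) VALUES (?, ?)",
             ("checkout-2026-04-28", '{"branch": "dev"}'))
rows = conn.execute("SELECT label FROM run").fetchall()
print(rows)  # [('checkout-2026-04-28',)]
```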
## MCP Server (for Claude)

```shell
pip install "je_load_density[mcp]"
python -m je_load_density.mcp_server
```

Wire it into Claude Desktop / Code:

```json
{
    "mcpServers": {
        "loaddensity": {
            "command": "python",
            "args": ["-m", "je_load_density.mcp_server"]
        }
    }
}
```

Eleven tools are exposed: `run_test`, `run_action_json`, `create_project`, `list_executor_commands`, `import_har`, `generate_reports`, `summary`, `persist_records`, `list_runs`, `fetch_run`, `clear_records`.
## Hardened Control Socket

```shell
python -m je_load_density serve \
    --host 0.0.0.0 --port 9940 --framed \
    --token "$LOAD_DENSITY_SOCKET_TOKEN" \
    --tls-cert /etc/loaddensity/server.crt \
    --tls-key /etc/loaddensity/server.key
```

- 4-byte big-endian length-prefixed frames (1 MiB cap)
- Optional TLS (cert/key on disk; `ssl.create_default_context`, TLS 1.2+ minimum)
- Shared-secret token compared with `hmac.compare_digest`; once configured, all payloads must use `{"token": "...", "command": [...]}` and may set `"op": "quit"` to stop the server
- Token is also read from the `LOAD_DENSITY_SOCKET_TOKEN` env var
- Legacy unauthenticated mode preserved for backwards compatibility
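A client-side sketch of the framing and token check described above — the frame layout, 1 MiB cap, payload shape, and `hmac.compare_digest` comparison follow the bullets, but the helper functions themselves are illustrative:

```python
import hmac
import json
import struct

MAX_FRAME = 1 << 20  # 1 MiB cap, as documented

def pack_frame(token: str, command: list) -> bytes:
    """Build a 4-byte big-endian length-prefixed frame with a token payload."""
    body = json.dumps({"token": token, "command": command}).encode("utf-8")
    if len(body) > MAX_FRAME:
        raise ValueError("frame exceeds 1 MiB cap")
    return struct.pack(">I", len(body)) + body

def unpack_frame(frame: bytes, expected_token: str) -> list:
    """Decode a frame, verifying the shared secret in constant time."""
    (length,) = struct.unpack(">I", frame[:4])
    payload = json.loads(frame[4:4 + length])
    if not hmac.compare_digest(payload.get("token", ""), expected_token):
        raise PermissionError("bad token")
    return payload["command"]

frame = pack_frame("s3cret", ["LD_generate_summary_report", {"report_name": "x"}])
print(unpack_frame(frame, "s3cret"))
```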
## GUI

```shell
pip install "je_load_density[gui]"
```

```python
import sys

from PySide6.QtWidgets import QApplication

from je_load_density.gui.main_window import LoadDensityUI

app = QApplication(sys.argv)
window = LoadDensityUI()
window.show()
sys.exit(app.exec())
```

The GUI ships English, Traditional Chinese, Japanese, and Korean translations and a live stats panel that polls `test_record_instance` once a second (RPS, average / p95 latency, failure count).
## CLI Usage

```shell
python -m je_load_density run FILE            # execute one action JSON file
python -m je_load_density run-dir DIR         # execute every .json in DIR
python -m je_load_density run-str JSON        # execute an inline JSON string
python -m je_load_density init PATH           # scaffold a project skeleton
python -m je_load_density serve [--host ...]  # start the control socket
```

The legacy single-flag form (`-e` / `-d` / `-c` / `--execute_str`) is still accepted for backwards compatibility with downstream tools.
## Test Record

`test_record_instance.test_record_list` and `error_record_list` collect every request with `Method`, `test_url`, `name`, `status_code`, `response_time_ms`, `response_length`, and (for failures) `error`. Reports and the SQLite sink read directly from these lists.
## Exception Handling

```text
LoadDensityTestException
├── LoadDensityTestJsonException
├── LoadDensityGenerateJsonReportException
├── LoadDensityTestExecuteException
├── LoadDensityAssertException
├── LoadDensityHTMLException
├── LoadDensityAddCommandException
├── XMLException → XMLTypeException
└── CallbackExecutorException
```

All custom exceptions inherit from `LoadDensityTestException`; catching that one class covers the public surface.
## Logging

LoadDensity exposes a single configured logger (`load_density_logger`) under `je_load_density.utils.logging.loggin_instance`. Hook it into your existing log infrastructure with the standard `logging` module APIs.
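For example, a handler can be attached with stock `logging` calls. The logger name `"load_density_logger"` here is an assumption drawn from the identifier above; check your installed version if the handler does not fire:

```python
import logging

# Attach a console handler to the (assumed) LoadDensity logger name.
logger = logging.getLogger("load_density_logger")
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("wired into existing log infrastructure")
```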
## Supported Platforms

| Platform | Status |
|---|---|
| Windows 10 / 11 | Fully supported |
| macOS | Fully supported |
| Ubuntu / Linux | Fully supported |
| Raspberry Pi | Tested on 3B+ and later |
Python 3.10+ required.
## License

MIT — see LICENSE.