mokksy

Mokksy - Mock HTTP Server, built with Kotlin and Ktor.
Check out the AI-Mocks project for advanced LLM and A2A protocol mocking capabilities.
Full documentation: https://mokksy.dev/docs/mokksy/
Note: The Mokksy server was part of the AI-Mocks project and has since moved to a separate repository. No artefact relocation is required.
Additional resources:
- API reference: https://mokksy.github.io/mokksy/
- AI-Mocks project: https://github.com/mokksy/ai-mocks/
Table of Contents
- Why Mokksy?
- Key Features
- Quick start
- Responding with predefined responses
- Server-Sent Events (SSE) response
- Request Specification Matchers
- Verifying Requests
- Request Journal
- Embedding in an existing Ktor application
- Java API
Why Mokksy?
WireMock does not support true SSE and streaming responses.
Mokksy addresses those limitations. It is particularly useful for integration testing of LLM clients.
Key Features
- Streaming Support: True support for streaming responses and Server-Sent Events (SSE)
- Response Control: Flexibility to control server responses directly via the ApplicationCall object
- Delay Simulation: Support for simulating response delays and delays between chunks
- Modern API: Fluent Kotlin DSL API with Kotest Assertions
- Error Simulation: Ability to mock negative scenarios and error responses
- Specificity-Based Matching: When multiple stubs match a request, Mokksy automatically selects the most specific one — no explicit priority configuration required for common cases
- Ktor Integration: Embed Mokksy into any existing Ktor application via the Application.mokksy() and Route.mokksy() extension functions, including behind authentication middleware
Quick start
Add dependencies:
Gradle build.gradle.kts:
dependencies {
    // for multiplatform projects
    implementation("dev.mokksy:mokksy:$latestVersion")
    // for JVM projects
    implementation("dev.mokksy:mokksy-jvm:$latestVersion")
}
Maven pom.xml:
<dependency>
    <groupId>dev.mokksy</groupId>
    <artifactId>mokksy-jvm</artifactId>
    <version>[LATEST_VERSION]</version>
    <scope>test</scope>
</dependency>
Create and start Mokksy server:
Kotlin, all platforms (coroutine-based):
import dev.mokksy.mokksy.Mokksy
val mokksy = Mokksy()
mokksy.startSuspend()
mokksy.awaitStarted() // port() and baseUrl() are safe after this point
Kotlin, JVM blocking:
import dev.mokksy.mokksy.Mokksy
val mokksy = Mokksy().start()
Java: see Java API below.
Configure an HTTP client in your application to use the Mokksy server's baseUrl:
val client = HttpClient {
install(DefaultRequest) {
url(mokksy.baseUrl())
}
install(ContentNegotiation) {
json()
}
}
Responding with predefined responses
Mokksy supports all HTTP verbs. Here are some examples.
GET request
GET request example:
// given
val expectedResponse =
// language=json
"""
{
"response": "Pong"
}
""".trimIndent()
mokksy.get {
path = beEqual("/ping")
containsHeader("Foo", "bar")
} respondsWith {
body = expectedResponse
}
// when
val result = client.get("/ping") {
headers.append("Foo", "bar")
}
// then
result.status shouldBe HttpStatusCode.OK
result.bodyAsText() shouldBe expectedResponse
When the request does not match any stub, the Mokksy server returns 404 (Not Found):
val notFoundResult = client.get("/ping") {
headers.append("Foo", "baz")
}
notFoundResult.status shouldBe HttpStatusCode.NotFound
POST request
POST request example:
// given
val id = Random.nextInt()
val expectedResponse =
// language=json
"""
{
"id": "$id",
"name": "thing-$id"
}
""".trimIndent()
mokksy.post {
path = beEqual("/things")
bodyContains("\"$id\"")
} respondsWith {
body = expectedResponse
httpStatus = HttpStatusCode.Created
headers {
// type-safe builder style
append(HttpHeaders.Location, "/things/$id")
}
headers += "Foo" to "bar" // list style
}
// when
val result =
client.post("/things") {
headers.append("Content-Type", "application/json")
setBody(
// language=json
"""
{
"id": "$id"
}
""".trimIndent(),
)
}
// then
result shouldNotBeNull {
status shouldBe HttpStatusCode.Created
bodyAsText() shouldBe expectedResponse
headers["Location"] shouldBe "/things/$id"
headers["Foo"] shouldBe "bar"
}
Typed request body
When the request body type is known at compile time, use the reified overloads to let the compiler infer the type —
no explicit ::class argument required:
@Serializable
@JvmRecord
data class CreateItemRequest(val name: String)
@Serializable
@JvmRecord
data class CreateItemResponse(val message: String)
val itemName = "Widget"
mokksy.post<CreateItemRequest>(name = "create-item") {
path("/items")
bodyMatchesPredicate("name should match") { it?.name == itemName }
} respondsWith {
body = CreateItemResponse("Hello, $itemName!")
httpStatus = HttpStatusCode.Created
headers += "Foo" to "bar"
}
val result =
client.post("/items") {
contentType(ContentType.Application.Json)
setBody(CreateItemRequest(itemName))
}
result shouldNotBeNull {
status shouldBe HttpStatusCode.Created
headers["Foo"] shouldBe "bar"
body<CreateItemResponse>().message shouldBe "Hello, $itemName!"
}
Reified overloads are provided for all HTTP verbs (get, post, put, delete, patch, head,
options) and the generic method function. Two overloads exist per verb: one taking an optional
stub name (name: String? = null) and one taking a StubConfiguration.
The deserialized request body is accessible inside the response lambda as request.body().
When the type is determined at runtime or when you want an explicit name on the stub,
pass a KClass token using the named requestType parameter:
mokksy.post(requestType = CreateItemRequest::class) {
path("/items")
bodyMatchesPredicate { it?.name == "widget" }
} respondsWith {
body = CreateItemResponse("Hello, widget!")
httpStatus = HttpStatusCode.Created
}
val result =
client.post("/items") {
contentType(ContentType.Application.Json)
setBody(CreateItemRequest("widget"))
}
result.status shouldBe HttpStatusCode.Created
result.body<CreateItemResponse>().message shouldBe "Hello, widget!"
Java callers use CreateItemRequest.class via the Java API:
mokksy.post(CreateItemRequest.class, spec -> spec.path("/items").bodyMatchesPredicate(req -> "widget".equals(req.getName()))).
Deserialization uses Ktor's ContentNegotiation plugin. For projects that use Jackson instead of
kotlinx.serialization, create the server with MokksyJackson.create() (Java API) —
see Jackson support.
When no stub matches and verbose mode is on (Mokksy(verbose = true)), Mokksy logs the closest
partial match and its failed conditions to help diagnose the mismatch.
Status-only responses
Use respondsWithStatus when the test only needs to verify a status code — no body needed.
It's an infix function, so it reads naturally next to the stub definition:
mokksy.get { path("/ping") } respondsWithStatus HttpStatusCode.NoContent.
Java callers use the int overload on JavaBuildingStep:
mokksy.get(spec -> spec.path("/ping"))
    .respondsWithStatus(204);
mokksy.delete(spec -> spec.path("/item"))
    .respondsWithStatus(410);
Server-Sent Events (SSE) response
Server-Sent Events (SSE) is a technology that allows a server to push updates to the client over a single, long-lived HTTP connection. This enables real-time updates without requiring the client to continuously poll the server for new data.
SSE streams events in a standardized format, making it easy for clients to consume the data and handle events as they arrive. It's lightweight and efficient, particularly well-suited for applications requiring real-time updates like live notifications or feed updates.
Server-Sent Events (SSE) example:
mokksy.post {
path = beEqual("/sse")
} respondsWithSseStream {
flow =
flow {
delay(200.milliseconds)
emit(
ServerSentEvent(
data = "One",
),
)
delay(50.milliseconds)
emit(
ServerSentEvent(
data = "Two",
),
)
}
}
// when
val result = client.post("/sse")
// then
result shouldNotBeNull {
status shouldBe HttpStatusCode.OK
contentType() shouldBe ContentType.Text.EventStream.withCharsetIfNeeded(Charsets.UTF_8)
bodyAsText() shouldBe "data: One\r\n\r\ndata: Two\r\n\r\n"
}
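When asserting on individual events rather than on the whole body string, it can help to split the raw body back into its data payloads. The following is a minimal sketch in plain Java that uses only the standard library. SseBodyParser and parseData are hypothetical names, not part of Mokksy or Ktor, and the sketch handles only data fields: event, id, retry, and comment lines are ignored.

```java
import java.util.ArrayList;
import java.util.List;

final class SseBodyParser {

    /** Returns the data payload of every complete event in a raw SSE body. */
    static List<String> parseData(String body) {
        List<String> events = new ArrayList<>();
        List<String> dataLines = new ArrayList<>();
        // Limit -1 keeps trailing empty strings, so the final blank line is seen.
        for (String line : body.split("\r\n|\n|\r", -1)) {
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                if (!dataLines.isEmpty()) {
                    events.add(String.join("\n", dataLines));
                    dataLines.clear();
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single leading space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                dataLines.add(value);
            }
            // Other fields and comment lines are skipped in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        // The body asserted in the example above.
        System.out.println(parseData("data: One\r\n\r\ndata: Two\r\n\r\n")); // prints [One, Two]
    }
}
```

An event that is not followed by a blank line is dropped, which mirrors how SSE clients discard an incomplete trailing event.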
Long-lived SSE streams
By default, the SSE stream closes when the flow completes.
To keep it open (e.g. for clients that reconnect on close), end the flow with awaitCancellation():
flow = flow {
emit(ServerSentEvent(data = "hello"))
awaitCancellation() // stream stays open until client disconnects
}
Request Specification Matchers
Mokksy provides various matcher types to specify conditions for matching incoming HTTP requests:
- Path matchers: path("/things") or path = beEqual("/things")
- Header matchers: containsHeader("X-Request-ID", "abc") checks for a header with an exact value
- Content matchers: bodyContains("value") checks if the raw body string contains a substring; bodyString += contain("value") adds a Kotest matcher directly
- Predicate matchers: bodyMatchesPredicate { it?.name == "foo" } matches against the typed, deserialized request body (see Typed request body for the full API)
- Call matchers: successCallMatcher matches if a function called with the body does not throw
- Priority: priority = 10 on RequestSpecificationBuilder sets the RequestSpecification.priority of the stub; higher values indicate higher priority. Default is 0. Use negative values (e.g. priority = -1) for catch-all / fallback stubs. Priority is a tiebreaker: it applies only when two stubs match with an equal number of conditions satisfied. For most cases, specificity-based matching (see below) selects the right stub automatically.
Stub Specificity
When multiple stubs could match the same request, Mokksy scores each one by counting how many conditions it satisfies, then selects the highest-scoring stub. A stub with two matching conditions beats a stub with one, regardless of registration order.
// Generic: matches any POST to /users
mokksy.post {
path("/users")
} respondsWith {
body = "any user"
}
// Specific: matches only requests whose body contains "admin" — two conditions
mokksy.post {
path("/users")
bodyContains("admin")
} respondsWith {
body = "admin user"
}
// Admin request → specific stub wins (score 2 beats score 1)
val adminResult = client.post("/users") { setBody("admin") }
adminResult.bodyAsText() shouldBe "admin user"
// Other request → only the generic stub matches
val genericResult = client.post("/users") { setBody("regular") }
genericResult.bodyAsText() shouldBe "any user"
mokksy.get { path("/ping") } respondsWithStatus HttpStatusCode.NoContent
val response = client.get("/ping")
response.status shouldBe HttpStatusCode.NoContent
Priority Example
If multiple stubs match with the same specificity score, the one with the higher priority value wins:
// Catch-all stub with low priority (negative value)
mokksy.get {
path = contain("/things")
priority = -1
} respondsWith {
body = "Generic Thing"
}
// Specific stub with high priority (positive value)
mokksy.get {
path = beEqual("/things/special")
priority = 1
} respondsWith {
body = "Special Thing"
}
// when
val generic = client.get("/things/123")
val special = client.get("/things/special")
// then
generic.bodyAsText() shouldBe "Generic Thing"
special.bodyAsText() shouldBe "Special Thing"
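The selection rule described in the two sections above can be modelled in a few lines of plain Java. This is a sketch of the documented behaviour (the matching stub with the most conditions wins, and priority breaks ties), not Mokksy's implementation. StubSelectionSketch, Request, Stub, and select are hypothetical names. What happens when both score and priority are equal is not stated above; keeping the earliest-registered stub is purely an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class StubSelectionSketch {

    static final class Request {
        final String path;
        final String body;

        Request(String path, String body) {
            this.path = path;
            this.body = body;
        }
    }

    static final class Stub {
        final int priority;
        final String response;
        final List<Predicate<Request>> conditions;

        @SafeVarargs
        Stub(int priority, String response, Predicate<Request>... conditions) {
            this.priority = priority;
            this.response = response;
            this.conditions = Arrays.asList(conditions);
        }

        // A stub matches only when every one of its conditions holds.
        boolean matches(Request request) {
            return conditions.stream().allMatch(c -> c.test(request));
        }

        // For a matching stub, the score is the number of satisfied conditions.
        int score() {
            return conditions.size();
        }
    }

    /** Picks the response of the best matching stub, or empty when nothing matches. */
    static Optional<String> select(List<Stub> stubs, Request request) {
        Stub best = null;
        for (Stub stub : stubs) {
            if (!stub.matches(request)) {
                continue;
            }
            boolean moreSpecific = best != null && stub.score() > best.score();
            boolean tieWonByPriority = best != null
                && stub.score() == best.score()
                && stub.priority > best.priority;
            if (best == null || moreSpecific || tieWonByPriority) {
                best = stub; // assumption: on a full tie the earlier stub is kept
            }
        }
        return Optional.ofNullable(best).map(s -> s.response);
    }

    public static void main(String[] args) {
        List<Stub> stubs = Arrays.asList(
            new Stub(0, "any user", r -> r.path.equals("/users")),
            new Stub(0, "admin user", r -> r.path.equals("/users"), r -> r.body.contains("admin")));
        System.out.println(select(stubs, new Request("/users", "admin")).orElse("404"));   // admin user
        System.out.println(select(stubs, new Request("/users", "regular")).orElse("404")); // any user
    }
}
```

In the priority example, both stubs carry a single path condition, so a request to /things/special ties on score and the stub with priority = 1 is chosen.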
Verifying Requests
Mokksy provides two complementary verification methods that check opposite sides of the stub/request contract.
Verify all stubs were triggered
verifyNoUnmatchedStubs() fails if any registered stub was never matched by an incoming request.
Use this to catch stubs you set up but that were never actually called — a sign the code under test took
a different path than expected.
// Fails if any stub has never been matched
mokksy.verifyNoUnmatchedStubs()
Note: Be careful when running tests in parallel against a single MokksyServer instance. Some stubs might be unmatched when one test completes. Avoid calling this in @AfterEach/@AfterTest unless each test owns its own server instance.
Verify no unexpected requests arrived
verifyNoUnexpectedRequests() fails if any HTTP request arrived at the server but no stub matched it.
These requests are recorded in the RequestJournal and reported together.
// Fails if any request arrived with no matching stub
mokksy.verifyNoUnexpectedRequests()
Recommended AfterEach setup
Always run verifyNoUnexpectedRequests() in @AfterEach to catch requests that arrived but
matched no stub. For verifyNoUnmatchedStubs(), the right placement depends on your fixture strategy:
- Per-test instance (@TestInstance(Lifecycle.PER_METHOD) or a fresh server per test): call both checks in @AfterEach, since every stub registered during that test should have been matched before the server is torn down.
- Shared instance (@TestInstance(Lifecycle.PER_CLASS) or a companion-object server): call verifyNoUnmatchedStubs() in @AfterAll, immediately before shutdown(). Calling it after each individual test would falsely report stubs registered for later tests as unmatched.
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyTest {
val mokksy = Mokksy.create()
lateinit var client: HttpClient
@BeforeAll
suspend fun setup() {
mokksy.startSuspend()
mokksy.awaitStarted() // port() and baseUrl() are safe after this point
client = HttpClient {
install(DefaultRequest) {
url(mokksy.baseUrl())
}
}
}
@Test
suspend fun testSomething() {
mokksy.get {
path("/hi")
} respondsWith {
delay = 100.milliseconds // wait 100ms, then reply
body = "Hello"
}
// when
val response = client.get("/hi")
// then
response.status shouldBe HttpStatusCode.OK
response.bodyAsText() shouldBe "Hello"
}
@AfterEach
fun afterEach() {
mokksy.verifyNoUnexpectedRequests()
}
@AfterAll
suspend fun afterAll() {
client.close()
mokksy.verifyNoUnmatchedStubs() // shared instance: check once, after all tests ran
mokksy.shutdownSuspend()
}
}
Inspecting unmatched items
Use the find* variants to retrieve the unmatched items directly for custom assertions:
// List<RecordedRequest> — HTTP requests with no matching stub
val unmatchedRequests: List<RecordedRequest> = mokksy.findAllUnexpectedRequests()
// List<RequestSpecification<*>> — stubs that were never triggered
val unmatchedStubs: List<RequestSpecification<*>> = mokksy.findAllUnmatchedStubs()
RecordedRequest is an immutable snapshot that captures method, uri, and headers of the incoming request.
Request Journal
Mokksy records incoming requests in a RequestJournal. The recording mode is controlled by JournalMode in
ServerConfiguration:
- JournalMode.NONE: Disables request recording entirely. findAllUnexpectedRequests(), findAllMatchedRequests(), and verifyNoUnexpectedRequests() throw IllegalStateException.
- JournalMode.LEAN (default): Records only requests with no matching stub. Lower overhead; sufficient for verifyNoUnexpectedRequests().
- JournalMode.FULL: Records all incoming requests, both matched and unmatched.
val mokksy = MokksyServer(
configuration = ServerConfiguration(
journalMode = JournalMode.FULL,
),
)
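To make the difference between the three modes concrete, here is a small stand-alone model in plain Java. It only mirrors the behaviour listed above and is not Mokksy's RequestJournal. JournalSketch, Mode, record, unexpectedRequests, and storedCount are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class JournalSketch {

    enum Mode { NONE, LEAN, FULL }

    private final Mode mode;
    private final List<String> matched = new ArrayList<>();
    private final List<String> unexpected = new ArrayList<>();

    JournalSketch(Mode mode) {
        this.mode = mode;
    }

    /** Called once per incoming request; wasMatched says whether any stub matched it. */
    void record(String request, boolean wasMatched) {
        if (mode == Mode.NONE) {
            return; // recording is disabled entirely
        }
        if (!wasMatched) {
            unexpected.add(request); // kept in both LEAN and FULL
        } else if (mode == Mode.FULL) {
            matched.add(request); // matched requests are kept only in FULL
        }
    }

    /** Requests that arrived without a matching stub. */
    List<String> unexpectedRequests() {
        if (mode == Mode.NONE) {
            throw new IllegalStateException("journal is disabled");
        }
        return new ArrayList<>(unexpected);
    }

    /** Total number of requests held in memory, useful to compare overhead. */
    int storedCount() {
        return matched.size() + unexpected.size();
    }

    public static void main(String[] args) {
        for (Mode mode : new Mode[] { Mode.LEAN, Mode.FULL }) {
            JournalSketch journal = new JournalSketch(mode);
            journal.record("GET /ping", true);
            journal.record("GET /missing", false);
            System.out.println(mode + " stores " + journal.storedCount() + " request(s)");
        }
    }
}
```

With one matched and one unmatched request, the LEAN model stores a single entry and the FULL model stores two, which is the trade-off between overhead and visibility described above.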
Call resetMatchState() between scenarios to clear stub match state and the journal:
@AfterTest
fun afterEach() {
mokksy.resetMatchState()
}
Note: Stubs configured with eventuallyRemove = true are permanently removed from the registry on first match and cannot be re-armed by resetMatchState(). Re-register them before the next scenario.
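The interaction between one-time stubs and resetMatchState() can be illustrated with a toy registry in plain Java. This is a sketch of the behaviour described in the note, not Mokksy's code. OneTimeStubSketch and its methods are hypothetical names, matching is reduced to exact path equality, and the journal is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class OneTimeStubSketch {

    private static final class Stub {
        final String path;
        final boolean removeAfterMatch;
        int matchCount;

        Stub(String path, boolean removeAfterMatch) {
            this.path = path;
            this.removeAfterMatch = removeAfterMatch;
        }
    }

    private final List<Stub> stubs = new ArrayList<>();

    void register(String path, boolean removeAfterMatch) {
        stubs.add(new Stub(path, removeAfterMatch));
    }

    /** Returns 200 when a stub matches the path, 404 otherwise. */
    int handle(String path) {
        Iterator<Stub> it = stubs.iterator();
        while (it.hasNext()) {
            Stub stub = it.next();
            if (stub.path.equals(path)) {
                stub.matchCount++;
                if (stub.removeAfterMatch) {
                    it.remove(); // a one-time stub leaves the registry on first match
                }
                return 200;
            }
        }
        return 404;
    }

    /** Clears match counters; stubs that were already removed are not restored. */
    void resetMatchState() {
        for (Stub stub : stubs) {
            stub.matchCount = 0;
        }
    }

    public static void main(String[] args) {
        OneTimeStubSketch registry = new OneTimeStubSketch();
        registry.register("/once", true);
        System.out.println(registry.handle("/once")); // 200
        registry.resetMatchState();
        System.out.println(registry.handle("/once")); // 404, the stub is gone
        registry.register("/once", true);             // re-register for the next scenario
        System.out.println(registry.handle("/once")); // 200
    }
}
```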
Embedding in an existing Ktor application
If you already own a Ktor Application — a test harness with authentication middleware, custom
plugins, or routes that must coexist with stubs — use the mokksy extension functions to mount
stub handling directly, without allocating a second embedded server.
Application-level installation
Application.mokksy(server) installs SSE, DoubleReceive, and ContentNegotiation
automatically, then mounts a catch-all route that dispatches every incoming request through the
stub registry:
import dev.mokksy.mokksy.MokksyServer
import dev.mokksy.mokksy.mokksy
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
val server = MokksyServer()
server.get { path("/ping") } respondsWith { body = "pong" }
embeddedServer(Netty, port = 8080) {
mokksy(server)
}.start(wait = true)
Use this overload when Mokksy owns the entire application and you want the simplest possible setup.
Route-level installation
Route.mokksy(server) mounts the stub handler inside an existing route scope. Unlike the
application-level overload, it does not install plugins — you are responsible for installing
SSE, DoubleReceive, and ContentNegotiation on the surrounding application. This makes it
suitable when Mokksy stubs coexist with real routes:
routing {
get("/health") { call.respondText("OK") }
mokksy(server)
}
To place stubs behind an authentication check, install the required plugins and wrap mokksy in
an authenticate block:
install(SSE)
install(DoubleReceive)
install(ContentNegotiation) { json() }
install(Authentication) {
basic("auth-basic") {
validate { credentials ->
if (credentials.name == "user" && credentials.password == "pass") {
UserIdPrincipal(credentials.name)
} else null
}
}
}
routing {
authenticate("auth-basic") {
mokksy(server)
}
}
Both extension functions accept any path pattern as a second parameter (default: "{...}",
which matches all routes). Narrow the scope by passing a prefix:
mokksy(server, path = "/api/{...}")
Java API
Java callers use dev.mokksy.Mokksy — a JVM-only, AutoCloseable wrapper that exposes a
Consumer-based fluent API instead of Kotlin lambdas with receivers.
Lifecycle:
import dev.mokksy.Mokksy;
Mokksy mokksy = Mokksy.create().start();
mokksy.get("/ping").respondsWith("Pong");
mokksy.shutdown();
Mokksy implements AutoCloseable, so try-with-resources works for test fixtures that need a short-lived server:
try (Mokksy mokksy = Mokksy.create().start()) {
    mokksy.post("/items").respondsWith("{\"id\":\"42\"}", 201);
}
JUnit 5 setup:
import dev.mokksy.Mokksy;
import java.net.http.HttpClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyTest {
private final Mokksy mokksy = Mokksy.create();
private HttpClient httpClient;
@BeforeAll
void setUp() {
mokksy.start();
httpClient = HttpClient.newHttpClient();
}
@Test
void test() {
// call server
}
@AfterAll
void tearDown() {
mokksy.shutdown();
}
}
Path shortcuts — every HTTP verb has a String overload that matches the path exactly.
Use the full spec lambda when you need additional matchers:
// Path-only shortcut (most common case)
mokksy.get("/ping").respondsWith("Pong");
// Full spec for headers, body matchers, or priority
mokksy.get(spec -> spec
        .path("/ping")
        .containsHeader("Accept", "application/json"))
    .respondsWith(builder -> builder.body("Pong"));
Response shortcuts — respondsWith(String) and respondsWith(String, int) cover the
common case of returning a string body with an optional status code:
mokksy.get("/hello").respondsWith("Hello, World!");
mokksy.post("/items").respondsWith("{\"id\":42}", 201);
When you need headers, delays, or a custom content type, use the full builder:
mokksy.post("/items")
    .respondsWith(builder -> builder
        .body("{\"id\":\"42\"}")
        .status(201)
        .header("Location", "/items/42")
        .delayMillis(50));
Status-only responses — use respondsWithStatus(int) when no body is needed:
mokksy.get("/health").respondsWithStatus(200);
mokksy.delete("/items/1").respondsWithStatus(204);
One-time stubs — use StubConfiguration.once() to create a stub that is removed after
its first match:
import dev.mokksy.mokksy.StubConfiguration;
mokksy.get(StubConfiguration.once("my-stub"), "/once")
    .respondsWith("First!");
// second request to /once returns 404
Request matchers — the spec block mirrors the Kotlin DSL:
mokksy.post(spec -> {
        spec.path("/secured");
        spec.containsHeader("X-Api-Key", "secret");
        spec.bodyContains("\"role\":\"admin\"");
    })
    .respondsWith(builder -> builder
        .body("authorized")
        .status(200));
All HTTP verbs are available as named methods (get, post, put, delete, patch,
head, options) — each with both String path and Consumer spec overloads.
Use method(String, String) or method(String, spec) for dynamic method names in parameterised tests:
mokksy.method("PATCH", "/resource").respondsWith("patched");
Streaming responses
Testing an LLM client or any endpoint that streams data chunk-by-chunk? Use respondsWithStream
to stub a chunked HTTP response. The default Content-Type is text/event-stream; charset=UTF-8,
which matches what most streaming AI APIs and SSE endpoints produce.
Chunks from a list — the simplest case:
mokksy.get(spec -> spec.path("/stream"))
    .respondsWithStream(builder -> builder
        .chunks(List.of("Hello", " ", "World")));
Chunks from a Stream<T> — the stream is consumed lazily when the first matching request
arrives, not when the stub is registered. This is useful for live generators or mutable sources
that should reflect their state at request time:
mokksy.get(spec -> spec.path("/events"))
    .respondsWithStream(builder -> builder
        .chunks(Stream.of("data1", "data2")));
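Why a lazily consumed Stream can reflect state at request time follows from standard Java behaviour: a stream obtained from a collection reads its source only when a terminal operation runs. The sketch below demonstrates that with the standard library alone. LazyStreamSketch and demo are hypothetical names, and the sketch says nothing about how Mokksy consumes the stream internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyStreamSketch {

    static List<String> demo() {
        List<String> source = new ArrayList<>(List.of("data1"));
        // Creating the stream does not read the list yet.
        Stream<String> chunks = source.stream();
        // The source changes after the stream was created, much like a stub
        // that is registered before the interesting state exists.
        source.add("data2");
        // The terminal operation reads the source now, so both elements appear.
        return chunks.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [data1, data2]
    }
}
```

Keep in mind that a Java Stream can be traversed only once, so a Stream instance is a natural fit for sources that are meant to be read a single time.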
Delays — simulate network and processing latency at two granularities:
mokksy.get(spec -> spec.path("/slow-stream"))
    .respondsWithStream(builder -> builder
        .chunks(List.of("A", "B", "C"))
        .delayMillis(200L) // pause before the first chunk
        .delayBetweenChunksMillis(100L)); // pause between each subsequent chunk
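As a rough guide for choosing client timeouts, the two delays add up as shown in the sketch below. It assumes that the initial delay applies once before the first chunk and that the inter-chunk delay applies only between consecutive chunks, as the comments above suggest, and it ignores transfer and processing time. ChunkDelaySketch and offsets are hypothetical names.

```java
import java.util.Arrays;

final class ChunkDelaySketch {

    /** Earliest send offset of each chunk, in milliseconds after the request is matched. */
    static long[] offsets(int chunkCount, long delayMillis, long delayBetweenChunksMillis) {
        long[] result = new long[chunkCount];
        for (int i = 0; i < chunkCount; i++) {
            // The first chunk waits for the initial delay only; each later
            // chunk waits one additional inter-chunk delay.
            result[i] = delayMillis + i * delayBetweenChunksMillis;
        }
        return result;
    }

    public static void main(String[] args) {
        // Three chunks with the values from the example above.
        System.out.println(Arrays.toString(offsets(3, 200L, 100L))); // prints [200, 300, 400]
    }
}
```

Under these assumptions the last of the three chunks is sent no earlier than 400 ms after the request arrives, so a client read timeout below that would cut the stream short.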
Custom Content-Type — override the default when the stream carries a different format, such
as NDJSON:
mokksy.get(spec -> spec.path("/ndjson"))
    .respondsWithStream(builder -> builder
        .chunks(List.of("{\"value\":1}", "{\"value\":2}"))
        .contentType("application/x-ndjson"));
For typed chunks, pass the class token as the first argument. Chunks are serialized to the
response body using each object's toString():
mokksy.get(spec -> spec.path("/typed"))
    .respondsWithStream(MyEvent.class, builder -> builder
        .chunk(new MyEvent("start"))
        .chunk(new MyEvent("end")));
SSE streaming responses
Use respondsWithSseStream to stub a true Server-Sent Events (SSE) response.
Each chunk is a ServerSentEvent that Mokksy sends using the standard SSE wire format
(data: ...\r\n), including proper Cache-Control and Connection headers.
Basic SSE — use SseEvent.data() to create data-only events (the most common case):
import dev.mokksy.mokksy.SseEvent;
mokksy.get("/sse")
    .respondsWithSseStream(builder -> builder
        .chunk(SseEvent.data("Hello"))
        .chunk(SseEvent.data("World")));
SSE with event type, id, retry, and comments — use SseEvent.builder() for multi-field events:
mokksy.get("/sse-full")
    .respondsWithSseStream(builder -> builder
        .chunk(SseEvent.builder()
            .data("payload")
            .event("message")
            .id("42")
            .retry(5000L)
            .comments("keep-alive")
            .build()));
From a list:
mokksy.get("/sse-list")
    .respondsWithSseStream(builder -> builder
        .chunks(List.of(
            SseEvent.data("event-1"),
            SseEvent.data("event-2"))));
With delays:
mokksy.get("/sse-slow")
    .respondsWithSseStream(builder -> builder
        .chunk(SseEvent.data("first"))
        .chunk(SseEvent.data("second"))
        .delayMillis(200L)
        .delayBetweenChunksMillis(100L));
Typed SSE — pass the data-type class token for the event's data field type:
mokksy.get("/typed-sse")
    .respondsWithSseStream(String.class, builder -> builder
        .chunk(SseEvent.data("typed-event")));
Tip: SseEvent.data(String) and SseEvent.builder() are Java-friendly factories for Ktor's ServerSentEvent. Ktor's constructor requires all five parameters in Java (new ServerSentEvent("data", null, null, null, null)); these factories eliminate the trailing nulls.
Jackson support
Use MokksyJackson.create() when your tests match typed Java objects deserialized from the
request body. The API mirrors Mokksy.create() exactly — same host, port, and verbose
parameters — with an optional ObjectMapper configuration callback.
Add the dependency alongside mokksy:
testImplementation("io.ktor:ktor-serialization-jackson:$ktorVersion")
Then create the server:
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.mokksy.Mokksy;
import dev.mokksy.MokksyJackson;
// Default Jackson configuration
Mokksy mokksy = MokksyJackson.create().start();
// Or customise the ObjectMapper, e.g. register Java time / records support
Mokksy mokksyWithJackson = MokksyJackson.create(ObjectMapper::findAndRegisterModules).start();
Typed body matchers work the same way as in the standard API — pass the Class token to
the stub-registration method and use bodyMatchesPredicate to assert on the deserialized object:
record CreateItemRequest(String name, int quantity) {
}
mokksy.post(
        CreateItemRequest.class,
        spec -> spec
            .path("/items")
            .bodyMatchesPredicate(req -> "widget".equals(req.name())))
    .respondsWith(builder -> builder
        .body("{\"id\":\"1\"}")
        .status(201));

