Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
import io.modelcontextprotocol.spec.HttpHeaders;
Expand Down Expand Up @@ -118,7 +119,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {

private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;

private final AtomicReference<DefaultMcpTransportSession> activeSession = new AtomicReference<>();
private final AtomicReference<McpTransportSession<Disposable>> activeSession = new AtomicReference<>();

private final AtomicReference<Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>>> handler = new AtomicReference<>();

Expand Down Expand Up @@ -163,12 +164,20 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
});
}

private DefaultMcpTransportSession createTransportSession() {
private McpTransportSession<Disposable> createTransportSession() {
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
: createDelete(sessionId);
return new DefaultMcpTransportSession(onClose);
}

private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
var existingSessionId = Optional.ofNullable(existingSession)
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
.flatMap(McpTransportSession::sessionId)
.orElse(null);
return new ClosedMcpTransportSession<>(existingSessionId);
}

private Publisher<Void> createDelete(String sessionId) {

var uri = Utils.resolveUri(this.baseUri, this.endpoint);
Expand Down Expand Up @@ -210,9 +219,9 @@ private void handleException(Throwable t) {
public Mono<Void> closeGracefully() {
return Mono.defer(() -> {
logger.debug("Graceful close triggered");
DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession());
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
if (currentSession != null) {
return currentSession.closeGracefully();
return Mono.from(currentSession.closeGracefully());
}
return Mono.empty();
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2025-2025 the original author or authors.
*/
package io.modelcontextprotocol.spec;

import java.util.Optional;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* Represents a closed MCP session, which may not be reused. All calls will throw a
* {@link McpTransportSessionClosedException}.
*
* @param <CONNECTION> the resource representing the connection that the transport
* manages.
* @author Daniel Garnier-Moiroux
*/
public class ClosedMcpTransportSession<CONNECTION> implements McpTransportSession<CONNECTION> {

private final String sessionId;

public ClosedMcpTransportSession(@Nullable String sessionId) {
this.sessionId = sessionId;
}

@Override
public Optional<String> sessionId() {
throw new McpTransportSessionClosedException(sessionId);
}

@Override
public boolean markInitialized(String sessionId) {
throw new McpTransportSessionClosedException(sessionId);
}

@Override
public void addConnection(CONNECTION connection) {
throw new McpTransportSessionClosedException(sessionId);
}

@Override
public void removeConnection(CONNECTION connection) {
throw new McpTransportSessionClosedException(sessionId);
}

@Override
public void close() {

}

@Override
public Publisher<Void> closeGracefully() {
return Mono.empty();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2025-2025 the original author or authors.
*/

package io.modelcontextprotocol.spec;

import reactor.util.annotation.Nullable;

/**
* Exception thrown when trying to use an {@link McpTransportSession} that has been
* closed.
*
* @see ClosedMcpTransportSession
* @author Daniel Garnier-Moiroux
*/
public class McpTransportSessionClosedException extends RuntimeException {

public McpTransportSessionClosedException(@Nullable String sessionId) {
super(sessionId != null ? "MCP session with ID %s has been closed".formatted(sessionId)
: "MCP session has been closed");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransport;
import org.junit.jupiter.api.Disabled;
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -222,9 +222,10 @@ void testSessionClose() {
// In case of Streamable HTTP this call should issue a HTTP DELETE request
// invalidating the session
StepVerifier.create(mcpAsyncClient.closeGracefully()).expectComplete().verify();
// The next use should immediately re-initialize with no issue and send the
// request without any broken connections.
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
// The next tries to use the closed session and fails
StepVerifier.create(mcpAsyncClient.ping())
.expectErrorMatches(err -> err.getCause() instanceof McpTransportSessionClosedException)
.verify();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,39 @@ void testAsyncRequestCustomizer() throws URISyntaxException {
});
}

@Test
void testCloseUninitialized() {
var transport = HttpClientStreamableHttpTransport.builder(host).build();

StepVerifier.create(transport.closeGracefully()).verifyComplete();

var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
McpSchema.ClientCapabilities.builder().roots(true).build(),
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
"test-id", initializeRequest);

StepVerifier.create(transport.sendMessage(testMessage))
.expectErrorMessage("MCP session has been closed")
.verify();
}

@Test
void testCloseInitialized() {
var transport = HttpClientStreamableHttpTransport.builder(host).build();

var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
McpSchema.ClientCapabilities.builder().roots(true).build(),
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
"test-id", initializeRequest);

StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete();
StepVerifier.create(transport.closeGracefully()).verifyComplete();

StepVerifier.create(transport.sendMessage(testMessage))
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
.verify();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.json.McpJsonMapper;

import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
import io.modelcontextprotocol.spec.HttpHeaders;
Expand Down Expand Up @@ -98,7 +99,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {

private final boolean resumableStreams;

private final AtomicReference<DefaultMcpTransportSession> activeSession = new AtomicReference<>();
private final AtomicReference<McpTransportSession<Disposable>> activeSession = new AtomicReference<>();

private final AtomicReference<Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>>> handler = new AtomicReference<>();

Expand Down Expand Up @@ -143,7 +144,7 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
});
}

private DefaultMcpTransportSession createTransportSession() {
private McpTransportSession<Disposable> createTransportSession() {
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
: webClient.delete()
.uri(this.endpoint)
Expand All @@ -159,6 +160,14 @@ private DefaultMcpTransportSession createTransportSession() {
return new DefaultMcpTransportSession(onClose);
}

private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
var existingSessionId = Optional.ofNullable(existingSession)
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
.flatMap(McpTransportSession::sessionId)
.orElse(null);
return new ClosedMcpTransportSession<>(existingSessionId);
}

@Override
public void setExceptionHandler(Consumer<Throwable> handler) {
logger.debug("Exception handler registered");
Expand All @@ -182,9 +191,9 @@ private void handleException(Throwable t) {
public Mono<Void> closeGracefully() {
return Mono.defer(() -> {
logger.debug("Graceful close triggered");
DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession());
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
if (currentSession != null) {
return currentSession.closeGracefully();
return Mono.from(currentSession.closeGracefully());
}
return Mono.empty();
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024-2025 the original author or authors.
*/
package io.modelcontextprotocol.client.transport;

import io.modelcontextprotocol.spec.McpSchema;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import reactor.test.StepVerifier;

import org.springframework.web.reactive.function.client.WebClient;

class WebClientStreamableHttpTransportTest {

static String host = "http://localhost:3001";

static WebClient.Builder builder;

@SuppressWarnings("resource")
static GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v3")
.withCommand("node dist/index.js streamableHttp")
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withExposedPorts(3001)
.waitingFor(Wait.forHttp("/").forStatusCode(404));

@BeforeAll
static void startContainer() {
container.start();
int port = container.getMappedPort(3001);
host = "http://" + container.getHost() + ":" + port;
builder = WebClient.builder().baseUrl(host);
}

@AfterAll
static void stopContainer() {
container.stop();
}

@Test
void testCloseUninitialized() {
var transport = WebClientStreamableHttpTransport.builder(builder).build();

StepVerifier.create(transport.closeGracefully()).verifyComplete();

var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
McpSchema.ClientCapabilities.builder().roots(true).build(),
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
"test-id", initializeRequest);

StepVerifier.create(transport.sendMessage(testMessage))
.expectErrorMessage("MCP session has been closed")
.verify();
}

@Test
void testCloseInitialized() {
var transport = WebClientStreamableHttpTransport.builder(builder).build();

var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
McpSchema.ClientCapabilities.builder().roots(true).build(),
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
"test-id", initializeRequest);

StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete();
StepVerifier.create(transport.closeGracefully()).verifyComplete();

StepVerifier.create(transport.sendMessage(testMessage))
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
.verify();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransport;
import org.junit.jupiter.api.Disabled;
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -221,9 +221,10 @@ void testSessionClose() {
// In case of Streamable HTTP this call should issue a HTTP DELETE request
// invalidating the session
StepVerifier.create(mcpAsyncClient.closeGracefully()).expectComplete().verify();
// The next use should immediately re-initialize with no issue and send the
// request without any broken connections.
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
// The next tries to use the closed session and fails
StepVerifier.create(mcpAsyncClient.ping())
.expectErrorMatches(err -> err.getCause() instanceof McpTransportSessionClosedException)
.verify();
});
}

Expand Down