Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.modelcontextprotocol.client.transport;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
Expand Down Expand Up @@ -36,6 +38,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import sun.misc.Unsafe;

/**
* Server-Sent Events (SSE) implementation of the
Expand Down Expand Up @@ -116,6 +119,12 @@ public class HttpClientSseClientTransport implements McpClientTransport {
*/
private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;

/**
* Consumer to handle HttpClient closure. If null, no cleanup is performed (external
* HttpClient).
*/
private final Consumer<HttpClient> onCloseClient;

/**
* Creates a new transport instance with custom HTTP client builder, object mapper,
* and headers.
Expand All @@ -129,7 +138,8 @@ public class HttpClientSseClientTransport implements McpClientTransport {
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
*/
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) {
String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
Consumer<HttpClient> onCloseClient) {
Assert.notNull(jsonMapper, "jsonMapper must not be null");
Assert.hasText(baseUri, "baseUri must not be empty");
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
Expand All @@ -142,6 +152,7 @@ public class HttpClientSseClientTransport implements McpClientTransport {
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
this.httpRequestCustomizer = httpRequestCustomizer;
this.onCloseClient = onCloseClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert that onCloseClient is not null.

}

@Override
Expand Down Expand Up @@ -169,6 +180,8 @@ public static class Builder {

private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1);

private HttpClient externalHttpClient;

private McpJsonMapper jsonMapper;

private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
Expand All @@ -177,6 +190,8 @@ public static class Builder {

private Duration connectTimeout = Duration.ofSeconds(10);

private Consumer<HttpClient> onCloseClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a default value here.


/**
* Creates a new builder instance.
*/
Expand Down Expand Up @@ -220,24 +235,15 @@ public Builder sseEndpoint(String sseEndpoint) {
}

/**
* Sets the HTTP client builder.
* @param clientBuilder the HTTP client builder
* @return this builder
*/
public Builder clientBuilder(HttpClient.Builder clientBuilder) {
Assert.notNull(clientBuilder, "clientBuilder must not be null");
this.clientBuilder = clientBuilder;
return this;
}

/**
* Customizes the HTTP client builder.
* @param clientCustomizer the consumer to customize the HTTP client builder
* Sets an external HttpClient instance to use instead of creating a new one. When
* an external HttpClient is provided, the transport will not attempt to close it
* during graceful shutdown, leaving resource management to the caller.
* @param httpClient the HttpClient instance to use
* @return this builder
*/
public Builder customizeClient(final Consumer<HttpClient.Builder> clientCustomizer) {
Assert.notNull(clientCustomizer, "clientCustomizer must not be null");
clientCustomizer.accept(clientBuilder);
public Builder httpClient(HttpClient httpClient) {
Assert.notNull(httpClient, "httpClient must not be null");
this.externalHttpClient = httpClient;
return this;
}

Expand Down Expand Up @@ -310,13 +316,13 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
}

/**
* Sets the connection timeout for the HTTP client.
* @param connectTimeout the connection timeout duration
* Sets a custom consumer to handle HttpClient closure when the transport is
* closed.
* @param onCloseClient the consumer to handle HttpClient closure
* @return this builder
*/
public Builder connectTimeout(Duration connectTimeout) {
Assert.notNull(connectTimeout, "connectTimeout must not be null");
this.connectTimeout = connectTimeout;
public Builder onCloseClient(Consumer<HttpClient> onCloseClient) {
this.onCloseClient = onCloseClient;
return this;
}

Expand All @@ -325,9 +331,23 @@ public Builder connectTimeout(Duration connectTimeout) {
* @return a new transport instance
*/
public HttpClientSseClientTransport build() {
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
HttpClient httpClient;
Consumer<HttpClient> closeHandler;

if (externalHttpClient != null) {
// Use external HttpClient, use custom close handler or no-op
httpClient = externalHttpClient;
closeHandler = onCloseClient; // null means no cleanup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not use null, use a no-op lambda instead, client -> {}.

}
else {
// Create new HttpClient, use custom close handler or default cleanup
httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
closeHandler = onCloseClient != null ? onCloseClient
: HttpClientSseClientTransport::closeHttpClientResourcesStatic;
}

return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint,
jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer);
jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer, closeHandler);
}

}
Expand Down Expand Up @@ -495,7 +515,58 @@ public Mono<Void> closeGracefully() {
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
});
}).then(onCloseClient != null ? Mono.fromRunnable(() -> onCloseClient.accept(httpClient)) : Mono.empty());
}

/**
* Static method to close HttpClient resources using reflection.
*/
private static void closeHttpClientResourcesStatic(HttpClient httpClient) {
try {
// unsafe
Class<?> UnsafeClass = Class.forName("sun.misc.Unsafe");
Field unsafeField = UnsafeClass.getDeclaredField("theUnsafe");
unsafeField.setAccessible(true);
Unsafe unsafe = (Unsafe) unsafeField.get(null);
Module ObjectModule = Object.class.getModule();
Class<HttpClientSseClientTransport> currentClass = HttpClientSseClientTransport.class;
long addr = unsafe.objectFieldOffset(Class.class.getDeclaredField("module"));
unsafe.getAndSetObject(currentClass, addr, ObjectModule);
Comment on lines +526 to +534
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should need unsafe for this. Is there another way?


try {
Method closeMethod = httpClient.getClass().getMethod("close");
closeMethod.invoke(httpClient);
logger.debug("Successfully used JDK 21+ close() method to close HttpClient");
return;
}
catch (NoSuchMethodException e) {
logger.debug("JDK 21+ close() method not available, falling back to internal reflection");
}
// This prevents the accumulation of HttpClient-xxx-SelectorManager threads
try {
java.lang.reflect.Field implField = httpClient.getClass().getDeclaredField("impl");
implField.setAccessible(true);
Object implObj = implField.get(httpClient);
java.lang.reflect.Field selmgrField = implObj.getClass().getDeclaredField("selmgr");
selmgrField.setAccessible(true);
Object selmgrObj = selmgrField.get(implObj);

if (selmgrObj != null) {
Method shutDownMethod = selmgrObj.getClass().getDeclaredMethod("shutdown");
shutDownMethod.setAccessible(true);
shutDownMethod.invoke(selmgrObj);
logger.debug("HttpClient SelectorManager shutdown completed via reflection");
}
}
catch (NoSuchFieldException | NoSuchMethodException e) {
// Field/method structure might differ across JDK versions
logger.debug("SelectorManager field/method not found, skipping internal cleanup: {}", e.getMessage());
}

}
catch (Exception e) {
logger.warn("Failed to close HttpClient resources cleanly: {}", e.getMessage());
}
}

/**
Expand Down
Loading
Loading