diff --git a/.gitignore b/.gitignore index 7b31c88c7..e0f5c32e8 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,9 @@ Makefile temp/ *Temp.java *TempTest.java + +# markdoc +markdoc/ + +# macOS +.DS_Store \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index c7c6d8f09..0df73f7b2 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -67,8 +67,11 @@ allprojects { // The ".git" directory may not exist when resolving dependencies in the Docker image build if (File(rootProject.rootDir, ".git").exists()) { val grgit = org.ajoberstar.grgit.Grgit.open(mapOf("currentDir" to rootProject.rootDir)) - rootProject.extra["gitHashFull"] = grgit.head().id - rootProject.extra["gitCommitTime"] = grgit.head().dateTime.withZoneSameLocal(java.time.ZoneOffset.UTC) + val head = grgit.head() + if (head != null) { + rootProject.extra["gitHashFull"] = head.id + rootProject.extra["gitCommitTime"] = head.dateTime.withZoneSameLocal(java.time.ZoneOffset.UTC) + } grgit.close() } diff --git a/config/checkstyle/checkstyle_suppressions.xml b/config/checkstyle/checkstyle_suppressions.xml index 25ef033d5..7174b5177 100644 --- a/config/checkstyle/checkstyle_suppressions.xml +++ b/config/checkstyle/checkstyle_suppressions.xml @@ -32,4 +32,6 @@ + + diff --git a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Response.java b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Response.java index ce9c9afa7..5afd6325f 100644 --- a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Response.java +++ b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Response.java @@ -27,6 +27,8 @@ import org.apache.hc.core5.http.message.BasicClassicHttpResponse; import org.apache.hc.core5.http.message.RequestLine; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -37,7 +39,7 @@ * Holds an elasticsearch response. It wraps the {@link BasicClassicHttpResponse} returned and associates * it with its corresponding {@link RequestLine} and {@link HttpHost}. */ -public class Response { +public class Response implements Closeable { private final RequestLine requestLine; private final HttpHost host; @@ -201,6 +203,11 @@ ClassicHttpResponse getHttpResponse() { return response; } + @Override + public void close() throws IOException { + response.close(); + } + @Override public String toString() { return "Response{requestLine=" + requestLine + ", host=" + host + ", response=" + response.getCode() + '}'; diff --git a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java index be2ec04e3..b8c9d08e2 100644 --- a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java +++ b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java @@ -78,6 +78,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -127,6 +130,8 @@ public class Rest5Client implements Closeable { private final WarningsHandler warningsHandler; private final boolean compressionEnabled; private final boolean metaHeaderEnabled; + private final ScheduledExecutorService healthCheckExecutor; + private volatile boolean closed = false; Rest5Client( CloseableHttpAsyncClient client, @@ -148,6 +153,19 @@ public class Rest5Client implements Closeable { this.compressionEnabled = compressionEnabled; this.metaHeaderEnabled = metaHeaderEnabled; setNodes(nodes); + + // 初始化连接池健康检查执行器 + this.healthCheckExecutor = new ScheduledThreadPoolExecutor(1, r -> { + Thread thread = new Thread(r, "elasticsearch-rest-client-health-check"); + thread.setDaemon(true); + return thread; + }); + + // 启动定期健康检查(每30秒执行一次) + this.healthCheckExecutor.scheduleAtFixedRate( + this::performHealthCheck, + 30, 30, TimeUnit.SECONDS + ); } /** @@ -584,9 +602,66 @@ private void onFailure(Node node) { failureListener.onFailure(node); } + @Override + /** + * 执行连接池健康检查 + */ + private void performHealthCheck() { + if (closed) { + return; + } + + try { + // 检查客户端状态 + if (client instanceof org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl) { + org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl clientImpl = + (org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl) client; + + // 获取连接管理器 + org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager connectionManager = + (org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager) clientImpl.getConnectionManager(); + + // 检查连接池状态 + int totalConnections = connectionManager.getTotalStats().getAvailable() + + connectionManager.getTotalStats().getLeased() + + connectionManager.getTotalStats().getPending() + + connectionManager.getTotalStats().getMax(); + + // 如果连接数超过阈值,输出警告日志 + if (totalConnections > connectionManager.getMaxTotal()) { + logger.warn("Connection pool usage exceeded maximum limit. Total connections: " + totalConnections + ", Max: " + connectionManager.getMaxTotal()); + } + + // 定期清理过期连接 + connectionManager.closeExpiredConnections(); + connectionManager.closeIdleConnections(Timeout.of(5, TimeUnit.MINUTES)); + } + } catch (Exception e) { + logger.debug("Failed to perform connection pool health check", e); + } + } + @Override public void close() throws IOException { - client.close(); + closed = true; + + // 关闭健康检查执行器 + if (healthCheckExecutor != null) { + healthCheckExecutor.shutdown(); + try { + if (!healthCheckExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + healthCheckExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + healthCheckExecutor.shutdownNow(); + } + } + + // 关闭HTTP客户端 + if (client != null) { + client.close(); + } } /** diff --git a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java index d35ffac3f..168b3df5d 100644 --- a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java +++ b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java @@ -80,6 +80,7 @@ public final class Rest5ClientBuilder { private Consumer requestConfigCallback; private Consumer connectionConfigCallback; private Consumer connectionManagerCallback; + private Consumer connectionManagerMonitor; private Header[] defaultHeaders = EMPTY_HEADERS; private Rest5Client.FailureListener failureListener; private SSLContext sslContext; @@ -369,14 +370,21 @@ public Rest5ClientBuilder setConnectionConfigCallback(Consumer connectionManagerCallback) { - Objects.requireNonNull(connectionManagerCallback, "connectionManagerCallback must not be null"); this.connectionManagerCallback = connectionManagerCallback; return this; } + /** + * Allows to monitor the {@link PoolingAsyncClientConnectionManager} after it has been created. + * This can be used to add custom monitoring or alerting for connection pool usage. + */ + public Rest5ClientBuilder setConnectionManagerMonitor(Consumer connectionManagerMonitor) { + this.connectionManagerMonitor = connectionManagerMonitor; + return this; + } + /** * Creates a new {@link Rest5Client} based on the provided configuration. */ @@ -446,20 +454,29 @@ private CloseableHttpAsyncClient createHttpClient() { connectionConfigCallback.accept(connectionConfigBuilder); } - PoolingAsyncClientConnectionManagerBuilder connectionManagerBuilder = + PoolingAsyncClientConnectionManagerBuilder connectionManagerBuilder = PoolingAsyncClientConnectionManagerBuilder.create() .setDefaultConnectionConfig(connectionConfigBuilder.build()) .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE) .setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL) - .setTlsStrategy(new DefaultClientTlsStrategy(sslContext)); + .setTlsStrategy(new DefaultClientTlsStrategy(sslContext)) + // 配置连接池定期清理机制 + .setValidateAfterInactivity(Timeout.of(5, TimeUnit.SECONDS)) + // 配置连接存活时间 + .setConnectionTimeToLive(Timeout.of(30, TimeUnit.MINUTES)); if (connectionManagerCallback != null) { connectionManagerCallback.accept(connectionManagerBuilder); } + PoolingAsyncClientConnectionManager connectionManager = connectionManagerBuilder.build(); + // 调用连接池监控回调 + if (connectionManagerMonitor != null) { + connectionManagerMonitor.accept(connectionManager); + } HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create() .setDefaultRequestConfig(requestConfigBuilder.build()) - .setConnectionManager(connectionManagerBuilder.build()) + .setConnectionManager(connectionManager) .setUserAgent(USER_AGENT_HEADER_VALUE) .setTargetAuthenticationStrategy(new DefaultAuthenticationStrategy()) .setThreadFactory(new RestClientThreadFactory());