Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ Makefile
temp/
*Temp.java
*TempTest.java

# markdoc
markdoc/

# macOS
.DS_Store
7 changes: 5 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ allprojects {
// The ".git" directory may not exist when resolving dependencies in the Docker image build
if (File(rootProject.rootDir, ".git").exists()) {
val grgit = org.ajoberstar.grgit.Grgit.open(mapOf("currentDir" to rootProject.rootDir))
rootProject.extra["gitHashFull"] = grgit.head().id
rootProject.extra["gitCommitTime"] = grgit.head().dateTime.withZoneSameLocal(java.time.ZoneOffset.UTC)
val head = grgit.head()
if (head != null) {
rootProject.extra["gitHashFull"] = head.id
rootProject.extra["gitCommitTime"] = head.dateTime.withZoneSameLocal(java.time.ZoneOffset.UTC)
}
grgit.close()
}

Expand Down
2 changes: 2 additions & 0 deletions config/checkstyle/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@
<suppress files="[/\\]temp[/\\]" checks="." />
<suppress files="Temp.java*" checks="." />
<suppress files="TempTest.java*" checks="." />
<!-- Exclude markdoc directory -->
<suppress files="[/\\]markdoc[/\\]" checks="." />
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
import org.apache.hc.core5.http.message.RequestLine;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -37,7 +39,7 @@
* Holds an elasticsearch response. It wraps the {@link BasicClassicHttpResponse} returned and associates
* it with its corresponding {@link RequestLine} and {@link HttpHost}.
*/
public class Response {
public class Response implements Closeable {

private final RequestLine requestLine;
private final HttpHost host;
Expand Down Expand Up @@ -201,6 +203,11 @@ ClassicHttpResponse getHttpResponse() {
return response;
}

@Override
public void close() throws IOException {
response.close();
}

@Override
public String toString() {
return "Response{requestLine=" + requestLine + ", host=" + host + ", response=" + response.getCode() + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -127,6 +130,8 @@ public class Rest5Client implements Closeable {
private final WarningsHandler warningsHandler;
private final boolean compressionEnabled;
private final boolean metaHeaderEnabled;
private final ScheduledExecutorService healthCheckExecutor;
private volatile boolean closed = false;

Rest5Client(
CloseableHttpAsyncClient client,
Expand All @@ -148,6 +153,19 @@ public class Rest5Client implements Closeable {
this.compressionEnabled = compressionEnabled;
this.metaHeaderEnabled = metaHeaderEnabled;
setNodes(nodes);

// 初始化连接池健康检查执行器
this.healthCheckExecutor = new ScheduledThreadPoolExecutor(1, r -> {
Thread thread = new Thread(r, "elasticsearch-rest-client-health-check");
thread.setDaemon(true);
return thread;
});

// 启动定期健康检查(每30秒执行一次)
this.healthCheckExecutor.scheduleAtFixedRate(
this::performHealthCheck,
30, 30, TimeUnit.SECONDS
);
}

/**
Expand Down Expand Up @@ -584,9 +602,66 @@ private void onFailure(Node node) {
failureListener.onFailure(node);
}

@Override
/**
* 执行连接池健康检查
*/
private void performHealthCheck() {
if (closed) {
return;
}

try {
// 检查客户端状态
if (client instanceof org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl) {
org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl clientImpl =
(org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClientImpl) client;

// 获取连接管理器
org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager connectionManager =
(org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager) clientImpl.getConnectionManager();

// 检查连接池状态
int totalConnections = connectionManager.getTotalStats().getAvailable() +
connectionManager.getTotalStats().getLeased() +
connectionManager.getTotalStats().getPending() +
connectionManager.getTotalStats().getMax();

// 如果连接数超过阈值,输出警告日志
if (totalConnections > connectionManager.getMaxTotal()) {
logger.warn("Connection pool usage exceeded maximum limit. Total connections: " + totalConnections + ", Max: " + connectionManager.getMaxTotal());
}

// 定期清理过期连接
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(Timeout.of(5, TimeUnit.MINUTES));
}
} catch (Exception e) {
logger.debug("Failed to perform connection pool health check", e);
}
}

@Override
public void close() throws IOException {
client.close();
closed = true;

// 关闭健康检查执行器
if (healthCheckExecutor != null) {
healthCheckExecutor.shutdown();
try {
if (!healthCheckExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
healthCheckExecutor.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
healthCheckExecutor.shutdownNow();
}
}

// 关闭HTTP客户端
if (client != null) {
client.close();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public final class Rest5ClientBuilder {
private Consumer<RequestConfig.Builder> requestConfigCallback;
private Consumer<ConnectionConfig.Builder> connectionConfigCallback;
private Consumer<PoolingAsyncClientConnectionManagerBuilder> connectionManagerCallback;
private Consumer<PoolingAsyncClientConnectionManager> connectionManagerMonitor;
private Header[] defaultHeaders = EMPTY_HEADERS;
private Rest5Client.FailureListener failureListener;
private SSLContext sslContext;
Expand Down Expand Up @@ -369,14 +370,21 @@ public Rest5ClientBuilder setConnectionConfigCallback(Consumer<ConnectionConfig.
* and used by the {@link CloseableHttpAsyncClient}.
* Commonly used to customize {@link PoolingAsyncClientConnectionManager} without losing any other useful default
* value that the {@link Rest5ClientBuilder} internally sets.
* @throws NullPointerException if {@code connectionManagerCallback} is {@code null}.
*/
public Rest5ClientBuilder setConnectionManagerCallback(Consumer<PoolingAsyncClientConnectionManagerBuilder> connectionManagerCallback) {
Objects.requireNonNull(connectionManagerCallback, "connectionManagerCallback must not be null");
this.connectionManagerCallback = connectionManagerCallback;
return this;
}

/**
* Allows to monitor the {@link PoolingAsyncClientConnectionManager} after it has been created.
* This can be used to add custom monitoring or alerting for connection pool usage.
*/
public Rest5ClientBuilder setConnectionManagerMonitor(Consumer<PoolingAsyncClientConnectionManager> connectionManagerMonitor) {
this.connectionManagerMonitor = connectionManagerMonitor;
return this;
}

/**
* Creates a new {@link Rest5Client} based on the provided configuration.
*/
Expand Down Expand Up @@ -446,20 +454,29 @@ private CloseableHttpAsyncClient createHttpClient() {
connectionConfigCallback.accept(connectionConfigBuilder);
}

PoolingAsyncClientConnectionManagerBuilder connectionManagerBuilder =
PoolingAsyncClientConnectionManagerBuilder connectionManagerBuilder =
PoolingAsyncClientConnectionManagerBuilder.create()
.setDefaultConnectionConfig(connectionConfigBuilder.build())
.setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE)
.setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
.setTlsStrategy(new DefaultClientTlsStrategy(sslContext));
.setTlsStrategy(new DefaultClientTlsStrategy(sslContext))
// 配置连接池定期清理机制
.setValidateAfterInactivity(Timeout.of(5, TimeUnit.SECONDS))
// 配置连接存活时间
.setConnectionTimeToLive(Timeout.of(30, TimeUnit.MINUTES));

if (connectionManagerCallback != null) {
connectionManagerCallback.accept(connectionManagerBuilder);
}
PoolingAsyncClientConnectionManager connectionManager = connectionManagerBuilder.build();
// 调用连接池监控回调
if (connectionManagerMonitor != null) {
connectionManagerMonitor.accept(connectionManager);
}

HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(requestConfigBuilder.build())
.setConnectionManager(connectionManagerBuilder.build())
.setConnectionManager(connectionManager)
.setUserAgent(USER_AGENT_HEADER_VALUE)
.setTargetAuthenticationStrategy(new DefaultAuthenticationStrategy())
.setThreadFactory(new RestClientThreadFactory());
Expand Down