From fbacbaf8788021601651659cd1ae9e6a497110db Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Thu, 23 Oct 2025 12:18:25 +0200 Subject: [PATCH 1/7] feat: add API to request cluster/nodes info Extended the test framework to support starting a cluster of several nodes --- .../java/io/weaviate/containers/Weaviate.java | 116 +++++++++++++++++- .../io/weaviate/integration/ClusterITest.java | 53 ++++++++ .../client6/v1/api/WeaviateClient.java | 10 +- .../api/cluster/AsyncReplicationStatus.java | 9 ++ .../v1/api/cluster/CollectionStats.java | 8 ++ .../v1/api/cluster/ListNodesRequest.java | 64 ++++++++++ .../v1/api/cluster/ListNodesResponse.java | 6 + .../v1/api/cluster/ListShardsRequest.java | 55 +++++++++ .../v1/api/cluster/ListShardsResponse.java | 6 + .../weaviate/client6/v1/api/cluster/Node.java | 16 +++ .../client6/v1/api/cluster/NodeVerbosity.java | 10 ++ .../client6/v1/api/cluster/Shard.java | 15 +++ .../client6/v1/api/cluster/ShardReplica.java | 6 + .../client6/v1/api/cluster/ShardingState.java | 10 ++ .../v1/api/cluster/VectorIndexingStatus.java | 12 ++ .../v1/api/cluster/WeaviateClusterClient.java | 79 ++++++++++++ .../client6/v1/api/rbac/NodesPermission.java | 13 +- .../client6/v1/api/rbac/Permission.java | 8 +- .../client6/v1/internal/rest/UrlEncoder.java | 5 +- .../client6/v1/internal/json/JSONTest.java | 4 +- 20 files changed, 483 insertions(+), 22 deletions(-) create mode 100644 src/it/java/io/weaviate/integration/ClusterITest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/AsyncReplicationStatus.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/CollectionStats.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/ListNodesRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/ListNodesResponse.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/ListShardsRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/ListShardsResponse.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/Node.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/NodeVerbosity.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/Shard.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/ShardReplica.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/ShardingState.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/VectorIndexingStatus.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java diff --git a/src/it/java/io/weaviate/containers/Weaviate.java b/src/it/java/io/weaviate/containers/Weaviate.java index a89714bd..b214c49c 100644 --- a/src/it/java/io/weaviate/containers/Weaviate.java +++ b/src/it/java/io/weaviate/containers/Weaviate.java @@ -1,13 +1,19 @@ package io.weaviate.containers; import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.lifecycle.Startable; import org.testcontainers.weaviate.WeaviateContainer; import io.weaviate.client6.v1.api.Config; @@ -20,6 +26,7 @@ public class Weaviate extends WeaviateContainer { public static String OIDC_ISSUER = "https://auth.wcs.api.weaviate.io/auth/realms/SeMI"; private volatile SharedClient clientInstance; + private final String containerName; /** * Create a new instance of WeaviateClient connected to this container if none @@ -85,17 +92,22 @@ public static Weaviate.Builder custom() { } public static class Builder { - private String versionTag; + private String versionTag = VERSION; + private String containerName = "weaviate"; private Set enableModules = new HashSet<>(); private Set adminUsers = new HashSet<>(); private Set viewerUsers = new HashSet<>(); private Map environment = new HashMap<>(); public Builder() { - this.versionTag = VERSION; enableAutoSchema(false); } + public Builder withContainerName(String containerName) { + this.containerName = containerName; + return this; + } + public Builder withVersion(String version) { this.versionTag = version; return this; @@ -138,6 +150,7 @@ public Builder withFilesystemBackup(String fsPath) { environment.put("BACKUP_FILESYSTEM_PATH", fsPath); return this; } + public Builder withAdminUsers(String... admins) { adminUsers.addAll(Arrays.asList(admins)); return this; @@ -195,7 +208,7 @@ public Builder withOIDC(String clientId, String issuer, String usernameClaim, St } public Weaviate build() { - var c = new Weaviate(DOCKER_IMAGE + ":" + versionTag); + var c = new Weaviate(containerName, DOCKER_IMAGE + ":" + versionTag); if (!enableModules.isEmpty()) { c.withEnv("ENABLE_API_BASED_MODULES", Boolean.TRUE.toString()); @@ -217,13 +230,18 @@ public Weaviate build() { } environment.forEach((name, value) -> c.withEnv(name, value)); - c.withCreateContainerCmdModifier(cmd -> cmd.withHostName("weaviate")); + c.withCreateContainerCmdModifier(cmd -> cmd.withName(containerName).withHostName(containerName)); return c; } } - private Weaviate(String dockerImageName) { + private Weaviate() { + this("weaviate", DOCKER_IMAGE + ":" + VERSION); + } + + private Weaviate(String containerName, String dockerImageName) { super(dockerImageName); + this.containerName = containerName; } @Override @@ -264,4 +282,92 @@ private void close(Weaviate caller) throws Exception { public void close() throws IOException { } } + + public static Weaviate cluster(int replicas) { + List nodes = new ArrayList<>(); + for (var i = 0; i < replicas; i++) { + nodes.add(Weaviate.custom() + .withContainerName("weaviate-" + i) + .build()); + } + return new Cluster(nodes); + } + + public static class Cluster extends Weaviate { + /** Leader and followers combined. */ + private final List nodes; + + private final Weaviate leader; + private final List followers; + + private Cluster(List nodes) { + assert nodes.size() > 1 : "cluster must have 1+ nodes"; + + this.nodes = List.copyOf(nodes); + this.leader = nodes.remove(0); + this.followers = List.copyOf(nodes); + + for (var follower : followers) { + follower.dependsOn(leader); + } + setNetwork(Network.SHARED); + bindNodes(7110, 7111, 8300); + } + + @Override + public WeaviateContainer dependsOn(List startables) { + leader.dependsOn(startables); + return this; + } + + @Override + public void setNetwork(Network network) { + nodes.forEach(node -> node.setNetwork(network)); + } + + @Override + public WeaviateClient getClient() { + if (!isRunning()) { + start(); + } + return leader.getClient(); + } + + @Override + public void start() { + followers.forEach(Startable::start); // testcontainers will resolve dependencies + } + + @Override + public void stop() { + followers.forEach(Startable::stop); + leader.stop(); + } + + /** + * Set environment variables for inter-cluster communication. + * + * @param gossip Gossip bind port. + * @param data Data bind port. + * @param raft RAFT port. + */ + private void bindNodes(int gossip, int data, int raft) { + var publicPort = leader.getExposedPorts().get(0); // see WeaviateContainer Testcontainer. + + nodes.forEach(node -> node + .withEnv("CLUSTER_GOSSIP_BIND_PORT", String.valueOf(gossip)) + .withEnv("CLUSTER_DATA_BIND_PORT", String.valueOf(data)) + .withEnv("RAFT_PORT", String.valueOf(raft)) + .withEnv("RAFT_BOOTSTRAP_EXPECT", "1") + .withEnv("RAFT_JOIN", leader.containerName)); + + followers.forEach(node -> node + .withEnv("CLUSTER_JOIN", leader.containerName + ":" + gossip) + .waitingFor( + Wait.forHttp("/v1/.well-known/ready") + .forPort(publicPort) + .forStatusCode(200) + .withStartupTimeout(Duration.ofSeconds(10)))); + } + } } diff --git a/src/it/java/io/weaviate/integration/ClusterITest.java b/src/it/java/io/weaviate/integration/ClusterITest.java new file mode 100644 index 00000000..58d49fab --- /dev/null +++ b/src/it/java/io/weaviate/integration/ClusterITest.java @@ -0,0 +1,53 @@ +package io.weaviate.integration; + +import java.io.IOException; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import io.weaviate.ConcurrentTest; +import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.cluster.ShardingState; +import io.weaviate.containers.Weaviate; + +public class ClusterITest extends ConcurrentTest { + private static final WeaviateClient client = Weaviate.cluster(3).getClient(); + + @Test + public void test_shardingState() throws IOException { + // Arrange + var nsA = ns("A"); + var nsB = ns("B"); + + client.collections.create(nsA, + a -> a.replication(r -> r.replicationFactor(2))); + client.collections.create(nsB, + b -> b.replication(r -> r.replicationFactor(3))); + + // Act + var optShardsA = client.cluster.shardingState(nsA); + var optShardsB = client.cluster.shardingState(nsB); + + // Assert + var shardsA = Assertions.assertThat(optShardsA).get() + .returns(nsA, ShardingState::collection) + .extracting(ShardingState::shards) + .actual(); + + var shardsB = Assertions.assertThat(optShardsB).get() + .returns(nsB, ShardingState::collection) + .extracting(ShardingState::shards) + .actual(); + + Assertions.assertThat(shardsA).doesNotContainAnyElementsOf(shardsB); + } + + @Test + public void test_listNodes() throws IOException { + // Act + var allNodes = client.cluster.listNodes(); + + // Assert + Assertions.assertThat(allNodes).as("total no. nodes").hasSize(3); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java index 9bf44552..d93f7eb8 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java @@ -5,6 +5,7 @@ import io.weaviate.client6.v1.api.alias.WeaviateAliasClient; import io.weaviate.client6.v1.api.backup.WeaviateBackupClient; +import io.weaviate.client6.v1.api.cluster.WeaviateClusterClient; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClient; import io.weaviate.client6.v1.api.rbac.groups.WeaviateGroupsClient; import io.weaviate.client6.v1.api.rbac.roles.WeaviateRolesClient; @@ -37,7 +38,7 @@ public class WeaviateClient implements AutoCloseable { /** Client for {@code /backups} endpoints for managing backups. */ public final WeaviateBackupClient backup; - + /** * Client for {@code /authz/roles} endpoints for managing RBAC roles. */ @@ -53,6 +54,12 @@ public class WeaviateClient implements AutoCloseable { */ public final WeaviateUsersClient users; + /** + * Client for {@code /nodes} and {@code /replication} endpoints + * for managing replication and sharding. + */ + public final WeaviateClusterClient cluster; + public WeaviateClient(Config config) { RestTransportOptions restOpt; GrpcChannelOptions grpcOpt; @@ -108,6 +115,7 @@ public WeaviateClient(Config config) { this.roles = new WeaviateRolesClient(restTransport); this.groups = new WeaviateGroupsClient(restTransport); this.users = new WeaviateUsersClient(restTransport); + this.cluster = new WeaviateClusterClient(restTransport); this.config = config; } diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/AsyncReplicationStatus.java b/src/main/java/io/weaviate/client6/v1/api/cluster/AsyncReplicationStatus.java new file mode 100644 index 00000000..9eac8f64 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/AsyncReplicationStatus.java @@ -0,0 +1,9 @@ +package io.weaviate.client6.v1.api.cluster; + +import com.google.gson.annotations.SerializedName; + +public record AsyncReplicationStatus( + @SerializedName("objectsPropagated") long objectsPropagated, + @SerializedName("startDiffTimeUnixMillis") long startDiffTimeUnixMillis, + @SerializedName("targetNode") String targetNode) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/CollectionStats.java b/src/main/java/io/weaviate/client6/v1/api/cluster/CollectionStats.java new file mode 100644 index 00000000..7c74e507 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/CollectionStats.java @@ -0,0 +1,8 @@ +package io.weaviate.client6.v1.api.cluster; + +import com.google.gson.annotations.SerializedName; + +public record CollectionStats( + @SerializedName("shardCount") int shardCount, + @SerializedName("objectCount") long objectCount) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/ListNodesRequest.java b/src/main/java/io/weaviate/client6/v1/api/cluster/ListNodesRequest.java new file mode 100644 index 00000000..93583d7b --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/ListNodesRequest.java @@ -0,0 +1,64 @@ +package io.weaviate.client6.v1.api.cluster; + +import java.util.HashMap; +import java.util.List; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record ListNodesRequest(String collection, String shard, NodeVerbosity verbosity) { + + static final Endpoint> _ENDPOINT = SimpleEndpoint.noBody( + __ -> "GET", + request -> "/nodes" + (request.collection != null + ? "/" + request.collection + : ""), + request -> new HashMap<>() { // HashMap permits null values. + { + put("shardName", request.shard); + put("output", request.verbosity); + } + }, + (statusCode, response) -> JSON.deserialize(response, ListNodesResponse.class).nodes()); + + public static ListNodesRequest of() { + return of(ObjectBuilder.identity()); + } + + public static ListNodesRequest of(Function> fn) { + return fn.apply(new Builder()).build(); + } + + public ListNodesRequest(Builder builder) { + this(builder.collection, builder.shard, builder.verbosity); + } + + public static class Builder implements ObjectBuilder { + private String collection; + private String shard; + private NodeVerbosity verbosity; + + public Builder collection(String collection) { + this.collection = collection; + return this; + } + + public Builder shard(String shard) { + this.shard = shard; + return this; + } + + public Builder verbosity(NodeVerbosity verbosity) { + this.verbosity = verbosity; + return this; + } + + @Override + public ListNodesRequest build() { + return new ListNodesRequest(this); + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/ListNodesResponse.java b/src/main/java/io/weaviate/client6/v1/api/cluster/ListNodesResponse.java new file mode 100644 index 00000000..775a8f49 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/ListNodesResponse.java @@ -0,0 +1,6 @@ +package io.weaviate.client6.v1.api.cluster; + +import java.util.List; + +public record ListNodesResponse(List nodes) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/ListShardsRequest.java b/src/main/java/io/weaviate/client6/v1/api/cluster/ListShardsRequest.java new file mode 100644 index 00000000..604a247f --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/ListShardsRequest.java @@ -0,0 +1,55 @@ +package io.weaviate.client6.v1.api.cluster; + +import java.util.HashMap; +import java.util.Optional; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.OptionalEndpoint; + +public record ListShardsRequest(String collection, String shard) { + + static final Endpoint> _ENDPOINT = OptionalEndpoint.noBodyOptional( + __ -> "GET", + __ -> "/replication/sharding-state", + request -> new HashMap<>() { // `shard` can be null, HashMap permits null values. + { + put("collection", request.collection); + put("shard", request.shard); + } + }, + (statusCode, response) -> JSON.deserialize(response, ListShardsResponse.class).shardingState()); + + public static ListShardsRequest of(String collection) { + return of(collection, ObjectBuilder.identity()); + } + + public static ListShardsRequest of(String collection, Function> fn) { + return fn.apply(new Builder(collection)).build(); + } + + public ListShardsRequest(Builder builder) { + this(builder.collection, builder.shard); + } + + public static class Builder implements ObjectBuilder { + private final String collection; + private String shard; + + public Builder(String collection) { + this.collection = collection; + } + + public Builder shard(String shard) { + this.shard = shard; + return this; + } + + @Override + public ListShardsRequest build() { + return new ListShardsRequest(this); + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/ListShardsResponse.java b/src/main/java/io/weaviate/client6/v1/api/cluster/ListShardsResponse.java new file mode 100644 index 00000000..d8a66210 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/ListShardsResponse.java @@ -0,0 +1,6 @@ +package io.weaviate.client6.v1.api.cluster; + +import com.google.gson.annotations.SerializedName; + +public record ListShardsResponse(@SerializedName("shardingState") ShardingState shardingState) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java b/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java new file mode 100644 index 00000000..e7e9f9ed --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java @@ -0,0 +1,16 @@ +package io.weaviate.client6.v1.api.cluster; + +import java.util.List; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client6.v1.api.collections.config.ShardStatus; + +public record Node( + @SerializedName("name") String name, + @SerializedName("status") String status, + @SerializedName("gitHash") String gitHash, + @SerializedName("version") String version, + @SerializedName("stats") CollectionStats stats, + @SerializedName("shards") List shards) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/NodeVerbosity.java b/src/main/java/io/weaviate/client6/v1/api/cluster/NodeVerbosity.java new file mode 100644 index 00000000..abdcbe0e --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/NodeVerbosity.java @@ -0,0 +1,10 @@ +package io.weaviate.client6.v1.api.cluster; + +import com.google.gson.annotations.SerializedName; + +public enum NodeVerbosity { + @SerializedName("minimal") + MINIMAL, + @SerializedName("verbose") + VERBOSE; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/Shard.java b/src/main/java/io/weaviate/client6/v1/api/cluster/Shard.java new file mode 100644 index 00000000..e5625a83 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/Shard.java @@ -0,0 +1,15 @@ +package io.weaviate.client6.v1.api.cluster; + +import com.google.gson.annotations.SerializedName; + +public record Shard( + @SerializedName("name") String name, + @SerializedName("class") String collection, + @SerializedName("objectCount") int objectCount, + @SerializedName("vectorIndexingStatus") VectorIndexingStatus vectorIndexingStatus, + @SerializedName("vectorQueueLenght") int vectorQueueLenght, + @SerializedName("compressed") boolean compressed, + @SerializedName("loaded") boolean loaded, + @SerializedName("numberOfReplicas") int numberOfReplicas, + @SerializedName("replicationFactor") int replicationFactor) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/ShardReplica.java b/src/main/java/io/weaviate/client6/v1/api/cluster/ShardReplica.java new file mode 100644 index 00000000..4cd6304e --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/ShardReplica.java @@ -0,0 +1,6 @@ +package io.weaviate.client6.v1.api.cluster; + +import java.util.List; + +public record ShardReplica(String shardName, List replicas) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/ShardingState.java b/src/main/java/io/weaviate/client6/v1/api/cluster/ShardingState.java new file mode 100644 index 00000000..dbca6a79 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/ShardingState.java @@ -0,0 +1,10 @@ +package io.weaviate.client6.v1.api.cluster; + +import java.util.List; + +import com.google.gson.annotations.SerializedName; + +public record ShardingState( + @SerializedName("collection") String collection, + @SerializedName("shards") List shards) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/VectorIndexingStatus.java b/src/main/java/io/weaviate/client6/v1/api/cluster/VectorIndexingStatus.java new file mode 100644 index 00000000..b1e58a41 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/VectorIndexingStatus.java @@ -0,0 +1,12 @@ +package io.weaviate.client6.v1.api.cluster; + +import com.google.gson.annotations.SerializedName; + +public enum VectorIndexingStatus { + @SerializedName("READONLY") + READONLY, + @SerializedName("INDEXING") + INDEXING, + @SerializedName("READY") + READY; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java b/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java new file mode 100644 index 00000000..862d7464 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java @@ -0,0 +1,79 @@ +package io.weaviate.client6.v1.api.cluster; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import io.weaviate.client6.v1.api.WeaviateApiException; +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.rest.RestTransport; + +public class WeaviateClusterClient { + private final RestTransport restTransport; + + public WeaviateClusterClient(RestTransport restTransport) { + this.restTransport = restTransport; + } + + /** + * Query sharding state of a collection. + * + * @param collection Collection name. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Optional shardingState(String collection) throws IOException { + return this.restTransport.performRequest(ListShardsRequest.of(collection), ListShardsRequest._ENDPOINT); + } + + /** + * Query sharding state of a collection. + * + * @param collection Collection name. + * @param fn Lambda expression for optional parameters. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Optional shardingState(String collection, + Function> fn) + throws IOException { + return this.restTransport.performRequest(ListShardsRequest.of(collection, fn), ListShardsRequest._ENDPOINT); + } + + /** + * Get the status of all nodes in the cluster. + * + * @param fn Lambda expression for optional parameters. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public List listNodes() + throws IOException { + return this.restTransport.performRequest(ListNodesRequest.of(), ListNodesRequest._ENDPOINT); + } + + /** + * Get the status of all nodes in the cluster. + * + * @param fn Lambda expression for optional parameters. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public List listNodes(Function> fn) + throws IOException { + return this.restTransport.performRequest(ListNodesRequest.of(fn), ListNodesRequest._ENDPOINT); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/rbac/NodesPermission.java b/src/main/java/io/weaviate/client6/v1/api/rbac/NodesPermission.java index 1d3919c0..2607f19b 100644 --- a/src/main/java/io/weaviate/client6/v1/api/rbac/NodesPermission.java +++ b/src/main/java/io/weaviate/client6/v1/api/rbac/NodesPermission.java @@ -5,12 +5,14 @@ import com.google.gson.annotations.SerializedName; +import io.weaviate.client6.v1.api.cluster.NodeVerbosity; + public record NodesPermission( @SerializedName("collection") String collection, - @SerializedName("verbosity") Verbosity verbosity, + @SerializedName("verbosity") NodeVerbosity verbosity, @SerializedName("actions") List actions) implements Permission { - public NodesPermission(String collection, Verbosity verbosity, Action... actions) { + public NodesPermission(String collection, NodeVerbosity verbosity, Action... actions) { this(collection, verbosity, Arrays.asList(actions)); } @@ -39,11 +41,4 @@ public String jsonValue() { return jsonValue; } } - - public enum Verbosity { - @SerializedName("minimal") - MINIMAL, - @SerializedName("verbose") - VERBOSE; - } } diff --git a/src/main/java/io/weaviate/client6/v1/api/rbac/Permission.java b/src/main/java/io/weaviate/client6/v1/api/rbac/Permission.java index 0d088b85..9374c4e3 100644 --- a/src/main/java/io/weaviate/client6/v1/api/rbac/Permission.java +++ b/src/main/java/io/weaviate/client6/v1/api/rbac/Permission.java @@ -20,7 +20,7 @@ import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; -import io.weaviate.client6.v1.api.rbac.NodesPermission.Verbosity; +import io.weaviate.client6.v1.api.cluster.NodeVerbosity; import io.weaviate.client6.v1.api.rbac.RolesPermission.Scope; import io.weaviate.client6.v1.api.rbac.groups.GroupType; import io.weaviate.client6.v1.internal.json.JsonEnum; @@ -115,18 +115,18 @@ public static GroupsPermission groups(String groupId, GroupType groupType, Group /** * Create {@link NodesPermission} scoped to all collections. */ - public static NodesPermission nodes(NodesPermission.Verbosity verbosity, NodesPermission.Action... actions) { + public static NodesPermission nodes(NodeVerbosity verbosity, NodesPermission.Action... actions) { checkDeprecation(actions); return new NodesPermission("*", verbosity, actions); } /** * Create {@link NodesPermission} scoped to a specific collection. Verbosity is - * set to {@link Verbosity#VERBOSE} by default. + * set to {@link NodeVerbosity#VERBOSE} by default. */ public static NodesPermission nodes(String collection, NodesPermission.Action... actions) { checkDeprecation(actions); - return new NodesPermission(collection, Verbosity.VERBOSE, actions); + return new NodesPermission(collection, NodeVerbosity.VERBOSE, actions); } /** diff --git a/src/main/java/io/weaviate/client6/v1/internal/rest/UrlEncoder.java b/src/main/java/io/weaviate/client6/v1/internal/rest/UrlEncoder.java index 2b53f1c8..2bade27b 100644 --- a/src/main/java/io/weaviate/client6/v1/internal/rest/UrlEncoder.java +++ b/src/main/java/io/weaviate/client6/v1/internal/rest/UrlEncoder.java @@ -28,7 +28,7 @@ public static String encodeQuery(Map queryParams) { if (queryParams == null || queryParams.isEmpty()) { return ""; } - return queryParams.entrySet().stream() + var query = queryParams.entrySet().stream() .filter(qp -> { if (qp == null) { return false; @@ -38,7 +38,10 @@ public static String encodeQuery(Map queryParams) { } return true; }) + .filter(qp -> qp.getKey() != null && qp.getValue() != null) .map(qp -> qp.getKey() + "=" + encodeValue(qp.getValue())) .collect(Collectors.joining("&", "?", "")); + + return query.equals("?") ? "" : query; } } diff --git a/src/test/java/io/weaviate/client6/v1/internal/json/JSONTest.java b/src/test/java/io/weaviate/client6/v1/internal/json/JSONTest.java index f6f15e0d..6cb8fcad 100644 --- a/src/test/java/io/weaviate/client6/v1/internal/json/JSONTest.java +++ b/src/test/java/io/weaviate/client6/v1/internal/json/JSONTest.java @@ -14,6 +14,7 @@ import com.jparams.junit4.JParamsTestRunner; import com.jparams.junit4.data.DataMethod; +import io.weaviate.client6.v1.api.cluster.NodeVerbosity; import io.weaviate.client6.v1.api.collections.CollectionConfig; import io.weaviate.client6.v1.api.collections.Encoding; import io.weaviate.client6.v1.api.collections.Generative; @@ -48,7 +49,6 @@ import io.weaviate.client6.v1.api.rbac.DataPermission; import io.weaviate.client6.v1.api.rbac.GroupsPermission; import io.weaviate.client6.v1.api.rbac.NodesPermission; -import io.weaviate.client6.v1.api.rbac.NodesPermission.Verbosity; import io.weaviate.client6.v1.api.rbac.ReplicatePermission; import io.weaviate.client6.v1.api.rbac.Role; import io.weaviate.client6.v1.api.rbac.RolesPermission; @@ -655,7 +655,7 @@ public static Object[][] testCases() { List.of( new NodesPermission( "Collection", - Verbosity.MINIMAL, + NodeVerbosity.MINIMAL, List.of(NodesPermission.Action.READ)))), """ { From 5525aa6d38166ac4568a8cfde11ab30264cdaa16 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Thu, 23 Oct 2025 12:23:05 +0200 Subject: [PATCH 2/7] feat: extend cluster/nodes to async client --- .../client6/v1/api/WeaviateClientAsync.java | 8 +++ .../cluster/WeaviateClusterClientAsync.java | 59 +++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java index 8a797f0f..85f6103d 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java @@ -6,6 +6,7 @@ import io.weaviate.client6.v1.api.alias.WeaviateAliasClientAsync; import io.weaviate.client6.v1.api.backup.WeaviateBackupClientAsync; +import io.weaviate.client6.v1.api.cluster.WeaviateClusterClientAsync; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClient; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClientAsync; import io.weaviate.client6.v1.api.rbac.groups.WeaviateGroupsClientAsync; @@ -52,6 +53,12 @@ public class WeaviateClientAsync implements AutoCloseable { */ public final WeaviateUsersClientAsync users; + /** + * Client for {@code /nodes} and {@code /replication} endpoints + * for managing replication and sharding. + */ + public final WeaviateClusterClientAsync cluster; + /** * This constructor is blocking if {@link Authentication} configured, * as the client will need to do the initial token exchange. @@ -110,6 +117,7 @@ public WeaviateClientAsync(Config config) { this.roles = new WeaviateRolesClientAsync(restTransport); this.groups = new WeaviateGroupsClientAsync(restTransport); this.users = new WeaviateUsersClientAsync(restTransport); + this.cluster = new WeaviateClusterClientAsync(restTransport); this.collections = new WeaviateCollectionsClientAsync(restTransport, grpcTransport); } diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java new file mode 100644 index 00000000..1d62873d --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java @@ -0,0 +1,59 @@ +package io.weaviate.client6.v1.api.cluster; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.rest.RestTransport; + +public class WeaviateClusterClientAsync { + private final RestTransport restTransport; + + public WeaviateClusterClientAsync(RestTransport restTransport) { + this.restTransport = restTransport; + } + + /** + * Query sharding state of a collection. + * + * @param collection Collection name. + */ + public CompletableFuture> shardingState(String collection) throws IOException { + return this.restTransport.performRequestAsync(ListShardsRequest.of(collection), ListShardsRequest._ENDPOINT); + } + + /** + * Query sharding state of a collection. + * + * @param collection Collection name. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture> shardingState(String collection, + Function> fn) + throws IOException { + return this.restTransport.performRequestAsync(ListShardsRequest.of(collection, fn), ListShardsRequest._ENDPOINT); + } + + /** + * Get the status of all nodes in the cluster. + * + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture> listNodes() + throws IOException { + return this.restTransport.performRequestAsync(ListNodesRequest.of(), ListNodesRequest._ENDPOINT); + } + + /** + * Get the status of all nodes in the cluster. + * + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture> listNodes(Function> fn) + throws IOException { + return this.restTransport.performRequestAsync(ListNodesRequest.of(fn), ListNodesRequest._ENDPOINT); + } +} From 7c6b972f51c76ebe3f1c3e8617912f339db9a61f Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Thu, 23 Oct 2025 14:16:40 +0200 Subject: [PATCH 3/7] chore: fix javadoc --- .../weaviate/client6/v1/api/cluster/WeaviateClusterClient.java | 1 - .../client6/v1/api/cluster/WeaviateClusterClientAsync.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java b/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java index 862d7464..f3deee2a 100644 --- a/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java @@ -50,7 +50,6 @@ public Optional shardingState(String collection, /** * Get the status of all nodes in the cluster. * - * @param fn Lambda expression for optional parameters. * @throws WeaviateApiException in case the server returned with an * error status code. * @throws IOException in case the request was not sent successfully diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java index 1d62873d..b33d7225 100644 --- a/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java @@ -39,8 +39,6 @@ public CompletableFuture> shardingState(String collectio /** * Get the status of all nodes in the cluster. - * - * @param fn Lambda expression for optional parameters. */ public CompletableFuture> listNodes() throws IOException { From 60e1e34d25bc73dcc68151f195af6bb161cd53ad Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Thu, 23 Oct 2025 14:43:13 +0200 Subject: [PATCH 4/7] test: use random container name --- .../java/io/weaviate/containers/Weaviate.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/it/java/io/weaviate/containers/Weaviate.java b/src/it/java/io/weaviate/containers/Weaviate.java index b214c49c..e9a80284 100644 --- a/src/it/java/io/weaviate/containers/Weaviate.java +++ b/src/it/java/io/weaviate/containers/Weaviate.java @@ -28,6 +28,21 @@ public class Weaviate extends WeaviateContainer { private volatile SharedClient clientInstance; private final String containerName; + /** + * By default, testcontainer's name is only available after calling + * {@link #start}. + * We need to know each container's name in advance to run a cluster + * of several nodes, in which case we alse set the name manually. + * + * @see Builder#build() + */ + @Override + public String getContainerName() { + return containerName != null + ? containerName + : super.getContainerName(); + } + /** * Create a new instance of WeaviateClient connected to this container if none * exist. Get an existing client otherwise. @@ -230,7 +245,7 @@ public Weaviate build() { } environment.forEach((name, value) -> c.withEnv(name, value)); - c.withCreateContainerCmdModifier(cmd -> cmd.withName(containerName).withHostName(containerName)); + c.withCreateContainerCmdModifier(cmd -> cmd.withHostName(containerName)); return c; } } From db29b5206dcac5d4e1f41fa380097222ed555ce1 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Thu, 23 Oct 2025 14:44:46 +0200 Subject: [PATCH 5/7] test: set RAFT_JOIN value for follower nodes only --- src/it/java/io/weaviate/containers/Weaviate.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/it/java/io/weaviate/containers/Weaviate.java b/src/it/java/io/weaviate/containers/Weaviate.java index e9a80284..baa430ce 100644 --- a/src/it/java/io/weaviate/containers/Weaviate.java +++ b/src/it/java/io/weaviate/containers/Weaviate.java @@ -373,11 +373,11 @@ private void bindNodes(int gossip, int data, int raft) { .withEnv("CLUSTER_GOSSIP_BIND_PORT", String.valueOf(gossip)) .withEnv("CLUSTER_DATA_BIND_PORT", String.valueOf(data)) .withEnv("RAFT_PORT", String.valueOf(raft)) - .withEnv("RAFT_BOOTSTRAP_EXPECT", "1") - .withEnv("RAFT_JOIN", leader.containerName)); + .withEnv("RAFT_BOOTSTRAP_EXPECT", "1")); followers.forEach(node -> node .withEnv("CLUSTER_JOIN", leader.containerName + ":" + gossip) + .withEnv("RAFT_JOIN", leader.containerName) .waitingFor( Wait.forHttp("/v1/.well-known/ready") .forPort(publicPort) From 7721136f2c34397c17d8e8bbdaa642a9ee2a9e36 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 28 Oct 2025 11:57:07 +0100 Subject: [PATCH 6/7] chore: delete redundant import --- src/main/java/io/weaviate/client6/v1/api/cluster/Node.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java b/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java index e7e9f9ed..ae85bb73 100644 --- a/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java @@ -4,8 +4,6 @@ import com.google.gson.annotations.SerializedName; -import io.weaviate.client6.v1.api.collections.config.ShardStatus; - public record Node( @SerializedName("name") String name, @SerializedName("status") String status, From 8196d52dfd9cfa693faf4a7d2136dcc3310951cf Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 28 Oct 2025 12:20:25 +0100 Subject: [PATCH 7/7] refactor: introduce enum for node status --- .../weaviate/client6/v1/api/cluster/Node.java | 14 +++++++++++++- .../client6/v1/api/cluster/NodeStatus.java | 18 ++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/weaviate/client6/v1/api/cluster/NodeStatus.java diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java b/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java index ae85bb73..becebc39 100644 --- a/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/Node.java @@ -6,9 +6,21 @@ public record Node( @SerializedName("name") String name, - @SerializedName("status") String status, + @SerializedName("status") NodeStatus status, + /** Commit hash of the Weaviate build the node is running. */ @SerializedName("gitHash") String gitHash, + /** Weaviate version the node is running. */ @SerializedName("version") String version, + /** + * Can be {@code null} if "minimal" output is requested. + * + * @see NodeVerbosity#MINIMAL. + */ @SerializedName("stats") CollectionStats stats, + /** + * Can be {@code null} if "minimal" output is requested. + * + * @see NodeVerbosity#MINIMAL. + */ @SerializedName("shards") List shards) { } diff --git a/src/main/java/io/weaviate/client6/v1/api/cluster/NodeStatus.java b/src/main/java/io/weaviate/client6/v1/api/cluster/NodeStatus.java new file mode 100644 index 00000000..46d9249f --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/cluster/NodeStatus.java @@ -0,0 +1,18 @@ +package io.weaviate.client6.v1.api.cluster; + +import com.google.gson.annotations.SerializedName; + +public enum NodeStatus { + /** The node is functional and operating normally. */ + @SerializedName("HEALTHY") + HEALTHY, + /** The node is down after encountering a problem. */ + @SerializedName("UNHEALTHY") + UNHEALTHY, + /** The node is not available. */ + @SerializedName("UNAVAILABLE") + UNAVAILABLE, + /** Liveness probe to a node timed out. */ + @SerializedName("TIMEOUT") + TIMEOUT; +};