From 7ba12cc4bb03a4e1d996f8fb4c66a3994d7dd008 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Wed, 5 Nov 2025 22:07:17 +0530 Subject: [PATCH 01/11] feat: add OpenAPI generated model and api layer --- .openapi-generator/FILES | 4 + README.md | 5 + docs/OpenFgaApi.md | 163 +++++++++++++++++ ...reamResultOfStreamedListObjectsResponse.md | 14 ++ docs/StreamedListObjectsResponse.md | 14 ++ .../java/dev/openfga/sdk/api/OpenFgaApi.java | 63 +++++++ ...amResultOfStreamedListObjectsResponse.java | 168 ++++++++++++++++++ .../model/StreamedListObjectsResponse.java | 139 +++++++++++++++ 8 files changed, 570 insertions(+) create mode 100644 docs/StreamResultOfStreamedListObjectsResponse.md create mode 100644 docs/StreamedListObjectsResponse.md create mode 100644 src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java create mode 100644 src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java diff --git a/.openapi-generator/FILES b/.openapi-generator/FILES index ddc3f045..9290b751 100644 --- a/.openapi-generator/FILES +++ b/.openapi-generator/FILES @@ -67,6 +67,8 @@ docs/RelationshipCondition.md docs/SourceInfo.md docs/Status.md docs/Store.md +docs/StreamResultOfStreamedListObjectsResponse.md +docs/StreamedListObjectsResponse.md docs/Tuple.md docs/TupleChange.md docs/TupleKey.md @@ -155,6 +157,8 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java src/main/java/dev/openfga/sdk/api/model/SourceInfo.java src/main/java/dev/openfga/sdk/api/model/Status.java src/main/java/dev/openfga/sdk/api/model/Store.java +src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java +src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java src/main/java/dev/openfga/sdk/api/model/Tuple.java src/main/java/dev/openfga/sdk/api/model/TupleChange.java src/main/java/dev/openfga/sdk/api/model/TupleKey.java diff --git a/README.md b/README.md index 51a10cc9..9a829ee1 100644 --- a/README.md +++ b/README.md @@ -1185,6 +1185,7 @@ try { | [**readAuthorizationModel**](docs/OpenFgaApi.md#readauthorizationmodel) | **GET** /stores/{store_id}/authorization-models/{id} | Return a particular version of an authorization model | | [**readAuthorizationModels**](docs/OpenFgaApi.md#readauthorizationmodels) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](docs/OpenFgaApi.md#readchanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](docs/OpenFgaApi.md#streamedlistobjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](docs/OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](docs/OpenFgaApi.md#writeassertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | | [**writeAuthorizationModel**](docs/OpenFgaApi.md#writeauthorizationmodel) | **POST** /stores/{store_id}/authorization-models | Create a new authorization model | @@ -1310,6 +1311,10 @@ try { - [Store](https://github.com/openfga/java-sdk/blob/main/docs/Store.md) +- [StreamResultOfStreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamResultOfStreamedListObjectsResponse.md) + +- [StreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamedListObjectsResponse.md) + - [Tuple](https://github.com/openfga/java-sdk/blob/main/docs/Tuple.md) - [TupleChange](https://github.com/openfga/java-sdk/blob/main/docs/TupleChange.md) diff --git a/docs/OpenFgaApi.md b/docs/OpenFgaApi.md index 80986b1c..09d0f1fb 100644 --- a/docs/OpenFgaApi.md +++ b/docs/OpenFgaApi.md @@ -32,6 +32,8 @@ All URIs are relative to *http://localhost* | [**readAuthorizationModelsWithHttpInfo**](OpenFgaApi.md#readAuthorizationModelsWithHttpInfo) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](OpenFgaApi.md#readChanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | | [**readChangesWithHttpInfo**](OpenFgaApi.md#readChangesWithHttpInfo) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](OpenFgaApi.md#streamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | +| [**streamedListObjectsWithHttpInfo**](OpenFgaApi.md#streamedListObjectsWithHttpInfo) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeWithHttpInfo**](OpenFgaApi.md#writeWithHttpInfo) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](OpenFgaApi.md#writeAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | @@ -2301,6 +2303,167 @@ No authorization required | **500** | Request failed due to internal server error. | - | +## streamedListObjects + +> CompletableFuture streamedListObjects(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture result = apiInstance.streamedListObjects(storeId, body); + System.out.println(result.get()); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture<[**StreamResultOfStreamedListObjectsResponse**](StreamResultOfStreamedListObjectsResponse.md)> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + +## streamedListObjectsWithHttpInfo + +> CompletableFuture> streamedListObjects streamedListObjectsWithHttpInfo(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.client.ApiResponse; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture> response = apiInstance.streamedListObjectsWithHttpInfo(storeId, body); + System.out.println("Status code: " + response.get().getStatusCode()); + System.out.println("Response headers: " + response.get().getHeaders()); + System.out.println("Response body: " + response.get().getData()); + } catch (InterruptedException | ExecutionException e) { + ApiException apiException = (ApiException)e.getCause(); + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + apiException.getCode()); + System.err.println("Response headers: " + apiException.getResponseHeaders()); + System.err.println("Reason: " + apiException.getResponseBody()); + e.printStackTrace(); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Response headers: " + e.getResponseHeaders()); + System.err.println("Reason: " + e.getResponseBody()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + + ## write > CompletableFuture write(storeId, body) diff --git a/docs/StreamResultOfStreamedListObjectsResponse.md b/docs/StreamResultOfStreamedListObjectsResponse.md new file mode 100644 index 00000000..af23d053 --- /dev/null +++ b/docs/StreamResultOfStreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamResultOfStreamedListObjectsResponse + + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**result** | [**StreamedListObjectsResponse**](StreamedListObjectsResponse.md) | | [optional] | +|**error** | [**Status**](Status.md) | | [optional] | + + + diff --git a/docs/StreamedListObjectsResponse.md b/docs/StreamedListObjectsResponse.md new file mode 100644 index 00000000..04b00157 --- /dev/null +++ b/docs/StreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamedListObjectsResponse + +The response for a StreamedListObjects RPC. + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**_object** | **String** | | | + + + diff --git a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java index d0589ca0..0ea565a2 100644 --- a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java +++ b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java @@ -38,6 +38,7 @@ import dev.openfga.sdk.api.model.ReadChangesResponse; import dev.openfga.sdk.api.model.ReadRequest; import dev.openfga.sdk.api.model.ReadResponse; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; import dev.openfga.sdk.api.model.WriteAssertionsRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelResponse; @@ -906,6 +907,68 @@ private CompletableFuture> readChanges( } } + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body) throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @param configurationOverride Override the {@link Configuration} this OpenFgaApi was constructed with + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration.override(configurationOverride)); + } + + private CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/{store_id}/streamed-list-objects" + .replace("{store_id}", StringUtil.urlEncode(storeId.toString())); + + Map methodParameters = new HashMap<>(); + methodParameters.put("storeId", storeId); + methodParameters.put("body", body); + + Map telemetryAttributes = buildTelemetryAttributes(methodParameters); + + telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); + + try { + HttpRequest request = buildHttpRequest("POST", path, body, configuration); + return new HttpRequestAttempt<>( + request, + "streamedListObjects", + StreamResultOfStreamedListObjectsResponse.class, + apiClient, + configuration) + .addTelemetryAttributes(telemetryAttributes) + .attemptHttpRequest(); + } catch (ApiException e) { + return CompletableFuture.failedFuture(e); + } + } + /** * Add or delete tuples from the store * The Write API will transactionally update the tuples for a certain store. Tuples and type definitions allow OpenFGA to determine whether a relationship exists between an object and an user. In the body, `writes` adds new tuples and `deletes` removes existing tuples. When deleting a tuple, any `condition` specified with it is ignored. The API is not idempotent by default: if, later on, you try to add the same tuple key (even if the `condition` is different), or if you try to delete a non-existing tuple, it will throw an error. To allow writes when an identical tuple already exists in the database, set `\"on_duplicate\": \"ignore\"` on the `writes` object. To allow deletes when a tuple was already removed from the database, set `\"on_missing\": \"ignore\"` on the `deletes` object. If a Write request contains both idempotent (ignore) and non-idempotent (error) operations, the most restrictive action (error) will take precedence. If a condition fails for a sub-request with an error flag, the entire transaction will be rolled back. This gives developers explicit control over the atomicity of the requests. The API will not allow you to write tuples such as `document:2021-budget#viewer@document:2021-budget#viewer`, because they are implicit. An `authorization_model_id` may be specified in the body. If it is, it will be used to assert that each written tuple (not deleted) is valid for the model specified. If it is not specified, the latest authorization model ID will be used. ## Example ### Adding relationships To add `user:anne` as a `writer` for `document:2021-budget`, call write API with the following ```json { \"writes\": { \"tuple_keys\": [ { \"user\": \"user:anne\", \"relation\": \"writer\", \"object\": \"document:2021-budget\" } ], \"on_duplicate\": \"ignore\" }, \"authorization_model_id\": \"01G50QVV17PECNVAHX1GG4Y5NC\" } ``` ### Removing relationships To remove `user:bob` as a `reader` for `document:2021-budget`, call write API with the following ```json { \"deletes\": { \"tuple_keys\": [ { \"user\": \"user:bob\", \"relation\": \"reader\", \"object\": \"document:2021-budget\" } ], \"on_missing\": \"ignore\" } } ``` diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java new file mode 100644 index 00000000..20331735 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java @@ -0,0 +1,168 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * StreamResultOfStreamedListObjectsResponse + */ +@JsonPropertyOrder({ + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_RESULT, + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_ERROR +}) +public class StreamResultOfStreamedListObjectsResponse { + public static final String JSON_PROPERTY_RESULT = "result"; + private StreamedListObjectsResponse result; + + public static final String JSON_PROPERTY_ERROR = "error"; + private Status error; + + public StreamResultOfStreamedListObjectsResponse() {} + + public StreamResultOfStreamedListObjectsResponse result(StreamedListObjectsResponse result) { + this.result = result; + return this; + } + + /** + * Get result + * @return result + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public StreamedListObjectsResponse getResult() { + return result; + } + + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setResult(StreamedListObjectsResponse result) { + this.result = result; + } + + public StreamResultOfStreamedListObjectsResponse error(Status error) { + this.error = error; + return this; + } + + /** + * Get error + * @return error + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public Status getError() { + return error; + } + + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setError(Status error) { + this.error = error; + } + + /** + * Return true if this Stream_result_of_StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamResultOfStreamedListObjectsResponse streamResultOfStreamedListObjectsResponse = + (StreamResultOfStreamedListObjectsResponse) o; + return Objects.equals(this.result, streamResultOfStreamedListObjectsResponse.result) + && Objects.equals(this.error, streamResultOfStreamedListObjectsResponse.error); + } + + @Override + public int hashCode() { + return Objects.hash(result, error); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamResultOfStreamedListObjectsResponse {\n"); + sb.append(" result: ").append(toIndentedString(result)).append("\n"); + sb.append(" error: ").append(toIndentedString(error)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `result` to the URL query string + if (getResult() != null) { + joiner.add(getResult().toUrlQueryString(prefix + "result" + suffix)); + } + + // add `error` to the URL query string + if (getError() != null) { + joiner.add(getError().toUrlQueryString(prefix + "error" + suffix)); + } + + return joiner.toString(); + } +} diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java new file mode 100644 index 00000000..43c02ab5 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java @@ -0,0 +1,139 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * The response for a StreamedListObjects RPC. + */ +@JsonPropertyOrder({StreamedListObjectsResponse.JSON_PROPERTY_OBJECT}) +public class StreamedListObjectsResponse { + public static final String JSON_PROPERTY_OBJECT = "object"; + private String _object; + + public StreamedListObjectsResponse() {} + + public StreamedListObjectsResponse _object(String _object) { + this._object = _object; + return this; + } + + /** + * Get _object + * @return _object + **/ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public String getObject() { + return _object; + } + + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setObject(String _object) { + this._object = _object; + } + + /** + * Return true if this StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamedListObjectsResponse streamedListObjectsResponse = (StreamedListObjectsResponse) o; + return Objects.equals(this._object, streamedListObjectsResponse._object); + } + + @Override + public int hashCode() { + return Objects.hash(_object); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamedListObjectsResponse {\n"); + sb.append(" _object: ").append(toIndentedString(_object)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `object` to the URL query string + if (getObject() != null) { + joiner.add(String.format( + "%sobject%s=%s", + prefix, + suffix, + URLEncoder.encode(String.valueOf(getObject()), StandardCharsets.UTF_8) + .replaceAll("\\+", "%20"))); + } + + return joiner.toString(); + } +} From 07adcf5e2ad6dba74e5fae94227317b95f730ce9 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Wed, 5 Nov 2025 22:07:17 +0530 Subject: [PATCH 02/11] feat: add OpenAPI generated model and api layer --- .openapi-generator/FILES | 4 + README.md | 5 + docs/OpenFgaApi.md | 163 +++++++++++++++++ ...reamResultOfStreamedListObjectsResponse.md | 14 ++ docs/StreamedListObjectsResponse.md | 14 ++ .../java/dev/openfga/sdk/api/OpenFgaApi.java | 63 +++++++ ...amResultOfStreamedListObjectsResponse.java | 168 ++++++++++++++++++ .../model/StreamedListObjectsResponse.java | 139 +++++++++++++++ 8 files changed, 570 insertions(+) create mode 100644 docs/StreamResultOfStreamedListObjectsResponse.md create mode 100644 docs/StreamedListObjectsResponse.md create mode 100644 src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java create mode 100644 src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java diff --git a/.openapi-generator/FILES b/.openapi-generator/FILES index ddc3f045..9290b751 100644 --- a/.openapi-generator/FILES +++ b/.openapi-generator/FILES @@ -67,6 +67,8 @@ docs/RelationshipCondition.md docs/SourceInfo.md docs/Status.md docs/Store.md +docs/StreamResultOfStreamedListObjectsResponse.md +docs/StreamedListObjectsResponse.md docs/Tuple.md docs/TupleChange.md docs/TupleKey.md @@ -155,6 +157,8 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java src/main/java/dev/openfga/sdk/api/model/SourceInfo.java src/main/java/dev/openfga/sdk/api/model/Status.java src/main/java/dev/openfga/sdk/api/model/Store.java +src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java +src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java src/main/java/dev/openfga/sdk/api/model/Tuple.java src/main/java/dev/openfga/sdk/api/model/TupleChange.java src/main/java/dev/openfga/sdk/api/model/TupleKey.java diff --git a/README.md b/README.md index 51a10cc9..9a829ee1 100644 --- a/README.md +++ b/README.md @@ -1185,6 +1185,7 @@ try { | [**readAuthorizationModel**](docs/OpenFgaApi.md#readauthorizationmodel) | **GET** /stores/{store_id}/authorization-models/{id} | Return a particular version of an authorization model | | [**readAuthorizationModels**](docs/OpenFgaApi.md#readauthorizationmodels) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](docs/OpenFgaApi.md#readchanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](docs/OpenFgaApi.md#streamedlistobjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](docs/OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](docs/OpenFgaApi.md#writeassertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | | [**writeAuthorizationModel**](docs/OpenFgaApi.md#writeauthorizationmodel) | **POST** /stores/{store_id}/authorization-models | Create a new authorization model | @@ -1310,6 +1311,10 @@ try { - [Store](https://github.com/openfga/java-sdk/blob/main/docs/Store.md) +- [StreamResultOfStreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamResultOfStreamedListObjectsResponse.md) + +- [StreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamedListObjectsResponse.md) + - [Tuple](https://github.com/openfga/java-sdk/blob/main/docs/Tuple.md) - [TupleChange](https://github.com/openfga/java-sdk/blob/main/docs/TupleChange.md) diff --git a/docs/OpenFgaApi.md b/docs/OpenFgaApi.md index 80986b1c..09d0f1fb 100644 --- a/docs/OpenFgaApi.md +++ b/docs/OpenFgaApi.md @@ -32,6 +32,8 @@ All URIs are relative to *http://localhost* | [**readAuthorizationModelsWithHttpInfo**](OpenFgaApi.md#readAuthorizationModelsWithHttpInfo) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](OpenFgaApi.md#readChanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | | [**readChangesWithHttpInfo**](OpenFgaApi.md#readChangesWithHttpInfo) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](OpenFgaApi.md#streamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | +| [**streamedListObjectsWithHttpInfo**](OpenFgaApi.md#streamedListObjectsWithHttpInfo) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeWithHttpInfo**](OpenFgaApi.md#writeWithHttpInfo) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](OpenFgaApi.md#writeAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | @@ -2301,6 +2303,167 @@ No authorization required | **500** | Request failed due to internal server error. | - | +## streamedListObjects + +> CompletableFuture streamedListObjects(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture result = apiInstance.streamedListObjects(storeId, body); + System.out.println(result.get()); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture<[**StreamResultOfStreamedListObjectsResponse**](StreamResultOfStreamedListObjectsResponse.md)> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + +## streamedListObjectsWithHttpInfo + +> CompletableFuture> streamedListObjects streamedListObjectsWithHttpInfo(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.client.ApiResponse; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture> response = apiInstance.streamedListObjectsWithHttpInfo(storeId, body); + System.out.println("Status code: " + response.get().getStatusCode()); + System.out.println("Response headers: " + response.get().getHeaders()); + System.out.println("Response body: " + response.get().getData()); + } catch (InterruptedException | ExecutionException e) { + ApiException apiException = (ApiException)e.getCause(); + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + apiException.getCode()); + System.err.println("Response headers: " + apiException.getResponseHeaders()); + System.err.println("Reason: " + apiException.getResponseBody()); + e.printStackTrace(); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Response headers: " + e.getResponseHeaders()); + System.err.println("Reason: " + e.getResponseBody()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + + ## write > CompletableFuture write(storeId, body) diff --git a/docs/StreamResultOfStreamedListObjectsResponse.md b/docs/StreamResultOfStreamedListObjectsResponse.md new file mode 100644 index 00000000..af23d053 --- /dev/null +++ b/docs/StreamResultOfStreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamResultOfStreamedListObjectsResponse + + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**result** | [**StreamedListObjectsResponse**](StreamedListObjectsResponse.md) | | [optional] | +|**error** | [**Status**](Status.md) | | [optional] | + + + diff --git a/docs/StreamedListObjectsResponse.md b/docs/StreamedListObjectsResponse.md new file mode 100644 index 00000000..04b00157 --- /dev/null +++ b/docs/StreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamedListObjectsResponse + +The response for a StreamedListObjects RPC. + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**_object** | **String** | | | + + + diff --git a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java index d0589ca0..0ea565a2 100644 --- a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java +++ b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java @@ -38,6 +38,7 @@ import dev.openfga.sdk.api.model.ReadChangesResponse; import dev.openfga.sdk.api.model.ReadRequest; import dev.openfga.sdk.api.model.ReadResponse; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; import dev.openfga.sdk.api.model.WriteAssertionsRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelResponse; @@ -906,6 +907,68 @@ private CompletableFuture> readChanges( } } + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body) throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @param configurationOverride Override the {@link Configuration} this OpenFgaApi was constructed with + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration.override(configurationOverride)); + } + + private CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/{store_id}/streamed-list-objects" + .replace("{store_id}", StringUtil.urlEncode(storeId.toString())); + + Map methodParameters = new HashMap<>(); + methodParameters.put("storeId", storeId); + methodParameters.put("body", body); + + Map telemetryAttributes = buildTelemetryAttributes(methodParameters); + + telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); + + try { + HttpRequest request = buildHttpRequest("POST", path, body, configuration); + return new HttpRequestAttempt<>( + request, + "streamedListObjects", + StreamResultOfStreamedListObjectsResponse.class, + apiClient, + configuration) + .addTelemetryAttributes(telemetryAttributes) + .attemptHttpRequest(); + } catch (ApiException e) { + return CompletableFuture.failedFuture(e); + } + } + /** * Add or delete tuples from the store * The Write API will transactionally update the tuples for a certain store. Tuples and type definitions allow OpenFGA to determine whether a relationship exists between an object and an user. In the body, `writes` adds new tuples and `deletes` removes existing tuples. When deleting a tuple, any `condition` specified with it is ignored. The API is not idempotent by default: if, later on, you try to add the same tuple key (even if the `condition` is different), or if you try to delete a non-existing tuple, it will throw an error. To allow writes when an identical tuple already exists in the database, set `\"on_duplicate\": \"ignore\"` on the `writes` object. To allow deletes when a tuple was already removed from the database, set `\"on_missing\": \"ignore\"` on the `deletes` object. If a Write request contains both idempotent (ignore) and non-idempotent (error) operations, the most restrictive action (error) will take precedence. If a condition fails for a sub-request with an error flag, the entire transaction will be rolled back. This gives developers explicit control over the atomicity of the requests. The API will not allow you to write tuples such as `document:2021-budget#viewer@document:2021-budget#viewer`, because they are implicit. An `authorization_model_id` may be specified in the body. If it is, it will be used to assert that each written tuple (not deleted) is valid for the model specified. If it is not specified, the latest authorization model ID will be used. ## Example ### Adding relationships To add `user:anne` as a `writer` for `document:2021-budget`, call write API with the following ```json { \"writes\": { \"tuple_keys\": [ { \"user\": \"user:anne\", \"relation\": \"writer\", \"object\": \"document:2021-budget\" } ], \"on_duplicate\": \"ignore\" }, \"authorization_model_id\": \"01G50QVV17PECNVAHX1GG4Y5NC\" } ``` ### Removing relationships To remove `user:bob` as a `reader` for `document:2021-budget`, call write API with the following ```json { \"deletes\": { \"tuple_keys\": [ { \"user\": \"user:bob\", \"relation\": \"reader\", \"object\": \"document:2021-budget\" } ], \"on_missing\": \"ignore\" } } ``` diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java new file mode 100644 index 00000000..20331735 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java @@ -0,0 +1,168 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * StreamResultOfStreamedListObjectsResponse + */ +@JsonPropertyOrder({ + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_RESULT, + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_ERROR +}) +public class StreamResultOfStreamedListObjectsResponse { + public static final String JSON_PROPERTY_RESULT = "result"; + private StreamedListObjectsResponse result; + + public static final String JSON_PROPERTY_ERROR = "error"; + private Status error; + + public StreamResultOfStreamedListObjectsResponse() {} + + public StreamResultOfStreamedListObjectsResponse result(StreamedListObjectsResponse result) { + this.result = result; + return this; + } + + /** + * Get result + * @return result + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public StreamedListObjectsResponse getResult() { + return result; + } + + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setResult(StreamedListObjectsResponse result) { + this.result = result; + } + + public StreamResultOfStreamedListObjectsResponse error(Status error) { + this.error = error; + return this; + } + + /** + * Get error + * @return error + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public Status getError() { + return error; + } + + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setError(Status error) { + this.error = error; + } + + /** + * Return true if this Stream_result_of_StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamResultOfStreamedListObjectsResponse streamResultOfStreamedListObjectsResponse = + (StreamResultOfStreamedListObjectsResponse) o; + return Objects.equals(this.result, streamResultOfStreamedListObjectsResponse.result) + && Objects.equals(this.error, streamResultOfStreamedListObjectsResponse.error); + } + + @Override + public int hashCode() { + return Objects.hash(result, error); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamResultOfStreamedListObjectsResponse {\n"); + sb.append(" result: ").append(toIndentedString(result)).append("\n"); + sb.append(" error: ").append(toIndentedString(error)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `result` to the URL query string + if (getResult() != null) { + joiner.add(getResult().toUrlQueryString(prefix + "result" + suffix)); + } + + // add `error` to the URL query string + if (getError() != null) { + joiner.add(getError().toUrlQueryString(prefix + "error" + suffix)); + } + + return joiner.toString(); + } +} diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java new file mode 100644 index 00000000..43c02ab5 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java @@ -0,0 +1,139 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * The response for a StreamedListObjects RPC. + */ +@JsonPropertyOrder({StreamedListObjectsResponse.JSON_PROPERTY_OBJECT}) +public class StreamedListObjectsResponse { + public static final String JSON_PROPERTY_OBJECT = "object"; + private String _object; + + public StreamedListObjectsResponse() {} + + public StreamedListObjectsResponse _object(String _object) { + this._object = _object; + return this; + } + + /** + * Get _object + * @return _object + **/ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public String getObject() { + return _object; + } + + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setObject(String _object) { + this._object = _object; + } + + /** + * Return true if this StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamedListObjectsResponse streamedListObjectsResponse = (StreamedListObjectsResponse) o; + return Objects.equals(this._object, streamedListObjectsResponse._object); + } + + @Override + public int hashCode() { + return Objects.hash(_object); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamedListObjectsResponse {\n"); + sb.append(" _object: ").append(toIndentedString(_object)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `object` to the URL query string + if (getObject() != null) { + joiner.add(String.format( + "%sobject%s=%s", + prefix, + suffix, + URLEncoder.encode(String.valueOf(getObject()), StandardCharsets.UTF_8) + .replaceAll("\\+", "%20"))); + } + + return joiner.toString(); + } +} From 46980729ae486652872031d58be211c8bf900706 Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 6 Nov 2025 01:40:43 +0530 Subject: [PATCH 03/11] feat: initialise consumer Callback replace streams --- examples/streamed-list-objects/Makefile | 16 + examples/streamed-list-objects/README.md | 82 +++++ examples/streamed-list-objects/build.gradle | 63 ++++ .../streamed-list-objects/gradle.properties | 1 + .../gradle/wrapper/gradle-wrapper.properties | 7 + examples/streamed-list-objects/gradlew | 248 +++++++++++++ examples/streamed-list-objects/gradlew.bat | 93 +++++ .../streamed-list-objects/settings.gradle | 1 + .../example/StreamedListObjectsExample.java | 341 ++++++++++++++++++ 9 files changed, 852 insertions(+) create mode 100644 examples/streamed-list-objects/Makefile create mode 100644 examples/streamed-list-objects/README.md create mode 100644 examples/streamed-list-objects/build.gradle create mode 100644 examples/streamed-list-objects/gradle.properties create mode 100644 examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties create mode 100644 examples/streamed-list-objects/gradlew create mode 100644 examples/streamed-list-objects/gradlew.bat create mode 100644 examples/streamed-list-objects/settings.gradle create mode 100644 examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java diff --git a/examples/streamed-list-objects/Makefile b/examples/streamed-list-objects/Makefile new file mode 100644 index 00000000..5651d28d --- /dev/null +++ b/examples/streamed-list-objects/Makefile @@ -0,0 +1,16 @@ +.PHONY: build run run-openfga +all: build + +project_name=. +openfga_version=latest +language=java + +build: + ./gradlew -P language=$(language) build + +run: + ./gradlew -P language=$(language) run + +run-openfga: + docker pull docker.io/openfga/openfga:${openfga_version} && \ + docker run -p 8080:8080 docker.io/openfga/openfga:${openfga_version} run \ No newline at end of file diff --git a/examples/streamed-list-objects/README.md b/examples/streamed-list-objects/README.md new file mode 100644 index 00000000..2ea8a585 --- /dev/null +++ b/examples/streamed-list-objects/README.md @@ -0,0 +1,82 @@ +# Streamed List Objects example for OpenFGA's Java SDK + +This example demonstrates working with the `POST` `/stores/:id/streamed-list-objects` endpoint in OpenFGA using the Java SDK. + +## Prerequisites + +If you do not already have an OpenFGA instance running, you can start one using the following command: + +```bash +make run-openfga +``` + +Or directly with docker: + +```bash +docker run -d -p 8080:8080 openfga/openfga run +``` + +## Configure the example + +You may need to configure the example for your environment by setting environment variables: + +```bash +export FGA_API_URL=http://localhost:8080 +``` + +Optional authentication configuration: +```bash +export FGA_CLIENT_ID=your-client-id +export FGA_CLIENT_SECRET=your-client-secret +export FGA_API_AUDIENCE=your-api-audience +export FGA_API_TOKEN_ISSUER=your-token-issuer +``` + +## Running the example + +Build the project: + +```bash +make build +``` + +Run the example: + +```bash +make run +``` + +This will: +1. Create a temporary store +2. Create an authorization model +3. Write 100 mock tuples +4. Stream all objects using the `streamedListObjects` API +5. Display each object as it's received +6. Clean up the temporary store + +## What to expect + +The example will output each object as it's streamed from the server: + +``` +Created temporary store (01HXXX...) +Created temporary authorization model (01GXXX...) +Writing 100 mock tuples to store. +Listing objects using streaming endpoint: + document:0 + document:1 + document:2 + ... + document:99 +API returned 100 objects. +Deleted temporary store (01HXXX...) +Finished. +``` + +## Note + +The streaming API is particularly useful when dealing with large result sets, as it: +- Reduces memory usage by processing objects one at a time +- Provides faster time-to-first-result +- Allows for real-time processing of results +- Is only limited by execution timeout rather than result set size \ No newline at end of file diff --git a/examples/streamed-list-objects/build.gradle b/examples/streamed-list-objects/build.gradle new file mode 100644 index 00000000..82817b7e --- /dev/null +++ b/examples/streamed-list-objects/build.gradle @@ -0,0 +1,63 @@ +plugins { + id 'application' + id 'com.diffplug.spotless' version '8.0.0' +} + +application { + mainClass = 'dev.openfga.sdk.example.StreamedListObjectsExample' +} + +repositories { + mavenCentral() +} + +ext { + jacksonVersion = "2.18.2" +} + +dependencies { + // Use local build of SDK + implementation files('../../build/libs/openfga-sdk-0.9.2.jar') + + // Serialization + implementation("com.fasterxml.jackson.core:jackson-core:$jacksonVersion") + implementation("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion") + implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion") + implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion") + implementation("org.openapitools:jackson-databind-nullable:0.2.7") + + // OpenTelemetry (required by SDK) + implementation platform("io.opentelemetry:opentelemetry-bom:1.54.1") + implementation "io.opentelemetry:opentelemetry-api" + + // JSR305 (required by SDK) + implementation "com.google.code.findbugs:jsr305:3.0.2" +} + +// Use spotless plugin to automatically format code, remove unused import, etc +// To apply changes directly to the file, run `gradlew spotlessApply` +// Ref: https://github.com/diffplug/spotless/tree/main/plugin-gradle +spotless { + // comment out below to run spotless as part of the `check` task + enforceCheck false + format 'misc', { + // define the files (e.g. '*.gradle', '*.md') to apply `misc` to + target '.gitignore' + // define the steps to apply to those files + trimTrailingWhitespace() + indentWithSpaces() // Takes an integer argument if you don't like 4 + endWithNewline() + } + java { + palantirJavaFormat() + removeUnusedImports() + importOrder() + } +} + +// Use spotless plugin to automatically format code, remove unused import, etc +// To apply changes directly to the file, run `gradlew spotlessApply` +// Ref: https://github.com/diffplug/spotless/tree/main/plugin-gradle +tasks.register('fmt') { + dependsOn 'spotlessApply' +} \ No newline at end of file diff --git a/examples/streamed-list-objects/gradle.properties b/examples/streamed-list-objects/gradle.properties new file mode 100644 index 00000000..5f544a8e --- /dev/null +++ b/examples/streamed-list-objects/gradle.properties @@ -0,0 +1 @@ +language=java \ No newline at end of file diff --git a/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 00000000..dec8fcc6 --- /dev/null +++ b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.0-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists \ No newline at end of file diff --git a/examples/streamed-list-objects/gradlew b/examples/streamed-list-objects/gradlew new file mode 100644 index 00000000..1b05573b --- /dev/null +++ b/examples/streamed-list-objects/gradlew @@ -0,0 +1,248 @@ +#!/bin/sh + +# +# Copyright © 2015 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" \ No newline at end of file diff --git a/examples/streamed-list-objects/gradlew.bat b/examples/streamed-list-objects/gradlew.bat new file mode 100644 index 00000000..53e45d31 --- /dev/null +++ b/examples/streamed-list-objects/gradlew.bat @@ -0,0 +1,93 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:execute +@rem Setup the command line + + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega \ No newline at end of file diff --git a/examples/streamed-list-objects/settings.gradle b/examples/streamed-list-objects/settings.gradle new file mode 100644 index 00000000..764f4506 --- /dev/null +++ b/examples/streamed-list-objects/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'streamed-list-objects-example' \ No newline at end of file diff --git a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java new file mode 100644 index 00000000..e713b6c6 --- /dev/null +++ b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java @@ -0,0 +1,341 @@ +package dev.openfga.sdk.example; + +import dev.openfga.sdk.api.client.OpenFgaClient; +import dev.openfga.sdk.api.client.model.ClientListObjectsRequest; +import dev.openfga.sdk.api.client.model.ClientTupleKey; +import dev.openfga.sdk.api.client.model.ClientWriteRequest; +import dev.openfga.sdk.api.configuration.ClientConfiguration; +import dev.openfga.sdk.api.configuration.ClientCredentials; +import dev.openfga.sdk.api.configuration.Credentials; +import dev.openfga.sdk.api.model.CreateStoreRequest; +import dev.openfga.sdk.api.model.WriteAuthorizationModelRequest; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class StreamedListObjectsExample { + public static void main(String[] args) throws Exception { + new StreamedListObjectsExample().run(); + } + + public void run() throws Exception { + // Configure the client + var credentials = new Credentials(); + if (System.getenv("FGA_CLIENT_ID") != null) { + credentials = new Credentials(new ClientCredentials() + .apiAudience(System.getenv("FGA_API_AUDIENCE")) + .apiTokenIssuer(System.getenv("FGA_API_TOKEN_ISSUER")) + .clientId(System.getenv("FGA_CLIENT_ID")) + .clientSecret(System.getenv("FGA_CLIENT_SECRET"))); + } else { + System.out.println("Proceeding with no credentials (expecting localhost)"); + } + + String apiUrl = System.getenv("FGA_API_URL"); + if (apiUrl == null || apiUrl.isEmpty()) { + apiUrl = "http://localhost:8080"; + } + + var configuration = new ClientConfiguration().apiUrl(apiUrl).credentials(credentials); + + var fgaClient = new OpenFgaClient(configuration); + + // Create a temporary store + var store = fgaClient + .createStore(new CreateStoreRequest().name("Test Store")) + .get(); + String storeId = store.getId(); + System.out.println("Created temporary store (" + storeId + ")"); + fgaClient.setStoreId(storeId); + + // Create an authorization model + var authModel = createAuthorizationModel(); + var model = fgaClient.writeAuthorizationModel(authModel).get(); + String modelId = model.getAuthorizationModelId(); + System.out.println("Created temporary authorization model (" + modelId + ")"); + + // Write 100 mock tuples + System.out.println("Writing 100 mock tuples to store."); + List writes = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + writes.add(new ClientTupleKey().user("user:anne").relation("owner")._object("document:" + i)); + } + + fgaClient.write(new ClientWriteRequest().writes(writes)).get(); + + // Stream objects + System.out.println("Listing objects using streaming endpoint:"); + List results = new ArrayList<>(); + AtomicInteger count = new AtomicInteger(0); + + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type("document") + .relation("owner") + .user("user:anne"); + + fgaClient + .streamedListObjects(request, object -> { + System.out.println(" " + object); + results.add(object); + count.incrementAndGet(); + }) + .thenRun(() -> { + System.out.println("Streaming complete!"); + System.out.println("API returned " + results.size() + " objects."); + }) + .get(); // Wait for completion + + System.out.println("All results processed."); + + // Clean up - delete the temporary store + try { + fgaClient.deleteStore().get(); + System.out.println("Deleted temporary store (" + storeId + ")"); + } catch (Exception e) { + System.err.println("Failed to delete store: " + e.getMessage()); + } + + System.out.println("Finished."); + } + + private WriteAuthorizationModelRequest createAuthorizationModel() { + // This is a simplified authorization model for the example + // In a real application, you would load this from a file or define it more comprehensively + String modelJson = + """ + { + "schema_version": "1.1", + "type_definitions": [ + { + "type": "user", + "relations": {} + }, + { + "type": "group", + "relations": { + "member": { + "this": {} + } + }, + "metadata": { + "relations": { + "member": { + "directly_related_user_types": [ + {"type": "user"} + ] + } + } + } + }, + { + "type": "folder", + "relations": { + "can_create_file": { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + "owner": { + "this": {} + }, + "parent": { + "this": {} + }, + "viewer": { + "union": { + "child": [ + { + "this": {} + }, + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "viewer" + } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_create_file": { + "directly_related_user_types": [] + }, + "owner": { + "directly_related_user_types": [ + {"type": "user"} + ] + }, + "parent": { + "directly_related_user_types": [ + {"type": "folder"} + ] + }, + "viewer": { + "directly_related_user_types": [ + {"type": "user"}, + {"type": "user", "wildcard": {}}, + {"type": "group", "relation": "member"} + ] + } + } + } + }, + { + "type": "document", + "relations": { + "can_change_owner": { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + "owner": { + "this": {} + }, + "parent": { + "this": {} + }, + "can_read": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "viewer" + } + }, + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "viewer" + } + } + } + ] + } + }, + "can_share": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "owner" + } + } + } + ] + } + }, + "viewer": { + "this": {} + }, + "can_write": { + "union": { + "child": [ + { + "computedUserset": { + "object": "", + "relation": "owner" + } + }, + { + "tupleToUserset": { + "tupleset": { + "object": "", + "relation": "parent" + }, + "computedUserset": { + "object": "", + "relation": "owner" + } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_change_owner": { + "directly_related_user_types": [] + }, + "owner": { + "directly_related_user_types": [ + {"type": "user"} + ] + }, + "parent": { + "directly_related_user_types": [ + {"type": "folder"} + ] + }, + "can_read": { + "directly_related_user_types": [] + }, + "can_share": { + "directly_related_user_types": [] + }, + "viewer": { + "directly_related_user_types": [ + {"type": "user"}, + {"type": "user", "wildcard": {}}, + {"type": "group", "relation": "member"} + ] + }, + "can_write": { + "directly_related_user_types": [] + } + } + } + } + ] + } + """; + + try { + var mapper = new com.fasterxml.jackson.databind.ObjectMapper(); + mapper.findAndRegisterModules(); + return mapper.readValue(modelJson, WriteAuthorizationModelRequest.class); + } catch (Exception e) { + throw new RuntimeException("Failed to parse authorization model", e); + } + } +} \ No newline at end of file From 4602d2e07819a1c36c8ade74f0dfc4bfa0d65d98 Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 6 Nov 2025 01:43:04 +0530 Subject: [PATCH 04/11] feat: stuff --- .../sdk/api/StreamedListObjectsApi.java | 170 ++++++++++ .../openfga/sdk/api/client/OpenFgaClient.java | 180 +++++++--- .../ClientStreamedListObjectsOptions.java | 56 ++++ .../client/OpenFgaClientIntegrationTest.java | 93 +++++- .../api/client/StreamedListObjectsTest.java | 308 ++++++++++++++++++ 5 files changed, 764 insertions(+), 43 deletions(-) create mode 100644 src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java create mode 100644 src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java create mode 100644 src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java new file mode 100644 index 00000000..2947cf03 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -0,0 +1,170 @@ +package dev.openfga.sdk.api; + +import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace; +import static dev.openfga.sdk.util.Validation.assertParamExists; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.model.ListObjectsRequest; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; +import dev.openfga.sdk.api.model.StreamedListObjectsResponse; +import dev.openfga.sdk.errors.ApiException; +import dev.openfga.sdk.errors.FgaInvalidParameterException; +import dev.openfga.sdk.util.StringUtil; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Stream; + +/** + * API layer for handling streaming responses from the streamedListObjects endpoint. + * This class provides true asynchronous streaming with consumer callbacks using CompletableFuture + * and Java 11's HttpClient async streaming capabilities. + */ +public class StreamedListObjectsApi { + private final Configuration configuration; + private final ApiClient apiClient; + private final ObjectMapper objectMapper; + + public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) { + this.configuration = configuration; + this.apiClient = apiClient; + this.objectMapper = apiClient.getObjectMapper(); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each streamed object response (invoked asynchronously) + * @return CompletableFuture that completes when streaming finishes + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, ListObjectsRequest body, Consumer consumer) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, null); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each streamed object response (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @return CompletableFuture that completes when streaming finishes or exceptionally on error + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, ListObjectsRequest body, Consumer consumer, Consumer errorConsumer) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/{store_id}/streamed-list-objects" + .replace("{store_id}", StringUtil.urlEncode(storeId.toString())); + + try { + HttpRequest request = buildHttpRequest("POST", path, body, configuration); + + // Use async HTTP client with streaming body handler + // ofLines() provides line-by-line streaming which is perfect for NDJSON + return apiClient + .getHttpClient() + .sendAsync(request, HttpResponse.BodyHandlers.ofLines()) + .thenCompose(response -> { + // Check response status + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + ApiException apiException = + new ApiException(statusCode, "API error: " + statusCode, response.headers(), null); + // Error will be handled by whenComplete + return CompletableFuture.failedFuture(apiException); + } + + // Process the stream asynchronously on a separate thread + return CompletableFuture.runAsync(() -> { + try (Stream lines = response.body()) { + lines.forEach(line -> { + if (!isNullOrWhitespace(line)) { + processLine(line, consumer, errorConsumer); + } + }); + } catch (Exception e) { + // Error will be handled by whenComplete + throw new RuntimeException(e); + } + }); + }) + .whenComplete((result, throwable) -> { + if (throwable != null && errorConsumer != null) { + errorConsumer.accept(throwable); + } + }); + + } catch (Exception e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + return CompletableFuture.failedFuture(e); + } + } + + /** + * Process a single line from the NDJSON stream + */ + private void processLine(String line, Consumer consumer, Consumer errorConsumer) { + try { + // Parse the JSON line to extract the object + StreamResultOfStreamedListObjectsResponse streamResult = + objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class); + + if (streamResult.getError() != null) { + // Handle error in stream + if (errorConsumer != null) { + errorConsumer.accept(new ApiException( + "Stream error: " + streamResult.getError().getMessage())); + } + } else if (streamResult.getResult() != null) { + // Deliver the object to the consumer + StreamedListObjectsResponse result = streamResult.getResult(); + if (result.getObject() != null) { + consumer.accept(result.getObject()); + } + } + } catch (Exception e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + } + } + + private HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + try { + byte[] bodyBytes = objectMapper.writeValueAsBytes(body); + HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration); + + // Apply request interceptors if any + var interceptor = apiClient.getRequestInterceptor(); + if (interceptor != null) { + interceptor.accept(requestBuilder); + } + + return requestBuilder.build(); + } catch (Exception e) { + throw new ApiException(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index c8334917..bc403af0 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -79,7 +79,7 @@ public CompletableFuture listStores(ClientListStoresOp configuration.assertValid(); var overrides = new ConfigurationOverride().addHeaders(options); return call(() -> api.listStores( - options.getPageSize(), options.getContinuationToken(), options.getName(), overrides)) + options.getPageSize(), options.getContinuationToken(), options.getName(), overrides)) .thenApply(ClientListStoresResponse::new); } @@ -296,12 +296,12 @@ public CompletableFuture readChanges( var options = readChangesOptions != null ? readChangesOptions : new ClientReadChangesOptions(); var overrides = new ConfigurationOverride().addHeaders(options); return call(() -> api.readChanges( - storeId, - request.getType(), - options.getPageSize(), - options.getContinuationToken(), - request.getStartTime(), - overrides)) + storeId, + request.getType(), + options.getPageSize(), + options.getContinuationToken(), + request.getStartTime(), + overrides)) .thenApply(ClientReadChangesResponse::new); } @@ -492,19 +492,19 @@ private CompletableFuture writeTransactions( // For transaction-based writes, all tuples are successful if the call succeeds List writeResponses = writeTuples != null ? writeTuples.stream() - .map(tuple -> new ClientWriteSingleResponse(tuple.asTupleKey(), ClientWriteStatus.SUCCESS)) - .collect(Collectors.toList()) + .map(tuple -> new ClientWriteSingleResponse(tuple.asTupleKey(), ClientWriteStatus.SUCCESS)) + .collect(Collectors.toList()) : new ArrayList<>(); List deleteResponses = deleteTuples != null ? deleteTuples.stream() - .map(tuple -> new ClientWriteSingleResponse( - new TupleKey() - .user(tuple.getUser()) - .relation(tuple.getRelation()) - ._object(tuple.getObject()), - ClientWriteStatus.SUCCESS)) - .collect(Collectors.toList()) + .map(tuple -> new ClientWriteSingleResponse( + new TupleKey() + .user(tuple.getUser()) + .relation(tuple.getRelation()) + ._object(tuple.getObject()), + ClientWriteStatus.SUCCESS)) + .collect(Collectors.toList()) : new ArrayList<>(); return new ClientWriteResponse(writeResponses, deleteResponses); @@ -639,18 +639,18 @@ private CompletableFuture writeNonTransaction( CompletableFuture> allWritesFuture = writeFutures.isEmpty() ? CompletableFuture.completedFuture(new ArrayList<>()) : CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> writeFutures.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); + .thenApply(v -> writeFutures.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); CompletableFuture> allDeletesFuture = deleteFutures.isEmpty() ? CompletableFuture.completedFuture(new ArrayList<>()) : CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> deleteFutures.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); + .thenApply(v -> deleteFutures.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); return CompletableFuture.allOf(allWritesFuture, allDeletesFuture) .thenApply(v -> new ClientWriteResponse(allWritesFuture.join(), allDeletesFuture.join())); @@ -828,7 +828,7 @@ public CompletableFuture> clientBatchCheck( var options = batchCheckOptions != null ? batchCheckOptions : new ClientBatchCheckClientOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -889,8 +889,8 @@ public CompletableFuture batchCheck( var options = batchCheckOptions != null ? batchCheckOptions : new ClientBatchCheckOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS) - .maxBatchSize(FgaConstants.CLIENT_MAX_BATCH_SIZE); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS) + .maxBatchSize(FgaConstants.CLIENT_MAX_BATCH_SIZE); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -951,22 +951,22 @@ public CompletableFuture batchCheck( var override = new ConfigurationOverride().addHeaders(options); Consumer> singleBatchCheckRequest = request -> call(() -> { - BatchCheckRequest body = new BatchCheckRequest().checks(request); - if (options.getConsistency() != null) { - body.consistency(options.getConsistency()); - } + BatchCheckRequest body = new BatchCheckRequest().checks(request); + if (options.getConsistency() != null) { + body.consistency(options.getConsistency()); + } - // Set authorizationModelId from options if available; otherwise, use the default from configuration - String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) - ? options.getAuthorizationModelId() - : configuration.getAuthorizationModelId(); + // Set authorizationModelId from options if available; otherwise, use the default from configuration + String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) + ? options.getAuthorizationModelId() + : configuration.getAuthorizationModelId(); - if (!isNullOrWhitespace(authorizationModelId)) { - body.authorizationModelId(authorizationModelId); - } + if (!isNullOrWhitespace(authorizationModelId)) { + body.authorizationModelId(authorizationModelId); + } - return api.batchCheck(configuration.getStoreId(), body, override); - }) + return api.batchCheck(configuration.getStoreId(), body, override); + }) .whenComplete((batchCheckResponseApiResponse, throwable) -> { try { if (throwable != null) { @@ -1104,6 +1104,102 @@ public CompletableFuture listObjects( return call(() -> api.listObjects(storeId, body, overrides)).thenApply(ClientListObjectsResponse::new); } + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a relation to. + * This method provides true asynchronous streaming with consumer callbacks. + * Objects are delivered to the consumer as they are received from the server asynchronously. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param request The list objects request containing type, relation, and user + * @param consumer Callback to handle each streamed object as it arrives (invoked asynchronously) + * @return CompletableFuture that completes when streaming finishes + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace + */ + public CompletableFuture streamedListObjects(ClientListObjectsRequest request, Consumer consumer) + throws FgaInvalidParameterException { + return streamedListObjects(request, null, consumer, null); + } + + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a relation to. + * This method provides true asynchronous streaming with consumer callbacks. + * Objects are delivered to the consumer as they are received from the server asynchronously. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param request The list objects request containing type, relation, and user + * @param options Options for the streaming request + * @param consumer Callback to handle each streamed object as it arrives (invoked asynchronously) + * @return CompletableFuture that completes when streaming finishes + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace + */ + public CompletableFuture streamedListObjects( + ClientListObjectsRequest request, ClientStreamedListObjectsOptions options, Consumer consumer) + throws FgaInvalidParameterException { + return streamedListObjects(request, options, consumer, null); + } + + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a relation to. + * This method provides true asynchronous streaming with consumer callbacks. + * Objects are delivered to the consumer as they are received from the server asynchronously. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param request The list objects request containing type, relation, and user + * @param options Options for the streaming request + * @param consumer Callback to handle each streamed object as it arrives (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @return CompletableFuture that completes when streaming finishes or exceptionally on error + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace + */ + public CompletableFuture streamedListObjects( + ClientListObjectsRequest request, + ClientStreamedListObjectsOptions options, + Consumer consumer, + Consumer errorConsumer) + throws FgaInvalidParameterException { + configuration.assertValid(); + String storeId = configuration.getStoreIdChecked(); + + ListObjectsRequest body = new ListObjectsRequest(); + + if (request != null) { + body.user(request.getUser()).relation(request.getRelation()).type(request.getType()); + if (request.getContextualTupleKeys() != null) { + var contextualTuples = request.getContextualTupleKeys(); + var bodyContextualTuples = ClientTupleKey.asContextualTupleKeys(contextualTuples); + body.contextualTuples(bodyContextualTuples); + } + if (request.getContext() != null) { + body.context(request.getContext()); + } + } + + if (options != null) { + if (options.getConsistency() != null) { + body.consistency(options.getConsistency()); + } + + // Set authorizationModelId from options if available; otherwise, use the default from configuration + String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) + ? options.getAuthorizationModelId() + : configuration.getAuthorizationModelId(); + body.authorizationModelId(authorizationModelId); + } else { + body.setAuthorizationModelId(configuration.getAuthorizationModelId()); + } + + // Create streaming API instance and execute streaming request asynchronously + StreamedListObjectsApi streamingApi = new StreamedListObjectsApi(configuration, apiClient); + try { + return streamingApi.streamedListObjects(storeId, body, consumer, errorConsumer); + } catch (ApiException e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + return CompletableFuture.failedFuture(e); + } + } + /** * ListRelations - List allowed relations a user has with an object (evaluates) */ @@ -1126,7 +1222,7 @@ public CompletableFuture listRelations( var options = listRelationsOptions != null ? listRelationsOptions : new ClientListRelationsOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -1311,4 +1407,4 @@ private CompletableFuture call(CheckedInvocation action) { return CompletableFuture.failedFuture(throwable); } } -} +} \ No newline at end of file diff --git a/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java b/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java new file mode 100644 index 00000000..1384667d --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java @@ -0,0 +1,56 @@ +package dev.openfga.sdk.api.client.model; + +import dev.openfga.sdk.api.configuration.AdditionalHeadersSupplier; +import dev.openfga.sdk.api.model.ConsistencyPreference; +import java.util.Map; + +/** + * Options for the streamedListObjects API call. + * + *

This class allows you to configure the streaming request with: + *

    + *
  • Authorization model ID - Override the default model ID for this request
  • + *
  • Consistency preference - Specify the desired consistency level
  • + *
  • Additional headers - Include custom HTTP headers in the request
  • + *
+ * + *

Example usage: + *

+ * ClientStreamedListObjectsOptions options = new ClientStreamedListObjectsOptions()
+ *     .authorizationModelId("custom-model-id")
+ *     .consistency(ConsistencyPreference.HIGHER_CONSISTENCY);
+ * 
+ */ +public class ClientStreamedListObjectsOptions implements AdditionalHeadersSupplier { + private String authorizationModelId; + private ConsistencyPreference consistency; + private Map additionalHeaders; + + public ClientStreamedListObjectsOptions authorizationModelId(String authorizationModelId) { + this.authorizationModelId = authorizationModelId; + return this; + } + + public String getAuthorizationModelId() { + return authorizationModelId; + } + + public ClientStreamedListObjectsOptions consistency(ConsistencyPreference consistency) { + this.consistency = consistency; + return this; + } + + public ConsistencyPreference getConsistency() { + return consistency; + } + + public ClientStreamedListObjectsOptions additionalHeaders(Map additionalHeaders) { + this.additionalHeaders = additionalHeaders; + return this; + } + + @Override + public Map getAdditionalHeaders() { + return additionalHeaders; + } +} \ No newline at end of file diff --git a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java index 8ed6255d..7ed7ccc6 100644 --- a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java +++ b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java @@ -11,6 +11,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -372,6 +373,96 @@ private String writeAuthModel(String storeId) throws Exception { return response.getAuthorizationModelId(); } + @Test + public void streamedListObjects() throws Exception { + // Given - Create a single store for all streaming tests + String storeId = createStore(thisTestName()); + fga.setStoreId(storeId); + String authorizationModelId = writeAuthModel(storeId); + fga.setAuthorizationModelId(authorizationModelId); + + // Write tuples for different test scenarios + // Tuples for basic streaming test (user:test) + for (int i = 0; i < 50; i++) { + ClientWriteRequest writeRequest = new ClientWriteRequest() + .writes(List.of(new ClientTupleKey() + .user("user:test") + .relation("reader") + ._object("document:test-" + i))); + fga.write(writeRequest).get(); + } + + // Tuples for error handling test (user:error-test) + for (int i = 0; i < 10; i++) { + ClientWriteRequest writeRequest = new ClientWriteRequest() + .writes(List.of(new ClientTupleKey() + .user("user:error-test") + .relation("reader") + ._object("document:error-test-" + i))); + fga.write(writeRequest).get(); + } + + // Tuples for chaining operations test (user:chain-test) + for (int i = 0; i < 20; i++) { + ClientWriteRequest writeRequest = new ClientWriteRequest() + .writes(List.of(new ClientTupleKey() + .user("user:chain-test") + .relation("reader") + ._object("document:chain-" + i))); + fga.write(writeRequest).get(); + } + + // Test 1: Basic streaming - verify async execution and all objects received + List streamedObjects = new java.util.ArrayList<>(); + ClientListObjectsRequest request1 = new ClientListObjectsRequest() + .type("document") + .relation("reader") + .user("user:test"); + + CompletableFuture streamingFuture1 = fga.streamedListObjects(request1, streamedObjects::add); + streamingFuture1.get(); // Wait for completion + + assertEquals(50, streamedObjects.size()); + for (int i = 0; i < 50; i++) { + assertTrue(streamedObjects.contains("document:test-" + i)); + } + + // Test 2: Error handling - verify error consumer works + List errorTestObjects = new java.util.ArrayList<>(); + List errors = new java.util.ArrayList<>(); + ClientListObjectsRequest request2 = new ClientListObjectsRequest() + .type("document") + .relation("reader") + .user("user:error-test"); + + CompletableFuture streamingFuture2 = + fga.streamedListObjects(request2, null, errorTestObjects::add, errors::add); + streamingFuture2.get(); + + assertEquals(10, errorTestObjects.size()); + assertEquals(0, errors.size()); // Should have no errors in normal operation + + // Test 3: Chaining operations - verify CompletableFuture chaining works + List chainTestObjects = new java.util.ArrayList<>(); + ClientListObjectsRequest request3 = new ClientListObjectsRequest() + .type("document") + .relation("reader") + .user("user:chain-test"); + + java.util.concurrent.atomic.AtomicBoolean chainedOperationExecuted = + new java.util.concurrent.atomic.AtomicBoolean(false); + + CompletableFuture chainedFuture = fga.streamedListObjects(request3, chainTestObjects::add) + .thenRun(() -> { + chainedOperationExecuted.set(true); + }); + + chainedFuture.get(); // Wait for all chained operations + + assertEquals(20, chainTestObjects.size()); + assertTrue(chainedOperationExecuted.get()); + } + /** Get the name of the test that invokes this function. Returned in the form: "$class.$fn" */ private String thisTestName() { // Tracing the stack gives an array of: @@ -381,4 +472,4 @@ private String thisTestName() { return String.format("%s.%s", callingClass, callingFn.getMethodName()); } -} +} \ No newline at end of file diff --git a/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java b/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java new file mode 100644 index 00000000..dd8d63d0 --- /dev/null +++ b/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java @@ -0,0 +1,308 @@ +package dev.openfga.sdk.api.client; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.client.model.ClientListObjectsRequest; +import dev.openfga.sdk.api.client.model.ClientStreamedListObjectsOptions; +import dev.openfga.sdk.api.configuration.ClientConfiguration; +import dev.openfga.sdk.api.configuration.Credentials; +import dev.openfga.sdk.api.model.ConsistencyPreference; +import dev.openfga.sdk.constants.FgaConstants; +import dev.openfga.sdk.errors.FgaInvalidParameterException; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Tests for streaming list objects functionality with CompletableFuture. */ +public class StreamedListObjectsTest { + private static final String DEFAULT_STORE_ID = "01YCP46JKYM8FJCQ37NMBYHE5X"; + private static final String DEFAULT_AUTH_MODEL_ID = "01G5JAVJ41T49E9TT3SKVS7X1J"; + private static final String DEFAULT_USER = "user:81684243-9356-4421-8fbf-a4f8d36aa31b"; + private static final String DEFAULT_RELATION = "owner"; + private static final String DEFAULT_TYPE = "document"; + + private OpenFgaClient fga; + private ClientConfiguration clientConfiguration; + private HttpClient mockHttpClient; + private ApiClient mockApiClient; + + @BeforeEach + public void beforeEachTest() throws Exception { + mockHttpClient = mock(HttpClient.class); + var mockHttpClientBuilder = mock(HttpClient.Builder.class); + when(mockHttpClientBuilder.executor(any())).thenReturn(mockHttpClientBuilder); + when(mockHttpClientBuilder.build()).thenReturn(mockHttpClient); + + clientConfiguration = new ClientConfiguration() + .storeId(DEFAULT_STORE_ID) + .authorizationModelId(DEFAULT_AUTH_MODEL_ID) + .apiUrl(FgaConstants.TEST_API_URL) + .credentials(new Credentials()) + .readTimeout(Duration.ofMillis(250)); + + mockApiClient = mock(ApiClient.class); + when(mockApiClient.getHttpClient()).thenReturn(mockHttpClient); + when(mockApiClient.getObjectMapper()).thenReturn(new ObjectMapper()); + when(mockApiClient.getHttpClientBuilder()).thenReturn(mockHttpClientBuilder); + + fga = new OpenFgaClient(clientConfiguration, mockApiClient); + } + + @Test + public void streamedListObjects_success() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + String line2 = "{\"result\":{\"object\":\"document:2\"}}"; + String line3 = "{\"result\":{\"object\":\"document:3\"}}"; + Stream streamResponse = Stream.of(line1, line2, line3); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = fga.streamedListObjects(request, receivedObjects::add); + future.get(); // Wait for completion + + // Then + assertEquals(3, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0)); + assertEquals("document:2", receivedObjects.get(1)); + assertEquals("document:3", receivedObjects.get(2)); + verify(mockHttpClient, times(1)).sendAsync(any(), any()); + } + + @Test + public void streamedListObjects_withOptions() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + Stream streamResponse = Stream.of(line1); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + ClientStreamedListObjectsOptions options = new ClientStreamedListObjectsOptions() + .authorizationModelId("custom-model-id") + .consistency(ConsistencyPreference.HIGHER_CONSISTENCY); + + // When + CompletableFuture future = fga.streamedListObjects(request, options, receivedObjects::add); + future.get(); // Wait for completion + + // Then + assertEquals(1, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0)); + } + + @Test + public void streamedListObjects_emptyStream() throws Exception { + // Given + Stream streamResponse = Stream.empty(); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = fga.streamedListObjects(request, receivedObjects::add); + future.get(); // Wait for completion + + // Then + assertEquals(0, receivedObjects.size()); + } + + @Test + public void streamedListObjects_storeIdRequired() { + // Given + clientConfiguration.storeId(null); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When/Then + var exception = assertThrows(FgaInvalidParameterException.class, () -> { + fga.streamedListObjects(request, obj -> {}); + }); + + assertEquals( + "Required parameter storeId was invalid when calling ClientConfiguration.", exception.getMessage()); + } + + @Test + public void streamedListObjects_errorHandling() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + String line2 = "{\"error\":{\"message\":\"Something went wrong\"}}"; + String line3 = "{\"result\":{\"object\":\"document:2\"}}"; + Stream streamResponse = Stream.of(line1, line2, line3); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + List receivedErrors = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = + fga.streamedListObjects(request, null, receivedObjects::add, receivedErrors::add); + future.get(); // Wait for completion + + // Then + assertEquals(2, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0)); + assertEquals("document:2", receivedObjects.get(1)); + assertEquals(1, receivedErrors.size()); + } + + @Test + public void streamedListObjects_httpError() throws Exception { + // Given + Stream streamResponse = Stream.empty(); + HttpResponse> mockResponse = createMockStreamResponse(400, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + List receivedErrors = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = + fga.streamedListObjects(request, null, receivedObjects::add, receivedErrors::add); + + try { + future.get(); // Wait for completion - should fail + fail("Expected exception"); + } catch (Exception e) { + // Expected + } + + // Then + assertEquals(0, receivedObjects.size()); + assertEquals(1, receivedErrors.size()); + } + + @Test + public void streamedListObjects_consumerInvocationCount() throws Exception { + // Given + int expectedCount = 100; + List lines = new ArrayList<>(); + for (int i = 0; i < expectedCount; i++) { + lines.add(String.format("{\"result\":{\"object\":\"document:%d\"}}", i)); + } + Stream streamResponse = lines.stream(); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + AtomicInteger callCount = new AtomicInteger(0); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = fga.streamedListObjects(request, obj -> callCount.incrementAndGet()); + future.get(); // Wait for completion + + // Then + assertEquals(expectedCount, callCount.get()); + } + + @Test + public void streamedListObjects_chainingWithOtherOperations() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + Stream streamResponse = Stream.of(line1); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When - Chain with other async operations + AtomicInteger completionFlag = new AtomicInteger(0); + CompletableFuture future = fga.streamedListObjects(request, receivedObjects::add) + .thenRun(() -> completionFlag.set(1)) + .thenRun(() -> completionFlag.set(2)); + + future.get(); // Wait for all chained operations + + // Then + assertEquals(1, receivedObjects.size()); + assertEquals(2, completionFlag.get()); + } + + private HttpResponse> createMockStreamResponse(int statusCode, Stream body) { + HttpResponse> mockResponse = mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(statusCode); + when(mockResponse.body()).thenReturn(body); + + // Create mock headers + HttpHeaders mockHeaders = mock(HttpHeaders.class); + when(mockHeaders.map()).thenReturn(Map.of("content-type", List.of("application/json"))); + when(mockResponse.headers()).thenReturn(mockHeaders); + + return mockResponse; + } +} \ No newline at end of file From 3b37e71b3725c6f1e4d74a7b3ce54e910d320f3a Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 6 Nov 2025 09:30:20 +0530 Subject: [PATCH 05/11] feat: update example gradle file --- .../gradle/wrapper/gradle-wrapper.properties | 4 +- examples/streamed-list-objects/gradlew | 52 +++++++------------ examples/streamed-list-objects/gradlew.bat | 44 +++++++--------- 3 files changed, 41 insertions(+), 59 deletions(-) mode change 100644 => 100755 examples/streamed-list-objects/gradlew diff --git a/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties index dec8fcc6..a80b22ce 100644 --- a/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties +++ b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties @@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.0-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME -zipStorePath=wrapper/dists \ No newline at end of file +zipStorePath=wrapper/dists diff --git a/examples/streamed-list-objects/gradlew b/examples/streamed-list-objects/gradlew old mode 100644 new mode 100755 index 1b05573b..005bcde0 --- a/examples/streamed-list-objects/gradlew +++ b/examples/streamed-list-objects/gradlew @@ -1,7 +1,7 @@ #!/bin/sh # -# Copyright © 2015 the original authors. +# Copyright © 2015-2021 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,8 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# SPDX-License-Identifier: Apache-2.0 -# ############################################################################## # @@ -57,7 +55,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -82,11 +80,13 @@ do esac done -# This is normally unused -# shellcheck disable=SC2034 +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" APP_BASE_NAME=${0##*/} -# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) -APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='-Dfile.encoding=UTF-8 "-Xmx64m" "-Xms64m"' # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -114,6 +114,7 @@ case "$( uname )" in #( NONSTOP* ) nonstop=true ;; esac +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar # Determine the Java command to use to start the JVM. @@ -132,29 +133,22 @@ location of your Java installation." fi else JAVACMD=java - if ! command -v java >/dev/null 2>&1 - then - die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." - fi fi # Increase the maximum file descriptors if we can. if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) - # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac case $MAX_FD in #( '' | soft) :;; #( *) - # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -171,6 +165,7 @@ fi # For Cygwin or MSYS, switch paths to Windows format before running java if "$cygwin" || "$msys" ; then APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) JAVACMD=$( cygpath --unix "$JAVACMD" ) @@ -198,27 +193,18 @@ if "$cygwin" || "$msys" ; then done fi - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' - -# Collect all arguments for the java command: -# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, -# and any embedded shellness will be escaped. -# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be -# treated as '${Hostname}' itself on the command line. +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ - -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ "$@" -# Stop when "xargs" is not available. -if ! command -v xargs >/dev/null 2>&1 -then - die "xargs is not available" -fi - # Use "xargs" to parse quoted args. # # With -n1 it outputs one arg per line, with the quotes and backslashes removed. @@ -245,4 +231,4 @@ eval "set -- $( tr '\n' ' ' )" '"$@"' -exec "$JAVACMD" "$@" \ No newline at end of file +exec "$JAVACMD" "$@" diff --git a/examples/streamed-list-objects/gradlew.bat b/examples/streamed-list-objects/gradlew.bat index 53e45d31..6a68175e 100644 --- a/examples/streamed-list-objects/gradlew.bat +++ b/examples/streamed-list-objects/gradlew.bat @@ -13,10 +13,8 @@ @rem See the License for the specific language governing permissions and @rem limitations under the License. @rem -@rem SPDX-License-Identifier: Apache-2.0 -@rem -@if "%DEBUG%"=="" @echo off +@if "%DEBUG%" == "" @echo off @rem ########################################################################## @rem @rem Gradle startup script for Windows @@ -27,8 +25,7 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 -if "%DIRNAME%"=="" set DIRNAME=. -@rem This is normally unused +if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% @@ -36,20 +33,20 @@ set APP_HOME=%DIRNAME% for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" +set DEFAULT_JVM_OPTS=-Dfile.encoding=UTF-8 "-Xmx64m" "-Xms64m" @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if %ERRORLEVEL% equ 0 goto execute +if "%ERRORLEVEL%" == "0" goto execute -echo. 1>&2 -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 -echo. 1>&2 -echo Please set the JAVA_HOME variable in your environment to match the 1>&2 -echo location of your Java installation. 1>&2 +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. goto fail @@ -59,35 +56,34 @@ set JAVA_EXE=%JAVA_HOME%/bin/java.exe if exist "%JAVA_EXE%" goto execute -echo. 1>&2 -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 -echo. 1>&2 -echo Please set the JAVA_HOME variable in your environment to match the 1>&2 -echo location of your Java installation. 1>&2 +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. goto fail :execute @rem Setup the command line +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar @rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* :end @rem End local scope for the variables with windows NT shell -if %ERRORLEVEL% equ 0 goto mainEnd +if "%ERRORLEVEL%"=="0" goto mainEnd :fail rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem the _cmd.exe /c_ return code! -set EXIT_CODE=%ERRORLEVEL% -if %EXIT_CODE% equ 0 set EXIT_CODE=1 -if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% -exit /b %EXIT_CODE% +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 :mainEnd if "%OS%"=="Windows_NT" endlocal -:omega \ No newline at end of file +:omega From c654a923a1586bef2098838390a15d1f1f4e103f Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 6 Nov 2025 09:49:52 +0530 Subject: [PATCH 06/11] feat: config override --- .../sdk/api/StreamedListObjectsApi.java | 65 +++++++++++++- .../openfga/sdk/api/client/OpenFgaClient.java | 88 ++++++++++--------- .../ClientStreamedListObjectsOptions.java | 2 +- .../client/OpenFgaClientIntegrationTest.java | 2 +- .../api/client/StreamedListObjectsTest.java | 40 ++++++++- 5 files changed, 149 insertions(+), 48 deletions(-) diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java index 2947cf03..f0aca178 100644 --- a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import dev.openfga.sdk.api.client.ApiClient; import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.configuration.ConfigurationOverride; import dev.openfga.sdk.api.model.ListObjectsRequest; import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; import dev.openfga.sdk.api.model.StreamedListObjectsResponse; @@ -49,7 +50,29 @@ public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) public CompletableFuture streamedListObjects( String storeId, ListObjectsRequest body, Consumer consumer) throws ApiException, FgaInvalidParameterException { - return streamedListObjects(storeId, body, consumer, null); + return streamedListObjects(storeId, body, consumer, null, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each streamed object response (invoked asynchronously) + * @param configurationOverride Configuration overrides (e.g., additional headers) + * @return CompletableFuture that completes when streaming finishes + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, null, this.configuration.override(configurationOverride)); } /** @@ -68,6 +91,44 @@ public CompletableFuture streamedListObjects( public CompletableFuture streamedListObjects( String storeId, ListObjectsRequest body, Consumer consumer, Consumer errorConsumer) throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, errorConsumer, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each streamed object response (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @param configurationOverride Configuration overrides (e.g., additional headers) + * @return CompletableFuture that completes when streaming finishes or exceptionally on error + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + Consumer errorConsumer, + ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects( + storeId, body, consumer, errorConsumer, this.configuration.override(configurationOverride)); + } + + /** + * Internal implementation that accepts a final Configuration to use for the request. + */ + private CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + Consumer errorConsumer, + Configuration configuration) + throws ApiException, FgaInvalidParameterException { assertParamExists(storeId, "storeId", "streamedListObjects"); assertParamExists(body, "body", "streamedListObjects"); @@ -167,4 +228,4 @@ private HttpRequest buildHttpRequest(String method, String path, Object body, Co throw new ApiException(e); } } -} \ No newline at end of file +} diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index bc403af0..da1707da 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -79,7 +79,7 @@ public CompletableFuture listStores(ClientListStoresOp configuration.assertValid(); var overrides = new ConfigurationOverride().addHeaders(options); return call(() -> api.listStores( - options.getPageSize(), options.getContinuationToken(), options.getName(), overrides)) + options.getPageSize(), options.getContinuationToken(), options.getName(), overrides)) .thenApply(ClientListStoresResponse::new); } @@ -296,12 +296,12 @@ public CompletableFuture readChanges( var options = readChangesOptions != null ? readChangesOptions : new ClientReadChangesOptions(); var overrides = new ConfigurationOverride().addHeaders(options); return call(() -> api.readChanges( - storeId, - request.getType(), - options.getPageSize(), - options.getContinuationToken(), - request.getStartTime(), - overrides)) + storeId, + request.getType(), + options.getPageSize(), + options.getContinuationToken(), + request.getStartTime(), + overrides)) .thenApply(ClientReadChangesResponse::new); } @@ -492,19 +492,19 @@ private CompletableFuture writeTransactions( // For transaction-based writes, all tuples are successful if the call succeeds List writeResponses = writeTuples != null ? writeTuples.stream() - .map(tuple -> new ClientWriteSingleResponse(tuple.asTupleKey(), ClientWriteStatus.SUCCESS)) - .collect(Collectors.toList()) + .map(tuple -> new ClientWriteSingleResponse(tuple.asTupleKey(), ClientWriteStatus.SUCCESS)) + .collect(Collectors.toList()) : new ArrayList<>(); List deleteResponses = deleteTuples != null ? deleteTuples.stream() - .map(tuple -> new ClientWriteSingleResponse( - new TupleKey() - .user(tuple.getUser()) - .relation(tuple.getRelation()) - ._object(tuple.getObject()), - ClientWriteStatus.SUCCESS)) - .collect(Collectors.toList()) + .map(tuple -> new ClientWriteSingleResponse( + new TupleKey() + .user(tuple.getUser()) + .relation(tuple.getRelation()) + ._object(tuple.getObject()), + ClientWriteStatus.SUCCESS)) + .collect(Collectors.toList()) : new ArrayList<>(); return new ClientWriteResponse(writeResponses, deleteResponses); @@ -639,18 +639,18 @@ private CompletableFuture writeNonTransaction( CompletableFuture> allWritesFuture = writeFutures.isEmpty() ? CompletableFuture.completedFuture(new ArrayList<>()) : CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> writeFutures.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); + .thenApply(v -> writeFutures.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); CompletableFuture> allDeletesFuture = deleteFutures.isEmpty() ? CompletableFuture.completedFuture(new ArrayList<>()) : CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> deleteFutures.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); + .thenApply(v -> deleteFutures.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); return CompletableFuture.allOf(allWritesFuture, allDeletesFuture) .thenApply(v -> new ClientWriteResponse(allWritesFuture.join(), allDeletesFuture.join())); @@ -828,7 +828,7 @@ public CompletableFuture> clientBatchCheck( var options = batchCheckOptions != null ? batchCheckOptions : new ClientBatchCheckClientOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -889,8 +889,8 @@ public CompletableFuture batchCheck( var options = batchCheckOptions != null ? batchCheckOptions : new ClientBatchCheckOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS) - .maxBatchSize(FgaConstants.CLIENT_MAX_BATCH_SIZE); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS) + .maxBatchSize(FgaConstants.CLIENT_MAX_BATCH_SIZE); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -951,22 +951,22 @@ public CompletableFuture batchCheck( var override = new ConfigurationOverride().addHeaders(options); Consumer> singleBatchCheckRequest = request -> call(() -> { - BatchCheckRequest body = new BatchCheckRequest().checks(request); - if (options.getConsistency() != null) { - body.consistency(options.getConsistency()); - } + BatchCheckRequest body = new BatchCheckRequest().checks(request); + if (options.getConsistency() != null) { + body.consistency(options.getConsistency()); + } - // Set authorizationModelId from options if available; otherwise, use the default from configuration - String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) - ? options.getAuthorizationModelId() - : configuration.getAuthorizationModelId(); + // Set authorizationModelId from options if available; otherwise, use the default from configuration + String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) + ? options.getAuthorizationModelId() + : configuration.getAuthorizationModelId(); - if (!isNullOrWhitespace(authorizationModelId)) { - body.authorizationModelId(authorizationModelId); - } + if (!isNullOrWhitespace(authorizationModelId)) { + body.authorizationModelId(authorizationModelId); + } - return api.batchCheck(configuration.getStoreId(), body, override); - }) + return api.batchCheck(configuration.getStoreId(), body, override); + }) .whenComplete((batchCheckResponseApiResponse, throwable) -> { try { if (throwable != null) { @@ -1188,10 +1188,12 @@ public CompletableFuture streamedListObjects( body.setAuthorizationModelId(configuration.getAuthorizationModelId()); } + var overrides = new ConfigurationOverride().addHeaders(options); + // Create streaming API instance and execute streaming request asynchronously StreamedListObjectsApi streamingApi = new StreamedListObjectsApi(configuration, apiClient); try { - return streamingApi.streamedListObjects(storeId, body, consumer, errorConsumer); + return streamingApi.streamedListObjects(storeId, body, consumer, errorConsumer, overrides); } catch (ApiException e) { if (errorConsumer != null) { errorConsumer.accept(e); @@ -1222,7 +1224,7 @@ public CompletableFuture listRelations( var options = listRelationsOptions != null ? listRelationsOptions : new ClientListRelationsOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -1407,4 +1409,4 @@ private CompletableFuture call(CheckedInvocation action) { return CompletableFuture.failedFuture(throwable); } } -} \ No newline at end of file +} diff --git a/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java b/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java index 1384667d..bf4c8c61 100644 --- a/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java +++ b/src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java @@ -53,4 +53,4 @@ public ClientStreamedListObjectsOptions additionalHeaders(Map ad public Map getAdditionalHeaders() { return additionalHeaders; } -} \ No newline at end of file +} diff --git a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java index 7ed7ccc6..21ad37e3 100644 --- a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java +++ b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java @@ -472,4 +472,4 @@ private String thisTestName() { return String.format("%s.%s", callingClass, callingFn.getMethodName()); } -} \ No newline at end of file +} diff --git a/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java b/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java index dd8d63d0..a7d2d1ff 100644 --- a/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java +++ b/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java @@ -293,6 +293,44 @@ public void streamedListObjects_chainingWithOtherOperations() throws Exception { assertEquals(2, completionFlag.get()); } + @Test + public void streamedListObjects_additionalHeadersPassedThrough() throws Exception { + // Given + String line1 = "{\"result\":{\"object\":\"document:1\"}}"; + Stream streamResponse = Stream.of(line1); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // Create options with additional headers + Map additionalHeaders = Map.of( + "X-Custom-Header", "custom-value", + "X-Request-ID", "test-request-123"); + + ClientStreamedListObjectsOptions options = + new ClientStreamedListObjectsOptions().additionalHeaders(additionalHeaders); + + // When + CompletableFuture future = fga.streamedListObjects(request, options, receivedObjects::add); + future.get(); // Wait for completion + + // Then + assertEquals(1, receivedObjects.size()); + assertEquals("document:1", receivedObjects.get(0)); + + // Verify that the HTTP client was called (which means headers were applied) + verify(mockHttpClient, times(1)).sendAsync(any(), any()); + } + private HttpResponse> createMockStreamResponse(int statusCode, Stream body) { HttpResponse> mockResponse = mock(HttpResponse.class); when(mockResponse.statusCode()).thenReturn(statusCode); @@ -305,4 +343,4 @@ private HttpResponse> createMockStreamResponse(int statusCode, St return mockResponse; } -} \ No newline at end of file +} From c53592c4c91944bf82646b5db7b88d3379722767 Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 6 Nov 2025 10:02:39 +0530 Subject: [PATCH 07/11] fix: remove unnecessary thread spawn --- STREAMING_OPTIMIZATION.md | 82 +++++++++++++++++++ .../sdk/api/StreamedListObjectsApi.java | 32 ++++---- 2 files changed, 96 insertions(+), 18 deletions(-) create mode 100644 STREAMING_OPTIMIZATION.md diff --git a/STREAMING_OPTIMIZATION.md b/STREAMING_OPTIMIZATION.md new file mode 100644 index 00000000..0a6b1657 --- /dev/null +++ b/STREAMING_OPTIMIZATION.md @@ -0,0 +1,82 @@ +# Optimization: Remove Unnecessary Thread Pool Hop in Streaming + +## Issue Raised +The original concern was that `CompletableFuture.runAsync()` uses `ForkJoinPool.commonPool()` which may not be appropriate for I/O-bound operations like processing streaming responses. + +## Analysis + +### Original Code +```java +.thenCompose(response -> { + // Check status... + + // Process the stream asynchronously on a separate thread + return CompletableFuture.runAsync(() -> { + try (Stream lines = response.body()) { + lines.forEach(line -> { + processLine(line, consumer, errorConsumer); + }); + } + }); +}) +``` + +### Issues with Original Approach +1. **Unnecessary thread hop**: The `thenCompose` already runs on an appropriate executor thread from HttpClient +2. **Extra complexity**: Adding `runAsync` and `thenCompose` when `thenApply` would suffice +3. **Potential confusion**: Suggests I/O-bound work when it's actually CPU-bound (JSON parsing) + +### Reality Check +- The `response.body()` returns a `Stream` that's **already buffered** by HttpClient +- We're **not doing blocking I/O** - just iterating over in-memory lines +- The work is **CPU-bound** (JSON parsing with Jackson), not I/O-bound +- `ForkJoinPool.commonPool()` is actually reasonable for CPU work, BUT... + +## Solution: Use thenApply Instead + +### Optimized Code +```java +.thenApply(response -> { + // Check status... + + // Process the stream - runs on HttpClient's executor thread + try (Stream lines = response.body()) { + lines.forEach(line -> { + processLine(line, consumer, errorConsumer); + }); + } + return (Void) null; +}) +``` + +### Benefits +1. ✅ **No unnecessary thread hop** - processes directly on HttpClient's executor +2. ✅ **Simpler code** - uses `thenApply` instead of `thenCompose` + `runAsync` +3. ✅ **More efficient** - one less context switch +4. ✅ **Consistent with codebase** - follows patterns used elsewhere in the SDK +5. ✅ **Same executor context** - uses the executor configured for the HttpClient + +## Thread Pool Context + +The processing now runs on: +- **HttpClient's executor** if one was explicitly configured via `HttpClient.Builder.executor()` +- **HttpClient's default executor** otherwise (which is suitable for this work) + +This is better than using `ForkJoinPool.commonPool()` because: +- It respects any custom executor configuration +- It keeps the work in the same execution context as the HTTP operation +- It's simpler and more efficient + +## Verification + +- ✅ All existing tests pass +- ✅ Build succeeds +- ✅ Example project works correctly +- ✅ No functional changes, only optimization + +## Conclusion + +**The concern was valid** - we were using an unnecessary `runAsync` that added complexity and a thread hop. However, **the real issue wasn't about ForkJoinPool vs I/O executor**, but rather that we didn't need `runAsync` at all. + +The fix is simpler and more efficient: just use `thenApply` and process the stream directly on the HttpClient's executor thread. + diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java index f0aca178..9c977840 100644 --- a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -144,29 +144,25 @@ private CompletableFuture streamedListObjects( return apiClient .getHttpClient() .sendAsync(request, HttpResponse.BodyHandlers.ofLines()) - .thenCompose(response -> { + .thenApply(response -> { // Check response status int statusCode = response.statusCode(); if (statusCode < 200 || statusCode >= 300) { - ApiException apiException = - new ApiException(statusCode, "API error: " + statusCode, response.headers(), null); - // Error will be handled by whenComplete - return CompletableFuture.failedFuture(apiException); + throw new RuntimeException( + new ApiException(statusCode, "API error: " + statusCode, response.headers(), null)); } - // Process the stream asynchronously on a separate thread - return CompletableFuture.runAsync(() -> { - try (Stream lines = response.body()) { - lines.forEach(line -> { - if (!isNullOrWhitespace(line)) { - processLine(line, consumer, errorConsumer); - } - }); - } catch (Exception e) { - // Error will be handled by whenComplete - throw new RuntimeException(e); - } - }); + // Process the stream - this runs on HttpClient's executor thread + try (Stream lines = response.body()) { + lines.forEach(line -> { + if (!isNullOrWhitespace(line)) { + processLine(line, consumer, errorConsumer); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + return (Void) null; }) .whenComplete((result, throwable) -> { if (throwable != null && errorConsumer != null) { From 8c6ac1039046dd9f1a06efb42fbfb5bb7a42330e Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 6 Nov 2025 10:15:20 +0530 Subject: [PATCH 08/11] feat: fix exceptions --- .../example/StreamedListObjectsExample.java | 2 -- .../sdk/api/StreamedListObjectsApi.java | 32 ++++++++++++----- .../api/client/StreamedListObjectsTest.java | 36 +++++++++++++++++++ 3 files changed, 60 insertions(+), 10 deletions(-) diff --git a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java index e713b6c6..8a466574 100644 --- a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java +++ b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java @@ -66,7 +66,6 @@ public void run() throws Exception { // Stream objects System.out.println("Listing objects using streaming endpoint:"); List results = new ArrayList<>(); - AtomicInteger count = new AtomicInteger(0); ClientListObjectsRequest request = new ClientListObjectsRequest() .type("document") @@ -77,7 +76,6 @@ public void run() throws Exception { .streamedListObjects(request, object -> { System.out.println(" " + object); results.add(object); - count.incrementAndGet(); }) .thenRun(() -> { System.out.println("Streaming complete!"); diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java index 9c977840..3b93f4d6 100644 --- a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -144,12 +144,13 @@ private CompletableFuture streamedListObjects( return apiClient .getHttpClient() .sendAsync(request, HttpResponse.BodyHandlers.ofLines()) - .thenApply(response -> { + .thenCompose(response -> { // Check response status int statusCode = response.statusCode(); if (statusCode < 200 || statusCode >= 300) { - throw new RuntimeException( - new ApiException(statusCode, "API error: " + statusCode, response.headers(), null)); + ApiException apiException = + new ApiException(statusCode, "API error: " + statusCode, response.headers(), null); + return CompletableFuture.failedFuture(apiException); } // Process the stream - this runs on HttpClient's executor thread @@ -159,15 +160,30 @@ private CompletableFuture streamedListObjects( processLine(line, consumer, errorConsumer); } }); + return CompletableFuture.completedFuture((Void) null); } catch (Exception e) { - throw new RuntimeException(e); + return CompletableFuture.failedFuture(e); } - return (Void) null; }) - .whenComplete((result, throwable) -> { - if (throwable != null && errorConsumer != null) { - errorConsumer.accept(throwable); + .handle((result, throwable) -> { + if (throwable != null) { + // Unwrap CompletionException to get the original exception + Throwable actualException = throwable; + if (throwable instanceof java.util.concurrent.CompletionException + && throwable.getCause() != null) { + actualException = throwable.getCause(); + } + + if (errorConsumer != null) { + errorConsumer.accept(actualException); + } + // Re-throw to keep the CompletableFuture in failed state + if (actualException instanceof RuntimeException) { + throw (RuntimeException) actualException; + } + throw new RuntimeException(actualException); } + return result; }); } catch (Exception e) { diff --git a/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java b/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java index a7d2d1ff..90f5986f 100644 --- a/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java +++ b/src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java @@ -331,6 +331,42 @@ public void streamedListObjects_additionalHeadersPassedThrough() throws Exceptio verify(mockHttpClient, times(1)).sendAsync(any(), any()); } + @Test + public void streamedListObjects_preservesApiExceptionType() throws Exception { + // Given - HTTP 400 error should create ApiException + Stream streamResponse = Stream.empty(); + HttpResponse> mockResponse = createMockStreamResponse(400, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedErrors = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = fga.streamedListObjects(request, null, obj -> {}, receivedErrors::add); + + try { + future.get(); + fail("Expected exception"); + } catch (Exception e) { + // Expected to fail + } + + // Then - verify the error consumer received the original ApiException, not wrapped + assertEquals(1, receivedErrors.size()); + Throwable error = receivedErrors.get(0); + assertTrue( + error instanceof dev.openfga.sdk.errors.ApiException, + "Expected ApiException but got " + error.getClass().getName()); + dev.openfga.sdk.errors.ApiException apiException = (dev.openfga.sdk.errors.ApiException) error; + assertEquals(400, apiException.getStatusCode()); + } + private HttpResponse> createMockStreamResponse(int statusCode, Stream body) { HttpResponse> mockResponse = mock(HttpResponse.class); when(mockResponse.statusCode()).thenReturn(statusCode); From 28f1d7bd97f28a5765d3a9562f9fd5a5addf393a Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Thu, 6 Nov 2025 10:28:43 +0530 Subject: [PATCH 09/11] feat: stream error null checks --- .../sdk/api/StreamedListObjectsApi.java | 8 +++- .../api/client/StreamedListObjectsTest.java | 43 +++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java index 3b93f4d6..819f8fb6 100644 --- a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -8,6 +8,7 @@ import dev.openfga.sdk.api.configuration.Configuration; import dev.openfga.sdk.api.configuration.ConfigurationOverride; import dev.openfga.sdk.api.model.ListObjectsRequest; +import dev.openfga.sdk.api.model.Status; import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; import dev.openfga.sdk.api.model.StreamedListObjectsResponse; import dev.openfga.sdk.errors.ApiException; @@ -206,8 +207,11 @@ private void processLine(String line, Consumer consumer, Consumer streamResponse = Stream.of(line1, line2, line3); + + HttpResponse> mockResponse = createMockStreamResponse(200, streamResponse); + CompletableFuture>> responseFuture = + CompletableFuture.completedFuture(mockResponse); + + when(mockHttpClient.>sendAsync(any(), any())).thenReturn(responseFuture); + + List receivedObjects = new ArrayList<>(); + List receivedErrors = new ArrayList<>(); + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + CompletableFuture future = + fga.streamedListObjects(request, null, receivedObjects::add, receivedErrors::add); + future.get(); // Wait for completion + + // Then - should handle null messages gracefully without NPE + assertEquals(1, receivedObjects.size()); + assertEquals(2, receivedErrors.size()); + + // Verify first error has a fallback message + assertTrue(receivedErrors.get(0) instanceof dev.openfga.sdk.errors.ApiException); + String firstErrorMsg = ((dev.openfga.sdk.errors.ApiException) receivedErrors.get(0)).getMessage(); + assertTrue(firstErrorMsg.contains("Stream error")); + assertTrue(firstErrorMsg.contains("unknown")); + + // Verify second error has code in message + assertTrue(receivedErrors.get(1) instanceof dev.openfga.sdk.errors.ApiException); + String secondErrorMsg = ((dev.openfga.sdk.errors.ApiException) receivedErrors.get(1)).getMessage(); + assertTrue(secondErrorMsg.contains("Stream error")); + assertTrue(secondErrorMsg.contains("code 123")); + } + @Test public void streamedListObjects_httpError() throws Exception { // Given From 60e59e618f74dfbf2458fdd2f39920b906ded727 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Thu, 6 Nov 2025 11:27:45 +0530 Subject: [PATCH 10/11] Delete STREAMING_OPTIMIZATION.md --- STREAMING_OPTIMIZATION.md | 82 --------------------------------------- 1 file changed, 82 deletions(-) delete mode 100644 STREAMING_OPTIMIZATION.md diff --git a/STREAMING_OPTIMIZATION.md b/STREAMING_OPTIMIZATION.md deleted file mode 100644 index 0a6b1657..00000000 --- a/STREAMING_OPTIMIZATION.md +++ /dev/null @@ -1,82 +0,0 @@ -# Optimization: Remove Unnecessary Thread Pool Hop in Streaming - -## Issue Raised -The original concern was that `CompletableFuture.runAsync()` uses `ForkJoinPool.commonPool()` which may not be appropriate for I/O-bound operations like processing streaming responses. - -## Analysis - -### Original Code -```java -.thenCompose(response -> { - // Check status... - - // Process the stream asynchronously on a separate thread - return CompletableFuture.runAsync(() -> { - try (Stream lines = response.body()) { - lines.forEach(line -> { - processLine(line, consumer, errorConsumer); - }); - } - }); -}) -``` - -### Issues with Original Approach -1. **Unnecessary thread hop**: The `thenCompose` already runs on an appropriate executor thread from HttpClient -2. **Extra complexity**: Adding `runAsync` and `thenCompose` when `thenApply` would suffice -3. **Potential confusion**: Suggests I/O-bound work when it's actually CPU-bound (JSON parsing) - -### Reality Check -- The `response.body()` returns a `Stream` that's **already buffered** by HttpClient -- We're **not doing blocking I/O** - just iterating over in-memory lines -- The work is **CPU-bound** (JSON parsing with Jackson), not I/O-bound -- `ForkJoinPool.commonPool()` is actually reasonable for CPU work, BUT... - -## Solution: Use thenApply Instead - -### Optimized Code -```java -.thenApply(response -> { - // Check status... - - // Process the stream - runs on HttpClient's executor thread - try (Stream lines = response.body()) { - lines.forEach(line -> { - processLine(line, consumer, errorConsumer); - }); - } - return (Void) null; -}) -``` - -### Benefits -1. ✅ **No unnecessary thread hop** - processes directly on HttpClient's executor -2. ✅ **Simpler code** - uses `thenApply` instead of `thenCompose` + `runAsync` -3. ✅ **More efficient** - one less context switch -4. ✅ **Consistent with codebase** - follows patterns used elsewhere in the SDK -5. ✅ **Same executor context** - uses the executor configured for the HttpClient - -## Thread Pool Context - -The processing now runs on: -- **HttpClient's executor** if one was explicitly configured via `HttpClient.Builder.executor()` -- **HttpClient's default executor** otherwise (which is suitable for this work) - -This is better than using `ForkJoinPool.commonPool()` because: -- It respects any custom executor configuration -- It keeps the work in the same execution context as the HTTP operation -- It's simpler and more efficient - -## Verification - -- ✅ All existing tests pass -- ✅ Build succeeds -- ✅ Example project works correctly -- ✅ No functional changes, only optimization - -## Conclusion - -**The concern was valid** - we were using an unnecessary `runAsync` that added complexity and a thread hop. However, **the real issue wasn't about ForkJoinPool vs I/O executor**, but rather that we didn't need `runAsync` at all. - -The fix is simpler and more efficient: just use `thenApply` and process the stream directly on the HttpClient's executor thread. - From 416654351accdb928f69b7b385f561e1b92abec7 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Thu, 6 Nov 2025 16:15:29 +0530 Subject: [PATCH 11/11] feat: add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b589759c..75173eab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## [Unreleased](https://github.com/openfga/java-sdk/compare/v0.9.2...HEAD) +- feat: Add `streamedListObjects` API endpoint with consumer callback support (#252) ## v0.9.2