Skip to content

Commit ddc7e63

Browse files
committed
Implemented Reactive versions of PostAction, PreAction and JdbcSelectWithActions
1 parent 1adce18 commit ddc7e63

18 files changed

+1442
-20
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
import org.hibernate.reactive.boot.spi.ReactiveMetadataImplementor;
1414
import org.hibernate.reactive.mutiny.Mutiny;
1515
import org.hibernate.reactive.mutiny.impl.MutinySessionFactoryImpl;
16+
import org.hibernate.reactive.sql.exec.internal.ReactiveJdbcSelectWithActions;
1617
import org.hibernate.reactive.stage.Stage;
1718
import org.hibernate.reactive.stage.impl.StageSessionFactoryImpl;
19+
import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder;
1820

1921
/**
2022
* A Hibernate {@link org.hibernate.SessionFactory} that can be
@@ -42,4 +44,9 @@ public <T> T unwrap(Class<T> type) {
4244
}
4345
return super.unwrap( type );
4446
}
47+
48+
public JdbcSelectWithActionsBuilder getJdbcSelectWithActionsBuilder(){
49+
return new ReactiveJdbcSelectWithActions.Builder();
50+
}
51+
4552
}
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.sql.exec.internal;
7+
8+
import org.hibernate.LockOptions;
9+
import org.hibernate.Locking;
10+
import org.hibernate.dialect.lock.spi.LockTimeoutType;
11+
import org.hibernate.dialect.lock.spi.LockingSupport;
12+
import org.hibernate.internal.util.collections.CollectionHelper;
13+
import org.hibernate.reactive.pool.ReactiveConnection;
14+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveCollectionLockingAction;
15+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveConnectionLockTimeoutStrategyBuilder;
16+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveFollowOnLockingAction;
17+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveLockTimeoutHandler;
18+
import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect;
19+
import org.hibernate.reactive.sql.exec.spi.ReactivePostAction;
20+
import org.hibernate.reactive.sql.exec.spi.ReactivePreAction;
21+
import org.hibernate.sql.ast.spi.LockingClauseStrategy;
22+
import org.hibernate.sql.ast.tree.select.QuerySpec;
23+
import org.hibernate.sql.exec.internal.JdbcOperationQuerySelect;
24+
import org.hibernate.sql.exec.internal.JdbcSelectWithActions;
25+
import org.hibernate.sql.exec.spi.ExecutionContext;
26+
import org.hibernate.sql.exec.spi.JdbcSelect;
27+
import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder;
28+
import org.hibernate.sql.exec.spi.LoadedValuesCollector;
29+
import org.hibernate.sql.exec.spi.PostAction;
30+
import org.hibernate.sql.exec.spi.PreAction;
31+
import org.hibernate.sql.exec.spi.SecondaryAction;
32+
33+
import java.util.ArrayList;
34+
import java.util.Collections;
35+
import java.util.List;
36+
import java.util.concurrent.CompletionStage;
37+
38+
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
39+
import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture;
40+
41+
/**
42+
* Reactive version of {@link JdbcSelectWithActions}
43+
*/
44+
public class ReactiveJdbcSelectWithActions extends JdbcSelectWithActions implements ReactiveJdbcSelect {
45+
46+
public ReactiveJdbcSelectWithActions(
47+
JdbcOperationQuerySelect primaryOperation,
48+
LoadedValuesCollector loadedValuesCollector,
49+
PreAction[] preActions,
50+
PostAction[] postActions) {
51+
super( primaryOperation, loadedValuesCollector, preActions, postActions );
52+
}
53+
54+
public ReactiveJdbcSelectWithActions(
55+
JdbcOperationQuerySelect primaryAction,
56+
LoadedValuesCollector loadedValuesCollector) {
57+
super( primaryAction, loadedValuesCollector );
58+
}
59+
60+
@Override
61+
public CompletionStage<Void> reactivePerformPreActions(
62+
ReactiveConnection connection,
63+
ExecutionContext executionContext) {
64+
if ( preActions == null ) {
65+
return nullFuture();
66+
}
67+
68+
return loop( preActions, preAction ->
69+
( (ReactivePreAction) preAction ).reactivePerformPreAction( connection, executionContext )
70+
);
71+
}
72+
73+
@Override
74+
public CompletionStage<Void> reactivePerformPostActions(
75+
boolean succeeded,
76+
ReactiveConnection connection,
77+
ExecutionContext executionContext) {
78+
if ( postActions != null ) {
79+
return loop(
80+
postActions, postAction -> {
81+
if ( succeeded || postAction.shouldRunAfterFail() ) {
82+
return ( (ReactivePostAction) postAction ).reactivePerformReactivePostAction(
83+
connection,
84+
executionContext
85+
);
86+
}
87+
return nullFuture();
88+
}
89+
).thenAccept( unused -> {
90+
if ( loadedValuesCollector != null ) {
91+
loadedValuesCollector.clear();
92+
}
93+
} );
94+
}
95+
else {
96+
if ( loadedValuesCollector != null ) {
97+
loadedValuesCollector.clear();
98+
}
99+
return nullFuture();
100+
}
101+
}
102+
103+
public static class Builder implements JdbcSelectWithActionsBuilder {
104+
private JdbcOperationQuerySelect primaryAction;
105+
private LoadedValuesCollector loadedValuesCollector;
106+
protected List<PreAction> preActions;
107+
protected List<PostAction> postActions;
108+
protected LockTimeoutType lockTimeoutType;
109+
protected LockingSupport lockingSupport;
110+
protected LockOptions lockOptions;
111+
protected QuerySpec lockingTarget;
112+
protected LockingClauseStrategy lockingClauseStrategy;
113+
boolean isFollonOnLockStrategy;
114+
115+
@Override
116+
public Builder setPrimaryAction(JdbcSelect primaryAction) {
117+
assert primaryAction instanceof JdbcOperationQuerySelect;
118+
this.primaryAction = (JdbcOperationQuerySelect) primaryAction;
119+
return this;
120+
}
121+
122+
@SuppressWarnings("UnusedReturnValue")
123+
@Override
124+
public Builder setLoadedValuesCollector(LoadedValuesCollector loadedValuesCollector) {
125+
this.loadedValuesCollector = loadedValuesCollector;
126+
return this;
127+
}
128+
129+
@Override
130+
public Builder setLockTymeOutType(LockTimeoutType lockTimeoutType) {
131+
this.lockTimeoutType = lockTimeoutType;
132+
return this;
133+
}
134+
135+
@Override
136+
public Builder setLockingSupport(LockingSupport lockingSupport) {
137+
this.lockingSupport = lockingSupport;
138+
return this;
139+
}
140+
141+
@Override
142+
public Builder setLockOptions(LockOptions lockOptions) {
143+
this.lockOptions = lockOptions;
144+
return this;
145+
}
146+
147+
@Override
148+
public Builder setLockingTarget(QuerySpec lockingTarget) {
149+
this.lockingTarget = lockingTarget;
150+
return this;
151+
}
152+
153+
@Override
154+
public Builder setLockingClauseStrategy(LockingClauseStrategy lockingClauseStrategy) {
155+
this.lockingClauseStrategy = lockingClauseStrategy;
156+
return this;
157+
}
158+
159+
@Override
160+
public Builder setIsFollowOnLockStrategy(boolean isFollonOnLockStrategy) {
161+
this.isFollonOnLockStrategy = isFollonOnLockStrategy;
162+
return this;
163+
}
164+
165+
@Override
166+
public JdbcSelect build() {
167+
if ( lockTimeoutType == LockTimeoutType.CONNECTION ) {
168+
addSecondaryActionPair(
169+
new ReactiveLockTimeoutHandler(
170+
lockOptions.getTimeout(),
171+
ReactiveConnectionLockTimeoutStrategyBuilder.build( lockingSupport.getConnectionLockTimeoutStrategy() )
172+
)
173+
);
174+
}
175+
if ( isFollonOnLockStrategy ) {
176+
ReactiveFollowOnLockingAction.apply( lockOptions, lockingTarget, lockingClauseStrategy, this );
177+
}
178+
else if ( lockOptions.getScope() == Locking.Scope.INCLUDE_COLLECTIONS ) {
179+
ReactiveCollectionLockingAction.apply( lockOptions, lockingTarget, this );
180+
}
181+
if ( preActions == null && postActions == null ) {
182+
assert loadedValuesCollector == null;
183+
return primaryAction;
184+
}
185+
final PreAction[] preActions = toPreActionArray( this.preActions );
186+
final PostAction[] postActions = toPostActionArray( this.postActions );
187+
return new ReactiveJdbcSelectWithActions( primaryAction, loadedValuesCollector, preActions, postActions );
188+
}
189+
190+
/**
191+
* Appends the {@code actions} to the growing list of pre-actions,
192+
* executed (in order) after all currently registered actions.
193+
*
194+
* @return {@code this}, for method chaining.
195+
*/
196+
@Override
197+
public Builder appendPreAction(PreAction... actions) {
198+
if ( preActions == null ) {
199+
preActions = new ArrayList<>();
200+
}
201+
Collections.addAll( preActions, actions );
202+
return this;
203+
}
204+
205+
/**
206+
* Prepends the {@code actions} to the growing list of pre-actions
207+
*
208+
* @return {@code this}, for method chaining.
209+
*/
210+
@Override
211+
public Builder prependPreAction(PreAction... actions) {
212+
if ( preActions == null ) {
213+
preActions = new ArrayList<>();
214+
}
215+
// todo (DatabaseOperation) : should we invert the order of the incoming actions?
216+
Collections.addAll( preActions, actions );
217+
return this;
218+
}
219+
220+
/**
221+
* Appends the {@code actions} to the growing list of post-actions
222+
*
223+
* @return {@code this}, for method chaining.
224+
*/
225+
@Override
226+
public Builder appendPostAction(PostAction... actions) {
227+
if ( postActions == null ) {
228+
postActions = new ArrayList<>();
229+
}
230+
Collections.addAll( postActions, actions );
231+
return this;
232+
}
233+
234+
/**
235+
* Prepends the {@code actions} to the growing list of post-actions
236+
*
237+
* @return {@code this}, for method chaining.
238+
*/
239+
@Override
240+
public Builder prependPostAction(PostAction... actions) {
241+
if ( postActions == null ) {
242+
postActions = new ArrayList<>();
243+
}
244+
// todo (DatabaseOperation) : should we invert the order of the incoming actions?
245+
Collections.addAll( postActions, actions );
246+
return this;
247+
}
248+
249+
/**
250+
* Adds a secondary action pair.
251+
* Assumes the {@code action} implements both {@linkplain PreAction} and {@linkplain PostAction}.
252+
*
253+
* @return {@code this}, for method chaining.
254+
*
255+
* @apiNote Prefer {@linkplain #addSecondaryActionPair(PreAction, PostAction)} to avoid
256+
* the casts needed here.
257+
* @see #prependPreAction
258+
* @see #appendPostAction
259+
*/
260+
@Override
261+
public Builder addSecondaryActionPair(SecondaryAction action) {
262+
return addSecondaryActionPair( (PreAction) action, (PostAction) action );
263+
}
264+
265+
/**
266+
* Adds a PreAction/PostAction pair.
267+
*
268+
* @return {@code this}, for method chaining.
269+
*
270+
* @see #prependPreAction
271+
* @see #appendPostAction
272+
*/
273+
@Override
274+
public Builder addSecondaryActionPair(PreAction preAction, PostAction postAction) {
275+
prependPreAction( preAction );
276+
appendPostAction( postAction );
277+
return this;
278+
}
279+
280+
static PreAction[] toPreActionArray(List<PreAction> actions) {
281+
if ( CollectionHelper.isEmpty( actions ) ) {
282+
return null;
283+
}
284+
return actions.toArray( new PreAction[0] );
285+
}
286+
287+
static PostAction[] toPostActionArray(List<PostAction> actions) {
288+
if ( CollectionHelper.isEmpty( actions ) ) {
289+
return null;
290+
}
291+
return actions.toArray( new PostAction[0] );
292+
}
293+
294+
}
295+
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.hibernate.query.TupleTransformer;
2323
import org.hibernate.reactive.engine.impl.ReactivePersistenceContextAdapter;
2424
import org.hibernate.query.spi.QueryOptions;
25+
import org.hibernate.reactive.pool.ReactiveConnection;
26+
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
27+
import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect;
2528
import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState;
2629
import org.hibernate.reactive.sql.exec.spi.ReactiveSelectExecutor;
2730
import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet;
@@ -220,10 +223,15 @@ public boolean shouldReturnProxies() {
220223
};
221224

222225
final JdbcValuesSourceProcessingStateStandardImpl valuesProcessingState =
223-
new JdbcValuesSourceProcessingStateStandardImpl( executionContext, processingOptions );
226+
new JdbcValuesSourceProcessingStateStandardImpl(
227+
jdbcSelect.getLoadedValuesCollector(),
228+
processingOptions,
229+
executionContext
230+
);
224231

232+
final SharedSessionContractImplementor session = executionContext.getSession();
225233
final ReactiveRowReader<R> rowReader = ReactiveResultsHelper.createRowReader(
226-
executionContext.getSession().getSessionFactory(),
234+
session.getSessionFactory(),
227235
rowTransformer,
228236
domainResultType,
229237
jdbcValues
@@ -237,21 +245,34 @@ public boolean shouldReturnProxies() {
237245
);
238246

239247
rowReader.startLoading( rowProcessingState );
240-
241-
return resultsConsumer
242-
.consume(
243-
jdbcValues,
244-
executionContext.getSession(),
245-
processingOptions,
246-
valuesProcessingState,
247-
rowProcessingState,
248-
rowReader
249-
)
250-
.thenApply( result -> {
251-
statistics.end( jdbcSelect, result );
252-
return result;
253-
} );
254-
} );
248+
ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection();
249+
if ( jdbcSelect instanceof ReactiveJdbcSelect reactiveJdbcSelect ) {
250+
return reactiveJdbcSelect.reactivePerformPreActions( reactiveConnection, executionContext )
251+
.thenCompose( unused -> resultsConsumer
252+
.consume(
253+
jdbcValues,
254+
session,
255+
processingOptions,
256+
valuesProcessingState,
257+
rowProcessingState,
258+
rowReader
259+
) )
260+
.thenCompose( result -> reactiveJdbcSelect
261+
.reactivePerformPostActions( true, reactiveConnection, executionContext )
262+
.thenApply( v -> {
263+
statistics.end( jdbcSelect, result );
264+
return result;
265+
} )
266+
);
267+
}
268+
else {
269+
return resultsConsumer.consume( jdbcValues, session, processingOptions, valuesProcessingState, rowProcessingState, rowReader )
270+
.thenApply( result -> {
271+
statistics.end( jdbcSelect, result );
272+
return result;
273+
} );
274+
}
275+
});
255276
}
256277

257278
private static <R> RowTransformer<R> rowTransformer(

0 commit comments

Comments
 (0)