Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
*/
package org.hibernate.reactive.mutiny;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.hibernate.Cache;
import org.hibernate.CacheMode;
import org.hibernate.Filter;
Expand Down Expand Up @@ -44,10 +49,6 @@
import jakarta.persistence.criteria.CriteriaUpdate;
import jakarta.persistence.metamodel.Attribute;
import jakarta.persistence.metamodel.Metamodel;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.hibernate.engine.internal.ManagedTypeHelper.asPersistentAttributeInterceptable;
import static org.hibernate.engine.internal.ManagedTypeHelper.isPersistentAttributeInterceptable;
Expand Down Expand Up @@ -1971,6 +1972,59 @@ interface Transaction {
*/
interface SessionFactory extends AutoCloseable {

/**
* Obtain a new {@link Session reactive session}, the main
* interaction point between the user's program and Hibernate
* Reactive.
* <p>
* The underlying database connection is obtained lazily
* when the returned {@link Session} needs to access the
* database.
* <p>
* The client must close the session using {@link Session#close()}.
*/
@Incubating
Session createSession();

/**
* Obtain a new {@link Session reactive session}.
* <p>
* The underlying database connection is obtained lazily
* when the returned {@link Session} needs to access the
* database.
* <p>
* The client must close the session using {@link Session#close()}.
* @param tenantId the id of the tenant
*/
@Incubating
Session createSession(String tenantId);

/**
* Obtain a new {@link StatelessSession reactive stateless session}.
* <p>
* The underlying database connection is obtained lazily
* when the returned {@link StatelessSession} needs to access the
* database.
* <p>
* The client must close the session using {@link Session#close()}.
*/
@Incubating
StatelessSession createStatelessSession();

/**
* Obtain a new {@link StatelessSession reactive stateless session}.
* <p>
* The underlying database connection is obtained lazily
* when the returned {@link StatelessSession} needs to access the
* database.
* <p>
* The client must close the session using {@link Session#close()}.
*
* @param tenantId the id of the tenant
*/
@Incubating
StatelessSession createStatelessSession(String tenantId);

/**
* Obtain a new {@link Session reactive session} {@link Uni}, the main
* interaction point between the user's program and Hibernate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.hibernate.reactive.mutiny.Mutiny;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.pool.ReactiveConnectionPool;
import org.hibernate.reactive.session.ReactiveSession;
import org.hibernate.reactive.session.ReactiveStatelessSession;
import org.hibernate.reactive.session.impl.ReactiveSessionImpl;
import org.hibernate.reactive.session.impl.ReactiveStatelessSessionImpl;
import org.hibernate.service.ServiceRegistry;
Expand Down Expand Up @@ -88,21 +90,48 @@ public Context getContext() {
return context;
}

@Override
public Mutiny.Session createSession() {
return createSession( getTenantIdentifier( options() ) );
}

@Override
public Mutiny.Session createSession(String tenantId) {
final SessionCreationOptions options = options();
ReactiveConnectionPool pool = delegate.getServiceRegistry().getService( ReactiveConnectionPool.class );
ReactiveSession sessionImpl = new ReactiveSessionImpl( delegate, options, pool.getProxyConnection( tenantId ) );
return new MutinySessionImpl( sessionImpl, this );
}

@Override
public Mutiny.StatelessSession createStatelessSession() {
return createStatelessSession( getTenantIdentifier( options() ) );
}

@Override
public Mutiny.StatelessSession createStatelessSession(String tenantId) {
final SessionCreationOptions options = options();
ReactiveConnectionPool pool = delegate.getServiceRegistry().getService( ReactiveConnectionPool.class );
ReactiveStatelessSession sessionImpl = new ReactiveStatelessSessionImpl( delegate, options, pool.getProxyConnection( tenantId ) );
return new MutinyStatelessSessionImpl( sessionImpl, this );
}

@Override
public Uni<Mutiny.Session> openSession() {
SessionCreationOptions options = options();
return uni( () -> connection( getTenantIdentifier( options ) ) )
.chain( reactiveConnection -> create( reactiveConnection,
() -> new ReactiveSessionImpl( delegate, options, reactiveConnection ) ) )
.map( s -> new MutinySessionImpl(s, this) );
.chain( reactiveConnection -> create(
reactiveConnection,
() -> new ReactiveSessionImpl( delegate, options, reactiveConnection )
) )
.map( s -> new MutinySessionImpl( s, this ) );
}

@Override
public Uni<Mutiny.Session> openSession(String tenantId) {
return uni( () -> connection( tenantId ) )
.chain( reactiveConnection -> create( reactiveConnection,
() -> new ReactiveSessionImpl( delegate, options( tenantId ), reactiveConnection ) ) )
.map( s -> new MutinySessionImpl(s, this) );
.chain( reactiveConnection -> create( reactiveConnection, () -> new ReactiveSessionImpl( delegate, options( tenantId ), reactiveConnection ) ) )
.map( s -> new MutinySessionImpl( s, this ) );
}

/**
Expand All @@ -122,16 +151,20 @@ private static Uni<Void> close(ReactiveConnection connection) {
public Uni<Mutiny.StatelessSession> openStatelessSession() {
SessionCreationOptions options = options();
return uni( () -> connection( getTenantIdentifier( options ) ) )
.chain( reactiveConnection -> create( reactiveConnection,
() -> new ReactiveStatelessSessionImpl( delegate, options, reactiveConnection ) ) )
.map( s -> new MutinyStatelessSessionImpl(s, this) );
.chain( reactiveConnection -> create(
reactiveConnection,
() -> new ReactiveStatelessSessionImpl( delegate, options, reactiveConnection )
) )
.map( s -> new MutinyStatelessSessionImpl( s, this ) );
}

@Override
public Uni<Mutiny.StatelessSession> openStatelessSession(String tenantId) {
return uni( () -> connection( tenantId ) )
.chain( reactiveConnection -> create( reactiveConnection,
() -> new ReactiveStatelessSessionImpl( delegate, options( tenantId ), reactiveConnection ) ) )
.chain( reactiveConnection -> create(
reactiveConnection,
() -> new ReactiveStatelessSessionImpl( delegate, options( tenantId ), reactiveConnection )
) )
.map( s -> new MutinyStatelessSessionImpl( s, this ) );
}

Expand Down Expand Up @@ -190,8 +223,8 @@ public <T> Uni<T> withSession(String tenantId, Function<Mutiny.Session, Uni<T>>
Objects.requireNonNull( tenantId, "parameter 'tenantId' is required" );
Objects.requireNonNull( work, "parameter 'work' is required" );
Context.Key<Mutiny.Session> key = new MultitenantKey<>( contextKeyForSession, tenantId );
Mutiny.Session current = context.get(key);
if ( current!=null && current.isOpen() ) {
Mutiny.Session current = context.get( key );
if ( current != null && current.isOpen() ) {
LOG.debugf( "Reusing existing open Mutiny.Session which was found in the current Vert.x context for current tenant '%s'", tenantId );
return work.apply( current );
}
Expand Down Expand Up @@ -227,11 +260,11 @@ public <T> Uni<T> withStatelessSession(String tenantId, Function<Mutiny.Stateles
}
else {
LOG.debugf( "No existing open Mutiny.StatelessSession was found in the current Vert.x context for current tenant '%s': opening a new instance", tenantId );
return withSession( openStatelessSession( tenantId), work, key );
return withSession( openStatelessSession( tenantId ), work, key );
}
}

private<S extends Mutiny.Closeable, T> Uni<T> withSession(
private <S extends Mutiny.Closeable, T> Uni<T> withSession(
Uni<S> sessionUni,
Function<S, Uni<T>> work,
Context.Key<S> contextKey) {
Expand All @@ -246,25 +279,25 @@ private<S extends Mutiny.Closeable, T> Uni<T> withSession(
@Override
public <T> Uni<T> withTransaction(BiFunction<Mutiny.Session, Mutiny.Transaction, Uni<T>> work) {
Objects.requireNonNull( work, "parameter 'work' is required" );
return withSession( s -> s.withTransaction( t -> work.apply(s, t) ) );
return withSession( s -> s.withTransaction( t -> work.apply( s, t ) ) );
}

@Override
public <T> Uni<T> withStatelessTransaction(BiFunction<Mutiny.StatelessSession, Mutiny.Transaction, Uni<T>> work) {
Objects.requireNonNull( work, "parameter 'work' is required" );
return withStatelessSession( s -> s.withTransaction( t -> work.apply(s, t) ) );
return withStatelessSession( s -> s.withTransaction( t -> work.apply( s, t ) ) );
}

@Override
public <T> Uni<T> withTransaction(String tenantId, BiFunction<Mutiny.Session, Mutiny.Transaction, Uni<T>> work) {
Objects.requireNonNull( work, "parameter 'work' is required" );
return withSession( tenantId, s -> s.withTransaction( t -> work.apply(s, t) ) );
return withSession( tenantId, s -> s.withTransaction( t -> work.apply( s, t ) ) );
}

@Override
public <T> Uni<T> withStatelessTransaction(String tenantId, BiFunction<Mutiny.StatelessSession, Mutiny.Transaction, Uni<T>> work) {
Objects.requireNonNull( work, "parameter 'work' is required" );
return withStatelessSession( tenantId, s -> s.withTransaction( t -> work.apply(s, t) ) );
return withStatelessSession( tenantId, s -> s.withTransaction( t -> work.apply( s, t ) ) );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,25 @@
*/
@Incubating
public interface ReactiveConnectionPool extends Service {
/**
* Obtain a lazily-initializing reactive connection. The
* actual connection might be made when the returned
* instance if {@link ReactiveConnection} is first used.
*/
ReactiveConnection getProxyConnection();

/**
* Obtain a reactive connection, returning the connection
* via a {@link CompletionStage}.
* via a {@link CompletionStage} and overriding the default
* {@link SqlExceptionHelper} for the pool.
*/
CompletionStage<ReactiveConnection> getConnection();
ReactiveConnection getProxyConnection(SqlExceptionHelper sqlExceptionHelper);

/**
* Obtain a reactive connection for the given tenant id,
* returning the connection via a {@link CompletionStage}.
*/
ReactiveConnection getProxyConnection(String tenantId);

/**
* Obtain a reactive connection, returning the connection
Expand All @@ -57,6 +70,12 @@ public interface ReactiveConnectionPool extends Service {
*/
CompletionStage<ReactiveConnection> getConnection(String tenantId);

/**
* Obtain a reactive connection, returning the connection
* via a {@link CompletionStage}.
*/
CompletionStage<ReactiveConnection> getConnection();

/**
* Obtain a reactive connection for the given tenant id,
* returning the connection via a {@link CompletionStage}
Expand Down
Loading
Loading