Combining both Spring Data R2DBC and Kotlin Coroutines, we obtain a seamless and efficient way to perform non-blocking database operations. In this blog post, we explore the different options for handling non-blocking database accesses with these two powerful tools inside a reactive stack to write an end-to-end feature using Spring WebFlux, Kotlin Coroutines, and Spring Data R2DBC.
On the one hand, Spring Data R2DBC is a tool that allows developers to access databases using a reactive programming model. This can help improve an application’s performance, especially when dealing with IO-bound tasks such as database accesses and high concurrency.
On the other hand, Kotlin Coroutines are a tool for writing asynchronous, non-blocking code. They let you write code that is easy to read and understand while still being efficient.
Reactive vs. Coroutines
Reactive programming is a programming paradigm that handles asynchronous data streams and events by using data flows and propagation of change. In the context of the Spring framework, Spring WebFlux is a reactive web module that allows developers to build reactive web applications inside the Spring ecosystem.
Reactor is the fully non-blocking library of choice for Spring Reactive Web (WebFlux). It is based on the Reactive Streams specification and provides a Reactive API with types such as Mono and Flux that encapsulate a stream of values computed asynchronously.
Kotlin Coroutines, in contrast, provide a simple and intuitive programming model for writing non-blocking code in an imperative, sequential style. Coroutines also work well with non-blocking frameworks such as Spring WebFlux: the Reactive API can be converted to suspend functions, and types such as Flow are available for data streaming.
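To make the contrast concrete, here is a minimal sketch of the same two-step lookup written first as a Reactor chain and then as a suspend function. The functions findName and findEmail are hypothetical stand-ins for any non-blocking call; only Mono, awaitSingle, and mono come from Reactor and kotlinx-coroutines-reactor.

```kotlin
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical non-blocking sources; in a real application these would be I/O calls.
fun findName(id: Long): Mono<String> = Mono.just("user-$id")
fun findEmail(name: String): Mono<String> = Mono.just("$name@test.com")

// Reactive style: the steps are composed with operators on the Mono.
fun emailReactive(id: Long): Mono<String> =
    findName(id).flatMap { name -> findEmail(name) }

// Coroutine style: the same steps read sequentially.
// awaitSingle() suspends the coroutine instead of blocking a thread.
suspend fun emailSuspending(id: Long): String {
    val name = findName(id).awaitSingle()
    return findEmail(name).awaitSingle()
}

// Bridging back: mono { } wraps a suspend block in a Mono
// for callers that still expect Reactor types.
fun emailAsMono(id: Long): Mono<String> = mono { emailSuspending(id) }

fun main() = runBlocking {
    println(emailReactive(1).awaitSingle())   // user-1@test.com
    println(emailSuspending(1))               // user-1@test.com
    println(emailAsMono(1).awaitSingle())     // user-1@test.com
}
```

All three variants produce the same value; the difference is only in how the intermediate steps are expressed.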
Spring Data R2DBC
Now, let’s understand what R2DBC stands for. R2DBC is an acronym for Reactive Relational Database Connectivity. It is a specification that defines how a reactive programming model can be applied to database access. This allows developers to work with databases using a non-blocking programming model.
Spring Data R2DBC is the Spring Data module for R2DBC. It builds on R2DBC drivers, which implement the specification, and provides a simple and consistent API for working with databases using the Spring ecosystem. This means that developers can use the same familiar Spring Data annotations and interfaces to work with databases, but with the added benefits of a reactive stack.
Getting started
To get started with a reactive Spring application, you must first add the appropriate dependencies to your project. If you are starting a new project from scratch, a good approach is to create the project with Spring Initializr, choosing Spring Reactive Web (WebFlux), Spring Data R2DBC, and a driver such as the PostgreSQL Driver.
But this time, we will add the needed dependencies manually, also adding the required dependencies for Kotlin Coroutines and extensions for Reactor.
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
    implementation("org.postgresql:r2dbc-postgresql")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
}
Here, we are importing Spring WebFlux, Spring Data R2DBC, the R2DBC PostgreSQL Driver, and the Kotlin Coroutines libraries. The last two dependencies, the Reactor Kotlin extensions and the Coroutines Reactor integration, let us translate from the Reactor API to imperative Coroutines code.
After that, you might need a database instance. In this example, we use Docker Compose to create a Docker container with a PostgreSQL database. For that, we create a docker-compose.yml file inside the project:
services:
  postgres:
    image: postgres:15.0-alpine
    hostname: postgres
    ports:
      - "5432:5432"
    restart: always
    environment:
      POSTGRES_DB: testDb
      POSTGRES_USER: someUser
      POSTGRES_PASSWORD: somePassword
Then, we simply run the command docker-compose up -d in the terminal to start it in the background.
Database configuration
Once we have the dependencies in place and the database running, it’s time to create the database configuration. We could choose to use Spring YAML Configuration, but, for this example, we will do it programmatically and using the default Postgres port (5432):
@Configuration
@EnableR2dbcRepositories
class DatabaseConfig : AbstractR2dbcConfiguration() {
    @Bean
    override fun connectionFactory(): ConnectionFactory =
        PostgresqlConnectionFactory(
            PostgresqlConnectionConfiguration.builder()
                .host("localhost")
                .database("testDb")
                .username("someUser")
                .password("somePassword")
                .build()
        )
}
First, we use Spring’s @Configuration annotation, which marks the class as a source of bean definitions, while @EnableR2dbcRepositories activates reactive relational repositories using R2DBC. The configuration class extends AbstractR2dbcConfiguration, which requires us to provide a ConnectionFactory; we implement connectionFactory() with our database settings. Finally, we mark it as a @Bean so that it can be injected later.
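As a variation, the same connection can be described with an R2DBC URL instead of the driver-specific builder. This is a minimal sketch, assuming the R2DBC PostgreSQL driver is on the classpath; the constant DATABASE_URL and the function connectionFactoryFromUrl are hypothetical names, and the URL is assembled from the Docker Compose values used in this post.

```kotlin
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory

// Assumed URL: user, password, host, port, and database taken from docker-compose.yml.
const val DATABASE_URL = "r2dbc:postgresql://someUser:somePassword@localhost:5432/testDb"

// Hypothetical helper: ConnectionFactories discovers the driver from the URL scheme.
// No connection is opened here; that happens when the factory is first used.
fun connectionFactoryFromUrl(): ConnectionFactory = ConnectionFactories.get(DATABASE_URL)
```

The builder used above is type-safe and driver-specific, whereas the URL form keeps the whole configuration in a single string that is easy to externalize.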
Inside the same DatabaseConfig class, we also create a ConnectionFactoryInitializer that initializes the connection. It receives the previously defined ConnectionFactory as a method parameter and populates the database when the application starts:
@Bean
fun databaseInitializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer =
    ConnectionFactoryInitializer().apply {
        setConnectionFactory(connectionFactory)
        setDatabasePopulator(
            CompositeDatabasePopulator().apply {
                addPopulators(
                    ResourceDatabasePopulator(
                        ClassPathResource("sql/schema.sql"),
                        ClassPathResource("sql/data.sql")
                    )
                )
            }
        )
    }
Inside a resources/sql folder, we define a schema.sql file for creating the table:
CREATE TABLE IF NOT EXISTS users (
    id BIGSERIAL PRIMARY KEY,
    username VARCHAR NOT NULL,
    email VARCHAR NOT NULL,
    CONSTRAINT unique_user UNIQUE (username, email)
);
We define a data.sql file with some default test data:
INSERT INTO users (username, email) VALUES ('testuser', 'testuser@test.com') ON CONFLICT DO NOTHING;
We also define a User data transfer object (DTO), adding the annotations for mapping our code to the database:
@Table("users")
data class User(
    @Id val id: Long,
    @Column("username") val username: String,
    @Column("email") val email: String,
)
Repository
Now let’s explore our different options for operating with the database in a non-blocking way using Kotlin Coroutines in the Repository Layer.
DatabaseClient
DatabaseClient is a non-blocking, reactive client for performing database calls with Reactive Streams back pressure, and Spring provides Coroutines extensions for it.
For example:
@Repository
class UserRepository(private val client: DatabaseClient) {
    suspend fun findById(id: Long): User? =
        client
            // Bind the id as a named parameter instead of interpolating it into the SQL string
            .sql("SELECT * FROM users WHERE id = :id")
            .bind("id", id)
            .map { row ->
                User(
                    row.get("id") as Long,
                    row.get("username") as String,
                    row.get("email") as String,
                )
            }
            .awaitOneOrNull()
}
In this example, we first define a UserRepository class that performs database operations using DatabaseClient. The findById function takes an id parameter and returns a nullable User object. Inside the function, we call client.sql to execute a SQL query that selects the user from the database by its id. After that, we use the map method to map the resulting row to a User object. Finally, we use awaitOneOrNull() to wait for the result of the operation, returning null if there is no result.
If you are familiar with the Spring reactive stack, you might notice that there is no need to wrap the value inside a Mono; instead, findById is marked as suspend. The suspend keyword in Kotlin indicates a suspending function, which can be called only from a coroutine or from another suspending function.
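The same client can also return several rows as a Flow. The following is a minimal sketch, assuming the flow() extension from spring-r2dbc; the UserRow type, the UserQueries class, and the findAllByUsername function are hypothetical names introduced only to keep the example self-contained.

```kotlin
import kotlinx.coroutines.flow.Flow
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.r2dbc.core.flow

// Hypothetical row type; the User class from this post would work the same way.
data class UserRow(val id: Long, val username: String, val email: String)

class UserQueries(private val client: DatabaseClient) {
    // Not a suspend function: the returned Flow is cold,
    // so the query only runs when the flow is collected.
    fun findAllByUsername(username: String): Flow<UserRow> =
        client
            .sql("SELECT id, username, email FROM users WHERE username = :username")
            .bind("username", username) // named parameter, no string interpolation
            .map { row, _ ->
                UserRow(
                    row.get("id") as Long,
                    row.get("username") as String,
                    row.get("email") as String,
                )
            }
            .flow()
}
```

A caller inside a coroutine could then consume the rows one by one, for example with collect or toList.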
R2dbcEntityTemplate
It is also possible to use the R2dbcEntityTemplate to perform operations on entities. And, again, Spring Data R2DBC provides Coroutines extensions for it.
For example:
@Repository
class UserRepository(private val template: R2dbcEntityTemplate) {
    suspend fun findById(id: Long): User? =
        template.select(User::class.java)
            .matching(
                Query.query(
                    Criteria.where("id").`is`(id)
                )
            )
            .awaitOneOrNull()
}
This time, we define a UserRepository class that uses R2dbcEntityTemplate for the database operations. The suspending findById function takes an id parameter and returns a nullable User object. Inside the function, we call template.select(User::class.java) to start a select on the User type. The matching method restricts the query to rows whose id column equals the id parameter. Finally, we use awaitOneOrNull() to wait for the result of the operation; it returns null if there is no result.
The main difference from the previous approach is that we are not writing SQL code this time, and we have the advantage of matching directly against the User DTO instead of mapping every single field one by one.
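The template can also insert entities and return several results. Below is a minimal sketch that bridges the template's Mono and Flux return types with awaitSingle() and asFlow() from the kotlinx-coroutines libraries. The NewUser entity (with a nullable id so the database can generate it) and the UserTemplateOperations class are hypothetical and not part of the original example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingle
import org.springframework.data.annotation.Id
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate
import org.springframework.data.relational.core.mapping.Table
import org.springframework.data.relational.core.query.Criteria
import org.springframework.data.relational.core.query.Query

// Hypothetical entity: the id is nullable so that BIGSERIAL can assign it on insert.
@Table("users")
data class NewUser(@Id val id: Long? = null, val username: String, val email: String)

class UserTemplateOperations(private val template: R2dbcEntityTemplate) {
    // insert() returns a Mono; awaitSingle() suspends until the stored entity is emitted.
    suspend fun create(username: String, email: String): NewUser =
        template.insert(NewUser(username = username, email = email)).awaitSingle()

    // all() returns a Flux, which is exposed here as a cold Flow.
    fun findAllByUsername(username: String): Flow<NewUser> =
        template.select(NewUser::class.java)
            .matching(Query.query(Criteria.where("username").`is`(username)))
            .all()
            .asFlow()
}
```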
CoroutineCrudRepository
The Spring Data R2DBC project supports Coroutine repositories, which expose the non-blocking nature of data access through Kotlin’s Coroutines. To use them, define a repository extending CoroutineCrudRepository.
For example:
interface UserRepository : CoroutineCrudRepository<User, Long> {
    override suspend fun findById(id: Long): User?
}
We define a UserRepository interface that extends CoroutineCrudRepository. The latter interface provides several useful methods for performing CRUD operations on User objects, such as save, findById, and delete.
For this example, we wouldn’t really need to define the findById function, as it is already part of the CoroutineCrudRepository interface. However, we can override it or define new custom methods in case we need to do different operations with the database.
Note the suspend keyword: without it, or without a return type that enables context propagation (like Flow), the coroutine context would not be available.
The advantages of this approach are the same as with any Spring Data repository abstraction: less boilerplate code and queries derived from method-name keywords, but executed in a non-blocking manner with Coroutines.
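To illustrate custom methods, here is a minimal sketch of a repository with two derived queries and one explicit @Query. The Account entity, the AccountRepository interface, and all method names are hypothetical; they target the same users table but are not part of the original example.

```kotlin
import kotlinx.coroutines.flow.Flow
import org.springframework.data.annotation.Id
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.relational.core.mapping.Table
import org.springframework.data.repository.kotlin.CoroutineCrudRepository
import org.springframework.data.repository.query.Param

// Hypothetical entity mapped to the users table.
@Table("users")
data class Account(@Id val id: Long? = null, val username: String, val email: String)

interface AccountRepository : CoroutineCrudRepository<Account, Long> {
    // Derived query returning many rows: a plain function that returns a Flow.
    fun findAllByUsername(username: String): Flow<Account>

    // Derived query returning at most one row: a suspend function with a nullable type.
    suspend fun findByUsernameAndEmail(username: String, email: String): Account?

    // Explicit SQL with a named parameter.
    @Query("SELECT * FROM users WHERE email LIKE :pattern")
    fun findAllByEmailPattern(@Param("pattern") pattern: String): Flow<Account>
}
```

Spring Data generates the implementation at startup, so no class has to be written for this interface.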
Service
In the Service Layer, we have to define a class annotated with @Service, inject the repository, and call it from a suspending function:
@Service
class UserService(private val repository: UserRepository) {
    suspend fun findUserById(id: Long): User? =
        repository.findById(id)
}
As we defined each example with the same function signature, we could choose any of them to be called from this service.
Controller
In the Controller Layer, we must define a class annotated with @Controller. In our case, we use the specialized version @RestController. In the constructor, we inject the service and invoke the previously defined method from a suspending function when the /users/{id} endpoint is called:
@RestController
class UserController(private val userService: UserService) {
    @GetMapping("/users/{id}")
    suspend fun getUserById(@PathVariable id: Long): ResponseEntity<User> {
        // The path variable is a Long so that it matches findUserById(id: Long)
        val user = userService.findUserById(id)
        return if (user != null) ResponseEntity.ok(user)
        else ResponseEntity.notFound().build()
    }
}
Marking the getUserById function with the suspend modifier inside the @RestController tells WebFlux to invoke the handler as a coroutine instead of expecting a Reactive API return type.
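Handlers that return many values can return a Flow instead of being marked as suspend. The following minimal sketch shows such an endpoint; the UserView type, the UserListing interface, the UserListController class, and the /users listing endpoint are hypothetical additions used only for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

// Hypothetical response type, mirroring the fields of User.
data class UserView(val id: Long, val username: String, val email: String)

// Hypothetical source of users, for example a service backed by a repository.
interface UserListing {
    fun findAll(): Flow<UserView>
}

@RestController
class UserListController(private val users: UserListing) {
    // No suspend modifier here: returning a Flow is enough
    // for WebFlux to handle the response without blocking.
    @GetMapping("/users")
    fun getUsers(): Flow<UserView> = users.findAll()
}
```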
Running and testing the example
When using Spring WebFlux, Spring Boot automatically configures Netty, a non-blocking I/O client-server framework, as the default server.
If we run the example application, the console should show something like this, which tells us that the application is up and running:
2023-01-25T15:26:57.692+01:00 INFO 69644 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 65 ms. Found 1 R2DBC repository interfaces.
2023-01-25T15:26:58.399+01:00 INFO 69644 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
2023-01-25T15:26:58.403+01:00 INFO 69644 --- [ main] c.e.r2dbc.ExampleR2DBCApplicationKt : Started ExampleR2DBCApplicationKt in 1.642 seconds (process running for 2.034)
Now, if we call the endpoint, we should get something like this:
GET http://localhost:8080/users/1
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 58
{
  "id": 1,
  "username": "testuser",
  "email": "testuser@test.com"
}
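The same check can be automated. This is a minimal sketch of an integration test, assuming spring-boot-starter-test is on the test classpath and the PostgreSQL container from docker-compose.yml is running; the class name UserEndpointTest and the test name are hypothetical.

```kotlin
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.web.reactive.server.WebTestClient

// Starts the full application on a random port and calls it over HTTP.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class UserEndpointTest(@Autowired private val client: WebTestClient) {

    @Test
    fun `returns the user with id 1`() {
        client.get().uri("/users/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody()
            .jsonPath("\$.id").isEqualTo(1) // only the id is asserted, not the seeded values
    }
}
```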
Conclusions
Both Spring Data R2DBC and Kotlin Coroutines are powerful tools for building asynchronous, non-blocking applications. While Spring Data R2DBC provides a convenient way to operate with relational databases using reactive programming, it also requires a certain mindset and approach, such as sticking to reactive chains and subscribing to values, which can take some time to get used to.
Kotlin Coroutines, on the other hand, provide a more familiar and intuitive approach to writing asynchronous code, allowing developers to use typical programming patterns and write code that looks like it is being executed sequentially.
We have only explored some basics of these concepts and shown different ways to interact with the database using Spring Data R2DBC and Coroutines. The integration into a Spring project is relatively easy because libraries such as kotlinx-coroutines-reactive and kotlinx-coroutines-reactor allow us to convert from the Reactive API to the Coroutines API.
Also, in the future, we could examine other ways to take advantage of Coroutines in the rest of the Spring ecosystem or even other frameworks.
If you want to expand your knowledge related to this, I recommend reviewing the documentation of all the tools mentioned in the post for further details.