Models
Models in Krescent are the core components of the framework yet relatively loosely defined. In essence, they are end-user flavored event handlers with additional syntactic sugar. While there is a base EventModelBase
, you will mostly be using flavors of the ReadModelBase
and the WriteModelBase
classes, so let's take a look at them.
Read Model
A read model processes events to provide data accessibly for querying or reporting. Writing data to a target outside the read model is referred to as projection, since it reduces the event stream to a small more specialized form.
Krescent provides various ways to project data, like in memory projection to some premade collections and custom serializable data structures or projections to a database like MongoDB.
Example Read Model
class AllAvailableBooksReadModel(
collectionName: String,
database: MongoDatabase,
) : ReadModelBase(
"book.all_available_books", 1, bookstoreEventCatalog
) {
val collection by mongoCollectionProjection(
collectionName, database,
allowBatching = true
)
override suspend fun process(event: Event) {
when (event) {
// Handle specific events here
}
}
}
You can see that the read models constructor defines all dependencies that we need to start this model, excluding the event source in this case. In this case, we are using the
mongoCollectionProjection
function to automatically create the corresponding model extension and automatically register it. The projection will automatically handle the initialization and checkpoint of the projection if we choose to enable it.
val mongoDatabase = mongoClient.getDatabase("my_database")
val eventSource = KurrentEventSource(kurrentClient, "my_event_stream")
AllAvailableBooksReadModel(
collectionName = "available_books",
database = mongoDatabase
).stream(eventSource)
// Will stream until interrupted
The
stream
function will start the model and begin processing events from the event source. It will run until interrupted, processing events as they come in and updating the projection accordingly.
val mongoCheckpointStorage = MongoCheckpointStorage(mongoDatabase)
AllAvailableBooksReadModel(
collectionName = "available_books",
database = mongoDatabase
).withConfiguration {
useCheckpoints(
mongoCheckpointStorage,
FixedTimeRateCheckpointStrategy(10.toDuration(DurationUnit.SECONDS))
)
}.stream(eventSource)
We can perform additional configuration before streaming the model. In this case, we are enabling checkpoints using the
MongoCheckpointStorage
and a fixed time rate checkpoint strategy that will create a snapshot if the last event is older than 10 seconds.
NOTE
Configuration done outside the class should usually only change behavior not directly relevant to the model's logic. You may setup transaction support, checkpointing, logging or similar features here when possible.
Write Model
Similar to read models, write models also process events to create a state, but they are mostly focused on a single entity or a small set of entities instead of whole domains. They are generally also not continuous, being only materialized when needed for a write operation and mostly also not checkpointed.
Usually, write models establish transactional boundaries in one way or another so that writes do not interfere with each other. Krescent currently only supports fine-grained pessimistic locking. Locking is normally done on the entity id or related entity ids, similar to Dynamic Consistency Boundaries.
Example Write Model
class BookWriteModel(
val bookId: String,
val lockProvider: KrescentLockProvider,
) : WriteModelBase("book_write", 1, bookstoreEventCatalog, configure = {
// Configure here directly to save on some boilerplate
useTransaction(lockProvider, "book-$bookId")
}) {
var book: BookState? = null
var available: Int = 0
override suspend fun process(event: Event) {
if (event !is BookEvent || event.bookId != bookId) return
when (event) {
// Handle replaying the state from events
}
}
suspend fun lend(userId: String) {
if (available <= 0) error("No book available to be lent.")
emitEvent(BookLentEvent(bookId, userId, "now"))
available--
}
suspend fun returnBook(userId: String) {
emitEvent(BookReturnedEvent(bookId, userId, "now"))
available++
}
}
Just as with the read model, we extend the
EventModelBase
using theWriteModelBase
flavor and define our state variables and event processing logic. But additionally, we also define methods to alter the state of the current model. To configure transactional boundaries, we use the configuration block in the inherited constructor to separate configuration and model logic a bit clearer. We could also configure this externally or using the openconfigure
function of theEventModelBase
.
val lockProvider = LocalSharedLockProvider()
val eventSource = KurrentEventSource(kurrentClient, "my_event_stream")
BookWriteModel("1", lockProvider).withSource(eventSource) handles {
lend("250cd9d5-2f6b-4831-bb2e-ed9a46e9cc08")
}
We now attach the event source to the model using the
withSource
builder function and then call thehandles
infix function to execute the model build. This will do a transactional catchup replay from the event source and then execute the specified block while still in transaction. We use this scope to execute thelend
method to create a new event and emit it to the event source. After this operation, the events will be added to the event source and the transaction will be finished, freeing the lock that has been acquired.
Reducing Models
Krescent also provides a reduce-style syntax for models, which automatically adds serialization for checkpointing and enforces clearly visible state changes.
Example Reducing Read Model
class BooksAvailableModel() : ReducingReadModel<BooksAvailableModel.State>(
"book.available", 1, bookstoreEventCatalog
) {
override val initialState: State
get() = State()
override suspend fun reduce(state: State, event: Event) = when (event) {
is BookAddedEvent -> state.copy(
available = state.available + (event.bookId to event.copies)
)
// Other events can be handled here
else -> state
}
@Serializable
data class State(
val available: Map<String, Int> = emptyMap(),
)
}
Example Reducing Write Model
class ReducingBookWriteModel(
val bookId: String,
val lockProvider: KrescentLockProvider,
) : ReducingWriteModel<ReducingBookWriteModel.State>(
"test", 1, bookstoreEventCatalog,
configure = { useTransaction(lockProvider, "book-$bookId") }
) {
override val initialState: State
get() = State()
override suspend fun reduce(state: State, event: Event): State {
if (event !is BookEvent || event.bookId != bookId) return state
return when (event) {
is BookAddedEvent -> state.copy(
book = BookState(
title = event.title, author = event.author,
price = event.price, copies = event.copies
), available = event.copies
)
// Handle other events here
is BookLentEvent -> state.copy(available = state.available - 1)
is BookReturnedEvent -> state.copy(available = state.available + 1)
else -> state
}
}
suspend fun lend(userId: String) = useState {
if (canBeLent(1)) error("No book available to be lent.")
emitEvent(BookLentEvent(bookId, userId, "2025-1-1"))
// Use .push() to update the internal state of the model
state.copy(available = state.available - 1).push()
}
// Use useState() to access the current state of the model as "state"
fun canBeLent(count: Int): Boolean = useState {
state.available >= count
}
@Serializable
data class State(
val book: BookState? = null,
val available: Int = 0,
)
}