Catch-up subscriptions
Subscriptions allow you to subscribe to a stream and receive notifications about new events added to the stream.
You provide an event handler and an optional starting point to the subscription. The handler is called for each event from the starting point onward.
If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. The server will then notify the handler whenever a new event appears.
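The catch-up-then-live behaviour described above can be pictured with a small in-memory model. This is only a sketch and does not use the client SDK: `run_subscription`, `history`, and `live` are hypothetical names chosen for illustration.

```python
from collections import deque
from typing import Callable, Deque, List


def run_subscription(
    history: List[str],
    live: Deque[str],
    handler: Callable[[str], None],
) -> None:
    """Toy model: replay existing events, then deliver newly appended ones."""
    # Catch-up phase: the handler sees each existing event, one by one.
    for event in history:
        handler(event)
    # Live phase: the handler sees events appended after the catch-up.
    # A real subscription keeps waiting here; this model stops when the queue is empty.
    while live:
        handler(live.popleft())


received: List[str] = []
run_subscription(["e0", "e1"], deque(["e2"]), received.append)
```

The handler cannot tell which phase an event came from, which mirrors the point that catching up and live processing look the same on the client side.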
Tips
Check the Getting Started guide to learn how to configure and use the client SDK.
Subscribing from the start
If you need to process all the events in the store, including historical events, you'll need to subscribe from the beginning. You can either subscribe to receive events from a single stream or subscribe to $all if you need to process all events in the database.
Subscribing to a stream
The simplest stream subscription looks like the following:
subscription = client.subscribe_to_stream(stream_name)
for event in subscription:
    print(f"received event: {event.stream_position} {event.type}")
    # do something with the event
    handle_event(event)
const subscription = client.subscribeToStream("some-stream");
for await (const resolvedEvent of subscription) {
console.log(
`Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
);
await handleEvent(resolvedEvent);
}
const subscription =
client.subscribeToStream<SomeStreamEvents>("some-stream");
for await (const resolvedEvent of subscription) {
console.log(
`Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
);
await handleEvent(resolvedEvent);
}
SubscriptionListener listener = new SubscriptionListener() {
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
System.out.println("Received event"
+ event.getOriginalEvent().getRevision()
+ "@" + event.getOriginalEvent().getStreamId());
HandleEvent(event);
}
};
client.subscribeToStream("some-stream", listener);
await using var subscription = client.SubscribeToStream(
"some-stream",
FromStream.Start,
cancellationToken: ct);
await foreach (var message in subscription.Messages.WithCancellation(ct)) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
}
}
stream, err := db.SubscribeToStream(context.Background(), "some-stream", kurrentdb.SubscribeToStreamOptions{})
if err != nil {
panic(err)
}
defer stream.Close()
for {
event := stream.Recv()
if event.EventAppeared != nil {
// handles the event...
}
if event.SubscriptionDropped != nil {
break
}
}
let mut stream = client
.subscribe_to_stream("some-stream", &Default::default())
.await;
loop {
let event = stream.next().await?;
// Handles the event...
}
The provided handler will be called for every event in the stream.
When you subscribe to a stream with link events, for example the $ce category stream, you need to set resolveLinkTos to true. Read more about it below.
Subscribing to $all
Subscribing to $all is similar to subscribing to a single stream. The handler will be called for every event appended after the starting position.
subscription = client.subscribe_to_all(from_end=True)
# Append some events
commit_position = client.append_to_stream(
    stream_name=stream_name,
    events=[NewEvent(type="MyEventType", data=b"")],
    current_version=StreamState.ANY,
)
for event in subscription:
    print(f"received event: {event.stream_position} {event.type}")
    # do something with the event
    handle_event(event)
const subscription = client.subscribeToAll();
for await (const resolvedEvent of subscription) {
console.log(
`Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
);
await handleEvent(resolvedEvent);
}
const subscription = client.subscribeToAll();
for await (const resolvedEvent of subscription) {
console.log(
`Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
);
await handleEvent(resolvedEvent);
}
SubscriptionListener listener = new SubscriptionListener() {
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
System.out.println("Received event"
+ event.getOriginalEvent().getRevision()
+ "@" + event.getOriginalEvent().getStreamId());
HandleEvent(event);
}
};
client.subscribeToAll(listener);
await using var subscription = client.SubscribeToAll(
FromAll.Start,
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
}
}
stream, err := db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{})
if err != nil {
panic(err)
}
defer stream.Close()
for {
event := stream.Recv()
if event.EventAppeared != nil {
// handles the event...
}
if event.SubscriptionDropped != nil {
break
}
}
let mut stream = client.subscribe_to_all(&Default::default()).await;
loop {
let event = stream.next().await?;
// Handles the event...
}
Subscribing from a specific position
The previous examples subscribed to the stream from the beginning. That subscription invoked the handler for every event in the stream before waiting for new events.
Both stream and $all subscriptions accept a starting position if you want to read from a specific point onward. If events already exist at the position you subscribe to, they will be read on the server side and sent to the subscription.
Once caught up, the server will push any new events received on the streams to the client. There is no difference between catching up and live on the client side.
Warning
The positions provided to the subscriptions are exclusive. You will only receive the next event after the subscribed position.
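To make the exclusive semantics concrete, here is a minimal local sketch that models a stream as a Python list. It does not call the client SDK; `events_after` is a hypothetical helper, and using `None` to mean "from the start" is an assumption made for this example.

```python
from typing import List, Optional


def events_after(stream: List[str], position: Optional[int]) -> List[str]:
    """Return the events a subscription would deliver for a starting position.

    `position=None` stands for "from the start"; an integer is exclusive.
    """
    if position is None:
        return list(stream)
    # Exclusive: skip the event at `position` itself.
    return stream[position + 1:]


stream = [f"event-{n}" for n in range(25)]
delivered = events_after(stream, 20)
```

Here `delivered` starts at `event-21`, so a checkpoint can store the position of the last handled event and pass it back unchanged when resubscribing.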
Subscribing to a stream
To subscribe to a stream from a specific position, you must provide a stream position. This can be Start, End, or a big int (unsigned 64-bit integer) position.
The following subscribes to the stream some-stream at position 20, which means that events 21 and onward will be handled:
subscription = client.subscribe_to_stream(
stream_name=stream_name,
stream_position=20,
)
const subscription = client.subscribeToStream("some-stream", {
fromRevision: BigInt(20),
});
const subscription = client.subscribeToStream<SomeStreamEvents>(
"some-stream",
{
fromRevision: BigInt(20),
}
);
client.subscribeToStream(
"some-stream",
listener,
SubscribeToStreamOptions.get()
.fromRevision(20)
);
await using var subscription = client.SubscribeToStream(
"some-stream",
FromStream.After(StreamPosition.FromInt64(20)),
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
}
}
db.SubscribeToStream(context.Background(), "some-stream", kurrentdb.SubscribeToStreamOptions{
From: kurrentdb.Revision(20),
})
let options = SubscribeToStreamOptions::default().start_from(StreamPosition::Position(20));
client.subscribe_to_stream("some-stream", &options).await;
Subscribing to $all
Subscribing to the $all stream is similar to subscribing to a regular stream. The difference is how to specify the starting position. For the $all stream, provide a Position structure that consists of two big integers: the prepare and commit positions. Use Start, End, or create a Position from specific commit and prepare values.
The corresponding $all subscription will subscribe from the event after the one at commit position 1056 and prepare position 1056.
Please note that this position will need to be a legitimate position in $all.
subscription = client.subscribe_to_all(
commit_position=1_056,
)
const subscription = client.subscribeToAll({
fromPosition: {
commit: BigInt(1056),
prepare: BigInt(1056),
},
});
const subscription = client.subscribeToAll({
fromPosition: {
commit: BigInt(1056),
prepare: BigInt(1056),
},
});
client.subscribeToAll(
listener,
SubscribeToAllOptions.get()
.fromPosition(new Position(1056, 1056))
);
var result = await client.AppendToStreamAsync(
"subscribe-to-all-from-position",
StreamState.NoStream,
new[] {
new EventData(Uuid.NewUuid(), "-", ReadOnlyMemory<byte>.Empty)
});
await using var subscription = client.SubscribeToAll(
FromAll.After(result.LogPosition),
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
}
}
db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
From: kurrentdb.Position{
Commit: 1_056,
Prepare: 1_056,
},
})
let options = SubscribeToAllOptions::default().position(StreamPosition::Position(Position {
commit: 1_056,
prepare: 1_056,
}));
client.subscribe_to_all(&options).await;
Subscribing to a stream for live updates
You can subscribe to a stream to get live updates by subscribing to the end of the stream:
subscription = client.subscribe_to_stream(
stream_name=stream_name,
from_end=True,
)
const subscription = client.subscribeToStream("some-stream", {
fromRevision: END,
});
const subscription = client.subscribeToStream<SomeStreamEvents>(
"some-stream",
{
fromRevision: END,
}
);
client.subscribeToStream(
"some-stream",
listener,
SubscribeToStreamOptions.get()
.fromEnd()
);
await using var subscription = client.SubscribeToStream(
"some-stream",
FromStream.End,
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
}
}
options := kurrentdb.SubscribeToStreamOptions{
From: kurrentdb.End{},
}
db.SubscribeToStream(context.Background(), "some-stream", options)
let options = SubscribeToStreamOptions::default().start_from(StreamPosition::End);
client.subscribe_to_stream("some-stream", &options).await;
And the same works with $all:
subscription = client.subscribe_to_all(from_end=True)
# Append some events
commit_position = client.append_to_stream(
stream_name,
events=NewEvent(type="MyEventType", data=b""),
current_version=StreamState.ANY,
)
# Retrieve the event we just appended from our subscription.
next_event = next(subscription)
const subscription = client.subscribeToAll({
fromPosition: END,
});
const subscription = client.subscribeToAll({
fromPosition: END,
});
client.subscribeToAll(
listener,
SubscribeToAllOptions.get()
.fromEnd()
);
await using var subscription = client.SubscribeToAll(
FromAll.End,
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
}
}
db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
From: kurrentdb.End{},
})
let options = SubscribeToAllOptions::default().position(StreamPosition::End);
client.subscribe_to_all(&options).await;
This will not read through the history of the stream but will notify the handler when a new event appears in the respective stream.
Keep in mind that when you subscribe to a stream from a specific position, as described above, you will also get live updates after your subscription catches up (processes all the historical events).
Resolving link-to's
Link-to events point to events in other streams in KurrentDB. These are generally created by projections such as the $by_event_type projection, which links events of the same event type into the same stream. This makes it easier to look up all events of a specific type.
Tips
Filtered subscriptions make it easier and faster to subscribe to all events of a specific type or matching a prefix.
When reading a stream you can specify whether to resolve link-to's. By default, link-to events are not resolved. You can change this behaviour by setting the resolveLinkTos parameter to true:
subscription = client.subscribe_to_stream(
stream_name="$et-MyEventType",
stream_position=0,
resolve_links=True,
)
const subscription = client.subscribeToStream("$et-myEventType", {
fromRevision: START,
resolveLinkTos: true,
});
const subscription = client.subscribeToStream<SomeStreamEvents>(
"$et-myEventType",
{
fromRevision: START,
resolveLinkTos: true,
}
);
client.subscribeToStream(
"$et-myEventType",
listener,
SubscribeToStreamOptions.get()
.fromStart()
.resolveLinkTos()
);
await using var subscription = client.SubscribeToStream(
"$et-myEventType",
FromStream.Start,
true,
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
}
}
options := kurrentdb.SubscribeToStreamOptions{
From: kurrentdb.Start{},
ResolveLinkTos: true,
}
db.SubscribeToStream(context.Background(), "$et-myEventType", options)
let options = SubscribeToStreamOptions::default()
.start_from(StreamPosition::Start)
.resolve_link_tos();
client
.subscribe_to_stream("$et-myEventType", &options)
.await;
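As a rough picture of what resolving a link-to does, the sketch below follows a link to the event it points at in a toy in-memory store. It assumes the link payload has the form `revision@stream` (for example `1@customer-7`); the `Store` type and the `resolve_link` helper are hypothetical and not part of any client SDK.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical in-memory store: stream name -> list of (event type, data).
Store = Dict[str, List[Tuple[str, str]]]


def resolve_link(store: Store, link_data: str) -> Optional[Tuple[str, str]]:
    """Follow a link payload such as "1@customer-7" to the event it points to."""
    revision_text, _, stream_name = link_data.partition("@")
    events = store.get(stream_name, [])
    revision = int(revision_text)
    # A link can dangle if the target event no longer exists.
    if revision >= len(events):
        return None
    return events[revision]


store: Store = {
    "customer-7": [("CustomerCreated", "{}"), ("CustomerRenamed", "{}")],
    "$et-CustomerRenamed": [("$>", "1@customer-7")],
}
link_type, link_data = store["$et-CustomerRenamed"][0]
resolved = resolve_link(store, link_data)
```

Without resolution the subscriber would receive the link itself (`link_type` and `link_data` here); with resolution it receives the linked `CustomerRenamed` event.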
Dropped subscriptions
When a subscription stops or experiences an error, it will be dropped. The subscription provides a subscriptionDropped callback, which will get called when the subscription breaks.
The subscriptionDropped callback allows you to inspect the reason why the subscription dropped, as well as any exceptions that occurred.
The possible reasons for a subscription to drop are:
Reason | Why it might happen |
---|---|
Disposed | The client canceled or disposed of the subscription. |
SubscriberError | An error occurred while handling an event in the subscription handler. |
ServerError | An error occurred on the server, and the server closed the subscription. Check the server logs for more information. |
Bear in mind that a subscription can also drop because it is slow. The server tries to push all live events to the subscription while it is in live processing mode. If the subscription's read buffer overflows and it cannot acknowledge the buffer, the subscription will be dropped.
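The slow-consumer case can be pictured with a bounded buffer. This is a toy model only, not the server's actual implementation: the buffer size of 3 and the `push_live_events` helper are assumptions made for illustration.

```python
import queue


def push_live_events(buffer: "queue.Queue[int]", events: range) -> str:
    """Toy model of a sender pushing into a bounded subscription buffer."""
    for event in events:
        try:
            buffer.put_nowait(event)
        except queue.Full:
            # The consumer did not drain the buffer in time.
            return f"dropped at event {event}"
    return "ok"


# Nothing reads from this buffer, so it behaves like a stalled consumer.
slow_consumer_buffer: "queue.Queue[int]" = queue.Queue(maxsize=3)
outcome = push_live_events(slow_consumer_buffer, range(10))
```

In practice this means the event handler should return quickly, or hand work off to something that does, so the subscription keeps draining.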
Handling subscription drops
An application that hosts the subscription can go offline for various reasons: a crash, an infrastructure failure, or a new version deployment. Because you rarely want to reprocess all the events again, store the current position of the subscription somewhere, and then use it to restore the subscription from the point where it dropped off:
# get last recorded stream position
last_stream_position = 0
while True:
    subscription = client.subscribe_to_stream(
        stream_name=stream_name,
        stream_position=last_stream_position,
    )
    try:
        for event in subscription:
            # remember stream position
            last_stream_position = event.stream_position
            # record stream position
            handle_event(event)
    except ConsumerTooSlow:
        # subscription was dropped
        continue
let checkpoint = START;
const subscription = client
.subscribeToStream("some-stream", {
fromRevision: checkpoint,
})
.on("data", (resolvedEvent) => {
handleEvent(resolvedEvent);
checkpoint = resolvedEvent.event?.revision ?? checkpoint;
});
let checkpoint: ReadRevision = START;
const subscription = client
.subscribeToStream<SomeStreamEvents>("some-stream", {
fromRevision: checkpoint,
})
.on("data", (resolvedEvent) => {
handleEvent(resolvedEvent);
checkpoint = resolvedEvent.event?.revision ?? checkpoint;
});
client.subscribeToStream(
"some-stream",
new SubscriptionListener() {
StreamPosition<Long> checkpoint = StreamPosition.start();
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
HandleEvent(event);
checkpoint = StreamPosition.position(event.getOriginalEvent().getRevision());
}
@Override
public void onCancelled(Subscription subscription, Throwable exception) {
// Subscription was dropped by the user.
if (exception == null)
return;
System.out.println("Subscription was dropped due to " + exception.getMessage());
Resubscribe(checkpoint);
}
},
SubscribeToStreamOptions.get()
.fromStart()
);
var checkpoint = await ReadStreamCheckpointAsync() switch {
null => FromStream.Start,
var position => FromStream.After(position.Value)
};
Subscribe:
try {
await using var subscription = client.SubscribeToStream(
"some-stream",
checkpoint,
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
checkpoint = FromStream.After(evnt.OriginalEventNumber);
break;
}
}
} catch (OperationCanceledException) {
Console.WriteLine($"Subscription was canceled.");
} catch (ObjectDisposedException) {
Console.WriteLine($"Subscription was canceled by the user.");
} catch (Exception ex) {
Console.WriteLine($"Subscription was dropped: {ex}");
goto Subscribe;
}
options := kurrentdb.SubscribeToStreamOptions{
From: kurrentdb.Start{},
}
for {
stream, err := db.SubscribeToStream(context.Background(), "some-stream", options)
if err != nil {
time.Sleep(1 * time.Second)
continue
}
for {
event := stream.Recv()
if event.SubscriptionDropped != nil {
stream.Close()
break
}
if event.EventAppeared != nil {
// handles the event...
options.From = kurrentdb.Revision(event.EventAppeared.OriginalEvent().EventNumber)
}
}
}
let retry = RetryOptions::default().retry_forever();
let options = SubscribeToStreamOptions::default().retry_options(retry);
let mut stream = client.subscribe_to_stream("some-stream", &options).await;
loop {
let event = stream.next().await?;
// Handles the event...
}
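The examples above keep the checkpoint in memory, which does not survive a process restart. One possible way to persist it is a small JSON file, sketched below. The file name, the JSON layout, and the `load_checkpoint` / `save_checkpoint` helpers are assumptions for this example, not part of any client SDK.

```python
import json
import os
import tempfile
from typing import Optional


def load_checkpoint(path: str) -> Optional[int]:
    """Return the last stored stream position, or None if nothing was stored."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)["stream_position"]
    except FileNotFoundError:
        return None


def save_checkpoint(path: str, stream_position: int) -> None:
    """Write to a temporary file, then rename it over the old checkpoint."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({"stream_position": stream_position}, f)
    os.replace(tmp_path, path)


checkpoint_path = os.path.join(tempfile.mkdtemp(), "some-stream.checkpoint.json")
first_run = load_checkpoint(checkpoint_path)   # nothing stored yet
save_checkpoint(checkpoint_path, 20)           # after handling event 20
after_restart = load_checkpoint(checkpoint_path)
```

Saving the checkpoint only after the handler succeeds means an event may be handled twice after a crash but is never skipped, so the handler should tolerate duplicates.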
When subscribed to $all, you want to keep the event's position in the $all stream. As mentioned previously, the $all stream position consists of two big integers (prepare and commit positions), not one:
# get last recorded commit position
last_commit_position = 0
while True:
    subscription = client.subscribe_to_all(
        commit_position=last_commit_position,
    )
    try:
        for event in subscription:
            # remember commit position
            last_commit_position = event.commit_position
            # record commit position
            handle_event(event)
    except ConsumerTooSlow:
        # subscription was dropped
        continue
let checkpoint = START;
const subscription = client
.subscribeToAll({
fromPosition: checkpoint,
})
.on("data", (resolvedEvent) => {
handleEvent(resolvedEvent);
checkpoint = resolvedEvent.event?.position ?? checkpoint;
});
let checkpoint: ReadPosition = START;
const subscription = client
.subscribeToAll({
fromPosition: checkpoint,
})
.on("data", (resolvedEvent) => {
handleEvent(resolvedEvent);
checkpoint = resolvedEvent.event?.position ?? checkpoint;
});
client.subscribeToAll(
new SubscriptionListener() {
StreamPosition<Position> checkpoint = StreamPosition.start();
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
HandleEvent(event);
checkpoint = StreamPosition.position(event.getOriginalEvent().getPosition());
}
@Override
public void onCancelled(Subscription subscription, Throwable exception) {
// Subscription was dropped by the user.
if (exception == null)
return;
System.out.println("Subscription was dropped due to " + exception.getMessage());
Resubscribe(checkpoint);
}
},
SubscribeToAllOptions.get()
.fromStart()
);
var checkpoint = await ReadCheckpointAsync() switch {
null => FromAll.Start,
var position => FromAll.After(position.Value)
};
Subscribe:
try {
await using var subscription = client.SubscribeToAll(
checkpoint,
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
if (evnt.OriginalPosition is not null) {
checkpoint = FromAll.After(evnt.OriginalPosition.Value);
}
break;
}
}
} catch (OperationCanceledException) {
Console.WriteLine($"Subscription was canceled.");
} catch (ObjectDisposedException) {
Console.WriteLine($"Subscription was canceled by the user.");
} catch (Exception ex) {
Console.WriteLine($"Subscription was dropped: {ex}");
goto Subscribe;
}
options := kurrentdb.SubscribeToAllOptions{
From: kurrentdb.Start{},
}
for {
stream, err := db.SubscribeToAll(context.Background(), options)
if err != nil {
time.Sleep(1 * time.Second)
continue
}
for {
event := stream.Recv()
if event.SubscriptionDropped != nil {
stream.Close()
break
}
if event.EventAppeared != nil {
// handles the event...
options.From = event.EventAppeared.OriginalEvent().Position
}
}
}
let retry = RetryOptions::default().retry_forever();
let options = SubscribeToAllOptions::default().retry_options(retry);
let mut stream = client.subscribe_to_all(&options).await;
loop {
let event = stream.next().await?;
// Handles the event...
}
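Because an $all checkpoint has two parts, both need to be stored and restored together. The sketch below shows one way to round-trip such a position through text. `AllPosition` is a hypothetical stand-in for an SDK position type, and the `C:<commit>/P:<prepare>` text format is an arbitrary choice for this example.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class AllPosition:
    """Hypothetical stand-in for a client SDK's $all position type."""

    commit: int
    prepare: int

    def to_text(self) -> str:
        return f"C:{self.commit}/P:{self.prepare}"

    @classmethod
    def from_text(cls, text: str) -> "AllPosition":
        match = re.fullmatch(r"C:(\d+)/P:(\d+)", text)
        if match is None:
            raise ValueError(f"not a position: {text!r}")
        return cls(commit=int(match.group(1)), prepare=int(match.group(2)))


checkpoint = AllPosition(commit=1056, prepare=1056)
restored = AllPosition.from_text(checkpoint.to_text())
```

Storing the position as text avoids precision problems in stores or formats that cannot represent unsigned 64-bit integers exactly.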
User credentials
The user creating a subscription must have read access to the stream it's subscribing to, and only admin users may subscribe to $all or create filtered subscriptions.
The code below shows how you can provide user credentials for a subscription. Credentials specified explicitly for a subscription override the default credentials set for the client. If you don't specify any, the client falls back to its default credentials, provided you configured them.
credentials = client.construct_call_credentials(
username="admin",
password="changeit",
)
subscription = client.subscribe_to_all(
credentials=credentials,
)
const subscription = client.subscribeToStream("some-stream", {
credentials: {
username: "admin",
password: "changeit",
},
});
const subscription = client.subscribeToStream<SomeStreamEvents>(
"some-stream",
{
credentials: {
username: "admin",
password: "changeit",
},
}
);
UserCredentials credentials = new UserCredentials("admin", "changeit");
SubscribeToAllOptions options = SubscribeToAllOptions.get()
.authenticated(credentials);
client.subscribeToAll(
listener,
options);
await using var subscription = client.SubscribeToAll(
FromAll.Start,
userCredentials: new UserCredentials("admin", "changeit"),
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
}
}
db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
Authenticated: &kurrentdb.Credentials{
Login: "admin",
Password: "changeit",
},
})
let options =
SubscribeToAllOptions::default().authenticated(Credentials::new("admin", "changeit"));
client.subscribe_to_all(&options).await;
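The precedence rule described at the start of this section can be summarised in a few lines. This is a sketch of the rule only: `effective_credentials` and the tuple representation are hypothetical and do not correspond to an SDK API.

```python
from typing import Optional, Tuple

Credentials = Tuple[str, str]  # (username, password)


def effective_credentials(
    call_credentials: Optional[Credentials],
    client_credentials: Optional[Credentials],
) -> Optional[Credentials]:
    """Per-subscription credentials win; otherwise fall back to the client's."""
    if call_credentials is not None:
        return call_credentials
    return client_credentials


client_default: Credentials = ("ops", "secret")
explicit = effective_credentials(("admin", "changeit"), client_default)
fallback = effective_credentials(None, client_default)
```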
Server-side filtering
KurrentDB allows you to filter the events whilst subscribing to the $all stream to only receive the events you care about.
You can filter by event type or stream name using a regular expression or a prefix. Server-side filtering is currently only available on the $all stream.
Tips
Server-side filtering was introduced as a simpler alternative to projections. You should consider filtering before creating a projection to include the events you care about.
A simple stream prefix filter looks like this:
client.subscribe_to_all(
filter_include=[r"test-\w+"],
filter_by_stream_name=True,
)
const subscription = client.subscribeToAll({
filter: streamNameFilter({ prefixes: ["test-", "other-"] }),
});
const subscription = client.subscribeToAll({
filter: streamNameFilter({ prefixes: ["test-", "other-"] }),
});
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
.addStreamNamePrefix("test-")
.build();
SubscribeToAllOptions options = SubscribeToAllOptions.get()
.filter(filter);
client.subscribeToAll(
listener,
options);
var prefixStreamFilter = new SubscriptionFilterOptions(StreamFilter.Prefix("test-", "other-"));
await using var subscription = client.SubscribeToAll(
FromAll.Start,
filterOptions: prefixStreamFilter,
cancellationToken: ct);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var evnt):
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
break;
case StreamMessage.AllStreamCheckpointReached(var position):
Console.WriteLine($"Checkpoint reached: {position}");
break;
}
}
db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
Filter: &kurrentdb.SubscriptionFilter{
Type: kurrentdb.StreamFilterType,
Prefixes: []string{"test-"},
},
})
let filter = SubscriptionFilter::on_stream_name().add_prefix("test-");
let options = SubscribeToAllOptions::default().filter(filter);
client.subscribe_to_all(&options).await;
The filtering API is described more in-depth in the filtering section.
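To compare the two filter kinds, the sketch below applies a prefix filter and a regular expression to a list of stream names locally. It runs on the client and uses Python's `re` module, so it only approximates what the server does; the `matches` helper is hypothetical.

```python
import re
from typing import List, Optional


def matches(
    value: str,
    prefixes: Optional[List[str]] = None,
    regex: Optional[str] = None,
) -> bool:
    """Local model of a filter: match on any prefix, or on a regular expression."""
    if prefixes:
        return any(value.startswith(prefix) for prefix in prefixes)
    if regex is not None:
        return re.search(regex, value) is not None
    return True  # no filter configured


stream_names = ["test-1", "other-7", "orders-3"]
by_prefix = [n for n in stream_names if matches(n, prefixes=["test-", "other-"])]
by_regex = [n for n in stream_names if matches(n, regex=r"^test-\w+")]
```

A prefix list is the simpler choice when the names share a fixed start; a regular expression is useful when the match depends on more than the beginning of the name.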
Filtering out system events
There are events in KurrentDB called system events. These are prefixed with a $ and under most circumstances you won't care about these. They can be filtered out by passing in a SubscriptionFilterOptions when subscribing to the $all stream.
subscription = client.subscribe_to_all(
    filter_exclude=[KDB_SYSTEM_EVENTS_REGEX]
)
for event in subscription:
    print("Received event:", event.stream_position, event.type)
    break
const subscription = client
.subscribeToAll({
fromPosition: START,
filter: excludeSystemEvents(),
})
.on("data", (resolvedEvent) => {
console.log(
`Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
);
});
const subscription = client
.subscribeToAll({
fromPosition: START,
filter: excludeSystemEvents(),
})
.on("data", (resolvedEvent) => {
console.log(
`Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`
);
});
SubscriptionListener listener = new SubscriptionListener() {
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
System.out.println("Received event"
+ event.getOriginalEvent().getRevision()
+ "@" + event.getOriginalEvent().getStreamId());
}
};
String excludeSystemEventsRegex = "^[^\\$].*";
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
.withEventTypeRegularExpression(excludeSystemEventsRegex)
.build();
SubscribeToAllOptions options = SubscribeToAllOptions.get()
.filter(filter);
client.subscribeToAll(
listener,
options
);
await using var subscription = client.SubscribeToAll(
FromAll.Start,
filterOptions: new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents()));
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var e):
Console.WriteLine($"{e.Event.EventType} @ {e.Event.Position.CommitPosition}");
break;
}
}
sub, err := db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
Filter: kurrentdb.ExcludeSystemEventsFilter(),
})
if err != nil {
panic(err)
}
defer sub.Close()
for {
event := sub.Recv()
if event.EventAppeared != nil {
streamId := event.EventAppeared.OriginalEvent().StreamID
revision := event.EventAppeared.OriginalEvent().EventNumber
fmt.Printf("received event %v@%v", revision, streamId)
}
if event.SubscriptionDropped != nil {
break
}
}
let filter = SubscriptionFilter::on_event_type().exclude_system_events();
let options = SubscribeToAllOptions::default().filter(filter);
let mut sub = client.subscribe_to_all(&options).await;
loop {
let event = sub.next().await?;
let stream_id = event.get_original_stream_id();
let revision = event.get_original_event().revision;
println!("Received event {}@{}", revision, stream_id);
}
Tips
$stats events are no longer stored in KurrentDB by default so there won't be as many $ events as before.
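The regular expression used in the Java example above can be tried locally to see which event types it keeps. This sketch uses Python's `re` module on a made-up list of event types; the server's regex engine may differ in details.

```python
import re

# Pattern taken from the Java example above: the first character must not be "$".
EXCLUDE_SYSTEM_EVENTS = re.compile(r"^[^\$].*")

# Made-up event types for illustration.
event_types = ["OrderPlaced", "$metadata", "$>", "CustomerRenamed"]
user_event_types = [t for t in event_types if EXCLUDE_SYSTEM_EVENTS.search(t)]
```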
Filtering by event type
If you only want to subscribe to events of a given type, there are two options. You can either use a regular expression or a prefix.
Filtering by prefix
If you want to filter by prefix, pass in a SubscriptionFilterOptions to the subscription with an EventTypeFilter.Prefix.
subscription = client.subscribe_to_all(
    filter_include=[r"customer-.*"],
)
for event in subscription:
    print(f"received event: {event.stream_position} {event.type}")
    # do something with the event
    handle_event(event)
const filter = eventTypeFilter({
prefixes: ["customer-"],
});
const filter = eventTypeFilter({
prefixes: ["customer-"],
});
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
.addEventTypePrefix("customer-")
.build();
var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.Prefix("customer-"));
sub, err := db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
Filter: &kurrentdb.SubscriptionFilter{
Type: kurrentdb.EventFilterType,
Prefixes: []string{"customer-"},
},
})
if err != nil {
panic(err)
}
defer sub.Close()
for {
event := sub.Recv()
if event.EventAppeared != nil {
streamId := event.EventAppeared.OriginalEvent().StreamID
revision := event.EventAppeared.OriginalEvent().EventNumber
fmt.Printf("received event %v@%v", revision, streamId)
}
if event.SubscriptionDropped != nil {
break
}
}
let filter = SubscriptionFilter::on_event_type().add_prefix("customer-");
let options = SubscribeToAllOptions::default().filter(filter);
let mut sub = client.subscribe_to_all(&options).await;
This will only subscribe to events with a type that begins with customer-.
Filtering by regular expression
It might be advantageous to provide a regular expression when you want to subscribe to multiple event types.
subscription = client.subscribe_to_all(
    filter_by_stream_name=False,
    filter_include=["user.*", "company.*"],
)
for event in subscription:
    print(f"received event: {event.stream_position} {event.type}")
    # do something with the event
    handle_event(event)
const filter = eventTypeFilter({
regex: "^user|^company",
});
const filter = eventTypeFilter({
regex: "^user|^company",
});
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
.withEventTypeRegularExpression("^user|^company")
.build();
var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.RegularExpression("^user|^company"));
sub, err := db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
Filter: &kurrentdb.SubscriptionFilter{
Type: kurrentdb.EventFilterType,
Regex: "^user|^company",
},
})
if err != nil {
panic(err)
}
defer sub.Close()
for {
event := sub.Recv()
if event.EventAppeared != nil {
streamId := event.EventAppeared.OriginalEvent().StreamID
revision := event.EventAppeared.OriginalEvent().EventNumber
fmt.Printf("received event %v@%v", revision, streamId)
}
if event.SubscriptionDropped != nil {
break
}
}
let filter = SubscriptionFilter::on_event_type().regex("^user|^company");
let options = SubscribeToAllOptions::default().filter(filter);
let mut sub = client.subscribe_to_all(&options).await;
This will subscribe to any event whose type begins with user or company.
Filtering by stream name
To filter by stream name, you can use either a regular expression or a prefix.
Filtering by prefix
If you want to filter by prefix, pass in a SubscriptionFilterOptions to the subscription with a StreamFilter.Prefix.
subscription = client.subscribe_to_all(
    filter_by_stream_name=True,
    filter_include=[r"user-.*"],
)
for event in subscription:
    print(f"received event: {event.stream_position} {event.type}")
    # do something with the event
    handle_event(event)
const filter = streamNameFilter({
prefixes: ["user-"],
});
const filter = streamNameFilter({
prefixes: ["user-"],
});
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
.addStreamNamePrefix("user-")
.build();
var filterOptions = new SubscriptionFilterOptions(StreamFilter.Prefix("user-"));
sub, err := db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
Filter: &kurrentdb.SubscriptionFilter{
Type: kurrentdb.StreamFilterType,
Prefixes: []string{"user-"},
},
})
if err != nil {
panic(err)
}
defer sub.Close()
for {
event := sub.Recv()
if event.EventAppeared != nil {
streamId := event.EventAppeared.OriginalEvent().StreamID
revision := event.EventAppeared.OriginalEvent().EventNumber
fmt.Printf("received event %v@%v", revision, streamId)
}
if event.SubscriptionDropped != nil {
break
}
}
let filter = SubscriptionFilter::on_stream_name().add_prefix("user-");
let options = SubscribeToAllOptions::default().filter(filter);
let mut sub = client.subscribe_to_all(&options).await;
This will subscribe only to streams whose names begin with user-.
Filtering by regular expression
To subscribe to multiple streams, use a regular expression.
subscription = client.subscribe_to_all(
    filter_by_stream_name=True,
    filter_include=["account.*", "savings.*"],
)
for event in subscription:
    # do something with the event
    handle_event(event)
const filter = streamNameFilter({
regex: "^account|^savings",
});
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
.withStreamNameRegularExpression("^account|^savings")
.build();
var filterOptions = new SubscriptionFilterOptions(StreamFilter.RegularExpression("^account|^savings"));
sub, err := db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
Filter: &kurrentdb.SubscriptionFilter{
Type: kurrentdb.StreamFilterType,
Regex: "^account|^savings",
},
})
if err != nil {
panic(err)
}
defer sub.Close()
for {
event := sub.Recv()
if event.EventAppeared != nil {
streamId := event.EventAppeared.OriginalEvent().StreamID
revision := event.EventAppeared.OriginalEvent().EventNumber
fmt.Printf("received event %v@%v", revision, streamId)
}
if event.SubscriptionDropped != nil {
break
}
}
let filter = SubscriptionFilter::on_stream_name().regex("^account|^savings");
let options = SubscribeToAllOptions::default().filter(filter);
let mut sub = client.subscribe_to_all(&options).await;
This will subscribe to any stream with a name that begins with account or savings.
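To preview which stream names a pattern such as `^account|^savings` selects, the pattern can be tried locally first. The sketch below is only an illustration: the server evaluates the filter itself, so Python's `re` module may differ from it in edge cases, and `matches_stream_filter` is a hypothetical helper, not part of any client SDK.

```python
import re

# Same pattern as the stream-name filter above. Python's `re` is used here
# only to preview matches; the database applies the real filter server-side.
STREAM_FILTER = re.compile(r"^account|^savings")


def matches_stream_filter(stream_name: str) -> bool:
    """Return True if the stream name starts with 'account' or 'savings'."""
    return STREAM_FILTER.search(stream_name) is not None


if __name__ == "__main__":
    for name in ["account-123", "savings-9", "user-1", "my-account"]:
        print(name, matches_stream_filter(name))
```

Because both alternatives are anchored with `^`, a name such as `my-account` is not selected even though it contains `account`.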
Checkpointing
When a catch-up subscription is used to process an $all stream containing many events, the last thing you want is for your application to crash midway, forcing you to restart from the beginning.
What is a checkpoint?
A checkpoint is the position in the $all stream up to which your application has processed events. Saving this position to a persistent store (e.g., a database) allows your catch-up subscription to:
- Recover from crashes by reading the checkpoint and resuming from that position
- Avoid reprocessing all events from the start
To create a checkpoint, store the event's commit or prepare position.
Warning
If your database contains events created by the legacy TCP client using the transaction feature, you should store both the commit and prepare positions together as your checkpoint.
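As an illustration of what "saving a checkpoint to a persistent store" can look like, here is a minimal sketch that keeps the commit and prepare positions together in a JSON file. `CheckpointPosition` and `FileCheckpointStore` are hypothetical names introduced for this example and are not part of the client SDK; a production system would more likely use its own database.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class CheckpointPosition:
    """Hypothetical container holding both positions of a checkpoint."""
    commit: int
    prepare: int


class FileCheckpointStore:
    """Hypothetical store that persists the latest checkpoint as JSON."""

    def __init__(self, path: str) -> None:
        self._path = path

    def load(self) -> Optional[CheckpointPosition]:
        """Return the saved checkpoint, or None if nothing was saved yet."""
        try:
            with open(self._path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return None
        return CheckpointPosition(commit=data["commit"], prepare=data["prepare"])

    def save(self, position: CheckpointPosition) -> None:
        """Write to a temporary file, then swap it in, so a crash during
        the write cannot leave a half-written checkpoint behind."""
        directory = os.path.dirname(os.path.abspath(self._path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(asdict(position), f)
        os.replace(tmp_path, self._path)
```

On startup the application would call `load()` and, if a checkpoint exists, start the subscription from that position instead of from the beginning.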
Updating checkpoints at regular intervals
The client SDK can notify your application after it has processed a configurable number of events, which lets you save a checkpoint at regular intervals.
# get last recorded commit position
last_commit_position = 0
while True:
    subscription = client.subscribe_to_all(
        commit_position=last_commit_position,
        filter_by_stream_name=True,
        filter_include=["account.*", "savings.*"],
        include_checkpoints=True,
    )
    try:
        for received in subscription:
            last_commit_position = received.commit_position
            # checkpoints are like events but only have a commit position
            if isinstance(received, Checkpoint):
                print("We got a checkpoint!")
            else:
                print("We got an event!")
            # record commit position
            handle_event(received)
    except ConsumerTooSlow:
        # subscription was dropped
        continue
excludeSystemEvents({
async checkpointReached(_subscription, position) {
// The subscription will wait until the promise is resolved
// Save commit position to a persistent store as a checkpoint
await doSomethingAsync();
console.log(`checkpoint taken at ${position.commit}`);
},
});
String excludeSystemEventsRegex = "^[^\\$].*";
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
.withEventTypeRegularExpression(excludeSystemEventsRegex)
.withCheckpointer(
new Checkpointer() {
@Override
public CompletableFuture<Void> onCheckpoint(Subscription subscription, Position position) {
// Save commit position to a persistent store as a checkpoint
System.out.println("checkpoint taken at " + position.getCommitUnsigned());
return CompletableFuture.completedFuture(null);
}
})
.build();
var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents());
await using var subscription = client.SubscribeToAll(FromAll.Start, filterOptions: filterOptions);
await foreach (var message in subscription.Messages) {
switch (message) {
case StreamMessage.Event(var e):
Console.WriteLine($"{e.Event.EventType} @ {e.Event.Position.CommitPosition}");
break;
case StreamMessage.AllStreamCheckpointReached(var p):
// Save commit position to a persistent store as a checkpoint
Console.WriteLine($"checkpoint taken at {p.CommitPosition}");
break;
}
}
for {
event := sub.Recv()
if event.EventAppeared != nil {
streamId := event.EventAppeared.OriginalEvent().StreamID
revision := event.EventAppeared.OriginalEvent().EventNumber
fmt.Printf("received event %v@%v", revision, streamId)
}
if event.CheckPointReached != nil {
// Save commit position to a persistent store as a checkpoint
fmt.Printf("checkpoint taken at %v", event.CheckPointReached.Commit)
}
if event.SubscriptionDropped != nil {
break
}
}
loop {
let event = sub.next_subscription_event().await?;
match event {
SubscriptionEvent::EventAppeared(event) => {
let stream_id = event.get_original_stream_id();
let revision = event.get_original_event().revision;
println!("Received event {}@{}", revision, stream_id);
}
SubscriptionEvent::Checkpoint(position) => {
// Save commit position to a persistent store as a checkpoint
println!("checkpoint taken at {}", position.commit);
}
_ => {}
}
}
By default, the checkpoint notification is sent after every 32 non-system events processed from $all.
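The recovery behaviour can be explored without a server. The following is a minimal simulation, not client SDK code: `simulated_subscription` and `run` are hypothetical helpers, positions are plain list indexes rather than real commit positions, and a checkpoint is emitted after every 32 events to mirror the default interval.

```python
from typing import Iterator, List, Optional, Tuple

CHECKPOINT_EVERY = 32  # mirrors the default interval described above


def simulated_subscription(
    log: List[str], after_position: int
) -> Iterator[Tuple[str, int]]:
    """Yield ("event", position) items for everything after `after_position`,
    plus a ("checkpoint", position) item after every 32 events."""
    seen = 0
    for position in range(after_position + 1, len(log)):
        yield ("event", position)
        seen += 1
        if seen % CHECKPOINT_EVERY == 0:
            yield ("checkpoint", position)


def run(
    log: List[str], checkpoint: int, crash_after: Optional[int] = None
) -> Tuple[List[int], int]:
    """Process the log from `checkpoint`, optionally stopping early to mimic
    a crash. Returns the processed positions and the last saved checkpoint."""
    processed: List[int] = []
    for kind, position in simulated_subscription(log, checkpoint):
        if kind == "checkpoint":
            checkpoint = position  # a real application would persist this
            continue
        processed.append(position)
        if crash_after is not None and len(processed) == crash_after:
            break  # simulated crash
    return processed, checkpoint


if __name__ == "__main__":
    log = [f"event-{i}" for i in range(100)]
    first, saved = run(log, checkpoint=-1, crash_after=50)
    second, saved = run(log, checkpoint=saved)
    replayed = sorted(set(first) & set(second))
    print(f"replayed after restart: {len(replayed)} events")
```

In this simulation, the events processed between the last saved checkpoint and the crash are delivered a second time after the restart, so event handlers should tolerate seeing the same event more than once.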
Configuring the checkpoint interval
You can adjust the checkpoint interval to change how often the client is notified.
subscription = client.subscribe_to_all(
    commit_position=last_commit_position,
    include_checkpoints=True,
    checkpoint_interval_multiplier=5,
)
const filter = eventTypeFilter({
regex: "^[^$].*",
checkpointInterval: 1000,
checkpointReached(_subscription, position) {
// Save commit position to a persistent store as a checkpoint
console.log(`checkpoint taken at ${position.commit}`);
},
});
String excludeSystemEventsRegex = "^[^\\$].*";
SubscriptionFilter filter = SubscriptionFilter.newBuilder()
.withEventTypeRegularExpression(excludeSystemEventsRegex)
.withCheckpointer(
new Checkpointer() {
@Override
public CompletableFuture<Void> onCheckpoint(Subscription subscription, Position position) {
// Save commit position to a persistent store as a checkpoint
System.out.println("checkpoint taken at " + position.getCommitUnsigned());
return CompletableFuture.completedFuture(null);
}
},
1000)
.build();
var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents(), 1000);
sub, err := db.SubscribeToAll(context.Background(), kurrentdb.SubscribeToAllOptions{
Filter: &kurrentdb.SubscriptionFilter{
Type: kurrentdb.EventFilterType,
Regex: "^[^\\$].*",
},
})
let filter = SubscriptionFilter::on_event_type().regex("^[^\\$].*");
let options = SubscribeToAllOptions::default().filter(filter);
let mut sub = client.subscribe_to_all(&options).await;
By configuring this parameter, you can balance between reducing checkpoint overhead and ensuring quick recovery in case of a failure.
Info
The checkpoint interval parameter configures the database to notify the client after n * 32 events, where n is defined by the parameter.
For example:
- If n = 1, a checkpoint notification is sent every 32 events.
- If n = 2, the notification is sent every 64 events.
- If n = 3, it is sent every 96 events, and so on.
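The arithmetic above can be written as a small helper for sizing the interval. This is a sketch based only on the n * 32 rule stated here; `events_per_checkpoint` and `BASE_CHECKPOINT_INTERVAL` are hypothetical names, not SDK identifiers.

```python
BASE_CHECKPOINT_INTERVAL = 32  # events per notification when n = 1


def events_per_checkpoint(n: int) -> int:
    """Number of events between checkpoint notifications for multiplier n."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    return n * BASE_CHECKPOINT_INTERVAL


if __name__ == "__main__":
    for n in (1, 2, 3, 5):
        print(f"n = {n}: notification every {events_per_checkpoint(n)} events")
```

A larger n means fewer checkpoint writes but more events to reprocess after a restart, which is the trade-off described above.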