RxJS Operators
The applesauce-relay
package provides several RxJS operators to help process and transform streams of events from Nostr relays.
onlyEvents
The onlyEvents
operator filters the subscription response stream to only emit Nostr events, removing any "EOSE" (End Of Stored Events) messages.
import { onlyEvents } from "applesauce-relay/operators";
// Subscribe to events and only receive Nostr events (no EOSE messages)
relay
.req({
kinds: [1],
limit: 10,
})
.pipe(onlyEvents())
.subscribe((event) => {
// This will only receive events, not "EOSE" strings
console.log(event.id);
});
markFromRelay
The markFromRelay
operator adds metadata to events indicating which relay they were received from. This is useful for tracking event propagation across the network.
import { markFromRelay } from "applesauce-relay/operators";
import { getSeenRelays } from "applesauce-core/helpers";
// Create a new relay instance
const relay = new Relay("wss://relay.example.com");
// Subscribe to events and mark them as coming from this relay
relay
.req({
kinds: [1],
limit: 10,
})
.pipe(markFromRelay(relay.url))
.subscribe((response) => {
if (typeof response !== "string") {
// Check which relays this event has been seen on
console.log(getSeenRelays(response));
}
});
completeOnEose
The completeOnEose
operator completes the subscription stream when an "EOSE" (End Of Stored Events) message is received. This is particularly useful for one-off requests where you want to collect all events and then process them together.
import { completeOnEose } from "applesauce-relay/operators";
import { lastValueFrom } from "rxjs";
import { toArray } from "rxjs/operators";
// Method 1: Complete the stream when EOSE is received
relay
.req({
kinds: [1],
limit: 10,
})
.pipe(completeOnEose())
.subscribe({
next: (event) => console.log(event.id),
complete: () => console.log("All stored events received"),
});
// Method 2: Collect all events into an array
const events = await lastValueFrom(
relay
.req({
kinds: [1],
limit: 10,
})
.pipe(completeOnEose(), toArray()),
);
// Method 3: Include the EOSE message in the stream
relay
.req({
kinds: [1],
limit: 10,
})
.pipe(completeOnEose(true))
.subscribe((response) => {
if (response === "EOSE") {
console.log("End of stored events");
} else {
console.log("Event:", response.id);
}
});
storeEvents
The storeEvents
operator adds all events from the subscription stream to an EventStore
without filtering or removing duplicates. The stream continues to emit all original messages.
import { storeEvents } from "applesauce-relay/operators";
import { EventStore } from "applesauce-core";
// Create an event store
const eventStore = new EventStore();
// Subscribe to events and add them to the store
relay
.req({
kinds: [1],
limit: 10,
})
.pipe(storeEvents(eventStore))
.subscribe((response) => {
if (response === "EOSE") {
// Access all events from the store
const allEvents = eventStore.getAll();
console.log(`Received ${allEvents.length} events`);
}
});
toEventStore
The toEventStore
operator adds all events to an EventStore
, removes duplicates, and returns a sorted array of events when the EOSE message is received. This is perfect for fetching and processing a complete set of events.
WARNING
This operator is deprecated. It's recommended to use the mapEventsToStore
and mapEventsToTimeline
operators from applesauce-core/observable
instead.
import { toEventStore } from "applesauce-relay/operators";
import { EventStore } from "applesauce-core";
import { lastValueFrom } from "rxjs";
// Create an event store
const eventStore = new EventStore();
// Fetch events, deduplicate, and sort them
const timeline = await lastValueFrom(
relay
.req({
kinds: [1],
limit: 10,
})
.pipe(toEventStore(eventStore)),
);
console.log(`Received ${timeline.length} unique events`);
Recommended alternative
import { mapEventsToStore, mapEventsToTimeline } from "applesauce-core/observable";
import { completeOnEose } from "applesauce-relay/operators";
import { EventStore } from "applesauce-core";
import { lastValueFrom } from "rxjs";
// Create an event store
const eventStore = new EventStore();
// Fetch events, deduplicate, and sort them
const timeline = await lastValueFrom(
relay
.req({
kinds: [1],
limit: 10,
})
.pipe(completeOnEose(), mapEventsToStore(eventStore, true), mapEventsToTimeline()),
);
console.log(`Received ${timeline.length} unique events`);
Combining Operators
These operators can be combined to create powerful data processing pipelines:
import { markFromRelay, onlyEvents, completeOnEose } from "applesauce-relay/operators";
import { mapEventsToStore, mapEventsToTimeline } from "applesauce-core/observable";
import { EventStore } from "applesauce-core";
import { lastValueFrom } from "rxjs";
// Create an event store
const eventStore = new EventStore();
// Create a relay pool
const pool = new RelayPool();
// Define relay URLs
const relays = ["wss://relay1.example.com", "wss://relay2.example.com"];
// Fetch events from multiple relays, track where they came from,
// deduplicate, and sort them
const timeline = await lastValueFrom(
pool
.req(relays, {
kinds: [1],
limit: 10,
})
.pipe(
// Mark each event with its source relay
markFromRelay(relays[0]),
// Filter out EOSE messages
onlyEvents(),
// Complete when all events are received
completeOnEose(),
// Store events and remove duplicates
mapEventsToStore(eventStore, true),
// Create a sorted timeline
mapEventsToTimeline(),
),
);
console.log(`Received ${timeline.length} unique events`);