Moderne Webanwendungen sind fundamental asynchron. User-Interaktionen, HTTP-Requests, WebSocket-Messages, Timer – all diese Events passieren zu unvorhersehbaren Zeitpunkten. Traditionelle Callback-basierte oder Promise-basierte Ansätze stoßen schnell an ihre Grenzen.
Eine typische Anforderung illustriert das Problem: Eine Suchfunktion soll bei jedem Keystroke den Server anfragen, aber nur wenn der User 300ms pausiert hat, und vorherige Requests sollen abgebrochen werden, und identische aufeinanderfolgende Anfragen sollen vermieden werden. Mit Callbacks oder Promises wird dieser Code komplex, zustandsbehaftet und fehleranfällig.
RxJS (Reactive Extensions for JavaScript) löst dieses Problem durch eine fundamentale Abstraktion: den Observable-Stream. Asynchrone Events werden als Werte über Zeit modelliert. Operationen auf diesen Streams sind deklarativ, komposierbar und funktional. Der Code drückt aus was passieren soll, nicht wie es implementiert wird.
Ein Observable ist eine Funktion, die eine Stream-Sequenz produziert.
Technisch ist ein Observable ein Objekt mit einer
subscribe-Methode. Calling subscribe startet
die Stream-Produktion und registriert Callbacks für Werte, Errors und
Completion.
Ein Observable von Hand erstellt:
import { Observable } from 'rxjs';
const numberStream = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
// Cleanup function - wird bei unsubscribe ausgeführt
return () => {
console.log('Cleanup executed');
};
});
const subscription = numberStream.subscribe({
next: value => console.log('Value:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Output:
// Value: 1
// Value: 2
// Value: 3
// (nach 1 Sekunde)
// Value: 4
// CompleteDie Callback-Funktion an Observable ist der Producer.
Sie erhält einen Subscriber mit drei Methoden:
next emittiert Werte, error signalisiert
Fehler, complete signalisiert Stream-Ende. Die
Return-Funktion ist Cleanup-Logic – sie läuft bei Unsubscribe oder
Complete.
Observables sind lazy. Die Producer-Funktion läuft nicht bei Creation, sondern erst bei Subscription. Jede Subscription erhält eine unabhängige Execution. Dies ist fundamental unterschiedlich zu Promises, die eager und shared sind.
Manuelle Observable-Creation ist verbose. RxJS bietet Creation-Operators für Common Cases:
import { of, from, interval, fromEvent, timer } from 'rxjs';
// of: emittiert Argumente synchron, dann complete
const numbers$ = of(1, 2, 3);
// from: konvertiert Array, Promise, Iterable zu Observable
const array$ = from([1, 2, 3]);
const promise$ = from(fetch('/api/data'));
// interval: emittiert incrementing numbers in Intervallen
const ticker$ = interval(1000); // 0, 1, 2, 3... jede Sekunde
// timer: emittiert einmalig nach Delay
const delayed$ = timer(3000); // emittiert 0 nach 3 Sekunden
// fromEvent: konvertiert DOM-Events zu Observable
const clicks$ = fromEvent(document, 'click');Die $-Suffix-Konvention signalisiert Observables. Dies
ist nicht technisch nötig aber weit verbreitet und verbessert
Lesbarkeit.
Operators transformieren Observables zu neuen Observables. Sie sind
pure Functions – sie modifizieren nicht das Source-Observable, sondern
returnen ein neues. Dies ermöglicht Chaining via die
pipe-Methode.
map transformiert jeden emittierten Wert:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
map(x => x * 10)
).subscribe(x => console.log(x));
// Output: 10, 20, 30pluck extrahiert Properties aus Objects:
import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';
of(
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 }
).pipe(
pluck('name')
).subscribe(name => console.log(name));
// Output: Alice, Bobscan ist wie reduce für Arrays, aber
emittiert intermediate Werte:
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';
of(1, 2, 3, 4).pipe(
scan((acc, val) => acc + val, 0)
).subscribe(sum => console.log(sum));
// Output: 1, 3, 6, 10filter lässt nur Werte durch, die ein Prädikat
erfüllen:
import { interval } from 'rxjs';
import { filter, take } from 'rxjs/operators';
interval(1000).pipe(
filter(x => x % 2 === 0),
take(5)
).subscribe(x => console.log(x));
// Output: 0, 2, 4, 6, 8distinctUntilChanged filtert aufeinanderfolgende
Duplikate:
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 3, 1).pipe(
distinctUntilChanged()
).subscribe(x => console.log(x));
// Output: 1, 2, 3, 1debounceTime emittiert nur wenn eine Pause zwischen
Emissions war:
import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';
fromEvent(inputElement, 'input').pipe(
debounceTime(300),
map(event => event.target.value)
).subscribe(value => console.log('Search:', value));Dies ist essentiell für Search-as-you-Type. Ohne Debouncing würde jeder Keystroke einen Request triggern. Mit Debouncing wartet das Observable bis der User pausiert.
combineLatest kombiniert die neuesten Werte mehrerer
Observables:
import { combineLatest, interval } from 'rxjs';
import { map } from 'rxjs/operators';
const first$ = interval(1000);
const second$ = interval(1500);
combineLatest([first$, second$]).pipe(
map(([a, b]) => `First: ${a}, Second: ${b}`)
).subscribe(console.log);combineLatest emittiert erst wenn alle
Source-Observables mindestens einmal emittiert haben. Dann emittiert es
bei jeder Emission einer Source, mit den neuesten Werten aller
Sources.
merge interleaved alle Sources:
import { merge, interval } from 'rxjs';
import { map } from 'rxjs/operators';
const first$ = interval(1000).pipe(map(x => `First: ${x}`));
const second$ = interval(1500).pipe(map(x => `Second: ${x}`));
merge(first$, second$).subscribe(console.log);
// Output: First: 0, Second: 0, First: 1, Second: 1, First: 2...forkJoin wartet bis alle Sources completen, dann
emittiert die letzten Werte:
import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';
const first$ = of('First').pipe(delay(1000));
const second$ = of('Second').pipe(delay(1500));
forkJoin([first$, second$]).subscribe(([a, b]) => {
console.log(a, b); // Output nach 1500ms: First Second
});forkJoin ist ähnlich zu Promise.all – es
parallelisiert asynchrone Operations und aggregiert Results.
Viele Scenarios produzieren “Observable of Observables”. Ein Search-Input emittiert Strings. Jeder String triggert einen HTTP-Request, der ein Observable ist. Das Result ist ein Observable das Observables emittiert – ein Higher-Order Observable.
Flattening Operators subscriben automatisch inner Observables und flattern das Result zu einem single-level Observable.
switchMap cancelt die vorherige inner Observable bei
jeder neuen Emission:
import { fromEvent } from 'rxjs';
import { switchMap, debounceTime, map } from 'rxjs/operators';
fromEvent(searchInput, 'input').pipe(
debounceTime(300),
map(event => event.target.value),
switchMap(query => httpClient.get(`/api/search?q=${query}`))
).subscribe(results => displayResults(results));Bei jeder Eingabe wird der vorherige Request gecancelt. Dies verhindert Race Conditions wo ein langsamer Request Results für einen veralteten Query liefert.
mergeMap (alias flatMap) subscribed alle
inner Observables parallel:
import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';
of(1, 2, 3).pipe(
mergeMap(x => of(x * 10).pipe(delay(x * 1000)))
).subscribe(console.log);
// Output: 10 (nach 1s), 20 (nach 2s), 30 (nach 3s)Alle drei inner Observables laufen parallel. Results emittieren in completion order, nicht source order.
concatMap subscribed inner Observables sequentiell:
import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';
of(1, 2, 3).pipe(
concatMap(x => of(x * 10).pipe(delay(1000)))
).subscribe(console.log);
// Output: 10 (nach 1s), 20 (nach 2s), 30 (nach 3s)Jedes inner Observable muss completen bevor das nächste startet. Results sind in source order garantiert.
exhaustMap ignoriert neue Emissions während ein inner
Observable läuft:
import { fromEvent, of } from 'rxjs';
import { exhaustMap, delay } from 'rxjs/operators';
fromEvent(button, 'click').pipe(
exhaustMap(() => saveData().pipe(delay(2000)))
).subscribe(result => console.log('Saved:', result));Clicks während eines laufenden Save-Operations werden ignoriert. Dies verhindert duplicate Submissions.
Die Wahl zwischen diesen Operators ist kritisch:
| Operator | Verwendung | Beispiel |
|---|---|---|
switchMap |
Latest value matters | Search, Auto-complete |
mergeMap |
All values matter | Logging, Analytics |
concatMap |
Order matters | Sequential Operations |
exhaustMap |
Ignore during execution | Form Submission, Button Clicks |
Normale Observables sind unicast – jede Subscription erhält eine separate Execution. Subjects sind multicast – alle Subscriptions teilen sich eine Execution.
Ein Subject ist gleichzeitig Observable und Observer. Es kann subscribed werden wie ein Observable und Werte emittieren wie ein Observer.
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe(x => console.log('Observer A:', x));
subject.subscribe(x => console.log('Observer B:', x));
subject.next(1);
subject.next(2);
// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2BehaviorSubject speichert den aktuellen Wert. Neue
Subscribers erhalten sofort den letzten emittierten Wert:
import { BehaviorSubject } from 'rxjs';
const state$ = new BehaviorSubject({ count: 0 });
state$.subscribe(state => console.log('A:', state.count));
state$.next({ count: 1 });
state$.subscribe(state => console.log('B:', state.count));
// Output:
// A: 0
// A: 1
// B: 1Dies ist ideal für State Management. Die aktuelle State ist immer
verfügbar via getValue():
const currentState = state$.getValue();ReplaySubject speichert eine History von Emissions. Neue
Subscribers erhalten die letzten N Werte:
import { ReplaySubject } from 'rxjs';
const replay$ = new ReplaySubject(3); // Speichere 3 Werte
replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.next(4);
replay$.subscribe(x => console.log('Subscriber:', x));
// Output:
// Subscriber: 2
// Subscriber: 3
// Subscriber: 4Dies ist nützlich für Caching oder Event-History.
Observables können Errors emittieren via error(). Dies
terminiert das Observable. Error-Handling ist essentiell für
Production-Code.
catchError fängt Errors und returnt ein
Fallback-Observable:
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
throwError(() => new Error('Something went wrong')).pipe(
catchError(err => {
console.error('Error caught:', err);
return of({ error: true, data: [] });
})
).subscribe({
next: data => console.log('Data:', data),
error: err => console.error('Unhandled:', err) // Wird nie erreicht
});Das Observable recovered von dem Error und emittiert Fallback-Data. Der Error propagiert nicht zum Subscriber.
retry resubscribet automatisch bei Errors:
import { ajax } from 'rxjs/ajax';
import { retry, catchError } from 'rxjs/operators';
import { of } from 'rxjs';
ajax.getJSON('/api/data').pipe(
retry(3), // Versuche bis zu 3 mal
catchError(err => {
console.error('Failed after 3 retries:', err);
return of({ error: true });
})
).subscribe(data => console.log(data));retry ist nützlich für transient Network-Errors. Für
exponential Backoff gibt es retryWhen.
Diese Unterscheidung ist fundamental für Performance und Verhalten.
Cold Observables starten Production bei jedem Subscribe. Jeder Subscriber erhält eine unabhängige Execution:
import { Observable } from 'rxjs';
const cold$ = new Observable(subscriber => {
console.log('Execution started');
subscriber.next(Math.random());
subscriber.complete();
});
cold$.subscribe(x => console.log('A:', x));
cold$.subscribe(x => console.log('B:', x));
// Output:
// Execution started
// A: 0.123...
// Execution started
// B: 0.456...HTTP-Requests via Angular’s HttpClient sind cold. Jeder
Subscribe triggert einen neuen Request.
Hot Observables produzieren unabhängig von Subscriptions. Subscribers teilen sich die Execution:
import { fromEvent } from 'rxjs';
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe(event => console.log('A:', event));
clicks$.subscribe(event => console.log('B:', event));
// Ein Click triggert beide SubscriptionsDOM-Events, WebSockets sind hot. Die Events passieren unabhängig von Subscriptions.
Ein cold Observable kann hot gemacht werden via Multicasting:
import { interval } from 'rxjs';
import { share } from 'rxjs/operators';
const cold$ = interval(1000);
const hot$ = cold$.pipe(share());
hot$.subscribe(x => console.log('A:', x));
setTimeout(() => {
hot$.subscribe(x => console.log('B:', x));
}, 2500);
// A: 0, A: 1, A: 2, B: 2, A: 3, B: 3...share macht das Observable hot. Der zweite Subscriber
sieht nur Werte ab Subscription-Zeitpunkt.
Observables die nicht completen müssen unsubscribed werden. Sonst resultiert ein Memory Leak.
import { interval } from 'rxjs';
const subscription = interval(1000).subscribe(x => console.log(x));
// Später cleanup
subscription.unsubscribe();In Angular-Komponenten ist dies kritisch:
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';
@Component({
selector: 'app-data',
template: `<div>{{ data }}</div>`
})
export class DataComponent implements OnInit, OnDestroy {
data: any;
private subscription = new Subscription();
ngOnInit() {
this.subscription.add(
dataService.getData().subscribe(data => this.data = data)
);
this.subscription.add(
timer(1000).subscribe(() => this.refresh())
);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}Die Subscription aggregiert multiple Sub-Subscriptions.
Ein einzelnes unsubscribe() cleaned alle.
Angular’s AsyncPipe handlet Unsubscription
automatisch:
@Component({
template: `<div>{{ data$ | async }}</div>`
})
export class DataComponent {
data$ = dataService.getData();
}Kein ngOnDestroy nötig. Die Pipe subscribet bei
Component-Init und unsubscribet bei Component-Destroy.
Eine production-ready Search-Komponente:
import { Component } from '@angular/core';
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap, catchError, startWith } from 'rxjs/operators';
import { of } from 'rxjs';
@Component({
selector: 'app-search',
template: `
<input [formControl]="searchControl" placeholder="Search...">
@if (loading$ | async) {
<div class="spinner">Loading...</div>
}
@if (results$ | async; as results) {
<div class="results">
@for (result of results; track result.id) {
<div class="result-item">{{ result.name }}</div>
}
</div>
}
@if (error$ | async; as error) {
<div class="error">{{ error }}</div>
}
`
})
export class SearchComponent {
searchControl = new FormControl('');
loading$ = new BehaviorSubject(false);
error$ = new BehaviorSubject<string | null>(null);
results$ = this.searchControl.valueChanges.pipe(
startWith(''),
debounceTime(300),
distinctUntilChanged(),
tap(() => {
this.loading$.next(true);
this.error$.next(null);
}),
switchMap(query =>
this.searchService.search(query).pipe(
catchError(err => {
this.error$.next('Search failed');
return of([]);
})
)
),
tap(() => this.loading$.next(false))
);
}Dieses Pattern handled Loading-State, Error-State und Debouncing. Der
switchMap cancelt vorherige Requests automatisch.
Ein Service für Application-State:
import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';
interface AppState {
user: User | null;
cart: CartItem[];
notifications: Notification[];
}
@Injectable({ providedIn: 'root' })
export class StateService {
private state$ = new BehaviorSubject<AppState>({
user: null,
cart: [],
notifications: []
});
// Selectors
readonly user$ = this.state$.pipe(map(state => state.user));
readonly cart$ = this.state$.pipe(map(state => state.cart));
readonly cartItemCount$ = this.cart$.pipe(map(cart => cart.length));
// Actions
setUser(user: User) {
this.updateState({ user });
}
addToCart(item: CartItem) {
const cart = [...this.state$.value.cart, item];
this.updateState({ cart });
}
private updateState(partial: Partial<AppState>) {
this.state$.next({
...this.state$.value,
...partial
});
}
}Komponenten subscriben Selectors für reactive Updates. Actions sind imperative Mutations auf immutable State.
Ein Dashboard aggregiert Daten aus verschiedenen Services:
@Component({
template: `
@if (dashboard$ | async; as dashboard) {
<h1>Welcome {{ dashboard.userName }}</h1>
<div>Orders: {{ dashboard.orderCount }}</div>
<div>Cart: {{ dashboard.cartItems }}</div>
<div>Status: {{ dashboard.onlineStatus }}</div>
}
`
})
export class DashboardComponent {
dashboard$ = combineLatest([
this.userService.currentUser$,
this.orderService.orders$,
this.cartService.cart$,
timer(0, 30000).pipe(map(() => navigator.onLine))
]).pipe(
map(([user, orders, cart, online]) => ({
userName: user?.name || 'Guest',
orderCount: orders.length,
cartItems: cart.length,
onlineStatus: online ? 'Online' : 'Offline'
}))
);
}combineLatest synchronisiert alle Sources. Jede Änderung
triggert ein Dashboard-Update.
RxJS transformiert Asynchronität von einem Implementation-Detail zu einer First-Class-Abstraktion. Observables kapseln Event-Streams als composable Values. Operators sind deklarative Transformationen dieser Streams. Das Result ist Code der lesbarer, testbarer und wartbarer ist als imperative Callbacks. Die Lernkurve ist steil – das Observer-Pattern, funktionale Operators, Hot vs. Cold Semantics erfordern Umdenken. Aber für komplexe asynchrone Szenarien ist RxJS unverzichtbar, besonders in Angular wo es tief in die Architektur integriert ist.