RxJS (Reactive Extensions for JavaScript) ist eine Bibliothek, die auf dem Observer-Muster und funktionaler Programmierung basiert, um ereignisbasierte und asynchrone Programmierung in JavaScript zu vereinfachen. Der Kern von RxJS liegt in der Idee der “reaktiven Programmierung”, bei der Datenströme und die Verbreitung von Änderungen im Mittelpunkt stehen.
Im Herzen von RxJS stehen drei fundamentale Konzepte: Observable, Observer und Operatoren.
Ein Observable repräsentiert eine Sequenz von Werten oder Ereignissen, die über Zeit emittiert werden. Man kann es sich als eine Quelle vorstellen, die Daten produziert – sei es ein Mausklick, ein Netzwerkantwort oder ein Timer.
import { Observable } from 'rxjs';
// Ein einfaches Observable, das nacheinander die Werte 1, 2 und 3 ausgibt
const einfachesObservable = new Observable(subscriber => {
subscriber.next(1); // Erster Wert wird emittiert
subscriber.next(2); // Zweiter Wert wird emittiert
subscriber.next(3); // Dritter Wert wird emittiert
subscriber.complete(); // Observable wird als abgeschlossen markiert
});Der Observer ist das Gegenstück zum Observable. Er hört auf
Ereignisse und reagiert darauf. Ein Observer besteht aus drei
Callback-Funktionen: - next(): Wird aufgerufen, wenn ein
neuer Wert emittiert wird - error(): Wird aufgerufen, wenn
ein Fehler auftritt - complete(): Wird aufgerufen, wenn das
Observable abgeschlossen ist
const beobachter = {
next: wert => console.log(`Empfangener Wert: ${wert}`),
error: fehler => console.error(`Fehler: ${fehler}`),
complete: () => console.log('Observable abgeschlossen')
};
// Verknüpfung von Observable und Observer
einfachesObservable.subscribe(beobachter);Operatoren sind das wahre Kraftzentrum von RxJS. Sie erlauben es, Datenströme zu transformieren, zu filtern, zu kombinieren und auf verschiedenste Weise zu manipulieren. Diese Funktionen arbeiten nach dem Prinzip der funktionalen Programmierung – sie nehmen ein Observable entgegen und geben ein neues Observable zurück.
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// Ein Observable mit den Werten 1-5 erstellen
const zahlen$ = of(1, 2, 3, 4, 5);
// Operatoren anwenden: nur gerade Zahlen auswählen und dann verdoppeln
zahlen$.pipe(
filter(zahl => zahl % 2 === 0), // Nur gerade Zahlen durchlassen
map(zahl => zahl * 2) // Jede Zahl verdoppeln
).subscribe(
ergebnis => console.log(ergebnis) // Ausgabe: 4, 8
);RxJS basiert auf der Erkenntnis, dass in modernen Webanwendungen Daten aus vielen Quellen stammen können: Benutzerinteraktionen, Netzwerkanfragen, WebSockets und mehr. Die traditionelle ereignisbasierte Programmierung wird schnell komplex, wenn diese Quellen interagieren müssen.
Stellen wir uns vor, wir möchten eine Suchfunktion implementieren, die: 1. Auf Benutzereingaben reagiert 2. Keine Anfragen sendet, während der Benutzer noch tippt 3. Anfragen abbricht, wenn eine neuere Eingabe erfolgt 4. Duplikate vermeidet
In traditionellem JavaScript wäre dies ein komplexes Unterfangen mit vielen Zustandsvariablen. Mit RxJS können wir dies elegant ausdrücken:
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
// HTML-Sucheingabefeld
const suchfeld = document.getElementById('suchfeld');
// Observable für Eingabeereignisse erstellen
fromEvent(suchfeld, 'input').pipe(
debounceTime(300), // Warte 300ms nach der letzten Eingabe
map(event => event.target.value), // Extrahiere den Textwert
distinctUntilChanged(), // Ignoriere, wenn sich der Text nicht geändert hat
switchMap(suchtext => { // Führe Suchanfrage durch und breche alte ab
return sucheAusführen(suchtext);
})
).subscribe(
ergebnisse => zeigeErgebnisseAn(ergebnisse)
);Diese deklarative Herangehensweise macht den Code lesbarer und wartbarer, da sie die Intention (was passieren soll) über die Implementierung (wie es passieren soll) stellt.
RxJS unterscheidet zwischen “kalten” und “heißen” Observables:
Kalte Observables beginnen erst mit der
Datenproduktion, wenn jemand sie abonniert. Jeder Abonnent bekommt einen
eigenen, unabhängigen Datenstrom. Beispiele sind HTTP-Anfragen mit
HttpClient.
Heiße Observables produzieren Daten unabhängig davon, ob jemand zuhört. Wenn ein Abonnent hinzukommt, erhält er nur die Daten, die ab diesem Zeitpunkt emittiert werden. Beispiele sind Mausereignisse oder WebSockets.
Ein kaltes Observable kann mit Operatoren wie share()
oder publish() in ein heißes umgewandelt werden:
import { share } from 'rxjs/operators';
// Ein teures Observable, zum Beispiel eine HTTP-Anfrage
const geteiltesDaten$ = teueresDatenObservable$.pipe(
share() // Macht das Observable "heiß", sodass mehrere Abonnenten denselben Datenstrom teilen
);
// Mehrere Komponenten können nun abonnieren, ohne dass multiple Anfragen ausgelöst werden
geteiltesDaten$.subscribe(daten => verarbeiteInKomponenteA(daten));
geteiltesDaten$.subscribe(daten => verarbeiteInKomponenteB(daten));In asynchronen Anwendungen ist die Fehlerbehandlung oft komplex. RxJS bietet hierfür elegante Lösungen:
import { catchError, retry } from 'rxjs/operators';
import { of } from 'rxjs';
datenAnfrage$.pipe(
retry(3), // Bei Fehler bis zu dreimal wiederholen
catchError(fehler => { // Wenn immer noch fehlerhaft, Ersatzdaten liefern
console.error('Fehler bei Datenanfrage:', fehler);
return of({ fehler: true, ersatzdaten: [] }); // Ersatz-Observable
})
).subscribe({
next: daten => verarbeiteDaten(daten),
error: fehler => console.error('Unerwarteter Fehler:', fehler), // Sollte nie erreicht werden
complete: () => console.log('Anfrage abgeschlossen')
});RxJS eignet sich hervorragend für eine reaktive Zustandsverwaltung in Anwendungen:
import { BehaviorSubject } from 'rxjs';
// Ein BehaviorSubject speichert den aktuellen Zustand und sendet ihn an neue Abonnenten
const zustand$ = new BehaviorSubject({
benutzer: null,
istEingeloggt: false,
einstellungen: { darkMode: false }
});
// Zustand abonnieren
zustand$.subscribe(neuerZustand => {
console.log('Zustand aktualisiert:', neuerZustand);
aktualisiereBenutzeroberfläche(neuerZustand);
});
// Zustand aktualisieren (immutabel)
function benutzerEinloggen(benutzer) {
const aktuellerZustand = zustand$.getValue();
zustand$.next({
...aktuellerZustand,
benutzer,
istEingeloggt: true
});
}Oft müssen wir Daten aus verschiedenen Quellen kombinieren. RxJS macht dies einfach:
import { combineLatest, timer } from 'rxjs';
import { map } from 'rxjs/operators';
// Verschiedene Datenquellen
const benutzer$ = benutzerService.getAktuellenBenutzer();
const benachrichtigungen$ = benachrichtigungsService.getBenachrichtigungen();
const onlineStatus$ = timer(0, 30000).pipe(map(() => navigator.onLine));
// Kombiniere die neuesten Werte aller Quellen
combineLatest([benutzer$, benachrichtigungen$, onlineStatus$]).pipe(
map(([benutzer, benachrichtigungen, istOnline]) => ({
benutzerName: benutzer?.name || 'Gast',
benachrichtigungsAnzahl: benachrichtigungen.length,
verbindungsstatus: istOnline ? 'online' : 'offline'
}))
).subscribe(dashboardDaten => {
aktualisiereDashboard(dashboardDaten);
});RxJS wird häufig in Verbindung mit modernen JavaScript-Frameworks eingesetzt:
Angular hat RxJS tief in seine Architektur integriert. Der HttpClient gibt Observables zurück, und Reactive Forms basieren vollständig auf RxJS:
import { Component, OnInit } from '@angular/core';
import { FormGroup, FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';
@Component({
selector: 'app-suchformular',
template: `
<form [formGroup]="suchFormular">
<input formControlName="suchbegriff" placeholder="Suchen...">
</form>
<div *ngIf="suchErgebnisse.length">
<!-- Ergebnisse anzeigen -->
</div>
`
})
export class SuchformularComponent implements OnInit {
suchFormular = new FormGroup({
suchbegriff: new FormControl('')
});
suchErgebnisse = [];
ngOnInit() {
// ValueChanges ist ein Observable
this.suchFormular.get('suchbegriff').valueChanges.pipe(
debounceTime(300),
distinctUntilChanged()
).subscribe(begriff => {
this.sucheAusführen(begriff);
});
}
}In React wird RxJS oft mit Hooks kombiniert:
import React, { useState, useEffect } from 'react';
import { Subject } from 'rxjs';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';
// Suchbegriff-Subject erstellen
const suchBegriff$ = new Subject();
// Such-Logic einrichten
suchBegriff$.pipe(
debounceTime(300),
distinctUntilChanged()
).subscribe(begriff => {
// Suche durchführen
});
function Suchkomponente() {
const [ergebnisse, setErgebnisse] = useState([]);
// Suchfunktion, die das Subject aktualisiert
const handleSuche = (event) => {
suchBegriff$.next(event.target.value);
};
// Effekt zum Abonnieren der Suchergebnisse
useEffect(() => {
const subscription = suchergebnisse$.subscribe(neueErgebnisse => {
setErgebnisse(neueErgebnisse);
});
// Aufräumen beim Unmount
return () => subscription.unsubscribe();
}, []);
return (
<div>
<input onChange={handleSuche} placeholder="Suchen..." />
{/* Ergebnisse anzeigen */}
</div>
);
}RxJS bietet eine mächtige Abstraktion für die Arbeit mit asynchronen Datenströmen. Es erfordert zwar ein Umdenken von der imperativen zur reaktiven Programmierung, belohnt diesen Aufwand jedoch mit eleganteren Lösungen für komplexe asynchrone Probleme. Die Bibliothek hat sich als unverzichtbares Werkzeug für moderne Web-Entwicklung etabliert, besonders in komplexen Single-Page-Anwendungen, wo Daten aus verschiedenen Quellen zusammengeführt werden müssen.
Der wahre Wert von RxJS liegt in der Möglichkeit, komplexe asynchrone Prozesse in deklarative, lesbare Datenflüsse zu transformieren, die leichter zu verstehen, zu testen und zu warten sind. Mit zunehmender Erfahrung werden Entwickler feststellen, dass viele Probleme, die mit herkömmlichen Methoden schwierig zu lösen sind, mit RxJS natürlich und intuitiv angegangen werden können.