11 RxJS: Reactive Programming für Angular

11.1 Das Problem der Asynchronität

Moderne Webanwendungen sind fundamental asynchron. User-Interaktionen, HTTP-Requests, WebSocket-Messages, Timer – all diese Events passieren zu unvorhersehbaren Zeitpunkten. Traditionelle Callback-basierte oder Promise-basierte Ansätze stoßen schnell an ihre Grenzen.

Eine typische Anforderung illustriert das Problem: Eine Suchfunktion soll bei jedem Keystroke den Server anfragen, aber nur wenn der User 300ms pausiert hat, und vorherige Requests sollen abgebrochen werden, und identische aufeinanderfolgende Anfragen sollen vermieden werden. Mit Callbacks oder Promises wird dieser Code komplex, zustandsbehaftet und fehleranfällig.

RxJS (Reactive Extensions for JavaScript) löst dieses Problem durch eine fundamentale Abstraktion: den Observable-Stream. Asynchrone Events werden als Werte über Zeit modelliert. Operationen auf diesen Streams sind deklarativ, komposierbar und funktional. Der Code drückt aus was passieren soll, nicht wie es implementiert wird.

11.2 Observable: Der Kern von RxJS

Ein Observable ist eine Funktion, die eine Stream-Sequenz produziert. Technisch ist ein Observable ein Objekt mit einer subscribe-Methode. Calling subscribe startet die Stream-Produktion und registriert Callbacks für Werte, Errors und Completion.

Ein Observable von Hand erstellt:

import { Observable } from 'rxjs';

const numberStream = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
  
  // Cleanup function - wird bei unsubscribe ausgeführt
  return () => {
    console.log('Cleanup executed');
  };
});

const subscription = numberStream.subscribe({
  next: value => console.log('Value:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Complete')
});

// Output:
// Value: 1
// Value: 2
// Value: 3
// (nach 1 Sekunde)
// Value: 4
// Complete

Die Callback-Funktion an Observable ist der Producer. Sie erhält einen Subscriber mit drei Methoden: next emittiert Werte, error signalisiert Fehler, complete signalisiert Stream-Ende. Die Return-Funktion ist Cleanup-Logic – sie läuft bei Unsubscribe oder Complete.

Observables sind lazy. Die Producer-Funktion läuft nicht bei Creation, sondern erst bei Subscription. Jede Subscription erhält eine unabhängige Execution. Dies ist fundamental unterschiedlich zu Promises, die eager und shared sind.

11.3 Creation Operators

Manuelle Observable-Creation ist verbose. RxJS bietet Creation-Operators für Common Cases:

import { of, from, interval, fromEvent, timer } from 'rxjs';

// of: emittiert Argumente synchron, dann complete
const numbers$ = of(1, 2, 3);

// from: konvertiert Array, Promise, Iterable zu Observable
const array$ = from([1, 2, 3]);
const promise$ = from(fetch('/api/data'));

// interval: emittiert incrementing numbers in Intervallen
const ticker$ = interval(1000); // 0, 1, 2, 3... jede Sekunde

// timer: emittiert einmalig nach Delay
const delayed$ = timer(3000); // emittiert 0 nach 3 Sekunden

// fromEvent: konvertiert DOM-Events zu Observable
const clicks$ = fromEvent(document, 'click');

Die $-Suffix-Konvention signalisiert Observables. Dies ist nicht technisch nötig aber weit verbreitet und verbessert Lesbarkeit.

11.4 Pipeable Operators

Operators transformieren Observables zu neuen Observables. Sie sind pure Functions – sie modifizieren nicht das Source-Observable, sondern returnen ein neues. Dies ermöglicht Chaining via die pipe-Methode.

11.4.1 Transformation Operators

map transformiert jeden emittierten Wert:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3).pipe(
  map(x => x * 10)
).subscribe(x => console.log(x));
// Output: 10, 20, 30

pluck extrahiert Properties aus Objects:

import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';

of(
  { name: 'Alice', age: 30 },
  { name: 'Bob', age: 25 }
).pipe(
  pluck('name')
).subscribe(name => console.log(name));
// Output: Alice, Bob

scan ist wie reduce für Arrays, aber emittiert intermediate Werte:

import { of } from 'rxjs';
import { scan } from 'rxjs/operators';

of(1, 2, 3, 4).pipe(
  scan((acc, val) => acc + val, 0)
).subscribe(sum => console.log(sum));
// Output: 1, 3, 6, 10

11.4.2 Filtering Operators

filter lässt nur Werte durch, die ein Prädikat erfüllen:

import { interval } from 'rxjs';
import { filter, take } from 'rxjs/operators';

interval(1000).pipe(
  filter(x => x % 2 === 0),
  take(5)
).subscribe(x => console.log(x));
// Output: 0, 2, 4, 6, 8

distinctUntilChanged filtert aufeinanderfolgende Duplikate:

import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

of(1, 1, 2, 2, 3, 1).pipe(
  distinctUntilChanged()
).subscribe(x => console.log(x));
// Output: 1, 2, 3, 1

debounceTime emittiert nur wenn eine Pause zwischen Emissions war:

import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

fromEvent(inputElement, 'input').pipe(
  debounceTime(300),
  map(event => event.target.value)
).subscribe(value => console.log('Search:', value));

Dies ist essentiell für Search-as-you-Type. Ohne Debouncing würde jeder Keystroke einen Request triggern. Mit Debouncing wartet das Observable bis der User pausiert.

11.4.3 Combination Operators

combineLatest kombiniert die neuesten Werte mehrerer Observables:

import { combineLatest, interval } from 'rxjs';
import { map } from 'rxjs/operators';

const first$ = interval(1000);
const second$ = interval(1500);

combineLatest([first$, second$]).pipe(
  map(([a, b]) => `First: ${a}, Second: ${b}`)
).subscribe(console.log);

combineLatest emittiert erst wenn alle Source-Observables mindestens einmal emittiert haben. Dann emittiert es bei jeder Emission einer Source, mit den neuesten Werten aller Sources.

merge interleaved alle Sources:

import { merge, interval } from 'rxjs';
import { map } from 'rxjs/operators';

const first$ = interval(1000).pipe(map(x => `First: ${x}`));
const second$ = interval(1500).pipe(map(x => `Second: ${x}`));

merge(first$, second$).subscribe(console.log);
// Output: First: 0, Second: 0, First: 1, Second: 1, First: 2...

forkJoin wartet bis alle Sources completen, dann emittiert die letzten Werte:

import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';

const first$ = of('First').pipe(delay(1000));
const second$ = of('Second').pipe(delay(1500));

forkJoin([first$, second$]).subscribe(([a, b]) => {
  console.log(a, b); // Output nach 1500ms: First Second
});

forkJoin ist ähnlich zu Promise.all – es parallelisiert asynchrone Operations und aggregiert Results.

11.5 Flattening Operators: Die kritischen Higher-Order Operators

Viele Scenarios produzieren “Observable of Observables”. Ein Search-Input emittiert Strings. Jeder String triggert einen HTTP-Request, der ein Observable ist. Das Result ist ein Observable das Observables emittiert – ein Higher-Order Observable.

Flattening Operators subscriben automatisch inner Observables und flattern das Result zu einem single-level Observable.

11.5.1 switchMap: Cancel Previous

switchMap cancelt die vorherige inner Observable bei jeder neuen Emission:

import { fromEvent } from 'rxjs';
import { switchMap, debounceTime, map } from 'rxjs/operators';

fromEvent(searchInput, 'input').pipe(
  debounceTime(300),
  map(event => event.target.value),
  switchMap(query => httpClient.get(`/api/search?q=${query}`))
).subscribe(results => displayResults(results));

Bei jeder Eingabe wird der vorherige Request gecancelt. Dies verhindert Race Conditions wo ein langsamer Request Results für einen veralteten Query liefert.

11.5.2 mergeMap: Parallel Execution

mergeMap (alias flatMap) subscribed alle inner Observables parallel:

import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';

of(1, 2, 3).pipe(
  mergeMap(x => of(x * 10).pipe(delay(x * 1000)))
).subscribe(console.log);
// Output: 10 (nach 1s), 20 (nach 2s), 30 (nach 3s)

Alle drei inner Observables laufen parallel. Results emittieren in completion order, nicht source order.

11.5.3 concatMap: Sequential Execution

concatMap subscribed inner Observables sequentiell:

import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

of(1, 2, 3).pipe(
  concatMap(x => of(x * 10).pipe(delay(1000)))
).subscribe(console.log);
// Output: 10 (nach 1s), 20 (nach 2s), 30 (nach 3s)

Jedes inner Observable muss completen bevor das nächste startet. Results sind in source order garantiert.

11.5.4 exhaustMap: Ignore While Busy

exhaustMap ignoriert neue Emissions während ein inner Observable läuft:

import { fromEvent, of } from 'rxjs';
import { exhaustMap, delay } from 'rxjs/operators';

fromEvent(button, 'click').pipe(
  exhaustMap(() => saveData().pipe(delay(2000)))
).subscribe(result => console.log('Saved:', result));

Clicks während eines laufenden Save-Operations werden ignoriert. Dies verhindert duplicate Submissions.

Die Wahl zwischen diesen Operators ist kritisch:

Operator Verwendung Beispiel
switchMap Latest value matters Search, Auto-complete
mergeMap All values matter Logging, Analytics
concatMap Order matters Sequential Operations
exhaustMap Ignore during execution Form Submission, Button Clicks

11.6 Subjects: Multicast Observables

Normale Observables sind unicast – jede Subscription erhält eine separate Execution. Subjects sind multicast – alle Subscriptions teilen sich eine Execution.

Ein Subject ist gleichzeitig Observable und Observer. Es kann subscribed werden wie ein Observable und Werte emittieren wie ein Observer.

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(x => console.log('Observer A:', x));
subject.subscribe(x => console.log('Observer B:', x));

subject.next(1);
subject.next(2);

// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2

11.6.1 BehaviorSubject: Current Value

BehaviorSubject speichert den aktuellen Wert. Neue Subscribers erhalten sofort den letzten emittierten Wert:

import { BehaviorSubject } from 'rxjs';

const state$ = new BehaviorSubject({ count: 0 });

state$.subscribe(state => console.log('A:', state.count));

state$.next({ count: 1 });

state$.subscribe(state => console.log('B:', state.count));

// Output:
// A: 0
// A: 1
// B: 1

Dies ist ideal für State Management. Die aktuelle State ist immer verfügbar via getValue():

const currentState = state$.getValue();

11.6.2 ReplaySubject: Replay History

ReplaySubject speichert eine History von Emissions. Neue Subscribers erhalten die letzten N Werte:

import { ReplaySubject } from 'rxjs';

const replay$ = new ReplaySubject(3); // Speichere 3 Werte

replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.next(4);

replay$.subscribe(x => console.log('Subscriber:', x));

// Output:
// Subscriber: 2
// Subscriber: 3
// Subscriber: 4

Dies ist nützlich für Caching oder Event-History.

11.7 Error Handling

Observables können Errors emittieren via error(). Dies terminiert das Observable. Error-Handling ist essentiell für Production-Code.

11.7.1 catchError: Graceful Degradation

catchError fängt Errors und returnt ein Fallback-Observable:

import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

throwError(() => new Error('Something went wrong')).pipe(
  catchError(err => {
    console.error('Error caught:', err);
    return of({ error: true, data: [] });
  })
).subscribe({
  next: data => console.log('Data:', data),
  error: err => console.error('Unhandled:', err) // Wird nie erreicht
});

Das Observable recovered von dem Error und emittiert Fallback-Data. Der Error propagiert nicht zum Subscriber.

11.7.2 retry: Automatic Retry

retry resubscribet automatisch bei Errors:

import { ajax } from 'rxjs/ajax';
import { retry, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

ajax.getJSON('/api/data').pipe(
  retry(3), // Versuche bis zu 3 mal
  catchError(err => {
    console.error('Failed after 3 retries:', err);
    return of({ error: true });
  })
).subscribe(data => console.log(data));

retry ist nützlich für transient Network-Errors. Für exponential Backoff gibt es retryWhen.

11.8 Cold vs. Hot Observables

Diese Unterscheidung ist fundamental für Performance und Verhalten.

Cold Observables starten Production bei jedem Subscribe. Jeder Subscriber erhält eine unabhängige Execution:

import { Observable } from 'rxjs';

const cold$ = new Observable(subscriber => {
  console.log('Execution started');
  subscriber.next(Math.random());
  subscriber.complete();
});

cold$.subscribe(x => console.log('A:', x));
cold$.subscribe(x => console.log('B:', x));

// Output:
// Execution started
// A: 0.123...
// Execution started
// B: 0.456...

HTTP-Requests via Angular’s HttpClient sind cold. Jeder Subscribe triggert einen neuen Request.

Hot Observables produzieren unabhängig von Subscriptions. Subscribers teilen sich die Execution:

import { fromEvent } from 'rxjs';

const clicks$ = fromEvent(document, 'click');

clicks$.subscribe(event => console.log('A:', event));
clicks$.subscribe(event => console.log('B:', event));

// Ein Click triggert beide Subscriptions

DOM-Events, WebSockets sind hot. Die Events passieren unabhängig von Subscriptions.

Ein cold Observable kann hot gemacht werden via Multicasting:

import { interval } from 'rxjs';
import { share } from 'rxjs/operators';

const cold$ = interval(1000);
const hot$ = cold$.pipe(share());

hot$.subscribe(x => console.log('A:', x));

setTimeout(() => {
  hot$.subscribe(x => console.log('B:', x));
}, 2500);

// A: 0, A: 1, A: 2, B: 2, A: 3, B: 3...

share macht das Observable hot. Der zweite Subscriber sieht nur Werte ab Subscription-Zeitpunkt.

11.9 Unsubscription und Memory Leaks

Observables die nicht completen müssen unsubscribed werden. Sonst resultiert ein Memory Leak.

import { interval } from 'rxjs';

const subscription = interval(1000).subscribe(x => console.log(x));

// Später cleanup
subscription.unsubscribe();

In Angular-Komponenten ist dies kritisch:

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-data',
  template: `<div>{{ data }}</div>`
})
export class DataComponent implements OnInit, OnDestroy {
  data: any;
  private subscription = new Subscription();
  
  ngOnInit() {
    this.subscription.add(
      dataService.getData().subscribe(data => this.data = data)
    );
    
    this.subscription.add(
      timer(1000).subscribe(() => this.refresh())
    );
  }
  
  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

Die Subscription aggregiert multiple Sub-Subscriptions. Ein einzelnes unsubscribe() cleaned alle.

Angular’s AsyncPipe handlet Unsubscription automatisch:

@Component({
  template: `<div>{{ data$ | async }}</div>`
})
export class DataComponent {
  data$ = dataService.getData();
}

Kein ngOnDestroy nötig. Die Pipe subscribet bei Component-Init und unsubscribet bei Component-Destroy.

11.10 Practical Patterns in Angular

11.10.1 Smart Search Implementation

Eine production-ready Search-Komponente:

import { Component } from '@angular/core';
import { FormControl } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap, catchError, startWith } from 'rxjs/operators';
import { of } from 'rxjs';

@Component({
  selector: 'app-search',
  template: `
    <input [formControl]="searchControl" placeholder="Search...">
    
    @if (loading$ | async) {
      <div class="spinner">Loading...</div>
    }
    
    @if (results$ | async; as results) {
      <div class="results">
        @for (result of results; track result.id) {
          <div class="result-item">{{ result.name }}</div>
        }
      </div>
    }
    
    @if (error$ | async; as error) {
      <div class="error">{{ error }}</div>
    }
  `
})
export class SearchComponent {
  searchControl = new FormControl('');
  
  loading$ = new BehaviorSubject(false);
  error$ = new BehaviorSubject<string | null>(null);
  
  results$ = this.searchControl.valueChanges.pipe(
    startWith(''),
    debounceTime(300),
    distinctUntilChanged(),
    tap(() => {
      this.loading$.next(true);
      this.error$.next(null);
    }),
    switchMap(query => 
      this.searchService.search(query).pipe(
        catchError(err => {
          this.error$.next('Search failed');
          return of([]);
        })
      )
    ),
    tap(() => this.loading$.next(false))
  );
}

Dieses Pattern handled Loading-State, Error-State und Debouncing. Der switchMap cancelt vorherige Requests automatisch.

11.10.2 State Management mit BehaviorSubject

Ein Service für Application-State:

import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';

interface AppState {
  user: User | null;
  cart: CartItem[];
  notifications: Notification[];
}

@Injectable({ providedIn: 'root' })
export class StateService {
  private state$ = new BehaviorSubject<AppState>({
    user: null,
    cart: [],
    notifications: []
  });
  
  // Selectors
  readonly user$ = this.state$.pipe(map(state => state.user));
  readonly cart$ = this.state$.pipe(map(state => state.cart));
  readonly cartItemCount$ = this.cart$.pipe(map(cart => cart.length));
  
  // Actions
  setUser(user: User) {
    this.updateState({ user });
  }
  
  addToCart(item: CartItem) {
    const cart = [...this.state$.value.cart, item];
    this.updateState({ cart });
  }
  
  private updateState(partial: Partial<AppState>) {
    this.state$.next({
      ...this.state$.value,
      ...partial
    });
  }
}

Komponenten subscriben Selectors für reactive Updates. Actions sind imperative Mutations auf immutable State.

11.10.3 Combining Multiple Data Sources

Ein Dashboard aggregiert Daten aus verschiedenen Services:

@Component({
  template: `
    @if (dashboard$ | async; as dashboard) {
      <h1>Welcome {{ dashboard.userName }}</h1>
      <div>Orders: {{ dashboard.orderCount }}</div>
      <div>Cart: {{ dashboard.cartItems }}</div>
      <div>Status: {{ dashboard.onlineStatus }}</div>
    }
  `
})
export class DashboardComponent {
  dashboard$ = combineLatest([
    this.userService.currentUser$,
    this.orderService.orders$,
    this.cartService.cart$,
    timer(0, 30000).pipe(map(() => navigator.onLine))
  ]).pipe(
    map(([user, orders, cart, online]) => ({
      userName: user?.name || 'Guest',
      orderCount: orders.length,
      cartItems: cart.length,
      onlineStatus: online ? 'Online' : 'Offline'
    }))
  );
}

combineLatest synchronisiert alle Sources. Jede Änderung triggert ein Dashboard-Update.

RxJS transformiert Asynchronität von einem Implementation-Detail zu einer First-Class-Abstraktion. Observables kapseln Event-Streams als composable Values. Operators sind deklarative Transformationen dieser Streams. Das Result ist Code der lesbarer, testbarer und wartbarer ist als imperative Callbacks. Die Lernkurve ist steil – das Observer-Pattern, funktionale Operators, Hot vs. Cold Semantics erfordern Umdenken. Aber für komplexe asynchrone Szenarien ist RxJS unverzichtbar, besonders in Angular wo es tief in die Architektur integriert ist.