Let’s build a custom resource!

Disclaimer: Written when Angular was at version 19.0.5

The new experimental Resource API (at point of writing this) is something I really like to play around with. It’s similar to my “Signalize async service calls” example which I use every day (even before there were signals, when we used the AsyncPipe to pull the current state into the template).

But more important than the resource function is the Resource<T> type as the base of all APIs which produces some kind of “resource”. And because the resource and even the rxResource function only accepts “reactive functions” (think “signal getters”) as source of the request they are a bit limited in usage. But they are new and experimental - which means we have to experiment and find out how they can be improved. And we can expect that they improve over time!

These are the two limitations I want to talk about:

In my applications often the frontend connects to the backend via a websocket so it can receive updates whenever another user is mutating some state. So the “pure” resource doesn’t really fit. Or a simpler example can be an HTTP request with progress events. Currently the resource would only receive the first progress event and then cancels the stream - not what we want or expect.

But the Resource<T> type seems to be the right interface. If I create an own resource function it should return this type, because every Angular developer will know how to handle that in a template.

Where we start

It all starts with a request. But not only with a single request, but a stream of requests (of course the stream can be short and can contain only one request). For now the most commonly used way to describe such a stream is an (rxjs) observable. I will use the “stream notation” with the dollar sign to distinguish the stream from the single request object.

const request$: Observable<TRequest>;

And then we have a “loader” which transforms each request into a stream of responses.

type LoaderFn = (request: TRequest) => Observable<TResponse>;

These are the first two properties of our StreamAggregateResourceOptions.

The core of processing such a request stream (especially when “loading” instead of “posting”) is the usual switchMap. Whenever a new request gets emitted the loader function will switch to a new stream of responses.

const response$: Observable<TResponse> = request$.pipe(
  switchMap((request) => loader(request))
);

Don’t worry, we will add error handling etc. later, because I don’t like too simple examples, where I have to add all those bits before I can actually use it in production. And error handling is “built in” in the Resource<T> - so we don’t get around it anyway.

Where we want to end

We want to produce a Resource<T>, which looks like this:

enum ResourceStatus {
  Idle,
  Error,
  Loading,
  Reloading,
  Resolved,
  Local,
}

interface Resource<T> {
  readonly value: Signal<T | undefined>;
  readonly status: Signal<ResourceStatus>;
  readonly error: Signal<unknown>;
  readonly isLoading: Signal<boolean>;
  hasValue(): boolean;
  reload(): boolean;
}

I think it’s rather self-explanatory. Of course everything is a Signal because it can change during the lifetime of the resource. And signals are “the way” to track (and react to) changes.

The value contains the current (or last) value of the resource. It can be undefined, because we start with an “empty” resource before any request got emitted or returned a response. I will get back to this later, when I introduce the initialValue of my “stream aggregate resource” implementation.

The status is the detailed information about the state the resource is currently in. Derived from that there’s the isLoading signal, which should be true when the resource is in Loading or Reloading state.

And the error will contain the last error, whatever that will be. We will use this for our error handling of our observable stream, because you always want to handle errors in your streams otherwise they stop working.

The hasValue() function is a helper that we can use inside the template to only include the part of the view, which displays the resource’s value, when we actually have a value.

And finally the reload() function enables us to provide some kind of reload mechanism, which we might want to trigger in some circumstances. This is the only property of Resource<T> which I’m not really convinced of, that it’s included in this type. We’re allowed to ignore it - if a reload can’t be triggered, this function just can return false and do nothing. But of course, we will get to it later, too - because implementing a reload behavior in an observable stream is not that difficult…

Connecting the dots

The ResourceStatus enum is our map to follow from start to end. For each kind of state we’ll think about how the stream can get into that state.

In the end we will hopefully get a stream of objects with a status, value and optional error. Because every other field in the Resource<T> can be derived from these three.

And to bridge the gap between the Observable to the Signal world, we will just subscribe to the observable and push every new value into a writeable signal, which we’ll expose in the shape of an Resource<T>.

Idle

This is pretty easy. It’s the state the resource starts in. So we have to think about its initial state regarding the other fields.

{
  status: ResourceStatus.Idle,
  value: undefined,
  error: undefined,
}

There’s one thing I don’t really like. If this is our initial state, then we have to deal with undefined values in the template. You can always do that with something like

@if (resource.value(); as val) {
  <!-- use val inside this block -->
} @else {
  <!-- optionally show some "idle" content -->
}

But most of the time, I will write something like this in the component to avoid undefined in the template:

protected readonly value = computed(() => {
  const value = this.resource.value();
  return value ?? {
    // some initial value like an empty array when displaying a list of item etc.
  };
});

And rumors say the Angular team is considering some kind of “initial” or “default” value.

Since the purpose of this custom resource is to aggregate values over time like rxjs’ scan or Array.prototype.reduce do we will need some initial value anyway - so let’s add that to our StreamAggregateResourceOptions.

We will also distinguish the actual shape of the resource TResource from the kind of values TResponse our loader returns. With that we get some flexibility inside our aggregate function.

And we will use the “undefined” request as a trigger to re-enter the Idle state, so we add that to the type definition.

type StreamAggregateResourceOptions<TResource, TRequest, TResponse = TResource> = {
  readonly initialValue: TResource;
  readonly request: Observable<TRequest | undefined>;
  readonly loader: (request: NoInfer<TRequest>) => Observable<TResponse>;
  readonly aggregate: (accumulator: TResource, current: TResponse) => TResource;
};

With this we can build the startpoint of our stream.

// the signal we will push our stream values into
const source = signal({
  status: ResourceStatus.Idle,
  value: options.initialValue,
  error: undefined as unknown,
});

// derived signals from which we will build our Resource<TResource>
const status = computed(() => source().status);
const value = computed(() => source().value);
const error = computed(() => source().error);
const isLoading = computed(() => status() === ResourceStatus.Loading || status() === ResourceStatus.Reloading);
// our resource will always have a value!
// type signature is needed, because "() => boolean" is incompatible with Resource.hasValue
const hasValue = (): this is Resource<TResource> & { value: Signal<TResource>; } => true;
// no reload functionality for now
const reload = () => false;

options.request.pipe(
  switchMap((request) => {
    if (request === undefined) {
      return of({
        status: ResourceStatus.Idle,
        // preserve the last "value" of the resource
        value: undefined,
        // clear any old error
        error: undefined,
      });
    }

    // TODO transform request into response
    return ...
  }),
)
.subscribe({ // TODO Think about unsubscribe!
  next: nextState => source.update(previous => {
    // if we don't have a new value leave the old one in the resource
    // otherwise aggregate the current response into the next value
    const nextValue = nextState.value === undefined
      ? previous.value
      : options.aggregate(previous.value, nextState.value)

    return {
      status: nextState.status,
      value: nextValue,
      error: nextState.error,
    };
  }),
});

Resovled

Inside the switchMap in the code above there’s the “else” branch missing. That’s the place where we will transform the incoming requests into responses utilizing the loader function.

options.request.pipe(
  switchMap((request) => {
    if (request === undefined) {
      return ...
    }

    return options.loader(request).pipe(
      map(response => ({
        status: ResourceStatus.Resolved,
        value: response,
        error: undefined,
      })));
  }),
)
.subscribe(...);

The response(s) returned by the loader are combined with the Resolved status and since this is a “success”, any old error is cleared with setting it to undefined.

Loading

Whenever we start a new response stream on a new request, we want to represent that with the Loading status. As rxjs veterans we know the right operator for that: startWith

For now we ignore Reloading, we’ll get to that later…

options.request.pipe(
  switchMap((request) => {
    if (request === undefined) {
      return ...
    }

    return loader(request).pipe(
      map(response => ({
        ...
      })),
      startWith({
        status: ResourceStatus.Loading,
        value: undefined,
        error: undefined,
      }));
  }),
)
.subscribe(...);

Error

The two most important things to think about when working with observable streams are:

  • What to do when an error happens (and where can an error happen)?
  • On which condition should the stream complete (think “unsubscribe”)?

Let’s focus on error handling.

Obviously the loader is a place where errors can happen. But it’s not that obvious, that there can be two different kind of errors.

The first is the “easy” one: the stream can throw because of a network error or whatever. And we know, what to do: catchError

And luckily the shape of our stream makes it easy to integrate that. Because the hardest thing to think about when working with catchError is to decide, what it should return. In this case we just use the Error status.

options.request.pipe(
  switchMap((request) => {
    if (request === undefined) {
      return ...
    }

    return loader(request).pipe(
      map(response => ({
        ...
      })),
      startWith({
        ...
      }),
      catchError((error) => of({
        status: ResourceStatus.Error,
        value: undefined,
        error,
      })));
  }),
)
.subscribe(...);

We don’t think about decoding the error into something “displayable” (but we can if we want). In my opinion that’s the task of something different, like a custom ErrorPipe.

But why is the catchError placed on the “inner” observable instead of after the switchMap on the “outer” observable? For this we have to take into account what kind of contract an observable is bound to:

  • It may emit zero, one or multiple values to the next function of the observer.
  • It may complete (calling the complete function of the observer).
  • It may error (calling the error function of the observer).
  • After a stream completes or errors it will never emit anything again.

And this last piece of the contract is why we put the catchError on the inner observable. We want to catch any error where we can recover from. When the inner observable errors it won’t emit anything againg. But when a new request comes in (maybe because of a reload, what we’ll implement later) a new inner stream is started. And since our outer stream doesn’t see the error, it will be always in a valid state.

And this is the hint we need to detect the second kind of error we have to handle. The inner observable can emit an error, but the loader function itself can throw an error, before it is returning the next inner stream. So we have to catch that with the classic try/catch.

options.request.pipe(
  switchMap((request) => {
    if (request === undefined) {
      return ...
    }

    try {
      return loader(request).pipe(
        map(response => ({
          ...
        })),
        startWith({
          ...
        }),
        catchError((error) => of({
          status: ResourceStatus.Error,
          value: undefined,
          error,
        })));
    } catch (error) {
      return of({
        status: ResourceStatus.Error,
        value: undefined,
        error,
      })
    }
  }),
)
.subscribe(...);

The code gets longer - but there’s one trick: refactor!

Interlude: Refactoring

The inner observable gets somewhat long and there’s a lot repetition there. In such a case I refactor all these values into small “value generator functions”.

First I define a type for the values:

type StreamValue<TResponse> = {
  readonly status: ResourceStatus;
  readonly value: TResponse | undefined;
  readonly error: unknown;
};

And then I create a function for each kind of values - sometimes we have the luck, that some values can just be constants. That can be reused and reduce the memory footprint. But we may have to be creative with naming to avoid double identifiers.

const idleValue: StreamValue<undefined> = {
  status: ResourceStatus.Idle,
  value: undefined,
  error: undefined,
};

const resolvedValue = <TResponse>(
  value: TResponse,
): StreamValue<TResponse> => ({
  status: ResourceStatus.Resolved,
  value,
  error: undefined,
});

const errorValue = (error: unknown): StreamValue<undefined> => ({
  status: ResourceStatus.Error,
  value: undefined,
  error,
});

const loadingValue: StreamValue<undefined> = {
  status: ResourceStatus.Loading,
  value: undefined,
  error: undefined,
};

const reloadingValue: StreamValue<undefined> = {
  status: ResourceStatus.Reloading,
  value: undefined,
  error: undefined,
};

With this in place we can reduce the code to something like this:

options.request.pipe(
  switchMap((request) => {
    if (request === undefined) {
      return of(idleValue);
    }

    try {
      return options.loader(request).pipe(
        map((response) => resolvedValue(response)),
        startWith(loadingValue),
        catchError((error) => of(errorValue(error))),
      );
    } catch (error) {
      return of(errorValue(error));
    }
  }),
)
.subscribe(...);

Don’t forget to unsubscribe

We have one TODO in our code: Think about unsubscribe!

And that’s a really important TODO. Whenever we manually subscribe to an observable we always have to be sure it gets unsubscribed somehow. And since we are in an Angular environment, the solution is pretty clear: takeUntilDestroyed

For this, we need either a DestroyRef or an injection context, so the takeUntilDestroyed is able to get it by itself. So we just add this as an optional parameter to our StreamAggregateResourceOptions.

type StreamAggregateResourceOptions<TResource, TRequest, TResponse = TResource> = {
  ...
  readonly destroyRef?: DestroyRef;
};

With this in place we can avoid the typical memory leak:

options.request.pipe(
  switchMap((request) => {
    ...
  }),
  takeUntilDestroyed(options.destroyRef)
)
.subscribe(...);

Error & Idle (part 2)

Now that the subscribe gets into our focus again, we know that the observer passed to this function also has an error handler. We hope that it will never gets called, because that would mean, that our stream will stop working. But since we don’t want our users not to show any (unhandled) error, we will use that callback to set it into the resource’s state.

.subscribe({
  next: ...,
  error: error => source.update(previous => {
    return {
      ...previous,
      status: ResourceStatus.Error,
      error,
    };
  }),
});

And while we’re here, we can set the resource to Idle when our outer stream completes, because it won’t change after that.

.subscribe({
  next: ...,
  error: ...,
  complete: () => source.update(previous => {
    return {
      ...previous,
      status: ResourceStatus.Idle,
    };
  }),
});

Advanced features

Local

It’s possible to extend this example with a Local functionality. But I haven’t needed it yet, so if you do - extend!

Reloading

This is something what I find useful. And while I implemented this I discovered that the “reload” feature should be extracted into a custom rxjs operator. Because it’s something we may want to use in totally different situations.

I don’t want to get into details about it, here’s the code. It’s a bit tricky to differentiate between a “next request” and “reload is triggered” and there may be better/simpler ways. But this works for me.

import { Observable, combineLatest, map, scan, startWith } from 'rxjs';

export const combineReload = <T>(
  source: Observable<T>,
  reload: Observable<unknown>,
): Observable<{ value: T; isReload: boolean }> => {
  return combineLatest([
    source,
    reload.pipe(
      map((_, index) => index),
      startWith(-1),
    ),
  ]).pipe(
    scan(
      ({ lastReloadIndex }, [value, reloadIndex]) => ({
        value,
        isReload: lastReloadIndex !== reloadIndex,
        lastReloadIndex: reloadIndex,
      }),
      {
        value: undefined as T,
        isReload: false,
        lastReloadIndex: -1,
      },
    ),
    // drop lastReloadIndex
    map(({ value, isReload }) => ({ value, isReload })),
  );
};

It takes the original observable and a trigger observable. The trigger should be truly asynchronous so it doesn’t emit on subscribe. This should be more natural, because I guess the most common implementation of a reload trigger is a button click. And this is a stream which will only emit, when the button is actually clicked.

The reload trigger is transformed into a sequence of unique values. So whenever they differ, we can be sure that the emission is due to a reload. When they are equal the original source has emitted a new value. These two informations are combined and emitted as an object with the shape { value: T; isReload: boolean }.

Now that we know if the “new” request is triggered by a reload or is actually new, we can refine the Loading state to show if it’s a Reloading.

And because we only want to trigger a reload when we are in the error state, we add a little guard.

...
const reload$ = new Subject<void>();
const reload = () => {
  if (untracked(status) === ResourceStatus.Error) {
    reload$.next();
    return true;
  }
  return false;
};

combineReload(options.request, reload$).pipe(
  switchMap(({ value: request, isReload }) => {
    if (request === undefined) {
      return of(idleValue);
    }

    try {
      return options.loader(request).pipe(
        map((response) => resolvedValue(response)),
        startWith(isReload ? reloadingValue : loadingValue),
        catchError((error) => of(errorValue(error))),
      );
    } catch (error) {
      return of(errorValue(error));
    }
  }),
)
.subscribe(...);

Summary

And here’s the full code. It seems long, but it’s doing also a lot. But not so much we can’t reason about it.

  • We start with a stream of requests.
  • We transformed them into a stream of responses.
  • We aggregate all responses into one resource value.
  • When we know that the stream is “loading” we reflect that into the resource’s state.
  • And we catch all errors (we can think of) and also use the resource state to report them to the user.
  • And as a sideeffect we got some nice litte combineReload utility.

Now that we know how to create a custom resource nothing will stop us to create even more.

Have fun!

import {
  DestroyRef,
  Resource,
  ResourceStatus,
  Signal,
  computed,
  signal,
  untracked,
} from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

import {
  Subject,
  Observable,
  catchError,
  map,
  of,
  startWith,
  switchMap,
} from 'rxjs';

import { combineReload } from './combine-reload';

export type StreamAggregateResourceOptions<
  TResource,
  TRequest,
  TResponse = TResource,
> = {
  readonly initialValue: TResource;
  readonly request: Observable<TRequest | undefined>;
  readonly loader: (request: NoInfer<TRequest>) => Observable<TResponse>;
  readonly aggregate: (accumulator: TResource, current: TResponse) => TResource;
  readonly destroyRef?: DestroyRef;
};

type StreamValue<TResponse> = {
  readonly status: ResourceStatus;
  readonly value: TResponse | undefined;
  readonly error: unknown;
};

const idleValue: StreamValue<undefined> = {
  status: ResourceStatus.Idle,
  value: undefined,
  error: undefined,
};

const resolvedValue = <TResponse>(
  value: TResponse,
): StreamValue<TResponse> => ({
  status: ResourceStatus.Resolved,
  value,
  error: undefined,
});

const errorValue = (error: unknown): StreamValue<undefined> => ({
  status: ResourceStatus.Error,
  value: undefined,
  error,
});

const loadingValue: StreamValue<undefined> = {
  status: ResourceStatus.Loading,
  value: undefined,
  error: undefined,
};

const reloadingValue: StreamValue<undefined> = {
  status: ResourceStatus.Reloading,
  value: undefined,
  error: undefined,
};

/**
 * Aggregates all responses from the loader into the value of the resource.
 * A new request triggers a new call of the loader.
 * If not called in an injection context a DestroyRef must be provided.
 * The reload method will only work if the resource is in error state.
 */
export const streamAggregateResource = <
  TResource,
  TRequest,
  TResponse = TResource,
>(
  options: StreamAggregateResourceOptions<TResource, TRequest, TResponse>,
): Resource<TResource> => {
  const source = signal({
    status: ResourceStatus.Idle,
    value: options.initialValue,
    error: undefined as unknown,
  });

  const status = computed(() => source().status);
  const value = computed(() => source().value);
  const error = computed(() => source().error);
  const isLoading = computed(
    () =>
      status() === ResourceStatus.Loading ||
      status() === ResourceStatus.Reloading,
  );
  const hasValue = (): this is Resource<TResource> & {
    value: Signal<TResource>;
  } => true;

  const reload$ = new Subject<void>();
  const reload = () => {
    if (untracked(status) === ResourceStatus.Error) {
      reload$.next();
      return true;
    }
    return false;
  };

  combineReload(options.request, reload$)
    .pipe(
      switchMap(({ value: request, isReload }) => {
        if (request === undefined) {
          return of(idleValue);
        }

        try {
          return options.loader(request).pipe(
            map((response) => resolvedValue(response)),
            startWith(isReload ? reloadingValue : loadingValue),
            catchError((error) => of(errorValue(error))),
          );
        } catch (error) {
          return of(errorValue(error));
        }
      }),
      takeUntilDestroyed(options.destroyRef),
    )
    .subscribe({
      next: (nextState) =>
        source.update((previous) => {
          const nextValue =
            nextState.value === undefined
              ? previous.value
              : options.aggregate(previous.value, nextState.value);

          return {
            status: nextState.status,
            value: nextValue,
            error: nextState.error,
          };
        }),
      error: (error) =>
        source.update((previous) => ({
          ...previous,
          status: ResourceStatus.Error,
          error,
        })),
      complete: () =>
        source.update((previous) => ({
          ...previous,
          status: ResourceStatus.Idle,
        })),
    });

  return {
    status,
    value,
    error,
    isLoading,
    hasValue,
    reload,
  };
};

Bonus: Tests

Here are some tests - we use the ability of observables to be synchronous, so they are easy to write. They are not exhaustive, but a good starting point for your own tests.

import { DestroyRef, ResourceStatus, untracked } from '@angular/core';
import { TestBed } from '@angular/core/testing';

import { NEVER, Observable, Subject, from, of } from 'rxjs';

import { streamAggregateResource } from './stream-aggregate-resource';

describe('streamAggreateResource', () => {
  it('should have the initialValue before the first request', () => {
    const initialValue: readonly string[] = [];

    const resource = streamAggregateResource({
      initialValue,
      request: NEVER,
      loader: (request) => of(`response to ${request}`),
      aggregate: (acc, current) => [...acc, current],
      destroyRef: TestBed.inject(DestroyRef),
    });

    expect(untracked(resource.status)).toEqual(ResourceStatus.Idle);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual(initialValue);
  });

  it('should be in loading state while loader processes request', () => {
    const expectedRequest = 'request';
    const initialValue: readonly string[] = [];

    const resource = streamAggregateResource({
      initialValue,
      request: of(expectedRequest),
      loader: (actualRequest): Observable<string> => {
        expect(actualRequest).toEqual(expectedRequest);
        return NEVER;
      },
      aggregate: (acc, current) => [...acc, current],
      destroyRef: TestBed.inject(DestroyRef),
    });

    expect(untracked(resource.status)).toEqual(ResourceStatus.Loading);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual(initialValue);
  });

  it('should be in resolved state after loader returned response', () => {
    const expectedRequest = 'request';
    const expectedResponse = 'response';
    const initialValue: readonly string[] = [];

    const resource = streamAggregateResource({
      initialValue,
      request: of(expectedRequest),
      loader: (actualRequest): Observable<string> => {
        expect(actualRequest).toEqual(expectedRequest);
        return of(expectedResponse);
      },
      aggregate: (acc, current) => [...acc, current],
      destroyRef: TestBed.inject(DestroyRef),
    });

    expect(untracked(resource.status)).toEqual(ResourceStatus.Resolved);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual([expectedResponse]);
  });

  it('should aggregate all values returned by the loader', () => {
    const expectedRequest = 'request';
    const expectedResponses = ['response 1', 'response 2', 'response 3'];
    const initialValue: readonly string[] = [];

    const resource = streamAggregateResource({
      initialValue,
      request: of(expectedRequest),
      loader: (actualRequest): Observable<string> => {
        expect(actualRequest).toEqual(expectedRequest);
        return from(expectedResponses);
      },
      aggregate: (acc, current) => [...acc, current],
      destroyRef: TestBed.inject(DestroyRef),
    });

    expect(untracked(resource.status)).toEqual(ResourceStatus.Resolved);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual(expectedResponses);
  });

  it('should be in error state when loader throws', () => {
    const expectedRequest = 'request';
    const initialValue: readonly string[] = [];
    const error = 'error';

    const resource = streamAggregateResource({
      initialValue,
      request: of(expectedRequest),
      loader: (actualRequest): Observable<string> => {
        expect(actualRequest).toEqual(expectedRequest);
        throw error;
      },
      aggregate: (acc, current) => [...acc, current],
      destroyRef: TestBed.inject(DestroyRef),
    });

    expect(untracked(resource.status)).toEqual(ResourceStatus.Error);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual(initialValue);
    expect(untracked(resource.error)).toEqual(error);
  });

  it('should be in reloading state when the resource is reloaded', () => {
    const request = new Subject<string>();
    const expectedRequest = 'request';
    const initialValue: readonly string[] = [];
    const error = 'error';

    let loaderState = 0;
    const resource = streamAggregateResource({
      initialValue,
      request,
      loader: (actualRequest): Observable<string> => {
        if (loaderState === 0) {
          loaderState++;
          throw error;
        }

        expect(actualRequest).toEqual(expectedRequest);
        return NEVER;
      },
      aggregate: (acc, current) => [...acc, current],
      destroyRef: TestBed.inject(DestroyRef),
    });

    request.next(expectedRequest);

    expect(untracked(resource.status)).toEqual(ResourceStatus.Error);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual(initialValue);
    expect(untracked(resource.error)).toEqual(error);

    const isReloading = resource.reload();

    expect(isReloading).toBeTruthy();
    expect(untracked(resource.status)).toEqual(ResourceStatus.Reloading);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual(initialValue);
    expect(untracked(resource.error)).toBeUndefined();
  });

  it('should be in loading state after reloading when a new request is emitted', () => {
    const request = new Subject<string>();
    const expectedRequest = 'request';
    const initialValue: readonly string[] = [];
    const error = 'error';

    let loaderState = 0;
    const resource = streamAggregateResource({
      initialValue,
      request,
      loader: (actualRequest): Observable<string> => {
        if (loaderState === 0) {
          loaderState++;
          throw error;
        }

        expect(actualRequest).toEqual(expectedRequest);
        return NEVER;
      },
      aggregate: (acc, current) => [...acc, current],
      destroyRef: TestBed.inject(DestroyRef),
    });

    request.next(expectedRequest);
    resource.reload();
    request.next(expectedRequest);

    expect(untracked(resource.status)).toEqual(ResourceStatus.Loading);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual(initialValue);
    expect(untracked(resource.error)).toBeUndefined();
  });

  it('should be in idle state without error when the request is undefined', () => {
    const request = new Subject<string | null | undefined>();
    const expectedRequest = 'request';
    const expectedResponse = 'response';
    const initialValue: readonly string[] = [];
    const error = 'error';

    const resource = streamAggregateResource({
      initialValue,
      request,
      loader: (actualRequest): Observable<string> => {
        if (actualRequest === null) {
          throw error;
        }

        expect(actualRequest).toEqual(expectedRequest);
        return of(expectedResponse);
      },
      aggregate: (acc, current) => [...acc, current],
      destroyRef: TestBed.inject(DestroyRef),
    });

    request.next(expectedRequest);

    expect(untracked(resource.status)).toEqual(ResourceStatus.Resolved);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual([expectedResponse]);

    request.next(null);

    expect(untracked(resource.status)).toEqual(ResourceStatus.Error);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual([expectedResponse]);
    expect(untracked(resource.error)).toEqual(error);

    request.next(undefined);

    expect(untracked(resource.status)).toEqual(ResourceStatus.Idle);
    expect(untracked(resource.hasValue)).toBeTruthy();
    expect(untracked(resource.value)).toEqual([expectedResponse]);
    expect(untracked(resource.error)).toBeUndefined();
  });
});