Skip to content

risksim

Monte Carlo simulation of portfolios of risk models, with reinsurance contracts and layers. You wrap any sampling model (for example a lossmodels collective-risk model) in a PortfolioItem, collect items into a Portfolio, and simulate the aggregate — gross, retained, ceded, and by layer.

Quickstart

import lossmodels as lm
import risksim as rs

# each group is a collective-risk model
group_a = lm.CollectiveRiskModel(lm.Poisson(lam=3.0), lm.Lognormal(8.0, 1.5))
group_b = lm.CollectiveRiskModel(lm.Poisson(lam=5.0), lm.Lognormal(8.2, 1.4))

portfolio = rs.Portfolio([
    rs.PortfolioItem("group_a", group_a),
    rs.PortfolioItem("group_b", group_b),
])

result = portfolio.simulate(50_000)            # SimulationResult
result.mean()                                  # expected aggregate loss
result.prob_exceeding(1_000_000)               # P(aggregate > 1M)
result.summary()                               # gross/retained/ceded summary

Reinsurance layers

import risksim as rs

layer = rs.AggregateLayer(attachment=1_000_000, limit=2_000_000)
result = portfolio.simulate(50_000, contract=layer)
result.ceded_mean()                            # expected ceded into the layer
result.retained_mean()                         # expected retained

API reference

risksim

AggregateLayer dataclass

Aggregate annual layer applied to simulated aggregate losses.

For aggregate annual loss S, ceded loss is:

C = share * min((S - attachment)+, limit)

with no cap if limit is None.

Source code in risksim/contracts.py
@dataclass(frozen=True, slots=True)
class AggregateLayer:
    """
    Aggregate annual layer applied to simulated aggregate losses.

    For aggregate annual loss S, ceded loss is:

        C = share * min((S - attachment)+, limit)

    with no cap if limit is None.
    """

    attachment: float = 0.0
    limit: float | None = None
    share: float = 1.0
    name: str | None = None

    def __post_init__(self) -> None:
        if self.attachment < 0:
            raise ValueError("attachment must be nonnegative")

        if self.limit is not None and self.limit < 0:
            raise ValueError("limit must be nonnegative or None")

        if not (0.0 <= self.share <= 1.0):
            raise ValueError("share must be between 0 and 1")

    def ceded(self, losses: np.ndarray | list[float]) -> np.ndarray:
        gross = as_1d_float_array(losses)

        recoverable = np.maximum(gross - self.attachment, 0.0)

        if self.limit is not None:
            recoverable = np.minimum(recoverable, self.limit)

        return self.share * recoverable

    def retained(self, losses: np.ndarray | list[float]) -> np.ndarray:
        gross = as_1d_float_array(losses)
        return gross - self.ceded(gross)

    def attachment_probability(self, losses: np.ndarray | list[float]) -> float:
        """Probability that a loss reaches the layer, i.e. P(loss > attachment).

        This depends only on the attachment point, not on ``share`` or ``limit``,
        so it stays correct under degenerate parameters (e.g. ``share=0``).
        """
        gross = as_1d_float_array(losses)
        return float(np.mean(gross > self.attachment))

    def exhaustion_probability(self, losses: np.ndarray | list[float]) -> float | None:
        if self.limit is None:
            return None

        gross = as_1d_float_array(losses)
        return float(np.mean(gross >= self.attachment + self.limit))
attachment_probability
attachment_probability(
    losses: ndarray | list[float],
) -> float

Probability that a loss reaches the layer, i.e. P(loss > attachment).

This depends only on the attachment point, not on share or limit, so it stays correct under degenerate parameters (e.g. share=0).

Source code in risksim/contracts.py
def attachment_probability(self, losses: np.ndarray | list[float]) -> float:
    """Probability that a loss reaches the layer, i.e. P(loss > attachment).

    This depends only on the attachment point, not on ``share`` or ``limit``,
    so it stays correct under degenerate parameters (e.g. ``share=0``).
    """
    gross = as_1d_float_array(losses)
    return float(np.mean(gross > self.attachment))

ContractProgram

Collection of aggregate layers applied to the same gross loss.

This first version assumes the layers are intended to work together without overlap. For a standard non-overlapping tower, total ceded loss is the row-wise sum of ceded loss by layer.

Source code in risksim/contracts.py
class ContractProgram:
    """
    Collection of aggregate layers applied to the same gross loss.

    This first version assumes the layers are intended to work together
    without overlap. For a standard non-overlapping tower, total ceded loss
    is the row-wise sum of ceded loss by layer.
    """

    def __init__(
        self,
        layers: Sequence[AggregateLayer],
        name: str = "contract_program",
    ) -> None:
        if not layers:
            raise ValueError("layers must contain at least one AggregateLayer")

        self.layers = tuple(layers)
        self.name = name

    def layer_names(self) -> list[str]:
        names: list[str] = []
        for i, layer in enumerate(self.layers):
            names.append(layer.name or f"layer_{i}")
        return names

    def ceded_by_layer(self, losses: np.ndarray | list[float]) -> np.ndarray:
        gross = as_1d_float_array(losses)
        cols = [layer.ceded(gross) for layer in self.layers]
        return np.column_stack(cols)

    def ceded(self, losses: np.ndarray | list[float]) -> np.ndarray:
        by_layer = self.ceded_by_layer(losses)
        return np.sum(by_layer, axis=1)

    def retained(self, losses: np.ndarray | list[float]) -> np.ndarray:
        gross = as_1d_float_array(losses)
        return gross - self.ceded(gross)

Portfolio

Portfolio of aggregate-loss components.

This first version assumes components are sampled independently.

Source code in risksim/portfolio.py
class Portfolio:
    """
    Portfolio of aggregate-loss components.

    This first version assumes components are sampled independently.
    """

    def __init__(self, items: Sequence[PortfolioItem], name: str = "portfolio") -> None:
        if not items:
            raise ValueError("items must contain at least one PortfolioItem")

        self.items = tuple(items)
        self.name = name

    def component_names(self) -> list[str]:
        return [item.name for item in self.items]

    def sample_components(self, size: int = 1) -> np.ndarray:
        _validate_size(size)

        columns = [item.sample(size=size) for item in self.items]
        return np.column_stack(columns)

    def sample(self, size: int = 1) -> np.ndarray:
        component_losses = self.sample_components(size=size)
        return np.sum(component_losses, axis=1)

    def mean(self) -> float:
        return float(sum(item.mean() for item in self.items))

    def variance(self) -> float:
        """
        Analytic variance under the independence assumption.
        """
        return float(sum(item.variance() for item in self.items))

    def std(self) -> float:
        return float(np.sqrt(self.variance()))

    def simulate(
        self,
        size: int = 100_000,
        contract: AggregateLayer | ContractProgram | None = None,
    ) -> SimulationResult:
        component_losses = self.sample_components(size=size)
        gross_losses = np.sum(component_losses, axis=1)

        if contract is None:
            return SimulationResult(
                gross_losses=gross_losses,
                component_losses=component_losses,
                component_names=self.component_names(),
            )

        ceded_losses, retained_losses = apply_contract(gross_losses, contract)

        if isinstance(contract, ContractProgram):
            return SimulationResult(
                gross_losses=gross_losses,
                ceded_losses=ceded_losses,
                retained_losses=retained_losses,
                component_losses=component_losses,
                component_names=self.component_names(),
                layer_losses=contract.ceded_by_layer(gross_losses),
                layer_names=contract.layer_names(),
                contract_name=contract.name,
            )

        return SimulationResult(
            gross_losses=gross_losses,
            ceded_losses=ceded_losses,
            retained_losses=retained_losses,
            component_losses=component_losses,
            component_names=self.component_names(),
            contract_name=contract.name or contract.__class__.__name__,
        )
variance
variance() -> float

Analytic variance under the independence assumption.

Source code in risksim/portfolio.py
def variance(self) -> float:
    """
    Analytic variance under the independence assumption.
    """
    return float(sum(item.variance() for item in self.items))

SimulationResult dataclass

Container for portfolio simulation outputs.

If retained_losses is present, the primary losses view is retained/net loss. Otherwise, the primary losses view is gross loss.

Source code in risksim/results.py
@dataclass(slots=True)
class SimulationResult:
    """
    Container for portfolio simulation outputs.

    If retained_losses is present, the primary `losses` view is retained/net loss.
    Otherwise, the primary `losses` view is gross loss.
    """

    gross_losses: np.ndarray
    ceded_losses: np.ndarray | None = None
    retained_losses: np.ndarray | None = None
    component_losses: np.ndarray | None = None
    component_names: Sequence[str] | None = None
    layer_losses: np.ndarray | None = None
    layer_names: Sequence[str] | None = None
    contract_name: str | None = None

    def __post_init__(self) -> None:
        self.gross_losses = as_1d_float_array(self.gross_losses)

        if self.ceded_losses is not None:
            self.ceded_losses = as_1d_float_array(self.ceded_losses)
            if self.ceded_losses.shape != self.gross_losses.shape:
                raise ValueError("ceded_losses must match gross_losses shape")

        if self.retained_losses is not None:
            self.retained_losses = as_1d_float_array(self.retained_losses)
            if self.retained_losses.shape != self.gross_losses.shape:
                raise ValueError("retained_losses must match gross_losses shape")

        if self.component_losses is not None:
            self.component_losses = np.asarray(self.component_losses, dtype=float)
            if self.component_losses.ndim != 2:
                raise ValueError("component_losses must be a 2D array")
            if self.component_losses.shape[0] != self.gross_losses.shape[0]:
                raise ValueError("component_losses must have one row per simulation")

            if self.component_names is not None:
                if len(self.component_names) != self.component_losses.shape[1]:
                    raise ValueError(
                        "component_names length must match number of component columns"
                    )

        if self.layer_losses is not None:
            self.layer_losses = np.asarray(self.layer_losses, dtype=float)
            if self.layer_losses.ndim != 2:
                raise ValueError("layer_losses must be a 2D array")
            if self.layer_losses.shape[0] != self.gross_losses.shape[0]:
                raise ValueError("layer_losses must have one row per simulation")

            if self.layer_names is not None:
                if len(self.layer_names) != self.layer_losses.shape[1]:
                    raise ValueError(
                        "layer_names length must match number of layer columns"
                    )

    @property
    def n_sims(self) -> int:
        return int(self.gross_losses.size)

    @property
    def losses(self) -> np.ndarray:
        if self.retained_losses is not None:
            return self.retained_losses
        return self.gross_losses

    def mean(self) -> float:
        return metrics.mean(self.losses)

    def variance(self, ddof: int = 0) -> float:
        return metrics.variance(self.losses, ddof=ddof)

    def std(self, ddof: int = 0) -> float:
        return metrics.std(self.losses, ddof=ddof)

    def var(self, q: float) -> float:
        return metrics.var(self.losses, q)

    def tvar(self, q: float) -> float:
        return metrics.tvar(self.losses, q)

    def prob_exceeding(self, threshold: float) -> float:
        return metrics.prob_exceeding(self.losses, threshold)

    def gross_mean(self) -> float:
        return metrics.mean(self.gross_losses)

    def ceded_mean(self) -> float | None:
        if self.ceded_losses is None:
            return None
        return metrics.mean(self.ceded_losses)

    def retained_mean(self) -> float | None:
        if self.retained_losses is None:
            return None
        return metrics.mean(self.retained_losses)

    def component_means(self) -> dict[str, float]:
        if self.component_losses is None:
            return {}

        means = np.mean(self.component_losses, axis=0)

        if self.component_names is None:
            names = [f"component_{i}" for i in range(self.component_losses.shape[1])]
        else:
            names = list(self.component_names)

        return {name: float(value) for name, value in zip(names, means)}

    def layer_means(self) -> dict[str, float]:
        if self.layer_losses is None:
            return {}

        means = np.mean(self.layer_losses, axis=0)

        if self.layer_names is None:
            names = [f"layer_{i}" for i in range(self.layer_losses.shape[1])]
        else:
            names = list(self.layer_names)

        return {name: float(value) for name, value in zip(names, means)}

    def summary(self, quantiles: tuple[float, ...] = (0.95, 0.99)) -> dict[str, Any]:
        out = metrics.summary(self.losses, quantiles=quantiles)
        out["gross_mean"] = self.gross_mean()

        if self.ceded_losses is not None:
            out["ceded_mean"] = self.ceded_mean()

        if self.retained_losses is not None:
            out["retained_mean"] = self.retained_mean()

        component_means = self.component_means()
        if component_means:
            out["component_means"] = component_means

        layer_means = self.layer_means()
        if layer_means:
            out["layer_means"] = layer_means

        if self.contract_name is not None:
            out["contract_name"] = self.contract_name

        return out

apply_contract

apply_contract(
    losses: ndarray | list[float],
    contract: AggregateLayer | ContractProgram,
) -> tuple[np.ndarray, np.ndarray]

Return (ceded, retained) arrays for a single aggregate layer or a multi-layer contract program.

Source code in risksim/contracts.py
def apply_contract(
    losses: np.ndarray | list[float],
    contract: AggregateLayer | ContractProgram,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Return (ceded, retained) arrays for a single aggregate layer
    or a multi-layer contract program.
    """
    gross = as_1d_float_array(losses)
    ceded = contract.ceded(gross)
    retained = gross - ceded
    return ceded, retained