risksim
Monte Carlo simulation of portfolios of risk models, with reinsurance contracts and
layers. You wrap any sampling model (for example a lossmodels collective-risk model)
in a PortfolioItem, collect items into a Portfolio, and simulate the aggregate —
gross, retained, ceded, and by layer.
Quickstart
import lossmodels as lm
import risksim as rs
# each group is a collective-risk model
group_a = lm.CollectiveRiskModel(lm.Poisson(lam=3.0), lm.Lognormal(8.0, 1.5))
group_b = lm.CollectiveRiskModel(lm.Poisson(lam=5.0), lm.Lognormal(8.2, 1.4))
portfolio = rs.Portfolio([
rs.PortfolioItem("group_a", group_a),
rs.PortfolioItem("group_b", group_b),
])
result = portfolio.simulate(50_000) # SimulationResult
result.mean() # expected aggregate loss
result.prob_exceeding(1_000_000) # P(aggregate > 1M)
result.summary() # gross/retained/ceded summary
Reinsurance layers
import risksim as rs
layer = rs.AggregateLayer(attachment=1_000_000, limit=2_000_000)
result = portfolio.simulate(50_000, contract=layer)
result.ceded_mean() # expected ceded into the layer
result.retained_mean() # expected retained
API reference
risksim
AggregateLayer
dataclass
Aggregate annual layer applied to simulated aggregate losses.
For aggregate annual loss S, ceded loss is:
C = share * min((S - attachment)+, limit)
with no cap if limit is None.
Source code in risksim/contracts.py
| @dataclass(frozen=True, slots=True)
class AggregateLayer:
"""
Aggregate annual layer applied to simulated aggregate losses.
For aggregate annual loss S, ceded loss is:
C = share * min((S - attachment)+, limit)
with no cap if limit is None.
"""
attachment: float = 0.0
limit: float | None = None
share: float = 1.0
name: str | None = None
def __post_init__(self) -> None:
if self.attachment < 0:
raise ValueError("attachment must be nonnegative")
if self.limit is not None and self.limit < 0:
raise ValueError("limit must be nonnegative or None")
if not (0.0 <= self.share <= 1.0):
raise ValueError("share must be between 0 and 1")
def ceded(self, losses: np.ndarray | list[float]) -> np.ndarray:
gross = as_1d_float_array(losses)
recoverable = np.maximum(gross - self.attachment, 0.0)
if self.limit is not None:
recoverable = np.minimum(recoverable, self.limit)
return self.share * recoverable
def retained(self, losses: np.ndarray | list[float]) -> np.ndarray:
gross = as_1d_float_array(losses)
return gross - self.ceded(gross)
def attachment_probability(self, losses: np.ndarray | list[float]) -> float:
"""Probability that a loss reaches the layer, i.e. P(loss > attachment).
This depends only on the attachment point, not on ``share`` or ``limit``,
so it stays correct under degenerate parameters (e.g. ``share=0``).
"""
gross = as_1d_float_array(losses)
return float(np.mean(gross > self.attachment))
def exhaustion_probability(self, losses: np.ndarray | list[float]) -> float | None:
if self.limit is None:
return None
gross = as_1d_float_array(losses)
return float(np.mean(gross >= self.attachment + self.limit))
|
attachment_probability
attachment_probability(
losses: ndarray | list[float],
) -> float
Probability that a loss reaches the layer, i.e. P(loss > attachment).
This depends only on the attachment point, not on share or limit,
so it stays correct under degenerate parameters (e.g. share=0).
Source code in risksim/contracts.py
| def attachment_probability(self, losses: np.ndarray | list[float]) -> float:
"""Probability that a loss reaches the layer, i.e. P(loss > attachment).
This depends only on the attachment point, not on ``share`` or ``limit``,
so it stays correct under degenerate parameters (e.g. ``share=0``).
"""
gross = as_1d_float_array(losses)
return float(np.mean(gross > self.attachment))
|
ContractProgram
Collection of aggregate layers applied to the same gross loss.
This first version assumes the layers are intended to work together
without overlap. For a standard non-overlapping tower, total ceded loss
is the row-wise sum of ceded loss by layer.
Source code in risksim/contracts.py
| class ContractProgram:
"""
Collection of aggregate layers applied to the same gross loss.
This first version assumes the layers are intended to work together
without overlap. For a standard non-overlapping tower, total ceded loss
is the row-wise sum of ceded loss by layer.
"""
def __init__(
self,
layers: Sequence[AggregateLayer],
name: str = "contract_program",
) -> None:
if not layers:
raise ValueError("layers must contain at least one AggregateLayer")
self.layers = tuple(layers)
self.name = name
def layer_names(self) -> list[str]:
names: list[str] = []
for i, layer in enumerate(self.layers):
names.append(layer.name or f"layer_{i}")
return names
def ceded_by_layer(self, losses: np.ndarray | list[float]) -> np.ndarray:
gross = as_1d_float_array(losses)
cols = [layer.ceded(gross) for layer in self.layers]
return np.column_stack(cols)
def ceded(self, losses: np.ndarray | list[float]) -> np.ndarray:
by_layer = self.ceded_by_layer(losses)
return np.sum(by_layer, axis=1)
def retained(self, losses: np.ndarray | list[float]) -> np.ndarray:
gross = as_1d_float_array(losses)
return gross - self.ceded(gross)
|
Portfolio
Portfolio of aggregate-loss components.
This first version assumes components are sampled independently.
Source code in risksim/portfolio.py
| class Portfolio:
"""
Portfolio of aggregate-loss components.
This first version assumes components are sampled independently.
"""
def __init__(self, items: Sequence[PortfolioItem], name: str = "portfolio") -> None:
if not items:
raise ValueError("items must contain at least one PortfolioItem")
self.items = tuple(items)
self.name = name
def component_names(self) -> list[str]:
return [item.name for item in self.items]
def sample_components(self, size: int = 1) -> np.ndarray:
_validate_size(size)
columns = [item.sample(size=size) for item in self.items]
return np.column_stack(columns)
def sample(self, size: int = 1) -> np.ndarray:
component_losses = self.sample_components(size=size)
return np.sum(component_losses, axis=1)
def mean(self) -> float:
return float(sum(item.mean() for item in self.items))
def variance(self) -> float:
"""
Analytic variance under the independence assumption.
"""
return float(sum(item.variance() for item in self.items))
def std(self) -> float:
return float(np.sqrt(self.variance()))
def simulate(
self,
size: int = 100_000,
contract: AggregateLayer | ContractProgram | None = None,
) -> SimulationResult:
component_losses = self.sample_components(size=size)
gross_losses = np.sum(component_losses, axis=1)
if contract is None:
return SimulationResult(
gross_losses=gross_losses,
component_losses=component_losses,
component_names=self.component_names(),
)
ceded_losses, retained_losses = apply_contract(gross_losses, contract)
if isinstance(contract, ContractProgram):
return SimulationResult(
gross_losses=gross_losses,
ceded_losses=ceded_losses,
retained_losses=retained_losses,
component_losses=component_losses,
component_names=self.component_names(),
layer_losses=contract.ceded_by_layer(gross_losses),
layer_names=contract.layer_names(),
contract_name=contract.name,
)
return SimulationResult(
gross_losses=gross_losses,
ceded_losses=ceded_losses,
retained_losses=retained_losses,
component_losses=component_losses,
component_names=self.component_names(),
contract_name=contract.name or contract.__class__.__name__,
)
|
variance
Analytic variance under the independence assumption.
Source code in risksim/portfolio.py
| def variance(self) -> float:
"""
Analytic variance under the independence assumption.
"""
return float(sum(item.variance() for item in self.items))
|
SimulationResult
dataclass
Container for portfolio simulation outputs.
If retained_losses is present, the primary losses view is retained/net loss.
Otherwise, the primary losses view is gross loss.
Source code in risksim/results.py
| @dataclass(slots=True)
class SimulationResult:
"""
Container for portfolio simulation outputs.
If retained_losses is present, the primary `losses` view is retained/net loss.
Otherwise, the primary `losses` view is gross loss.
"""
gross_losses: np.ndarray
ceded_losses: np.ndarray | None = None
retained_losses: np.ndarray | None = None
component_losses: np.ndarray | None = None
component_names: Sequence[str] | None = None
layer_losses: np.ndarray | None = None
layer_names: Sequence[str] | None = None
contract_name: str | None = None
def __post_init__(self) -> None:
self.gross_losses = as_1d_float_array(self.gross_losses)
if self.ceded_losses is not None:
self.ceded_losses = as_1d_float_array(self.ceded_losses)
if self.ceded_losses.shape != self.gross_losses.shape:
raise ValueError("ceded_losses must match gross_losses shape")
if self.retained_losses is not None:
self.retained_losses = as_1d_float_array(self.retained_losses)
if self.retained_losses.shape != self.gross_losses.shape:
raise ValueError("retained_losses must match gross_losses shape")
if self.component_losses is not None:
self.component_losses = np.asarray(self.component_losses, dtype=float)
if self.component_losses.ndim != 2:
raise ValueError("component_losses must be a 2D array")
if self.component_losses.shape[0] != self.gross_losses.shape[0]:
raise ValueError("component_losses must have one row per simulation")
if self.component_names is not None:
if len(self.component_names) != self.component_losses.shape[1]:
raise ValueError(
"component_names length must match number of component columns"
)
if self.layer_losses is not None:
self.layer_losses = np.asarray(self.layer_losses, dtype=float)
if self.layer_losses.ndim != 2:
raise ValueError("layer_losses must be a 2D array")
if self.layer_losses.shape[0] != self.gross_losses.shape[0]:
raise ValueError("layer_losses must have one row per simulation")
if self.layer_names is not None:
if len(self.layer_names) != self.layer_losses.shape[1]:
raise ValueError(
"layer_names length must match number of layer columns"
)
@property
def n_sims(self) -> int:
return int(self.gross_losses.size)
@property
def losses(self) -> np.ndarray:
if self.retained_losses is not None:
return self.retained_losses
return self.gross_losses
def mean(self) -> float:
return metrics.mean(self.losses)
def variance(self, ddof: int = 0) -> float:
return metrics.variance(self.losses, ddof=ddof)
def std(self, ddof: int = 0) -> float:
return metrics.std(self.losses, ddof=ddof)
def var(self, q: float) -> float:
return metrics.var(self.losses, q)
def tvar(self, q: float) -> float:
return metrics.tvar(self.losses, q)
def prob_exceeding(self, threshold: float) -> float:
return metrics.prob_exceeding(self.losses, threshold)
def gross_mean(self) -> float:
return metrics.mean(self.gross_losses)
def ceded_mean(self) -> float | None:
if self.ceded_losses is None:
return None
return metrics.mean(self.ceded_losses)
def retained_mean(self) -> float | None:
if self.retained_losses is None:
return None
return metrics.mean(self.retained_losses)
def component_means(self) -> dict[str, float]:
if self.component_losses is None:
return {}
means = np.mean(self.component_losses, axis=0)
if self.component_names is None:
names = [f"component_{i}" for i in range(self.component_losses.shape[1])]
else:
names = list(self.component_names)
return {name: float(value) for name, value in zip(names, means)}
def layer_means(self) -> dict[str, float]:
if self.layer_losses is None:
return {}
means = np.mean(self.layer_losses, axis=0)
if self.layer_names is None:
names = [f"layer_{i}" for i in range(self.layer_losses.shape[1])]
else:
names = list(self.layer_names)
return {name: float(value) for name, value in zip(names, means)}
def summary(self, quantiles: tuple[float, ...] = (0.95, 0.99)) -> dict[str, Any]:
out = metrics.summary(self.losses, quantiles=quantiles)
out["gross_mean"] = self.gross_mean()
if self.ceded_losses is not None:
out["ceded_mean"] = self.ceded_mean()
if self.retained_losses is not None:
out["retained_mean"] = self.retained_mean()
component_means = self.component_means()
if component_means:
out["component_means"] = component_means
layer_means = self.layer_means()
if layer_means:
out["layer_means"] = layer_means
if self.contract_name is not None:
out["contract_name"] = self.contract_name
return out
|
apply_contract
apply_contract(
losses: ndarray | list[float],
contract: AggregateLayer | ContractProgram,
) -> tuple[np.ndarray, np.ndarray]
Return (ceded, retained) arrays for a single aggregate layer
or a multi-layer contract program.
Source code in risksim/contracts.py
| def apply_contract(
losses: np.ndarray | list[float],
contract: AggregateLayer | ContractProgram,
) -> tuple[np.ndarray, np.ndarray]:
"""
Return (ceded, retained) arrays for a single aggregate layer
or a multi-layer contract program.
"""
gross = as_1d_float_array(losses)
ceded = contract.ceded(gross)
retained = gross - ceded
return ceded, retained
|