| import numpy as np |
| from scipy.special import betaln, logsumexp |
| from sklearn.cluster import KMeans |
|
|
| class BetaMixtureModel: |
| """ |
| Beta Mixture Model (Multivariate version). |
| Each dimension is modeled independently by a Beta distribution. |
| """ |
|
|
| def __init__(self, n_mixtures=3, random_seed=1): |
| self.n_mixtures = n_mixtures |
| self.random_seed = random_seed |
| self.convergence = False |
|
|
| def _init_clusters(self, data_matrix, init_round): |
| """ |
| Initialize the mixture responsibilities (assignments) via k-means or uniformly random |
| """ |
| if self.method == "kmeans": |
| km = KMeans( |
| n_clusters=self.n_mixtures, |
| n_init=1, |
| random_state=self.random_seed + init_round |
| ).fit(data_matrix) |
| resp_matrix = np.zeros((self.n_observations, self.n_mixtures)) |
| resp_matrix[np.arange(self.n_observations), km.labels_] = 1 |
| else: |
| np.random.seed(self.random_seed + init_round) |
| resp_matrix = np.random.rand(self.n_observations, self.n_mixtures) |
| resp_matrix /= resp_matrix.sum(axis=1, keepdims=True) |
| |
| |
| resp_matrix += 10 * np.finfo(resp_matrix.dtype).eps |
|
|
| |
| self.beta_params_ = np.zeros((self.n_mixtures, self.n_components * 2)) |
| self._M_step(data_matrix, np.log(resp_matrix)) |
|
|
|
|
| def _calc_log_weights(self): |
| """ |
| Return log of current mixture weights. |
| """ |
| return np.log(self.mix_weights_) |
|
|
| def _calc_mixture_log_probs(self, data_matrix, mixture_idx): |
| """ |
| Compute log-prob for a single mixture (used if parallelized). |
| """ |
| alpha_vec = self.beta_params_[mixture_idx, :self.n_components] |
| beta_vec = self.beta_params_[mixture_idx, self.n_components:] |
| beta_func_log = betaln(alpha_vec, beta_vec) |
| return ( |
| (alpha_vec - 1) * np.log(data_matrix) |
| + (beta_vec - 1) * np.log(1 - data_matrix) |
| - beta_func_log |
| ).sum(axis=1) |
|
|
| def _calc_log_probs_all_mixtures(self, data_matrix): |
| """ |
| Return log-prob for each observation under each mixture (unnormalized). |
| """ |
| log_prob = np.empty((self.n_observations, self.n_mixtures)) |
| for mix in range(self.n_mixtures): |
| alpha_vec = self.beta_params_[mix, :self.n_components] |
| beta_vec = self.beta_params_[mix, self.n_components:] |
| bfn = betaln(alpha_vec, beta_vec) |
| log_prob[:, mix] = ( |
| (alpha_vec - 1) * np.log(data_matrix) |
| + (beta_vec - 1) * np.log(1 - data_matrix) |
| - bfn |
| ).sum(axis=1) |
| return log_prob |
|
|
| def _calc_weighted_log_probs(self, data_matrix): |
| """ |
| Return the sum of log-probabilities and log-weights. |
| """ |
| return self._calc_log_probs_all_mixtures(data_matrix) + self._calc_log_weights() |
|
|
| def _calc_log_resp_and_norm(self, data_matrix): |
| """ |
| Return (log_prob_norm, log_resp) for the E-step. |
| """ |
| weighted_lp = self._calc_weighted_log_probs(data_matrix) |
| lp_norm = logsumexp(weighted_lp, axis=1) |
| with np.errstate(under="ignore"): |
| log_resp = weighted_lp - lp_norm[:, None] |
| return lp_norm, log_resp |
|
|
| def _E_step(self, data_matrix): |
| """ |
| E-step: compute average log_prob_norm and log_resp. |
| """ |
| lp_norm, log_resp = self._calc_log_resp_and_norm(data_matrix) |
| return np.mean(lp_norm), log_resp |
|
|
| def _compute_responsibilities(self, log_resp): |
| """ |
| Exponentiate log_resp and sum across observations. |
| """ |
| resp_matrix = np.exp(log_resp) |
| cluster_counts = resp_matrix.sum(axis=0) + 10 * np.finfo(resp_matrix.dtype).eps |
| return resp_matrix, cluster_counts |
|
|
| def _update_mixture_weights(self, cluster_counts): |
| """ |
| Update mixture weights from mixture counts. |
| """ |
| self.mix_weights_ = cluster_counts / cluster_counts.sum() |
|
|
| def _M_step(self, data_matrix, log_resp): |
| """ |
| M-step: update weights and Beta distribution parameters via moment matching. |
| """ |
| resp_matrix, cluster_counts = self._compute_responsibilities(log_resp) |
| self._update_mixture_weights(cluster_counts) |
|
|
| w_sums = resp_matrix.T @ data_matrix |
| w_sums_sq = resp_matrix.T @ (data_matrix ** 2) |
|
|
| for m_idx in range(self.n_mixtures): |
| sum_vals = w_sums[m_idx] |
| sum_sq_vals = w_sums_sq[m_idx] |
| mean_val = sum_vals / cluster_counts[m_idx] |
| var_val = sum_sq_vals / cluster_counts[m_idx] - mean_val ** 2 |
|
|
| |
| variance_cap = mean_val * (1 - mean_val) / 4 |
| var_val = np.minimum(var_val, variance_cap) |
| var_val += 10 * np.finfo(var_val.dtype).eps |
|
|
| |
| scaling_factor = (mean_val * (1 - mean_val)) / (var_val + 1e-10) - 1 |
| self.beta_params_[m_idx, :self.n_components] = scaling_factor * mean_val |
| self.beta_params_[m_idx, self.n_components:] = scaling_factor * (1 - mean_val) |
|
|
| def fit(self, data_matrix, num_init=3, method="kmeans", max_iter=1000, tol=1e-4): |
| """ |
| Fit BetaMixtureModel to the data using EM, possibly with multiple initializations. |
| """ |
| self.n_observations, self.n_components = data_matrix.shape |
| self.convergence = False |
| self.method = method |
| best_lower_bound = -np.inf |
| optimal_params = None |
|
|
| for init_round in range(num_init): |
| |
| self._init_clusters(data_matrix, init_round) |
| ll_bound = -np.inf |
|
|
| for _ in range(max_iter): |
| prev_bound = ll_bound |
| lp_norm, log_resp = self._E_step(data_matrix) |
| self._M_step(data_matrix, log_resp) |
| ll_bound = lp_norm |
| delta_bound = ll_bound - prev_bound |
|
|
| if abs(delta_bound) < tol: |
| self.convergence = True |
| break |
|
|
| if ll_bound > best_lower_bound: |
| best_lower_bound = ll_bound |
| |
| _, cluster_counts = self._compute_responsibilities(log_resp) |
| self._update_mixture_weights(cluster_counts) |
| optimal_params = (self.mix_weights_.copy(), self.beta_params_.copy()) |
|
|
| self.mix_weights_, self.beta_params_ = optimal_params |
| self.max_lower_bound = best_lower_bound |
| return self |
|
|
| def predict_proba(self, data_matrix): |
| """ |
| Return the per-mixture membership probabilities for each sample. |
| """ |
| _, log_resp = self._calc_log_resp_and_norm(data_matrix) |
| return np.exp(log_resp) |
|
|
| def predict(self, data_matrix): |
| """ |
| Return the most probable mixture index for each sample. |
| """ |
| return np.argmax(self.predict_proba(data_matrix), axis=1) |