import logging
import os
import struct
import warnings
from pathlib import Path
from typing import Literal, Optional, Union

import numpy as np
import torch
import lightgbm as lgb
import torchaudio
from huggingface_hub import hf_hub_download
from joblib import dump, load
from sklearn.exceptions import NotFittedError
from torch import Tensor
from torchaudio.transforms import Spectrogram
import torch.nn.functional as F
from datasets.formatting import query_table
from datasets import Dataset
|
|
# Suppress every Python warning for the whole process as soon as this module
# is imported (this also affects code outside this module).
warnings.filterwarnings("ignore")


# Reference sampling rate in Hz, read by FastModel when building the
# spectrogram frequency axis and when resampling audio.
SR = 12000
|
|
|
|
class FastModel:
    """
    Train and apply a LightGBM classifier on spectral and cepstral audio features.

    Workflow:
        1. Batch loading and decoding:
            Read WAV byte payloads directly from the dataset table and decode
            them into waveforms.

        2. Audio processing:
            - Resampling, padding, or truncating:
                Bring every waveform to the configured duration and sampling rate.
            - Spectral and cepstral feature extraction:
                - Compute the spectrogram of each waveform and take its log10.
                - Keep the frequency band between ``f_min`` and ``f_max``
                  (e.g. ~50-1500 Hz) and run a second spectrogram transform
                  across that band to obtain the cepstrum.
                - Average both the log-spectrogram and the cepstrum over the
                  time axis and concatenate them into one feature vector.

        3. Model application:
            Feed the feature vectors to LightGBM for training or prediction.

    Attributes
    ----------
    audio_processing_params : dict
        Audio settings. Keys read by this class: ``"sample_rate"``,
        ``"duration"`` and ``"padding_method"``.
    feature_params : dict
        Feature settings. Keys read by this class: ``"n_fft"``,
        ``"hop_length"``, ``"pad"``, ``"power"``, ``"pad_mode"``, ``"f_min"``,
        ``"f_max"``, ``"fc_min"`` and ``"fc_max"``.
    lgbm_params : dict
        Parameters forwarded to ``lgb.train``.
    device : torch.device
        Device used for feature extraction; falls back to CPU when CUDA was
        requested but is not available.
    model : lgb.Booster or None
        The trained booster, or ``None`` until ``fit`` has been called.
    """

    def __init__(
        self,
        audio_processing_params: dict,
        feature_params: dict,
        lgbm_params: dict,
        device: str = "cuda",
    ):
        """Store the configuration and build the spectrogram/cepstrum transforms.

        Raises
        ------
        ValueError
            If no spectrogram frequency bin lies strictly between ``f_min``
            and ``f_max``.
        """
        self.audio_processing_params = audio_processing_params
        self.feature_params = feature_params
        self.lgbm_params = lgbm_params
        self.device = torch.device(
            "cuda" if device == "cuda" and torch.cuda.is_available() else "cpu"
        )
        self.model = None

        # Use the configured rate for the frequency axis so it agrees with the
        # loader; fall back to the module constant when the key is absent.
        sample_rate = self.audio_processing_params.get("sample_rate", SR)

        self.spectrogram_transformer = Spectrogram(
            n_fft=self.feature_params["n_fft"],
            hop_length=self.feature_params["hop_length"],
            pad=self.feature_params["pad"],
            window_fn=torch.hamming_window,
            power=self.feature_params["power"],
            pad_mode=self.feature_params["pad_mode"],
            onesided=True,
            center=False,
        ).to(self.device)

        # Boolean mask selecting the spectrogram bins inside (f_min, f_max).
        self.f = torch.fft.rfftfreq(self.feature_params["n_fft"], d=1.0 / sample_rate)
        freq_mask = (self.f > self.feature_params["f_min"]) & (
            self.f < self.feature_params["f_max"]
        )
        self.ind_f_filtered = freq_mask.to(self.device)

        # A plain int (not a 0-d tensor) so it is a valid FFT size everywhere.
        self.n_fft_cepstral = int(freq_mask.sum())
        if self.n_fft_cepstral == 0:
            raise ValueError(
                "No spectrogram frequency bin lies between f_min and f_max; "
                "check feature_params."
            )

        # The cepstrum is one FFT frame spanning the whole selected band.
        self.cepstral_transformer = Spectrogram(
            n_fft=self.n_fft_cepstral,
            hop_length=self.n_fft_cepstral,
            pad=0,
            window_fn=torch.hamming_window,
            power=self.feature_params["power"],
            pad_mode=self.feature_params["pad_mode"],
            onesided=True,
            center=False,
        ).to(self.device)

        # Boolean mask selecting the cepstral bins inside (fc_min, fc_max).
        self.cf = torch.fft.rfftfreq(self.n_fft_cepstral, d=0.5)
        cepstral_mask = (self.cf > self.feature_params["fc_min"]) & (
            self.cf < self.feature_params["fc_max"]
        )
        self.ind_cf_filtered = cepstral_mask.to(self.device)

    def fit(self, dataset: "Dataset", batch_size: int = 5000):
        """Train a LightGBM model on features extracted from the dataset.

        Parameters
        ----------
        dataset : Dataset
            Arrow Dataset object containing audio samples and their labels.
        batch_size : int, optional
            Number of audio samples per batch (default is 5000).

        Raises
        ------
        ValueError
            If no sample could be loaded from the dataset.
        """
        features, labels = [], []
        for audio, label in self.batch_audio_loader(dataset, batch_size=batch_size):
            # Move each batch to the CPU right away to keep device memory flat.
            features.append(self.get_features(audio).cpu())
            labels.append(label.cpu())

        if not features:
            raise ValueError("Cannot fit on an empty dataset: no audio sample was loaded.")

        # LightGBM is given numpy arrays, matching FastModelHuggingFace.predict.
        x_train = torch.cat(features, dim=0).numpy()
        y_train = torch.cat(labels, dim=0).numpy()
        train_data = lgb.Dataset(x_train, label=y_train)
        self.model = lgb.train(self.lgbm_params, train_data)

    def predict(self, dataset: "Dataset", get_proba: bool = False, batch_size: int = 5000):
        """Predict labels or probabilities for a dataset using the trained model.

        Parameters
        ----------
        dataset : Dataset
            The dataset containing audio data for prediction.
        get_proba : bool, optional
            If True, return the raw model scores instead of binary
            predictions (default is False).
        batch_size : int, optional
            Number of audio samples per batch (default is 5000).

        Returns
        -------
        numpy.ndarray
            If `get_proba` is True, the scores returned by the booster.
            If `get_proba` is False, binary predictions (score >= 0.5).
            An empty array when no sample could be loaded.

        Raises
        ------
        NotFittedError
            If the model is not yet trained.
        """
        if self.model is None:
            raise NotFittedError("LGBM model is not fitted yet.")

        features = [
            self.get_features(audio).cpu()
            for audio, _ in self.batch_audio_loader(dataset, batch_size=batch_size)
        ]
        torch.cuda.empty_cache()

        if features:
            y_score = self.model.predict(torch.cat(features, dim=0).numpy())
        else:
            # Nothing to score; torch.cat would raise on an empty list.
            y_score = np.empty(0, dtype=float)

        return y_score if get_proba else (y_score >= 0.5).astype(int)

    def get_features(self, audios: Tensor):
        """
        Extract log-spectrogram and cepstrum features from raw audio.

        Parameters
        ----------
        audios : torch.Tensor
            A batch of audio waveforms as a 2D tensor (n_audios, n_samples_per_audio).

        Returns
        -------
        torch.Tensor
            One row per audio: the time-averaged cepstral bins followed by the
            time-averaged log10 spectrogram bins.

        Raises
        ------
        ValueError
            If the input tensor is not 2D or contains no elements.
        """
        if audios.dim() != 2 or audios.numel() == 0:
            raise ValueError(
                "audios must be a non-empty 2D tensor of shape "
                f"(n_audios, n_samples_per_audio); got shape {tuple(audios.shape)}."
            )

        audios = audios.to(self.device)
        sxx = self.spectrogram_transformer(audios)
        # Put time on axis 1 and frequency on axis 2; clamp avoids log10(0).
        sxx = torch.log10(torch.clamp(sxx.permute(0, 2, 1), min=1e-10))

        # Second transform runs along the selected frequency band of each frame.
        band = sxx[:, :, self.ind_f_filtered]
        cepstral_mat = self.cepstral_transformer(band).squeeze(dim=3)[
            :, :, self.ind_cf_filtered
        ]

        return torch.cat([cepstral_mat.mean(dim=1), sxx.mean(dim=1)], dim=1)

    def batch_audio_loader(
        self, dataset: "Dataset", batch_size: int = 1, offset: int = 0, device="cpu"
    ):
        """Yield batches of decoded, fixed-length waveforms and their labels.

        The target sampling rate, duration (seconds) and padding method come
        from ``audio_processing_params`` (``"sample_rate"``, ``"duration"``,
        ``"padding_method"``). Audio recorded at twice the target rate is
        resampled down to it.

        Parameters
        ----------
        dataset : Dataset
            The dataset containing audio samples and labels. The first column
            is read as the WAV bytes and the second as the label.
        batch_size : int, optional
            Number of audio samples per batch (default is 1).
        offset : int, optional
            Forwarded unchanged as the ``offset`` argument of
            ``torch.frombuffer`` when decoding the payload (default is 0).
            NOTE(review): ``torch.frombuffer`` documents this offset in bytes,
            so it should be even for 16-bit samples - confirm intended unit.
        device : str, optional
            Device for the yielded tensors ("cpu" or "cuda"); falls back to
            CPU when CUDA is unavailable (default is "cpu").

        Yields
        ------
        tuple (Tensor, Tensor)
            A tuple (batch_audios, batch_labels), where:
            - batch_audios is a torch.tensor of processed audio waveforms.
            - batch_labels is a torch.tensor of corresponding audio labels.

        Raises
        ------
        ValueError
            If a sample's sampling rate is neither the target rate nor twice it.
        """
        sr = self.audio_processing_params["sample_rate"]
        waveform_duration = self.audio_processing_params["duration"]
        padding_method = self.audio_processing_params["padding_method"]

        device = torch.device(
            "cuda" if device == "cuda" and torch.cuda.is_available() else "cpu"
        )
        logger = logging.getLogger(__name__)

        def finalize_batch(audios, labels, pending, pending_slots):
            """Resample the double-rate waveforms, fill their slots, and stack."""
            if pending:
                resampler = torchaudio.transforms.Resample(
                    orig_freq=sr * 2, new_freq=sr, lowpass_filter_width=6
                )
                resampled = resampler(torch.stack(pending))
                for slot, row in zip(pending_slots, resampled):
                    audios[slot] = row
            return torch.stack(audios).to(device), torch.tensor(labels).to(device)

        batch_audios, batch_labels = [], []
        # Waveforms at 2 * sr awaiting resampling, and their slots in batch_audios.
        pending, pending_slots = [], []

        for i in range(len(dataset)):
            pa_subtable = query_table(dataset._data, i, indices=dataset._indices)
            wav_bytes = pa_subtable[0][0][0].as_py()

            # Header fields are read at fixed offsets: sampling rate at bytes
            # 24-27, data size at bytes 40-43, payload from byte 44.
            # NOTE(review): presumably a canonical 44-byte PCM16 WAV header - confirm.
            sampling_rate = struct.unpack("<I", wav_bytes[24:28])[0]
            if sampling_rate not in [sr, sr * 2]:
                raise ValueError(
                    f"Unsupported sampling rate: {sampling_rate}Hz. Only {sr}Hz and {sr * 2}Hz are allowed."
                )

            data_size = struct.unpack("<I", wav_bytes[40:44])[0] // 2
            if data_size == 0:
                # Empty recording: use silence already at the target rate/length.
                batch_audios.append(torch.zeros(int(waveform_duration * sr)))
            else:
                n_samples = int(waveform_duration * sampling_rate)
                try:
                    waveform = (
                        torch.frombuffer(wav_bytes[44:], dtype=torch.int16, offset=offset)[
                            :n_samples
                        ].float()
                        / 32767
                    )
                except Exception as exc:
                    # Best effort: skip undecodable samples (and their labels),
                    # but leave a trace instead of dropping them silently.
                    logger.warning("Skipping sample %d: cannot decode audio (%s)", i, exc)
                    continue
                waveform = apply_padding(waveform, n_samples, padding_method)

                if sampling_rate == sr:
                    batch_audios.append(waveform)
                else:
                    # Reserve the slot now so batch order is preserved.
                    pending.append(waveform)
                    pending_slots.append(len(batch_audios))
                    batch_audios.append(None)

            batch_labels.append(pa_subtable[1][0].as_py())

            if len(batch_audios) == batch_size:
                yield finalize_batch(batch_audios, batch_labels, pending, pending_slots)
                batch_audios, batch_labels = [], []
                pending, pending_slots = [], []

        # Flush the last, possibly partial, batch.
        if batch_audios:
            yield finalize_batch(batch_audios, batch_labels, pending, pending_slots)
|
|
|
|
def apply_padding(
    waveform: torch.Tensor,
    output_size: int,
    padding_method: Literal["zero", "reflect", "replicate", "circular"] = "zero",
) -> torch.Tensor:
    """
    Pad or truncate a 1D waveform to exactly `output_size` samples.

    Parameters
    ----------
    waveform : torch.Tensor
        Input 1D waveform tensor.
    output_size : int
        Desired output size after padding or truncation.
    padding_method : str, default="zero"
        Padding applied at the end of the waveform when it is too short:
        "zero", "reflect", "replicate" or "circular". For the non-zero
        methods, a waveform too short to be padded in one step is first
        repeated (tiled) and the remainder is then padded.

    Returns
    -------
    torch.Tensor
        1D waveform of size `output_size`. An empty waveform yields zeros
        for every valid padding method.

    Raises
    ------
    ValueError
        If `padding_method` is not one of the supported methods.
    """
    length = waveform.size(0)
    if length >= output_size:
        return waveform[:output_size]

    total_pad = output_size - length
    if padding_method == "zero":
        return F.pad(waveform, (0, total_pad), mode="constant", value=0)
    if padding_method not in ("reflect", "replicate", "circular"):
        raise ValueError(f"Invalid padding method: {padding_method}")

    if length == 0:
        # Nothing to reflect/replicate/wrap; avoids a division by zero below.
        return waveform.new_zeros(output_size)

    # Reflect padding must be strictly smaller than the input, so a pad equal
    # to the waveform length also has to go through tiling for that mode.
    needs_tiling = length < total_pad or (
        padding_method == "reflect" and length == total_pad
    )
    if needs_tiling:
        num_repeats = (total_pad // length) + 1
        waveform = torch.tile(waveform, (num_repeats,))
        total_pad = output_size - waveform.size(0)

    # F.pad's non-constant modes need a leading dimension; drop only that one.
    return F.pad(waveform.unsqueeze(0), (0, total_pad), mode=padding_method).squeeze(0)
|
|
|
|
class FastModelHuggingFace:
    """
    Load a FastModel pipeline and LightGBM booster from the Hugging Face Hub
    and run predictions with them.

    Attributes
    ----------
    pipeline : object
        The deserialized preprocessing pipeline. It must expose
        ``audio_processing_params``, ``get_features`` and ``batch_audio_loader``.
    model : lgb.Booster
        The LightGBM model instance used for predictions.

    Methods
    -------
    from_pretrained(repo_id: str, revision: str = "main",
                    pipeline_file_name: str = "pipeline.joblib",
                    model_file_name: str = "model_lightgbm.txt") -> "FastModelHuggingFace":
        Loads the FastModel pipeline and model from the Hugging Face Hub.
    predict(input_data, get_proba: bool = False, batch_size: int = 5000,
            device: str = "cuda") -> np.ndarray:
        Predicts labels or probabilities for a WAV file or dataset.
    """

    def __init__(self, pipeline: object, lightgbm_model: lgb.Booster):
        """
        Initialize a FastModelHuggingFace instance.

        Parameters
        ----------
        pipeline : object
            The deserialized preprocessing pipeline.
        lightgbm_model : lgb.Booster
            A LightGBM booster model for predictions.
        """
        self.pipeline = pipeline
        self.model = lightgbm_model

    @classmethod
    def from_pretrained(
        cls,
        repo_id: str,
        revision: str = "main",
        pipeline_file_name: str = "pipeline.joblib",
        model_file_name: str = "model_lightgbm.txt",
    ) -> "FastModelHuggingFace":
        """
        Load the FastModel pipeline and LightGBM model from the Hugging Face Hub.

        Parameters
        ----------
        repo_id : str
            The Hugging Face repository ID.
        revision : str, optional
            The specific revision of the repository to use (default is "main").
        pipeline_file_name : str, optional
            The filename of the serialized pipeline (default is "pipeline.joblib").
        model_file_name : str, optional
            The filename of the LightGBM model (default is "model_lightgbm.txt").

        Returns
        -------
        FastModelHuggingFace
            A FastModelHuggingFace instance with the loaded pipeline and model.

        Raises
        ------
        FileNotFoundError
            If either downloaded file is missing from the local path returned
            by the Hub client.
        """
        pipeline_path = hf_hub_download(repo_id, filename=pipeline_file_name, revision=revision)
        model_lgbm_path = hf_hub_download(repo_id, filename=model_file_name, revision=revision)

        if not os.path.exists(pipeline_path):
            raise FileNotFoundError(f"Pipeline file {pipeline_path} is missing or corrupted.")
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code; only use repositories and revisions you trust.
        pipeline = load(pipeline_path)

        if not os.path.exists(model_lgbm_path):
            raise FileNotFoundError(
                f"LightGBM model file {model_lgbm_path} is missing or corrupted."
            )
        lightgbm_model = lgb.Booster(model_file=model_lgbm_path)

        return cls(pipeline=pipeline, lightgbm_model=lightgbm_model)

    @staticmethod
    def _format_scores(scores: np.ndarray, get_proba: bool) -> np.ndarray:
        """Return raw scores, or threshold them at 0.5 into 0/1 labels."""
        return scores if get_proba else (scores >= 0.5).astype(int)

    def predict(
        self,
        input_data: Union[str, os.PathLike, "HuggingFaceDataset"],
        get_proba: bool = False,
        batch_size: int = 5000,
        device: Literal["cpu", "cuda"] = "cuda",
    ) -> np.ndarray:
        """
        Predict labels or probabilities for a given audio input.

        Parameters
        ----------
        input_data : Union[str, os.PathLike, HuggingFaceDataset]
            Either the path to a WAV file or a Hugging Face dataset (detected
            by the presence of a ``_data`` attribute).
        get_proba : bool, optional
            If True, return the raw model scores instead of binary
            predictions (default is False).
        batch_size : int, optional
            Number of audio samples per batch for dataset input (default is 5000).
        device : Literal["cpu", "cuda"]
            Device on which the dataset loader places its batches; it falls
            back to CPU when CUDA is unavailable. Feature extraction itself
            runs on the pipeline's own device.

        Returns
        -------
        np.ndarray
            If `get_proba` is True, the scores returned by the booster.
            If `get_proba` is False, binary predictions (score >= 0.5).
            An empty array when a dataset yields no sample.

        Raises
        ------
        ValueError
            If the input is neither a WAV file path nor a Hugging Face dataset.
        """
        if isinstance(input_data, (str, os.PathLike)):
            audio_waveform, sr = torchaudio.load(input_data)
            # Down-mix all channels to mono.
            audio_waveform = audio_waveform.mean(dim=0)
            target_sr = self.pipeline.audio_processing_params["sample_rate"]
            if sr != target_sr:
                resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)
                audio_waveform = resampler(audio_waveform)
            # get_features moves the batch to the pipeline's device itself; an
            # explicit .to("cuda") here would crash on CPU-only machines.
            features = self.pipeline.get_features(audio_waveform.unsqueeze(0))
            scores = self.model.predict(features.cpu().numpy())
            return self._format_scores(scores, get_proba)

        if hasattr(input_data, "_data"):
            features = [
                self.pipeline.get_features(batch_audios).cpu()
                for batch_audios, _ in self.pipeline.batch_audio_loader(
                    input_data,
                    batch_size=batch_size,
                    device=device,
                )
            ]
            if not features:
                # Nothing to score; torch.cat would raise on an empty list.
                return self._format_scores(np.empty(0, dtype=float), get_proba)
            scores = self.model.predict(torch.cat(features, dim=0).numpy())
            return self._format_scores(scores, get_proba)

        raise ValueError("Input must be either a path to a WAV file or a Hugging Face Dataset.")
|
|
|
|
def save_pipeline(
    model_class_instance: FastModel,
    path: str,
    lgbm_file_name: Optional[str] = None,
    pipeline_file_name: Optional[str] = None,
):
    """
    Serialize a FastModel instance and, when trained, its LightGBM booster.

    The booster is written with LightGBM's own ``save_model`` and the whole
    instance is pickled with joblib. The output directory is created if it
    does not exist yet.

    Parameters
    ----------
    model_class_instance : FastModel
        The FastModel instance to serialize. When its ``model`` is set, the
        instance also gets a ``model_file_name`` attribute holding the path
        of the saved booster.
    path : str
        The directory to save the FastModel instance in.
    lgbm_file_name : str, optional
        The filename for saving the LightGBM model (default is "model_lightgbm.txt").
    pipeline_file_name : str, optional
        The filename for saving the pipeline (default is "pipeline.joblib").
    """
    lgbm_file_name = lgbm_file_name or "model_lightgbm.txt"
    pipeline_file_name = pipeline_file_name or "pipeline.joblib"

    # Create the target directory up front so neither write fails on it.
    output_dir = Path(path)
    output_dir.mkdir(parents=True, exist_ok=True)

    if model_class_instance.model is not None:
        model_class_instance.model_file_name = str(output_dir / lgbm_file_name)
        model_class_instance.model.save_model(model_class_instance.model_file_name)

    dump(model_class_instance, output_dir / pipeline_file_name)
|
|
|
|
def load_pipeline(
    path: str,
    lgbm_file_name: Optional[str] = None,
    pipeline_file_name: Optional[str] = None,
) -> FastModel:
    """
    Load a serialized pipeline and attach its LightGBM model.

    Parameters
    ----------
    path : str
        The directory containing the serialized FastModel.
    lgbm_file_name : str, optional
        The filename of the LightGBM model. When omitted,
        "model_lightgbm.txt" (the name `save_pipeline` writes by default) is
        tried first, then the legacy default "model_fast_model.txt".
    pipeline_file_name : str, optional
        The filename for the pipeline (default is "pipeline.joblib").

    Returns
    -------
    FastModel
        The unpickled pipeline object with its ``model`` attribute set to the
        loaded LightGBM booster.

    Raises
    ------
    FileNotFoundError
        If either the LightGBM model or pipeline file is not found.
    """
    base_dir = Path(path)

    pipeline_path = base_dir / (pipeline_file_name or "pipeline.joblib")
    if not pipeline_path.exists():
        raise FileNotFoundError(f"Pipeline file {pipeline_path} not found.")

    if lgbm_file_name:
        lightgbm_path = base_dir / lgbm_file_name
    else:
        # Prefer what save_pipeline writes by default; keep the former default
        # of this function as a fallback for existing directories.
        candidates = [
            base_dir / "model_lightgbm.txt",
            base_dir / "model_fast_model.txt",
        ]
        lightgbm_path = next((c for c in candidates if c.exists()), candidates[0])
    if not lightgbm_path.exists():
        raise FileNotFoundError(f"LightGBM file {lightgbm_path} not found.")

    # SECURITY: joblib.load unpickles the file and can execute arbitrary code;
    # only load pipelines from trusted locations.
    model_class_instance = load(pipeline_path)
    model_class_instance.model = lgb.Booster(model_file=str(lightgbm_path))

    return model_class_instance
|
|