Source code for rarity.interpreters.structured_data.int_loss_clusters

# Copyright 2021 AI Singapore. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union
import math

from rarity.data_loader import CSVDataLoader, DataframeLoader
from rarity.interpreters.structured_data.base_interpreters import BaseInterpreters
from rarity.utils.methods import create_clusters, calculate_logloss, get_optimum_num_clusters
from rarity.utils.common_functions import is_regression, is_classification


[docs]class IntLossClusterer(BaseInterpreters): ''' Transform raw data into input format suitable for visualization on loss clusters Arguments: data_loader (:class:`~rarity.data_loader.CSVDataLoader` or :class:`~rarity.data_loader.DataframeLoader`): Class object from data_loader module ''' def __init__(self, data_loader: Union[CSVDataLoader, DataframeLoader]): super().__init__(data_loader)
[docs] def extract_misspredictions(self): ''' Function to tapout list of dataframe with prediction state info included ''' ls_dfs_prob, _ = super().get_df_with_probability_values() ls_dfs_prob_misspred = [df.loc[lambda x: x['pred_state'] == 'miss-predict', :] for df in ls_dfs_prob] return ls_dfs_prob_misspred
[docs] def xform(self, num_cluster: int, log_func: math.log, specific_dataset: str): ''' Core transformation function to tap-out data into input format suitable for plotly graph Arguments: num_cluster (int): Number of cluster to form log_funct (:obj:`math.log`): Mathematics logarithm function used to calculate log-loss between yTrue and yPred specific_dataset (str): Default to 'All' indicating to include all miss-predict labels. Other options flexibly expand depending on class labels Returns: Compact outputs consist of the followings - df (:obj:`~pd.DataFrame`): dataframes for overview visualization need with offset values included - ls_score (:obj:`List[float]`): list of silhouette scores, indication of clustering quality - ls_cluster_range (:obj:`List[List[int]]`): list of list containing cluster number range from 1 to 10 - ls_ssd (:obj:`List[float]`): sum of squared distance generated via kmean_inertia .. note:: if classification, returns: Compact outputs consist of the followings - ls_dfs_viz (:obj:`List[~pd.DataFrame]`): dataframes for overview visualization need with offset values included - ls_class_labels (:obj:`List[str]`): list of all class labels - ls_class_labels_misspred (:obj:`List[str]`): list of class labels with minimum of 1 miss-prediction - ls_score (:obj:`List[float]`): list of silhouette scores, indication of clustering quality - ls_cluster_range (:obj:`List[List[int]]`): list of list containing cluster number range from 1 to 10 - ls_ssd (:obj:`List[float]`): sum of squared distance generated via kmean_inertia ''' if is_regression(self.analysis_type): df = super().get_df_with_offset_values() df.insert(0, 'index', df.index) # for ease of user to trace the datapoint in raw dataframe cluster_groups_m1, cluster_score_m1 = create_clusters(df[f'offset_{self.models[0]}'], num_cluster) df[f'cluster_{self.models[0]}'] = cluster_groups_m1 cluster_range_m1, sum_squared_distance_m1 = get_optimum_num_clusters(df[f'offset_{self.models[0]}'], num_cluster) # wrapped in list to std with classification output format for ease of feature integration at later stage ls_score = [cluster_score_m1] ls_cluster_range = [cluster_range_m1] ls_ssd = [sum_squared_distance_m1] if len(self.models) == 2: cluster_groups_m2, cluster_score_m2 = create_clusters(df[f'offset_{self.models[1]}'], num_cluster) df[f'cluster_{self.models[1]}'] = cluster_groups_m2 cluster_range_m2, sum_squared_distance_m2 = get_optimum_num_clusters(df[f'offset_{self.models[1]}'], num_cluster) ls_score.append(cluster_score_m2) ls_cluster_range.append(cluster_range_m2) ls_ssd.append(sum_squared_distance_m2) return df, ls_score, ls_cluster_range, ls_ssd elif is_classification(self.analysis_type): ls_dfs_prob, ls_class_labels = super().get_df_with_probability_values() ls_dfs_prob_misspred = [df.loc[lambda x: x['pred_state'] == 'miss-predict', :] for df in ls_dfs_prob] ls_class_labels_misspred = [list(df['yTrue'].astype('str').unique()) for df in ls_dfs_prob_misspred] if len(ls_dfs_prob) == 2: ls_class_labels_misspred = list(set(ls_class_labels_misspred[0] + ls_class_labels_misspred[1])) else: ls_class_labels_misspred = ls_class_labels_misspred[0] try: ls_class_labels_misspred = [int(label) for label in ls_class_labels_misspred] ls_class_labels_misspred.sort() except TypeError: ls_class_labels_misspred.sort() ls_score = [] ls_dfs_viz = [] ls_cluster_range = [] ls_ssd = [] for df in ls_dfs_prob_misspred: df['eff_prob_for_loss_cal'] = [df[df['yTrue'].astype('str').values[i]].values[i] for i in range(len(df))] if specific_dataset != 'All': try: df_viz = df.loc[lambda x: x['yTrue'].astype('str') == specific_dataset, :] except ValueError: pass else: df_viz = df df_temp = df_viz[['yTrue', 'eff_prob_for_loss_cal']] df_viz['lloss'] = df_temp.apply(lambda x: calculate_logloss(x['yTrue'], x['eff_prob_for_loss_cal'], log_func), axis=1) df_viz.pop('eff_prob_for_loss_cal') cluster_groups, cluster_score = create_clusters(df_viz['lloss'], num_cluster) df_viz['cluster'] = cluster_groups cluster_range, sum_squared_distance = get_optimum_num_clusters(df_viz['lloss'], num_cluster) ls_score.append(cluster_score) ls_dfs_viz.append(df_viz) ls_cluster_range.append(cluster_range) ls_ssd.append(sum_squared_distance) return ls_dfs_viz, ls_class_labels, ls_class_labels_misspred, ls_score, ls_cluster_range, ls_ssd