Source code for rhf.rhf

"""Main module."""

from scipy.stats import kurtosis
import numpy as np
import pandas as pd

def get_kurtosis_feature_split(data):
    """
    Get attribute split according to Kurtosis Split

    An attribute is drawn with probability proportional to
    ``log(kurtosis + 1)`` (Pearson kurtosis, ``fisher=False``), then a split
    value is drawn uniformly strictly between that attribute's minimum and
    maximum.

    Attributes that cannot be split (constant, or containing NaN / infinite
    values so that no finite positive weight can be computed) get a weight of
    zero and are never selected.

    :param data: the dataset of the node (pandas DataFrame with numeric columns)
    :returns:
        - feature_index: the column label of the attribute to split (equal to
          the positional index when the columns are ``0..n-1``)
        - feature_split: the attribute value to split
    :raises ValueError: if no attribute of ``data`` can be split
    """
    values = np.asarray(data, dtype=np.float64)
    col_min = values.min(axis=0)
    col_max = values.max(axis=0)

    # Comparisons involving NaN are False, so NaN columns are excluded here
    # together with the constant ones.
    candidates = np.flatnonzero(col_min < col_max)

    weights = np.zeros(values.shape[1], dtype=np.float64)
    if candidates.size:
        # Only non-constant columns are handed to scipy: the kurtosis of a
        # constant column is 0 or NaN depending on the scipy version, and a
        # NaN would poison the weight sum used for the draw below.
        with np.errstate(all="ignore"):
            kurtosis_values = np.asarray(
                kurtosis(values[:, candidates], fisher=False),
                dtype=np.float64,
            )
            candidate_weights = np.log(kurtosis_values + 1)
        usable = np.isfinite(candidate_weights) & (candidate_weights > 0)
        weights[candidates[usable]] = candidate_weights[usable]

    selectable = np.flatnonzero(weights)
    if selectable.size == 0:
        raise ValueError(
            "Kurtosis split impossible: no attribute has a finite range "
            "with more than one distinct value"
        )

    cumulative = np.cumsum(weights)
    total = cumulative[-1]
    last_selectable = int(selectable[-1])

    while True:
        draw = np.random.uniform(0, total)
        # digitize returns len(cumulative) if the draw rounds up to the total;
        # clamp so the position always refers to a selectable column.
        position = min(int(np.digitize(draw, cumulative)), last_selectable)
        min_ = col_min[position]
        max_ = col_max[position]
        feature_split = np.random.uniform(min_, max_)
        # uniform() may return the lower bound; retry so both children of the
        # split are guaranteed to be non-empty.
        if min_ < feature_split < max_:
            break

    # Callers select the column with data[feature_index], i.e. by label.
    feature_index = data.columns[position]
    return feature_index, feature_split
def get_random_feature_split(data):
    """
    Get attribute split according to Random Split

    Attributes are visited in random order; the first one whose minimum is
    strictly below its maximum is kept and a split value is drawn uniformly
    strictly between those two bounds.

    :param data: the dataset of the node (pandas DataFrame)
    :returns:
        - feature_index: the column label of the attribute to split (equal to
          the positional index when the columns are ``0..n-1``)
        - feature_split: the attribute value to split
    :raises ValueError: if every attribute of ``data`` is constant
    """
    positions = list(range(data.shape[1]))
    np.random.shuffle(positions)

    while positions:
        position = positions.pop()
        # Positional access: column labels are not necessarily 0..n-1.
        column = data.iloc[:, position]
        min_attribute = np.min(column)
        max_attribute = np.max(column)
        if not min_attribute < max_attribute:
            # Constant (or all-NaN) attribute: nothing to split on.
            continue

        while True:
            split_value = np.random.uniform(min_attribute, max_attribute)
            # uniform() may return the lower bound; retry so both children
            # of the split are guaranteed to be non-empty.
            if min_attribute < split_value < max_attribute:
                break

        # Callers select the column with data[attribute], i.e. by label.
        return data.columns[position], split_value

    raise ValueError(
        "Random split impossible: every attribute has a single distinct value"
    )
class Node(object):
    """
    Node object

    Plain container for one node of a Random Histogram Tree. Every attribute
    starts unset (``None``), except ``type`` which starts at 0; the tree
    builder fills them in as the node is created, split or turned into a leaf.
    """

    def __init__(self):
        super(Node, self).__init__()
        # Children, assigned by RandomHistogramTree.build when the node is split.
        self.left = None
        self.right = None

        # Declared for split bookkeeping; not assigned elsewhere in this module.
        self.split_value = None
        self.split_feature = None

        # Attribute and threshold chosen by RandomHistogramTree.build.
        # `value` was previously created only on split nodes; initialising it
        # here makes it readable on every node.
        self.attribute = None
        self.value = None

        # Not assigned elsewhere in this module.
        self.data = None

        # Position in the tree, assigned by RandomHistogramTree.generate_node.
        self.depth = None
        self.index = None
        self.parent = None

        # Leaf information, assigned by RandomHistogramTree.set_leaf:
        # number of rows in the leaf and the index labels of those rows.
        # `data_index` was previously created only on leaves.
        self.size = None
        self.data_index = None

        # 0 for a generic node; set_leaf switches it to 1.
        self.type = 0
class Root(Node):
    """
    Root node of a tree: a Node created at depth 0 with index 0
    """

    def __init__(self):
        super(Root, self).__init__()
        # The root is the only node whose position is known up front.
        self.index = 0
        self.depth = 0
class RandomHistogramTree(object):
    """
    Random Histogram Tree object

    The tree is built as soon as the object is created; its leaves are
    collected in ``self.leaves``.

    :param data: the dataset used to build the tree (pandas DataFrame); required
    :param int max_height: max height of the tree; ``None`` means no limit
    :param str split_criterion: split criterion to use: 'kurtosis' or 'random'
    :raises ValueError: if ``data`` is ``None``, or (while building) if
        ``split_criterion`` is not one of the supported values
    """

    def __init__(self, data = None, max_height = None, split_criterion='kurtosis'):
        super(RandomHistogramTree, self).__init__()
        self.N = 0
        self.leaves = []
        self.max_height = max_height
        self.split_criterion = split_criterion

        if data is None:
            # The module never imports `sys`, so the former sys.exit() call
            # could only fail with a NameError; raise a meaningful error.
            raise ValueError('Error data: a dataset is required to build the tree')
        self.build_tree(data)

    def generate_node(self, depth=None, parent=None):
        """
        Generates a new node

        The node counter ``self.N`` is incremented and used as the node index.

        :param int depth: depth of the node
        :param Node parent: parent node
        :returns: the new node
        """
        self.N += 1

        node = Node()
        node.depth = depth
        node.index = self.N
        node.parent = parent
        return node

    def set_leaf(self, node, data):
        """
        Transforms generic node into leaf

        :param node: generic node to transform into leaf
        :param data: node data used to define node size and data indexes
            corresponding to node
        """
        node.type = 1
        node.size = data.shape[0]
        node.data_index = data.index
        self.leaves.append(node)

    def build(self, node, data):
        """
        Function which recursively builds the tree

        A node becomes a leaf when it holds at most one row, when all of its
        rows are identical, or when it has reached ``max_height``. Otherwise
        the data is split on the selected attribute: rows strictly below the
        split value go left, the others go right.

        :param node: current node
        :param data: data corresponding to current node
        :raises ValueError: if ``self.split_criterion`` is unknown
        """
        n_rows = data.shape[0]
        if n_rows == 0:
            # Keep a handle on the empty node; it still becomes a leaf below.
            self.error_node = node

        # With the default max_height=None the depth test is skipped:
        # comparing an int with None raises TypeError.
        if (
            n_rows <= 1
            or data.duplicated().sum() == n_rows - 1
            or (self.max_height is not None and node.depth >= self.max_height)
        ):
            self.set_leaf(node, data)
            return

        if self.split_criterion == 'kurtosis':
            attribute, value = get_kurtosis_feature_split(data)
        elif self.split_criterion == 'random':
            attribute, value = get_random_feature_split(data)
        else:
            raise ValueError(
                'Error: Unknown split criterion {!r}'.format(self.split_criterion)
            )

        node.left = self.generate_node(depth = node.depth+1, parent = node)
        node.right = self.generate_node(depth = node.depth+1, parent = node)

        node.attribute = attribute
        node.value = value

        self.build(node.left, data[data[attribute] < value])
        self.build(node.right, data[data[attribute] >= value])

    def build_tree(self, data):
        """
        Build tree function: generates the root node and successively builds
        the tree recursively

        :param data: the dataset
        """
        self.tree_ = Root()
        self.build(self.tree_, data)
# def get_leaves(self, node, leaves): # if node.type == 1: # leaves.append(node) # return # self.get_leaves(node.left, leaves) # self.get_leaves(node.right, leaves)
class RHF(object):
    """
    Random Histogram Forest. Builds an ensemble of Random Histogram Trees

    :param int num_trees: number of trees
    :param int max_height: maximum height of each tree
    :param str split_criterion: split criterion to use - 'kurtosis' or 'random'
    :param bool check_duplicates: check duplicates in each leaf
    """

    def __init__(self, num_trees = 100, max_height = 5, split_criterion='kurtosis', check_duplicates=True):
        super(RHF, self).__init__()
        self.num_trees = num_trees
        self.max_height = max_height
        self.has_duplicates = False
        self.check_duplicates = check_duplicates
        self.split_criterion = split_criterion

    def fit(self, data):
        """
        Fit function: builds the ensemble and returns the scores

        Each tree adds ``log(1 / p)`` to the score of every row of a leaf,
        where ``p`` is the leaf size (or, when the dataset has duplicates,
        the number of distinct rows in the leaf) divided by the number of
        unique rows of the dataset.

        :param data: the dataset to fit (anything accepted by ``pd.DataFrame``)
        :return scores: anomaly scores, one per row, in input order
        """
        # Normalise both axes to 0..n-1: leaf row labels are used below as
        # positions into `scores`, and the split helpers address columns by
        # integer. reset_index returns a new frame, so the caller's data is
        # left untouched.
        data = pd.DataFrame(data).reset_index(drop=True)
        data.columns = pd.RangeIndex(data.shape[1])

        self.check_hash(data)

        # Trees are not retained; only their contribution to the scores is.
        self.forest = []
        scores = np.zeros(data.shape[0])

        for _ in range(self.num_trees):
            tree = RandomHistogramTree(
                data=data,
                max_height=self.max_height,
                split_criterion=self.split_criterion,
            )

            for leaf in tree.leaves:
                rows = np.asarray(leaf.data_index)
                if self.has_duplicates:
                    # Count duplicated rows once.
                    count = self.data_hash.iloc[rows].nunique()
                else:
                    count = leaf.size
                p = count / self.uniques_
                scores[rows] += np.log(1 / p)

        self.scores = scores
        return self.scores

    def check_hash(self, data):
        """
        Checks if there are duplicates in the dataset

        Sets ``self.has_duplicates`` and ``self.uniques_`` (number of unique
        rows, or the number of rows when duplicates are absent or not
        checked). When duplicates are found, ``self.data_hash`` is rebuilt.

        :param data: dataset
        """
        # Reset first: a previous fit on data with duplicates must not leak
        # its flag (and stale hashes) into a fit on duplicate-free data.
        self.has_duplicates = False

        if self.check_duplicates and data.duplicated().sum() > 0:
            self.has_duplicates = True
            self.get_hash(data)
            self.uniques_ = self.data_hash.nunique()
        else:
            self.uniques_ = data.shape[0]

    def get_hash(self, data):
        """
        Builds hash of data for duplicates identification

        Each row is reduced to the hash of its values joined as strings; the
        result is stored in ``self.data_hash``.

        :param data: dataset
        """
        self.data_hash = data.apply(
            lambda row: hash('-'.join([str(x) for x in row])),
            axis=1,
        )