"""
@author: Caglar Aytekin
contact: caglar@deepcause.ai
"""
import torch
import torch.nn as nn
import random
import numpy as np
import pandas as pd
import copy
class CustomEncodingFunction(torch.autograd.Function):
@staticmethod
    def forward(ctx, x, tau, alpha):
ctx.save_for_backward(x, tau)
# Perform the tanh operation on (x + tau)
y = torch.tanh(x + tau)
        # The actual forward output: a blend of the hard (-1/+1) binarized code
        # and the soft tanh value, controlled by alpha (alpha=1 -> fully binary)
        forward_output = alpha * (2 * torch.round((y + 1) / 2) - 1) + (1 - alpha) * y
        return forward_output
@staticmethod
def backward(ctx, grad_output):
x, tau = ctx.saved_tensors
        # Straight-through estimator: backpropagate the smooth tanh derivative,
        # 1 - tanh^2(x + tau), past the non-differentiable rounding
        grad_input = grad_output * (1 - torch.tanh(x + tau) ** 2)
        # x and tau share the same gradient (both enter as x + tau); alpha gets none
        return grad_input, grad_input, None
# Wrapping the custom function in a nn.Module for easier use
class EncodingLayer(nn.Module):
def __init__(self):
super(EncodingLayer, self).__init__()
    def forward(self, x, tau, alpha):
        return CustomEncodingFunction.apply(x, tau, alpha)
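
# A minimal sketch of the straight-through behaviour (hypothetical demo, not part
# of the original module): with alpha = 1 the forward pass emits hard -1/+1 codes,
# while the backward pass still uses the smooth tanh gradient 1 - tanh^2(x + tau).
def _demo_straight_through_encoding():
    x = torch.tensor([[0.5, -0.3]], requires_grad=True)
    tau = torch.zeros((1, 2), requires_grad=True)
    y = EncodingLayer()(x, tau, 1.0)  # hard codes: [+1, -1]
    y.sum().backward()                # x.grad == 1 - tanh(x + tau)^2
    return y, x.grad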
class LEURN(nn.Module):
def __init__(self, preprocessor,depth,droprate):
"""
Initializes the model.
Parameters:
- preprocessor: A class containing useful info about the dataset
- Including: attribute names, categorical features details, suggested embedding size for each category, output type, output dimension, transformation information
- depth: Depth of the network
- droprate: dropout rate
"""
super(LEURN, self).__init__()
#Find categorical indices and category numbers for each
self.alpha=1.0
self.preprocessor=preprocessor
self.attribute_names=preprocessor.attribute_names
self.label_encoders=preprocessor.encoders_for_nn
self.categorical_indices = [info[0] for info in preprocessor.category_details]
self.num_categories = [info[1] for info in preprocessor.category_details]
#If embedding_size is integer, cast it to all categories
if isinstance(preprocessor.suggested_embeddings, int):
embedding_sizes = [preprocessor.suggested_embeddings] * len(self.categorical_indices)
else:
assert len(preprocessor.suggested_embeddings) == len(self.categorical_indices), "Length of embedding_size must match number of categorical features"
embedding_sizes = preprocessor.suggested_embeddings
self.embedding_sizes=embedding_sizes
#Embedding layers for categorical features
self.embeddings = nn.ModuleList([
nn.Embedding(num_categories, embedding_dim)
for num_categories, embedding_dim in zip(self.num_categories, embedding_sizes)
])
for embedding_now in self.embeddings:
nn.init.uniform_(embedding_now.weight, -1.0, 1.0)
        self.total_embedding_size = sum(embedding_sizes)  # total width of the embedded categorical features
        self.non_cat_input_dim = len(self.attribute_names) - len(self.categorical_indices)  # number of numerical features
        self.nn_input_dim = self.total_embedding_size + self.non_cat_input_dim  # full NN input width
#LAYERS
self.tau_initial = nn.Parameter(torch.zeros(1,self.nn_input_dim)) # Initial tau as a learnable parameter
self.layers = nn.ModuleList()
self.depth = depth
self.output_type=preprocessor.output_type
for d_now in range(depth):
# Each iteration adds an encoding layer followed by a dropout and then a linear layer
self.layers.append(EncodingLayer())
self.layers.append(nn.Dropout1d(droprate))
linear_layer = nn.Linear((d_now + 1) * self.nn_input_dim, self.nn_input_dim)
self._init_weights(linear_layer,d_now+1) #special layer initialization
self.layers.append(linear_layer)
# Final stage: dropout and linear layer
self.final_dropout=nn.Dropout1d(droprate)
self.final_linear = nn.Linear(depth * self.nn_input_dim, self.preprocessor.output_dim)
self._init_weights(self.final_linear, depth)
def set_alpha(self, alpha):
"""Method to update the dynamic parameter."""
self.alpha = alpha
    def _init_weights(self, layer, depth_now):
        # Custom initialization.
        # The encodings feeding each linear layer lie (approximately) in (-1, 1),
        # so initializing weights in (-1/dim, 1/dim) keeps every output coordinate
        # bounded in (-1, 1) -- a good starting range for the next tau.
        if self.embedding_sizes:
            # Scale each embedded categorical column by 1/embedding_size
            init_tensor = torch.tensor([1 / size for size in self.embedding_sizes for _ in range(size)])
            if init_tensor.shape[0] < self.nn_input_dim:  # numerical features are present too
                init_tensor = torch.cat((init_tensor, torch.ones(self.non_cat_input_dim)), dim=0)
        else:
            init_tensor = torch.ones(self.non_cat_input_dim)
        init_tensor = init_tensor / ((depth_now + 1) * torch.tensor(len(self.attribute_names)))
        # Expand to the layer's full weight shape: rows tiled to the output dim,
        # columns expanded across the depth_now input blocks
        init_tensor = init_tensor.unsqueeze(0).repeat_interleave(repeats=layer.weight.shape[0], dim=0).repeat_interleave(repeats=depth_now, dim=1)
        layer.weight.data.uniform_(-1, 1)
        layer.weight = torch.nn.Parameter(layer.weight * init_tensor)
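    # Worked example of the bound above (illustrative numbers): with depth_now = 1
    # and F = len(attribute_names) purely numerical features, each weight lies in
    # (-1/(2F), 1/(2F)); the layer sees 1*F encodings in (-1, 1), so each output
    # coordinate is bounded by F * 1/(2F) = 1/2 and the next tau stays in (-1, 1).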
def forward(self, x):
        # Forward pass on raw (preprocessed) input: embeds the categoricals,
        # concatenates them with the (already normalized) numericals, and runs the network.
# Separate categorical and numerical features for easier handling
cat_features = [x[:, i].long() for i in self.categorical_indices]
non_cat_features = [x[:, i] for i in range(x.size(1)) if i not in self.categorical_indices]
non_cat_features = torch.stack(non_cat_features, dim=1) if non_cat_features else x.new_empty(x.size(0), 0)
# Embed categoricals
embedded_features = [embedding(cat_feature) for embedding, cat_feature in zip(self.embeddings, cat_features)]
        # Combine categoricals and numericals; fall back to numericals only when
        # there are no categorical features (torch.cat on an empty list would raise)
        if embedded_features:
            embedded_features = torch.cat(embedded_features, dim=1)
            nninput = torch.cat([embedded_features, non_cat_features], dim=1)
        else:
            nninput = non_cat_features
self.nninput=nninput
# Forward pass neural network
output=self.forward_from_embeddings(self.nninput)
self.output=output
return output
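    # Usage sketch (hypothetical shapes): with one categorical column embedded to
    # 3 dims plus two numerical columns, x is [B, 3] (one label-encoded code and
    # two normalized values) and self.nninput becomes [B, 5], e.g.:
    #   out = model(torch.tensor([[2.0, 0.13, -0.40]]))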
def forward_from_embeddings(self,x):
# Forward function for normalized numericals and embedded categoricals
tau=self.tau_initial
        tau = torch.repeat_interleave(tau, x.shape[0], 0)  # tau is 1 x F; repeat across the batch
# For each depth
for i in range(0, self.depth * 3, 3):
# encode, drop and find next tau
encoding_layer = self.layers[i]
dropout_layer = self.layers[i + 1]
linear_layer = self.layers[i + 2]
#encode and drop
encoded_x =dropout_layer( encoding_layer(x, tau,self.alpha))
#save encodings and thresholds
#notice that threshold is -tau, not tau since we binarize x+tau
if i==0:
encodings=encoded_x
taus=-tau
else:
encodings=torch.cat((encodings,encoded_x),dim=-1)
taus=torch.cat((taus,-tau),dim=-1)
            # Find the next thresholds (for the final depth this tau is computed but unused)
            tau = linear_layer(encodings)
self.encodings=encodings
self.taus=taus
#Final layer: drop and linear
output=self.final_linear(self.final_dropout(encodings))
return output
    def find_boundaries(self, x):
        """
        Given an input sample, finds the decision boundaries: lower/upper bounds for
        each (embedded/normalized) NN input dimension. Expects the same raw format
        as forward(): label-encoded categoricals plus normalized numericals.
        """
# Ensure x is the correct shape [1, input_dim]
if x.ndim == 1:
x = x.unsqueeze(0) # Add batch dimension if not present
        # Forward pass to populate self.encodings and self.taus
        self(x)
# self.taus has the shape [1, depth * input_dim]
# reshape to [depth, input_dim] for easier boundary finding
taus_reshaped = self.taus.view(self.depth, self.nn_input_dim)
# embedded and normalized input
embedded_x=self.nninput
        # Initialize boundaries: numericals lie in (-1, 1) and categorical embeddings in (-1, 1) as well,
        # so +/-100 is a safe finite min/max (+/-inf would break the later sampling step)
upper_boundaries = torch.full((embedded_x.size(1),), 100.0)
lower_boundaries = torch.full((embedded_x.size(1),), -100.0)
# Compare each threshold in self.taus with the corresponding input value
for feature_index in range(self.nn_input_dim):
for depth_index in range(self.depth):
threshold = taus_reshaped[depth_index, feature_index]
input_value = embedded_x[0, feature_index]
# If the threshold is greater than the input value and less than the current upper boundary, update the upper boundary
if threshold > input_value and threshold < upper_boundaries[feature_index]:
upper_boundaries[feature_index] = threshold
# If the threshold is less than the input value and greater than the current lower boundary, update the lower boundary
if threshold < input_value and threshold > lower_boundaries[feature_index]:
lower_boundaries[feature_index] = threshold
# Convert boundaries to a list of tuples [(lower, upper), ...] for each feature
boundaries = list(zip(lower_boundaries.tolist(), upper_boundaries.tolist()))
self.upper_boundaries=upper_boundaries
self.lower_boundaries=lower_boundaries
return boundaries
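    # Interpretation sketch: boundaries[i] = (lo, hi) means that any input whose
    # i-th embedded/normalized feature moves within (lo, hi), all else equal,
    # crosses exactly the same thresholds and receives the identical encoding.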
def categories_within_boundaries(self):
"""
For each categorical feature, checks if embedding weights fall within the specified upper and lower boundaries.
Returns a dictionary with categorical feature indices as keys and lists of category indices that fall within the boundaries.
"""
categories_within_bounds = {}
emb_st=0
for cat_index, emb_layer in zip(range(len(self.categorical_indices)), self.embeddings):
# Extract upper and lower boundaries for this categorical feature
lower_bound=self.lower_boundaries[emb_st:emb_st+self.embedding_sizes[cat_index]]
upper_bound=self.upper_boundaries[emb_st:emb_st+self.embedding_sizes[cat_index]]
emb_st=emb_st+self.embedding_sizes[cat_index]
# Initialize list to hold categories that fall within boundaries
categories_within = []
# Iterate over each embedding vector in the layer
for i, weight in enumerate(emb_layer.weight):
# Check if the embedding weight falls within the boundaries
if torch.all(weight >= lower_bound) and torch.all(weight <= upper_bound):
categories_within.append(i) # Using index i as category identifier
# Store the categories that fall within the boundaries for this feature
categories_within_bounds[cat_index] = categories_within
return categories_within_bounds
def global_importance(self):
final_layer_weight=torch.clone(self.final_linear.weight).detach().numpy()
importances=np.sum(np.abs(final_layer_weight),0)
importances=importances.reshape(importances.shape[0]//self.nn_input_dim,self.nn_input_dim)
importances=np.sum(importances,0)
importances_features=[]
        st = 0
        for i in range(len(self.attribute_names)):
            try:
                # Categorical feature: sum the importance over its embedding block
                importances_features.append(np.sum(importances[st:st + self.embedding_sizes[i]]))
                st = st + self.embedding_sizes[i]
            except IndexError:
                # Numerical feature (no embedding entry): take its single column
                importances_features.append(importances[st])
                st = st + 1
return np.argsort(importances_features)[::-1],np.sort(importances_features)[::-1]
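    # Usage sketch: `order, scores = model.global_importance()` returns the
    # original-feature indices sorted by decreasing total |weight| mass in the
    # final linear layer, together with the sorted scores themselves.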
def influence_matrix(self):
"""
Finds ADG from how each feature effects other's threshold via weight matrices
"""
def create_block_sum_matrix(sizes, matrix):
L = len(sizes)
# Initialize the output matrix with zeros, using PyTorch
block_sum_matrix = torch.zeros((L, L))
# Define the starting row and column indices for slicing
start_row = 0
for i, row_size in enumerate(sizes):
start_col = 0
for j, col_size in enumerate(sizes):
# Calculate the sum of the current block using PyTorch
block_sum = torch.sum(matrix[start_row:start_row+row_size, start_col:start_col+col_size])
block_sum_matrix[i, j] = block_sum
# Update the starting column index for the next block in the row
start_col += col_size
# Update the starting row index for the next block in the column
start_row += row_size
return block_sum_matrix
def add_ones_until_target(initial_list, target_sum):
# Continue adding 1s until the sum of the list equals the target sum
while sum(initial_list) < target_sum:
initial_list.append(1)
return initial_list
for i in range(0, self.depth * 3, 3):
# encode, drop and find next tau
weight_now=self.layers[i + 2].weight
weight_now_reshaped=weight_now.reshape((weight_now.shape[0], weight_now.shape[1]//self.nn_input_dim,self.nn_input_dim)) #shape: output x depth x input
if i==0:
effects=torch.sum(torch.abs(weight_now_reshaped), dim=1) / self.depth
else:
effects=effects+torch.sum(torch.abs(weight_now_reshaped), dim=1) / self.depth
effects=effects.t() #shape: input x output
modified_list = add_ones_until_target(copy.deepcopy(self.embedding_sizes), effects.shape[0])
effects=create_block_sum_matrix(modified_list,effects)
return effects
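    # Reading the result: effects[i, j] aggregates, over all depths, the absolute
    # weight mass through which feature i's encoding shifts feature j's next
    # threshold, block-summed over each categorical feature's embedding dimensions.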
def explain_without_causal_effects(self,x):
"""
Explains decisions of the neural network for input sample.
For numericals, extracts upper and lower boundaries on the sample
For categoricals displays possible categories
Also calculates contributions of each feature to final result
"""
self.find_boundaries(x) #find upper, lower boundaries for all nn inputs
#find valid categories for categorical features
valid_categories=self.categories_within_boundaries()
#numerical boundaries
upper_numerical=self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy()
lower_numerical=self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy()
        # Find the contribution of each NN input in the final linear layer; distribute the bias evenly
        contributions = self.encodings * self.final_linear.weight + self.final_linear.bias.unsqueeze(dim=-1) / self.final_linear.weight.shape[1]
        # Reshape to [output_dim, depth, nn_input_dim] (reshape, not the unsafe in-place resize_) and sum over depth
        contributions = contributions.detach().reshape((contributions.shape[0], contributions.shape[1] // self.nn_input_dim, self.nn_input_dim))
        contributions = torch.sum(contributions, dim=1)
# Initialize an empty list to store the summed contributions
summed_contributions = []
# Initialize start index for slicing
start_idx = 0
#Sum contribution of each categorical within respective embedding
for size in self.embedding_sizes:
# Calculate end index for the current chunk
end_idx = start_idx + size
# Sum the contributions in the current chunk
chunk_sum = contributions[:, start_idx:end_idx].sum(dim=1, keepdim=True)
# Append the summed chunk to the list
summed_contributions.append(chunk_sum)
# Update the start index for the next chunk
start_idx = end_idx
# If there are remaining elements not covered by embedding_sizes, add them as is (numerical features)
if start_idx < contributions.shape[1]:
remaining = contributions[:, start_idx:]
summed_contributions.append(remaining)
# Concatenate the summed contributions back into a tensor
summed_contributions = torch.cat(summed_contributions, dim=1)
        # Handle multi-class explanations by picking the strongest output class
        # (for binary classification with a single output this index is 0 automatically).
        # Note: multi-output regression is not supported yet; this returns the largest regressed value's explanation
        highest_index = torch.argmax(summed_contributions.sum(dim=1))
# This is contribution from each feature
result=summed_contributions[highest_index]
self.result=result
#Explanation and Contribution formats are in ordered format (categoricals first, numericals later)
#Bring them to original format in user input
#Combine categoricals and numericals explanations and contributions
Explanation = [None] * (len(self.categorical_indices) + len(upper_numerical))
Contribution = np.zeros((len(self.categorical_indices) + len(upper_numerical),))
# Fill in the categorical samples
for j, cat_index in enumerate(self.categorical_indices):
Explanation[cat_index] = valid_categories[j]
Contribution[cat_index] = result[j].numpy()
#INVERSE TRANSFORM PART 1-------------------------------------------------------------------------------------------
#Inverse transform upper and lower_numericals
len_num=len(upper_numerical)
if len_num>0:
upper_numerical=self.preprocessor.scaler.inverse_transform(upper_numerical.reshape(1,-1))
lower_numerical=self.preprocessor.scaler.inverse_transform(lower_numerical.reshape(1,-1))
if len_num>1:
upper_numerical=np.squeeze(upper_numerical)
lower_numerical=np.squeeze(lower_numerical)
upper_iter = iter(upper_numerical)
lower_iter = iter(lower_numerical)
cnt=0
for i in range(len(Explanation)):
if Explanation[i] is None:
#Note the denormalization here
Explanation[i] = next(lower_iter),next(upper_iter)
if len(self.categorical_indices)>0:
Contribution[i] = result[j+cnt+1].numpy()
else:
Contribution[i] = result[cnt].numpy()
cnt=cnt+1
attribute_names_list = []
revised_explanations_list = []
contributions_list = []
# Process each feature to fill lists
for idx, attr_name in enumerate(self.attribute_names):
if isinstance(Explanation[idx], list): # Categorical
#INVERSE TRANSFORM PART 2-------------------------------------------------------------------------------------------
#Inverse transform categoricals
category_names = [key for key, value in self.label_encoders[attr_name].items() if value in Explanation[idx]]
revised_explanation = " ,OR, ".join(category_names)
elif isinstance(Explanation[idx], tuple): # Numerical
revised_explanation = f"{Explanation[idx][0].item()} to {Explanation[idx][1].item()}"
else:
revised_explanation = "Unknown" #shouldn't really happen
# Append to lists
attribute_names_list.append(attr_name)
revised_explanations_list.append(revised_explanation)
contributions_list.append(Contribution[idx] if idx < len(Contribution) else None)
# Construct DataFrame
Explanation_df = pd.DataFrame({
'Name': attribute_names_list,
'Category': revised_explanations_list,
'Contribution': contributions_list
})
result=self.preprocessor.inverse_transform_y(self.output)
return copy.deepcopy(Explanation_df),self.output.clone(),copy.deepcopy(result),copy.deepcopy(Explanation)
def explain(self,x,include_causal_analysis=False):
"""
Fixes all features but one, sweeps that feature across its own categories, reports the average change from other categories to current one.
"""
def update_intervals(available_intervals, incoming_interval):
updated_intervals = []
for interval in available_intervals:
if incoming_interval[1] <= interval[0] or incoming_interval[0] >= interval[1]:
# The incoming interval does not overlap, keep the interval as is
updated_intervals.append(interval)
else:
# There is some overlap, possibly split the interval
if incoming_interval[0] > interval[0]:
# Add the left part that doesn't overlap
updated_intervals.append((interval[0], incoming_interval[0]))
if incoming_interval[1] < interval[1]:
# Add the right part that doesn't overlap
updated_intervals.append((incoming_interval[1], interval[1]))
return updated_intervals
def sample_from_intervals(available_intervals):
if not available_intervals:
return None
# Choose a random interval
chosen_interval = random.choice(available_intervals)
# Sample a random point within this interval
return random.uniform(chosen_interval[0], chosen_interval[1])
Explanation_df,output,result,Explanation=self.explain_without_causal_effects(x)
if include_causal_analysis:
# Causal analysis
causal_effect=np.zeros((x.shape[-1],))
numerical_cnt=0
for idx, attr_name in enumerate(self.attribute_names):
if isinstance(Explanation[idx], list): # Categorical
all_category_names = [value for key, value in self.label_encoders[attr_name].items()]
sweeped_category_names = [value for key, value in self.label_encoders[attr_name].items() if value in Explanation[idx]]
                    is_category_empty = not (set(all_category_names) - set(sweeped_category_names))
                    cnt = 0
                    while not is_category_empty:
new_x=x.clone()
next_category=list(set(all_category_names)-set(sweeped_category_names))[0]
new_x[0,idx]=float(next_category)
Explanation_df_new,output_new,result_new,Explanation_new=self.explain_without_causal_effects(new_x)
sweeped_category_names = sweeped_category_names+[value for key, value in self.label_encoders[attr_name].items() if value in Explanation_new[idx]]
                        is_category_empty = not (set(all_category_names) - set(sweeped_category_names))
causal_effect[idx]=causal_effect[idx]+(output-output_new).detach().numpy()[0,0]
cnt=cnt+1
if cnt>0:
causal_effect[idx]=causal_effect[idx]/cnt
else:
                    search_complete = False
                    # Initial available interval: (-100, 100), matching the safe bounds set in find_boundaries
                    available_intervals = [(-100, 100)]
                    # The sample's own numerical boundaries define the first interval to exclude
                    self.explain_without_causal_effects(x)
                    upper_numerical = self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy()
                    lower_numerical = self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy()
incoming_interval = (lower_numerical[numerical_cnt],upper_numerical[numerical_cnt])
available_intervals = update_intervals(available_intervals, incoming_interval)
cnt=0
while not(search_complete):
new_sample=sample_from_intervals(available_intervals)
new_x=x.clone()
new_x[0,idx]=new_sample
Explanation_df_new,output_new,result_new,Explanation_new=self.explain_without_causal_effects(new_x)
causal_effect[idx]=causal_effect[idx]+(output-output_new).detach().numpy()[0,0]
cnt=cnt+1
upper_numerical=self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy()
lower_numerical=self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy()
incoming_interval = (lower_numerical[numerical_cnt],upper_numerical[numerical_cnt])
available_intervals = update_intervals(available_intervals, incoming_interval)
if available_intervals == []:
search_complete=True
if cnt>0:
causal_effect[idx]=causal_effect[idx]/cnt
numerical_cnt=numerical_cnt+1
Explanation_df['Causal Effects'] = causal_effect
return Explanation_df,output,result
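    # Usage sketch: `df, out, res = model.explain(x, include_causal_analysis=True)`
    # adds a 'Causal Effects' column to the explanation DataFrame: the average
    # output change observed when each feature is swept outside the sample's
    # current decision region.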
    def sample_from_boundaries(self):
        """
        Assumes the boundaries have already been extracted (e.g., self.explain was
        run on an input). Samples a value for each feature within the stored upper
        and lower boundaries: a uniform value for each numerical, a random valid
        category for each categorical.
        Returns:
        - A tensor containing sampled values within the given boundaries for each feature.
        """
        # First sample from the valid categories
        # (random.choice raises IndexError if a feature has no valid category,
        # which would indicate inconsistent boundaries)
        categories_within_bounds = self.categories_within_boundaries()
        sampled_indices = [random.choice(categories) for categories in categories_within_bounds.values()]
        # Then sample the numericals
        samples = []
        for lower, upper in zip(self.lower_boundaries[sum(self.embedding_sizes):], self.upper_boundaries[sum(self.embedding_sizes):]):
            # Clip to the (-1, 1) data range, then sample uniformly within the boundaries
            upper = torch.clamp(upper.detach(), max=1.0)
            lower = torch.clamp(lower.detach(), min=-1.0)
            sample = lower + (upper - lower) * torch.rand(1)
            samples.append(sample)
#Combine categoricals and numericals
# Initialize an empty list to hold the combined samples
combined_samples = [None] * (len(self.categorical_indices) + len(samples))
# Fill in the categorical samples
for i, cat_index in enumerate(self.categorical_indices):
combined_samples[cat_index] = torch.tensor([sampled_indices[i]], dtype=torch.float)
# Fill in the numerical samples
num_samples_iter = iter(samples)
for i in range(len(combined_samples)):
if combined_samples[i] is None:
combined_samples[i] = next(num_samples_iter)
# Combine into a single tensor
combined_tensor = torch.cat(combined_samples, dim=-1)
return combined_tensor.unsqueeze(dim=0)
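    # Usage sketch: populate the boundaries first, then draw a neighbour that
    # receives the same encoding as x:
    #   model.explain_without_causal_effects(x)
    #   x_new = model.sample_from_boundaries()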
    def generate(self):
        """
        Generates a data sample from the learned network by sampling a random
        encoding consistent with the learned thresholds at every depth.
        """
def sample_with_tau(tau,max_bound,min_bound):
# Sample according to tau, lower and upper bounds
sampled=torch.zeros((self.nn_input_dim))
st=0
# Randomly pick from valid categories
for embedding in self.embeddings:
categories_within = []
# Iterate over each embedding vector in the layer
for i, weight in enumerate(embedding.weight):
# Check if the embedding weight falls within the boundaries
if torch.all(weight >= min_bound[st:st+embedding.weight.shape[-1]]) and torch.all(weight <= max_bound[st:st+embedding.weight.shape[-1]]):
categories_within.append(i) # Using index i as category identifier
feature_now=embedding.weight[np.random.choice(categories_within),:]
cnt=0
for j in range(st,st+embedding.weight.shape[-1]):
if feature_now[cnt]>-tau[0,j]:
sampled[j]=1.0
elif feature_now[cnt]<=-tau[0,j]:
sampled[j]=-1.0
cnt=cnt+1
st=st+embedding.weight.shape[-1]
#Randomly sample for numericals
for i in range(st,self.nn_input_dim):
if -tau[0,i]>max_bound[i]: #In this case you have to pick -1
sampled[i]=-1.0
elif -tau[0,i]<=min_bound[i]: #In this case you have to pick 1
sampled[i]=1.0
else:
sampled[i] = (torch.randint(low=0, high=2, size=(1,)) * 2 - 1).float()
return sampled
def bound_update(tau,max_bound,min_bound,sampled):
for i in range(self.nn_input_dim):
if sampled[i]>0: #means input is larger than -tau, so -tau might set a lower bound
if -tau[0,i]>min_bound[i]:
min_bound[i]=-tau[0,i]
elif sampled[i]<=0: #means input is smaller than -tau, so -tau might set an upper bound
if -tau[0,i]<max_bound[i]:
max_bound[i]=-tau[0,i]
return max_bound,min_bound
# Read first tau
tau=self.tau_initial
# Set initial maximum and minimum bounds
max_bound=torch.zeros((self.nn_input_dim))+100.0
min_bound=torch.zeros((self.nn_input_dim))-100.0
for i in range(0, self.depth * 3, 3):
            encoding_layer = self.layers[i]  # not used here; the encoding is sampled manually below
dropout_layer = self.layers[i + 1]
linear_layer = self.layers[i + 2]
#Sample with current tau
sample_now=sample_with_tau(tau,max_bound,min_bound)
#Update bounds with new sample
max_bound,min_bound=bound_update(tau,max_bound,min_bound,sample_now)
encoded_x = dropout_layer(sample_now.unsqueeze(dim=0))
if i==0:
encodings=encoded_x
taus=-tau
else:
encodings=torch.cat((encodings,encoded_x),dim=-1)
taus=torch.cat((taus,-tau),dim=-1)
tau = linear_layer(encodings) #not used for last layer
self.encodings=encodings
self.taus=taus
self.upper_boundaries=torch.clone(max_bound)
self.lower_boundaries=torch.clone(min_bound)
generated_sample=self.sample_from_boundaries()
self.explain_without_causal_effects(generated_sample)
generated_sample_original_format=self.preprocessor.inverse_transform_X(generated_sample)
result=self.preprocessor.inverse_transform_y(self.output)
generated_sample_original_format['prediction']=result
return generated_sample,generated_sample_original_format,result
    def generate_from_same_category(self, x):
        """
        Generates a new sample from the same decision region as x:
        explains x to obtain its boundaries, then samples within them.
        """
        self.explain_without_causal_effects(x)
generated_sample=self.sample_from_boundaries()
generated_sample_original_format=self.preprocessor.inverse_transform_X(generated_sample)
result=self.preprocessor.inverse_transform_y(self.output)
return generated_sample,generated_sample_original_format,result
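
# Minimal end-to-end sketch (hypothetical helper, not part of the original API).
# It assumes `preprocessor` is the companion class referenced in LEURN.__init__,
# exposing attribute_names, encoders_for_nn, category_details, suggested_embeddings,
# output_type, output_dim, scaler and inverse_transform_X / inverse_transform_y.
def _example_usage(preprocessor, x_batch):
    model = LEURN(preprocessor, depth=3, droprate=0.1)
    model.eval()                                    # disable dropout for deterministic explanations
    y_hat = model(x_batch)                          # plain forward pass
    df, out, res = model.explain(x_batch[0:1])      # per-feature explanation DataFrame
    sample, sample_df, pred = model.generate()      # synthesize a new sample from the network
    return y_hat, df, sample_df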