#! /usr/bin/env python
# -*- coding: utf-8 -*-
import random
import sys
import math
from model import Model
random.seed()
class Minim(object):
"""
Main Minimization Class
-------------------------------------------------------------------------------
Instanciation: Minim(allocations=None, groups=None, new_cae=None)
'allocations' is a list of cases.
Each 'case' is a dictionairy of 'levels' and 'allocation' and optianlly other info.
Levels element is a list, with ith element is the level of ith var
allocation is the allocation of the case: 0 to n-1, -1 = unalocated
new_case is one case, with allocation = -1
"""
def __init__(self, model=None, new_cae=None):
self._demo_mode = False
self._allocations = model.allocations
self._prob_method = model.prob_method
self._distance_measure = model.distance_measure
self._groups = model.groups
self._variables = model.variables
self._variables_weight = model.variables_weight
self._allocation_ratio = model.allocation_ratio
self._high_prob = model.high_prob
self._min_group = model.min_group
def demo(self, demo=True):
if demo:
self._demo_mode = True
self._groups = [0, 1, 2]
self._allocations = []
self._allocation_ratio = [1, 2, 3]
#self.freq_table = [[[18, 25], [6, 3, 3, 4, 3, 6, 4, 5, 5, 4]], [[39, 45], [12, 8, 7, 7, 7, 9, 6, 12, 11, 5]], [[55, 68], [17, 12, 10, 14, 10, 13, 8, 16, 15, 8]]]
self.freq_table = [[[0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], [[0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], [[0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]]
self._variables = [[0, 1], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
self.build_probs(0.7, 0)
else:
self._demo_mode = False
@property
def allocations(self):
return self._allocations
@allocations.setter
def allocations(self, allocations):
self._allocations = allocations
@property
def groups(self):
return self._groups
@groups.setter
def groups(self, groups):
self._groups = groups
def build_probs(self, min_high_prob=None, min_group=0):
"""
This is only for bcm method of probability assignment
min_high_prob: The high probability value for the group with
lowest allocation ration
min_group: index of the group with the lowest probability ratio
for Naive method group rows are similar
"""
if min_high_prob == None:
min_high_prob = self._high_prob
self._probs = [[0] * len(self._groups) for g in range(len(self._groups))]
self._probs[min_group][min_group] = min_high_prob
for r in self._groups:
if r == min_group:
continue
numerator = sum([self._allocation_ratio[row] for row in range(len(self._groups)) if row != r])
denominator = sum([self._allocation_ratio[row] for row in range(len(self._groups)) if row != min_group])
if self._prob_method == Model.BCM:
self._probs[r][r] = 1.0 - (1.0 * numerator / denominator) * (1.0 - min_high_prob)
elif self._prob_method == 'NM':
self._probs[r][r] = min_high_prob
for r in self._groups:
for c in self._groups:
if r == c:
continue
H = self._probs[r][r]
numerator = self._allocation_ratio[c]
denominator = sum([self._allocation_ratio[col] for col in range(len(self._groups)) if col != r])
if self._prob_method == Model.BCM:
self._probs[r][c] = (1.0 * numerator / denominator) * (1.0 - H)
elif self._prob_method == 'NM':
self._probs[r][c] = 1.0 * (1.0 - min_high_prob) / (len(self._groups) - 1.0)
def enroll(self, case, freq_table=None):
if freq_table:
self.freq_table = freq_table
else:
self.build_freq_table()
new_levels = case['levels']
level_count = [[] for g in self._variables]
for row in self.freq_table:
for variable, level in enumerate(new_levels):
level_count[variable].append(row[variable][level])
scores = []
for g in self._groups:
scores.append(sum([1.0 * self._variables_weight[v] * self.get_imbalance_score(level_count[v], g) for v in range(len(self._variables))]))
ret = self.get_min_ties_index(scores)
if len(ret) == len(self._groups):
# all treatment the same
# build a probs based on allocation ratio
probs = self._allocation_ratio
else:
bt = random.choice(ret)
l = self._probs[bt]
probs = [int(i*10**max([len(str(i).partition('.')[2]) for i in l])) for i in l]
pool = []
for idx, ar in enumerate(probs):
pool.extend([idx] * ar)
random.shuffle(pool)
case['allocation'] = random.choice(pool)
self.build_freq_table()
return case['allocation']
def get_min_ties_index(self, lst):
"""get indices of max valuse of the input list"""
L, ret = min(lst), []
for idx, item in enumerate(lst):
# this pair is equal
if abs(item - L) < sys.float_info.epsilon:
# so take it
ret.append(idx)
return ret
def get_marginal_balance(self, count):
numerator = sum([abs(count[i] - count[j]) for i in range(len(count)-1) for j in range(i+1, len(count))])
denominator = (len(count)-1) * sum(count)
if denominator == 0: return 0.0
return (1.0 * numerator) / denominator
def get_imbalance_score(self, count, group, enroll=True):
if enroll: count[group] += 1
adj_count = [(1.0 * count[i]) / self._allocation_ratio[i] for i in range(len(count))]
if self._distance_measure == Model.MB:
ret = self.get_marginal_balance(adj_count)
elif self._distance_measure == Model.rng:
ret = max(adj_count) - min(adj_count)
elif self._distance_measure == Model.var:
ret = self.get_variance(adj_count)
elif self._distance_measure == Model.SD:
ret = self.get_standard_deviation(adj_count)
if enroll: count[group] -= 1
return ret
def get_standard_deviation(self, count):
return math.sqrt(self.get_variance(count))
def get_variance(self, count):
mean = 1.0 * sum(count) / len(count)
sq_terms = sum([(i - mean)**2 for i in count])
return 1.0 * sq_terms / (len(count) - 1.0)
def build_freq_table(self):
if self._demo_mode: return
table = [[[0 for l in v] for v in self._variables] for g in self._groups]
self.freq_table = table
if not self._allocations: return
for case in self._allocations:
if not case.has_key('allocation'): return
group = case['allocation']
for variable, level in enumerate(case['levels']):
table[group][variable][level] += 1
self.freq_table = table