[go: up one dir, main page]

Menu

[r49]: / minimclass.py  Maximize  Restore  History

Download this file

167 lines (147 with data), 6.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import math
from model import Model
class Minim(object):
    """
    Main Minimization Class
    -------------------------------------------------------------------------------
    Instantiation: Minim(random, model, new_cae=None)

    'random' is the random number source; this class calls its
    uniform(a, b) and choice(seq) methods.
    'model' supplies the trial settings copied onto the instance:
    allocations, prob_method, distance_measure, groups, variables,
    variables_weight, allocation_ratio, high_prob and min_group.

    'allocations' is a list of cases.
    Each 'case' is a dictionary with 'levels' and 'allocation' entries and
    optionally other info.
    case['levels'][i] is the level index of the i-th variable.
    case['allocation'] is the group index of the case: 0 to n-1,
    -1 = unallocated.

    The items of 'groups' are used directly as row/column indices of the
    probability and frequency tables.
    NOTE(review): this presumes groups is 0..n-1 -- confirm against Model.
    """

    def __init__(self, random, model=None, new_cae=None):
        """
        Copy the trial settings from 'model' onto the instance.

        random: random number source offering uniform() and choice()
        model: settings object; despite the None default it is required,
               because its attributes are read unconditionally
        new_cae: unused; kept so existing callers keep working
        """
        self.random = random
        self._allocations = model.allocations
        self._prob_method = model.prob_method
        self._distance_measure = model.distance_measure
        self._groups = model.groups
        self._variables = model.variables
        self._variables_weight = model.variables_weight
        self._allocation_ratio = model.allocation_ratio
        self._high_prob = model.high_prob
        self._min_group = model.min_group
        # Filled in by build_probs(); enroll() builds it on demand.
        self._probs = None

    @property
    def allocations(self):
        """List of cases used to build the frequency table."""
        return self._allocations

    @allocations.setter
    def allocations(self, allocations):
        self._allocations = allocations

    @property
    def groups(self):
        """Treatment groups; items are used as table indices."""
        return self._groups

    @groups.setter
    def groups(self, groups):
        self._groups = groups

    def build_probs(self, min_high_prob=None, min_group=0):
        """
        Build self._probs, a square matrix with one row per group.

        Row r is the vector of assignment probabilities that enroll() uses
        when group r is the preferred treatment; the diagonal entry is the
        probability of actually assigning the preferred group.

        min_high_prob: the high probability value for the group with the
                       lowest allocation ratio (defaults to the model's
                       high_prob)
        min_group: index of the group with the lowest allocation ratio

        With Model.BCM the diagonal of the other rows is derived from
        min_high_prob and the allocation ratios, and the off-diagonal
        entries split the remainder in proportion to the allocation ratios.
        With Model.NM (naive method) every diagonal entry is min_high_prob
        and the remainder is shared equally, so all rows look alike.
        For any other prob_method only the [min_group][min_group] entry is
        set and everything else stays 0.
        """
        if min_high_prob is None:
            min_high_prob = self._high_prob
        n_groups = len(self._groups)
        ratio = self._allocation_ratio
        method = self._prob_method

        def ratio_sum_excluding(skip):
            # Sum of the allocation ratios of every group except 'skip'.
            return sum([ratio[i] for i in range(n_groups) if i != skip])

        self._probs = [[0] * n_groups for _ in range(n_groups)]
        self._probs[min_group][min_group] = min_high_prob

        # Diagonal: probability of assigning the preferred group itself.
        # The denominator does not depend on the row, so compute it once.
        diag_denominator = ratio_sum_excluding(min_group)
        for r in self._groups:
            if r == min_group:
                continue
            if method == Model.BCM:
                numerator = ratio_sum_excluding(r)
                self._probs[r][r] = (
                    1.0 - (1.0 * numerator / diag_denominator)
                    * (1.0 - min_high_prob))
            elif method == Model.NM:
                self._probs[r][r] = min_high_prob

        # Off-diagonal: distribute what is left of each row.
        for r in self._groups:
            high = self._probs[r][r]
            row_denominator = ratio_sum_excluding(r)
            for c in self._groups:
                if r == c:
                    continue
                if method == Model.BCM:
                    self._probs[r][c] = (
                        (1.0 * ratio[c] / row_denominator) * (1.0 - high))
                elif method == Model.NM:
                    self._probs[r][c] = (
                        1.0 * (1.0 - min_high_prob) / (n_groups - 1.0))

    def enroll(self, case, freq_table=None):
        """
        Allocate 'case' to a group and return the chosen group index.

        case: dictionary with a 'levels' list; its 'allocation' entry is
              overwritten with the result
        freq_table: optional pre-built frequency table; when it is empty or
                    None the table is rebuilt from self._allocations

        For every group the weighted imbalance that would result from
        adding the case to that group is scored.  If all groups tie, the
        group is drawn according to the allocation ratio; otherwise one of
        the lowest-scoring groups is picked at random as the preferred
        treatment and the group is drawn from its row of self._probs.

        The case is not appended to self._allocations here; the frequency
        table is rebuilt from self._allocations before returning.
        """
        if freq_table:
            self.freq_table = freq_table
        else:
            self.build_freq_table()
        new_levels = case['levels']
        # level_count[v][g]: how many cases of group g share the new case's
        # level of variable v.
        level_count = [[] for _ in self._variables]
        for row in self.freq_table:
            for variable, level in enumerate(new_levels):
                level_count[variable].append(row[variable][level])
        scores = []
        for g in self._groups:
            scores.append(sum([
                1.0 * self._variables_weight[v]
                * self.get_imbalance_score(level_count[v], g)
                for v in range(len(self._variables))]))
        # indices of minimum score values
        min_indices = self.get_min_ties_index(scores)
        if len(min_indices) == len(self._groups):
            # all treatments have the same score,
            # so draw according to the allocation ratio
            probs = self._allocation_ratio
        else:
            if getattr(self, '_probs', None) is None:
                # build_probs() was never called: fall back to the settings
                # stored from the model instead of failing on a missing
                # attribute.
                self.build_probs(self._high_prob, self._min_group)
            # randomly select one of the preferred treatments
            preferred = self.random.choice(min_indices)
            probs = self._probs[preferred]
        case['allocation'] = self.get_rand_biased(probs)
        self.build_freq_table()
        return case['allocation']

    def get_rand_biased(self, probs):
        """
        Draw an index with probability proportional to probs[index].

        The weights need not sum to 1.  Raises ValueError when no weight
        is positive, because no index can be drawn then.
        """
        p = self.random.uniform(0, sum(probs))
        cumulative = 0
        last_positive = None
        for index, weight in enumerate(probs):
            cumulative += weight
            if weight > 0:
                last_positive = index
            if p < cumulative:
                return index
        if last_positive is None:
            raise ValueError(
                'cannot draw a group: no positive weight in {0!r}'.format(
                    probs))
        # uniform() may return its upper bound because of rounding; that
        # draw belongs to the last group that has any weight.
        return last_positive

    def get_min_ties_index(self, lst):
        """get indices of min values of the input list"""
        lowest = min(lst)
        # values within machine epsilon of the minimum count as ties
        return [idx for idx, item in enumerate(lst)
                if abs(item - lowest) < sys.float_info.epsilon]

    def get_marginal_balance(self, count):
        """
        Return the sum of absolute pairwise differences of 'count',
        divided by (len(count) - 1) * sum(count); 0.0 when that divisor
        is zero.
        """
        size = len(count)
        numerator = sum([abs(count[i] - count[j])
                         for i in range(size - 1)
                         for j in range(i + 1, size)])
        denominator = (size - 1) * sum(count)
        if denominator == 0:
            return 0.0
        return (1.0 * numerator) / denominator

    def get_imbalance_score(self, count, group, enroll=True):
        """
        Score the imbalance of the per-group counts 'count'.

        count: list of counts, one per group
        group: index of the group that would receive the new case
        enroll: when True the score is computed as if one more case had
                been added to 'group'

        Each count is divided by the group's allocation ratio and the
        adjusted counts are scored with the configured distance measure:
        Model.MB (marginal balance), Model.rng (range), Model.var
        (variance) or Model.SD (standard deviation).  Raises ValueError
        for any other distance measure.  'count' itself is left untouched.
        """
        # Work on a copy so the caller's list cannot be left modified if
        # scoring raises.
        trial = list(count)
        if enroll:
            trial[group] += 1
        adj_count = [(1.0 * trial[i]) / self._allocation_ratio[i]
                     for i in range(len(trial))]
        measure = self._distance_measure
        if measure == Model.MB:
            return self.get_marginal_balance(adj_count)
        if measure == Model.rng:
            return max(adj_count) - min(adj_count)
        if measure == Model.var:
            return self.get_variance(adj_count)
        if measure == Model.SD:
            return self.get_standard_deviation(adj_count)
        raise ValueError(
            'unknown distance measure: {0!r}'.format(measure))

    def get_standard_deviation(self, count):
        """Return the square root of get_variance(count)."""
        return math.sqrt(self.get_variance(count))

    def get_variance(self, count):
        """
        Return the sample variance (n - 1 divisor) of 'count'.

        Fewer than two values have no spread, so 0.0 is returned instead
        of dividing by zero.
        """
        size = len(count)
        if size < 2:
            return 0.0
        mean = 1.0 * sum(count) / size
        sq_terms = sum([(value - mean) ** 2 for value in count])
        return 1.0 * sq_terms / (size - 1.0)

    def build_freq_table(self):
        """
        Rebuild self.freq_table from self._allocations.

        freq_table[group][variable][level] is the number of allocated
        cases in 'group' whose 'variable' has 'level'.  Cases without an
        'allocation' entry, or with a None or negative one (-1 =
        unallocated), are skipped.
        """
        table = [[[0 for _ in levels] for levels in self._variables]
                 for _ in self._groups]
        self.freq_table = table
        if not self._allocations:
            return
        for case in self._allocations:
            group = case.get('allocation', -1)
            if group is None or group < 0:
                # Unallocated: a negative index would otherwise be counted
                # into the last group.
                continue
            for variable, level in enumerate(case['levels']):
                table[group][variable][level] += 1