#!/usr/bin/python
#
# This file is part of fluid3d - a software for image registration
#
# Copyright (c) Madrid 2013 Gert Wollny
#
# fluid3d is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with fluid3d; if not, see <http://www.gnu.org/licenses/>.
#
#
import pyopencl as cl
import pyopencl.array as cl_array
import numpy as np
from math import cos
from math import sin
from math import fabs
from math import sqrt
import unittest
from ssd import CostSSD
from cldevice import CLDevice
from OpenCLHelpers import get_cl_devices
class TestSSD(unittest.TestCase):
def setUp(self):
self.nx = 8
self.ny = 8
self.nz = 12
studyx = self.__fill_cos_row(self.nx, 0)
studyy = self.__fill_cos_row(self.ny, 1)
studyz = self.__fill_cos_row(self.nz, 2)
self.study = np.asarray(studyz * studyx * studyy, order='F')
self.reference = np.empty([self.nx,self.ny,self.nz], dtype=np.float32, order='F')
self.reference.fill(1.0)
self.imgformat = cl.ImageFormat(cl.channel_order.INTENSITY, cl.channel_type.FLOAT)
self.expect = np.sum(np.square(np.subtract(self.study, self.reference)))
self.test_g = np.multiply(np.subtract(self.study, self.reference), np.gradient( self.study))
def createImageInContext(self, ctx, img):
return cl.Image(ctx, cl.mem_flags.READ_ONLY |cl.mem_flags.COPY_HOST_PTR,
self.imgformat, None, None, img)
def createImageBufferInContext(self, ctx, img):
nelm = img.shape[0] * img.shape[1] * img.shape[2]
hbuf = np.reshape(img, (1,1, nelm))[0][0]
return cl.Buffer(ctx, cl.mem_flags.READ_ONLY|cl.mem_flags.COPY_HOST_PTR, 4 * nelm, hbuf)
def __run_test_on_device(self, d):
device = CLDevice(d)
study_dev = self.createImageInContext(device.context, self.study)
ref_dev = self.createImageInContext(device.context, self.reference)
ssd = CostSSD(device)
self.__checkClose(ssd.cost(study_dev, ref_dev, None), self.expect, "cost value")
nelm = self.study.shape[0] * self.study.shape[1] * self.study.shape[2]
grad_dev = device.get_rw_buffer(4 * 4 * nelm)
self.__checkClose(ssd.cost(study_dev, ref_dev, grad_dev), self.expect, "cost value")
grad = np.empty(4 * nelm, dtype=np.float32)
cl.enqueue_copy(device.queue, grad, grad_dev).wait()
for z in range(self.nz-1)[1:]:
zi = self.ny * z
for y in range(self.ny-1)[1:]:
yi = (zi + y) * self.nx
for i in range(3):
for x in range(self.nx -1)[1:]:
xi = (yi + x) * 4
self.__checkClose(grad[xi+i], self.test_g[i][x][y][z],
"{0},{1},{2},{3}".format(x,y,z,i), False)
def __run_test_on_device_with_buffers(self, d):
device = CLDevice(d)
study_dev = self.createImageBufferInContext(device.context, self.study)
ref_dev = self.createImageBufferInContext(device.context, self.reference)
ssd = CostSSD(device)
self.__checkClose(ssd.cost_from_buf(self.study.shape, study_dev, ref_dev, None), self.expect , "cost value")
nelm = self.study.shape[0] * self.study.shape[1] * self.study.shape[2]
grad_dev = device.get_rw_buffer(4 * 4 * nelm)
self.__checkClose(ssd.cost_from_buf(self.study.shape, study_dev, ref_dev, grad_dev), self.expect, "cost value" )
grad = np.empty(4 * nelm, dtype=np.float32)
cl.enqueue_copy(device.queue, grad, grad_dev).wait()
errors = [0,0,0]
for z in range(self.nz-1)[1:]:
zi = self.ny * z
for y in range(self.ny-1)[1:]:
yi = (zi + y) * self.nx
for x in range(self.nx -1)[1:]:
xi = (yi + x) * 4
for i in range(3):
self.__checkClose(grad[xi+i], self.test_g[i][x][y][z], "{0},{1},{2},{3}".format(x,y,z,i), False)
errors[i] = errors[i] + 1
l = grad[xi+3]
lt = sqrt(grad[xi] * grad[xi] + grad[xi+1] * grad[xi+1] + grad[xi+2] * grad[xi+2])
self.__checkClose(l, lt, "norm", False)
def __run_test_on_device_with_mixed(self, device):
device = CLDevice(d)
study_dev = self.createImageInContext(device.context, self.study)
ref_dev = self.createImageBufferInContext(device.context, self.reference)
ssd = CostSSD(device)
self.__checkClose(ssd.cost_from_mixed(study_dev, ref_dev, None), self.expect)
nelm = self.study.shape[0] * self.study.shape[1] * self.study.shape[2]
grad_dev = device.get_rw_buffer(4 * 4 * nelm)
self.__checkClose(ssd.cost_from_mixed(study_dev, ref_dev, grad_dev), self.expect)
grad = np.empty(4 * nelm, dtype=np.float32)
event = cl.enqueue_copy(device.queue, grad, grad_dev)
test_g = np.multiply(np.subtract(self.study, self.reference), np.gradient( self.study))
event.wait()
for z in range(self.nz-1)[1:]:
zi = self.ny * z
for y in range(self.ny-1)[1:]:
yi = (zi + y) * self.nx
for x in range(self.nx -1)[1:]:
xi = (yi + x) * 4
for i in range(3):
self.__checkClose(grad[xi+i], test_g[i][x][y][z])
l = grad[xi+3]
lt = sqrt(grad[xi] * grad[xi] + grad[xi+1] * grad[xi+1] + grad[xi+2] * grad[xi+2])
self.__checkClose(l, lt)
def __checkClose(self, a,b, text, throw=True):
if fabs(b) < 1e-4:
if fabs(a)> 1e-4:
print "FAIL: got ", a , " expect ", b , text
if throw:
self.assertLess(fabs(a), 1e-4)
else:
if fabs((a - b) / b) > 1e-4:
print "FAIL: got ", a , " expect ", b , " Q:", a /b, text
if throw:
self.assertLess(fabs((a - b) / b), 1e-4)
def testOnDevices(self):
(devchoices, devhelp, devices) = get_cl_devices()
self.__run_test_on_device(devices[0])
def ytestOnDevicesWithBuffers(self):
(devchoices, devhelp, devices) = get_cl_devices()
self.__run_test_on_device_with_buffers(devices[1])
def __fill_cos_row(self, length, idx):
shape = [1,1,1]
shape[idx] = length
row = np.empty([length], dtype=np.float32)
for i in range(length):
x = (3.1415926 * i) / length
row[i] = 4.0 * cos(x)
return np.reshape(row, shape)
def __grad(self, ix, iy, iz):
coord = [(3.1415926 * ix) / self.nx, (3.1415926 * iy) / self.ny, (3.1415926 * iz) / self.nz]
sx = -64.0 *sin(coord[0]) * 3.1415926 / self.nx
sy = -64.0 *sin(coord[1]) * 3.1415926 / self.ny
sz = -64.0 *sin(coord[2]) * 3.1415926 / self.nz
c = [ cos(x) for x in coord ]
result = [sx * c[1] * c[2], c[0] * sy * c[2], c[0] * c[1] * sz]
return result
def suite():
suite = unittest.makeSuite(TestSSD, 'test')
return suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')