123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706 |
- import string
- import types
- try:
- import numpy
- except ImportError:
- numpy = None
- ####################
- ## Give an error if the counts in a reaction output file do not agree with
- ## those passed in. 'eps' specifies the tolerance with which each value is
- ## tested, and may be a tuple or list with one element for each column in the
- ## file, including the time column. 'header' specifies the header, which must
- ## be present if header is given, and must not be present if header is not
- ## given. 'times_vals' contains the expected counts, as a list of tuples, one
- ## tuple per row, and in each tuple, one floating point (or integer) value for
- ## each column including the time column.
- ##
- ## Example usage:
- ##
- ## assertCounts("output.txt", [(f*1e-6, 5) for f in range(0, 101)])
- ##
- ## Check a count file from a run with a timestep of 1e-6 that ran for
- ## 100 iterations, producing a constant value of 5 on each timestep, and
- ## had no header, as in the following REACTION_DATA_OUTPUT:
- ##
- ## REACTION_DATA_OUTPUT
- ## {
- ## STEP = 1e-6
- ## {5} => "output.txt"
- ## }
- ##
- ####################
- def assertCounts(fname, times_vals, header=None, eps=1e-8):
- try:
- got_contents = open(fname).read()
- except:
- assert False, "Expected reaction output file '%s' was not created" % fname
- # Rend file into tiny pieces
- lines = [l for l in got_contents.split('\n') if l != '']
- # Validate header
- if header != None:
- assert header == lines[0], "In reaction output file '%s', the header is incorrect ('%s' instead of '%s')" % (fname, header, lines[0])
- lines = lines[1:]
- # Validate number of rows
- assert len(lines) == len(times_vals), "In reaction output file '%s', an incorrect number of data rows was found (%d instead of %d)" % (fname, len(lines), len(times_vals))
- # If a single epsilon was given, replicate it into a tuple
- if type(eps) == types.FloatType or type(eps) == types.IntType:
- eps = len(times_vals[0]) * (eps,)
- # Check each row's values
- for row in range(0, len(times_vals)):
- file_row = [float(f.strip()) for f in lines[row].split(' ') if f.strip() != '']
- tmpl_row = times_vals[row]
- # Make sure we don't have more templates for this row than we did for the first row!
- assert len(tmpl_row) == len(eps), "Internal error: For reaction output file '%s', data template row %d has incorrect number of columns (%d instead of %d)" % (fname, row, len(tmpl_row), len(eps))
- # Make sure we don't have more or less data for this row than we expect
- assert len(file_row) == len(eps), "In reaction output file '%s', data row %d has incorrect number of columns (%d instead of %d)" % (fname, row, len(file_row), len(eps))
- # Compare each datum
- for col in range(0, len(file_row)):
- assert abs(file_row[col] - tmpl_row[col]) < eps[col], "In reaction output file '%s', data row %d, column %d differs by more than epsilon from expected (%.15g instead of %.15g, eps=%.15g)" % (fname, row, col, file_row[col], tmpl_row[col], eps[col])
- class RequireCounts:
- def __init__(self, name, times_vals, header=None, eps=None):
- self.name = name
- self.times_vals = times_vals
- self.args = {}
- if header is not None:
- self.args["header"] = header
- if eps is not None:
- self.args["eps"] = eps
- def check(self):
- assertCounts(self.name, self.times_vals, **self.args)
- ####################
- ## Give an error if the counts in a reaction output file are not less than
- ## those passed in. 'eps' specifies the tolerance with which each value is
- ## tested, and may be a tuple or list with one element for each column in the
- ## file, including the time column. 'header' specifies the header, which must
- ## be present if header is given, and must not be present if header is not
- ## given. 'val' contains the count to compare. The first column in the output
- ## file is time column and is excluded from the comparison, all other columns
- ## are compared with 'val'.
- ##
- ##
- ## Example usage:
- ##
- ## assertCountsLessThan("output.txt", 5)
- ##
- ## Check a count file from a run with a timestep of 1e-6 that ran for
- ## 100 iterations, producing a value that is less than 5 on each
- ## timestep, and had no header, as in the following REACTION_DATA_OUTPUT:
- ##
- ## REACTION_DATA_OUTPUT
- ## {
- ## STEP = 1e-6
- ## {3} => "output.txt"
- ## }
- ##
- ####################
- def assertCountsLessThan(fname, val, header=None, eps=1e-8):
- try:
- got_contents = open(fname).read()
- except:
- assert False, "Expected reaction output file '%s' was not created" % fname
- # Rend file into tiny pieces
- lines = [l for l in got_contents.split('\n') if l != '']
- # Validate header
- if header != None:
- assert header == lines[0], "In reaction output file '%s', the header is incorrect ('%s' instead of '%s')" % (fname, header, lines[0])
- lines = lines[1:]
- # If a single epsilon was given, replicate it into a tuple
- # if type(eps) == types.FloatType or type(eps) == types.IntType:
- # eps = len(lines[1]) * (eps,)
- # Check each row's values
- for row in range(0, len(lines)):
- file_row = [float(f.strip()) for f in lines[row].split(' ') if f.strip() != '']
- # Make sure we don't have more templates for this row than we did for the first row!
- # assert len(file_row) == len(eps), "Internal error: For reaction output file '%s', data template row %d has incorrect number of columns (%d instead of %d)" % (fname, row, len(tmpl_row), len(eps))
- # Make sure we don't have more or less data for this row than we expect
- # assert len(file_row) == len(eps), "In reaction output file '%s', data row %d has incorrect number of columns (%d instead of %d)" % (fname, row, len(file_row), len(eps))
- # Compare each datum
- for col in range(1, len(file_row)):
- assert abs(file_row[col] - val) < eps, "In reaction output file '%s', data row %d, column %d is not less than expected (%.15g instead of %.15g, eps=%.15g)" % (fname, row, col, file_row[col], val, eps)
- class RequireCountsLessThan:
- def __init__(self, name, val, header=None, eps=None):
- self.name = name
- self.val = val
- self.args = {}
- if header is not None:
- self.args["header"] = header
- if eps is not None:
- self.args["eps"] = eps
- def check(self):
- assertCountsLessThan(self.name, self.val, **self.args)
- ####################
- ## Check value in the output file. All columns should be positive,
- ## while time column is discarded. 'header' specifies the header, which must
- ## be present if header is given, and must not be present if header is not
- ## given.
- ##
- ## Example usage:
- ##
- ## assertCountsPositive("output.txt")
- ##
- ## Check a count file from a run with a timestep of 1e-6 that ran for
- ## 100 iterations and had no header, as in the following
- ## REACTION_DATA_OUTPUT:
- ##
- ## REACTION_DATA_OUTPUT
- ## {
- ## STEP = 1e-6
- ## {
- ## COUNT[A, box1]:"A",
- ## COUNT[B, box2]:"B",
- ## COUNT[C, box3]:"C"
- ## } => "output.txt"
- ## }
- ##
- ####################
- def assertCountsPositive(fname, header=None):
- try:
- got_contents = open(fname).read()
- except:
- assert False, "Expected reaction output file '%s' was not created" % fname
- # Rend file into tiny pieces
- lines = [l for l in got_contents.split('\n') if l != '']
- # Validate header
- if header != None:
- assert header == lines[0], "In reaction output file '%s', the header is incorrect ('%s' instead of '%s')" % (fname, header, lines[0])
- start_row = 1
- lines = lines[1:]
- else:
- start_row = 0
- # Check each row's values
- for row in range(start_row, len(lines)):
- file_row = [float(f.strip()) for f in lines[row].split(' ') if f.strip() != '']
- # Compare each datum (time column is discarded)
- for col in range(1, len(file_row)):
- assert (file_row[col] > 0), "In reaction output file '%s', data row %d, column %d value %.15g is zero or negative." % (fname, row, col, file_row[col])
- class RequireCountsPositive:
- def __init__(self, name, header=None):
- self.name = name
- self.args = {}
- if header is not None:
- self.args["header"] = header
- def check(self):
- assertCountsPositive(self.name, **self.args)
- ####################
- ## Check value in the output file. All columns should be positive or zero.
- ## while time column is discarded. 'header' specifies the header, which must
- ## be present if header is given, and must not be present if header is not
- ## given.
- ##
- ## Example usage:
- ##
- ## assertCountsPositiveOrZero("output.txt")
- ##
- ## Check a count file from a run with a timestep of 1e-6 that ran for
- ## 100 iterations and had no header, as in the following
- ## REACTION_DATA_OUTPUT:
- ##
- ## REACTION_DATA_OUTPUT
- ## {
- ## STEP = 1e-6
- ## {
- ## COUNT[A, box1]:"A",
- ## COUNT[B, box2]:"B",
- ## COUNT[C, box3]:"C"
- ## } => "output.txt"
- ## }
- ##
- ####################
- def assertCountsPositiveOrZero(fname, header=None):
- try:
- got_contents = open(fname).read()
- except:
- assert False, "Expected reaction output file '%s' was not created" % fname
- # Rend file into tiny pieces
- lines = [l for l in got_contents.split('\n') if l != '']
- # Validate header
- if header != None:
- assert header == lines[0], "In reaction output file '%s', the header is incorrect ('%s' instead of '%s')" % (fname, header, lines[0])
- start_row = 1
- lines = lines[1:]
- else:
- start_row = 0
- # Check each row's values
- for row in range(start_row, len(lines)):
- file_row = [float(f.strip()) for f in lines[row].split(' ') if f.strip() != '']
- # Compare each datum (time column is discarded)
- for col in range(1, len(file_row)):
- assert (file_row[col] >= 0), "In reaction output file '%s', data row %d, column %d value %.15g is negative." % (fname, row, col, file_row[col])
- class RequireCountsPositiveOrZero:
- def __init__(self, name, header=None):
- self.name = name
- self.args = {}
- if header is not None:
- self.args["header"] = header
- def check(self):
- assertCountsPositiveOrZero(self.name, **self.args)
- ####################
- ## Check values in the output file. Some columns should be positive.
- ## Some columns should be zeroes.
- ## The values in column 3 should be equal to the sum of values in
- ## columns 1 and 2 (indexed from 0).
- ## If 'header' is TRUE, we discard the first row in the files under
- ## consideration. Otherwise we consider the first
- ## row also. The time column is always discarded from consideration.
- ##
- ## Example usage:
- ##
- ## assertHitsCrossRelations("A_hits_C_cross.dat", header=None)
- ##
- ## Check a count file from a run that had no header,
- ## as in the following REACTION_DATA_OUTPUT:
- ##
- ## REACTION_DATA_OUTPUT
- ## {
- ## STEP = 1e-6
- ## {
- ## COUNT[A, world.box[r1], FRONT_HITS]: "A_fr_hits",
- ## COUNT[A, world.box[r1], BACK_HITS]: "A_back_hits",
- ## COUNT[A, world.box[r1], ALL_HITS]: "A_all_hits",
- ## COUNT[C, world.box[r1], FRONT_CROSSINGS]: "A_fr_cross",
- ## COUNT[C, world.box[r1], BACK_CROSSINGS]: "A_back_cross",
- ## COUNT[C, world.box[r1], ALL_CROSSINGS]: "A_all_cross"
- ## } => "A_hits_C_cross.dat"
- ## }
- ##
- ####################
- def assertHitsCrossRelations(fname, header=None):
- try:
- get_contents = open(fname).read()
- except:
- assert False, "Expected reaction output file '%s' was not created" % fname
- # Rend files into tiny pieces
- lines = [l for l in get_contents.split('\n') if l != '']
-
- # Validate header
- if header != None:
- assert header == lines[0], "In reaction output file '%s', the header is incorrect ('%s' instead of '%s')" % (fname, header, lines[0])
- lines = lines[1:]
- # Check each row's values
- for row in range(0, len(lines)):
- file_row = [float(f.strip()) for f in lines[row].split(' ') if f.strip() != '']
- # Compare each datum
- assert (file_row[2] == 0), "In reaction output file '%s', data row %d, column 2 value %d is nonzero." % (fname, row, file_row[2])
- assert (file_row[3] == file_row[1] + file_row[2]), "In reaction output file '%s', data row %d, column 3 is not equal to the sum of columns 1 and 2." % (fname, row)
- if(row > 1):
- assert (file_row[1] > 0), "In reaction output file '%s', data row %d, column 1 value %d is zero." % (fname, row, file_row[1])
- assert (file_row[4] > 0), "In reaction output file '%s', data row %d, column 4 value %d is zero." % (fname, row, file_row[4])
- assert (file_row[5] > 0), "In reaction output file '%s', data row %d, column 5 value %d is zero." % (fname, row, file_row[5])
- assert (file_row[6] > 0), "In reaction output file '%s', data row %d, column 6 value %d is zero." % (fname, row, file_row[6])
-
- class RequireHitsCrossRelations:
- def __init__(self, name, header=None):
- self.name = name
- self.args = {}
- if header is not None:
- self.args["header"] = header
- def check(self):
- assertHitsCrossRelations(self.name, **self.args)
- ####################
- ## Check numeric constraints on reaction data counts.
- ##
- ## What this checks is a linear constraint between the various columns in a
- ## particular row of reaction data output. For instance, if the simulation has a
- ## molecule which undergoes a reversible unimolecular reaction, and is
- ## otherwise inert, and the simulation releases 900 molecules divided between
- ## the two types at the beginning, and writes their counts to the first two
- ## columns of a reaction output file, the constraint 'C1 + C2 == 900' could be
- ## checked.
- ##
- ## Example usage:
- ##
- ## assertCountConstraints("output.txt", constraints=[(1, 1)], totals=[900])
- ##
- ## Essentially, this checks if the product of the constraints matrix with each
- ## row vector in the output file (excluding the time column) is equal to the
- ## totals vector. In the simple case, constraints is a vector, and totals is a
- ## scalar. min_time and max_time may also be specified to limit the comparison
- ## to a certain time interval.
- ####################
- def assertCountConstraints(fname, constraints=None, totals=None, min_time=None, max_time=None, header=None, num_vals=None, minimums=None, maximums=None):
- try:
- file = open(fname)
- except:
- assert False, "Expected reaction output file '%s' was not created" % fname
- try:
- # Validate header
- if header is not None and header != False:
- got_header = file.readline()
- if header == True:
- assert got_header != '', "In reaction output file '%s', expected at least a header, but none was found" % fname
- else:
- assert got_header.strip() == header, "In reaction output file '%s', the header is incorrect (expected '%s', got '%s')" % (fname, header, got_header.strip())
-
- # Validate constraints
- if constraints is not None or totals != None:
- got_vals = 0
- if numpy is not None:
- if len(constraints) != 0:
- m_constraint = numpy.transpose(numpy.matrix(constraints))
- else:
- m_constraint = None
- for line in file:
- cols = line.strip().split()
- time = float(cols[0])
- if min_time != None and min_time > time:
- continue
- if max_time != None and max_time < time:
- break
- got_vals += 1
- counts = [int(x) for x in cols[1:]]
- if constraints == None:
- constraints = [(1,) * len(counts)]
- if totals == None:
- totals = (0,) * len(constraints)
- if minimums == None:
- minimums = (0,) * len(counts)
- if maximums == None:
- maximums = (1e300,) * len(counts)
- for c in range(0, len(counts)):
- assert counts[c] >= minimums[c], "In reaction output file '%s', at time %s, value (%g) is less than minimum value (%g)" % (fname, cols[0], counts[c], minimums[c])
- assert counts[c] <= maximums[c], "In reaction output file '%s', at time %s, value (%g) is greater than maximum value (%g)" % (fname, cols[0], counts[c], maximums[c])
- # XXX: We might be able to do this faster as a single matrix multiply
- if numpy is not None:
- if m_constraint is not None:
- m_count = numpy.matrix(counts)
- m_result = m_count * m_constraint - totals
- assert (m_result == 0).all(), "In reaction output file '%s', at time %s, a constraint is not satisfied" % (fname, cols[0])
- else:
- for c in range(0, len(constraints)):
- val = sum(map(lambda x, y: x*y, constraints[c], counts))
- assert val == totals[c], "In reaction output file '%s', at time %s, constraint %s*%s == %d is not satisfied" % (fname, cols[0], str(constraints[c]), str(counts), totals[c])
- if num_vals is not None:
- assert num_vals == got_vals, "In reaction output file '%s', expected %d rows within the selected time window, but only found %d" % (fname, num_vals, got_vals)
- finally:
- file.close()
- class RequireCountConstraints:
- def __init__(self, name, constraints=None, totals=None, min_time=None, max_time=None, header=None, num_vals=None, minimums=None, maximums=None):
- self.name = name
- self.args = {}
- if constraints is not None:
- self.args["constraints"] = constraints
- if totals is not None:
- self.args["totals"] = totals
- if min_time is not None:
- self.args["min_time"] = min_time
- if max_time is not None:
- self.args["max_time"] = max_time
- if header is not None:
- self.args["header"] = header
- if num_vals is not None:
- self.args["num_vals"] = num_vals
- if minimums is not None:
- self.args["minimums"] = minimums
- if maximums is not None:
- self.args["maximums"] = maximums
- def check(self):
- assertCountConstraints(self.name, **self.args)
- ####################
- ## Check numeric equilibrium of reaction data counts.
- ##
- ## This check will average a particular range of rows from a reaction output
- ## file and compare the computed mean to an expected mean, with a given tolerance.
- ##
- ## TODO: add a check for the std dev?
- ##
- ## Example usage:
- ##
- ## assertCountEquilibrium("output.txt", values=[590.5], tolerances=[5.0], min_time=1e-3)
- ##
- ####################
- def assertCountEquilibrium(fname, values=None, tolerances=None, min_time=None, max_time=None, header=None, num_vals=None):
- try:
- file = open(fname)
- except:
- assert False, "Expected reaction output file '%s' was not created" % fname
- try:
- # Validate header
- if header is not None and header != False:
- got_header = file.readline()
- if header == True:
- assert got_header != '', "In reaction output file '%s', expected at least a header, but none was found" % fname
- else:
- assert got_header.strip() == header, "In reaction output file '%s', the header is incorrect (expected '%s', got '%s')" % (fname, header, got_header.strip())
- # account for each line
- if values is not None and tolerances is not None:
- sums = list((0.0,) * len(values))
- count = 0
- for line in file:
- cols = [x.strip() for x in line.split()]
- if min_time is not None and float(cols[0]) < min_time:
- continue
- if max_time is not None and float(cols[0]) > max_time:
- continue
- data = [float(x) for x in cols[1:]]
- assert len(sums) <= len(data), "In reaction output file '%s', expected at least %d columns, but found only %d" % (fname, len(sums) + 1, len(data) + 1)
- for i in range(0, len(sums)):
- sums[i] += data[i]
- count += 1
-
- # Compute the mean
- if num_vals is not None:
- assert count == num_vals, "In reaction output file '%s', expected %d matching data rows, but found %d" % (fname, num_vals, count)
- assert count != 0, "In reaction output file '%s', found no valid data rows" % fname
- avgs = [x / float(count) for x in sums]
- for i in range(0, len(avgs)):
- assert avgs[i] >= values[i] - tolerances[i] and avgs[i] <= values[i] + tolerances[i], "In reaction output file '%s', expected column %d to have a mean between %.15g and %.15g, but got a mean of %.15g" % (fname, i+1, values[i] - tolerances[i], values[i] + tolerances[i], avgs[i])
- finally:
- file.close()
-
- class RequireCountEquilibrium:
- def __init__(self, name, values, tolerances, min_time=None, max_time=None, header=None, num_vals=None):
- self.name = name
- self.values = values
- self.tolerances = tolerances
- self.args = {}
- if min_time is not None:
- self.args["min_time"] = min_time
- if max_time is not None:
- self.args["max_time"] = max_time
- if header is not None:
- self.args["header"] = header
- if num_vals is not None:
- self.args["num_vals"] = num_vals
- def check(self):
- assertCountEquilibrium(self.name, self.values, self.tolerances, **self.args)
- ####################
- ## Check numeric rates of occurrence of reactions.
- ##
- ## This check will check that the average rate of occurrence of a counted
- ## reaction in a count file is within some tolerance of a specified rate.
- ##
- ## Essentially, the computation we are doing is:
- ##
- ## count_column / (time_column - base_time)
- ##
- ## which should be within "tolerance" of the specified value at all times.
- ##
- ## Example usage:
- ##
- ## assertCountRxnRate("output.txt", values=[0.1], tolerances=[0.15], min_time=1e-3, base_time=0.0)
- ##
- ####################
- def assertCountRxnRate(fname, values, tolerances, min_time=None, max_time=None, base_time=0.0, header=None):
- try:
- file = open(fname)
- except:
- assert False, "Expected reaction output file '%s' was not created" % fname
- try:
- # Validate header
- if header is not None and header != False:
- got_header = file.readline()
- if header == True:
- assert got_header != '', "In reaction output file '%s', expected at least a header, but none was found" % fname
- else:
- assert got_header.strip() == header, "In reaction output file '%s', the header is incorrect (expected '%s', got '%s')" % (fname, header, got_header.strip())
- # account for each line
- for line in file:
- cols = [x.strip() for x in line.split()]
- this_time = float(cols[0])
- if min_time is not None and this_time < min_time:
- continue
- if max_time is not None and this_time > max_time:
- continue
- data = [float(x) / (this_time - base_time) for x in cols[1:]]
- assert len(values) <= len(data), "In reaction output file '%s', expected at least %d columns, but found only %d" % (fname, len(values) + 1, len(data) + 1)
- for i in range(len(data)):
- assert data[i] >= values[i] - tolerances[i] and data[i] <= values[i] + tolerances[i], "In reaction output file '%s', at time %g, value %g in column %d is outside of tolerance" % (fname, this_time, data[i], i+1)
- finally:
- file.close()
-
- class RequireCountRxnRate:
- def __init__(self, name, values, tolerances, min_time=None, max_time=None, base_time=None, header=None):
- self.name = name
- self.values = values
- self.tolerances = tolerances
- self.args = {}
- if min_time is not None:
- self.args["min_time"] = min_time
- if max_time is not None:
- self.args["max_time"] = max_time
- if base_time is not None:
- self.args["base_time"] = base_time
- if header is not None:
- self.args["header"] = header
- def check(self):
- assertCountRxnRate(self.name, self.values, self.tolerances, **self.args)
- ####################
- ## Give an error if the specified trigger file is invalid, according to the
- ## specified parameters.
- ##
- ## Parameters:
- ## fname - the filename to check
- ## data_cols - number of data columns (0 for reaction, 1 for hits, 2
- ## for mol counts)
- ## exact_time - True if exact time is enabled, false otherwise
- ## header - text of header to expect, or None to expect no header
- ## event_titles - list of event titles (trigger labels), or None to expect
- ## all trigger labels to be empty. Any trigger whose label
- ## does not match at least one of the items in the list is
- ## considered an error.
- ## itertime - length of an iteration for checking the exact time
- ## against the iteration time, if exact_time == True
- ## [xyz]range - 2-tuples giving the min and max value for x, y, or z, or
- ## None to skip checking the trigger location
- ##
- ## Example usage:
- ##
- ## assertValidTriggerOutput("triggers.txt", 2, True, xrange=(-1, 1), yrange=(-1, 1), zrange=(-1, 1))
- ##
- ## Check a molecule count trigger file with an exact time column,
- ## assuming the default time step of 1e-6, and assuming that the
- ## molecule triggers are restricted to a 2x2x2 micron box centered at
- ## the origin. All events should be unlabelled.
- ####################
- def assertValidTriggerOutput(fname, data_cols, exact_time=True, header=None, event_titles=None, itertime=1e-6, xrange=None, yrange=None, zrange=None):
- try:
- got_contents = open(fname).read()
- except:
- assert False, "Expected trigger output file '%s' was not created" % fname
- # Rend file into tiny pieces
- lines = [l for l in got_contents.split('\n') if l != '']
- # Validate header
- if header != None:
- assert header == lines[0], "In trigger output file '%s', the header is incorrect ('%s' instead of '%s')" % (fname, header, lines[0])
- lines = lines[1:]
- # Compute column offsets
- total_cols = 1 + 3 + data_cols
- first_data = 4
- location = 1
- # Insert exact time column, if appropriate
- if exact_time:
- total_cols += 1
- first_data += 1
- location += 1
- # Process each row
- for row in range(0, len(lines)):
- # Tokenize row
- cols = lines[row].split(' ', total_cols)
- # Check column count
- assert len(cols) >= total_cols, "In trigger output file '%s', output row %d has incorrect number of columns (%d instead of %d)" % (fname, row, len(cols), total_cols)
- # Validate trigger label
- if event_titles is None:
- assert cols[-1] == '', "In trigger output file '%s', output row %d has an unexpected event label ('%s' instead of nothing)" % (fname, row, cols[-1])
- else:
- assert event_titles.count(cols[-1]) > 0, "In trigger output file '%s', output row %d has an unexpected event label ('%s' instead of one of {%s})" % (fname, row, cols[-1], string.join(event_titles, ','))
- # Validate exact time column
- if exact_time:
- assert float(cols[0]) <= float(cols[1]), "In trigger output file '%s', row %d, exact time precedes iteration time (%s versus %s)" % (fname, row, cols[1], cols[0])
- assert float(cols[0]) + itertime > float(cols[1]), "In trigger output file '%s', row %d, exact time doesn't match given iteration time (%s versus %s-%.15g)" % (fname, row, cols[1], cols[0], float(cols[0]) + itertime)
- # Validate data columns
- if data_cols > 0:
- assert cols[first_data] == "0" or cols[first_data] == "-1" or cols[first_data] == "1", "In trigger output file '%s', row %d, an invalid orientation was found (%s, rather than one of {-1,0,1})" % (fname, row, cols[first_data])
- if data_cols > 1:
- assert cols[first_data+1] == "-1" or cols[first_data+1] == "1", "In trigger output file '%s', row %d, an invalid count was found (%s, rather than one of {-1,1})" % (fname, row, cols[first_data+1])
- # Validate location
- if xrange != None:
- x = float(cols[location])
- assert x >= xrange[0] and x <= xrange[1], "In trigger output file '%s', row %d, an out-of-bounds event was found (x=%.15g, instead of %.15g ... %.15g)" % (fname, row, x, xrange[0], xrange[1])
- if yrange != None:
- y = float(cols[location])
- assert y >= yrange[0] and y <= yrange[1], "In trigger output file '%s', row %d, an out-of-bounds event was found (y=%.15g, instead of %.15g ... %.15g)" % (fname, row, y, yrange[0], yrange[1])
- if zrange != None:
- z = float(cols[location])
- assert z >= zrange[0] and z <= zrange[1], "In trigger output file '%s', row %d, an out-of-bounds event was found (z=%.15g, instead of %.15g ... %.15g)" % (fname, row, z, zrange[0], zrange[1])
- class RequireValidTriggerOutput:
- def __init__(self, name, data_cols, exact_time=False, header=None, event_titles=None, itertime=None, xrange=None, yrange=None, zrange=None):
- self.name = name
- self.data_cols = data_cols
- self.args = {}
- if exact_time is not None:
- self.args["exact_time"] = exact_time
- if header is not None:
- self.args["header"] = header
- if event_titles is not None:
- self.args["event_titles"] = event_titles
- if itertime is not None:
- self.args["itertime"] = itertime
- if xrange is not None:
- self.args["xrange"] = xrange
- if yrange is not None:
- self.args["yrange"] = yrange
- if zrange is not None:
- self.args["zrange"] = zrange
- def check(self):
- assertValidTriggerOutput(self.name, self.data_cols, **self.args)
|