# ALMA - Atacama Large Millimeter Array
# (c) European Southern Observatory, 2025
# (c) Associated Universities Inc., 2025
# Copyright by ESO (in the framework of the ALMA collaboration),
# Copyright by AUI (in the framework of the ALMA collaboration),
# All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY, without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
# File BDFReader.py
#
import sys
import re
import os
from enum import Enum, auto
import codecs
from xml.dom import minidom
import numpy as np
from .BDFHeader import BDFHeader
from pyasdm.exceptions.BDFReaderException import BDFReaderException
class BDFReader:
    """
    Sequential reader for a BDF file, which is treated as a stream of bytes.

    The reader hands back, integration by integration, the recorded parts
    (actual times, actual durations, cross data, auto data, zero lags and
    flags). It also exposes the meta information that applies to the whole
    file and to each individual integration.

    The implementation follows the C++ class of the same name and the python
    code in pyBDFExplorer.py (itself derived from the C++ class).
    """

    class _States(Enum):
        """The states a reader can be in."""

        S_NO_BDF = auto()
        S_AT_BEGINNING = auto()
        S_READING = auto()
        S_AT_END = auto()

    class _Transitions(Enum):
        """The operations whose legality depends on the current state."""

        T_OPEN = auto()
        T_QUERY = auto()
        T_TEST_END = auto()
        T_READ = auto()
        T_READ_NEXT = auto()
        T_READ_ALL = auto()
        T_CLOSE = auto()

    # class-level default state: no BDF attached
    _currentState = _States.S_NO_BDF
def _getCurrentState(self):
"""
Returns a string representation of the current state.
This is useful to enhance the raised exceptions by adding the current state to
the message.
"""
result = "Unknown, this should NEVER happen."
if self._currentState == self._States.S_NO_BDF:
result = "unopened, no BDF available"
elif self._currentState == self._States.S_AT_BEGINNING:
result = "at the beginning of the BDF"
elif self._currentState == self._States.S_READING:
result = "reading the BDF"
elif self._currentState == self._States.S_AT_END:
result = "at the end of the BDF"
return result
def _checkState(self, transition, methodName):
"""
check that the transition requestioned by methodName is allowed given the current state.
The _currentState value (a _States enum value) limits what transitions are allowed.
This method checks that a transition is allowed. The calling method name is provided so
that if a transition is not allowed that the raised exception has a more useful message.
raises a BDFReaderException if the transition is not allowed.
Parameters
----------
transition : _Transitions
A _Transitions enum value, the transition to check.
methodName : str
The name of the calling method, to be used when raising an exception for an illegal transition.
Raises
------
BDFReaderException :
When the transition is not allowed by for the _currentState.
"""
# return when the transition is allowed by the current state
if self._currentState == self._States.S_NO_BDF:
if transition == self._Transitions.T_OPEN:
return
elif self._currentState == self._States.S_AT_BEGINNING:
if transition in (
self._Transitions.T_QUERY,
self._Transitions.T_TEST_END,
self._Transitions.T_READ,
self._Transitions.T_READ_NEXT,
self._Transitions.T_READ_ALL,
self._Transitions.T_CLOSE,
):
return
elif self._currentState == self._States.S_READING:
if transition in (
self._Transitions.T_TEST_END,
self._Transitions.T_READ,
self._Transitions.T_READ_NEXT,
self._Transitions.T_READ_ALL,
self._Transitions.T_QUERY,
self._Transitions.T_CLOSE,
):
return
elif self._currentState == self._States.S_AT_END:
if transition in (
self._Transitions.T_TEST_END,
self._Transitions.T_QUERY,
self._Transitions.T_READ_NEXT,
self._Transitions.T_READ_ALL,
self._Transitions.T_CLOSE,
):
return
# end of if/elif on _currentState
# any other combination of transition and current state raises this exception
raise BDFReaderException(
"Invalid call of method '"
+ methodName
+ "' in the current context: "
+ self._getCurrentState()
+ "."
)
def reset(self):
    """
    Put this instance back into its initial state.

    A file that is still open is closed and the internal values are set
    back to their initial values.
    """
    handle = self._f
    if handle is not None and not handle.closed:
        handle.close()
    self._f = None

    # no BDF attached any more
    self._currentState = self._States.S_NO_BDF
    self._path = None  # the path that is opened, when opened

    # parsing state
    self._boundary_1 = None  # the top level boundary definition
    self._currentLine = None  # the line most recently read by _nextLine
    self._integrationIndex = -1  # no integration read yet
    self._integrationStartsAt = -1

    # the initial (main) BDF data header as a DOM instance
    self._sdmDataHeaderDOM = None
    # initialized from the sdmDataHeader XML via a DOM
    self._bdfHeaderData = BDFHeader()
def __init__(self):
    """
    Create a reader that is not yet connected to a BDF.
    """
    # both attributes must exist before reset() runs because reset()
    # inspects self._f
    self._f = None  # the opened file object
    self._path = None  # the path to the BDF
    self.reset()
def open(self, bdfPath):
    """
    Open a file expected to contain BDF data, read and parse the global header.

    On exit the first block of data (subscan, integration, or subintegration)
    is ready to be read by the getSubset method.

    If the file can be opened but its headers cannot be parsed, the file is
    closed again and this instance is reset before the exception propagates,
    so the instance can be re-used for another call to open.

    Parameters
    ----------
    bdfPath : str
        The path to the BDF file. It is handed as is to the built-in open.

    Raises
    ------
    BDFReaderException :
        When a file has previously been opened (use 'close' or 'reset' to
        re-use this instance) or when the file can not be opened.
    """
    # only allowed if not already open
    self._checkState(self._Transitions.T_OPEN, "open")

    self._sdmDataHeaderDOM = None  # the main BDF data header as a DOM instance
    self._path = bdfPath
    try:
        self._f = open(self._path, mode="rb")
    except Exception as exc:
        # str() so that building the message can not fail for non-str paths
        raise BDFReaderException(
            "Error while opening '"
            + str(self._path)
            + "'. The exception was "
            + str(exc)
        ) from None

    try:
        self._boundary_1 = self._requireMIMEHeader()
        self._requireSDMDataHeaderMIMEPart()
    except Exception:
        # do not leak the file handle or keep half-parsed header values
        self.reset()
        raise

    self._integrationIndex = -1  # no integration read yet
    self._currentState = self._States.S_AT_BEGINNING
def getPath(self):
    """
    Return the path of the currently opened file, or None when nothing is opened.
    """
    currentPath = self._path
    return currentPath
def position(self):
    """
    Return the current position, in bytes, in the BDF file.

    Returns None when there is no file object.
    """
    handle = self._f
    return None if handle is None else handle.tell()
def close(self):
    """
    Close any opened file and restore the initial internal values.

    This simply delegates to reset.
    """
    return self.reset()
def currentIntegrationIndex(self):
    """
    Return the 0-based index of the current block of data (subscan,
    integration, or subintegration).

    This is the index of the block most recently returned by getSubset.
    """
    self._checkState(self._Transitions.T_QUERY, "currentIntegrationIndex")
    index = self._integrationIndex
    return index
def currentIntegrationStartsAt(self):
    """
    Return the position, in bytes, in the file of the current block of data
    (subscan, integration, or subintegration).

    This is the position recorded for the block most recently returned by
    getSubset.

    Raises
    ------
    BDFReaderException :
        When the reader is in a state where queries are not allowed.
    """
    # report this method's own name so that the exception message is accurate
    self._checkState(self._Transitions.T_QUERY, "currentIntegrationStartsAt")
    return self._integrationStartsAt
def title(self):
    """
    Return the title held by the primary BDF header data.
    """
    self._checkState(self._Transitions.T_QUERY, "title")
    header = self._bdfHeaderData
    return header.getTitle()
def byteOrder(self):
    """
    Return the byte order held by the primary BDF header data.

    Returns
    -------
    A ByteOrder instance set using the byteOrder found in the primary BDF header.
    """
    self._checkState(self._Transitions.T_QUERY, "byteOrder")
    header = self._bdfHeaderData
    return header.getByteOrder()
def startTime(self):
    """
    Return the start time held by the primary BDF header data.

    Returns
    -------
    An int holding the startTime value (nanoseconds) found in the primary BDF header.
    """
    self._checkState(self._Transitions.T_QUERY, "startTime")
    header = self._bdfHeaderData
    return header.getStartTime()
def numTime(self):
    """
    Return the numTime value held by the primary BDF header data.

    This is only ever non-zero for packed data.

    Returns
    -------
    An int holding the numTime value found in the primary BDF header.
    """
    self._checkState(self._Transitions.T_QUERY, "numTime")
    header = self._bdfHeaderData
    return header.getNumTime()
def dataOID(self):
    """
    Return the dataOID value held by the primary BDF header data.
    """
    self._checkState(self._Transitions.T_QUERY, "dataOID")
    header = self._bdfHeaderData
    return header.getDataOID()
def execBlockUID(self):
    """
    Return the UID of the ExecBlock held by the primary BDF header data.
    """
    self._checkState(self._Transitions.T_QUERY, "execBlockUID")
    header = self._bdfHeaderData
    return header.getExecBlockUID()
def execBlockNum(self):
    """
    Return the number of the ExecBlock found in the projectPath in the
    primary BDF header.
    """
    self._checkState(self._Transitions.T_QUERY, "execBlockNum")
    header = self._bdfHeaderData
    return header.getExecBlockNum()
def scanNum(self):
    """
    Return the number of the scan found in the projectPath in the primary
    BDF header.
    """
    self._checkState(self._Transitions.T_QUERY, "scanNum")
    header = self._bdfHeaderData
    return header.getScanNum()
def subscanNum(self):
    """
    Return the number of the subscan found in the projectPath in the primary
    BDF header.
    """
    self._checkState(self._Transitions.T_QUERY, "subscanNum")
    header = self._bdfHeaderData
    return header.getSubscanNum()
def numAntenna(self):
    """
    Return the number of antennas reported by the primary BDF header data.
    """
    self._checkState(self._Transitions.T_QUERY, "numAntenna")
    header = self._bdfHeaderData
    return header.getNumAntenna()
def correlationMode(self):
    """
    Return the correlation mode held by the primary BDF header data.

    Returns a CorrelationMode enumeration instance.
    """
    self._checkState(self._Transitions.T_QUERY, "correlationMode")
    header = self._bdfHeaderData
    return header.getCorrelationMode()
def spectralResolutionType(self):
    """
    Return the spectral resolution type held by the primary BDF header data.

    spectralResolutionType is optional in a BDF header, so the result is
    either a SpectralResolutionType enumeration instance or None when no
    such value is found (non-correlator data).
    """
    self._checkState(self._Transitions.T_QUERY, "spectralResolutionType")
    header = self._bdfHeaderData
    return header.getSpectralResolutionType()
def processorType(self):
    """
    Return the processor type held by the primary BDF header data.

    Returns a ProcessorType enumeration instance.
    """
    self._checkState(self._Transitions.T_QUERY, "processorType")
    header = self._bdfHeaderData
    return header.getProcessorType()
def correlatorType(self):
    """
    Return the correlator type held by the primary BDF header data.

    Returns a CorrelatorType enumeration instance.
    """
    self._checkState(self._Transitions.T_QUERY, "correlatorType")
    header = self._bdfHeaderData
    # NOTE(review): the sibling accessors call get*() methods on the header;
    # confirm that BDFHeader really names this one correlatorType().
    return header.correlatorType()
def hasPackedData(self):
    """
    Tell whether the binary data are all packed in one subset covering
    multiple times.

    Returns a boolean.
    """
    self._checkState(self._Transitions.T_QUERY, "hasPackedData")
    header = self._bdfHeaderData
    return header.hasPackedData()
def isTP(self):
    """
    Tell whether this is total power data.

    Returns a boolean.
    """
    self._checkState(self._Transitions.T_QUERY, "isTP")
    header = self._bdfHeaderData
    return header.isTP()
def isWVR(self):
    """
    Tell whether this is WVR data.

    Returns a boolean.
    """
    self._checkState(self._Transitions.T_QUERY, "isWVR")
    header = self._bdfHeaderData
    return header.isWVR()
def isCorrelation(self):
    """
    Tell whether the processorType is CORRELATOR.

    Returns a boolean.
    """
    self._checkState(self._Transitions.T_QUERY, "isCorrelation")
    header = self._bdfHeaderData
    return header.isCorrelation()
def hasSubset(self):
    """
    Return True if and only if there are still data to read in the BDF file.

    The end is detected when the most recently read line is the closing form
    of the top level boundary; in that case the state becomes S_AT_END.
    """
    self._checkState(self._Transitions.T_TEST_END, "hasSubset")
    closingBoundary = b"--" + self._boundary_1 + b"--"
    if self._currentLine == closingBoundary:
        self._currentState = self._States.S_AT_END
        return False
    return True
def getSubset(self):
    """
    Return the next SDM Data Subset (one integration) as a dictionary.

    The subset is read from the current location in the file.
    """
    self._checkState(self._Transitions.T_READ, "getSubset")
    self._integrationIndex += 1
    subset = self._requireSDMDataSubsetMIMEPart()
    # the line itself is not needed, but the file must be advanced past it
    self._nextLine()
    self._currentState = self._States.S_READING
    return subset
# the c++ code offers nextSubsets and allRemainingSubsets methods that return
# a reference to a vector of subsets. Here, that would be implemented by
# repeated calls to getSubset until hasSubset is False or (for nextSubsets)
# the requested number of subsets had been read, with the result returned as
# a list. There's minimal benefit over simply calling getSubset as needed so
# they are not implemented here. That is where the T_READ_NEXT and T_READ_ALL
# transitions are used in the c++ code.
def _nextLine(self):
"""
A method which reads the next line of text and returns it.
If the line ends in a newline, it is stripped before the line is returned.
This method is only for internal use.
"""
line = self._f.readline()
# line is bytes type, comparision with "/n" or b"/n" doesn't work, but endswith does
if line.endswith(b"\n"):
line = line[:-1]
self._currentLine = line
return line
def _headerField2Pair(self, hf):
"""
A method which decomposes a MIME header into a (name, value) pair and returns that pair.
Leading and trailing whitespace is removed from name and value. A colon separates the name from the value.
This method is only for internal use.
"""
name = None
value = None
colonIndex = hf.find(b":")
if colonIndex == -1:
raise BDFReaderException(
"could not detect a well formed MIME header field in '" + str(hf) + "'"
)
if colonIndex > 0:
name = hf[:colonIndex]
name = name.strip()
if colonIndex < len(hf) - 1:
value = hf[colonIndex + 1 :]
value = value.strip()
# null values are OK
return (name, value)
def _requireHeaderField(self, hf):
"""
A method which consumes a MIME header and returns the (name, value) pair it has found in that header.
This method is only for internal use.
"""
name, value = self._headerField2Pair(self._nextLine())
if name.upper() != codecs.encode(hf, "utf-8"):
raise BDFReaderException(
"Did not find expected field '"
+ hf
+ "' in '"
+ self._currentLine.decode("utf-8")
+ "'."
)
return name, value
def _unquote(self, s):
"""
A utility method which returns an unquoted version of a quoted string (leading and trailing quotes are removed)
s is assumed to b a bytes type
This method is only for internal use.
"""
if len(s) < 2:
return s
if (s[0] == ord('"') and s[-1] == ord('"')) or (
s[0] == ord("'") and s[-1] == ord("'")
):
return s[1:-1]
return s
def _skipUntilEmptyLine(self, maxSkips):
"""
A method which skips all the lines read from the current position until it finds an
empty line or it reaches the maximum number of skips.
This method is only for inernal use.
"""
numSkip = 0
line = self._nextLine()
while len(line) > 0 and numSkip <= maxSkips:
line = self._nextLine()
numSkip += 1
if numSkip > maxSkips:
raise BDFReaderException(
"could not find an empty line in less than " + str(maxSkips + 1)
<< " lines."
)
def _accumulateUntilBoundary(self, boundary, maxLines):
"""
A method which accumulates all of the lines of text until it reaches a
boundary line whose value is equal to the parameter 'boundary' or
until it reaches a number of read lines equal to maxLines.
Returns the lines ready in one value (bytes type).
This method is only for inernal use.
"""
numLines = 0
line = self._nextLine()
result = b""
while numLines <= maxLines and line.find(b"--" + boundary) != 0:
result += line
numLines += 1
line = self._nextLine()
if numLines > maxLines:
raise BDFReaderException(
"could not find the boundary string '"
+ boundary.decode("utf-8")
+ "' in less than "
+ str(maxLines + 1)
+ " lines."
)
return result
def _requireBoundary(self, boundary, maxLines):
"""
A method which reads lines of text until it finds a boundary line whose value is
equal to the parameter 'boundary' or until it reaches a number of read lins
equal to maxLines.
Returns nothing.
This method is only for internal use.
"""
numLines = 0
line = self._nextLine()
while numLines <= maxLines and line.find(b"--" + boundary) != 0:
numLines += 1
line = self._nextLine()
if numLines > maxLines:
raise BDFReaderException(
"could not find the boundary string '"
+ boundary.decode("utf-8")
+ "' in less than "
+ str(maxLines + 1)
+ " lines."
)
def _requireBoundaryInCT(self, ctValue):
"""
A method which looks for the BOUNDARY definition in a CONTENT-TYPE MIME header and returns
the unquoted version of that definition.
This method is only for internal use.
"""
cvValueItems = [item.strip() for item in ctValue.split(b";")]
cvValueItemsNameValue = [item.partition(b"=") for item in cvValueItems]
boundaryValues = [
item[2]
for item in cvValueItemsNameValue
if item[0].upper() == b"BOUNDARY" and item[2] != ""
]
if boundaryValues == []:
raise BDFReaderException(
"count not found a boundary definition in '"
+ ctValue.decode("utf-8")
+ "'."
)
else:
return self._unquote(boundaryValues[0])
def _requireMIMEHeader(self):
"""
A method which consumes the toplevel MIME header present in the BDF.
It returns the toplevel BOUNDARY definition.
This method is only for internal use.
"""
# "MIME-VERSION: 1.0"
line = self._nextLine()
name, value = self._headerField2Pair(line)
if not line.endswith(b"IME-Version: 1.0"):
raise BDFReaderException(
"'MIME-VERSION: 1.0' missing at the very beginning of the file '"
+ self._path
+ "'."
)
# CONTENT-TYPE
name, value = self._requireHeaderField("CONTENT-TYPE")
# extract level 1 boundary from value
self._boundary_1 = self._requireBoundaryInCT(value)
# CONTENT-DESCRIPTION
name, value = self._requireHeaderField("CONTENT-DESCRIPTION")
# CONTENT_LOCATION
name, value = self._requireHeaderField("CONTENT-LOCATION")
# skip until an empty line is found, skipping at most 10 lines
self._skipUntilEmptyLine(10)
return self._boundary_1
def _requireSDMDataHeaderMIMEPart(self):
"""
Reads the 1st part (in the MIME meaning) of the BDF, extracts its content,
which is an XML document known as the SDM data header, and parses it into
a DOM document.
Returns nothing.
This method is only for internal use.
"""
# Requires the presence of boundary_1
self._requireBoundary(self._boundary_1, 0)
# ignore header fields (do not require these)
# CONTENT-TYPE, CONTENT-TRANSFER-ENCODING, CONTENT-LOCATION
# look for an empty line at most 10 lines from the the current location
self._skipUntilEmptyLine(10)
# extract the header, everything until the next boundary but at most 100 lines
sdmDataHeader = self._accumulateUntilBoundary(self._boundary_1, 100)
# parse the XML into a DOM document
dom = minidom.parseString(sdmDataHeader)
self._sdmDataHeaderDOM = dom
# the primary data header (sdmDataHeader element)
try:
dataHeaderElem = dom.getElementsByTagName("sdmDataHeader")[0]
self._bdfHeaderData.fromDOM(dataHeaderElem)
except Exception as exc:
import traceback
traceback.print_exc()
raise BDFReaderException(
"Unexpected exception while parsing the main BDF header: '"
+ str(exc)
+ "."
) from None
# it's useful to construct a dict of the sizes of the binary parts which have non-zero size
self._binaryPartSize = {}
for partName in self._bdfHeaderData.getBinaryTypes():
if self._bdfHeaderData.hasBinary(partName):
self._binaryPartSize[partName] = self._bdfHeaderData.getSize(partName)
def _skipAsLongAsLineStartsWith(self, startStr):
"""
Skip so long as the current line starts with startStr (as a bytes type).
This method name comes from c++ and is confusing. It's intended to be used for
the case where there are 2 successive MIME boundary occurrences (CAS-8151).
It skips ahead using nextline until the currentLine does not start with that
string of characters. Then it sets the position in the file to the start of
the previous line read, without reading that line. So, currentLine will reflect
the line that was just read but nextLine will re-read that same line.
This returns nothing
"""
curpos = 0
while self._currentLine.find(startStr) == 0:
curpos = self.position()
self._nextLine()
self._setPosition(curpos)
def _requireSDMDataSubsetMIMEPart(self):
self._integrationStartsAt = self.position()
# CAS-8151, apparently there are cases where there are two occurrences of the MIME bouneary instead of only one
self._skipAsLongAsLineStartsWith(b"--" + self._boundary_1)
# CONTENT-TYPE
name, value = self._requireHeaderField("CONTENT-TYPE")
# that gives boundary_2
self._boundary_2 = self._requireBoundaryInCT(value)
# CONTENT-DESCRIPTION
name, value = self._requireHeaderField("CONTENT-DESCRIPTION")
# and move to boundary_2, must be within the next 10 lines
self._requireBoundary(self._boundary_2, 10)
# and skip until the empty line is reached, or 10 lines
self._skipUntilEmptyLine(10)
# read the SDM Data Subset header, assume it can't be more than 100 lines
sdmDataSubsetHeader = self._accumulateUntilBoundary(self._boundary_2, 100)
sdmDataSubsetHeaderDOM = minidom.parseString(sdmDataSubsetHeader)
# it's all in the firstChild node
sdmDataSubsetHeaderDOM = sdmDataSubsetHeaderDOM.firstChild
if sdmDataSubsetHeaderDOM.nodeName != "sdmDataSubsetHeader":
raise BDFReaderException(
"Unexpected XML node found where sdmDataSubsetHeader is expected.: %s"
% sdmDataSubsetHeaderDOM.nodeName
)
projectPath = sdmDataSubsetHeaderDOM.attributes.getNamedItem(
"projectPath"
).value
projectPathParts = projectPath.split("/")
# drop the last part if projectPath ends in a "/"
if projectPath.endswith('/'):
projectPathParts = projectPathParts[:-1]
numPathParts = len(projectPathParts)
if self.isCorrelation():
# correlator data should have 4 of 5 parts here
if ((numPathParts < 4) or (numPathParts > 5)):
raise BDFReaderException(
"Invalid string for projectPath, expectes 4 or 5 parts '"
+ projectPath
+ "'"
)
else:
# TP data should have 3 or 4 parts here
if ((numPathParts < 3) or (numPathParts > 4)):
raise BDFReaderException(
"Invalid string for projectPath, expects 3 or 4 parts '"
+ projectPath
+ "'"
)
execBlockNum = int(projectPathParts[0])
scanNum = int(projectPathParts[1])
subscanNum = int(projectPathParts[2])
intNum = 0
subIntNum = 0
if numPathParts > 3:
intNum = int(projectPathParts[3])
if numPathParts > 4:
subIntNum = int(projectPathParts[4])
# the first 3 values should match those for this BDF
if (
(execBlockNum != self._bdfHeaderData.getExecBlockNum())
or (scanNum != self._bdfHeaderData.getScanNum())
or (subscanNum != self._bdfHeaderData.getSubscanNum())
):
raise BDFReaderException(
"The project path of this data subset '"
+ projectPath
+ "' is not compatible with the project path announced in the global header"
+ " '"
+ self._bdfHeaderData.projectPath()
+ "'"
)
# TBD : SDMDataSubset should be a class, reset it here
aborted = False
stopTime = None
abortReason = None
abortedObservationElements = sdmDataSubsetHeaderDOM.getElementsByTagName(
"abortedObservation"
)
if len(abortedObservationElements) > 0:
# aborted subscan
aborted = True
# it must have stopTime and reason
abortedNode = abotedObservationElements[0]
stopTime = abortedNode.getElementsByTagName("stopTime")
if stopTime is None or len(stopTime) == 0:
raise BDFReaderException(
"expected 'stopTime' element not found in aborted subscan at integrationIndex "
+ str(self._integrationIndex)
)
stopTime = int(stopTime[0].childNodes[0].nodeValue)
reason = abortedNode.getElementsByTagName("reason")
if reason is None or len(reason) == 0:
raise BDFReaderException(
"expected 'reason' element not found in aborted subscan at integrationIndex "
+ str(self._integrationIndex)
)
abortReason = reason[0].childNodes[0].nodeValue
schedulePeriodTime = sdmDataSubsetHeaderDOM.getElementsByTagName(
"schedulePeriodTime"
)
if schedulePeriodTime is None or len(schedulePeriodTime) == 0:
raise BDFReaderException(
"expected 'schedulePeriodTime' element not found in subscan at integrationIndex "
+ str(self._integrationIndex)
)
schedulePeriodTime = schedulePeriodTime[0]
integrationMidpoint = int(
(schedulePeriodTime.getElementsByTagName("time"))[0].childNodes[0].nodeValue
)
integrationInterval = int(
(schedulePeriodTime.getElementsByTagName("interval"))[0]
.childNodes[0]
.nodeValue
)
# if it's aborted, this can probably be skipped, but it should also be harmless to not skip it just in case there's more there to be skipped over
# no aborted scans available yet to test this on
actualTimesDesc = {}
actualTimesDesc["present"] = False
actualTimesDesc["startsAt"] = -1
actualTimesDesc["arr"] = None
actualTimesDesc["type"] = "INT64_TYPE"
actualTimesDesc["np_type"] = np.dtype(np.int64)
actualDurationsDesc = {}
actualDurationsDesc["present"] = False
actualDurationsDesc["startsAt"] = -1
actualDurationsDesc["arr"] = None
actualDurationsDesc["type"] = "INT64_TYPE"
actualDurationsDesc["np_type"] = np.dtype(np.int64)
crossDataDesc = {}
crossDataDesc["present"] = False
crossDataDesc["startsAt"] = -1
crossDataDesc["arr"] = None
crossDataDesc["type"] = None
crossDataDesc["np_type"] = None
# the actual crossDataDesc varies among these types
npInt16 = np.dtype(np.int16)
npInt32 = np.dtype(np.int32)
npFloat32 = np.dtype(np.float32)
autoDataDesc = {}
autoDataDesc["present"] = False
autoDataDesc["startsAt"] = -1
autoDataDesc["arr"] = None
autoDataDesc["type"] = "FLOAT32_TYPE"
autoDataDesc["np_type"] = np.dtype(np.float32)
flagsDesc = {}
flagsDesc["present"] = False
flagsDesc["startsAt"] = -1
flagsDesc["arr"] = None
flagsDesc["type"] = "INT32_TYPE"
flagsDesc["np_type"] = np.dtype(np.int32)
zeroLagsDesc = {}
zeroLagsDesc["present"] = False
zeroLagsDesc["startsAt"] = -1
zeroLagsDesc["arr"] = None
zeroLagsDesc["type"] = "FLOAT32_TYPE"
zeroLagsDesc["np_type"] = np.dtype(np.float32)
# adjust the numpy data types when the byte order is not native
if not self._bdfHeaderData.getByteOrder().isNative():
np_byteOrder = "<"
if self._bdfHeaderData.getByteOrder().getByteOrder() == "big":
np_byteOrder = ">"
actualTimesDesc["np_type"] = actualTimesDesc["np_type"].newbyteorder(
np_byteOrder
)
actualDurationsDesc["np_type"] = actualDurationsDesc[
"np_type"
].newbyteorder(np_byteOrder)
autoDataDesc["np_type"] = autoDataDesc["np_type"].newbyteorder(np_byteOrder)
flagsDesc["np_type"] = flagsDesc["np_type"].newbyteorder(np_byteOrder)
zeroLagsDesc["np_type"] = zeroLagsDesc["np_type"].newbyteorder(np_byteOrder)
npInt16 = npInt16.newbyteorder(np_byteOrder)
npInt32 = npInt32.newbyteorder(np_byteOrder)
npFloat32 = npFloat32.newbyteorder(np_byteOrder)
dataPath = None
done = False
while not done:
name, value = self._requireHeaderField("CONTENT-TYPE")
name, value = self._requireHeaderField("CONTENT-LOCATION")
# what part is this?
result = re.match(
b"(([0-9]+/)+)(actualDurations|actualTimes|autoData|crossData|zeroLags|flags)\\.bin",
value.lstrip().rstrip(),
)
if result == None:
raise BDFReaderException(
"Could not identify the part name in '"
+ value.decode("utf-8")
+ "'."
)
binaryPartName = result.group(3)
dataPath = result.group(1)
if binaryPartName.decode("utf-8") not in self._binaryPartSize:
raise BDFReaderException(
"The size of '"
+ binaryPartName.decode("utf-8")
+ "' was not announced in the data header!"
)
if self._binaryPartSize[binaryPartName.decode("utf-8")] == 0:
raise BDFReaderException(
"The size of '"
+ binaryPartName.decode("utf-8")
+ "' was announced as null. I was not expecting a '"
+ binaryPartName.decode("utf-8")
+ "' attachment here."
)
if binaryPartName == b"crossData":
# crossData must be in the minidom and it must have a type element
elements = sdmDataSubsetHeaderDOM.getElementsByTagName("crossData")
if len(elements) == 0:
raise BDFReaderException(
"Missing expected 'crossData' element in '%s'"
% sdmDataSubsetHeaderDOM.toprettyxml()
)
crossDataType = elements[0].getAttribute("type")
if len(crossDataType) == 0:
raise BDFReaderException(
"Missing expected 'type' attribute in element '%s'."
% elements[0].nodeName
)
crossDataDesc["type"] = crossDataType
self._skipUntilEmptyLine(10)
# TBD - binaryPartName can be used to know the "type" and then there should be
# a dict of type to numberOfBytesPerValue, would require restructuring the dicts used here
numberOfBytesPerValue = None
if binaryPartName == b"actualDurations" or binaryPartName == b"actualTimes":
numberOfBytesPerValue = 8
elif binaryPartName == b"autoData":
numberOfBytesPerValue = 4
elif binaryPartName == b"crossData":
if crossDataType == "INT16_TYPE":
numberOfBytesPerValue = 2
crossDataDesc["np_type"] = npInt16
elif crossDataType == "INT32_TYPE":
numberOfBytesPerValue = 4
crossDataDesc["np_type"] = npInt32
elif crossDataType == "FLOAT32_TYPE":
numberOfBytesPerValue = 4
crossDataDesc["np_type"] = npFloat32
elif binaryPartName == b"flags":
numberOfBytesPerValue = 4
elif binaryPartName == b"zeroLags":
numberOfBytesPerValue = 4
# I don't believe this can ever happen, TBD - raise a better exception here
if numberOfBytesPerValue is None:
raise BDFReaderException(
"unrecognized binary part name or data type for that part : '"
+ binaryPartName.decode("utf-8")
+ "'."
)
numberOfElementsToRead = self._binaryPartSize[
binaryPartName.decode("utf-8")
]
numberOfBytesToRead = numberOfBytesPerValue * numberOfElementsToRead
# with restructuring, this can also be cleaner - TBD
# actual number of bytes read
nReadBytes = None
if binaryPartName == b"zeroLags":
zeroLagsDesc["present"] = True
zeroLagsDesc["startsAt"] = self.position()
dt = zeroLagsDesc["np_type"]
zeroLagsDesc["arr"] = np.fromfile(
self._f, dtype=dt, count=numberOfElementsToRead
)
nReadBytes = zeroLagsDesc["arr"].size * numberOfBytesPerValue
elif binaryPartName == b"actualTimes":
actualTimesDesc["present"] = True
actualTimesDesc["startsAt"] = self.position()
dt = actualTimesDesc["np_type"]
actualTimesDesc["arr"] = np.fromfile(
self._f, dtype=dt, count=numberOfElementsToRead
)
nReadBytes = len(actualTimesDesc["arr"]) * numberOfBytesPerValue
elif binaryPartName == b"actualDurations":
actualDurationsDesc["present"] = True
actualDurationsDesc["startsAt"] = self.position()
dt = actualDurationsDesc["np_type"]
actualDurationsDesc["arr"] = np.fromfile(
self._f, dtype=dt, count=numberOfElementsToRead
)
nReadBytes = len(actualDurationsDesc["arr"]) * numberOfBytesPerValue
elif binaryPartName == b"crossData":
crossDataDesc["present"] = True
crossDataDesc["startsAt"] = self.position()
dt = crossDataDesc["np_type"]
crossDataDesc["arr"] = np.fromfile(
self._f, dtype=dt, count=numberOfElementsToRead
)
nReadBytes = len(crossDataDesc["arr"]) * numberOfBytesPerValue
elif binaryPartName == b"autoData":
autoDataDesc["present"] = True
autoDataDesc["startsAt"] = self.position()
dt = autoDataDesc["np_type"]
autoDataDesc["arr"] = np.fromfile(
self._f, dtype=dt, count=numberOfElementsToRead
)
nReadBytes = len(autoDataDesc["arr"]) * numberOfBytesPerValue
elif binaryPartName == b"flags":
flagsDesc["present"] = True
flagsDesc["startsAt"] = self.position()
dt = flagsDesc["np_type"]
flagsDesc["arr"] = np.fromfile(
self._f, dtype=dt, count=numberOfElementsToRead
)
nReadBytes = len(flagsDesc["arr"]) * numberOfBytesPerValue
else:
# should never reach here ! but just in case
bytes = self._f.read(numberOfBytesToRead)
nReadBytes = len(bytes)
print(
"Unknown binary part name '(%s)' with %s bytes to read, skipping in case it's possible to continue, this should never happen!"
% (binaryPartName.decode("utf-8"), numberOfBytesToRead)
)
if nReadBytes < numberOfBytesToRead:
raise BDFReaderException(
"End of file reached while reading binary attachment '"
+ binaryPartName.decode("utf")
+ "'."
)
line = (
self._nextLine()
) # absorb the nl right after the last byte of the binary attachment
line = self._nextLine() # this should be boundary_2
if line.find(b"--" + self._boundary_2.strip(b'"')) != 0:
raise BDFReaderException(
"Processing integration # "
+ str(self._integrationIndex)
+ ":Unexpected '"
+ line.decode("utf-8")
+ "' after the binary part '"
+ binaryPartName.decode("utf-8")
+ "'."
)
# is this the end?
done = line == b"--" + self._boundary_2.strip(b'"') + b"--"
# TBD - this struct could be assembed earlier and used more efficiently in the middle
# TBD - pyBDFExplorer returns sdmDataSubsetHeaderDOM here, is that useful?
return {
"projectPath": projectPath,
"integrationNumber": intNum,
"subIntegrationNumber": subIntNum,
"midpointInNanoSeconds": integrationMidpoint,
"intervalInNanoSeconds": integrationInterval,
"aborted": aborted,
"stopTime": stopTime,
"abortReason": abortReason,
"actualTimes": actualTimesDesc,
"actualDurations": actualDurationsDesc,
"crossData": crossDataDesc,
"autoData": autoDataDesc,
"flags": flagsDesc,
"zeroLags": zeroLagsDesc,
}
def _setPosition(self, newpos):
"""
Set the position of the BDF to the location given by newpos (bytes).
This is intended for inernal use because it may confuse the public methods if this is
used out of the normal sequence.
"""
self._f.seek(newpos, 0)
def printSubset(self, subset):
    """
    Print a summary of a subset as returned by getSubset (not for release).

    The project path, time and interval are printed first, then one line
    for each binary part that is present, showing at most its first 10
    values. This method does not use self.
    """
    print("projectPath = " + subset["projectPath"])
    print("time = %s" % subset["midpointInNanoSeconds"])
    print("interval = %s" % subset["intervalInNanoSeconds"])

    # the parts whose values are printed with 6 decimals
    floatItems = ["autoData", "zeroLags"]
    if "crossData" in subset and subset["crossData"]["present"]:
        crossType = subset["crossData"]["type"]
        print("crossDataType = " + crossType)
        if crossType == "FLOAT32_TYPE":
            floatItems.append("crossData")

    print("Binary attachments :")
    partNames = (
        "actualTimes",
        "actualDurations",
        "flags",
        "crossData",
        "autoData",
        "zeroLags",
    )
    for item in partNames:
        if item not in subset or not subset[item]["present"]:
            continue
        values = subset[item]["arr"]
        nOut = min(10, values.size)
        if item in floatItems:
            rendered = [f"{itemVal:.6f}" for itemVal in values[0:nOut]]
        else:
            rendered = [str(itemVal) for itemVal in values[0:nOut]]
        outStr = "%s (%s values ) = " % (
            item[0].upper() + item[1:],
            values.size,
        ) + "".join(" " + text for text in rendered)
        if nOut < values.size:
            # not everything is shown
            outStr += "..."
        print(outStr)