Source code for pyasdm.EndianOutput

# ALMA - Atacama Large Millimeter Array
# (c) European Southern Observatory, 2024
# (c) Associated Universities Inc., 2024
# Copyright by ESO (in the framework of the ALMA collaboration),
# Copyright by AUI (in the framework of the ALMA collaboration),
# All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY, without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307  USA
#
# File EndianOutput.py

# Modelled on the EndianOSStream c++ class and a similar Java class

from .ByteOrder import ByteOrder

import io
import struct
import sys


class EndianOutput:
    """
    This class handles the conversion of values used by pyasdm objects
    into bytes written to a previously opened, buffered binary output
    stream, using a chosen byte order.
    """

    # the buffered output stream being written to
    _outStream = None

    # the specified byte order as a ByteOrder instance
    _byteOrder = None

    def __init__(self, outStream, byteOrder=None):
        """
        Initialize this class using the given output stream, which must be
        an instance of io.BufferedWriter that was opened as
        'open(<filepath>,"wb")' or something equivalent.

        If byteOrder is not given then a default-constructed ByteOrder is
        used, which is expected to describe the native byte order as found
        in sys.byteorder. When provided, byteOrder must be a ByteOrder
        instance.

        Raises ValueError if outStream or byteOrder has the wrong type.
        """
        if not isinstance(outStream, io.BufferedWriter):
            raise ValueError("outStream must be an io.BufferedWriter instance")

        if byteOrder is None:
            # default to the local byte order
            byteOrder = ByteOrder()

        if not isinstance(byteOrder, ByteOrder):
            raise ValueError("byteOrder must be a ByteOrder instance when provided")

        self._outStream = outStream
        self._byteOrder = byteOrder

    def byteOrder(self):
        """
        Return the ByteOrder instance used by this EndianOutput.
        """
        # Return the stored instance, not this bound method.
        return self._byteOrder

    def close(self):
        """
        Close the output stream being written to.
        """
        self._outStream.close()

    def closed(self):
        """
        Returns True if the output stream is closed.
        """
        return self._outStream.closed

    def writeBool(self, boolVal):
        """
        Write a bool value as 1 byte to the output stream.

        Raises ValueError if boolVal is not a bool.
        """
        if not isinstance(boolVal, bool):
            raise ValueError("boolVal must be a bool instance")

        self._outStream.write(boolVal.to_bytes(1, self._byteOrder.getByteOrder()))

    def writeByte(self, byteVal):
        """
        Writes a bytes value to the output stream.

        If byteVal is an integer it is first converted to a single
        unsigned byte. A bytes instance is written unchanged. Any other
        type for byteVal is an error.

        Raises ValueError if byteVal is an int that does not fit in one
        unsigned byte, or is neither an int nor a bytes instance.
        """
        if isinstance(byteVal, int):
            try:
                byteVal = byteVal.to_bytes(1, self._byteOrder.getByteOrder())
            except OverflowError:
                # to_bytes rejects both negative values and values above 255
                raise ValueError(
                    "byteVal is out of range for conversion to a single byte"
                ) from None

        if not isinstance(byteVal, bytes):
            raise ValueError(
                "byteVal must be a bytes instance or an int that can be converted to a bytes as a single byte"
            )

        self._outStream.write(byteVal)

    def writeShort(self, shortVal):
        """
        Write an integer to the output stream as a 2-element bytes
        instance representing a signed integer.

        Raises ValueError if shortVal is not an int or does not fit in
        2 signed bytes.
        """
        if not isinstance(shortVal, int):
            raise ValueError("shortVal must be an int instance")

        try:
            asBytes = shortVal.to_bytes(
                2, self._byteOrder.getByteOrder(), signed=True
            )
        except OverflowError:
            raise ValueError(
                "shortVal is too large to convert to 2 bytes representing a short integer"
            ) from None

        self._outStream.write(asBytes)

    def writeUShort(self, uShortVal):
        """
        Write an integer to the output stream as a 2-element bytes
        instance representing an unsigned integer.

        Raises ValueError if uShortVal is not a non-negative int or does
        not fit in 2 unsigned bytes.
        """
        if not isinstance(uShortVal, int) or uShortVal < 0:
            raise ValueError("uShortVal must be a non-negative integer")

        try:
            asBytes = uShortVal.to_bytes(
                2, self._byteOrder.getByteOrder(), signed=False
            )
        except OverflowError:
            raise ValueError(
                "uShortVal is too large to convert to 2 bytes representing an unsigned short integer"
            ) from None

        self._outStream.write(asBytes)

    def writeInt(self, intVal):
        """
        Write an integer to the output stream as a bytes instance,
        representing a signed 4-byte integer.

        Raises ValueError if intVal is not an int or does not fit in
        4 signed bytes.
        """
        if not isinstance(intVal, int):
            raise ValueError("intVal must be an integer")

        try:
            asBytes = intVal.to_bytes(4, self._byteOrder.getByteOrder(), signed=True)
        except OverflowError:
            raise ValueError(
                "intVal is too large to convert to 4 bytes representing an integer"
            ) from None

        self._outStream.write(asBytes)

    def writeUInt(self, intVal):
        """
        Write an integer to the output stream as a bytes instance,
        representing an unsigned 4-byte integer.

        Raises ValueError if intVal is not an int, is negative, or does
        not fit in 4 unsigned bytes.
        """
        if not isinstance(intVal, int) or intVal < 0:
            raise ValueError("intVal must be a positive integer")

        try:
            asBytes = intVal.to_bytes(4, self._byteOrder.getByteOrder(), signed=False)
        except OverflowError:
            raise ValueError(
                "intVal is too large to convert to 4 bytes representing an unsigned integer"
            ) from None

        self._outStream.write(asBytes)

    def writeLong(self, intVal):
        """
        Write an integer to the output stream as a bytes instance,
        representing a signed 8-byte integer.

        Raises ValueError if intVal is not an int or does not fit in
        8 signed bytes.
        """
        if not isinstance(intVal, int):
            raise ValueError("intVal must be an integer")

        try:
            asBytes = intVal.to_bytes(8, self._byteOrder.getByteOrder(), signed=True)
        except OverflowError:
            raise ValueError(
                "intVal is too large to convert to 8 bytes representing an integer"
            ) from None

        self._outStream.write(asBytes)

    def writeULong(self, intVal):
        """
        Write an integer to the output stream as a bytes instance,
        representing an 8-byte unsigned integer.

        Raises ValueError if intVal is not an int, is negative, or does
        not fit in 8 unsigned bytes.
        """
        if not isinstance(intVal, int) or intVal < 0:
            raise ValueError("intVal must be a positive integer")

        try:
            asBytes = intVal.to_bytes(8, self._byteOrder.getByteOrder(), signed=False)
        except OverflowError:
            raise ValueError(
                "intVal is too large to convert to 8 bytes representing an unsigned integer"
            ) from None

        self._outStream.write(asBytes)

    def writeFloat(self, floatVal):
        """
        Write a float value to the output stream as a 4-byte float.

        Raises ValueError if floatVal is not a float. struct.pack raises
        OverflowError for finite values too large for the "f" format.
        """
        if not isinstance(floatVal, float):
            raise ValueError("floatVal must a float")

        # convert it to bytes in the native byte order using the "f" format
        asBytes = struct.pack("f", floatVal)
        # flip the byte order if the output byte order is not the native order
        if not self._byteOrder.isNative():
            asBytes = asBytes[::-1]

        self._outStream.write(asBytes)

    def writeDouble(self, dblVal):
        """
        Write a float value to the output stream as an 8-byte float.

        Raises ValueError if dblVal is not a float.
        """
        if not isinstance(dblVal, float):
            raise ValueError("dblVal must a float")

        # convert it to bytes in the native byte order using the "d" format
        asBytes = struct.pack("d", dblVal)
        # flip the byte order if the output byte order is not the native order
        if not self._byteOrder.isNative():
            asBytes = asBytes[::-1]

        self._outStream.write(asBytes)

    def writeString(self, strVal):
        """
        Write a string to the output stream as first a 4-byte integer
        giving the number of bytes in the UTF-8 encoding of strVal,
        followed by those encoded bytes.

        Raises ValueError if strVal is not a str, or if the encoded length
        does not fit in a signed 4-byte integer (raised by writeInt).
        """
        if not isinstance(strVal, str):
            raise ValueError("strVal is not a str")

        # Encode before writing anything so the length prefix counts the
        # bytes that follow it (for non-ASCII text this differs from
        # len(strVal)) and a failed encode leaves the stream untouched.
        # NOTE(review): assumes the matching reader consumes the prefix as
        # a byte count -- confirm against EndianInput.
        encoded = strVal.encode("utf-8")
        self.writeInt(len(encoded))
        self._outStream.write(encoded)

    def writeStr(self, strVal):
        """
        Identical to writeString, used by the template generator.
        """
        self.writeString(strVal)