Package buspyrate

Python interface for BusPirate binary mode

GitLab Project | PyPi Package | API Documentation

Expand source code
"""\
Python interface for BusPirate binary mode
==========================================

[GitLab Project][gitlab] | [PyPi Package][pypi] | [API Documentation][pages]

[gitlab]: https://gitlab.com/mpasternacki/buspyrate
[pypi]: https://pypi.org/project/buspyrate/
[pages]: https://mpasternacki.gitlab.io/buspyrate
"""

# Public API of the package: only these names are exported by
# `from buspyrate import *`.
__all__ = ["BusPirate", "Data", "SPISpeed"]

from .buspirate import BusPirate
from .const import SPISpeed
from .typing import Data

Sub-modules

buspyrate.buspirate

Main BusPirate interface implementation.

buspyrate.const

BusPyrate constants and enums

buspyrate.typing

BusPyrate types and type aliases

Classes

class BusPirate (serial: Union[str, bytes, os.PathLike[str], os.PathLike[bytes], buspyrate.SerialDuck], spi_wtr_0r_confirm=True, **kwargs: Any)

Main BusPirate interface class.

Initialize new BusPirate connection.

serial can be either a pyserial's Serial instance, or path to a serial device. If it's a path, a Serial instance will be created. Keyword arguments will be passed to Serial's constructor, with timeout defaulting to 0.5 and baudrate to 115200.

spi_wtr_0r_confirm can be set to False to turn on a workaround for BusPirate/Bus_Pirate#158 - BusPyrate won't expect confirmation on spi_write_then_read when read=0. If you need this workaround, remember to sleep() a bit after a write-only spi_write_then_read.

Expand source code
class BusPirate:
    """Main BusPirate interface class."""

    serial: SerialDuck
    """Serial port connection"""

    mode: t.Optional[Mode] = None
    """Current BP mode (None if unknown or text)"""

    # Whether a write-only `spi_write_then_read` waits for the confirmation
    # byte. Set from the constructor argument of the same name.
    spi_wtr_0r_confirm: bool

    def __init__(
        self,
        serial: t.Union[AnyPath, SerialDuck],
        spi_wtr_0r_confirm: bool = True,
        **kwargs: t.Any,
    ) -> None:
        """Initialize new BusPirate connection.

        `serial` can be either a pyserial's Serial instance, or path
        to a serial device. If it's a path, a Serial instance will be
        created. Keyword arguments will be passed to Serial's
        constructor, with timeout defaulting to 0.5 and baudrate to
        115200.

        `spi_wtr_0r_confirm` can be set to `False` to turn on a
        workaround for [BusPirate/Bus_Pirate#158] - BusPyrate won't
        expect confirmation on spi_write_then_read when read=0. If you
        need this workaround, remember to sleep() a bit after a
        write-only spi_write_then_read.

        [BusPirate/Bus_Pirate#158]: https://github.com/BusPirate/Bus_Pirate/pull/158
        """
        if isinstance(serial, SerialDuck):
            self.serial = serial
        else:
            kwargs.setdefault("baudrate", 115200)
            kwargs.setdefault("timeout", 0.5)
            self.serial = Serial(serial, **kwargs)
        self.spi_wtr_0r_confirm = spi_wtr_0r_confirm

    def receive(self, size: int, timeout=LEAVE_TIMEOUT_ALONE) -> bytes:
        """Receive `size` bytes from serial.

        When `timeout` is given, it temporarily replaces the serial
        port's timeout for this one read; the previous timeout is
        restored afterwards, even if the read raises.
        """
        if timeout is LEAVE_TIMEOUT_ALONE:
            rv = self.serial.read(size)
            # log.debug(f"<- {rv.hex()}")
            return rv

        orig_timeout = self.serial.timeout
        self.serial.timeout = timeout
        try:
            rv = self.serial.read(size)
            # log.debug(f"<- {rv.hex()}")
            return rv
        finally:
            self.serial.timeout = orig_timeout

    def send(
        self,
        data: Data,
        expect: t.Optional[bytes] = CONFIRM,
        timeout=LEAVE_TIMEOUT_ALONE,
    ) -> None:
        """Send `data` to serial.

        If `expect` is not empty and not none, read `len(expect)`
        bytes from serial after sending data and raise `Unexpected` if
        it's different than `expect`.

        The `timeout` argument is only used for receiving the expected
        value.
        """
        # log.debug(f"-> {data.hex()}")
        self.serial.write(data)
        self.serial.flush()
        sleep(_MILLISECOND)
        if expect is not None and len(expect) > 0:
            resp = self.receive(len(expect), timeout=timeout)
            if resp != expect:
                raise Unexpected(f"Expected {expect!r}, got {resp!r}")

    def enter_bitbang(self) -> None:
        """Enter binary bitbang mode.

        If current mode is not binary, sends zero byte up to 25
        times. If BP is already in some binary mode, just sends zero
        once and expects confirmation.

        Raises `RuntimeError` if none of the 25 attempts is answered
        with `BBIO1`.
        """
        if self.mode is None:
            # discard all pending serial data
            while self.receive(1024) != b"":
                pass
            for _ in range(25):
                try:
                    self.send(b"\x00", expect=b"BBIO1")
                except Unexpected:
                    pass
                else:
                    self.mode = Mode.BITBANG
                    break

            if self.mode != Mode.BITBANG:
                raise RuntimeError("Couldn't enter binary mode")
        else:
            # self.mode is not None - we're in some binary mode. Even
            # if we're already in bitbang, sending a zero won't hurt.
            self.send(b"\x00", expect=b"BBIO1")
            self.mode = Mode.BITBANG
        log.info("Entered binary bitbang mode")

    def reset(self) -> None:
        """Reset BusPirate.

        Enters bitbang mode if needed, even from text/unknown mode.
        """
        self.enter_bitbang()  # just to make sure
        self.send(b"\x0F")
        self.mode = None
        sleep(250 * _MILLISECOND)
        # Throw away whatever the device printed after the reset.
        self.receive(1024, timeout=0)

    def __enter__(self: TBP) -> TBP:
        """Use the instance in a `with` statement.

        It yields self (but doesn't enter any mode) and on exit it
        resets BP and closes serial connection.
        """
        return self

    def __exit__(self, *args: t.Any) -> None:
        """On exiting the context, reset BP and close serial.

        The serial connection is closed even when the reset fails; the
        reset error then propagates to the caller.
        """
        try:
            self.reset()
        finally:
            # Never leave the port open just because the reset handshake
            # did not go through (e.g. device unplugged or unresponsive).
            self.serial.close()

    def _expect_mode(self, mode: Mode) -> None:
        """Raise `RuntimeError` unless the tracked mode equals `mode`."""
        if self.mode != mode:
            # Maybe this could be an assert?
            raise RuntimeError(f"Need mode {mode}, but BusPirate is in {self.mode}")

    def enter_spi(self) -> None:
        """Enter binary SPI mode.

        If already in SPI mode, does nothing. If not in bitbang mode,
        enters bitbang mode first.

        If you don't need detailed control, you may prefer the `spi()`
        wrapper which enters mode and configures it in one call.
        """
        if self.mode == Mode.SPI:
            return  # log something?
        if self.mode != Mode.BITBANG:
            self.enter_bitbang()

        self.send(b"\x01", expect=b"SPI1")
        self.mode = Mode.SPI
        log.info("Entered SPI mode")

    def spi_speed(self, speed: SPISpeed) -> None:
        """Set SPI speed.

        If you don't need detailed control, you may prefer the `spi()`
        wrapper which enters mode and configures it in one call.
        """
        self._expect_mode(Mode.SPI)
        self.send(pack("B", 0x60 | speed))

    def spi_config(
        self,
        output_3v3=False,
        clock_idle_phase_high=False,
        clock_edge_falling=False,
        sample_time_end=False,
    ) -> None:
        """Configure SPI.

        Each true argument sets one of the four low bits of the 0x80
        command byte.

        If you don't need detailed control, you may prefer the `spi()`
        wrapper which enters mode and configures it in one call.
        """
        self._expect_mode(Mode.SPI)

        cmd = 0x80
        if output_3v3:
            cmd |= 8
        if clock_idle_phase_high:
            cmd |= 4
        if clock_edge_falling:
            cmd |= 2
        if sample_time_end:
            cmd |= 1
        self.send(pack("B", cmd))

    def spi_periph_config(
        self,
        power=False,
        pullups=False,
        aux=False,
        cs=False,
    ) -> None:
        """Configure SPI peripherals.

        Each true argument sets one of the four low bits of the 0x40
        command byte.

        If you don't need detailed control, you may prefer the `spi()`
        wrapper which enters mode and configures it in one call.
        """
        self._expect_mode(Mode.SPI)

        cmd = 0x40
        if power:
            cmd |= 8
        if pullups:
            cmd |= 4
        if aux:
            cmd |= 2
        if cs:
            cmd |= 1
        self.send(pack("B", cmd))

    def spi(
        self,
        speed: SPISpeed,
        output_3v3=False,
        clock_idle_phase_high=False,
        clock_edge_falling=True,
        sample_time_end=False,
        power=True,
        pullups=False,
        aux=False,
        cs=True,
    ) -> None:
        """Enter SPI mode and configure it.

        See `enter_spi`, `spi_speed`, `spi_config` and `spi_periph_config` for details.
        """
        self.enter_spi()
        self.spi_speed(speed)
        self.spi_config(
            output_3v3=output_3v3,
            clock_idle_phase_high=clock_idle_phase_high,
            clock_edge_falling=clock_edge_falling,
            sample_time_end=sample_time_end,
        )
        self.spi_periph_config(power=power, pullups=pullups, aux=aux, cs=cs)

    @contextmanager
    def spi_cs(self):
        """Context manager wrapping a block in a pair of CS commands.

        Sends command byte 0x02 on entry and 0x03 on exit (also when
        the body raises), each time expecting the default
        confirmation. Requires SPI mode.

        NOTE(review): 0x02/0x03 presumably drive chip select low/high -
        confirm against the BusPirate binary SPI documentation.
        """
        self._expect_mode(Mode.SPI)

        self.send(b"\x02")
        try:
            yield
        finally:
            self.send(b"\x03")

    def spi_write_then_read(self, data: Data, read: int, cs: bool = True) -> bytes:
        """Write `data`, then read `read` bytes, in one BusPirate command.

        Uses command 0x04 when `cs` is true and 0x05 otherwise. The
        command byte is followed by the write length and the read
        length, each packed as a big-endian 16-bit value.

        Returns the bytes received (possibly fewer than `read` if the
        30 second receive timeout expires), or `b""` when `read` is 0.
        Raises `RuntimeError` when not in SPI mode and `Unexpected`
        when the confirmation byte does not arrive.
        """
        self._expect_mode(Mode.SPI)

        if cs:
            cmd = 0x04
        else:
            cmd = 0x05
        self.send(pack(">BHH", cmd, len(data), read), expect=b"")
        # If the number of bytes to read or write are out of bounds,
        # the Bus Pirate will return 0x00 now <- this is why we split
        # into two send calls
        if read == 0 and not self.spi_wtr_0r_confirm:
            # Workaround for https://github.com/BusPirate/Bus_Pirate/pull/158
            #
            # There's a bug in BP firmware where write-then-read
            # operation doesn't send confirmation when there is 0
            # bytes to read. Additional problem is that we don't know
            # when BP has finished writing, so client needs to sleep
            # after WTR to avoid interfering with ongoing write.
            expect = b""
        else:
            expect = CONFIRM
        self.send(data, timeout=30, expect=expect)
        if read > 0:
            return self.receive(read, timeout=30)  # FIXME: adjustable timeout?
        return b""

    def spi_transfer(self, data: Data) -> bytes:
        """Send 1 to 16 bytes and return the bytes received in exchange.

        Sends command byte `0x10 | (len(data) - 1)` (confirmed by the
        device), then `data` itself without waiting for a
        confirmation, and finally reads back `len(data)` bytes.

        Raises `RuntimeError` when not in SPI mode and `ValueError`
        when `data` is empty or longer than 16 bytes.
        """
        self._expect_mode(Mode.SPI)

        # The length is encoded as (n - 1) in the low four bits of the
        # command, so only 1..16 bytes fit. Validate explicitly: an
        # assert would vanish under `python -O`, and empty data would
        # otherwise fail with an opaque struct.error.
        if not 1 <= len(data) <= 16:
            raise ValueError(
                f"spi_transfer needs 1 to 16 bytes of data, got {len(data)}"
            )
        self.send(pack(">B", 0x10 | (len(data) - 1)))
        self.send(data, expect=None)
        return self.receive(len(data))

Class variables

var mode : Optional[Mode]

Current BP mode (None if unknown or text)

var serial : SerialDuck

Serial port connection

Methods

def enter_bitbang(self) ‑> NoneType

Enter binary bitbang mode.

If current mode is not binary, sends zero byte up to 25 times. If BP is already in some binary mode, just sends zero once and expects confirmation.

Expand source code
def enter_bitbang(self) -> None:
    """Switch the BusPirate into binary bitbang mode.

    From a known binary mode a single zero byte is sent and the
    `BBIO1` reply is required. From text/unknown mode pending input
    is drained first, then the zero byte is retried up to 25 times;
    `RuntimeError` is raised if no attempt gets the `BBIO1` reply.
    """
    if self.mode is not None:
        # Some binary mode is active. Even if that is bitbang
        # already, one more zero byte is harmless.
        self.send(b"\x00", expect=b"BBIO1")
        self.mode = Mode.BITBANG
    else:
        # Drain everything still waiting on the serial line.
        while self.receive(1024) != b"":
            pass
        for _attempt in range(25):
            try:
                self.send(b"\x00", expect=b"BBIO1")
            except Unexpected:
                continue
            self.mode = Mode.BITBANG
            break
        else:
            # All attempts went unanswered.
            raise RuntimeError("Couldn't enter binary mode")
    log.info("Entered binary bitbang mode")
def enter_spi(self) ‑> NoneType

Enter binary SPI mode.

If already in SPI mode, does nothing. If not in bitbang mode, enters bitbang mode first.

If you don't need detailed control, you may prefer the spi() wrapper which enters mode and configures it in one call.

Expand source code
def enter_spi(self) -> None:
    """Switch the BusPirate into binary SPI mode.

    A no-op when SPI mode is already active. From any mode other
    than bitbang, bitbang mode is entered first.

    When fine-grained control is not needed, the `spi()` wrapper
    enters the mode and configures it in a single call.
    """
    current = self.mode
    if current == Mode.SPI:
        return  # log something?
    if current != Mode.BITBANG:
        self.enter_bitbang()

    self.send(b"\x01", expect=b"SPI1")
    self.mode = Mode.SPI
    log.info("Entered SPI mode")
def receive(self, size, timeout=<object object>) ‑> bytes

Receive size bytes from serial.

Expand source code
def receive(self, size, timeout=LEAVE_TIMEOUT_ALONE) -> bytes:
    """Read `size` bytes from the serial port.

    If `timeout` is supplied it overrides the port's timeout for
    this read only; the previous value is put back afterwards, even
    when the read raises.
    """
    port = self.serial
    if timeout is LEAVE_TIMEOUT_ALONE:
        # log.debug(f"<- {rv.hex()}")
        return port.read(size)

    saved_timeout = port.timeout
    port.timeout = timeout
    try:
        # log.debug(f"<- {rv.hex()}")
        return port.read(size)
    finally:
        port.timeout = saved_timeout
def reset(self) ‑> NoneType

Reset BusPirate.

Enters bitbang mode if needed, even from text/unknown mode.

Expand source code
def reset(self) -> None:
    """Reset the BusPirate.

    Bitbang mode is entered first (this works from text/unknown
    mode too), then the reset command 0x0F is sent. Afterwards the
    tracked mode is unknown again.
    """
    self.enter_bitbang()  # just to make sure
    self.send(b"\x0F")
    self.mode = None

    # Wait for the device to settle, then discard whatever it printed.
    settle_time = 250 * _MILLISECOND
    sleep(settle_time)
    self.receive(1024, timeout=0)
def send(self, data: Union[bytes, bytearray], expect: Optional[bytes] = b'\x01', timeout=<object object>) ‑> NoneType

Send data to serial.

If expect is not empty and not none, read len(expect) bytes from serial after sending data and raise Unexpected if it's different than expect.

The timeout argument is only used for receiving the expected value.

Expand source code
def send(
    self,
    data: Data,
    expect: t.Optional[bytes] = CONFIRM,
    timeout=LEAVE_TIMEOUT_ALONE,
) -> None:
    """Write `data` to the serial port and optionally check the reply.

    Unless `expect` is `None` or empty, `len(expect)` bytes are read
    back after the write and `Unexpected` is raised when they differ
    from `expect`.

    `timeout` applies only to reading that expected reply.
    """
    # log.debug(f"-> {data.hex()}")
    port = self.serial
    port.write(data)
    # print(ser.out_waiting, ser.in_waiting)
    port.flush()
    sleep(_MILLISECOND)

    # Nothing to verify: the caller asked for no confirmation.
    if expect is None or len(expect) == 0:
        return

    resp = self.receive(len(expect), timeout=timeout)
    if resp != expect:
        raise Unexpected(f"Expected {expect!r}, got {resp!r}")
def spi(self, speed: SPISpeed, output_3v3=False, clock_idle_phase_high=False, clock_edge_falling=True, sample_time_end=False, power=True, pullups=False, aux=False, cs=True) ‑> NoneType

Enter SPI mode and configure it.

See enter_spi, spi_speed, spi_config and spi_periph_config for details.

Expand source code
def spi(
    self,
    speed: SPISpeed,
    output_3v3=False,
    clock_idle_phase_high=False,
    clock_edge_falling=True,
    sample_time_end=False,
    power=True,
    pullups=False,
    aux=False,
    cs=True,
) -> None:
    """Enter SPI mode and apply the full configuration in one call.

    Runs `enter_spi`, `spi_speed`, `spi_config` and
    `spi_periph_config` in that order; see those methods for the
    meaning of the individual arguments.
    """
    self.enter_spi()
    self.spi_speed(speed)

    # Bus options go to spi_config, peripheral options to spi_periph_config.
    bus_options = {
        "output_3v3": output_3v3,
        "clock_idle_phase_high": clock_idle_phase_high,
        "clock_edge_falling": clock_edge_falling,
        "sample_time_end": sample_time_end,
    }
    self.spi_config(**bus_options)

    periph_options = {"power": power, "pullups": pullups, "aux": aux, "cs": cs}
    self.spi_periph_config(**periph_options)
def spi_config(self, output_3v3=False, clock_idle_phase_high=False, clock_edge_falling=False, sample_time_end=False) ‑> NoneType

Configure SPI.

If you don't need detailed control, you may prefer the spi() wrapper which enters mode and configures it in one call.

Expand source code
def spi_config(
    self,
    output_3v3=False,
    clock_idle_phase_high=False,
    clock_edge_falling=False,
    sample_time_end=False,
) -> None:
    """Send the SPI configuration command.

    Every true argument sets one of the four low bits of the 0x80
    command byte. Requires SPI mode.

    When fine-grained control is not needed, the `spi()` wrapper
    enters the mode and configures it in a single call.
    """
    self._expect_mode(Mode.SPI)

    # (option, bit it contributes to the command byte)
    option_bits = (
        (output_3v3, 8),
        (clock_idle_phase_high, 4),
        (clock_edge_falling, 2),
        (sample_time_end, 1),
    )
    command = 0x80
    for enabled, bit in option_bits:
        if enabled:
            command |= bit
    self.send(pack("B", command))
def spi_cs(self)
Expand source code
@contextmanager
def spi_cs(self):
    """Context manager wrapping a block in a pair of CS commands.

    Command byte 0x02 is sent on entry and 0x03 on exit (also when
    the body raises). Requires SPI mode.

    NOTE(review): 0x02/0x03 presumably drive chip select low/high -
    confirm against the BusPirate binary SPI documentation.
    """
    self._expect_mode(Mode.SPI)

    on_enter, on_exit = b"\x02", b"\x03"
    self.send(on_enter)
    try:
        yield
    finally:
        self.send(on_exit)
def spi_periph_config(self, power=False, pullups=False, aux=False, cs=False) ‑> NoneType

Configure SPI peripherals.

If you don't need detailed control, you may prefer the spi() wrapper which enters mode and configures it in one call.

Expand source code
def spi_periph_config(
    self,
    power=False,
    pullups=False,
    aux=False,
    cs=False,
) -> None:
    """Send the SPI peripherals configuration command.

    Every true argument sets one of the four low bits of the 0x40
    command byte. Requires SPI mode.

    When fine-grained control is not needed, the `spi()` wrapper
    enters the mode and configures it in a single call.
    """
    self._expect_mode(Mode.SPI)

    # (option, bit it contributes to the command byte)
    option_bits = (
        (power, 8),
        (pullups, 4),
        (aux, 2),
        (cs, 1),
    )
    command = 0x40
    for enabled, bit in option_bits:
        if enabled:
            command |= bit
    self.send(pack("B", command))
def spi_speed(self, speed: SPISpeed) ‑> NoneType

Set SPI speed.

If you don't need detailed control, you may prefer the spi() wrapper which enters mode and configures it in one call.

Expand source code
def spi_speed(self, speed: SPISpeed) -> None:
    """Select the SPI clock speed.

    `speed` is OR-ed into the 0x60 command byte. Requires SPI mode.

    When fine-grained control is not needed, the `spi()` wrapper
    enters the mode and configures it in a single call.
    """
    self._expect_mode(Mode.SPI)
    command = pack("B", 0x60 | speed)
    self.send(command)
def spi_transfer(self, data: Union[bytes, bytearray]) ‑> bytes
Expand source code
def spi_transfer(self, data: Data) -> bytes:
    """Send 1 to 16 bytes and return the bytes received in exchange.

    Sends command byte `0x10 | (len(data) - 1)` (confirmed by the
    device), then `data` itself without waiting for a confirmation,
    and finally reads back `len(data)` bytes.

    Raises `RuntimeError` when not in SPI mode and `ValueError` when
    `data` is empty or longer than 16 bytes.
    """
    self._expect_mode(Mode.SPI)

    # The length is encoded as (n - 1) in the low four bits of the
    # command, so only 1..16 bytes fit. Validate explicitly: an assert
    # would vanish under `python -O`, and empty data would otherwise
    # fail with an opaque struct.error.
    if not 1 <= len(data) <= 16:
        raise ValueError(
            f"spi_transfer needs 1 to 16 bytes of data, got {len(data)}"
        )
    self.send(pack(">B", 0x10 | (len(data) - 1)))
    self.send(data, expect=None)
    return self.receive(len(data))
def spi_write_then_read(self, data: Union[bytes, bytearray], read: int, cs: bool = True) ‑> bytes
Expand source code
def spi_write_then_read(self, data: Data, read: int, cs: bool = True) -> bytes:
    """Write `data`, then read `read` bytes, in one BusPirate command.

    Command 0x04 is used when `cs` is true, 0x05 otherwise. The
    command byte is followed by the write length and the read length,
    each packed as a big-endian 16-bit value. Requires SPI mode.

    Returns the received bytes, or `b""` when `read` is 0.
    """
    self._expect_mode(Mode.SPI)

    cmd = 0x04 if cs else 0x05
    header = pack(">BHH", cmd, len(data), read)
    # If the number of bytes to read or write are out of bounds,
    # the Bus Pirate will return 0x00 now <- this is why the header
    # and the payload go out in two separate send calls.
    self.send(header, expect=b"")

    # Workaround for https://github.com/BusPirate/Bus_Pirate/pull/158
    #
    # There's a bug in BP firmware where write-then-read operation
    # doesn't send confirmation when there is 0 bytes to read.
    # Additional problem is that we don't know when BP has finished
    # writing, so client needs to sleep after WTR to avoid
    # interfering with ongoing write.
    skip_confirmation = read == 0 and not self.spi_wtr_0r_confirm
    expect = b"" if skip_confirmation else CONFIRM

    self.send(data, timeout=30, expect=expect)
    # FIXME: adjustable timeout?
    return self.receive(read, timeout=30) if read > 0 else b""
class SPISpeed (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class SPISpeed(IntEnum):
    """SPI speed selector accepted by `BusPirate.spi_speed`.

    The integer value of a member is OR-ed into the 0x60 command byte
    by `BusPirate.spi_speed`.
    """

    # NOTE(review): member names presumably encode the clock rate
    # (S30k ~ 30 kHz, S2M6 ~ 2.6 MHz, S8M ~ 8 MHz) - confirm against the
    # BusPirate binary SPI documentation.
    S30k = 0
    S125k = 1
    S250k = 2
    S1M = 3
    S2M = 4
    S2M6 = 5
    S4M = 6
    S8M = 7

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var S125k
var S1M
var S250k
var S2M
var S2M6
var S30k
var S4M
var S8M