# Source code for pdclient.drop

from enum import Enum
from typing import Sequence, Tuple, Union

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from pdclient import PdClient

from pdclient.api_types import MoveCommand
from pdclient.exceptions import InvalidMoveException

class Dir(Enum):
    """The four single-step directions understood by this module."""

    UP = 1     # y decreases by one
    DOWN = 2   # y increases by one
    LEFT = 3   # x decreases by one
    RIGHT = 4  # x increases by one

def dir2str(dir):
    """Return the capitalised label for a direction.

    Yields "Up", "Down", "Left" or "Right" for the matching Dir member.
    A value equal to none of the four members produces None.
    """
    # Ordered pairs rather than a dict: matching is done with ==, exactly
    # one member at a time, so unhashable inputs still just fall through.
    labels = (
        (Dir.UP, "Up"),
        (Dir.DOWN, "Down"),
        (Dir.LEFT, "Left"),
        (Dir.RIGHT, "Right"),
    )
    for member, label in labels:
        if dir == member:
            return label
    return None

def str2dir(s):
    """Convert a direction name to its Dir member, ignoring case.

    Accepts "up", "down", "left" and "right" in any capitalisation.

    Raises:
        ValueError: if the lower-cased input is not one of the four names.
    """
    by_name = {
        "up": Dir.UP,
        "down": Dir.DOWN,
        "left": Dir.LEFT,
        "right": Dir.RIGHT,
    }
    found = by_name.get(s.lower())
    if found is None:
        raise ValueError(f"Invalid direction string {s}")
    return found

def validate_dir(d):
    """Normalise a direction argument to a Dir member.

    A Dir is returned as-is; a string is parsed with str2dir (which raises
    ValueError for an unknown name).

    Raises:
        ValueError: if d is neither a Dir nor a string.
    """
    if not isinstance(d, (Dir, str)):
        raise ValueError("Direction must be a string ('left', 'right', 'up', 'down') or a Dir object")
    return d if isinstance(d, Dir) else str2dir(d)

def move(pos, direction: Union[str, Dir]):
    """Return the (x, y) tuple one step away from pos in the given direction.

    pos is indexed as pos[0] (x) and pos[1] (y) and is not modified.
    UP/DOWN change y by -1/+1; LEFT/RIGHT change x by -1/+1.

    The direction is validated first, so an invalid direction raises
    ValueError before pos is examined.
    """
    heading = validate_dir(direction)
    if heading == Dir.UP:
        return (pos[0], pos[1] - 1)
    if heading == Dir.DOWN:
        return (pos[0], pos[1] + 1)
    if heading == Dir.LEFT:
        return (pos[0] - 1, pos[1])
    if heading == Dir.RIGHT:
        return (pos[0] + 1, pos[1])
    # No member matched.
    return None

class Drop:
    """Represents a drop on the electrode board

    Attributes (stored unchanged from the constructor arguments):
        pos: (x, y) location of the drop; the pins it covers start here and
            extend in the +x / +y direction.
        size: (width, height) of the drop, counted in electrodes.
        client: object used to look up pins (`get_pin`), enable them
            (`enable_pins`) and execute moves (`move_drops`).
    """

    def __init__(self, pos: Sequence[int], size: Sequence[int], client: 'PdClient'):
        self.pos = pos
        self.size = size
        self.client = client

    def get_move_command(self, dir: Union[str, Dir], **kwargs) -> MoveCommand:
        """Returns a MoveCommand which can be passed to the move_drops method

        Raises InvalidMoveException if the current or new drop position are
        not valid on the current electrode board. For example, if the move
        causes the drop to move off the electrode grid.

        **kwargs are passed on to the MoveCommand, and can be used to set
        other move options, e.g. `timeout`, or `threshold`.
        """
        # A throwaway Drop at the target position lets us reuse pins() to
        # compute the destination electrodes.
        new_drop = Drop(move(self.pos, dir), self.size, self.client)
        try:
            return MoveCommand(self.pins(), new_drop.pins(), **kwargs)
        except ValueError as err:
            # Chain explicitly so the underlying ValueError is reported as
            # the cause rather than as an error inside this handler.
            raise InvalidMoveException(
                f"Cannot move drop of size {self.size} to {new_drop.pos}"
            ) from err

    def move(self, dir: Union[str, Dir], **kwargs):
        """Move the drop one electrode in the given direction

        If the move is successful, `self.pos` is updated to reflect the new
        position.

        Args:
            dir: One of [Dir.UP, Dir.DOWN, Dir.LEFT, Dir.RIGHT], or
                (case-insensitive) strings ["up", "down", "left", "right"]
            **kwargs: Passed on to `get_move_command`.

        Returns:
            The `move_drop` response object from the API
        """
        cmd = self.get_move_command(dir, **kwargs)
        # move_drops takes a list of commands; we sent one, so take the
        # first (only) response.
        response = self.client.move_drops([cmd])[0]
        if response['success']:
            self.pos = move(self.pos, dir)
        return response

    def move_up(self, **kwargs):
        """Move the drop one electrode up (i.e. y = y - 1)"""
        return self.move(Dir.UP, **kwargs)

    def move_down(self, **kwargs):
        """Move the drop one electrode down (i.e. y = y + 1)"""
        return self.move(Dir.DOWN, **kwargs)

    def move_left(self, **kwargs):
        """Move the drop one electrode left (i.e. x = x - 1)"""
        return self.move(Dir.LEFT, **kwargs)

    def move_right(self, **kwargs):
        """Move the drop one electrode right (i.e. x = x + 1)"""
        return self.move(Dir.RIGHT, **kwargs)

    def pins(self):
        """Return all pins which are part of the drop

        Pins are looked up through `client.get_pin` for every (x, y) location
        covered by the drop, iterating x in the outer loop and y in the inner
        loop. A size of zero in either dimension yields an empty list.
        """
        return [
            self.client.get_pin((self.pos[0] + x, self.pos[1] + y))
            for x in range(self.size[0])
            for y in range(self.size[1])
        ]

    def activate(self):
        """Activate the electrodes for this drop"""
        self.client.enable_pins(self.pins())

    def __str__(self):
        return f"Drop(pos={self.pos}, size={self.size})"
def move_multiple_drops(*moves: Tuple[Drop, Union[str, Dir]], **kwargs):
    """Execute concurrent device controlled moves for a set of drops

    Local state is updated for drops that are reported as successfully moved.

    Args:
        *moves: (drop, direction) pairs, one per drop to move.
        **kwargs: Passed on to each drop's `get_move_command`.

    Returns:
        The list returned by the client's `move_drops` call, or an empty
        list when no moves are given (the client is not contacted).

    Example:
        drop1 = Drop((1, 1), (2, 2), client)
        drop2 = Drop((5, 1), (2, 2), client)
        move_multiple_drops((drop1, "Right"), (drop2, "Right"))
    """
    if not moves:
        return []

    # Get the client from the first move's drop
    client = moves[0][0].client
    # All commands are built before anything is sent, so an invalid move
    # raises here and no drop is moved.
    commands = [drop.get_move_command(dir, **kwargs) for drop, dir in moves]
    results = client.move_drops(commands)

    # Results are matched to moves by position.
    for (drop, direction), result in zip(moves, results):
        if result['success']:
            drop.pos = move(drop.pos, direction)
    return results