Source code for pdclient.client

import json
import requests
import time
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union

from pdclient.api_types import Grid, MoveCommand
import pdclient.reservoir as reservoir

# Feedback mode settings
FB_DISABLED = 0
FB_NORMAL = 1
FB_DIFFERENTIAL = 2

# Capacitance drive group settings
CAP_HIGHGAIN = 0
CAP_LOWGAIN = 1

class RpcClient(object):
    """General RPC call client

    Any method called on this object will be converted to an RPC call to the
    endpoint provided.
    """

    def __init__(self, url):
        self._url = url
        self._id = 0

    def callrpc(self, method, *args):
        payload = {
            "method": method,
            "params": args,
            "jsonrpc": "2.0",
            "id": self._id,
        }
        self._id += 1

        response = requests.post(self._url, json=payload).json()

        if 'result' in response:
            return response['result']
        else:
            raise RuntimeError("Unexpected response: %s" % response)

    def __getattr__(self, name):
        def f(*args):
            return self.callrpc(name, *args)
        return f


[docs]class PdClient(object): """A PdClient object provides the interface for accessing PurpleDrop via RPC calls """ def __init__(self, host: str): """Create a new PdClient object Args: host: The RPC host URI, for example 'http://purpledrophost:7000/rpc' """ self._board: Optional[Dict] = None self.client = RpcClient(host)
[docs] def layout(self) -> dict: """Get the layout object from the electrode board definition """ if self._board is None: self._board = self.client.get_board_definition() return self._board['layout']
[docs] def grid(self, idx: int=0) -> Grid: """Get one grid object from the electrode board definition idx: Index indicating which grid to return for board with multiple grids """ layout = self.layout() if 'grid' in layout: if idx > 0: raise ValueError(f"Grid {idx} not found in board layout") return Grid(pins=layout['grid']) elif 'grids' in layout: if idx > len(layout['grids']): raise ValueError(f"Grid {idx} not found in board layout") return Grid( pins=layout['grids'][idx]['pins'], pitch=layout['grids'][idx]['pitch'], origin=layout['grids'][idx]['origin'], ) else: raise ValueError("No grid found in board layout")
[docs] def grids(self) -> List[Grid]: """Get all grids in the electrode board layout """ layout = self.layout() if 'grid' in layout: return [self.grid(0)] elif 'grids' in layout: return [self.grid(i) for i in range(len(layout['grids']))] else: return []
[docs] def get_pin(self, location: Sequence[int], grid: int=0) -> int: """Get the electrode pin number from a grid location using the layout location: (x, y) coordinate of the electrode to lookup grid: Index indicating which grid is to be used for a board with multiple grids """ p = location if len(p) != 2: raise ValueError(f"location argument ({location}) must be of length 2") if p[0] < 0 or p[1] < 0: raise ValueError(f"Location argument ({location}) must be only positive integers") try: g = self.grid(grid) pin = g[p[1]][p[0]] except IndexError: raise ValueError( "Invalid position (%d, %d), it is outside of the layout range" % (p[0], p[1])) if pin is None: raise ValueError( "In valid position (%d, %d), no electrode is present at this location" % (p[0], p[1])) return pin
[docs] def get_grid_location(self, pin: int) -> Optional[Tuple[Tuple, int]]: """Get the grid location for a pin number Returns: ((x, y), grid_idx) if the pin is located, or None if the pin is not found in the grid definition """ for grid_idx, g in enumerate(self.grids()): for y, row in enumerate(g): for x, electrode in enumerate(row): if electrode == pin: return ((x, y), grid_idx) return None
def get_reservoir(self, id: int) -> reservoir.ReservoirDriver: layout = self.layout() if 'peripherals' not in layout: raise ValueError("Board definition has no reservoirs") for definition in layout['peripherals']: if definition.get('class') == 'reservoir' and definition.get('id') == id: return reservoir.create_driver(definition, self) raise ValueError(f"No reservor found for id={id}")
[docs] def move_drop(self, start: Sequence[int], size: Sequence[int], dir: str) -> dict: """Executes a device controlled drop movement sequence DEPECRATED: This method has been replaced buy the `move_drops` method, and may be removed in the future. Consider using the new method instead. Args: start : The x,y location of the drop initial position (e.g. `[2, 3]`) size : The width and height of the drop (e.g. [2, 2]) dir : One of ['up', 'down', 'left', 'right'] Returns: A dict containing the results of the drop movement, including a success flag and a time series of capacitance data captured during the move. The return dict is of the form: { "success": bool, "closed_loop": bool, "closed_loop_result": { "pre_capacitance": float, "post_capacitance": float, "time_series": List[float], "capacitance_series": List[float] } } closed_loop_result is only present when "closed_loop" is true. It will always be true for devices that support capacitance sensing, but is present to allow devices without sensing to implement an open-loop `move_drop` function. """ return self.client.move_drop(start, size, dir)
[docs] def move_drops(self, move_commands: Sequence[Union[Dict, MoveCommand]]) -> List[dict]: """Executes a device controlled move of drops This command supports movement of up to 5 drops at a time -- limited by the number of capacitance groups supported on the PurpleDrop -- using capacitance feedback to determine when a move is completed. Args: move_commands: A list of MoveCommand objects, one for each drop movement. Alternatively, a dict with the proper fields is accepted. Returns: A list of dict obections containing the results of each move command, including a success flag, and a time series of the capacitance data captured during the move which can be used to measure movement velocity. Each result object is of the form: { "success": bool, "closed_loop": bool, "closed_loop_result": { "pre_capacitance": float, "post_capacitance": float, "time_series": List[float], "capacitance_series": List[float] } } """ args = [m.to_dict() if isinstance(m, MoveCommand) else m for m in move_commands] for arg in args: if not isinstance(arg, dict): raise ValueError(f"Arg {arg} invalid. Move commands must be either MoveCommand object or dict") return self.client.move_drops(args)
[docs] def enable_positions(self, positions): """Enable the specified set of electrodes by grid location positions: List of 2-tuples of (x, y) electrode grid coordinates, e.g. [(0, 0), (0, 1), (1, 0), (1, 1)] """ pins = [self.get_pin(p) for p in positions] self.enable_pins(pins)
[docs] def enable_pins(self, pins: Sequence[int], group_id: int=0, duty_cycle: int=255): """Enable the specified set of electrodes by pin number PurpleDrop supports two drive groups which can be driven independently, with different duty cycles. Adjusting the duty cycle is primarily used for feedback control performed on the device, but can be set remotely via RPC calls. For most use cases, drive group 0 can be used exclusively. But when using feedback control, e.g. for drop splitting, pins for drive group 0 and drive group 1 must be setup prior to enabling feedback control. Unused drive groups should be disabled by setting an empty pin list. Arguments: - pins: List of integers, giving pin numbers to enable - group_id: Drive group index - duty_cycle: On duty cycle to drive (0-255), 255 being max duty cycle, and 0 min """ self.client.set_electrode_pins(pins, group_id, duty_cycle)
[docs] def set_feedback_command(self, target, mode, input_groups_p_mask, input_groups_n_mask, baseline): """Update feedback control settings When enabled, the purpledrop controller will adjust the duty cycle of electrode drive groups based on capacitance measurements. Arguments: - target: The controller target in counts - mode: - 0: Disabled - 1: Normal - 2: Differential - input_groups_p_mask: Bit mask indicating which capacitance groups to sum for positive input (e.g. for groups 0 and 2: 5) - input_groups_n_mask: Bit mask for negative input groups (used in differential mode) - baseline: The duty cycle to apply to both drive groups when no error signal is present (0-255) """ self.client.set_feedback_command(target, mode, input_groups_p_mask, input_groups_n_mask, baseline)
[docs] def active_capacitance(self) -> float: """Get the most recent capacitance for active electrodes """ return self.client.get_active_capacitance()
[docs] def bulk_capacitance(self) -> List[float]: """Get the most recent scan of electrode capacitance This function is deprecated, and may be removed in a future version. Use get_scan_capacitance instead. """ return self.client.get_scan_capacitance()['calibrated']
[docs] def scan_capacitance(self) -> Dict[str, List]: """Get the most recent capacitance scan result Returns: capacitance scan data for all electrodes in the form of a dict containing two lists. { "raw": List[float], "calibrated": List[float] } """ return self.client.get_scan_capacitance()
[docs] def group_capacitance(self) -> Dict[str, List]: """Get the most recent group capacitance measurements Returns: dict containins the raw and calibrated values for all capacitance groups. Example: { "raw": [10, 11, 400, 10, 9], "calibrated": [0.0, 0.0, 2.9, 0.0, 0.0], } """ return self.client.get_group_capacitance()
[docs] def set_capacitance_group(self, pins: Sequence[int], group_id: int, setting: int): """Set configuration for a capacitance group Purpledrop support 5 different group scans. Each group defines a set of electrodes which are measured together after each AC drive cycle. To disable a group, set the pins to an empty list. Arguments: - pins: A list of pins included in the group (may be empty to disable the group) - group_id: The group number to set (0-4) - setting: 0 - high gain, 1 - low gain """ self.client.set_capacitance_group(pins, group_id, setting)
[docs] def temperatures(self) -> List[float]: """Get the most recent temperature measurements Returns a list of temperatures (floats) in degC The length of the return value depends on device configuraiton, and may be zero. """ return self.client.get_temperatures()
[docs] def set_pwm_duty_cycle(self, chan: int, duty_cycle: float): """Set duty cycle on a PWM output channel chan is an integer specifying which PWM channel to change duty_cycle is float in range 0.0 to 1.0. """ return self.client.set_pwm_duty_cycle(chan, duty_cycle)
[docs] def hv_supply_voltage(self) -> float: """Get the latest voltage measurement for the high voltage supply rail Returns a float, in volts. """ return self.client.get_hv_supply_voltage()
[docs] def parameter_definitions(self) -> dict: """Get the list of all parameters which can be set in the firmware """ return self.client.get_parameter_definitions()
[docs] def parameter(self, id: int ) -> Union[bool, int, float]: """Get the value of a particular parameter """ return self.client.get_parameter(id)
[docs] def set_parameter(self, id: int, value: Union[int, float]): """Set the value of a particular parameter """ self.client.set_parameter(id, value)
[docs] def calibrate_capacitance_offset(self): """Trigger the re-calibration of the capacitance sense amplifier offset Note: This calibrates for the zero-input integrator ramp rate. It is not a per-electrode offset calibration. """ self.client.calibrate_capacitance_offset()