Examples

The following examples are also available in the examples directory.

Load and save data

Shows how to load/save methods and measurements and how to inspect the data.

load_save_data.py
from pathlib import Path

import pandas as pd

import pypalmsens

# The example data files are looked up next to this script.
script_dir = Path(__file__).parent
method_path = script_dir / 'PSDummyCell_LSV.psmethod'
session_path = script_dir / 'Demo CV DPV EIS IS-C electrode.pssession'

# Round-trip a method file: read it, then write it back under a new name.
method = pypalmsens.load_method_file(method_path, as_method=True)
pypalmsens.save_method_file(script_dir / 'PSDummyCell_LSV_copy.psmethod', method)

# Read the measurements stored in the session file and summarise each one.
measurements = pypalmsens.load_session_file(session_path)

for measurement in measurements:
    print(f'loaded measurement: {measurement.title}, {measurement.timestamp}')
    print(f'number of curves: {len(measurement.curves)}')
    for curve in measurement.curves:
        print(f'curve title: {curve.title}')
        print(f'number of points: {len(curve.x_array)}')
        print(f'number of peaks: {len(curve.peaks)}')
    has_eis_fit = len(measurement.eis_fit) > 0
    print(f'Has EIS fit results: {"yes" if has_eis_fit else "no"}')

# Write a new session file that contains only the first measurement.
pypalmsens.save_session_file(
    script_dir / 'Demo CV DPV EIS IS-C electrode_copy.pssession', [measurements[0]]
)

# Convert each measurement's dataset to a pandas DataFrame and stack them,
# keyed by measurement title.
frame_names = [measurement.title for measurement in measurements]
frames = [measurement.dataset.to_dataframe() for measurement in measurements]

df = pd.concat(frames, keys=frame_names)
print(df)

Manual control

Shows how to discover devices, establish a connection and control an instrument manually.

manual_control.py
import pypalmsens

# This example simply uses the first instrument that discovery returns.
available_instruments = pypalmsens.discover()
instrument = available_instruments[0]
print(f'connecting to {instrument.name}')

# The connection is a context manager; the steps below run while it is open.
with pypalmsens.connect(instrument) as manager:
    print('connection established')

    # Switch the cell on before applying a potential.
    manager.set_cell(True)
    print('cell enabled')

    manager.set_potential(1)
    print('set potential to 1V')

    manager.set_current_range(pypalmsens.settings.CURRENT_RANGE.cr_1_mA)
    print('set cell to to 1mA currrent range')

    # Take a single current reading (reported below in µA).
    current = manager.read_current()
    print(f'current = {current} µA')

    # Switch the cell off again before the connection is closed.
    manager.set_cell(False)
    print('cell disabled')

Manual control async

Shows how to discover devices, establish a connection and control an instrument manually using the asynchronous instrument manager.

manual_control_async.py
import asyncio

import pypalmsens


async def main():
    """Discover an instrument, connect to it and control the cell manually.

    Asynchronous counterpart of the manual control example: every manager
    call is awaited.
    """
    available_instruments = await pypalmsens.discover_async()
    # This example simply uses the first instrument that discovery returns.
    instrument = available_instruments[0]
    print(f'connecting to {instrument.name}')

    async with await pypalmsens.connect_async(instrument) as manager:
        print('connection established')

        # Switch the cell on before applying a potential.
        await manager.set_cell(True)
        print('cell enabled')

        await manager.set_potential(1)
        print('set potential to 1V')

        one_milliamp = pypalmsens.settings.CURRENT_RANGE.cr_1_mA
        await manager.set_current_range(one_milliamp)
        print('set cell to to 1mA currrent range')

        # Take a single current reading (reported below in µA).
        current = await manager.read_current()
        print(f'current = {current} µA')

        # Switch the cell off again before the connection is closed.
        await manager.set_cell(False)
        print('cell disabled')


asyncio.run(main())

Measure CA

Shows how to set up and run a chronoamperometry measurement.

measurement_CA.py
import pypalmsens


def new_data_callback(new_data):
    """Print each point in ``new_data`` on its own line."""
    for sample in new_data:
        print(sample)


# This example simply uses the first instrument that discovery returns.
available_instruments = pypalmsens.discover()
instrument = available_instruments[0]
print(f'connecting to {instrument.name}')

with pypalmsens.connect(instrument) as manager:
    print('connection established')

    # Print incoming data points through the callback.
    manager.callback = new_data_callback

    print(manager.get_instrument_serial())

    # Chronoamperometry settings for the helper class
    ca_settings = {
        'interval_time': 0.01,
        'potential': 1.0,
        'run_time': 10.0,
    }
    method = pypalmsens.ChronoAmperometry(**ca_settings)

    measurement = manager.measure(method)

    # measure() returning None is reported as a failed start.
    if measurement is None:
        print('failed to start measurement')
    else:
        print('measurement finished')

Measure CA async

Shows how to set up and run a chronoamperometry measurement using the asynchronous instrument manager.

measurement_CA_async.py
import asyncio

import pypalmsens


def new_data_callback(new_data):
    """Echo the received data points to stdout, one per line."""
    for data_point in new_data:
        print(data_point)


async def main():
    """Run a chronoamperometry measurement through the asynchronous manager.

    Uses the first discovered instrument and prints whether the measurement
    finished or failed to start.
    """
    available_instruments = await pypalmsens.discover_async()
    instrument = available_instruments[0]
    print(f'connecting to {instrument.name}')

    async with await pypalmsens.connect_async(instrument) as manager:
        print('connection established')
        # Print incoming data points through the callback.
        manager.callback = new_data_callback

        print(await manager.get_instrument_serial())

        # Chronoamperometry settings for the helper class
        ca_settings = {
            'interval_time': 0.02,
            'potential': 1.0,
            'run_time': 2.0,
        }
        method = pypalmsens.ChronoAmperometry(**ca_settings)

        measurement = await manager.measure(method)

        # measure() returning None is reported as a failed start.
        if measurement is None:
            print('failed to start measurement')
        else:
            print('measurement finished')


asyncio.run(main())

Measure CV

Shows how to set up and run a cyclic voltammetry measurement.

measurement_CV.py
import pypalmsens


def new_data_callback(new_data):
    """Print every item of ``new_data`` as it arrives."""
    for item in new_data:
        print(item)


# This example simply uses the first instrument that discovery returns.
available_instruments = pypalmsens.discover()
instrument = available_instruments[0]
print(f'connecting to {instrument.name}')

with pypalmsens.connect(instrument) as manager:
    print('connection established')

    # Print incoming data points through the callback.
    manager.callback = new_data_callback

    print(manager.get_instrument_serial())

    # Current range bounds and the range to start in.
    ranges = pypalmsens.settings.CURRENT_RANGE
    current_range = pypalmsens.settings.CurrentRange(
        max=ranges.cr_1_A,  # 1 A range
        min=ranges.cr_1_uA,  # 1 µA range
        start=ranges.cr_1_mA,  # 1 mA range
    )

    method = pypalmsens.CyclicVoltammetry(
        current_range=current_range,
        equilibration_time=2,  # seconds
        begin_potential=-2,  # V
        vertex1_potential=-2,  # V
        vertex2_potential=3,  # V
        step_potential=0.05,  # V
        scanrate=5,  # V/s
        n_scans=3,  # number of scans
    )

    measurement = manager.measure(method)

    # measure() returning None is reported as a failed start.
    if measurement is None:
        print('failed to start measurement')
    else:
        print('measurement finished')

Measure EIS

Shows how to set up and run an EIS measurement.

measurement_EIS.py
import pypalmsens


def new_data_callback(new_data):
    """Write each received point to stdout, one per line."""
    for received in new_data:
        print(received)


# This example simply uses the first instrument that discovery returns.
available_instruments = pypalmsens.discover()
instrument = available_instruments[0]
print(f'connecting to {instrument.name}')

with pypalmsens.connect(instrument) as manager:
    print('connection established')

    # Print incoming data points through the callback.
    manager.callback = new_data_callback

    print(manager.get_instrument_serial())

    # EIS measurement using the helper class; no arguments are passed, so the
    # class's own defaults apply.
    method = pypalmsens.ElectrochemicalImpedanceSpectroscopy()

    measurement = manager.measure(method)

    # measure() returning None is reported as a failed start.
    if measurement is None:
        print('failed to start measurement')
    else:
        print('measurement finished')

MethodSCRIPT sandbox

Shows how to set up and run a MethodSCRIPT Sandbox measurement.

measurement_MethodSCRIPT_sandbox.py
import pypalmsens


def new_data_callback(new_data):
    """Print the points delivered in ``new_data``, one per line."""
    for entry in new_data:
        print(entry)


# MethodSCRIPT program text. It is passed unchanged to
# pypalmsens.MethodScript(script=script) further down in this example.
# The program contains an OCP loop (meas_loop_ocp) followed by a
# meas_loop_acv loop; each loop packs its variables with pck_add.
# The string is runtime data: keep its lines and spacing exactly as written.
script = """e
var c
var p
var e
var l
var r
var j
var o
var d
var n
set_gpio_cfg 0x0f 1
set_pgstat_chan 0
set_pgstat_mode 3
set_acquisition_frac_autoadjust 50
set_max_bandwidth 0
cell_off
set_range ba 210m
set_autoranging ba 210m 210m
set_range ab 4200m
set_autoranging ab 210m 4200m
meas_loop_ocp o 200m 1
pck_start
    pck_add o
pck_end
endloop
set_range ba 2100u
set_autoranging ba 2100n 2100u
set_range ab 4200m
set_autoranging ab 4200m 4200m
store_var d -500m ab
add_var d o
store_var n 500m ab
add_var n o
set_e d
cell_on
set_gpio 10i
meas_loop_acv p c e l r j d n 10m 50m 10m 100
pck_start
    pck_add p
    pck_add c
    pck_add e
    pck_add l
    pck_add r
    pck_add j
pck_end
endloop
on_finished:
cell_off
"""

# This example simply uses the first instrument that discovery returns.
available_instruments = pypalmsens.discover()
instrument = available_instruments[0]
print(f'connecting to {instrument.name}')

with pypalmsens.connect(instrument) as manager:
    print('connection established')

    # Print incoming data points through the callback.
    manager.callback = new_data_callback

    print(manager.get_instrument_serial())

    # Wrap the MethodSCRIPT text in a method object so it can be measured
    # like any other technique.
    method = pypalmsens.MethodScript(script=script)

    measurement = manager.measure(method)

    # measure() returning None is reported as a failed start.
    if measurement is None:
        print('failed to start measurement')
    else:
        print('measurement finished')

Stream data to CSV

Shows how to set up and run a chronoamperometry measurement and write the results to a CSV file in real-time.

measurement_stream_to_csv.py
import csv

import pypalmsens


def stream_to_csv_callback(new_data):
    """Append one CSV row (index, x, y) for every point in ``new_data``.

    Rows are written through the module-level ``csv_writer``.
    """
    # For EIS data, use the keys 'frequency', 'zre' and 'zim' instead.
    columns = ('index', 'x', 'y')
    for point in new_data:
        csv_writer.writerow([point[column] for column in columns])


# Discover first, so an existing test.csv is not truncated when no
# instrument is available.
available_instruments = pypalmsens.discover()
print('connecting to ' + available_instruments[0].name)

# Opening the file in a `with` block guarantees it is flushed and closed even
# if connecting or measuring raises. The file is the outer context, so it
# stays open until the instrument connection has been closed.
with open('test.csv', 'w', newline='') as csv_file:
    # stream_to_csv_callback writes through this module-level writer.
    csv_writer = csv.writer(csv_file, delimiter=' ')

    with pypalmsens.connect(available_instruments[0]) as manager:
        manager.callback = stream_to_csv_callback

        print('connection established')

        serial = manager.get_instrument_serial()
        print(serial)

        # Chronoamperometry measurement using helper class
        method = pypalmsens.ChronoAmperometry(
            interval_time=0.0004,
            potential=1.0,
            run_time=10.0,
        )

        measurement = manager.measure(method)
        if measurement is not None:
            print('measurement finished')
        else:
            print('failed to start measurement')

SWV versus OCP

Shows how to set up and run a square wave voltammetry measurement versus OCP.

measurement_SWV_vs_OCP.py
import pypalmsens


def new_data_callback(new_data):
    """Print each new data point on a separate line."""
    for new_point in new_data:
        print(new_point)


available_instruments = pypalmsens.discover()

with pypalmsens.connect(available_instruments[0]) as manager:
    print('connection established')

    # Print incoming data points through the callback.
    manager.callback = new_data_callback

    method = pypalmsens.SquareWaveVoltammetry(
        pretreatment=pypalmsens.settings.Pretreatment(
            conditioning_potential=2.0,  # V
            conditioning_time=2,  # seconds
        ),
        versus_ocp=pypalmsens.settings.VersusOCP(
            mode=3,  # versus begin and end potential
            max_ocp_time=1,  # seconds
        ),
        begin_potential=-0.5,  # V
        end_potential=0.5,  # V
        step_potential=0.01,  # V
        amplitude=0.08,  # V
        frequency=10,  # Hz
    )

    # Method attributes can also be changed after construction; this replaces
    # the 10 Hz passed to the constructor above.
    method.frequency = 50

    measurement = manager.measure(method)

    # Only read the result when a measurement was returned, so a failed start
    # is reported instead of raising AttributeError on None.
    if measurement is not None:
        # The OCP value is read through the curve's private `_pscurve` handle.
        print(f'ocp: {measurement.curves[0]._pscurve.OCPValue}')
    else:
        print('failed to start measurement')

Multiplexer

Shows how to set up and control a multiplexer and run consecutive and alternating multiplexer measurements.

multiplexer.py
import pypalmsens


def new_data_callback(new_data):
    """Print all points contained in ``new_data``, one per line."""
    for value in new_data:
        print(value)


available_instruments = pypalmsens.discover()
print(f'connecting to {available_instruments[0].name}')

with pypalmsens.connect(available_instruments[0]) as manager:
    # Print incoming data points through the callback.
    manager.callback = new_data_callback

    print('connection established')

    # initialize_multiplexer returns the number of multiplexer channels.
    # NOTE(review): the meaning of the argument 2 is not visible here; confirm
    # against the pypalmsens documentation.
    n_multiplexer_channels = manager.initialize_multiplexer(2)
    manager.set_mux8r2_settings()

    # Manual control: select every available channel in turn.
    for channel in range(n_multiplexer_channels):
        manager.set_multiplexer_channel(channel)

    # When measuring alternatingly the selection is restricted to the first n
    # channels, so the channel list has to be contiguous starting at 1.
    alternating_multiplexer_method = pypalmsens.ChronoAmperometry(
        interval_time=0.5,  # seconds
        potential=1.0,  # volts
        run_time=5.0,  # seconds
        multiplexer=pypalmsens.settings.Multiplexer(
            mode='alternate',  # 'none', 'consecutive', 'alternate'
            # 8 channels, 1 and 2 are enabled
            channels=[1, 2],
            connect_sense_to_working_electrode=False,
            combine_reference_and_counter_electrodes=False,
            # use the reference and counter electrodes of channel 1 for all channels
            use_channel_1_reference_and_counter_electrodes=False,
            # working electrode of the unselected channels are disconnected/floating
            set_unselected_channel_working_electrode=0,
        ),
    )
    measurement = manager.measure(alternating_multiplexer_method)

    if measurement is not None:
        print('measurement finished')
    else:
        print('failed to start measurement')

    # Consecutive mode may use any selection of channels.
    consecutive_multiplexer_method = pypalmsens.SquareWaveVoltammetry(
        begin_potential=-0.5,  # volts
        end_potential=0.5,  # volts
        step_potential=0.01,  # volts
        amplitude=0.1,  # volts
        frequency=10,  # hertz
        multiplexer=pypalmsens.settings.Multiplexer(
            mode='consecutive',  # 'none', 'consecutive', 'alternate'
            # 8 channels, 1, 2, 7 and 8 are enabled
            channels=[1, 2, 7, 8],
            connect_sense_to_working_electrode=False,
            combine_reference_and_counter_electrodes=False,
            # use the reference and counter electrodes of channel 1 for all channels
            use_channel_1_reference_and_counter_electrodes=False,
            # working electrode of the unselected channels are disconnected/floating
            set_unselected_channel_working_electrode=0,
        ),
    )

    measurement = manager.measure(consecutive_multiplexer_method)

    if measurement is not None:
        print('measurement finished')
    else:
        print('failed to start measurement')

Multichannel

Shows how to connect to a collection of instruments and run a chronoamperometry measurement on all channels simultaneously.

multichannel_measurement.py
import asyncio
import csv

import pypalmsens


def stream_to_csv_callback(csv_writer):
    """Build a data callback that appends rows to ``csv_writer``.

    The returned callable takes the new data points and writes one
    ``index, x, y`` row per point.
    """
    columns = ('index', 'x', 'y')

    def write_rows(data):
        for point in data:
            csv_writer.writerow([point[column] for column in columns])

    return write_rows


async def main():
    """Measure on all discovered instruments at once, streaming each to CSV.

    Connects to every discovered instrument. If all connections succeed, the
    same chronoamperometry method is run on every channel concurrently, each
    channel writing to its own ``test<N>.csv``, and the channels are then
    disconnected.
    """
    available_instruments = await pypalmsens.discover_async()
    managers = {}

    # create an instance of the instrumentmanager per channel
    async def connect(instrument, index):
        # No callback is set here; measure() installs a per-channel CSV
        # callback right before the measurement starts.
        managers[index] = pypalmsens.InstrumentManagerAsync(instrument)

        success = await managers[index].connect()
        if success:
            print(f'{index + 1}: connected to {instrument.name}')
        else:
            print(f'{index + 1}: error while connecting to {instrument.name}')
        return success

    tasks = [connect(instrument, i) for (i, instrument) in enumerate(available_instruments)]
    connected = await asyncio.gather(*tasks)

    method = pypalmsens.ChronoAmperometry(
        interval_time=0.0004,
        potential=1.0,
        run_time=5.0,
    )

    if all(connected):

        async def measure(manager, channel):
            # The `with` block closes the file even when the measurement
            # raises, so no file handle leaks and buffered rows are flushed.
            with open(f'test{channel + 1}.csv', 'w', newline='') as csv_file:
                csv_writer = csv.writer(csv_file, delimiter=' ')
                manager.callback = stream_to_csv_callback(csv_writer)
                return await manager.measure(method)

        # start measurements asynchronously
        tasks = [measure(manager, channel) for (channel, manager) in managers.items()]
        _ = await asyncio.gather(*tasks)  # use gather to await results
        print('measurement(s) finished')

        async def disconnect(instrument_manager, channel):
            success = await instrument_manager.disconnect()
            if success:
                print(f'channel {channel + 1}: disconnected')
            else:
                print(f'channel {channel + 1}: error while disconnecting')
            return success

        tasks = [disconnect(manager, channel) for (channel, manager) in managers.items()]
        await asyncio.gather(*tasks)


asyncio.run(main())

Multichannel custom loop

Shows how to set up and run a sequence of measurements on a collection of channels simultaneously.

multichannel_measurement_custom_loop.py
import asyncio
from attrs import evolve
import pypalmsens


def new_data_callback(channel):
    """Build a data callback that prints points labelled with the channel.

    ``channel`` is zero-based; the printed label is ``channel + 1``.
    """

    def print_labelled(x):
        for point in x:
            print(f'channel {channel + 1}: {point}')

    return print_labelled


async def run_steps(manager, channel, steps):
    """Measure once per entry in ``steps`` and return the measurements in order.

    Each step is a dict of method parameters. It is applied with ``evolve`` to
    the method used for the previous step, so changes carry over from one step
    to the next. ``channel`` is accepted but not used in the body.
    """
    settings = pypalmsens.settings

    # Create a new method, a separate method is required for each channel
    potential_range = settings.PotentialRange(
        max=settings.POTENTIAL_RANGE.pr_1_V,  # 1V range
        min=settings.POTENTIAL_RANGE.pr_10_mV,  # 10mV range
        start=settings.POTENTIAL_RANGE.pr_1_V,  # 1V range
    )
    method = pypalmsens.ChronoPotentiometry(
        potential_range=potential_range,
        applied_current_range=settings.CURRENT_RANGE.cr_10_uA,  # 10µA range
        # applied current in range, i.e. 5µA when the 10µA range is set as the applied range
        current=0.5,
        interval_time=0.05,  # seconds
        run_time=5,  # seconds
    )

    results = []
    for overrides in steps:
        method = evolve(method, **overrides)
        result = await manager.measure(method)
        results.append(result)

    return results


async def main():
    """Run a sequence of chronopotentiometry steps on all channels at once.

    Connects to every discovered instrument. If all connections succeed, the
    same list of steps is run on every channel concurrently, each channel's
    measurements are saved to its own session file, and the channels are
    disconnected.
    """
    # Create a list of parameters you want to change
    steps = [
        {
            # positive current: only the upper potential limit is used
            'current': 0.1,
            'potential_limits': pypalmsens.settings.PotentialLimits(
                limit_max=2, use_limit_min=False
            ),
        },
        {
            # negative current: only the lower potential limit is used, so
            # the value has to be given as limit_min
            'current': -0.2,
            'potential_limits': pypalmsens.settings.PotentialLimits(
                limit_min=-0.5, use_limit_max=False
            ),
        },
        {
            'current': 2,
            'potential_limits': pypalmsens.settings.PotentialLimits(limit_min=2, limit_max=2),
        },
    ]

    available_instruments = await pypalmsens.discover_async()
    managers = {}

    # create an instance of the instrumentmanager per channel
    async def connect(instrument, index):
        managers[index] = pypalmsens.InstrumentManagerAsync(
            instrument, callback=new_data_callback(index)
        )
        success = await managers[index].connect()
        if success:
            print(f'{index + 1}: connected to {instrument.name}')
        else:
            print(f'{index + 1}: error while connecting to {instrument.name}')
        return success

    tasks = [connect(instrument, i) for (i, instrument) in enumerate(available_instruments)]
    connected = await asyncio.gather(*tasks)

    if all(connected):
        # start measurements asynchronously
        tasks = [run_steps(manager, channel, steps) for (channel, manager) in managers.items()]
        channels = await asyncio.gather(*tasks)  # use gather to await results

        # gather() returns results in task order, which is the order of
        # `managers`. Each channel gets its own file so that later channels do
        # not overwrite the data of earlier ones.
        for channel, measurements in zip(managers, channels):
            pypalmsens.save_session_file(
                f'example_channel_{channel + 1}.pssession', measurements
            )

        for channel, manager in managers.items():
            # disconnect() is a coroutine on the async manager; it must be
            # awaited to run and to yield the real success flag.
            success = await manager.disconnect()
            if success:
                print(f'channel {channel + 1}: disconnected')
            else:
                print(f'channel {channel + 1}: error while disconnecting')


asyncio.run(main())

Multichannel HW sync

Shows how to connect to a collection of instruments and run a chronopotentiometry measurement on all channels simultaneously using hardware synchronization.

multichannel_HW_sync.py
import asyncio

import pypalmsens


def new_data_callback(channel):
    """Build a data callback that prints points prefixed with ``channel``."""

    def print_labelled(x):
        for point in x:
            print(f'channel {channel}: {point}')

    return print_labelled


async def main():
    """Run a hardware-synchronised chronopotentiometry on all channels.

    Connects to every discovered instrument. Follower channels are initiated
    first, then the measurement is started on the primary channel (the manager
    stored under key 1) and all results are awaited together.
    """
    available_instruments = await pypalmsens.discover_async()
    managers = {}

    # create an instance of the instrumentmanager per channel
    async def connect(instrument, index):
        manager = pypalmsens.InstrumentManagerAsync(
            instrument, callback=new_data_callback(index)
        )
        managers[index] = manager
        success = await manager.connect()
        if success:
            print(f'{index + 1}: connected to {instrument.name}')
        else:
            print(f'{index + 1}: error while connecting to {instrument.name}')
        return success

    connected = await asyncio.gather(
        *[connect(instrument, i) for (i, instrument) in enumerate(available_instruments)]
    )

    # NOTE(review): keys come from enumerate() and start at 0, yet the primary
    # channel is looked up under key 1 (the second discovered instrument).
    # Confirm that this is the intended primary channel.
    if not all(connected) or 1 not in managers:
        return

    # every channel except the primary one follows the hardware sync signal
    follower_channels = [manager for (channel, manager) in managers.items() if channel != 1]

    settings = pypalmsens.settings
    method = pypalmsens.ChronoPotentiometry(
        potential_range=settings.PotentialRange(
            max=settings.POTENTIAL_RANGE.pr_1_V,  # 1V range
            min=settings.POTENTIAL_RANGE.pr_10_mV,  # 10mV range
            start=settings.POTENTIAL_RANGE.pr_1_V,  # 1V range
        ),
        applied_current_range=settings.CURRENT_RANGE.cr_10_uA,  # 10µA range
        # applied current in range, i.e. 5µA when the 10µA range is set as the applied range
        current=0.5,
        interval_time=0.05,  # seconds
        run_time=5,  # seconds
        general=settings.General(
            use_hardware_sync=True,  # enable hw sync
        ),
    )

    # start measurements asynchronously on follower channels
    pending_initiations = []
    pending_results = []

    for follower in follower_channels:
        initiated, result = follower.initiate_hardware_sync_follower_channel(method)
        pending_initiations.append(initiated)
        pending_results.append(result)

    # wait for the follower initiations before starting the primary channel
    await asyncio.gather(*pending_initiations)

    # start the measurement on primary channel
    pending_results.append(managers[1].measure(method))
    # use gather to await results
    measurements = await asyncio.gather(*pending_results)
    print(f'Collected {len(measurements)} measurements')

    for channel, manager in managers.items():
        if await manager.disconnect():
            print(f'channel {channel}: disconnected')
        else:
            print(f'channel {channel}: error while disconnecting')


asyncio.run(main())