Control Sequences in Python

Write a control sequence for pressurizing a tank.

One of the most powerful tools Synnax has to offer is the ability to easily write control sequences in Python. These control sequences can be run from a command line to run both simple and complex sequences. In this guide, we will walk you through writing a control sequence to pressurize and vent a pressure vessel.

Prerequisites

Before writing a control sequence, you should have done the following:

Running a Simulated Data Acquisition System

For the purpose of this guide, we will use a simulated data acquisition system (DAQ) to represent the hardware. You can view the code for this DAQ on Synnax’s GitHub repository. This DAQ simulates a pressure vessel that can be filled and vented using several valves. The DAQ has the following sensors and actuators:

ChannelDescription
daq_timeThe current timestamps of the DAQ. This is an index channel for the sensors on the DAQ.
sensor_#The channels representing pressure sensors on the pressure vessel.
valve_command_#The channels used to write data to (open/close) a valve.
valve_response_#The channels used to read data from a valve (see if a valve is open or closed).
valve_command_time_#The index channels used for commanding a valve. Valves need separate index channels than sensors in case the valves are commanded at different times.

We will be using the following sensors and valves in our control sequence:

ChannelPurpose
sensor_0The current pressure in the pressure vessel.
valve_0The valve used to pressurize the pressure vessel.
valve_1The valve used to vent the pressure vessel.

Writing the Control Sequence

At the top of your script, make sure to import the Synnax library:

import synnax as sy

The next step is to connect to a Synnax cluster. This can be done by logging in with through the CLI or by passing the credentials into your script.

After that, we will want to instantiate the Synnax client so we can interact with the cluster:

client = sy.Synnax()

We are also going to create channel names to use in our control sequence. These names were created by the simulated DAQ when the respective channels were created.

PRESS_VALVE = "valve_command_0"
VENT_VALVE = "valve_command_1"
PRESSURE = "sensor_0"

Next, you’ll need to open up a Controller. A Controller is used to control hardware by extending a framework for writing data to channels. Controllers have methods such as get, set, and wait_until that make it easy to interact with channels in a control sequence. We recommend using a context manager to ensure that the Controller is properly closed when the script is finished.

with client.control.acquire(
    name="Pressurization Sequence",
    write_authorities=[200],
    write=[PRESS_VALVE, VENT_VALVE],
    read=[PRESSURE],
) as controller:

Write authorities determine which writer is allowed to write to a given channel. Every writer in Synnax has an authority between 0 and 255. If two writers are open on the same channel, the writer with the higher authority will be able to write but the other one will not.

Next, lets grab the start timestamp from the control sequence and set a target pressure:

start = sy.TimeStamp.now()

target_pressure = 20 # psi

We should also make sure the vent valve is closed by setting a False value for its channel on the controller:

controller[VENT_VALVE] = False

Finally, we can open the pressurization valve and wait to reach the target pressure before closing the valve. We will use the wait_until method on the controller to accomplish this. The wait_until method will wait until the first argument is met or a certain time has elapsed. If we want to wait to stop pressurizing the tank after a certain time has passed even if we haven’t reached out target pressure, we can use the timeout argument:

controller[PRESS_VALVE] = True
controller.wait_until(
    lambda c: c[PRESSURE] > target_pressure,
    timeout = 20 * sy.TimeSpan.Second,
)
controller[PRESS_VALVE] = False

Lambda functions are short, anonymous functions in Python. This lambda function can be equivalent to writing a function like this:

def if_pressure_reached(controller):
    return controller[PRESSURE] > target_pressure

and then calling it in our code like this:

controller.wait_until(if_pressure_reached, timeout = 20 * sy.TimeSpan.Second)

For more information on lambda functions, you can visit our reference page on controllers.

We can then depressurize the tank by opening the vent valve:

controller[VENT_VALVE] = True
controller.wait_until(
    lambda c: c[PRESSURE] < 5,
    timeout = 20 * sy.TimeSpan.Second,
)
controller[VENT_VALVE] = False

After this, we will create a range to represent the test. We can later use this range to easily look up this test later:

client.ranges.create(
    name="Pressurization Test",
    time_range=sy.TimeRange(start=start, end=sy.TimeStamp.now()),
)

This range is now saved and can be referenced later to view this test.

The full file can be viewed below:

control_sequence.py
import synnax as sy
import time

client = sy.Synnax(
    host="localhost",
    port=9090,
    username="synnax",
    password="seldon",
    secure=False,
)

PRESS_VALVE = "valve_command_0"
VENT_VALVE = "valve_command_1"
PRESSURE = "sensor_0"

with client.control.acquire(
    name="Pressurization Sequence",
    write_authorities=[200],
    write=[PRESS_VALVE, VENT_VALVE],
    read=[PRESSURE],
) as controller:
    start = sy.TimeStamp.now()

    target_pressure = 20 # psi

    controller[VENT_VALVE] = False

    controller[PRESS_VALVE] = True
    controller.wait_until(
        lambda c: c[PRESSURE] > target_pressure,
        timeout = 20 * sy.TimeSpan.Second,
    )
    controller[PRESS_VALVE] = False

    time.sleep(3)

    controller[VENT_VALVE] = True
    controller.wait_until(
        lambda c: c[PRESSURE] < 5,
        timeout = 20 * sy.TimeSpan.Second,
    )
    controller[VENT_VALVE] = False

    client.ranges.create(
        name="Pressurization Test",
        time_range=sy.TimeRange(start=start, end=sy.TimeStamp.now()),
    )

Running the Control Sequence

The first step to running the control sequence is to start a local Synnax cluster. After installing the server you should run the following command:

synnax start --insecure --memory

Next, you will want to run the simulated DAQ. You can do this by downloading the Python script from the Synnax GitHub repository. After downloading the script, run the following command:

python simulated_daq.py

Python can be tricky to get installed and running properly. If you are having troubles, please visit our Python troubleshooting guide.

Now its, time to open the Synnax Console to set up two visualizations of the tank. After opening the console, you can connect to the cluster using the same connection parameters in the Python script.

Now, we will setup a schematic of our tank and valves:

In the schematic, make sure there are the following components:

SymbolChannelsPurpose
ValueInput Channel: sensor_0The current pressure in the tank.
ValveState Channel: valve_response_0
Command Channel: valve_command_0
The valve used to pressurize the tank.
ValveState Channel: valve_response_1
Command Channel: valve_command_1
The valve used to vent the tank.

Once the schematic is set up, you can enter control mode to manually control the valves.

Now, you can set up a line plot to view a plot of the pressure in the tank:

Once you finished setting up the line plot, your console should look like this:

Now, it’s time to run your control sequence. You can do this by running the Python script from earlier:

python control_sequence.py

You should see this result on the console:

Conclusion

If you want to look at more examples of control sequences, visit our GitHub repository. You can find more informations about there about functions used in our Python client.