Client#

This client is configured to match the Dynamic server example.

Import#

Import the modules required for the example.

import time
from py61850.client import (
    ControlObject,
    IedConnection,
    IedConnectionState,
    ReasonForInclusion,
    Report,
)
from py61850.common import (
    ACSIClass,
    ControlModel,
    FunctionalConstraint,
    MmsValue,
    ReportOptions,
    ReportTriggerOptions,
)

Callback#

Create the callbacks for connection status changes.

def on_connection_state_change(connection: IedConnection, state: IedConnectionState):
    """Report a change of the connection state on standard output.

    Parameters
    ----------
    connection : IedConnection
        Connection handed over by the caller; not used by this callback.
    state : IedConnectionState
        New state, printed as part of the message.
    """
    message = f"Connection status is {state}"
    print(message)
def on_connection_closed(connection: IedConnection):
    """Announce on standard output that the connection has been closed.

    Parameters
    ----------
    connection : IedConnection
        Connection handed over by the caller; not used by this callback.
    """
    notice = "Connection is closed"
    print(notice)
def on_termination(obj: ControlObject):
    """Print a notice that the control action is done.

    Parameters
    ----------
    obj : ControlObject
        Control object handed over by the caller; not used by this callback.
    """
    done_message = "Action is done"
    print(done_message)

Create the callback to be triggered when a new report is received.

def on_report(rpt: Report):
    """Print the header fields and the dataset entries of a received report.

    The header fields are printed first, one per line. Then every entry of
    ``rpt.dataset_values`` is listed, except that entries flagged as
    ``ReasonForInclusion.NOT_INCLUDED`` only get a one-line notice.

    Parameters
    ----------
    rpt : Report
        Report whose fields and dataset values are printed.
    """
    print("Report received")
    print(f"    rpt.buf_ovfl: {rpt.buf_ovfl}")
    print(f"    rpt.conf_rev: {rpt.conf_rev}")
    print(f"    rpt.dataset_name: {rpt.dataset_name}")

    # The entry id is shown as space-separated hex bytes, or as-is when it is None.
    entry_id_text = rpt.entry_id
    if entry_id_text is not None:
        entry_id_text = " ".join(format(byte, "02x") for byte in entry_id_text)
    print(f"    rpt.entry_id: {entry_id_text}")

    print(f"    rpt.more_seqments_follow: {rpt.more_seqments_follow}")
    print(f"    rpt.rcb_reference: {rpt.rcb_reference}")
    print(f"    rpt.rpt_id: {rpt.rpt_id}")
    print(f"    rpt.seq_num: {rpt.seq_num}")
    print(f"    rpt.sub_seq_num: {rpt.sub_seq_num}")
    print(f"    rpt.timestamp: {rpt.timestamp}")

    print("    content of the report:")
    entries = rpt.dataset_values
    for index in range(entries.size()):
        # An entry is skipped only when the report carries inclusion reasons
        # and this entry's reason says it is not part of the report.
        skipped = (
            rpt.has_reason_for_inclusion
            and rpt.get_reason_for_inclusion(index) == ReasonForInclusion.NOT_INCLUDED
        )
        if skipped:
            print(f"        item: {index} not included")
        else:
            print(f"        item: {index}")
            if rpt.has_data_reference:
                print(f"            {rpt.get_data_reference(index)}")
            print(f"            {rpt.get_reason_for_inclusion(index)}")
            print(f"            {entries.get_element(index)}")

Connection#

Create the connection

# Address of the server to connect to.
hostname = "127.0.0.1"
port = 102

# Register both callbacks on the new connection, then open it.
ied_connection = IedConnection()
ied_connection.on_connection_state_change(on_connection_state_change)
ied_connection.on_connection_closed(on_connection_closed)
ied_connection.connect(hostname, port)

Browsing data#

To get the list of logical devices and logical nodes, you can use the following functions:

Read logical devices and logical nodes

# Print each logical device followed by its logical nodes, indented.
logical_devices = ied_connection.get_logical_devices()
for ld in logical_devices:
    print(f"LD: {ld}")
    logical_nodes = ied_connection.get_logical_nodes(ld)
    for ln in logical_nodes:
        print(f"  LN: {ln}")

To get the content of logical nodes, you can use the following functions:

Read all content of a logical node

# List every variable of the LLN0 logical node under a heading line.
print("Content of IEDNameLD0/LLN0:")
lln0_variables = ied_connection.get_logical_node_variables("IEDNameLD0/LLN0")
for obj in lln0_variables:
    print(f"  {obj}")

Read only some class of a logical node

# First the data objects of LLN0 ...
lln0_data_objects = ied_connection.get_logical_node_directory(
    "IEDNameLD0/LLN0", ACSIClass.DATA_OBJECT
)
for obj in lln0_data_objects:
    print(f"  DO: {obj}")

# ... then its datasets.
lln0_datasets = ied_connection.get_logical_node_directory(
    "IEDNameLD0/LLN0", ACSIClass.DATA_SET
)
for obj in lln0_datasets:
    print(f"  Dataset: {obj}")

To get the content of a data object or a complex data attribute, you can use the following functions:

Read data attribute of a reference

# Print the entries found below the given reference.
origin_entries = ied_connection.get_data_directory("IEDNameLD0/LLN0.Mod.Oper.origin")
for obj in origin_entries:
    print(f"  {obj}")

Read the data attributes of a reference and include the functional constraint in the result

# Print the entries returned by the "_fc" variant of the directory call for Mod.
mod_entries_with_fc = ied_connection.get_data_directory_fc("IEDNameLD0/LLN0.Mod")
for obj in mod_entries_with_fc:
    print(f"  {obj}")

Read the data attributes of a reference that have a specific functional constraint

# Print the entries of Mod, requesting only the ST functional constraint.
mod_status_entries = ied_connection.get_data_directory_by_fc(
    "IEDNameLD0/LLN0.Mod", FunctionalConstraint.ST
)
for obj in mod_status_entries:
    print(f"  {obj}")

To read and write values, you can use the following functions:

# Read the setting, write it back incremented by one, then read it again
# to show the new value.
setting_reference = "IEDNameLD0/PTOC1.StrVal.setMag.f"
val = ied_connection.read_float(setting_reference, FunctionalConstraint.SP)
print(f"Value of IEDNameLD0/PTOC1.StrVal.setMag.f: {val}")

incremented = val + 1.0
ied_connection.write_float(setting_reference, FunctionalConstraint.SP, incremented)

val = ied_connection.read_float(setting_reference, FunctionalConstraint.SP)
print(f"Value of IEDNameLD0/PTOC1.StrVal.setMag.f: {val}")

Setting group control block#

There is no specific function for the setting group control block. Use the generic read and write functions instead.

# Both values are read with the SP functional constraint.
sgcb_fc = FunctionalConstraint.SP
act_sg = ied_connection.read_uint32("IEDNameLD0/LLN0.SGCB.ActSG", sgcb_fc)
num_of_sgs = ied_connection.read_uint32("IEDNameLD0/LLN0.SGCB.NumOfSG", sgcb_fc)
setting_group_summary = f"Current setting group is {act_sg}/{num_of_sgs}"
print(setting_group_summary)

Control#

To send a control command to the server, you first need to read the control object. Then you can configure it with the following functions:

When the control mode is DIRECT_ENHANCED or SBO_ENHANCED, you can get feedback when the command has been processed.

Depending on the control mode, you can perform the operation with one or two of the following functions:

# Read the control object and send one command, following the sequence
# required by the control model it reports:
#   * DIRECT_NORMAL / DIRECT_ENHANCED: operate directly.
#   * SBO_NORMAL: select, then operate.
#   * SBO_ENHANCED: select with value, then operate.
#   * any other model: no command is sent.
ctrl = ied_connection.read_control("IEDNameLD0/LLN0.Mod")
# Termination feedback is only used with the ENHANCED control models.
ctrl.on_termination(on_termination)
control_model = ctrl.control_model
value = 1

if control_model in (ControlModel.SBO_NORMAL, ControlModel.SBO_ENHANCED):
    # Select-before-operate: the select step must succeed first.
    if control_model == ControlModel.SBO_NORMAL:
        selected = ctrl.select()
    else:
        selected = ctrl.select_with_value(MmsValue.new_int8(value))
    print("Select command succeed" if selected else "Select command fail")
    # Do not send an operate for an object that could not be selected.
    ready_to_operate = bool(selected)
else:
    ready_to_operate = control_model in (
        ControlModel.DIRECT_NORMAL,
        ControlModel.DIRECT_ENHANCED,
    )

if ready_to_operate:
    if ctrl.operate(MmsValue.new_int8(value)):
        print("Operate command succeed")
    else:
        error = ctrl.get_last_appl_error()
        print(f"Operate command fail {error.add_cause}")

Dataset#

# Read the dataset, then print its directory followed by its values.
dataset = ied_connection.read_dataset("IEDNameLD0/LLN0.DsRpt")
dataset_directory = ied_connection.get_dataset_directory(dataset.reference)
print(dataset_directory)
dataset_values = dataset.values
print(dataset_values)

Report control block#

Create a report control block and subscribe to it.

# Report control block URCBA: subscribed with the integrity trigger.
urcba = ied_connection.read_rcb("IEDNameLD0/LLN0.RP.URCBA")
urcba.on_report(on_report)
urcba.subscribe(trg_ops=ReportTriggerOptions.INTEGRITY, intg_pd=5000)

# Report control block BRCBA: subscribed with the data-change trigger only.
# OR in ReportTriggerOptions.INTEGRITY to enable the integrity trigger as well.
brcba = ied_connection.read_rcb("IEDNameLD0/LLN0.BR.BRCBA")
brcba.on_report(on_report)
brcba_opt_flds = (
    ReportOptions.ENTRY_ID
    | ReportOptions.TIME_STAMP
    | ReportOptions.REASON_FOR_INCLUSION
)
brcba.subscribe(
    trg_ops=ReportTriggerOptions.DATA_CHANGED,
    intg_pd=5000,
    opt_flds=brcba_opt_flds,
)
print(brcba.dataset_reference)

Files#

To manage files on the IED server, you can use the following functions:

# Show the path of the first listed file only (nothing if the list is empty).
files = ied_connection.get_files()
_no_file = object()  # sentinel, so an empty listing is told apart from any entry
first_file = next(iter(files), _no_file)
if first_file is not _no_file:
    print(first_file.filepath)

# Download one file and print its content decoded as UTF-8 text.
content = ied_connection.download_file(b"dummy.txt")
text = content.decode("utf-8")
print(text)

Just wait, or run an infinite loop, to receive reports.

# Keep the script running for 20 seconds so that reports can arrive.
# The connection is closed in a finally clause so that it is also closed
# when the wait is interrupted (for example with Ctrl-C).
try:
    time.sleep(20)
finally:
    ied_connection.close()

This notebook can be downloaded as client_simple.py