Dynamic server#
Import#
Import required module for the example.
import os
import time
from py61850.common import CdcControlModelOptions, MmsDataAccessError, MmsValue
from py61850.server import (
CheckHandlerResult,
ClientConnection,
ControlAction,
ControlHandlerResult,
DataAttribute,
DataObject,
IedModel,
IedServer,
SelectStateChangedReason,
SettingGroupControlBlock,
)
Callback#
def on_connection_change(client: ClientConnection, connected: bool):
"""Callback used when the connection status to the serve change.
Parameters
----------
client : ClientConnection
Client at the origin of the change
connected : bool
Indicate whether the new state is connected or disconnected
"""
if connected:
print(f"Client {client.local_address()} is connected to {client.peer_address()}.")
else:
print(f"Client {client.local_address()} is disconnected.")
def on_active_setting_group_change(
sgcb: SettingGroupControlBlock,
client: ClientConnection,
val: int,
) -> bool:
"""Callback when the active setting group is changed
Parameters
----------
sgcb : SettingGroupControlBlock
Setting group control block where the setting group is changed
client : ClientConnection
Client which trigger the change
val : int
New setting group
Returns
-------
bool
True to indicate the server can perform the chnage False to reject
the change.
"""
print(
f"Client {client.local_address()} change active setting group from {sgcb.act_sg} to {val}."
)
return True
def on_edit_setting_group_change(
sgcb: SettingGroupControlBlock,
client: ClientConnection,
val: int,
) -> bool:
"""Callback when the edit setting group is changed
Parameters
----------
sgcb : SettingGroupControlBlock
Setting group control block where the setting group is changed
client : ClientConnection
Client which trigger the change
val : int
New setting group
Returns
-------
bool
True to indicate the server can perform the chnage False to reject
the change.
"""
print(
f"Client {client.local_address()} change edit setting group from {sgcb.edit_sg} to {val}."
)
return True
def on_edit_setting_group_confirmed(
sgcb: SettingGroupControlBlock,
val: int,
):
"""Callback when the client trigger the CnfEdit of the setting group control block
Parameters
----------
sgcb : SettingGroupControlBlock
Setting group control block where the edit setting group is confirmed
val : int
Value of the edit setting group
Returns
-------
_type_
_description_
"""
print(f"CnfEdit has been changed and confirmed edit group is {val}")
def on_static(
data_object: DataObject,
action: ControlAction,
ctl_value: MmsValue,
test: bool,
interlockCheck: bool,
) -> CheckHandlerResult:
# Call by select and operate
print("on_static")
return CheckHandlerResult.ACCEPTED
def on_dynamic(
data_object: DataObject,
action: ControlAction,
ctl_value: MmsValue,
test: bool,
sychroCheck: bool,
) -> ControlHandlerResult:
# called only by operate
# optional callback
print("on_dynamic")
return ControlHandlerResult.OK
def on_control_select_state_change(
data_object: DataObject,
action: ControlAction,
test: bool,
reason: SelectStateChangedReason,
):
print(f"Select state changed for {data_object.name} due to {reason}")
print(f" client: {action.get_client_connection().local_address()}")
print(f" Interlock flag is {'on' if action.get_interlock_check() else 'off'}")
print(f" Synchrocheck flag is {'on' if action.get_synchro_check() else 'off'}")
print(f" test flag is {'on' if test else 'off'}")
print(f" Command received at {action.getT().get_time()}")
print(f" Originator identifier is {action.get_originator_identifier()}")
print(f" Originator category is {action.get_originator_category()}")
print(action.get_control_time()) # Not working ?
def on_control_operate(
data_object: DataObject,
action: ControlAction,
ctl_value: MmsValue,
test: bool,
) -> ControlHandlerResult:
print(f"Received control command for {data_object.name}")
print(f" New value is {ctl_value.get_value()}")
print(f" client: {action.get_client_connection().local_address()}")
print(f" Interlock flag is {'on' if action.get_interlock_check() else 'off'}")
print(f" Synchrocheck flag is {'on' if action.get_synchro_check() else 'off'}")
print(f" test flag is {'on' if test else 'off'}")
print(f" Command received at {action.getT().get_time()}")
print(f" Originator identifier is {action.get_originator_identifier()}")
print(f" Originator category is {action.get_originator_category()}")
# print(action.get_control_time()) # Not working ?
if data_object.name == b"Mod":
ied_server.update_utc_time(data_object.child("t"))
ied_server.update_int32(data_object.child("stVal"), ctl_value.get_value())
return ControlHandlerResult.OK
def on_write(
client: ClientConnection,
data_attribute: DataAttribute,
value: MmsValue,
) -> MmsDataAccessError:
print(f"Received write command for {data_attribute.name}")
print(f" client: {client.local_address()}")
return MmsDataAccessError.SUCCESS
Model creation#
Create a Iedmodel by using the create_logical_device, create_logical_node… functions First create the model itself
ied_model = IedModel("IEDName")
Add logical device to the model
ld0 = ied_model.create_logical_device("LD0")
Add some logical nodes in logical devices
ld0_lln0 = ld0.create_logical_node("LLN0")
ld0_lphd1 = ld0.create_logical_node("LPHD1")
ld0_ptoc1 = ld0.create_logical_node("PTOC1")
Add some data object in logical nodes
ld0_lln0_mod = ld0_lln0.create_cdc_inc(
"Mod",
control_options=CdcControlModelOptions.MODEL_SBO_ENHANCED,
)
ld0_lln0_beh = ld0_lln0.create_cdc_ins("Beh")
ld0_lln0_health = ld0_lln0.create_cdc_ins("Health")
ld0_ptoc1_beh = ld0_ptoc1.create_cdc_ens("Beh")
ld0_ptoc1_str = ld0_ptoc1.create_cdc_acd("Str")
ld0_ptoc1_op = ld0_ptoc1.create_cdc_act("Op")
ld0_ptoc1_strval = ld0_ptoc1.create_cdc_asg("StrVal", False)
ld0_ptoc1_ppdltmms = ld0_ptoc1.create_cdc_ing("OpDlTmms")
Setting group control block#
Create a setting group control block in the LLN0 logcial node
ld0_sgcb = ld0_lln0.create_setting_group_control_block(1, 4)
Dataset#
Create dataset
dataset = ld0_lln0.create_dataset("DsRpt")
dataset.create_dataset_entry("PTOC1$ST$Str")
dataset.create_dataset_entry("PTOC1$ST$Op")
Report control block#
Create report control block
urcba = ld0_lln0.create_report_control_block("URCBA", "URCBA01", False, dataset.name)
brcba = ld0_lln0.create_report_control_block("BRCBA", "BRCBA01", True, dataset.name)
Retrieve attribute for further operation
ld0_lln0_mod_t: DataAttribute = ld0_lln0_mod.child("t") # type:ignore
ld0_ptoc1_str_t: DataAttribute = ld0_ptoc1_str.child("t") # type:ignore
ld0_ptoc1_str_general: DataAttribute = ld0_ptoc1_str.child("general") # type:ignore
ld0_ptoc1_ppdltmms_setval: DataAttribute = ld0_ptoc1_ppdltmms.child("setVal") # type:ignore
Initialise some value before creating the server
ld0_lln0_mod_t.init_value(MmsValue.new_utc_time())
Create the server from the model
ied_server = IedServer(ied_model)
Register callback for connection status change (connection/disconnection)
ied_server.register_connection_change(on_connection_change)
Register callback for setting group
ied_server.register_active_setting_group_change_handler(ld0_sgcb, on_active_setting_group_change)
ied_server.register_edit_setting_group_change_handler(ld0_sgcb, on_edit_setting_group_change)
ied_server.register_edit_setting_group_confirmed_handler(ld0_sgcb, on_edit_setting_group_confirmed)
Register control callback
ied_server.register_control_handler(ld0_lln0_mod, on_control_operate) # Mandatory
ied_server.register_control_select_state_handler(ld0_lln0_mod, on_control_select_state_change)
ied_server.register_control_static_check_handler(ld0_lln0_mod, on_static) # Optional
ied_server.register_control_wait_handler(ld0_lln0_mod, on_dynamic) # Optional
Register write handler for a specific data attribute
ied_server.register_write_handler(ld0_ptoc1_ppdltmms_setval, on_write)
# Set the base path for the file service
current_file_directory = os.path.dirname(os.path.realpath(__file__))
ied_server.set_filestore_basepath(os.path.join(current_file_directory, "file-store", ""))
Start the server
ied_server.start(102)
if ied_server.is_running:
print("Server is running")
else:
raise RuntimeError("Server is not running")
While loop to never terminate
while True:
time.sleep(10)
ied_server.lock_data_model()
ied_server.update_utc_time(ld0_ptoc1_str_t)
ied_server.update_boolean(
ld0_ptoc1_str_general, not ied_server.get_boolean(ld0_ptoc1_str_general)
)
ied_server.unlock_data_model()
This notebook can be downloaded as server_dynamic.py