Asynchronous client extended calls exampleΒΆ

#!/usr/bin/env python3
"""Pymodbus Synchronous Client extended calls rxample.

This example uses client_sync.py to handle connection, and have the same options.

The example shows how to use the synchronous modbus client
implementation from pymodbus to perform the extended portions of the
modbus protocol.

If you are performing a request that is not available in the client
mixin, you have to perform the request like this instead:

from pymodbus.diag_message import ClearCountersRequest
from pymodbus.diag_message import ClearCountersResponse

request  = ClearCountersRequest()
response = client.execute(request)
if isinstance(response, ClearCountersResponse):
    ... do something with the response


The corresponding server must be started before e.g. as:
    python3 server_sync.py
"""
import asyncio
import logging

from examples.client_async import run_async_client, setup_async_client

from pymodbus.diag_message import (
    ChangeAsciiInputDelimiterRequest,
    ClearCountersRequest,
    ClearOverrunCountRequest,
    ForceListenOnlyModeRequest,
    GetClearModbusPlusRequest,
    RestartCommunicationsOptionRequest,
    ReturnBusCommunicationErrorCountRequest,
    ReturnBusExceptionErrorCountRequest,
    ReturnDiagnosticRegisterRequest,
    ReturnIopOverrunCountRequest,
    ReturnQueryDataRequest,
    ReturnSlaveBusCharacterOverrunCountRequest,
    ReturnSlaveBusyCountRequest,
    ReturnSlaveMessageCountRequest,
    ReturnSlaveNAKCountRequest,
    ReturnSlaveNoResponseCountRequest,
)
from pymodbus.mei_message import ReadDeviceInformationRequest
from pymodbus.other_message import (
    GetCommEventCounterRequest,
    GetCommEventLogRequest,
    ReadExceptionStatusRequest,
    ReportSlaveIdRequest,
)


UNIT = 0x01


async def _execute_information_requests(client):
    """Execute extended information requests."""
    _logger.info("### Running ReadDeviceInformationRequest")
    rr = await client.execute(ReadDeviceInformationRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReportSlaveIdRequest")
    rr = await client.execute(ReportSlaveIdRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK
    assert rr.status  # test that the status is ok

    _logger.info("Running ReadExceptionStatusRequest")
    rr = await client.execute(ReadExceptionStatusRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK
    assert not rr.status  # test the status code

    _logger.info("Running GetCommEventCounterRequest")
    rr = await client.execute(GetCommEventCounterRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK
    assert rr.status  # test the status code
    assert not rr.count  # test the count returned

    _logger.info("Running GetCommEventLogRequest")
    rr = await client.execute(GetCommEventLogRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK
    assert rr.status  # test the status code
    assert not rr.event_count  # test the number of events
    assert not rr.message_count  # test the number of messages
    assert not rr.events  # test the number of events


async def _execute_diagnostic_requests(client):
    """Execute extended diagnostic requests."""
    _logger.info("### Running ReturnQueryDataRequest")
    rr = await client.execute(ReturnQueryDataRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK
    assert not rr.message[0]  # test the resulting message

    _logger.info("Running RestartCommunicationsOptionRequest")
    rr = await client.execute(RestartCommunicationsOptionRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK
    assert not rr.message[0]  # test the resulting message

    _logger.info("Running ReturnDiagnosticRegisterRequest")
    rr = await client.execute(ReturnDiagnosticRegisterRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ChangeAsciiInputDelimiterRequest")
    rr = await client.execute(ChangeAsciiInputDelimiterRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ForceListenOnlyModeRequest")
    _logger.info("NOT WORKING")
    _logger.info(str(ForceListenOnlyModeRequest))
    # rr = await client.execute(
    #     ForceListenOnlyModeRequest(unit=UNIT)
    # )  # does not send a response
    # assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ClearCountersRequest")
    rr = await client.execute(ClearCountersRequest())
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReturnBusCommunicationErrorCountRequest")
    rr = await client.execute(ReturnBusCommunicationErrorCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReturnBusExceptionErrorCountRequest")
    rr = await client.execute(ReturnBusExceptionErrorCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReturnSlaveMessageCountRequest")
    rr = await client.execute(ReturnSlaveMessageCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReturnSlaveNoResponseCountRequest")
    rr = await client.execute(ReturnSlaveNoResponseCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReturnSlaveNAKCountRequest")
    rr = await client.execute(ReturnSlaveNAKCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReturnSlaveBusyCountRequest")
    rr = await client.execute(ReturnSlaveBusyCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReturnSlaveBusCharacterOverrunCountRequest")
    rr = await client.execute(ReturnSlaveBusCharacterOverrunCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ReturnIopOverrunCountRequest")
    rr = await client.execute(ReturnIopOverrunCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running ClearOverrunCountRequest")
    rr = await client.execute(ClearOverrunCountRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK

    _logger.info("Running GetClearModbusPlusRequest")
    rr = await client.execute(GetClearModbusPlusRequest(unit=UNIT))
    assert rr and not rr.isError()  # test that calls was OK


async def run_async_ext_calls(client):
    """Demonstrate basic read/write calls."""
    await _execute_information_requests(client)
    await _execute_diagnostic_requests(client)

# --------------------------------------------------------------------------- #
# Extra code, to allow commandline parameters instead of changing the code
# --------------------------------------------------------------------------- #
FORMAT = "%(asctime)-15s %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s"
logging.basicConfig(format=FORMAT)
_logger = logging.getLogger()


if __name__ == "__main__":
    testclient = setup_async_client()
    asyncio.run(run_async_client(testclient, modbus_calls=run_async_ext_calls))