Modbus Mapper ExampleΒΆ

# pylint: disable=missing-type-doc
r"""This is used to generate decoder blocks.

so that non-programmers can define the
register values and then decode a modbus device all
without having to write a line of code for decoding.

Currently supported formats are:

* csv
* json
* xml

Here is an example of generating and using a mapping decoder
(note that this is still in the works and will be greatly
simplified in the final api; it is just an example of the
requested functionality)::

    CSV:
    address,type,size,name,function
    0,int16,1,Comm. count PLC,hr
    1,int16,1,Comm. count PLC,hr
    2,int16,1,Comm. count PLC,hr
    3,int16,1,Comm. count PLC,hr
    4,int16,1,Comm. count PLC,hr
    5,int16,1,Comm. count PLC,hr
    6,int16,1,Comm. count PLC,hr
    7,int16,1,Comm. count PLC,hr
    8,int16,1,Comm. count PLC,hr
    9,int16,1,Comm. count PLC,hr
    10,int32,2,Comm. count PLC,hr
    12,int32,2,Comm. count PLC,hr

    from modbus_mapper import csv_mapping_parser
    from modbus_mapper import mapping_decoder
    from pymodbus.client import ModbusTcpClient
    from pymodbus.payload import BinaryPayloadDecoder
    from pymodbus.constants import Endian

    from pprint import pprint
    import logging

    # FORMAT = "%(asctime)-15s %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s"
    # logging.basicConfig(format=FORMAT)
    # _logger = logging.getLogger()
    # _logger.setLevel(logging.DEBUG)

    template = ["address", "type", "size", "name", "function"]
    raw_mapping = csv_mapping_parser("simple_mapping_client.csv", template)
    # raw_mapping = csv_mapping_parser("Naust_Comm_to_scr_client.csv", template)
    mapping = mapping_decoder(raw_mapping)

    client = ModbusTcpClient(host="localhost", port=5020)

    response = client.read_holding_registers(address=int(0), count=14)
    decoder = BinaryPayloadDecoder.fromRegisters(
        response.registers, byteorder=Endian.Big, wordorder=Endian.Little
    )

    for block in mapping.items():
        for mac in block:
            if type(mac) == dict:
                # response = client.read_holding_registers(
                #     address=int(mac["address"]), count=mac["size"]
                # )
                # decoder = BinaryPayloadDecoder.fromRegisters(
                #     response.registers, byteorder=Endian.Big, wordorder=Endian.Little
                # )
                print("[{}]\t{}".format(mac["address"], mac["type"]()(decoder)))
                # decoder._payload # remove mac["size"] bytes from beginning

Also, using the same input mapping parsers, we can generate
populated slave contexts that can be run behind a modbus server::

    CSV:
    address,value,function,name,description
    0,0,hr,Comm. count PLC,Comm. count PLC
    1,10,hr,Comm. count PLC,Comm. count PLC
    2,20,hr,Comm. count PLC,Comm. count PLC
    3,30,hr,Comm. count PLC,Comm. count PLC
    4,40,hr,Comm. count PLC,Comm. count PLC
    5,50,hr,Comm. count PLC,Comm. count PLC
    6,60,hr,Comm. count PLC,Comm. count PLC
    7,70,hr,Comm. count PLC,Comm. count PLC
    8,80,hr,Comm. count PLC,Comm. count PLC
    9,90,hr,Comm. count PLC,Comm. count PLC
    10,100,hr,Comm. count PLC,Comm. count PLC
    11,0,hr,Comm. count PLC,Comm. count PLC
    12,120,hr,Comm. count PLC,Comm. count PLC
    13,0,hr,Comm. count PLC,Comm. count PLC

    from modbus_mapper import csv_mapping_parser
    from modbus_mapper import modbus_context_decoder

    from pymodbus.server import StartTcpServer
    from pymodbus.datastore.context import ModbusServerContext
    from pymodbus.device import ModbusDeviceIdentification
    from pymodbus.version import version


    from pprint import pprint
    import logging

    FORMAT = "%(asctime)-15s %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s"
    logging.basicConfig(format=FORMAT)
    _logger = logging.getLogger()
    _logger.setLevel(logging.DEBUG)

    template = ["address", "value", "function", "name", "description"]
    raw_mapping = csv_mapping_parser("simple_mapping_server.csv", template)

    slave_context = modbus_context_decoder(raw_mapping)
    context = ModbusServerContext(slaves=slave_context, single=True)
    identity = ModbusDeviceIdentification(
        info_name={
            "VendorName": "Pymodbus",
            "ProductCode": "PM",
            "VendorUrl": "https://github.com/riptideio/pymodbus/",
            "ProductName": "Pymodbus Server",
            "ModelName": "Pymodbus Server",
            "MajorMinorRevision": version.short(),
        }
    )
    StartTcpServer(context=context, identity=identity, address=("localhost", 5020))

"""
from collections import defaultdict
import csv
from io import StringIO
import json
from tokenize import generate_tokens

from pymodbus.datastore import (
    ModbusSlaveContext,
    ModbusSparseDataBlock,
)

# --------------------------------------------------------------------------- #
# raw mapping input parsers
# --------------------------------------------------------------------------- #
# These generate the raw mapping_blocks from some form of input
# which can then be passed to the decoder in question to supply
# the requested output result.
# --------------------------------------------------------------------------- #


def csv_mapping_parser(path, template):
    """Given a csv file of the the mapping data for a modbus device,

    return a mapping layout that can be used to decode an new block.

    .. note:: For the template, a few values are required
    to be defined: address, size, function, and type. All the remaining
    values will be stored, but not formatted by the application.
    So for example::

        template = ["address", "type", "size", "name", "function"]
        mappings = json_mapping_parser("mapping.json", template)

    :param path: The path to the csv input file
    :param template: The row value template
    :returns: The decoded csv dictionary
    """
    mapping_blocks = defaultdict(dict)
    with open(path, "r") as handle:  # pylint: disable=unspecified-encoding
        reader = csv.reader(handle)
        next(reader)  # skip the csv header
        for row in reader:
            mapping = dict(zip(template, row))
            # mapping.pop("function")
            aid = mapping["address"]
            mapping_blocks[aid] = mapping
    return mapping_blocks


def json_mapping_parser(path, template):
    """Given a json file of the the mapping data for a modbus device,

    return a mapping layout that can
    be used to decode an new block.

    .. note:: For the template, a few values are required
    to be mapped: address, size, and type. All the remaining
    values will be stored, but not formatted by the application.
    So for example::

        template = {
            "Start": "address",
            "DataType": "type",
            "Length": "size"
            # the remaining keys will just pass through
        }
        mappings = json_mapping_parser("mapping.json", template)

    :param path: The path to the csv input file
    :param template: The row value template
    :returns: The decoded csv dictionary
    """
    mapping_blocks = {}
    with open(path, "r") as handle:  # pylint: disable=unspecified-encoding
        for tid, rows in json.load(handle).iteritems():
            mappings = {}
            for key, values in rows.iteritems():
                mapping = {template.get(k, k): v for k, v in values.iteritems()}
                mappings[int(key)] = mapping
            mapping_blocks[tid] = mappings
    return mapping_blocks


def xml_mapping_parser():
    """Given an xml file of the the mapping data for a modbus device,

    return a mapping layout that can be used to decode an new block.

    :returns: The decoded csv dictionary
    """


# --------------------------------------------------------------------------- #
# modbus context decoders
# --------------------------------------------------------------------------- #
# These are used to decode a raw mapping_block into a slave context with
# populated function data blocks.
# --------------------------------------------------------------------------- #
def modbus_context_decoder(mapping_blocks):
    """Generate a backing slave context with initialized data blocks.

    .. note:: This expects the following for each block:
    address, value, and function where function is one of
    di (discretes), co (coils), hr (holding registers), or
    ir (input registers).

    :param mapping_blocks: The mapping blocks
    :returns: The initialized modbus slave context
    """
    sparse = ModbusSparseDataBlock()
    sparse.create()
    for block in mapping_blocks.items():
        for mapping in block:
            if type(mapping) == dict:
                value = mapping["value"]
                address = mapping["address"]
                sparse.setValues(address=int(address), values=int(value))
    return ModbusSlaveContext(
        di=sparse, co=sparse, hr=sparse, ir=sparse, zero_mode=True
    )


# --------------------------------------------------------------------------- #
# modbus mapping decoder
# --------------------------------------------------------------------------- #
# These are used to decode a raw mapping_block into a request decoder.
# So this allows one to simply grab a number of registers, and then
# pass them to this decoder which will do the rest.
# --------------------------------------------------------------------------- #
class ModbusTypeDecoder:
    """This is a utility to determine the correct decoder to use given a type name.

    By default this supports all the types available in the default modbus
    decoder, however this can easily be extended this class
    and adding new types to the mapper::

        class CustomTypeDecoder(ModbusTypeDecoder):
            def __init__(self):
                ModbusTypeDecode.__init__(self)
                self.mapper["type-token"] = self.callback

            def parse_my_bitfield(self, tokens):
                return lambda d: d.decode_my_type()

    """

    def __init__(self):
        """Initialize a new instance of the decoder"""
        self.default = lambda m: self.parse_16bit_uint
        self.parsers = {
            "uint": self.parse_16bit_uint,
            "uint8": self.parse_8bit_uint,
            "uint16": self.parse_16bit_uint,
            "uint32": self.parse_32bit_uint,
            "uint64": self.parse_64bit_uint,
            "int": self.parse_16bit_int,
            "int8": self.parse_8bit_int,
            "int16": self.parse_16bit_int,
            "int32": self.parse_32bit_int,
            "int64": self.parse_64bit_int,
            "float": self.parse_32bit_float,
            "float32": self.parse_32bit_float,
            "float64": self.parse_64bit_float,
            "string": self.parse_32bit_int,
            "bits": self.parse_bits,
        }

    # ------------------------------------------------------------ #
    # Type parsers
    # ------------------------------------------------------------ #
    @staticmethod
    def parse_string(tokens):
        """Parse value."""
        _ = next(tokens)
        size = int(next(tokens))
        return lambda d: d.decode_string(size=size)

    @staticmethod
    def parse_bits():
        """Parse value."""
        return lambda d: d.decode_bits()

    @staticmethod
    def parse_8bit_uint():
        """Parse value."""
        return lambda d: d.decode_8bit_uint()

    @staticmethod
    def parse_16bit_uint():
        """Parse value."""
        return lambda d: d.decode_16bit_uint()

    @staticmethod
    def parse_32bit_uint():
        """Parse value."""
        return lambda d: d.decode_32bit_uint()

    @staticmethod
    def parse_64bit_uint():
        """Parse value."""
        return lambda d: d.decode_64bit_uint()

    @staticmethod
    def parse_8bit_int():
        """Parse value."""
        return lambda d: d.decode_8bit_int()

    @staticmethod
    def parse_16bit_int():
        """Parse value."""
        return lambda d: d.decode_16bit_int()

    @staticmethod
    def parse_32bit_int():
        """Parse value."""
        return lambda d: d.decode_32bit_int()

    @staticmethod
    def parse_64bit_int():
        """Parse value."""
        return lambda d: d.decode_64bit_int()

    @staticmethod
    def parse_32bit_float():
        """Parse value."""
        return lambda d: d.decode_32bit_float()

    @staticmethod
    def parse_64bit_float():
        """Parse value."""
        return lambda d: d.decode_64bit_float()

    # ------------------------------------------------------------
    # Public Interface
    # ------------------------------------------------------------
    def tokenize(self, value):
        """Return the tokens

        :param value: The value to tokenize
        :returns: A token generator
        """
        tokens = generate_tokens(StringIO(value).readline)
        for _, tokval, _, _, _ in tokens:
            yield tokval

    def parse(self, value):
        """Return a function that supplied with a decoder,

        will decode the correct value.

        :param value: The type of value to parse
        :returns: The decoder method to use
        """
        tokens = self.tokenize(value)
        token = next(tokens).lower()  # pylint: disable=no-member
        parser = self.parsers.get(token, self.default)
        return parser


def mapping_decoder(mapping_blocks, decoder=None):
    """Convert them into modbus value decoder map.

    :param mapping_blocks: The mapping blocks
    :param decoder: The type decoder to use
    """
    decoder = decoder or ModbusTypeDecoder()
    map = defaultdict(dict)
    for block in mapping_blocks.items():
        for mapping in block:
            if type(mapping) == dict:
                mapping["address"] = mapping["address"]
                mapping["size"] = mapping["size"]
                mapping["type"] = decoder.parse(mapping["type"])
                map[mapping["address"]] = mapping
    return map