Skip to main content

3 posts tagged with "modbus"

View All Tags

Анализ сигналов Modbus RS485 на анализаторе

· One min read
dmn
maintainer

Анализ сигналов Modbus RS485

Покажем передачу modbus пакетов, отображенную на анализаторе цифровых сигналов.

Оборудование

  • Хост: Napi-C с программным RTS
  • Датчик: учебный Modbus Napi-датчик

Конфигурация каналов

  • Канал 1: RTS хоста
  • Канал 2: RX хоста
  • Канал 3: TX хоста
  • Канал 4: RX датчика
  • Канал 5: TX датчика

Последовательность обмена

  1. Хост поднимает сигнал RTS (передача)
  2. Посылает запрос по линии TX
  3. Датчик принимает запрос через RX
  4. Датчик передает ответ через TX
  5. Хост принимает ответ через RX

#rs485 #rts

Конфигурация UART на модуле CM4 в Токосборщике

· One min read
dmn
maintainer

В Токосборщике на модуле CM4 UART-ы расположились следующим образом:

✔️UART3 - внешний датчик Modbus ✔️UART9 - модуль расширений (Zigbee) ✔️UART7 - встроенный датчик тока

Для корректной работе Debian, необходимо подключить оверлеи с uart7,9 в файле /boot/orangepiEnv.txt

root@orangepicm4:~# cat /boot/orangepiEnv.txt 
verbosity=1
bootlogo=false
extraargs=cma=128M
overlay_prefix=rk356x
overlays=uart7-m2 uart9-m2
rootdev=UUID=a0f8ca89-7eb7-4a1e-947a-2341637b4782
rootfstype=ext4
console=serial

#fcucm4 #orangecm4 #fcu

Python-сниффер для анализа Modbus RTU трафика

· 3 min read
dmn
maintainer

Написал "на коленке" полезный сниффер modbus

modbus_sniffer_raw_pretty.py

#!/usr/bin/env python3
"""
Raw Modbus RTU Sniffer — listens to a serial port and prints decoded Modbus RTU frames.
Now includes decoding of address/count/value for popular function codes.
"""

import serial
import argparse
import time
import logging
import struct

logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)

def calculate_crc(data: bytes):
crc = 0xFFFF
for pos in data:
crc ^= pos
for _ in range(8):
lsb = crc & 0x0001
crc >>= 1
if lsb:
crc ^= 0xA001
return crc.to_bytes(2, 'little')

def validate_crc(frame: bytes):
if len(frame) < 4:
return False
data, received_crc = frame[:-2], frame[-2:]
return calculate_crc(data) == received_crc

def decode_payload(fc, payload):
if fc in [1, 2, 3, 4]: # Read coils, discrete inputs, HR, IR
if len(payload) >= 4:
address, count = struct.unpack(">HH", payload[:4])
return f"Read | Addr={address} | Count={count}"
elif fc in [5, 6]: # Write single coil/register
if len(payload) >= 4:
address, value = struct.unpack(">HH", payload[:4])
return f"Write Single | Addr={address} | Value={value}"
elif fc in [15, 16]: # Write multiple coils/registers
if len(payload) >= 5:
address, count, byte_count = struct.unpack(">HHB", payload[:5])
return f"Write Multiple | Addr={address} | Count={count} | Bytes={byte_count}"
return f"Payload: {payload.hex()}"

def print_frame_info(frame: bytes):
if not validate_crc(frame):
log.warning(f"❌ Invalid CRC: {frame.hex()}")
return
unit_id = frame[0]
function_code = frame[1]
payload = frame[2:-2]
desc = decode_payload(function_code, payload)
log.info(f"📥 Unit={unit_id} | FC={function_code}{desc}")

def read_frames(ser):
buffer = bytearray()
last_byte_time = time.time()
inter_char_timeout = 0.01
frame_timeout = 0.1

while True:
if ser.in_waiting:
byte = ser.read(1)
now = time.time()
if now - last_byte_time > frame_timeout and buffer:
print_frame_info(bytes(buffer))
buffer.clear()
buffer.append(byte[0])
last_byte_time = now
elif buffer and time.time() - last_byte_time > frame_timeout:
print_frame_info(bytes(buffer))
buffer.clear()
else:
time.sleep(0.005)

def main():
parser = argparse.ArgumentParser(description="Raw Modbus RTU Sniffer with decoding")
parser.add_argument('--port', required=True, help='Serial port (e.g. /dev/ttyUSB0)')
parser.add_argument('--baudrate', type=int, default=9600)
parser.add_argument('--parity', choices=['N', 'E', 'O'], default='N')
parser.add_argument('--stopbits', type=int, choices=[1, 2], default=1)
args = parser.parse_args()

try:
ser = serial.Serial(
port=args.port,
baudrate=args.baudrate,
parity={'N': serial.PARITY_NONE, 'E': serial.PARITY_EVEN, 'O': serial.PARITY_ODD}[args.parity],
stopbits=args.stopbits,
bytesize=8,
timeout=0
)
log.info(f"🔍 Listening on {args.port} at {args.baudrate} bps...")
read_frames(ser)
except Exception as e:
log.error(f"Failed to open serial port: {e}")

if __name__ == "__main__":
main()

использует библиотеку pyserial

pip install pyserial

Пример:

(venv) orangepi@cm4-right:~$ python3 modbus_sniffer_raw_pretty.py  --baudrate 9600 --port /dev/ttyS9
INFO:root:🔍 Listening on /dev/ttyS9 at 9600 bps...
INFO:root:📥 Unit=1 | FC=3 — Read | Addr=0 | Count=1
INFO:root:📥 Unit=1 | FC=3 — Read | Addr=10 | Count=2

Скрипт читает и распознает пакеты modbus, запросы modbus транслирует на экран.

#modbus #modbussniffer