Перейти к основному содержимому

Python-сниффер для анализа Modbus RTU трафика

· 3 мин. чтения
dmn
maintainer

Написал "на коленке" полезный сниффер modbus

modbus_sniffer_raw_pretty.py

#!/usr/bin/env python3
"""
Raw Modbus RTU Sniffer — listens to a serial port and prints decoded Modbus RTU frames.
Now includes decoding of address/count/value for popular function codes.
"""

import serial
import argparse
import time
import logging
import struct

logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)

def calculate_crc(data: bytes):
crc = 0xFFFF
for pos in data:
crc ^= pos
for _ in range(8):
lsb = crc & 0x0001
crc >>= 1
if lsb:
crc ^= 0xA001
return crc.to_bytes(2, 'little')

def validate_crc(frame: bytes):
if len(frame) < 4:
return False
data, received_crc = frame[:-2], frame[-2:]
return calculate_crc(data) == received_crc

def decode_payload(fc, payload):
if fc in [1, 2, 3, 4]: # Read coils, discrete inputs, HR, IR
if len(payload) >= 4:
address, count = struct.unpack(">HH", payload[:4])
return f"Read | Addr={address} | Count={count}"
elif fc in [5, 6]: # Write single coil/register
if len(payload) >= 4:
address, value = struct.unpack(">HH", payload[:4])
return f"Write Single | Addr={address} | Value={value}"
elif fc in [15, 16]: # Write multiple coils/registers
if len(payload) >= 5:
address, count, byte_count = struct.unpack(">HHB", payload[:5])
return f"Write Multiple | Addr={address} | Count={count} | Bytes={byte_count}"
return f"Payload: {payload.hex()}"

def print_frame_info(frame: bytes):
if not validate_crc(frame):
log.warning(f"❌ Invalid CRC: {frame.hex()}")
return
unit_id = frame[0]
function_code = frame[1]
payload = frame[2:-2]
desc = decode_payload(function_code, payload)
log.info(f"📥 Unit={unit_id} | FC={function_code}{desc}")

def read_frames(ser):
buffer = bytearray()
last_byte_time = time.time()
inter_char_timeout = 0.01
frame_timeout = 0.1

while True:
if ser.in_waiting:
byte = ser.read(1)
now = time.time()
if now - last_byte_time > frame_timeout and buffer:
print_frame_info(bytes(buffer))
buffer.clear()
buffer.append(byte[0])
last_byte_time = now
elif buffer and time.time() - last_byte_time > frame_timeout:
print_frame_info(bytes(buffer))
buffer.clear()
else:
time.sleep(0.005)

def main():
parser = argparse.ArgumentParser(description="Raw Modbus RTU Sniffer with decoding")
parser.add_argument('--port', required=True, help='Serial port (e.g. /dev/ttyUSB0)')
parser.add_argument('--baudrate', type=int, default=9600)
parser.add_argument('--parity', choices=['N', 'E', 'O'], default='N')
parser.add_argument('--stopbits', type=int, choices=[1, 2], default=1)
args = parser.parse_args()

try:
ser = serial.Serial(
port=args.port,
baudrate=args.baudrate,
parity={'N': serial.PARITY_NONE, 'E': serial.PARITY_EVEN, 'O': serial.PARITY_ODD}[args.parity],
stopbits=args.stopbits,
bytesize=8,
timeout=0
)
log.info(f"🔍 Listening on {args.port} at {args.baudrate} bps...")
read_frames(ser)
except Exception as e:
log.error(f"Failed to open serial port: {e}")

if __name__ == "__main__":
main()

использует библиотеку pyserial

pip install pyserial

Пример:

(venv) orangepi@cm4-right:~$ python3 modbus_sniffer_raw_pretty.py  --baudrate 9600 --port /dev/ttyS9
INFO:root:🔍 Listening on /dev/ttyS9 at 9600 bps...
INFO:root:📥 Unit=1 | FC=3 — Read | Addr=0 | Count=1
INFO:root:📥 Unit=1 | FC=3 — Read | Addr=10 | Count=2

Скрипт читает и распознает пакеты modbus, запросы modbus транслирует на экран.

#modbus #modbussniffer

Адресация GPIO в gpiod на примере GPIO2_B4

· 1 мин. чтения
dmn
maintainer

Как адресуются gpio в gpiod на примере GPIO2_B4. Есть банки 0-4 - это первая цифра после GPIO ("2"). В каждом банке 32 ячейки.

А0-А7 - 0 -7
B0-B7 - 8-15
C0-C7 - 16-24
D0-D7 - 25-31

Поэтому GPIO2_B4 это банк 2, ячейка 12.

Опрашивается так:

gpioget --numeric -a -c gpiochip2 12

А устанавливается так:

gpioset -t 0 -c gpiochip2 12=1

Прочитать все банки:

gpiodetect

Прочитать банк:

gpioinfo -c gpiochip2

#gpio #napigpio

Подключение UART4 в NapiLinux

· 1 мин. чтения
dmn
maintainer

Для подключения uart4 в NapiLinux необходимо:

✔️Создать папку /boot/overlay-user ✔️Положить в нее файл rk3308-uart4.dtbo ✔️Добавить строчку в файл /boot/uEnv.txt

user_overlays=rk3308-uart4 

✔️Перезагрузить систему ✔️Должно появиться устройство /dev/ttyS4

#napi #napioverlay #dts

Тестирование MQTT датчиков через Telegraf

· 1 мин. чтения
dmn
maintainer

Для тестирования датчиков mqtt не подходит вывод нашего теста через Веб, так как данные приходят не сразу. Поэтому можем тестировать вручную.

Для того, чтобы было видно какие данные увидел телеграф, перенаправим вывод на экран.

Делаем файл telegtraf.conf.out в любом месте, но не в /etc/telegraf/telegraf.d (чтобы телеграф его по умолчанию не подхватывал)

[[outputs.file]]
files = ["stdout"]
data_format = "influx"

И тестируем командой

telegraf --config /etc/telegraf/telegraf.d/cl-203.conf --config /root/telegraf.conf.out --debug

Смотрим на экране данные

CL213E,host=napi-fearlessporcupine,sensor=CL213E,topic=sensors/CL213E/CO CO=0 1751463487564614267
CL213E,host=napi-fearlessporcupine,sensor=CL213E,topic=sensors/CL213E/CO2 CO2=410 1751463487605135529
CL213E,host=napi-fearlessporcupine,sensor=CL213E,topic=sensors/CL213E/PM25 PM25=248 1751463487605167843
CL213E,host=napi-fearlessporcupine,sensor=CL213E,topic=sensors/CL213E/RH RH=47.6 1751463487605195050
CL213E,host=napi-fearlessporcupine,sensor=CL213E,topic=sensors/CL213E/TC TC=24 1751463487605267960
CL213E,host=napi-fearlessporcupine,sensor=CL213E,topic=sensors/CL213E/DC DC=12.2 1751463487605276033

#telegraf #mqtt #napilinux

Настройка аутентификации Mosquitto по паролю

· 1 мин. чтения
dmn
maintainer

Для запуска Mosquitto с проверкой пользователя и пароля нужно выполнить несколько простых шагов.

Создание файла паролей

Создаем файл паролей с помощью утилиты mosquitto_passwd:

mosquitto_passwd -c /etc/mosquitto/passwd user

После выполнения команды система запросит пароль для пользователя user.

Смена владельца файла

Меняем владельца файла на пользователя mosquitto:

chown mosquitto /etc/mosquitto/passwd

Конфигурация Mosquitto

Простейший конфиг для включения аутентификации:

allow_anonymous false
password_file /etc/mosquitto/passwd

После применения этих настроек Mosquitto будет требовать авторизацию для всех подключений.

Добро пожаловать в техблог!

· 1 мин. чтения
dmn
maintainer

Это новый техблог для технических заметок и решений! 🎯

Раздел основан на материалах из Telegram канала t.me/napilab и находится в процессе наполнения.

Здесь будут публиковаться:

  • Быстрые техзаметки из Telegram канала
  • Решения проблем и troubleshooting
  • Полезные скрипты и автоматизация
  • Советы по работе с Linux и embedded устройствами

Отличие от основного блога новостей:

  • Основной блог - официальные новости, анонсы продуктов, выставки
  • Техблог - технические заметки, решения, quick tips

Пример использования:

  1. Скопируйте заметку из Telegram
  2. Создайте файл в папке techblog/
  3. Добавьте соответствующие теги
  4. Опубликуйте!