You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
92 lines
3.2 KiB
92 lines
3.2 KiB
#!/usr/bin/env python
|
|
|
|
# RS485 support
|
|
#
|
|
# This file is part of pySerial. https://github.com/pyserial/pyserial
|
|
# (C) 2015 Chris Liechti <cliechti@gmx.net>
|
|
#
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
"""\
|
|
The settings for RS485 are stored in a dedicated object that can be applied to
|
|
serial ports (where supported).
|
|
NOTE: Some implementations may only support a subset of the settings.
|
|
"""
|
|
|
|
import time
|
|
import serial
|
|
|
|
|
|
class RS485Settings(object):
|
|
def __init__(
|
|
self,
|
|
rts_level_for_tx=True,
|
|
rts_level_for_rx=False,
|
|
loopback=False,
|
|
delay_before_tx=None,
|
|
delay_before_rx=None):
|
|
self.rts_level_for_tx = rts_level_for_tx
|
|
self.rts_level_for_rx = rts_level_for_rx
|
|
self.loopback = loopback
|
|
self.delay_before_tx = delay_before_tx
|
|
self.delay_before_rx = delay_before_rx
|
|
|
|
|
|
class RS485(serial.Serial):
|
|
"""\
|
|
A subclass that replaces the write method with one that toggles RTS
|
|
according to the RS485 settings.
|
|
|
|
NOTE: This may work unreliably on some serial ports (control signals not
|
|
synchronized or delayed compared to data). Using delays may be
|
|
unreliable (varying times, larger than expected) as the OS may not
|
|
support very fine grained delays (no smaller than in the order of
|
|
tens of milliseconds).
|
|
|
|
NOTE: Some implementations support this natively. Better performance
|
|
can be expected when the native version is used.
|
|
|
|
NOTE: The loopback property is ignored by this implementation. The actual
|
|
behavior depends on the used hardware.
|
|
|
|
Usage:
|
|
|
|
ser = RS485(...)
|
|
ser.rs485_mode = RS485Settings(...)
|
|
ser.write(b'hello')
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(RS485, self).__init__(*args, **kwargs)
|
|
self._alternate_rs485_settings = None
|
|
|
|
def write(self, b):
|
|
"""Write to port, controlling RTS before and after transmitting."""
|
|
if self._alternate_rs485_settings is not None:
|
|
# apply level for TX and optional delay
|
|
self.setRTS(self._alternate_rs485_settings.rts_level_for_tx)
|
|
if self._alternate_rs485_settings.delay_before_tx is not None:
|
|
time.sleep(self._alternate_rs485_settings.delay_before_tx)
|
|
# write and wait for data to be written
|
|
super(RS485, self).write(b)
|
|
super(RS485, self).flush()
|
|
# optional delay and apply level for RX
|
|
if self._alternate_rs485_settings.delay_before_rx is not None:
|
|
time.sleep(self._alternate_rs485_settings.delay_before_rx)
|
|
self.setRTS(self._alternate_rs485_settings.rts_level_for_rx)
|
|
else:
|
|
super(RS485, self).write(b)
|
|
|
|
# redirect where the property stores the settings so that underlying Serial
|
|
# instance does not see them
|
|
@property
|
|
def rs485_mode(self):
|
|
"""\
|
|
Enable RS485 mode and apply new settings, set to None to disable.
|
|
See serial.rs485.RS485Settings for more info about the value.
|
|
"""
|
|
return self._alternate_rs485_settings
|
|
|
|
@rs485_mode.setter
|
|
def rs485_mode(self, rs485_settings):
|
|
self._alternate_rs485_settings = rs485_settings
|
|
|