|
|
|
|
@@ -11,7 +11,7 @@ import serial.rs485
|
|
|
|
|
import minimalmodbus
|
|
|
|
|
|
|
|
|
|
# Custom Imports
|
|
|
|
|
from rover_control.msg import DriveControlMessage
|
|
|
|
|
from rover_control.msg import DriveControlMessage, DriveStatusMessage
|
|
|
|
|
|
|
|
|
|
#####################################
|
|
|
|
|
# Global Variables
|
|
|
|
|
@@ -24,6 +24,7 @@ DEFAULT_BAUD = 115200
|
|
|
|
|
DEFAULT_INVERT = False
|
|
|
|
|
|
|
|
|
|
DEFAULT_DRIVE_CONTROL_TOPIC = "drive_control/rear"
|
|
|
|
|
DEFAULT_DRIVE_CONTROL_STATUS_TOPIC = "drive_status/rear"
|
|
|
|
|
|
|
|
|
|
FIRST_MOTOR_ID = 1
|
|
|
|
|
SECOND_MOTOR_ID = 2
|
|
|
|
|
@@ -68,6 +69,8 @@ class DriveControl(object):
|
|
|
|
|
|
|
|
|
|
self.drive_control_subscriber_topic = rospy.get_param("~drive_control_topic", DEFAULT_DRIVE_CONTROL_TOPIC)
|
|
|
|
|
|
|
|
|
|
self.drive_control_status_topic = rospy.get_param("~drive_control_status_topic", DEFAULT_DRIVE_CONTROL_STATUS_TOPIC)
|
|
|
|
|
|
|
|
|
|
self.first_motor = minimalmodbus.Instrument(self.port, FIRST_MOTOR_ID)
|
|
|
|
|
self.second_motor = minimalmodbus.Instrument(self.port, SECOND_MOTOR_ID)
|
|
|
|
|
self.__setup_minimalmodbus_for_485()
|
|
|
|
|
@@ -75,6 +78,11 @@ class DriveControl(object):
|
|
|
|
|
self.drive_control_subscriber = \
|
|
|
|
|
rospy.Subscriber(self.drive_control_subscriber_topic, DriveControlMessage, self.drive_control_callback)
|
|
|
|
|
|
|
|
|
|
self.drive_control_status_publisher = rospy.Publisher(self.drive_control_status_topic, DriveStatusMessage, queue_size=1)
|
|
|
|
|
|
|
|
|
|
self.drive_control_message = DriveControlMessage()
|
|
|
|
|
self.new_control_message = False
|
|
|
|
|
|
|
|
|
|
self.run()
|
|
|
|
|
|
|
|
|
|
def __setup_minimalmodbus_for_485(self):
|
|
|
|
|
@@ -90,26 +98,64 @@ class DriveControl(object):
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
while not rospy.is_shutdown():
|
|
|
|
|
sleep(0.25)
|
|
|
|
|
self.send_drive_control_message()
|
|
|
|
|
self.get_drive_status()
|
|
|
|
|
sleep(0.005)
|
|
|
|
|
|
|
|
|
|
def send_drive_control_message(self):
|
|
|
|
|
if self.new_control_message:
|
|
|
|
|
drive_control = self.drive_control_message
|
|
|
|
|
try:
|
|
|
|
|
first_motor_register_data = list(MOTOR_DRIVER_DEFAULT_MESSAGE)
|
|
|
|
|
first_direction = \
|
|
|
|
|
not drive_control.first_motor_direction if self.first_motor_inverted else drive_control.first_motor_direction
|
|
|
|
|
first_motor_register_data[MODBUS_REGISTERS["DIRECTION"]] = first_direction
|
|
|
|
|
first_motor_register_data[MODBUS_REGISTERS["SPEED"]] = min(drive_control.first_motor_speed, UINT16_MAX)
|
|
|
|
|
|
|
|
|
|
second_motor_register_data = list(MOTOR_DRIVER_DEFAULT_MESSAGE)
|
|
|
|
|
second_direction = not drive_control.second_motor_direction if self.second_motor_inverted else drive_control.second_motor_direction
|
|
|
|
|
second_motor_register_data[MODBUS_REGISTERS["DIRECTION"]] = second_direction
|
|
|
|
|
second_motor_register_data[MODBUS_REGISTERS["SPEED"]] = min(drive_control.second_motor_speed, UINT16_MAX)
|
|
|
|
|
|
|
|
|
|
self.first_motor.write_registers(0, first_motor_register_data)
|
|
|
|
|
self.second_motor.write_registers(0, second_motor_register_data)
|
|
|
|
|
|
|
|
|
|
except Exception, error:
|
|
|
|
|
print "Error occurred:", error
|
|
|
|
|
|
|
|
|
|
self.new_control_message = False
|
|
|
|
|
|
|
|
|
|
def get_drive_status(self):
|
|
|
|
|
status = DriveStatusMessage()
|
|
|
|
|
|
|
|
|
|
first_motor_status = [0, 0, 0]
|
|
|
|
|
second_motor_status = [0, 0, 0]
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
first_motor_status = self.first_motor.read_registers(3, 3)
|
|
|
|
|
status.first_motor_connected = True
|
|
|
|
|
except Exception:
|
|
|
|
|
status.first_motor_connected = False
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
second_motor_status = self.second_motor.read_registers(3, 3)
|
|
|
|
|
status.second_motor_connected = True
|
|
|
|
|
except Exception:
|
|
|
|
|
status.second_motor_connected = False
|
|
|
|
|
|
|
|
|
|
if status.first_motor_connected:
|
|
|
|
|
status.first_motor_current = first_motor_status[0] / 1000.0
|
|
|
|
|
status.first_motor_fault = first_motor_status[1]
|
|
|
|
|
status.first_motor_temp = first_motor_status[2] / 1000.0
|
|
|
|
|
|
|
|
|
|
if status.second_motor_connected:
|
|
|
|
|
status.second_motor_current = second_motor_status[0] / 1000.0
|
|
|
|
|
status.second_motor_fault = second_motor_status[1]
|
|
|
|
|
status.second_motor_temp = second_motor_status[2] / 1000.0
|
|
|
|
|
|
|
|
|
|
def drive_control_callback(self, drive_control):
|
|
|
|
|
try:
|
|
|
|
|
first_motor_register_data = list(MOTOR_DRIVER_DEFAULT_MESSAGE)
|
|
|
|
|
first_direction = \
|
|
|
|
|
not drive_control.first_motor_direction if self.first_motor_inverted else drive_control.first_motor_direction
|
|
|
|
|
first_motor_register_data[MODBUS_REGISTERS["DIRECTION"]] = first_direction
|
|
|
|
|
first_motor_register_data[MODBUS_REGISTERS["SPEED"]] = min(drive_control.first_motor_speed, UINT16_MAX)
|
|
|
|
|
|
|
|
|
|
second_motor_register_data = list(MOTOR_DRIVER_DEFAULT_MESSAGE)
|
|
|
|
|
second_direction = not drive_control.second_motor_direction if self.second_motor_inverted else drive_control.second_motor_direction
|
|
|
|
|
second_motor_register_data[MODBUS_REGISTERS["DIRECTION"]] = second_direction
|
|
|
|
|
second_motor_register_data[MODBUS_REGISTERS["SPEED"]] = min(drive_control.second_motor_speed, UINT16_MAX)
|
|
|
|
|
|
|
|
|
|
self.first_motor.write_registers(0, first_motor_register_data)
|
|
|
|
|
self.second_motor.write_registers(0, second_motor_register_data)
|
|
|
|
|
|
|
|
|
|
except Exception, error:
|
|
|
|
|
print "Error occurred:", error
|
|
|
|
|
self.drive_control_message = drive_control
|
|
|
|
|
self.new_control_message = True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|