RX320 Controller

Raspberry_Pi_LogoAs part of my little “hands-on Python-learning” project, I developed a small Python program that controls the Ten-Tec RX-320 receiver. The program also provides a TCP/IP interface to the controller, allowing PA3ANG, a certain avid radio amateur who also happens to be my father, to happily feature the RX-320 on his website. Incidentally, he acquired a Raspberry Pi and set it up to control the RX-320.

A few years ago I wrote a similar controller in Java and it was using a lot of CPU time on the Raspberry Pi. It turned out that the Python version uses less CPU time, from about 15% to 1.5%. This may be due to the pySerial library being more efficient than the RXTX library that I used in Java, but this is all speculation.

Check out my project on GitHub to download the code and play with it if you happen to own a RX-320! Alternatively, you can go to the website of PA3ANG and play with a live version!

import serial
import threading
import time

class RX320():
    MODE_AM = 0
    MODE_USB = 1
    MODE_LSB = 2
    MODE_CW = 3

    AGC_SLOW = 1
    AGC_MEDIUM = 2
    AGC_FAST = 3

    FILTERS = [
        6000, 5700, 5400, 5100, 4800, 4500, 4200,
        3900, 3600, 3300, 3000, 2850, 2700, 2550,
        2400, 2250, 2100, 1950, 1800, 1650, 1500,
        1350, 1200, 1050, 900, 750, 675, 600,
        525, 450, 375, 330, 300, 8000,]

    def __init__(self, port, sleep_time=0.2):
        self.ser = serial.Serial(port, 1200, timeout=1)
        self.strength = 0
        self.firmware = ''
        thr = threading.Thread(target=RX320.read_thread, args=[self])
        thr.daemon = True
        thr.start()
        thr = threading.Thread(target=RX320.strength_thread, 
                               args=[self, sleep_time])
        thr.daemon = True
        thr.start()

    def read_thread(self):
        buf = []
        while True:
            ch = self.ser.read(1)
            if len(ch) == 1:
                ch = ord(ch)
                if ch == 10:
                    pass # ignore
                elif ch == 13:
                    if len(buf) > 0: 
                        self.handle_response(buf)
                    buf = []
                else:
                    buf.append(ch)

    def strength_thread(self, sleep_time):
        while True:
            self.send_get_strength()
            time.sleep(sleep_time)

    def handle_response(self, buf):
        if buf[0] == ord('\x58'):
            if len(buf) >= 3:
                self.strength = buf[1] * 256 + buf[2]
            else:
                pass # invalid strength response! (ignore)
        elif buf[0] == ord('\x5a'):
            pass # unrecognized command! (ignore)
        elif len(buf) > 3 and str(bytearray(buf[:3])) == 'VER':
            self.firmware = str(bytearray(buf))
        elif len(buf) > 3 and str(bytearray(buf[:3])) == 'DSP':
            pass # power on! (ignore)
        else:
            pass # unknown response! (ignore)

    def set_freq(self, freq, cwbfo=0):
        assert hasattr(self, 'mode')
        assert hasattr(self, 'filter')

        if self.mode != RX320.MODE_CW: 
            cwbfo = 0

        mcor = [0, 1, -1, -1][self.mode]
        fcor = RX320.FILTERS[self.filter]/2+200

        adjusted_freq = freq - 1250 + mcor*(fcor+cwbfo)

        coarse = int(18000 + (adjusted_freq // 2500))
        fine = int(5.46 * (adjusted_freq % 2500))
        bfo = int(2.73 * (fcor+cwbfo+8000))
        self.set_tuning(coarse, fine, bfo)

        self.freq = freq
        self.cwbfo = cwbfo

    def set_tuning(self, coarse, fine, bfo):
        assert coarse >= 0 and coarse < 65536         
        assert fine >= 0 and fine < 65536         
        assert bfo >= 0 and bfo < 65536                  
        self.ser.write(b'\x4e%c%c%c%c%c%c\x0d' %                  
                ((coarse>>8)&255, coarse&255, 
                (fine>>8)&255, fine&255, 
                (bfo>>8)&255, bfo&255))

        self.coarse = coarse
        self.fine = fine
        self.bfo = bfo

    def set_agc(self, agc):
        if agc >= 0 and agc <= 3:             
            self.ser.write(b'\x47%d\x0d' % agc)
            self.agc = agc     

    def set_mode(self, mode):         
        if mode >= 0 and mode <= 4: 
            self.ser.write(b'\x4d%d\x0d' % mode) 
            self.mode = mode 
    def set_filter(self, filter): 
        if filter >= 0 and filter <= 33:
            self.ser.write(b'\x57%c\x0d' % filter)
            self.filter = filter

    def set_line_volume(self, vol):
        if vol < 0: vol = 0 
        if vol > 63: vol = 63
        self.ser.write(b'\x41\x00%c\x0d' % vol)
        self.line_volume = vol

    def set_speaker_volume(self, vol):
        if vol < 0: vol = 0 
        if vol > 63: vol = 63
        self.ser.write(b'\x56\x00%c\x0d' % vol)
        self.speaker_volume = vol

    def set_volume(self, vol):
        if vol < 0: vol = 0 
        if vol > 63: vol = 63
        self.ser.write(b'\x43\x00%c\x0d' % vol)
        self.line_volume = vol
        self.speaker_volume = vol

    def send_get_firmware(self):
        self.firmware = ''
        self.ser.write(b'\x3f\x0d')

    def send_get_strength(self):
        self.ser.write(b'\x58\x0d')

The code is pretty self-explaining. Two threads are created that run in the background. They are daemon threads which means that they terminate automatically when the program ends. One thread will just read all data from the serial port and call handle_response with the response as a list of integers, the other will call send_get_strength in a loop, and wait for some time between calls. The default waiting time is 0.2 seconds.

Since the RX-320 does not have methods to get the current mode, agc mode, filter, line/speaker volume and frequency, these parameters are remembered by the controller and can be accessed as variables, e.g. controller.freq or controller.mode. To calculate the tuning parameters, knowledge of the current mode and filter is required. If you try to set the frequency but you did not set the mode and the filter, the program will fail the assertions on lines 70 and 71.

To read the firmware version, call send_get_firmware and wait until the firmware variable is set. To get the current strength, just read the strength variable.

from rx320 import RX320
import socket
import threading

class RX320Connection(threading.Thread):
    def __init__(self, connection, controller, *args, **kwargs):
        super(RX320Connection,self).__init__(*args, **kwargs)
        self.connection = connection
        self.controller = controller
        self.daemon = True
        self.start()

    def linesplit(self):
        socket = self.connection
        buffer = socket.recv(4096)
        done = False
        while not done:
            if "\n" in buffer:
                (line, buffer) = buffer.split("\n", 1)
                yield line.strip()
            else:
                more = socket.recv(4096)
                if not more:
                    done = True
                else:
                    buffer = buffer+more
        if buffer:
            yield buffer.strip()

    def run(self):
        try:
            for line in self.linesplit():
                result = self.handle(line.split())
                self.connection.sendall("%s\n" % result)
        finally:
            self.connection.close()

    def handle(self, command):
        if len(command) == 0:
            return "ERROR" 
        if command[0] == 'ALL' and len(command) == 4:
            # ALL freq mode filter
            self.controller.set_mode(int(command[2]))
            self.controller.set_filter(int(command[3]))
            self.controller.set_freq(int(command[1]))
            return "Done"
        elif command[0] == 'FREQ' and len(command) == 2:
            self.controller.set_freq(int(command[1]))
            return "Done"
        elif command[0] == 'VOL' and len(command) == 2:
            self.controller.set_speaker_volume(int(command[1]))
            return "Done"
        elif command[0] == 'LINEVOL' and len(command) == 2:
            self.controller.set_line_volume(int(command[1]))
            return "Done"
        elif command[0] == 'MODE' and len(command) == 2:
            self.controller.set_mode(int(command[1]))
            return "Done"
        elif command[0] == 'FILTER' and len(command) == 2:
            self.controller.set_filter(int(command[1]))
            return "Done"
        elif command[0] == 'AGC' and len(command) == 2:
            self.controller.set_agc(int(command[1]))
            return "Done"
        elif command[0] == 'GETMODE':
            if hasattr(self.controller, 'mode'):
                return str(self.controller.mode)
            else:
                return 'NA'
        elif command[0] == 'GETFILTER':
            if hasattr(self.controller, 'filter'):
                return str(self.controller.filter)
            else:
                return 'NA'
        elif command[0] == 'GETAGC':
            if hasattr(self.controller, 'agc'):
                return str(self.controller.agc)
            else:
                return 'NA'
        elif command[0] == 'GETSMETER':
            return str(self.controller.strength) 
        elif command[0] == 'GETVOL':
            if hasattr(self.controller, 'speaker_volume'):
                return str(self.controller.speaker_volume)
            else:
                return 'NA'
        elif command[0] == 'GETLINEVOL':
            if hasattr(self.controller, 'line_volume'):
                return str(self.controller.line_volume)
            else:
                return 'NA'
        elif command[0] == 'GETFREQ':
            if hasattr(self.controller, 'freq'):
                return str(self.controller.freq)
            else:
                return 'NA'
        return "ERROR"

if __name__ == '__main__':
    import optparse
    parser = optparse.OptionParser(
        usage = "%prog [options] device",
        description = "RX320 controller"
    )
    parser.add_option("-p", "--port",
        dest = "local_port",
        action = "store",
        type = "int",
        help = "TCP/IP port",
        default = 4665
    )
    parser.add_option("-s", "--sleep",
        dest = "sleep_time",
        action = "store",
        type = "float",
        help = "Seconds to wait between strength polls",
        default = 0.2
    )
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.error('need device as argument, e.g. /dev/tty...')

    controller = RX320(args[0], options.sleep_time)
    # initialize controller
    controller.set_volume(99)
    controller.set_mode(RX320.MODE_LSB)
    controller.set_agc(RX320.AGC_MEDIUM)
    controller.set_filter(RX320.FILTERS.index(2100))
    controller.set_freq(3630000)
    controller.set_line_volume(16)
    controller.set_speaker_volume(96)

    print "Initialized RX320 '%s'" % args[0]

    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(('', options.local_port))
    srv.listen(1)

    print "Waiting for connections on port %d..." % (options.local_port)

    while True:
        try:
            conn, addr = srv.accept()
            conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            c = RX320Connection(conn, controller)
        except KeyboardInterrupt:
            break
        except socket.error, msg:
            pass

To run the program, use something like:

python rxserver.py -p 4665 -s 0.2 /dev/ttyUSB0

This starts a server on TCP/IP port 4665 using the serial port on USB0 (using a USB-to-serial adapter on the Raspberry Pi) to connect to the RS-320, and the signal strength will be read approximately 5 times per second.

One thought to “RX320 Controller”

Comments are closed.