Moved bootloader parts into 'bootloader', Added python flash software from...

Moved bootloader parts into 'bootloader', Added python flash software from http://www.kreatives-chaos.com/artikel/can-bootloader
parent abfdd589
This diff is collapsed.
#!/usr/bin/env python
import threading
import Queue
import uspp.uspp as serial
import message_dispatcher as dispatcher
# ------------------------------------------------------------------------------
class CanException(Exception):
pass
# ------------------------------------------------------------------------------
class Message:
"""Class representation of a CAN message"""
def __init__(self, id, data=[], extended=True, rtr=False, timestamp=None):
self.id = id
self.extended = extended
self.rtr = rtr
self.data = data
self.timestamp = timestamp
def __str__(self):
"""Create a string representation of the message"""
buf = []
if self.timestamp != None:
buf.append("[%6.1f] " % (self.timestamp / 10.0))
if self.extended:
buf.append("id: %08x dlc: %d >" % (self.id, len(self.data)))
else:
buf.append("id: %8x dlc: %d >" % (self.id, len(self.data)))
if self.rtr:
buf.append("rtr")
else:
for data in self.data:
buf.append("%02x" % data)
return " ".join(buf)
# ------------------------------------------------------------------------------
class SerialInterface:
"""Abstract base class for a CAN interface.
Uses two threads, one to receive from and the other to transmit messages
to the serial port.
"""
MESSAGE = 0
RAW = 1
def __init__(self, port = None, baud = 9600, debug = False):
self.port = port
self.baudrate = baud
self.debugFlag = debug
self.isConnected = False
self.__receiverStopEvent = threading.Event()
self.__receiveQueue = Queue.Queue()
self._buf = []
def __del__(self):
self.disconnect()
def send(self, message):
"""Send a message"""
self._interface.write(self._encode(message))
def get(self, block = True, timeout = None):
"""Get the last received message
Wait for a new message if there is no one in the queue and "block"
is set True."""
return self.__receiveQueue.get(block, timeout)
def connect(self, port = None, baud = None, debug = None):
"""Connect to a serial Port"""
# close an existing connection (if there is any)
if self.isConnected:
self.disconnect()
self.port = port if port else self.port
self.baudrate = baud if baud else self.baudrate
self.debugFlag = debug if debug else self.debugFlag
# open serial port
try:
self._interface = serial.SerialPort(self.port, timeout = 200, speed = self.baudrate)
self._interface.flush()
except serial.SerialPortException:
raise CanException("could not connect to %s" % self.port)
self.__receiverStopEvent.clear()
# start the receiver thread
self.__receiverThread = threading.Thread(target = self.__receive)
self.__receiverThread.start()
self.isConnected = True
def disconnect(self):
"""Disconnect from the serial port"""
if not self.isConnected:
return
# send a stop event
self.__receiverStopEvent.set()
# wait for the two threads to stop their work
self.__receiverThread.join()
# close serial port
try:
del self._interface
except serial.SerialPortException, e:
raise CanException(e)
self.isConnected = False
def _debug(self, text):
if self.debugFlag:
print text
def _sendRaw(self, data):
self._interface.write(data)
def _decode(self, chr):
"""Collects and decodes messages"""
pass
def _encode(self, message):
"""Transform a CAN message to a byte stream"""
pass
def __receive(self):
"""Receiver Thread
Try to read and decode messages from the serial port.
"""
while not self.__receiverStopEvent.isSet():
try:
msg = self._decode(self._interface.read())
if msg:
#self.__receiveQueue.put(msg)
self._processMessage(msg)
except serial.SerialPortException:
self.__receiverStopEvent.wait(0.001)
# ------------------------------------------------------------------------------
class Usb2Can(SerialInterface, dispatcher.MessageDispatcher):
"""Interface for all devices compatible with the CAN232 from Lawicel
see http://www.can232.com/can232.pdf for further information
"""
def __init__(self, port = None, baud = 9600, debug = False):
SerialInterface.__init__(self, port, baud, debug)
dispatcher.MessageDispatcher.__init__(self)
def connect(self, port = None, baud = None, debug = None):
SerialInterface.connect(self, port, baud, debug)
# set bitrate and open the channel
self._sendRaw("S4\r")
#self._sendRaw("S6\r")
self._sendRaw("O\r")
def _decode(self, chr):
if chr != '\r':
self._buf.append(chr)
return None
else:
data = ''.join(self._buf)
self._buf = []
if not data:
return None
type = data[0]
if type == 'T':
# extended frame
data2 = data[10:]
message_data = []
for x in range(0, len(data2), 2):
message_data.append(int(data2[x:x + 2], 16))
message = Message( int(data[1:9], 16), message_data, extended = True, rtr = False )
elif type == 't':
data2 = data[5:]
message_data = []
for x in range(0, len(data2), 2):
message_data.append(int(data2[x:x + 2], 16))
message = Message( int(data[1:4], 16), message_data, extended = False, rtr = False )
elif type == 'R':
message = Message( int(data[1:9], 16), [None] * int(data[9]), extended = True, rtr = True )
elif type == 'r':
message = Message( int(data[1:4], 16), [None] * int(data[4]), extended = False, rtr = True )
else:
# all other frame-types are not supported
return None
#only dump frames which pass the acceptance filter. This is done in Bootloader._get_message()
#self._debug("> " + str(message))
return message
def _encode(self, message):
buf = []
self._debug("< " + str(message))
if message.rtr:
if message.extended:
buf.append("R%08x%01x" % (message.id, len(message.data)))
else:
buf.append("r%03x%01x" % (message.id, len(message.data)))
else:
if message.extended:
buf.append("T%08x%01x" % (message.id, len(message.data)))
else:
buf.append("t%03x%01x" % (message.id, len(message.data)))
for data in message.data:
buf.append("%02x" % data)
buf.append("\r")
return ''.join(buf)
# ------------------------------------------------------------------------------
class CanDebugger(SerialInterface, dispatcher.MessageDispatcher):
"""Interface to the Command Shell from the CAN Debugger"""
def __init__(self, port, baud = 9600, debug = False):
SerialInterface.__init__(self, port, baud, debug)
dispatcher.MessageDispatcher.__init__(self)
import re
self.regularExpression = re.compile("^[$ ]*(?P<timestamp>\d+):[ ]+(?P<id>\w+)[ ](?P<len>\d)(( rtr)|(( >)?(?P<data>( \w\w)*)))", re.IGNORECASE)
def connect(self, port = None, baud = None, debug = None):
SerialInterface.connect(self, port, baud, debug)
# set filter to receive all messages
self._sendRaw("set filter 0 0 0\r")
self._sendRaw("set filter 1 0 0\r")
self._sendRaw("set filter 2 0 0\r")
self._sendRaw("set filter 3 0 0\r")
def _decode(self, chr):
if chr != '\n':
self._buf.append(chr)
else:
result = self.regularExpression.match(''.join(self._buf))
self._buf = []
if result:
dict = result.groupdict()
data = dict['data']
if data:
msg_data = []
while data:
msg_data.append(int(data[1:3], 16))
data = data[3:]
rtr = False
else:
msg_data = [None] * int(dict['len'])
rtr = True
id = int(dict['id'], 16)
extended = True if len(dict['id']) > 3 else False
timestamp = int(dict['timestamp'], 10)
# create message
message = Message(id, msg_data, extended = extended, rtr = rtr, timestamp = timestamp)
self._debug("> " + str(message))
return message
return None
def _encode(self, message):
buf = ["> "]
self._debug("< " + str(message))
if message.extended:
buf.append("%04x %d" % (message.id, len(message.data)))
else:
buf.append("%x %d" % (message.id, len(message.data)))
if message.rtr:
buf.append(" rtr\r")
else:
buf.append(" ")
for byte in message.data:
buf.append("%02x" % byte)
buf.append("\r")
return ''.join(buf)
# ------------------------------------------------------------------------------
class DebugInterface(SerialInterface, dispatcher.MessageDispatcher):
"""Prints every message without sending it to a serial interface"""
def __init__(self, port = None, baud = 9600):
dispatcher.MessageDispatcher.__init__(self, None)
def connect(self):
pass
def disconnect(self):
pass
def send(self, message):
print message
def sendRaw(self, data):
pass
def get(self, block, timeout):
while 1:
pass
#!/usr/bin/env python
# -----------------------------------------------------------------------------
class MessageDispatcher:
def __init__(self, filterList = None):
"""Constructor"""
self.filter = []
if filterList:
for filter in filterList:
self.addFilter(filter)
def addFilter(self, filter):
"""Add a filter
The filter-object must feature a check(message) method which returns
True or False whether the callback should be called or not and a
getCallback() method to retrieve this callback function.
"""
self.filter.append(filter)
def removeFilter(self, filter):
"""Remove this Filter"""
self.filter.remove(filter)
def send(self, message):
pass
def _processMessage(self, message):
"""Check all filter for this message and call the callback
functions for those how matches.
"""
for filter in self.filter:
if filter.check(message):
self._executeCallback(filter.getCallback(), message)
def _executeCallback(self, callback, message):
"""Call a callback function."""
callback(message)
#!/usr/bin/env python
# -----------------------------------------------------------------------------
class BaseFilter:
"""Base class for message filter
Will accept all messages if used.
"""
def __init__(self, callback):
self.callback = callback
def getCallback(self):
return self.callback
def check(self, message):
"""Checks if a message matches the criteria"""
return True
# -----------------------------------------------------------------------------
class AttributeFilter(BaseFilter):
"""Universal filter that checks whether the attributes of the message class
meets a list of criteria.
"""
def __init__(self, callback, criteria):
"""
- 'criteria' is a list of (attribute name, value) tuples which
will be tested. If the message matches all attributes it will be
accepted and the callback will be executed."""
BaseFilter.__init__(self, callback)
self.criteria = criteria
def check(self, message):
"""Checks if a message matches the criteria"""
try:
for attr, value in self.criteria:
if getattr(message, attr) != value:
return False
except AttributeError:
return False
return True
# -----------------------------------------------------------------------------
class CanFilter(BaseFilter):
"""A Filter specifically suited for CAN messages
"""
def __init__(self, callback, id = 0, extended = True, rtr = False):
BaseFilter.__init__(self, callback)
self.id = id
self.extended = extended
self.rtr = rtr
def check(self, message):
if message.id == self.id and \
message.extended == self.extended and \
message.rtr == self.rtr:
return True
return False
# -----------------------------------------------------------------------------
if __name__ == '__main__':
import time
class Message:
def __init__(self):
self.id = 1000
self.extended = False
self.rtr = False
filter1 = AttributeFilter(None, [["id", 1000], ["extended", False], ["rtr", False]])
filter2 = CanFilter(None, id = 1000, extended = False)
msg = Message()
times = 100000
start = time.time()
for i in xrange(times):
filter1.check(msg)
mid = time.time()
for i in xrange(times):
filter2.check(msg)
end = time.time()
print mid - start
print end - mid
USPP Library (Universal Serial Port Python Library)
Copyright (C) 2006 Isaac Barona Martínez <ibarona@tid.es>
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
The complete text of the GNU Lesser General Public License can be found in
the file 'lesser.txt'.
USPP Library
Universal Serial Port Python Library
Copyright 2006 Isaac Barona <ibarona@gmail.com>
Contents
--------
1. Introduction
2. Motivation
3. Where can I find it
4. Features
5. Pre-requisites
6. Usage and documentation
7. Where does it work
8. Known problems
9. To-do list
10. Porting to other platforms
11. Licence
12. Author
13. Version
1. Introduction
---------------
USPP Library is a multi-platform Python module to access serial ports. At the
moment, it only works in Windows, Linux and MacOS but as it is written entirely
in Python (doesn't wrap any C/C++ library) I hope you can extend it to support
any other platforms.
2. Motivation
-------------
I like very much to make electronic widgets with microcontrollers, specially
those that can be connected to the computer to send and receive data.
Some months ago, I discovered Python and inmediatelly liked it a lot.
I started playing with it and saw that I could use it to make prototypes of
comunication protocols between computer and microcontrollers really
fast and easily than using C. At the same time, I was interested in
working over different platforms.
I started looking for Python modules to access serial port and I found
the following projects:
* win32comport_demo from the win32 extension module
* win32comm module of wheineman@uconect.net
* Sio Module of Roger Rurnham (rburnham@cri-inc.com)
* pyxal (Python X10 Abstraction Layer) of Les Smithson
(lsmithson@open-networks.co.uk)
but they were not multi-platform, were just a wrap of propietary libraries or
were just simple examples of serial port access.
For these reasons and also for learning more Python, I decided to start
this project. Of course, I have used all this projects as reference to
my module and so, I want to thanks the authors for their excellent work and
for allowing us to study the code.
I hope you enjoy using the uspp module as much as I am enjoying doing it.
3. Where can I find it
----------------------
You may find it at:
* http://ibarona.googlepages.com/uspp
* http://www.telefonica.net/web/babur
as a tar.gz package or as a winzip file.
4. Features
-----------
This module has the following features:
- hight level access to serial port under several platforms.
- autodetects the platform in which it is working and exports
the correct classes for that platform.
- object oriented approach.
- file object semantic operations over the ports.
- allows using the serial port with different speeds and
characteristics.
- RS-232 and RS-485 modes (now only RS-232). In RS-485 mode
the communication is half-duplex and uses the RTS line
to control the direction of the transference.
- blocking, non-blocking or configurable timeout reads.
5. Prerequisites
----------------
You need the following to use the library:
- Python 2.1 or better
- In windows you need the win32 extension modules
6. Usage and documentation
--------------------------
You only have to import in your program the uspp module and automatically
it loads the correct classes for the platform in which you are running
the program.
First of all you have to create a SerialPort object with the settings you
want. If a SerialPortException is not generated then you just can
use the read and write methods of the object to read and write to
the serial port.
Example:
>>> from uspp import *
>>> tty=SerialPort("COM2", 1000, 9600)
>>> # Opens COM2 at 9600 bps and with a read timeout of 1 second.
>>> tty.write("a") # Writes a character to the COM2 port
>>> # Now suppose we receive the string "abc"
>>> tty.inWaiting()
3
>>> tty.read()
'a'
>>> tty.inWaiting()
2
>>> tty.read(2)
'bc'
Documentation of the different classes and methods can be found on
uspp module docstring.
7. Where does it work
---------------------
The library has been tested in Windows 95, Windows XP and Windows 2000
machines with Python 2.1+ and in a Linux (2.0.34 kernel) machine with
Python 2.1+.
8. Known problems
-----------------
9. To-do list
-------------
This is the to-do list:
- implement RS-485 mode.
- port the library to other platforms so that it can be really
multi-platform.
10. Porting to other platforms
-----------------------------
If you want to port the library to other platforms you only have to follow
these steps:
* Create a new python file called SerialPort_XXXX.py in which you
implement the same public classes and methods found in the SerialPort_win
and SerialPort_linux modules.
* Append the new platform to the uspp.py file.
11. Licence
----------
This code is released under the "LGPL" that can be found in
http://www.gnu.org/copyleft/lesser.html or in the lesser.txt file that
is with the library.
If you use this software, I'd like to know about it.
12. Author
---------
This library has been created by Isaac Barona Martinez <ibarona@gmail.com>.
13. Version
----------
0.1 - 09/01/2001 (September 2001)
0.2 - 05/13/2003
1.0 - 02/24/2006
This diff is collapsed.
This diff is collapsed.
# -*- coding: iso-8859-1 -*-
##########################################################################
# USPP Library (Universal Serial Port Python Library)
#
# Copyright (C) 2006 Isaac Barona <ibarona@gmail.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
##########################################################################