win32comport_demo.py :  » Windows » pyExcelerator » pywin32-214 » win32 » Demos » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Windows » pyExcelerator 
pyExcelerator » pywin32 214 » win32 » Demos » win32comport_demo.py
# This is a simple serial port terminal demo.
#
# Its primary purpose is to demonstrate the native serial port access offered via
# win32file.

# It uses 3 threads:
# - The main thread, which cranks up the other 2 threads, then simply waits for them to exit.
# - The user-input thread - blocks waiting for a keyboard character, and when found sends it
#   out the COM port.  If the character is Ctrl+C, it stops, signalling the COM port thread to stop.
# - The COM port thread is simply listening for input on the COM port, and prints it to the screen.

# This demo uses overlapped IO, so that none of the read or write operations actually block (however,
# in this sample, the very next thing we do _is_ block - so it shows off the concepts even though it
# doesn't exploit them).

from win32file import * # The base COM port and file IO functions.
from win32event import * # We use events and the WaitFor[Multiple]Objects functions.
import win32con # constants.
import msvcrt # For the getch() function.

import threading
import sys

def FindModem():
    """Probe COM1..COM4 and return the first port that looks like a modem.

    Each candidate port is opened with exclusive access and queried with
    GetCommModemStatus(); any non-zero status is treated as evidence that a
    real modem is attached.

    Returns:
        The port name (e.g. "COM3"), or None when no candidate was found.
    """
    # Snoop over the comports, seeing if it is likely we have a modem.
    for i in range(1, 5):
        port = "COM%d" % (i,)
        try:
            handle = CreateFile(port,
                                win32con.GENERIC_READ | win32con.GENERIC_WRITE,
                                0, # exclusive access
                                None, # no security
                                win32con.OPEN_EXISTING,
                                win32con.FILE_ATTRIBUTE_NORMAL,
                                None)
            try:
                # It appears that opening an available COM port always succeeds
                # here, with the status flags simply coming back as 0.  We only
                # care that it has _any_ status flags (and therefore probably a
                # real modem).
                if GetCommModemStatus(handle) != 0:
                    return port
            finally:
                # The port was opened exclusively, so release it explicitly
                # instead of waiting for the handle object to be collected;
                # otherwise a later open of the same port could fail.
                handle.Close()
        except error:
            pass # No port, or modem status failed.
    return None

# A basic synchronous COM port file-like object
class SerialTTY:
    """Minimal serial terminal built on overlapped win32file IO.

    The constructor opens and configures the COM port.  Run() then relays
    keyboard characters to the port and port input to stdout, using one
    thread for each direction, until the user presses Ctrl+C.
    """

    def __init__(self, port):
        """Open and configure the COM port.

        port is either a port name such as "COM3" or an integer port number,
        which is converted to the name "COM<n>".
        """
        if isinstance(port, int):
            port = "COM%d" % (port,)
        self.handle = CreateFile(port,
                                 win32con.GENERIC_READ | win32con.GENERIC_WRITE,
                                 0, # exclusive access
                                 None, # no security
                                 win32con.OPEN_EXISTING,
                                 win32con.FILE_ATTRIBUTE_NORMAL | win32con.FILE_FLAG_OVERLAPPED,
                                 None)
        # Tell the port we want a notification on each char.
        SetCommMask(self.handle, EV_RXCHAR)
        # Setup a 4k buffer
        SetupComm(self.handle, 4096, 4096)
        # Remove anything that was there
        PurgeComm(self.handle, PURGE_TXABORT | PURGE_RXABORT | PURGE_TXCLEAR | PURGE_RXCLEAR)
        # Setup for overlapped IO.
        # NOTE(review): presumably in COMMTIMEOUTS field order (read interval,
        # read multiplier, read constant, write multiplier, write constant) -
        # confirm against the pywin32 SetCommTimeouts documentation.
        timeouts = 0xFFFFFFFF, 0, 1000, 0, 1000
        SetCommTimeouts(self.handle, timeouts)
        # Setup the connection info.
        dcb = GetCommState(self.handle)
        dcb.BaudRate = CBR_115200
        dcb.ByteSize = 8
        dcb.Parity = NOPARITY
        dcb.StopBits = ONESTOPBIT
        SetCommState(self.handle, dcb)
        # Parenthesised so the line parses as either a statement or a call;
        # the output is unchanged.
        print("Connected to %s at %s baud" % (port, dcb.BaudRate))

    def _UserInputReaderThread(self):
        """Forward keyboard characters to the port until Ctrl+C is pressed.

        Always signals self.eventStop on the way out (including when a write
        raises) so that the COM port thread stops as well.
        """
        overlapped = OVERLAPPED()
        overlapped.hEvent = CreateEvent(None, 1, 0, None)
        try:
            while True:
                ch = msvcrt.getch()
                if ord(ch) == 3: # Ctrl+C
                    break
                WriteFile(self.handle, ch, overlapped)
                # Wait for the write to complete.
                WaitForSingleObject(overlapped.hEvent, INFINITE)
        finally:
            SetEvent(self.eventStop)

    def _ComPortThread(self):
        """Print everything received on the port until the stop event fires."""
        overlapped = OVERLAPPED()
        overlapped.hEvent = CreateEvent(None, 1, 0, None)
        while True:
            # XXX - note we could _probably_ just use overlapped IO on the win32file.ReadFile() statement
            # XXX but this tests the COM stuff!
            rc, mask = WaitCommEvent(self.handle, overlapped)
            if rc == 0: # Character already ready!
                SetEvent(overlapped.hEvent)
            rc = WaitForMultipleObjects([overlapped.hEvent, self.eventStop], 0, INFINITE)
            if rc != WAIT_OBJECT_0:
                # Stop requested.  The stop event is only ever set by the user
                # input thread as it exits, so there is nothing left to
                # interrupt; sys.stdout is deliberately left open so that the
                # caller can keep printing after Run() returns.
                break
            # Some input - read and print it
            flags, comstat = ClearCommError(self.handle)
            rc, data = ReadFile(self.handle, comstat.cbInQue, overlapped)
            WaitForSingleObject(overlapped.hEvent, INFINITE)
            sys.stdout.write(data)
            # Flush so that partial lines (e.g. prompts without a newline)
            # show up immediately.
            sys.stdout.flush()

    def Run(self):
        """Start both worker threads and block until both have exited."""
        self.eventStop = CreateEvent(None, 0, 0, None)
        # Start the reader and writer threads.
        user_thread = threading.Thread(target=self._UserInputReaderThread)
        user_thread.start()
        com_thread = threading.Thread(target=self._ComPortThread)
        com_thread.start()
        user_thread.join()
        com_thread.join()

if __name__ == '__main__':
    # Entry point: take the port name from the command line when one is
    # given, otherwise try to auto-detect a modem, then run the terminal.
    print("Serial port terminal demo - press Ctrl+C to exit")
    if len(sys.argv) > 1:
        port_name = sys.argv[1]
    else:
        port_name = FindModem()
        if port_name is None:
            print("No COM port specified, and no modem could be found")
            print("Please re-run this script with the name of a COM port (eg COM3)")
            sys.exit(1)

    terminal = SerialTTY(port_name)
    terminal.Run()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.