txObject ATK 1.3 for Python 
Author Message
 txObject ATK 1.3 for Python

Python Folks,

I have been a long-time user & lover of Python since late 1992. Over the years I
have created a toolkit for developing platform-independent C++ software. I
have also created some useful Python interfaces to these tools (Timers, IO
notification & processing, Threads/Events/Locks (home grown & native), and
TCP Server etc.).

The end of this email has a few Python test programs that demonstrate some of
the Python modules and should pique your interest.

Take a look at http://www.*-*-*.com/

See the READMEs in the tarballs to build txObject ATK 1.3 and the txPyObject ATK
1.3 Python Modules.

Enjoy and Happy Python Programming

Tom

############################################################################
###
##
## Threads, Events, Locks
##
############################################################################
###

import txLock  # locks are not used in this test, but feel free to use them
import txEvent
import txThread

# Shared state for the thread demo: func() loops while EXIT_FLAG is false, and
# EXIT_EVENT is the event object handed to txThread.yield / txEvent.trigger.
EXIT_FLAG = 0
EXIT_EVENT = txEvent.txEvent("EXIT")

def func(*args):
    print 'args from func call : ', args

    while not EXIT_FLAG:
        print 'looping'
        txThread.yield(EXIT_EVENT, 1000)

    return

# Driver for the thread demo: start func on a thread with two sample
# arguments, let it run, then ask it to stop.
print 'Starting thread test'
txThread.start(func, ("abc", 123))

txThread.yield(10000) # wait ten seconds

# Set the flag first so func's loop condition fails the next time it is checked.
EXIT_FLAG = 1

# Trigger the event func passes to txThread.yield (presumably this wakes it
# before its timeout expires -- confirm against the txEvent documentation).
txEvent.trigger(EXIT_EVENT)

############################################################################
###
##
## Timers
##
############################################################################
###

import txTimer
import txEvent
import txThread
import txExtendedTimer # this is not used here, but feel free to use it

# Number of timer firings counted so far by callback().
COUNT = 0

# Event the main script waits on; callback() triggers it once COUNT reaches 10.
EXIT_EVENT = txEvent.txEvent("EXIT")

def callback (*args):
    global COUNT
    print 'timer fired : ', args

    if COUNT < 10:
        COUNT = COUNT + 1
        return txTimer.CONTINUE
    else:
        txEvent.trigger(EXIT_EVENT)

    return txTimer.STOP

# Create the timer with callback, its argument tuple, and a 1000 interval.
timer = txTimer.txTimer(callback, ("abc", 123), 1000) # one second timer

# Wait on EXIT_EVENT, which callback() triggers after its final firing
# (presumably this blocks the main script until then -- confirm).
txThread.yield(EXIT_EVENT)

############################################################################
###
##
## IO notification
##
############################################################################
###

import sys
import txSync
import txEvent
import txThread

# Number of input notifications counted so far by callback().
COUNT = 0

# Event the main script waits on; callback() triggers it once COUNT reaches 10.
EXIT_EVENT = txEvent.txEvent("EXIT")

def callback (*args):
    global COUNT

    read = raw_input("") # get data so to reset stdin fd state

    print 'io fired : ', args, read

    if COUNT < 10:
        COUNT = COUNT + 1
    else:
        txEvent.trigger(EXIT_EVENT)

    return

id = txSync.registerIO(callback,("abc",
123),sys.stdin.fileno(),txSync.IORead)

print '\nEntire something in the console window and then press turn...
repeat'

txThread.yield(EXIT_EVENT)

############################################################################
###
##
## TCP Server
##
############################################################################
###

import sys
import signal
import txEvent
import txThread
import txTCPServer

if len(sys.argv) != 5:
    print 'usage : local_machine local_port remote_machine remote_port'
    sys.exit(1)

local_machine = sys.argv[1]
local_port = eval(sys.argv[2])
remote_machine = sys.argv[3]
remote_port = eval(sys.argv[4])

EXIT_FLAG = 0
EXIT_EVENT = txEvent.txEvent("EXIT")

TCP_SERVER = txTCPServer.txTCPServer((local_machine, local_port))

def shutdown(*args):
    global EXIT_FLAG
    global EXIT_EVENT

    print 'Control-C'

    EXIT_FLAG = 1
    txEvent.trigger(EXIT_EVENT)
    return

def recvFrom(*args):
    while not EXIT_FLAG:
        messages = txTCPServer.recvFrom(TCP_SERVER) # get as many as
possible

        # num_of_mgs = 3 # block until 3 msgs have been received
        # messages = txTCPServer.recvFrom(TCP_SERVER, num_of_msgs)

        print 'Messages : ', messages
    return

def contactEstablish(*args):
    global EXIT_FLAG
    global EXIT_EVENT

    remote_machine, remote_port = args[0]

    print 'PEER UP : ', remote_machine, remote_port

    for i in range(1, 100):
        #
        # also see TCP SERVER multicast & broadcast
        #

        txTCPServer.sendTo( \
            TCP_SERVER, 'HELLO from ' + `local_port`, \
            (remote_machine, remote_port))

        txThread.yield(1000)

    EXIT_FLAG = 1
    txEvent.trigger(EXIT_EVENT)
    return

def contactLost(*args):
    global EXIT_FLAG
    global EXIT_EVENT

    remote_machine, remote_port = args[0]

    print 'PEER DOWN : ', remote_machine, remote_port
    return

# Route SIGINT and SIGTERM to shutdown(), which sets EXIT_FLAG and triggers
# EXIT_EVENT.
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

# Register the peer up / peer down callbacks on the server.
txTCPServer.registerContactEstablished(TCP_SERVER, contactEstablish)
txTCPServer.registerContactLost(TCP_SERVER, contactLost)

txThread.start(recvFrom) # start func on a thread to receive messages

# Contact the remote (machine, port) given on the command line; presumably
# contactEstablish is invoked once the peer is reachable -- confirm against
# the txTCPServer documentation.
txTCPServer.contact(TCP_SERVER, (remote_machine, remote_port))

# Keep the main script alive until shutdown() or contactEstablish() sets
# EXIT_FLAG.
while not EXIT_FLAG:
    # just a dummy loop for debugging. you can also just wait on an event
    txThread.yield(1000)



Fri, 04 Jul 2003 09:14:15 GMT  
 
 [ 1 post ] 

 Relevant Pages 

1. txObject ATK 1.3.5 Python released

2. txObject ATK 1.3.9

3. ANNOUNCE: txObject ATK 1.3.8

4. ANNOUNCE: mkWidgets 1.3, mkHttpd 1.0, mkTulip 1.0, mkZiplib 1.0, mkGeneric 1.3

5. ANN: Python Spread Module 1.3 Released

6. ANN: empy 1.3 -- Embed Python in template text as markup

7. ANN: Python Spread Module 1.3 Released

8. Refreshall from Python 1.3

9. Python with threads on Caldera 1.3

10. Trouble making Python 1.5 in OpenLinux 1.3

11. Python 1.3 to 1.4

12. Python/Grail Errors on Upgrading from 1.3 to 1.4

 

 
Powered by phpBB® Forum Software