Tasks tutorial

The tasks module provides a simple alternative to writing multi-threaded applications, using Python's generator functions. In this tutorial, we create a simple network service that can handle connections from several clients simultaneously.


Introduction

You will often find that your application must perform several tasks at once. For example, when the user is downloading something from the network, they will still want to be able to use your application (if only to click the Cancel button).

There are four ways to do this:

  • Use a new thread for each task.
  • Use callbacks.
  • Use a recursive mainloop.
  • Use this tasks module.

Using threads causes a number of problems. Some builds of PyGTK/Python don't support them; they can easily introduce race conditions, often leading to many subtle bugs; and they require lots of resources (you probably wouldn't want 10,000 threads running at once). In particular, two threads can run at exactly the same time (perhaps on different processors), so you have to be really careful that they don't both try to update the same variable at the same time. This requires lots of messy locking, which is hard to get right.

Callbacks work within a single thread. For example, you open a dialog box and then tell the system to call one function if it's closed, and another if the user clicks OK, etc. The function that opened the box then returns, and the system calls one of the given callback functions later. Callbacks only execute one at a time, so you don't have to worry about race conditions. However, they are often very awkward to program with, because you have to save state somewhere and then pass it to the functions when they're called.

A recursive mainloop only works with nested tasks (you can create a sub-task, but the main task can't continue until the sub-task has finished). We use these for, e.g., rox.alert() boxes, since you don't normally want to do anything else until the box is closed, but they are not appropriate for long-running jobs.

Tasks use Python's generator feature to provide a more pleasant interface to callbacks.
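To make the contrast concrete, here is a minimal sketch, independent of ROX-Lib, of a two-step exchange (ask for a name, then a password) written in both styles. The names LoginCallbacks, login_task and run are hypothetical, and the generator yields plain strings where a real task would yield Blocker objects. It is written for Python 3.

```python
# Callback style: state must be stored somewhere between calls.
class LoginCallbacks:
    def __init__(self):
        self.name = None
        self.result = None

    def on_name(self, name):
        self.name = name              # saved for the next callback

    def on_password(self, password):
        self.result = (self.name, password)


# Generator style: state lives in ordinary local variables.
def login_task(replies, results):
    yield 'name?'                     # suspend until a reply is available
    name = replies.pop(0)
    yield 'password?'
    password = replies.pop(0)
    results.append((name, password))


def run(task):
    """Toy driver: resume the generator until it finishes."""
    prompts = []
    for prompt in task:
        prompts.append(prompt)
    return prompts


callbacks = LoginCallbacks()
callbacks.on_name('alice')
callbacks.on_password('secret')

results = []
prompts = run(login_task(['alice', 'secret'], results))
```

In the generator version, name is still in scope when the password arrives, so nothing has to be stashed on an object between the two steps.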

Getting started

We'll begin with this simple program that displays a log window:

#!/usr/bin/env python
from __future__ import generators
 
import socket
 
import findrox; findrox.version(1, 9, 17)
import rox
from rox import tasks, g
 
class LogWindow(rox.Window):
        def __init__(self):
                rox.Window.__init__(self)
                self.set_title('Network logger')
 
                swin = g.ScrolledWindow()
                swin.set_policy(g.POLICY_AUTOMATIC, g.POLICY_ALWAYS)
                swin.set_shadow_type(g.SHADOW_IN)
                swin.set_border_width(4)
                self.add(swin)
 
                self.text = g.TextView()
                self.text.set_editable(False)
                self.text.set_cursor_visible(False)
                swin.add(self.text)
 
                self.set_default_size(400, 300)
                swin.show_all()
 
        def append(self, message):
                self.text.get_buffer().insert_at_cursor(message)
 
log = LogWindow()
log.show()
log.append('Started\n')
 
rox.mainloop()

There's nothing surprising here; the window is just there so we can see what's going on. When something interesting happens, we call log.append() to log a message. If you run the program now, it will just open a window with a single message saying that it has started.

The main server task

We're going to allow other programs to connect to our logger and log messages with it. There can be any number of these clients sending us messages at the same time. We'll start by creating a Task whose job is to accept new connections. Put this just before rox.mainloop():

tasks.Task(server_task())

Define server_task somewhere above this, as follows:

def server_task():
        server_port = 8123
 
        server_socket = socket.socket()
        server_socket.bind(('', server_port))
        server_socket.listen(5)
        log.append('Listening on port %d...\n' % server_port)
        while True:
                blocker = tasks.InputBlocker(server_socket)
                yield blocker
                client_socket, client_addr = server_socket.accept()
                log.append("Got connection from %s\n" % str(client_addr))
                client_socket.close()

This function contains the yield keyword, which makes it special: when the function is called (with server_task()) it doesn't actually execute any of the code. Instead, it immediately returns a Python iterator. Calling this iterator's next method runs the function until it reaches a yield. The tasks module calls this later (once the mainloop has started) to actually start the task running.
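This behaviour can be checked without ROX-Lib. The following sketch uses a hypothetical demo_task generator and the built-in next() function (Python 2.6 and later, including Python 3), which is equivalent to calling the iterator's next method in older versions.

```python
def demo_task(events):
    events.append('started')      # runs only when the task is first resumed
    yield 'first blocker'
    events.append('resumed')
    yield 'second blocker'
    events.append('finished')


events = []
task = demo_task(events)          # nothing has run yet: events is still empty

first = next(task)                # runs up to the first yield
second = next(task)               # continues from there to the second yield
```

A further next(task) would run the remaining code and then raise StopIteration, which is how the caller learns that the task has finished.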

If you're not familiar with sockets code, read the Python socket module documentation. Basically, we create a new socket and set it to listen for incoming connections on port 8123 (any highish number will do). Then, we loop forever (while True). Inside the loop, we want to wait for something to happen on our new socket. We create an InputBlocker for the socket object. A Blocker waits until something happens (in this case, until the socket needs our attention).
We then yield the Blocker object back to the tasks module. This suspends our function mid-execution. The tasks module then allows the rest of the program to continue running as normal. When a client attempts to connect to our server, blocker is triggered. The tasks module then resumes the suspended function, which handles the connection request (logging the event and then, rather rudely, closing the new client's socket).
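To illustrate the suspend-and-resume cycle, here is a toy scheduler built on select. It is only a sketch of the idea, not ROX-Lib's implementation: run_tasks and reader_task are hypothetical names, the tasks yield the socket itself rather than a Blocker, and there is no GUI mainloop. It is written for Python 3.

```python
import select
import socket


def run_tasks(tasks):
    """Toy scheduler: each task is a generator that yields the socket
    it wants to read from next."""
    waiting = {}                              # socket -> suspended task
    for task in tasks:
        try:
            waiting[next(task)] = task        # run to the first yield
        except StopIteration:
            pass
    while waiting:
        readable, _, _ = select.select(list(waiting), [], [])
        for sock in readable:
            task = waiting.pop(sock)
            try:
                waiting[next(task)] = task    # resume until the next yield
            except StopIteration:
                pass                          # task finished


def reader_task(sock, received):
    while True:
        yield sock                            # suspend until sock is readable
        data = sock.recv(1000)
        if not data:                          # empty read: peer closed
            break
        received.append(data)
    sock.close()


# Demonstration with a connected pair of local sockets.
a, b = socket.socketpair()
received = []
a.sendall(b'hello\n')
a.close()                                     # the reader will see end-of-file
run_tasks([reader_task(b, received)])
```

The scheduler only ever runs one task at a time, and a task keeps control until it yields, which is the property the tasks module relies on.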

You can test the new server using telnet:

$ telnet localhost 8123
Trying 127.0.0.1...
Connected to localhost (127.0.0.1).
Escape character is '^]'.
Connection closed by foreign host.

Each time you do this, a message should be logged in the window.

Handling the clients

Rather than closing the new client's socket immediately, we will create a new Task for each one. Replace the close line with this:

tasks.Task(client_task(client_socket, client_addr))

Again, client_task is a generator function (a function containing a yield statement):

def client_task(client_socket, client_addr):
        while True:
                blocker = tasks.InputBlocker(client_socket)
                yield blocker
                data = client_socket.recv(1000)
                if data:
                        log.append(data)
                else:
                        log.append('Lost connection from %s\n' % str(client_addr))
                        client_socket.close()
                        break

This is similar to before. We create a new InputBlocker for the client's socket and yield it, causing the task to be suspended until the client sends us some data. Then, we read whatever we got (up to 1000 bytes at a time) and add it to the log. Getting nothing back from the recv is the kernel's way of telling us the client closed the connection. In that case, we close our end too and finish the task, logging the event.
You should now find that you can have any number of simultaneous telnet connections to the server without problems.
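If you would rather test from a script than from several telnet sessions, something like the following sketch could be used. It assumes the server from this tutorial is listening on localhost port 8123; send_from_clients is a hypothetical helper name, and the code is written for Python 3.

```python
import socket


def send_from_clients(messages, host='localhost', port=8123):
    """Open one connection per message, keep them all open at once,
    send each message, then close every connection."""
    clients = [socket.create_connection((host, port)) for _ in messages]
    try:
        for client, message in zip(clients, messages):
            client.sendall(message.encode('utf-8'))
    finally:
        for client in clients:
            client.close()
```

Calling send_from_clients(['first client\n', 'second client\n']) while the logger is running should produce two "Got connection" lines, the two messages, and two "Lost connection" lines in the log window, though the exact order may vary.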

Points to note

  • If we'd done this using threads, we would have had to be very careful about locking; otherwise, if two clients sent us data at the same time (or if we were handling a redraw event from the X server while one client sent us data), we would likely crash. Here, only one piece of Python code can run at a time. The code has complete control of the program until it uses yield.
  • There wasn't much state to store here, but if we wanted to have a dialogue with each client (e.g., asking for a user name and password), a callback-based solution would quickly have become complicated.
  • Each Blocker can only be triggered once. This is why we create a new InputBlocker each time round the loop.
    In this example the blocker does not need a name: you could yield tasks.InputBlocker(client_socket) directly, and keeping a reference to a used blocker gains nothing, since it cannot be triggered again. However, you will often want to use the objects after the yield, either to find out which one was triggered (when yielding multiple blockers) or to extract some information about the event.

  • We could have used a single task, and had it yield multiple blockers at once. However, using several tasks is easier.

For more information, see the tasks API documentation.