Real time Python via TCP (Part II)

TL;DR
In this post you’ll learn how to connect a Tornado TCP server to a Redis
Pub/Sub channel and send notifications to subscribed clients. All code is on GitHub.

Roadmap

  1. In the first post we learned how to build a simple echo TCP server in Tornado.
  2. In this post we’ll take a closer look at how to connect Tornado to the Redis Pub/Sub mechanism in order to deliver data updates in real time.
  3. The final part will cover the implementation of a simple protocol that controls client-server interaction.

Redis Pub/Sub connection

Let’s add some more useful logic to our echo server. Say we have a service that produces updates and we want to push these updates to the clients as soon as we get them. This service might be updating the scores of a sporting event, stock prices, news, weather forecasts, etc.

The core of the problem is how to connect that arbitrary service to our Tornado server. Each particular case might have a more appropriate solution, but there are some general approaches. One of them is to use a message broker: a program that transfers messages between other programs. It’s convenient because the service delivering the data doesn’t need to be integrated with the Tornado server, so the two stay decoupled.
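To make the decoupling concrete, here is a minimal in-memory sketch of the publish/subscribe idea. It is not Redis and not part of the project code; InMemoryBroker and its methods are hypothetical names used only for illustration.

```python
from collections import defaultdict


class InMemoryBroker:
    """Toy stand-in for a message broker; illustrates the idea only."""

    def __init__(self):
        # Maps a channel name to the list of subscriber callbacks.
        self._subscribers = defaultdict(list)

    def subscribe(self, channel, callback):
        self._subscribers[channel].append(callback)

    def publish(self, channel, message):
        # The publisher never learns who the subscribers are.
        callbacks = self._subscribers[channel]
        for callback in callbacks:
            callback(message)
        return len(callbacks)


broker = InMemoryBroker()
received = []
broker.subscribe("updates", received.append)           # plays the TCP server
delivered = broker.publish("updates", b"score: 2-1")   # plays the data service
```

The publishing side only knows the channel name, which is what lets the data service and the TCP server evolve independently.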

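We’ll use Redis with its Pub/Sub mechanism here, as it’s one of the more robust and easy-to-use solutions available. In the resulting architecture, the update service publishes messages to a Redis channel, the Tornado server subscribes to that channel, and the server forwards each message to its connected TCP clients.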

Server side

Client connection wrapper

In order to handle updates for each client, we’ll need a special class that stores the socket connection as well as the Redis update callback. Let’s call it ClientConnection:

class ClientConnection(object):
    message_separator = b'\r\n'
    def __init__(self, stream):
        self._stream = stream

Here we store our message separator as we did in the server class previously. Each new instance is initialized with the Tornado stream object, so the connection is kept independently of the server class.

    @gen.coroutine
    def run(self):
        while True:
            try:
                request = yield self._stream.read_until(
                    self.message_separator)
                request_body = request.rstrip(self.message_separator)
            except StreamClosedError:
                self._stream.close(exc_info=True)
                return
            else:
                response_body = request_body
                response = response_body + self.message_separator
                try:
                    yield self._stream.write(response)
                except StreamClosedError:
                    self._stream.close(exc_info=True)
                    return

The logic from the server’s handle_stream method described in the previous post has moved to the run method of this class. Everything is basically the same; we just use our own instance of the Tornado stream, self._stream, to communicate via the socket.
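The separator-based framing that read_until handles for us can be sketched without Tornado. The helper below is hypothetical and only shows the idea: bytes arrive in arbitrary chunks, and a message is complete once the separator has been seen.

```python
SEPARATOR = b"\r\n"


def split_messages(buffer):
    """Return (complete_messages, remaining_bytes) for a receive buffer."""
    *messages, remainder = buffer.split(SEPARATOR)
    return messages, remainder


# The first chunk contains one full message and the start of another.
messages, remainder = split_messages(b"hello\r\nwor")
# The rest of the second message arrives later.
more_messages, leftover = split_messages(remainder + b"ld\r\n")

# rstrip treats its argument as a set of bytes, not as an exact suffix.
stripped = b"line\n\r\n".rstrip(SEPARATOR)
```

One detail worth knowing: rstrip(b'\r\n') removes any trailing run of \r and \n bytes, so a payload that itself ends with newline characters would lose them. For the simple text messages used here that is harmless.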

    @gen.coroutine
    def update(self, message):
        response = message + self.message_separator
        try:
            yield self._stream.write(response)
        except StreamClosedError:
            self._stream.close(exc_info=True)
            return

Now there’s the update method, which the server calls for every message that arrives from Redis. It receives the message as bytes and, for now, just passes it on to the client. To connect to Redis, we’ll use aioredis, an asynchronous Redis client. It is designed to work with Python’s own asyncio library but, thanks to Tornado’s smart IOLoop, we can get it working here as well.

TCP Server

So, let’s see what the server itself looks like now, shall we?

class Server(TCPServer):
    def __init__(self, *args, **kwargs):
        super(Server, self).__init__(*args, **kwargs)
        self._redis = None
        self._channel = None
        self._connections = []

We added a few new attributes in the initializer. The first is self._redis, which stores the Redis client instance. The second is self._channel, which stores the Redis Pub/Sub channel object we are subscribed to. The third is self._connections, the list of currently connected clients.

subscribe is a new interface method for setting up the Redis client connection and subscribing to the given channel. Here is what it looks like:

    @asyncio.coroutine
    def subscribe(self, channel_name):
        self._redis = yield aioredis.create_redis(('localhost', 6379))
        channels = yield self._redis.subscribe(channel_name)
        print('Subscribed to "{}" Redis channel.'.format(channel_name))
        self._channel = channels[0]
        yield self.listen_redis()

Here we use the aioredis factory method to get our Redis client instance, based on the host and port of the running Redis server. Then we subscribe to the specified channel and get back a list of channel objects. It doesn’t matter that we passed a single channel name; this method always returns a list. In this case we know it has only one element, so we take it and call the listen_redis method:

    @gen.coroutine
    def listen_redis(self):
        while True:
            yield self._channel.wait_message()
            try:
                msg = yield self._channel.get(encoding='utf-8')
            except aioredis.errors.ChannelClosedError:
                print("Redis channel was closed. Stopped listening.")
                return
            if msg:
                body_utf8 = msg.encode('utf-8')
                yield [con.update(body_utf8) for con in self._connections]
                print("Message in {}: {}".format(self._channel.name, msg))

The whole function is an infinite loop that waits for messages on the channel via the asynchronous wait_message method, reads each message, and then calls the update method of every stored client connection. Note the yield [con.update(body_utf8) for con in self._connections] line. In a Tornado coroutine you can yield a list of futures to run them concurrently and wait for all of them, whereas in asyncio you would reach for asyncio.gather. As we’re going to drive this code through Tornado’s IOLoop, we make use of that here.
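For comparison, here is a sketch of the same fan-out written with plain asyncio and asyncio.gather. FakeConnection is a hypothetical stand-in for ClientConnection that records messages instead of writing them to a socket.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for ClientConnection; records instead of writing."""

    def __init__(self):
        self.received = []

    async def update(self, message):
        await asyncio.sleep(0)  # pretend to wait on a socket write
        self.received.append(message)


async def broadcast(connections, message):
    # asyncio counterpart of `yield [con.update(msg) for con in connections]`
    await asyncio.gather(*(con.update(message) for con in connections))


connections = [FakeConnection() for _ in range(3)]
asyncio.run(broadcast(connections, b"follow the white rabbit"))
```

In both versions a slow client delays only the completion of the broadcast as a whole, not the start of the writes to the other clients.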

And the new handler method is handle_stream:

    @gen.coroutine
    def handle_stream(self, stream, address):
        connection = ClientConnection(stream)
        self._connections.append(connection)
        yield connection.run()
        self._connections.remove(connection)

This is where we instantiate ClientConnection and store it in our list of connections. We then launch its run coroutine and remove the connection from the list after it exits.
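In the code above, run catches StreamClosedError itself, so the removal is reached on a normal disconnect. If run could raise anything else, a try/finally would keep the list free of stale entries. A minimal asyncio sketch of that variation follows; handle, failing_run and the connection name are hypothetical.

```python
import asyncio

connections = []


async def handle(connection_name, run):
    """Register a connection for the lifetime of its `run` coroutine."""
    connections.append(connection_name)
    try:
        await run()
    finally:
        # Executed even if `run` raises, so no stale entry is left behind.
        connections.remove(connection_name)


async def failing_run():
    raise ConnectionResetError("peer went away")


async def demo():
    try:
        await handle("client-1", failing_run)
    except ConnectionResetError:
        pass
    return list(connections)


leftover = asyncio.run(demo())
```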

Launcher code has also changed:

if __name__ == '__main__':
    AsyncIOMainLoop().install()
    server = Server()
    server.listen(5567)
    IOLoop.current().spawn_callback(server.subscribe, 'updates')
    print('Starting the server...')
    asyncio.get_event_loop().run_forever()
    print('Server has shut down.')

In order to make use of aioredis, we need Tornado’s IOLoop to run on top of the asyncio loop. This is exactly what the AsyncIOMainLoop().install() call does. Then we instantiate the server as before and launch the server.subscribe coroutine by passing it to the IOLoop’s spawn_callback method. Simply calling the coroutine function is not enough: it has to be submitted to the event loop, which is why we use spawn_callback here. Finally, we start the asyncio loop with its native run command: asyncio.get_event_loop().run_forever(). This is the suggested way of combining the asyncio and Tornado loops.
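The point about submitting a coroutine to the loop can be seen in isolation. The sketch below uses native asyncio coroutines rather than the decorators from this post, and the subscribe function in it is a hypothetical stub, not the server method.

```python
import asyncio

log = []


async def subscribe(channel_name):
    log.append("subscribed to " + channel_name)


async def main():
    coro = subscribe("updates")          # only creates a coroutine object
    ran_after_call = list(log)           # nothing has run yet
    task = asyncio.ensure_future(coro)   # hand the coroutine to the event loop
    await task
    return ran_after_call, list(log)


before, after = asyncio.run(main())
```

Here before stays empty because calling subscribe does not execute its body; the body runs only once the loop has scheduled it.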

Client

The new client should be able to read from and write to the socket simultaneously, since updates may arrive at arbitrary times. So we’ll need separate methods for reading and writing. The writing routine will wait for input and send anything submitted by the user, whereas the reading routine will just consume everything that arrives on the incoming end of the socket.

class Client(TCPClient):
    msg_separator = b'\r\n'
    def __init__(self):
        super(Client, self).__init__()
        self._stream = None
        self._executor = ThreadPoolExecutor(1)

Some complexity comes in here in the form of the ThreadPoolExecutor, which we’re going to use to handle user input asynchronously.

    @gen.coroutine
    def run(self, host, port):
        self._stream = yield self.connect(host, port)
        yield [self.read(), self.write()]

The run method capitalizes on the fact that we can launch a list of coroutines and wait for all of them to complete. That’s exactly what is needed in our case: we want to read and write simultaneously.

Now let’s see our IO functions:

    @gen.coroutine
    def read(self):
        while True:
            try:
                data = yield self._stream.read_until(self.msg_separator)
                body = data.rstrip(self.msg_separator)
                print(body)
            except StreamClosedError:
                self.disconnect()
                return

read: an infinite socket reader. It strips the trailing separator from the data and prints the result. Not much to dig into.

    @gen.coroutine
    def write(self):
        while True:
            try:
                data = yield self._executor.submit(input)
                encoded_data = data.encode('utf8')
                encoded_data += self.msg_separator
                yield self._stream.write(encoded_data)
            except StreamClosedError:
                self.disconnect()
                return

write: an infinite socket writer. self._executor.submit runs the blocking input call in a worker thread and returns a future, so waiting for keyboard input doesn’t block the reading loop. Everything else is similar to what we had before.
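The effect of moving a blocking call into a worker thread can be sketched with plain asyncio. blocking_input below is a hypothetical stand-in for input() that simply sleeps, so the example runs without a keyboard.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

events = []


def blocking_input():
    """Hypothetical stand-in for input(): blocks its thread for a moment."""
    time.sleep(0.2)
    return "message 1"


async def reader():
    # Keeps making progress while the worker thread is blocked.
    for _ in range(3):
        await asyncio.sleep(0.01)
        events.append("read tick")


async def writer(executor):
    loop = asyncio.get_running_loop()
    # The blocking call runs in the executor; the event loop stays free.
    line = await loop.run_in_executor(executor, blocking_input)
    events.append("typed: " + line)


async def main():
    with ThreadPoolExecutor(1) as executor:
        await asyncio.gather(reader(), writer(executor))


asyncio.run(main())
```

Had blocking_input been called directly inside the writer coroutine, the reader could not have run until it returned.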

Running the code

First things first. If we want to use Redis Pub/Sub, we need Redis to be running. The process differs slightly between operating systems, but the general procedure is simple and takes only a few commands. It’s well explained on the official page.

Here’s a cheat sheet of the commands we’ll use:

Running the server:

$ redis-server

To be able to send messages to the Redis instance, we’ll also need to launch the Redis client:

$ redis-cli

After connecting to the Redis instance, you’ll be able to send messages to any Pub/Sub channel you like:

127.0.0.1:6379> PUBLISH updates "follow the white rabbit"

The command sends the message “follow the white rabbit” to the “updates” channel. Everyone subscribed to it will receive the message. The beauty of Pub/Sub is that you don’t need to know who will consume the messages you’re publishing. Anyone can subscribe to the channel and start listening for updates, which reduces coupling overall.

Launch the server with the same command as before:

$ python real_time/server.py
Starting the server...
Subscribed to "updates" Redis channel.

Let’s now connect to the server:

$ python real_time/client.py
Connecting to the server socket...

After sending some messages from the Redis CLI with the PUBLISH command, we’ll see something like this in the server’s terminal output:

Message in b'updates': follow the white rabbit
Message in b'updates': follow the white rabbit
Message in b'updates': follow the white rabbit

The client console might look something like this, provided you were also typing some messages to the server in the meantime:

Connecting to the server socket...
message 1
b'message 1'
b'follow the white rabbit'
b'message 2'
b'follow the white rabbit'
b'follow the white rabbit'
message 3
b'message 3'

Conclusion

That’s it for today, folks. I hope this was useful to somebody. I came up with this solution while working on a back-end system for an iOS app back in the day, where we desperately wanted to deliver updates as fast as possible without having the app poll the server every now and then. The mobile app had client socket code similar to what I’ve described here and was successfully consuming the pushed data over TCP.

In the next post I’ll talk about how to implement a simple protocol to control the behavior of such a connection. Stay tuned and… don’t brawl, stay calm.

Serge Mosin

https://www.databrawl.com/author/svmosingmail-com/

Pythonista, Data/Code lover, Alpine skier and gym hitter.