Websockets - Documentation and best practices

Hi!

So I’m writing a real-life chat app as part of a bigger app, and I’d love to help make Sanic’s WebSockets documentation better (IMHO it can be significantly improved, eg there’s this gist by Adam https://gist.github.com/ahopkins/5b6d380560d8e9d49e25281ff964ed81 and a small section in the documentation- but this info might be not quite enough for a commercial product).
I read through the documentation and discussion, and here are two issues for which I am confused:

  1. While I understand that Adam said that Sanic is not opinionated(Overrride websocket events like Open, error, message,close using sanic - #5 by fyndpurav) in the context of WebSockets, I’m still not sure if my current approach is viable/principled.

In particular, since Websocket is bi-directional,
For my app I want to send messages to the client and get “acks” from the client, eg acknowledge receiving a particular message.
So for every message, there’s a unique ack id attached, and the client sends the server back an ack with the message’s specific ack if received.
For simplicity (see my confusion regarding pub/sub in my second question) and although it’s ugly, for now, let’s just use a single worker and a dict of connected_websockets.

This is how i register a websocket from the client:

@app.websocket("/register_client_websocket")
async def register_websocket(request, websocket):
    print("going to wait for user's info")
    message = await websocket.recv()
    user_data = json.loads(message)
    user_id = user_data['user_id']
    async with connected_websockets_lock:
        connected_websockets[user_id] = websocket
    print(f'Registered user {user_id}')
    try:
        request.app.add_task(get_acks(user_id=user_id, websocket=websocket))
        await websocket.keepalive_ping()
        print(f'connection with user {user_id} was closed')
        
        if user_id in connected_websockets and connected_websockets[user_id] == websocket:
            del connected_websockets[user_id]
        await websocket.close()
    finally:
        pass

From my understanding of Websockets documentation, when the register_websocket handler finishes, the WebSocket closes automatically. This is the reason I have this line:
await websocket.keepalive_ping()

Also notice that I’m calling the task get_acks so that someone will handle the incoming acks from the client. Here’s what it looks like:

async def get_acks(user_id, websocket):
    '''
    Handle acks for receiving acks from client by deleting ack id from the dict acks
    :param user_id:
    :param websocket:
    :return:
    '''
    try:
        while True:
            message = await websocket.recv()
            try:
                print(f'acks got the message {message}')
                ack = json.loads(message)
                if ack['message_type'] != 'ack':
                    continue 
                ack_id = ack['ack_id']
                async with acks_lock:
                    acks[user_id].remove(ack_id)
            except:
                pass
    except:
        return

I’m not sure if my approach is principled. Something feels off - and there’s no real-life example or tutorial on this matter (I’d love to write it with you guys).

  1. Pub/Sub:

As I read through Adam’s gist example, I realized that I don’t fully understand what will happen when using pub/sub.

-To my understanding, each worker process may get any client (in the gist example by Adam, a few workers might have the same channel with different clients for example, right?)

For simplicity let’s look at a private chat app, eg assume channel_name = <user_id> and each such “channel” may have a single user in it or no users in it.
You recommend using pub/sub, in that paradigm a publisher might not know if there are any subs. So it seems like the only way for me to know if the client got the message through a Websocket (otherwise I send the message via a different push mechanism, in my case FCM) is to use some kind of acks mechanism, where I’ll have to handle Redis synchronization(since one worker might remove some ack from the same key[user_id] while another adds some ack to the same key). And yet, this feels wrong. Your advice?

Yes, this is correct.

Let’s do it.

In what? I am not sure sending an ack for every message is necessary. The socket has already been open and checking that is what ping/pong are for. But, if you want the guarantee I see no harm in it.

This is the whole point. If you are running single server than a lot of that is not necessary and it becomes much more simple. You can store all of your clients in a single set() for example in memory. But, once you have multiple servers, there is no guarantee that multiple clients will be on the same worker. Therefore the only way to make sure that they can communicate is to use something to synchronize your workers. I choose pubsub because it is relatively simple to implement and very efficient.

In that example, a channel is meant to be a shared resource that the clients want to communicate on. Think of it as a chatroom perhaps.

That seems very complicated for a simple chat. You send the message and just can assume it was received. IF you need an ack back that EVERY single client in the channel received it, then you would want to store that probably per sending client. Before publishing you would need to get a guarantee of exactly which clients are on the channel and then wait for them to come back with an ack; potentially resending messages after some time period if no ack is received. HOWEVER, what happens if a client disconnects? Now you need to notify each message state of the disconnection, or conversely check upon non-ack if the client still exists. Yikes. This sounds like a very complicated protocol. Certainly doable, and I suppose for some use case it might be necessary.

“You send the message and just can assume it was received.”

“what happens if a client disconnects?”

The socket has already been open

The main point in my question (that I still don’t understand)
is this:
In pub/sub, I send the message by publishing it on a channel.
I’m not an expert on pub/sub so I might have missed something, but to my understanding, the pubs are decoupled from the subs, and that is why on principle I think that I cannot know directly from the pub/sub API who is connected or if a websocket even exists- I might be wrong of course.

So…why would I assume it was received?
What are the alternatives to my approach?
(what that I’m describing is a very common use-case,eg any app which supports private chat).

In my app, I’m using websockets when the client is currently online, and FCM when the client is not connected right now. Most of the time the client will not be online, and therefore no websocket.

Yikes. This sounds like a very complicated protocol.

Indeed complicated.

The protocol I’m actually using right now is simple though:
if there’s no ack from the receiver for that particular message after some time(say 1 second) I just use FCM (firebase cloud messaging) where the message will be eventually sent even if the phone is off for a couple of days.

That’s simple enough, but feels wrong. I gather from your answer that you feel that as well, but I still don’t know/understand the alternative.

You can store all of your clients in a single set() for example in memory.

You mean for a single worker(=process) right? But I want to run a few workers on a single server.

Let’s do it.

As soon as I’ll wrap my head around this, I’ll get started :slight_smile:

Lots going on here - your application should manage pubs and subs appropriately for your use case. In many cases, that means that the application is subscribed to the same channel it publishes on. This is usually the case for chat “rooms.” If you take every message that delivered via subscription to the app and send it onward to every client, you can assume it was received unless you get an error.

You cannot assume it was received if the client is not connected, which is a different problem. In that case, your pub/sub mechanism needs to support journaliing, and you would need to pull messages from the journaled subscription channel in a certain range like forever, last day, last connection. This is one of the problems that Apache Kafka was built to solve.

Well, you don’t really need to ack a message. That’s inherently built into the websocket protocol - if the message you’re sending to the client with ws.send() fails, an exception is raised. Almost certainly you can lose the client acks with no penalty, and just check to make sure an exception is not raised - which you should be doing already because you’d want to deregister the client from the server in that case.

I’d suggest using a KV store, which depending on the pubsub backend you’re using, may be a feature there. If you want all clients to be aware of each other, you will need some way to share that among workers. If you’re not using one that does that, consider this plugin: https://github.com/ashleysommer/sanic-synchro-ctx

If you don’t care about the clients being aware of each other, then so long as the server is subscribed to the same channel for each worker, you should be good to go.

Finally, Sanic’s websocket implementation is built on websockets and I highly recommend doing their full tutorial to help wrap your head around websockets in python: Getting started - websockets 10.1 documentation

3 Likes