Run a different WebSocket framework instead of the built-in WebSocket support

Hi All,
I know that Sanic has built-in WebSocket support. However, that implementation is not compatible with my application’s requirements. I want to run a different WebSocket framework (https://github.com/Kitware/wslink) besides FastAPI (in which I can use a REST API). I wonder whether Sanic can support this?

Thanks

You can, but you will need to do the work yourself. You will have to create a subclass of the HTTP protocol object, and hook into that. There’s an argument on the Sanic instance object to pass a custom protocol object. With this pattern you can make Sanic into any kind of server.

Thanks @ahopkins ,
Do you mean I should do something like this code?

from wsproto import ConnectionType, WSConnection
from wsproto.events import AcceptConnection, CloseConnection, Message, Ping, Pong
from sanic import Sanic
from sanic.response import json
from sanic.server import HttpProtocol


class CustomHttpProtocol(HttpProtocol):
    def connection_made(self, transport):
        super().connection_made(transport)
        self.ws_connection = WSConnection(ConnectionType.SERVER)
        self.ws_connection.initiate_connection()
        self.transport.write(self.ws_connection.bytes_to_send())

    def data_received(self, data):
        # Parse incoming bytes into wsproto events
        self.ws_connection.receive_bytes(data)
        for event in self.ws_connection.events():
            if isinstance(event, AcceptConnection):
                # Do something when the connection is accepted
                print('WebSocket connection accepted')
            elif isinstance(event, CloseConnection):
                # Do something when the connection is closed
                print('WebSocket connection closed')
            elif isinstance(event, Message):
                # Handle incoming messages
                message = event.data.decode('utf-8')
                print(f'Received message: {message}')
                # Echo the message back to the client
                response_event = Message(data=event.data)
                self.ws_connection.send(response_event)
            elif isinstance(event, Ping):
                # Respond to ping
                pong_event = Pong(event.data)
                self.ws_connection.send(pong_event)
            elif isinstance(event, Pong):
                # Do something when receiving a pong
                print('Received pong')

        # Handle incoming HTTP data
        super().data_received(data)

        # Check for any outgoing data to send back to the client
        while True:
            outgoing_bytes = self.ws_connection.bytes_to_send()
            if not outgoing_bytes:
                break
            self.transport.write(outgoing_bytes)


app = Sanic(__name__)
app.config['WEBSOCKET_MAX_SIZE'] = 2 ** 25  # 32 MB
app.config['WEBSOCKET_MAX_QUEUE'] = 32

@app.route('/')
async def hello_world(request):
    return json({'message': 'Hello, world!'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, protocol=CustomHttpProtocol)

Yeah, exactly. You will need to intercept the upgrade request and pass it off to your WebSocket library without letting the rest of Sanic handle it. Then you’ll need to interface with the I/O. Would be happy to see what you come up with.
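As a rough illustration of what "intercepting the upgrade request" involves, the sketch below inspects the raw bytes of an HTTP/1.1 request and decides whether it is a WebSocket upgrade. It uses only the standard library and is not Sanic's own parsing; the helper names parse_headers and is_websocket_upgrade, and the sample requests, are hypothetical.

```python
from typing import Dict


def parse_headers(raw_request: bytes) -> Dict[str, str]:
    """Parse the header block of a raw HTTP/1.1 request (hypothetical helper).

    Returns a dict keyed by lowercase header name.
    """
    head, _, _ = raw_request.partition(b"\r\n\r\n")
    lines = head.decode("latin-1").split("\r\n")
    headers: Dict[str, str] = {}
    for line in lines[1:]:  # lines[0] is the request line, e.g. "GET / HTTP/1.1"
        name, sep, value = line.partition(":")
        if sep:
            headers[name.strip().lower()] = value.strip()
    return headers


def is_websocket_upgrade(headers: Dict[str, str]) -> bool:
    """Return True when the headers ask to switch to the WebSocket protocol."""
    # "Connection" may carry several comma-separated tokens.
    tokens = {t.strip().lower() for t in headers.get("connection", "").split(",")}
    return "upgrade" in tokens and headers.get("upgrade", "").lower() == "websocket"


# Sample requests, made up for illustration.
upgrade_request = (
    b"GET /feed HTTP/1.1\r\n"
    b"Host: localhost:8000\r\n"
    b"Upgrade: websocket\r\n"
    b"Connection: keep-alive, Upgrade\r\n"
    b"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
    b"Sec-WebSocket-Version: 13\r\n"
    b"\r\n"
)
plain_request = b"GET / HTTP/1.1\r\nHost: localhost:8000\r\n\r\n"

print(is_websocket_upgrade(parse_headers(upgrade_request)))
print(is_websocket_upgrade(parse_headers(plain_request)))
```

In a custom protocol, a check of this kind would decide whether incoming bytes go to the external WebSocket library or to the normal HTTP handling.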

Thank you for your quick response. But when I test the server by connecting my client to the server socket at ws://localhost:8000, the server says it cannot handle the protocol “ws”. It seems it accepts HTTP but not WebSocket connections.

Sorry, but it seems the code above cannot serve both WebSocket and REST API traffic, right?

I’m sorry, I hope this doesn’t come out wrong, but I cannot help you. Using a different module for handling WebSockets is not supported. It is absolutely possible, since you control every aspect of the request I/O with that object. However, I do not have the bandwidth to help you debug it.

Going off on this tangent is unfortunately something you will have to develop without my help.

Hi @ahopkins ,
Sorry for asking you to help debug my code. I will handle all the debugging and running of the code myself. I just want your confirmation: is it possible to run both an external WebSocket framework and Sanic’s internal REST API in the same code base?

Yes, absolutely.

I would actually take a look at the websocket protocol we use. But, the older version. LMK if you have trouble locating it on GitHub.

I say the older version because the current one is a little lower level, where we handle all the I/O. The older version pushes that off to the websockets lib, which I think is more what you want.

I’m sorry, when you start to decode the headers, you detect the upgrade and then push the handling and handshakes off to the lib.
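To make the handshake step concrete, here is a minimal sketch of what a library does once an upgrade has been detected: it derives the Sec-WebSocket-Accept value from the client's key, as defined in RFC 6455, and writes a 101 response. The function names accept_key and build_101_response are hypothetical, and a real library validates much more of the request than this.

```python
import base64
import hashlib
from typing import Optional

# Fixed GUID that RFC 6455 appends to the client's key before hashing.
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def accept_key(sec_websocket_key: str) -> str:
    """Compute the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key."""
    digest = hashlib.sha1((sec_websocket_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


def build_101_response(sec_websocket_key: str, subprotocol: Optional[str] = None) -> bytes:
    """Build the raw "101 Switching Protocols" response (hypothetical helper)."""
    lines = [
        "HTTP/1.1 101 Switching Protocols",
        "Upgrade: websocket",
        "Connection: Upgrade",
        f"Sec-WebSocket-Accept: {accept_key(sec_websocket_key)}",
    ]
    if subprotocol:
        lines.append(f"Sec-WebSocket-Protocol: {subprotocol}")
    # A blank line terminates the header block.
    return ("\r\n".join(lines) + "\r\n\r\n").encode("ascii")


# Key taken from the example handshake in RFC 6455.
print(accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # -> s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

After this response is written to the transport, every further byte on the connection belongs to the WebSocket library rather than to the HTTP parser.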

I’m willing to help at a conceptual level. I just meant that since it is such a unique use case, I cannot take the time to code that for you. I hope you understand.

Hi @ahopkins ,
Thanks for the information. I am digging deeper into the source code. I think I have found an old version that integrated the old websockets lib into Sanic, in commit 55a5ab4be154c35e51dced58ea12e9ca21786821. From that commit, I copied out your implementation:

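# Imports this excerpt relies on, as I understand them from the old
# sanic/websocket.py (they require an older websockets release, e.g. 8.x)
from typing import Any, Awaitable, Callable, Dict, List, MutableMapping, Optional, Union

from httptools import HttpParserUpgrade
from websockets import InvalidHandshake, WebSocketCommonProtocol, handshake

from sanic.exceptions import InvalidUsage
from sanic.server import HttpProtocol

ASIMessage = MutableMapping[str, Any]


class WebSocketProtocol(HttpProtocol):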
    def __init__(
        self,
        *args,
        websocket_timeout=10,
        websocket_max_size=None,
        websocket_max_queue=None,
        websocket_read_limit=2 ** 16,
        websocket_write_limit=2 ** 16,
        websocket_ping_interval=20,
        websocket_ping_timeout=20,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.websocket = None
        # self.app = None
        self.websocket_timeout = websocket_timeout
        self.websocket_max_size = websocket_max_size
        self.websocket_max_queue = websocket_max_queue
        self.websocket_read_limit = websocket_read_limit
        self.websocket_write_limit = websocket_write_limit
        self.websocket_ping_interval = websocket_ping_interval
        self.websocket_ping_timeout = websocket_ping_timeout

    # timeouts make no sense for websocket routes
    def request_timeout_callback(self):
        if self.websocket is None:
            super().request_timeout_callback()

    def response_timeout_callback(self):
        if self.websocket is None:
            super().response_timeout_callback()

    def keep_alive_timeout_callback(self):
        if self.websocket is None:
            super().keep_alive_timeout_callback()

    def connection_lost(self, exc):
        if self.websocket is not None:
            self.websocket.connection_lost(exc)
        super().connection_lost(exc)

    def data_received(self, data):
        if self.websocket is not None:
            # pass the data to the websocket protocol
            self.websocket.data_received(data)
        else:
            try:
                super().data_received(data)
            except HttpParserUpgrade:
                # this is okay, it just indicates we've got an upgrade request
                pass

    def write_response(self, response):
        if self.websocket is not None:
            # websocket requests do not write a response
            self.transport.close()
        else:
            super().write_response(response)

    async def websocket_handshake(self, request, subprotocols=None):
        # let the websockets package do the handshake with the client
        headers = {}

        try:
            key = handshake.check_request(request.headers)
            handshake.build_response(headers, key)
        except InvalidHandshake:
            raise InvalidUsage("Invalid websocket request")

        subprotocol = None
        if subprotocols and "Sec-Websocket-Protocol" in request.headers:
            # select a subprotocol
            client_subprotocols = [
                p.strip()
                for p in request.headers["Sec-Websocket-Protocol"].split(",")
            ]
            for p in client_subprotocols:
                if p in subprotocols:
                    subprotocol = p
                    headers["Sec-Websocket-Protocol"] = subprotocol
                    break

        # write the 101 response back to the client
        rv = b"HTTP/1.1 101 Switching Protocols\r\n"
        for k, v in headers.items():
            rv += k.encode("utf-8") + b": " + v.encode("utf-8") + b"\r\n"
        rv += b"\r\n"
        request.transport.write(rv)

        # hook up the websocket protocol
        self.websocket = WebSocketCommonProtocol(
            close_timeout=self.websocket_timeout,
            max_size=self.websocket_max_size,
            max_queue=self.websocket_max_queue,
            read_limit=self.websocket_read_limit,
            write_limit=self.websocket_write_limit,
            ping_interval=self.websocket_ping_interval,
            ping_timeout=self.websocket_ping_timeout,
        )
        # Following two lines are required for websockets 8.x
        self.websocket.is_client = False
        self.websocket.side = "server"
        self.websocket.subprotocol = subprotocol
        self.websocket.connection_made(request.transport)
        self.websocket.connection_open()
        return self.websocket


class WebSocketConnection:

    # TODO
    # - Implement ping/pong

    def __init__(
        self,
        send: Callable[[ASIMessage], Awaitable[None]],
        receive: Callable[[], Awaitable[ASIMessage]],
        subprotocols: Optional[List[str]] = None,
    ) -> None:
        self._send = send
        self._receive = receive
        self.subprotocols = subprotocols or []

    async def send(self, data: Union[str, bytes], *args, **kwargs) -> None:
        message: Dict[str, Union[str, bytes]] = {"type": "websocket.send"}

        if isinstance(data, bytes):
            message.update({"bytes": data})
        else:
            message.update({"text": str(data)})

        await self._send(message)

    async def recv(self, *args, **kwargs) -> Optional[str]:
        message = await self._receive()

        if message["type"] == "websocket.receive":
            return message["text"]
        elif message["type"] == "websocket.disconnect":
            pass

        return None

    receive = recv

    async def accept(self) -> None:
        await self._send(
            {
                "type": "websocket.accept",
                "subprotocol": ",".join(
                    [subprotocol for subprotocol in self.subprotocols]
                ),
            }
        )

    async def close(self) -> None:
        pass

And it works well. However, one issue comes up. In the current Sanic, in sanic-org/sanic/sanic/app.py:

async def _websocket_handler(
        self, handler, request, *args, subprotocols=None, **kwargs
    ):
        if self.asgi:
            ws = request.transport.get_websocket_connection()
            await ws.accept(subprotocols)
        else:
            protocol = request.transport.get_protocol()
            ws = await protocol.websocket_handshake(request, subprotocols)

        # schedule the application handler
        # its future is kept in self.websocket_tasks in case it
        # needs to be cancelled due to the server being stopped
        fut = ensure_future(handler(request, ws, *args, **kwargs))
        self.websocket_tasks.add(fut)
        cancelled = False
        try:
            await fut
        except (CancelledError, ConnectionClosed):
            cancelled = True
        except Exception as e:
            self.error_handler.log(request, e)
        finally:
            self.websocket_tasks.remove(fut)
            if cancelled:
                ws.end_connection(1000)
            else:
                await ws.close()

Sanic lets the user supply the underlying implementation of ws, but afterwards it still takes control of that object. For example, it calls methods on it such as ws.end_connection(1000) and await ws.close(). If the ws implementation (which comes from a third-party WebSocket framework) does not have those methods, the code above throws an exception like the one below, even though the connection itself works and the server can ping-pong with the client:

[2023-04-25 09:28:46 +0700] [74489] [ERROR] Exception occurred while handling uri: 'ws://127.0.0.1:8000/feed'
Traceback (most recent call last):
  File "/home/itadmin/.cache/pypoetry/virtualenvs/testing-sanic-TJ8Glc6H-py3.9/lib/python3.9/site-packages/sanic/app.py", line 974, in handle_request
    response = await response
  File "/home/itadmin/.cache/pypoetry/virtualenvs/testing-sanic-TJ8Glc6H-py3.9/lib/python3.9/site-packages/sanic/app.py", line 1059, in _websocket_handler
    ws.end_connection(1000)
AttributeError: 'WebSocketCommonProtocol' object has no attribute 'end_connection'

Can you please give me more conceptual advice on how to control Sanic at a deeper level than the approach of overriding HttpProtocol?

Thanks

It should only be hitting that when it’s a websocket protocol. If you implement your own, that should not be an issue.
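One possible way to act on this, offered only as a sketch: have your own handshake return an object that provides the two methods `_websocket_handler` calls in the code quoted earlier, end_connection(code) and close(). The adapter below wraps a third-party socket for that purpose. ThirdPartySocket and its shutdown method are hypothetical stand-ins, not part of wslink, websockets, or Sanic.

```python
import asyncio
from typing import Optional


class ThirdPartySocket:
    """Hypothetical stand-in for a WebSocket object from another library."""

    def __init__(self) -> None:
        self.closed_with: Optional[int] = None

    async def shutdown(self, code: int) -> None:
        # Pretend to send a close frame with the given status code.
        self.closed_with = code


class SanicCompatibleSocket:
    """Adapter exposing end_connection() and close() on top of another socket."""

    def __init__(self, inner: ThirdPartySocket) -> None:
        self._inner = inner
        self._pending_close: Optional[asyncio.Task] = None

    def __getattr__(self, name: str):
        # Forward anything else (send, recv, ...) to the wrapped object.
        return getattr(self._inner, name)

    def end_connection(self, code: int = 1000) -> None:
        # Called synchronously from a running event loop, so the actual
        # close is scheduled as a task instead of being awaited here.
        loop = asyncio.get_running_loop()
        self._pending_close = loop.create_task(self.close(code))

    async def close(self, code: int = 1000) -> None:
        await self._inner.shutdown(code)


async def demo() -> Optional[int]:
    inner = ThirdPartySocket()
    ws = SanicCompatibleSocket(inner)
    ws.end_connection(1000)      # what the handler does on cancellation
    await ws._pending_close      # wait for the scheduled close to finish
    return inner.closed_with


print(asyncio.run(demo()))
```

A custom websocket_handshake on the protocol subclass would then return SanicCompatibleSocket(...) rather than the bare third-party object, so the cleanup calls in `_websocket_handler` no longer raise AttributeError.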