Why am I getting requests.exceptions.ReadTimeout error?

I am attempting to build a distributed database (all from scratch) using Sanic. So far, things have been going quite well. The only problem I have encountered is the following: I have replicated database up and running. When a client performs a PUT to my database, the local replica has to broadcast that PUT (write) to the other replicas in the cluster. I do this by calling a function called “updateOtherInstances()” after performing the PUT operation in my PUT case code block. When I throw some concurrent requests at my data base, each going to different replicas (this system works in a peer-to-peer fashion, so each replica is a peer with the other replicas in the cluster) my ‘updateOtherInstances()’ function gets a ReadTimeout, even though I’m returning a response from the other end. Here’s my code:

Here’s my ‘updateOtherInstances()’ function

async def updateOtherInstances(dataDict):

    updateDict            = {}
    key                   = dataDict['key']
    value                 = dataDict['value']
    version               = dataDict['version']
    localVC               = vectorClock.returnVC()
    localIdNum            = identifierNumDict[str(os.environ['SOCKET_ADDRESS'])]
    updateDict.update({'key':key, 'value':value, 'causal-metadata':{'vectorclock':localVC, 'version':version, 'localIdNum':localIdNum}})

    
    payload      = json.dumps(updateDict)
    headers      = {'content-type':'application/json'}
    localAddress = str(os.environ['SOCKET_ADDRESS'])

    for x in range(len(viewListCopy)):

        url = "http://" + str(viewListCopy[x]) + "/dataStoreDispersal" 

        if(str(viewListCopy[x]) != localAddress):

            try:
                response = requests.put(url, headers=headers, data=payload, timeout=1)
                #requests.exceptions.ReadTimeout
            except(requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as e:
                print("replica " + viewListCopy[x] + " is down or timed out.")
                downReplicaAddress = str(viewListCopy[x])

                if( downReplicaAddress in viewList ):

                    print("From " + localAddress + " updateOtherInstances funct: server " + str(viewListCopy[x]) + " is down or timed out.")
                    viewList.remove(viewListCopy[x])

                if( downReplicaAddress in scheduledUpdateDict ):

                    outgoingMissDict = scheduledUpdateDict[downReplicaAddress]
                    outgoingMissDict.update({key:{'value':value, 'version':version, 'ID':localIdNum}})
                    scheduledUpdateDict.update({downReplicaAddress:outgoingMissDict})

                if( downReplicaAddress not in scheduledUpdateDict):

                    scheduledUpdateDict.update( {downReplicaAddress:{key:{'value':value, 'version':version, 'ID':localIdNum}}} )   

The above function makes contact with other replicas in this function:

async def put(self, request):
        
        key             = request.json['key']
        incomingValue   = request.json['value']

        incomingCM      = request.json['causal-metadata']
        incomingID      = incomingCM['localIdNum']
        incomingVersion = incomingCM['version']
        incomingVC      = incomingCM['vectorclock']
        localVC         = vectorClock.returnVC()
        localAddress    = str(os.environ['SOCKET_ADDRESS'])
        

        if(vectorClock.VcComparator(incomingVC, localVC) == ">"):

            print("Hello from dataDisperse greater than case!")

            data.update({str(key):{'value':incomingValue, 'version':incomingVersion, 'ID':incomingID}})
            vectorClock.replaceVC(incomingVC)

        if(vectorClock.VcComparator(incomingVC, localVC) == "||"):

            if(key in data):

                print("Hello from dataDisperse 'key in data' case!!")

                localKeyData    = data[str(key)]
                localVersion    = localKeyData['version']
                localValue      = localKeyData['value']
                localKeyId      = localKeyData['ID']

                if(incomingVersion > localVersion):
                    data.update({str(key):{'value':incomingValue, 'version':incomingVersion, 'ID':incomingID}}) 
                if(incomingVersion < localVersion):
                    data.update({str(key):{'value':localValue, 'version':localVersion, 'ID':localKeyId}})
                if(incomingVersion == localVersion):
                    if(incomingID > localKeyId):
                        data.update({str(key):{'value':incomingValue, 'version':incomingVersion, 'ID':incomingID}})
                    if(incomingID < localKeyId):
                        data.update({str(key):{'value':localValue, 'version':localVersion, 'ID':localKeyId}})
                
            if(key not in data):

                data.update({str(key):{'value':incomingValue, 'version':incomingVersion, 'ID':incomingID}})

            vectorClock.updateVCDelivery(incomingVC)
        
            return response.json({"message":"Update handled."}, status=200)

And then here’s my PUT case (where the client actually makes contact with the replica:

async def put(self, request, key):

        print("Hello from dataOps PUT case!")
        
        updateDict   = {}
        value        = request.json['value']
        localVC      = vectorClock.returnVC()
        localAddress = str(os.environ['SOCKET_ADDRESS'])
        localID      = identifierNumDict[localAddress]
        #localVcValue = localVC[localAddress] + 1

        if(request.json['causal-metadata'] == ""):


            if(str(key) in data):

                keyData           = data[str(key)]
                localVersion      = keyData['version']
                updatedVersionNum = localVersion + 1
                dataDict          = {str(key): {'value': str(value), 'version': updatedVersionNum, 'ID':localID}}
                data.update(dataDict)
                updatedVcValue    = localVC[localAddress] + 1
                vectorClock.updateVC(localAddress, updatedVcValue)
                updateDict.update({'key':str(key), 'value':value, 'version':updatedVersionNum})

            
            if(str(key) not in data):
                
                dataDict       = {str(key): {'value': str(value), 'version': 1, 'ID':localID}}
                data.update(dataDict)
                updatedVcValue = localVC[localAddress] + 1
                vectorClock.updateVC(localAddress, updatedVcValue)
                updateDict.update({'key':str(key), 'value':value, 'version':1})

            
            await updateOtherInstances(updateDict)

            VC           = vectorClock.returnVC()
            keyData      = data[str(key)]
            version      = keyData['version']
            localAddress = str(os.environ['SOCKET_ADDRESS'])  

            return response.json({"message":"Added successfully", "causal-metadata": {'vectorclock':VC, 'key':key, 'version':version, 'last_addr_contacted':localAddress}}, status=200)

Anyone have any idea what is going on?

One obvious question is why are you using requests, which is a blocking library? You should be using something that will be more conducive to allowing async concurrency.

ahopkins, I love you man.

:heart:

I’d suggest httpx, which currently is already installed alongside Sanic.

Hello again…I am trying to utilize httpx, but I get the following error:

AttributeError: module ‘httpx’ has no attribute ‘AsyncClient’

when I make a request using httpx thusly:

async with httpx.AsyncClient() as client:
response = await client.put(url, headers=headers, data=payload, timeout=1)

The only relevant thing I can find online seems to be this:

https://github.com/huge-success/sanic/issues/1786

and it seems to be saying I may need to “bump up” the version of httpx, but I’m sorry for being such a newb, but I don’t know how to do this?

Any advice to offer, thank you so much?

What version of httpx are your running?

python -c "import httpx; print(httpx.__version__)"

Also, what version of Sanic are you running?

It looks like I am using httpx 0.9.3. As for Sanic, I’m not sure? I am using Sanic as base image though, so that I don’t have to deal with the issue of installing uvloop every time I run the docker container?

It make sense that I’m getting the error though, because httpx 0.9.3 doesn’t have the AsyncClient module, or so I gather from the that post I linked to? But how do I upgrade the httpx version?

If I am using Sanic as base image, does that mean I am running Sanic 19.12?

Okay I figured it out. Got rid of the ReadTimeout error by using httpx. Found a github post that mentioned that httpx 0.9.3 uses the “Client()” module instead of the “AsyncClient()” module, but they both provide the same async functionality. I just changed “AsyncClient()” to “Client()”, and voila, async requests, and ReadTimeout error gone.

Thanks for the steer again, ahopkins.