I am attempting to build a distributed database (all from scratch) using Sanic. So far, things have been going quite well. The only problem I have encountered is the following: I have replicated database up and running. When a client performs a PUT to my database, the local replica has to broadcast that PUT (write) to the other replicas in the cluster. I do this by calling a function called “updateOtherInstances()” after performing the PUT operation in my PUT case code block. When I throw some concurrent requests at my data base, each going to different replicas (this system works in a peer-to-peer fashion, so each replica is a peer with the other replicas in the cluster) my ‘updateOtherInstances()’ function gets a ReadTimeout, even though I’m returning a response from the other end. Here’s my code:
Here’s my ‘updateOtherInstances()’ function
async def updateOtherInstances(dataDict):
updateDict = {}
key = dataDict['key']
value = dataDict['value']
version = dataDict['version']
localVC = vectorClock.returnVC()
localIdNum = identifierNumDict[str(os.environ['SOCKET_ADDRESS'])]
updateDict.update({'key':key, 'value':value, 'causal-metadata':{'vectorclock':localVC, 'version':version, 'localIdNum':localIdNum}})
payload = json.dumps(updateDict)
headers = {'content-type':'application/json'}
localAddress = str(os.environ['SOCKET_ADDRESS'])
for x in range(len(viewListCopy)):
url = "http://" + str(viewListCopy[x]) + "/dataStoreDispersal"
if(str(viewListCopy[x]) != localAddress):
try:
response = requests.put(url, headers=headers, data=payload, timeout=1)
#requests.exceptions.ReadTimeout
except(requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as e:
print("replica " + viewListCopy[x] + " is down or timed out.")
downReplicaAddress = str(viewListCopy[x])
if( downReplicaAddress in viewList ):
print("From " + localAddress + " updateOtherInstances funct: server " + str(viewListCopy[x]) + " is down or timed out.")
viewList.remove(viewListCopy[x])
if( downReplicaAddress in scheduledUpdateDict ):
outgoingMissDict = scheduledUpdateDict[downReplicaAddress]
outgoingMissDict.update({key:{'value':value, 'version':version, 'ID':localIdNum}})
scheduledUpdateDict.update({downReplicaAddress:outgoingMissDict})
if( downReplicaAddress not in scheduledUpdateDict):
scheduledUpdateDict.update( {downReplicaAddress:{key:{'value':value, 'version':version, 'ID':localIdNum}}} )
The above function makes contact with other replicas in this function:
async def put(self, request):
key = request.json['key']
incomingValue = request.json['value']
incomingCM = request.json['causal-metadata']
incomingID = incomingCM['localIdNum']
incomingVersion = incomingCM['version']
incomingVC = incomingCM['vectorclock']
localVC = vectorClock.returnVC()
localAddress = str(os.environ['SOCKET_ADDRESS'])
if(vectorClock.VcComparator(incomingVC, localVC) == ">"):
print("Hello from dataDisperse greater than case!")
data.update({str(key):{'value':incomingValue, 'version':incomingVersion, 'ID':incomingID}})
vectorClock.replaceVC(incomingVC)
if(vectorClock.VcComparator(incomingVC, localVC) == "||"):
if(key in data):
print("Hello from dataDisperse 'key in data' case!!")
localKeyData = data[str(key)]
localVersion = localKeyData['version']
localValue = localKeyData['value']
localKeyId = localKeyData['ID']
if(incomingVersion > localVersion):
data.update({str(key):{'value':incomingValue, 'version':incomingVersion, 'ID':incomingID}})
if(incomingVersion < localVersion):
data.update({str(key):{'value':localValue, 'version':localVersion, 'ID':localKeyId}})
if(incomingVersion == localVersion):
if(incomingID > localKeyId):
data.update({str(key):{'value':incomingValue, 'version':incomingVersion, 'ID':incomingID}})
if(incomingID < localKeyId):
data.update({str(key):{'value':localValue, 'version':localVersion, 'ID':localKeyId}})
if(key not in data):
data.update({str(key):{'value':incomingValue, 'version':incomingVersion, 'ID':incomingID}})
vectorClock.updateVCDelivery(incomingVC)
return response.json({"message":"Update handled."}, status=200)
And then here’s my PUT case (where the client actually makes contact with the replica:
async def put(self, request, key):
print("Hello from dataOps PUT case!")
updateDict = {}
value = request.json['value']
localVC = vectorClock.returnVC()
localAddress = str(os.environ['SOCKET_ADDRESS'])
localID = identifierNumDict[localAddress]
#localVcValue = localVC[localAddress] + 1
if(request.json['causal-metadata'] == ""):
if(str(key) in data):
keyData = data[str(key)]
localVersion = keyData['version']
updatedVersionNum = localVersion + 1
dataDict = {str(key): {'value': str(value), 'version': updatedVersionNum, 'ID':localID}}
data.update(dataDict)
updatedVcValue = localVC[localAddress] + 1
vectorClock.updateVC(localAddress, updatedVcValue)
updateDict.update({'key':str(key), 'value':value, 'version':updatedVersionNum})
if(str(key) not in data):
dataDict = {str(key): {'value': str(value), 'version': 1, 'ID':localID}}
data.update(dataDict)
updatedVcValue = localVC[localAddress] + 1
vectorClock.updateVC(localAddress, updatedVcValue)
updateDict.update({'key':str(key), 'value':value, 'version':1})
await updateOtherInstances(updateDict)
VC = vectorClock.returnVC()
keyData = data[str(key)]
version = keyData['version']
localAddress = str(os.environ['SOCKET_ADDRESS'])
return response.json({"message":"Added successfully", "causal-metadata": {'vectorclock':VC, 'key':key, 'version':version, 'last_addr_contacted':localAddress}}, status=200)
Anyone have any idea what is going on?