This is the endpoint (one component of the live lock)
@app.route(’/kvs/view-change’, methods=[‘PUT’])
async def view_change(request):
global state
view_str = request.json['view']
print("Start broadcast view change: " + str(state.view))
await state.broadcast_view(view_str)
print("Completed broadcast view change: " + str(state.view))
shards = []
print("started kvs key count")
for address in state.view:
print(address)
if address == state.address:
print("self" + address)
shards.append({"address":state.address, "key-count":len(state.storage)})
else:
print("others" + address)
r = requests.get(f'http://{address}/kvs/key-count')
shards.append({"address":address, "key-count":r.json()['key-count']})
return response.json({"message": "View change successful","shards":shards}, status=200)
And this is the sending function (the other component of the live lock)
async def broadcast_view(self, view, multi_threaded = False):
addresses = set(sorted(view.split(',')) + self.view)
# First send node-change to all nodes.
for address in addresses:
State.send_node_change(address, view)
# Second send key-migration to all nodes.
if not multi_threaded:
for address in addresses:
State.send_key_migration(address, view)
else:
threads = []
for address in addresses:
threads.append(threading.Thread(target=State.send_key_migration, args=(address, view)))
threads[-1].start()
for thread in threads:
thread.join()
@staticmethod
def send_node_change(address, view):
print("HELLO FROM send_node_change()!!!!")
requests.put(f'http://{address}/kvs/node-change', json = {"view":view}, timeout=6, headers = {"Content-Type": "application/json"})
@staticmethod
def send_key_migration(address, view):
print("HELLO FROM send_key_migration()!!!!")
requests.put(f'http://{address}/kvs/key-migration', json = {"view":view}, timeout=6, headers = {"Content-Type": "application/json"})