Alex Badea 2fe7ddf108 p4runtime lib: wait for MasterArbitrationUpdate to complete (#166)
Running the exercises can race:

	$ make run
	mkdir -p build pcaps logs
	p4c-bm2-ss --p4v 16 --p4runtime-file build/basic.p4info --p4runtime-format text -o build/basic.json basic.p4
	sudo python ../../utils/run_exercise.py -t topology.json -b simple_switch_grpc
	Reading topology file.
	Building mininet topology.
	Switch port mapping:
	s1:  1:h1	2:s2	3:s3
	s2:  1:h2	2:s1	3:s3
	s3:  1:h3	2:s1	3:s2
	Configuring switch s3 using P4Runtime with file s3-runtime.json
	 - Using P4Info file build/basic.p4info...
	 - Connecting to P4Runtime server on 127.0.0.1:50053 (bmv2)...
	 - Setting pipeline config (build/basic.json)...
	Traceback (most recent call last):
	  File "../../utils/run_exercise.py", line 408, in <module>
	    exercise.run_exercise()
	  File "../../utils/run_exercise.py", line 207, in run_exercise
	    self.program_switches()
	  File "../../utils/run_exercise.py", line 312, in program_switches
	    self.program_switch_p4runtime(sw_name, sw_dict)
	  File "../../utils/run_exercise.py", line 284, in program_switch_p4runtime
	    proto_dump_fpath=outfile)
	  File "/home/p4/tutorials/utils/p4runtime_lib/simple_controller.py", line 120, in program_switch
	    bmv2_json_file_path=bmv2_json_fpath)
	  File "/home/p4/tutorials/utils/p4runtime_lib/switch.py", line 85, in SetForwardingPipelineConfig
	    self.client_stub.SetForwardingPipelineConfig(request)
	  File "/usr/local/lib/python2.7/dist-packages/grpc/_interceptor.py", line 141, in __call__
	    return call_future.result()
	  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 272, in result
	    raise self
	grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.PERMISSION_DENIED, Not master)>
	../../utils/Makefile:27: recipe for target 'run' failed
	make: *** [run] Error 1

Fix that by waiting for one response after sending MasterArbitrationUpdate.

Signed-off-by: Alex Badea <alex.badea@keysight.com>
2018-06-06 15:01:33 -04:00

169 lines
6.0 KiB
Python

# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from Queue import Queue
from abc import abstractmethod
from datetime import datetime
import grpc
from p4 import p4runtime_pb2
from p4.tmp import p4config_pb2
MSG_LOG_MAX_LEN = 1024
# List of all active connections
connections = []
def ShutdownAllSwitchConnections():
for c in connections:
c.shutdown()
class SwitchConnection(object):
def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
proto_dump_file=None):
self.name = name
self.address = address
self.device_id = device_id
self.p4info = None
self.channel = grpc.insecure_channel(self.address)
if proto_dump_file is not None:
interceptor = GrpcRequestLogger(proto_dump_file)
self.channel = grpc.intercept_channel(self.channel, interceptor)
self.client_stub = p4runtime_pb2.P4RuntimeStub(self.channel)
self.requests_stream = IterableQueue()
self.stream_msg_resp = self.client_stub.StreamChannel(iter(self.requests_stream))
self.proto_dump_file = proto_dump_file
connections.append(self)
@abstractmethod
def buildDeviceConfig(self, **kwargs):
return p4config_pb2.P4DeviceConfig()
def shutdown(self):
self.requests_stream.close()
self.stream_msg_resp.cancel()
def MasterArbitrationUpdate(self, dry_run=False, **kwargs):
request = p4runtime_pb2.StreamMessageRequest()
request.arbitration.device_id = self.device_id
request.arbitration.election_id.high = 0
request.arbitration.election_id.low = 1
if dry_run:
print "P4Runtime MasterArbitrationUpdate: ", request
else:
self.requests_stream.put(request)
for item in self.stream_msg_resp:
return item # just one
def SetForwardingPipelineConfig(self, p4info, dry_run=False, **kwargs):
device_config = self.buildDeviceConfig(**kwargs)
request = p4runtime_pb2.SetForwardingPipelineConfigRequest()
request.election_id.low = 1
request.device_id = self.device_id
config = request.config
config.p4info.CopyFrom(p4info)
config.p4_device_config = device_config.SerializeToString()
request.action = p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
if dry_run:
print "P4Runtime SetForwardingPipelineConfig:", request
else:
self.client_stub.SetForwardingPipelineConfig(request)
def WriteTableEntry(self, table_entry, dry_run=False):
request = p4runtime_pb2.WriteRequest()
request.device_id = self.device_id
request.election_id.low = 1
update = request.updates.add()
update.type = p4runtime_pb2.Update.INSERT
update.entity.table_entry.CopyFrom(table_entry)
if dry_run:
print "P4Runtime Write:", request
else:
self.client_stub.Write(request)
def ReadTableEntries(self, table_id=None, dry_run=False):
request = p4runtime_pb2.ReadRequest()
request.device_id = self.device_id
entity = request.entities.add()
table_entry = entity.table_entry
if table_id is not None:
table_entry.table_id = table_id
else:
table_entry.table_id = 0
if dry_run:
print "P4Runtime Read:", request
else:
for response in self.client_stub.Read(request):
yield response
def ReadCounters(self, counter_id=None, index=None, dry_run=False):
request = p4runtime_pb2.ReadRequest()
request.device_id = self.device_id
entity = request.entities.add()
counter_entry = entity.counter_entry
if counter_id is not None:
counter_entry.counter_id = counter_id
else:
counter_entry.counter_id = 0
if index is not None:
counter_entry.index.index = index
if dry_run:
print "P4Runtime Read:", request
else:
for response in self.client_stub.Read(request):
yield response
class GrpcRequestLogger(grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor):
"""Implementation of a gRPC interceptor that logs request to a file"""
def __init__(self, log_file):
self.log_file = log_file
with open(self.log_file, 'w') as f:
# Clear content if it exists.
f.write("")
def log_message(self, method_name, body):
with open(self.log_file, 'a') as f:
ts = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
msg = str(body)
f.write("\n[%s] %s\n---\n" % (ts, method_name))
if len(msg) < MSG_LOG_MAX_LEN:
f.write(str(body))
else:
f.write("Message too long (%d bytes)! Skipping log...\n" % len(msg))
f.write('---\n')
def intercept_unary_unary(self, continuation, client_call_details, request):
self.log_message(client_call_details.method, request)
return continuation(client_call_details, request)
def intercept_unary_stream(self, continuation, client_call_details, request):
self.log_message(client_call_details.method, request)
return continuation(client_call_details, request)
class IterableQueue(Queue):
_sentinel = object()
def __iter__(self):
return iter(self.get, self._sentinel)
def close(self):
self.put(self._sentinel)