跳转到正文
莫尔索随笔
返回

Ryu SDN 源码解析与实验:OpenFlow 交换机、流量监控与 REST API

预计 2 分钟

第一时间捕获有价值的信号

深入剖析 Ryu SDN 控制器 SimpleSwitch13 源码,通过实验理解 OpenFlow 交换机、流量监控和 RESTful API 功能的实现细节。

核心内容

SimpleMonitor13,SimpleSwitchRest13 两个类都继承自 SimpleSwitch13,分别扩展了流量监控和北向 RESTful API 功能,这篇以实验+源码剖析的方式简单总结下。

Switching Hub

  • 传统交换机
    • 学习连接到传统交换机的主机的mac地址,并把其存在mac地址表中
    • 对于已经记录下来的mac地址,若是收到送往该mac地址的数据包时,就往对应的端口进行转发
    • 对于mac地址表中没有的数据包,则进行flooding
  • OpenFlow 交换机实现传统交换机功能
    • 对接收到的数据包向指定的端口转发
    • 把接收到的数据包发送给控制器(Packet-In)
    • 把从控制器接收到的数据包转发到指定的端口(Packet-Out)
# simple_switch_13.py  实现一个带MAC地址学习功能的二层交换机
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import ether_types


class SimpleSwitch13(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super(SimpleSwitch13, self).__init__(*args, **kwargs)
        # MAC地址表的定义
        self.mac_to_port = {}
    # 控制器向OpenFlow交换机发送Features Request消息,请求OpenFlow交换机上传自己的详细参数。
    # OpenFlow交换机收到请求后,向控制器发送Features Reply消息,详细汇报自身参数,
    # 包括支持的buffer、流表数以及Actions等
    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        # ev.msg是用来存对应事件的OpenFlow消息类别实体,这里指的是OFPSwitchFeatures
        # datapath是和控制器通信的实体单元
        datapath = ev.msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        # install table-miss flow entry
        #
        # We specify NO BUFFER to max_len of the output action due to
        # OVS bug. At this moment, if we specify a lesser number, e.g.,
        # 128, OVS will send Packet-In with invalid buffer_id and
        # truncated packet data. In that case, we cannot output packets
        # correctly.  The bug has been fixed in OVS v2.1.0.
# OpenFlow1.3版本为处理table miss事件专门引入的条目。并规定每一个flow table必须要支持table-miss flow entry去处理table miss情况。
# table-miss flow entry具备最低的优先级(0);必须至少能够支持使用CONTROLLER保留端口发送包,使用Clear-Actions指令丢包;
# table-miss flow entry和其他flow entry具有相同的特性:默认不存在,控制器可以随时添加或丢弃该条目,也可以到期;如果使用CONTROLLER保留端口发生数据包,Packet-In发送原因必须标明table-miss。

        match = parser.OFPMatch()
        # OFPActionOutput() :使用一个packet_out 消息去指定你想从交换机的哪个端口发送出数据包。在该应用中,按照标准指定通过CONTROLLER保留端口发生,所以选择了OFPP_CONTROLLER端口。第二个参数OFPCML_NO_BUFFER,指明:消息中必须包含完整的包,而不会被缓冲。
        actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
                                          ofproto.OFPCML_NO_BUFFER)]
        # 下发Table-miss Flow Entry优先级为0的流表,即如果报文都没有匹配的话,则匹配该报文,并将其发送给控制器
        self.add_flow(datapath, 0, match, actions)

    # add_flow方法用来发送Flow Mod消息
    def add_flow(self, datapath, priority, match, actions, buffer_id=None):
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        # Instruction是1.0版本中Actions的拓展,Instruction主要负责将流表转发到其他Table,或者进行其他转发操作,# OFPIT_APPLY_ACTIONS: 立即应用actions操作到交换机
        inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
                                             actions)]
        if buffer_id:
            mod = parser.OFPFlowMod(datapath=datapath, buffer_id=buffer_id,
                                    priority=priority, match=match,
                                    instructions=inst)
        else:
            mod = parser.OFPFlowMod(datapath=datapath, priority=priority,
                                    match=match, instructions=inst)
        datapath.send_msg(mod)

    # 处理Packet-in数据包,交换机状态为正常状态
    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def _packet_in_handler(self, ev):
        # If you hit this you might want to increase
        # the "miss_send_length" of your switch
        if ev.msg.msg_len < ev.msg.total_len:
            self.logger.debug("packet truncated: only %s of %s bytes",
                              ev.msg.msg_len, ev.msg.total_len)
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        # match 用来存储数据包的Meta元数据
        in_port = msg.match['in_port']
        # data 接受数据包本身的信息
        pkt = packet.Packet(msg.data)
        eth = pkt.get_protocols(ethernet.ethernet)[0]

        if eth.ethertype == ether_types.ETH_TYPE_LLDP:
            # ignore lldp packet
            return
        dst = eth.dst
        src = eth.src

        # 更新mac地址表,每台交换机独立的dpid对应一个表
        dpid = format(datapath.id, "d").zfill(16)
        self.mac_to_port.setdefault(dpid, {})

        self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)

        # learn a mac address to avoid FLOOD next time.
        self.mac_to_port[dpid][src] = in_port

        if dst in self.mac_to_port[dpid]:
            out_port = self.mac_to_port[dpid][dst]
        else:
            out_port = ofproto.OFPP_FLOOD

        actions = [parser.OFPActionOutput(out_port)]

        # install a flow to avoid packet_in next time
        if out_port != ofproto.OFPP_FLOOD:
            match = parser.OFPMatch(in_port=in_port, eth_dst=dst, eth_src=src)
            # verify if we have a valid buffer_id, if yes avoid to send both
            # flow_mod & packet_out
            if msg.buffer_id != ofproto.OFP_NO_BUFFER:
                self.add_flow(datapath, 1, match, actions, msg.buffer_id)
                return
            else:
                self.add_flow(datapath, 1, match, actions)
        data = None
        if msg.buffer_id == ofproto.OFP_NO_BUFFER:
            data = msg.data

        out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
                                  in_port=in_port, actions=actions, data=data)
        datapath.send_msg(out)

实验测试

mininet端执行

$ sudo mn --topo single,3 --mac --switch ovsk,protocols=OpenFlow13 --controller remote
*** Creating network
*** Adding controller
Unable to contact the remote controller at 127.0.0.1:6653
Unable to contact the remote controller at 127.0.0.1:6633
Setting remote controller to 127.0.0.1:6653
*** Adding hosts:
h1 h2 h3
*** Adding switches:
s1
*** Adding links:
(h1, s1) (h2, s1) (h3, s1)
*** Configuring hosts
h1 h2 h3
*** Starting controller
c0
*** Starting 1 switches
s1 ...
*** Starting CLI:

ovs端执行

$ sudo ovs-vsctl show
0f735673-a50c-4059-95bf-6ee3fd50c39c
    Bridge "s1"
        Controller "tcp:127.0.0.1:6653"
        Controller "ptcp:6654"
        fail_mode: secure
        Port "s1"
            Interface "s1"
                type: internal
        Port "s1-eth2"
            Interface "s1-eth2"
        Port "s1-eth1"
            Interface "s1-eth1"
        Port "s1-eth3"
            Interface "s1-eth3"
    ovs_version: "2.9.5"

因为此时的交换机中没有任何可匹配的转发策略,在mininet端执行pingall,结果全丢包

mininet> pingall
*** Ping: testing ping reachability
h1 -> X X
h2 -> X X
h3 -> X X
*** Results: 100% dropped (0/6 received)
再开一个终端,开启ryu

执行ryu-manager --verbose ryu.app.simple_switch_13

  • 在ovs端查看流表,Table-miss Flow Entry 已加入OVS

    $ sudo ovs-ofctl -O OpenFlow13 dump-flows s1
    cookie=0x0, duration=28.824s, table=0, n_packets=0, n_bytes=0, priority=0 actions=CONTROLLER:65535
    
  • mininet端

    mininet> h1 ping -c1 h2
    PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
    64 bytes from 10.0.0.2: icmp_seq=1 ttl=64 time=5.18 ms
    
    --- 10.0.0.2 ping statistics ---
    1 packets transmitted, 1 received, 0% packet loss, time 0ms
    rtt min/avg/max/mdev = 5.183/5.183/5.183/0.000 ms
    
  • ovs端再次查看OVS流表

    $ sudo ovs-ofctl -O OpenFlow13 dump-flows s1
    cookie=0x0, duration=190.004s, table=0, n_packets=2, n_bytes=140, priority=1,in_port="s1-eth2",dl_dst=00:00:00:00:00:01 actions=output:"s1-eth1"
    cookie=0x0, duration=190.003s, table=0, n_packets=1, n_bytes=42, priority=1,in_port="s1-eth1",dl_dst=00:00:00:00:00:02 actions=output:"s1-eth2"
    cookie=0x0, duration=648.460s, table=0, n_packets=6, n_bytes=392, priority=0 actions=CONTROLLER:65535
  • ryu端查看

    EVENT ofp_event->SimpleSwitch13 EventOFPPacketIn
    packet in 1 00:00:00:00:00:01 ff:ff:ff:ff:ff:ff 1
    EVENT ofp_event->SimpleSwitch13 EventOFPPacketIn
    packet in 1 00:00:00:00:00:02 00:00:00:00:00:01 2
    EVENT ofp_event->SimpleSwitch13 EventOFPPacketIn
    packet in 1 00:00:00:00:00:01 00:00:00:00:00:02 1

Traffic Monitor

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub


class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

    def __init__(self, *args, **kwargs):
        super(SimpleMonitor13, self).__init__(*args, **kwargs)
        # 增添datapaths字典,存储交换机id
        self.datapaths = {}
        # 引入hub.spawn()函数启动一个新线程,输入为一个新的方法_monitor
        self.monitor_thread = hub.spawn(self._monitor)
    # 创建一个EventOFPStateChange监听事件,监听MAIN_DISPATCHER,DEAD_DISPATCHER两种情况。
    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        # 监听如果为MAIN_DISPATCHER,并且datapath.id不在datapath列表中,则证明是新加入的交换机
        if ev.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.logger.debug('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        # 如果为DEAD_DISPATCHER,并且datapath.id在datapath列表中,则证明是掉线的交换机
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.debug('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]
    # _monitor方法,循环不断向datapath列表中的交换机发送Flow状态请求,和Port状态请求
    def _monitor(self):
        while True:
            for dp in self.datapaths.values():
                self._request_stats(dp)
            hub.sleep(10)

    def _request_stats(self, datapath):
        self.logger.debug('send stats request: %016x', datapath.id)
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)
    # 刚刚发送了请求,现在监听其回复
    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         '
                         'in-port  eth-dst           '
                         'out-port packets  bytes')
        self.logger.info('---------------- '
                         '-------- ----------------- '
                         '-------- -------- --------')
        # 其中sorted ... lambda语法,指元组的排列顺序按照先in_port再eth_dst
        for stat in sorted([flow for flow in body if flow.priority == 1],
                           key=lambda flow: (flow.match['in_port'],
                                             flow.match['eth_dst'])):
            self.logger.info('%016x %8x %17s %8x %8d %8d',
                             ev.msg.datapath.id,
                             stat.match['in_port'], stat.match['eth_dst'],
                             stat.instructions[0].actions[0].port,
                             stat.packet_count, stat.byte_count)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         port     '
                         'rx-pkts  rx-bytes rx-error '
                         'tx-pkts  tx-bytes tx-error')
        self.logger.info('---------------- -------- '
                         '-------- -------- -------- '
                         '-------- -------- --------')
        for stat in sorted(body, key=attrgetter('port_no')):
            self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                             ev.msg.datapath.id, stat.port_no,
                             stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                             stat.tx_packets, stat.tx_bytes, stat.tx_errors)

实验测试

  • mininet端 执行sudo mn --topo single,3 --mac --switch ovsk,protocols=OpenFlow13 --controller remote
  • ryu端 执行ryu-manager --verbose ryu.app.simple_monitor_13
  • mininet端 执行h1 ping -c1 h2,可观察到实时的流量检测

REST Linkage

RYU提供了一个类似WSGI的web服务器功能。借助这个功能,我们可以创建一个REST API,基于创建的REST API,可以快速的将RYU系统与其他系统或者是浏览器相连接。

import json

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.app.wsgi import ControllerBase
from ryu.app.wsgi import Response
from ryu.app.wsgi import route
from ryu.app.wsgi import WSGIApplication
from ryu.lib import dpid as dpid_lib

simple_switch_instance_name = 'simple_switch_api_app'
url = '/simpleswitch/mactable/{dpid}'

# 继承SimpleSwitch13的功能,即具备父类交换机的基本功能。
# 注册WSGI服务
# 配置mac_to_port
class SimpleSwitchRest13(simple_switch_13.SimpleSwitch13):

    _CONTEXTS = {'wsgi': WSGIApplication}

    def __init__(self, *args, **kwargs):
        super(SimpleSwitchRest13, self).__init__(*args, **kwargs)
        self.switches = {}
        # 通过上一步设置的_CONTEXTS成员变量,可以通过kwargs进行实例化一个WSGIApplication。
        # 同时使用register方法注册该服务到controller类上。
        wsgi = kwargs['wsgi']
        wsgi.register(SimpleSwitchController,
                      {simple_switch_instance_name: self})

    # 重写父类的switch_features_handler函数,存储datapath到switches,初始化MAC 地址表
    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        super(SimpleSwitchRest13, self).switch_features_handler(ev)
        datapath = ev.msg.datapath
        self.switches[datapath.id] = datapath
        self.mac_to_port.setdefault(datapath.id, {})
    # 该方法将MAC地址和端口注册到指定的交换机。该方法主要被REST API的PUT方法所调用
    def set_mac_to_port(self, dpid, entry):
        mac_table = self.mac_to_port.setdefault(dpid, {})
        datapath = self.switches.get(dpid)

        entry_port = entry['port']
        entry_mac = entry['mac']

        if datapath is not None:
            parser = datapath.ofproto_parser
            if entry_port not in mac_table.values():

                for mac, port in mac_table.items():

                    # from known device to new device
                    actions = [parser.OFPActionOutput(entry_port)]
                    match = parser.OFPMatch(in_port=port, eth_dst=entry_mac)
                    self.add_flow(datapath, 1, match, actions)

                    # from new device to known device
                    actions = [parser.OFPActionOutput(port)]
                    match = parser.OFPMatch(in_port=entry_port, eth_dst=mac)
                    self.add_flow(datapath, 1, match, actions)

                mac_table.update({entry_mac: entry_port})
        return mac_table


class SimpleSwitchController(ControllerBase):

    def __init__(self, req, link, data, **config):
        super(SimpleSwitchController, self).__init__(req, link, data, **config)
        self.simple_switch_app = data[simple_switch_instance_name]
    # 借助route装饰器关联方法和URL。参数如下:
    # 第一个参数:任何自定义名称
    # 第二个参数:指明URL
    # 第三个参数:指定http方法
    # 第四个参数:指明指定位置的格式,URL(/simpleswitch/mactable/{dpid} 匹配DPID_PATTERN的描述
    @route('simpleswitch', url, methods=['GET'],
           requirements={'dpid': dpid_lib.DPID_PATTERN})
    # 当使用GET方式访问到该REST API接口时,调用list_mac_table函数
    def list_mac_table(self, req, **kwargs):

        simple_switch = self.simple_switch_app
        dpid = kwargs['dpid']

        if dpid not in simple_switch.mac_to_port:
            return Response(status=404)

        mac_table = simple_switch.mac_to_port.get(dpid, {})
        body = json.dumps(mac_table)
        return Response(content_type='application/json', text=body)

    @route('simpleswitch', url, methods=['PUT'],
           requirements={'dpid': dpid_lib.DPID_PATTERN})
    def put_mac_table(self, req, **kwargs):

        simple_switch = self.simple_switch_app
        dpid = kwargs['dpid']
        try:
            new_entry = req.json if req.body else {}
        except ValueError:
            raise Response(status=400)

        if dpid not in simple_switch.mac_to_port:
            return Response(status=404)

        try:
            mac_table = simple_switch.set_mac_to_port(dpid, new_entry)
            body = json.dumps(mac_table)
            return Response(content_type='application/json', text=body)
        except Exception as e:
            return Response(status=500)

实验测试

  • mininet端 执行sudo mn --topo single,3 --mac --switch ovsk,protocols=OpenFlow13 --controller remote
  • ryu端 执行ryu-manager --verbose ryu.app.simple_switch_rest_13
  • mininet端 执行h1 ping -c1 h2,并观察ryu端的变化

使用curl调用 RESTful API 获取mac地址表 curl -X GET http://127.0.0.1:8080/simpleswitch/mactable /0000000000000001 使用curl调用 RESTful API 进行mac地址表的注册 curl -X PUT -d '{"mac" : "00:00:00:00:00:03", "port" : 3}' http://127.0.0.1:8080/simpleswitch/mactable/0000000000000001 查看流表可以看得到刚刚PUT的请求,也转化为流表的形式下发了

参考链接

RYUBook