Portal服务中Websocket简介

概述

Portal服务中有一些操作不会立即返回结果,需要异步将操作结果返回给界面,这时就需要使用websocket服务推送异步结果。

数据处理流程

  1. 服务通过message_collector模块,从openstack消息队列接收nova、cinder等服务发送过来的通知消息,并根据配置进行初步解码,供其他功能模块使用。

  2. notify_manager模块接收 message_collector 初步解码的数据,针对UI需求,根据配置进行转换。

  3. 最后websocket模块将处理过后的数据推送到UI界面。

graph TB
mq(RabbitMQ)
ui(UI界面)

subgraph 1. message_collector模块
msg_col[接收MQ消息]
base_decode[通用解码]
end

subgraph 2. notify_manager模块
ws_decode[专用解码]
end

subgraph 3. websocket模块
wss[链接管理]
end


mq-->|通知消息|msg_col
msg_col-->base_decode
base_decode-->ws_decode
ws_decode-->wss
wss-->|推送给UI|ui

增加一种消息的推送

目前服务中如果需要增加一类消息推送到界面,只需要修改相应模块配置即可。主要是修改数据处理流程中的1.2部分的配置,一般需要下面几个步骤(以增加云硬盘创建成功的消息推送为例子):

当云硬盘创建成功后会发出event_typevolume.create.end的消息, 在消息payload中有tenant_id,volume_id,display_name等属性信息。

  1. 修改services/message_collector/converter目录下的event_definitions.yaml配置文件,增加相关消息解码配置。

event_type:就是我们要进行解码的消息的event_type,支持通配符*
traits:为此调配置的名称,解码配置支持继承时要用到。
剩下的就是属性映射解码的配置,例如我们要把payload当中的tenant_id解码成新消息体中的tenant_idproject_id,将volume_id解码成resource_id等等。


- event_type: 'volume.create.*'
  traits: &cinder_volume_create
    project_id:
      fields: payload.tenant_id
    tenant_id:
      fields: payload.tenant_id
    availability_zone:
      fields: payload.availability_zone
    display_name:
      fields: payload.display_name
    replication_status:
      fields: payload.replication_status
    status:
      fields: payload.status
    created_at:
      fields: payload.created_at
    resource_id:
      fields: payload.volume_id
    host:
      fields: payload.host
    size:
      fields: payload.size
    type:
      fields: payload.volume_type
    replication_status:
      fields: payload.replication_status
  1. 修改services/notify_manager目录下的settings.ini配置文件,对[NOTIFY_MANAGER]节的transformer配置进行修改,增加转换配置。

服务的ini配置文件格式可参看servos中docs下的说明。

配置中value部分可以是从源消息中提取,使用@标识符表示;否则,使用配置的字符串。
例如:'@traits.resource_id'表示从源消息(经过message_collector转换后的消息)的traits中取resource_id的值。


[NOTIFY_MANAGER]

# 转换器配置
transformer = [
    ...
    {
        # 要处理的消息类型
        'event_type': 'volume.create.end',
        # 转换后的结果
        'event_define': {
            # 转换后的消息类型
            'event_type': 'websocket.send',
            # 转换后的消息体
            'traits': {
                # 接收对象
                'toid': '@traits.user_id',
                # 通知时间类型
                'message_event': 'STATES',
                # 通知消息类型
                'message_type': 'volume',
                # 消息内容(需要什么和UI协商)
                'message': {
                    'resource_id': '@traits.resource_id',
                    'name': '@traits.display_name',
                    'state': 'created',
                    'option': 'reload',
                }
            }
        },
    },
    ...
]