如何使用WebSocket API订阅机器可读新闻

Wasin Waeosri
Developer Advocate Developer Advocate

简介

本文展示了开发人员如何使用订阅价格流数据和实时服务的Websocket API(又称Websocket API应用程序)从Refinitiv实时数据分发系统(Refinitiv Real-Time Advanced Data Hub和Refinitiv Real-Time Advanced Distribution Server)订阅机器可读新闻(MRN),然后组装和解码MRN文本新闻消息,在控制台/Jupyter notebook中显示新闻。

机器可读新闻(Refinitiv Machine Readable News-MRN)是一项先进的服务,用于消费和系统分析新闻自动化。它直接向您的应用程序提供深厚的历史新闻档案、超低延迟的结构化新闻和新闻分析。这使算法能够利用新闻的力量来抓住机会,充分利用市场的低效率,并管理事件风险。

更新(截至2021年12月)。GitHub示例的主/主分支现在支持Refinitiv实时云平台(Refinitiv Real-Time -- Optimized, RTO-以前称为ERT in Cloud)连接。你不需要再切换到ERT in Cloud分支。

  • RTO控制台的例子:请查看我同事的Refinitiv-API-Samples/Example.WebSocketAPI.Python.MRN.RTO GitHub 源。
  • RTO笔记本的例子:mrn_notebook_app_rto.ipynb笔记本文件。
  • 已部署的Refinitiv实时数据平台(RTDS)的例子是:mrn_console_app.py控制台应用程序和mrn_notebook_app.ipynb笔记本文件。

WebSocket API概览和先决条件

WebSocket API是直接访问的工具,能够轻松集成到众多的客户端技术环境中,如脚本和网络。 该API直接运行在你的Refinitiv实时数据平台基础设施或云端,并以开放(JSON)的可读格式呈现数据。

本示例仅侧重于Refinitiv机器可读新闻(MRN)的数据处理。如果你不熟悉WebSocket API,我强烈建议你查看WebSocket API教程页面

教程页面为那些有兴趣开发WebSocket应用程序以消费Refinitiv Real-Time实时数据的开发者提供了一个逐步指南(连接、登录、请求数据、解析数据等)。教程还涵盖了Refinitiv Real-Time Optimized(以前称为ERT in Cloud)和已部署的实时数据平台Refinitiv Real-Time Distribution System基础设施连接场景。

MRN概览

MRN通过Refinitiv Real-Time发布,使用新闻文本分析领域消息中的开放消息模型(OMM)封套。实时新闻内容集通过MRN_STORY RIC提供。内容数据包含在一个FRAGMENT字段中,该字段已被压缩并可能被分割成多个消息,以减少带宽和消息大小。

一个FRAGMENT字段基于连接类型的不同而有不同的数据类型:

  • RSSL连接(RTSDK C++/Java):BUFFER类型
  • WebSocket连接:Base64 ASCII字符串

数据要经过以下一系列的转换。

  1. 核心内容数据是一个UTF-8的JSON字符串;
  2.  这个JSON字符串使用gzip进行压缩;
  3. 压缩后的JSON被分割成几个片段(BUFFER或Base64 ASCII字符串),每个片段都适合于单独一个更新信息。
  4.  数据片段被添加到一个更新信息中,作为FieldList信封中的FRAGMENT字段值。

因此,为了解析核心内容数据,应用程序将需要逆转这一过程。

数据模型

五个字段以及RIC本身都是必要的,以确定是否收到了整个条目的各种碎片,以及如何将这些碎片串联起来重建条目。

  • MRN_SRC:发布FRAGMENT的计分/处理系统的标识。
  • GUID:数据项的全球唯一标识符,该数据项的所有消息都有相同的GUID值。
  • FRAGMENT:压缩的数据项片段。
  • TOT_SIZE:碎片数据的总大小,以字节为单位。
  • FRAG_NUM:数据项中片段的序列号,对于每个条目发布的第一个片段,该序列号被设置为1。

并对同一条目的每个后续片段进行递增。一个单一的MRN数据项发布是由RIC名称、MRN_SRC和GUID的组合来唯一标识的。

碎片化

对于一个给定的RIC/MRN_SRC/GUID组合,当一个数据项只需要一个消息时,那么TOT_SIZE将等于FRAGMENT中的字节长度,FRAG_NUM将是1。

当需要多个信息时,那么一旦每个FRAGMENT的字节长度之和等于TOT_SIZE,就可以认为该数据项被完全接收。接收者还将观察到,所有FRAG_NUM的范围是从1到没有跳过中间的整数的片段编号的数字。换句话说,一个通过三个消息传输的数据项将包含1、2和3的FRAG_NUM值。

在进一步处理这个字段之前,WebSocket应用程序还需要将FRAGMENT字段数据从Base64字符串转换为字节数据。Python应用程序可以使用base64 模块将Base64字符串解码为字节数据。

压缩

FRAGMENT字段是用gzip压缩的,因此需要接收者解压以显示该FID中的JSON纯文本数据。

当一个MRN数据项在多个消息中发送时,所有的消息都必须被接收,并在解压前将其所有的碎片连接起来。换句话说,碎片FRAGMENTs不应该被独立解压。

解压后的输出是以UTF-8编码的,格式为JSON。Python应用程序可以使用zlib模块来解压JSON字符串。

如何处理MRN数据

一旦应用程序建立了连接,并从Refinitiv实时数据平台请求MRN RIC数据,应用程序可以通过以下流程处理传入的MRN数据信息:

请在下面的代码演练部分查看更多细节。

代码演练

请求数据

应用程序可以用NewsTextAnalytics域和以下针对MRN的RIC名称订阅MRN数据。

  1. MRN_STORY:实时新闻
  2. MRN_TRNA: 新闻分析:公司和C&E资产
  3.  MRN_TRNA_DOC: 新闻分析:宏观经济新闻和事件
  4.  MRN_TRSI: 新闻情绪指数
    	
            

# Global Default Variables

mrn_domain = 'NewsTextAnalytics'

mrn_item = 'MRN_STORY'

 

def send_mrn_request(ws):

    """ Create and send MRN request """

    mrn_req_json = {

        'ID': 2,

        "Domain": mrn_domain,

        'Key': {

            'Name': mrn_item

        }

    }

 

    ws.send(json.dumps(mrn_req_json))

    print("SENT:")

    print(json.dumps(mrn_req_json, sort_keys=True, indent=2, separators=(',', ':')))

处理刷新信息

MRN刷新响应消息不包含任何新闻或片段信息。它包含相关的数据源或其他静态字段。应用程序只是在控制台中打印出每个传入的字段数据,供参考:

    	
            

def processRefresh(ws, message_json):

 

    print("RECEIVED: Refresh Message")

    decodeFieldList(message_json["Fields"])

 

def decodeFieldList(fieldList_dict):

    for key, value in fieldList_dict.items():

        print("Name = %s: Value = %s" % (key, value))

MRN刷新响应信息的示例结果如下:

    	
            

RECEIVED: Refresh Message

Name = PROD_PERM: Value = 10001

Name = ACTIV_DATE: Value = 2019-07-20

Name = RECORDTYPE: Value = 30

Name = RDN_EXCHD2: Value = MRN

Name = TIMACT_MS: Value = 37708132

Name = GUID: Value = None

Name = CONTEXT_ID: Value = 3752

Name = DDS_DSO_ID: Value = 4232

Name = SPS_SP_RIC: Value = .[SPSML2L1

Name = MRN_V_MAJ: Value = 2

Name = MRN_TYPE: Value = STORY

Name = MRN_V_MIN: Value = 10

Name = MRN_SRC: Value = HK1_PRD_A

Name = FRAG_NUM: Value = 1

Name = TOT_SIZE: Value = 0

Name = FRAGMENT: Value = None

处理更新信息

更新响应消息包含新闻信息和片段数据。首先,应用程序从更新消息中获得FRAGMENT、FRAG_NUM、GUID和MRN_SRC字段的值。我们使用Python的base64模块将FRAGMENT字段值从ASCII字符串解码为字节数据。

    	
            

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

 

    fields_data = message_json["Fields"]

    # declare variables

    tot_size = 0

    guid = None

 

    # Get data for all requried fields

    fragment = base64.b64decode(fields_data["FRAGMENT"])

    frag_num = int(fields_data["FRAG_NUM"])

    guid = fields_data["GUID"]

    mrn_src = fields_data["MRN_SRC"]

处理第一个片段/单一片断的新闻

接下来,我们检查是否有一个FRAG_NUM = 1,这意味着它是这个新闻消息的第一个片段。然后应用程序检查是否有一个FRAGMENT字节长度=TOT_SIZE字段。

  • 如果相等,这是一个单片段新闻的片段,这个新闻消息已经完成。
  •  如果不相等,这是多片段新闻的第一个片段。我们将此更新消息的所有字段数据存储到_news_envelopes列表对象中,并等待下一个片段的到来。
    	
            

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

    ...

    # Get data for all requried fields

    ...

    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details

        pass

    else:  # FRAG_NUM = 1 The first fragment

            tot_size = int(fields_data["TOT_SIZE"])

            print("FRAGMENT length = %d" % len(fragment))

            # The fragment news is not completed, waiting and add this news data to envelop object.

            if tot_size != len(fragment):

                print("Add new fragments to news envelop for guid %s" % guid)

                _news_envelopes.append({  # the envelop object is a Python dictionary with GUID as a key and other fields are data

                    "guid": guid,

                    "data": {

                        "fragment": fragment,

                        "mrn_src": mrn_src,

                        "frag_num": frag_num,

                        "tot_size": tot_size

                    }

                })

                return None

处理多片段新闻

当应用程序收到FRAG_NUM>1的更新消息时,意味着这个消息是多片段的新闻。我们通过一个GUID从_news_envelopes列表对象中获得前一个/几个片段的数据。请注意,FRAG_NUM>1的更新消息将包含较少的字段,因为源数据已经包含在该新闻消息的第一个更新消息中(FRAG_NUM=1)。

应用程序还需要通过检查MRN_SRC和FRAG_NUM与以前的片段的顺序来检查收到的片段的有效性。如果收到的片段是有效的,应用程序将收到的FRAGMENT字节数据与之前的片段进行组合,并将总的片段字节长度值与TOT_SIZE值相比较。

  • 如果相等,这意味着这个多片段的新闻消息已经完成了。
  •  如果不相等,我们更新当前的新闻更新消息字段到_news_envelopes列表对象,并等待更多的片段。
    	
            

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

    ...

    # Get data for all requried fields

    ...

    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details

        guid_index = next((index for (index, d) in enumerate(

                _news_envelopes) if d["guid"] == guid), None)

        envelop = _news_envelopes[guid_index]

        

        if envelop and envelop["data"]["mrn_src"] == mrn_src and frag_num == envelop["data"]["frag_num"] + 1:

            print("process multiple fragments for guid %s" % envelop["guid"])

            

            # Merge incoming data to existing news envelop and getting FRAGMENT and TOT_SIZE data to local variables

            fragment = envelop["data"]["fragment"] = envelop["data"]["fragment"] + fragment

            envelop["data"]["frag_num"] = frag_num

            tot_size = envelop["data"]["tot_size"]

            print("TOT_SIZE = %d" % tot_size)

            print("Current FRAGMENT length = %d" % len(fragment))

 

            # The multiple fragments news are not completed, waiting.

            if tot_size != len(fragment):

                return None

            # The multiple fragments news are completed, delete assoiclate GUID envelop

            elif tot_size == len(fragment):

                del _news_envelopes[guid_index]

    else:  # FRAG_NUM = 1 The first fragment

        ...

处理一个已完成的新闻FRAGMENT消息

为了解压缩内容并获得JSON字符串的新闻消息,我们使用Python的zlib模块来解压缩gzip字节数据,然后我们将其解析为JSON消息。

    	
            

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

    ...

    # Get data for all requried fields

    ...

    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details

        ...

    else:  # FRAG_NUM = 1 The first fragment

        ...

 

    # News Fragment(s) completed, decompress and print data as JSON to console

    if tot_size == len(fragment):

        print("decompress News FRAGMENT(s) for GUID  %s" % guid)

        decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)

        print("News = %s" % json.loads(decompressed_data))

示例结果

该示例应用程序支持控制台、Docker 和经典的Jupyter Notebook(classic Jupyter Notebook )环境。请查看源代码项目中的README.md文件,以了解关于如何在每种环境下运行应用程序的更多细节。

注意:新闻信息是UTF-8 JSON字符串格式。由于操作系统的限制,一些包含特殊Unicode字符的新闻消息可能无法在Windows操作系统控制台(cmd、git bash、powershell等)显示。这些信息将被打印为UnicodeEncodeError异常。无法在控制台中解码Unicode字符的信息。

    	
            

RECEIVED:

[

  {

    "DoNotCache":true,

    "DoNotConflate":true,

    "DoNotRipple":true,

    "Domain":"NewsTextAnalytics",

    "Fields":{

      "ACTIV_DATE":"2019-08-14",

      "FRAGMENT":"H4sIAAAAAAAC/41W727iRhD/3qcY+UN1lRJigzFg6dQScAg5Y6jtkN6VqlrwAtvYa253HYKqvkufpU/WWa+T0JPaXqQws7Mzs/Pnt7P+3SK5mmaWb/E4iex2t3NHrAuLVBmjfEOl5f9sRQs/TIJojPKGtX65sNZldkKzm1LAmkoFgsoqVxKOe8rhIBhXjO9A7ZkEwnlZobeCcnUBh5wSSWGTs80jlBxyxh/RRV4e/RXfK3Xwr64O2bYlaKWokK1NWVztVZFzepSvTIvIw/fsvduxu4NNZ721aafXdZ1vq/eV4L7WKHL/zIPftp2B3Xdc/y3PFV/xOEogqoo1FeBDLV7xW8ozNMPQhjnac6LYE4VECaIgxUwX4WjFHReG1a7Cpfa84n/9af5vsVBBnMwjmEbLIElnQZTCzX00TiCczqZpMD7XGYZpEEfDdLoMIEnjYRpMpkECaXyfpOackO5IDgFWU51gmmEF2ZZR4UPb6fRt+84Lw/lo2Bndj2+8szC+DO8/cqI7RiWkQisf8g28W1lqT2FUFgfCTyvruxUf7QnfUVAljJmgG4U9n/JtKQp0U3Jz4pQD2WxKkRHsNByZ2kPIZA2CuMopDFpey3Ev4Mz3KzAkSrG4MwETQfakWPF5nonylKGfPEdwADkcSoQUzYAgnoCX/JI+001Vp5G9BFVu4Xo2h0lerrFoSUHynAoMvz5OZ6nzq0Oj2y2awFaUBTgwR2sNAV2r1lsRNba3lcCIBbC3fN8gjP79pu00p5zALRFP9GQsCc80vNd0T/IthnbehIRuBFVEMB0mFU9M1yBkBcMMV/xl96Qr/vWdW/EUozjsS041lm3o9Z0+5tTumhhDUgkC6b4siNT9esJrq69k42ERw4xwstMVuyO8kmdHG+VSyH85w+t4uKPv+lmdAJcHUT6xDNu2PgHeNdN+fT1BmrR1z7QsLHmmC6NKnArB86ZGXEvbaDfYfnRk3GjtG8YRZrp6I7SrsJPDSu1LoS8J1oygoMbJQrBC1/EMrSirQ9ItrX3dc111+IBIzcqiBSkVhay7t0HfTNtInG45MQOtrI0qbL9WyRDigq2r2nOdyhc1KMhJR5+fWvC/cCq5wshXXHD5Qy7pTg8ueFcQlqvSPxfilQR09sQkQ/Xj8djCXaPdDNAz2RVqN6DGCb5lQqqRoIgePfU14i/t/qXjpnbf73b8rt0aOPYn1NxTkuFspqgVBxO4/CcSm3mI4teRMKabnIg6JzRn2v3rrP3VTN9296fPd/chG39y6cfQO8nlLpuxeed5xx7j43IcTj5MqaOtuVR6kMj5Vj9Bcew7tu3pF2jqB/fxVTCM61coR5hUiFk8i+pTCyolLtPTAUVtXLOiWViKPqur58tDThi/3LJnzP/CavApcD9K9NumZdU6UURV+PhZGIXyK0nWOcUdWa1/w1zrV/Had9pdlCF1e4Z6A6dm2l5DezbSid+pf3t3NRni78x3fhwb+hDVtNM25LYmuvwL320Puv2B53l6L/Zvh0naCuudjus6rtdz3V6vXndt/ed5g/7bstdtu9ow8J36172uiZcasqxJYIJY2oZ+6jW031ATXPu6DuDudtLSgqjtL+IgSWLDX8+TxDHsLJpMk6RRWYSz+fJFZRiPG51kOhnNX0yHo9RwN9MosfuGx4czPeffNAw3QqHhHoJ7w0yuDUVozBfNOfFy1BhEwUNzIAK58TFbRB8NG5rvGUUeaUI/V/qzx/KdC6sSO+TxG6dzYT0h7hHWX3Fv/vjmb4qautlTCQAA",

      "FRAG_NUM":1,

      "GUID":"RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1",

      "MRN_SRC":"HK1_PRD_A",

      "MRN_TYPE":"STORY",

      "MRN_V_MAJ":"2",

      "MRN_V_MIN":"10",

      "TIMACT_MS":35640745,

      "TOT_SIZE":1335

    },

    "ID":2,

    "Key":{

      "Name":"MRN_STORY",

      "Service":"ELEKTRON_DD"

    },

    "PermData":"AwhCEBVM",

    "SeqNumber":13678,

    "Type":"Update",

    "UpdateType":"Unspecified"

  }

]

FRAGMENT length = 1335

decompress News FRAGMENT(s) for GUID  RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1

News = {'altId': 'nRSN0253Ja', 'audiences': ['NP:LSEND', 'NP:LSEN'], 'body': 'For best results when printing this announcement, please click on link below:\nhttp://pdf.reuters.com/htmlnews/htmlnews.asp?i=43059c3bf0e37541&u=urn:newsml:reuters.com:20190814:nRSN0253Ja\n\nRNS Number : 0253J\nHenderson Alternative Strat Tst PLC\n14 August 2019\n\xa0\n\xa0\nHENDERSON INVESTMENT FUNDS LIMITED\nHENDERSON ALTERNATIVE STRATEGIES TRUST PLC\nLegal Entity Identifier: 213800J6LLOCA3CUDF69\n\xa0\n\xa0\n14 August 2019\nHenderson Alternative Strategies Trust plc ("the Company")\nChange to Director Information\n\xa0\nIn accordance with Listing Rule 9.6.14, the Company announces that Mr Graham\nOldroyd will be appointed as a non-executive director of BMO Global Smaller\nCompanies plc with effect from 1 October 2019.\n\xa0\n\xa0\nFor further information, please call:\n\xa0\nHelena Harvey\nFor and on behalf of\nHenderson Secretarial Services Limited\nSecretary to Henderson Alternative Strategies Trust plc\nTelephone: 020 7818 2025\n\xa0\nLaura Thomas\nInvestment Trust PR Manager\nJanus Henderson Investors\nTelephone: 020 7818 2636\nThis information is provided by RNS, the news service of the London Stock Exchange. RNS is approved by the Financial Conduct Authority to act as a Primary Information Provider in the United Kingdom. Terms and conditions relating to the use and distribution of this information may apply. For further information, please contact\nrns@lseg.com (mailto:rns@lseg.com)\n or visit\nwww.rns.com (http://www.rns.com/)\n.\n\xa0\n', 'firstCreated': '2019-08-14T08:53:50.910Z', 'headline': 'REG - Henderson Alt Strat  - Director Declaration', 'id': 'RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1', 'instancesOf': ['RR:1006', 'NI:EUR/EARN'], 'language': 'en', 'messageType': 2, 'mimeType': 'text/x-plain-fixed', 'provider': 'NS:LSE', 'pubStatus': 'stat:usable', 'subjects': ['B:125', 'B:147', 'B:1691', 'B:261', 'B:270', 'G:3', 'G:7J', 'G:A', 'M:1QD', 'M:1WN', 'M:32', 'M:3H', 'M:Z', 'P:4295896662', 'R:HAST.L', 'P:34414674477', 'P:5000006698', 'P:5000075242', 'E:1', 'E:4B', 'E:6T', 'E:6V', 'E:E', 'M:1V0', 'M:1Z7', 'M:1Z8', 'M:1ZN', 'M:2B', 'R:JHG.N', 'N2:PRESSR', 'N2:BOSS1', 'N2:MNGISS', 'N2:PPLMOV', 'N2:BOARD1', 'N2:SIGCOR', 'N2:BACT', 'N2:FINS08', 'N2:INVT08', 'N2:INVT', 'N2:FINS', 'N2:CINV', 'N2:WEU', 'N2:GB', 'N2:EUROP', 'N2:SRVCS', 'N2:NEWR', 'N2:REG', 'N2:CMPNY', 'N2:LEN'], 'takeSequence': 1, 'urgency': 3, 'versionCreated': '2019-08-14T08:53:50.910Z'}