简介
本文展示了开发人员如何使用订阅价格流数据和实时服务的Websocket API(又称Websocket API应用程序)从Refinitiv实时数据分发系统(Refinitiv Real-Time Advanced Data Hub和Refinitiv Real-Time Advanced Distribution Server)订阅机器可读新闻(MRN),然后组装和解码MRN文本新闻消息,在控制台/Jupyter notebook中显示新闻。
机器可读新闻(Refinitiv Machine Readable News-MRN)是一项先进的服务,用于消费和系统分析新闻自动化。它直接向您的应用程序提供深厚的历史新闻档案、超低延迟的结构化新闻和新闻分析。这使算法能够利用新闻的力量来抓住机会,充分利用市场的低效率,并管理事件风险。
更新(截至2021年12月)。GitHub示例的主/主分支现在支持Refinitiv实时云平台(Refinitiv Real-Time -- Optimized, RTO-以前称为ERT in Cloud)连接。你不需要再切换到ERT in Cloud分支。
- RTO控制台的例子:请查看我同事的Refinitiv-API-Samples/Example.WebSocketAPI.Python.MRN.RTO GitHub 源。
- RTO笔记本的例子:mrn_notebook_app_rto.ipynb笔记本文件。
- 已部署的Refinitiv实时数据平台(RTDS)的例子是:mrn_console_app.py控制台应用程序和mrn_notebook_app.ipynb笔记本文件。
WebSocket API概览和先决条件
WebSocket API是直接访问的工具,能够轻松集成到众多的客户端技术环境中,如脚本和网络。 该API直接运行在你的Refinitiv实时数据平台基础设施或云端,并以开放(JSON)的可读格式呈现数据。
本示例仅侧重于Refinitiv机器可读新闻(MRN)的数据处理。如果你不熟悉WebSocket API,我强烈建议你查看WebSocket API教程页面。
教程页面为那些有兴趣开发WebSocket应用程序以消费Refinitiv Real-Time实时数据的开发者提供了一个逐步指南(连接、登录、请求数据、解析数据等)。教程还涵盖了Refinitiv Real-Time Optimized(以前称为ERT in Cloud)和已部署的实时数据平台Refinitiv Real-Time Distribution System基础设施连接场景。
MRN概览
MRN通过Refinitiv Real-Time发布,使用新闻文本分析领域消息中的开放消息模型(OMM)封套。实时新闻内容集通过MRN_STORY RIC提供。内容数据包含在一个FRAGMENT字段中,该字段已被压缩并可能被分割成多个消息,以减少带宽和消息大小。
一个FRAGMENT字段基于连接类型的不同而有不同的数据类型:
- RSSL连接(RTSDK C++/Java):BUFFER类型
- WebSocket连接:Base64 ASCII字符串
数据要经过以下一系列的转换。
- 核心内容数据是一个UTF-8的JSON字符串;
- 这个JSON字符串使用gzip进行压缩;
- 压缩后的JSON被分割成几个片段(BUFFER或Base64 ASCII字符串),每个片段都适合于单独一个更新信息。
- 数据片段被添加到一个更新信息中,作为FieldList信封中的FRAGMENT字段值。
因此,为了解析核心内容数据,应用程序将需要逆转这一过程。
数据模型
五个字段以及RIC本身都是必要的,以确定是否收到了整个条目的各种碎片,以及如何将这些碎片串联起来重建条目。
- MRN_SRC:发布FRAGMENT的计分/处理系统的标识。
- GUID:数据项的全球唯一标识符,该数据项的所有消息都有相同的GUID值。
- FRAGMENT:压缩的数据项片段。
- TOT_SIZE:碎片数据的总大小,以字节为单位。
- FRAG_NUM:数据项中片段的序列号,对于每个条目发布的第一个片段,该序列号被设置为1。
并对同一条目的每个后续片段进行递增。一个单一的MRN数据项发布是由RIC名称、MRN_SRC和GUID的组合来唯一标识的。
碎片化
对于一个给定的RIC/MRN_SRC/GUID组合,当一个数据项只需要一个消息时,那么TOT_SIZE将等于FRAGMENT中的字节长度,FRAG_NUM将是1。
当需要多个信息时,那么一旦每个FRAGMENT的字节长度之和等于TOT_SIZE,就可以认为该数据项被完全接收。接收者还将观察到,所有FRAG_NUM的范围是从1到没有跳过中间的整数的片段编号的数字。换句话说,一个通过三个消息传输的数据项将包含1、2和3的FRAG_NUM值。
在进一步处理这个字段之前,WebSocket应用程序还需要将FRAGMENT字段数据从Base64字符串转换为字节数据。Python应用程序可以使用base64 模块将Base64字符串解码为字节数据。
压缩
FRAGMENT字段是用gzip压缩的,因此需要接收者解压以显示该FID中的JSON纯文本数据。
当一个MRN数据项在多个消息中发送时,所有的消息都必须被接收,并在解压前将其所有的碎片连接起来。换句话说,碎片FRAGMENTs不应该被独立解压。
解压后的输出是以UTF-8编码的,格式为JSON。Python应用程序可以使用zlib模块来解压JSON字符串。
如何处理MRN数据
一旦应用程序建立了连接,并从Refinitiv实时数据平台请求MRN RIC数据,应用程序可以通过以下流程处理传入的MRN数据信息:
请在下面的代码演练部分查看更多细节。
代码演练
请求数据
应用程序可以用NewsTextAnalytics域和以下针对MRN的RIC名称订阅MRN数据。
- MRN_STORY:实时新闻
- MRN_TRNA: 新闻分析:公司和C&E资产
- MRN_TRNA_DOC: 新闻分析:宏观经济新闻和事件
- MRN_TRSI: 新闻情绪指数
# Global Default Variables
mrn_domain = 'NewsTextAnalytics'
mrn_item = 'MRN_STORY'
def send_mrn_request(ws):
""" Create and send MRN request """
mrn_req_json = {
'ID': 2,
"Domain": mrn_domain,
'Key': {
'Name': mrn_item
}
}
ws.send(json.dumps(mrn_req_json))
print("SENT:")
print(json.dumps(mrn_req_json, sort_keys=True, indent=2, separators=(',', ':')))
处理刷新信息
MRN刷新响应消息不包含任何新闻或片段信息。它包含相关的数据源或其他静态字段。应用程序只是在控制台中打印出每个传入的字段数据,供参考:
def processRefresh(ws, message_json):
print("RECEIVED: Refresh Message")
decodeFieldList(message_json["Fields"])
def decodeFieldList(fieldList_dict):
for key, value in fieldList_dict.items():
print("Name = %s: Value = %s" % (key, value))
MRN刷新响应信息的示例结果如下:
RECEIVED: Refresh Message
Name = PROD_PERM: Value = 10001
Name = ACTIV_DATE: Value = 2019-07-20
Name = RECORDTYPE: Value = 30
Name = RDN_EXCHD2: Value = MRN
Name = TIMACT_MS: Value = 37708132
Name = GUID: Value = None
Name = CONTEXT_ID: Value = 3752
Name = DDS_DSO_ID: Value = 4232
Name = SPS_SP_RIC: Value = .[SPSML2L1
Name = MRN_V_MAJ: Value = 2
Name = MRN_TYPE: Value = STORY
Name = MRN_V_MIN: Value = 10
Name = MRN_SRC: Value = HK1_PRD_A
Name = FRAG_NUM: Value = 1
Name = TOT_SIZE: Value = 0
Name = FRAGMENT: Value = None
处理更新信息
更新响应消息包含新闻信息和片段数据。首先,应用程序从更新消息中获得FRAGMENT、FRAG_NUM、GUID和MRN_SRC字段的值。我们使用Python的base64模块将FRAGMENT字段值从ASCII字符串解码为字节数据。
def processMRNUpdate(ws, message_json): # process incoming News Update messages
fields_data = message_json["Fields"]
# declare variables
tot_size = 0
guid = None
# Get data for all requried fields
fragment = base64.b64decode(fields_data["FRAGMENT"])
frag_num = int(fields_data["FRAG_NUM"])
guid = fields_data["GUID"]
mrn_src = fields_data["MRN_SRC"]
处理第一个片段/单一片断的新闻
接下来,我们检查是否有一个FRAG_NUM = 1,这意味着它是这个新闻消息的第一个片段。然后应用程序检查是否有一个FRAGMENT字节长度=TOT_SIZE字段。
- 如果相等,这是一个单片段新闻的片段,这个新闻消息已经完成。
- 如果不相等,这是多片段新闻的第一个片段。我们将此更新消息的所有字段数据存储到_news_envelopes列表对象中,并等待下一个片段的到来。
def processMRNUpdate(ws, message_json): # process incoming News Update messages
...
# Get data for all requried fields
...
if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details
pass
else: # FRAG_NUM = 1 The first fragment
tot_size = int(fields_data["TOT_SIZE"])
print("FRAGMENT length = %d" % len(fragment))
# The fragment news is not completed, waiting and add this news data to envelop object.
if tot_size != len(fragment):
print("Add new fragments to news envelop for guid %s" % guid)
_news_envelopes.append({ # the envelop object is a Python dictionary with GUID as a key and other fields are data
"guid": guid,
"data": {
"fragment": fragment,
"mrn_src": mrn_src,
"frag_num": frag_num,
"tot_size": tot_size
}
})
return None
处理多片段新闻
当应用程序收到FRAG_NUM>1的更新消息时,意味着这个消息是多片段的新闻。我们通过一个GUID从_news_envelopes列表对象中获得前一个/几个片段的数据。请注意,FRAG_NUM>1的更新消息将包含较少的字段,因为源数据已经包含在该新闻消息的第一个更新消息中(FRAG_NUM=1)。
应用程序还需要通过检查MRN_SRC和FRAG_NUM与以前的片段的顺序来检查收到的片段的有效性。如果收到的片段是有效的,应用程序将收到的FRAGMENT字节数据与之前的片段进行组合,并将总的片段字节长度值与TOT_SIZE值相比较。
- 如果相等,这意味着这个多片段的新闻消息已经完成了。
- 如果不相等,我们更新当前的新闻更新消息字段到_news_envelopes列表对象,并等待更多的片段。
def processMRNUpdate(ws, message_json): # process incoming News Update messages
...
# Get data for all requried fields
...
if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details
guid_index = next((index for (index, d) in enumerate(
_news_envelopes) if d["guid"] == guid), None)
envelop = _news_envelopes[guid_index]
if envelop and envelop["data"]["mrn_src"] == mrn_src and frag_num == envelop["data"]["frag_num"] + 1:
print("process multiple fragments for guid %s" % envelop["guid"])
# Merge incoming data to existing news envelop and getting FRAGMENT and TOT_SIZE data to local variables
fragment = envelop["data"]["fragment"] = envelop["data"]["fragment"] + fragment
envelop["data"]["frag_num"] = frag_num
tot_size = envelop["data"]["tot_size"]
print("TOT_SIZE = %d" % tot_size)
print("Current FRAGMENT length = %d" % len(fragment))
# The multiple fragments news are not completed, waiting.
if tot_size != len(fragment):
return None
# The multiple fragments news are completed, delete assoiclate GUID envelop
elif tot_size == len(fragment):
del _news_envelopes[guid_index]
else: # FRAG_NUM = 1 The first fragment
...
处理一个已完成的新闻FRAGMENT消息
为了解压缩内容并获得JSON字符串的新闻消息,我们使用Python的zlib模块来解压缩gzip字节数据,然后我们将其解析为JSON消息。
def processMRNUpdate(ws, message_json): # process incoming News Update messages
...
# Get data for all requried fields
...
if frag_num > 1: # We are now processing more than one part of an envelope - retrieve the current details
...
else: # FRAG_NUM = 1 The first fragment
...
# News Fragment(s) completed, decompress and print data as JSON to console
if tot_size == len(fragment):
print("decompress News FRAGMENT(s) for GUID %s" % guid)
decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)
print("News = %s" % json.loads(decompressed_data))
示例结果
该示例应用程序支持控制台、Docker 和经典的Jupyter Notebook(classic Jupyter Notebook )环境。请查看源代码项目中的README.md文件,以了解关于如何在每种环境下运行应用程序的更多细节。
注意:新闻信息是UTF-8 JSON字符串格式。由于操作系统的限制,一些包含特殊Unicode字符的新闻消息可能无法在Windows操作系统控制台(cmd、git bash、powershell等)显示。这些信息将被打印为UnicodeEncodeError异常。无法在控制台中解码Unicode字符的信息。
RECEIVED:
[
{
"DoNotCache":true,
"DoNotConflate":true,
"DoNotRipple":true,
"Domain":"NewsTextAnalytics",
"Fields":{
"ACTIV_DATE":"2019-08-14",
"FRAGMENT":"H4sIAAAAAAAC/41W727iRhD/3qcY+UN1lRJigzFg6dQScAg5Y6jtkN6VqlrwAtvYa253HYKqvkufpU/WWa+T0JPaXqQws7Mzs/Pnt7P+3SK5mmaWb/E4iex2t3NHrAuLVBmjfEOl5f9sRQs/TIJojPKGtX65sNZldkKzm1LAmkoFgsoqVxKOe8rhIBhXjO9A7ZkEwnlZobeCcnUBh5wSSWGTs80jlBxyxh/RRV4e/RXfK3Xwr64O2bYlaKWokK1NWVztVZFzepSvTIvIw/fsvduxu4NNZ721aafXdZ1vq/eV4L7WKHL/zIPftp2B3Xdc/y3PFV/xOEogqoo1FeBDLV7xW8ozNMPQhjnac6LYE4VECaIgxUwX4WjFHReG1a7Cpfa84n/9af5vsVBBnMwjmEbLIElnQZTCzX00TiCczqZpMD7XGYZpEEfDdLoMIEnjYRpMpkECaXyfpOackO5IDgFWU51gmmEF2ZZR4UPb6fRt+84Lw/lo2Bndj2+8szC+DO8/cqI7RiWkQisf8g28W1lqT2FUFgfCTyvruxUf7QnfUVAljJmgG4U9n/JtKQp0U3Jz4pQD2WxKkRHsNByZ2kPIZA2CuMopDFpey3Ev4Mz3KzAkSrG4MwETQfakWPF5nonylKGfPEdwADkcSoQUzYAgnoCX/JI+001Vp5G9BFVu4Xo2h0lerrFoSUHynAoMvz5OZ6nzq0Oj2y2awFaUBTgwR2sNAV2r1lsRNba3lcCIBbC3fN8gjP79pu00p5zALRFP9GQsCc80vNd0T/IthnbehIRuBFVEMB0mFU9M1yBkBcMMV/xl96Qr/vWdW/EUozjsS041lm3o9Z0+5tTumhhDUgkC6b4siNT9esJrq69k42ERw4xwstMVuyO8kmdHG+VSyH85w+t4uKPv+lmdAJcHUT6xDNu2PgHeNdN+fT1BmrR1z7QsLHmmC6NKnArB86ZGXEvbaDfYfnRk3GjtG8YRZrp6I7SrsJPDSu1LoS8J1oygoMbJQrBC1/EMrSirQ9ItrX3dc111+IBIzcqiBSkVhay7t0HfTNtInG45MQOtrI0qbL9WyRDigq2r2nOdyhc1KMhJR5+fWvC/cCq5wshXXHD5Qy7pTg8ueFcQlqvSPxfilQR09sQkQ/Xj8djCXaPdDNAz2RVqN6DGCb5lQqqRoIgePfU14i/t/qXjpnbf73b8rt0aOPYn1NxTkuFspqgVBxO4/CcSm3mI4teRMKabnIg6JzRn2v3rrP3VTN9296fPd/chG39y6cfQO8nlLpuxeed5xx7j43IcTj5MqaOtuVR6kMj5Vj9Bcew7tu3pF2jqB/fxVTCM61coR5hUiFk8i+pTCyolLtPTAUVtXLOiWViKPqur58tDThi/3LJnzP/CavApcD9K9NumZdU6UURV+PhZGIXyK0nWOcUdWa1/w1zrV/Had9pdlCF1e4Z6A6dm2l5DezbSid+pf3t3NRni78x3fhwb+hDVtNM25LYmuvwL320Puv2B53l6L/Zvh0naCuudjus6rtdz3V6vXndt/ed5g/7bstdtu9ow8J36172uiZcasqxJYIJY2oZ+6jW031ATXPu6DuDudtLSgqjtL+IgSWLDX8+TxDHsLJpMk6RRWYSz+fJFZRiPG51kOhnNX0yHo9RwN9MosfuGx4czPeffNAw3QqHhHoJ7w0yuDUVozBfNOfFy1BhEwUNzIAK58TFbRB8NG5rvGUUeaUI/V/qzx/KdC6sSO+TxG6dzYT0h7hHWX3Fv/vjmb4qautlTCQAA",
"FRAG_NUM":1,
"GUID":"RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1",
"MRN_SRC":"HK1_PRD_A",
"MRN_TYPE":"STORY",
"MRN_V_MAJ":"2",
"MRN_V_MIN":"10",
"TIMACT_MS":35640745,
"TOT_SIZE":1335
},
"ID":2,
"Key":{
"Name":"MRN_STORY",
"Service":"ELEKTRON_DD"
},
"PermData":"AwhCEBVM",
"SeqNumber":13678,
"Type":"Update",
"UpdateType":"Unspecified"
}
]
FRAGMENT length = 1335
decompress News FRAGMENT(s) for GUID RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1
News = {'altId': 'nRSN0253Ja', 'audiences': ['NP:LSEND', 'NP:LSEN'], 'body': 'For best results when printing this announcement, please click on link below:\nhttp://pdf.reuters.com/htmlnews/htmlnews.asp?i=43059c3bf0e37541&u=urn:newsml:reuters.com:20190814:nRSN0253Ja\n\nRNS Number : 0253J\nHenderson Alternative Strat Tst PLC\n14 August 2019\n\xa0\n\xa0\nHENDERSON INVESTMENT FUNDS LIMITED\nHENDERSON ALTERNATIVE STRATEGIES TRUST PLC\nLegal Entity Identifier: 213800J6LLOCA3CUDF69\n\xa0\n\xa0\n14 August 2019\nHenderson Alternative Strategies Trust plc ("the Company")\nChange to Director Information\n\xa0\nIn accordance with Listing Rule 9.6.14, the Company announces that Mr Graham\nOldroyd will be appointed as a non-executive director of BMO Global Smaller\nCompanies plc with effect from 1 October 2019.\n\xa0\n\xa0\nFor further information, please call:\n\xa0\nHelena Harvey\nFor and on behalf of\nHenderson Secretarial Services Limited\nSecretary to Henderson Alternative Strategies Trust plc\nTelephone: 020 7818 2025\n\xa0\nLaura Thomas\nInvestment Trust PR Manager\nJanus Henderson Investors\nTelephone: 020 7818 2636\nThis information is provided by RNS, the news service of the London Stock Exchange. RNS is approved by the Financial Conduct Authority to act as a Primary Information Provider in the United Kingdom. Terms and conditions relating to the use and distribution of this information may apply. For further information, please contact\nrns@lseg.com (mailto:rns@lseg.com)\n or visit\nwww.rns.com (http://www.rns.com/)\n.\n\xa0\n', 'firstCreated': '2019-08-14T08:53:50.910Z', 'headline': 'REG - Henderson Alt Strat - Director Declaration', 'id': 'RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1', 'instancesOf': ['RR:1006', 'NI:EUR/EARN'], 'language': 'en', 'messageType': 2, 'mimeType': 'text/x-plain-fixed', 'provider': 'NS:LSE', 'pubStatus': 'stat:usable', 'subjects': ['B:125', 'B:147', 'B:1691', 'B:261', 'B:270', 'G:3', 'G:7J', 'G:A', 'M:1QD', 'M:1WN', 'M:32', 'M:3H', 'M:Z', 'P:4295896662', 'R:HAST.L', 'P:34414674477', 'P:5000006698', 'P:5000075242', 'E:1', 'E:4B', 'E:6T', 'E:6V', 'E:E', 'M:1V0', 'M:1Z7', 'M:1Z8', 'M:1ZN', 'M:2B', 'R:JHG.N', 'N2:PRESSR', 'N2:BOSS1', 'N2:MNGISS', 'N2:PPLMOV', 'N2:BOARD1', 'N2:SIGCOR', 'N2:BACT', 'N2:FINS08', 'N2:INVT08', 'N2:INVT', 'N2:FINS', 'N2:CINV', 'N2:WEU', 'N2:GB', 'N2:EUROP', 'N2:SRVCS', 'N2:NEWR', 'N2:REG', 'N2:CMPNY', 'N2:LEN'], 'takeSequence': 1, 'urgency': 3, 'versionCreated': '2019-08-14T08:53:50.910Z'}
相关的应用示例
- 带有新闻标题和故事显示的网络应用程序示例,请参考:MRN WebSocket JavaScript example on GitHub。
- 使用C#编写的GUI示例,请参考:MRN WebSocket C# NewsViewer example on GitHub.
- Refinitiv-API-Samples/Example.WebSocketAPI.Python.MRN.RTO 源自GitHub Repository.
- 新闻分析示例:Refinitiv-API-Samples/Example.WebSocketAPI.Python.MRN.TRNA 源自GitHub Repository。
参考资料
欲知更多详情,请查看以下资源:
- Refinitiv Real-Time & Distribution Family page on the Refinitiv Developer Community website.
- WebSocket API page.
- Developer Webinar Recording: Introduction to Electron WebSocket API.
- Introduction to Machine Readable News with WebSocket API.
- Machine Readable News (MRN) & N2_UBMS Comparison and Migration Guide.
- Introduction to Machine Readable News (MRN) with Enterprise Message API (EMA).
- MRN Data Models and Real-Time SDK Implementation Guide.
- MRN WebSocket JavaScript example on GitHub.
- MRN WebSocket C# NewsViewer example on GitHub.
对于与本例或WebSocket API有关的任何问题,请使用开发者社区问答论坛(Q&A Forum)。
下载: