概述
本文研究了路孚特的机器可读新闻产品,讨论了它的特点、最常见的使用案例,并从应用开发者的角度提供了具体的实施细节。
机器可读新闻
路孚特机器可读新闻(MRN)是一项对新闻进行自动订阅和系统分析的先进服务。它直接向您的应用程序提供深厚的历史新闻档案、超低延迟的结构化新闻和新闻分析。这使得算法能够运用新闻的力量来把握机会,利用市场的信息不对称并管理事件风险。
可能的使用案例
那么,如何才能利用这些数据呢?
- 相关性:大量的用例来自于将MRN数据流与另一个数据流相关联,例如,与实时价格流相关联。请参考developers.refinitiv.com上的路孚特实时SDK Java和路孚特实时SDK C++来了解实时价格订阅的详细例子。
- 归档:存储一个固定的时间段内关于一组证券的新闻,以便进一步使用。本文后面提到的一个例子说明了如何使用EclipseLink将数据存储到MySQL数据库。
- 提醒:获得关于可被视为交易事件的异常新闻的通知。异常案例可以在中性或负面的背景下有非常积极的情绪,反之亦然,或者在短时间内有关于某个特定机构的异常数量的新闻。所有这些特征都可以被量化、具体化并建立在您的应用程序的工作流程中。
工具
在这篇文章中,我们将使用企业消息API(EMA)的Java版本,这是一个数据中立的、多线程的、易于使用的API,提供对OMM(Open Message Model)和RWF(Reuters Wire Format)数据的访问。如果您不熟悉EMA,我们强烈建议您查看Java或C/C++的教程。
数据模型
现在让我们开始研究具体细节,首先我们来研究MRN数据是如何形成的。
- 一条核心MRN数据是一个UTF8的JSON字符串;
- JSON字符串使用gzip进行压缩;
- 因为有些数据太大,无法装入单一的消息中,所以压缩后的JSON字符串被分割成若干片段,每个片段都适合于一个RSSL更新(Update)。
- 数据片段作为FieldList包中的FRAGMENT字段值被添加到更新消息中。
因此,为了解析这些数据,我们需要将这个过程倒过来。让我们考虑一下如何识别所有相关的片段,并重新组装一篇单一的新闻报道。
以下五个字段以及RIC本身都是必要的,它们用以确定是否收到了整条数据的各个片段,以及如何将这些片段串联起来重建这条数据:
- MRN_SRC:发布FRAGMENT的评分/处理系统的标识符。
- GUID:该条数据的全球唯一标识符。这条数据的所有消息都有相同的GUID值。
- FRAGMENT:压缩的数据片段本身。
- TOT_SIZE:分片数据的总大小,以字节为单位。
- FRAG_NUM:一条数据里各个片段的序列号。对于发布的每条数据的第一个片段,这个数字被设置为1;对于每条数据的后续片段,这个数字会被递增。
一条MRN数据发布是由RIC、MRN_SRC和GUID的组合来唯一标识的。
对于一个给定的RIC-MRN_SRC-GUID组合,当一条数据只需要一个消息时,TOT_SIZE将等于FRAGMENT的字节数,FRAG_NUM将是1。当需要多个消息时,一旦每个FRAGMENT的字节数之和等于TOT_SUM,就可以认为该条数据已经完全收到。消费者还将观察到所有FRAG_NUM的范围是从1到片段的数量,没有跳过中间的整数。换句话说,一条通过三个消息传输的数据的FRAG_NUM值是1,2,3。
应用程序的高层结构
我们的应用程序是一个消费者,它将:
- 通过直接的接入点(路孚特实时边界设备EED),或通过分布设施,如路孚特实时分布系统(RTDS,即之前的TREP),与数据源(供应商)建立连接;
- 对MRN RIC发出一个或多个订阅请求;
- 注册以获取数据更新,并对其进行解析;
- 注册以获取状态事件并适当地处理它们。
关于编写EMA消费者应用程序的更多细节,请参考《开发者指南》。
让我们来看看使用新闻文本分析域(News Text Analytics)和MRN特定消费发布的数据。首先,我们要请求以下RIC,并订阅它们的更新:
内容集 | RIC |
---|---|
实时新闻 | MRN_STORY |
新闻分析:公司和商品&能源资产 | MRN_NA_ENT0 |
新闻分析:宏观经济新闻和事件 | MRN_NA_DOC |
例子:
OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();
OmmConsumer consumer = EmaFactory.createOmmConsumer(config.host(_ip + ":" + _port).username(_userName));
ReqMsg reqMsg = EmaFactory.createReqMsg();
for(int i = 0; i<_ricsMRN.length; i++) {
consumer.registerClient(reqMsg.domainType(EmaRdm.MMT_NEWS_TEXT_ANALYTICS).serviceName(
_serviceName).name(_ricsMRN[i]), _appClient, (new Integer(i)));
我们注册以获取如下更新:
- onRefreshMsg
- onUpdateMsg
- onStatusMsg
寻找片段
if (fieldEntry.loadType() == DataTypes.BUFFER) {
if (fieldEntry.fieldId() == FRAGMENT) {
如果只有一个片段,或者组装的长度等于TOT_SIZE,我们就可以转换为JSON,并可能进行格式化打印(pretty-print):
if (fieldEntry.buffer().buffer().array().length == totalSize) {
// there is only one segment, we are ready
// unzip using gzip
String strFlatFrag = unzipPayload(fieldEntry.buffer().buffer().array());
System.out.println("=>FRAGMENT JSON STRING: " + strFlatFrag);
try {
JSONObject jsonResponse = new JSONObject(strFlatFrag);
// pretty-print json response
int spacesToIndentEachLevel = 2;
System.out.println("FRAGMENT JSON PRETTY:\n" + jsonResponse.toString(spacesToIndentEachLevel));
}
catch (Exception e) {
System.err.println("Exception parsing json: " + e);
e.printStackTrace(System.err);
}
}
但如果收到的只是其中一个片段呢?我们应该保留这个片段,直到拥有所有的片段,这些片段存储在一个哈希表中。
Hashtable<String,ArrayList<ByteBuffer>> fragBuilderHash;
...
alFrags = fragBuilderHash.get(guid);
alFrags.add(fieldEntry.buffer().buffer());
fragBuilderHash.put(guid, alFrags);
当fragBuilderHash中的片段的总和长度等于总大小时,我们就可以继续处理了。连接、解压缩、转换为JSON,并准备好将数据投入到一个好的用途。
相关的示例应用程序
· 控制台实例,将数据格式化打印到屏幕上是最简单和最明显的用例。请参考github上的MRN控制台实例。
· GUI浏览器,显示并排的实时新闻(报道)、新闻分析和情绪指数。请参考github上的MRN GUI Viewer。
· 数据库存储,使用EclipseLink存储实现将新闻分析存储到MySQL数据库。请参考github上的MRN数据库存储。
参考资料