
“我不想学习一个API,我只想将一些数据发布到Refinitiv”
在客户现场工作期间,我经常遇到一类不需要使用Refinitiv数据的开发人员。他们是一些内部生成的实时数据的所有者,他们希望共享这些数据 - 供其他用户在内部或外部使用 - 但不想学习API来共享数据。
如果您是这些开发人员之一,或者只是想了解如何向Refinitivg贡献数据,请继续阅读....
数据发布
通常,一个组织会生成一些定价数据或其他数据,他们希望或者提供给内部用户使用,或者内部使用并且共享给外部金融市场。
这些价格或其他类型的数据先在内部生成,然后提交给Refinitiv实时系统。
如果仅用于内部使用,这些数据将存储在内部实时缓存中 - 内部用户可以通过使用内部通信的RIC(代码名称)请求相关代码来使用数据。
如果是供外部使用,Refinitiv实时系统会将这些数据转发给贡献引擎,该引擎将负责将这些数据转发给Refinitiv,Refinitiv将在其Elektron实时数据源上发布价格。然后,Elektron 的外部用户可以通过请求相关的 RIC 代码来使用数据。
我在开场白中使用了“发布”这个词,但之后又使用了“贡献”这个词。这是因为许多开发人员在谈到发布时,他们的意思是贡献。在 Refinitiv Real-Time 实时数据场景中,“发布”意味着不同的东西,这超出了本指南的范围。
发布(或插入)数据
用于贡献数据的编程功能称为“发布” – 在旧版的API中也被称为“插入”。
直到最近,如果您想执行Post(或Insert),您必须花一些时间学习我们的API之一,以便执行必要的步骤来达到:
- 连接到服务器
- 使用您的账号密码登录
- 将需要 Post 的有效负载编码为 API 特定的对象/格式
- 提交发布内容到服务器
- (可选)处理确认发布内容被接受的确认消息
很多客户端开发人员只会选取一个示例应用程序,然后根据他们的贡献需求对其进行破解 - 但没有时间了解该应用程序的工作原理。
随着我们新的Websocket接口的发布(在最新版本的Refinitiv实时数据系统上公开),这个过程变得更加简单。您不需要了解 API 细节 - 您可以使用基于标准的 Websocket 连接和 JSON 格式。
您需要了解一些 Refinitiv 特定知识 - 即您需要为服务器登录请求而编码JSON 消息的格式以及 Post 消息有效负载本身的格式。
虽然您可以使用任何支持JSON和websocket连接的编程语言,但我将在本指南中使用Python。
先决条件
为了完成本指南并成功将一些数据发布到Refinitiv或内部缓存,您将需要在PC上执行以下操作:
- Python安装 - 我已经用Python v3.7测试了以下代码
- 安装了“Websocket-client” Python 模块
- 下载本文附带的示例源代码(请参阅本页右上角的链接)。
您还需要从内部市场数据团队获得以下内容:
- 能使用 v3.2.1 或更高版本的 ADS 组件访问 Refinitiv 实时系统的权限(您将连接到的服务器)
- ADS 上的主机名以及和ADS连接的Websocket 接口的端口号
- DACS 用户名,具有将数据发布(贡献)到以下内容的正确权限:
- 要发布到的服务(数据源)的名称
- 可以供您安全发布的一个或多个测试 RIC(代码名称)
连接和登录
假设您已经满足上述所有先决条件,我们需要做的第一件事就是建立与ADS服务器的Websocket连接。
创建与服务器的 Websocket 连接
我们希望创建一个与服务器的长期连接,因此我们使用WebSocketApp(来自websocket-client模块),它是Websocket的一个封装器,提供一个事件驱动接口。
我们通过调用我在示例中定义的 connect() 方法来执行此操作:
wsa = connect("myADS",15000, "umer.nalla")
我正在传入 myADS 的主机名、端口 15000 和我的 DACS 用户名。此时不需要用户名,但该方法会将其存储在全局变量中以供以后使用。您显然需要将这些替换为您从市场数据团队获得的。
def connect(hostname, port, username, pos = socket.gethostbyname(socket.gethostname()), appid = 256):
""" Called to connect to server """
global app_id, user, position,web_socket_app
# appid may be allocated by your Market Data team, otherwise default to 256
app_id = appid
user = username
position = pos
# the above values are stored for later usage when we attempt login.
# Start websocket handshake
ws_address = "ws://{}:{}/WebSocket".format(hostname, port)
print("Connecting to WebSocket " + ws_address + " ...")
web_socket_app = websocket.WebSocketApp(ws_address, header=['User-Agent: Python'],
on_message=on_message,
on_error=on_error,
on_close=on_close,
subprotocols=['tr_json2'])
# callback for once websocket is open - which will send the Login request
web_socket_app.on_open = on_open
# Create Thread for WebsocketApp processing
wst = threading.Thread(target=web_socket_app.run_forever)
wst.start()
首先要注意的是连接地址ws_address是使用主机名和端口形成的,例如myADS:15000,其中myADS是主机名,15000是ADS监听websocket连接请求的端口。
接下来,我们创建一个 WebsocketApp 实例,并提供被以下事件类型激发的回撤方法:
- 一旦websocket连接建立 - on_open
- 当websocket关闭时 – on_close
- 如果发生错误 – on_error
- 当从 ADS 收到消息时 – on_message
另请注意,我们将子协议设置为tr_json2 - 以确保Websocket连接的Sec-WebSocket-Protocol标头值为tr_json2 - 这是ADS所期望的。
如果您决定使用不同的websocket库/语言,则上述代码无疑会有所不同 - 但关键点是我们需要处理上述事件类型。
调用上述连接方法后,我们等待建立websocket连接并成功登录响应:
print ("Waiting for Login response")
while not logged_in and (not shutdown_app):
time.sleep(1)
一旦websocket建立连接后登录ADS
一旦我们的应用程序和ADS之间建立了websocket连接,我们要做的第一件事就是向ADS发送登录请求。
def on_open(ws):
""" Called when handshake is complete and websocket is open. Now send login """
global web_socket_open
web_socket_open = True
send_login_request(ws)
因此,一旦websocket连接建立,WebSocketApp应该马上显示on_open。在这里,我们设置一个统一化的标志来指示 websocket 已打开,然后编码并向 ADS 发送 JSON 登录请求消息。
def send_login_request(ws, is_refresh_token=False):
""" Generate a login request and send """
# Set values for ADS login
# Note StreamID is 1 and the Domain is Login
login_json = {
'ID': 1,
'Domain': 'Login',
'Key': {
'Name': '',
'Elements': {
'ApplicationId': '',
'Position': ''
}
}
}
login_json['Key']['Name'] = user
login_json['Key']['Elements']['ApplicationId'] = app_id
login_json['Key']['Elements']['Position'] = position
ws.send(json.dumps(login_json))
因此,我们创建一个 JSON 对象并设置以下值:
- 数据流ID - 应用程序和服务器之间的每个请求(&响应)的唯一标识符,使用值1作为登录请求
- 用户名 – 通常称为 DACS 用户名(DACS 是 Refinitiv Real-Time 使用的身份验证和授权系统)。
- 应用程序 ID – 由您的市场数据团队分配的ID,否则使用默认值 256
- 位置 - 运行应用程序的PC的本地IP地址/主机名
有些组织的 DACS 政策坚持使用非默认应用程序 ID 来执行成功登录 - 因此请与您的市场数据团队核实要求。
然后,我们通过 websocket 将 JSON 消息发送到 ADS。
传出login_json对象应如下所示:
{
"Domain":"Login",
"ID":1,
"Key":{
"Elements":{
"ApplicationId":"256",
"Position":"101.43.2.193"
},
"Name":"umer.nalla"
}
发送登录请求后,我们可以期望服务器以JSON消息的形式通过Websocket进行异步响应。
来自 ADS 的成功登录刷新消息将如下所示:
{
"Domain":"Login",
"Elements":{
"MaxMsgSize":61430,
"PingTimeout":30
},
"ID":1,
"Key":{
"Elements":{
"AllowSuspectData":1,
"ApplicationId":"256",
"SupportViewRequests":1
},
"Name":"umer.nalla"
},
"State":{
"Data":"Ok",
"Stream":"Open",
"Text":"Login accepted by host centos7-2."
},
"Type":"Refresh"
}
需要注意的几点:
- 数据流ID为1,对应于我们在登录请求消息中使用的值
- 数据状态为“正常”和“数据流状态为打开” - 确认登录请求已被接受
- “刷新”的类型值 - ADS 发送“刷新”作为对成功请求的初始响应
如果登录请求被ADS拒绝,我们将看到如下内容:
{
"ID": 1,
"Type": "Status",
"Domain": "Login",
"Key": {
"Name": "fred"
},
"State": {
"Stream": "Closed",
"Data": "Suspect",
"Code": "UserUnknownToPermSys",
"Text": "fred, unknown to system."
}
}
因此,对于登录失败,请注意以下事项:
- 根据我们发送的登录请求,数据流ID为1
- 类型是状态(而不是刷新)
- 数据流状态为关闭(非打开)
- 数据状态为可疑(不是OK)
如果您确实收到状态类型响应,请联系您的内部市场数据团队,并提供状态消息的详细信息,包括代码和文本值。
如果您还记得,我们之前指定了一个回撤来处理我们通过 websocket 收到的消息:
def on_message(ws, message):
""" Called when message received, parse message into JSON for processing """
print("RECEIVED: ")
message_json = json.loads(message)
print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))
for singleMsg in message_json:
process_message(ws, singleMsg)
默认情况下,ADS 可以在单个 websocket 消息中发送多个 JSON 消息。因此,我们循环访问每个消息,并为每个消息调用process_message处理:
def process_message(ws, message_json):
global shutdown_app
""" Extract Message Type and Domain"""
message_type = message_json['Type']
if 'Domain' in message_json:
message_domain = message_json['Domain']
# check for a Login Refresh response to confirm successful login
if message_type == "Refresh" and message_domain == "Login":
global logged_in
logged_in = True
print ("LOGGED IN")
elif message_type == "Ping":
pong_json = { 'Type':'Pong' }
ws.send(json.dumps(pong_json))
print("SENT:")
print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))
elif message_type == "Status" and message_domain == "Login":
# A Login Status message usually indicates a problem - so report it and shutdown
if message_json['State']['Stream'] != "Open" or message_json['State']['Data'] != "Ok":
print("LOGIN REQUEST REJECTED.")
shutdown_app = True
elif message_type == "Error":
shutdown_app = True
process_message方法会根据消息类型执行一些操作:
- 如果收到刷新类型消息,我们将设置logged_in标志以指示登录成功。
- 如果收到状态或错误消息,我们会将应用程序标记为关闭。
- 当ADS向我们发送Ping消息时,我们会发送Pong响应 - 以确认应用程序仍在运行。
假设 Login 响应成功,回到 _main_ 方法后,由于logged_in标志设置为 True,等待循环应已退出。因此,我们现在可以继续并开始将数据发布到ADS。
将数据发布到ADS
正如我在开始时提到的,本指南针对的是不需要使用Refinitiv Real-Time的任何数据的开发人员 - 只需Post即可。针对此类要求的适宜发布技术称为“非数据流发布/非即时发布”。对于想要使用他们要发布到的工具的开发人员,还有一种称为“在线发布”的技术 - 请参阅本指南末尾,以获取涵盖这两种技术的教程的链接。
在计算机科学中,Stream通常被定义为“在一段时间内可用的列队数据 - 每个单点数据单独到达(而不是批量)”
当我们登录ADS时,我们建立了一个ID为1的数据流 - 即应用程序和ADS之间流式传输的任何与登录相关的数据都将以1的ID标识。
非即时发布
对于非即时发布,我们不需要订阅要发布的条目或者打开那个数据流。事实上,该条目甚至可能不存在,我们可能希望使用发布来创建该条目。
相反,我们将使用我们已经打开的一个数据流 - 即登录数据流来发送我们的帖子。
因此,假设登录请求成功 - 我们可以使用登录流ID 1发送离线帖子。
svcname = "NIPROV" # Service to Post on - Check with your Market Data team for correct value
ric = "UMER.TST" # RIC to Post / Contribute data to - Check with your Market Data team for Test RICs
bid = 22.1 # A few dummy starting values
ask = 24.5
trdprc = 23.3
# Use a python dict to store our FieldName(key) + Value pairs
fields = { 'BID' : bid, 'ASK' : ask, 'TRDPRC_1' : trdprc, 'GEN_TEXT16' : 'some text' }
# Send our Refresh message to create the RIC or refresh the fields of an existing RIC
send_mp_offstream_post(svcname,ric, fields, True)
在上面的代码中,我们指定服务名称,测试RIC代码,我们创建一个带有虚拟值的几个字段的字典,然后我们调用我们的方法发送刷新类型帖子。
def send_mp_offstream_post(svc,riccode, fields, refresh = False):
global post_id
""" Send an off-stream post message containing market-price content """
mp_post_json = {
'ID': 1,
'Type':'Post',
'Key': {
'Service': svc,
'Name': riccode
},
'Ack':True,
'PostID':post_id,
'PostUserInfo': { # Ask you Market Data team if this is mandated by your organisation
'Address':position, # Use IP address as the Post User Address.
'UserID':os.getpid() # Using process ID as the Post User Id - check with your MD team
},
'Message': {
'ID': 0,
'Type':'Refresh' if refresh else 'Update',
'Fields':fields
}
}
首先要注意的是,有一个外部 Post 类型消息,其中包含内部刷新类型消息。除此之外,请注意以下事项:
- 数据流ID为1 - 即应该打开的登录流
- 外部消息的帖子类型
- 关键内容包括
- 由您的市场数据团队提供的服务
- 名称 - 由您的市场数据团队提供的要发布的工具的RIC代码
- Ack – 为Ture以请求ADS对此发布内容的确认
- PostID - 对于我们发送的每个帖子,此值应该是唯一的 - 我们可以从Ack响应中提取它,以确定Ack与哪个帖子相关。
- PostUserInfo - 您的组织出于审核目的可能需要地址和用户 ID(否则您可以省略这些值)
- 内部消息包含您希望发布的实际数据
- ID为0 - 我们正在发布一个值,因此我们不需要建立新的数据流
- 类型 - 要创建新项目或刷新所有字段,应将其设置为刷新 - 否则使用Update来更新部分字段集
- 字段 - 提供包含字段名称(关键值)+你想向其提供数据的字段的值对(Value pairs)的字典
调用上述方法应会导致将以下 JSON 消息发送到 ADS:
{
"Ack":true,
"Domain":"MarketPrice",
"ID":1,
"Key":{
"Name":"UMER.TST",
"Service":"NIPROV"
},
"Message":{
"Fields":{
"ASK":24.5,
"BID":22.1,
"GEN_TEXT16":"some text",
"TRDPRC_1":23.3
},
"ID":0,
"Solicited":false,
"State":{
"Data":"Ok",
"Stream":"Open"
},
"Type":"Refresh"
},
"PostID":1,
"PostUserInfo":{
"Address":"10.44.12.152",
"UserID":7408
},
"Type":"Post"
}
假设发布被接受,我们应该从 ADS 获得一个异步 Ack 响应:
{
"AckID":1,
"ID":1,
"Key":{
"Name":"UMER.TST",
"Service":"NIPROV"
},
"Type":"Ack"
}
请注意 AckID 1,它与我们在刚刚发送到服务器的 Post 消息中指定的 PostID 1 相对应。我们可以使用此值来确定 Ack 与哪个 Post 相关。因此,我们应该对我们发送的每条Post消息使用唯一的PostId。
由于这只是一个示例,因此 _main_ 方法的其余部分只是按定时间隔发送具有随机价格值的 Update 类型消息。显然,您将在需要时发送“刷新”和“更新”。
发布刷新或更新?
在上面的示例中,我最初发布了一个刷新,然后是按时间间隔更新 - 纯粹是为了演示目的。但是,您选择使用哪种方法将取决于您的要求:
- 如果要在内部缓存服务中创建新条目,可以发布刷新
- 要添加或删除条目中包含的实际字段,您需要使用修订后的字段列表发布刷新
- 如果您遇到一些现已解决的临时数据问题,并希望强制使用者覆盖任何本地缓存的字段,请发送 Refresh。这可确保数据的任何现有使用者获得所有字段的一组干净值
- 要更改现有条目的一个或多个字段的值,您可以发布更新
请注意,如果我们尝试将更新发布到不存在的条目,我们将收到Ack响应,但它使用NakCode,例如,如果我们尝试发布到不存在的RIC代码 'DAVE.TST'会显示:
{
"ID": 1,
"Type": "Ack",
"AckID": 1,
"NakCode": "SymbolUnknown",
"Text": "F44: Unable to find item on post update.",
"Key": {
"Service": "NIPROV",
"Name": "DAVE.TST"
}
}
本指南到此结束 - 总结一下:
- 如果您想将数据贡献到内部缓存或Refinitiv,而无需使用数据,则可以使用Websocket接口以相对直接的方式实现这一点
- 如果您想使用和贡献数据,您仍然可以应用上述大部分知识,通过使用在线发布来实现这一目标 - 如下面的教程链接中所述
- 您应该能够使用任何支持Websockets的语言和编码JSON消息的能力来实现上述内容
其他资源
您可以在链接面板中找到指向源代码、API 和其他相关文章的链接