 
                                 
                                 
                              “我不想学习一个API,我只想将一些数据发布到Refinitiv”
在客户现场工作期间,我经常遇到一类不需要使用Refinitiv数据的开发人员。他们是一些内部生成的实时数据的所有者,他们希望共享这些数据 - 供其他用户在内部或外部使用 - 但不想学习API来共享数据。
如果您是这些开发人员之一,或者只是想了解如何向Refinitivg贡献数据,请继续阅读....
数据发布
通常,一个组织会生成一些定价数据或其他数据,他们希望或者提供给内部用户使用,或者内部使用并且共享给外部金融市场。
这些价格或其他类型的数据先在内部生成,然后提交给Refinitiv实时系统。
如果仅用于内部使用,这些数据将存储在内部实时缓存中 - 内部用户可以通过使用内部通信的RIC(代码名称)请求相关代码来使用数据。
如果是供外部使用,Refinitiv实时系统会将这些数据转发给贡献引擎,该引擎将负责将这些数据转发给Refinitiv,Refinitiv将在其Elektron实时数据源上发布价格。然后,Elektron 的外部用户可以通过请求相关的 RIC 代码来使用数据。
我在开场白中使用了“发布”这个词,但之后又使用了“贡献”这个词。这是因为许多开发人员在谈到发布时,他们的意思是贡献。在 Refinitiv Real-Time 实时数据场景中,“发布”意味着不同的东西,这超出了本指南的范围。
发布(或插入)数据
用于贡献数据的编程功能称为“发布” – 在旧版的API中也被称为“插入”。
直到最近,如果您想执行Post(或Insert),您必须花一些时间学习我们的API之一,以便执行必要的步骤来达到:
- 连接到服务器
- 使用您的账号密码登录
- 将需要 Post 的有效负载编码为 API 特定的对象/格式
- 提交发布内容到服务器
- (可选)处理确认发布内容被接受的确认消息
很多客户端开发人员只会选取一个示例应用程序,然后根据他们的贡献需求对其进行破解 - 但没有时间了解该应用程序的工作原理。
随着我们新的Websocket接口的发布(在最新版本的Refinitiv实时数据系统上公开),这个过程变得更加简单。您不需要了解 API 细节 - 您可以使用基于标准的 Websocket 连接和 JSON 格式。
您需要了解一些 Refinitiv 特定知识 - 即您需要为服务器登录请求而编码JSON 消息的格式以及 Post 消息有效负载本身的格式。
虽然您可以使用任何支持JSON和websocket连接的编程语言,但我将在本指南中使用Python。
先决条件
为了完成本指南并成功将一些数据发布到Refinitiv或内部缓存,您将需要在PC上执行以下操作:
- Python安装 - 我已经用Python v3.7测试了以下代码
- 安装了“Websocket-client” Python 模块
- 下载本文附带的示例源代码(请参阅本页右上角的链接)。
您还需要从内部市场数据团队获得以下内容:
- 能使用 v3.2.1 或更高版本的 ADS 组件访问 Refinitiv 实时系统的权限(您将连接到的服务器)
- ADS 上的主机名以及和ADS连接的Websocket 接口的端口号
- DACS 用户名,具有将数据发布(贡献)到以下内容的正确权限:- 要发布到的服务(数据源)的名称
- 可以供您安全发布的一个或多个测试 RIC(代码名称)
 
连接和登录
假设您已经满足上述所有先决条件,我们需要做的第一件事就是建立与ADS服务器的Websocket连接。
创建与服务器的 Websocket 连接
我们希望创建一个与服务器的长期连接,因此我们使用WebSocketApp(来自websocket-client模块),它是Websocket的一个封装器,提供一个事件驱动接口。
我们通过调用我在示例中定义的 connect() 方法来执行此操作:
    	
            wsa = connect("myADS",15000, "umer.nalla")
        
        
    
我正在传入 myADS 的主机名、端口 15000 和我的 DACS 用户名。此时不需要用户名,但该方法会将其存储在全局变量中以供以后使用。您显然需要将这些替换为您从市场数据团队获得的。
    	
            def connect(hostname, port, username, pos = socket.gethostbyname(socket.gethostname()), appid = 256):
    
    """ Called to connect to server """
    
    global app_id, user, position,web_socket_app
    # appid may be allocated by your Market Data team, otherwise default to 256
    app_id = appid
    user = username
    position = pos
    # the above values are stored for later usage when we attempt login.
 
    # Start websocket handshake
    ws_address = "ws://{}:{}/WebSocket".format(hostname, port)
    print("Connecting to WebSocket " + ws_address + " ...")
    web_socket_app = websocket.WebSocketApp(ws_address, header=['User-Agent: Python'],
                                        on_message=on_message,
                                        on_error=on_error,
                                        on_close=on_close,
                                        subprotocols=['tr_json2'])
    
    # callback for once websocket is open - which will send the Login request
    web_socket_app.on_open = on_open
 
    # Create Thread for WebsocketApp processing
    wst = threading.Thread(target=web_socket_app.run_forever)
    wst.start()
        
        
    
首先要注意的是连接地址ws_address是使用主机名和端口形成的,例如myADS:15000,其中myADS是主机名,15000是ADS监听websocket连接请求的端口。
接下来,我们创建一个 WebsocketApp 实例,并提供被以下事件类型激发的回撤方法:
- 一旦websocket连接建立 - on_open
- 当websocket关闭时 – on_close
- 如果发生错误 – on_error
- 当从 ADS 收到消息时 – on_message
另请注意,我们将子协议设置为tr_json2 - 以确保Websocket连接的Sec-WebSocket-Protocol标头值为tr_json2 - 这是ADS所期望的。
如果您决定使用不同的websocket库/语言,则上述代码无疑会有所不同 - 但关键点是我们需要处理上述事件类型。
调用上述连接方法后,我们等待建立websocket连接并成功登录响应:
    	
            print ("Waiting for Login response")
 while not logged_in and (not shutdown_app):
     time.sleep(1)
        
        
    
一旦websocket建立连接后登录ADS
一旦我们的应用程序和ADS之间建立了websocket连接,我们要做的第一件事就是向ADS发送登录请求。
    	
            def on_open(ws):
    """ Called when handshake is complete and websocket is open. Now send login """
    global web_socket_open
    web_socket_open = True
    send_login_request(ws)
        
        
    
因此,一旦websocket连接建立,WebSocketApp应该马上显示on_open。在这里,我们设置一个统一化的标志来指示 websocket 已打开,然后编码并向 ADS 发送 JSON 登录请求消息。
    	
            def send_login_request(ws, is_refresh_token=False):
    """ Generate a login request and send """
    # Set values for ADS login
    # Note StreamID is 1 and the Domain is Login
    login_json = {
        'ID': 1,
        'Domain': 'Login',
        'Key': {
            'Name': '',
            'Elements': {
                'ApplicationId': '',
                'Position': ''
            }
        }
    }
 
    login_json['Key']['Name'] = user
    login_json['Key']['Elements']['ApplicationId'] = app_id
    login_json['Key']['Elements']['Position'] = position
    
     ws.send(json.dumps(login_json))
        
        
    
因此,我们创建一个 JSON 对象并设置以下值:
- 数据流ID - 应用程序和服务器之间的每个请求(&响应)的唯一标识符,使用值1作为登录请求
- 用户名 – 通常称为 DACS 用户名(DACS 是 Refinitiv Real-Time 使用的身份验证和授权系统)。
- 应用程序 ID – 由您的市场数据团队分配的ID,否则使用默认值 256
- 位置 - 运行应用程序的PC的本地IP地址/主机名
有些组织的 DACS 政策坚持使用非默认应用程序 ID 来执行成功登录 - 因此请与您的市场数据团队核实要求。
然后,我们通过 websocket 将 JSON 消息发送到 ADS。
传出login_json对象应如下所示:
    	
            {
"Domain":"Login",
  "ID":1,
  "Key":{
    "Elements":{
      "ApplicationId":"256",
      "Position":"101.43.2.193"
    },
    "Name":"umer.nalla"
  }
        
        
    
发送登录请求后,我们可以期望服务器以JSON消息的形式通过Websocket进行异步响应。
来自 ADS 的成功登录刷新消息将如下所示:
    	
            {
"Domain":"Login",
 "Elements":{
   "MaxMsgSize":61430,
   "PingTimeout":30
 },
 "ID":1,
 "Key":{
   "Elements":{
     "AllowSuspectData":1,
     "ApplicationId":"256",
     "SupportViewRequests":1
   },
   "Name":"umer.nalla"
 },
 "State":{
   "Data":"Ok",
   "Stream":"Open",
   "Text":"Login accepted by host centos7-2."
 },
 "Type":"Refresh"
}
        
        
    
需要注意的几点:
- 数据流ID为1,对应于我们在登录请求消息中使用的值
- 数据状态为“正常”和“数据流状态为打开” - 确认登录请求已被接受
- “刷新”的类型值 - ADS 发送“刷新”作为对成功请求的初始响应
如果登录请求被ADS拒绝,我们将看到如下内容:
    	
            {
   "ID": 1,
   "Type": "Status",
   "Domain": "Login",
   "Key": {
     "Name": "fred"
   },
   "State": {
     "Stream": "Closed",
     "Data": "Suspect",
     "Code": "UserUnknownToPermSys",
     "Text": "fred, unknown to system."
   }
 }
        
        
    
因此,对于登录失败,请注意以下事项:
- 根据我们发送的登录请求,数据流ID为1
- 类型是状态(而不是刷新)
- 数据流状态为关闭(非打开)
- 数据状态为可疑(不是OK)
如果您确实收到状态类型响应,请联系您的内部市场数据团队,并提供状态消息的详细信息,包括代码和文本值。
如果您还记得,我们之前指定了一个回撤来处理我们通过 websocket 收到的消息:
    	
            def on_message(ws, message):
    """ Called when message received, parse message into JSON for processing """
    print("RECEIVED: ")
    message_json = json.loads(message)
    print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))
    for singleMsg in message_json:
        process_message(ws, singleMsg)
        
        
    
默认情况下,ADS 可以在单个 websocket 消息中发送多个 JSON 消息。因此,我们循环访问每个消息,并为每个消息调用process_message处理:
    	
            def process_message(ws, message_json):
    global shutdown_app
 
    """ Extract Message Type and Domain"""
    message_type = message_json['Type']
    if 'Domain' in message_json:
        message_domain = message_json['Domain']
    # check for a Login Refresh response to confirm successful login
    if message_type == "Refresh" and message_domain == "Login":
        global logged_in
        logged_in = True
        print ("LOGGED IN")
    elif message_type == "Ping":
        pong_json = { 'Type':'Pong' }
        ws.send(json.dumps(pong_json))
        print("SENT:")
        print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))
    elif message_type == "Status" and message_domain == "Login":
        # A Login Status message usually indicates a problem - so report it and shutdown
        if message_json['State']['Stream'] != "Open" or message_json['State']['Data'] != "Ok":
            print("LOGIN REQUEST REJECTED.")
            shutdown_app = True
    elif message_type == "Error":
            shutdown_app = True
        
        
    
process_message方法会根据消息类型执行一些操作:
- 如果收到刷新类型消息,我们将设置logged_in标志以指示登录成功。
- 如果收到状态或错误消息,我们会将应用程序标记为关闭。
- 当ADS向我们发送Ping消息时,我们会发送Pong响应 - 以确认应用程序仍在运行。
假设 Login 响应成功,回到 _main_ 方法后,由于logged_in标志设置为 True,等待循环应已退出。因此,我们现在可以继续并开始将数据发布到ADS。
将数据发布到ADS
正如我在开始时提到的,本指南针对的是不需要使用Refinitiv Real-Time的任何数据的开发人员 - 只需Post即可。针对此类要求的适宜发布技术称为“非数据流发布/非即时发布”。对于想要使用他们要发布到的工具的开发人员,还有一种称为“在线发布”的技术 - 请参阅本指南末尾,以获取涵盖这两种技术的教程的链接。
在计算机科学中,Stream通常被定义为“在一段时间内可用的列队数据 - 每个单点数据单独到达(而不是批量)”
当我们登录ADS时,我们建立了一个ID为1的数据流 - 即应用程序和ADS之间流式传输的任何与登录相关的数据都将以1的ID标识。
非即时发布
对于非即时发布,我们不需要订阅要发布的条目或者打开那个数据流。事实上,该条目甚至可能不存在,我们可能希望使用发布来创建该条目。
相反,我们将使用我们已经打开的一个数据流 - 即登录数据流来发送我们的帖子。
因此,假设登录请求成功 - 我们可以使用登录流ID 1发送离线帖子。
    	
            svcname = "NIPROV"  # Service to Post on - Check with your Market Data team for correct value
ric = "UMER.TST"    # RIC to Post / Contribute data to - Check with your Market Data team for Test RICs
 
bid = 22.1          # A few dummy starting values
ask = 24.5
trdprc = 23.3
 
# Use a python dict to store our FieldName(key) + Value pairs
fields = { 'BID' : bid, 'ASK' : ask, 'TRDPRC_1' : trdprc, 'GEN_TEXT16' : 'some text' }
 
# Send our Refresh message to create the RIC or refresh the fields of an existing RIC
send_mp_offstream_post(svcname,ric, fields, True)
        
        
    
在上面的代码中,我们指定服务名称,测试RIC代码,我们创建一个带有虚拟值的几个字段的字典,然后我们调用我们的方法发送刷新类型帖子。
    	
            def send_mp_offstream_post(svc,riccode, fields, refresh = False):
    global post_id
    """ Send an off-stream post message containing market-price content """
    mp_post_json = {
        'ID': 1,
        'Type':'Post',
        'Key': {
            'Service': svc,
            'Name': riccode
        },
        'Ack':True,
        'PostID':post_id,
        'PostUserInfo': {       # Ask you Market Data team if this is mandated by your organisation
            'Address':position,  # Use IP address as the Post User Address.
            'UserID':os.getpid() # Using process ID as the Post User Id - check with your MD team
        },
        'Message': {
            'ID': 0,
            'Type':'Refresh' if refresh else 'Update',
            'Fields':fields
        }
    }
        
        
    
首先要注意的是,有一个外部 Post 类型消息,其中包含内部刷新类型消息。除此之外,请注意以下事项:
- 数据流ID为1 - 即应该打开的登录流
- 外部消息的帖子类型
- 关键内容包括- 由您的市场数据团队提供的服务
- 名称 - 由您的市场数据团队提供的要发布的工具的RIC代码
 
- Ack – 为Ture以请求ADS对此发布内容的确认
- PostID - 对于我们发送的每个帖子,此值应该是唯一的 - 我们可以从Ack响应中提取它,以确定Ack与哪个帖子相关。
- PostUserInfo - 您的组织出于审核目的可能需要地址和用户 ID(否则您可以省略这些值)
- 内部消息包含您希望发布的实际数据- ID为0 - 我们正在发布一个值,因此我们不需要建立新的数据流
- 类型 - 要创建新项目或刷新所有字段,应将其设置为刷新 - 否则使用Update来更新部分字段集
- 字段 - 提供包含字段名称(关键值)+你想向其提供数据的字段的值对(Value pairs)的字典
 
调用上述方法应会导致将以下 JSON 消息发送到 ADS:
    	
            {
  "Ack":true,
  "Domain":"MarketPrice",
  "ID":1,
  "Key":{
    "Name":"UMER.TST",
    "Service":"NIPROV"
  },
  "Message":{
    "Fields":{
      "ASK":24.5,
      "BID":22.1,
      "GEN_TEXT16":"some text",
      "TRDPRC_1":23.3
    },
    "ID":0,
    "Solicited":false,
    "State":{
      "Data":"Ok",
      "Stream":"Open"
    },
    "Type":"Refresh"
  },
  "PostID":1,
  "PostUserInfo":{
    "Address":"10.44.12.152",
    "UserID":7408
  },
  "Type":"Post"
}
        
        
    
假设发布被接受,我们应该从 ADS 获得一个异步 Ack 响应:
    	
            {
    "AckID":1,
    "ID":1,
    "Key":{
      "Name":"UMER.TST",
      "Service":"NIPROV"
    },
    "Type":"Ack"
}
        
        
    
请注意 AckID 1,它与我们在刚刚发送到服务器的 Post 消息中指定的 PostID 1 相对应。我们可以使用此值来确定 Ack 与哪个 Post 相关。因此,我们应该对我们发送的每条Post消息使用唯一的PostId。
由于这只是一个示例,因此 _main_ 方法的其余部分只是按定时间隔发送具有随机价格值的 Update 类型消息。显然,您将在需要时发送“刷新”和“更新”。
发布刷新或更新?
在上面的示例中,我最初发布了一个刷新,然后是按时间间隔更新 - 纯粹是为了演示目的。但是,您选择使用哪种方法将取决于您的要求:
- 如果要在内部缓存服务中创建新条目,可以发布刷新
- 要添加或删除条目中包含的实际字段,您需要使用修订后的字段列表发布刷新
- 如果您遇到一些现已解决的临时数据问题,并希望强制使用者覆盖任何本地缓存的字段,请发送 Refresh。这可确保数据的任何现有使用者获得所有字段的一组干净值
- 要更改现有条目的一个或多个字段的值,您可以发布更新
请注意,如果我们尝试将更新发布到不存在的条目,我们将收到Ack响应,但它使用NakCode,例如,如果我们尝试发布到不存在的RIC代码 'DAVE.TST'会显示:
    	
            {
    "ID": 1,
    "Type": "Ack",
    "AckID": 1,
    "NakCode": "SymbolUnknown",
    "Text": "F44: Unable to find item on post update.",
    "Key": {
      "Service": "NIPROV",
      "Name": "DAVE.TST"
    }
  }
        
        
    
本指南到此结束 - 总结一下:
- 如果您想将数据贡献到内部缓存或Refinitiv,而无需使用数据,则可以使用Websocket接口以相对直接的方式实现这一点
- 如果您想使用和贡献数据,您仍然可以应用上述大部分知识,通过使用在线发布来实现这一目标 - 如下面的教程链接中所述
- 您应该能够使用任何支持Websockets的语言和编码JSON消息的能力来实现上述内容
其他资源
您可以在链接面板中找到指向源代码、API 和其他相关文章的链接