Tutorial Source Code |
Examples referenced: |
Last Update | July 2021 |
Interpreter | Python 3.7.x or greater |
Pre-requisites | Familiarity with Python and a basic understanding of Jupyter Notebook. If accessing content from:
The majority of the examples are built to work within Jupyter Notebook. Ensure this package is installed. |
Whether accessing from the desktop, directly to the cloud, or through your deployed managed streaming services, the Refinitiv Data Libraries provide easy-to-use interfaces to retrieve real-time streaming content from the platform. By utilizing streaming interfaces, developers can easily define properties to request, receive, and process market data in real time.
The following tutorial requests a small watchlist of instruments, retrieving level 1 (MarketPrice) streaming content and capturing real-time events in user-defined callbacks. The Pricing interface lets you define callback methods to capture market data events, including the initial refresh, status events, and updates such as trades and quotes that reflect changes in the market. These capabilities allow applications to respond immediately to critical changes that drive real-time decisions within their workflow.
In our example, we stream four instruments. The goal of the example is to demonstrate how to define callbacks for the different types of events that occur and how to respond to those events. The output provided below dumps the entire initial image captured within the refresh callback. In addition, as market conditions change, events are sent to an update callback that demonstrates how to pull out individual elements from the response for display purposes. Each update displays a simple line of text detailing the instrument as well as the fields updated.
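As a rough illustration of the update handling described above, the sketch below builds one line of text per update from the changed fields. It assumes the `fields` argument arrives as a plain dictionary of field names to values; `format_update` and the sample values are hypothetical and not part of the library.

```python
def format_update(instrument_name, fields):
    """Build a one-line summary of the fields changed in an update.

    Assumption: `fields` is a dict such as {"BID": 1.1812, "ASK": 1.1816}.
    """
    # Sort field names so the output order is stable across updates
    parts = [f"{name}={fields[name]}" for name in sorted(fields)]
    return f"Update : {instrument_name} : {', '.join(parts)}"


# Hypothetical update payload, for illustration only
line = format_update("EUR=", {"BID": 1.1812, "ASK": 1.1816})
print(line)
```

A helper like this could be called from inside an update callback in place of printing the raw dictionary.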
Within the source code package, select the source code example within the Content section. To execute the example, refer to the pre-requisites section at the top of this tutorial.
The expected output should look similar to this:
The following code segment requests a small watchlist of items from the streaming services defined within the platform. The streaming interface provides the ability to define callbacks for the different types of events.
import refinitiv.data as rd

# Assumes a session has already been opened (see the pre-requisites), e.g. with rd.open_session()

# Function to handle the initial Refresh for each item
def handle_refresh(fields, instrument_name, streaming_prices):
    # One way to access data - get snapshot of all instruments in a single dataframe
    print(f"Refresh : {streaming_prices.get_snapshot()}")
    return

# Function to handle updates received for individual items
def handle_update(fields, instrument_name, streaming_prices):
    # Alternative way of accessing data - access the changed fields for a single instrument
    print(f"Update : {instrument_name}:{fields}")

# Function to extract status code for an item as & when received from server
# Status contains a 'code' and a more detailed 'message'
def handle_status(status, instrument_name, streaming_prices):
    print(f"Status : {instrument_name}:{status['code']}:{status['message']}")

# Define the streaming price data we want to retrieve
streams = rd.content.pricing.Definition(
    ['EUR=', 'GBP=', 'JPY=', 'BADRIC'],
    fields=['BID', 'ASK']
).get_stream()

# Specify callback handler for the initial refresh of each item
streams.on_refresh(handle_refresh)
# Specify callback handler for any updates
streams.on_update(handle_update)
# Specify callback handler for any status messages
streams.on_status(handle_status)

# Send the requests to the server and open the streams for all items
streams.open()
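The above code segment is intentionally broken out into distinct steps for the purpose of explanation, which also shows the flexibility the interfaces provide. In general, the code defines a callback for each type of event (refresh, update, and status), defines a stream for the watchlist with the BID and ASK fields, registers the callbacks on that stream, and opens it to send the requests to the server.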
NOTE: Typically, a status event is generated when opening the stream (e.g. for an invalid instrument). However, one can also be generated at some later point in time, e.g. an event indicating a Suspect state if data goes stale. In this example, we have specified an invalid instrument, BADRIC, to generate a status event.
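Because status events can arrive at any time, a handler may want to record the outcome rather than only print it. The sketch below is one possible approach. It assumes the status payload is a dictionary carrying 'code' and 'message' keys, and uses `.get()` so that a missing key does not raise an error. `make_status_handler`, `latest_status`, and the sample payload are hypothetical.

```python
def make_status_handler(latest_status):
    """Return a status callback that stores the latest code per instrument."""

    def handle_status(status, instrument_name, streaming_prices):
        # Assumption: status is a dict with optional 'code' and 'message' keys
        code = status.get("code", "Unknown")
        message = status.get("message", "")
        latest_status[instrument_name] = code
        print(f"Status : {instrument_name}:{code}:{message}")

    return handle_status


# Simulate a status event for the invalid instrument; no live stream is needed
latest_status = {}
handler = make_status_handler(latest_status)
handler({"code": "NotFound", "message": "The record could not be found"}, "BADRIC", None)
```

The application can then inspect `latest_status` to decide, for example, which instruments to drop from its display.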
When you wish to stop the flow of live updates, you will need to issue a close on the stream. The following code segment will close the stream:
# Close the stream
streams.close()
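To make sure the stream is closed even if processing raises an error, the open and close calls can be wrapped in try/finally. The sketch below uses a stand-in `DemoStream` class (hypothetical, not part of the library) so that it runs without a platform connection; with the real interface, the `streams` object from the earlier segment would take its place. It also assumes events are delivered to the callbacks on a background thread while the main thread waits.

```python
import time


class DemoStream:
    """Hypothetical stand-in exposing only open() and close()."""

    def __init__(self):
        self.is_open = False

    def open(self):
        self.is_open = True

    def close(self):
        self.is_open = False


def run_for(stream, seconds):
    """Open the stream, wait for `seconds`, then always close it."""
    stream.open()
    try:
        # Assumption: callbacks fire on a background thread during this wait
        time.sleep(seconds)
    finally:
        # Runs even if an exception interrupts the wait
        stream.close()


demo = DemoStream()
run_for(demo, 0.1)
print(demo.is_open)
```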
As outlined in the Streaming Cache tutorial, the Pricing interface automatically prepares and manages an internal cache of the items defined within the interface. In addition, it includes other useful features such as adding and removing items from the cache. Depending on your requirements, you may only need to capture real-time events as outlined within this tutorial. If there is no need to utilize the cache feature, we would suggest using the OMM stream interface.
The Pricing interface defined within the Refinitiv Data Library for Python includes other useful features, such as the ability to retrieve real-time prices available within the RDP snapshot pricing service. This capability can be extremely useful for users who do not have access to, or do not need, real-time streaming services. Refer to the tutorial package for additional examples, e.g. TUT_2.2.06-Pricing-StreamingEventsDF.
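A one-off snapshot request might look like the sketch below. It is written under several assumptions: that the refinitiv-data package is installed, that a session is already open, and that the installed version exposes `get_data()` on the pricing `Definition` with the result available as `response.data.df`. `fetch_snapshot` is a hypothetical helper name; check the tutorial package for the form that matches your library version.

```python
def fetch_snapshot(instruments, fields):
    """Request a one-off (non-streaming) price snapshot as a DataFrame.

    Assumptions: the refinitiv-data package is installed, a session is
    already open, and Definition(...).get_data() returns a response whose
    data is available as `response.data.df`.
    """
    # Imported here so the sketch loads even where the package is absent
    import refinitiv.data as rd

    response = rd.content.pricing.Definition(instruments, fields=fields).get_data()
    return response.data.df


# Example call (requires an open session and valid credentials):
# df = fetch_snapshot(['EUR=', 'GBP=', 'JPY='], ['BID', 'ASK'])
```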
In addition, we encourage the use of auto-complete within your Jupyter Notebook or Python editor to discover other useful capabilities.