How to monitor services in your apps using RFA Java
This article is intended for Java developers who build RFA Java applications and plan to monitor the health of services on a server. Readers should have a basic understanding of:
- RFA Java configuration
- RFA Event Dispatch Mechanism
- RFA Logging
- RDM Login and event registration
These topics are covered in RFA Java Tutorials 2 and 3-6 on the Refinitiv Developer Community website. In addition, the general techniques outlined below can be applied to any of the Refinitiv Real-Time streaming APIs, e.g. RFA.NET/C++ and ETA C++/Java.
Overview
Refinitiv Real-Time Distribution System (referred to below as the "infrastructure") facilitates interactions between many different service provider applications and consumer applications. Provider applications implement services and expose a certain set of capabilities or content, e.g. Market Price (level 1 data) or Market by Order (level 2 data), while consumer applications use this content to serve their business requirements. Consumer applications consume the content supplied by provider applications via the infrastructure.
ADH (Advanced Data Hub) is a data distribution server that runs in the infrastructure. It consumes content from provider applications and reliably fans this content out to one or more ADSs over a backbone network.
ADS (Advanced Distribution Server) is another component of the infrastructure. It interacts with consumer applications directly to deliver the content that they request.
The Source Directory domain is one of the Refinitiv Domain Models (RDMs) provided or consumed by a Refinitiv product, such as Refinitiv Real-Time Distribution System, Data Feed Direct, or Refinitiv Real-Time. It is used to advertise information about available services and their state (Up or Down), quality of service (QoS), and capabilities. An application can therefore monitor whether services are available by requesting the Source Directory from the server (ADS) that provides them. For example, a consumer application connects to an ADS that provides the services shown in the figure below:
If the application requests the Source Directory from the ADS, it can monitor the four services (OMM Non-Interactive Provider, OMM Interactive Provider, Data Feed Direct\Exchange and Refinitiv Real-Time Edge Device\Refinitiv Real-Time) provided by the ADS. The application is notified whenever any service changes, e.g. if Refinitiv Real-Time Edge Device later goes down.
Solution
RFA Java consumer applications can monitor the services by requesting the Source Directory from the server (ADS), as shown in the simple diagram below:
A consumer can request information about all services by omitting the OMMAttribInfo.ServiceName information. Because the Source Directory domain uses an OMMFilterList, a consumer can indicate the specific source-related information in which it is interested via OMMAttribInfo.Filter. Each bit set in the filter corresponds to an information set that can be provided in response messages.
Refinitiv recommends that a consumer application request at least RDMSERVICE.Filter.INFO and RDMSERVICE.Filter.STATE for the Source Directory:
- The Info filter contains the ServiceName and ServiceID information for all available services, along with the data they make available.
- The State filter contains the current state of each service: up (available) or down (unavailable).
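The bit-per-category idea can be sketched in plain Java without the RFA library. The values INFO = 0x01 and STATE = 0x02 are assumptions chosen to agree with the `Filter: 3 (INFO | STATE)` value that appears in the message dumps in this article; a real application should use the RDMSERVICE.Filter constants rather than literals. The class and method names are hypothetical.

```java
// Plain-Java sketch of a Source Directory filter mask (no RFA classes involved).
class DirectoryFilterSketch {
    // Assumed bit values, consistent with "Filter: 3 (INFO | STATE)" in the dumps.
    static final int INFO = 0x01;
    static final int STATE = 0x02;

    // Combine the categories the monitoring consumer is interested in into one mask.
    static int monitoringFilter() {
        return INFO | STATE;
    }

    // True when the given category bit is present in the mask.
    static boolean requests(int filter, int category) {
        return (filter & category) != 0;
    }

    public static void main(String[] args) {
        int filter = monitoringFilter();
        System.out.println("Filter: " + filter);
        System.out.println("INFO requested: " + requests(filter, INFO));
        System.out.println("STATE requested: " + requests(filter, STATE));
    }
}
```

Because each category occupies its own bit, further categories could be added to the mask with another bitwise OR without affecting the two used here.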
Source Directory Request Message
The message that the consumer application sends to the server contains the components below:
COMPONENT | DESCRIPTION / VALUE |
---|---|
MsgModelType | Required. RDMMsgTypes.DIRECTORY = 4 |
IndicationFlag | Required. |
QoS | Not used. |
Priority | Not used. |
ExtendedHeader | Not used. |
OMMAttribInfo.Filter | Required. A filter indicates the specific information in which a consumer is interested. Available categories include RDMSERVICE.Filter.INFO and RDMSERVICE.Filter.STATE. |
OMMAttribInfo.NameType | Not used. |
OMMAttribInfo.Name | Not used. |
OMMAttribInfo.ServiceName | Optional. |
OMMAttribInfo.ServiceID | Optional. |
OMMAttribInfo.Id | Not used. |
OMMAttribInfo.Attrib | Not used. Only getAttrib() exists. Attrib is implicitly populated by OMMEncoder. |
Payload | Not used. Only getPayload() exists. Payload is implicitly populated by OMMEncoder. |
- The application sets IndicationFlag to OMMMsg.Indication.REFRESH to make a streaming request, so that after the initial Source Directory response it also receives Update messages whenever any service changes.
- The application sets OMMAttribInfo.Filter to RDMSERVICE.Filter.INFO | RDMSERVICE.Filter.STATE to get the Service Info and State. These two are sufficient for monitoring the services.
An example of a Source Directory Request message whose OMMAttribInfo.Filter is RDMSERVICE.Filter.INFO | RDMSERVICE.Filter.STATE is shown below:
Msg Type: MsgType.REQUEST
Msg Model Type: DIRECTORY
Indication Flags: REFRESH
Hint Flags: HAS_ATTRIB_INFO
AttribInfo
  Filter: 3 (INFO | STATE)
Payload: None
Source Directory Refresh Message
The message that the consumer application receives from the server contains the components below:
COMPONENT | DESCRIPTION/VALUE |
---|---|
MsgModelType | Required. RDMMsgTypes.DIRECTORY = 4 |
MsgType | Required. OMMMsg.MsgType.REFRESH_RESP |
State | Required. |
RespTypeNum | Required. |
IndicationFlag | Optional. |
QoS | Not used. |
ExtendedHeader | Not used. |
SeqNum | Optional. A user-specified, item-level sequence number which can be used by the application for sequencing messages within this stream. |
ConflationCount | Not used. |
ConflationTime | Not used. |
OMMItemGroup | Not used. |
PermissionData | Not used. |
OMMAttribInfo.Filter | Required. The filter represents the filter entries being provided in this response. When possible, this should match the filter as set in the consumer’s request. |
OMMAttribInfo.NameType | Not used. |
OMMAttribInfo.Name | Not used. |
OMMAttribInfo.ServiceName | Not used. |
OMMAttribInfo.ServiceID | Not used. |
OMMAttribInfo.Id | Not used. |
OMMAttribInfo.Attrib | Not used. Only getAttrib() exists. Attrib is implicitly populated by OMMEncoder. |
Payload | Required. The payload contains information about available services in the form of an OMMMap where each OMMMapEntry.KeyData is one ServiceName. Only getPayload() exists. Payload is implicitly populated by OMMEncoder. |
Service Info and State are carried in the payload of the Refresh message; they are explained in the Source Directory Info Filter Entry and Source Directory State Filter Entry sections respectively.
An example of a Source Directory Refresh message containing 2 services, API_NEON_NI and ELEKTRON_NEON, with their Service Info and State is shown below:
Msg Type: MsgType.REFRESH_RESP
Msg Model Type: DIRECTORY
Indication Flags: REFRESH_COMPLETE
Hint Flags: HAS_ATTRIB_INFO | HAS_ITEM_GROUP | HAS_RESP_TYPE_NUM | HAS_STATE
State: OPEN, OK, NONE, ""
Group: 0000
RespTypeNum: 0 (RespType.SOLICITED)
AttribInfo
  Filter: 3 (INFO | STATE)
Payload: 561 bytes
  MAP
    flags:
    MAP_ENTRY (Action.ADD) :
      flags:
      Key: API_NEON_NI
      Value:
        FILTER_LIST
          flags:
          FILTER_ENTRY 1 (Action.SET) :
            flags:
            ELEMENT_LIST
              ELEMENT_ENTRY Name: API_NEON_NI
              ELEMENT_ENTRY Vendor: Reuters
              ELEMENT_ENTRY IsSource: 0
              ELEMENT_ENTRY Capabilities:
                ARRAY
                  ARRAY_ENTRY: 5
                  ARRAY_ENTRY: 6
                  ARRAY_ENTRY: 10
              ELEMENT_ENTRY DictionariesProvided:
                ARRAY
                  ARRAY_ENTRY: RWFFld
                  ARRAY_ENTRY: RWFEnum
              ELEMENT_ENTRY DictionariesUsed:
                ARRAY
                  ARRAY_ENTRY: RWFFld
                  ARRAY_ENTRY: RWFEnum
              ELEMENT_ENTRY QoS:
                ARRAY
                  ARRAY_ENTRY: (RT, TbT)
              ELEMENT_ENTRY SupportsQoSRange: 0
              ELEMENT_ENTRY ServiceID: 4844
          FILTER_ENTRY 2 (Action.SET) :
            flags:
            ELEMENT_LIST
              ELEMENT_ENTRY ServiceState: 1
              ELEMENT_ENTRY AcceptingRequests: 1
    MAP_ENTRY (Action.ADD) :
      flags:
      Key: ELEKTRON_NEON
      Value:
        FILTER_LIST
          flags:
          FILTER_ENTRY 1 (Action.SET) :
            flags:
            ELEMENT_LIST
              ELEMENT_ENTRY Name: ELEKTRON_NEON
              ELEMENT_ENTRY Vendor: Reuters
              ELEMENT_ENTRY IsSource: 0
              ELEMENT_ENTRY Capabilities:
                ARRAY
                  ARRAY_ENTRY: 5
                  ARRAY_ENTRY: 6
                  ARRAY_ENTRY: 10
              ELEMENT_ENTRY DictionariesProvided:
                ARRAY
                  ARRAY_ENTRY: RWFFld
                  ARRAY_ENTRY: RWFEnum
              ELEMENT_ENTRY DictionariesUsed:
                ARRAY
                  ARRAY_ENTRY: RWFFld
                  ARRAY_ENTRY: RWFEnum
              ELEMENT_ENTRY QoS:
                ARRAY
                  ARRAY_ENTRY: (RT, TbT)
              ELEMENT_ENTRY SupportsQoSRange: 0
              ELEMENT_ENTRY ServiceID: 4544
          FILTER_ENTRY 2 (Action.SET) :
            flags:
            ELEMENT_LIST
              ELEMENT_ENTRY ServiceState: 1
              ELEMENT_ENTRY AcceptingRequests: 1
Source Directory Update Message
The message that the consumer application receives from the server when any service changes contains the components below:
COMPONENT | DESCRIPTION / VALUE |
---|---|
MsgModelType | Required. RDMMsgTypes.DIRECTORY = 4 |
MsgType | Required. OMMMsg.MsgType.UPDATE_RESP. Update is used for Service and Group status messages. |
State | Not used. |
RespTypeNum | Not used; it is always 0. |
IndicationFlag | Optional. |
QoS | Not used. |
ExtendedHeader | Not used. |
SeqNum | Optional. A user-specified, item-level sequence number which can be used by the application for sequencing messages within this stream. |
ConflationCount | Not used. |
ConflationTime | Not used. |
OMMItemGroup | Not used. |
PermissionData | Not used. |
OMMAttribInfo.Filter | Optional. The filter represents the filter entries being provided in this response. For an Update, this conveys only the ID values associated with filter entries present in the Update payload. |
OMMAttribInfo.NameType | Not used. |
OMMAttribInfo.Name | Not used. |
OMMAttribInfo.ServiceName | Not used. |
OMMAttribInfo.ServiceID | Not used. |
OMMAttribInfo.Id | Not used. |
OMMAttribInfo.Attrib | Not used. Only getAttrib() exists. Attrib is implicitly populated by OMMEncoder. |
Payload | Required. The payload is an OMMMap that contains only the changed information associated with the provided services. Only getPayload() exists. Payload is implicitly populated by OMMEncoder. |
Service Info and State are carried in the payload of the Update message; they are explained in the Source Directory Info Filter Entry and Source Directory State Filter Entry sections respectively.
An example of a Source Directory Update message for a service that goes down (ServiceState is 0) is shown below:
Msg Type: MsgType.UPDATE_RESP
Msg Model Type: DIRECTORY
Indication Flags: DO_NOT_CONFLATE
Hint Flags: HAS_ATTRIB_INFO | HAS_RESP_TYPE_NUM
RespTypeNum: 0
AttribInfo
  Filter: 3 (INFO | STATE)
Payload: 83 bytes
  MAP
    flags:
    MAP_ENTRY (Action.UPDATE) :
      flags:
      Key: API_NEON_NI
      Value:
        FILTER_LIST
          flags:
          FILTER_ENTRY 2 (Action.SET) :
            flags:
            ELEMENT_LIST
              ELEMENT_ENTRY ServiceState: 0
              ELEMENT_ENTRY AcceptingRequests: 1
              ELEMENT_ENTRY Status: OPEN, SUSPECT, NONE, ""
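Because an Update carries only the filter entries that changed, a consumer that also wants to keep the Info data has to merge each Update into what it received earlier. The following is a minimal plain-Java sketch of that idea, not RFA code: the nested maps stand in for OMMMap, OMMFilterList and OMMElementList, the class and method names are hypothetical, and the filter entry IDs 1 (Info) and 2 (State) are taken from the dumps above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model: service name -> filter entry ID -> element name -> value.
class DirectoryCacheSketch {
    static final int INFO_ID = 1;   // filter entry ID shown for Info in the dumps
    static final int STATE_ID = 2;  // filter entry ID shown for State in the dumps

    private final Map<String, Map<Integer, Map<String, String>>> services =
            new HashMap<String, Map<Integer, Map<String, String>>>();

    // A SET filter entry replaces that one entry and leaves the other entries untouched.
    void setFilterEntry(String service, int filterId, Map<String, String> elements) {
        services.computeIfAbsent(service, k -> new HashMap<Integer, Map<String, String>>())
                .put(filterId, new HashMap<String, String>(elements));
    }

    // Returns the cached element value, or null when it has never been received.
    String element(String service, int filterId, String name) {
        Map<Integer, Map<String, String>> entries = services.get(service);
        if (entries == null || !entries.containsKey(filterId)) {
            return null;
        }
        return entries.get(filterId).get(name);
    }

    public static void main(String[] args) {
        DirectoryCacheSketch cache = new DirectoryCacheSketch();
        // Refresh: both the Info and the State entries arrive.
        cache.setFilterEntry("API_NEON_NI", INFO_ID, Collections.singletonMap("Vendor", "Reuters"));
        cache.setFilterEntry("API_NEON_NI", STATE_ID, Collections.singletonMap("ServiceState", "1"));
        // Update: only the State entry arrives, now with ServiceState 0.
        cache.setFilterEntry("API_NEON_NI", STATE_ID, Collections.singletonMap("ServiceState", "0"));
        System.out.println(cache.element("API_NEON_NI", STATE_ID, "ServiceState")); // prints 0
        System.out.println(cache.element("API_NEON_NI", INFO_ID, "Vendor"));        // prints Reuters
    }
}
```

The Info entry survives the Update because only the entry with the matching filter ID is replaced.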
Source Directory Info Filter Entry
The Info filter entry (RDMSERVICE.Filter.INFO, RDMSERVICE.FilterId.INFO) conveys information that identifies a service and the content it can provide. This includes information about provided domain types (e.g. MarketPrice, MarketByOrder), available QoS, and the names of any dictionaries needed to parse the published content.
The Source Directory Info Filter Entry contains the elements below:
ELEMENT NAME | TYPE | REQUIRED | DEFAULT | RANGE/EXAMPLE | DESCRIPTION |
---|---|---|---|---|---|
Name | ASCII_STRING | Yes | n/a | e.g. IDN_RDF | Service name. This will match the concrete service name or the service group name that is in the OMMMap.KeyData. |
ServiceID | UINT | No | n/a | 0-65535 | ServiceID of the service. Allows OMM Consumer applications to know the ServiceID of a service. |
Vendor | ASCII_STRING | No | None | e.g. Refinitiv | Name of the vendor that provides the data for this service. |
IsSource | UINT | No | 0 | 0,1 | |
Capabilities | OMMArray of UINT | Yes | None | e.g. [5, 6] | List of MessageModelTypes which this service can provide. The UInt MessageModelType is extensible, using values defined in the RDM Usage Guide (1-255). For example, a list containing RDMMsgTypes.DICTIONARY(5) and RDMMsgTypes.MARKET_PRICE(6) indicates that a consumer can request dictionaries and market price data from this service. |
DictionariesProvided | OMMArray of ASCII_STRING | No | None | e.g. RWFFld | List of Dictionary names which this service can provide. A consumer may obtain these dictionaries by requesting them by name on the RDMMsgTypes.DICTIONARY MessageModelType. |
DictionariesUsed | OMMArray of ASCII_STRING | No (Yes if any are used.) | None | e.g. RWFFld, RWFEnum | List of Dictionary names that may be required to process all of the data from this service. Whether or not the dictionary is required depends on the needs of the consumer. For example, if the consumer application is not a display application, it might not need an Enumerated Types Dictionary. |
QoS | OMMArray of QoS | No | Real-time, TickByTick | e.g. Real-time, TickByTick | The available Qualities of Service. |
SupportsQoSRange | UINT | No | 0 | 0,1 | Can a consumer application request a QoS range with a best and worst QoS? While RFA applications can always request a QoS range, this value reflects whether the range will be supported on the network or locally by RFA. |
ItemList | ASCII_STRING | No | None | | Name of the SymbolList that includes all the items that the publisher currently provides. If it is not present, this feature is not supported. The consumer requests this item via the RDMMsgTypes.SYMBOL_LIST MessageModelType. |
SupportsOutOfBandSnapshots | UINT | No | 1 | 0,1 | Indicates whether Snapshot requests can be made even when the OpenLimit has been reached. |
AcceptingConsumerStatus | UINT | No | 1 | 0,1 | Indicates whether a service can accept and process messages related to Source Mirroring. |
An example of a Source Directory Info Filter Entry of a service is shown below:
FILTER_ENTRY 1 (Action.SET) :
  flags:
  ELEMENT_LIST
    ELEMENT_ENTRY Name: API_NEON_NI
    ELEMENT_ENTRY Vendor: Reuters
    ELEMENT_ENTRY IsSource: 0
    ELEMENT_ENTRY Capabilities:
      ARRAY
        ARRAY_ENTRY: 5
        ARRAY_ENTRY: 6
        ARRAY_ENTRY: 10
    ELEMENT_ENTRY DictionariesProvided:
      ARRAY
        ARRAY_ENTRY: RWFFld
        ARRAY_ENTRY: RWFEnum
    ELEMENT_ENTRY DictionariesUsed:
      ARRAY
        ARRAY_ENTRY: RWFFld
        ARRAY_ENTRY: RWFEnum
    ELEMENT_ENTRY QoS:
      ARRAY
        ARRAY_ENTRY: (RT, TbT)
    ELEMENT_ENTRY SupportsQoSRange: 0
    ELEMENT_ENTRY ServiceID: 4844
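A monitoring application may also want to know which domains a service advertises in its Capabilities array. The sketch below is plain Java with hypothetical class and method names; it names only the two values given in the table above, DICTIONARY (5) and MARKET_PRICE (6), and prints any other value numerically rather than guessing its meaning.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch: interpret the Capabilities array of an Info filter entry.
class CapabilitiesSketch {
    static final int DICTIONARY = 5;    // RDMMsgTypes.DICTIONARY, per the table above
    static final int MARKET_PRICE = 6;  // RDMMsgTypes.MARKET_PRICE, per the table above

    // True when the service's Capabilities array lists the requested domain.
    static boolean supports(List<Integer> capabilities, int domain) {
        return capabilities.contains(domain);
    }

    // Human-readable label; domains this sketch does not name are shown numerically.
    static String label(int domain) {
        switch (domain) {
            case DICTIONARY:
                return "DICTIONARY";
            case MARKET_PRICE:
                return "MARKET_PRICE";
            default:
                return "domain " + domain;
        }
    }

    public static void main(String[] args) {
        // Capabilities of API_NEON_NI as shown in the example entry.
        List<Integer> apiNeonNi = Arrays.asList(5, 6, 10);
        for (int domain : apiNeonNi) {
            System.out.println(label(domain));
        }
        System.out.println("Market Price available: " + supports(apiNeonNi, MARKET_PRICE));
    }
}
```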
Source Directory State Filter Entry
The State filter entry (RDMSERVICE.Filter.STATE, RDMSERVICE.FilterId.STATE) conveys information about the current state of a service. This information usually has some bearing on the availability of data from a service. If a service becomes temporarily unavailable or becomes available again, consumers are informed via Update messages to this category.
The Source Directory State Filter Entry contains the elements below:
ELEMENT NAME | TYPE | REQUIRED | DEFAULT | RANGE/EXAMPLE | DESCRIPTION |
---|---|---|---|---|---|
ServiceState | UINT | Yes | n/a | 0,1 | Indicates whether the original provider of the data is available to respond to new requests. |
AcceptingRequests | UINT | No | 1 | 0,1 | Indicates whether the immediate provider can accept new requests and/or handle reissue requests on already open streams. |
Status | STATE | No | Open (1), Ok (1), None (0), "" | e.g. OMMState.Stream.OPEN, OMMState.Data.OK, OMMState.Code.NONE, "OK" | Specifies a status change to apply to all items provided by this service. This status only applies to item streams that have received a Refresh or status of OPEN/OK. |
An example of a Source Directory State Filter Entry is shown below. The service is up, so ServiceState is 1:
FILTER_ENTRY 2 (Action.SET) :
  flags:
  ELEMENT_LIST
    ELEMENT_ENTRY ServiceState: 1
    ELEMENT_ENTRY AcceptingRequests: 1
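One possible way to turn the two flags into a single answer is sketched below in plain Java. Treating a service as usable only when ServiceState and AcceptingRequests are both 1 is an interpretation made for this sketch, not a rule stated by the RDM; the default of 1 for a missing AcceptingRequests comes from the table above. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch: evaluate a decoded State filter entry.
class ServiceStateSketch {
    // AcceptingRequests is optional and defaults to 1 according to the table above.
    static final long DEFAULT_ACCEPTING_REQUESTS = 1L;

    // Assumption of this sketch: new requests make sense only when the service
    // is up AND the immediate provider accepts requests.
    static boolean canRequest(Map<String, Long> stateElements) {
        Long serviceState = stateElements.get("ServiceState");
        if (serviceState == null) {
            // ServiceState is a required element; treat a missing value as not usable.
            return false;
        }
        long accepting = stateElements.getOrDefault("AcceptingRequests", DEFAULT_ACCEPTING_REQUESTS);
        return serviceState == 1L && accepting == 1L;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<String, Long>();
        state.put("ServiceState", 1L);
        System.out.println("Up, AcceptingRequests absent: " + canRequest(state));
        state.put("AcceptingRequests", 0L);
        System.out.println("Up, not accepting requests: " + canRequest(state));
    }
}
```

The monitoring application in this article tracks only ServiceState; a check like this could be useful if the application also needs to decide whether to open new item streams.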
Solution Code
We will start with the DictionaryDemo application located in the RFA Java package. The application demonstrates how to request the source directory to get a list of the dictionaries available for each service on the server, and then downloads the dictionaries. We will modify the application to get the Service Info and State for monitoring all services on the server instead of downloading dictionaries. Only the file named DirectoryClient.java in [RFA_Java_package]\Examples\com\reuters\rfa\example\omm\dictionary will be modified. The steps to modify the file are listed below:
1. Declare a map named _serviceStateMap to keep service names and their state (up or down). Next, initialize the map in the constructor of the DirectoryClient class:
public class DirectoryClient implements Client
{
    ...
    // Maps each service's name (key) to its state (value): true = up, false = down.
    // Requires java.util.HashMap to be imported if the file does not already do so.
    private HashMap<String, Boolean> _serviceStateMap;
    ...
    protected DirectoryClient(DictionaryDemo mainApp)
    {
        ...
        // Initialize the map.
        _serviceStateMap = new HashMap<String, Boolean>();
    }
    ...
}
2. Change the source directory request from snapshot to streaming, so that the application receives service changes in Update messages.
private OMMMsg encodeSrcDirReqMsg()
{
    ...
    OMMMsg msg = pool.acquireMsg();
    // The original application needed only a snapshot of the directory;
    // the monitor needs a streaming request instead.
    msg.setMsgType(OMMMsg.MsgType.REQUEST);
    msg.setMsgModelType(RDMMsgTypes.DIRECTORY);
    // Remove OMMMsg.Indication.NONSTREAMING to make a streaming request.
    /* msg.setIndicationFlags(OMMMsg.Indication.REFRESH
            | OMMMsg.Indication.NONSTREAMING); */
    msg.setIndicationFlags(OMMMsg.Indication.REFRESH);
    ...
}
3. Make the application accept Update messages as well, because the original application processes only Refresh messages.
public void processEvent(Event event)
{
    ...
    GenericOMMParser.parse(respMsg);
    // Accept Update messages in addition to Refresh messages.
    // Original: if (respMsg.getMsgType() == OMMMsg.MsgType.REFRESH_RESP)
    if (respMsg.getMsgType() == OMMMsg.MsgType.REFRESH_RESP
            || respMsg.getMsgType() == OMMMsg.MsgType.UPDATE_RESP)
    {
        // The three lines marked //1, //2 and //3 are commented out because an
        // Update message carries no State (e.g. OPEN, OK, NONE, "").
        //1 if ((respMsg.getState().getDataState() == OMMState.Data.OK))
        //2 {
        ...
        if (respMsg.isSet(OMMMsg.Indication.REFRESH_COMPLETE))
        {
            ...
        }
        //3 }
    }
    ...
}
4. Update the services' state in _serviceStateMap according to the source directory response. The original application updates _serviceMap according to the response, so you only need to modify it to update _serviceStateMap instead.
public void processEvent(Event event)
{
    ...
    else if (action == OMMMapEntry.Action.UPDATE)
    {
        // For an UPDATE action, check whether the service is in _serviceStateMap.
        // The original code checked _serviceMap, which keeps each service's name
        // with its dictionary names. If the service is known, decode the entry
        // to get the service's state.
        // Original: if (_serviceMap.containsKey(serviceName))
        if (_serviceStateMap.containsKey(serviceName))
        {
            ...
        }
    }
    else if (action == OMMMapEntry.Action.DELETE)
    {
        // For a DELETE action, remove the service from _serviceStateMap instead
        // of _serviceMap and print the deleted service on the console.
        // Original: _serviceMap.remove(serviceName);
        _serviceStateMap.remove(serviceName);
        System.out.println("## Monitor - " + "Service=" + serviceName + " has been deleted from the server");
    }
    // Remove the snapshot handling below because a streaming request is made.
    /*if (respMsg.isSet(OMMMsg.Indication.REFRESH_COMPLETE))
    {
        // Receive all dictionary names from all source directory
        // Notify application to download dictionary
        _dirHandle = null; // Because the request is NON-Streaming
    }*/
    // Call reportServiceState(), which prints the state of each service in
    // _serviceStateMap after it has been updated from the response message.
    reportServiceState();
    ...
}
5. Decode the source directory response message to get the state (Up or Down) of a service. The application already decodes the service's state, so you only need to put the service name and its state into _serviceStateMap after the state is decoded.
private void decodeServiceData(String serviceName, OMMMapEntry serviceData)
{
    ...
    if ((int)value.getLongValue() == RDMService.State.UP)
    {
        stateUp = true;
        // Record the service as up (true).
        _serviceStateMap.put(serviceName, true);
    }
    else
    {
        // Record the service as down (false).
        _serviceStateMap.put(serviceName, false);
    }
    ...
}
6. Add a method named reportServiceState() that prints each service in _serviceStateMap with its state.
// Prints each service in _serviceStateMap with its state.
// Requires java.util.Map to be imported if the file does not already do so.
public void reportServiceState()
{
    for (Map.Entry<String, Boolean> entry : _serviceStateMap.entrySet())
    {
        String serviceName = entry.getKey();
        boolean up = entry.getValue();
        System.out.println("## Monitor - " + "Service=" + serviceName + " State=" + (up ? "Up" : "Down"));
    }
}
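The bookkeeping in steps 4 to 6 can be tried out without an RFA connection. The sketch below is a plain-Java stand-in, not the modified DirectoryClient: the Action enum is a hypothetical substitute for OMMMapEntry.Action, the class and method names are invented for illustration, and a TreeMap is used instead of the HashMap from step 1 so that the report order is predictable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of steps 4 to 6: maintain a state map and report it.
class ServiceMonitorSketch {
    // Hypothetical stand-in for OMMMapEntry.Action.
    enum Action { ADD, UPDATE, DELETE }

    // TreeMap keeps the report in alphabetical order; the article uses a HashMap.
    private final Map<String, Boolean> serviceStateMap = new TreeMap<String, Boolean>();

    // Applies one map entry of a directory response to the state map.
    void apply(Action action, String serviceName, boolean up) {
        switch (action) {
            case ADD:
                serviceStateMap.put(serviceName, up);
                break;
            case UPDATE:
                // As in step 4, only services that are already known are updated.
                if (serviceStateMap.containsKey(serviceName)) {
                    serviceStateMap.put(serviceName, up);
                }
                break;
            case DELETE:
                serviceStateMap.remove(serviceName);
                break;
        }
    }

    // Builds lines in the same format that reportServiceState() prints.
    List<String> reportLines() {
        List<String> lines = new ArrayList<String>();
        for (Map.Entry<String, Boolean> entry : serviceStateMap.entrySet()) {
            lines.add("## Monitor - Service=" + entry.getKey()
                    + " State=" + (entry.getValue() ? "Up" : "Down"));
        }
        return lines;
    }

    public static void main(String[] args) {
        ServiceMonitorSketch monitor = new ServiceMonitorSketch();
        monitor.apply(Action.ADD, "API_NEON_NI", true);
        monitor.apply(Action.ADD, "ELEKTRON_NEON", true);
        monitor.apply(Action.UPDATE, "API_NEON_NI", false);
        for (String line : monitor.reportLines()) {
            System.out.println(line);
        }
        // Prints:
        // ## Monitor - Service=API_NEON_NI State=Down
        // ## Monitor - Service=ELEKTRON_NEON State=Up
    }
}
```

Returning the report as a list of lines, rather than printing directly, makes the logic easy to check in a unit test before wiring it into processEvent().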
The complete DirectoryClient.java, modified as explained above, is available on GitHub. Replace DirectoryClient.java in [RFA_Java_package]\Examples\com\reuters\rfa\example\omm\dictionary with this file. You can then run the application using the following command line:
java -cp ..\Libs\rfa.jar;..\Examples com.reuters.rfa.example.omm.dictionary.DictionaryDemo -session myNamespace::consSession -user username
- -session is an OMM consumer session name in your RFA Java configuration, which is stored in the Registry on a Windows machine or in a preferences file on a Unix machine. For the steps to set up the RFA Java configuration, please refer to RFA Java Tutorial 2.
- -user is a user who is authorized to access all services on the server.
The example output below shows a server with two services, API_NEON_NI and ELEKTRON_NEON, both of which are available (Up):
Conclusion
If you are planning to develop an application to monitor the health of the services provided by a server (ADS - Advanced Distribution Server), you can use this article as a guideline. RFA can be used to request source directory messages to get the services' state, which represents their health (Up or Down). Implementing such a monitoring application on top of an RFA Java example application is straightforward and can be very useful.
References
For further details, please check out the following resources:
- RFA Java Tutorials on the Refinitiv Developer Community web site.
- RFA Java RDM Usage Guide (RFAJ_RDMUsageGuide.pdf), shipped with the RFA Java package available on the Refinitiv Developer Community web site.