Download tutorial source code | Click here to download |
---|---|
Last update | Feb 2023 |
Compilers | JDK 1.8, JDK 11, Open JDK 11 |
Prerequisite | Downloaded, installed, compiled, and ran an EMA consumer example |
In this tutorial, we explain and use the EMA Java API to contribute data to the Contribution Channel (RCC) directly, without using the Refinitiv Real-Time Distribution System. The reader must have an understanding of the Real-Time SDK (EMA) and be familiar with consuming OMM Market Price data. You should also have a basic understanding of EMA Configuration. To understand the contribution concepts, please refer to this article.
This tutorial has been updated in 2023 to open the tunnel stream to the RCC server based on the RCC RSSL Login and Directory service status.
Currently, clients wishing to contribute data to the Real-Time Platform typically do so using on-site contribution systems such as MarketLinkIP. To meet the evolving needs of the market, we have developed a new contribution service, the Contribution Channel (RCC). The Contribution Channel is provided through a cluster of virtual servers that can provide data optimization and conflation capabilities as well as a much higher rate of content onto the Real-Time Platform.
There are two key mechanisms through which an API developer can contribute data to the Real-Time Platform head-end. Once contributed, this data can be delivered to other customers who have been permissioned to view it. This service is commonly used by banks and funds to make their data available to the trading community.
The first option is to use and configure the Real-Time Distribution System to take care of the contribution-specific connection and login details.
The second option, which is discussed in this tutorial, is for clients who do not have an onsite Real-Time Distribution System installation and will connect to RCC through the Internet. To contribute directly to RCC, an EMA application undertakes the following steps:
Let's look at the EMA Java code starting with application configuration. This tutorial source code consists of three Java files as follows:
Contributor.java file:
This Java file contains the Contributor class which is the main application logic class. It receives RCC connection credentials and contribution service information (service name, item name) and interacts with the other two classes to connect and post data.
AppClient.java file:
This Java file contains the AppClient class that implements the OmmConsumerClient interface for handling encrypted RSSL connection events (especially the OMM Login and Directory domain streams) between the API and RCC server.
TunnelStreamClient.java file:
This Java file contains the TunnelStreamClient class that manages all tunnel stream connections, events, and posting logic. The source code is based on the EMA sample: series400\ex440_System_TunStrm which ships with the SDK distribution, in the Examples directory.
Let's start with initiating the tunnel stream. First, the Contributor class instantiates the TunnelStreamClient class with the RCC credentials and information, including the RCC username, RCC password, RCC contribution service name, and RCC contribution item name.
// Contributor.java
// Create the TunnelStreamClient object to process RCC Tunnel Stream connection
TunnelStreamClient tunnelClient = new TunnelStreamClient(trceUser, trcePass, trceServiceName, trceItem);
The EMA SDK undertakes all the work required to establish an encrypted connection. All an application developer needs to do is define the <Consumer> in the EMA configuration file EmaConfig.xml, then set its <Channel> type to RSSL_ENCRYPTED and its <EncryptedProtocolType> to RSSL_SOCKET. The hostname and port of the destination RCC server must be specified in the configuration as well.
Note that developers should be coding against the UAT Environment while developing applications.
The servers must be used with Real-Time SDK applications with the RSSL_ENCRYPTED connection type and RSSL_SOCKET encrypted protocol type.
Note: In the future, RCC will not support the RSSL_HTTP encrypted protocol type. Please use the RSSL_SOCKET encrypted protocol type when connecting to the RCC server.
<Channel>
<ChannelType value="ChannelType::RSSL_ENCRYPTED"/>
<EncryptedProtocolType value="EncryptedProtocolType::RSSL_SOCKET"/>
<Host value="RCC_HOST_NAME"/>
<Port value="RCC_PORT"/>
</Channel>
Now, when an OmmConsumer is created with this configuration, EMA will start an encrypted connection. EMA Java uses the security and encryption framework provided by the Java SDK and uses the Sun JDK's default TLS. The standard Java approach is to store the security certificates and private keys required for encryption in a password-protected keystore file. The keystore filename and password are specified when creating the OmmConsumer; this step is optional since RTSDK 1.5.1.L1.
Note: Since Real-Time SDK Java 1.5.1.L1 (ETA Java 3.5.1.L1), specifying the Keystore file in the code is optional. The API can get certificates from the default Certificate Authority Keystore location in the Java directory.
// Contributor.java
// Create an OMM consumer
OmmConsumer consumer = EmaFactory.createOmmConsumer(EmaFactory.createOmmConsumerConfig());
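As a rough illustration of the note above about the default Certificate Authority keystore, the following sketch uses only standard JSSE classes to count the issuers that the JVM trusts by default. It is not part of the tutorial source; the class name DefaultTrustStoreCheck is hypothetical, and it does not show how EMA itself loads certificates.

```java
import java.security.KeyStore;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

/** Hypothetical helper: inspects the JVM's default trust store (not EMA code). */
class DefaultTrustStoreCheck {

    /** Returns the number of CA certificates trusted by the default trust managers. */
    static int countDefaultTrustedIssuers() {
        try {
            TrustManagerFactory factory =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            // Passing null asks JSSE to use the JVM's default trust store
            factory.init((KeyStore) null);
            int count = 0;
            for (TrustManager manager : factory.getTrustManagers()) {
                if (manager instanceof X509TrustManager) {
                    count += ((X509TrustManager) manager).getAcceptedIssuers().length;
                }
            }
            return count;
        } catch (Exception e) {
            throw new IllegalStateException("Cannot load the default trust store", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Default trusted issuers: " + countDefaultTrustedIssuers());
    }
}
```

If the count is zero, the default store is probably missing or empty, and supplying an explicit keystore may be the safer choice.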
Next, an application registers for the OMM Login and Directory domain streams for getting the contribution server service status.
Note that we pass an AppClient instance to the registerClient method to handle the RSSL Login and Directory events generated by our interest.
// Contributor.java
//Register to get the login and source directory message
Login.LoginReq loginReq = EmaFactory.Domain.createLoginReq();
consumer.registerClient(loginReq.message(), appClient);
consumer.registerClient(EmaFactory.createReqMsg().domainType(EmaRdm.MMT_DIRECTORY).serviceName(trceServiceName), appClient);
//TunnelStreamClient.java
public void processSourceDirectoryRefresh(RefreshMsg refreshMsg, OmmConsumerEvent event) {
System.out.println("TunnelStreamClient processSourceDirectoryRefresh");
if (refreshMsg.state().streamState() == OmmState.StreamState.OPEN && refreshMsg.state().dataState() == OmmState.DataState.OK && _tunnelStreamHandle == 0) {
System.out.println("RSSL encrypted connection Login accepted and Directory is UP, starting Tunnel Stream...");
startTunnelStream();
}
}
private void startTunnelStream() {
System.out.println("TunnelStreamClient Start Tunnel Stream");
try {
ClassOfService cos = EmaFactory.createClassOfService()
.authentication(EmaFactory.createCosAuthentication().type(CosAuthentication.CosAuthenticationType.NOT_REQUIRED))
.dataIntegrity(EmaFactory.createCosDataIntegrity().type(CosDataIntegrity.CosDataIntegrityType.RELIABLE))
.flowControl(EmaFactory.createCosFlowControl().type(CosFlowControl.CosFlowControlType.BIDIRECTIONAL).recvWindowSize(1200))
.guarantee(EmaFactory.createCosGuarantee().type(CosGuarantee.CosGuaranteeType.NONE));
TunnelStreamRequest tsr = EmaFactory.createTunnelStreamRequest()
.classOfService(cos)
.domainType(EmaRdm.MMT_SYSTEM)
.name("TUNNEL1")
.serviceName(this.trceServiceName);
_tunnelStreamHandle = _ommConsumer.registerClient(tsr, this);
} catch (Exception excp) {
System.out.println(excp.getMessage());
}
}
Here, we use the OmmConsumer defined earlier (which is connected to the RCC server by now) and open a tunnel stream to the contribution service using the system domain. The tunnel stream handle will be used in the next step to log in.
The code creates a Tunnel Stream request by providing an arbitrary name ('TUNNEL1') and the RCC service name. The domain type of the tunnel stream is MMT_SYSTEM, which is a system domain for content-neutral requests (i.e. non-data domains). Finally, the code registers the TunnelStreamClient class as the callback client to receive events related to the Tunnel Stream and stores the Tunnel Stream handle in a variable.
For production applications, it is recommended to establish two simultaneous connections, to two different servers. An application should only contribute to a single stream at any given time. The second stream is for redundancy purposes, and should only be used if there are issues with the first one.
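The active/standby recommendation above could be tracked with a small selector like the one below. This is a minimal sketch under stated assumptions: the class ActiveStandby, its Stream enum, and the rule "prefer the primary whenever it is up" are illustrative choices, not taken from the tutorial source or the EMA API.

```java
import java.util.Optional;

/** Hypothetical selector: decides which of two tunnel streams may receive posts. */
class ActiveStandby {
    enum Stream { PRIMARY, SECONDARY }

    private boolean primaryUp;
    private boolean secondaryUp;

    /** Records the latest known state of one stream, e.g. from a status callback. */
    synchronized void setUp(Stream stream, boolean up) {
        if (stream == Stream.PRIMARY) {
            primaryUp = up;
        } else {
            secondaryUp = up;
        }
    }

    /** Returns the single stream to post on, or empty when neither is usable. */
    synchronized Optional<Stream> active() {
        if (primaryUp) {
            return Optional.of(Stream.PRIMARY);
        }
        if (secondaryUp) {
            return Optional.of(Stream.SECONDARY);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ActiveStandby selector = new ActiveStandby();
        selector.setUp(Stream.PRIMARY, true);
        selector.setUp(Stream.SECONDARY, true);
        System.out.println("Post on: " + selector.active());
        selector.setUp(Stream.PRIMARY, false);
        System.out.println("After primary failure, post on: " + selector.active());
    }
}
```

An application would call setUp() from the status callbacks of each connection and consult active() before every post, so that only one stream carries contributions at a time.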
A successful opening of the tunnel stream is followed by a status event delivered to the TunnelStreamClient.onStatusMsg() callback method with state OmmState.StreamState.OPEN. The method compares the handle in the event to the handle of the tunnel stream. If the handles are equal and the stream state is open, the application calls the TunnelStreamClient.loginTunnelStream() method to send the tunnel login request message. The tunnel login request message has the following attributes:
It also registers this TunnelStreamClient instance as the callback client to receive events related to the tunnel login and stores a stream handle in a variable.
//TunnelStreamClient.java
@Override
public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) {
System.out.println("----- TunnelStreamClient Status message ----");
System.out.println(statusMsg);
//If this is a Tunnel Stream Status message and the Stream is UP, sends a OMM Login request message over this Tunnel Stream
if (event.handle() == _tunnelStreamHandle && statusMsg.hasState() && statusMsg.state().streamState() == OmmState.StreamState.OPEN) {
System.out.println("************* Login to RCC ******************** " + statusMsg.streamId());
loginTunnelStream(statusMsg.streamId(), statusMsg.serviceId());
}
}
private void loginTunnelStream(int streamId, int serviceId) {
System.out.println("TunnelStreamClient Login Tunnel Stream");
// Create the OMM Login Request Message
ElementList elementList = EmaFactory.createElementList();
elementList.add(EmaFactory.createElementEntry().ascii("Password", this.trcePass));
ReqMsg rMsg = EmaFactory.createReqMsg()
.domainType(EmaRdm.MMT_LOGIN)
.name(this.trceUser)
.attrib(elementList)
// .privateStream(true)
.serviceId(serviceId)
.streamId(streamId);
System.out.println("Sending client login request...");
// get events from login substream
_subStreamHandle = _ommConsumer.registerClient(rMsg, this, 1, _tunnelStreamHandle);
_subItemOpen = true;
}
If the login is accepted by the RCC server, an application receives a refresh callback: onRefreshMsg with state OmmState.StreamState.OPEN. At this point, the application should store the stream handle and is ready to contribute data.
The client's login response includes a maximum message rate in the refresh response, and this message rate must be obeyed.
The TRCE:MaxMessagesPerSecond element is present in all Contributions Channel environments, and the message rate posted by the connecting application must not exceed this number.
The example message is shown below:
<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="3" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="1" dataSize="127">
<extendedHeader data="0100"/>
<dataBody>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<refreshMsg domainType="RSSL_DMT_LOGIN" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x648 (RSSL_RFMF_HAS_MSG_KEY|RSSL_RFMF_REFRESH_COMPLETE|RSSL_RFMF_DO_NOT_CACHE|RSSL_RFMF_PRIVATE_STREAM)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text="Login accepted by host HOSTNAME via HANDLERNAME" dataSize="0">
<key flags="0x23 (RSSL_MKF_HAS_SERVICE_ID|RSSL_MKF_HAS_NAME|RSSL_MKF_HAS_ATTRIB)" serviceId="10" name="USERNAME" attribContainerType="RSSL_DT_ELEMENT_LIST">
<attrib>
<elementList flags="0x8 (RSSL_ELF_HAS_STANDARD_DATA)">
<elementEntry name="TRCE:MaxMessagesPerSecond" dataType="RSSL_DT_UINT" data="300"/>
</elementList>
</attrib>
</key>
<dataBody>
</dataBody>
</refreshMsg>
</dataBody>
</genericMsg>
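One way to stay under the TRCE:MaxMessagesPerSecond limit is to space posts evenly on the client side. The sketch below is illustrative only: the PostPacer class is hypothetical and does not appear in the tutorial source, and the value 300 is simply the one shown in the sample refresh above.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical pacer: spaces posts so the rate never exceeds a given limit. */
class PostPacer {
    private final long minIntervalNanos;
    private long nextAllowedNanos = Long.MIN_VALUE;

    PostPacer(long maxMessagesPerSecond) {
        if (maxMessagesPerSecond <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        // Round the interval up so the effective rate stays at or below the limit
        this.minIntervalNanos =
                (TimeUnit.SECONDS.toNanos(1) + maxMessagesPerSecond - 1) / maxMessagesPerSecond;
    }

    long minIntervalNanos() {
        return minIntervalNanos;
    }

    /** Reserves the next send slot and returns how long to wait (ns) from nowNanos. */
    synchronized long reserve(long nowNanos) {
        long sendAt = Math.max(nowNanos, nextAllowedNanos);
        nextAllowedNanos = sendAt + minIntervalNanos;
        return sendAt - nowNanos;
    }

    public static void main(String[] args) {
        PostPacer pacer = new PostPacer(300);
        long now = System.nanoTime();
        for (int i = 0; i < 3; i++) {
            System.out.println("Wait before post " + i + " (ns): " + pacer.reserve(now));
        }
    }
}
```

An application could read the limit from the login refresh attributes, create the pacer once, and sleep for the returned duration before each submit.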
To contribute, an application can use off-stream posting, i.e. send post messages on the login stream. The application begins by creating the field list for the FIDs it intends to contribute, packs the field list into an update message, and sends this update message as the PostMsg payload. The following TunnelStreamClient.postMessage() method code snippet shows how to send an update message containing a BID (FID 22) and an ASK (FID 25).
// TunnelStreamClient.java
@Override
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {
System.out.println("----- TunnelStreamClient Refresh message ----");
System.out.println(refreshMsg);
//If this is a Tunnel Stream Login Refresh Response message and the Stream is UP, sends a Post message
if (refreshMsg.domainType() == EmaRdm.MMT_LOGIN && refreshMsg.state().streamState() == OmmState.StreamState.OPEN && refreshMsg.state().dataState() == OmmState.DataState.OK) {
// 3. Login accepted, app can post data now
System.out.println("Login accepted, starting posting...");
_postStreamID = refreshMsg.streamId();
postMessage();
} else {
System.out.println("Stream not open");
}
}
/**
* Creates the OMM Post message for RCC and sends it over a Tunnel Stream
*/
private void postMessage() {
try {
Thread.sleep(300);
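} catch (InterruptedException e) {
// Restore the interrupt flag so the interruption is not silently lost
Thread.currentThread().interrupt();
}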
// populate the contributed FIDs and values
FieldList nestedFieldList = EmaFactory.createFieldList();
nestedFieldList.add(EmaFactory.createFieldEntry().real(22, _bid++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
nestedFieldList.add(EmaFactory.createFieldEntry().real(25, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));
// create an update message for our item
UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg()
.streamId(_postStreamID)
.name(trceItem)
.payload(nestedFieldList);
// post this market price message
_ommConsumer.submit(EmaFactory.createPostMsg()
.streamId(_postStreamID)
.postId(_postID++)
.domainType(EmaRdm.MMT_MARKET_PRICE)
.solicitAck(true)
.complete(true)
.payload(nestedUpdateMsg), _subStreamHandle);
}
The code creates a post message (PostMsg) with the following properties and values:
Then, it submits the PostMsg using the handle of the Tunnel Login stream. Finally, it increments the Post IDs, BID value, and ASK value.
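The BID and ASK fields in the snippet are encoded as a whole-number mantissa plus the magnitude EXPONENT_NEG_1, meaning the contributed value is the mantissa multiplied by 10^-1. The conversion can be sketched with plain BigDecimal arithmetic; the RealEncoding class and its method names are hypothetical and not part of EMA.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper: converts between prices and mantissa/exponent pairs. */
class RealEncoding {

    /** Value represented by a mantissa with the given number of decimal places. */
    static BigDecimal decode(long mantissa, int decimalPlaces) {
        // BigDecimal.valueOf(unscaled, scale) computes unscaled * 10^-scale
        return BigDecimal.valueOf(mantissa, decimalPlaces);
    }

    /** Mantissa to send for a price when using the given number of decimal places. */
    static long encode(BigDecimal price, int decimalPlaces) {
        return price.setScale(decimalPlaces, RoundingMode.HALF_UP)
                .unscaledValue()
                .longValueExact();
    }

    public static void main(String[] args) {
        // One decimal place corresponds to EXPONENT_NEG_1 in the snippet above
        System.out.println(decode(1234, 1));
        System.out.println(encode(new BigDecimal("99.5"), 1));
    }
}
```

Under this encoding, incrementing the mantissa by one, as the tutorial does with _bid++ and _ask++, moves the contributed price by 0.1.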
Note:
Here, the application also requests that contributions be acknowledged by the system by setting solicitAck(true). Setting this flag causes the application to receive acknowledgments via the TunnelStreamClient.onAckMsg() callback. If the contribution is denied, the AckMsg delivered to the callback contains the nackCode and text attributes giving the reason.
Below is a sample of the acknowledgment message. The value of Ack Id is 1 which matches the Post ID used in the first PostMsg:
AckMsg
streamId="6"
domain="MarketPrice Domain"
ackId="1"
AckMsgEnd
// TunnelStreamClient.java
@Override
public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent event) {
System.out.println("----- Ack message ----");
System.out.println(ackMsg);
// Continue posting only when the AckMsg carries no NACK code (post accepted)
if (!ackMsg.hasNackCode()) {
System.out.println("Continue posting...");
postMessage();
}
}
In the code above, when the application receives an acknowledgment message, it checks the NACK code. If no NACK code is present, the application sends another post message.
Note that it is possible to receive a NACK response if the Post message is not accepted by the RCC server. For example, the application posts a message to an invalid RIC or updates fields that are not defined in the target RIC.
Below is an example of a NACK response when posting to an invalid RIC:
AckMsg
streamId="5"
domain="MarketPrice Domain"
ackId="1"
nackCode="SymbolUnknown"
text="Symbol unknown"
AckMsgEnd
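Because the Ack ID echoes the Post ID, an application can keep a small table of outstanding posts and resolve each one as its ACK or NACK arrives. The following is a minimal sketch of that bookkeeping in plain Java; the PendingPosts class is hypothetical, and the NACK code is passed as a nullable string purely for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical bookkeeping: matches acknowledgments to posts by ID. */
class PendingPosts {
    private final Map<Long, String> pending = new ConcurrentHashMap<>();

    /** Remembers a post that was submitted with solicitAck(true). */
    void sent(long postId, String description) {
        pending.put(postId, description);
    }

    /**
     * Resolves a post when its acknowledgment arrives.
     * Returns true only for a positive ACK of a known post.
     */
    boolean acknowledged(long ackId, String nackCode) {
        String description = pending.remove(ackId);
        if (description == null) {
            System.out.println("Ack for unknown post ID " + ackId);
            return false;
        }
        if (nackCode != null) {
            System.out.println("Post " + ackId + " (" + description + ") rejected: " + nackCode);
            return false;
        }
        return true;
    }

    /** Number of posts still waiting for an acknowledgment. */
    int outstanding() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingPosts posts = new PendingPosts();
        posts.sent(1, "BID/ASK update");
        System.out.println("Accepted: " + posts.acknowledged(1, "SymbolUnknown"));
        System.out.println("Outstanding: " + posts.outstanding());
    }
}
```

A growing outstanding() count would suggest that acknowledgments are being lost or delayed, which may be a reason to pause posting.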
That covers the RCC Posting tutorial with EMA Java API.
The Contribution Channel (RCC) is the Real-Time contribution system that allows users to contribute data directly to the Real-Time platform network. Users can use Real-Time SDKs or WebSocket APIs to develop applications to connect and contribute real-time data to RCC. This tutorial focuses on how to use EMA Java to establish a connection to RCC, open a tunnel stream, log in to RCC, and send post messages to contribute data.
A few points are worth mentioning here:
<statusMsg domainType="RSSL_DMT_LOGIN" streamId="5" containerType="RSSL_DT_NO_DATA" flags="0xA0 (RSSL_STMF_HAS_STATE|RSSL_STMF_PRIVATE_STREAM)" dataState="RSSL_DATA_SUSPECT" streamState="RSSL_STREAM_CLOSED" code="RSSL_SC_NO_RESOURCES" text="System busy" dataSize="0">
<dataBody>
</dataBody>
</statusMsg>
This is not an error. It simply means that the server cannot handle the request at the moment, which is normal for a cloud application. In this case, the connection is dropped and EMA then recovers it. After recovery, the application needs to open a tunnel stream and perform the client login again before it can contribute data to RCC. Alternatively, the application can try to connect to other servers.
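When re-opening the tunnel after a "System busy" status, it may help to wait a little longer after each failed attempt rather than retrying immediately. The tutorial does not prescribe a retry policy, so the capped exponential backoff below is an assumption, and the ReconnectBackoff class is hypothetical.

```java
/** Hypothetical retry helper: capped exponential backoff between attempts. */
class ReconnectBackoff {
    private final long baseMillis;
    private final long maxMillis;
    private int attempts;

    ReconnectBackoff(long baseMillis, long maxMillis) {
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("require 0 < baseMillis <= maxMillis");
        }
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    /** Returns the delay before the next attempt and counts that attempt. */
    long nextDelayMillis() {
        long delay = baseMillis;
        // Double the delay once per previous attempt, never exceeding the cap
        for (int i = 0; i < attempts && delay < maxMillis; i++) {
            delay = Math.min(maxMillis, delay * 2);
        }
        attempts++;
        return delay;
    }

    /** Call after a successful tunnel login so the next failure starts small again. */
    void reset() {
        attempts = 0;
    }

    public static void main(String[] args) {
        ReconnectBackoff backoff = new ReconnectBackoff(1000, 30000);
        for (int i = 0; i < 7; i++) {
            System.out.println("Attempt " + i + " waits " + backoff.nextDelayMillis() + " ms");
        }
    }
}
```

The base and cap values used in main() are arbitrary examples, not recommended settings.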
This tutorial application requires the following software and library dependencies.
Please contact your LSEG representative to help you to access the RCC account and services.
Firstly, you need to configure the RCC server endpoint and Dictionary files path in the EmaConfig.xml file.
Open the EmaConfig.xml file and input the RCC endpoint and Port:
<Channel>
<Name value="Channel_1"/>
<ChannelType value="ChannelType::RSSL_ENCRYPTED"/>
...
<Host value="contrib1-....refinitiv.com"/>
<Port value="443"/>
</Channel>
Then, modify the Field Dictionary nodes in the EmaConfig.xml file to the local RDMFieldDictionary and enumtype.def files on your machine.
<Dictionary>
<Name value="Dictionary_2"/>
<DictionaryType value="DictionaryType::FileDictionary"/>
<!-- dictionary names are optional: defaulted to RDMFieldDictionary and enumtype.def -->
<RdmFieldDictionaryFileName value="./RDMFieldDictionary"/>
<EnumTypeDefFileName value="./enumtype.def"/>
</Dictionary>
Next, build the project. Open the project folder in the command prompt and then build the project with the following command:
mvn package
Once the project compilation is completed, run the following command to start the tutorial application:
java -jar ./target/EMA_Java_RCC_Tutorial-1.0-jar-with-dependencies.jar -username <RCC User> -password <RCC Password> -service <RCC Service Name> -itemName <Post RIC name>
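The four options on the command line above arrive as "-name value" pairs. How the tutorial's Contributor class actually parses them is not shown here, so the following is only a sketch of one possible approach; the ContributorArgs class is hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parser for the "-name value" options shown in the run command. */
class ContributorArgs {
    private static final List<String> REQUIRED =
            Arrays.asList("username", "password", "service", "itemName");

    /** Parses "-name value" pairs and verifies that every required option is present. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("-") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected '-option value' near: " + args[i]);
            }
            options.put(args[i].substring(1), args[i + 1]);
        }
        for (String name : REQUIRED) {
            if (!options.containsKey(name)) {
                throw new IllegalArgumentException("Missing required option: -" + name);
            }
        }
        return options;
    }

    public static void main(String[] args) {
        String[] sample = {"-username", "user1", "-password", "secret",
                           "-service", "SERVICE", "-itemName", "ITEM"};
        // Print only the option names so the password is never echoed
        System.out.println(parse(sample).keySet());
    }
}
```

Failing fast on a missing option avoids starting an encrypted connection that cannot complete the RCC login.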
Run output example:
Contributing to Refinitiv Contributions Channel
Starting encrypted connection...
Dec 28, 2022 3:08:07 PM com.refinitiv.ema.access.ChannelCallbackClient reactorChannelEventCallback
INFO: loggerMsg
ClientName: ChannelCallbackClient
Severity: Info
Text: Received ChannelUp event on channel Channel_1
Instance Name Consumer_1_1
...
loggerMsgEnd
----- Refresh message ----
RefreshMsg
streamId="1"
domain="Login Domain"
solicited
RefreshComplete
state="Open / Ok / None / ''"
itemGroup="00 00"
name="U8004042"
nameType="1"
Attrib dataType="ElementList"
ElementList
ElementEntry name="ApplicationId" dataType="Ascii" value="256"
ElementEntry name="ApplicationName" dataType="Ascii" value="ema"
...
ElementListEnd
AttribEnd
RefreshMsgEnd
TunnelStreamClient processLoginStatus
Tunnel Stream Login accepted
----- Refresh message ----
RefreshMsg
streamId="5"
domain="Directory Domain"
solicited
RefreshComplete
state="Open / Ok / None / ''"
itemGroup="00 00"
filter="0"
Payload dataType="Map"
Map
...
MapEnd
PayloadEnd
RefreshMsgEnd
TunnelStreamClient processSourceDirectoryRefresh
RSSL encrypted connection Login accepted and Directory is UP, starting Tunnel Stream...
TunnelStreamClient Start Tunnel Stream
----- TunnelStreamClient Status message ----
StatusMsg
streamId="6"
domain="System Domain"
privateStream
state="Open / Ok / None / ''"
name="TUNNEL1"
serviceId="10"
serviceName="DDS_TRCE"
StatusMsgEnd
************* Login to RCC ******************** 6
TunnelStreamClient Login Tunnel Stream
Sending client login request...
----- TunnelStreamClient Refresh message ----
RefreshMsg
streamId="6"
domain="Login Domain"
RefreshComplete
privateStream
state="Open / Ok / None / 'Login accepted by host XXXXXXX'"
itemGroup="00 00"
name="GEM-SG1-33751"
serviceId="10"
Attrib dataType="ElementList"
ElementList
ElementEntry name="TRCE:MaxMessagesPerSecond" dataType="UInt" value="10000"
ElementListEnd
AttribEnd
RefreshMsgEnd
Login accepted, starting posting...
----- Ack message ----
AckMsg
streamId="6"
domain="MarketPrice Domain"
ackId="1"
AckMsgEnd
Continue posting...
----- Ack message ----
AckMsg
streamId="6"
domain="MarketPrice Domain"
ackId="2"
AckMsgEnd
Data that is contributed to the production RCC can be viewed by subscribing to that RIC in either desktop applications or any other consumer application, such as the EMA Consumer sample.
The EMA SDK uses the Java logging package for logging the API messages. The detail, format, and destination of the logger messages can be controlled by configuring the Java logger properties.
Sample logging.properties file for storing log files in the ./logs subdirectory:
.level=FINEST
handlers=java.util.logging.FileHandler, java.util.logging.ConsoleHandler
java.util.logging.ConsoleHandler.level=FINEST
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.FileHandler.level=FINEST
java.util.logging.FileHandler.pattern=./logs/emaj%u.log
# Write 50000000 bytes before rotating this file
java.util.logging.FileHandler.limit=50000000
# Number of rotating files to be used
java.util.logging.FileHandler.count=20
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
# Format timestamp as date/time with millisecond
java.util.logging.SimpleFormatter.format=%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s %2$s %n%5$s
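To see what the SimpleFormatter pattern above produces, the same format string can be applied directly with String.format. SimpleFormatter passes the timestamp, source, logger name, level, message, and thrown text as arguments 1 to 6; the LogFormatPreview class and the sample values below are made up for illustration.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

/** Hypothetical preview of the SimpleFormatter pattern from logging.properties. */
class LogFormatPreview {
    static final String FORMAT =
            "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s %2$s %n%5$s";

    /** Formats one record using the same argument order that SimpleFormatter uses. */
    static String preview(ZonedDateTime when, String source, String level, String message) {
        // Arguments: 1=date, 2=source, 3=logger name, 4=level, 5=message, 6=thrown
        return String.format(Locale.ROOT, FORMAT, when, source, "logger", level, message, "");
    }

    public static void main(String[] args) {
        ZonedDateTime when =
                ZonedDateTime.of(2022, 12, 28, 15, 8, 7, 123_000_000, ZoneOffset.UTC);
        System.out.println(preview(when, "com.example.Demo run", "INFO", "hello"));
    }
}
```

The level is left-aligned in a seven-character column, and the message starts on its own line after the source.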
EMA Java logging can be enabled by passing the above properties file to the JVM at run time. In addition, it might be helpful to turn on JDK network logging to view the security and encryption handshake messages between the API and the server. The following JVM command line parameters enable verbose network and EMA logs.
set LOGGING_CONFIG=-Djava.util.logging.config.file=logs\logging.properties
set NET_DEBUGGING=-Djavax.net.debug=all
java -cp %CLASSPATH% %LOGGING_CONFIG% %NET_DEBUGGING% Contributor
Or
java -Djava.util.logging.config.file=./resources/logging.properties -jar ./target/EMA_Java_RCC_Tutorial-1.0-jar-with-dependencies.jar -username <RCC User> -password <RCC Password> -service <RCC Service Name> -itemName <Post RIC name>
If you have any further questions, I recommend you post them on our Developer Forum or contact our Data Helpdesk.
For any questions related to this article or the RTSDK page, please use the Developer Community Q&A Forum.