EMA Consumer - Posting data to Contribution Channel
Download tutorial source code    Click here to download
Last update Feb 2023
Compilers JDK 1.8, JDK 1.11, Open JDK 11
Prerequisite Downloaded, installed, compiled, and ran an EMA consumer example


In this tutorial we will explain and use the EMA Java API, to contribute data to the Refinitiv Contribution Channel (RCC) directly, without using the Refinitiv Real-Time Distribution System. The reader must have an understanding of a Refinitiv Real-time SDK (EMA) and be familiar with consuming OMM Market Price data. You should also have a basic understanding of EMA Configuration. To understand the contribution concepts, please refer to this article.

This tutorial has been updated in 2023 to open the tunnel stream to the RCC server based on the RCC RSSL Login and Directory service status.

Contributing Data to Refinitiv Real-Time System

Currently, clients wishing to Contribute data to Refinitiv, typically do so using on-site contribution systems such as MarketLinkIP. To meet the evolving needs of the market, Refinitiv has developed a new Contribution service - the Refinitiv Contribution Channel (RCC). Contributions Channel is provided through a cluster of virtual servers that can provide data optimization & conflation capabilities as well as a much higher rate of content onto Refinitiv Real-Time System.

There are two key mechanisms through which an API developer can contribute data to the Refinitiv head-end. Once contributed, this data can be delivered to other customers who have been permissioned to view it. This service is commonly used by banks and funds to make their data available to the trading community.

The first option is to use and configure the Refinitiv Real-Time Distribution System, to take care of contribution specific connection and login details.

The second option, which is discussed in this tutorial, is for clients who do not have an onsite Refinitiv Real-Time Distribution System installation and will connect to RCC through the Internet. To contribute directly to RCC, an EMA application undertakes the following steps:

  1. Initiating Tunnel Stream
  2. Establish an encrypted RSSL connection to the server
  3. Register for OMM Login and Directory Domain streams
  4. Open a Tunnel stream to the server when the Directory refresh message shows that Contribution Service is Up
  5. Perform a client login via a Tunnel stream
  6. Contribute data using post messages

Let's look at the EMA Java code starting with application configuration. This tutorial source code consists of three Java files as follows:

Contributor.java file:

This Java file contains the Contributor class which is the main application logic class. It receives RCC connection credentials and contribution service information (service name, item name) and interacts with the other two classes to connect and post data.

AppClient.java file:

This Java file contains the AppClient class that implements the OmmConsumerClient interface for handling encrypted RSSL connection events (especially the OMM Login and Directory domain streams) between the API and RCC server.

TunnelStreamClient.java file :

This Java file contains the TunnelStreamClient class that manages all tunnel stream connections, events, and posting logic. The source code is based on the EMA sample: series400\ex440_System_TunStrm which ships with the SDK distribution, in the Examples directory.

Initiating Tunnel Stream

Let's start with initiating Tunnel Stream. Firstly, a Contribution class initiates the TunnelStreamClient class with the RCC credentials and information including RCC Username, RCC Password, RCC Contribution Service Name, and RCC Contribution Item Name.


// Contributor.java


// Create the TunnelStreamClient object to process RCC Tunnel Stream connection

TunnelStreamClient tunnelClient = new TunnelStreamClient(trceUser, trcePass, trceServiceName, trceItem);

Establish an encrypted connection to the server

EMA SDK undertakes all the work required to establish an encrypted connection. All an application developer needs to do is define the <Consumer>, set its <Channel> type to RSSL_ENCRYPTED and <EncryptedProtocolType> to RSSL_SOCKET, in the EMA configuration file EmaConfig.xml. The hostname and the port of the destination RCC server will have to be specified in the configuration as well.

Note that developers should be coding against the UAT Environment while developing applications.

The servers must be used with Refinitiv Real-Time SDK applications with the RSSL_ENCRYPTED connection type and RSSL_SOCKET encrypted protocol type. 

Note: In the future, RCC will not support the RSSL_HTTP encrypted protocol type. Please use the RSSL_SOCKET encrypted protocol type when connecting to the RCC server.



    <ChannelType value="ChannelType::RSSL_ENCRYPTED"/>

    <EncryptedProtocolType value="EncryptedProtocolType::RSSL_SOCKET"/>

    <Host value="RCC_HOST_NAME"/>

    <Port value="RCC_PORT"/>


Now, when an OMMConsumer is created with this configuration, EMA will start an encrypted connection. EMA Java uses the security and encryption framework provided by the Java SDK and uses the Sun JDK's default TLS. The Java standard way is to store the security certificate and private keys required for encryption, in a password-protected keystore file (Optionally since RTSDK 1.5.1.L1). This filename and password have to be specified when creating the OMMConsumer.

Note: Since Refinitiv Real-Time SDK Java 1.5.1.L1 (ETA Java 3.5.1.L1), specifying the Keystore file in the code is optional. The API can get certificates from the default Certificate Authority Keystore location in the Java directory.


// Contributor.java


// Create an OMM consumer

OmmConsumer consumer = EmaFactory.createOmmConsumer(EmaFactory.createOmmConsumerConfig())

Register for OMM Login and Directory Domain streams

Next, an application registers for the OMM Login and Directory domain streams for getting the contribution server service status.

Please be noticed that we pass the AppClient class to the registerClient method to handle the RSSL Login and Directory events generated by our interest.


// Contributor.java


//Register to get the login and source directory message

Login.LoginReq loginReq = EmaFactory.Domain.createLoginReq();

consumer.registerClient(loginReq.message(), appClient);

consumer.registerClient(EmaFactory.createReqMsg().domainType(EmaRdm.MMT_DIRECTORY).serviceName(trceServiceName), appClient);

Open a Tunnel stream

Once an application gets a Directory Refresh response message to confirm that the Contribution Service is UP, open the tunnel stream to the service which allows contribution.




public void processSourceDirectoryRefresh(RefreshMsg refreshMsg, OmmConsumerEvent event) {

    System.out.println("TunnelStreamClient processSourceDirectoryRefresh");


    if (refreshMsg.state().streamState() == OmmState.StreamState.OPEN && refreshMsg.state().dataState() == OmmState.DataState.OK && _tunnelStreamHandle == 0) {

        System.out.println("RSSL encrypted connection Login accepted and Directory is UP, starting Tunnel Stream...");






private void startTunnelStream() {

    System.out.println("TunnelStreamClient Start Tunnel Stream");

    try {

        ClassOfService cos = EmaFactory.createClassOfService()






        TunnelStreamRequest tsr = EmaFactory.createTunnelStreamRequest()






        _tunnelStreamHandle = _ommConsumer.registerClient(tsr, this);

    } catch (Exception excp) {




Here, we used the OMMConsumer defined earlier (which is connected to the RCC server by now), and open a tunnel stream to the contribution service using the system domain. The tunnel stream handle will be used in the next step to log in.

The code creates a Tunnel Stream request by providing an arbitrary name ('Tunnel') and the RCC service name. The Domain type of the tunnel stream is MMT_SYSTEM which is a system domain for content-neutral requests (i.e. non-data domains). Finally, the code registers the TunnelStreamClient class as the callback client to receive events related to the Tunnel Stream and store the Tunnel Stream handle into a variable.

For production applications, it is recommended to establish two simultaneous connections, to two different servers. An application should only contribute to a single stream at any given time. The second stream is for redundancy purposes, and should only be used if there are issues with the first one.

Perform a client login via a Tunnel stream

A successful opening of the tunnel stream is followed by TunnelStreamClient.onStatusMsg() a status event callback method with state OmmState.StreamState.OPEN. It compares the handle in the message to the handle of the tunnel stream. If the handles are equal and the stream state is open, the application will call the TunnelStreamClient.loginTunnelStream() method to send the tunnel login request message. The tunnel login request message has the following attributes:

  • The domain type of the request message is MMT_LOGIN
  • An RCC username is in the message key of the request message
  • An RCC password is in the element list and set in the attribute information of the request message. The password information is no longer required to be obfuscated before sending.
  • The Stream Id is the stream Id of the status message

It also registers this TunnelStreamClient instance as the callback client to receive events related to the tunnel login and stores a stream handle in a variable.





public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent event) {

    System.out.println("----- TunnelStreamClient Status message ----");



    //If this is a Tunnel Stream Status message and the Stream is UP, sends a OMM Login request message over this Tunnel Stream

    if (event.handle() == _tunnelStreamHandle && statusMsg.hasState() && statusMsg.state().streamState() == OmmState.StreamState.OPEN) {

        System.out.println("************* Login to RCC ******************** " + statusMsg.streamId());

        loginTunnelStream(statusMsg.streamId(), statusMsg.serviceId());




private void loginTunnelStream(int streamId, int serviceId) {


    System.out.println("TunnelStreamClient Login Tunnel Stream");


    // Create the OM Login Request Message

    ElementList elementList = EmaFactory.createElementList();

    elementList.add(EmaFactory.createElementEntry().ascii("Password", this.trcePass));


    ReqMsg rMsg = EmaFactory.createReqMsg()




            // .privateStream(true)




    System.out.println("Sending client login request...");

    // get events from login substream

    _subStreamHandle = _ommConsumer.registerClient(rMsg, this, 1, _tunnelStreamHandle);

    _subItemOpen = true;


If the login is accepted by the RCC server, an application receives a refresh callback: onRefreshMsg with state OmmState.StreamState.OPEN. At this point, the application should store the stream handle and is ready to contribute data.

The client's login response will include a *maximum message rate in the refresh response and this message rate must be obeyed.

The TRCE:MaxMessagesPerSecond element is present in all Contributions Channel environments and the message rate posted by the connecting application must not exceed this number.

The example message is shown below:


<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="3" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="1" dataSize="127">

    <extendedHeader data="0100"/>


<!-- rwfMajorVer="14" rwfMinorVer="1" -->

        <refreshMsg domainType="RSSL_DMT_LOGIN" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x648 (RSSL_RFMF_HAS_MSG_KEY|RSSL_RFMF_REFRESH_COMPLETE|RSSL_RFMF_DO_NOT_CACHE|RSSL_RFMF_PRIVATE_STREAM)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text="Login accepted by host HOSTNAME via HANDLERNAME"  dataSize="0">

            <key  flags="0x23 (RSSL_MKF_HAS_SERVICE_ID|RSSL_MKF_HAS_NAME|RSSL_MKF_HAS_ATTRIB)"  serviceId="10" name="USERNAME" attribContainerType="RSSL_DT_ELEMENT_LIST">


                    <elementList flags="0x8 (RSSL_ELF_HAS_STANDARD_DATA)">

                        <elementEntry name="TRCE:MaxMessagesPerSecond" dataType="RSSL_DT_UINT" data="300"/>









Contribute data using post messages

To contribute, an application can use off-stream posting - i.e. send post messages on the login stream. The application begins by creating the field list for the FIDs it intends to contribute to; packs the field list into an update message, and sends this update message as a PostMessage payload. The following TunnelStreamClient.postMessage() method code snippet shows how to send an update message containing a BID (FID 22) and an ASK (FID 25).


// TunnelStreamClient.java



public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event) {

    System.out.println("----- TunnelStreamClient Refresh message ----");



    //If this is a Tunnel Stream Login Refresh Response message and the Stream is UP, sends a Post message

    if (refreshMsg.domainType() == EmaRdm.MMT_LOGIN && refreshMsg.state().streamState() == OmmState.StreamState.OPEN && refreshMsg.state().dataState() == OmmState.DataState.OK) {

        // 3. Login accepted, app can post data now

        System.out.println("Login accepted, starting posting...");


        _postStreamID = refreshMsg.streamId();


    } else {

        System.out.println("Stream not open");





*  Creates the OMM Post message for RCC and sends it over a Tunnel Stream


private void postMessage() {

    try {


    } catch (Exception e) {



    // populate the contributed FIDs and values

    FieldList nestedFieldList = EmaFactory.createFieldList();

    nestedFieldList.add(EmaFactory.createFieldEntry().real(22, _bid++, OmmReal.MagnitudeType.EXPONENT_NEG_1));

    nestedFieldList.add(EmaFactory.createFieldEntry().real(25, _ask++, OmmReal.MagnitudeType.EXPONENT_NEG_1));


    // create an update message for our item

    UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg()





    // post this market price message







            .payload(nestedUpdateMsg), _subStreamHandle);


The code creates a post message (PostMsg) with the following properties and values:

  • StreamId (the same as the Tunnel Login stream)
  • Domain Type of MarketPrice
  • Acknowledgment flag
  • Unique ID for this Post (PostId)
  • An update message in the payload of the post message to update an existing record
  • Post Item (RIC)
  • Data in the payload of the update message consisting of two fields - BID (22) and ASK (25)

Then, it submits the PostMsg using the handle of the Tunnel Login stream. Finally, it increments the Post IDs, BID value, and ASK value.


  1. The unique Post ID can be retrieved from the Ack response so the application can tie up the Ack to the Post and confirm that each Post has been acknowledged by the server
  2. RCC currently only allows applications to update the existing records on the server via an API. New records can be defined manually by an RCC administrator.

Here, the application is also requesting, that the contributions be acknowledged by the system, by setting solicitAck(true). Setting this flag will cause the application to receive acknowledgment via TunnelStreamClient.onAckMsg() callbacks. The callback acknowledgment message AckMsg, will contain attributes nackCode and text reason if the contribution is denied.

Below is a sample of the acknowledgment message. The value of Ack Id is 1 which matches the Post ID used in the first PostMsg:




    domain="MarketPrice Domain"




// TunnelStreamClient.java



public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent event) {

    System.out.println("----- Ack message ----");



    System.out.println("Continue posting...");

    //Receive ACK (Post success), continue Posting



From the code above, when the application receives an acknowledge message response, it checks the NACK code. If the NACK code is false, the application will send another post message.

Note that it is possible to receive a NACK response if the Post message is not accepted by the RCC server. For example, the application posts a message to an invalid RIC or updates fields that are not defined in the target RIC.

Below is an example of a NACK response when posting to an invalid RIC:




    domain="MarketPrice Domain"



    text="Symbol unknown"


That covers the RCC Posting tutorial with EMA Java API.


Refinitiv Contrition Channel (RCC) is the Real-Time contribution system that allows users to contribute data directly to the Refinitiv Real-Time network. Users can use Refinitiv Real-Time SDKs or WebSocket APIs to develop applications to connect and contribute real-time data to RCC. This tutorial focuses on how to use EMA C++ to establish a connection to RCC, open a tunnel stream, log in to RCC and send post messages to contribute data.

A few points are worth mentioning here:

  • The example contributes data to a single RIC by sending out the first post once it gets a valid Client Login Response and then sending subsequent posts when it receives acknowledgment messages back from the server. In reality, the application will most likely be contributing data to several RICs.
  • If the TRCE:MaxMessagesPerSecond element is present in the login response, the message rate posted by the application should not exceed this number.
  • The application can get the login stream closed message with the “System busy” string in the status text from the tunnel stream after sending a login request message to login to RCC, as shown below.

<statusMsg domainType="RSSL_DMT_LOGIN" streamId="5" containerType="RSSL_DT_NO_DATA" flags="0xA0 (RSSL_STMF_HAS_STATE|RSSL_STMF_PRIVATE_STREAM)" dataState="RSSL_DATA_SUSPECT" streamState="RSSL_STREAM_CLOSED" code="RSSL_SC_NO_RESOURCES" text="System busy"  dataSize="0">




This is not an error. It simply means that the server can't handle the request now. This is normal for a cloud application. In this case, the connection will be disconnected and then EMA will recover the connection. Therefore, the application needs to open a tunnel and perform a client login to connect and contribute data to RCC. Otherwise, the application can try to connect to other servers.

Tutorial Application Prerequisite

This tutorial application requires the following dependencies software and libraries.

  1. Oracle JDK 1.8, 1.11 & 1.17 or OpenJDK 1.8, 1.11 & 1.17.
  2. Apache Maven project management and comprehension tool.
  3. Internet connection.
  4. Access to the Refinitiv Contribution Channel (RCC).

Please contact your Refinitiv representative to help you to access the RCC account and services.

How to run the tutorial application

Firstly, you need to configure the RCC server endpoint and Dictionary files path in the EmaConfig.xml file.

Open the EmaConfig.xml file and input the RCC endpoint and Port:



    <Name value="Channel_1"/>

    <ChannelType value="ChannelType::RSSL_ENCRYPTED"/>


    <Host value="contrib1-....refinitiv.com"/>

    <Port value="443"/>


Then, modify the Field Dictionary nodes in the EmaConfig.xml file to the local RDMFieldDictionary and enumtype.def files on your machine.



    <Name value="Dictionary_2"/>

    <DictionaryType value="DictionaryType::FileDictionary"/>

    <!-- dictionary names are optional: defaulted to RDMFieldDictionary and enumtype.def -->

    <RdmFieldDictionaryFileName value="./RDMFieldDictionary"/>

    <EnumTypeDefFileName value="./enumtype.def"/>


Now we come to build and compile the project. Open the project folder in the command prompt and then build a project with the following command:

            mvn package

Once the project compilation is completed, run the following command to start the tutorial application:

            java -jar ./target/EMA_Java_RCC_Tutorial-1.0-jar-with-dependencies.jar -username <RCC User> -password <RCC Password> -service <RCC Service Name> -itemName <Post RIC name>

Run output example:


Contributing to Refinitiv Contributions Channel

Starting encrypted connection...

Dec 28, 2022 3:08:07 PM com.refinitiv.ema.access.ChannelCallbackClient reactorChannelEventCallback

INFO: loggerMsg

    ClientName: ChannelCallbackClient

    Severity: Info

    Text:    Received ChannelUp event on channel Channel_1

    Instance Name Consumer_1_1



----- Refresh message ----



    domain="Login Domain"



    state="Open / Ok / None / ''"

    itemGroup="00 00"



    Attrib dataType="ElementList"


            ElementEntry name="ApplicationId" dataType="Ascii" value="256"

            ElementEntry name="ApplicationName" dataType="Ascii" value="ema"






TunnelStreamClient processLoginStatus

Tunnel Stream Login accepted

----- Refresh message ----



    domain="Directory Domain"



    state="Open / Ok / None / ''"

    itemGroup="00 00"


    Payload dataType="Map"







TunnelStreamClient processSourceDirectoryRefresh

RSSL encrypted connection Login accepted and Directory is UP, starting Tunnel Stream...

TunnelStreamClient Start Tunnel Stream

----- TunnelStreamClient Status message ----



    domain="System Domain"


    state="Open / Ok / None / ''"






************* Login to RCC ******************** 6

TunnelStreamClient Login Tunnel Stream

Sending client login request...

----- TunnelStreamClient Refresh message ----



    domain="Login Domain"



    state="Open / Ok / None / 'Login accepted by host XXXXXXX'"

    itemGroup="00 00"



    Attrib dataType="ElementList"


            ElementEntry name="TRCE:MaxMessagesPerSecond" dataType="UInt" value="10000"





Login accepted, starting posting...

----- Ack message ----



    domain="MarketPrice Domain"




Continue posting...

----- Ack message ----



    domain="MarketPrice Domain"



Data which is contributed to the production RCC, can be viewed by subscribing to that RIC in either desktop applications or any other consumer applications, like EMA Consumer sample.



Enable Logging

EMA SDK uses Java logging package for logging the API messages. The detail, format and destination of the logger messages can be controlled by configuring the Java logger properties.

Sample logging.properties file for storing log files in /log subdirectory:




handlers=java.util.logging.FileHandler, java.util.logging.ConsoleHandler








# Write 100000 bytes before rotating this file



# Number of rotating files to be used



# Format timestamp as date/time with millisecond

java.util.logging.SimpleFormatter.format=%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s %2$s %n%5$s

EMA Java logging can be enabled, by passing above properties file to JVM at run time. In addition, it might be helpful to turn on the JDK network logging, to view the security and encryption handshake messages, between the API and the server. Following command line parameters to JVM will enable verbose network and EMA logs.


set LOGGING_CONFIG=-Djava.util.logging.config.file=logs\logging.properties

set NET_DEBUGGING=-Djavax.net.debug=all



            java -Djava.util.logging.config.file=./resources/logging.properties -jar ./target/EMA_Java_RCC_Tutorial-1.0-jar-with-dependencies.jar -username <RCC User> -password <RCC Password> -service <RCC Service Name> -itemName <Post RIC name>

Additional Resources

If you have any further questions I recommend you post them on our Developer Forum or contact our Data Helpdesk

Existing Tutorials mentioned above:

For any questions related to this article or the RTSDK page, please use the Developer Community Q&A Forum.