EMA Consumer - Posting data to Contribution Channel

Last update: Aug 2024
Environment: Windows, Linux
Compilers: Refer to the EMA Compiler Guides for a complete list
Prerequisite: Downloaded, installed, compiled, and ran an EMA consumer example

Posting Directly to Contribution Channel

Knowledge Prerequisite: You must understand an RTSDK API and be familiar with consuming OMM Market Price data. You should also have a basic understanding of EMA configuration (see the end of this article for links to relevant tutorials).

Contributing Data to a Data Feed

Clients can currently contribute data through the contribution system called Contribution Channel. Contribution Channel is provided through a cluster of virtual servers that offer data optimization and conflation capabilities, as well as a much higher rate of content contribution onto Real-Time.

To contribute directly to Contribution Channel, you can develop an OMM Consumer application using one of our Real-Time SDK (RTSDK) APIs, namely the Enterprise Transport API (ETA) or the Enterprise Message API (EMA), or using the WebSocket API. RTSDK supports both Windows and Linux platforms.

This article focuses on the application flow of an EMA C++ application that connects and contributes data to Contribution Channel (RCC). The flow has four steps:

  1. Establish an Encrypted Connection to the server
  2. Open a Tunnel Stream
  3. Perform a Client Login
  4. Contribute data using Post Messages
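
The four steps must happen in order, and a lost connection sends the application back to the start. The sketch below models that ordering as a small state machine. It is independent of EMA: the names RccStage, RccEvent, nextStage, canPost, and walkThroughFlow are hypothetical and exist only for illustration. In a real application the transitions would be driven by the EMA callbacks described in the following sections.

```cpp
#include <cassert>

// Hypothetical stages mirroring the four steps (these are not EMA types).
enum class RccStage { Disconnected, Connected, TunnelOpen, LoggedIn };

// Hypothetical events. In the tutorial they correspond to EMA callbacks.
enum class RccEvent { ConnectionUp, TunnelStreamOpen, LoginAccepted, ConnectionLost };

// Returns the stage that follows 'stage' when 'event' arrives.
// An event that arrives out of order leaves the stage unchanged.
inline RccStage nextStage(RccStage stage, RccEvent event) {
    if (event == RccEvent::ConnectionLost) return RccStage::Disconnected;
    switch (stage) {
        case RccStage::Disconnected:
            return event == RccEvent::ConnectionUp ? RccStage::Connected : stage;
        case RccStage::Connected:
            return event == RccEvent::TunnelStreamOpen ? RccStage::TunnelOpen : stage;
        case RccStage::TunnelOpen:
            return event == RccEvent::LoginAccepted ? RccStage::LoggedIn : stage;
        case RccStage::LoggedIn:
            break;
    }
    return stage;
}

// Post messages may only be sent after the client login is accepted.
inline bool canPost(RccStage stage) { return stage == RccStage::LoggedIn; }

// Usage example: the happy path followed by a lost connection.
inline void walkThroughFlow() {
    RccStage s = RccStage::Disconnected;
    s = nextStage(s, RccEvent::ConnectionUp);
    s = nextStage(s, RccEvent::TunnelStreamOpen);
    s = nextStage(s, RccEvent::LoginAccepted);
    assert(canPost(s));
    s = nextStage(s, RccEvent::ConnectionLost);
    assert(!canPost(s));  // the tunnel stream and login must be repeated
}
```

Keeping the stage in one place makes it easy to guard the posting code with a single canPost check instead of testing several handles.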

Establish an Encrypted Connection to the Contribution Channel server

As with any OMM Consumer application, the first step is to establish a connection to the Contribution Channel server.

    	
            

int main(int argc, char* argv[])
{
    try {
        AppClient client;

        // Replace the placeholders with the values for your RCC account.
        const EmaString RCC_SERVICE = "<RCC Service>";
        const EmaString RCC_USERNAME = "<RCC Username>";
        const EmaString RCC_PASSWORD = "<RCC Password>";
        const EmaString RCC_POST_RIC = "<Post Item>";

        TunnelStreamClient tunnelClient(RCC_SERVICE, RCC_USERNAME, RCC_PASSWORD, RCC_POST_RIC);
        client.setTunnelStreamClient(tunnelClient);

        // Connect using the default consumer in EMAConfig.xml and log in.
        OmmConsumer consumer(OmmConsumerConfig().username("rmds"));
        tunnelClient.setOmmConsumer(consumer);

        // Register AppClient for login and source directory messages.
        UInt64 loginHandle = consumer.registerClient(ReqMsg().domainType(MMT_LOGIN), client);
        UInt64 directoryHandle = consumer.registerClient(ReqMsg().domainType(MMT_DIRECTORY).serviceName(RCC_SERVICE), client);

        // Keep the application alive while the callbacks do the work.
        while (true)
            sleep(600000);
    }
    catch (const OmmException& excp) {
        cout << excp << endl;
    }
    return 0;
}

The code loads the default configuration, attempts to establish a connection, and performs a login with the provided username. The RCC service name, RCC username, RCC password, and post RIC are read from constants, so replace the placeholder values with your own before running the example.

It also creates instances of callback client classes (AppClient and TunnelStreamClient) and registers the AppClient to receive login and source directory-related messages.     

The connection settings are in the EMA configuration file (EMAConfig.xml). The application uses the connection settings from the Consumer_1 and Channel_1 configurations.

    	
            

<ConsumerGroup>
    <DefaultConsumer value="Consumer_1"/>
    <ConsumerList>
        <Consumer>
            <Name value="Consumer_1"/>
            <Channel value="Channel_1"/>
            ...
        </Consumer>
    </ConsumerList>
</ConsumerGroup>
...
<ChannelGroup>
    <ChannelList>
        <Channel>
            <Name value="Channel_1"/>
            <ChannelType value="ChannelType::RSSL_ENCRYPTED"/>
            <Host value="<RCC_Hostname>"/>
            <Port value="443"/>
            <EncryptedProtocolType value="EncryptedProtocolType::RSSL_SOCKET"/>
            ...
        </Channel>
    </ChannelList>
</ChannelGroup>

Channel_1 uses the RSSL_ENCRYPTED connection type and the RSSL_SOCKET encrypted protocol type, which require the OpenSSL library. The hostname of the RCC server must be specified in the EMAConfig.xml configuration file. Please see chapter 4.3.3: Encrypted Connections in the EMA Developer's Guide for more information.

Note that developers should be coding against the UAT Environment while developing applications.

Real-Time SDK applications must connect to the RCC servers using the RSSL_ENCRYPTED connection type and the RSSL_SOCKET encrypted protocol type.

Note: In the future, RCC will not support the RSSL_HTTP encrypted protocol type. Please use the RSSL_SOCKET encrypted protocol type when connecting to RCC.

Open a Tunnel Stream

The application opens a tunnel stream when it receives the login and source directory response. The code is in the TunnelStreamClient::startTunnelStream method.

    	
            

void TunnelStreamClient::startTunnelStream() {
    cout << "Start Tunnel Stream" << endl;

    CosAuthentication cosAuthentication;
    cosAuthentication.type(CosAuthentication::NotRequiredEnum);

    CosDataIntegrity cosDataIntegrity;
    cosDataIntegrity.type(CosDataIntegrity::ReliableEnum);

    CosFlowControl cosFlowControl;
    cosFlowControl.type(CosFlowControl::BidirectionalEnum).recvWindowSize(12288).sendWindowSize(1200);

    CosGuarantee cosGuarantee;
    cosGuarantee.type(CosGuarantee::NoneEnum);

    ClassOfService cos;
    cos.authentication(cosAuthentication).dataIntegrity(cosDataIntegrity).flowControl(cosFlowControl).guarantee(cosGuarantee);

    TunnelStreamRequest tsr;
    tsr.classOfService(cos).domainType(MMT_SYSTEM).name("Tunnel").serviceName(_serviceName);

    // Register this client for tunnel stream events and keep the handle.
    _tunnelStreamHandle = _pOmmConsumer->registerClient(tsr, *this);
}

A Tunnel Stream is a private stream that supports additional functionality such as authentication, flow control, and guaranteed delivery, each of which can be specified on a per-stream basis.

The code creates a Tunnel Stream request by providing an arbitrary name ('Tunnel') and the RCC service name. The domain type of the tunnel stream is MMT_SYSTEM, which is a system domain for content-neutral requests (i.e. non-data domains). Finally, the code registers the TunnelStreamClient as the callback client to receive events related to the Tunnel Stream and stores the Tunnel Stream handle in a variable.

When the Tunnel Stream has been established, the application will get a status message with a domain type of MMT_SYSTEM and stream state of Open, as shown below.

    	
            

AppClient::OnStatusMsg :: StatusMsg
    streamId="5"
    domain="System Domain"
    PrivateStream
    ClearCache
    state="Open / Ok / None / ''"
    name="Seikan"
    serviceId="10"
    serviceName="DDS_TRCE"
StatusMsgEnd

 

Perform a Client Login

After the tunnel stream is opened, the application sends a login request message with an RCC username and password to the RCC server through the tunnel stream. The code that sends the login request is in the TunnelStreamClient::loginTunnelStream method.

    	
            

void TunnelStreamClient::onStatusMsg(const StatusMsg& statusMsg, const OmmConsumerEvent& event)
{
    cout << "TunnelStreamClient onStatusMsg" << endl;
    cout << statusMsg << endl;

    // Log in to RCC once the tunnel stream itself reports an Open state.
    if (event.getHandle() == _tunnelStreamHandle &&
        statusMsg.hasState() &&
        statusMsg.getState().getStreamState() == OmmState::OpenEnum)
    {
        cout << "************* Login to RCC ********************" << statusMsg.getStreamId() << endl;
        loginTunnelStream(statusMsg.getStreamId());
    }
}

void TunnelStreamClient::loginTunnelStream(refinitiv::ema::access::Int32 streamId) {
    // The RCC password travels in the attribute information of the request.
    ElementList elmList;
    elmList.addAscii("Password", _rccPassword).complete();

    ReqMsg reqMsg;
    reqMsg.domainType(MMT_LOGIN)
        .name(_rccUserName)
        .attrib(elmList)
        .streamId(streamId);

    cout << endl << "Submit GenericMsg from onStatus" << endl;

    // Open the login as a sub-stream of the tunnel stream.
    _subStreamHandle = _pOmmConsumer->registerClient(reqMsg, *this, (void*)1, _tunnelStreamHandle);
}

The code gets the status message of the tunnel stream via the TunnelStreamClient::onStatusMsg method. It compares the handle in the message to the handle of the tunnel stream. If the handles are equal and the stream state is open, the application will call the TunnelStreamClient::loginTunnelStream method to send the tunnel login request message. The tunnel login request message has the following attributes:

  • The domain type of the request message is MMT_LOGIN 
  • An RCC username is in the message key of the request message
  • An RCC password is in the element list and set in the attribute information of the request message
  • The Stream Id is the stream Id of the status message

It also registers this TunnelStreamClient instance as the callback client to receive events related to the tunnel login and stores a stream handle in a variable.

If the credentials are valid, the application receives a login refresh message from RCC through the tunnel stream with a login state of Open/OK. The login refresh message includes a maximum message rate, which the application must use to limit the number of messages it sends. The message looks like this:

    	
            

<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="3" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="1" dataSize="127">
    <extendedHeader data="0100"/>
    <dataBody>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
        <refreshMsg domainType="RSSL_DMT_LOGIN" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x648 (RSSL_RFMF_HAS_MSG_KEY|RSSL_RFMF_REFRESH_COMPLETE|RSSL_RFMF_DO_NOT_CACHE|RSSL_RFMF_PRIVATE_STREAM)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text="Login accepted by host HOSTNAME via HANDLERNAME" dataSize="0">
            <key flags="0x23 (RSSL_MKF_HAS_SERVICE_ID|RSSL_MKF_HAS_NAME|RSSL_MKF_HAS_ATTRIB)" serviceId="10" name="USERNAME" attribContainerType="RSSL_DT_ELEMENT_LIST">
                <attrib>
                    <elementList flags="0x8 (RSSL_ELF_HAS_STANDARD_DATA)">
                        <elementEntry name="TRCE:MaxMessagesPerSecond" dataType="RSSL_DT_UINT" data="300"/>
                    </elementList>
                </attrib>
            </key>
            <dataBody>
            </dataBody>
        </refreshMsg>
    </dataBody>
</genericMsg>

The TRCE:MaxMessagesPerSecond element is present in all Contribution Channel environments, and the message rate posted by the application must not exceed this number.
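
One simple way to stay under the limit is a fixed-window counter that is consulted before each post. The sketch below is a minimal illustration and not part of EMA or the tutorial source: the class name PostRateLimiter and its tryAcquire method are hypothetical, and the limit is assumed to have been read from TRCE:MaxMessagesPerSecond already. The caller passes the current time explicitly, which keeps the class easy to test. Note that a fixed window can still allow a short burst across a window boundary, so a stricter scheme may be needed if the server enforces the rate over a sliding interval.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical helper (not part of EMA): allows at most maxPerSecond posts
// per fixed one-second window. A window starts at the first post attempted
// after the previous window has expired.
class PostRateLimiter {
public:
    using Clock = std::chrono::steady_clock;

    explicit PostRateLimiter(std::uint64_t maxPerSecond)
        : _maxPerSecond(maxPerSecond) {
        assert(maxPerSecond > 0);  // assumed to come from TRCE:MaxMessagesPerSecond
    }

    // Returns true if a post may be sent at time 'now'. Returns false if the
    // quota of the current window is used up and the caller should wait.
    bool tryAcquire(Clock::time_point now) {
        if (!_started || now - _windowStart >= std::chrono::seconds(1)) {
            _windowStart = now;  // begin a new one-second window
            _sentInWindow = 0;
            _started = true;
        }
        if (_sentInWindow >= _maxPerSecond) return false;
        ++_sentInWindow;
        return true;
    }

private:
    std::uint64_t _maxPerSecond;
    std::uint64_t _sentInWindow = 0;
    Clock::time_point _windowStart{};
    bool _started = false;
};
```

In an application, the check would typically be tryAcquire(PostRateLimiter::Clock::now()) immediately before submitting a post, with the post deferred when it returns false.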

Below is an example of a valid login refresh that takes the form of a RefreshMsg:

    	
            

AppClient::OnRefreshMsg :: RefreshMsg
    streamId="5"
    domain="Login Domain"
    RefreshComplete
    PrivateStream
    DoNotCache
    state="Open / Ok / None / 'Login accepted by host 452655c7cb54 via ip-10-28-149-90.eu-west-1.compute.internal'"
    itemGroup="00 00"
    name="RCC Username"
    Attrib dataType="ElementList"
        ElementList
            ElementEntry name="TRCE:MaxMessagesPerSecond" dataType="UInt" value="10000"
        ElementListEnd
    AttribEnd
RefreshMsgEnd

 

Contribute data using Post Messages

When the application receives the login refresh and the state of the login refresh is Open/OK, the application starts contributing data by sending post messages through the tunnel stream. The code that sends a post message is in the TunnelStreamClient::postData method.

    	
            

void TunnelStreamClient::onRefreshMsg(const RefreshMsg& refreshMsg, const OmmConsumerEvent& event)
{
    cout << "TunnelStreamClient onRefreshMsg" << endl;
    cout << refreshMsg << endl;

    // Start posting once the tunnel login is accepted (Open/OK).
    if (refreshMsg.getDomainType() == MMT_LOGIN &&
        event.getHandle() == _subStreamHandle &&
        refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
        refreshMsg.getState().getDataState() == OmmState::OkEnum)
    {
        _postStreamID = refreshMsg.getStreamId();
        postData();
    }
}

void TunnelStreamClient::postData() {
    cout << "Send First PostMsg()....." << endl;

    _pOmmConsumer->submit(PostMsg()
        .streamId(_postStreamID)
        .postId(_postID).domainType(MMT_MARKET_PRICE)
        .solicitAck(true).complete()
        .payload(UpdateMsg().streamId(_postStreamID)
            .name(_rccPostRIC)
            .payload(FieldList()
                .addReal(22, _BID, OmmReal::ExponentNeg2Enum)
                .addReal(25, _ASK, OmmReal::ExponentNeg2Enum)
                .complete()))
        .complete(),
        _subStreamHandle);

    // Prepare the values for the next post.
    _postID++;
    _BID++;
    _ASK++;
}

The code creates a post message (PostMsg) with the following properties and values:

  • StreamId (the same as the Tunnel Login stream)
  • Domain Type of MarketPrice
  • Acknowledgment flag
  • Unique ID for this Post (PostId)
  • An update message in the payload of the post message to update an existing record
  • Post Item (RIC)
  • Data in the payload of the update message consisting of two fields - BID (22) and ASK (25)

Then, it submits the PostMsg using the handle of the Tunnel Login stream. Finally, it increments the Post ID, the BID value, and the ASK value.
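
The BID and ASK fields are added with OmmReal::ExponentNeg2Enum, which means the integer passed to addReal is a mantissa scaled by 10^-2: a mantissa of 3425 represents 34.25, and incrementing _BID by one raises the price by 0.01. The sketch below shows that conversion in plain C++. The helper names toMantissa and fromMantissa are hypothetical and are not part of EMA; the decimalPlaces parameter is assumed to match the negative exponent chosen in the addReal call.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Hypothetical helper (not part of EMA): returns 10^decimalPlaces as a double.
// The result is exact for the small exponents accepted here.
inline double powerOfTen(int decimalPlaces) {
    assert(decimalPlaces >= 0 && decimalPlaces <= 14);
    double scale = 1.0;
    for (int i = 0; i < decimalPlaces; ++i) scale *= 10.0;
    return scale;
}

// Converts a price to the integer mantissa that goes with an exponent of
// 10^-decimalPlaces, rounding to the nearest integer.
inline std::int64_t toMantissa(double price, int decimalPlaces) {
    return static_cast<std::int64_t>(std::llround(price * powerOfTen(decimalPlaces)));
}

// Inverse conversion: the value represented by a mantissa/exponent pair.
inline double fromMantissa(std::int64_t mantissa, int decimalPlaces) {
    return static_cast<double>(mantissa) / powerOfTen(decimalPlaces);
}
```

With two decimal places, toMantissa(34.25, 2) yields the mantissa 3425 that would be passed together with OmmReal::ExponentNeg2Enum.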

Note:

1. The unique Post ID can be retrieved from the Ack response, so the application can match each Ack to its Post and confirm that every Post has been acknowledged by the server.

2. RCC currently only allows applications to update existing records on the server via an API. New records can be defined manually by an RCC administrator.

Because the code sets the solicited acknowledgment flag in the post message, the application will receive an acknowledgment message (AckMsg) back from the server shortly after the Post is submitted. The acknowledgment message will be received through the TunnelStreamClient::onAckMsg callback method. Below is a sample of the acknowledgment message. The value of Ack Id is 1 which matches the Post ID used in the first PostMsg:

    	
            

AppClient::onAckMsg :: AckMsg
    streamId="5"
    domain="MarketPrice Domain"
    ackId="1"
AckMsgEnd

void TunnelStreamClient::onAckMsg(const AckMsg& ackMsg, const OmmConsumerEvent& event)
{
    cout << "TunnelStreamClient onAckMsg" << endl;
    cout << ackMsg << endl;

    // Post the next update only when the previous post was accepted.
    if (ackMsg.hasNackCode() == false)
    {
        cout << "Posting Update....." << endl;
        postData();
    }
}

As the code above shows, when the application receives an acknowledgment message, it checks whether the message contains a NACK code. If it does not (hasNackCode() returns false), the application sends another post message.

Note that the application can receive a NACK response if the Post message is not accepted by the RCC server, for example when it posts to an invalid RIC or updates fields that are not defined in the target RIC.

Below is an example of a NACK response when posting to an invalid RIC:

    	
            

AppClient::onAckMsg :: AckMsg
    streamId="5"
    domain="MarketPrice Domain"
    ackId="1"
    nackCode="SymbolUnknown"
    text="Symbol unknown"
AckMsgEnd
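
Because each Ack carries the ID of the Post it answers, an application that has several posts in flight can keep a small table of outstanding Post IDs and resolve each entry as the Ack or NACK arrives. The sketch below is one possible bookkeeping approach in plain C++, not code from the tutorial source. The class PendingPosts and its methods are hypothetical. In an EMA application, onAck would be called from the onAckMsg callback with the values of the Ack ID and the NACK flag.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical bookkeeping helper (not part of EMA): remembers which RIC each
// outstanding Post ID was sent for, so an Ack can be matched to its Post.
class PendingPosts {
public:
    enum class Result { Acked, Nacked, UnknownId };

    // Call right after submitting a post with this Post ID.
    void onPosted(std::uint32_t postId, const std::string& ric) {
        const bool inserted = _pending.emplace(postId, ric).second;
        assert(inserted);  // a Post ID must be unique while it is outstanding
        (void)inserted;    // avoids an unused-variable warning when asserts are off
    }

    // Call with the Ack ID and whether the Ack carries a NACK code.
    // On a match, ricOut receives the RIC the post was sent for.
    Result onAck(std::uint32_t ackId, bool hasNackCode, std::string& ricOut) {
        auto it = _pending.find(ackId);
        if (it == _pending.end()) return Result::UnknownId;
        ricOut = it->second;
        _pending.erase(it);
        return hasNackCode ? Result::Nacked : Result::Acked;
    }

    // Number of posts that have not been acknowledged yet.
    std::size_t outstanding() const { return _pending.size(); }

private:
    std::unordered_map<std::uint32_t, std::string> _pending;
};
```

A Nacked result tells the application which RIC was rejected, so it can report the problem or stop posting to that item instead of silently stalling.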

Summary

Contribution Channel (RCC) is the Real-Time contribution system that allows users to contribute data directly to the Real-Time network. Users can use the Real-Time SDKs or the WebSocket API to develop applications that connect and contribute real-time data to RCC. This tutorial focuses on how to use EMA C++ to establish a connection to RCC, open a tunnel stream, log in to RCC, and send post messages to contribute data.

A few points are worth mentioning here:   

  • The example contributes data to a single RIC: it sends the first post once it gets a valid Client Login response, and then sends each subsequent post when it receives an acknowledgment message back from the server. In practice, an application will most likely contribute data to several RICs.
  • If the TRCE:MaxMessagesPerSecond element is present in the login response, the message rate posted by the application should not exceed this number.
  • After sending a login request message to RCC, the application may receive a login stream closed message from the tunnel stream with the “System busy” string in the status text, as shown below.
    	
            

<statusMsg domainType="RSSL_DMT_LOGIN" streamId="5" containerType="RSSL_DT_NO_DATA" flags="0xA0 (RSSL_STMF_HAS_STATE|RSSL_STMF_PRIVATE_STREAM)" dataState="RSSL_DATA_SUSPECT" streamState="RSSL_STREAM_CLOSED" code="RSSL_SC_NO_RESOURCES" text="System busy" dataSize="0">
    <dataBody>
    </dataBody>
</statusMsg>

This is not an error. It simply means that the server cannot handle the request at the moment, which is normal for a cloud application. In this case, the connection is dropped and EMA then recovers it. After the recovery, the application must open a tunnel stream and perform a client login again before it can contribute data to RCC. Alternatively, the application can try to connect to another server.

Additional Resources

If you have any further questions, I recommend posting them on our Developer Forum or contacting our Data Helpdesk.

Existing Tutorials mentioned above:

Using The Tutorial Source Code

  • Windows

Consumer_VS141.vcxproj is for Visual Studio 2017 and Consumer_VS142.vcxproj is for Visual Studio 2019. The example has been built and tested with the Real-Time-SDK. Set the EMAInstall environment variable to the directory of Real-Time SDK.

  • Linux

Set the RTSDK_INSTALL_PATH environment variable to the directory of Real-Time SDK, such as /opt/lseg/Real-Time-SDK. Then modify the LIBTYPE, LINKTYPE, and PLATFORM variables in the Makefile:

    	
            

LIBTYPE=Optimized
#If User wants to build with Shared libraries, change LINKTYPE=Shared
LINKTYPE=

#Specify Platform for compilers
PLATFORM=OL7_64_GCC485
...

The resulting Consumer executable does not take any command-line arguments.