Download tutorial source code | Click here to download |
---|---|
Last update | Aug 2024 |
Environment | Windows, Linux |
Compilers | Refer to the EMA Compiler Guides for a complete list |
Prerequisite | Downloaded, installed, compiled, and ran an EMA consumer example |
Knowledge Prerequisite – Must have an understanding of an RTSDK API and be familiar with consuming OMM Market Price data. You should also have a basic understanding of EMA Configuration (see the end of this article for links to relevant tutorials)
Clients can currently contribute data through the contribution system called Contribution Channel. Contribution Channel is provided through a cluster of virtual servers that offer data optimization and conflation capabilities, as well as a much higher rate of content contribution onto Real-Time.
To contribute directly to Contribution Channel, you can develop an OMM Consumer application using one of our APIs, i.e. the Enterprise Transport API (ETA), the Enterprise Message API (EMA), or the WebSocket API. RTSDK now supports both Windows and Linux platforms.
This article focuses on the application flows used to develop an EMA application to connect and contribute data to Contribution Channel (RCC).
As with any OMM Consumer application, the first step is to establish a connection to the Contribution Channel server.
int main(int argc, char* argv[])
{
    try {
        AppClient client;

        // Replace these placeholders with your RCC service name, credentials, and post RIC.
        const EmaString RCC_SERVICE = "<RCC Service>";
        const EmaString RCC_USERNAME = "<RCC Username>";
        const EmaString RCC_PASSWORD = "<RCC Password>";
        const EmaString RCC_POST_RIC = "<Post Item>";

        TunnelStreamClient tunnelClient(RCC_SERVICE, RCC_USERNAME, RCC_PASSWORD, RCC_POST_RIC);
        client.setTunnelStreamClient(tunnelClient);

        // Load the default configuration, connect, and log in.
        OmmConsumer consumer(OmmConsumerConfig().username("rmds"));
        tunnelClient.setOmmConsumer(consumer);

        // Register the AppClient for login and source directory messages.
        UInt64 loginHandle = consumer.registerClient(ReqMsg().domainType(MMT_LOGIN), client);
        UInt64 directoryHandle = consumer.registerClient(ReqMsg().domainType(MMT_DIRECTORY).serviceName(RCC_SERVICE), client);

        // Keep the application running; messages arrive through the callbacks.
        while (true)
            sleep(600000);
    }
    catch (const OmmException& excp) {
        cout << excp << endl;
    }
    return 0;
}
The code loads the default configurations, attempts to establish a connection, and performs a login with the provided username. It gets the RCC service name, RCC username, RCC password, and post RIC from variables so the values of these variables must be updated.
It also creates instances of callback client classes (AppClient and TunnelStreamClient) and registers the AppClient to receive login and source directory-related messages.
The connection settings are in the EMA configuration file (EMAConfig.xml). It uses the connection settings from the Consumer_1 and Channel_1 configurations.
<ConsumerGroup>
<DefaultConsumer value="Consumer_1"/>
<ConsumerList>
<Consumer>
<Name value="Consumer_1"/>
<Channel value="Channel_1"/>
...
...
</Consumer>
</ConsumerList>
</ConsumerGroup>
...
...
<ChannelGroup>
<ChannelList>
<Channel>
<Name value="Channel_1"/>
<ChannelType value="ChannelType::RSSL_ENCRYPTED"/>
<Host value="<RCC_Hostname>"/>
<Port value="443"/>
<EncryptedProtocolType value="EncryptedProtocolType::RSSL_SOCKET"/>
...
...
</Channel>
</ChannelList>
</ChannelGroup>
Channel_1 uses the RSSL_ENCRYPTED connection type and the RSSL_SOCKET encrypted protocol type, which requires the OpenSSL library. The hostname of the RCC server must be specified in the EMAConfig.xml configuration file. Please see chapter 4.3.3: Encrypted Connections in the EMA Developer's Guide for more information.
Note that developers should be coding against the UAT Environment while developing applications.
The servers must be used with Real-Time SDK applications with the RSSL_ENCRYPTED connection type and RSSL_SOCKET encrypted protocol type.
Note: In the future, RCC will not support the RSSL_HTTP encrypted protocol type. Please use the RSSL_SOCKET encrypted protocol type when connecting to RCC.
The application opens a tunnel stream when it receives the login and source directory response. The code is in the TunnelStreamClient::startTunnelStream method.
void TunnelStreamClient::startTunnelStream() {
cout << "Start Tunnel Stream" << endl;
CosAuthentication cosAuthentication;
cosAuthentication.type(CosAuthentication::NotRequiredEnum);
CosDataIntegrity cosDataIntegrity;
cosDataIntegrity.type(CosDataIntegrity::ReliableEnum);
CosFlowControl cosFlowControl;
cosFlowControl.type(CosFlowControl::BidirectionalEnum).recvWindowSize(12288).sendWindowSize(1200);
CosGuarantee cosGuarantee;
cosGuarantee.type(CosGuarantee::NoneEnum);
ClassOfService cos;
cos.authentication(cosAuthentication).dataIntegrity(cosDataIntegrity).flowControl(cosFlowControl).guarantee(cosGuarantee);
TunnelStreamRequest tsr;
tsr.classOfService(cos).domainType(MMT_SYSTEM).name("Tunnel").serviceName(_serviceName);
_tunnelStreamHandle = _pOmmConsumer->registerClient(tsr, *this);
}
A Tunnel Stream is a private stream that supports additional functionality, such as authentication, flow control, and guaranteed delivery, which can be specified on a per-stream basis.
The code creates a Tunnel Stream request by providing an arbitrary name ('Tunnel') and the RCC service name. The domain type of the tunnel stream is MMT_SYSTEM, which is a system domain for content-neutral requests (i.e. non-data domains). Finally, the code registers the TunnelStreamClient as the callback client to receive events related to the Tunnel Stream and stores the Tunnel Stream handle in a variable.
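The condition that triggers the tunnel request (both the login and the source directory responses have arrived) can be sketched as a small gate. This is only an illustration under stated assumptions: TunnelGate and its method names are hypothetical and are not part of EMA or of the tutorial source, and the callback stands in for a call such as TunnelStreamClient::startTunnelStream.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper (not part of EMA): invokes a callback exactly once,
// after both the login and the source directory responses have been seen.
class TunnelGate {
public:
    explicit TunnelGate(std::function<void()> openTunnel)
        : _openTunnel(std::move(openTunnel)) {
        assert(_openTunnel && "a callback that opens the tunnel is required");
    }

    // Call from the login refresh handler when the login is accepted.
    void onLoginOk() { _loginOk = true; tryOpen(); }

    // Call from the directory refresh handler when the RCC service is seen.
    void onDirectoryOk() { _directoryOk = true; tryOpen(); }

    // Call when the connection is lost, so the tunnel can be opened again later.
    void reset() { _loginOk = _directoryOk = _opened = false; }

    bool opened() const { return _opened; }

private:
    void tryOpen() {
        if (_loginOk && _directoryOk && !_opened) {
            _opened = true;
            _openTunnel();
        }
    }

    std::function<void()> _openTunnel;
    bool _loginOk = false;
    bool _directoryOk = false;
    bool _opened = false;
};
```

The order in which the two responses arrive does not matter to the gate, and repeated refreshes do not open a second tunnel.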
When the Tunnel Stream has been established, the application will get a status message with a domain type of MMT_SYSTEM and stream state of Open, as shown below.
AppClient::OnStatusMsg :: StatusMsg
streamId="5"
domain="System Domain"
PrivateStream
ClearCache
state="Open / Ok / None / ''"
name="Seikan"
serviceId="10"
serviceName="DDS_TRCE"
StatusMsgEnd
void TunnelStreamClient::onStatusMsg(const StatusMsg& statusMsg, const OmmConsumerEvent& event)
{
cout << "TunnelStreamClient onStatusMsg" << endl;
cout << statusMsg << endl;
if (event.getHandle() == _tunnelStreamHandle &&
statusMsg.hasState() &&
statusMsg.getState().getStreamState() == OmmState::OpenEnum)
{
cout << "************* Login to RCC ********************" << statusMsg.getStreamId() << endl;
loginTunnelStream(statusMsg.getStreamId());
}
}
void TunnelStreamClient::loginTunnelStream(refinitiv::ema::access::Int32 streamId) {
ElementList elmList;
elmList.addAscii("Password", _rccPassword).complete();
ReqMsg reqMsg;
reqMsg.domainType(MMT_LOGIN)
.name(_rccUserName)
.attrib(elmList)
.streamId(streamId);
cout << endl << "Submit GenericMsg from onStatus" << endl;
_subStreamHandle = _pOmmConsumer->registerClient(reqMsg, *this, (void*)1, _tunnelStreamHandle);
}
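The code gets the status message of the tunnel stream via the TunnelStreamClient::onStatusMsg method. It compares the handle in the event to the handle of the tunnel stream. If the handles are equal and the stream state is open, the application calls the TunnelStreamClient::loginTunnelStream method to send the tunnel login request message. As built in the code above, the tunnel login request message has the following attributes: the domain type is MMT_LOGIN, the name is the RCC username, the attrib is an ElementList containing the Password element set to the RCC password, and the stream ID is the stream ID of the tunnel stream.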
It also registers this TunnelStreamClient instance as the callback client to receive events related to the tunnel login and stores a stream handle in a variable.
If the credentials are valid, the application will receive a login refresh message from RCC through the tunnel stream with a login state of Open/OK. The login refresh message includes a maximum message rate, which the application must use to limit the number of messages it sends. This message looks like:
<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="3" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="1" dataSize="127">
<extendedHeader data="0100"/>
<dataBody>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<refreshMsg domainType="RSSL_DMT_LOGIN" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x648 (RSSL_RFMF_HAS_MSG_KEY|RSSL_RFMF_REFRESH_COMPLETE|RSSL_RFMF_DO_NOT_CACHE|RSSL_RFMF_PRIVATE_STREAM)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text="Login accepted by host HOSTNAME via HANDLERNAME" dataSize="0">
<key flags="0x23 (RSSL_MKF_HAS_SERVICE_ID|RSSL_MKF_HAS_NAME|RSSL_MKF_HAS_ATTRIB)" serviceId="10" name="USERNAME" attribContainerType="RSSL_DT_ELEMENT_LIST">
<attrib>
<elementList flags="0x8 (RSSL_ELF_HAS_STANDARD_DATA)">
<elementEntry name="TRCE:MaxMessagesPerSecond" dataType="RSSL_DT_UINT" data="300"/>
</elementList>
</attrib>
</key>
<dataBody>
</dataBody>
</refreshMsg>
</dataBody>
</genericMsg>
The TRCE:MaxMessagesPerSecond element is present in all Contributions Channel environments and the message rate posted by the application must not exceed this number.
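One way an application might respect this limit is to space its posts so that no two are closer together than 1/N seconds, where N is the TRCE:MaxMessagesPerSecond value read from the login refresh. The sketch below is a minimal, standalone illustration: PostPacer is a hypothetical class, not part of EMA, and the caller supplies the current time so the logic stays easy to test.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical helper (not part of EMA): allows a post only if at least
// 1/maxPerSecond seconds have passed since the previously allowed post.
class PostPacer {
public:
    using Clock = std::chrono::steady_clock;

    explicit PostPacer(std::uint64_t maxPerSecond) {
        assert(maxPerSecond > 0);
        const std::uint64_t nanosPerSecond = 1000000000ULL;
        // Round the interval up so the pacing never exceeds the limit.
        const std::uint64_t nanos = (nanosPerSecond + maxPerSecond - 1) / maxPerSecond;
        _interval = std::chrono::duration_cast<Clock::duration>(
            std::chrono::nanoseconds(static_cast<std::int64_t>(nanos)));
    }

    // Returns true and reserves the slot if a post may be sent at `now`.
    bool tryAcquire(Clock::time_point now) {
        if (_hasSent && now < _nextAllowed)
            return false;
        _nextAllowed = now + _interval;
        _hasSent = true;
        return true;
    }

private:
    Clock::duration _interval{};
    Clock::time_point _nextAllowed{};
    bool _hasSent = false;
};
```

An application could construct the pacer once the login refresh arrives and call tryAcquire(PostPacer::Clock::now()) before each submit, deferring the post when it returns false. Even spacing is a conservative choice; whether short bursts are tolerated by the server is not stated here, so this sketch avoids them.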
Below is an example of a valid login refresh that takes the form of a RefreshMsg:
AppClient::OnRefreshMsg :: RefreshMsg
streamId="5"
domain="Login Domain"
RefreshComplete
PrivateStream
DoNotCache
state="Open / Ok / None / 'Login accepted by host 452655c7cb54 via ip-10-28-149-90.eu-west-1.compute.internal'"
itemGroup="00 00"
name="RCC Username"
Attrib dataType="ElementList"
ElementList
ElementEntry name="TRCE:MaxMessagesPerSecond" dataType="UInt" value="10000"
ElementListEnd
AttribEnd
RefreshMsgEnd
When the application receives the login refresh and the state of the login refresh is Open/OK, the application starts contributing data by sending post messages through the tunnel stream. The code that sends a post message is in the TunnelStreamClient::postData method.
void TunnelStreamClient::onRefreshMsg(const RefreshMsg& refreshMsg, const OmmConsumerEvent& event)
{
cout << "TunnelStreamClient onRefreshMsg" << endl;
cout << refreshMsg << endl;
if (refreshMsg.getDomainType() == MMT_LOGIN &&
event.getHandle() == _subStreamHandle &&
refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
refreshMsg.getState().getDataState() == OmmState::OkEnum)
{
_postStreamID = refreshMsg.getStreamId();
postData();
}
}
void TunnelStreamClient::postData() {
cout << "Send First PostMsg()....." << endl;
_pOmmConsumer->submit(PostMsg()
.streamId(_postStreamID)
.postId(_postID).domainType(MMT_MARKET_PRICE)
.solicitAck(true).complete()
.payload(UpdateMsg().streamId(_postStreamID)
.name(_rccPostRIC)
.payload(FieldList()
.addReal(22, _BID, OmmReal::ExponentNeg2Enum)
.addReal(25, _ASK, OmmReal::ExponentNeg2Enum)
.complete()))
.complete(),
_subStreamHandle);
_postID++;
_BID++;
_ASK++;
}
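The code creates a post message (PostMsg) with the following properties and values: the stream ID is the stream ID of the tunnel login stream, the post ID is the current value of _postID, the domain type is MMT_MARKET_PRICE, the solicited acknowledgment flag is set to true, and the payload is an UpdateMsg that names the post RIC and carries a FieldList with the BID (field 22) and ASK (field 25) values.
Then, it submits the PostMsg using the handle of the tunnel login stream. Finally, it increments the Post ID, BID value, and ASK value.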
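In the post above, the BID and ASK values are passed to addReal as integer mantissas together with OmmReal::ExponentNeg2Enum, so the value carried on the wire is the mantissa multiplied by 10^-2 (a mantissa of 340 represents 3.40). A minimal sketch of that conversion follows; toMantissa is a hypothetical helper, not part of EMA, and it assumes the price fits in a double without meaningful loss of precision.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Hypothetical helper (not part of EMA): converts a decimal price into the
// integer mantissa that pairs with an exponent of 10^-decimals.
std::int64_t toMantissa(double price, int decimals) {
    assert(decimals >= 0 && decimals <= 14);
    // Round to the nearest integer to absorb binary floating-point error.
    return static_cast<std::int64_t>(std::llround(price * std::pow(10.0, decimals)));
}
```

With two decimal places, toMantissa(3.40, 2) yields 340, which is the kind of value the tutorial passes alongside ExponentNeg2Enum.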
Note:
1. The unique Post ID can be retrieved from the Ack response, so the application can match each Ack to its Post and confirm that every Post has been acknowledged by the server.
2. RCC currently only allows applications to update existing records on the server via an API. New records can be defined manually by an RCC administrator.
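The first note can be sketched as a small bookkeeping structure that remembers each Post ID until its Ack arrives. This is an illustrative sketch only: PendingPosts is a hypothetical class, not part of EMA or the tutorial source, and it assumes the ack ID reported by the server equals the Post ID that was sent.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical helper (not part of EMA): tracks posts awaiting an Ack.
class PendingPosts {
public:
    // Record a post that was submitted with the solicited Ack flag set.
    void onPostSent(std::uint32_t postId, const std::string& ric) {
        _pending[postId] = ric;
    }

    // Match an Ack to its post. Returns false if the ack ID is unknown
    // (for example, a duplicate Ack); otherwise removes the entry and
    // optionally reports the RIC that was posted.
    bool onAck(std::uint32_t ackId, std::string* ricOut = nullptr) {
        auto it = _pending.find(ackId);
        if (it == _pending.end())
            return false;
        if (ricOut)
            *ricOut = it->second;
        _pending.erase(it);
        return true;
    }

    // Number of posts that have not been acknowledged yet.
    std::size_t outstanding() const { return _pending.size(); }

private:
    std::unordered_map<std::uint32_t, std::string> _pending;
};
```

An application could call onPostSent just before submitting each PostMsg and onAck from its Ack callback; a non-zero outstanding() count after a reasonable wait would point to posts the server never confirmed.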
Because the code sets the solicited acknowledgment flag in the post message, the application will receive an acknowledgment message (AckMsg) back from the server shortly after the Post is submitted. The acknowledgment message will be received through the TunnelStreamClient::onAckMsg callback method. Below is a sample of the acknowledgment message. The value of Ack Id is 1 which matches the Post ID used in the first PostMsg:
AppClient::onAckMsg :: AckMsg
streamId="5"
domain="MarketPrice Domain"
ackId="1"
AckMsgEnd
void TunnelStreamClient::onAckMsg(const AckMsg& ackMsg, const OmmConsumerEvent& event)
{
cout << "TunnelStreamClient onAckMsg" << endl;
cout << ackMsg << endl;
if (ackMsg.hasNackCode() == false)
{
cout << "Posting Update....." << endl;
postData();
}
}
In the code above, when the application receives an acknowledgment message, it checks whether the message carries a NACK code. If it does not, the application sends another post message.
Note that it is possible to receive a NACK response if the Post message is not accepted by the RCC server. For example, the application posts a message to an invalid RIC or updates fields that are not defined in the target RIC.
Below is an example of a NACK response when posting to an invalid RIC:
AppClient::onAckMsg :: AckMsg
streamId="5"
domain="MarketPrice Domain"
ackId="1"
nackCode="SymbolUnknown"
text="Symbol unknown"
AckMsgEnd
Contribution Channel (RCC) is the Real-Time contribution system that allows users to contribute data directly to the Real-Time network. Users can use the Real-Time SDKs or the WebSocket API to develop applications that connect and contribute real-time data to RCC. This tutorial focuses on how to use EMA C++ to establish a connection to RCC, open a tunnel stream, log in to RCC, and send post messages to contribute data.
A few points are worth mentioning here:
<statusMsg domainType="RSSL_DMT_LOGIN" streamId="5" containerType="RSSL_DT_NO_DATA" flags="0xA0 (RSSL_STMF_HAS_STATE|RSSL_STMF_PRIVATE_STREAM)" dataState="RSSL_DATA_SUSPECT" streamState="RSSL_STREAM_CLOSED" code="RSSL_SC_NO_RESOURCES" text="System busy" dataSize="0">
<dataBody>
</dataBody>
</statusMsg>
This is not an error. It simply means that the server can't handle the request at the moment, which is normal for a cloud application. In this case, the connection will be dropped and EMA will then recover it. After the recovery, the application needs to open a tunnel and perform a client login again before it can contribute data to RCC. Alternatively, the application can try to connect to other servers.
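The recovery behaviour described above can be pictured as a small state machine in which a closed tunnel login or a lost channel sends the application back to the start, so the tunnel and the tunnel login are repeated after EMA reconnects. The sketch below is only an illustration: the RccState and RccEvent names and the nextState function are hypothetical and do not come from EMA or the tutorial source.

```cpp
#include <cassert>

// Hypothetical session states for an RCC contributor (not part of EMA).
enum class RccState { Disconnected, ChannelUp, TunnelOpen, LoggedIn };

// Hypothetical events derived from the callbacks the application receives.
enum class RccEvent {
    ChannelReady,         // login and source directory responses received
    TunnelOpened,         // tunnel stream status is Open
    TunnelLoginAccepted,  // login refresh on the tunnel is Open/OK
    TunnelLoginClosed,    // e.g. the "System busy" status shown above
    ChannelDown           // connection lost
};

// Returns the state that follows `s` when event `e` occurs.
// Events that arrive out of order leave the state unchanged.
RccState nextState(RccState s, RccEvent e) {
    switch (e) {
    case RccEvent::ChannelReady:
        return s == RccState::Disconnected ? RccState::ChannelUp : s;
    case RccEvent::TunnelOpened:
        return s == RccState::ChannelUp ? RccState::TunnelOpen : s;
    case RccEvent::TunnelLoginAccepted:
        return s == RccState::TunnelOpen ? RccState::LoggedIn : s;
    case RccEvent::TunnelLoginClosed:
    case RccEvent::ChannelDown:
        // Start over: the tunnel and the tunnel login must be redone.
        return RccState::Disconnected;
    }
    return s;
}

// Posting is only meaningful once the tunnel login has been accepted.
inline bool canPost(RccState s) { return s == RccState::LoggedIn; }
```

Guarding each post with canPost keeps the application from submitting data while the session is being re-established.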
If you have any further questions, I recommend you post them on our Developer Forum or contact our Data Helpdesk.
Existing Tutorials mentioned above:
On Windows, Consumer_VS141.vcxproj is for Visual Studio 2017 and Consumer_VS142.vcxproj is for Visual Studio 2019. The example has been built and tested with the Real-Time-SDK. Set the EMAInstall environment variable to the directory of the Real-Time SDK.
On Linux, set the RTSDK_INSTALL_PATH environment variable to the directory of the Real-Time SDK, such as /opt/lseg/Real-Time-SDK. Then modify the LIBTYPE, LINKTYPE, and PLATFORM variables in the Makefile.
LIBTYPE=Optimized
#If User wants to build with Shared libraries, change LINKTYPE=Shared
LINKTYPE=
#Specify Platform for compilers
PLATFORM=OL7_64_GCC485
...
The resulting Consumer executable does not take any command-line arguments.