ETA Consumer - Posting data to Contribution Channel

Download tutorial source code

Click here to download

Last update: Feb 2023
Environment: Windows, Linux
Compilers: Refer to the ETA Compiler Guides for a complete list.
Prerequisites: Downloaded, installed, compiled, and ran an ETA consumer example

Posting Directly to Contribution Channel (RCC)

Knowledge Prerequisite: You must have an understanding of the Enterprise Transport API (ETA) and be familiar with consuming OMM Market Price data. You should also have a basic understanding of the Reactor concept and the associated topic of Tunnel Streams.

Contributing Data to an LSEG Real-Time Distribution System (RTDS).

Currently, clients wishing to contribute data to LSEG typically do so using on-site contribution systems such as MarketLinkIP.

In order to meet the evolving needs of the market, LSEG has developed a new Contribution service, the Contribution Channel (RCC). The Contribution Channel is provided through clustered virtual servers that offer data optimization and conflation capabilities as well as a much higher rate of content onto RTDS.

Whilst it will be possible in the near future to contribute to RCC via the RTDS, some of our clients use 3rd party systems and will, therefore, need to contribute directly to RCC.

To contribute directly to RCC you will need to develop an OMM Consumer application using one of our Real-Time SDKs (RTSDK) - i.e. ETA or the Enterprise Message API (EMA).

The C/C++ editions of ETA and EMA offer this functionality on both Windows and Linux platforms (Windows has been supported since SDK version 1.2.0).

This article focuses on the process you will need to implement using ETA and is summarized as follows:

  1. Establish Encrypted Connection
  2. Perform a Device Login
  3. Open a Tunnel Stream
  4. Perform a Client Login
  5. Contribute data using Post Messages
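The five steps are strictly ordered: each one can only start once the previous one has completed, and a lost connection sends the application back to the first step. A minimal sketch of that ordering as a small state machine follows. It is independent of the ETA API, and the names `ContribStep`, `nextStep` and `stepName` are hypothetical, introduced only for illustration.

```c
#include <assert.h>

/* Hypothetical names: these steps mirror the list above, not ETA types. */
typedef enum {
    STEP_CONNECT = 0,     /* 1. Establish Encrypted Connection */
    STEP_DEVICE_LOGIN,    /* 2. Perform a Device Login */
    STEP_OPEN_TUNNEL,     /* 3. Open a Tunnel Stream */
    STEP_CLIENT_LOGIN,    /* 4. Perform a Client Login */
    STEP_POST             /* 5. Contribute data using Post Messages */
} ContribStep;

/* Called when the current step has completed. If the connection was lost,
 * start again from the first step, because every later step depends on
 * the connection. Otherwise move on to the next step. */
ContribStep nextStep(ContribStep current, int connectionLost)
{
    if (connectionLost)
        return STEP_CONNECT;
    if (current == STEP_POST)
        return STEP_POST;  /* keep posting while everything stays up */
    return (ContribStep)((int)current + 1);
}

/* Human-readable name of a step, useful for logging. */
const char *stepName(ContribStep step)
{
    static const char *names[] = {
        "connect", "device login", "open tunnel", "client login", "post"
    };
    assert((int)step >= 0 && (int)step <= (int)STEP_POST);
    return names[(int)step];
}
```

In the tutorial application this ordering is implicit in the callbacks and flags described in the following sections rather than held in one explicit state variable.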

For this tutorial, a new ETA example application has been created, using the existing VAConsumer example from the ETA package as the starting point for the code that contributes data to RCC.

The VAConsumer example delivered with the ETA package is feature-rich, with auxiliary code covering many areas such as Market by Price, Yield Curve, Queue Management, Cache Management and Tunnel Streams.

The VAConsumerContribution example illustrated here has been simplified to depict the basic workflow required between ETA and RCC.

We recommend downloading the source files so that you can follow the tutorial alongside the code.

Establish Encrypted Connection

In this example, the hostname and port are read from the command-line arguments and stored in the ChannelCommand structure, which is defined in the rsslChannelCommand.h header file.

    	
            

typedef struct

{

    RsslReactorChannel *reactorChannel;

    RsslReactorConnectOptions cOpts;

    RsslReactorConnectInfo cInfo;

    RsslReactorChannelRole *pRole;

    ...

 

    char hostName[MAX_BUFFER_LENGTH];

    char port[MAX_BUFFER_LENGTH];

} ChannelCommand;
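As an illustration of how a `<hostname>:<port>` argument could be split into those two fixed-size string fields, here is a minimal sketch. It is not the tutorial's own parsing code: `HostPort` is a cut-down stand-in for ChannelCommand, `parseHostPort` is a hypothetical helper, and the value given to `MAX_BUFFER_LENGTH` is an assumption.

```c
#include <assert.h>
#include <string.h>

#define MAX_BUFFER_LENGTH 128  /* assumed value; the tutorial header defines its own */

/* Cut-down stand-in for ChannelCommand holding only the two string fields. */
typedef struct {
    char hostName[MAX_BUFFER_LENGTH];
    char port[MAX_BUFFER_LENGTH];
} HostPort;

/* Split "host:port" at the last ':'. Returns 0 on success, -1 on bad input. */
int parseHostPort(const char *arg, HostPort *out)
{
    const char *colon;
    size_t hostLen;

    assert(arg != NULL && out != NULL);
    colon = strrchr(arg, ':');
    if (colon == NULL || colon == arg || colon[1] == '\0')
        return -1;                       /* missing host or missing port */

    hostLen = (size_t)(colon - arg);
    if (hostLen >= sizeof(out->hostName) || strlen(colon + 1) >= sizeof(out->port))
        return -1;                       /* would overflow the fixed buffers */

    memcpy(out->hostName, arg, hostLen);
    out->hostName[hostLen] = '\0';
    strcpy(out->port, colon + 1);        /* length was checked above */
    return 0;
}
```

Rejecting over-long input before copying matters here because both fields are fixed-size character arrays.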

An RsslReactorOMMConsumerRole structure will have been populated with the addresses of callback functions to handle various channel events. For example, the application assigns a function named channelEventCallback to handle RsslReactorChannelEvents, and a loginMsgCallback function to process RsslRDMLoginMsgs received.

    	
            

static RsslReactor *pReactor = NULL;

...

 

/*** MAIN ***/

int main(int argc, char **argv)

{

    RsslRet ret;

    RsslCreateReactorOptions reactorOpts;

    RsslErrorInfo rsslErrorInfo;

    int i;

 

    RsslReactorOMMConsumerRole consumerRole;

    RsslRDMLoginRequest loginRequest;

    RsslReactorDispatchOptions dispatchOpts;

    ...

 

    /* Setup callback functions and use them on all connections*/

    rsslClearOMMConsumerRole(&consumerRole);

    consumerRole.loginMsgCallback = loginMsgCallback;

    consumerRole.base.channelEventCallback = channelEventCallback;

    consumerRole.base.defaultMsgCallback = defaultMsgCallback;

As the snippet above shows, the application creates the RsslReactorOMMConsumerRole and registers the callback functions. The role is stored in the ChannelCommand structure mentioned earlier.

Set the encryption protocol flags to RSSL_ENC_TLSV1_2 and the connection type to RSSL_CONN_TYPE_ENCRYPTED in RsslReactorConnectOptions. This establishes a secure TLS (Transport Layer Security) encrypted connection using port 443.

    	
            

..
ChannelCommand *pCommand = &chanCommands[i];

RsslReactorConnectOptions *pOpts = &pCommand->cOpts;

RsslReactorConnectInfo *pInfo = &pCommand->cInfo;

 

pCommand->pRole = (RsslReactorChannelRole*)&consumerRole;

pInfo->rsslConnectOptions.guaranteedOutputBuffers = 500;

pInfo->rsslConnectOptions.majorVersion = RSSL_RWF_MAJOR_VERSION;

pInfo->rsslConnectOptions.minorVersion = RSSL_RWF_MINOR_VERSION;

pInfo->rsslConnectOptions.userSpecPtr = &chanCommands[i];

pInfo->initializationTimeout = 30;

pOpts->reactorConnectionList = pInfo;

pOpts->connectionCount = 1;

pOpts->reconnectAttemptLimit = -1;

pOpts->reconnectMaxDelay = 5000;

pOpts->reconnectMinDelay = 1000;

 

pOpts->rsslConnectOptions.userSpecPtr = pCommand;

pOpts->rsslConnectOptions.connectionInfo.unified.address     = pCommand->hostName;

pOpts->rsslConnectOptions.connectionInfo.unified.serviceName = pCommand->port;

pInfo->rsslConnectOptions.connectionType = RSSL_CONN_TYPE_ENCRYPTED;

pInfo->rsslConnectOptions.encryptionOpts.encryptedProtocol = RSSL_CONN_TYPE_SOCKET;

Note that developers should be coding against the UAT Environment while developing applications.

These servers must be used with LSEG Real-Time SDK applications with the RSSL_CONN_TYPE_ENCRYPTED connection type and RSSL_CONN_TYPE_SOCKET encrypted protocol type. 

Note: In the future, RCC will not support the RSSL_CONN_TYPE_HTTP encrypted protocol type (rsslConnectOptions.encryptionOpts.encryptedProtocol). Please use the RSSL_CONN_TYPE_SOCKET encrypted protocol type when connecting to RCC.

Then, the application calls rsslReactorConnect to create a new outbound connection. rsslReactorConnect will create an OMM consumer-type connection using the provided configuration and role information.

    	
            

/* Create an RsslReactor which will manage our channels. */

rsslClearCreateReactorOptions(&reactorOpts);

if (!(pReactor = rsslCreateReactor(&reactorOpts, &rsslErrorInfo)))

{

    printf("Error: %s", rsslErrorInfo.rsslError.text);

    exit(-1);

}

 

...

 

/* Establish connection with Reactor and attributes from ChannelCommand structure */

if (rsslReactorConnect(pReactor, &pCommand->cOpts, pCommand->pRole, &rsslErrorInfo) != RSSL_RET_SUCCESS)

{

    printf("Error rsslReactorConnect(): %s\n", rsslErrorInfo.rsslError.text);

}

After establishing the underlying connection, a channel event is returned to the application’s RsslReactorChannelEventCallback; this provides the RsslReactorChannel and the state of the current connection.

The RSSL_RC_CET_CHANNEL_READY event is delivered to the channelEventCallback function because that function was registered earlier through consumerRole.base.channelEventCallback.

Perform a Device Login

The RsslReactorOMMConsumerRole structure will have been populated with the addresses of callback functions to handle various channel events.

When an RSSL_RC_CET_CHANNEL_READY event is received by the channelEventCallback function, a communication stream can be opened. The application then authenticates to the connecting server by sending a Login request message.

    	
            

RsslReactorCallbackRet channelEventCallback(RsslReactor *pReactor, RsslReactorChannel *pReactorChannel, RsslReactorChannelEvent *pConnEvent)
{
    ...

    case RSSL_RC_CET_CHANNEL_READY:


    {
        pCommand->reactorChannelReady = RSSL_TRUE;


        return RSSL_RC_CRET_SUCCESS;


    }


    ...

 

    return RSSL_RC_CRET_SUCCESS;

}

The example code sets a flag to record that the connection between ETA and the server has been established. The main loop checks this flag to decide whether it can go on to send a login request.

    	
            

/* Main loop. The loop

 * calls select() to wait for notification, then calls rsslReactorDispatch(). */

do

{

    ...

    if (chanCommands[i].reactorChannelReady != RSSL_TRUE)

    {

        continue;

    }

 

 

    ...

 

    if (chanCommands[i].canSendLoginReissue == RSSL_TRUE && currentTime >= (RsslInt)(chanCommands[i].loginReissueTime))

    {

        RsslReactorSubmitMsgOptions submitMsgOpts;

        RsslErrorInfo rsslErrorInfo;

 

        rsslClearReactorSubmitMsgOptions(&submitMsgOpts);

        submitMsgOpts.pRDMMsg = (RsslRDMMsg*)chanCommands[i].pRole->ommConsumerRole.pLoginRequest;

        if ((ret = rsslReactorSubmitMsg(pReactor, chanCommands[i].reactorChannel, &submitMsgOpts, &rsslErrorInfo)) != RSSL_RET_SUCCESS)

        {

            printf("Login reissue failed: %d(%s)\n", ret, rsslErrorInfo.rsslError.text);

        }

        else

        {

            printf("Login reissue sent\n");

        }

        chanCommands[i].canSendLoginReissue = RSSL_FALSE;

    }

}

If the flag is not set, the application skips the part of the loop that sends the login, that is, the rsslReactorSubmitMsg call in the snippet above. The login request message itself is created at the beginning of the main program, as shown in the snippet below:

    	
            

RsslRDMLoginRequest loginRequest;

...

 

/* Initialize the default login request(Use 1 as the Login Stream ID). */

if (rsslInitDefaultRDMLoginRequest(&loginRequest, 1) != RSSL_RET_SUCCESS)

{

    printf("rsslInitDefaultRDMLoginRequest() failed\n");

    cleanUpAndExit(-1);

}

 

/* If a username was specified, change username on login request. */

if (userName.length)

    loginRequest.userName = userName;

 

/* If an authentication Token was specified, set it on the login request and set the user name

   type to RDM_LOGIN_USER_AUTHN_TOKEN */

if (authnToken.length)

{

    loginRequest.flags |= RDM_LG_RQF_HAS_USERNAME_TYPE;

    loginRequest.userName = authnToken;

    loginRequest.userNameType = RDM_LOGIN_USER_AUTHN_TOKEN;

    if (authnExtended.length)

    {

        loginRequest.flags |= RDM_LG_RQF_HAS_AUTHN_EXTENDED;

        loginRequest.authenticationExtended = authnExtended;

    }

}

 

if (appId.length)

{

    loginRequest.flags |= RDM_LG_RQF_HAS_APPLICATION_ID;

    loginRequest.applicationId = appId;

}

 

...

/* Set the messages to send when the channel is up */

consumerRole.pLoginRequest = &loginRequest;

Open a Tunnel Stream

Initialize an RsslTunnelStreamOpenOptions structure (refer to tunnelStreamHandler.c) and then call rsslReactorOpenTunnelStream. It is important that the stream ID is set to 1.

The RsslTunnelStreamOpenOptions structure is also populated with the addresses of the callback functions that handle the various tunnel stream events.

A market price request should be sent on the tunnel stream once the status event callback function, tunnelStreamStatusEventCallback, receives an RSSL_STREAM_OPEN stream state.

    	
            

RsslRet ret;

RsslTunnelStreamOpenOptions tunnelStreamOpenOptions;

RsslErrorInfo errorInfo;

 

...

 

rsslClearTunnelStreamOpenOptions(&tunnelStreamOpenOptions);

tunnelStreamOpenOptions.name = pTunnelHandler->consumerName;

tunnelStreamOpenOptions.streamId = TUNNEL_STREAM_STREAM_ID;

tunnelStreamOpenOptions.domainType = pTunnelHandler->domainType;

tunnelStreamOpenOptions.serviceId = pTunnelHandler->serviceId;

tunnelStreamOpenOptions.statusEventCallback = tunnelStreamStatusEventCallback;

tunnelStreamOpenOptions.defaultMsgCallback = pTunnelHandler->defaultMsgCallback;

tunnelStreamOpenOptions.classOfService.dataIntegrity.type = RDM_COS_DI_RELIABLE;

tunnelStreamOpenOptions.classOfService.flowControl.type = RDM_COS_FC_BIDIRECTIONAL;

tunnelStreamOpenOptions.userSpecPtr = pTunnelHandler;

 

...

 

if ((ret = rsslReactorOpenTunnelStream(pReactorChannel, &tunnelStreamOpenOptions, &errorInfo)) != RSSL_RET_SUCCESS)

{

    printf("rsslReactorOpenTunnelStream failed: %s(%s)\n", rsslRetCodeToString(ret), errorInfo.rsslError.text);

    return;

}

Perform a Client Login

When the tunnel stream is established, the registered status event callback is invoked. This is the place to send the Client Login request, carrying the client credentials, using rsslTunnelStreamSubmit.

    	
            

/* Callback for tunnel stream status events. */

/* Process a tunnel stream status event for the TunnelStreamHandler. */

RsslReactorCallbackRet tunnelStreamStatusEventCallback(RsslTunnelStream *pTunnelStream, RsslTunnelStreamStatusEvent *pEvent)

{

    RsslState *pState = pEvent->pState;

    ...

 

    switch (pState->streamState)

    {

        case RSSL_STREAM_OPEN:

        {

            if (pState->dataState == RSSL_DATA_OK && pTunnelHandler->pTunnelStream == NULL)

            {

                if (pEvent->pAuthInfo)

                {

                    printf(" Client was authenticated.\n");

                }

                pTunnelHandler->pTunnelStream = pTunnelStream;

                pTunnelHandler->processTunnelStreamOpened(pTunnelStream);

 

                sendTunnelStreamAuthenticataion(pTunnelHandler);

            }

            break;

        }

        case RSSL_STREAM_CLOSED:

        ...

The Login request message is created and sent in the sendTunnelStreamAuthenticataion function. This function encodes the RCC Client credentials, which consist of an RCC username and a password, both of which will be supplied by your Market Data team or LSEG Account team. The password can be stored locally in encrypted form, in a configuration file or the Windows registry, and must then be decrypted before it is used in the login message. The credentials are protected in transit by the TLS encrypted connection.

    	
            

/* After a tunnel stream channel is established, send a request message with trceUsername and trcePassword to the server */

void sendTunnelStreamAuthenticataion(TunnelStreamHandler *pTunnelHandler)

{

    ...

    msg.msgBase.msgKey.flags = msgKeyFlags;

    msg.msgBase.msgKey.name = pTunnelHandler->trceUsername;

    msg.msgBase.msgKey.attribContainerType = RSSL_DT_ELEMENT_LIST;

    

    ...

 

    RsslElementEntry ee = RSSL_INIT_ELEMENT_ENTRY;

    ee.dataType = RSSL_DT_ASCII_STRING;

    ee.name.length = sizeof("Password") - 1;

    ee.name.data = "Password";

    RsslRet rsslRet = rsslEncodeElementEntry(&encodeIter, &ee, &(pTunnelHandler->trcePassword));

    rsslRet = rsslEncodeElementListComplete(&encodeIter, RSSL_TRUE);

 

    ...

 

}

Contribute data using Post Messages

A post message can be created and sent when an RSSL_MC_REFRESH message is received by the TunnelStreamDefaultMsgCallback function. The address of this function will have been provided to the RsslTunnelStreamOpenOptions structure as above.

Before that, the application verifies the client's login response to see whether the client login is successful or not. The client's login response will include a maximum message rate in the refresh response and this message rate must be obeyed. This message looks like:

    	
            

<!-- End Message (Channel IPC descriptor = 8) -->

<!-- Incoming Message (Channel IPC descriptor = 8) -->

<!-- Time: 18:04:41:526 -->

<!-- rwfMajorVer="14" rwfMinorVer="1" -->

<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="1" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="1" dataSize="177">

    <extendedHeader data="01"/>

    <dataBody>

        <!-- rwfMajorVer="14" rwfMinorVer="1" -->

        <refreshMsg domainType="RSSL_DMT_LOGIN" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x668 (RSSL_RFMF_HAS_MSG_KEY|RSSL_RFMF_SOLICITED|RSSL_RFMF_REFRESH_COMPLETE|RSSL_RFMF_DO_NOT_CACHE|RSSL_RFMF_PRIVATE_STREAM)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text="Login accepted by host c454cybdbcat5" dataSize="0">

            <key flags="0x22 (RSSL_MKF_HAS_NAME|RSSL_MKF_HAS_ATTRIB)" name="<RCC Username>" attribContainerType="RSSL_DT_ELEMENT_LIST">

                <attrib>

                    <elementList flags="0x8 (RSSL_ELF_HAS_STANDARD_DATA)">

                        <elementEntry name="TRCE:MaxMessagesPerSecond" dataType="RSSL_DT_UINT" data="300"/>

                    </elementList>

                </attrib>

            </key>

            <dataBody> </dataBody>

        </refreshMsg>

    </dataBody>

</genericMsg>

The TRCE:MaxMessagesPerSecond element is present in all Contribution Channel environments, and the rate at which the connecting application posts messages must not exceed this number.
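One simple way to respect such a limit is to count posts per one-second window and hold back any post that would exceed it. The sketch below assumes the limit has already been decoded from the login refresh. `PostRateLimiter` and its functions are hypothetical helpers, not part of ETA or of the tutorial source.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical helper: a fixed one-second window counter. */
typedef struct {
    unsigned maxPerSecond;  /* value of TRCE:MaxMessagesPerSecond */
    time_t   windowStart;   /* second in which the current window began */
    unsigned sentInWindow;  /* posts already counted in that second */
} PostRateLimiter;

void rateLimiterInit(PostRateLimiter *rl, unsigned maxPerSecond)
{
    assert(rl != NULL);
    rl->maxPerSecond = maxPerSecond;
    rl->windowStart = 0;
    rl->sentInWindow = 0;
}

/* Returns 1 and counts the post if it may be sent at time `now`.
 * Returns 0 if the limit for this second is used up, in which case
 * the caller should hold the post back and try again later. */
int rateLimiterTryAcquire(PostRateLimiter *rl, time_t now)
{
    assert(rl != NULL);
    if (now != rl->windowStart) {   /* a new second has started */
        rl->windowStart = now;
        rl->sentInWindow = 0;
    }
    if (rl->sentInWindow >= rl->maxPerSecond)
        return 0;
    rl->sentInWindow++;
    return 1;
}
```

An application could call `rateLimiterTryAcquire(&rl, time(NULL))` before each post. Note that a fixed window can let through a burst of up to twice the limit across a second boundary, so a sliding window or token bucket may be a better fit when the limit has to be met strictly.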

The response above also shows that the Client Login was accepted and the stream is Open. The application detects this in the callback registered as tunnelStreamOpenOptions.defaultMsgCallback. This application uses simpleTunnelMsgHandlerConsumerMsgCallback (refer to simpleTunnelMsgHandler.c), which checks whether the incoming message is a refresh message.

    	
            

RsslReactorCallbackRet simpleTunnelMsgHandlerConsumerMsgCallback(RsslTunnelStream *pTunnelStream, RsslTunnelStreamMsgEvent *pEvent)

{

    switch (pEvent->containerType)

    {

        case RSSL_DT_MSG:

        {

            RsslMsg *msg = pEvent->pRsslMsg;

            switch (msg->msgBase.msgClass)

            {

                case RSSL_MC_REFRESH:

                {

                    printf("simpleTunnelMsgHandlerConsumerMsgCallback: received refresh message %s\n\n", msg->refreshMsg.state.text.data);

                    if (msg->refreshMsg.state.streamState == RSSL_STREAM_OPEN && msg->refreshMsg.state.dataState == RSSL_DATA_OK)

                    {

                        SimpleTunnelMsgHandler *pSimpleTunnelMsgHandler = (SimpleTunnelMsgHandler*)pTunnelStream->userSpecPtr;

                        pSimpleTunnelMsgHandler->tunnelStreamAuthSucceeded = RSSL_TRUE;

                        initItemData(pSimpleTunnelMsgHandler);

                    }

 

                    ...

The logic above also sets the tunnelStreamAuthSucceeded flag to true, which tells the application that it can start sending post messages through the tunnel stream.

    	
            

void handleSimpleTunnelMsgHandler(RsslReactor *pReactor, RsslReactorChannel *pReactorChannel, SimpleTunnelMsgHandler *pSimpleTunnelMsgHandler)

{

    time_t currentTime = 0;

    RsslTunnelStream *pTunnelStream = pSimpleTunnelMsgHandler->tunnelStreamHandler.pTunnelStream;

    handleTunnelStreamHandler(pReactor, pReactorChannel, &pSimpleTunnelMsgHandler->tunnelStreamHandler);

    if (pTunnelStream == NULL)

        return;

 

    /* Don't try to send messages if tunnel is not established. */

    if (pTunnelStream->state.streamState != RSSL_STREAM_OPEN || pTunnelStream->state.dataState != RSSL_DATA_OK)

        return;

 

    /* If tunnel is open and some time has passed, send a message. */

    time(&currentTime);

    if (currentTime >= pSimpleTunnelMsgHandler->nextMsgTime && pSimpleTunnelMsgHandler->tunnelStreamAuthSucceeded)

    {         

        simpleTunnelMsgHandlerSendMessage(pSimpleTunnelMsgHandler);

        pSimpleTunnelMsgHandler->nextMsgTime = currentTime + (time_t)TUNNEL_MSG_FREQUENCY;

    }

}

The code above calls the simpleTunnelMsgHandlerSendMessage() function to construct a Post message. The Post message is populated using the RsslPostMsg structure and encoded using rsslEncodeMsgInit.

    	
            

static void simpleTunnelMsgHandlerSendMessage(SimpleTunnelMsgHandler *pSimpleTunnelMsgHandler)

{

    ...

 

    RsslTunnelStream *pTunnelStream = pSimpleTunnelMsgHandler->tunnelStreamHandler.pTunnelStream;

 

    ...

 

    /* Write text as the data body. */

    RsslPostMsg postMsg = RSSL_INIT_POST_MSG;

 

    /* set-up message */

    postMsg.msgBase.msgClass = RSSL_MC_POST;

    postMsg.msgBase.streamId = 1;

    postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;

    postMsg.msgBase.containerType = RSSL_DT_MSG;

    postMsg.flags = RSSL_PSMF_POST_COMPLETE

                  | RSSL_PSMF_ACK // request ACK

                  | RSSL_PSMF_HAS_POST_ID

                  | RSSL_PSMF_HAS_SEQ_NUM

                  | RSSL_PSMF_HAS_POST_USER_RIGHTS

                  | RSSL_PSMF_HAS_MSG_KEY;

 

    postMsg.postId = pSimpleTunnelMsgHandler->nextPostId++;

    postMsg.postUserRights = RSSL_PSUR_CREATE | RSSL_PSUR_DELETE;

 

    ...

 

    // encode message

    if ((ret = rsslSetEncodeIteratorBuffer(&eIter, pBuffer)) < RSSL_RET_SUCCESS)

    {

        printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);

        return;

    }

    rsslSetEncodeIteratorRWFVersion(&eIter, pTunnelStream->classOfService.common.protocolMajorVersion, pTunnelStream->classOfService.common.protocolMinorVersion);

 

    // Convey that there will be a nested MARKET_PRICE UPDATE message in the outer POST message

    ret = rsslEncodeMsgInit(&eIter, (RsslMsg *)&postMsg, 0);

    if (ret < RSSL_RET_SUCCESS)

    {

        printf("encodePostWithMsg: rsslEncodeMsgInit() failed with return code: %d\n", ret);

        return;

    }

 

    // Encode MARKET_PRICE UPDATE message and its fields.

    ret = encodePostMarketPriceResponse(&eIter, &(pSimpleTunnelMsgHandler->itemData));

 

    ...

The OMM Market Price data has to be sent as a message nested within the Post message. To do this, an RsslUpdateMsg structure is populated and encoded using rsslEncodeMsgInit.

    	
            

RsslRet encodePostMarketPriceResponse(RsslEncodeIterator *encodeIter, ItemData *itemData)

{

    RsslRet ret = 0;

    RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;

    RsslMsgBase* msgBase;

    RsslMsg* msg;

    char stateText[128];

    char errTxt[256];

    RsslBuffer errorText = { 255, (char*)errTxt };

 

    /* set-up message */

    msgBase = &updateMsg.msgBase;

    msgBase->msgClass = RSSL_MC_UPDATE;

 

    /* include msg key in updates for non-interactive provider streams */

    updateMsg.flags = RSSL_UPMF_HAS_MSG_KEY;

 

    msgBase->msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_NAME_TYPE;

    msgBase->domainType = RSSL_DMT_MARKET_PRICE;

    msgBase->containerType = RSSL_DT_FIELD_LIST;

 

    /* StreamId */

    msgBase->streamId = 0;

 

    /* Itemname */

    msgBase->msgKey.name = itemData->itemName;

    msgBase->msgKey.nameType = RDM_INSTRUMENT_NAME_TYPE_RIC;

    msg = (RsslMsg *)&updateMsg;

 

    /* encode message */

    if ((ret = rsslEncodeMsgInit(encodeIter, msg, 0)) < RSSL_RET_SUCCESS)

    {

        printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

        return ret;

    }

 

    RsslFieldList fList = RSSL_INIT_FIELD_LIST;

    RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;

 

    /* encode field list */

    fList.flags = RSSL_FLF_HAS_STANDARD_DATA;

    if ((ret = rsslEncodeFieldListInit(encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)

    {

        printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);

        return ret;

    }

 

    /* encode fields */

    /* BID */

    rsslClearFieldEntry(&fEntry);

    fEntry.fieldId = 22;

    fEntry.dataType = RSSL_DT_REAL;

 

    if ((ret = rsslEncodeFieldEntry(encodeIter, &fEntry, (void*)&(itemData->bid))) < RSSL_RET_SUCCESS)

    {

        printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

        return ret;

    }

 

    /* ASK */

    rsslClearFieldEntry(&fEntry);

    fEntry.fieldId = 25;

    fEntry.dataType = RSSL_DT_REAL;

    if ((ret = rsslEncodeFieldEntry(encodeIter, &fEntry, (void*)&(itemData->ask))) < RSSL_RET_SUCCESS)

    {

        printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

        return ret;

    }

 

    ...

The snippet above shows the application encoding the update message, which carries the BID and ASK fields in a FieldList structure, as the payload of the nested message.
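The tutorial always encodes both fields, but the closing summary recommends sending only the fields that have changed since the previous submission. A minimal sketch of how that decision could be made follows. The `Quote` type, the `FIELD_*` masks and `changedFields` are hypothetical, and plain `double` values are assumed here in place of the RsslReal values held by the tutorial's ItemData.

```c
#include <assert.h>
#include <stddef.h>

#define FIELD_BID 0x1u
#define FIELD_ASK 0x2u

/* Hypothetical quote snapshot; the tutorial's ItemData holds RsslReal values. */
typedef struct {
    double bid;
    double ask;
} Quote;

/* Return a bit mask of the fields whose value differs from the last post. */
unsigned changedFields(const Quote *last, const Quote *current)
{
    unsigned mask = 0;

    assert(last != NULL && current != NULL);
    if (last->bid != current->bid)
        mask |= FIELD_BID;
    if (last->ask != current->ask)
        mask |= FIELD_ASK;
    return mask;
}
```

With such a mask, the encoder would add the BID entry only when `mask & FIELD_BID` is set, likewise for ASK, and could skip the Post entirely when the mask is zero.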

Finally, the encoded Post message has to be sent out using rsslTunnelStreamSubmit.

    	
            

static void simpleTunnelMsgHandlerSendMessage(SimpleTunnelMsgHandler *pSimpleTunnelMsgHandler)

{

    ...

 

    // Encode MARKET_PRICE UPDATE message and its fields.

    ret = encodePostMarketPriceResponse(&eIter, &(pSimpleTunnelMsgHandler->itemData));

    ...

 

    /* Message encoding complete; submit it. */

    rsslClearTunnelStreamSubmitOptions(&submitOpts);

    submitOpts.containerType = RSSL_DT_MSG;

    if ((ret = rsslTunnelStreamSubmit(pTunnelStream, pBuffer, &submitOpts, &errorInfo)) != RSSL_RET_SUCCESS)

    {

        printf("rsslTunnelStreamSubmit(): Failed <%s>\n", errorInfo.rsslError.text);

        if ((ret2 = rsslTunnelStreamReleaseBuffer(pBuffer, &errorInfo)) != RSSL_RET_SUCCESS)

            printf("rsslTunnelStreamReleaseBuffer(): Failed <%d:%s>\n", ret2, errorInfo.rsslError.text);

        return;

    }

 

    ++pSimpleTunnelMsgHandler->msgCount;

    updateItemData(&(pSimpleTunnelMsgHandler->itemData));

}

Remark: Every encode Init function has to be completed by invoking the corresponding encode Complete function. For example, a call to rsslEncodeMsgInit has to be completed by calling rsslEncodeMsgComplete.

A few things to note here:

  1. The unique Post ID can be retrieved from the Ack response you get back, so you can tie the Ack to the Post and confirm that each Post has been acknowledged by the server.
  2. RCC currently only allows the Update of existing records on the server via an API. New records can be defined manually by an RCC administrator.
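To tie each Ack back to its Post, an application can remember the Post IDs it has submitted and remove each one when an Ack carrying the same ID arrives. The sketch below shows one way to do this. `PendingPosts` and its functions are hypothetical helpers, and the fixed capacity is an assumption.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_PENDING 64  /* assumed capacity; size it to suit the posting rate */

/* Hypothetical tracker of post IDs that still await an AckMsg. */
typedef struct {
    unsigned long ids[MAX_PENDING];
    size_t count;
} PendingPosts;

void pendingInit(PendingPosts *p)
{
    assert(p != NULL);
    p->count = 0;
}

/* Record a post ID at submit time. Returns 0 on success, -1 if full. */
int pendingAdd(PendingPosts *p, unsigned long postId)
{
    assert(p != NULL);
    if (p->count == MAX_PENDING)
        return -1;
    p->ids[p->count++] = postId;
    return 0;
}

/* Call with the ackId from an AckMsg. Returns 1 if it matched a pending
 * post (which is then removed), 0 if that ID was not outstanding. */
int pendingAcknowledge(PendingPosts *p, unsigned long ackId)
{
    size_t i;

    assert(p != NULL);
    for (i = 0; i < p->count; ++i) {
        if (p->ids[i] == ackId) {
            p->ids[i] = p->ids[--p->count];  /* swap-remove; order is not kept */
            return 1;
        }
    }
    return 0;
}
```

Any IDs still in the tracker after a reasonable wait are Posts that were never acknowledged. How long to wait and what to do about them is left to the application.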

Once a PostMsg has been submitted, the application should get back an Acknowledgement from the server via the same registered callback method, which is simpleTunnelMsgHandlerConsumerMsgCallback in this case.

    	
            

RsslReactorCallbackRet simpleTunnelMsgHandlerConsumerMsgCallback(RsslTunnelStream *pTunnelStream, RsslTunnelStreamMsgEvent *pEvent)

{

    switch (pEvent->containerType)

    {

        case RSSL_DT_MSG:

        {

            RsslMsg *msg = pEvent->pRsslMsg;

 

            switch (msg->msgBase.msgClass)

            {

                case RSSL_MC_REFRESH:

                {

                    ...

                }

                break;

                case RSSL_MC_STATUS:

                {

                    ...

                }

                break;

                case RSSL_MC_ACK:

                {

                    printf("simpleTunnelMsgHandlerConsumerMsgCallback: received ack message\n");

                    printf("\tget an ack message with ackId:%u\n", msg->ackMsg.ackId);

                    if (msg->ackMsg.flags & RSSL_AKMF_HAS_NAK_CODE)

                        printf("\t\t- NAK code\t= %d\n", msg->ackMsg.nakCode);

 

                    if (msg->ackMsg.flags & RSSL_AKMF_HAS_TEXT)

                    {

                        printf("\t\t- text\t\t= %.*s\n", msg->ackMsg.text.length, msg->ackMsg.text.data);

                    }

 

                    printf("\n");

                }

                break;

 

                ...

Note: It is possible to receive a Nack response if the Post is not accepted by the RCC server e.g. if the application uses an invalid RIC or tries to update Fields that are not defined in the target RIC.

Here are two examples of Nack responses, each with a different reason:

    	
            

<!-- End Message (Channel IPC descriptor = 8) -->

<!-- Incoming Message (Channel IPC descriptor = 8) -->

<!-- Time: 18:33:38:770 -->

<!-- rwfMajorVer="14" rwfMinorVer="1" -->

<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="1" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="2" dataSize="34">

     <extendedHeader data="01"/>

    <dataBody>

        <!-- rwfMajorVer="14" rwfMinorVer="1" -->

        <ackMsg domainType="RSSL_DMT_MARKET_PRICE" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x2A (RSSL_AKMF_HAS_TEXT|RSSL_AKMF_HAS_SEQ_NUM|RSSL_AKMF_HAS_NAK_CODE)" ackId="0" nakCode="RSSL_NAKC_SYMBOL_UNKNOWN" text="Symbol unknown" seqNum="0" dataSize="0">

            <dataBody> </dataBody>

        </ackMsg>

    </dataBody>

</genericMsg>

 

<!-- End Message (Channel IPC descriptor = 8) -->

simpleTunnelMsgHandlerConsumerMsgCallback: received ack message

        get an ack message with ackId:0

                - NAK code = 10

                - text = Symbol unknown

 

<!-- End Message (Channel IPC descriptor = 8) -->

<!-- Incoming Message (Channel IPC descriptor = 8) -->

<!-- Time: 18:35:43:281 -->

<!-- rwfMajorVer="14" rwfMinorVer="1" -->

<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="1" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="4" dataSize="33">

    <extendedHeader data="01"/>

    <dataBody>

        <!-- rwfMajorVer="14" rwfMinorVer="1" -->

        <ackMsg domainType="RSSL_DMT_MARKET_PRICE" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x2A (RSSL_AKMF_HAS_TEXT|RSSL_AKMF_HAS_SEQ_NUM|RSSL_AKMF_HAS_NAK_CODE)" ackId="2" nakCode="RSSL_NAKC_DENIED_BY_SRC" text="No such field" seqNum="0" dataSize="0">

            <dataBody> </dataBody>

        </ackMsg>

    </dataBody>

</genericMsg>

 

<!-- End Message (Channel IPC descriptor = 8) -->

simpleTunnelMsgHandlerConsumerMsgCallback: received ack message

        get an ack message with ackId:2

                - NAK code = 2

                - text = No such field
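An application will usually want to react differently to different NAK codes. The sketch below covers only the two codes seen in the output above, with numeric values taken from that output. The `NAK_*` macros are local stand-ins (a real application would use the constants from the ETA headers), and the retry policy is an assumption made for illustration, not RCC guidance.

```c
#include <assert.h>

/* Local stand-ins; the numeric values are those printed in the output above. */
#define NAK_DENIED_BY_SRC   2   /* RSSL_NAKC_DENIED_BY_SRC, text "No such field" */
#define NAK_SYMBOL_UNKNOWN 10   /* RSSL_NAKC_SYMBOL_UNKNOWN, text "Symbol unknown" */

/* Hypothetical helper: short description for the NAK codes shown above. */
const char *describeNak(int nakCode)
{
    assert(nakCode >= 0 && nakCode <= 255);  /* NAK codes fit in one byte */
    switch (nakCode) {
    case NAK_SYMBOL_UNKNOWN:
        return "RIC is not known to the server";
    case NAK_DENIED_BY_SRC:
        return "post denied by the source, see the ack text";
    default:
        return "unhandled NAK code, see the ack text";
    }
}

/* Assumed policy: resending the same Post unchanged would fail again for
 * both codes shown above, so they are treated as not retryable. */
int nakIsRetryable(int nakCode)
{
    return nakCode != NAK_SYMBOL_UNKNOWN && nakCode != NAK_DENIED_BY_SRC;
}
```

In both examples above the fix lies with the Post itself (the RIC name or the field list), which is why the sketch does not retry them.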

Closing summary

A few points worth noting here:

  • The application provides updates for a single RIC, sending the first Post once it gets a valid Client Login response and then posting an update periodically while monitoring for an AckMsg back from the server. In reality, an application will most likely contribute data to several RICs, so it can submit multiple Posts at any time once the connection is established and it has received a successful Client Login response. Further updates are then triggered as and when there is new data to contribute.
  • As this is a simple tutorial, the application doesn't verify that each Post message was acknowledged. A Production application may well want to verify that it receives an AckMsg back for each PostMsg submitted. As mentioned, this can be done by comparing the AckId in the response to the PostId set in each outgoing PostMsg.
  • This tutorial updates the same two fields, BID and ASK, in all Posts. However, you should only send those fields that have changed since your previous submission.
  • If the TRCE:MaxMessagesPerSecond element is present in the login response, the rate at which the application posts messages should not exceed this number.
  • The application can get the login stream closed message with the “System busy” string in the status text from the tunnel stream after sending a login request message to login to RCC, as shown below.
    	
            

        <statusMsg domainType="RSSL_DMT_LOGIN" streamId="5" containerType="RSSL_DT_NO_DATA" flags="0xA0 (RSSL_STMF_HAS_STATE|RSSL_STMF_PRIVATE_STREAM)" dataState="RSSL_DATA_SUSPECT" streamState="RSSL_STREAM_CLOSED" code="RSSL_SC_NO_RESOURCES" text="System busy"  dataSize="0">

            <dataBody>

            </dataBody>

        </statusMsg>

This is not an error. It simply means that the server can't handle the request right now, which is normal for a cloud application. In this case, the connection is dropped and ETA VA recovers it. Once the connection is back, the application needs to open a tunnel stream and perform a client login again before it can contribute data to RCC. Alternatively, the application can try to connect to another server.
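When an application retries on its own, for example by reopening the tunnel stream or moving to another server, it is sensible to wait between attempts rather than retry immediately. The sketch below computes a bounded, doubling delay. `retryDelayMs` is a hypothetical helper and the doubling scheme is an assumption, not a description of how the Reactor schedules its own reconnects.

```c
#include <assert.h>

/* Delay in milliseconds before retry number `attempt` (0-based):
 * start at minDelayMs, double on each attempt, never exceed maxDelayMs. */
unsigned retryDelayMs(unsigned attempt, unsigned minDelayMs, unsigned maxDelayMs)
{
    unsigned delay = minDelayMs;

    assert(minDelayMs > 0 && minDelayMs <= maxDelayMs);
    while (attempt-- > 0 && delay < maxDelayMs) {
        if (delay > maxDelayMs / 2)
            delay = maxDelayMs;   /* doubling would overshoot the cap */
        else
            delay *= 2;
    }
    return delay;
}
```

With the 1000 ms and 5000 ms bounds used for reconnectMinDelay and reconnectMaxDelay earlier in this tutorial, successive attempts would wait 1000, 2000, 4000 and then 5000 ms.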

Additional Resources

If you have any further questions, please post them on our Developer Forum or contact our Data Helpdesk.

Existing Example mentioned above:

  • \Eta\Applications\Examples\VAConsumer

Building an application from the tutorial source code (RTSDK - C/C++)

For Windows
  • Extract the zip file and place the source folder: VAConsumerContribution in the same path as ETA's Examples directory.
  • Set an environment variable named EtaInstallPath that points to the <RTSDK_ROOT_FOLDER>\Cpp-C\Eta folder.
  • Open a Visual Studio 2017 project (VAConsumerContribution_VS141.vcxproj), build and run the project.
For Linux
  • Extract the output folder to the <RTSDK_ROOT_FOLDER>\Cpp-C\Eta\Applications\Examples folder. It contains a makefile used for application build.
  • In the VAConsumerContribution folder, build the application with the following command.
    	
            gmake
        
        
    
  • The executable file will be generated in the executable directory:
    <tutorial install dir>/<executable dir>/vaconsumercontribution
  • For example: ./OL6_64_GCC444/Optimized/vaconsumercontribution (runs the statically built version, using GCC version 4.4.4 for Oracle Linux 6.0)

 

The tutorial application's arguments

The tutorial application accepts the following arguments.

    	
            

$vaconsumercontribution ?

Unknown option: ?

Usage: OL7_64_GCC482/Optimized/vaconsumercontribution or OL7_64_GCC482/Optimized/vaconsumercontribution [-tcp [<hostname>:<port>]] [-uname <LoginUsername>] [-tunnel] [-trceUser <username>] [-trcePass <password>] [-trcePostItem <itemName>] [-x] [-runtime <seconds>]

 

     -tcp specifies a connection to open and a list of items to request:

            hostname: Hostname of provider to connect to

            port: Port of provider to connect to

                A comma-separated list of these may be specified.

                Example Usage: -tcp localhost:14002

    -uname changes the username used when logging into the provider.

    -tunnel causes the consumer to open a tunnel stream that exchanges basic messages.

    -trceUser is a username to be used to for AAA authentication.

    -trcePass is a obfuscated to be used to for AAA authentication.

    -trcePostItem is an itemName for Post messages sent to RCC server.

    -x provides an XML trace of messages.

    -runtime adjusts the running time of the application.

For example:

    	
            -tcp <RCC_Hostname>:443 -uname <username> -tunnel -trceUser <RCC Username> -trcePass <RCC Password> -trcePostItem <Post Item> -x