ETA Consumer - Retrieving and decoding level 1 content


Last update: Dec 2020
Compilers: Refer to the ETA Compiler Guides for a complete list.
Prerequisites: ETA Consumer - Establishing a session with a Provider

Introduction

The goal of this tutorial is to extend Tutorial 3 to request and retrieve level 1 (MarketPrice) data from the OMM-based data Provider. To support level 1 content, we move message decoding into separate files, which keeps the code manageable and leaves room for additional functionality.

Description

In this tutorial, we introduce two new files that will both play a role in decoding the level 1 data responses from the OMM-based data Provider.

  • basicDictionaryHandler

    Data messages received from the OMM-based Provider are packaged as RWF (Wire Format) messages. RWF is a low-level, compact protocol that must be decoded before an application can interpret and use the data. To support this decoding, an application must load a dictionary of field definitions and use these definitions, via ETA function calls, to decode RWF messages.

  • basicMsgHandler

    The RDM (Domain Messages) defines multiple message models that are typically used within a Consumer application. In this tutorial, we request the level 1 MarketPrice message model, which has a well-defined message structure. Because the RDM models are built from well-defined structures such as Lists, Maps, etc., our handler only needs to understand the structure to process the message properly.

Implementation Overview - Preparing, requesting and parsing level 1 data

Before requesting level 1 data, we must ensure we can properly decode the RWF messages provided by the OMM-based Provider by first loading a dictionary. As previously mentioned, the dictionary holds the field definitions. Once it is loaded, we can safely interpret and decode the fields of data we receive.
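To make the role of the dictionary concrete, the following is a minimal, self-contained sketch of a field-definition lookup. It does not use the ETA API. The types FieldDef and FieldType, the functions findFieldDef and describeField, and the three sample entries (including their field IDs) are assumptions made for illustration only. The real dictionary is loaded from files or downloaded, and decoding is performed through ETA function calls.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical, simplified field types; a real dictionary defines many more. */
typedef enum { FT_UINT, FT_REAL, FT_STRING } FieldType;

/* One definition: maps a numeric field ID to a name and a type. */
typedef struct {
    int fieldId;
    const char *name;
    FieldType type;
} FieldDef;

/* Illustrative entries only; the IDs are assumptions for this sketch. */
static const FieldDef sampleDictionary[] = {
    { 1, "PROD_PERM",  FT_UINT   },
    { 3, "DSPLY_NAME", FT_STRING },
    { 6, "TRDPRC_1",   FT_REAL   },
};

/* Human-readable name of a field type. */
static const char *fieldTypeName(FieldType type)
{
    switch (type) {
    case FT_UINT:   return "UINT";
    case FT_REAL:   return "REAL";
    case FT_STRING: return "STRING";
    }
    return "UNKNOWN";
}

/* Return the definition for fieldId, or NULL if the dictionary has no entry. */
const FieldDef *findFieldDef(int fieldId)
{
    size_t count = sizeof(sampleDictionary) / sizeof(sampleDictionary[0]);
    for (size_t i = 0; i < count; i++) {
        if (sampleDictionary[i].fieldId == fieldId)
            return &sampleDictionary[i];
    }
    return NULL;
}

/* Format one field as "NAME (TYPE) value". The value is passed as text to
 * keep the sketch short. Returns 0 on success, -1 for an unknown field. */
int describeField(int fieldId, const char *valueText, char *out, size_t outLen)
{
    const FieldDef *def = findFieldDef(fieldId);
    if (def == NULL) {
        snprintf(out, outLen, "FID %d <unknown field>", fieldId);
        return -1;
    }
    snprintf(out, outLen, "%s (%s) %s", def->name, fieldTypeName(def->type), valueText);
    return 0;
}
```

Without a matching definition, a field on the wire is only a number followed by bytes. This is why the dictionary must be in place before any level 1 data is requested.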

Dictionary

A Consumer application has two choices for loading a dictionary. Each choice has its advantages, but both achieve the same result: the ability to decode RWF messages.

  • Loading the dictionary from local files
    • Extremely useful when testing applications and adding user-defined field definitions.
  • Downloading the dictionary from the OMM-based Provider
    • Providers are constantly enhancing content and the associated dictionaries. Downloading lets applications discover new content automatically.

In the second case, the dictionary download is handled mostly by the Transport API Reactor and depends entirely on the success of the Directory response we used in Tutorial 3. The following code outlines how the dictionary load is set up.

    	
            

void init()
{
    ...

    /* Load Dictionaries - local or server */
    consumerRole.dictionaryMsgCallback = loadLocalDictionary() ? NULL : dictionaryMsgCallback;
    consumerRole.dictionaryDownloadMode = (consumerRole.dictionaryMsgCallback ?
                       RSSL_RC_DICTIONARY_DOWNLOAD_FIRST_AVAILABLE :
                       RSSL_RC_DICTIONARY_DOWNLOAD_NONE);

    ...
}

In the code segment above, we first try to load the dictionary locally. If that succeeds, no callback is registered and dictionaryDownloadMode is set to RSSL_RC_DICTIONARY_DOWNLOAD_NONE. Otherwise, we register dictionaryMsgCallback to handle the response and set the Consumer role's dictionaryDownloadMode attribute to RSSL_RC_DICTIONARY_DOWNLOAD_FIRST_AVAILABLE, which instructs the reactor to download the dictionary from the OMM-based Provider. Note: we provide both mechanisms simply to demonstrate both capabilities. The decision is driven by the loadLocalDictionary function defined within the new file "basicDictionaryHandler.c".

    	
            

/* field dictionary file name */
static const char *fieldDictionaryFileName = "RDMFieldDictionary";

/* enum table file name */
static const char *enumTableFileName = "enumtype.def";

/* Data Dictionary */
RsslDataDictionary dictionary;

BOOL loadLocalDictionary()
{
    char errTxt[256];
    RsslBuffer errorText = {255, (char*)errTxt};
    RsslBool fieldDictionaryLoaded = RSSL_FALSE, enumTableLoaded = RSSL_FALSE;

    /* Load dictionary from file if possible. Otherwise we will try to download it. */
    rsslClearDataDictionary(&dictionary);
    if (rsslLoadFieldDictionary(fieldDictionaryFileName, &dictionary, &errorText) < 0)
        printf("\nUnable to load field dictionary.  Will attempt to download from provider.\n\tError Text: %s\n", errorText.data);
    else
        fieldDictionaryLoaded = RSSL_TRUE;

    if (rsslLoadEnumTypeDictionary(enumTableFileName, &dictionary, &errorText) < 0)
        printf("\nUnable to load enum type dictionary.  Will attempt to download from provider.\n\tError Text: %s\n", errorText.data);
    else
        enumTableLoaded = RSSL_TRUE;

    if (!fieldDictionaryLoaded || !enumTableLoaded)
    {
        /* Dictionary could not be loaded from files. We will need to get them from the provider. */
        rsslDeleteDataDictionary(&dictionary);
        rsslClearDataDictionary(&dictionary);
    }

    return(fieldDictionaryLoaded && enumTableLoaded);
}

Two local files make up the dictionary:

  • RDMFieldDictionary

    Contains all the known fields, types and sizes required for decoding.

  • enumtype.def

    Contains enumeration definitions for those fields with an Enum type defined within the RDMFieldDictionary file.

If the local dictionary files are available, the RSSL functions rsslLoadFieldDictionary() and rsslLoadEnumTypeDictionary() load them.

If no local dictionaries are available, we instruct the Transport API Reactor to attempt a download from the OMM-based Provider. The Reactor handles this automatically once a valid Source Directory has been returned.

When a dictionary message event arrives from the Provider, we handle it within the following callback.

    	
            

RsslReactorCallbackRet dictionaryMsgCallback(RsslReactor *pReactor, RsslReactorChannel *pReactorChannel, RsslRDMDictionaryMsgEvent *pDictionaryMsgEvent)
{
    ...

    RsslReactorCallbackRet ret = RSSL_RC_CRET_SUCCESS;

    ret = processDictionaryResponse(pReactor, pReactorChannel, pDictionaryMsgEvent);

    if (ret == RSSL_RC_CRET_FAILURE)
        closeConnection(pReactor, pReactorChannel);

    return(ret);
}

The callback above delegates dictionary processing to the processDictionaryResponse function. Although we could have housed the entire processing within the "basicConsumer.c" module, we chose to move it into the separate file "basicDictionaryHandler.c" for modularity and to allow for future enhancements.

While the processing details are important, the codebase and the decoding of dictionary definition elements are beyond the scope of this tutorial. The basic processing was derived from the example VAConsumer, which is a good reference for additional processing requirements and explanation.
 

Level 1 data - MarketPrice

Once we have established our dictionary, we are ready to request data. In Tutorial 2 - Establishing a connection to a data Provider, we were required to define the defaultMsgCallback callback handler. At that point, we defined an empty event handler because there were no messages to process. Now that we are requesting data, we can populate it with actual processing logic.

    	
            

/* Name of our market data service, item and domain of our subscription */
#define SERVICE "ELEKTRON_AD"
char itemName[] = "BB.TO";
RsslUInt8 domainType = RSSL_DMT_MARKET_PRICE;

RsslReactorCallbackRet channelEventCallback(RsslReactor *pReactor, RsslReactorChannel *pReactorChannel, RsslReactorChannelEvent *pConnEvent)
{
    switch (pConnEvent->channelEventType)
    {
        case RSSL_RC_CET_CHANNEL_UP:
        {
            ...
        }
        case RSSL_RC_CET_CHANNEL_READY:
            printf("Channel is ready.  Subscribing to %s\n", itemName);
            sendItemRequest(pReactor, pReactorChannel, itemName, service.serviceId, domainType);
            break;

        ...
    }
}

Within the existing callback channelEventCallback, the RSSL_RC_CET_CHANNEL_READY event triggers our data request. In our example, the request opens a subscription, i.e. a STREAMING request. A streaming request results in an initial image followed by updates. The initial image, delivered as a REFRESH message, contains the latest values known for the specified item. Each update carries only the fields that changed.
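The difference between an initial image and an update can be illustrated with a small cache. The following is a minimal sketch under stated assumptions: ItemCache, CachedField and the three cache functions are hypothetical names that do not appear in the tutorial source, and values are kept as text to avoid any dependency on ETA types.

```c
#include <stdio.h>
#include <string.h>

#define MAX_FIELDS 16
#define MAX_TEXT   32

/* One cached field: its name and latest value, kept as text for simplicity. */
typedef struct {
    char name[MAX_TEXT];
    char value[MAX_TEXT];
} CachedField;

/* Hypothetical per-item cache; not part of the tutorial source. */
typedef struct {
    CachedField fields[MAX_FIELDS];
    int count;
} ItemCache;

/* A refresh carries the full image, so discard whatever was cached before. */
void cacheBeginRefresh(ItemCache *cache)
{
    cache->count = 0;
}

/* Set one field, overwriting an existing entry or appending a new one.
 * Returns 0 on success, -1 if the cache is full. */
int cacheSetField(ItemCache *cache, const char *name, const char *value)
{
    for (int i = 0; i < cache->count; i++) {
        if (strcmp(cache->fields[i].name, name) == 0) {
            snprintf(cache->fields[i].value, MAX_TEXT, "%s", value);
            return 0;
        }
    }
    if (cache->count >= MAX_FIELDS)
        return -1;
    snprintf(cache->fields[cache->count].name, MAX_TEXT, "%s", name);
    snprintf(cache->fields[cache->count].value, MAX_TEXT, "%s", value);
    cache->count++;
    return 0;
}

/* Return the latest value of a field, or NULL if it was never received. */
const char *cacheGetField(const ItemCache *cache, const char *name)
{
    for (int i = 0; i < cache->count; i++) {
        if (strcmp(cache->fields[i].name, name) == 0)
            return cache->fields[i].value;
    }
    return NULL;
}
```

On a refresh, an application using this sketch would call cacheBeginRefresh() and then cacheSetField() for every field in the image. On an update, it would call cacheSetField() only for the fields the update carries. A field such as DSPLY_NAME that appears in the refresh but not in the update keeps its cached value, while TRDPRC_1 is overwritten with the new price.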

To issue our streaming request, we call the newly defined function sendItemRequest(). Defined within the file "basicMsgHandler.c", this generic item request function takes 3 key pieces of data:

  • itemName

    The data item to request, e.g. AAPL.O (Apple).

  • serviceId

    The service ID, a token that identifies the service of data we are interested in. This ID was retrieved when we processed the Source Directory response from the OMM-based Provider.

  • domainType

    The RDM domain type. In this tutorial we request the level 1 data model RSSL_DMT_MARKET_PRICE, but future tutorials will use additional models.
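The three inputs listed above can be pictured as a small request descriptor. The sketch below is an assumption made for illustration: ItemRequest and buildStreamingRequest are hypothetical names, and the actual sendItemRequest() builds and submits an ETA request message rather than filling a plain struct.

```c
#include <stddef.h>

/* Hypothetical request descriptor holding the three key inputs. */
typedef struct {
    const char *itemName;      /* e.g. "BB.TO" */
    unsigned short serviceId;  /* taken from the Source Directory response */
    unsigned char domainType;  /* identifies the message model */
    int streaming;             /* non-zero: initial image followed by updates */
} ItemRequest;

/* Fill in a streaming request.
 * Returns 0 on success, -1 if req is NULL or the item name is missing. */
int buildStreamingRequest(ItemRequest *req, const char *itemName,
                          unsigned short serviceId, unsigned char domainType)
{
    if (req == NULL || itemName == NULL || itemName[0] == '\0')
        return -1;

    req->itemName = itemName;
    req->serviceId = serviceId;
    req->domainType = domainType;
    req->streaming = 1;
    return 0;
}
```

Because the domain type is an input rather than a hard-coded value, the same function shape can serve other message models without modification.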

    	
            

RsslReactorCallbackRet defaultMsgCallback(RsslReactor *pReactor, RsslReactorChannel *pChannel, RsslMsgEvent* pMsgEvent)
{
    RsslReactorCallbackRet ret = generalMsgCallback(pReactor, pChannel, pMsgEvent);

    if (ret == RSSL_RC_CRET_FAILURE)
    {
        /* Because this application is single-threaded, it is safe to call closeConnection inside callback functions. */
        closeConnection(pReactor, pChannel);
    }

    return(ret);
}

The "basicMsgHandler.c" file contains many processing functions to manage message decoding. Above, we call the generalMsgCallback function, which handles any message type. As mentioned in the Description section, message processing is kept generic by working on the structure of the messages (Lists, Maps, etc.) rather than the model of data (MarketPrice, SymbolList, MarketByPrice, etc.). This simplifies the processing and reduces the code needed to support multiple message models. Within generalMsgCallback, we call a general RDM processing routine to handle these data structures. The following function outlines the general structure:

    	
            

RsslRet processRDMResponse(RsslReactorChannel *pReactorChannel, RsslMsg *msg, RsslDecodeIterator* dIter)
{
    RsslRet ret = 0;

    ...

    switch (msg->msgBase.msgClass)
    {
        case RSSL_MC_REFRESH:
            ...
        case RSSL_MC_UPDATE:
            ...

            switch (msg->msgBase.containerType)
            {
                case RSSL_DT_FIELD_LIST:
                    ret = decodeFieldList(getDictionary(), dIter);
                    break;

                default:
                    printf("ContainerType: %i not handled\n", msg->msgBase.containerType);
            }

            break;

        ...

    }

    return(ret);
}

In the code segment above, our level 1 MarketPrice data is delivered within a FieldList message structure. We therefore walk through the list, extract each entry and decode its fields using our loaded dictionary. The fields are simply echoed to the display to verify processing. As we encounter new message structures, we update the logic above to handle them. While the structure processing is important, it is beyond the scope of this tutorial. The basic processing was derived from the example VAConsumer, which is a good reference for additional processing requirements.
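The idea of dispatching on message structure rather than on the data model can be shown in isolation. The following is a minimal, self-contained sketch that does not use the ETA API. SketchMsg, the enums and dispatchByStructure are hypothetical stand-ins, and the sketch assumes that refresh and update payloads are handled the same way.

```c
#include <stdio.h>

/* Hypothetical stand-ins for the message class and container type. */
typedef enum { MSG_REFRESH, MSG_UPDATE, MSG_STATUS } MsgClass;
typedef enum { CT_NO_DATA, CT_FIELD_LIST, CT_MAP } ContainerType;

typedef struct {
    MsgClass msgClass;
    ContainerType containerType;
    int entryCount;              /* number of entries in the payload */
} SketchMsg;

typedef enum { HANDLED, NOT_HANDLED, NO_PAYLOAD } DispatchResult;

/* Stand-in for a field list decoder: it only reports the entry count. */
static int walkFieldList(const SketchMsg *msg)
{
    printf("Field list with %d entries\n", msg->entryCount);
    return msg->entryCount;
}

/* Choose the handler from the container type alone. The domain model
 * (MarketPrice, SymbolList, ...) never appears in the decision. */
DispatchResult dispatchByStructure(const SketchMsg *msg)
{
    switch (msg->msgClass) {
    case MSG_REFRESH:
    case MSG_UPDATE:
        /* Refresh and update share payload handling in this sketch. */
        switch (msg->containerType) {
        case CT_FIELD_LIST:
            walkFieldList(msg);
            return HANDLED;
        default:
            printf("Container type %d not handled\n", (int)msg->containerType);
            return NOT_HANDLED;
        }
    default:
        /* Other message classes have no payload to walk in this sketch. */
        return NO_PAYLOAD;
    }
}
```

Supporting a new structure in this sketch means adding one case to the inner switch. Any message model that uses that structure then benefits from the same handler.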
 

Build and run

For these instructions, refer to the Build and Run section within the first tutorial of this series.

Note: Ensure you can connect to a valid Real-Time Distribution System to test. These tutorials assume the following server, defined within the basicConsumer.c file:

    	
            

/* Server/Provider connection details */
#define RTDS_SERVER_NAME "rtds"
#define RTDS_SERVER_PORT "14002"

...

You can either define the server name 'rtds' within your environment to point to a valid market data server or modify these server configuration parameters to suit your setup. In our execution, we download the dictionaries from the OMM-based Provider. The output shows the receipt of the dictionaries, our request to subscribe to the item BB.TO and the corresponding response.

Note: We issued a streaming request, which results in an initial refresh followed by updates that carry the fields changed by each event.

    	
            

Item: BB.TO
State: Open/Ok/None - text: "All is well"
Domain: RSSL_DMT_MARKET_PRICE
        PROD_PERM           84
        RDNDISPLAY          65
        DSPLY_NAME          BLACKBERRY LTD
        RDN_EXCHID          TOR(10)
        TRDPRC_1            12.70

        ...

        IRGTIM              20:25:51:000:000:000
        OFF_CLS_DT          <blank data>
        CAN_DATE            04 JAN 2016
        BLKTRDVOL           12000
        RCS_AS_CL2          <blank data>
        CLA_CD_IND          TRD     (7)
        ACVOL_UNS           2124486

Received activity on channel: 428

Domain: RSSL_DMT_MARKET_PRICE
        CONDCODE_3          BY
        TRDVOL_1            200
        ACVOL_1             2124686
        ACVOL_UNS           2124686
        BUYER_ID            7
        SELLER_ID           7
        SALTIM              20:26:07:000:000:000
        TRDTIM_1            20:26:00:000:000:000
        SEQNUM              152607
        PRC_QL2             SKP(130)
        SEQ_NO              916329
        LSTSALCOND
        NETCHNG_1           -0.15
        PCTCHNG             -1.17
        PRCTCK_1            ↑(2)
        TRDPRC_1            12.69
        NUM_MOVES           6351
        VOL_X_PRC1          12.5684
        VWAP                12.5684

References

For more information, refer to the Transport API - C Development Guides.