EMA Consumer - Request & Decode Machine Readable News

Download tutorial source code

Click here to download

Last update Aug 2024
Compilers Tutorial demonstration: Visual Studio 2019,  2022
Prerequisites Complete the first 4 EMA Consumer tutorials in this series

Introduction

This tutorial assumes you are familiar with consuming MarketPrice data from LSEG Real-Time Distribution System (RTDS) using EMA C++ and as a minimum, you have worked through one or both of the following:

  • EMA Consumer Tutorials - Requesting and Decoding MarketPrice - on the LSEG Developer portal
  • The 100 Series & 200 Series MarketPrice Examples provided with the EMA package. 

If you have not worked through at least one of the above or are not familiar with the concepts covered, it is strongly recommended that you complete the EMA Consumer tutorials first.

For this MRN tutorial we will use the source code from the 200 Series MarketPrice Streaming example (Consumer200) provided with the EMA package as the starting point. We will amend/add the additional required code to illustrate how to Request and Decode News Text Analytics (NTA) domain from a Machine Readable News (MRN) service.

The News Text Analytics domain is designed for publishing large complex nested data structures over the ERT network using a FieldList-based Envelope. 

Whilst RTDS can deliver large payloads, to ensure optimized transport of NTA data items (which can extend to several kilobytes), NTA data items may be fragmented and delivered as multiple envelope messages. Each envelope message has several fields to hold metadata and one field to hold the actual data (fragment) itself. To further optimize the message transportation, the NTA data item is converted to a JSON UTF-8 string, compressed using zlib prior to splitting into fragments (if required) as part of the message encoding process. The consumer needs to re-assemble the fragments and unzip the completed buffer before it can be used. 

In view of the above, processing a response to NTA request is more involved than a typical MarketPrice response.

Description

The News Text Analytics Domain Model describes how the NTA data items are modeled using OMM. It makes use of the FieldList OMM container for the metadata and the zlib library to decompress the compressed NTA fragments.

Initial Refresh Message

The Initial Refresh response does not contain any NTA data – but the relevant feed related or other static Fields are populated with appropriate values e.g., a MRN_STORY Refresh could contain: 

Item Name: MRN_STORY
Service Name: LDN_RSSL_DDS_CHE
Item State: Open / Ok / None / 'All is well'
Fid: 1 Name = PROD_PERM DataType: UInt Value: 10001
Fid: 17 Name = ACTIV_DATE DataType: Date Value: 10 / 11 / 2015
Fid: 259 Name = RECORDTYPE DataType: UInt Value: 30
Fid: 1709 Name = RDN_EXCHD2 DataType: Enum Value: 1370
Fid: 4148 Name = TIMACT_MS DataType: UInt Value: 72588565
Fid: 4271 Name = GUID DataType: Rmtes Value:<BLANK>
Fid: 5357 Name = CONTEXT_ID DataType: Real Value: 3752
Fid: 6401 Name = DDS_DSO_ID DataType: UInt Value: 12424
Fid: 6480 Name = SPS_SP_RIC DataType: Ascii Value: .[SPSML1L1
Fid: 8506 Name = MRN_V_MAJ DataType: Rmtes Value: 2
Fid: 8593 Name = MRN_TYPE DataType: Rmtes Value: STORY
Fid: 11787 Name = MRN_V_MIN DataType: Rmtes Value: 10
Fid: 12215 Name = MRN_SRC DataType: Rmtes Value: DTC_QA_A
Fid: 32479 Name = FRAG_NUM DataType: UInt Value: 0
Fid: 32480 Name = TOT_SIZE DataType: UInt Value: 0
Fid: 32641 Name = FRAGMENT DataType: Buffer Value:<BLANK>

As you can see, the FRAGMENT field which would contain the NTA data fragments is empty – and the GUID, FRAG_NUM and TOT_SIZE fields are zero or empty.

Update Messages

However, in the subsequent Update messages, these fields will be populated - but none of the feed related / static fields (contained in the Refresh response) will be, for example: 

RIC: MRN_STORY
    FieldEntry TIMACT_MS (4148): 37698507
    FieldEntry ACTIV_DATE (17): 22/10/2015
    FieldEntry MRN_TYPE (8593): "STORY"
    FieldEntry MRN_V_MAJ (8506): "2"
    FieldEntry MRN_V_MIN (11787): "10"
    FieldEntry TOT_SIZE (32480): 1006
    FieldEntry FRAG_NUM (32479): 1
    FieldEntry GUID (4271): "HKS6BMNj5_1510222UAxU7Co7ts6yZmhm642yYMvKxt3AXKDp1zPwS"
    FieldEntry MRN_SRC (12215): "DTC_QA_A"
    FieldEntry FRAGMENT (32641): ZIPPED
...

Some of the key Fields to note:

  • MRN_TYPE : Type of NTA item - Story, News Analytics, News Sentiment Indices
  • TOT_SIZE : Total size in bytes of fragmented data
  • FRAG_NUM : Sequential fragment number
  • GUID : Globally Unique Identifier for the NTA item
  • MRN_SRC : Source component that published this NTA item
  • FRAGMENT : zlib compressed data fragment

Multi Fragment Items

The other point to note is that (for a Multi fragment item), Update messages with FRAG_NUM >1 will have fewer FIDs as the metadata is included in the first Update message (FRAG_NUM=1) for that item as shown below:

Item Name: MRN_STORY
Service Name: LDN_RSSL_DDS_CHE
Fid: 4148 Name = TIMACT_MS DataType: UInt Value: 50280294
Fid: 17 Name = ACTIV_DATE DataType: Date Value: 12 / 11 / 2015
Fid: 8593 Name = MRN_TYPE DataType: Rmtes Value: STORY
Fid: 8506 Name = MRN_V_MAJ DataType: Rmtes Value: 2
Fid: 11787 Name = MRN_V_MIN DataType: Rmtes Value: 10
Fid: 32480 Name = TOT_SIZE DataType: UInt Value: 3059
Fid: 32479 Name = FRAG_NUM DataType: UInt Value: 1
Fid: 4271 Name = GUID DataType: Rmtes Value: Bw73VkFYa_15111222URmqNNopwCrTUW5e+
KSc8TSRST4giTBtY069
Fid: 12215 Name = MRN_SRC DataType: Rmtes Value: DTC_QA_A
Fid: 32641 Name = FRAGMENT DataType: Buffer Value: <BUFFER>

Init:: Expected total buffer:3059 current size:2600

Item Name: MRN_STORY
Service Name: LDN_RSSL_DDS_CHE
Fid: 4271 Name = GUID DataType: Rmtes Value: Bw73VkFYa_15111222URmqNNopwCrTUW5e+
KSc8TSRST4giTBtY069
Fid: 12215 Name = MRN_SRC DataType: Rmtes Value: DTC_QA_A
Fid: 32479 Name = FRAG_NUM DataType: UInt Value: 2
Fid: 32641 Name = FRAGMENT DataType: Buffer Value: <BUFFER>

Addtional fragment size: 459
Add Fragment:: Expected total buffer:3059 current 3059
<<Multi Fragment complete>>

In the above example, you can see that only the essential fields are repeated i.e. the unique identifier, source service name, fragment number, and of course the fragment payload. We will use these fields later when re-assembling fragments of a multi fragment NTA item to ensure that we are appending fragments to the correct NTA item.   

Decoding & Re-assembly Overview

As well as using standard RTDS Status & State indicators to identify connectivity or data issues, the FRAG_NUM and TOT_SIZE fields should be used to detect missing fragments.
 
The FRAG_NUM FID is set to 1 for the first Update of each item and is incremented in each subsequent Update for that item. This allows you to detect a missing fragment (and ensure the correct order of the fragments for re-assembly). 

Additionally, the TOT_SIZE contains the total size of the complete set of fragmented data in bytes. By comparing TOT_SIZE with the sum of the fragment sizes received, it should allow you to confirm when all the fragments for an item have been received and the NTA item is complete.

Using the FRAG_NUM and TOT_SIZE to detect outages requires the consumer to implement timeout functionality because we need to allow time for the missing fragments to reach the consumer before assuming they have been missed. This functionality is outside the scope of this tutorial.

Finally, as the FRAGMENT field contains compressed data, we will need to use the zlib library to decompress the payload to access the true data content. Note that for a multi-fragment NTA item, you will have to ensure you have received all the fragments before unzipping the complete multi-fragment buffer.

To request NTA domain items and decode & re-assemble the NTA item we will modify the Consumer200 example application as follows:

  1. Explicitly specify NEWS_TEXT_ANALYTICS as the domain model type when creating our ReqMsg.
  2. Specify “MRN_STORY” as the RIC Code for the ReqMsg (to request NTA type of  Story)
  3. Modify the onRefreshMsg and onUpdateMsg handlers to only accept messages with a NEWS_TEXT_ANALYTICS domain type.
  4. Create a new class to represent a News Text Analytics item with some helper methods to assemble multi fragment items.
  5. Process and decode the FieldList payload of NEWS_TEXT_ANALYTICS message. 

NewsTextAnalytics Msg Model Type Request & Response Handlers

To request a News Text Analytics domain model MRN_STORY item we need to change the main() method to explicitly specify the domain and specify the item name when creating our ReqMsg:

    	
            

int main( int argc, char* argv[] )

...

// Request NTA domain MRN_STORY type items 

consumer.registerClient(ReqMsg().serviceName("ELEKTRON").name("MRN_STORY").domainType(MMT_NEWS_TEXT_ANALYTICS),client);

...

 

}

Next we modify the AppClient::onRefreshMsg and AppClient::onUpdateMsg methods to only accept the NEWS_TEXT_ANALYTICS as a valid domain type:

    	
            

// RefreshMsg contains feed specific metadata - will not contain NTA item fragment 

void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ) 

{

if ( refreshMsg.hasMsgKey() )

cout << endl << "Item Name: " << refreshMsg.getName() << endl << "Service Name: " << refreshMsg.getServiceName();

 

cout << endl << "Item State: " << refreshMsg.getState().toString() << endl;

 

// Confirm the payload and domain type and dump the metadata Fields to the console

if (DataType::FieldListEnum == refreshMsg.getPayload().getDataType() && (MMT_NEWS_TEXT_ANALYTICS == refreshMsg.getDomainType()))

decode(refreshMsg.getPayload().getFieldList());

else

cerr << endl << "Wrong Domain and/or Payload " << refreshMsg.getDomainType() << " " << refreshMsg.getPayload().getDataType() << endl;

}

As mentioned earlier, the Refresh message does not contain any NTA data fragments so we can use the existing AppClient::decode() method to output the meta data fields to the console.

For the Update message however, we will need to use a custom decodeNTA() method which we will define later on: 

    	
            

// UpdateMsg contains NTA item fragment and item related metadata 

void AppClient::onUpdateMsg( const UpdateMsg& updateMsg, const OmmConsumerEvent& ) 

{

if ( updateMsg.hasMsgKey() )

cout << endl << "Item Name: " << updateMsg.getName() << endl << "Service Name: " << updateMsg.getServiceName() << endl;

// Confirm the payload and domain type is as expected and then attempt to extract the NTA item 

if (DataType::FieldListEnum == updateMsg.getPayload().getDataType() && (MMT_NEWS_TEXT_ANALYTICS == updateMsg.getDomainType()))

decodeNTA(updateMsg.getPayload().getFieldList());

else

cerr << endl << "Wrong Domain and/or Payload " << updateMsg.getDomainType() << " " << updateMsg.getPayload().getDataType() << endl;

 

}

In its standard form the existing AppClient::decode() method would dump any compressed FRAGMENT field contents as garbage to the console. To avoid this we can change the AppClient::decode() method to skip the output of any Buffer type fields:

    	
            

void AppClient::decode( const FieldList& fl )

{

...

case DataType::BufferEnum:

cout << "<BUFFER>" << endl;

break;

...

 

}

In addition to the FRAGMENT field, a number of other fields are used to deliver the metadata for the NTA item. So, we need methods to extract the metadata and to re-assemble the data fragment(s). 

To ease readability and simplify the AppClient methods we can create a new Class to represent the NTA item with some methods to help with the decoding and re-assembly of the NTA data fragments.

The NTAItem Class

The key functions of the Class simplify the decoding process and the re-assembly of multi-fragment messages.

    	
            

class NTAItem

{

    public:

        NTAItem();

        ~NTAItem();

 

        // Initialise instance with Update field list

        void init(const refinitiv::ema::access::FieldList& envelope);

        

        // Append an additional fragments to this instance

        void addFragment(const NTAItem& fragment);

        

        // Have we received all the fragments i.e. TOT_SIZE number of bytes ?

        bool isComplete() const { return (_expectedSize > 0) && (_expectedSize == _ntaBuffer.size()); };

 

        const std::string& getBuffer() const { return _ntaBuffer; };

 

        const refinitiv::ema::access::EmaString& getGUID() const { return _guid; };

        const refinitiv::ema::access::EmaString& getSource() const{ return _mrnSource; };

        const refinitiv::ema::access::EmaString& getType() const{ return _mrnType; };

        refinitiv::ema::access::UInt64 getFragNum() const { return _fragNum; };

 

    private:

        std::string _ntaBuffer;

        refinitiv::ema::access::EmaString _guid;

        refinitiv::ema::access::EmaString _mrnSource;

        refinitiv::ema::access::EmaString _mrnType;

        refinitiv::ema::access::UInt64 _fragNum;

        refinitiv::ema::access::UInt64 _expectedSize;

 

};

For multi-fragment messages we need a method to check the validity of the received fragment and to build up the fragment buffer.

    	
            

void NTAItem::addFragment(const NTAItem& fragment)

{

    if (fragment._guid != _guid)

        throw std::runtime_error("Cannot add fragment to item: mismatching GUID");

    if (fragment._mrnSource != _mrnSource) 

        throw std::runtime_error("Cannot add fragment to item: mismatching data source");

    if (fragment._fragNum != _fragNum + 1) 

        throw std::runtime_error("Cannot add fragment to item: fragment number is not in sequence");

 

    _fragNum = fragment._fragNum;

    _ntaBuffer.append(fragment._ntaBuffer.begin(), fragment._ntaBuffer.end());

 

    cout << "Add Fragment:: Expected total buffer:" << _expectedSize << " current " << _ntaBuffer.size() << std::endl;

}

Decode Update Message Envelope

With the NTA class now defined, we can continue with the decoding of the Update message to extract the fields, build up the NTA item and output to console when complete:

    	
            

// Handle an Update message which contains a NTA Item Fragment

void AppClient::decodeNTA(const FieldList& envelope)

{

    cout << "DecodeNTA" << endl;

 

    // Dump the FieldList first for informational purposes

    decode(envelope);

 

    NTAItem item;

    // Now lets extract the relevant fields into a NTA Item - including the Fragment

    item.init(envelope);

 

    // Is this 1st fragment of a NTA Item?

    if (item.getFragNum() == 1)

    {

        // Is it the complete NTA Item ?

        if (item.isComplete()) // i.e. check length of this single fragment buffer == expected total size of NTA Item

        {

            std::cout << "<<Single Fragment >>" << endl;

            string unzipped;

            unzip(item.getBuffer(), unzipped); // decompress the buffer

            std::cout << unzipped << endl << endl; // Display the decompressed NTA Item

            return;

        }

        else

        { // Start of a multi-fragment item

            _multiFragItem = item; // so copy this first fragment into our pending item instance

        }

            

    }

    else

    {

        ...

    }

}

As you can see, if the NTA item is completed in a single fragment, we can immediately unzip and output to the screen. 

However, if FRAG_NUM=1 and buffer is incomplete then this is the first of a multi-fragment message. We initialize our multi-fragment storage instance with this initial set of fields and buffer, in expectation of one or more additional Update messages

Build up Multi Fragment until complete

When we receive a subsequent Update message, we extract the fragment and add it to the multi-fragment storage instance until all fragments have been received i.e. the total buffer size equals the expected total size. Once the buffer is complete we can unzip the full buffer and output to the screen.

    	
            

// Handle an Update message which contains a NTA Item Fragment

void AppClient::decodeNTA(const FieldList& envelope)

{

    ...

 

    // Is this 1st fragment of a NTA Item?

    if (item.getFragNum() == 1)

    {

        ...

    }

    else

    { // FRAG_NUM > 1 so continue to build up NTA Item

        _multiFragItem.addFragment(item); // Add the newly received fragment to the pending instance

        if (_multiFragItem.isComplete()) // Does length of the buffer now == expected total size of NTA Item ?

        {

            std::cout << "<<Multi Fragment complete>>" << endl;

            string unzipped;

            unzip(_multiFragItem.getBuffer(), unzipped); // Decompress the complete buffer

            std::cout << unzipped << endl << endl; // Display the decompressed NTA Item

            return;

        }

    }

}

The NTA item output is in the form of a JSON UTF-8 string.

We will not include the unzip code here as it is not EMA specific - the full method can be found in the accompanying source files.

Build and Run

Follow the same steps in previous tutorials to build and run.  If you encounter any compile or linker errors, amend the project settings to reference the correct file paths for your local installation.

As the connectivity parameters, service name, etc are hardcoded into the main() method you will need to amend these to reflect your environment before running the application. Additionally, the supplied EmaConfig.xml has been configured to use a Channel Dictionary – i.e. downloaded from the server. To use a local dictionary you will need to amend the file accordingly.

The RIC code has been hardcoded to request “MRN_STORY” items – this can be changed to request alternative items such as MRN_TRNA, MRN_TRSI.

In the example output shown below, we requested News Text Analytics Story items. 

Example output from an MRN_STORY response

    	
            <<Single Fragment >>: { "Type":"STORY", "MajorVersion":"2", "MinorVersion":"10", "Items":[ {"altId":"nWNAB08PBR","audiences":["NP:E","NP:FMA","NP:U"],"body":"","firstCreat ed":"2015-10-16T12:00:21.000Z","headline":"ABM TO REDUCE SOUTHERN CALIFORNIA FED ERAL BUILDINGS' ENERGY AND OPERATING COSTS BY NEARLY $50 MILLION<ABM.N>","id":"W NAB08PBR_1510161zh3TLBHso9X5kvOXyLTUxfQwv9rMnrJYOyiZq","instancesOf":[],"languag e":"en","mimeType":"text/plain","provider":"NS:RTRS","pubStatus":"stat:usable"," subjects":["R:ABM.N","P:5000007999","B:195","B:34","B:43","B:49","BL:52","G:4"," G:6J","M:NY","M:Z","N2:BSUP","N2:INDS","N2:ISER","N2:CMSS","N2:BUS","N2:AMERS"," N2:US","N2:BLR","N2:CMPNY"],"takeSequence":1,"urgency":1,"versionCreated":"2015- 10-16T12:00:21.000Z"} ] }
        
        
    

Note: You may occasionally see what looks like garbage output in the completed NTA console output; this is due to foreign language characters which cannot be represented correctly on the console. Displaying this using the appropriate character set is outside the scope of this tutorial. 

Tutorial Summary

So now that we've come to the end of the tutorial let's summarise what this example has demonstrated and what we've learned. The key goal was to show how to request and parse News Text Analytics data. We learned that the Initial Refresh message contains only metadata. Subsequent Update messages deliver the NTA item in one or more fragments. If delivered as a single fragment we extract the buffer and unzip to obtain the NTA item. If delivered as multiple fragments, they need to be concatenated in sequence to obtain the complete buffer, before unzipping.

Multiple NTA items

This tutorial deals with requesting a single NTA item which makes the handling of multi-fragment items relatively straightforward. When requesting multiple NTA items you need to allow for the Update messages for the different items arriving interspersed between each other. In view of this the GUID and Source should be used to confirm a fragment is part of the same NTA item. As Update messages for multi-fragment messages arrive you would need to ensure the additional fragments are added to the correct incomplete item.

One final thing to note that the AppClient::unzip method - as with the rest of the source code - is provided for illustration purposes only and therefore should not form the basis of a production application. 

Thank you for reading.

References

For more information on the NTA domain and MRN Data model, please take a look at the following notes:
MRN Data Model and Real-Time Implementation Guide - Summary of NTA domain, MRN data model, sample output, and implementation commentary.