ETA Tutorial 6 - Request & Decode Machine Readable News

Last update: Dec 2020
Environment: Windows, Linux
Compilers: JDK 1.7 or greater
Prerequisite: Complete the first 4 ETA tutorials in this series

Introduction

This tutorial assumes you are familiar with consuming MarketPrice data from RTDS using ETA Java and that, as a minimum, you have worked through the ETA Java Tutorial on the LSEG Developer portal. If you have not, or if you are not familiar with the concepts it covers, we strongly recommend completing that tutorial first.

For this MRN tutorial we will use the source code from Step 4 of the ETA Java Tutorial as the starting point. We will amend it and add the code required to illustrate how to request and decode News Text Analytics (NTA) domain data from a Machine Readable News (MRN) service.

The News Text Analytics domain is designed for publishing large complex nested data structures over the Real-Time network using a FieldList-based Envelope. 

Whilst RTDS can deliver large payloads, NTA data items can extend to several kilobytes, so to optimise transport they may be fragmented and delivered as multiple envelope messages. Each envelope message has several fields to hold metadata and one field to hold the actual data (fragment) itself. To further optimise transport, the NTA data item is converted to a JSON UTF-8 string and compressed using zlib before being split into fragments (if required) as part of the message encoding process. The consumer needs to re-assemble the fragments and unzip the completed buffer before it can be used.
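The encode, split, re-assemble and decompress sequence described above can be sketched with plain JDK classes. This is a minimal illustration only, not the feed's actual encoder: the class and method names are hypothetical, the fragment size is arbitrary, and it assumes the gzip container read by java.util.zip.GZIPInputStream, which is the class the decompression code later in this tutorial uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, for illustration only (not part of ETA).
class EnvelopeRoundTrip {

    // Publisher side (illustrative): JSON text -> UTF-8 bytes -> gzip.
    static byte[] compress(String json) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return out.toByteArray();
    }

    // Split the compressed bytes into fragments of at most maxFragmentSize bytes.
    static List<byte[]> split(byte[] data, int maxFragmentSize) {
        List<byte[]> fragments = new ArrayList<byte[]>();
        for (int offset = 0; offset < data.length; offset += maxFragmentSize) {
            int len = Math.min(maxFragmentSize, data.length - offset);
            byte[] fragment = new byte[len];
            System.arraycopy(data, offset, fragment, 0, len);
            fragments.add(fragment);
        }
        return fragments;
    }

    // Consumer side: concatenate the fragments in order, then decompress once.
    static String reassemble(List<byte[]> fragments, int totalSize) {
        byte[] whole = new byte[totalSize];
        int offset = 0;
        for (byte[] fragment : fragments) {
            System.arraycopy(fragment, 0, whole, offset, fragment.length);
            offset += fragment.length;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(whole))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Note that the individual fragments are not decompressed separately: only the concatenated buffer is a valid compressed stream.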

In view of the above, processing a response to an NTA request is more involved than processing a typical MarketPrice response.

Description

The News Text Analytics Domain Model describes how the NTA data items are modelled using OMM. It makes use of the FieldList OMM container for the metadata and the zlib library to decompress the compressed NTA fragments.

Initial Refresh Message

The Initial Refresh response does not contain any NTA data, but the relevant feed-related and other static fields are populated with appropriate values. For example, an MRN_STORY Refresh could contain:

    	
            

MRN_STORY

DOMAIN: NEWS_TEXT_ANALYTICS

REFRESH: State: Open/Ok/None - text: "All is well"

    1/PROD_PERM: 10001

    17/ACTIV_DATE: 17 FEB 2016 

    259/RECORDTYPE: 30

    1709/RDN_EXCHD2: MRN(1370)

    4148/TIMACT_MS: 73649763

    4271/GUID: <blank data>

    5357/CONTEXT_ID: 3752.0

    6401/DDS_DSO_ID: 12424

    6480/SPS_SP_RIC: .[SPSML1L1

    8506/MRN_V_MAJ: 2

    8593/MRN_TYPE: STORY

    11787/MRN_V_MIN: 10

    12215/MRN_SRC: DTC_STG_A

    32479/FRAG_NUM: 0

    32480/TOT_SIZE: 0

    32641/FRAGMENT: <blank data>

As you can see, the FRAGMENT field which would contain the NTA data fragments is empty – and the GUID, FRAG_NUM and TOT_SIZE fields are zero or empty.

Update Messages

In the subsequent Update messages, however, these fields will be populated, but the feed-related / static fields contained in the Refresh response will not be repeated. For example:

    	
            

DOMAIN: NEWS_TEXT_ANALYTICS

UPDATE

    4148/TIMACT_MS: 32525441

    17/ACTIV_DATE: 18 FEB 2016 

    8593/MRN_TYPE: STORY

    8506/MRN_V_MAJ: 2

    11787/MRN_V_MIN: 10

    32480/TOT_SIZE: 779

    32479/FRAG_NUM: 1

    4271/GUID: HKSc2KHDD_1602182Wz6hiVtw1GMHCJkN6bVymxuHCVOUEDF8aAfzr

    12215/MRN_SRC: DTC_STG_A

    32641/FRAGMENT: <BUFFER>

Some of the key Fields to note: 

  • MRN_TYPE : Type of NTA item - Story, News Analytics, News Sentiment Indices
  • TOT_SIZE : Total size in bytes of fragmented data
  • FRAG_NUM : Sequential fragment number
  • GUID : Globally Unique Identifier for the NTA Item
  • MRN_SRC : Source component that published this NTA Item
  • FRAGMENT : zlib compressed data fragment

Multi Fragment Items

The other point to note is that, for a multi-fragment item, Update messages with FRAG_NUM > 1 carry fewer FIDs, because the metadata is included in the first Update message (FRAG_NUM=1) for that item, as shown below:

    	
            

DOMAIN: NEWS_TEXT_ANALYTICS

UPDATE

    4148/TIMACT_MS: 32585844

    17/ACTIV_DATE: 18 FEB 2016 

    8593/MRN_TYPE: STORY

    8506/MRN_V_MAJ: 2

    11787/MRN_V_MIN: 10

    32480/TOT_SIZE: 3205

    32479/FRAG_NUM: 1

    4271/GUID: DJBR0067D_1602183AopLN6JH2bGrv4OyamJ3p8OgYyUXZeOj78lU4

    12215/MRN_SRC: DTC_STG_A

    32641/FRAGMENT: <BUFFER>

...

...

DOMAIN: NEWS_TEXT_ANALYTICS

UPDATE

    4271/GUID: DJBR0067D_1602183AopLN6JH2bGrv4OyamJ3p8OgYyUXZeOj78lU4

    12215/MRN_SRC: DTC_STG_A

    32479/FRAG_NUM: 2

    32641/FRAGMENT: <BUFFER>

In the above example you can see that only the essential fields are repeated, i.e. the unique identifier, source name, fragment number and, of course, the fragment payload. We will use these fields later when re-assembling the fragments of a multi-fragment NTA item to ensure that we are appending fragments to the correct NTA item.

Decoding & Re-assembly Overview

As well as using standard RTDS Status & State indicators to identify connectivity or data issues, the FRAG_NUM and TOT_SIZE fields should be used to detect missing fragments.

The FRAG_NUM FID is set to 1 for the first Update of each item and is incremented in each subsequent Update for that item. This allows you to detect a missing fragment (and ensure correct order of the fragments for re-assembly). 

Additionally, TOT_SIZE contains the total size in bytes of the complete set of fragmented data. By comparing TOT_SIZE with the sum of the fragment sizes received, you can confirm when all the fragments for an item have arrived and the NTA item is complete.
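As a rough sketch of these two checks, the class below tracks the last FRAG_NUM accepted and the running byte count for one item. FragmentTracker and its method names are hypothetical and are not part of the tutorial source; the fragment sizes in the usage note are made up for illustration.

```java
// Hypothetical helper, for illustration only (not part of ETA or the tutorial source).
class FragmentTracker {
    private final int totalSize;   // TOT_SIZE taken from the first fragment
    private int lastFragNum = 0;   // last FRAG_NUM accepted
    private int receivedBytes = 0; // sum of fragment sizes accepted so far

    FragmentTracker(int totalSize) {
        this.totalSize = totalSize;
    }

    // Returns true if this is the next fragment expected and it fits within TOT_SIZE.
    boolean accept(int fragNum, int fragmentLength) {
        if (fragNum != lastFragNum + 1) {
            return false; // gap or out-of-order fragment
        }
        if (receivedBytes + fragmentLength > totalSize) {
            return false; // more bytes than TOT_SIZE announced
        }
        lastFragNum = fragNum;
        receivedBytes += fragmentLength;
        return true;
    }

    // The item is complete once the accepted bytes add up to TOT_SIZE.
    boolean isComplete() {
        return totalSize > 0 && receivedBytes == totalSize;
    }
}
```

For an item with TOT_SIZE 3205 delivered as (say) 2000 and 1205 bytes, accept(1, 2000) followed by accept(2, 1205) makes isComplete() return true, whereas receiving FRAG_NUM 3 straight after FRAG_NUM 1 is rejected.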

Using the FRAG_NUM and TOT_SIZE to detect outages requires the consumer to implement timeout functionality, because we need to allow time for the missing fragments to reach the consumer before assuming they have been missed. This functionality is outside the scope of this tutorial.
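Although the tutorial code does not implement a timeout, the idea can be sketched as follows. This is only one possible approach: PendingItemTimer is a hypothetical name, the timeout value is an assumption you would tune for your environment, and the caller is expected to pass in the current time (for example from System.currentTimeMillis()).

```java
// Hypothetical helper, for illustration only (not part of the tutorial source).
class PendingItemTimer {
    private final long timeoutMillis;  // how long to wait for the next fragment (assumed value)
    private long lastFragmentAtMillis; // time the most recent fragment arrived

    PendingItemTimer(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastFragmentAtMillis = nowMillis;
    }

    // Call whenever another fragment of the pending item arrives.
    void onFragment(long nowMillis) {
        lastFragmentAtMillis = nowMillis;
    }

    // True if no fragment has arrived within the timeout, so the item should be treated as lost.
    boolean isExpired(long nowMillis) {
        return nowMillis - lastFragmentAtMillis > timeoutMillis;
    }
}
```

A consumer could check isExpired() periodically for each incomplete item and discard those that have expired.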

Finally, as the FRAGMENT field contains compressed data, we will need to use the Java built-in classes to decompress the payload to access the true data content. Note that for a multi-fragment NTA item, you will have to ensure you have received all the fragments before unzipping the complete multi-fragment buffer.

To request NTA domain items and to decode and re-assemble the NTA item, we will modify the example application from Step 4 of the ETA Java Tutorial as follows:

  1. Explicitly specify NEWS_TEXT_ANALYTICS as the domain model type when creating our request message.
  2. Specify “MRN_STORY” as the RIC Code for the request message.
  3. Modify the processRDMResponse handler to only accept messages with a NEWS_TEXT_ANALYTICS domain type.
  4. Create a new class to represent a News Text Analytics item with some helper methods to assemble multi fragment items.
  5. Process and decode the FieldList payload of NEWS_TEXT_ANALYTICS message. 
  6. Decompress the completed NTA item using Java built-in classes and display the news.

NewsTextAnalytics Msg Model Type Request & Response Handlers

To request a News Text Analytics domain model MRN_STORY item we need to change the domainType variable to specify the domain and itemNames variable to specify the item name when creating our request message:

    	
            

// Item name    

    private static final List<String> itemNames = Arrays.asList("MRN_STORY");

// Domain Type

    private static final int domainType = DomainTypes.NEWS_TEXT_ANALYTICS;

...

Next we modify the basicMsgHandler.processRDMResponse method to accept only NEWS_TEXT_ANALYTICS as a valid domain type:

    	
            

// RefreshMsg contains feed specific metadata - will not contain NTA Item fragment

int processRDMResponse(Msg msg, DecodeIterator dIter, DataDictionary dictionary)

{

       ...

// Confirm domain type

if((msg.domainType()!=DomainTypes.NEWS_TEXT_ANALYTICS)) {

output.append("Wrong Domain:" + msg.domainType());

return retval;

}

switch (msg.msgClass())

{ //Confirm payload and dump the metadata Fields to the console

case MsgClasses.REFRESH:

switch (msg.containerType()) 

{

case DataTypes.FIELD_LIST:

retval = decodeFieldList(msg, dIter, dictionary);

break;

default:

output.append("Unable to process RDM msg with container type: " + msg.containerType());

break;

}

break;

case MsgClasses.UPDATE:

// Confirm payload and dump the metadata Fields to the console

switch (msg.containerType())

{

case DataTypes.FIELD_LIST:

retval = decodeNewsTextAnalytics(msg,dIter,dictionary);

break;

default:

output.append("Unable to process RDM msg with container type: " + msg.containerType());

break;

}

break;

...

As mentioned earlier, the Refresh message does not contain any NTA data fragments, so we can use the existing basicMsgHandler.decodeFieldList() method to output the metadata fields to the console. For the Update message, however, we will need a custom decodeNewsTextAnalytics() method, which we will define later on.

In addition to the FRAGMENT field, a number of other fields are used to deliver the metadata for the NTA item. So, we need methods to extract the metadata and to re-assemble the data fragment(s). 

To ease readability and simplify the basicMsgHandler methods we can create a new Class to represent the NTA item with some methods to help with the decoding and re-assembly of the NTA Data fragments.

The NewsTextAnalyticsItem Class

The key functions of the Class simplify the decoding process and the re-assembly of multi-fragment messages.

    	
            

class NewsTextAnalyticsItem 

{

int fragNum, expectedSize, bufferSize;

String guid, source, type;

final static int GUID_FID=     4271;//RMTES_STRING

final static int MRN_TYPE_FID= 8593;//RMTES_STRING

final static int MRN_SRC_FID =12215;//RMTES_STRING

final static int FRAG_NUM_FID=32479;//UINT

final static int TOT_SIZE_FID=32480;//UINT

final static int FRAGMENT_FID=32641;//BUFFER

 

// Reusable variables used for decoding

private FieldList fieldList = CodecFactory.createFieldList();

private FieldEntry fieldEntry = CodecFactory.createFieldEntry();

private UInt fidUIntValue = CodecFactory.createUInt();

private Int fidIntValue = CodecFactory.createInt();

private Real fidRealValue = CodecFactory.createReal();

private Enum fidEnumValue = CodecFactory.createEnum();

private Date fidDateValue = CodecFactory.createDate();

private Time fidTimeValue = CodecFactory.createTime();

private DateTime fidDateTimeValue = CodecFactory.createDateTime();

private Float fidFloatValue = CodecFactory.createFloat();

private Double fidDoubleValue = CodecFactory.createDouble();

private Buffer fidBufferValue = CodecFactory.createBuffer();

private byte[] buffer;

 

// Buffer to hold output

StringBuilder output = new StringBuilder();

...

// Have we received all the fragments i.e. TOT_SIZE number of bytes ?

public boolean isComplete() {

return ((expectedSize > 0) && (expectedSize == bufferSize));

}

Firstly, we define the method used to initialise the NewsTextAnalyticsItem instance and extract the fields from the envelope:

    	
            

// Initialise instance with Update field list

protected int decodeFieldEntry(FieldEntry fEntry, DecodeIterator dIter, DataDictionary dictionary) 

{

// get dictionary entry

DictionaryEntry dictionaryEntry = dictionary.entry(fEntry.fieldId());

// return if no entry found

if (dictionaryEntry == null)

{

output.append("\tFid " + fEntry.fieldId() + " not found in dictionary");

return CodecReturnCodes.SUCCESS;

}

// print out fid name

output.append("\t" + fEntry.fieldId() + "/" + dictionaryEntry.acronym().toString() + ": ");

// decode and print out fid value

int dataType = dictionaryEntry.rwfType();

int ret = 0;

switch (dataType)

{

case DataTypes.UINT:

ret = fidUIntValue.decode(dIter);

if (ret == CodecReturnCodes.SUCCESS)

{

output.append(fidUIntValue.toLong());

switch(fEntry.fieldId()) {

case FRAG_NUM_FID:// Fragment Number

fragNum = (int) fidUIntValue.toLong();

break;

case TOT_SIZE_FID:// Expected total size of all fragments

expectedSize = (int) fidUIntValue.toLong();

break;

}

}

else if (ret != CodecReturnCodes.BLANK_DATA)

{

output.insert(0, "DecodeUInt() failed: <" + CodecReturnCodes.toString(ret) + ">\n");

return ret;

}

break;

...

case DataTypes.BUFFER:

ret = fidBufferValue.decode(dIter);

if (ret == CodecReturnCodes.SUCCESS)

{

if(fEntry.fieldId()==FRAGMENT_FID) 

{// The zipped Fragment buffer

//Write "<BUFFER>" on the console

output.append("<BUFFER>");

Buffer tmpBuffer = null;

tmpBuffer = fEntry.encodedData();

//if it is the first Fragment keeping the complete news,

//its length should be the TOT_SIZE kept in expectedSize variable

if(fragNum==1)

buffer = new byte[expectedSize];

//otherwise, it keeps a fragment.

else

buffer = new byte[tmpBuffer.length()];

tmpBuffer.copy(buffer);

//update the size

bufferSize +=  tmpBuffer.length();

}

}

else if (ret != CodecReturnCodes.BLANK_DATA)

{

output.append("DecodeBuffer() failed: <" + CodecReturnCodes.toString(ret) + ">");

return ret;

}

break;

case DataTypes.ASCII_STRING:

case DataTypes.UTF8_STRING:

case DataTypes.RMTES_STRING:

if (fEntry.encodedData().length() > 0)

{

output.append(fEntry.encodedData().toString());

switch(fEntry.fieldId()) 

{ //Get the Field ID for this field

case GUID_FID:// The Unique (source specific) ID for the NTA Item

guid=fEntry.encodedData().toString();

break;

case MRN_TYPE_FID:// The type of NTA Item e.g Story, Analytics etc

type=fEntry.encodedData().toString();

break;

case MRN_SRC_FID:// The component which published the data

source=fEntry.encodedData().toString();

break;

}

}

else

{

ret = CodecReturnCodes.BLANK_DATA;

}

break;

default:

output.append("Unsupported data type (" + DataTypes.toString(dataType) + ")");

break;

}

if (ret == CodecReturnCodes.BLANK_DATA)

{

output.append("<blank data>");

}

return CodecReturnCodes.SUCCESS;

}

For multi-fragment messages we need a method to check the validity of the received fragment and to build up the fragment buffer:

    	
            

// Append an additional fragment to this instance

public int addFragment(NewsTextAnalyticsItem fragment) 

{

int ret = CodecReturnCodes.SUCCESS;

if(!fragment.guid.equals(guid)) {

output.append("Error during processing news : \nCannot add fragment to news guid=" + guid+ ". Expected Guid:" + guid + " , Received: " + fragment.guid +"\n");

return ret;

}

if(!fragment.source.equals(source)) {

output.append("Error during processing news : \nCannot add fragment to news guid=" + guid+ ". Expected Source:" + source + " , Received: " + fragment.source +"\n");

return ret;

}

if(fragment.fragNum != fragNum + 1) {

output.append("Error during processing news : \nCannot add fragment to news guid=" + guid+ ". Expected Fragment No.:" + (fragNum + 1) + " , Received: " + fragment.fragNum +"\n");

return ret;

}

fragNum = fragment.fragNum;

System.arraycopy(fragment.getBuffer(), 0, buffer, bufferSize, fragment.getBufferSize());

bufferSize += fragment.getBufferSize();

output.append("Additional fragment size: " + fragment.getBufferSize()+"\n");

output.append("Add Fragment:: Expected total buffer:" + expectedSize + " current " + bufferSize + "\n");  

return ret;

}
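The buffer handling in addFragment() relies on the first fragment having allocated TOT_SIZE bytes, so that later fragments can be copied in at the current offset. The standalone sketch below isolates that technique and adds a bounds check before the copy. FragmentBuffer is a hypothetical class, not part of the tutorial source, and the bounds check is an addition that the method above does not perform.

```java
// Hypothetical helper, for illustration only (not part of the tutorial source).
class FragmentBuffer {
    private final byte[] buffer; // sized once from TOT_SIZE
    private int used = 0;        // number of bytes copied in so far

    FragmentBuffer(int totalSize) {
        buffer = new byte[totalSize];
    }

    // Copies the fragment directly after the data received so far.
    // Returns false, and copies nothing, if the fragment would overflow the buffer.
    boolean append(byte[] fragment) {
        if (fragment.length > buffer.length - used) {
            return false;
        }
        System.arraycopy(fragment, 0, buffer, used, fragment.length);
        used += fragment.length;
        return true;
    }

    // Complete once every byte announced by TOT_SIZE has been filled.
    boolean isComplete() {
        return buffer.length > 0 && used == buffer.length;
    }

    byte[] bytes() {
        return buffer;
    }
}
```

Without such a check, a fragment larger than the remaining space would cause System.arraycopy to throw an IndexOutOfBoundsException.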

Decode Update Message Envelope

With the NewsTextAnalyticsItem class now defined, we can continue with the decoding of the Update message to extract the fields, build up the NTA item and output to console when complete:

    	
            

//decode UpdateMsg containing NTA Item fragment and item related metadata

private int decodeNewsTextAnalytics(Msg msg, DecodeIterator dIter, DataDictionary dictionary)  

{

int ret = CodecReturnCodes.SUCCESS;

NewsTextAnalyticsItem item = new NewsTextAnalyticsItem();

// Dump the FieldList and extract the relevant fields into a NTA item - including the Fragment

ret = item.decodeFieldList(msg, dIter, dictionary);

System.out.println(item.getOutputString());

// Is this 1st fragment of a NTA Item?

if(item.getFragNum() == 1)

{

// Is it the complete NTA Item?

if(item.isComplete()) // i.e. check length of this single fragment buffer == expected total size of NTA Item

{

output.append("<<Single Fragment item>>:\n");

writeNewsToScreen(item); // decompress and display NTA item to screen

}

else {// Start of a multi-fragment item

multiFragItem = item; // so copy this first fragment into our pending item instance

output.append("Init:: Expected total buffer size:").append(multiFragItem.getExpectedSize()).append(" current size:").append(multiFragItem.getBufferSize());

}

}

else

{ // FRAG_NUM > 1 so continue to build up NTA Item

if(multiFragItem==null) //miss first fragment

{

output.append("Error during processing news : \nMissing the first Fragment of news guid=" + item.getGuid() + "\n");

} else {

// Add the newly received fragment to the pending instance

ret = multiFragItem.addFragment(item);

System.out.println(multiFragItem.getOutputString());

// Does length of the buffer now == expected total size of NTA Item ?

if(multiFragItem.isComplete())

{

output.append("<<Multi Fragment complete>>:\n");

writeNewsToScreen(multiFragItem); // decompress and display complete NTA item to screen

multiFragItem = null;

}

}

}

return ret;

}

As you can see, if the NTA item is completed in a single fragment, we can immediately unzip and output to the screen. However, if this is the first of a multi-fragment message, we initialize our multi-fragment storage item with this initial FieldList. 

Unzip and output to screen

When we receive further fragments, we add them to the multi-fragment item. Once all the fragments have been received, we can unzip the full buffer and output it to the screen.

    	
            

//uncompress a complete NTA item

private int uncompressData(byte[] compressedData,int length)

{

int ret = CodecReturnCodes.SUCCESS;

try

{

java.io.ByteArrayInputStream bytein = new java.io.ByteArrayInputStream(compressedData);

java.util.zip.GZIPInputStream gzin = new java.util.zip.GZIPInputStream(bytein);

java.io.ByteArrayOutputStream byteout = new java.io.ByteArrayOutputStream();

int res = 0;

byte buf[] = new byte[length];

while (res >= 0) {

res = gzin.read(buf, 0, buf.length);

if (res > 0) {

byteout.write(buf, 0, res);

}

}

byte uncompressed[] = byteout.toByteArray();

newsContent = new String(uncompressed, java.nio.charset.StandardCharsets.UTF_8); // the NTA payload is a UTF-8 JSON string

}

catch (Exception e)

{

//if failed, record the exception and return FAILURE code.

output.append("Decompress Failed : " + e.getMessage() + "\n");

ret = CodecReturnCodes.FAILURE;

}

return ret;

}

// uncompress and display complete NTA item to screen

private void writeNewsToScreen(NewsTextAnalyticsItem item) {

output.append("New of Guid="+item.getGuid()+ ", source="+ item.getSource()+ ", type=" + item.getType() + "\n\n");

int ret = uncompressData(item.getBuffer(), (int) item.getExpectedSize());

//if NTA item is uncompressed successfully, display it.

if(ret == CodecReturnCodes.SUCCESS)

output.append(newsContent);

}

The NTA Item output is in the form of a JSON UTF-8 string.

Build and Run

Before building and running this tutorial, you need to set up the environment variables. Please refer to the Establish an ETA Java build environment section of ETA Tutorial 1 - Creating a starter consumer application.

The connectivity parameters (host, port, service name, etc.) are hardcoded into the basicConsumer class, so you will need to amend these to reflect your environment before running the application. Additionally, the tutorial tries to load dictionaries from local files which are hardcoded into the basicDictionaryHandler class; if that fails, it will attempt to download the dictionaries from the provider. To use your own local dictionary files, amend the basicDictionaryHandler class accordingly. The RIC code has been hardcoded to request MRN_STORY items; this can be changed to request alternative items such as MRN_TRNA or MRN_TRSI. After amending any classes, rebuild the application for the changes to take effect; please refer to the Build and run section for instructions.

For example (Windows):

> buildConsumer 6
> runConsumer 6

Running the tutorial displays the response for the requested item (e.g. MRN_STORY), similar to the example below:

    	
            

<<Single Fragment item>>:

New of Guid=DJBX004DC_1602242hH9qU5iRAgvdfFvHXRVIIxISc95EIR0F+nn8X, source=DTC_S

TG_A, type=STORY

 

{"altId":"nDJBX004DC","audiences":["NP:MFDJ"],"body":"DJ MARKET TALK VALUTE: eur

o/usd su minimi da 3 settimane dopo fiducia Francia\r\n \r\n     MILANO (MF-DJ)-

-Il cambio euro/usd tratta intorno a 1,0995, sui minimi da \ntre settimane, dopo

 la pubblicazione della fiducia dei consumatori francesi che \nha deluso le atte

se. Il dato ?? infatti calato \r\n     a 95 punti a febbraio dai 97 di gennaio e

 del consenso. \r\n     alb \r\n     alberto.chimenti@mfdowjones.it \r\n  \r\n

   (END) Dow Jones Newswires\r\n \r\n ","firstCreated":"2016-02-24T08:15:49.000Z

","headline":"DJ MARKET TALK VALUTE: euro/usd su minimi da 3 settimane dopo fidu

cia Francia","id":"DJBX004DC_1602242hH9qU5iRAgvdfFvHXRVIIxISc95EIR0F+nn8X","inst

ancesOf":[],"language":"it","mimeType":"text/plain","provider":"NS:DJN","pubStat

us":"stat:usable","subjects":["A:1","A:9","N2:FRX","N2:STX"],"takeSequence":1,"u

rgency":3,"versionCreated":"2016-02-24T08:15:49.000Z"}

Note: You may occasionally see what looks like garbage output in the completed NTA console output; this is due to foreign language characters which cannot be represented correctly on the console. Displaying this using the appropriate character set is outside the scope of this tutorial. 
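Although console display is not covered by this tutorial, one common approach is sketched below: decode the bytes explicitly as UTF-8 and write them through a PrintStream that also encodes as UTF-8. Utf8Console is a hypothetical helper, and the approach only helps if the terminal itself is configured to display UTF-8.

```java
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only (not part of the tutorial source).
class Utf8Console {

    // Decode explicitly as UTF-8 rather than relying on the platform default charset.
    static String decode(byte[] utf8Bytes) {
        return new String(utf8Bytes, StandardCharsets.UTF_8);
    }

    // A PrintStream over System.out that encodes its output as UTF-8.
    static PrintStream utf8Out() {
        try {
            return new PrintStream(System.out, true, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is supported by every JVM
        }
    }
}
```

For example, Utf8Console.utf8Out().println(Utf8Console.decode(bytes)) would print an accented character such as è correctly on a UTF-8 terminal.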

Tutorial Summary

So now that we've come to the end of the tutorial let's summarise what this example has demonstrated and what we've learned. The key goal was to show how to request and parse News Text Analytics data. We learned that the Initial Refresh message contains only metadata. Subsequent Update messages deliver the NTA item in one or more fragments. If delivered as a single fragment we extract the buffer and unzip to obtain the NTA Item. If delivered as multiple fragments, they need to be concatenated in sequence to obtain the complete buffer, before unzipping.

Multiple NTA items

This tutorial deals with requesting a single NTA Item which makes the handling of multi-fragment items relatively straightforward. When requesting multiple NTA Items you need to allow for the Update messages for the different items arriving interspersed between each other. In view of this, the GUID and Source can be used to confirm a fragment is part of the same NTA Item. As Update messages for multi-fragment messages arrive you would need to ensure the additional fragments are added to the correct incomplete item.
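One way to keep interleaved fragments apart is to hold the incomplete items in a map keyed by source and GUID, as in the sketch below. This is an illustration under stated assumptions, not the tutorial's implementation: PendingItems and its method are hypothetical names, the key format is arbitrary, and an item is simply dropped when a gap is detected.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of the tutorial source).
class PendingItems {

    private static final class Pending {
        final int totalSize; // TOT_SIZE from the first fragment
        int lastFragNum;     // last FRAG_NUM appended
        final ByteArrayOutputStream data = new ByteArrayOutputStream();

        Pending(int totalSize) {
            this.totalSize = totalSize;
        }
    }

    private final Map<String, Pending> pending = new HashMap<String, Pending>();

    // Returns the complete compressed buffer once the item is finished, otherwise null.
    // totalSize is only meaningful when fragNum == 1, since later fragments omit TOT_SIZE.
    byte[] onFragment(String guid, String source, int fragNum, int totalSize, byte[] fragment) {
        String key = source + "|" + guid;
        Pending item;
        if (fragNum == 1) {
            item = new Pending(totalSize);
            pending.put(key, item); // a new first fragment replaces any stale entry
        } else {
            item = pending.get(key);
            if (item == null || fragNum != item.lastFragNum + 1) {
                pending.remove(key); // first fragment missed, or a gap: drop the item
                return null;
            }
        }
        item.lastFragNum = fragNum;
        item.data.write(fragment, 0, fragment.length);
        if (item.data.size() > item.totalSize) {
            pending.remove(key); // more data than announced: drop the item
            return null;
        }
        if (item.data.size() == item.totalSize) {
            pending.remove(key);
            return item.data.toByteArray();
        }
        return null;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The returned buffer is still compressed, so it would then be passed to the decompression step in the same way as a single-fragment item.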

One final thing to note is that this source code is provided for illustration purposes only and is not warranted. 

Thank you for reading.

References

For more information on the NTA domain and MRN Data model, please take a look at the following notes:

MRN Data Model Implementation Guide - Summary of NTA domain, MRN data model, sample output, and implementation commentary.