Download tutorial source code |
Click here to download |
---|---|
Last update | Dec 2020 |
Environment | Windows, Linux |
Compilers | JDK 1.7 or greater |
Prerequisite | Complete the first 4 ETA tutorials in this series |
This tutorial assumes you are familiar with consuming MarketPrice data from RTDS using ETA Java and that, at a minimum, you have worked through the ETA Java Tutorial on the LSEG Developer portal. If you have not worked through the ETA Java Tutorial or are not familiar with the concepts it covers, we strongly recommend that you complete it first.
For this MRN tutorial we will use the source code from Step 4 of the ETA Java Tutorial as the starting point, amending it and adding the code required to illustrate how to request and decode the News Text Analytics (NTA) domain from a Machine Readable News (MRN) service.
The News Text Analytics domain is designed for publishing large complex nested data structures over the Real-Time network using a FieldList-based Envelope.
Whilst RTDS can deliver large payloads, NTA data items can extend to several kilobytes, so to optimise transport they may be fragmented and delivered as multiple envelope messages. Each envelope message has several fields that hold metadata and one field that holds the actual data (fragment) itself. To optimise transport further, the NTA data item is converted to a JSON UTF-8 string and compressed using zlib before being split into fragments (if required) as part of the message encoding process. The consumer therefore needs to re-assemble the fragments and unzip the completed buffer before the data can be used.
As a result, processing a response to an NTA request is more involved than processing a typical MarketPrice response.
The News Text Analytics Domain Model describes how NTA data items are modelled using OMM. It uses the FieldList OMM container for the metadata and zlib compression for the NTA fragments, which the consumer must decompress.
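The publish-and-consume pipeline just described can be illustrated with the JDK's java.util.zip classes. The sketch below is not the MRN publisher's code: NtaPipelineSketch and its methods are hypothetical names, the fragment size is an arbitrary parameter, and it assumes the compressed stream is in gzip format, which is what the tutorial's decoder later reads with GZIPInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of the NTA pipeline: JSON -> UTF-8 -> gzip -> fragments,
// and the consumer-side reverse: fragments -> concatenate -> gunzip -> JSON.
class NtaPipelineSketch {

    // Publisher side (illustrative only): compress the JSON, then split it.
    static List<byte[]> compressAndSplit(String json, int maxFragmentSize) throws IOException {
        if (maxFragmentSize <= 0) {
            throw new IllegalArgumentException("maxFragmentSize must be positive");
        }
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
            gz.write(json.getBytes(StandardCharsets.UTF_8));
        } // closing the stream writes the gzip trailer
        byte[] all = compressed.toByteArray(); // all.length plays the role of TOT_SIZE
        List<byte[]> fragments = new ArrayList<>();
        for (int offset = 0; offset < all.length; offset += maxFragmentSize) {
            int len = Math.min(maxFragmentSize, all.length - offset);
            byte[] fragment = new byte[len];
            System.arraycopy(all, offset, fragment, 0, len);
            fragments.add(fragment);
        }
        return fragments;
    }

    // Consumer side: concatenate the fragments in FRAG_NUM order, then decompress once.
    static String reassembleAndDecompress(List<byte[]> fragments) throws IOException {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] fragment : fragments) {
            joined.write(fragment);
        }
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(joined.toByteArray()))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) != -1) {
                plain.write(buf, 0, n);
            }
        }
        return new String(plain.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because the fragments are slices of a single compressed stream, an individual fragment cannot be decompressed on its own; decompressing an incomplete buffer fails, which is why the consumer must wait for every fragment before unzipping.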
The initial Refresh response does not contain any NTA data, but the relevant feed-related and other static fields are populated with appropriate values. For example, an MRN_STORY Refresh could contain:
MRN_STORY
DOMAIN: NEWS_TEXT_ANALYTICS
REFRESH: State: Open/Ok/None - text: "All is well"
1/PROD_PERM: 10001
17/ACTIV_DATE: 17 FEB 2016
259/RECORDTYPE: 30
1709/RDN_EXCHD2: MRN(1370)
4148/TIMACT_MS: 73649763
4271/GUID: <blank data>
5357/CONTEXT_ID: 3752.0
6401/DDS_DSO_ID: 12424
6480/SPS_SP_RIC: .[SPSML1L1
8506/MRN_V_MAJ: 2
8593/MRN_TYPE: STORY
11787/MRN_V_MIN: 10
12215/MRN_SRC: DTC_STG_A
32479/FRAG_NUM: 0
32480/TOT_SIZE: 0
32641/FRAGMENT: <blank data>
As you can see, the FRAGMENT field, which would contain the NTA data fragment, is empty, and the GUID, FRAG_NUM and TOT_SIZE fields are empty or zero.
In subsequent Update messages, however, these fields are populated, while none of the feed-related or static fields contained in the Refresh response are. For example:
DOMAIN: NEWS_TEXT_ANALYTICS
UPDATE
4148/TIMACT_MS: 32525441
17/ACTIV_DATE: 18 FEB 2016
8593/MRN_TYPE: STORY
8506/MRN_V_MAJ: 2
11787/MRN_V_MIN: 10
32480/TOT_SIZE: 779
32479/FRAG_NUM: 1
4271/GUID: HKSc2KHDD_1602182Wz6hiVtw1GMHCJkN6bVymxuHCVOUEDF8aAfzr
12215/MRN_SRC: DTC_STG_A
32641/FRAGMENT: <BUFFER>
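Some of the key fields to note:
- GUID (4271): the unique (source-specific) identifier of the NTA item
- MRN_SRC (12215): the component that published the data
- MRN_TYPE (8593): the type of NTA item, e.g. STORY
- FRAG_NUM (32479): the fragment number, starting at 1
- TOT_SIZE (32480): the expected total size, in bytes, of all the fragments
- FRAGMENT (32641): the compressed fragment buffer

The other point to note is that, for a multi-fragment item, Update messages with FRAG_NUM > 1 carry fewer FIDs, because the metadata is included in the first Update message (FRAG_NUM = 1) for that item, as shown below: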
DOMAIN: NEWS_TEXT_ANALYTICS
UPDATE
4148/TIMACT_MS: 32585844
17/ACTIV_DATE: 18 FEB 2016
8593/MRN_TYPE: STORY
8506/MRN_V_MAJ: 2
11787/MRN_V_MIN: 10
32480/TOT_SIZE: 3205
32479/FRAG_NUM: 1
4271/GUID: DJBR0067D_1602183AopLN6JH2bGrv4OyamJ3p8OgYyUXZeOj78lU4
12215/MRN_SRC: DTC_STG_A
32641/FRAGMENT: <BUFFER>
...
...
DOMAIN: NEWS_TEXT_ANALYTICS
UPDATE
4271/GUID: DJBR0067D_1602183AopLN6JH2bGrv4OyamJ3p8OgYyUXZeOj78lU4
12215/MRN_SRC: DTC_STG_A
32479/FRAG_NUM: 2
32641/FRAGMENT: <BUFFER>
In the above example, you can see that only the essential fields are repeated, i.e. the unique identifier, source name, fragment number and, of course, the fragment payload. We will use these fields later when re-assembling the fragments of a multi-fragment NTA item, to ensure that we append each fragment to the correct NTA item.
As well as using standard RTDS Status & State indicators to identify connectivity or data issues, the FRAG_NUM and TOT_SIZE fields should be used to detect missing fragments.
The FRAG_NUM FID is set to 1 for the first Update of each item and is incremented in each subsequent Update for that item. This allows you to detect a missing fragment (and ensure correct order of the fragments for re-assembly).
Additionally, TOT_SIZE contains the total size, in bytes, of the complete set of fragmented data. By comparing TOT_SIZE with the sum of the sizes of the fragments received, you can confirm when all the fragments for an item have arrived and the NTA item is complete.
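As an illustration of these two checks, the hypothetical FragmentChecker class below (not part of ETA) tracks a single item: it rejects a FRAG_NUM that is not exactly one more than the last accepted fragment, rejects a fragment that would exceed TOT_SIZE, and reports COMPLETE once the received byte count equals TOT_SIZE. It works on plain integers, so it assumes the FRAG_NUM, TOT_SIZE and FRAGMENT length values have already been decoded from the Update message.

```java
// Hypothetical helper, not part of ETA: checks FRAG_NUM order and the running
// byte count against TOT_SIZE for one NTA item.
class FragmentChecker {
    enum Status { IN_PROGRESS, COMPLETE, OUT_OF_SEQUENCE, OVERFLOW }

    private final int totSize;    // TOT_SIZE from the FRAG_NUM=1 update
    private int lastFragNum = 0;  // last FRAG_NUM accepted (0 = none yet)
    private int receivedBytes = 0;

    FragmentChecker(int totSize) {
        this.totSize = totSize;
    }

    // Call once per Update with its FRAG_NUM and FRAGMENT length.
    // A rejected fragment leaves the checker's state unchanged.
    Status accept(int fragNum, int fragmentLength) {
        if (fragNum != lastFragNum + 1) {
            return Status.OUT_OF_SEQUENCE; // a fragment was missed or arrived out of order
        }
        if (receivedBytes + fragmentLength > totSize) {
            return Status.OVERFLOW;        // more bytes than TOT_SIZE announced
        }
        lastFragNum = fragNum;
        receivedBytes += fragmentLength;
        return receivedBytes == totSize ? Status.COMPLETE : Status.IN_PROGRESS;
    }

    int receivedBytes() {
        return receivedBytes;
    }
}
```

For example, with a TOT_SIZE of 3205 (as in the multi-fragment sample above) and hypothetical fragment sizes, accepting FRAG_NUM 1 and then FRAG_NUM 3 reports OUT_OF_SEQUENCE, signalling that fragment 2 is missing or late.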
Using FRAG_NUM and TOT_SIZE to detect outages requires the consumer to implement timeout functionality, because missing fragments must be given time to reach the consumer before they are assumed to be lost. This functionality is outside the scope of this tutorial.
Finally, as the FRAGMENT field contains compressed data, we need to use Java's built-in java.util.zip classes to decompress the payload and access the actual data content. Note that for a multi-fragment NTA item, you must receive all the fragments before unzipping the complete multi-fragment buffer.
To request NTA domain items, and to decode and re-assemble the NTA item, we will modify the example application from Step 4 of the ETA Java Tutorial as follows:
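The timeout handling is outside the scope of this tutorial, but one possible policy, an inactivity timeout measured from the last fragment received, can be sketched as below. PendingItemTimer is a hypothetical helper; the caller supplies the current time (for example from System.currentTimeMillis()) and a timeout value of its own choosing, which keeps the class easy to test. A consumer would check isExpired() periodically and discard any pending item that has expired.

```java
// Hypothetical helper: remembers when the last fragment of a pending
// multi-fragment item arrived, so the consumer can abandon it after a timeout.
class PendingItemTimer {
    private long lastFragmentMillis;

    PendingItemTimer(long nowMillis) {
        lastFragmentMillis = nowMillis; // time the FRAG_NUM=1 update arrived
    }

    // Record the arrival of another fragment for this item.
    void onFragment(long nowMillis) {
        lastFragmentMillis = nowMillis;
    }

    // True if no fragment has arrived for longer than timeoutMillis.
    boolean isExpired(long nowMillis, long timeoutMillis) {
        return nowMillis - lastFragmentMillis > timeoutMillis;
    }
}
```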
To request the MRN_STORY item from the News Text Analytics domain, we need to change the domainType variable to specify the domain, and the itemNames variable to specify the item name, when creating our request message:
// Item name
private static final List<String> itemNames = Arrays.asList("MRN_STORY");
// Domain Type
private static final int domainType = DomainTypes.NEWS_TEXT_ANALYTICS;
...
Next, we modify the basicMsgHandler.processRDMResponse method to accept only NEWS_TEXT_ANALYTICS as a valid domain type:
// RefreshMsg contains feed specific metadata - will not contain NTA Item fragment
int processRDMResponse(Msg msg, DecodeIterator dIter, DataDictionary dictionary)
{
...
// Confirm domain type
if((msg.domainType()!=DomainTypes.NEWS_TEXT_ANALYTICS)) {
output.append("Wrong Domain:" + msg.domainType());
return retval;
}
switch (msg.msgClass())
{ //Confirm payload and dump the metadata Fields to the console
case MsgClasses.REFRESH:
switch (msg.containerType())
{
case DataTypes.FIELD_LIST:
retval = decodeFieldList(msg, dIter, dictionary);
break;
default:
output.append("Unable to process RDM msg with container type: " + msg.containerType());
break;
}
break;
case MsgClasses.UPDATE:
// Confirm payload and dump the metadata Fields to the console
switch (msg.containerType())
{
case DataTypes.FIELD_LIST:
retval = decodeNewsTextAnalytics(msg,dIter,dictionary);
break;
default:
output.append("Unable to process RDM msg with container type: " + msg.containerType());
break;
}
break;
...
As mentioned earlier, the Refresh message does not contain any NTA data fragments, so we can use the existing basicMsgHandler.decodeFieldList() method to output the metadata fields to the console. For the Update message, however, we need a custom decodeNewsTextAnalytics() method, which we will define later on.
In addition to the FRAGMENT field, a number of other fields deliver the metadata for the NTA item, so we need methods to extract the metadata and to re-assemble the data fragment(s).
To improve readability and simplify the basicMsgHandler methods, we can create a new class to represent the NTA item, with methods that simplify the decoding process and the re-assembly of multi-fragment messages.
class NewsTextAnalyticsItem
{
int fragNum, expectedSize, bufferSize;
String guid, source, type;
final static int GUID_FID= 4271;//RMTES_STRING
final static int MRN_TYPE_FID= 8593;//RMTES_STRING
final static int MRN_SRC_FID =12215;//RMTES_STRING
final static int FRAG_NUM_FID=32479;//UINT
final static int TOT_SIZE_FID=32480;//UINT
final static int FRAGMENT_FID=32641;//BUFFER
// Reusable variables used for decoding
private FieldList fieldList = CodecFactory.createFieldList();
private FieldEntry fieldEntry = CodecFactory.createFieldEntry();
private UInt fidUIntValue = CodecFactory.createUInt();
private Int fidIntValue = CodecFactory.createInt();
private Real fidRealValue = CodecFactory.createReal();
private Enum fidEnumValue = CodecFactory.createEnum();
private Date fidDateValue = CodecFactory.createDate();
private Time fidTimeValue = CodecFactory.createTime();
private DateTime fidDateTimeValue = CodecFactory.createDateTime();
private Float fidFloatValue = CodecFactory.createFloat();
private Double fidDoubleValue = CodecFactory.createDouble();
private Buffer fidBufferValue = CodecFactory.createBuffer();
private byte[] buffer;
// Buffer to hold output
StringBuilder output = new StringBuilder();
...
// Have we received all the fragments i.e. TOT_SIZE number of bytes ?
public boolean isComplete() {
return ((expectedSize > 0) && (expectedSize == bufferSize));
}
First, we define the method used to extract the fields from the envelope and store them in the NewsTextAnalyticsItem instance:
// Decode a single Update field entry and store the NTA fields in this instance
protected int decodeFieldEntry(FieldEntry fEntry, DecodeIterator dIter, DataDictionary dictionary)
{
// get dictionary entry
DictionaryEntry dictionaryEntry = dictionary.entry(fEntry.fieldId());
// return if no entry found
if (dictionaryEntry == null)
{
output.append("\tFid " + fEntry.fieldId() + " not found in dictionary");
return CodecReturnCodes.SUCCESS;
}
// print out fid name
output.append("\t" + fEntry.fieldId() + "/" + dictionaryEntry.acronym().toString() + ": ");
// decode and print out fid value
int dataType = dictionaryEntry.rwfType();
int ret = 0;
switch (dataType)
{
case DataTypes.UINT:
ret = fidUIntValue.decode(dIter);
if (ret == CodecReturnCodes.SUCCESS)
{
output.append(fidUIntValue.toLong());
switch(fEntry.fieldId()) {
case FRAG_NUM_FID:// Fragment Number
fragNum = (int) fidUIntValue.toLong();
break;
case TOT_SIZE_FID:// Expected total size of all fragments
expectedSize = (int) fidUIntValue.toLong();
break;
}
}
else if (ret != CodecReturnCodes.BLANK_DATA)
{
output.insert(0, "DecodeUInt() failed: <" + CodecReturnCodes.toString(ret) + ">\n");
return ret;
}
break;
...
case DataTypes.BUFFER:
ret = fidBufferValue.decode(dIter);
if (ret == CodecReturnCodes.SUCCESS)
{
if(fEntry.fieldId()==FRAGMENT_FID)
{// The zipped Fragment buffer
//Write "<BUFFER>" on the console
output.append("<BUFFER>");
Buffer tmpBuffer = null;
tmpBuffer = fEntry.encodedData();
//if it is the first Fragment keeping the complete news,
//its length should be the TOT_SIZE kept in expectedSize variable
if(fragNum==1)
buffer = new byte[expectedSize];
//otherwise, it keeps a fragment.
else
buffer = new byte[tmpBuffer.length()];
tmpBuffer.copy(buffer);
//update the size
bufferSize += tmpBuffer.length();
}
}
else if (ret != CodecReturnCodes.BLANK_DATA)
{
output.append("DecodeBuffer() failed: <" + CodecReturnCodes.toString(ret) + ">");
return ret;
}
break;
case DataTypes.ASCII_STRING:
case DataTypes.UTF8_STRING:
case DataTypes.RMTES_STRING:
if (fEntry.encodedData().length() > 0)
{
output.append(fEntry.encodedData().toString());
switch(fEntry.fieldId())
{ //Get the Field ID for this field
case GUID_FID:// The Unique (source specific) ID for the NTA Item
guid=fEntry.encodedData().toString();
break;
case MRN_TYPE_FID:// The type of NTA Item e.g Story, Analytics etc
type=fEntry.encodedData().toString();
break;
case MRN_SRC_FID:// The component which published the data
source=fEntry.encodedData().toString();
break;
}
}
else
{
ret = CodecReturnCodes.BLANK_DATA;
}
break;
default:
output.append("Unsupported data type (" + DataTypes.toString(dataType) + ")");
break;
}
if (ret == CodecReturnCodes.BLANK_DATA)
{
output.append("<blank data>");
}
return CodecReturnCodes.SUCCESS;
}
For multi-fragment messages, we need a method to check the validity of each received fragment and to build up the fragment buffer:
// Append an additional fragment to this instance
public int addFragment(NewsTextAnalyticsItem fragment)
{
int ret = CodecReturnCodes.SUCCESS;
if(!fragment.guid.equals(guid)) {
output.append("Error during processing news : \nCannot add fragment to news guid=" + guid+ ". Expected Guid:" + guid + " , Received: " + fragment.guid +"\n");
return CodecReturnCodes.FAILURE;
}
if(!fragment.source.equals(source)) {
output.append("Error during processing news : \nCannot add fragment to news guid=" + guid+ ". Expected Source:" + source + " , Received: " + fragment.source +"\n");
return CodecReturnCodes.FAILURE;
}
if(fragment.fragNum != fragNum + 1) {
output.append("Error during processing news : \nCannot add fragment to news guid=" + guid+ ". Expected Fragment No.:" + (fragNum + 1) + " , Received: " + fragment.fragNum +"\n");
return CodecReturnCodes.FAILURE;
}
// Guard against a fragment that would overflow the TOT_SIZE buffer
if(bufferSize + fragment.getBufferSize() > expectedSize) {
output.append("Error during processing news : \nCannot add fragment to news guid=" + guid + ". Fragment would exceed TOT_SIZE of " + expectedSize + "\n");
return CodecReturnCodes.FAILURE;
}
fragNum = fragment.fragNum;
System.arraycopy(fragment.getBuffer(), 0, buffer, bufferSize, fragment.getBufferSize());
bufferSize += fragment.getBufferSize();
output.append("Additional fragment size: " + fragment.getBufferSize()+"\n");
output.append("Add Fragment:: Expected total buffer:" + expectedSize + " current " + bufferSize + "\n");
}
With the NewsTextAnalyticsItem class now defined, we can continue with the decoding of the Update message to extract the fields, build up the NTA item and output to console when complete:
//decode UpdateMsg containing NTA Item fragment and item related metadata
private int decodeNewsTextAnalytics(Msg msg, DecodeIterator dIter, DataDictionary dictionary)
{
int ret = CodecReturnCodes.SUCCESS;
NewsTextAnalyticsItem item = new NewsTextAnalyticsItem();
// Dump the FieldList and extract the relevant fields into a NTA item - including the Fragment
ret = item.decodeFieldList(msg, dIter, dictionary);
System.out.println(item.getOutputString());
// Is this 1st fragment of a NTA Item?
if(item.getFragNum() == 1)
{
// Is it the complete NTA Item?
if(item.isComplete()) // i.e. check length of this single fragment buffer == expected total size of NTA Item
{
output.append("<<Single Fragment item>>:\n");
writeNewsToScreen(item); // decompress and display NTA item to screen
}
else {// Start of a multi-fragment item
multiFragItem = item; // so copy this first fragment into our pending item instance
output.append("Init:: Expected total buffer size:").append(multiFragItem.getExpectedSize()).append(" current size:").append(multiFragItem.getBufferSize());
}
}
else
{ // FRAG_NUM > 1 so continue to build up NTA Item
if(multiFragItem==null) //miss first fragment
{
output.append("Error during processing news : \nMissing the first Fragment of news guid=" + item.getGuid() + "\n");
} else {
// Add the newly received fragment to the pending instance
ret = multiFragItem.addFragment(item);
System.out.println(multiFragItem.getOutputString());
if(ret != CodecReturnCodes.SUCCESS)
{ // Fragment rejected: discard the incomplete item rather than keep appending to it
multiFragItem = null;
}
// Does length of the buffer now == expected total size of NTA Item ?
else if(multiFragItem.isComplete())
{
output.append("<<Multi Fragment complete>>:\n");
writeNewsToScreen(multiFragItem); // decompress and display complete NTA item to screen
multiFragItem = null;
}
}
}
return ret;
}
As you can see, if the NTA item is complete in a single fragment, we can immediately unzip it and output it to the screen. However, if this is the first fragment of a multi-fragment item, we initialise our multi-fragment storage item with this initial FieldList.
As further fragments arrive, we add them to the multi-fragment item; once all the fragments have been received, we unzip the full buffer and output it to the screen.
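Before looking at the decompression code, the control flow just described can be summarised independently of ETA. SingleItemAssembler below is a hypothetical sketch, not the tutorial's class: it takes already-decoded GUID, FRAG_NUM, TOT_SIZE and FRAGMENT values and returns the complete compressed buffer when an item finishes (or null otherwise). On a missing first fragment, a GUID mismatch or a FRAG_NUM gap it simply discards the pending item, which is one possible policy.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical, ETA-free sketch of the single/multi-fragment control flow.
// onUpdate() returns the complete compressed buffer when an item finishes, else null.
class SingleItemAssembler {
    private String pendingGuid;            // GUID of the multi-fragment item in progress
    private int pendingTotSize;            // its TOT_SIZE (sent only with FRAG_NUM 1)
    private int lastFragNum;               // last FRAG_NUM appended
    private ByteArrayOutputStream pending; // fragments received so far, or null

    // totSize is only meaningful when fragNum == 1; later updates do not carry TOT_SIZE.
    byte[] onUpdate(String guid, int fragNum, int totSize, byte[] fragment) {
        if (fragNum == 1) {
            if (fragment.length == totSize) {
                return fragment;            // single-fragment item: complete at once
            }
            pendingGuid = guid;             // first fragment of a multi-fragment item
            pendingTotSize = totSize;
            lastFragNum = 1;
            pending = new ByteArrayOutputStream(totSize);
            pending.write(fragment, 0, fragment.length);
            return null;
        }
        if (pending == null || !guid.equals(pendingGuid) || fragNum != lastFragNum + 1) {
            pending = null;                 // missing first fragment, wrong item, or gap
            return null;
        }
        lastFragNum = fragNum;
        pending.write(fragment, 0, fragment.length);
        if (pending.size() < pendingTotSize) {
            return null;                    // still waiting for more fragments
        }
        byte[] complete = pending.size() == pendingTotSize ? pending.toByteArray() : null;
        pending = null;                     // finished (or oversized): clear pending state
        return complete;
    }
}
```

The returned buffer is what would then be handed to the decompression step; the tutorial's own decompression and display methods follow.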
//uncompress a complete NTA item
private int uncompressData(byte[] compressedData, int length)
{
int ret = CodecReturnCodes.SUCCESS;
// try-with-resources (JDK 1.7+) closes both streams, even if decompression fails
try (java.util.zip.GZIPInputStream gzin = new java.util.zip.GZIPInputStream(
new java.io.ByteArrayInputStream(compressedData, 0, length));
java.io.ByteArrayOutputStream byteout = new java.io.ByteArrayOutputStream())
{
byte[] buf = new byte[4096];
int res;
// read() returns -1 at the end of the compressed stream
while ((res = gzin.read(buf, 0, buf.length)) != -1) {
byteout.write(buf, 0, res);
}
// The NTA item is a UTF-8 JSON string: decode it explicitly rather than
// relying on the platform default charset
newsContent = new String(byteout.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
}
catch (java.io.IOException e)
{
//if failed, record the exception and return FAILURE code.
output.append("Decompress Failed : " + e.getMessage() + "\n");
ret = CodecReturnCodes.FAILURE;
}
return ret;
}
// uncompress and display complete NTA item to screen
private void writeNewsToScreen(NewsTextAnalyticsItem item) {
output.append("New of Guid="+item.getGuid()+ ", source="+ item.getSource()+ ", type=" + item.getType() + "\n\n");
int ret =uncompressData (item.getBuffer(),(int)item.getExpectedSize());
//if NTA item is uncompressed successfully, display it.
if(ret == CodecReturnCodes.SUCCESS)
output.append(newsContent);
}
The NTA Item output is in the form of a JSON UTF-8 string.
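Because the decompressed bytes are UTF-8, they should be decoded with an explicit charset: new String(bytes) with no charset argument uses the platform default, which is not UTF-8 on every system and can corrupt non-ASCII characters such as the accented Italian letters in the sample story below. The helper name NtaText.fromUtf8 is hypothetical; StandardCharsets is available from JDK 1.7.

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch: decode decompressed NTA bytes explicitly as UTF-8,
// independent of the JVM's default charset.
class NtaText {
    static String fromUtf8(byte[] decompressed) {
        return new String(decompressed, StandardCharsets.UTF_8);
    }
}
```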
Before building and running this tutorial, you need to set up the environment variables; please refer to the Establish an ETA Java build environment section in ETA Tutorial 1 - Creating a starter consumer application.
As the connectivity parameters (host, port, service name, etc.) are hardcoded in the basicConsumer class, you will need to amend these to reflect your environment before running the application. Additionally, the tutorial tries to load the dictionaries from local files whose names are hardcoded in the basicDictionaryHandler class; if this fails, it attempts to download the dictionaries from the provider. To use your own local dictionary files, amend the basicDictionaryHandler class accordingly. The RIC has been hardcoded to request the MRN_STORY item; this can be changed to request alternative items such as MRN_TRNA or MRN_TRSI. After amending any classes, rebuild the application as described in the Build and run section to run this tutorial successfully.
For example (Windows):
> buildConsumer 6
> runConsumer 6
Running the tutorial displays the response for the requested item (e.g. MRN_STORY), as in the example below:
<<Single Fragment item>>:
New of Guid=DJBX004DC_1602242hH9qU5iRAgvdfFvHXRVIIxISc95EIR0F+nn8X, source=DTC_S
TG_A, type=STORY
{"altId":"nDJBX004DC","audiences":["NP:MFDJ"],"body":"DJ MARKET TALK VALUTE: eur
o/usd su minimi da 3 settimane dopo fiducia Francia\r\n \r\n MILANO (MF-DJ)-
-Il cambio euro/usd tratta intorno a 1,0995, sui minimi da \ntre settimane, dopo
la pubblicazione della fiducia dei consumatori francesi che \nha deluso le atte
se. Il dato ?? infatti calato \r\n a 95 punti a febbraio dai 97 di gennaio e
del consenso. \r\n alb \r\n alberto.chimenti@mfdowjones.it \r\n \r\n
(END) Dow Jones Newswires\r\n \r\n ","firstCreated":"2016-02-24T08:15:49.000Z
","headline":"DJ MARKET TALK VALUTE: euro/usd su minimi da 3 settimane dopo fidu
cia Francia","id":"DJBX004DC_1602242hH9qU5iRAgvdfFvHXRVIIxISc95EIR0F+nn8X","inst
ancesOf":[],"language":"it","mimeType":"text/plain","provider":"NS:DJN","pubStat
us":"stat:usable","subjects":["A:1","A:9","N2:FRX","N2:STX"],"takeSequence":1,"u
rgency":3,"versionCreated":"2016-02-24T08:15:49.000Z"}
Note: You may occasionally see what looks like garbage output in the completed NTA console output; this is due to foreign language characters which cannot be represented correctly on the console. Displaying this using the appropriate character set is outside the scope of this tutorial.
So now that we've come to the end of the tutorial let's summarise what this example has demonstrated and what we've learned. The key goal was to show how to request and parse News Text Analytics data. We learned that the Initial Refresh message contains only metadata. Subsequent Update messages deliver the NTA item in one or more fragments. If delivered as a single fragment we extract the buffer and unzip to obtain the NTA Item. If delivered as multiple fragments, they need to be concatenated in sequence to obtain the complete buffer, before unzipping.
This tutorial deals with requesting a single NTA item, which makes the handling of multi-fragment items relatively straightforward. When requesting multiple NTA items, you need to allow for Update messages for different items arriving interleaved with each other. The GUID and source can therefore be used to confirm that a fragment belongs to the same NTA item, and as the Update messages for multi-fragment items arrive, you need to ensure that each additional fragment is added to the correct incomplete item.
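One way to handle interleaved items is to keep a pending buffer per item in a map keyed on MRN_SRC and GUID. MultiItemAssembler below is a hypothetical, ETA-free sketch: it assumes the source, GUID, FRAG_NUM, TOT_SIZE and FRAGMENT values have already been decoded, uses "|" as a key separator on the assumption that it never appears in either value, and simply drops an item when its first fragment is missing or a FRAG_NUM gap is detected. A production consumer would also need timeout handling to evict items whose remaining fragments never arrive.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one pending buffer per (MRN_SRC, GUID) so that
// fragments of different NTA items can arrive interleaved.
class MultiItemAssembler {
    private static final class Pending {
        final int totSize;      // TOT_SIZE from the FRAG_NUM=1 update
        int lastFragNum = 1;    // last FRAG_NUM appended
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Pending(int totSize) { this.totSize = totSize; }
    }

    private final Map<String, Pending> pending = new HashMap<>();

    private static String key(String source, String guid) {
        return source + "|" + guid; // assumes '|' does not occur in MRN_SRC or GUID
    }

    // Returns the complete compressed buffer when an item finishes, else null.
    byte[] onUpdate(String source, String guid, int fragNum, int totSize, byte[] fragment) {
        String k = key(source, guid);
        if (fragNum == 1) {
            if (fragment.length == totSize) {
                return fragment;   // single-fragment item
            }
            Pending p = new Pending(totSize);
            p.bytes.write(fragment, 0, fragment.length);
            pending.put(k, p);
            return null;
        }
        Pending p = pending.get(k);
        if (p == null || fragNum != p.lastFragNum + 1) {
            pending.remove(k);     // first fragment missed, or a gap: give up on this item
            return null;
        }
        p.lastFragNum = fragNum;
        p.bytes.write(fragment, 0, fragment.length);
        if (p.bytes.size() < p.totSize) {
            return null;           // still incomplete
        }
        pending.remove(k);
        return p.bytes.size() == p.totSize ? p.bytes.toByteArray() : null; // oversize: discard
    }

    int pendingCount() {
        return pending.size();
    }
}
```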
One final thing to note is that this source code is provided for illustration purposes only and is not warranted.
Thank you for reading.
For more information on the NTA domain and MRN Data model, please take a look at the following notes:
MRN Data Model Implementation Guide - Summary of NTA domain, MRN data model, sample output, and implementation commentary.