Download tutorial source code | Click here to download
Last updated | Dec 2020
Compilers | Visual Studio 2013
Prerequisites | Complete the first 5 ETA Consumer tutorials in this series
This tutorial assumes that you are familiar with consuming MarketPrice data using ETA C. As a minimum, you should have worked through the Transport API Consumer Tutorial on the LSEG Developer portal and read the MRN Data Model and Real-Time Implementation Guide. If you have not worked through the Transport API Consumer Tutorial, or are not familiar with the concepts it covers, we strongly recommend that you complete it first.
For this tutorial, we will use the source code from tutorial 5 of the Transport API Consumer Tutorial as the basis. We will add the code required to request and decode News Text Analytics (NTA) domain data from a Machine Readable News (MRN) service.
Machine Readable News (MRN) data is published over the Real-Time Distribution System (RTDS) using an Open Message Model (OMM) envelope carried in News Text Analytics domain RSSL messages. The News Text Analytics domain is designed for publishing large, complex, nested data structures using a FieldList-based envelope.
Although RTDS can deliver large payloads, MRN data items can extend to several kilobytes. To optimize their transport, MRN items may be fragmented and delivered as multiple envelope messages. Each envelope message has several fields that hold metadata and one field that holds the actual data (fragment) itself. To optimize transport further, the message encoding process converts the MRN item to a UTF-8 JSON string and compresses it using Zlib before splitting it into fragments (if required). The consumer therefore needs to re-assemble the fragments and decompress the completed buffer before the data can be used.
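The fragmentation step itself is plain byte slicing, so it can be illustrated without any ETA or Zlib calls. The following is a minimal sketch in plain C and is not taken from the tutorial source. `splitPayload()` and `reassemble()` are hypothetical helpers, and the maximum fragment size is simply a parameter, because the size used by real MRN publishers is not covered here. The publisher side cuts an already-compressed buffer into fragments. The consumer side joins them back, in FRAG_NUM order, into a buffer sized from TOT_SIZE.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical publisher-side helper: split an (already compressed) payload
 * into fragments of at most maxFrag bytes. Pointers and lengths are written
 * to fragPtr/fragLen; returns the number of fragments produced.
 * Fragment i (0-based) corresponds to FRAG_NUM = i + 1. */
static size_t splitPayload(const char *payload, size_t totalSize, size_t maxFrag,
                           const char **fragPtr, size_t *fragLen, size_t maxCount)
{
    size_t count = 0;
    size_t offset = 0;
    while (offset < totalSize && count < maxCount) {
        size_t remaining = totalSize - offset;
        size_t len = remaining < maxFrag ? remaining : maxFrag;
        fragPtr[count] = payload + offset;
        fragLen[count] = len;
        offset += len;
        ++count;
    }
    return count;
}

/* Hypothetical consumer-side helper: concatenate fragments, in FRAG_NUM order,
 * into 'out', which has room for totSize bytes (the TOT_SIZE value).
 * Returns 1 only if exactly totSize bytes were received, 0 if data is
 * missing or would overflow the buffer. */
static int reassemble(const char **fragPtr, const size_t *fragLen, size_t count,
                      char *out, size_t totSize)
{
    size_t used = 0;
    for (size_t i = 0; i < count; ++i) {
        if (used + fragLen[i] > totSize)
            return 0;               /* more data than TOT_SIZE announced */
        memcpy(out + used, fragPtr[i], fragLen[i]);
        used += fragLen[i];
    }
    return used == totSize;         /* complete only when every byte has arrived */
}
```

The consumer would pass the buffer to the Zlib decompression step only after `reassemble()` reports it as complete.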
As a result, processing a response to an MRN request is more involved than processing a typical MarketPrice response.
The News Text Analytics Domain Model describes how MRN data items are modeled using OMM. It uses the FieldList OMM container for the metadata. The data itself is carried as Zlib-compressed fragments, which the consumer decompresses using the Zlib library.
The initial Refresh response does not contain any MRN data, but the relevant fields are populated with appropriate values. For example, an MRN_STORY Refresh could contain:
RIC: MRN_STORY
FIELDLIST (StandardDataCount=16):
FieldEntry PROD_PERM (1): 10001
FieldEntry ACTIV_DATE (17): 21/10/2015
FieldEntry RECORDTYPE (259): 30
FieldEntry RDN_EXCHD2 (1709): 1370->"MRN"
FieldEntry TIMACT_MS (4148): 60413133
FieldEntry GUID (4271): <noData>
FieldEntry CONTEXT_ID (5357): 3752
FieldEntry DDS_DSO_ID (6401): 12424
FieldEntry SPS_SP_RIC (6480): ".[SPSML1L1"
FieldEntry MRN_V_MAJ (8506): "2"
FieldEntry MRN_TYPE (8593): "STORY"
FieldEntry MRN_V_MIN (11787): "10"
FieldEntry MRN_SRC (12215): "DTC_QA_A"
FieldEntry FRAG_NUM (32479): 0
FieldEntry TOT_SIZE (32480): 0
FieldEntry FRAGMENT (32641): <noData>
...
As you can see, the FRAGMENT field, which would contain the MRN data fragment, is empty. The GUID field is also empty, and FRAG_NUM and TOT_SIZE are both 0.
In the subsequent Update messages, however, these fields are populated. None of the feed-related or static fields from the Refresh response (such as PROD_PERM or RDN_EXCHD2) are included. For example:
RIC: MRN_STORY
FieldEntry TIMACT_MS (4148): 37698507
FieldEntry ACTIV_DATE (17): 22/10/2015
FieldEntry MRN_TYPE (8593): "STORY"
FieldEntry MRN_V_MAJ (8506): "2"
FieldEntry MRN_V_MIN (11787): "10"
FieldEntry TOT_SIZE (32480): 1006
FieldEntry FRAG_NUM (32479): 1
FieldEntry GUID (4271): "HKS6BMNj5_1510222UAxU7Co7ts6yZmhm642yYMvKxt3AXKDp1zPwS"
FieldEntry MRN_SRC (12215): "DTC_QA_A"
FieldEntry FRAGMENT (32641): ZIPPED
...
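Some of the key fields to note:
GUID: the unique (source-specific) ID of the News Item
MRN_SRC: the component that published the data
MRN_TYPE: the type of News Item, e.g. STORY
FRAG_NUM: the fragment number, starting at 1
TOT_SIZE: the total size, in bytes, of all the fragments of the item
FRAGMENT: the Zlib-compressed data fragment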
The other point to note concerns multi-fragment items. Update messages with FRAG_NUM > 1 carry fewer fields, because the metadata is only included in the first Update message (FRAG_NUM = 1) for that item, as shown below:
FIELDLIST (StandardDataCount=10):
FieldEntry TIMACT_MS (4148): 37740249
FieldEntry ACTIV_DATE (17): 22/10/2015
FieldEntry MRN_TYPE (8593): "STORY"
FieldEntry MRN_V_MAJ (8506): "2"
FieldEntry MRN_V_MIN (11787): "10"
FieldEntry TOT_SIZE (32480): 2939
FieldEntry FRAG_NUM (32479): 1
FieldEntry GUID (4271): "Bw8SQFvGa_1510222GwUDOcedmj/Aoy2thyoBkTblGmxkWUZZ8be8K"
FieldEntry MRN_SRC (12215): "DTC_QA_A"
FieldEntry FRAGMENT (32641): ZIPPED
...
...
FIELDLIST (StandardDataCount=4):
FieldEntry GUID (4271): "Bw8SQFvGa_1510222GwUDOcedmj/Aoy2thyoBkTblGmxkWUZZ8be8K"
FieldEntry MRN_SRC (12215): "DTC_QA_A"
FieldEntry FRAG_NUM (32479): 2
FieldEntry FRAGMENT (32641): ZIPPED
In the above example, only the essential fields are repeated: the unique identifier (GUID), the source name (MRN_SRC), the fragment number (FRAG_NUM) and, of course, the fragment payload (FRAGMENT). We will use these fields later when re-assembling the fragments of a multi-fragment MRN item, to ensure that each fragment is appended to the correct MRN item.
In addition to the standard ETA status and state indicators used to identify connectivity or data issues, the FRAG_NUM and TOT_SIZE fields should be used to detect missing fragments.
The FRAG_NUM field is set to 1 for the first Update of each item and is incremented in each subsequent Update for that item. This allows you to detect a missing fragment and to ensure that the fragments are re-assembled in the correct order.
Additionally, TOT_SIZE contains the total size, in bytes, of the complete set of fragmented (compressed) data. Comparing TOT_SIZE with the sum of the sizes of the fragments received tells you when all the fragments for an item have arrived and the MRN item is complete.
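Both checks can be combined into a small per-item tracker. This is a minimal sketch under stated assumptions. `FragTracker` and `trackFragment()` are hypothetical names and are not part of ETA or the tutorial source. The caller is assumed to have already skipped the Refresh (FRAG_NUM 0) and to have extracted FRAG_NUM, TOT_SIZE and the FRAGMENT length from each Update.

```c
#include <stdint.h>

/* Hypothetical per-item state built from the FRAG_NUM and TOT_SIZE fields. */
typedef struct {
    uint64_t lastFragNum;   /* FRAG_NUM of the last accepted fragment (0 = none yet) */
    uint64_t totSize;       /* TOT_SIZE from the first fragment */
    uint64_t bytesReceived; /* sum of the FRAGMENT lengths accepted so far */
} FragTracker;

typedef enum { FRAG_ACCEPTED, FRAG_COMPLETE, FRAG_GAP, FRAG_OVERSIZE } FragResult;

/* fragNum = FRAG_NUM, fragLen = FRAGMENT length.
 * totSize is only read for the first fragment (FRAG_NUM == 1). */
static FragResult trackFragment(FragTracker *t, uint64_t fragNum,
                                uint64_t fragLen, uint64_t totSize)
{
    if (fragNum == 1) {             /* the first fragment starts a new item */
        t->lastFragNum = 0;
        t->totSize = totSize;
        t->bytesReceived = 0;
    }
    if (fragNum != t->lastFragNum + 1)
        return FRAG_GAP;            /* a fragment is missing or out of sequence */
    if (t->bytesReceived + fragLen > t->totSize)
        return FRAG_OVERSIZE;       /* more bytes than TOT_SIZE announced */
    t->lastFragNum = fragNum;
    t->bytesReceived += fragLen;
    return t->bytesReceived == t->totSize ? FRAG_COMPLETE : FRAG_ACCEPTED;
}
```

Take the 2939-byte item shown earlier, with hypothetical fragment sizes of 1000, 1000 and 939 bytes. The tracker reports the three fragments as accepted, accepted and complete. If FRAG_NUM 3 arrives directly after FRAG_NUM 1, the tracker reports a gap.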
Using FRAG_NUM and TOT_SIZE to detect outages requires the consumer to implement timeout functionality. The consumer must allow time for missing fragments to arrive before assuming that they have been lost. This functionality is outside the scope of this tutorial.
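Although timeout handling is outside the scope of this tutorial, one possible approach is to timestamp each accepted fragment and periodically check how long the pending item has been waiting. The sketch below assumes that whole-second resolution from `time()` is sufficient. `PendingTimer`, `touchPending()`, `clearPending()` and `pendingTimedOut()` are hypothetical names. The timeout value is an application choice that this tutorial does not specify.

```c
#include <time.h>

/* Hypothetical bookkeeping for a pending multi-fragment item. */
typedef struct {
    int    pending;          /* 1 while a multi-fragment item is incomplete */
    time_t lastFragmentTime; /* when the most recent fragment was accepted */
} PendingTimer;

/* Record that a fragment for the pending item has just been accepted. */
static void touchPending(PendingTimer *p, time_t now)
{
    p->pending = 1;
    p->lastFragmentTime = now;
}

/* Mark the pending item as finished (completed or discarded). */
static void clearPending(PendingTimer *p)
{
    p->pending = 0;
}

/* Returns 1 if the pending item has waited longer than timeoutSec for its
 * next fragment and should be treated as incomplete, otherwise 0. */
static int pendingTimedOut(const PendingTimer *p, time_t now, double timeoutSec)
{
    return p->pending && difftime(now, p->lastFragmentTime) > timeoutSec;
}
```

For instance, the consumer's main loop could call `pendingTimedOut(&timer, time(NULL), 5.0)` on each iteration and discard the pending item when the call returns 1. The 5-second value is arbitrary.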
Finally, as the FRAGMENT field contains compressed data, we need to use the Zlib library to decompress the payload and access the actual data content. Note that for a multi-fragment MRN item, you must receive all the fragments before decompressing the complete multi-fragment buffer.
To decode and re-assemble the MRN item, we will create a new file, "basicMRNHandler". It will contain a new struct to represent an MRN item, plus some helper functions. Together, they simplify the decoding process and the re-assembly of multi-fragment messages.
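/* Machine Readable News Item */
typedef struct machineReadableNewsItem {
    RsslBuffer fragmentBuffer;   /* FRAGMENT data (compressed), or the re-assembled buffer */
    char guid[GUID_SIZE];        /* GUID: unique ID of the News Item */
    char source[MRN_SRC_SIZE];   /* MRN_SRC: component that published the data */
    char type[MRN_TYPE_SIZE];    /* MRN_TYPE: e.g. STORY */
    RsslUInt64 fragmentNumber;   /* FRAG_NUM of the latest fragment */
    RsslUInt64 expectedSize;     /* TOT_SIZE: expected total size of all fragments */
} MachineReadableNewsItem;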
We also need functions to copy the struct and to free the allocated fragment buffer memory.
Remember that we already know the final size of the buffer from TOT_SIZE, so we should allocate memory for the expected total size.
/*
 * Free allocated memory and set buffer length to 0.
 * _ptr - The pointer to MachineReadableNewsItem to be freed.
 */
void mrnFree(MachineReadableNewsItem* _ptr)
{
    /* Do not free() if the length is already zero */
    if (_ptr->fragmentBuffer.length > 0)
        free(_ptr->fragmentBuffer.data);
    rsslClearBuffer(&_ptr->fragmentBuffer);
}
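/*
 * Copy MachineReadableNewsItem. The allocated size of the destination fragmentBuffer equals the expectedSize of the source.
 * The function calls mrnFree(_Dest) to free any previously allocated fragmentBuffer before reallocating.
 * _Dest - Pointer to the destination where the content is to be copied.
 * _Source - Pointer to the source of data to be copied.
 */
void mrnCopy(MachineReadableNewsItem* _Dest, MachineReadableNewsItem* _Source)
{
    mrnFree(_Dest);
    /* A first fragment larger than TOT_SIZE would overflow the new buffer: leave _Dest empty so
     * that addFragment() rejects the following fragments (fragment number out of sequence) */
    if (_Source->fragmentBuffer.length > _Source->expectedSize)
    {
        _Dest->expectedSize = 0;
        _Dest->fragmentNumber = 0;
        return;
    }
    _Dest->expectedSize = _Source->expectedSize;
    _Dest->fragmentBuffer.data = (char*)calloc((size_t)_Source->expectedSize, sizeof(char));
    if (_Dest->fragmentBuffer.data == NULL) /* allocation failed */
    {
        _Dest->expectedSize = 0;
        _Dest->fragmentNumber = 0;
        return;
    }
    memcpy(_Dest->fragmentBuffer.data, _Source->fragmentBuffer.data, _Source->fragmentBuffer.length);
    _Dest->fragmentBuffer.length = _Source->fragmentBuffer.length;
    _Dest->fragmentNumber = _Source->fragmentNumber;
    strcpy(_Dest->guid, _Source->guid);
    strcpy(_Dest->source, _Source->source);
    strcpy(_Dest->type, _Source->type);
}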
For multi-fragment messages, we need a function that checks the validity of each received fragment and appends it to the fragment buffer.
/*
 * Append the fragmentBuffer of _Fragment to _Item. Returns RSSL_RET_FAILURE if the GUID or source of _Fragment
 * does not match _Item, if the fragment number is out of sequence, or if the combined size would exceed expectedSize.
 * _Item - Pointer to the MachineReadableNewsItem, which should be large enough to contain the concatenated resulting buffer
 * _Fragment - Pointer to MachineReadableNewsItem to be appended
 */
RsslRet addFragment(MachineReadableNewsItem* _Item, MachineReadableNewsItem* _Fragment)
{
    if (strncmp(_Item->guid, _Fragment->guid, GUID_SIZE) != 0)
    {
        printf("Cannot add fragment to item: mismatching GUID\n");
        return RSSL_RET_FAILURE;
    }
    if (strncmp(_Item->source, _Fragment->source, MRN_SRC_SIZE) != 0)
    {
        printf("Cannot add fragment to item: mismatching data source\n");
        return RSSL_RET_FAILURE;
    }
    if (_Item->fragmentNumber + 1 != _Fragment->fragmentNumber)
    {
        printf("Cannot add fragment to item: fragment number is not in sequence\n");
        return RSSL_RET_FAILURE;
    }
    if (_Item->fragmentBuffer.length + _Fragment->fragmentBuffer.length > _Item->expectedSize)
    {
        printf("Cannot add fragment to item: fragment size is larger than expected total size\n");
        return RSSL_RET_FAILURE;
    }
    _Item->fragmentNumber = _Fragment->fragmentNumber;
    memcpy((_Item->fragmentBuffer.data + _Item->fragmentBuffer.length),
        _Fragment->fragmentBuffer.data, _Fragment->fragmentBuffer.length);
    _Item->fragmentBuffer.length = (_Item->fragmentBuffer.length + _Fragment->fragmentBuffer.length);
    /* expectedSize is RsslUInt64 and length is RsslUInt32, so use matching format specifiers */
    printf("Add Fragment:: Expected total buffer: " RTR_LLU " current %u\n", _Item->expectedSize, _Item->fragmentBuffer.length);
    return RSSL_RET_SUCCESS;
}
Finally, we declare a static global MRN item, which we will use to combine the fragments of multi-fragment items.
/*
* MRN Item for multi-fragment
*/
static MachineReadableNewsItem multiFragItem;
Before sending a request, we have to make sure that the service supports the News Text Analytics domain. We can do this by checking the capabilities of the service.
We will add a loop to processDirectoryResponse in basicDirectoryHandler.c that compares each entry in the service's capabilities list with the News Text Analytics domain enumeration, RSSL_DMT_NEWS_TEXT_ANALYTICS.
/* Check if the name matches the service name entered by the user.
 * If it does, store the service ID, then check whether
 * the service supports the News Text Analytics domain.
 * This will be used to make item requests later. */
tmpServiceNameBuffer.data = service->serviceName;
tmpServiceNameBuffer.length = (RsslUInt32)strlen(service->serviceName);
if (rsslBufferIsEqual(&pService->info.serviceName, &tmpServiceNameBuffer))
{
    RsslUInt32 j;
    service->serviceId = (RsslUInt16)pService->serviceId;
    /* print the RsslUInt16 copy, which matches the %d specifier */
    printf("\tFound your service %s using serviceId: %d\n", service->serviceName, service->serviceId);
    for (j = 0; j < pService->info.capabilitiesCount; ++j)
    {
        if (pService->info.capabilitiesList[j] == RSSL_DMT_NEWS_TEXT_ANALYTICS)
        {
            service->supportNTA = RSSL_TRUE;
            printf("\tRSSL_DMT_NEWS_TEXT_ANALYTICS domain type is supported.\n");
        }
    }
}
We then make the item request only if the service supports the News Text Analytics domain. To do this, we add another check to channelEventCallback in basicConsumer.c. We also set the item name and domain type for the request:
char itemName[] = "MRN_STORY";
RsslUInt8 domainType = RSSL_DMT_NEWS_TEXT_ANALYTICS;
Next, we add a call to decodeMRNPayload in processRDMResponse to decode News Text Analytics responses.
RsslRet processRDMResponse(RsslReactorChannel *pReactorChannel, RsslMsg *msg, RsslDecodeIterator* dIter)
{
    ...
    switch (msg->msgBase.msgClass)
    {
        ...
        case RSSL_MC_UPDATE:
            printf("Domain: %s\n", rsslDomainTypeToString(msg->msgBase.domainType));
            if (msg->msgBase.domainType == RSSL_DMT_NEWS_TEXT_ANALYTICS)
            {
                ret = decodeMRNPayload(getDictionary(), dIter);
                break;
            }
For decoding, we can reuse the decodeFieldList and decodeFieldEntry functions. They still dump the metadata fields to the console as before, but now they also store the decoded values in the local MRN item.
/* decode and print out fid value */
dataType = dictionaryEntry->rwfType;
switch (dataType)
{
    case RSSL_DT_UINT:
        if ((ret = rsslDecodeUInt(dIter, &fidUIntValue)) == RSSL_RET_SUCCESS)
        {
            printf("" RTR_LLU "\n", fidUIntValue);
            switch (fEntry->fieldId)
            {
                case FRAG_NUM: // Fragment Number
                    mrnItem->fragmentNumber = fidUIntValue;
                    break;
                case TOT_SIZE: // Expected total size of all fragments
                    mrnItem->expectedSize = fidUIntValue;
                    break;
                default:
                    break;
            }
        }
        else if (ret != RSSL_RET_BLANK_DATA)
        {
            printf("rsslDecodeUInt() failed with return code: %d\n", ret);
            return ret;
        }
        break;
    ...
    case RSSL_DT_ASCII_STRING:
    case RSSL_DT_UTF8_STRING:
    case RSSL_DT_RMTES_STRING:
        if ((ret = rsslDecodeBuffer(dIter, &fidBufferValue)) == RSSL_RET_SUCCESS)
        {
            printf("%.*s\n", (int)fidBufferValue.length, fidBufferValue.data);
            /* RsslBuffer data is not null-terminated: copy at most (size - 1) bytes, then terminate */
            switch (fEntry->fieldId)
            {
                case GUID_FID: // The Unique (source specific) ID for the News Item
                {
                    RsslUInt32 len = (fidBufferValue.length < GUID_SIZE) ? fidBufferValue.length : GUID_SIZE - 1;
                    memcpy(mrnItem->guid, fidBufferValue.data, len);
                    mrnItem->guid[len] = '\0';
                    break;
                }
                case MRN_SRC: // The component which published the data
                {
                    RsslUInt32 len = (fidBufferValue.length < MRN_SRC_SIZE) ? fidBufferValue.length : MRN_SRC_SIZE - 1;
                    memcpy(mrnItem->source, fidBufferValue.data, len);
                    mrnItem->source[len] = '\0';
                    break;
                }
                case MRN_TYPE: // The type of News Item e.g Story, Headline etc
                {
                    RsslUInt32 len = (fidBufferValue.length < MRN_TYPE_SIZE) ? fidBufferValue.length : MRN_TYPE_SIZE - 1;
                    memcpy(mrnItem->type, fidBufferValue.data, len);
                    mrnItem->type[len] = '\0';
                    break;
                }
                default:
                    break;
            }
        }
        else if (ret != RSSL_RET_BLANK_DATA)
        {
            printf("rsslDecodeBuffer() failed with return code: %d\n", ret);
            return ret;
        }
        break;
In its standard form, the existing decodeFieldEntry function would dump the compressed FRAGMENT field contents to the console as garbage. To avoid this, we separate the RSSL_DT_BUFFER case from the other buffer-type cases:
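case RSSL_DT_BUFFER:
    if ((ret = rsslDecodeBuffer(dIter, &fidBufferValue)) == RSSL_RET_SUCCESS)
    {
        if (fEntry->fieldId == FRAGMENT) // The zipped Fragment buffer
        {
            printf("<<COMPRESSED>>\n");
            /* fidBufferValue points into the received message (no copy is made), so it is only
             * valid while this message is processed; mrnCopy() copies it for multi-fragment items */
            mrnItem->fragmentBuffer = fidBufferValue;
        }
        else
        {
            printf("<<BUFFER: %u bytes>>\n", fidBufferValue.length); /* avoid dumping binary data */
        }
    }
    else if (ret != RSSL_RET_BLANK_DATA)
    {
        printf("rsslDecodeBuffer() failed with return code: %d\n", ret);
        return ret;
    }
    break;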
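The switch statements above compare fieldId against the constants GUID_FID, MRN_TYPE, MRN_SRC, FRAG_NUM, TOT_SIZE and FRAGMENT, whose definitions are not shown in this section. If they are plain preprocessor macros, the field IDs in the MRN_STORY dumps earlier in this tutorial give the values below. This is an assumption: check the values against your own RDMFieldDictionary rather than treating them as authoritative. The name GUID_FID is presumably used instead of GUID to avoid a clash with the Windows GUID type.

```c
/* Field IDs as shown in the MRN_STORY field dumps above (verify against RDMFieldDictionary). */
#define GUID_FID  4271   /* GUID: unique (source-specific) ID of the News Item */
#define MRN_TYPE  8593   /* MRN_TYPE: type of News Item, e.g. "STORY" */
#define MRN_SRC   12215  /* MRN_SRC: component that published the data */
#define FRAG_NUM  32479  /* FRAG_NUM: 1-based fragment sequence number (0 in the Refresh) */
#define TOT_SIZE  32480  /* TOT_SIZE: total size in bytes of all fragments */
#define FRAGMENT  32641  /* FRAGMENT: Zlib-compressed data fragment */
```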
As mentioned earlier, the Refresh message does not contain any MRN data fragments, so our custom decompress function only needs to be applied to Update messages. The Refresh message has FRAG_NUM set to 0, which we can use to filter it out.
/*
 * Start combining and decompressing fragments
 */
/* Is this the 1st fragment of a News Item? (The Refresh has FRAG_NUM 0 and is skipped) */
if (mrnItem.fragmentNumber == 1)
{
    /* Is it the complete News Item? */
    if (isMRNComplete(&mrnItem)) // check length of this single fragment buffer == expected total size of News Item
    {
        /* allocate 4x the compressed size for the decompressed output */
        decompressedBuf.data = (char*)calloc(mrnItem.fragmentBuffer.length * 4, sizeof(char));
        printf("\n<<Single Fragment>>\n");
        if (decompress(&mrnItem.fragmentBuffer, &decompressedBuf) == RSSL_RET_SUCCESS) // decompress the buffer
        {
            printf("%.*s\n\n", (int)decompressedBuf.length, decompressedBuf.data); // Display the decompressed News Item
        }
        free(decompressedBuf.data);
    }
    else if (mrnItem.expectedSize != 0)
    {   /* Start of a multi-fragment item, so copy this first fragment into our pending item instance */
        mrnCopy(&multiFragItem, &mrnItem);
    }
}
else if (mrnItem.fragmentNumber > 1)
{   // FRAG_NUM > 1 so continue to build up News Item
    if (addFragment(&multiFragItem, &mrnItem) == RSSL_RET_SUCCESS) // Add the newly received fragment to the pending instance
    {
        if (isMRNComplete(&multiFragItem)) // Does length of the buffer now == expected total size of News Item ?
        {
            /* size the output from the complete re-assembled buffer, not from the last fragment */
            decompressedBuf.data = (char*)calloc(multiFragItem.fragmentBuffer.length * 4, sizeof(char));
            printf("<<Multi Fragment complete>>\n");
            if (decompress(&multiFragItem.fragmentBuffer, &decompressedBuf) == RSSL_RET_SUCCESS) // decompress the buffer
            {
                printf("%.*s\n\n", (int)decompressedBuf.length, decompressedBuf.data); // Display the decompressed News Item
            }
            free(decompressedBuf.data);
        }
    }
}
As you can see, if the MRN item is complete in a single fragment, we can immediately decompress it and output it to the screen. If it is the first fragment of a multi-fragment item, however, we initialize our global multi-fragment item with it.
As further fragments arrive, we add them to the global multi-fragment item. Once all the fragments have been received, we decompress the full buffer and output it to the screen.
if (isMRNComplete(&multiFragItem)) // Does length of the buffer now == expected total size of News Item ?
{
    /* size the output from the complete re-assembled buffer, not from the last fragment */
    decompressedBuf.data = (char*)calloc(multiFragItem.fragmentBuffer.length * 4, sizeof(char));
    printf("<<Multi Fragment complete>>\n");
    if (decompress(&multiFragItem.fragmentBuffer, &decompressedBuf) == RSSL_RET_SUCCESS) // decompress the buffer
    {
        printf("%.*s\n\n", (int)decompressedBuf.length, decompressedBuf.data); // Display the decompressed News Item
    }
    free(decompressedBuf.data);
}
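The dispatch logic above can be isolated into a small function that is easy to unit-test without a live connection. The sketch below is a hypothetical refactoring and is not part of the tutorial source: `MrnAction` and `classifyMrnMessage()` are illustrative names. It differs slightly from the code above. A first fragment without a TOT_SIZE value is explicitly treated as having nothing to process, whereas the original relies on isMRNComplete(), whose definition is not shown.

```c
#include <stdint.h>

typedef enum {
    MRN_IGNORE,         /* no usable news data, e.g. the Refresh (FRAG_NUM 0) */
    MRN_SINGLE,         /* whole item in one fragment: decompress immediately */
    MRN_FIRST_OF_MANY,  /* FRAG_NUM 1 but incomplete: start a pending item */
    MRN_CONTINUATION    /* FRAG_NUM > 1: append to the pending item */
} MrnAction;

/* fragNum = FRAG_NUM, fragLen = length of FRAGMENT,
 * totSize = TOT_SIZE (only present on the first fragment of an item). */
static MrnAction classifyMrnMessage(uint64_t fragNum, uint64_t fragLen, uint64_t totSize)
{
    if (fragNum == 0)
        return MRN_IGNORE;              /* Refresh: metadata only */
    if (fragNum == 1) {
        if (totSize == 0)
            return MRN_IGNORE;          /* no TOT_SIZE to re-assemble against */
        return (fragLen == totSize) ? MRN_SINGLE : MRN_FIRST_OF_MANY;
    }
    return MRN_CONTINUATION;
}
```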
We do not include the decompress code here, as it is not API-specific; the full function can be found in the accompanying source files. For ease of compiling this tutorial, we have used the Zlib 1.2.3 included in the ETA package itself. However, this means that the tutorial code can only be compiled as a static library. If you wish to build your project as a shared library, you will have to add a reference to a precompiled Zlib.
For build and run instructions, refer to the Build and Run section of the first tutorial in this series. Again, be sure either to define the server name in your environment so that it points to a valid market data server, or to modify the server configuration parameters in the basicConsumer.c file.
Note that you may see what looks like garbage in the completed MRN console output. This is not compressed data. It consists of foreign-language characters that cannot be represented correctly on the console. Displaying these characters using the appropriate character set is outside the scope of this tutorial.
Example output from an MRN_STORY response
MRN_STORY
DOMAIN: RSSL_DMT_NEWS_TEXT_ANALYTICS
TIMACT_MS 33040952
ACTIV_DATE 26 JAN 2016
MRN_TYPE STORY
MRN_V_MAJ 2
MRN_V_MIN 10
TOT_SIZE 462
FRAG_NUM 1
GUID DJAZ0055D_16012612GYMAkuLYlQdJAjNZFNWkEjVbzcApCUOb9pKT
MRN_SRC DTC_QA_B
FRAGMENT <<COMPRESSED>>
<<Single Fragment>>
{"altId":"nDJAZ0055D","audiences":["NP:MFDJ"],"body":"","firstCreated":"2016-01-26T09:10:40.000Z","headline":"DJ MUTUIONLINE: TITOLO ACCELERA AL RIALZO, +5,03%<MOL.MI>","id":"DJAZ0055D_16012612GYMAkuLYlQdJAjNZFNWkEjVbzcApCUOb9pKT","instancesOf":[],"language":"it","mimeType":"text/plain","provider":"NS:DJN","pubStatus":"stat:usable","subjects":["A:1","B:125","B:126","B:127","B:129","B:134","BL:62","G:3","G:5J","G:A","G:AL","G:BR","G:BS","G:BV","M:DU","M:J","M:K","M:Z","R:MOL.MI","N2:BISV","N2:BSVC","N2:CEEUE","N2:CMPNY","N2:COFS","N2:DFIN","N2:EMEAE","N2:EU","N2:EUROP","N2:EZC","N2:FIN","N2:FINS","N2:INTAG","N2:IT","N2:NWEE","N2:STX","N2:WEU"],"takeSequence":1,"urgency":1,"versionCreated":"2016-01-26T09:10:40.000Z"}
So, now that we've come to the end of the tutorial, let's summarise what this example has demonstrated and what we've learned. The key goal was to show how to request and parse Machine Readable News data. We learned that the initial Refresh message contains only metadata, and that subsequent Update messages deliver each MRN item in one or more fragments. If an item is delivered as a single fragment, we extract the buffer and decompress it to obtain the MRN item. If it is delivered as multiple fragments, the fragments must be concatenated in sequence to obtain the complete buffer before it is decompressed.
This tutorial deals with requesting a single MRN item, which makes the handling of multi-fragment items relatively straightforward. When requesting multiple MRN items, you need to allow for Update messages for different items arriving interleaved with each other. The GUID and MRN_SRC fields should therefore be used to confirm that a fragment belongs to the same MRN item. As Update messages for multi-fragment items arrive, you need to ensure that each additional fragment is added to the correct incomplete item.
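One way to handle interleaved items is to keep a small table of pending items keyed on the GUID and MRN_SRC pair, instead of the single global multiFragItem. The sketch below is a hypothetical illustration. The table size, the key length limit, and the names `PendingSlot`, `findPending()` and `startPending()` are all assumptions. The re-assembly buffer is omitted so that only the matching logic is shown.

```c
#include <stddef.h>
#include <string.h>

#define MAX_PENDING 16   /* hypothetical table capacity */
#define KEY_LEN     128  /* hypothetical max GUID / MRN_SRC length, including terminator */

/* Hypothetical pending-item slot; the re-assembly buffer is omitted here. */
typedef struct {
    int inUse;
    char guid[KEY_LEN];
    char source[KEY_LEN];
    unsigned long long lastFragNum;   /* FRAG_NUM of the last fragment appended */
} PendingSlot;

/* Copy a null-terminated key, truncating it to fit KEY_LEN. */
static void copyKey(char *dest, const char *src)
{
    size_t len = strlen(src);
    if (len >= KEY_LEN)
        len = KEY_LEN - 1;
    memcpy(dest, src, len);
    dest[len] = '\0';
}

/* Return the in-use slot whose GUID and MRN_SRC both match, or NULL. */
static PendingSlot *findPending(PendingSlot *slots, size_t n,
                                const char *guid, const char *source)
{
    for (size_t i = 0; i < n; ++i) {
        if (slots[i].inUse && strcmp(slots[i].guid, guid) == 0 &&
            strcmp(slots[i].source, source) == 0)
            return &slots[i];
    }
    return NULL;
}

/* Claim a free slot for a new item (FRAG_NUM == 1); NULL if the table is full. */
static PendingSlot *startPending(PendingSlot *slots, size_t n,
                                 const char *guid, const char *source)
{
    for (size_t i = 0; i < n; ++i) {
        if (!slots[i].inUse) {
            slots[i].inUse = 1;
            copyKey(slots[i].guid, guid);
            copyKey(slots[i].source, source);
            slots[i].lastFragNum = 1;
            return &slots[i];
        }
    }
    return NULL;
}
```

On FRAG_NUM 1, the consumer would call `startPending()`. On FRAG_NUM > 1, it would call `findPending()` and append to the returned slot, checking the fragment sequence as addFragment() does. Once the item is complete or has timed out, the consumer would reset `inUse` to 0.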
One final thing to note is that the decompress function, like the rest of the source code, is provided for illustration purposes only and is not warranted.
Thank you for reading.
For more information on the NTA domain and MRN Data model, please take a look at the following notes:
MRN Data Model and Real-Time Implementation Guide - summary of NTA domain, MRN data model, sample output, and implementation commentary.