EMA NI Provider - Efficiently publishing multiple data items

Last update: Dec 2020

Compilers: Visual Studio 2015. For other compiler settings, please check out the EMA C++ Compiler Settings tutorial.

Prerequisites: Complete the EMA NI Provider - Publishing item statuses tutorial, and declare the NI_PUB and TEST_NI_PUB services in your LSEG Real-Time Distribution System (see Before you start).

Tutorial purpose

In this tutorial, you will learn how to efficiently use EMA to publish refresh and update messages for a number of data items.

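To this end, we will go through the following sections:

  Introduction
  NI Provider refactoring
  The main workflow
  Build and run the application
  Troubleshooting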

Introduction

Although EMA is designed for ease of use, it is also an efficient API that enables you to build high-performance applications. When it comes to performance, using an efficient API is one thing, but you also have to use it the right way. In this tutorial, you will learn how to use EMA to publish a large number of messages with the best performance the API can offer.

EMA objects reuse

In the previous tutorials of this series, we did not pay too much attention to performance, and, for the sake of simplicity, we generally constructed new EMA objects each time we needed them. For example, to send an update message we wrote something like this:

    	
            

    _provider->submit(
        UpdateMsg()
            .payload(
                FieldList()
                    .addReal(
                        BID,
                        item.getBidPrice(),
                        OmmReal::ExponentNeg2Enum)
                    .addReal(
                        ASK,
                        item.getAskPrice(),
                        OmmReal::ExponentNeg2Enum)
                    .complete()),
        item.getHandle());

This works fine, but you may have noticed that each time we send an update we construct an UpdateMsg and a FieldList on the stack. Taken individually, these constructions do not cost much, but they start to matter as soon as you send a large number of messages. For this reason, it is always a good idea to reuse EMA objects when you can. The API helps you here by providing a clear() method for all its reusable objects. When called, this method clears all the object’s values and resets them to the defaults.

So, instead of re-creating RefreshMsg, UpdateMsg and FieldList objects each time we need them, we can just preserve one instance of each and call the clear() method whenever we need to reuse it. Here is the code snippet above, refactored to reuse EMA objects:

    	
            

    _provider->submit(
        _updateMessage.clear()
            .payload(
                _fieldList.clear()
                    .addReal(
                        BID,
                        item.getBidPrice(),
                        OmmReal::ExponentNeg2Enum)
                    .addReal(
                        ASK,
                        item.getAskPrice(),
                        OmmReal::ExponentNeg2Enum)
                    .complete()),
        item.getHandle());

This is an optimization we implemented in this tutorial. In the next sections, we will explain how we refactored the application to achieve this goal.

About performance and optimization

Obviously, performance and optimization are not just about following guidelines such as the one described above. They are about testing and measuring, in isolated or real environments, to understand which specific parts of your software or system degrade performance. This requires “off the shelf” measurement tools and possibly bespoke tools that help you measure very specific aspects of your application. For example, as this tutorial series is about data item publication, it may be interesting to know how fast our application is able to publish images and updates.

This is what we will find out in this tutorial, by adopting a simplistic approach that consists of measuring the time spent publishing messages. Obviously, this only measures publication performance at the provider level. It does not tell us anything about the performance of the complete distribution chain (Provider => LSEG Real-Time Distribution System => Consumer), nor about other types of performance like message latency. But this is a start. The other aspects go beyond the scope of this tutorial. If you want to learn more about the performance of your system, you may be interested in the EMA PerfTools and the ETA PerfTools included in the RTSDK examples.

NI Provider refactoring

Being able to publish multiple data items requires refactoring our application. Most of this refactoring takes place in the NiProvider class, as described below.
 

The _items collection

Because we need the provider to be able to handle several data items, we got rid of the _theOnlyOneItem member and replaced it with a collection of items named _items. The _items member is a std::map that uses the item name as the key and the corresponding item as the value. That way, we can efficiently find an item by its name like this:

    	
             Item & item = _items["SHARE-0"];
        
        
    

This is required for the refresh(const string & itemName) and update(const string & itemName) methods that send a single message to items identified by their name. The _items collection can also be efficiently iterated to send messages to all items.
 

The createItems() method

To build the collection of items, we added the createItems() method that takes two parameters: a name prefix and the number of items to create. The method creates the requested number of items and adds them to the _items collection. The name of each item is built from the name prefix and the item ID. For example, if the prefix is “SHARE-” and the number of items is 100, then the items will be named “SHARE-0”, “SHARE-1”, “SHARE-2” and so on up to “SHARE-99”. The base price of each item is also derived from its ID.

Here are the details of the method:

    	
            

    void NiProvider::createItems(const std::string & itemNamePrefix, unsigned int itemCount)
    {
      // Clears the items collection
      _items.clear();

      for (unsigned int itemId = 0; itemId < itemCount; ++itemId)
      {
        // Generates the item name
        std::ostringstream oss;
        oss << itemNamePrefix << itemId;
        std::string itemName = oss.str();

        // Computes a base price for this item
        int itemBasePrice = BASE_PRICE + itemId * 100;

        // Creates the item and preserves it in the collection
        _items[itemName] = Item(itemName, itemBasePrice);
      }
    }

The refreshAll() and updateAll() methods

In order to efficiently send refresh and update messages for all these items, we created a refreshAll() method and an updateAll() method that respectively refresh and update all the items of the _items collection.

To avoid duplication, we factored the code that actually builds and sends the refresh and update messages into two new private methods: refresh(Item& item) and update(Item& item) (see below for details).

As an example, here are the details of the updateAll() method (refreshAll() is built on the same model):

    	
            

    void NiProvider::updateAll()
    {
      cout << "  Updating all items" << endl;

      for (ItemsByName::iterator it = _items.begin(); it != _items.end(); ++it)
      {
        Item & item = it->second;
        update(item);
      }
    }

The refresh(Item& item) and update(Item& item) new methods

These two new methods take advantage of the good practice described in the EMA objects reuse section above. They build and send EMA messages by reusing the EMA objects we preserved as members of the NiProvider class. These methods are private to the NiProvider class and are used by the following public methods: refresh(string itemName), refreshAll(), update(string itemName) and updateAll().

As an example, here is the source code of the update() method:

    	
            

    void NiProvider::update(refinitiv::tutorials::ema::nip::Item & item)
    {
        item.generateNextTick();

        _provider->submit(
    #if defined REUSE_EMA_OBJECTS
                    _updateMessage.clear()
    #else
                    UpdateMsg()
    #endif
                        .payload(
    #if defined REUSE_EMA_OBJECTS
                            _fieldList.clear()
    #else
                            FieldList()
    #endif
                                .addReal(
                                    BID,
                                    item.getBidPrice(),
                                    OmmReal::ExponentNeg2Enum)
                                .addReal(
                                    ASK,
                                    item.getAskPrice(),
                                    OmmReal::ExponentNeg2Enum)
                                .complete()),
                    item.getHandle());
    }

As you can see, this method is implemented in two different ways, depending on whether the REUSE_EMA_OBJECTS pre-processor macro is defined. If the macro is defined, the refresh() and update() methods reuse the preserved EMA objects. If the macro is not defined, the methods use automatic variables built on the stack for each call (no object reuse). By default, the REUSE_EMA_OBJECTS macro is defined, so EMA objects are reused. If you want to undefine the macro, you just have to comment out the following pre-processor directive (on the 5th line of the NiProvider.cpp file):

    	
               #define REUSE_EMA_OBJECTS
        
        
    

This gives you the opportunity to measure the performance difference between the two modes (EMA objects created vs EMA objects reused). For more details please refer to the related note in the Build and run the application section.

The itemExists() method

We also added the itemExists() method, which we use in the refresh(string itemName) and update(string itemName) methods to test the existence of a data item.

Here are the details of the method:

    	
            

    bool NiProvider::itemExists(const std::string & itemName) const
    {
      if (_items.find(itemName) == _items.end())
        return false;

      return true;
    }

The main advantage of this method is that it improves the readability of the code, letting us write this:

    if (itemExists(itemName))
    {
        ...
    }

instead of this:

    if (_items.find(itemName) != _items.end())
    {
        ...
    }

The main workflow

We changed the main function so that it implements a workflow equivalent to that of the Publishing updates tutorial, but for 10,000 items instead of one.

    	
            

    int main(int argc, char* argv[])
    {
        .
        .
        .
            NiProvider provider;
            unsigned int itemsCount = 10000;

            provider.setServiceName("TEST_NI_PUB");
            provider.createItems("SHARE-", itemsCount);
            provider.connectAs("YOUR_PROVIDER_USER_NAME");

            waitFor(5);

            provider.refreshAll();

            waitFor(1);

            for (int i = 0; i < 30; ++i)
            {
                provider.updateAll();

                waitFor(1);
            }

            provider.disconnect();
        .
        .
        .
    }

We also added time measurement and statistics display for the refresh phase and for the update phase. These statistics rely on the getCurrentTime() and printStatistics() functions also defined in main.cpp.

This is the main workflow with the additional statistics:

    	
            

    int main(int argc, char* argv[])
    {
        .
        .
        .
            NiProvider provider;
            unsigned int itemsCount = 10000;
            unsigned long long startTime = 0;
            unsigned long long endTime = 0;

            provider.setServiceName("TEST_NI_PUB");
            provider.createItems("SHARE-", itemsCount);
            provider.connectAs("YOUR_PROVIDER_USER_NAME");

            waitFor(5);

            startTime = getCurrentTime();
            provider.refreshAll();
            endTime = getCurrentTime();
            printStatistics(itemsCount, startTime, endTime);

            waitFor(1);

            for (int i = 0; i < 30; ++i)
            {
                startTime = getCurrentTime();
                provider.updateAll();
                endTime = getCurrentTime();
                printStatistics(itemsCount, startTime, endTime);

                waitFor(1);
            }

            provider.disconnect();
        .
        .
        .
    }

The printStatistics() function displays the time spent to publish the 10,000 refresh or update messages. It also displays the corresponding output rate.

Here are the details of this function:

    	
            

    void printStatistics(unsigned int itemsCount, unsigned long long startTime, unsigned long long endTime)
    {
        float timeSpent = (float)(endTime - startTime) / (float)1000;

        cout << "\tMessage count = " << itemsCount
             << "\tTotal time = " << timeSpent << " sec"
             << "\tMessage rate = " << (float)itemsCount / timeSpent << " msg/sec" << endl;
    }

Build and run the application

Build the application and start it. Please refer to the Build and Run section within the first tutorial of this series (A barebones EMA NIP application shell) for detailed instructions.

This is what you should get:

1. A console application should open and display something like:

    -------------------------------------------------------------------------------
    |                    Non Interactive Provider EMA Tutorial                    |
    |                                                                             |
    |                 Tutorial 9 - Publishing multiple data items                 |
    -------------------------------------------------------------------------------
      Provider created
      Connecting Provider to the ADH as YOUR_PROVIDER_USER_NAME
      Waiting for 5 seconds...
      Provider is connected.
      OmmState:Open / Ok / None / 'Refresh Completed'
      Refreshing all items
           Message count: 10000
           Total time   : 0.08 sec
           Message rate : 125000 msg/sec
      Waiting for 1 seconds...
      Updating all items
           Message count: 10000
           Total time   : 0.157 sec
           Message rate : 63694.3 msg/sec
      Waiting for 1 seconds...
      Updating all items
           Message count: 10000
           Total time   : 0.152 sec
           Message rate : 65789.5 msg/sec
      Waiting for 1 seconds...
           .
           .
           .
      Updating all items
           Message count: 10000
           Total time   : 0.151 sec
           Message rate : 66225.2 msg/sec
      Waiting for 1 seconds...
      Disconnecting...
      Provider is disconnected. OmmState:Open / Suspect / None / 'channel down'
      Exiting the application
    Press any key to continue . . .

2. Open a consuming application and subscribe to the SHARE-1 market price item of the TEST_NI_PUB service. After a short while, you should receive values for the 5 fields (DSPLY_NAME/3, OPEN_PRC/19, HST_CLOSE/21, BID/22 and ASK/25) published by the NI Provider. Then, the BID and ASK fields are updated every second.

As an example, this is a screenshot of the Eikon Quote object that we used to subscribe to TEST_NI_PUB/SHARE-1. In this object, updated fields are displayed in yellow for a short while:

3. You can also open any of the SHARE-0 to SHARE-9999 data items. You will see that all of them are streaming (one update every second).
This is a screenshot of SHARE-9999 opened in the Eikon Quote object:

4. After the 30 updates sent for each of the 10,000 items, the TEST_NI_PUB service goes down and the application exits.

5. Press a key to close the console when you are prompted to.

Note: If you want to measure the performance difference between the two modes (EMA objects created on the stack vs EMA objects reused), you can build two versions of the application (one with the REUSE_EMA_OBJECTS macro defined and one with the macro undefined; the macro is defined in NiProvider.cpp), run them and compare the printed statistics.

Troubleshooting

Q: When I build the tutorial project, I get errors like:

    	
            error MSB3073: The command "copy "\Ema\Libs\WIN_64_VS140\Debug_MDd\Shared\*.*" .\Debug_WIN_64_VS140_Shared\
        
        
    

A: The ElektronSDKInstallPath environment variable is not set or set to the wrong path. See Setup the development environment.
 

Q: The application is stuck after the "Connecting Provider to ADH…" message is displayed.
After a while the application displays an error like: 

    	
            Exception Type='OmmInvalidUsageException',   Text='login failed (timed out after waiting 45000 milliseconds) for 10.2.43.149:14003)'
        
        
    

A: Verify that the ADH of your LSEG Real-Time Distribution System infrastructure is up and that you properly set the host parameter in the EmaConfig.xml file.
You can also use the telnet command tool to verify that your NIP application machine can connect to the ADH (telnet <ADH host> <port>). If the telnet succeeds but you still can’t connect, verify that no firewall is blocking the messages sent or received by the application.
As a last resort, ask your administrator to help you investigate with monitoring tools like adhmon.
 

Q: The “Waiting for 5 seconds...” message and the “Provider is connected” message get mixed up when displayed in the console.
A: This is perfectly normal. It is caused by the EMA background thread printing the “Provider is connected” message at the same time as the main application thread prints the “Waiting for 5 seconds...” message. This can be fixed either by choosing a single-threaded application model (see the Message Processing - dispatch section of the Requesting MarketPrice data EMA Consumer tutorial) or by printing these messages atomically (use critical sections or a single cout << call to print the whole message).
 

Q: My provider unexpectedly disconnects and raises an OmmInvalidUsageException that says: "The output buffer may need to be flushed".
Details of the exception I receive:

    	
            Exception Type='OmmInvalidUsageException',   Text='Internal error: rsslReactorSubmitMsg() failed in OmmNiProviderImpl::submit( const RefreshMsg& ). RsslReactorChannel name Channel_1 RsslReactor 0x0000000002010230 RsslReactorChannel 0x0000000002010230 RsslSocket 172 RsslChannel 0x0000000002010230 Error Id -4 Internal sysError 0 Error Location ..\..\..\ValueAdd\Reactor\Impl\rsslReactor.c:1302 Error Text <..\..\..\Ripc\Impl\ripcsrvr.c:7674> Error: 1009 ripcIntWrtHeader() failed,   out of output buffers. The output buffer may need to be flushed.'
        
        
    

A: This indicates that the output buffer of the EMA/ETA library has overflowed. This happens if the ADH is too slow to consume the messages sent by your provider. This may be the sign of a slow connection between your Provider and the ADH (e.g. if you are connected via a VPN). You can work around this issue by decreasing the number of items created by your provider (try 1000 and then increase to adjust to your available bandwidth).