ETA Consumer - Establishing a connection to a Provider

Download tutorial source code

Click here to download

Last update Dec 2020
Compilers Refer to the ETA Compiler Guides for a complete list.
Prerequisites ETA Consumer - Creating a bare-bones ETA starter application

Introduction

The goal of this tutorial is to define a Consumer/client interface that successfully connects into an existing OMM-based Provider/server. All communication is asynchronous so the setup involves defining callbacks to manage both success and failure events. In addition, we'll introduce the main Value Added Component, Transport API Reactor (RsslReactor), which manages and establishes communication between the consumer and provider.

Description

Derived from Tutorial 1, we will utilize the Transport API Reactor component to setup and initialize a basic consumer. A consumer in our definition simply acts as a client to a server of data. Prior to session establishment, we must establish basic communication to request and receive data.

Transport API Reactor

The Transport API Reactor is a connection management and event processing component that can significantly reduce the amount of code an application must write to connect to OMM-based devices. The component supports basic session management including user log in, source directory establishment and dictionary download. The reactor also supports event dispatch, flushing of user written content and network pings on the user's behalf. While the component supports both Consumers and Providers, we only require consumer use.

The following diagram shows how the Value Added Components fit within the ETA structure.

Implementation Overview - Communicating with our OMM-based Provider

All communication to an OMM-based Provider utilizes an event-driven, asynchronous paradigm. As such, the ETA interfaces expect callback functions to be defined to capture and react to these events. In this tutorial, we will enhance the "basicConsumer.c" source file to perform the necessary steps to communicate with an OMM-based Provider. The following steps define communication:

Define and set up a role

When determining the type of connection our application must establish, we must specify whether we are acting as a consumer of data or a provider of data. This specification is referred to as a role. In our case, a Consumer role is defined which will require both optional and mandatory event callbacks which will be detailed in this and subsequent tutorials. The code segment below is defined within the init() routine, where we create and set up our role then specify the callbacks the Transport API Reactor component requires.

    	
            

void init()

   {

      ...

 

      RsslReactorOMMConsumerRole consumerRole;   // Channel Consumer role - client (consumer) connecting into a server (provider)

 

      ...

 

      rsslClearOMMConsumerRole(&consumerRole);

 

      /**************************************************************************

       * Setup callback functions for our connection channel - consumer role

       **************************************************************************/

 

      /* Connection/Channel callback */

      consumerRole.base.channelEventCallback = channelEventCallback;

 

      /* Market Data Message callback - temporary placeholder required by RsslReactor */

      consumerRole.base.defaultMsgCallback = defaultMsgCallback;

 

      ...

   }

Note: Refer to the Define our callbacks section below for details regarding callbacks.

Initialize our reactor

Our RSSL Reactor component must live during the life of the connection thus it is defined as a file scope variable.

    	
            

/* RsslReactor components*/

   RsslReactor *pReactor = NULL;

 

   void init()

   {

      ...

 

      RsslCreateReactorOptions reactorOpts;

 

      ...

 

      /* Create our rsslReactor used to manage the UPA channel */

      rsslClearCreateReactorOptions(&reactorOpts);

      if (!(pReactor = rsslCreateReactor(&reactorOpts, &rsslErrorInfo)))

      {

         printf("Error: %s", rsslErrorInfo.rsslError.text);

         exit(-1);

      }

 

      ...

   }

Specify our connection options and initiate a connection

Our connection contains many specifications that can be referred to within the documentation. However, the important items within this tutorial contain the details of the server/port of the server as well as how the reactor handles the loss of connection. Once we have established our options, we issue an asynchronous connection request.

    	
            

/* Server/Provider connection details */

   #define RTDS_SERVER_NAME   "rtds"

   #define RTDS_SERVER_PORT   "14002"

 

   ...

 

   /* RDM objects used at startup */

   RsslReactorConnectOptions connectionOptions;

 

   void init()

   {

      ...

 

      // Prepare our connection properties

      rsslClearReactorConnectOptions(&connectionOptions);

 

      connectionOptions.rsslConnectOptions.connectionInfo.unified.address = RTDS_SERVER_NAME;

      connectionOptions.rsslConnectOptions.connectionInfo.unified.serviceName = RTDS_SERVER_PORT;

      ...

      connectionOptions.reconnectAttemptLimit = -1;   // Keep trying to reconnect upon failure

 

      ...

 

      /* Initialize connection */

      if (rsslReactorConnect(pReactor, &connectionOptions, (RsslReactorChannelRole*)&consumerRole, &rsslErrorInfo) != RSSL_RET_SUCCESS)

      {

         printf("Error rsslReactorConnect(): %s\n", rsslErrorInfo.rsslError.text);

         exit(-1);

      }

   }

Define our callbacks

Within the Define and set up a role section, we outlined the callbacks required for our Consumer role when communicating with our OMM-based Provider. The following callbacks have been defined.

channelEventCallback

The channelEventCallback is a mandatory callback used to manage our connection into the OMM-based Provider. The primary function of this callback is to react to connection status and associate the connection file descriptor with our event dispatch mechanism select(). In addition, we also utilize the "Channel Ready" event to signal we are ready for further communication with our Provider.

    	
            

RsslReactorCallbackRet channelEventCallback(RsslReactor *pReactor, RsslReactorChannel *pReactorChannel, RsslReactorChannelEvent *pConnEvent)

   {

      switch (pConnEvent->channelEventType)

      {

         case RSSL_RC_CET_CHANNEL_UP:

         {

            /* A channel that we have requested via rsslReactorConnect() has come up.  We must now

            * associated the new channel descriptor within our select() watchlist. This will drive

            * the process of detecting message events, i.e. Login, Directory, Dictionary etc. */

            printf("Connection up! Channel fd=%d\n\n", pReactorChannel->socketId);

 

            /* Set file descriptor. */

            FD_SET(pReactorChannel->socketId, &readFds);

            FD_SET(pReactorChannel->socketId, &exceptFds);

 

            /* Note: Refer to the complete rsslConsumer example to set channel properites appropriate for your environment */

            break;

         }

         case RSSL_RC_CET_CHANNEL_READY:

            printf("Channel is ready.\n");

            break;

 

         ...

      }

 

      return RSSL_RC_CRET_SUCCESS;

   }

defaultMsgCallback

The defaultMsgCallback is a mandatory callback and is presently a placeholder for a future Tutorial. The RSSL Reactor requires it's definition for proper operation.

    	
            

RsslReactorCallbackRet defaultMsgCallback(RsslReactor *pReactor, RsslReactorChannel *pChannel, RsslMsgEvent* pMsgEvent)

{

    return(RSSL_RC_CRET_SUCCESS);

}

Dispatch events - Event Loop

Once we have our communication defined and our callbacks in place, we can begin to monitor events. The Event Loop, represented in the diagram below, will react to external events triggered by our OMM Provider. The moment activity is detected, the event is dispatched to the appropriate callback for processing. Once completed, the processing is returned back into the event loop to process any further events or waits for the next available one. In this tutorial, we have only defined the connection event and the associated connection callback, as indicated within our init() function.  Other event callbacks, such as Login and Directory, will be covered in subsequent tutorials.

The following code segment represents our event processing loop:

    	
            

void mainEventLoop()

   {

      RsslReactorDispatchOptions dispatchOpts;

      int ret;

 

      rsslClearReactorDispatchOptions(&dispatchOpts);

 

      /* Main loop - select() to trigger event notification then dispatch (rsslReactorDispatch()) */

      do

      {

         fd_set useReadFds = readFds, useExceptFds = exceptFds;

 

         /* detect channel notifications */

         ret = select(FD_SETSIZE, &useReadFds, NULL, &useExceptFds, NULL);

 

         ...

 

         /* Call rsslReactorDispatch().  This will handle any events that have occurred on our rssl channel and

          * dispatched via the callbacks specified for our consumerRole as referenced within init().

            A return value of RSSL_RET_SUCCESS indicates there may be more to process */

         while ((ret = rsslReactorDispatch(pReactor, &dispatchOpts, &rsslErrorInfo)) > RSSL_RET_SUCCESS)

            ;

 

         ...

 

      } while (ret >= RSSL_RET_SUCCESS);

   }

Build and run

For these instructions, refer to the Build and Run section within the first tutorial of this series.

Note: Ensure you can connect into a valid Real-Time Distribution System to test. Defined within the basicConsumer.c file, these tutorials assume the following server has been defined:

    	
            

   /* Server/Provider connection details */

   #define RTDS_SERVER_NAME "rtds"

   #define RTDS_SERVER_PORT "14002"

 

   ...

You can either define the server name 'rtds' within your environment to point to a valid Real-Time Distribution System or simply modify these server configuration paramters to suit your setup. Running the tutorial will simply echo the connection status and if successful, that our channel is ready, i.e.

    	
            

> Connection up! Channel fd=156

>

> Channel is ready.

References

For more information, refer to the Transport API - C Development Guides