Introduction
Update: March 2021
This article is intended for anyone who is interested in using the Enterprise Message API (EMA, formerly known as Elektron Message API) Java edition from the Scala language to consume real-time data, i.e. Market Price, and level 2 data, i.e. Market By Order, Market By Price and Market Maker, as well as Symbol List, from a server such as Advanced Distribution Server (ADS), Refinitiv Real-Time Host or any OMM Provider application. EMA is one part of the Refinitiv Real-Time Software Development Kit, or RTSDK (formerly known as ESDK). EMA is an easy-to-use, high-performance open source API that operates at the Message Layer and provides access to OMM and RWF data. Therefore, developers generally only need to deal with data and status messages. For more details of EMA and its resources, e.g. Overview, Quick Start, Documentation, Downloads and Tutorials, please refer to Real-Time SDK - Java. This article also demonstrates how to make snapshot and streaming requests and how to apply the Views and Batches features. For more details of these features, please refer to the API Concepts Guide.
EMA Overview
EMA Consumer Life Cycle
Developing a consumer application with EMA is simple. The general process can be summarized by the following steps:
- Establish network communication to the server
- Log in to the server
- Obtain feeds/services information e.g. the supported domain types, each service’s status (up/down)
- Load or download dictionary information e.g. all field ids, field names, and their types. This information is used when processing data
- Issue item requests and process responses e.g. data or status messages
- Log out of the server and shut down
However, EMA makes things simpler by further reducing the coding needed. The API automatically performs the first four steps with minimal coding. Hence, the application just issues item requests, processes responses, logs out and shuts down, as shown in the figure below:
Processing Responses from the Server
To process the responses, a consumer application needs to implement the following callback methods of the OmmConsumerClient interface:
- onRefreshMsg() for the processing of refresh messages. A refresh message contains all the data for the requested instrument.
- onUpdateMsg() for the processing of update messages. An update message contains the changed data values for the instrument being subscribed.
- onStatusMsg() for the processing of status messages. A status message conveys the health of the feed, the delivery infrastructure, and the data subscriptions themselves.
Moreover, the interface also provides onGenericMsg(), onAckMsg() and onAllMsg() for the processing of generic messages, ack messages and all messages respectively. Consumer applications can normally leave these methods with an empty body, since the messages they need already arrive through onRefreshMsg(), onUpdateMsg() and onStatusMsg().
Scala Overview
Scala was developed specifically with the aim of being a better language than Java. Scala is a modern multi-paradigm programming language designed to express common programming patterns in a concise, elegant, and type-safe way. It smoothly integrates features of object-oriented and functional languages. Scala is designed to interoperate well with the popular Java Runtime Environment (JRE). In particular, the interaction with the mainstream object-oriented Java programming language is as smooth as possible. Hence, you can write a Scala application that calls any Java interface, e.g. EMA Java. Scala syntax is also close to that of Java, so the learning curve for Java developers is minimized. Therefore, you can develop an EMA Java application in Scala in minimal time, with code that is more compact and easier to write. For more details of Scala including Documentation, Download, Online Courses, please refer to Scala page.
Solution
This section explains how to develop an EMA Consumer application written in Scala. It shows snippets of the Scala source code which uses EMA to consume and process data and to use the Views and Batches features. The complete application source code can be found at Refinitiv-API-Samples/Article.EMA.Scala.Consumer GitHub. The section also explains how to build and run the application from the Scala command line.
Prerequisites
- JDK8
- Scala binaries or IDE. For more details, please refer to Scala Download Page
- Real-Time SDK Java Package, which provides EMA and its dependency libraries that are required to develop EMA consumer applications.
Source Code
This section demonstrates how to develop an EMA Consumer following the EMA Consumer Life Cycle section. The application is simple: there is only one file, Consumer.scala, which contains the following parts:
1. Consumer object: It contains the main(args: Array[String]) method, which performs the following:
- Extract the following program parameters from the command line.
- -server <server_host_port>. The Server host and port. The default is localhost:14002.
- -serviceName <service_name>. The Service name. The default is DIRECT_FEED.
- -username <name>. The Name of the application user. The default is user.
- -domainType <domain_type>. The Domain Type of the requested item(s). The default is MARKET_PRICE. The other Domain Types are MARKET_BY_ORDER, MARKET_BY_PRICE, MARKET_MAKER, and SYMBOL_LIST.
- -itemNames <list_of_RICs>. The List of RICs separated by ','. The default is IBM.N.
- -fieldIds <list_of_FieldIds>. The List of field Ids separated by ','. The default is all fields.
- -streamingReq <true or false>. Whether all requests are streaming requests. If it is true, the application receives a Refresh followed by Update messages. Otherwise, only a Refresh is received. The default is true.
- -runTime <seconds>. How long (in seconds) application should run before exiting. The default is 60.
- Initialize AppClient object containing the callback methods which process all responses from the server.
- Create OmmConsumer object which connects and logs in to the server, obtains services information, loads or downloads dictionary information automatically.
- Call OmmConsumer to subscribe to the item(s) according to the program parameters e.g. request streaming or snapshot. If there is more than one item, the Batches feature is applied. If one or more field ids are specified, the Views feature is applied instead of receiving all fields available on the item(s).
- Sleep the application until the runTime parameter (seconds) elapses. This gives EMA time to receive responses from the server and call the callback methods i.e. onRefreshMsg(), onUpdateMsg() and onStatusMsg().
- Before shutting down the application, uninitialize the OmmConsumer object. This logs out and disconnects from the server, at which time all open item streams are closed.
The source code snippet of the main(args: Array[String]) method:
def main(args: Array[String]): Unit = {
  //Extract program parameters from the command line.
  this.getCommandLineOptions(args)
  //Show all parameters used by the application
  showAllParameters()
  //EMA part to subscribe items and retrieve data
  var consumer: OmmConsumer = null
  try {
    //The client whose callback methods process all responses from the server
    val appClient: AppClient = new AppClient()
    val config: OmmConsumerConfig = EmaFactory.createOmmConsumerConfig()
    //Create EMA OmmConsumer object.
    //The OmmConsumer connects and logs in to the server then
    //it obtains services information, loads or downloads dictionary information automatically.
    consumer = EmaFactory.createOmmConsumer(config.host(server).username(user))
    val reqMsg: ReqMsg = EmaFactory.createReqMsg()
    //1 item/RIC with all fields so batch and view request are not created; specify the item/RIC in the name(..) method
    if (itemNames.size == 1 && fieldIds.isEmpty) {
      //send the request
      consumer.registerClient(reqMsg.serviceName(service).domainType(domainType).name(itemNames.head).interestAfterRefresh(streamingReq), appClient)
    }
    //1 or more items/RICs with all fields or specified fields
    else {
      //create batch and/or view request
      val batchview: ElementList = createBatchViewElementList(itemNames, fieldIds)
      //send view request for 1 item and specify the item/RIC in the name(..) method
      if (itemNames.size == 1)
        consumer.registerClient(reqMsg.serviceName(service).domainType(domainType).name(itemNames.head).interestAfterRefresh(streamingReq).payload(batchview), appClient)
      //send batch(multiple items) and/or view request
      else
        consumer.registerClient(reqMsg.serviceName(service).domainType(domainType).interestAfterRefresh(streamingReq).payload(batchview), appClient)
    }
    //EMA calls onRefreshMsg(), onUpdateMsg() and onStatusMsg() automatically when an event is received,
    //until the runTime seconds have elapsed.
    Thread.sleep(runTime * 1000)
  } catch {
    case i: InterruptedException => {
      println("InterruptedException:" + i.getMessage)
    }
    case o: OmmException => {
      println("OmmException:" + o.getMessage)
    }
  } finally {
    //before the application exits, log out and disconnect from the server,
    //at which time all open item streams are closed.
    if (consumer != null)
      consumer.uninitialize()
    System.exit(0)
  }
}
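The main method calls getCommandLineOptions(args), which is not shown in the snippets. The following is only a minimal sketch of how such parsing could look, not the sample's actual implementation: the Options case class, the CommandLineSketch object and the tokens helper are hypothetical names introduced for illustration. The defaults come from the parameter list above. The sketch accepts both the option names from that list (-serviceName, -username) and the shorter -service and -user used in the run command later in this article.

```scala
import scala.util.Try

// Hypothetical container for the program parameters; defaults follow the parameter list.
final case class Options(
  server: String = "localhost:14002",
  serviceName: String = "DIRECT_FEED",
  username: String = "user",
  domainType: String = "MARKET_PRICE",
  itemNames: Set[String] = Set("IBM.N"),
  fieldIds: Set[Int] = Set.empty, // empty means "all fields"
  streamingReq: Boolean = true,
  runTime: Int = 60
)

object CommandLineSketch {
  // Split a comma-separated value into trimmed, non-empty tokens.
  private def tokens(value: String): Set[String] =
    value.split(',').map(_.trim).filter(_.nonEmpty).toSet

  // Walk the arguments as "-name value" pairs.
  // Unknown options, a trailing option without a value, and unparsable numbers are ignored.
  def parse(args: Array[String]): Options =
    args.toList.sliding(2, 2).foldLeft(Options()) {
      case (opts, List("-server", v))                     => opts.copy(server = v)
      case (opts, List("-serviceName" | "-service", v))   => opts.copy(serviceName = v)
      case (opts, List("-username" | "-user", v))         => opts.copy(username = v)
      case (opts, List("-domainType", v))                 => opts.copy(domainType = v)
      case (opts, List("-itemNames", v))                  => opts.copy(itemNames = tokens(v))
      case (opts, List("-fieldIds", v))                   =>
        opts.copy(fieldIds = tokens(v).flatMap(s => Try(s.toInt).toOption))
      case (opts, List("-streamingReq", v))               => opts.copy(streamingReq = v.equalsIgnoreCase("true"))
      case (opts, List("-runTime", v))                    => opts.copy(runTime = Try(v.toInt).getOrElse(opts.runTime))
      case (opts, _)                                      => opts
    }
}
```

Keeping the parsed values in an immutable case class, rather than in mutable fields of the Consumer object, is a design choice of this sketch; it makes the defaults visible in one place.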
The source code snippet that applies the Batches and Views features:
//create batch and/or view request
def createBatchViewElementList(items: Set[String], fidsSet: Set[Int]): ElementList = {
  val batchview: ElementList = EmaFactory.createElementList()
  //If there is more than 1 item/RIC,
  //create a batch request by adding each item(RIC) into an array of the ElementList
  if (items.size > 1) {
    val itemsarray: OmmArray = EmaFactory.createOmmArray()
    items.foreach(anItem =>
      itemsarray.add(EmaFactory.createOmmArrayEntry().ascii(anItem))
    )
    batchview.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, itemsarray))
  }
  //If field ids are specified,
  //create a view request by adding each field id into an array of the ElementList
  //and set the view type to 1 to indicate that the array contains field ids.
  if (fidsSet.nonEmpty) {
    val fidsarray: OmmArray = EmaFactory.createOmmArray()
    fidsSet.foreach(afid =>
      fidsarray.add(EmaFactory.createOmmArrayEntry().intValue(afid))
    )
    batchview.add(EmaFactory.createElementEntry().uintValue(EmaRdm.ENAME_VIEW_TYPE, 1))
    batchview.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_VIEW_DATA, fidsarray))
  }
  batchview
}
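Taken together, the branching in main and in createBatchViewElementList produces four request shapes, depending on how many items and field ids were given. The sketch below models only that decision in plain Scala so it can be read and run without EMA. RequestShape, its four cases and RequestShapeSketch are hypothetical names introduced for illustration; they are not part of EMA or of the sample application.

```scala
// Hypothetical model of the four request shapes; these names are not part of EMA.
sealed trait RequestShape
case object SingleItemAllFields extends RequestShape // name(..) only, no payload
case object SingleItemWithView extends RequestShape  // name(..) plus a view payload
case object BatchAllFields extends RequestShape      // batch payload, no name(..)
case object BatchWithView extends RequestShape       // batch and view in one payload

object RequestShapeSketch {
  // Mirrors the conditions used in main and createBatchViewElementList:
  // more than one item means a batch, any field id means a view.
  def shapeOf(items: Set[String], fieldIds: Set[Int]): RequestShape = {
    // Assumption: at least one item is always present (the default is IBM.N).
    require(items.nonEmpty, "at least one item/RIC is required")
    (items.size > 1, fieldIds.nonEmpty) match {
      case (false, false) => SingleItemAllFields
      case (false, true)  => SingleItemWithView
      case (true, false)  => BatchAllFields
      case (true, true)   => BatchWithView
    }
  }
}
```

For example, the run command shown later in this article (one item, JPY=, with field ids 5, 22 and 25) corresponds to SingleItemWithView in this model.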
2. AppClient class: This class processes the responses EMA receives, i.e. data or the status of item streams. It implements the callback methods of the OmmConsumerClient interface:
- onRefreshMsg() to process refresh messages.
- onUpdateMsg() to process update messages.
- onStatusMsg() to process status messages.
The source code snippet of the AppClient class to process refresh messages:
//This callback is invoked upon receiving a refresh message.
//This message contains all currently available information about the item.
//It also contains all fields or requested fields(in case of view request).
def onRefreshMsg(refreshMsg: RefreshMsg, event: OmmConsumerEvent): Unit = {
  println("Refresh Message of: " +
    (if (refreshMsg.hasName()) refreshMsg.name()
     else "<not set>"))
  println("Domain Type: " + Consumer.domainTypeMap(refreshMsg.domainType))
  println("Service Name: " +
    (if (refreshMsg.hasServiceName()) refreshMsg.serviceName()
     else "<not set>"))
  println("Item State: " + refreshMsg.state())
  //Normally, it is data of Market Price domain
  if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType())
    decode(refreshMsg.payload().fieldList())
  //for data of other domain types i.e. MARKET_BY_ORDER, MARKET_BY_PRICE, MARKET_MAKER and SYMBOL_LIST
  else if (DataType.DataTypes.MAP == refreshMsg.payload().dataType())
    decode(refreshMsg.payload().map())
  println()
}
The source code to process update messages:
//This callback is invoked upon receiving an update message.
//This message conveys any changes to an item’s data.
def onUpdateMsg(updateMsg: UpdateMsg, event: OmmConsumerEvent): Unit = {
  println("Update Message of: " +
    (if (updateMsg.hasName()) updateMsg.name()
     else "<not set>"))
  println("Domain Type: " + Consumer.domainTypeMap(updateMsg.domainType))
  println("Service Name: " +
    (if (updateMsg.hasServiceName()) updateMsg.serviceName()
     else "<not set>"))
  //Normally, it is data of Market Price domain
  if (DataType.DataTypes.FIELD_LIST == updateMsg.payload().dataType())
    decode(updateMsg.payload().fieldList())
  //for data of other domain types i.e. MARKET_BY_ORDER, MARKET_BY_PRICE, MARKET_MAKER and SYMBOL_LIST
  else if (DataType.DataTypes.MAP == updateMsg.payload().dataType())
    decode(updateMsg.payload().map())
  println()
}
The source code to process status messages:
//This callback is invoked upon receiving a status message.
//This message conveys state change information associated with an item stream.
def onStatusMsg(statusMsg: StatusMsg, event: OmmConsumerEvent): Unit = {
  println("Status Message of:" +
    (if (statusMsg.hasName()) statusMsg.name()
     else "<not set>"))
  println("Domain Type: " + Consumer.domainTypeMap(statusMsg.domainType()))
  println("Service Name: " +
    (if (statusMsg.hasServiceName()) statusMsg.serviceName()
     else "<not set>"))
  if (statusMsg.hasState())
    println("Item State: " + statusMsg.state())
  println()
}
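All three callbacks print the domain name through Consumer.domainTypeMap, which is not shown in the snippets. The following is a minimal sketch of what such a lookup could look like, not the sample's actual code. DomainTypeSketch, domainTypeByName and nameOf are hypothetical names. The numeric keys are the RDM domain values that correspond to the EmaRdm.MMT_* constants; an application that has EMA on its classpath would normally use those constants instead of literals.

```scala
object DomainTypeSketch {
  // RDM domain values written as literals so the sketch runs without EMA;
  // they correspond to EmaRdm.MMT_MARKET_PRICE, MMT_MARKET_BY_ORDER, and so on.
  val domainTypeMap: Map[Int, String] = Map(
    6  -> "MARKET_PRICE",
    7  -> "MARKET_BY_ORDER",
    8  -> "MARKET_BY_PRICE",
    9  -> "MARKET_MAKER",
    10 -> "SYMBOL_LIST"
  )

  // Reverse lookup, e.g. to turn the -domainType option into the numeric value.
  val domainTypeByName: Map[String, Int] = domainTypeMap.map(_.swap)

  // Safe lookup: an unmapped domain yields a placeholder instead of throwing.
  def nameOf(domainType: Int): String =
    domainTypeMap.getOrElse(domainType, s"UNKNOWN($domainType)")
}
```

Calling a Scala Map directly, as in domainTypeMap(key), throws NoSuchElementException for a missing key, so a getOrElse-based helper like nameOf may be preferable inside callbacks.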
Note that the class calls the utility method decode(fieldList: FieldList) to decode level 1 data, which is Market Price. The class also calls the method decode(map: com.refinitiv.ema.access.Map) to decode level 2 data, i.e. Market By Order, Market By Price and Market Maker, as well as Symbol List.
The source code of the decode(fieldList: FieldList) method to process level 1 data:
//decode the field list according to each field's type
def decode(fieldList: FieldList): Unit = {
  fieldList.forEach(fieldEntry => {
    print("\tFid: " + fieldEntry.fieldId() + " Name: " + fieldEntry.name() + " DataType: " + DataType.asString(fieldEntry.load().dataType()) + " Value: ")
    if (Data.DataCode.BLANK == fieldEntry.code()) println(" blank")
    else {
      fieldEntry.loadType() match {
        case DataTypes.REAL => println(fieldEntry.real().asDouble)
        case DataTypes.DATE => println(fieldEntry.date().day() + " / " + fieldEntry.date().month() + " / " + fieldEntry.date().year())
        case DataTypes.TIME => println(fieldEntry.time().hour() + ":" + fieldEntry.time().minute() + ":" + fieldEntry.time().second() + ":" + fieldEntry.time().millisecond())
        case DataTypes.DATETIME => println(fieldEntry.dateTime().day() + " / " + fieldEntry.dateTime().month() + " / "
          + fieldEntry.dateTime().year() + "." + fieldEntry.dateTime().hour() + ":"
          + fieldEntry.dateTime().minute() + ":" + fieldEntry.dateTime().second() + ":"
          + fieldEntry.dateTime().millisecond() + ":" + fieldEntry.dateTime().microsecond() + ":"
          + fieldEntry.dateTime().nanosecond())
        case DataTypes.INT => println(fieldEntry.intValue())
        case DataTypes.UINT => println(fieldEntry.uintValue())
        case DataTypes.ASCII => println(fieldEntry.ascii())
        case DataTypes.ENUM => println(
          if (fieldEntry.hasEnumDisplay()) fieldEntry.enumDisplay()
          else fieldEntry.enumValue())
        case DataTypes.RMTES => println(fieldEntry.rmtes())
        case DataTypes.ERROR => println("(" + fieldEntry.error().errorCodeAsString() + ")")
        //other data types are not decoded; print an empty value
        case _ => println()
      }
    }
  })
}
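The TIME and DATE cases above print each component unpadded, which is why a time appears as 3:18:0:0 in the example result later in this article. If fixed-width output is preferred, the components could be passed through a small formatting helper. This is only a sketch: TimeFormatSketch, formatTime and formatDate are hypothetical names, and the separators are a presentation choice rather than anything EMA prescribes.

```scala
object TimeFormatSketch {
  // Zero-pad hour, minute and second to two digits and millisecond to three.
  def formatTime(hour: Int, minute: Int, second: Int, millisecond: Int): String =
    f"$hour%02d:$minute%02d:$second%02d.$millisecond%03d"

  // Year-month-day order with zero-padded month and day.
  def formatDate(day: Int, month: Int, year: Int): String =
    f"$year%04d-$month%02d-$day%02d"
}
```

In the TIME case, the helper would be called with the values the snippet already reads, i.e. fieldEntry.time().hour(), minute(), second() and millisecond().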
The source code of the decode(map: com.refinitiv.ema.access.Map) method to process level 2 data and Symbol List:
//decode a map which can contain summary data and map entries
def decode(map: com.refinitiv.ema.access.Map): Unit = {
  if (DataTypes.FIELD_LIST == map.summaryData().dataType()) {
    println("Map Summary Data:")
    decode(map.summaryData().fieldList())
  }
  map.forEach(mapEntry => {
    print("Map Entry: action = " + mapEntry.mapActionAsString() + " key = ")
    mapEntry.key().dataType() match {
      case DataTypes.BUFFER => println(EmaUtility.asAsciiString(mapEntry.key().buffer()))
      case DataTypes.ASCII => println(mapEntry.key().ascii())
      case DataTypes.RMTES => println(mapEntry.key().rmtes())
      //other key types are not decoded; without this case an unexpected key type throws a MatchError
      case _ => println()
    }
    if (DataTypes.FIELD_LIST == mapEntry.loadType()) {
      println("Map Entry Data:")
      decode(mapEntry.fieldList())
    }
  })
}
}
Build and Run the Consumer
To build and run the Consumer, the library files, i.e. EMA, ETA, and EMA's dependencies, are required. All of these files are shipped with the Real-Time SDK Java Package, so you only need to add them to the classpath before building and running the application.
The following steps show how to build and run the application using the commands from the Scala binaries on Windows:
1. Set the classpath to the EMA, ETA, and EMA dependency library files. For example:
set CLASSPATH=C:\RTSDK-2.0.1.L1.java.rrg\Java\Ema\Libs\ema-3.6.1.0.jar;C:\RTSDK-2.0.1.L1.java.rrg\Java\Eta\Libs\eta-3.6.1.0.jar;C:\RTSDK-2.0.1.L1.java.rrg\Java\Eta\Libs\etaValueAdd-3.6.1.0.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Eta\Libs\jdacsetalib.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Ema\Libs\apache\commons-collections-3.2.2.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Ema\Libs\apache\commons-configuration-1.10.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Ema\Libs\apache\commons-lang-2.6.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Ema\Libs\apache\commons-logging-1.2.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Eta\Libs\SLF4J\slf4j-1.7.12\slf4j-api-1.7.12.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Eta\Libs\SLF4J\slf4j-1.7.12\slf4j-jdk14-1.7.12.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Eta\Libs\xpp3-1.1.4c.jar
2. Compile the application using the scalac command:
scalac com\refinitiv\ema\example\scala\Consumer.scala
3. Run the application:
- Add the path containing the Consumer class files generated in step 2 to the classpath
set CLASSPATH=.;%CLASSPATH%
- Run the application using the scala command. For example, log in as the user named pimchaya and request data with updates (a streaming request) for the fields TIMACT, BID and ASK (field ids 5, 22 and 25 respectively) of a market price item named JPY= from the service named ELEKTRON, provided by the server at IP 192.168.27.49, port 14002.
scala com.refinitiv.ema.example.scala.Consumer -server 192.168.27.49:14002 -service ELEKTRON -user pimchaya -itemNames JPY= -fieldIds 5,22,25
The Example Result
The application is using the following parameters:
server=192.168.27.49:14002
service=ELEKTRON
user=pimchaya
domainType=MARKET_PRICE
itemNames=JPY=
fieldIds=5,25,22
streamingReq=true
runTime=60
Mar 11, 2021 10:45:36 AM com.refinitiv.ema.access.ChannelCallbackClient reactorChannelEventCallback
INFO: loggerMsg
ClientName: ChannelCallbackClient
Severity: Info
Text: Received ChannelUp event on channel Channel_1
Instance Name Consumer_1_1
Component Version ads3.3.2.L1.linux.tis.rrg 64-bit
loggerMsgEnd
Refresh Message of: JPY=
Domain Type: MARKET_PRICE
Service Name: ELEKTRON
Item State: Open / Ok / None / 'All is well'
Fid: 5 Name: TIMACT DataType: Time Value: 3:18:0:0
Fid: 22 Name: BID DataType: Real Value: 113.55
Fid: 25 Name: ASK DataType: Real Value: 113.58
Update Message of: JPY=
Domain Type: MARKET_PRICE
Service Name: ELEKTRON
Fid: 22 Name: BID DataType: Real Value: 113.56
Fid: 25 Name: ASK DataType: Real Value: 113.57
Fid: 5 Name: TIMACT DataType: Time Value: 3:18:0:0
...
Troubleshooting
Q: The application shows "Error - exceeded initialization timeout (5 s)".
Sep 18, 2018 3:32:05 PM com.refinitiv.ema.access.ChannelCallbackClient reactorChannelEventCallback
WARNING: loggerMsg
ClientName: ChannelCallbackClient
Severity: Warning
Text: Received ChannelDownReconnecting event on channel Channel_1
RsslReactor Channel is null
Error Id 0
Internal sysError 0
Error Location Reactor.processWorkerEvent
Error text Error - exceeded initialization timeout (5 s)
loggerMsgEnd
A: The server IP and/or port number is incorrect, or the server is down or unreachable because of a network or firewall issue. Please contact your infrastructure or network administrator or the Account team to verify.
Q: The application shows "Error initializing channel: errorId=-1 text=Connection refused: no further information".
Sep 20, 2018 3:44:46 PM com.refinitiv.ema.access.ChannelCallbackClient reactorChannelEventCallback
WARNING: loggerMsg
ClientName: ChannelCallbackClient
Severity: Warning
Text: Received ChannelDownReconnecting event on channel Channel_1
RsslReactor Channel is null
Error Id 0
Internal sysError 0
Error Location Reactor.processWorkerEvent
Error text Error initializing channel: errorId=-1 text=Connection refused: no further information
loggerMsgEnd
A: The server process, e.g. ADS or Refinitiv Real-Time Host, is down, or the port number is incorrect. Please contact your infrastructure administrator or the Account team to verify.
Q: The application shows "Error initializing channel: errorId=-1 text=Handshake failed with far end. No more Protocols to try."
Sep 20, 2018 3:40:10 PM com.refinitiv.ema.access.ChannelCallbackClient reactorChannelEventCallback
WARNING: loggerMsg
ClientName: ChannelCallbackClient
Severity: Warning
Text: Received ChannelDownReconnecting event on channel Channel_1
RsslReactor Channel is null
Error Id 0
Internal sysError 0
Error Location Reactor.processWorkerEvent
Error text Error initializing channel: errorId=-1 text=Handshake failed with far end. No more Protocols to try.
loggerMsgEnd
A: The consumer application is trying to connect to the server over an SSL (Source Sink Library) connection, whose default port is 8101. EMA supports only RSSL (Refinitiv Source Sink Layer) connections. Please change the port to 14002, which is the default RSSL port. If your application still cannot connect to the server, contact your infrastructure administrator or the Account team to confirm the correct server and port providing the RSSL connection.
Q: The application shows "<user>, unknown to system."
Sep 20, 2018 3:48:06 PM com.refinitiv.ema.access.LoginCallbackClient rdmLoginMsgCallback
SEVERE: loggerMsg
ClientName: LoginCallbackClient
Severity: Error
Text: RDMLogin stream was closed with status message
username john
usernameType 1
State: Closed/Suspect/Not entitled - text: "john, unknown to system."
loggerMsgEnd
A: The consumer application logs in with a user who does not have permission to access the server. Please contact your infrastructure administrator or the Account team to ask for a valid username for the server.
Q: The application shows "Service not up" error
Status Message of:IBM.N
Domain Type: MARKET_PRICE
Service Name: ELEKTRON
Item State: Open / Suspect / None / 'Service not up'
A: The consumer application requests items from a service that is down. Please contact your infrastructure administrator or the Account team, who can help you bring the service up.
Q: The application shows "Service name of <service_name> is not found." error
Status Message of:IBM.N
Domain Type: MARKET_PRICE
Service Name: ELEKTON
Item State: Closed / Suspect / None / 'Service name of 'ELEKTON' is not found.'
A: The consumer application requests items from a service that the server does not provide, or the service name is wrong. In the example above, the service name is given as ELEKTON. Please contact your infrastructure administrator or the Account team to ask for the correct service name.
Q: The application shows "Capability not supported" error
Status Message of:VOD.L
Domain Type: MARKET_BY_PRICE
Service Name: DIRECT_FEED
Item State: Open / Suspect / None / 'Capability not supported'
A: The consumer application requests an item from the service which does not support the domain type of the item. In the example above, the service named DIRECT_FEED does not support MARKET_BY_PRICE domain type of the item named VOD.L. Please contact your infrastructure administrator or the Account team to ask for the service which supports the requested domain type.
Q: The application shows "The record could not be found" error
Status Message of:IBM.M
Domain Type: MARKET_PRICE
Service Name: ELEKTRON
Item State: Closed / Suspect / Not found / 'The record could not be found'
A: The requested item/RIC is not available in the system. In the example above, the item named IBM.M, whose domain type is Market Price, is not found on the service named ELEKTRON. The RIC name may be incorrect. The user should verify the subscribed RIC name or contact the data support team via Get Support on My Account to find the correct RICs which can provide the required data.
Q: The application shows "Access Denied: <permission info>" error
Status Message of:IBM.N
Domain Type: MARKET_PRICE
Service Name: ELEKTRON
Item State: Closed / Suspect / Not entitled / 'Access Denied: User req to IDN for Exch - NYS'
A: The logged-in user does not have permission to access the data. In the example above, the user cannot access the data of IBM.N because the NYS Exchange permission is missing. Please contact your infrastructure administrator or the Account team, who can help you obtain the permission.
Summary
After finishing this article, you will understand more about the Enterprise Message API. The article shows how easily and quickly you can leverage EMA from the Scala language to consume data according to your business requirements. It also includes solutions for common problems. If you do not have a server from which your application can consume data, please contact the Account team for the process and details.