About this article
Update: October 2024
This article is intended for software developers and engineers who are interested in, or are currently working with, ATS (Real-Time Advanced Transformation Server), i.e. those who contribute records (RICs), fields, and data to ATS or delete fields/RICs from ATS. The article shows how to do this with our strategic APIs: the Real-Time SDK (EMA and ETA) and the WebSocket API. For more details of the Real-Time SDK family, please refer to the Real-Time SDK family page.
The article is structured in two parts:
- Part 1 - ATS and Posting Overview: what they are and how they work together, the prerequisites and an overview of the steps to work with ATS, and finally how to create a new RIC and its fields on ATS.
- Part 2 - Other operations: updating field values and deleting fields or a RIC by using Real-Time APIs, followed by troubleshooting of some common problems you may encounter when working with ATS.
Before working with ATS using Real-Time APIs, you should have a basic understanding of how to log in to the server using Real-Time APIs, which can be learned from the following resources:
- EMA C++ Tutorials
- ETA C Tutorials
- EMA Java Tutorials
- ETA Java Tutorials
- EMA C# Tutorials
- WebSocket Tutorials
- WebSocket Documentation and Examples
ATS and Posting Overview
What's ATS?
The Real-Time Advanced Transformation Server (ATS) is an analytical server that brings real-time data together from multiple sources, including internal contributed data, and transforms, centrally calculates, and publishes the resulting data onto the platform for further distribution and consumption by other applications and users.
Benefits
ATS can also serve as an internal cache for sharing prices across the organization or feeding them into further calculations and logical rules. Prices can be easily sent to ATS with the RtContribute() function from Workspace Excel. ATS also accepts prices in post messages sent from the Real-Time APIs to ATS via Real-Time Distribution System (RTDS). More than 100,000 updates can be shared every second by using ATS as a central cache.
ATS is the component that bridges the space between the Workspace and the Enterprise Platform. Business users can quickly create thousands of models that scale and produce real-time results to be shared across all client sites. ATS also provides notable features such as Real-Time Calculations and Fault Tolerance with the ATS Resiliency System. For more details, please refer to the ATS documents.
Posting to ATS
ATS is a value-added server. It is positioned as a hybrid application that consumes and provides data on the RTDS network. In addition, it collaborates with other components both on the publication and subscription sides.
Through posting, API consumers can easily push content into any cache within the Real-Time infrastructure (conceptually similar to an HTTP POST request). Data contributions/inserts into the ATS or publishing into a cache offer similar capabilities today. When posting, API consumer applications reuse their existing sessions to publish content to any cache(s) residing within the Real-Time infrastructure (i.e., a service provider(s) and/or infrastructure components). When compared to spreadsheets or other applications, posting offers a more efficient form of publishing, because the application does not need to create a separate provider session or manage event streams.
ATS supports OMM MarketPrice data format which increases the overall performance. The two types of posting are on-stream and off-stream:
- On-Stream Post: Before sending an on-stream post, the client must first open (request) a data stream for an item. After opening the data stream, the client application can then send a post. The route of the post is determined by the route of the data stream.
- Off-Stream Post: In an off-stream post, the client application can send a post for an item via a Login stream, regardless of whether a data stream first exists. The route of the post is determined by the Core Infrastructure (i.e., ADS server, ADH server, etc.) configuration.
The posting capability offers optional acknowledgments per posted message to indicate receipt of a specific message. The acknowledgment carries success or failure (i.e., a negative acknowledgment or ‘NAK’) information to the consumer. The consumer application chooses, for each post message, whether it wants an acknowledgment back.
Visible Publisher Identifier (VPI)
The customer can also specify the originating data contributor, called the Visible Publisher Identifier (VPI), in the Post message. You can use VPI data to identify the user ID and user address of users who post, insert or publish to the ATS server (or the Real-Time Distribution System infrastructure cache). The VPI consists of:
- Post User Id (i.e., publisher ID): an ID associated with the user, for example a DACS user ID or, if unavailable, a process ID.
- Post User Address (i.e., publisher address): normally the IP address of the application posting the content. The EMA Java, C# and C++ APIs accept this value as a long value, so the customer must convert the IP address to a long value.
Optionally, such information can be carried along with republished messages so that receiving consumers can identify the posting user. The application can set VPI information via the following RTSDK API methods:
EMA Java, C# and C++ APIs:
- PostMsg.publisherId(long userId, long userAddress)
ETA Java API:
- PostMsg.postUserInfo().userAddr(long userAddr) or PostMsg.postUserInfo().userAddr(java.lang.String userAddrString)
- PostMsg.postUserInfo().userId(long userId)
ETA C API:
- RsslPostMsg.postUserInfo.postUserAddr = RsslUInt32 address
- RsslPostMsg.postUserInfo.postUserId = RsslUInt32 userId
WebSocket API JSON Post Message:
- {"Type": "Post", "PostUserInfo": { "Address" : <string - address> , "UserID": <int - userId> } }
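The EMA APIs expect the Post User Address as a long, so the dotted IP address has to be packed into a single number first. The following is a minimal sketch of that conversion, assuming an IPv4 address; the class name `PostUserAddress` and its methods are hypothetical helpers for illustration and are not part of the Real-Time SDK.

```java
/** Hypothetical helper (not part of the Real-Time SDK): packs a dotted IPv4 address into a long. */
final class PostUserAddress {
    private PostUserAddress() {}

    /** Converts e.g. "10.42.61.200" into the long value passed to publisherId(userId, userAddress). */
    static long toLong(String dottedIpv4) {
        String[] octets = dottedIpv4.trim().split("\\.");
        if (octets.length != 4) {
            throw new IllegalArgumentException("Expected four octets: " + dottedIpv4);
        }
        long packed = 0;
        for (String octet : octets) {
            int value = Integer.parseInt(octet);
            if (value < 0 || value > 255) {
                throw new IllegalArgumentException("Octet out of range: " + octet);
            }
            // Shift the previous octets up by one byte; the first octet ends up most significant.
            packed = (packed << 8) | value;
        }
        return packed;
    }

    /** Reverse conversion, handy when reading the address back from logs. */
    static String toDotted(long packed) {
        return ((packed >> 24) & 0xFF) + "." + ((packed >> 16) & 0xFF) + "."
                + ((packed >> 8) & 0xFF) + "." + (packed & 0xFF);
    }

    public static void main(String[] args) {
        System.out.println(toLong("10.42.61.200")); // prints 170540488
        System.out.println(toDotted(170540488L));   // prints 10.42.61.200
    }
}
```

The value 170540488 is the same one used for the address 10.42.61.200 in the EMA snippets of this article.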
Prerequisites to Posting messages to ATS via the Real-Time Distribution System
1. ATS and the Real-Time Distribution System (Real-Time Advanced Distribution Server and Real-Time Advanced Data Hub server) must be ready to use, and the Real-Time Distribution System must be able to work with ATS properly. The RSSL connection on the Real-Time Advanced Distribution Server/Real-Time Advanced Data Hub server must be enabled for consumer applications. To set up Real-Time Advanced Distribution Server, Real-Time Advanced Data Hub, and ATS, please contact your LSEG Account team (if the infrastructure is owned by LSEG) or your infrastructure administrator team (if the infrastructure is owned by your company).
2. The Real-Time Distribution System must contain the ATS field definitions in the RDMFieldDictionary file as shown below:
!ACRONYM DDE ACRONYM FID RIPPLES TO FIELD TYPE LENGTH RWF TYPE RWF LEN
!------- ----------- --- ---------- ---------- ------ -------- -------
!
X_RIC_NAME "RIC NAME" -1 NULL ALPHANUMERIC 32 RMTES_STRING 32
X_ERRORMSG "X_ERRORMSG" -2 NULL ALPHANUMERIC 80 RMTES_STRING 80
X_LOLIM_FD "X_LOLIM_FD" -3 NULL ALPHANUMERIC 3 RMTES_STRING 3
X_HILIM_FD "X_HILIM_FD" -4 NULL ALPHANUMERIC 3 RMTES_STRING 3
X_LOW_LIM "X_LOW_LIM" -5 NULL ALPHANUMERIC 17 RMTES_STRING 17
X_HIGH_LIM "X_HIGH_LIM" -6 NULL ALPHANUMERIC 17 RMTES_STRING 17
X_ARRAY "X_ARRAY" -7 NULL ALPHANUMERIC 25 RMTES_STRING 25
X_BU "X_BU" -8 NULL ALPHANUMERIC 20 RMTES_STRING 20
X_CONTAINER "X_CONTAINER" -9 NULL ALPHANUMERIC 20 RMTES_STRING 20
X_PE "X_PE" -10 NULL ALPHANUMERIC 20 RMTES_STRING 20
X_MODEL "X_MODEL" -11 NULL ALPHANUMERIC 20 RMTES_STRING 20
X_LINK "X_LINK" -12 NULL ALPHANUMERIC 20 RMTES_STRING 20
X_ARGS "X_ARGS" -13 NULL ALPHANUMERIC 20 RMTES_STRING 20
X_HOLIDAYS "X_HOLIDAYS" -14 NULL ALPHANUMERIC 255 RMTES_STRING 255
X_PPE "X_PPE" -15 NULL ALPHANUMERIC 20 RMTES_STRING 20
To update the Real-Time Distribution System dictionary file, please contact your LSEG Account team (if the infrastructure is owned by LSEG) or your infrastructure administrator team (if the infrastructure is owned by your company).
3. For WebSocket applications, the WebSocket connection license and configuration need to be set on the Real-Time Advanced Distribution Server. Please contact the LSEG team, who can assist you in obtaining the license.
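As a quick sanity check for the dictionary prerequisite above, a local copy of the RDMFieldDictionary file can be scanned for the ATS field IDs -1 to -15. The sketch below is only an illustration: the class `AtsDictionaryCheck` is hypothetical, and it assumes each entry line starts with the acronym, followed by the quoted DDE acronym and the FID, as in the listing above.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reports which ATS FIDs (-1..-15) are absent from dictionary lines. */
final class AtsDictionaryCheck {
    private AtsDictionaryCheck() {}

    // Acronym, quoted DDE acronym, then the (possibly negative) FID.
    private static final Pattern ENTRY =
            Pattern.compile("^(\\S+)\\s+\"[^\"]*\"\\s+(-?\\d+)\\b");

    static Set<Integer> missingAtsFids(List<String> dictionaryLines) {
        Set<Integer> missing = new TreeSet<>();
        for (int fid = -15; fid <= -1; fid++) {
            missing.add(fid);
        }
        for (String line : dictionaryLines) {
            String trimmed = line.trim();
            // Lines starting with '!' are comments or header rows in the dictionary file.
            if (trimmed.isEmpty() || trimmed.startsWith("!")) {
                continue;
            }
            Matcher matcher = ENTRY.matcher(trimmed);
            if (matcher.find()) {
                missing.remove(Integer.parseInt(matcher.group(2)));
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "!ACRONYM DDE ACRONYM FID RIPPLES TO FIELD TYPE LENGTH RWF TYPE RWF LEN",
                "X_RIC_NAME \"RIC NAME\" -1 NULL ALPHANUMERIC 32 RMTES_STRING 32",
                "X_ERRORMSG \"X_ERRORMSG\" -2 NULL ALPHANUMERIC 80 RMTES_STRING 80");
        // Only -1 and -2 are defined in this sample, so the remaining ATS FIDs are reported.
        System.out.println("Missing ATS FIDs: " + missingAtsFids(sample));
    }
}
```

In practice the lines would come from the actual dictionary file, for example via `java.nio.file.Files.readAllLines`; an empty result suggests that all fifteen ATS fields are defined.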
Overview of steps to performing ATS operations
ATS allows you to insert/delete fields or RIC, update data by sending OMM post messages from consumer applications to Real-Time Distribution System which connects to ATS. In this article, we will use off-stream posting which is easier than on-stream posting to perform all operations. The overview steps are shown in the figure below:
Note:
- Before posting the first message in step 3, the application needs to verify that it has logged in to the server successfully. The article shows how to verify this in a later section.
- The application can repeat step 3 to send as many post messages as it requires.
Each Real-Time API package provides a Posting example application that you can use as a starting point for working with ATS:
- Real-Time-SDK Java\EMA package provides ex341_MP_OffStreamPost which demonstrates how to consume data, perform off-stream posting and decode ack/nak messages.
- Real-Time-SDK Java\ETA package provides Consumer which demonstrates how to consume data, perform off-stream or on-stream posting and decode ack/nak messages.
- Real-Time-SDK C++\EMA package provides 341__MP__OffStreamPost which demonstrates how to consume data, perform off-stream posting and decode ack/nak messages.
- Real-Time-SDK C++\ETA package provides rsslConsumer which demonstrates how to consume data, perform off-stream or on-stream posting and decode ack/nak messages.
- Real-Time SDK C#\EMA package provides 341_MP_OffStreamPost which demonstrates how to consume data, perform off-stream posting and decode ack/nak messages.
- WebSocket API Sample Applications package provides market_price_posting.rb, a Ruby example which demonstrates how to consume data, perform on-stream posting and decode ack/nak messages. This article uses it as the basis for demonstrating each operation.
Sample Posting with Adding RIC and Fields
You can use the ATS command ATS_INSERT_S to create a RIC/record by sending a post message from a consumer application, as shown in step 3 in the figure above. Here is a sample post message that adds a RIC named NEW.RIC with field IDs 22 and 25, whose values are 120 and 150 respectively:
<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="1" postUserId="18" postUserAddr="10.42.61.200" dataSize="39">
<key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_S"/>
<dataBody>
<REFRESH domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" groupId="0" State: Open/Ok/None - text: "" dataSize="23">
<dataBody>
<fieldList flags="0x08 (HAS_STANDARD_DATA)">
<fieldEntry fieldId="-1" data="4E45 572E 5249 43"/>
<fieldEntry fieldId="22" data="0F0C"/>
<fieldEntry fieldId="25" data="0F0F"/>
</fieldList>
</dataBody>
</REFRESH>
</dataBody>
</POST>
Notice that:
- The post message's domain type is Market Price. The streamId of 1 means the post message is sent via the login stream, i.e. off-stream posting. The message contains the postId, and the ACK flag is set to request an ack message. It also contains the Visible Publisher Identifier (VPI), which consists of postUserId and postUserAddr.
- The key name of the post message must be ATS_INSERT_S to inform ATS to insert a RIC/record.
- The payload of the post message is a Market Price Refresh message. The payload of the Refresh message is a field list.
- The field list consists of:
- Field ID -1, which carries the name of the RIC/record to insert into ATS.
- The fields to be added to this RIC with their initial values, i.e. field ID 22 (BID) and field ID 25 (ASK).
- Data values shown in the field list are in their encoded OMM form (hexadecimal).
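To relate the encoded values in the trace above to the intended values, the hexadecimal data can be decoded by hand. The sketch below is an illustration only: `TraceFieldDecoder` is a hypothetical class, and it assumes that an encoded real consists of one hint byte followed by a big-endian mantissa, and that hints 0 to 21 denote a power-of-ten exponent of (hint - 14). These assumptions are consistent with the values in this article (0x0F, i.e. 15, meaning 10^1), but blank values and fractional hints are not handled.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper for reading field data printed in the message trace. */
final class TraceFieldDecoder {
    private TraceFieldDecoder() {}

    /** Turns trace text such as "4E45 572E" into raw bytes, ignoring whitespace. */
    static byte[] hexToBytes(String hex) {
        String compact = hex.replaceAll("\\s+", "");
        if (compact.isEmpty() || compact.length() % 2 != 0) {
            throw new IllegalArgumentException("Expected an even number of hex digits: " + hex);
        }
        byte[] bytes = new byte[compact.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(compact.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }

    /** Decodes an ASCII string field such as the RIC name in field ID -1. */
    static String decodeAscii(String hex) {
        return new String(hexToBytes(hex), StandardCharsets.US_ASCII);
    }

    /** Decodes a real under the stated assumptions: hint byte, then big-endian mantissa. */
    static BigDecimal decodeReal(String hex) {
        byte[] raw = hexToBytes(hex);
        int hint = raw[0] & 0xFF;
        if (raw.length < 2 || hint > 21) {
            throw new IllegalArgumentException("Unsupported real encoding in this sketch: " + hex);
        }
        BigInteger mantissa = new BigInteger(Arrays.copyOfRange(raw, 1, raw.length));
        // Exponent hints are assumed to be centred on 14 (10^0), so 15 means 10^1.
        return new BigDecimal(mantissa).scaleByPowerOfTen(hint - 14);
    }

    public static void main(String[] args) {
        System.out.println(decodeAscii("4E45 572E 5249 43"));   // prints NEW.RIC
        System.out.println(decodeReal("0F0C").toPlainString()); // prints 120
        System.out.println(decodeReal("0F0F").toPlainString()); // prints 150
    }
}
```

This matches the API snippets in this article, which pass a mantissa of 12 or 15 together with an exponent of +1 to obtain 120 and 150.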
An example of a successful Ack message:
<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="1" text="[4]: Creation Accepted" dataSize="0">
<key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_S"/>
<dataBody>
</dataBody>
</ACK>
Notice that:
- The Ack message's domain type is Market Price. The ackId is 1, which corresponds to the postId (1) of the post message. Hence, this is the result of the post message above.
- There is no NAK (Negative Acknowledgment) code, so ATS performed the operation requested by the post message successfully. That means ATS inserted the RIC/record with the fields and their initial values.
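An application that sends several posts needs to match each ack to the post it answers and to tell a positive ack from a NAK. The following is a minimal, API-independent sketch of that bookkeeping; the class `PendingPosts` and its methods are hypothetical and only model the rule described above (ackId equals postId, and the absence of a NAK code means success).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical bookkeeping for outstanding posts; not part of any Real-Time API. */
final class PendingPosts {
    // postId -> ATS command name (e.g. ATS_INSERT_S) of posts still awaiting an ack.
    private final Map<Long, String> commandsByPostId = new HashMap<>();

    /** Call after submitting a post that solicits an ack. */
    void sent(long postId, String command) {
        commandsByPostId.put(postId, command);
    }

    /**
     * Call when an ack arrives. An empty nakCode models an ack without a NAK code,
     * i.e. the operation was accepted.
     */
    String onAck(long ackId, OptionalInt nakCode, String text) {
        String command = commandsByPostId.remove(ackId);
        if (command == null) {
            return "Unexpected ack for postId " + ackId;
        }
        if (nakCode.isPresent()) {
            return command + " (postId " + ackId + ") rejected, nakCode="
                    + nakCode.getAsInt() + ", text=" + text;
        }
        return command + " (postId " + ackId + ") accepted, text=" + text;
    }

    int outstanding() {
        return commandsByPostId.size();
    }

    public static void main(String[] args) {
        PendingPosts pending = new PendingPosts();
        pending.sent(1, "ATS_INSERT_S");
        // Values taken from the sample Ack message above: ackId 1, no NAK code.
        System.out.println(pending.onAck(1, OptionalInt.empty(), "[4]: Creation Accepted"));
    }
}
```

In a real consumer, `sent` would be called next to the submit call and `onAck` from the ack callback of the chosen API, with the ack ID, NAK code and text read from the received message.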
The following source code snippets show how each Real-Time API creates the post message above to insert a RIC:
- EMA Java:
//Consumer.java in ex341_MP_OffStreamPost folder
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
{
...
if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN &&
refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&
refreshMsg.state().dataState() == OmmState.DataState.OK )
{
PostMsg postMsg = EmaFactory.createPostMsg();
RefreshMsg nestedRefreshMsg = EmaFactory.createRefreshMsg();
FieldList nestedFieldList = EmaFactory.createFieldList();
//FieldList is a collection in java
nestedFieldList.add(EmaFactory.createFieldEntry().ascii(-1, "NEW.RIC"));
nestedFieldList.add(EmaFactory.createFieldEntry().real(22, 12, OmmReal.MagnitudeType.EXPONENT_POS_1));
nestedFieldList.add(EmaFactory.createFieldEntry().real(25, 15, OmmReal.MagnitudeType.EXPONENT_POS_1));
nestedRefreshMsg.payload(nestedFieldList );
// The Post User address 170540488 (long) is converted from IP address 10.42.61.200
((OmmConsumer)event.closure()).submit( postMsg.postId(1).serviceId(267)
.name( "ATS_INSERT_S" ).solicitAck( true ).complete(true).publisherId(18,170540488)
.payload(nestedRefreshMsg), event.handle() );
}
...
}
- EMA C++:
/* Consumer.cpp in 341__MarketPrice__OffStreamPost */
void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )
{
...
if ( refreshMsg.getDomainType() == MMT_LOGIN &&
refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
refreshMsg.getState().getDataState() == OmmState::OkEnum )
{
// The Post User address 170540488 (long) is converted from IP address 10.42.61.200
_pOmmConsumer->submit( PostMsg().postId( 1 ).serviceId( 267 ).name( "ATS_INSERT_S" ).solicitAck( true)
.publisherId(18,170540488).complete().payload(RefreshMsg().payload( FieldList().addAscii(-1,"NEW.RIC").addReal( 22, 12, OmmReal::ExponentPos1Enum).addReal( 25, 15, OmmReal::ExponentPos1Enum ).complete() ).complete() ), ommEvent.getHandle() );
}
...
}
- EMA C#:
// Consumer.cs in 341_MP_OffStreamPost folder
public void OnRefreshMsg(RefreshMsg refreshMsg, IOmmConsumerEvent consumerEvent)
{
...
if (refreshMsg.DomainType() == EmaRdm.MMT_LOGIN &&
refreshMsg.State().StreamState == OmmState.StreamStates.OPEN &&
refreshMsg.State().DataState == OmmState.DataStates.OK)
{
PostMsg postMsg = new();
RefreshMsg nestedRefreshMsg = new();
FieldList nestedFieldList = new();
//FieldList is a collection
nestedFieldList.AddAscii(-1, "NEW.RIC");
nestedFieldList.AddReal(22, 12, OmmReal.MagnitudeTypes.EXPONENT_POS_1);
nestedFieldList.AddReal(25, 15, OmmReal.MagnitudeTypes.EXPONENT_POS_1);
nestedFieldList.Complete();
nestedRefreshMsg.Payload(nestedFieldList).Complete(true);
// The Post User address 170540488 (long) is converted from IP address 10.42.61.200
((OmmConsumer)consumerEvent!.Closure!).Submit(postMsg.PostId(1).ServiceId(267)
.Name("ATS_INSERT_S").SolicitAck(true).Complete(true).PublisherId(18,170540488)
.Payload(nestedRefreshMsg), consumerEvent.Handle);
}
...
}
- ETA Java:
//PostHandler.java in Consumer folder
//To declare variables used to create a post message for inserting a RIC on ATS
private RefreshMsg refreshMsg = (RefreshMsg)CodecFactory.createMsg();
protected FieldList fieldList = CodecFactory.createFieldList();
protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();
protected UInt tempUInt = CodecFactory.createUInt();
protected Real tempReal = CodecFactory.createReal();
...
// To create a post message for inserting a RIC on ATS
//Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.
private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)
{
// First encode message for payload
postMsg.clear();
// set-up message
postMsg.msgClass(MsgClasses.POST);
postMsg.streamId(1);
postMsg.domainType(DomainTypes.MARKET_PRICE);
postMsg.containerType(DataTypes.MSG);
// Note: post message key not required for on-stream post
postMsg.applyPostComplete();
postMsg.applyAck();
postMsg.applyHasPostId();
postMsg.postId(1);
postMsg.applyHasMsgKey();
postMsg.msgKey().applyHasServiceId();
postMsg.msgKey().serviceId(267);
postMsg.msgKey().applyHasName();
postMsg.msgKey().name().data("ATS_INSERT_S");
// populate post user info
postMsg.postUserInfo().userAddr("10.42.61.200");
postMsg.postUserInfo().userId(18);
// encode post message
encIter.clear();
int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
if (ret != CodecReturnCodes.SUCCESS)
{
System.out.println("Encoder.setBufferAndRWFVersion() failed: <" + CodecReturnCodes.toString(ret) + ">");
return ret;
}
ret = postMsg.encodeInit(encIter, 0);
if (ret != CodecReturnCodes.ENCODE_CONTAINER)
{
System.out.println("EncodeMsgInit() failed: <" + CodecReturnCodes.toString(ret) + ">");
return ret;
}
// get a buffer for nested market price refresh
postNestedMsgPayLoad = CodecFactory.createBuffer();
postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));
// Although we are encoding RWF message, this code
// encodes nested message into a separate buffer.
// this is because MarketPrice.encode message is shared by all
// applications, and it expects to encode the message into a stand alone
// buffer.
ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);
if (ret != CodecReturnCodes.SUCCESS)
{
System.out.println("EncodeNonRWFDataTypeInit() failed: <" + CodecReturnCodes.toString(ret));
return CodecReturnCodes.FAILURE;
}
postMsgEncIter.clear();
ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
if (ret != CodecReturnCodes.SUCCESS)
{
System.out.println("EncodeIter.setBufferAndRWFVersion() failed: <" + CodecReturnCodes.toString(ret));
return CodecReturnCodes.FAILURE;
}
ret = encodeCreateRIC(postMsgEncIter);
if (ret != CodecReturnCodes.SUCCESS) {
System.out.println("ATS_INSERT_S failed: <" + CodecReturnCodes.toString(ret));
return CodecReturnCodes.FAILURE;
}
shouldOffstreamPost = false; //set flag to send post message once
ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);
if (ret != CodecReturnCodes.SUCCESS)
{
System.out.println("EncodeNonRWFDataTypeComplete() failed: <" + CodecReturnCodes.toString(ret));
return CodecReturnCodes.FAILURE;
}
// complete encode message
if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)
{
System.out.println("EncodeMsgComplete() failed with return code: " + ret);
return ret;
}
System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + " streamId = " + postMsg.streamId() + "\n postId = " + postMsg.postId());
return CodecReturnCodes.SUCCESS;
}
// To create a payload of the post message
private int encodeCreateRIC(EncodeIterator encodeIter) {
fieldList.clear();
fieldEntry.clear();
tempUInt.clear();
tempReal.clear();
// set-up message
Msg msg = encodeRefreshMsg();
// encode message
int ret = msg.encodeInit(encodeIter, 0);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
// encode field list
fieldList.applyHasStandardData();
ret = fieldList.encodeInit(encodeIter, null, 0);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
// X_RIC_NAME
fieldEntry.clear();
fieldEntry.fieldId(-1);
fieldEntry.dataType(DataTypes.ASCII_STRING);
Buffer aRIC = CodecFactory.createBuffer();
aRIC.data("NEW.RIC");
ret = fieldEntry.encode(encodeIter, aRIC);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
// BID
fieldEntry.clear();
DictionaryEntry dictionaryEntry = dictionary.entry(22);
if (dictionaryEntry != null)
{
fieldEntry.fieldId(22);
fieldEntry.dataType(dictionaryEntry.rwfType());
tempReal.clear();
tempReal.value(12, RealHints.EXPONENT1);
ret = fieldEntry.encode(encodeIter, tempReal);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
}
// ASK
fieldEntry.clear();
dictionaryEntry = dictionary.entry(25);
if (dictionaryEntry != null)
{
fieldEntry.fieldId(25);
fieldEntry.dataType(dictionaryEntry.rwfType());
tempReal.clear();
tempReal.value(15, RealHints.EXPONENT1);
ret = fieldEntry.encode(encodeIter, tempReal);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
}
// complete encode field list
ret = fieldList.encodeComplete(encodeIter, true);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
// complete encode message
return msg.encodeComplete(encodeIter, true);
}
// To create a refresh message which is in the payload of the post message
public Msg encodeRefreshMsg()
{
refreshMsg.clear();
refreshMsg.msgClass(MsgClasses.REFRESH);
refreshMsg.streamId(0);
refreshMsg.domainType(DomainTypes.MARKET_PRICE);
refreshMsg.containerType(DataTypes.FIELD_LIST);
refreshMsg.state().streamState(StreamStates.OPEN);
refreshMsg.state().dataState(DataStates.OK);
refreshMsg.state().code(StateCodes.NONE);
refreshMsg.state().text().data("");
return refreshMsg;
}
- ETA C:
/* rsslPostHandler.c in Consumer folder */
static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)
{
RsslRet ret = 0;
RsslPostMsg postMsg = RSSL_INIT_POST_MSG;
RsslDataDictionary* dictionary = getDictionary();
RsslBool isSolicited = RSSL_FALSE; // for post
RsslUInt16 serviceId = (RsslUInt16)getServiceId();
RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;
RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;
RsslBuffer hostName = RSSL_INIT_BUFFER;
char hostNameBuf[] = "10.42.61.200";
/* set-up message */
postMsg.msgBase.msgClass = RSSL_MC_POST;
postMsg.msgBase.streamId = 1;
postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;
postMsg.msgBase.containerType = RSSL_DT_MSG;
// Note: post message key not required for on-stream post
postMsg.flags = RSSL_PSMF_POST_COMPLETE
| RSSL_PSMF_ACK // request ACK
| RSSL_PSMF_HAS_POST_ID
| RSSL_PSMF_HAS_MSG_KEY;
postMsg.postId = 1;
/* populate post user info */
hostName.data = hostNameBuf;
hostName.length = (RsslUInt32)strlen(hostName.data);
if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)
{
printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",
rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));
return ret;
}
postMsg.postUserInfo.postUserId = 18;
postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;
postMsg.msgBase.msgKey.name.data = "ATS_INSERT_S";
postMsg.msgBase.msgKey.name.length = strlen("ATS_INSERT_S");
postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;
// encode message
if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeIteratorBuffer() failed with return code: %d\n", ret);
return ret;
}
rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);
if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
return ret;
}
ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);
if (ret != RSSL_RET_SUCCESS)
{
printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");
return RSSL_RET_FAILURE;
}
ret = encodeCreateRIC(chnl, &payloadMsgBuf, dictionary);
if (ret != RSSL_RET_SUCCESS)
{
printf("encodePostWithMsg: encodeCreateRIC() failed\n");
return RSSL_RET_FAILURE;
}
shouldOffstreamPost = RSSL_FALSE; //set flag to send a post message once
ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);
if (ret != RSSL_RET_SUCCESS)
{
printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeComplete() failed\n");
return RSSL_RET_FAILURE;
}
/* complete encode message */
if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
return ret;
}
msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);
printf("\n\nSENDING POST WITH MESSAGE:\n");
printf(" streamId = %d\n", postMsg.msgBase.streamId);
printf(" postId = %d\n", postMsg.postId);
return RSSL_RET_SUCCESS;
}
RsslRet encodeCreateRIC(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)
{
RsslRet ret = 0;
RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;
RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;
RsslMsgBase* msgBase;
RsslMsg* msg;
RsslFieldList fList = RSSL_INIT_FIELD_LIST;
RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;
char stateText[MAX_ITEM_INFO_STRLEN];
char errTxt[256];
RsslBuffer errorText = { 255, (char*)errTxt };
RsslBuffer tempBuffer;
RsslReal tempReal = RSSL_INIT_REAL;
RsslDictionaryEntry* dictionaryEntry = NULL;
RsslEncodeIterator encodeIter;
double price;
/* clear encode iterator */
rsslClearEncodeIterator(&encodeIter);
/* set-up message */
/* set message depending on whether refresh or update */
msgBase = &refreshMsg.msgBase;
msgBase->msgClass = RSSL_MC_REFRESH;
refreshMsg.state.streamState = RSSL_STREAM_OPEN;
refreshMsg.state.dataState = RSSL_DATA_OK;
refreshMsg.state.code = RSSL_SC_NONE;
snprintf(stateText, sizeof(stateText), "Item Refresh Completed");
refreshMsg.state.text.data = stateText;
refreshMsg.state.text.length = (RsslUInt32)strlen(stateText);
msg = (RsslMsg *)&refreshMsg;
msgBase->domainType = RSSL_DMT_MARKET_PRICE;
msgBase->containerType = RSSL_DT_FIELD_LIST;
/* encode message */
if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
{
printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
return ret;
}
rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);
if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
return ret;
}
/* encode field list */
fList.flags = RSSL_FLF_HAS_STANDARD_DATA;
if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);
return ret;
}
/* encode fields */
/* X_RIC_NAME */
rsslClearFieldEntry(&fEntry);
fEntry.fieldId = -1;
fEntry.dataType = RSSL_DT_ASCII_STRING;
tempBuffer.data = "NEW.RIC";
tempBuffer.length = strlen("NEW.RIC");
if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempBuffer)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
return ret;
}
rsslClearFieldEntry(&fEntry);
dictionaryEntry = dictionary->entriesArray[22];
if (dictionaryEntry)
{
fEntry.fieldId = 22;
fEntry.dataType = dictionaryEntry->rwfType;
rsslClearReal(&tempReal);
price = 120; /* BID = 120; with hint RSSL_RH_EXPONENT1 (10^1) the stored value is 12, matching data="0F0C" */
rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT1);
if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
return ret;
}
}
rsslClearFieldEntry(&fEntry);
dictionaryEntry = dictionary->entriesArray[25];
if (dictionaryEntry)
{
fEntry.fieldId = 25;
fEntry.dataType = dictionaryEntry->rwfType;
rsslClearReal(&tempReal);
price = 150; /* ASK = 150; with hint RSSL_RH_EXPONENT1 (10^1) the stored value is 15, matching data="0F0F" */
rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT1);
if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
return ret;
}
}
/* complete encode field list */
if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);
return ret;
}
/* complete encode message */
if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
return ret;
}
msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);
return RSSL_RET_SUCCESS;
}
- WebSocket API in Ruby:
# market_price_posting.rb in ruby folder
if message_type == 'Refresh' then
message_domain = message_json['Domain']
if message_domain != nil then
if message_domain == 'Login' then
mp_post_json_hash = {
'ID' => 1,
'Type' => 'Post',
'Domain' => 'MarketPrice',
'Ack' => true,
'PostID' => 1,
'PostUserInfo' => {
'Address' => '10.42.61.200',
'UserID' => 18
},
'Key' => {
'Name' => 'ATS_INSERT_S',
'Service' => 267
},
'Message' => {
'ID' => 0,
'Type' => 'Refresh',
'Domain' => 'MarketPrice',
'Fields' => {'X_RIC_NAME' => 'NEW.RIC' ,'BID' => 120, 'ASK' => 150}
}
}
ws.send mp_post_json_hash.to_json.to_s
end
end
end
Adding Fields
You can use the ATS command ATS_ADDFIELD_S to add fields by sending a post message from a consumer application, as shown in step 3 in the figure above. Here is a sample post message that adds field IDs 12 and 13, with values of 220 and 30 respectively, to a RIC named NEW.RIC:
<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="2" postUserId="18" postUserAddr="10.42.61.200" dataSize="34">
<key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_ADDFIELD_S"/>
<dataBody>
<UPDATE domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" updateType="0" dataSize="23">
<dataBody>
<fieldList flags="0x08 (HAS_STANDARD_DATA)">
<fieldEntry fieldId="-1" data="4E45 572E 5249 43"/>
<fieldEntry fieldId="12" data="0F16"/>
<fieldEntry fieldId="13" data="0F03"/>
</fieldList>
</dataBody>
</UPDATE>
</dataBody>
</POST>
Notice that:
- The post message's domain type is Market Price. The streamId of 1 means the post message is sent via the login stream, i.e. off-stream posting. The message contains the postId, and the ACK flag is set to request an ack message. It also contains the Visible Publisher Identifier (VPI), which consists of postUserId and postUserAddr.
- The key name of the post message must be ATS_ADDFIELD_S to inform ATS to add the fields.
- The payload of the post message is a Market Price Update message. The payload of the Update message is a field list.
- The field list consists of:
- Field ID -1, which carries the name of the RIC/record to which the fields are added.
- The fields to be added to this RIC with their initial values, i.e. field ID 12 (HIGH_1) and field ID 13 (LOW_1).
- Data values shown in the field list are in their encoded OMM form (hexadecimal).
An example of a successful Ack message:
<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="2" text="[4]: Creation Accepted" dataSize="0">
<key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_ADDFIELD_S"/>
<dataBody>
</dataBody>
</ACK>
Notice that:
- The Ack message's domain type is Market Price. The ackId is 2, which corresponds to the postId (2) of the post message. Hence, this is the result of the post message above.
- There is no NAK (Negative Acknowledgment) code, so ATS performed the operation requested by the post message successfully. That means ATS added the fields and their initial values to the RIC.
The following source code snippets show how each Real-Time API creates the post message above to add the fields:
- EMA Java:
//Consumer.java in ex341_MP_OffStreamPost folder
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
{
...
if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN &&
refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&
refreshMsg.state().dataState() == OmmState.DataState.OK )
{
PostMsg postMsg = EmaFactory.createPostMsg();
UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg();
FieldList nestedFieldList = EmaFactory.createFieldList();
//FieldList is a collection in java
nestedFieldList.add(EmaFactory.createFieldEntry().ascii(-1, "NEW.RIC"));
nestedFieldList.add(EmaFactory.createFieldEntry().real(12, 22, OmmReal.MagnitudeType.EXPONENT_POS_1));
nestedFieldList.add(EmaFactory.createFieldEntry().real(13, 3, OmmReal.MagnitudeType.EXPONENT_POS_1));
nestedUpdateMsg.payload(nestedFieldList );
// The Post User address 170540488 (long) is converted from IP address 10.42.61.200
((OmmConsumer)event.closure()).submit( postMsg.postId( 2 ).serviceId(267)
.name( "ATS_ADDFIELD_S" ).solicitAck( true ).complete(true).publisherId(18,170540488)
.payload(nestedUpdateMsg), event.handle() );
}
...
}
- EMA C++:
/* Consumer.cpp in 341__MarketPrice__OffStreamPost */
void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )
{
...
if ( refreshMsg.getDomainType() == MMT_LOGIN &&
refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
refreshMsg.getState().getDataState() == OmmState::OkEnum )
{
// The Post User address 170540488 (long) is converted from IP address 10.42.61.200
_pOmmConsumer->submit(PostMsg().postId(2).serviceId( 267 ).name("ATS_ADDFIELD_S").solicitAck(true).publisherId(18,170540488)
.complete().payload(UpdateMsg().payload(FieldList().addAscii(-1, "NEW.RIC").addReal(12, 22, OmmReal::ExponentPos1Enum).addReal(13, 3, OmmReal::ExponentPos1Enum).complete())), ommEvent.getHandle());
}
...
}
- EMA C#:
// Consumer.cs in 341_MP_OffStreamPost folder
public void OnRefreshMsg(RefreshMsg refreshMsg, IOmmConsumerEvent consumerEvent)
{
...
if (refreshMsg.DomainType() == EmaRdm.MMT_LOGIN &&
refreshMsg.State().StreamState == OmmState.StreamStates.OPEN &&
refreshMsg.State().DataState == OmmState.DataStates.OK)
{
PostMsg postMsg = new();
UpdateMsg nestedUpdateMsg = new();
FieldList nestedFieldList = new();
//FieldList is a collection
nestedFieldList.AddAscii(-1, "NEW.RIC");
nestedFieldList.AddReal(12, 22, OmmReal.MagnitudeTypes.EXPONENT_POS_1);
nestedFieldList.AddReal(13, 3, OmmReal.MagnitudeTypes.EXPONENT_POS_1);
nestedFieldList.Complete();
nestedUpdateMsg.Payload(nestedFieldList);
// The Post User address 170540488 (long) is converted from IP address 10.42.61.200
((OmmConsumer)consumerEvent!.Closure!).Submit(postMsg.PostId(2).ServiceId(267)
.Name("ATS_ADDFIELD_S").SolicitAck(true).Complete(true).PublisherId(18,170540488)
.Payload(nestedUpdateMsg), consumerEvent.Handle);
}
...
}
- ETA Java:
//PostHandler.java in Consumer folder
//To declare variables used to create a post message for adding some fields of a RIC on ATS
private UpdateMsg updateMsg = (UpdateMsg)CodecFactory.createMsg();
protected FieldList fieldList = CodecFactory.createFieldList();
protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();
protected UInt tempUInt = CodecFactory.createUInt();
...
//To create a postMessage for adding fields on ATS
//Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.
private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)
{
// First encode message for payload
postMsg.clear();
// set-up message
postMsg.msgClass(MsgClasses.POST);
postMsg.streamId(1);
postMsg.domainType(DomainTypes.MARKET_PRICE);
postMsg.containerType(DataTypes.MSG);
// Note: post message key not required for on-stream post
postMsg.applyPostComplete();
postMsg.applyAck();
postMsg.applyHasPostId();
postMsg.postId(2);
postMsg.applyHasMsgKey();
postMsg.msgKey().applyHasServiceId();
postMsg.msgKey().serviceId(267);
postMsg.msgKey().applyHasName();
postMsg.msgKey().name().data("ATS_ADDFIELD_S");
// populate post user info
postMsg.postUserInfo().userAddr("10.42.61.200");
postMsg.postUserInfo().userId(18);
// encode post message
encIter.clear();
int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
if (ret != CodecReturnCodes.SUCCESS)
{
System.out.println("Encoder.setBufferAndRWFVersion() failed: <" + CodecReturnCodes.toString(ret) + ">");
return ret;
}
ret = postMsg.encodeInit(encIter, 0);
if (ret != CodecReturnCodes.ENCODE_CONTAINER)
{
System.out.println("EncodeMsgInit() failed: <" + CodecReturnCodes.toString(ret) + ">");
return ret;
}
// get a buffer for nested market price update
postNestedMsgPayLoad = CodecFactory.createBuffer();
postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));
// Although we are encoding RWF message, this code
// encodes nested message into a separate buffer.
// this is because MarketPrice.encode message is shared by all
// applications, and it expects to encode the message into a stand alone
// buffer.
ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);
if (ret != CodecReturnCodes.SUCCESS)
{
System.out.println("EncodeNonRWFDataTypeInit() failed: <" + CodecReturnCodes.toString(ret) + ">");
return CodecReturnCodes.FAILURE;
}
postMsgEncIter.clear();
ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());
if (ret != CodecReturnCodes.SUCCESS)
{
System.out.println("EncodeIter.setBufferAndRWFVersion() failed: <" + CodecReturnCodes.toString(ret) + ">");
return CodecReturnCodes.FAILURE;
}
ret = encodeAddFields(postMsgEncIter);
if (ret != CodecReturnCodes.SUCCESS) {
System.out.println("ATS_ADDFIELD_S failed: <" + CodecReturnCodes.toString(ret) + ">");
return CodecReturnCodes.FAILURE;
}
shouldOffstreamPost = false; //set flag to send post message once
ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);
if (ret != CodecReturnCodes.SUCCESS)
{
System.out.println("EncodeNonRWFDataTypeComplete() failed: <" + CodecReturnCodes.toString(ret));
return CodecReturnCodes.FAILURE;
}
// complete encode message
if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)
{
System.out.println("EncodeMsgComplete() failed with return code: " + ret);
return ret;
}
System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + " streamId = " + postMsg.streamId() + "\n postId = " + postMsg.postId());
return CodecReturnCodes.SUCCESS;
}
// To create a payload of the post message
private int encodeAddFields(EncodeIterator encodeIter) {
fieldList.clear();
fieldEntry.clear();
tempUInt.clear();
tempReal.clear();
// set-up message
Msg msg = encodeUpdateMsg();
// encode message
int ret = msg.encodeInit(encodeIter, 0);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
// encode field list
fieldList.applyHasStandardData();
ret = fieldList.encodeInit(encodeIter, null, 0);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
// X_RIC_NAME
fieldEntry.clear();
fieldEntry.fieldId(-1);
fieldEntry.dataType(DataTypes.ASCII_STRING);
Buffer aRIC = CodecFactory.createBuffer();
aRIC.data("NEW.RIC");
ret = fieldEntry.encode(encodeIter, aRIC);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
fieldEntry.clear();
DictionaryEntry dictionaryEntry = dictionary.entry(12);
if (dictionaryEntry != null)
{
fieldEntry.fieldId(12);
fieldEntry.dataType(dictionaryEntry.rwfType());
tempReal.clear();
tempReal.value(22, RealHints.EXPONENT1);
ret = fieldEntry.encode(encodeIter, tempReal);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
}
fieldEntry.clear();
dictionaryEntry = dictionary.entry(13);
if (dictionaryEntry != null)
{
fieldEntry.fieldId(13);
fieldEntry.dataType(dictionaryEntry.rwfType());
tempReal.clear();
tempReal.value(3, RealHints.EXPONENT1);
ret = fieldEntry.encode(encodeIter, tempReal);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
}
// complete encode field list
ret = fieldList.encodeComplete(encodeIter, true);
if (ret < CodecReturnCodes.SUCCESS)
{
return ret;
}
// complete encode message
return msg.encodeComplete(encodeIter, true);
}
// To create an update message which is in the payload of the post message
public Msg encodeUpdateMsg()
{
updateMsg.clear();
updateMsg.msgClass(MsgClasses.UPDATE);
updateMsg.streamId(0);
updateMsg.domainType(DomainTypes.MARKET_PRICE);
updateMsg.containerType(DataTypes.FIELD_LIST);
return updateMsg;
}
- ETA C:
/* rsslPostHandler.c in Consumer folder */
static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)
{
RsslRet ret = 0;
RsslPostMsg postMsg = RSSL_INIT_POST_MSG;
RsslDataDictionary* dictionary = getDictionary();
RsslBool isSolicited = RSSL_FALSE; // ??? for post
RsslUInt16 serviceId = (RsslUInt16)getServiceId();
RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;
RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;
RsslBuffer hostName = RSSL_INIT_BUFFER;
char hostNameBuf[] = "10.42.61.200";
/* set-up message */
postMsg.msgBase.msgClass = RSSL_MC_POST;
postMsg.msgBase.streamId = 1;
postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;
postMsg.msgBase.containerType = RSSL_DT_MSG;
// Note: post message key not required for on-stream post
postMsg.flags = RSSL_PSMF_POST_COMPLETE
| RSSL_PSMF_ACK // request ACK
| RSSL_PSMF_HAS_POST_ID
| RSSL_PSMF_HAS_MSG_KEY;
postMsg.postId = 2;
/* populate post user info */
hostName.data = hostNameBuf;
hostName.length = (RsslUInt32)strlen(hostName.data);
if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)
{
printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",
rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));
return ret;
}
postMsg.postUserInfo.postUserId = 18;
postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;
postMsg.msgBase.msgKey.name.data = (char *)"ATS_ADDFIELD_S";
postMsg.msgBase.msgKey.name.length = (RsslUInt32)strlen("ATS_ADDFIELD_S");
postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;
// encode message
if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
{
printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
return ret;
}
rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);
if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
return ret;
}
ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);
if (ret != RSSL_RET_SUCCESS)
{
printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");
return RSSL_RET_FAILURE;
}
ret = encodeAddFields(chnl, &payloadMsgBuf, dictionary);
if (ret != RSSL_RET_SUCCESS)
{
printf("encodePostWithMsg: encodeAddFields() failed\n");
return RSSL_RET_FAILURE;
}
shouldOffstreamPost = FALSE; //set flag to send a post message once
ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);
if (ret != RSSL_RET_SUCCESS)
{
printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeComplete() failed\n");
return RSSL_RET_FAILURE;
}
/* complete encode message */
if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
return ret;
}
msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);
printf("\n\nSENDING POST WITH MESSAGE:\n");
printf(" streamId = %d\n", postMsg.msgBase.streamId);
printf(" postId = %d\n", postMsg.postId);
return RSSL_RET_SUCCESS;
}
RsslRet encodeAddFields(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)
{
RsslRet ret = 0;
RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;
RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;
RsslMsgBase* msgBase;
RsslMsg* msg;
RsslFieldList fList = RSSL_INIT_FIELD_LIST;
RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;
char errTxt[256];
RsslBuffer errorText = { 255, (char*)errTxt };
RsslBuffer tempBuffer;
RsslReal tempReal = RSSL_INIT_REAL;
RsslDictionaryEntry* dictionaryEntry = NULL;
RsslEncodeIterator encodeIter;
double price;
/* clear encode iterator */
rsslClearEncodeIterator(&encodeIter);
/* set-up message */
/* set message depending on whether refresh or update */
msgBase = &updateMsg.msgBase;
msgBase->msgClass = RSSL_MC_UPDATE;
msg = (RsslMsg *)&updateMsg;
msgBase->domainType = RSSL_DMT_MARKET_PRICE;
msgBase->containerType = RSSL_DT_FIELD_LIST;
/* encode message */
if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)
{
printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
return ret;
}
rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);
if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
return ret;
}
/* encode field list */
fList.flags = RSSL_FLF_HAS_STANDARD_DATA;
if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);
return ret;
}
/* encode fields */
/* X_RIC_NAME */
rsslClearFieldEntry(&fEntry);
fEntry.fieldId = -1;
fEntry.dataType = RSSL_DT_ASCII_STRING;
tempBuffer.data = (char *)"NEW.RIC";
tempBuffer.length = (RsslUInt32)strlen("NEW.RIC");
if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempBuffer)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
return ret;
}
rsslClearFieldEntry(&fEntry);
dictionaryEntry = dictionary->entriesArray[12];
if (dictionaryEntry)
{
fEntry.fieldId = 12;
fEntry.dataType = dictionaryEntry->rwfType;
rsslClearReal(&tempReal);
price = 22;
rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);
if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
return ret;
}
}
rsslClearFieldEntry(&fEntry);
dictionaryEntry = dictionary->entriesArray[13];
if (dictionaryEntry)
{
fEntry.fieldId = 13;
fEntry.dataType = dictionaryEntry->rwfType;
rsslClearReal(&tempReal);
price = 3;
rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);
if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
return ret;
}
}
/* complete encode field list */
if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);
return ret;
}
/* complete encode message */
if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)
{
printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);
return ret;
}
msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);
return RSSL_RET_SUCCESS;
}
- WebSocket API in Ruby:
# market_price_posting.rb in ruby folder
if message_type == 'Refresh' then
message_domain = message_json['Domain']
if message_domain != nil then
if message_domain == 'Login' then
mp_post_json_hash = {
'ID' => 1,
'Type' => 'Post',
'Domain' => 'MarketPrice',
'Ack' => true,
'PostID' => 2,
'PostUserInfo' => {
'Address' => '10.42.61.200',
'UserID' => 18
},
'Key' => {
'Name' => 'ATS_ADDFIELD_S',
'Service' => 267
},
'Message' => {
'ID' => 0,
'Type' => 'Update',
'Domain' => 'MarketPrice',
'Fields' => {'X_RIC_NAME' => 'NEW.RIC' ,'HIGH_1' => 220,'LOW_1' => 30 }
}
}
ws.send mp_post_json_hash.to_json.to_s
end
end
end
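The EMA snippets above pass the post user address as the number 170540488, which their comments say is converted from the IP address 10.42.61.200. One way to do that conversion is to pack the four octets into a single integer, most significant octet first. A minimal Ruby sketch follows; `ipv4_to_long` is a hypothetical helper name, not an SDK function.

```ruby
# Hypothetical helper: packs a dotted-quad IPv4 string into one integer,
# most significant octet first.
def ipv4_to_long(address)
  octets = address.split('.').map { |part| Integer(part, 10) }
  unless octets.length == 4 && octets.all? { |o| o.between?(0, 255) }
    raise ArgumentError, "not an IPv4 address: #{address}"
  end

  # Shift the running value left by one byte, then add the next octet.
  octets.inject(0) { |acc, octet| (acc << 8) | octet }
end

puts ipv4_to_long('10.42.61.200') # prints 170540488
```

Ruby's standard library gives the same result with `IPAddr.new('10.42.61.200').to_i` after `require 'ipaddr'`.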
Please refer to my next article, Implementing Real-Time API application to work with ATS - Part 2, for the Real-Time API application source code to update data and to delete fields and RICs. It also covers troubleshooting, with solutions for common problems that may occur when working with ATS.
References
For further details, please check out the following resources:
- EMA Java Reference Manual
- ETA Java Reference Manual
- EMA C++ Reference Manual
- ETA C Reference Manual
- EMA C# Reference Manual
- WebSocket API Reference Manual
- ATS documents
For any questions related to this article or the RTSDK page, please use the Developer Community Q&A Forum.