Use Case Three
Refinitiv Data Platform News Middle Office Compliance Use Cases
Introduction to Use Case
How a user can make a streaming subscription for news:
- Where the messages are tagged with PermIds (1, 2, 3, 4)
- And the messages contain topic codes (x, y or z)
- And how the user can pick up the messages from an SQS queue every 24 hours at a given time (HH:MM:SS) and then delete them from the queue. The script retrieves and removes all received messages on demand, so it can be scheduled to run every 24 hours, or at any required time N, using a standard scheduler.
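In practice the scheduling is left to a standard scheduler such as cron. Purely to illustrate the "every 24 hours at HH:MM:SS" arithmetic, here is a minimal sketch; the function name `seconds_until_next_run` is our own and is not part of the use case code.

```python
from datetime import datetime, time, timedelta


def seconds_until_next_run(run_at: time, now: datetime) -> float:
    """Return the number of seconds from `now` until the next occurrence of `run_at`."""
    candidate = datetime.combine(now.date(), run_at)
    if candidate <= now:
        # Today's slot has already passed (or is right now), so use tomorrow's.
        candidate += timedelta(days=1)
    return (candidate - now).total_seconds()


if __name__ == "__main__":
    # Hypothetical daily run time of 01:30:00.
    delay = seconds_until_next_run(time(1, 30, 0), datetime.now())
    print(f"Next retrieval in {delay:.0f} seconds")
```

A long-running wrapper could sleep for the returned delay and then call the retrieval script, although an external scheduler is usually the simpler choice.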
The Functionality
- Delete
  - The subscription per subscriptionId
  - All subscriptions of a type
- Create
  - Subscription to news headlines
  - Subscription to news stories
  and store the subscriptionId information
- List all active subscriptions
- Retrieve, print out and remove from the queue all queued messages per subscriptionId
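One way to expose these operations from a single script is a small command-line parser. The sketch below is only an illustration: every flag name (`--create`, `--list`, `--delete`, `--delete-all`, `--retrieve`) is hypothetical and is not taken from the script in the repository.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with one mutually exclusive flag per operation (hypothetical flag names)."""
    parser = argparse.ArgumentParser(description="RDP news subscription helper (illustrative)")
    actions = parser.add_mutually_exclusive_group(required=True)
    actions.add_argument("--create", choices=["headlines", "stories"],
                         help="create a subscription of the given type")
    actions.add_argument("--list", action="store_true",
                         help="list all active subscriptions")
    actions.add_argument("--delete", metavar="SUBSCRIPTION_ID",
                         help="delete one subscription by its subscriptionId")
    actions.add_argument("--delete-all", choices=["headlines", "stories"],
                         help="delete all subscriptions of the given type")
    # With no value, fall back to the subscription stored by the last --create.
    actions.add_argument("--retrieve", metavar="SUBSCRIPTION_ID", nargs="?", const="last",
                         help="retrieve, print and remove all queued messages")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(args)
```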
The Reuse
This use case implementation:
- Is built upon the Quickstart Python example
- Uses rdpToken.py to authenticate with RDP
- Uses sqsQueueRetrieveAll.py to work with the AWS SQS queue
Connecting Pieces
- credentials.ini - a file that contains valid RDP credentials, one piece of information per line (username, password and clientId), which enable us to generate the valid authentication token required for RDP interactions
- lastSubscribed.cfg - a file that stores the information of the last subscription that was established, one piece of information per line: endpoint, cryptographyKey, currentSubscriptionID
- token.txt - a file where the newly generated RDP authentication token is stored, to enable its reuse during its valid lifespan.
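To make the idea of token reuse concrete, here is a minimal sketch of caching a token together with its expiry time. The JSON layout (`access_token`, `expires_at`) and both function names are assumptions made for illustration; they do not describe the format that rdpToken.py actually writes to token.txt.

```python
import json
import time
from typing import Optional


def save_token(access_token: str, expires_in: float,
               path: str = "token.txt", now: Optional[float] = None) -> None:
    """Store the token with an absolute expiry timestamp (assumed file layout)."""
    now = time.time() if now is None else now
    with open(path, "w") as f:
        json.dump({"access_token": access_token, "expires_at": now + expires_in}, f)


def load_cached_token(path: str = "token.txt", now: Optional[float] = None,
                      margin: float = 10.0) -> Optional[str]:
    """Return the cached token if it is still valid, otherwise None."""
    now = time.time() if now is None else now
    try:
        with open(path, "r") as f:
            cached = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
    if not isinstance(cached, dict):
        return None
    # Treat the token as expired slightly early to avoid using it at the very edge.
    if cached.get("expires_at", 0) - margin <= now:
        return None
    return cached.get("access_token")
```

A caller would try `load_cached_token()` first and request a new token from RDP only when it returns `None`.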
Key Implementation Details
We omit the description of the RDP and SQS access code, which is covered very well in other materials, and focus only on the key implementation details of this use case:
- Subscription and filtering
- Preserve the last subscription information
- Retrieve the messages from the SQS queue, print and remove them iteratively until the queue is fully exhausted
Subscribe to News
The subscription includes filtering. This is where we define the PermIDs and topic codes that we are interested in.
#==============================================
def subscribeToNews():
#==============================================
    # Choose the headlines or the stories endpoint, depending on the global flag
    if gHeadlines:
        RESOURCE_ENDPOINT = base_URL + category_URL + RDP_version + endpoint_URL_headlines
    else:
        RESOURCE_ENDPOINT = base_URL + category_URL + RDP_version + endpoint_URL_stories

    # Optional filters can be applied to the subscription. Filter by multiple topic codes and multiple PermIDs
    requestData = {
        "transport": {
            "transportType": "AWS-SQS"
        },
        "filter": {
            "type": "operator",
            "operator": "and",
            "operands": [
                {
                    "operator": "or",
                    "operands": [
                        {
                            "type": "newscode",
                            "value": "G:9"
                        },
                        {
                            "type": "newscode",
                            "value": "G:6J"
                        }],
                    "type": "operator"
                },
                {
                    "operator": "or",
                    "operands": [
                        {
                            "value": "P:4295905573",
                            "type": "permid"
                        },
                        {
                            "value": "P:5030853586",
                            "type": "permid"
                        }],
                    "type": "operator"
                }
            ]},
        "filterOverrides": {},
        "maxcount": 10,
        "dateFrom": "null",
        "dateTo": "null",
        "relevanceGroup": "all",
        "payloadVersion": "2.0"
    }

    # get the latest access token
    accessToken = rdpToken.getToken()
    hdrs = {
        "Authorization": "Bearer " + accessToken,
        "Content-Type": "application/json"
    }

    dResp = requests.post(RESOURCE_ENDPOINT, headers = hdrs, data = json.dumps(requestData))
    if dResp.status_code != 200:
        raise ValueError("Unable to subscribe. Code %s, Message: %s" % (dResp.status_code, dResp.text))
    else:
        jResp = json.loads(dResp.text)
        # store the queue details so that the queue can be drained later without retyping them
        preserveLastSubscription(jResp["transportInfo"]["endpoint"],
            jResp["transportInfo"]["cryptographyKey"], jResp["subscriptionID"])
        return jResp["transportInfo"]["endpoint"], jResp["transportInfo"]["cryptographyKey"], jResp["subscriptionID"]
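The filter above is hard-coded. As a minimal sketch of how the same structure could be generated from lists of topic codes and PermIDs, consider the helpers below; `or_group` and `build_news_filter` are hypothetical names introduced here and are not part of the use case code.

```python
import json
from typing import Dict, List


def or_group(kind: str, values: List[str]) -> Dict:
    """Match any of the given values of one kind ("newscode" or "permid")."""
    return {
        "type": "operator",
        "operator": "or",
        "operands": [{"type": kind, "value": value} for value in values],
    }


def build_news_filter(topic_codes: List[str], perm_ids: List[str]) -> Dict:
    """Match messages that carry any of the topic codes AND any of the PermIDs."""
    return {
        "type": "operator",
        "operator": "and",
        "operands": [or_group("newscode", topic_codes), or_group("permid", perm_ids)],
    }


if __name__ == "__main__":
    news_filter = build_news_filter(["G:9", "G:6J"], ["P:4295905573", "P:5030853586"])
    print(json.dumps(news_filter, indent=2))
```

Called with the values used in this article, the helper yields the same "filter" object as the literal in subscribeToNews, so it could be assigned to `requestData["filter"]`.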
Preserve and Retrieve Last Subscription Information
This is a very small but key piece of the implementation. When we are ready to drain the queue of the accumulated filtered messages, it lets us simply read back the information previously stored in the file, without entering complex and easy-to-mistype parameters.
There is also an option to pass specific queue parameters explicitly and to retrieve and remove messages based on those.
#==============================================
def preserveLastSubscription(line1, line2, line3):
#==============================================
    # one piece of information per line: endpoint, cryptographyKey, subscriptionID
    with open(lastSubscriptionFile, "w") as f:
        f.write(line1)
        f.write("\n")
        f.write(line2)
        f.write("\n")
        f.write(line3)
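The retrieval side is the mirror image of the function above. The sketch below assumes the three-line layout written by preserveLastSubscription; the function name `retrieve_last_subscription` is our own and may differ from the one used in the repository.

```python
from typing import Tuple


def retrieve_last_subscription(path: str = "lastSubscribed.cfg") -> Tuple[str, str, str]:
    """Read back endpoint, cryptographyKey and subscriptionID, one per line."""
    with open(path, "r") as f:
        lines = f.read().splitlines()
    if len(lines) < 3:
        raise ValueError(f"{path} should contain 3 lines, found {len(lines)}")
    endpoint, cryptography_key, subscription_id = lines[:3]
    return endpoint, cryptography_key, subscription_id
```

The three returned values are what the queue-draining step needs, so they can be passed on directly instead of being typed in by hand.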
Retrieve and Remove Exhaustively
We connect to the queue that we have defined and, while any messages remain:
- Retrieve them in chunks of ten
- Print them out
- Remove them from the queue
This lays the foundation for a multitude of use cases in which the contents of the queue are processed periodically rather than just printed.
#==============================================
def retrieveAndRemove(accessID, secretKey, sessionToken, endpoint, cryptographyKey, callback=None):
#==============================================
    # create a SQS session
    session = boto3.Session(
        aws_access_key_id = accessID,
        aws_secret_access_key = secretKey,
        aws_session_token = sessionToken,
        region_name = REGION
    )

    # The sample disables SSL and certificate verification;
    # the commented-out default client below keeps both enabled
    # sqs = session.client('sqs')
    sqs = session.client('sqs', use_ssl=False, verify=False)

    response = sqs.get_queue_attributes(
        QueueUrl=endpoint,
        AttributeNames=[
            'All',
        ]
    )

    print('Queue Attributes are:\n')
    print(json.dumps(response, indent=2))

    print('*** Retrieving all messages from queue... ***')
    i = 0
    while True:
        # retrieve up to ten messages per request
        resp = sqs.receive_message(QueueUrl = endpoint, MaxNumberOfMessages=10, WaitTimeSeconds = 0)

        if 'Messages' in resp:
            messages = resp['Messages']
            print(f"&&&Number of messages received: {len(messages)}, iteration {i} &&&")
            # print and remove all the nested messages
            for message in messages:
                mBody = message['Body']
                # decrypt this message
                m = decrypt(cryptographyKey, mBody)
                processPayload(m, callback)
                # *** accumulate and remove all the nested message
                sqs.delete_message(QueueUrl = endpoint, ReceiptHandle = message['ReceiptHandle'])
            i += 1
        else:
            print('No more messages on queue... after ',i, ' iterations')
            break
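To show how the same loop can hand messages to arbitrary processing instead of printing them, here is a minimal sketch of the drain pattern with a handler function. It assumes only the two boto3 SQS client calls used above, and it leaves out decryption. `drain_queue` and the `InMemoryQueue` stand-in are hypothetical names introduced so that the example runs without AWS access.

```python
from typing import Callable, Dict, List


def drain_queue(sqs, queue_url: str, handle: Callable[[str], None], batch_size: int = 10) -> int:
    """Receive, handle and delete messages until a receive call returns none."""
    processed = 0
    while True:
        resp = sqs.receive_message(QueueUrl=queue_url,
                                   MaxNumberOfMessages=batch_size,
                                   WaitTimeSeconds=0)
        messages = resp.get("Messages", [])
        if not messages:
            return processed
        for message in messages:
            handle(message["Body"])
            # Delete only after the handler returns, so a failure leaves the message queued.
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
            processed += 1


class InMemoryQueue:
    """Hypothetical stand-in that mimics the two SQS client calls used by drain_queue."""

    def __init__(self, bodies: List[str]):
        self._messages: Dict[str, str] = {str(i): body for i, body in enumerate(bodies)}

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        batch = list(self._messages.items())[:MaxNumberOfMessages]
        if not batch:
            return {}
        return {"Messages": [{"ReceiptHandle": h, "Body": b} for h, b in batch]}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self._messages.pop(ReceiptHandle, None)


if __name__ == "__main__":
    collected: List[str] = []
    count = drain_queue(InMemoryQueue([f"msg-{n}" for n in range(23)]), "unused-url", collected.append)
    print(f"Handled {count} messages")
```

With a real queue, `sqs` would be the boto3 client created in retrieveAndRemove. Note that with `WaitTimeSeconds=0` (short polling) a single empty response is not a strict guarantee that the queue is empty, so a production version might poll again or use long polling before stopping.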
This concludes our brief discussion of the RDP News middle office compliance use case. If you have questions or would like to learn more, get in touch with us on the Refinitiv Developers Forums.
References
- The complete source code for the use case is available on GitHub:
Refinitiv-API-Samples/Example.RDP.Python.RDPNewsMiddleOfficeComplianceUseCases (github.com)
- Quickstart Python example is available on Refinitiv developers portal:
- Refinitiv developers forums - RDP section: