Introduction to Client File Store (CFS) - Python
Introduction
Client File Store (CFS) houses content files from publisher repositories and makes them available for download. At the time of this writing, the stores available in early release are:
- Bulk ESG data: scores and measures in weekly bulk files
- The Refinitiv Trade Discovery product supports banks adopting their own market risk models to comply with the Risk Factors Eligibility Test (RFET) under the Fundamental Review of the Trading Book market risk regulation. It delivers trade observations across multiple asset classes and instruments, both ETI and OTC, that banks can use to assess the level of liquidity associated with their risk factors in order to pass the RFET
- Tick History - Venue By Day
The approach to selecting and downloading the required content is largely common across the stores, although some filtering options are specific to a particular content set. In this article we discuss the common tasks and use cases, enabling developers to make this content part of their custom applications.
The Client File Store service is part of the Refinitiv Data Platform (RDP) family of services. See the References section for detailed information on RDP.
Permissions are required to access an RDP service such as CFS, and additional permissions are required to access a specific content set within CFS. Permissioning is beyond the scope of this article and is best discussed with one's Refinitiv account team.
The Article at a Glance
- Define Token Handling and Obtain a Valid Token
- Request File Sets Available in the Bucket
- Paginate Through All the Available FileSets
- Request Available Packages
- Retrieve FileSets of Specific File Type (Filter By Attribute and By PackageId)
- Example - Retrieve Tick History File Sets for a Given Venue In View
- Example - Retrieve Tick History File Sets for a Given Venue Limit By Dates
- Retrieve Complete File Details of FileSet ID
- Stream File via File Id using Redirect
- Stream All Files In FileSet
- Get File Location (Step 1 of 2)
- Download File From File Location (Step 2 of 2)
Define Token Handling and Obtain a Valid Token
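The snippets that follow rely on a handful of imports and constants. For completeness, here is a minimal sketch of them: the base URL and auth path reflect the standard RDP production token endpoint (https://api.refinitiv.com/auth/oauth2/v1/token), while the credential, scope, bucket, and file name values are placeholders (assumptions) to be replaced with your own.
import json
import time
import requests
import pandas as pd

RDP_BASE_URL = 'https://api.refinitiv.com'   # RDP production base URL
CATEGORY_URL = '/auth/oauth2'
RDP_AUTH_VERSION = '/v1'
ENDPOINT_URL = '/token'
RDP_CFS_VERSION = '/v1'
RDP_CFS_BUCKET = 'bulk-ESG'                  # example bucket name, an assumption
USERNAME = 'VALID_MACHINE_ID'                # placeholder credentials
PASSWORD = 'VALID_PASSWORD'
CLIENT_ID = 'VALID_APP_KEY'
CLIENT_SECRET = ''                           # commonly empty for the RDP password grant
SCOPE = 'trapi'
TOKEN_FILE = 'token.txt'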
TOKEN_ENDPOINT = RDP_BASE_URL + CATEGORY_URL + RDP_AUTH_VERSION + ENDPOINT_URL
def _requestNewToken(refreshToken):
    if refreshToken is None:
        # First-time sign-in: use the password grant
        tData = {
            "username": USERNAME,
            "password": PASSWORD,
            "grant_type": "password",
            "scope": SCOPE,
            "takeExclusiveSignOnControl": "true"
        }
    else:
        # Subsequent requests: exchange the refresh token for a new access token
        tData = {
            "refresh_token": refreshToken,
            "grant_type": "refresh_token",
        }
    # Make a REST call to get the latest access token
    response = requests.post(
        TOKEN_ENDPOINT,
        headers={"Accept": "application/json"},
        data=tData,
        auth=(CLIENT_ID, CLIENT_SECRET)
    )
    if response.status_code != 200:
        raise Exception("Failed to get access token {0} - {1}".format(response.status_code, response.text))
    # Return the new token
    return json.loads(response.text)
def saveToken(tknObject):
    tf = open(TOKEN_FILE, "w+")
    print("Saving the new token")
    # Append the expiry time to the token, with a small safety margin
    tknObject["expiry_tm"] = time.time() + int(tknObject["expires_in"]) - 10
    # Store it in the file
    json.dump(tknObject, tf, indent=4)
    tf.close()
def getToken():
    try:
        print("Reading the token from: " + TOKEN_FILE)
        # Read the token from a file
        tf = open(TOKEN_FILE, "r+")
        tknObject = json.load(tf)
        tf.close()
        # Is the access token still valid?
        if tknObject["expiry_tm"] > time.time():
            # Return the cached access token
            return tknObject["access_token"]
        print("Token expired, refreshing a new one...")
        # Get a new token using the refresh token
        tknObject = _requestNewToken(tknObject["refresh_token"])
    except Exception as exp:
        print("Caught exception: " + str(exp))
        print("Getting a new token using Password Grant...")
        tknObject = _requestNewToken(None)
    # Persist this token for future queries
    saveToken(tknObject)
    print("Token is: " + tknObject["access_token"])
    # Return the access token
    return tknObject["access_token"]
accessToken = getToken()
print("Have token now")
Request File Sets Available in the Bucket
FILESET_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/file-sets?bucket=' + RDP_CFS_BUCKET
FILESET_ID = ''
def requestFileSets(token, withNext, skipToken, bucket, attributes):
    global FILESET_ENDPOINT
    print("Obtaining FileSets in " + bucket + " Bucket...")
    # Rebuild the endpoint on every call, so filters do not accumulate
    FILESET_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/file-sets?bucket=' + bucket
    querystring = {}
    payload = ""
    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }
    # Optional attribute filters, e.g. '&attributes=ContentType:ESG Scores'
    if attributes:
        FILESET_ENDPOINT = FILESET_ENDPOINT + attributes
    # When paginating, append the skipToken returned by the previous response
    if withNext:
        FILESET_ENDPOINT = FILESET_ENDPOINT + '&skipToken=' + skipToken
    response = requests.request("GET", FILESET_ENDPOINT, data=payload, headers=headers, params=querystring)
    if response.status_code == 401:  # error when token expired
        token = getToken()  # token refresh on token expired
        headers['Authorization'] = "Bearer " + token
        response = requests.request("GET", FILESET_ENDPOINT, data=payload, headers=headers, params=querystring)
    print('Raw response=')
    print(response)
    if response.status_code == 200:
        return json.loads(response.text)
    else:
        return ''
jsonFullResp = requestFileSets(accessToken, False, '', RDP_CFS_BUCKET, '')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df
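A FileSet Id will be needed when requesting file details later on. A minimal sketch of capturing one from the response above, assuming, as in the output shown, that each FileSet entry carries an 'id' field (the first entry is chosen for illustration only):
# Capture a FileSet Id from the file-sets response for later steps;
# picking the first entry is for illustration only
if jsonFullResp and jsonFullResp.get('value'):
    FILESET_ID = jsonFullResp['value'][0]['id']
    print('Using FILESET_ID: ' + FILESET_ID)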
Paginate Through All the Available FileSets
Paginating through all available FileSets is often required. If attributes are set, filtering by those attributes takes place first, narrowing down the FileSets to those required for the use case.
If the total number of FileSets in a bucket is, or is expected to be, very large, we would certainly want to filter further by attributes; otherwise this iterative step can take significant time.
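The loop below lifts the skipToken out of @nextLink by slicing its last 62 characters, which matches the token length at the time of this writing. A more resilient alternative, sketched here under the assumption that @nextLink is a URL carrying the skipToken as a query parameter, is to parse it out:
from urllib.parse import urlparse, parse_qs

def extractSkipToken(nextLink):
    # Parse the skipToken query parameter out of the @nextLink URL,
    # rather than relying on a fixed-length slice
    tokens = parse_qs(urlparse(nextLink).query).get('skipToken', [''])
    return tokens[0]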
i = 1
while "@nextLink" in jsonFullResp:
    print('<<< Iteration: ' + str(i) + ' >>> More exists: ' + jsonFullResp['@nextLink']
          + ', skipToken is: ' + jsonFullResp['@nextLink'][-62:] + '\n')
    jsonFullResp = requestFileSets(accessToken, True, jsonFullResp['@nextLink'][-62:], RDP_CFS_BUCKET, '')
    print(json.dumps(jsonFullResp, indent=2))
    i += 1
print('Last response without next=')
print(json.dumps(jsonFullResp, indent=2))
Request Available Packages
PACKAGES_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/packages?packageType=bulk'
PACKAGE_ID = ''
def requestPackages(token, withNext, skipToken, attributes):
    global PACKAGES_ENDPOINT
    print("Obtaining Packages of type bulk")
    # Rebuild the endpoint on every call; keep the packageType query so that
    # any '&attributes=' or '&skipToken=' appended below forms a valid URL
    PACKAGES_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/packages?packageType=bulk'
    querystring = {}
    payload = ""
    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }
    if attributes:
        PACKAGES_ENDPOINT = PACKAGES_ENDPOINT + attributes
    if withNext:
        PACKAGES_ENDPOINT = PACKAGES_ENDPOINT + '&skipToken=' + skipToken
    response = requests.request("GET", PACKAGES_ENDPOINT, data=payload, headers=headers, params=querystring)
    if response.status_code == 401:  # error when token expired
        token = getToken()  # token refresh on token expired
        headers['Authorization'] = "Bearer " + token
        response = requests.request("GET", PACKAGES_ENDPOINT, data=payload, headers=headers, params=querystring)
    print('Raw response=')
    print(response)
    if response.status_code == 200:
        return json.loads(response.text)
    else:
        return ''
jsonFullResp = requestPackages(accessToken, False, '', '')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
df = pd.json_normalize(jsonFullResp['value'])
df
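The next section filters by PackageId. A minimal sketch of capturing one from the packages response above, assuming each entry carries a 'packageId' field as shown in the output (the first entry is chosen for illustration only; in practice, select a package you are permissioned for):
# Capture a PackageId from the packages response for filtering below;
# picking the first entry is for illustration only
if jsonFullResp and jsonFullResp.get('value'):
    PACKAGE_ID = jsonFullResp['value'][0]['packageId']
    print('Using PACKAGE_ID: ' + PACKAGE_ID)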
Retrieve FileSets of Specific File Type (Filter By Attribute and By PackageId)
When files of several distinct types are available, filtering by file type is a very useful capability. In the case of ESG content, we filter by ContentType.
The available file types may change over time; at the time of this writing, we use the following as examples:
- ESG Scores
- Symbology Cusip
Further, when also filtering by packageId, the selected package has to contain files matching the attribute filter in order for their listings to be requested successfully; otherwise the result will be empty.
Note that if we also have the required PackageId available, whether learned from reviewing the available information or recommended by the Refinitiv account team, we can further filter FileSets by PackageId.
jsonFullResp = requestFileSets(accessToken, False, '', RDP_CFS_BUCKET, '&attributes=ContentType:Symbology Cusip')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df
jsonFullResp = requestFileSets(accessToken, False, '', RDP_CFS_BUCKET, '&packageId=' + PACKAGE_ID + '&attributes=ContentType:ESG Scores')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df
Example - Retrieve Tick History File Sets for a Given Venue In View
Venues (exchanges) and Views (normalized, reference, level 2) are an important means of selecting the required content with CFS Tick History. Therefore, let us consider an example of selecting the FileSets for a required Venue, of a required View (type).
RDP_TH_BUCKET = 'TICKHISTORY_VBD_UNLIMITED'
CFS_VBD_VENUE = 'NSQ'
CFS_VBD_VIEW = 'normalised'
jsonFullResp = requestFileSets(accessToken, False, '', RDP_TH_BUCKET, '&attributes=venue:' + CFS_VBD_VENUE + ',view:' + CFS_VBD_VIEW)
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df
Example - Retrieve Tick History File Sets for a Given Venue Limit By Dates
jsonFullResp = requestFileSets(accessToken, False, '', RDP_TH_BUCKET, '&attributes=venue:' + CFS_VBD_VENUE + ',view:' + CFS_VBD_VIEW +
                               '&contentFrom=2007-01-01T00:00:00Z&contentTo=2010-01-01T00:00:00Z')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df
Retrieve Complete File Details of FileSet ID
FILES_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/files?filesetId='
def requestFileDetails(token, fileSetId):
    print("Obtaining File details for FileSet= " + fileSetId + " ...")
    print("(If the result is Response=400, make sure that the fileSetId passed in is set to a valid value...)")
    # Build the URL locally, so repeated calls do not keep appending
    # fileSetIds to the shared endpoint
    files_endpoint = FILES_ENDPOINT + fileSetId
    querystring = {}
    payload = ""
    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }
    response = requests.request("GET", files_endpoint, data=payload, headers=headers, params=querystring)
    if response.status_code == 401:  # error when token expired
        token = getToken()  # token refresh on token expired
        headers['Authorization'] = "Bearer " + token
        response = requests.request("GET", files_endpoint, data=payload, headers=headers, params=querystring)
    print('Raw response=')
    print(response)
    if response.status_code == 200:
        return json.loads(response.text)
    else:
        return ''
# FILESET_ID must hold a valid FileSet Id from the previous steps
jsonFullResp = requestFileDetails(accessToken, FILESET_ID)
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
df = pd.json_normalize(jsonFullResp['value'])
df
Stream File via File Id using Redirect
Note that both the FileId and the file name are required as inputs to the next function, so that the streamed file is stored under a meaningful name that corresponds to the downloaded content. This information can be pre-selected by manually pasting it in from the previous section's output; for production solutions it should be parsed out of the complete file details, in code.
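For instance, a minimal sketch of such parsing, assuming jsonFullResp still holds the file details retrieved above, with 'id' and 'filename' fields per entry as shown in its output:
# Parse (fileId, fileName) pairs out of the file-details response
filePairs = [(item['id'], item['filename']) for item in jsonFullResp['value']]
for fileId, fileName in filePairs:
    print('FileId: ' + fileId + ' maps to file name: ' + fileName)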
import shutil
FILES_STREAM_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/files/'
# use valid values, obtained from the previous step
exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'
exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'
def requestFileDownload(token, fileId, fileName):
    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)
    headers = {
        'Authorization': 'Bearer ' + token,
        'cache-control': "no-cache",
        'Accept': '*/*'
    }
    # Follow the redirect and stream the file content
    response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)
    if response.status_code == 401:  # error when token expired
        token = getToken()  # token refresh on token expired
        headers['Authorization'] = "Bearer " + token
        response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)
    print('Response code=' + str(response.status_code))
    if response.status_code == 200:
        print('Processing...')
        with open(fileName, 'wb') as fd:
            shutil.copyfileobj(response.raw, fd)
        print('Look for gzipped file named: ' + fileName + ' in current directory')
    response.connection.close()
    return
# consider below an example only
requestFileDownload(accessToken, exampleFileId, exampleFileName)
#requestFileDownload(accessToken, FILE_ID, FILE_NAME)
print("List of files to be streamed by this step:")
for item in jsonFullResp['value']:
print ('File name: ' +item['filename'])
print("\n... Starting to stream now, this may take long .<.>. .<.>. .<.>.")
for item in jsonFullResp['value']:
print ('Streaming File: ' +item['filename'])
requestFileDownload(accessToken, item['id'],item['filename']);
Get File Location (Step 1 of 2)
FILES_STREAM_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/files/'
DIRECT_URL = ''
exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'
def requestFileLocation(token, fileId):
    # Ask for the direct file location, without following the redirect
    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream?doNotRedirect=true'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)
    headers = {
        'Authorization': 'Bearer ' + token,
        'cache-control': "no-cache",
        'Accept': '*/*'
    }
    response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=False, allow_redirects=False)
    if response.status_code == 401:  # error when token expired
        token = getToken()  # token refresh on token expired
        headers['Authorization'] = "Bearer " + token
        response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=False, allow_redirects=False)
    print('Response code=' + str(response.status_code))
    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        print('Parsed json response=')
        print(json.dumps(jsonFullResp, indent=2))
        DIRECT_URL = jsonFullResp['url']
        print('File Direct URL is: ' + str(DIRECT_URL) + '|||')
        return DIRECT_URL
# FILE_ID must hold a valid file Id from the file details above
DIRECT_URL = requestFileLocation(accessToken, FILE_ID)
Download File From File Location (Step 2 of 2)
from urllib.parse import urlparse, parse_qs
exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'
def requestDirectFileDownload(token, fileURL, fileName):
    print("Obtaining File from URL... " + fileURL + '... to file name=' + fileName)
    # Parse out URL parameters for submission into requests
    url_obj = urlparse(fileURL)
    parsed_params = parse_qs(url_obj.query)
    # Extract the URL without query parameters
    parsed_url = url_obj._replace(query=None).geturl()
    # The direct URL is pre-signed, so no Authorization header is needed
    response = requests.get(parsed_url, params=parsed_params, stream=True)
    if response.status_code != 200:
        # The pre-signed URL may have expired; if so, obtain a fresh one via
        # requestFileLocation and call this function again. Here we retry once.
        response = requests.get(parsed_url, params=parsed_params, stream=True)
    print('Response code=' + str(response.status_code))
    filename = 'another_' + fileName
    if response.status_code == 200:
        print('Processing...')
        with open(filename, 'wb') as fd:
            shutil.copyfileobj(response.raw, fd)
        print('Look for gzipped file named: ' + filename + ' in current directory')
    response.connection.close()
    return
# FILE_NAME must hold the file name that corresponds to FILE_ID
requestDirectFileDownload(accessToken, DIRECT_URL, FILE_NAME)
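As a quick sanity check, one can peek at the first records of a downloaded file. This sketch assumes a gzipped JSON-lines file, such as the ESG example file name used above; Tick History files are gzipped CSV instead.
import gzip
import itertools

# Peek at the first few records of the downloaded gzipped JSON-lines file
with gzip.open(exampleFileName, 'rt') as f:
    for line in itertools.islice(f, 3):
        print(json.loads(line))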