Introduction to Client File Store (CFS) - Python

Introduction
Client File Store houses and makes available content file stores from publisher repositories. At the time of this writing, the available stores in early release are:
- Bulk ESG data: scores and measures in weekly bulk files
- The Refinitiv Trade Discovery product supports banks adopting their own market risk models to comply with the Risk Factors Eligibility Test (RFET) under the Fundamental Review of the Trading Book market risk regulation. It delivers trade observations across multiple asset classes and instruments, both ETI and OTC, that banks can use to assess level of liquidity associated with their risk factors in order to pass the RFET
- Tick History - Venue By Day
The approach to selecting and downloading the required content is largely common, some filtering options are specific to a particular content set. In this article we will discuss common tasks and use cases, to enable developers to make this content part of their custom applications.
Client File Store service is part of Refinitiv Data Platform (RDP) family of services. See section References for the detailed information on RDP.
Permissions are required to access an RDP service, such as CFS, and additional permissions are required to access a specific content set within CFS. Permissioning is beyond the scope of this article, and is best discussed with one's Refinitiv account team.
The Article at a Glance
- Define Token Handling and Obtain a Valid Token
- Request File Sets Available in the Bucket
- Paginate Through All the Available FileSets
- Request Available Packages
- Retrieve FileSets of Specific File Type (Filter By Attribute and By PackageId)
- Example - Retrieve Tick History File Sets for a Given Venue In View
- Example - Retrieve Tick History File Sets for a Given Venue Limit By Dates
- Retrieve Complete File Details of FileSet ID
- Stream File via File Id using Redirect
- Stream All Files In FileSet
- Get File Location (Step 1 of 2)
- Download File From File Location (Step 2 of 2)
Define Token Handling and Obtain a Valid Token
# RDP auth-service token endpoint, assembled from configuration constants
# defined elsewhere in this notebook/script.
TOKEN_ENDPOINT = RDP_BASE_URL + CATEGORY_URL + RDP_AUTH_VERSION + ENDPOINT_URL
Â
def _requestNewToken(refreshToken):
  """Request a fresh RDP access token.

  Uses the Password grant when refreshToken is None; otherwise uses the
  Refresh Token grant.  Returns the parsed JSON token object.
  Raises Exception on any non-200 response.
  """
  if refreshToken is None:
    # First-time login: authenticate with username/password credentials.
    tData = {
      "username": USERNAME,
      "password": PASSWORD,
      "grant_type": "password",
      "scope": SCOPE,
      "takeExclusiveSignOnControl": "true"
    }
  else:
    # Subsequent logins: exchange the refresh token for a new access token.
    tData = {
      "refresh_token": refreshToken,
      "grant_type": "refresh_token",
    }

  # Make a REST call to get latest access token
  response = requests.post(
    TOKEN_ENDPOINT,
    headers={"Accept": "application/json"},
    data=tData,
    auth=(CLIENT_ID, CLIENT_SECRET)
  )

  if response.status_code != 200:
    raise Exception("Failed to get access token {0} - {1}".format(response.status_code, response.text))

  # Return the parsed token object.
  return response.json()
Â
def saveToken(tknObject):
  """Persist the token object to TOKEN_FILE, stamping it with an expiry time."""
  print("Saving the new token")
  # Append an absolute expiry timestamp (10-second safety margin) so that
  # getToken() can test validity without re-interpreting expires_in.
  tknObject["expiry_tm"] = time.time() + int(tknObject["expires_in"]) - 10
  # Store it in the file; 'with' guarantees the handle is closed
  # (the original opened the file and never closed it).
  with open(TOKEN_FILE, "w+") as tf:
    json.dump(tknObject, tf, indent=4)
  Â
def getToken():
  """Return a valid access token.

  Order of attempts:
  1. Reuse the cached token from TOKEN_FILE if it has not expired.
  2. Refresh it via the stored refresh token.
  3. Fall back to a full Password-grant login.
  Any newly obtained token is persisted for future queries.
  """
  try:
    print("Reading the token from: " + TOKEN_FILE)
    # 'with' closes the file on every path; the original leaked the
    # handle when the cached token was still valid.
    with open(TOKEN_FILE, "r+") as tf:
      tknObject = json.load(tf)

    # Is the cached access token still valid?
    if tknObject["expiry_tm"] > time.time():
      return tknObject["access_token"]

    print("Token expired, refreshing a new one...")
    # Get a new token from the refresh token.
    tknObject = _requestNewToken(tknObject["refresh_token"])

  except Exception as exp:
    # Deliberate catch-all: any failure (missing file, corrupt JSON,
    # rejected refresh) falls back to a fresh Password-grant login.
    print("Caught exception: " + str(exp))
    print("Getting a new token using Password Grant...")
    tknObject = _requestNewToken(None)

  # Persist this token for future queries
  saveToken(tknObject)
  print("Token is: " + tknObject["access_token"])
  # Return access token
  return tknObject["access_token"]
Â
# Obtain (or refresh) an access token before making any CFS request.
accessToken = getToken();
print("Have token now");
# Default file-sets endpoint, scoped to the configured CFS bucket.
FILESET_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/file-sets?bucket='+ RDP_CFS_BUCKET
# FileSet ID to inspect later; paste a valid value from the listings below.
FILESET_ID = ''
Â
def requestFileSets(token, withNext, skipToken, bucket, attributes):
  """List FileSets in a CFS bucket.

  token      -- bearer access token
  withNext   -- True when paginating: append skipToken to the request
  skipToken  -- pagination token taken from a previous response's @nextLink
  bucket     -- CFS bucket name to query
  attributes -- extra query-string filters, e.g. '&attributes=...' (or '')

  Returns the parsed JSON response, or '' on failure.
  """
  global FILESET_ENDPOINT
  print("Obtaining FileSets in "+bucket+" Bucket...")

  # Rebuild the endpoint from scratch on every call so repeated calls do
  # not accumulate query parameters.
  FILESET_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/file-sets?bucket='+ bucket
  if attributes:
    FILESET_ENDPOINT = FILESET_ENDPOINT + attributes
  if withNext:
    FILESET_ENDPOINT = FILESET_ENDPOINT + '&skipToken=' + skipToken

  headers = {
      'Content-Type': "application/json",
      'Authorization': "Bearer " + token,
      'cache-control': "no-cache"
  }

  response = requests.get(FILESET_ENDPOINT, headers=headers)

  if response.status_code == 401:
    # Access token expired mid-session: refresh it and retry once.
    headers['Authorization'] = "Bearer " + getToken()
    response = requests.get(FILESET_ENDPOINT, headers=headers)

  print('Raw response=')
  print(response)

  if response.status_code == 200:
    return response.json()
  return ''
Â
# Fetch the first page of FileSets in the configured bucket, unfiltered.
jsonFullResp = requestFileSets(accessToken, False, '', RDP_CFS_BUCKET, '')

print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df
Paginate Through All the Available FileSets
Paginating through all available file sets is often required. If attributes are set, filtering by attributes takes place, narrowing down the File Sets to those that are required for the use case.
If the number of all FileSets in a bucket is (or is expected to be) very large, we would certainly want to further filter by attributes, otherwise this iterative step can take up a significant amount of time.
# Follow @nextLink until the service reports no further pages.  The
# skipToken is parsed out of the @nextLink query string rather than
# sliced at a fixed offset ([-62:] in the original), so pagination keeps
# working even if the service changes the token length.
from urllib.parse import urlparse, parse_qs

i = 1
while "@nextLink" in jsonFullResp:
  nextLink = jsonFullResp['@nextLink']
  skipToken = parse_qs(urlparse(nextLink).query)['skipToken'][0]
  print('<<< Iteraction: '+str(i)+' >>> More exists: '+ nextLink + ', skipToken is: ' + skipToken +'\n')
  jsonFullResp = requestFileSets(accessToken, True, skipToken, RDP_CFS_BUCKET, '')
  print(json.dumps(jsonFullResp, indent=2))
  i += 1
print('Last response without next=')
print(json.dumps(jsonFullResp, indent=2))
# Packages endpoint; requestPackages() below rebuilds this URL per call.
PACKAGES_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/packages?packageType=bulk'
# Package ID to filter by later; paste a valid value from the listing below.
PACKAGE_ID = ''
Â
def requestPackages(token, withNext, skipToken, attributes):
  """List CFS packages.

  token      -- bearer access token
  withNext   -- True when paginating: append skipToken to the request
  skipToken  -- pagination token taken from a previous response's @nextLink
  attributes -- extra query-string filters, e.g. '&packageType=bulk' (or '')

  Returns the parsed JSON response, or '' on failure.
  """
  global PACKAGES_ENDPOINT
  print("Obtaining Packages of type bulk")

  PACKAGES_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/packages'

  # Assemble the query string with a proper '?' separator.  The original
  # appended '&skipToken=...' directly onto '/packages', producing a
  # malformed URL whenever attributes was empty.
  queryParts = []
  if attributes:
    queryParts.append(attributes.lstrip('&?'))
  if withNext:
    queryParts.append('skipToken=' + skipToken)
  if queryParts:
    PACKAGES_ENDPOINT = PACKAGES_ENDPOINT + '?' + '&'.join(queryParts)

  headers = {
      'Content-Type': "application/json",
      'Authorization': "Bearer " + token,
      'cache-control': "no-cache"
  }

  response = requests.get(PACKAGES_ENDPOINT, headers=headers)

  if response.status_code == 401:
    # Access token expired mid-session: refresh it and retry once.
    headers['Authorization'] = "Bearer " + getToken()
    response = requests.get(PACKAGES_ENDPOINT, headers=headers)

  print('Raw response=')
  print(response)

  if response.status_code == 200:
    return response.json()
  return ''
Â
# Fetch the first page of packages (no filters, no pagination token).
jsonFullResp = requestPackages(accessToken, False, '', '')

print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
df = pd.json_normalize(jsonFullResp['value'])
df
Retrieve FileSets of Specific File Type (Filter By Attribute and By PackageId)
When files of several distinct types are available, filtering by file type is a very useful capability. In the case of ESG content, we filter by ContentType.
The file types that are available may change over time; at the time of this writing, we are going to use as examples:
- ESG Scores
- Symbology Cusip
Further, when also filtering by packageId, the selected package has to contain files matching the attribute filter in order for the listing request to succeed; otherwise the result will be empty.
Note how, if we also have the required PackageId available — whether it is learned from reviewing the available information, or recommended by the Refinitiv account team — we can further filter FileSets by PackageId.
# Filter FileSets by attribute: ContentType "Symbology Cusip".
jsonFullResp = requestFileSets(accessToken, False, '',RDP_CFS_BUCKET,'&attributes=ContentType:Symbology Cusip');
print('Parsed json response=');
print(json.dumps(jsonFullResp, indent=2));
print('Same response, tabular view');
df = pd.json_normalize(jsonFullResp['value'])
df
# Filter FileSets by PackageId AND attribute (ContentType "ESG Scores");
# PACKAGE_ID must be set to a valid value first.
jsonFullResp = requestFileSets(accessToken, False, '',RDP_CFS_BUCKET,'&packageId='+PACKAGE_ID+'&attributes=ContentType:ESG Scores');
print('Parsed json response=');
print(json.dumps(jsonFullResp, indent=2));
print('Same response, tabular view');
df = pd.json_normalize(jsonFullResp['value'])
df
Â
Example - Retrieve Tick History File Sets for a Given Venue In View
Venues (exchanges) and Views (normalized, reference, level 2) are an important means of selecting the required content with CFS Tick History; therefore, let us consider an example of selecting FileSets for a required Venue, of a required View (type).
# Tick History bucket and an example venue/view selection.
RDP_TH_BUCKET = 'TICKHISTORY_VBD_UNLIMITED'
CFS_VBD_VENUE = 'NSQ'
CFS_VBD_VIEW = 'normalised'
# Filter FileSets by the venue and view attributes ('name:value' syntax).
jsonFullResp = requestFileSets(accessToken, False, '',RDP_TH_BUCKET,'&attributes=venue:'+CFS_VBD_VENUE+',view:'+CFS_VBD_VIEW);
print('Parsed json response=');
print(json.dumps(jsonFullResp, indent=2));
print('Same response, tabular view');
df = pd.json_normalize(jsonFullResp['value'])
df
# Same venue/view filter, additionally limited by a content date range.
# NOTE: the attribute separator is ':' ('view:...'); the original used
# 'view=' here, which does not match the attributes-filter syntax used
# successfully above and so the view filter was silently ineffective.
jsonFullResp = requestFileSets(accessToken, False, '', RDP_TH_BUCKET,
                '&attributes=venue:'+CFS_VBD_VENUE+',view:'+CFS_VBD_VIEW+
                '&contentFrom=2007-01-01T00:00:00Z&contentTo=2010-01-01T00:00:00Z')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df
# Base files endpoint; the fileSetId is appended per request.
FILES_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/files?filesetId='

def requestFileDetails(token, fileSetId):
  """Retrieve the file details of every file in a FileSet.

  token     -- bearer access token
  fileSetId -- a valid FileSet ID from a prior file-sets listing

  Returns the parsed JSON response, or '' on failure.
  """
  print("Obtaining File details for FileSet= "+ fileSetId + " ...")
  # Build the URL in a local variable.  The original appended fileSetId
  # onto the module-level FILES_ENDPOINT via 'global', so every further
  # call appended another ID onto the already-extended URL.
  filesUrl = FILES_ENDPOINT + fileSetId
  print("(If result is Response=400, make sure that fileSetId that is passed in is set with a valid value...)")

  headers = {
      'Content-Type': "application/json",
      'Authorization': "Bearer " + token,
      'cache-control': "no-cache"
  }

  response = requests.get(filesUrl, headers=headers)

  if response.status_code == 401:
    # Access token expired mid-session: refresh it and retry once.
    headers['Authorization'] = "Bearer " + getToken()
    response = requests.get(filesUrl, headers=headers)

  print('Raw response=')
  print(response)

  if response.status_code == 200:
    return response.json()
  return ''
Â
# Request file details for the chosen FileSet (FILESET_ID must be set
# to a valid value first).
jsonFullResp = requestFileDetails(accessToken, FILESET_ID);
print('Parsed json response=');
print(json.dumps(jsonFullResp, indent=2));
df = pd.json_normalize(jsonFullResp['value'])
df
Stream File via File Id using Redirect
Note that both FileId and file name are required, to be passed into the next function, so that the file being streamed is stored in a meaningful name that corresponds to the downloaded content. This information can be pre-selected, by manually pasting from the previous sections output and reading in; for production solutions it should be parsed out from the complete file details, in code.
import shutil

# Base stream endpoint; '<fileId>/stream' is appended per request.
FILES_STREAM_ENDPOINT_START = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/files/'

# Use valid values, obtained from the previous step.
exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'
exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'
Â
def requestFileDownload(token, fileId, fileName):
  """Stream a CFS file (following the service redirect) into fileName.

  token    -- bearer access token
  fileId   -- CFS file ID to download
  fileName -- local file name to write the (gzipped) content to
  """
  FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream'
  print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

  headers = {
      'Authorization': 'Bearer ' + token,
      'cache-control': "no-cache",
      'Accept': '*/*'
  }

  # stream=True plus allow_redirects lets requests follow the redirect to
  # the storage location and hand back the raw byte stream.
  response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

  if response.status_code == 401:
    # Access token expired mid-session: refresh it and retry once.
    headers['Authorization'] = "Bearer " + getToken()
    response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

  print('Response code=' + str(response.status_code))

  if response.status_code == 200:
    print('Processing...')
    # Copy the raw stream to disk without decoding, preserving the gzip.
    with open(fileName, 'wb') as fd:
      shutil.copyfileobj(response.raw, fd)
    print('Look for gzipped file named: '+ fileName + ' in current directory')
    response.connection.close()
Â
# consider below an example only
requestFileDownload(accessToken, exampleFileId, exampleFileName);
#requestFileDownload(accessToken, FILE_ID, FILE_NAME);
# Stream every file in the FileSet whose details were retrieved above.
print("List of files to be streamed by this step:")
for item in jsonFullResp['value']:
  print ('File name: ' +item['filename'])
print("\n... Starting to stream now, this may take long .<.>. .<.>. .<.>.")
for item in jsonFullResp['value']:
  print ('Streaming File: ' +item['filename'])
  requestFileDownload(accessToken, item['id'],item['filename']);
import shutil

# Base stream endpoint and a placeholder for the resolved direct URL,
# used by the two-step (locate, then download) flow below.
FILES_STREAM_ENDPOINT_START = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/files/'
DIRECT_URL = ''

exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'
Â
def requestFileLocation(token, fileId):
  """Step 1 of 2: resolve a file ID to its direct (pre-signed) storage URL.

  token  -- bearer access token
  fileId -- CFS file ID

  Returns the direct URL string, or '' when the request fails.
  """
  FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream?doNotRedirect=true'
  print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

  headers = {
      'Authorization': 'Bearer ' + token,
      'cache-control': "no-cache",
      'Accept': '*/*'
  }

  # doNotRedirect=true makes the service return the storage URL in the
  # JSON body instead of issuing an HTTP redirect.
  response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, allow_redirects=False)

  if response.status_code == 401:
    # Access token expired mid-session: refresh it and retry once.
    headers['Authorization'] = "Bearer " + getToken()
    response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, allow_redirects=False)

  print('Response code=' + str(response.status_code))

  # Initialized up front: the original assigned this only inside the
  # success branch, so a failed request raised UnboundLocalError on return.
  directUrl = ''
  if response.status_code == 200:
    jsonFullResp = json.loads(response.text)
    print('Parsed json response=')
    print(json.dumps(jsonFullResp, indent=2))
    directUrl = jsonFullResp['url']
    print('File Direct URL is: ' + str(directUrl) + '|||')

  return directUrl
Â
Â
# Resolve the direct URL for the chosen file (FILE_ID must be set).
DIRECT_URL = requestFileLocation(accessToken, FILE_ID)
from urllib.parse import urlparse, parse_qs

exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'
Â
def requestDirectFileDownload(token, fileURL, fileName):
  """Step 2 of 2: download a file from its direct (pre-signed) URL.

  token    -- unused for the transfer itself; the pre-signed URL carries
              its own authorization in its query parameters
  fileURL  -- direct URL returned by requestFileLocation()
  fileName -- local name; the file is saved as 'another_' + fileName

  Writes the streamed content to disk; returns None.
  """
  print("Obtaining File from URL... " + fileURL + '... to file name=' + fileName)

  # Parse out URL parameters for submission into requests.
  url_obj = urlparse(fileURL)
  parsed_params = parse_qs(url_obj.query)
  # extract the URL without query parameters
  parsed_url = url_obj._replace(query=None).geturl()

  response = requests.get(parsed_url, params=parsed_params, stream=True)

  if response.status_code != 200:
    # A pre-signed URL cannot be retried with a new bearer token (the
    # original retry referenced undefined 'headers' and 'query' names and
    # would have raised NameError).  If this fails, obtain a fresh
    # location via requestFileLocation() and call this function again.
    print('Download failed; the pre-signed URL may have expired')

  print('Response code=' + str(response.status_code))

  filename = 'another_'+fileName

  if response.status_code == 200:
    print('Processing...')
    with open(filename, 'wb') as fd:
      shutil.copyfileobj(response.raw, fd)

    print('Look for gzipped file named: '+ filename + ' in current directory')
    response.connection.close()
Â
Â
# Download the file from the resolved direct URL (FILE_NAME must be set).
requestDirectFileDownload(accessToken, DIRECT_URL, FILE_NAME);