Introduction to Client File Store (CFS) - Python

Zoya Farberov
Developer Advocate

Introduction

Client File Store (CFS) houses content files from publisher repositories and makes them available for download.  At the time of this writing, the stores available in early release are:

  • Bulk ESG data: scores and measures in weekly bulk files 
  • The Refinitiv Trade Discovery product supports banks adopting their own market risk models to comply with the Risk Factors Eligibility Test (RFET) under the Fundamental Review of the Trading Book market risk regulation. It delivers trade observations across multiple asset classes and instruments, both ETI and OTC, that banks can use to assess the level of liquidity associated with their risk factors in order to pass the RFET
  • Tick History - Venue By Day

The approach to selecting and downloading the required content is largely common across content sets, although some filtering options are specific to a particular content set.  In this article we discuss the common tasks and use cases, to enable developers to make this content part of their custom applications.

The Client File Store service is part of the Refinitiv Data Platform (RDP) family of services.  See the References section for detailed information on RDP.

Permissions are required to access an RDP service such as CFS, and additional permissions are required to access a specific content set within CFS.  Permissioning is beyond the scope of this article and is best discussed with one's Refinitiv account team.

Obtain an Access Token

Before requesting content we obtain an RDP access token.  The helper functions below request a token using the Password grant, persist it to a file together with its expiry time, and refresh it via the refresh token once it expires.  The constants they rely on (RDP_BASE_URL, USERNAME, PASSWORD, CLIENT_ID and so on) are assumed to be defined in the preceding setup of the example:

import json
import time
import requests

# RDP_BASE_URL, CATEGORY_URL, RDP_AUTH_VERSION, ENDPOINT_URL, USERNAME,
# PASSWORD, CLIENT_ID, CLIENT_SECRET, SCOPE and TOKEN_FILE are defined
# in the preceding setup of the example
TOKEN_ENDPOINT = RDP_BASE_URL + CATEGORY_URL + RDP_AUTH_VERSION + ENDPOINT_URL

def _requestNewToken(refreshToken):
    if refreshToken is None:
        # First-time request: authenticate with the Password grant
        tData = {
            "username": USERNAME,
            "password": PASSWORD,
            "grant_type": "password",
            "scope": SCOPE,
            "takeExclusiveSignOnControl": "true"
        }
    else:
        # Subsequent requests: exchange the refresh token for a new access token
        tData = {
            "refresh_token": refreshToken,
            "grant_type": "refresh_token",
        }

    # Make a REST call to get the latest access token
    response = requests.post(
        TOKEN_ENDPOINT,
        headers={"Accept": "application/json"},
        data=tData,
        auth=(CLIENT_ID, CLIENT_SECRET)
    )

    if response.status_code != 200:
        raise Exception("Failed to get access token {0} - {1}".format(
            response.status_code, response.text))

    # Return the new token object
    return json.loads(response.text)

def saveToken(tknObject):
    print("Saving the new token")
    # Append the expiry time to the token, with a 10-second safety margin
    tknObject["expiry_tm"] = time.time() + int(tknObject["expires_in"]) - 10
    # Store it in the file
    with open(TOKEN_FILE, "w+") as tf:
        json.dump(tknObject, tf, indent=4)

def getToken():
    try:
        print("Reading the token from: " + TOKEN_FILE)
        # Read the token from a file
        with open(TOKEN_FILE, "r+") as tf:
            tknObject = json.load(tf)

        # Return the stored access token if it is still valid
        if tknObject["expiry_tm"] > time.time():
            return tknObject["access_token"]

        print("Token expired, refreshing a new one...")
        # Get a new token from the refresh token
        tknObject = _requestNewToken(tknObject["refresh_token"])

    except Exception as exp:
        print("Caught exception: " + str(exp))
        print("Getting a new token using Password Grant...")
        tknObject = _requestNewToken(None)

    # Persist this token for future queries
    saveToken(tknObject)
    print("Token is: " + tknObject["access_token"])
    # Return the access token
    return tknObject["access_token"]

accessToken = getToken()
print("Have token now")

Request File Sets Available in the Bucket

Next we define the function requestFileSets, which will be our workhorse for the next several use cases.  The same approach is applicable to all CFS content sets, while the attributes differ per content set as well as per use case.


import pandas as pd

# RDP_CFS_VERSION and RDP_CFS_BUCKET are defined in the preceding setup
FILESET_ID = ''

def requestFileSets(token, withNext, skipToken, bucket, attributes):
    print("Obtaining FileSets in " + bucket + " Bucket...")

    fileSetURL = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/file-sets?bucket=' + bucket

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    # optional filtering criteria, e.g. '&attributes=ContentType:ESG Scores'
    if attributes:
        fileSetURL = fileSetURL + attributes
    # skipToken identifies the next page when paginating through results
    if withNext:
        fileSetURL = fileSetURL + '&skipToken=' + skipToken

    response = requests.get(fileSetURL, headers=headers)

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(fileSetURL, headers=headers)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return json.loads(response.text)
    else:
        return ''

jsonFullResp = requestFileSets(accessToken, False, '', RDP_CFS_BUCKET, '')

print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df
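
From this listing we can capture the ID of a FileSet to work with in the later steps.  A minimal sketch, assuming the response is non-empty; the id field of each entry is the FileSet ID, and taking the first entry is an arbitrary example choice:

# pick the first FileSet's id from the listing (an arbitrary example choice)
FILESET_ID = jsonFullResp['value'][0]['id']
print('Using FILESET_ID: ' + FILESET_ID)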

Paginate Through All the Available FileSets

Paginating through all the available FileSets is often required.  If attributes are set, filtering by attributes takes place and narrows the FileSets down to those that are required for the use case.

If the number of FileSets in a bucket is, or is expected to be, very large, we would certainly want to filter further by attributes; otherwise this iterative step can take significant time.


i = 1
while "@nextLink" in jsonFullResp:
    print('<<< Iteration: ' + str(i) + ' >>>  More exists: ' + jsonFullResp['@nextLink'] +
          ', skipToken is: ' + jsonFullResp['@nextLink'][-62:] + '\n')
    jsonFullResp = requestFileSets(accessToken, True, jsonFullResp['@nextLink'][-62:], RDP_CFS_BUCKET, '')
    print(json.dumps(jsonFullResp, indent=2))
    i += 1
print('Last response without next=')
print(json.dumps(jsonFullResp, indent=2))
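
Taking the last 62 characters of @nextLink is a shortcut that relies on the skipToken having a fixed length.  A more defensive alternative (a sketch, not part of the original example) is to parse the skipToken query parameter out of the link, and use it in place of the [-62:] slices above:

from urllib.parse import urlparse, parse_qs

def skipTokenFromNextLink(nextLink):
    # @nextLink is a URL fragment ending in '...&skipToken=<token>'
    query = parse_qs(urlparse(nextLink).query)
    return query['skipToken'][0]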

Request Available Packages

Requesting the available packages is another useful way to examine the available content.  We can use this information to select the packageId or packageIds that are required and to request FileSets from the relevant package(s).


PACKAGES_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/packages?packageType=bulk'
PACKAGE_ID = ''

def requestPackages(token, withNext, skipToken, attributes):
    print("Obtaining Packages of type bulk")

    packagesURL = PACKAGES_ENDPOINT

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    # optional additional filtering criteria
    if attributes:
        packagesURL = packagesURL + attributes
    # skipToken identifies the next page when paginating through results
    if withNext:
        packagesURL = packagesURL + '&skipToken=' + skipToken

    response = requests.get(packagesURL, headers=headers)

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(packagesURL, headers=headers)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return json.loads(response.text)
    else:
        return ''

jsonFullResp = requestPackages(accessToken, False, '', '')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
df = pd.json_normalize(jsonFullResp['value'])
df
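
The packageId value we need next can be read off this listing.  A minimal sketch, assuming a non-empty response with a packageId field per entry (as shown in the listing above); taking the first entry is an arbitrary example choice:

# pick the first package's id from the listing (an arbitrary example choice)
PACKAGE_ID = jsonFullResp['value'][0]['packageId']
print('Using PACKAGE_ID: ' + PACKAGE_ID)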

Retrieve FileSets of Specific File Type (Filter By Attribute and By PackageId)

When files of several distinct types are available, filtering by file type is a very useful capability.  In the case of ESG content, we filter by ContentType.

The available file types may change over time; at the time of this writing, we use the following as examples:

  • ESG Scores
  • Symbology Cusip

Further, if also filtering by packageId, the selected package has to contain files matching the attributes filter in order for their listings to be requested successfully; otherwise the result will be empty.

Note that if we also have the required packageId available, whether learned from reviewing the available information or recommended by the Refinitiv account team, we can further filter FileSets by packageId:


jsonFullResp = requestFileSets(accessToken, False, '', RDP_CFS_BUCKET, '&attributes=ContentType:Symbology Cusip')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df


jsonFullResp = requestFileSets(accessToken, False, '', RDP_CFS_BUCKET,
                               '&packageId=' + PACKAGE_ID + '&attributes=ContentType:ESG Scores')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df

 

Example - Retrieve Tick History FileSets for a Given Venue and View

Venues (exchanges) and Views (normalized, reference, level 2) are an important means of selecting the required content with CFS Tick History.  Therefore, let us consider an example of selecting FileSets of the required View (type) for a required Venue.


RDP_TH_BUCKET = 'TICKHISTORY_VBD_UNLIMITED'
CFS_VBD_VENUE = 'NSQ'
CFS_VBD_VIEW = 'normalised'

jsonFullResp = requestFileSets(accessToken, False, '', RDP_TH_BUCKET,
                               '&attributes=venue:' + CFS_VBD_VENUE + ',view:' + CFS_VBD_VIEW)
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df

Example - Retrieve Tick History FileSets for a Given Venue, Limited by Dates

Another common use case is selecting FileSets for a required Venue within specific dates, often years.  In the example below we select FileSets for the years 2007-2009 to download and work with.


jsonFullResp = requestFileSets(accessToken, False, '', RDP_TH_BUCKET,
                               '&attributes=venue:' + CFS_VBD_VENUE + ',view:' + CFS_VBD_VIEW +
                               '&contentFrom=2007-01-01T00:00:00Z&contentTo=2010-01-01T00:00:00Z')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df

Retrieve Complete File Details of FileSet ID

Once we have the ID of a FileSet whose files we are interested in examining, we can retrieve the complete details of that FileSet by passing the ID into the function requestFileDetails:


FILES_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/files?filesetId='

def requestFileDetails(token, fileSetId):
    print("Obtaining File details for FileSet= " + fileSetId + " ...")
    print("(If the result is Response=400, make sure that the fileSetId passed in is set to a valid value)")

    # build the request URL locally, so that repeated calls do not accumulate ids
    filesURL = FILES_ENDPOINT + fileSetId

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    response = requests.get(filesURL, headers=headers)

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(filesURL, headers=headers)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return json.loads(response.text)
    else:
        return ''

jsonFullResp = requestFileDetails(accessToken, FILESET_ID)
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
df = pd.json_normalize(jsonFullResp['value'])
df

Stream File via File Id using Redirect

Note that both the FileId and the file name are required to be passed into the next function, so that the streamed file is stored under a meaningful name that corresponds to the downloaded content.  This information can be pre-selected by manually pasting from the output of the previous sections; for production solutions it should be parsed out of the complete file details in code, as shown in the sketch below.
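
A minimal sketch of that parsing, assuming jsonFullResp still holds the result of requestFileDetails from the previous step and is non-empty:

# pick the first file in the FileSet (an arbitrary example choice)
FILE_ID = jsonFullResp['value'][0]['id']
FILE_NAME = jsonFullResp['value'][0]['filename']
print('Selected FILE_ID=' + FILE_ID + ', FILE_NAME=' + FILE_NAME)

These FILE_ID and FILE_NAME values are used by the remaining steps.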


import shutil

FILES_STREAM_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/files/'

# use valid values, obtained from the previous step
exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'
exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'

def requestFileDownload(token, fileId, fileName):
    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

    headers = {
        'Authorization': 'Bearer ' + token,
        'cache-control': "no-cache",
        'Accept': '*/*'
    }

    # stream=True avoids loading the whole file into memory;
    # allow_redirects=True follows the redirect to the file's actual location
    response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

    print('Response code=' + str(response.status_code))

    if response.status_code == 200:
        print('Processing...')
        with open(fileName, 'wb') as fd:
            shutil.copyfileobj(response.raw, fd)
        print('Look for gzipped file named: ' + fileName + ' in current directory')
        response.connection.close()

    return

# consider below an example only
requestFileDownload(accessToken, exampleFileId, exampleFileName)
#requestFileDownload(accessToken, FILE_ID, FILE_NAME)
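
Once downloaded, the file can be verified locally.  For instance, a gzipped JSON Lines file, such as the ESG example above, can be read record by record; a minimal sketch, assuming the example file name from the previous step and a successful download:

import gzip

# peek at the first few records of the downloaded .jsonl.gz file
with gzip.open(exampleFileName, 'rt') as f:
    for i, line in enumerate(f):
        print(json.loads(line))   # each line is one JSON document
        if i == 2:                # show the first few records only
            break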

Stream All Files In FileSet

This is a very popular use case, as the files that are part of a FileSet are united by common criteria.  From the results of requestFileDetails, we derive the fileId and the file name of every file in the FileSet and pass them as parameters to requestFileDownload:


print("List of files to be streamed by this step:")

for item in jsonFullResp['value']:

    print ('File name: ' +item['filename'])

print("\n... Starting to stream now, this may take long  .<.>. .<.>. .<.>.")

for item in jsonFullResp['value']:

    print ('Streaming File: ' +item['filename'])

    requestFileDownload(accessToken, item['id'],item['filename']);

Stream File via File Id - Get File Location (Step 1 of 2)

Next we would like to introduce another way of streaming files:

  1. Get the file's location
  2. Download the file directly, using the file's location

DIRECT_URL = ''

def requestFileLocation(token, fileId):
    # ask for the file's location instead of being redirected to it
    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream?doNotRedirect=true'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

    directUrl = ''

    headers = {
        'Authorization': 'Bearer ' + token,
        'cache-control': "no-cache",
        'Accept': '*/*'
    }

    response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, allow_redirects=False)

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, allow_redirects=False)

    print('Response code=' + str(response.status_code))

    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        print('Parsed json response=')
        print(json.dumps(jsonFullResp, indent=2))
        directUrl = jsonFullResp['url']
        print('File Direct URL is: ' + str(directUrl))

    return directUrl

DIRECT_URL = requestFileLocation(accessToken, FILE_ID)

Stream File via File Id - Download File From File Location (Step 2 of 2)

Now we download the file directly from the location obtained in step 1.  The location is a pre-signed URL, so no Authorization header is required for this request:

from urllib.parse import urlparse, parse_qs

def requestDirectFileDownload(token, fileURL, fileName):
    # token is kept in the signature for symmetry with the other helpers,
    # but the pre-signed URL does not require it
    print("Obtaining File from URL... " + fileURL + '... to file name=' + fileName)

    # parse out the URL's query parameters for submission into requests
    url_obj = urlparse(fileURL)
    parsed_params = parse_qs(url_obj.query)
    # extract the URL without the query parameters
    parsed_url = url_obj._replace(query=None).geturl()

    response = requests.get(parsed_url, params=parsed_params, stream=True)

    print('Response code=' + str(response.status_code))

    if response.status_code != 200:
        # the pre-signed location is short-lived; if it has expired,
        # re-run the previous step (requestFileLocation) for a fresh URL
        return

    filename = 'another_' + fileName

    print('Processing...')
    with open(filename, 'wb') as fd:
        shutil.copyfileobj(response.raw, fd)

    print('Look for gzipped file named: ' + filename + ' in current directory')
    response.connection.close()

    return

requestDirectFileDownload(accessToken, DIRECT_URL, FILE_NAME)