Get Dynatrace json dumps from threaded Python 3.

Today was a boring day so I was doing anything to make it go faster. I ended up redoing some Python code. It didn’t need to be threaded but I was bored! It’s cold, windy, and dark here. Blah! Maybe someone will find this useful?

#!/usr/bin/python3
from concurrent.futures import ThreadPoolExecutor
import requests # https://pypi.org/project/requests/
import datetime
import os
import json
import logging
import traceback
 
def init_logging(log_path):
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(filename=log_path, level=logging.INFO, format=log_format)
 
def dynatrace_request(args):
    '''Request/Reponse'''
   
    uri,output,token,cert = args
    result = None
   
    headers = {'accept':'application/json','Authorization':'Api-Token %s' % token}
    try:
        response = requests.get(uri, headers=headers, verify=cert)
        logging.info('Request status: %s' % response.status_code)
        result = json.dumps(response.json())
        with open(output,'w') as output:
            output.write(result)
    except:
        err_msg = traceback.format_exc()
        logging.error(err_msg)
       
    return result
 
if __name__ in '__main__':
 
    DAYS  = 30   # Days to current date.
    HOST  = 'https://SOME_SERVER/e/SOME-ID'
    TOKEN = 'YOUR TOKEN'
    CERT  = False # Path to cert. Will work with false but undsafe.
    LOG   = r'c:/temp/dyna.log'
 
    # Increment timestamp.
    const_day = 86400
    days_ts = DAYS * const_day
    adj_dt = int(datetime.datetime.now().timestamp())
    today_dt = int(datetime.datetime.now().timestamp())
    start_dt = adj_dt - days_ts
   
    # Convert to string.
    start_dt = '%s000' % start_dt
    today_dt = '%s000' % today_dt
 
    # Make sure we have a directory to put our data.
    base_path = os.path.split(LOG)[0]
    if not os.exists(base_path):
        os.mkdir(base_path)
 
    # Dictionary of tasks with output paths since we are dumping to json to be picked up by an Ajax routine on some dashboard.
    tasks = {
        "c:/temp/counts.json":r"{0}/api/v1/problem/status".format(HOST),
        "c:/temp/apdex.json":r"{0}/api/v1/userSessionQueryLanguage/table?query=select%20apdexCategory%2C%20application%2C%20count(*)%20FROM%20useraction%20where%20usersession.userType%20%3D%20'REAL_USER'%20GROUP%20BY%20apdexCategory%2Capplication&startTimestamp={1}&endTimestamp={2}&explain=false".format(HOST,start_dt,today_dt)
    }
 
    # init logging.
    init_logging(LOG)
 
    logging.info('Started')
   
    # Get json.
    work_count = len(tasks)   
    with ThreadPoolExecutor(max_workers=work_count) as executor:
        for outp,url in tasks.items():
            executor.submit(dynatrace_request, (url, outp, TOKEN, CERT))
 
    logging.info('Finished')