Today was a boring day so I was doing anything to make it go faster. I ended up redoing some Python code to get Dynatrace data. It didn’t need to be threaded but I was bored! It’s cold, windy, and dark here. Blah! Maybe someone will find this useful?
#!/usr/bin/python3
from concurrent.futures import ThreadPoolExecutor
import requests # https://pypi.org/project/requests/
import datetime
import os
import json
import logging
import traceback
def init_logging(log_path):
log_format = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(filename=log_path, level=logging.INFO, format=log_format)
def dynatrace_request(args):
'''Request/Reponse'''
uri,output,token,cert = args
result = None
headers = {'accept':'application/json','Authorization':'Api-Token %s' % token}
try:
response = requests.get(uri, headers=headers, verify=cert)
logging.info('Request status: %s' % response.status_code)
result = json.dumps(response.json())
with open(output,'w') as output:
output.write(result)
except:
err_msg = traceback.format_exc()
logging.error(err_msg)
return result
if __name__ in '__main__':
DAYS = 30 # Days to current date.
HOST = 'https://SOME_SERVER/e/SOME-ID'
TOKEN = 'YOUR TOKEN'
CERT = False # Path to cert. Will work with false but undsafe.
LOG = r'c:/temp/dyna.log'
# Increment timestamp.
const_day = 86400
days_ts = DAYS * const_day
adj_dt = int(datetime.datetime.now().timestamp())
today_dt = int(datetime.datetime.now().timestamp())
start_dt = adj_dt - days_ts
# Convert to string.
start_dt = '%s000' % start_dt
today_dt = '%s000' % today_dt
# Make sure we have a directory to put our data.
base_path = os.path.split(LOG)[0]
if not os.exists(base_path):
os.mkdir(base_path)
# Dictionary of tasks with output paths since we are dumping to json to be picked up by an Ajax routine on some dashboard.
tasks = {
"c:/temp/counts.json":r"{0}/api/v1/problem/status".format(HOST),
"c:/temp/apdex.json":r"{0}/api/v1/userSessionQueryLanguage/table?query=select%20apdexCategory%2C%20application%2C%20count(*)%20FROM%20useraction%20where%20usersession.userType%20%3D%20'REAL_USER'%20GROUP%20BY%20apdexCategory%2Capplication&startTimestamp={1}&endTimestamp={2}&explain=false".format(HOST,start_dt,today_dt)
}
# init logging.
init_logging(LOG)
logging.info('Started')
# Get json.
work_count = len(tasks)
with ThreadPoolExecutor(max_workers=work_count) as executor:
for outp,url in tasks.items():
executor.submit(dynatrace_request, (url, outp, TOKEN, CERT))
logging.info('Finished')