This is basically how I collect physical user logons along with reboots, etc. I dump this shit into a database and deliver via a web-page so folks can do dated searches by workstation or user ID. Works beautifully. You will need the Win32api libraries. I am running this on a Win 2019 server from Python3.7, which I am locked into as I am compiled against WSGI. This focuses on Windows 10 workstations since we moved to the latest and greatest so it will need adjusted for older versions. That will change the game a bit so look up 528/538 if that is the case. Read through the code for ideas on how best to use this in your world.
import win32evtlog
import win32evtlogutil
import win32security
import win32net
import win32con
import winerror
import datetime
'''
User logon collections running on Windows 2019 server collecting records on
Windows 10 workstations.
'''
def logon_types(evt_type, evt_id):
'''
This has been fairly solid and I have validated against my
workstation usage. If your org allows screen savers add in
4801 activated and 4802 deactivated.
'''
logon_type = None
if evt_id == 4624:
# Successful user log on event. I skip evt_type 7 as it is collected in 4800/4801.
if evt_type in [2, 11]:
value_type='user_logon' # Interactive/cached logon.
# I skip 4635 as it adds too many unneeded entries.
if evt_id == 4647:
# Successful user log off event.
logon_type='user_logoff' # Interactive log off.
if evt_id == 4800:
logon_type='wkst_lock' # workstation lock invoked.
if evt_id == 4801:
logon_type='wkst_unlock' # workstation unlock invoked.
return value_type
def get_logon_events(server, run_delta):
'''Get user logons from event log.'''
format = '%a %b %d %H:%M:%S %Y'
print('Received %s, %s' % (server,run_delta))
rows = []
try:
# ==========================================
# START ON SECURITY LOG...
# ==========================================
log='Security'
#id_filter = [528,534,4611,4624,4634,4647,4672,4800,4801]
id_filter = [4800,4801,4624,4634,4647]
flags = win32evtlog.EVENTLOG_BACKWARDS_READ|win32evtlog.EVENTLOG_SEQUENTIAL_READ
#This dict converts the event type into a human readable form
evt_dict={win32con.EVENTLOG_AUDIT_FAILURE:'EVENTLOG_AUDIT_FAILURE',\
win32con.EVENTLOG_AUDIT_SUCCESS:'EVENTLOG_AUDIT_SUCCESS',\
win32con.EVENTLOG_INFORMATION_TYPE:'EVENTLOG_INFORMATION_TYPE',\
win32con.EVENTLOG_WARNING_TYPE:'EVENTLOG_WARNING_TYPE',\
win32con.EVENTLOG_ERROR_TYPE:'EVENTLOG_ERROR_TYPE'}
#open event log
hand=win32evtlog.OpenEventLog(server,log)
events=1
_break = False
while events:
if _break: break
events=win32evtlog.ReadEventLog(hand,flags,0)
for ev_obj in events:
try:
event_time = ev_obj.TimeGenerated.Format()
event_dt = datetime.datetime.strptime(event_time, format)
if event_dt < run_delta:
_break = True
break
evt_id = str(winerror.HRESULT_CODE(ev_obj.EventID)).strip().lower()
evt_id = int(evt_id)
if evt_id not in id_filter: continue
# Get the data elements.
cat = str(ev_obj.EventCategory)
src = str(ev_obj.SourceName)
record = str(ev_obj.RecordNumber)
evt_type = str(evt_dict[ev_obj.EventType]).strip().lower()
if 'EVENTLOG_AUDIT_SUCCESS' not in evt_type.upper():
continue
msg = win32evtlogutil.SafeFormatMessage(ev_obj)
if not msg: continue
ascii_msg = msg.encode('ascii','ignore').decode().strip().lower()
# Do some filtering if you're picking up noise, like service accounts, test, whatever.
user_id = '-'
msg_parts = ascii_msg.split(',')
action = 0
ltype = 0
if evt_id == 4624:
# LOG ON
domain_name = msg_parts[6].strip().lower()
user_id = msg_parts[5].strip().lower()
ltype = int(msg_parts[8])
if 'winlogon.exe' not in msg_parts[17].lower() and 'svchost.exe' not in msg_parts[17].lower(): continue
elif evt_id == 4634:
domain_name = msg_parts[-3].strip().lower()
user_id = msg_parts[-4].strip().lower()
ltype = int(msg_parts[-1].replace("'.>",''))
elif evt_id == 4647:
domain_name = msg_parts[-2].strip().lower()
user_id = msg_parts[-3].strip().lower()
elif evt_id in [4800,4801]:
domain_name = msg_parts[2].strip().lower()
user_id = msg_parts[1].strip().lower()
ltype = int(msg_parts[4].replace("'.>",''))
else:
continue
logon_key = logon_types(ltype, evt_id)
if not logon_key: continue
# Do some filtering if you're picking up noise, like service accounts, test, whatever.
if 'MYDOMAIN' not in domain_name.lower():
domain_name = 'MYDOMAIN' # CHANGE TO YOUR DOMAIN OR REMOVE THIS!
if not user_id: continue
if '$' in user_id: continue
if 'SYSTEM' in user_id.upper(): continue
if 'ANONYMOUS LOGON' in user_id.upper(): continue
if '-' in user_id.upper(): continue
if '%%' in user_id.upper(): continue
if 'TEST' in user_id.upper(): continue
if 'IMAUSER' in user_id.upper(): continue
if 'SERVICE_' in user_id.upper(): continue # Filter service accounts however yours are defined.
#print(ascii_msg) # Raw message.
#print('****', event_dt, evt_id, domain_name, user_id, ltype, logon_key) # Parts.
# This data gets inserted into a database and needs to be in this format for me.
row = {'date':event_dt,'wkst':server,'evt_id':evt_id,'user_id':user_id,'domain':domain_name,'desc':logon_key}
if row not in rows: rows.append(row)
except:
pass # Junk, ignore.
win32evtlog.CloseEventLog(hand) # Clean up...
del(hand)
except:
pass # Can't access, ignore.
print('%s processed ok, record count %s' % (server,len(rows)))
return rows
def get_shutdown_events(server, run_delta):
'''Get user logons from event log.'''
format = '%a %b %d %H:%M:%S %Y'
print('Received %s, %s' % (server,run_delta))
rows = []
try:
# ==========================================
# START ON SYSTEM LOG...
# ==========================================
log='System'
id_filter = [6005,6006,6008,1074]
flags = win32evtlog.EVENTLOG_BACKWARDS_READ|win32evtlog.EVENTLOG_SEQUENTIAL_READ
#This dict converts the event type into a human readable form
evt_dict={win32con.EVENTLOG_AUDIT_FAILURE:'EVENTLOG_AUDIT_FAILURE',\
win32con.EVENTLOG_AUDIT_SUCCESS:'EVENTLOG_AUDIT_SUCCESS',\
win32con.EVENTLOG_INFORMATION_TYPE:'EVENTLOG_INFORMATION_TYPE',\
win32con.EVENTLOG_WARNING_TYPE:'EVENTLOG_WARNING_TYPE',\
win32con.EVENTLOG_ERROR_TYPE:'EVENTLOG_ERROR_TYPE'}
#open event log
hand=win32evtlog.OpenEventLog(server,log)
events=1
_break = False
while events:
if _break: break
events=win32evtlog.ReadEventLog(hand,flags,0)
for ev_obj in events:
try:
reason_key = 'power_off'
event_time = ev_obj.TimeGenerated.Format()
event_dt = datetime.datetime.strptime(event_time, format)
if event_dt < run_delta:
_break = True
break
# Get the data elements.
cat = str(ev_obj.EventCategory)
src = str(ev_obj.SourceName)
record = str(ev_obj.RecordNumber)
evt_id = int(winerror.HRESULT_CODE(ev_obj.EventID))
if evt_id not in id_filter: continue
msg = win32evtlogutil.SafeFormatMessage(ev_obj)
if not msg: continue
ascii_msg = msg.encode('ascii','ignore').decode().strip().lower()
#Out.debug(ascii_msg)
if evt_id == 6006:
reason_key = 'power_off'
elif evt_id == 6008:
reason_key = 'power_off'
if evt_id == 6005:
reason_key = 'power_on'
if evt_id == 1074:
if 'explorer.exe' not in ascii_msg: continue
reason_key = 'restart'
ascii_msg = ascii_msg.replace(".>",'').replace("'",'')
msg_parts = ascii_msg.split(',')
user_id = '-'
if '\\' in msg_parts[-1]:
domain_name = msg_parts[-1].split('\\')[0].strip()
user_id = msg_parts[-1].split('\\')[-1]
else:
user_id = msg_parts[-1]
if 'SOME_DOMAIN' not in domain_name.lower():
domain_name = '-'
user_id = user_id.strip()
if user_id.startswith('{'): continue
if user_id.startswith('svc'): continue
if '-' in user_id: continue
if 'system' in user_id.lower(): user_id = '-'
row = {'date':event_dt,'wkst':server,'evt_id':evt_id,'user_id':user_id,'domain':domain_name,'desc':reason_key}
if row not in rows: rows.append(row)
except:
pass # Junk, ignore.
win32evtlog.CloseEventLog(hand) # Clean up...
del(hand)
except:
pass # Not accessable, ignore.
print('%s processed ok, record count %s' % (server,len(rows)))
return rows
def save_rows(records):
if not records:
print('No records received.')
return False
print('Save records here')
return True
if __name__=='__main__':
try:
ALL_RECORDS = []
period_in_hours = 24 # I collect 24 hours of log data each run.
'''
This is a working example only so you might not want to put into production
until you have done some cleanup and changes/error handling. This is not the code I run in production,
just bits and pieces to provide an example. The big thing to pay attention to are the event IDs,
the event type, and the splits to get the details you need. This only focuses on a physical standard
user.
In my production code I load up a collection of successfully pinged user workstations from AD that
I store daily. I divide the full list into three separate lists and store on disk. Each night I run
three separate logon collection scripts reading in its respective list of computers. about 670 per script.
Each script runs threaded, about 40 threads, collecting 24 hours of records for each machine. I get
all 2000 machines in less than 4 hours this way and only hit each machine once per night. We used to have a
distributed method that wold get all 2000 in less than an hour, but I am scaling back as our current staff
do not want to support in-house systems code/environments.
If you have a lot of machines threaded will speed things up.
I offset the tasks by two hours, starting the first at 6 pm. If your machines are not
returning all the records you expect to see it is likely the logs are filling up and rotating
what you want before you get to it. You may want to change tasks and collections to 8 hours
if that is the case.
'''
works = ['computer1', 'computer2', 'etc...']
print('Total wkst count: %s' % len(works))
# Here is an example on how to devide your master list.
works.sort()
list_count = 3
limit = round(len(works) / list_count)
work_lists = [works[n:n+limit] for n in range(0, len(works), limit)]
works_group = work_lists[0] # Get the first list to process.
print('Processing count: %s' % len(works_group))
# Set the delta for log collection.
run_time_dt = datetime.datetime.now()
run_delta = run_time_dt - datetime.timedelta(hours=period_in_hours)
# Loop the workstations and do the collections. I merge this data...
for wrk in works_group:
# Do logons.
recs = get_logon_events(wrk, run_delta)
ALL_RECORDS.extend(recs)
# Do shutdowns.
recs = get_shutdown_events(wrk, run_delta)
ALL_RECORDS.extend(recs)
# Save however you please or generate a report.
if ALL_RECORDS:
print('Saving %s records' % len(ALL_RECORDS))
saved = save_rows(ALL_RECORDS)
if saved:
print('All records saved successfully.')
else:
print('Could not save records.')
else:
print('No records to save.')
print('Finished')
except:
print('Failed in main!')