Collect user logons from Windows 10 with Python 3.

This is basically how I collect physical user logons along with reboots, etc. I dump this shit into a database and deliver via a web-page so folks can do dated searches by workstation or user ID. Works beautifully. You will need the Win32api libraries. I am running this on a Win 2019 server from Python3.7, which I am locked into as I am compiled against WSGI. This focuses on Windows 10 workstations since we moved to the latest and greatest so it will need adjusted for older versions. That will change the game a bit so look up 528/538 if that is the case. Read through the code for ideas on how best to use this in your world.

import win32evtlog
import win32evtlogutil
import win32security
import win32net
import win32con
import winerror
import datetime

'''
User logon collections running on Windows 2019 server collecting records on
Windows 10 workstations.
'''
 
def logon_types(evt_type, evt_id):
    '''
    This has been fairly solid and I have validated against my
    workstation usage. If your org allows screen savers add in
    4801 activated and 4802 deactivated.
    '''
    logon_type = None
       
    if evt_id == 4624:
       
        # Successful user log on event. I skip evt_type 7 as it is collected in 4800/4801.
        if evt_type in [2, 11]:
            value_type='user_logon' # Interactive/cached logon.
           
    # I skip 4635 as it adds too many unneeded entries.        
    if evt_id == 4647:
       
        # Successful user log off event.
        logon_type='user_logoff' # Interactive log off.
   
    if evt_id == 4800:
 
        logon_type='wkst_lock' # workstation lock invoked.
 
    if evt_id == 4801:
 
        logon_type='wkst_unlock' # workstation unlock invoked.
       
    return value_type
 
def get_logon_events(server, run_delta):
    '''Get user logons from event log.'''
   
    format = '%a %b %d %H:%M:%S %Y'
    print('Received %s, %s' % (server,run_delta))
   
    rows = []
    try:
 
        # ==========================================
        # START ON SECURITY LOG...
        # ==========================================
       
        log='Security'
       
        #id_filter = [528,534,4611,4624,4634,4647,4672,4800,4801]
        id_filter = [4800,4801,4624,4634,4647]
 
        flags = win32evtlog.EVENTLOG_BACKWARDS_READ|win32evtlog.EVENTLOG_SEQUENTIAL_READ
 
        #This dict converts the event type into a human readable form
        evt_dict={win32con.EVENTLOG_AUDIT_FAILURE:'EVENTLOG_AUDIT_FAILURE',\
                win32con.EVENTLOG_AUDIT_SUCCESS:'EVENTLOG_AUDIT_SUCCESS',\
                win32con.EVENTLOG_INFORMATION_TYPE:'EVENTLOG_INFORMATION_TYPE',\
                win32con.EVENTLOG_WARNING_TYPE:'EVENTLOG_WARNING_TYPE',\
                win32con.EVENTLOG_ERROR_TYPE:'EVENTLOG_ERROR_TYPE'}
 
        #open event log
        hand=win32evtlog.OpenEventLog(server,log)
        events=1
       
        _break = False
       
        while events:
 
            if _break: break
 
            events=win32evtlog.ReadEventLog(hand,flags,0)
           
            for ev_obj in events:
                try: 
 
                    event_time = ev_obj.TimeGenerated.Format()
                    event_dt = datetime.datetime.strptime(event_time, format)
                    if event_dt < run_delta:
                        _break = True
                        break
 
                    evt_id = str(winerror.HRESULT_CODE(ev_obj.EventID)).strip().lower()
                    evt_id = int(evt_id)
                    if evt_id not in id_filter: continue
                   
                    # Get the data elements.
                    cat = str(ev_obj.EventCategory)
                    src = str(ev_obj.SourceName)
                    record = str(ev_obj.RecordNumber)
 
                    evt_type = str(evt_dict[ev_obj.EventType]).strip().lower()
 
                    if 'EVENTLOG_AUDIT_SUCCESS' not in evt_type.upper():
                        continue
                   
                    msg = win32evtlogutil.SafeFormatMessage(ev_obj)
                    if not msg: continue
                    ascii_msg = msg.encode('ascii','ignore').decode().strip().lower()
 
                    # Do some filtering if you're picking up noise, like service accounts, test, whatever.
                    user_id = '-'
                    msg_parts = ascii_msg.split(',')
                    action = 0
                    ltype = 0
                    if evt_id == 4624:
                        # LOG ON
                        domain_name = msg_parts[6].strip().lower()
                        user_id = msg_parts[5].strip().lower()
                        ltype = int(msg_parts[8])
                        if 'winlogon.exe' not in msg_parts[17].lower() and 'svchost.exe' not in msg_parts[17].lower(): continue
                           
                    elif evt_id == 4634:
 
                        domain_name = msg_parts[-3].strip().lower()
                        user_id = msg_parts[-4].strip().lower()
                   
                        ltype = int(msg_parts[-1].replace("'.>",''))
                           
                    elif evt_id == 4647:
                       
                        domain_name = msg_parts[-2].strip().lower()
                        user_id = msg_parts[-3].strip().lower()
                       
                    elif evt_id in [4800,4801]:
                        domain_name = msg_parts[2].strip().lower()
                        user_id = msg_parts[1].strip().lower()
                        ltype = int(msg_parts[4].replace("'.>",''))
                    else:
                        continue
 
                    logon_key = logon_types(ltype, evt_id)
                    if not logon_key: continue
 
                    # Do some filtering if you're picking up noise, like service accounts, test, whatever.
                    if 'MYDOMAIN' not in domain_name.lower():
                        domain_name = 'MYDOMAIN' # CHANGE TO YOUR DOMAIN OR REMOVE THIS!
 
                    if not user_id: continue
                    if '$' in user_id: continue
                    if 'SYSTEM' in user_id.upper(): continue
                    if 'ANONYMOUS LOGON' in user_id.upper(): continue
                    if '-' in user_id.upper(): continue
                    if '%%' in user_id.upper(): continue
                    if 'TEST' in user_id.upper(): continue
                    if 'IMAUSER' in user_id.upper(): continue
                    if 'SERVICE_' in user_id.upper(): continue # Filter service accounts however yours are defined.
                    
                    #print(ascii_msg) # Raw message.
                    #print('****', event_dt, evt_id, domain_name, user_id, ltype, logon_key) # Parts.
 
                    # This data gets inserted into a database and needs to be in this format for me.
                    row = {'date':event_dt,'wkst':server,'evt_id':evt_id,'user_id':user_id,'domain':domain_name,'desc':logon_key}
                    if row not in rows: rows.append(row)
                   
                except:
                    pass # Junk, ignore.
      
        win32evtlog.CloseEventLog(hand) # Clean up...
        del(hand)
    except:
        pass # Can't access, ignore.
   
    print('%s processed ok, record count %s' % (server,len(rows)))
    return rows
 
def get_shutdown_events(server, run_delta):
    '''Get user logons from event log.'''
   
    format = '%a %b %d %H:%M:%S %Y'
    print('Received %s, %s' % (server,run_delta))
   
    rows = []
    try:
        # ==========================================
        # START ON SYSTEM LOG...
        # ==========================================
        log='System'
        id_filter = [6005,6006,6008,1074]
 
        flags = win32evtlog.EVENTLOG_BACKWARDS_READ|win32evtlog.EVENTLOG_SEQUENTIAL_READ
 
        #This dict converts the event type into a human readable form
        evt_dict={win32con.EVENTLOG_AUDIT_FAILURE:'EVENTLOG_AUDIT_FAILURE',\
                win32con.EVENTLOG_AUDIT_SUCCESS:'EVENTLOG_AUDIT_SUCCESS',\
                win32con.EVENTLOG_INFORMATION_TYPE:'EVENTLOG_INFORMATION_TYPE',\
                win32con.EVENTLOG_WARNING_TYPE:'EVENTLOG_WARNING_TYPE',\
                win32con.EVENTLOG_ERROR_TYPE:'EVENTLOG_ERROR_TYPE'}
 
        #open event log
        hand=win32evtlog.OpenEventLog(server,log)
        events=1
 
        _break = False
       
        while events:
 
            if _break: break
           
            events=win32evtlog.ReadEventLog(hand,flags,0)
           
            for ev_obj in events:
                try: 
                    reason_key = 'power_off'
 
                    event_time = ev_obj.TimeGenerated.Format()
                    event_dt = datetime.datetime.strptime(event_time, format)
                    if event_dt < run_delta:
                        _break = True
                        break
                                       
                    # Get the data elements.
                    cat = str(ev_obj.EventCategory)
                    src = str(ev_obj.SourceName)
                    record = str(ev_obj.RecordNumber)
                    evt_id = int(winerror.HRESULT_CODE(ev_obj.EventID))

                    if evt_id not in id_filter: continue               
                    
                    msg = win32evtlogutil.SafeFormatMessage(ev_obj)
                    if not msg: continue
                    
                    ascii_msg = msg.encode('ascii','ignore').decode().strip().lower()
                    #Out.debug(ascii_msg)
                   
                    if evt_id == 6006:
                        reason_key = 'power_off'
                    elif evt_id == 6008:
                        reason_key = 'power_off'
                    if evt_id == 6005:
                        reason_key = 'power_on'
                    if evt_id == 1074:
                        if 'explorer.exe' not in ascii_msg: continue
                        reason_key = 'restart'
                   
                    ascii_msg = ascii_msg.replace(".>",'').replace("'",'')
                    msg_parts = ascii_msg.split(',')
 
                    user_id = '-'
                    if '\\' in msg_parts[-1]:
                        domain_name = msg_parts[-1].split('\\')[0].strip()
                        user_id = msg_parts[-1].split('\\')[-1]
                    else:
                        user_id = msg_parts[-1]
                       
                    if 'SOME_DOMAIN' not in domain_name.lower():
                        domain_name = '-'
                       
                    user_id = user_id.strip()
                    if user_id.startswith('{'): continue
                    if user_id.startswith('svc'): continue
                    if '-' in user_id: continue
               
                    if 'system' in user_id.lower(): user_id = '-'
 
                    row = {'date':event_dt,'wkst':server,'evt_id':evt_id,'user_id':user_id,'domain':domain_name,'desc':reason_key}
                    if row not in rows: rows.append(row)
                   
                except:
                    pass # Junk, ignore.
               
        win32evtlog.CloseEventLog(hand) # Clean up...
        del(hand)
    except:
        pass # Not accessable, ignore.
   
    print('%s processed ok, record count %s' % (server,len(rows)))
    return rows
 
def save_rows(records):
    if not records:
        print('No records received.')
        return False
    print('Save records here')
    return True
 
if __name__=='__main__':
    try:               
        ALL_RECORDS = []
        period_in_hours = 24 # I collect 24 hours of log data each run.
           
        '''
        This is a working example only so you might not want to put into production
        until you have done some cleanup and changes/error handling. This is not the code I run in production,
        just bits and pieces to provide an example. The big thing to pay attention to are the event IDs,
        the event type, and the splits to get the details you need. This only focuses on a physical standard
        user.
       
        In my production code I load up a collection of successfully pinged user workstations from AD that
        I store daily. I divide the full list into three separate lists and store on disk. Each night I run
        three separate logon collection scripts reading in its respective list of computers. about 670 per script.
        Each script runs threaded, about 40 threads, collecting 24 hours of records for each machine. I get
        all 2000 machines in less than 4 hours this way and only hit each machine once per night. We used to have a
        distributed method that wold get all 2000 in less than an hour, but I am scaling back as our current staff
        do not want to support in-house systems code/environments.
        
        If you have a lot of machines threaded will speed things up.
        I offset the tasks by two hours, starting the first at 6 pm. If your machines are not
        returning all the records you expect to see it is likely the logs are filling up and rotating
        what you want before you get to it. You may want to change tasks and collections to 8 hours
        if that is the case.
        '''
        works = ['computer1', 'computer2', 'etc...']
        print('Total wkst count: %s' % len(works))
 
        # Here is an example on how to devide your master list.
        works.sort()

        list_count = 3
        limit = round(len(works) / list_count)

        work_lists = [works[n:n+limit] for n in range(0, len(works), limit)]
 
        works_group = work_lists[0] # Get the first list to process.
       
        print('Processing count: %s' % len(works_group))
 
        # Set the delta for log collection.
        run_time_dt = datetime.datetime.now()
        run_delta = run_time_dt - datetime.timedelta(hours=period_in_hours)
 
        # Loop the workstations and do the collections. I merge this data...
        for wrk in works_group:
            # Do logons.
            recs = get_logon_events(wrk, run_delta)
            ALL_RECORDS.extend(recs)
 
            # Do shutdowns.
            recs = get_shutdown_events(wrk, run_delta)
            ALL_RECORDS.extend(recs)
 
        # Save however you please or generate a report.
        if ALL_RECORDS:
            print('Saving %s records' % len(ALL_RECORDS))
            saved = save_rows(ALL_RECORDS)
            if saved:
                print('All records saved successfully.')
            else:
                print('Could not save records.')
        else:
            print('No records to save.')
           
        print('Finished')
    except:
        print('Failed in main!')