# Python and AlienVault
I recently needed to triage several hundred IPs extracted from a log that was not connected to any SIEM or CTI solution. I used simple scripting to compile the data into a plain list of IPs that Python could read. I looked for an existing tool for this, but most of what I found were outdated, sprawling codebases that made it hard to isolate the part I needed, so I took the opportunity to write a simple tool myself. The code is not pretty, but it gives me the feedback I need. I believe this snippet can be useful to others facing the same problem, and the best part is that it requires no prior coding experience to run and get results. Just add your AlienVault API key and the list of IPs where the comments in the code indicate, and run it like any other Python script.
This script checks each IP address provided against AlienVault OTX. Specifically, it looks in the malware section for any activity within the timeframe defined by the variable `lookback` (currently set to 30 days). Each IP address with activity in that window is printed and appended to the file `ips.txt`.
```python
from datetime import datetime, timedelta
import concurrent.futures
import json
import os
import threading

import requests

# Number of days to look back for malware activity
lookback = 30

filename = 'ips.txt'

# Create file if not existing
if not os.path.exists(filename):
    open(filename, 'w').close()

# Insert your AlienVault API key
API_KEY = ""

# Insert a list of IP addresses to check
ips = []

OTX_API_ENDPOINT = "https://otx.alienvault.com/api/v1/indicators/IPv4/{ip}/{section}"
HEADERS = {"X-OTX-API-KEY": API_KEY}


def check_date(date_str):
    # True if the date part of the timestamp is within the lookback window
    date = datetime.strptime(date_str.split('T')[0], '%Y-%m-%d')
    delta = datetime.now() - date
    return delta < timedelta(days=lookback)


triage_ips = []
# Worker threads share the output file and list, so guard writes with a lock
write_lock = threading.Lock()

with open(filename, 'r') as f:
    existing_ips = set(f.read().splitlines())

with open(filename, 'a') as f:
    with requests.Session() as session:
        # Send the API key with every request once one has been filled in
        if API_KEY:
            session.headers.update(HEADERS)

        def process_ip(ip):
            if ip in existing_ips:
                print(f"Skipping {ip} as it already exists in the file")
                return
            try:
                # Reputation data is fetched but not evaluated yet
                with session.get(OTX_API_ENDPOINT.format(ip=ip, section="reputation")) as reputation:
                    reputation.raise_for_status()
                    reputation_data = reputation.json()
                with session.get(OTX_API_ENDPOINT.format(ip=ip, section="malware")) as malware:
                    malware.raise_for_status()
                    malware_data = malware.json()
                date_values = [d['date'] for d in malware_data['data']]
                for date in date_values:
                    if check_date(date):
                        with write_lock:
                            triage_ips.append(ip)
                            print(ip)
                            f.write(f"{ip}\n")
                        break
            except requests.exceptions.HTTPError as e:
                print(f"Error: {e} for {ip}")
            except json.decoder.JSONDecodeError:
                print(f"Error: JSON decoding failed for {ip}")

        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.map(process_ip, ips)

print("Triage these IPs:", triage_ips)
```
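The `ips` list in the script has to be filled in by hand. If the addresses are still buried in a raw log, something like the following could build the list. This is only a sketch: the helper name `extract_ips` and the choice to skip private and loopback addresses are my assumptions, not part of the original script.

```python
import ipaddress
import re

# Loose pattern for dotted-quad candidates; ipaddress does the real validation
IPV4_CANDIDATE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")


def extract_ips(lines):
    """Return unique, valid, non-private IPv4 addresses in first-seen order.

    Hypothetical helper, not part of the original script.
    """
    seen = set()
    result = []
    for line in lines:
        for candidate in IPV4_CANDIDATE.findall(line):
            try:
                address = ipaddress.IPv4Address(candidate)
            except ValueError:
                continue  # not a real address, e.g. 999.1.1.1
            # Assumption: internal addresses are not worth looking up
            if address.is_private or address.is_loopback:
                continue
            text = str(address)
            if text not in seen:
                seen.add(text)
                result.append(text)
    return result
```

Any iterable of strings works as input, so an open file can be passed directly, for example `ips = extract_ips(open("access.log"))`, where `access.log` is a placeholder file name.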
Moving forward, it would be interesting to add a few triage levels based on different lookbacks (7, 14 and 30 days), each stored in a separate file, and possibly to avoid duplicates between them (where the most recent IP activity takes precedence).
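One way the triage levels could work is to put each IP only in the tightest window that contains its most recent malware date, which avoids duplicates between levels. The sketch below is not implemented in the script. The names `LEVELS` and `triage_level` are hypothetical, and it assumes the dates are strings that start with `YYYY-MM-DD`, as the script already expects.

```python
from datetime import datetime, timedelta

# Lookback windows in days (hypothetical constant)
LEVELS = (7, 14, 30)


def triage_level(date_strings, now=None, levels=LEVELS):
    """Return the smallest window (in days) holding the latest date, or None.

    Hypothetical helper: only the most recent activity decides the level,
    so an IP can never land in more than one level.
    """
    if not date_strings:
        return None
    now = now or datetime.now()
    latest = max(
        datetime.strptime(d.split('T')[0], '%Y-%m-%d') for d in date_strings
    )
    age = now - latest
    # Check the tightest window first so recent activity takes precedence
    for days in sorted(levels):
        if age < timedelta(days=days):
            return days
    return None
```

The returned number could then pick the output file, for example `f"ips_{days}d.txt"` (a made-up naming scheme), while `None` means the IP has no activity inside the widest window.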
If you have additions or suggestions, please reach out through issues or on social media.