temp
第一步:设置环境
-
安装Python库:
nmap
: 用于网络扫描。sqlite3
: 用于与SQLite数据库交互(Python标准库中已包含)。requests
和httpx
: 用于HTTP检测。sqlite3
: 用于与SQLite数据库交互(Python标准库中已包含)。requests
和httpx
: 用于HTTP检测。ssl
:用于获取TLS信息。- 使用
pip
安装库:python-nmap
,requests
,httpx
.
pip install python-nmap requests httpx
第二步:设置SQLite数据库
首先,我们需要一个简单的SQLite数据库,用于存储扫描结果。
import sqlite3
def init_db():
conn = sqlite3.connect('network_assets.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS assets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
host TEXT,
port INTEGER,
protocol TEXT,
service TEXT,
software TEXT,
version TEXT,
banner TEXT,
update_time TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS http_servers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asset_id INTEGER,
http_status INTEGER,
title TEXT,
update_time TEXT,
FOREIGN KEY(asset_id) REFERENCES assets(id)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS https_servers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asset_id INTEGER,
tls_version TEXT,
cipher TEXT,
server_name TEXT,
dns_name TEXT,
certificate_authority TEXT,
sha256_fingerprint TEXT,
sha256_openssl_fingerprint TEXT,
update_time TEXT,
FOREIGN KEY(asset_id) REFERENCES assets(id)
)
''')
conn.commit()
conn.close()
init_db()
第三步:扫描网络资产
- 使用Nmap扫描主机和端口:
import nmap
import datetime
def scan_network_assets(host):
nm = nmap.PortScanner()
nm.scan(host, '1-65535') # 扫描所有端口
results = []
for host in nm.all_hosts():
for proto in nm[host].all_protocols():
ports = nm[host][proto].keys()
for port in ports:
data = {
'host': host,
'port': port,
'protocol': proto,
'service': nm[host][proto][port]['name'],
'software': nm[host][proto][port].get('product', ''),
'version': nm[host][proto][port].get('version', ''),
'banner': nm[host][proto][port].get('extrainfo', ''),
'update_time': datetime.datetime.now().isoformat()
}
results.append(data)
return results
# Example usage:
# results = scan_network_assets('192.168.1.1')
# print(results)
第四步:检查HTTP状态
import requests
def check_http_server(host, port):
try:
response = requests.get(f'http://{host}:{port}', timeout=5)
status = response.status_code
title = response.headers.get('Server', '')
return {
'http_status': status,
'title': title
}
except requests.RequestException:
return None
# Example usage:
# http_info = check_http_server('192.168.1.1', 80)
# print(http_info)
第五步:获取HTTPS信息
import ssl
import socket
def get_https_info(host, port):
context = ssl.create_default_context()
conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=host)
try:
conn.connect((host, port))
cert = conn.getpeercert()
tls_info = {
'tls_version': conn.version(),
'cipher': conn.cipher(),
'server_name': host,
'dns_name': cert.get('subjectAltName', ''),
'certificate_authority': cert.get('issuer', ''),
'sha256_fingerprint': ssl.DER_cert_to_PEM_cert(conn.getpeercert(True)).hex(),
'sha256_openssl_fingerprint': conn.getpeercert(True).hex()
}
return tls_info
except Exception as e:
return None
finally:
conn.close()
# Example usage:
# https_info = get_https_info('example.com', 443)
# print(https_info)
第六步:整合所有数据并存储到数据库
在此步骤中,我们可以将所得数据整合,并存储到SQLite数据库。
def insert_into_db(asset_data, http_data=None, https_data=None):
conn = sqlite3.connect('network_assets.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO assets (host, port, protocol, service, software, version, banner, update_time)
VALUES (:host, :port, :protocol, :service, :software, :version, :banner, :update_time)
''', asset_data)
asset_id = cursor.lastrowid
if http_data:
cursor.execute('''
INSERT INTO http_servers (asset_id, http_status, title, update_time)
VALUES (?, ?, ?, ?)
''', (asset_id, http_data['http_status'], http_data['title'], asset_data['update_time']))
if https_data:
cursor.execute('''
INSERT INTO https_servers (asset_id, tls_version, cipher, server_name, dns_name, certificate_authority,
sha256_fingerprint, sha256_openssl_fingerprint, update_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (asset_id, https_data['tls_version'], https_data['cipher'][0], https_data['server_name'],
str(https_data['dns_name']), str(https_data['certificate_authority']),
https_data['sha256_fingerprint'], https_data['sha256_openssl_fingerprint'],
asset_data['update_time']))
conn.commit()
conn.close()
# Example usage:
# for asset in results:
# http_info = check_http_server(asset['host'], asset['port'])
# https_info = None
# if 'https' in asset['service']:
# https_info = get_https_info(asset['host'], asset['port'])
# insert_into_db(asset, http_info, https_info)
最后一步:运行整体系统
为了运行这个系统,你需要为特定的资产调用这些功能——扫描每个主机,检查其HTTP状态,获取HTTPS信息,并将其结果存储到数据库中。结合上面的所有代码段,你可以逐步执行扫描和存储操作。
这个系统简要展示如何使用Python和几个库去架构一个简单的网络资产扫描系统。需要实际使用时,可能还需要处理更多的异常和边界条件以确保系统的健壮性。