1、基於python原生組件的ping批量工具,不依賴系統ping.
2、By default, scanning one class-C (/24) segment takes about 3 seconds.
3、The built-in structure can be modified to reduce the required dependencies.
'''
Batch ping tool built on Python's native components; it does not depend on the system ping.

Usage:
    python3 Ping-thread.py -l 10.111.111.166/24             # scan 10.111.111.166/24
    python3 Ping-thread.py -f ip.txt                        # scan every IP listed in ip.txt
    python3 Ping-thread.py -l 10.111.111.166/24 -t 5        # scan using 5 threads
    python3 Ping-thread.py -f ip.txt -t 10 -o result.txt    # scan every IP in ip.txt using 10 threads and write the results to result.txt
'''
import os
import sys
import argparse
import socket
import struct
import select
import time
import IPy
# https://www.t00ls.net/thread-61292-1-1.html
# Thread-pool component
from concurrent.futures import ThreadPoolExecutor,as_completed
import time
class MyThreadPool():
    """Run one callable over every item of a list using a pool of worker threads."""

    def __init__(self, my_func, my_list, thread_num):
        """Store the work description.

        Args:
            my_func: Callable invoked once per item, with the item as its only argument.
            my_list: Iterable of items to process.
            thread_num: Maximum number of worker threads.
        """
        self.my_func = my_func
        self.my_list = my_list
        self.thread_num = thread_num

    def start(self):
        """Submit one task per item and block until every task has finished.

        A task that raises does not stop the remaining tasks; its exception is
        reported on stderr instead of being silently discarded.
        """
        with ThreadPoolExecutor(max_workers=self.thread_num) as executor:
            # Map each future back to its input so a failure can name the item.
            all_task = {
                executor.submit(self.my_func, item): item for item in self.my_list
            }
            for future in as_completed(all_task):
                error = future.exception()
                if error is not None:
                    print(
                        "Task for %s failed: %r" % (all_task[future], error),
                        file=sys.stderr,
                    )
# Ping component
ICMP_ECHO_REQUEST = 8  # ICMP type of an echo request
ICMP_ECHO_REPLY = 0  # ICMP type of an echo reply
DEFAULT_TIMEOUT = 0.1
DEFAULT_COUNT = 4


class Pinger(object):
    """Send ICMP echo requests to one host over a raw socket and time the replies."""

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        """Store the target and the ping parameters.

        Args:
            target_host: Host name or IPv4 address to ping.
            count: Number of pings sent by ``ping()``.
            timeout: Seconds to wait for each reply.
        """
        self.target_host = target_host
        self.count = count
        self.timeout = timeout
        # Resolved address of the most recent request; used to match replies.
        self._target_addr = None

    def do_checksum(self, source_string):
        """Compute the 16-bit checksum used in the ICMP header.

        Args:
            source_string: Bytes-like packet content (header with a zero
                checksum field followed by the payload).

        Returns:
            The complemented, folded 16-bit sum with its two bytes swapped.
        """
        total = 0
        # Integer division: the paired loop must stop before a trailing odd byte.
        max_count = (len(source_string) // 2) * 2
        count = 0
        while count < max_count:
            val = source_string[count + 1] * 256 + source_string[count]
            total = (total + val) & 0xffffffff
            count += 2
        if max_count < len(source_string):
            # Odd length: add the last byte on its own (indexing bytes yields an int).
            total = (total + source_string[len(source_string) - 1]) & 0xffffffff
        total = (total >> 16) + (total & 0xffff)
        total = total + (total >> 16)
        answer = ~total & 0xffff
        return answer >> 8 | (answer << 8 & 0xff00)

    def receive_pong(self, sock, ID, timeout):
        """Wait for the echo reply that answers our request.

        Packets that are not an echo reply, carry a different ID, or come from
        an address other than the one last pinged are ignored, so replies
        destined for other sockets sharing the same ID are not mistaken for ours.

        Args:
            sock: Raw ICMP socket the request was sent on.
            ID: Identifier placed in the request header.
            timeout: Total seconds to wait.

        Returns:
            Seconds between the timestamp embedded in the payload and receipt
            of the reply, or None if no matching reply arrived in time.
        """
        expected_addr = getattr(self, "_target_addr", None)
        timestamp_size = struct.calcsize("d")
        time_remaining = timeout
        while True:
            start_time = time.time()
            readable = select.select([sock], [], [], time_remaining)
            time_spent = time.time() - start_time
            if not readable[0]:  # Timeout
                return None
            time_received = time.time()
            recv_packet, addr = sock.recvfrom(1024)
            # The IPv4 header length is carried in the low nibble of the first byte.
            ip_header_len = (recv_packet[0] & 0x0F) * 4 if recv_packet else 20
            payload_start = ip_header_len + 8
            if len(recv_packet) >= payload_start + timestamp_size:
                icmp_type, _code, _checksum, packet_ID, _sequence = struct.unpack(
                    "bbHHh", recv_packet[ip_header_len:payload_start]
                )
                from_target = expected_addr is None or addr[0] == expected_addr
                if icmp_type == ICMP_ECHO_REPLY and packet_ID == ID and from_target:
                    time_sent = struct.unpack(
                        "d", recv_packet[payload_start:payload_start + timestamp_size]
                    )[0]
                    return time_received - time_sent
            time_remaining = time_remaining - time_spent
            if time_remaining <= 0:
                return None

    def send_ping(self, sock, ID):
        """Build one echo request and send it to the target host.

        The payload starts with the send time packed as a double, followed by
        "Q" padding up to 192 bytes.

        Args:
            sock: Raw ICMP socket to send on.
            ID: Identifier to place in the ICMP header.

        Raises:
            socket.gaierror: If the target host cannot be resolved.
        """
        target_addr = socket.gethostbyname(self.target_host)
        self._target_addr = target_addr
        # Dummy header with a zero checksum, needed to compute the real checksum.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, ID, 1)
        padding = (192 - struct.calcsize("d")) * b"Q"
        data = struct.pack("d", time.time()) + padding
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
            "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
        )
        sock.sendto(header + data, (target_addr, 1))

    def ping_once(self):
        """Send a single ping and wait for its reply.

        Returns:
            The delay in seconds, or None on timeout.

        Raises:
            OSError: If the raw socket cannot be created; when the cause is
                errno 1 the message explains that root privileges are needed.
            socket.gaierror: If the target host cannot be resolved.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            if e.errno == 1:
                # Not superuser, so operation not permitted.
                raise socket.error(
                    e.errno,
                    "%s - ICMP messages can only be sent from root user processes"
                    % e.strerror,
                ) from e
            raise
        my_ID = os.getpid() & 0xFFFF
        # The context manager closes the socket even if sending or receiving raises.
        with sock:
            self.send_ping(sock, my_ID)
            return self.receive_pong(sock, my_ID, self.timeout)

    def ping(self):
        """Ping the target ``count`` times, printing the outcome of each attempt.

        Stops early if the host name cannot be resolved.
        """
        for _ in range(self.count):
            print("Ping to %s..." % self.target_host)
            try:
                delay = self.ping_once()
            except socket.gaierror as e:
                reason = e.args[-1] if e.args else e
                print("Ping failed. (socket error: '%s')" % reason)
                break
            if delay is None:
                print("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                delay = delay * 1000
                print("Get pong in %0.4fms" % delay)
def ping(host):
    """Ping one host once and record it in the global ``alive`` list if it answers.

    Args:
        host: Target to ping; converted with ``str()`` before use.

    The outcome is printed. OS-level failures (for example an unresolvable
    name or a refused raw socket) are printed for that host instead of being
    raised into the worker thread.
    """
    host = str(host)
    pinger = Pinger(target_host=host)
    try:
        delay = pinger.ping_once()
    except OSError as exc:
        print("Ping %s Failed, %s" % (host, exc))
        return
    if delay is None:
        # Report the timeout that was actually used rather than a fixed figure.
        print("Ping %s Failed, timed out for %s seconds" % (host, pinger.timeout))
    else:
        print("Ping %s = %s ms" % (host, round(delay * 1000, 4)))
        alive.append(host)
def getFileType(file_path):
    """Guess whether a text file is GBK or UTF-8 encoded.

    The whole file is decoded as GBK; if that fails with a decode error the
    file is reported as UTF-8.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        "gbk" if the content decodes as GBK, otherwise "utf-8".
    """
    try:
        # The context manager closes the handle on the decode-error path too.
        with open(file_path, 'r', encoding="gbk") as handle:
            handle.read()
    except UnicodeDecodeError:
        return "utf-8"
    return "gbk"
def readFile2List(filename):
    """Read a text file into a list of its non-empty lines.

    The encoding is chosen by ``getFileType``. Each line is stripped of
    surrounding whitespace, and lines that are empty after stripping are
    dropped.

    Args:
        filename: Path of the file to read.

    Returns:
        The stripped, non-empty lines in file order.
    """
    encoding = getFileType(filename)
    with open(filename, 'r', encoding=encoding) as handle:
        stripped = (raw.strip() for raw in handle.readlines())
        return [text for text in stripped if text != '']
if __name__ == '__main__':
    # Command-line entry point: collect targets, ping them in a thread pool,
    # then write the hosts that answered to the output file.
    parser = argparse.ArgumentParser()
    parser.description = "Native MultiThreading Ping scan, does not dependent System Ping -- by NOVASEC"
    parser.add_argument("-l", "--ipList", help="Specify the target IP segment , eg:192.168.1.1,192.168.0.1/24", default=None)
    parser.add_argument("-f", "--ipFile", help="Specify the target IP file, one IP per line , eg: ipfile.txt", default=None)
    parser.add_argument("-t", "--thread", help="Specify Ping thread", type=int, default=5)
    parser.add_argument("-o", "--output", help="Specified result output file", default='result.txt')
    args = parser.parse_args()
    ipList = args.ipList
    ipFile = args.ipFile
    result = args.output
    thread = args.thread
    ###########################
    # Module-level list that ping() appends every responding host to.
    alive = []
    if ipList is not None or ipFile is not None:
        ip_list = []
        if ipList is not None:
            # The help text advertises comma-separated targets, so expand each
            # segment on its own; IPy.IP does not accept a comma-joined string.
            for segment in ipList.split(','):
                segment = segment.strip()
                if segment:
                    ip_list.extend(IPy.IP(segment, make_net=1))
        if ipFile is not None:
            # Targets from the file are added to, not substituted for, the -l targets.
            ip_list.extend(readFile2List(ipFile))
        ###########################
        last_time = time.time()
        thread_num = thread
        myThreadPool = MyThreadPool(ping, ip_list, thread_num)  # init
        myThreadPool.start()
        print('All Scan use time is {}'.format(time.time() - last_time))  # e.g. a threaded run took 3.0822317600250244
        ###########################
        print('Found alive host :', len(alive))
        if alive:
            print('Write result to file :', result)
            with open(result, "w+") as f_result:
                f_result.write('\n'.join(alive))
        else:
            print('Not Found alive host ')
        ###########################
    else:
        print('None Input target')
    ###########################
參考地址:https://mp.weixin.qq.com/s/Pi5eBkKc5fdMcadk1hRrgQ
What are the hidden holes in Python? On the rounding of round function in Python
After looking around, Gao Zan
Python uses pymysql to connect with MySQL to add, delete, change and query
List of articles One 、 instal