0%

angr deflat源码分析

  • angr ollvm对抗脚本分析


前言

Github

https://github.com/cq674350529/deflat

Use

python deflat.py -f samples/bin/check_passwd_x8664_flat --addr 0x400530

Note

去除控制流平坦化混淆(解混淆)的思路是:

  1. 区分出真实块和虚假块

  2. 把虚假块nop掉,再把真实块的连接关系梳理出来

  3. 真实块的逻辑关系主要是顺序和分支

接下来分析源码

deflat.py

def main

参数解析,CFG转换

  • 拿到解混淆的文件和起始分析地址【按给定地址找不到函数时,会改用"基地址 + 给定地址"再查找一次】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Command-line handling: both the binary path and the function address are needed.
parser = argparse.ArgumentParser(description="deflat control flow script")
parser.add_argument("-f", "--file", help="binary to analyze")
parser.add_argument("--addr", help="address of target function in hex format")
args = parser.parse_args()

missing_argument = args.file is None or args.addr is None
if missing_argument:
    # Nothing to work on: show the usage text and leave with status 0.
    parser.print_help()
    sys.exit(0)

filename = args.file
start = int(args.addr, 16)  # the address is given as a hex string

# Load only the target binary; shared libraries are not pulled in.
project = angr.Project(filename, load_options={'auto_load_libs': False})
# Normalize the CFG to avoid overlapping blocks, and keep force_complete_scan
# off to avoid possibly "wrong" blocks.
cfg = project.analyses.CFGFast(normalize=True, force_complete_scan=False)
# Mapped base of the main object with its low 12 bits cleared.
base_addr = project.loader.main_object.mapped_base & ~0xFFF

# Look the function up by the address as given; if that misses, retry with
# the address taken as an offset from base_addr.
target_function = cfg.functions.get(start)
if target_function is None:
    target_function = cfg.kb.functions.get_by_addr(base_addr + start)

# Convert the function's transition graph into a supergraph, which is
# similar to the CFG shown by IDA Pro.
supergraph = am_graph.to_supergraph(target_function.transition_graph)

代码块分类

  • 找到序言、主分发器、return块

  • 然后把相关块和nop块区分出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Classify the special blocks of the flattened function.
# The prologue is a node without predecessors; the return block is a node
# without successors and without outgoing branches. When several nodes match,
# the last one visited wins.
prologue_node = None
retn_node = None
for node in supergraph.nodes():
    if supergraph.in_degree(node) == 0:
        prologue_node = node
    if supergraph.out_degree(node) == 0 and len(node.out_branches) == 0:
        retn_node = node

if prologue_node is None or prologue_node.addr not in [start, base_addr + start]:
    # The prologue must exist and must sit at the requested function address.
    print("Something must be wrong...")
    sys.exit(-1)

if retn_node is None:
    # Without this guard the code below would fail with a NameError.
    print("Cannot find the retn block...")
    sys.exit(-1)

# The main dispatcher is taken to be the first successor of the prologue.
prologue_successors = list(supergraph.successors(prologue_node))
if not prologue_successors:
    print("Cannot find the main dispatcher block...")
    sys.exit(-1)
main_dispatcher_node = prologue_successors[0]

# The pre-dispatcher is the first predecessor of the main dispatcher other
# than the prologue itself.
pre_dispatcher_node = None
for node in supergraph.predecessors(main_dispatcher_node):
    if node.addr != prologue_node.addr:
        pre_dispatcher_node = node
        break

if pre_dispatcher_node is None:
    print("Cannot find the pre-dispatcher block...")
    sys.exit(-1)

# Split the remaining nodes into relevant (real) blocks and blocks to NOP out.
relevant_nodes, nop_nodes = get_relevant_nop_nodes(
    supergraph, pre_dispatcher_node, prologue_node, retn_node)

print('*******************relevant blocks************************')
print('prologue: %#x' % prologue_node.addr)
print('main_dispatcher: %#x' % main_dispatcher_node.addr)
print('pre_dispatcher: %#x' % pre_dispatcher_node.addr)
print('retn: %#x' % retn_node.addr)
relevant_block_addrs = [node.addr for node in relevant_nodes]
print('relevant_blocks:', [hex(addr) for addr in relevant_block_addrs])

print('*******************symbolic execution*********************')

梳理真实块逻辑

遍历代码块 --> 遍历块内所有的指令

通过符号执行得到真实要跳转执行的地址(代码块)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# `relevants` aliases `relevant_nodes`, so the appends below extend that list.
relevants = relevant_nodes
relevants.append(prologue_node)
# Snapshot taken before the return block is added: these blocks get executed.
relevants_without_retn = relevants[:]
relevants.append(retn_node)
# Addresses of every relevant block, now including prologue and return block.
relevant_block_addrs += [prologue_node.addr, retn_node.addr]

flow = defaultdict(list)  # relevant block -> successor addresses found below
patch_instrs = {}         # relevant block -> its first conditional-select instruction
arch_name = project.arch.name
for relevant in relevants_without_retn:
    print('-------------------dse %#x---------------------' % relevant.addr)
    block = project.factory.block(relevant.addr, size=relevant.size)
    has_branches = False
    hook_addrs = set()  # addresses of call-like instructions to be hooked

    for ins in block.capstone.insns:
        mnemonic = ins.insn.mnemonic
        # Per architecture: which mnemonics mark a conditional select and
        # which mark a call.
        if arch_name in ARCH_X86:
            is_cond_select = mnemonic.startswith('cmov')
            is_call = mnemonic.startswith('call')
        elif arch_name in ARCH_ARM:
            # Conditional moves only: 'mov' plus a suffix, never plain 'mov'.
            is_cond_select = mnemonic != 'mov' and mnemonic.startswith('mov')
            is_call = mnemonic in {'bl', 'blx'}
        elif arch_name in ARCH_ARM64:
            is_cond_select = mnemonic.startswith('cset')
            is_call = mnemonic in {'bl', 'blr'}
        else:
            continue

        if is_cond_select:
            # Only the first conditional select of a block is kept for patching.
            patch_instrs.setdefault(relevant, ins)
            has_branches = True
        elif is_call:
            hook_addrs.add(ins.insn.address)

    # Symbolically execute the block to learn which relevant block follows it.
    if has_branches:
        # Run once with the condition forced to 1 and once forced to 0.
        for forced_bit in (1, 0):
            tmp_addr = symbolic_execution(project, relevant_block_addrs,
                                          relevant.addr, hook_addrs,
                                          claripy.BVV(forced_bit, 1), True)
            if tmp_addr is not None:
                flow[relevant].append(tmp_addr)
    else:
        tmp_addr = symbolic_execution(project, relevant_block_addrs,
                                      relevant.addr, hook_addrs)
        if tmp_addr is not None:
            flow[relevant].append(tmp_addr)

patch字节码

  1. nop无关块

  2. 改真实跳转

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
print('************************flow******************************')
for node, targets in flow.items():
    # One line per relevant block: its address and the successor addresses found.
    print('%#x: ' % node.addr, [hex(child) for child in targets])

print('%#x: ' % retn_node.addr, [])

print('************************patch*****************************')
with open(filename, 'rb') as origin:
    # Work on a mutable in-memory copy of the original binary.
    origin_data = bytearray(origin.read())
    origin_data_len = len(origin_data)

recovery_file = filename + '_recovered'
recovery = open(recovery_file, 'wb')  # written and closed after patching

main_object = project.loader.main_object
arch_name = project.arch.name
big_endian = project.arch.memory_endness == "Iend_BE"

# Overwrite every irrelevant block with NOPs.
for nop_node in nop_nodes:
    fill_nop(origin_data, main_object.addr_to_offset(nop_node.addr),
             nop_node.size, project.arch)

# Reconnect the relevant blocks according to the recovered flow.
for parent, childs in flow.items():
    if len(childs) == 1:
        # Single successor: put an unconditional jump at the block's last instruction.
        tail = project.factory.block(parent.addr, size=parent.size).capstone.insns[-1]
        jump_addr = tail.address
        file_offset = main_object.addr_to_offset(jump_addr)
        if arch_name in ARCH_X86:
            fill_nop(origin_data, file_offset, tail.size, project.arch)
            patch_value = ins_j_jmp_hex_x86(jump_addr, childs[0], 'jmp')
        elif arch_name in ARCH_ARM:
            patch_value = ins_b_jmp_hex_arm(jump_addr, childs[0], 'b')
            if big_endian:
                patch_value = patch_value[::-1]
        elif arch_name in ARCH_ARM64:
            if parent.addr in (start, base_addr + start):
                # For the prologue block the branch is written one instruction
                # (4 bytes) after the block's last instruction.
                jump_addr += 4
                file_offset += 4
            patch_value = ins_b_jmp_hex_arm64(jump_addr, childs[0], 'b')
            if big_endian:
                patch_value = patch_value[::-1]
        patch_instruction(origin_data, file_offset, patch_value)
        continue

    # Several successors: NOP everything from the recorded conditional-select
    # instruction to the end of the block, then write a conditional jump to
    # childs[0] followed by an unconditional jump to childs[1].
    instr = patch_instrs[parent]
    file_offset = main_object.addr_to_offset(instr.address)
    block_end_offset = main_object.addr_to_offset(parent.addr + parent.size)
    fill_nop(origin_data, file_offset, block_end_offset - file_offset, project.arch)

    if arch_name in ARCH_X86:
        encode = ins_j_jmp_hex_x86
        taken_cond = instr.mnemonic[len('cmov'):]  # 'cmovne' -> 'ne'
        always_cond = 'jmp'
        step = 6  # bytes written for the conditional jump
        swap_bytes = False
    elif arch_name in ARCH_ARM:
        encode = ins_b_jmp_hex_arm
        taken_cond = 'b' + instr.mnemonic[len('mov'):]  # 'movne' -> 'bne'
        always_cond = 'b'
        step = 4
        swap_bytes = big_endian
    elif arch_name in ARCH_ARM64:
        encode = ins_b_jmp_hex_arm64
        taken_cond = instr.op_str.split(',')[-1].strip()  # last operand is the condition
        always_cond = 'b'
        step = 4
        swap_bytes = big_endian
    else:
        continue

    patch_value = encode(instr.address, childs[0], taken_cond)
    if swap_bytes:
        patch_value = patch_value[::-1]
    patch_instruction(origin_data, file_offset, patch_value)

    file_offset += step
    patch_value = encode(instr.address + step, childs[1], always_cond)
    if swap_bytes:
        patch_value = patch_value[::-1]
    patch_instruction(origin_data, file_offset, patch_value)

修改写回

1
2
3
4
5
# Write the patched image out. Patching must only overwrite bytes in place,
# so the buffer has to be exactly as long as the file that was read.
try:
    assert len(origin_data) == origin_data_len, "Error: size of data changed!!!"
    recovery.write(origin_data)
finally:
    # Release the output handle even when the size check or the write fails.
    recovery.close()
print('Successful! The recovered file: %s' % recovery_file)

def get_relevant_nop_nodes

  • find 相关块和nop块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_relevant_nop_nodes(supergraph, pre_dispatcher_node, prologue_node, retn_node):
    """Split the supergraph nodes into relevant blocks and blocks to NOP out.

    A node is relevant when it has an edge to ``pre_dispatcher_node`` and its
    size is greater than 8. Any other node is a NOP candidate unless its
    address equals that of the prologue, the return block or the
    pre-dispatcher, in which case it goes into neither list.

    Returns:
        A ``(relevant_nodes, nop_nodes)`` tuple of lists, each in the
        iteration order of ``supergraph.nodes()``.
    """
    kept_addrs = {prologue_node.addr, retn_node.addr, pre_dispatcher_node.addr}
    relevant_nodes, nop_nodes = [], []
    for candidate in supergraph.nodes():
        feeds_pre_dispatcher = supergraph.has_edge(candidate, pre_dispatcher_node)
        if feeds_pre_dispatcher and candidate.size > 8:
            relevant_nodes.append(candidate)
        elif candidate.addr not in kept_addrs:
            nop_nodes.append(candidate)
    return relevant_nodes, nop_nodes

def symbolic_execution

  • 从指定地址开始进行符号执行,寻找可达的相关代码块地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def symbolic_execution(project, relevant_block_addrs, start_addr, hook_addrs=None, modify_value=None, inspect=False):
    """Step symbolically from ``start_addr`` until a relevant block is reached.

    Args:
        project: the angr project to execute in.
        relevant_block_addrs: addresses that count as relevant blocks.
        start_addr: address the blank starting state is created at.
        hook_addrs: optional addresses to hook with a do-nothing procedure.
        modify_value: value written into the condition temp of the first ITE
            expression met; only used when ``inspect`` is true.
        inspect: when true, install the statement breakpoint that applies
            ``modify_value``.

    Returns:
        The address of the first active state found in
        ``relevant_block_addrs`` (checked after every step), or ``None`` once
        no active state is left.
    """

    def retn_procedure(state):
        # Hook body: does no work besides removing the hook at the current ip.
        ip = state.solver.eval(state.regs.ip)
        project.unhook(ip)
        return

    def statement_inspect(state):
        # Statement breakpoint: if the statement's first expression is an ITE,
        # overwrite the temp holding its condition with modify_value, then
        # clear the statement breakpoint list.
        current_stmt = state.scratch.irsb.statements[state.inspect.statement]
        expressions = list(current_stmt.expressions)
        if expressions and isinstance(expressions[0], pyvex.expr.ITE):
            state.scratch.temps[expressions[0].cond.tmp] = modify_value
            state.inspect._breakpoints['statement'] = []

    if hook_addrs is not None:
        # Length passed to project.hook: 5 on x86/AMD64, 4 on everything else.
        skip_length = 5 if project.arch.name in ARCH_X86 else 4
        for hook_addr in hook_addrs:
            project.hook(hook_addr, retn_procedure, length=skip_length)

    state = project.factory.blank_state(
        addr=start_addr, remove_options={angr.sim_options.LAZY_SOLVES})
    if inspect:
        state.inspect.b(
            'statement', when=angr.state_plugins.inspect.BP_BEFORE, action=statement_inspect)

    sm = project.factory.simulation_manager(state)
    sm.step()  # step out of the starting block before checking addresses
    while sm.active:
        for active_state in sm.active:
            if active_state.addr in relevant_block_addrs:
                return active_state.addr
        sm.step()

    return None

am_graph.py

util.py

  1. nop指令 --> fill_nop

  2. patch指令 --> patch_instruction

  3. 计算跳转指令hex --> ins_j_jmp_hex_x86 / ins_b_jmp_hex_arm / ins_b_jmp_hex_arm64

  4. 计算文件md5 --> calc_md5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python3

import struct
import hashlib

# 定义不同架构的常量
# Architecture names (as compared against ``arch.name``), grouped by family.
ARCH_X86 = {"X86", "AMD64"}
ARCH_ARM = {"ARMEL", "ARMHF"}
ARCH_ARM64 = {'AARCH64'}

# Opcode bytes used when building patch instructions, per architecture family.
# Every x86 and arm value is a ``bytes`` object because the encoders
# concatenate them with packed offsets.
OPCODES = {
    'x86':
    {
        # Condition suffix -> byte appended to the 'j' prefix for a conditional jump.
        'a': b'\x87', 'ae': b'\x83', 'b': b'\x82', 'be': b'\x86',
        'c': b'\x82', 'e': b'\x84', 'z': b'\x84', 'g': b'\x8F',
        'ge': b'\x8D', 'l': b'\x8C', 'le': b'\x8E', 'na': b'\x86',
        'nae': b'\x82', 'nb': b'\x83', 'nbe': b'\x87', 'nc': b'\x83',
        'ne': b'\x85', 'ng': b'\x8E', 'nge': b'\x8C', 'nl': b'\x8D',
        'nle': b'\x8F',
        # Was the str 'b\x81' (misplaced quote); it must be the byte 0x81.
        'no': b'\x81',
        'np': b'\x8B', 'ns': b'\x89', 'nz': b'\x85', 'o': b'\x80',
        'p': b'\x8A', 'pe': b'\x8A', 'po': b'\x8B', 's': b'\x88',
        # Single-byte nop, unconditional jmp opcode, conditional-jump prefix.
        'nop': b'\x90', 'jmp': b'\xE9', 'j': b'\x0F'
    },
    'arm':
    {
        # 4-byte nop, then the byte appended after the 3 packed offset bytes
        # for each branch mnemonic.
        'nop': b'\x00\xF0\x20\xE3', 'b': b'\xEA', 'blt': b'\xBA',
        'beq': b'\x0A', 'bne': b'\x1A', 'bgt': b'\xCA', 'bhi': b'\x8A',
        'bls': b'\x9A', 'ble': b'\xDA', 'bge': b'\xAA'
    },
    'arm64':
    {
        'nop': b'\x1F\x20\x03\xD5', 'b': b'\x14',
        # Condition name -> numeric condition code OR-ed into a conditional branch.
        'b_cond': {
            'eq': 0x0, 'ne': 0x1, 'hs': 0x2, 'lo': 0x3, 'mi': 0x4,
            'pl': 0x5, 'vs': 0x6, 'vc': 0x7, 'hi': 0x8, 'ls': 0x9,
            'ge': 0xA, 'lt': 0xB, 'gt': 0xC, 'le': 0xD
        }
    }
}


# 在指定位置填充NOP指令
def fill_nop(data, start_addr, length, arch):
    """Overwrite ``length`` bytes of ``data`` from ``start_addr`` with NOPs.

    ``data`` is modified in place. On x86/AMD64 the one-byte nop is written
    ``length`` times. On ARM/ARM64 a 4-byte nop is written at every 4-byte
    step inside the range, byte-reversed when ``arch.memory_endness`` is
    ``"Iend_BE"``. Any other architecture leaves ``data`` untouched.
    """
    if arch.name in ARCH_X86:
        nop_byte = ord(OPCODES['x86']['nop'])
        for pos in range(start_addr, start_addr + length):
            data[pos] = nop_byte
        return

    if arch.name not in ARCH_ARM | ARCH_ARM64:
        return

    family = 'arm' if arch.name in ARCH_ARM else 'arm64'
    nop_value = OPCODES[family]['nop']
    # The table stores the nop in little-endian byte order.
    if arch.memory_endness == "Iend_BE":
        nop_value = nop_value[::-1]

    # A full 4-byte nop is written at each step, even if fewer than 4 bytes
    # of the range remain.
    for base in range(start_addr, start_addr + length, 4):
        for delta in range(4):
            data[base + delta] = nop_value[delta]


# patch指令
def patch_instruction(data, offset, value):
    """Copy the bytes of ``value`` into ``data`` in place, starting at ``offset``."""
    for index, byte in enumerate(value):
        data[offset + index] = byte


# 获取x86架构下跳转指令的机器码
def ins_j_jmp_hex_x86(cur_addr, target_addr, j_cond):
    """Encode an x86 jump placed at ``cur_addr`` that lands on ``target_addr``.

    ``j_cond`` is ``'jmp'`` for an unconditional jump (5 bytes), or a
    condition suffix such as ``'ne'`` for a conditional jump (6 bytes).
    Returns the opcode followed by a little-endian signed 32-bit offset.
    """
    x86_ops = OPCODES['x86']
    if j_cond == 'jmp':
        opcode, insn_size = x86_ops['jmp'], 5
    else:
        opcode, insn_size = x86_ops['j'] + x86_ops[j_cond], 6

    # The offset is measured from the end of the jump instruction itself.
    rel32 = target_addr - cur_addr - insn_size
    return opcode + struct.pack('<i', rel32)


# 获取ARM架构下跳转指令的机器码
def ins_b_jmp_hex_arm(cur_addr, target_addr, b_cond):
    """Encode an ARM branch placed at ``cur_addr`` that targets ``target_addr``.

    ``b_cond`` is a branch mnemonic found in ``OPCODES['arm']`` (``'b'``,
    ``'bne'``, ...). Returns 4 little-endian bytes: the low three bytes of
    the word offset followed by the mnemonic's opcode byte.
    """
    # NOTE(review): the 8-byte bias presumably reflects the ARM pc reading two
    # instructions ahead of the branch - confirm against the ARM reference.
    word_offset = (target_addr - cur_addr - 8) // 4
    offset_bytes = struct.pack('<i', word_offset)[:3]
    return offset_bytes + OPCODES['arm'][b_cond]


# 获取ARM64架构下跳转指令的机器码
def ins_b_jmp_hex_arm64(cur_addr, target_addr, b_cond):
    """Encode an ARM64 branch placed at ``cur_addr`` that targets ``target_addr``.

    ``b_cond`` is ``'b'`` for an unconditional branch, otherwise a condition
    name found in ``OPCODES['arm64']['b_cond']`` (case-insensitive). Returns
    the instruction word packed as 4 little-endian bytes.
    """
    if b_cond != 'b':
        # Word offset placed in bits 5..23 of the instruction.
        offset_field = (((target_addr - cur_addr) // 4) << 5) & 0x00ffffe0
        # XXX: on ARM64 the conditional jump has to use the opposite condition
        # code; flipping the lowest bit swaps each even/odd pair (eq<->ne, ...).
        cond_code = OPCODES['arm64']['b_cond'][b_cond.lower()] ^ 1
        return struct.pack('<I', 0x54000000 | offset_field | cond_code)

    if cur_addr > target_addr:
        # Backward branch: count down from the word with all offset bits set.
        word = 0x17ffffff - (cur_addr - target_addr - 4) // 4
    else:
        # Forward (or zero) branch: add the word offset to the bare opcode.
        word = 0x14000000 + (target_addr - cur_addr) // 4
    return struct.pack('<I', word)


# 计算文件的MD5哈希值
def calc_md5(file):
    """Return the hexadecimal MD5 digest of the file at path ``file``.

    The file is read in binary mode inside a ``with`` block so the handle is
    closed as soon as the content has been read.
    """
    with open(file, 'rb') as handle:
        return hashlib.md5(handle.read()).hexdigest()