The reason for this hack: once, while minimizing a corpus of roughly 20k+ samples, the program crashed on the very last file… Not only would that mean wasting a lot of time re-running the whole minimization, it also took a real toll on my mood. So I spent a little time hacking the tool. The changes are very simple, but they basically solve the problems I have run into so far.

After the hack, the tool supports the following:

  1. Python 3 support.
  2. New output.txt, crash_files.txt, and hang_files.txt files that record the output files, the crash files, and the hang files, respectively.
  3. A new -move command for moving files. The scenario I have in mind: if the tool gets stuck partway through a minimization run, we can move the files that have already been screened out of the corpus, which saves a lot of time. The result may be slightly worse than a run that finishes normally, but a second minimization pass takes care of that (see the sketch after this list).
  4. A new -copy command for copying files.
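
To make the recovery workflow concrete before the diff below: the -move mode just takes a text file of paths (for example the output.txt left behind by an interrupted run) plus an output directory, and moves every listed file there, so a second pass only sees the inputs that were never screened. A minimal sketch of that logic (the function name move_processed and the directory name already_processed are illustrative, not part of the tool):

import os
import shutil
import sys

def move_processed(list_file, out_dir):
    # list_file holds one path per line, e.g. the output.txt written
    # by an interrupted run; out_dir must not exist yet.
    if not os.path.isfile(list_file):
        sys.exit('[!] %s is not a file' % list_file)
    if os.path.isdir(out_dir):
        sys.exit('[!] %s already exists, remove it to avoid data loss' % out_dir)
    os.mkdir(out_dir)
    with open(list_file) as f:
        for line in f:
            path = line.strip()
            if path:
                shutil.move(path, out_dir)

# e.g. move_processed('output.txt', 'already_processed')

With the modified tool itself, the equivalent invocation would look something like python my-cmin.py -mv output.txt -o already_processed; after that, re-running the normal minimization on what is left of the corpus gives the second pass mentioned above.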

The changes are roughly as follows:

https://github.com/0xfocu5/tools/blob/main/my-cmin.py

--- winafl-cmin.py	2021-05-18 15:02:53.000000000 +0800
+++ my-cmin.py	2021-05-20 11:54:55.000000000 +0800
@@ -286,6 +286,7 @@
         '--skip-dry-run', action = 'store_true', default = False,
         help = 'Skip the dry-run step even if it failed'
     )
+
     parser.add_argument(
         'target_cmdline', nargs = argparse.REMAINDER,
         help = 'target command line'
@@ -305,10 +306,46 @@
     # Now we can copy the file to destination
     shutil.copy(filepath, new_dest)
 
+def MvArgsparse():
+    parser = argparse.ArgumentParser(description='Move setting')
+    parser.add_argument(
+        '-mv', '--move', action = 'append', required = True,
+        metavar = 'dir', help = 'Move files to output folder.'
+        ' Mostly used for reuse after the process is stuck.'
+    )
+    parser.add_argument(
+        '-o', '--output', required = True,
+        metavar = 'dir', help = 'output directory for files.'
+        'Absolute path is better.'
+    )
+    return parser.parse_args()
+
+def CpArgsparse():
+    parser = argparse.ArgumentParser(description='Copy setting')
+    parser.add_argument(
+        '-cp', '--copy', action = 'append', required = True,
+        metavar = 'dir', help = 'Copy files to output folder.'
+        ' Mostly used for reuse after the process is stuck.'
+    )
+    parser.add_argument(
+        '-o', '--output', required = True,
+        metavar = 'dir', help = 'output directory for files.'
+        'Absolute path is better.'
+    )
+    return parser.parse_args()
+
+def check_file(file):
+    if os.path.isfile(file):
+        print('[+] %s has been deleted, new file will be generated.' %file)
+        command = "del "
+        command = command + file
+        os.system(command)
+
 def main(argc, argv):
-    print 'corpus minimization tool for WinAFL by <0vercl0k@tuxfamily.org>'
-    print 'Based on WinAFL by <ifratric@google.com>'
-    print 'Based on AFL by <lcamtuf@google.com>'
+    print('corpus minimization tool for WinAFL by <0vercl0k@tuxfamily.org>')
+    print('Based on WinAFL by <ifratric@google.com>')
+    print('Based on AFL by <lcamtuf@google.com>')
+    print('Changed by 0xfocu5 <o0xfocu5@gmail.com>')
 
     logging.basicConfig(
         filename = 'winafl-cmin.log',
@@ -316,7 +353,48 @@
         format = '%(asctime)s [%(levelname)-5.5s] [%(funcName)s] %(message)s'
     )
 
+    if (argv[1] == '-mv' or argv[1] == "-move"):
+        args = MvArgsparse()
+        file = "".join(args.move)
+        if os.path.isfile(file) is False:
+            print('[!] The argument of \'-move\' %s is not a file. Or it doesn\'t exist' %(file))
+            return 1
+        if os.path.isdir(args.output):
+            print('[!] %s already exists, please remove it to avoid data loss.' %(args.output))
+            return 1
+
+        path = os.getcwd()+"\\"+str(args.output)
+        os.mkdir(args.output)
+        for line in open(file):
+            line = line.strip("\n")
+            print("Move %s to %s" %(line, path))
+            shutil.move(line, path)
+        return 0
+
+    elif(argv[1] == '-cp' or argv[1] == "-copy"):
+        args = CpArgsparse()
+        file = "".join(args.copy)
+        if os.path.isfile(file) is False:
+            print('[!] The argument of \'-copy\' %s is not a file. Or it doesn\'t exist' %(file))
+            return 1
+        if os.path.isdir(args.output):
+            print('[!] %s already exists, please remove it to avoid data loss.' %(args.output))
+            return 1
+        output = []
+        for line in open(file):
+            line = line.strip("\n")
+            output.append(line)
+        path = os.getcwd()+"\\"+str(args.output)
+        os.mkdir(args.output)
+        for file_path in output:
+            print("Copy %s to %s" %(file_path, path))
+            do_unique_copy(file_path, args.output)
+        return 0
+
     args = setup_argparse()
+    check_file("output.txt")
+    check_file("crash_files.txt")
+    check_file("hang_files.txt")
     cli_handler = logging.StreamHandler(sys.stdout)
     cli_handler.setLevel(args.verbose)
     logging.getLogger().addHandler(cli_handler)
@@ -419,7 +497,7 @@
     # Do a dry run with the first file in the set
     logging.info('[*] Testing the target binary...')
     f = AFLShowMapWorker(args)
-    results = map(f, (inputs[0], inputs[0]))
+    results = list(map(f, (inputs[0], inputs[0])))
     if results[0] != results[1]:
         logging.error('[!] Dry-run failed, 2 executions resulted differently:')
         logging.error(
@@ -435,7 +513,6 @@
             return 1
 
@@ -497,12 +571,10 @@
 
     # Counter tracking how many files we have been through already.
     i = 1
-    for result in p.imap_unordered(
-        AFLShowMapWorker(args),
-        inputs
-    ):
-        print '\rProcessing file %d/%d...' % (i, inputs_len),
+    for result in p.imap_unordered(AFLShowMapWorker(args), inputs):
+        print('\rProcessing file %d/%d...' % (i, inputs_len)),
         i += 1
+
         # If the set of tuples is empty, something weird happened
         if len(result.tuples) == 0:
             logging.debug(
@@ -516,12 +588,15 @@
         if result.returncode != wanted_returncode:
             if result.returncode == 1:
                 hang_files.append(result.path)
-
+                with open('hang_files.txt', 'a+') as f:
+                    f.write(result.path+"\n")
             # If the mode crash only is enabled, we track the non-crashing
             # test cases in the same tuple.
             if (result.returncode == 2 and args.crash_only is False) or \
                (result.returncode == 0 and args.crash_only):
                 crash_files.append(result.path)
+                with open('crash_files.txt', 'a+') as f:
+                    f.write(result.path+"\n")
 
             if args.crash_only is False:
                 logging.debug(
@@ -539,14 +614,13 @@
             continue
 
         totalsize += result.filesize
-
         # Generate the list of unique tuples while processing the results,
         # also keep track of their popularities.
        uniq_tuples.update(result.tuples.keys())
 
         # Keep an updated dictionary mapping a tuple to the fittest file
         # of all the paths.
-        for tuple_id, tuple_hitcount in result.tuples.iteritems():
+        for tuple_id, tuple_hitcount in result.tuples.items():
             fileinfo = {
                 'size' : result.filesize,
                 'path' : result.path,
@@ -573,6 +647,10 @@
             else:
                 candidates[tuple_id] = fileinfo
 
+        with open('output.txt', 'a+') as f:
+            f.write(result.path+"\n")
+
+
     len_crash_files, len_hang_files, len_empty_tuple_files = map(
         len, (crash_files, hang_files, empty_tuple_files)
     )
@@ -586,6 +664,7 @@
         '[+] Found %d unique tuples across %d files',
         len_uniq_tuples, effective_len
     )
+
     if len_hang_files > 0:
         logging.info(' - %d files triggered a hang', len_hang_files)
         for hang_file in hang_files:
@@ -609,7 +688,6 @@
             logging.debug(' - %s generated an empty tuple', empty_tuple_file)
 
     logging.info('[*] Finding best candidates for each tuple...')
-
     # Using the same strategy than in afl-cmin, quoting lcamtuf:
     # '''
     # The "best" part is understood simply as the smallest input that
@@ -631,7 +709,7 @@
 
         # Remove the other tuples also exercised by the candidate
         # from the remaining_tuples list.
-        for tuple_exercised in candidate['tuples'].iterkeys():
+        for tuple_exercised in candidate['tuples'].keys():
             # Remove the tuples exercised if we have not
             # removed them already from the
             # remaining_tuples list.
@@ -640,15 +718,16 @@
 
         # Keep track of the final minset and its size.
         minset.append(candidate['path'])
+
         minsetsize += candidate['size']
 
         # We are now done with this tuple, we can get rid of it.
         del candidates[tuple_]
 
-        print '\rProcessing tuple %d/%d...' % (
+        print('\rProcessing tuple %d/%d...' % (
             len_uniq_tuples - len(remaining_tuples),
             len_uniq_tuples
-        ),
+        )),
 
         # If we don't have any more tuples left, we are done.
         if len(remaining_tuples) == 0:
@@ -670,10 +749,14 @@
         '[*] Saving the minset in %s...', os.path.abspath(args.output)
     )
     os.mkdir(args.output)
+    os.system("del output.txt")
     for file_path in minset:
+        with open('output.txt', 'a+') as f:
+            f.write(file_path+"\n")
         do_unique_copy(file_path, args.output)
 
     logging.info('[+] Time elapsed: %d seconds', time.time() - t0)
+
     return 0
 
 if __name__ == '__main__':