blob: d2b4fb5704b5ce122164c53927982371f48fa897 [file] [log] [blame]
Kelvin Zhang35cff4f2021-12-08 16:06:00 -08001# import mmap
2
3import struct
4
5LZ4_FRAME_MAGIC = b"\x04\x22\x4D\x18"
6
7
def scan_legacy_lz4_frames(data):
    """Print the offset of every legacy LZ4 frame in data and of its blocks.

    A legacy frame is the legacy magic number followed by blocks, each stored
    as a 4-byte little-endian compressed size and then that many payload
    bytes. Walking a frame stops at the next frame magic (legacy or current),
    at a zero size, at a block that would run past the end of data, or when
    fewer than 4 bytes remain. The search for the next legacy magic then
    resumes from that position.

    Args:
        data: bytes-like object to scan; it must support index(), len() and
            slicing.

    Returns:
        None. Findings are written to stdout.
    """
    LZ4_LEGACY_FRAME_MAGIC = b"\x02\x21\x4C\x18"
    size = len(data)
    index = 0
    while index < size:
        try:
            index = data.index(LZ4_LEGACY_FRAME_MAGIC, index)
        except ValueError:
            # No further legacy magic anywhere in the remaining data.
            break
        print("Legacy Lz4 frame at {}".format(index))
        index += 4
        # Only read a size field when all 4 of its bytes are present;
        # unpacking a short tail would raise struct.error.
        while index + 4 <= size:
            magic = data[index:index + 4]
            if magic == LZ4_LEGACY_FRAME_MAGIC or magic == LZ4_FRAME_MAGIC:
                break
            (csize,) = struct.unpack("<L", magic)
            # A block may end exactly at the end of data; only reject it when
            # its payload would extend beyond that.
            if csize == 0 or index + 4 + csize > size:
                break
            print("Legacy lz4 block at {}, compressed data size {}".format(index, csize))
            # Step over both the size field and the payload to reach the next
            # block header.
            index += 4 + csize
28
29
def scan_lz4_frames(data):
    """Print the offset of every LZ4 frame in data, followed by its blocks.

    For each occurrence of the frame magic the frame descriptor is decoded
    (flag byte, block-descriptor byte, optional 8-byte content size, optional
    4-byte dictionary id, 1-byte header checksum) and the data blocks are
    walked. Each block is a 4-byte little-endian size whose top bit marks the
    payload as uncompressed, then the payload, then a 4-byte checksum when
    the flag byte announces block checksums. A zero size ends the frame.

    Truncated descriptors and blocks that would run past the end of data are
    not reported; the search for the next magic resumes just after the last
    bytes that were consumed. Checksums are skipped, not verified.

    Args:
        data: bytes object to scan.

    Returns:
        None. Findings are written to stdout.
    """
    size = len(data)
    index = 0
    while index < size:
        try:
            frame_offset = data.index(LZ4_FRAME_MAGIC, index)
        except ValueError:
            # No further frame magic anywhere in the remaining data.
            break
        index = frame_offset + 4
        if index >= size:
            # The magic is the very last thing in data; there is no flag byte.
            break

        flag = data[index]
        block_checksum_present = flag & 0x10 != 0
        content_size_present = flag & 0x8 != 0
        content_checksum_present = flag & 0x4 != 0
        dictionary_id_present = flag & 0x1 != 0

        # Flag byte + block-descriptor byte + header checksum, plus the
        # optional fields announced by the flag byte.
        header_size = 3
        if content_size_present:
            header_size += 8
        if dictionary_id_present:
            header_size += 4
        if index + header_size > size:
            # Truncated descriptor (or a false-positive magic): keep
            # searching from just after this magic.
            continue

        index += 2
        content_size = None
        if content_size_present:
            (content_size,) = struct.unpack("<Q", data[index:index + 8])
            index += 8
        if dictionary_id_present:
            index += 4
        index += 1  # header checksum
        print("Lz4 frame at {}, content size: {}".format(
            frame_offset, content_size))

        # Only read a block header when all 4 of its bytes are present.
        while index + 4 <= size:
            (block_size,) = struct.unpack("<L", data[index:index + 4])
            uncompressed = block_size & 0x80000000 != 0
            block_size &= 0x7FFFFFFF
            index += 4
            if block_size == 0:
                # End mark; step over the trailing content checksum if the
                # flag byte announced one.
                if content_checksum_present:
                    index += 4
                break
            if index + block_size > size:
                # Payload would run past the end of data. Leave the cursor
                # right after the size field so later frames are still found.
                break
            index += block_size
            if block_checksum_present:
                index += 4
            print("Block uncompressed: {}, size: {}".format(uncompressed, block_size))
66
67
def main(argv):
    """Scan the file named on the command line for legacy and current LZ4 frames.

    Args:
        argv: full argument vector; argv[0] is the program name and argv[1]
            is the path of the file to scan.

    Returns:
        0 after both scans have run, or 1 (after printing a usage line) when
        argv does not consist of the program name plus exactly one path.
    """
    if len(argv) != 2:
        print("Usage:", argv[0], "<path to a file>")
        return 1
    path = argv[1]

    # Read the whole file up front so the handle is released before scanning.
    with open(path, "rb") as fp:
        data = fp.read()
    scan_legacy_lz4_frames(data)
    scan_lz4_frames(data)
    # Explicit success status so every path returns an int exit code.
    return 0
78
79
if __name__ == '__main__':
    # Script entry point: pass the raw argument vector to main() and use its
    # return value as the process exit status.
    import sys

    exit_status = main(sys.argv)
    sys.exit(exit_status)