-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathHyxTalker.py
171 lines (131 loc) · 5.29 KB
/
HyxTalker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os
from socket import socket, AF_UNIX, SOCK_STREAM
from struct import pack, unpack
from subprocess import Popen
from Constants import hyx_path, runargs, socketname, UPD_FROMPAULA_INSERT, MSG_FROMPAULA
from logging2 import debug
from utilsFolder.HeapClass import Heap
from utilsFolder.PaulaPoll import PaulaPoll
# Width in bytes of the position/length fields read in getUpdate (unpacked as "<Q").
SZ_SIZET = 8


class HyxTalker():
    """Owns the Unix-domain socket connection to a Hyx process.

    The constructor binds a listening socket at ``socketname``, launches Hyx
    (or prints the command line to launch it) and blocks until Hyx connects.
    Afterwards the instance forwards heap changes to Hyx (``updateHyx``) and
    applies edits and commands coming back from Hyx (``getUpdate``,
    ``recvCommand``).

    Wire format, as implemented by the methods below (all integers are
    little-endian):

    * to Hyx, byte changes: ``0x01``, u32 change count, then for every change
      u32 start, u32 length, raw bytes.
    * to Hyx, new heap: ``UPD_FROMPAULA_INSERT``, u64 length, raw heap bytes.
    * to Hyx, command response: exactly 0x100 bytes, NUL padded.
    * from Hyx, edit: u64 position (omitted for "next byte" edits),
      u64 length, raw bytes.
    * from Hyx, command: exactly 0x100 bytes, NUL padded.
    """

    # Fixed size in bytes of a command / command-response frame.
    _CMD_SIZE = 0x100

    def __init__(self, heapobj: "Heap", poll: "PaulaPoll"):
        """Bind the listening socket and connect to a freshly launched Hyx.

        :param heapobj: heap whose contents are shown in Hyx
        :param poll: poll object the Hyx socket fd gets registered with
        """
        self.rootsock = socket(AF_UNIX, SOCK_STREAM)
        self._socketname = socketname

        # a stale socket file from a previous run would make bind() fail
        try:
            os.unlink(socketname)
        except FileNotFoundError:
            pass
        self.rootsock.bind(socketname)

        self.heap = heapobj
        self.hyxprocess = None
        self.hyxsock = None
        self.poll = poll
        # position at which a "next byte" edit continues; getUpdate overwrites
        # it as soon as the first positioned edit arrives
        self.nextpos = 0

        self.launchHyx(heapobj)

    def launchHyx(self, heapobj: "Heap"):
        """open a segment with Hyx. You can specify the permissions of the segment, default is rwp.
        You can use slicing syntax, [1:-3] will open the segment starting with an offset of 0x1000
        You can also trim the segment to start at the first page that has some non-zero bytes in it.
        In hyx, you can
        :!fork cool_child execute commands just like in forkever itself
        :f free address where your cursor is at
        :1234 jump to offset 1234 (relative to start of opened file)
        use hyx --help for more info
        Example use:
        hyx heap [2:]
        hyx stack [i:i]
        hyx libc rp
        """
        # The docstring above is user-facing help text for the "hyx" command,
        # not a description of this method. It lives here so that the helper
        # does not have to import from inputhandler, which would create an
        # import loop. Its wording is therefore left untouched.
        #
        # What this method does: write the heap to a file, start Hyx on that
        # file (or print the command line if no runargs are configured), then
        # block until Hyx connects back and register its socket with the poll
        # object under the name "hyx".

        def argsStr(args):
            # space separated, including a trailing space
            return "".join(arg + " " for arg in args)

        filepath = heapobj.newHeapfile()
        offset = heapobj.start

        # prepare args
        args = [hyx_path, "-offset", hex(offset), "-socket", self._socketname, filepath]

        if runargs:
            # runargs is prepended to the Hyx command line before spawning
            args = runargs + args
            self.hyxprocess = Popen(args)
        else:
            # pref = ["gdb -ex \"b updater.c:requestCommandPaula\" --args"]
            pref = []
            # no process is spawned here: print the command so the user can
            # start Hyx by hand in case spawning a new window isn't possible
            print(argsStr(pref + args))

        self.rootsock.listen(1)
        # blocks until Hyx connects to the socket
        self.hyxsock, _ = self.rootsock.accept()
        self.poll.register(self.getSockFd(), "hyx")

    def getSockFd(self):
        """Return the file descriptor of the connected Hyx socket."""
        return self.hyxsock.fileno()

    def _recvExact(self, nbytes):
        """Read exactly ``nbytes`` bytes from the Hyx socket.

        A single recv() on a stream socket may return fewer bytes than asked
        for, so keep reading until the whole frame has arrived.

        :param nbytes: number of bytes to read; 0 returns ``b""`` immediately
        :return: the received bytes
        :raises ConnectionError: if the peer closes the connection first
        """
        chunks = []
        remaining = nbytes
        while remaining > 0:
            chunk = self.hyxsock.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    "Hyx closed the connection with %d of %d bytes outstanding"
                    % (remaining, nbytes))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def sendUpdates(self, tuplelist):
        """Send a list of byte changes to Hyx.

        :param tuplelist: sequence of ``(start, data)`` pairs, ``start`` being
            an integer that fits in 32 bits and ``data`` bytes-like
        :return: number of bytes written to the socket
        """
        payload = b"\x01" + pack("<I", len(tuplelist)) + b"".join(
            pack("<II", start, len(data)) + data for start, data in tuplelist)
        # sendall, because send() may write only part of the buffer
        self.hyxsock.sendall(payload)
        return len(payload)

    def sendNewHeap(self, oldstart, oldstop):
        """Replace the heap shown in Hyx with the current heap contents.

        Only growth at the end of the heap is supported.

        :param oldstart: start address of the heap before the change
        :param oldstop: stop address of the heap before the change
        :raises NotImplementedError: if the start moved or the heap shrank
        """
        if self.heap.start != oldstart:
            raise NotImplementedError("heap start changed")
        if self.heap.stop < oldstop:
            raise NotImplementedError("heap shrank")

        # replace old heap with new heap
        heapbytes = self.heap.heapbytes
        length = self.heap.stop - self.heap.start
        self.hyxsock.sendall(UPD_FROMPAULA_INSERT)
        self.hyxsock.sendall(pack("<Q", length))
        # the heap can be large, so a plain send() could stop part way through
        self.hyxsock.sendall(heapbytes)

        debug("sent %#x bytes" % len(heapbytes))
        debug("heapbytes len= %x" % len(heapbytes))

    def getUpdate(self, isNextByte=False):
        """receive the changes made in Hyx and write them to Memory

        :param isNextByte: if true, Hyx sends no position and the edit
            continues where the previous one ended (``self.nextpos``)
        :return: ``(pos, data)`` of the edit that was applied
        :raises ConnectionError: if Hyx disconnects in the middle of a frame
        """
        if isNextByte:
            pos = self.nextpos
        else:
            pos = unpack("<Q", self._recvExact(SZ_SIZET))[0]
            self.nextpos = pos

        length = unpack("<Q", self._recvExact(SZ_SIZET))[0]
        self.nextpos += length

        data = self._recvExact(length)
        self.heap.writeUpdates(pos, data)
        return pos, data

    def updateHyx(self):
        """checks if the heap has changed. If yes, forward the changes to Hyx

        :raises NotImplementedError: if the heap size changed while only a
            slice is viewed, or if the change type is unknown
        """
        changetype, changeret = self.heap.checkChange()

        if changetype == "same":
            return

        if changetype == "length":
            viewing_slice = self.heap.args.start_nonzero or self.heap.args.stop_nonzero
            if viewing_slice:
                raise NotImplementedError("heapsize changed, "
                                          "but you are only viewing a slice")
            self.sendNewHeap(*changeret)
        elif changetype == "bytes":
            self.sendUpdates(changeret)
        else:
            raise NotImplementedError

    def recvCommand(self):
        """receive a :!command from Hyx

        :return: the command as text, up to (not including) the terminating
            NUL byte
        :raises ConnectionError: if Hyx disconnects in the middle of a frame
        """
        # The read must not live inside an assert: asserts are removed when
        # Python runs with -O, which would skip the read entirely.
        cmd = bytearray(self._recvExact(self._CMD_SIZE))

        # strtok used in hyx replaced the first " " with a nullbyte
        first_nul = cmd.find(b"\x00")
        if first_nul != -1:
            cmd[first_nul:first_nul + 1] = b" "

        end = cmd.find(b"\x00")
        if end == -1:
            # no terminator left: the command fills the whole frame
            end = len(cmd)
        return cmd[:end].decode()

    def sendCommandResponse(self, cmd):
        """Send a command response to Hyx as one fixed-size frame.

        The response is truncated to 0x100 bytes and NUL padded to that size.

        :param cmd: response as ``str`` (gets encoded) or ``bytes``
        :raises TypeError: if ``cmd`` is neither ``str`` nor ``bytes``
        """
        if isinstance(cmd, str):
            cmd = cmd.encode()
        if not isinstance(cmd, bytes):
            raise TypeError("cmd must be str or bytes, not %s" % type(cmd).__name__)

        frame = cmd[:self._CMD_SIZE].ljust(self._CMD_SIZE, b"\x00")
        self.hyxsock.sendall(frame)

    def sendMessage(self, cmd):
        """Send ``MSG_FROMPAULA`` followed by ``cmd`` as a response frame."""
        self.hyxsock.sendall(MSG_FROMPAULA)
        self.sendCommandResponse(cmd)

    def destroy(self, rootsock=False):
        """Unregister and close the Hyx socket.

        :param rootsock: if true, also close the listening socket
        """
        # unregister first: the fd is no longer available once the socket is closed
        self.poll.unregister(self.getSockFd())
        self.hyxsock.close()
        if rootsock:
            self.rootsock.close()