Дата и время публикации:
Проблема и решение
1. Суть проблемы
Заключается в том, что два объекта sys.stdout.buffer и Pipe(), который реализуется в модуле multiprocessing, неодназначно реализованы. Поэтому последний нужно приводить к виду и наборов свойств стандартного вывода (STDOUT), чтобы использовать в качестве "проброса". Иначе могут быть проблемы с использованием кода модуля pexpect, который показано в листинге 1.1
Листинг 1.1
... (outconn_a, outconn_b) = Pipe() ... child=Process(target=fbody,args=(self, outconn_b,)) ... def fbody ( outconn ) : ... pchild = pexpect.spawn('ssh -l %s %s'%(user, host)) pchild.logfile = outconn ...
Если оставить все как есть, можно схватить ошибку, связанной с различием в реализации multiprocessing.Pipe() и sys.stdout.buffer, как показано в дампе 1.2
Дамп 1.1
... Traceback (most recent call last): File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/home/user/Projects/gnome-example.py/gshell-autorun/GSPexpect.py", line 74, in run i = pchild.expect(...) File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 343, in expect return self.expect_list(compiled_pattern_list, File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 372, in expect_list return exp.expect_loop(timeout) File "/usr/lib/python3/dist-packages/pexpect/expect.py", line 169, in expect_loop incoming = spawn.read_nonblocking(spawn.maxread, timeout) File "/usr/lib/python3/dist-packages/pexpect/pty_spawn.py", line 501, in read_nonblocking return super(spawn, self).read_nonblocking(size) File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 182, in read_nonblocking self._log(s, 'read') File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 129, in _log self.logfile.write(s) AttributeError: 'Connection' object has no attribute 'write' ...
2. Решение
Будет состоять из нескольких шагов, первый из которых сравнение реализации Pipe() и sys.stdout.buffer(). Как пишет документация Python конечной реализацией является класс multiprocessing.connection [3.1], описание методов которого приводится в таблице 2.1
## | Метод назначения | Назначение |
---|---|---|
1 | send(obj) | Отправляет объект (obj) на другой конец соединения, который в свою очередь должен быть прочитан используя recv() . При этом рекомендуется, что размер объекта не должен превышать 32 MiB,а в случае превышения, уровень которого зависит от реализации ОС, может вызывать исключение ValueError |
2 | recv() | Возвращает отправленный объект (obj) с другого конца соединения с использованием send() . Блокируется до тех пор пока что-от не будет принято. Вызывает EOFError , в случае, если ничего не было принято, потому что было закрыто соединение на другом конце. |
3 | fileno() | Возвращает файловый дескриптор |
4 | close() | Закрывает соединение |
5 | poll([timeout]) | Возращает управление немедленно, если timeout=0.00 секунд или по истечению задержки, либо когда есть данные. |
6 | send_bytes(buffer[, offset[, size]]) | Отправляет байтовую последовательность в buffer, с учетом размера и смещения в нем. Где Buffer является io.BytesIO |
7 | recv_bytes([maxlength]) | Принимает байтовую последовательность с указанной максимальной длинной В случае , если maxlength указан и сообщение длинее чем значение указанное в нем, тогда будет возникать OSError, а соединение больше не будет читабильно. |
8 | recv_bytes_into(buffer[, offset]) | Прием байтовой последовательности внутри указанного буфера с учетом смещения в нем. Также как и в recv() и recv_bytes() возникать ошибка EOFError . |
Для чтения/записи бинарных данных из/в стандартные потоки, используется нижележащий объект buffer [3.2], который в свою очередь является экземпляром BufferedIOBase, что связан с TextIOBase, но не является его часть, потому что последний может не использоваться в некоторых решениях.
Соответственно, все потоки sys.stdin, sys.stdout, sys.stderr относят к нас к следующим атрибутам и методам, которые перечислены в таблице 2.2
## | Метод/атрибут назначения | Назначение |
---|---|---|
1 | Raw | Атрибут неформатированного потока. Следует избегать использования, т.к. может не использоваться в некоторых решениях. |
2 | detach() | Отделяет неформатированный поток от буфера и возвращает последний. |
3 | read(size=-1) | Читает и возвращает некоторую последовательность байтов, указанных размером в size.
Если
|
4 | write(b) | Записывает некоторую последовательность байтов объекта b и возвращает число записанных байт. При этом, это число должно согласоваться с байтовой длинной объекта b . Иначе, возникнет исключение OSError в случае ошибки записи. |
5 | readinto(b) | Читает последовательность байт в заранее выделенный объект b и возвращает число прочитанных байтов. Для примера объект b может быть создан с bytearray() |
В таблице 2.3 приведено соответствие методов базовых классов BufferedIOBase и multiprocessing.connection для их производных Pipe() и sys.[stdout|stdin|sderr].buffer()
## | Метод класса BufferedIOBase | Метод класса multiprocessing.connection | Назначение |
---|---|---|---|
1 | read(size=-1) | recv_bytes([maxlength]) | Чтение некоторой байтовой последовательности указанного размера в size. Байтовая последовательность возвращается методом read |
2 | write(b) | send_bytes(buffer[, offset[, size]]) | запись/передача некоторой байтовой последовательности с указанием размера и смещения внутри выделенной заранее памяти |
3 | readinto(b) | recv_bytes_into(buffer[, offset]) | Чтение/получение байтовой последовательности в заранее выделенную память |
Таким образом, как видно из таблицы 2.3, что методы read(), write() и close() класса BufferedIOBase сошлись только с recv_bytes(), send_bytes() и close() класса multiprocessing.connection.Connection
Поэтому все сводится к написанию надстроечного класса над UnifiedPipeConnection, пример определения и реализации которого приведен в листинге 2.4
Листинг 2.4
... class UnifiedPipeConnection(): ... def read(size=-1) : ... return b def write(b) : ... return def close() : ...
Нюанс заключается в том, что метод send() / recv(), как и write()/read(), оперируют с чисто байтовой последовательностью, возвращаемой bytes(). Также определяется метод close(), который может гипотетически быть задействован. В листинге 2.5 приводится код реализующий всю схему взаимодействия pexpect
Листинг 2.5
23 class UnifiedPipeConnection (): ... 35 def __init__(self,direction=True) : 36 if direction : 37 self.parent_conn, self.child_conn = Pipe(duplex=False) 38 else : 39 self.child_conn, self.parent_conn = Pipe(duplex=False) 40 self.dir = direction ... 41 def read(self, size=-1) : ... 59 def write(self,b) : ... 76 def _close(self) : ... 120 def forward_child(conn): 121 n=conn.write(b'hello world! I\'m writting in Python') 122 print ( 'Child: write %d bytes' % n ) 123 conn._close() 124 125 forward = UnifiedPipeConnection() 126 child = Process(target=forward_child, args=(forward,)) 127 child.start() 128 129 b=forward.read() 130 print(str(b)) 131 child.join() ... 146 def back_child(conn): 147 b=conn.read() 148 print ( 'Child: read %d bytes' % len(b) , ': ' + str(b) ) 149 conn._close() 150 151 backward = UnifiedPipeConnection(direction=False) 152 child = child(target=back_child, args=(backward,)) 153 child.start() 154 155 n=backward.write(b'And I can write backwards and forwards') 156 print ( 'Parent: write %d bytes' % n ) 157 process.join() ...
Как показано в строках 35-40 листинга класс UnifiedPipeConnection в своем конструкторе организуется направление передачи данных, которая производится через Pipe() по двум схемам :
- Когда direction=True данные передаются от родительского процесса Parent() к дочернему child() по прямой схеме обмена данными: Parent() ⇦ Pipe() ⇦ child();
- Когда direction=False данные передаются по обратной схеме обмена данными: child() ⇦ Pipe() ⇦ Parent()
Для прямой схемы обмена
- в child() для передачи байтовой последовательности задействуется метод write() , определенный в строкe 121 листинга 2.5
- в parent() для получения отправленной байтовой последовательности из child() используется метод read(), определенный в строкe 129 листинга 2.5
Для обратной схемы обмена
- в child() с использованием read() производится прием/чтение данных от parent(), как показано в строке 147 листинга 2.5
- в parent(), наоборот, пишет в обратный канал данные с помошью метода write(), как показано в строке 155 того же листинга
Метод write()
Детализация метода приведена в листинге 2.6
Листинг 2.5
... 59 def write(self,b) : 60 nbytes = 0 61 if type(b) == bytes : 62 bbuffer = bytes(b) 63 try : 64 if self.dir : 65 self.child_conn.send_bytes(bbuffer) 66 else : 67 self.parent_conn.send_bytes(bbuffer) 68 nbytes = len(bbuffer) 69 except ValueError as e: 70 print("Error {0}".format(str(e, encoding = 'utf-8'))) 71 raise ValueError(str(e, encoding = 'utf-8')) 72 else : 73 raise OSError('Object b isn\'t bytes-like objects!') 74 return nbytes ...
Из которого можно увидеть, что все операции производятся с байтовой последовательностью и что метод write(), как его тезка в случае определенных пойманных исключений передает их дальше в точку своего вызова . При нормальном завершении процедуры передачи данных, он вернет записанное число байт.
Метод read()
Детализация метода приведена в листинге 2.7
Листинг 2.5
... 41 def read(self, size=-1) : 42 __get = (lambda s: None if s < 1 else s ) 43 try: 44 if self.dir : 45 b = self.parent_conn.recv_bytes(maxlength=__get(size)) 46 else : 47 b = self.child_conn.recv_bytes(maxlength=__get(size)) 48 except EOFError as e1: 49 print("Error {0}".format(str(e1, encoding = 'utf-8'))) 50 raise EOFError(str(e1, encoding = 'utf-8')) 51 return "" 52 except OSError as e2: 53 print("Error {0}".format(str(e2, encoding = 'utf-8'))) 54 raise EOFError(str(e2, encoding = 'utf-8')) 55 return "" 56 return bytes(b) 57 58 """ Write the given bytes buffer to the IO stream/ 59 Return the number of bytes written, which is always the length of b in bytes.""" ...
Метод close()
реализация приведена в листинге 2.8
Листинг 2.8
... 76 def _close(self) : 77 if direction : 78 self.child_conn.close() 79 else: 80 self.parent_conn.close() ...
Из которого видно, что в основном этот метод будет использоваться в дочерних процессах, как было показано для обоих схем обмена данных в строке 123 и 129 листинга 2.5
Сопряжение STDOUT и Pipe()
Как это было показано в листинге 1.1, для сопряжение pexpect с Pipe() , который задействует STDOUT нужно создать Pipe() с помощь класса UnifiedPipeConnection канал, реализованный в виде объекта outconn в листинге 2.9
Листинг 2.8
... outconn = UnifiedPipeConnection() ... child=Process(target=fbody,args=(self, outconn,)) ... def fbody ( outconn ) : ... pchild = pexpect.spawn('ssh -l %s %s'%(user, host)) pchild.logfile = outconn ... i = pchild.expect([pexpect.TIMEOUT, SSH_NEWKEY, '[Pp]assword: ', '\\$|#|\\] > $']) ... def parent_reader() : while True : ... bmsg=outconn.read() if bmsg : print(bmsg)
В результате, как это было показано в дампе 1.2, дочернему процессу fbody() будет передана ссылка на объект outconn, а при выполнении pchild.expect() не явно будет вызван outconn.write(), соответственно в родительском процессе в функции parent_reader() в бесконечном цикле с помощью outconn.read() будет принята вся переданная последовательность байт.
Кроме того, дополнительно мне потребовалось решить проблему попытки обращения к методу flush(), как показано в дампе 2.10
Листинг 2.10
... File "/usr/lib/python3/dist-packages/pexpect/pty_spawn.py", line 501, in read_nonblocking return super(spawn, self).read_nonblocking(size) File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 182, in read_nonblocking self._log(s, 'read') File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 130, in _log self.logfile.flush() AttributeError: 'UnifiedPipeConnection' object has no attribute 'flush' ...
Как показано в листинге 2.11, это было решено путем отправки в канал символа конца передачи EOF(b'\x04'), тем самым извещая терминальное окно о конце передачи и необходимости сброса своих внутренних буферов.
Листинг 2.10
... 23 class UnifiedPipeConnection (): ... 121 def flush(self) : 122 bbuffer = bytes(b'\x04') 123 conn=(lambda d : self.child_conn if d else self.parent_conn)(self.dir) 124 conn.send_bytes( bbuffer ) # send EOF (End of File) 125 return ...
3. Библиография
3.1 Python. Class multiprocessing.connection.Connection
3.2 Python. System-specific parameters and functions
3.3 Python. Class io.TextIOBase
3.4 Python. class io.BufferedIOBase
3.5 Pycopy Documentation[pdf]