× К оглавлению На главную Об авторе

Дата и время публикации:    

Проблема и решение

1. Суть проблемы

Заключается в том, что два объекта sys.stdout.buffer и Pipe(), который реализуется в модуле multiprocessing, неодназначно реализованы. Поэтому последний нужно приводить к виду и наборов свойств стандартного вывода (STDOUT), чтобы использовать в качестве "проброса". Иначе могут быть проблемы с использованием кода модуля pexpect, который показано в листинге 1.1

Листинг 1.1

...
       (outconn_a, outconn_b) = Pipe()
...
       child=Process(target=fbody,args=(self, outconn_b,))
...
def  fbody ( outconn ) :
...
       pchild = pexpect.spawn('ssh -l %s %s'%(user, host))
       pchild.logfile = outconn
...

Если оставить все как есть, можно схватить ошибку, связанной с различием в реализации multiprocessing.Pipe() и sys.stdout.buffer, как показано в дампе 1.2

Дамп 1.1

...
Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/user/Projects/gnome-example.py/gshell-autorun/GSPexpect.py", line 74, in run
    i = pchild.expect(...)
  File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 343, in expect
    return self.expect_list(compiled_pattern_list,
  File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 372, in expect_list
    return exp.expect_loop(timeout)
  File "/usr/lib/python3/dist-packages/pexpect/expect.py", line 169, in expect_loop
    incoming = spawn.read_nonblocking(spawn.maxread, timeout)
  File "/usr/lib/python3/dist-packages/pexpect/pty_spawn.py", line 501, in read_nonblocking
    return super(spawn, self).read_nonblocking(size)
  File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 182, in read_nonblocking
    self._log(s, 'read')
  File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 129, in _log
    self.logfile.write(s)
AttributeError: 'Connection' object has no attribute 'write'
...

2. Решение

Будет состоять из нескольких шагов, первый из которых сравнение реализации Pipe() и sys.stdout.buffer(). Как пишет документация Python конечной реализацией является класс multiprocessing.connection [3.1], описание методов которого приводится в таблице 2.1

Таблица 2.1
## Метод назначения Назначение
  1   send(obj) Отправляет объект (obj) на другой конец соединения, который в свою очередь должен быть прочитан используя recv() . При этом рекомендуется, что размер объекта не должен превышать 32 MiB,а в случае превышения, уровень которого зависит от реализации ОС, может вызывать исключение ValueError
  2   recv() Возвращает отправленный объект (obj) с другого конца соединения с использованием send() . Блокируется до тех пор пока что-от не будет принято. Вызывает EOFError , в случае, если ничего не было принято, потому что было закрыто соединение на другом конце.
  3   fileno() Возвращает файловый дескриптор
  4   close() Закрывает соединение
  5   poll([timeout]) Возращает управление немедленно, если timeout=0.00 секунд или по истечению задержки, либо когда есть данные.
  6   send_bytes(buffer[, offset[, size]]) Отправляет байтовую последовательность в buffer, с учетом размера и смещения в нем. Где Buffer является io.BytesIO
  7   recv_bytes([maxlength]) Принимает байтовую последовательность с указанной максимальной длинной  В случае , если maxlength указан и сообщение длинее чем значение указанное в нем, тогда будет возникать OSError, а соединение больше не будет читабильно.
  8   recv_bytes_into(buffer[, offset]) Прием байтовой последовательности внутри указанного буфера с учетом смещения в нем. Также как и в recv() и recv_bytes() возникать ошибка EOFError .

Для чтения/записи бинарных данных из/в стандартные потоки, используется нижележащий объект buffer [3.2], который в свою очередь является экземпляром BufferedIOBase, что связан с TextIOBase, но не является его часть, потому что последний может не использоваться в некоторых решениях.

Примечания. Также, есть один нюанс, который выражается в парности использования методов send()/recv() и send_bytes() / recv_bytes(). Таким образом, отправленная байтовая последовательность send_bytes(), может быть принята только recv_bytes(), а посланный объект obj методом send() может быть получен только recv(). Поэтому будьте внимательны, чтобы не ломать голову почему ничего не принимается и не тратить драгоценное время попусту.

Соответственно, все потоки sys.stdin, sys.stdout, sys.stderr относят к нас к следующим атрибутам и методам, которые перечислены в таблице 2.2

Таблица 2.2
## Метод/атрибут назначения Назначение
  1   Raw Атрибут неформатированного потока. Следует избегать использования, т.к. может не использоваться в некоторых решениях.
  2   detach() Отделяет неформатированный поток от буфера и возвращает последний.
  3   read(size=-1) Читает и возвращает некоторую последовательность байтов, указанных размером в size.
Если
  • size=-1, читает до упора, до самого конца (EOF). В случае отсутствия данных тут же возвращается, если поток уже находится на EOF;
  • аргумент имеет положительное значение будет прочитано столько байт сколько указано в size или до тех пор пока не будет достигнут EOF.
  4   write(b) Записывает некоторую последовательность байтов объекта b и возвращает число записанных байт. При этом, это число должно согласоваться с байтовой длинной объекта b . Иначе, возникнет исключение OSError в случае ошибки записи.
  5   readinto(b) Читает последовательность байт в заранее выделенный объект b и возвращает число прочитанных байтов. Для примера объект b может быть создан с bytearray()

В таблице 2.3 приведено соответствие методов базовых классов BufferedIOBase и multiprocessing.connection для их производных Pipe() и sys.[stdout|stdin|sderr].buffer()

Таблица 2.3
## Метод класса BufferedIOBase Метод класса multiprocessing.connection Назначение
  1   read(size=-1) recv_bytes([maxlength]) Чтение некоторой байтовой последовательности указанного размера в size. Байтовая последовательность возвращается методом read
  2   write(b) send_bytes(buffer[, offset[, size]]) запись/передача некоторой байтовой последовательности с указанием размера и смещения внутри выделенной заранее памяти
  3   readinto(b) recv_bytes_into(buffer[, offset]) Чтение/получение байтовой последовательности в заранее выделенную память

Таким образом, как видно из таблицы 2.3, что методы read(), write() и close() класса BufferedIOBase сошлись только с recv_bytes(), send_bytes() и close() класса multiprocessing.connection.Connection

Поэтому все сводится к написанию надстроечного класса над UnifiedPipeConnection, пример определения и реализации которого приведен в листинге 2.4

Листинг 2.4

...
class UnifiedPipeConnection():
...
      def read(size=-1) :
...
           return b

      def write(b) :
...
           return 

      def close() :
...

Нюанс заключается в том, что метод send() / recv(), как и write()/read(), оперируют с чисто байтовой последовательностью, возвращаемой bytes(). Также определяется метод close(), который может гипотетически быть задействован. В листинге 2.5 приводится код реализующий всю схему взаимодействия pexpect

Листинг 2.5

  23 class UnifiedPipeConnection ():
...
  35      def __init__(self,direction=True) :
  36          if direction : 
  37              self.parent_conn, self.child_conn = Pipe(duplex=False)
  38          else :
  39              self.child_conn, self.parent_conn = Pipe(duplex=False)   
  40          self.dir = direction
...
  41      def read(self, size=-1) :
...
  59      def write(self,b) : 
...
  76      def _close(self) :
... 
120    def forward_child(conn):
121        n=conn.write(b'hello world! I\'m writting in Python')
122        print ( 'Child: write %d bytes' % n )
123        conn._close()
124
125    forward = UnifiedPipeConnection()   
126    child = Process(target=forward_child, args=(forward,))
127    child.start()
128
129    b=forward.read() 
130    print(str(b))
131    child.join()
...
146    def back_child(conn):
147        b=conn.read() 
148        print ( 'Child: read %d bytes' % len(b) , ': ' + str(b) )       
149        conn._close()
150
151    backward = UnifiedPipeConnection(direction=False)   
152    child = child(target=back_child, args=(backward,))
153    child.start()
154    
155    n=backward.write(b'And I can write backwards and forwards')
156    print ( 'Parent: write %d bytes' % n )
157    process.join()                
...

Как показано в строках 35-40 листинга класс UnifiedPipeConnection в своем конструкторе организуется направление передачи данных, которая производится через Pipe() по двум схемам :

Для прямой схемы обмена

Для обратной схемы обмена

Метод write()

Детализация метода приведена в листинге 2.6

Листинг 2.5

...
  59     def write(self,b) : 
  60         nbytes = 0 
  61         if type(b) == bytes : 
  62            bbuffer = bytes(b)
  63            try : 
  64                  if self.dir :
  65                     self.child_conn.send_bytes(bbuffer)
  66                  else :
  67                     self.parent_conn.send_bytes(bbuffer)
  68                  nbytes = len(bbuffer)
  69            except ValueError as e:
  70                 print("Error {0}".format(str(e, encoding = 'utf-8')))
  71                 raise ValueError(str(e, encoding = 'utf-8'))
  72         else :
  73            raise OSError('Object b isn\'t bytes-like objects!')
  74         return nbytes
...

Из которого можно увидеть, что все операции производятся с байтовой последовательностью и что метод write(), как его тезка в случае определенных пойманных исключений передает их дальше в точку своего вызова . При нормальном завершении процедуры передачи данных, он вернет записанное число байт.

Метод read()

Детализация метода приведена в листинге 2.7

Листинг 2.5

...
  41 def read(self, size=-1) :
  42          __get = (lambda s: None if s < 1 else s )      
  43          try:
  44               if self.dir :
  45                  b = self.parent_conn.recv_bytes(maxlength=__get(size))
  46               else :
  47                 b = self.child_conn.recv_bytes(maxlength=__get(size))
  48         except EOFError as e1:
  49                  print("Error {0}".format(str(e1, encoding = 'utf-8')))
  50                  raise EOFError(str(e1, encoding = 'utf-8'))   
  51                  return ""
  52         except OSError as e2:
  53                  print("Error {0}".format(str(e2, encoding = 'utf-8')))
  54                  raise EOFError(str(e2, encoding = 'utf-8'))
  55                  return ""
  56         return bytes(b) 
  57
  58     """ Write the given bytes buffer to the IO stream/  
  59         Return the number of bytes written, which is always the length of b in bytes.""" 
...

Метод close()

реализация приведена в листинге 2.8

Листинг 2.8

...
  76     def _close(self) :
  77           if direction : 
  78               self.child_conn.close()
  79           else:
  80               self.parent_conn.close()
...

Из которого видно, что в основном этот метод будет использоваться в дочерних процессах, как было показано для обоих схем обмена данных в строке 123 и 129 листинга 2.5

Сопряжение STDOUT и Pipe()

Как это было показано в листинге 1.1, для сопряжение pexpect с Pipe() , который задействует STDOUT нужно создать Pipe() с помощь класса UnifiedPipeConnection канал, реализованный в виде объекта outconn в листинге 2.9

Листинг 2.8

...
outconn =  UnifiedPipeConnection()
...
child=Process(target=fbody,args=(self, outconn,))
...
def  fbody ( outconn ) :
...
       pchild = pexpect.spawn('ssh -l %s %s'%(user, host))
       pchild.logfile = outconn
...
       i = pchild.expect([pexpect.TIMEOUT, SSH_NEWKEY, '[Pp]assword: ', '\\$|#|\\] > $'])
...

def parent_reader() :
        while True :
...
              bmsg=outconn.read()
                          if  bmsg :
                              print(bmsg)

В результате, как это было показано в дампе 1.2, дочернему процессу fbody() будет передана ссылка на объект outconn, а при выполнении pchild.expect() не явно будет вызван outconn.write(), соответственно в родительском процессе в функции parent_reader() в бесконечном цикле с помощью outconn.read() будет принята вся переданная последовательность байт.

Кроме того, дополнительно мне потребовалось решить проблему попытки обращения к методу flush(), как показано в дампе 2.10

Листинг 2.10

...
 File "/usr/lib/python3/dist-packages/pexpect/pty_spawn.py", line 501, in read_nonblocking
    return super(spawn, self).read_nonblocking(size)
  File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 182, in read_nonblocking
    self._log(s, 'read')
  File "/usr/lib/python3/dist-packages/pexpect/spawnbase.py", line 130, in _log
    self.logfile.flush()
AttributeError: 'UnifiedPipeConnection' object has no attribute 'flush'
...

Как показано в листинге 2.11, это было решено путем отправки в канал символа конца передачи EOF(b'\x04'), тем самым извещая терминальное окно о конце передачи и необходимости сброса своих внутренних буферов.

Листинг 2.10

...
  23 class UnifiedPipeConnection ():
...
121      def flush(self) :
122          bbuffer = bytes(b'\x04')
123          conn=(lambda d : self.child_conn if d else self.parent_conn)(self.dir) 
124          conn.send_bytes( bbuffer ) # send EOF (End of File)
125          return
...

 

3. Библиография

3.1 Python. Class multiprocessing.connection.Connection

3.2 Python. System-specific parameters and functions

3.3 Python. Class io.TextIOBase

3.4 Python. class io.BufferedIOBase

3.5 Pycopy Documentation[pdf]

Сайт разработан в соответствии с рекомендациями консорциума W3C для языка разметки HTML5.

Об авторе можно прочитать здесь.

Copyright © 2015-2019 Андрей Ржавсков