"Fossies" - the Fresh Open Source Software Archive

Member "freezer-10.0.0/freezer/tests/unit/utils/test_streaming.py" (14 Apr 2021, 2278 Bytes) of package /linux/misc/openstack/freezer-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file.

    1 # (c) Copyright 2020 ZTE Corporation.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #     http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12 # See the License for the specific language governing permissions and
   13 # limitations under the License.
   14 
   15 import queue
   16 import unittest
   17 
   18 from freezer.utils import streaming
   19 
   20 
   21 class StreamingTestCase(unittest.TestCase):
   22 
   23     def create_fifo(self, size):
   24         input_queue = streaming.RichQueue(size)
   25         read_except_queue = queue.Queue()
   26         write_except_queue = queue.Queue()
   27 
   28         read_stream = streaming.QueuedThread(self.backup_stream,
   29                                              input_queue,
   30                                              read_except_queue)
   31 
   32         write_stream = streaming.QueuedThread(self.write_stream,
   33                                               input_queue,
   34                                               write_except_queue)
   35 
   36         read_stream.daemon = True
   37         write_stream.daemon = True
   38         read_stream.start()
   39         write_stream.start()
   40         read_stream.join()
   41         write_stream.join()
   42 
   43         return input_queue
   44 
   45     def backup_stream(self, rich_queue):
   46         rich_queue.put_messages('ancd')
   47 
   48     def write_stream(self, rich_queue):
   49         for message in rich_queue.get_messages():
   50             pass
   51 
   52     def test_stream_1(self):
   53         input_queue = self.create_fifo(1)
   54         assert input_queue.finish_transmission is True
   55 
   56     def test_stream_2(self):
   57         input_queue = self.create_fifo(2)
   58         assert input_queue.finish_transmission is True
   59 
   60     def test_stream_3(self):
   61         input_queue = self.create_fifo(3)
   62         assert input_queue.finish_transmission is True
   63 
   64     def test_stream_4(self):
   65         input_queue = self.create_fifo(4)
   66         assert input_queue.finish_transmission is True
   67 
   68     def test_stream_5(self):
   69         input_queue = self.create_fifo(5)
   70         assert input_queue.finish_transmission is True