"Fossies" - the Fresh Open Source Software Archive 
Member "cinder-14.0.2/cinder/tests/unit/volume/drivers/fusionstorage/test_fs_conf.py" (4 Oct 2019, 6224 Bytes) of package /linux/misc/openstack/cinder-14.0.2.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
1 # Copyright (c) 2018 Huawei Technologies Co., Ltd.
2 # All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15 import ddt
16
17 import mock
18 import os
19 import shutil
20 import tempfile
21
22 from six.moves import configparser
23
24 from cinder import test
25 from cinder.volume.drivers.fusionstorage import fs_conf
26
27
28 @ddt.ddt
29 class FusionStorageConfTestCase(test.TestCase):
30 def setUp(self):
31 super(FusionStorageConfTestCase, self).setUp()
32 self.tmp_dir = tempfile.mkdtemp()
33 self.conf = mock.Mock()
34 self._create_fake_conf_file()
35 self.fusionstorage_conf = fs_conf.FusionStorageConf(
36 self.conf, "cinder@fs")
37
38 def tearDown(self):
39 shutil.rmtree(self.tmp_dir)
40 super(FusionStorageConfTestCase, self).tearDown()
41
42 def _create_fake_conf_file(self):
43 self.conf.cinder_fusionstorage_conf_file = (
44 self.tmp_dir + '/cinder.conf')
45
46 config = configparser.ConfigParser()
47 config.add_section('backend_name')
48 config.set('backend_name', 'dsware_rest_url', 'https://fake_rest_site')
49 config.set('backend_name', 'san_login', 'fake_user')
50 config.set('backend_name', 'san_password', 'fake_passwd')
51 config.set('backend_name', 'dsware_storage_pools', 'fake_pool')
52
53 config.add_section('manager_ip')
54 config.set('manager_ip', 'fake_host', 'fake_ip')
55 config.write(open(self.conf.cinder_fusionstorage_conf_file, 'w'))
56
57 @mock.patch.object(fs_conf.FusionStorageConf, '_encode_authentication')
58 @mock.patch.object(fs_conf.FusionStorageConf, '_pools_name')
59 @mock.patch.object(fs_conf.FusionStorageConf, '_san_address')
60 @mock.patch.object(fs_conf.FusionStorageConf, '_san_user')
61 @mock.patch.object(fs_conf.FusionStorageConf, '_san_password')
62 def test_update_config_value(self, mock_san_password, mock_san_user,
63 mock_san_address, mock_pools_name,
64 mock_encode_authentication):
65 self.fusionstorage_conf.update_config_value()
66 mock_encode_authentication.assert_called_once_with()
67 mock_pools_name.assert_called_once_with()
68 mock_san_address.assert_called_once_with()
69 mock_san_user.assert_called_once_with()
70 mock_san_password.assert_called_once_with()
71
72 @mock.patch.object(os.path, 'exists')
73 def test__encode_authentication(self, mock_exists):
74 config = configparser.ConfigParser()
75 config.read(self.conf.cinder_fusionstorage_conf_file)
76 mock_exists.return_value = False
77
78 user_name = 'fake_user'
79 self.mock_object(
80 self.fusionstorage_conf.configuration, 'safe_get',
81 return_value=user_name)
82 self.fusionstorage_conf._encode_authentication()
83
84 password = 'fake_passwd'
85 self.mock_object(
86 self.fusionstorage_conf.configuration, 'safe_get',
87 return_value=password)
88 self.fusionstorage_conf._encode_authentication()
89
90 @mock.patch.object(os.path, 'exists')
91 @mock.patch.object(configparser.ConfigParser, 'set')
92 def test__rewrite_conf(self, mock_set, mock_exists):
93 mock_exists.return_value = False
94 mock_set.return_value = "success"
95 self.fusionstorage_conf._rewrite_conf('fake_name', 'fake_pwd')
96
97 def test__san_address(self):
98 address = 'https://fake_rest_site'
99 self.mock_object(
100 self.fusionstorage_conf.configuration, 'safe_get',
101 return_value=address)
102 self.fusionstorage_conf._san_address()
103 self.assertEqual('https://fake_rest_site',
104 self.fusionstorage_conf.configuration.san_address)
105
106 def test__san_user(self):
107 user = '!&&&ZmFrZV91c2Vy'
108 self.mock_object(
109 self.fusionstorage_conf.configuration, 'safe_get',
110 return_value=user)
111 self.fusionstorage_conf._san_user()
112 self.assertEqual(
113 'fake_user', self.fusionstorage_conf.configuration.san_user)
114
115 user = 'fake_user_2'
116 self.mock_object(
117 self.fusionstorage_conf.configuration, 'safe_get',
118 return_value=user)
119 self.fusionstorage_conf._san_user()
120 self.assertEqual(
121 'fake_user_2', self.fusionstorage_conf.configuration.san_user)
122
123 def test__san_password(self):
124 password = '!&&&ZmFrZV9wYXNzd2Q='
125 self.mock_object(
126 self.fusionstorage_conf.configuration, 'safe_get',
127 return_value=password)
128 self.fusionstorage_conf._san_password()
129 self.assertEqual(
130 'fake_passwd', self.fusionstorage_conf.configuration.san_password)
131
132 password = 'fake_passwd_2'
133 self.mock_object(
134 self.fusionstorage_conf.configuration, 'safe_get',
135 return_value=password)
136 self.fusionstorage_conf._san_password()
137 self.assertEqual('fake_passwd_2',
138 self.fusionstorage_conf.configuration.san_password)
139
140 def test__pools_name(self):
141 pools_name = 'fake_pool'
142 self.mock_object(
143 self.fusionstorage_conf.configuration, 'safe_get',
144 return_value=pools_name)
145 self.fusionstorage_conf._pools_name()
146 self.assertListEqual(
147 ['fake_pool'], self.fusionstorage_conf.configuration.pools_name)
148
149 def test__manager_ip(self):
150 manager_ips = {'fake_host': 'fake_ip'}
151 self.mock_object(
152 self.fusionstorage_conf.configuration, 'safe_get',
153 return_value=manager_ips)
154 self.fusionstorage_conf._manager_ip()
155 self.assertDictEqual({'fake_host': 'fake_ip'},
156 self.fusionstorage_conf.configuration.manager_ips)