"Fossies" - the Fresh Open Source Software Archive 
Member "monasca-log-api-2.9.0/monasca_log_api/app/base/log_publisher.py" (1 Apr 2019, 8414 Bytes) of package /linux/misc/openstack/monasca-log-api-2.9.0.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
For more information about "log_publisher.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
2.7.0_vs_2.9.0.
1 # Copyright 2015 kornicameister@gmail.com
2 # Copyright 2016-2017 FUJITSU LIMITED
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15
16 import time
17
18 import falcon
19 from monasca_common.kafka import producer
20 from monasca_common.rest import utils as rest_utils
21 from oslo_log import log
22 from oslo_utils import encodeutils
23
24 from monasca_log_api.app.base import model
25 from monasca_log_api import conf
26 from monasca_log_api.monitoring import client
27 from monasca_log_api.monitoring import metrics
28
LOG = log.getLogger(__name__)  # module-level logger
CONF = conf.CONF  # application configuration object

# Retry-After value (seconds) reported to clients when Kafka is unavailable.
_RETRY_AFTER = 60
# Byte size of a millisecond-epoch timestamp string; used to account for the
# kafka message key when computing the serialized envelope size.
_TIMESTAMP_KEY_SIZE = len(
    bytearray(str(int(time.time() * 1000)).encode('utf-8')))
# Byte size of the '"truncated": true' property added to an envelope
# whose log message gets truncated.
_TRUNCATED_PROPERTY_SIZE = len(
    bytearray('"truncated": true'.encode('utf-8')))
# Extra bytes reserved for per-message kafka metadata overhead.
# NOTE(review): 32 looks like an assumed overhead -- confirm against the
# kafka client actually in use.
_KAFKA_META_DATA_SIZE = 32
# Safety margin (bytes) added when computing how much to truncate.
_TRUNCATION_SAFE_OFFSET = 1
39
40
class InvalidMessageException(Exception):
    """Raised when a message is not a valid log envelope.

    See :py:meth:`LogPublisher._is_message_valid` for the validity check.
    """
    pass
43
44
class LogPublisher(object):
    """Publishes log data to Kafka

    LogPublisher is able to send single message to multiple configured topic.
    It uses following configuration written in conf file ::

        [log_publisher]
        topics = 'logs'
        kafka_url = 'localhost:8900'

    Note:
        Uses :py:class:`monasca_common.kafka.producer.KafkaProducer`
        to ship logs to kafka. For more details
        see `monasca_common`_ github repository.

    .. _monasca_common: https://github.com/openstack/monasca-common

    """

    def __init__(self):
        self._topics = CONF.log_publisher.topics
        self.max_message_size = CONF.log_publisher.max_message_size

        self._kafka_publisher = producer.KafkaProducer(
            url=CONF.log_publisher.kafka_url
        )
        if CONF.monitoring.enable:
            self._statsd = client.get_client()

            # setup counter, gauges etc
            self._logs_published_counter = self._statsd.get_counter(
                metrics.LOGS_PUBLISHED_METRIC
            )
            self._publish_time_ms = self._statsd.get_timer(
                metrics.LOGS_PUBLISH_TIME_METRIC
            )
            self._logs_lost_counter = self._statsd.get_counter(
                metrics.LOGS_PUBLISHED_LOST_METRIC
            )
            self._logs_truncated_gauge = self._statsd.get_gauge(
                metrics.LOGS_TRUNCATED_METRIC
            )

        LOG.info('Initializing LogPublisher <%s>', self)

    def send_message(self, messages):
        """Sends message to each configured topic.

        Note:
            Falsy messages (i.e. empty) are not shipped to kafka

        See also
            * :py:class:`monasca_log_api.common.model.Envelope`
            * :py:meth:`._is_message_valid`

        :param dict|list messages: instance (or instances) of log envelope
        """

        if not messages:
            return
        if not isinstance(messages, list):
            messages = [messages]

        sent_counter = 0
        num_of_msgs = len(messages)

        LOG.debug('About to publish %d messages to %s topics',
                  num_of_msgs, self._topics)

        try:
            send_messages = []

            # validate/truncate/serialize each envelope before shipping
            for message in messages:
                msg = self._transform_message(message)
                send_messages.append(msg)

            # when monitoring is on, time the publish call as a whole
            if CONF.monitoring.enable:
                with self._publish_time_ms.time(name=None):
                    self._publish(send_messages)
            else:
                self._publish(send_messages)

            sent_counter = len(send_messages)
        except Exception:
            LOG.exception('Failure in publishing messages to kafka')
            # bare raise preserves the original traceback
            # (``raise ex`` would re-raise from this frame instead)
            raise
        finally:
            # always report how many messages made it out vs. were expected
            self._after_publish(sent_counter, num_of_msgs)

    def _transform_message(self, message):
        """Transforms message into JSON.

        Method executes transformation operation for
        single element. Operation is set of following
        operations:

        * checking if message is valid
          (:py:func:`.LogPublisher._is_message_valid`)
        * truncating message if necessary
          (:py:func:`.LogPublisher._truncate`)

        :param model.Envelope message: instance of message
        :return: serialized message
        :rtype: str
        :raises InvalidMessageException: if message is not a valid envelope
        """
        if not self._is_message_valid(message):
            raise InvalidMessageException()
        truncated = self._truncate(message)
        return encodeutils.safe_encode(truncated, incoming='utf-8')

    def _truncate(self, envelope):
        """Truncates the message if needed.

        Each message send to kafka is verified.
        Method checks if message serialized to json
        exceeds maximum allowed size that can be posted to kafka
        queue. If so, method truncates message property of the log
        by difference between message and allowed size.

        :param Envelope envelope: original envelope
        :return: serialized message
        :rtype: str
        """

        msg_str = model.serialize_envelope(envelope)
        # projected size on the wire: payload + timestamp key + kafka overhead
        envelope_size = ((len(bytearray(msg_str, 'utf-8', 'replace')) +
                          _TIMESTAMP_KEY_SIZE +
                          _KAFKA_META_DATA_SIZE)
                         if msg_str is not None else -1)

        diff_size = ((envelope_size - self.max_message_size) +
                     _TRUNCATION_SAFE_OFFSET)

        if diff_size > 1:
            # account for the extra '"truncated": true' property that will
            # be added to the envelope below
            truncated_by = diff_size + _TRUNCATED_PROPERTY_SIZE

            LOG.warning(('Detected message that exceeds %d bytes,'
                         'message will be truncated by %d bytes'),
                        self.max_message_size,
                        truncated_by)

            log_msg = envelope['log']['message']
            truncated_log_msg = log_msg[:-truncated_by]

            envelope['log']['truncated'] = True
            envelope['log']['message'] = truncated_log_msg
            if CONF.monitoring.enable:
                self._logs_truncated_gauge.send(name=None, value=truncated_by)

            # re-serialize with the shortened message
            msg_str = rest_utils.as_json(envelope)
        else:
            if CONF.monitoring.enable:
                self._logs_truncated_gauge.send(name=None, value=0)

        return msg_str

    def _publish(self, messages):
        """Publishes messages to kafka.

        :param list messages: list of messages
        :raises falcon.HTTPServiceUnavailable: if publishing to kafka fails
        """
        num_of_msg = len(messages)

        LOG.debug('Publishing %d messages', num_of_msg)

        try:
            for topic in self._topics:
                self._kafka_publisher.publish(
                    topic,
                    messages
                )
                LOG.debug('Sent %d messages to topic %s', num_of_msg, topic)
        except Exception as ex:
            # use the module-level constant (previously a hard-coded 60)
            # as the Retry-After value reported to the client
            raise falcon.HTTPServiceUnavailable('Service unavailable',
                                                str(ex), _RETRY_AFTER)

    @staticmethod
    def _is_message_valid(message):
        """Validates message before sending.

        Methods checks if message is :py:class:`model.Envelope`.
        By being instance of this class it is ensured that all required
        keys are found and they will have their values.

        """
        return message and isinstance(message, model.Envelope)

    def _after_publish(self, send_count, to_send_count):
        """Executed after publishing to sent metrics.

        :param int send_count: how many messages have been sent
        :param int to_send_count: how many messages should be sent

        """

        failed_to_send = to_send_count - send_count

        if failed_to_send == 0:
            LOG.debug('Successfully published all [%d] messages',
                      send_count)
        else:
            error_str = ('Failed to send all messages, %d '
                         'messages out of %d have not been published')
            LOG.error(error_str, failed_to_send, to_send_count)
        if CONF.monitoring.enable:
            self._logs_published_counter.increment(value=send_count)
            self._logs_lost_counter.increment(value=failed_to_send)