| Home | Trees | Indices | Help |
|---|
|
|
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
17 # under the License.
18 #
19
20 import atexit
21 import functools
22 import os
23 import sys
24 import time
25 import weakref
26
27 try:
28 import opentracing
29 import jaeger_client
30 from opentracing.ext import tags
31 from opentracing.propagation import Format
32 except ImportError:
33 raise ImportError('proton tracing requires opentracing and jaeger_client modules')
34
35 import proton
36 from proton import Sender as ProtonSender
37 from proton.handlers import (
38 OutgoingMessageHandler as ProtonOutgoingMessageHandler,
39 IncomingMessageHandler as ProtonIncomingMessageHandler
40 )
41
42 _tracer = None
43 _trace_key = proton.symbol('x-opt-qpid-tracestate')
44
46 global _tracer
47 if _tracer is not None:
48 return _tracer
49 exe = sys.argv[0] if sys.argv[0] else 'interactive-session'
50 return init_tracer(os.path.basename(exe))
51
53 time.sleep(1)
54 c = opentracing.global_tracer().close()
55 while not c.done():
56 time.sleep(0.5)
57
59 global _tracer
60 if _tracer is not None:
61 return _tracer
62
63 config = jaeger_client.Config(
64 config={},
65 service_name=service_name,
66 validate=True
67 )
68 config.initialize_tracer()
69 _tracer = opentracing.global_tracer()
70 # A nasty hack to ensure enough time for the tracing data to be flushed
71 atexit.register(_fini_tracer)
72 return _tracer
73
74
77 if self.delegate is not None:
78 tracer = get_tracer()
79 message = event.message
80 receiver = event.receiver
81 connection = event.connection
82 span_tags = {
83 tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER,
84 tags.MESSAGE_BUS_DESTINATION: receiver.source.address,
85 tags.PEER_ADDRESS: connection.connected_address,
86 tags.PEER_HOSTNAME: connection.hostname,
87 'inserted_by': 'proton-message-tracing'
88 }
89 if message.annotations is not None:
90 headers = message.annotations[_trace_key]
91 span_ctx = tracer.extract(Format.TEXT_MAP, headers)
92 with tracer.start_active_span('amqp-delivery-receive', child_of=span_ctx, tags=span_tags):
93 proton._events._dispatch(self.delegate, 'on_message', event)
94 else:
95 with tracer.start_active_span('amqp-delivery-receive', ignore_active_span=True, tags=span_tags):
96 proton._events._dispatch(self.delegate, 'on_message', event)
97
100 if self.delegate is not None:
101 delivery = event.delivery
102 state = delivery.remote_state
103 span = delivery.span
104 span.set_tag('delivery-terminal-state', state.name)
105 span.log_kv({'event': 'delivery settled', 'state': state.name})
106 span.finish()
107 proton._events._dispatch(self.delegate, 'on_settled', event)
108
111 tracer = get_tracer()
112 connection = self.connection
113 span_tags = {
114 tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER,
115 tags.MESSAGE_BUS_DESTINATION: self.target.address,
116 tags.PEER_ADDRESS: connection.connected_address,
117 tags.PEER_HOSTNAME: connection.hostname,
118 'inserted_by': 'proton-message-tracing'
119 }
120 span = tracer.start_span('amqp-delivery-send', tags=span_tags)
121 headers = {}
122 tracer.inject(span, Format.TEXT_MAP, headers)
123 if msg.annotations is None:
124 msg.annotations = { _trace_key: headers }
125 else:
126 msg.annotations[_trace_key] = headers
127 delivery = ProtonSender.send(self, msg)
128 delivery.span = span
129 span.set_tag('delivery-tag', delivery.tag)
130 return delivery
131
132 # Monkey patch proton for tracing (need to patch both internal and external names)
133 proton._handlers.IncomingMessageHandler = IncomingMessageHandler
134 proton._handlers.OutgoingMessageHandler = OutgoingMessageHandler
135 proton._endpoints.Sender = Sender
136 proton.handlers.IncomingMessageHandler = IncomingMessageHandler
137 proton.handlers.OutgoingMessageHandler = OutgoingMessageHandler
138 proton.Sender = Sender
139
| Home | Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Tue Oct 15 19:10:47 2019 | http://epydoc.sourceforge.net |