1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 """Channel notifications support.
16
17 Classes and functions to support channel subscriptions and notifications
18 on those channels.
19
20 Notes:
21 - This code is based on experimental APIs and is subject to change.
22 - Notification does not do deduplication of notification ids, that's up to
23 the receiver.
24 - Storing the Channel between calls is up to the caller.
25
26
27 Example setting up a channel:
28
29 # Create a new channel that gets notifications via webhook.
30 channel = new_webhook_channel("https://example.com/my_web_hook")
31
32 # Store the channel, keyed by 'channel.id'. Store it before calling the
33 # watch method because notifications may start arriving before the watch
34 # method returns.
35 ...
36
37 resp = service.objects().watchAll(
38 bucket="some_bucket_id", body=channel.body()).execute()
39 channel.update(resp)
40
41 # Store the channel, keyed by 'channel.id'. Store it after being updated
42 # since the resource_id value will now be correct, and that's needed to
43 # stop a subscription.
44 ...
45
46
47 An example Webhook implementation using webapp2. Note that webapp2 puts
48 headers in a case insensitive dictionary, as headers aren't guaranteed to
49 always be upper case.
50
51 id = self.request.headers[X_GOOG_CHANNEL_ID]
52
53 # Retrieve the channel by id.
54 channel = ...
55
56 # Parse notification from the headers, including validating the id.
57 n = notification_from_headers(channel, self.request.headers)
58
59 # Do app specific stuff with the notification here.
60 if n.state == 'sync':
61 # Code to handle sync state.
62 elif n.state == 'exists':
63 # Code to handle the exists state.
64 elif n.state == 'not_exists':
65 # Code to handle the not exists state.
66
67
68 Example of unsubscribing.
69
70 service.channels().stop(body=channel.body()).execute()
71 """
72 from __future__ import absolute_import
73
74 import datetime
75 import uuid
76
77 from googleapiclient import errors
78 from googleapiclient import _helpers as util
79 import six
80
81
82
# The Unix epoch as a naive datetime (to be read as UTC). It is built from
# explicit fields instead of datetime.utcfromtimestamp(0), which is deprecated
# as of Python 3.12; the resulting value is identical.
EPOCH = datetime.datetime(1970, 1, 1)


# Maps the field names used in the JSON channel description to the names of
# the matching attributes on Channel.
CHANNEL_PARAMS = {
    "address": "address",
    "id": "id",
    "expiration": "expiration",
    "params": "params",
    "resourceId": "resource_id",
    "resourceUri": "resource_uri",
    "type": "type",
    "token": "token",
}

# Names (upper-cased) of the webhook request headers that carry the details of
# a notification.
X_GOOG_CHANNEL_ID = "X-GOOG-CHANNEL-ID"
X_GOOG_MESSAGE_NUMBER = "X-GOOG-MESSAGE-NUMBER"
X_GOOG_RESOURCE_STATE = "X-GOOG-RESOURCE-STATE"
X_GOOG_RESOURCE_URI = "X-GOOG-RESOURCE-URI"
X_GOOG_RESOURCE_ID = "X-GOOG-RESOURCE-ID"
def _upper_header_keys(headers):
    """Return a copy of the headers with every key upper-cased.

    Header names are not guaranteed to arrive in any particular case, so
    they are normalised before being compared with the X_GOOG_* constants.

    Args:
      headers: dict, A dictionary like object mapping header names to values.

    Returns:
      A new plain dict with the same values keyed by the upper-cased names.
      If two keys differ only by case, the one iterated last wins.
    """
    return {name.upper(): value for name, value in headers.items()}
110
class Notification(object):
    """A Notification from a Channel.

    Notifications are not usually constructed directly, but are returned
    from functions like notification_from_headers().

    Attributes:
      message_number: int, The unique id number of this notification.
      state: str, The state of the resource being monitored.
      resource_uri: str, The address of the resource being monitored.
      resource_id: str, The unique identifier of the version of the resource at
        this event.
    """

    @util.positional(5)
    def __init__(self, message_number, state, resource_uri, resource_id):
        """Notification constructor.

        Args:
          message_number: int, The unique id number of this notification.
          state: str, The state of the resource being monitored. Can be one
            of "exists", "not_exists", or "sync".
          resource_uri: str, The address of the resource being monitored.
          resource_id: str, The identifier of the watched resource.
        """
        self.message_number = message_number
        self.state = state
        self.resource_uri = resource_uri
        self.resource_id = resource_id
141
class Channel(object):
    """A Channel for notifications.

    Usually not constructed directly, instead it is returned from helper
    functions like new_webhook_channel().

    Attributes:
      type: str, The type of delivery mechanism used by this channel. For
        example, 'web_hook'.
      id: str, A UUID for the channel.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each event delivered
        over this channel.
      address: str, The address of the receiving entity where events are
        delivered. Specific to the channel type.
      expiration: int, The time, in milliseconds from the epoch, when this
        channel will expire.
      params: dict, A dictionary of string to string, with additional parameters
        controlling delivery channel behavior.
      resource_id: str, An opaque id that identifies the resource that is
        being watched. Stable across different API versions.
      resource_uri: str, The canonicalized ID of the watched resource.
    """

    @util.positional(5)
    def __init__(
        self,
        type,
        id,
        token,
        address,
        expiration=None,
        params=None,
        resource_id="",
        resource_uri="",
    ):
        """Create a new Channel.

        In user code, this Channel constructor will not typically be called
        manually since there are functions for creating channels for each specific
        type with a more customized set of arguments to pass.

        Args:
          type: str, The type of delivery mechanism used by this channel. For
            example, 'web_hook'.
          id: str, A UUID for the channel.
          token: str, An arbitrary string associated with the channel that
            is delivered to the target address with each event delivered
            over this channel.
          address: str, The address of the receiving entity where events are
            delivered. Specific to the channel type.
          expiration: int, The time, in milliseconds from the epoch, when this
            channel will expire.
          params: dict, A dictionary of string to string, with additional parameters
            controlling delivery channel behavior.
          resource_id: str, An opaque id that identifies the resource that is
            being watched. Stable across different API versions.
          resource_uri: str, The canonicalized ID of the watched resource.
        """
        self.type = type
        self.id = id
        self.token = token
        self.address = address
        self.expiration = expiration
        self.params = params
        self.resource_id = resource_id
        self.resource_uri = resource_uri

    def body(self):
        """Build a body from the Channel.

        Constructs a dictionary that's appropriate for passing into watch()
        methods as the value of body argument.

        Returns:
          A dictionary representation of the channel. The "id", "token",
          "type" and "address" keys are always present; "params",
          "resourceId", "resourceUri" and "expiration" are included only
          when the corresponding attribute is truthy.
        """
        result = {
            "id": self.id,
            "token": self.token,
            "type": self.type,
            "address": self.address,
        }
        # Optional fields are left out entirely when unset (None, "", {} or 0).
        if self.params:
            result["params"] = self.params
        if self.resource_id:
            result["resourceId"] = self.resource_id
        if self.resource_uri:
            result["resourceUri"] = self.resource_uri
        if self.expiration:
            result["expiration"] = self.expiration

        return result

    def update(self, resp):
        """Update a channel with information from the response of watch().

        When a request is sent to watch() a resource, the response returned
        from the watch() request is a dictionary with updated channel information,
        such as the resource_id, which is needed when stopping a subscription.

        Args:
          resp: dict, The response from a watch() method.
        """
        for json_name, param_name in CHANNEL_PARAMS.items():
            value = resp.get(json_name)
            # Fields missing from the response (or null) keep their current value.
            if value is not None:
                setattr(self, param_name, value)
251
def notification_from_headers(channel, headers):
    """Parse a notification from the webhook request headers, validate
    the notification, and return a Notification object.

    Args:
      channel: Channel, The channel that the notification is associated with.
      headers: dict, A dictionary like object that contains the request headers
        from the webhook HTTP request.

    Returns:
      A Notification object.

    Raises:
      errors.InvalidNotificationError if the notification is invalid.
      ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
      KeyError if one of the required X-GOOG-* headers is absent.
    """
    # Header names may arrive in any case; normalise before looking them up.
    headers = _upper_header_keys(headers)
    channel_id = headers[X_GOOG_CHANNEL_ID]
    if channel.id != channel_id:
        raise errors.InvalidNotificationError(
            "Channel id mismatch: %s != %s" % (channel.id, channel_id)
        )

    message_number = int(headers[X_GOOG_MESSAGE_NUMBER])
    state = headers[X_GOOG_RESOURCE_STATE]
    resource_uri = headers[X_GOOG_RESOURCE_URI]
    resource_id = headers[X_GOOG_RESOURCE_ID]
    return Notification(message_number, state, resource_uri, resource_id)
281
def new_webhook_channel(url, token=None, expiration=None, params=None):
    """Create a new webhook Channel.

    Args:
      url: str, URL to post notifications to.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each notification delivered
        over this channel.
      expiration: datetime.datetime, A time in the future when the channel
        should expire. Can also be None if the subscription should use the
        default expiration. A naive value is taken to be UTC; a timezone-aware
        value is converted to UTC. Note that different services may have
        different limits on how long a subscription lasts. Check the response
        from the watch() method to see the value the service has set for an
        expiration time.
      params: dict, Extra parameters to pass on channel creation. Currently
        not used for webhook channels.

    Returns:
      A Channel of type "web_hook" with a freshly generated UUID as its id.
      Its expiration is an int number of milliseconds since the epoch, or 0
      when no expiration was given or the given time precedes the epoch.
    """
    expiration_ms = 0
    if expiration:
        # EPOCH is naive, so an aware datetime must be shifted to UTC and
        # stripped of its tzinfo before the subtraction (mixing the two raises
        # TypeError).
        offset = expiration.utcoffset()
        if offset is not None:
            expiration = (expiration - offset).replace(tzinfo=None)

        delta = expiration - EPOCH
        # Floor division keeps the result an int on Python 3 as well; true
        # division would turn the millisecond timestamp into a float.
        expiration_ms = (
            delta.microseconds // 1000
            + (delta.seconds + delta.days * 24 * 3600) * 1000
        )
        if expiration_ms < 0:
            expiration_ms = 0

    return Channel(
        "web_hook",
        str(uuid.uuid4()),
        token,
        url,
        expiration=expiration_ms,
        params=params,
    )
318