Skip to content

mug.py

mug

Objects and methods related to connection to the mug.

Attributes

DISCONNECT_DELAY module-attribute

DISCONNECT_DELAY = 120

P module-attribute

P = ParamSpec('P')

T module-attribute

T = TypeVar('T')

logger module-attribute

logger = getLogger(__name__)

Classes

EmberMug

Handle the actual mug connection and update states.

Source code in ember_mug/mug.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
class EmberMug:
    """
    Handle the actual mug connection and update states.

    Wraps a Bleak client for a single device, keeps the latest known state in
    ``self.data`` and notifies registered callbacks when that state changes.
    """

    def __init__(
        self,
        ble_device: BLEDevice,
        model_info: ModelInfo,
        use_metric: bool = True,
        debug: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Initialize connection manager.

        Extra keyword arguments are stored as client options via ``set_client_options``.
        """
        self.device = ble_device
        self.data = MugData(model_info, use_metric=use_metric, debug=debug)

        self.debug = debug
        self._connect_lock = asyncio.Lock()
        self._operation_lock = asyncio.Lock()
        self._expected_disconnect = False
        # Maps each registered callback to the function that unregisters it.
        self._callbacks: dict[Callable[[MugData], None], Callable[[], None]] = {}
        self._client: BleakClient = None  # type: ignore[assignment]
        # Attribute names flagged by push events, fetched by update_queued_attributes().
        self._queued_updates: set[str] = set()
        # Last time (epoch seconds) each push event ID was handled, used for debouncing.
        self._latest_events: dict[int, float] = {}
        # NOTE(review): stored by set_client_options() but not read anywhere in this class.
        self._client_kwargs: dict[str, str] = {}

        logger.debug("New mug connection initialized.")
        self.set_client_options(**kwargs)

    def ble_event_callback(self, ble_device: BLEDevice, advertisement_data: AdvertisementData) -> None:
        """Update BLE Device and, if needed, model information."""
        # Log before reassigning, otherwise both placeholders would show the new device.
        logger.debug("Set new device from %s to %s", self.device, ble_device)
        self.device = ble_device
        if (
            not self.data.model_info.model
            and advertisement_data.manufacturer_data
            and (model_info := get_model_info_from_advertiser_data(advertisement_data))
        ):
            logger.debug(
                "Updated model info from advertisement data (%s) -> %s",
                advertisement_data,
                model_info,
            )
            self.data.model_info = model_info
            # model_name is a cached_property: drop any cached value so it reflects the new model info.
            self.__dict__.pop("model_name", None)

    @cached_property
    def model_name(self) -> str | None:
        """Shortcut to model name (``None`` while the model is unknown)."""
        return self.data.model_info.model.value if self.data.model_info.model else None

    @property
    def can_write(self) -> bool:
        """Check if the mug can support write operations (i.e. a UDSK is known)."""
        return self.data.udsk is not None

    def has_attribute(self, attribute: str) -> bool:
        """Check whether the device has the given attribute."""
        return attribute in self.data.model_info.device_attributes

    async def _ensure_connection(self) -> None:
        """
        Connect to mug if there is no active connection.

        Raises:
            asyncio.TimeoutError, BleakError: if the connection cannot be established.
        """
        if self._connect_lock.locked():
            logger.debug("Connection to %s already in progress. Waiting first.", self.device.name)

        if self._client is not None and self._client.is_connected:
            return

        async with self._connect_lock:
            # Also check after lock is acquired
            if self._client is not None and self._client.is_connected:
                return
            try:
                logger.debug("Establishing a new connection from mug (ID: %s) to %s", id(self), self.device)
                client = await establish_connection(
                    client_class=BleakClient,
                    device=self.device,
                    name=f"{self.data.name} ({self.device.address})",
                    disconnected_callback=self._disconnect_callback,
                    ble_device_callback=lambda: self.device,
                )
                if self.debug is True:
                    await discover_services(client)
                self._expected_disconnect = False
            except (asyncio.TimeoutError, BleakError) as error:
                logger.debug("%s: Failed to connect to the mug: %s", self.device, error)
                raise
            # Attempt to pair for good measure
            try:
                await client.pair()
            except (BleakError, EOFError) as error:
                # Pairing is best-effort; keep going but leave a trace for debugging.
                logger.debug("Pairing attempt failed (ignored): %s", error)
            except NotImplementedError:
                # workaround for Home Assistant ESPHome Proxy backend which does not allow pairing.
                logger.warning(
                    "Pairing not implemented. "
                    "If your mug is still in pairing mode (blinking blue) tap the button on the bottom to exit.",
                )
            self._client = client
            await self.subscribe()

    async def _read(self, characteristic: MugCharacteristic) -> bytearray:
        """Help read characteristic from Mug, connecting first if needed."""
        self._check_operation_lock()
        async with self._operation_lock:
            # Mirror _write(): a getter called directly must not hit a missing client.
            await self._ensure_connection()
            data = await self._client.read_gatt_char(characteristic.uuid)
            logger.debug("Read attribute '%s' with value '%s'", characteristic, data)
            return data

    async def _write(self, characteristic: MugCharacteristic, data: bytearray) -> None:
        """
        Help write characteristic to Mug, connecting first if needed.

        Raises:
            BleakError: re-raised after logging when the write fails.
        """
        self._check_operation_lock()
        async with self._operation_lock:
            await self._ensure_connection()
            try:
                await self._client.write_gatt_char(characteristic.uuid, data)
                logger.debug("Wrote '%s' to attribute '%s'", data, characteristic)
            except BleakError as e:
                logger.error("Failed to write '%s' to attribute '%s': %s", data, characteristic, e)
                raise

    async def disconnect(self, expected: bool = True) -> None:
        """Disconnect from mug and stop listening to notifications."""
        logger.debug("%s disconnect called", "Expected" if expected else "Unexpected")
        self._expected_disconnect = expected
        try:
            if self._client and self._client.is_connected:
                async with self._connect_lock:
                    await self.unsubscribe()
                    await self._client.disconnect()
        finally:
            # Reset state even when the disconnect itself fails, so the next
            # operation starts from a fresh client.
            self._client = None  # type: ignore[assignment]
            self._expected_disconnect = False

    def _disconnect_callback(self, client: BleakClient) -> None:
        """Log whether a disconnect reported by the client was expected."""
        if self._expected_disconnect:
            logger.debug("Disconnect callback called")
        else:
            logger.debug("Unexpectedly disconnected")

    def _fire_callbacks(self) -> None:
        """Fire the callbacks with the current data."""
        logger.debug("Firing callbacks: %s", self._callbacks)
        # Iterate over a snapshot: a callback may (un)register callbacks while running.
        for callback in list(self._callbacks):
            callback(self.data)

    def _check_operation_lock(self) -> None:
        """Check and print message if lock occupied."""
        if self._operation_lock.locked():
            logger.debug("Operation already in progress. waiting for it to complete")

    def register_callback(self, callback: Callable[[MugData], None]) -> Callable[[], None]:
        """
        Register a callback to be called when the state changes.

        Returns the function that unregisters the callback; registering the
        same callback twice returns the existing unregister function.
        """
        if existing_unregister_callback := self._callbacks.get(callback):
            logger.debug("Callback %s already registered", callback)
            return existing_unregister_callback

        def unregister_callback() -> None:
            if callback in self._callbacks:
                del self._callbacks[callback]
            logger.debug("Unregistered callback: %s", callback)

        self._callbacks[callback] = unregister_callback
        logger.debug("Registered callback: %s", callback)
        return unregister_callback

    async def discover_services(self) -> dict[str, Any]:
        """
        Discover services for development or debugging.

        Call discover_services with this client, ensuring the connection is active first.
        """
        self._check_operation_lock()
        async with self._operation_lock:
            await self._ensure_connection()
            return await discover_services(self._client)

    async def get_meta(self) -> MugMeta:
        """Fetch Meta info from the mug (Serial number and ID)."""
        return MugMeta.from_bytes(await self._read(MugCharacteristic.MUG_ID))

    async def get_battery(self) -> BatteryInfo:
        """Get Battery percent from mug gatt."""
        return BatteryInfo.from_bytes(await self._read(MugCharacteristic.BATTERY))

    @require_attribute("led_colour")
    async def get_led_colour(self) -> Colour:
        """Get RGBA colours from mug gatt."""
        colour_data = await self._read(MugCharacteristic.LED)
        return Colour(*bytearray(colour_data))

    @require_attribute("led_colour")
    async def set_led_colour(self, colour: Colour) -> None:
        """Set new LED colour for mug."""
        await self._write(MugCharacteristic.LED, colour.as_bytearray())
        self.data.led_colour = colour

    async def get_target_temp(self) -> float:
        """Get target temp from mug gatt."""
        temp_bytes = await self._read(MugCharacteristic.TARGET_TEMPERATURE)
        return temp_from_bytes(temp_bytes, self.data.use_metric)

    async def set_target_temp(self, target_temp: float) -> None:
        """
        Set new target temp for mug.

        ``target_temp`` is expressed in the configured unit (Celsius when
        ``use_metric`` is set, Fahrenheit otherwise).

        Raises:
            ValueError: if the value is neither 0 nor within the allowed range for the unit.
        """
        unit = TemperatureUnit.CELSIUS if self.data.use_metric else TemperatureUnit.FAHRENHEIT
        min_temp, max_temp = MIN_MAX_TEMPS[unit]
        if target_temp != 0 and not (min_temp <= target_temp <= max_temp):
            raise ValueError(f"Temperature should be between {min_temp} and {max_temp} or 0.")

        # The value written is always Celsius; keep the caller's value separate
        # so the cached state stays in the configured unit, like get_target_temp().
        celsius_temp = target_temp
        if self.data.use_metric is False:
            celsius_temp = convert_temp_to_celsius(target_temp)

        # Hundredths of a degree. round() avoids float truncation
        # (e.g. int(57.3 / 0.01) == 5729).
        target = bytearray(round(celsius_temp * 100).to_bytes(2, "little"))
        await self._write(MugCharacteristic.TARGET_TEMPERATURE, target)
        self.data.target_temp = target_temp

    async def get_current_temp(self) -> float:
        """Get current temp from mug gatt."""
        temp_bytes = await self._read(MugCharacteristic.CURRENT_TEMPERATURE)
        return temp_from_bytes(temp_bytes, self.data.use_metric)

    async def get_liquid_level(self) -> int:
        """Get liquid level from mug gatt."""
        liquid_level_bytes = await self._read(MugCharacteristic.LIQUID_LEVEL)
        return bytes_to_little_int(liquid_level_bytes)

    @require_attribute("volume_level")
    async def get_volume_level(self) -> VolumeLevel | None:
        """Get volume level from mug gatt."""
        volume_bytes = await self._read(MugCharacteristic.VOLUME)
        volume_int = bytes_to_little_int(volume_bytes)
        return VolumeLevel.from_state(volume_int)

    @require_attribute("volume_level")
    async def set_volume_level(self, volume: int | VolumeLevel) -> None:
        """
        Set volume_level on Travel Mug.

        Raises:
            ValueError: if ``volume`` is an int other than 0, 1 or 2.
        """
        if not isinstance(volume, VolumeLevel) and isinstance(volume, int) and volume not in (0, 1, 2):
            msg = "Volume level value should be 0, 1, 2 or a VolumeLevel enum"
            raise ValueError(msg)
        volume_level = volume if isinstance(volume, VolumeLevel) else VolumeLevel.from_state(volume)
        await self._write(MugCharacteristic.VOLUME, bytearray([volume_level.state]))
        self.data.volume_level = volume_level

    async def get_liquid_state(self) -> LiquidState:
        """Get liquid state from mug gatt."""
        liquid_state_bytes = await self._read(MugCharacteristic.LIQUID_STATE)
        state = bytes_to_little_int(liquid_state_bytes)
        return LiquidState(state)

    @require_attribute("name")
    async def get_name(self) -> str:
        """Get mug name from gatt."""
        name_bytes: bytearray = await self._read(MugCharacteristic.MUG_NAME)
        return bytes(name_bytes).decode("utf8")

    @require_attribute("name")
    async def set_name(self, name: str) -> None:
        """
        Assign new name to mug.

        Raises:
            ValueError: if ``name`` does not match ``MUG_NAME_REGEX``.
        """
        if MUG_NAME_REGEX.match(name) is None:
            msg = "Name cannot contain any special characters and must be 16 characters or less"
            raise ValueError(msg)
        await self._write(MugCharacteristic.MUG_NAME, bytearray(name.encode("utf8")))
        self.data.name = name

    async def get_udsk(self) -> str | None:
        """Get mug udsk from gatt (``None`` when unset or unreadable)."""
        try:
            data = await self._read(MugCharacteristic.UDSK)
            # Twenty zero bytes means no key has been written.
            if data == bytearray([0] * 20):
                return None
            return decode_byte_string(data)
        except (BleakError, ValueError) as e:
            logger.debug("Unable to read UDSK: %s", e)
        return None

    async def set_udsk(self, udsk: str) -> None:
        """Attempt to write udsk."""
        await self._write(MugCharacteristic.UDSK, bytearray(encode_byte_string(udsk)))
        self.data.udsk = udsk

    async def get_dsk(self) -> str:
        """Get mug dsk from gatt (empty string when unreadable)."""
        try:
            return decode_byte_string(await self._read(MugCharacteristic.DSK))
        except BleakError as e:
            logger.debug("Unable to read DSK: %s", e)
        return ""

    async def get_temperature_unit(self) -> TemperatureUnit:
        """Get mug temp unit (0 means Celsius, anything else Fahrenheit)."""
        unit_bytes = await self._read(MugCharacteristic.TEMPERATURE_UNIT)
        if bytes_to_little_int(unit_bytes) == 0:
            return TemperatureUnit.CELSIUS
        return TemperatureUnit.FAHRENHEIT

    async def set_temperature_unit(self, unit: Literal["°C", "°F"] | TemperatureUnit | Enum) -> None:
        """
        Set mug unit.

        Raises:
            ValueError: if ``unit`` (or its enum value) is not a valid ``TemperatureUnit``.
        """
        text_unit = unit.value if isinstance(unit, Enum) else unit
        # Resolve through the normalised value so any Enum carrying a valid unit works,
        # and so an invalid unit is rejected before anything is written to the mug.
        new_unit = TemperatureUnit(text_unit)
        unit_bytes = bytearray([1 if new_unit == TemperatureUnit.FAHRENHEIT else 0])
        await self._write(MugCharacteristic.TEMPERATURE_UNIT, unit_bytes)
        self.data.temperature_unit = new_unit

    async def ensure_correct_unit(self) -> None:
        """Set mug unit if it's not what we want."""
        desired = TemperatureUnit.CELSIUS if self.data.use_metric else TemperatureUnit.FAHRENHEIT
        if self.data.temperature_unit != desired:
            await self.set_temperature_unit(desired)

    async def get_battery_voltage(self) -> int:
        """Get battery voltage, read from the first byte of the control register data."""
        battery_voltage_bytes = await self._read(MugCharacteristic.CONTROL_REGISTER_DATA)
        return bytes_to_big_int(battery_voltage_bytes[:1])

    async def get_date_time_zone(self) -> datetime | None:
        """Get date and time zone (``None`` when the stored timestamp is not positive)."""
        date_time_zone_bytes = await self._read(MugCharacteristic.DATE_TIME_AND_ZONE)
        # Only the first four bytes (the timestamp) are used here.
        time_value = bytes_to_big_int(date_time_zone_bytes[:4])
        return datetime.fromtimestamp(time_value, UTC) if time_value > 0 else None

    async def get_firmware(self) -> MugFirmwareInfo:
        """Get firmware info."""
        return MugFirmwareInfo.from_bytes(await self._read(MugCharacteristic.FIRMWARE))

    async def update_initial(self) -> list[Change]:
        """Update attributes that don't normally change and don't need to be regularly updated."""
        return await self._update_multiple(INITIAL_ATTRS)

    async def update_all(self) -> list[Change]:
        """Update all standard attributes."""
        return await self._update_multiple(
            self.data.model_info.device_attributes - INITIAL_ATTRS,
        )

    async def _update_multiple(self, attrs: set[str]) -> list[Change]:
        """Update a list of attributes from the mug and fire callbacks on change."""
        logger.debug("Updating the following attributes: %s", attrs)
        await self._ensure_connection()
        # Each attribute "x" is fetched through the matching get_x() coroutine.
        changes = self.data.update_info(**{attr: await getattr(self, f"get_{attr}")() for attr in attrs})
        if changes:
            self._fire_callbacks()
        logger.debug("Attributes updated: %s", changes)
        return changes

    async def update_queued_attributes(self) -> list[Change]:
        """Update all attributes in queue."""
        logger.debug("Updating queued attributes: %s", self._queued_updates)
        if not self._queued_updates:
            return []
        # Copy then clear, so events arriving while we read are queued for the next run.
        queued_updates = set(self._queued_updates)
        self._queued_updates.clear()
        await self._ensure_connection()
        changes = self.data.update_info(**{attr: await getattr(self, f"get_{attr}")() for attr in queued_updates})
        if changes:
            self._fire_callbacks()
        return changes

    def _notify_callback(self, characteristic: BleakGATTCharacteristic, data: bytearray) -> None:
        """
        Push events from the mug to indicate changes.

        The first byte is the event ID. Repeats of the same ID within five
        seconds are ignored; known IDs queue the matching attribute for update.
        """
        if not data:
            # Nothing to identify the event with; avoid an IndexError on data[0].
            logger.debug("Empty notification received from %s", self.model_name)
            return
        event_id = data[0]
        now = time()
        if (last_time := self._latest_events.get(event_id)) and now - last_time < 5:
            return
        self._latest_events[event_id] = now

        if characteristic.uuid == MugCharacteristic.STATISTICS.uuid:
            logger.info("Statistics received from %s (%s) - Data: %s.", self.model_name, event_id, data)
            return

        logger.debug("Push event received from %s (%s) - Data: %s.", self.model_name, event_id, data)

        # Check known IDs
        if event_id in PUSH_EVENT_BATTERY_IDS:
            # 1, 2 and 3 : Battery Change
            if event_id in (
                PushEvent.CHARGER_CONNECTED,
                PushEvent.CHARGER_DISCONNECTED,
            ):
                # Charger state is known from the event itself, so publish it immediately.
                self.data.battery = BatteryInfo(
                    percent=self.data.battery.percent if self.data.battery else 0,
                    on_charging_base=event_id == PushEvent.CHARGER_CONNECTED,
                )
                self._fire_callbacks()
            # All indicate changes in battery
            self._queued_updates.add("battery")
        elif event_id == PushEvent.TARGET_TEMPERATURE_CHANGED:
            self._queued_updates.add("target_temp")
        elif event_id == PushEvent.DRINK_TEMPERATURE_CHANGED:
            self._queued_updates.add("current_temp")
        elif event_id == PushEvent.AUTH_INFO_NOT_FOUND:
            logger.warning("Auth info missing")
        elif event_id == PushEvent.LIQUID_LEVEL_CHANGED:
            self._queued_updates.add("liquid_level")
        elif event_id == PushEvent.LIQUID_STATE_CHANGED:
            self._queued_updates.add("liquid_state")
        elif event_id == PushEvent.BATTERY_VOLTAGE_STATE_CHANGED:
            self._queued_updates.add("battery_voltage")
        else:
            logger.debug("Unknown event received %s", event_id)

    async def unsubscribe(self) -> None:
        """Unsubscribe from Mug notifications (Bleak errors are ignored)."""
        logger.debug("Unsubscribe called")
        if not self._client:
            return
        with contextlib.suppress(BleakError):
            await self._client.stop_notify(MugCharacteristic.PUSH_EVENT.uuid)
            if self.debug:
                await self._client.stop_notify(MugCharacteristic.STATISTICS.uuid)

    async def subscribe(self) -> None:
        """Subscribe to notifications from the mug; failures are logged, not raised."""
        try:
            logger.info("Subscribe to Push Events")
            await self._client.start_notify(MugCharacteristic.PUSH_EVENT.uuid, self._notify_callback)
            if self.debug:
                await self._client.start_notify(MugCharacteristic.STATISTICS.uuid, self._notify_callback)
        except Exception as e:
            logger.warning("Failed to subscribe to state attr: %s", e)

    def set_client_options(self, **kwargs: str) -> None:
        """
        Update options in case they need to be overridden in some cases.

        Raises:
            ValueError: if ``adapter`` is given on a non-Linux platform.
        """
        if kwargs.get("adapter") and IS_LINUX is False:
            msg = "The adapter option is only valid for the Linux BlueZ Backend."
            raise ValueError(msg)
        self._client_kwargs = {**kwargs}

    @contextlib.asynccontextmanager
    async def connection(self, **kwargs: str) -> AsyncIterator[EmberMug]:
        """Establish a connection and close automatically, even if the body raises."""
        self.set_client_options(**kwargs)
        # This will happen automatically, but calling it now will give us immediate feedback
        await self._ensure_connection()
        try:
            yield self
        finally:
            # Without the finally, an exception inside the body would skip the disconnect.
            await self.disconnect()
Attributes
can_write property
can_write: bool

Check if the mug can support write operations.

data instance-attribute
data = MugData(
    model_info, use_metric=use_metric, debug=debug
)
debug instance-attribute
debug = debug
device instance-attribute
device = ble_device
model_name cached property
model_name: str | None

Shortcut to model name.

Functions
ble_event_callback
ble_event_callback(
    ble_device: BLEDevice,
    advertisement_data: AdvertisementData,
) -> None

Update BLE Device and, if needed, model information.

Source code in ember_mug/mug.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def ble_event_callback(self, ble_device: BLEDevice, advertisement_data: AdvertisementData) -> None:
    """
    Update BLE Device and, if needed, model information.

    The model information is only replaced while the current model is unknown
    and the advertisement carries manufacturer data that resolves to a model.
    """
    # Log before reassigning, otherwise both placeholders would show the new device.
    logger.debug("Set new device from %s to %s", self.device, ble_device)
    self.device = ble_device
    if (
        not self.data.model_info.model
        and advertisement_data.manufacturer_data
        and (model_info := get_model_info_from_advertiser_data(advertisement_data))
    ):
        logger.debug(
            "Updated model info from advertisement data (%s) -> %s",
            advertisement_data,
            model_info,
        )
        self.data.model_info = model_info
        # model_name is a cached_property: drop any cached value so it reflects the new model info.
        self.__dict__.pop("model_name", None)
connection async
connection(**kwargs: str) -> AsyncIterator[EmberMug]

Establish a connection and close automatically.

Source code in ember_mug/mug.py
501
502
503
504
505
506
507
508
@contextlib.asynccontextmanager
async def connection(self, **kwargs: str) -> AsyncIterator[EmberMug]:
    """
    Establish a connection and close it automatically.

    Applies the given client options, connects, yields this instance and
    disconnects when the ``async with`` body exits, whether normally or
    through an exception.
    """
    self.set_client_options(**kwargs)
    # This will happen automatically, but calling it now will give us immediate feedback
    await self._ensure_connection()
    try:
        yield self
    finally:
        # Without the finally, an exception raised inside the body would skip the disconnect.
        await self.disconnect()
disconnect async
disconnect(expected: bool = True) -> None

Disconnect from mug and stop listening to notifications.

Source code in ember_mug/mug.py
197
198
199
200
201
202
203
204
205
206
async def disconnect(self, expected: bool = True) -> None:
    """
    Disconnect from mug and stop listening to notifications.

    ``expected`` only affects what is logged (here and by the disconnect
    callback). The client reference and the expected-disconnect flag are
    reset even when unsubscribing or disconnecting raises.
    """
    logger.debug("%s disconnect called", "Expected" if expected else "Unexpected")
    self._expected_disconnect = expected
    try:
        if self._client and self._client.is_connected:
            async with self._connect_lock:
                await self.unsubscribe()
                await self._client.disconnect()
    finally:
        # Reset state even on failure so the next operation starts from a fresh client.
        self._client = None  # type: ignore[assignment]
        self._expected_disconnect = False
discover_services async
discover_services() -> dict[str, Any]

Discover services for development or debugging.

Call discover_services with this client, ensuring the connection is active first.

Source code in ember_mug/mug.py
241
242
243
244
245
246
247
248
249
250
async def discover_services(self) -> dict[str, Any]:
    """
    Return the services discovered on the mug, for development or debugging.

    The operation lock is held while the connection is ensured and the
    discover_services helper is run against the active client.
    """
    self._check_operation_lock()
    async with self._operation_lock:
        await self._ensure_connection()
        services = await discover_services(self._client)
    return services
ensure_correct_unit async
ensure_correct_unit() -> None

Set mug unit if it's not what we want.

Source code in ember_mug/mug.py
377
378
379
380
381
async def ensure_correct_unit(self) -> None:
    """Switch the mug's temperature unit when it differs from the configured preference."""
    if self.data.use_metric:
        wanted = TemperatureUnit.CELSIUS
    else:
        wanted = TemperatureUnit.FAHRENHEIT
    needs_change = self.data.temperature_unit != wanted
    if needs_change:
        await self.set_temperature_unit(wanted)
get_battery async
get_battery() -> BatteryInfo

Get Battery percent from mug gatt.

Source code in ember_mug/mug.py
256
257
258
async def get_battery(self) -> BatteryInfo:
    """Read the battery characteristic and parse it into a ``BatteryInfo``."""
    raw_battery = await self._read(MugCharacteristic.BATTERY)
    return BatteryInfo.from_bytes(raw_battery)
get_battery_voltage async
get_battery_voltage() -> int

Get voltage and charge time.

Source code in ember_mug/mug.py
383
384
385
386
async def get_battery_voltage(self) -> int:
    """Read the control register data and return its first byte as an integer."""
    register_data = await self._read(MugCharacteristic.CONTROL_REGISTER_DATA)
    first_byte = register_data[:1]
    return bytes_to_big_int(first_byte)
get_current_temp async
get_current_temp() -> float

Get current temp from mug gatt.

Source code in ember_mug/mug.py
291
292
293
294
async def get_current_temp(self) -> float:
    """Read the current temperature characteristic and decode it via ``temp_from_bytes``."""
    raw_temp = await self._read(MugCharacteristic.CURRENT_TEMPERATURE)
    metric = self.data.use_metric
    return temp_from_bytes(raw_temp, metric)
get_date_time_zone async
get_date_time_zone() -> datetime | None

Get date and time zone.

Source code in ember_mug/mug.py
388
389
390
391
392
async def get_date_time_zone(self) -> datetime | None:
    """
    Read the mug's stored date/time.

    Returns a UTC ``datetime`` built from the first four bytes of the
    characteristic, or ``None`` when that value is not positive.
    """
    raw = await self._read(MugCharacteristic.DATE_TIME_AND_ZONE)
    timestamp = bytes_to_big_int(raw[:4])
    if timestamp > 0:
        return datetime.fromtimestamp(timestamp, UTC)
    return None
get_dsk async
get_dsk() -> str

Get mug dsk from gatt.

Source code in ember_mug/mug.py
355
356
357
358
359
360
361
async def get_dsk(self) -> str:
    """Read and decode the mug's DSK; return an empty string when the read fails."""
    try:
        raw_dsk = await self._read(MugCharacteristic.DSK)
        return decode_byte_string(raw_dsk)
    except BleakError as err:
        logger.debug("Unable to read DSK: %s", err)
        return ""
get_firmware async
get_firmware() -> MugFirmwareInfo

Get firmware info.

Source code in ember_mug/mug.py
394
395
396
async def get_firmware(self) -> MugFirmwareInfo:
    """Read the firmware characteristic and parse it into a ``MugFirmwareInfo``."""
    raw_firmware = await self._read(MugCharacteristic.FIRMWARE)
    return MugFirmwareInfo.from_bytes(raw_firmware)
get_led_colour async
get_led_colour() -> Colour

Get RGBA colours from mug gatt.

Source code in ember_mug/mug.py
260
261
262
263
264
@require_attribute("led_colour")
async def get_led_colour(self) -> Colour:
    """Read the LED characteristic and unpack its bytes into a ``Colour``."""
    raw_colour = await self._read(MugCharacteristic.LED)
    channels = bytearray(raw_colour)
    return Colour(*channels)
get_liquid_level async
get_liquid_level() -> int

Get liquid level from mug gatt.

Source code in ember_mug/mug.py
296
297
298
299
async def get_liquid_level(self) -> int:
    """Read the liquid level characteristic as a little-endian integer."""
    raw_level = await self._read(MugCharacteristic.LIQUID_LEVEL)
    level = bytes_to_little_int(raw_level)
    return level
get_liquid_state async
get_liquid_state() -> LiquidState

Get liquid state from mug gatt.

Source code in ember_mug/mug.py
318
319
320
321
322
async def get_liquid_state(self) -> LiquidState:
    """Read the liquid state characteristic and map it onto ``LiquidState``."""
    raw_state = await self._read(MugCharacteristic.LIQUID_STATE)
    return LiquidState(bytes_to_little_int(raw_state))
get_meta async
get_meta() -> MugMeta

Fetch Meta info from the mug (Serial number and ID).

Source code in ember_mug/mug.py
252
253
254
async def get_meta(self) -> MugMeta:
    """Read the mug ID characteristic and parse it into a ``MugMeta``."""
    raw_meta = await self._read(MugCharacteristic.MUG_ID)
    return MugMeta.from_bytes(raw_meta)
get_name async
get_name() -> str

Get mug name from gatt.

Source code in ember_mug/mug.py
324
325
326
327
328
@require_attribute("name")
async def get_name(self) -> str:
    """Read the mug name characteristic and decode it as UTF-8 text."""
    raw_name: bytearray = await self._read(MugCharacteristic.MUG_NAME)
    immutable_name = bytes(raw_name)
    return immutable_name.decode("utf8")
get_target_temp async
get_target_temp() -> float

Get target temp from mug gatt.

Source code in ember_mug/mug.py
272
273
274
275
async def get_target_temp(self) -> float:
    """Read the target temperature characteristic and decode it via ``temp_from_bytes``."""
    raw_target = await self._read(MugCharacteristic.TARGET_TEMPERATURE)
    metric = self.data.use_metric
    return temp_from_bytes(raw_target, metric)
get_temperature_unit async
get_temperature_unit() -> TemperatureUnit

Get mug temp unit.

Source code in ember_mug/mug.py
363
364
365
366
367
368
async def get_temperature_unit(self) -> TemperatureUnit:
    """Read the mug's temperature unit: a value of 0 is Celsius, anything else Fahrenheit."""
    raw_unit = await self._read(MugCharacteristic.TEMPERATURE_UNIT)
    is_celsius = bytes_to_little_int(raw_unit) == 0
    return TemperatureUnit.CELSIUS if is_celsius else TemperatureUnit.FAHRENHEIT
get_udsk async
get_udsk() -> str | None

Get mug udsk from gatt.

Source code in ember_mug/mug.py
339
340
341
342
343
344
345
346
347
348
async def get_udsk(self) -> str | None:
    """
    Read and decode the mug's UDSK.

    Returns ``None`` when the characteristic holds twenty zero bytes, or when
    reading or decoding fails.
    """
    unset_key = bytearray([0] * 20)
    try:
        raw_udsk = await self._read(MugCharacteristic.UDSK)
        return None if raw_udsk == unset_key else decode_byte_string(raw_udsk)
    except (BleakError, ValueError) as err:
        logger.debug("Unable to read UDSK: %s", err)
        return None
get_volume_level async
get_volume_level() -> VolumeLevel | None

Get volume level from mug gatt.

Source code in ember_mug/mug.py
301
302
303
304
305
306
@require_attribute("volume_level")
async def get_volume_level(self) -> VolumeLevel | None:
    """Read the volume characteristic and convert its state into a ``VolumeLevel``."""
    raw_volume = await self._read(MugCharacteristic.VOLUME)
    return VolumeLevel.from_state(bytes_to_little_int(raw_volume))
has_attribute
has_attribute(attribute: str) -> bool

Check whether the device has the given attribute.

Source code in ember_mug/mug.py
132
133
134
def has_attribute(self, attribute: str) -> bool:
    """Report whether ``attribute`` is among this model's device attributes."""
    supported = self.data.model_info.device_attributes
    return attribute in supported
register_callback
register_callback(
    callback: Callable[[MugData], None]
) -> Callable[[], None]

Register a callback to be called when the state changes.

Source code in ember_mug/mug.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def register_callback(self, callback: Callable[[MugData], None]) -> Callable[[], None]:
    """
    Register ``callback`` to be invoked when the state changes.

    Returns the function that removes the callback again. Registering a
    callback that is already present returns its existing unregister function.
    """
    already_registered = self._callbacks.get(callback)
    if already_registered:
        logger.debug("Callback %s already registered", callback)
        return already_registered

    def unregister_callback() -> None:
        # pop() with a default tolerates being called more than once.
        self._callbacks.pop(callback, None)
        logger.debug("Unregistered callback: %s", callback)

    self._callbacks[callback] = unregister_callback
    logger.debug("Registered callback: %s", callback)
    return unregister_callback
set_client_options
set_client_options(**kwargs: str) -> None

Update options in case they need to be overridden in some cases.

Source code in ember_mug/mug.py
494
495
496
497
498
499
def set_client_options(self, **kwargs: str) -> None:
    """Replace the stored client keyword arguments with ``kwargs``.

    Raises:
        ValueError: if a truthy ``adapter`` option is given while ``IS_LINUX`` is False.
    """
    adapter_requested = bool(kwargs.get("adapter"))
    if adapter_requested and IS_LINUX is False:
        raise ValueError("The adapter option is only valid for the Linux BlueZ Backend.")
    # Store a copy so the saved options are a fresh dict.
    self._client_kwargs = dict(kwargs)
set_led_colour async
set_led_colour(colour: Colour) -> None

Set a new LED colour for the mug.

Source code in ember_mug/mug.py
266
267
268
269
270
@require_attribute("led_colour")
async def set_led_colour(self, colour: Colour) -> None:
    """Write a new LED colour to the mug and record it in the local data."""
    led_payload = colour.as_bytearray()
    await self._write(MugCharacteristic.LED, led_payload)
    # Only reached when the write did not raise.
    self.data.led_colour = colour
set_name async
set_name(name: str) -> None

Assign new name to mug.

Source code in ember_mug/mug.py
330
331
332
333
334
335
336
337
@require_attribute("name")
async def set_name(self, name: str) -> None:
    """Validate ``name``, write it to the mug and record it in the local data.

    Raises:
        ValueError: if ``name`` does not match ``MUG_NAME_REGEX``.
    """
    if not MUG_NAME_REGEX.match(name):
        raise ValueError("Name cannot contain any special characters and must be 16 characters or less")
    encoded_name = bytearray(name, "utf8")
    await self._write(MugCharacteristic.MUG_NAME, encoded_name)
    self.data.name = name
set_target_temp async
set_target_temp(target_temp: float) -> None

Set new target temp for mug.

Source code in ember_mug/mug.py
277
278
279
280
281
282
283
284
285
286
287
288
289
async def set_target_temp(self, target_temp: float) -> None:
    """Set new target temp for mug.

    ``target_temp`` is interpreted in the unit selected by ``self.data.use_metric``
    and must be 0 or lie inside the ``MIN_MAX_TEMPS`` range for that unit.
    Non-metric values are converted to Celsius before being written, and the
    Celsius value is what gets stored on ``self.data.target_temp``.

    Raises:
        ValueError: if ``target_temp`` is non-zero and outside the allowed range.
    """
    unit = TemperatureUnit.CELSIUS if self.data.use_metric else TemperatureUnit.FAHRENHEIT
    min_temp, max_temp = MIN_MAX_TEMPS[unit]
    if target_temp != 0 and not (min_temp <= target_temp <= max_temp):
        msg = f"Temperature should be between {min_temp} and {max_temp} or 0."
        raise ValueError(msg)

    if self.data.use_metric is False:
        target_temp = convert_temp_to_celsius(target_temp)

    # The payload is the temperature in hundredths of a degree, little-endian.
    # round() is used instead of int(x / 0.01): binary float error can leave the
    # quotient a hair below the intended integer, and int() would truncate it
    # to a value 0.01 degrees too low.
    hundredths = round(target_temp * 100)
    target = bytearray(hundredths.to_bytes(2, "little"))
    await self._write(MugCharacteristic.TARGET_TEMPERATURE, target)
    self.data.target_temp = target_temp
set_temperature_unit async
set_temperature_unit(
    unit: Literal["°C", "°F"] | TemperatureUnit | Enum
) -> None

Set mug unit.

Source code in ember_mug/mug.py
370
371
372
373
374
375
async def set_temperature_unit(self, unit: Literal["°C", "°F"] | TemperatureUnit | Enum) -> None:
    """Set mug unit.

    ``unit`` may be the unit string, a ``TemperatureUnit`` or any other Enum
    whose ``value`` is the unit string.

    Raises:
        ValueError: if ``unit`` is not a valid ``TemperatureUnit`` value. This is
            raised before anything is written to the mug.
    """
    text_unit = unit.value if isinstance(unit, Enum) else unit
    # Normalise once, up front: an unsupported unit is rejected before the
    # write (instead of after writing the Celsius byte), and foreign Enums are
    # resolved through their value rather than passed to TemperatureUnit as-is.
    new_unit = TemperatureUnit(text_unit)
    unit_bytes = bytearray([1 if new_unit == TemperatureUnit.FAHRENHEIT else 0])
    await self._write(MugCharacteristic.TEMPERATURE_UNIT, unit_bytes)
    self.data.temperature_unit = new_unit
set_udsk async
set_udsk(udsk: str) -> None

Attempt to write udsk.

Source code in ember_mug/mug.py
350
351
352
353
async def set_udsk(self, udsk: str) -> None:
    """Encode ``udsk``, write it to the UDSK characteristic and record it in the local data."""
    encoded_udsk = encode_byte_string(udsk)
    payload = bytearray(encoded_udsk)
    await self._write(MugCharacteristic.UDSK, payload)
    self.data.udsk = udsk
set_volume_level async
set_volume_level(volume: int | VolumeLevel) -> None

Set volume_level on Travel Mug.

Source code in ember_mug/mug.py
308
309
310
311
312
313
314
315
316
@require_attribute("volume_level")
async def set_volume_level(self, volume: int | VolumeLevel) -> None:
    """Write a new volume level to the device and record it in the local data.

    ``volume`` may be a ``VolumeLevel`` or one of the integers 0, 1, 2, which is
    converted through ``VolumeLevel.from_state``.

    Raises:
        ValueError: if ``volume`` is an int other than 0, 1 or 2.
    """
    if isinstance(volume, VolumeLevel):
        volume_level = volume
    else:
        if isinstance(volume, int) and volume not in (0, 1, 2):
            raise ValueError("Volume level value should be 0, 1, 2 or a VolumeLevel enum")
        volume_level = VolumeLevel.from_state(volume)
    payload = bytearray([volume_level.state])
    await self._write(MugCharacteristic.VOLUME, payload)
    self.data.volume_level = volume_level
subscribe async
subscribe() -> None

Subscribe to notifications from the mug.

Source code in ember_mug/mug.py
484
485
486
487
488
489
490
491
492
async def subscribe(self) -> None:
    """Start notifications for push events (and for statistics when ``self.debug`` is set).

    Any exception raised while subscribing is logged as a warning and not re-raised.
    """
    try:
        logger.info("Subscribe to Push Events")
        await self._client.start_notify(
            MugCharacteristic.PUSH_EVENT.uuid,
            self._notify_callback,
        )
        if self.debug:
            await self._client.start_notify(
                MugCharacteristic.STATISTICS.uuid,
                self._notify_callback,
            )
    except Exception as err:
        logger.warning("Failed to subscribe to state attr: %s", err)
unsubscribe async
unsubscribe() -> None

Unsubscribe from Mug notifications.

Source code in ember_mug/mug.py
474
475
476
477
478
479
480
481
482
async def unsubscribe(self) -> None:
    """Stop push-event notifications (and statistics ones when ``self.debug`` is set).

    Does nothing beyond logging when there is no client. A ``BleakError``
    raised while stopping notifications is suppressed.
    """
    logger.debug("Unsubscribe called")
    if self._client:
        with contextlib.suppress(BleakError):
            await self._client.stop_notify(MugCharacteristic.PUSH_EVENT.uuid)
            if self.debug:
                await self._client.stop_notify(MugCharacteristic.STATISTICS.uuid)
update_all async
update_all() -> list[Change]

Update all standard attributes.

Source code in ember_mug/mug.py
402
403
404
405
406
async def update_all(self) -> list[Change]:
    """Update every device attribute that is not part of ``INITIAL_ATTRS``."""
    regular_attrs = self.data.model_info.device_attributes - INITIAL_ATTRS
    return await self._update_multiple(regular_attrs)
update_initial async
update_initial() -> list[Change]

Update attributes that don't normally change and don't need to be regularly updated.

Source code in ember_mug/mug.py
398
399
400
async def update_initial(self) -> list[Change]:
    """Update the attributes in ``INITIAL_ATTRS``, which do not need regular refreshing."""
    changes = await self._update_multiple(INITIAL_ATTRS)
    return changes
update_queued_attributes async
update_queued_attributes() -> list[Change]

Update all attributes in queue.

Source code in ember_mug/mug.py
418
419
420
421
422
423
424
425
426
427
428
429
async def update_queued_attributes(self) -> list[Change]:
    """Fetch every queued attribute and apply the results to the local data.

    The queue is copied and cleared before any reads happen. Callbacks are
    fired only when ``update_info`` reports changes. Returns those changes, or
    an empty list when nothing was queued.
    """
    logger.debug("Updating queued attributes: %s", self._queued_updates)
    if not self._queued_updates:
        return []

    # Snapshot then clear, so the loop below works on a fixed set.
    pending = set(self._queued_updates)
    self._queued_updates.clear()
    await self._ensure_connection()

    fetched = {}
    for attr in pending:
        getter = getattr(self, f"get_{attr}")
        fetched[attr] = await getter()

    changes = self.data.update_info(**fetched)
    if changes:
        self._fire_callbacks()
    return changes

Functions

require_attribute

require_attribute(
    attr_name: str,
) -> Callable[
    [Callable[Concatenate[EmberMug, P], Awaitable[T]]],
    Callable[Concatenate[EmberMug, P], Awaitable[T]],
]

Require an attribute to be available on the device.

Source code in ember_mug/mug.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def require_attribute(
    attr_name: str,
) -> Callable[[Callable[Concatenate[EmberMug, P], Awaitable[T]]], Callable[Concatenate[EmberMug, P], Awaitable[T]]]:
    """Require an attribute to be available on the device.

    Returns a decorator for async ``EmberMug`` methods. The decorated method
    raises ``NotImplementedError`` when ``self.has_attribute(attr_name)`` is
    False; otherwise the wrapped coroutine is awaited and its result returned.
    """
    # Local import so this block does not depend on the module's import section.
    from functools import wraps

    def decorator(
        func: Callable[Concatenate[EmberMug, P], Awaitable[T]],
    ) -> Callable[Concatenate[EmberMug, P], Awaitable[T]]:
        """Inner decorator."""

        # wraps() keeps the method's __name__, __doc__ and __wrapped__ intact,
        # so introspection, logs and generated docs show the real method
        # rather than "wrapper".
        @wraps(func)
        async def wrapper(self: EmberMug, *args: P.args, **kwargs: P.kwargs) -> T:
            if self.has_attribute(attr_name) is False:
                device_type = self.data.model_info.device_type.value
                raise NotImplementedError(
                    f"The {device_type} does not have the {attr_name} attribute",
                )
            return await func(self, *args, **kwargs)

        return wrapper

    return decorator