Skip to content
Merged
220 changes: 216 additions & 4 deletions comtypes/test/test_stream.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,43 @@
import contextlib
import ctypes
import struct
import unittest as ut
from ctypes import HRESULT, POINTER, OleDLL, byref, c_ubyte, c_ulonglong, pointer
from ctypes.wintypes import BOOL, HGLOBAL, ULARGE_INTEGER
from collections.abc import Iterator
from ctypes import (
HRESULT,
POINTER,
OleDLL,
WinDLL,
byref,
c_size_t,
c_ubyte,
c_ulonglong,
pointer,
)
from ctypes.wintypes import (
BOOL,
HDC,
HGLOBAL,
HWND,
INT,
LONG,
LPVOID,
UINT,
ULARGE_INTEGER,
)
from typing import Optional

import comtypes.client
from comtypes import hresult

comtypes.client.GetModule("portabledeviceapi.dll")
# The stdole module is generated automatically during the portabledeviceapi
# module generation.
import comtypes.gen.stdole as stdole
from comtypes.gen.PortableDeviceApiLib import IStream

SIZE_T = c_size_t

STATFLAG_DEFAULT = 0
STGC_DEFAULT = 0
STGTY_STREAM = 2
Expand All @@ -27,10 +58,12 @@
_IStream_Size.restype = HRESULT


def _create_stream() -> IStream:
def _create_stream(
handle: Optional[int] = None, delete_on_release: bool = True
) -> IStream:
# Create an IStream
stream = POINTER(IStream)() # type: ignore
_CreateStreamOnHGlobal(None, True, byref(stream))
_CreateStreamOnHGlobal(handle, delete_on_release, byref(stream))
return stream # type: ignore


Expand Down Expand Up @@ -158,5 +191,184 @@ def test_Clone(self):
self.assertEqual(bytearray(buf)[0:read], test_data)


_user32 = WinDLL("user32")

_GetDC = _user32.GetDC
_GetDC.argtypes = (HWND,)
_GetDC.restype = HDC

_ReleaseDC = _user32.ReleaseDC
_ReleaseDC.argtypes = (HWND, HDC)
_ReleaseDC.restype = INT

_gdi32 = WinDLL("gdi32")

_GetDeviceCaps = _gdi32.GetDeviceCaps
_GetDeviceCaps.argtypes = (HDC, INT)
_GetDeviceCaps.restype = INT

_kernel32 = WinDLL("kernel32")

_GlobalAlloc = _kernel32.GlobalAlloc
_GlobalAlloc.argtypes = (UINT, SIZE_T)
_GlobalAlloc.restype = HGLOBAL

_GlobalFree = _kernel32.GlobalFree
_GlobalFree.argtypes = (HGLOBAL,)
_GlobalFree.restype = HGLOBAL

_GlobalLock = _kernel32.GlobalLock
_GlobalLock.argtypes = (HGLOBAL,)
_GlobalLock.restype = LPVOID

_GlobalUnlock = _kernel32.GlobalUnlock
_GlobalUnlock.argtypes = (HGLOBAL,)
_GlobalUnlock.restype = BOOL

_oleaut32 = WinDLL("oleaut32")

_OleLoadPicture = _oleaut32.OleLoadPicture
_OleLoadPicture.argtypes = (
POINTER(IStream), # lpstm
LONG, # lSize
BOOL, # fSave
POINTER(comtypes.GUID), # riid
POINTER(POINTER(comtypes.IUnknown)), # ppvObj
)
_OleLoadPicture.restype = HRESULT

# Constants for the type of a picture object
PICTYPE_BITMAP = 1

# Constants for GetDeviceCaps
LOGPIXELSX = 88 # Logical pixels/inch in X
LOGPIXELSY = 90 # Logical pixels/inch in Y

METERS_PER_INCH = 0.0254

GMEM_FIXED = 0x0000
GMEM_ZEROINIT = 0x0040

BI_RGB = 0 # No compression


@contextlib.contextmanager
def get_dc(hwnd: int) -> Iterator[int]:
"""Context manager to get and release a device context (DC)."""
dc = _GetDC(hwnd)
assert dc, "Failed to get device context."
try:
yield dc
finally:
# Release the device context
_ReleaseDC(hwnd, dc)


@contextlib.contextmanager
def global_alloc(uflags: int, dwbytes: int) -> Iterator[int]:
"""Context manager to allocate and free a global memory handle."""
handle = _GlobalAlloc(uflags, dwbytes)
assert handle, "Failed to GlobalAlloc"
try:
yield handle
finally:
_GlobalFree(handle)


@contextlib.contextmanager
def global_lock(handle: int) -> Iterator[int]:
"""Context manager to lock a global memory handle and obtain a pointer."""
lp_mem = _GlobalLock(handle)
assert lp_mem, "Failed to GlobalLock"
try:
yield lp_mem
finally:
_GlobalUnlock(handle)


def get_screen_dpi() -> tuple[int, int]:
"""Gets the screen DPI using GDI functions."""
# Get a handle to the desktop window's device context
with get_dc(0) as dc:
# Get the horizontal and vertical DPI
dpi_x = _GetDeviceCaps(dc, LOGPIXELSX)
dpi_y = _GetDeviceCaps(dc, LOGPIXELSY)
return dpi_x, dpi_y


def create_pixel_data(
red: int,
green: int,
blue: int,
dpi_x: int,
dpi_y: int,
width: int,
height: int,
) -> bytes:
# Generates width x height pixel 32-bit BGRA BMP binary data.
SIZEOF_BITMAPFILEHEADER = 14
SIZEOF_BITMAPINFOHEADER = 40
pixel_data = b""
for _ in range(height):
# Each row is padded to a 4-byte boundary. For 32bpp, no padding is needed.
for _ in range(width):
# B, G, R, Alpha (fully opaque)
pixel_data += struct.pack(b"BBBB", blue, green, red, 0xFF)
BITMAP_DATA_OFFSET = SIZEOF_BITMAPFILEHEADER + SIZEOF_BITMAPINFOHEADER
file_size = BITMAP_DATA_OFFSET + len(pixel_data)
bmp_header = struct.pack(
b"<2sIHHI",
b"BM", # File type signature "BM"
file_size, # Total file size
0, # Reserved1
0, # Reserved2
BITMAP_DATA_OFFSET, # Offset to pixel data
)
# Calculate pixels_per_meter based on the provided DPI
pixels_per_meter_x = int(dpi_x / METERS_PER_INCH)
pixels_per_meter_y = int(dpi_y / METERS_PER_INCH)
info_header = struct.pack(
b"<IiiHHIIiiII",
SIZEOF_BITMAPINFOHEADER, # Size of BITMAPINFOHEADER
width, # Image width
height, # Image height
1, # Planes
32, # Bits per pixel (for BGRA)
BI_RGB, # Compression
len(pixel_data), # Size of image data
pixels_per_meter_x, # X pixels per meter
pixels_per_meter_y, # Y pixels per meter
0, # Colors used
0, # Colors important
)
return bmp_header + info_header + pixel_data


class Test_Picture(ut.TestCase):
def test_ole_load_picture(self):
dpi_x, dpi_y = get_screen_dpi()
data = create_pixel_data(255, 0, 0, dpi_x, dpi_y, 1, 1)
# Allocate global memory with `GMEM_FIXED` (fixed-size) and
# `GMEM_ZEROINIT` (initialize to zero) and copy BMP data.
with global_alloc(GMEM_FIXED | GMEM_ZEROINIT, len(data)) as handle:
with global_lock(handle) as lp_mem:
ctypes.memmove(lp_mem, data, len(data))
pstm = _create_stream(handle, delete_on_release=False)
# Load picture from the stream
pic: stdole.IPicture = POINTER(stdole.IPicture)() # type: ignore
hr = _OleLoadPicture(
pstm,
len(data), # lSize
False, # fSave
byref(stdole.IPicture._iid_),
byref(pic),
)
self.assertEqual(hr, hresult.S_OK)
self.assertEqual(pic.Type, PICTYPE_BITMAP)
pstm.RemoteSeek(0, STREAM_SEEK_SET)
buf, read = pstm.RemoteRead(len(data))
self.assertEqual(bytes(buf)[:read], data)


if __name__ == "__main__":
ut.main()