|
| 1 | +import signal |
| 2 | +from contextlib import contextmanager |
1 | 3 | import usb.core |
2 | | -from functools import wraps |
| 4 | +import usb.util |
| 5 | +import time |
| 6 | + |
3 | 7 |
|
4 | 8 |
|
5 | 9 | ESC = 27 |
@@ -82,117 +86,174 @@ def set_print_speed(speed): |
82 | 86 | speed] |
83 | 87 | return byte_array |
84 | 88 |
|
| 89 | + |
| 90 | +def send_command_to_device(timeout=5000): |
| 91 | + """ decorator used to send the result of a command to the usb device""" |
| 92 | + def _send_command_to_device(func): |
| 93 | + def wrapper(*args, **kwargs): |
| 94 | + printer = args[0] |
| 95 | + byte_array = func(*args, **kwargs) |
| 96 | + printer.write_bytes(byte_array, timeout) |
| 97 | + return wrapper |
| 98 | + return _send_command_to_device |
| 99 | + |
| 100 | + |
| 101 | +class TimeoutException(Exception): pass |
| 102 | + |
| 103 | +@contextmanager |
| 104 | +def time_limit(seconds): |
| 105 | + def signal_handler(signum, frame): |
| 106 | + raise TimeoutException, "Timed out!" |
| 107 | + signal.signal(signal.SIGALRM, signal_handler) |
| 108 | + signal.alarm(seconds) |
| 109 | + try: |
| 110 | + yield |
| 111 | + finally: |
| 112 | + signal.alarm(0) |
| 113 | + |
| 114 | + |
| 115 | + |
85 | 116 | class EpsonPrinter: |
86 | 117 | """ An Epson thermal printer based on ESC/POS""" |
87 | 118 |
|
88 | 119 | printer = None |
89 | 120 |
|
90 | | - def __init__(self, id_vendor, id_product, out_ep=0x01): |
91 | | - """ |
92 | | - @param id_vendor : Vendor ID |
93 | | - @param id_product : Product ID |
94 | | - @param interface : USB device interface |
95 | | - @param in_ep : Input end point |
96 | | - @param out_ep : Output end point |
97 | | - """ |
| 121 | + def __init__(self, id_product): |
98 | 122 |
|
99 | | - self.out_ep = out_ep |
| 123 | + id_vendor = 0x04b8 |
100 | 124 |
|
101 | 125 | # Search device on USB tree and set is as printer |
102 | | - self.printer = usb.core.find(idVendor=id_vendor, idProduct=id_product) |
103 | | - if self.printer is None: |
| 126 | + self.device = usb.core.find(idVendor=id_vendor, idProduct=id_product) |
| 127 | + if self.device is None: |
104 | 128 | raise ValueError("Printer not found. Make sure the cable is plugged in.") |
105 | 129 |
|
106 | | - if self.printer.is_kernel_driver_active(0): |
| 130 | + if self.device.is_kernel_driver_active(0): |
107 | 131 | try: |
108 | | - self.printer.detach_kernel_driver(0) |
| 132 | + self.device.detach_kernel_driver(0) |
109 | 133 | except usb.core.USBError as e: |
110 | | - print("Could not detatch kernel driver: %s" % str(e)) |
| 134 | + print("Could not detach kernel driver: %s" % str(e)) |
111 | 135 |
|
112 | | - try: |
113 | | - self.printer.set_configuration() |
114 | | - self.printer.reset() |
115 | | - except usb.core.USBError as e: |
116 | | - print("Could not set configuration: %s" % str(e)) |
| 136 | + configuration = self.device.get_active_configuration() |
| 137 | + interface = configuration[(0, 0)] |
117 | 138 |
|
118 | | - def write_this(func): |
119 | | - """ |
120 | | - Decorator that writes the bytes to the wire |
121 | | - """ |
122 | | - @wraps(func) |
123 | | - def wrapper(self, *args, **kwargs): |
124 | | - byte_array = func(self, *args, **kwargs) |
125 | | - self.write_bytes(byte_array) |
126 | | - return wrapper |
| 139 | + def out_endpoint_match(ep): |
| 140 | + return usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT |
| 141 | + |
| 142 | + self.out_endpoint = usb.util.find_descriptor(interface, custom_match=out_endpoint_match) |
| 143 | + |
| 144 | + def in_endpoint_match(ep): |
| 145 | + return usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN |
127 | 146 |
|
128 | | - def write_bytes(self, byte_array): |
| 147 | + self.in_endpoint = usb.util.find_descriptor(interface, custom_match=in_endpoint_match) |
| 148 | + |
| 149 | + |
| 150 | + def write_bytes(self, byte_array, timeout=5000): |
129 | 151 | msg = ''.join([chr(b) for b in byte_array]) |
130 | | - self.write(msg) |
| 152 | + self.write(msg, timeout) |
131 | 153 |
|
132 | | - def write(self, msg): |
133 | | - self.printer.write(self.out_ep, msg, timeout=20000) |
| 154 | + def write(self, msg, timeout=5000): |
| 155 | + self.out_endpoint.write(msg, timeout=timeout) |
| 156 | + |
| 157 | + def read(self): |
| 158 | + try: |
| 159 | + return self.in_endpoint.read(self.in_endpoint.wMaxPacketSize) |
| 160 | + except usb.core.USBError as e: |
| 161 | + return None |
| 162 | + |
| 163 | + def blocking_read(self): |
| 164 | + while True: |
| 165 | + data = self.read() |
| 166 | + if len(data) > 0: |
| 167 | + return data[0] |
| 168 | + |
| 169 | + def flush_read(self): |
| 170 | + while True: |
| 171 | + data = self.read() |
| 172 | + if not data or len(data) == 0: |
| 173 | + break |
134 | 174 |
|
135 | 175 | def print_text(self, msg): |
136 | 176 | self.write(msg) |
137 | 177 |
|
138 | | - @write_this |
| 178 | + @send_command_to_device() |
139 | 179 | def linefeed(self, lines=1): |
140 | 180 | """Feed by the specified number of lines.""" |
141 | 181 | return linefeed(lines) |
142 | 182 |
|
143 | | - @write_this |
| 183 | + @send_command_to_device() |
144 | 184 | def cut(self): |
145 | 185 | """Full paper cut.""" |
146 | 186 | return FULL_PAPER_CUT |
147 | 187 |
|
148 | | - @write_this |
| 188 | + @send_command_to_device() |
149 | 189 | def underline_on(self, weight=1): |
150 | 190 | """ Activate underline |
151 | 191 | weight = 0 1-dot-width |
152 | 192 | weight = 1 2-dots-width |
153 | 193 | """ |
154 | 194 | return underline_on(weight) |
155 | 195 |
|
156 | | - @write_this |
| 196 | + @send_command_to_device() |
157 | 197 | def underline_off(self): |
158 | 198 | return UNDERLINE_OFF |
159 | 199 |
|
160 | | - @write_this |
| 200 | + @send_command_to_device() |
161 | 201 | def bold_on(self): |
162 | 202 | return BOLD_ON |
163 | 203 |
|
164 | | - @write_this |
| 204 | + @send_command_to_device() |
165 | 205 | def bold_off(self): |
166 | 206 | return BOLD_OFF |
167 | 207 |
|
168 | | - @write_this |
| 208 | + @send_command_to_device() |
169 | 209 | def set_line_spacing(self, dots): |
170 | 210 | """Set line spacing with a given number of dots. Default is 30.""" |
171 | 211 | return set_line_spacing(dots) |
172 | 212 |
|
173 | | - @write_this |
| 213 | + @send_command_to_device() |
174 | 214 | def set_default_line_spacing(self): |
175 | 215 | return DEFAULT_LINE_SPACING |
176 | 216 |
|
177 | | - @write_this |
| 217 | + @send_command_to_device() |
178 | 218 | def set_text_size(self, width_magnification, height_magnification): |
179 | 219 | """Set the text size. width_magnification and height_magnification can |
180 | 220 | be between 0(x1) and 7(x8). |
181 | 221 | """ |
182 | 222 | return set_text_size(width_magnification, height_magnification) |
183 | 223 |
|
184 | | - @write_this |
| 224 | + @send_command_to_device() |
185 | 225 | def center(self): |
186 | 226 | return CENTER |
187 | 227 |
|
188 | | - @write_this |
| 228 | + @send_command_to_device() |
189 | 229 | def left_justified(self): |
190 | 230 | return LEFT_JUSTIFIED |
191 | 231 |
|
192 | | - @write_this |
| 232 | + @send_command_to_device() |
193 | 233 | def right_justified(self): |
194 | 234 | return RIGHT_JUSTIFIED |
195 | 235 |
|
196 | | - @write_this |
| 236 | + @send_command_to_device() |
197 | 237 | def set_print_speed(self, speed): |
198 | 238 | return set_print_speed(speed) |
| 239 | + |
| 240 | + @send_command_to_device(timeout=1) |
| 241 | + def transmit_real_time_status(self, n): |
| 242 | + return [16, 4, n] |
| 243 | + |
| 244 | + def paper_present(self): |
| 245 | + self.flush_read() |
| 246 | + try: |
| 247 | + self.transmit_real_time_status(4) |
| 248 | + with time_limit(1): |
| 249 | + data = self.blocking_read() |
| 250 | + return data == 18 |
| 251 | + except: |
| 252 | + return False |
| 253 | + |
| 254 | + |
| 255 | + |
| 256 | + |
| 257 | + |
| 258 | + |
| 259 | + |
0 commit comments