Skip to content

Commit 095bf6c

Browse files
committed
+ Socket Listening API
1 parent f7fc1e8 commit 095bf6c

File tree

1 file changed

+228
-0
lines changed

1 file changed

+228
-0
lines changed

libuv_reactor.c

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1681,6 +1681,228 @@ static int libuv_exec(
16811681
}
16821682
/* }}} */
16831683

1684+
/////////////////////////////////////////////////////////////////////////////////
1685+
/// Socket Listening API
1686+
/////////////////////////////////////////////////////////////////////////////////
1687+
1688+
typedef struct {
1689+
zend_async_listen_event_t event;
1690+
uv_tcp_t uv_handle;
1691+
} async_listen_event_t;
1692+
1693+
/* {{{ on_connection_event */
1694+
static void on_connection_event(uv_stream_t *server, int status)
1695+
{
1696+
async_listen_event_t *listen_event = server->data;
1697+
zend_socket_t client_socket = INVALID_SOCKET;
1698+
zend_object *exception = NULL;
1699+
1700+
if (status < 0) {
1701+
exception = async_new_exception(
1702+
async_ce_input_output_exception, "Connection accept error: %s", uv_strerror(status)
1703+
);
1704+
} else {
1705+
uv_tcp_t client;
1706+
int result = uv_tcp_init(UVLOOP, &client);
1707+
1708+
if (result == 0) {
1709+
result = uv_accept(server, (uv_stream_t*)&client);
1710+
if (result == 0) {
1711+
uv_os_fd_t fd;
1712+
result = uv_fileno((uv_handle_t*)&client, &fd);
1713+
if (result == 0) {
1714+
client_socket = (zend_socket_t)fd;
1715+
}
1716+
}
1717+
}
1718+
1719+
if (result < 0) {
1720+
exception = async_new_exception(
1721+
async_ce_input_output_exception, "Failed to accept connection: %s", uv_strerror(result)
1722+
);
1723+
uv_close((uv_handle_t*)&client, NULL);
1724+
}
1725+
}
1726+
1727+
ZEND_ASYNC_CALLBACKS_NOTIFY(&listen_event->event.base, &client_socket, exception);
1728+
1729+
if (exception != NULL) {
1730+
zend_object_release(exception);
1731+
}
1732+
1733+
IF_EXCEPTION_STOP_REACTOR;
1734+
}
1735+
/* }}} */
1736+
1737+
/* {{{ libuv_listen_start */
1738+
static void libuv_listen_start(zend_async_event_t *event)
1739+
{
1740+
EVENT_START_PROLOGUE(event);
1741+
1742+
async_listen_event_t *listen_event = (async_listen_event_t *)(event);
1743+
1744+
const int error = uv_listen((uv_stream_t*)&listen_event->uv_handle, listen_event->event.backlog, on_connection_event);
1745+
1746+
if (error < 0) {
1747+
async_throw_error("Failed to start listening: %s", uv_strerror(error));
1748+
return;
1749+
}
1750+
1751+
event->loop_ref_count++;
1752+
ZEND_ASYNC_INCREASE_EVENT_COUNT;
1753+
}
1754+
/* }}} */
1755+
1756+
/* {{{ libuv_listen_stop */
1757+
static void libuv_listen_stop(zend_async_event_t *event)
1758+
{
1759+
EVENT_STOP_PROLOGUE(event);
1760+
1761+
// uv_listen doesn't have a stop function, we close the handle
1762+
event->loop_ref_count = 0;
1763+
ZEND_ASYNC_DECREASE_EVENT_COUNT;
1764+
}
1765+
/* }}} */
1766+
1767+
/* {{{ libuv_listen_dispose */
1768+
static void libuv_listen_dispose(zend_async_event_t *event)
1769+
{
1770+
if (ZEND_ASYNC_EVENT_REF(event) > 1) {
1771+
ZEND_ASYNC_EVENT_DEL_REF(event);
1772+
return;
1773+
}
1774+
1775+
if (event->loop_ref_count > 0) {
1776+
event->loop_ref_count = 1;
1777+
event->stop(event);
1778+
}
1779+
1780+
zend_async_callbacks_free(event);
1781+
1782+
async_listen_event_t *listen_event = (async_listen_event_t *)(event);
1783+
1784+
if (listen_event->event.host) {
1785+
efree((void*)listen_event->event.host);
1786+
listen_event->event.host = NULL;
1787+
}
1788+
1789+
uv_close((uv_handle_t *)&listen_event->uv_handle, libuv_close_handle_cb);
1790+
}
1791+
/* }}} */
1792+
1793+
/* {{{ libuv_listen_get_local_address */
1794+
static int libuv_listen_get_local_address(
1795+
zend_async_listen_event_t *listen_event,
1796+
char *host, size_t host_len,
1797+
int *port
1798+
)
1799+
{
1800+
struct sockaddr_storage addr;
1801+
int addr_len = sizeof(addr);
1802+
1803+
int result = uv_tcp_getsockname(&((async_listen_event_t*)listen_event)->uv_handle,
1804+
(struct sockaddr*)&addr, &addr_len);
1805+
1806+
if (result < 0) {
1807+
return result;
1808+
}
1809+
1810+
if (addr.ss_family == AF_INET) {
1811+
struct sockaddr_in *addr_in = (struct sockaddr_in*)&addr;
1812+
*port = ntohs(addr_in->sin_port);
1813+
if (host && host_len > 0) {
1814+
uv_ip4_name(addr_in, host, host_len);
1815+
}
1816+
} else if (addr.ss_family == AF_INET6) {
1817+
struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6*)&addr;
1818+
*port = ntohs(addr_in6->sin6_port);
1819+
if (host && host_len > 0) {
1820+
uv_ip6_name(addr_in6, host, host_len);
1821+
}
1822+
} else {
1823+
return -1;
1824+
}
1825+
1826+
return 0;
1827+
}
1828+
/* }}} */
1829+
1830+
/* {{{ libuv_socket_listen */
1831+
zend_async_listen_event_t* libuv_socket_listen(const char *host, int port, int backlog, size_t extra_size)
1832+
{
1833+
START_REACTOR_OR_RETURN_NULL;
1834+
1835+
async_listen_event_t *listen_event = pecalloc(1, extra_size != 0 ?
1836+
sizeof(async_listen_event_t) + extra_size :
1837+
sizeof(async_listen_event_t), 0);
1838+
1839+
int error = uv_tcp_init(UVLOOP, &listen_event->uv_handle);
1840+
if (error < 0) {
1841+
async_throw_error("Failed to initialize TCP handle: %s", uv_strerror(error));
1842+
pefree(listen_event, 0);
1843+
return NULL;
1844+
}
1845+
1846+
// Set socket options
1847+
uv_tcp_nodelay(&listen_event->uv_handle, 1);
1848+
uv_tcp_simultaneous_accepts(&listen_event->uv_handle, 1);
1849+
1850+
// Bind to address
1851+
struct sockaddr_storage addr;
1852+
if (strchr(host, ':') != NULL) {
1853+
// IPv6
1854+
error = uv_ip6_addr(host, port, (struct sockaddr_in6*)&addr);
1855+
} else {
1856+
// IPv4
1857+
error = uv_ip4_addr(host, port, (struct sockaddr_in*)&addr);
1858+
}
1859+
1860+
if (error < 0) {
1861+
async_throw_error("Failed to parse address %s:%d: %s", host, port, uv_strerror(error));
1862+
uv_close((uv_handle_t*)&listen_event->uv_handle, libuv_close_handle_cb);
1863+
pefree(listen_event, 0);
1864+
return NULL;
1865+
}
1866+
1867+
error = uv_tcp_bind(&listen_event->uv_handle, (struct sockaddr*)&addr, 0);
1868+
if (error < 0) {
1869+
async_throw_error("Failed to bind to %s:%d: %s", host, port, uv_strerror(error));
1870+
uv_close((uv_handle_t*)&listen_event->uv_handle, libuv_close_handle_cb);
1871+
pefree(listen_event, 0);
1872+
return NULL;
1873+
}
1874+
1875+
// Get actual socket fd
1876+
uv_os_fd_t fd;
1877+
error = uv_fileno((uv_handle_t*)&listen_event->uv_handle, &fd);
1878+
if (error < 0) {
1879+
async_throw_error("Failed to get socket descriptor: %s", uv_strerror(error));
1880+
uv_close((uv_handle_t*)&listen_event->uv_handle, libuv_close_handle_cb);
1881+
pefree(listen_event, 0);
1882+
return NULL;
1883+
}
1884+
1885+
// Link the handle to the loop
1886+
listen_event->uv_handle.data = listen_event;
1887+
listen_event->event.host = estrdup(host);
1888+
listen_event->event.port = port;
1889+
listen_event->event.backlog = backlog;
1890+
listen_event->event.socket_fd = (zend_socket_t)fd;
1891+
listen_event->event.base.extra_offset = sizeof(async_listen_event_t);
1892+
listen_event->event.base.ref_count = 1;
1893+
1894+
// Initialize the event methods
1895+
listen_event->event.base.add_callback = libuv_add_callback;
1896+
listen_event->event.base.del_callback = libuv_remove_callback;
1897+
listen_event->event.base.start = libuv_listen_start;
1898+
listen_event->event.base.stop = libuv_listen_stop;
1899+
listen_event->event.base.dispose = libuv_listen_dispose;
1900+
listen_event->event.get_local_address = libuv_listen_get_local_address;
1901+
1902+
return &listen_event->event;
1903+
}
1904+
/* }}} */
1905+
16841906
void async_libuv_reactor_register(void)
16851907
{
16861908
zend_async_reactor_register(
@@ -1703,4 +1925,10 @@ void async_libuv_reactor_register(void)
17031925
libuv_new_exec_event,
17041926
libuv_exec
17051927
);
1928+
1929+
zend_async_socket_listening_register(
1930+
LIBUV_REACTOR_NAME,
1931+
false,
1932+
libuv_socket_listen
1933+
);
17061934
}

0 commit comments

Comments
 (0)