Skip to content

Commit 0f46707

Browse files
committed
Use OS socket API instead of Qt's socket API.
1 parent de9ca30 commit 0f46707

File tree

11 files changed

+887
-11
lines changed

11 files changed

+887
-11
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/* Abstract Client Socket
2+
*
3+
* From: https://github.com/PokemonAutomation/Arduino-Source
4+
*
5+
*/
6+
7+
#ifndef PokemonAutomation_Sockets_AbstractClientSocket_H
8+
#define PokemonAutomation_Sockets_AbstractClientSocket_H
9+
10+
#include <string>
11+
#include "Common/Compiler.h"
12+
#include "Common/Cpp/ListenerSet.h"
13+
14+
namespace PokemonAutomation{
15+
16+
17+
18+
class AbstractClientSocket{
19+
public:
20+
enum class State{
21+
NOT_RUNNING,
22+
CONNECTING,
23+
CONNECTED,
24+
DESTRUCTING,
25+
};
26+
27+
struct Listener{
28+
// Called at the start of the receiver thread. This can be used to set
29+
// the thread priority of the thread.
30+
virtual void on_thread_start(){}
31+
32+
// Called when a connection attempt finishes.
33+
// If successful, "error_message" is empty.
34+
// Otherwise, it contains the error message.
35+
virtual void on_connect_finished(const std::string& error_message){}
36+
37+
// Called when the socket receives data from the server.
38+
virtual void on_receive_data(const void* data, size_t bytes){}
39+
};
40+
41+
void add_listener(Listener& listener){
42+
m_listeners.add(listener);
43+
}
44+
void remove_listener(Listener& listener){
45+
m_listeners.add(listener);
46+
}
47+
48+
49+
public:
50+
AbstractClientSocket()
51+
: m_state(State::NOT_RUNNING)
52+
{}
53+
virtual ~AbstractClientSocket() = default;
54+
55+
State state() const{
56+
return m_state.load(std::memory_order_relaxed);
57+
}
58+
59+
virtual void close() noexcept = 0;
60+
virtual void connect(const std::string& address, uint16_t port) = 0;
61+
62+
virtual size_t blocking_send(const void* data, size_t bytes) = 0;
63+
64+
65+
protected:
66+
std::atomic<State> m_state;
67+
ListenerSet<Listener> m_listeners;
68+
};
69+
70+
71+
72+
73+
74+
}
75+
#endif
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/* Client Socket
2+
*
3+
* From: https://github.com/PokemonAutomation/Arduino-Source
4+
*
5+
*/
6+
7+
#include "ClientSocket.h"
8+
9+
namespace PokemonAutomation{
10+
11+
12+
13+
14+
}

Common/Cpp/Sockets/ClientSocket.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/* Client Socket
2+
*
3+
* From: https://github.com/PokemonAutomation/Arduino-Source
4+
*
5+
*/
6+
7+
#ifndef PokemonAutomation_ClientSocket_H
8+
#define PokemonAutomation_ClientSocket_H
9+
10+
11+
12+
#ifdef _WIN32
13+
#include "ClientSocket_WinSocket.h"
14+
namespace PokemonAutomation{
15+
using ClientSocket = ClientSocket_WinSocket;
16+
}
17+
#else
18+
#include "ClientSocket_POSIX.h"
19+
namespace PokemonAutomation{
20+
using ClientSocket = ClientSocket_POSIX;
21+
}
22+
#endif
23+
24+
25+
26+
#endif
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/* Client Socket (POSIX)
2+
*
3+
* From: https://github.com/PokemonAutomation/Arduino-Source
4+
*
5+
* This file is completely untested!
6+
*
7+
*/
8+
9+
#ifndef PokemonAutomation_ClientSocket_POSIX_H
10+
#define PokemonAutomation_ClientSocket_POSIX_H
11+
12+
#include <iostream>
13+
#include <mutex>
14+
#include <thread>
15+
#include <condition_variable>
16+
#include <sys/socket.h>
17+
#include <fcntl.h>
18+
#include <netinet/in.h>
19+
#include <arpa/inet.h>
20+
#include "AbstractClientSocket.h"
21+
22+
namespace PokemonAutomation{
23+
24+
25+
26+
class ClientSocket_POSIX final : public AbstractClientSocket{
27+
public:
28+
ClientSocket_POSIX()
29+
: m_socket(-1)
30+
{}
31+
32+
virtual ~ClientSocket_POSIX(){
33+
close();
34+
if (m_thread.joinable()){
35+
m_thread.join();
36+
}
37+
if (m_socket != -1){
38+
::close(m_socket);
39+
}
40+
}
41+
virtual void close() noexcept override{
42+
{
43+
std::lock_guard<std::mutex> lg1(m_lock);
44+
m_state.store(State::DESTRUCTING, std::memory_order_relaxed);
45+
m_cv.notify_all();
46+
}
47+
}
48+
49+
virtual void connect(const std::string& address, uint16_t port) override{
50+
std::lock_guard<std::mutex> lg1(m_lock);
51+
if (m_state.load(std::memory_order_relaxed) != State::NOT_RUNNING){
52+
return;
53+
}
54+
try{
55+
m_socket = socket(AF_INET, SOCK_STREAM, 0);
56+
fcntl(m_socket, F_SETFL, O_NONBLOCK);
57+
m_state.store(State::CONNECTING, std::memory_order_relaxed);
58+
m_thread = std::thread(
59+
&ClientSocket_POSIX::thread_loop,
60+
this, address, port
61+
);
62+
}catch (...){
63+
::close(m_socket);
64+
m_socket = -1;
65+
m_state.store(State::NOT_RUNNING, std::memory_order_relaxed);
66+
throw;
67+
}
68+
}
69+
70+
71+
virtual size_t blocking_send(const void* data, size_t bytes) override{
72+
std::unique_lock<std::mutex> lg(m_lock);
73+
constexpr int BLOCK_SIZE = (int)1 << 30;
74+
const char* ptr = (const char*)data;
75+
size_t sent = 0;
76+
if (m_socket == -1){
77+
return 0;
78+
}
79+
bool skip_wait = true;
80+
while (bytes > 0){
81+
if (!skip_wait){
82+
m_cv.wait_for(lg, std::chrono::milliseconds(1));
83+
}
84+
skip_wait = true;
85+
86+
size_t current = std::min<size_t>(bytes, BLOCK_SIZE);
87+
int current_sent = ::send(m_socket, ptr, (int)current, MSG_DONTWAIT);
88+
if (current_sent != -1){
89+
sent += current;
90+
if ((size_t)current_sent < current){
91+
return sent;
92+
}
93+
ptr += current;
94+
bytes -= current;
95+
continue;
96+
}
97+
98+
int error = errno;
99+
// cout << "error = " << error << endl;
100+
switch (error){
101+
case EAGAIN:
102+
// case EWOULDBLOCK:
103+
break;
104+
default:
105+
m_error = "POSIX Error Code: " + std::to_string(error);
106+
return sent;
107+
}
108+
}
109+
return sent;
110+
}
111+
112+
113+
private:
114+
void thread_loop(const std::string& address, uint16_t port){
115+
try{
116+
thread_loop_internal(address, port);
117+
}catch (...){
118+
try{
119+
std::cout << "ClientSocket_POSIX(): An exception was thrown from the receive thread." << std::endl;
120+
}catch (...){}
121+
}
122+
}
123+
124+
void thread_loop_internal(const std::string& address, uint16_t port){
125+
m_listeners.run_method_unique(&Listener::on_thread_start);
126+
127+
{
128+
std::unique_lock<std::mutex> lg(m_lock);
129+
130+
sockaddr_in server;
131+
server.sin_family = AF_INET;
132+
server.sin_port = htons(port);
133+
server.sin_addr.s_addr = inet_addr(address.c_str());
134+
135+
// Connect
136+
while (true){
137+
State state = m_state.load(std::memory_order_relaxed);
138+
if (state == State::DESTRUCTING){
139+
return;
140+
}
141+
142+
if (::connect(m_socket, (struct sockaddr*)&server, sizeof(server)) != -1){
143+
break;
144+
}
145+
146+
int error = errno;
147+
// cout << "error = " << error << endl;
148+
149+
switch (error){
150+
case EISCONN:
151+
goto Connected;
152+
case EAGAIN:
153+
case EALREADY:
154+
case EINPROGRESS:
155+
break;
156+
case ETIMEDOUT:
157+
m_state.store(State::NOT_RUNNING, std::memory_order_relaxed);
158+
m_error = "Connection timed out.";
159+
m_lock.unlock();
160+
m_listeners.run_method_unique(&Listener::on_connect_finished, m_error);
161+
return;
162+
default:
163+
m_state.store(State::NOT_RUNNING, std::memory_order_relaxed);
164+
m_error = "WSA Error Code: " + std::to_string(error);
165+
m_lock.unlock();
166+
m_listeners.run_method_unique(&Listener::on_connect_finished, m_error);
167+
return;
168+
}
169+
170+
m_cv.wait_for(lg, std::chrono::milliseconds(10));
171+
172+
}
173+
174+
Connected:
175+
m_state.store(State::CONNECTED, std::memory_order_relaxed);
176+
}
177+
178+
m_listeners.run_method_unique(&Listener::on_connect_finished, "");
179+
180+
181+
constexpr size_t BUFFER_SIZE = 4096;
182+
char buffer[BUFFER_SIZE];
183+
184+
while (true){
185+
int bytes;
186+
int error = 0;
187+
{
188+
std::unique_lock<std::mutex> lg(m_lock);
189+
State state = m_state.load(std::memory_order_relaxed);
190+
if (state == State::DESTRUCTING){
191+
return;
192+
}
193+
bytes = ::recv(m_socket, buffer, BUFFER_SIZE, 0);
194+
if (bytes < 0){
195+
error = errno;
196+
}
197+
}
198+
199+
if (bytes > 0){
200+
m_listeners.run_method_unique(&Listener::on_receive_data, buffer, bytes);
201+
}else if (bytes < 0){
202+
// cout << "error = " << error << endl;
203+
switch (error){
204+
case EAGAIN:
205+
// case EWOULDBLOCK:
206+
break;
207+
default:
208+
std::unique_lock<std::mutex> lg(m_lock);
209+
m_error = "POSIX Error Code: " + std::to_string(error);
210+
return;
211+
}
212+
}
213+
214+
std::unique_lock<std::mutex> lg(m_lock);
215+
m_cv.wait_for(lg, std::chrono::milliseconds(1));
216+
}
217+
218+
}
219+
220+
221+
private:
222+
int m_socket;
223+
224+
std::string m_error;
225+
226+
mutable std::mutex m_lock;
227+
std::condition_variable m_cv;
228+
std::thread m_thread;
229+
};
230+
231+
232+
}
233+
#endif

0 commit comments

Comments
 (0)