Commit 77f5090

[bugfix] batch trans on CUDA with SM returns error 700 (#434)
Bug fix for the CUDA trans batch API.
1 parent fdc31df commit 77f5090
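Note on the error being fixed: CUDA error code 700 is cudaErrorIllegalAddress ("an illegal memory access was encountered"). Because the fault is raised by a kernel, it is reported asynchronously and normally only surfaces at the next synchronization point, not at the launch call. A minimal sketch of how that looks with the plain CUDA runtime API (the UCM wrappers are not used here, and the enqueued batch copy is left as a placeholder):

    // Sketch only: how an SM-based batch copy failure typically surfaces as 700.
    #include <cuda_runtime.h>
    #include <cstdio>

    int main() {
        cudaStream_t stream;
        cudaStreamCreate(&stream);

        // ... enqueue the batched SM copy kernel on `stream` here ...

        cudaError_t launchErr = cudaGetLastError();          // usually cudaSuccess
        cudaError_t syncErr = cudaStreamSynchronize(stream); // the fault shows up here
        // With a copy kernel that dereferences an invalid pointer table, this
        // typically prints: sync: 700 (an illegal memory access was encountered)
        std::printf("launch: %s, sync: %d (%s)\n", cudaGetErrorString(launchErr),
                    (int)syncErr, cudaGetErrorString(syncErr));
        cudaStreamDestroy(stream);
        return 0;
    }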

File tree

3 files changed: +177 -14 lines


ucm/shared/test/case/trans/trans_test.cc

Lines changed: 47 additions & 0 deletions
@@ -92,3 +92,50 @@ TEST_F(UCTransUnitTest, CopyDataWithSM)
         ASSERT_EQ(*(size_t*)(((char*)hPtr2.get()) + size * i), i);
     }
 }
+
+TEST_F(UCTransUnitTest, CopyDataBatchWithSM)
+{
+    const auto ok = UC::Status::OK();
+    constexpr int32_t deviceId = 0;
+    constexpr size_t size = 36 * 1024;
+    constexpr size_t number = 64 * 61;
+    UC::Trans::Device device;
+    ASSERT_EQ(device.Setup(deviceId), ok);
+    auto stream = device.MakeSMStream();
+    if (!stream) { return; }
+    auto bDev = device.MakeBuffer();
+    auto bHost1 = device.MakeBuffer();
+    auto bHost2 = device.MakeBuffer();
+    ASSERT_EQ(bDev->MakeDeviceBuffers(size, number), ok);
+    ASSERT_EQ(bHost1->MakeHostBuffers(size, number), ok);
+    ASSERT_EQ(bHost2->MakeHostBuffers(size, number), ok);
+    std::vector<std::shared_ptr<void>> devPtrHolder, host1PtrHolder, host2PtrHolder;
+    void *dPtrArr[number], *h1PtrArr[number], *h2PtrArr[number];
+    for (size_t i = 0; i < number; i++) {
+        auto d = bDev->GetDeviceBuffer(size);
+        auto h1 = bHost1->GetHostBuffer(size);
+        auto h2 = bHost2->GetHostBuffer(size);
+        dPtrArr[i] = d.get();
+        h1PtrArr[i] = h1.get();
+        *(size_t*)h1PtrArr[i] = i;
+        h2PtrArr[i] = h2.get();
+        devPtrHolder.emplace_back(d);
+        host1PtrHolder.emplace_back(h1);
+        host2PtrHolder.emplace_back(h2);
+    }
+    constexpr const auto arrSize = sizeof(void*) * number;
+    auto dPtrArrOnDev = bDev->MakeDeviceBuffer(arrSize);
+    auto h1PtrArrOnDev = bHost1->MakeDeviceBuffer(arrSize);
+    auto h2PtrArrOnDev = bHost2->MakeDeviceBuffer(arrSize);
+    ASSERT_EQ(stream->HostToDeviceAsync((void*)dPtrArr, dPtrArrOnDev.get(), arrSize), ok);
+    ASSERT_EQ(stream->HostToDeviceAsync((void*)h1PtrArr, h1PtrArrOnDev.get(), arrSize), ok);
+    ASSERT_EQ(stream->HostToDeviceAsync((void*)h2PtrArr, h2PtrArrOnDev.get(), arrSize), ok);
+    auto src = (void**)h1PtrArrOnDev.get();
+    auto dst = (void**)dPtrArrOnDev.get();
+    ASSERT_EQ(stream->HostToDeviceAsync(src, dst, size, number), ok);
+    src = (void**)dPtrArrOnDev.get();
+    dst = (void**)h2PtrArrOnDev.get();
+    ASSERT_EQ(stream->DeviceToHostAsync(src, dst, size, number), ok);
+    ASSERT_EQ(stream->Synchronized().Underlying(), ok.Underlying());
+    for (size_t i = 0; i < number; i++) { ASSERT_EQ(*(size_t*)h2PtrArr[i], i); }
+}
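The new CopyDataBatchWithSM case exercises the pointer-table pattern behind the batch API: raw buffer addresses are collected into host-side void* arrays, those arrays are staged into device-visible memory, and only then are the batched HostToDeviceAsync/DeviceToHostAsync overloads called with void** arguments. A rough equivalent in plain CUDA, with illustrative names (BatchCopyKernel, BatchCopyAsync) that are not part of UCM, and assuming CUDA 11.2+ for the stream-ordered allocator:

    // Sketch of the batch pointer-table pattern, outside the UC::Trans wrappers.
    #include <cuda_runtime.h>

    // One block per (src[i], dst[i]) pair; the block's threads copy its bytes.
    // On UVA systems, pinned host allocations are device-accessible, which is
    // what lets a kernel copy between host and device buffers at all.
    __global__ void BatchCopyKernel(const void** src, void** dst, size_t size, size_t number) {
        for (size_t i = blockIdx.x; i < number; i += gridDim.x) {
            const char* s = (const char*)src[i];
            char* d = (char*)dst[i];
            for (size_t off = threadIdx.x; off < size; off += blockDim.x) { d[off] = s[off]; }
        }
    }

    cudaError_t BatchCopyAsync(void* srcPtrs[], void* dstPtrs[], size_t size, size_t number,
                               cudaStream_t stream) {
        // Stage the host-built pointer tables in device memory first: the kernel
        // has to read src[i]/dst[i] from addresses it can dereference.
        void **srcTable = nullptr, **dstTable = nullptr;
        cudaMallocAsync((void**)&srcTable, number * sizeof(void*), stream);
        cudaMallocAsync((void**)&dstTable, number * sizeof(void*), stream);
        cudaMemcpyAsync(srcTable, srcPtrs, number * sizeof(void*), cudaMemcpyHostToDevice, stream);
        cudaMemcpyAsync(dstTable, dstPtrs, number * sizeof(void*), cudaMemcpyHostToDevice, stream);
        // A single launch then moves all `number` buffers of `size` bytes.
        BatchCopyKernel<<<64, 256, 0, stream>>>((const void**)srcTable, dstTable, size, number);
        cudaFreeAsync(srcTable, stream);
        cudaFreeAsync(dstTable, stream);
        return cudaGetLastError();
    }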

ucm/shared/test/example/trans/trans_on_cuda_example.py

Lines changed: 126 additions & 10 deletions
@@ -43,20 +43,42 @@ def wrapper(*args, **kwargs):
 
 
 def make_host_memory(size, number, dtype, fill=False):
+    element_size = np.dtype(dtype).itemsize
+    num_elements = size // element_size
     host = cupy.cuda.alloc_pinned_memory(size * number)
-    host_np = np.frombuffer(host, dtype=dtype)
+    host_np = np.frombuffer(host, dtype=dtype, count=num_elements)
     if fill:
         fixed_len = min(1024, number)
         host_np[:fixed_len] = np.arange(fixed_len, dtype=dtype)
     print("make:", host_np.shape, host_np.itemsize, host_np)
     return host
 
 
-def compare(host1, host2, dtype):
-    host1_np = np.frombuffer(host1, dtype=dtype)
-    host2_np = np.frombuffer(host2, dtype=dtype)
-    print("compare[1]:", host1_np.shape, host1_np.itemsize, host1_np)
-    print("compare[2]:", host2_np.shape, host2_np.itemsize, host2_np)
+def make_batch_host_memory(size, number, dtype, fill=False):
+    element_size = np.dtype(dtype).itemsize
+    num_elements = size // element_size
+    host = []
+    for i in range(number):
+        pinned_mem = cupy.cuda.alloc_pinned_memory(size)
+        np_array = np.frombuffer(pinned_mem, dtype=dtype, count=num_elements)
+        if fill:
+            value = np.uint64(1023 + i)
+            np_array[0] = value
+            np_array[-1] = value
+        host.append(pinned_mem)
+        if i == 0:
+            print("make:", np_array.shape, np_array.itemsize, np_array)
+    return host
+
+
+def compare(host1, host2, size, dtype, show_detail=True):
+    element_size = np.dtype(dtype).itemsize
+    num_elements = size // element_size
+    host1_np = np.frombuffer(host1, dtype=dtype, count=num_elements)
+    host2_np = np.frombuffer(host2, dtype=dtype, count=num_elements)
+    if show_detail:
+        print("compare[1]:", host1_np.shape, host1_np.itemsize, host1_np)
+        print("compare[2]:", host2_np.shape, host2_np.itemsize, host2_np)
     return np.array_equal(host1_np, host2_np)
 
 
@@ -73,7 +95,7 @@ def trans_with_ce(d, size, number, dtype):
     cost = time.perf_counter() - tp
     print(f"cost: {cost}s")
     print(f"bandwidth: {size * number / cost / 1e9}GB/s")
-    assert compare(host1, host2, dtype)
+    assert compare(host1, host2, size, dtype)
 
 
 @test_wrap
@@ -91,7 +113,7 @@ def trans_with_sm(d, size, number, dtype):
     cost = time.perf_counter() - tp
     print(f"cost: {cost}s")
     print(f"bandwidth: {size * number / cost / 1e9}GB/s")
-    assert compare(host1, host2, dtype)
+    assert compare(host1, host2, size, dtype)
 
 
 @test_wrap
@@ -108,7 +130,7 @@ def trans_with_ce_async(d, size, number, dtype):
     cost = time.perf_counter() - tp
     print(f"cost: {cost}s")
     print(f"bandwidth: {size * number / cost / 1e9}GB/s")
-    assert compare(host1, host2, dtype)
+    assert compare(host1, host2, size, dtype)
 
 
 @test_wrap
@@ -127,7 +149,97 @@
     cost = time.perf_counter() - tp
     print(f"cost: {cost}s")
     print(f"bandwidth: {size * number / cost / 1e9}GB/s")
-    assert compare(host1, host2, dtype)
+    assert compare(host1, host2, size, dtype)
+
+
+@test_wrap
+def trans_batch_with_ce(d, size, number, dtype):
+    s = d.MakeStream()
+    host1 = make_batch_host_memory(size, number, dtype, True)
+    host1_ptr = np.array([h.ptr for h in host1], dtype=np.uint64)
+    device = [cupy.empty(size, dtype=np.uint8) for _ in range(number)]
+    device_ptr = np.array([d.data.ptr for d in device], dtype=np.uint64)
+    host2 = make_batch_host_memory(size, number, dtype)
+    host2_ptr = np.array([h.ptr for h in host2], dtype=np.uint64)
+    tp = time.perf_counter()
+    s.HostToDeviceBatch(host1_ptr, device_ptr, size, number)
+    s.DeviceToHostBatch(device_ptr, host2_ptr, size, number)
+    cost = time.perf_counter() - tp
+    print(f"cost: {cost}s")
+    print(f"bandwidth: {size * number / cost / 1e9}GB/s")
+    for h1, h2 in zip(host1, host2):
+        assert compare(h1, h2, size, dtype, False)
+
+
+@test_wrap
+def trans_batch_with_sm(dev, size, number, dtype):
+    s = dev.MakeSMStream()
+    h1 = make_batch_host_memory(size, number, dtype, True)
+    h1_ptr = np.array([h.ptr for h in h1], dtype=np.uint64)
+    h1_ptr_cupy = cupy.empty(number, dtype=np.uint64)
+    h1_ptr_cupy.set(h1_ptr)
+    d = [cupy.empty(size, dtype=np.uint8) for _ in range(number)]
+    d_ptr = np.array([d.data.ptr for d in d], dtype=np.uint64)
+    d_ptr_cupy = cupy.empty(number, dtype=np.uint64)
+    d_ptr_cupy.set(d_ptr)
+    h2 = make_batch_host_memory(size, number, dtype)
+    h2_ptr = np.array([h.ptr for h in h2], dtype=np.uint64)
+    h2_ptr_cupy = cupy.empty(number, dtype=np.uint64)
+    h2_ptr_cupy.set(h2_ptr)
+    tp = time.perf_counter()
+    s.HostToDeviceBatch(h1_ptr_cupy.data.ptr, d_ptr_cupy.data.ptr, size, number)
+    s.DeviceToHostBatch(d_ptr_cupy.data.ptr, h2_ptr_cupy.data.ptr, size, number)
+    cost = time.perf_counter() - tp
+    print(f"cost: {cost}s")
+    print(f"bandwidth: {size * number / cost / 1e9}GB/s")
+    for x, y in zip(h1, h2):
+        assert compare(x, y, size, dtype, False)
+
+
+@test_wrap
+def trans_batch_with_ce_async(d, size, number, dtype):
+    s = d.MakeStream()
+    host1 = make_batch_host_memory(size, number, dtype, True)
+    host1_ptr = np.array([h.ptr for h in host1], dtype=np.uint64)
+    device = [cupy.empty(size, dtype=np.uint8) for _ in range(number)]
+    device_ptr = np.array([d.data.ptr for d in device], dtype=np.uint64)
+    host2 = make_batch_host_memory(size, number, dtype)
+    host2_ptr = np.array([h.ptr for h in host2], dtype=np.uint64)
+    tp = time.perf_counter()
+    s.HostToDeviceBatchAsync(host1_ptr, device_ptr, size, number)
+    s.DeviceToHostBatchAsync(device_ptr, host2_ptr, size, number)
+    s.Synchronized()
+    cost = time.perf_counter() - tp
+    print(f"cost: {cost}s")
+    print(f"bandwidth: {size * number / cost / 1e9}GB/s")
+    for h1, h2 in zip(host1, host2):
+        assert compare(h1, h2, size, dtype, False)
+
+
+@test_wrap
+def trans_batch_with_sm_async(dev, size, number, dtype):
+    s = dev.MakeSMStream()
+    h1 = make_batch_host_memory(size, number, dtype, True)
+    h1_ptr = np.array([h.ptr for h in h1], dtype=np.uint64)
+    h1_ptr_cupy = cupy.empty(number, dtype=np.uint64)
+    h1_ptr_cupy.set(h1_ptr)
+    d = [cupy.empty(size, dtype=np.uint8) for _ in range(number)]
+    d_ptr = np.array([d.data.ptr for d in d], dtype=np.uint64)
+    d_ptr_cupy = cupy.empty(number, dtype=np.uint64)
+    d_ptr_cupy.set(d_ptr)
+    h2 = make_batch_host_memory(size, number, dtype)
+    h2_ptr = np.array([h.ptr for h in h2], dtype=np.uint64)
+    h2_ptr_cupy = cupy.empty(number, dtype=np.uint64)
+    h2_ptr_cupy.set(h2_ptr)
+    tp = time.perf_counter()
+    s.HostToDeviceBatchAsync(h1_ptr_cupy.data.ptr, d_ptr_cupy.data.ptr, size, number)
+    s.DeviceToHostBatchAsync(d_ptr_cupy.data.ptr, h2_ptr_cupy.data.ptr, size, number)
+    s.Synchronized()
+    cost = time.perf_counter() - tp
+    print(f"cost: {cost}s")
+    print(f"bandwidth: {size * number / cost / 1e9}GB/s")
+    for x, y in zip(h1, h2):
+        assert compare(x, y, size, dtype, False)
 
 
 def main():
@@ -143,6 +255,10 @@ def main():
     trans_with_sm(d, size, number, dtype)
     trans_with_ce_async(d, size, number, dtype)
     trans_with_sm_async(d, size, number, dtype)
+    trans_batch_with_ce(d, size, number, dtype)
+    trans_batch_with_sm(d, size, number, dtype)
+    trans_batch_with_ce_async(d, size, number, dtype)
+    trans_batch_with_sm_async(d, size, number, dtype)
 
 
 if __name__ == "__main__":
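One detail the new example encodes: the CE ("copy engine") batch variants hand the API NumPy arrays of addresses that live in host memory, while the SM variants first copy those address tables into device memory (the *_ptr_cupy buffers) and pass their .data.ptr. The reason, under the assumption that the CE path is driven by per-buffer cudaMemcpyAsync calls issued from the host while the SM path launches a copy kernel (as sketched after the C++ test above): only the kernel needs to dereference the tables on the device. A minimal CE-style sketch, with an illustrative name that is not a UCM API:

    // Illustrative CE-style batch: the pointer tables are only read by the host
    // loop, so they can stay in ordinary host memory; each element becomes one
    // DMA transfer handled by the copy engine.
    #include <cuda_runtime.h>

    cudaError_t CeBatchH2DAsync(void* hostSrc[], void* devDst[], size_t size, size_t number,
                                cudaStream_t stream) {
        for (size_t i = 0; i < number; i++) {
            cudaError_t err =
                cudaMemcpyAsync(devDst[i], hostSrc[i], size, cudaMemcpyHostToDevice, stream);
            if (err != cudaSuccess) { return err; }
        }
        return cudaSuccess;
    }
    // The SM path cannot do this: its copy kernel reads src[i]/dst[i] on the device,
    // so handing it host-only table addresses is the kind of mistake that ends in
    // error 700; hence the example stages the tables in cupy device buffers first.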

ucm/shared/trans/cuda/cuda_sm_kernel.cu

Lines changed: 4 additions & 4 deletions
@@ -94,8 +94,8 @@ __global__ void CudaCopyKernel(const void* src, void** dst, size_t size, size_t
 cudaError_t CudaSMCopyAsync(void* src[], void* dst[], size_t size, size_t number,
                             cudaStream_t stream)
 {
-    CudaCopyKernel<<<CUDA_TRANS_BLOCK_NUMBER, CUDA_TRANS_BLOCK_SIZE, 0, stream>>>(src, dst, size,
-                                                                                  number);
+    CudaCopyKernel<<<CUDA_TRANS_BLOCK_NUMBER, CUDA_TRANS_BLOCK_SIZE, 0, stream>>>(
+        (const void**)src, dst, size, number);
     return cudaGetLastError();
 }
 
@@ -108,8 +108,8 @@ cudaError_t CudaSMCopyAsync(void* src[], void* dst, size_t size, size_t number,
 
 cudaError_t CudaSMCopyAsync(void* src, void* dst[], size_t size, size_t number, cudaStream_t stream)
 {
-    CudaCopyKernel<<<CUDA_TRANS_BLOCK_NUMBER, CUDA_TRANS_BLOCK_SIZE, 0, stream>>>(src, dst, size,
-                                                                                  number);
+    CudaCopyKernel<<<CUDA_TRANS_BLOCK_NUMBER, CUDA_TRANS_BLOCK_SIZE, 0, stream>>>(
+        (const void*)src, dst, size, number);
     return cudaGetLastError();
 }
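The fix itself is the explicit casts at the launch sites. C++ never implicitly converts void** to const void**, but it does convert it to const void*, so the uncast src in the first wrapper could bind to the CudaCopyKernel overload that takes a single flat source (const void* src, void** dst, ...) instead of the batch overload, and the pointer table would then be read as payload; reading past that table on the device is the kind of access that comes back as error 700. The second cast, (const void*)src, appears to simply make the intended flat-source overload explicit. The full CudaCopyKernel overload set is not visible in this diff, so the reduction below only mirrors the CudaSMCopyAsync signatures above; it is plain host C++ that demonstrates the overload-resolution behaviour:

    // Reduction of the overload pitfall the (const void**) cast guards against.
    #include <cstddef>
    #include <cstdio>

    void Copy(const void* src, void** dst, std::size_t, std::size_t)  { std::puts("scatter: one src, many dst"); }
    void Copy(const void** src, void* dst, std::size_t, std::size_t)  { std::puts("gather: many src, one dst"); }
    void Copy(const void** src, void** dst, std::size_t, std::size_t) { std::puts("batch: many src, many dst"); }

    int main() {
        void* src[4] = {};
        void* dst[4] = {};
        // void** does not implicitly convert to const void**, but it does convert
        // to const void*, so the uncast call silently binds to the scatter overload
        // and the pointer table itself gets treated as a flat source buffer.
        Copy(src, dst, 4096, 4);               // prints "scatter: one src, many dst"
        Copy((const void**)src, dst, 4096, 4); // prints "batch: many src, many dst"
        return 0;
    }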

0 commit comments
