Skip to content

Commit 63b3379

Browse files
committed
refactor for transaction
1 parent 570c681 commit 63b3379

File tree

1 file changed

+85
-58
lines changed

1 file changed

+85
-58
lines changed

conn.go

Lines changed: 85 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -146,84 +146,111 @@ func (conn *redshiftDataConn) executeStatement(ctx context.Context, params *reds
146146
}
147147
queryStart := time.Now()
148148
debugLogger.Printf("[%s] sucess execute statement: %s", *executeOutput.Id, coalesce(params.Sql))
149-
describeOutput, err := conn.client.DescribeStatement(ectx, &redshiftdata.DescribeStatementInput{
149+
describeOutput, err := conn.waitWithCancel(ctx, executeOutput.Id, queryStart)
150+
if err != nil {
151+
return nil, nil, err
152+
}
153+
if describeOutput.Status == types.StatusStringAborted {
154+
return nil, nil, fmt.Errorf("query aborted: %s", *describeOutput.Error)
155+
}
156+
if describeOutput.Status == types.StatusStringFailed {
157+
return nil, nil, fmt.Errorf("query failed: %s", *describeOutput.Error)
158+
}
159+
if describeOutput.Status != types.StatusStringFinished {
160+
return nil, nil, fmt.Errorf("query status is not finished: %s", describeOutput.Status)
161+
}
162+
debugLogger.Printf("[%s] success query: elapsed_time=%s", *executeOutput.Id, time.Since(queryStart))
163+
if !*describeOutput.HasResultSet {
164+
return nil, describeOutput, nil
165+
}
166+
debugLogger.Printf("[%s] query has result set: result_rows=%d", *executeOutput.Id, describeOutput.ResultRows)
167+
p := redshiftdata.NewGetStatementResultPaginator(conn.client, &redshiftdata.GetStatementResultInput{
150168
Id: executeOutput.Id,
151169
})
170+
return p, describeOutput, nil
171+
}
172+
173+
func isFinishedStatus(status types.StatusString) bool {
174+
return status == types.StatusStringFinished || status == types.StatusStringFailed || status == types.StatusStringAborted
175+
}
176+
177+
func (conn *redshiftDataConn) wait(ctx context.Context, id *string, queryStart time.Time) (*redshiftdata.DescribeStatementOutput, error) {
178+
timeout := conn.cfg.Timeout
179+
if timeout == 0 {
180+
timeout = 15 * time.Minute
181+
}
182+
polling := conn.cfg.Polling
183+
if polling == 0 {
184+
polling = 10 * time.Millisecond
185+
}
186+
ectx, cancel := context.WithTimeout(ctx, timeout)
187+
defer cancel()
188+
debugLogger.Printf("[%s] wating finsih query: elapsed_time=%s", *id, time.Since(queryStart))
189+
describeOutput, err := conn.client.DescribeStatement(ctx, &redshiftdata.DescribeStatementInput{
190+
Id: id,
191+
})
152192
if err != nil {
153-
return nil, nil, fmt.Errorf("describe statement:%w", err)
154-
}
155-
debugLogger.Printf("[%s] describe statement: status=%s pid=%d query_id=%d", *executeOutput.Id, describeOutput.Status, describeOutput.RedshiftPid, describeOutput.RedshiftQueryId)
156-
157-
var isFinished bool
158-
defer func() {
159-
if !isFinished {
160-
describeOutput, err := conn.client.DescribeStatement(ctx, &redshiftdata.DescribeStatementInput{
161-
Id: executeOutput.Id,
162-
})
163-
if err != nil {
164-
errLogger.Printf("[%s] failed describe statement: %v", *executeOutput.Id, err)
165-
return
166-
}
167-
if describeOutput.Status == types.StatusStringFinished ||
168-
describeOutput.Status == types.StatusStringFailed ||
169-
describeOutput.Status == types.StatusStringAborted {
170-
return
171-
}
172-
debugLogger.Printf("[%s] try cancel statement", *executeOutput.Id)
173-
output, err := conn.client.CancelStatement(ctx, &redshiftdata.CancelStatementInput{
174-
Id: executeOutput.Id,
175-
})
176-
if err != nil {
177-
178-
errLogger.Printf("[%s] failed cancel statement: %v", *executeOutput.Id, err)
179-
return
180-
}
181-
if !*output.Status {
182-
debugLogger.Printf("[%s] cancel statement status is false", *executeOutput.Id)
183-
}
184-
}
185-
}()
186-
delay := time.NewTimer(conn.cfg.Polling)
193+
return nil, fmt.Errorf("describe statement:%w", err)
194+
}
195+
debugLogger.Printf("[%s] describe statement: status=%s pid=%d query_id=%d", *id, describeOutput.Status, describeOutput.RedshiftPid, describeOutput.RedshiftQueryId)
196+
if isFinishedStatus(describeOutput.Status) {
197+
return describeOutput, nil
198+
}
199+
delay := time.NewTimer(polling)
187200
for {
188-
if describeOutput.Status == types.StatusStringAborted {
189-
return nil, nil, fmt.Errorf("query aborted: %s", *describeOutput.Error)
190-
}
191-
if describeOutput.Status == types.StatusStringFailed {
192-
return nil, nil, fmt.Errorf("query failed: %s", *describeOutput.Error)
193-
}
194-
if describeOutput.Status == types.StatusStringFinished {
195-
break
196-
}
197-
debugLogger.Printf("[%s] wating finsih query: elapsed_time=%s", *executeOutput.Id, time.Since(queryStart))
198-
delay.Reset(conn.cfg.Polling)
199201
select {
200202
case <-ectx.Done():
201203
if !delay.Stop() {
202204
<-delay.C
203205
}
204-
return nil, nil, ectx.Err()
206+
return nil, ectx.Err()
205207
case <-delay.C:
206208
case <-conn.aliveCh:
207209
if !delay.Stop() {
208210
<-delay.C
209211
}
210-
return nil, nil, ErrConnClosed
212+
return nil, ErrConnClosed
211213
}
214+
debugLogger.Printf("[%s] wating finsih query: elapsed_time=%s", *id, time.Since(queryStart))
212215
describeOutput, err = conn.client.DescribeStatement(ctx, &redshiftdata.DescribeStatementInput{
213-
Id: executeOutput.Id,
216+
Id: id,
214217
})
215218
if err != nil {
216-
return nil, nil, fmt.Errorf("describe statement:%w", err)
219+
return nil, fmt.Errorf("describe statement:%w", err)
220+
}
221+
if isFinishedStatus(describeOutput.Status) {
222+
return describeOutput, nil
217223
}
224+
delay.Reset(polling)
218225
}
219-
isFinished = true
220-
debugLogger.Printf("[%s] success query: elapsed_time=%s", *executeOutput.Id, time.Since(queryStart))
221-
if !*describeOutput.HasResultSet {
222-
return nil, describeOutput, nil
226+
}
227+
228+
func (conn *redshiftDataConn) waitWithCancel(ctx context.Context, id *string, queryStart time.Time) (*redshiftdata.DescribeStatementOutput, error) {
229+
desc, err := conn.wait(ctx, id, queryStart)
230+
cctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
231+
defer cancel()
232+
if desc == nil {
233+
var rErr error
234+
desc, rErr = conn.client.DescribeStatement(cctx, &redshiftdata.DescribeStatementInput{
235+
Id: id,
236+
})
237+
if rErr != nil {
238+
return nil, err
239+
}
223240
}
224-
debugLogger.Printf("[%s] query has result set: result_rows=%d", *executeOutput.Id, describeOutput.ResultRows)
225-
p := redshiftdata.NewGetStatementResultPaginator(conn.client, &redshiftdata.GetStatementResultInput{
226-
Id: executeOutput.Id,
241+
if isFinishedStatus(desc.Status) {
242+
return desc, err
243+
}
244+
debugLogger.Printf("[%s] try cancel statement", *id)
245+
output, cErr := conn.client.CancelStatement(cctx, &redshiftdata.CancelStatementInput{
246+
Id: id,
227247
})
228-
return p, describeOutput, nil
248+
if cErr != nil {
249+
errLogger.Printf("[%s] failed cancel statement: %v", *id, err)
250+
return desc, err
251+
}
252+
if !*output.Status {
253+
debugLogger.Printf("[%s] cancel statement status is false", *id)
254+
}
255+
return desc, err
229256
}

0 commit comments

Comments
 (0)