88import time
99
1010import progressbar
11+ # This actually imports progressbar2 but `import progressbar2' itself doesn't work.
12+ # In case of problems with the progressbar/progressbar2, check that you have the
13+ # progressbar2 installed and the path to it or venv is specified.
14+
1115import psycopg2 .extensions
1216
1317import common
@@ -22,7 +26,10 @@ def setup_tpcds(config):
2226 try :
2327 conn = psycopg2 .connect (** config )
2428 cur = conn .cursor ()
29+ except Exception as e :
30+ raise DataLoadException ('Load failed: %s' % e )
2531
32+ try :
2633 # Create pg_query_state extension
2734 cur .execute ('CREATE EXTENSION IF NOT EXISTS pg_query_state' )
2835
@@ -55,13 +62,13 @@ def run_tpcds(config):
5562 TPC_DS_STATEMENT_TIMEOUT = 20000 # statement_timeout in ms
5663
5764 print ('Preparing TPC-DS queries...' )
65+ err_count = 0
5866 queries = []
5967 for query_file in sorted (os .listdir ('tmp_stress/tpcds-result-reproduction/query_qualification/' )):
6068 with open ('tmp_stress/tpcds-result-reproduction/query_qualification/%s' % query_file , 'r' ) as f :
6169 queries .append (f .read ())
6270
6371 acon , = common .n_async_connect (config )
64- pid = acon .get_backend_pid ()
6572
6673 print ('Starting TPC-DS queries...' )
6774 timeout_list = []
@@ -84,8 +91,25 @@ def run_tpcds(config):
8491 PG_QS_DELAY , BEFORE_GETTING_QS_DELAY = 0.1 , 0.1
8592 BEFORE_GETTING_QS , GETTING_QS = range (2 )
8693 state , n_first_getting_qs_retries = BEFORE_GETTING_QS , 0
94+
95+ pg_qs_args = {
96+ 'config' : config ,
97+ 'pid' : acon .get_backend_pid ()
98+ }
99+
87100 while True :
88- result , notices = common .pg_query_state (config , pid )
101+ try :
102+ result , notices = common .pg_query_state (** pg_qs_args )
103+ except Exception as e :
104+ # do not consider the test failed if the "error in message
105+ # queue data transmitting" is received, this may happen with
106+ # some small probability, but if it happens too often it is
107+ # a problem, we will handle this case after the loop
108+ if "error in message queue data transmitting" in e .pgerror :
109+ err_count += 1
110+ else :
111+ raise e
112+
89113 # run state machine to determine the first getting of query state
90114 # and query finishing
91115 if state == BEFORE_GETTING_QS :
@@ -109,6 +133,12 @@ def run_tpcds(config):
109133 except psycopg2 .extensions .QueryCanceledError :
110134 timeout_list .append (i + 1 )
111135
136+ if err_count > 2 :
137+ print ("\n ERROR: error in message queue data transmitting" )
138+ raise Exception ('error was received %d times' % err_count )
139+ elif err_count > 0 :
140+ print (err_count , " times there was error in message queue data transmitting" )
141+
112142 common .n_close ((acon ,))
113143
114144 if len (timeout_list ) > 0 :
0 commit comments