|
1 | | -use std::any::Any; |
| 1 | +use std::sync::atomic::{AtomicBool, Ordering}; |
2 | 2 |
|
3 | 3 | use crate::job::StackJob; |
4 | 4 | use crate::latch::SpinLatch; |
5 | | -use crate::registry::{self, WorkerThread}; |
6 | | -use crate::tlv::{self, Tlv}; |
7 | | -use crate::{FnContext, unwind}; |
| 5 | +use crate::{FnContext, registry, tlv, unwind}; |
8 | 6 |
|
9 | 7 | #[cfg(test)] |
10 | 8 | mod tests; |
@@ -134,68 +132,38 @@ where |
134 | 132 | // Create virtual wrapper for task b; this all has to be |
135 | 133 | // done here so that the stack frame can keep it all live |
136 | 134 | // long enough. |
137 | | - let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new(worker_thread)); |
| 135 | + let job_b_started = AtomicBool::new(false); |
| 136 | + let job_b = StackJob::new( |
| 137 | + tlv, |
| 138 | + |migrated| { |
| 139 | + job_b_started.store(true, Ordering::Relaxed); |
| 140 | + call_b(oper_b)(migrated) |
| 141 | + }, |
| 142 | + SpinLatch::new(worker_thread), |
| 143 | + ); |
138 | 144 | let job_b_ref = job_b.as_job_ref(); |
139 | 145 | let job_b_id = job_b_ref.id(); |
140 | 146 | worker_thread.push(job_b_ref); |
141 | 147 |
|
142 | 148 | // Execute task a; hopefully b gets stolen in the meantime. |
143 | 149 | let status_a = unwind::halt_unwinding(call_a(oper_a, injected)); |
144 | | - let result_a = match status_a { |
145 | | - Ok(v) => v, |
146 | | - Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv), |
147 | | - }; |
148 | | - |
149 | | - // Now that task A has finished, try to pop job B from the |
150 | | - // local stack. It may already have been popped by job A; it |
151 | | - // may also have been stolen. There may also be some tasks |
152 | | - // pushed on top of it in the stack, and we will have to pop |
153 | | - // those off to get to it. |
154 | | - while !job_b.latch.probe() { |
155 | | - if let Some(job) = worker_thread.take_local_job() { |
156 | | - if job_b_id == job.id() { |
157 | | - // Found it! Let's run it. |
158 | | - // |
159 | | - // Note that this could panic, but it's ok if we unwind here. |
160 | | - |
161 | | - // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. |
162 | | - tlv::set(tlv); |
163 | | - |
164 | | - let result_b = job_b.run_inline(injected); |
165 | | - return (result_a, result_b); |
166 | | - } else { |
167 | | - worker_thread.execute(job); |
168 | | - } |
169 | | - } else { |
170 | | - // Local deque is empty. Time to steal from other |
171 | | - // threads. |
172 | | - worker_thread.wait_until(&job_b.latch); |
173 | | - debug_assert!(job_b.latch.probe()); |
174 | | - break; |
175 | | - } |
176 | | - } |
| 150 | + worker_thread.wait_for_jobs::<_, false>( |
| 151 | + &job_b.latch, |
| 152 | + || job_b_started.load(Ordering::Relaxed), |
| 153 | + |job| job.id() == job_b_id, |
| 154 | + |job| { |
| 155 | + debug_assert_eq!(job.id(), job_b_id); |
| 156 | + job_b.run_inline(injected); |
| 157 | + }, |
| 158 | + ); |
177 | 159 |
|
178 | 160 | // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. |
179 | 161 | tlv::set(tlv); |
180 | 162 |
|
| 163 | + let result_a = match status_a { |
| 164 | + Ok(v) => v, |
| 165 | + Err(err) => unwind::resume_unwinding(err), |
| 166 | + }; |
181 | 167 | (result_a, job_b.into_result()) |
182 | 168 | }) |
183 | 169 | } |
184 | | - |
185 | | -/// If job A panics, we still cannot return until we are sure that job |
186 | | -/// B is complete. This is because it may contain references into the |
187 | | -/// enclosing stack frame(s). |
188 | | -#[cold] // cold path |
189 | | -unsafe fn join_recover_from_panic( |
190 | | - worker_thread: &WorkerThread, |
191 | | - job_b_latch: &SpinLatch<'_>, |
192 | | - err: Box<dyn Any + Send>, |
193 | | - tlv: Tlv, |
194 | | -) -> ! { |
195 | | - unsafe { worker_thread.wait_until(job_b_latch) }; |
196 | | - |
197 | | - // Restore the TLV since we might have run some jobs overwriting it when waiting for job b. |
198 | | - tlv::set(tlv); |
199 | | - |
200 | | - unwind::resume_unwinding(err) |
201 | | -} |
0 commit comments