aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavidlohr Bueso <dave@stgolabs.net>2015-06-05 15:03:52 +1000
committerMichael Ellerman <mpe@ellerman.id.au>2015-06-05 15:03:52 +1000
commit82709156a7c7e7b195f10b073bfb4205b1350af0 (patch)
tree9f64627be263c93f73e70dec0f5ca3c295f2abb8
parentf3937d2cb2a36ca49d4dfd5d13dd683620185b1c (diff)
downloadlinux-next-82709156a7c7e7b195f10b073bfb4205b1350af0.tar.gz
ipc,msg: provide barrier pairings for lockless receive
We currently use a full barrier on the sender side to to avoid receiver tasks disappearing on us while still performing on the sender side wakeup. We lack however, the proper CPU-CPU interactions pairing on the receiver side which busy-waits for the message. Similarly, we do not need a full smp_mb, and can relax the semantics for the writer and reader sides of the message. This is safe as we are only ordering loads and stores to r_msg. And in both smp_wmb and smp_rmb, there are no stores after the calls _anyway_. This obviously applies for pipelined_send and expunge_all, for EIRDM when destroying a queue. Signed-off-by: Davidlohr Bueso <dbueso@suse.de> Cc: Manfred Spraul <manfred@colorfullife.com> Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
-rw-r--r--ipc/msg.c31
1 files changed, 22 insertions, 9 deletions
diff --git a/ipc/msg.c b/ipc/msg.c
index c2e77b3de6a545..8538074cb77233 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res)
* or dealing with -EAGAIN cases. See lockless receive part 1
* and 2 in do_msgrcv().
*/
- smp_mb();
+ smp_wmb();
msr->r_msg = ERR_PTR(res);
}
}
@@ -580,7 +580,7 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
/* initialize pipelined send ordering */
msr->r_msg = NULL;
wake_up_process(msr->r_tsk);
- smp_mb(); /* see barrier comment below */
+ smp_wmb(); /* see barrier comment below */
msr->r_msg = ERR_PTR(-E2BIG);
} else {
msr->r_msg = NULL;
@@ -589,11 +589,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
wake_up_process(msr->r_tsk);
/*
* Ensure that the wakeup is visible before
- * setting r_msg, as the receiving end depends
- * on it. See lockless receive part 1 and 2 in
- * do_msgrcv().
+ * setting r_msg, as the receiving can otherwise
+ * exit - once r_msg is set, the receiver can
+ * continue. See lockless receive part 1 and 2
+ * in do_msgrcv().
*/
- smp_mb();
+ smp_wmb();
msr->r_msg = msg;
return 1;
@@ -934,10 +935,22 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
* wake_up_process(). There is a race with exit(), see
* ipc/mqueue.c for the details.
*/
- msg = (struct msg_msg *)msr_d.r_msg;
- while (msg == NULL) {
- cpu_relax();
+ for (;;) {
+ /*
+ * Pairs with writer barrier in pipelined_send
+ * or expunge_all
+ */
+ smp_rmb();
msg = (struct msg_msg *)msr_d.r_msg;
+ if (msg)
+ break;
+
+ /*
+ * The cpu_relax() call is a compiler barrier
+ * which forces everything in this loop to be
+ * re-loaded.
+ */
+ cpu_relax();
}
/* Lockless receive, part 3: