aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c2
-rw-r--r--fs/ceph/addr.c98
-rw-r--r--fs/ceph/caps.c323
-rw-r--r--fs/ceph/file.c127
-rw-r--r--fs/ceph/mds_client.c23
-rw-r--r--fs/ceph/mdsmap.c163
-rw-r--r--fs/ceph/snap.c2
-rw-r--r--fs/ceph/super.c10
-rw-r--r--fs/ceph/super.h5
-rw-r--r--include/linux/ceph/auth.h5
-rw-r--r--include/linux/ceph/ceph_fs.h3
-rw-r--r--include/linux/ceph/mdsmap.h5
-rw-r--r--include/linux/ceph/messenger.h2
-rw-r--r--include/linux/ceph/osd_client.h2
-rw-r--r--net/ceph/auth.c4
-rw-r--r--net/ceph/auth_x.c197
-rw-r--r--net/ceph/auth_x.h3
-rw-r--r--net/ceph/crush/mapper.c2
-rw-r--r--net/ceph/crypto.c461
-rw-r--r--net/ceph/crypto.h26
-rw-r--r--net/ceph/messenger.c19
-rw-r--r--net/ceph/mon_client.c12
-rw-r--r--net/ceph/osd_client.c39
23 files changed, 809 insertions, 724 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 7b274ff4632c69..36d2b9f4e83654 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3756,7 +3756,7 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
struct rbd_device *rbd_dev = arg;
void *p = data;
void *const end = p + data_len;
- u8 struct_v;
+ u8 struct_v = 0;
u32 len;
u32 notify_op;
int ret;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index ef3ebd780aff8b..a0f1e2b91c8e0f 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
struct page **pages;
pgoff_t next_index;
int nr_pages = 0;
- int ret;
+ int got = 0;
+ int ret = 0;
+
+ if (!current->journal_info) {
+ /* caller of readpages does not hold buffer and read caps
+ * (fadvise, madvise and readahead cases) */
+ int want = CEPH_CAP_FILE_CACHE;
+ ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
+ if (ret < 0) {
+ dout("start_read %p, error getting cap\n", inode);
+ } else if (!(got & want)) {
+ dout("start_read %p, no cache cap\n", inode);
+ ret = 0;
+ }
+ if (ret <= 0) {
+ if (got)
+ ceph_put_cap_refs(ci, got);
+ while (!list_empty(page_list)) {
+ page = list_entry(page_list->prev,
+ struct page, lru);
+ list_del(&page->lru);
+ put_page(page);
+ }
+ return ret;
+ }
+ }
off = (u64) page_offset(page);
@@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
CEPH_OSD_FLAG_READ, NULL,
ci->i_truncate_seq, ci->i_truncate_size,
false);
- if (IS_ERR(req))
- return PTR_ERR(req);
+ if (IS_ERR(req)) {
+ ret = PTR_ERR(req);
+ goto out;
+ }
/* build page vector */
nr_pages = calc_pages_for(0, len);
pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
- ret = -ENOMEM;
- if (!pages)
- goto out;
+ if (!pages) {
+ ret = -ENOMEM;
+ goto out_put;
+ }
for (i = 0; i < nr_pages; ++i) {
page = list_entry(page_list->prev, struct page, lru);
BUG_ON(PageLocked(page));
@@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
if (ret < 0)
goto out_pages;
ceph_osdc_put_request(req);
+
+ /* After adding locked pages to page cache, the inode holds cache cap.
+ * So we can drop our cap refs. */
+ if (got)
+ ceph_put_cap_refs(ci, got);
+
return nr_pages;
out_pages:
@@ -386,8 +420,11 @@ out_pages:
unlock_page(pages[i]);
}
ceph_put_page_vector(pages, nr_pages, false);
-out:
+out_put:
ceph_osdc_put_request(req);
+out:
+ if (got)
+ ceph_put_cap_refs(ci, got);
return ret;
}
@@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
rc = start_read(inode, page_list, max);
if (rc < 0)
goto out;
- BUG_ON(rc == 0);
}
out:
ceph_fscache_readpages_cancel(inode, page_list);
@@ -438,7 +474,9 @@ out:
* only snap context we are allowed to write back.
*/
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
- loff_t *snap_size)
+ loff_t *snap_size,
+ u64 *truncate_size,
+ u32 *truncate_seq)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_snap_context *snapc = NULL;
@@ -452,6 +490,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
snapc = ceph_get_snap_context(capsnap->context);
if (snap_size)
*snap_size = capsnap->size;
+ if (truncate_size)
+ *truncate_size = capsnap->truncate_size;
+ if (truncate_seq)
+ *truncate_seq = capsnap->truncate_seq;
break;
}
}
@@ -459,6 +501,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
snapc = ceph_get_snap_context(ci->i_head_snapc);
dout(" head snapc %p has %d dirty pages\n",
snapc, ci->i_wrbuffer_ref_head);
+ if (truncate_size)
+ *truncate_size = capsnap->truncate_size;
+ if (truncate_seq)
+ *truncate_seq = capsnap->truncate_seq;
}
spin_unlock(&ci->i_ceph_lock);
return snapc;
@@ -501,7 +547,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage %p page %p not dirty?\n", inode, page);
goto out;
}
- oldest = get_oldest_context(inode, &snap_size);
+ oldest = get_oldest_context(inode, &snap_size,
+ &truncate_size, &truncate_seq);
if (snapc->seq > oldest->seq) {
dout("writepage %p page %p snapc %p not writeable - noop\n",
inode, page, snapc);
@@ -512,12 +559,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
}
ceph_put_snap_context(oldest);
- spin_lock(&ci->i_ceph_lock);
- truncate_seq = ci->i_truncate_seq;
- truncate_size = ci->i_truncate_size;
if (snap_size == -1)
snap_size = i_size_read(inode);
- spin_unlock(&ci->i_ceph_lock);
/* is this a partial page at end of file? */
if (page_off >= snap_size) {
@@ -764,7 +807,8 @@ retry:
/* find oldest snap context with dirty data */
ceph_put_snap_context(snapc);
snap_size = -1;
- snapc = get_oldest_context(inode, &snap_size);
+ snapc = get_oldest_context(inode, &snap_size,
+ &truncate_size, &truncate_seq);
if (!snapc) {
/* hmm, why does writepages get called when there
is no dirty data? */
@@ -774,11 +818,7 @@ retry:
dout(" oldest snapc is %p seq %lld (%d snaps)\n",
snapc, snapc->seq, snapc->num_snaps);
- spin_lock(&ci->i_ceph_lock);
- truncate_seq = ci->i_truncate_seq;
- truncate_size = ci->i_truncate_size;
i_size = i_size_read(inode);
- spin_unlock(&ci->i_ceph_lock);
if (last_snapc && snapc != last_snapc) {
/* if we switched to a newer snapc, restart our scan at the
@@ -1124,7 +1164,8 @@ out:
static int context_is_writeable_or_written(struct inode *inode,
struct ceph_snap_context *snapc)
{
- struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
+ struct ceph_snap_context *oldest = get_oldest_context(inode, NULL,
+ NULL, NULL);
int ret = !oldest || snapc->seq <= oldest->seq;
ceph_put_snap_context(oldest);
@@ -1169,7 +1210,7 @@ retry_locked:
* this page is already dirty in another (older) snap
* context! is it writeable now?
*/
- oldest = get_oldest_context(inode, NULL);
+ oldest = get_oldest_context(inode, NULL, NULL, NULL);
if (snapc->seq > oldest->seq) {
ceph_put_snap_context(oldest);
@@ -1371,9 +1412,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
- ci->i_inline_version == CEPH_INLINE_NONE)
+ ci->i_inline_version == CEPH_INLINE_NONE) {
+ current->journal_info = vma->vm_file;
ret = filemap_fault(vma, vmf);
- else
+ current->journal_info = NULL;
+ } else
ret = -EAGAIN;
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
@@ -1905,6 +1948,15 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
struct ceph_string *pool_ns;
int ret, flags;
+ if (ci->i_vino.snap != CEPH_NOSNAP) {
+ /*
+ * Pool permission check needs to write to the first object.
+ * But for snapshot, head of the first object may have alread
+ * been deleted. Skip check to avoid creating orphan object.
+ */
+ return 0;
+ }
+
if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
NOPOOLPERM))
return 0;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 16e6ded0b7f281..baea866a6751fa 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -987,96 +987,127 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
__cap_delay_cancel(mdsc, ci);
}
+struct cap_msg_args {
+ struct ceph_mds_session *session;
+ u64 ino, cid, follows;
+ u64 flush_tid, oldest_flush_tid, size, max_size;
+ u64 xattr_version;
+ struct ceph_buffer *xattr_buf;
+ struct timespec atime, mtime, ctime;
+ int op, caps, wanted, dirty;
+ u32 seq, issue_seq, mseq, time_warp_seq;
+ u32 flags;
+ kuid_t uid;
+ kgid_t gid;
+ umode_t mode;
+ bool inline_data;
+};
+
/*
* Build and send a cap message to the given MDS.
*
* Caller should be holding s_mutex.
*/
-static int send_cap_msg(struct ceph_mds_session *session,
- u64 ino, u64 cid, int op,
- int caps, int wanted, int dirty,
- u32 seq, u64 flush_tid, u64 oldest_flush_tid,
- u32 issue_seq, u32 mseq, u64 size, u64 max_size,
- struct timespec *mtime, struct timespec *atime,
- struct timespec *ctime, u32 time_warp_seq,
- kuid_t uid, kgid_t gid, umode_t mode,
- u64 xattr_version,
- struct ceph_buffer *xattrs_buf,
- u64 follows, bool inline_data)
+static int send_cap_msg(struct cap_msg_args *arg)
{
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
void *p;
size_t extra_len;
+ struct timespec zerotime = {0};
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
- " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
- cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
- ceph_cap_string(dirty),
- seq, issue_seq, flush_tid, oldest_flush_tid,
- mseq, follows, size, max_size,
- xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
+ " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
+ arg->cid, arg->ino, ceph_cap_string(arg->caps),
+ ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
+ arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
+ arg->mseq, arg->follows, arg->size, arg->max_size,
+ arg->xattr_version,
+ arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);
/* flock buffer size + inline version + inline data size +
* osd_epoch_barrier + oldest_flush_tid */
- extra_len = 4 + 8 + 4 + 4 + 8;
+ extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
GFP_NOFS, false);
if (!msg)
return -ENOMEM;
- msg->hdr.version = cpu_to_le16(6);
- msg->hdr.tid = cpu_to_le64(flush_tid);
+ msg->hdr.version = cpu_to_le16(10);
+ msg->hdr.tid = cpu_to_le64(arg->flush_tid);
fc = msg->front.iov_base;
memset(fc, 0, sizeof(*fc));
- fc->cap_id = cpu_to_le64(cid);
- fc->op = cpu_to_le32(op);
- fc->seq = cpu_to_le32(seq);
- fc->issue_seq = cpu_to_le32(issue_seq);
- fc->migrate_seq = cpu_to_le32(mseq);
- fc->caps = cpu_to_le32(caps);
- fc->wanted = cpu_to_le32(wanted);
- fc->dirty = cpu_to_le32(dirty);
- fc->ino = cpu_to_le64(ino);
- fc->snap_follows = cpu_to_le64(follows);
-
- fc->size = cpu_to_le64(size);
- fc->max_size = cpu_to_le64(max_size);
- if (mtime)
- ceph_encode_timespec(&fc->mtime, mtime);
- if (atime)
- ceph_encode_timespec(&fc->atime, atime);
- if (ctime)
- ceph_encode_timespec(&fc->ctime, ctime);
- fc->time_warp_seq = cpu_to_le32(time_warp_seq);
-
- fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
- fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
- fc->mode = cpu_to_le32(mode);
+ fc->cap_id = cpu_to_le64(arg->cid);
+ fc->op = cpu_to_le32(arg->op);
+ fc->seq = cpu_to_le32(arg->seq);
+ fc->issue_seq = cpu_to_le32(arg->issue_seq);
+ fc->migrate_seq = cpu_to_le32(arg->mseq);
+ fc->caps = cpu_to_le32(arg->caps);
+ fc->wanted = cpu_to_le32(arg->wanted);
+ fc->dirty = cpu_to_le32(arg->dirty);
+ fc->ino = cpu_to_le64(arg->ino);
+ fc->snap_follows = cpu_to_le64(arg->follows);
+
+ fc->size = cpu_to_le64(arg->size);
+ fc->max_size = cpu_to_le64(arg->max_size);
+ ceph_encode_timespec(&fc->mtime, &arg->mtime);
+ ceph_encode_timespec(&fc->atime, &arg->atime);
+ ceph_encode_timespec(&fc->ctime, &arg->ctime);
+ fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);
+
+ fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
+ fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
+ fc->mode = cpu_to_le32(arg->mode);
+
+ fc->xattr_version = cpu_to_le64(arg->xattr_version);
+ if (arg->xattr_buf) {
+ msg->middle = ceph_buffer_get(arg->xattr_buf);
+ fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
+ msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
+ }
p = fc + 1;
- /* flock buffer size */
+ /* flock buffer size (version 2) */
ceph_encode_32(&p, 0);
- /* inline version */
- ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
+ /* inline version (version 4) */
+ ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
/* inline data size */
ceph_encode_32(&p, 0);
- /* osd_epoch_barrier */
+ /* osd_epoch_barrier (version 5) */
ceph_encode_32(&p, 0);
- /* oldest_flush_tid */
- ceph_encode_64(&p, oldest_flush_tid);
+ /* oldest_flush_tid (version 6) */
+ ceph_encode_64(&p, arg->oldest_flush_tid);
- fc->xattr_version = cpu_to_le64(xattr_version);
- if (xattrs_buf) {
- msg->middle = ceph_buffer_get(xattrs_buf);
- fc->xattr_len = cpu_to_le32(xattrs_buf->vec.iov_len);
- msg->hdr.middle_len = cpu_to_le32(xattrs_buf->vec.iov_len);
- }
+ /*
+ * caller_uid/caller_gid (version 7)
+ *
+ * Currently, we don't properly track which caller dirtied the caps
+ * last, and force a flush of them when there is a conflict. For now,
+ * just set this to 0:0, to emulate how the MDS has worked up to now.
+ */
+ ceph_encode_32(&p, 0);
+ ceph_encode_32(&p, 0);
+
+ /* pool namespace (version 8) (mds always ignores this) */
+ ceph_encode_32(&p, 0);
- ceph_con_send(&session->s_con, msg);
+ /*
+ * btime and change_attr (version 9)
+ *
+ * We just zero these out for now, as the MDS ignores them unless
+ * the requisite feature flags are set (which we don't do yet).
+ */
+ ceph_encode_timespec(p, &zerotime);
+ p += sizeof(struct ceph_timespec);
+ ceph_encode_64(&p, 0);
+
+ /* Advisory flags (version 10) */
+ ceph_encode_32(&p, arg->flags);
+
+ ceph_con_send(&arg->session->s_con, msg);
return 0;
}
@@ -1115,27 +1146,17 @@ void ceph_queue_caps_release(struct inode *inode)
* caller should hold snap_rwsem (read), s_mutex.
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
- int op, int used, int want, int retain, int flushing,
- u64 flush_tid, u64 oldest_flush_tid)
+ int op, bool sync, int used, int want, int retain,
+ int flushing, u64 flush_tid, u64 oldest_flush_tid)
__releases(cap->ci->i_ceph_lock)
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
- u64 cap_id = cap->cap_id;
- int held, revoking, dropping, keep;
- u64 follows, size, max_size;
- u32 seq, issue_seq, mseq, time_warp_seq;
- struct timespec mtime, atime, ctime;
+ struct cap_msg_args arg;
+ int held, revoking, dropping;
int wake = 0;
- umode_t mode;
- kuid_t uid;
- kgid_t gid;
- struct ceph_mds_session *session;
- u64 xattr_version = 0;
- struct ceph_buffer *xattr_blob = NULL;
int delayed = 0;
int ret;
- bool inline_data;
held = cap->issued | cap->implemented;
revoking = cap->implemented & ~cap->issued;
@@ -1148,7 +1169,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ceph_cap_string(revoking));
BUG_ON((retain & CEPH_CAP_PIN) == 0);
- session = cap->session;
+ arg.session = cap->session;
/* don't release wanted unless we've waited a bit. */
if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
@@ -1177,40 +1198,51 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
cap->implemented &= cap->issued | used;
cap->mds_wanted = want;
- follows = flushing ? ci->i_head_snapc->seq : 0;
-
- keep = cap->implemented;
- seq = cap->seq;
- issue_seq = cap->issue_seq;
- mseq = cap->mseq;
- size = inode->i_size;
- ci->i_reported_size = size;
- max_size = ci->i_wanted_max_size;
- ci->i_requested_max_size = max_size;
- mtime = inode->i_mtime;
- atime = inode->i_atime;
- ctime = inode->i_ctime;
- time_warp_seq = ci->i_time_warp_seq;
- uid = inode->i_uid;
- gid = inode->i_gid;
- mode = inode->i_mode;
+ arg.ino = ceph_vino(inode).ino;
+ arg.cid = cap->cap_id;
+ arg.follows = flushing ? ci->i_head_snapc->seq : 0;
+ arg.flush_tid = flush_tid;
+ arg.oldest_flush_tid = oldest_flush_tid;
+
+ arg.size = inode->i_size;
+ ci->i_reported_size = arg.size;
+ arg.max_size = ci->i_wanted_max_size;
+ ci->i_requested_max_size = arg.max_size;
if (flushing & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci);
- xattr_blob = ci->i_xattrs.blob;
- xattr_version = ci->i_xattrs.version;
+ arg.xattr_version = ci->i_xattrs.version;
+ arg.xattr_buf = ci->i_xattrs.blob;
+ } else {
+ arg.xattr_buf = NULL;
}
- inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+ arg.mtime = inode->i_mtime;
+ arg.atime = inode->i_atime;
+ arg.ctime = inode->i_ctime;
+
+ arg.op = op;
+ arg.caps = cap->implemented;
+ arg.wanted = want;
+ arg.dirty = flushing;
+
+ arg.seq = cap->seq;
+ arg.issue_seq = cap->issue_seq;
+ arg.mseq = cap->mseq;
+ arg.time_warp_seq = ci->i_time_warp_seq;
+
+ arg.uid = inode->i_uid;
+ arg.gid = inode->i_gid;
+ arg.mode = inode->i_mode;
+
+ arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+ arg.flags = 0;
+ if (sync)
+ arg.flags |= CEPH_CLIENT_CAPS_SYNC;
spin_unlock(&ci->i_ceph_lock);
- ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
- op, keep, want, flushing, seq,
- flush_tid, oldest_flush_tid, issue_seq, mseq,
- size, max_size, &mtime, &atime, &ctime, time_warp_seq,
- uid, gid, mode, xattr_version, xattr_blob,
- follows, inline_data);
+ ret = send_cap_msg(&arg);
if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode);
delayed = 1;
@@ -1227,15 +1259,42 @@ static inline int __send_flush_snap(struct inode *inode,
struct ceph_cap_snap *capsnap,
u32 mseq, u64 oldest_flush_tid)
{
- return send_cap_msg(session, ceph_vino(inode).ino, 0,
- CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
- capsnap->dirty, 0, capsnap->cap_flush.tid,
- oldest_flush_tid, 0, mseq, capsnap->size, 0,
- &capsnap->mtime, &capsnap->atime,
- &capsnap->ctime, capsnap->time_warp_seq,
- capsnap->uid, capsnap->gid, capsnap->mode,
- capsnap->xattr_version, capsnap->xattr_blob,
- capsnap->follows, capsnap->inline_data);
+ struct cap_msg_args arg;
+
+ arg.session = session;
+ arg.ino = ceph_vino(inode).ino;
+ arg.cid = 0;
+ arg.follows = capsnap->follows;
+ arg.flush_tid = capsnap->cap_flush.tid;
+ arg.oldest_flush_tid = oldest_flush_tid;
+
+ arg.size = capsnap->size;
+ arg.max_size = 0;
+ arg.xattr_version = capsnap->xattr_version;
+ arg.xattr_buf = capsnap->xattr_blob;
+
+ arg.atime = capsnap->atime;
+ arg.mtime = capsnap->mtime;
+ arg.ctime = capsnap->ctime;
+
+ arg.op = CEPH_CAP_OP_FLUSHSNAP;
+ arg.caps = capsnap->issued;
+ arg.wanted = 0;
+ arg.dirty = capsnap->dirty;
+
+ arg.seq = 0;
+ arg.issue_seq = 0;
+ arg.mseq = mseq;
+ arg.time_warp_seq = capsnap->time_warp_seq;
+
+ arg.uid = capsnap->uid;
+ arg.gid = capsnap->gid;
+ arg.mode = capsnap->mode;
+
+ arg.inline_data = capsnap->inline_data;
+ arg.flags = 0;
+
+ return send_cap_msg(&arg);
}
/*
@@ -1858,9 +1917,9 @@ ack:
sent++;
/* __send_cap drops i_ceph_lock */
- delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
- want, retain, flushing,
- flush_tid, oldest_flush_tid);
+ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
+ cap_used, want, retain, flushing,
+ flush_tid, oldest_flush_tid);
goto retry; /* retake i_ceph_lock and restart our cap scan. */
}
@@ -1924,9 +1983,9 @@ retry:
&flush_tid, &oldest_flush_tid);
/* __send_cap drops i_ceph_lock */
- delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
- (cap->issued | cap->implemented),
- flushing, flush_tid, oldest_flush_tid);
+ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
+ used, want, (cap->issued | cap->implemented),
+ flushing, flush_tid, oldest_flush_tid);
if (delayed) {
spin_lock(&ci->i_ceph_lock);
@@ -1996,7 +2055,7 @@ static int unsafe_request_wait(struct inode *inode)
}
spin_unlock(&ci->i_unsafe_lock);
- dout("unsafe_requeset_wait %p wait on tid %llu %llu\n",
+ dout("unsafe_request_wait %p wait on tid %llu %llu\n",
inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
if (req1) {
ret = !wait_for_completion_timeout(&req1->r_safe_completion,
@@ -2119,7 +2178,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
inode, cap, cf->tid, ceph_cap_string(cf->caps));
ci->i_ceph_flags |= CEPH_I_NODELAY;
ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
- __ceph_caps_used(ci),
+ false, __ceph_caps_used(ci),
__ceph_caps_wanted(ci),
cap->issued | cap->implemented,
cf->caps, cf->tid, oldest_flush_tid);
@@ -2479,6 +2538,27 @@ static void check_max_size(struct inode *inode, loff_t endoff)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}
+int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+{
+ int ret, err = 0;
+
+ BUG_ON(need & ~CEPH_CAP_FILE_RD);
+ BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+ ret = ceph_pool_perm_check(ci, need);
+ if (ret < 0)
+ return ret;
+
+ ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
+ if (ret) {
+ if (err == -EAGAIN) {
+ ret = 0;
+ } else if (err < 0) {
+ ret = err;
+ }
+ }
+ return ret;
+}
+
/*
* Wait for caps, and take cap references. If we can't get a WR cap
* due to a small max_size, make sure we check_max_size (and possibly
@@ -2507,9 +2587,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
if (err < 0)
ret = err;
} else {
- ret = wait_event_interruptible(ci->i_cap_wq,
- try_get_cap_refs(ci, need, want, endoff,
- true, &_got, &err));
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ add_wait_queue(&ci->i_cap_wq, &wait);
+
+ while (!try_get_cap_refs(ci, need, want, endoff,
+ true, &_got, &err))
+ wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
+
+ remove_wait_queue(&ci->i_cap_wq, &wait);
+
if (err == -EAGAIN)
continue;
if (err < 0)
@@ -3570,6 +3656,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
cap->cap_id = le64_to_cpu(h->cap_id);
cap->mseq = mseq;
cap->seq = seq;
+ cap->issue_seq = seq;
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 159fc8f1a6a00d..045d30d2662485 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -454,71 +454,60 @@ enum {
* only return a short read to the caller if we hit EOF.
*/
static int striped_read(struct inode *inode,
- u64 off, u64 len,
+ u64 pos, u64 len,
struct page **pages, int num_pages,
- int *checkeof)
+ int page_align, int *checkeof)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
- u64 pos, this_len, left;
+ u64 this_len;
loff_t i_size;
- int page_align, pages_left;
- int read, ret;
- struct page **page_pos;
+ int page_idx;
+ int ret, read = 0;
bool hit_stripe, was_short;
/*
* we may need to do multiple reads. not atomic, unfortunately.
*/
- pos = off;
- left = len;
- page_pos = pages;
- pages_left = num_pages;
- read = 0;
-
more:
- page_align = pos & ~PAGE_MASK;
- this_len = left;
+ this_len = len;
+ page_idx = (page_align + read) >> PAGE_SHIFT;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
&ci->i_layout, pos, &this_len,
- ci->i_truncate_seq,
- ci->i_truncate_size,
- page_pos, pages_left, page_align);
+ ci->i_truncate_seq, ci->i_truncate_size,
+ pages + page_idx, num_pages - page_idx,
+ ((page_align + read) & ~PAGE_MASK));
if (ret == -ENOENT)
ret = 0;
- hit_stripe = this_len < left;
+ hit_stripe = this_len < len;
was_short = ret >= 0 && ret < this_len;
- dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
+ dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
i_size = i_size_read(inode);
if (ret >= 0) {
- int didpages;
if (was_short && (pos + ret < i_size)) {
int zlen = min(this_len - ret, i_size - pos - ret);
- int zoff = (off & ~PAGE_MASK) + read + ret;
+ int zoff = page_align + read + ret;
dout(" zero gap %llu to %llu\n",
- pos + ret, pos + ret + zlen);
+ pos + ret, pos + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages);
ret += zlen;
}
- didpages = (page_align + ret) >> PAGE_SHIFT;
+ read += ret;
pos += ret;
- read = pos - off;
- left -= ret;
- page_pos += didpages;
- pages_left -= didpages;
+ len -= ret;
/* hit stripe and need continue*/
- if (left && hit_stripe && pos < i_size)
+ if (len && hit_stripe && pos < i_size)
goto more;
}
if (read > 0) {
ret = read;
/* did we bounce off eof? */
- if (pos + left > i_size)
+ if (pos + len > i_size)
*checkeof = CHECK_EOF;
}
@@ -532,15 +521,16 @@ more:
*
* If the read spans object boundary, just do multiple reads.
*/
-static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
- int *checkeof)
+static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
+ int *checkeof)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
struct page **pages;
u64 off = iocb->ki_pos;
- int num_pages, ret;
- size_t len = iov_iter_count(i);
+ int num_pages;
+ ssize_t ret;
+ size_t len = iov_iter_count(to);
dout("sync_read on file %p %llu~%u %s\n", file, off,
(unsigned)len,
@@ -559,35 +549,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
if (ret < 0)
return ret;
- num_pages = calc_pages_for(off, len);
- pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
- ret = striped_read(inode, off, len, pages,
- num_pages, checkeof);
- if (ret > 0) {
- int l, k = 0;
- size_t left = ret;
-
- while (left) {
- size_t page_off = off & ~PAGE_MASK;
- size_t copy = min_t(size_t, left,
- PAGE_SIZE - page_off);
- l = copy_page_to_iter(pages[k++], page_off, copy, i);
- off += l;
- left -= l;
- if (l < copy)
- break;
+ if (unlikely(to->type & ITER_PIPE)) {
+ size_t page_off;
+ ret = iov_iter_get_pages_alloc(to, &pages, len,
+ &page_off);
+ if (ret <= 0)
+ return -ENOMEM;
+ num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+
+ ret = striped_read(inode, off, ret, pages, num_pages,
+ page_off, checkeof);
+ if (ret > 0) {
+ iov_iter_advance(to, ret);
+ off += ret;
+ } else {
+ iov_iter_advance(to, 0);
+ }
+ ceph_put_page_vector(pages, num_pages, false);
+ } else {
+ num_pages = calc_pages_for(off, len);
+ pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ ret = striped_read(inode, off, len, pages, num_pages,
+ (off & ~PAGE_MASK), checkeof);
+ if (ret > 0) {
+ int l, k = 0;
+ size_t left = ret;
+
+ while (left) {
+ size_t page_off = off & ~PAGE_MASK;
+ size_t copy = min_t(size_t, left,
+ PAGE_SIZE - page_off);
+ l = copy_page_to_iter(pages[k++], page_off,
+ copy, to);
+ off += l;
+ left -= l;
+ if (l < copy)
+ break;
+ }
}
+ ceph_release_page_vector(pages, num_pages);
}
- ceph_release_page_vector(pages, num_pages);
if (off > iocb->ki_pos) {
ret = off - iocb->ki_pos;
iocb->ki_pos = off;
}
- dout("sync_read result %d\n", ret);
+ dout("sync_read result %zd\n", ret);
return ret;
}
@@ -849,7 +860,7 @@ void ceph_sync_write_wait(struct inode *inode)
dout("sync_write_wait on tid %llu (until %llu)\n",
req->r_tid, last_tid);
- wait_for_completion(&req->r_safe_completion);
+ wait_for_completion(&req->r_done_completion);
ceph_osdc_put_request(req);
spin_lock(&ci->i_unsafe_lock);
@@ -902,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
pos >> PAGE_SHIFT,
(pos + count) >> PAGE_SHIFT);
if (ret2 < 0)
- dout("invalidate_inode_pages2_range returned %d\n", ret);
+ dout("invalidate_inode_pages2_range returned %d\n", ret2);
flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK |
@@ -1245,8 +1256,9 @@ again:
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got));
-
+ current->journal_info = filp;
ret = generic_file_read_iter(iocb, to);
+ current->journal_info = NULL;
}
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
@@ -1766,6 +1778,7 @@ const struct file_operations ceph_file_fops = {
.fsync = ceph_fsync,
.lock = ceph_lock,
.flock = ceph_flock,
+ .splice_read = generic_file_splice_read,
.splice_write = iter_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 815acd1a56d461..4f49253387a0a5 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2100,17 +2100,26 @@ static int __do_request(struct ceph_mds_client *mdsc,
err = -EIO;
goto finish;
}
+ if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
+ if (mdsc->mdsmap_err) {
+ err = mdsc->mdsmap_err;
+ dout("do_request mdsmap err %d\n", err);
+ goto finish;
+ }
+ if (!(mdsc->fsc->mount_options->flags &
+ CEPH_MOUNT_OPT_MOUNTWAIT) &&
+ !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
+ err = -ENOENT;
+ pr_info("probably no mds server is up\n");
+ goto finish;
+ }
+ }
put_request_session(req);
mds = __choose_mds(mdsc, req);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
- if (mdsc->mdsmap_err) {
- err = mdsc->mdsmap_err;
- dout("do_request mdsmap err %d\n", err);
- goto finish;
- }
dout("do_request no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map);
goto out;
@@ -3943,13 +3952,13 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
}
-static int verify_authorizer_reply(struct ceph_connection *con, int len)
+static int verify_authorizer_reply(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
struct ceph_mds_client *mdsc = s->s_mdsc;
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
- return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
+ return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}
static int invalidate_authorizer(struct ceph_connection *con)
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 8c3591a7fbaeef..5454e2327a5f77 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -42,6 +42,60 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
return i;
}
+#define __decode_and_drop_type(p, end, type, bad) \
+ do { \
+ if (*p + sizeof(type) > end) \
+ goto bad; \
+ *p += sizeof(type); \
+ } while (0)
+
+#define __decode_and_drop_set(p, end, type, bad) \
+ do { \
+ u32 n; \
+ size_t need; \
+ ceph_decode_32_safe(p, end, n, bad); \
+ need = sizeof(type) * n; \
+ ceph_decode_need(p, end, need, bad); \
+ *p += need; \
+ } while (0)
+
+#define __decode_and_drop_map(p, end, ktype, vtype, bad) \
+ do { \
+ u32 n; \
+ size_t need; \
+ ceph_decode_32_safe(p, end, n, bad); \
+ need = (sizeof(ktype) + sizeof(vtype)) * n; \
+ ceph_decode_need(p, end, need, bad); \
+ *p += need; \
+ } while (0)
+
+
+static int __decode_and_drop_compat_set(void **p, void* end)
+{
+ int i;
+ /* compat, ro_compat, incompat*/
+ for (i = 0; i < 3; i++) {
+ u32 n;
+ ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
+ /* mask */
+ *p += sizeof(u64);
+ /* names (map<u64, string>) */
+ n = ceph_decode_32(p);
+ while (n-- > 0) {
+ u32 len;
+ ceph_decode_need(p, end, sizeof(u64) + sizeof(u32),
+ bad);
+ *p += sizeof(u64);
+ len = ceph_decode_32(p);
+ ceph_decode_need(p, end, len, bad);
+ *p += len;
+ }
+ }
+ return 0;
+bad:
+ return -1;
+}
+
/*
* Decode an MDS map
*
@@ -55,6 +109,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
int i, j, n;
int err = -EINVAL;
u8 mdsmap_v, mdsmap_cv;
+ u16 mdsmap_ev;
m = kzalloc(sizeof(*m), GFP_NOFS);
if (m == NULL)
@@ -83,7 +138,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_info = kcalloc(m->m_max_mds, sizeof(*m->m_info), GFP_NOFS);
if (m->m_info == NULL)
- goto badmem;
+ goto nomem;
/* pick out active nodes from mds_info (state > 0) */
n = ceph_decode_32(p);
@@ -166,7 +221,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
info->export_targets = kcalloc(num_export_targets,
sizeof(u32), GFP_NOFS);
if (info->export_targets == NULL)
- goto badmem;
+ goto nomem;
for (j = 0; j < num_export_targets; j++)
info->export_targets[j] =
ceph_decode_32(&pexport_targets);
@@ -180,24 +235,104 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_num_data_pg_pools = n;
m->m_data_pg_pools = kcalloc(n, sizeof(u64), GFP_NOFS);
if (!m->m_data_pg_pools)
- goto badmem;
+ goto nomem;
ceph_decode_need(p, end, sizeof(u64)*(n+1), bad);
for (i = 0; i < n; i++)
m->m_data_pg_pools[i] = ceph_decode_64(p);
m->m_cas_pg_pool = ceph_decode_64(p);
+ m->m_enabled = m->m_epoch > 1;
+
+ mdsmap_ev = 1;
+ if (mdsmap_v >= 2) {
+ ceph_decode_16_safe(p, end, mdsmap_ev, bad_ext);
+ }
+ if (mdsmap_ev >= 3) {
+ if (__decode_and_drop_compat_set(p, end) < 0)
+ goto bad_ext;
+ }
+ /* metadata_pool */
+ if (mdsmap_ev < 5) {
+ __decode_and_drop_type(p, end, u32, bad_ext);
+ } else {
+ __decode_and_drop_type(p, end, u64, bad_ext);
+ }
- /* ok, we don't care about the rest. */
+ /* created + modified + tableserver */
+ __decode_and_drop_type(p, end, struct ceph_timespec, bad_ext);
+ __decode_and_drop_type(p, end, struct ceph_timespec, bad_ext);
+ __decode_and_drop_type(p, end, u32, bad_ext);
+
+ /* in */
+ {
+ int num_laggy = 0;
+ ceph_decode_32_safe(p, end, n, bad_ext);
+ ceph_decode_need(p, end, sizeof(u32) * n, bad_ext);
+
+ for (i = 0; i < n; i++) {
+ s32 mds = ceph_decode_32(p);
+ if (mds >= 0 && mds < m->m_max_mds) {
+ if (m->m_info[mds].laggy)
+ num_laggy++;
+ }
+ }
+ m->m_num_laggy = num_laggy;
+ }
+
+ /* inc */
+ __decode_and_drop_map(p, end, u32, u32, bad_ext);
+ /* up */
+ __decode_and_drop_map(p, end, u32, u64, bad_ext);
+ /* failed */
+ __decode_and_drop_set(p, end, u32, bad_ext);
+ /* stopped */
+ __decode_and_drop_set(p, end, u32, bad_ext);
+
+ if (mdsmap_ev >= 4) {
+ /* last_failure_osd_epoch */
+ __decode_and_drop_type(p, end, u32, bad_ext);
+ }
+ if (mdsmap_ev >= 6) {
+ /* ever_allowed_snaps */
+ __decode_and_drop_type(p, end, u8, bad_ext);
+ /* explicitly_allowed_snaps */
+ __decode_and_drop_type(p, end, u8, bad_ext);
+ }
+ if (mdsmap_ev >= 7) {
+ /* inline_data_enabled */
+ __decode_and_drop_type(p, end, u8, bad_ext);
+ }
+ if (mdsmap_ev >= 8) {
+ u32 name_len;
+ /* enabled */
+ ceph_decode_8_safe(p, end, m->m_enabled, bad_ext);
+ ceph_decode_32_safe(p, end, name_len, bad_ext);
+ ceph_decode_need(p, end, name_len, bad_ext);
+ *p += name_len;
+ }
+ /* damaged */
+ if (mdsmap_ev >= 9) {
+ size_t need;
+ ceph_decode_32_safe(p, end, n, bad_ext);
+ need = sizeof(u32) * n;
+ ceph_decode_need(p, end, need, bad_ext);
+ *p += need;
+ m->m_damaged = n > 0;
+ } else {
+ m->m_damaged = false;
+ }
+bad_ext:
*p = end;
dout("mdsmap_decode success epoch %u\n", m->m_epoch);
return m;
-
-badmem:
+nomem:
err = -ENOMEM;
+ goto out_err;
bad:
pr_err("corrupt mdsmap\n");
print_hex_dump(KERN_DEBUG, "mdsmap: ",
DUMP_PREFIX_OFFSET, 16, 1,
start, end - start, true);
+out_err:
ceph_mdsmap_destroy(m);
return ERR_PTR(err);
}
@@ -212,3 +347,19 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
kfree(m->m_data_pg_pools);
kfree(m);
}
+
+bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m)
+{
+ int i, nr_active = 0;
+ if (!m->m_enabled)
+ return false;
+ if (m->m_damaged)
+ return false;
+ if (m->m_num_laggy > 0)
+ return false;
+ for (i = 0; i < m->m_max_mds; i++) {
+ if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE)
+ nr_active++;
+ }
+ return nr_active > 0;
+}
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 9ff5219d849e94..8f8b41c2ef0f7d 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -593,6 +593,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->atime = inode->i_atime;
capsnap->ctime = inode->i_ctime;
capsnap->time_warp_seq = ci->i_time_warp_seq;
+ capsnap->truncate_size = ci->i_truncate_size;
+ capsnap->truncate_seq = ci->i_truncate_seq;
if (capsnap->dirty_pages) {
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
"still has %d dirty pages\n", inode, capsnap,
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f2f76696ddca20..6bd20d707bfd88 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -137,6 +137,8 @@ enum {
Opt_nofscache,
Opt_poolperm,
Opt_nopoolperm,
+ Opt_require_active_mds,
+ Opt_norequire_active_mds,
#ifdef CONFIG_CEPH_FS_POSIX_ACL
Opt_acl,
#endif
@@ -171,6 +173,8 @@ static match_table_t fsopt_tokens = {
{Opt_nofscache, "nofsc"},
{Opt_poolperm, "poolperm"},
{Opt_nopoolperm, "nopoolperm"},
+ {Opt_require_active_mds, "require_active_mds"},
+ {Opt_norequire_active_mds, "norequire_active_mds"},
#ifdef CONFIG_CEPH_FS_POSIX_ACL
{Opt_acl, "acl"},
#endif
@@ -287,6 +291,12 @@ static int parse_fsopt_token(char *c, void *private)
case Opt_nopoolperm:
fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM;
break;
+ case Opt_require_active_mds:
+ fsopt->flags &= ~CEPH_MOUNT_OPT_MOUNTWAIT;
+ break;
+ case Opt_norequire_active_mds:
+ fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT;
+ break;
#ifdef CONFIG_CEPH_FS_POSIX_ACL
case Opt_acl:
fsopt->sb_flags |= MS_POSIXACL;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 931687f71a7cfa..3373b61faefd0f 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -36,6 +36,7 @@
#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */
#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */
#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */
+#define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */
#define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE
@@ -180,6 +181,8 @@ struct ceph_cap_snap {
u64 size;
struct timespec mtime, atime, ctime;
u64 time_warp_seq;
+ u64 truncate_size;
+ u32 truncate_seq;
int writing; /* a sync write is still in progress */
int dirty_pages; /* dirty pages awaiting writeback */
bool inline_data;
@@ -905,6 +908,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page);
+extern int ceph_try_get_caps(struct ceph_inode_info *ci,
+ int need, int want, int *got);
/* for counting open files by mode */
extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
index 374bb1c4ef5292..a6747789fe5c25 100644
--- a/include/linux/ceph/auth.h
+++ b/include/linux/ceph/auth.h
@@ -64,7 +64,7 @@ struct ceph_auth_client_ops {
int (*update_authorizer)(struct ceph_auth_client *ac, int peer_type,
struct ceph_auth_handshake *auth);
int (*verify_authorizer_reply)(struct ceph_auth_client *ac,
- struct ceph_authorizer *a, size_t len);
+ struct ceph_authorizer *a);
void (*invalidate_authorizer)(struct ceph_auth_client *ac,
int peer_type);
@@ -118,8 +118,7 @@ extern int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
int peer_type,
struct ceph_auth_handshake *a);
extern int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
- struct ceph_authorizer *a,
- size_t len);
+ struct ceph_authorizer *a);
extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac,
int peer_type);
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index f96de8de4fa7a1..f4b2ee18f38cbd 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -653,6 +653,9 @@ enum {
extern const char *ceph_cap_op_name(int op);
+/* flags field in client cap messages (version >= 10) */
+#define CEPH_CLIENT_CAPS_SYNC (0x1)
+
/*
* caps message, used for capability callbacks, acks, requests, etc.
*/
diff --git a/include/linux/ceph/mdsmap.h b/include/linux/ceph/mdsmap.h
index 87ed09f548007e..8ed5dc505fbb2e 100644
--- a/include/linux/ceph/mdsmap.h
+++ b/include/linux/ceph/mdsmap.h
@@ -31,6 +31,10 @@ struct ceph_mdsmap {
int m_num_data_pg_pools;
u64 *m_data_pg_pools;
u64 m_cas_pg_pool;
+
+ bool m_enabled;
+ bool m_damaged;
+ int m_num_laggy;
};
static inline struct ceph_entity_addr *
@@ -59,5 +63,6 @@ static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w)
extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end);
extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m);
+extern bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m);
#endif
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 67bcef2ecddb82..c5c4c713e00f52 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -30,7 +30,7 @@ struct ceph_connection_operations {
struct ceph_auth_handshake *(*get_authorizer) (
struct ceph_connection *con,
int *proto, int force_new);
- int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
+ int (*verify_authorizer_reply) (struct ceph_connection *con);
int (*invalidate_authorizer)(struct ceph_connection *con);
/* there was some error on the socket (disconnect, whatever) */
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index a8e66344bacc22..03a6653d329a01 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -176,7 +176,7 @@ struct ceph_osd_request {
struct kref r_kref;
bool r_mempool;
struct completion r_completion;
- struct completion r_safe_completion; /* fsync waiter */
+ struct completion r_done_completion; /* fsync waiter */
ceph_osdc_callback_t r_callback;
ceph_osdc_unsafe_callback_t r_unsafe_callback;
struct list_head r_unsafe_item;
diff --git a/net/ceph/auth.c b/net/ceph/auth.c
index c822b3ae1bd37b..48bb8d95195b1d 100644
--- a/net/ceph/auth.c
+++ b/net/ceph/auth.c
@@ -315,13 +315,13 @@ int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
EXPORT_SYMBOL(ceph_auth_update_authorizer);
int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
- struct ceph_authorizer *a, size_t len)
+ struct ceph_authorizer *a)
{
int ret = 0;
mutex_lock(&ac->mutex);
if (ac->ops && ac->ops->verify_authorizer_reply)
- ret = ac->ops->verify_authorizer_reply(ac, a, len);
+ ret = ac->ops->verify_authorizer_reply(ac, a);
mutex_unlock(&ac->mutex);
return ret;
}
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index a0905f04bd13f3..2034fb9266700e 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -39,56 +39,58 @@ static int ceph_x_should_authenticate(struct ceph_auth_client *ac)
return need != 0;
}
+static int ceph_x_encrypt_offset(void)
+{
+ return sizeof(u32) + sizeof(struct ceph_x_encrypt_header);
+}
+
static int ceph_x_encrypt_buflen(int ilen)
{
- return sizeof(struct ceph_x_encrypt_header) + ilen + 16 +
- sizeof(u32);
+ return ceph_x_encrypt_offset() + ilen + 16;
}
-static int ceph_x_encrypt(struct ceph_crypto_key *secret,
- void *ibuf, int ilen, void *obuf, size_t olen)
+static int ceph_x_encrypt(struct ceph_crypto_key *secret, void *buf,
+ int buf_len, int plaintext_len)
{
- struct ceph_x_encrypt_header head = {
- .struct_v = 1,
- .magic = cpu_to_le64(CEPHX_ENC_MAGIC)
- };
- size_t len = olen - sizeof(u32);
+ struct ceph_x_encrypt_header *hdr = buf + sizeof(u32);
+ int ciphertext_len;
int ret;
- ret = ceph_encrypt2(secret, obuf + sizeof(u32), &len,
- &head, sizeof(head), ibuf, ilen);
+ hdr->struct_v = 1;
+ hdr->magic = cpu_to_le64(CEPHX_ENC_MAGIC);
+
+ ret = ceph_crypt(secret, true, buf + sizeof(u32), buf_len - sizeof(u32),
+ plaintext_len + sizeof(struct ceph_x_encrypt_header),
+ &ciphertext_len);
if (ret)
return ret;
- ceph_encode_32(&obuf, len);
- return len + sizeof(u32);
+
+ ceph_encode_32(&buf, ciphertext_len);
+ return sizeof(u32) + ciphertext_len;
}
-static int ceph_x_decrypt(struct ceph_crypto_key *secret,
- void **p, void *end, void **obuf, size_t olen)
+static int ceph_x_decrypt(struct ceph_crypto_key *secret, void **p, void *end)
{
- struct ceph_x_encrypt_header head;
- size_t head_len = sizeof(head);
- int len, ret;
-
- len = ceph_decode_32(p);
- if (*p + len > end)
- return -EINVAL;
+ struct ceph_x_encrypt_header *hdr = *p + sizeof(u32);
+ int ciphertext_len, plaintext_len;
+ int ret;
- dout("ceph_x_decrypt len %d\n", len);
- if (*obuf == NULL) {
- *obuf = kmalloc(len, GFP_NOFS);
- if (!*obuf)
- return -ENOMEM;
- olen = len;
- }
+ ceph_decode_32_safe(p, end, ciphertext_len, e_inval);
+ ceph_decode_need(p, end, ciphertext_len, e_inval);
- ret = ceph_decrypt2(secret, &head, &head_len, *obuf, &olen, *p, len);
+ ret = ceph_crypt(secret, false, *p, end - *p, ciphertext_len,
+ &plaintext_len);
if (ret)
return ret;
- if (head.struct_v != 1 || le64_to_cpu(head.magic) != CEPHX_ENC_MAGIC)
+
+ if (hdr->struct_v != 1 || le64_to_cpu(hdr->magic) != CEPHX_ENC_MAGIC)
return -EPERM;
- *p += len;
- return olen;
+
+ *p += ciphertext_len;
+ return plaintext_len - sizeof(struct ceph_x_encrypt_header);
+
+e_inval:
+ return -EINVAL;
}
/*
@@ -143,13 +145,10 @@ static int process_one_ticket(struct ceph_auth_client *ac,
int type;
u8 tkt_struct_v, blob_struct_v;
struct ceph_x_ticket_handler *th;
- void *dbuf = NULL;
void *dp, *dend;
int dlen;
char is_enc;
struct timespec validity;
- struct ceph_crypto_key old_key;
- void *ticket_buf = NULL;
void *tp, *tpend;
void **ptp;
struct ceph_crypto_key new_session_key;
@@ -174,20 +173,17 @@ static int process_one_ticket(struct ceph_auth_client *ac,
}
/* blob for me */
- dlen = ceph_x_decrypt(secret, p, end, &dbuf, 0);
- if (dlen <= 0) {
- ret = dlen;
+ dp = *p + ceph_x_encrypt_offset();
+ ret = ceph_x_decrypt(secret, p, end);
+ if (ret < 0)
goto out;
- }
- dout(" decrypted %d bytes\n", dlen);
- dp = dbuf;
- dend = dp + dlen;
+ dout(" decrypted %d bytes\n", ret);
+ dend = dp + ret;
tkt_struct_v = ceph_decode_8(&dp);
if (tkt_struct_v != 1)
goto bad;
- memcpy(&old_key, &th->session_key, sizeof(old_key));
ret = ceph_crypto_key_decode(&new_session_key, &dp, dend);
if (ret)
goto out;
@@ -203,15 +199,13 @@ static int process_one_ticket(struct ceph_auth_client *ac,
ceph_decode_8_safe(p, end, is_enc, bad);
if (is_enc) {
/* encrypted */
- dout(" encrypted ticket\n");
- dlen = ceph_x_decrypt(&old_key, p, end, &ticket_buf, 0);
- if (dlen < 0) {
- ret = dlen;
+ tp = *p + ceph_x_encrypt_offset();
+ ret = ceph_x_decrypt(&th->session_key, p, end);
+ if (ret < 0)
goto out;
- }
- tp = ticket_buf;
+ dout(" encrypted ticket, decrypted %d bytes\n", ret);
ptp = &tp;
- tpend = *ptp + dlen;
+ tpend = tp + ret;
} else {
/* unencrypted */
ptp = p;
@@ -242,8 +236,6 @@ static int process_one_ticket(struct ceph_auth_client *ac,
xi->have_keys |= th->service;
out:
- kfree(ticket_buf);
- kfree(dbuf);
return ret;
bad:
@@ -294,7 +286,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
{
int maxlen;
struct ceph_x_authorize_a *msg_a;
- struct ceph_x_authorize_b msg_b;
+ struct ceph_x_authorize_b *msg_b;
void *p, *end;
int ret;
int ticket_blob_len =
@@ -308,8 +300,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
if (ret)
goto out_au;
- maxlen = sizeof(*msg_a) + sizeof(msg_b) +
- ceph_x_encrypt_buflen(ticket_blob_len);
+ maxlen = sizeof(*msg_a) + ticket_blob_len +
+ ceph_x_encrypt_buflen(sizeof(*msg_b));
dout(" need len %d\n", maxlen);
if (au->buf && au->buf->alloc_len < maxlen) {
ceph_buffer_put(au->buf);
@@ -343,18 +335,19 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
p += ticket_blob_len;
end = au->buf->vec.iov_base + au->buf->vec.iov_len;
+ msg_b = p + ceph_x_encrypt_offset();
+ msg_b->struct_v = 1;
get_random_bytes(&au->nonce, sizeof(au->nonce));
- msg_b.struct_v = 1;
- msg_b.nonce = cpu_to_le64(au->nonce);
- ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
- p, end - p);
+ msg_b->nonce = cpu_to_le64(au->nonce);
+ ret = ceph_x_encrypt(&au->session_key, p, end - p, sizeof(*msg_b));
if (ret < 0)
goto out_au;
+
p += ret;
+ WARN_ON(p > end);
au->buf->vec.iov_len = p - au->buf->vec.iov_base;
dout(" built authorizer nonce %llx len %d\n", au->nonce,
(int)au->buf->vec.iov_len);
- BUG_ON(au->buf->vec.iov_len > maxlen);
return 0;
out_au:
@@ -452,8 +445,9 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
if (need & CEPH_ENTITY_TYPE_AUTH) {
struct ceph_x_authenticate *auth = (void *)(head + 1);
void *p = auth + 1;
- struct ceph_x_challenge_blob tmp;
- char tmp_enc[40];
+ void *enc_buf = xi->auth_authorizer.enc_buf;
+ struct ceph_x_challenge_blob *blob = enc_buf +
+ ceph_x_encrypt_offset();
u64 *u;
if (p > end)
@@ -464,16 +458,16 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
/* encrypt and hash */
get_random_bytes(&auth->client_challenge, sizeof(u64));
- tmp.client_challenge = auth->client_challenge;
- tmp.server_challenge = cpu_to_le64(xi->server_challenge);
- ret = ceph_x_encrypt(&xi->secret, &tmp, sizeof(tmp),
- tmp_enc, sizeof(tmp_enc));
+ blob->client_challenge = auth->client_challenge;
+ blob->server_challenge = cpu_to_le64(xi->server_challenge);
+ ret = ceph_x_encrypt(&xi->secret, enc_buf, CEPHX_AU_ENC_BUF_LEN,
+ sizeof(*blob));
if (ret < 0)
return ret;
auth->struct_v = 1;
auth->key = 0;
- for (u = (u64 *)tmp_enc; u + 1 <= (u64 *)(tmp_enc + ret); u++)
+ for (u = (u64 *)enc_buf; u + 1 <= (u64 *)(enc_buf + ret); u++)
auth->key ^= *(__le64 *)u;
dout(" server_challenge %llx client_challenge %llx key %llx\n",
xi->server_challenge, le64_to_cpu(auth->client_challenge),
@@ -600,8 +594,8 @@ static int ceph_x_create_authorizer(
auth->authorizer = (struct ceph_authorizer *) au;
auth->authorizer_buf = au->buf->vec.iov_base;
auth->authorizer_buf_len = au->buf->vec.iov_len;
- auth->authorizer_reply_buf = au->reply_buf;
- auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
+ auth->authorizer_reply_buf = au->enc_buf;
+ auth->authorizer_reply_buf_len = CEPHX_AU_ENC_BUF_LEN;
auth->sign_message = ac->ops->sign_message;
auth->check_message_signature = ac->ops->check_message_signature;
@@ -629,27 +623,25 @@ static int ceph_x_update_authorizer(
}
static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
- struct ceph_authorizer *a, size_t len)
+ struct ceph_authorizer *a)
{
struct ceph_x_authorizer *au = (void *)a;
- int ret = 0;
- struct ceph_x_authorize_reply reply;
- void *preply = &reply;
- void *p = au->reply_buf;
- void *end = p + sizeof(au->reply_buf);
+ void *p = au->enc_buf;
+ struct ceph_x_authorize_reply *reply = p + ceph_x_encrypt_offset();
+ int ret;
- ret = ceph_x_decrypt(&au->session_key, &p, end, &preply, sizeof(reply));
+ ret = ceph_x_decrypt(&au->session_key, &p, p + CEPHX_AU_ENC_BUF_LEN);
if (ret < 0)
return ret;
- if (ret != sizeof(reply))
+ if (ret != sizeof(*reply))
return -EPERM;
- if (au->nonce + 1 != le64_to_cpu(reply.nonce_plus_one))
+ if (au->nonce + 1 != le64_to_cpu(reply->nonce_plus_one))
ret = -EPERM;
else
ret = 0;
dout("verify_authorizer_reply nonce %llx got %llx ret %d\n",
- au->nonce, le64_to_cpu(reply.nonce_plus_one), ret);
+ au->nonce, le64_to_cpu(reply->nonce_plus_one), ret);
return ret;
}
@@ -704,35 +696,48 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
invalidate_ticket(ac, CEPH_ENTITY_TYPE_AUTH);
}
-static int calcu_signature(struct ceph_x_authorizer *au,
- struct ceph_msg *msg, __le64 *sig)
+static int calc_signature(struct ceph_x_authorizer *au, struct ceph_msg *msg,
+ __le64 *psig)
{
+ void *enc_buf = au->enc_buf;
+ struct {
+ __le32 len;
+ __le32 header_crc;
+ __le32 front_crc;
+ __le32 middle_crc;
+ __le32 data_crc;
+ } __packed *sigblock = enc_buf + ceph_x_encrypt_offset();
int ret;
- char tmp_enc[40];
- __le32 tmp[5] = {
- cpu_to_le32(16), msg->hdr.crc, msg->footer.front_crc,
- msg->footer.middle_crc, msg->footer.data_crc,
- };
- ret = ceph_x_encrypt(&au->session_key, &tmp, sizeof(tmp),
- tmp_enc, sizeof(tmp_enc));
+
+ sigblock->len = cpu_to_le32(4*sizeof(u32));
+ sigblock->header_crc = msg->hdr.crc;
+ sigblock->front_crc = msg->footer.front_crc;
+ sigblock->middle_crc = msg->footer.middle_crc;
+ sigblock->data_crc = msg->footer.data_crc;
+ ret = ceph_x_encrypt(&au->session_key, enc_buf, CEPHX_AU_ENC_BUF_LEN,
+ sizeof(*sigblock));
if (ret < 0)
return ret;
- *sig = *(__le64*)(tmp_enc + 4);
+
+ *psig = *(__le64 *)(enc_buf + sizeof(u32));
return 0;
}
static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
struct ceph_msg *msg)
{
+ __le64 sig;
int ret;
if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
return 0;
- ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
- msg, &msg->footer.sig);
- if (ret < 0)
+ ret = calc_signature((struct ceph_x_authorizer *)auth->authorizer,
+ msg, &sig);
+ if (ret)
return ret;
+
+ msg->footer.sig = sig;
msg->footer.flags |= CEPH_MSG_FOOTER_SIGNED;
return 0;
}
@@ -746,9 +751,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
return 0;
- ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
- msg, &sig_check);
- if (ret < 0)
+ ret = calc_signature((struct ceph_x_authorizer *)auth->authorizer,
+ msg, &sig_check);
+ if (ret)
return ret;
if (sig_check == msg->footer.sig)
return 0;
diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h
index 21a5af904bae75..48e9ad41bd2aa7 100644
--- a/net/ceph/auth_x.h
+++ b/net/ceph/auth_x.h
@@ -24,6 +24,7 @@ struct ceph_x_ticket_handler {
unsigned long renew_after, expires;
};
+#define CEPHX_AU_ENC_BUF_LEN 128 /* big enough for encrypted blob */
struct ceph_x_authorizer {
struct ceph_authorizer base;
@@ -32,7 +33,7 @@ struct ceph_x_authorizer {
unsigned int service;
u64 nonce;
u64 secret_id;
- char reply_buf[128]; /* big enough for encrypted blob */
+ char enc_buf[CEPHX_AU_ENC_BUF_LEN] __aligned(8);
};
struct ceph_x_info {
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index a421e905331af3..130ab407c5ecf8 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -17,10 +17,12 @@
# include <linux/kernel.h>
# include <linux/crush/crush.h>
# include <linux/crush/hash.h>
+# include <linux/crush/mapper.h>
#else
# include "crush_compat.h"
# include "crush.h"
# include "hash.h"
+# include "mapper.h"
#endif
#include "crush_ln_table.h"
diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c
index db2847ac5f1229..3949ce70be07bc 100644
--- a/net/ceph/crypto.c
+++ b/net/ceph/crypto.c
@@ -13,14 +13,60 @@
#include <linux/ceph/decode.h>
#include "crypto.h"
+/*
+ * Set ->key and ->tfm. The rest of the key should be filled in before
+ * this function is called.
+ */
+static int set_secret(struct ceph_crypto_key *key, void *buf)
+{
+ unsigned int noio_flag;
+ int ret;
+
+ key->key = NULL;
+ key->tfm = NULL;
+
+ switch (key->type) {
+ case CEPH_CRYPTO_NONE:
+ return 0; /* nothing to do */
+ case CEPH_CRYPTO_AES:
+ break;
+ default:
+ return -ENOTSUPP;
+ }
+
+ WARN_ON(!key->len);
+ key->key = kmemdup(buf, key->len, GFP_NOIO);
+ if (!key->key) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+
+ /* crypto_alloc_skcipher() allocates with GFP_KERNEL */
+ noio_flag = memalloc_noio_save();
+ key->tfm = crypto_alloc_skcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
+ memalloc_noio_restore(noio_flag);
+ if (IS_ERR(key->tfm)) {
+ ret = PTR_ERR(key->tfm);
+ key->tfm = NULL;
+ goto fail;
+ }
+
+ ret = crypto_skcipher_setkey(key->tfm, key->key, key->len);
+ if (ret)
+ goto fail;
+
+ return 0;
+
+fail:
+ ceph_crypto_key_destroy(key);
+ return ret;
+}
+
int ceph_crypto_key_clone(struct ceph_crypto_key *dst,
const struct ceph_crypto_key *src)
{
memcpy(dst, src, sizeof(struct ceph_crypto_key));
- dst->key = kmemdup(src->key, src->len, GFP_NOFS);
- if (!dst->key)
- return -ENOMEM;
- return 0;
+ return set_secret(dst, src->key);
}
int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end)
@@ -37,16 +83,16 @@ int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end)
int ceph_crypto_key_decode(struct ceph_crypto_key *key, void **p, void *end)
{
+ int ret;
+
ceph_decode_need(p, end, 2*sizeof(u16) + sizeof(key->created), bad);
key->type = ceph_decode_16(p);
ceph_decode_copy(p, &key->created, sizeof(key->created));
key->len = ceph_decode_16(p);
ceph_decode_need(p, end, key->len, bad);
- key->key = kmalloc(key->len, GFP_NOFS);
- if (!key->key)
- return -ENOMEM;
- ceph_decode_copy(p, key->key, key->len);
- return 0;
+ ret = set_secret(key, *p);
+ *p += key->len;
+ return ret;
bad:
dout("failed to decode crypto key\n");
@@ -80,9 +126,14 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey)
return 0;
}
-static struct crypto_skcipher *ceph_crypto_alloc_cipher(void)
+void ceph_crypto_key_destroy(struct ceph_crypto_key *key)
{
- return crypto_alloc_skcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
+ if (key) {
+ kfree(key->key);
+ key->key = NULL;
+ crypto_free_skcipher(key->tfm);
+ key->tfm = NULL;
+ }
}
static const u8 *aes_iv = (u8 *)CEPH_AES_IV;
@@ -157,372 +208,82 @@ static void teardown_sgtable(struct sg_table *sgt)
sg_free_table(sgt);
}
-static int ceph_aes_encrypt(const void *key, int key_len,
- void *dst, size_t *dst_len,
- const void *src, size_t src_len)
+static int ceph_aes_crypt(const struct ceph_crypto_key *key, bool encrypt,
+ void *buf, int buf_len, int in_len, int *pout_len)
{
- struct scatterlist sg_in[2], prealloc_sg;
- struct sg_table sg_out;
- struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher();
- SKCIPHER_REQUEST_ON_STACK(req, tfm);
- int ret;
+ SKCIPHER_REQUEST_ON_STACK(req, key->tfm);
+ struct sg_table sgt;
+ struct scatterlist prealloc_sg;
char iv[AES_BLOCK_SIZE];
- size_t zero_padding = (0x10 - (src_len & 0x0f));
- char pad[16];
-
- if (IS_ERR(tfm))
- return PTR_ERR(tfm);
-
- memset(pad, zero_padding, zero_padding);
-
- *dst_len = src_len + zero_padding;
-
- sg_init_table(sg_in, 2);
- sg_set_buf(&sg_in[0], src, src_len);
- sg_set_buf(&sg_in[1], pad, zero_padding);
- ret = setup_sgtable(&sg_out, &prealloc_sg, dst, *dst_len);
- if (ret)
- goto out_tfm;
-
- crypto_skcipher_setkey((void *)tfm, key, key_len);
- memcpy(iv, aes_iv, AES_BLOCK_SIZE);
-
- skcipher_request_set_tfm(req, tfm);
- skcipher_request_set_callback(req, 0, NULL, NULL);
- skcipher_request_set_crypt(req, sg_in, sg_out.sgl,
- src_len + zero_padding, iv);
-
- /*
- print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1,
- key, key_len, 1);
- print_hex_dump(KERN_ERR, "enc src: ", DUMP_PREFIX_NONE, 16, 1,
- src, src_len, 1);
- print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1,
- pad, zero_padding, 1);
- */
- ret = crypto_skcipher_encrypt(req);
- skcipher_request_zero(req);
- if (ret < 0) {
- pr_err("ceph_aes_crypt failed %d\n", ret);
- goto out_sg;
- }
- /*
- print_hex_dump(KERN_ERR, "enc out: ", DUMP_PREFIX_NONE, 16, 1,
- dst, *dst_len, 1);
- */
-
-out_sg:
- teardown_sgtable(&sg_out);
-out_tfm:
- crypto_free_skcipher(tfm);
- return ret;
-}
-
-static int ceph_aes_encrypt2(const void *key, int key_len, void *dst,
- size_t *dst_len,
- const void *src1, size_t src1_len,
- const void *src2, size_t src2_len)
-{
- struct scatterlist sg_in[3], prealloc_sg;
- struct sg_table sg_out;
- struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher();
- SKCIPHER_REQUEST_ON_STACK(req, tfm);
+ int pad_byte = AES_BLOCK_SIZE - (in_len & (AES_BLOCK_SIZE - 1));
+ int crypt_len = encrypt ? in_len + pad_byte : in_len;
int ret;
- char iv[AES_BLOCK_SIZE];
- size_t zero_padding = (0x10 - ((src1_len + src2_len) & 0x0f));
- char pad[16];
- if (IS_ERR(tfm))
- return PTR_ERR(tfm);
-
- memset(pad, zero_padding, zero_padding);
-
- *dst_len = src1_len + src2_len + zero_padding;
-
- sg_init_table(sg_in, 3);
- sg_set_buf(&sg_in[0], src1, src1_len);
- sg_set_buf(&sg_in[1], src2, src2_len);
- sg_set_buf(&sg_in[2], pad, zero_padding);
- ret = setup_sgtable(&sg_out, &prealloc_sg, dst, *dst_len);
+ WARN_ON(crypt_len > buf_len);
+ if (encrypt)
+ memset(buf + in_len, pad_byte, pad_byte);
+ ret = setup_sgtable(&sgt, &prealloc_sg, buf, crypt_len);
if (ret)
- goto out_tfm;
-
- crypto_skcipher_setkey((void *)tfm, key, key_len);
- memcpy(iv, aes_iv, AES_BLOCK_SIZE);
-
- skcipher_request_set_tfm(req, tfm);
- skcipher_request_set_callback(req, 0, NULL, NULL);
- skcipher_request_set_crypt(req, sg_in, sg_out.sgl,
- src1_len + src2_len + zero_padding, iv);
-
- /*
- print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1,
- key, key_len, 1);
- print_hex_dump(KERN_ERR, "enc src1: ", DUMP_PREFIX_NONE, 16, 1,
- src1, src1_len, 1);
- print_hex_dump(KERN_ERR, "enc src2: ", DUMP_PREFIX_NONE, 16, 1,
- src2, src2_len, 1);
- print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1,
- pad, zero_padding, 1);
- */
- ret = crypto_skcipher_encrypt(req);
- skcipher_request_zero(req);
- if (ret < 0) {
- pr_err("ceph_aes_crypt2 failed %d\n", ret);
- goto out_sg;
- }
- /*
- print_hex_dump(KERN_ERR, "enc out: ", DUMP_PREFIX_NONE, 16, 1,
- dst, *dst_len, 1);
- */
-
-out_sg:
- teardown_sgtable(&sg_out);
-out_tfm:
- crypto_free_skcipher(tfm);
- return ret;
-}
-
-static int ceph_aes_decrypt(const void *key, int key_len,
- void *dst, size_t *dst_len,
- const void *src, size_t src_len)
-{
- struct sg_table sg_in;
- struct scatterlist sg_out[2], prealloc_sg;
- struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher();
- SKCIPHER_REQUEST_ON_STACK(req, tfm);
- char pad[16];
- char iv[AES_BLOCK_SIZE];
- int ret;
- int last_byte;
-
- if (IS_ERR(tfm))
- return PTR_ERR(tfm);
-
- sg_init_table(sg_out, 2);
- sg_set_buf(&sg_out[0], dst, *dst_len);
- sg_set_buf(&sg_out[1], pad, sizeof(pad));
- ret = setup_sgtable(&sg_in, &prealloc_sg, src, src_len);
- if (ret)
- goto out_tfm;
+ return ret;
- crypto_skcipher_setkey((void *)tfm, key, key_len);
memcpy(iv, aes_iv, AES_BLOCK_SIZE);
-
- skcipher_request_set_tfm(req, tfm);
+ skcipher_request_set_tfm(req, key->tfm);
skcipher_request_set_callback(req, 0, NULL, NULL);
- skcipher_request_set_crypt(req, sg_in.sgl, sg_out,
- src_len, iv);
+ skcipher_request_set_crypt(req, sgt.sgl, sgt.sgl, crypt_len, iv);
/*
- print_hex_dump(KERN_ERR, "dec key: ", DUMP_PREFIX_NONE, 16, 1,
- key, key_len, 1);
- print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1,
- src, src_len, 1);
+ print_hex_dump(KERN_ERR, "key: ", DUMP_PREFIX_NONE, 16, 1,
+ key->key, key->len, 1);
+ print_hex_dump(KERN_ERR, " in: ", DUMP_PREFIX_NONE, 16, 1,
+ buf, crypt_len, 1);
*/
- ret = crypto_skcipher_decrypt(req);
- skcipher_request_zero(req);
- if (ret < 0) {
- pr_err("ceph_aes_decrypt failed %d\n", ret);
- goto out_sg;
- }
-
- if (src_len <= *dst_len)
- last_byte = ((char *)dst)[src_len - 1];
+ if (encrypt)
+ ret = crypto_skcipher_encrypt(req);
else
- last_byte = pad[src_len - *dst_len - 1];
- if (last_byte <= 16 && src_len >= last_byte) {
- *dst_len = src_len - last_byte;
- } else {
- pr_err("ceph_aes_decrypt got bad padding %d on src len %d\n",
- last_byte, (int)src_len);
- return -EPERM; /* bad padding */
- }
- /*
- print_hex_dump(KERN_ERR, "dec out: ", DUMP_PREFIX_NONE, 16, 1,
- dst, *dst_len, 1);
- */
-
-out_sg:
- teardown_sgtable(&sg_in);
-out_tfm:
- crypto_free_skcipher(tfm);
- return ret;
-}
-
-static int ceph_aes_decrypt2(const void *key, int key_len,
- void *dst1, size_t *dst1_len,
- void *dst2, size_t *dst2_len,
- const void *src, size_t src_len)
-{
- struct sg_table sg_in;
- struct scatterlist sg_out[3], prealloc_sg;
- struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher();
- SKCIPHER_REQUEST_ON_STACK(req, tfm);
- char pad[16];
- char iv[AES_BLOCK_SIZE];
- int ret;
- int last_byte;
-
- if (IS_ERR(tfm))
- return PTR_ERR(tfm);
-
- sg_init_table(sg_out, 3);
- sg_set_buf(&sg_out[0], dst1, *dst1_len);
- sg_set_buf(&sg_out[1], dst2, *dst2_len);
- sg_set_buf(&sg_out[2], pad, sizeof(pad));
- ret = setup_sgtable(&sg_in, &prealloc_sg, src, src_len);
- if (ret)
- goto out_tfm;
-
- crypto_skcipher_setkey((void *)tfm, key, key_len);
- memcpy(iv, aes_iv, AES_BLOCK_SIZE);
-
- skcipher_request_set_tfm(req, tfm);
- skcipher_request_set_callback(req, 0, NULL, NULL);
- skcipher_request_set_crypt(req, sg_in.sgl, sg_out,
- src_len, iv);
-
- /*
- print_hex_dump(KERN_ERR, "dec key: ", DUMP_PREFIX_NONE, 16, 1,
- key, key_len, 1);
- print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1,
- src, src_len, 1);
- */
- ret = crypto_skcipher_decrypt(req);
+ ret = crypto_skcipher_decrypt(req);
skcipher_request_zero(req);
- if (ret < 0) {
- pr_err("ceph_aes_decrypt failed %d\n", ret);
- goto out_sg;
- }
-
- if (src_len <= *dst1_len)
- last_byte = ((char *)dst1)[src_len - 1];
- else if (src_len <= *dst1_len + *dst2_len)
- last_byte = ((char *)dst2)[src_len - *dst1_len - 1];
- else
- last_byte = pad[src_len - *dst1_len - *dst2_len - 1];
- if (last_byte <= 16 && src_len >= last_byte) {
- src_len -= last_byte;
- } else {
- pr_err("ceph_aes_decrypt got bad padding %d on src len %d\n",
- last_byte, (int)src_len);
- return -EPERM; /* bad padding */
- }
-
- if (src_len < *dst1_len) {
- *dst1_len = src_len;
- *dst2_len = 0;
- } else {
- *dst2_len = src_len - *dst1_len;
+ if (ret) {
+ pr_err("%s %scrypt failed: %d\n", __func__,
+ encrypt ? "en" : "de", ret);
+ goto out_sgt;
}
/*
- print_hex_dump(KERN_ERR, "dec out1: ", DUMP_PREFIX_NONE, 16, 1,
- dst1, *dst1_len, 1);
- print_hex_dump(KERN_ERR, "dec out2: ", DUMP_PREFIX_NONE, 16, 1,
- dst2, *dst2_len, 1);
+ print_hex_dump(KERN_ERR, "out: ", DUMP_PREFIX_NONE, 16, 1,
+ buf, crypt_len, 1);
*/
-out_sg:
- teardown_sgtable(&sg_in);
-out_tfm:
- crypto_free_skcipher(tfm);
- return ret;
-}
-
-
-int ceph_decrypt(struct ceph_crypto_key *secret, void *dst, size_t *dst_len,
- const void *src, size_t src_len)
-{
- switch (secret->type) {
- case CEPH_CRYPTO_NONE:
- if (*dst_len < src_len)
- return -ERANGE;
- memcpy(dst, src, src_len);
- *dst_len = src_len;
- return 0;
-
- case CEPH_CRYPTO_AES:
- return ceph_aes_decrypt(secret->key, secret->len, dst,
- dst_len, src, src_len);
-
- default:
- return -EINVAL;
- }
-}
-
-int ceph_decrypt2(struct ceph_crypto_key *secret,
- void *dst1, size_t *dst1_len,
- void *dst2, size_t *dst2_len,
- const void *src, size_t src_len)
-{
- size_t t;
-
- switch (secret->type) {
- case CEPH_CRYPTO_NONE:
- if (*dst1_len + *dst2_len < src_len)
- return -ERANGE;
- t = min(*dst1_len, src_len);
- memcpy(dst1, src, t);
- *dst1_len = t;
- src += t;
- src_len -= t;
- if (src_len) {
- t = min(*dst2_len, src_len);
- memcpy(dst2, src, t);
- *dst2_len = t;
+ if (encrypt) {
+ *pout_len = crypt_len;
+ } else {
+ pad_byte = *(char *)(buf + in_len - 1);
+ if (pad_byte > 0 && pad_byte <= AES_BLOCK_SIZE &&
+ in_len >= pad_byte) {
+ *pout_len = in_len - pad_byte;
+ } else {
+ pr_err("%s got bad padding %d on in_len %d\n",
+ __func__, pad_byte, in_len);
+ ret = -EPERM;
+ goto out_sgt;
}
- return 0;
-
- case CEPH_CRYPTO_AES:
- return ceph_aes_decrypt2(secret->key, secret->len,
- dst1, dst1_len, dst2, dst2_len,
- src, src_len);
-
- default:
- return -EINVAL;
}
-}
-
-int ceph_encrypt(struct ceph_crypto_key *secret, void *dst, size_t *dst_len,
- const void *src, size_t src_len)
-{
- switch (secret->type) {
- case CEPH_CRYPTO_NONE:
- if (*dst_len < src_len)
- return -ERANGE;
- memcpy(dst, src, src_len);
- *dst_len = src_len;
- return 0;
- case CEPH_CRYPTO_AES:
- return ceph_aes_encrypt(secret->key, secret->len, dst,
- dst_len, src, src_len);
-
- default:
- return -EINVAL;
- }
+out_sgt:
+ teardown_sgtable(&sgt);
+ return ret;
}
-int ceph_encrypt2(struct ceph_crypto_key *secret, void *dst, size_t *dst_len,
- const void *src1, size_t src1_len,
- const void *src2, size_t src2_len)
+int ceph_crypt(const struct ceph_crypto_key *key, bool encrypt,
+ void *buf, int buf_len, int in_len, int *pout_len)
{
- switch (secret->type) {
+ switch (key->type) {
case CEPH_CRYPTO_NONE:
- if (*dst_len < src1_len + src2_len)
- return -ERANGE;
- memcpy(dst, src1, src1_len);
- memcpy(dst + src1_len, src2, src2_len);
- *dst_len = src1_len + src2_len;
+ *pout_len = in_len;
return 0;
-
case CEPH_CRYPTO_AES:
- return ceph_aes_encrypt2(secret->key, secret->len, dst, dst_len,
- src1, src1_len, src2, src2_len);
-
+ return ceph_aes_crypt(key, encrypt, buf, buf_len, in_len,
+ pout_len);
default:
- return -EINVAL;
+ return -ENOTSUPP;
}
}
diff --git a/net/ceph/crypto.h b/net/ceph/crypto.h
index 2e9cab09f37ba6..58d83aa7740f64 100644
--- a/net/ceph/crypto.h
+++ b/net/ceph/crypto.h
@@ -12,37 +12,19 @@ struct ceph_crypto_key {
struct ceph_timespec created;
int len;
void *key;
+ struct crypto_skcipher *tfm;
};
-static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key)
-{
- if (key) {
- kfree(key->key);
- key->key = NULL;
- }
-}
-
int ceph_crypto_key_clone(struct ceph_crypto_key *dst,
const struct ceph_crypto_key *src);
int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end);
int ceph_crypto_key_decode(struct ceph_crypto_key *key, void **p, void *end);
int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *in);
+void ceph_crypto_key_destroy(struct ceph_crypto_key *key);
/* crypto.c */
-int ceph_decrypt(struct ceph_crypto_key *secret,
- void *dst, size_t *dst_len,
- const void *src, size_t src_len);
-int ceph_encrypt(struct ceph_crypto_key *secret,
- void *dst, size_t *dst_len,
- const void *src, size_t src_len);
-int ceph_decrypt2(struct ceph_crypto_key *secret,
- void *dst1, size_t *dst1_len,
- void *dst2, size_t *dst2_len,
- const void *src, size_t src_len);
-int ceph_encrypt2(struct ceph_crypto_key *secret,
- void *dst, size_t *dst_len,
- const void *src1, size_t src1_len,
- const void *src2, size_t src2_len);
+int ceph_crypt(const struct ceph_crypto_key *key, bool encrypt,
+ void *buf, int buf_len, int in_len, int *pout_len);
int ceph_crypto_init(void);
void ceph_crypto_shutdown(void);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index a5502898ea33b0..770c52701efa3e 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1393,15 +1393,9 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
return NULL;
}
- /* Can't hold the mutex while getting authorizer */
- mutex_unlock(&con->mutex);
auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
- mutex_lock(&con->mutex);
-
if (IS_ERR(auth))
return auth;
- if (con->state != CON_STATE_NEGOTIATING)
- return ERR_PTR(-EAGAIN);
con->auth_reply_buf = auth->authorizer_reply_buf;
con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
@@ -2027,6 +2021,19 @@ static int process_connect(struct ceph_connection *con)
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
+ if (con->auth_reply_buf) {
+ /*
+ * Any connection that defines ->get_authorizer()
+ * should also define ->verify_authorizer_reply().
+ * See get_connect_authorizer().
+ */
+ ret = con->ops->verify_authorizer_reply(con);
+ if (ret < 0) {
+ con->error_msg = "bad authorize reply";
+ return ret;
+ }
+ }
+
switch (con->in_reply.tag) {
case CEPH_MSGR_TAG_FEATURES:
pr_err("%s%lld %s feature set mismatch,"
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index a8effc8b728092..29a0ef351c5e06 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -1028,21 +1028,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
err = -ENOMEM;
monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
sizeof(struct ceph_mon_subscribe_ack),
- GFP_NOFS, true);
+ GFP_KERNEL, true);
if (!monc->m_subscribe_ack)
goto out_auth;
- monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, GFP_NOFS,
- true);
+ monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
+ GFP_KERNEL, true);
if (!monc->m_subscribe)
goto out_subscribe_ack;
- monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
- true);
+ monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
+ GFP_KERNEL, true);
if (!monc->m_auth_reply)
goto out_subscribe;
- monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
+ monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
monc->pending_auth = 0;
if (!monc->m_auth)
goto out_auth_reply;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index e6ae15bc41b74d..842f049abb86d9 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -460,7 +460,7 @@ static void request_init(struct ceph_osd_request *req)
kref_init(&req->r_kref);
init_completion(&req->r_completion);
- init_completion(&req->r_safe_completion);
+ init_completion(&req->r_done_completion);
RB_CLEAR_NODE(&req->r_node);
RB_CLEAR_NODE(&req->r_mc_node);
INIT_LIST_HEAD(&req->r_unsafe_item);
@@ -1725,7 +1725,7 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked)
__submit_request(req, wrlocked);
}
-static void __finish_request(struct ceph_osd_request *req)
+static void finish_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd = req->r_osd;
@@ -1747,12 +1747,6 @@ static void __finish_request(struct ceph_osd_request *req)
ceph_msg_revoke_incoming(req->r_reply);
}
-static void finish_request(struct ceph_osd_request *req)
-{
- __finish_request(req);
- ceph_osdc_put_request(req);
-}
-
static void __complete_request(struct ceph_osd_request *req)
{
if (req->r_callback)
@@ -1770,9 +1764,9 @@ static void complete_request(struct ceph_osd_request *req, int err)
dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
req->r_result = err;
- __finish_request(req);
+ finish_request(req);
__complete_request(req);
- complete_all(&req->r_safe_completion);
+ complete_all(&req->r_done_completion);
ceph_osdc_put_request(req);
}
@@ -1798,6 +1792,8 @@ static void cancel_request(struct ceph_osd_request *req)
cancel_map_check(req);
finish_request(req);
+ complete_all(&req->r_done_completion);
+ ceph_osdc_put_request(req);
}
static void check_pool_dne(struct ceph_osd_request *req)
@@ -2808,12 +2804,12 @@ static bool done_request(const struct ceph_osd_request *req,
* ->r_unsafe_callback is set? yes no
*
* first reply is OK (needed r_cb/r_completion, r_cb/r_completion,
- * any or needed/got safe) r_safe_completion r_safe_completion
+ * any or needed/got safe) r_done_completion r_done_completion
*
* first reply is unsafe r_unsafe_cb(true) (nothing)
*
* when we get the safe reply r_unsafe_cb(false), r_cb/r_completion,
- * r_safe_completion r_safe_completion
+ * r_done_completion r_done_completion
*/
static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
{
@@ -2915,7 +2911,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
}
if (done_request(req, &m)) {
- __finish_request(req);
+ finish_request(req);
if (req->r_linger) {
WARN_ON(req->r_unsafe_callback);
dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
@@ -2934,8 +2930,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
dout("req %p tid %llu cb\n", req, req->r_tid);
__complete_request(req);
}
- if (m.flags & CEPH_OSD_FLAG_ONDISK)
- complete_all(&req->r_safe_completion);
+ complete_all(&req->r_done_completion);
ceph_osdc_put_request(req);
} else {
if (req->r_unsafe_callback) {
@@ -3471,9 +3466,8 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
EXPORT_SYMBOL(ceph_osdc_start_request);
/*
- * Unregister a registered request. The request is not completed (i.e.
- * no callbacks or wakeups) - higher layers are supposed to know what
- * they are canceling.
+ * Unregister a registered request. The request is not completed:
+ * ->r_result isn't set and __complete_request() isn't called.
*/
void ceph_osdc_cancel_request(struct ceph_osd_request *req)
{
@@ -3500,9 +3494,6 @@ static int wait_request_timeout(struct ceph_osd_request *req,
if (left <= 0) {
left = left ?: -ETIMEDOUT;
ceph_osdc_cancel_request(req);
-
- /* kludge - need to to wake ceph_osdc_sync() */
- complete_all(&req->r_safe_completion);
} else {
left = req->r_result; /* completed */
}
@@ -3549,7 +3540,7 @@ again:
up_read(&osdc->lock);
dout("%s waiting on req %p tid %llu last_tid %llu\n",
__func__, req, req->r_tid, last_tid);
- wait_for_completion(&req->r_safe_completion);
+ wait_for_completion(&req->r_done_completion);
ceph_osdc_put_request(req);
goto again;
}
@@ -4478,13 +4469,13 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
}
-static int verify_authorizer_reply(struct ceph_connection *con, int len)
+static int verify_authorizer_reply(struct ceph_connection *con)
{
struct ceph_osd *o = con->private;
struct ceph_osd_client *osdc = o->o_osdc;
struct ceph_auth_client *ac = osdc->client->monc.auth;
- return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
+ return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer);
}
static int invalidate_authorizer(struct ceph_connection *con)