一句话解释:epoll机制可以监听特定的fd,当fd收到内容时,发送事件回调。相比select和poll机制,效率更高。
- epoll_create(int size)
参数:
返回值:epoll实例的fd
作用:
初始化epoll机制,调用API后,操作系统内核会产生一个eventpoll实例,并返回一个fd,这个fd就是epoll实例的句柄。
- epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
参数:
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; // 表示监听的事件类型(EPOLLIN/EPOLLHUP/EPOLLOUT...) epoll_data_t data; // 用户自定义数据,当事件发生时将会原样返回给用户 };
返回值:
作用:
注册、修改或删除监听的fd和事件。
- epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
参数:
返回值:
到来事件的个数,返回的事件存储在events数组中。
大家都知道,Android的主线程的Looper,本质是运行了一个死循环,不断从MessageQueue中取消息执行,如果没有消息,则会等待在nativePollOnce方法上,这个方法的底层原理就是epoll_wait.
下面一起来看源码,弄清楚Looper的整个流程是怎样的。
首先看Looper.java中的loop方法,这是我们启动一个线程looper的入口。
// Looper.java
public static void loop() { final Looper me = myLooper(); for (;;) { if (!loopOnce(me, ident, thresholdOverride)) { return; } } }
这个方法就是开启一个无限循环,调用loopOnce。
// Looper.java
private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) { // 从messagequeue中获取下一条message Message msg = me.mQueue.next(); // 执行message msg.target.dispatchMessage(msg); // Android10开始,可以通过添加observer的方式,监听messsage的执行情况 if (observer != null) { observer.messageDispatched(token, msg); } // 回收message msg.recycleUnchecked(); return true; }
当loopOnce方法返回true之后,又会再次循环调到loopOnce,重复执行。
接下来我们看MessageQueue的next()方法。
// MessageQueue.java
Message next() { for (;;) { // 通过epoll机制,等待消息,或超时唤醒 nativePollOnce(ptr, nextPollTimeoutMills); synchronized (this) { Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // msg.target == null 表示是同步屏障 // 如果有同步屏障,则直接跳到下一个异步的消息(同步的消息都过滤掉,先不处理) do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // 如果当前还没到达message的执行时间, 则获取当前的时间差作为timeout nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); }else{ // 省略一些链表的操作 prevMsg.next = msg.next; msg.next = null; // 直接返回已经到达执行时间的,第一条message return msg; } } } } }
这个方法,首先会调用nativePollOnce这个native方法,等nativePollOnce返回后,会去MessageQueue的链表中取下一条待执行的message。
取message的方法:
接下来看nativePollOnce的实现。
// android_os_MessageQueue_nativePollOnce()
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { //将Java层传递下来的mPtr转换为nativeMessageQueue NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 【3】 }
经过一系列调用,最后调到了Looper中的pollInner方法
Looper.cpp
int Looper::pollInner(int timeoutMillis) { ... struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //fd最大个数为16 //等待事件发生或者超时,在nativeWake()方法,向管道写端写入字符,则该方法会返回; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); //循环遍历,处理所有的事件 for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd) { if (epollEvents & EPOLLIN) { // 如果是唤醒事件,则读取并清空管道数据 awoken(); } } else { ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { // 如果是之前在mRequests中注册过的fd //处理request,生成对应的reponse对象,push到响应数组 pushResponse(events, mRequests.valueAt(requestIndex)); } } } Done: ; //处理Native的Message,调用相应回调方法 while (mMessageEnvelopes.size() != 0) { const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { { handler->handleMessage(message); // 处理消息事件 } } else { mNextMessageUptime = messageEnvelope.uptime; break; } } //处理带有Callback()方法的Response事件,执行Reponse相应的回调方法 for (size_t i = 0; i < mResponses.size(); i++) { if (response.request.ident == POLL_CALLBACK) { // 处理请求的回调方法 int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); //移除fd } } } return result; }
Looper.pollInner主要做如下事情:
我们来看看awoken方法。它的逻辑很简单,就是循环读取fd中的全部内容。
// Looper.cpp
void Looper::awoken() { char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); }
关于mRequests和mResponses的逻辑,先挖个坑,后面的文章再讲。
下面我们写一个epoll机制使用的示例代码。
在这个例子中,我们监听了一个sockfd的管道端口,启动一个线程,等待在epoll_wait上。一旦sockfd中写入数据,就可以唤醒我们的线程,进行读取。
#includevoid MonitorInit::createEpoll(int sockfd) { if(mSockFd == sockfd) return; mEpollFd = epoll_create(EPOLL_MAX_EVENTS); int epollEvents = 0; epollEvents |= EPOLLIN; struct epoll_event eventItem; memset(&eventItem, 0, sizeof(epoll_event)); eventItem.events = epollEvents; eventItem.data.fd = sockfd; int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, sockfd, &eventItem); LLOG_ERROR(TAG_LOOPER, "Adding epoll event resutl %d", epollResult); if(epollResult < 0){ LLOG_ERROR(TAG_LOOPER, "Error adding epoll event, fd %d, errno %d", sockfd, epollResult); } pthread_t thd; // 开启一个线程,这个线程用来监听epoll_wait pthread_create(&thd, nullptr, epollCallback, nullptr); pthread_detach(thd); mSockFd = sockfd; } void epollCallback(void *arg){ // 死循环,等待fd消息 while(loop){ int timeoutMillis = 100000; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); LLOG_ERROR(TAG_LOOPER, "receive event count %d", eventCount); if(eventCount < 0){ LLOG_ERROR(TAG_LOOPER, "Poll failed with an unexpected error, errno=%d", errno); } if(eventCount == 0){ // 超时时间到 LLOG_ERROR(TAG_LOOPER, "pollOnce - timeout"); } for(int i = 0; i < eventCount; i++){ int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if(fd == mSockFd){ // 将fd的内容读出来 } } } }
MessageQueue核心原理:主线程通过Looper中的死循环,不断从MessageQueue中获取待指定的message。
本文地址:https://www.r22.cn/jishuwz/466.html